use anyhow::{Context, Error, Result};
use std::fs;
use std::process::{Command, Stdio};
use std::time::Instant;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
use tracing::{error, info};
use tempfile;
use std::io::Write;
use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
pub struct LocalExecutor;
impl LocalExecutor {
pub async fn execute_script_with_realtime_output(
script: Option<String>,
global_scripts:Vec<String>,
step: &Step,
pipeline_name: &str,
_step_name: &str,
output_callback: Option<OutputCallback>,
variables: std::collections::HashMap<String, String>,
) -> Result<ExecutionResult> {
let start_time = Instant::now();
let pipeline_name = pipeline_name.to_string();
let script_path_str = step.script.clone();
let script_path = std::path::Path::new(&script_path_str);
if !script_path.exists() {
return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
}
let mut script_content = std::fs::read_to_string(&script_path)
.map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
for (key, value) in &variables {
let placeholder = format!("{{{{ {} }}}}", key);
script_content = script_content.replace(&placeholder, value);
}
let mut gloabl_script_content = global_scripts.iter()
.map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
.fold(Ok("".to_string()), |p:Result<String>,v|{
if p.is_err(){
return p;
}
if v.is_err(){
return Err(Error::msg(format!("{:?}", v.err())));
}
let content = v.unwrap();
let mut s = p.unwrap_or_default();
s.push_str("\n");
s.push_str(&content);
return Ok(s.clone());
})?;
if let Some(script_header) = script {
let cont = fs::read_to_string(&script_header)
.map_err(|e| anyhow::anyhow!("Failed to read script header file '{}': {}", script_header, e))?;
gloabl_script_content.push_str("\n");
gloabl_script_content.push_str(&cont);
}
gloabl_script_content.push_str("\n");
gloabl_script_content.push_str(&script_content);
let script_content = gloabl_script_content.clone();
let mut temp_file = tempfile::NamedTempFile::new()
.map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
temp_file.write_all(script_content.as_bytes())
.map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
let temp_path = temp_file.path().to_path_buf();
info!("Executing local script: {} (with content variable substitution)", script_path_str);
if let Some(callback) = &output_callback {
let event = OutputEvent {
pipeline_name: pipeline_name.clone(),
server_name: "localhost".to_string(),
step: step.clone(),
script_path:script_path_str.clone(),
output_type: OutputType::Log,
content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
timestamp: Instant::now(),
variables: variables.clone(),
};
callback(event);
}
let timeout_seconds = step.timeout_seconds.unwrap_or(60);
let mut command = TokioCommand::new("bash");
command.arg(temp_path.to_str().unwrap());
command.current_dir(std::env::current_dir()?);
for (key, value) in &variables {
command.env(key, value);
}
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
let mut child = command.spawn()
.context("Failed to spawn local script process")?;
let stdout = child.stdout.take().expect("Failed to capture stdout");
let stderr = child.stderr.take().expect("Failed to capture stderr");
let step_clone = step.clone();
let pipeline_name1 = pipeline_name.clone();
let pipeline_name2 = pipeline_name.clone();
let variables_clone = variables.clone();
let variables_clone2 = variables.clone();
let output_callback_clone = output_callback.clone();
let output_callback_clone2 = output_callback.clone();
let script_path = script_path_str.clone();
let stdout_task = tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
let mut content = String::new();
while let Ok(Some(line)) = lines.next_line().await {
content.push_str(&line);
content.push('\n');
if let Some(callback) = &output_callback_clone {
let event = OutputEvent {
pipeline_name: pipeline_name1.to_string(),
server_name: "localhost".to_string(),
step: step_clone.clone(),
script_path:script_path.clone(),
output_type: OutputType::Stdout,
content: line,
timestamp: Instant::now(),
variables: variables_clone.clone(),
};
callback(event);
}
}
content
});
let script_path = script_path_str.clone();
let step_clone2 = step.clone();
let stderr_task = tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
let mut content = String::new();
while let Ok(Some(line)) = lines.next_line().await {
content.push_str(&line);
content.push('\n');
if let Some(callback) = &output_callback_clone2 {
let event = OutputEvent {
pipeline_name: pipeline_name2.to_string(),
server_name: "localhost".to_string(),
step: step_clone2.clone(),
script_path:script_path.clone(),
output_type: OutputType::Stderr,
content: line,
timestamp: Instant::now(),
variables: variables_clone2.clone(),
};
callback(event);
}
}
content
});
let status = tokio::time::timeout(
std::time::Duration::from_secs(timeout_seconds),
child.wait()
).await;
let exit_code = match status {
Ok(Ok(exit_status)) => {
exit_status.code().unwrap_or(-1)
}
Ok(Err(e)) => {
error!("Local script execution failed: {}", e);
return Err(anyhow::anyhow!("Local script execution failed: {}", e));
}
Err(_) => {
let _ = child.kill().await;
return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
}
};
let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
let stdout_content = stdout_result.unwrap_or_default();
let stderr_content = stderr_result.unwrap_or_default();
let execution_time = start_time.elapsed().as_millis() as u64;
let success = exit_code == 0;
info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
if let Some(callback) = &output_callback {
let status = if success { "成功" } else { "失败" };
let event = OutputEvent {
pipeline_name: pipeline_name.to_string(),
server_name: "localhost".to_string(),
step: step.clone(),
script_path:script_path_str.clone(),
output_type: OutputType::Log,
content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
timestamp: Instant::now(),
variables: variables.clone(),
};
callback(event);
}
drop(temp_file);
Ok(ExecutionResult {
success,
stdout: stdout_content,
stderr: stderr_content,
script: script_path_str.clone(),
exit_code,
execution_time_ms: execution_time,
error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
})
}
pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
let start_time = Instant::now();
let script_path = std::path::Path::new(&step.script);
if !script_path.exists() {
return Err(anyhow::anyhow!("Script '{}' not found", step.script));
}
info!("Executing local script: {}", step.script);
let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
let output = Command::new("bash")
.arg(&step.script)
.current_dir(std::env::current_dir()?)
.output()
.context("Failed to execute local script")?;
let execution_time = start_time.elapsed().as_millis() as u64;
let exit_code = output.status.code().unwrap_or(-1);
let success = exit_code == 0;
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
Ok(ExecutionResult {
success,
stdout,
stderr,
script: step.script.clone(),
exit_code,
execution_time_ms: execution_time,
error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
})
}
}