1use anyhow::{Context, Result};
2use std::process::{Command, Stdio};
3use std::time::Instant;
4use tokio::io::{AsyncBufReadExt, BufReader};
5use tokio::process::Command as TokioCommand;
6use tracing::{error, info};
7use tempfile;
8use std::io::Write;
9
10use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
11
12pub struct LocalExecutor;
14
15impl LocalExecutor {
16 pub async fn execute_script_with_realtime_output(
18 step: &Step,
19 pipeline_name: &str,
20 _step_name: &str,
21 output_callback: Option<OutputCallback>,
22 variables: std::collections::HashMap<String, String>,
23 ) -> Result<ExecutionResult> {
24 let start_time = Instant::now();
25 let pipeline_name = pipeline_name.to_string();
26
27 let mut script_path_str = step.script.clone();
29 for (key, value) in &variables {
30 let placeholder = format!("{{{{ {} }}}}", key);
31 script_path_str = script_path_str.replace(&placeholder, value);
32 }
33 let script_path = std::path::Path::new(&script_path_str);
34 if !script_path.exists() {
35 return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
36 }
37
38 let mut script_content = std::fs::read_to_string(&script_path)
40 .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
41 for (key, value) in &variables {
42 let placeholder = format!("{{{{ {} }}}}", key);
43 script_content = script_content.replace(&placeholder, value);
44 }
45
46 let mut temp_file = tempfile::NamedTempFile::new()
48 .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
49 temp_file.write_all(script_content.as_bytes())
50 .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
51 let temp_path = temp_file.path().to_path_buf();
52
53 info!("Executing local script: {} (with content variable substitution)", script_path_str);
54
55 if let Some(callback) = &output_callback {
57 let event = OutputEvent {
58 pipeline_name: pipeline_name.clone(),
59 server_name: "localhost".to_string(),
60 step: step.clone(),
61 output_type: OutputType::Log,
62 content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
63 timestamp: Instant::now(),
64 variables: variables.clone(),
65 };
66 callback(event);
67 }
68
69 let timeout_seconds = step.timeout_seconds.unwrap_or(60);
71
72 let mut command = TokioCommand::new("bash");
74 command.arg(temp_path.to_str().unwrap());
75 command.current_dir(std::env::current_dir()?);
76
77 for (key, value) in &variables {
79 command.env(key, value);
80 }
81
82 command.stdout(Stdio::piped());
84 command.stderr(Stdio::piped());
85
86 let mut child = command.spawn()
88 .context("Failed to spawn local script process")?;
89
90 let stdout = child.stdout.take().expect("Failed to capture stdout");
91 let stderr = child.stderr.take().expect("Failed to capture stderr");
92
93 let step_clone = step.clone();
95 let pipeline_name1 = pipeline_name.clone();
96 let pipeline_name2 = pipeline_name.clone();
97 let variables_clone = variables.clone();
98 let variables_clone2 = variables.clone();
99 let output_callback_clone = output_callback.clone();
100 let output_callback_clone2 = output_callback.clone();
101
102 let stdout_task = tokio::spawn(async move {
104 let reader = BufReader::new(stdout);
105 let mut lines = reader.lines();
106 let mut content = String::new();
107
108 while let Ok(Some(line)) = lines.next_line().await {
109 content.push_str(&line);
110 content.push('\n');
111
112 if let Some(callback) = &output_callback_clone {
114 let event = OutputEvent {
115 pipeline_name: pipeline_name1.to_string(),
116 server_name: "localhost".to_string(),
117 step: step_clone.clone(),
118 output_type: OutputType::Stdout,
119 content: line,
120 timestamp: Instant::now(),
121 variables: variables_clone.clone(),
122 };
123 callback(event);
124 }
125 }
126 content
127 });
128
129 let step_clone2 = step.clone();
130 let stderr_task = tokio::spawn(async move {
131 let reader = BufReader::new(stderr);
132 let mut lines = reader.lines();
133 let mut content = String::new();
134
135 while let Ok(Some(line)) = lines.next_line().await {
136 content.push_str(&line);
137 content.push('\n');
138
139 if let Some(callback) = &output_callback_clone2 {
141 let event = OutputEvent {
142 pipeline_name: pipeline_name2.to_string(),
143 server_name: "localhost".to_string(),
144 step: step_clone2.clone(),
145 output_type: OutputType::Stderr,
146 content: line,
147 timestamp: Instant::now(),
148 variables: variables_clone2.clone(),
149 };
150 callback(event);
151 }
152 }
153 content
154 });
155
156 let status = tokio::time::timeout(
158 std::time::Duration::from_secs(timeout_seconds),
159 child.wait()
160 ).await;
161
162 let exit_code = match status {
163 Ok(Ok(exit_status)) => {
164 exit_status.code().unwrap_or(-1)
165 }
166 Ok(Err(e)) => {
167 error!("Local script execution failed: {}", e);
168 return Err(anyhow::anyhow!("Local script execution failed: {}", e));
169 }
170 Err(_) => {
171 let _ = child.kill().await;
173 return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
174 }
175 };
176
177 let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
179
180 let stdout_content = stdout_result.unwrap_or_default();
181 let stderr_content = stderr_result.unwrap_or_default();
182
183 let execution_time = start_time.elapsed().as_millis() as u64;
184 let success = exit_code == 0;
185
186 info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
187
188 if let Some(callback) = &output_callback {
190 let status = if success { "成功" } else { "失败" };
191 let event = OutputEvent {
192 pipeline_name: pipeline_name.to_string(),
193 server_name: "localhost".to_string(),
194 step: step.clone(),
195 output_type: OutputType::Log,
196 content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
197 timestamp: Instant::now(),
198 variables: variables.clone(),
199 };
200 callback(event);
201 }
202
203 drop(temp_file);
205
206 Ok(ExecutionResult {
207 success,
208 stdout: stdout_content,
209 stderr: stderr_content,
210 script: script_path_str.clone(),
211 exit_code,
212 execution_time_ms: execution_time,
213 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
214 })
215 }
216
217 pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
219 let start_time = Instant::now();
220
221 let script_path = std::path::Path::new(&step.script);
223 if !script_path.exists() {
224 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
225 }
226
227 info!("Executing local script: {}", step.script);
228
229 let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
231
232 let output = Command::new("bash")
234 .arg(&step.script)
235 .current_dir(std::env::current_dir()?)
236 .output()
237 .context("Failed to execute local script")?;
238
239 let execution_time = start_time.elapsed().as_millis() as u64;
240 let exit_code = output.status.code().unwrap_or(-1);
241 let success = exit_code == 0;
242
243 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
244 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
245
246 info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
247
248 Ok(ExecutionResult {
249 success,
250 stdout,
251 stderr,
252 script: step.script.clone(),
253 exit_code,
254 execution_time_ms: execution_time,
255 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
256 })
257 }
258}