1use anyhow::{Context, Error, Result};
2use std::fs;
3use std::io::Write;
4use std::process::{Command, Stdio};
5use std::time::Instant;
6use tempfile;
7use tokio::io::{AsyncBufReadExt, BufReader};
8use tokio::process::Command as TokioCommand;
9use tracing::{error, info};
10
11use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
12
13pub struct LocalExecutor;
15
16impl LocalExecutor {
17 pub async fn execute_script_with_realtime_output(
19 script: Option<String>,
20 global_scripts: Vec<String>,
21 step: &Step,
22 pipeline_name: &str,
23 _step_name: &str,
24 output_callback: Option<OutputCallback>,
25 variables: std::collections::HashMap<String, String>,
26 ) -> Result<ExecutionResult> {
27 let start_time = Instant::now();
28 let pipeline_name = pipeline_name.to_string();
29
30 let script_path_str = step.script.clone();
31 let script_path = std::path::Path::new(&script_path_str);
32 if !script_path.exists() {
33 return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
34 }
35
36 let mut script_content = std::fs::read_to_string(&script_path).map_err(|e| {
38 anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e)
39 })?;
40
41 let mut gloabl_script_content = global_scripts
42 .iter()
43 .map(|v| std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
44 .fold(Ok("".to_string()), |p: Result<String>, v| {
45 if p.is_err() {
46 return p;
47 }
48
49 if v.is_err() {
50 return Err(Error::msg(format!("{:?}", v.err())));
51 }
52 let content = v.unwrap();
53
54 let mut s = p.unwrap_or_default();
55
56 s.push_str("\n");
57 s.push_str(&content);
58
59 return Ok(s.clone());
60 })?;
61
62 if let Some(script_header) = script {
63 let cont = fs::read_to_string(&script_header).map_err(|e| {
64 anyhow::anyhow!(
65 "Failed to read script header file '{}': {}",
66 script_header,
67 e
68 )
69 })?;
70 gloabl_script_content.push_str("\n");
71 gloabl_script_content.push_str(&cont);
72 }
73
74 gloabl_script_content.push_str("\n");
75 gloabl_script_content.push_str(&script_content);
76
77 let mut script_content = gloabl_script_content.clone();
78
79 for (key, value) in &variables {
80 let placeholder = format!("{{{{ {} }}}}", key);
81 script_content = script_content.replace(&placeholder, value);
82 }
83
84 let mut temp_file = tempfile::NamedTempFile::new()
86 .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
87 temp_file
88 .write_all(script_content.as_bytes())
89 .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
90 let temp_path = temp_file.path().to_path_buf();
91
92 info!(
93 "Executing local script: {} (with content variable substitution)",
94 script_path_str
95 );
96
97 if let Some(callback) = &output_callback {
99 let event = OutputEvent {
100 pipeline_name: pipeline_name.clone(),
101 server_name: "localhost".to_string(),
102 step: step.clone(),
103 script_path: script_path_str.clone(),
104 output_type: OutputType::Log,
105 content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
106 timestamp: Instant::now(),
107 variables: variables.clone(),
108 };
109 callback(event);
110 }
111
112 let timeout_seconds = step.timeout_seconds.unwrap_or(60);
114
115 let mut command = TokioCommand::new("bash");
117 command.arg(temp_path.to_str().unwrap());
118 command.current_dir(std::env::current_dir()?);
119
120 for (key, value) in &variables {
122 command.env(key, value);
123 }
124
125 command.stdout(Stdio::piped());
127 command.stderr(Stdio::piped());
128
129 let mut child = command
131 .spawn()
132 .context("Failed to spawn local script process")?;
133
134 let stdout = child.stdout.take().expect("Failed to capture stdout");
135 let stderr = child.stderr.take().expect("Failed to capture stderr");
136
137 let step_clone = step.clone();
139 let pipeline_name1 = pipeline_name.clone();
140 let pipeline_name2 = pipeline_name.clone();
141 let variables_clone = variables.clone();
142 let variables_clone2 = variables.clone();
143 let output_callback_clone = output_callback.clone();
144 let output_callback_clone2 = output_callback.clone();
145 let script_path = script_path_str.clone();
146
147 let stdout_task = tokio::spawn(async move {
149 let reader = BufReader::new(stdout);
150 let mut lines = reader.lines();
151 let mut content = String::new();
152
153 while let Ok(Some(line)) = lines.next_line().await {
154 content.push_str(&line);
155 content.push('\n');
156
157 if let Some(callback) = &output_callback_clone {
159 let event = OutputEvent {
160 pipeline_name: pipeline_name1.to_string(),
161 server_name: "localhost".to_string(),
162 step: step_clone.clone(),
163 script_path: script_path.clone(),
164 output_type: OutputType::Stdout,
165 content: line,
166 timestamp: Instant::now(),
167 variables: variables_clone.clone(),
168 };
169 callback(event);
170 }
171 }
172 content
173 });
174
175 let script_path = script_path_str.clone();
176 let step_clone2 = step.clone();
177 let stderr_task = tokio::spawn(async move {
178 let reader = BufReader::new(stderr);
179 let mut lines = reader.lines();
180 let mut content = String::new();
181
182 while let Ok(Some(line)) = lines.next_line().await {
183 content.push_str(&line);
184 content.push('\n');
185
186 if let Some(callback) = &output_callback_clone2 {
188 let event = OutputEvent {
189 pipeline_name: pipeline_name2.to_string(),
190 server_name: "localhost".to_string(),
191 step: step_clone2.clone(),
192 script_path: script_path.clone(),
193 output_type: OutputType::Stderr,
194 content: line,
195 timestamp: Instant::now(),
196 variables: variables_clone2.clone(),
197 };
198 callback(event);
199 }
200 }
201 content
202 });
203
204 let status = tokio::time::timeout(
206 std::time::Duration::from_secs(timeout_seconds),
207 child.wait(),
208 )
209 .await;
210
211 let exit_code = match status {
212 Ok(Ok(exit_status)) => exit_status.code().unwrap_or(-1),
213 Ok(Err(e)) => {
214 error!("Local script execution failed: {}", e);
215 return Err(anyhow::anyhow!("Local script execution failed: {}", e));
216 }
217 Err(_) => {
218 let _ = child.kill().await;
220 return Err(anyhow::anyhow!(
221 "Local script execution timed out after {} seconds",
222 timeout_seconds
223 ));
224 }
225 };
226
227 let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
229
230 let stdout_content = stdout_result.unwrap_or_default();
231 let stderr_content = stderr_result.unwrap_or_default();
232
233 let execution_time = start_time.elapsed().as_millis() as u64;
234 let success = exit_code == 0;
235
236 info!(
237 "Local script '{}' completed with exit code: {}",
238 script_path_str, exit_code
239 );
240
241 if let Some(callback) = &output_callback {
243 let status = if success { "成功" } else { "失败" };
244 let event = OutputEvent {
245 pipeline_name: pipeline_name.to_string(),
246 server_name: "localhost".to_string(),
247 step: step.clone(),
248 script_path: script_path_str.clone(),
249 output_type: OutputType::Log,
250 content: format!(
251 "本地脚本执行完成: {} ({}) - 耗时: {}ms",
252 script_path_str, status, execution_time
253 ),
254 timestamp: Instant::now(),
255 variables: variables.clone(),
256 };
257 callback(event);
258 }
259
260 drop(temp_file);
262
263 Ok(ExecutionResult {
264 success,
265 stdout: stdout_content,
266 stderr: stderr_content,
267 script: script_path_str.clone(),
268 exit_code,
269 execution_time_ms: execution_time,
270 error_message: if success {
271 None
272 } else {
273 Some(format!("Script exited with code {}", exit_code))
274 },
275 })
276 }
277
278 pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
280 let start_time = Instant::now();
281
282 let script_path = std::path::Path::new(&step.script);
284 if !script_path.exists() {
285 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
286 }
287
288 info!("Executing local script: {}", step.script);
289
290 let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
292
293 let output = Command::new("bash")
295 .arg(&step.script)
296 .current_dir(std::env::current_dir()?)
297 .output()
298 .context("Failed to execute local script")?;
299
300 let execution_time = start_time.elapsed().as_millis() as u64;
301 let exit_code = output.status.code().unwrap_or(-1);
302 let success = exit_code == 0;
303
304 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
305 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
306
307 info!(
308 "Local script '{}' completed with exit code: {}",
309 step.script, exit_code
310 );
311
312 Ok(ExecutionResult {
313 success,
314 stdout,
315 stderr,
316 script: step.script.clone(),
317 exit_code,
318 execution_time_ms: execution_time,
319 error_message: if success {
320 None
321 } else {
322 Some(format!("Script exited with code {}", exit_code))
323 },
324 })
325 }
326}