1use anyhow::{Context, Error, Result};
2use std::fs;
3use std::process::{Command, Stdio};
4use std::time::Instant;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command as TokioCommand;
7use tracing::{error, info};
8use tempfile;
9use std::io::Write;
10
11use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
12
13pub struct LocalExecutor;
15
16impl LocalExecutor {
17 pub async fn execute_script_with_realtime_output(
19 script: Option<String>,
20 global_scripts:Vec<String>,
21 step: &Step,
22 pipeline_name: &str,
23 _step_name: &str,
24 output_callback: Option<OutputCallback>,
25 variables: std::collections::HashMap<String, String>,
26 ) -> Result<ExecutionResult> {
27 let start_time = Instant::now();
28 let pipeline_name = pipeline_name.to_string();
29
30 let script_path_str = step.script.clone();
31 let script_path = std::path::Path::new(&script_path_str);
32 if !script_path.exists() {
33 return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
34 }
35
36 let mut script_content = std::fs::read_to_string(&script_path)
38 .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
39 for (key, value) in &variables {
40 let placeholder = format!("{{{{ {} }}}}", key);
41 script_content = script_content.replace(&placeholder, value);
42 }
43
44 let mut gloabl_script_content = global_scripts.iter()
45 .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
46 .fold(Ok("".to_string()), |p:Result<String>,v|{
47 if p.is_err(){
48 return p;
49 }
50
51 if v.is_err(){
52 return Err(Error::msg(format!("{:?}", v.err())));
53 }
54 let content = v.unwrap();
55
56 let mut s = p.unwrap_or_default();
57
58 s.push_str("\n");
59 s.push_str(&content);
60
61 return Ok(s.clone());
62 })?;
63
64 if let Some(script_header) = script {
65 let cont = fs::read_to_string(&script_header)
66 .map_err(|e| anyhow::anyhow!("Failed to read script header file '{}': {}", script_header, e))?;
67 gloabl_script_content.push_str("\n");
68 gloabl_script_content.push_str(&cont);
69 }
70
71 gloabl_script_content.push_str("\n");
72 gloabl_script_content.push_str(&script_content);
73
74 let script_content = gloabl_script_content.clone();
75
76 let mut temp_file = tempfile::NamedTempFile::new()
78 .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
79 temp_file.write_all(script_content.as_bytes())
80 .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
81 let temp_path = temp_file.path().to_path_buf();
82
83 info!("Executing local script: {} (with content variable substitution)", script_path_str);
84
85 if let Some(callback) = &output_callback {
87 let event = OutputEvent {
88 pipeline_name: pipeline_name.clone(),
89 server_name: "localhost".to_string(),
90 step: step.clone(),
91 script_path:script_path_str.clone(),
92 output_type: OutputType::Log,
93 content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
94 timestamp: Instant::now(),
95 variables: variables.clone(),
96 };
97 callback(event);
98 }
99
100 let timeout_seconds = step.timeout_seconds.unwrap_or(60);
102
103 let mut command = TokioCommand::new("bash");
105 command.arg(temp_path.to_str().unwrap());
106 command.current_dir(std::env::current_dir()?);
107
108 for (key, value) in &variables {
110 command.env(key, value);
111 }
112
113 command.stdout(Stdio::piped());
115 command.stderr(Stdio::piped());
116
117 let mut child = command.spawn()
119 .context("Failed to spawn local script process")?;
120
121 let stdout = child.stdout.take().expect("Failed to capture stdout");
122 let stderr = child.stderr.take().expect("Failed to capture stderr");
123
124 let step_clone = step.clone();
126 let pipeline_name1 = pipeline_name.clone();
127 let pipeline_name2 = pipeline_name.clone();
128 let variables_clone = variables.clone();
129 let variables_clone2 = variables.clone();
130 let output_callback_clone = output_callback.clone();
131 let output_callback_clone2 = output_callback.clone();
132 let script_path = script_path_str.clone();
133
134 let stdout_task = tokio::spawn(async move {
136 let reader = BufReader::new(stdout);
137 let mut lines = reader.lines();
138 let mut content = String::new();
139
140 while let Ok(Some(line)) = lines.next_line().await {
141 content.push_str(&line);
142 content.push('\n');
143
144 if let Some(callback) = &output_callback_clone {
146 let event = OutputEvent {
147 pipeline_name: pipeline_name1.to_string(),
148 server_name: "localhost".to_string(),
149 step: step_clone.clone(),
150 script_path:script_path.clone(),
151 output_type: OutputType::Stdout,
152 content: line,
153 timestamp: Instant::now(),
154 variables: variables_clone.clone(),
155 };
156 callback(event);
157 }
158 }
159 content
160 });
161
162 let script_path = script_path_str.clone();
163 let step_clone2 = step.clone();
164 let stderr_task = tokio::spawn(async move {
165 let reader = BufReader::new(stderr);
166 let mut lines = reader.lines();
167 let mut content = String::new();
168
169 while let Ok(Some(line)) = lines.next_line().await {
170 content.push_str(&line);
171 content.push('\n');
172
173 if let Some(callback) = &output_callback_clone2 {
175 let event = OutputEvent {
176 pipeline_name: pipeline_name2.to_string(),
177 server_name: "localhost".to_string(),
178 step: step_clone2.clone(),
179 script_path:script_path.clone(),
180 output_type: OutputType::Stderr,
181 content: line,
182 timestamp: Instant::now(),
183 variables: variables_clone2.clone(),
184 };
185 callback(event);
186 }
187 }
188 content
189 });
190
191 let status = tokio::time::timeout(
193 std::time::Duration::from_secs(timeout_seconds),
194 child.wait()
195 ).await;
196
197 let exit_code = match status {
198 Ok(Ok(exit_status)) => {
199 exit_status.code().unwrap_or(-1)
200 }
201 Ok(Err(e)) => {
202 error!("Local script execution failed: {}", e);
203 return Err(anyhow::anyhow!("Local script execution failed: {}", e));
204 }
205 Err(_) => {
206 let _ = child.kill().await;
208 return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
209 }
210 };
211
212 let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
214
215 let stdout_content = stdout_result.unwrap_or_default();
216 let stderr_content = stderr_result.unwrap_or_default();
217
218 let execution_time = start_time.elapsed().as_millis() as u64;
219 let success = exit_code == 0;
220
221 info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
222
223 if let Some(callback) = &output_callback {
225 let status = if success { "成功" } else { "失败" };
226 let event = OutputEvent {
227 pipeline_name: pipeline_name.to_string(),
228 server_name: "localhost".to_string(),
229 step: step.clone(),
230 script_path:script_path_str.clone(),
231 output_type: OutputType::Log,
232 content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
233 timestamp: Instant::now(),
234 variables: variables.clone(),
235 };
236 callback(event);
237 }
238
239 drop(temp_file);
241
242 Ok(ExecutionResult {
243 success,
244 stdout: stdout_content,
245 stderr: stderr_content,
246 script: script_path_str.clone(),
247 exit_code,
248 execution_time_ms: execution_time,
249 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
250 })
251 }
252
253 pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
255 let start_time = Instant::now();
256
257 let script_path = std::path::Path::new(&step.script);
259 if !script_path.exists() {
260 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
261 }
262
263 info!("Executing local script: {}", step.script);
264
265 let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
267
268 let output = Command::new("bash")
270 .arg(&step.script)
271 .current_dir(std::env::current_dir()?)
272 .output()
273 .context("Failed to execute local script")?;
274
275 let execution_time = start_time.elapsed().as_millis() as u64;
276 let exit_code = output.status.code().unwrap_or(-1);
277 let success = exit_code == 0;
278
279 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
280 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
281
282 info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
283
284 Ok(ExecutionResult {
285 success,
286 stdout,
287 stderr,
288 script: step.script.clone(),
289 exit_code,
290 execution_time_ms: execution_time,
291 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
292 })
293 }
294}