1use anyhow::{Context, Error, Result};
2use std::process::{Command, Stdio};
3use std::time::Instant;
4use tokio::io::{AsyncBufReadExt, BufReader};
5use tokio::process::Command as TokioCommand;
6use tracing::{error, info};
7use tempfile;
8use std::io::Write;
9
10use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
11
12pub struct LocalExecutor;
14
15impl LocalExecutor {
16 pub async fn execute_script_with_realtime_output(
18 global_scripts:Vec<String>,
19 step: &Step,
20 pipeline_name: &str,
21 _step_name: &str,
22 output_callback: Option<OutputCallback>,
23 variables: std::collections::HashMap<String, String>,
24 ) -> Result<ExecutionResult> {
25 let start_time = Instant::now();
26 let pipeline_name = pipeline_name.to_string();
27
28 let script_path_str = step.script.clone();
29 let script_path = std::path::Path::new(&script_path_str);
30 if !script_path.exists() {
31 return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
32 }
33
34 let mut script_content = std::fs::read_to_string(&script_path)
36 .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
37 for (key, value) in &variables {
38 let placeholder = format!("{{{{ {} }}}}", key);
39 script_content = script_content.replace(&placeholder, value);
40 }
41
42 let mut gloabl_script_content = global_scripts.iter()
43 .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
44 .fold(Ok("".to_string()), |p:Result<String>,v|{
45 if p.is_err(){
46 return p;
47 }
48
49 if v.is_err(){
50 return Err(Error::msg(format!("{:?}", v.err())));
51 }
52 let content = v.unwrap();
53
54 let mut s = p.unwrap_or_default();
55
56 s.push_str("\n");
57 s.push_str(&content);
58
59 return Ok(s.clone());
60 })?;
61
62 gloabl_script_content.push_str("\n");
63 gloabl_script_content.push_str(&script_content);
64
65 let script_content = gloabl_script_content.clone();
66
67 let mut temp_file = tempfile::NamedTempFile::new()
69 .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
70 temp_file.write_all(script_content.as_bytes())
71 .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
72 let temp_path = temp_file.path().to_path_buf();
73
74 info!("Executing local script: {} (with content variable substitution)", script_path_str);
75
76 if let Some(callback) = &output_callback {
78 let event = OutputEvent {
79 pipeline_name: pipeline_name.clone(),
80 server_name: "localhost".to_string(),
81 step: step.clone(),
82 output_type: OutputType::Log,
83 content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
84 timestamp: Instant::now(),
85 variables: variables.clone(),
86 };
87 callback(event);
88 }
89
90 let timeout_seconds = step.timeout_seconds.unwrap_or(60);
92
93 let mut command = TokioCommand::new("bash");
95 command.arg(temp_path.to_str().unwrap());
96 command.current_dir(std::env::current_dir()?);
97
98 for (key, value) in &variables {
100 command.env(key, value);
101 }
102
103 command.stdout(Stdio::piped());
105 command.stderr(Stdio::piped());
106
107 let mut child = command.spawn()
109 .context("Failed to spawn local script process")?;
110
111 let stdout = child.stdout.take().expect("Failed to capture stdout");
112 let stderr = child.stderr.take().expect("Failed to capture stderr");
113
114 let step_clone = step.clone();
116 let pipeline_name1 = pipeline_name.clone();
117 let pipeline_name2 = pipeline_name.clone();
118 let variables_clone = variables.clone();
119 let variables_clone2 = variables.clone();
120 let output_callback_clone = output_callback.clone();
121 let output_callback_clone2 = output_callback.clone();
122
123 let stdout_task = tokio::spawn(async move {
125 let reader = BufReader::new(stdout);
126 let mut lines = reader.lines();
127 let mut content = String::new();
128
129 while let Ok(Some(line)) = lines.next_line().await {
130 content.push_str(&line);
131 content.push('\n');
132
133 if let Some(callback) = &output_callback_clone {
135 let event = OutputEvent {
136 pipeline_name: pipeline_name1.to_string(),
137 server_name: "localhost".to_string(),
138 step: step_clone.clone(),
139 output_type: OutputType::Stdout,
140 content: line,
141 timestamp: Instant::now(),
142 variables: variables_clone.clone(),
143 };
144 callback(event);
145 }
146 }
147 content
148 });
149
150 let step_clone2 = step.clone();
151 let stderr_task = tokio::spawn(async move {
152 let reader = BufReader::new(stderr);
153 let mut lines = reader.lines();
154 let mut content = String::new();
155
156 while let Ok(Some(line)) = lines.next_line().await {
157 content.push_str(&line);
158 content.push('\n');
159
160 if let Some(callback) = &output_callback_clone2 {
162 let event = OutputEvent {
163 pipeline_name: pipeline_name2.to_string(),
164 server_name: "localhost".to_string(),
165 step: step_clone2.clone(),
166 output_type: OutputType::Stderr,
167 content: line,
168 timestamp: Instant::now(),
169 variables: variables_clone2.clone(),
170 };
171 callback(event);
172 }
173 }
174 content
175 });
176
177 let status = tokio::time::timeout(
179 std::time::Duration::from_secs(timeout_seconds),
180 child.wait()
181 ).await;
182
183 let exit_code = match status {
184 Ok(Ok(exit_status)) => {
185 exit_status.code().unwrap_or(-1)
186 }
187 Ok(Err(e)) => {
188 error!("Local script execution failed: {}", e);
189 return Err(anyhow::anyhow!("Local script execution failed: {}", e));
190 }
191 Err(_) => {
192 let _ = child.kill().await;
194 return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
195 }
196 };
197
198 let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
200
201 let stdout_content = stdout_result.unwrap_or_default();
202 let stderr_content = stderr_result.unwrap_or_default();
203
204 let execution_time = start_time.elapsed().as_millis() as u64;
205 let success = exit_code == 0;
206
207 info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
208
209 if let Some(callback) = &output_callback {
211 let status = if success { "成功" } else { "失败" };
212 let event = OutputEvent {
213 pipeline_name: pipeline_name.to_string(),
214 server_name: "localhost".to_string(),
215 step: step.clone(),
216 output_type: OutputType::Log,
217 content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
218 timestamp: Instant::now(),
219 variables: variables.clone(),
220 };
221 callback(event);
222 }
223
224 drop(temp_file);
226
227 Ok(ExecutionResult {
228 success,
229 stdout: stdout_content,
230 stderr: stderr_content,
231 script: script_path_str.clone(),
232 exit_code,
233 execution_time_ms: execution_time,
234 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
235 })
236 }
237
238 pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
240 let start_time = Instant::now();
241
242 let script_path = std::path::Path::new(&step.script);
244 if !script_path.exists() {
245 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
246 }
247
248 info!("Executing local script: {}", step.script);
249
250 let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
252
253 let output = Command::new("bash")
255 .arg(&step.script)
256 .current_dir(std::env::current_dir()?)
257 .output()
258 .context("Failed to execute local script")?;
259
260 let execution_time = start_time.elapsed().as_millis() as u64;
261 let exit_code = output.status.code().unwrap_or(-1);
262 let success = exit_code == 0;
263
264 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
265 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
266
267 info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
268
269 Ok(ExecutionResult {
270 success,
271 stdout,
272 stderr,
273 script: step.script.clone(),
274 exit_code,
275 execution_time_ms: execution_time,
276 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
277 })
278 }
279}