1use anyhow::{Context, Error, Result};
2use std::process::{Command, Stdio};
3use std::time::Instant;
4use tokio::io::{AsyncBufReadExt, BufReader};
5use tokio::process::Command as TokioCommand;
6use tracing::{error, info};
7use tempfile;
8use std::io::Write;
9
10use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
11
12pub struct LocalExecutor;
14
15impl LocalExecutor {
16 pub async fn execute_script_with_realtime_output(
18 global_scripts:Vec<String>,
19 step: &Step,
20 pipeline_name: &str,
21 _step_name: &str,
22 output_callback: Option<OutputCallback>,
23 variables: std::collections::HashMap<String, String>,
24 ) -> Result<ExecutionResult> {
25 let start_time = Instant::now();
26 let pipeline_name = pipeline_name.to_string();
27
28 let script_path_str = step.script.clone();
29 let script_path = std::path::Path::new(&script_path_str);
30 if !script_path.exists() {
31 return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
32 }
33
34 let mut script_content = std::fs::read_to_string(&script_path)
36 .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
37 for (key, value) in &variables {
38 let placeholder = format!("{{{{ {} }}}}", key);
39 script_content = script_content.replace(&placeholder, value);
40 }
41
42 let mut gloabl_script_content = global_scripts.iter()
43 .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
44 .fold(Ok("".to_string()), |p:Result<String>,v|{
45 if p.is_err(){
46 return p;
47 }
48
49 if v.is_err(){
50 return Err(Error::msg(format!("{:?}", v.err())));
51 }
52 let content = v.unwrap();
53
54 let mut s = p.unwrap_or_default();
55
56 s.push_str("\n");
57 s.push_str(&content);
58
59 return Ok(s.clone());
60 })?;
61
62 gloabl_script_content.push_str("\n");
63 gloabl_script_content.push_str(&script_content);
64
65 let script_content = gloabl_script_content.clone();
66
67 let mut temp_file = tempfile::NamedTempFile::new()
69 .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
70 temp_file.write_all(script_content.as_bytes())
71 .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
72 let temp_path = temp_file.path().to_path_buf();
73
74 info!("Executing local script: {} (with content variable substitution)", script_path_str);
75
76 if let Some(callback) = &output_callback {
78 let event = OutputEvent {
79 pipeline_name: pipeline_name.clone(),
80 server_name: "localhost".to_string(),
81 step: step.clone(),
82 script_path:script_path_str.clone(),
83 output_type: OutputType::Log,
84 content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
85 timestamp: Instant::now(),
86 variables: variables.clone(),
87 };
88 callback(event);
89 }
90
91 let timeout_seconds = step.timeout_seconds.unwrap_or(60);
93
94 let mut command = TokioCommand::new("bash");
96 command.arg(temp_path.to_str().unwrap());
97 command.current_dir(std::env::current_dir()?);
98
99 for (key, value) in &variables {
101 command.env(key, value);
102 }
103
104 command.stdout(Stdio::piped());
106 command.stderr(Stdio::piped());
107
108 let mut child = command.spawn()
110 .context("Failed to spawn local script process")?;
111
112 let stdout = child.stdout.take().expect("Failed to capture stdout");
113 let stderr = child.stderr.take().expect("Failed to capture stderr");
114
115 let step_clone = step.clone();
117 let pipeline_name1 = pipeline_name.clone();
118 let pipeline_name2 = pipeline_name.clone();
119 let variables_clone = variables.clone();
120 let variables_clone2 = variables.clone();
121 let output_callback_clone = output_callback.clone();
122 let output_callback_clone2 = output_callback.clone();
123 let script_path = script_path_str.clone();
124
125 let stdout_task = tokio::spawn(async move {
127 let reader = BufReader::new(stdout);
128 let mut lines = reader.lines();
129 let mut content = String::new();
130
131 while let Ok(Some(line)) = lines.next_line().await {
132 content.push_str(&line);
133 content.push('\n');
134
135 if let Some(callback) = &output_callback_clone {
137 let event = OutputEvent {
138 pipeline_name: pipeline_name1.to_string(),
139 server_name: "localhost".to_string(),
140 step: step_clone.clone(),
141 script_path:script_path.clone(),
142 output_type: OutputType::Stdout,
143 content: line,
144 timestamp: Instant::now(),
145 variables: variables_clone.clone(),
146 };
147 callback(event);
148 }
149 }
150 content
151 });
152
153 let script_path = script_path_str.clone();
154 let step_clone2 = step.clone();
155 let stderr_task = tokio::spawn(async move {
156 let reader = BufReader::new(stderr);
157 let mut lines = reader.lines();
158 let mut content = String::new();
159
160 while let Ok(Some(line)) = lines.next_line().await {
161 content.push_str(&line);
162 content.push('\n');
163
164 if let Some(callback) = &output_callback_clone2 {
166 let event = OutputEvent {
167 pipeline_name: pipeline_name2.to_string(),
168 server_name: "localhost".to_string(),
169 step: step_clone2.clone(),
170 script_path:script_path.clone(),
171 output_type: OutputType::Stderr,
172 content: line,
173 timestamp: Instant::now(),
174 variables: variables_clone2.clone(),
175 };
176 callback(event);
177 }
178 }
179 content
180 });
181
182 let status = tokio::time::timeout(
184 std::time::Duration::from_secs(timeout_seconds),
185 child.wait()
186 ).await;
187
188 let exit_code = match status {
189 Ok(Ok(exit_status)) => {
190 exit_status.code().unwrap_or(-1)
191 }
192 Ok(Err(e)) => {
193 error!("Local script execution failed: {}", e);
194 return Err(anyhow::anyhow!("Local script execution failed: {}", e));
195 }
196 Err(_) => {
197 let _ = child.kill().await;
199 return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
200 }
201 };
202
203 let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
205
206 let stdout_content = stdout_result.unwrap_or_default();
207 let stderr_content = stderr_result.unwrap_or_default();
208
209 let execution_time = start_time.elapsed().as_millis() as u64;
210 let success = exit_code == 0;
211
212 info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
213
214 if let Some(callback) = &output_callback {
216 let status = if success { "成功" } else { "失败" };
217 let event = OutputEvent {
218 pipeline_name: pipeline_name.to_string(),
219 server_name: "localhost".to_string(),
220 step: step.clone(),
221 script_path:script_path_str.clone(),
222 output_type: OutputType::Log,
223 content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
224 timestamp: Instant::now(),
225 variables: variables.clone(),
226 };
227 callback(event);
228 }
229
230 drop(temp_file);
232
233 Ok(ExecutionResult {
234 success,
235 stdout: stdout_content,
236 stderr: stderr_content,
237 script: script_path_str.clone(),
238 exit_code,
239 execution_time_ms: execution_time,
240 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
241 })
242 }
243
244 pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
246 let start_time = Instant::now();
247
248 let script_path = std::path::Path::new(&step.script);
250 if !script_path.exists() {
251 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
252 }
253
254 info!("Executing local script: {}", step.script);
255
256 let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
258
259 let output = Command::new("bash")
261 .arg(&step.script)
262 .current_dir(std::env::current_dir()?)
263 .output()
264 .context("Failed to execute local script")?;
265
266 let execution_time = start_time.elapsed().as_millis() as u64;
267 let exit_code = output.status.code().unwrap_or(-1);
268 let success = exit_code == 0;
269
270 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
271 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
272
273 info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
274
275 Ok(ExecutionResult {
276 success,
277 stdout,
278 stderr,
279 script: step.script.clone(),
280 exit_code,
281 execution_time_ms: execution_time,
282 error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
283 })
284 }
285}