net_shell/ssh/
local.rs

1use anyhow::{Context, Error, Result};
2use std::fs;
3use std::io::Write;
4use std::process::{Command, Stdio};
5use std::time::Instant;
6use tempfile;
7use tokio::io::{AsyncBufReadExt, BufReader};
8use tokio::process::Command as TokioCommand;
9use tracing::{error, info};
10
11use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
12
/// Executor that runs shell scripts on the local machine.
///
/// The type carries no state; its functionality is exposed through associated functions.
pub struct LocalExecutor;
15
16impl LocalExecutor {
17    /// 在本地执行shell脚本(支持实时输出)
18    pub async fn execute_script_with_realtime_output(
19        script: Option<String>,
20        global_scripts: Vec<String>,
21        step: &Step,
22        pipeline_name: &str,
23        _step_name: &str,
24        output_callback: Option<OutputCallback>,
25        variables: std::collections::HashMap<String, String>,
26    ) -> Result<ExecutionResult> {
27        let start_time = Instant::now();
28        let pipeline_name = pipeline_name.to_string();
29
30        let script_path_str = step.script.clone();
31        let script_path = std::path::Path::new(&script_path_str);
32        if !script_path.exists() {
33            return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
34        }
35
36        // 读取脚本内容并进行变量替换
37        let mut script_content = std::fs::read_to_string(&script_path).map_err(|e| {
38            anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e)
39        })?;
40
41        let mut gloabl_script_content = global_scripts
42            .iter()
43            .map(|v| std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
44            .fold(Ok("".to_string()), |p: Result<String>, v| {
45                if p.is_err() {
46                    return p;
47                }
48
49                if v.is_err() {
50                    return Err(Error::msg(format!("{:?}", v.err())));
51                }
52                let content = v.unwrap();
53
54                let mut s = p.unwrap_or_default();
55
56                s.push_str("\n");
57                s.push_str(&content);
58
59                return Ok(s.clone());
60            })?;
61
62        if let Some(script_header) = script {
63            let cont = fs::read_to_string(&script_header).map_err(|e| {
64                anyhow::anyhow!(
65                    "Failed to read script header file '{}': {}",
66                    script_header,
67                    e
68                )
69            })?;
70            gloabl_script_content.push_str("\n");
71            gloabl_script_content.push_str(&cont);
72        }
73
74        gloabl_script_content.push_str("\n");
75        gloabl_script_content.push_str(&script_content);
76
77        let mut script_content = gloabl_script_content.clone();
78
79        for (key, value) in &variables {
80            let placeholder = format!("{{{{ {} }}}}", key);
81            script_content = script_content.replace(&placeholder, value);
82        }
83
84        // 写入临时文件
85        let mut temp_file = tempfile::NamedTempFile::new()
86            .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
87        temp_file
88            .write_all(script_content.as_bytes())
89            .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
90        let temp_path = temp_file.path().to_path_buf();
91
92        info!(
93            "Executing local script: {} (with content variable substitution)",
94            script_path_str
95        );
96
97        // 发送开始执行的日志
98        if let Some(callback) = &output_callback {
99            let event = OutputEvent {
100                pipeline_name: pipeline_name.clone(),
101                server_name: "localhost".to_string(),
102                step: step.clone(),
103                script_path: script_path_str.clone(),
104                output_type: OutputType::Log,
105                content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
106                timestamp: Instant::now(),
107                variables: variables.clone(),
108            };
109            callback(event);
110        }
111
112        // 设置超时
113        let timeout_seconds = step.timeout_seconds.unwrap_or(60);
114
115        // 创建异步命令
116        let mut command = TokioCommand::new("bash");
117        command.arg(temp_path.to_str().unwrap());
118        command.current_dir(std::env::current_dir()?);
119
120        // 设置环境变量
121        for (key, value) in &variables {
122            command.env(key, value);
123        }
124
125        // 设置标准输出和错误输出
126        command.stdout(Stdio::piped());
127        command.stderr(Stdio::piped());
128
129        // 执行命令
130        let mut child = command
131            .spawn()
132            .context("Failed to spawn local script process")?;
133
134        let stdout = child.stdout.take().expect("Failed to capture stdout");
135        let stderr = child.stderr.take().expect("Failed to capture stderr");
136
137        // 克隆必要的数据用于异步任务
138        let step_clone = step.clone();
139        let pipeline_name1 = pipeline_name.clone();
140        let pipeline_name2 = pipeline_name.clone();
141        let variables_clone = variables.clone();
142        let variables_clone2 = variables.clone();
143        let output_callback_clone = output_callback.clone();
144        let output_callback_clone2 = output_callback.clone();
145        let script_path = script_path_str.clone();
146
147        // 创建输出读取任务
148        let stdout_task = tokio::spawn(async move {
149            let reader = BufReader::new(stdout);
150            let mut lines = reader.lines();
151            let mut content = String::new();
152
153            while let Ok(Some(line)) = lines.next_line().await {
154                content.push_str(&line);
155                content.push('\n');
156
157                // 发送实时输出
158                if let Some(callback) = &output_callback_clone {
159                    let event = OutputEvent {
160                        pipeline_name: pipeline_name1.to_string(),
161                        server_name: "localhost".to_string(),
162                        step: step_clone.clone(),
163                        script_path: script_path.clone(),
164                        output_type: OutputType::Stdout,
165                        content: line,
166                        timestamp: Instant::now(),
167                        variables: variables_clone.clone(),
168                    };
169                    callback(event);
170                }
171            }
172            content
173        });
174
175        let script_path = script_path_str.clone();
176        let step_clone2 = step.clone();
177        let stderr_task = tokio::spawn(async move {
178            let reader = BufReader::new(stderr);
179            let mut lines = reader.lines();
180            let mut content = String::new();
181
182            while let Ok(Some(line)) = lines.next_line().await {
183                content.push_str(&line);
184                content.push('\n');
185
186                // 发送实时输出
187                if let Some(callback) = &output_callback_clone2 {
188                    let event = OutputEvent {
189                        pipeline_name: pipeline_name2.to_string(),
190                        server_name: "localhost".to_string(),
191                        step: step_clone2.clone(),
192                        script_path: script_path.clone(),
193                        output_type: OutputType::Stderr,
194                        content: line,
195                        timestamp: Instant::now(),
196                        variables: variables_clone2.clone(),
197                    };
198                    callback(event);
199                }
200            }
201            content
202        });
203
204        // 等待命令完成(带超时)
205        let status = tokio::time::timeout(
206            std::time::Duration::from_secs(timeout_seconds),
207            child.wait(),
208        )
209        .await;
210
211        let exit_code = match status {
212            Ok(Ok(exit_status)) => exit_status.code().unwrap_or(-1),
213            Ok(Err(e)) => {
214                error!("Local script execution failed: {}", e);
215                return Err(anyhow::anyhow!("Local script execution failed: {}", e));
216            }
217            Err(_) => {
218                // 超时,强制终止进程
219                let _ = child.kill().await;
220                return Err(anyhow::anyhow!(
221                    "Local script execution timed out after {} seconds",
222                    timeout_seconds
223                ));
224            }
225        };
226
227        // 等待输出读取完成
228        let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
229
230        let stdout_content = stdout_result.unwrap_or_default();
231        let stderr_content = stderr_result.unwrap_or_default();
232
233        let execution_time = start_time.elapsed().as_millis() as u64;
234        let success = exit_code == 0;
235
236        info!(
237            "Local script '{}' completed with exit code: {}",
238            script_path_str, exit_code
239        );
240
241        // 发送完成日志
242        if let Some(callback) = &output_callback {
243            let status = if success { "成功" } else { "失败" };
244            let event = OutputEvent {
245                pipeline_name: pipeline_name.to_string(),
246                server_name: "localhost".to_string(),
247                step: step.clone(),
248                script_path: script_path_str.clone(),
249                output_type: OutputType::Log,
250                content: format!(
251                    "本地脚本执行完成: {} ({}) - 耗时: {}ms",
252                    script_path_str, status, execution_time
253                ),
254                timestamp: Instant::now(),
255                variables: variables.clone(),
256            };
257            callback(event);
258        }
259
260        // 清理临时文件(drop后自动删除)
261        drop(temp_file);
262
263        Ok(ExecutionResult {
264            success,
265            stdout: stdout_content,
266            stderr: stderr_content,
267            script: script_path_str.clone(),
268            exit_code,
269            execution_time_ms: execution_time,
270            error_message: if success {
271                None
272            } else {
273                Some(format!("Script exited with code {}", exit_code))
274            },
275        })
276    }
277
278    /// 在本地执行shell脚本(同步版本,用于兼容性)
279    pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
280        let start_time = Instant::now();
281
282        // 检查脚本文件是否存在
283        let script_path = std::path::Path::new(&step.script);
284        if !script_path.exists() {
285            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
286        }
287
288        info!("Executing local script: {}", step.script);
289
290        // 设置超时(注意:同步版本无法真正实现超时,这里只是记录)
291        let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
292
293        // 创建命令
294        let output = Command::new("bash")
295            .arg(&step.script)
296            .current_dir(std::env::current_dir()?)
297            .output()
298            .context("Failed to execute local script")?;
299
300        let execution_time = start_time.elapsed().as_millis() as u64;
301        let exit_code = output.status.code().unwrap_or(-1);
302        let success = exit_code == 0;
303
304        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
305        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
306
307        info!(
308            "Local script '{}' completed with exit code: {}",
309            step.script, exit_code
310        );
311
312        Ok(ExecutionResult {
313            success,
314            stdout,
315            stderr,
316            script: step.script.clone(),
317            exit_code,
318            execution_time_ms: execution_time,
319            error_message: if success {
320                None
321            } else {
322                Some(format!("Script exited with code {}", exit_code))
323            },
324        })
325    }
326}