net_shell/ssh/
local.rs

1use anyhow::{Context, Result};
2use std::process::{Command, Stdio};
3use std::time::Instant;
4use tokio::io::{AsyncBufReadExt, BufReader};
5use tokio::process::Command as TokioCommand;
6use tracing::{error, info};
7use tempfile;
8use std::io::Write;
9
10use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
11
12/// 本地脚本执行器
13pub struct LocalExecutor;
14
15impl LocalExecutor {
16    /// 在本地执行shell脚本(支持实时输出)
17    pub async fn execute_script_with_realtime_output(
18        step: &Step,
19        pipeline_name: &str,
20        _step_name: &str,
21        output_callback: Option<OutputCallback>,
22        variables: std::collections::HashMap<String, String>,
23    ) -> Result<ExecutionResult> {
24        let start_time = Instant::now();
25        let pipeline_name = pipeline_name.to_string();
26        
27        // 替换 script 路径中的 {{ 变量 }} 占位符
28        let mut script_path_str = step.script.clone();
29        for (key, value) in &variables {
30            let placeholder = format!("{{{{ {} }}}}", key);
31            script_path_str = script_path_str.replace(&placeholder, value);
32        }
33        let script_path = std::path::Path::new(&script_path_str);
34        if !script_path.exists() {
35            return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
36        }
37
38        // 读取脚本内容并进行变量替换
39        let mut script_content = std::fs::read_to_string(&script_path)
40            .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
41        for (key, value) in &variables {
42            let placeholder = format!("{{{{ {} }}}}", key);
43            script_content = script_content.replace(&placeholder, value);
44        }
45
46        // 写入临时文件
47        let mut temp_file = tempfile::NamedTempFile::new()
48            .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
49        temp_file.write_all(script_content.as_bytes())
50            .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
51        let temp_path = temp_file.path().to_path_buf();
52
53        info!("Executing local script: {} (with content variable substitution)", script_path_str);
54
55        // 发送开始执行的日志
56        if let Some(callback) = &output_callback {
57            let event = OutputEvent {
58                pipeline_name: pipeline_name.clone(),
59                server_name: "localhost".to_string(),
60                step: step.clone(),
61                output_type: OutputType::Log,
62                content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
63                timestamp: Instant::now(),
64                variables: variables.clone(),
65            };
66            callback(event);
67        }
68
69        // 设置超时
70        let timeout_seconds = step.timeout_seconds.unwrap_or(60);
71        
72        // 创建异步命令
73        let mut command = TokioCommand::new("bash");
74        command.arg(temp_path.to_str().unwrap());
75        command.current_dir(std::env::current_dir()?);
76        
77        // 设置环境变量
78        for (key, value) in &variables {
79            command.env(key, value);
80        }
81
82        // 设置标准输出和错误输出
83        command.stdout(Stdio::piped());
84        command.stderr(Stdio::piped());
85
86        // 执行命令
87        let mut child = command.spawn()
88            .context("Failed to spawn local script process")?;
89
90        let stdout = child.stdout.take().expect("Failed to capture stdout");
91        let stderr = child.stderr.take().expect("Failed to capture stderr");
92
93        // 克隆必要的数据用于异步任务
94        let step_clone = step.clone();
95        let pipeline_name1 = pipeline_name.clone();
96        let pipeline_name2 = pipeline_name.clone();
97        let variables_clone = variables.clone();
98        let variables_clone2 = variables.clone();
99        let output_callback_clone = output_callback.clone();
100        let output_callback_clone2 = output_callback.clone();
101
102        // 创建输出读取任务
103        let stdout_task = tokio::spawn(async move {
104            let reader = BufReader::new(stdout);
105            let mut lines = reader.lines();
106            let mut content = String::new();
107            
108            while let Ok(Some(line)) = lines.next_line().await {
109                content.push_str(&line);
110                content.push('\n');
111                
112                // 发送实时输出
113                if let Some(callback) = &output_callback_clone {
114                    let event = OutputEvent {
115                        pipeline_name: pipeline_name1.to_string(),
116                        server_name: "localhost".to_string(),
117                        step: step_clone.clone(),
118                        output_type: OutputType::Stdout,
119                        content: line,
120                        timestamp: Instant::now(),
121                        variables: variables_clone.clone(),
122                    };
123                    callback(event);
124                }
125            }
126            content
127        });
128
129        let step_clone2 = step.clone();
130        let stderr_task = tokio::spawn(async move {
131            let reader = BufReader::new(stderr);
132            let mut lines = reader.lines();
133            let mut content = String::new();
134            
135            while let Ok(Some(line)) = lines.next_line().await {
136                content.push_str(&line);
137                content.push('\n');
138                
139                // 发送实时输出
140                if let Some(callback) = &output_callback_clone2 {
141                    let event = OutputEvent {
142                        pipeline_name: pipeline_name2.to_string(),
143                        server_name: "localhost".to_string(),
144                        step: step_clone2.clone(),
145                        output_type: OutputType::Stderr,
146                        content: line,
147                        timestamp: Instant::now(),
148                        variables: variables_clone2.clone(),
149                    };
150                    callback(event);
151                }
152            }
153            content
154        });
155
156        // 等待命令完成(带超时)
157        let status = tokio::time::timeout(
158            std::time::Duration::from_secs(timeout_seconds),
159            child.wait()
160        ).await;
161
162        let exit_code = match status {
163            Ok(Ok(exit_status)) => {
164                exit_status.code().unwrap_or(-1)
165            }
166            Ok(Err(e)) => {
167                error!("Local script execution failed: {}", e);
168                return Err(anyhow::anyhow!("Local script execution failed: {}", e));
169            }
170            Err(_) => {
171                // 超时,强制终止进程
172                let _ = child.kill().await;
173                return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
174            }
175        };
176
177        // 等待输出读取完成
178        let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
179        
180        let stdout_content = stdout_result.unwrap_or_default();
181        let stderr_content = stderr_result.unwrap_or_default();
182
183        let execution_time = start_time.elapsed().as_millis() as u64;
184        let success = exit_code == 0;
185
186        info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
187
188        // 发送完成日志
189        if let Some(callback) = &output_callback {
190            let status = if success { "成功" } else { "失败" };
191            let event = OutputEvent {
192                pipeline_name: pipeline_name.to_string(),
193                server_name: "localhost".to_string(),
194                step: step.clone(),
195                output_type: OutputType::Log,
196                content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
197                timestamp: Instant::now(),
198                variables: variables.clone(),
199            };
200            callback(event);
201        }
202
203        // 清理临时文件(drop后自动删除)
204        drop(temp_file);
205
206        Ok(ExecutionResult {
207            success,
208            stdout: stdout_content,
209            stderr: stderr_content,
210            script: script_path_str.clone(),
211            exit_code,
212            execution_time_ms: execution_time,
213            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
214        })
215    }
216
217    /// 在本地执行shell脚本(同步版本,用于兼容性)
218    pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
219        let start_time = Instant::now();
220        
221        // 检查脚本文件是否存在
222        let script_path = std::path::Path::new(&step.script);
223        if !script_path.exists() {
224            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
225        }
226
227        info!("Executing local script: {}", step.script);
228
229        // 设置超时(注意:同步版本无法真正实现超时,这里只是记录)
230        let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
231        
232        // 创建命令
233        let output = Command::new("bash")
234            .arg(&step.script)
235            .current_dir(std::env::current_dir()?)
236            .output()
237            .context("Failed to execute local script")?;
238
239        let execution_time = start_time.elapsed().as_millis() as u64;
240        let exit_code = output.status.code().unwrap_or(-1);
241        let success = exit_code == 0;
242
243        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
244        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
245
246        info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
247
248        Ok(ExecutionResult {
249            success,
250            stdout,
251            stderr,
252            script: step.script.clone(),
253            exit_code,
254            execution_time_ms: execution_time,
255            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
256        })
257    }
258}