net_shell/ssh/
local.rs

1use anyhow::{Context, Error, Result};
2use std::process::{Command, Stdio};
3use std::time::Instant;
4use tokio::io::{AsyncBufReadExt, BufReader};
5use tokio::process::Command as TokioCommand;
6use tracing::{error, info};
7use tempfile;
8use std::io::Write;
9
10use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
11
/// Executor for shell scripts that run on the local machine.
///
/// The type carries no state; all functionality is exposed through associated
/// functions, so it is cheap to copy and can be created with `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalExecutor;
14
15impl LocalExecutor {
16    /// 在本地执行shell脚本(支持实时输出)
17    pub async fn execute_script_with_realtime_output(
18        global_scripts:Vec<String>,
19        step: &Step,
20        pipeline_name: &str,
21        _step_name: &str,
22        output_callback: Option<OutputCallback>,
23        variables: std::collections::HashMap<String, String>,
24    ) -> Result<ExecutionResult> {
25        let start_time = Instant::now();
26        let pipeline_name = pipeline_name.to_string();
27        
28        let script_path_str = step.script.clone();
29        let script_path = std::path::Path::new(&script_path_str);
30        if !script_path.exists() {
31            return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
32        }
33
34        // 读取脚本内容并进行变量替换
35        let mut script_content = std::fs::read_to_string(&script_path)
36            .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
37        for (key, value) in &variables {
38            let placeholder = format!("{{{{ {} }}}}", key);
39            script_content = script_content.replace(&placeholder, value);
40        }
41
42        let mut gloabl_script_content = global_scripts.iter()
43        .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
44        .fold(Ok("".to_string()), |p:Result<String>,v|{
45            if p.is_err(){
46                return p;
47            }
48
49            if v.is_err(){
50                return Err(Error::msg(format!("{:?}", v.err())));
51            }
52            let content = v.unwrap();
53
54            let mut s = p.unwrap_or_default();
55
56            s.push_str("\n");
57            s.push_str(&content);
58
59            return  Ok(s.clone());
60        })?;
61
62        gloabl_script_content.push_str("\n");
63        gloabl_script_content.push_str(&script_content);
64
65        let script_content = gloabl_script_content.clone();
66
67        // 写入临时文件
68        let mut temp_file = tempfile::NamedTempFile::new()
69            .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
70        temp_file.write_all(script_content.as_bytes())
71            .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
72        let temp_path = temp_file.path().to_path_buf();
73
74        info!("Executing local script: {} (with content variable substitution)", script_path_str);
75
76        // 发送开始执行的日志
77        if let Some(callback) = &output_callback {
78            let event = OutputEvent {
79                pipeline_name: pipeline_name.clone(),
80                server_name: "localhost".to_string(),
81                step: step.clone(),
82                output_type: OutputType::Log,
83                content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
84                timestamp: Instant::now(),
85                variables: variables.clone(),
86            };
87            callback(event);
88        }
89
90        // 设置超时
91        let timeout_seconds = step.timeout_seconds.unwrap_or(60);
92        
93        // 创建异步命令
94        let mut command = TokioCommand::new("bash");
95        command.arg(temp_path.to_str().unwrap());
96        command.current_dir(std::env::current_dir()?);
97        
98        // 设置环境变量
99        for (key, value) in &variables {
100            command.env(key, value);
101        }
102
103        // 设置标准输出和错误输出
104        command.stdout(Stdio::piped());
105        command.stderr(Stdio::piped());
106
107        // 执行命令
108        let mut child = command.spawn()
109            .context("Failed to spawn local script process")?;
110
111        let stdout = child.stdout.take().expect("Failed to capture stdout");
112        let stderr = child.stderr.take().expect("Failed to capture stderr");
113
114        // 克隆必要的数据用于异步任务
115        let step_clone = step.clone();
116        let pipeline_name1 = pipeline_name.clone();
117        let pipeline_name2 = pipeline_name.clone();
118        let variables_clone = variables.clone();
119        let variables_clone2 = variables.clone();
120        let output_callback_clone = output_callback.clone();
121        let output_callback_clone2 = output_callback.clone();
122
123        // 创建输出读取任务
124        let stdout_task = tokio::spawn(async move {
125            let reader = BufReader::new(stdout);
126            let mut lines = reader.lines();
127            let mut content = String::new();
128            
129            while let Ok(Some(line)) = lines.next_line().await {
130                content.push_str(&line);
131                content.push('\n');
132                
133                // 发送实时输出
134                if let Some(callback) = &output_callback_clone {
135                    let event = OutputEvent {
136                        pipeline_name: pipeline_name1.to_string(),
137                        server_name: "localhost".to_string(),
138                        step: step_clone.clone(),
139                        output_type: OutputType::Stdout,
140                        content: line,
141                        timestamp: Instant::now(),
142                        variables: variables_clone.clone(),
143                    };
144                    callback(event);
145                }
146            }
147            content
148        });
149
150        let step_clone2 = step.clone();
151        let stderr_task = tokio::spawn(async move {
152            let reader = BufReader::new(stderr);
153            let mut lines = reader.lines();
154            let mut content = String::new();
155            
156            while let Ok(Some(line)) = lines.next_line().await {
157                content.push_str(&line);
158                content.push('\n');
159                
160                // 发送实时输出
161                if let Some(callback) = &output_callback_clone2 {
162                    let event = OutputEvent {
163                        pipeline_name: pipeline_name2.to_string(),
164                        server_name: "localhost".to_string(),
165                        step: step_clone2.clone(),
166                        output_type: OutputType::Stderr,
167                        content: line,
168                        timestamp: Instant::now(),
169                        variables: variables_clone2.clone(),
170                    };
171                    callback(event);
172                }
173            }
174            content
175        });
176
177        // 等待命令完成(带超时)
178        let status = tokio::time::timeout(
179            std::time::Duration::from_secs(timeout_seconds),
180            child.wait()
181        ).await;
182
183        let exit_code = match status {
184            Ok(Ok(exit_status)) => {
185                exit_status.code().unwrap_or(-1)
186            }
187            Ok(Err(e)) => {
188                error!("Local script execution failed: {}", e);
189                return Err(anyhow::anyhow!("Local script execution failed: {}", e));
190            }
191            Err(_) => {
192                // 超时,强制终止进程
193                let _ = child.kill().await;
194                return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
195            }
196        };
197
198        // 等待输出读取完成
199        let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
200        
201        let stdout_content = stdout_result.unwrap_or_default();
202        let stderr_content = stderr_result.unwrap_or_default();
203
204        let execution_time = start_time.elapsed().as_millis() as u64;
205        let success = exit_code == 0;
206
207        info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
208
209        // 发送完成日志
210        if let Some(callback) = &output_callback {
211            let status = if success { "成功" } else { "失败" };
212            let event = OutputEvent {
213                pipeline_name: pipeline_name.to_string(),
214                server_name: "localhost".to_string(),
215                step: step.clone(),
216                output_type: OutputType::Log,
217                content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
218                timestamp: Instant::now(),
219                variables: variables.clone(),
220            };
221            callback(event);
222        }
223
224        // 清理临时文件(drop后自动删除)
225        drop(temp_file);
226
227        Ok(ExecutionResult {
228            success,
229            stdout: stdout_content,
230            stderr: stderr_content,
231            script: script_path_str.clone(),
232            exit_code,
233            execution_time_ms: execution_time,
234            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
235        })
236    }
237
238    /// 在本地执行shell脚本(同步版本,用于兼容性)
239    pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
240        let start_time = Instant::now();
241        
242        // 检查脚本文件是否存在
243        let script_path = std::path::Path::new(&step.script);
244        if !script_path.exists() {
245            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
246        }
247
248        info!("Executing local script: {}", step.script);
249
250        // 设置超时(注意:同步版本无法真正实现超时,这里只是记录)
251        let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
252        
253        // 创建命令
254        let output = Command::new("bash")
255            .arg(&step.script)
256            .current_dir(std::env::current_dir()?)
257            .output()
258            .context("Failed to execute local script")?;
259
260        let execution_time = start_time.elapsed().as_millis() as u64;
261        let exit_code = output.status.code().unwrap_or(-1);
262        let success = exit_code == 0;
263
264        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
265        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
266
267        info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
268
269        Ok(ExecutionResult {
270            success,
271            stdout,
272            stderr,
273            script: step.script.clone(),
274            exit_code,
275            execution_time_ms: execution_time,
276            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
277        })
278    }
279}