net_shell/ssh/
local.rs

1use anyhow::{Context, Error, Result};
2use std::process::{Command, Stdio};
3use std::time::Instant;
4use tokio::io::{AsyncBufReadExt, BufReader};
5use tokio::process::Command as TokioCommand;
6use tracing::{error, info};
7use tempfile;
8use std::io::Write;
9
10use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
11
12/// 本地脚本执行器
13pub struct LocalExecutor;
14
15impl LocalExecutor {
16    /// 在本地执行shell脚本(支持实时输出)
17    pub async fn execute_script_with_realtime_output(
18        global_scripts:Vec<String>,
19        step: &Step,
20        pipeline_name: &str,
21        _step_name: &str,
22        output_callback: Option<OutputCallback>,
23        variables: std::collections::HashMap<String, String>,
24    ) -> Result<ExecutionResult> {
25        let start_time = Instant::now();
26        let pipeline_name = pipeline_name.to_string();
27        
28        let script_path_str = step.script.clone();
29        let script_path = std::path::Path::new(&script_path_str);
30        if !script_path.exists() {
31            return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
32        }
33
34        // 读取脚本内容并进行变量替换
35        let mut script_content = std::fs::read_to_string(&script_path)
36            .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
37        for (key, value) in &variables {
38            let placeholder = format!("{{{{ {} }}}}", key);
39            script_content = script_content.replace(&placeholder, value);
40        }
41
42        let mut gloabl_script_content = global_scripts.iter()
43        .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
44        .fold(Ok("".to_string()), |p:Result<String>,v|{
45            if p.is_err(){
46                return p;
47            }
48
49            if v.is_err(){
50                return Err(Error::msg(format!("{:?}", v.err())));
51            }
52            let content = v.unwrap();
53
54            let mut s = p.unwrap_or_default();
55
56            s.push_str("\n");
57            s.push_str(&content);
58
59            return  Ok(s.clone());
60        })?;
61
62        gloabl_script_content.push_str("\n");
63        gloabl_script_content.push_str(&script_content);
64
65        let script_content = gloabl_script_content.clone();
66
67        // 写入临时文件
68        let mut temp_file = tempfile::NamedTempFile::new()
69            .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
70        temp_file.write_all(script_content.as_bytes())
71            .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
72        let temp_path = temp_file.path().to_path_buf();
73
74        info!("Executing local script: {} (with content variable substitution)", script_path_str);
75
76        // 发送开始执行的日志
77        if let Some(callback) = &output_callback {
78            let event = OutputEvent {
79                pipeline_name: pipeline_name.clone(),
80                server_name: "localhost".to_string(),
81                step: step.clone(),
82                script_path:script_path_str.clone(),
83                output_type: OutputType::Log,
84                content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
85                timestamp: Instant::now(),
86                variables: variables.clone(),
87            };
88            callback(event);
89        }
90
91        // 设置超时
92        let timeout_seconds = step.timeout_seconds.unwrap_or(60);
93        
94        // 创建异步命令
95        let mut command = TokioCommand::new("bash");
96        command.arg(temp_path.to_str().unwrap());
97        command.current_dir(std::env::current_dir()?);
98        
99        // 设置环境变量
100        for (key, value) in &variables {
101            command.env(key, value);
102        }
103
104        // 设置标准输出和错误输出
105        command.stdout(Stdio::piped());
106        command.stderr(Stdio::piped());
107
108        // 执行命令
109        let mut child = command.spawn()
110            .context("Failed to spawn local script process")?;
111
112        let stdout = child.stdout.take().expect("Failed to capture stdout");
113        let stderr = child.stderr.take().expect("Failed to capture stderr");
114
115        // 克隆必要的数据用于异步任务
116        let step_clone = step.clone();
117        let pipeline_name1 = pipeline_name.clone();
118        let pipeline_name2 = pipeline_name.clone();
119        let variables_clone = variables.clone();
120        let variables_clone2 = variables.clone();
121        let output_callback_clone = output_callback.clone();
122        let output_callback_clone2 = output_callback.clone();
123        let script_path = script_path_str.clone();
124
125        // 创建输出读取任务
126        let stdout_task = tokio::spawn(async move {
127            let reader = BufReader::new(stdout);
128            let mut lines = reader.lines();
129            let mut content = String::new();
130            
131            while let Ok(Some(line)) = lines.next_line().await {
132                content.push_str(&line);
133                content.push('\n');
134                
135                // 发送实时输出
136                if let Some(callback) = &output_callback_clone {
137                    let event = OutputEvent {
138                        pipeline_name: pipeline_name1.to_string(),
139                        server_name: "localhost".to_string(),
140                        step: step_clone.clone(),
141                        script_path:script_path.clone(),
142                        output_type: OutputType::Stdout,
143                        content: line,
144                        timestamp: Instant::now(),
145                        variables: variables_clone.clone(),
146                    };
147                    callback(event);
148                }
149            }
150            content
151        });
152
153        let script_path = script_path_str.clone();
154        let step_clone2 = step.clone();
155        let stderr_task = tokio::spawn(async move {
156            let reader = BufReader::new(stderr);
157            let mut lines = reader.lines();
158            let mut content = String::new();
159            
160            while let Ok(Some(line)) = lines.next_line().await {
161                content.push_str(&line);
162                content.push('\n');
163                
164                // 发送实时输出
165                if let Some(callback) = &output_callback_clone2 {
166                    let event = OutputEvent {
167                        pipeline_name: pipeline_name2.to_string(),
168                        server_name: "localhost".to_string(),
169                        step: step_clone2.clone(),
170                        script_path:script_path.clone(),
171                        output_type: OutputType::Stderr,
172                        content: line,
173                        timestamp: Instant::now(),
174                        variables: variables_clone2.clone(),
175                    };
176                    callback(event);
177                }
178            }
179            content
180        });
181
182        // 等待命令完成(带超时)
183        let status = tokio::time::timeout(
184            std::time::Duration::from_secs(timeout_seconds),
185            child.wait()
186        ).await;
187
188        let exit_code = match status {
189            Ok(Ok(exit_status)) => {
190                exit_status.code().unwrap_or(-1)
191            }
192            Ok(Err(e)) => {
193                error!("Local script execution failed: {}", e);
194                return Err(anyhow::anyhow!("Local script execution failed: {}", e));
195            }
196            Err(_) => {
197                // 超时,强制终止进程
198                let _ = child.kill().await;
199                return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
200            }
201        };
202
203        // 等待输出读取完成
204        let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
205        
206        let stdout_content = stdout_result.unwrap_or_default();
207        let stderr_content = stderr_result.unwrap_or_default();
208
209        let execution_time = start_time.elapsed().as_millis() as u64;
210        let success = exit_code == 0;
211
212        info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
213
214        // 发送完成日志
215        if let Some(callback) = &output_callback {
216            let status = if success { "成功" } else { "失败" };
217            let event = OutputEvent {
218                pipeline_name: pipeline_name.to_string(),
219                server_name: "localhost".to_string(),
220                step: step.clone(),
221                script_path:script_path_str.clone(),
222                output_type: OutputType::Log,
223                content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
224                timestamp: Instant::now(),
225                variables: variables.clone(),
226            };
227            callback(event);
228        }
229
230        // 清理临时文件(drop后自动删除)
231        drop(temp_file);
232
233        Ok(ExecutionResult {
234            success,
235            stdout: stdout_content,
236            stderr: stderr_content,
237            script: script_path_str.clone(),
238            exit_code,
239            execution_time_ms: execution_time,
240            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
241        })
242    }
243
244    /// 在本地执行shell脚本(同步版本,用于兼容性)
245    pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
246        let start_time = Instant::now();
247        
248        // 检查脚本文件是否存在
249        let script_path = std::path::Path::new(&step.script);
250        if !script_path.exists() {
251            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
252        }
253
254        info!("Executing local script: {}", step.script);
255
256        // 设置超时(注意:同步版本无法真正实现超时,这里只是记录)
257        let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
258        
259        // 创建命令
260        let output = Command::new("bash")
261            .arg(&step.script)
262            .current_dir(std::env::current_dir()?)
263            .output()
264            .context("Failed to execute local script")?;
265
266        let execution_time = start_time.elapsed().as_millis() as u64;
267        let exit_code = output.status.code().unwrap_or(-1);
268        let success = exit_code == 0;
269
270        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
271        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
272
273        info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
274
275        Ok(ExecutionResult {
276            success,
277            stdout,
278            stderr,
279            script: step.script.clone(),
280            exit_code,
281            execution_time_ms: execution_time,
282            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
283        })
284    }
285}