net_shell/ssh/
local.rs

1use anyhow::{Context, Error, Result};
2use std::fs;
3use std::process::{Command, Stdio};
4use std::time::Instant;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command as TokioCommand;
7use tracing::{error, info};
8use tempfile;
9use std::io::Write;
10
11use crate::models::{ExecutionResult, OutputCallback, OutputEvent, OutputType, Step};
12
/// Executor for shell scripts that run on the local machine via `bash`.
///
/// Carries no state; all functionality is exposed through associated functions.
pub struct LocalExecutor;
15
16impl LocalExecutor {
17    /// 在本地执行shell脚本(支持实时输出)
18    pub async fn execute_script_with_realtime_output(
19        script: Option<String>,
20        global_scripts:Vec<String>,
21        step: &Step,
22        pipeline_name: &str,
23        _step_name: &str,
24        output_callback: Option<OutputCallback>,
25        variables: std::collections::HashMap<String, String>,
26    ) -> Result<ExecutionResult> {
27        let start_time = Instant::now();
28        let pipeline_name = pipeline_name.to_string();
29        
30        let script_path_str = step.script.clone();
31        let script_path = std::path::Path::new(&script_path_str);
32        if !script_path.exists() {
33            return Err(anyhow::anyhow!("Script '{}' not found", script_path_str));
34        }
35
36        // 读取脚本内容并进行变量替换
37        let mut script_content = std::fs::read_to_string(&script_path)
38            .map_err(|e| anyhow::anyhow!("Failed to read script file '{}': {}", script_path_str, e))?;
39        for (key, value) in &variables {
40            let placeholder = format!("{{{{ {} }}}}", key);
41            script_content = script_content.replace(&placeholder, value);
42        }
43
44        let mut gloabl_script_content = global_scripts.iter()
45        .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
46        .fold(Ok("".to_string()), |p:Result<String>,v|{
47            if p.is_err(){
48                return p;
49            }
50
51            if v.is_err(){
52                return Err(Error::msg(format!("{:?}", v.err())));
53            }
54            let content = v.unwrap();
55
56            let mut s = p.unwrap_or_default();
57
58            s.push_str("\n");
59            s.push_str(&content);
60
61            return  Ok(s.clone());
62        })?;
63
64        if let Some(script_header) = script {
65            let cont =  fs::read_to_string(&script_header)
66                .map_err(|e| anyhow::anyhow!("Failed to read script header file '{}': {}", script_header, e))?;
67            gloabl_script_content.push_str("\n");
68            gloabl_script_content.push_str(&cont);
69        }
70
71        gloabl_script_content.push_str("\n");
72        gloabl_script_content.push_str(&script_content);
73
74        let script_content = gloabl_script_content.clone();
75
76        // 写入临时文件
77        let mut temp_file = tempfile::NamedTempFile::new()
78            .map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
79        temp_file.write_all(script_content.as_bytes())
80            .map_err(|e| anyhow::anyhow!("Failed to write to temp file: {}", e))?;
81        let temp_path = temp_file.path().to_path_buf();
82
83        info!("Executing local script: {} (with content variable substitution)", script_path_str);
84
85        // 发送开始执行的日志
86        if let Some(callback) = &output_callback {
87            let event = OutputEvent {
88                pipeline_name: pipeline_name.clone(),
89                server_name: "localhost".to_string(),
90                step: step.clone(),
91                script_path:script_path_str.clone(),
92                output_type: OutputType::Log,
93                content: format!("开始执行本地脚本: {} (内容已变量替换)", script_path_str),
94                timestamp: Instant::now(),
95                variables: variables.clone(),
96            };
97            callback(event);
98        }
99
100        // 设置超时
101        let timeout_seconds = step.timeout_seconds.unwrap_or(60);
102        
103        // 创建异步命令
104        let mut command = TokioCommand::new("bash");
105        command.arg(temp_path.to_str().unwrap());
106        command.current_dir(std::env::current_dir()?);
107        
108        // 设置环境变量
109        for (key, value) in &variables {
110            command.env(key, value);
111        }
112
113        // 设置标准输出和错误输出
114        command.stdout(Stdio::piped());
115        command.stderr(Stdio::piped());
116
117        // 执行命令
118        let mut child = command.spawn()
119            .context("Failed to spawn local script process")?;
120
121        let stdout = child.stdout.take().expect("Failed to capture stdout");
122        let stderr = child.stderr.take().expect("Failed to capture stderr");
123
124        // 克隆必要的数据用于异步任务
125        let step_clone = step.clone();
126        let pipeline_name1 = pipeline_name.clone();
127        let pipeline_name2 = pipeline_name.clone();
128        let variables_clone = variables.clone();
129        let variables_clone2 = variables.clone();
130        let output_callback_clone = output_callback.clone();
131        let output_callback_clone2 = output_callback.clone();
132        let script_path = script_path_str.clone();
133
134        // 创建输出读取任务
135        let stdout_task = tokio::spawn(async move {
136            let reader = BufReader::new(stdout);
137            let mut lines = reader.lines();
138            let mut content = String::new();
139            
140            while let Ok(Some(line)) = lines.next_line().await {
141                content.push_str(&line);
142                content.push('\n');
143                
144                // 发送实时输出
145                if let Some(callback) = &output_callback_clone {
146                    let event = OutputEvent {
147                        pipeline_name: pipeline_name1.to_string(),
148                        server_name: "localhost".to_string(),
149                        step: step_clone.clone(),
150                        script_path:script_path.clone(),
151                        output_type: OutputType::Stdout,
152                        content: line,
153                        timestamp: Instant::now(),
154                        variables: variables_clone.clone(),
155                    };
156                    callback(event);
157                }
158            }
159            content
160        });
161
162        let script_path = script_path_str.clone();
163        let step_clone2 = step.clone();
164        let stderr_task = tokio::spawn(async move {
165            let reader = BufReader::new(stderr);
166            let mut lines = reader.lines();
167            let mut content = String::new();
168            
169            while let Ok(Some(line)) = lines.next_line().await {
170                content.push_str(&line);
171                content.push('\n');
172                
173                // 发送实时输出
174                if let Some(callback) = &output_callback_clone2 {
175                    let event = OutputEvent {
176                        pipeline_name: pipeline_name2.to_string(),
177                        server_name: "localhost".to_string(),
178                        step: step_clone2.clone(),
179                        script_path:script_path.clone(),
180                        output_type: OutputType::Stderr,
181                        content: line,
182                        timestamp: Instant::now(),
183                        variables: variables_clone2.clone(),
184                    };
185                    callback(event);
186                }
187            }
188            content
189        });
190
191        // 等待命令完成(带超时)
192        let status = tokio::time::timeout(
193            std::time::Duration::from_secs(timeout_seconds),
194            child.wait()
195        ).await;
196
197        let exit_code = match status {
198            Ok(Ok(exit_status)) => {
199                exit_status.code().unwrap_or(-1)
200            }
201            Ok(Err(e)) => {
202                error!("Local script execution failed: {}", e);
203                return Err(anyhow::anyhow!("Local script execution failed: {}", e));
204            }
205            Err(_) => {
206                // 超时,强制终止进程
207                let _ = child.kill().await;
208                return Err(anyhow::anyhow!("Local script execution timed out after {} seconds", timeout_seconds));
209            }
210        };
211
212        // 等待输出读取完成
213        let (stdout_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
214        
215        let stdout_content = stdout_result.unwrap_or_default();
216        let stderr_content = stderr_result.unwrap_or_default();
217
218        let execution_time = start_time.elapsed().as_millis() as u64;
219        let success = exit_code == 0;
220
221        info!("Local script '{}' completed with exit code: {}", script_path_str, exit_code);
222
223        // 发送完成日志
224        if let Some(callback) = &output_callback {
225            let status = if success { "成功" } else { "失败" };
226            let event = OutputEvent {
227                pipeline_name: pipeline_name.to_string(),
228                server_name: "localhost".to_string(),
229                step: step.clone(),
230                script_path:script_path_str.clone(),
231                output_type: OutputType::Log,
232                content: format!("本地脚本执行完成: {} ({}) - 耗时: {}ms", script_path_str, status, execution_time),
233                timestamp: Instant::now(),
234                variables: variables.clone(),
235            };
236            callback(event);
237        }
238
239        // 清理临时文件(drop后自动删除)
240        drop(temp_file);
241
242        Ok(ExecutionResult {
243            success,
244            stdout: stdout_content,
245            stderr: stderr_content,
246            script: script_path_str.clone(),
247            exit_code,
248            execution_time_ms: execution_time,
249            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
250        })
251    }
252
253    /// 在本地执行shell脚本(同步版本,用于兼容性)
254    pub fn execute_script(step: &Step) -> Result<ExecutionResult> {
255        let start_time = Instant::now();
256        
257        // 检查脚本文件是否存在
258        let script_path = std::path::Path::new(&step.script);
259        if !script_path.exists() {
260            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
261        }
262
263        info!("Executing local script: {}", step.script);
264
265        // 设置超时(注意:同步版本无法真正实现超时,这里只是记录)
266        let _timeout_seconds = step.timeout_seconds.unwrap_or(60);
267        
268        // 创建命令
269        let output = Command::new("bash")
270            .arg(&step.script)
271            .current_dir(std::env::current_dir()?)
272            .output()
273            .context("Failed to execute local script")?;
274
275        let execution_time = start_time.elapsed().as_millis() as u64;
276        let exit_code = output.status.code().unwrap_or(-1);
277        let success = exit_code == 0;
278
279        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
280        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
281
282        info!("Local script '{}' completed with exit code: {}", step.script, exit_code);
283
284        Ok(ExecutionResult {
285            success,
286            stdout,
287            stderr,
288            script: step.script.clone(),
289            exit_code,
290            execution_time_ms: execution_time,
291            error_message: if success { None } else { Some(format!("Script exited with code {}", exit_code)) },
292        })
293    }
294}