net_shell/executor/
mod.rs

1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::Arc;
6use tracing::{error, info};
7
8use crate::config::ConfigManager;
9use crate::models::{
10    ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult, 
11    RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
12};
13use crate::ssh::SshExecutor;
14use crate::ssh::local::LocalExecutor;
15use crate::vars::VariableManager;
16use crate::ShellExecutionResult;
17
/// Remote executor: runs the configured pipelines of script steps on the
/// configured clients, or locally when a step lists no servers.
pub struct RemoteExecutor {
    // Parsed execution configuration; the methods below read its `clients`,
    // `pipelines` and `global_scripts` members.
    config: RemoteExecutionConfig,
    // Variable store used for placeholder replacement in step scripts and for
    // values captured by a step's `extract` rules.
    variable_manager: VariableManager,
}
23
24impl RemoteExecutor {
25
26
27    /// 从YAML文件创建执行器
28    pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
29        let content = std::fs::read_to_string(path)
30            .context("Failed to read YAML configuration file")?;
31        
32        Self::from_yaml_str(&content, variables)
33    }
34
35    /// 从YAML字符串创建执行器
36    pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
37        // 提取初始变量
38        let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
39
40        // 合并变量
41        let mut all_variables = HashMap::new();
42
43        if let Some(v) = initial_variables {
44            all_variables.extend(v);
45        }
46
47        if let Some(v) = variables {
48            all_variables.extend(v);
49        }
50        
51        // 创建变量管理器
52        let variable_manager = VariableManager::new(Some(all_variables));
53        
54        // 应用变量替换解析配置
55        let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
56        ConfigManager::validate_config(&config)?;
57        
58        Ok(Self { config, variable_manager})
59    }
60
61    /// 执行指定的流水线(支持实时输出)
62    pub async fn execute_pipeline_with_realtime_output(
63        &mut self, // 需要可变引用
64        pipeline_name: &str,
65        output_callback: Option<OutputCallback>,
66        log_callback: Option<OutputCallback>
67    ) -> Result<PipelineExecutionResult> {
68        let pipeline = self.config.pipelines.iter()
69            .find(|p| p.name == pipeline_name)
70            .cloned()
71            .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
72
73        let pipeline_name = pipeline.name.clone();
74        let steps: Vec<Step> = pipeline.steps.clone();
75        let start_time = std::time::Instant::now();
76        let mut all_step_results = Vec::new();
77
78        // 发送开始执行流水线的日志
79        if let Some(callback) = &log_callback {
80            let event = OutputEvent {
81                pipeline_name: pipeline_name.clone(),
82                server_name: "system".to_string(),
83                script_path:"".to_string(),
84                step: Step::default(), // 流水线开始事件没有具体的步骤
85                output_type: crate::models::OutputType::Log,
86                content: format!("开始执行流水线: {}", pipeline_name),
87                timestamp: std::time::Instant::now(),
88                variables: self.variable_manager.get_variables().clone(),
89            };
90            callback(event);
91        }
92
93        info!("Starting pipeline: {}", pipeline_name);
94
95        // 按顺序执行每个步骤(串行)
96        for step in steps {
97            // 合并 step 级变量到全局变量(优先级高)
98            let mut step_var_keys = Vec::new();
99            if let Some(vars) = &step.variables {
100                for (k, v) in vars {
101                    self.variable_manager.set_variable(k.clone(), v.clone());
102                    step_var_keys.push(k.clone());
103                }
104            }
105            // 对当前步骤应用变量替换
106            let mut step_with_variables = step.clone();
107            step_with_variables.script = self.variable_manager.replace_variables(&step.script);
108            
109            // 发送步骤开始事件
110            if let Some(callback) = &output_callback {
111                let event = OutputEvent {
112                    pipeline_name: pipeline_name.clone(),
113                    server_name: "system".to_string(),
114                    step: step.clone(), // 传递完整的Step对象
115                    output_type: crate::models::OutputType::StepStarted,
116                    script_path:step.script.clone(),
117                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
118                    timestamp: std::time::Instant::now(),
119                    variables: self.variable_manager.get_variables().clone(),
120                };
121                callback(event);
122            }
123            
124            // 发送开始执行步骤的日志
125            if let Some(callback) = &log_callback {
126                let event = OutputEvent {
127                    pipeline_name: pipeline_name.clone(),
128                    server_name: "system".to_string(),
129                    step: step.clone(), // 传递完整的Step对象
130                    script_path:step.script.clone(),
131                    output_type: crate::models::OutputType::Log,
132                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
133                    timestamp: std::time::Instant::now(),
134                    variables: self.variable_manager.get_variables().clone(),
135                };
136                callback(event);
137            }
138
139            info!("Starting step: {} on {} servers", step.name, step.servers.len());
140            
141            // 同一步骤内的所有服务器并发执行
142            let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
143            
144            // 检查步骤是否成功(所有服务器都成功才算成功)
145            let step_success = step_results.iter().all(|r| r.execution_result.success);
146            
147            // 添加步骤结果
148            all_step_results.extend(step_results);
149
150            // 发送步骤完成事件
151            if let Some(callback) = &output_callback {
152                let status = if step_success { "成功" } else { "失败" };
153                let event = OutputEvent {
154                    pipeline_name: pipeline_name.clone(),
155                    script_path:step.script.clone(),
156                    server_name: "system".to_string(),
157                    step: step.clone(), // 传递完整的Step对象
158                    output_type: crate::models::OutputType::StepCompleted,
159                    content: format!("步骤完成: {} ({})", step.name, status),
160                    timestamp: std::time::Instant::now(),
161                    variables: self.variable_manager.get_variables().clone(),
162                };
163                callback(event);
164            }
165
166            // 如果步骤失败,可以选择是否继续执行后续步骤
167            if !step_success {
168                info!("Step '{}' failed, stopping pipeline", step.name);
169                break;
170            }
171            
172            info!("Step '{}' completed successfully", step.name);
173        }
174
175        let total_time = start_time.elapsed().as_millis() as u64;
176        let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
177
178        // 发送流水线完成日志
179        if let Some(callback) = &log_callback {
180            let status = if overall_success { "成功" } else { "失败" };
181            let event = OutputEvent {
182                pipeline_name: pipeline_name.clone(),
183                script_path:"".to_string(),
184                server_name: "system".to_string(),
185                step: Step::default(), // 流水线完成事件没有具体的步骤
186                output_type: crate::models::OutputType::Log,
187                content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
188                timestamp: std::time::Instant::now(),
189                variables: self.variable_manager.get_variables().clone(),
190            };
191            callback(event);
192        }
193
194        Ok(PipelineExecutionResult {
195            title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
196            pipeline_name: pipeline_name.clone(),
197            step_results: all_step_results,
198            overall_success,
199            total_execution_time_ms: total_time,
200        })
201    }
202
203    /// 执行所有流水线(支持实时输出)
204    pub async fn execute_all_pipelines_with_realtime_output(
205        &mut self, // 需要可变引用
206        output_callback: Option<OutputCallback>,
207        log_callback: Option<OutputCallback>
208    ) -> Result<ShellExecutionResult> {
209        let mut results = Vec::new();
210        
211        // 发送开始执行所有流水线的日志
212        if let Some(callback) = &log_callback {
213            let event = OutputEvent {
214                pipeline_name: "system".to_string(),
215                server_name: "system".to_string(),
216                script_path :"".to_string(),
217                step: Step::default(), // 系统级别事件没有具体步骤
218                output_type: crate::models::OutputType::Log,
219                content: format!("=== 远程脚本执行器 ==="),
220                timestamp: std::time::Instant::now(),
221                variables: self.variable_manager.get_variables().clone(),
222            };
223            callback(event);
224            
225            let event = OutputEvent {
226                pipeline_name: "system".to_string(),
227                server_name: "system".to_string(),
228                script_path :"".to_string(),
229                step: Step::default(), // 系统级别事件没有具体步骤
230                output_type: crate::models::OutputType::Log,
231                content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
232                timestamp: std::time::Instant::now(),
233                variables: self.variable_manager.get_variables().clone(),
234            };
235            callback(event);
236            
237            let event = OutputEvent {
238                pipeline_name: "system".to_string(),
239                script_path :"".to_string(),
240                server_name: "system".to_string(),
241                step: Step::default(), // 系统级别事件没有具体步骤
242                output_type: crate::models::OutputType::Log,
243                content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
244                timestamp: std::time::Instant::now(),
245                variables: self.variable_manager.get_variables().clone(),
246            };
247            callback(event);
248        }
249        
250        // 按顺序执行每个流水线(串行)
251        let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
252        for pipeline_name in pipeline_names {
253            // 发送开始执行流水线的日志
254            if let Some(callback) = &log_callback {
255                let event = OutputEvent {
256                    pipeline_name: pipeline_name.clone(),
257                    server_name: "system".to_string(),
258                    script_path :"".to_string(),
259                    step: Step::default(), // 流水线开始事件没有具体的步骤
260                    output_type: crate::models::OutputType::Log,
261                    content: format!("开始执行流水线: {}", pipeline_name),
262                    timestamp: std::time::Instant::now(),
263                    variables: self.variable_manager.get_variables().clone(),
264                };
265                callback(event);
266            }
267            info!("Starting pipeline: {}", pipeline_name);
268
269            let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
270            let success = result.overall_success;
271            results.push(result);
272            if !success {
273                info!("Pipeline '{}' failed, stopping execution", pipeline_name);
274                break;
275            }
276            info!("Pipeline '{}' completed successfully", pipeline_name);
277        }
278        
279        Ok(ShellExecutionResult{
280            success: true,
281            reason: "ok".to_string(),
282            pipeline_results: results,
283        })
284    }
285
    /// Runs the named pipeline without any output or log callback.
    ///
    /// Original entry point, kept for backward compatibility: it delegates to
    /// `execute_pipeline_with_realtime_output` with both callbacks set to
    /// `None` and returns that call's result unchanged.
    pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
        self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
    }
290
291    /// 执行单个步骤(支持实时输出)
292    async fn execute_step_with_realtime_output(
293        &mut self,
294        step: &Step,
295        pipeline_name: &str,
296        output_callback: Option<&OutputCallback>
297    ) -> Result<Vec<StepExecutionResult>> {
298        let start_time = std::time::Instant::now();
299        // Clone config at the start to avoid &self borrow conflicts
300        let config = self.config.clone();
301        let variable_manager = &mut self.variable_manager;
302        
303        // 检查是否有服务器配置
304        if step.servers.is_empty() {
305            // 本地执行
306            info!("Executing step: {} locally (no servers specified)", step.name);
307            let output_callback = output_callback.cloned();
308            let step_clone = step.clone();
309            let pipeline_name = pipeline_name.to_string();
310            let step_name = step.name.clone();
311            let mut variables = variable_manager.get_variables().clone();
312            variables.insert("pipeline_name".to_string(), pipeline_name.clone());
313            variables.insert("step_name".to_string(), step_name.clone());
314            let execution_result = LocalExecutor::execute_script_with_realtime_output(
315                self.config.global_scripts.clone(),
316                &step_clone,
317                &pipeline_name,
318                &step_name,
319                output_callback,
320                variables,
321            ).await?;
322            let success = execution_result.success;
323            // 提取变量(如果有extract规则)
324            if let Some(extract_rules) = step.extract.clone() {
325                if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
326                    info!("Failed to extract variables from step '{}': {}", step.name, e);
327                }
328            }
329            let step_result = StepExecutionResult {
330                title: step.title.clone().unwrap_or(step.name.clone()),
331                step_name: step.name.clone(),
332                scritp_path:step.script.clone(),
333                server_name: "localhost".to_string(),
334                execution_result,
335                overall_success: success,
336                execution_time_ms: start_time.elapsed().as_millis() as u64,
337            };
338            return Ok(vec![step_result]);
339        }
340        
341        // 远程执行(原有逻辑)
342        info!("Executing step: {} on {} servers", step.name, step.servers.len());
343
344        let mut step_results = Vec::new();
345        let mut futures = Vec::new();
346        // 用于收集所有服务器提取到的变量 (变量名, 变量值)
347        let mut extracted_vars: Vec<(String, String)> = Vec::new();
348        let clone_variable_manager = variable_manager.clone();
349
350        // 为每个服务器创建执行任务
351        let server_names: Vec<String> = step.servers.clone();
352        let  global_script= Arc::new(self.config.global_scripts.clone());
353        let clone_global_script = global_script.clone();
354        for server_name in server_names {
355            if !config.clients.contains_key(&server_name) {
356                return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
357            }
358
359            // 克隆必要的数据以避免生命周期问题
360            let config = config.clone();
361            let step_name = step.name.clone();
362            let output_callback = output_callback.cloned();
363            let clone_step = step.clone();
364            let pipeline_name = pipeline_name.to_string();
365            let mut clone_variable_manager = clone_variable_manager.clone();
366            clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
367            clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
368
369            let clone_global_script = clone_global_script.clone();
370
371            let future = tokio::spawn(async move {
372                // 创建新的执行器实例
373                let executor = RemoteExecutor { 
374                    config,
375                    variable_manager:clone_variable_manager,
376                };
377
378                match executor.execute_script_with_realtime_output(clone_global_script,&server_name, clone_step, &pipeline_name, output_callback).await {
379                    Ok(result) => {
380                        info!("Step '{}' on server '{}' completed with exit code: {}", 
381                              step_name, server_name, result.exit_code);
382                        Ok((server_name, result))
383                    }
384                    Err(e) => {
385                        error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
386                        Err(e)
387                    }
388                }
389
390            });
391
392            futures.push(future);
393        }
394
395        // 等待所有执行完成
396        let results = join_all(futures).await;
397        
398        for result in results {
399            match result {
400                Ok(Ok((server_name, execution_result))) => {
401                    let success = execution_result.success;
402                    // 提取变量(如果有extract规则)
403                    if let Some(extract_rules) = step.extract.clone() {
404                        // 提取变量到本地 map
405                        let mut temp_vm = VariableManager::new(None);
406                        if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
407                            info!("Failed to extract variables from step '{}': {}", step.name, e);
408                        } else {
409                            for (k, v) in temp_vm.get_variables() {
410                                extracted_vars.push((k.clone(), v.clone()));
411                            }
412                        }
413                    }
414                    
415                    step_results.push(StepExecutionResult {
416                        title: step.title.clone().unwrap_or(step.name.clone()),
417                        step_name: step.name.clone(),
418                        server_name,
419                        scritp_path:step.script.clone(),
420                        execution_result,
421                        overall_success: success,
422                        execution_time_ms: start_time.elapsed().as_millis() as u64,
423                    });
424                }
425                Ok(Err(e)) => {
426                    return Err(e);
427                }
428                Err(e) => {
429                    return Err(anyhow::anyhow!("Task execution failed: {}", e));
430                }
431            }
432        }
433        // 合并所有服务器提取到的变量到全局 variable_manager
434        for (k, v) in extracted_vars {
435            variable_manager.set_variable(k, v);
436        }
437
438        Ok(step_results)
439    }
440
441    /// 在指定客户端执行shell脚本(支持实时输出)
442    pub async fn execute_script_with_realtime_output(
443        &self, 
444        global_scripts:Arc<Vec<String>>,
445        client_name: &str, 
446        step: Step,
447        pipeline_name: &str,
448        output_callback: Option<OutputCallback>
449    ) -> Result<ExecutionResult> {
450        // 检查脚本文件是否存在
451        let script_path = Path::new(step.script.as_str());
452        if !script_path.exists() {
453            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
454        }
455
456        let client_config = self.config
457            .clients
458            .get(client_name)
459            .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
460
461        match client_config.execution_method {
462            ExecutionMethod::SSH => {
463                self.execute_script_via_ssh_with_realtime_output(global_scripts,client_config, step, client_name, pipeline_name, output_callback).await
464            }
465            ExecutionMethod::WebSocket => {
466                Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
467            }
468        }
469    }
470
471    /// 通过SSH执行脚本(支持实时输出)
472    async fn execute_script_via_ssh_with_realtime_output(
473        &self, 
474        global_scripts:Arc<Vec<String>>,
475        client_config: &ClientConfig, 
476        step: Step,
477        server_name: &str,
478        pipeline_name: &str,
479        output_callback: Option<OutputCallback>
480    ) -> Result<ExecutionResult> {
481        let ssh_config = client_config.ssh_config.as_ref()
482            .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
483
484        let start_time = std::time::Instant::now();
485
486        // 克隆数据以避免生命周期问题
487        let ssh_config = ssh_config.clone();
488        let script_content = step.script.to_string();
489        let server_name = server_name.to_string();
490        let pipeline_name = pipeline_name.to_string();
491        let step_name = step.name.clone();
492        let extract_rules = step.extract.clone();
493        let variable_manager = self.variable_manager.clone();
494
495        // 在tokio的阻塞线程池中执行SSH操作
496        let result = match tokio::task::spawn_blocking(move || {
497            SshExecutor::execute_script_with_realtime_output(
498                global_scripts.clone(),
499                &server_name, 
500                &ssh_config, 
501                &step,
502                &pipeline_name,
503                &step_name,
504                output_callback,
505                variable_manager,
506                extract_rules
507            )
508        }).await?.context(format!("run script faield")) {
509            Ok(v) => v,
510            Err(e) => {
511
512                let execution_time = start_time.elapsed().as_millis() as u64;
513                return Ok(ExecutionResult{
514                    success: false,
515                    stdout: "".to_string(),
516                    stderr: format!("{:?}", e),
517                    script: script_content,
518                    exit_code: 0,
519                    execution_time_ms: execution_time,
520                    error_message: Some(format!("{:?}", e)),
521                });
522            }
523        };
524
525        let execution_time = start_time.elapsed().as_millis() as u64;
526
527        Ok(ExecutionResult {
528            success: result.exit_code == 0,
529            stdout: result.stdout,
530            stderr: result.stderr,
531            script: script_content,
532            exit_code: result.exit_code,
533            execution_time_ms: execution_time,
534            error_message: result.error_message,
535        })
536    }
537
538    /// 获取所有可用的客户端名称
539    pub fn get_available_clients(&self) -> Vec<String> {
540        self.config.clients.keys().cloned().collect()
541    }
542
543    /// 检查客户端是否存在
544    pub fn client_exists(&self, client_name: &str) -> bool {
545        self.config.clients.contains_key(client_name)
546    }
547
548    /// 获取所有可用的流水线名称
549    pub fn get_available_pipelines(&self) -> Vec<String> {
550        self.config.pipelines.iter().map(|p| p.name.clone()).collect()
551    }
552
553    /// 检查流水线是否存在
554    pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
555        self.config.pipelines.iter().any(|p| p.name == pipeline_name)
556    }
557}