// net_shell/executor/mod.rs

1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::Arc;
6use tracing::{error, info};
7
8use crate::config::ConfigManager;
9use crate::models::{
10    ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult, 
11    RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
12};
13use crate::ssh::SshExecutor;
14use crate::ssh::local::LocalExecutor;
15use crate::vars::VariableManager;
16use crate::ShellExecutionResult;
17
/// Remote executor: runs configured pipelines and tracks variables across steps.
pub struct RemoteExecutor {
    /// Execution configuration; this file reads its `clients`, `pipelines` and `global_scripts`.
    config: RemoteExecutionConfig,
    /// Variable store used for script substitution and updated with values extracted from step output.
    variable_manager: VariableManager,
}
23
24impl RemoteExecutor {
25
26
27    /// 从YAML文件创建执行器
28    pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
29        let content = std::fs::read_to_string(path)
30            .context("Failed to read YAML configuration file")?;
31        
32        Self::from_yaml_str(&content, variables)
33    }
34
35    /// 从YAML字符串创建执行器
36    pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
37        // 提取初始变量
38        let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
39
40        // 合并变量
41        let mut all_variables = HashMap::new();
42
43        if let Some(v) = initial_variables {
44            all_variables.extend(v);
45        }
46
47        if let Some(v) = variables {
48            all_variables.extend(v);
49        }
50        
51        // 创建变量管理器
52        let variable_manager = VariableManager::new(Some(all_variables));
53        
54        // 应用变量替换解析配置
55        let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
56        ConfigManager::validate_config(&config)?;
57        
58        Ok(Self { config, variable_manager})
59    }
60
61    /// 执行指定的流水线(支持实时输出)
62    pub async fn execute_pipeline_with_realtime_output(
63        &mut self, // 需要可变引用
64        pipeline_name: &str,
65        output_callback: Option<OutputCallback>,
66        log_callback: Option<OutputCallback>
67    ) -> Result<PipelineExecutionResult> {
68        let pipeline = self.config.pipelines.iter()
69            .find(|p| p.name == pipeline_name)
70            .cloned()
71            .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
72
73        let pipeline_name = pipeline.name.clone();
74        let steps: Vec<Step> = pipeline.steps.clone();
75        let start_time = std::time::Instant::now();
76        let mut all_step_results = Vec::new();
77
78        // 发送开始执行流水线的日志
79        if let Some(callback) = &log_callback {
80            let event = OutputEvent {
81                pipeline_name: pipeline_name.clone(),
82                server_name: "system".to_string(),
83                step: Step::default(), // 流水线开始事件没有具体的步骤
84                output_type: crate::models::OutputType::Log,
85                content: format!("开始执行流水线: {}", pipeline_name),
86                timestamp: std::time::Instant::now(),
87                variables: self.variable_manager.get_variables().clone(),
88            };
89            callback(event);
90        }
91
92        info!("Starting pipeline: {}", pipeline_name);
93
94        // 按顺序执行每个步骤(串行)
95        for step in steps {
96            // 合并 step 级变量到全局变量(优先级高)
97            let mut step_var_keys = Vec::new();
98            if let Some(vars) = &step.variables {
99                for (k, v) in vars {
100                    self.variable_manager.set_variable(k.clone(), v.clone());
101                    step_var_keys.push(k.clone());
102                }
103            }
104            // 对当前步骤应用变量替换
105            let mut step_with_variables = step.clone();
106            step_with_variables.script = self.variable_manager.replace_variables(&step.script);
107            
108            // 发送步骤开始事件
109            if let Some(callback) = &output_callback {
110                let event = OutputEvent {
111                    pipeline_name: pipeline_name.clone(),
112                    server_name: "system".to_string(),
113                    step: step.clone(), // 传递完整的Step对象
114                    output_type: crate::models::OutputType::StepStarted,
115                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
116                    timestamp: std::time::Instant::now(),
117                    variables: self.variable_manager.get_variables().clone(),
118                };
119                callback(event);
120            }
121            
122            // 发送开始执行步骤的日志
123            if let Some(callback) = &log_callback {
124                let event = OutputEvent {
125                    pipeline_name: pipeline_name.clone(),
126                    server_name: "system".to_string(),
127                    step: step.clone(), // 传递完整的Step对象
128                    output_type: crate::models::OutputType::Log,
129                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
130                    timestamp: std::time::Instant::now(),
131                    variables: self.variable_manager.get_variables().clone(),
132                };
133                callback(event);
134            }
135
136            info!("Starting step: {} on {} servers", step.name, step.servers.len());
137            
138            // 同一步骤内的所有服务器并发执行
139            let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
140            
141            // 检查步骤是否成功(所有服务器都成功才算成功)
142            let step_success = step_results.iter().all(|r| r.execution_result.success);
143            
144            // 添加步骤结果
145            all_step_results.extend(step_results);
146
147            // 发送步骤完成事件
148            if let Some(callback) = &output_callback {
149                let status = if step_success { "成功" } else { "失败" };
150                let event = OutputEvent {
151                    pipeline_name: pipeline_name.clone(),
152                    server_name: "system".to_string(),
153                    step: step.clone(), // 传递完整的Step对象
154                    output_type: crate::models::OutputType::StepCompleted,
155                    content: format!("步骤完成: {} ({})", step.name, status),
156                    timestamp: std::time::Instant::now(),
157                    variables: self.variable_manager.get_variables().clone(),
158                };
159                callback(event);
160            }
161
162            // 如果步骤失败,可以选择是否继续执行后续步骤
163            if !step_success {
164                info!("Step '{}' failed, stopping pipeline", step.name);
165                break;
166            }
167            
168            info!("Step '{}' completed successfully", step.name);
169        }
170
171        let total_time = start_time.elapsed().as_millis() as u64;
172        let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
173
174        // 发送流水线完成日志
175        if let Some(callback) = &log_callback {
176            let status = if overall_success { "成功" } else { "失败" };
177            let event = OutputEvent {
178                pipeline_name: pipeline_name.clone(),
179                server_name: "system".to_string(),
180                step: Step::default(), // 流水线完成事件没有具体的步骤
181                output_type: crate::models::OutputType::Log,
182                content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
183                timestamp: std::time::Instant::now(),
184                variables: self.variable_manager.get_variables().clone(),
185            };
186            callback(event);
187        }
188
189        Ok(PipelineExecutionResult {
190            title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
191            pipeline_name: pipeline_name.clone(),
192            step_results: all_step_results,
193            overall_success,
194            total_execution_time_ms: total_time,
195        })
196    }
197
198    /// 执行所有流水线(支持实时输出)
199    pub async fn execute_all_pipelines_with_realtime_output(
200        &mut self, // 需要可变引用
201        output_callback: Option<OutputCallback>,
202        log_callback: Option<OutputCallback>
203    ) -> Result<ShellExecutionResult> {
204        let mut results = Vec::new();
205        
206        // 发送开始执行所有流水线的日志
207        if let Some(callback) = &log_callback {
208            let event = OutputEvent {
209                pipeline_name: "system".to_string(),
210                server_name: "system".to_string(),
211                step: Step::default(), // 系统级别事件没有具体步骤
212                output_type: crate::models::OutputType::Log,
213                content: format!("=== 远程脚本执行器 ==="),
214                timestamp: std::time::Instant::now(),
215                variables: self.variable_manager.get_variables().clone(),
216            };
217            callback(event);
218            
219            let event = OutputEvent {
220                pipeline_name: "system".to_string(),
221                server_name: "system".to_string(),
222                step: Step::default(), // 系统级别事件没有具体步骤
223                output_type: crate::models::OutputType::Log,
224                content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
225                timestamp: std::time::Instant::now(),
226                variables: self.variable_manager.get_variables().clone(),
227            };
228            callback(event);
229            
230            let event = OutputEvent {
231                pipeline_name: "system".to_string(),
232                server_name: "system".to_string(),
233                step: Step::default(), // 系统级别事件没有具体步骤
234                output_type: crate::models::OutputType::Log,
235                content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
236                timestamp: std::time::Instant::now(),
237                variables: self.variable_manager.get_variables().clone(),
238            };
239            callback(event);
240        }
241        
242        // 按顺序执行每个流水线(串行)
243        let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
244        for pipeline_name in pipeline_names {
245            // 发送开始执行流水线的日志
246            if let Some(callback) = &log_callback {
247                let event = OutputEvent {
248                    pipeline_name: pipeline_name.clone(),
249                    server_name: "system".to_string(),
250                    step: Step::default(), // 流水线开始事件没有具体的步骤
251                    output_type: crate::models::OutputType::Log,
252                    content: format!("开始执行流水线: {}", pipeline_name),
253                    timestamp: std::time::Instant::now(),
254                    variables: self.variable_manager.get_variables().clone(),
255                };
256                callback(event);
257            }
258            info!("Starting pipeline: {}", pipeline_name);
259
260            let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
261            let success = result.overall_success;
262            results.push(result);
263            if !success {
264                info!("Pipeline '{}' failed, stopping execution", pipeline_name);
265                break;
266            }
267            info!("Pipeline '{}' completed successfully", pipeline_name);
268        }
269        
270        Ok(ShellExecutionResult{
271            success: true,
272            reason: "ok".to_string(),
273            pipeline_results: results,
274        })
275    }
276
    /// Executes the named pipeline without any callbacks (original method, kept for compatibility).
    ///
    /// Delegates to `execute_pipeline_with_realtime_output` with both the output and the
    /// log callback set to `None`, and returns its result unchanged.
    pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
        self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
    }
281
282    /// 执行单个步骤(支持实时输出)
283    async fn execute_step_with_realtime_output(
284        &mut self,
285        step: &Step,
286        pipeline_name: &str,
287        output_callback: Option<&OutputCallback>
288    ) -> Result<Vec<StepExecutionResult>> {
289        let start_time = std::time::Instant::now();
290        // Clone config at the start to avoid &self borrow conflicts
291        let config = self.config.clone();
292        let variable_manager = &mut self.variable_manager;
293        
294        // 检查是否有服务器配置
295        if step.servers.is_empty() {
296            // 本地执行
297            info!("Executing step: {} locally (no servers specified)", step.name);
298            let output_callback = output_callback.cloned();
299            let step_clone = step.clone();
300            let pipeline_name = pipeline_name.to_string();
301            let step_name = step.name.clone();
302            let mut variables = variable_manager.get_variables().clone();
303            variables.insert("pipeline_name".to_string(), pipeline_name.clone());
304            variables.insert("step_name".to_string(), step_name.clone());
305            let execution_result = LocalExecutor::execute_script_with_realtime_output(
306                self.config.global_scripts.clone(),
307                &step_clone,
308                &pipeline_name,
309                &step_name,
310                output_callback,
311                variables,
312            ).await?;
313            let success = execution_result.success;
314            // 提取变量(如果有extract规则)
315            if let Some(extract_rules) = step.extract.clone() {
316                if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
317                    info!("Failed to extract variables from step '{}': {}", step.name, e);
318                }
319            }
320            let step_result = StepExecutionResult {
321                title: step.title.clone().unwrap_or(step.name.clone()),
322                step_name: step.name.clone(),
323                server_name: "localhost".to_string(),
324                execution_result,
325                overall_success: success,
326                execution_time_ms: start_time.elapsed().as_millis() as u64,
327            };
328            return Ok(vec![step_result]);
329        }
330        
331        // 远程执行(原有逻辑)
332        info!("Executing step: {} on {} servers", step.name, step.servers.len());
333
334        let mut step_results = Vec::new();
335        let mut futures = Vec::new();
336        // 用于收集所有服务器提取到的变量 (变量名, 变量值)
337        let mut extracted_vars: Vec<(String, String)> = Vec::new();
338        let clone_variable_manager = variable_manager.clone();
339
340        // 为每个服务器创建执行任务
341        let server_names: Vec<String> = step.servers.clone();
342        let  global_script= Arc::new(self.config.global_scripts.clone());
343        let clone_global_script = global_script.clone();
344        for server_name in server_names {
345            if !config.clients.contains_key(&server_name) {
346                return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
347            }
348
349            // 克隆必要的数据以避免生命周期问题
350            let config = config.clone();
351            let step_name = step.name.clone();
352            let output_callback = output_callback.cloned();
353            let clone_step = step.clone();
354            let pipeline_name = pipeline_name.to_string();
355            let mut clone_variable_manager = clone_variable_manager.clone();
356            clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
357            clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
358
359            let clone_global_script = clone_global_script.clone();
360
361            let future = tokio::spawn(async move {
362                // 创建新的执行器实例
363                let executor = RemoteExecutor { 
364                    config,
365                    variable_manager:clone_variable_manager,
366                };
367
368                match executor.execute_script_with_realtime_output(clone_global_script,&server_name, clone_step, &pipeline_name, output_callback).await {
369                    Ok(result) => {
370                        info!("Step '{}' on server '{}' completed with exit code: {}", 
371                              step_name, server_name, result.exit_code);
372                        Ok((server_name, result))
373                    }
374                    Err(e) => {
375                        error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
376                        Err(e)
377                    }
378                }
379
380            });
381
382            futures.push(future);
383        }
384
385        // 等待所有执行完成
386        let results = join_all(futures).await;
387        
388        for result in results {
389            match result {
390                Ok(Ok((server_name, execution_result))) => {
391                    let success = execution_result.success;
392                    // 提取变量(如果有extract规则)
393                    if let Some(extract_rules) = step.extract.clone() {
394                        // 提取变量到本地 map
395                        let mut temp_vm = VariableManager::new(None);
396                        if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
397                            info!("Failed to extract variables from step '{}': {}", step.name, e);
398                        } else {
399                            for (k, v) in temp_vm.get_variables() {
400                                extracted_vars.push((k.clone(), v.clone()));
401                            }
402                        }
403                    }
404                    
405                    step_results.push(StepExecutionResult {
406                        title: step.title.clone().unwrap_or(step.name.clone()),
407                        step_name: step.name.clone(),
408                        server_name,
409                        execution_result,
410                        overall_success: success,
411                        execution_time_ms: start_time.elapsed().as_millis() as u64,
412                    });
413                }
414                Ok(Err(e)) => {
415                    return Err(e);
416                }
417                Err(e) => {
418                    return Err(anyhow::anyhow!("Task execution failed: {}", e));
419                }
420            }
421        }
422        // 合并所有服务器提取到的变量到全局 variable_manager
423        for (k, v) in extracted_vars {
424            variable_manager.set_variable(k, v);
425        }
426
427        Ok(step_results)
428    }
429
430    /// 在指定客户端执行shell脚本(支持实时输出)
431    pub async fn execute_script_with_realtime_output(
432        &self, 
433        global_scripts:Arc<Vec<String>>,
434        client_name: &str, 
435        step: Step,
436        pipeline_name: &str,
437        output_callback: Option<OutputCallback>
438    ) -> Result<ExecutionResult> {
439        // 检查脚本文件是否存在
440        let script_path = Path::new(step.script.as_str());
441        if !script_path.exists() {
442            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
443        }
444
445        let client_config = self.config
446            .clients
447            .get(client_name)
448            .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
449
450        match client_config.execution_method {
451            ExecutionMethod::SSH => {
452                self.execute_script_via_ssh_with_realtime_output(global_scripts,client_config, step, client_name, pipeline_name, output_callback).await
453            }
454            ExecutionMethod::WebSocket => {
455                Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
456            }
457        }
458    }
459
460    /// 通过SSH执行脚本(支持实时输出)
461    async fn execute_script_via_ssh_with_realtime_output(
462        &self, 
463        global_scripts:Arc<Vec<String>>,
464        client_config: &ClientConfig, 
465        step: Step,
466        server_name: &str,
467        pipeline_name: &str,
468        output_callback: Option<OutputCallback>
469    ) -> Result<ExecutionResult> {
470        let ssh_config = client_config.ssh_config.as_ref()
471            .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
472
473        let start_time = std::time::Instant::now();
474
475        // 克隆数据以避免生命周期问题
476        let ssh_config = ssh_config.clone();
477        let script_content = step.script.to_string();
478        let server_name = server_name.to_string();
479        let pipeline_name = pipeline_name.to_string();
480        let step_name = step.name.clone();
481        let extract_rules = step.extract.clone();
482        let variable_manager = self.variable_manager.clone();
483
484        // 在tokio的阻塞线程池中执行SSH操作
485        let result = match tokio::task::spawn_blocking(move || {
486            SshExecutor::execute_script_with_realtime_output(
487                global_scripts.clone(),
488                &server_name, 
489                &ssh_config, 
490                &step,
491                &pipeline_name,
492                &step_name,
493                output_callback,
494                variable_manager,
495                extract_rules
496            )
497        }).await?.context(format!("run script faield")) {
498            Ok(v) => v,
499            Err(e) => {
500
501                let execution_time = start_time.elapsed().as_millis() as u64;
502                return Ok(ExecutionResult{
503                    success: false,
504                    stdout: "".to_string(),
505                    stderr: format!("{:?}", e),
506                    script: script_content,
507                    exit_code: 0,
508                    execution_time_ms: execution_time,
509                    error_message: Some(format!("{:?}", e)),
510                });
511            }
512        };
513
514        let execution_time = start_time.elapsed().as_millis() as u64;
515
516        Ok(ExecutionResult {
517            success: result.exit_code == 0,
518            stdout: result.stdout,
519            stderr: result.stderr,
520            script: script_content,
521            exit_code: result.exit_code,
522            execution_time_ms: execution_time,
523            error_message: result.error_message,
524        })
525    }
526
    /// Returns the names of all configured clients.
    ///
    /// The names are the keys of the configuration's `clients` map, cloned into a new vector.
    pub fn get_available_clients(&self) -> Vec<String> {
        self.config.clients.keys().cloned().collect()
    }
531
    /// Returns `true` when a client named `client_name` is present in the configuration.
    pub fn client_exists(&self, client_name: &str) -> bool {
        self.config.clients.contains_key(client_name)
    }
536
    /// Returns the names of all configured pipelines, in configuration order.
    pub fn get_available_pipelines(&self) -> Vec<String> {
        self.config.pipelines.iter().map(|p| p.name.clone()).collect()
    }
541
    /// Returns `true` when a pipeline named `pipeline_name` is present in the configuration.
    pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
        self.config.pipelines.iter().any(|p| p.name == pipeline_name)
    }
546}