net_shell/executor/
mod.rs

1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::Arc;
6use tracing::{error, info};
7
8use crate::config::ConfigManager;
9use crate::models::{
10    ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult, 
11    RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
12};
13use crate::ssh::SshExecutor;
14use crate::ssh::local::LocalExecutor;
15use crate::vars::VariableManager;
16use crate::ShellExecutionResult;
17
18/// 远程执行器
19pub struct RemoteExecutor {
20    config: RemoteExecutionConfig,
21    variable_manager: VariableManager,
22}
23
24impl RemoteExecutor {
25
26
27    /// 从YAML文件创建执行器
28    pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
29        let content = std::fs::read_to_string(path)
30            .context("Failed to read YAML configuration file")?;
31        
32        Self::from_yaml_str(&content, variables)
33    }
34
35    /// 从YAML字符串创建执行器
36    pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
37        // 提取初始变量
38        let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
39
40        // 合并变量
41        let mut all_variables = HashMap::new();
42
43        if let Some(v) = initial_variables {
44            all_variables.extend(v);
45        }
46
47        if let Some(v) = variables {
48            all_variables.extend(v);
49        }
50        
51        // 创建变量管理器
52        let variable_manager = VariableManager::new(Some(all_variables));
53        
54        // 应用变量替换解析配置
55        let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
56        ConfigManager::validate_config(&config)?;
57        
58        Ok(Self { config, variable_manager})
59    }
60
61    /// 执行指定的流水线(支持实时输出)
62    pub async fn execute_pipeline_with_realtime_output(
63        &mut self, // 需要可变引用
64        pipeline_name: &str,
65        output_callback: Option<OutputCallback>,
66        log_callback: Option<OutputCallback>
67    ) -> Result<PipelineExecutionResult> {
68        let pipeline = self.config.pipelines.iter()
69            .find(|p| p.name == pipeline_name)
70            .cloned()
71            .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
72
73        let pipeline_name = pipeline.name.clone();
74        let steps: Vec<Step> = pipeline.steps.clone();
75        let start_time = std::time::Instant::now();
76        let mut all_step_results = Vec::new();
77
78        // 发送开始执行流水线的日志
79        if let Some(callback) = &log_callback {
80            let event = OutputEvent {
81                pipeline_name: pipeline_name.clone(),
82                server_name: "system".to_string(),
83                script_path:"".to_string(),
84                step: Step::default(), // 流水线开始事件没有具体的步骤
85                output_type: crate::models::OutputType::Log,
86                content: format!("开始执行流水线: {}", pipeline_name),
87                timestamp: std::time::Instant::now(),
88                variables: self.variable_manager.get_variables().clone(),
89            };
90            callback(event);
91        }
92
93        info!("Starting pipeline: {}", pipeline_name);
94
95        // 按顺序执行每个步骤(串行)
96        for step in steps {
97            // 合并 step 级变量到全局变量(优先级高)
98            let mut step_var_keys = Vec::new();
99            if let Some(vars) = &step.variables {
100                for (k, v) in vars {
101                    self.variable_manager.set_variable(k.clone(), v.clone());
102                    step_var_keys.push(k.clone());
103                }
104            }
105            // 对当前步骤应用变量替换
106            let mut step_with_variables = step.clone();
107            step_with_variables.script = self.variable_manager.replace_variables(&step.script);
108            
109            // 发送步骤开始事件
110            if let Some(callback) = &output_callback {
111                let event = OutputEvent {
112                    pipeline_name: pipeline_name.clone(),
113                    server_name: "system".to_string(),
114                    step: step.clone(), // 传递完整的Step对象
115                    output_type: crate::models::OutputType::StepStarted,
116                    script_path:step.script.clone(),
117                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
118                    timestamp: std::time::Instant::now(),
119                    variables: self.variable_manager.get_variables().clone(),
120                };
121                callback(event);
122            }
123            
124            // 发送开始执行步骤的日志
125            if let Some(callback) = &log_callback {
126                let event = OutputEvent {
127                    pipeline_name: pipeline_name.clone(),
128                    server_name: "system".to_string(),
129                    step: step.clone(), // 传递完整的Step对象
130                    script_path:step.script.clone(),
131                    output_type: crate::models::OutputType::Log,
132                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
133                    timestamp: std::time::Instant::now(),
134                    variables: self.variable_manager.get_variables().clone(),
135                };
136                callback(event);
137            }
138
139            info!("Starting step: {} on {} servers", step.name, step.servers.len());
140            
141            // 同一步骤内的所有服务器并发执行
142            let step_results = self.execute_step_with_realtime_output(pipeline.script.clone(),&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
143            
144            // 检查步骤是否成功(所有服务器都成功才算成功)
145            let step_success = step_results.iter().all(|r| r.execution_result.success);
146            
147            // 添加步骤结果
148            all_step_results.extend(step_results);
149
150            // 发送步骤完成事件
151            if let Some(callback) = &output_callback {
152                let status = if step_success { "成功" } else { "失败" };
153                let event = OutputEvent {
154                    pipeline_name: pipeline_name.clone(),
155                    script_path:step.script.clone(),
156                    server_name: "system".to_string(),
157                    step: step.clone(), // 传递完整的Step对象
158                    output_type: crate::models::OutputType::StepCompleted,
159                    content: format!("步骤完成: {} ({})", step.name, status),
160                    timestamp: std::time::Instant::now(),
161                    variables: self.variable_manager.get_variables().clone(),
162                };
163                callback(event);
164            }
165
166            // 如果步骤失败,可以选择是否继续执行后续步骤
167            if !step_success {
168                info!("Step '{}' failed, stopping pipeline", step.name);
169                break;
170            }
171            
172            info!("Step '{}' completed successfully", step.name);
173        }
174
175        let total_time = start_time.elapsed().as_millis() as u64;
176        let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
177
178        // 发送流水线完成日志
179        if let Some(callback) = &log_callback {
180            let status = if overall_success { "成功" } else { "失败" };
181            let event = OutputEvent {
182                pipeline_name: pipeline_name.clone(),
183                script_path:"".to_string(),
184                server_name: "system".to_string(),
185                step: Step::default(), // 流水线完成事件没有具体的步骤
186                output_type: crate::models::OutputType::Log,
187                content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
188                timestamp: std::time::Instant::now(),
189                variables: self.variable_manager.get_variables().clone(),
190            };
191            callback(event);
192        }
193
194        Ok(PipelineExecutionResult {
195            title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
196            pipeline_name: pipeline_name.clone(),
197            step_results: all_step_results,
198            overall_success,
199            total_execution_time_ms: total_time,
200        })
201    }
202
203    /// 执行所有流水线(支持实时输出)
204    pub async fn execute_all_pipelines_with_realtime_output(
205        &mut self, // 需要可变引用
206        output_callback: Option<OutputCallback>,
207        log_callback: Option<OutputCallback>
208    ) -> Result<ShellExecutionResult> {
209        let mut results = Vec::new();
210        
211        // 发送开始执行所有流水线的日志
212        if let Some(callback) = &log_callback {
213            let event = OutputEvent {
214                pipeline_name: "system".to_string(),
215                server_name: "system".to_string(),
216                script_path :"".to_string(),
217                step: Step::default(), // 系统级别事件没有具体步骤
218                output_type: crate::models::OutputType::Log,
219                content: format!("=== 远程脚本执行器 ==="),
220                timestamp: std::time::Instant::now(),
221                variables: self.variable_manager.get_variables().clone(),
222            };
223            callback(event);
224            
225            let event = OutputEvent {
226                pipeline_name: "system".to_string(),
227                server_name: "system".to_string(),
228                script_path :"".to_string(),
229                step: Step::default(), // 系统级别事件没有具体步骤
230                output_type: crate::models::OutputType::Log,
231                content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
232                timestamp: std::time::Instant::now(),
233                variables: self.variable_manager.get_variables().clone(),
234            };
235            callback(event);
236            
237            let event = OutputEvent {
238                pipeline_name: "system".to_string(),
239                script_path :"".to_string(),
240                server_name: "system".to_string(),
241                step: Step::default(), // 系统级别事件没有具体步骤
242                output_type: crate::models::OutputType::Log,
243                content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
244                timestamp: std::time::Instant::now(),
245                variables: self.variable_manager.get_variables().clone(),
246            };
247            callback(event);
248        }
249        
250        // 按顺序执行每个流水线(串行)
251        let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
252        for pipeline_name in pipeline_names {
253            // 发送开始执行流水线的日志
254            if let Some(callback) = &log_callback {
255                let event = OutputEvent {
256                    pipeline_name: pipeline_name.clone(),
257                    server_name: "system".to_string(),
258                    script_path :"".to_string(),
259                    step: Step::default(), // 流水线开始事件没有具体的步骤
260                    output_type: crate::models::OutputType::Log,
261                    content: format!("开始执行流水线: {}", pipeline_name),
262                    timestamp: std::time::Instant::now(),
263                    variables: self.variable_manager.get_variables().clone(),
264                };
265                callback(event);
266            }
267            info!("Starting pipeline: {}", pipeline_name);
268
269            let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
270            let success = result.overall_success;
271            results.push(result);
272            if !success {
273                info!("Pipeline '{}' failed, stopping execution", pipeline_name);
274                break;
275            }
276            info!("Pipeline '{}' completed successfully", pipeline_name);
277        }
278        
279        Ok(ShellExecutionResult{
280            success: true,
281            reason: "ok".to_string(),
282            pipeline_results: results,
283        })
284    }
285
286    /// 执行指定的流水线(原有方法,保持兼容性)
287    pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
288        self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
289    }
290
291    /// 执行单个步骤(支持实时输出)
292    async fn execute_step_with_realtime_output(
293        &mut self,
294        script: Option<String>,
295        step: &Step,
296        pipeline_name: &str,
297        output_callback: Option<&OutputCallback>
298    ) -> Result<Vec<StepExecutionResult>> {
299        let start_time = std::time::Instant::now();
300        // Clone config at the start to avoid &self borrow conflicts
301        let config = self.config.clone();
302        let variable_manager = &mut self.variable_manager;
303        
304        // 检查是否有服务器配置
305        if step.servers.is_empty() {
306            // 本地执行
307            info!("Executing step: {} locally (no servers specified)", step.name);
308            let output_callback = output_callback.cloned();
309            let step_clone = step.clone();
310            let pipeline_name = pipeline_name.to_string();
311            let step_name = step.name.clone();
312            let mut variables = variable_manager.get_variables().clone();
313            variables.insert("pipeline_name".to_string(), pipeline_name.clone());
314            variables.insert("step_name".to_string(), step_name.clone());
315            let execution_result = LocalExecutor::execute_script_with_realtime_output(
316                script.clone(),
317                self.config.global_scripts.clone(),
318                &step_clone,
319                &pipeline_name,
320                &step_name,
321                output_callback,
322                variables,
323            ).await?;
324            let success = execution_result.success;
325            // 提取变量(如果有extract规则)
326            if let Some(extract_rules) = step.extract.clone() {
327                if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
328                    info!("Failed to extract variables from step '{}': {}", step.name, e);
329                }
330            }
331            let step_result = StepExecutionResult {
332                title: step.title.clone().unwrap_or(step.name.clone()),
333                step_name: step.name.clone(),
334                scritp_path:step.script.clone(),
335                server_name: "localhost".to_string(),
336                execution_result,
337                overall_success: success,
338                execution_time_ms: start_time.elapsed().as_millis() as u64,
339            };
340            return Ok(vec![step_result]);
341        }
342        
343        // 远程执行(原有逻辑)
344        info!("Executing step: {} on {} servers", step.name, step.servers.len());
345
346        let mut step_results = Vec::new();
347        let mut futures = Vec::new();
348        // 用于收集所有服务器提取到的变量 (变量名, 变量值)
349        let mut extracted_vars: Vec<(String, String)> = Vec::new();
350        let clone_variable_manager = variable_manager.clone();
351
352        // 为每个服务器创建执行任务
353        let server_names: Vec<String> = step.servers.clone();
354        let  global_script= Arc::new(self.config.global_scripts.clone());
355        let clone_global_script = global_script.clone();
356        for server_name in server_names {
357            if !config.clients.contains_key(&server_name) {
358                return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
359            }
360
361            // 克隆必要的数据以避免生命周期问题
362            let config = config.clone();
363            let step_name = step.name.clone();
364            let output_callback = output_callback.cloned();
365            let clone_step = step.clone();
366            let pipeline_name = pipeline_name.to_string();
367            let mut clone_variable_manager = clone_variable_manager.clone();
368            clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
369            clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
370            let script = script.clone();
371
372            let clone_global_script = clone_global_script.clone();
373
374            let future = tokio::spawn(async move {
375                // 创建新的执行器实例
376                let executor = RemoteExecutor { 
377                    config,
378                    variable_manager:clone_variable_manager,
379                };
380
381                match executor.execute_script_with_realtime_output(script,clone_global_script,&server_name, clone_step, &pipeline_name, output_callback).await {
382                    Ok(result) => {
383                        info!("Step '{}' on server '{}' completed with exit code: {}", 
384                              step_name, server_name, result.exit_code);
385                        Ok((server_name, result))
386                    }
387                    Err(e) => {
388                        error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
389                        Err(e)
390                    }
391                }
392
393            });
394
395            futures.push(future);
396        }
397
398        // 等待所有执行完成
399        let results = join_all(futures).await;
400        
401        for result in results {
402            match result {
403                Ok(Ok((server_name, execution_result))) => {
404                    let success = execution_result.success;
405                    // 提取变量(如果有extract规则)
406                    if let Some(extract_rules) = step.extract.clone() {
407                        // 提取变量到本地 map
408                        let mut temp_vm = VariableManager::new(None);
409                        if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
410                            info!("Failed to extract variables from step '{}': {}", step.name, e);
411                        } else {
412                            for (k, v) in temp_vm.get_variables() {
413                                extracted_vars.push((k.clone(), v.clone()));
414                            }
415                        }
416                    }
417                    
418                    step_results.push(StepExecutionResult {
419                        title: step.title.clone().unwrap_or(step.name.clone()),
420                        step_name: step.name.clone(),
421                        server_name,
422                        scritp_path:step.script.clone(),
423                        execution_result,
424                        overall_success: success,
425                        execution_time_ms: start_time.elapsed().as_millis() as u64,
426                    });
427                }
428                Ok(Err(e)) => {
429                    return Err(e);
430                }
431                Err(e) => {
432                    return Err(anyhow::anyhow!("Task execution failed: {}", e));
433                }
434            }
435        }
436        // 合并所有服务器提取到的变量到全局 variable_manager
437        for (k, v) in extracted_vars {
438            variable_manager.set_variable(k, v);
439        }
440
441        Ok(step_results)
442    }
443
444    /// 在指定客户端执行shell脚本(支持实时输出)
445    pub async fn execute_script_with_realtime_output(
446        &self, 
447        script: Option<String>,
448        global_scripts:Arc<Vec<String>>,
449        client_name: &str, 
450        step: Step,
451        pipeline_name: &str,
452        output_callback: Option<OutputCallback>
453    ) -> Result<ExecutionResult> {
454        // 检查脚本文件是否存在
455        let script_path = Path::new(step.script.as_str());
456        if !script_path.exists() {
457            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
458        }
459
460        let client_config = self.config
461            .clients
462            .get(client_name)
463            .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
464
465        match client_config.execution_method {
466            ExecutionMethod::SSH => {
467                self.execute_script_via_ssh_with_realtime_output(script,global_scripts,client_config, step, client_name, pipeline_name, output_callback).await
468            }
469            ExecutionMethod::WebSocket => {
470                Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
471            }
472        }
473    }
474
475    /// 通过SSH执行脚本(支持实时输出)
476    async fn execute_script_via_ssh_with_realtime_output(
477        &self, 
478        script: Option<String>,
479        global_scripts:Arc<Vec<String>>,
480        client_config: &ClientConfig, 
481        step: Step,
482        server_name: &str,
483        pipeline_name: &str,
484        output_callback: Option<OutputCallback>
485    ) -> Result<ExecutionResult> {
486        let ssh_config = client_config.ssh_config.as_ref()
487            .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
488
489        let start_time = std::time::Instant::now();
490
491        // 克隆数据以避免生命周期问题
492        let ssh_config = ssh_config.clone();
493        let script_content = step.script.to_string();
494        let server_name = server_name.to_string();
495        let pipeline_name = pipeline_name.to_string();
496        let step_name = step.name.clone();
497        let extract_rules = step.extract.clone();
498        let variable_manager = self.variable_manager.clone();
499
500        // 在tokio的阻塞线程池中执行SSH操作
501        let result = match tokio::task::spawn_blocking(move || {
502            SshExecutor::execute_script_with_realtime_output(
503                script.clone(),
504                global_scripts.clone(),
505                &server_name, 
506                &ssh_config, 
507                &step,
508                &pipeline_name,
509                &step_name,
510                output_callback,
511                variable_manager,
512                extract_rules
513            )
514        }).await?.context(format!("run script faield")) {
515            Ok(v) => v,
516            Err(e) => {
517
518                let execution_time = start_time.elapsed().as_millis() as u64;
519                return Ok(ExecutionResult{
520                    success: false,
521                    stdout: "".to_string(),
522                    stderr: format!("{:?}", e),
523                    script: script_content,
524                    exit_code: 0,
525                    execution_time_ms: execution_time,
526                    error_message: Some(format!("{:?}", e)),
527                });
528            }
529        };
530
531        let execution_time = start_time.elapsed().as_millis() as u64;
532
533        Ok(ExecutionResult {
534            success: result.exit_code == 0,
535            stdout: result.stdout,
536            stderr: result.stderr,
537            script: script_content,
538            exit_code: result.exit_code,
539            execution_time_ms: execution_time,
540            error_message: result.error_message,
541        })
542    }
543
544    /// 获取所有可用的客户端名称
545    pub fn get_available_clients(&self) -> Vec<String> {
546        self.config.clients.keys().cloned().collect()
547    }
548
549    /// 检查客户端是否存在
550    pub fn client_exists(&self, client_name: &str) -> bool {
551        self.config.clients.contains_key(client_name)
552    }
553
554    /// 获取所有可用的流水线名称
555    pub fn get_available_pipelines(&self) -> Vec<String> {
556        self.config.pipelines.iter().map(|p| p.name.clone()).collect()
557    }
558
559    /// 检查流水线是否存在
560    pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
561        self.config.pipelines.iter().any(|p| p.name == pipeline_name)
562    }
563}