net_shell/executor/
mod.rs

1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use tracing::{error, info};
6
7use crate::config::ConfigManager;
8use crate::models::{
9    ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult, 
10    RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
11};
12use crate::ssh::SshExecutor;
13use crate::ssh::local::LocalExecutor;
14use crate::vars::VariableManager;
15use crate::ShellExecutionResult;
16
17/// 远程执行器
18pub struct RemoteExecutor {
19    config: RemoteExecutionConfig,
20    variable_manager: VariableManager,
21}
22
23impl RemoteExecutor {
24
25
26    /// 从YAML文件创建执行器
27    pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
28        let content = std::fs::read_to_string(path)
29            .context("Failed to read YAML configuration file")?;
30        
31        Self::from_yaml_str(&content, variables)
32    }
33
34    /// 从YAML字符串创建执行器
35    pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
36        // 提取初始变量
37        let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
38
39        // 合并变量
40        let mut all_variables = HashMap::new();
41
42        if let Some(v) = initial_variables {
43            all_variables.extend(v);
44        }
45
46        if let Some(v) = variables {
47            all_variables.extend(v);
48        }
49        
50        // 创建变量管理器
51        let variable_manager = VariableManager::new(Some(all_variables));
52        
53        // 应用变量替换解析配置
54        let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
55        ConfigManager::validate_config(&config)?;
56        
57        Ok(Self { config, variable_manager})
58    }
59
60    /// 执行指定的流水线(支持实时输出)
61    pub async fn execute_pipeline_with_realtime_output(
62        &mut self, // 需要可变引用
63        pipeline_name: &str,
64        output_callback: Option<OutputCallback>,
65        log_callback: Option<OutputCallback>
66    ) -> Result<PipelineExecutionResult> {
67        let pipeline = self.config.pipelines.iter()
68            .find(|p| p.name == pipeline_name)
69            .cloned()
70            .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
71
72        let pipeline_name = pipeline.name.clone();
73        let steps: Vec<Step> = pipeline.steps.clone();
74        let start_time = std::time::Instant::now();
75        let mut all_step_results = Vec::new();
76
77        // 发送开始执行流水线的日志
78        if let Some(callback) = &log_callback {
79            let event = OutputEvent {
80                pipeline_name: pipeline_name.clone(),
81                server_name: "system".to_string(),
82                step: Step::default(), // 流水线开始事件没有具体的步骤
83                output_type: crate::models::OutputType::Log,
84                content: format!("开始执行流水线: {}", pipeline_name),
85                timestamp: std::time::Instant::now(),
86                variables: self.variable_manager.get_variables().clone(),
87            };
88            callback(event);
89        }
90
91        info!("Starting pipeline: {}", pipeline_name);
92
93        // 按顺序执行每个步骤(串行)
94        for step in steps {
95            // 合并 step 级变量到全局变量(优先级高)
96            let mut step_var_keys = Vec::new();
97            if let Some(vars) = &step.variables {
98                for (k, v) in vars {
99                    self.variable_manager.set_variable(k.clone(), v.clone());
100                    step_var_keys.push(k.clone());
101                }
102            }
103            // 对当前步骤应用变量替换
104            let mut step_with_variables = step.clone();
105            step_with_variables.script = self.variable_manager.replace_variables(&step.script);
106            
107            // 发送步骤开始事件
108            if let Some(callback) = &output_callback {
109                let event = OutputEvent {
110                    pipeline_name: pipeline_name.clone(),
111                    server_name: "system".to_string(),
112                    step: step.clone(), // 传递完整的Step对象
113                    output_type: crate::models::OutputType::StepStarted,
114                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
115                    timestamp: std::time::Instant::now(),
116                    variables: self.variable_manager.get_variables().clone(),
117                };
118                callback(event);
119            }
120            
121            // 发送开始执行步骤的日志
122            if let Some(callback) = &log_callback {
123                let event = OutputEvent {
124                    pipeline_name: pipeline_name.clone(),
125                    server_name: "system".to_string(),
126                    step: step.clone(), // 传递完整的Step对象
127                    output_type: crate::models::OutputType::Log,
128                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
129                    timestamp: std::time::Instant::now(),
130                    variables: self.variable_manager.get_variables().clone(),
131                };
132                callback(event);
133            }
134
135            info!("Starting step: {} on {} servers", step.name, step.servers.len());
136            
137            // 同一步骤内的所有服务器并发执行
138            let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
139            
140            // 检查步骤是否成功(所有服务器都成功才算成功)
141            let step_success = step_results.iter().all(|r| r.execution_result.success);
142            
143            // 添加步骤结果
144            all_step_results.extend(step_results);
145
146            // 发送步骤完成事件
147            if let Some(callback) = &output_callback {
148                let status = if step_success { "成功" } else { "失败" };
149                let event = OutputEvent {
150                    pipeline_name: pipeline_name.clone(),
151                    server_name: "system".to_string(),
152                    step: step.clone(), // 传递完整的Step对象
153                    output_type: crate::models::OutputType::StepCompleted,
154                    content: format!("步骤完成: {} ({})", step.name, status),
155                    timestamp: std::time::Instant::now(),
156                    variables: self.variable_manager.get_variables().clone(),
157                };
158                callback(event);
159            }
160
161            // 如果步骤失败,可以选择是否继续执行后续步骤
162            if !step_success {
163                info!("Step '{}' failed, stopping pipeline", step.name);
164                break;
165            }
166            
167            info!("Step '{}' completed successfully", step.name);
168        }
169
170        let total_time = start_time.elapsed().as_millis() as u64;
171        let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
172
173        // 发送流水线完成日志
174        if let Some(callback) = &log_callback {
175            let status = if overall_success { "成功" } else { "失败" };
176            let event = OutputEvent {
177                pipeline_name: pipeline_name.clone(),
178                server_name: "system".to_string(),
179                step: Step::default(), // 流水线完成事件没有具体的步骤
180                output_type: crate::models::OutputType::Log,
181                content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
182                timestamp: std::time::Instant::now(),
183                variables: self.variable_manager.get_variables().clone(),
184            };
185            callback(event);
186        }
187
188        Ok(PipelineExecutionResult {
189            title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
190            pipeline_name: pipeline_name.clone(),
191            step_results: all_step_results,
192            overall_success,
193            total_execution_time_ms: total_time,
194        })
195    }
196
197    /// 执行所有流水线(支持实时输出)
198    pub async fn execute_all_pipelines_with_realtime_output(
199        &mut self, // 需要可变引用
200        output_callback: Option<OutputCallback>,
201        log_callback: Option<OutputCallback>
202    ) -> Result<ShellExecutionResult> {
203        let mut results = Vec::new();
204        
205        // 发送开始执行所有流水线的日志
206        if let Some(callback) = &log_callback {
207            let event = OutputEvent {
208                pipeline_name: "system".to_string(),
209                server_name: "system".to_string(),
210                step: Step::default(), // 系统级别事件没有具体步骤
211                output_type: crate::models::OutputType::Log,
212                content: format!("=== 远程脚本执行器 ==="),
213                timestamp: std::time::Instant::now(),
214                variables: self.variable_manager.get_variables().clone(),
215            };
216            callback(event);
217            
218            let event = OutputEvent {
219                pipeline_name: "system".to_string(),
220                server_name: "system".to_string(),
221                step: Step::default(), // 系统级别事件没有具体步骤
222                output_type: crate::models::OutputType::Log,
223                content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
224                timestamp: std::time::Instant::now(),
225                variables: self.variable_manager.get_variables().clone(),
226            };
227            callback(event);
228            
229            let event = OutputEvent {
230                pipeline_name: "system".to_string(),
231                server_name: "system".to_string(),
232                step: Step::default(), // 系统级别事件没有具体步骤
233                output_type: crate::models::OutputType::Log,
234                content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
235                timestamp: std::time::Instant::now(),
236                variables: self.variable_manager.get_variables().clone(),
237            };
238            callback(event);
239        }
240        
241        // 按顺序执行每个流水线(串行)
242        let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
243        for pipeline_name in pipeline_names {
244            // 发送开始执行流水线的日志
245            if let Some(callback) = &log_callback {
246                let event = OutputEvent {
247                    pipeline_name: pipeline_name.clone(),
248                    server_name: "system".to_string(),
249                    step: Step::default(), // 流水线开始事件没有具体的步骤
250                    output_type: crate::models::OutputType::Log,
251                    content: format!("开始执行流水线: {}", pipeline_name),
252                    timestamp: std::time::Instant::now(),
253                    variables: self.variable_manager.get_variables().clone(),
254                };
255                callback(event);
256            }
257            info!("Starting pipeline: {}", pipeline_name);
258
259            let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
260            let success = result.overall_success;
261            results.push(result);
262            if !success {
263                info!("Pipeline '{}' failed, stopping execution", pipeline_name);
264                break;
265            }
266            info!("Pipeline '{}' completed successfully", pipeline_name);
267        }
268        
269        Ok(ShellExecutionResult{
270            success: true,
271            reason: "ok".to_string(),
272            pipeline_results: results,
273        })
274    }
275
276    /// 执行指定的流水线(原有方法,保持兼容性)
277    pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
278        self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
279    }
280
281    /// 执行单个步骤(支持实时输出)
282    async fn execute_step_with_realtime_output(
283        &mut self,
284        step: &Step,
285        pipeline_name: &str,
286        output_callback: Option<&OutputCallback>
287    ) -> Result<Vec<StepExecutionResult>> {
288        let start_time = std::time::Instant::now();
289        // Clone config at the start to avoid &self borrow conflicts
290        let config = self.config.clone();
291        let variable_manager = &mut self.variable_manager;
292        
293        // 检查是否有服务器配置
294        if step.servers.is_empty() {
295            // 本地执行
296            info!("Executing step: {} locally (no servers specified)", step.name);
297            let output_callback = output_callback.cloned();
298            let step_clone = step.clone();
299            let pipeline_name = pipeline_name.to_string();
300            let step_name = step.name.clone();
301            let mut variables = variable_manager.get_variables().clone();
302            variables.insert("pipeline_name".to_string(), pipeline_name.clone());
303            variables.insert("step_name".to_string(), step_name.clone());
304            let execution_result = LocalExecutor::execute_script_with_realtime_output(
305                &step_clone,
306                &pipeline_name,
307                &step_name,
308                output_callback,
309                variables,
310            ).await?;
311            let success = execution_result.success;
312            // 提取变量(如果有extract规则)
313            if let Some(extract_rules) = step.extract.clone() {
314                if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
315                    info!("Failed to extract variables from step '{}': {}", step.name, e);
316                }
317            }
318            let step_result = StepExecutionResult {
319                title: step.title.clone().unwrap_or(step.name.clone()),
320                step_name: step.name.clone(),
321                server_name: "localhost".to_string(),
322                execution_result,
323                overall_success: success,
324                execution_time_ms: start_time.elapsed().as_millis() as u64,
325            };
326            return Ok(vec![step_result]);
327        }
328        
329        // 远程执行(原有逻辑)
330        info!("Executing step: {} on {} servers", step.name, step.servers.len());
331
332        let mut step_results = Vec::new();
333        let mut futures = Vec::new();
334        // 用于收集所有服务器提取到的变量 (变量名, 变量值)
335        let mut extracted_vars: Vec<(String, String)> = Vec::new();
336        let clone_variable_manager = variable_manager.clone();
337
338        // 为每个服务器创建执行任务
339        let server_names: Vec<String> = step.servers.clone();
340        for server_name in server_names {
341            if !config.clients.contains_key(&server_name) {
342                return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
343            }
344
345            // 克隆必要的数据以避免生命周期问题
346            let config = config.clone();
347            let step_name = step.name.clone();
348            let output_callback = output_callback.cloned();
349            let clone_step = step.clone();
350            let pipeline_name = pipeline_name.to_string();
351            let mut clone_variable_manager = clone_variable_manager.clone();
352            clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
353            clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
354
355            let future = tokio::spawn(async move {
356                // 创建新的执行器实例
357                let executor = RemoteExecutor { 
358                    config,
359                    variable_manager:clone_variable_manager,
360                };
361
362                match executor.execute_script_with_realtime_output(&server_name, clone_step, &pipeline_name, output_callback).await {
363                    Ok(result) => {
364                        info!("Step '{}' on server '{}' completed with exit code: {}", 
365                              step_name, server_name, result.exit_code);
366                        Ok((server_name, result))
367                    }
368                    Err(e) => {
369                        error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
370                        Err(e)
371                    }
372                }
373
374            });
375
376            futures.push(future);
377        }
378
379        // 等待所有执行完成
380        let results = join_all(futures).await;
381        
382        for result in results {
383            match result {
384                Ok(Ok((server_name, execution_result))) => {
385                    let success = execution_result.success;
386                    // 提取变量(如果有extract规则)
387                    if let Some(extract_rules) = step.extract.clone() {
388                        // 提取变量到本地 map
389                        let mut temp_vm = VariableManager::new(None);
390                        if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
391                            info!("Failed to extract variables from step '{}': {}", step.name, e);
392                        } else {
393                            for (k, v) in temp_vm.get_variables() {
394                                extracted_vars.push((k.clone(), v.clone()));
395                            }
396                        }
397                    }
398                    
399                    step_results.push(StepExecutionResult {
400                        title: step.title.clone().unwrap_or(step.name.clone()),
401                        step_name: step.name.clone(),
402                        server_name,
403                        execution_result,
404                        overall_success: success,
405                        execution_time_ms: start_time.elapsed().as_millis() as u64,
406                    });
407                }
408                Ok(Err(e)) => {
409                    return Err(e);
410                }
411                Err(e) => {
412                    return Err(anyhow::anyhow!("Task execution failed: {}", e));
413                }
414            }
415        }
416        // 合并所有服务器提取到的变量到全局 variable_manager
417        for (k, v) in extracted_vars {
418            variable_manager.set_variable(k, v);
419        }
420
421        Ok(step_results)
422    }
423
424    /// 执行单个步骤(原有方法,保持兼容性)
425    async fn execute_step(&mut self, step: &Step) -> Result<Vec<StepExecutionResult>> {
426        self.execute_step_with_realtime_output(step, "unknown", None).await
427    }
428
429    /// 在指定客户端执行shell脚本(支持实时输出)
430    pub async fn execute_script_with_realtime_output(
431        &self, 
432        client_name: &str, 
433        step: Step,
434        pipeline_name: &str,
435        output_callback: Option<OutputCallback>
436    ) -> Result<ExecutionResult> {
437        // 检查脚本文件是否存在
438        let script_path = Path::new(step.script.as_str());
439        if !script_path.exists() {
440            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
441        }
442
443        let client_config = self.config
444            .clients
445            .get(client_name)
446            .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
447
448        match client_config.execution_method {
449            ExecutionMethod::SSH => {
450                self.execute_script_via_ssh_with_realtime_output(client_config, step, client_name, pipeline_name, output_callback).await
451            }
452            ExecutionMethod::WebSocket => {
453                Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
454            }
455        }
456    }
457
458    /// 通过SSH执行脚本(支持实时输出)
459    async fn execute_script_via_ssh_with_realtime_output(
460        &self, 
461        client_config: &ClientConfig, 
462        step: Step,
463        server_name: &str,
464        pipeline_name: &str,
465        output_callback: Option<OutputCallback>
466    ) -> Result<ExecutionResult> {
467        let ssh_config = client_config.ssh_config.as_ref()
468            .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
469
470        let start_time = std::time::Instant::now();
471
472        // 克隆数据以避免生命周期问题
473        let ssh_config = ssh_config.clone();
474        let script_content = step.script.to_string();
475        let server_name = server_name.to_string();
476        let pipeline_name = pipeline_name.to_string();
477        let step_name = step.name.clone();
478        let extract_rules = step.extract.clone();
479        let variable_manager = self.variable_manager.clone();
480        let clone_ssh_config = ssh_config.clone();
481
482        // 在tokio的阻塞线程池中执行SSH操作
483        let result = match tokio::task::spawn_blocking(move || {
484            SshExecutor::execute_script_with_realtime_output(
485                &server_name, 
486                &ssh_config, 
487                &step,
488                &pipeline_name,
489                &step_name,
490                output_callback,
491                variable_manager,
492                extract_rules
493            )
494        }).await?.context(format!("run script faield")) {
495            Ok(v) => v,
496            Err(e) => {
497
498                let execution_time = start_time.elapsed().as_millis() as u64;
499                return Ok(ExecutionResult{
500                    success: false,
501                    stdout: "".to_string(),
502                    stderr: format!("{:?}", e),
503                    script: script_content,
504                    exit_code: 0,
505                    execution_time_ms: execution_time,
506                    error_message: Some(format!("{:?}", e)),
507                });
508            }
509        };
510
511        let execution_time = start_time.elapsed().as_millis() as u64;
512
513        Ok(ExecutionResult {
514            success: result.exit_code == 0,
515            stdout: result.stdout,
516            stderr: result.stderr,
517            script: script_content,
518            exit_code: result.exit_code,
519            execution_time_ms: execution_time,
520            error_message: result.error_message,
521        })
522    }
523
524    /// 获取所有可用的客户端名称
525    pub fn get_available_clients(&self) -> Vec<String> {
526        self.config.clients.keys().cloned().collect()
527    }
528
529    /// 检查客户端是否存在
530    pub fn client_exists(&self, client_name: &str) -> bool {
531        self.config.clients.contains_key(client_name)
532    }
533
534    /// 获取所有可用的流水线名称
535    pub fn get_available_pipelines(&self) -> Vec<String> {
536        self.config.pipelines.iter().map(|p| p.name.clone()).collect()
537    }
538
539    /// 检查流水线是否存在
540    pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
541        self.config.pipelines.iter().any(|p| p.name == pipeline_name)
542    }
543}