// net_shell/executor/mod.rs

1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use tracing::{error, info};
6
7use crate::config::ConfigManager;
8use crate::models::{
9    ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult, 
10    RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
11};
12use crate::ssh::SshExecutor;
13use crate::ssh::local::LocalExecutor;
14use crate::vars::VariableManager;
15
16/// 远程执行器
17pub struct RemoteExecutor {
18    config: RemoteExecutionConfig,
19    variable_manager: VariableManager,
20}
21
22impl RemoteExecutor {
23
24
25    /// 从YAML文件创建执行器
26    pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
27        let content = std::fs::read_to_string(path)
28            .context("Failed to read YAML configuration file")?;
29        
30        Self::from_yaml_str(&content, variables)
31    }
32
33    /// 从YAML字符串创建执行器
34    pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
35        // 提取初始变量
36        let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
37
38        // 合并变量
39        let mut all_variables = HashMap::new();
40
41        if let Some(v) = initial_variables {
42            all_variables.extend(v);
43        }
44
45        if let Some(v) = variables {
46            all_variables.extend(v);
47        }
48        
49        // 创建变量管理器
50        let variable_manager = VariableManager::new(Some(all_variables));
51        
52        // 应用变量替换解析配置
53        let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
54        ConfigManager::validate_config(&config)?;
55        
56        Ok(Self { config, variable_manager})
57    }
58
59    /// 执行指定的流水线(支持实时输出)
60    pub async fn execute_pipeline_with_realtime_output(
61        &mut self, // 需要可变引用
62        pipeline_name: &str,
63        output_callback: Option<OutputCallback>,
64        log_callback: Option<OutputCallback>
65    ) -> Result<PipelineExecutionResult> {
66        let pipeline = self.config.pipelines.iter()
67            .find(|p| p.name == pipeline_name)
68            .cloned()
69            .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
70
71        let pipeline_name = pipeline.name.clone();
72        let steps: Vec<Step> = pipeline.steps.clone();
73        let start_time = std::time::Instant::now();
74        let mut all_step_results = Vec::new();
75
76        // 发送开始执行流水线的日志
77        if let Some(callback) = &log_callback {
78            let event = OutputEvent {
79                pipeline_name: pipeline_name.clone(),
80                server_name: "system".to_string(),
81                step: Step::default(), // 流水线开始事件没有具体的步骤
82                output_type: crate::models::OutputType::Log,
83                content: format!("开始执行流水线: {}", pipeline_name),
84                timestamp: std::time::Instant::now(),
85                variables: self.variable_manager.get_variables().clone(),
86            };
87            callback(event);
88        }
89
90        info!("Starting pipeline: {}", pipeline_name);
91
92        // 按顺序执行每个步骤(串行)
93        for step in steps {
94            // 合并 step 级变量到全局变量(优先级高)
95            let mut step_var_keys = Vec::new();
96            if let Some(vars) = &step.variables {
97                for (k, v) in vars {
98                    self.variable_manager.set_variable(k.clone(), v.clone());
99                    step_var_keys.push(k.clone());
100                }
101            }
102            // 对当前步骤应用变量替换
103            let mut step_with_variables = step.clone();
104            step_with_variables.script = self.variable_manager.replace_variables(&step.script);
105            
106            // 发送步骤开始事件
107            if let Some(callback) = &output_callback {
108                let event = OutputEvent {
109                    pipeline_name: pipeline_name.clone(),
110                    server_name: "system".to_string(),
111                    step: step.clone(), // 传递完整的Step对象
112                    output_type: crate::models::OutputType::StepStarted,
113                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
114                    timestamp: std::time::Instant::now(),
115                    variables: self.variable_manager.get_variables().clone(),
116                };
117                callback(event);
118            }
119            
120            // 发送开始执行步骤的日志
121            if let Some(callback) = &log_callback {
122                let event = OutputEvent {
123                    pipeline_name: pipeline_name.clone(),
124                    server_name: "system".to_string(),
125                    step: step.clone(), // 传递完整的Step对象
126                    output_type: crate::models::OutputType::Log,
127                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
128                    timestamp: std::time::Instant::now(),
129                    variables: self.variable_manager.get_variables().clone(),
130                };
131                callback(event);
132            }
133
134            info!("Starting step: {} on {} servers", step.name, step.servers.len());
135            
136            // 同一步骤内的所有服务器并发执行
137            let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
138            
139            // 检查步骤是否成功(所有服务器都成功才算成功)
140            let step_success = step_results.iter().all(|r| r.execution_result.success);
141            
142            // 添加步骤结果
143            all_step_results.extend(step_results);
144
145            // 发送步骤完成事件
146            if let Some(callback) = &output_callback {
147                let status = if step_success { "成功" } else { "失败" };
148                let event = OutputEvent {
149                    pipeline_name: pipeline_name.clone(),
150                    server_name: "system".to_string(),
151                    step: step.clone(), // 传递完整的Step对象
152                    output_type: crate::models::OutputType::StepCompleted,
153                    content: format!("步骤完成: {} ({})", step.name, status),
154                    timestamp: std::time::Instant::now(),
155                    variables: self.variable_manager.get_variables().clone(),
156                };
157                callback(event);
158            }
159
160            // 如果步骤失败,可以选择是否继续执行后续步骤
161            if !step_success {
162                info!("Step '{}' failed, stopping pipeline", step.name);
163                break;
164            }
165            
166            info!("Step '{}' completed successfully", step.name);
167        }
168
169        let total_time = start_time.elapsed().as_millis() as u64;
170        let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
171
172        // 发送流水线完成日志
173        if let Some(callback) = &log_callback {
174            let status = if overall_success { "成功" } else { "失败" };
175            let event = OutputEvent {
176                pipeline_name: pipeline_name.clone(),
177                server_name: "system".to_string(),
178                step: Step::default(), // 流水线完成事件没有具体的步骤
179                output_type: crate::models::OutputType::Log,
180                content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
181                timestamp: std::time::Instant::now(),
182                variables: self.variable_manager.get_variables().clone(),
183            };
184            callback(event);
185        }
186
187        Ok(PipelineExecutionResult {
188            pipeline_name: pipeline_name.clone(),
189            step_results: all_step_results,
190            overall_success,
191            total_execution_time_ms: total_time,
192        })
193    }
194
195    /// 执行所有流水线(支持实时输出)
196    pub async fn execute_all_pipelines_with_realtime_output(
197        &mut self, // 需要可变引用
198        output_callback: Option<OutputCallback>,
199        log_callback: Option<OutputCallback>
200    ) -> Result<Vec<PipelineExecutionResult>> {
201        let mut results = Vec::new();
202        
203        // 发送开始执行所有流水线的日志
204        if let Some(callback) = &log_callback {
205            let event = OutputEvent {
206                pipeline_name: "system".to_string(),
207                server_name: "system".to_string(),
208                step: Step::default(), // 系统级别事件没有具体步骤
209                output_type: crate::models::OutputType::Log,
210                content: format!("=== 远程脚本执行器 ==="),
211                timestamp: std::time::Instant::now(),
212                variables: self.variable_manager.get_variables().clone(),
213            };
214            callback(event);
215            
216            let event = OutputEvent {
217                pipeline_name: "system".to_string(),
218                server_name: "system".to_string(),
219                step: Step::default(), // 系统级别事件没有具体步骤
220                output_type: crate::models::OutputType::Log,
221                content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
222                timestamp: std::time::Instant::now(),
223                variables: self.variable_manager.get_variables().clone(),
224            };
225            callback(event);
226            
227            let event = OutputEvent {
228                pipeline_name: "system".to_string(),
229                server_name: "system".to_string(),
230                step: Step::default(), // 系统级别事件没有具体步骤
231                output_type: crate::models::OutputType::Log,
232                content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
233                timestamp: std::time::Instant::now(),
234                variables: self.variable_manager.get_variables().clone(),
235            };
236            callback(event);
237        }
238        
239        // 按顺序执行每个流水线(串行)
240        let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
241        for pipeline_name in pipeline_names {
242            // 发送开始执行流水线的日志
243            if let Some(callback) = &log_callback {
244                let event = OutputEvent {
245                    pipeline_name: pipeline_name.clone(),
246                    server_name: "system".to_string(),
247                    step: Step::default(), // 流水线开始事件没有具体的步骤
248                    output_type: crate::models::OutputType::Log,
249                    content: format!("开始执行流水线: {}", pipeline_name),
250                    timestamp: std::time::Instant::now(),
251                    variables: self.variable_manager.get_variables().clone(),
252                };
253                callback(event);
254            }
255            info!("Starting pipeline: {}", pipeline_name);
256            let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
257            let success = result.overall_success;
258            results.push(result);
259            if !success {
260                info!("Pipeline '{}' failed, stopping execution", pipeline_name);
261                break;
262            }
263            info!("Pipeline '{}' completed successfully", pipeline_name);
264        }
265        
266        Ok(results)
267    }
268
269    /// 执行指定的流水线(原有方法,保持兼容性)
270    pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
271        self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
272    }
273
274    /// 执行单个步骤(支持实时输出)
275    async fn execute_step_with_realtime_output(
276        &mut self,
277        step: &Step,
278        pipeline_name: &str,
279        output_callback: Option<&OutputCallback>
280    ) -> Result<Vec<StepExecutionResult>> {
281        let start_time = std::time::Instant::now();
282        // Clone config at the start to avoid &self borrow conflicts
283        let config = self.config.clone();
284        let variable_manager = &mut self.variable_manager;
285        
286        // 检查是否有服务器配置
287        if step.servers.is_empty() {
288            // 本地执行
289            info!("Executing step: {} locally (no servers specified)", step.name);
290            let output_callback = output_callback.cloned();
291            let step_clone = step.clone();
292            let pipeline_name = pipeline_name.to_string();
293            let step_name = step.name.clone();
294            let mut variables = variable_manager.get_variables().clone();
295            variables.insert("pipeline_name".to_string(), pipeline_name.clone());
296            variables.insert("step_name".to_string(), step_name.clone());
297            let execution_result = LocalExecutor::execute_script_with_realtime_output(
298                &step_clone,
299                &pipeline_name,
300                &step_name,
301                output_callback,
302                variables,
303            ).await?;
304            let success = execution_result.success;
305            // 提取变量(如果有extract规则)
306            if let Some(extract_rules) = step.extract.clone() {
307                if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
308                    info!("Failed to extract variables from step '{}': {}", step.name, e);
309                }
310            }
311            let step_result = StepExecutionResult {
312                step_name: step.name.clone(),
313                server_name: "localhost".to_string(),
314                execution_result,
315                overall_success: success,
316                execution_time_ms: start_time.elapsed().as_millis() as u64,
317            };
318            return Ok(vec![step_result]);
319        }
320        
321        // 远程执行(原有逻辑)
322        info!("Executing step: {} on {} servers", step.name, step.servers.len());
323
324        let mut step_results = Vec::new();
325        let mut futures = Vec::new();
326        // 用于收集所有服务器提取到的变量 (变量名, 变量值)
327        let mut extracted_vars: Vec<(String, String)> = Vec::new();
328        let clone_variable_manager = variable_manager.clone();
329
330        // 为每个服务器创建执行任务
331        let server_names: Vec<String> = step.servers.clone();
332        for server_name in server_names {
333            if !config.clients.contains_key(&server_name) {
334                return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
335            }
336
337            // 克隆必要的数据以避免生命周期问题
338            let config = config.clone();
339            let step_name = step.name.clone();
340            let output_callback = output_callback.cloned();
341            let clone_step = step.clone();
342            let pipeline_name = pipeline_name.to_string();
343            let mut clone_variable_manager = clone_variable_manager.clone();
344            clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
345            clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
346
347            let future = tokio::spawn(async move {
348                // 创建新的执行器实例
349                let executor = RemoteExecutor { 
350                    config,
351                    variable_manager:clone_variable_manager,
352                };
353
354                match executor.execute_script_with_realtime_output(&server_name, clone_step, &pipeline_name, output_callback).await {
355                    Ok(result) => {
356                        info!("Step '{}' on server '{}' completed with exit code: {}", 
357                              step_name, server_name, result.exit_code);
358                        Ok((server_name, result))
359                    }
360                    Err(e) => {
361                        error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
362                        Err(e)
363                    }
364                }
365
366            });
367
368            futures.push(future);
369        }
370
371        // 等待所有执行完成
372        let results = join_all(futures).await;
373        
374        for result in results {
375            match result {
376                Ok(Ok((server_name, execution_result))) => {
377                    let success = execution_result.success;
378                    // 提取变量(如果有extract规则)
379                    if let Some(extract_rules) = step.extract.clone() {
380                        // 提取变量到本地 map
381                        let mut temp_vm = VariableManager::new(None);
382                        if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
383                            info!("Failed to extract variables from step '{}': {}", step.name, e);
384                        } else {
385                            for (k, v) in temp_vm.get_variables() {
386                                extracted_vars.push((k.clone(), v.clone()));
387                            }
388                        }
389                    }
390                    
391                    step_results.push(StepExecutionResult {
392                        step_name: step.name.clone(),
393                        server_name,
394                        execution_result,
395                        overall_success: success,
396                        execution_time_ms: start_time.elapsed().as_millis() as u64,
397                    });
398                }
399                Ok(Err(e)) => {
400                    return Err(e);
401                }
402                Err(e) => {
403                    return Err(anyhow::anyhow!("Task execution failed: {}", e));
404                }
405            }
406        }
407        // 合并所有服务器提取到的变量到全局 variable_manager
408        for (k, v) in extracted_vars {
409            variable_manager.set_variable(k, v);
410        }
411
412        Ok(step_results)
413    }
414
415    /// 执行单个步骤(原有方法,保持兼容性)
416    async fn execute_step(&mut self, step: &Step) -> Result<Vec<StepExecutionResult>> {
417        self.execute_step_with_realtime_output(step, "unknown", None).await
418    }
419
420    /// 在指定客户端执行shell脚本(支持实时输出)
421    pub async fn execute_script_with_realtime_output(
422        &self, 
423        client_name: &str, 
424        step: Step,
425        pipeline_name: &str,
426        output_callback: Option<OutputCallback>
427    ) -> Result<ExecutionResult> {
428        // 检查脚本文件是否存在
429        let script_path = Path::new(step.script.as_str());
430        if !script_path.exists() {
431            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
432        }
433
434        let client_config = self.config
435            .clients
436            .get(client_name)
437            .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
438
439        match client_config.execution_method {
440            ExecutionMethod::SSH => {
441                self.execute_script_via_ssh_with_realtime_output(client_config, step, client_name, pipeline_name, output_callback).await
442            }
443            ExecutionMethod::WebSocket => {
444                Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
445            }
446        }
447    }
448
449    /// 通过SSH执行脚本(支持实时输出)
450    async fn execute_script_via_ssh_with_realtime_output(
451        &self, 
452        client_config: &ClientConfig, 
453        step: Step,
454        server_name: &str,
455        pipeline_name: &str,
456        output_callback: Option<OutputCallback>
457    ) -> Result<ExecutionResult> {
458        let ssh_config = client_config.ssh_config.as_ref()
459            .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
460
461        let start_time = std::time::Instant::now();
462
463        // 克隆数据以避免生命周期问题
464        let ssh_config = ssh_config.clone();
465        let script_content = step.script.to_string();
466        let server_name = server_name.to_string();
467        let pipeline_name = pipeline_name.to_string();
468        let step_name = step.name.clone();
469        let extract_rules = step.extract.clone();
470        let variable_manager = self.variable_manager.clone();
471        let clone_ssh_config = ssh_config.clone();
472
473        // 在tokio的阻塞线程池中执行SSH操作
474        let result = tokio::task::spawn_blocking(move || {
475            SshExecutor::execute_script_with_realtime_output(
476                &server_name, 
477                &ssh_config, 
478                &step,
479                &pipeline_name,
480                &step_name,
481                output_callback,
482                variable_manager,
483                extract_rules
484            )
485        }).await?.context(format!("{:#?}", clone_ssh_config))?;
486
487        let execution_time = start_time.elapsed().as_millis() as u64;
488
489        Ok(ExecutionResult {
490            success: result.exit_code == 0,
491            stdout: result.stdout,
492            stderr: result.stderr,
493            script: script_content,
494            exit_code: result.exit_code,
495            execution_time_ms: execution_time,
496            error_message: result.error_message,
497        })
498    }
499
500    /// 获取所有可用的客户端名称
501    pub fn get_available_clients(&self) -> Vec<String> {
502        self.config.clients.keys().cloned().collect()
503    }
504
505    /// 检查客户端是否存在
506    pub fn client_exists(&self, client_name: &str) -> bool {
507        self.config.clients.contains_key(client_name)
508    }
509
510    /// 获取所有可用的流水线名称
511    pub fn get_available_pipelines(&self) -> Vec<String> {
512        self.config.pipelines.iter().map(|p| p.name.clone()).collect()
513    }
514
515    /// 检查流水线是否存在
516    pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
517        self.config.pipelines.iter().any(|p| p.name == pipeline_name)
518    }
519}