net_shell/executor/
mod.rs

1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use tracing::{error, info};
6
7use crate::config::ConfigManager;
8use crate::models::{
9    ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult, 
10    RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
11};
12use crate::ssh::SshExecutor;
13use crate::ssh::local::LocalExecutor;
14use crate::vars::VariableManager;
15
16/// 远程执行器
17pub struct RemoteExecutor {
18    config: RemoteExecutionConfig,
19    variable_manager: VariableManager,
20}
21
22impl RemoteExecutor {
23
24
25    /// 从YAML文件创建执行器
26    pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
27        let content = std::fs::read_to_string(path)
28            .context("Failed to read YAML configuration file")?;
29        
30        Self::from_yaml_str(&content, variables)
31    }
32
33    /// 从YAML字符串创建执行器
34    pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
35        // 提取初始变量
36        let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
37
38        // 合并变量
39        let mut all_variables = HashMap::new();
40
41        if let Some(v) = initial_variables {
42            all_variables.extend(v);
43        }
44
45        if let Some(v) = variables {
46            all_variables.extend(v);
47        }
48        
49        // 创建变量管理器
50        let variable_manager = VariableManager::new(Some(all_variables));
51        
52        // 应用变量替换解析配置
53        let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
54        ConfigManager::validate_config(&config)?;
55        
56        Ok(Self { config, variable_manager})
57    }
58
59    /// 执行指定的流水线(支持实时输出)
60    pub async fn execute_pipeline_with_realtime_output(
61        &mut self, // 需要可变引用
62        pipeline_name: &str,
63        output_callback: Option<OutputCallback>,
64        log_callback: Option<OutputCallback>
65    ) -> Result<PipelineExecutionResult> {
66        let pipeline = self.config.pipelines.iter()
67            .find(|p| p.name == pipeline_name)
68            .cloned()
69            .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
70
71        let pipeline_name = pipeline.name.clone();
72        let steps: Vec<Step> = pipeline.steps.clone();
73        let start_time = std::time::Instant::now();
74        let mut all_step_results = Vec::new();
75
76        // 发送开始执行流水线的日志
77        if let Some(callback) = &log_callback {
78            let event = OutputEvent {
79                pipeline_name: pipeline_name.clone(),
80                server_name: "system".to_string(),
81                step: Step::default(), // 流水线开始事件没有具体的步骤
82                output_type: crate::models::OutputType::Log,
83                content: format!("开始执行流水线: {}", pipeline_name),
84                timestamp: std::time::Instant::now(),
85                variables: self.variable_manager.get_variables().clone(),
86            };
87            callback(event);
88        }
89
90        info!("Starting pipeline: {}", pipeline_name);
91
92        // 按顺序执行每个步骤(串行)
93        for step in steps {
94            // 合并 step 级变量到全局变量(优先级高)
95            let mut step_var_keys = Vec::new();
96            if let Some(vars) = &step.variables {
97                for (k, v) in vars {
98                    self.variable_manager.set_variable(k.clone(), v.clone());
99                    step_var_keys.push(k.clone());
100                }
101            }
102            // 对当前步骤应用变量替换
103            let mut step_with_variables = step.clone();
104            step_with_variables.script = self.variable_manager.replace_variables(&step.script);
105            
106            // 发送步骤开始事件
107            if let Some(callback) = &output_callback {
108                let event = OutputEvent {
109                    pipeline_name: pipeline_name.clone(),
110                    server_name: "system".to_string(),
111                    step: step.clone(), // 传递完整的Step对象
112                    output_type: crate::models::OutputType::StepStarted,
113                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
114                    timestamp: std::time::Instant::now(),
115                    variables: self.variable_manager.get_variables().clone(),
116                };
117                callback(event);
118            }
119            
120            // 发送开始执行步骤的日志
121            if let Some(callback) = &log_callback {
122                let event = OutputEvent {
123                    pipeline_name: pipeline_name.clone(),
124                    server_name: "system".to_string(),
125                    step: step.clone(), // 传递完整的Step对象
126                    output_type: crate::models::OutputType::Log,
127                    content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
128                    timestamp: std::time::Instant::now(),
129                    variables: self.variable_manager.get_variables().clone(),
130                };
131                callback(event);
132            }
133
134            info!("Starting step: {} on {} servers", step.name, step.servers.len());
135            
136            // 同一步骤内的所有服务器并发执行
137            let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
138            
139            // 检查步骤是否成功(所有服务器都成功才算成功)
140            let step_success = step_results.iter().all(|r| r.execution_result.success);
141            
142            // 添加步骤结果
143            all_step_results.extend(step_results);
144
145            // 发送步骤完成事件
146            if let Some(callback) = &output_callback {
147                let status = if step_success { "成功" } else { "失败" };
148                let event = OutputEvent {
149                    pipeline_name: pipeline_name.clone(),
150                    server_name: "system".to_string(),
151                    step: step.clone(), // 传递完整的Step对象
152                    output_type: crate::models::OutputType::StepCompleted,
153                    content: format!("步骤完成: {} ({})", step.name, status),
154                    timestamp: std::time::Instant::now(),
155                    variables: self.variable_manager.get_variables().clone(),
156                };
157                callback(event);
158            }
159
160            // 如果步骤失败,可以选择是否继续执行后续步骤
161            if !step_success {
162                info!("Step '{}' failed, stopping pipeline", step.name);
163                break;
164            }
165            
166            info!("Step '{}' completed successfully", step.name);
167        }
168
169        let total_time = start_time.elapsed().as_millis() as u64;
170        let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
171
172        // 发送流水线完成日志
173        if let Some(callback) = &log_callback {
174            let status = if overall_success { "成功" } else { "失败" };
175            let event = OutputEvent {
176                pipeline_name: pipeline_name.clone(),
177                server_name: "system".to_string(),
178                step: Step::default(), // 流水线完成事件没有具体的步骤
179                output_type: crate::models::OutputType::Log,
180                content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
181                timestamp: std::time::Instant::now(),
182                variables: self.variable_manager.get_variables().clone(),
183            };
184            callback(event);
185        }
186
187        Ok(PipelineExecutionResult {
188            title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
189            pipeline_name: pipeline_name.clone(),
190            step_results: all_step_results,
191            overall_success,
192            total_execution_time_ms: total_time,
193        })
194    }
195
196    /// 执行所有流水线(支持实时输出)
197    pub async fn execute_all_pipelines_with_realtime_output(
198        &mut self, // 需要可变引用
199        output_callback: Option<OutputCallback>,
200        log_callback: Option<OutputCallback>
201    ) -> Result<Vec<PipelineExecutionResult>> {
202        let mut results = Vec::new();
203        
204        // 发送开始执行所有流水线的日志
205        if let Some(callback) = &log_callback {
206            let event = OutputEvent {
207                pipeline_name: "system".to_string(),
208                server_name: "system".to_string(),
209                step: Step::default(), // 系统级别事件没有具体步骤
210                output_type: crate::models::OutputType::Log,
211                content: format!("=== 远程脚本执行器 ==="),
212                timestamp: std::time::Instant::now(),
213                variables: self.variable_manager.get_variables().clone(),
214            };
215            callback(event);
216            
217            let event = OutputEvent {
218                pipeline_name: "system".to_string(),
219                server_name: "system".to_string(),
220                step: Step::default(), // 系统级别事件没有具体步骤
221                output_type: crate::models::OutputType::Log,
222                content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
223                timestamp: std::time::Instant::now(),
224                variables: self.variable_manager.get_variables().clone(),
225            };
226            callback(event);
227            
228            let event = OutputEvent {
229                pipeline_name: "system".to_string(),
230                server_name: "system".to_string(),
231                step: Step::default(), // 系统级别事件没有具体步骤
232                output_type: crate::models::OutputType::Log,
233                content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
234                timestamp: std::time::Instant::now(),
235                variables: self.variable_manager.get_variables().clone(),
236            };
237            callback(event);
238        }
239        
240        // 按顺序执行每个流水线(串行)
241        let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
242        for pipeline_name in pipeline_names {
243            // 发送开始执行流水线的日志
244            if let Some(callback) = &log_callback {
245                let event = OutputEvent {
246                    pipeline_name: pipeline_name.clone(),
247                    server_name: "system".to_string(),
248                    step: Step::default(), // 流水线开始事件没有具体的步骤
249                    output_type: crate::models::OutputType::Log,
250                    content: format!("开始执行流水线: {}", pipeline_name),
251                    timestamp: std::time::Instant::now(),
252                    variables: self.variable_manager.get_variables().clone(),
253                };
254                callback(event);
255            }
256            info!("Starting pipeline: {}", pipeline_name);
257            let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
258            let success = result.overall_success;
259            results.push(result);
260            if !success {
261                info!("Pipeline '{}' failed, stopping execution", pipeline_name);
262                break;
263            }
264            info!("Pipeline '{}' completed successfully", pipeline_name);
265        }
266        
267        Ok(results)
268    }
269
270    /// 执行指定的流水线(原有方法,保持兼容性)
271    pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
272        self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
273    }
274
275    /// 执行单个步骤(支持实时输出)
276    async fn execute_step_with_realtime_output(
277        &mut self,
278        step: &Step,
279        pipeline_name: &str,
280        output_callback: Option<&OutputCallback>
281    ) -> Result<Vec<StepExecutionResult>> {
282        let start_time = std::time::Instant::now();
283        // Clone config at the start to avoid &self borrow conflicts
284        let config = self.config.clone();
285        let variable_manager = &mut self.variable_manager;
286        
287        // 检查是否有服务器配置
288        if step.servers.is_empty() {
289            // 本地执行
290            info!("Executing step: {} locally (no servers specified)", step.name);
291            let output_callback = output_callback.cloned();
292            let step_clone = step.clone();
293            let pipeline_name = pipeline_name.to_string();
294            let step_name = step.name.clone();
295            let mut variables = variable_manager.get_variables().clone();
296            variables.insert("pipeline_name".to_string(), pipeline_name.clone());
297            variables.insert("step_name".to_string(), step_name.clone());
298            let execution_result = LocalExecutor::execute_script_with_realtime_output(
299                &step_clone,
300                &pipeline_name,
301                &step_name,
302                output_callback,
303                variables,
304            ).await?;
305            let success = execution_result.success;
306            // 提取变量(如果有extract规则)
307            if let Some(extract_rules) = step.extract.clone() {
308                if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
309                    info!("Failed to extract variables from step '{}': {}", step.name, e);
310                }
311            }
312            let step_result = StepExecutionResult {
313                title: step.title.clone().unwrap_or(step.name.clone()),
314                step_name: step.name.clone(),
315                server_name: "localhost".to_string(),
316                execution_result,
317                overall_success: success,
318                execution_time_ms: start_time.elapsed().as_millis() as u64,
319            };
320            return Ok(vec![step_result]);
321        }
322        
323        // 远程执行(原有逻辑)
324        info!("Executing step: {} on {} servers", step.name, step.servers.len());
325
326        let mut step_results = Vec::new();
327        let mut futures = Vec::new();
328        // 用于收集所有服务器提取到的变量 (变量名, 变量值)
329        let mut extracted_vars: Vec<(String, String)> = Vec::new();
330        let clone_variable_manager = variable_manager.clone();
331
332        // 为每个服务器创建执行任务
333        let server_names: Vec<String> = step.servers.clone();
334        for server_name in server_names {
335            if !config.clients.contains_key(&server_name) {
336                return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
337            }
338
339            // 克隆必要的数据以避免生命周期问题
340            let config = config.clone();
341            let step_name = step.name.clone();
342            let output_callback = output_callback.cloned();
343            let clone_step = step.clone();
344            let pipeline_name = pipeline_name.to_string();
345            let mut clone_variable_manager = clone_variable_manager.clone();
346            clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
347            clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
348
349            let future = tokio::spawn(async move {
350                // 创建新的执行器实例
351                let executor = RemoteExecutor { 
352                    config,
353                    variable_manager:clone_variable_manager,
354                };
355
356                match executor.execute_script_with_realtime_output(&server_name, clone_step, &pipeline_name, output_callback).await {
357                    Ok(result) => {
358                        info!("Step '{}' on server '{}' completed with exit code: {}", 
359                              step_name, server_name, result.exit_code);
360                        Ok((server_name, result))
361                    }
362                    Err(e) => {
363                        error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
364                        Err(e)
365                    }
366                }
367
368            });
369
370            futures.push(future);
371        }
372
373        // 等待所有执行完成
374        let results = join_all(futures).await;
375        
376        for result in results {
377            match result {
378                Ok(Ok((server_name, execution_result))) => {
379                    let success = execution_result.success;
380                    // 提取变量(如果有extract规则)
381                    if let Some(extract_rules) = step.extract.clone() {
382                        // 提取变量到本地 map
383                        let mut temp_vm = VariableManager::new(None);
384                        if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
385                            info!("Failed to extract variables from step '{}': {}", step.name, e);
386                        } else {
387                            for (k, v) in temp_vm.get_variables() {
388                                extracted_vars.push((k.clone(), v.clone()));
389                            }
390                        }
391                    }
392                    
393                    step_results.push(StepExecutionResult {
394                        title: step.title.clone().unwrap_or(step.name.clone()),
395                        step_name: step.name.clone(),
396                        server_name,
397                        execution_result,
398                        overall_success: success,
399                        execution_time_ms: start_time.elapsed().as_millis() as u64,
400                    });
401                }
402                Ok(Err(e)) => {
403                    return Err(e);
404                }
405                Err(e) => {
406                    return Err(anyhow::anyhow!("Task execution failed: {}", e));
407                }
408            }
409        }
410        // 合并所有服务器提取到的变量到全局 variable_manager
411        for (k, v) in extracted_vars {
412            variable_manager.set_variable(k, v);
413        }
414
415        Ok(step_results)
416    }
417
418    /// 执行单个步骤(原有方法,保持兼容性)
419    async fn execute_step(&mut self, step: &Step) -> Result<Vec<StepExecutionResult>> {
420        self.execute_step_with_realtime_output(step, "unknown", None).await
421    }
422
423    /// 在指定客户端执行shell脚本(支持实时输出)
424    pub async fn execute_script_with_realtime_output(
425        &self, 
426        client_name: &str, 
427        step: Step,
428        pipeline_name: &str,
429        output_callback: Option<OutputCallback>
430    ) -> Result<ExecutionResult> {
431        // 检查脚本文件是否存在
432        let script_path = Path::new(step.script.as_str());
433        if !script_path.exists() {
434            return Err(anyhow::anyhow!("Script '{}' not found", step.script));
435        }
436
437        let client_config = self.config
438            .clients
439            .get(client_name)
440            .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
441
442        match client_config.execution_method {
443            ExecutionMethod::SSH => {
444                self.execute_script_via_ssh_with_realtime_output(client_config, step, client_name, pipeline_name, output_callback).await
445            }
446            ExecutionMethod::WebSocket => {
447                Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
448            }
449        }
450    }
451
452    /// 通过SSH执行脚本(支持实时输出)
453    async fn execute_script_via_ssh_with_realtime_output(
454        &self, 
455        client_config: &ClientConfig, 
456        step: Step,
457        server_name: &str,
458        pipeline_name: &str,
459        output_callback: Option<OutputCallback>
460    ) -> Result<ExecutionResult> {
461        let ssh_config = client_config.ssh_config.as_ref()
462            .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
463
464        let start_time = std::time::Instant::now();
465
466        // 克隆数据以避免生命周期问题
467        let ssh_config = ssh_config.clone();
468        let script_content = step.script.to_string();
469        let server_name = server_name.to_string();
470        let pipeline_name = pipeline_name.to_string();
471        let step_name = step.name.clone();
472        let extract_rules = step.extract.clone();
473        let variable_manager = self.variable_manager.clone();
474        let clone_ssh_config = ssh_config.clone();
475
476        // 在tokio的阻塞线程池中执行SSH操作
477        let result = tokio::task::spawn_blocking(move || {
478            SshExecutor::execute_script_with_realtime_output(
479                &server_name, 
480                &ssh_config, 
481                &step,
482                &pipeline_name,
483                &step_name,
484                output_callback,
485                variable_manager,
486                extract_rules
487            )
488        }).await?.context(format!("{:#?}", clone_ssh_config))?;
489
490        let execution_time = start_time.elapsed().as_millis() as u64;
491
492        Ok(ExecutionResult {
493            success: result.exit_code == 0,
494            stdout: result.stdout,
495            stderr: result.stderr,
496            script: script_content,
497            exit_code: result.exit_code,
498            execution_time_ms: execution_time,
499            error_message: result.error_message,
500        })
501    }
502
503    /// 获取所有可用的客户端名称
504    pub fn get_available_clients(&self) -> Vec<String> {
505        self.config.clients.keys().cloned().collect()
506    }
507
508    /// 检查客户端是否存在
509    pub fn client_exists(&self, client_name: &str) -> bool {
510        self.config.clients.contains_key(client_name)
511    }
512
513    /// 获取所有可用的流水线名称
514    pub fn get_available_pipelines(&self) -> Vec<String> {
515        self.config.pipelines.iter().map(|p| p.name.clone()).collect()
516    }
517
518    /// 检查流水线是否存在
519    pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
520        self.config.pipelines.iter().any(|p| p.name == pipeline_name)
521    }
522}