1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::Arc;
6use tracing::{error, info};
7
8use crate::config::ConfigManager;
9use crate::models::{
10 ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult,
11 RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
12};
13use crate::ssh::SshExecutor;
14use crate::ssh::local::LocalExecutor;
15use crate::vars::VariableManager;
16use crate::ShellExecutionResult;
17
18pub struct RemoteExecutor {
20 config: RemoteExecutionConfig,
21 variable_manager: VariableManager,
22}
23
24impl RemoteExecutor {
25
26
27 pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
29 let content = std::fs::read_to_string(path)
30 .context("Failed to read YAML configuration file")?;
31
32 Self::from_yaml_str(&content, variables)
33 }
34
35 pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
37 let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
39
40 let mut all_variables = HashMap::new();
42
43 if let Some(v) = initial_variables {
44 all_variables.extend(v);
45 }
46
47 if let Some(v) = variables {
48 all_variables.extend(v);
49 }
50
51 let variable_manager = VariableManager::new(Some(all_variables));
53
54 let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
56 ConfigManager::validate_config(&config)?;
57
58 Ok(Self { config, variable_manager})
59 }
60
61 pub async fn execute_pipeline_with_realtime_output(
63 &mut self, pipeline_name: &str,
65 output_callback: Option<OutputCallback>,
66 log_callback: Option<OutputCallback>
67 ) -> Result<PipelineExecutionResult> {
68 let pipeline = self.config.pipelines.iter()
69 .find(|p| p.name == pipeline_name)
70 .cloned()
71 .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
72
73 let pipeline_name = pipeline.name.clone();
74 let steps: Vec<Step> = pipeline.steps.clone();
75 let start_time = std::time::Instant::now();
76 let mut all_step_results = Vec::new();
77
78 if let Some(callback) = &log_callback {
80 let event = OutputEvent {
81 pipeline_name: pipeline_name.clone(),
82 server_name: "system".to_string(),
83 step: Step::default(), output_type: crate::models::OutputType::Log,
85 content: format!("开始执行流水线: {}", pipeline_name),
86 timestamp: std::time::Instant::now(),
87 variables: self.variable_manager.get_variables().clone(),
88 };
89 callback(event);
90 }
91
92 info!("Starting pipeline: {}", pipeline_name);
93
94 for step in steps {
96 let mut step_var_keys = Vec::new();
98 if let Some(vars) = &step.variables {
99 for (k, v) in vars {
100 self.variable_manager.set_variable(k.clone(), v.clone());
101 step_var_keys.push(k.clone());
102 }
103 }
104 let mut step_with_variables = step.clone();
106 step_with_variables.script = self.variable_manager.replace_variables(&step.script);
107
108 if let Some(callback) = &output_callback {
110 let event = OutputEvent {
111 pipeline_name: pipeline_name.clone(),
112 server_name: "system".to_string(),
113 step: step.clone(), output_type: crate::models::OutputType::StepStarted,
115 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
116 timestamp: std::time::Instant::now(),
117 variables: self.variable_manager.get_variables().clone(),
118 };
119 callback(event);
120 }
121
122 if let Some(callback) = &log_callback {
124 let event = OutputEvent {
125 pipeline_name: pipeline_name.clone(),
126 server_name: "system".to_string(),
127 step: step.clone(), output_type: crate::models::OutputType::Log,
129 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
130 timestamp: std::time::Instant::now(),
131 variables: self.variable_manager.get_variables().clone(),
132 };
133 callback(event);
134 }
135
136 info!("Starting step: {} on {} servers", step.name, step.servers.len());
137
138 let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
140
141 let step_success = step_results.iter().all(|r| r.execution_result.success);
143
144 all_step_results.extend(step_results);
146
147 if let Some(callback) = &output_callback {
149 let status = if step_success { "成功" } else { "失败" };
150 let event = OutputEvent {
151 pipeline_name: pipeline_name.clone(),
152 server_name: "system".to_string(),
153 step: step.clone(), output_type: crate::models::OutputType::StepCompleted,
155 content: format!("步骤完成: {} ({})", step.name, status),
156 timestamp: std::time::Instant::now(),
157 variables: self.variable_manager.get_variables().clone(),
158 };
159 callback(event);
160 }
161
162 if !step_success {
164 info!("Step '{}' failed, stopping pipeline", step.name);
165 break;
166 }
167
168 info!("Step '{}' completed successfully", step.name);
169 }
170
171 let total_time = start_time.elapsed().as_millis() as u64;
172 let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
173
174 if let Some(callback) = &log_callback {
176 let status = if overall_success { "成功" } else { "失败" };
177 let event = OutputEvent {
178 pipeline_name: pipeline_name.clone(),
179 server_name: "system".to_string(),
180 step: Step::default(), output_type: crate::models::OutputType::Log,
182 content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
183 timestamp: std::time::Instant::now(),
184 variables: self.variable_manager.get_variables().clone(),
185 };
186 callback(event);
187 }
188
189 Ok(PipelineExecutionResult {
190 title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
191 pipeline_name: pipeline_name.clone(),
192 step_results: all_step_results,
193 overall_success,
194 total_execution_time_ms: total_time,
195 })
196 }
197
198 pub async fn execute_all_pipelines_with_realtime_output(
200 &mut self, output_callback: Option<OutputCallback>,
202 log_callback: Option<OutputCallback>
203 ) -> Result<ShellExecutionResult> {
204 let mut results = Vec::new();
205
206 if let Some(callback) = &log_callback {
208 let event = OutputEvent {
209 pipeline_name: "system".to_string(),
210 server_name: "system".to_string(),
211 step: Step::default(), output_type: crate::models::OutputType::Log,
213 content: format!("=== 远程脚本执行器 ==="),
214 timestamp: std::time::Instant::now(),
215 variables: self.variable_manager.get_variables().clone(),
216 };
217 callback(event);
218
219 let event = OutputEvent {
220 pipeline_name: "system".to_string(),
221 server_name: "system".to_string(),
222 step: Step::default(), output_type: crate::models::OutputType::Log,
224 content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
225 timestamp: std::time::Instant::now(),
226 variables: self.variable_manager.get_variables().clone(),
227 };
228 callback(event);
229
230 let event = OutputEvent {
231 pipeline_name: "system".to_string(),
232 server_name: "system".to_string(),
233 step: Step::default(), output_type: crate::models::OutputType::Log,
235 content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
236 timestamp: std::time::Instant::now(),
237 variables: self.variable_manager.get_variables().clone(),
238 };
239 callback(event);
240 }
241
242 let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
244 for pipeline_name in pipeline_names {
245 if let Some(callback) = &log_callback {
247 let event = OutputEvent {
248 pipeline_name: pipeline_name.clone(),
249 server_name: "system".to_string(),
250 step: Step::default(), output_type: crate::models::OutputType::Log,
252 content: format!("开始执行流水线: {}", pipeline_name),
253 timestamp: std::time::Instant::now(),
254 variables: self.variable_manager.get_variables().clone(),
255 };
256 callback(event);
257 }
258 info!("Starting pipeline: {}", pipeline_name);
259
260 let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
261 let success = result.overall_success;
262 results.push(result);
263 if !success {
264 info!("Pipeline '{}' failed, stopping execution", pipeline_name);
265 break;
266 }
267 info!("Pipeline '{}' completed successfully", pipeline_name);
268 }
269
270 Ok(ShellExecutionResult{
271 success: true,
272 reason: "ok".to_string(),
273 pipeline_results: results,
274 })
275 }
276
277 pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
279 self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
280 }
281
282 async fn execute_step_with_realtime_output(
284 &mut self,
285 step: &Step,
286 pipeline_name: &str,
287 output_callback: Option<&OutputCallback>
288 ) -> Result<Vec<StepExecutionResult>> {
289 let start_time = std::time::Instant::now();
290 let config = self.config.clone();
292 let variable_manager = &mut self.variable_manager;
293
294 if step.servers.is_empty() {
296 info!("Executing step: {} locally (no servers specified)", step.name);
298 let output_callback = output_callback.cloned();
299 let step_clone = step.clone();
300 let pipeline_name = pipeline_name.to_string();
301 let step_name = step.name.clone();
302 let mut variables = variable_manager.get_variables().clone();
303 variables.insert("pipeline_name".to_string(), pipeline_name.clone());
304 variables.insert("step_name".to_string(), step_name.clone());
305 let execution_result = LocalExecutor::execute_script_with_realtime_output(
306 self.config.global_scripts.clone(),
307 &step_clone,
308 &pipeline_name,
309 &step_name,
310 output_callback,
311 variables,
312 ).await?;
313 let success = execution_result.success;
314 if let Some(extract_rules) = step.extract.clone() {
316 if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
317 info!("Failed to extract variables from step '{}': {}", step.name, e);
318 }
319 }
320 let step_result = StepExecutionResult {
321 title: step.title.clone().unwrap_or(step.name.clone()),
322 step_name: step.name.clone(),
323 server_name: "localhost".to_string(),
324 execution_result,
325 overall_success: success,
326 execution_time_ms: start_time.elapsed().as_millis() as u64,
327 };
328 return Ok(vec![step_result]);
329 }
330
331 info!("Executing step: {} on {} servers", step.name, step.servers.len());
333
334 let mut step_results = Vec::new();
335 let mut futures = Vec::new();
336 let mut extracted_vars: Vec<(String, String)> = Vec::new();
338 let clone_variable_manager = variable_manager.clone();
339
340 let server_names: Vec<String> = step.servers.clone();
342 let global_script= Arc::new(self.config.global_scripts.clone());
343 let clone_global_script = global_script.clone();
344 for server_name in server_names {
345 if !config.clients.contains_key(&server_name) {
346 return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
347 }
348
349 let config = config.clone();
351 let step_name = step.name.clone();
352 let output_callback = output_callback.cloned();
353 let clone_step = step.clone();
354 let pipeline_name = pipeline_name.to_string();
355 let mut clone_variable_manager = clone_variable_manager.clone();
356 clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
357 clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
358
359 let clone_global_script = clone_global_script.clone();
360
361 let future = tokio::spawn(async move {
362 let executor = RemoteExecutor {
364 config,
365 variable_manager:clone_variable_manager,
366 };
367
368 match executor.execute_script_with_realtime_output(clone_global_script,&server_name, clone_step, &pipeline_name, output_callback).await {
369 Ok(result) => {
370 info!("Step '{}' on server '{}' completed with exit code: {}",
371 step_name, server_name, result.exit_code);
372 Ok((server_name, result))
373 }
374 Err(e) => {
375 error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
376 Err(e)
377 }
378 }
379
380 });
381
382 futures.push(future);
383 }
384
385 let results = join_all(futures).await;
387
388 for result in results {
389 match result {
390 Ok(Ok((server_name, execution_result))) => {
391 let success = execution_result.success;
392 if let Some(extract_rules) = step.extract.clone() {
394 let mut temp_vm = VariableManager::new(None);
396 if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
397 info!("Failed to extract variables from step '{}': {}", step.name, e);
398 } else {
399 for (k, v) in temp_vm.get_variables() {
400 extracted_vars.push((k.clone(), v.clone()));
401 }
402 }
403 }
404
405 step_results.push(StepExecutionResult {
406 title: step.title.clone().unwrap_or(step.name.clone()),
407 step_name: step.name.clone(),
408 server_name,
409 execution_result,
410 overall_success: success,
411 execution_time_ms: start_time.elapsed().as_millis() as u64,
412 });
413 }
414 Ok(Err(e)) => {
415 return Err(e);
416 }
417 Err(e) => {
418 return Err(anyhow::anyhow!("Task execution failed: {}", e));
419 }
420 }
421 }
422 for (k, v) in extracted_vars {
424 variable_manager.set_variable(k, v);
425 }
426
427 Ok(step_results)
428 }
429
430 pub async fn execute_script_with_realtime_output(
432 &self,
433 global_scripts:Arc<Vec<String>>,
434 client_name: &str,
435 step: Step,
436 pipeline_name: &str,
437 output_callback: Option<OutputCallback>
438 ) -> Result<ExecutionResult> {
439 let script_path = Path::new(step.script.as_str());
441 if !script_path.exists() {
442 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
443 }
444
445 let client_config = self.config
446 .clients
447 .get(client_name)
448 .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
449
450 match client_config.execution_method {
451 ExecutionMethod::SSH => {
452 self.execute_script_via_ssh_with_realtime_output(global_scripts,client_config, step, client_name, pipeline_name, output_callback).await
453 }
454 ExecutionMethod::WebSocket => {
455 Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
456 }
457 }
458 }
459
460 async fn execute_script_via_ssh_with_realtime_output(
462 &self,
463 global_scripts:Arc<Vec<String>>,
464 client_config: &ClientConfig,
465 step: Step,
466 server_name: &str,
467 pipeline_name: &str,
468 output_callback: Option<OutputCallback>
469 ) -> Result<ExecutionResult> {
470 let ssh_config = client_config.ssh_config.as_ref()
471 .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
472
473 let start_time = std::time::Instant::now();
474
475 let ssh_config = ssh_config.clone();
477 let script_content = step.script.to_string();
478 let server_name = server_name.to_string();
479 let pipeline_name = pipeline_name.to_string();
480 let step_name = step.name.clone();
481 let extract_rules = step.extract.clone();
482 let variable_manager = self.variable_manager.clone();
483
484 let result = match tokio::task::spawn_blocking(move || {
486 SshExecutor::execute_script_with_realtime_output(
487 global_scripts.clone(),
488 &server_name,
489 &ssh_config,
490 &step,
491 &pipeline_name,
492 &step_name,
493 output_callback,
494 variable_manager,
495 extract_rules
496 )
497 }).await?.context(format!("run script faield")) {
498 Ok(v) => v,
499 Err(e) => {
500
501 let execution_time = start_time.elapsed().as_millis() as u64;
502 return Ok(ExecutionResult{
503 success: false,
504 stdout: "".to_string(),
505 stderr: format!("{:?}", e),
506 script: script_content,
507 exit_code: 0,
508 execution_time_ms: execution_time,
509 error_message: Some(format!("{:?}", e)),
510 });
511 }
512 };
513
514 let execution_time = start_time.elapsed().as_millis() as u64;
515
516 Ok(ExecutionResult {
517 success: result.exit_code == 0,
518 stdout: result.stdout,
519 stderr: result.stderr,
520 script: script_content,
521 exit_code: result.exit_code,
522 execution_time_ms: execution_time,
523 error_message: result.error_message,
524 })
525 }
526
527 pub fn get_available_clients(&self) -> Vec<String> {
529 self.config.clients.keys().cloned().collect()
530 }
531
532 pub fn client_exists(&self, client_name: &str) -> bool {
534 self.config.clients.contains_key(client_name)
535 }
536
537 pub fn get_available_pipelines(&self) -> Vec<String> {
539 self.config.pipelines.iter().map(|p| p.name.clone()).collect()
540 }
541
542 pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
544 self.config.pipelines.iter().any(|p| p.name == pipeline_name)
545 }
546}