1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::Arc;
6use tracing::{error, info};
7
8use crate::config::ConfigManager;
9use crate::models::{
10 ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult,
11 RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
12};
13use crate::ssh::SshExecutor;
14use crate::ssh::local::LocalExecutor;
15use crate::vars::VariableManager;
16use crate::ShellExecutionResult;
17
18pub struct RemoteExecutor {
20 config: RemoteExecutionConfig,
21 variable_manager: VariableManager,
22}
23
24impl RemoteExecutor {
25
26
27 pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
29 let content = std::fs::read_to_string(path)
30 .context("Failed to read YAML configuration file")?;
31
32 Self::from_yaml_str(&content, variables)
33 }
34
35 pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
37 let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
39
40 let mut all_variables = HashMap::new();
42
43 if let Some(v) = initial_variables {
44 all_variables.extend(v);
45 }
46
47 if let Some(v) = variables {
48 all_variables.extend(v);
49 }
50
51 let variable_manager = VariableManager::new(Some(all_variables));
53
54 let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
56 ConfigManager::validate_config(&config)?;
57
58 Ok(Self { config, variable_manager})
59 }
60
61 pub async fn execute_pipeline_with_realtime_output(
63 &mut self, pipeline_name: &str,
65 output_callback: Option<OutputCallback>,
66 log_callback: Option<OutputCallback>
67 ) -> Result<PipelineExecutionResult> {
68 let pipeline = self.config.pipelines.iter()
69 .find(|p| p.name == pipeline_name)
70 .cloned()
71 .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
72
73 let pipeline_name = pipeline.name.clone();
74 let steps: Vec<Step> = pipeline.steps.clone();
75 let start_time = std::time::Instant::now();
76 let mut all_step_results = Vec::new();
77
78 if let Some(callback) = &log_callback {
80 let event = OutputEvent {
81 pipeline_name: pipeline_name.clone(),
82 server_name: "system".to_string(),
83 script_path:"".to_string(),
84 step: Step::default(), output_type: crate::models::OutputType::Log,
86 content: format!("开始执行流水线: {}", pipeline_name),
87 timestamp: std::time::Instant::now(),
88 variables: self.variable_manager.get_variables().clone(),
89 };
90 callback(event);
91 }
92
93 info!("Starting pipeline: {}", pipeline_name);
94
95 for step in steps {
97 let mut step_var_keys = Vec::new();
99 if let Some(vars) = &step.variables {
100 for (k, v) in vars {
101 self.variable_manager.set_variable(k.clone(), v.clone());
102 step_var_keys.push(k.clone());
103 }
104 }
105 let mut step_with_variables = step.clone();
107 step_with_variables.script = self.variable_manager.replace_variables(&step.script);
108
109 if let Some(callback) = &output_callback {
111 let event = OutputEvent {
112 pipeline_name: pipeline_name.clone(),
113 server_name: "system".to_string(),
114 step: step.clone(), output_type: crate::models::OutputType::StepStarted,
116 script_path:step.script.clone(),
117 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
118 timestamp: std::time::Instant::now(),
119 variables: self.variable_manager.get_variables().clone(),
120 };
121 callback(event);
122 }
123
124 if let Some(callback) = &log_callback {
126 let event = OutputEvent {
127 pipeline_name: pipeline_name.clone(),
128 server_name: "system".to_string(),
129 step: step.clone(), script_path:step.script.clone(),
131 output_type: crate::models::OutputType::Log,
132 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
133 timestamp: std::time::Instant::now(),
134 variables: self.variable_manager.get_variables().clone(),
135 };
136 callback(event);
137 }
138
139 info!("Starting step: {} on {} servers", step.name, step.servers.len());
140
141 let step_results = self.execute_step_with_realtime_output(pipeline.script.clone(),&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
143
144 let step_success = step_results.iter().all(|r| r.execution_result.success);
146
147 all_step_results.extend(step_results);
149
150 if let Some(callback) = &output_callback {
152 let status = if step_success { "成功" } else { "失败" };
153 let event = OutputEvent {
154 pipeline_name: pipeline_name.clone(),
155 script_path:step.script.clone(),
156 server_name: "system".to_string(),
157 step: step.clone(), output_type: crate::models::OutputType::StepCompleted,
159 content: format!("步骤完成: {} ({})", step.name, status),
160 timestamp: std::time::Instant::now(),
161 variables: self.variable_manager.get_variables().clone(),
162 };
163 callback(event);
164 }
165
166 if !step_success {
168 info!("Step '{}' failed, stopping pipeline", step.name);
169 break;
170 }
171
172 info!("Step '{}' completed successfully", step.name);
173 }
174
175 let total_time = start_time.elapsed().as_millis() as u64;
176 let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
177
178 if let Some(callback) = &log_callback {
180 let status = if overall_success { "成功" } else { "失败" };
181 let event = OutputEvent {
182 pipeline_name: pipeline_name.clone(),
183 script_path:"".to_string(),
184 server_name: "system".to_string(),
185 step: Step::default(), output_type: crate::models::OutputType::Log,
187 content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
188 timestamp: std::time::Instant::now(),
189 variables: self.variable_manager.get_variables().clone(),
190 };
191 callback(event);
192 }
193
194 Ok(PipelineExecutionResult {
195 title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
196 pipeline_name: pipeline_name.clone(),
197 step_results: all_step_results,
198 overall_success,
199 total_execution_time_ms: total_time,
200 })
201 }
202
203 pub async fn execute_all_pipelines_with_realtime_output(
205 &mut self, output_callback: Option<OutputCallback>,
207 log_callback: Option<OutputCallback>
208 ) -> Result<ShellExecutionResult> {
209 let mut results = Vec::new();
210
211 if let Some(callback) = &log_callback {
213 let event = OutputEvent {
214 pipeline_name: "system".to_string(),
215 server_name: "system".to_string(),
216 script_path :"".to_string(),
217 step: Step::default(), output_type: crate::models::OutputType::Log,
219 content: format!("=== 远程脚本执行器 ==="),
220 timestamp: std::time::Instant::now(),
221 variables: self.variable_manager.get_variables().clone(),
222 };
223 callback(event);
224
225 let event = OutputEvent {
226 pipeline_name: "system".to_string(),
227 server_name: "system".to_string(),
228 script_path :"".to_string(),
229 step: Step::default(), output_type: crate::models::OutputType::Log,
231 content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
232 timestamp: std::time::Instant::now(),
233 variables: self.variable_manager.get_variables().clone(),
234 };
235 callback(event);
236
237 let event = OutputEvent {
238 pipeline_name: "system".to_string(),
239 script_path :"".to_string(),
240 server_name: "system".to_string(),
241 step: Step::default(), output_type: crate::models::OutputType::Log,
243 content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
244 timestamp: std::time::Instant::now(),
245 variables: self.variable_manager.get_variables().clone(),
246 };
247 callback(event);
248 }
249
250 let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
252 for pipeline_name in pipeline_names {
253 if let Some(callback) = &log_callback {
255 let event = OutputEvent {
256 pipeline_name: pipeline_name.clone(),
257 server_name: "system".to_string(),
258 script_path :"".to_string(),
259 step: Step::default(), output_type: crate::models::OutputType::Log,
261 content: format!("开始执行流水线: {}", pipeline_name),
262 timestamp: std::time::Instant::now(),
263 variables: self.variable_manager.get_variables().clone(),
264 };
265 callback(event);
266 }
267 info!("Starting pipeline: {}", pipeline_name);
268
269 let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
270 let success = result.overall_success;
271 results.push(result);
272 if !success {
273 info!("Pipeline '{}' failed, stopping execution", pipeline_name);
274 break;
275 }
276 info!("Pipeline '{}' completed successfully", pipeline_name);
277 }
278
279 Ok(ShellExecutionResult{
280 success: true,
281 reason: "ok".to_string(),
282 pipeline_results: results,
283 })
284 }
285
286 pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
288 self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
289 }
290
291 async fn execute_step_with_realtime_output(
293 &mut self,
294 script: Option<String>,
295 step: &Step,
296 pipeline_name: &str,
297 output_callback: Option<&OutputCallback>
298 ) -> Result<Vec<StepExecutionResult>> {
299 let start_time = std::time::Instant::now();
300 let config = self.config.clone();
302 let variable_manager = &mut self.variable_manager;
303
304 if step.servers.is_empty() {
306 info!("Executing step: {} locally (no servers specified)", step.name);
308 let output_callback = output_callback.cloned();
309 let step_clone = step.clone();
310 let pipeline_name = pipeline_name.to_string();
311 let step_name = step.name.clone();
312 let mut variables = variable_manager.get_variables().clone();
313 variables.insert("pipeline_name".to_string(), pipeline_name.clone());
314 variables.insert("step_name".to_string(), step_name.clone());
315 let execution_result = LocalExecutor::execute_script_with_realtime_output(
316 script.clone(),
317 self.config.global_scripts.clone(),
318 &step_clone,
319 &pipeline_name,
320 &step_name,
321 output_callback,
322 variables,
323 ).await?;
324 let success = execution_result.success;
325 if let Some(extract_rules) = step.extract.clone() {
327 if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
328 info!("Failed to extract variables from step '{}': {}", step.name, e);
329 }
330 }
331 let step_result = StepExecutionResult {
332 title: step.title.clone().unwrap_or(step.name.clone()),
333 step_name: step.name.clone(),
334 scritp_path:step.script.clone(),
335 server_name: "localhost".to_string(),
336 execution_result,
337 overall_success: success,
338 execution_time_ms: start_time.elapsed().as_millis() as u64,
339 };
340 return Ok(vec![step_result]);
341 }
342
343 info!("Executing step: {} on {} servers", step.name, step.servers.len());
345
346 let mut step_results = Vec::new();
347 let mut futures = Vec::new();
348 let mut extracted_vars: Vec<(String, String)> = Vec::new();
350 let clone_variable_manager = variable_manager.clone();
351
352 let server_names: Vec<String> = step.servers.clone();
354 let global_script= Arc::new(self.config.global_scripts.clone());
355 let clone_global_script = global_script.clone();
356 for server_name in server_names {
357 if !config.clients.contains_key(&server_name) {
358 return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
359 }
360
361 let config = config.clone();
363 let step_name = step.name.clone();
364 let output_callback = output_callback.cloned();
365 let clone_step = step.clone();
366 let pipeline_name = pipeline_name.to_string();
367 let mut clone_variable_manager = clone_variable_manager.clone();
368 clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
369 clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
370 let script = script.clone();
371
372 let clone_global_script = clone_global_script.clone();
373
374 let future = tokio::spawn(async move {
375 let executor = RemoteExecutor {
377 config,
378 variable_manager:clone_variable_manager,
379 };
380
381 match executor.execute_script_with_realtime_output(script,clone_global_script,&server_name, clone_step, &pipeline_name, output_callback).await {
382 Ok(result) => {
383 info!("Step '{}' on server '{}' completed with exit code: {}",
384 step_name, server_name, result.exit_code);
385 Ok((server_name, result))
386 }
387 Err(e) => {
388 error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
389 Err(e)
390 }
391 }
392
393 });
394
395 futures.push(future);
396 }
397
398 let results = join_all(futures).await;
400
401 for result in results {
402 match result {
403 Ok(Ok((server_name, execution_result))) => {
404 let success = execution_result.success;
405 if let Some(extract_rules) = step.extract.clone() {
407 let mut temp_vm = VariableManager::new(None);
409 if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
410 info!("Failed to extract variables from step '{}': {}", step.name, e);
411 } else {
412 for (k, v) in temp_vm.get_variables() {
413 extracted_vars.push((k.clone(), v.clone()));
414 }
415 }
416 }
417
418 step_results.push(StepExecutionResult {
419 title: step.title.clone().unwrap_or(step.name.clone()),
420 step_name: step.name.clone(),
421 server_name,
422 scritp_path:step.script.clone(),
423 execution_result,
424 overall_success: success,
425 execution_time_ms: start_time.elapsed().as_millis() as u64,
426 });
427 }
428 Ok(Err(e)) => {
429 return Err(e);
430 }
431 Err(e) => {
432 return Err(anyhow::anyhow!("Task execution failed: {}", e));
433 }
434 }
435 }
436 for (k, v) in extracted_vars {
438 variable_manager.set_variable(k, v);
439 }
440
441 Ok(step_results)
442 }
443
444 pub async fn execute_script_with_realtime_output(
446 &self,
447 script: Option<String>,
448 global_scripts:Arc<Vec<String>>,
449 client_name: &str,
450 step: Step,
451 pipeline_name: &str,
452 output_callback: Option<OutputCallback>
453 ) -> Result<ExecutionResult> {
454 let script_path = Path::new(step.script.as_str());
456 if !script_path.exists() {
457 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
458 }
459
460 let client_config = self.config
461 .clients
462 .get(client_name)
463 .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
464
465 match client_config.execution_method {
466 ExecutionMethod::SSH => {
467 self.execute_script_via_ssh_with_realtime_output(script,global_scripts,client_config, step, client_name, pipeline_name, output_callback).await
468 }
469 ExecutionMethod::WebSocket => {
470 Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
471 }
472 }
473 }
474
475 async fn execute_script_via_ssh_with_realtime_output(
477 &self,
478 script: Option<String>,
479 global_scripts:Arc<Vec<String>>,
480 client_config: &ClientConfig,
481 step: Step,
482 server_name: &str,
483 pipeline_name: &str,
484 output_callback: Option<OutputCallback>
485 ) -> Result<ExecutionResult> {
486 let ssh_config = client_config.ssh_config.as_ref()
487 .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
488
489 let start_time = std::time::Instant::now();
490
491 let ssh_config = ssh_config.clone();
493 let script_content = step.script.to_string();
494 let server_name = server_name.to_string();
495 let pipeline_name = pipeline_name.to_string();
496 let step_name = step.name.clone();
497 let extract_rules = step.extract.clone();
498 let variable_manager = self.variable_manager.clone();
499
500 let result = match tokio::task::spawn_blocking(move || {
502 SshExecutor::execute_script_with_realtime_output(
503 script.clone(),
504 global_scripts.clone(),
505 &server_name,
506 &ssh_config,
507 &step,
508 &pipeline_name,
509 &step_name,
510 output_callback,
511 variable_manager,
512 extract_rules
513 )
514 }).await?.context(format!("run script faield")) {
515 Ok(v) => v,
516 Err(e) => {
517
518 let execution_time = start_time.elapsed().as_millis() as u64;
519 return Ok(ExecutionResult{
520 success: false,
521 stdout: "".to_string(),
522 stderr: format!("{:?}", e),
523 script: script_content,
524 exit_code: 0,
525 execution_time_ms: execution_time,
526 error_message: Some(format!("{:?}", e)),
527 });
528 }
529 };
530
531 let execution_time = start_time.elapsed().as_millis() as u64;
532
533 Ok(ExecutionResult {
534 success: result.exit_code == 0,
535 stdout: result.stdout,
536 stderr: result.stderr,
537 script: script_content,
538 exit_code: result.exit_code,
539 execution_time_ms: execution_time,
540 error_message: result.error_message,
541 })
542 }
543
544 pub fn get_available_clients(&self) -> Vec<String> {
546 self.config.clients.keys().cloned().collect()
547 }
548
549 pub fn client_exists(&self, client_name: &str) -> bool {
551 self.config.clients.contains_key(client_name)
552 }
553
554 pub fn get_available_pipelines(&self) -> Vec<String> {
556 self.config.pipelines.iter().map(|p| p.name.clone()).collect()
557 }
558
559 pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
561 self.config.pipelines.iter().any(|p| p.name == pipeline_name)
562 }
563}