1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::Arc;
6use tracing::{error, info};
7
8use crate::config::ConfigManager;
9use crate::models::{
10 ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult,
11 RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
12};
13use crate::ssh::SshExecutor;
14use crate::ssh::local::LocalExecutor;
15use crate::vars::VariableManager;
16use crate::ShellExecutionResult;
17
18pub struct RemoteExecutor {
20 config: RemoteExecutionConfig,
21 variable_manager: VariableManager,
22}
23
24impl RemoteExecutor {
25
26
27 pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
29 let content = std::fs::read_to_string(path)
30 .context("Failed to read YAML configuration file")?;
31
32 Self::from_yaml_str(&content, variables)
33 }
34
35 pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
37 let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
39
40 let mut all_variables = HashMap::new();
42
43 if let Some(v) = initial_variables {
44 all_variables.extend(v);
45 }
46
47 if let Some(v) = variables {
48 all_variables.extend(v);
49 }
50
51 let variable_manager = VariableManager::new(Some(all_variables));
53
54 let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
56 ConfigManager::validate_config(&config)?;
57
58 Ok(Self { config, variable_manager})
59 }
60
61 pub async fn execute_pipeline_with_realtime_output(
63 &mut self, pipeline_name: &str,
65 output_callback: Option<OutputCallback>,
66 log_callback: Option<OutputCallback>
67 ) -> Result<PipelineExecutionResult> {
68 let pipeline = self.config.pipelines.iter()
69 .find(|p| p.name == pipeline_name)
70 .cloned()
71 .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
72
73 let pipeline_name = pipeline.name.clone();
74 let steps: Vec<Step> = pipeline.steps.clone();
75 let start_time = std::time::Instant::now();
76 let mut all_step_results = Vec::new();
77
78 if let Some(callback) = &log_callback {
80 let event = OutputEvent {
81 pipeline_name: pipeline_name.clone(),
82 server_name: "system".to_string(),
83 script_path:"".to_string(),
84 step: Step::default(), output_type: crate::models::OutputType::Log,
86 content: format!("开始执行流水线: {}", pipeline_name),
87 timestamp: std::time::Instant::now(),
88 variables: self.variable_manager.get_variables().clone(),
89 };
90 callback(event);
91 }
92
93 info!("Starting pipeline: {}", pipeline_name);
94
95 for step in steps {
97 let mut step_var_keys = Vec::new();
99 if let Some(vars) = &step.variables {
100 for (k, v) in vars {
101 self.variable_manager.set_variable(k.clone(), v.clone());
102 step_var_keys.push(k.clone());
103 }
104 }
105 let mut step_with_variables = step.clone();
107 step_with_variables.script = self.variable_manager.replace_variables(&step.script);
108
109 if let Some(callback) = &output_callback {
111 let event = OutputEvent {
112 pipeline_name: pipeline_name.clone(),
113 server_name: "system".to_string(),
114 step: step.clone(), output_type: crate::models::OutputType::StepStarted,
116 script_path:step.script.clone(),
117 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
118 timestamp: std::time::Instant::now(),
119 variables: self.variable_manager.get_variables().clone(),
120 };
121 callback(event);
122 }
123
124 if let Some(callback) = &log_callback {
126 let event = OutputEvent {
127 pipeline_name: pipeline_name.clone(),
128 server_name: "system".to_string(),
129 step: step.clone(), script_path:step.script.clone(),
131 output_type: crate::models::OutputType::Log,
132 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
133 timestamp: std::time::Instant::now(),
134 variables: self.variable_manager.get_variables().clone(),
135 };
136 callback(event);
137 }
138
139 info!("Starting step: {} on {} servers", step.name, step.servers.len());
140
141 let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
143
144 let step_success = step_results.iter().all(|r| r.execution_result.success);
146
147 all_step_results.extend(step_results);
149
150 if let Some(callback) = &output_callback {
152 let status = if step_success { "成功" } else { "失败" };
153 let event = OutputEvent {
154 pipeline_name: pipeline_name.clone(),
155 script_path:step.script.clone(),
156 server_name: "system".to_string(),
157 step: step.clone(), output_type: crate::models::OutputType::StepCompleted,
159 content: format!("步骤完成: {} ({})", step.name, status),
160 timestamp: std::time::Instant::now(),
161 variables: self.variable_manager.get_variables().clone(),
162 };
163 callback(event);
164 }
165
166 if !step_success {
168 info!("Step '{}' failed, stopping pipeline", step.name);
169 break;
170 }
171
172 info!("Step '{}' completed successfully", step.name);
173 }
174
175 let total_time = start_time.elapsed().as_millis() as u64;
176 let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
177
178 if let Some(callback) = &log_callback {
180 let status = if overall_success { "成功" } else { "失败" };
181 let event = OutputEvent {
182 pipeline_name: pipeline_name.clone(),
183 script_path:"".to_string(),
184 server_name: "system".to_string(),
185 step: Step::default(), output_type: crate::models::OutputType::Log,
187 content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
188 timestamp: std::time::Instant::now(),
189 variables: self.variable_manager.get_variables().clone(),
190 };
191 callback(event);
192 }
193
194 Ok(PipelineExecutionResult {
195 title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
196 pipeline_name: pipeline_name.clone(),
197 step_results: all_step_results,
198 overall_success,
199 total_execution_time_ms: total_time,
200 })
201 }
202
203 pub async fn execute_all_pipelines_with_realtime_output(
205 &mut self, output_callback: Option<OutputCallback>,
207 log_callback: Option<OutputCallback>
208 ) -> Result<ShellExecutionResult> {
209 let mut results = Vec::new();
210
211 if let Some(callback) = &log_callback {
213 let event = OutputEvent {
214 pipeline_name: "system".to_string(),
215 server_name: "system".to_string(),
216 script_path :"".to_string(),
217 step: Step::default(), output_type: crate::models::OutputType::Log,
219 content: format!("=== 远程脚本执行器 ==="),
220 timestamp: std::time::Instant::now(),
221 variables: self.variable_manager.get_variables().clone(),
222 };
223 callback(event);
224
225 let event = OutputEvent {
226 pipeline_name: "system".to_string(),
227 server_name: "system".to_string(),
228 script_path :"".to_string(),
229 step: Step::default(), output_type: crate::models::OutputType::Log,
231 content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
232 timestamp: std::time::Instant::now(),
233 variables: self.variable_manager.get_variables().clone(),
234 };
235 callback(event);
236
237 let event = OutputEvent {
238 pipeline_name: "system".to_string(),
239 script_path :"".to_string(),
240 server_name: "system".to_string(),
241 step: Step::default(), output_type: crate::models::OutputType::Log,
243 content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
244 timestamp: std::time::Instant::now(),
245 variables: self.variable_manager.get_variables().clone(),
246 };
247 callback(event);
248 }
249
250 let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
252 for pipeline_name in pipeline_names {
253 if let Some(callback) = &log_callback {
255 let event = OutputEvent {
256 pipeline_name: pipeline_name.clone(),
257 server_name: "system".to_string(),
258 script_path :"".to_string(),
259 step: Step::default(), output_type: crate::models::OutputType::Log,
261 content: format!("开始执行流水线: {}", pipeline_name),
262 timestamp: std::time::Instant::now(),
263 variables: self.variable_manager.get_variables().clone(),
264 };
265 callback(event);
266 }
267 info!("Starting pipeline: {}", pipeline_name);
268
269 let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
270 let success = result.overall_success;
271 results.push(result);
272 if !success {
273 info!("Pipeline '{}' failed, stopping execution", pipeline_name);
274 break;
275 }
276 info!("Pipeline '{}' completed successfully", pipeline_name);
277 }
278
279 Ok(ShellExecutionResult{
280 success: true,
281 reason: "ok".to_string(),
282 pipeline_results: results,
283 })
284 }
285
286 pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
288 self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
289 }
290
291 async fn execute_step_with_realtime_output(
293 &mut self,
294 step: &Step,
295 pipeline_name: &str,
296 output_callback: Option<&OutputCallback>
297 ) -> Result<Vec<StepExecutionResult>> {
298 let start_time = std::time::Instant::now();
299 let config = self.config.clone();
301 let variable_manager = &mut self.variable_manager;
302
303 if step.servers.is_empty() {
305 info!("Executing step: {} locally (no servers specified)", step.name);
307 let output_callback = output_callback.cloned();
308 let step_clone = step.clone();
309 let pipeline_name = pipeline_name.to_string();
310 let step_name = step.name.clone();
311 let mut variables = variable_manager.get_variables().clone();
312 variables.insert("pipeline_name".to_string(), pipeline_name.clone());
313 variables.insert("step_name".to_string(), step_name.clone());
314 let execution_result = LocalExecutor::execute_script_with_realtime_output(
315 self.config.global_scripts.clone(),
316 &step_clone,
317 &pipeline_name,
318 &step_name,
319 output_callback,
320 variables,
321 ).await?;
322 let success = execution_result.success;
323 if let Some(extract_rules) = step.extract.clone() {
325 if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
326 info!("Failed to extract variables from step '{}': {}", step.name, e);
327 }
328 }
329 let step_result = StepExecutionResult {
330 title: step.title.clone().unwrap_or(step.name.clone()),
331 step_name: step.name.clone(),
332 scritp_path:step.script.clone(),
333 server_name: "localhost".to_string(),
334 execution_result,
335 overall_success: success,
336 execution_time_ms: start_time.elapsed().as_millis() as u64,
337 };
338 return Ok(vec![step_result]);
339 }
340
341 info!("Executing step: {} on {} servers", step.name, step.servers.len());
343
344 let mut step_results = Vec::new();
345 let mut futures = Vec::new();
346 let mut extracted_vars: Vec<(String, String)> = Vec::new();
348 let clone_variable_manager = variable_manager.clone();
349
350 let server_names: Vec<String> = step.servers.clone();
352 let global_script= Arc::new(self.config.global_scripts.clone());
353 let clone_global_script = global_script.clone();
354 for server_name in server_names {
355 if !config.clients.contains_key(&server_name) {
356 return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
357 }
358
359 let config = config.clone();
361 let step_name = step.name.clone();
362 let output_callback = output_callback.cloned();
363 let clone_step = step.clone();
364 let pipeline_name = pipeline_name.to_string();
365 let mut clone_variable_manager = clone_variable_manager.clone();
366 clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
367 clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
368
369 let clone_global_script = clone_global_script.clone();
370
371 let future = tokio::spawn(async move {
372 let executor = RemoteExecutor {
374 config,
375 variable_manager:clone_variable_manager,
376 };
377
378 match executor.execute_script_with_realtime_output(clone_global_script,&server_name, clone_step, &pipeline_name, output_callback).await {
379 Ok(result) => {
380 info!("Step '{}' on server '{}' completed with exit code: {}",
381 step_name, server_name, result.exit_code);
382 Ok((server_name, result))
383 }
384 Err(e) => {
385 error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
386 Err(e)
387 }
388 }
389
390 });
391
392 futures.push(future);
393 }
394
395 let results = join_all(futures).await;
397
398 for result in results {
399 match result {
400 Ok(Ok((server_name, execution_result))) => {
401 let success = execution_result.success;
402 if let Some(extract_rules) = step.extract.clone() {
404 let mut temp_vm = VariableManager::new(None);
406 if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
407 info!("Failed to extract variables from step '{}': {}", step.name, e);
408 } else {
409 for (k, v) in temp_vm.get_variables() {
410 extracted_vars.push((k.clone(), v.clone()));
411 }
412 }
413 }
414
415 step_results.push(StepExecutionResult {
416 title: step.title.clone().unwrap_or(step.name.clone()),
417 step_name: step.name.clone(),
418 server_name,
419 scritp_path:step.script.clone(),
420 execution_result,
421 overall_success: success,
422 execution_time_ms: start_time.elapsed().as_millis() as u64,
423 });
424 }
425 Ok(Err(e)) => {
426 return Err(e);
427 }
428 Err(e) => {
429 return Err(anyhow::anyhow!("Task execution failed: {}", e));
430 }
431 }
432 }
433 for (k, v) in extracted_vars {
435 variable_manager.set_variable(k, v);
436 }
437
438 Ok(step_results)
439 }
440
441 pub async fn execute_script_with_realtime_output(
443 &self,
444 global_scripts:Arc<Vec<String>>,
445 client_name: &str,
446 step: Step,
447 pipeline_name: &str,
448 output_callback: Option<OutputCallback>
449 ) -> Result<ExecutionResult> {
450 let script_path = Path::new(step.script.as_str());
452 if !script_path.exists() {
453 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
454 }
455
456 let client_config = self.config
457 .clients
458 .get(client_name)
459 .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
460
461 match client_config.execution_method {
462 ExecutionMethod::SSH => {
463 self.execute_script_via_ssh_with_realtime_output(global_scripts,client_config, step, client_name, pipeline_name, output_callback).await
464 }
465 ExecutionMethod::WebSocket => {
466 Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
467 }
468 }
469 }
470
471 async fn execute_script_via_ssh_with_realtime_output(
473 &self,
474 global_scripts:Arc<Vec<String>>,
475 client_config: &ClientConfig,
476 step: Step,
477 server_name: &str,
478 pipeline_name: &str,
479 output_callback: Option<OutputCallback>
480 ) -> Result<ExecutionResult> {
481 let ssh_config = client_config.ssh_config.as_ref()
482 .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
483
484 let start_time = std::time::Instant::now();
485
486 let ssh_config = ssh_config.clone();
488 let script_content = step.script.to_string();
489 let server_name = server_name.to_string();
490 let pipeline_name = pipeline_name.to_string();
491 let step_name = step.name.clone();
492 let extract_rules = step.extract.clone();
493 let variable_manager = self.variable_manager.clone();
494
495 let result = match tokio::task::spawn_blocking(move || {
497 SshExecutor::execute_script_with_realtime_output(
498 global_scripts.clone(),
499 &server_name,
500 &ssh_config,
501 &step,
502 &pipeline_name,
503 &step_name,
504 output_callback,
505 variable_manager,
506 extract_rules
507 )
508 }).await?.context(format!("run script faield")) {
509 Ok(v) => v,
510 Err(e) => {
511
512 let execution_time = start_time.elapsed().as_millis() as u64;
513 return Ok(ExecutionResult{
514 success: false,
515 stdout: "".to_string(),
516 stderr: format!("{:?}", e),
517 script: script_content,
518 exit_code: 0,
519 execution_time_ms: execution_time,
520 error_message: Some(format!("{:?}", e)),
521 });
522 }
523 };
524
525 let execution_time = start_time.elapsed().as_millis() as u64;
526
527 Ok(ExecutionResult {
528 success: result.exit_code == 0,
529 stdout: result.stdout,
530 stderr: result.stderr,
531 script: script_content,
532 exit_code: result.exit_code,
533 execution_time_ms: execution_time,
534 error_message: result.error_message,
535 })
536 }
537
538 pub fn get_available_clients(&self) -> Vec<String> {
540 self.config.clients.keys().cloned().collect()
541 }
542
543 pub fn client_exists(&self, client_name: &str) -> bool {
545 self.config.clients.contains_key(client_name)
546 }
547
548 pub fn get_available_pipelines(&self) -> Vec<String> {
550 self.config.pipelines.iter().map(|p| p.name.clone()).collect()
551 }
552
553 pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
555 self.config.pipelines.iter().any(|p| p.name == pipeline_name)
556 }
557}