1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use tracing::{error, info};
6
7use crate::config::ConfigManager;
8use crate::models::{
9 ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult,
10 RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
11};
12use crate::ssh::SshExecutor;
13use crate::ssh::local::LocalExecutor;
14use crate::vars::VariableManager;
15
16pub struct RemoteExecutor {
18 config: RemoteExecutionConfig,
19 variable_manager: VariableManager,
20}
21
22impl RemoteExecutor {
23
24
25 pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
27 let content = std::fs::read_to_string(path)
28 .context("Failed to read YAML configuration file")?;
29
30 Self::from_yaml_str(&content, variables)
31 }
32
33 pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
35 let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
37
38 let mut all_variables = HashMap::new();
40
41 if let Some(v) = initial_variables {
42 all_variables.extend(v);
43 }
44
45 if let Some(v) = variables {
46 all_variables.extend(v);
47 }
48
49 let variable_manager = VariableManager::new(Some(all_variables));
51
52 let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
54 ConfigManager::validate_config(&config)?;
55
56 Ok(Self { config, variable_manager})
57 }
58
59 pub async fn execute_pipeline_with_realtime_output(
61 &mut self, pipeline_name: &str,
63 output_callback: Option<OutputCallback>,
64 log_callback: Option<OutputCallback>
65 ) -> Result<PipelineExecutionResult> {
66 let pipeline = self.config.pipelines.iter()
67 .find(|p| p.name == pipeline_name)
68 .cloned()
69 .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
70
71 let pipeline_name = pipeline.name.clone();
72 let steps: Vec<Step> = pipeline.steps.clone();
73 let start_time = std::time::Instant::now();
74 let mut all_step_results = Vec::new();
75
76 if let Some(callback) = &log_callback {
78 let event = OutputEvent {
79 pipeline_name: pipeline_name.clone(),
80 server_name: "system".to_string(),
81 step: Step::default(), output_type: crate::models::OutputType::Log,
83 content: format!("开始执行流水线: {}", pipeline_name),
84 timestamp: std::time::Instant::now(),
85 variables: self.variable_manager.get_variables().clone(),
86 };
87 callback(event);
88 }
89
90 info!("Starting pipeline: {}", pipeline_name);
91
92 for step in steps {
94 let mut step_var_keys = Vec::new();
96 if let Some(vars) = &step.variables {
97 for (k, v) in vars {
98 self.variable_manager.set_variable(k.clone(), v.clone());
99 step_var_keys.push(k.clone());
100 }
101 }
102 let mut step_with_variables = step.clone();
104 step_with_variables.script = self.variable_manager.replace_variables(&step.script);
105
106 if let Some(callback) = &output_callback {
108 let event = OutputEvent {
109 pipeline_name: pipeline_name.clone(),
110 server_name: "system".to_string(),
111 step: step.clone(), output_type: crate::models::OutputType::StepStarted,
113 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
114 timestamp: std::time::Instant::now(),
115 variables: self.variable_manager.get_variables().clone(),
116 };
117 callback(event);
118 }
119
120 if let Some(callback) = &log_callback {
122 let event = OutputEvent {
123 pipeline_name: pipeline_name.clone(),
124 server_name: "system".to_string(),
125 step: step.clone(), output_type: crate::models::OutputType::Log,
127 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
128 timestamp: std::time::Instant::now(),
129 variables: self.variable_manager.get_variables().clone(),
130 };
131 callback(event);
132 }
133
134 info!("Starting step: {} on {} servers", step.name, step.servers.len());
135
136 let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
138
139 let step_success = step_results.iter().all(|r| r.execution_result.success);
141
142 all_step_results.extend(step_results);
144
145 if let Some(callback) = &output_callback {
147 let status = if step_success { "成功" } else { "失败" };
148 let event = OutputEvent {
149 pipeline_name: pipeline_name.clone(),
150 server_name: "system".to_string(),
151 step: step.clone(), output_type: crate::models::OutputType::StepCompleted,
153 content: format!("步骤完成: {} ({})", step.name, status),
154 timestamp: std::time::Instant::now(),
155 variables: self.variable_manager.get_variables().clone(),
156 };
157 callback(event);
158 }
159
160 if !step_success {
162 info!("Step '{}' failed, stopping pipeline", step.name);
163 break;
164 }
165
166 info!("Step '{}' completed successfully", step.name);
167 }
168
169 let total_time = start_time.elapsed().as_millis() as u64;
170 let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
171
172 if let Some(callback) = &log_callback {
174 let status = if overall_success { "成功" } else { "失败" };
175 let event = OutputEvent {
176 pipeline_name: pipeline_name.clone(),
177 server_name: "system".to_string(),
178 step: Step::default(), output_type: crate::models::OutputType::Log,
180 content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
181 timestamp: std::time::Instant::now(),
182 variables: self.variable_manager.get_variables().clone(),
183 };
184 callback(event);
185 }
186
187 Ok(PipelineExecutionResult {
188 pipeline_name: pipeline_name.clone(),
189 step_results: all_step_results,
190 overall_success,
191 total_execution_time_ms: total_time,
192 })
193 }
194
195 pub async fn execute_all_pipelines_with_realtime_output(
197 &mut self, output_callback: Option<OutputCallback>,
199 log_callback: Option<OutputCallback>
200 ) -> Result<Vec<PipelineExecutionResult>> {
201 let mut results = Vec::new();
202
203 if let Some(callback) = &log_callback {
205 let event = OutputEvent {
206 pipeline_name: "system".to_string(),
207 server_name: "system".to_string(),
208 step: Step::default(), output_type: crate::models::OutputType::Log,
210 content: format!("=== 远程脚本执行器 ==="),
211 timestamp: std::time::Instant::now(),
212 variables: self.variable_manager.get_variables().clone(),
213 };
214 callback(event);
215
216 let event = OutputEvent {
217 pipeline_name: "system".to_string(),
218 server_name: "system".to_string(),
219 step: Step::default(), output_type: crate::models::OutputType::Log,
221 content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
222 timestamp: std::time::Instant::now(),
223 variables: self.variable_manager.get_variables().clone(),
224 };
225 callback(event);
226
227 let event = OutputEvent {
228 pipeline_name: "system".to_string(),
229 server_name: "system".to_string(),
230 step: Step::default(), output_type: crate::models::OutputType::Log,
232 content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
233 timestamp: std::time::Instant::now(),
234 variables: self.variable_manager.get_variables().clone(),
235 };
236 callback(event);
237 }
238
239 let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
241 for pipeline_name in pipeline_names {
242 if let Some(callback) = &log_callback {
244 let event = OutputEvent {
245 pipeline_name: pipeline_name.clone(),
246 server_name: "system".to_string(),
247 step: Step::default(), output_type: crate::models::OutputType::Log,
249 content: format!("开始执行流水线: {}", pipeline_name),
250 timestamp: std::time::Instant::now(),
251 variables: self.variable_manager.get_variables().clone(),
252 };
253 callback(event);
254 }
255 info!("Starting pipeline: {}", pipeline_name);
256 let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
257 let success = result.overall_success;
258 results.push(result);
259 if !success {
260 info!("Pipeline '{}' failed, stopping execution", pipeline_name);
261 break;
262 }
263 info!("Pipeline '{}' completed successfully", pipeline_name);
264 }
265
266 Ok(results)
267 }
268
269 pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
271 self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
272 }
273
274 async fn execute_step_with_realtime_output(
276 &mut self,
277 step: &Step,
278 pipeline_name: &str,
279 output_callback: Option<&OutputCallback>
280 ) -> Result<Vec<StepExecutionResult>> {
281 let start_time = std::time::Instant::now();
282 let config = self.config.clone();
284 let variable_manager = &mut self.variable_manager;
285
286 if step.servers.is_empty() {
288 info!("Executing step: {} locally (no servers specified)", step.name);
290 let output_callback = output_callback.cloned();
291 let step_clone = step.clone();
292 let pipeline_name = pipeline_name.to_string();
293 let step_name = step.name.clone();
294 let mut variables = variable_manager.get_variables().clone();
295 variables.insert("pipeline_name".to_string(), pipeline_name.clone());
296 variables.insert("step_name".to_string(), step_name.clone());
297 let execution_result = LocalExecutor::execute_script_with_realtime_output(
298 &step_clone,
299 &pipeline_name,
300 &step_name,
301 output_callback,
302 variables,
303 ).await?;
304 let success = execution_result.success;
305 if let Some(extract_rules) = step.extract.clone() {
307 if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
308 info!("Failed to extract variables from step '{}': {}", step.name, e);
309 }
310 }
311 let step_result = StepExecutionResult {
312 step_name: step.name.clone(),
313 server_name: "localhost".to_string(),
314 execution_result,
315 overall_success: success,
316 execution_time_ms: start_time.elapsed().as_millis() as u64,
317 };
318 return Ok(vec![step_result]);
319 }
320
321 info!("Executing step: {} on {} servers", step.name, step.servers.len());
323
324 let mut step_results = Vec::new();
325 let mut futures = Vec::new();
326 let mut extracted_vars: Vec<(String, String)> = Vec::new();
328 let clone_variable_manager = variable_manager.clone();
329
330 let server_names: Vec<String> = step.servers.clone();
332 for server_name in server_names {
333 if !config.clients.contains_key(&server_name) {
334 return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
335 }
336
337 let config = config.clone();
339 let step_name = step.name.clone();
340 let output_callback = output_callback.cloned();
341 let clone_step = step.clone();
342 let pipeline_name = pipeline_name.to_string();
343 let mut clone_variable_manager = clone_variable_manager.clone();
344 clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
345 clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
346
347 let future = tokio::spawn(async move {
348 let executor = RemoteExecutor {
350 config,
351 variable_manager:clone_variable_manager,
352 };
353
354 match executor.execute_script_with_realtime_output(&server_name, clone_step, &pipeline_name, output_callback).await {
355 Ok(result) => {
356 info!("Step '{}' on server '{}' completed with exit code: {}",
357 step_name, server_name, result.exit_code);
358 Ok((server_name, result))
359 }
360 Err(e) => {
361 error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
362 Err(e)
363 }
364 }
365
366 });
367
368 futures.push(future);
369 }
370
371 let results = join_all(futures).await;
373
374 for result in results {
375 match result {
376 Ok(Ok((server_name, execution_result))) => {
377 let success = execution_result.success;
378 if let Some(extract_rules) = step.extract.clone() {
380 let mut temp_vm = VariableManager::new(None);
382 if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
383 info!("Failed to extract variables from step '{}': {}", step.name, e);
384 } else {
385 for (k, v) in temp_vm.get_variables() {
386 extracted_vars.push((k.clone(), v.clone()));
387 }
388 }
389 }
390
391 step_results.push(StepExecutionResult {
392 step_name: step.name.clone(),
393 server_name,
394 execution_result,
395 overall_success: success,
396 execution_time_ms: start_time.elapsed().as_millis() as u64,
397 });
398 }
399 Ok(Err(e)) => {
400 return Err(e);
401 }
402 Err(e) => {
403 return Err(anyhow::anyhow!("Task execution failed: {}", e));
404 }
405 }
406 }
407 for (k, v) in extracted_vars {
409 variable_manager.set_variable(k, v);
410 }
411
412 Ok(step_results)
413 }
414
415 async fn execute_step(&mut self, step: &Step) -> Result<Vec<StepExecutionResult>> {
417 self.execute_step_with_realtime_output(step, "unknown", None).await
418 }
419
420 pub async fn execute_script_with_realtime_output(
422 &self,
423 client_name: &str,
424 step: Step,
425 pipeline_name: &str,
426 output_callback: Option<OutputCallback>
427 ) -> Result<ExecutionResult> {
428 let script_path = Path::new(step.script.as_str());
430 if !script_path.exists() {
431 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
432 }
433
434 let client_config = self.config
435 .clients
436 .get(client_name)
437 .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
438
439 match client_config.execution_method {
440 ExecutionMethod::SSH => {
441 self.execute_script_via_ssh_with_realtime_output(client_config, step, client_name, pipeline_name, output_callback).await
442 }
443 ExecutionMethod::WebSocket => {
444 Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
445 }
446 }
447 }
448
449 async fn execute_script_via_ssh_with_realtime_output(
451 &self,
452 client_config: &ClientConfig,
453 step: Step,
454 server_name: &str,
455 pipeline_name: &str,
456 output_callback: Option<OutputCallback>
457 ) -> Result<ExecutionResult> {
458 let ssh_config = client_config.ssh_config.as_ref()
459 .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
460
461 let start_time = std::time::Instant::now();
462
463 let ssh_config = ssh_config.clone();
465 let script_content = step.script.to_string();
466 let server_name = server_name.to_string();
467 let pipeline_name = pipeline_name.to_string();
468 let step_name = step.name.clone();
469 let extract_rules = step.extract.clone();
470 let variable_manager = self.variable_manager.clone();
471 let clone_ssh_config = ssh_config.clone();
472
473 let result = tokio::task::spawn_blocking(move || {
475 SshExecutor::execute_script_with_realtime_output(
476 &server_name,
477 &ssh_config,
478 &step,
479 &pipeline_name,
480 &step_name,
481 output_callback,
482 variable_manager,
483 extract_rules
484 )
485 }).await?.context(format!("{:#?}", clone_ssh_config))?;
486
487 let execution_time = start_time.elapsed().as_millis() as u64;
488
489 Ok(ExecutionResult {
490 success: result.exit_code == 0,
491 stdout: result.stdout,
492 stderr: result.stderr,
493 script: script_content,
494 exit_code: result.exit_code,
495 execution_time_ms: execution_time,
496 error_message: result.error_message,
497 })
498 }
499
500 pub fn get_available_clients(&self) -> Vec<String> {
502 self.config.clients.keys().cloned().collect()
503 }
504
505 pub fn client_exists(&self, client_name: &str) -> bool {
507 self.config.clients.contains_key(client_name)
508 }
509
510 pub fn get_available_pipelines(&self) -> Vec<String> {
512 self.config.pipelines.iter().map(|p| p.name.clone()).collect()
513 }
514
515 pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
517 self.config.pipelines.iter().any(|p| p.name == pipeline_name)
518 }
519}