1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use tracing::{error, info};
6
7use crate::config::ConfigManager;
8use crate::models::{
9 ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult,
10 RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
11};
12use crate::ssh::SshExecutor;
13use crate::ssh::local::LocalExecutor;
14use crate::vars::VariableManager;
15use crate::ShellExecutionResult;
16
17pub struct RemoteExecutor {
19 config: RemoteExecutionConfig,
20 variable_manager: VariableManager,
21}
22
23impl RemoteExecutor {
24
25
26 pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
28 let content = std::fs::read_to_string(path)
29 .context("Failed to read YAML configuration file")?;
30
31 Self::from_yaml_str(&content, variables)
32 }
33
34 pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
36 let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
38
39 let mut all_variables = HashMap::new();
41
42 if let Some(v) = initial_variables {
43 all_variables.extend(v);
44 }
45
46 if let Some(v) = variables {
47 all_variables.extend(v);
48 }
49
50 let variable_manager = VariableManager::new(Some(all_variables));
52
53 let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
55 ConfigManager::validate_config(&config)?;
56
57 Ok(Self { config, variable_manager})
58 }
59
60 pub async fn execute_pipeline_with_realtime_output(
62 &mut self, pipeline_name: &str,
64 output_callback: Option<OutputCallback>,
65 log_callback: Option<OutputCallback>
66 ) -> Result<PipelineExecutionResult> {
67 let pipeline = self.config.pipelines.iter()
68 .find(|p| p.name == pipeline_name)
69 .cloned()
70 .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
71
72 let pipeline_name = pipeline.name.clone();
73 let steps: Vec<Step> = pipeline.steps.clone();
74 let start_time = std::time::Instant::now();
75 let mut all_step_results = Vec::new();
76
77 if let Some(callback) = &log_callback {
79 let event = OutputEvent {
80 pipeline_name: pipeline_name.clone(),
81 server_name: "system".to_string(),
82 step: Step::default(), output_type: crate::models::OutputType::Log,
84 content: format!("开始执行流水线: {}", pipeline_name),
85 timestamp: std::time::Instant::now(),
86 variables: self.variable_manager.get_variables().clone(),
87 };
88 callback(event);
89 }
90
91 info!("Starting pipeline: {}", pipeline_name);
92
93 for step in steps {
95 let mut step_var_keys = Vec::new();
97 if let Some(vars) = &step.variables {
98 for (k, v) in vars {
99 self.variable_manager.set_variable(k.clone(), v.clone());
100 step_var_keys.push(k.clone());
101 }
102 }
103 let mut step_with_variables = step.clone();
105 step_with_variables.script = self.variable_manager.replace_variables(&step.script);
106
107 if let Some(callback) = &output_callback {
109 let event = OutputEvent {
110 pipeline_name: pipeline_name.clone(),
111 server_name: "system".to_string(),
112 step: step.clone(), output_type: crate::models::OutputType::StepStarted,
114 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
115 timestamp: std::time::Instant::now(),
116 variables: self.variable_manager.get_variables().clone(),
117 };
118 callback(event);
119 }
120
121 if let Some(callback) = &log_callback {
123 let event = OutputEvent {
124 pipeline_name: pipeline_name.clone(),
125 server_name: "system".to_string(),
126 step: step.clone(), output_type: crate::models::OutputType::Log,
128 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
129 timestamp: std::time::Instant::now(),
130 variables: self.variable_manager.get_variables().clone(),
131 };
132 callback(event);
133 }
134
135 info!("Starting step: {} on {} servers", step.name, step.servers.len());
136
137 let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
139
140 let step_success = step_results.iter().all(|r| r.execution_result.success);
142
143 all_step_results.extend(step_results);
145
146 if let Some(callback) = &output_callback {
148 let status = if step_success { "成功" } else { "失败" };
149 let event = OutputEvent {
150 pipeline_name: pipeline_name.clone(),
151 server_name: "system".to_string(),
152 step: step.clone(), output_type: crate::models::OutputType::StepCompleted,
154 content: format!("步骤完成: {} ({})", step.name, status),
155 timestamp: std::time::Instant::now(),
156 variables: self.variable_manager.get_variables().clone(),
157 };
158 callback(event);
159 }
160
161 if !step_success {
163 info!("Step '{}' failed, stopping pipeline", step.name);
164 break;
165 }
166
167 info!("Step '{}' completed successfully", step.name);
168 }
169
170 let total_time = start_time.elapsed().as_millis() as u64;
171 let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
172
173 if let Some(callback) = &log_callback {
175 let status = if overall_success { "成功" } else { "失败" };
176 let event = OutputEvent {
177 pipeline_name: pipeline_name.clone(),
178 server_name: "system".to_string(),
179 step: Step::default(), output_type: crate::models::OutputType::Log,
181 content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
182 timestamp: std::time::Instant::now(),
183 variables: self.variable_manager.get_variables().clone(),
184 };
185 callback(event);
186 }
187
188 Ok(PipelineExecutionResult {
189 title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
190 pipeline_name: pipeline_name.clone(),
191 step_results: all_step_results,
192 overall_success,
193 total_execution_time_ms: total_time,
194 })
195 }
196
197 pub async fn execute_all_pipelines_with_realtime_output(
199 &mut self, output_callback: Option<OutputCallback>,
201 log_callback: Option<OutputCallback>
202 ) -> Result<ShellExecutionResult> {
203 let mut results = Vec::new();
204
205 if let Some(callback) = &log_callback {
207 let event = OutputEvent {
208 pipeline_name: "system".to_string(),
209 server_name: "system".to_string(),
210 step: Step::default(), output_type: crate::models::OutputType::Log,
212 content: format!("=== 远程脚本执行器 ==="),
213 timestamp: std::time::Instant::now(),
214 variables: self.variable_manager.get_variables().clone(),
215 };
216 callback(event);
217
218 let event = OutputEvent {
219 pipeline_name: "system".to_string(),
220 server_name: "system".to_string(),
221 step: Step::default(), output_type: crate::models::OutputType::Log,
223 content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
224 timestamp: std::time::Instant::now(),
225 variables: self.variable_manager.get_variables().clone(),
226 };
227 callback(event);
228
229 let event = OutputEvent {
230 pipeline_name: "system".to_string(),
231 server_name: "system".to_string(),
232 step: Step::default(), output_type: crate::models::OutputType::Log,
234 content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
235 timestamp: std::time::Instant::now(),
236 variables: self.variable_manager.get_variables().clone(),
237 };
238 callback(event);
239 }
240
241 let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
243 for pipeline_name in pipeline_names {
244 if let Some(callback) = &log_callback {
246 let event = OutputEvent {
247 pipeline_name: pipeline_name.clone(),
248 server_name: "system".to_string(),
249 step: Step::default(), output_type: crate::models::OutputType::Log,
251 content: format!("开始执行流水线: {}", pipeline_name),
252 timestamp: std::time::Instant::now(),
253 variables: self.variable_manager.get_variables().clone(),
254 };
255 callback(event);
256 }
257 info!("Starting pipeline: {}", pipeline_name);
258
259 let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
260 let success = result.overall_success;
261 results.push(result);
262 if !success {
263 info!("Pipeline '{}' failed, stopping execution", pipeline_name);
264 break;
265 }
266 info!("Pipeline '{}' completed successfully", pipeline_name);
267 }
268
269 Ok(ShellExecutionResult{
270 success: true,
271 reason: "ok".to_string(),
272 pipeline_results: results,
273 })
274 }
275
276 pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
278 self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
279 }
280
281 async fn execute_step_with_realtime_output(
283 &mut self,
284 step: &Step,
285 pipeline_name: &str,
286 output_callback: Option<&OutputCallback>
287 ) -> Result<Vec<StepExecutionResult>> {
288 let start_time = std::time::Instant::now();
289 let config = self.config.clone();
291 let variable_manager = &mut self.variable_manager;
292
293 if step.servers.is_empty() {
295 info!("Executing step: {} locally (no servers specified)", step.name);
297 let output_callback = output_callback.cloned();
298 let step_clone = step.clone();
299 let pipeline_name = pipeline_name.to_string();
300 let step_name = step.name.clone();
301 let mut variables = variable_manager.get_variables().clone();
302 variables.insert("pipeline_name".to_string(), pipeline_name.clone());
303 variables.insert("step_name".to_string(), step_name.clone());
304 let execution_result = LocalExecutor::execute_script_with_realtime_output(
305 &step_clone,
306 &pipeline_name,
307 &step_name,
308 output_callback,
309 variables,
310 ).await?;
311 let success = execution_result.success;
312 if let Some(extract_rules) = step.extract.clone() {
314 if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
315 info!("Failed to extract variables from step '{}': {}", step.name, e);
316 }
317 }
318 let step_result = StepExecutionResult {
319 title: step.title.clone().unwrap_or(step.name.clone()),
320 step_name: step.name.clone(),
321 server_name: "localhost".to_string(),
322 execution_result,
323 overall_success: success,
324 execution_time_ms: start_time.elapsed().as_millis() as u64,
325 };
326 return Ok(vec![step_result]);
327 }
328
329 info!("Executing step: {} on {} servers", step.name, step.servers.len());
331
332 let mut step_results = Vec::new();
333 let mut futures = Vec::new();
334 let mut extracted_vars: Vec<(String, String)> = Vec::new();
336 let clone_variable_manager = variable_manager.clone();
337
338 let server_names: Vec<String> = step.servers.clone();
340 for server_name in server_names {
341 if !config.clients.contains_key(&server_name) {
342 return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
343 }
344
345 let config = config.clone();
347 let step_name = step.name.clone();
348 let output_callback = output_callback.cloned();
349 let clone_step = step.clone();
350 let pipeline_name = pipeline_name.to_string();
351 let mut clone_variable_manager = clone_variable_manager.clone();
352 clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
353 clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
354
355 let future = tokio::spawn(async move {
356 let executor = RemoteExecutor {
358 config,
359 variable_manager:clone_variable_manager,
360 };
361
362 match executor.execute_script_with_realtime_output(&server_name, clone_step, &pipeline_name, output_callback).await {
363 Ok(result) => {
364 info!("Step '{}' on server '{}' completed with exit code: {}",
365 step_name, server_name, result.exit_code);
366 Ok((server_name, result))
367 }
368 Err(e) => {
369 error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
370 Err(e)
371 }
372 }
373
374 });
375
376 futures.push(future);
377 }
378
379 let results = join_all(futures).await;
381
382 for result in results {
383 match result {
384 Ok(Ok((server_name, execution_result))) => {
385 let success = execution_result.success;
386 if let Some(extract_rules) = step.extract.clone() {
388 let mut temp_vm = VariableManager::new(None);
390 if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
391 info!("Failed to extract variables from step '{}': {}", step.name, e);
392 } else {
393 for (k, v) in temp_vm.get_variables() {
394 extracted_vars.push((k.clone(), v.clone()));
395 }
396 }
397 }
398
399 step_results.push(StepExecutionResult {
400 title: step.title.clone().unwrap_or(step.name.clone()),
401 step_name: step.name.clone(),
402 server_name,
403 execution_result,
404 overall_success: success,
405 execution_time_ms: start_time.elapsed().as_millis() as u64,
406 });
407 }
408 Ok(Err(e)) => {
409 return Err(e);
410 }
411 Err(e) => {
412 return Err(anyhow::anyhow!("Task execution failed: {}", e));
413 }
414 }
415 }
416 for (k, v) in extracted_vars {
418 variable_manager.set_variable(k, v);
419 }
420
421 Ok(step_results)
422 }
423
424 async fn execute_step(&mut self, step: &Step) -> Result<Vec<StepExecutionResult>> {
426 self.execute_step_with_realtime_output(step, "unknown", None).await
427 }
428
429 pub async fn execute_script_with_realtime_output(
431 &self,
432 client_name: &str,
433 step: Step,
434 pipeline_name: &str,
435 output_callback: Option<OutputCallback>
436 ) -> Result<ExecutionResult> {
437 let script_path = Path::new(step.script.as_str());
439 if !script_path.exists() {
440 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
441 }
442
443 let client_config = self.config
444 .clients
445 .get(client_name)
446 .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
447
448 match client_config.execution_method {
449 ExecutionMethod::SSH => {
450 self.execute_script_via_ssh_with_realtime_output(client_config, step, client_name, pipeline_name, output_callback).await
451 }
452 ExecutionMethod::WebSocket => {
453 Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
454 }
455 }
456 }
457
458 async fn execute_script_via_ssh_with_realtime_output(
460 &self,
461 client_config: &ClientConfig,
462 step: Step,
463 server_name: &str,
464 pipeline_name: &str,
465 output_callback: Option<OutputCallback>
466 ) -> Result<ExecutionResult> {
467 let ssh_config = client_config.ssh_config.as_ref()
468 .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
469
470 let start_time = std::time::Instant::now();
471
472 let ssh_config = ssh_config.clone();
474 let script_content = step.script.to_string();
475 let server_name = server_name.to_string();
476 let pipeline_name = pipeline_name.to_string();
477 let step_name = step.name.clone();
478 let extract_rules = step.extract.clone();
479 let variable_manager = self.variable_manager.clone();
480 let clone_ssh_config = ssh_config.clone();
481
482 let result = match tokio::task::spawn_blocking(move || {
484 SshExecutor::execute_script_with_realtime_output(
485 &server_name,
486 &ssh_config,
487 &step,
488 &pipeline_name,
489 &step_name,
490 output_callback,
491 variable_manager,
492 extract_rules
493 )
494 }).await?.context(format!("run script faield")) {
495 Ok(v) => v,
496 Err(e) => {
497
498 let execution_time = start_time.elapsed().as_millis() as u64;
499 return Ok(ExecutionResult{
500 success: false,
501 stdout: "".to_string(),
502 stderr: format!("{:?}", e),
503 script: script_content,
504 exit_code: 0,
505 execution_time_ms: execution_time,
506 error_message: Some(format!("{:?}", e)),
507 });
508 }
509 };
510
511 let execution_time = start_time.elapsed().as_millis() as u64;
512
513 Ok(ExecutionResult {
514 success: result.exit_code == 0,
515 stdout: result.stdout,
516 stderr: result.stderr,
517 script: script_content,
518 exit_code: result.exit_code,
519 execution_time_ms: execution_time,
520 error_message: result.error_message,
521 })
522 }
523
524 pub fn get_available_clients(&self) -> Vec<String> {
526 self.config.clients.keys().cloned().collect()
527 }
528
529 pub fn client_exists(&self, client_name: &str) -> bool {
531 self.config.clients.contains_key(client_name)
532 }
533
534 pub fn get_available_pipelines(&self) -> Vec<String> {
536 self.config.pipelines.iter().map(|p| p.name.clone()).collect()
537 }
538
539 pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
541 self.config.pipelines.iter().any(|p| p.name == pipeline_name)
542 }
543}