1use anyhow::{Context, Result};
2use futures::future::join_all;
3use std::collections::HashMap;
4use std::path::Path;
5use tracing::{error, info};
6
7use crate::config::ConfigManager;
8use crate::models::{
9 ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult,
10 RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
11};
12use crate::ssh::SshExecutor;
13use crate::ssh::local::LocalExecutor;
14use crate::vars::VariableManager;
15
16pub struct RemoteExecutor {
18 config: RemoteExecutionConfig,
19 variable_manager: VariableManager,
20}
21
22impl RemoteExecutor {
23
24
25 pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
27 let content = std::fs::read_to_string(path)
28 .context("Failed to read YAML configuration file")?;
29
30 Self::from_yaml_str(&content, variables)
31 }
32
33 pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
35 let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
37
38 let mut all_variables = HashMap::new();
40
41 if let Some(v) = initial_variables {
42 all_variables.extend(v);
43 }
44
45 if let Some(v) = variables {
46 all_variables.extend(v);
47 }
48
49 let variable_manager = VariableManager::new(Some(all_variables));
51
52 let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
54 ConfigManager::validate_config(&config)?;
55
56 Ok(Self { config, variable_manager})
57 }
58
59 pub async fn execute_pipeline_with_realtime_output(
61 &mut self, pipeline_name: &str,
63 output_callback: Option<OutputCallback>,
64 log_callback: Option<OutputCallback>
65 ) -> Result<PipelineExecutionResult> {
66 let pipeline = self.config.pipelines.iter()
67 .find(|p| p.name == pipeline_name)
68 .cloned()
69 .ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
70
71 let pipeline_name = pipeline.name.clone();
72 let steps: Vec<Step> = pipeline.steps.clone();
73 let start_time = std::time::Instant::now();
74 let mut all_step_results = Vec::new();
75
76 if let Some(callback) = &log_callback {
78 let event = OutputEvent {
79 pipeline_name: pipeline_name.clone(),
80 server_name: "system".to_string(),
81 step: Step::default(), output_type: crate::models::OutputType::Log,
83 content: format!("开始执行流水线: {}", pipeline_name),
84 timestamp: std::time::Instant::now(),
85 variables: self.variable_manager.get_variables().clone(),
86 };
87 callback(event);
88 }
89
90 info!("Starting pipeline: {}", pipeline_name);
91
92 for step in steps {
94 let mut step_var_keys = Vec::new();
96 if let Some(vars) = &step.variables {
97 for (k, v) in vars {
98 self.variable_manager.set_variable(k.clone(), v.clone());
99 step_var_keys.push(k.clone());
100 }
101 }
102 let mut step_with_variables = step.clone();
104 step_with_variables.script = self.variable_manager.replace_variables(&step.script);
105
106 if let Some(callback) = &output_callback {
108 let event = OutputEvent {
109 pipeline_name: pipeline_name.clone(),
110 server_name: "system".to_string(),
111 step: step.clone(), output_type: crate::models::OutputType::StepStarted,
113 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
114 timestamp: std::time::Instant::now(),
115 variables: self.variable_manager.get_variables().clone(),
116 };
117 callback(event);
118 }
119
120 if let Some(callback) = &log_callback {
122 let event = OutputEvent {
123 pipeline_name: pipeline_name.clone(),
124 server_name: "system".to_string(),
125 step: step.clone(), output_type: crate::models::OutputType::Log,
127 content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
128 timestamp: std::time::Instant::now(),
129 variables: self.variable_manager.get_variables().clone(),
130 };
131 callback(event);
132 }
133
134 info!("Starting step: {} on {} servers", step.name, step.servers.len());
135
136 let step_results = self.execute_step_with_realtime_output(&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
138
139 let step_success = step_results.iter().all(|r| r.execution_result.success);
141
142 all_step_results.extend(step_results);
144
145 if let Some(callback) = &output_callback {
147 let status = if step_success { "成功" } else { "失败" };
148 let event = OutputEvent {
149 pipeline_name: pipeline_name.clone(),
150 server_name: "system".to_string(),
151 step: step.clone(), output_type: crate::models::OutputType::StepCompleted,
153 content: format!("步骤完成: {} ({})", step.name, status),
154 timestamp: std::time::Instant::now(),
155 variables: self.variable_manager.get_variables().clone(),
156 };
157 callback(event);
158 }
159
160 if !step_success {
162 info!("Step '{}' failed, stopping pipeline", step.name);
163 break;
164 }
165
166 info!("Step '{}' completed successfully", step.name);
167 }
168
169 let total_time = start_time.elapsed().as_millis() as u64;
170 let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
171
172 if let Some(callback) = &log_callback {
174 let status = if overall_success { "成功" } else { "失败" };
175 let event = OutputEvent {
176 pipeline_name: pipeline_name.clone(),
177 server_name: "system".to_string(),
178 step: Step::default(), output_type: crate::models::OutputType::Log,
180 content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
181 timestamp: std::time::Instant::now(),
182 variables: self.variable_manager.get_variables().clone(),
183 };
184 callback(event);
185 }
186
187 Ok(PipelineExecutionResult {
188 title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
189 pipeline_name: pipeline_name.clone(),
190 step_results: all_step_results,
191 overall_success,
192 total_execution_time_ms: total_time,
193 })
194 }
195
196 pub async fn execute_all_pipelines_with_realtime_output(
198 &mut self, output_callback: Option<OutputCallback>,
200 log_callback: Option<OutputCallback>
201 ) -> Result<Vec<PipelineExecutionResult>> {
202 let mut results = Vec::new();
203
204 if let Some(callback) = &log_callback {
206 let event = OutputEvent {
207 pipeline_name: "system".to_string(),
208 server_name: "system".to_string(),
209 step: Step::default(), output_type: crate::models::OutputType::Log,
211 content: format!("=== 远程脚本执行器 ==="),
212 timestamp: std::time::Instant::now(),
213 variables: self.variable_manager.get_variables().clone(),
214 };
215 callback(event);
216
217 let event = OutputEvent {
218 pipeline_name: "system".to_string(),
219 server_name: "system".to_string(),
220 step: Step::default(), output_type: crate::models::OutputType::Log,
222 content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
223 timestamp: std::time::Instant::now(),
224 variables: self.variable_manager.get_variables().clone(),
225 };
226 callback(event);
227
228 let event = OutputEvent {
229 pipeline_name: "system".to_string(),
230 server_name: "system".to_string(),
231 step: Step::default(), output_type: crate::models::OutputType::Log,
233 content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
234 timestamp: std::time::Instant::now(),
235 variables: self.variable_manager.get_variables().clone(),
236 };
237 callback(event);
238 }
239
240 let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
242 for pipeline_name in pipeline_names {
243 if let Some(callback) = &log_callback {
245 let event = OutputEvent {
246 pipeline_name: pipeline_name.clone(),
247 server_name: "system".to_string(),
248 step: Step::default(), output_type: crate::models::OutputType::Log,
250 content: format!("开始执行流水线: {}", pipeline_name),
251 timestamp: std::time::Instant::now(),
252 variables: self.variable_manager.get_variables().clone(),
253 };
254 callback(event);
255 }
256 info!("Starting pipeline: {}", pipeline_name);
257 let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
258 let success = result.overall_success;
259 results.push(result);
260 if !success {
261 info!("Pipeline '{}' failed, stopping execution", pipeline_name);
262 break;
263 }
264 info!("Pipeline '{}' completed successfully", pipeline_name);
265 }
266
267 Ok(results)
268 }
269
270 pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
272 self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
273 }
274
275 async fn execute_step_with_realtime_output(
277 &mut self,
278 step: &Step,
279 pipeline_name: &str,
280 output_callback: Option<&OutputCallback>
281 ) -> Result<Vec<StepExecutionResult>> {
282 let start_time = std::time::Instant::now();
283 let config = self.config.clone();
285 let variable_manager = &mut self.variable_manager;
286
287 if step.servers.is_empty() {
289 info!("Executing step: {} locally (no servers specified)", step.name);
291 let output_callback = output_callback.cloned();
292 let step_clone = step.clone();
293 let pipeline_name = pipeline_name.to_string();
294 let step_name = step.name.clone();
295 let mut variables = variable_manager.get_variables().clone();
296 variables.insert("pipeline_name".to_string(), pipeline_name.clone());
297 variables.insert("step_name".to_string(), step_name.clone());
298 let execution_result = LocalExecutor::execute_script_with_realtime_output(
299 &step_clone,
300 &pipeline_name,
301 &step_name,
302 output_callback,
303 variables,
304 ).await?;
305 let success = execution_result.success;
306 if let Some(extract_rules) = step.extract.clone() {
308 if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
309 info!("Failed to extract variables from step '{}': {}", step.name, e);
310 }
311 }
312 let step_result = StepExecutionResult {
313 title: step.title.clone().unwrap_or(step.name.clone()),
314 step_name: step.name.clone(),
315 server_name: "localhost".to_string(),
316 execution_result,
317 overall_success: success,
318 execution_time_ms: start_time.elapsed().as_millis() as u64,
319 };
320 return Ok(vec![step_result]);
321 }
322
323 info!("Executing step: {} on {} servers", step.name, step.servers.len());
325
326 let mut step_results = Vec::new();
327 let mut futures = Vec::new();
328 let mut extracted_vars: Vec<(String, String)> = Vec::new();
330 let clone_variable_manager = variable_manager.clone();
331
332 let server_names: Vec<String> = step.servers.clone();
334 for server_name in server_names {
335 if !config.clients.contains_key(&server_name) {
336 return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
337 }
338
339 let config = config.clone();
341 let step_name = step.name.clone();
342 let output_callback = output_callback.cloned();
343 let clone_step = step.clone();
344 let pipeline_name = pipeline_name.to_string();
345 let mut clone_variable_manager = clone_variable_manager.clone();
346 clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
347 clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
348
349 let future = tokio::spawn(async move {
350 let executor = RemoteExecutor {
352 config,
353 variable_manager:clone_variable_manager,
354 };
355
356 match executor.execute_script_with_realtime_output(&server_name, clone_step, &pipeline_name, output_callback).await {
357 Ok(result) => {
358 info!("Step '{}' on server '{}' completed with exit code: {}",
359 step_name, server_name, result.exit_code);
360 Ok((server_name, result))
361 }
362 Err(e) => {
363 error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
364 Err(e)
365 }
366 }
367
368 });
369
370 futures.push(future);
371 }
372
373 let results = join_all(futures).await;
375
376 for result in results {
377 match result {
378 Ok(Ok((server_name, execution_result))) => {
379 let success = execution_result.success;
380 if let Some(extract_rules) = step.extract.clone() {
382 let mut temp_vm = VariableManager::new(None);
384 if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
385 info!("Failed to extract variables from step '{}': {}", step.name, e);
386 } else {
387 for (k, v) in temp_vm.get_variables() {
388 extracted_vars.push((k.clone(), v.clone()));
389 }
390 }
391 }
392
393 step_results.push(StepExecutionResult {
394 title: step.title.clone().unwrap_or(step.name.clone()),
395 step_name: step.name.clone(),
396 server_name,
397 execution_result,
398 overall_success: success,
399 execution_time_ms: start_time.elapsed().as_millis() as u64,
400 });
401 }
402 Ok(Err(e)) => {
403 return Err(e);
404 }
405 Err(e) => {
406 return Err(anyhow::anyhow!("Task execution failed: {}", e));
407 }
408 }
409 }
410 for (k, v) in extracted_vars {
412 variable_manager.set_variable(k, v);
413 }
414
415 Ok(step_results)
416 }
417
418 async fn execute_step(&mut self, step: &Step) -> Result<Vec<StepExecutionResult>> {
420 self.execute_step_with_realtime_output(step, "unknown", None).await
421 }
422
423 pub async fn execute_script_with_realtime_output(
425 &self,
426 client_name: &str,
427 step: Step,
428 pipeline_name: &str,
429 output_callback: Option<OutputCallback>
430 ) -> Result<ExecutionResult> {
431 let script_path = Path::new(step.script.as_str());
433 if !script_path.exists() {
434 return Err(anyhow::anyhow!("Script '{}' not found", step.script));
435 }
436
437 let client_config = self.config
438 .clients
439 .get(client_name)
440 .ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
441
442 match client_config.execution_method {
443 ExecutionMethod::SSH => {
444 self.execute_script_via_ssh_with_realtime_output(client_config, step, client_name, pipeline_name, output_callback).await
445 }
446 ExecutionMethod::WebSocket => {
447 Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
448 }
449 }
450 }
451
452 async fn execute_script_via_ssh_with_realtime_output(
454 &self,
455 client_config: &ClientConfig,
456 step: Step,
457 server_name: &str,
458 pipeline_name: &str,
459 output_callback: Option<OutputCallback>
460 ) -> Result<ExecutionResult> {
461 let ssh_config = client_config.ssh_config.as_ref()
462 .ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
463
464 let start_time = std::time::Instant::now();
465
466 let ssh_config = ssh_config.clone();
468 let script_content = step.script.to_string();
469 let server_name = server_name.to_string();
470 let pipeline_name = pipeline_name.to_string();
471 let step_name = step.name.clone();
472 let extract_rules = step.extract.clone();
473 let variable_manager = self.variable_manager.clone();
474 let clone_ssh_config = ssh_config.clone();
475
476 let result = tokio::task::spawn_blocking(move || {
478 SshExecutor::execute_script_with_realtime_output(
479 &server_name,
480 &ssh_config,
481 &step,
482 &pipeline_name,
483 &step_name,
484 output_callback,
485 variable_manager,
486 extract_rules
487 )
488 }).await?.context(format!("{:#?}", clone_ssh_config))?;
489
490 let execution_time = start_time.elapsed().as_millis() as u64;
491
492 Ok(ExecutionResult {
493 success: result.exit_code == 0,
494 stdout: result.stdout,
495 stderr: result.stderr,
496 script: script_content,
497 exit_code: result.exit_code,
498 execution_time_ms: execution_time,
499 error_message: result.error_message,
500 })
501 }
502
503 pub fn get_available_clients(&self) -> Vec<String> {
505 self.config.clients.keys().cloned().collect()
506 }
507
508 pub fn client_exists(&self, client_name: &str) -> bool {
510 self.config.clients.contains_key(client_name)
511 }
512
513 pub fn get_available_pipelines(&self) -> Vec<String> {
515 self.config.pipelines.iter().map(|p| p.name.clone()).collect()
516 }
517
518 pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
520 self.config.pipelines.iter().any(|p| p.name == pipeline_name)
521 }
522}