use anyhow::{Context, Result};
use futures::future::join_all;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tracing::{error, info};
use crate::config::ConfigManager;
use crate::models::{
ClientConfig, ExecutionMethod, ExecutionResult, PipelineExecutionResult,
RemoteExecutionConfig, Step, StepExecutionResult, OutputCallback, OutputEvent
};
use crate::ssh::SshExecutor;
use crate::ssh::local::LocalExecutor;
use crate::vars::VariableManager;
use crate::ShellExecutionResult;
pub struct RemoteExecutor {
config: RemoteExecutionConfig,
variable_manager: VariableManager,
}
impl RemoteExecutor {
pub fn from_yaml_file<P: AsRef<Path>>(path: P, variables: Option<HashMap<String, String>>) -> Result<Self> {
let content = std::fs::read_to_string(path)
.context("Failed to read YAML configuration file")?;
Self::from_yaml_str(&content, variables)
}
pub fn from_yaml_str(yaml_content: &str, variables: Option<HashMap<String, String>>) -> Result<Self> {
let initial_variables = ConfigManager::extract_initial_variables(yaml_content)?;
let mut all_variables = HashMap::new();
if let Some(v) = initial_variables {
all_variables.extend(v);
}
if let Some(v) = variables {
all_variables.extend(v);
}
let variable_manager = VariableManager::new(Some(all_variables));
let config = ConfigManager::from_yaml_str_with_variables(yaml_content, &variable_manager)?;
ConfigManager::validate_config(&config)?;
Ok(Self { config, variable_manager})
}
pub async fn execute_pipeline_with_realtime_output(
&mut self, pipeline_name: &str,
output_callback: Option<OutputCallback>,
log_callback: Option<OutputCallback>
) -> Result<PipelineExecutionResult> {
let pipeline = self.config.pipelines.iter()
.find(|p| p.name == pipeline_name)
.cloned()
.ok_or_else(|| anyhow::anyhow!("Pipeline '{}' not found", pipeline_name))?;
let pipeline_name = pipeline.name.clone();
let steps: Vec<Step> = pipeline.steps.clone();
let start_time = std::time::Instant::now();
let mut all_step_results = Vec::new();
if let Some(callback) = &log_callback {
let event = OutputEvent {
pipeline_name: pipeline_name.clone(),
server_name: "system".to_string(),
script_path:"".to_string(),
step: Step::default(), output_type: crate::models::OutputType::Log,
content: format!("开始执行流水线: {}", pipeline_name),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
}
info!("Starting pipeline: {}", pipeline_name);
for step in steps {
let mut step_var_keys = Vec::new();
if let Some(vars) = &step.variables {
for (k, v) in vars {
self.variable_manager.set_variable(k.clone(), v.clone());
step_var_keys.push(k.clone());
}
}
let mut step_with_variables = step.clone();
step_with_variables.script = self.variable_manager.replace_variables(&step.script);
if let Some(callback) = &output_callback {
let event = OutputEvent {
pipeline_name: pipeline_name.clone(),
server_name: "system".to_string(),
step: step.clone(), output_type: crate::models::OutputType::StepStarted,
script_path:step.script.clone(),
content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
}
if let Some(callback) = &log_callback {
let event = OutputEvent {
pipeline_name: pipeline_name.clone(),
server_name: "system".to_string(),
step: step.clone(), script_path:step.script.clone(),
output_type: crate::models::OutputType::Log,
content: format!("开始执行步骤: {} ({} 个服务器)", step.name, step.servers.len()),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
}
info!("Starting step: {} on {} servers", step.name, step.servers.len());
let step_results = self.execute_step_with_realtime_output(pipeline.script.clone(),&step_with_variables, pipeline_name.as_str(), output_callback.as_ref()).await?;
let step_success = step_results.iter().all(|r| r.execution_result.success);
all_step_results.extend(step_results);
if let Some(callback) = &output_callback {
let status = if step_success { "成功" } else { "失败" };
let event = OutputEvent {
pipeline_name: pipeline_name.clone(),
script_path:step.script.clone(),
server_name: "system".to_string(),
step: step.clone(), output_type: crate::models::OutputType::StepCompleted,
content: format!("步骤完成: {} ({})", step.name, status),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
}
if !step_success {
info!("Step '{}' failed, stopping pipeline", step.name);
break;
}
info!("Step '{}' completed successfully", step.name);
}
let total_time = start_time.elapsed().as_millis() as u64;
let overall_success = all_step_results.iter().all(|r| r.execution_result.success);
if let Some(callback) = &log_callback {
let status = if overall_success { "成功" } else { "失败" };
let event = OutputEvent {
pipeline_name: pipeline_name.clone(),
script_path:"".to_string(),
server_name: "system".to_string(),
step: Step::default(), output_type: crate::models::OutputType::Log,
content: format!("流水线完成: {} ({}) - 总耗时: {}ms", pipeline_name, status, total_time),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
}
Ok(PipelineExecutionResult {
title: pipeline.title.clone().unwrap_or(pipeline_name.clone()),
pipeline_name: pipeline_name.clone(),
step_results: all_step_results,
overall_success,
total_execution_time_ms: total_time,
})
}
pub async fn execute_all_pipelines_with_realtime_output(
&mut self, output_callback: Option<OutputCallback>,
log_callback: Option<OutputCallback>
) -> Result<ShellExecutionResult> {
let mut results = Vec::new();
if let Some(callback) = &log_callback {
let event = OutputEvent {
pipeline_name: "system".to_string(),
server_name: "system".to_string(),
script_path :"".to_string(),
step: Step::default(), output_type: crate::models::OutputType::Log,
content: format!("=== 远程脚本执行器 ==="),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
let event = OutputEvent {
pipeline_name: "system".to_string(),
server_name: "system".to_string(),
script_path :"".to_string(),
step: Step::default(), output_type: crate::models::OutputType::Log,
content: format!("配置加载成功,发现 {} 个流水线", self.config.pipelines.len()),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
let event = OutputEvent {
pipeline_name: "system".to_string(),
script_path :"".to_string(),
server_name: "system".to_string(),
step: Step::default(), output_type: crate::models::OutputType::Log,
content: format!("执行模式: 步骤串行执行,同一步骤内服务器并发执行"),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
}
let pipeline_names: Vec<String> = self.config.pipelines.iter().map(|p| p.name.clone()).collect();
for pipeline_name in pipeline_names {
if let Some(callback) = &log_callback {
let event = OutputEvent {
pipeline_name: pipeline_name.clone(),
server_name: "system".to_string(),
script_path :"".to_string(),
step: Step::default(), output_type: crate::models::OutputType::Log,
content: format!("开始执行流水线: {}", pipeline_name),
timestamp: std::time::Instant::now(),
variables: self.variable_manager.get_variables().clone(),
};
callback(event);
}
info!("Starting pipeline: {}", pipeline_name);
let result = self.execute_pipeline_with_realtime_output(&pipeline_name, output_callback.as_ref().cloned(), log_callback.as_ref().cloned()).await?;
let success = result.overall_success;
results.push(result);
if !success {
info!("Pipeline '{}' failed, stopping execution", pipeline_name);
break;
}
info!("Pipeline '{}' completed successfully", pipeline_name);
}
Ok(ShellExecutionResult{
success: true,
reason: "ok".to_string(),
pipeline_results: results,
})
}
pub async fn execute_pipeline(&mut self, pipeline_name: &str) -> Result<PipelineExecutionResult> {
self.execute_pipeline_with_realtime_output(pipeline_name, None, None).await
}
async fn execute_step_with_realtime_output(
&mut self,
script: Option<String>,
step: &Step,
pipeline_name: &str,
output_callback: Option<&OutputCallback>
) -> Result<Vec<StepExecutionResult>> {
let start_time = std::time::Instant::now();
let config = self.config.clone();
let variable_manager = &mut self.variable_manager;
if step.servers.is_empty() {
info!("Executing step: {} locally (no servers specified)", step.name);
let output_callback = output_callback.cloned();
let step_clone = step.clone();
let pipeline_name = pipeline_name.to_string();
let step_name = step.name.clone();
let mut variables = variable_manager.get_variables().clone();
variables.insert("pipeline_name".to_string(), pipeline_name.clone());
variables.insert("step_name".to_string(), step_name.clone());
let execution_result = LocalExecutor::execute_script_with_realtime_output(
script.clone(),
self.config.global_scripts.clone(),
&step_clone,
&pipeline_name,
&step_name,
output_callback,
variables,
).await?;
let success = execution_result.success;
if let Some(extract_rules) = step.extract.clone() {
if let Err(e) = variable_manager.extract_variables(&extract_rules, &execution_result) {
info!("Failed to extract variables from step '{}': {}", step.name, e);
}
}
let step_result = StepExecutionResult {
title: step.title.clone().unwrap_or(step.name.clone()),
step_name: step.name.clone(),
scritp_path:step.script.clone(),
server_name: "localhost".to_string(),
execution_result,
overall_success: success,
execution_time_ms: start_time.elapsed().as_millis() as u64,
};
return Ok(vec![step_result]);
}
info!("Executing step: {} on {} servers", step.name, step.servers.len());
let mut step_results = Vec::new();
let mut futures = Vec::new();
let mut extracted_vars: Vec<(String, String)> = Vec::new();
let clone_variable_manager = variable_manager.clone();
let server_names: Vec<String> = step.servers.clone();
let global_script= Arc::new(self.config.global_scripts.clone());
let clone_global_script = global_script.clone();
for server_name in server_names {
if !config.clients.contains_key(&server_name) {
return Err(anyhow::anyhow!("Server '{}' not found in configuration", server_name));
}
let config = config.clone();
let step_name = step.name.clone();
let output_callback = output_callback.cloned();
let clone_step = step.clone();
let pipeline_name = pipeline_name.to_string();
let mut clone_variable_manager = clone_variable_manager.clone();
clone_variable_manager.set_variable("pipeline_name".to_string(), pipeline_name.clone());
clone_variable_manager.set_variable("step_name".to_string(), step_name.clone());
let script = script.clone();
let clone_global_script = clone_global_script.clone();
let future = tokio::spawn(async move {
let executor = RemoteExecutor {
config,
variable_manager:clone_variable_manager,
};
match executor.execute_script_with_realtime_output(script,clone_global_script,&server_name, clone_step, &pipeline_name, output_callback).await {
Ok(result) => {
info!("Step '{}' on server '{}' completed with exit code: {}",
step_name, server_name, result.exit_code);
Ok((server_name, result))
}
Err(e) => {
error!("Step '{}' on server '{}' failed: {}", step_name, server_name, e);
Err(e)
}
}
});
futures.push(future);
}
let results = join_all(futures).await;
for result in results {
match result {
Ok(Ok((server_name, execution_result))) => {
let success = execution_result.success;
if let Some(extract_rules) = step.extract.clone() {
let mut temp_vm = VariableManager::new(None);
if let Err(e) = temp_vm.extract_variables(&extract_rules, &execution_result) {
info!("Failed to extract variables from step '{}': {}", step.name, e);
} else {
for (k, v) in temp_vm.get_variables() {
extracted_vars.push((k.clone(), v.clone()));
}
}
}
step_results.push(StepExecutionResult {
title: step.title.clone().unwrap_or(step.name.clone()),
step_name: step.name.clone(),
server_name,
scritp_path:step.script.clone(),
execution_result,
overall_success: success,
execution_time_ms: start_time.elapsed().as_millis() as u64,
});
}
Ok(Err(e)) => {
return Err(e);
}
Err(e) => {
return Err(anyhow::anyhow!("Task execution failed: {}", e));
}
}
}
for (k, v) in extracted_vars {
variable_manager.set_variable(k, v);
}
Ok(step_results)
}
pub async fn execute_script_with_realtime_output(
&self,
script: Option<String>,
global_scripts:Arc<Vec<String>>,
client_name: &str,
step: Step,
pipeline_name: &str,
output_callback: Option<OutputCallback>
) -> Result<ExecutionResult> {
let script_path = Path::new(step.script.as_str());
if !script_path.exists() {
return Err(anyhow::anyhow!("Script '{}' not found", step.script));
}
let client_config = self.config
.clients
.get(client_name)
.ok_or_else(|| anyhow::anyhow!("Client '{}' not found in configuration", client_name))?;
match client_config.execution_method {
ExecutionMethod::SSH => {
self.execute_script_via_ssh_with_realtime_output(script,global_scripts,client_config, step, client_name, pipeline_name, output_callback).await
}
ExecutionMethod::WebSocket => {
Err(anyhow::anyhow!("WebSocket execution not implemented yet"))
}
}
}
async fn execute_script_via_ssh_with_realtime_output(
&self,
script: Option<String>,
global_scripts:Arc<Vec<String>>,
client_config: &ClientConfig,
step: Step,
server_name: &str,
pipeline_name: &str,
output_callback: Option<OutputCallback>
) -> Result<ExecutionResult> {
let ssh_config = client_config.ssh_config.as_ref()
.ok_or_else(|| anyhow::anyhow!("SSH configuration not found for client '{}'", client_config.name))?;
let start_time = std::time::Instant::now();
let ssh_config = ssh_config.clone();
let script_content = step.script.to_string();
let server_name = server_name.to_string();
let pipeline_name = pipeline_name.to_string();
let step_name = step.name.clone();
let extract_rules = step.extract.clone();
let variable_manager = self.variable_manager.clone();
let result = match tokio::task::spawn_blocking(move || {
SshExecutor::execute_script_with_realtime_output(
script.clone(),
global_scripts.clone(),
&server_name,
&ssh_config,
&step,
&pipeline_name,
&step_name,
output_callback,
variable_manager,
extract_rules
)
}).await?.context(format!("run script faield")) {
Ok(v) => v,
Err(e) => {
let execution_time = start_time.elapsed().as_millis() as u64;
return Ok(ExecutionResult{
success: false,
stdout: "".to_string(),
stderr: format!("{:?}", e),
script: script_content,
exit_code: 0,
execution_time_ms: execution_time,
error_message: Some(format!("{:?}", e)),
});
}
};
let execution_time = start_time.elapsed().as_millis() as u64;
Ok(ExecutionResult {
success: result.exit_code == 0,
stdout: result.stdout,
stderr: result.stderr,
script: script_content,
exit_code: result.exit_code,
execution_time_ms: execution_time,
error_message: result.error_message,
})
}
pub fn get_available_clients(&self) -> Vec<String> {
self.config.clients.keys().cloned().collect()
}
pub fn client_exists(&self, client_name: &str) -> bool {
self.config.clients.contains_key(client_name)
}
pub fn get_available_pipelines(&self) -> Vec<String> {
self.config.pipelines.iter().map(|p| p.name.clone()).collect()
}
pub fn pipeline_exists(&self, pipeline_name: &str) -> bool {
self.config.pipelines.iter().any(|p| p.name == pipeline_name)
}
}