pub mod config;
pub mod executor;
pub mod models;
pub mod ssh;
pub mod vars;
pub use executor::RemoteExecutor;
pub use models::*;
use net_shell::TemplateEngine;
use ssh2::DisconnectCode::AuthCancelledByUser;
use std::{env, fs};
use std::{collections::HashMap, sync::Arc};
use tracing_subscriber;
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder for a YAML parsing test.
    ///
    /// The body is empty, so the test is marked as ignored to keep it from
    /// being reported as a passing check until real assertions are written.
    #[test]
    #[ignore = "placeholder: no assertions implemented yet"]
    fn test_parse_yaml() {}

    /// Builds an executor from an inline configuration and checks that the
    /// single client and the single pipeline it declares are both registered.
    #[test]
    fn test_config_parsing() {
        // The nesting below is significant: `server1` must sit under
        // `clients` and the pipeline entry under `pipelines`, otherwise the
        // lookups asserted at the end of this test have nothing to find.
        // NOTE(review): `timeout_seconds` is placed under `ssh_config` and
        // `servers` under the step; confirm against the `models` definitions.
        let yaml_content = r#"
global_scripts: []
clients:
  server1:
    name: "server1"
    execution_method: ssh
    ssh_config:
      host: "192.168.1.100"
      port: 22
      username: "user"
      password: "password"
      timeout_seconds: 30
pipelines:
  - name: "test_pipeline"
    steps:
      - name: "test_step"
        script: "echo 'test'"
        servers:
          - server1
default_timeout: 60
"#;

        let executor = RemoteExecutor::from_yaml_str(yaml_content, None).unwrap();

        assert_eq!(executor.get_available_clients().len(), 1);
        assert!(executor.client_exists("server1"));
        assert_eq!(executor.get_available_pipelines().len(), 1);
        assert!(executor.pipeline_exists("test_pipeline"));
    }
}
/// Command-line entry point: renders the configuration template, asks the
/// executor to run all pipelines and prints a summary of the results.
///
/// The configuration path is taken from the first command-line argument and
/// falls back to `config.yaml` when no argument is given.
///
/// # Errors
///
/// Returns an error when the configuration file cannot be read, parsed or
/// rendered, when the executor cannot be built or its run fails, and when the
/// run itself reports failure. The last case makes the process exit with a
/// non-zero status so that calling scripts can detect a failed run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    // `get` avoids an index panic path and yields the default when the
    // argument is absent.
    let args: Vec<String> = env::args().collect();
    let config_path = args.get(1).map(String::as_str).unwrap_or("config.yaml");

    // Variables passed to the executor alongside the parsed configuration.
    let variables = HashMap::from([
        ("new_master_ip".to_string(), "192.168.1.100".to_string()),
        ("script_dir".to_string(), "./scripts".to_string()),
    ]);

    // The raw file is parsed once to read its `variables` section, which then
    // feeds the template engine that renders the same file.
    let mut t = TemplateEngine::with_all_delimiters("#{", "}", "#{%", "%}");
    let template_content = fs::read_to_string(config_path)?;
    let data: RemoteExecutionConfig = serde_yaml::from_str(&template_content)?;
    for (k, v) in data.variables.unwrap_or_default().iter() {
        t.set_variable(k, v.as_str());
    }
    let parsed_content = t
        .set_preserve_loop_newlines(false)
        .render_string(template_content.as_str())?;

    // NOTE(review): this prints the rendered configuration verbatim, which may
    // include credentials such as SSH passwords; consider removing or masking.
    println!("Parsed YAML Content:\n{}", parsed_content);

    let mut executor = RemoteExecutor::from_yaml_str(&parsed_content, Some(variables))?;

    // Only stderr output and step starts are reported while the run is in
    // progress; every other event kind is intentionally silent.
    let output_callback = Arc::new(|event: models::OutputEvent| match event.output_type {
        models::OutputType::Stderr => {
            eprintln!(
                "[STDERR] {}@{}@{}: {}, script:[{}]",
                event.pipeline_name,
                event.step.name,
                event.server_name,
                event.content,
                event.step.script
            );
        }
        models::OutputType::StepStarted => {
            println!("🚀 {}:{}", event.pipeline_name, event.script_path);
        }
        models::OutputType::Stdout
        | models::OutputType::Log
        | models::OutputType::StepCompleted => {}
    });

    // The same callback is supplied for both callback parameters.
    let res = executor
        .execute_all_pipelines_with_realtime_output(
            Some(output_callback.clone()),
            Some(output_callback),
        )
        .await?;
    let results = &res.pipeline_results;

    println!("\n=== 执行结果摘要 ===");
    for result in results {
        let outcome = if result.overall_success {
            "成功"
        } else {
            "失败"
        };
        println!("\n流水线: {} ({})", result.title, outcome);
        println!("总执行时间: {}ms", result.total_execution_time_ms);
        println!("步骤结果:");

        for step_result in &result.step_results {
            let execution = &step_result.execution_result;
            let status = if execution.success { "✅" } else { "❌" };
            println!(
                " {} [{}:{}] {} - {}ms, {} {}",
                status,
                result.title,
                step_result.title,
                step_result.server_name,
                execution.execution_time_ms,
                execution.error_message.clone().unwrap_or_default(),
                step_result.scritp_path
            );
        }
    }

    // Aggregate counts over all pipelines and all of their steps.
    let total_pipelines = results.len();
    let successful_pipelines = results.iter().filter(|r| r.overall_success).count();
    let total_steps = results.iter().map(|r| r.step_results.len()).sum::<usize>();
    let successful_steps = results
        .iter()
        .flat_map(|r| &r.step_results)
        .filter(|r| r.execution_result.success)
        .count();

    println!("\n=== 总体统计 ===");
    println!("流水线: {}/{} 成功", successful_pipelines, total_pipelines);
    println!("步骤: {}/{} 成功", successful_steps, total_steps);

    if !res.success {
        println!("执行失败: {}", res.reason);
        // Returning `Ok` here would exit with status 0 and hide the failure
        // from shells and CI; an error gives a non-zero exit status.
        return Err("remote execution reported failure".into());
    }
    Ok(())
}