use std::process;
use clap::{Arg, Command};
use tracing::{error, info, Level};
use tracing_subscriber::FmtSubscriber;
use arkflow::config::EngineConfig;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let matches = Command::new("Rust流处理引擎")
.version("0.1.0")
.author("Rust流处理引擎团队")
.about("高性能、可靠且易于扩展的数据流处理系统")
.arg(
Arg::new("config")
.short('c')
.long("config")
.value_name("FILE")
.help("指定配置文件路径")
.required(true),
)
.arg(
Arg::new("validate")
.short('v')
.long("validate")
.help("仅验证配置文件,不启动引擎")
.action(clap::ArgAction::SetTrue),
)
.get_matches();
let config_path = matches.get_one::<String>("config").unwrap();
let config = match EngineConfig::from_file(config_path) {
Ok(config) => {
println!("成功加载配置文件: {}", config_path);
config
}
Err(e) => {
println!("加载配置文件失败: {}", e);
process::exit(1);
}
};
if matches.get_flag("validate") {
info!("配置文件验证通过");
return Ok(());
}
init_logging(&config);
let mut streams = Vec::new();
let mut handles = Vec::new();
for (i, stream_config) in config.streams.iter().enumerate() {
info!("Initializing flow #{}", i + 1);
match stream_config.build() {
Ok(stream) => {
streams.push(stream);
}
Err(e) => {
error!("Initializing flow #{} error: {}", i + 1, e);
process::exit(1);
}
}
}
for (i, mut stream) in streams.into_iter().enumerate() {
info!("Starting flow #{}", i + 1);
let handle = tokio::spawn(async move {
match stream.run().await {
Ok(_) => info!("Flow #{} completed successfully", i + 1),
Err(e) => {
error!("Stream #{} ran with error: {}", i + 1, e)
}
}
});
handles.push(handle);
}
for handle in handles {
handle.await?;
}
info!("All flow tasks have been complete");
Ok(())
}
fn init_logging(config: &EngineConfig) -> () {
let log_level = if let Some(logging) = &config.logging {
match logging.level.as_str() {
"trace" => Level::TRACE,
"debug" => Level::DEBUG,
"info" => Level::INFO,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => Level::INFO,
}
} else {
Level::INFO
};
let subscriber = FmtSubscriber::builder().with_max_level(log_level).finish();
tracing::subscriber::set_global_default(subscriber)
.expect("You can't set a global default log subscriber");
}