1use crate::config::EngineConfig;
2use crate::engine::Engine;
3use clap::{Arg, Command};
4use std::process;
5use tracing::{error, info, Level};
6use tracing_subscriber::FmtSubscriber;
7
8pub struct Cli {
9 pub config: Option<EngineConfig>,
10}
11impl Default for Cli {
12 fn default() -> Self {
13 Self { config: None }
14 }
15}
16
17impl Cli {
18 pub fn parse(&mut self) -> Result<(), Box<dyn std::error::Error>> {
19 let matches = Command::new("arkflow")
20 .version("0.1.0")
21 .author("chenquan")
22 .about("High-performance Rust stream processing engine, providing powerful data stream processing capabilities, supporting multiple input/output sources and processors.")
23 .arg(
24 Arg::new("config")
25 .short('c')
26 .long("config")
27 .value_name("FILE")
28 .help("Specify the profile path.")
29 .required(true),
30 )
31 .arg(
32 Arg::new("validate")
33 .short('v')
34 .long("validate")
35 .help("Only the profile is verified, not the engine is started.")
36 .action(clap::ArgAction::SetTrue),
37 )
38 .get_matches();
39
40 let config_path = matches.get_one::<String>("config").unwrap();
42
43 let config = match EngineConfig::from_file(config_path) {
45 Ok(config) => config,
46 Err(e) => {
47 error!("Failed to load configuration file: {}", e);
48 process::exit(1);
49 }
50 };
51
52 if matches.get_flag("validate") {
54 info!("The config is validated.");
55 return Ok(());
56 }
57 self.config = Some(config);
58 Ok(())
59 }
60 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
61 let config = self.config.clone().unwrap();
63 init_logging(&config);
64 let engine = Engine::new(config);
65 engine.run().await?;
66 Ok(())
67 }
68}
69fn init_logging(config: &EngineConfig) -> () {
70 let log_level = if let Some(logging) = &config.logging {
71 match logging.level.as_str() {
72 "trace" => Level::TRACE,
73 "debug" => Level::DEBUG,
74 "info" => Level::INFO,
75 "warn" => Level::WARN,
76 "error" => Level::ERROR,
77 _ => Level::INFO,
78 }
79 } else {
80 Level::INFO
81 };
82
83 let subscriber = FmtSubscriber::builder().with_max_level(log_level).finish();
84
85 tracing::subscriber::set_global_default(subscriber)
86 .expect("You can't set a global default log subscriber");
87}