arkflow_core/cli/
mod.rs

1use crate::config::EngineConfig;
2use crate::engine::Engine;
3use clap::{Arg, Command};
4use std::process;
5use tracing::{error, info, Level};
6use tracing_subscriber::FmtSubscriber;
7
8pub struct Cli {
9    pub config: Option<EngineConfig>,
10}
11impl Default for Cli {
12    fn default() -> Self {
13        Self { config: None }
14    }
15}
16
17impl Cli {
18    pub fn parse(&mut self) -> Result<(), Box<dyn std::error::Error>> {
19        let matches = Command::new("arkflow")
20            .version("0.1.0")
21            .author("chenquan")
22            .about("High-performance Rust stream processing engine, providing powerful data stream processing capabilities, supporting multiple input/output sources and processors.")
23            .arg(
24                Arg::new("config")
25                    .short('c')
26                    .long("config")
27                    .value_name("FILE")
28                    .help("Specify the profile path.")
29                    .required(true),
30            )
31            .arg(
32                Arg::new("validate")
33                    .short('v')
34                    .long("validate")
35                    .help("Only the profile is verified, not the engine is started.")
36                    .action(clap::ArgAction::SetTrue),
37            )
38            .get_matches();
39
40        // Get the profile path
41        let config_path = matches.get_one::<String>("config").unwrap();
42
43        // Get the profile path
44        let config = match EngineConfig::from_file(config_path) {
45            Ok(config) => config,
46            Err(e) => {
47                error!("Failed to load configuration file: {}", e);
48                process::exit(1);
49            }
50        };
51
52        // If you just verify the configuration, exit it
53        if matches.get_flag("validate") {
54            info!("The config is validated.");
55            return Ok(());
56        }
57        self.config = Some(config);
58        Ok(())
59    }
60    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
61        // Initialize the logging system
62        let config = self.config.clone().unwrap();
63        init_logging(&config);
64        let engine = Engine::new(config);
65        engine.run().await?;
66        Ok(())
67    }
68}
69fn init_logging(config: &EngineConfig) -> () {
70    let log_level = if let Some(logging) = &config.logging {
71        match logging.level.as_str() {
72            "trace" => Level::TRACE,
73            "debug" => Level::DEBUG,
74            "info" => Level::INFO,
75            "warn" => Level::WARN,
76            "error" => Level::ERROR,
77            _ => Level::INFO,
78        }
79    } else {
80        Level::INFO
81    };
82
83    let subscriber = FmtSubscriber::builder().with_max_level(log_level).finish();
84
85    tracing::subscriber::set_global_default(subscriber)
86        .expect("You can't set a global default log subscriber");
87}