fiddler_cmd/
lib.rs

//! Fast and flexible data stream processor written in Rust.
//!
//! Provides a CLI for running, linting and testing data streaming pipelines
//! using a declarative, YAML-based configuration for data aggregation and
//! transformation.
6use clap::{Args, Parser};
7use futures::stream::FuturesOrdered;
8use futures::stream::StreamExt;
9use inline_colorization::{color_green, color_red, color_reset};
10use serde::Serialize;
11use std::fs;
12use std::process;
13use tracing_subscriber::filter::{EnvFilter, LevelFilter};
14
15use fiddler::Error;
16use fiddler::Runtime;
17
18mod test;
19
20#[derive(Parser)]
21#[command(name = "fiddler")]
22#[command(bin_name = "fiddler")]
23enum FiddlerCli {
24    Lint(LintArgs),
25    Run(RunArgs),
26    Test(RunArgs),
27}
28
29#[derive(Args)]
30#[command(author, version, about, long_about = None)]
31struct LintArgs {
32    #[arg(short, long)]
33    config: Vec<String>,
34}
35
36#[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)]
37enum LogLevel {
38    Info,
39    Debug,
40    Trace,
41    Error,
42    #[default]
43    None,
44}
45
46#[derive(Args)]
47#[command(author, version, about, long_about = None)]
48struct RunArgs {
49    #[arg(short, long)]
50    config: Vec<String>,
51    #[arg(short, long, value_enum, default_value = "none")]
52    log_level: LogLevel,
53}
54
55/// Runs the default arguments to the fiddler command
56pub async fn run() -> Result<(), Error> {
57    match FiddlerCli::parse() {
58        FiddlerCli::Lint(args) => {
59            let mut failures: Vec<String> = Vec::new();
60            for c in args.config {
61                let conf = match fs::read_to_string(&c) {
62                    Ok(f) => f,
63                    Err(e) => {
64                        failures.push(format!("failed {}: {}", c, e));
65                        continue;
66                    }
67                };
68
69                if let Err(e) = Runtime::from_config(&conf).await {
70                    failures.push(format!("failed {}: {}", c, e));
71                    continue;
72                };
73            }
74
75            if failures.is_empty() {
76                println!("{color_green}Configuration is valid{color_reset}");
77                process::exit(0)
78            };
79
80            for f in failures {
81                println!("{color_red}{}{color_reset}", f);
82            }
83
84            process::exit(1);
85        }
86        FiddlerCli::Run(args) => {
87            setup_subscriber(args.log_level);
88
89            let mut environments = Vec::new();
90            for c in args.config {
91                let conf = fs::read_to_string(&c).map_err(|e| {
92                    Error::ConfigurationItemNotFound(format!("cannot read {}: {}", c, e))
93                })?;
94                let env = Runtime::from_config(&conf).await?;
95                environments.push(env);
96            }
97
98            let new_futures =
99                FuturesOrdered::from_iter(environments.iter().map(|e| e.run())).fuse();
100            let future_to_await = new_futures.collect::<Vec<Result<(), Error>>>();
101            futures::pin_mut!(future_to_await);
102            let results = future_to_await.await;
103            for r in results {
104                r?
105            }
106            process::exit(0)
107        }
108        FiddlerCli::Test(args) => {
109            setup_subscriber(args.log_level);
110            test::handle_tests(args.config).await?;
111            Ok(())
112        }
113    }
114}
115
116fn setup_subscriber(arg_log_level: LogLevel) {
117    let log_level = match arg_log_level {
118        LogLevel::Debug => Some(LevelFilter::DEBUG),
119        LogLevel::Error => Some(LevelFilter::ERROR),
120        LogLevel::Info => Some(LevelFilter::INFO),
121        LogLevel::Trace => Some(LevelFilter::TRACE),
122        LogLevel::None => None,
123    };
124
125    if let Some(l) = log_level {
126        let filter = EnvFilter::builder()
127            .with_default_directive(LevelFilter::OFF.into())
128            .from_env()
129            .unwrap()
130            .add_directive(format!("fiddler={}", l).parse().unwrap());
131
132        tracing_subscriber::fmt()
133            .with_env_filter(filter)
134            .compact()
135            .json()
136            .init();
137    };
138}