pipebase/context/
print.rs

1use super::StoreContext;
2use crate::common::{ConfigInto, Context, FromConfig, FromPath, Period, PipeContext, State};
3use async_trait::async_trait;
4use serde::Deserialize;
5use std::collections::HashMap;
6use std::time::Duration;
7use tokio::time::{sleep, Interval};
8use tracing::info;
9
10#[derive(Deserialize)]
11pub struct ContextPrinterConfig {
12    pub interval: Period,
13    pub delay: Option<Period>,
14}
15
16impl FromPath for ContextPrinterConfig {}
17
18#[async_trait]
19impl ConfigInto<ContextPrinter> for ContextPrinterConfig {}
20
21pub struct ContextPrinter {
22    interval: Interval,
23    delay: Duration,
24    contexts: HashMap<String, std::sync::Arc<Context>>,
25}
26
27#[async_trait]
28impl FromConfig<ContextPrinterConfig> for ContextPrinter {
29    async fn from_config(config: ContextPrinterConfig) -> anyhow::Result<Self> {
30        let delay = match config.delay {
31            Some(period) => period.into(),
32            None => Duration::from_micros(0),
33        };
34        Ok(ContextPrinter {
35            interval: tokio::time::interval(config.interval.into()),
36            delay,
37            contexts: HashMap::new(),
38        })
39    }
40}
41
42#[async_trait]
43impl StoreContext<ContextPrinterConfig> for ContextPrinter {
44    fn store_context(&mut self, pipe_name: String, context: std::sync::Arc<Context>) {
45        self.contexts.insert(pipe_name, context);
46    }
47
48    fn load_context(&self, pipe_name: &str) -> Option<&std::sync::Arc<Context>> {
49        self.contexts.get(pipe_name)
50    }
51
52    async fn run(&mut self) -> anyhow::Result<()> {
53        sleep(self.delay).await;
54        loop {
55            self.interval.tick().await;
56            let mut done: usize = 0;
57            for (pipe_name, ctx) in &self.contexts {
58                let state = &ctx.get_state();
59                let total_run = ctx.get_total_run();
60                let failure_run = ctx.get_failure_run();
61                let display = PipeContext::new(
62                    pipe_name.to_owned(),
63                    state.to_owned(),
64                    total_run,
65                    failure_run,
66                );
67                print!("{}", display);
68                if state == &State::Done {
69                    done += 1;
70                }
71            }
72            if done == self.contexts.len() {
73                info!("all pipe in Done state, exit context printer");
74                break;
75            }
76        }
77        Ok(())
78    }
79}