pipebase/context/
print.rs1use super::StoreContext;
2use crate::common::{ConfigInto, Context, FromConfig, FromPath, Period, PipeContext, State};
3use async_trait::async_trait;
4use serde::Deserialize;
5use std::collections::HashMap;
6use std::time::Duration;
7use tokio::time::{sleep, Interval};
8use tracing::info;
9
10#[derive(Deserialize)]
11pub struct ContextPrinterConfig {
12 pub interval: Period,
13 pub delay: Option<Period>,
14}
15
16impl FromPath for ContextPrinterConfig {}
17
18#[async_trait]
19impl ConfigInto<ContextPrinter> for ContextPrinterConfig {}
20
21pub struct ContextPrinter {
22 interval: Interval,
23 delay: Duration,
24 contexts: HashMap<String, std::sync::Arc<Context>>,
25}
26
27#[async_trait]
28impl FromConfig<ContextPrinterConfig> for ContextPrinter {
29 async fn from_config(config: ContextPrinterConfig) -> anyhow::Result<Self> {
30 let delay = match config.delay {
31 Some(period) => period.into(),
32 None => Duration::from_micros(0),
33 };
34 Ok(ContextPrinter {
35 interval: tokio::time::interval(config.interval.into()),
36 delay,
37 contexts: HashMap::new(),
38 })
39 }
40}
41
42#[async_trait]
43impl StoreContext<ContextPrinterConfig> for ContextPrinter {
44 fn store_context(&mut self, pipe_name: String, context: std::sync::Arc<Context>) {
45 self.contexts.insert(pipe_name, context);
46 }
47
48 fn load_context(&self, pipe_name: &str) -> Option<&std::sync::Arc<Context>> {
49 self.contexts.get(pipe_name)
50 }
51
52 async fn run(&mut self) -> anyhow::Result<()> {
53 sleep(self.delay).await;
54 loop {
55 self.interval.tick().await;
56 let mut done: usize = 0;
57 for (pipe_name, ctx) in &self.contexts {
58 let state = &ctx.get_state();
59 let total_run = ctx.get_total_run();
60 let failure_run = ctx.get_failure_run();
61 let display = PipeContext::new(
62 pipe_name.to_owned(),
63 state.to_owned(),
64 total_run,
65 failure_run,
66 );
67 print!("{}", display);
68 if state == &State::Done {
69 done += 1;
70 }
71 }
72 if done == self.contexts.len() {
73 info!("all pipe in Done state, exit context printer");
74 break;
75 }
76 }
77 Ok(())
78 }
79}