pipebase 0.2.0

A tokio based runtime library for data integration app
Documentation
use super::StoreContext;
use crate::common::{ConfigInto, Context, FromConfig, FromPath, Period, PipeContext, State};
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashMap;
use std::time::Duration;
use tokio::time::{sleep, Interval};
use tracing::info;

#[derive(Deserialize)]
pub struct ContextPrinterConfig {
    pub interval: Period,
    pub delay: Option<Period>,
}

impl FromPath for ContextPrinterConfig {}

#[async_trait]
impl ConfigInto<ContextPrinter> for ContextPrinterConfig {}

pub struct ContextPrinter {
    interval: Interval,
    delay: Duration,
    contexts: HashMap<String, std::sync::Arc<Context>>,
}

#[async_trait]
impl FromConfig<ContextPrinterConfig> for ContextPrinter {
    async fn from_config(config: ContextPrinterConfig) -> anyhow::Result<Self> {
        let delay = match config.delay {
            Some(period) => period.into(),
            None => Duration::from_micros(0),
        };
        Ok(ContextPrinter {
            interval: tokio::time::interval(config.interval.into()),
            delay,
            contexts: HashMap::new(),
        })
    }
}

#[async_trait]
impl StoreContext<ContextPrinterConfig> for ContextPrinter {
    fn store_context(&mut self, pipe_name: String, context: std::sync::Arc<Context>) {
        self.contexts.insert(pipe_name, context);
    }

    fn load_context(&self, pipe_name: &str) -> Option<&std::sync::Arc<Context>> {
        self.contexts.get(pipe_name)
    }

    async fn run(&mut self) -> anyhow::Result<()> {
        sleep(self.delay).await;
        loop {
            self.interval.tick().await;
            let mut done: usize = 0;
            for (pipe_name, ctx) in &self.contexts {
                let state = &ctx.get_state();
                let total_run = ctx.get_total_run();
                let failure_run = ctx.get_failure_run();
                let display = PipeContext::new(
                    pipe_name.to_owned(),
                    state.to_owned(),
                    total_run,
                    failure_run,
                );
                print!("{}", display);
                if state == &State::Done {
                    done += 1;
                }
            }
            if done == self.contexts.len() {
                info!("all pipe in Done state, exit context printer");
                break;
            }
        }
        Ok(())
    }
}