pipebase 0.2.0

A tokio based runtime library for data integration app
Documentation
use super::HandleError;
use crate::common::{ConfigInto, PipeError, Result};
use tokio::sync::mpsc::Receiver;
use tracing::{error, info};

pub struct ErrorHandler {}

impl ErrorHandler {
    pub async fn run<H, C>(self, config: C, mut rx: Receiver<PipeError>) -> Result<()>
    where
        C: ConfigInto<H> + Send,
        H: HandleError<C>,
    {
        let mut handler = config.config_into().await?;
        info!("error handler run ...");
        while let Some(pipe_error) = rx.recv().await {
            match handler.handle_error(pipe_error).await {
                Ok(_) => continue,
                Err(e) => {
                    error!("error handler error '{:#?}'", e)
                }
            }
        }
        Ok(())
    }
}

#[macro_export]
macro_rules! error_handler {
    () => {{
        ErrorHandler {}
    }};
}

#[macro_export]
macro_rules! run_error_handler {
    ($error_handler:ident, $config:ident, $rx:ident) => {{
        tokio::spawn(async move {
            match $error_handler.run($config, $rx).await {
                Ok(_) => Ok(()),
                Err(err) => {
                    tracing::error!("error handler exit with error {:#?}", err);
                    Err(err)
                }
            }
        })
    }};
}

#[macro_export]
macro_rules! subscribe_error_handler {
    ([$( $pipe:expr ), *], $tx:ident) => {
        {
            $(
                $pipe.subscribe_error($tx.clone());
            )*
        }
    };
}