chewdata 3.6.1

Extract Transform and Load data
Documentation
//! A step is a simple action.
pub mod eraser;
pub mod generator;
pub mod reader;
pub mod referential;
pub mod transformer;
pub mod validator;
pub mod writer;

use crate::helper::string::DisplayOnlyForDebugging;
use crate::{Context, DataResult};
use async_channel::{Receiver, Sender};
use async_stream::stream;
use async_trait::async_trait;
use eraser::Eraser;
use futures::Stream;
use reader::Reader;
use serde::Deserialize;
use smol::stream;
use std::{io, pin::Pin};
use transformer::Transformer;
use validator::Validator;
use writer::Writer;

use self::generator::Generator;

#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum StepType {
    #[serde(rename = "reader")]
    #[serde(alias = "read")]
    #[serde(alias = "r")]
    Reader(Reader),
    #[serde(rename = "writer")]
    #[serde(alias = "write")]
    #[serde(alias = "w")]
    Writer(Writer),
    #[serde(rename = "transformer")]
    #[serde(alias = "transform")]
    #[serde(alias = "t")]
    Transformer(Transformer),
    #[serde(rename = "eraser")]
    #[serde(alias = "erase")]
    #[serde(alias = "truncate")]
    #[serde(alias = "e")]
    Eraser(Eraser),
    #[serde(rename = "validator")]
    #[serde(alias = "validate")]
    #[serde(alias = "v")]
    Validator(Validator),
    #[serde(rename = "generator")]
    #[serde(alias = "g")]
    Generator(Generator),
}

impl StepType {
    pub fn step_inner(self) -> Box<dyn Step> {
        match self {
            StepType::Reader(step) => Box::new(step),
            StepType::Writer(step) => Box::new(step),
            StepType::Transformer(step) => Box::new(step),
            StepType::Eraser(step) => Box::new(step),
            StepType::Validator(step) => Box::new(step),
            StepType::Generator(step) => Box::new(step),
        }
    }
    pub fn step(&self) -> &dyn Step {
        match self {
            StepType::Reader(ref step) => step,
            StepType::Writer(ref step) => step,
            StepType::Transformer(ref step) => step,
            StepType::Eraser(ref step) => step,
            StepType::Validator(ref step) => step,
            StepType::Generator(ref step) => step,
        }
    }
    pub fn step_mut(&mut self) -> &mut dyn Step {
        match self {
            StepType::Reader(ref mut step) => step,
            StepType::Writer(ref mut step) => step,
            StepType::Transformer(ref mut step) => step,
            StepType::Eraser(ref mut step) => step,
            StepType::Validator(ref mut step) => step,
            StepType::Generator(ref mut step) => step,
        }
    }
}

#[async_trait]
pub trait Step: Send + Sync + StepClone {
    async fn exec(&self) -> io::Result<()>;
    fn number(&self) -> usize {
        1
    }
    // The maximum number of records that this step can hold in memory at the same time.
    fn record_limit(&self) -> usize {
        100
    }
    fn name(&self) -> String {
        "default".to_string()
    }
    fn set_receiver(&mut self, receiver: Receiver<Context>);
    fn receiver(&self) -> Option<&Receiver<Context>>;
    fn set_sender(&mut self, sender: Sender<Context>);
    fn sender(&self) -> Option<&Sender<Context>>;
    async fn send(&self, context: &Context) {
        if let Some(sender) = self.sender() {
            send(sender, context).await
        }
    }
    async fn receive<'step>(&'step self) -> Pin<Box<dyn Stream<Item = Context> + Send + 'step>> {
        match self.receiver() {
            Some(receiver) => receive(receiver).await,
            None => Box::pin(stream::empty::<Context>()),
        }
    }
}

pub(crate) async fn send(sender: &Sender<Context>, context: &Context) {
    match sender.send(context.clone()).await {
        Ok(_) => {
            trace!(
                context = context.display_only_for_debugging(),
                "Context sended in the channel"
            )
        }
        Err(e) => {
            trace!(
                error = format!("{:?}", e).as_str(),
                "The channel is disconnected. the step can't send any context",
            );
        }
    }
}

pub(crate) async fn receive<'step>(
    receiver: &'step Receiver<Context>,
) -> Pin<Box<dyn Stream<Item = Context> + Send + 'step>> {
    Box::pin(stream! {
        loop {
            match receiver.recv().await {
                Ok(context_received) => {
                    trace!(
                        context = context_received.display_only_for_debugging(),
                        "A new context received from the channel"
                    );

                    yield context_received;
                },
                Err(_) => {
                    trace!("The channel is disconnected. the step can't receive any context");
                    break;
                }
            };
        }
    })
}

pub trait StepClone {
    fn clone_box(&self) -> Box<dyn Step>;
}

impl<T> StepClone for T
where
    T: 'static + Step + Clone,
{
    fn clone_box(&self) -> Box<dyn Step> {
        Box::new(self.clone())
    }
}

impl Clone for Box<dyn Step> {
    fn clone(&self) -> Box<dyn Step> {
        self.clone_box()
    }
}