chewdata 3.6.1

Extract Transform and Load data
Documentation
//! Generate an empty [`crate::DataResult`] that you can enrich with the [`crate::step::Transformer`].
//!
//! It is possible to duplicate the input data and enrich it afterwards.
//!
//! ### Actions
//!
//! 1 - Get a [`crate::Context`] from the input queue.  
//! 2 - Extract the [`crate::DataResult`] from the [`crate::Context`]. If its type does not match `data_type`, push the [`crate::Context`] unchanged into the output queue and go back to step 1.  
//! 3 - Clone the current [`crate::Context`], or create a new one if nothing has been received from the input queue.  
//! 4 - Push the [`crate::Context`] into the output queue.  
//! 5 - Repeat step 3 `record_limit` times.  
//! 6 - Go back to step 1 until the input queue is empty.  
//!
//! ### Configuration
//!
//! | key          | alias | Description                                                                     | Default Value | Possible Values                  |
//! | ------------ | ----- | ------------------------------------------------------------------------------- | ------------- | -------------------------------- |
//! | type         | -     | Required in order to use generator step                                         | `generator`   | `generator` / `g`                |
//! | name         | alias | Name step                                                                       | `null`        | Auto generate alphanumeric value |
//! | data_type    | data  | Type of data used for the transformation. skip other data type                  | `ok`          | `ok` / `err`                     |
//! | record_limit | batch / size | Number of contexts generated for each input context of the matching data type, or in total when nothing is received. | `1`           | unsigned number                  |
//!
//! ### Examples
//!
//! ```json
//! [
//!     {
//!         "type": "generator",
//!         "name": "my_generator",
//!         "data_type": "ok",
//!         "record_limit": 1000
//!     },
//!     {
//!         "type": "transformer",
//!         "actions": [
//!             {
//!                 "field":"firstname",
//!                 "pattern": "{{ fake_first_name() }}"
//!             },
//!             {
//!                 "field":"lastname",
//!                 "pattern": "{{ fake_last_name() }}"
//!             },
//!             {
//!                 "field":"city",
//!                 "pattern": "{{ fake_city() }}"
//!             },
//!             {
//!                 "field":"password",
//!                 "pattern": "{{ fake_password(min = 5, max = 10) }}"
//!             },
//!             {
//!                 "field":"color",
//!                 "pattern": "{{ fake_color_hex() }}"
//!             }
//!         ]
//!     },
//!     {
//!         "type": "writer"
//!     }
//! ]
//! ```
//!
//! No input.
//!
//! output:
//!
//! ```json
//! [
//!     {"firstname": "my firstname", "lastname": "my lastname", "city": "my city", "password": "my password", "color": "my color"},
//!     ...
//! ]
//! ```
use crate::step::Step;
use crate::Context;
use crate::DataResult;
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use smol::stream::StreamExt;
use serde::Deserialize;
use serde_json::Value;
use std::io;
use uuid::Uuid;

/// Step that duplicates incoming contexts, or creates new empty ones when nothing is received.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct Generator {
    /// Step name; the configuration may also spell this key `alias`.
    #[serde(alias = "alias")]
    pub name: String,
    /// Only contexts whose input has this data type are duplicated; the others are forwarded untouched.
    #[serde(alias = "data")]
    pub data_type: String,
    /// Number of contexts emitted for each matching input context, or in total when no context is received.
    #[serde(alias = "batch", alias = "size")]
    pub record_limit: usize,
    /// Input channel, assigned at runtime and never deserialized.
    #[serde(skip)]
    pub receiver: Option<Receiver<Context>>,
    /// Output channel, assigned at runtime and never deserialized.
    #[serde(skip)]
    pub sender: Option<Sender<Context>>,
}

impl Default for Generator {
    /// Builds a generator named after a fresh random UUID (simple format), handling the
    /// [`DataResult::OK`] data type and emitting one context per input, with no channels attached.
    fn default() -> Self {
        Self {
            name: Uuid::new_v4().simple().to_string(),
            data_type: DataResult::OK.to_string(),
            record_limit: 1,
            receiver: None,
            sender: None,
        }
    }
}

#[async_trait]
impl Step for Generator {
    /// See [`Step::set_receiver`] for more details.
    fn set_receiver(&mut self, receiver: Receiver<Context>) {
        self.receiver = Some(receiver);
    }
    /// See [`Step::receiver`] for more details.
    fn receiver(&self) -> Option<&Receiver<Context>> {
        self.receiver.as_ref()
    }
    /// See [`Step::set_sender`] for more details.
    fn set_sender(&mut self, sender: Sender<Context>) {
        self.sender = Some(sender);
    }
    /// See [`Step::sender`] for more details.
    fn sender(&self) -> Option<&Sender<Context>> {
        self.sender.as_ref()
    }
    /// Sends `record_limit` copies of every received context whose input matches `data_type`.
    /// Each copy records its own input as this step's result. Contexts of any other data type
    /// are forwarded once, unchanged. When the input stream yields nothing at all,
    /// `record_limit` brand-new contexts holding `Ok(null)` are sent instead.
    ///
    /// See [`Step::exec`] for more details.
    #[instrument(
        name = "generator::exec",
        skip(self),
        fields(
            name = self.name,
            data_type = self.data_type,
            record_limit = self.record_limit,
        )
    )]
    async fn exec(&self) -> io::Result<()> {
        info!("Start generating data...");

        let mut incoming = self.receive().await;
        let mut received_any = false;

        while let Some(context) = incoming.next().await {
            received_any = true;

            if context.input().is_type(self.data_type.as_ref()) {
                for _ in 0..self.record_limit {
                    let mut copy = context.clone();
                    let step_input = copy.input();
                    copy.insert_step_result(self.name(), step_input);
                    self.send(&copy).await;
                }
            } else {
                trace!("Handles only this data type");
                self.send(&context).await;
            }
        }

        // Nothing came in (e.g. this step starts the pipeline): create contexts from scratch.
        if !received_any {
            for _ in 0..self.record_limit {
                let fresh = Context::new(self.name(), DataResult::Ok(Value::Null));
                self.send(&fresh).await;
            }
        }

        trace!("Stops generating data and sending context in the channel");

        Ok(())
    }
    /// Returns an owned copy of the configured step name.
    fn name(&self) -> String {
        self.name.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use macro_rules_attribute::apply;
    use smol_macros::test;
    use serde_json::Value;
    use std::io::{Error, ErrorKind};

    /// A context whose data type differs from the step's `data_type` is forwarded unchanged.
    #[apply(test!)]
    async fn exec_with_different_data_result_type() {
        let mut step = Generator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let error = Error::new(ErrorKind::InvalidData, "My error");
        let context = Context::new("before".to_string(), DataResult::Err((data, error)));
        let expected_context = context.clone();

        // Queue the input, then close the channel so that `exec` terminates.
        sender_input.try_send(context).unwrap();
        drop(sender_input);

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.exec().await.unwrap();

        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }

    /// A context of the handled data type gets the step's result recorded.
    #[apply(test!)]
    async fn exec_with_same_data_result_type() {
        let mut step = Generator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data: Value = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let context = Context::new("before".to_string(), DataResult::Ok(data.clone()));
        let mut expected_context = context.clone();
        expected_context.insert_step_result("my_step".to_string(), DataResult::Ok(data));

        sender_input.try_send(context).unwrap();
        drop(sender_input);

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.name = "my_step".to_string();
        step.exec().await.unwrap();

        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }

    /// Each matching input context is emitted `record_limit` times.
    #[apply(test!)]
    async fn exec_duplicates_input_record_limit_times() {
        let mut step = Generator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data: Value = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let context = Context::new("before".to_string(), DataResult::Ok(data.clone()));
        let mut expected_context = context.clone();
        expected_context.insert_step_result("my_step".to_string(), DataResult::Ok(data));

        sender_input.try_send(context).unwrap();
        drop(sender_input);

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.name = "my_step".to_string();
        step.record_limit = 3;
        step.exec().await.unwrap();

        assert_eq!(3, receiver_output.len());
        for _ in 0..3 {
            assert_eq!(expected_context, receiver_output.recv().await.unwrap());
        }
    }

    /// With no input at all, `record_limit` new contexts are generated.
    #[apply(test!)]
    async fn exec_without_input_generates_record_limit_contexts() {
        let mut step = Generator::default();
        let (sender_input, receiver_input) = async_channel::unbounded::<Context>();
        let (sender_output, receiver_output) = async_channel::unbounded();
        // Close the input channel immediately: nothing will ever be received.
        drop(sender_input);

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.record_limit = 2;
        step.exec().await.unwrap();

        assert_eq!(2, receiver_output.len());
    }
}