/*!
# rettle
This library is a multithreaded ETL (**E**xtract, **T**ransform, **L**oad) tool, with inspiration drawn from [Keras](https://keras.io/), that allows a "Brew Master" to define any order of operations for data transformations and outputs.

## Types
rettle provides the following types for "brewing" data in any project:
- **Pot:** container that holds the set of instructions for data sources, sinks, and transforms (*see Ingredient Types below*)
- **Brewery:** manager that holds the Brewers and sends them jobs along with the initial state of the Tea to be processed
- **Brewer:** worker that brews the Tea

## Traits
- **Ingredient:** defines the steps that can be included in the ETL recipe
- **Argument:** defines optional additional parameters that an Ingredient operation can use

## Ingredient Types
- **Fill:** data input source
- **Transfuse:** combine data from multiple sources defined before this step *Not Implemented Yet*
- **Steep:** data transformation step
- **Skim:** remove a field (or Tea object) *Not Implemented Yet*
- **Pour:** data output destination

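Conceptually, a recipe is an ordered list of steps that each batch of Tea passes through, much like layers in a Keras `Sequential` model. The sketch below illustrates only that idea and is not rettle's implementation: `Step` and `Recipe` are hypothetical names, and the Tea is simplified to `i32` values.

```rust
// Hypothetical sketch of a "recipe": an ordered list of batch transforms.
// `Step` and `Recipe` are illustrative names, not rettle's API.

// A step takes a batch and returns a (possibly transformed) batch.
// `Send + Sync` lets the recipe be shared across threads behind an `Arc`.
type Step = Box<dyn Fn(Vec<i32>) -> Vec<i32> + Send + Sync>;

struct Recipe {
    steps: Vec<Step>,
}

impl Recipe {
    fn new() -> Self {
        Recipe { steps: Vec::new() }
    }

    // Steps run in the order they were added.
    fn add_step(&mut self, step: Step) {
        self.steps.push(step);
    }

    // Pass one batch through every step in order.
    fn run(&self, batch: Vec<i32>) -> Vec<i32> {
        self.steps.iter().fold(batch, |acc, step| step(acc))
    }
}

fn main() {
    let mut recipe = Recipe::new();
    // "Steep"-like step: transform every value in the batch.
    recipe.add_step(Box::new(|batch: Vec<i32>| -> Vec<i32> {
        batch.into_iter().map(|x| x - 10).collect()
    }));
    // "Pour"-like step: output each value and pass the batch through unchanged.
    recipe.add_step(Box::new(|batch: Vec<i32>| -> Vec<i32> {
        for x in &batch {
            println!("Final value: {}", x);
        }
        batch
    }));

    let out = recipe.run(vec![10, 20, 30]);
    assert_eq!(out, vec![0, 10, 20]);
}
```

Folding over the steps keeps ordering explicit: swapping two `add_step` calls changes the result, which is the "any order of operations" property described above.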
## Using rettle
In your project, first define the custom "Tea" struct that the `Fill` Ingredient will create.

Example:
```ignore
// `Default`, `Clone`, and `Debug` are used by the example project below
#[derive(Default, Clone, Debug)]
pub struct TextTea {
    pub x: i32,
    pub str_val: String,
    pub y: bool,
}
```

Next, create a new `Pot` and supply it with sources and ingredients before calling its `brew()` method to start the brewing process. Ingredients can optionally be given structs implementing the `Argument` trait to pass additional runtime parameters to your custom operations.

Optional Steep Argument example:
```ignore
use std::any::Any;
use rettle::Argument;

pub struct SteepArgs {
    pub increment: i32,
}

impl Argument for SteepArgs {
    // Expose the struct as `Any` so an Ingredient can downcast it back to `SteepArgs`
    fn as_any(&self) -> &dyn Any {
        self
    }
}
```

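The `as_any()` method is what lets an Ingredient recover the concrete argument type at runtime through `downcast_ref` on `std::any::Any`. The self-contained sketch below shows that pattern; its `Argument` trait is a local stand-in mirroring the one above (assuming only `as_any` is needed), and `apply_steep` is a hypothetical helper, not part of rettle. Unlike the example project, it returns `None` for a missing or mismatched argument instead of panicking.

```rust
use std::any::Any;

// Local stand-in mirroring the `Argument` trait shown above
// (assumption: only `as_any` matters for this sketch).
trait Argument {
    fn as_any(&self) -> &dyn Any;
}

struct SteepArgs {
    increment: i32,
}

impl Argument for SteepArgs {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Hypothetical helper: subtract the configured increment, or return `None`
// if no argument was supplied or it is not a `SteepArgs`.
fn apply_steep(x: i32, args: Option<&dyn Argument>) -> Option<i32> {
    let steep_args = args?.as_any().downcast_ref::<SteepArgs>()?;
    Some(x - steep_args.increment)
}

fn main() {
    let args = SteepArgs { increment: 10000 };
    println!("{:?}", apply_steep(0, Some(&args as &dyn Argument))); // prints "Some(-10000)"
    println!("{:?}", apply_steep(0, None)); // prints "None"
}
```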
Finally, create a `Brewery` to specify the number of Brewers (threads) that run the code, along with a `start_time` value used for elapsed run-time metrics.

`Fill` operations collect the `Tea` objects to be worked on and pass them to the `Brewery`, where they are processed by the Brewers.

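The split between a `Brewery` and its Brewers resembles a fixed-size worker pool: a manager queues jobs and N worker threads pick them up. As a sketch of that general pattern only (an assumption about the design, not rettle's actual `Brewery` code), the following uses just the standard library; `Pool`, `Job`, `execute`, and `join` are hypothetical names, with `execute` playing roughly the role of `take_order` in the example below.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// A job is a boxed closure that runs once on some worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical pool: a manager (like a Brewery) sending jobs to workers (like Brewers).
struct Pool {
    sender: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl Pool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        // Workers share one receiver; the mutex ensures each job is taken once.
        let receiver = Arc::new(Mutex::new(receiver));
        let workers: Vec<JoinHandle<()>> = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is released at the end of this statement,
                    // so it is not held while the job runs.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // channel closed: no more jobs
                    }
                })
            })
            .collect();
        Pool { sender: Some(sender), workers }
    }

    // Queue a job for whichever worker is free next.
    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }

    // Close the channel, then wait for workers to drain the queue and exit.
    fn join(mut self) {
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}

fn main() {
    let pool = Pool::new(2);
    let total = Arc::new(Mutex::new(0u64));
    // 5 batches of 200 values each, processed by 2 workers.
    for batch in 0..5u64 {
        let total = Arc::clone(&total);
        pool.execute(move || {
            let batch_sum: u64 = (batch * 200..(batch + 1) * 200).sum();
            *total.lock().unwrap() += batch_sum;
        });
    }
    pool.join();
    println!("sum = {}", *total.lock().unwrap()); // prints "sum = 499500"
}
```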
### Example Project Code
```ignore
use std::sync::Arc;
use std::time::Instant;

use rettle::{make_tea, Brewery, Fill, Pot, Pour, Steep};
// `TextTea` and `SteepArgs` are the structs defined above; the `Tea` trait
// must also be in scope for `Box<dyn Tea + Send>`.

fn main() {
    // Initialize variables
    let start_time = Instant::now();
    let mut new_pot = Pot::new();
    let brewery = Brewery::new(2, start_time);
    let steep_args = SteepArgs { increment: 10000 };

    // Add source to pot
    new_pot.add_source(Box::new(Fill {
        name: String::from("fake_tea"),
        source: String::from("hardcoded"),
        computation: Box::new(|_args, brewery, recipe| {
            let total_data = 1000000;
            let batch_size = 200;
            let num_iterations = total_data / batch_size;
            println!("Testing {} iterations", num_iterations);
            for _ in 0..num_iterations {
                // Build one batch of default Tea objects
                let mut tea_batch = Vec::with_capacity(batch_size);
                for _ in 0..batch_size {
                    tea_batch.push(Box::new(TextTea::default()) as Box<dyn Tea + Send>);
                }
                // Hand the batch and a shared handle to the recipe to the Brewery
                let recipe = Arc::clone(&recipe);
                brewery.take_order(|| {
                    make_tea(tea_batch, recipe);
                });
            }
        }),
        params: None,
    }));

    // Add ingredients to pot
    new_pot.add_ingredient(Box::new(Steep {
        name: String::from("steep1"),
        computation: Box::new(|tea_batch, args| {
            tea_batch
                .into_iter()
                .map(|tea| {
                    // Recover the concrete TextTea from the trait object
                    let tea = tea.as_any().downcast_ref::<TextTea>().unwrap();
                    let mut new_tea = tea.clone();
                    match args {
                        None => panic!("No params passed, not editing object!"),
                        Some(box_args) => {
                            let box_args = box_args.as_any().downcast_ref::<SteepArgs>().unwrap();
                            new_tea.x -= box_args.increment;
                        }
                    }
                    Box::new(new_tea) as Box<dyn Tea + Send>
                })
                .collect()
        }),
        params: Some(Box::new(steep_args)),
    }));
    new_pot.add_ingredient(Box::new(Pour {
        name: String::from("pour1"),
        computation: Box::new(|tea_batch, _args| {
            tea_batch
                .into_iter()
                .map(|tea| {
                    // Print each finished Tea and pass a copy through unchanged
                    let tea = tea.as_any().downcast_ref::<TextTea>().unwrap();
                    println!("Final Tea: {:?}", tea);
                    Box::new(tea.clone()) as Box<dyn Tea + Send>
                })
                .collect()
        }),
        params: None,
    }));

    // Process Tea
    new_pot.brew(&brewery);

    // Display information
    brewery.get_brewer_info();
    println!("Number of sources: {}", new_pot.get_sources().len());
    println!("Number of steps: {}", new_pot.get_recipe().read().unwrap().len());
}
```

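For reference, the `Fill` computation above queues 1,000,000 / 200 = 5,000 batches. Because `num_iterations` uses integer division, a total that is not a multiple of `batch_size` would drop its final partial batch. One way to keep the remainder is `slice::chunks`, shown in this self-contained sketch; `make_batches` is a hypothetical helper, and the records are plain `u32` values rather than Tea objects.

```rust
// Hypothetical helper: split records into batches of at most `batch_size`,
// keeping the final, smaller batch when the total is not a multiple.
fn make_batches(records: &[u32], batch_size: usize) -> Vec<Vec<u32>> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    records.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let records: Vec<u32> = (0..1_000_050).collect();
    let batches = make_batches(&records, 200);
    // Integer division would give 1_000_050 / 200 = 5000 and drop 50 records;
    // `chunks` yields 5001 batches, the last holding the remaining 50.
    println!("{} batches, last has {}", batches.len(), batches.last().unwrap().len());
}
```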
### Ingredient Crates
The community can publish Ingredient crates that work alongside this crate to simplify adding ingredients for common integrations or transformations. Some sample crates include:
- [cstea](https://crates.io/crates/cstea): Fill/Pour integrations for CSV files
- [elastictea](https://crates.io/crates/elastictea): Fill/Pour integrations for Elasticsearch
- [logtea](https://crates.io/crates/logtea): Fill integration for log files
*/

pub mod brewery;
pub mod ingredient;
pub mod source;
pub mod pot;

// Re-export main structs and traits for API convenience.
pub use self::pot::Pot;
pub use self::brewery::{Brewery, make_tea};
pub use self::ingredient::{Fill, Steep, Pour, Argument, Ingredient};
pub use self::source::Source;