1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
/*! # rettle This library is a multithreaded ETL (**E**xtract, **T**ransfrom, **L**oad), with inspiration drawn from [Keras](https://keras.io/), to allow a "Brew Master" to define any order of operations for data transformations and outputs. ## Types rettle has the following Types to be used in any project to "Brew" data: - **Pot:** container that holds the set of instructions for data sources, sinks, and transforms (*See Ingredient Types below*) - **Brewery:** manager that holds the brewers and sends them jobs and the initial state of tea to be processed - **Brewer:** worker that brews the Tea ## Traits - **Tea:** inherited by custom data struct defined that will be transformed in the ETL pipeline - **Ingredient:** defines the steps that can be included in the ETL recipe - **Argument:** defines additional params that an Ingredient operation can use (Optional) ## Ingredient Types - **Fill:** data input source - **Transfuse:** combine data from multiple sources defined before this step *Not Implemented Yet* - **Steep:** data transformation step - **Skim:** remove a field (or Tea object) *Not Implemented Yet* - **Pour:** data output destination ## Using rettle In your custom project you first need to define the custom "Tea" struct that will be created by the `Fill` Ingredient. Example: ```ignore pub struct TextTea { pub x: i32, pub str_val: String, pub y: bool, } ``` Plus implement the `Tea` trait methods. Example: ```ignore impl Tea for TextTea { fn as_any(&self) -> &dyn Any { self } } ``` Next you can create a new `Pot` struct and supply it with sources and ingredients before calling it's `brew()` method to kick off the brewing process. Ingredients can be supplied with Optional `Argument` trait structs to pass additional runtime parameters used by your custom filters. Optional Steep Argument Example: ```ignore pub struct SteepArgs { pub increment: i32, } impl Argument for SteepArgs { fn as_any(&self) -> &dyn Any { self } } ``` Finally a `Brewery` struct must be created to specify the number of `Brewers` (threads) to run the code, and a `start_time` value to provide elapsed run time metrics. `Fill` operations collect and pass the `Tea` objects to be worked on to the `Brewery` for it to be processed by the `Brewers`. ### Example Project Code ```ignore fn main() { // Initialize variables let start_time = Instant::now(); let mut new_pot = Pot::new(); let brewery = Brewery::new(2, start_time); let steep_args = SteepArgs { increment: 10000 }; // Add source to pot new_pot.add_source(Box::new(Fill{ name: String::from("fake_tea"), source: String::from("hardcoded"), computation: Box::new(|_args, brewery, recipe| { let total_data = 1000000; let batch_size = 200; let num_iterations = total_data / batch_size; println!("Testing {} iterations", total_data); for _ in 0 .. num_iterations { let mut tea_batch = Vec::with_capacity(batch_size); for _ in 0 .. batch_size { tea_batch.push(Box::new(TextTea::default()) as Box<dyn Tea + Send>); } let recipe = Arc::clone(&recipe); brewery.take_order(|| { make_tea(tea_batch, recipe); }); } }), params: None, })); // Add ingredients to pot new_pot.add_ingredient(Box::new(Steep{ name: String::from("steep1"), computation: Box::new(|tea_batch, args| { tea_batch .into_iter() .map(|tea| { let tea = tea.as_any().downcast_ref::<TextTea>().unwrap(); let mut new_tea = tea.clone(); match args { None => panic!("No params passed, not editing object!"), Some(box_args) => { let box_args = box_args.as_any().downcast_ref::<SteepArgs>().unwrap(); new_tea.x = new_tea.x - box_args.increment; } } Box::new(new_tea) as Box<dyn Tea + Send> }) .collect() }), params: Some(Box::new(steep_args)), })); new_pot.add_ingredient(Box::new(Pour{ name: String::from("pour1"), computation: Box::new(|tea_batch, _args| { tea_batch .into_iter() .map(|tea| { println!("Final Tea: {:?}", tea.as_any().downcast_ref::<TextTea>().unwrap()); let tea = tea.as_any().downcast_ref::<TextTea>().unwrap(); let same_tea = TextTea { x: tea.x, str_val: String::from(&tea.str_val[..]), y: tea.y }; Box::new(same_tea) as Box<dyn Tea + Send> }) .collect() }), params: None, })); // Process Tea new_pot.brew(&brewery); // Display information brewery.get_brewer_info(); println!("Number of sources: {}", new_pot.get_sources().len()); println!("Number of steps: {}", new_pot.get_recipe().read().unwrap().len()); } ``` ### Ingredient Crates The community can add Ingredient crates that can be used along with this crate to simplify adding ingredients for common integrations or transformations. Some sample crates include: - [cstea](https://github.com/slaterb1/cstea): Fill/Pour integrations for csv files - [elastictea](https://github.com/slaterb1/elastictea): Fill/Pour integrations for Elasticsearch - [logtea](https://github.com/slaterb1/logtea): Fill integration for log files */ pub mod brewery; pub mod tea; pub mod ingredient; pub mod source; pub mod pot; // Re-export main structs and Traits for API convenience. pub use self::pot::Pot; pub use self::brewery::{Brewery, make_tea}; pub use self::tea::Tea; pub use self::ingredient::{Fill, Steep, Pour, Argument, Ingredient}; pub use self::source::Source;