Skip to main content

logtea/
fill.rs

1use rettle::{
2    Ingredient, 
3    Argument,
4    Fill,
5    Brewery,
6    make_tea,
7};
8
9use std::sync::{Arc, RwLock};
10use std::io::{BufReader};
11use std::io::prelude::*;
12use std::fs::File;
13use std::any::Any;
14use std::fmt::Debug;
15
16use nom::IResult;
17
18///
19/// Ingredient params for FillLogTea.
20pub struct FillLogArg<T> where
21    T: Send + Debug + Sized + 'static,
22{
23    /// The filepath to the csv that will be processed.
24    filepath: String,
25    batch_size: usize,
26    parser: fn(&str) -> IResult<&str, T>,
27}
28
29impl<T> FillLogArg<T> where
30    T: Send + Debug + Sized + 'static,
31{
32    ///
33    /// Returns a FillLogArg to be used as params in FillLogTea.
34    ///
35    /// # Arguments
36    ///
37    /// * `filepath` - filepath for log file to load.
38    /// * `batch_size` - number of lines to process at a time.
39    /// * `parser` - nom parser to parse data from lines
40    pub fn new(filepath: &str, batch_size: usize, parser: fn(&str) -> IResult<&str, T>) -> FillLogArg<T> {
41        let filepath = String::from(filepath);
42        FillLogArg { filepath, batch_size, parser }
43    }
44}
45
46impl<T> Argument for FillLogArg<T> where
47    T: Send + Debug + Sized + 'static,
48{
49    fn as_any(&self) -> &dyn Any {
50        self
51    }
52}
53
54///
55/// Wrapper to simplifiy the creation of the Fill Ingredient to be used in the rettle Pot.
56pub struct FillLogTea {}
57
58impl FillLogTea {
59    ///
60    /// Returns the Fill Ingredient to be added to the `rettle` Pot.
61    ///
62    /// # Arguments
63    ///
64    /// * `name` - Ingredient name
65    /// * `source` - Ingredient source
66    /// * `params` - Params data structure holding the `filepath`, `batch_size`, and `parser`
67    pub fn new<T: Send + Debug + Sized + 'static>(name: &str, source: &str, params: FillLogArg<T>) -> Box<Fill<T>> {
68        Box::new(Fill {
69            name: String::from(name),
70            source: String::from(source),
71            computation: Box::new(|args, brewery, recipe| {
72                fill_from_log::<T>(args, brewery, recipe);
73            }),
74            params: Some(Box::new(params))
75        })
76    }
77}
78
79/// Helper function that sends to batch request to Brewers for processing.
80///
81/// # Arguments
82///
83/// * `brewery` - Brewery that processes the data.
84/// * `recipe` - Recipe for the ETL used by the Brewery.
85/// * `tea_batch` - Current batch to be sent and processed
86fn call_brewery<T: Send + 'static>(brewery: &Brewery, recipe: Arc<RwLock<Vec<Box<dyn Ingredient<T> + Send + Sync>>>>, tea_batch: Vec<T>) {
87    brewery.take_order(|| {
88        make_tea(tea_batch, recipe);
89    });
90}
91
92///
93/// Implements the log file read, parse to specified data struct, and passes the data to the
94/// brewery for processing.
95///
96/// # Arguments
97///
98/// * `args` - Params specifying the filepath, batch_size, and custom parser.
99/// * `brewery` - Brewery that processes the data.
100/// * `recipe` - Recipe for the ETL used by the Brewery.
101fn fill_from_log<T: Send + Debug + Sized + 'static>(args: &Option<Box<dyn Argument + Send>>, brewery: &Brewery, recipe: Arc<RwLock<Vec<Box<dyn Ingredient<T> + Send + Sync>>>>) {
102    match args {
103        None => (),
104        Some(box_args) => {
105            // Unwrap params.
106            let box_args = box_args.as_any().downcast_ref::<FillLogArg<T>>().unwrap();
107            
108            // Initialize reader with specified file from path.
109            println!("{:?}", &box_args.filepath);
110            let f = File::open(&box_args.filepath); 
111
112            let reader = match f {
113                Ok(f) => {
114                    BufReader::new(f)
115                },
116                Err(e) => {
117                    println!("Failed opening file! Error: {:}", e);
118                    return
119                },
120            };
121
122            // Pull out parser function
123            let parser = &box_args.parser;
124            
125            // Iterate over log lines and push data into processer
126            let mut tea_batch: Vec<T> = Vec::with_capacity(box_args.batch_size);
127            for line in reader.lines() {
128                let line = line.unwrap();
129                // Check if batch size has been reached and send to brewers if so.
130                if tea_batch.len() == box_args.batch_size {
131                    let recipe = Arc::clone(&recipe);
132                    call_brewery(brewery, recipe, tea_batch);
133                    tea_batch = Vec::with_capacity(box_args.batch_size);
134                }
135                let (_input, tea) = parser(&line).unwrap();
136                tea_batch.push(tea);
137            }
138            let recipe = Arc::clone(&recipe);
139            call_brewery(brewery, recipe, tea_batch);
140        }
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use super::{FillLogArg, FillLogTea};
147    use rettle::Pot;
148    use nom::IResult;
149
150    #[derive(Default, Clone, Debug)]
151    struct TestLogTea {
152        log_type: String,
153        datetime: String,
154        msg: String,
155    }
156
157    fn test_parser(input: &str) -> IResult<&str, TestLogTea> {
158        Ok((input, TestLogTea::default()))
159    }
160
161    #[test]
162    fn create_log_args() {
163        let log_args = FillLogArg::new("fixtures/test.csv", 50, test_parser);
164        assert_eq!(log_args.filepath, "fixtures/test.csv");
165        assert_eq!(log_args.batch_size, 50);
166    }
167
168    #[test]
169    fn create_fill_logtea() {
170        let log_args = FillLogArg::new("fixtures/test.csv", 50, test_parser);
171        let fill_logtea = FillLogTea::new::<TestLogTea>("test_log", "fixture", log_args);
172        let new_pot = Pot::new()
173            .add_source(fill_logtea);
174        assert_eq!(new_pot.get_sources().len(), 1);
175        assert_eq!(new_pot.get_sources()[0].get_name(), "test_log");
176    }
177}