1use rettle::{
2 Ingredient,
3 Argument,
4 Fill,
5 Brewery,
6 make_tea,
7};
8
9use std::sync::{Arc, RwLock};
10use std::io::{BufReader};
11use std::io::prelude::*;
12use std::fs::File;
13use std::any::Any;
14use std::fmt::Debug;
15
16use nom::IResult;
17
18pub struct FillLogArg<T> where
21 T: Send + Debug + Sized + 'static,
22{
23 filepath: String,
25 batch_size: usize,
26 parser: fn(&str) -> IResult<&str, T>,
27}
28
29impl<T> FillLogArg<T> where
30 T: Send + Debug + Sized + 'static,
31{
32 pub fn new(filepath: &str, batch_size: usize, parser: fn(&str) -> IResult<&str, T>) -> FillLogArg<T> {
41 let filepath = String::from(filepath);
42 FillLogArg { filepath, batch_size, parser }
43 }
44}
45
46impl<T> Argument for FillLogArg<T> where
47 T: Send + Debug + Sized + 'static,
48{
49 fn as_any(&self) -> &dyn Any {
50 self
51 }
52}
53
54pub struct FillLogTea {}
57
58impl FillLogTea {
59 pub fn new<T: Send + Debug + Sized + 'static>(name: &str, source: &str, params: FillLogArg<T>) -> Box<Fill<T>> {
68 Box::new(Fill {
69 name: String::from(name),
70 source: String::from(source),
71 computation: Box::new(|args, brewery, recipe| {
72 fill_from_log::<T>(args, brewery, recipe);
73 }),
74 params: Some(Box::new(params))
75 })
76 }
77}
78
79fn call_brewery<T: Send + 'static>(brewery: &Brewery, recipe: Arc<RwLock<Vec<Box<dyn Ingredient<T> + Send + Sync>>>>, tea_batch: Vec<T>) {
87 brewery.take_order(|| {
88 make_tea(tea_batch, recipe);
89 });
90}
91
92fn fill_from_log<T: Send + Debug + Sized + 'static>(args: &Option<Box<dyn Argument + Send>>, brewery: &Brewery, recipe: Arc<RwLock<Vec<Box<dyn Ingredient<T> + Send + Sync>>>>) {
102 match args {
103 None => (),
104 Some(box_args) => {
105 let box_args = box_args.as_any().downcast_ref::<FillLogArg<T>>().unwrap();
107
108 println!("{:?}", &box_args.filepath);
110 let f = File::open(&box_args.filepath);
111
112 let reader = match f {
113 Ok(f) => {
114 BufReader::new(f)
115 },
116 Err(e) => {
117 println!("Failed opening file! Error: {:}", e);
118 return
119 },
120 };
121
122 let parser = &box_args.parser;
124
125 let mut tea_batch: Vec<T> = Vec::with_capacity(box_args.batch_size);
127 for line in reader.lines() {
128 let line = line.unwrap();
129 if tea_batch.len() == box_args.batch_size {
131 let recipe = Arc::clone(&recipe);
132 call_brewery(brewery, recipe, tea_batch);
133 tea_batch = Vec::with_capacity(box_args.batch_size);
134 }
135 let (_input, tea) = parser(&line).unwrap();
136 tea_batch.push(tea);
137 }
138 let recipe = Arc::clone(&recipe);
139 call_brewery(brewery, recipe, tea_batch);
140 }
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::{FillLogArg, FillLogTea};
    use nom::IResult;
    use rettle::Pot;

    /// Minimal tea type used as the parser output in these tests.
    #[derive(Default, Clone, Debug)]
    struct TestLogTea {
        log_type: String,
        datetime: String,
        msg: String,
    }

    /// Stub parser: returns the input untouched together with a default `TestLogTea`.
    fn test_parser(input: &str) -> IResult<&str, TestLogTea> {
        let tea = TestLogTea::default();
        Ok((input, tea))
    }

    /// The constructor stores the path and batch size it was given.
    #[test]
    fn create_log_args() {
        let args = FillLogArg::new("fixtures/test.csv", 50, test_parser);
        assert_eq!(args.filepath, "fixtures/test.csv");
        assert_eq!(args.batch_size, 50);
    }

    /// A Fill built by `FillLogTea::new` can be registered as a source on a `Pot`.
    #[test]
    fn create_fill_logtea() {
        let args = FillLogArg::new("fixtures/test.csv", 50, test_parser);
        let log_source = FillLogTea::new::<TestLogTea>("test_log", "fixture", args);
        let pot = Pot::new().add_source(log_source);
        let sources = pot.get_sources();
        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].get_name(), "test_log");
    }
}