Skip to main content

rettle/
ingredient.rs

1use crate::brewery::Brewery;
2
3use std::any::Any;
4use std::sync::{Arc, RwLock};
5
6///
7/// Trait given to Box elements added to Pot for pulling, processing, or sending data.
8pub trait Ingredient<T: Send> {
9    ///
10    /// Run computation on batch of Tea.
11    ///
12    /// # Arguements
13    ///
14    /// * `tea_batch` - current tea batch to be processed
15    fn exec(&self, tea_batch: Vec<T>) -> Vec<T>;
16
17    ///
18    /// Print out current step information.
19    fn print(&self); 
20
21    ///
22    /// Used to convert Box<dyn Ingredient> to Any to unwrap Ingredient. 
23    fn as_any(&self) -> &dyn Any;
24
25    ///
26    /// Returns name given to Ingredient.
27    fn get_name(&self) -> &str;
28}
29
30///
31/// Trait given to Box elements that add params to Ingredients.
32pub trait Argument {
33    fn as_any(&self) -> &dyn Any;
34}
35
36///
37/// Ingredient used to import or create Tea used in the Pot.
38pub struct Fill<T: Send> {
39    pub source: String,
40    pub name: String,
41    pub computation: Box<fn(&Option<Box<dyn Argument + Send>>, &Brewery, Arc<RwLock<Vec<Box<dyn Ingredient<T> + Send + Sync>>>>)>,
42    pub params: Option<Box<dyn Argument + Send>>,
43}
44
45///
46/// Ingredient used to combine Tea pulled from multiple Fill sources. *Not currently implemented*
47pub struct Transfuse;
48
49///
50/// Ingredient used to transform Tea in the Pot.
51pub struct Steep<T: Send> {
52    pub name: String,
53    pub computation: Box<fn(Vec<T>, &Option<Box<dyn Argument + Send>>) -> Vec<T>>, 
54    pub params: Option<Box<dyn Argument + Send>>,
55}
56
57///
58/// Ingredient used to remove fields on Tea in the Pot. *Not currently implemented*
59pub struct Skim<T: Send> {
60    pub name: String,
61    pub computation: Box<fn(Vec<T>, &Option<Box<dyn Argument + Send>>) -> Vec<T>>, 
62    pub params: Option<Box<dyn Argument + Send>>,
63}
64
65///
66/// Ingredient used to send Tea to somewhere else.
67pub struct Pour<T: Send> {
68    pub name: String,
69    pub computation: Box<fn(Vec<T>, &Option<Box<dyn Argument + Send>>) -> Vec<T>>, 
70    pub params: Option<Box<dyn Argument + Send>>,
71}
72
73impl<T: Send> Fill<T> {
74    ///
75    /// Return params, if any, initialized to this step.
76    pub fn get_params(&self) -> &Option<Box<dyn Argument + Send>> {
77        &self.params
78    }
79}
80
81impl<T: Send> Steep<T> {
82    ///
83    /// Return params, if any, initialized to this step.
84    pub fn get_params(&self) -> &Option<Box<dyn Argument + Send>> {
85        &self.params
86    }
87}
88
89impl<T: Send> Skim<T> {
90    ///
91    /// Return params, if any, initialized to this step.
92    pub fn get_params(&self) -> &Option<Box<dyn Argument + Send>> {
93        &self.params
94    }
95}
96
97impl<T: Send> Pour<T> {
98    ///
99    /// Return params, if any, initialized to this step.
100    pub fn get_params(&self) -> &Option<Box<dyn Argument + Send>> {
101        &self.params
102    }
103}
104
105unsafe impl<T: Send> Send for Steep<T> {}
106unsafe impl<T: Send>  Sync for Steep<T> {}
107unsafe impl<T: Send>  Send for Skim<T>{}
108unsafe impl<T: Send>  Sync for Skim<T> {}
109unsafe impl<T: Send>  Send for Pour<T> {}
110unsafe impl<T: Send>  Sync for Pour<T> {}
111
112impl<T: Send + 'static> Ingredient<T> for Steep<T> {
113    fn exec(&self, tea_batch: Vec<T>) -> Vec<T> {
114        (self.computation)(tea_batch, self.get_params())
115    }
116    fn get_name(&self) -> &str {
117        &self.name[..]
118    }
119    fn print(&self) {
120        println!("Current Step: {}", self.get_name());
121    }
122    fn as_any(&self) -> &dyn Any {
123        self
124    }
125}
126
127impl<T: Send + 'static>  Ingredient<T> for Pour<T> {
128    fn get_name(&self) -> &str {
129        &self.name[..]
130    }
131    fn print(&self) {
132        println!("Current Step: {}", self.get_name());
133    }
134    fn as_any(&self) -> &dyn Any {
135        self
136    }
137    fn exec(&self, tea_batch: Vec<T>) -> Vec<T> {
138        (self.computation)(tea_batch, self.get_params())
139    }
140}
141
142// TODO: Implement Ingredient for Fill (add step plus logic to `brewery::make_tea` function)
143// Need to consider if this still makes sense as an Ingredient in the recipe vs just a source...
144
145impl<T: Send + 'static>  Ingredient<T> for Skim<T>  {
146    fn get_name(&self) -> &str {
147        &self.name[..]
148    }
149    fn print(&self) {
150        println!("Current Step: {}", self.get_name());
151    }
152    fn as_any(&self) -> &dyn Any {
153        self
154    }
155    fn exec(&self, tea_batch: Vec<T>) -> Vec<T> {
156        (self.computation)(tea_batch, self.get_params())
157    }
158}
159
160// TODO: Implement Ingredient for Transfuse (add step plus logic to `brewery::make_tea` function)
161
162#[cfg(test)]
163mod tests {
164    use super::super::ingredient::{Fill, Steep, Skim, Pour, Argument, Ingredient};
165    use super::super::source::Source;
166    use std::any::Any;
167    use std::sync::{Arc, RwLock};
168
169    #[derive(Debug, PartialEq, Default, Clone)]
170    struct TestTea {
171        x: Option<i32>,
172    }
173
174    #[derive(Default)]
175    struct TestArgs {
176        pub val: i32
177    }
178
179    impl Argument for TestArgs {
180        fn as_any(&self) -> &dyn Any {
181            self
182        }
183    }
184
185    #[derive(Default)]
186    struct TestSkimArgs {
187        pub field: &'static str
188    }
189
190    impl Argument for TestSkimArgs {
191        fn as_any(&self) -> &dyn Any {
192            self
193        }
194    }
195    
196    #[test]
197    fn create_fill_no_params() {
198        let fill = Fill {
199            name: String::from("test_fill"),
200            source: String::from("text"),
201            computation: Box::new(|_args, _brewery, _recipe: Arc<RwLock<Vec<Box<dyn Ingredient<TestTea> + Send + Sync>>>>| {}),
202            params: None,
203        };
204        assert_eq!(fill.get_name(), "test_fill");
205        assert_eq!(fill.get_source(), "text");
206    }
207
208    #[test]
209    fn create_fill_with_params() {
210        let fill = Fill {
211            name: String::from("test_fill"),
212            source: String::from("text"),
213            computation: Box::new(|_args, _brewery, _recipe: Arc<RwLock<Vec<Box<dyn Ingredient<TestTea> + Send + Sync>>>>| {}),
214            params: Some(Box::new(TestArgs { val: 5 })),
215        };
216        assert_eq!(fill.get_name(), "test_fill");
217        assert_eq!(fill.get_source(), "text");
218    }
219
220    #[test]
221    fn create_steep_no_params() {
222        let steep = Steep {
223            name: String::from("test_steep"),
224            computation: Box::new(|tea: Vec<TestTea>, _args| {
225                tea.into_iter()
226                   .map(|mut tea| {
227                       let new_val = match tea.x {
228                           Some(x) => Some(x + 5),
229                           None => None
230                       };
231                       tea.x = new_val;
232                       tea
233                   })
234                   .collect()
235            }),
236            params: None,
237        };
238        let orig_tea = vec![TestTea { x: Some(0) }];
239        let new_tea = steep.exec(orig_tea.clone());
240        assert_eq!(steep.get_name(), "test_steep");
241        assert_eq!(new_tea[0].x.unwrap(), orig_tea[0].x.unwrap() + 5);
242    }
243
244    #[test]
245    fn create_steep_with_params() {
246        let steep = Steep {
247            name: String::from("test_steep"),
248            computation: Box::new(|tea: Vec<TestTea>, args| {
249                tea.into_iter()
250                   .map(|mut tea| {
251                       match args {
252                           None => println!("Nothing"),
253                           Some(box_args) => {
254                               let box_args = box_args.as_any().downcast_ref::<TestArgs>().unwrap();
255                               let new_val: Option<i32> = match tea.x {
256                                   Some(x) => Some(x + box_args.val),
257                                   None => None
258                               };
259                               tea.x = new_val;
260                           }
261                       }
262                       tea
263                   })
264                   .collect()
265            }),
266            params: Some(Box::new(TestArgs { val: 10 })),
267        };
268        let orig_tea = vec![TestTea { x: Some(0) }];
269        let new_tea = steep.exec(orig_tea.clone());
270        assert_eq!(steep.get_name(), "test_steep");
271        assert_eq!(new_tea[0].x.unwrap(), orig_tea[0].x.unwrap() + 10);
272    }
273
274    #[test]
275    fn create_pour_no_params() {
276        let pour = Pour {
277            name: String::from("test_pour"),
278            computation: Box::new(|tea: Vec<TestTea>, _args| {
279                tea.into_iter()
280                   .map(|tea| {
281                       tea
282                   })
283                   .collect()
284            }),
285            params: None,
286        };
287        let orig_tea = vec![TestTea::default()];
288        let new_tea = pour.exec(orig_tea.clone());
289        assert_eq!(pour.get_name(), "test_pour");
290        assert_eq!(new_tea[0].x, orig_tea[0].x);
291    }
292
293    #[test]
294    fn create_pour_with_params() {
295        let pour = Pour {
296            name: String::from("test_pour"),
297            computation: Box::new(|tea: Vec<TestTea>, args| {
298                tea.into_iter()
299                   .map(|tea| {
300                       match args {
301                           None => println!("Nothing"),
302                           Some(_box_args) => {
303                               let _box_args = _box_args.as_any().downcast_ref::<TestArgs>().unwrap();
304                           }
305                       }
306                       tea
307                   })
308                   .collect()
309            }),
310            params: Some(Box::new(TestArgs { val: 10 })),
311        };
312        let orig_tea = vec![TestTea::default()];
313        let new_tea = pour.exec(orig_tea.clone());
314        assert_eq!(pour.get_name(), "test_pour");
315        assert_eq!(new_tea[0].x, orig_tea[0].x);
316    }
317
318    #[test]
319    fn create_skim_no_params() {
320        let skim = Skim {
321            name: String::from("test_skim"),
322            computation: Box::new(|tea: Vec<TestTea>, _args| {
323                tea.into_iter()
324                   .map(|mut tea| {
325                       tea.x = None;
326                       tea
327                   })
328                   .collect()
329            }),
330            params: None,
331        };
332        let orig_tea = vec![TestTea::default()];
333        let new_tea = skim.exec(orig_tea);
334        assert_eq!(skim.get_name(), "test_skim");
335        assert_eq!(new_tea[0].x, None);
336    }
337
338    #[test]
339    fn create_skim_with_params() {
340        let skim = Skim {
341            name: String::from("test_skim"),
342            computation: Box::new(|tea: Vec<TestTea>, args| {
343                tea.into_iter()
344                   .map(|mut tea| {
345                       match args {
346                           None => println!("Nothing"),
347                           Some(box_args) => {
348                               let box_args = box_args.as_any().downcast_ref::<TestSkimArgs>().unwrap();
349                               let field = box_args.field;
350                               match field {
351                                   "x" => tea.x = None,
352                                   _ => panic!("unknown field")
353                               };
354                           }
355                       }
356                       tea
357                   })
358                   .collect()
359            }),
360            params: Some(Box::new(TestSkimArgs { field: "x" })),
361        };
362        let orig_tea = vec![TestTea::default()];
363        let new_tea = skim.exec(orig_tea);
364        assert_eq!(skim.get_name(), "test_skim");
365        assert_eq!(new_tea[0].x, None);
366    }
367}