use rettle::{
Ingredient,
Argument,
Fill,
Brewery,
make_tea,
};
use std::sync::{Arc, RwLock};
use std::io::{BufReader};
use std::io::prelude::*;
use std::fs::File;
use std::any::Any;
use std::fmt::Debug;
use nom::IResult;
/// Parameters for a log-file fill operation.
///
/// Bundles the file to read, the number of parsed items sent per batch, and
/// the parser that turns one line of text into a `T`.
pub struct FillLogArg<T: Send + Debug + Sized + 'static> {
    /// Path of the log file to open.
    filepath: String,
    /// Number of parsed items collected before a batch is sent off.
    batch_size: usize,
    /// Parser applied to each line of the file.
    parser: fn(&str) -> IResult<&str, T>,
}
impl<T: Send + Debug + Sized + 'static> FillLogArg<T> {
    /// Builds a new `FillLogArg`.
    ///
    /// # Arguments
    ///
    /// * `filepath` - path of the log file; copied into an owned `String`.
    /// * `batch_size` - number of parsed items per batch.
    /// * `parser` - function that parses a single line into a `T`.
    pub fn new(
        filepath: &str,
        batch_size: usize,
        parser: fn(&str) -> IResult<&str, T>,
    ) -> Self {
        FillLogArg {
            filepath: filepath.to_owned(),
            batch_size,
            parser,
        }
    }
}
impl<T: Send + Debug + Sized + 'static> Argument for FillLogArg<T> {
    /// Returns `self` as `&dyn Any` so the concrete `FillLogArg<T>` can be
    /// recovered later with `downcast_ref`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Namespace type whose `new` associated function builds a boxed `Fill`
/// that reads its data from a log file.
pub struct FillLogTea {}
impl FillLogTea {
    /// Creates a boxed `Fill` whose computation reads a log file via
    /// `fill_from_log`.
    ///
    /// # Arguments
    ///
    /// * `name` - name stored on the resulting `Fill`.
    /// * `source` - source label stored on the resulting `Fill`.
    /// * `params` - file path, batch size and line parser for the fill.
    pub fn new<T: Send + Debug + Sized + 'static>(
        name: &str,
        source: &str,
        params: FillLogArg<T>,
    ) -> Box<Fill<T>> {
        let fill: Fill<T> = Fill {
            name: name.to_owned(),
            source: source.to_owned(),
            computation: Box::new(|fill_args, brewery, recipe| {
                fill_from_log::<T>(fill_args, brewery, recipe);
            }),
            params: Some(Box::new(params)),
        };
        Box::new(fill)
    }
}
/// Submits one batch to the brewery as an order that runs `make_tea` on the
/// batch with the given recipe.
///
/// # Arguments
///
/// * `brewery` - brewery that receives the order.
/// * `recipe` - shared recipe handed to `make_tea`.
/// * `tea_batch` - parsed items to process; ownership moves into the order.
fn call_brewery<T: Send + 'static>(
    brewery: &Brewery,
    recipe: Arc<RwLock<Vec<Box<dyn Ingredient<T> + Send + Sync>>>>,
    tea_batch: Vec<T>,
) {
    // The order owns both the batch and its recipe handle.
    let order = move || {
        make_tea(tea_batch, recipe);
    };
    brewery.take_order(order);
}
/// Reads the log file named in `args` line by line, parses each line into a
/// `T`, and sends the parsed values to the brewery in batches.
///
/// # Arguments
///
/// * `args` - fill parameters; expected to hold a `FillLogArg<T>`. When
///   `None`, the function does nothing.
/// * `brewery` - brewery that receives each batch.
/// * `recipe` - shared recipe passed along with every batch.
///
/// # Behavior
///
/// * If the file cannot be opened, a message is printed and nothing is sent.
/// * If reading a line fails, a message is printed, reading stops, and the
///   items collected so far are still sent.
/// * A line the parser rejects is reported and skipped.
/// * The final batch is always sent, even when it is empty (e.g. for an
///   empty file).
/// * With a `batch_size` of 0 the first line triggers an empty batch and all
///   parsed items are then sent together in the final batch.
///
/// # Panics
///
/// Panics if `args` holds something other than a `FillLogArg<T>`.
fn fill_from_log<T: Send + Debug + Sized + 'static>(
    args: &Option<Box<dyn Argument + Send>>,
    brewery: &Brewery,
    recipe: Arc<RwLock<Vec<Box<dyn Ingredient<T> + Send + Sync>>>>,
) {
    let box_args = match args {
        None => return,
        // A mismatched argument type is a programming error, not a runtime
        // condition, so it stays a panic but now states the broken invariant.
        Some(box_args) => box_args
            .as_any()
            .downcast_ref::<FillLogArg<T>>()
            .expect("fill_from_log requires its params to be a FillLogArg<T>"),
    };

    println!("{:?}", &box_args.filepath);

    let reader = match File::open(&box_args.filepath) {
        Ok(f) => BufReader::new(f),
        Err(e) => {
            println!("Failed opening file! Error: {:}", e);
            return;
        }
    };

    let parser = box_args.parser;
    let mut tea_batch: Vec<T> = Vec::with_capacity(box_args.batch_size);

    for line in reader.lines() {
        let line = match line {
            Ok(line) => line,
            Err(e) => {
                // Stop instead of panicking; a persistent I/O error could
                // otherwise be returned again on every subsequent read.
                println!("Failed reading line! Error: {:}", e);
                break;
            }
        };

        // Send the batch once it is full, before adding the next item.
        if tea_batch.len() == box_args.batch_size {
            call_brewery(brewery, Arc::clone(&recipe), tea_batch);
            tea_batch = Vec::with_capacity(box_args.batch_size);
        }

        match parser(&line) {
            Ok((_input, tea)) => tea_batch.push(tea),
            // One malformed line should not abort the whole fill.
            Err(e) => println!("Failed parsing line! Error: {:?}", e),
        }
    }

    // Send whatever remains after the last full batch.
    call_brewery(brewery, recipe, tea_batch);
}
#[cfg(test)]
mod tests {
    use super::{FillLogArg, FillLogTea};
    use nom::IResult;
    use rettle::Pot;

    /// Minimal tea type used only to exercise the generic plumbing.
    #[derive(Default, Clone, Debug)]
    struct TestLogTea {
        log_type: String,
        datetime: String,
        msg: String,
    }

    /// Stub parser: leaves the input untouched and yields a default tea.
    fn test_parser(input: &str) -> IResult<&str, TestLogTea> {
        Ok((input, TestLogTea::default()))
    }

    /// `FillLogArg::new` stores the path and batch size it was given.
    #[test]
    fn create_log_args() {
        let args = FillLogArg::new("fixtures/test.csv", 50, test_parser);

        assert_eq!(args.filepath, "fixtures/test.csv");
        assert_eq!(args.batch_size, 50);
    }

    /// A `FillLogTea` can be registered as a source on a `Pot`.
    #[test]
    fn create_fill_logtea() {
        let args = FillLogArg::new("fixtures/test.csv", 50, test_parser);
        let source = FillLogTea::new::<TestLogTea>("test_log", "fixture", args);

        let pot = Pot::new().add_source(source);

        assert_eq!(pot.get_sources().len(), 1);
        assert_eq!(pot.get_sources()[0].get_name(), "test_log");
    }
}