use crate::utils::Tup2;
use crate::{
DBData, OrdZSet, Runtime, ZWeight,
circuit::{
Scope,
operator_traits::{Operator, SourceOperator},
},
};
use csv::Reader as CsvReader;
use serde::Deserialize;
use std::{borrow::Cow, io::Read, marker::PhantomData};
/// Source operator that deserializes CSV records of type `T` from an
/// underlying reader `R`.
pub struct CsvSource<R, T> {
/// CSV reader the records are pulled from.
reader: CsvReader<R>,
/// Step counter: reset to 0 in `clock_start` and incremented by every `eval`.
time: usize,
/// Zero-sized marker that ties the source to the record type `T` without
/// storing a `T` value.
_t: PhantomData<fn(&T)>,
}
impl<R, T> CsvSource<R, T>
where
    R: Read,
{
    /// Creates a source on top of an already configured [`csv::Reader`].
    ///
    /// Use this constructor when the input needs non-default parsing options
    /// (custom delimiter, no header row, ...), configured by the caller on the
    /// reader before handing it over. The step counter starts at 0.
    pub fn from_csv_reader(reader: CsvReader<R>) -> Self {
        let time = 0;
        Self {
            reader,
            time,
            _t: PhantomData,
        }
    }

    /// Creates a source that parses `reader` with the `csv` crate's default
    /// reader configuration.
    ///
    /// NOTE(review): per the `csv` crate documentation the default
    /// configuration treats the first row as a header row; use
    /// [`Self::from_csv_reader`] with a custom-built reader to change that.
    pub fn from_reader(reader: R) -> Self {
        let csv_reader = CsvReader::from_reader(reader);
        Self::from_csv_reader(csv_reader)
    }
}
impl<R, T> Operator for CsvSource<R, T>
where
    R: 'static,
    T: 'static,
{
    /// Static operator name.
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("CsvSource")
    }

    /// Restarts the step counter at the beginning of a clock epoch.
    ///
    /// Only the counter is reset; the underlying CSV reader is left untouched.
    fn clock_start(&mut self, _scope: Scope) {
        self.time = 0;
    }

    /// Reports a fixed point once the step counter has reached 2, i.e. after
    /// at least two evaluations since the last `clock_start`.
    fn fixedpoint(&self, _scope: Scope) -> bool {
        self.time > 1
    }
}
impl<R, T> SourceOperator<OrdZSet<T>> for CsvSource<R, T>
where
    T: DBData + for<'de> Deserialize<'de> + 'static,
    R: Read + 'static,
{
    /// Produces the operator output for the current step and advances the
    /// step counter.
    ///
    /// On step 0 of worker 0 every remaining record of the CSV reader is
    /// deserialized into `T` and returned as a Z-set in which each record has
    /// weight 1. In every other case (later steps, or any other worker) the
    /// output is the empty Z-set. Presumably only worker 0 reads the input so
    /// that records are not duplicated across workers; confirm against the
    /// runtime's conventions.
    ///
    /// # Panics
    ///
    /// Panics if any record cannot be read or deserialized into `T`.
    async fn eval(&mut self) -> OrdZSet<T> {
        // The step check comes first so the worker index is only queried on
        // step 0.
        let batch = if self.time != 0 || Runtime::worker_index() != 0 {
            OrdZSet::<T>::empty()
        } else {
            let tuples: Vec<Tup2<Tup2<T, ()>, ZWeight>> = self
                .reader
                .deserialize::<T>()
                .map(|record| Tup2(Tup2(record.unwrap(), ()), 1))
                .collect();
            OrdZSet::<T>::from_tuples((), tuples)
        };

        // Advance only after the batch was built successfully.
        self.time += 1;
        batch
    }
}
#[cfg(test)]
mod test {
    use crate::operator::CsvSource;
    use crate::utils::Tup3;
    use crate::{Circuit, OrdZSet, RootCircuit, zset};
    use csv::ReaderBuilder;

    /// Feeds a small header-less CSV document through `CsvSource` and checks
    /// that the batch observed downstream contains exactly the seven input
    /// rows, each with weight 1.
    #[test]
    fn test_csv_reader() {
        let built = RootCircuit::build(move |root| {
            // The continuation lines of this literal must stay at column 0:
            // any leading whitespace would become part of the CSV data.
            let csv_text = "\
18,3,237641
237641,4,18
18,5,21
18,5,22
18,5,23
18,5,24
18,5,25
";
            let csv_reader = ReaderBuilder::new()
                .delimiter(b',')
                .has_headers(false)
                .from_reader(csv_text.as_bytes());

            let expected = zset! {
                Tup3(18, 3, 237641) => 1,
                Tup3(237641, 4, 18) => 1,
                Tup3(18, 5, 21) => 1,
                Tup3(18, 5, 22) => 1,
                Tup3(18, 5, 23) => 1,
                Tup3(18, 5, 24) => 1,
                Tup3(18, 5, 25) => 1,
            };

            let records = root.add_source(CsvSource::from_csv_reader(csv_reader));
            records.inspect(move |batch: &OrdZSet<Tup3<u64, u64, u64>>| {
                assert_eq!(batch, &expected)
            });
            Ok(())
        })
        .unwrap();

        // Only the circuit handle is needed to drive the test.
        let handle = built.0;
        handle.transaction().unwrap();
    }
}