1use crate::utils::Tup2;
11use crate::{
12 DBData, OrdZSet, Runtime, ZWeight,
13 circuit::{
14 Scope,
15 operator_traits::{Operator, SourceOperator},
16 },
17};
18use csv::Reader as CsvReader;
19use serde::Deserialize;
20use std::{borrow::Cow, io::Read, marker::PhantomData};
21
22pub struct CsvSource<R, T> {
27 reader: CsvReader<R>,
28 time: usize,
29 _t: PhantomData<fn(&T)>,
30}
31
32impl<R, T> CsvSource<R, T>
33where
34 R: Read,
35{
36 pub fn from_reader(reader: R) -> Self {
39 Self::from_csv_reader(CsvReader::from_reader(reader))
40 }
41
42 pub fn from_csv_reader(reader: CsvReader<R>) -> Self {
44 Self {
45 reader,
46 time: 0,
47 _t: PhantomData,
48 }
49 }
50}
51
52impl<R, T> Operator for CsvSource<R, T>
53where
54 R: 'static,
55 T: 'static,
56{
57 fn name(&self) -> Cow<'static, str> {
58 Cow::from("CsvSource")
59 }
60 fn clock_start(&mut self, _scope: Scope) {
61 self.time = 0;
62 }
63 fn fixedpoint(&self, _scope: Scope) -> bool {
64 self.time >= 2
65 }
66 fn is_input(&self) -> bool {
67 true
68 }
69}
70
71impl<R, T> SourceOperator<OrdZSet<T>> for CsvSource<R, T>
72where
73 T: DBData + for<'de> Deserialize<'de> + 'static,
74 R: Read + 'static,
75{
76 async fn eval(&mut self) -> OrdZSet<T> {
77 let source = if self.time == 0 && Runtime::worker_index() == 0 {
78 let mut data = Vec::<Tup2<Tup2<T, ()>, ZWeight>>::new();
79
80 for x in self.reader.deserialize() {
81 data.push(Tup2(Tup2(x.unwrap(), ()), 1));
82 }
83
84 OrdZSet::<T>::from_tuples((), data)
85 } else {
86 OrdZSet::<T>::empty()
87 };
88 self.time += 1;
89
90 source
91 }
92}
93
94#[cfg(test)]
95mod test {
96 use crate::operator::CsvSource;
97 use crate::utils::Tup3;
98 use crate::{Circuit, OrdZSet, RootCircuit, zset};
99 use csv::ReaderBuilder;
100
101 #[test]
102 fn test_csv_reader() {
103 let circuit = RootCircuit::build(move |circuit| {
104 let expected = zset! {
105 Tup3(18, 3, 237641) => 1,
106 Tup3(237641, 4, 18) => 1,
107 Tup3(18, 5, 21) => 1,
108 Tup3(18, 5, 22) => 1,
109 Tup3(18, 5, 23) => 1,
110 Tup3(18, 5, 24) => 1,
111 Tup3(18, 5, 25) => 1,
112 };
113 let csv_data = "\
11418,3,237641
115237641,4,18
11618,5,21
11718,5,22
11818,5,23
11918,5,24
12018,5,25
121";
122 let reader = ReaderBuilder::new()
123 .delimiter(b',')
124 .has_headers(false)
125 .from_reader(csv_data.as_bytes());
126 circuit
127 .add_source(CsvSource::from_csv_reader(reader))
128 .inspect(move |data: &OrdZSet<Tup3<u64, u64, u64>>| assert_eq!(data, &expected));
129 Ok(())
130 })
131 .unwrap()
132 .0;
133
134 circuit.transaction().unwrap();
135 }
136}