Skip to main content

dbsp/operator/
csv.rs

1//! Source operator that reads data from a CSV file.
2
3// TODO:
4// - Error handling (currently we just panic on reader error or deserialization
5//   error).
6// - Batching (don't read the whole file in one clock cycle)
7// - Async implementation (wait for data to become available in the reader)
8// - Sharded implementation (currently we feed all data on worker 0).
9
10use crate::utils::Tup2;
11use crate::{
12    DBData, OrdZSet, Runtime, ZWeight,
13    circuit::{
14        Scope,
15        operator_traits::{Operator, SourceOperator},
16    },
17};
18use csv::Reader as CsvReader;
19use serde::Deserialize;
20use std::{borrow::Cow, io::Read, marker::PhantomData};
21
22/// A source operator that reads records of type `T` from a CSV file.
23///
24/// The operator reads the entire file and yields its contents
25/// in the first clock cycle as a Z-set with unit weights.
26pub struct CsvSource<R, T> {
27    reader: CsvReader<R>,
28    time: usize,
29    _t: PhantomData<fn(&T)>,
30}
31
32impl<R, T> CsvSource<R, T>
33where
34    R: Read,
35{
36    /// Create a [`CsvSource`] instance from any reader using
37    /// default `CsvReader` settings.
38    pub fn from_reader(reader: R) -> Self {
39        Self::from_csv_reader(CsvReader::from_reader(reader))
40    }
41
42    /// Create a [`CsvSource`] from a pre-configured `CsvReader`.
43    pub fn from_csv_reader(reader: CsvReader<R>) -> Self {
44        Self {
45            reader,
46            time: 0,
47            _t: PhantomData,
48        }
49    }
50}
51
52impl<R, T> Operator for CsvSource<R, T>
53where
54    R: 'static,
55    T: 'static,
56{
57    fn name(&self) -> Cow<'static, str> {
58        Cow::from("CsvSource")
59    }
60    fn clock_start(&mut self, _scope: Scope) {
61        self.time = 0;
62    }
63    fn fixedpoint(&self, _scope: Scope) -> bool {
64        self.time >= 2
65    }
66    fn is_input(&self) -> bool {
67        true
68    }
69}
70
71impl<R, T> SourceOperator<OrdZSet<T>> for CsvSource<R, T>
72where
73    T: DBData + for<'de> Deserialize<'de> + 'static,
74    R: Read + 'static,
75{
76    async fn eval(&mut self) -> OrdZSet<T> {
77        let source = if self.time == 0 && Runtime::worker_index() == 0 {
78            let mut data = Vec::<Tup2<Tup2<T, ()>, ZWeight>>::new();
79
80            for x in self.reader.deserialize() {
81                data.push(Tup2(Tup2(x.unwrap(), ()), 1));
82            }
83
84            OrdZSet::<T>::from_tuples((), data)
85        } else {
86            OrdZSet::<T>::empty()
87        };
88        self.time += 1;
89
90        source
91    }
92}
93
#[cfg(test)]
mod test {
    use crate::operator::CsvSource;
    use crate::utils::Tup3;
    use crate::{Circuit, OrdZSet, RootCircuit, zset};
    use csv::ReaderBuilder;

    /// Seven header-less, comma-separated rows of three unsigned integers.
    const CSV_DATA: &str = concat!(
        "18,3,237641\n",
        "237641,4,18\n",
        "18,5,21\n",
        "18,5,22\n",
        "18,5,23\n",
        "18,5,24\n",
        "18,5,25\n",
    );

    /// Feeding a header-less CSV through `CsvSource` yields every row with
    /// weight 1 in the first step.
    #[test]
    fn test_csv_reader() {
        let (circuit, _) = RootCircuit::build(move |circuit| {
            let expected = zset! {
                Tup3(18, 3, 237641) => 1,
                Tup3(237641, 4, 18) => 1,
                Tup3(18, 5, 21) => 1,
                Tup3(18, 5, 22) => 1,
                Tup3(18, 5, 23) => 1,
                Tup3(18, 5, 24) => 1,
                Tup3(18, 5, 25) => 1,
            };

            let reader = ReaderBuilder::new()
                .has_headers(false)
                .delimiter(b',')
                .from_reader(CSV_DATA.as_bytes());
            let source = CsvSource::from_csv_reader(reader);

            circuit
                .add_source(source)
                .inspect(move |batch: &OrdZSet<Tup3<u64, u64, u64>>| {
                    assert_eq!(batch, &expected)
                });
            Ok(())
        })
        .unwrap();

        circuit.transaction().unwrap();
    }
}