1use std::sync::Arc;
7
8use comp_cat_rs::effect::io::Io;
9use comp_cat_rs::effect::resource::Resource;
10use comp_cat_rs::effect::stream::Stream;
11
12use crate::error::CsvError;
13use crate::row::Row;
14
/// Options controlling how CSV input is parsed.
///
/// Start from [`ReaderConfig::new`] (or `Default`) and adjust with the
/// consuming builder methods `has_headers`, `delimiter`, and `flexible`.
/// All fields are plain values, so the config is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReaderConfig {
    /// Forwarded to `csv::ReaderBuilder::has_headers`; when true the first
    /// record is consumed as a header row rather than returned as data.
    has_headers: bool,
    /// Field separator byte, forwarded to `csv::ReaderBuilder::delimiter`.
    delimiter: u8,
    /// Forwarded to `csv::ReaderBuilder::flexible`; controls whether records
    /// may have differing numbers of fields.
    flexible: bool,
}
22
23impl ReaderConfig {
24 #[must_use]
26 pub fn new() -> Self {
27 Self {
28 has_headers: true,
29 delimiter: b',',
30 flexible: false,
31 }
32 }
33
34 #[must_use]
36 pub fn has_headers(self, v: bool) -> Self {
37 Self { has_headers: v, ..self }
38 }
39
40 #[must_use]
42 pub fn delimiter(self, d: u8) -> Self {
43 Self { delimiter: d, ..self }
44 }
45
46 #[must_use]
48 pub fn flexible(self, v: bool) -> Self {
49 Self { flexible: v, ..self }
50 }
51
52 fn to_csv_builder(&self) -> csv::ReaderBuilder {
53 let builder = csv::ReaderBuilder::new();
54 build_csv_reader(builder, self.has_headers, self.delimiter, self.flexible)
57 }
58}
59
60impl Default for ReaderConfig {
61 fn default() -> Self { Self::new() }
62}
63
64fn build_csv_reader(
68 builder: csv::ReaderBuilder,
69 has_headers: bool,
70 delimiter: u8,
71 flexible: bool,
72) -> csv::ReaderBuilder {
73 #[allow(clippy::let_and_return)]
76 {
77 let b = builder;
78 #[allow(unused_mut)]
81 let mut b = b;
82 b.has_headers(has_headers);
83 b.delimiter(delimiter);
84 b.flexible(flexible);
85 b
86 }
87}
88
89pub fn read_all(
98 path: impl Into<String>,
99 config: ReaderConfig,
100) -> Io<CsvError, Vec<Row>> {
101 let path = path.into();
102 Io::suspend(move || {
103 let reader = config.to_csv_builder().from_path(&path)?;
104 reader.into_records()
105 .map(|result| result.map(Row::from_record).map_err(CsvError::from))
106 .collect()
107 })
108}
109
110pub fn stream_rows(
119 path: impl Into<String>,
120 config: ReaderConfig,
121) -> Stream<CsvError, Row> {
122 let path: String = path.into();
123 Stream::from_io(read_all(path, config))
128 .flat_map_inner()
129}
130
131pub fn reader_resource(
136 path: impl Into<String>,
137 config: ReaderConfig,
138) -> Resource<CsvError, Vec<Row>> {
139 let path: String = path.into();
140 Resource::make(
141 move || read_all(path, config),
142 |_rows| Io::pure(()),
143 )
144}
145
146#[must_use]
152pub fn from_str(
153 data: &str,
154 config: ReaderConfig,
155) -> Io<CsvError, Vec<Row>> {
156 let data = data.to_owned();
157 Io::suspend(move || {
158 let reader = config.to_csv_builder().from_reader(data.as_bytes());
159 reader.into_records()
160 .map(|result| result.map(Row::from_record).map_err(CsvError::from))
161 .collect()
162 })
163}
164
165trait StreamFlatMapInner {
168 fn flat_map_inner(self) -> Stream<CsvError, Row>;
169}
170
171impl StreamFlatMapInner for Stream<CsvError, Vec<Row>> {
172 fn flat_map_inner(self) -> Stream<CsvError, Row> {
173 let io = self.fold(Vec::new(), Arc::new(|acc: Vec<Row>, rows: Vec<Row>| {
175 acc.into_iter().chain(rows).collect()
176 }));
177 Stream::from_io(io.map(|rows| {
178 Stream::from_vec(rows)
179 })).flat_map_inner_nested()
180 }
181}
182
183trait StreamFlatMapInnerNested {
185 type Item;
186 fn flat_map_inner_nested(self) -> Stream<CsvError, Self::Item>;
187}
188
189impl StreamFlatMapInnerNested for Stream<CsvError, Stream<CsvError, Row>> {
190 type Item = Row;
191 fn flat_map_inner_nested(self) -> Stream<CsvError, Row> {
192 Stream::from_io(
194 self.fold(Stream::empty(), Arc::new(|_acc: Stream<CsvError, Row>, inner: Stream<CsvError, Row>| inner))
195 .flat_map(Io::pure)
196 ).flat_map_inner_final()
197 }
198}
199
200trait StreamFlatMapInnerFinal {
201 fn flat_map_inner_final(self) -> Stream<CsvError, Row>;
202}
203
204impl StreamFlatMapInnerFinal for Stream<CsvError, Stream<CsvError, Row>> {
205 fn flat_map_inner_final(self) -> Stream<CsvError, Row> {
206 Stream::from_io(
209 self.fold(Vec::new(), Arc::new(|acc: Vec<Row>, inner: Stream<CsvError, Row>| {
210 let collected = inner.collect().run().unwrap_or_default();
211 acc.into_iter().chain(collected).collect()
212 }))
213 ).flat_map_inner()
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    const SAMPLE_CSV: &str = "name,age,city\nalice,30,seattle\nbob,25,portland\n";

    /// With the default config the header line is skipped, leaving two data rows.
    #[test]
    fn from_str_reads_all_rows() -> Result<(), CsvError> {
        let parsed = from_str(SAMPLE_CSV, ReaderConfig::new()).run()?;
        assert_eq!(parsed.len(), 2);
        let (first, second) = (&parsed[0], &parsed[1]);
        assert_eq!(first.get(0)?, "alice");
        assert_eq!(second.get(0)?, "bob");
        Ok(())
    }

    /// Without a header row every line is returned as data.
    #[test]
    fn from_str_with_no_headers() -> Result<(), CsvError> {
        let config = ReaderConfig::new().has_headers(false);
        let parsed = from_str("alice,30\nbob,25\n", config).run()?;
        assert_eq!(parsed.len(), 2);
        Ok(())
    }

    /// A tab delimiter splits fields on `\t` instead of `,`.
    #[test]
    fn from_str_with_tab_delimiter() -> Result<(), CsvError> {
        let config = ReaderConfig::new().delimiter(b'\t');
        let parsed = from_str("name\tage\nalice\t30\n", config).run()?;
        assert_eq!(parsed.len(), 1);
        let only = &parsed[0];
        assert_eq!(only.get(0)?, "alice");
        assert_eq!(only.get(1)?, "30");
        Ok(())
    }

    /// `ReaderConfig::new` yields headers on, comma delimiter, flexible off.
    #[test]
    fn config_defaults() {
        let ReaderConfig {
            has_headers,
            delimiter,
            flexible,
        } = ReaderConfig::new();
        assert!(has_headers);
        assert_eq!(delimiter, b',');
        assert!(!flexible);
    }
}