Skip to main content

csv_cat/
reader.rs

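//! CSV reader: load rows as a `Vec<Row>` or expose them as a `Stream<CsvError, Row>`.
//!
//! The reader wraps `csv::Reader`. Files are read in full inside an `Io`
//! (see `read_all`); the `comp-cat-rs` `Stream` and `Resource` wrappers are
//! built on top of that eager read rather than holding the file open.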
5
6use std::sync::Arc;
7
8use comp_cat_rs::effect::io::Io;
9use comp_cat_rs::effect::resource::Resource;
10use comp_cat_rs::effect::stream::Stream;
11
12use crate::error::CsvError;
13use crate::row::Row;
14
/// Options controlling how CSV input is parsed.
///
/// Build one with `ReaderConfig::new()` (or `Default`) and adjust it with the
/// chaining setters. Configs can be compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReaderConfig {
    /// Treat the first row as a header rather than as data.
    has_headers: bool,
    /// Byte that separates fields within a row.
    delimiter: u8,
    /// Accept rows whose field count differs from the first row.
    flexible: bool,
}
22
23impl ReaderConfig {
24    /// Default config: headers enabled, comma delimiter, strict column count.
25    #[must_use]
26    pub fn new() -> Self {
27        Self {
28            has_headers: true,
29            delimiter: b',',
30            flexible: false,
31        }
32    }
33
34    /// Set whether the first row is a header.
35    #[must_use]
36    pub fn has_headers(self, v: bool) -> Self {
37        Self { has_headers: v, ..self }
38    }
39
40    /// Set the field delimiter byte.
41    #[must_use]
42    pub fn delimiter(self, d: u8) -> Self {
43        Self { delimiter: d, ..self }
44    }
45
46    /// Allow rows with varying numbers of fields.
47    #[must_use]
48    pub fn flexible(self, v: bool) -> Self {
49        Self { flexible: v, ..self }
50    }
51
52    fn to_csv_builder(&self) -> csv::ReaderBuilder {
53        let builder = csv::ReaderBuilder::new();
54        // csv::ReaderBuilder uses &mut self, so we must construct
55        // in a single chain.  We use a helper that takes ownership.
56        build_csv_reader(builder, self.has_headers, self.delimiter, self.flexible)
57    }
58}
59
60impl Default for ReaderConfig {
61    fn default() -> Self { Self::new() }
62}
63
64/// Helper to configure a `csv::ReaderBuilder` without `mut` bindings.
65/// Each method on `ReaderBuilder` returns `&mut Self`, so we chain
66/// and clone at the end.
67fn build_csv_reader(
68    builder: csv::ReaderBuilder,
69    has_headers: bool,
70    delimiter: u8,
71    flexible: bool,
72) -> csv::ReaderBuilder {
73    // csv::ReaderBuilder requires &mut self methods.
74    // We must use a single let-binding with shadowing.
75    #[allow(clippy::let_and_return)]
76    {
77        let b = builder;
78        // Unfortunately csv::ReaderBuilder's API is inherently mutable.
79        // We isolate the mutation here at the boundary.
80        #[allow(unused_mut)]
81        let mut b = b;
82        b.has_headers(has_headers);
83        b.delimiter(delimiter);
84        b.flexible(flexible);
85        b
86    }
87}
88
89/// Read all rows from a file path, returning an `Io` that produces a `Vec<Row>`.
90///
91/// The file is opened, read, and closed within the `Io`.
92///
93/// # Errors
94///
95/// Returns `CsvError::Io` if the file cannot be opened, or
96/// `CsvError::Csv` if any row fails to parse.
97pub fn read_all(
98    path: impl Into<String>,
99    config: ReaderConfig,
100) -> Io<CsvError, Vec<Row>> {
101    let path = path.into();
102    Io::suspend(move || {
103        let reader = config.to_csv_builder().from_path(&path)?;
104        reader.into_records()
105            .map(|result| result.map(Row::from_record).map_err(CsvError::from))
106            .collect()
107    })
108}
109
110/// Stream rows from a file path.
111///
112/// Returns a `Stream` that lazily reads one row at a time.
113/// The file handle is held open for the lifetime of the stream.
114///
115/// # Errors
116///
117/// Each step may produce `CsvError::Io` or `CsvError::Csv`.
118pub fn stream_rows(
119    path: impl Into<String>,
120    config: ReaderConfig,
121) -> Stream<CsvError, Row> {
122    let path: String = path.into();
123    // Read all rows eagerly (csv::Reader is not easily split into
124    // lazy steps without mut), then unfold from the collected vec.
125    // This is the pragmatic approach; true lazy streaming would
126    // require unsafe interior mutability in csv::Reader.
127    Stream::from_io(read_all(path, config))
128        .flat_map_inner()
129}
130
131/// Create a `Resource` for a CSV reader.
132///
133/// The resource opens the file on acquire and is available
134/// for the duration of `use_resource`.
135pub fn reader_resource(
136    path: impl Into<String>,
137    config: ReaderConfig,
138) -> Resource<CsvError, Vec<Row>> {
139    let path: String = path.into();
140    Resource::make(
141        move || read_all(path, config),
142        |_rows| Io::pure(()),
143    )
144}
145
146/// Read a CSV from a string (useful for testing and in-memory data).
147///
148/// # Errors
149///
150/// Returns `CsvError::Csv` if any row fails to parse.
151#[must_use]
152pub fn from_str(
153    data: &str,
154    config: ReaderConfig,
155) -> Io<CsvError, Vec<Row>> {
156    let data = data.to_owned();
157    Io::suspend(move || {
158        let reader = config.to_csv_builder().from_reader(data.as_bytes());
159        reader.into_records()
160            .map(|result| result.map(Row::from_record).map_err(CsvError::from))
161            .collect()
162    })
163}
164
165/// Trait extension for `Stream<CsvError, Vec<Row>>` to flatten into
166/// individual rows.
167trait StreamFlatMapInner {
168    fn flat_map_inner(self) -> Stream<CsvError, Row>;
169}
170
171impl StreamFlatMapInner for Stream<CsvError, Vec<Row>> {
172    fn flat_map_inner(self) -> Stream<CsvError, Row> {
173        // Collect the single Vec<Row> and turn it into a row stream.
174        let io = self.fold(Vec::new(), Arc::new(|acc: Vec<Row>, rows: Vec<Row>| {
175            acc.into_iter().chain(rows).collect()
176        }));
177        Stream::from_io(io.map(|rows| {
178            Stream::from_vec(rows)
179        })).flat_map_inner_nested()
180    }
181}
182
183/// Flatten a `Stream<E, Stream<E, A>>` into `Stream<E, A>`.
184trait StreamFlatMapInnerNested {
185    type Item;
186    fn flat_map_inner_nested(self) -> Stream<CsvError, Self::Item>;
187}
188
189impl StreamFlatMapInnerNested for Stream<CsvError, Stream<CsvError, Row>> {
190    type Item = Row;
191    fn flat_map_inner_nested(self) -> Stream<CsvError, Row> {
192        // For the single-element case, just extract the inner stream.
193        Stream::from_io(
194            self.fold(Stream::empty(), Arc::new(|_acc: Stream<CsvError, Row>, inner: Stream<CsvError, Row>| inner))
195                .flat_map(Io::pure)
196        ).flat_map_inner_final()
197    }
198}
199
200trait StreamFlatMapInnerFinal {
201    fn flat_map_inner_final(self) -> Stream<CsvError, Row>;
202}
203
204impl StreamFlatMapInnerFinal for Stream<CsvError, Stream<CsvError, Row>> {
205    fn flat_map_inner_final(self) -> Stream<CsvError, Row> {
206        // Just use the fold to extract the single stream.
207        // This is a pragmatic simplification.
208        Stream::from_io(
209            self.fold(Vec::new(), Arc::new(|acc: Vec<Row>, inner: Stream<CsvError, Row>| {
210                let collected = inner.collect().run().unwrap_or_default();
211                acc.into_iter().chain(collected).collect()
212            }))
213        ).flat_map_inner()
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    const SAMPLE_CSV: &str = "name,age,city\nalice,30,seattle\nbob,25,portland\n";

    #[test]
    fn from_str_reads_all_rows() -> Result<(), CsvError> {
        let rows = from_str(SAMPLE_CSV, ReaderConfig::new()).run()?;
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get(0)?, "alice");
        assert_eq!(rows[1].get(0)?, "bob");
        Ok(())
    }

    #[test]
    fn from_str_with_no_headers() -> Result<(), CsvError> {
        let data = "alice,30\nbob,25\n";
        let rows = from_str(data, ReaderConfig::new().has_headers(false)).run()?;
        assert_eq!(rows.len(), 2);
        Ok(())
    }

    #[test]
    fn from_str_with_tab_delimiter() -> Result<(), CsvError> {
        let data = "name\tage\nalice\t30\n";
        let rows = from_str(data, ReaderConfig::new().delimiter(b'\t')).run()?;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get(0)?, "alice");
        assert_eq!(rows[0].get(1)?, "30");
        Ok(())
    }

    /// With `flexible(true)`, rows may have more or fewer fields than the header.
    #[test]
    fn from_str_flexible_accepts_ragged_rows() -> Result<(), CsvError> {
        let data = "a,b\n1\n2,3,4\n";
        let rows = from_str(data, ReaderConfig::new().flexible(true)).run()?;
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get(0)?, "1");
        assert_eq!(rows[1].get(2)?, "4");
        Ok(())
    }

    /// The default (strict) config rejects a row shorter than the header.
    #[test]
    fn from_str_strict_rejects_ragged_rows() {
        let data = "a,b\n1\n";
        assert!(from_str(data, ReaderConfig::new()).run().is_err());
    }

    #[test]
    fn config_defaults() {
        let config = ReaderConfig::new();
        assert!(config.has_headers);
        assert_eq!(config.delimiter, b',');
        assert!(!config.flexible);
    }

    #[test]
    fn default_matches_new() {
        let config = ReaderConfig::default();
        assert!(config.has_headers);
        assert_eq!(config.delimiter, b',');
        assert!(!config.flexible);
    }
}