fluxus_core/source/
csv.rs

1use crate::models::{Record, StreamResult};
2use async_trait::async_trait;
3use std::path::PathBuf;
4use tokio::fs::File;
5use tokio::io::{AsyncBufReadExt, BufReader};
6
7use super::Source;
8
9/// A source that reads CSV files
10pub struct CsvSource {
11    path: PathBuf,
12    reader: Option<BufReader<File>>,
13}
14
15impl CsvSource {
16    /// Create a new CSV source from a file path
17    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
18        Self {
19            path: path.into(),
20            reader: None,
21        }
22    }
23}
24
25#[async_trait]
26impl Source<String> for CsvSource {
27    async fn init(&mut self) -> StreamResult<()> {
28        let file = File::open(&self.path).await?;
29        self.reader = Some(BufReader::new(file));
30        Ok(())
31    }
32
33    async fn next(&mut self) -> StreamResult<Option<Record<String>>> {
34        if let Some(reader) = &mut self.reader {
35            let mut line = String::new();
36            match reader.read_line(&mut line).await {
37                Ok(0) => Ok(None), // EOF
38                Ok(_) => {
39                    let line = line.trim().to_string();
40                    Ok(Some(Record::new(line)))
41                }
42                Err(e) => Err(e.into()),
43            }
44        } else {
45            Ok(None)
46        }
47    }
48
49    async fn close(&mut self) -> StreamResult<()> {
50        self.reader = None;
51        Ok(())
52    }
53}