fluxus_core/source/
csv.rs1use crate::models::{Record, StreamResult};
2use async_trait::async_trait;
3use std::path::PathBuf;
4use tokio::fs::File;
5use tokio::io::{AsyncBufReadExt, BufReader};
6
7use super::Source;
8
9pub struct CsvSource {
11 path: PathBuf,
12 reader: Option<BufReader<File>>,
13}
14
15impl CsvSource {
16 pub fn new<P: Into<PathBuf>>(path: P) -> Self {
18 Self {
19 path: path.into(),
20 reader: None,
21 }
22 }
23}
24
25#[async_trait]
26impl Source<String> for CsvSource {
27 async fn init(&mut self) -> StreamResult<()> {
28 let file = File::open(&self.path).await?;
29 self.reader = Some(BufReader::new(file));
30 Ok(())
31 }
32
33 async fn next(&mut self) -> StreamResult<Option<Record<String>>> {
34 if let Some(reader) = &mut self.reader {
35 let mut line = String::new();
36 match reader.read_line(&mut line).await {
37 Ok(0) => Ok(None), Ok(_) => {
39 let line = line.trim().to_string();
40 Ok(Some(Record::new(line)))
41 }
42 Err(e) => Err(e.into()),
43 }
44 } else {
45 Ok(None)
46 }
47 }
48
49 async fn close(&mut self) -> StreamResult<()> {
50 self.reader = None;
51 Ok(())
52 }
53}