fluxus_sources/
csv.rs

1use async_trait::async_trait;
2use fluxus_utils::models::{Record, StreamError, StreamResult};
3use futures::TryStreamExt;
4use reqwest;
5use std::io::{self, Error, ErrorKind};
6use std::path::PathBuf;
7use std::time::Duration;
8use tokio::fs::File;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio_util::io::StreamReader;
11
12use super::Source;
13
14/// A source that reads CSV files
15pub struct CsvSource {
16    source: CsvSourceType,
17    reader: Option<Box<dyn tokio::io::AsyncBufRead + Unpin + Send + Sync>>,
18}
19
/// Where a [`CsvSource`] reads its data from.
///
/// Derives the standard value traits so the origin can be printed in
/// diagnostics and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CsvSourceType {
    /// A file on the local filesystem.
    LocalFile(PathBuf),
    /// A URL that is requested with an HTTP GET when the source is initialised.
    RemoteUrl(String),
}
24
25impl CsvSource {
26    /// Create a new CSV source from a local file path
27    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
28        Self {
29            source: CsvSourceType::LocalFile(path.into()),
30            reader: None,
31        }
32    }
33
34    /// Create a new CSV source from a remote URL
35    pub fn from_url<S: Into<String>>(url: S) -> Self {
36        Self {
37            source: CsvSourceType::RemoteUrl(url.into()),
38            reader: None,
39        }
40    }
41}
42
43#[async_trait]
44impl Source<String> for CsvSource {
45    async fn init(&mut self) -> StreamResult<()> {
46        match &self.source {
47            CsvSourceType::LocalFile(path) => {
48                let file = File::open(path)
49                    .await
50                    .map_err(|e| StreamError::Io(Error::new(ErrorKind::Other, format!("{}", e))))?;
51                self.reader = Some(Box::new(BufReader::new(file)));
52            }
53            CsvSourceType::RemoteUrl(url) => {
54                let client = reqwest::Client::builder()
55                    .timeout(Duration::from_secs(30))
56                    .build()
57                    .map_err(|_e| {
58                        StreamError::Io(io::Error::new(
59                            io::ErrorKind::Other,
60                            "create http client error",
61                        ))
62                    })?;
63                let response = client.get(url).send().await.map_err(|e| {
64                    StreamError::Io(Error::new(
65                        ErrorKind::Other,
66                        format!("Failed to fetch URL: {}", e),
67                    ))
68                })?;
69
70                if !response.status().is_success() {
71                    return Err(StreamError::Io(Error::new(
72                        ErrorKind::Other,
73                        format!("HTTP error: {}", response.status()),
74                    )));
75                }
76
77                let byte_stream = response
78                    .bytes_stream()
79                    .map_err(|e| Error::new(ErrorKind::Other, format!("{}", e)));
80
81                let reader = StreamReader::new(byte_stream);
82                self.reader = Some(Box::new(BufReader::new(reader)));
83            }
84        }
85        Ok(())
86    }
87
88    async fn next(&mut self) -> StreamResult<Option<Record<String>>> {
89        if let Some(reader) = &mut self.reader {
90            let mut line = String::new();
91            match reader.read_line(&mut line).await {
92                Ok(0) => Ok(None), // EOF
93                Ok(_) => {
94                    let line = line.trim().to_string();
95                    Ok(Some(Record::new(line)))
96                }
97                Err(e) => Err(e.into()),
98            }
99        } else {
100            Ok(None)
101        }
102    }
103
104    async fn close(&mut self) -> StreamResult<()> {
105        self.reader = None;
106        Ok(())
107    }
108}