1use async_trait::async_trait;
2use fluxus_utils::models::{Record, StreamError, StreamResult};
3use futures::TryStreamExt;
4use reqwest;
5use std::io::{self, Error, ErrorKind};
6use std::path::PathBuf;
7use std::time::Duration;
8use tokio::fs::File;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio_util::io::StreamReader;
11
12use super::Source;
13
14pub struct CsvSource {
16 source: CsvSourceType,
17 reader: Option<Box<dyn tokio::io::AsyncBufRead + Unpin + Send + Sync>>,
18}
19
/// Origin of the CSV data consumed by a `CsvSource`.
///
/// The standard traits are derived so the value can be logged, duplicated and
/// compared; both payload types already implement them.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CsvSourceType {
    /// A file on the local filesystem, identified by its path.
    LocalFile(PathBuf),
    /// A remote resource, identified by its URL string.
    RemoteUrl(String),
}
24
25impl CsvSource {
26 pub fn new<P: Into<PathBuf>>(path: P) -> Self {
28 Self {
29 source: CsvSourceType::LocalFile(path.into()),
30 reader: None,
31 }
32 }
33
34 pub fn from_url<S: Into<String>>(url: S) -> Self {
36 Self {
37 source: CsvSourceType::RemoteUrl(url.into()),
38 reader: None,
39 }
40 }
41}
42
43#[async_trait]
44impl Source<String> for CsvSource {
45 async fn init(&mut self) -> StreamResult<()> {
46 match &self.source {
47 CsvSourceType::LocalFile(path) => {
48 let file = File::open(path)
49 .await
50 .map_err(|e| StreamError::Io(Error::new(ErrorKind::Other, format!("{}", e))))?;
51 self.reader = Some(Box::new(BufReader::new(file)));
52 }
53 CsvSourceType::RemoteUrl(url) => {
54 let client = reqwest::Client::builder()
55 .timeout(Duration::from_secs(30))
56 .build()
57 .map_err(|_e| {
58 StreamError::Io(io::Error::new(
59 io::ErrorKind::Other,
60 "create http client error",
61 ))
62 })?;
63 let response = client.get(url).send().await.map_err(|e| {
64 StreamError::Io(Error::new(
65 ErrorKind::Other,
66 format!("Failed to fetch URL: {}", e),
67 ))
68 })?;
69
70 if !response.status().is_success() {
71 return Err(StreamError::Io(Error::new(
72 ErrorKind::Other,
73 format!("HTTP error: {}", response.status()),
74 )));
75 }
76
77 let byte_stream = response
78 .bytes_stream()
79 .map_err(|e| Error::new(ErrorKind::Other, format!("{}", e)));
80
81 let reader = StreamReader::new(byte_stream);
82 self.reader = Some(Box::new(BufReader::new(reader)));
83 }
84 }
85 Ok(())
86 }
87
88 async fn next(&mut self) -> StreamResult<Option<Record<String>>> {
89 if let Some(reader) = &mut self.reader {
90 let mut line = String::new();
91 match reader.read_line(&mut line).await {
92 Ok(0) => Ok(None), Ok(_) => {
94 let line = line.trim().to_string();
95 Ok(Some(Record::new(line)))
96 }
97 Err(e) => Err(e.into()),
98 }
99 } else {
100 Ok(None)
101 }
102 }
103
104 async fn close(&mut self) -> StreamResult<()> {
105 self.reader = None;
106 Ok(())
107 }
108}