1use crate::config::CsvSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use serde_json::{Map, Value};
7use std::pin::Pin;
8
9pub struct CsvSource {
15 config: CsvSourceConfig,
16}
17
18impl CsvSource {
19 pub fn new(config: CsvSourceConfig) -> Self {
21 Self { config }
22 }
23}
24
25#[async_trait]
26impl faucet_core::Source for CsvSource {
27 async fn fetch_with_context(
28 &self,
29 context: &std::collections::HashMap<String, serde_json::Value>,
30 ) -> Result<Vec<Value>, FaucetError> {
31 use futures::StreamExt;
32 let mut all = Vec::new();
33 let mut s = self.stream_pages(context, self.config.batch_size);
34 while let Some(page) = s.next().await {
35 let page = page?;
36 all.extend(page.records);
37 }
38 Ok(all)
39 }
40
41 fn stream_pages<'a>(
59 &'a self,
60 context: &'a std::collections::HashMap<String, Value>,
61 _batch_size: usize,
62 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
63 let batch_size = self.config.batch_size;
64
65 Box::pin(async_stream::try_stream! {
66 use futures::StreamExt as _;
67
68 let mut config = self.config.clone();
69 if !context.is_empty() {
70 config.path = faucet_core::util::substitute_context(&config.path, context);
71 }
72
73 let file = tokio::fs::File::open(&config.path).await.map_err(|e| {
74 FaucetError::Config(format!(
75 "failed to open CSV file '{}': {e}",
76 config.path
77 ))
78 })?;
79 let reader = tokio::io::BufReader::new(file);
80 #[cfg(feature = "compression")]
81 let reader = {
82 let codec = config.compression.resolve(&config.path);
83 faucet_core::compression::warn_mismatch(&config.path, codec);
84 faucet_core::compression::wrap_async_reader(reader, codec)
85 };
86
87 let mut csv_reader = csv_async::AsyncReaderBuilder::new()
92 .has_headers(false)
93 .delimiter(config.delimiter)
94 .quote(config.quote)
95 .flexible(true)
98 .create_reader(reader);
99
100 let mut records = csv_reader.records();
101
102 let headers: Vec<String> = if config.has_headers {
104 match records.next().await {
105 Some(rec) => {
106 let rec = rec.map_err(|e| FaucetError::Config(format!(
107 "CSV header parse error in '{}': {e}", config.path
108 )))?;
109 rec.iter().map(|f| f.to_string()).collect()
110 }
111 None => Vec::new(),
112 }
113 } else {
114 Vec::new()
115 };
116
117 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
118 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
119 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
120 let mut total = 0usize;
121 let mut row_idx = 0usize;
122
123 while let Some(rec) = records.next().await {
124 let record = rec.map_err(|e| FaucetError::Config(format!(
125 "CSV parse error at line {} in '{}': {e}",
126 row_idx + 1 + usize::from(config.has_headers),
130 config.path
131 )))?;
132
133 let mut obj = Map::new();
134 for (col_idx, field) in record.iter().enumerate() {
135 let key = if col_idx < headers.len() {
136 headers[col_idx].clone()
137 } else {
138 format!("column_{col_idx}")
139 };
140 obj.insert(key, Value::String(field.to_string()));
141 }
142 buffer.push(Value::Object(obj));
143 row_idx += 1;
144
145 if buffer.len() >= chunk {
146 let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
147 total += page.len();
148 yield StreamPage { records: page, bookmark: None };
149 }
150 }
151
152 if !buffer.is_empty() {
153 total += buffer.len();
154 yield StreamPage { records: buffer, bookmark: None };
155 }
156
157 tracing::info!(
158 rows = total,
159 batch_size,
160 path = %config.path,
161 "CSV source stream complete",
162 );
163 })
164 }
165
166 fn config_schema(&self) -> serde_json::Value {
167 serde_json::to_value(faucet_core::schema_for!(CsvSourceConfig))
168 .expect("schema serialization")
169 }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Source;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `contents` verbatim into a fresh temporary file and flushes it.
    ///
    /// The returned handle must stay alive for as long as the file is read.
    fn csv_fixture(contents: &str) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(contents.as_bytes()).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Builds a default configuration pointing at the given temporary file.
    fn config_for(tmp: &NamedTempFile) -> CsvSourceConfig {
        CsvSourceConfig::new(tmp.path().to_str().unwrap())
    }

    /// The header row supplies the keys of every record.
    #[tokio::test]
    async fn reads_csv_with_headers() {
        let tmp = csv_fixture("id,name,age\n1,Alice,30\n2,Bob,25\n");

        let rows = CsvSource::new(config_for(&tmp)).fetch_all().await.unwrap();

        assert_eq!(rows.len(), 2);
        let first = &rows[0];
        assert_eq!(first["id"], "1");
        assert_eq!(first["name"], "Alice");
        assert_eq!(first["age"], "30");
        assert_eq!(rows[1]["name"], "Bob");
    }

    /// Without headers, columns are keyed `column_{index}`.
    #[tokio::test]
    async fn reads_csv_without_headers() {
        let tmp = csv_fixture("Alice,30\nBob,25\n");

        let config = config_for(&tmp).has_headers(false);
        let rows = CsvSource::new(config).fetch_all().await.unwrap();

        assert_eq!(rows.len(), 2);
        let first = &rows[0];
        assert_eq!(first["column_0"], "Alice");
        assert_eq!(first["column_1"], "30");
    }

    /// A tab delimiter splits fields just like the default comma.
    #[tokio::test]
    async fn reads_tsv_with_custom_delimiter() {
        let tmp = csv_fixture("id\tname\n1\tAlice\n");

        let config = config_for(&tmp).delimiter(b'\t');
        let rows = CsvSource::new(config).fetch_all().await.unwrap();

        assert_eq!(rows.len(), 1);
        let only = &rows[0];
        assert_eq!(only["id"], "1");
        assert_eq!(only["name"], "Alice");
    }

    /// A newline inside a quoted field stays part of that field.
    #[tokio::test]
    async fn reads_quoted_field_with_embedded_newline() {
        let tmp = csv_fixture("id,note\n1,\"line one\nline two\"\n2,\"plain\"\n");

        let rows = CsvSource::new(config_for(&tmp)).fetch_all().await.unwrap();

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["id"], "1");
        assert_eq!(rows[0]["note"], "line one\nline two");
        assert_eq!(rows[1]["note"], "plain");
    }

    /// A delimiter inside a quoted field does not split the field.
    #[tokio::test]
    async fn reads_quoted_field_with_embedded_delimiter() {
        let tmp = csv_fixture("id,name\n1,\"Doe, John\"\n");

        let rows = CsvSource::new(config_for(&tmp)).fetch_all().await.unwrap();

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Doe, John");
    }

    /// A file holding only a header row produces no records.
    #[tokio::test]
    async fn empty_csv_returns_empty_vec() {
        let tmp = csv_fixture("id,name\n");

        let rows = CsvSource::new(config_for(&tmp)).fetch_all().await.unwrap();

        assert!(rows.is_empty());
    }

    /// Reading a path that does not exist surfaces an error.
    #[tokio::test]
    async fn missing_file_returns_error() {
        let source = CsvSource::new(CsvSourceConfig::new("/nonexistent/path/data.csv"));

        let outcome = source.fetch_all().await;

        assert!(outcome.is_err());
    }

    /// Gzip input is decoded when compression is resolved automatically.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_gzip_via_stream_pages() {
        use faucet_core::CompressionConfig;

        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let plain = b"id,name\n1,Alice\n2,Bob\n";
        let packed =
            faucet_core::compression::compress_buf(plain, faucet_core::Compression::Gzip).unwrap();
        tokio::fs::write(&path, &packed).await.unwrap();

        let config = CsvSourceConfig::new(&path).compression(CompressionConfig::Auto);
        let rows = CsvSource::new(config).fetch_all().await.unwrap();

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["name"], "Bob");
    }

    /// Zstd input is decoded when compression is resolved automatically.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_zstd_via_stream_pages() {
        use faucet_core::CompressionConfig;

        let tmp = NamedTempFile::with_suffix(".csv.zst").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let plain = b"id,name\n1,Carol\n";
        let packed =
            faucet_core::compression::compress_buf(plain, faucet_core::Compression::Zstd).unwrap();
        tokio::fs::write(&path, &packed).await.unwrap();

        let config = CsvSourceConfig::new(&path).compression(CompressionConfig::Auto);
        let rows = CsvSource::new(config).fetch_all().await.unwrap();

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Carol");
    }
}