1use crate::config::{S3FileFormat, S3SourceConfig};
4use async_trait::async_trait;
5use aws_sdk_s3::Client;
6use faucet_core::{FaucetError, Stream, StreamPage};
7use futures::stream::{self, StreamExt, TryStreamExt};
8use serde_json::Value;
9use std::pin::Pin;
10use tokio::io::AsyncBufReadExt;
11
/// Source connector that reads records from objects stored in an S3 bucket.
///
/// Bundles the connector configuration with the AWS SDK client that the
/// listing and object-download methods of this type issue their requests on.
pub struct S3Source {
    /// Connector settings consulted by this source (bucket, prefix, file
    /// format, object limit, concurrency, batch size, ...).
    config: S3SourceConfig,
    /// AWS SDK S3 client used for the list-objects and get-object calls.
    client: Client,
}
17
18impl S3Source {
19 pub async fn new(config: S3SourceConfig) -> Result<Self, FaucetError> {
23 let client = Self::build_client(&config).await?;
24 Ok(Self { config, client })
25 }
26
27 async fn build_client(config: &S3SourceConfig) -> Result<Client, FaucetError> {
29 let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
30
31 if let Some(ref region) = config.region {
32 config_loader = config_loader.region(aws_config::Region::new(region.clone()));
33 }
34
35 if let Some(ref endpoint) = config.endpoint_url {
36 config_loader = config_loader.endpoint_url(endpoint);
37 }
38
39 let sdk_config = config_loader.load().await;
40 let client = Client::new(&sdk_config);
41 Ok(client)
42 }
43
44 async fn list_object_keys(
49 &self,
50 prefix_override: Option<&str>,
51 ) -> Result<Vec<String>, FaucetError> {
52 let mut keys = Vec::new();
53 let mut continuation_token: Option<String> = None;
54
55 let effective_prefix = prefix_override.or(self.config.prefix.as_deref());
56
57 loop {
58 let mut req = self.client.list_objects_v2().bucket(&self.config.bucket);
59
60 if let Some(prefix) = effective_prefix {
61 req = req.prefix(prefix);
62 }
63
64 if let Some(ref token) = continuation_token {
65 req = req.continuation_token(token);
66 }
67
68 let response = req.send().await.map_err(|e| {
69 FaucetError::Source(format!(
70 "S3 list objects error for bucket '{}': {e}",
71 self.config.bucket
72 ))
73 })?;
74
75 for object in response.contents() {
76 let key: &str = object.key().unwrap_or_default();
77 if key.is_empty() {
78 continue;
79 }
80 keys.push(key.to_string());
81
82 if let Some(max) = self.config.max_objects
83 && keys.len() >= max
84 {
85 return Ok(keys);
86 }
87 }
88
89 if response.is_truncated() == Some(true) {
90 continuation_token = response.next_continuation_token().map(String::from);
91 } else {
92 break;
93 }
94 }
95
96 Ok(keys)
97 }
98
99 async fn read_object(&self, key: &str) -> Result<Vec<Value>, FaucetError> {
101 let text = self.read_object_text(key).await?;
102 self.parse_content(key, &text)
103 }
104
105 async fn read_object_text(&self, key: &str) -> Result<String, FaucetError> {
113 use tokio::io::AsyncReadExt as _;
114 let mut reader = self.open_object_reader(key).await?;
115 let mut text = String::new();
116 reader.read_to_string(&mut text).await.map_err(|e| {
117 FaucetError::Source(format!(
118 "S3 read/decode error for key '{key}' (not valid UTF-8?): {e}"
119 ))
120 })?;
121 Ok(text)
122 }
123
124 async fn open_object_reader(
129 &self,
130 key: &str,
131 ) -> Result<std::pin::Pin<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>>, FaucetError> {
132 let response = self
133 .client
134 .get_object()
135 .bucket(&self.config.bucket)
136 .key(key)
137 .send()
138 .await
139 .map_err(|e| {
140 FaucetError::Source(format!("S3 get object error for key '{key}': {e}"))
141 })?;
142
143 let buffered = tokio::io::BufReader::new(response.body.into_async_read());
146 #[cfg(feature = "compression")]
147 {
148 let codec = self.config.compression.resolve(key);
149 faucet_core::compression::warn_mismatch(key, codec);
150 Ok(faucet_core::compression::wrap_async_reader(buffered, codec))
151 }
152 #[cfg(not(feature = "compression"))]
153 {
154 Ok(Box::pin(buffered))
155 }
156 }
157
158 fn parse_content(&self, key: &str, text: &str) -> Result<Vec<Value>, FaucetError> {
160 match self.config.file_format {
161 S3FileFormat::JsonLines => {
162 let mut records = Vec::new();
163 for (line_num, line) in text.lines().enumerate() {
164 let trimmed = line.trim();
165 if trimmed.is_empty() {
166 continue;
167 }
168 let value: Value = serde_json::from_str(trimmed).map_err(|e| {
169 FaucetError::Source(format!(
170 "S3 JSON parse error in '{key}' at line {}: {e}",
171 line_num + 1
172 ))
173 })?;
174 records.push(value);
175 }
176 Ok(records)
177 }
178 S3FileFormat::JsonArray => {
179 let value: Value = serde_json::from_str(text).map_err(|e| {
180 FaucetError::Source(format!("S3 JSON parse error in '{key}': {e}"))
181 })?;
182 match value {
183 Value::Array(arr) => Ok(arr),
184 _ => Err(FaucetError::Source(format!(
185 "S3 expected JSON array in '{key}', got {}",
186 value_type_name(&value)
187 ))),
188 }
189 }
190 S3FileFormat::RawText => {
191 let record = serde_json::json!({
192 "key": key,
193 "content": text,
194 });
195 Ok(vec![record])
196 }
197 }
198 }
199}
200
201#[async_trait]
202impl faucet_core::Source for S3Source {
203 async fn fetch_with_context(
204 &self,
205 context: &std::collections::HashMap<String, serde_json::Value>,
206 ) -> Result<Vec<Value>, FaucetError> {
207 let substituted_prefix: Option<String> = if !context.is_empty() {
209 self.config
210 .prefix
211 .as_ref()
212 .map(|p| faucet_core::util::substitute_context(p, context))
213 } else {
214 None
215 };
216
217 let keys = self.list_object_keys(substituted_prefix.as_deref()).await?;
218
219 tracing::info!(
220 bucket = %self.config.bucket,
221 objects = keys.len(),
222 "Listed S3 objects"
223 );
224
225 let concurrency = self.config.concurrency.max(1);
226
227 let results: Vec<Vec<Value>> = stream::iter(keys)
228 .map(|key| async move {
229 let records = self.read_object(&key).await?;
230 tracing::debug!(key = %key, records = records.len(), "Read S3 object");
231 Ok::<Vec<Value>, FaucetError>(records)
232 })
233 .buffer_unordered(concurrency)
234 .try_collect()
235 .await?;
236
237 let all_records: Vec<Value> = results.into_iter().flatten().collect();
238
239 tracing::info!(total_records = all_records.len(), "S3 fetch complete");
240 Ok(all_records)
241 }
242
243 fn stream_pages<'a>(
269 &'a self,
270 context: &'a std::collections::HashMap<String, Value>,
271 _batch_size: usize,
272 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
273 let batch_size = self.config.batch_size;
274
275 Box::pin(async_stream::try_stream! {
276 let substituted_prefix: Option<String> = if !context.is_empty() {
278 self.config
279 .prefix
280 .as_ref()
281 .map(|p| faucet_core::util::substitute_context(p, context))
282 } else {
283 None
284 };
285
286 let keys = self.list_object_keys(substituted_prefix.as_deref()).await?;
287 tracing::info!(
288 bucket = %self.config.bucket,
289 objects = keys.len(),
290 "Listed S3 objects (stream)",
291 );
292
293 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
294 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
295 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
296 let mut total = 0usize;
297
298 for key in &keys {
299 match self.config.file_format {
300 S3FileFormat::JsonLines => {
301 let reader = self.open_object_reader(key).await?;
302 let mut lines = reader.lines();
303 let mut line_num: usize = 0;
304 while let Some(line) = lines
305 .next_line()
306 .await
307 .map_err(|e| FaucetError::Source(format!(
308 "S3 read body error for key '{key}': {e}"
309 )))?
310 {
311 line_num += 1;
312 let trimmed = line.trim();
313 if trimmed.is_empty() {
314 continue;
315 }
316 let value: Value =
317 serde_json::from_str(trimmed).map_err(|e| {
318 FaucetError::Source(format!(
319 "S3 JSON parse error in '{key}' at line {line_num}: {e}",
320 ))
321 })?;
322 buffer.push(value);
323 if batch_size != 0 && buffer.len() >= chunk {
324 let page = std::mem::replace(
325 &mut buffer,
326 Vec::with_capacity(initial_capacity),
327 );
328 total += page.len();
329 yield StreamPage { records: page, bookmark: None };
330 }
331 }
332 if batch_size == 0 && !buffer.is_empty() {
333 let page = std::mem::take(&mut buffer);
334 total += page.len();
335 yield StreamPage { records: page, bookmark: None };
336 }
337 }
338 S3FileFormat::RawText => {
339 let text = self.read_object_text(key).await?;
344 let record = serde_json::json!({
345 "key": key,
346 "content": text,
347 });
348 buffer.push(record);
349 if batch_size == 0 {
350 let page = std::mem::take(&mut buffer);
351 total += page.len();
352 yield StreamPage { records: page, bookmark: None };
353 } else if buffer.len() >= chunk {
354 let page = std::mem::replace(
355 &mut buffer,
356 Vec::with_capacity(initial_capacity),
357 );
358 total += page.len();
359 yield StreamPage { records: page, bookmark: None };
360 }
361 }
362 S3FileFormat::JsonArray => {
363 let text = self.read_object_text(key).await?;
369 let value: Value = serde_json::from_str(&text).map_err(|e| {
370 FaucetError::Source(format!("S3 JSON parse error in '{key}': {e}"))
371 })?;
372 let array = match value {
373 Value::Array(arr) => arr,
374 other => Err(FaucetError::Source(format!(
375 "S3 expected JSON array in '{key}', got {}",
376 value_type_name(&other)
377 )))?,
378 };
379 if batch_size == 0 {
380 if !buffer.is_empty() {
385 let page = std::mem::take(&mut buffer);
386 total += page.len();
387 yield StreamPage { records: page, bookmark: None };
388 }
389 total += array.len();
390 yield StreamPage { records: array, bookmark: None };
391 } else {
392 for record in array {
393 buffer.push(record);
394 if buffer.len() >= chunk {
395 let page = std::mem::replace(
396 &mut buffer,
397 Vec::with_capacity(initial_capacity),
398 );
399 total += page.len();
400 yield StreamPage { records: page, bookmark: None };
401 }
402 }
403 }
404 }
405 }
406 }
407
408 if !buffer.is_empty() {
409 let page = std::mem::take(&mut buffer);
410 total += page.len();
411 yield StreamPage { records: page, bookmark: None };
412 }
413
414 tracing::info!(
415 total_records = total,
416 batch_size,
417 objects = keys.len(),
418 "S3 source stream complete",
419 );
420 })
421 }
422
423 fn config_schema(&self) -> serde_json::Value {
424 serde_json::to_value(faucet_core::schema_for!(S3SourceConfig))
425 .expect("schema serialization")
426 }
427}
428
429fn value_type_name(v: &Value) -> &'static str {
431 match v {
432 Value::Null => "null",
433 Value::Bool(_) => "boolean",
434 Value::Number(_) => "number",
435 Value::String(_) => "string",
436 Value::Array(_) => "array",
437 Value::Object(_) => "object",
438 }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::S3SourceConfig;
    use serde_json::json;

    /// Wraps `config` in an `S3Source` whose client is built from a bare SDK
    /// configuration; the tests below only exercise `parse_content`.
    fn test_source(config: S3SourceConfig) -> S3Source {
        let sdk_config = aws_config::SdkConfig::builder()
            .behavior_version(aws_config::BehaviorVersion::latest())
            .build();
        S3Source {
            config,
            client: Client::new(&sdk_config),
        }
    }

    /// Shorthand for a test source on bucket "test" with the given format.
    fn source_with_format(format: S3FileFormat) -> S3Source {
        test_source(S3SourceConfig::new("test").file_format(format))
    }

    #[test]
    fn parse_json_lines() {
        let source = test_source(S3SourceConfig::new("test"));
        let text = "{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n";

        let records = source.parse_content("test.jsonl", text).unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
        assert_eq!(records[1]["name"], "Bob");
    }

    #[test]
    fn parse_json_lines_skips_empty() {
        let source = test_source(S3SourceConfig::new("test"));
        let text = "{\"id\":1}\n\n{\"id\":2}\n\n";

        let records = source.parse_content("test.jsonl", text).unwrap();

        assert_eq!(records.len(), 2);
    }

    #[test]
    fn parse_json_lines_invalid() {
        let source = test_source(S3SourceConfig::new("test"));

        let err = source
            .parse_content("test.jsonl", "not json\n")
            .expect_err("invalid JSON line must be rejected")
            .to_string();

        assert!(err.contains("JSON parse error"));
        assert!(err.contains("line 1"));
    }

    #[test]
    fn parse_json_array() {
        let source = source_with_format(S3FileFormat::JsonArray);

        let records = source
            .parse_content("test.json", "[{\"id\":1},{\"id\":2}]")
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
    }

    #[test]
    fn parse_json_array_not_array() {
        let source = source_with_format(S3FileFormat::JsonArray);

        let err = source
            .parse_content("test.json", "{\"id\":1}")
            .expect_err("a non-array document must be rejected")
            .to_string();

        assert!(err.contains("expected JSON array"));
    }

    #[test]
    fn parse_raw_text() {
        let source = source_with_format(S3FileFormat::RawText);

        let records = source
            .parse_content("data/file.txt", "hello world\nline two")
            .unwrap();

        assert_eq!(records.len(), 1);
        let expected = json!({"key": "data/file.txt", "content": "hello world\nline two"});
        assert_eq!(records[0], expected);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = S3SourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }
}