Skip to main content

faucet_source_s3/
stream.rs

1//! S3 source stream executor.
2
3use crate::config::{S3FileFormat, S3SourceConfig};
4use async_trait::async_trait;
5use aws_sdk_s3::Client;
6use faucet_core::{FaucetError, Stream, StreamPage};
7use futures::stream::{self, StreamExt, TryStreamExt};
8use serde_json::Value;
9use std::pin::Pin;
10use tokio::io::AsyncBufReadExt;
11
12/// An S3 source that lists and reads objects from a bucket.
13pub struct S3Source {
14    config: S3SourceConfig,
15    client: Client,
16}
17
18impl S3Source {
19    /// Create a new S3 source from the given configuration.
20    ///
21    /// Builds the S3 client eagerly so it is reused across calls.
22    pub async fn new(config: S3SourceConfig) -> Result<Self, FaucetError> {
23        let client = Self::build_client(&config).await?;
24        Ok(Self { config, client })
25    }
26
27    /// Build an S3 client from the configuration.
28    async fn build_client(config: &S3SourceConfig) -> Result<Client, FaucetError> {
29        let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
30
31        if let Some(ref region) = config.region {
32            config_loader = config_loader.region(aws_config::Region::new(region.clone()));
33        }
34
35        if let Some(ref endpoint) = config.endpoint_url {
36            config_loader = config_loader.endpoint_url(endpoint);
37        }
38
39        let sdk_config = config_loader.load().await;
40        let client = Client::new(&sdk_config);
41        Ok(client)
42    }
43
44    /// List object keys matching the configured bucket and prefix.
45    ///
46    /// When `prefix_override` is `Some`, it is used instead of `self.config.prefix`
47    /// (used for parent-context substitution).
48    async fn list_object_keys(
49        &self,
50        prefix_override: Option<&str>,
51    ) -> Result<Vec<String>, FaucetError> {
52        let mut keys = Vec::new();
53        let mut continuation_token: Option<String> = None;
54
55        let effective_prefix = prefix_override.or(self.config.prefix.as_deref());
56
57        loop {
58            let mut req = self.client.list_objects_v2().bucket(&self.config.bucket);
59
60            if let Some(prefix) = effective_prefix {
61                req = req.prefix(prefix);
62            }
63
64            if let Some(ref token) = continuation_token {
65                req = req.continuation_token(token);
66            }
67
68            let response = req.send().await.map_err(|e| {
69                FaucetError::Source(format!(
70                    "S3 list objects error for bucket '{}': {e}",
71                    self.config.bucket
72                ))
73            })?;
74
75            for object in response.contents() {
76                let key: &str = object.key().unwrap_or_default();
77                if key.is_empty() {
78                    continue;
79                }
80                keys.push(key.to_string());
81
82                if let Some(max) = self.config.max_objects
83                    && keys.len() >= max
84                {
85                    return Ok(keys);
86                }
87            }
88
89            if response.is_truncated() == Some(true) {
90                continuation_token = response.next_continuation_token().map(String::from);
91            } else {
92                break;
93            }
94        }
95
96        Ok(keys)
97    }
98
99    /// Read and parse a single S3 object into records.
100    async fn read_object(&self, key: &str) -> Result<Vec<Value>, FaucetError> {
101        let text = self.read_object_text(key).await?;
102        self.parse_content(key, &text)
103    }
104
105    /// Read the full body of a single S3 object into a UTF-8 `String`.
106    ///
107    /// Streams the (optionally decompressed) body straight into one `String`
108    /// via [`open_object_reader`](Self::open_object_reader) rather than
109    /// buffering the raw bytes AND the decompressed bytes AND the `String`
110    /// at once (#78/#25). The whole object is still one unit for
111    /// `JsonArray` / `RawText`, but peak memory is now ~1× the decoded size.
112    async fn read_object_text(&self, key: &str) -> Result<String, FaucetError> {
113        use tokio::io::AsyncReadExt as _;
114        let mut reader = self.open_object_reader(key).await?;
115        let mut text = String::new();
116        reader.read_to_string(&mut text).await.map_err(|e| {
117            FaucetError::Source(format!(
118                "S3 read/decode error for key '{key}' (not valid UTF-8?): {e}"
119            ))
120        })?;
121        Ok(text)
122    }
123
124    /// Open an S3 object as an [`AsyncBufRead`](tokio::io::AsyncBufRead) over
125    /// its body. Used by [`Source::stream_pages`](faucet_core::Source::stream_pages)
126    /// to decode `JsonLines` objects line-by-line without buffering the
127    /// whole file.
128    async fn open_object_reader(
129        &self,
130        key: &str,
131    ) -> Result<std::pin::Pin<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>>, FaucetError> {
132        let response = self
133            .client
134            .get_object()
135            .bucket(&self.config.bucket)
136            .key(key)
137            .send()
138            .await
139            .map_err(|e| {
140                FaucetError::Source(format!("S3 get object error for key '{key}': {e}"))
141            })?;
142
143        // `ByteStream::into_async_read` returns `impl AsyncRead`; wrap in a
144        // `BufReader` so `.lines()` is usable and ownership is `Unpin`.
145        let buffered = tokio::io::BufReader::new(response.body.into_async_read());
146        #[cfg(feature = "compression")]
147        {
148            let codec = self.config.compression.resolve(key);
149            faucet_core::compression::warn_mismatch(key, codec);
150            Ok(faucet_core::compression::wrap_async_reader(buffered, codec))
151        }
152        #[cfg(not(feature = "compression"))]
153        {
154            Ok(Box::pin(buffered))
155        }
156    }
157
158    /// Parse file content into records based on the configured file format.
159    fn parse_content(&self, key: &str, text: &str) -> Result<Vec<Value>, FaucetError> {
160        match self.config.file_format {
161            S3FileFormat::JsonLines => {
162                let mut records = Vec::new();
163                for (line_num, line) in text.lines().enumerate() {
164                    let trimmed = line.trim();
165                    if trimmed.is_empty() {
166                        continue;
167                    }
168                    let value: Value = serde_json::from_str(trimmed).map_err(|e| {
169                        FaucetError::Source(format!(
170                            "S3 JSON parse error in '{key}' at line {}: {e}",
171                            line_num + 1
172                        ))
173                    })?;
174                    records.push(value);
175                }
176                Ok(records)
177            }
178            S3FileFormat::JsonArray => {
179                let value: Value = serde_json::from_str(text).map_err(|e| {
180                    FaucetError::Source(format!("S3 JSON parse error in '{key}': {e}"))
181                })?;
182                match value {
183                    Value::Array(arr) => Ok(arr),
184                    _ => Err(FaucetError::Source(format!(
185                        "S3 expected JSON array in '{key}', got {}",
186                        value_type_name(&value)
187                    ))),
188                }
189            }
190            S3FileFormat::RawText => {
191                let record = serde_json::json!({
192                    "key": key,
193                    "content": text,
194                });
195                Ok(vec![record])
196            }
197        }
198    }
199}
200
201#[async_trait]
202impl faucet_core::Source for S3Source {
203    async fn fetch_with_context(
204        &self,
205        context: &std::collections::HashMap<String, serde_json::Value>,
206    ) -> Result<Vec<Value>, FaucetError> {
207        // Substitute context into prefix when parent context is provided.
208        let substituted_prefix: Option<String> = if !context.is_empty() {
209            self.config
210                .prefix
211                .as_ref()
212                .map(|p| faucet_core::util::substitute_context(p, context))
213        } else {
214            None
215        };
216
217        let keys = self.list_object_keys(substituted_prefix.as_deref()).await?;
218
219        tracing::info!(
220            bucket = %self.config.bucket,
221            objects = keys.len(),
222            "Listed S3 objects"
223        );
224
225        let concurrency = self.config.concurrency.max(1);
226
227        let results: Vec<Vec<Value>> = stream::iter(keys)
228            .map(|key| async move {
229                let records = self.read_object(&key).await?;
230                tracing::debug!(key = %key, records = records.len(), "Read S3 object");
231                Ok::<Vec<Value>, FaucetError>(records)
232            })
233            .buffer_unordered(concurrency)
234            .try_collect()
235            .await?;
236
237        let all_records: Vec<Value> = results.into_iter().flatten().collect();
238
239        tracing::info!(total_records = all_records.len(), "S3 fetch complete");
240        Ok(all_records)
241    }
242
243    /// Stream records from listed S3 objects without buffering the full
244    /// scan. Each emitted [`StreamPage`] holds up to
245    /// [`S3SourceConfig::batch_size`] records.
246    ///
247    /// The trait-level `batch_size` argument is ignored in favour of the
248    /// config field — the config is the user-facing knob the README
249    /// documents, and routing the pipeline-supplied hint through it would
250    /// silently override an explicit config value.
251    ///
252    /// Behaviour by format:
253    ///
254    /// - `JsonLines` / `RawText`: the object body is decoded line-by-line
255    ///   via [`tokio::io::AsyncBufReadExt::lines`] so client-side memory is
256    ///   bounded at `O(batch_size)` per object. Multi-object scans are
257    ///   flattened — a single page may carry lines drawn from any object.
258    /// - `JsonArray`: each object is buffered fully (the JSON value can
259    ///   only be parsed once the array is complete) and then its records
260    ///   are chunked into pages of `batch_size`. See the README "Streaming
261    ///   and batching" section for the caveat.
262    ///
263    /// `batch_size = 0` is the "no batching" sentinel: one [`StreamPage`]
264    /// is emitted per S3 object (no within-object chunking and no
265    /// cross-object accumulation). The S3 source has no
266    /// incremental-replication mode today, so every emitted page carries
267    /// `bookmark: None`.
268    fn stream_pages<'a>(
269        &'a self,
270        context: &'a std::collections::HashMap<String, Value>,
271        _batch_size: usize,
272    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
273        let batch_size = self.config.batch_size;
274
275        Box::pin(async_stream::try_stream! {
276            // Substitute context into prefix when parent context is provided.
277            let substituted_prefix: Option<String> = if !context.is_empty() {
278                self.config
279                    .prefix
280                    .as_ref()
281                    .map(|p| faucet_core::util::substitute_context(p, context))
282            } else {
283                None
284            };
285
286            let keys = self.list_object_keys(substituted_prefix.as_deref()).await?;
287            tracing::info!(
288                bucket = %self.config.bucket,
289                objects = keys.len(),
290                "Listed S3 objects (stream)",
291            );
292
293            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
294            let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
295            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
296            let mut total = 0usize;
297
298            for key in &keys {
299                match self.config.file_format {
300                    S3FileFormat::JsonLines => {
301                        let reader = self.open_object_reader(key).await?;
302                        let mut lines = reader.lines();
303                        let mut line_num: usize = 0;
304                        while let Some(line) = lines
305                            .next_line()
306                            .await
307                            .map_err(|e| FaucetError::Source(format!(
308                                "S3 read body error for key '{key}': {e}"
309                            )))?
310                        {
311                            line_num += 1;
312                            let trimmed = line.trim();
313                            if trimmed.is_empty() {
314                                continue;
315                            }
316                            let value: Value =
317                                serde_json::from_str(trimmed).map_err(|e| {
318                                    FaucetError::Source(format!(
319                                        "S3 JSON parse error in '{key}' at line {line_num}: {e}",
320                                    ))
321                                })?;
322                            buffer.push(value);
323                            if batch_size != 0 && buffer.len() >= chunk {
324                                let page = std::mem::replace(
325                                    &mut buffer,
326                                    Vec::with_capacity(initial_capacity),
327                                );
328                                total += page.len();
329                                yield StreamPage { records: page, bookmark: None };
330                            }
331                        }
332                        if batch_size == 0 && !buffer.is_empty() {
333                            let page = std::mem::take(&mut buffer);
334                            total += page.len();
335                            yield StreamPage { records: page, bookmark: None };
336                        }
337                    }
338                    S3FileFormat::RawText => {
339                        // RawText emits a single record per object; the
340                        // `key` + `content` shape is unchanged so we
341                        // continue to buffer the body fully. This still
342                        // streams *across* objects.
343                        let text = self.read_object_text(key).await?;
344                        let record = serde_json::json!({
345                            "key": key,
346                            "content": text,
347                        });
348                        buffer.push(record);
349                        if batch_size == 0 {
350                            let page = std::mem::take(&mut buffer);
351                            total += page.len();
352                            yield StreamPage { records: page, bookmark: None };
353                        } else if buffer.len() >= chunk {
354                            let page = std::mem::replace(
355                                &mut buffer,
356                                Vec::with_capacity(initial_capacity),
357                            );
358                            total += page.len();
359                            yield StreamPage { records: page, bookmark: None };
360                        }
361                    }
362                    S3FileFormat::JsonArray => {
363                        // JSON-array files cannot be parsed incrementally
364                        // (the closing `]` is required to validate the
365                        // structure), so each object is buffered fully and
366                        // then chunked. The caveat is documented in the
367                        // crate README.
368                        let text = self.read_object_text(key).await?;
369                        let value: Value = serde_json::from_str(&text).map_err(|e| {
370                            FaucetError::Source(format!("S3 JSON parse error in '{key}': {e}"))
371                        })?;
372                        let array = match value {
373                            Value::Array(arr) => arr,
374                            other => Err(FaucetError::Source(format!(
375                                "S3 expected JSON array in '{key}', got {}",
376                                value_type_name(&other)
377                            )))?,
378                        };
379                        if batch_size == 0 {
380                            // Flush any cross-object buffer first (none
381                            // here because each iteration completes its
382                            // own object — but keep symmetric with the
383                            // line-shaped branches).
384                            if !buffer.is_empty() {
385                                let page = std::mem::take(&mut buffer);
386                                total += page.len();
387                                yield StreamPage { records: page, bookmark: None };
388                            }
389                            total += array.len();
390                            yield StreamPage { records: array, bookmark: None };
391                        } else {
392                            for record in array {
393                                buffer.push(record);
394                                if buffer.len() >= chunk {
395                                    let page = std::mem::replace(
396                                        &mut buffer,
397                                        Vec::with_capacity(initial_capacity),
398                                    );
399                                    total += page.len();
400                                    yield StreamPage { records: page, bookmark: None };
401                                }
402                            }
403                        }
404                    }
405                }
406            }
407
408            if !buffer.is_empty() {
409                let page = std::mem::take(&mut buffer);
410                total += page.len();
411                yield StreamPage { records: page, bookmark: None };
412            }
413
414            tracing::info!(
415                total_records = total,
416                batch_size,
417                objects = keys.len(),
418                "S3 source stream complete",
419            );
420        })
421    }
422
423    fn config_schema(&self) -> serde_json::Value {
424        serde_json::to_value(faucet_core::schema_for!(S3SourceConfig))
425            .expect("schema serialization")
426    }
427}
428
429/// Return a human-readable name for a JSON value type.
430fn value_type_name(v: &Value) -> &'static str {
431    match v {
432        Value::Null => "null",
433        Value::Bool(_) => "boolean",
434        Value::Number(_) => "number",
435        Value::String(_) => "string",
436        Value::Array(_) => "array",
437        Value::Object(_) => "object",
438    }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::S3SourceConfig;
    use serde_json::json;

    /// Helper to build an S3Source synchronously for parse-only tests.
    /// We construct it directly to avoid needing an async runtime for unit tests
    /// that only exercise `parse_content`.
    fn test_source(config: S3SourceConfig) -> S3Source {
        // Build a dummy client — these tests never make network calls.
        let sdk_config = aws_config::SdkConfig::builder()
            .behavior_version(aws_config::BehaviorVersion::latest())
            .build();
        let client = Client::new(&sdk_config);
        S3Source { config, client }
    }

    #[test]
    fn parse_json_lines() {
        let source = test_source(S3SourceConfig::new("test"));
        let text = r#"{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
"#;
        let records = source.parse_content("test.jsonl", text).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
        assert_eq!(records[1]["name"], "Bob");
    }

    #[test]
    fn parse_json_lines_skips_empty() {
        let source = test_source(S3SourceConfig::new("test"));
        let text = r#"{"id":1}

{"id":2}

"#;
        let records = source.parse_content("test.jsonl", text).unwrap();
        assert_eq!(records.len(), 2);
    }

    #[test]
    fn parse_json_lines_crlf() {
        // Objects written on Windows use `\r\n`; both line endings must parse.
        let source = test_source(S3SourceConfig::new("test"));
        let text = "{\"id\":1}\r\n{\"id\":2}\r\n";
        let records = source.parse_content("test.jsonl", text).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[1]["id"], 2);
    }

    #[test]
    fn parse_json_lines_invalid() {
        let source = test_source(S3SourceConfig::new("test"));
        let text = "not json\n";
        let result = source.parse_content("test.jsonl", text);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("JSON parse error"));
        assert!(err.contains("line 1"));
    }

    #[test]
    fn parse_json_lines_error_line_counts_blank_lines() {
        // Reported line numbers refer to the physical line in the object,
        // so skipped blank lines still count.
        let source = test_source(S3SourceConfig::new("test"));
        let text = "{\"id\":1}\n\nnot json\n";
        let err = source
            .parse_content("test.jsonl", text)
            .unwrap_err()
            .to_string();
        assert!(err.contains("line 3"));
    }

    #[test]
    fn parse_json_array() {
        let source = test_source(S3SourceConfig::new("test").file_format(S3FileFormat::JsonArray));
        let text = r#"[{"id":1},{"id":2}]"#;
        let records = source.parse_content("test.json", text).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
    }

    #[test]
    fn parse_json_array_empty() {
        let source = test_source(S3SourceConfig::new("test").file_format(S3FileFormat::JsonArray));
        let records = source.parse_content("empty.json", "[]").unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn parse_json_array_not_array() {
        let source = test_source(S3SourceConfig::new("test").file_format(S3FileFormat::JsonArray));
        let text = r#"{"id":1}"#;
        let result = source.parse_content("test.json", text);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("expected JSON array"));
    }

    #[test]
    fn parse_raw_text() {
        let source = test_source(S3SourceConfig::new("test").file_format(S3FileFormat::RawText));
        let text = "hello world\nline two";
        let records = source.parse_content("data/file.txt", text).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            json!({"key": "data/file.txt", "content": "hello world\nline two"})
        );
    }

    #[test]
    fn value_type_name_covers_all_variants() {
        assert_eq!(value_type_name(&Value::Null), "null");
        assert_eq!(value_type_name(&json!(true)), "boolean");
        assert_eq!(value_type_name(&json!(1.5)), "number");
        assert_eq!(value_type_name(&json!("s")), "string");
        assert_eq!(value_type_name(&json!([])), "array");
        assert_eq!(value_type_name(&json!({})), "object");
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = S3SourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }
}