Skip to main content

faucet_source_s3/
config.rs

1//! S3 source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Format of files stored in S3.
8#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum S3FileFormat {
11    /// Each line in the file is a separate JSON record.
12    #[default]
13    JsonLines,
14    /// The entire file is a JSON array of records.
15    JsonArray,
16    /// Each file becomes a single record with `"key"` and `"content"` fields.
17    RawText,
18}
19
20/// Configuration for the S3 source connector.
21#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
22pub struct S3SourceConfig {
23    /// S3 bucket name.
24    pub bucket: String,
25    /// Object key prefix filter.
26    pub prefix: Option<String>,
27    /// AWS region. `None` uses the SDK default.
28    pub region: Option<String>,
29    /// Custom endpoint URL for S3-compatible services (e.g. MinIO).
30    pub endpoint_url: Option<String>,
31    /// Format of the files to read.
32    pub file_format: S3FileFormat,
33    /// Maximum number of objects to read.
34    pub max_objects: Option<usize>,
35    /// Maximum number of concurrent object reads (default: 10).
36    pub concurrency: usize,
37    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). For
38    /// `JsonLines` and `RawText` formats, the object body is decoded
39    /// line-by-line via [`tokio::io::AsyncBufReadExt`] and a page is yielded
40    /// whenever the buffer reaches this size; multi-object scans flatten so
41    /// a single page may contain lines from any object. For `JsonArray`,
42    /// each object is buffered fully before its records are chunked into
43    /// pages of this size (see the README "Streaming and batching" section
44    /// for the caveat). Defaults to [`DEFAULT_BATCH_SIZE`].
45    ///
46    /// `batch_size = 0` is the "no batching" sentinel: every page is one
47    /// complete object — no within-object chunking. Useful for small
48    /// lookup files, or for sinks (e.g. SQL `COPY`, BigQuery load jobs)
49    /// that prefer one large request per file to many small ones.
50    #[serde(default = "default_batch_size")]
51    pub batch_size: usize,
52    /// Compression codec applied to each downloaded object. Defaults to
53    /// [`CompressionConfig::Auto`](faucet_core::CompressionConfig::Auto) —
54    /// the codec is resolved per-object-key, so a single source can read a
55    /// mix of compressed and uncompressed objects. Requires the
56    /// crate-local `compression` feature.
57    #[cfg(feature = "compression")]
58    #[serde(default)]
59    pub compression: faucet_core::CompressionConfig,
60}
61
62fn default_batch_size() -> usize {
63    DEFAULT_BATCH_SIZE
64}
65
66impl S3SourceConfig {
67    /// Create a new config with the required bucket name and sensible defaults.
68    pub fn new(bucket: impl Into<String>) -> Self {
69        Self {
70            bucket: bucket.into(),
71            prefix: None,
72            region: None,
73            endpoint_url: None,
74            file_format: S3FileFormat::default(),
75            max_objects: None,
76            concurrency: 10,
77            batch_size: DEFAULT_BATCH_SIZE,
78            #[cfg(feature = "compression")]
79            compression: faucet_core::CompressionConfig::default(),
80        }
81    }
82
83    /// Set the object key prefix filter.
84    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
85        self.prefix = Some(prefix.into());
86        self
87    }
88
89    /// Set the AWS region.
90    pub fn region(mut self, region: impl Into<String>) -> Self {
91        self.region = Some(region.into());
92        self
93    }
94
95    /// Set a custom endpoint URL for S3-compatible services.
96    pub fn endpoint_url(mut self, url: impl Into<String>) -> Self {
97        self.endpoint_url = Some(url.into());
98        self
99    }
100
101    /// Set the file format.
102    pub fn file_format(mut self, format: S3FileFormat) -> Self {
103        self.file_format = format;
104        self
105    }
106
107    /// Set the maximum number of objects to read.
108    pub fn max_objects(mut self, max: usize) -> Self {
109        self.max_objects = Some(max);
110        self
111    }
112
113    /// Set the maximum number of concurrent object reads.
114    pub fn concurrency(mut self, concurrency: usize) -> Self {
115        self.concurrency = concurrency;
116        self
117    }
118
119    /// Set the per-page record count for [`Source::stream_pages`](faucet_core::Source::stream_pages).
120    ///
121    /// Pass `0` to opt out of within-object chunking — every emitted
122    /// [`StreamPage`](faucet_core::StreamPage) corresponds to exactly one
123    /// S3 object.
124    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
125        self.batch_size = batch_size;
126        self
127    }
128
129    /// Set the compression codec. Available only with the `compression` feature.
130    #[cfg(feature = "compression")]
131    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
132        self.compression = c;
133        self
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = S3SourceConfig::new("my-bucket");
        assert_eq!(config.bucket, "my-bucket");
        assert!(config.prefix.is_none());
        assert!(config.region.is_none());
        assert!(config.endpoint_url.is_none());
        assert!(matches!(config.file_format, S3FileFormat::JsonLines));
        assert!(config.max_objects.is_none());
        // The field docs promise a default of 10 concurrent reads.
        assert_eq!(config.concurrency, 10);
    }

    #[test]
    fn builder_methods() {
        let config = S3SourceConfig::new("my-bucket")
            .prefix("data/")
            .region("us-west-2")
            .endpoint_url("http://localhost:9000")
            .file_format(S3FileFormat::JsonArray)
            .max_objects(10)
            .concurrency(4);

        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix.as_deref(), Some("data/"));
        assert_eq!(config.region.as_deref(), Some("us-west-2"));
        assert_eq!(
            config.endpoint_url.as_deref(),
            Some("http://localhost:9000")
        );
        assert!(matches!(config.file_format, S3FileFormat::JsonArray));
        assert_eq!(config.max_objects, Some(10));
        assert_eq!(config.concurrency, 4);
    }

    #[test]
    fn file_format_default_is_json_lines() {
        let format = S3FileFormat::default();
        assert!(matches!(format, S3FileFormat::JsonLines));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = S3SourceConfig::new("my-bucket");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = S3SourceConfig::new("my-bucket").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = S3SourceConfig::new("my-bucket").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            S3SourceConfig::new("my-bucket").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": null,
            "region": null,
            "endpoint_url": null,
            "file_format": "json_lines",
            "max_objects": null,
            "concurrency": 10,
            "batch_size": 250
        }"#;
        let config: S3SourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn omitted_optional_fields_fall_back_to_defaults() {
        // `batch_size` and every `Option` field are omitted; serde should fill
        // them from `default_batch_size` and `None` respectively.
        let json = r#"{
            "bucket": "my-bucket",
            "file_format": "raw_text",
            "concurrency": 3
        }"#;
        let config: S3SourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.bucket, "my-bucket");
        assert!(config.prefix.is_none());
        assert!(config.region.is_none());
        assert!(config.endpoint_url.is_none());
        assert!(config.max_objects.is_none());
        assert!(matches!(config.file_format, S3FileFormat::RawText));
        assert_eq!(config.concurrency, 3);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = S3SourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }
}