Skip to main content

faucet_sink_s3/
config.rs

//! S3 sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Configuration for the S3 sink connector.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct S3SinkConfig {
10    /// S3 bucket name.
11    pub bucket: String,
12    /// Key prefix for written objects.
13    pub prefix: String,
14    /// AWS region. `None` uses the SDK default.
15    pub region: Option<String>,
16    /// Custom endpoint URL for S3-compatible services (e.g. MinIO).
17    pub endpoint_url: Option<String>,
18    /// File extension for written objects (default: `.jsonl`).
19    pub file_extension: String,
20    /// Maximum records per file. `None` removes the per-file record cap — but
21    /// the sink still writes **one object per `write_batch` call** (i.e. one per
22    /// upstream page), and `batch_size` may chunk a call further; it does not
23    /// coalesce a streaming run into a single object.
24    pub max_records_per_file: Option<usize>,
25    /// Maximum number of concurrent file uploads (default: 10).
26    pub concurrency: usize,
27    /// Records per S3 object written by a single
28    /// [`Sink::write_batch`](faucet_core::Sink::write_batch) call. When a call
29    /// hands the sink `N` records with `batch_size = M > 0`, the sink writes
30    /// `ceil(N / M)` objects, each containing at most `M` records (the final
31    /// object holds the remainder). Defaults to [`DEFAULT_BATCH_SIZE`].
32    ///
33    /// `batch_size = 0` is the "no batching" sentinel: the sink writes
34    /// whatever upstream hands it without re-chunking (still honouring
35    /// `max_records_per_file` if set). Recommended for S3 — most callers
36    /// should leave this at `0` and let the source's `batch_size` drive
37    /// object sizing, because many tiny S3 objects are a well-known
38    /// anti-pattern (per-request overhead, slower downstream reads,
39    /// LIST/PUT cost).
40    ///
41    /// When both `batch_size > 0` and `max_records_per_file` are set, the
42    /// effective per-object cap is `min(batch_size, max_records_per_file)`.
43    #[serde(default = "default_batch_size")]
44    pub batch_size: usize,
45    /// Compression codec applied to each uploaded object body. Defaults to
46    /// [`CompressionConfig::Auto`](faucet_core::CompressionConfig::Auto) —
47    /// resolves against `file_extension` (so `.jsonl.gz` triggers gzip).
48    /// Requires the crate-local `compression` feature. Note: this sink does
49    /// **not** set the S3 `Content-Encoding` header, so consumers must
50    /// decompress explicitly.
51    #[cfg(feature = "compression")]
52    #[serde(default)]
53    pub compression: faucet_core::CompressionConfig,
54}
55
56fn default_batch_size() -> usize {
57    DEFAULT_BATCH_SIZE
58}
59
60impl S3SinkConfig {
61    /// Create a new config with the required bucket name and sensible defaults.
62    pub fn new(bucket: impl Into<String>) -> Self {
63        Self {
64            bucket: bucket.into(),
65            prefix: String::new(),
66            region: None,
67            endpoint_url: None,
68            file_extension: ".jsonl".to_string(),
69            max_records_per_file: None,
70            concurrency: 10,
71            batch_size: DEFAULT_BATCH_SIZE,
72            #[cfg(feature = "compression")]
73            compression: faucet_core::CompressionConfig::Auto,
74        }
75    }
76
77    /// Set the key prefix for written objects.
78    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
79        self.prefix = prefix.into();
80        self
81    }
82
83    /// Set the AWS region.
84    pub fn region(mut self, region: impl Into<String>) -> Self {
85        self.region = Some(region.into());
86        self
87    }
88
89    /// Set a custom endpoint URL for S3-compatible services.
90    pub fn endpoint_url(mut self, url: impl Into<String>) -> Self {
91        self.endpoint_url = Some(url.into());
92        self
93    }
94
95    /// Set the file extension for written objects.
96    pub fn file_extension(mut self, ext: impl Into<String>) -> Self {
97        self.file_extension = ext.into();
98        self
99    }
100
101    /// Set the maximum number of records per file.
102    pub fn max_records_per_file(mut self, max: usize) -> Self {
103        self.max_records_per_file = Some(max);
104        self
105    }
106
107    /// Set the maximum number of concurrent file uploads.
108    pub fn concurrency(mut self, concurrency: usize) -> Self {
109        self.concurrency = concurrency;
110        self
111    }
112
113    /// Set the per-object record count for
114    /// [`Sink::write_batch`](faucet_core::Sink::write_batch).
115    ///
116    /// Pass `0` to opt out of write-side re-chunking — the sink writes
117    /// whatever upstream hands it as a single object (still honouring
118    /// `max_records_per_file` if set). `0` is the recommended value for S3
119    /// because writing many small objects is an anti-pattern.
120    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
121        self.batch_size = batch_size;
122        self
123    }
124
125    /// Set the compression codec. Available only with the `compression` feature.
126    #[cfg(feature = "compression")]
127    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
128        self.compression = c;
129        self
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = S3SinkConfig::new("my-bucket");
        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix, "");
        assert!(config.region.is_none());
        assert!(config.endpoint_url.is_none());
        assert_eq!(config.file_extension, ".jsonl");
        assert!(config.max_records_per_file.is_none());
        assert_eq!(config.concurrency, 10);
    }

    #[test]
    fn builder_methods() {
        let config = S3SinkConfig::new("my-bucket")
            .prefix("output/")
            .region("eu-west-1")
            .endpoint_url("http://localhost:9000")
            .file_extension(".json")
            .max_records_per_file(1000)
            .concurrency(4);

        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix, "output/");
        assert_eq!(config.region.as_deref(), Some("eu-west-1"));
        assert_eq!(
            config.endpoint_url.as_deref(),
            Some("http://localhost:9000")
        );
        assert_eq!(config.file_extension, ".json");
        assert_eq!(config.max_records_per_file, Some(1000));
        assert_eq!(config.concurrency, 4);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = S3SinkConfig::new("my-bucket");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = S3SinkConfig::new("my-bucket").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = S3SinkConfig::new("my-bucket").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            S3SinkConfig::new("my-bucket").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": "",
            "region": null,
            "endpoint_url": null,
            "file_extension": ".jsonl",
            "max_records_per_file": null,
            "concurrency": 10,
            "batch_size": 250
        }"#;
        let config: S3SinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": "",
            "region": null,
            "endpoint_url": null,
            "file_extension": ".jsonl",
            "max_records_per_file": null,
            "concurrency": 10
        }"#;
        let config: S3SinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// Serializing a config and parsing it back must preserve every field.
    #[test]
    fn config_survives_json_round_trip() {
        let original = S3SinkConfig::new("my-bucket")
            .prefix("out/")
            .region("us-east-1")
            .file_extension(".ndjson")
            .max_records_per_file(42)
            .concurrency(3)
            .with_batch_size(0);

        let json = serde_json::to_string(&original).unwrap();
        let back: S3SinkConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(back.bucket, original.bucket);
        assert_eq!(back.prefix, original.prefix);
        assert_eq!(back.region, original.region);
        assert_eq!(back.endpoint_url, original.endpoint_url);
        assert_eq!(back.file_extension, original.file_extension);
        assert_eq!(back.max_records_per_file, original.max_records_per_file);
        assert_eq!(back.concurrency, original.concurrency);
        assert_eq!(back.batch_size, original.batch_size);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_config_round_trips() {
        let json = r#"{
            "bucket": "b",
            "prefix": "",
            "region": null,
            "endpoint_url": null,
            "file_extension": ".jsonl.gz",
            "max_records_per_file": null,
            "concurrency": 1,
            "batch_size": 0,
            "compression": "gzip"
        }"#;
        let config: S3SinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.compression, faucet_core::CompressionConfig::Gzip);

        // Serialize and parse again so this is an actual round trip, not just
        // a one-way deserialization check.
        let reserialized = serde_json::to_string(&config).unwrap();
        let back: S3SinkConfig = serde_json::from_str(&reserialized).unwrap();
        assert_eq!(back.compression, faucet_core::CompressionConfig::Gzip);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = S3SinkConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }
}