1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct S3SinkConfig {
10 pub bucket: String,
12 pub prefix: String,
14 pub region: Option<String>,
16 pub endpoint_url: Option<String>,
18 pub file_extension: String,
20 pub max_records_per_file: Option<usize>,
25 pub concurrency: usize,
27 #[serde(default = "default_batch_size")]
44 pub batch_size: usize,
45 #[cfg(feature = "compression")]
52 #[serde(default)]
53 pub compression: faucet_core::CompressionConfig,
54}
55
56fn default_batch_size() -> usize {
57 DEFAULT_BATCH_SIZE
58}
59
60impl S3SinkConfig {
61 pub fn new(bucket: impl Into<String>) -> Self {
63 Self {
64 bucket: bucket.into(),
65 prefix: String::new(),
66 region: None,
67 endpoint_url: None,
68 file_extension: ".jsonl".to_string(),
69 max_records_per_file: None,
70 concurrency: 10,
71 batch_size: DEFAULT_BATCH_SIZE,
72 #[cfg(feature = "compression")]
73 compression: faucet_core::CompressionConfig::Auto,
74 }
75 }
76
77 pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
79 self.prefix = prefix.into();
80 self
81 }
82
83 pub fn region(mut self, region: impl Into<String>) -> Self {
85 self.region = Some(region.into());
86 self
87 }
88
89 pub fn endpoint_url(mut self, url: impl Into<String>) -> Self {
91 self.endpoint_url = Some(url.into());
92 self
93 }
94
95 pub fn file_extension(mut self, ext: impl Into<String>) -> Self {
97 self.file_extension = ext.into();
98 self
99 }
100
101 pub fn max_records_per_file(mut self, max: usize) -> Self {
103 self.max_records_per_file = Some(max);
104 self
105 }
106
107 pub fn concurrency(mut self, concurrency: usize) -> Self {
109 self.concurrency = concurrency;
110 self
111 }
112
113 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
121 self.batch_size = batch_size;
122 self
123 }
124
125 #[cfg(feature = "compression")]
127 pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
128 self.compression = c;
129 self
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` fills every field other than `bucket` with its default.
    #[test]
    fn default_config() {
        let config = S3SinkConfig::new("my-bucket");
        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix, "");
        assert!(config.region.is_none());
        assert!(config.endpoint_url.is_none());
        assert_eq!(config.file_extension, ".jsonl");
        assert!(config.max_records_per_file.is_none());
        assert_eq!(config.concurrency, 10);
    }

    /// Each chained setter overrides exactly the field it is named after.
    #[test]
    fn builder_methods() {
        let config = S3SinkConfig::new("my-bucket")
            .prefix("output/")
            .region("eu-west-1")
            .endpoint_url("http://localhost:9000")
            .file_extension(".json")
            .max_records_per_file(1000)
            .concurrency(4);

        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix, "output/");
        assert_eq!(config.region.as_deref(), Some("eu-west-1"));
        assert_eq!(
            config.endpoint_url.as_deref(),
            Some("http://localhost:9000")
        );
        assert_eq!(config.file_extension, ".json");
        assert_eq!(config.max_records_per_file, Some(1000));
        assert_eq!(config.concurrency, 4);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = S3SinkConfig::new("my-bucket");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = S3SinkConfig::new("my-bucket").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    /// The setter stores `0` unchanged and `validate_batch_size` accepts it.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = S3SinkConfig::new("my-bucket").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    /// The setter itself does not reject oversized values; validation does.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            S3SinkConfig::new("my-bucket").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": "",
            "region": null,
            "endpoint_url": null,
            "file_extension": ".jsonl",
            "max_records_per_file": null,
            "concurrency": 10,
            "batch_size": 250
        }"#;
        let config: S3SinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    /// A document without `batch_size` picks up the serde default.
    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": "",
            "region": null,
            "endpoint_url": null,
            "file_extension": ".jsonl",
            "max_records_per_file": null,
            "concurrency": 10
        }"#;
        let config: S3SinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// `compression` survives JSON -> config -> JSON -> config.
    #[cfg(feature = "compression")]
    #[test]
    fn compression_config_round_trips() {
        let json = r#"{
            "bucket": "b",
            "prefix": "",
            "region": null,
            "endpoint_url": null,
            "file_extension": ".jsonl.gz",
            "max_records_per_file": null,
            "concurrency": 1,
            "batch_size": 0,
            "compression": "gzip"
        }"#;
        let config: S3SinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.compression, faucet_core::CompressionConfig::Gzip);

        // Serialize and parse again so the test covers both directions, as its
        // name promises.
        let encoded = serde_json::to_string(&config).unwrap();
        let reparsed: S3SinkConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(reparsed.compression, faucet_core::CompressionConfig::Gzip);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = S3SinkConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }
}