// faucet_source_s3/config.rs

use faucet_core::DEFAULT_BATCH_SIZE;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

7#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum S3FileFormat {
11 #[default]
13 JsonLines,
14 JsonArray,
16 RawText,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
22pub struct S3SourceConfig {
23 pub bucket: String,
25 pub prefix: Option<String>,
27 pub region: Option<String>,
29 pub endpoint_url: Option<String>,
31 pub file_format: S3FileFormat,
33 pub max_objects: Option<usize>,
35 pub concurrency: usize,
37 #[serde(default = "default_batch_size")]
51 pub batch_size: usize,
52 #[cfg(feature = "compression")]
58 #[serde(default)]
59 pub compression: faucet_core::CompressionConfig,
60}
61
62fn default_batch_size() -> usize {
63 DEFAULT_BATCH_SIZE
64}
65
66impl S3SourceConfig {
67 pub fn new(bucket: impl Into<String>) -> Self {
69 Self {
70 bucket: bucket.into(),
71 prefix: None,
72 region: None,
73 endpoint_url: None,
74 file_format: S3FileFormat::default(),
75 max_objects: None,
76 concurrency: 10,
77 batch_size: DEFAULT_BATCH_SIZE,
78 #[cfg(feature = "compression")]
79 compression: faucet_core::CompressionConfig::default(),
80 }
81 }
82
83 pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
85 self.prefix = Some(prefix.into());
86 self
87 }
88
89 pub fn region(mut self, region: impl Into<String>) -> Self {
91 self.region = Some(region.into());
92 self
93 }
94
95 pub fn endpoint_url(mut self, url: impl Into<String>) -> Self {
97 self.endpoint_url = Some(url.into());
98 self
99 }
100
101 pub fn file_format(mut self, format: S3FileFormat) -> Self {
103 self.file_format = format;
104 self
105 }
106
107 pub fn max_objects(mut self, max: usize) -> Self {
109 self.max_objects = Some(max);
110 self
111 }
112
113 pub fn concurrency(mut self, concurrency: usize) -> Self {
115 self.concurrency = concurrency;
116 self
117 }
118
119 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
125 self.batch_size = batch_size;
126 self
127 }
128
129 #[cfg(feature = "compression")]
131 pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
132 self.compression = c;
133 self
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` leaves the optional settings unset and applies the built-in defaults.
    #[test]
    fn default_config() {
        let config = S3SourceConfig::new("my-bucket");
        assert_eq!(config.bucket, "my-bucket");
        assert!(config.prefix.is_none());
        assert!(config.region.is_none());
        assert!(config.endpoint_url.is_none());
        assert!(matches!(config.file_format, S3FileFormat::JsonLines));
        assert!(config.max_objects.is_none());
        assert_eq!(config.concurrency, 10);
    }

    /// Each chained setter overrides exactly the field it is named after.
    #[test]
    fn builder_methods() {
        let config = S3SourceConfig::new("my-bucket")
            .prefix("data/")
            .region("us-west-2")
            .endpoint_url("http://localhost:9000")
            .file_format(S3FileFormat::JsonArray)
            .max_objects(10);

        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix.as_deref(), Some("data/"));
        assert_eq!(config.region.as_deref(), Some("us-west-2"));
        assert_eq!(
            config.endpoint_url.as_deref(),
            Some("http://localhost:9000")
        );
        assert!(matches!(config.file_format, S3FileFormat::JsonArray));
        assert_eq!(config.max_objects, Some(10));
    }

    #[test]
    fn concurrency_builder_overrides_default() {
        let config = S3SourceConfig::new("my-bucket").concurrency(4);
        assert_eq!(config.concurrency, 4);
    }

    #[test]
    fn file_format_default_is_json_lines() {
        let format = S3FileFormat::default();
        assert!(matches!(format, S3FileFormat::JsonLines));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = S3SourceConfig::new("my-bucket");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = S3SourceConfig::new("my-bucket").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = S3SourceConfig::new("my-bucket").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            S3SourceConfig::new("my-bucket").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": null,
            "region": null,
            "endpoint_url": null,
            "file_format": "json_lines",
            "max_objects": null,
            "concurrency": 10,
            "batch_size": 250
        }"#;
        let config: S3SourceConfig =
            serde_json::from_str(json).expect("fixture is a complete, valid config");
        assert_eq!(config.batch_size, 250);
    }

    /// Leaving `batch_size` out of the JSON exercises the serde fallback function.
    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": null,
            "region": null,
            "endpoint_url": null,
            "file_format": "json_lines",
            "max_objects": null,
            "concurrency": 10
        }"#;
        let config: S3SourceConfig =
            serde_json::from_str(json).expect("batch_size has a serde default");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = S3SourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }
}