faucet_source_gcs/
config.rs1use faucet_common_gcs::GcsCredentials;
4use faucet_core::DEFAULT_BATCH_SIZE;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
10#[serde(rename_all = "snake_case")]
11pub enum GcsFileFormat {
12 #[default]
14 JsonLines,
15 JsonArray,
17 RawText,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
23pub struct GcsSourceConfig {
24 pub bucket: String,
26 pub prefix: Option<String>,
28 pub object_keys: Option<Vec<String>>,
31 #[serde(default)]
33 pub auth: GcsCredentials,
34 #[serde(default)]
36 pub file_format: GcsFileFormat,
37 pub max_objects: Option<usize>,
39 #[serde(default = "default_concurrency")]
41 pub concurrency: usize,
42 #[serde(default = "default_batch_size")]
46 pub batch_size: usize,
47 pub storage_host: Option<String>,
50 #[cfg(feature = "compression")]
56 #[serde(default)]
57 pub compression: faucet_core::CompressionConfig,
58}
59
60fn default_batch_size() -> usize {
61 DEFAULT_BATCH_SIZE
62}
63fn default_concurrency() -> usize {
64 10
65}
66
67impl GcsSourceConfig {
68 pub fn new(bucket: impl Into<String>) -> Self {
70 Self {
71 bucket: bucket.into(),
72 prefix: None,
73 object_keys: None,
74 auth: GcsCredentials::default(),
75 file_format: GcsFileFormat::default(),
76 max_objects: None,
77 concurrency: default_concurrency(),
78 batch_size: default_batch_size(),
79 storage_host: None,
80 #[cfg(feature = "compression")]
81 compression: faucet_core::CompressionConfig::default(),
82 }
83 }
84
85 pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
86 self.prefix = Some(prefix.into());
87 self
88 }
89
90 pub fn object_keys(mut self, keys: Vec<String>) -> Self {
91 self.object_keys = Some(keys);
92 self
93 }
94
95 pub fn auth(mut self, creds: GcsCredentials) -> Self {
96 self.auth = creds;
97 self
98 }
99
100 pub fn file_format(mut self, format: GcsFileFormat) -> Self {
101 self.file_format = format;
102 self
103 }
104
105 pub fn max_objects(mut self, max: usize) -> Self {
106 self.max_objects = Some(max);
107 self
108 }
109
110 pub fn concurrency(mut self, concurrency: usize) -> Self {
111 self.concurrency = concurrency;
112 self
113 }
114
115 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
116 self.batch_size = batch_size;
117 self
118 }
119
120 pub fn storage_host(mut self, host: impl Into<String>) -> Self {
121 self.storage_host = Some(host.into());
122 self
123 }
124
125 #[cfg(feature = "compression")]
127 pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
128 self.compression = c;
129 self
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = GcsSourceConfig::new("my-bucket");
        assert_eq!(config.bucket, "my-bucket");
        assert!(config.prefix.is_none());
        assert!(config.object_keys.is_none());
        assert!(matches!(config.auth, GcsCredentials::ApplicationDefault));
        assert!(matches!(config.file_format, GcsFileFormat::JsonLines));
        assert!(config.max_objects.is_none());
        assert_eq!(config.concurrency, 10);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        assert!(config.storage_host.is_none());
    }

    #[test]
    fn builder_methods() {
        let config = GcsSourceConfig::new("my-bucket")
            .prefix("data/")
            .file_format(GcsFileFormat::JsonArray)
            .max_objects(5)
            .concurrency(20)
            .with_batch_size(250)
            .storage_host("http://localhost:4443");

        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix.as_deref(), Some("data/"));
        assert!(matches!(config.file_format, GcsFileFormat::JsonArray));
        assert_eq!(config.max_objects, Some(5));
        assert_eq!(config.concurrency, 20);
        assert_eq!(config.batch_size, 250);
        assert_eq!(
            config.storage_host.as_deref(),
            Some("http://localhost:4443")
        );
    }

    #[test]
    fn builder_sets_object_keys_and_auth() {
        let keys = vec!["a.jsonl".to_string(), "b.jsonl".to_string()];
        let config = GcsSourceConfig::new("b")
            .object_keys(keys.clone())
            .auth(GcsCredentials::default());
        assert_eq!(config.object_keys, Some(keys));
        assert!(matches!(config.auth, GcsCredentials::ApplicationDefault));
    }

    #[test]
    fn file_format_default_is_json_lines() {
        assert!(matches!(GcsFileFormat::default(), GcsFileFormat::JsonLines));
    }

    #[test]
    fn file_format_round_trips_as_snake_case() {
        for (format, wire) in [
            (GcsFileFormat::JsonLines, "\"json_lines\""),
            (GcsFileFormat::JsonArray, "\"json_array\""),
            (GcsFileFormat::RawText, "\"raw_text\""),
        ] {
            assert_eq!(serde_json::to_string(&format).unwrap(), wire);
            let parsed: GcsFileFormat = serde_json::from_str(wire).unwrap();
            assert_eq!(serde_json::to_string(&parsed).unwrap(), wire);
        }
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = GcsSourceConfig::new("b").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected() {
        let config = GcsSourceConfig::new("b").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = GcsSourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_defaults_to_auto_when_omitted_from_json() {
        let cfg: GcsSourceConfig = serde_json::from_str(r#"{"bucket": "b"}"#).unwrap();
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }

    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": null,
            "object_keys": null,
            "file_format": "json_lines",
            "max_objects": null,
            "concurrency": 10,
            "storage_host": null
        }"#;
        let config: GcsSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn minimal_json_matches_new_defaults() {
        let parsed: GcsSourceConfig =
            serde_json::from_str(r#"{"bucket": "my-bucket"}"#).unwrap();
        let built = GcsSourceConfig::new("my-bucket");

        assert_eq!(parsed.bucket, built.bucket);
        assert_eq!(parsed.prefix, built.prefix);
        assert_eq!(parsed.object_keys, built.object_keys);
        assert!(matches!(parsed.auth, GcsCredentials::ApplicationDefault));
        assert!(matches!(parsed.file_format, GcsFileFormat::JsonLines));
        assert_eq!(parsed.max_objects, built.max_objects);
        assert_eq!(parsed.concurrency, built.concurrency);
        assert_eq!(parsed.batch_size, built.batch_size);
        assert_eq!(parsed.storage_host, built.storage_host);
    }
}