Skip to main content

faucet_source_gcs/
config.rs

1//! GCS source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use faucet_common_gcs::GcsCredentials;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
8/// Format of files stored in GCS.
9#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
10#[serde(rename_all = "snake_case")]
11pub enum GcsFileFormat {
12    /// Each line in the file is a separate JSON record.
13    #[default]
14    JsonLines,
15    /// The entire file is a JSON array of records.
16    JsonArray,
17    /// Each file becomes a single record with `"key"` and `"content"` fields.
18    RawText,
19}
20
21/// Configuration for the GCS source connector.
22#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
23pub struct GcsSourceConfig {
24    /// GCS bucket name.
25    pub bucket: String,
26    /// Object name prefix filter. Ignored when `object_keys` is set.
27    pub prefix: Option<String>,
28    /// Explicit object names. When set, listing is skipped and `prefix`
29    /// is ignored.
30    pub object_keys: Option<Vec<String>>,
31    /// Credential source.
32    #[serde(default)]
33    pub auth: GcsCredentials,
34    /// File format.
35    #[serde(default)]
36    pub file_format: GcsFileFormat,
37    /// Hard cap on the number of objects read (after listing).
38    pub max_objects: Option<usize>,
39    /// Maximum concurrent object reads (default: 10).
40    #[serde(default = "default_concurrency")]
41    pub concurrency: usize,
42    /// Records per emitted `StreamPage`. See "Streaming and batching"
43    /// in the README. `batch_size = 0` is the "no batching" sentinel and
44    /// emits one page per object.
45    #[serde(default = "default_batch_size")]
46    pub batch_size: usize,
47    /// Optional storage-host override (e.g. `http://localhost:4443` for
48    /// fake-gcs-server). Production users should leave this unset.
49    pub storage_host: Option<String>,
50    /// Compression codec applied to each downloaded object. Defaults to
51    /// [`CompressionConfig::Auto`](faucet_core::CompressionConfig::Auto) —
52    /// the codec is resolved per-object-key, so a single source can read a
53    /// mix of compressed and uncompressed objects. Requires the
54    /// crate-local `compression` feature.
55    #[cfg(feature = "compression")]
56    #[serde(default)]
57    pub compression: faucet_core::CompressionConfig,
58}
59
60fn default_batch_size() -> usize {
61    DEFAULT_BATCH_SIZE
62}
/// Serde fallback for `GcsSourceConfig::concurrency` when the key is omitted:
/// ten concurrent object reads.
fn default_concurrency() -> usize {
    const DEFAULT_CONCURRENCY: usize = 10;
    DEFAULT_CONCURRENCY
}
66
67impl GcsSourceConfig {
68    /// Create a new config with the required bucket name and sensible defaults.
69    pub fn new(bucket: impl Into<String>) -> Self {
70        Self {
71            bucket: bucket.into(),
72            prefix: None,
73            object_keys: None,
74            auth: GcsCredentials::default(),
75            file_format: GcsFileFormat::default(),
76            max_objects: None,
77            concurrency: default_concurrency(),
78            batch_size: default_batch_size(),
79            storage_host: None,
80            #[cfg(feature = "compression")]
81            compression: faucet_core::CompressionConfig::default(),
82        }
83    }
84
85    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
86        self.prefix = Some(prefix.into());
87        self
88    }
89
90    pub fn object_keys(mut self, keys: Vec<String>) -> Self {
91        self.object_keys = Some(keys);
92        self
93    }
94
95    pub fn auth(mut self, creds: GcsCredentials) -> Self {
96        self.auth = creds;
97        self
98    }
99
100    pub fn file_format(mut self, format: GcsFileFormat) -> Self {
101        self.file_format = format;
102        self
103    }
104
105    pub fn max_objects(mut self, max: usize) -> Self {
106        self.max_objects = Some(max);
107        self
108    }
109
110    pub fn concurrency(mut self, concurrency: usize) -> Self {
111        self.concurrency = concurrency;
112        self
113    }
114
115    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
116        self.batch_size = batch_size;
117        self
118    }
119
120    pub fn storage_host(mut self, host: impl Into<String>) -> Self {
121        self.storage_host = Some(host.into());
122        self
123    }
124
125    /// Set the compression codec. Available only with the `compression` feature.
126    #[cfg(feature = "compression")]
127    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
128        self.compression = c;
129        self
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = GcsSourceConfig::new("my-bucket");
        assert_eq!(config.bucket, "my-bucket");
        assert!(config.prefix.is_none());
        assert!(config.object_keys.is_none());
        assert!(matches!(config.auth, GcsCredentials::ApplicationDefault));
        assert!(matches!(config.file_format, GcsFileFormat::JsonLines));
        assert!(config.max_objects.is_none());
        assert_eq!(config.concurrency, 10);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        assert!(config.storage_host.is_none());
    }

    #[test]
    fn builder_methods() {
        let config = GcsSourceConfig::new("my-bucket")
            .prefix("data/")
            .file_format(GcsFileFormat::JsonArray)
            .max_objects(5)
            .concurrency(20)
            .with_batch_size(250)
            .storage_host("http://localhost:4443");

        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix.as_deref(), Some("data/"));
        assert!(matches!(config.file_format, GcsFileFormat::JsonArray));
        assert_eq!(config.max_objects, Some(5));
        assert_eq!(config.concurrency, 20);
        assert_eq!(config.batch_size, 250);
        assert_eq!(
            config.storage_host.as_deref(),
            Some("http://localhost:4443")
        );
    }

    #[test]
    fn object_keys_and_auth_builders() {
        let keys = vec!["a.jsonl".to_string(), "b.jsonl".to_string()];
        let config = GcsSourceConfig::new("my-bucket")
            .prefix("ignored/")
            .object_keys(keys.clone())
            .auth(GcsCredentials::ApplicationDefault);

        assert_eq!(config.object_keys, Some(keys));
        // The builder keeps `prefix` even when `object_keys` is set; per the
        // field docs it is ignored in that case.
        assert_eq!(config.prefix.as_deref(), Some("ignored/"));
        assert!(matches!(config.auth, GcsCredentials::ApplicationDefault));
    }

    #[test]
    fn file_format_default_is_json_lines() {
        assert!(matches!(GcsFileFormat::default(), GcsFileFormat::JsonLines));
    }

    #[test]
    fn file_format_uses_snake_case_names() {
        let parse = |s: &str| serde_json::from_str::<GcsFileFormat>(s).unwrap();
        assert!(matches!(parse(r#""json_lines""#), GcsFileFormat::JsonLines));
        assert!(matches!(parse(r#""json_array""#), GcsFileFormat::JsonArray));
        assert!(matches!(parse(r#""raw_text""#), GcsFileFormat::RawText));
        assert_eq!(serde_json::to_string(&GcsFileFormat::RawText).unwrap(), r#""raw_text""#);
        // Variant names are not accepted verbatim.
        assert!(serde_json::from_str::<GcsFileFormat>(r#""JsonLines""#).is_err());
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = GcsSourceConfig::new("b").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected() {
        let config = GcsSourceConfig::new("b").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = GcsSourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_defaults_to_auto_when_omitted_from_json() {
        let cfg: GcsSourceConfig = serde_json::from_str(r#"{"bucket": "bucket"}"#).unwrap();
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }

    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": null,
            "object_keys": null,
            "file_format": "json_lines",
            "max_objects": null,
            "concurrency": 10,
            "storage_host": null
        }"#;
        let config: GcsSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn bucket_only_json_fills_every_default() {
        let config: GcsSourceConfig = serde_json::from_str(r#"{"bucket": "my-bucket"}"#).unwrap();
        assert_eq!(config.bucket, "my-bucket");
        assert!(config.prefix.is_none());
        assert!(config.object_keys.is_none());
        assert!(matches!(config.auth, GcsCredentials::ApplicationDefault));
        assert!(matches!(config.file_format, GcsFileFormat::JsonLines));
        assert!(config.max_objects.is_none());
        assert_eq!(config.concurrency, 10);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        assert!(config.storage_host.is_none());
    }
}