Skip to main content

faucet_sink_gcs/
config.rs

1//! GCS sink configuration.
2
3use faucet_common_gcs::GcsCredentials;
4use faucet_core::DEFAULT_BATCH_SIZE;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
8/// Configuration for the GCS sink connector.
9#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
10pub struct GcsSinkConfig {
11    /// GCS bucket name.
12    pub bucket: String,
13    /// Object-name prefix for written files.
14    pub prefix: String,
15    /// Credential source.
16    #[serde(default)]
17    pub auth: GcsCredentials,
18    /// File extension for written objects (default `.jsonl`).
19    #[serde(default = "default_file_extension")]
20    pub file_extension: String,
21    /// Hard cap on records per uploaded object. `None` means a single
22    /// object per `write_batch` call (still subject to `batch_size`).
23    pub max_records_per_file: Option<usize>,
24    /// Maximum number of concurrent uploads (default 10).
25    #[serde(default = "default_concurrency")]
26    pub concurrency: usize,
27    /// Records per uploaded object from a single `write_batch` call.
28    /// `batch_size = 0` writes whatever upstream hands the sink as one
29    /// object. Recommended value for GCS is `0` — many tiny objects is
30    /// a well-known anti-pattern.
31    #[serde(default = "default_batch_size")]
32    pub batch_size: usize,
33    /// Optional storage-host override (integration-test escape hatch).
34    pub storage_host: Option<String>,
35    /// Compression codec applied to each uploaded object body. Defaults to
36    /// [`CompressionConfig::Auto`](faucet_core::CompressionConfig::Auto) —
37    /// resolves against `file_extension` (so `.jsonl.gz` triggers gzip).
38    /// Requires the crate-local `compression` feature. Note: this sink does
39    /// **not** set the GCS `Content-Encoding` metadata, so consumers must
40    /// decompress explicitly.
41    #[cfg(feature = "compression")]
42    #[serde(default)]
43    pub compression: faucet_core::CompressionConfig,
44}
45
/// Serde fallback for `GcsSinkConfig::file_extension`; also used by
/// `GcsSinkConfig::new`. Returns the extension with its leading dot.
fn default_file_extension() -> String {
    String::from(".jsonl")
}
49fn default_batch_size() -> usize {
50    DEFAULT_BATCH_SIZE
51}
/// Serde fallback for `GcsSinkConfig::concurrency`; also used by
/// `GcsSinkConfig::new`.
fn default_concurrency() -> usize {
    // Matches the "default 10" stated on the `concurrency` field.
    const DEFAULT_CONCURRENT_UPLOADS: usize = 10;
    DEFAULT_CONCURRENT_UPLOADS
}
55
56impl GcsSinkConfig {
57    pub fn new(bucket: impl Into<String>) -> Self {
58        Self {
59            bucket: bucket.into(),
60            prefix: String::new(),
61            auth: GcsCredentials::default(),
62            file_extension: default_file_extension(),
63            max_records_per_file: None,
64            concurrency: default_concurrency(),
65            batch_size: default_batch_size(),
66            storage_host: None,
67            #[cfg(feature = "compression")]
68            compression: faucet_core::CompressionConfig::Auto,
69        }
70    }
71
72    pub fn prefix(mut self, p: impl Into<String>) -> Self {
73        self.prefix = p.into();
74        self
75    }
76    pub fn auth(mut self, c: GcsCredentials) -> Self {
77        self.auth = c;
78        self
79    }
80    pub fn file_extension(mut self, ext: impl Into<String>) -> Self {
81        self.file_extension = ext.into();
82        self
83    }
84    pub fn max_records_per_file(mut self, n: usize) -> Self {
85        self.max_records_per_file = Some(n);
86        self
87    }
88    pub fn concurrency(mut self, n: usize) -> Self {
89        self.concurrency = n;
90        self
91    }
92    pub fn with_batch_size(mut self, n: usize) -> Self {
93        self.batch_size = n;
94        self
95    }
96    pub fn storage_host(mut self, h: impl Into<String>) -> Self {
97        self.storage_host = Some(h.into());
98        self
99    }
100
101    /// Set the compression codec. Available only with the `compression` feature.
102    #[cfg(feature = "compression")]
103    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
104        self.compression = c;
105        self
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` leaves every field except `bucket` at its default.
    #[test]
    fn defaults() {
        let cfg = GcsSinkConfig::new("b");
        assert_eq!(cfg.bucket, "b");
        assert!(cfg.prefix.is_empty());
        assert!(matches!(cfg.auth, GcsCredentials::ApplicationDefault));
        assert_eq!(cfg.file_extension, ".jsonl");
        assert_eq!(cfg.max_records_per_file, None);
        assert_eq!(cfg.concurrency, 10);
        assert_eq!(cfg.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        assert_eq!(cfg.storage_host, None);
    }

    /// Each chained setter lands in the field of the same name.
    #[test]
    fn builder_methods() {
        let host = "http://localhost:4443";
        let cfg = GcsSinkConfig::new("b")
            .prefix("out/")
            .file_extension(".ndjson")
            .max_records_per_file(500)
            .concurrency(4)
            .with_batch_size(0)
            .storage_host(host);

        assert_eq!(cfg.prefix, "out/");
        assert_eq!(cfg.file_extension, ".ndjson");
        assert_eq!(cfg.max_records_per_file, Some(500));
        assert_eq!(cfg.concurrency, 4);
        assert_eq!(cfg.batch_size, 0);
        assert_eq!(cfg.storage_host.as_deref(), Some(host));
    }

    /// The shared validator accepts the `0` sentinel and rejects anything
    /// above `MAX_BATCH_SIZE`.
    #[test]
    fn batch_size_sentinel_accepted_and_above_max_rejected() {
        let too_large = faucet_core::MAX_BATCH_SIZE + 1;
        assert!(faucet_core::validate_batch_size(0).is_ok());
        assert!(faucet_core::validate_batch_size(too_large).is_err());
    }

    /// A document without `batch_size` deserializes to the shared default.
    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "bucket": "b",
            "prefix": "p/",
            "max_records_per_file": null,
            "concurrency": 10,
            "storage_host": null
        }"#;
        let parsed = serde_json::from_str::<GcsSinkConfig>(json).unwrap();
        assert_eq!(parsed.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// With the feature on, `new` selects the `Auto` codec.
    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let expected = faucet_core::CompressionConfig::Auto;
        assert_eq!(GcsSinkConfig::new("bucket").compression, expected);
    }

    /// An explicit `"gzip"` value deserializes to the `Gzip` codec.
    #[cfg(feature = "compression")]
    #[test]
    fn compression_config_round_trips() {
        let json = r#"{
            "bucket": "b",
            "prefix": "",
            "file_extension": ".jsonl.gz",
            "max_records_per_file": null,
            "concurrency": 1,
            "batch_size": 0,
            "storage_host": null,
            "compression": "gzip"
        }"#;
        let parsed = serde_json::from_str::<GcsSinkConfig>(json).unwrap();
        assert_eq!(parsed.compression, faucet_core::CompressionConfig::Gzip);
    }
}