faucet_sink_gcs/
config.rs1use faucet_common_gcs::GcsCredentials;
4use faucet_core::DEFAULT_BATCH_SIZE;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
10pub struct GcsSinkConfig {
11 pub bucket: String,
13 pub prefix: String,
15 #[serde(default)]
17 pub auth: GcsCredentials,
18 #[serde(default = "default_file_extension")]
20 pub file_extension: String,
21 pub max_records_per_file: Option<usize>,
24 #[serde(default = "default_concurrency")]
26 pub concurrency: usize,
27 #[serde(default = "default_batch_size")]
32 pub batch_size: usize,
33 pub storage_host: Option<String>,
35 #[cfg(feature = "compression")]
42 #[serde(default)]
43 pub compression: faucet_core::CompressionConfig,
44}
45
/// Fallback for `file_extension`, shared by serde and `GcsSinkConfig::new`.
fn default_file_extension() -> String {
    String::from(".jsonl")
}
49fn default_batch_size() -> usize {
50 DEFAULT_BATCH_SIZE
51}
/// Default value of the `concurrency` field.
const DEFAULT_CONCURRENCY: usize = 10;

/// Fallback for `concurrency`, shared by serde and `GcsSinkConfig::new`.
fn default_concurrency() -> usize {
    DEFAULT_CONCURRENCY
}

56impl GcsSinkConfig {
57 pub fn new(bucket: impl Into<String>) -> Self {
58 Self {
59 bucket: bucket.into(),
60 prefix: String::new(),
61 auth: GcsCredentials::default(),
62 file_extension: default_file_extension(),
63 max_records_per_file: None,
64 concurrency: default_concurrency(),
65 batch_size: default_batch_size(),
66 storage_host: None,
67 #[cfg(feature = "compression")]
68 compression: faucet_core::CompressionConfig::Auto,
69 }
70 }
71
72 pub fn prefix(mut self, p: impl Into<String>) -> Self {
73 self.prefix = p.into();
74 self
75 }
76 pub fn auth(mut self, c: GcsCredentials) -> Self {
77 self.auth = c;
78 self
79 }
80 pub fn file_extension(mut self, ext: impl Into<String>) -> Self {
81 self.file_extension = ext.into();
82 self
83 }
84 pub fn max_records_per_file(mut self, n: usize) -> Self {
85 self.max_records_per_file = Some(n);
86 self
87 }
88 pub fn concurrency(mut self, n: usize) -> Self {
89 self.concurrency = n;
90 self
91 }
92 pub fn with_batch_size(mut self, n: usize) -> Self {
93 self.batch_size = n;
94 self
95 }
96 pub fn storage_host(mut self, h: impl Into<String>) -> Self {
97 self.storage_host = Some(h.into());
98 self
99 }
100
101 #[cfg(feature = "compression")]
103 pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
104 self.compression = c;
105 self
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn defaults() {
        let c = GcsSinkConfig::new("b");
        assert_eq!(c.bucket, "b");
        assert_eq!(c.prefix, "");
        assert!(matches!(c.auth, GcsCredentials::ApplicationDefault));
        assert_eq!(c.file_extension, ".jsonl");
        assert!(c.max_records_per_file.is_none());
        assert_eq!(c.concurrency, 10);
        assert_eq!(c.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        assert!(c.storage_host.is_none());
    }

    #[test]
    fn builder_methods() {
        let c = GcsSinkConfig::new("b")
            .prefix("out/")
            .file_extension(".ndjson")
            .max_records_per_file(500)
            .concurrency(4)
            .with_batch_size(0)
            .storage_host("http://localhost:4443");
        assert_eq!(c.prefix, "out/");
        assert_eq!(c.file_extension, ".ndjson");
        assert_eq!(c.max_records_per_file, Some(500));
        assert_eq!(c.concurrency, 4);
        assert_eq!(c.batch_size, 0);
        assert_eq!(c.storage_host.as_deref(), Some("http://localhost:4443"));
    }

    #[test]
    fn batch_size_sentinel_accepted_and_above_max_rejected() {
        assert!(faucet_core::validate_batch_size(0).is_ok());
        assert!(faucet_core::validate_batch_size(faucet_core::MAX_BATCH_SIZE + 1).is_err());
    }

    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "bucket": "b",
            "prefix": "p/",
            "max_records_per_file": null,
            "concurrency": 10,
            "storage_host": null
        }"#;
        let c: GcsSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(c.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    // Only `bucket` and `prefix` are required; every other field must fall back
    // to the same value `GcsSinkConfig::new` uses.
    #[test]
    fn minimal_json_uses_field_defaults() {
        let c: GcsSinkConfig = serde_json::from_str(r#"{"bucket": "b", "prefix": ""}"#).unwrap();
        let d = GcsSinkConfig::new("b");
        assert_eq!(c.bucket, d.bucket);
        assert_eq!(c.prefix, d.prefix);
        assert!(matches!(c.auth, GcsCredentials::ApplicationDefault));
        assert_eq!(c.file_extension, d.file_extension);
        assert_eq!(c.max_records_per_file, d.max_records_per_file);
        assert_eq!(c.concurrency, d.concurrency);
        assert_eq!(c.batch_size, d.batch_size);
        assert_eq!(c.storage_host, d.storage_host);
    }

    #[test]
    fn missing_required_fields_are_rejected() {
        assert!(serde_json::from_str::<GcsSinkConfig>(r#"{"prefix": ""}"#).is_err());
        // `new` defaults `prefix` to "", but deserialization requires it.
        assert!(serde_json::from_str::<GcsSinkConfig>(r#"{"bucket": "b"}"#).is_err());
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = GcsSinkConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_config_round_trips() {
        let json = r#"{
            "bucket": "b",
            "prefix": "",
            "file_extension": ".jsonl.gz",
            "max_records_per_file": null,
            "concurrency": 1,
            "batch_size": 0,
            "storage_host": null,
            "compression": "gzip"
        }"#;
        let cfg: GcsSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Gzip);

        // Serialize back out and re-read so the test actually round-trips.
        let reparsed: GcsSinkConfig =
            serde_json::from_str(&serde_json::to_string(&cfg).unwrap()).unwrap();
        assert_eq!(reparsed.compression, faucet_core::CompressionConfig::Gzip);
        assert_eq!(reparsed.file_extension, ".jsonl.gz");
        assert_eq!(reparsed.concurrency, 1);
        assert_eq!(reparsed.batch_size, 0);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_builder_overrides_default() {
        let cfg = GcsSinkConfig::new("b").compression(faucet_core::CompressionConfig::Gzip);
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Gzip);
    }
}