faucet_sink_bigquery/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7pub use faucet_common_bigquery::BigQueryCredentials;
10
/// Configuration for the BigQuery sink.
///
/// Implements `Serialize`/`Deserialize` (serde) and `JsonSchema` (schemars), so it
/// can be loaded from JSON. It can also be built programmatically with
/// [`BigQuerySinkConfig::new`] and the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct BigQuerySinkConfig {
    /// Google Cloud project ID of the destination table.
    pub project_id: String,
    /// BigQuery dataset ID of the destination table.
    pub dataset_id: String,
    /// BigQuery table ID of the destination table.
    pub table_id: String,
    /// Credentials used to authenticate with BigQuery.
    ///
    /// Serialized under the `auth` key, e.g. `{"type": "application_default"}`.
    pub auth: BigQueryCredentials,
    /// Batch size used by the sink.
    ///
    /// When the key is absent from the serialized config, this falls back to
    /// `faucet_core::DEFAULT_BATCH_SIZE`. The value is not range-checked by this
    /// struct. `faucet_core::validate_batch_size` accepts `0` and rejects values
    /// above `faucet_core::MAX_BATCH_SIZE`.
    /// NOTE(review): `0` appears to be a "no batching" sentinel; confirm in faucet_core.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    /// Optional name of a record field to use as the row insert ID.
    ///
    /// Defaults to `None` when the key is absent. It is omitted from serialized
    /// output when `None`.
    /// NOTE(review): presumably the field's value is sent as BigQuery's
    /// streaming-insert `insertId` for best-effort de-duplication; confirm
    /// against the sink implementation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub insert_id_field: Option<String>,
}

/// Serde default for `BigQuerySinkConfig::batch_size`.
///
/// Returns the crate-wide `DEFAULT_BATCH_SIZE`, so a config that omits
/// `batch_size` gets the standard default.
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
53
54impl BigQuerySinkConfig {
55 pub fn new(
57 project_id: impl Into<String>,
58 dataset_id: impl Into<String>,
59 table_id: impl Into<String>,
60 credentials: BigQueryCredentials,
61 ) -> Self {
62 Self {
63 project_id: project_id.into(),
64 dataset_id: dataset_id.into(),
65 table_id: table_id.into(),
66 auth: credentials,
67 batch_size: DEFAULT_BATCH_SIZE,
68 insert_id_field: None,
69 }
70 }
71
72 pub fn with_insert_id_field(mut self, field: impl Into<String>) -> Self {
75 self.insert_id_field = Some(field.into());
76 self
77 }
78
79 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
85 self.batch_size = batch_size;
86 self
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = BigQuerySinkConfig::new(
            "my-project",
            "my_dataset",
            "my_table",
            BigQueryCredentials::ApplicationDefault,
        );
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            BigQuerySinkConfig::new("proj", "ds", "tbl", BigQueryCredentials::ApplicationDefault)
                .with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn config_stores_all_fields() {
        let config = BigQuerySinkConfig::new(
            "my-project",
            "my_dataset",
            "my_table",
            BigQueryCredentials::ServiceAccountKeyPath {
                path: "/path/to/key.json".into(),
            },
        );
        assert_eq!(config.project_id, "my-project");
        assert_eq!(config.dataset_id, "my_dataset");
        assert_eq!(config.table_id, "my_table");
        assert!(matches!(
            config.auth,
            BigQueryCredentials::ServiceAccountKeyPath { .. }
        ));
    }

    #[test]
    fn config_with_inline_key() {
        let config = BigQuerySinkConfig::new(
            "proj",
            "ds",
            "tbl",
            BigQueryCredentials::ServiceAccountKey {
                json: r#"{"type":"service_account"}"#.into(),
            },
        );
        if let BigQueryCredentials::ServiceAccountKey { json } = &config.auth {
            assert!(json.contains("service_account"));
        } else {
            panic!("expected ServiceAccountKey");
        }
    }

    #[test]
    fn config_builder_chaining() {
        let config =
            BigQuerySinkConfig::new("p", "d", "t", BigQueryCredentials::ApplicationDefault)
                .with_batch_size(100)
                .with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn config_clone() {
        let config =
            BigQuerySinkConfig::new("proj", "ds", "tbl", BigQueryCredentials::ApplicationDefault)
                .with_batch_size(42);
        let cloned = config.clone();
        assert_eq!(cloned.project_id, "proj");
        assert_eq!(cloned.batch_size, 42);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            BigQuerySinkConfig::new("p", "d", "t", BigQueryCredentials::ApplicationDefault)
                .with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            BigQuerySinkConfig::new("p", "d", "t", BigQueryCredentials::ApplicationDefault)
                .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn insert_id_field_defaults_none_and_builder_sets_it() {
        let config =
            BigQuerySinkConfig::new("p", "d", "t", BigQueryCredentials::ApplicationDefault);
        assert!(config.insert_id_field.is_none());
        let config = config.with_insert_id_field("event_id");
        assert_eq!(config.insert_id_field.as_deref(), Some("event_id"));
    }

    #[test]
    fn insert_id_field_deserializes_from_json() {
        let json = r#"{
            "project_id": "p",
            "dataset_id": "d",
            "table_id": "t",
            "auth": {"type": "application_default"},
            "insert_id_field": "id"
        }"#;
        let config: BigQuerySinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.insert_id_field.as_deref(), Some("id"));
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "project_id": "p",
            "dataset_id": "d",
            "table_id": "t",
            "auth": {"type": "application_default"},
            "batch_size": 250
        }"#;
        let config: BigQuerySinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "project_id": "p",
            "dataset_id": "d",
            "table_id": "t",
            "auth": {"type": "application_default"}
        }"#;
        let config: BigQuerySinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    // Pins `skip_serializing_if = "Option::is_none"`: an unset insert ID must not
    // appear as `"insert_id_field": null` in serialized output.
    #[test]
    fn insert_id_field_omitted_from_json_when_none() {
        let config =
            BigQuerySinkConfig::new("p", "d", "t", BigQueryCredentials::ApplicationDefault);
        let value = serde_json::to_value(&config).unwrap();
        assert!(value.get("insert_id_field").is_none());
        assert_eq!(
            value["batch_size"],
            serde_json::json!(faucet_core::DEFAULT_BATCH_SIZE)
        );
    }

    // A serialized config must deserialize back to the same settings.
    #[test]
    fn serde_round_trip_preserves_fields() {
        let original =
            BigQuerySinkConfig::new("p", "d", "t", BigQueryCredentials::ApplicationDefault)
                .with_batch_size(123)
                .with_insert_id_field("event_id");
        let value = serde_json::to_value(&original).unwrap();
        assert_eq!(value["insert_id_field"], serde_json::json!("event_id"));

        let restored: BigQuerySinkConfig = serde_json::from_value(value).unwrap();
        assert_eq!(restored.project_id, "p");
        assert_eq!(restored.dataset_id, "d");
        assert_eq!(restored.table_id, "t");
        assert_eq!(restored.batch_size, 123);
        assert_eq!(restored.insert_id_field.as_deref(), Some("event_id"));
        assert!(matches!(
            restored.auth,
            BigQueryCredentials::ApplicationDefault
        ));
    }
}