Skip to main content

faucet_sink_bigquery/
config.rs

1//! BigQuery sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7// Re-export the shared credentials type so end-user imports remain stable
8// (`use faucet_sink_bigquery::BigQueryCredentials;` keeps working).
9pub use faucet_common_bigquery::BigQueryCredentials;
10
11/// Configuration for the BigQuery streaming insert sink.
12#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
13pub struct BigQuerySinkConfig {
14    /// GCP project ID.
15    pub project_id: String,
16    /// BigQuery dataset ID.
17    pub dataset_id: String,
18    /// BigQuery table ID.
19    pub table_id: String,
20    /// Authentication credentials. YAML/JSON key is `auth` for consistency with
21    /// every other connector's auth block.
22    pub auth: BigQueryCredentials,
23    /// Maximum rows per `tabledata.insertAll` request. Defaults to
24    /// [`DEFAULT_BATCH_SIZE`].
25    ///
26    /// When the upstream `StreamPage` carries more records than `batch_size`,
27    /// the sink slices the page into `batch_size`-row chunks and issues one
28    /// `insertAll` HTTP call per chunk. When `batch_size = 0`, the page is
29    /// sent as a single request — useful when the source already chunks to
30    /// BigQuery's preferred size (e.g. ~500 rows for streaming inserts).
31    ///
32    /// `batch_size = 0` is the "no batching" sentinel: the entire upstream
33    /// page is forwarded in one `insertAll` call, subject to BigQuery's
34    /// natural per-request limits (~10MB body, ~500 rows recommended).
35    /// Larger pages may exceed those limits — keep the default unless the
36    /// upstream `StreamPage` size is already tuned for BigQuery.
37    #[serde(default = "default_batch_size")]
38    pub batch_size: usize,
39    /// Optional record field whose value is sent as the BigQuery streaming
40    /// `insertId` for each row. BigQuery uses `insertId` for best-effort
41    /// de-duplication over a short window, so a stable per-row key here makes
42    /// streaming inserts resilient to transport retries (which are otherwise
43    /// at-least-once and can produce duplicate rows) (#78/#31). When `None`
44    /// (the default) no `insertId` is sent. A row missing the field is
45    /// inserted without an `insertId` (no dedup for that row).
46    #[serde(default, skip_serializing_if = "Option::is_none")]
47    pub insert_id_field: Option<String>,
48}
49
50fn default_batch_size() -> usize {
51    DEFAULT_BATCH_SIZE
52}
53
54impl BigQuerySinkConfig {
55    /// Create a new config with the required fields and sensible defaults.
56    pub fn new(
57        project_id: impl Into<String>,
58        dataset_id: impl Into<String>,
59        table_id: impl Into<String>,
60        credentials: BigQueryCredentials,
61    ) -> Self {
62        Self {
63            project_id: project_id.into(),
64            dataset_id: dataset_id.into(),
65            table_id: table_id.into(),
66            auth: credentials,
67            batch_size: DEFAULT_BATCH_SIZE,
68            insert_id_field: None,
69        }
70    }
71
72    /// Set the record field used as the per-row BigQuery streaming `insertId`
73    /// for best-effort de-duplication on retry.
74    pub fn with_insert_id_field(mut self, field: impl Into<String>) -> Self {
75        self.insert_id_field = Some(field.into());
76        self
77    }
78
79    /// Set the per-request row count for `tabledata.insertAll`.
80    ///
81    /// Pass `0` to opt out of re-chunking — the sink forwards each upstream
82    /// [`StreamPage`](faucet_core::StreamPage) as a single `insertAll` call.
83    /// BigQuery's streaming-insert sweet spot is ~500 rows per request.
84    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
85        self.batch_size = batch_size;
86        self
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config that authenticates with Application Default Credentials.
    fn adc_config(project: &str, dataset: &str, table: &str) -> BigQuerySinkConfig {
        BigQuerySinkConfig::new(project, dataset, table, BigQueryCredentials::ApplicationDefault)
    }

    /// Deserializes a config from a JSON document, panicking if it is invalid.
    fn parse_config(json: &str) -> BigQuerySinkConfig {
        serde_json::from_str(json).unwrap()
    }

    // --- constructor and builder behaviour ---

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let cfg = adc_config("my-project", "my_dataset", "my_table");
        assert_eq!(cfg.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let cfg = adc_config("proj", "ds", "tbl").with_batch_size(500);
        assert_eq!(cfg.batch_size, 500);
    }

    #[test]
    fn config_stores_all_fields() {
        let key_path = BigQueryCredentials::ServiceAccountKeyPath {
            path: "/path/to/key.json".into(),
        };
        let cfg = BigQuerySinkConfig::new("my-project", "my_dataset", "my_table", key_path);

        assert_eq!(cfg.project_id, "my-project");
        assert_eq!(cfg.dataset_id, "my_dataset");
        assert_eq!(cfg.table_id, "my_table");
        assert!(matches!(
            cfg.auth,
            BigQueryCredentials::ServiceAccountKeyPath { .. }
        ));
    }

    #[test]
    fn config_with_inline_key() {
        let inline_key = BigQueryCredentials::ServiceAccountKey {
            json: r#"{"type":"service_account"}"#.into(),
        };
        let cfg = BigQuerySinkConfig::new("proj", "ds", "tbl", inline_key);

        match &cfg.auth {
            BigQueryCredentials::ServiceAccountKey { json } => {
                assert!(json.contains("service_account"));
            }
            _ => panic!("expected ServiceAccountKey"),
        }
    }

    #[test]
    fn config_builder_chaining() {
        // The last call in the chain wins.
        let cfg = adc_config("p", "d", "t")
            .with_batch_size(100)
            .with_batch_size(250);
        assert_eq!(cfg.batch_size, 250);
    }

    #[test]
    fn config_clone() {
        let original = adc_config("proj", "ds", "tbl").with_batch_size(42);
        let copy = original.clone();
        assert_eq!(copy.project_id, "proj");
        assert_eq!(copy.batch_size, 42);
    }

    // --- batch size validation ---

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let cfg = adc_config("p", "d", "t").with_batch_size(0);
        assert_eq!(cfg.batch_size, 0);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let too_large = faucet_core::MAX_BATCH_SIZE + 1;
        let cfg = adc_config("p", "d", "t").with_batch_size(too_large);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_err());
    }

    // --- insert_id_field ---

    #[test]
    fn insert_id_field_defaults_none_and_builder_sets_it() {
        let unset = adc_config("p", "d", "t");
        assert!(unset.insert_id_field.is_none());

        let set = unset.with_insert_id_field("event_id");
        assert_eq!(set.insert_id_field.as_deref(), Some("event_id"));
    }

    // --- deserialization ---

    #[test]
    fn insert_id_field_deserializes_from_json() {
        let cfg = parse_config(
            r#"{
            "project_id": "p",
            "dataset_id": "d",
            "table_id": "t",
            "auth": {"type": "application_default"},
            "insert_id_field": "id"
        }"#,
        );
        assert_eq!(cfg.insert_id_field.as_deref(), Some("id"));
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let cfg = parse_config(
            r#"{
            "project_id": "p",
            "dataset_id": "d",
            "table_id": "t",
            "auth": {"type": "application_default"},
            "batch_size": 250
        }"#,
        );
        assert_eq!(cfg.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let cfg = parse_config(
            r#"{
            "project_id": "p",
            "dataset_id": "d",
            "table_id": "t",
            "auth": {"type": "application_default"}
        }"#,
        );
        assert_eq!(cfg.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}