Skip to main content

faucet_sink_bigquery/
sink.rs

1//! BigQuery streaming insert sink.
2
3use crate::config::BigQuerySinkConfig;
4use async_trait::async_trait;
5use faucet_common_bigquery::build_client;
6use faucet_core::FaucetError;
7use gcp_bigquery_client::Client;
8use gcp_bigquery_client::model::table_data_insert_all_request::TableDataInsertAllRequest;
9use gcp_bigquery_client::model::table_data_insert_all_response::TableDataInsertAllResponse;
10use serde_json::Value;
11
/// A sink that writes JSON records to a BigQuery table using the streaming
/// insert API (`tabledata.insertAll`).
///
/// Build one with [`BigQuerySink::new`] (validates the batch size and builds
/// the client from `config.auth`) or [`BigQuerySink::from_parts`] (wraps a
/// caller-supplied client as-is).
pub struct BigQuerySink {
    /// Sink settings read by the methods in this file: the target
    /// `project_id` / `dataset_id` / `table_id`, `batch_size` (chunk length,
    /// where `0` means "do not chunk"), and the optional `insert_id_field`.
    config: BigQuerySinkConfig,
    /// BigQuery API client used for the `tabledata.insertAll` calls and the
    /// `tables.get` preflight check.
    client: Client,
}
18
19impl BigQuerySink {
20    /// Create a new BigQuery sink from the given configuration.
21    ///
22    /// This initialises the BigQuery client and authenticates with GCP.
23    /// Returns a [`FaucetError::Auth`] if authentication fails.
24    pub async fn new(config: BigQuerySinkConfig) -> Result<Self, FaucetError> {
25        faucet_core::validate_batch_size(config.batch_size)?;
26        let client = build_client(&config.auth).await?;
27        Ok(Self { config, client })
28    }
29
30    /// Construct a sink from a pre-built BigQuery client.
31    ///
32    /// This is a low-level escape hatch for callers that build their own
33    /// [`gcp_bigquery_client::Client`] — for example to target the
34    /// [`bigquery-emulator`](https://github.com/goccy/bigquery-emulator) via
35    /// [`ClientBuilder::with_v2_base_url`](gcp_bigquery_client::client_builder::ClientBuilder::with_v2_base_url),
36    /// or to drive a wiremock-backed test fixture. Production code should
37    /// prefer [`BigQuerySink::new`], which handles credential loading.
38    #[doc(hidden)]
39    pub fn from_parts(config: BigQuerySinkConfig, client: Client) -> Self {
40        Self { config, client }
41    }
42
43    /// Issue a single `tabledata.insertAll` call and return the raw response.
44    ///
45    /// Returns `Err` only on transport-level or HTTP-level failures. Per-row
46    /// `insertErrors` in the response body are surfaced to the caller as-is;
47    /// it is the caller's responsibility to inspect them.
48    ///
49    /// `skip_invalid_rows` maps to BigQuery's `skipInvalidRows` flag. When
50    /// `false` (the all-or-nothing [`write_batch`](Self::write_batch) path) a
51    /// single invalid row makes BigQuery commit *nothing* and return per-row
52    /// errors. When `true` (the [`write_batch_partial`] DLQ path) BigQuery
53    /// commits every valid row and reports `insertErrors` only for the rejected
54    /// ones — which is what makes the per-row `Ok`/`Err` mapping in
55    /// `write_batch_partial` truthful (without it, the "good" siblings are
56    /// reported `Ok` but were never actually committed → silent data loss).
57    ///
58    /// [`write_batch_partial`]: faucet_core::Sink::write_batch_partial
59    async fn insert_chunk_raw(
60        &self,
61        rows: &[Value],
62        skip_invalid_rows: bool,
63    ) -> Result<TableDataInsertAllResponse, FaucetError> {
64        let mut insert_request = TableDataInsertAllRequest::new();
65        if skip_invalid_rows {
66            insert_request.skip_invalid_rows();
67        }
68        for row in rows {
69            // When `insert_id_field` is configured, send that field's value as
70            // the streaming `insertId` so BigQuery can de-duplicate retries
71            // (#78/#31). A row lacking the field is inserted without one.
72            let insert_id = self.config.insert_id_field.as_ref().and_then(|field| {
73                row.get(field).map(|v| match v {
74                    Value::String(s) => s.clone(),
75                    other => other.to_string(),
76                })
77            });
78            insert_request.add_row(insert_id, row).map_err(|e| {
79                FaucetError::Sink(format!("failed to serialize row for BigQuery: {e}"))
80            })?;
81        }
82        self.client
83            .tabledata()
84            .insert_all(
85                &self.config.project_id,
86                &self.config.dataset_id,
87                &self.config.table_id,
88                insert_request,
89            )
90            .await
91            .map_err(|e| FaucetError::Sink(format!("BigQuery insertAll failed: {e}")))
92    }
93
94    /// Insert a single chunk of rows in one `tabledata.insertAll` call,
95    /// collapsing any per-row errors into a single [`FaucetError::Sink`].
96    ///
97    /// Used by [`write_batch`](Self::write_batch). Callers that need per-row
98    /// error granularity should use
99    /// [`write_batch_partial`](faucet_core::Sink::write_batch_partial) instead,
100    /// which calls [`insert_chunk_raw`](Self::insert_chunk_raw) directly.
101    async fn insert_batch(&self, rows: &[Value]) -> Result<usize, FaucetError> {
102        if rows.is_empty() {
103            return Ok(0);
104        }
105
106        // All-or-nothing path: `skipInvalidRows=false` so BigQuery commits the
107        // whole chunk or nothing. Any `insertErrors` below becomes an outer
108        // `Err`, so the pipeline aborts before the bookmark advances — no
109        // partial commit to resume past.
110        let response = self.insert_chunk_raw(rows, false).await?;
111
112        // Check for per-row errors.
113        if let Some(errors) = response.insert_errors
114            && !errors.is_empty()
115        {
116            let count = errors.len();
117            let first = &errors[0];
118            return Err(FaucetError::Sink(format!(
119                "BigQuery insertAll: {count} row(s) failed; first error on row {:?}: {:?}",
120                first.index,
121                first
122                    .errors
123                    .as_ref()
124                    .and_then(|errs| errs.first())
125                    .map(|e| &e.message),
126            )));
127        }
128
129        Ok(rows.len())
130    }
131}
132
133#[async_trait]
134impl faucet_core::Sink for BigQuerySink {
    /// Static identifier for this connector; always `"bigquery"`.
    fn connector_name(&self) -> &'static str {
        "bigquery"
    }
138
139    fn config_schema(&self) -> serde_json::Value {
140        serde_json::to_value(faucet_core::schema_for!(BigQuerySinkConfig))
141            .expect("schema serialization")
142    }
143
144    /// Preflight check (`faucet doctor`).
145    ///
146    /// Runs a single read-only `tables.get` against the configured
147    /// `project_id.dataset_id.table_id` using the already-authenticated
148    /// client built in [`BigQuerySink::new`]. This mints/uses the access
149    /// token and confirms the credentials can read the target table's
150    /// metadata — without inserting any rows. Auth, missing-dataset,
151    /// missing-table, and permission errors all surface as a `Fail` probe
152    /// with a remediation hint. The access token is never included in the
153    /// reason or hint.
154    async fn check(
155        &self,
156        ctx: &faucet_core::check::CheckContext,
157    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
158        use faucet_core::check::{CheckReport, Probe};
159
160        let started = std::time::Instant::now();
161        let fqn = format!(
162            "{}.{}.{}",
163            self.config.project_id, self.config.dataset_id, self.config.table_id
164        );
165
166        let result = tokio::time::timeout(
167            ctx.timeout,
168            self.client.table().get(
169                &self.config.project_id,
170                &self.config.dataset_id,
171                &self.config.table_id,
172                Some(vec!["tableReference"]),
173            ),
174        )
175        .await;
176
177        let probe = match result {
178            Ok(Ok(_table)) => Probe::pass("auth", started.elapsed()),
179            Ok(Err(e)) => Probe::fail_hint(
180                "auth",
181                started.elapsed(),
182                format!("BigQuery tables.get on {fqn} failed: {e}"),
183                "Verify the service account has roles/bigquery.dataViewer (or \
184                 read access) on the dataset and that the project, dataset, and \
185                 table IDs are correct.",
186            ),
187            Err(_elapsed) => Probe::fail_hint(
188                "auth",
189                started.elapsed(),
190                format!(
191                    "BigQuery tables.get on {fqn} timed out after {:?}",
192                    ctx.timeout
193                ),
194                "Check network reachability to bigquery.googleapis.com and that \
195                 credentials can be minted within the timeout.",
196            ),
197        };
198
199        Ok(CheckReport::single(probe))
200    }
201
202    /// Write records to BigQuery.
203    ///
204    /// When `config.batch_size > 0` and the input slice is larger than
205    /// `batch_size`, the slice is split into chunks of `batch_size` rows and
206    /// each chunk is sent as a separate `tabledata.insertAll` call. When
207    /// `config.batch_size == 0`, the entire slice is sent in a single
208    /// `insertAll` request — useful when upstream `StreamPage`s are already
209    /// sized for BigQuery's per-request limits.
210    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
211        if records.is_empty() {
212            return Ok(0);
213        }
214
215        let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
216            // Sentinel: pass the entire upstream page through in a single
217            // insertAll call. Subject to BigQuery's ~10MB request limit.
218            vec![records]
219        } else {
220            records.chunks(self.config.batch_size).collect()
221        };
222
223        let mut total = 0;
224        for chunk in chunks {
225            total += self.insert_batch(chunk).await?;
226        }
227
228        tracing::info!(
229            table = %format!(
230                "{}.{}.{}",
231                self.config.project_id, self.config.dataset_id, self.config.table_id
232            ),
233            rows = total,
234            "BigQuery write complete"
235        );
236        Ok(total)
237    }
238
239    /// Write records to BigQuery, returning a per-row outcome vector.
240    ///
241    /// Unlike [`write_batch`](faucet_core::Sink::write_batch), which collapses all
242    /// `insertErrors` into a single `FaucetError`, this method maps each row
243    /// to `Ok(())` if BigQuery accepted it or `Err(FaucetError::Sink(...))` if
244    /// BigQuery reported a per-row error for it. This allows the pipeline's DLQ
245    /// router to quarantine only the rows that BigQuery actually rejected while
246    /// keeping already-committed siblings out of the dead-letter queue.
247    ///
248    /// Transport-level or HTTP-level failures (e.g. network errors, 4xx/5xx
249    /// responses) are still returned as an outer `Err` because no rows can be
250    /// considered committed in that case.
251    ///
252    /// Chunking follows the same `batch_size` semantics as `write_batch`:
253    /// `batch_size == 0` sends the entire slice in one call; `batch_size > 0`
254    /// splits the slice into chunks and concatenates the per-row outcomes in
255    /// input order.
256    async fn write_batch_partial(
257        &self,
258        records: &[Value],
259    ) -> Result<Vec<faucet_core::RowOutcome>, FaucetError> {
260        use std::collections::HashMap;
261
262        if records.is_empty() {
263            return Ok(Vec::new());
264        }
265
266        let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
267            vec![records]
268        } else {
269            records.chunks(self.config.batch_size).collect()
270        };
271
272        let mut outcomes: Vec<faucet_core::RowOutcome> = Vec::with_capacity(records.len());
273
274        for chunk in chunks {
275            // `skipInvalidRows=true`: BigQuery commits every valid row and
276            // returns `insertErrors` only for the rejected ones. This is what
277            // makes mapping the flagged indices to `Err` and the rest to
278            // `Ok(())` correct — the unflagged rows really were committed, so
279            // the DLQ router quarantines only the bad rows and the bookmark
280            // advances over genuinely-persisted data.
281            let response = self.insert_chunk_raw(chunk, true).await?;
282
283            // Build a set of failed row indices → first error message.
284            let failed: HashMap<usize, String> = response
285                .insert_errors
286                .unwrap_or_default()
287                .into_iter()
288                .filter_map(|e| {
289                    let idx = e.index? as usize;
290                    let msg = e
291                        .errors
292                        .as_ref()
293                        .and_then(|v| v.first())
294                        .map(|er| er.message.clone().unwrap_or_default())
295                        .unwrap_or_default();
296                    Some((idx, msg))
297                })
298                .collect();
299
300            for i in 0..chunk.len() {
301                match failed.get(&i) {
302                    Some(msg) => outcomes.push(Err(FaucetError::Sink(format!(
303                        "BigQuery row rejected: {msg}"
304                    )))),
305                    None => outcomes.push(Ok(())),
306                }
307            }
308        }
309
310        Ok(outcomes)
311    }
312}