faucet_sink_bigquery/sink.rs
1//! BigQuery streaming insert sink.
2
3use crate::config::BigQuerySinkConfig;
4use async_trait::async_trait;
5use faucet_common_bigquery::build_client;
6use faucet_core::FaucetError;
7use gcp_bigquery_client::Client;
8use gcp_bigquery_client::model::table_data_insert_all_request::TableDataInsertAllRequest;
9use gcp_bigquery_client::model::table_data_insert_all_response::TableDataInsertAllResponse;
10use serde_json::Value;
11
12/// A sink that writes JSON records to a BigQuery table using the streaming
13/// insert API (`tabledata.insertAll`).
14pub struct BigQuerySink {
15 config: BigQuerySinkConfig,
16 client: Client,
17}
18
19impl BigQuerySink {
20 /// Create a new BigQuery sink from the given configuration.
21 ///
22 /// This initialises the BigQuery client and authenticates with GCP.
23 /// Returns a [`FaucetError::Auth`] if authentication fails.
24 pub async fn new(config: BigQuerySinkConfig) -> Result<Self, FaucetError> {
25 faucet_core::validate_batch_size(config.batch_size)?;
26 let client = build_client(&config.auth).await?;
27 Ok(Self { config, client })
28 }
29
30 /// Construct a sink from a pre-built BigQuery client.
31 ///
32 /// This is a low-level escape hatch for callers that build their own
33 /// [`gcp_bigquery_client::Client`] — for example to target the
34 /// [`bigquery-emulator`](https://github.com/goccy/bigquery-emulator) via
35 /// [`ClientBuilder::with_v2_base_url`](gcp_bigquery_client::client_builder::ClientBuilder::with_v2_base_url),
36 /// or to drive a wiremock-backed test fixture. Production code should
37 /// prefer [`BigQuerySink::new`], which handles credential loading.
38 #[doc(hidden)]
39 pub fn from_parts(config: BigQuerySinkConfig, client: Client) -> Self {
40 Self { config, client }
41 }
42
43 /// Issue a single `tabledata.insertAll` call and return the raw response.
44 ///
45 /// Returns `Err` only on transport-level or HTTP-level failures. Per-row
46 /// `insertErrors` in the response body are surfaced to the caller as-is;
47 /// it is the caller's responsibility to inspect them.
48 ///
49 /// `skip_invalid_rows` maps to BigQuery's `skipInvalidRows` flag. When
50 /// `false` (the all-or-nothing [`write_batch`](Self::write_batch) path) a
51 /// single invalid row makes BigQuery commit *nothing* and return per-row
52 /// errors. When `true` (the [`write_batch_partial`] DLQ path) BigQuery
53 /// commits every valid row and reports `insertErrors` only for the rejected
54 /// ones — which is what makes the per-row `Ok`/`Err` mapping in
55 /// `write_batch_partial` truthful (without it, the "good" siblings are
56 /// reported `Ok` but were never actually committed → silent data loss).
57 ///
58 /// [`write_batch_partial`]: faucet_core::Sink::write_batch_partial
59 async fn insert_chunk_raw(
60 &self,
61 rows: &[Value],
62 skip_invalid_rows: bool,
63 ) -> Result<TableDataInsertAllResponse, FaucetError> {
64 let mut insert_request = TableDataInsertAllRequest::new();
65 if skip_invalid_rows {
66 insert_request.skip_invalid_rows();
67 }
68 for row in rows {
69 // When `insert_id_field` is configured, send that field's value as
70 // the streaming `insertId` so BigQuery can de-duplicate retries
71 // (#78/#31). A row lacking the field is inserted without one.
72 let insert_id = self.config.insert_id_field.as_ref().and_then(|field| {
73 row.get(field).map(|v| match v {
74 Value::String(s) => s.clone(),
75 other => other.to_string(),
76 })
77 });
78 insert_request.add_row(insert_id, row).map_err(|e| {
79 FaucetError::Sink(format!("failed to serialize row for BigQuery: {e}"))
80 })?;
81 }
82 self.client
83 .tabledata()
84 .insert_all(
85 &self.config.project_id,
86 &self.config.dataset_id,
87 &self.config.table_id,
88 insert_request,
89 )
90 .await
91 .map_err(|e| FaucetError::Sink(format!("BigQuery insertAll failed: {e}")))
92 }
93
94 /// Insert a single chunk of rows in one `tabledata.insertAll` call,
95 /// collapsing any per-row errors into a single [`FaucetError::Sink`].
96 ///
97 /// Used by [`write_batch`](Self::write_batch). Callers that need per-row
98 /// error granularity should use
99 /// [`write_batch_partial`](faucet_core::Sink::write_batch_partial) instead,
100 /// which calls [`insert_chunk_raw`](Self::insert_chunk_raw) directly.
101 async fn insert_batch(&self, rows: &[Value]) -> Result<usize, FaucetError> {
102 if rows.is_empty() {
103 return Ok(0);
104 }
105
106 // All-or-nothing path: `skipInvalidRows=false` so BigQuery commits the
107 // whole chunk or nothing. Any `insertErrors` below becomes an outer
108 // `Err`, so the pipeline aborts before the bookmark advances — no
109 // partial commit to resume past.
110 let response = self.insert_chunk_raw(rows, false).await?;
111
112 // Check for per-row errors.
113 if let Some(errors) = response.insert_errors
114 && !errors.is_empty()
115 {
116 let count = errors.len();
117 let first = &errors[0];
118 return Err(FaucetError::Sink(format!(
119 "BigQuery insertAll: {count} row(s) failed; first error on row {:?}: {:?}",
120 first.index,
121 first
122 .errors
123 .as_ref()
124 .and_then(|errs| errs.first())
125 .map(|e| &e.message),
126 )));
127 }
128
129 Ok(rows.len())
130 }
131}
132
133#[async_trait]
134impl faucet_core::Sink for BigQuerySink {
135 fn connector_name(&self) -> &'static str {
136 "bigquery"
137 }
138
139 fn config_schema(&self) -> serde_json::Value {
140 serde_json::to_value(faucet_core::schema_for!(BigQuerySinkConfig))
141 .expect("schema serialization")
142 }
143
144 /// Preflight check (`faucet doctor`).
145 ///
146 /// Runs a single read-only `tables.get` against the configured
147 /// `project_id.dataset_id.table_id` using the already-authenticated
148 /// client built in [`BigQuerySink::new`]. This mints/uses the access
149 /// token and confirms the credentials can read the target table's
150 /// metadata — without inserting any rows. Auth, missing-dataset,
151 /// missing-table, and permission errors all surface as a `Fail` probe
152 /// with a remediation hint. The access token is never included in the
153 /// reason or hint.
154 async fn check(
155 &self,
156 ctx: &faucet_core::check::CheckContext,
157 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
158 use faucet_core::check::{CheckReport, Probe};
159
160 let started = std::time::Instant::now();
161 let fqn = format!(
162 "{}.{}.{}",
163 self.config.project_id, self.config.dataset_id, self.config.table_id
164 );
165
166 let result = tokio::time::timeout(
167 ctx.timeout,
168 self.client.table().get(
169 &self.config.project_id,
170 &self.config.dataset_id,
171 &self.config.table_id,
172 Some(vec!["tableReference"]),
173 ),
174 )
175 .await;
176
177 let probe = match result {
178 Ok(Ok(_table)) => Probe::pass("auth", started.elapsed()),
179 Ok(Err(e)) => Probe::fail_hint(
180 "auth",
181 started.elapsed(),
182 format!("BigQuery tables.get on {fqn} failed: {e}"),
183 "Verify the service account has roles/bigquery.dataViewer (or \
184 read access) on the dataset and that the project, dataset, and \
185 table IDs are correct.",
186 ),
187 Err(_elapsed) => Probe::fail_hint(
188 "auth",
189 started.elapsed(),
190 format!(
191 "BigQuery tables.get on {fqn} timed out after {:?}",
192 ctx.timeout
193 ),
194 "Check network reachability to bigquery.googleapis.com and that \
195 credentials can be minted within the timeout.",
196 ),
197 };
198
199 Ok(CheckReport::single(probe))
200 }
201
202 /// Write records to BigQuery.
203 ///
204 /// When `config.batch_size > 0` and the input slice is larger than
205 /// `batch_size`, the slice is split into chunks of `batch_size` rows and
206 /// each chunk is sent as a separate `tabledata.insertAll` call. When
207 /// `config.batch_size == 0`, the entire slice is sent in a single
208 /// `insertAll` request — useful when upstream `StreamPage`s are already
209 /// sized for BigQuery's per-request limits.
210 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
211 if records.is_empty() {
212 return Ok(0);
213 }
214
215 let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
216 // Sentinel: pass the entire upstream page through in a single
217 // insertAll call. Subject to BigQuery's ~10MB request limit.
218 vec![records]
219 } else {
220 records.chunks(self.config.batch_size).collect()
221 };
222
223 let mut total = 0;
224 for chunk in chunks {
225 total += self.insert_batch(chunk).await?;
226 }
227
228 tracing::info!(
229 table = %format!(
230 "{}.{}.{}",
231 self.config.project_id, self.config.dataset_id, self.config.table_id
232 ),
233 rows = total,
234 "BigQuery write complete"
235 );
236 Ok(total)
237 }
238
239 /// Write records to BigQuery, returning a per-row outcome vector.
240 ///
241 /// Unlike [`write_batch`](faucet_core::Sink::write_batch), which collapses all
242 /// `insertErrors` into a single `FaucetError`, this method maps each row
243 /// to `Ok(())` if BigQuery accepted it or `Err(FaucetError::Sink(...))` if
244 /// BigQuery reported a per-row error for it. This allows the pipeline's DLQ
245 /// router to quarantine only the rows that BigQuery actually rejected while
246 /// keeping already-committed siblings out of the dead-letter queue.
247 ///
248 /// Transport-level or HTTP-level failures (e.g. network errors, 4xx/5xx
249 /// responses) are still returned as an outer `Err` because no rows can be
250 /// considered committed in that case.
251 ///
252 /// Chunking follows the same `batch_size` semantics as `write_batch`:
253 /// `batch_size == 0` sends the entire slice in one call; `batch_size > 0`
254 /// splits the slice into chunks and concatenates the per-row outcomes in
255 /// input order.
256 async fn write_batch_partial(
257 &self,
258 records: &[Value],
259 ) -> Result<Vec<faucet_core::RowOutcome>, FaucetError> {
260 use std::collections::HashMap;
261
262 if records.is_empty() {
263 return Ok(Vec::new());
264 }
265
266 let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
267 vec![records]
268 } else {
269 records.chunks(self.config.batch_size).collect()
270 };
271
272 let mut outcomes: Vec<faucet_core::RowOutcome> = Vec::with_capacity(records.len());
273
274 for chunk in chunks {
275 // `skipInvalidRows=true`: BigQuery commits every valid row and
276 // returns `insertErrors` only for the rejected ones. This is what
277 // makes mapping the flagged indices to `Err` and the rest to
278 // `Ok(())` correct — the unflagged rows really were committed, so
279 // the DLQ router quarantines only the bad rows and the bookmark
280 // advances over genuinely-persisted data.
281 let response = self.insert_chunk_raw(chunk, true).await?;
282
283 // Build a set of failed row indices → first error message.
284 let failed: HashMap<usize, String> = response
285 .insert_errors
286 .unwrap_or_default()
287 .into_iter()
288 .filter_map(|e| {
289 let idx = e.index? as usize;
290 let msg = e
291 .errors
292 .as_ref()
293 .and_then(|v| v.first())
294 .map(|er| er.message.clone().unwrap_or_default())
295 .unwrap_or_default();
296 Some((idx, msg))
297 })
298 .collect();
299
300 for i in 0..chunk.len() {
301 match failed.get(&i) {
302 Some(msg) => outcomes.push(Err(FaucetError::Sink(format!(
303 "BigQuery row rejected: {msg}"
304 )))),
305 None => outcomes.push(Ok(())),
306 }
307 }
308 }
309
310 Ok(outcomes)
311 }
312}