Skip to main content

faucet_sink_sqlite/
sink.rs

1//! SQLite sink implementation.
2
3use crate::config::{SqliteColumnMapping, SqliteSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::quote_ident;
7use serde_json::Value;
8use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
9use sqlx::{Row, SqlitePool};
10use std::str::FromStr;
11use std::time::Duration;
12
13/// A sink that writes JSON records to a SQLite table.
14pub struct SqliteSink {
15    config: SqliteSinkConfig,
16    pool: SqlitePool,
17}
18
19impl SqliteSink {
20    /// Create a new SQLite sink. Establishes a connection pool.
21    ///
22    /// The pool opens each connection with `journal_mode = WAL` and a 5-second
23    /// `busy_timeout`. WAL lets a writer and readers proceed concurrently
24    /// instead of locking each other out, and the busy timeout makes a
25    /// connection wait-and-retry for the write lock rather than failing
26    /// immediately with `SQLITE_BUSY` under contention. `create_if_missing`
27    /// preserves the previous behaviour of creating the database file on first
28    /// open. WAL on a `sqlite::memory:` database is a harmless no-op.
29    pub async fn new(config: SqliteSinkConfig) -> Result<Self, FaucetError> {
30        let options = SqliteConnectOptions::from_str(&config.database_url)
31            .map_err(|e| FaucetError::Sink(format!("invalid SQLite database_url: {e}")))?
32            .create_if_missing(true)
33            .journal_mode(SqliteJournalMode::Wal)
34            .busy_timeout(Duration::from_secs(5));
35
36        let pool = SqlitePoolOptions::new()
37            .max_connections(config.max_connections)
38            .connect_with(options)
39            .await
40            .map_err(|e| FaucetError::Sink(format!("SQLite connection failed: {e}")))?;
41
42        Ok(Self { config, pool })
43    }
44
45    /// Insert a batch of records using JSON column mode.
46    /// Uses a single multi-row INSERT wrapped in a transaction.
47    async fn insert_json(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
48        if records.is_empty() {
49            return Ok(0);
50        }
51
52        // Build multi-row INSERT: INSERT INTO t (col) VALUES (?), (?), ...
53        let placeholders: Vec<&str> = records.iter().map(|_| "(?)").collect();
54        let insert_sql = format!(
55            "INSERT INTO {} ({}) VALUES {}",
56            quote_ident(&self.config.table_name),
57            quote_ident(column),
58            placeholders.join(", ")
59        );
60
61        let mut tx = self
62            .pool
63            .begin()
64            .await
65            .map_err(|e| FaucetError::Sink(format!("SQLite transaction begin failed: {e}")))?;
66
67        let mut q = sqlx::query(&insert_sql);
68        for record in records {
69            let json_str = serde_json::to_string(record)
70                .map_err(|e| FaucetError::Sink(format!("failed to serialize record: {e}")))?;
71            q = q.bind(json_str);
72        }
73
74        q.execute(&mut *tx)
75            .await
76            .map_err(|e| FaucetError::Sink(format!("SQLite insert failed: {e}")))?;
77
78        tx.commit()
79            .await
80            .map_err(|e| FaucetError::Sink(format!("SQLite transaction commit failed: {e}")))?;
81
82        Ok(records.len())
83    }
84
85    /// Insert a batch of records using auto-mapped columns.
86    ///
87    /// Discovers column names from `pragma_table_info` and maps
88    /// top-level JSON fields to columns. Uses a single multi-row INSERT
89    /// wrapped in a transaction.
90    async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
91        if records.is_empty() {
92            return Ok(0);
93        }
94
95        // Get column names from the table using pragma_table_info.
96        let columns: Vec<String> = sqlx::query(&format!(
97            "PRAGMA table_info({})",
98            quote_ident(&self.config.table_name)
99        ))
100        .fetch_all(&self.pool)
101        .await
102        .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
103        .iter()
104        .map(|row| row.get::<String, _>("name"))
105        .collect();
106
107        if columns.is_empty() {
108            return Err(FaucetError::Sink(format!(
109                "table '{}' has no columns or does not exist",
110                self.config.table_name
111            )));
112        }
113
114        // Pre-validate all records and collect matched column values. The
115        // INSERT column set is the UNION of table columns present in ANY record
116        // (in declared table order), not just the first record's keys —
117        // otherwise a field present only in a later record of the batch would be
118        // silently dropped (audit #146 H1). A row missing a unioned column binds
119        // SQL NULL.
120        let mut matched_rows: Vec<Vec<(&String, &Value)>> = Vec::with_capacity(records.len());
121        let mut used: std::collections::HashSet<&str> = std::collections::HashSet::new();
122
123        for record in records {
124            let obj = record
125                .as_object()
126                .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
127
128            let matching: Vec<(&String, &Value)> = columns
129                .iter()
130                .filter_map(|col| obj.get(col).map(|v| (col, v)))
131                .collect();
132
133            if matching.is_empty() {
134                tracing::warn!(
135                    record_keys = ?obj.keys().collect::<Vec<_>>(),
136                    table_columns = ?columns,
137                    "record has no keys matching table columns, skipping"
138                );
139                continue;
140            }
141
142            for (c, _) in &matching {
143                used.insert(c.as_str());
144            }
145            matched_rows.push(matching);
146        }
147
148        if matched_rows.is_empty() {
149            return Ok(0);
150        }
151
152        // Table columns (in declared order) that appear in at least one record.
153        let insert_columns: Vec<String> = columns
154            .iter()
155            .filter(|c| used.contains(c.as_str()))
156            .cloned()
157            .collect();
158
159        let num_cols = insert_columns.len();
160        let num_rows = matched_rows.len();
161        let col_names: Vec<String> = insert_columns.iter().map(|c| quote_ident(c)).collect();
162
163        // SQLite caps bind parameters per statement at SQLITE_MAX_VARIABLE_NUMBER
164        // (32766 since 3.32). A multi-row INSERT binds `rows × num_cols`
165        // parameters, so a wide table at a large batch_size can exceed it and
166        // fail at runtime with "too many SQL variables" (#78/#21). Split into
167        // sub-INSERTs of at most floor(MAX_VARS / num_cols) rows.
168        const MAX_SQLITE_VARS: usize = 32766;
169        let max_rows_per_insert = (MAX_SQLITE_VARS / num_cols).max(1);
170
171        let mut tx = self
172            .pool
173            .begin()
174            .await
175            .map_err(|e| FaucetError::Sink(format!("SQLite transaction begin failed: {e}")))?;
176
177        for sub in matched_rows.chunks(max_rows_per_insert) {
178            // Build multi-row VALUES clause: (?, ?), (?, ?), ...
179            let row_placeholder = format!("({})", vec!["?"; num_cols].join(", "));
180            let value_tuples: Vec<&str> =
181                (0..sub.len()).map(|_| row_placeholder.as_str()).collect();
182            let query = format!(
183                "INSERT INTO {} ({}) VALUES {}",
184                quote_ident(&self.config.table_name),
185                col_names.join(", "),
186                value_tuples.join(", ")
187            );
188
189            let mut q = sqlx::query(&query);
190            for matched in sub {
191                for col in &insert_columns {
192                    let val = matched.iter().find(|(c, _)| *c == col).map(|(_, v)| *v);
193                    // Bind native SQLite types so column affinity and typed reads
194                    // round-trip correctly. Binding every value as a JSON string
195                    // (the old behaviour) stored `"Bob"` with embedded quotes,
196                    // turned `true` into the text "true", and bound the literal
197                    // text "null" for absent columns instead of SQL NULL (#78/#4).
198                    q = match val {
199                        None | Some(Value::Null) => q.bind(None::<String>),
200                        Some(Value::Bool(b)) => q.bind(*b),
201                        Some(Value::Number(n)) => {
202                            if let Some(i) = n.as_i64() {
203                                q.bind(i)
204                            } else if let Some(f) = n.as_f64() {
205                                q.bind(f)
206                            } else {
207                                // u64 above i64::MAX — preserve exact text.
208                                q.bind(n.to_string())
209                            }
210                        }
211                        Some(Value::String(s)) => q.bind(s.clone()),
212                        // Arrays/objects have no scalar SQL representation — store
213                        // their JSON text (suitable for TEXT / JSON columns).
214                        Some(v) => q.bind(v.to_string()),
215                    };
216                }
217            }
218
219            q.execute(&mut *tx)
220                .await
221                .map_err(|e| FaucetError::Sink(format!("SQLite insert failed: {e}")))?;
222        }
223
224        tx.commit()
225            .await
226            .map_err(|e| FaucetError::Sink(format!("SQLite transaction commit failed: {e}")))?;
227
228        Ok(num_rows)
229    }
230}
231
232#[async_trait]
233impl faucet_core::Sink for SqliteSink {
234    fn config_schema(&self) -> serde_json::Value {
235        serde_json::to_value(faucet_core::schema_for!(SqliteSinkConfig))
236            .expect("schema serialization")
237    }
238
239    /// Preflight connectivity probe (`faucet doctor`).
240    ///
241    /// Acquires a connection from the existing pool and runs `SELECT 1`. This
242    /// is non-mutating and idempotent — it validates that the database file /
243    /// connection opens without writing anything.
244    async fn check(
245        &self,
246        ctx: &faucet_core::check::CheckContext,
247    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
248        use faucet_core::check::{CheckReport, Probe};
249
250        let started = std::time::Instant::now();
251        let probe =
252            match tokio::time::timeout(ctx.timeout, sqlx::query("SELECT 1").execute(&self.pool))
253                .await
254            {
255                Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
256                Ok(Err(e)) => Probe::fail_hint(
257                    "auth",
258                    started.elapsed(),
259                    e.to_string(),
260                    "check database_url / that the database file is reachable and openable",
261                ),
262                Err(_) => Probe::fail_hint(
263                    "auth",
264                    started.elapsed(),
265                    "timed out",
266                    "check database_url / that the database file is reachable and openable",
267                ),
268            };
269        Ok(CheckReport::single(probe))
270    }
271
272    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
273        if records.is_empty() {
274            return Ok(0);
275        }
276
277        // `batch_size = 0` is the "no batching" sentinel: write the entire
278        // upstream slice as a single multi-row INSERT inside one
279        // `BEGIN`/`COMMIT` transaction, preserving `StreamPage` framing.
280        // Otherwise re-chunk into `batch_size` slices so each transaction
281        // stays near SQLite's sweet spot (~1000 rows per multi-row INSERT).
282        let effective_chunk = if self.config.batch_size == 0 {
283            records.len()
284        } else {
285            self.config.batch_size
286        };
287
288        let mut total = 0;
289        for chunk in records.chunks(effective_chunk) {
290            total += match &self.config.column_mapping {
291                SqliteColumnMapping::Json { column } => self.insert_json(chunk, column).await?,
292                SqliteColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
293            };
294        }
295
296        tracing::info!(
297            table = %self.config.table_name,
298            rows = total,
299            "SQLite write complete"
300        );
301        Ok(total)
302    }
303}