faucet_sink_sqlite/
sink.rs1use crate::config::{SqliteColumnMapping, SqliteSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::quote_ident;
7use serde_json::Value;
8use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
9use sqlx::{Row, SqlitePool};
10use std::str::FromStr;
11use std::time::Duration;
12
/// Sink that writes JSON records into a SQLite table through an `sqlx` connection pool.
pub struct SqliteSink {
    /// Sink configuration; the methods in this file read `database_url`,
    /// `max_connections`, `table_name`, `batch_size` and `column_mapping` from it.
    config: SqliteSinkConfig,
    /// Connection pool created in `SqliteSink::new` and used for every query.
    pool: SqlitePool,
}
18
19impl SqliteSink {
20 pub async fn new(config: SqliteSinkConfig) -> Result<Self, FaucetError> {
30 let options = SqliteConnectOptions::from_str(&config.database_url)
31 .map_err(|e| FaucetError::Sink(format!("invalid SQLite database_url: {e}")))?
32 .create_if_missing(true)
33 .journal_mode(SqliteJournalMode::Wal)
34 .busy_timeout(Duration::from_secs(5));
35
36 let pool = SqlitePoolOptions::new()
37 .max_connections(config.max_connections)
38 .connect_with(options)
39 .await
40 .map_err(|e| FaucetError::Sink(format!("SQLite connection failed: {e}")))?;
41
42 Ok(Self { config, pool })
43 }
44
45 async fn insert_json(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
48 if records.is_empty() {
49 return Ok(0);
50 }
51
52 let placeholders: Vec<&str> = records.iter().map(|_| "(?)").collect();
54 let insert_sql = format!(
55 "INSERT INTO {} ({}) VALUES {}",
56 quote_ident(&self.config.table_name),
57 quote_ident(column),
58 placeholders.join(", ")
59 );
60
61 let mut tx = self
62 .pool
63 .begin()
64 .await
65 .map_err(|e| FaucetError::Sink(format!("SQLite transaction begin failed: {e}")))?;
66
67 let mut q = sqlx::query(&insert_sql);
68 for record in records {
69 let json_str = serde_json::to_string(record)
70 .map_err(|e| FaucetError::Sink(format!("failed to serialize record: {e}")))?;
71 q = q.bind(json_str);
72 }
73
74 q.execute(&mut *tx)
75 .await
76 .map_err(|e| FaucetError::Sink(format!("SQLite insert failed: {e}")))?;
77
78 tx.commit()
79 .await
80 .map_err(|e| FaucetError::Sink(format!("SQLite transaction commit failed: {e}")))?;
81
82 Ok(records.len())
83 }
84
85 async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
91 if records.is_empty() {
92 return Ok(0);
93 }
94
95 let columns: Vec<String> = sqlx::query(&format!(
97 "PRAGMA table_info({})",
98 quote_ident(&self.config.table_name)
99 ))
100 .fetch_all(&self.pool)
101 .await
102 .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
103 .iter()
104 .map(|row| row.get::<String, _>("name"))
105 .collect();
106
107 if columns.is_empty() {
108 return Err(FaucetError::Sink(format!(
109 "table '{}' has no columns or does not exist",
110 self.config.table_name
111 )));
112 }
113
114 let mut matched_rows: Vec<Vec<(&String, &Value)>> = Vec::with_capacity(records.len());
121 let mut used: std::collections::HashSet<&str> = std::collections::HashSet::new();
122
123 for record in records {
124 let obj = record
125 .as_object()
126 .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
127
128 let matching: Vec<(&String, &Value)> = columns
129 .iter()
130 .filter_map(|col| obj.get(col).map(|v| (col, v)))
131 .collect();
132
133 if matching.is_empty() {
134 tracing::warn!(
135 record_keys = ?obj.keys().collect::<Vec<_>>(),
136 table_columns = ?columns,
137 "record has no keys matching table columns, skipping"
138 );
139 continue;
140 }
141
142 for (c, _) in &matching {
143 used.insert(c.as_str());
144 }
145 matched_rows.push(matching);
146 }
147
148 if matched_rows.is_empty() {
149 return Ok(0);
150 }
151
152 let insert_columns: Vec<String> = columns
154 .iter()
155 .filter(|c| used.contains(c.as_str()))
156 .cloned()
157 .collect();
158
159 let num_cols = insert_columns.len();
160 let num_rows = matched_rows.len();
161 let col_names: Vec<String> = insert_columns.iter().map(|c| quote_ident(c)).collect();
162
163 const MAX_SQLITE_VARS: usize = 32766;
169 let max_rows_per_insert = (MAX_SQLITE_VARS / num_cols).max(1);
170
171 let mut tx = self
172 .pool
173 .begin()
174 .await
175 .map_err(|e| FaucetError::Sink(format!("SQLite transaction begin failed: {e}")))?;
176
177 for sub in matched_rows.chunks(max_rows_per_insert) {
178 let row_placeholder = format!("({})", vec!["?"; num_cols].join(", "));
180 let value_tuples: Vec<&str> =
181 (0..sub.len()).map(|_| row_placeholder.as_str()).collect();
182 let query = format!(
183 "INSERT INTO {} ({}) VALUES {}",
184 quote_ident(&self.config.table_name),
185 col_names.join(", "),
186 value_tuples.join(", ")
187 );
188
189 let mut q = sqlx::query(&query);
190 for matched in sub {
191 for col in &insert_columns {
192 let val = matched.iter().find(|(c, _)| *c == col).map(|(_, v)| *v);
193 q = match val {
199 None | Some(Value::Null) => q.bind(None::<String>),
200 Some(Value::Bool(b)) => q.bind(*b),
201 Some(Value::Number(n)) => {
202 if let Some(i) = n.as_i64() {
203 q.bind(i)
204 } else if let Some(f) = n.as_f64() {
205 q.bind(f)
206 } else {
207 q.bind(n.to_string())
209 }
210 }
211 Some(Value::String(s)) => q.bind(s.clone()),
212 Some(v) => q.bind(v.to_string()),
215 };
216 }
217 }
218
219 q.execute(&mut *tx)
220 .await
221 .map_err(|e| FaucetError::Sink(format!("SQLite insert failed: {e}")))?;
222 }
223
224 tx.commit()
225 .await
226 .map_err(|e| FaucetError::Sink(format!("SQLite transaction commit failed: {e}")))?;
227
228 Ok(num_rows)
229 }
230}
231
232#[async_trait]
233impl faucet_core::Sink for SqliteSink {
234 fn config_schema(&self) -> serde_json::Value {
235 serde_json::to_value(faucet_core::schema_for!(SqliteSinkConfig))
236 .expect("schema serialization")
237 }
238
239 async fn check(
245 &self,
246 ctx: &faucet_core::check::CheckContext,
247 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
248 use faucet_core::check::{CheckReport, Probe};
249
250 let started = std::time::Instant::now();
251 let probe =
252 match tokio::time::timeout(ctx.timeout, sqlx::query("SELECT 1").execute(&self.pool))
253 .await
254 {
255 Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
256 Ok(Err(e)) => Probe::fail_hint(
257 "auth",
258 started.elapsed(),
259 e.to_string(),
260 "check database_url / that the database file is reachable and openable",
261 ),
262 Err(_) => Probe::fail_hint(
263 "auth",
264 started.elapsed(),
265 "timed out",
266 "check database_url / that the database file is reachable and openable",
267 ),
268 };
269 Ok(CheckReport::single(probe))
270 }
271
272 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
273 if records.is_empty() {
274 return Ok(0);
275 }
276
277 let effective_chunk = if self.config.batch_size == 0 {
283 records.len()
284 } else {
285 self.config.batch_size
286 };
287
288 let mut total = 0;
289 for chunk in records.chunks(effective_chunk) {
290 total += match &self.config.column_mapping {
291 SqliteColumnMapping::Json { column } => self.insert_json(chunk, column).await?,
292 SqliteColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
293 };
294 }
295
296 tracing::info!(
297 table = %self.config.table_name,
298 rows = total,
299 "SQLite write complete"
300 );
301 Ok(total)
302 }
303}