Skip to main content

khive_db/
sql_bridge.rs

1//! SqlAccess bridge: connects `ConnectionPool` to `khive_storage::SqlAccess`.
2//!
3//! Two modes:
4//! - **File-backed**: Opens standalone connections per reader/writer/tx call (high concurrency).
5//! - **Memory**: Uses pool-backed approach (acquire pool connection per-query inside `spawn_blocking`).
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10
11use khive_storage::error::StorageError;
12use khive_storage::types::{SqlColumn, SqlIsolation, SqlRow, SqlStatement, SqlTxOptions, SqlValue};
13use khive_storage::StorageCapability;
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18// =============================================================================
19// Shared helpers
20// =============================================================================
21
22/// Convert a rusqlite `Row` into an owned `SqlRow`.
23fn row_to_sql_row(row: &rusqlite::Row<'_>, col_count: usize, col_names: &[String]) -> SqlRow {
24    let mut columns = Vec::with_capacity(col_count);
25    for i in 0..col_count {
26        let value = match row.get_ref(i) {
27            Ok(rusqlite::types::ValueRef::Null) => SqlValue::Null,
28            Ok(rusqlite::types::ValueRef::Integer(v)) => SqlValue::Integer(v),
29            Ok(rusqlite::types::ValueRef::Real(v)) => SqlValue::Float(v),
30            Ok(rusqlite::types::ValueRef::Text(bytes)) => {
31                SqlValue::Text(String::from_utf8_lossy(bytes).into_owned())
32            }
33            Ok(rusqlite::types::ValueRef::Blob(bytes)) => SqlValue::Blob(bytes.to_vec()),
34            Err(_) => SqlValue::Null,
35        };
36        columns.push(SqlColumn {
37            name: col_names.get(i).cloned().unwrap_or_default(),
38            value,
39        });
40    }
41    SqlRow { columns }
42}
43
44/// Bind `SqlValue` parameters to a rusqlite statement.
45fn bind_params(
46    stmt: &mut rusqlite::Statement<'_>,
47    params: &[SqlValue],
48) -> Result<(), rusqlite::Error> {
49    for (i, param) in params.iter().enumerate() {
50        let idx = i + 1; // rusqlite uses 1-based indexing
51        match param {
52            SqlValue::Null => stmt.raw_bind_parameter(idx, rusqlite::types::Null)?,
53            SqlValue::Bool(v) => stmt.raw_bind_parameter(idx, *v as i64)?,
54            SqlValue::Integer(v) => stmt.raw_bind_parameter(idx, *v)?,
55            SqlValue::Float(v) => stmt.raw_bind_parameter(idx, *v)?,
56            SqlValue::Text(v) => stmt.raw_bind_parameter(idx, v.as_str())?,
57            SqlValue::Blob(v) => stmt.raw_bind_parameter(idx, v.as_slice())?,
58            SqlValue::Json(v) => {
59                let s = serde_json::to_string(v).unwrap_or_default();
60                stmt.raw_bind_parameter(idx, s.as_str())?;
61            }
62            SqlValue::Uuid(v) => stmt.raw_bind_parameter(idx, v.to_string().as_str())?,
63            SqlValue::Timestamp(v) => {
64                stmt.raw_bind_parameter(idx, v.timestamp_micros())?;
65            }
66        }
67    }
68    Ok(())
69}
70
71/// Execute a query on a `rusqlite::Connection` and return owned rows.
72fn execute_query(
73    conn: &rusqlite::Connection,
74    statement: &SqlStatement,
75) -> Result<Vec<SqlRow>, rusqlite::Error> {
76    let mut stmt = conn.prepare(&statement.sql)?;
77    bind_params(&mut stmt, &statement.params)?;
78
79    let col_count = stmt.column_count();
80    let col_names: Vec<String> = (0..col_count)
81        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
82        .collect();
83
84    let mut rows = Vec::new();
85    let mut raw_rows = stmt.raw_query();
86    while let Some(row) = raw_rows.next()? {
87        rows.push(row_to_sql_row(row, col_count, &col_names));
88    }
89    Ok(rows)
90}
91
92/// Map a rusqlite error to `StorageError`.
93fn map_rusqlite_err(e: rusqlite::Error, op: &'static str) -> StorageError {
94    StorageError::driver(StorageCapability::Sql, op, e)
95}
96
97// =============================================================================
98// Standalone connection readers/writers (file-backed databases)
99// =============================================================================
100
101fn open_standalone_reader(pool: &ConnectionPool) -> Result<rusqlite::Connection, StorageError> {
102    let config = pool.config();
103    let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
104        operation: "reader".into(),
105        message: "in-memory databases do not support standalone readers; use pool-backed".into(),
106    })?;
107
108    let conn = rusqlite::Connection::open_with_flags(
109        path,
110        rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
111            | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
112            | rusqlite::OpenFlags::SQLITE_OPEN_URI,
113    )
114    .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
115
116    conn.busy_timeout(config.busy_timeout)
117        .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
118    conn.pragma_update(None, "cache_size", "-65536")
119        .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
120    conn.pragma_update(None, "mmap_size", "1073741824")
121        .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
122
123    Ok(conn)
124}
125
126fn open_standalone_writer(pool: &ConnectionPool) -> Result<rusqlite::Connection, StorageError> {
127    let config = pool.config();
128    let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
129        operation: "writer".into(),
130        message: "in-memory databases do not support standalone writer; use pool-backed".into(),
131    })?;
132
133    let conn = rusqlite::Connection::open_with_flags(
134        path,
135        rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
136            | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
137            | rusqlite::OpenFlags::SQLITE_OPEN_URI,
138    )
139    .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
140
141    conn.busy_timeout(config.busy_timeout)
142        .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
143    conn.pragma_update(None, "cache_size", "-65536")
144        .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
145    conn.pragma_update(None, "mmap_size", "1073741824")
146        .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
147
148    Ok(conn)
149}
150
151// =============================================================================
152// File-backed: SqliteReader (standalone connection)
153// =============================================================================
154
155struct SqliteReader {
156    conn: Option<rusqlite::Connection>,
157}
158
159#[async_trait]
160impl khive_storage::SqlReader for SqliteReader {
161    async fn query_row(
162        &mut self,
163        statement: SqlStatement,
164    ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
165        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
166            operation: "query_row".into(),
167            message: "connection already consumed".into(),
168        })?;
169        let (conn, result) = tokio::task::spawn_blocking(move || {
170            let res = execute_query(&conn, &statement);
171            (conn, res)
172        })
173        .await
174        .map_err(|e| StorageError::driver(StorageCapability::Sql, "query_row", e))?;
175        self.conn = Some(conn);
176        let rows = result.map_err(|e| map_rusqlite_err(e, "query_row"))?;
177        Ok(rows.into_iter().next())
178    }
179
180    async fn query_all(
181        &mut self,
182        statement: SqlStatement,
183    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
184        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
185            operation: "query_all".into(),
186            message: "connection already consumed".into(),
187        })?;
188        let (conn, result) = tokio::task::spawn_blocking(move || {
189            let res = execute_query(&conn, &statement);
190            (conn, res)
191        })
192        .await
193        .map_err(|e| StorageError::driver(StorageCapability::Sql, "query_all", e))?;
194        self.conn = Some(conn);
195        result.map_err(|e| map_rusqlite_err(e, "query_all"))
196    }
197
198    async fn query_scalar(
199        &mut self,
200        statement: SqlStatement,
201    ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
202        let row = self.query_row(statement).await?;
203        Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
204    }
205
206    async fn explain(
207        &mut self,
208        statement: SqlStatement,
209    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
210        let explain_stmt = SqlStatement {
211            sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
212            params: statement.params,
213            label: statement.label,
214        };
215        self.query_all(explain_stmt).await
216    }
217}
218
219// =============================================================================
220// File-backed: SqliteWriter (standalone connection)
221// =============================================================================
222
223struct SqliteWriter {
224    conn: Option<rusqlite::Connection>,
225}
226
227#[async_trait]
228impl khive_storage::SqlReader for SqliteWriter {
229    async fn query_row(
230        &mut self,
231        statement: SqlStatement,
232    ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
233        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
234            operation: "writer.query_row".into(),
235            message: "connection already consumed".into(),
236        })?;
237        let (conn, result) = tokio::task::spawn_blocking(move || {
238            let res = execute_query(&conn, &statement);
239            (conn, res)
240        })
241        .await
242        .map_err(|e| StorageError::driver(StorageCapability::Sql, "writer.query_row", e))?;
243        self.conn = Some(conn);
244        let rows = result.map_err(|e| map_rusqlite_err(e, "writer.query_row"))?;
245        Ok(rows.into_iter().next())
246    }
247
248    async fn query_all(
249        &mut self,
250        statement: SqlStatement,
251    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
252        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
253            operation: "writer.query_all".into(),
254            message: "connection already consumed".into(),
255        })?;
256        let (conn, result) = tokio::task::spawn_blocking(move || {
257            let res = execute_query(&conn, &statement);
258            (conn, res)
259        })
260        .await
261        .map_err(|e| StorageError::driver(StorageCapability::Sql, "writer.query_all", e))?;
262        self.conn = Some(conn);
263        result.map_err(|e| map_rusqlite_err(e, "writer.query_all"))
264    }
265
266    async fn query_scalar(
267        &mut self,
268        statement: SqlStatement,
269    ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
270        let row = khive_storage::SqlReader::query_row(self, statement).await?;
271        Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
272    }
273
274    async fn explain(
275        &mut self,
276        statement: SqlStatement,
277    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
278        let explain_stmt = SqlStatement {
279            sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
280            params: statement.params,
281            label: statement.label,
282        };
283        khive_storage::SqlReader::query_all(self, explain_stmt).await
284    }
285}
286
287#[async_trait]
288impl khive_storage::SqlWriter for SqliteWriter {
289    async fn execute(
290        &mut self,
291        statement: SqlStatement,
292    ) -> khive_storage::types::StorageResult<u64> {
293        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
294            operation: "execute".into(),
295            message: "connection already consumed".into(),
296        })?;
297        let (conn, result) = tokio::task::spawn_blocking(move || {
298            let res = (|| -> Result<usize, rusqlite::Error> {
299                let mut stmt = conn.prepare(&statement.sql)?;
300                bind_params(&mut stmt, &statement.params)?;
301                stmt.raw_execute()
302            })();
303            (conn, res)
304        })
305        .await
306        .map_err(|e| StorageError::driver(StorageCapability::Sql, "execute", e))?;
307        self.conn = Some(conn);
308        let affected = result.map_err(|e| map_rusqlite_err(e, "execute"))?;
309        Ok(affected as u64)
310    }
311
312    async fn execute_batch(
313        &mut self,
314        statements: Vec<SqlStatement>,
315    ) -> khive_storage::types::StorageResult<u64> {
316        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
317            operation: "execute_batch".into(),
318            message: "connection already consumed".into(),
319        })?;
320        let (conn, result) = tokio::task::spawn_blocking(move || {
321            let result = (|| -> Result<u64, rusqlite::Error> {
322                conn.execute_batch("BEGIN IMMEDIATE")?;
323                let mut total: u64 = 0;
324                for statement in &statements {
325                    let mut stmt = conn.prepare(&statement.sql)?;
326                    bind_params(&mut stmt, &statement.params)?;
327                    total += stmt.raw_execute()? as u64;
328                }
329                conn.execute_batch("COMMIT")?;
330                Ok(total)
331            })();
332            if result.is_err() {
333                let _ = conn.execute_batch("ROLLBACK");
334            }
335            (conn, result)
336        })
337        .await
338        .map_err(|e| StorageError::driver(StorageCapability::Sql, "execute_batch", e))?;
339        self.conn = Some(conn);
340        result.map_err(|e| map_rusqlite_err(e, "execute_batch"))
341    }
342
343    async fn execute_script(&mut self, script: String) -> khive_storage::types::StorageResult<()> {
344        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
345            operation: "execute_script".into(),
346            message: "connection already consumed".into(),
347        })?;
348        let (conn, result) = tokio::task::spawn_blocking(move || {
349            let res = conn.execute_batch(&script);
350            (conn, res)
351        })
352        .await
353        .map_err(|e| StorageError::driver(StorageCapability::Sql, "execute_script", e))?;
354        self.conn = Some(conn);
355        result.map_err(|e| map_rusqlite_err(e, "execute_script"))
356    }
357}
358
359// =============================================================================
360// File-backed: SqliteTransaction (standalone connection)
361// =============================================================================
362
363struct SqliteTransaction {
364    conn: Option<rusqlite::Connection>,
365    /// Whether `PRAGMA query_only = ON` was set on the connection.
366    /// Must be reset to OFF before COMMIT/ROLLBACK so the connection can
367    /// be returned cleanly (defensive; connection is dropped after use anyway).
368    read_only: bool,
369}
370
371#[async_trait]
372impl khive_storage::SqlReader for SqliteTransaction {
373    async fn query_row(
374        &mut self,
375        statement: SqlStatement,
376    ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
377        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
378            operation: "tx.query_row".into(),
379            message: "connection already consumed".into(),
380        })?;
381        let (conn, result) = tokio::task::spawn_blocking(move || {
382            let res = execute_query(&conn, &statement);
383            (conn, res)
384        })
385        .await
386        .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.query_row", e))?;
387        self.conn = Some(conn);
388        let rows = result.map_err(|e| map_rusqlite_err(e, "tx.query_row"))?;
389        Ok(rows.into_iter().next())
390    }
391
392    async fn query_all(
393        &mut self,
394        statement: SqlStatement,
395    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
396        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
397            operation: "tx.query_all".into(),
398            message: "connection already consumed".into(),
399        })?;
400        let (conn, result) = tokio::task::spawn_blocking(move || {
401            let res = execute_query(&conn, &statement);
402            (conn, res)
403        })
404        .await
405        .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.query_all", e))?;
406        self.conn = Some(conn);
407        result.map_err(|e| map_rusqlite_err(e, "tx.query_all"))
408    }
409
410    async fn query_scalar(
411        &mut self,
412        statement: SqlStatement,
413    ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
414        let row = khive_storage::SqlReader::query_row(self, statement).await?;
415        Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
416    }
417
418    async fn explain(
419        &mut self,
420        statement: SqlStatement,
421    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
422        let explain_stmt = SqlStatement {
423            sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
424            params: statement.params,
425            label: statement.label,
426        };
427        khive_storage::SqlReader::query_all(self, explain_stmt).await
428    }
429}
430
431#[async_trait]
432impl khive_storage::SqlWriter for SqliteTransaction {
433    async fn execute(
434        &mut self,
435        statement: SqlStatement,
436    ) -> khive_storage::types::StorageResult<u64> {
437        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
438            operation: "tx.execute".into(),
439            message: "connection already consumed".into(),
440        })?;
441        let (conn, result) = tokio::task::spawn_blocking(move || {
442            let res = (|| -> Result<usize, rusqlite::Error> {
443                let mut stmt = conn.prepare(&statement.sql)?;
444                bind_params(&mut stmt, &statement.params)?;
445                stmt.raw_execute()
446            })();
447            (conn, res)
448        })
449        .await
450        .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.execute", e))?;
451        self.conn = Some(conn);
452        let affected = result.map_err(|e| map_rusqlite_err(e, "tx.execute"))?;
453        Ok(affected as u64)
454    }
455
456    async fn execute_batch(
457        &mut self,
458        statements: Vec<SqlStatement>,
459    ) -> khive_storage::types::StorageResult<u64> {
460        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
461            operation: "tx.execute_batch".into(),
462            message: "connection already consumed".into(),
463        })?;
464        let (conn, result) = tokio::task::spawn_blocking(move || {
465            let mut total: u64 = 0;
466            for statement in &statements {
467                let res = (|| -> Result<usize, rusqlite::Error> {
468                    let mut stmt = conn.prepare(&statement.sql)?;
469                    bind_params(&mut stmt, &statement.params)?;
470                    stmt.raw_execute()
471                })();
472                match res {
473                    Ok(n) => total += n as u64,
474                    Err(e) => return (conn, Err(e)),
475                }
476            }
477            (conn, Ok(total))
478        })
479        .await
480        .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.execute_batch", e))?;
481        self.conn = Some(conn);
482        result.map_err(|e| map_rusqlite_err(e, "tx.execute_batch"))
483    }
484
485    async fn execute_script(&mut self, script: String) -> khive_storage::types::StorageResult<()> {
486        let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
487            operation: "tx.execute_script".into(),
488            message: "connection already consumed".into(),
489        })?;
490        let (conn, result) = tokio::task::spawn_blocking(move || {
491            let res = conn.execute_batch(&script);
492            (conn, res)
493        })
494        .await
495        .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.execute_script", e))?;
496        self.conn = Some(conn);
497        result.map_err(|e| map_rusqlite_err(e, "tx.execute_script"))
498    }
499}
500
501#[async_trait]
502impl khive_storage::SqlTransaction for SqliteTransaction {
503    async fn commit(mut self: Box<Self>) -> khive_storage::types::StorageResult<()> {
504        let conn = self.conn.take().ok_or_else(|| StorageError::Transaction {
505            operation: "commit".into(),
506            message: "connection already consumed".into(),
507        })?;
508        let read_only = self.read_only;
509        tokio::task::spawn_blocking(move || {
510            // Reset query_only before COMMIT so the connection ends cleanly.
511            if read_only {
512                let _ = conn.pragma_update(None, "query_only", "OFF");
513            }
514            conn.execute_batch("COMMIT")
515                .map_err(|e| map_rusqlite_err(e, "commit"))
516        })
517        .await
518        .map_err(|e| StorageError::driver(StorageCapability::Sql, "commit", e))?
519    }
520
521    async fn rollback(mut self: Box<Self>) -> khive_storage::types::StorageResult<()> {
522        let conn = self.conn.take().ok_or_else(|| StorageError::Transaction {
523            operation: "rollback".into(),
524            message: "connection already consumed".into(),
525        })?;
526        let read_only = self.read_only;
527        tokio::task::spawn_blocking(move || {
528            // Reset query_only before ROLLBACK so the connection ends cleanly.
529            if read_only {
530                let _ = conn.pragma_update(None, "query_only", "OFF");
531            }
532            conn.execute_batch("ROLLBACK")
533                .map_err(|e| map_rusqlite_err(e, "rollback"))
534        })
535        .await
536        .map_err(|e| StorageError::driver(StorageCapability::Sql, "rollback", e))?
537    }
538}
539
540// =============================================================================
541// Pool-backed reader/writer (in-memory databases)
542// =============================================================================
543
544struct PoolBackedReader {
545    pool: Arc<ConnectionPool>,
546}
547
548#[async_trait]
549impl khive_storage::SqlReader for PoolBackedReader {
550    async fn query_row(
551        &mut self,
552        statement: SqlStatement,
553    ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
554        let pool = Arc::clone(&self.pool);
555        tokio::task::spawn_blocking(move || {
556            let guard = pool
557                .reader()
558                .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader", e))?;
559            let rows = execute_query(&guard, &statement)
560                .map_err(|e| map_rusqlite_err(e, "pool_reader.query_row"))?;
561            Ok(rows.into_iter().next())
562        })
563        .await
564        .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader.query_row", e))?
565    }
566
567    async fn query_all(
568        &mut self,
569        statement: SqlStatement,
570    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
571        let pool = Arc::clone(&self.pool);
572        tokio::task::spawn_blocking(move || {
573            let guard = pool
574                .reader()
575                .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader", e))?;
576            execute_query(&guard, &statement)
577                .map_err(|e| map_rusqlite_err(e, "pool_reader.query_all"))
578        })
579        .await
580        .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader.query_all", e))?
581    }
582
583    async fn query_scalar(
584        &mut self,
585        statement: SqlStatement,
586    ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
587        let row = self.query_row(statement).await?;
588        Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
589    }
590
591    async fn explain(
592        &mut self,
593        statement: SqlStatement,
594    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
595        let explain_stmt = SqlStatement {
596            sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
597            params: statement.params,
598            label: statement.label,
599        };
600        self.query_all(explain_stmt).await
601    }
602}
603
604struct PoolBackedWriter {
605    pool: Arc<ConnectionPool>,
606}
607
608#[async_trait]
609impl khive_storage::SqlReader for PoolBackedWriter {
610    async fn query_row(
611        &mut self,
612        statement: SqlStatement,
613    ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
614        let pool = Arc::clone(&self.pool);
615        tokio::task::spawn_blocking(move || {
616            let guard = pool.try_writer().map_err(|e: SqliteError| {
617                StorageError::driver(StorageCapability::Sql, "pool_writer.query_row", e)
618            })?;
619            let rows = execute_query(&guard, &statement)
620                .map_err(|e| map_rusqlite_err(e, "pool_writer.query_row"))?;
621            Ok(rows.into_iter().next())
622        })
623        .await
624        .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.query_row", e))?
625    }
626
627    async fn query_all(
628        &mut self,
629        statement: SqlStatement,
630    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
631        let pool = Arc::clone(&self.pool);
632        tokio::task::spawn_blocking(move || {
633            let guard = pool.try_writer().map_err(|e: SqliteError| {
634                StorageError::driver(StorageCapability::Sql, "pool_writer.query_all", e)
635            })?;
636            execute_query(&guard, &statement)
637                .map_err(|e| map_rusqlite_err(e, "pool_writer.query_all"))
638        })
639        .await
640        .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.query_all", e))?
641    }
642
643    async fn query_scalar(
644        &mut self,
645        statement: SqlStatement,
646    ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
647        let row = khive_storage::SqlReader::query_row(self, statement).await?;
648        Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
649    }
650
651    async fn explain(
652        &mut self,
653        statement: SqlStatement,
654    ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
655        let explain_stmt = SqlStatement {
656            sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
657            params: statement.params,
658            label: statement.label,
659        };
660        khive_storage::SqlReader::query_all(self, explain_stmt).await
661    }
662}
663
664#[async_trait]
665impl khive_storage::SqlWriter for PoolBackedWriter {
666    async fn execute(
667        &mut self,
668        statement: SqlStatement,
669    ) -> khive_storage::types::StorageResult<u64> {
670        let pool = Arc::clone(&self.pool);
671        tokio::task::spawn_blocking(move || {
672            let guard = pool.try_writer().map_err(|e: SqliteError| {
673                StorageError::driver(StorageCapability::Sql, "pool_writer.execute", e)
674            })?;
675            let mut stmt = guard
676                .prepare(&statement.sql)
677                .map_err(|e| map_rusqlite_err(e, "pool_writer.execute"))?;
678            bind_params(&mut stmt, &statement.params)
679                .map_err(|e| map_rusqlite_err(e, "pool_writer.execute"))?;
680            let rows = stmt
681                .raw_execute()
682                .map_err(|e| map_rusqlite_err(e, "pool_writer.execute"))?;
683            Ok(rows as u64)
684        })
685        .await
686        .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.execute", e))?
687    }
688
689    async fn execute_batch(
690        &mut self,
691        statements: Vec<SqlStatement>,
692    ) -> khive_storage::types::StorageResult<u64> {
693        let pool = Arc::clone(&self.pool);
694        tokio::task::spawn_blocking(move || {
695            let guard = pool.try_writer().map_err(|e: SqliteError| {
696                StorageError::driver(StorageCapability::Sql, "pool_writer.execute_batch", e)
697            })?;
698            guard
699                .execute_batch("BEGIN IMMEDIATE")
700                .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?;
701            let result = (|| -> Result<u64, StorageError> {
702                let mut total = 0u64;
703                for statement in &statements {
704                    let mut stmt = guard
705                        .prepare(&statement.sql)
706                        .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?;
707                    bind_params(&mut stmt, &statement.params)
708                        .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?;
709                    total += stmt
710                        .raw_execute()
711                        .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?
712                        as u64;
713                }
714                Ok(total)
715            })();
716            match result {
717                Ok(total) => {
718                    if let Err(e) = guard.execute_batch("COMMIT") {
719                        let _ = guard.execute_batch("ROLLBACK");
720                        Err(map_rusqlite_err(e, "pool_writer.execute_batch"))
721                    } else {
722                        Ok(total)
723                    }
724                }
725                Err(e) => {
726                    let _ = guard.execute_batch("ROLLBACK");
727                    Err(e)
728                }
729            }
730        })
731        .await
732        .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.execute_batch", e))?
733    }
734
735    async fn execute_script(&mut self, script: String) -> khive_storage::types::StorageResult<()> {
736        let pool = Arc::clone(&self.pool);
737        tokio::task::spawn_blocking(move || {
738            let guard = pool.try_writer().map_err(|e: SqliteError| {
739                StorageError::driver(StorageCapability::Sql, "pool_writer.execute_script", e)
740            })?;
741            guard
742                .execute_batch(&script)
743                .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_script"))
744        })
745        .await
746        .map_err(|e| {
747            StorageError::driver(StorageCapability::Sql, "pool_writer.execute_script", e)
748        })?
749    }
750}
751
752// =============================================================================
753// SqlBridge: the SqlAccess implementor
754// =============================================================================
755
756/// Bridges `ConnectionPool` to `khive_storage::SqlAccess`.
757///
758/// Dispatches based on whether the pool is file-backed or in-memory:
759/// - File-backed: standalone connections per reader/writer/tx (high concurrency).
760/// - In-memory: pool-backed connections per query (single shared connection).
761pub struct SqlBridge {
762    pool: Arc<ConnectionPool>,
763    is_file_backed: bool,
764}
765
766impl SqlBridge {
767    /// Create a new bridge wrapping the given pool.
768    pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
769        Self {
770            pool,
771            is_file_backed,
772        }
773    }
774}
775
776#[async_trait]
777impl khive_storage::SqlAccess for SqlBridge {
778    async fn reader(
779        &self,
780    ) -> khive_storage::types::StorageResult<Box<dyn khive_storage::SqlReader>> {
781        if self.is_file_backed {
782            let conn = open_standalone_reader(&self.pool)?;
783            Ok(Box::new(SqliteReader { conn: Some(conn) }))
784        } else {
785            Ok(Box::new(PoolBackedReader {
786                pool: Arc::clone(&self.pool),
787            }))
788        }
789    }
790
791    async fn writer(
792        &self,
793    ) -> khive_storage::types::StorageResult<Box<dyn khive_storage::SqlWriter>> {
794        if self.is_file_backed {
795            let conn = open_standalone_writer(&self.pool)?;
796            Ok(Box::new(SqliteWriter { conn: Some(conn) }))
797        } else {
798            Ok(Box::new(PoolBackedWriter {
799                pool: Arc::clone(&self.pool),
800            }))
801        }
802    }
803
804    async fn begin_tx(
805        &self,
806        options: SqlTxOptions,
807    ) -> khive_storage::types::StorageResult<Box<dyn khive_storage::SqlTransaction>> {
808        // Transactions need a standalone connection so the BEGIN/COMMIT state
809        // is not shared with other operations. For in-memory DBs we still
810        // open a standalone writer since the pool writer would conflict.
811        let conn = if self.is_file_backed {
812            open_standalone_writer(&self.pool)?
813        } else {
814            return Err(StorageError::Pool {
815                operation: "begin_tx".into(),
816                message: "transactions require file-backed database (not in-memory)".into(),
817            });
818        };
819
820        // Map isolation level to SQLite BEGIN mode.
821        // SQLite WAL mode gives snapshot isolation for readers automatically;
822        // IMMEDIATE acquires the write lock early (prevents writer starvation),
823        // EXCLUSIVE prevents any concurrent readers for full serializability.
824        let read_only = options.read_only;
825        let begin_stmt = match options.isolation {
826            SqlIsolation::Serializable => "BEGIN EXCLUSIVE",
827            _ => {
828                if read_only {
829                    // DEFERRED acquires no lock at BEGIN time, compatible with
830                    // read-only transactions (no write-intent needed).
831                    "BEGIN DEFERRED"
832                } else {
833                    // IMMEDIATE acquires the write lock early to prevent starvation.
834                    "BEGIN IMMEDIATE"
835                }
836            }
837        };
838        conn.execute_batch(begin_stmt)
839            .map_err(|e| map_rusqlite_err(e, "begin_tx"))?;
840
841        // Honor read_only: block all writes via PRAGMA query_only.
842        // The connection is opened as read-write so COMMIT still works, but
843        // any INSERT/UPDATE/DELETE executed inside the transaction will error.
844        if read_only {
845            conn.pragma_update(None, "query_only", "ON")
846                .map_err(|e| map_rusqlite_err(e, "begin_tx"))?;
847        }
848
849        Ok(Box::new(SqliteTransaction {
850            conn: Some(conn),
851            read_only,
852        }))
853    }
854}
855
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::PoolConfig;
    use khive_storage::types::{SqlIsolation, SqlStatement, SqlTxOptions, SqlValue};
    use khive_storage::SqlAccess as _;

    /// A transaction opened with `read_only: true` must refuse an INSERT
    /// (enforced through PRAGMA query_only), and rolling it back afterwards
    /// must still succeed.
    #[tokio::test]
    async fn tx_read_only_rejects_writes() {
        // File-backed database in a temp dir; `tmp` must outlive the pool.
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("tx_ro.db");
        let pool = Arc::new(
            ConnectionPool::new(PoolConfig {
                path: Some(db_path),
                ..PoolConfig::default()
            })
            .unwrap(),
        );

        // Seed the target table; the writer guard is released at the end of
        // this statement, before the transaction is opened.
        pool.writer()
            .unwrap()
            .conn()
            .execute_batch("CREATE TABLE IF NOT EXISTS ro_test (id INTEGER PRIMARY KEY)")
            .unwrap();

        let bridge = SqlBridge::new(Arc::clone(&pool), true);

        let opts = SqlTxOptions {
            read_only: true,
            isolation: SqlIsolation::Default,
            label: None,
        };
        let mut tx = bridge.begin_tx(opts).await.unwrap();

        // The write attempt is expected to be refused.
        let insert = SqlStatement {
            sql: "INSERT INTO ro_test (id) VALUES (?1)".into(),
            params: vec![SqlValue::Integer(1)],
            label: None,
        };
        let outcome = tx.execute(insert).await;
        assert!(outcome.is_err(), "INSERT in read-only tx must fail");

        // Rolling back must work even though the INSERT was refused.
        tx.rollback().await.unwrap();
    }
}