1use std::sync::Arc;
8
9use async_trait::async_trait;
10
11use khive_storage::error::StorageError;
12use khive_storage::types::{SqlColumn, SqlIsolation, SqlRow, SqlStatement, SqlTxOptions, SqlValue};
13use khive_storage::StorageCapability;
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18fn row_to_sql_row(row: &rusqlite::Row<'_>, col_count: usize, col_names: &[String]) -> SqlRow {
24 let mut columns = Vec::with_capacity(col_count);
25 for i in 0..col_count {
26 let value = match row.get_ref(i) {
27 Ok(rusqlite::types::ValueRef::Null) => SqlValue::Null,
28 Ok(rusqlite::types::ValueRef::Integer(v)) => SqlValue::Integer(v),
29 Ok(rusqlite::types::ValueRef::Real(v)) => SqlValue::Float(v),
30 Ok(rusqlite::types::ValueRef::Text(bytes)) => {
31 SqlValue::Text(String::from_utf8_lossy(bytes).into_owned())
32 }
33 Ok(rusqlite::types::ValueRef::Blob(bytes)) => SqlValue::Blob(bytes.to_vec()),
34 Err(_) => SqlValue::Null,
35 };
36 columns.push(SqlColumn {
37 name: col_names.get(i).cloned().unwrap_or_default(),
38 value,
39 });
40 }
41 SqlRow { columns }
42}
43
44fn bind_params(
46 stmt: &mut rusqlite::Statement<'_>,
47 params: &[SqlValue],
48) -> Result<(), rusqlite::Error> {
49 for (i, param) in params.iter().enumerate() {
50 let idx = i + 1; match param {
52 SqlValue::Null => stmt.raw_bind_parameter(idx, rusqlite::types::Null)?,
53 SqlValue::Bool(v) => stmt.raw_bind_parameter(idx, *v as i64)?,
54 SqlValue::Integer(v) => stmt.raw_bind_parameter(idx, *v)?,
55 SqlValue::Float(v) => stmt.raw_bind_parameter(idx, *v)?,
56 SqlValue::Text(v) => stmt.raw_bind_parameter(idx, v.as_str())?,
57 SqlValue::Blob(v) => stmt.raw_bind_parameter(idx, v.as_slice())?,
58 SqlValue::Json(v) => {
59 let s = serde_json::to_string(v).unwrap_or_default();
60 stmt.raw_bind_parameter(idx, s.as_str())?;
61 }
62 SqlValue::Uuid(v) => stmt.raw_bind_parameter(idx, v.to_string().as_str())?,
63 SqlValue::Timestamp(v) => {
64 stmt.raw_bind_parameter(idx, v.timestamp_micros())?;
65 }
66 }
67 }
68 Ok(())
69}
70
71fn execute_query(
73 conn: &rusqlite::Connection,
74 statement: &SqlStatement,
75) -> Result<Vec<SqlRow>, rusqlite::Error> {
76 let mut stmt = conn.prepare(&statement.sql)?;
77 bind_params(&mut stmt, &statement.params)?;
78
79 let col_count = stmt.column_count();
80 let col_names: Vec<String> = (0..col_count)
81 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
82 .collect();
83
84 let mut rows = Vec::new();
85 let mut raw_rows = stmt.raw_query();
86 while let Some(row) = raw_rows.next()? {
87 rows.push(row_to_sql_row(row, col_count, &col_names));
88 }
89 Ok(rows)
90}
91
92fn map_rusqlite_err(e: rusqlite::Error, op: &'static str) -> StorageError {
94 StorageError::driver(StorageCapability::Sql, op, e)
95}
96
97fn open_standalone_reader(pool: &ConnectionPool) -> Result<rusqlite::Connection, StorageError> {
102 let config = pool.config();
103 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
104 operation: "reader".into(),
105 message: "in-memory databases do not support standalone readers; use pool-backed".into(),
106 })?;
107
108 let conn = rusqlite::Connection::open_with_flags(
109 path,
110 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
111 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
112 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
113 )
114 .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
115
116 conn.busy_timeout(config.busy_timeout)
117 .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
118 conn.pragma_update(None, "cache_size", "-65536")
119 .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
120 conn.pragma_update(None, "mmap_size", "1073741824")
121 .map_err(|e| map_rusqlite_err(e, "open_reader"))?;
122
123 Ok(conn)
124}
125
126fn open_standalone_writer(pool: &ConnectionPool) -> Result<rusqlite::Connection, StorageError> {
127 let config = pool.config();
128 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
129 operation: "writer".into(),
130 message: "in-memory databases do not support standalone writer; use pool-backed".into(),
131 })?;
132
133 let conn = rusqlite::Connection::open_with_flags(
134 path,
135 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
136 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
137 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
138 )
139 .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
140
141 conn.busy_timeout(config.busy_timeout)
142 .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
143 conn.pragma_update(None, "cache_size", "-65536")
144 .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
145 conn.pragma_update(None, "mmap_size", "1073741824")
146 .map_err(|e| map_rusqlite_err(e, "open_writer"))?;
147
148 Ok(conn)
149}
150
151struct SqliteReader {
156 conn: Option<rusqlite::Connection>,
157}
158
159#[async_trait]
160impl khive_storage::SqlReader for SqliteReader {
161 async fn query_row(
162 &mut self,
163 statement: SqlStatement,
164 ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
165 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
166 operation: "query_row".into(),
167 message: "connection already consumed".into(),
168 })?;
169 let (conn, result) = tokio::task::spawn_blocking(move || {
170 let res = execute_query(&conn, &statement);
171 (conn, res)
172 })
173 .await
174 .map_err(|e| StorageError::driver(StorageCapability::Sql, "query_row", e))?;
175 self.conn = Some(conn);
176 let rows = result.map_err(|e| map_rusqlite_err(e, "query_row"))?;
177 Ok(rows.into_iter().next())
178 }
179
180 async fn query_all(
181 &mut self,
182 statement: SqlStatement,
183 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
184 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
185 operation: "query_all".into(),
186 message: "connection already consumed".into(),
187 })?;
188 let (conn, result) = tokio::task::spawn_blocking(move || {
189 let res = execute_query(&conn, &statement);
190 (conn, res)
191 })
192 .await
193 .map_err(|e| StorageError::driver(StorageCapability::Sql, "query_all", e))?;
194 self.conn = Some(conn);
195 result.map_err(|e| map_rusqlite_err(e, "query_all"))
196 }
197
198 async fn query_scalar(
199 &mut self,
200 statement: SqlStatement,
201 ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
202 let row = self.query_row(statement).await?;
203 Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
204 }
205
206 async fn explain(
207 &mut self,
208 statement: SqlStatement,
209 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
210 let explain_stmt = SqlStatement {
211 sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
212 params: statement.params,
213 label: statement.label,
214 };
215 self.query_all(explain_stmt).await
216 }
217}
218
219struct SqliteWriter {
224 conn: Option<rusqlite::Connection>,
225}
226
227#[async_trait]
228impl khive_storage::SqlReader for SqliteWriter {
229 async fn query_row(
230 &mut self,
231 statement: SqlStatement,
232 ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
233 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
234 operation: "writer.query_row".into(),
235 message: "connection already consumed".into(),
236 })?;
237 let (conn, result) = tokio::task::spawn_blocking(move || {
238 let res = execute_query(&conn, &statement);
239 (conn, res)
240 })
241 .await
242 .map_err(|e| StorageError::driver(StorageCapability::Sql, "writer.query_row", e))?;
243 self.conn = Some(conn);
244 let rows = result.map_err(|e| map_rusqlite_err(e, "writer.query_row"))?;
245 Ok(rows.into_iter().next())
246 }
247
248 async fn query_all(
249 &mut self,
250 statement: SqlStatement,
251 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
252 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
253 operation: "writer.query_all".into(),
254 message: "connection already consumed".into(),
255 })?;
256 let (conn, result) = tokio::task::spawn_blocking(move || {
257 let res = execute_query(&conn, &statement);
258 (conn, res)
259 })
260 .await
261 .map_err(|e| StorageError::driver(StorageCapability::Sql, "writer.query_all", e))?;
262 self.conn = Some(conn);
263 result.map_err(|e| map_rusqlite_err(e, "writer.query_all"))
264 }
265
266 async fn query_scalar(
267 &mut self,
268 statement: SqlStatement,
269 ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
270 let row = khive_storage::SqlReader::query_row(self, statement).await?;
271 Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
272 }
273
274 async fn explain(
275 &mut self,
276 statement: SqlStatement,
277 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
278 let explain_stmt = SqlStatement {
279 sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
280 params: statement.params,
281 label: statement.label,
282 };
283 khive_storage::SqlReader::query_all(self, explain_stmt).await
284 }
285}
286
287#[async_trait]
288impl khive_storage::SqlWriter for SqliteWriter {
289 async fn execute(
290 &mut self,
291 statement: SqlStatement,
292 ) -> khive_storage::types::StorageResult<u64> {
293 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
294 operation: "execute".into(),
295 message: "connection already consumed".into(),
296 })?;
297 let (conn, result) = tokio::task::spawn_blocking(move || {
298 let res = (|| -> Result<usize, rusqlite::Error> {
299 let mut stmt = conn.prepare(&statement.sql)?;
300 bind_params(&mut stmt, &statement.params)?;
301 stmt.raw_execute()
302 })();
303 (conn, res)
304 })
305 .await
306 .map_err(|e| StorageError::driver(StorageCapability::Sql, "execute", e))?;
307 self.conn = Some(conn);
308 let affected = result.map_err(|e| map_rusqlite_err(e, "execute"))?;
309 Ok(affected as u64)
310 }
311
312 async fn execute_batch(
313 &mut self,
314 statements: Vec<SqlStatement>,
315 ) -> khive_storage::types::StorageResult<u64> {
316 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
317 operation: "execute_batch".into(),
318 message: "connection already consumed".into(),
319 })?;
320 let (conn, result) = tokio::task::spawn_blocking(move || {
321 let result = (|| -> Result<u64, rusqlite::Error> {
322 conn.execute_batch("BEGIN IMMEDIATE")?;
323 let mut total: u64 = 0;
324 for statement in &statements {
325 let mut stmt = conn.prepare(&statement.sql)?;
326 bind_params(&mut stmt, &statement.params)?;
327 total += stmt.raw_execute()? as u64;
328 }
329 conn.execute_batch("COMMIT")?;
330 Ok(total)
331 })();
332 if result.is_err() {
333 let _ = conn.execute_batch("ROLLBACK");
334 }
335 (conn, result)
336 })
337 .await
338 .map_err(|e| StorageError::driver(StorageCapability::Sql, "execute_batch", e))?;
339 self.conn = Some(conn);
340 result.map_err(|e| map_rusqlite_err(e, "execute_batch"))
341 }
342
343 async fn execute_script(&mut self, script: String) -> khive_storage::types::StorageResult<()> {
344 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
345 operation: "execute_script".into(),
346 message: "connection already consumed".into(),
347 })?;
348 let (conn, result) = tokio::task::spawn_blocking(move || {
349 let res = conn.execute_batch(&script);
350 (conn, res)
351 })
352 .await
353 .map_err(|e| StorageError::driver(StorageCapability::Sql, "execute_script", e))?;
354 self.conn = Some(conn);
355 result.map_err(|e| map_rusqlite_err(e, "execute_script"))
356 }
357}
358
359struct SqliteTransaction {
364 conn: Option<rusqlite::Connection>,
365 read_only: bool,
369}
370
371#[async_trait]
372impl khive_storage::SqlReader for SqliteTransaction {
373 async fn query_row(
374 &mut self,
375 statement: SqlStatement,
376 ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
377 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
378 operation: "tx.query_row".into(),
379 message: "connection already consumed".into(),
380 })?;
381 let (conn, result) = tokio::task::spawn_blocking(move || {
382 let res = execute_query(&conn, &statement);
383 (conn, res)
384 })
385 .await
386 .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.query_row", e))?;
387 self.conn = Some(conn);
388 let rows = result.map_err(|e| map_rusqlite_err(e, "tx.query_row"))?;
389 Ok(rows.into_iter().next())
390 }
391
392 async fn query_all(
393 &mut self,
394 statement: SqlStatement,
395 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
396 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
397 operation: "tx.query_all".into(),
398 message: "connection already consumed".into(),
399 })?;
400 let (conn, result) = tokio::task::spawn_blocking(move || {
401 let res = execute_query(&conn, &statement);
402 (conn, res)
403 })
404 .await
405 .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.query_all", e))?;
406 self.conn = Some(conn);
407 result.map_err(|e| map_rusqlite_err(e, "tx.query_all"))
408 }
409
410 async fn query_scalar(
411 &mut self,
412 statement: SqlStatement,
413 ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
414 let row = khive_storage::SqlReader::query_row(self, statement).await?;
415 Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
416 }
417
418 async fn explain(
419 &mut self,
420 statement: SqlStatement,
421 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
422 let explain_stmt = SqlStatement {
423 sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
424 params: statement.params,
425 label: statement.label,
426 };
427 khive_storage::SqlReader::query_all(self, explain_stmt).await
428 }
429}
430
431#[async_trait]
432impl khive_storage::SqlWriter for SqliteTransaction {
433 async fn execute(
434 &mut self,
435 statement: SqlStatement,
436 ) -> khive_storage::types::StorageResult<u64> {
437 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
438 operation: "tx.execute".into(),
439 message: "connection already consumed".into(),
440 })?;
441 let (conn, result) = tokio::task::spawn_blocking(move || {
442 let res = (|| -> Result<usize, rusqlite::Error> {
443 let mut stmt = conn.prepare(&statement.sql)?;
444 bind_params(&mut stmt, &statement.params)?;
445 stmt.raw_execute()
446 })();
447 (conn, res)
448 })
449 .await
450 .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.execute", e))?;
451 self.conn = Some(conn);
452 let affected = result.map_err(|e| map_rusqlite_err(e, "tx.execute"))?;
453 Ok(affected as u64)
454 }
455
456 async fn execute_batch(
457 &mut self,
458 statements: Vec<SqlStatement>,
459 ) -> khive_storage::types::StorageResult<u64> {
460 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
461 operation: "tx.execute_batch".into(),
462 message: "connection already consumed".into(),
463 })?;
464 let (conn, result) = tokio::task::spawn_blocking(move || {
465 let mut total: u64 = 0;
466 for statement in &statements {
467 let res = (|| -> Result<usize, rusqlite::Error> {
468 let mut stmt = conn.prepare(&statement.sql)?;
469 bind_params(&mut stmt, &statement.params)?;
470 stmt.raw_execute()
471 })();
472 match res {
473 Ok(n) => total += n as u64,
474 Err(e) => return (conn, Err(e)),
475 }
476 }
477 (conn, Ok(total))
478 })
479 .await
480 .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.execute_batch", e))?;
481 self.conn = Some(conn);
482 result.map_err(|e| map_rusqlite_err(e, "tx.execute_batch"))
483 }
484
485 async fn execute_script(&mut self, script: String) -> khive_storage::types::StorageResult<()> {
486 let conn = self.conn.take().ok_or_else(|| StorageError::Pool {
487 operation: "tx.execute_script".into(),
488 message: "connection already consumed".into(),
489 })?;
490 let (conn, result) = tokio::task::spawn_blocking(move || {
491 let res = conn.execute_batch(&script);
492 (conn, res)
493 })
494 .await
495 .map_err(|e| StorageError::driver(StorageCapability::Sql, "tx.execute_script", e))?;
496 self.conn = Some(conn);
497 result.map_err(|e| map_rusqlite_err(e, "tx.execute_script"))
498 }
499}
500
501#[async_trait]
502impl khive_storage::SqlTransaction for SqliteTransaction {
503 async fn commit(mut self: Box<Self>) -> khive_storage::types::StorageResult<()> {
504 let conn = self.conn.take().ok_or_else(|| StorageError::Transaction {
505 operation: "commit".into(),
506 message: "connection already consumed".into(),
507 })?;
508 let read_only = self.read_only;
509 tokio::task::spawn_blocking(move || {
510 if read_only {
512 let _ = conn.pragma_update(None, "query_only", "OFF");
513 }
514 conn.execute_batch("COMMIT")
515 .map_err(|e| map_rusqlite_err(e, "commit"))
516 })
517 .await
518 .map_err(|e| StorageError::driver(StorageCapability::Sql, "commit", e))?
519 }
520
521 async fn rollback(mut self: Box<Self>) -> khive_storage::types::StorageResult<()> {
522 let conn = self.conn.take().ok_or_else(|| StorageError::Transaction {
523 operation: "rollback".into(),
524 message: "connection already consumed".into(),
525 })?;
526 let read_only = self.read_only;
527 tokio::task::spawn_blocking(move || {
528 if read_only {
530 let _ = conn.pragma_update(None, "query_only", "OFF");
531 }
532 conn.execute_batch("ROLLBACK")
533 .map_err(|e| map_rusqlite_err(e, "rollback"))
534 })
535 .await
536 .map_err(|e| StorageError::driver(StorageCapability::Sql, "rollback", e))?
537 }
538}
539
540struct PoolBackedReader {
545 pool: Arc<ConnectionPool>,
546}
547
548#[async_trait]
549impl khive_storage::SqlReader for PoolBackedReader {
550 async fn query_row(
551 &mut self,
552 statement: SqlStatement,
553 ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
554 let pool = Arc::clone(&self.pool);
555 tokio::task::spawn_blocking(move || {
556 let guard = pool
557 .reader()
558 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader", e))?;
559 let rows = execute_query(&guard, &statement)
560 .map_err(|e| map_rusqlite_err(e, "pool_reader.query_row"))?;
561 Ok(rows.into_iter().next())
562 })
563 .await
564 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader.query_row", e))?
565 }
566
567 async fn query_all(
568 &mut self,
569 statement: SqlStatement,
570 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
571 let pool = Arc::clone(&self.pool);
572 tokio::task::spawn_blocking(move || {
573 let guard = pool
574 .reader()
575 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader", e))?;
576 execute_query(&guard, &statement)
577 .map_err(|e| map_rusqlite_err(e, "pool_reader.query_all"))
578 })
579 .await
580 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_reader.query_all", e))?
581 }
582
583 async fn query_scalar(
584 &mut self,
585 statement: SqlStatement,
586 ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
587 let row = self.query_row(statement).await?;
588 Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
589 }
590
591 async fn explain(
592 &mut self,
593 statement: SqlStatement,
594 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
595 let explain_stmt = SqlStatement {
596 sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
597 params: statement.params,
598 label: statement.label,
599 };
600 self.query_all(explain_stmt).await
601 }
602}
603
604struct PoolBackedWriter {
605 pool: Arc<ConnectionPool>,
606}
607
608#[async_trait]
609impl khive_storage::SqlReader for PoolBackedWriter {
610 async fn query_row(
611 &mut self,
612 statement: SqlStatement,
613 ) -> khive_storage::types::StorageResult<Option<SqlRow>> {
614 let pool = Arc::clone(&self.pool);
615 tokio::task::spawn_blocking(move || {
616 let guard = pool.try_writer().map_err(|e: SqliteError| {
617 StorageError::driver(StorageCapability::Sql, "pool_writer.query_row", e)
618 })?;
619 let rows = execute_query(&guard, &statement)
620 .map_err(|e| map_rusqlite_err(e, "pool_writer.query_row"))?;
621 Ok(rows.into_iter().next())
622 })
623 .await
624 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.query_row", e))?
625 }
626
627 async fn query_all(
628 &mut self,
629 statement: SqlStatement,
630 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
631 let pool = Arc::clone(&self.pool);
632 tokio::task::spawn_blocking(move || {
633 let guard = pool.try_writer().map_err(|e: SqliteError| {
634 StorageError::driver(StorageCapability::Sql, "pool_writer.query_all", e)
635 })?;
636 execute_query(&guard, &statement)
637 .map_err(|e| map_rusqlite_err(e, "pool_writer.query_all"))
638 })
639 .await
640 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.query_all", e))?
641 }
642
643 async fn query_scalar(
644 &mut self,
645 statement: SqlStatement,
646 ) -> khive_storage::types::StorageResult<Option<SqlValue>> {
647 let row = khive_storage::SqlReader::query_row(self, statement).await?;
648 Ok(row.and_then(|r| r.columns.into_iter().next().map(|c| c.value)))
649 }
650
651 async fn explain(
652 &mut self,
653 statement: SqlStatement,
654 ) -> khive_storage::types::StorageResult<Vec<SqlRow>> {
655 let explain_stmt = SqlStatement {
656 sql: format!("EXPLAIN QUERY PLAN {}", statement.sql),
657 params: statement.params,
658 label: statement.label,
659 };
660 khive_storage::SqlReader::query_all(self, explain_stmt).await
661 }
662}
663
664#[async_trait]
665impl khive_storage::SqlWriter for PoolBackedWriter {
666 async fn execute(
667 &mut self,
668 statement: SqlStatement,
669 ) -> khive_storage::types::StorageResult<u64> {
670 let pool = Arc::clone(&self.pool);
671 tokio::task::spawn_blocking(move || {
672 let guard = pool.try_writer().map_err(|e: SqliteError| {
673 StorageError::driver(StorageCapability::Sql, "pool_writer.execute", e)
674 })?;
675 let mut stmt = guard
676 .prepare(&statement.sql)
677 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute"))?;
678 bind_params(&mut stmt, &statement.params)
679 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute"))?;
680 let rows = stmt
681 .raw_execute()
682 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute"))?;
683 Ok(rows as u64)
684 })
685 .await
686 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.execute", e))?
687 }
688
689 async fn execute_batch(
690 &mut self,
691 statements: Vec<SqlStatement>,
692 ) -> khive_storage::types::StorageResult<u64> {
693 let pool = Arc::clone(&self.pool);
694 tokio::task::spawn_blocking(move || {
695 let guard = pool.try_writer().map_err(|e: SqliteError| {
696 StorageError::driver(StorageCapability::Sql, "pool_writer.execute_batch", e)
697 })?;
698 guard
699 .execute_batch("BEGIN IMMEDIATE")
700 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?;
701 let result = (|| -> Result<u64, StorageError> {
702 let mut total = 0u64;
703 for statement in &statements {
704 let mut stmt = guard
705 .prepare(&statement.sql)
706 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?;
707 bind_params(&mut stmt, &statement.params)
708 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?;
709 total += stmt
710 .raw_execute()
711 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_batch"))?
712 as u64;
713 }
714 Ok(total)
715 })();
716 match result {
717 Ok(total) => {
718 if let Err(e) = guard.execute_batch("COMMIT") {
719 let _ = guard.execute_batch("ROLLBACK");
720 Err(map_rusqlite_err(e, "pool_writer.execute_batch"))
721 } else {
722 Ok(total)
723 }
724 }
725 Err(e) => {
726 let _ = guard.execute_batch("ROLLBACK");
727 Err(e)
728 }
729 }
730 })
731 .await
732 .map_err(|e| StorageError::driver(StorageCapability::Sql, "pool_writer.execute_batch", e))?
733 }
734
735 async fn execute_script(&mut self, script: String) -> khive_storage::types::StorageResult<()> {
736 let pool = Arc::clone(&self.pool);
737 tokio::task::spawn_blocking(move || {
738 let guard = pool.try_writer().map_err(|e: SqliteError| {
739 StorageError::driver(StorageCapability::Sql, "pool_writer.execute_script", e)
740 })?;
741 guard
742 .execute_batch(&script)
743 .map_err(|e| map_rusqlite_err(e, "pool_writer.execute_script"))
744 })
745 .await
746 .map_err(|e| {
747 StorageError::driver(StorageCapability::Sql, "pool_writer.execute_script", e)
748 })?
749 }
750}
751
752pub struct SqlBridge {
762 pool: Arc<ConnectionPool>,
763 is_file_backed: bool,
764}
765
766impl SqlBridge {
767 pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
769 Self {
770 pool,
771 is_file_backed,
772 }
773 }
774}
775
776#[async_trait]
777impl khive_storage::SqlAccess for SqlBridge {
778 async fn reader(
779 &self,
780 ) -> khive_storage::types::StorageResult<Box<dyn khive_storage::SqlReader>> {
781 if self.is_file_backed {
782 let conn = open_standalone_reader(&self.pool)?;
783 Ok(Box::new(SqliteReader { conn: Some(conn) }))
784 } else {
785 Ok(Box::new(PoolBackedReader {
786 pool: Arc::clone(&self.pool),
787 }))
788 }
789 }
790
791 async fn writer(
792 &self,
793 ) -> khive_storage::types::StorageResult<Box<dyn khive_storage::SqlWriter>> {
794 if self.is_file_backed {
795 let conn = open_standalone_writer(&self.pool)?;
796 Ok(Box::new(SqliteWriter { conn: Some(conn) }))
797 } else {
798 Ok(Box::new(PoolBackedWriter {
799 pool: Arc::clone(&self.pool),
800 }))
801 }
802 }
803
804 async fn begin_tx(
805 &self,
806 options: SqlTxOptions,
807 ) -> khive_storage::types::StorageResult<Box<dyn khive_storage::SqlTransaction>> {
808 let conn = if self.is_file_backed {
812 open_standalone_writer(&self.pool)?
813 } else {
814 return Err(StorageError::Pool {
815 operation: "begin_tx".into(),
816 message: "transactions require file-backed database (not in-memory)".into(),
817 });
818 };
819
820 let read_only = options.read_only;
825 let begin_stmt = match options.isolation {
826 SqlIsolation::Serializable => "BEGIN EXCLUSIVE",
827 _ => {
828 if read_only {
829 "BEGIN DEFERRED"
832 } else {
833 "BEGIN IMMEDIATE"
835 }
836 }
837 };
838 conn.execute_batch(begin_stmt)
839 .map_err(|e| map_rusqlite_err(e, "begin_tx"))?;
840
841 if read_only {
845 conn.pragma_update(None, "query_only", "ON")
846 .map_err(|e| map_rusqlite_err(e, "begin_tx"))?;
847 }
848
849 Ok(Box::new(SqliteTransaction {
850 conn: Some(conn),
851 read_only,
852 }))
853 }
854}
855
856#[cfg(test)]
857mod tests {
858 use super::*;
859 use crate::pool::PoolConfig;
860 use khive_storage::types::{SqlIsolation, SqlStatement, SqlTxOptions, SqlValue};
861 use khive_storage::SqlAccess as _;
862
863 #[tokio::test]
866 async fn tx_read_only_rejects_writes() {
867 let dir = tempfile::tempdir().unwrap();
868 let path = dir.path().join("tx_ro.db");
869 let config = PoolConfig {
870 path: Some(path.clone()),
871 ..PoolConfig::default()
872 };
873 let pool = Arc::new(ConnectionPool::new(config).unwrap());
874
875 {
877 let guard = pool.writer().unwrap();
878 guard
879 .conn()
880 .execute_batch("CREATE TABLE IF NOT EXISTS ro_test (id INTEGER PRIMARY KEY)")
881 .unwrap();
882 }
883
884 let bridge = SqlBridge::new(Arc::clone(&pool), true);
885
886 let mut tx = bridge
887 .begin_tx(SqlTxOptions {
888 read_only: true,
889 isolation: SqlIsolation::Default,
890 label: None,
891 })
892 .await
893 .unwrap();
894
895 let result = tx
897 .execute(SqlStatement {
898 sql: "INSERT INTO ro_test (id) VALUES (?1)".into(),
899 params: vec![SqlValue::Integer(1)],
900 label: None,
901 })
902 .await;
903
904 assert!(result.is_err(), "INSERT in read-only tx must fail");
905
906 tx.rollback().await.unwrap();
908 }
909}