use crate::channel::mpsc;
use crate::cx::Cx;
use crate::runtime::blocking_pool::{BlockingPool, BlockingPoolHandle};
use crate::time::{sleep, wall_now};
use crate::types::{CancelReason, Outcome};
use parking_lot::Mutex;
use std::collections::BTreeMap;
use std::fmt;
use std::future::poll_fn;
use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::Poll;
use std::time::Duration;
static SQLITE_POOL: OnceLock<BlockingPool> = OnceLock::new();
const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_millis(250);
const DEFAULT_STATEMENT_CACHE_CAPACITY: usize = 64;
const SQLITE_ROW_STREAM_CHANNEL_CAPACITY: usize = 1;
const SQLITE_ROW_STREAM_FULL_BACKOFF: Duration = Duration::from_millis(1);
fn sqlite_cancelled_reason(cx: &Cx) -> CancelReason {
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled"))
}
async fn sqlite_wait_retry_delay(cx: &Cx, delay: Duration) -> Result<(), CancelReason> {
if delay.is_zero() {
cx.checkpoint().map_err(|_| sqlite_cancelled_reason(cx))?;
crate::runtime::yield_now().await;
return cx.checkpoint().map_err(|_| sqlite_cancelled_reason(cx));
}
let now = cx
.timer_driver()
.map_or_else(wall_now, |driver| driver.now());
let mut sleeper = sleep(now, delay);
poll_fn(|task_cx| {
if cx.checkpoint().is_err() {
return Poll::Ready(Err(sqlite_cancelled_reason(cx)));
}
Pin::new(&mut sleeper).poll(task_cx).map(Ok)
})
.await
}
fn wal_checkpoint_i64(row: &SqliteRow, column: &str) -> Result<i64, SqliteError> {
row.get_i64(column).map_err(|err| {
SqliteError::WalCheckpointFailed(format!(
"WAL checkpoint status column {column:?} was missing or non-integer: {err}"
))
})
}
fn get_sqlite_pool() -> BlockingPoolHandle {
SQLITE_POOL.get_or_init(|| BlockingPool::new(1, 4)).handle()
}
fn configure_connection_defaults(
conn: &rusqlite::Connection,
enable_wal: bool,
) -> Result<(), SqliteError> {
conn.busy_timeout(DEFAULT_BUSY_TIMEOUT)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
conn.pragma_update(None, "foreign_keys", "ON")
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
if enable_wal {
conn.pragma_update(None, "journal_mode", "WAL")
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
}
conn.set_prepared_statement_cache_capacity(DEFAULT_STATEMENT_CACHE_CAPACITY);
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
Autocommit,
InTransaction,
NeedsRollback,
RollingBack, }
fn rollback_orphaned_transaction_mutex_guarded(
conn: &rusqlite::Connection,
transaction_state: &Mutex<TransactionState>,
) -> Result<(), SqliteError> {
let mut state_guard = transaction_state.lock();
if *state_guard != TransactionState::NeedsRollback {
return Ok(());
}
*state_guard = TransactionState::RollingBack;
drop(state_guard);
let final_state = if conn.is_autocommit() {
TransactionState::Autocommit
} else {
match conn.execute_batch("ROLLBACK") {
Ok(()) => TransactionState::Autocommit,
Err(e) => {
if conn.is_autocommit() {
TransactionState::Autocommit
} else {
let mut state_guard = transaction_state.lock();
*state_guard = TransactionState::NeedsRollback;
return Err(SqliteError::Sqlite(e.to_string()));
}
}
}
};
let mut state_guard = transaction_state.lock();
*state_guard = final_state;
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SqlSurfaceViolation {
Pragma,
TransactionControl,
AttachDetach,
}
impl SqlSurfaceViolation {
fn checked_surface_message(self) -> &'static str {
match self {
Self::Pragma => "PRAGMA statements require the explicit *_unchecked SQLite APIs",
Self::TransactionControl => {
"transaction or connection control statements require the explicit *_unchecked SQLite APIs"
}
Self::AttachDetach => "ATTACH and DETACH are disabled on the checked SQLite APIs",
}
}
}
fn classify_sql_surface_violation(sql: &str) -> Option<SqlSurfaceViolation> {
use sqlparser::dialect::SQLiteDialect;
use sqlparser::parser::Parser;
let dialect = SQLiteDialect {};
match Parser::parse_sql(&dialect, sql) {
Ok(statements) => check_parsed_statements(&statements),
Err(_) => {
check_sql_keywords_fallback(sql)
}
}
}
fn check_parsed_statements(
statements: &[sqlparser::ast::Statement],
) -> Option<SqlSurfaceViolation> {
use sqlparser::ast::Statement;
for statement in statements {
match statement {
Statement::Pragma { .. } => {
return Some(SqlSurfaceViolation::Pragma);
}
Statement::SetVariable { .. } if is_pragma_statement(statement) => {
return Some(SqlSurfaceViolation::Pragma);
}
Statement::AttachDatabase { .. }
| Statement::AttachDuckDBDatabase { .. }
| Statement::DetachDuckDBDatabase { .. } => {
return Some(SqlSurfaceViolation::AttachDetach);
}
Statement::StartTransaction { .. }
| Statement::Commit { .. }
| Statement::Rollback { .. }
| Statement::Savepoint { .. } => {
return Some(SqlSurfaceViolation::TransactionControl);
}
Statement::CreateTrigger { .. } => {
}
_ => {}
}
}
None
}
fn is_pragma_statement(statement: &sqlparser::ast::Statement) -> bool {
use sqlparser::ast::Statement;
if let Statement::SetVariable { variables, .. } = statement {
return variables.to_string().to_uppercase().starts_with("PRAGMA");
}
false
}
fn check_sql_keywords_fallback(sql: &str) -> Option<SqlSurfaceViolation> {
let sql_upper = sql.to_uppercase();
let sql_clean = remove_sql_comments(&sql_upper);
let statements: Vec<&str> = sql_clean.split(';').map(|s| s.trim()).collect();
for stmt in statements {
if stmt.is_empty() {
continue;
}
if stmt.starts_with("PRAGMA ") || stmt == "PRAGMA" {
return Some(SqlSurfaceViolation::Pragma);
}
if stmt.starts_with("ATTACH ") || stmt.starts_with("DETACH ") {
return Some(SqlSurfaceViolation::AttachDetach);
}
if !stmt.contains(" TRIGGER ") {
if stmt.starts_with("BEGIN")
|| stmt.starts_with("COMMIT")
|| stmt.starts_with("ROLLBACK")
|| stmt.starts_with("SAVEPOINT ")
{
return Some(SqlSurfaceViolation::TransactionControl);
}
}
}
None
}
fn remove_sql_comments(sql: &str) -> String {
let mut result = String::with_capacity(sql.len());
let mut chars = sql.chars().peekable();
while let Some(ch) = chars.next() {
match ch {
'-' if chars.peek() == Some(&'-') => {
chars.next(); for ch in chars.by_ref() {
if ch == '\n' || ch == '\r' {
result.push(' ');
break;
}
}
}
'/' if chars.peek() == Some(&'*') => {
chars.next(); while let Some(ch) = chars.next() {
if ch == '*' && chars.peek() == Some(&'/') {
chars.next(); break;
}
}
result.push(' ');
}
'\'' | '"' | '`' => {
let quote = ch;
result.push(ch);
while let Some(ch) = chars.next() {
result.push(ch);
if ch == quote {
if chars.peek() == Some("e) {
chars.next(); result.push(quote);
} else {
break;
}
}
}
}
_ => result.push(ch),
}
}
result
}
fn ensure_checked_sql_surface(sql: &str) -> Result<(), SqliteError> {
if let Some(violation) = classify_sql_surface_violation(sql) {
return Err(SqliteError::UnsafeSql(
violation.checked_surface_message().to_string(),
));
}
Ok(())
}
fn ensure_unchecked_sql_surface(sql: &str) -> Result<(), SqliteError> {
if matches!(
classify_sql_surface_violation(sql),
Some(SqlSurfaceViolation::AttachDetach)
) {
return Err(SqliteError::UnsafeSql(
"ATTACH and DETACH are disabled on SQLite connections; open a separate validated connection instead"
.to_string(),
));
}
Ok(())
}
fn resolve_sqlite_open_path(path: &Path) -> Result<PathBuf, SqliteError> {
if path.exists() {
return std::fs::canonicalize(path).map_err(SqliteError::Io);
}
let parent = path
.parent()
.filter(|parent| !parent.as_os_str().is_empty())
.unwrap_or_else(|| Path::new("."));
let canonical_parent = std::fs::canonicalize(parent).map_err(SqliteError::Io)?;
let file_name = path.file_name().ok_or_else(|| {
SqliteError::UnsafePath("SQLite database path must resolve to a file name".to_string())
})?;
Ok(canonical_parent.join(file_name))
}
fn validate_sqlite_open_path(path: &Path) -> Result<(), SqliteError> {
let raw = path.as_os_str().to_string_lossy();
if raw.starts_with('~') {
return Err(SqliteError::UnsafePath(
"tilde-prefixed SQLite paths are rejected; pass an explicit validated path".to_string(),
));
}
if path
.components()
.any(|component| matches!(component, Component::ParentDir))
{
return Err(SqliteError::UnsafePath(
"parent-directory traversal in SQLite paths is rejected; pass a normalized validated path"
.to_string(),
));
}
let resolved = resolve_sqlite_open_path(path)?;
validate_resolved_sqlite_path(&resolved)
}
fn validate_resolved_sqlite_path(resolved_path: &Path) -> Result<(), SqliteError> {
if resolved_path.starts_with(Path::new("/etc")) {
return Err(SqliteError::UnsafePath(format!(
"SQLite database path resolves into restricted system directory: {}",
resolved_path.display()
)));
}
if resolved_path.starts_with(Path::new("/sys")) {
return Err(SqliteError::UnsafePath(format!(
"SQLite database path resolves into restricted /sys directory: {}",
resolved_path.display()
)));
}
if resolved_path.starts_with(Path::new("/proc")) {
return Err(SqliteError::UnsafePath(format!(
"SQLite database path resolves into restricted /proc directory: {}",
resolved_path.display()
)));
}
if resolved_path.starts_with(Path::new("/dev")) {
return Err(SqliteError::UnsafePath(format!(
"SQLite database path resolves into restricted /dev directory: {}",
resolved_path.display()
)));
}
Ok(())
}
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_validate_sqlite_open_path(path: &Path) -> Result<(), SqliteError> {
validate_sqlite_open_path(path)
}
#[derive(Debug)]
pub enum SqliteError {
Sqlite(String),
Cancelled(CancelReason),
ConnectionClosed,
ColumnNotFound(String),
TypeMismatch {
column: String,
expected: &'static str,
actual: String,
},
Io(std::io::Error),
TransactionFinished,
LockPoisoned,
UnsafeSql(String),
UnsafePath(String),
InvalidTextEncoding {
column: String,
source: std::str::Utf8Error,
},
WalCheckpointFailed(String),
}
impl SqliteError {
#[must_use]
pub fn is_busy(&self) -> bool {
match self {
Self::Sqlite(msg) => msg.contains("database is locked") || msg.contains("SQLITE_BUSY"),
_ => false,
}
}
#[must_use]
pub fn is_locked(&self) -> bool {
match self {
Self::Sqlite(msg) => {
msg.contains("database table is locked") || msg.contains("SQLITE_LOCKED")
}
_ => false,
}
}
#[must_use]
pub fn is_constraint_violation(&self) -> bool {
match self {
Self::Sqlite(msg) => {
msg.contains("SQLITE_CONSTRAINT")
|| msg.contains("UNIQUE constraint failed")
|| msg.contains("NOT NULL constraint failed")
|| msg.contains("FOREIGN KEY constraint failed")
|| msg.contains("CHECK constraint failed")
}
_ => false,
}
}
#[must_use]
pub fn is_unique_violation(&self) -> bool {
match self {
Self::Sqlite(msg) => msg.contains("UNIQUE constraint failed"),
_ => false,
}
}
#[must_use]
pub fn is_connection_error(&self) -> bool {
matches!(
self,
Self::Io(_) | Self::ConnectionClosed | Self::LockPoisoned
)
}
#[must_use]
pub fn is_transient(&self) -> bool {
if matches!(self, Self::Io(_) | Self::ConnectionClosed) {
return true;
}
self.is_busy() || self.is_locked()
}
#[must_use]
pub fn is_retryable(&self) -> bool {
self.is_transient()
}
#[must_use]
pub fn error_code(&self) -> Option<&str> {
match self {
Self::Sqlite(msg) => {
if msg.contains("SQLITE_BUSY") || msg.contains("database is locked") {
Some("SQLITE_BUSY")
} else if msg.contains("SQLITE_LOCKED") || msg.contains("database table is locked")
{
Some("SQLITE_LOCKED")
} else if msg.contains("SQLITE_CONSTRAINT") || msg.contains("constraint failed") {
Some("SQLITE_CONSTRAINT")
} else if msg.contains("SQLITE_ERROR") {
Some("SQLITE_ERROR")
} else {
None
}
}
Self::Io(_) => Some("SQLITE_IOERR"),
Self::ConnectionClosed => Some("SQLITE_MISUSE"),
Self::UnsafePath(_) => Some("SQLITE_PERM"),
_ => None,
}
}
}
impl fmt::Display for SqliteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Sqlite(msg) => write!(f, "SQLite error: {msg}"),
Self::Cancelled(reason) => write!(f, "SQLite operation cancelled: {reason:?}"),
Self::ConnectionClosed => write!(f, "SQLite connection is closed"),
Self::ColumnNotFound(name) => write!(f, "Column not found: {name}"),
Self::TypeMismatch {
column,
expected,
actual,
} => write!(
f,
"Type mismatch for column {column}: expected {expected}, got {actual}"
),
Self::Io(e) => write!(f, "SQLite I/O error: {e}"),
Self::TransactionFinished => write!(f, "Transaction already finished"),
Self::LockPoisoned => write!(f, "SQLite connection lock poisoned"),
Self::UnsafeSql(msg) => {
write!(
f,
"Unsafe SQLite control SQL on SQLite binding surface: {msg}"
)
}
Self::UnsafePath(msg) => write!(f, "Unsafe SQLite database path: {msg}"),
Self::InvalidTextEncoding { column, source } => {
write!(
f,
"SQLite text column {column} contained invalid UTF-8: {source}"
)
}
Self::WalCheckpointFailed(msg) => write!(f, "WAL checkpoint failed: {msg}"),
}
}
}
impl std::error::Error for SqliteError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Io(e) => Some(e),
Self::InvalidTextEncoding { source, .. } => Some(source),
_ => None,
}
}
}
impl From<std::io::Error> for SqliteError {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum SqliteValue {
Null,
Integer(i64),
Real(f64),
Text(String),
Blob(Vec<u8>),
}
impl SqliteValue {
#[must_use]
pub fn is_null(&self) -> bool {
matches!(self, Self::Null)
}
#[must_use]
pub fn as_integer(&self) -> Option<i64> {
match self {
Self::Integer(v) => Some(*v),
_ => None,
}
}
#[must_use]
pub fn as_real(&self) -> Option<f64> {
match self {
Self::Real(v) => Some(*v),
#[allow(clippy::cast_precision_loss)]
Self::Integer(v) => Some(*v as f64),
_ => None,
}
}
#[must_use]
pub fn as_text(&self) -> Option<&str> {
match self {
Self::Text(v) => Some(v),
_ => None,
}
}
#[must_use]
pub fn as_blob(&self) -> Option<&[u8]> {
match self {
Self::Blob(v) => Some(v),
_ => None,
}
}
}
impl fmt::Display for SqliteValue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Null => write!(f, "NULL"),
Self::Integer(v) => write!(f, "{v}"),
Self::Real(v) => write!(f, "{v}"),
Self::Text(v) => write!(f, "{v}"),
Self::Blob(v) => write!(f, "<blob {} bytes>", v.len()),
}
}
}
#[derive(Debug, Clone)]
pub struct SqliteRow {
columns: Arc<BTreeMap<String, usize>>,
values: Vec<SqliteValue>,
}
impl SqliteRow {
fn new(columns: Arc<BTreeMap<String, usize>>, values: Vec<SqliteValue>) -> Self {
Self { columns, values }
}
pub fn get(&self, column: &str) -> Result<&SqliteValue, SqliteError> {
let idx = self
.columns
.get(column)
.ok_or_else(|| SqliteError::ColumnNotFound(column.to_string()))?;
self.values
.get(*idx)
.ok_or_else(|| SqliteError::ColumnNotFound(column.to_string()))
}
pub fn get_idx(&self, idx: usize) -> Result<&SqliteValue, SqliteError> {
self.values
.get(idx)
.ok_or_else(|| SqliteError::ColumnNotFound(format!("index {idx}")))
}
pub fn get_i64(&self, column: &str) -> Result<i64, SqliteError> {
let val = self.get(column)?;
val.as_integer().ok_or_else(|| SqliteError::TypeMismatch {
column: column.to_string(),
expected: "integer",
actual: format!("{val:?}"),
})
}
pub fn get_f64(&self, column: &str) -> Result<f64, SqliteError> {
let val = self.get(column)?;
val.as_real().ok_or_else(|| SqliteError::TypeMismatch {
column: column.to_string(),
expected: "real",
actual: format!("{val:?}"),
})
}
pub fn get_str(&self, column: &str) -> Result<&str, SqliteError> {
let val = self.get(column)?;
val.as_text().ok_or_else(|| SqliteError::TypeMismatch {
column: column.to_string(),
expected: "text",
actual: format!("{val:?}"),
})
}
pub fn get_blob(&self, column: &str) -> Result<&[u8], SqliteError> {
let val = self.get(column)?;
val.as_blob().ok_or_else(|| SqliteError::TypeMismatch {
column: column.to_string(),
expected: "blob",
actual: format!("{val:?}"),
})
}
#[must_use]
pub fn len(&self) -> usize {
self.values.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
pub fn column_names(&self) -> impl Iterator<Item = &str> {
self.columns.keys().map(String::as_str)
}
}
#[derive(Debug, Default)]
struct SqliteRowStreamCounters {
rows_stepped: AtomicUsize,
rows_yielded: AtomicUsize,
buffered_rows: AtomicUsize,
peak_buffered_rows: AtomicUsize,
}
impl SqliteRowStreamCounters {
fn record_buffered_row(&self) {
let buffered = self
.buffered_rows
.fetch_add(1, Ordering::AcqRel)
.saturating_add(1);
let observed = buffered.min(SQLITE_ROW_STREAM_CHANNEL_CAPACITY);
let mut peak = self.peak_buffered_rows.load(Ordering::Acquire);
while observed > peak {
match self.peak_buffered_rows.compare_exchange_weak(
peak,
observed,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(current) => peak = current,
}
}
}
fn record_yielded_row(&self) {
self.buffered_rows.fetch_sub(1, Ordering::AcqRel);
self.rows_yielded.fetch_add(1, Ordering::AcqRel);
}
fn snapshot(&self) -> SqliteRowStreamStats {
SqliteRowStreamStats {
rows_stepped: self.rows_stepped.load(Ordering::Acquire),
rows_yielded: self.rows_yielded.load(Ordering::Acquire),
buffered_rows: self.buffered_rows.load(Ordering::Acquire),
peak_buffered_rows: self.peak_buffered_rows.load(Ordering::Acquire),
channel_capacity: SQLITE_ROW_STREAM_CHANNEL_CAPACITY,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SqliteRowStreamStats {
pub rows_stepped: usize,
pub rows_yielded: usize,
pub buffered_rows: usize,
pub peak_buffered_rows: usize,
pub channel_capacity: usize,
}
type SqliteRowStreamMessage = Result<SqliteRow, SqliteError>;
fn send_sqlite_stream_message(
sender: &mpsc::Sender<SqliteRowStreamMessage>,
counters: &SqliteRowStreamCounters,
mut message: SqliteRowStreamMessage,
) -> bool {
let is_row = message.is_ok();
loop {
match sender.try_reserve() {
Ok(permit) => {
if is_row {
counters.record_buffered_row();
}
match permit.send(message) {
Outcome::Ok(()) => return true,
Outcome::Err(
mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_),
) => {
if is_row {
counters.buffered_rows.fetch_sub(1, Ordering::AcqRel);
}
return false;
}
Outcome::Err(mpsc::SendError::Full(value)) => {
if is_row {
counters.buffered_rows.fetch_sub(1, Ordering::AcqRel);
}
message = value;
}
Outcome::Cancelled(_) | Outcome::Panicked(_) => return false,
}
}
Err(mpsc::SendError::Disconnected(()) | mpsc::SendError::Cancelled(())) => {
return false;
}
Err(mpsc::SendError::Full(())) => {
std::thread::sleep(SQLITE_ROW_STREAM_FULL_BACKOFF);
}
}
}
}
fn sqlite_row_from_rusqlite_row(
row: &rusqlite::Row<'_>,
column_count: usize,
column_names: &[String],
columns: &Arc<BTreeMap<String, usize>>,
) -> Result<SqliteRow, SqliteError> {
let mut values = Vec::with_capacity(column_count);
for i in 0..column_count {
let value = row
.get_ref(i)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let column = column_name_or_index(column_names, i);
values.push(convert_value(value, &column)?);
}
Ok(SqliteRow::new(Arc::clone(columns), values))
}
pub struct SqliteRowStream {
receiver: mpsc::Receiver<SqliteRowStreamMessage>,
handle: crate::runtime::blocking_pool::BlockingTaskHandle,
counters: Arc<SqliteRowStreamCounters>,
finished: bool,
}
impl fmt::Debug for SqliteRowStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SqliteRowStream")
.field("stats", &self.stats())
.field("finished", &self.finished)
.finish()
}
}
impl SqliteRowStream {
pub async fn next(&mut self, cx: &Cx) -> Outcome<Option<SqliteRow>, SqliteError> {
if self.finished {
return Outcome::Ok(None);
}
if cx.checkpoint().is_err() {
self.finish();
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.receiver.recv(cx).await {
Ok(Ok(row)) => {
self.counters.record_yielded_row();
Outcome::Ok(Some(row))
}
Ok(Err(err)) => {
self.finish();
Outcome::Err(err)
}
Err(mpsc::RecvError::Disconnected) => {
self.finished = true;
Outcome::Ok(None)
}
Err(mpsc::RecvError::Cancelled) => {
self.finish();
Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
)
}
Err(mpsc::RecvError::Empty) => Outcome::Err(SqliteError::Sqlite(
"sqlite row stream receive unexpectedly returned empty".to_string(),
)),
}
}
#[must_use]
pub fn stats(&self) -> SqliteRowStreamStats {
self.counters.snapshot()
}
fn finish(&mut self) {
if !self.finished {
self.finished = true;
self.receiver.close();
self.handle.cancel();
}
}
}
impl Drop for SqliteRowStream {
fn drop(&mut self) {
self.finish();
}
}
struct SqliteConnectionInner {
conn: Option<rusqlite::Connection>,
}
impl SqliteConnectionInner {
fn new(conn: rusqlite::Connection) -> Self {
Self { conn: Some(conn) }
}
fn get(&self) -> Result<&rusqlite::Connection, SqliteError> {
self.conn.as_ref().ok_or(SqliteError::ConnectionClosed)
}
fn close(&mut self) {
self.conn = None;
}
}
pub struct SqliteConnection {
inner: Arc<Mutex<SqliteConnectionInner>>,
pool: BlockingPoolHandle,
transaction_state: Arc<Mutex<TransactionState>>,
}
impl fmt::Debug for SqliteConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = *self.transaction_state.lock();
f.debug_struct("SqliteConnection")
.field("open", &self.inner.lock().conn.is_some())
.field("pool", &self.pool)
.field("transaction_state", &state)
.finish()
}
}
impl SqliteConnection {
async fn run_connection_op<R, F>(
&self,
cx: &Cx,
op_name: &'static str,
f: F,
) -> Outcome<R, SqliteError>
where
R: Send + 'static,
F: FnOnce(&rusqlite::Connection) -> Result<R, SqliteError> + Send + 'static,
{
let inner = Arc::clone(&self.inner);
let (tx, mut rx) = crate::channel::oneshot::channel();
let permit = tx.reserve(cx);
let handle = self.pool.spawn(move || {
let result = (|| {
let guard = inner.lock();
let conn = guard.get()?;
let result = f(conn);
drop(guard);
result
})();
if let Ok(p) = permit {
let _ = p.send(result);
}
});
match rx.recv(cx).await {
Ok(Ok(result)) => Outcome::Ok(result),
Ok(Err(e)) => Outcome::Err(e),
Err(crate::channel::oneshot::RecvError::Cancelled) => {
handle.cancel();
Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
)
}
Err(crate::channel::oneshot::RecvError::Closed) => Outcome::Err(SqliteError::Sqlite(
format!("failed to receive result for {op_name}"),
)),
Err(crate::channel::oneshot::RecvError::PolledAfterCompletion) => {
unreachable!("{op_name} awaits a fresh oneshot recv future")
}
}
}
async fn drain_orphaned_transaction(&self, cx: &Cx) -> Outcome<(), SqliteError> {
let current_state = *self.transaction_state.lock();
if current_state != TransactionState::NeedsRollback {
return Outcome::Ok(());
}
let transaction_state = Arc::clone(&self.transaction_state);
self.run_connection_op(cx, "sqlite rollback cleanup", move |conn| {
rollback_orphaned_transaction_mutex_guarded(conn, transaction_state.as_ref())
})
.await
}
pub async fn open(cx: &Cx, path: impl AsRef<Path>) -> Outcome<Self, SqliteError> {
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
let path = path.as_ref().to_path_buf();
let pool = get_sqlite_pool();
let pool_clone = pool.clone();
let (tx, mut rx) = crate::channel::oneshot::channel();
let permit = tx.reserve(cx);
let handle = pool.spawn(move || {
let result = (|| {
let resolved_path = resolve_sqlite_open_path(&path)?;
validate_resolved_sqlite_path(&resolved_path)?;
let conn = rusqlite::Connection::open(&resolved_path)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
configure_connection_defaults(&conn, true)?;
Ok(conn)
})();
if let Ok(p) = permit {
let _ = p.send(result);
}
});
match rx.recv(cx).await {
Ok(Ok(conn)) => Outcome::Ok(Self {
inner: Arc::new(Mutex::new(SqliteConnectionInner::new(conn))),
pool: pool_clone,
transaction_state: Arc::new(Mutex::new(TransactionState::Autocommit)),
}),
Ok(Err(e)) => Outcome::Err(e),
Err(crate::channel::oneshot::RecvError::Cancelled) => {
handle.cancel();
Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
)
}
Err(crate::channel::oneshot::RecvError::Closed) => {
Outcome::Err(SqliteError::Sqlite("failed to receive result".to_string()))
}
Err(crate::channel::oneshot::RecvError::PolledAfterCompletion) => {
unreachable!("SQLite blocking-pool open awaits a fresh oneshot recv future")
}
}
}
pub async fn open_in_memory(cx: &Cx) -> Outcome<Self, SqliteError> {
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
let pool = get_sqlite_pool();
let pool_clone = pool.clone();
let (tx, mut rx) = crate::channel::oneshot::channel();
let permit = tx.reserve(cx);
let handle = pool.spawn(move || {
let result = (|| {
let conn = rusqlite::Connection::open_in_memory()
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
configure_connection_defaults(&conn, false)?;
Ok(conn)
})();
if let Ok(p) = permit {
let _ = p.send(result);
}
});
match rx.recv(cx).await {
Ok(Ok(conn)) => Outcome::Ok(Self {
inner: Arc::new(Mutex::new(SqliteConnectionInner::new(conn))),
pool: pool_clone,
transaction_state: Arc::new(Mutex::new(TransactionState::Autocommit)),
}),
Ok(Err(e)) => Outcome::Err(e),
Err(crate::channel::oneshot::RecvError::Cancelled) => {
handle.cancel();
Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
)
}
Err(crate::channel::oneshot::RecvError::Closed) => {
Outcome::Err(SqliteError::Sqlite("failed to receive result".to_string()))
}
Err(crate::channel::oneshot::RecvError::PolledAfterCompletion) => {
unreachable!("SQLite in-memory open awaits a fresh oneshot recv future")
}
}
}
pub async fn execute(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<u64, SqliteError> {
if let Err(err) = ensure_checked_sql_surface(sql) {
return Outcome::Err(err);
}
self.execute_unchecked(cx, sql, params).await
}
pub async fn execute_unchecked(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<u64, SqliteError> {
if let Err(err) = ensure_unchecked_sql_surface(sql) {
return Outcome::Err(err);
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.drain_orphaned_transaction(cx).await {
Outcome::Ok(()) => {}
Outcome::Err(e) => return Outcome::Err(e),
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
let sql = sql.to_string();
let params: Vec<SqliteValue> = params.to_vec();
self.run_connection_op(cx, "sqlite execute", move |conn| {
let params_refs: Vec<&dyn rusqlite::ToSql> =
params.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
conn.execute(&sql, params_refs.as_slice())
.map(|n| n as u64)
.map_err(|e| SqliteError::Sqlite(e.to_string()))
})
.await
}
pub async fn execute_batch(&self, cx: &Cx, sql: &str) -> Outcome<(), SqliteError> {
if let Err(err) = ensure_checked_sql_surface(sql) {
return Outcome::Err(err);
}
self.execute_batch_unchecked(cx, sql).await
}
pub async fn execute_batch_unchecked(&self, cx: &Cx, sql: &str) -> Outcome<(), SqliteError> {
if let Err(err) = ensure_unchecked_sql_surface(sql) {
return Outcome::Err(err);
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.drain_orphaned_transaction(cx).await {
Outcome::Ok(()) => {}
Outcome::Err(e) => return Outcome::Err(e),
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
let sql = sql.to_string();
self.run_connection_op(cx, "sqlite execute_batch", move |conn| {
conn.execute_batch(&sql)
.map_err(|e| SqliteError::Sqlite(e.to_string()))
})
.await
}
pub async fn query(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<Vec<SqliteRow>, SqliteError> {
if let Err(err) = ensure_checked_sql_surface(sql) {
return Outcome::Err(err);
}
self.query_unchecked(cx, sql, params).await
}
pub async fn query_unchecked(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<Vec<SqliteRow>, SqliteError> {
if let Err(err) = ensure_unchecked_sql_surface(sql) {
return Outcome::Err(err);
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.drain_orphaned_transaction(cx).await {
Outcome::Ok(()) => {}
Outcome::Err(e) => return Outcome::Err(e),
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
let sql = sql.to_string();
let params: Vec<SqliteValue> = params.to_vec();
self.run_connection_op(cx, "sqlite query", move |conn| {
let params_refs: Vec<&dyn rusqlite::ToSql> =
params.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
let mut stmt = conn
.prepare_cached(&sql)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let column_names: Vec<String> = stmt
.column_names()
.iter()
.map(std::string::ToString::to_string)
.collect();
let columns: BTreeMap<String, usize> = column_names
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let columns = Arc::new(columns);
let column_count = stmt.column_count();
let mut rows = stmt
.query(params_refs.as_slice())
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let mut result = Vec::new();
while let Some(row) = rows
.next()
.map_err(|e| SqliteError::Sqlite(e.to_string()))?
{
result.push(sqlite_row_from_rusqlite_row(
row,
column_count,
&column_names,
&columns,
)?);
}
drop(rows);
drop(stmt);
Ok(result)
})
.await
}
pub async fn query_stream(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<SqliteRowStream, SqliteError> {
if let Err(err) = ensure_checked_sql_surface(sql) {
return Outcome::Err(err);
}
self.query_stream_unchecked(cx, sql, params).await
}
pub async fn query_stream_unchecked(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<SqliteRowStream, SqliteError> {
if let Err(err) = ensure_unchecked_sql_surface(sql) {
return Outcome::Err(err);
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.drain_orphaned_transaction(cx).await {
Outcome::Ok(()) => {}
Outcome::Err(e) => return Outcome::Err(e),
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
let sql = sql.to_string();
let params: Vec<SqliteValue> = params.to_vec();
let inner = Arc::clone(&self.inner);
let counters = Arc::new(SqliteRowStreamCounters::default());
let worker_counters = Arc::clone(&counters);
let (sender, receiver) = mpsc::channel(SQLITE_ROW_STREAM_CHANNEL_CAPACITY);
let handle = self.pool.spawn(move || {
let result = (|| {
let guard = inner.lock();
let conn = guard.get()?;
let params_refs: Vec<&dyn rusqlite::ToSql> =
params.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
let mut stmt = conn
.prepare_cached(&sql)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let column_names: Vec<String> = stmt
.column_names()
.iter()
.map(std::string::ToString::to_string)
.collect();
let columns: BTreeMap<String, usize> = column_names
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let columns = Arc::new(columns);
let column_count = stmt.column_count();
let mut rows = stmt
.query(params_refs.as_slice())
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
while let Some(row) = rows
.next()
.map_err(|e| SqliteError::Sqlite(e.to_string()))?
{
worker_counters.rows_stepped.fetch_add(1, Ordering::AcqRel);
let row =
sqlite_row_from_rusqlite_row(row, column_count, &column_names, &columns)?;
if !send_sqlite_stream_message(&sender, &worker_counters, Ok(row)) {
break;
}
}
drop(rows);
drop(stmt);
drop(guard);
Ok(())
})();
if let Err(err) = result {
let _ = send_sqlite_stream_message(&sender, &worker_counters, Err(err));
}
});
Outcome::Ok(SqliteRowStream {
receiver,
handle,
counters,
finished: false,
})
}
pub async fn query_row(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<Option<SqliteRow>, SqliteError> {
if let Err(err) = ensure_checked_sql_surface(sql) {
return Outcome::Err(err);
}
self.query_row_unchecked(cx, sql, params).await
}
pub async fn query_row_unchecked(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<Option<SqliteRow>, SqliteError> {
if let Err(err) = ensure_unchecked_sql_surface(sql) {
return Outcome::Err(err);
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.drain_orphaned_transaction(cx).await {
Outcome::Ok(()) => {}
Outcome::Err(e) => return Outcome::Err(e),
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
let sql = sql.to_string();
let params: Vec<SqliteValue> = params.to_vec();
self.run_connection_op(cx, "sqlite query_row", move |conn| {
let params_refs: Vec<&dyn rusqlite::ToSql> =
params.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
let mut stmt = conn
.prepare_cached(&sql)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let column_count = stmt.column_count();
let column_names: Vec<String> = stmt
.column_names()
.iter()
.map(std::string::ToString::to_string)
.collect();
let mut rows = stmt
.query(params_refs.as_slice())
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let row_opt = rows
.next()
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let result = if let Some(row) = row_opt {
let columns: BTreeMap<String, usize> = column_names
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let columns = Arc::new(columns);
let mut values = Vec::with_capacity(column_count);
for i in 0..column_count {
let value = row
.get_ref(i)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
let column = column_name_or_index(&column_names, i);
values.push(convert_value(value, &column)?);
}
Some(SqliteRow::new(columns, values))
} else {
None
};
drop(rows);
drop(stmt);
Ok(result)
})
.await
}
pub async fn begin(&self, cx: &Cx) -> Outcome<SqliteTransaction<'_>, SqliteError> {
match self.execute_unchecked(cx, "BEGIN", &[]).await {
Outcome::Ok(_) => {
*self.transaction_state.lock() = TransactionState::InTransaction;
Outcome::Ok(SqliteTransaction {
conn: self,
finished: false,
})
}
Outcome::Err(e) => Outcome::Err(e),
Outcome::Cancelled(r) => Outcome::Cancelled(r),
Outcome::Panicked(p) => Outcome::Panicked(p),
}
}
pub async fn begin_immediate(&self, cx: &Cx) -> Outcome<SqliteTransaction<'_>, SqliteError> {
match self.execute_unchecked(cx, "BEGIN IMMEDIATE", &[]).await {
Outcome::Ok(_) => {
*self.transaction_state.lock() = TransactionState::InTransaction;
Outcome::Ok(SqliteTransaction {
conn: self,
finished: false,
})
}
Outcome::Err(e) => Outcome::Err(e),
Outcome::Cancelled(r) => Outcome::Cancelled(r),
Outcome::Panicked(p) => Outcome::Panicked(p),
}
}
pub async fn begin_exclusive(&self, cx: &Cx) -> Outcome<SqliteTransaction<'_>, SqliteError> {
match self.execute_unchecked(cx, "BEGIN EXCLUSIVE", &[]).await {
Outcome::Ok(_) => {
*self.transaction_state.lock() = TransactionState::InTransaction;
Outcome::Ok(SqliteTransaction {
conn: self,
finished: false,
})
}
Outcome::Err(e) => Outcome::Err(e),
Outcome::Cancelled(r) => Outcome::Cancelled(r),
Outcome::Panicked(p) => Outcome::Panicked(p),
}
}
pub async fn set_busy_timeout(&self, cx: &Cx, timeout: Duration) -> Outcome<(), SqliteError> {
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.drain_orphaned_transaction(cx).await {
Outcome::Ok(()) => {}
Outcome::Err(e) => return Outcome::Err(e),
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
self.run_connection_op(cx, "sqlite set_busy_timeout", move |conn| {
conn.busy_timeout(timeout)
.map_err(|e| SqliteError::Sqlite(e.to_string()))?;
Ok(())
})
.await
}
pub fn close(&self) -> Result<(), SqliteError> {
let mut guard = self.inner.lock();
if let Some(conn) = guard.conn.as_ref() {
let _ =
rollback_orphaned_transaction_mutex_guarded(conn, self.transaction_state.as_ref());
match self.execute_wal_checkpoint_with_retry(conn) {
Ok(()) => {
#[cfg(feature = "tracing-integration")]
crate::tracing_compat::debug!(
"WAL checkpoint completed successfully during close"
);
}
Err(e) => {
#[cfg(feature = "tracing-integration")]
crate::tracing_compat::error!(
error = %e,
"WAL checkpoint failed during connection close - failing close to prevent data loss"
);
return Err(e);
}
}
conn.flush_prepared_statement_cache();
}
*self.transaction_state.lock() = TransactionState::Autocommit;
guard.close();
Ok(())
}
pub async fn close_async(&self, cx: &Cx) -> Outcome<(), SqliteError> {
if cx.checkpoint().is_err() {
return Outcome::Cancelled(
cx.cancel_reason()
.unwrap_or_else(|| CancelReason::user("cancelled")),
);
}
match self.execute_wal_checkpoint_async_with_retry(cx).await {
Outcome::Ok(()) => {
#[cfg(feature = "tracing-integration")]
crate::tracing_compat::debug!("Async WAL checkpoint completed successfully");
}
Outcome::Err(e) => {
#[cfg(feature = "tracing-integration")]
crate::tracing_compat::error!(
error = %e,
"Async WAL checkpoint failed during connection close - failing close to prevent data loss"
);
return Outcome::Err(e);
}
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
match self.close_without_checkpoint() {
Ok(()) => Outcome::Ok(()),
Err(e) => Outcome::Err(e),
}
}
#[must_use]
pub fn is_open(&self) -> bool {
self.inner.lock().conn.is_some()
}
fn execute_wal_checkpoint_with_retry(
&self,
conn: &rusqlite::Connection,
) -> Result<(), SqliteError> {
const MAX_RETRY_ATTEMPTS: u32 = 3;
const RETRY_DELAY_MS: u64 = 50;
for attempt in 1..=MAX_RETRY_ATTEMPTS {
match self.execute_single_wal_checkpoint(conn) {
Ok(()) => {
#[cfg(feature = "tracing-integration")]
if attempt > 1 {
crate::tracing_compat::info!(
attempt = attempt,
"WAL checkpoint succeeded after retry"
);
}
return Ok(());
}
Err(e) => {
#[cfg(feature = "tracing-integration")]
crate::tracing_compat::warn!(
error = %e,
attempt = attempt,
max_attempts = MAX_RETRY_ATTEMPTS,
"WAL checkpoint attempt failed"
);
if attempt == MAX_RETRY_ATTEMPTS {
return Err(SqliteError::WalCheckpointFailed(format!(
"WAL checkpoint failed after {} attempts: {}",
MAX_RETRY_ATTEMPTS, e
)));
}
std::thread::sleep(std::time::Duration::from_millis(
RETRY_DELAY_MS * attempt as u64,
));
}
}
}
unreachable!("Loop should always return within max attempts")
}
fn execute_single_wal_checkpoint(
&self,
conn: &rusqlite::Connection,
) -> Result<(), rusqlite::Error> {
conn.execute_batch("PRAGMA wal_checkpoint(RESTART)")?;
let mut stmt = conn.prepare_cached("PRAGMA wal_checkpoint")?;
let result: (i32, i32, i32) =
stmt.query_row([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
let (busy, log_pages, checkpointed_pages) = result;
if busy != 0 {
return Err(rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_BUSY),
Some("WAL checkpoint blocked by concurrent readers".to_string()),
));
}
if log_pages > 0 && checkpointed_pages == 0 {
return Err(rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_IOERR),
Some(format!(
"WAL checkpoint failed - {} pages remain in WAL",
log_pages
)),
));
}
Ok(())
}
async fn execute_wal_checkpoint_async_with_retry(&self, cx: &Cx) -> Outcome<(), SqliteError> {
const MAX_RETRY_ATTEMPTS: u32 = 3;
for attempt in 1..=MAX_RETRY_ATTEMPTS {
match self.execute_wal_checkpoint_async_single(cx).await {
Outcome::Ok(()) => {
#[cfg(feature = "tracing-integration")]
if attempt > 1 {
crate::tracing_compat::info!(
attempt = attempt,
"Async WAL checkpoint succeeded after retry"
);
}
return Outcome::Ok(());
}
Outcome::Err(e) => {
#[cfg(feature = "tracing-integration")]
crate::tracing_compat::warn!(
error = %e,
attempt = attempt,
max_attempts = MAX_RETRY_ATTEMPTS,
"Async WAL checkpoint attempt failed"
);
if attempt == MAX_RETRY_ATTEMPTS {
return Outcome::Err(SqliteError::WalCheckpointFailed(format!(
"Async WAL checkpoint failed after {} attempts: {}",
MAX_RETRY_ATTEMPTS, e
)));
}
let retry_delay = Duration::from_millis(50 * u64::from(attempt));
if let Err(reason) = sqlite_wait_retry_delay(cx, retry_delay).await {
return Outcome::Cancelled(reason);
}
}
Outcome::Cancelled(r) => return Outcome::Cancelled(r),
Outcome::Panicked(p) => return Outcome::Panicked(p),
}
}
unreachable!("Loop should always return within max attempts")
}
async fn execute_wal_checkpoint_async_single(&self, cx: &Cx) -> Outcome<(), SqliteError> {
match self
.execute_batch_unchecked(cx, "PRAGMA wal_checkpoint(RESTART)")
.await
{
Outcome::Ok(()) => {
match self.query_unchecked(cx, "PRAGMA wal_checkpoint", &[]).await {
Outcome::Ok(rows) => {
if let Some(row) = rows.first() {
let busy = match wal_checkpoint_i64(row, "busy") {
Ok(value) => value,
Err(err) => return Outcome::Err(err),
};
let log_pages = match wal_checkpoint_i64(row, "log") {
Ok(value) => value,
Err(err) => return Outcome::Err(err),
};
let checkpointed_pages = match wal_checkpoint_i64(row, "checkpointed") {
Ok(value) => value,
Err(err) => return Outcome::Err(err),
};
if busy != 0 {
return Outcome::Err(SqliteError::WalCheckpointFailed(
"WAL checkpoint blocked by concurrent readers".to_string(),
));
}
if log_pages > 0 && checkpointed_pages == 0 {
return Outcome::Err(SqliteError::WalCheckpointFailed(format!(
"WAL checkpoint failed - {} pages remain in WAL",
log_pages
)));
}
}
Outcome::Ok(())
}
Outcome::Err(e) => Outcome::Err(e),
Outcome::Cancelled(r) => Outcome::Cancelled(r),
Outcome::Panicked(p) => Outcome::Panicked(p),
}
}
Outcome::Err(e) => Outcome::Err(e),
Outcome::Cancelled(r) => Outcome::Cancelled(r),
Outcome::Panicked(p) => Outcome::Panicked(p),
}
}
fn close_without_checkpoint(&self) -> Result<(), SqliteError> {
let mut guard = self.inner.lock();
if let Some(conn) = guard.conn.as_ref() {
let _ =
rollback_orphaned_transaction_mutex_guarded(conn, self.transaction_state.as_ref());
conn.flush_prepared_statement_cache();
}
*self.transaction_state.lock() = TransactionState::Autocommit;
guard.close();
Ok(())
}
}
pub struct SqliteTransaction<'a> {
conn: &'a SqliteConnection,
finished: bool,
}
impl SqliteTransaction<'_> {
#[must_use]
pub(crate) fn requires_rollback_before_commit(&self) -> bool {
*self.conn.transaction_state.lock() == TransactionState::NeedsRollback
}
pub(crate) fn poison_for_rollback(&self) {
*self.conn.transaction_state.lock() = TransactionState::NeedsRollback;
}
pub async fn commit(mut self, cx: &Cx) -> Outcome<(), SqliteError> {
if self.finished {
return Outcome::Err(SqliteError::TransactionFinished);
}
match self.conn.execute_unchecked(cx, "COMMIT", &[]).await {
Outcome::Ok(_) => {
*self.conn.transaction_state.lock() = TransactionState::Autocommit;
self.finished = true;
Outcome::Ok(())
}
Outcome::Err(e) => Outcome::Err(e),
Outcome::Cancelled(r) => Outcome::Cancelled(r),
Outcome::Panicked(p) => Outcome::Panicked(p),
}
}
pub async fn rollback(mut self, cx: &Cx) -> Outcome<(), SqliteError> {
if self.finished {
return Outcome::Err(SqliteError::TransactionFinished);
}
match self.conn.execute_unchecked(cx, "ROLLBACK", &[]).await {
Outcome::Ok(_) => {
*self.conn.transaction_state.lock() = TransactionState::Autocommit;
self.finished = true;
Outcome::Ok(())
}
Outcome::Err(e) => Outcome::Err(e),
Outcome::Cancelled(r) => Outcome::Cancelled(r),
Outcome::Panicked(p) => Outcome::Panicked(p),
}
}
pub async fn execute(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<u64, SqliteError> {
if self.finished {
return Outcome::Err(SqliteError::TransactionFinished);
}
self.conn.execute(cx, sql, params).await
}
pub(crate) async fn execute_unchecked(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<u64, SqliteError> {
if self.finished {
return Outcome::Err(SqliteError::TransactionFinished);
}
self.conn.execute_unchecked(cx, sql, params).await
}
pub async fn query(
&self,
cx: &Cx,
sql: &str,
params: &[SqliteValue],
) -> Outcome<Vec<SqliteRow>, SqliteError> {
if self.finished {
return Outcome::Err(SqliteError::TransactionFinished);
}
self.conn.query(cx, sql, params).await
}
}
impl Drop for SqliteTransaction<'_> {
fn drop(&mut self) {
if !self.finished {
self.poison_for_rollback();
}
}
}
fn column_name_or_index(column_names: &[String], idx: usize) -> String {
column_names
.get(idx)
.cloned()
.unwrap_or_else(|| format!("index {idx}"))
}
fn convert_value(
value: rusqlite::types::ValueRef<'_>,
column: &str,
) -> Result<SqliteValue, SqliteError> {
match value {
rusqlite::types::ValueRef::Null => Ok(SqliteValue::Null),
rusqlite::types::ValueRef::Integer(v) => Ok(SqliteValue::Integer(v)),
rusqlite::types::ValueRef::Real(v) => Ok(SqliteValue::Real(v)),
rusqlite::types::ValueRef::Text(v) => {
let text =
std::str::from_utf8(v).map_err(|source| SqliteError::InvalidTextEncoding {
column: column.to_string(),
source,
})?;
Ok(SqliteValue::Text(text.to_string()))
}
rusqlite::types::ValueRef::Blob(v) => Ok(SqliteValue::Blob(v.to_vec())),
}
}
impl rusqlite::ToSql for SqliteValue {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
use rusqlite::types::ToSqlOutput;
match self {
Self::Null => Ok(ToSqlOutput::Owned(rusqlite::types::Value::Null)),
Self::Integer(v) => Ok(ToSqlOutput::Owned(rusqlite::types::Value::Integer(*v))),
Self::Real(v) => Ok(ToSqlOutput::Owned(rusqlite::types::Value::Real(*v))),
Self::Text(v) => Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text(v.clone()))),
Self::Blob(v) => Ok(ToSqlOutput::Owned(rusqlite::types::Value::Blob(v.clone()))),
}
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use crate::conformance::{ConformanceTarget, LabRuntimeTarget, TestConfig};
use crate::cx::Cx;
use crate::test_utils::init_test_logging;
use crate::types::Budget;
use crate::types::Outcome;
use crate::util::ArenaIndex;
use crate::{RegionId, TaskId};
use futures_lite::future::block_on;
use tempfile::tempdir;
#[test]
fn test_sqlparser_blocks_pragma() {
assert_eq!(
classify_sql_surface_violation("PRAGMA journal_mode"),
Some(SqlSurfaceViolation::Pragma)
);
assert_eq!(
classify_sql_surface_violation("pragma foreign_keys"),
Some(SqlSurfaceViolation::Pragma)
);
assert_eq!(
classify_sql_surface_violation("/* comment */ PRAGMA journal_mode"),
Some(SqlSurfaceViolation::Pragma)
);
assert_eq!(classify_sql_surface_violation("SELECT * FROM users"), None);
assert_eq!(
classify_sql_surface_violation("INSERT INTO test VALUES (1, 'test')"),
None
);
}
#[test]
fn test_sqlparser_blocks_attach_detach() {
assert_eq!(
classify_sql_surface_violation("ATTACH 'db.sqlite' AS test"),
Some(SqlSurfaceViolation::AttachDetach)
);
assert_eq!(
classify_sql_surface_violation("DETACH DATABASE test"),
Some(SqlSurfaceViolation::AttachDetach)
);
assert_eq!(classify_sql_surface_violation("SELECT * FROM users"), None);
}
#[test]
fn test_sqlparser_blocks_transaction_control() {
assert_eq!(
classify_sql_surface_violation("BEGIN IMMEDIATE"),
Some(SqlSurfaceViolation::TransactionControl)
);
assert_eq!(
classify_sql_surface_violation("COMMIT"),
Some(SqlSurfaceViolation::TransactionControl)
);
assert_eq!(
classify_sql_surface_violation("ROLLBACK"),
Some(SqlSurfaceViolation::TransactionControl)
);
assert_eq!(
classify_sql_surface_violation(
"CREATE TRIGGER test AFTER INSERT ON table BEGIN INSERT INTO log VALUES (1); END"
),
None
);
assert_eq!(classify_sql_surface_violation("SELECT * FROM users"), None);
}
#[test]
fn test_sqlparser_comment_bypass_protection() {
let sql = "/* comment */ PRAGMA journal_mode -- line comment";
assert_eq!(
classify_sql_surface_violation(sql),
Some(SqlSurfaceViolation::Pragma)
);
let sql = "/* outer /* inner */ comment */ SELECT 1";
assert_eq!(classify_sql_surface_violation(sql), None);
}
#[test]
fn test_toctou_fix_path_resolution() {
use std::fs;
use tempfile::tempdir;
let temp_dir = tempdir().expect("Failed to create temp directory");
let temp_path = temp_dir.path();
let db_file = temp_path.join("test.sqlite");
fs::write(&db_file, b"").expect("Failed to create test database file");
let resolved = resolve_sqlite_open_path(&db_file).expect("Failed to resolve path");
validate_resolved_sqlite_path(&resolved).expect("Safe path should validate");
let etc_path = Path::new("/etc/passwd");
assert!(validate_resolved_sqlite_path(etc_path).is_err());
let sys_path = Path::new("/sys/kernel");
assert!(validate_resolved_sqlite_path(sys_path).is_err());
let proc_path = Path::new("/proc/version");
assert!(validate_resolved_sqlite_path(proc_path).is_err());
let dev_path = Path::new("/dev/null");
assert!(validate_resolved_sqlite_path(dev_path).is_err());
}
#[test]
#[cfg(unix)]
fn test_toctou_fix_prevents_symlink_attack() {
use std::os::unix::fs::symlink;
use tempfile::tempdir;
let temp_dir = tempdir().expect("Failed to create temp directory");
let temp_path = temp_dir.path();
let symlink_path = temp_path.join("malicious.sqlite");
if symlink("/etc/passwd", &symlink_path).is_ok() {
let resolved =
resolve_sqlite_open_path(&symlink_path).expect("Failed to resolve symlink");
assert!(validate_resolved_sqlite_path(&resolved).is_err());
assert!(resolved.starts_with("/etc"));
}
}
#[test]
fn test_path_validation_comprehensive() {
let tilde_path = Path::new("~/database.sqlite");
assert!(validate_sqlite_open_path(tilde_path).is_err());
let traversal_path = Path::new("../../../etc/passwd");
assert!(validate_sqlite_open_path(traversal_path).is_err());
let current_path = Path::new("./test.sqlite");
let _ = validate_sqlite_open_path(current_path);
}
#[test]
fn test_wal_checkpoint_fail_closed() {
use tempfile::NamedTempFile;
let _temp_file = NamedTempFile::new().expect("Failed to create temp file");
let checkpoint_error = SqliteError::WalCheckpointFailed("test error".to_string());
assert!(matches!(
checkpoint_error,
SqliteError::WalCheckpointFailed(_)
));
let error_msg = format!("{}", checkpoint_error);
assert!(error_msg.contains("WAL checkpoint failed"));
assert!(error_msg.contains("test error"));
}
#[test]
fn test_wal_checkpoint_error_variants() {
let busy_error = SqliteError::WalCheckpointFailed(
"WAL checkpoint blocked by concurrent readers".to_string(),
);
assert!(format!("{}", busy_error).contains("blocked by concurrent readers"));
let incomplete_error = SqliteError::WalCheckpointFailed(
"WAL checkpoint failed - 42 pages remain in WAL".to_string(),
);
assert!(format!("{}", incomplete_error).contains("pages remain in WAL"));
let retry_error = SqliteError::WalCheckpointFailed(
"WAL checkpoint failed after 3 attempts: I/O error".to_string(),
);
assert!(format!("{}", retry_error).contains("failed after 3 attempts"));
}
#[test]
fn test_wal_checkpoint_security_properties() {
let checkpoint_failure = SqliteError::WalCheckpointFailed("simulated failure".to_string());
assert!(matches!(
checkpoint_failure,
SqliteError::WalCheckpointFailed(_)
));
const MAX_RETRY_ATTEMPTS: u32 = 3;
assert_eq!(MAX_RETRY_ATTEMPTS, 3);
let restart_pragma = "PRAGMA wal_checkpoint(RESTART)";
assert!(restart_pragma.contains("RESTART"));
assert!(!restart_pragma.contains("FULL"));
}
#[test]
fn test_mutex_transaction_state_transitions() {
let transaction_state = Mutex::new(TransactionState::Autocommit);
{
let mut guard = transaction_state.lock();
*guard = TransactionState::InTransaction;
}
assert_eq!(*transaction_state.lock(), TransactionState::InTransaction);
{
let mut guard = transaction_state.lock();
assert_eq!(*guard, TransactionState::InTransaction);
*guard = TransactionState::NeedsRollback;
}
assert_eq!(*transaction_state.lock(), TransactionState::NeedsRollback);
}
#[test]
fn test_concurrent_rollback_prevention() {
use std::sync::Arc;
use std::thread;
let transaction_state = Arc::new(Mutex::new(TransactionState::NeedsRollback));
let state1 = Arc::clone(&transaction_state);
let state2 = Arc::clone(&transaction_state);
let handle1 = thread::spawn(move || {
let mut guard = state1.lock();
if *guard == TransactionState::NeedsRollback {
*guard = TransactionState::RollingBack;
true } else {
false
}
});
let handle2 = thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_nanos(1));
let mut guard = state2.lock();
if *guard == TransactionState::NeedsRollback {
*guard = TransactionState::RollingBack;
true
} else {
false }
});
let result1 = handle1.join().unwrap();
let result2 = handle2.join().unwrap();
assert_ne!(
result1, result2,
"Mutex should prevent concurrent state modification"
);
assert_eq!(*transaction_state.lock(), TransactionState::RollingBack);
}
#[test]
fn test_rollback_state_machine() {
let transaction_state = Mutex::new(TransactionState::Autocommit);
{
let mut guard = transaction_state.lock();
*guard = TransactionState::InTransaction;
}
assert_eq!(*transaction_state.lock(), TransactionState::InTransaction);
{
let mut guard = transaction_state.lock();
*guard = TransactionState::NeedsRollback;
}
assert_eq!(*transaction_state.lock(), TransactionState::NeedsRollback);
{
let mut guard = transaction_state.lock();
if *guard == TransactionState::NeedsRollback {
*guard = TransactionState::RollingBack;
}
}
assert_eq!(*transaction_state.lock(), TransactionState::RollingBack);
{
let mut guard = transaction_state.lock();
*guard = TransactionState::Autocommit;
}
assert_eq!(*transaction_state.lock(), TransactionState::Autocommit);
}
#[test]
fn test_concurrency_race_conditions_fixed() {
let transaction_state = Mutex::new(TransactionState::Autocommit);
{
let mut guard = transaction_state.lock();
*guard = TransactionState::NeedsRollback;
}
assert_eq!(*transaction_state.lock(), TransactionState::NeedsRollback);
assert_ne!(
TransactionState::RollingBack,
TransactionState::NeedsRollback
); }
#[test]
fn test_parking_lot_mutex_guard_scoping() {
use std::sync::Arc;
use std::thread;
let transaction_state = Arc::new(Mutex::new(TransactionState::Autocommit));
let state_for_thread = Arc::clone(&transaction_state);
{
let mut guard = transaction_state.lock();
*guard = TransactionState::InTransaction;
}
let handle = thread::spawn(move || {
let mut guard = state_for_thread.lock();
assert_eq!(*guard, TransactionState::InTransaction);
*guard = TransactionState::NeedsRollback;
});
handle.join().unwrap();
assert_eq!(*transaction_state.lock(), TransactionState::NeedsRollback);
}
#[test]
fn test_rollback_mutex_synchronization() {
let conn = rusqlite::Connection::open_in_memory().unwrap();
let transaction_state = Mutex::new(TransactionState::NeedsRollback);
let result = rollback_orphaned_transaction_mutex_guarded(&conn, &transaction_state);
assert!(result.is_ok());
assert_eq!(*transaction_state.lock(), TransactionState::Autocommit);
}
fn create_test_cx() -> Cx {
Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
)
}
#[test]
fn test_sqlite_value_display() {
assert_eq!(SqliteValue::Null.to_string(), "NULL");
assert_eq!(SqliteValue::Integer(42).to_string(), "42");
assert_eq!(SqliteValue::Real(3.5).to_string(), "3.5");
assert_eq!(SqliteValue::Text("hello".to_string()).to_string(), "hello");
assert_eq!(
SqliteValue::Blob(vec![1, 2, 3]).to_string(),
"<blob 3 bytes>"
);
}
#[test]
fn test_sqlite_value_accessors() {
assert!(SqliteValue::Null.is_null());
assert!(!SqliteValue::Integer(42).is_null());
assert_eq!(SqliteValue::Integer(42).as_integer(), Some(42));
assert_eq!(SqliteValue::Text("hi".to_string()).as_integer(), None);
assert_eq!(SqliteValue::Real(3.5).as_real(), Some(3.5));
assert_eq!(SqliteValue::Integer(42).as_real(), Some(42.0));
assert_eq!(
SqliteValue::Text("hello".to_string()).as_text(),
Some("hello")
);
assert_eq!(SqliteValue::Integer(42).as_text(), None);
assert_eq!(
SqliteValue::Blob(vec![1, 2, 3]).as_blob(),
Some(&[1, 2, 3][..])
);
}
#[test]
fn test_sqlite_row_accessors() {
let mut columns = BTreeMap::new();
columns.insert("id".to_string(), 0);
columns.insert("name".to_string(), 1);
let columns = Arc::new(columns);
let values = vec![
SqliteValue::Integer(1),
SqliteValue::Text("Alice".to_string()),
];
let row = SqliteRow::new(columns, values);
assert_eq!(row.len(), 2);
assert!(!row.is_empty());
assert_eq!(row.get_i64("id").unwrap(), 1);
assert_eq!(row.get_str("name").unwrap(), "Alice");
assert!(row.get("missing").is_err());
}
#[test]
fn sqlite_error_display_sqlite() {
let err = SqliteError::Sqlite("connection refused".into());
assert_eq!(err.to_string(), "SQLite error: connection refused");
}
#[test]
fn sqlite_error_display_cancelled() {
let err = SqliteError::Cancelled(CancelReason::user("timeout"));
let msg = err.to_string();
assert!(msg.starts_with("SQLite operation cancelled:"), "{msg}");
}
#[test]
fn sqlite_error_display_connection_closed() {
assert_eq!(
SqliteError::ConnectionClosed.to_string(),
"SQLite connection is closed"
);
}
#[test]
fn sqlite_error_display_column_not_found() {
let err = SqliteError::ColumnNotFound("missing_col".into());
assert_eq!(err.to_string(), "Column not found: missing_col");
}
#[test]
fn sqlite_error_display_type_mismatch() {
let err = SqliteError::TypeMismatch {
column: "age".into(),
expected: "integer",
actual: "Text(\"hello\")".into(),
};
assert_eq!(
err.to_string(),
"Type mismatch for column age: expected integer, got Text(\"hello\")"
);
}
#[test]
fn sqlite_error_display_io() {
let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file not found");
let err = SqliteError::Io(io_err);
assert!(err.to_string().starts_with("SQLite I/O error:"), "{err}");
}
#[test]
fn sqlite_error_display_transaction_finished() {
assert_eq!(
SqliteError::TransactionFinished.to_string(),
"Transaction already finished"
);
}
#[test]
fn sqlite_error_display_lock_poisoned() {
assert_eq!(
SqliteError::LockPoisoned.to_string(),
"SQLite connection lock poisoned"
);
}
#[test]
fn sqlite_error_display_unsafe_sql() {
let err = SqliteError::UnsafeSql("PRAGMA statements require *_unchecked".into());
assert_eq!(
err.to_string(),
"Unsafe SQLite control SQL on SQLite binding surface: PRAGMA statements require *_unchecked"
);
}
#[test]
fn sqlite_error_display_unsafe_path() {
let err = SqliteError::UnsafePath("resolved into /etc".into());
assert_eq!(
err.to_string(),
"Unsafe SQLite database path: resolved into /etc"
);
}
#[test]
fn sqlite_error_display_invalid_text_encoding() {
let invalid_utf8 = vec![0x80_u8];
let err = SqliteError::InvalidTextEncoding {
column: "payload".into(),
source: std::str::from_utf8(&invalid_utf8).unwrap_err(),
};
assert!(
err.to_string()
.starts_with("SQLite text column payload contained invalid UTF-8:")
);
}
#[test]
fn sqlite_error_source_io_returns_some() {
use std::error::Error;
let io_err = std::io::Error::other("disk failure");
let err = SqliteError::Io(io_err);
assert!(err.source().is_some());
}
#[test]
fn sqlite_error_source_non_io_returns_none() {
use std::error::Error;
assert!(SqliteError::ConnectionClosed.source().is_none());
assert!(SqliteError::Sqlite("oops".into()).source().is_none());
assert!(SqliteError::LockPoisoned.source().is_none());
assert!(SqliteError::TransactionFinished.source().is_none());
assert!(SqliteError::UnsafeSql("oops".into()).source().is_none());
assert!(SqliteError::ColumnNotFound("x".into()).source().is_none());
}
#[test]
fn sqlite_error_source_invalid_text_encoding_returns_some() {
use std::error::Error;
let invalid_utf8 = vec![0x80_u8];
let err = SqliteError::InvalidTextEncoding {
column: "payload".into(),
source: std::str::from_utf8(&invalid_utf8).unwrap_err(),
};
assert!(err.source().is_some());
}
#[test]
fn checked_sql_surface_rejects_transaction_control_keywords() {
for sql in [
"BEGIN IMMEDIATE",
" -- comment\nROLLBACK",
"/* comment */ SAVEPOINT sp1",
"ATTACH 'tenant.db' AS tenant",
] {
let err = ensure_checked_sql_surface(sql).unwrap_err();
assert!(
matches!(err, SqliteError::UnsafeSql(_)),
"expected unsafe SQL rejection for {sql:?}, got {err:?}"
);
}
}
#[test]
fn checked_sql_surface_rejects_pragma_keywords() {
for sql in [
"PRAGMA read_uncommitted = 1",
" /* comment */ PRAGMA foreign_keys = OFF",
] {
let err = ensure_checked_sql_surface(sql).unwrap_err();
assert!(
matches!(err, SqliteError::UnsafeSql(_)),
"expected unsafe SQL rejection for {sql:?}, got {err:?}"
);
}
}
#[test]
fn unchecked_sql_surface_rejects_attach_detach_keywords() {
for sql in ["ATTACH 'tenant.db' AS tenant", "DETACH tenant"] {
let err = ensure_unchecked_sql_surface(sql).unwrap_err();
assert!(
matches!(err, SqliteError::UnsafeSql(_)),
"expected unsafe SQL rejection for {sql:?}, got {err:?}"
);
}
}
#[test]
fn unchecked_sql_surface_allows_pragma_and_transaction_control() {
for sql in ["PRAGMA journal_mode", "BEGIN IMMEDIATE", "ROLLBACK"] {
ensure_unchecked_sql_surface(sql)
.unwrap_or_else(|err| panic!("unchecked surface should allow {sql:?}: {err:?}"));
}
}
#[test]
fn validate_sqlite_open_path_rejects_tilde_prefixes() {
for raw in ["~/tenant.db", "~alice/tenant.db"] {
let err = validate_sqlite_open_path(Path::new(raw)).unwrap_err();
assert!(
matches!(err, SqliteError::UnsafePath(ref msg) if msg.contains("tilde-prefixed")),
"expected tilde rejection for {raw:?}, got {err:?}"
);
}
}
#[test]
fn validate_sqlite_open_path_rejects_restricted_system_directory() {
let err = validate_sqlite_open_path(Path::new("/etc/asupersync-test.sqlite")).unwrap_err();
assert!(
matches!(err, SqliteError::UnsafePath(ref msg) if msg.contains("/etc")),
"expected /etc rejection, got {err:?}"
);
}
#[test]
fn validate_sqlite_open_path_rejects_parent_directory_traversal() {
for raw in ["../tenant.db", "nested/../../tenant.db"] {
let err = validate_sqlite_open_path(Path::new(raw)).unwrap_err();
assert!(
matches!(err, SqliteError::UnsafePath(ref msg) if msg.contains("parent-directory traversal")),
"expected traversal rejection for {raw:?}, got {err:?}"
);
}
}
#[test]
fn checked_sql_surface_allows_regular_dml() {
for sql in [
"SELECT * FROM users",
"INSERT INTO users(name) VALUES ('alice')",
"WITH cte AS (SELECT 1) SELECT * FROM cte",
] {
ensure_checked_sql_surface(sql)
.unwrap_or_else(|err| panic!("checked surface should allow {sql:?}: {err:?}"));
}
}
#[test]
fn checked_sql_surface_allows_create_trigger_ddl() {
let sql = "
CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT);
CREATE TRIGGER t_audit
AFTER INSERT ON t
BEGIN
INSERT INTO t(name) VALUES ('copied;still literal');
END;
";
ensure_checked_sql_surface(sql)
.unwrap_or_else(|err| panic!("checked surface should allow trigger DDL: {err:?}"));
}
#[test]
fn checked_sql_surface_rejects_top_level_end_transaction_control() {
let err = ensure_checked_sql_surface("END").unwrap_err();
assert!(
matches!(err, SqliteError::UnsafeSql(_)),
"expected unsafe SQL rejection for END, got {err:?}"
);
}
#[test]
fn sqlite_error_from_io_error() {
let io_err = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "denied");
let err: SqliteError = io_err.into();
assert!(matches!(err, SqliteError::Io(_)));
}
#[test]
fn sqlite_value_partial_eq() {
assert_eq!(SqliteValue::Null, SqliteValue::Null);
assert_eq!(SqliteValue::Integer(10), SqliteValue::Integer(10));
assert_ne!(SqliteValue::Integer(10), SqliteValue::Integer(20));
assert_eq!(SqliteValue::Real(1.5), SqliteValue::Real(1.5));
assert_eq!(SqliteValue::Text("a".into()), SqliteValue::Text("a".into()));
assert_ne!(SqliteValue::Text("a".into()), SqliteValue::Text("b".into()));
assert_eq!(SqliteValue::Blob(vec![1, 2]), SqliteValue::Blob(vec![1, 2]));
assert_ne!(SqliteValue::Null, SqliteValue::Integer(0));
}
#[test]
fn sqlite_value_as_real_returns_none_for_text() {
assert_eq!(SqliteValue::Text("nope".into()).as_real(), None);
}
#[test]
fn sqlite_value_as_real_returns_none_for_blob() {
assert_eq!(SqliteValue::Blob(vec![1]).as_real(), None);
}
#[test]
fn sqlite_value_as_real_returns_none_for_null() {
assert_eq!(SqliteValue::Null.as_real(), None);
}
#[test]
fn sqlite_value_as_integer_returns_none_for_real() {
assert_eq!(SqliteValue::Real(3.5).as_integer(), None);
}
#[test]
fn sqlite_value_as_text_returns_none_for_blob() {
assert_eq!(SqliteValue::Blob(vec![0]).as_text(), None);
}
#[test]
fn sqlite_value_as_blob_returns_none_for_text() {
assert_eq!(SqliteValue::Text("x".into()).as_blob(), None);
}
#[test]
fn sqlite_value_as_blob_returns_none_for_null() {
assert_eq!(SqliteValue::Null.as_blob(), None);
}
#[test]
fn sqlite_value_display_empty_blob() {
assert_eq!(SqliteValue::Blob(vec![]).to_string(), "<blob 0 bytes>");
}
#[test]
fn sqlite_value_display_negative_integer() {
assert_eq!(SqliteValue::Integer(-99).to_string(), "-99");
}
fn make_test_sqlite_row(names: &[&str], values: Vec<SqliteValue>) -> SqliteRow {
let mut columns = BTreeMap::new();
for (i, name) in names.iter().enumerate() {
columns.insert(name.to_string(), i);
}
SqliteRow::new(Arc::new(columns), values)
}
#[test]
fn sqlite_row_get_idx_valid() {
let row = make_test_sqlite_row(
&["a", "b"],
vec![SqliteValue::Integer(1), SqliteValue::Text("two".into())],
);
assert_eq!(row.get_idx(0).unwrap(), &SqliteValue::Integer(1));
assert_eq!(row.get_idx(1).unwrap(), &SqliteValue::Text("two".into()));
}
#[test]
fn sqlite_row_get_idx_out_of_bounds() {
let row = make_test_sqlite_row(&["a"], vec![SqliteValue::Null]);
assert!(row.get_idx(5).is_err());
}
#[test]
fn sqlite_row_get_f64_success() {
let row = make_test_sqlite_row(&["val"], vec![SqliteValue::Real(3.5)]);
assert!((row.get_f64("val").unwrap() - 3.5).abs() < f64::EPSILON);
}
#[test]
fn sqlite_row_get_f64_widens_from_integer() {
let row = make_test_sqlite_row(&["val"], vec![SqliteValue::Integer(7)]);
assert!((row.get_f64("val").unwrap() - 7.0).abs() < f64::EPSILON);
}
#[test]
fn sqlite_row_get_f64_type_mismatch() {
let row = make_test_sqlite_row(&["name"], vec![SqliteValue::Text("alice".into())]);
let err = row.get_f64("name").unwrap_err();
assert!(matches!(err, SqliteError::TypeMismatch { .. }));
}
#[test]
fn sqlite_row_get_blob_success() {
let row = make_test_sqlite_row(&["data"], vec![SqliteValue::Blob(vec![0xDE, 0xAD])]);
assert_eq!(row.get_blob("data").unwrap(), &[0xDE, 0xAD]);
}
#[test]
fn sqlite_row_get_blob_type_mismatch() {
let row = make_test_sqlite_row(&["num"], vec![SqliteValue::Integer(42)]);
let err = row.get_blob("num").unwrap_err();
assert!(matches!(err, SqliteError::TypeMismatch { .. }));
}
#[test]
fn sqlite_row_get_i64_type_mismatch() {
let row = make_test_sqlite_row(&["name"], vec![SqliteValue::Text("not_a_number".into())]);
let err = row.get_i64("name").unwrap_err();
assert!(matches!(err, SqliteError::TypeMismatch { .. }));
}
#[test]
fn sqlite_row_get_str_type_mismatch() {
let row = make_test_sqlite_row(&["id"], vec![SqliteValue::Integer(1)]);
let err = row.get_str("id").unwrap_err();
assert!(matches!(err, SqliteError::TypeMismatch { .. }));
}
#[test]
fn sqlite_row_column_names() {
let row = make_test_sqlite_row(
&["alpha", "beta", "gamma"],
vec![SqliteValue::Null, SqliteValue::Null, SqliteValue::Null],
);
let names: Vec<&str> = row.column_names().collect();
assert_eq!(names, vec!["alpha", "beta", "gamma"]);
}
#[test]
fn sqlite_row_empty() {
let row = make_test_sqlite_row(&[], vec![]);
assert_eq!(row.len(), 0);
assert!(row.is_empty());
assert!(row.get_idx(0).is_err());
assert_eq!(row.column_names().count(), 0);
}
#[test]
fn sqlite_row_get_column_not_found() {
let row = make_test_sqlite_row(&["exists"], vec![SqliteValue::Integer(1)]);
let err = row.get("nope").unwrap_err();
assert!(matches!(err, SqliteError::ColumnNotFound(_)));
}
#[test]
fn test_open_in_memory_exec_query_round_trip() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(&cx, "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT);")
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
match conn
.execute(
&cx,
"INSERT INTO t(name) VALUES (?1)",
&[SqliteValue::Text("alice".to_string())],
)
.await
{
Outcome::Ok(1) => {}
other => panic!("insert failed: {other:?}"),
}
let rows = match conn.query(&cx, "SELECT name FROM t", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("query failed: {other:?}"),
};
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get_str("name").unwrap(), "alice");
});
}
#[test]
fn sqlite_query_stream_yields_many_rows_with_single_row_buffer() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"CREATE TABLE streamed (id INTEGER PRIMARY KEY, payload TEXT);",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("create streamed table failed: {other:?}"),
}
for id in 0..64 {
let payload = format!("payload-{id:03}-{}", "x".repeat(1024));
match conn
.execute(
&cx,
"INSERT INTO streamed(id, payload) VALUES (?1, ?2)",
&[SqliteValue::Integer(id), SqliteValue::Text(payload)],
)
.await
{
Outcome::Ok(1) => {}
other => panic!("streamed insert {id} failed: {other:?}"),
}
}
let mut stream = match conn
.query_stream(&cx, "SELECT id, payload FROM streamed ORDER BY id", &[])
.await
{
Outcome::Ok(stream) => stream,
other => panic!("query_stream failed to start: {other:?}"),
};
let mut ids = Vec::new();
while let Outcome::Ok(Some(row)) = stream.next(&cx).await {
ids.push(row.get_i64("id").unwrap());
assert_eq!(
row.get_str("payload").unwrap().len(),
"payload-000-".len().saturating_add(1024)
);
}
let stats = stream.stats();
assert_eq!(ids, (0..64).collect::<Vec<_>>());
assert_eq!(stats.rows_yielded, 64);
assert_eq!(stats.rows_stepped, 64);
assert_eq!(stats.buffered_rows, 0);
assert_eq!(stats.channel_capacity, SQLITE_ROW_STREAM_CHANNEL_CAPACITY);
assert!(
stats.peak_buffered_rows <= SQLITE_ROW_STREAM_CHANNEL_CAPACITY,
"SQLite row stream must not buffer more than one row: {stats:?}"
);
});
}
#[test]
fn sqlite_query_stream_drop_finalizes_statement_and_returns_connection() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"CREATE TABLE streamed_drop (id INTEGER PRIMARY KEY);
INSERT INTO streamed_drop(id) VALUES (1), (2), (3), (4);",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("create streamed_drop table failed: {other:?}"),
}
let mut stream = match conn
.query_stream(&cx, "SELECT id FROM streamed_drop ORDER BY id", &[])
.await
{
Outcome::Ok(stream) => stream,
other => panic!("query_stream failed to start: {other:?}"),
};
match stream.next(&cx).await {
Outcome::Ok(Some(row)) => assert_eq!(row.get_i64("id").unwrap(), 1),
other => panic!("first stream row failed: {other:?}"),
}
drop(stream);
let rows = match conn
.query(&cx, "SELECT COUNT(*) AS count FROM streamed_drop", &[])
.await
{
Outcome::Ok(rows) => rows,
other => panic!("connection was not returned after stream drop: {other:?}"),
};
assert_eq!(rows[0].get_i64("count").unwrap(), 4);
});
}
#[test]
fn sqlite_query_stream_surfaces_query_error_on_next() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
let mut stream = match conn
.query_stream(&cx, "SELECT value FROM missing_table", &[])
.await
{
Outcome::Ok(stream) => stream,
other => panic!("query_stream should defer SQLite prepare errors: {other:?}"),
};
match stream.next(&cx).await {
Outcome::Err(SqliteError::Sqlite(message)) => {
assert!(
message.contains("missing_table") || message.contains("no such table"),
"unexpected SQLite error: {message}"
);
}
other => panic!("missing table should surface through stream next: {other:?}"),
}
});
}
#[test]
fn sqlite_query_stream_cancelled_next_closes_stream_and_connection_recovers() {
let cx = create_test_cx();
let cancel_cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"CREATE TABLE streamed_cancel (id INTEGER PRIMARY KEY);
INSERT INTO streamed_cancel(id) VALUES (1), (2), (3);",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("create streamed_cancel table failed: {other:?}"),
}
let mut stream = match conn
.query_stream(&cx, "SELECT id FROM streamed_cancel ORDER BY id", &[])
.await
{
Outcome::Ok(stream) => stream,
other => panic!("query_stream failed to start: {other:?}"),
};
cancel_cx.set_cancel_requested(true);
match stream.next(&cancel_cx).await {
Outcome::Cancelled(_) => {}
other => panic!("cancelled stream next should return Cancelled: {other:?}"),
}
let rows = match conn
.query(&cx, "SELECT COUNT(*) AS count FROM streamed_cancel", &[])
.await
{
Outcome::Ok(rows) => rows,
other => panic!("connection was not returned after stream cancel: {other:?}"),
};
assert_eq!(rows[0].get_i64("count").unwrap(), 3);
});
}
#[test]
fn sqlite_file_persists_while_memory_resets_under_lab_runtime() {
init_test_logging();
let dir = tempdir().unwrap();
let db_path = dir.path().join("lab_runtime_persistence.sqlite3");
let config = TestConfig::new()
.with_seed(0x51A7_1001)
.with_tracing(true)
.with_max_steps(20_000);
let mut runtime = LabRuntimeTarget::create_runtime(config);
let (persisted_name, memory_table_count) =
LabRuntimeTarget::block_on(&mut runtime, async move {
let cx = Cx::current().expect("lab runtime should install a current Cx");
let file_conn = match SqliteConnection::open(&cx, &db_path).await {
Outcome::Ok(conn) => conn,
other => panic!("file open failed: {other:?}"),
};
match file_conn
.execute_batch(
&cx,
"CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO t(name) VALUES ('persisted');",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("file schema setup failed: {other:?}"),
}
tracing::info!(
event = %serde_json::json!({
"phase": "file_seeded",
"path": db_path.display().to_string(),
}),
"sqlite_lab_checkpoint"
);
file_conn.close().unwrap();
let reopened_file = match SqliteConnection::open(&cx, &db_path).await {
Outcome::Ok(conn) => conn,
other => panic!("file reopen failed: {other:?}"),
};
let file_rows = match reopened_file.query(&cx, "SELECT name FROM t", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("file query failed after reopen: {other:?}"),
};
let persisted_name = file_rows[0].get_str("name").unwrap().to_string();
tracing::info!(
event = %serde_json::json!({
"phase": "file_reopened",
"row_count": file_rows.len(),
"name": persisted_name,
}),
"sqlite_lab_checkpoint"
);
reopened_file.close().unwrap();
let memory_conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("memory open failed: {other:?}"),
};
match memory_conn
.execute_batch(
&cx,
"CREATE TABLE ephemeral (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO ephemeral(name) VALUES ('transient');",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("memory schema setup failed: {other:?}"),
}
tracing::info!(
event = %serde_json::json!({
"phase": "memory_seeded",
"table": "ephemeral",
}),
"sqlite_lab_checkpoint"
);
memory_conn.close().unwrap();
let reopened_memory = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("memory reopen failed: {other:?}"),
};
let memory_rows = match reopened_memory
.query(
&cx,
"SELECT name FROM sqlite_master WHERE type='table' AND name='ephemeral'",
&[],
)
.await
{
Outcome::Ok(rows) => rows,
other => panic!("memory table probe failed after reopen: {other:?}"),
};
tracing::info!(
event = %serde_json::json!({
"phase": "memory_reopened",
"table_count": memory_rows.len(),
}),
"sqlite_lab_checkpoint"
);
reopened_memory.close().unwrap();
(persisted_name, memory_rows.len())
});
assert_eq!(persisted_name, "persisted");
assert_eq!(memory_table_count, 0);
let violations = runtime.oracles.check_all(runtime.now());
assert!(
violations.is_empty(),
"sqlite lab persistence test should leave runtime invariants clean: {violations:?}"
);
}
#[test]
fn sqlite_transaction_commit_persists_under_lab_runtime() {
init_test_logging();
let config = TestConfig::new()
.with_seed(0x51A7_2002)
.with_tracing(true)
.with_max_steps(20_000);
let mut runtime = LabRuntimeTarget::create_runtime(config);
let (count_inside_tx, count_after_commit, committed_name) =
LabRuntimeTarget::block_on(&mut runtime, async move {
let cx = Cx::current().expect("lab runtime should install a current Cx");
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"CREATE TABLE tx_items (id INTEGER PRIMARY KEY, name TEXT);",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("schema setup failed: {other:?}"),
}
let Outcome::Ok(tx) = conn.begin(&cx).await else {
panic!("begin failed");
};
match tx
.execute(
&cx,
"INSERT INTO tx_items(name) VALUES (?1)",
&[SqliteValue::Text("committed".to_string())],
)
.await
{
Outcome::Ok(1) => {}
other => panic!("insert in transaction failed: {other:?}"),
}
let rows_inside = match tx
.query(&cx, "SELECT COUNT(*) AS count FROM tx_items", &[])
.await
{
Outcome::Ok(rows) => rows,
other => panic!("count query inside transaction failed: {other:?}"),
};
let count_inside_tx = rows_inside[0]
.get_i64("count")
.expect("count column should be present");
tracing::info!(
event = %serde_json::json!({
"phase": "transaction_inserted",
"count_inside_tx": count_inside_tx,
}),
"sqlite_lab_checkpoint"
);
match tx.commit(&cx).await {
Outcome::Ok(()) => {}
other => panic!("commit failed: {other:?}"),
}
let rows_after = match conn
.query(
&cx,
"SELECT COUNT(*) AS count, MIN(name) AS name FROM tx_items",
&[],
)
.await
{
Outcome::Ok(rows) => rows,
other => panic!("query after commit failed: {other:?}"),
};
let count_after_commit = rows_after[0]
.get_i64("count")
.expect("count column should be present");
let committed_name = rows_after[0]
.get_str("name")
.expect("name column should be present")
.to_string();
tracing::info!(
event = %serde_json::json!({
"phase": "transaction_committed",
"count_after_commit": count_after_commit,
"name": committed_name,
}),
"sqlite_lab_checkpoint"
);
conn.close().unwrap();
(count_inside_tx, count_after_commit, committed_name)
});
assert_eq!(count_inside_tx, 1);
assert_eq!(count_after_commit, 1);
assert_eq!(committed_name, "committed");
let violations = runtime.oracles.check_all(runtime.now());
assert!(
violations.is_empty(),
"sqlite lab transaction test should leave runtime invariants clean: {violations:?}"
);
assert!(
runtime.is_quiescent(),
"lab runtime should reach quiescence"
);
}
#[test]
fn transaction_commit_cancelled_does_not_mark_finished_before_commit_runs() {
let cx = create_test_cx();
let cancelled_cx = create_test_cx();
cancelled_cx.cancel_fast(crate::types::CancelKind::User);
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(&cx, "CREATE TABLE t (id INTEGER PRIMARY KEY);")
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
let Outcome::Ok(tx) = conn.begin(&cx).await else {
panic!("begin failed");
};
match tx.commit(&cancelled_cx).await {
Outcome::Cancelled(_) => {}
other => panic!("expected cancelled commit, got: {other:?}"),
}
for _ in 0..8 {
if conn
.inner
.lock()
.get()
.is_ok_and(rusqlite::Connection::is_autocommit)
{
break;
}
match conn.query(&cx, "SELECT 1", &[]).await {
Outcome::Ok(_) => {}
other => panic!("probe query failed: {other:?}"),
}
}
assert!(
conn.inner
.lock()
.get()
.is_ok_and(rusqlite::Connection::is_autocommit),
"connection should return to autocommit after cancelled commit drop path"
);
});
}
#[test]
fn open_file_sets_wal_mode() {
let cx = create_test_cx();
let dir = tempdir().unwrap();
let db_path = dir.path().join("wal_mode.sqlite3");
block_on(async {
let conn = match SqliteConnection::open(&cx, &db_path).await {
Outcome::Ok(conn) => conn,
other => panic!("open failed: {other:?}"),
};
let rows = match conn.query_unchecked(&cx, "PRAGMA journal_mode", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("query pragma failed: {other:?}"),
};
let mode = rows[0]
.get_idx(0)
.unwrap()
.as_text()
.unwrap()
.to_ascii_lowercase();
assert_eq!(mode, "wal");
});
}
#[test]
fn query_rejects_invalid_utf8_text() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.query_unchecked(&cx, "SELECT CAST(X'80' AS TEXT) AS bad_text", &[])
.await
{
Outcome::Err(SqliteError::InvalidTextEncoding { column, .. }) => {
assert_eq!(column, "bad_text");
}
other => panic!("expected invalid UTF-8 rejection, got: {other:?}"),
}
});
}
#[test]
fn unchecked_execute_rejects_attach_database() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_unchecked(&cx, "ATTACH ':memory:' AS audit", &[])
.await
{
Outcome::Err(SqliteError::UnsafeSql(msg)) => {
assert!(msg.contains("ATTACH and DETACH"));
}
other => panic!("expected ATTACH rejection, got: {other:?}"),
}
});
}
#[test]
fn open_rejects_tilde_prefixed_paths_before_rusqlite() {
let cx = create_test_cx();
block_on(async {
match SqliteConnection::open(&cx, "~/tenant.sqlite").await {
Outcome::Err(SqliteError::UnsafePath(msg)) => {
assert!(msg.contains("tilde-prefixed"));
}
other => panic!("expected unsafe path rejection, got: {other:?}"),
}
});
}
#[test]
fn open_rejects_parent_directory_traversal_before_rusqlite() {
let cx = create_test_cx();
block_on(async {
match SqliteConnection::open(&cx, "../tenant.sqlite").await {
Outcome::Err(SqliteError::UnsafePath(msg)) => {
assert!(msg.contains("parent-directory traversal"));
}
other => panic!("expected unsafe traversal rejection, got: {other:?}"),
}
});
}
#[test]
fn separate_validated_connections_keep_schema_isolated_without_attach() {
let cx = create_test_cx();
block_on(async {
let dir = tempfile::tempdir().expect("tempdir");
let first_path = dir.path().join("tenant_a.sqlite3");
let second_path = dir.path().join("tenant_b.sqlite3");
let first = match SqliteConnection::open(&cx, &first_path).await {
Outcome::Ok(conn) => conn,
other => panic!("open first db failed: {other:?}"),
};
let second = match SqliteConnection::open(&cx, &second_path).await {
Outcome::Ok(conn) => conn,
other => panic!("open second db failed: {other:?}"),
};
match first
.execute_batch(
&cx,
"CREATE TABLE tenant_only (id INTEGER PRIMARY KEY, value TEXT);
INSERT INTO tenant_only(value) VALUES ('a');",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("seed first db failed: {other:?}"),
}
let rows = match second
.query(
&cx,
"SELECT name FROM sqlite_master WHERE type='table' AND name='tenant_only'",
&[],
)
.await
{
Outcome::Ok(rows) => rows,
other => panic!("query second db failed: {other:?}"),
};
assert!(
rows.is_empty(),
"separate validated sqlite connections must not share attached schema state"
);
});
}
#[test]
fn sqlite_rowid_max_round_trips_without_overflow() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(&cx, "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT);")
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
match conn
.execute(
&cx,
"INSERT INTO t(id, name) VALUES (?1, ?2)",
&[
SqliteValue::Integer(i64::MAX),
SqliteValue::Text("max-rowid".to_string()),
],
)
.await
{
Outcome::Ok(1) => {}
other => panic!("insert failed: {other:?}"),
}
let rows = match conn
.query(&cx, "SELECT rowid AS rowid, id, name FROM t", &[])
.await
{
Outcome::Ok(rows) => rows,
other => panic!("query failed: {other:?}"),
};
assert_eq!(rows[0].get_i64("rowid").unwrap(), i64::MAX);
assert_eq!(rows[0].get_i64("id").unwrap(), i64::MAX);
assert_eq!(rows[0].get_str("name").unwrap(), "max-rowid");
});
}
#[test]
fn sqlite_rowid_overflow_literal_is_rejected() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(&cx, "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT);")
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
match conn
.execute_unchecked(
&cx,
"INSERT INTO t(id, name) VALUES(9223372036854775808, 'overflow')",
&[],
)
.await
{
Outcome::Err(SqliteError::Sqlite(msg)) => {
assert!(
msg.to_ascii_lowercase().contains("datatype mismatch"),
"unexpected rowid overflow error: {msg}"
);
}
other => panic!("expected rowid overflow rejection, got: {other:?}"),
}
});
}
#[test]
fn transaction_drop_rolls_back_uncommitted_work() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(&cx, "CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT);")
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
let Outcome::Ok(tx) = conn.begin(&cx).await else {
panic!("begin failed");
};
match tx
.execute(
&cx,
"INSERT INTO t(v) VALUES (?1)",
&[SqliteValue::Text("x".to_string())],
)
.await
{
Outcome::Ok(1) => {}
other => panic!("insert in tx failed: {other:?}"),
}
drop(tx);
let rows = match conn.query(&cx, "SELECT COUNT(*) FROM t", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("count query failed: {other:?}"),
};
assert_eq!(rows[0].get_idx(0).unwrap().as_integer(), Some(0));
});
}
#[test]
fn transaction_drop_preserves_foreign_key_cascade_consistency() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"
CREATE TABLE parent (id INTEGER PRIMARY KEY);
CREATE TABLE child (
id INTEGER PRIMARY KEY,
parent_id INTEGER NOT NULL REFERENCES parent(id) ON DELETE CASCADE
);
INSERT INTO parent(id) VALUES (1);
INSERT INTO child(id, parent_id) VALUES (10, 1);
",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("schema setup failed: {other:?}"),
}
let Outcome::Ok(tx) = conn.begin_immediate(&cx).await else {
panic!("begin_immediate failed");
};
match tx
.execute(&cx, "DELETE FROM parent WHERE id = 1", &[])
.await
{
Outcome::Ok(1) => {}
other => panic!("delete in transaction failed: {other:?}"),
}
drop(tx);
let parent_rows = match conn.query(&cx, "SELECT COUNT(*) FROM parent", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("parent count failed: {other:?}"),
};
let child_rows = match conn.query(&cx, "SELECT COUNT(*) FROM child", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("child count failed: {other:?}"),
};
assert_eq!(parent_rows[0].get_idx(0).unwrap().as_integer(), Some(1));
assert_eq!(child_rows[0].get_idx(0).unwrap().as_integer(), Some(1));
match conn
.execute(&cx, "DELETE FROM parent WHERE id = 1", &[])
.await
{
Outcome::Ok(1) => {}
other => panic!("post-rollback delete failed: {other:?}"),
}
let child_rows = match conn.query(&cx, "SELECT COUNT(*) FROM child", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("child recount failed: {other:?}"),
};
assert_eq!(child_rows[0].get_idx(0).unwrap().as_integer(), Some(0));
});
}
#[test]
fn cached_statements_remain_usable_after_schema_change() {
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
{
let guard = conn.inner.lock();
let raw = guard.get().expect("connection open");
raw.set_prepared_statement_cache_capacity(1);
}
match conn
.execute_batch(
&cx,
"
CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT);
INSERT INTO t(value) VALUES ('before');
",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("initial schema setup failed: {other:?}"),
}
match conn
.query(&cx, "SELECT value FROM t WHERE id = 1", &[])
.await
{
Outcome::Ok(rows) => assert_eq!(rows[0].get_str("value").unwrap(), "before"),
other => panic!("initial cached query failed: {other:?}"),
}
match conn.query(&cx, "SELECT id FROM t WHERE id = 1", &[]).await {
Outcome::Ok(rows) => assert_eq!(rows[0].get_i64("id").unwrap(), 1),
other => panic!("second cached query failed: {other:?}"),
}
match conn
.execute_batch(
&cx,
"
DROP TABLE t;
CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT);
INSERT INTO t(value) VALUES ('after');
",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("schema rebuild failed: {other:?}"),
}
match conn
.query(&cx, "SELECT value FROM t WHERE id = 1", &[])
.await
{
Outcome::Ok(rows) => assert_eq!(rows[0].get_str("value").unwrap(), "after"),
other => panic!("cached query after schema change failed: {other:?}"),
}
});
}
#[test]
fn busy_timeout_produces_lock_error_under_write_contention() {
let cx = create_test_cx();
let dir = tempdir().unwrap();
let db_path = dir.path().join("busy_timeout.sqlite3");
block_on(async {
let conn1 = match SqliteConnection::open(&cx, &db_path).await {
Outcome::Ok(conn) => conn,
other => panic!("open conn1 failed: {other:?}"),
};
let conn2 = match SqliteConnection::open(&cx, &db_path).await {
Outcome::Ok(conn) => conn,
other => panic!("open conn2 failed: {other:?}"),
};
match conn1
.execute_batch(&cx, "CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT);")
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
match conn2.set_busy_timeout(&cx, Duration::from_millis(50)).await {
Outcome::Ok(()) => {}
other => panic!("set_busy_timeout failed: {other:?}"),
}
let Outcome::Ok(tx) = conn1.begin_immediate(&cx).await else {
panic!("begin_immediate failed");
};
match conn2
.execute(
&cx,
"INSERT INTO t(v) VALUES (?1)",
&[SqliteValue::Text("blocked".to_string())],
)
.await
{
Outcome::Err(SqliteError::Sqlite(msg)) => {
let lower = msg.to_ascii_lowercase();
assert!(
lower.contains("database is locked") || lower.contains("database is busy"),
"unexpected busy error message: {msg}"
);
}
other => panic!("expected lock error, got: {other:?}"),
}
match tx.rollback(&cx).await {
Outcome::Ok(()) => {}
other => panic!("rollback failed: {other:?}"),
}
});
}
#[test]
fn execute_with_cancelled_cx_does_not_mutate_state() {
let cx = create_test_cx();
let cancelled = create_test_cx();
cancelled.cancel_fast(crate::types::CancelKind::User);
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(&cx, "CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT);")
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
match conn
.execute(
&cancelled,
"INSERT INTO t(v) VALUES (?1)",
&[SqliteValue::Text("never".to_string())],
)
.await
{
Outcome::Cancelled(_) => {}
other => panic!("expected cancellation, got: {other:?}"),
}
let rows = match conn.query(&cx, "SELECT COUNT(*) FROM t", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("count query failed: {other:?}"),
};
assert_eq!(rows[0].get_idx(0).unwrap().as_integer(), Some(0));
});
}
#[cfg(feature = "sqlite")]
mod pragma_journal_mode_conformance {
use super::*;
use crate::test_utils::run_test_with_cx;
use std::fs;
use std::path::PathBuf;
use tempfile::TempDir;
struct JournalModeTestData {
temp_dir: TempDir,
db_path: PathBuf,
}
impl JournalModeTestData {
fn new() -> Self {
let temp_dir = tempfile::tempdir().expect("Failed to create temp directory");
let db_path = temp_dir.path().join("test.db");
Self { temp_dir, db_path }
}
fn get_db_path(&self) -> &Path {
&self.db_path
}
fn get_wal_path(&self) -> PathBuf {
self.db_path.with_extension("db-wal")
}
fn get_shm_path(&self) -> PathBuf {
self.db_path.with_extension("db-shm")
}
async fn get_journal_mode(conn: &SqliteConnection, cx: &Cx) -> String {
let rows = match conn.query(cx, "PRAGMA journal_mode", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("Failed to query journal_mode: {other:?}"),
};
rows[0]
.get_idx(0)
.unwrap()
.as_text()
.unwrap_or_else(|| panic!("journal_mode should return a string"))
.to_owned()
}
async fn set_journal_mode(
conn: &SqliteConnection,
cx: &Cx,
mode: &str,
) -> Outcome<String, SqliteError> {
let sql = format!("PRAGMA journal_mode = {}", mode);
match conn.query(cx, &sql, &[]).await {
Outcome::Ok(rows) => Outcome::Ok(
rows[0]
.get_idx(0)
.unwrap()
.as_text()
.unwrap_or_else(|| panic!("journal_mode pragma should return a string"))
.to_owned(),
),
Outcome::Err(err) => Outcome::Err(err),
Outcome::Cancelled(cancelled) => Outcome::Cancelled(cancelled),
Outcome::Panicked(payload) => Outcome::Panicked(payload),
}
}
async fn setup_test_data(conn: &SqliteConnection, cx: &Cx) {
match conn
.execute_batch(
cx,
"
CREATE TABLE test_data (
id INTEGER PRIMARY KEY,
value TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO test_data (value) VALUES ('test1'), ('test2'), ('test3');
",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("Failed to create test data: {other:?}"),
}
}
async fn verify_test_data(conn: &SqliteConnection, cx: &Cx, expected_count: i64) {
let rows = match conn.query(cx, "SELECT COUNT(*) FROM test_data", &[]).await {
Outcome::Ok(rows) => rows,
other => panic!("Failed to count test data: {other:?}"),
};
let count = rows[0].get_idx(0).unwrap().as_integer().unwrap();
assert_eq!(count, expected_count, "Test data count mismatch");
}
}
#[test]
fn delete_to_wal_mode_transition_conformance() {
run_test_with_cx(|cx| async move {
let test_data = JournalModeTestData::new();
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
let initial_mode = JournalModeTestData::get_journal_mode(&conn, &cx).await;
assert_eq!(
initial_mode.to_lowercase(),
"delete",
"Should start in DELETE mode"
);
JournalModeTestData::setup_test_data(&conn, &cx).await;
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
let wal_result =
match JournalModeTestData::set_journal_mode(&conn, &cx, "WAL").await {
Outcome::Ok(mode) => mode,
other => panic!("Failed to set WAL mode: {other:?}"),
};
assert_eq!(
wal_result.to_lowercase(),
"wal",
"Should transition to WAL mode"
);
let current_mode = JournalModeTestData::get_journal_mode(&conn, &cx).await;
assert_eq!(
current_mode.to_lowercase(),
"wal",
"Journal mode should be WAL"
);
assert!(
test_data.get_wal_path().exists(),
"WAL file should be created"
);
assert!(
test_data.get_shm_path().exists(),
"SHM file should be created"
);
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
match conn
.execute(
&cx,
"INSERT INTO test_data (value) VALUES (?)",
&[SqliteValue::Text("wal_data".to_owned())],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to insert WAL data: {other:?}"),
};
JournalModeTestData::verify_test_data(&conn, &cx, 4).await;
conn.close().unwrap();
});
}
#[test]
fn wal_to_truncate_mode_transition_conformance() {
run_test_with_cx(|cx| async move {
let test_data = JournalModeTestData::new();
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
match JournalModeTestData::set_journal_mode(&conn, &cx, "WAL").await {
Outcome::Ok(_) => {}
other => panic!("Failed to set WAL mode: {other:?}"),
};
JournalModeTestData::setup_test_data(&conn, &cx).await;
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
assert!(test_data.get_wal_path().exists(), "WAL file should exist");
let truncate_result =
match JournalModeTestData::set_journal_mode(&conn, &cx, "TRUNCATE").await {
Outcome::Ok(mode) => mode,
other => panic!("Failed to set TRUNCATE mode: {other:?}"),
};
assert_eq!(
truncate_result.to_lowercase(),
"truncate",
"Should transition to TRUNCATE mode"
);
let current_mode = JournalModeTestData::get_journal_mode(&conn, &cx).await;
assert_eq!(
current_mode.to_lowercase(),
"truncate",
"Journal mode should be TRUNCATE"
);
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
match conn
.execute(
&cx,
"INSERT INTO test_data (value) VALUES (?)",
&[SqliteValue::Text("truncate_data".to_owned())],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to insert TRUNCATE data: {other:?}"),
};
JournalModeTestData::verify_test_data(&conn, &cx, 4).await;
conn.close().unwrap();
});
}
#[test]
fn memory_mode_persistence_loss_conformance() {
run_test_with_cx(|cx| async move {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open in-memory connection: {other:?}"),
};
let memory_result =
match JournalModeTestData::set_journal_mode(&conn, &cx, "MEMORY").await {
Outcome::Ok(mode) => mode,
other => panic!("Failed to set MEMORY mode: {other:?}"),
};
assert_eq!(
memory_result.to_lowercase(),
"memory",
"Should be in MEMORY mode"
);
JournalModeTestData::setup_test_data(&conn, &cx).await;
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
match conn
.execute_batch(
&cx,
"
BEGIN TRANSACTION;
INSERT INTO test_data (value) VALUES ('memory_test');
UPDATE test_data SET value = 'modified' WHERE id = 1;
",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("Failed to begin transaction: {other:?}"),
};
conn.close().unwrap();
let new_conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to reopen in-memory connection: {other:?}"),
};
let tables_result = new_conn
.query(
&cx,
"SELECT name FROM sqlite_master WHERE type='table'",
&[],
)
.await;
match tables_result {
Outcome::Ok(rows) => {
assert_eq!(
rows.len(),
0,
"In-memory database should have no persistent tables"
);
}
other => panic!("Failed to query sqlite_master: {other:?}"),
}
new_conn.close().unwrap();
});
}
#[test]
fn off_mode_atomicity_absence_conformance() {
run_test_with_cx(|cx| async move {
let test_data = JournalModeTestData::new();
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
let off_result =
match JournalModeTestData::set_journal_mode(&conn, &cx, "OFF").await {
Outcome::Ok(mode) => mode,
other => panic!("Failed to set OFF mode: {other:?}"),
};
assert_eq!(off_result.to_lowercase(), "off", "Should be in OFF mode");
match conn
.execute_batch(
&cx,
"
CREATE TABLE atomicity_test (
id INTEGER PRIMARY KEY,
step INTEGER,
data TEXT
);
",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("Failed to create table: {other:?}"),
};
match conn.execute_unchecked(&cx, "BEGIN TRANSACTION", &[]).await {
Outcome::Ok(_) => {}
other => panic!("Failed to begin transaction: {other:?}"),
};
match conn
.execute(
&cx,
"INSERT INTO atomicity_test (step, data) VALUES (1, 'step1')",
&[],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to insert step1: {other:?}"),
};
match conn
.execute(
&cx,
"INSERT INTO atomicity_test (step, data) VALUES (2, 'step2')",
&[],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to insert step2: {other:?}"),
};
match conn.execute_unchecked(&cx, "COMMIT", &[]).await {
Outcome::Ok(_) => {}
other => panic!("Failed to commit: {other:?}"),
};
let rows = match conn
.query(&cx, "SELECT COUNT(*) FROM atomicity_test", &[])
.await
{
Outcome::Ok(rows) => rows,
other => panic!("Failed to count rows: {other:?}"),
};
let count = rows[0].get_idx(0).unwrap().as_integer().unwrap();
assert_eq!(count, 2, "Both inserts should be present");
let journal_files = fs::read_dir(test_data.temp_dir.path())
.unwrap()
.filter_map(|entry| entry.ok())
.filter(|entry| {
entry.path().extension().map_or(false, |ext| {
ext == "journal" || ext == "wal" || ext == "shm"
})
})
.count();
assert_eq!(journal_files, 0, "OFF mode should not create journal files");
conn.close().unwrap();
});
}
#[test]
fn unsupported_mode_fallback_conformance() {
run_test_with_cx(|cx| async move {
let test_data = JournalModeTestData::new();
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
let invalid_modes = ["INVALID", "BOGUS", "NONEXISTENT"];
for invalid_mode in &invalid_modes {
match JournalModeTestData::set_journal_mode(&conn, &cx, invalid_mode).await {
Outcome::Ok(returned_mode) => {
assert_ne!(
returned_mode.to_lowercase(),
invalid_mode.to_lowercase(),
"Should not accept invalid mode: {}",
invalid_mode
);
let valid_modes =
["delete", "truncate", "persist", "memory", "wal", "off"];
assert!(
valid_modes.contains(&returned_mode.to_lowercase().as_str()),
"Fallback should be a valid journal mode, got: {}",
returned_mode
);
}
Outcome::Err(_) => {
}
other => panic!(
"Unexpected outcome for invalid mode {}: {other:?}",
invalid_mode
),
}
let current_mode = JournalModeTestData::get_journal_mode(&conn, &cx).await;
assert!(
!current_mode.is_empty(),
"Should still have a valid journal mode after invalid attempt"
);
}
JournalModeTestData::setup_test_data(&conn, &cx).await;
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
conn.close().unwrap();
});
}
#[test]
fn journal_mode_persistence_across_connections_conformance() {
run_test_with_cx(|cx| async move {
let test_data = JournalModeTestData::new();
{
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
match JournalModeTestData::set_journal_mode(&conn, &cx, "WAL").await {
Outcome::Ok(_) => {}
other => panic!("Failed to set WAL mode: {other:?}"),
};
JournalModeTestData::setup_test_data(&conn, &cx).await;
conn.close().unwrap();
}
{
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to reopen connection: {other:?}"),
};
let persistent_mode = JournalModeTestData::get_journal_mode(&conn, &cx).await;
assert_eq!(
persistent_mode.to_lowercase(),
"wal",
"WAL mode should persist across connections"
);
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
conn.close().unwrap();
}
});
}
#[test]
fn journal_mode_concurrent_access_conformance() {
run_test_with_cx(|cx| async move {
let test_data = JournalModeTestData::new();
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
match JournalModeTestData::set_journal_mode(&conn, &cx, "WAL").await {
Outcome::Ok(_) => {}
other => panic!("Failed to set WAL mode: {other:?}"),
};
JournalModeTestData::setup_test_data(&conn, &cx).await;
let reader_conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open reader connection: {other:?}"),
};
JournalModeTestData::verify_test_data(&conn, &cx, 3).await;
JournalModeTestData::verify_test_data(&reader_conn, &cx, 3).await;
match conn
.execute(
&cx,
"INSERT INTO test_data (value) VALUES (?)",
&[SqliteValue::Text("concurrent_write".to_owned())],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed concurrent write: {other:?}"),
};
JournalModeTestData::verify_test_data(&conn, &cx, 4).await;
reader_conn.close().unwrap();
conn.close().unwrap();
});
}
#[test]
fn journal_mode_edge_cases_conformance() {
run_test_with_cx(|cx| async move {
let test_data = JournalModeTestData::new();
let conn = match SqliteConnection::open(&cx, test_data.get_db_path()).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
let modes_to_test = [
("wal", "wal"),
("WAL", "wal"),
("Wal", "wal"),
("DELETE", "delete"),
("delete", "delete"),
];
for (input_mode, expected_mode) in &modes_to_test {
match JournalModeTestData::set_journal_mode(&conn, &cx, input_mode).await {
Outcome::Ok(returned_mode) => {
assert_eq!(
returned_mode.to_lowercase(),
expected_mode.to_lowercase(),
"Mode {} should normalize to {}",
input_mode,
expected_mode
);
}
other => panic!("Failed to set mode {}: {other:?}", input_mode),
}
}
for _ in 0..5 {
let mode = JournalModeTestData::get_journal_mode(&conn, &cx).await;
assert!(
!mode.is_empty(),
"Journal mode query should always return a value"
);
}
let current_mode = JournalModeTestData::get_journal_mode(&conn, &cx).await;
match JournalModeTestData::set_journal_mode(&conn, &cx, ¤t_mode).await {
Outcome::Ok(returned_mode) => {
assert_eq!(
returned_mode.to_lowercase(),
current_mode.to_lowercase(),
"Setting to current mode should be no-op"
);
}
other => panic!("Failed to set to current mode: {other:?}"),
}
conn.close().unwrap();
});
}
}
mod real_database_integration {
use super::*;
use crate::test_utils::run_test_with_cx;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Instant;
struct RealSqliteConfig {
database_path: String,
enabled: bool,
reason: Option<String>,
}
impl RealSqliteConfig {
fn new() -> Self {
let enabled = std::env::var("REAL_SQLITE_TESTS").unwrap_or_default() == "true";
let db_path =
std::env::var("SQLITE_TEST_PATH").unwrap_or_else(|_| ":memory:".to_string());
let reason = if !enabled {
Some("REAL_SQLITE_TESTS not set to 'true'".to_string())
} else if std::env::var("NODE_ENV").unwrap_or_default() == "production" {
Some("BLOCKED: NODE_ENV=production".to_string())
} else if db_path.contains("prod") || db_path.contains("/var/lib/") {
Some("BLOCKED: Production database path detected".to_string())
} else {
None
};
Self {
database_path: db_path,
enabled: enabled && reason.is_none(),
reason,
}
}
}
#[derive(Debug)]
struct SqliteTestLogger {
test_name: String,
start_time: Instant,
phase_count: AtomicU32,
}
impl SqliteTestLogger {
fn new(test_name: &str) -> Self {
let logger = Self {
test_name: test_name.to_string(),
start_time: Instant::now(),
phase_count: AtomicU32::new(0),
};
eprintln!(
"{{\"test\":\"{}\",\"event\":\"test_start\",\"ts\":\"{}\"}}",
test_name,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
);
logger
}
fn phase(&self, phase_name: &str) {
let phase_num = self.phase_count.fetch_add(1, Ordering::Relaxed);
let elapsed_ms = self.start_time.elapsed().as_millis();
eprintln!(
"{{\"test\":\"{}\",\"event\":\"phase\",\"phase\":\"{}\",\"phase_num\":{},\"elapsed_ms\":{},\"ts\":{}}}",
self.test_name,
phase_name,
phase_num,
elapsed_ms,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
);
}
fn sqlite_operation(&self, operation: &str, result: &str, details: Option<&str>) {
let mut log_entry = format!(
"{{\"test\":\"{}\",\"event\":\"sqlite_operation\",\"operation\":\"{}\",\"result\":\"{}\"",
self.test_name, operation, result
);
if let Some(detail) = details {
log_entry.push_str(&format!(",\"details\":\"{}\"", detail));
}
log_entry.push_str(&format!(
",\"ts\":{}}}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
));
eprintln!("{}", log_entry);
}
fn assert_match(&self, field: &str, expected: &str, actual: &str) -> bool {
let matches = expected == actual;
eprintln!(
"{{\"test\":\"{}\",\"event\":\"assertion\",\"field\":\"{}\",\"expected\":\"{}\",\"actual\":\"{}\",\"matches\":{},\"ts\":{}}}",
self.test_name,
field,
expected,
actual,
matches,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
);
matches
}
fn test_end(&self, result: &str) {
let duration_ms = self.start_time.elapsed().as_millis();
eprintln!(
"{{\"test\":\"{}\",\"event\":\"test_end\",\"result\":\"{}\",\"duration_ms\":{},\"ts\":{}}}",
self.test_name,
result,
duration_ms,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
);
}
}
struct SqliteDataFactory {
counter: AtomicU32,
}
impl SqliteDataFactory {
fn new() -> Self {
Self {
counter: AtomicU32::new(0),
}
}
fn create_user_record(&self) -> (i64, String, String) {
let id = self.counter.fetch_add(1, Ordering::Relaxed) as i64;
let name = format!("user_{}", id);
let email = format!("user{}@test-domain.com", id);
(id, name, email)
}
fn create_batch_records(&self, count: usize) -> Vec<(String, String, i64)> {
(0..count)
.map(|_| {
let (id, name, email) = self.create_user_record();
(name, email, id)
})
.collect()
}
fn create_transaction_batch(
&self,
user_id: i64,
count: usize,
) -> Vec<(i64, String, f64)> {
(0..count)
.map(|i| {
let tx_id = self.counter.fetch_add(1, Ordering::Relaxed) as i64;
let description = format!("Transaction {} for user {}", i, user_id);
let amount = (i as f64) * 10.5 + 1.0; (tx_id, description, amount)
})
.collect()
}
}
fn require_real_sqlite() -> Option<RealSqliteConfig> {
let config = RealSqliteConfig::new();
if !config.enabled {
let reason = config
.reason
.as_deref()
.unwrap_or("Real SQLite testing not available");
eprintln!("SKIPPING: {}", reason);
return None;
}
Some(config)
}
#[test]
fn test_real_sqlite_journal_mode_transitions() {
let Some(config) = require_real_sqlite() else {
return;
};
let log = SqliteTestLogger::new("real_sqlite_journal_mode_transitions");
run_test_with_cx(|cx| async move {
log.phase("setup");
let conn = if config.database_path == ":memory:" {
match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open in-memory connection: {other:?}"),
}
} else {
match SqliteConnection::open(&cx, &config.database_path).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open file connection: {other:?}"),
}
};
log.phase("transaction_isolation_setup");
match conn.execute_unchecked(&cx, "BEGIN TRANSACTION", &[]).await {
Outcome::Ok(_) => log.sqlite_operation("begin_transaction", "success", None),
other => panic!("Failed to begin transaction: {other:?}"),
}
log.phase("schema_and_data_setup");
let factory = SqliteDataFactory::new();
match conn
.execute_batch(
&cx,
"
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE transactions (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
description TEXT NOT NULL,
amount REAL NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_transactions_user_id ON transactions(user_id);
",
)
.await
{
Outcome::Ok(()) => log.sqlite_operation("schema_creation", "success", None),
other => panic!("Failed to create schema: {other:?}"),
}
let users = factory.create_batch_records(10);
for (name, email, user_id) in &users {
match conn
.execute(
&cx,
"INSERT INTO users (id, name, email) VALUES (?1, ?2, ?3)",
&[
SqliteValue::Integer(*user_id),
SqliteValue::Text(name.clone()),
SqliteValue::Text(email.clone()),
],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to insert user: {other:?}"),
}
let transactions = factory.create_transaction_batch(*user_id, 3);
for (tx_id, description, amount) in transactions {
match conn
.execute(
&cx,
"INSERT INTO transactions (id, user_id, description, amount) VALUES (?1, ?2, ?3, ?4)",
&[
SqliteValue::Integer(tx_id),
SqliteValue::Integer(*user_id),
SqliteValue::Text(description),
SqliteValue::Real(amount),
],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to insert transaction: {other:?}"),
}
}
}
log.sqlite_operation(
"test_data_inserted",
"success",
Some(&format!(
"{} users, {} transactions",
users.len(),
users.len().saturating_mul(3)
)),
);
log.phase("journal_mode_testing");
let initial_mode = match conn.query_unchecked(&cx, "PRAGMA journal_mode", &[]).await
{
Outcome::Ok(rows) => rows[0].get_idx(0).unwrap().as_text().unwrap().to_owned(),
other => panic!("Failed to get initial journal mode: {other:?}"),
};
log.sqlite_operation("get_initial_journal_mode", "success", Some(&initial_mode));
let user_count_before =
match conn.query(&cx, "SELECT COUNT(*) FROM users", &[]).await {
Outcome::Ok(rows) => rows[0].get_idx(0).unwrap().as_integer().unwrap(),
other => panic!("Failed to count users: {other:?}"),
};
assert!(log.assert_match(
"user_count_before_journal_change",
"10",
&user_count_before.to_string()
));
log.phase("wal_mode_transition");
match conn
.query_unchecked(&cx, "PRAGMA journal_mode = WAL", &[])
.await
{
Outcome::Ok(rows) => {
let new_mode = rows[0].get_idx(0).unwrap().as_text().unwrap();
log.sqlite_operation("set_journal_mode_wal", "success", Some(new_mode));
if config.database_path != ":memory:" {
assert!(log.assert_match(
"journal_mode_after_wal",
"wal",
&new_mode.to_lowercase()
));
}
}
other => panic!("Failed to set WAL mode: {other:?}"),
}
log.phase("data_integrity_verification");
let user_count_after =
match conn.query(&cx, "SELECT COUNT(*) FROM users", &[]).await {
Outcome::Ok(rows) => rows[0].get_idx(0).unwrap().as_integer().unwrap(),
other => panic!("Failed to count users after mode change: {other:?}"),
};
assert!(log.assert_match(
"user_count_after_journal_change",
"10",
&user_count_after.to_string()
));
let tx_count = match conn
.query(&cx, "SELECT COUNT(*) FROM transactions", &[])
.await
{
Outcome::Ok(rows) => rows[0].get_idx(0).unwrap().as_integer().unwrap(),
other => panic!("Failed to count transactions: {other:?}"),
};
assert!(log.assert_match("transaction_count", "30", &tx_count.to_string()));
log.phase("complex_query_testing");
let user_tx_summary = match conn
.query(
&cx,
"SELECT u.name, COUNT(t.id) as tx_count, SUM(t.amount) as total_amount
FROM users u
LEFT JOIN transactions t ON u.id = t.user_id
GROUP BY u.id, u.name
ORDER BY total_amount DESC
LIMIT 5",
&[],
)
.await
{
Outcome::Ok(rows) => rows,
other => panic!("Failed to execute complex query: {other:?}"),
};
assert!(
user_tx_summary.len() >= 5,
"Should have at least 5 users in summary"
);
log.sqlite_operation(
"complex_query",
"success",
Some(&format!("{} user summaries", user_tx_summary.len())),
);
log.phase("transaction_rollback");
match conn.execute_unchecked(&cx, "ROLLBACK", &[]).await {
Outcome::Ok(_) => log.sqlite_operation("rollback_transaction", "success", None),
other => panic!("Failed to rollback transaction: {other:?}"),
}
log.phase("cleanup");
conn.close().unwrap();
log.test_end("pass");
});
}
#[test]
fn test_real_sqlite_concurrent_access_patterns() {
let Some(_config) = require_real_sqlite() else {
return;
};
let log = SqliteTestLogger::new("real_sqlite_concurrent_access");
run_test_with_cx(|cx| async move {
log.phase("setup");
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("Failed to open connection: {other:?}"),
};
log.phase("wal_mode_setup");
match conn
.query_unchecked(&cx, "PRAGMA journal_mode = WAL", &[])
.await
{
Outcome::Ok(_) => log.sqlite_operation("set_wal_mode", "success", None),
other => panic!("Failed to set WAL mode: {other:?}"),
}
log.phase("schema_setup");
match conn.execute_unchecked(&cx, "BEGIN TRANSACTION", &[]).await {
Outcome::Ok(_) => {}
other => panic!("Failed to begin transaction: {other:?}"),
}
match conn
.execute_batch(
&cx,
"
CREATE TABLE accounts (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
balance REAL NOT NULL DEFAULT 0.0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE transfers (
id INTEGER PRIMARY KEY,
from_account INTEGER NOT NULL,
to_account INTEGER NOT NULL,
amount REAL NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (from_account) REFERENCES accounts(id),
FOREIGN KEY (to_account) REFERENCES accounts(id)
);
",
)
.await
{
Outcome::Ok(()) => log.sqlite_operation("concurrent_schema", "success", None),
other => panic!("Failed to create concurrent test schema: {other:?}"),
}
log.phase("test_data_creation");
let accounts = vec![
(1, "Account A", 1000.0),
(2, "Account B", 500.0),
(3, "Account C", 750.0),
];
for (id, name, balance) in &accounts {
match conn
.execute(
&cx,
"INSERT INTO accounts (id, name, balance) VALUES (?1, ?2, ?3)",
&[
SqliteValue::Integer(*id),
SqliteValue::Text(name.to_string()),
SqliteValue::Real(*balance),
],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to create account: {other:?}"),
}
}
log.phase("concurrent_operations_simulation");
let transfers = vec![
(1, 2, 100.0), (2, 3, 200.0), (3, 1, 150.0), ];
for (from_id, to_id, amount) in &transfers {
let balance_check = match conn
.query(
&cx,
"SELECT balance FROM accounts WHERE id = ?1",
&[SqliteValue::Integer(*from_id)],
)
.await
{
Outcome::Ok(rows) => rows[0].get_idx(0).unwrap().as_real().unwrap(),
other => panic!("Failed to check balance: {other:?}"),
};
if balance_check >= *amount {
match conn
.execute(
&cx,
"INSERT INTO transfers (from_account, to_account, amount, status) VALUES (?1, ?2, ?3, 'completed')",
&[
SqliteValue::Integer(*from_id),
SqliteValue::Integer(*to_id),
SqliteValue::Real(*amount),
],
)
.await
{
Outcome::Ok(_) => log.sqlite_operation("transfer_created", "success", Some(&format!("{} -> {}: {}", from_id, to_id, amount))),
other => panic!("Failed to create transfer: {other:?}"),
}
match conn
.execute(
&cx,
"UPDATE accounts SET balance = balance - ?1 WHERE id = ?2",
&[SqliteValue::Real(*amount), SqliteValue::Integer(*from_id)],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to debit account: {other:?}"),
}
match conn
.execute(
&cx,
"UPDATE accounts SET balance = balance + ?1 WHERE id = ?2",
&[SqliteValue::Real(*amount), SqliteValue::Integer(*to_id)],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("Failed to credit account: {other:?}"),
}
}
}
log.phase("integrity_verification");
let final_balances = match conn
.query(
&cx,
"SELECT id, name, balance FROM accounts ORDER BY id",
&[],
)
.await
{
Outcome::Ok(rows) => rows,
other => panic!("Failed to get final balances: {other:?}"),
};
for row in &final_balances {
let id = row.get_idx(0).unwrap().as_integer().unwrap();
let name = row.get_idx(1).unwrap().as_text().unwrap();
let balance = row.get_idx(2).unwrap().as_real().unwrap();
log.sqlite_operation(
"final_balance",
"verified",
Some(&format!("{} ({}): {}", name, id, balance)),
);
}
let transfer_count = match conn
.query(
&cx,
"SELECT COUNT(*) FROM transfers WHERE status = 'completed'",
&[],
)
.await
{
Outcome::Ok(rows) => rows[0].get_idx(0).unwrap().as_integer().unwrap(),
other => panic!("Failed to count transfers: {other:?}"),
};
assert!(transfer_count > 0, "Should have completed transfers");
log.sqlite_operation(
"transfer_verification",
"success",
Some(&format!("{} completed transfers", transfer_count)),
);
log.phase("rollback_cleanup");
match conn.execute_unchecked(&cx, "ROLLBACK", &[]).await {
Outcome::Ok(_) => log.sqlite_operation("rollback", "success", None),
other => panic!("Failed to rollback: {other:?}"),
}
conn.close().unwrap();
log.test_end("pass");
});
}
}
mod sqlite_prepared_statement_reset_audit {
use super::*;
#[test]
fn audit_rusqlite_automatic_statement_reset() {
init_test_logging();
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"CREATE TABLE reset_test (id INTEGER PRIMARY KEY, value TEXT);",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
for i in 1..=5 {
let value = format!("test-value-{i}");
match conn
.execute(
&cx,
"INSERT INTO reset_test (value) VALUES (?1)",
&[SqliteValue::Text(value)],
)
.await
{
Outcome::Ok(rows) => {
crate::assert_with_log!(
rows == 1,
"INSERT should affect exactly 1 row",
1,
rows
);
}
other => panic!("insert {i} failed: {other:?}"),
}
}
for i in 1..=5 {
let expected_value = format!("test-value-{i}");
match conn
.query(
&cx,
"SELECT value FROM reset_test WHERE id = ?1",
&[SqliteValue::Integer(i)],
)
.await
{
Outcome::Ok(rows) => {
crate::assert_with_log!(
rows.len() == 1,
"Query should return exactly 1 row",
1,
rows.len()
);
let actual_value = rows[0].get_str("value").unwrap();
crate::assert_with_log!(
actual_value == expected_value,
"Query result should match inserted value",
&expected_value,
actual_value
);
}
other => panic!("query {i} failed: {other:?}"),
}
}
eprintln!(
"{{\"audit\":\"SQLITE_RESET_SEMANTICS\",\"status\":\"SOUND\",\"requirement\":\"automatic statement reset via rusqlite APIs\"}}"
);
crate::test_complete!("audit_rusqlite_automatic_statement_reset");
});
}
#[test]
fn audit_prepare_cached_statement_reuse() {
init_test_logging();
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"CREATE TABLE cached_test (id INTEGER PRIMARY KEY, data TEXT);",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
{
let guard = conn.inner.lock();
let raw_conn = guard.get().expect("connection should be open");
raw_conn.set_prepared_statement_cache_capacity(2);
}
match conn
.execute(&cx, "INSERT INTO cached_test (data) VALUES ('first')", &[])
.await
{
Outcome::Ok(_) => {}
other => panic!("insert first failed: {other:?}"),
}
const QUERY_SQL: &str = "SELECT data FROM cached_test WHERE id = ?1";
match conn.query(&cx, QUERY_SQL, &[SqliteValue::Integer(1)]).await {
Outcome::Ok(rows) => {
crate::assert_with_log!(
rows.len() == 1 && rows[0].get_str("data").unwrap() == "first",
"First query execution should return 'first'",
"first",
rows[0].get_str("data").unwrap()
);
}
other => panic!("first query failed: {other:?}"),
}
match conn.query(&cx, QUERY_SQL, &[SqliteValue::Integer(1)]).await {
Outcome::Ok(rows) => {
crate::assert_with_log!(
rows.len() == 1 && rows[0].get_str("data").unwrap() == "first",
"Second query execution should return same result",
"first",
rows[0].get_str("data").unwrap()
);
}
other => panic!("second query failed: {other:?}"),
}
match conn
.execute(&cx, "INSERT INTO cached_test (data) VALUES ('second')", &[])
.await
{
Outcome::Ok(_) => {}
other => panic!("insert second failed: {other:?}"),
}
match conn.query(&cx, QUERY_SQL, &[SqliteValue::Integer(2)]).await {
Outcome::Ok(rows) => {
crate::assert_with_log!(
rows.len() == 1 && rows[0].get_str("data").unwrap() == "second",
"Cached statement with new parameter should return correct result",
"second",
rows[0].get_str("data").unwrap()
);
}
other => panic!("parameter change query failed: {other:?}"),
}
eprintln!(
"{{\"audit\":\"STATEMENT_CACHE_RESET\",\"status\":\"SOUND\",\"requirement\":\"cached statement reset between executions\"}}"
);
crate::test_complete!("audit_prepare_cached_statement_reuse");
});
}
#[test]
fn audit_query_iterator_reset_on_drop() {
init_test_logging();
let cx = create_test_cx();
block_on(async {
let conn = match SqliteConnection::open_in_memory(&cx).await {
Outcome::Ok(conn) => conn,
other => panic!("open_in_memory failed: {other:?}"),
};
match conn
.execute_batch(
&cx,
"CREATE TABLE iterator_test (id INTEGER PRIMARY KEY, value INTEGER);",
)
.await
{
Outcome::Ok(()) => {}
other => panic!("create table failed: {other:?}"),
}
for i in 1..=10 {
match conn
.execute(
&cx,
"INSERT INTO iterator_test (value) VALUES (?1)",
&[SqliteValue::Integer(i * 10)],
)
.await
{
Outcome::Ok(_) => {}
other => panic!("insert {i} failed: {other:?}"),
}
}
let query_sql = "SELECT COUNT(*) as count FROM iterator_test WHERE value > ?1";
let count_gt_0 = match conn
.query_row(&cx, query_sql, &[SqliteValue::Integer(0)])
.await
{
Outcome::Ok(Some(row)) => row.get_i64("count").unwrap(),
other => panic!("count_gt_0 query failed: {other:?}"),
};
let count_gt_50 = match conn
.query_row(&cx, query_sql, &[SqliteValue::Integer(50)])
.await
{
Outcome::Ok(Some(row)) => row.get_i64("count").unwrap(),
other => panic!("count_gt_50 query failed: {other:?}"),
};
let count_gt_100 = match conn
.query_row(&cx, query_sql, &[SqliteValue::Integer(100)])
.await
{
Outcome::Ok(Some(row)) => row.get_i64("count").unwrap(),
other => panic!("count_gt_100 query failed: {other:?}"),
};
crate::assert_with_log!(
count_gt_0 == 10 && count_gt_50 == 5 && count_gt_100 == 0,
"Statement reset between queries should produce correct results",
(10, 5, 0),
(count_gt_0, count_gt_50, count_gt_100)
);
eprintln!(
"{{\"audit\":\"ITERATOR_DROP_RESET\",\"status\":\"SOUND\",\"requirement\":\"statement reset on Rows drop\"}}"
);
crate::test_complete!("audit_query_iterator_reset_on_drop");
});
}
#[test]
fn audit_sqlite_query_result_streaming_memory_usage() {
eprintln!(
"{{\"defect\":\"SQLITE_QUERY_RESULT_STREAMING\",\"severity\":\"CRITICAL\",\"impact\":\"OOM risk\",\"violation\":\"sqlite3_step streaming\",\"architecture\":\"blocking_pool\",\"complexity\":\"HIGH\"}}"
);
eprintln!(
"{{\"recommendation\":\"FILE_BEAD\",\"reason\":\"30min_deadline_insufficient\",\"estimated_effort\":\"2-4_hours\",\"same_pattern_as\":\"MySQL/PostgreSQL but blocking_pool_architecture\"}}"
);
}
}
}