use std::sync::Arc;
use crate::error::{BsqlError, BsqlResult};
pub struct SqlitePool {
inner: Arc<bsql_driver_sqlite::pool::SqlitePool>,
}
pub struct SqlitePoolBuilder {
path: Option<String>,
reader_count: usize,
}
impl SqlitePoolBuilder {
pub fn path(mut self, path: &str) -> Self {
self.path = Some(path.to_owned());
self
}
pub fn reader_count(mut self, count: usize) -> Self {
self.reader_count = count;
self
}
pub fn build(self) -> BsqlResult<SqlitePool> {
let path = self.path.ok_or_else(|| {
BsqlError::Connect(crate::error::ConnectError {
message: "SQLite pool builder requires a path".into(),
source: None,
})
})?;
let inner = bsql_driver_sqlite::pool::SqlitePool::builder()
.path(&path)
.reader_count(self.reader_count)
.build()
.map_err(BsqlError::from_sqlite)?;
Ok(SqlitePool {
inner: Arc::new(inner),
})
}
}
impl SqlitePool {
#[doc(hidden)]
#[inline]
pub fn __inner(&self) -> &bsql_driver_sqlite::pool::SqlitePool {
&self.inner
}
pub fn connect(path: &str) -> BsqlResult<Self> {
let inner =
bsql_driver_sqlite::pool::SqlitePool::connect(path).map_err(BsqlError::from_sqlite)?;
Ok(SqlitePool {
inner: Arc::new(inner),
})
}
pub fn open(path: &str) -> BsqlResult<Self> {
Self::connect(path)
}
pub fn builder() -> SqlitePoolBuilder {
SqlitePoolBuilder {
path: None,
reader_count: 4,
}
}
pub fn query_readonly(
&self,
sql: &str,
sql_hash: u64,
params: smallvec::SmallVec<[bsql_driver_sqlite::pool::ParamValue; 8]>,
) -> BsqlResult<(bsql_driver_sqlite::conn::QueryResult, bsql_arena::Arena)> {
self.inner
.query_readonly(sql, sql_hash, params)
.map_err(BsqlError::from_sqlite)
}
pub fn query_readwrite(
&self,
sql: &str,
sql_hash: u64,
params: smallvec::SmallVec<[bsql_driver_sqlite::pool::ParamValue; 8]>,
) -> BsqlResult<(bsql_driver_sqlite::conn::QueryResult, bsql_arena::Arena)> {
self.inner
.query_readwrite(sql, sql_hash, params)
.map_err(BsqlError::from_sqlite)
}
pub fn execute_sql(
&self,
sql: &str,
sql_hash: u64,
params: smallvec::SmallVec<[bsql_driver_sqlite::pool::ParamValue; 8]>,
) -> BsqlResult<u64> {
self.inner
.execute(sql, sql_hash, params)
.map_err(BsqlError::from_sqlite)
}
#[inline]
pub fn fetch_one_direct<F, T>(
&self,
sql: &str,
sql_hash: u64,
params: &[&dyn bsql_driver_sqlite::codec::SqliteEncode],
is_write: bool,
decode: F,
) -> BsqlResult<T>
where
F: FnOnce(
&bsql_driver_sqlite::ffi::StmtHandle,
) -> Result<T, bsql_driver_sqlite::SqliteError>,
{
self.inner
.fetch_one_direct(sql, sql_hash, params, is_write, decode)
.map_err(BsqlError::from_sqlite)
}
#[inline]
pub fn fetch_optional_direct<F, T>(
&self,
sql: &str,
sql_hash: u64,
params: &[&dyn bsql_driver_sqlite::codec::SqliteEncode],
is_write: bool,
decode: F,
) -> BsqlResult<Option<T>>
where
F: FnOnce(
&bsql_driver_sqlite::ffi::StmtHandle,
) -> Result<T, bsql_driver_sqlite::SqliteError>,
{
self.inner
.fetch_optional_direct(sql, sql_hash, params, is_write, decode)
.map_err(BsqlError::from_sqlite)
}
#[inline]
pub fn fetch_all_direct<F, T>(
&self,
sql: &str,
sql_hash: u64,
params: &[&dyn bsql_driver_sqlite::codec::SqliteEncode],
is_write: bool,
decode: F,
) -> BsqlResult<Vec<T>>
where
F: Fn(&bsql_driver_sqlite::ffi::StmtHandle) -> Result<T, bsql_driver_sqlite::SqliteError>,
{
self.inner
.fetch_all_direct(sql, sql_hash, params, is_write, decode)
.map_err(BsqlError::from_sqlite)
}
#[inline]
pub fn fetch_all_arena<F, T>(
&self,
sql: &str,
sql_hash: u64,
params: &[&dyn bsql_driver_sqlite::codec::SqliteEncode],
is_write: bool,
decode: F,
) -> BsqlResult<bsql_arena::ArenaRows<T>>
where
F: Fn(
&bsql_driver_sqlite::ffi::StmtHandle,
&mut bsql_arena::Arena,
) -> Result<T, bsql_driver_sqlite::SqliteError>,
{
self.inner
.fetch_all_arena(sql, sql_hash, params, is_write, decode)
.map_err(BsqlError::from_sqlite)
}
#[inline]
pub fn for_each<F>(
&self,
sql: &str,
sql_hash: u64,
params: &[&dyn bsql_driver_sqlite::codec::SqliteEncode],
is_write: bool,
f: F,
) -> BsqlResult<()>
where
F: FnMut(
&bsql_driver_sqlite::ffi::StmtHandle,
) -> Result<(), bsql_driver_sqlite::SqliteError>,
{
self.inner
.for_each(sql, sql_hash, params, is_write, f)
.map_err(BsqlError::from_sqlite)
}
#[inline]
pub fn for_each_collect<F, T>(
&self,
sql: &str,
sql_hash: u64,
params: &[&dyn bsql_driver_sqlite::codec::SqliteEncode],
is_write: bool,
f: F,
) -> BsqlResult<Vec<T>>
where
F: FnMut(
&bsql_driver_sqlite::ffi::StmtHandle,
) -> Result<T, bsql_driver_sqlite::SqliteError>,
{
self.inner
.for_each_collect(sql, sql_hash, params, is_write, f)
.map_err(BsqlError::from_sqlite)
}
#[inline]
pub fn execute_direct(
&self,
sql: &str,
sql_hash: u64,
params: &[&dyn bsql_driver_sqlite::codec::SqliteEncode],
) -> BsqlResult<u64> {
self.inner
.execute_direct(sql, sql_hash, params)
.map_err(BsqlError::from_sqlite)
}
pub fn execute_batch(
&self,
sql: &str,
sql_hash: u64,
param_sets: &[&[&dyn bsql_driver_sqlite::codec::SqliteEncode]],
) -> BsqlResult<u64> {
self.inner
.execute_batch(sql, sql_hash, param_sets)
.map_err(BsqlError::from_sqlite)
}
pub fn simple_exec(&self, sql: &str) -> BsqlResult<()> {
self.inner.simple_exec(sql).map_err(BsqlError::from_sqlite)
}
pub fn begin(&self) -> BsqlResult<SqliteTransaction> {
self.inner
.begin_transaction()
.map_err(BsqlError::from_sqlite)?;
Ok(SqliteTransaction {
pool: Arc::clone(&self.inner),
finished: false,
})
}
pub fn query_streaming(
&self,
sql: &str,
sql_hash: u64,
params: smallvec::SmallVec<[bsql_driver_sqlite::pool::ParamValue; 8]>,
chunk_size: usize,
) -> BsqlResult<SqliteStreamingQuery> {
let (first_result, first_arena, state, reader_idx) = self
.inner
.query_streaming(sql, sql_hash, params, chunk_size)
.map_err(BsqlError::from_sqlite)?;
Ok(SqliteStreamingQuery {
pool: Arc::clone(&self.inner),
state: Some(state),
current_result: Some(first_result),
current_arena: Some(first_arena),
position: 0,
reader_idx,
})
}
pub fn warmup(&self, sqls: &[&str]) {
self.inner.warmup(sqls);
}
pub fn reader_count(&self) -> usize {
self.inner.reader_count()
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub fn close(&self) {
self.inner.close();
}
}
pub struct SqliteTransaction {
pool: Arc<bsql_driver_sqlite::pool::SqlitePool>,
finished: bool,
}
impl SqliteTransaction {
pub fn commit(mut self) -> BsqlResult<()> {
self.finished = true;
self.pool
.commit_transaction()
.map_err(BsqlError::from_sqlite)
}
pub fn rollback(mut self) -> BsqlResult<()> {
self.finished = true;
self.pool
.rollback_transaction()
.map_err(BsqlError::from_sqlite)
}
pub fn savepoint(&self, name: &str) -> BsqlResult<()> {
validate_savepoint_name(name)?;
self.pool.savepoint(name).map_err(BsqlError::from_sqlite)
}
pub fn release_savepoint(&self, name: &str) -> BsqlResult<()> {
validate_savepoint_name(name)?;
self.pool
.release_savepoint(name)
.map_err(BsqlError::from_sqlite)
}
pub fn rollback_to(&self, name: &str) -> BsqlResult<()> {
validate_savepoint_name(name)?;
self.pool.rollback_to(name).map_err(BsqlError::from_sqlite)
}
pub fn execute_sql(
&self,
sql: &str,
sql_hash: u64,
params: smallvec::SmallVec<[bsql_driver_sqlite::pool::ParamValue; 8]>,
) -> BsqlResult<u64> {
self.pool
.execute(sql, sql_hash, params)
.map_err(BsqlError::from_sqlite)
}
pub fn execute_batch(
&self,
sql: &str,
sql_hash: u64,
param_sets: &[&[&dyn bsql_driver_sqlite::codec::SqliteEncode]],
) -> BsqlResult<u64> {
self.pool
.execute_batch(sql, sql_hash, param_sets)
.map_err(BsqlError::from_sqlite)
}
pub fn query_readwrite(
&self,
sql: &str,
sql_hash: u64,
params: smallvec::SmallVec<[bsql_driver_sqlite::pool::ParamValue; 8]>,
) -> BsqlResult<(bsql_driver_sqlite::conn::QueryResult, bsql_arena::Arena)> {
self.pool
.query_readwrite(sql, sql_hash, params)
.map_err(BsqlError::from_sqlite)
}
}
impl std::fmt::Debug for SqliteTransaction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SqliteTransaction")
.field("finished", &self.finished)
.finish()
}
}
impl Drop for SqliteTransaction {
fn drop(&mut self) {
if !self.finished {
log::warn!(
"bsql: SqliteTransaction dropped without commit() or rollback() — \
rolling back automatically."
);
let _ = self.pool.rollback_transaction();
}
}
}
pub struct SqliteStreamingQuery {
pool: Arc<bsql_driver_sqlite::pool::SqlitePool>,
state: Option<bsql_driver_sqlite::pool::StreamingState>,
current_result: Option<bsql_driver_sqlite::conn::QueryResult>,
current_arena: Option<bsql_arena::Arena>,
position: usize,
reader_idx: usize,
}
impl SqliteStreamingQuery {
pub fn fetch_next_chunk(&mut self) -> BsqlResult<bool> {
let state = match self.state.take() {
Some(s) if !s.inner.finished => s,
Some(s) => {
self.state = Some(s);
return Ok(false);
}
None => return Ok(false),
};
let (result, arena, new_state) = self
.pool
.streaming_next(state, self.reader_idx)
.map_err(BsqlError::from_sqlite)?;
let has_rows = result.row_count > 0;
self.current_result = Some(result);
self.current_arena = Some(arena);
self.position = 0;
self.state = Some(new_state);
Ok(has_rows)
}
pub fn current(
&self,
) -> Option<(
&bsql_driver_sqlite::conn::QueryResult,
&bsql_arena::Arena,
usize,
)> {
match (&self.current_result, &self.current_arena) {
(Some(result), Some(arena)) if self.position < result.row_count => {
Some((result, arena, self.position))
}
_ => None,
}
}
pub fn advance(&mut self) {
self.position += 1;
}
pub fn has_current_row(&self) -> bool {
self.current_result
.as_ref()
.is_some_and(|r| self.position < r.row_count)
}
pub fn is_finished(&self) -> bool {
!self.has_current_row() && self.state.as_ref().is_none_or(|s| s.inner.finished)
}
}
impl Drop for SqliteStreamingQuery {
fn drop(&mut self) {
if let Some(state) = self.state.take() {
if !state.inner.finished {
self.pool.streaming_drop(state, self.reader_idx);
}
}
}
}
fn validate_savepoint_name(name: &str) -> BsqlResult<()> {
crate::util::validate_savepoint_name(name)
}
impl Clone for SqlitePool {
fn clone(&self) -> Self {
SqlitePool {
inner: Arc::clone(&self.inner),
}
}
}
impl std::fmt::Debug for SqlitePool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SqlitePool")
.field("reader_count", &self.inner.reader_count())
.field("closed", &self.inner.is_closed())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn temp_db_path() -> String {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
let dir = std::env::temp_dir();
let pid = std::process::id();
format!("{}/bsql_test_sqlite_pool_{}_{}.db", dir.display(), pid, id)
}
#[test]
fn open_is_alias_for_connect() {
let path = temp_db_path();
let pool = SqlitePool::open(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
assert_eq!(pool.reader_count(), 4);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_commit() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(1)],
)
.unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(2)],
)
.unwrap();
tx.commit().unwrap();
let sql = "SELECT id FROM t ORDER BY id";
let hash = crate::rapid_hash_str(sql);
let (result, arena) = pool
.query_readonly(sql, hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result.get_i64(0, 0, &arena), Some(1));
assert_eq!(result.get_i64(1, 0, &arena), Some(2));
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_rollback() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(1)],
)
.unwrap();
tx.rollback().unwrap();
let sql = "SELECT id FROM t";
let hash = crate::rapid_hash_str(sql);
let (result, _arena) = pool
.query_readonly(sql, hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 0);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_savepoint() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(1)],
)
.unwrap();
tx.savepoint("sp1").unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(2)],
)
.unwrap();
tx.rollback_to("sp1").unwrap();
tx.commit().unwrap();
let sql = "SELECT id FROM t";
let hash = crate::rapid_hash_str(sql);
let (result, arena) = pool
.query_readonly(sql, hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get_i64(0, 0, &arena), Some(1));
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_drop_auto_rollback() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
{
let tx = pool.begin().unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(1)],
)
.unwrap();
drop(tx);
}
let sql = "SELECT id FROM t";
let hash = crate::rapid_hash_str(sql);
let (result, _arena) = pool
.query_readonly(sql, hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 0);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn streaming_query() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
for i in 0..10 {
pool.simple_exec(&format!("INSERT INTO t VALUES ({i})"))
.unwrap();
}
let sql = "SELECT id FROM t ORDER BY id";
let hash = crate::rapid_hash_str(sql);
let mut stream = pool
.query_streaming(sql, hash, smallvec::SmallVec::new(), 3)
.unwrap();
assert!(stream.has_current_row());
assert!(!stream.is_finished());
let mut total = 0;
loop {
if stream.has_current_row() {
let (result, arena, pos) = stream.current().unwrap();
let _id = result.get_i64(pos, 0, arena);
stream.advance();
total += 1;
} else if !stream.is_finished() {
let fetched = stream.fetch_next_chunk().unwrap();
if !fetched {
break;
}
} else {
break;
}
}
assert_eq!(total, 10);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn savepoint_name_validation() {
assert!(validate_savepoint_name("sp1").is_ok());
assert!(validate_savepoint_name("_sp").is_ok());
assert!(validate_savepoint_name("my_savepoint_123").is_ok());
assert!(validate_savepoint_name("").is_err());
assert!(validate_savepoint_name("1sp").is_err());
assert!(validate_savepoint_name("sp-1").is_err());
assert!(validate_savepoint_name("sp 1").is_err());
let long = "a".repeat(64);
assert!(validate_savepoint_name(&long).is_err());
let max = "a".repeat(63);
assert!(validate_savepoint_name(&max).is_ok());
}
#[test]
fn builder_requires_path() {
let result = SqlitePool::builder().build();
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("path"), "error should mention path: {err}");
}
#[test]
fn builder_default_reader_count() {
let b = SqlitePool::builder();
assert_eq!(b.reader_count, 4);
}
#[test]
fn builder_custom_reader_count() {
let path = temp_db_path();
let pool = SqlitePool::builder()
.path(&path)
.reader_count(2)
.build()
.unwrap();
assert_eq!(pool.reader_count(), 2);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_debug() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
let dbg = format!("{pool:?}");
assert!(
dbg.contains("SqlitePool"),
"Debug should show SqlitePool: {dbg}"
);
assert!(
dbg.contains("reader_count"),
"Debug should show reader_count: {dbg}"
);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_transaction_debug() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
let tx = pool.begin().unwrap();
let dbg = format!("{tx:?}");
assert!(
dbg.contains("SqliteTransaction"),
"Debug should show SqliteTransaction: {dbg}"
);
assert!(
dbg.contains("finished"),
"Debug should show finished field: {dbg}"
);
tx.rollback().unwrap();
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_clone() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
let pool2 = pool.clone();
assert_eq!(pool.reader_count(), pool2.reader_count());
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_close_and_is_closed() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
assert!(!pool.is_closed());
pool.close();
assert!(pool.is_closed());
let _ = std::fs::remove_file(&path);
}
#[test]
fn execute_batch_multiple() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let sql = "INSERT INTO t VALUES (?1)";
let hash = crate::rapid_hash_str(sql);
let v1 = 1i64;
let v2 = 2i64;
let v3 = 3i64;
let params1: &[&dyn bsql_driver_sqlite::codec::SqliteEncode] = &[&v1];
let params2: &[&dyn bsql_driver_sqlite::codec::SqliteEncode] = &[&v2];
let params3: &[&dyn bsql_driver_sqlite::codec::SqliteEncode] = &[&v3];
let total = pool
.execute_batch(sql, hash, &[params1, params2, params3])
.unwrap();
assert_eq!(total, 3);
let select = "SELECT id FROM t ORDER BY id";
let select_hash = crate::rapid_hash_str(select);
let (result, _arena) = pool
.query_readonly(select, select_hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 3);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn execute_direct_insert() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let sql = "INSERT INTO t VALUES (?1)";
let hash = crate::rapid_hash_str(sql);
let v: i64 = 42;
let affected = pool
.execute_direct(
sql,
hash,
&[&v as &dyn bsql_driver_sqlite::codec::SqliteEncode],
)
.unwrap();
assert_eq!(affected, 1);
pool.close();
let _ = std::fs::remove_file(&path);
}
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
#[test]
fn sqlite_pool_is_send_and_sync() {
_assert_send::<SqlitePool>();
_assert_sync::<SqlitePool>();
}
#[test]
fn transaction_nested_savepoints_three_levels() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(1)],
)
.unwrap();
tx.savepoint("sp1").unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(2)],
)
.unwrap();
tx.savepoint("sp2").unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(3)],
)
.unwrap();
tx.savepoint("sp3").unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(4)],
)
.unwrap();
tx.rollback_to("sp2").unwrap();
tx.commit().unwrap();
let sql = "SELECT id FROM t ORDER BY id";
let hash = crate::rapid_hash_str(sql);
let (result, arena) = pool
.query_readonly(sql, hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result.get_i64(0, 0, &arena), Some(1));
assert_eq!(result.get_i64(1, 0, &arena), Some(2));
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_savepoint_same_name_twice() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(1)],
)
.unwrap();
tx.savepoint("sp1").unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(2)],
)
.unwrap();
tx.savepoint("sp1").unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(3)],
)
.unwrap();
tx.rollback_to("sp1").unwrap();
tx.commit().unwrap();
let sql = "SELECT id FROM t ORDER BY id";
let hash = crate::rapid_hash_str(sql);
let (result, arena) = pool
.query_readonly(sql, hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result.get_i64(0, 0, &arena), Some(1));
assert_eq!(result.get_i64(1, 0, &arena), Some(2));
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_release_savepoint() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
tx.savepoint("sp1").unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(1)],
)
.unwrap();
tx.release_savepoint("sp1").unwrap();
tx.commit().unwrap();
let sql = "SELECT id FROM t";
let hash = crate::rapid_hash_str(sql);
let (result, _arena) = pool
.query_readonly(sql, hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 1);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_warmup() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
pool.warmup(&["SELECT id FROM t"]);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_for_each() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
pool.simple_exec("INSERT INTO t VALUES (1)").unwrap();
pool.simple_exec("INSERT INTO t VALUES (2)").unwrap();
pool.simple_exec("INSERT INTO t VALUES (3)").unwrap();
let sql = "SELECT id FROM t ORDER BY id";
let hash = crate::rapid_hash_str(sql);
let mut ids = Vec::new();
pool.for_each(sql, hash, &[], false, |stmt| {
let id = stmt.column_int64(0);
ids.push(id);
Ok(())
})
.unwrap();
assert_eq!(ids, vec![1, 2, 3]);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_for_each_collect() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
pool.simple_exec("INSERT INTO t VALUES (10)").unwrap();
pool.simple_exec("INSERT INTO t VALUES (20)").unwrap();
let sql = "SELECT id FROM t ORDER BY id";
let hash = crate::rapid_hash_str(sql);
let ids: Vec<i64> = pool
.for_each_collect(sql, hash, &[], false, |stmt| Ok(stmt.column_int64(0)))
.unwrap();
assert_eq!(ids, vec![10, 20]);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_fetch_one_direct() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
pool.simple_exec("INSERT INTO t VALUES (42)").unwrap();
let sql = "SELECT id FROM t LIMIT 1";
let hash = crate::rapid_hash_str(sql);
let id: i64 = pool
.fetch_one_direct(sql, hash, &[], false, |stmt| Ok(stmt.column_int64(0)))
.unwrap();
assert_eq!(id, 42);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_fetch_optional_direct_some() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
pool.simple_exec("INSERT INTO t VALUES (7)").unwrap();
let sql = "SELECT id FROM t LIMIT 1";
let hash = crate::rapid_hash_str(sql);
let id: Option<i64> = pool
.fetch_optional_direct(sql, hash, &[], false, |stmt| Ok(stmt.column_int64(0)))
.unwrap();
assert_eq!(id, Some(7));
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_fetch_optional_direct_none() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let sql = "SELECT id FROM t LIMIT 1";
let hash = crate::rapid_hash_str(sql);
let id: Option<i64> = pool
.fetch_optional_direct(sql, hash, &[], false, |stmt| Ok(stmt.column_int64(0)))
.unwrap();
assert_eq!(id, None);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_fetch_all_direct() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
pool.simple_exec("INSERT INTO t VALUES (1)").unwrap();
pool.simple_exec("INSERT INTO t VALUES (2)").unwrap();
let sql = "SELECT id FROM t ORDER BY id";
let hash = crate::rapid_hash_str(sql);
let ids: Vec<i64> = pool
.fetch_all_direct(sql, hash, &[], false, |stmt| Ok(stmt.column_int64(0)))
.unwrap();
assert_eq!(ids, vec![1, 2]);
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn sqlite_pool_fetch_all_direct_empty() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let sql = "SELECT id FROM t";
let hash = crate::rapid_hash_str(sql);
let ids: Vec<i64> = pool
.fetch_all_direct(sql, hash, &[], false, |stmt| Ok(stmt.column_int64(0)))
.unwrap();
assert!(ids.is_empty());
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_read_own_writes() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
tx.execute_sql(
"INSERT INTO t VALUES (?1)",
crate::rapid_hash_str("INSERT INTO t VALUES (?1)"),
smallvec::smallvec![bsql_driver_sqlite::pool::ParamValue::Int(42)],
)
.unwrap();
let (result, arena) = tx
.query_readwrite(
"SELECT id FROM t",
crate::rapid_hash_str("SELECT id FROM t"),
smallvec::SmallVec::new(),
)
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get_i64(0, 0, &arena), Some(42));
tx.rollback().unwrap();
pool.close();
let _ = std::fs::remove_file(&path);
}
#[test]
fn transaction_execute_batch() {
let path = temp_db_path();
let pool = SqlitePool::connect(&path).unwrap();
pool.simple_exec("CREATE TABLE t (id INTEGER NOT NULL)")
.unwrap();
let tx = pool.begin().unwrap();
let sql = "INSERT INTO t VALUES (?1)";
let hash = crate::rapid_hash_str(sql);
let v1 = 1i64;
let v2 = 2i64;
let params1: &[&dyn bsql_driver_sqlite::codec::SqliteEncode] = &[&v1];
let params2: &[&dyn bsql_driver_sqlite::codec::SqliteEncode] = &[&v2];
let total = tx.execute_batch(sql, hash, &[params1, params2]).unwrap();
assert_eq!(total, 2);
tx.commit().unwrap();
let select = "SELECT id FROM t ORDER BY id";
let select_hash = crate::rapid_hash_str(select);
let (result, _) = pool
.query_readonly(select, select_hash, smallvec::SmallVec::new())
.unwrap();
assert_eq!(result.len(), 2);
pool.close();
let _ = std::fs::remove_file(&path);
}
}