use rustc_hash::FxHashMap;
use std::sync::{Arc, Mutex, RwLock};
use crate::core::{DataType, Error, IsolationLevel, Result, Value};
use crate::executor::context::ExecutionContextBuilder;
use crate::executor::{CachedPlanRef, ExecutionContext, Executor};
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::Engine;
use crate::storage::{Config, SyncMode};
use super::params::{NamedParams, Params};
use super::rows::{FromRow, Rows};
use super::statement::Statement;
use super::transaction::Transaction;
pub const MEMORY_SCHEME: &str = "memory";
pub const FILE_SCHEME: &str = "file";
static DATABASE_REGISTRY: std::sync::LazyLock<RwLock<FxHashMap<String, Arc<DatabaseInner>>>> =
std::sync::LazyLock::new(|| RwLock::new(FxHashMap::default()));
pub(crate) struct DatabaseInner {
engine: Arc<MVCCEngine>,
executor: Mutex<Executor>,
dsn: String,
owns_engine: bool,
#[cfg(feature = "test-filedb")]
_temp_dir: Option<tempfile::TempDir>,
}
pub(crate) type DatabaseInnerHandle = DatabaseInner;
impl Drop for DatabaseInner {
fn drop(&mut self) {
crate::executor::clear_all_thread_local_caches();
if self.owns_engine {
let _ = self.engine.close_engine();
}
}
}
pub struct Database {
inner: Arc<DatabaseInner>,
}
#[cfg(feature = "ffi")]
impl Database {
pub(crate) fn keepalive(&self) -> Arc<DatabaseInner> {
Arc::clone(&self.inner)
}
pub(crate) fn inner_arc(&self) -> &Arc<DatabaseInner> {
&self.inner
}
pub(crate) fn try_unregister_arc(inner: &Arc<DatabaseInner>) {
if let Ok(mut registry) = DATABASE_REGISTRY.write() {
if let Some(entry) = registry.get(&inner.dsn) {
if Arc::ptr_eq(entry, inner) && Arc::strong_count(inner) == 2 {
registry.remove(&inner.dsn);
}
}
}
}
}
impl Clone for Database {
fn clone(&self) -> Self {
let engine = Arc::clone(&self.inner.engine);
let executor = crate::executor::Executor::new(Arc::clone(&engine));
let inner = Arc::new(DatabaseInner {
engine,
executor: Mutex::new(executor),
dsn: self.inner.dsn.clone(),
owns_engine: false,
#[cfg(feature = "test-filedb")]
_temp_dir: None, });
Database { inner }
}
}
impl Drop for Database {
fn drop(&mut self) {
if let Ok(mut registry) = DATABASE_REGISTRY.write() {
if let Some(entry) = registry.get(&self.inner.dsn) {
if Arc::ptr_eq(entry, &self.inner) && Arc::strong_count(&self.inner) == 2 {
registry.remove(&self.inner.dsn);
}
}
}
}
}
impl Database {
pub fn open(dsn: &str) -> Result<Self> {
{
let registry = DATABASE_REGISTRY
.read()
.map_err(|_| Error::LockAcquisitionFailed("registry read".to_string()))?;
if let Some(inner) = registry.get(dsn) {
return Ok(Database {
inner: Arc::clone(inner),
});
}
}
let mut registry = DATABASE_REGISTRY
.write()
.map_err(|_| Error::LockAcquisitionFailed("registry write".to_string()))?;
if let Some(inner) = registry.get(dsn) {
return Ok(Database {
inner: Arc::clone(inner),
});
}
let (scheme, path) = Self::parse_dsn(dsn)?;
#[cfg(feature = "test-filedb")]
let mut _temp_dir_holder: Option<tempfile::TempDir> = None;
let engine = match scheme.as_str() {
MEMORY_SCHEME => {
#[cfg(feature = "test-filedb")]
{
let tmp = tempfile::tempdir().map_err(|e| {
Error::internal(format!("failed to create temp dir: {}", e))
})?;
let file_dsn = format!("file://{}", tmp.path().display());
let (_clean_path, config) = Self::parse_file_config(&file_dsn[7..])?;
let engine = MVCCEngine::new(config);
engine.open_engine()?;
let engine = Arc::new(engine);
engine.start_cleanup();
_temp_dir_holder = Some(tmp);
engine
}
#[cfg(not(feature = "test-filedb"))]
{
let engine = MVCCEngine::in_memory();
engine.open_engine()?;
let engine = Arc::new(engine);
engine.start_cleanup();
engine
}
}
FILE_SCHEME => {
let (_clean_path, config) = Self::parse_file_config(&path)?;
let engine = MVCCEngine::new(config);
engine.open_engine()?;
let engine = Arc::new(engine);
engine.start_cleanup();
engine
}
_ => {
return Err(Error::parse(format!(
"Unsupported scheme '{}'. Use 'memory://' or 'file://path'",
scheme
)));
}
};
let executor = Executor::new(Arc::clone(&engine));
let inner = Arc::new(DatabaseInner {
engine,
executor: Mutex::new(executor),
dsn: dsn.to_string(),
owns_engine: true,
#[cfg(feature = "test-filedb")]
_temp_dir: _temp_dir_holder,
});
registry.insert(dsn.to_string(), Arc::clone(&inner));
Ok(Database { inner })
}
pub fn open_in_memory() -> Result<Self> {
Self::create_in_memory_engine()
}
#[cfg(feature = "test-filedb")]
fn create_in_memory_engine() -> Result<Self> {
let tmp = tempfile::tempdir()
.map_err(|e| Error::internal(format!("failed to create temp dir: {}", e)))?;
let file_dsn = format!("file://{}", tmp.path().display());
let (_clean_path, config) = Self::parse_file_config(&file_dsn[7..])?;
let engine = MVCCEngine::new(config);
engine.open_engine()?;
let engine = Arc::new(engine);
engine.start_cleanup();
let executor = Executor::new(Arc::clone(&engine));
let inner = Arc::new(DatabaseInner {
engine,
executor: Mutex::new(executor),
dsn: "memory://".to_string(),
owns_engine: true,
_temp_dir: Some(tmp),
});
Ok(Database { inner })
}
#[cfg(not(feature = "test-filedb"))]
fn create_in_memory_engine() -> Result<Self> {
let engine = MVCCEngine::in_memory();
engine.open_engine()?;
let engine = Arc::new(engine);
engine.start_cleanup();
let executor = Executor::new(Arc::clone(&engine));
let inner = Arc::new(DatabaseInner {
engine,
executor: Mutex::new(executor),
dsn: "memory://".to_string(),
owns_engine: true,
});
Ok(Database { inner })
}
fn parse_dsn(dsn: &str) -> Result<(String, String)> {
let idx = dsn
.find("://")
.ok_or_else(|| Error::parse("Invalid DSN format: expected scheme://path"))?;
let scheme = dsn[..idx].to_lowercase();
let path = dsn[idx + 3..].to_string();
match scheme.as_str() {
MEMORY_SCHEME | FILE_SCHEME => {}
_ => {
return Err(Error::parse(format!(
"Unsupported scheme '{}'. Use 'memory://' or 'file://path'",
scheme
)));
}
}
if scheme == FILE_SCHEME {
let clean_path = if path.contains('?') {
&path[..path.find('?').unwrap()]
} else {
&path
};
if clean_path.is_empty() {
return Err(Error::parse("file:// scheme requires a non-empty path"));
}
}
Ok((scheme, path))
}
fn parse_file_config(path: &str) -> Result<(String, Config)> {
let (clean_path, query) = if let Some(idx) = path.find('?') {
(path[..idx].to_string(), Some(&path[idx + 1..]))
} else {
(path.to_string(), None)
};
let mut config = Config::with_path(&clean_path);
if let Some(query) = query {
for param in query.split('&') {
let mut parts = param.splitn(2, '=');
let key = parts.next().unwrap_or("");
let value = parts.next().unwrap_or("");
match key {
"sync_mode" | "sync" => {
config.persistence.sync_mode = match value.to_lowercase().as_str() {
"none" | "off" | "0" => SyncMode::None,
"normal" | "1" => SyncMode::Normal,
"full" | "2" => SyncMode::Full,
_ => SyncMode::Normal,
};
}
"checkpoint_interval" | "snapshot_interval" => {
config.persistence.checkpoint_interval =
value.parse::<u32>().map_err(|_| {
Error::invalid_argument(format!(
"invalid checkpoint_interval: '{}'",
value
))
})?;
}
"compact_threshold" => {
config.persistence.compact_threshold =
value.parse::<u32>().map_err(|_| {
Error::invalid_argument(format!(
"invalid compact_threshold: '{}'",
value
))
})?;
}
"keep_snapshots" => {
config.persistence.keep_snapshots = value.parse::<u32>().map_err(|_| {
Error::invalid_argument(format!("invalid keep_snapshots: '{}'", value))
})?;
}
"wal_flush_trigger" => {
config.persistence.wal_flush_trigger =
value.parse::<usize>().map_err(|_| {
Error::invalid_argument(format!(
"invalid wal_flush_trigger: '{}'",
value
))
})?;
}
"wal_buffer_size" => {
config.persistence.wal_buffer_size =
value.parse::<usize>().map_err(|_| {
Error::invalid_argument(format!(
"invalid wal_buffer_size: '{}'",
value
))
})?;
}
"wal_max_size" => {
config.persistence.wal_max_size = value.parse::<usize>().map_err(|_| {
Error::invalid_argument(format!("invalid wal_max_size: '{}'", value))
})?;
}
"commit_batch_size" => {
config.persistence.commit_batch_size =
value.parse::<u32>().map_err(|_| {
Error::invalid_argument(format!(
"invalid commit_batch_size: '{}'",
value
))
})?;
}
"sync_interval_ms" | "sync_interval" => {
config.persistence.sync_interval_ms =
value.parse::<u32>().map_err(|_| {
Error::invalid_argument(format!(
"invalid sync_interval_ms: '{}'",
value
))
})?;
}
"wal_compression" => {
config.persistence.wal_compression =
matches!(value.to_lowercase().as_str(), "on" | "true" | "1" | "yes");
}
"volume_compression" => {
config.persistence.volume_compression =
matches!(value.to_lowercase().as_str(), "on" | "true" | "1" | "yes");
}
"compression" | "snapshot_compression" => {
let enabled =
matches!(value.to_lowercase().as_str(), "on" | "true" | "1" | "yes");
config.persistence.wal_compression = enabled;
config.persistence.volume_compression = enabled;
}
"compression_threshold" => {
config.persistence.compression_threshold =
value.parse::<usize>().map_err(|_| {
Error::invalid_argument(format!(
"invalid compression_threshold: '{}'",
value
))
})?;
}
"target_volume_rows" => {
let rows = value.parse::<usize>().map_err(|_| {
Error::invalid_argument(format!(
"invalid target_volume_rows: '{}'",
value
))
})?;
config.persistence.target_volume_rows = rows.max(65_536);
}
"checkpoint_on_close" => {
config.persistence.checkpoint_on_close =
matches!(value.to_lowercase().as_str(), "on" | "true" | "1" | "yes");
}
"cleanup_interval" => {
config.cleanup.interval_secs = value.parse::<u64>().map_err(|_| {
Error::invalid_argument(format!(
"invalid cleanup_interval: '{}'",
value
))
})?;
}
"deleted_row_retention" => {
config.cleanup.deleted_row_retention_secs =
value.parse::<u64>().map_err(|_| {
Error::invalid_argument(format!(
"invalid deleted_row_retention: '{}'",
value
))
})?;
}
"transaction_retention" => {
config.cleanup.transaction_retention_secs =
value.parse::<u64>().map_err(|_| {
Error::invalid_argument(format!(
"invalid transaction_retention: '{}'",
value
))
})?;
}
"cleanup" => {
config.cleanup.enabled =
matches!(value.to_lowercase().as_str(), "on" | "true" | "1" | "yes");
}
_ => {} }
}
}
Ok((clean_path, config))
}
pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<i64> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let result = if param_values.is_empty() {
executor.execute(sql)?
} else if let Some(fast_result) = executor.try_fast_path_with_params(sql, ¶m_values) {
fast_result?
} else {
executor.execute_with_params(sql, param_values)?
};
Ok(result.rows_affected())
}
pub fn query<P: Params>(&self, sql: &str, params: P) -> Result<Rows> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let result = if param_values.is_empty() {
executor.execute(sql)?
} else if let Some(fast_result) = executor.try_fast_path_with_params(sql, ¶m_values) {
fast_result?
} else {
executor.execute_with_params(sql, param_values)?
};
Ok(Rows::new(result))
}
pub fn query_one<T: FromValue, P: Params>(&self, sql: &str, params: P) -> Result<T> {
let row = self
.query(sql, params)?
.next()
.ok_or(Error::NoRowsReturned)??;
row.get(0)
}
pub fn query_opt<T: FromValue, P: Params>(&self, sql: &str, params: P) -> Result<Option<T>> {
match self.query(sql, params)?.next() {
Some(row) => Ok(Some(row?.get(0)?)),
None => Ok(None),
}
}
pub fn execute_with_timeout<P: Params>(
&self,
sql: &str,
params: P,
timeout_ms: u64,
) -> Result<i64> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let ctx = ExecutionContextBuilder::new()
.params(param_values)
.timeout_ms(timeout_ms)
.build();
let result = executor.execute_with_context(sql, &ctx)?;
Ok(result.rows_affected())
}
pub fn query_with_timeout<P: Params>(
&self,
sql: &str,
params: P,
timeout_ms: u64,
) -> Result<Rows> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let ctx = ExecutionContextBuilder::new()
.params(param_values)
.timeout_ms(timeout_ms)
.build();
let result = executor.execute_with_context(sql, &ctx)?;
Ok(Rows::new(result))
}
pub fn prepare(&self, sql: &str) -> Result<Statement> {
Statement::new(Arc::downgrade(&self.inner), sql.to_string(), self)
}
pub(crate) fn from_inner(inner: Arc<DatabaseInner>) -> Self {
Database { inner }
}
pub fn execute_named(&self, sql: &str, params: NamedParams) -> Result<i64> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let result = executor.execute_with_named_params(sql, params.into_inner())?;
Ok(result.rows_affected())
}
pub fn query_named(&self, sql: &str, params: NamedParams) -> Result<Rows> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let result = executor.execute_with_named_params(sql, params.into_inner())?;
Ok(Rows::new(result))
}
pub fn query_one_named<T: FromValue>(&self, sql: &str, params: NamedParams) -> Result<T> {
let mut rows = self.query_named(sql, params)?;
match rows.next() {
Some(Ok(row)) => row.get(0),
Some(Err(e)) => Err(e),
None => Err(Error::NoRowsReturned),
}
}
pub fn query_as<T: FromRow, P: Params>(&self, sql: &str, params: P) -> Result<Vec<T>> {
let rows = self.query(sql, params)?;
rows.map(|r| r.and_then(|row| T::from_row(&row))).collect()
}
pub fn query_as_named<T: FromRow>(&self, sql: &str, params: NamedParams) -> Result<Vec<T>> {
let rows = self.query_named(sql, params)?;
rows.map(|r| r.and_then(|row| T::from_row(&row))).collect()
}
pub fn begin(&self) -> Result<Transaction> {
self.begin_with_isolation(IsolationLevel::ReadCommitted)
}
pub fn begin_with_isolation(&self, isolation: IsolationLevel) -> Result<Transaction> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let tx = executor.begin_transaction_with_isolation(isolation)?;
let engine = executor.engine().clone();
Ok(Transaction::new(tx, engine))
}
pub fn engine(&self) -> &Arc<MVCCEngine> {
&self.inner.engine
}
pub fn close(&self) -> Result<()> {
let mut registry = DATABASE_REGISTRY
.write()
.map_err(|_| Error::LockAcquisitionFailed("registry write".to_string()))?;
registry.remove(&self.inner.dsn);
self.inner.engine.close_engine()?;
Ok(())
}
pub fn cached_plan(&self, sql: &str) -> Result<CachedPlanRef> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
executor.get_or_create_plan(sql)
}
pub fn execute_plan<P: Params>(&self, plan: &CachedPlanRef, params: P) -> Result<i64> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let ctx = if param_values.is_empty() {
ExecutionContext::new()
} else {
ExecutionContext::with_params(param_values)
};
let result = executor.execute_with_cached_plan(plan, &ctx)?;
Ok(result.rows_affected())
}
pub fn query_plan<P: Params>(&self, plan: &CachedPlanRef, params: P) -> Result<Rows> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let ctx = if param_values.is_empty() {
ExecutionContext::new()
} else {
ExecutionContext::with_params(param_values)
};
let result = executor.execute_with_cached_plan(plan, &ctx)?;
Ok(Rows::new(result))
}
pub fn execute_named_plan(&self, plan: &CachedPlanRef, params: NamedParams) -> Result<i64> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let ctx = ExecutionContext::with_named_params(params.into_inner());
let result = executor.execute_with_cached_plan(plan, &ctx)?;
Ok(result.rows_affected())
}
pub fn query_named_plan(&self, plan: &CachedPlanRef, params: NamedParams) -> Result<Rows> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let ctx = ExecutionContext::with_named_params(params.into_inner());
let result = executor.execute_with_cached_plan(plan, &ctx)?;
Ok(Rows::new(result))
}
pub fn table_exists(&self, name: &str) -> Result<bool> {
let engine = &self.inner.engine;
let tx = engine.begin_transaction()?;
Ok(tx.get_table(name).is_ok())
}
pub fn dsn(&self) -> &str {
&self.inner.dsn
}
pub fn set_default_isolation_level(&self, level: IsolationLevel) -> Result<()> {
let mut executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
executor.set_default_isolation_level(level);
Ok(())
}
pub fn create_snapshot(&self) -> Result<()> {
use crate::storage::Engine;
self.inner.engine.create_snapshot()
}
pub fn restore_snapshot(&self, timestamp: Option<&str>) -> Result<String> {
use crate::storage::Engine;
let result = self.inner.engine.restore_snapshot(timestamp)?;
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
executor.clear_semantic_cache();
crate::executor::context::clear_scalar_subquery_cache();
crate::executor::context::clear_in_subquery_cache();
crate::executor::context::clear_semi_join_cache();
Ok(result)
}
pub(crate) fn executor(&self) -> &Mutex<Executor> {
&self.inner.executor
}
pub fn semantic_cache_stats(&self) -> Result<crate::executor::SemanticCacheStatsSnapshot> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
Ok(executor.semantic_cache_stats())
}
pub fn clear_semantic_cache(&self) -> Result<()> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
executor.clear_semantic_cache();
Ok(())
}
pub fn oldest_loaded_snapshot_timestamp(&self) -> Option<String> {
self.inner.engine.oldest_loaded_snapshot_timestamp()
}
}
pub trait FromValue: Sized {
fn from_value(value: &Value) -> Result<Self>;
}
impl FromValue for i64 {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Integer(i) => Ok(*i),
Value::Float(f) => Ok(*f as i64),
_ => Err(Error::TypeConversion {
from: format!("{:?}", value),
to: "Integer".to_string(),
}),
}
}
}
impl FromValue for i32 {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Integer(i) => Ok(*i as i32),
Value::Float(f) => Ok(*f as i32),
_ => Err(Error::TypeConversion {
from: format!("{:?}", value),
to: "Integer".to_string(),
}),
}
}
}
impl FromValue for f64 {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Float(f) => Ok(*f),
Value::Integer(i) => Ok(*i as f64),
_ => Err(Error::TypeConversion {
from: format!("{:?}", value),
to: "Float".to_string(),
}),
}
}
}
impl FromValue for String {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Text(s) => Ok(s.to_string()),
Value::Extension(data) if data.first() == Some(&(DataType::Json as u8)) => {
Ok(std::str::from_utf8(&data[1..]).unwrap_or("").to_string())
}
Value::Integer(i) => Ok(i.to_string()),
Value::Float(f) => Ok(f.to_string()),
Value::Boolean(b) => Ok(if *b {
"true".to_string()
} else {
"false".to_string()
}),
Value::Timestamp(ts) => Ok(ts.format("%Y-%m-%dT%H:%M:%SZ").to_string()),
Value::Extension(_) => value
.as_string()
.ok_or_else(|| Error::invalid_argument("Cannot convert extension to String")),
Value::Null(_) => Ok(String::new()),
}
}
}
impl FromValue for bool {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Boolean(b) => Ok(*b),
Value::Integer(i) => Ok(*i != 0),
_ => Err(Error::TypeConversion {
from: format!("{:?}", value),
to: "Boolean".to_string(),
}),
}
}
}
impl FromValue for Value {
fn from_value(value: &Value) -> Result<Self> {
Ok(value.clone())
}
}
impl<T: FromValue> FromValue for Option<T> {
fn from_value(value: &Value) -> Result<Self> {
if value.is_null() {
Ok(None)
} else {
Ok(Some(T::from_value(value)?))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::named_params;
#[test]
fn test_open_memory() {
let db = Database::open("memory://").unwrap();
assert_eq!(db.dsn(), "memory://");
}
#[test]
fn test_open_in_memory() {
let db = Database::open_in_memory().unwrap();
db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
.unwrap();
db.execute("INSERT INTO test VALUES ($1)", (1,)).unwrap();
for row in db.query("SELECT * FROM test", ()).unwrap() {
let row = row.unwrap();
let id: i64 = row.get(0).unwrap();
assert_eq!(id, 1);
}
}
#[test]
fn test_execute_and_query_new_api() {
let db = Database::open_in_memory().unwrap();
db.execute(
"CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
(),
)
.unwrap();
let affected = db
.execute(
"INSERT INTO users VALUES ($1, $2, $3), ($4, $5, $6)",
(1, "Alice", 30, 2, "Bob", 25),
)
.unwrap();
assert_eq!(affected, 2);
let rows: Vec<_> = db
.query("SELECT * FROM users ORDER BY id", ())
.unwrap()
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].get::<i64>(0).unwrap(), 1);
assert_eq!(rows[0].get::<String>(1).unwrap(), "Alice");
assert_eq!(rows[0].get::<i64>(2).unwrap(), 30);
}
#[test]
fn test_query_one() {
let db = Database::open_in_memory().unwrap();
db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
.unwrap();
db.execute("INSERT INTO test VALUES ($1), ($2), ($3)", (1, 2, 3))
.unwrap();
let count: i64 = db.query_one("SELECT COUNT(*) FROM test", ()).unwrap();
assert_eq!(count, 3);
}
#[test]
fn test_query_opt() {
let db = Database::open_in_memory().unwrap();
db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
.unwrap();
db.execute("INSERT INTO test VALUES ($1)", (1,)).unwrap();
let result: Option<i64> = db
.query_opt("SELECT id FROM test WHERE id = $1", (1,))
.unwrap();
assert_eq!(result, Some(1));
let result: Option<i64> = db
.query_opt("SELECT id FROM test WHERE id = $1", (999,))
.unwrap();
assert_eq!(result, None);
}
#[test]
fn test_params_macro() {
let db = Database::open_in_memory().unwrap();
db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)", ())
.unwrap();
db.execute(
"INSERT INTO users VALUES ($1, $2)",
crate::params![1, "Alice"],
)
.unwrap();
let names: Vec<String> = db
.query("SELECT name FROM users WHERE id = $1", crate::params![1])
.unwrap()
.map(|r| r.and_then(|row| row.get(0)))
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
assert_eq!(names, vec!["Alice"]);
}
#[test]
fn test_parse_dsn() {
let (scheme, path) = Database::parse_dsn("memory://").unwrap();
assert_eq!(scheme, "memory");
assert_eq!(path, "");
let (scheme, path) = Database::parse_dsn("file:///tmp/test.db").unwrap();
assert_eq!(scheme, "file");
assert_eq!(path, "/tmp/test.db");
let (scheme, path) = Database::parse_dsn("file:///tmp/test.db?sync=full").unwrap();
assert_eq!(scheme, "file");
assert_eq!(path, "/tmp/test.db?sync=full");
assert!(Database::parse_dsn("invalid").is_err());
assert!(Database::parse_dsn("unknown://test").is_err());
}
#[test]
fn test_from_value_types() {
assert_eq!(i64::from_value(&Value::Integer(42)).unwrap(), 42);
assert_eq!(f64::from_value(&Value::Float(3.5)).unwrap(), 3.5);
assert_eq!(
String::from_value(&Value::Text("hello".into())).unwrap(),
"hello"
);
assert!(bool::from_value(&Value::Boolean(true)).unwrap());
assert_eq!(
Option::<i64>::from_value(&Value::Integer(42)).unwrap(),
Some(42)
);
assert_eq!(
Option::<i64>::from_value(&Value::null_unknown()).unwrap(),
None
);
}
#[test]
fn test_cached_plan_insert_and_query() {
let db = Database::open_in_memory().unwrap();
db.execute(
"CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT, score FLOAT)",
(),
)
.unwrap();
let insert_plan = db
.cached_plan("INSERT INTO test VALUES ($1, $2, $3)")
.unwrap();
db.execute_plan(&insert_plan, (1, "Alice", 95.5)).unwrap();
db.execute_plan(&insert_plan, (2, "Bob", 82.0)).unwrap();
db.execute_plan(&insert_plan, (3, "Charlie", 91.0)).unwrap();
let query_plan = db
.cached_plan("SELECT name FROM test WHERE id = $1")
.unwrap();
let mut rows = db.query_plan(&query_plan, (2,)).unwrap();
let row = rows.next().unwrap().unwrap();
assert_eq!(row.get::<String>(0).unwrap(), "Bob");
}
#[test]
fn test_cached_plan_reuse() {
let db = Database::open_in_memory().unwrap();
db.execute(
"CREATE TABLE test (id INTEGER PRIMARY KEY, value INTEGER)",
(),
)
.unwrap();
let plan1 = db.cached_plan("INSERT INTO test VALUES ($1, $2)").unwrap();
let plan2 = db.cached_plan("INSERT INTO test VALUES ($1, $2)").unwrap();
db.execute_plan(&plan1, (1, 100)).unwrap();
db.execute_plan(&plan2, (2, 200)).unwrap();
let count: i64 = db.query_one("SELECT COUNT(*) FROM test", ()).unwrap();
assert_eq!(count, 2);
}
#[test]
fn test_cached_plan_update_delete() {
let db = Database::open_in_memory().unwrap();
db.execute(
"CREATE TABLE test (id INTEGER PRIMARY KEY, value INTEGER)",
(),
)
.unwrap();
db.execute("INSERT INTO test VALUES (1, 100)", ()).unwrap();
db.execute("INSERT INTO test VALUES (2, 200)", ()).unwrap();
let update_plan = db
.cached_plan("UPDATE test SET value = $1 WHERE id = $2")
.unwrap();
let affected = db.execute_plan(&update_plan, (999, 1)).unwrap();
assert_eq!(affected, 1);
let val: i64 = db
.query_one("SELECT value FROM test WHERE id = 1", ())
.unwrap();
assert_eq!(val, 999);
let delete_plan = db.cached_plan("DELETE FROM test WHERE id = $1").unwrap();
let affected = db.execute_plan(&delete_plan, (2,)).unwrap();
assert_eq!(affected, 1);
let count: i64 = db.query_one("SELECT COUNT(*) FROM test", ()).unwrap();
assert_eq!(count, 1);
}
#[test]
fn test_cached_plan_no_params() {
let db = Database::open_in_memory().unwrap();
db.execute(
"CREATE TABLE test (id INTEGER PRIMARY KEY, value INTEGER)",
(),
)
.unwrap();
db.execute("INSERT INTO test VALUES (1, 10)", ()).unwrap();
db.execute("INSERT INTO test VALUES (2, 20)", ()).unwrap();
let plan = db.cached_plan("SELECT COUNT(*) FROM test").unwrap();
let mut rows = db.query_plan(&plan, ()).unwrap();
let row = rows.next().unwrap().unwrap();
assert_eq!(row.get::<i64>(0).unwrap(), 2);
}
#[test]
fn test_cached_plan_named_params() {
let db = Database::open_in_memory().unwrap();
db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)", ())
.unwrap();
let plan = db
.cached_plan("INSERT INTO test VALUES (:id, :name)")
.unwrap();
db.execute_named_plan(&plan, named_params! { id: 1, name: "Alice" })
.unwrap();
db.execute_named_plan(&plan, named_params! { id: 2, name: "Bob" })
.unwrap();
let query_plan = db
.cached_plan("SELECT name FROM test WHERE id = :id")
.unwrap();
let mut rows = db
.query_named_plan(&query_plan, named_params! { id: 1 })
.unwrap();
let row = rows.next().unwrap().unwrap();
assert_eq!(row.get::<String>(0).unwrap(), "Alice");
}
#[test]
fn test_cached_plan_multi_statement_error() {
let db = Database::open_in_memory().unwrap();
db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
.unwrap();
let result = db.cached_plan("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2)");
assert!(result.is_err());
}
}