use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use crate::core::{Error, IsolationLevel, Result, Value};
use crate::executor::context::ExecutionContextBuilder;
use crate::executor::scheduler::JobScheduler;
use crate::executor::Executor;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::Engine;
use crate::storage::{Config, SyncMode};
use super::params::{NamedParams, Params};
use super::rows::{FromRow, Rows};
use super::statement::Statement;
use super::transaction::Transaction;
/// DSN scheme (`memory://`) that `Database::open` maps to an in-memory engine.
pub const MEMORY_SCHEME: &str = "memory";
/// DSN scheme (`file://path`) that `Database::open` maps to an engine configured from a path.
pub const FILE_SCHEME: &str = "file";
/// Process-wide map from DSN text to a weak handle on the database opened for it.
///
/// `Database::open` consults this map so that opening the same DSN string
/// again returns a handle sharing the existing `DatabaseInner`. Entries are
/// `Weak`, so the registry by itself never keeps a database alive.
static DATABASE_REGISTRY: std::sync::LazyLock<
RwLock<HashMap<String, std::sync::Weak<DatabaseInner>>>,
> = std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
/// State shared by every `Database` handle that refers to the same database.
struct DatabaseInner {
/// Storage engine; closed when this value is dropped.
engine: Arc<MVCCEngine>,
/// SQL executor; every query helper locks this mutex for the duration of the call.
executor: Mutex<Executor>,
/// DSN text this database was opened with (also its registry key when opened via `open`).
dsn: String,
/// Flag set to `true` to ask the background job scheduler to stop; `None` when no scheduler was started.
scheduler_shutdown: Option<Arc<std::sync::atomic::AtomicBool>>,
}
impl Drop for DatabaseInner {
    /// Asks the scheduler (if one was started) to stop, then closes the engine.
    fn drop(&mut self) {
        if let Some(stop_flag) = self.scheduler_shutdown.as_ref() {
            stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
        }
        // `drop` cannot report failures, so the close result is deliberately discarded.
        let _ignored = self.engine.close_engine();
    }
}
/// Handle to an open database.
///
/// Cloning is cheap: clones share the same `DatabaseInner` through an `Arc`.
#[derive(Clone)]
pub struct Database {
/// Shared state; the engine is closed when the last handle is dropped.
inner: Arc<DatabaseInner>,
}
impl Database {
/// Opens the database identified by `dsn`, reusing a live instance when one exists.
///
/// The registry is keyed by the DSN text exactly as given. If a live database
/// is already registered under that text, a handle to it is returned;
/// otherwise a new engine is created, opened and registered.
///
/// A background job scheduler is started for every non-memory DSN, and for
/// memory DSNs whose text contains `scheduler=true`.
///
/// # Errors
///
/// Returns [`Error::LockAcquisitionFailed`] when the registry lock is
/// poisoned, a parse error for a malformed or unsupported DSN, or the error
/// reported while opening the engine.
pub fn open(dsn: &str) -> Result<Self> {
    tracing::info!("Opening database connection to: {}", dsn);

    // Fast path: a shared read lock is enough when the database is already open.
    {
        let readers = DATABASE_REGISTRY
            .read()
            .map_err(|_| Error::LockAcquisitionFailed("registry read".to_string()))?;
        if let Some(inner) = readers.get(dsn).and_then(|weak| weak.upgrade()) {
            return Ok(Database { inner });
        }
    }

    let mut writers = DATABASE_REGISTRY
        .write()
        .map_err(|_| Error::LockAcquisitionFailed("registry write".to_string()))?;

    // Look again under the write lock: another thread may have opened the
    // same DSN between releasing the read lock and acquiring this one.
    if let Some(inner) = writers.get(dsn).and_then(|weak| weak.upgrade()) {
        return Ok(Database { inner });
    }

    let (scheme, path) = Self::parse_dsn(dsn)?;
    let raw_engine = match scheme.as_str() {
        MEMORY_SCHEME => MVCCEngine::in_memory(),
        FILE_SCHEME => {
            let (_clean_path, config) = Self::parse_file_config(&path)?;
            MVCCEngine::new(config)
        }
        other => {
            return Err(Error::parse(format!(
                "Unsupported scheme '{}'. Use 'memory://' or 'file://path'",
                other
            )));
        }
    };
    raw_engine.open_engine()?;
    let engine = Arc::new(raw_engine);

    let executor = Executor::new(Arc::clone(&engine));

    let wants_scheduler = scheme != MEMORY_SCHEME || dsn.contains("scheduler=true");
    let scheduler_shutdown = if wants_scheduler {
        // The scheduler gets its own executor over the same engine.
        let background_executor = Executor::new_internal(
            Arc::clone(&engine),
            Arc::clone(crate::functions::global_registry()),
        );
        Some(JobScheduler::start(background_executor))
    } else {
        None
    };

    let inner = Arc::new(DatabaseInner {
        engine,
        executor: Mutex::new(executor),
        dsn: dsn.to_string(),
        scheduler_shutdown,
    });
    writers.insert(dsn.to_string(), Arc::downgrade(&inner));
    Ok(Database { inner })
}
/// Creates a new in-memory database that is not entered into the DSN registry.
///
/// Every call builds its own engine, so the returned database is independent
/// of any other handle, including ones obtained from `open("memory://")`.
/// No job scheduler is started. The reported DSN is `"memory://"`.
///
/// # Errors
///
/// Returns the error reported while opening the engine.
pub fn open_in_memory() -> Result<Self> {
    let raw_engine = MVCCEngine::in_memory();
    raw_engine.open_engine()?;
    let engine = Arc::new(raw_engine);

    let executor = Executor::with_function_registry(
        Arc::clone(&engine),
        Arc::clone(crate::functions::registry::global_registry()),
    );

    Ok(Database {
        inner: Arc::new(DatabaseInner {
            engine,
            executor: Mutex::new(executor),
            dsn: String::from("memory://"),
            scheduler_shutdown: None,
        }),
    })
}
fn parse_dsn(dsn: &str) -> Result<(String, String)> {
let idx = dsn
.find("://")
.ok_or_else(|| Error::parse("Invalid DSN format: expected scheme://path"))?;
let scheme = dsn[..idx].to_lowercase();
let path = dsn[idx + 3..].to_string();
match scheme.as_str() {
MEMORY_SCHEME | FILE_SCHEME => {}
_ => {
return Err(Error::parse(format!(
"Unsupported scheme '{}'. Use 'memory://' or 'file://path'",
scheme
)));
}
}
if scheme == FILE_SCHEME {
let clean_path = if path.contains('?') {
&path[..path.find('?').unwrap()]
} else {
&path
};
if clean_path.is_empty() {
return Err(Error::parse("file:// scheme requires a non-empty path"));
}
}
Ok((scheme, path))
}
/// Builds an engine `Config` from the path part of a `file://` DSN.
///
/// Everything before the first `?` is the file path; the remainder is read
/// as `&`-separated `key=value` pairs that override persistence settings.
/// Returns the path without the query string together with the config.
///
/// Recognized keys: `sync_mode`/`sync`, `snapshot_interval`, `keep_snapshots`,
/// `wal_flush_trigger`, `wal_buffer_size`, `wal_max_size`,
/// `commit_batch_size`, `sync_interval_ms`/`sync_interval`,
/// `wal_compression`, `snapshot_compression`, `compression`,
/// `compression_threshold`.
///
/// Unknown keys and numeric values that fail to parse are ignored. An
/// unrecognized sync mode selects `SyncMode::Normal`. Boolean settings are
/// true only for `on`, `true`, `1` or `yes` (case-insensitive).
fn parse_file_config(path: &str) -> Result<(String, Config)> {
    /// Interprets a case-insensitive on/off switch value.
    fn is_enabled(raw: &str) -> bool {
        matches!(raw.to_lowercase().as_str(), "on" | "true" | "1" | "yes")
    }

    let (clean_path, query) = match path.split_once('?') {
        Some((file, query)) => (file.to_string(), Some(query)),
        None => (path.to_string(), None),
    };

    let mut config = Config::with_path(&clean_path);

    for pair in query.into_iter().flat_map(|q| q.split('&')) {
        // A pair without `=` is treated as a key with an empty value.
        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
        let persistence = &mut config.persistence;

        match key {
            "sync_mode" | "sync" => {
                persistence.sync_mode = match value.to_lowercase().as_str() {
                    "none" | "off" | "0" => SyncMode::None,
                    "full" | "2" => SyncMode::Full,
                    // Covers "normal", "1" and every unrecognized spelling.
                    _ => SyncMode::Normal,
                };
            }
            "snapshot_interval" => {
                if let Ok(seconds) = value.parse::<u32>() {
                    persistence.snapshot_interval = seconds;
                }
            }
            "keep_snapshots" => {
                if let Ok(kept) = value.parse::<u32>() {
                    persistence.keep_snapshots = kept;
                }
            }
            "wal_flush_trigger" => {
                if let Ok(trigger) = value.parse::<usize>() {
                    persistence.wal_flush_trigger = trigger;
                }
            }
            "wal_buffer_size" => {
                if let Ok(buffer) = value.parse::<usize>() {
                    persistence.wal_buffer_size = buffer;
                }
            }
            "wal_max_size" => {
                if let Ok(limit) = value.parse::<usize>() {
                    persistence.wal_max_size = limit;
                }
            }
            "commit_batch_size" => {
                if let Ok(batch) = value.parse::<u32>() {
                    persistence.commit_batch_size = batch;
                }
            }
            "sync_interval_ms" | "sync_interval" => {
                if let Ok(millis) = value.parse::<u32>() {
                    persistence.sync_interval_ms = millis;
                }
            }
            "wal_compression" => persistence.wal_compression = is_enabled(value),
            "snapshot_compression" => persistence.snapshot_compression = is_enabled(value),
            "compression" => {
                // Shorthand that switches both compression settings together.
                let enabled = is_enabled(value);
                persistence.wal_compression = enabled;
                persistence.snapshot_compression = enabled;
            }
            "compression_threshold" => {
                if let Ok(threshold) = value.parse::<usize>() {
                    persistence.compression_threshold = threshold;
                }
            }
            _ => {}
        }
    }

    Ok((clean_path, config))
}
/// Returns at most the first 1024 bytes of `sql` for use in log and span fields.
///
/// The cut is moved back to the nearest UTF-8 character boundary so the
/// returned slice is always valid; shorter strings are returned unchanged.
#[inline]
fn truncate_sql(sql: &str) -> &str {
    const LIMIT: usize = 1024;

    if sql.len() <= LIMIT {
        return sql;
    }

    // Index 0 is always a boundary, so this walk-back terminates.
    let mut end = LIMIT;
    while !sql.is_char_boundary(end) {
        end -= 1;
    }
    &sql[..end]
}
pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<i64> {
let _span = tracing::info_span!("db.execute", sql = %Self::truncate_sql(sql)).entered();
let start = std::time::Instant::now();
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let result = if param_values.is_empty() {
executor.execute(sql)
} else {
executor.execute_with_params(sql, ¶m_values)
};
let elapsed = start.elapsed();
tracing::info!(
target: "oxibase::metrics",
metric_type = "histogram",
metric_name = "query_duration_ms",
value = elapsed.as_secs_f64() * 1000.0,
unit = "ms",
description = "Execution time of queries"
);
let result = match result {
Ok(res) => res,
Err(e) => {
tracing::info!(
target: "oxibase::metrics",
metric_type = "counter",
metric_name = "errors_total",
value = 1.0,
unit = "count",
description = "Total number of queries resulting in errors",
error = %e
);
return Err(e);
}
};
if elapsed.as_millis() > 1000 {
tracing::warn!("Slow query detected ({}ms): {}", elapsed.as_millis(), sql);
}
Ok(result.rows_affected())
}
pub fn query<P: Params>(&self, sql: &str, params: P) -> Result<Rows> {
let _span = tracing::info_span!("db.query", sql = %Self::truncate_sql(sql)).entered();
let start = std::time::Instant::now();
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let result = if param_values.is_empty() {
executor.execute(sql)
} else {
executor.execute_with_params(sql, ¶m_values)
};
let elapsed = start.elapsed();
tracing::info!(
target: "oxibase::metrics",
metric_type = "histogram",
metric_name = "query_duration_ms",
value = elapsed.as_secs_f64() * 1000.0,
unit = "ms",
description = "Execution time of queries"
);
let result = match result {
Ok(res) => res,
Err(e) => {
tracing::info!(
target: "oxibase::metrics",
metric_type = "counter",
metric_name = "errors_total",
value = 1.0,
unit = "count",
description = "Total number of queries resulting in errors",
error = %e
);
return Err(e);
}
};
if elapsed.as_millis() > 1000 {
tracing::warn!("Slow query detected ({}ms): {}", elapsed.as_millis(), sql);
}
Ok(Rows::new(result))
}
pub fn query_one<T: FromValue, P: Params>(&self, sql: &str, params: P) -> Result<T> {
let row = self
.query(sql, params)?
.next()
.ok_or(Error::NoRowsReturned)??;
row.get(0)
}
/// Runs a query and converts the first column of its first row to `T`, if any.
///
/// Returns `Ok(None)` when the query yields no rows.
///
/// # Errors
///
/// Propagates query, row and conversion errors.
pub fn query_opt<T: FromValue, P: Params>(&self, sql: &str, params: P) -> Result<Option<T>> {
    self.query(sql, params)?
        .next()
        .map(|first| first.and_then(|row| row.get(0)))
        .transpose()
}
pub fn execute_with_timeout<P: Params>(
&self,
sql: &str,
params: P,
timeout_ms: u64,
) -> Result<i64> {
let _span = tracing::info_span!("db.execute_with_timeout", sql = %Self::truncate_sql(sql))
.entered();
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let ctx = ExecutionContextBuilder::new()
.params(param_values)
.timeout_ms(timeout_ms)
.build();
let result = executor.execute_with_context(sql, &ctx)?;
Ok(result.rows_affected())
}
pub fn query_with_timeout<P: Params>(
&self,
sql: &str,
params: P,
timeout_ms: u64,
) -> Result<Rows> {
let _span =
tracing::info_span!("db.query_with_timeout", sql = %Self::truncate_sql(sql)).entered();
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let param_values = params.into_params();
let ctx = ExecutionContextBuilder::new()
.params(param_values)
.timeout_ms(timeout_ms)
.build();
let result = executor.execute_with_context(sql, &ctx)?;
Ok(Rows::new(result))
}
/// Creates a `Statement` for `sql` bound to a clone of this database handle.
pub fn prepare(&self, sql: &str) -> Result<Statement> {
    let handle = self.clone();
    let owned_sql = sql.to_string();
    Statement::new(handle, owned_sql)
}
pub fn execute_named(&self, sql: &str, params: NamedParams) -> Result<i64> {
let _span =
tracing::info_span!("db.execute_named", sql = %Self::truncate_sql(sql)).entered();
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let result = executor.execute_with_named_params(sql, params.into_inner())?;
Ok(result.rows_affected())
}
pub fn query_named(&self, sql: &str, params: NamedParams) -> Result<Rows> {
let _span = tracing::info_span!("db.query_named", sql = %Self::truncate_sql(sql)).entered();
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let result = executor.execute_with_named_params(sql, params.into_inner())?;
Ok(Rows::new(result))
}
pub fn query_one_named<T: FromValue>(&self, sql: &str, params: NamedParams) -> Result<T> {
let mut rows = self.query_named(sql, params)?;
match rows.next() {
Some(Ok(row)) => row.get(0),
Some(Err(e)) => Err(e),
None => Err(Error::NoRowsReturned),
}
}
/// Runs a query and maps every row to `T` through `FromRow`.
///
/// # Errors
///
/// Stops at, and returns, the first query, row or mapping error.
pub fn query_as<T: FromRow, P: Params>(&self, sql: &str, params: P) -> Result<Vec<T>> {
    let mut mapped = Vec::new();
    for fetched in self.query(sql, params)? {
        let row = fetched?;
        mapped.push(T::from_row(&row)?);
    }
    Ok(mapped)
}
/// Runs a named-parameter query and maps every row to `T` through `FromRow`.
///
/// # Errors
///
/// Stops at, and returns, the first query, row or mapping error.
pub fn query_as_named<T: FromRow>(&self, sql: &str, params: NamedParams) -> Result<Vec<T>> {
    let mut mapped = Vec::new();
    for fetched in self.query_named(sql, params)? {
        let row = fetched?;
        mapped.push(T::from_row(&row)?);
    }
    Ok(mapped)
}
/// Starts a transaction at `IsolationLevel::ReadCommitted`.
///
/// See `begin_with_isolation` to choose a different level.
pub fn begin(&self) -> Result<Transaction> {
    let level = IsolationLevel::ReadCommitted;
    self.begin_with_isolation(level)
}
pub fn begin_with_isolation(&self, isolation: IsolationLevel) -> Result<Transaction> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
let tx = executor.begin_transaction_with_isolation(isolation)?;
Ok(Transaction::new(tx))
}
pub fn engine(&self) -> &Arc<MVCCEngine> {
&self.inner.engine
}
pub fn close(&self) -> Result<()> {
tracing::info!("Closing database connection: {}", self.inner.dsn);
let mut registry = DATABASE_REGISTRY
.write()
.map_err(|_| Error::LockAcquisitionFailed("registry write".to_string()))?;
registry.remove(&self.inner.dsn);
self.inner.engine.close_engine()?;
Ok(())
}
/// Reports whether a table called `name` can be looked up in a fresh transaction.
///
/// Any failure of the table lookup is reported as `false`.
///
/// # Errors
///
/// Returns the error reported while beginning the transaction.
pub fn table_exists(&self, name: &str) -> Result<bool> {
    let tx = self.inner.engine.begin_transaction()?;
    let found = tx.get_table(name).is_ok();
    Ok(found)
}
/// Returns the DSN text this database was opened with.
pub fn dsn(&self) -> &str {
    self.inner.dsn.as_str()
}
pub fn set_default_isolation_level(&self, level: IsolationLevel) -> Result<()> {
let mut executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
executor.set_default_isolation_level(level);
Ok(())
}
pub fn create_snapshot(&self) -> Result<()> {
use crate::storage::Engine;
self.inner.engine.create_snapshot()
}
pub(crate) fn executor(&self) -> &Mutex<Executor> {
&self.inner.executor
}
pub fn semantic_cache_stats(&self) -> Result<crate::executor::SemanticCacheStatsSnapshot> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
Ok(executor.semantic_cache_stats())
}
pub fn clear_semantic_cache(&self) -> Result<()> {
let executor = self
.inner
.executor
.lock()
.map_err(|_| Error::LockAcquisitionFailed("executor".to_string()))?;
executor.clear_semantic_cache();
Ok(())
}
}
/// Conversion from a database [`Value`] into a Rust type.
///
/// Used as the bound on the column type requested by helpers such as
/// `Database::query_one` and `Database::query_opt`.
pub trait FromValue: Sized {
/// Converts `value` into `Self`, or returns an error when the value cannot be represented.
fn from_value(value: &Value) -> Result<Self>;
}
impl FromValue for i64 {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Integer(i) => Ok(*i),
Value::Float(f) => Ok(*f as i64),
_ => Err(Error::TypeConversion {
from: format!("{:?}", value),
to: "Integer".to_string(),
}),
}
}
}
impl FromValue for i32 {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Integer(i) => Ok(*i as i32),
Value::Float(f) => Ok(*f as i32),
_ => Err(Error::TypeConversion {
from: format!("{:?}", value),
to: "Integer".to_string(),
}),
}
}
}
impl FromValue for f64 {
fn from_value(value: &Value) -> Result<Self> {
match value {
Value::Float(f) => Ok(*f),
Value::Integer(i) => Ok(*i as f64),
_ => Err(Error::TypeConversion {
from: format!("{:?}", value),
to: "Float".to_string(),
}),
}
}
}
impl FromValue for String {
    /// Renders any value as text; this conversion never fails.
    ///
    /// Text and JSON are copied, numbers and booleans use their `to_string`
    /// form, timestamps are formatted as `%Y-%m-%dT%H:%M:%SZ`, and null
    /// becomes the empty string.
    fn from_value(value: &Value) -> Result<Self> {
        let rendered = match value {
            Value::Text(text) => text.to_string(),
            Value::Json(json) => json.to_string(),
            Value::Integer(int) => int.to_string(),
            Value::Float(float) => float.to_string(),
            Value::Boolean(flag) => flag.to_string(),
            Value::Timestamp(stamp) => stamp.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
            Value::Null(_) => String::new(),
        };
        Ok(rendered)
    }
}
impl FromValue for bool {
    /// Reads a boolean directly, or an integer as "non-zero means true".
    ///
    /// Every other value kind yields `Error::TypeConversion`.
    fn from_value(value: &Value) -> Result<Self> {
        match *value {
            Value::Boolean(flag) => Ok(flag),
            Value::Integer(number) => Ok(number != 0),
            _ => Err(Error::TypeConversion {
                from: format!("{:?}", value),
                to: "Boolean".to_string(),
            }),
        }
    }
}
impl FromValue for Value {
fn from_value(value: &Value) -> Result<Self> {
Ok(value.clone())
}
}
impl<T: FromValue> FromValue for Option<T> {
    /// Maps a null value to `None` and any other value to `Some` of `T`'s conversion.
    ///
    /// Conversion errors for non-null values are propagated.
    fn from_value(value: &Value) -> Result<Self> {
        if value.is_null() {
            return Ok(None);
        }
        T::from_value(value).map(Some)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh, unregistered in-memory database for one test.
    fn fresh_db() -> Database {
        Database::open_in_memory().unwrap()
    }

    /// Opening through a DSN keeps that DSN retrievable from the handle.
    #[test]
    fn test_open_memory() {
        let handle = Database::open("memory://").unwrap();
        assert_eq!(handle.dsn(), "memory://");
    }

    /// A database from `open_in_memory` accepts DDL, inserts and queries.
    #[test]
    fn test_open_in_memory() {
        let db = fresh_db();
        db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
            .unwrap();
        db.execute("INSERT INTO test VALUES ($1)", (1,)).unwrap();

        let rows = db.query("SELECT * FROM test", ()).unwrap();
        rows.for_each(|fetched| {
            let id: i64 = fetched.unwrap().get(0).unwrap();
            assert_eq!(id, 1);
        });
    }

    /// Multi-row inserts report the affected count and rows read back in order.
    #[test]
    fn test_execute_and_query_new_api() {
        let db = fresh_db();
        db.execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            (),
        )
        .unwrap();

        let inserted = db
            .execute(
                "INSERT INTO users VALUES ($1, $2, $3), ($4, $5, $6)",
                (1, "Alice", 30, 2, "Bob", 25),
            )
            .unwrap();
        assert_eq!(inserted, 2);

        let fetched = db.query("SELECT * FROM users ORDER BY id", ()).unwrap();
        let rows: Vec<_> = fetched
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(rows.len(), 2);

        let first = &rows[0];
        assert_eq!(first.get::<i64>(0).unwrap(), 1);
        assert_eq!(first.get::<String>(1).unwrap(), "Alice");
        assert_eq!(first.get::<i64>(2).unwrap(), 30);
    }

    /// `query_one` returns the single scalar of an aggregate query.
    #[test]
    fn test_query_one() {
        let db = fresh_db();
        db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
            .unwrap();
        db.execute("INSERT INTO test VALUES ($1), ($2), ($3)", (1, 2, 3))
            .unwrap();

        let total: i64 = db.query_one("SELECT COUNT(*) FROM test", ()).unwrap();
        assert_eq!(total, 3);
    }

    /// `query_opt` distinguishes a matching row from an empty result.
    #[test]
    fn test_query_opt() {
        let db = fresh_db();
        db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
            .unwrap();
        db.execute("INSERT INTO test VALUES ($1)", (1,)).unwrap();

        let present: Option<i64> = db
            .query_opt("SELECT id FROM test WHERE id = $1", (1,))
            .unwrap();
        assert_eq!(present, Some(1));

        let absent: Option<i64> = db
            .query_opt("SELECT id FROM test WHERE id = $1", (999,))
            .unwrap();
        assert_eq!(absent, None);
    }

    /// Parameters built with the `params!` macro work for inserts and queries.
    #[test]
    fn test_params_macro() {
        let db = fresh_db();
        db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)", ())
            .unwrap();
        db.execute(
            "INSERT INTO users VALUES ($1, $2)",
            crate::params![1, "Alice"],
        )
        .unwrap();

        let mut names: Vec<String> = Vec::new();
        for fetched in db
            .query("SELECT name FROM users WHERE id = $1", crate::params![1])
            .unwrap()
        {
            names.push(fetched.unwrap().get(0).unwrap());
        }
        assert_eq!(names, vec!["Alice"]);
    }

    /// DSN parsing splits scheme and path and rejects malformed input.
    #[test]
    fn test_parse_dsn() {
        let accepted = [
            ("memory://", "memory", ""),
            ("file:///tmp/test.db", "file", "/tmp/test.db"),
            (
                "file:///tmp/test.db?sync=full",
                "file",
                "/tmp/test.db?sync=full",
            ),
        ];
        for &(dsn, want_scheme, want_path) in &accepted {
            let (scheme, path) = Database::parse_dsn(dsn).unwrap();
            assert_eq!(scheme, want_scheme);
            assert_eq!(path, want_path);
        }

        for rejected in &["invalid", "unknown://test"] {
            assert!(Database::parse_dsn(rejected).is_err());
        }
    }

    /// Spot-checks the `FromValue` conversions for the common scalar types.
    #[test]
    fn test_from_value_types() {
        assert_eq!(i64::from_value(&Value::Integer(42)).unwrap(), 42);
        assert_eq!(f64::from_value(&Value::Float(3.5)).unwrap(), 3.5);

        let text = String::from_value(&Value::Text("hello".into())).unwrap();
        assert_eq!(text, "hello");

        assert!(bool::from_value(&Value::Boolean(true)).unwrap());

        let some = Option::<i64>::from_value(&Value::Integer(42)).unwrap();
        assert_eq!(some, Some(42));

        let none = Option::<i64>::from_value(&Value::null_unknown()).unwrap();
        assert_eq!(none, None);
    }
}