use crate::database::{MoteDB, TransactionStats};
use crate::database::indexes::{VectorIndexStats, SpatialIndexStats};
use crate::sql::StreamingQueryResult;
use crate::sql::ast::Statement;
use crate::types::{Value, Row, RowId, SqlRow};
use crate::{Result, DBConfig};
use lru::LruCache;
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
/// Public handle to a MoteDB database.
///
/// Wraps the engine (`MoteDB`) behind an `Arc` and keeps a cache of parsed
/// SQL statements so that repeated `execute` calls with the same text can
/// skip lexing and parsing.
pub struct Database {
    /// The underlying storage/query engine that every method delegates to.
    inner: Arc<MoteDB>,
    /// LRU cache of parsed statements, keyed by the exact SQL text.
    stmt_cache: Arc<Mutex<LruCache<String, Statement>>>,
}
impl Database {
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
Ok(Self {
inner: Arc::new(MoteDB::create(path)?),
stmt_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(256).unwrap()))),
})
}
pub fn create_with_config<P: AsRef<Path>>(path: P, config: DBConfig) -> Result<Self> {
Ok(Self {
inner: Arc::new(MoteDB::create_with_config(path, config)?),
stmt_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(256).unwrap()))),
})
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
Ok(Self {
inner: Arc::new(MoteDB::open(path)?),
stmt_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(256).unwrap()))),
})
}
pub fn open_with_config<P: AsRef<Path>>(path: P, config: DBConfig) -> Result<Self> {
Ok(Self {
inner: Arc::new(MoteDB::open_with_config(path, config)?),
stmt_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(256).unwrap()))),
})
}
pub fn flush(&self) -> Result<()> {
self.inner.flush()
}
pub fn checkpoint(&self) -> Result<()> {
self.inner.checkpoint()
}
pub fn close(&self) -> Result<()> {
self.flush()
}
pub fn execute(&self, sql: &str) -> Result<StreamingQueryResult> {
use crate::sql::{Lexer, Parser, QueryExecutor};
let statement = {
let mut cache = self.stmt_cache.lock().unwrap();
if let Some(stmt) = cache.get(sql) {
stmt.clone()
} else {
let mut lexer = Lexer::new(sql);
let tokens = lexer.tokenize()?;
let mut parser = Parser::new(tokens);
let stmt = parser.parse()?;
cache.put(sql.to_string(), stmt.clone());
stmt
}
};
let executor = QueryExecutor::new(self.inner.clone());
executor.execute_streaming(statement)
}
pub fn begin_transaction(&self) -> Result<u64> {
self.inner.begin_transaction()
}
pub fn commit_transaction(&self, tx_id: u64) -> Result<()> {
self.inner.commit_transaction(tx_id)
}
pub fn rollback_transaction(&self, tx_id: u64) -> Result<()> {
self.inner.rollback_transaction(tx_id)
}
pub fn savepoint(&self, tx_id: u64, name: &str) -> Result<()> {
self.inner.create_savepoint(tx_id, name.to_string())
}
pub fn rollback_to_savepoint(&self, tx_id: u64, name: &str) -> Result<()> {
self.inner.rollback_to_savepoint(tx_id, name)
}
pub fn release_savepoint(&self, tx_id: u64, name: &str) -> Result<()> {
self.inner.release_savepoint(tx_id, name)
}
pub fn batch_insert(&self, _table_name: &str, rows: Vec<Row>) -> Result<Vec<RowId>> {
self.inner.batch_insert_rows(rows)
}
pub fn batch_insert_map(&self, table_name: &str, sql_rows: Vec<SqlRow>) -> Result<Vec<RowId>> {
let schema = self.inner.get_table_schema(table_name)?;
let rows: Result<Vec<Row>> = sql_rows.into_iter().map(|sql_row| {
crate::sql::row_converter::sql_row_to_row(&sql_row, &schema)
}).collect();
self.inner.batch_insert_rows_to_table(table_name, rows?)
}
pub fn batch_insert_with_vectors(&self, table_name: &str, rows: Vec<Row>, _vector_columns: &[&str]) -> Result<Vec<RowId>> {
self.inner.batch_insert_rows_to_table(table_name, rows)
}
pub fn batch_insert_with_vectors_map(&self, table_name: &str, sql_rows: Vec<SqlRow>, vector_columns: &[&str]) -> Result<Vec<RowId>> {
let schema = self.inner.get_table_schema(table_name)?;
let rows: Result<Vec<Row>> = sql_rows.into_iter().map(|sql_row| {
crate::sql::row_converter::sql_row_to_row(&sql_row, &schema)
}).collect();
self.batch_insert_with_vectors(table_name, rows?, vector_columns)
}
pub fn create_column_index(&self, table_name: &str, column_name: &str) -> Result<()> {
self.inner.create_column_index(table_name, column_name)
}
pub fn create_vector_index(&self, index_name: &str, dimension: usize) -> Result<()> {
self.inner.create_vector_index(index_name, dimension)
}
pub fn create_text_index(&self, index_name: &str) -> Result<()> {
self.inner.create_text_index(index_name)
}
pub fn create_spatial_index(&self, index_name: &str, bounds: crate::types::BoundingBox) -> Result<()> {
self.inner.create_spatial_index(index_name, bounds)
}
pub fn drop_index(&self, table_name: &str, index_name: &str) -> Result<()> {
let sql = format!("DROP INDEX {} ON {}", index_name, table_name);
self.execute(&sql)?;
Ok(())
}
pub fn query_by_column(&self, table_name: &str, column_name: &str, value: &Value) -> Result<Vec<RowId>> {
self.inner.query_by_column(table_name, column_name, value)
}
pub fn query_by_column_range(&self, table_name: &str, column_name: &str,
start: &Value, end: &Value) -> Result<Vec<RowId>> {
self.inner.query_by_column_range(table_name, column_name, start, end)
}
pub fn query_by_column_between(&self, table_name: &str, column_name: &str,
start: &Value, start_inclusive: bool,
end: &Value, end_inclusive: bool) -> Result<Vec<RowId>> {
self.inner.query_by_column_between(table_name, column_name, start, start_inclusive, end, end_inclusive)
}
pub fn vector_search(&self, index_name: &str, query: &[f32], k: usize) -> Result<Vec<(RowId, f32)>> {
self.inner.vector_search(index_name, query, k)
}
pub fn text_search_ranked(&self, index_name: &str, query: &str, top_k: usize) -> Result<Vec<(RowId, f32)>> {
self.inner.text_search_ranked(index_name, query, top_k)
}
pub fn spatial_search(&self, index_name: &str, bbox: &crate::types::BoundingBox) -> Result<Vec<RowId>> {
self.inner.spatial_range_query(index_name, bbox)
}
pub fn query_timestamp_range(&self, start: i64, end: i64) -> Result<Vec<RowId>> {
self.inner.query_timestamp_range(start, end)
}
pub fn vector_index_stats(&self, index_name: &str) -> Result<VectorIndexStats> {
self.inner.vector_index_stats(index_name)
}
pub fn spatial_index_stats(&self, index_name: &str) -> Result<SpatialIndexStats> {
self.inner.spatial_index_stats(index_name)
}
pub fn transaction_stats(&self) -> TransactionStats {
self.inner.transaction_stats()
}
pub fn insert_row(&self, table_name: &str, row: Row) -> Result<RowId> {
self.inner.insert_row_to_table(table_name, row)
}
pub fn insert_row_map(&self, table_name: &str, sql_row: SqlRow) -> Result<RowId> {
let schema = self.inner.get_table_schema(table_name)?;
let row = crate::sql::row_converter::sql_row_to_row(&sql_row, &schema)?;
self.inner.insert_row_to_table(table_name, row)
}
pub fn get_row(&self, row_id: RowId) -> Result<Option<Row>> {
self.inner.get_row(row_id)
}
pub fn get_row_map(&self, table_name: &str, row_id: RowId) -> Result<Option<SqlRow>> {
if let Some(row) = self.inner.get_row(row_id)? {
let schema = self.inner.get_table_schema(table_name)?;
Ok(Some(crate::sql::row_converter::row_to_sql_row(&row, &schema)?))
} else {
Ok(None)
}
}
pub fn update_row(&self, table_name: &str, row_id: RowId, new_row: Row) -> Result<()> {
let old_row = self.inner.get_table_row(table_name, row_id)?
.ok_or_else(|| crate::StorageError::InvalidData(
format!("Row {} not found in table '{}'", row_id, table_name)
))?;
self.inner.update_row_in_table(table_name, row_id, old_row, new_row)
}
pub fn update_row_map(&self, table_name: &str, row_id: RowId, sql_row: SqlRow) -> Result<()> {
let old_row = self.inner.get_table_row(table_name, row_id)?
.ok_or_else(|| crate::StorageError::InvalidData(
format!("Row {} not found in table '{}'", row_id, table_name)
))?;
let schema = self.inner.get_table_schema(table_name)?;
let new_row = crate::sql::row_converter::sql_row_to_row(&sql_row, &schema)?;
self.inner.update_row_in_table(table_name, row_id, old_row, new_row)
}
pub fn delete_row(&self, table_name: &str, row_id: RowId) -> Result<()> {
let old_row = self.inner.get_table_row(table_name, row_id)?
.ok_or_else(|| crate::StorageError::InvalidData(
format!("Row {} not found in table '{}'", row_id, table_name)
))?;
self.inner.delete_row_from_table(table_name, row_id, old_row)
}
pub fn scan_table(&self, table_name: &str) -> Result<Vec<(RowId, Row)>> {
self.inner.scan_table_rows(table_name)
}
}
impl Drop for Database {
    /// Attempts one last flush of the engine when the handle goes away.
    fn drop(&mut self) {
        // `drop` has no way to report failure, so the flush is best-effort and
        // its result is deliberately discarded. Callers that need to observe
        // flush errors should call `flush()` (or `close()`) explicitly first.
        std::mem::drop(self.inner.flush());
    }
}