use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
use tracing::{debug, info, instrument, warn};
pub mod logging;
mod security;
pub use rustlite_core::index::{BTreeIndex, HashIndex, Index, IndexInfo, IndexManager, IndexType};
pub use rustlite_core::{Error, Result};
pub use rustlite_core::transaction::{
IsolationLevel, MVCCStorage, Timestamp, Transaction, TransactionId, TransactionManager,
VersionChain, VersionedValue,
};
pub use rustlite_core::query::{
Column, ExecutionContext, Executor, Lexer, Parser, PhysicalPlan, Planner, Query, Row, Value,
};
pub use rustlite_wal::{
RecoveryManager, RecoveryStats, SyncMode, WalConfig, WalManager, WalReader, WalRecord,
};
pub use rustlite_storage::{
CompactionConfig, CompactionStats, CompactionWorker, Manifest, Memtable, MemtableEntry,
SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter, StorageConfig, StorageEngine,
StorageStats,
};
pub use rustlite_snapshot::{
SnapshotConfig, SnapshotFile, SnapshotManager, SnapshotMeta, SnapshotType,
};
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
enum StorageBackend {
Memory(RwLock<HashMap<Vec<u8>, Vec<u8>>>),
Persistent(StorageEngine),
}
struct DatabaseInner {
storage: StorageBackend,
indexes: RwLock<IndexManager>,
transaction_manager: Option<Arc<TransactionManager>>,
}
#[derive(Clone)]
pub struct Database {
inner: Arc<DatabaseInner>,
}
impl Database {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path_ref = path.as_ref();
info!(path = ?path_ref, "Opening RustLite database");
let engine = StorageEngine::open(path)?;
let mvcc_storage = Arc::new(MVCCStorage::new());
let tx_manager = TransactionManager::new(mvcc_storage);
Ok(Database {
inner: Arc::new(DatabaseInner {
storage: StorageBackend::Persistent(engine),
indexes: RwLock::new(IndexManager::new()),
transaction_manager: Some(tx_manager),
}),
})
}
pub fn open_with_config<P: AsRef<Path>>(path: P, config: StorageConfig) -> Result<Self> {
let engine = StorageEngine::open_with_config(path, config)?;
let mvcc_storage = Arc::new(MVCCStorage::new());
let tx_manager = TransactionManager::new(mvcc_storage);
Ok(Database {
inner: Arc::new(DatabaseInner {
storage: StorageBackend::Persistent(engine),
indexes: RwLock::new(IndexManager::new()),
transaction_manager: Some(tx_manager),
}),
})
}
pub fn in_memory() -> Result<Self> {
info!("Creating in-memory RustLite database");
let mvcc_storage = Arc::new(MVCCStorage::new());
let tx_manager = TransactionManager::new(mvcc_storage);
Ok(Database {
inner: Arc::new(DatabaseInner {
storage: StorageBackend::Memory(RwLock::new(HashMap::new())),
indexes: RwLock::new(IndexManager::new()),
transaction_manager: Some(tx_manager),
}),
})
}
#[deprecated(
since = "0.2.0",
note = "Use `Database::open()` for persistent storage or `Database::in_memory()` for temporary storage"
)]
pub fn new() -> Result<Self> {
Self::in_memory()
}
#[instrument(skip(self, key, value), fields(key_len = key.len(), value_len = value.len()))]
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
security::validate_key(key)?;
security::validate_value(value)?;
debug!("Writing key-value pair");
match &self.inner.storage {
StorageBackend::Memory(store) => {
let mut store = store.write().map_err(|_| Error::LockPoisoned)?;
store.insert(key.to_vec(), value.to_vec());
Ok(())
}
StorageBackend::Persistent(engine) => engine.put(key, value),
}
}
#[instrument(skip(self, key), fields(key_len = key.len()))]
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
security::validate_key(key)?;
debug!("Reading key");
match &self.inner.storage {
StorageBackend::Memory(store) => {
let store = store.read().map_err(|_| Error::LockPoisoned)?;
Ok(store.get(key).cloned())
}
StorageBackend::Persistent(engine) => engine.get(key),
}
}
#[instrument(skip(self, key), fields(key_len = key.len()))]
pub fn delete(&self, key: &[u8]) -> Result<bool> {
security::validate_key(key)?;
debug!("Deleting key");
match &self.inner.storage {
StorageBackend::Memory(store) => {
let mut store = store.write().map_err(|_| Error::LockPoisoned)?;
Ok(store.remove(key).is_some())
}
StorageBackend::Persistent(engine) => {
let existed = engine.get(key)?.is_some();
if existed {
engine.delete(key)?;
}
Ok(existed)
}
}
}
pub fn sync(&self) -> Result<()> {
match &self.inner.storage {
StorageBackend::Memory(_) => Ok(()),
StorageBackend::Persistent(engine) => engine.sync(),
}
}
pub fn is_persistent(&self) -> bool {
matches!(&self.inner.storage, StorageBackend::Persistent(_))
}
#[instrument(skip(self), fields(name = %name, index_type = ?index_type))]
pub fn create_index(&self, name: &str, index_type: IndexType) -> Result<()> {
security::validate_index_name(name)?;
info!("Creating index");
let mut indexes = self
.inner
.indexes
.write()
.map_err(|_| Error::LockPoisoned)?;
indexes.create_index(name, index_type)
}
pub fn drop_index(&self, name: &str) -> Result<bool> {
let mut indexes = self
.inner
.indexes
.write()
.map_err(|_| Error::LockPoisoned)?;
indexes.drop_index(name)
}
pub fn index_insert(&self, name: &str, key: &[u8], value: u64) -> Result<()> {
let mut indexes = self
.inner
.indexes
.write()
.map_err(|_| Error::LockPoisoned)?;
indexes.insert(name, key, value)
}
pub fn index_find(&self, name: &str, key: &[u8]) -> Result<Vec<u64>> {
let indexes = self.inner.indexes.read().map_err(|_| Error::LockPoisoned)?;
indexes.find(name, key)
}
pub fn index_remove(&self, name: &str, key: &[u8]) -> Result<bool> {
let mut indexes = self
.inner
.indexes
.write()
.map_err(|_| Error::LockPoisoned)?;
indexes.remove(name, key)
}
pub fn list_indexes(&self) -> Result<Vec<String>> {
let indexes = self.inner.indexes.read().map_err(|_| Error::LockPoisoned)?;
Ok(indexes
.list_indexes()
.iter()
.map(|s| s.to_string())
.collect())
}
pub fn index_info(&self) -> Result<Vec<IndexInfo>> {
let indexes = self.inner.indexes.read().map_err(|_| Error::LockPoisoned)?;
Ok(indexes.index_info())
}
#[instrument(skip(self, sql, context), fields(sql_len = sql.len()))]
pub fn query(&self, sql: &str, context: ExecutionContext) -> Result<Vec<Row>> {
security::validate_query(sql)?;
debug!(sql = %sql, "Executing query");
let mut parser =
Parser::new(sql).map_err(|e| Error::InvalidInput(format!("Parse error: {}", e)))?;
let query = parser
.parse()
.map_err(|e| Error::InvalidInput(format!("Parse error: {}", e)))?;
let planner = Planner::new();
let plan = planner
.plan(&query)
.map_err(|e| Error::InvalidInput(format!("Planning error: {}", e)))?;
let mut executor = Executor::new(context);
executor.execute(&plan)
}
pub fn prepare(&self, sql: &str) -> Result<PhysicalPlan> {
let mut parser =
Parser::new(sql).map_err(|e| Error::InvalidInput(format!("Parse error: {}", e)))?;
let query = parser
.parse()
.map_err(|e| Error::InvalidInput(format!("Parse error: {}", e)))?;
let planner = Planner::new();
planner
.plan(&query)
.map_err(|e| Error::InvalidInput(format!("Planning error: {}", e)))
}
pub fn execute_plan(&self, plan: &PhysicalPlan, context: ExecutionContext) -> Result<Vec<Row>> {
let mut executor = Executor::new(context);
executor.execute(plan)
}
#[instrument(skip(self), fields(isolation = ?isolation))]
pub fn begin_transaction(&self, isolation: IsolationLevel) -> Result<Transaction> {
info!("Beginning transaction");
if let Some(ref manager) = self.inner.transaction_manager {
manager.begin(isolation)
} else {
Err(Error::Transaction(
"Transaction support not initialized".into(),
))
}
}
pub fn begin(&self) -> Result<Transaction> {
self.begin_transaction(IsolationLevel::default())
}
pub fn gc(&self) -> Result<()> {
if let Some(ref manager) = self.inner.transaction_manager {
manager.gc()
} else {
Ok(()) }
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_version() {
assert_eq!(VERSION, "0.7.0");
}
#[test]
fn test_in_memory_database() {
let db = Database::in_memory().unwrap();
db.put(b"key", b"value").unwrap();
assert_eq!(db.get(b"key").unwrap(), Some(b"value".to_vec()));
assert!(!db.is_persistent());
}
#[test]
fn test_persistent_database() {
let dir = tempdir().unwrap();
let db = Database::open(dir.path()).unwrap();
db.put(b"persist", b"data").unwrap();
assert_eq!(db.get(b"persist").unwrap(), Some(b"data".to_vec()));
assert!(db.is_persistent());
}
#[test]
fn test_persistence_across_reopens() {
let dir = tempdir().unwrap();
{
let db = Database::open(dir.path()).unwrap();
db.put(b"key1", b"value1").unwrap();
db.put(b"key2", b"value2").unwrap();
db.sync().unwrap();
}
{
let db = Database::open(dir.path()).unwrap();
assert_eq!(db.get(b"key1").unwrap(), Some(b"value1".to_vec()));
assert_eq!(db.get(b"key2").unwrap(), Some(b"value2".to_vec()));
}
}
#[test]
fn test_delete() {
let dir = tempdir().unwrap();
let db = Database::open(dir.path()).unwrap();
db.put(b"key", b"value").unwrap();
assert!(db.delete(b"key").unwrap());
assert_eq!(db.get(b"key").unwrap(), None);
assert!(!db.delete(b"key").unwrap()); }
#[test]
fn test_update() {
let db = Database::in_memory().unwrap();
db.put(b"counter", b"1").unwrap();
assert_eq!(db.get(b"counter").unwrap(), Some(b"1".to_vec()));
db.put(b"counter", b"2").unwrap();
assert_eq!(db.get(b"counter").unwrap(), Some(b"2".to_vec()));
}
#[test]
#[allow(deprecated)]
fn test_backward_compatibility() {
let db = Database::new().unwrap();
db.put(b"key", b"value").unwrap();
assert_eq!(db.get(b"key").unwrap(), Some(b"value".to_vec()));
}
#[test]
fn test_create_and_drop_index() {
let db = Database::in_memory().unwrap();
db.create_index("test_idx", IndexType::BTree).unwrap();
assert_eq!(db.list_indexes().unwrap().len(), 1);
assert!(db.drop_index("test_idx").unwrap());
assert_eq!(db.list_indexes().unwrap().len(), 0);
}
#[test]
fn test_btree_index_operations() {
let db = Database::in_memory().unwrap();
db.create_index("names", IndexType::BTree).unwrap();
db.index_insert("names", b"alice", 100).unwrap();
db.index_insert("names", b"bob", 101).unwrap();
db.index_insert("names", b"charlie", 102).unwrap();
assert_eq!(db.index_find("names", b"bob").unwrap(), vec![101]);
assert!(db.index_remove("names", b"bob").unwrap());
assert!(db.index_find("names", b"bob").unwrap().is_empty());
}
#[test]
fn test_hash_index_operations() {
let db = Database::in_memory().unwrap();
db.create_index("sessions", IndexType::Hash).unwrap();
db.index_insert("sessions", b"sess:abc", 500).unwrap();
db.index_insert("sessions", b"sess:def", 501).unwrap();
assert_eq!(db.index_find("sessions", b"sess:abc").unwrap(), vec![500]);
assert!(db
.index_find("sessions", b"nonexistent")
.unwrap()
.is_empty());
}
#[test]
fn test_index_info() {
let db = Database::in_memory().unwrap();
db.create_index("idx1", IndexType::BTree).unwrap();
db.create_index("idx2", IndexType::Hash).unwrap();
db.index_insert("idx1", b"key1", 1).unwrap();
db.index_insert("idx1", b"key2", 2).unwrap();
db.index_insert("idx2", b"key3", 3).unwrap();
let info = db.index_info().unwrap();
assert_eq!(info.len(), 2);
}
#[test]
fn test_simple_query() {
let db = Database::in_memory().unwrap();
let mut context = ExecutionContext::new();
context.data.insert(
"users".to_string(),
vec![
Row {
columns: vec![
Column {
name: "name".to_string(),
alias: None,
},
Column {
name: "age".to_string(),
alias: None,
},
],
values: vec![Value::String("Alice".to_string()), Value::Integer(30)],
},
Row {
columns: vec![
Column {
name: "name".to_string(),
alias: None,
},
Column {
name: "age".to_string(),
alias: None,
},
],
values: vec![Value::String("Bob".to_string()), Value::Integer(25)],
},
],
);
let results = db.query("SELECT * FROM users", context).unwrap();
assert_eq!(results.len(), 2);
}
#[test]
fn test_query_with_where() {
let db = Database::in_memory().unwrap();
let mut context = ExecutionContext::new();
context.data.insert(
"users".to_string(),
vec![
Row {
columns: vec![
Column {
name: "name".to_string(),
alias: None,
},
Column {
name: "age".to_string(),
alias: None,
},
],
values: vec![Value::String("Alice".to_string()), Value::Integer(30)],
},
Row {
columns: vec![
Column {
name: "name".to_string(),
alias: None,
},
Column {
name: "age".to_string(),
alias: None,
},
],
values: vec![Value::String("Bob".to_string()), Value::Integer(25)],
},
],
);
let results = db
.query("SELECT name FROM users WHERE age > 26", context)
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].values[0], Value::String("Alice".to_string()));
}
#[test]
fn test_query_with_limit() {
let db = Database::in_memory().unwrap();
let mut context = ExecutionContext::new();
context.data.insert(
"users".to_string(),
vec![
Row {
columns: vec![Column {
name: "name".to_string(),
alias: None,
}],
values: vec![Value::String("Alice".to_string())],
},
Row {
columns: vec![Column {
name: "name".to_string(),
alias: None,
}],
values: vec![Value::String("Bob".to_string())],
},
Row {
columns: vec![Column {
name: "name".to_string(),
alias: None,
}],
values: vec![Value::String("Charlie".to_string())],
},
],
);
let results = db.query("SELECT * FROM users LIMIT 2", context).unwrap();
assert_eq!(results.len(), 2);
}
#[test]
fn test_prepare_and_execute() {
let db = Database::in_memory().unwrap();
let plan = db.prepare("SELECT * FROM users WHERE age > 18").unwrap();
let mut context = ExecutionContext::new();
context.data.insert(
"users".to_string(),
vec![Row {
columns: vec![
Column {
name: "name".to_string(),
alias: None,
},
Column {
name: "age".to_string(),
alias: None,
},
],
values: vec![Value::String("Alice".to_string()), Value::Integer(30)],
}],
);
let results = db.execute_plan(&plan, context).unwrap();
assert_eq!(results.len(), 1);
}
#[test]
fn test_transaction_basic() {
let db = Database::in_memory().unwrap();
let mut txn = db.begin().unwrap();
txn.put(b"key1".to_vec(), b"value1".to_vec()).unwrap();
txn.put(b"key2".to_vec(), b"value2".to_vec()).unwrap();
txn.commit().unwrap();
let txn2 = db.begin().unwrap();
assert_eq!(txn2.get(b"key1").unwrap(), Some(b"value1".to_vec()));
assert_eq!(txn2.get(b"key2").unwrap(), Some(b"value2".to_vec()));
}
#[test]
fn test_transaction_isolation() {
let db = Database::in_memory().unwrap();
let mut txn1 = db.begin().unwrap();
txn1.put(b"counter".to_vec(), b"1".to_vec()).unwrap();
txn1.commit().unwrap();
let txn2 = db.begin().unwrap();
assert_eq!(txn2.get(b"counter").unwrap(), Some(b"1".to_vec()));
let mut txn3 = db.begin().unwrap();
txn3.put(b"counter".to_vec(), b"2".to_vec()).unwrap();
txn3.commit().unwrap();
assert_eq!(txn2.get(b"counter").unwrap(), Some(b"1".to_vec()));
}
#[test]
fn test_transaction_rollback() {
let db = Database::in_memory().unwrap();
let mut txn1 = db.begin().unwrap();
txn1.put(b"key1".to_vec(), b"original".to_vec()).unwrap();
txn1.commit().unwrap();
let mut txn2 = db.begin().unwrap();
txn2.put(b"key1".to_vec(), b"updated".to_vec()).unwrap();
txn2.rollback().unwrap();
let txn3 = db.begin().unwrap();
assert_eq!(txn3.get(b"key1").unwrap(), Some(b"original".to_vec()));
}
#[test]
fn test_transaction_delete() {
let db = Database::in_memory().unwrap();
let mut txn1 = db.begin().unwrap();
txn1.put(b"temp".to_vec(), b"data".to_vec()).unwrap();
txn1.commit().unwrap();
let mut txn2 = db.begin().unwrap();
txn2.delete(b"temp").unwrap();
txn2.commit().unwrap();
let txn3 = db.begin().unwrap();
assert_eq!(txn3.get(b"temp").unwrap(), None);
}
#[test]
fn test_transaction_scan() {
let db = Database::in_memory().unwrap();
let mut txn = db.begin().unwrap();
txn.put(b"user:1".to_vec(), b"alice".to_vec()).unwrap();
txn.put(b"user:2".to_vec(), b"bob".to_vec()).unwrap();
txn.put(b"post:1".to_vec(), b"post1".to_vec()).unwrap();
txn.commit().unwrap();
let txn2 = db.begin().unwrap();
let results = txn2.scan(b"user:").unwrap();
assert_eq!(results.len(), 2);
}
#[test]
fn test_transaction_with_index() {
let db = Database::in_memory().unwrap();
db.create_index("user_idx", IndexType::Hash).unwrap();
let mut txn = db.begin().unwrap();
txn.put(b"user:1".to_vec(), b"alice@example.com".to_vec())
.unwrap();
txn.commit().unwrap();
db.index_insert("user_idx", b"alice@example.com", 1)
.unwrap();
let txn2 = db.begin().unwrap();
let ids = db.index_find("user_idx", b"alice@example.com").unwrap();
assert_eq!(ids, vec![1]);
assert_eq!(
txn2.get(b"user:1").unwrap(),
Some(b"alice@example.com".to_vec())
);
}
#[test]
fn test_concurrent_transaction_isolation() {
use std::sync::Arc;
use std::thread;
let db = Arc::new(Database::in_memory().unwrap());
let mut setup = db.begin().unwrap();
setup.put(b"balance".to_vec(), b"1000".to_vec()).unwrap();
setup.commit().unwrap();
let db1 = db.clone();
let handle1 = thread::spawn(move || {
let txn = db1.begin().unwrap();
let balance1_bytes = txn.get(b"balance").unwrap().unwrap();
let balance1 = String::from_utf8_lossy(&balance1_bytes);
thread::sleep(std::time::Duration::from_millis(10));
let balance2_bytes = txn.get(b"balance").unwrap().unwrap();
let balance2 = String::from_utf8_lossy(&balance2_bytes);
assert_eq!(balance1, balance2); balance1.to_string()
});
let db2 = db.clone();
let handle2 = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(5));
let mut txn = db2.begin().unwrap();
txn.put(b"balance".to_vec(), b"2000".to_vec()).unwrap();
txn.commit().unwrap();
});
let balance_seen = handle1.join().unwrap();
handle2.join().unwrap();
assert_eq!(balance_seen, "1000");
let final_txn = db.begin().unwrap();
let final_balance_bytes = final_txn.get(b"balance").unwrap().unwrap();
let final_balance = String::from_utf8_lossy(&final_balance_bytes);
assert_eq!(final_balance, "2000");
}
#[test]
fn test_transaction_error_handling() {
let db = Database::in_memory().unwrap();
let mut txn = db.begin().unwrap();
txn.put(b"key".to_vec(), b"value".to_vec()).unwrap();
txn.commit().unwrap();
}
#[test]
fn test_transaction_with_query() {
let db = Database::in_memory().unwrap();
let mut txn = db.begin().unwrap();
txn.put(b"user:1:name".to_vec(), b"Alice".to_vec()).unwrap();
txn.put(b"user:1:age".to_vec(), b"30".to_vec()).unwrap();
txn.put(b"user:2:name".to_vec(), b"Bob".to_vec()).unwrap();
txn.put(b"user:2:age".to_vec(), b"25".to_vec()).unwrap();
txn.commit().unwrap();
let query_txn = db.begin().unwrap();
let name = query_txn.get(b"user:1:name").unwrap();
assert_eq!(name, Some(b"Alice".to_vec()));
}
#[test]
fn test_garbage_collection() {
let db = Database::in_memory().unwrap();
for i in 0..10 {
let mut txn = db.begin().unwrap();
txn.put(b"key".to_vec(), format!("version{}", i).into_bytes())
.unwrap();
txn.commit().unwrap();
}
db.gc().unwrap();
let txn = db.begin().unwrap();
assert_eq!(txn.get(b"key").unwrap(), Some(b"version9".to_vec()));
}
#[test]
fn test_persistent_transactions() {
let dir = tempdir().unwrap();
let path = dir.path();
{
let db = Database::open(path).unwrap();
let mut txn = db.begin().unwrap();
txn.put(b"persistent_key".to_vec(), b"persistent_value".to_vec())
.unwrap();
txn.commit().unwrap();
db.put(b"direct_key", b"direct_value").unwrap();
db.sync().unwrap();
}
{
let db = Database::open(path).unwrap();
assert_eq!(
db.get(b"direct_key").unwrap(),
Some(b"direct_value".to_vec())
);
}
}
#[test]
fn test_transaction_with_large_dataset() {
let db = Database::in_memory().unwrap();
let mut txn = db.begin().unwrap();
for i in 0..1000 {
let key = format!("key:{:04}", i);
let value = format!("value:{}", i);
txn.put(key.into_bytes(), value.into_bytes()).unwrap();
}
txn.commit().unwrap();
let verify_txn = db.begin().unwrap();
for i in 0..1000 {
let key = format!("key:{:04}", i);
let expected_value = format!("value:{}", i);
assert_eq!(
verify_txn.get(&key.into_bytes()).unwrap(),
Some(expected_value.into_bytes())
);
}
}
#[test]
fn test_mixed_transaction_and_direct_operations() {
let db = Database::in_memory().unwrap();
db.put(b"direct", b"value1").unwrap();
let mut txn = db.begin().unwrap();
txn.put(b"txn".to_vec(), b"value2".to_vec()).unwrap();
txn.commit().unwrap();
assert_eq!(db.get(b"direct").unwrap(), Some(b"value1".to_vec()));
let read_txn = db.begin().unwrap();
assert_eq!(read_txn.get(b"txn").unwrap(), Some(b"value2".to_vec()));
}
#[test]
fn test_serializable_isolation() {
let db = Database::in_memory().unwrap();
let mut setup = db.begin().unwrap();
setup.put(b"counter".to_vec(), b"0".to_vec()).unwrap();
setup.commit().unwrap();
let txn = db.begin_transaction(IsolationLevel::Serializable).unwrap();
assert_eq!(txn.isolation_level(), IsolationLevel::Serializable);
let value = txn.get(b"counter").unwrap();
assert_eq!(value, Some(b"0".to_vec()));
}
#[test]
fn test_multiple_isolation_levels() {
let db = Database::in_memory().unwrap();
let _txn1 = db
.begin_transaction(IsolationLevel::ReadUncommitted)
.unwrap();
let _txn2 = db.begin_transaction(IsolationLevel::ReadCommitted).unwrap();
let _txn3 = db
.begin_transaction(IsolationLevel::RepeatableRead)
.unwrap();
let _txn4 = db.begin_transaction(IsolationLevel::Serializable).unwrap();
}
}