use rustc_hash::FxHashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::path::Path;
use super::file_lock::FileLock;
use crate::core::{DataType, Error, IsolationLevel, Result, Schema};
use crate::storage::config::Config;
use crate::storage::mvcc::wal_manager::WALOperationType;
use crate::storage::mvcc::{
MVCCTable, MvccTransaction, PersistenceManager, RowVersion, TransactionEngineOperations,
TransactionRegistry, TransactionVersionStore, VersionStore, VisibilityChecker,
INVALID_TRANSACTION_ID,
};
use crate::storage::traits::{Engine, Index, Table, Transaction};
/// Per-transaction version stores, keyed by an `(i64, String)` pair.
/// NOTE(review): presumably `(transaction id, table name)` — confirm against the
/// code that populates `MVCCEngine::txn_version_stores`.
type TxnVersionStoreMap = FxHashMap<(i64, String), Arc<RwLock<TransactionVersionStore>>>;
/// Magic number at the start of `snapshot_meta.bin`. It is written with
/// `to_le_bytes`, so the first four bytes on disk spell "SNAP".
const SNAPSHOT_META_MAGIC: u32 = 0x50414E53;
/// Highest snapshot-metadata format version this code can read and the version it writes.
const SNAPSHOT_META_VERSION: u32 = 1;
fn write_snapshot_metadata(path: &std::path::Path, lsn: u64) -> Result<()> {
use std::io::Write;
let mut buf = Vec::with_capacity(28);
buf.extend_from_slice(&SNAPSHOT_META_MAGIC.to_le_bytes());
buf.extend_from_slice(&SNAPSHOT_META_VERSION.to_le_bytes());
buf.extend_from_slice(&lsn.to_le_bytes());
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
buf.extend_from_slice(×tamp.to_le_bytes());
let crc = crc32fast::hash(&buf);
buf.extend_from_slice(&crc.to_le_bytes());
let temp_path = path.with_extension("bin.tmp");
let mut file = std::fs::File::create(&temp_path).map_err(|e| {
Error::internal(format!(
"failed to create snapshot metadata temp file: {}",
e
))
})?;
file.write_all(&buf)
.map_err(|e| Error::internal(format!("failed to write snapshot metadata: {}", e)))?;
file.sync_all()
.map_err(|e| Error::internal(format!("failed to sync snapshot metadata: {}", e)))?;
std::fs::rename(&temp_path, path)
.map_err(|e| Error::internal(format!("failed to rename snapshot metadata: {}", e)))?;
if let Some(parent) = path.parent() {
if let Ok(dir_file) = std::fs::File::open(parent) {
let _ = dir_file.sync_all();
}
}
Ok(())
}
/// Reads the LSN stored in a binary snapshot-metadata file.
///
/// Returns 0 in any of these cases:
/// - the file cannot be read;
/// - it is shorter than 28 bytes;
/// - its magic number does not match;
/// - its format version is newer than `SNAPSHOT_META_VERSION`;
/// - its CRC32 over bytes 0..24 does not match the stored checksum.
///
/// The last two cases also print a warning to stderr.
fn read_snapshot_metadata(path: &std::path::Path) -> u64 {
    let data = match std::fs::read(path) {
        Ok(bytes) if bytes.len() >= 28 => bytes,
        _ => return 0,
    };

    // Little-endian u32 at `offset`; all offsets used below are in range
    // because the length was checked above.
    let word = |offset: usize| u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());

    if word(0) != SNAPSHOT_META_MAGIC {
        return 0;
    }

    let version = word(4);
    if version > SNAPSHOT_META_VERSION {
        eprintln!(
            "Warning: Snapshot metadata version {} is newer than supported {}",
            version, SNAPSHOT_META_VERSION
        );
        return 0;
    }

    if word(24) != crc32fast::hash(&data[0..24]) {
        eprintln!("Warning: Snapshot metadata checksum mismatch");
        return 0;
    }

    u64::from_le_bytes(data[8..16].try_into().unwrap())
}
/// Returns the snapshot LSN recorded in `snapshot_dir`, or 0 if none is found.
///
/// The binary `snapshot_meta.bin` is preferred. If it is missing or yields 0,
/// the legacy `snapshot_meta.json` is tried. That file is only accepted in the
/// exact compact form `{"lsn":<number>}`, with whitespace allowed around the
/// whole text and around the number.
fn read_snapshot_lsn(snapshot_dir: &std::path::Path) -> u64 {
    let binary_meta = snapshot_dir.join("snapshot_meta.bin");
    if binary_meta.exists() {
        match read_snapshot_metadata(&binary_meta) {
            0 => {}
            lsn => return lsn,
        }
    }

    let legacy_meta = snapshot_dir.join("snapshot_meta.json");
    if !legacy_meta.exists() {
        return 0;
    }
    std::fs::read_to_string(&legacy_meta)
        .ok()
        .and_then(|text| {
            text.trim()
                .strip_prefix("{\"lsn\":")
                .and_then(|rest| rest.strip_suffix("}"))
                .and_then(|digits| digits.trim().parse::<u64>().ok())
        })
        .unwrap_or(0)
}
/// A stored SQL view: its name and the query text it expands to.
#[derive(Debug, Clone)]
pub struct ViewDefinition {
    /// Lowercased view name, used as the lookup key (set by `ViewDefinition::new`).
    pub name: String,
    /// View name exactly as the user wrote it; this is what gets serialized.
    pub original_name: String,
    /// The view's query text.
    pub query: String,
}
impl ViewDefinition {
pub fn new(name: &str, query: String) -> Self {
Self {
name: name.to_lowercase(),
original_name: name.to_string(),
query,
}
}
pub fn serialize(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&(self.original_name.len() as u16).to_le_bytes());
buf.extend_from_slice(self.original_name.as_bytes());
buf.extend_from_slice(&(self.query.len() as u32).to_le_bytes());
buf.extend_from_slice(self.query.as_bytes());
buf
}
pub fn deserialize(data: &[u8]) -> crate::core::Result<Self> {
let mut pos = 0;
if pos + 2 > data.len() {
return Err(crate::core::Error::internal(
"invalid view: missing name length",
));
}
let name_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
pos += 2;
if pos + name_len > data.len() {
return Err(crate::core::Error::internal("invalid view: missing name"));
}
let original_name = String::from_utf8(data[pos..pos + name_len].to_vec())
.map_err(|e| crate::core::Error::internal(format!("invalid view name: {}", e)))?;
pos += name_len;
if pos + 4 > data.len() {
return Err(crate::core::Error::internal(
"invalid view: missing query length",
));
}
let query_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
if pos + query_len > data.len() {
return Err(crate::core::Error::internal("invalid view: missing query"));
}
let query = String::from_utf8(data[pos..pos + query_len].to_vec())
.map_err(|e| crate::core::Error::internal(format!("invalid view query: {}", e)))?;
Ok(Self::new(&original_name, query))
}
}
/// Schema name assumed when a table or view name is not schema-qualified.
const DEFAULT_SCHEMA: &str = "public";
/// Multi-version concurrency-control storage engine.
///
/// Holds the catalog (schemas, views, sequences), one `VersionStore` per
/// regular table, ring buffers for the system telemetry tables, and the
/// optional persistence layer (WAL + snapshots).
pub struct MVCCEngine {
    /// Database path, or `"memory://"` when no path was configured.
    path: String,
    /// Current configuration; the path cannot be changed via `update_engine_config`.
    config: RwLock<Config>,
    /// Table schemas keyed by lowercased schema name, then lowercased table name.
    pub(crate) schemas: Arc<RwLock<FxHashMap<String, FxHashMap<String, Schema>>>>,
    /// Row-version stores keyed by lowercased table name only (not schema-qualified).
    version_stores: Arc<RwLock<FxHashMap<String, Arc<VersionStore>>>>,
    /// Transaction registry; also used as each version store's visibility checker.
    registry: Arc<TransactionRegistry>,
    /// Set by `open_engine`, cleared by `close_engine`.
    open: AtomicBool,
    /// Per-transaction version stores (see `TxnVersionStoreMap`).
    txn_version_stores: Arc<RwLock<TxnVersionStoreMap>>,
    /// View definitions keyed by lowercased schema name, then lowercased view name.
    views: RwLock<FxHashMap<String, FxHashMap<String, Arc<ViewDefinition>>>>,
    /// Sequence state, nested by two string keys.
    /// NOTE(review): presumably schema name then sequence name — confirm.
    #[allow(clippy::type_complexity)]
    pub(crate) sequences:
        Arc<RwLock<FxHashMap<String, FxHashMap<String, Arc<crate::core::SequenceState>>>>>,
    /// In-memory row buffers backing the `system.logs`, `system.traces` and
    /// `system.metrics` tables, keyed by lowercased table name.
    #[allow(clippy::type_complexity)]
    ring_buffers: Arc<
        RwLock<
            FxHashMap<
                String,
                std::sync::Arc<parking_lot::RwLock<std::collections::VecDeque<crate::core::Row>>>,
            >,
        >,
    >,
    /// Persistence manager. `None` when no path is configured, persistence is
    /// disabled, or initialization failed in `new`.
    persistence: Arc<Option<PersistenceManager>>,
    /// True while snapshots/WAL are being replayed; suppresses WAL recording of DDL.
    loading_from_disk: Arc<AtomicBool>,
    /// Exclusive lock on the database path, held while an on-disk engine is open.
    file_lock: Mutex<Option<FileLock>>,
}
impl MVCCEngine {
/// Builds a closed engine from `config`; call `open_engine` before use.
///
/// A persistence manager is created only when a non-empty path is configured
/// and `config.persistence.enabled` is set. If its construction fails, a
/// warning is printed and the engine runs without persistence. An empty or
/// missing path is stored as `"memory://"`.
pub fn new(config: Config) -> Self {
    let configured_path = config.path.clone().unwrap_or_default();
    let wants_persistence = !configured_path.is_empty() && config.persistence.enabled;

    let persistence = if wants_persistence {
        let created =
            PersistenceManager::new(Some(Path::new(&configured_path)), &config.persistence);
        match created {
            Ok(manager) => Some(manager),
            Err(e) => {
                eprintln!("Warning: Failed to initialize persistence: {}", e);
                None
            }
        }
    } else {
        None
    };

    let path = if configured_path.is_empty() {
        "memory://".to_string()
    } else {
        configured_path
    };

    Self {
        path,
        config: RwLock::new(config),
        schemas: Arc::new(RwLock::new(FxHashMap::default())),
        version_stores: Arc::new(RwLock::new(FxHashMap::default())),
        registry: Arc::new(TransactionRegistry::new()),
        open: AtomicBool::new(false),
        txn_version_stores: Arc::new(RwLock::new(FxHashMap::default())),
        views: RwLock::new(FxHashMap::default()),
        sequences: Arc::new(RwLock::new(FxHashMap::default())),
        ring_buffers: Arc::new(RwLock::new(FxHashMap::default())),
        persistence: Arc::new(persistence),
        loading_from_disk: Arc::new(AtomicBool::new(false)),
        file_lock: Mutex::new(None),
    }
}
/// Builds an engine from `Config::default()`.
/// NOTE(review): this is in-memory only if the default config has no path
/// (`new` maps an empty path to `"memory://"`) — confirm against `Config::default`.
pub fn in_memory() -> Self {
    let defaults = Config::default();
    Self::new(defaults)
}
/// Opens the engine.
///
/// Steps, in order:
/// 1. For an on-disk path, acquire the exclusive file lock.
/// 2. Start accepting transactions.
/// 3. When persistence is enabled, start it and recover state by loading
///    snapshots and then replaying the WAL from the snapshot LSN.
///
/// Calling this on an already-open engine is a no-op that returns `Ok(())`.
///
/// # Errors
/// Propagates failures from lock acquisition, `PersistenceManager::start`,
/// snapshot loading, or WAL replay. On failure the engine is put back into
/// the closed state, so the error is not masked by a later call silently
/// returning `Ok(())`:
/// - `open` is reset;
/// - the lock is released;
/// - `loading_from_disk` is cleared, so DDL is recorded in the WAL again.
pub fn open_engine(&self) -> Result<()> {
    if self.open.swap(true, Ordering::AcqRel) {
        return Ok(());
    }

    if self.path != "memory://" {
        match FileLock::acquire(&self.path) {
            Ok(lock) => *self.file_lock.lock().unwrap() = Some(lock),
            Err(e) => {
                // Nothing else has been started yet; just undo the open flag.
                self.open.store(false, Ordering::Release);
                return Err(e);
            }
        }
    }

    self.registry.start_accepting_transactions();

    if let Some(ref pm) = *self.persistence {
        if pm.is_enabled() {
            if let Err(e) = pm.start() {
                // Persistence never started, so roll back only what this call did.
                self.registry.stop_accepting_transactions();
                *self.file_lock.lock().unwrap() = None;
                self.open.store(false, Ordering::Release);
                return Err(e);
            }

            // `open` must already be true here: WAL replay looks tables up via
            // `get_version_store`, which rejects a closed engine.
            self.loading_from_disk.store(true, Ordering::Release);
            let recovered = self
                .load_snapshots()
                .and_then(|snapshot_lsn| self.replay_wal(snapshot_lsn));
            // Always clear the flag; leaving it set would make `record_ddl`
            // skip WAL writes for the rest of the process lifetime.
            self.loading_from_disk.store(false, Ordering::Release);

            if let Err(e) = recovered {
                // Use the regular shutdown path so persistence is stopped
                // before the file lock is released.
                let _ = self.close_engine();
                return Err(e);
            }
        }
    }
    Ok(())
}
/// Loads the newest snapshot of every table found under `<persistence path>/snapshots`.
///
/// Each subdirectory is treated as one table. A table whose snapshot fails to
/// load is skipped with a warning.
///
/// Returns the LSN from which WAL replay should start. This is the larger of:
/// - the LSN in the snapshot metadata file;
/// - the highest source LSN found in any loaded snapshot header.
///
/// It returns 0 when persistence is disabled, the directory is missing, or
/// the directory cannot be listed.
/// NOTE(review): taking the maximum assumes all table snapshots were taken at
/// the same point; a table with an older snapshot would miss WAL entries in
/// between — confirm snapshots are always written together.
fn load_snapshots(&self) -> Result<u64> {
    let pm = match self.persistence.as_ref() {
        Some(pm) if pm.is_enabled() => pm,
        _ => return Ok(0),
    };

    let snapshot_dir = pm.path().join("snapshots");
    if !snapshot_dir.exists() {
        return Ok(0);
    }

    let metadata_lsn = read_snapshot_lsn(&snapshot_dir);

    let entries = match std::fs::read_dir(&snapshot_dir) {
        Ok(entries) => entries,
        Err(_) => return Ok(0),
    };

    let max_header_lsn = entries
        .flatten()
        .filter(|entry| entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false))
        .filter_map(|entry| {
            let table_name = entry.file_name().to_string_lossy().to_string();
            let snapshot_path = self.find_latest_snapshot(&entry.path())?;
            match self.load_table_snapshot(&table_name, &snapshot_path) {
                Ok(source_lsn) => Some(source_lsn),
                Err(e) => {
                    eprintln!("Warning: Failed to load snapshot for {}: {}", table_name, e);
                    None
                }
            }
        })
        .fold(0u64, u64::max);

    Ok(metadata_lsn.max(max_header_lsn))
}
/// Returns the path of the greatest `snapshot-*.bin` file in `dir`.
///
/// "Greatest" is by `PathBuf` ordering, i.e. by file name within one directory.
/// Returns `None` if `dir` cannot be read or contains no matching file.
/// NOTE(review): this picks the newest snapshot only if the names sort
/// chronologically (e.g. fixed-width timestamps) — confirm with the snapshot writer.
fn find_latest_snapshot(&self, dir: &std::path::Path) -> Option<std::path::PathBuf> {
    let looks_like_snapshot = |candidate: &std::path::PathBuf| -> bool {
        match candidate.file_name().and_then(|n| n.to_str()) {
            Some(file_name) => file_name.starts_with("snapshot-") && file_name.ends_with(".bin"),
            None => false,
        }
    };

    std::fs::read_dir(dir)
        .ok()?
        .filter_map(Result::ok)
        .map(|entry| entry.path())
        .filter(|candidate| looks_like_snapshot(candidate))
        .max()
}
/// Rebuilds one table from a snapshot file.
///
/// Every row version in the snapshot is re-stamped with
/// `RECOVERY_TRANSACTION_ID` and applied to a fresh `VersionStore`. The schema
/// and store are then registered in the catalog. The schema stored inside the
/// snapshot is authoritative; `_table_name` (the directory name) is not used.
///
/// The schema is registered under the schema name recorded in the snapshot,
/// matching `create_table` and CreateTable WAL replay. It falls back to
/// `DEFAULT_SCHEMA` only when that name is empty; previously every table was
/// forced into `DEFAULT_SCHEMA`.
///
/// Returns the snapshot's source LSN.
///
/// # Errors
/// Propagates errors from opening or iterating the snapshot.
fn load_table_snapshot(
    &self,
    _table_name: &str,
    snapshot_path: &std::path::Path,
) -> Result<u64> {
    let mut reader = super::snapshot::SnapshotReader::open(snapshot_path)?;
    let source_lsn = reader.source_lsn();
    let schema = reader.schema().clone();
    let table_name_lower = schema.table_name_lower.clone();
    let schema_name_lower = if schema.schema_name_lower.is_empty() {
        DEFAULT_SCHEMA.to_string()
    } else {
        schema.schema_name_lower.clone()
    };

    let version_store = Arc::new(VersionStore::with_visibility_checker(
        schema.table_name.clone(),
        schema.clone(),
        Arc::clone(&self.registry) as Arc<dyn VisibilityChecker>,
    ));
    reader.for_each(|_row_id, mut version| {
        version.txn_id = super::RECOVERY_TRANSACTION_ID;
        version_store.apply_recovered_version(version);
        true
    })?;

    {
        let mut schemas = self.schemas.write().unwrap();
        let tables = schemas.entry(schema_name_lower).or_default();
        tables.insert(table_name_lower.clone(), schema);
    }
    {
        let mut stores = self.version_stores.write().unwrap();
        stores.insert(table_name_lower, version_store);
    }
    Ok(source_lsn)
}
/// Replays WAL entries starting at `from_lsn` through `apply_wal_entry`, then
/// rebuilds every table's indexes.
///
/// Does nothing when persistence is disabled. If the persistence layer
/// reports skipped entries, their count is printed as a warning.
///
/// # Errors
/// Propagates the error from `replay_two_phase`; indexes are not rebuilt in that case.
fn replay_wal(&self, from_lsn: u64) -> Result<()> {
    let pm = match self.persistence.as_ref() {
        Some(pm) if pm.is_enabled() => pm,
        _ => return Ok(()),
    };

    let info = pm.replay_two_phase(from_lsn, |entry| self.apply_wal_entry(entry))?;
    if info.skipped_entries > 0 {
        eprintln!(
            "Recovery: {} entries skipped (from aborted/uncommitted transactions)",
            info.skipped_entries
        );
    }
    self.populate_all_indexes();
    Ok(())
}
/// Asks every registered version store to populate all of its indexes.
/// The version-store map's read lock is held for the whole pass.
fn populate_all_indexes(&self) {
    self.version_stores
        .read()
        .unwrap()
        .values()
        .for_each(|store| store.populate_all_indexes());
}
fn apply_wal_entry(&self, entry: crate::storage::mvcc::wal_manager::WALEntry) -> Result<()> {
use crate::storage::mvcc::persistence::{deserialize_row_version, IndexMetadata};
use crate::storage::mvcc::wal_manager::WALOperationType;
match entry.operation {
WALOperationType::CreateTable => {
if let Ok(schema) = self.deserialize_schema(&entry.data) {
let table_name = schema.table_name_lower.clone();
let schema_name = schema.schema_name_lower.clone();
{
let mut schemas = self.schemas.write().unwrap();
let default_schema = schemas.entry(schema_name.clone()).or_default();
default_schema.insert(table_name.clone(), schema.clone());
}
let is_telemetry_table = schema_name == "system"
&& (table_name == "logs"
|| table_name == "traces"
|| table_name == "metrics");
if is_telemetry_table {
let buffer = std::sync::Arc::new(parking_lot::RwLock::new(
std::collections::VecDeque::with_capacity(100_000),
));
let mut ring_buffers = self.ring_buffers.write().unwrap();
ring_buffers.insert(table_name, buffer);
} else {
let version_store = Arc::new(VersionStore::with_visibility_checker(
schema.table_name.clone(),
schema.clone(),
Arc::clone(&self.registry) as Arc<dyn VisibilityChecker>,
));
let mut stores = self.version_stores.write().unwrap();
stores.insert(table_name, version_store);
}
}
}
WALOperationType::DropTable => {
let table_name = entry.table_name.to_lowercase();
{
let mut schemas = self.schemas.write().unwrap();
if let Some(default_schema) = schemas.get_mut(DEFAULT_SCHEMA) {
default_schema.remove(&table_name);
}
}
{
let mut stores = self.version_stores.write().unwrap();
if let Some(store) = stores.remove(&table_name) {
store.close();
}
}
}
WALOperationType::CreateIndex => {
if let Ok(index_meta) = IndexMetadata::deserialize(&entry.data) {
let table_name = entry.table_name.to_lowercase();
if let Ok(store) = self.get_version_store(&table_name) {
let _ = store.create_index_from_metadata(&index_meta, true);
}
}
}
WALOperationType::DropIndex => {
if let Ok(index_name) = String::from_utf8(entry.data.clone()) {
let table_name = entry.table_name.to_lowercase();
if let Ok(store) = self.get_version_store(&table_name) {
let _ = store.drop_index(&index_name);
}
}
}
WALOperationType::Insert | WALOperationType::Update => {
if let Ok(row_version) = deserialize_row_version(&entry.data) {
let table_name = entry.table_name.to_lowercase();
if let Ok(store) = self.get_version_store(&table_name) {
store.apply_recovered_version(row_version);
}
}
}
WALOperationType::Delete => {
let table_name = entry.table_name.to_lowercase();
if let Ok(store) = self.get_version_store(&table_name) {
store.mark_deleted(entry.row_id, entry.txn_id);
}
}
WALOperationType::Commit => {
self.registry
.recover_committed_transaction(entry.txn_id, entry.lsn as i64);
}
WALOperationType::Rollback => {
}
WALOperationType::AlterTable => {
if let Err(e) = self.replay_alter_table(&entry.data) {
eprintln!("Warning: Failed to replay ALTER TABLE: {}", e);
}
}
WALOperationType::CreateView => {
if let Ok(view_def) = ViewDefinition::deserialize(&entry.data) {
let parts: Vec<&str> = entry.table_name.split('.').collect();
let (schema_name, view_name) = if parts.len() > 1 {
(parts[0], parts[1])
} else {
(DEFAULT_SCHEMA, parts[0])
};
let schema_lower = schema_name.to_lowercase();
let name_lower = view_name.to_lowercase();
let mut views = self.views.write().unwrap();
let schema_views = views.entry(schema_lower).or_default();
schema_views.insert(name_lower, Arc::new(view_def));
}
}
WALOperationType::DropView => {
if let Ok(_view_name) = String::from_utf8(entry.data.clone()) {
let parts: Vec<&str> = entry.table_name.split('.').collect();
let (schema_name, name) = if parts.len() > 1 {
(parts[0], parts[1])
} else {
(DEFAULT_SCHEMA, parts[0])
};
let schema_lower = schema_name.to_lowercase();
let name_lower = name.to_lowercase();
let mut views = self.views.write().unwrap();
if let Some(schema_views) = views.get_mut(&schema_lower) {
schema_views.remove(&name_lower);
}
}
}
}
Ok(())
}
/// Decodes a table schema from the binary format written by `serialize_schema`.
///
/// All integers are little-endian; "str16" means a `u16` byte length followed
/// by UTF-8 bytes. Sections in order:
/// - table name (str16);
/// - optional schema name (str16, see the note below);
/// - column count (u16);
/// - per column: name (str16), data type (u8), nullable (u8), primary key (u8),
///   and the optional trailing fields auto increment (u8), default expr (str16)
///   and check expr (str16);
/// - optional foreign-key section, followed by an optional referenced-by section.
///
/// Trailing optional fields that are absent fall back to defaults:
/// - schema name: `"public"`;
/// - auto increment: `false`;
/// - default/check expressions: `None`;
/// - foreign keys and referenced-by: empty.
///
/// An unknown data-type byte decodes as `DataType::Null` rather than failing.
///
/// # Errors
/// Returns an internal error if required fields (table name, column count,
/// column name/type/nullable/primary-key) are truncated or a name or
/// expression is not valid UTF-8.
fn deserialize_schema(&self, data: &[u8]) -> Result<Schema> {
    use crate::core::SchemaColumn;
    if data.len() < 4 {
        return Err(Error::internal("schema data too short"));
    }
    let mut pos = 0;
    // Table name: u16 length + bytes.
    if pos + 2 > data.len() {
        return Err(Error::internal("invalid schema: missing table name length"));
    }
    let name_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    if pos + name_len > data.len() {
        return Err(Error::internal("invalid schema: missing table name"));
    }
    let table_name = String::from_utf8(data[pos..pos + name_len].to_vec())
        .map_err(|e| Error::internal(format!("invalid table name: {}", e)))?;
    pos += name_len;
    // Schema name section, handled heuristically:
    // - a length of 0 is consumed and the name stays "public";
    // - a non-zero length that fits is read as the schema name;
    // - otherwise the two bytes are NOT consumed and are parsed below as the
    //   column count.
    // NOTE(review): presumably this is compatibility with records written
    // before schema names were serialized. A legacy record whose column count
    // happens to "fit" would have column bytes misread as a schema name —
    // confirm whether such records can still exist.
    let mut schema_name = "public".to_string();
    if pos + 2 <= data.len() {
        let schema_name_len =
            u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        if schema_name_len > 0 && pos + 2 + schema_name_len <= data.len() {
            pos += 2;
            schema_name = String::from_utf8(data[pos..pos + schema_name_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid schema name: {}", e)))?;
            pos += schema_name_len;
        } else if schema_name_len == 0 {
            pos += 2;
        }
    }
    if pos + 2 > data.len() {
        return Err(Error::internal("invalid schema: missing column count"));
    }
    let column_count = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let mut columns = Vec::with_capacity(column_count);
    for i in 0..column_count {
        // Column name: u16 length + bytes.
        if pos + 2 > data.len() {
            return Err(Error::internal(
                "invalid schema: missing column name length",
            ));
        }
        let col_name_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        if pos + col_name_len > data.len() {
            return Err(Error::internal("invalid schema: missing column name"));
        }
        let col_name = String::from_utf8(data[pos..pos + col_name_len].to_vec())
            .map_err(|e| Error::internal(format!("invalid column name: {}", e)))?;
        pos += col_name_len;
        // Required single-byte fields: data type, nullable, primary key.
        if pos >= data.len() {
            return Err(Error::internal("invalid schema: missing data type"));
        }
        // Unknown type bytes are tolerated and mapped to Null.
        let data_type = DataType::from_u8(data[pos]).unwrap_or(DataType::Null);
        pos += 1;
        if pos >= data.len() {
            return Err(Error::internal("invalid schema: missing nullable flag"));
        }
        let nullable = data[pos] != 0;
        pos += 1;
        if pos >= data.len() {
            return Err(Error::internal("invalid schema: missing primary key flag"));
        }
        let primary_key = data[pos] != 0;
        pos += 1;
        // Optional trailing fields (absent in shorter/older records).
        let auto_increment = if pos < data.len() {
            let val = data[pos] != 0;
            pos += 1;
            val
        } else {
            false
        };
        // NOTE(review): if a non-zero expression length overruns the buffer,
        // the 2 length bytes are still consumed and the expression is dropped,
        // so any following fields are read misaligned.
        let default_expr = if pos + 2 <= data.len() {
            let expr_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if expr_len > 0 && pos + expr_len <= data.len() {
                let expr = String::from_utf8(data[pos..pos + expr_len].to_vec())
                    .map_err(|e| Error::internal(format!("invalid default expr: {}", e)))?;
                pos += expr_len;
                Some(expr)
            } else {
                None
            }
        } else {
            None
        };
        let check_expr = if pos + 2 <= data.len() {
            let expr_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if expr_len > 0 && pos + expr_len <= data.len() {
                let expr = String::from_utf8(data[pos..pos + expr_len].to_vec())
                    .map_err(|e| Error::internal(format!("invalid check expr: {}", e)))?;
                pos += expr_len;
                Some(expr)
            } else {
                None
            }
        } else {
            None
        };
        columns.push(SchemaColumn::with_constraints(
            i,
            &col_name,
            data_type,
            nullable,
            primary_key,
            auto_increment,
            default_expr,
            check_expr,
        ));
    }
    // Optional foreign-key section: u16 count, then per key:
    // column id (u16), referenced table (str16), referenced column (str16),
    // ON DELETE (u8), ON UPDATE (u8). A truncated entry stops parsing
    // silently (`break`) instead of returning an error.
    let mut foreign_keys = Vec::new();
    let mut referenced_by = Vec::new();
    if pos + 2 <= data.len() {
        let fk_count = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        for _ in 0..fk_count {
            if pos + 2 > data.len() {
                break;
            }
            let column_id = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + 2 > data.len() {
                break;
            }
            let ref_tbl_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + ref_tbl_len > data.len() {
                break;
            }
            let referenced_table =
                String::from_utf8_lossy(&data[pos..pos + ref_tbl_len]).into_owned();
            pos += ref_tbl_len;
            if pos + 2 > data.len() {
                break;
            }
            let ref_col_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + ref_col_len > data.len() {
                break;
            }
            let referenced_column_name =
                String::from_utf8_lossy(&data[pos..pos + ref_col_len]).into_owned();
            pos += ref_col_len;
            // Two action bytes (ON DELETE, ON UPDATE).
            if pos + 2 > data.len() {
                break;
            }
            let on_delete_val = data[pos];
            pos += 1;
            let on_update_val = data[pos];
            pos += 1;
            // Codes match `serialize_schema`; any unknown code (including 3) is NoAction.
            let parse_action = |val: u8| match val {
                0 => crate::parser::ast::ReferentialAction::Restrict,
                1 => crate::parser::ast::ReferentialAction::Cascade,
                2 => crate::parser::ast::ReferentialAction::SetNull,
                _ => crate::parser::ast::ReferentialAction::NoAction,
            };
            foreign_keys.push(crate::core::schema::ForeignKeyMetadata {
                column_id,
                referenced_table,
                referenced_column_name,
                on_delete: parse_action(on_delete_val),
                on_update: parse_action(on_update_val),
            });
        }
        // Referenced-by section: only read when the FK section was present.
        // Layout: u16 count, then str16 names.
        if pos + 2 <= data.len() {
            let ref_by_count =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            for _ in 0..ref_by_count {
                if pos + 2 > data.len() {
                    break;
                }
                let ref_by_len =
                    u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
                pos += 2;
                if pos + ref_by_len > data.len() {
                    break;
                }
                let ref_by = String::from_utf8_lossy(&data[pos..pos + ref_by_len]).into_owned();
                pos += ref_by_len;
                referenced_by.push(ref_by);
            }
        }
    }
    let mut schema = Schema::new(&table_name, columns);
    schema.schema_name = schema_name;
    schema.schema_name_lower = schema.schema_name.to_lowercase();
    schema.foreign_keys = foreign_keys;
    schema.referenced_by = referenced_by;
    Ok(schema)
}
pub fn close_engine(&self) -> Result<()> {
if self
.open
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Ok(()); }
self.registry.stop_accepting_transactions();
let stores = self.version_stores.read().unwrap();
for store in stores.values() {
store.close();
}
drop(stores);
if let Some(ref pm) = *self.persistence {
if pm.is_enabled() {
if let Err(e) = pm.stop() {
eprintln!("Warning: Error stopping persistence: {}", e);
}
}
}
{
let mut file_lock = self.file_lock.lock().unwrap();
*file_lock = None;
}
Ok(())
}
/// Reports whether `open_engine` has run and `close_engine` has not since.
pub fn is_open(&self) -> bool {
    let open_flag = &self.open;
    open_flag.load(Ordering::Acquire)
}
/// Returns the database path, or `"memory://"` for an engine without a path.
pub fn get_path(&self) -> &str {
    self.path.as_str()
}
pub fn config(&self) -> Config {
self.config.read().unwrap().clone()
}
/// Replaces the engine configuration.
///
/// # Errors
/// Returns an internal error if `config.path` differs from the current path;
/// the database path is fixed once the engine is constructed.
pub fn update_engine_config(&self, config: Config) -> Result<()> {
    let mut current = self.config.write().unwrap();
    if current.path != config.path {
        return Err(Error::internal("cannot change database path after opening"));
    }
    *current = config;
    Ok(())
}
/// Returns a shared handle to the transaction registry.
pub fn registry(&self) -> Arc<TransactionRegistry> {
    self.registry.clone()
}
/// Re-applies an ALTER TABLE record from the WAL.
///
/// Record layout (integers little-endian, "str16" = u16 length + UTF-8 bytes):
/// `[op_type: u8][table name: str16]`, followed by an op-specific payload:
/// - `1` AddColumn: column name (str16), data type (u8), nullable (u8),
///   optional default expr (str16; length 0 or overrun means `None`)
/// - `2` DropColumn: column name (str16)
/// - `3` RenameColumn: old name (str16), new name (str16)
/// - `4` ModifyColumn: column name (str16), data type (u8), nullable (u8),
///   then two optional groups:
///   - auto-increment: a presence byte, followed by the value byte when present;
///   - check expression: a presence byte; when present, a some/none byte,
///     followed by a str16 when "some".
/// - `5` RenameTable: new table name (str16)
///
/// # Errors
/// Returns an internal error for empty or truncated data, invalid UTF-8,
/// an unknown data-type byte, or an unknown `op_type`, and propagates errors
/// from the schema-changing helper that gets called.
fn replay_alter_table(&self, data: &[u8]) -> Result<()> {
    if data.is_empty() {
        return Err(Error::internal("empty ALTER TABLE data"));
    }
    let op_type = data[0];
    let mut pos = 1;
    // Common header: table name.
    if pos + 2 > data.len() {
        return Err(Error::internal(
            "invalid ALTER TABLE data: missing table name length",
        ));
    }
    let table_name_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    if pos + table_name_len > data.len() {
        return Err(Error::internal(
            "invalid ALTER TABLE data: missing table name",
        ));
    }
    let table_name = String::from_utf8(data[pos..pos + table_name_len].to_vec())
        .map_err(|e| Error::internal(format!("invalid table name: {}", e)))?;
    pos += table_name_len;
    match op_type {
        // AddColumn
        1 => {
            if pos + 2 > data.len() {
                return Err(Error::internal(
                    "invalid AddColumn data: missing column name length",
                ));
            }
            let col_name_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + col_name_len > data.len() {
                return Err(Error::internal(
                    "invalid AddColumn data: missing column name",
                ));
            }
            let column_name = String::from_utf8(data[pos..pos + col_name_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid column name: {}", e)))?;
            pos += col_name_len;
            if pos >= data.len() {
                return Err(Error::internal("invalid AddColumn data: missing data type"));
            }
            // Unlike `deserialize_schema`, an unknown type byte is an error here.
            let data_type = DataType::from_u8(data[pos])
                .ok_or_else(|| Error::internal("invalid data type byte"))?;
            pos += 1;
            if pos >= data.len() {
                return Err(Error::internal("invalid AddColumn data: missing nullable"));
            }
            let nullable = data[pos] != 0;
            pos += 1;
            // Optional default expression; missing or overrunning means None.
            let default_expr = if pos + 2 <= data.len() {
                let expr_len =
                    u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
                pos += 2;
                if expr_len > 0 && pos + expr_len <= data.len() {
                    let expr = String::from_utf8(data[pos..pos + expr_len].to_vec())
                        .map_err(|e| Error::internal(format!("invalid default expr: {}", e)))?;
                    Some(expr)
                } else {
                    None
                }
            } else {
                None
            };
            self.create_column_with_default(
                &table_name,
                &column_name,
                data_type,
                nullable,
                default_expr,
            )?;
        }
        // DropColumn
        2 => {
            if pos + 2 > data.len() {
                return Err(Error::internal(
                    "invalid DropColumn data: missing column name length",
                ));
            }
            let col_name_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + col_name_len > data.len() {
                return Err(Error::internal(
                    "invalid DropColumn data: missing column name",
                ));
            }
            let column_name = String::from_utf8(data[pos..pos + col_name_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid column name: {}", e)))?;
            self.drop_column(&table_name, &column_name)?;
        }
        // RenameColumn
        3 => {
            if pos + 2 > data.len() {
                return Err(Error::internal(
                    "invalid RenameColumn data: missing old name length",
                ));
            }
            let old_name_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + old_name_len > data.len() {
                return Err(Error::internal(
                    "invalid RenameColumn data: missing old name",
                ));
            }
            let old_name = String::from_utf8(data[pos..pos + old_name_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid old column name: {}", e)))?;
            pos += old_name_len;
            if pos + 2 > data.len() {
                return Err(Error::internal(
                    "invalid RenameColumn data: missing new name length",
                ));
            }
            let new_name_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + new_name_len > data.len() {
                return Err(Error::internal(
                    "invalid RenameColumn data: missing new name",
                ));
            }
            let new_name = String::from_utf8(data[pos..pos + new_name_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid new column name: {}", e)))?;
            self.rename_column(&table_name, &old_name, &new_name)?;
        }
        // ModifyColumn
        4 => {
            if pos + 2 > data.len() {
                return Err(Error::internal(
                    "invalid ModifyColumn data: missing column name length",
                ));
            }
            let col_name_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + col_name_len > data.len() {
                return Err(Error::internal(
                    "invalid ModifyColumn data: missing column name",
                ));
            }
            let column_name = String::from_utf8(data[pos..pos + col_name_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid column name: {}", e)))?;
            pos += col_name_len;
            if pos >= data.len() {
                return Err(Error::internal(
                    "invalid ModifyColumn data: missing data type",
                ));
            }
            let data_type = DataType::from_u8(data[pos])
                .ok_or_else(|| Error::internal("invalid data type byte"))?;
            pos += 1;
            if pos >= data.len() {
                return Err(Error::internal(
                    "invalid ModifyColumn data: missing nullable",
                ));
            }
            let nullable = data[pos] != 0;
            pos += 1;
            // Optional auto-increment change: None = leave unchanged,
            // Some(v) = set to v (only when the presence byte is non-zero).
            let mut auto_increment_opt = None;
            if pos < data.len() {
                let ai_present = data[pos] != 0;
                pos += 1;
                if ai_present && pos < data.len() {
                    auto_increment_opt = Some(data[pos] != 0);
                    pos += 1;
                }
            }
            // Optional CHECK change: None = leave unchanged,
            // Some(None) = clear it, Some(Some(expr)) = set it.
            let mut check_expr_opt = None;
            if pos < data.len() {
                let ce_present = data[pos] != 0;
                pos += 1;
                if ce_present && pos < data.len() {
                    let ce_is_some = data[pos] != 0;
                    pos += 1;
                    if ce_is_some {
                        if pos + 2 > data.len() {
                            return Err(Error::internal(
                                "invalid ModifyColumn data: missing check_expr length",
                            ));
                        }
                        let ce_len =
                            u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
                        pos += 2;
                        if pos + ce_len > data.len() {
                            return Err(Error::internal(
                                "invalid ModifyColumn data: missing check_expr string",
                            ));
                        }
                        let expr = String::from_utf8(data[pos..pos + ce_len].to_vec())
                            .map_err(|e| {
                                Error::internal(format!("invalid check_expr string: {}", e))
                            })?;
                        check_expr_opt = Some(Some(expr));
                        // No-op: the last field, so `pos` is not advanced past it.
                        let _ = ce_len; } else {
                        check_expr_opt = Some(None);
                    }
                }
            }
            self.modify_column(
                &table_name,
                &column_name,
                data_type,
                nullable,
                auto_increment_opt,
                check_expr_opt,
            )?;
        }
        // RenameTable
        5 => {
            if pos + 2 > data.len() {
                return Err(Error::internal(
                    "invalid RenameTable data: missing new name length",
                ));
            }
            let new_name_len =
                u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + new_name_len > data.len() {
                return Err(Error::internal(
                    "invalid RenameTable data: missing new name",
                ));
            }
            let new_table_name = String::from_utf8(data[pos..pos + new_name_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid new table name: {}", e)))?;
            self.rename_table(&table_name, &new_table_name)?;
        }
        _ => {
            return Err(Error::internal(format!(
                "unknown ALTER TABLE operation type: {}",
                op_type
            )));
        }
    }
    Ok(())
}
/// True while recovery is replaying snapshots/WAL. DDL applied during replay
/// must not be written to the WAL again.
fn should_skip_wal(&self) -> bool {
    let replaying = self.loading_from_disk.load(Ordering::Acquire);
    replaying
}
fn record_ddl(&self, table_name: &str, op: WALOperationType, schema_data: &[u8]) {
if self.should_skip_wal() {
return;
}
if let Some(ref pm) = *self.persistence {
if pm.is_enabled() {
if let Err(e) = pm.record_ddl_operation(table_name, op, schema_data) {
eprintln!("Warning: Failed to record DDL operation in WAL: {}", e);
}
}
}
}
pub fn serialize_schema(schema: &Schema) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&(schema.table_name.len() as u16).to_le_bytes());
buf.extend_from_slice(schema.table_name.as_bytes());
buf.extend_from_slice(&(schema.schema_name.len() as u16).to_le_bytes());
buf.extend_from_slice(schema.schema_name.as_bytes());
buf.extend_from_slice(&(schema.columns.len() as u16).to_le_bytes());
for col in &schema.columns {
buf.extend_from_slice(&(col.name.len() as u16).to_le_bytes());
buf.extend_from_slice(col.name.as_bytes());
buf.push(col.data_type.as_u8());
buf.push(if col.nullable { 1 } else { 0 });
buf.push(if col.primary_key { 1 } else { 0 });
buf.push(if col.auto_increment { 1 } else { 0 });
if let Some(ref default_expr) = col.default_expr {
buf.extend_from_slice(&(default_expr.len() as u16).to_le_bytes());
buf.extend_from_slice(default_expr.as_bytes());
} else {
buf.extend_from_slice(&0u16.to_le_bytes());
}
if let Some(ref check_expr) = col.check_expr {
buf.extend_from_slice(&(check_expr.len() as u16).to_le_bytes());
buf.extend_from_slice(check_expr.as_bytes());
} else {
buf.extend_from_slice(&0u16.to_le_bytes());
}
}
buf.extend_from_slice(&(schema.foreign_keys.len() as u16).to_le_bytes());
for fk in &schema.foreign_keys {
buf.extend_from_slice(&(fk.column_id as u16).to_le_bytes());
buf.extend_from_slice(&(fk.referenced_table.len() as u16).to_le_bytes());
buf.extend_from_slice(fk.referenced_table.as_bytes());
buf.extend_from_slice(&(fk.referenced_column_name.len() as u16).to_le_bytes());
buf.extend_from_slice(fk.referenced_column_name.as_bytes());
let on_delete_val = match fk.on_delete {
crate::parser::ast::ReferentialAction::Restrict => 0,
crate::parser::ast::ReferentialAction::Cascade => 1,
crate::parser::ast::ReferentialAction::SetNull => 2,
crate::parser::ast::ReferentialAction::NoAction => 3,
};
buf.push(on_delete_val);
let on_update_val = match fk.on_update {
crate::parser::ast::ReferentialAction::Restrict => 0,
crate::parser::ast::ReferentialAction::Cascade => 1,
crate::parser::ast::ReferentialAction::SetNull => 2,
crate::parser::ast::ReferentialAction::NoAction => 3,
};
buf.push(on_update_val);
}
buf.extend_from_slice(&(schema.referenced_by.len() as u16).to_le_bytes());
for ref_by in &schema.referenced_by {
buf.extend_from_slice(&(ref_by.len() as u16).to_le_bytes());
buf.extend_from_slice(ref_by.as_bytes());
}
buf
}
/// Creates a new table from `schema` and records the DDL in the WAL.
///
/// `system.logs`, `system.traces` and `system.metrics` get an in-memory ring
/// buffer. Every other table gets a `VersionStore` that uses the transaction
/// registry as its visibility checker.
///
/// Returns the schema that was registered.
///
/// # Errors
/// - `EngineNotOpen` if the engine is closed.
/// - `TableAlreadyExists` if the target schema already has a table with this
///   (lowercased) name. This is re-checked under the write lock, so two
///   concurrent creates of the same table cannot both succeed.
/// - Any error from `validate_schema`.
pub fn create_table(&self, schema: Schema) -> Result<Schema> {
    if !self.is_open() {
        return Err(Error::EngineNotOpen);
    }
    let table_name = schema.table_name_lower.clone();
    let schema_name = schema.schema_name_lower.clone();
    tracing::debug!(
        "create_table called for schema: {}, table: {}",
        schema_name,
        table_name
    );

    // Cheap pre-check, so a duplicate name is reported before validation errors.
    {
        let schemas = self.schemas.read().unwrap();
        if matches!(schemas.get(&schema_name), Some(tables) if tables.contains_key(&table_name))
        {
            return Err(Error::TableAlreadyExists);
        }
    }

    self.validate_schema(&schema)?;

    {
        let mut schemas = self.schemas.write().unwrap();
        let tables = schemas.entry(schema_name.clone()).or_default();
        // Re-check under the write lock. Another thread may have created the
        // same table after the read lock above was released; inserting
        // blindly would overwrite its schema and, below, its version store.
        if tables.contains_key(&table_name) {
            return Err(Error::TableAlreadyExists);
        }
        tables.insert(table_name.clone(), schema.clone());
    }

    let is_telemetry_table = schema_name == "system"
        && (table_name == "logs" || table_name == "traces" || table_name == "metrics");
    if is_telemetry_table {
        let buffer = std::sync::Arc::new(parking_lot::RwLock::new(
            std::collections::VecDeque::with_capacity(100_000),
        ));
        {
            let mut ring_buffers = self.ring_buffers.write().unwrap();
            ring_buffers.insert(table_name.clone(), buffer);
        }
    } else {
        // NOTE(review): version stores are keyed by table name only, so
        // same-named tables in different schemas share (and overwrite) one
        // entry here — confirm whether that is intended.
        let version_store = Arc::new(VersionStore::with_visibility_checker(
            schema.table_name.clone(),
            schema.clone(),
            Arc::clone(&self.registry) as Arc<dyn VisibilityChecker>,
        ));
        let mut stores = self.version_stores.write().unwrap();
        stores.insert(table_name.clone(), version_store);
    }

    let schema_data = Self::serialize_schema(&schema);
    self.record_ddl(
        &schema.table_name,
        WALOperationType::CreateTable,
        &schema_data,
    );
    Ok(schema)
}
/// Drops a table given a bare or `schema.table` name; a bare name means `public`.
///
/// The DROP is written to the WAL (with `name` exactly as given) before the
/// version store is closed and the schema entry removed.
///
/// # Errors
/// - `EngineNotOpen` if the engine is closed.
/// - `NotSupportedMessage` for the system telemetry tables.
/// - `TableNotFound` if the table does not exist in that schema.
pub fn drop_table_internal(&self, name: &str) -> Result<()> {
    if !self.is_open() {
        return Err(Error::EngineNotOpen);
    }

    let (schema_key, table_key) = match name.split_once('.') {
        Some((schema_part, table_part)) => (schema_part.to_lowercase(), table_part.to_lowercase()),
        None => ("public".to_string(), name.to_lowercase()),
    };

    let is_telemetry_table =
        schema_key == "system" && matches!(table_key.as_str(), "logs" | "traces" | "metrics");
    if is_telemetry_table {
        return Err(Error::NotSupportedMessage(
            "Cannot drop system telemetry tables".to_string(),
        ));
    }

    let exists = {
        let schemas = self.schemas.read().unwrap();
        matches!(schemas.get(&schema_key), Some(tables) if tables.contains_key(&table_key))
    };
    if !exists {
        return Err(Error::TableNotFound);
    }

    self.record_ddl(name, WALOperationType::DropTable, &[]);

    {
        let mut stores = self.version_stores.write().unwrap();
        if let Some(store) = stores.remove(&table_key) {
            store.close();
        }
    }
    {
        let mut schemas = self.schemas.write().unwrap();
        if let Some(tables) = schemas.get_mut(&schema_key) {
            tables.remove(&table_key);
        }
    }
    Ok(())
}
pub fn get_version_store(&self, name: &str) -> Result<Arc<VersionStore>> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let table_name = name.to_lowercase();
let stores = self.version_stores.read().unwrap();
stores.get(&table_name).cloned().ok_or(Error::TableNotFound)
}
pub fn create_column(
&self,
table_name: &str,
column_name: &str,
data_type: DataType,
nullable: bool,
) -> Result<()> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let table_name_lower = table_name.to_lowercase();
let mut schemas = self.schemas.write().unwrap();
let default_schema = schemas
.get_mut(DEFAULT_SCHEMA)
.ok_or(Error::TableNotFound)?;
let schema = default_schema
.get_mut(&table_name_lower)
.ok_or(Error::TableNotFound)?;
if schema.has_column(column_name) {
return Err(Error::DuplicateColumn);
}
let column = crate::core::SchemaColumn::new(
schema.columns.len(),
column_name,
data_type,
nullable,
false,
);
schema.add_column(column)?;
let stores = self.version_stores.read().unwrap();
if let Some(store) = stores.get(&table_name_lower) {
let mut vs_schema = store.schema_mut();
let col = crate::core::SchemaColumn::new(
vs_schema.columns.len(),
column_name,
data_type,
nullable,
false,
);
vs_schema.add_column(col)?;
}
Ok(())
}
pub fn create_column_with_default(
&self,
table_name: &str,
column_name: &str,
data_type: DataType,
nullable: bool,
default_expr: Option<String>,
) -> Result<()> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let table_name_lower = table_name.to_lowercase();
let mut schemas = self.schemas.write().unwrap();
let default_schema = schemas
.get_mut(DEFAULT_SCHEMA)
.ok_or(Error::TableNotFound)?;
let schema = default_schema
.get_mut(&table_name_lower)
.ok_or(Error::TableNotFound)?;
if schema.has_column(column_name) {
return Err(Error::DuplicateColumn);
}
let mut column = crate::core::SchemaColumn::new(
schema.columns.len(),
column_name,
data_type,
nullable,
false,
);
column.default_expr = default_expr.clone();
schema.add_column(column)?;
let stores = self.version_stores.read().unwrap();
if let Some(store) = stores.get(&table_name_lower) {
let mut vs_schema = store.schema_mut();
let mut col = crate::core::SchemaColumn::new(
vs_schema.columns.len(),
column_name,
data_type,
nullable,
false,
);
col.default_expr = default_expr;
vs_schema.add_column(col)?;
}
Ok(())
}
pub fn drop_column(&self, table_name: &str, column_name: &str) -> Result<()> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let table_name_lower = table_name.to_lowercase();
let mut schemas = self.schemas.write().unwrap();
let default_schema = schemas
.get_mut(DEFAULT_SCHEMA)
.ok_or(Error::TableNotFound)?;
let schema = default_schema
.get_mut(&table_name_lower)
.ok_or(Error::TableNotFound)?;
if let Some((_, col)) = schema.find_column(column_name) {
if col.primary_key {
return Err(Error::CannotDropPrimaryKey);
}
} else {
return Err(Error::ColumnNotFound);
}
schema.remove_column(column_name)?;
let stores = self.version_stores.read().unwrap();
if let Some(store) = stores.get(&table_name_lower) {
let mut vs_schema = store.schema_mut();
vs_schema.remove_column(column_name)?;
}
Ok(())
}
pub fn rename_column(&self, table_name: &str, old_name: &str, new_name: &str) -> Result<()> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let table_name_lower = table_name.to_lowercase();
let mut schemas = self.schemas.write().unwrap();
let default_schema = schemas
.get_mut(DEFAULT_SCHEMA)
.ok_or(Error::TableNotFound)?;
let schema = default_schema
.get_mut(&table_name_lower)
.ok_or(Error::TableNotFound)?;
if !schema.has_column(old_name) {
return Err(Error::ColumnNotFound);
}
if schema.has_column(new_name) {
return Err(Error::DuplicateColumn);
}
schema.rename_column(old_name, new_name)?;
let stores = self.version_stores.read().unwrap();
if let Some(store) = stores.get(&table_name_lower) {
let mut vs_schema = store.schema_mut();
vs_schema.rename_column(old_name, new_name)?;
}
Ok(())
}
pub fn modify_column(
&self,
table_name: &str,
column_name: &str,
data_type: DataType,
nullable: bool,
auto_increment: Option<bool>,
check_expr: Option<Option<String>>,
) -> Result<()> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let table_name_lower = table_name.to_lowercase();
let mut schemas = self.schemas.write().unwrap();
let default_schema = schemas
.get_mut(DEFAULT_SCHEMA)
.ok_or(Error::TableNotFound)?;
let schema = default_schema
.get_mut(&table_name_lower)
.ok_or(Error::TableNotFound)?;
if !schema.has_column(column_name) {
return Err(Error::ColumnNotFound);
}
schema.modify_column(
column_name,
Some(data_type),
Some(nullable),
auto_increment,
check_expr.clone(),
)?;
let stores = self.version_stores.read().unwrap();
if let Some(store) = stores.get(&table_name_lower) {
let mut vs_schema = store.schema_mut();
vs_schema.modify_column(
column_name,
Some(data_type),
Some(nullable),
auto_increment,
check_expr,
)?;
}
Ok(())
}
pub fn rename_table(&self, old_name: &str, new_name: &str) -> Result<()> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let old_name_lower = old_name.to_lowercase();
let new_name_lower = new_name.to_lowercase();
{
let schemas = self.schemas.read().unwrap();
let empty_map = FxHashMap::default();
let default_schema = schemas.get(DEFAULT_SCHEMA).unwrap_or(&empty_map);
if !default_schema.contains_key(&old_name_lower) {
return Err(Error::TableNotFound);
}
if default_schema.contains_key(&new_name_lower) {
return Err(Error::TableAlreadyExists);
}
}
{
let mut schemas = self.schemas.write().unwrap();
if let Some(default_schema) = schemas.get_mut(DEFAULT_SCHEMA) {
if let Some(mut schema) = default_schema.remove(&old_name_lower) {
schema.table_name = new_name.to_string();
default_schema.insert(new_name_lower.clone(), schema);
}
}
}
{
let mut stores = self.version_stores.write().unwrap();
if let Some(store) = stores.remove(&old_name_lower) {
{
let mut vs_schema = store.schema_mut();
vs_schema.table_name = new_name.to_string();
}
stores.insert(new_name_lower, store);
}
}
Ok(())
}
fn validate_schema(&self, schema: &Schema) -> Result<()> {
if schema.table_name.is_empty() {
return Err(Error::internal("schema missing table name"));
}
let mut seen_names = std::collections::HashSet::new();
for col in &schema.columns {
if col.name.is_empty() {
return Err(Error::internal("column name cannot be empty"));
}
if col.primary_key && col.data_type != DataType::Integer {
return Err(Error::internal(format!(
"primary key column {} must be of type INTEGER",
col.name
)));
}
if !seen_names.insert(col.name.to_lowercase()) {
return Err(Error::DuplicateColumn);
}
}
Ok(())
}
fn create_engine_operations(&self) -> Arc<dyn TransactionEngineOperations> {
Arc::new(EngineOperations::new(self))
}
pub fn create_view(
&self,
schema_name: &str,
name: &str,
query: String,
if_not_exists: bool,
) -> Result<()> {
use crate::storage::mvcc::wal_manager::WALOperationType;
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let schema_lower = schema_name.to_lowercase();
let name_lower = name.to_lowercase();
let mut views = self.views.write().unwrap();
let schema_views = views.entry(schema_lower.clone()).or_default();
if schema_views.contains_key(&name_lower) {
if if_not_exists {
return Ok(());
}
return Err(Error::ViewAlreadyExists(name.to_string()));
}
let schemas = self.schemas.read().unwrap();
let empty_map = FxHashMap::default();
let default_schema = schemas.get(&schema_lower).unwrap_or(&empty_map);
if default_schema.contains_key(&name_lower) {
return Err(Error::internal(format!(
"cannot create view '{}': a table with the same name exists in schema '{}'",
name, schema_name
)));
}
drop(schemas);
let view_def = Arc::new(ViewDefinition::new(name, query));
schema_views.insert(name_lower.clone(), Arc::clone(&view_def));
drop(views);
let full_name = if schema_lower == DEFAULT_SCHEMA {
name_lower.clone()
} else {
format!("{}.{}", schema_lower, name_lower)
};
let data = view_def.serialize();
self.record_ddl(&full_name, WALOperationType::CreateView, &data);
Ok(())
}
pub fn drop_view(&self, schema_name: &str, name: &str, if_exists: bool) -> Result<()> {
use crate::storage::mvcc::wal_manager::WALOperationType;
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let schema_lower = schema_name.to_lowercase();
let name_lower = name.to_lowercase();
let mut views = self.views.write().unwrap();
let removed = if let Some(schema_views) = views.get_mut(&schema_lower) {
schema_views.remove(&name_lower)
} else {
None
};
if removed.is_none() {
if if_exists {
return Ok(());
}
return Err(Error::ViewNotFound(name.to_string()));
}
drop(views);
let full_name = if schema_lower == DEFAULT_SCHEMA {
name_lower.clone()
} else {
format!("{}.{}", schema_lower, name_lower)
};
self.record_ddl(&full_name, WALOperationType::DropView, name.as_bytes());
Ok(())
}
pub fn view_exists(&self, schema_name: &str, name: &str) -> Result<bool> {
self.view_exists_lowercase(&schema_name.to_lowercase(), &name.to_lowercase())
}
#[inline]
pub fn view_exists_lowercase(&self, schema_lower: &str, name_lower: &str) -> Result<bool> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let views = self.views.read().unwrap();
Ok(views
.get(schema_lower)
.is_some_and(|s| s.contains_key(name_lower)))
}
pub fn get_view(&self, schema_name: &str, name: &str) -> Result<Option<Arc<ViewDefinition>>> {
self.get_view_lowercase(&schema_name.to_lowercase(), &name.to_lowercase())
}
#[inline]
pub fn get_view_lowercase(
&self,
schema_lower: &str,
name_lower: &str,
) -> Result<Option<Arc<ViewDefinition>>> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let views = self.views.read().unwrap();
Ok(views
.get(schema_lower)
.and_then(|s| s.get(name_lower))
.cloned()) }
pub fn list_views(&self) -> Result<Vec<(String, String)>> {
if !self.is_open() {
return Err(Error::EngineNotOpen);
}
let views = self.views.read().unwrap();
let mut result = Vec::new();
for (schema_name, schema_views) in views.iter() {
for v in schema_views.values() {
result.push((schema_name.clone(), v.original_name.clone()));
}
}
Ok(result)
}
}
impl Engine for MVCCEngine {
    fn open(&mut self) -> Result<()> {
        MVCCEngine::open_engine(self)
    }

    fn close(&mut self) -> Result<()> {
        MVCCEngine::close_engine(self)
    }

    /// Starts a transaction at the engine's current global isolation level.
    fn begin_transaction(&self) -> Result<Box<dyn Transaction>> {
        self.begin_transaction_with_level(self.get_isolation_level())
    }

    /// Starts a transaction at `level`.
    ///
    /// Returns an internal error when the registry hands back
    /// `INVALID_TRANSACTION_ID` (it is not accepting new transactions).
    fn begin_transaction_with_level(&self, level: IsolationLevel) -> Result<Box<dyn Transaction>> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let (txn_id, begin_seq) = self.registry.begin_transaction();
        if txn_id == INVALID_TRANSACTION_ID {
            return Err(Error::internal(
                "transaction registry is not accepting new transactions",
            ));
        }
        let mut txn = MvccTransaction::new(txn_id, begin_seq, Arc::clone(&self.registry));
        // Only non-ReadCommitted levels are set explicitly; presumably
        // ReadCommitted is MvccTransaction's default — confirm.
        if level != IsolationLevel::ReadCommitted {
            txn.set_isolation_level(level)?;
        }
        let engine_ops = self.create_engine_operations();
        txn.set_engine_operations(engine_ops);
        Ok(Box::new(txn))
    }

    /// Returns the storage path, or `None` for the `memory://` engine.
    fn path(&self) -> Option<&str> {
        if self.path == "memory://" {
            None
        } else {
            Some(&self.path)
        }
    }

    /// Returns whether `table_name` (optionally `schema.table`, split at the
    /// first `.`, default schema `public`) exists, case-insensitively.
    fn table_exists(&self, table_name: &str) -> Result<bool> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let (schema_part, table_part) =
            table_name.split_once('.').unwrap_or(("public", table_name));
        let schema_key = schema_part.to_lowercase();
        let table_key = table_part.to_lowercase();
        let schemas = self.schemas.read().unwrap();
        let exists = schemas
            .get(&schema_key)
            .is_some_and(|tables| tables.contains_key(&table_key));
        Ok(exists)
    }

    /// Returns whether view `view_name` (optionally `schema.view`) exists.
    ///
    /// The name is split at the FIRST `.` only, the same way table names are
    /// parsed in `table_exists` / `get_table_schema`. A name like `a.b.c`
    /// therefore looks up view `b.c` in schema `a`; it is not silently
    /// truncated to `a.b`.
    fn view_exists(&self, view_name: &str) -> Result<bool> {
        let (schema_name, name) = view_name
            .split_once('.')
            .unwrap_or((DEFAULT_SCHEMA, view_name));
        MVCCEngine::view_exists(self, schema_name, name)
    }

    fn index_exists(&self, index_name: &str, table_name: &str) -> Result<bool> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let store = self.get_version_store(table_name)?;
        Ok(store.index_exists(index_name))
    }

    /// Index retrieval by name is not implemented; this always returns an
    /// internal error (with a different message when the index does exist).
    fn get_index(&self, table_name: &str, index_name: &str) -> Result<Box<dyn Index>> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let store = self.get_version_store(table_name)?;
        if store.index_exists(index_name) {
            return Err(Error::internal(format!(
                "index retrieval not yet implemented: {}.{}",
                table_name, index_name
            )));
        }
        Err(Error::internal(format!(
            "index not found: {}.{}",
            table_name, index_name
        )))
    }

    /// Returns a clone of the catalog schema for `table_name`
    /// (optionally `schema.table`, default schema `public`).
    fn get_table_schema(&self, table_name: &str) -> Result<Schema> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let (schema_part, table_part) =
            table_name.split_once('.').unwrap_or(("public", table_name));
        let schema_key = schema_part.to_lowercase();
        let table_key = table_part.to_lowercase();
        let found = {
            let schemas = self.schemas.read().unwrap();
            schemas
                .get(&schema_key)
                .and_then(|tables| tables.get(&table_key))
                .cloned()
        };
        found.ok_or(Error::TableNotFound)
    }

    /// Replaces the stored schema of an existing table with `schema`, in both
    /// the catalog and the table's version store (if it has one).
    ///
    /// NOTE(review): this method does not record the change in the WAL —
    /// confirm callers log it (e.g. via the `record_alter_table_*` hooks) if
    /// it must survive a restart.
    fn update_table_schema(&self, table_name: &str, schema: Schema) -> Result<()> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let (schema_part, table_part) =
            table_name.split_once('.').unwrap_or(("public", table_name));
        let schema_name_lower = schema_part.to_lowercase();
        let table_name_lower = table_part.to_lowercase();
        {
            let mut schemas = self.schemas.write().unwrap();
            let tables = schemas
                .get_mut(&schema_name_lower)
                .ok_or(Error::TableNotFound)?;
            match tables.get_mut(&table_name_lower) {
                Some(existing) => *existing = schema.clone(),
                None => return Err(Error::TableNotFound),
            }
        }
        if let Ok(store) = self.get_version_store(&table_name_lower) {
            *store.schema_mut() = schema;
        }
        Ok(())
    }

    /// Lists index names for a table; every index is reported as "BTree".
    fn list_table_indexes(&self, table_name: &str) -> Result<FxHashMap<String, String>> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let store = self.get_version_store(table_name)?;
        let mut result = FxHashMap::default();
        for index_name in store.list_indexes() {
            result.insert(index_name, "BTree".to_string());
        }
        Ok(result)
    }

    fn get_all_indexes(&self, table_name: &str) -> Result<Vec<std::sync::Arc<dyn Index>>> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let store = self.get_version_store(table_name)?;
        Ok(store.get_all_indexes())
    }

    fn get_isolation_level(&self) -> IsolationLevel {
        self.registry.get_global_isolation_level()
    }

    fn set_isolation_level(&mut self, level: IsolationLevel) -> Result<()> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        self.registry.set_global_isolation_level(level);
        Ok(())
    }

    fn get_config(&self) -> Config {
        self.config.read().expect("config lock poisoned").clone()
    }

    fn update_config(&mut self, config: Config) -> Result<()> {
        self.update_engine_config(config)
    }

    /// Writes a point-in-time snapshot of every table in the default schema.
    ///
    /// Steps:
    /// 1. Create a persistence checkpoint, which yields `snapshot_lsn`.
    /// 2. For each table, stream the versions committed up to the current
    ///    commit sequence into a `.bin.tmp` file.
    /// 3. Only if every table succeeded, rename all temp files into place,
    ///    rolling back completed renames if one fails.
    /// 4. Write `snapshot_meta.bin`.
    /// 5. Truncate the WAL up to `snapshot_lsn`.
    /// 6. Prune old snapshots down to the configured keep count.
    ///
    /// This is best-effort. Per-table failures are reported on stderr and the
    /// method still returns `Ok(())`, without writing metadata or truncating
    /// the WAL. With no persistence (or persistence disabled) it does nothing.
    ///
    /// NOTE(review): only `DEFAULT_SCHEMA` tables are written, but the WAL is
    /// truncated afterwards — confirm tables in other schemas are persisted by
    /// some other path.
    ///
    /// # Errors
    /// `Error::EngineNotOpen`, an error from `create_checkpoint`, or failure
    /// to create the top-level snapshot directory.
    fn create_snapshot(&self) -> Result<()> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let pm = match self.persistence.as_ref() {
            Some(pm) if pm.is_enabled() => pm,
            _ => return Ok(()),
        };
        let snapshot_lsn = pm.create_checkpoint(vec![])?;
        let snapshot_commit_seq = self.registry.current_commit_sequence();
        let snapshot_dir = pm.path().join("snapshots");
        if let Err(e) = std::fs::create_dir_all(&snapshot_dir) {
            return Err(Error::internal(format!(
                "failed to create snapshot directory: {}",
                e
            )));
        }
        // Both catalog locks are held for the whole snapshot so the set of
        // tables and their stores cannot change underneath it.
        let schemas = self.schemas.read().unwrap();
        let stores = self.version_stores.read().unwrap();
        // (temp path, final path, table name) for every fully written table.
        let mut pending_snapshots: Vec<(std::path::PathBuf, std::path::PathBuf, String)> =
            Vec::new();
        let mut all_succeeded = true;
        let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S%.3f").to_string();
        if let Some(default_schema) = schemas.get(DEFAULT_SCHEMA) {
            for (table_name, schema) in default_schema.iter() {
                if let Some(store) = stores.get(table_name) {
                    let table_snapshot_dir = snapshot_dir.join(table_name);
                    if let Err(e) = std::fs::create_dir_all(&table_snapshot_dir) {
                        eprintln!(
                            "Warning: Failed to create snapshot directory for {}: {}",
                            table_name, e
                        );
                        all_succeeded = false;
                        break;
                    }
                    let final_path = table_snapshot_dir.join(format!("snapshot-{}.bin", timestamp));
                    let temp_path =
                        table_snapshot_dir.join(format!("snapshot-{}.bin.tmp", timestamp));
                    let mut writer = match super::snapshot::SnapshotWriter::with_source_lsn(
                        &temp_path,
                        snapshot_lsn,
                    ) {
                        Ok(w) => w,
                        Err(e) => {
                            eprintln!(
                                "Warning: Failed to create snapshot writer for {}: {}",
                                table_name, e
                            );
                            all_succeeded = false;
                            break;
                        }
                    };
                    if let Err(e) = writer.write_schema(schema) {
                        eprintln!("Warning: Failed to write schema for {}: {}", table_name, e);
                        writer.fail();
                        all_succeeded = false;
                        break;
                    }
                    let mut write_error = false;
                    store.for_each_committed_version_with_cutoff(
                        |row_id, version| {
                            // Snapshot rows are stored as committed, with no
                            // owning transaction (txn_id = -1).
                            let mut snapshot_version = version.clone();
                            snapshot_version.txn_id = -1;
                            if let Err(e) = writer.append_row(&snapshot_version) {
                                eprintln!(
                                    "Warning: Failed to write row {} to snapshot: {}",
                                    row_id, e
                                );
                                write_error = true;
                                // Returning false stops the iteration.
                                return false;
                            }
                            true
                        },
                        snapshot_commit_seq,
                    );
                    if write_error {
                        writer.fail();
                        all_succeeded = false;
                        break;
                    }
                    if let Err(e) = writer.finalize() {
                        eprintln!(
                            "Warning: Failed to finalize snapshot for {}: {}",
                            table_name, e
                        );
                        writer.fail();
                        all_succeeded = false;
                        break;
                    }
                    pending_snapshots.push((temp_path, final_path, table_name.clone()));
                }
            }
            let mut renamed_successfully: Vec<(std::path::PathBuf, std::path::PathBuf)> =
                Vec::new();
            if all_succeeded {
                let mut dirs_to_sync: std::collections::HashSet<std::path::PathBuf> =
                    std::collections::HashSet::new();
                for (temp_path, final_path, table_name) in &pending_snapshots {
                    if let Err(e) = std::fs::rename(temp_path, final_path) {
                        eprintln!(
                            "Warning: Failed to rename snapshot for {}: {}",
                            table_name, e
                        );
                        // Undo the renames already done so no partial set of
                        // snapshot files becomes visible.
                        for (orig_temp, renamed_final) in renamed_successfully.iter().rev() {
                            if let Err(rollback_err) = std::fs::rename(renamed_final, orig_temp) {
                                eprintln!(
                                    "Critical: Failed to rollback snapshot rename {:?} -> {:?}: {}",
                                    renamed_final, orig_temp, rollback_err
                                );
                            }
                        }
                        all_succeeded = false;
                        break;
                    }
                    renamed_successfully.push((temp_path.clone(), final_path.clone()));
                    if let Some(parent) = final_path.parent() {
                        dirs_to_sync.insert(parent.to_path_buf());
                    }
                }
                if all_succeeded {
                    // fsync the directories so the renames themselves are durable.
                    for dir in &dirs_to_sync {
                        if let Ok(dir_file) = std::fs::File::open(dir) {
                            let _ = dir_file.sync_all();
                        }
                    }
                }
            }
        }
        if !all_succeeded {
            for (temp_path, _, _) in &pending_snapshots {
                let _ = std::fs::remove_file(temp_path);
            }
            eprintln!("Warning: Snapshot creation failed, all temp files cleaned up");
            return Ok(());
        }
        let meta_path = snapshot_dir.join("snapshot_meta.bin");
        if let Err(e) = write_snapshot_metadata(&meta_path, snapshot_lsn) {
            eprintln!("Warning: Failed to write snapshot metadata: {}", e);
            // Without metadata the WAL must not be truncated.
            return Ok(());
        }
        if snapshot_lsn > 0 {
            if let Err(e) = pm.truncate_wal(snapshot_lsn) {
                eprintln!("Warning: Failed to truncate WAL after snapshot: {}", e);
            }
        }
        let keep_count = pm.keep_count();
        if keep_count > 0 {
            if let Some(default_schema) = schemas.get(DEFAULT_SCHEMA) {
                for (_, _, table_name) in &pending_snapshots {
                    if let Some(schema) = default_schema.get(table_name) {
                        let disk_store = super::snapshot::DiskVersionStore::new(
                            &snapshot_dir,
                            table_name,
                            schema,
                        );
                        if let Ok(disk_store) = disk_store {
                            if let Err(e) = disk_store.cleanup_old_snapshots(keep_count) {
                                eprintln!(
                                    "Warning: Failed to cleanup old snapshots for {}: {}",
                                    table_name, e
                                );
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }

    /// Records CREATE INDEX in the WAL.
    ///
    /// Column names are resolved to positions and types against the current
    /// table schema. Nothing is recorded if the table or any column cannot be
    /// resolved.
    fn record_create_index(
        &self,
        table_name: &str,
        index_name: &str,
        column_names: &[String],
        is_unique: bool,
        index_type: crate::core::IndexType,
    ) {
        if self.should_skip_wal() {
            return;
        }
        let schema = match self.get_table_schema(table_name) {
            Ok(s) => s,
            Err(_) => return,
        };
        let mut column_ids = Vec::with_capacity(column_names.len());
        let mut data_types = Vec::with_capacity(column_names.len());
        for col_name in column_names {
            let col_name_lower = col_name.to_lowercase();
            if let Some((idx, col)) = schema
                .columns
                .iter()
                .enumerate()
                .find(|(_, c)| c.name.to_lowercase() == col_name_lower)
            {
                column_ids.push(idx as i32);
                data_types.push(col.data_type);
            } else {
                return;
            }
        }
        let index_meta = super::persistence::IndexMetadata {
            name: index_name.to_string(),
            table_name: table_name.to_string(),
            column_names: column_names.to_vec(),
            column_ids,
            data_types,
            is_unique,
            index_type,
        };
        let data = index_meta.serialize();
        self.record_ddl(table_name, WALOperationType::CreateIndex, &data);
    }

    /// Records DROP INDEX in the WAL; the payload is the raw index name.
    fn record_drop_index(&self, table_name: &str, index_name: &str) {
        if self.should_skip_wal() {
            return;
        }
        self.record_ddl(
            table_name,
            WALOperationType::DropIndex,
            index_name.as_bytes(),
        );
    }

    /// Records ALTER TABLE ... ADD COLUMN (payload tag 1).
    ///
    /// Strings are written with a little-endian u16 length prefix.
    /// NOTE(review): lengths are cast with `as u16`, so a string longer than
    /// 65535 bytes would wrap its prefix and corrupt the record.
    fn record_alter_table_add_column(
        &self,
        table_name: &str,
        column_name: &str,
        data_type: crate::core::DataType,
        nullable: bool,
        default_expr: Option<&str>,
    ) {
        if self.should_skip_wal() {
            return;
        }
        let mut data = Vec::new();
        data.push(1u8);
        data.extend_from_slice(&(table_name.len() as u16).to_le_bytes());
        data.extend_from_slice(table_name.as_bytes());
        data.extend_from_slice(&(column_name.len() as u16).to_le_bytes());
        data.extend_from_slice(column_name.as_bytes());
        data.push(data_type as u8);
        data.push(u8::from(nullable));
        match default_expr {
            Some(expr) => {
                data.extend_from_slice(&(expr.len() as u16).to_le_bytes());
                data.extend_from_slice(expr.as_bytes());
            }
            // A zero length marks "no default expression".
            None => data.extend_from_slice(&0u16.to_le_bytes()),
        }
        self.record_ddl(table_name, WALOperationType::AlterTable, &data);
    }

    /// Records ALTER TABLE ... DROP COLUMN (payload tag 2).
    fn record_alter_table_drop_column(&self, table_name: &str, column_name: &str) {
        if self.should_skip_wal() {
            return;
        }
        let mut data = Vec::new();
        data.push(2u8);
        data.extend_from_slice(&(table_name.len() as u16).to_le_bytes());
        data.extend_from_slice(table_name.as_bytes());
        data.extend_from_slice(&(column_name.len() as u16).to_le_bytes());
        data.extend_from_slice(column_name.as_bytes());
        self.record_ddl(table_name, WALOperationType::AlterTable, &data);
    }

    /// Records ALTER TABLE ... RENAME COLUMN (payload tag 3).
    fn record_alter_table_rename_column(
        &self,
        table_name: &str,
        old_column_name: &str,
        new_column_name: &str,
    ) {
        if self.should_skip_wal() {
            return;
        }
        let mut data = Vec::new();
        data.push(3u8);
        data.extend_from_slice(&(table_name.len() as u16).to_le_bytes());
        data.extend_from_slice(table_name.as_bytes());
        data.extend_from_slice(&(old_column_name.len() as u16).to_le_bytes());
        data.extend_from_slice(old_column_name.as_bytes());
        data.extend_from_slice(&(new_column_name.len() as u16).to_le_bytes());
        data.extend_from_slice(new_column_name.as_bytes());
        self.record_ddl(table_name, WALOperationType::AlterTable, &data);
    }

    /// Records ALTER TABLE ... MODIFY COLUMN (payload tag 4).
    ///
    /// Optional fields are encoded with presence bytes:
    /// * `auto_increment`: `[0]` if absent, `[1, value]` if present.
    /// * `check_expr`: `[0]` if absent, `[1, 0]` to clear, `[1, 1, len, bytes]`
    ///   to set.
    fn record_alter_table_modify_column(
        &self,
        table_name: &str,
        column_name: &str,
        data_type: crate::core::DataType,
        nullable: bool,
        auto_increment: Option<bool>,
        check_expr: Option<Option<String>>,
    ) {
        if self.should_skip_wal() {
            return;
        }
        let mut data = Vec::new();
        data.push(4u8);
        data.extend_from_slice(&(table_name.len() as u16).to_le_bytes());
        data.extend_from_slice(table_name.as_bytes());
        data.extend_from_slice(&(column_name.len() as u16).to_le_bytes());
        data.extend_from_slice(column_name.as_bytes());
        data.push(data_type as u8);
        data.push(u8::from(nullable));
        match auto_increment {
            Some(ai) => {
                data.push(1);
                data.push(u8::from(ai));
            }
            None => data.push(0),
        }
        match check_expr {
            Some(Some(ce)) => {
                data.push(1);
                data.push(1);
                data.extend_from_slice(&(ce.len() as u16).to_le_bytes());
                data.extend_from_slice(ce.as_bytes());
            }
            Some(None) => {
                data.push(1);
                data.push(0);
            }
            None => data.push(0),
        }
        self.record_ddl(table_name, WALOperationType::AlterTable, &data);
    }

    /// Records ALTER TABLE ... RENAME TO (payload tag 5), keyed by the old name.
    fn record_alter_table_rename(&self, old_table_name: &str, new_table_name: &str) {
        if self.should_skip_wal() {
            return;
        }
        let mut data = Vec::new();
        data.push(5u8);
        data.extend_from_slice(&(old_table_name.len() as u16).to_le_bytes());
        data.extend_from_slice(old_table_name.as_bytes());
        data.extend_from_slice(&(new_table_name.len() as u16).to_le_bytes());
        data.extend_from_slice(new_table_name.as_bytes());
        self.record_ddl(old_table_name, WALOperationType::AlterTable, &data);
    }

    /// Fetches the visible versions of `row_ids` outside any user transaction.
    fn fetch_rows_by_ids(
        &self,
        table_name: &str,
        row_ids: &[i64],
    ) -> Result<Vec<(i64, crate::core::Row)>> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let store = self.get_version_store(table_name)?;
        // NOTE(review): a reader id just above the invalid sentinel —
        // presumably chosen to see all committed data; confirm.
        let read_txn_id = INVALID_TRANSACTION_ID + 1;
        Ok(store.get_visible_versions_batch(row_ids, read_txn_id))
    }

    /// Returns a closure that fetches rows by id from `table_name`'s store,
    /// using the same reader id as `fetch_rows_by_ids`.
    fn get_row_fetcher(
        &self,
        table_name: &str,
    ) -> Result<Box<dyn Fn(&[i64]) -> Vec<(i64, crate::core::Row)> + Send + Sync>> {
        if !self.is_open() {
            return Err(Error::EngineNotOpen);
        }
        let store = self.get_version_store(table_name)?;
        let read_txn_id = INVALID_TRANSACTION_ID + 1;
        Ok(Box::new(move |row_ids: &[i64]| {
            store.get_visible_versions_batch(row_ids, read_txn_id)
        }))
    }

    fn sequence_exists(&self, schema_name: &str, sequence_name: &str) -> Result<bool> {
        let sequences = self.sequences.read().unwrap();
        let schema_lower = schema_name.to_lowercase();
        let name_lower = sequence_name.to_lowercase();
        Ok(sequences
            .get(&schema_lower)
            .is_some_and(|s| s.contains_key(&name_lower)))
    }

    /// Creates a sequence; fails if one with the same (case-insensitive)
    /// name already exists in the schema.
    fn create_sequence(
        &self,
        schema_name: &str,
        sequence_name: &str,
        options: crate::core::SequenceOptions,
    ) -> Result<()> {
        let mut sequences = self.sequences.write().unwrap();
        let schema_lower = schema_name.to_lowercase();
        let name_lower = sequence_name.to_lowercase();
        let schema_seqs = sequences.entry(schema_lower).or_default();
        if schema_seqs.contains_key(&name_lower) {
            return Err(crate::core::Error::SequenceAlreadyExists(
                sequence_name.to_string(),
            ));
        }
        schema_seqs.insert(
            name_lower,
            std::sync::Arc::new(crate::core::SequenceState::new(options)),
        );
        Ok(())
    }

    /// Replaces an existing sequence's state with one built from `options`.
    ///
    /// The lookup uses `get_mut`, so a failed ALTER does not leave an empty
    /// per-schema map behind.
    /// NOTE(review): a fresh `SequenceState` is constructed, so the previous
    /// state's progress is not carried over — confirm this is intended.
    fn alter_sequence(
        &self,
        schema_name: &str,
        sequence_name: &str,
        options: crate::core::SequenceOptions,
    ) -> Result<()> {
        let mut sequences = self.sequences.write().unwrap();
        let schema_lower = schema_name.to_lowercase();
        let name_lower = sequence_name.to_lowercase();
        let Some(slot) = sequences
            .get_mut(&schema_lower)
            .and_then(|schema_seqs| schema_seqs.get_mut(&name_lower))
        else {
            return Err(crate::core::Error::SequenceNotFound(
                sequence_name.to_string(),
            ));
        };
        *slot = std::sync::Arc::new(crate::core::SequenceState::new(options));
        Ok(())
    }

    fn drop_sequence(&self, schema_name: &str, sequence_name: &str) -> Result<()> {
        let mut sequences = self.sequences.write().unwrap();
        let schema_lower = schema_name.to_lowercase();
        let name_lower = sequence_name.to_lowercase();
        let removed = sequences
            .get_mut(&schema_lower)
            .and_then(|schema_seqs| schema_seqs.remove(&name_lower));
        if removed.is_none() {
            return Err(crate::core::Error::SequenceNotFound(
                sequence_name.to_string(),
            ));
        }
        Ok(())
    }

    /// Advances the sequence. The map lock is released before `nextval` runs.
    fn nextval(&self, schema_name: &str, sequence_name: &str) -> Result<i64> {
        let schema_lower = schema_name.to_lowercase();
        let name_lower = sequence_name.to_lowercase();
        let found = {
            let sequences = self.sequences.read().unwrap();
            sequences
                .get(&schema_lower)
                .and_then(|schema_seqs| schema_seqs.get(&name_lower))
                .cloned()
        };
        let sequence = found
            .ok_or_else(|| crate::core::Error::SequenceNotFound(sequence_name.to_string()))?;
        sequence.nextval()
    }

    /// Sets the sequence value. The map lock is released before `setval` runs.
    fn setval(
        &self,
        schema_name: &str,
        sequence_name: &str,
        value: i64,
        is_called: bool,
    ) -> Result<i64> {
        let schema_lower = schema_name.to_lowercase();
        let name_lower = sequence_name.to_lowercase();
        let found = {
            let sequences = self.sequences.read().unwrap();
            sequences
                .get(&schema_lower)
                .and_then(|schema_seqs| schema_seqs.get(&name_lower))
                .cloned()
        };
        let sequence = found
            .ok_or_else(|| crate::core::Error::SequenceNotFound(sequence_name.to_string()))?;
        sequence.setval(value, is_called)
    }

    /// Lists `(schema, name, options, current value)` for every sequence.
    fn list_sequences(&self) -> Result<Vec<(String, String, crate::core::SequenceOptions, i64)>> {
        let sequences = self.sequences.read().unwrap();
        let mut result = Vec::new();
        for (schema_name, schema_seqs) in sequences.iter() {
            for (name, seq) in schema_seqs.iter() {
                result.push((
                    schema_name.clone(),
                    name.clone(),
                    seq.options.clone(),
                    seq.current_value(),
                ));
            }
        }
        Ok(result)
    }
}
impl MVCCEngine {
pub fn cleanup_old_transactions(&self, max_age: std::time::Duration) -> i32 {
if !self.is_open() {
return 0;
}
self.registry.cleanup_old_transactions(max_age)
}
pub fn cleanup_deleted_rows(&self, max_age: std::time::Duration) -> i32 {
if !self.is_open() {
return 0;
}
let stores = self.version_stores.read().unwrap();
let mut total_removed = 0;
for store in stores.values() {
total_removed += store.cleanup_deleted_rows(max_age);
}
total_removed
}
pub fn cleanup_old_previous_versions(&self) -> i32 {
if !self.is_open() {
return 0;
}
let stores = self.version_stores.read().unwrap();
let mut total_cleaned = 0;
for store in stores.values() {
total_cleaned += store.cleanup_old_previous_versions();
}
total_cleaned
}
pub fn start_periodic_cleanup(
self: &Arc<Self>,
interval: std::time::Duration,
max_age: std::time::Duration,
) -> CleanupHandle {
use std::sync::atomic::AtomicBool;
use std::thread;
let stop_flag = Arc::new(AtomicBool::new(false));
let stop_flag_clone = Arc::clone(&stop_flag);
let engine = Arc::clone(self);
let handle = thread::spawn(move || {
while !stop_flag_clone.load(Ordering::Acquire) {
let check_interval = std::time::Duration::from_millis(100);
let mut elapsed = std::time::Duration::ZERO;
while elapsed < interval && !stop_flag_clone.load(Ordering::Acquire) {
thread::sleep(check_interval);
elapsed += check_interval;
}
if stop_flag_clone.load(Ordering::Acquire) {
break;
}
let _txn_count = engine.cleanup_old_transactions(max_age);
let _row_count = engine.cleanup_deleted_rows(max_age);
let _prev_version_count = engine.cleanup_old_previous_versions();
}
});
CleanupHandle {
stop_flag,
thread: Some(handle),
}
}
}
/// Handle to the background thread started by
/// `MVCCEngine::start_periodic_cleanup`.
///
/// Calling [`CleanupHandle::stop`], or dropping the handle, raises the stop
/// flag and joins the worker thread.
pub struct CleanupHandle {
    /// Worker thread; `None` once it has been joined.
    thread: Option<std::thread::JoinHandle<()>>,
    /// Flag shared with the worker; set to `true` to request shutdown.
    stop_flag: Arc<AtomicBool>,
}

impl CleanupHandle {
    /// Signals the worker to exit and waits for it to finish.
    ///
    /// Safe to call repeatedly: only the first call joins the thread. A panic
    /// inside the worker is ignored.
    pub fn stop(&mut self) {
        self.stop_flag.store(true, Ordering::Release);
        let Some(worker) = self.thread.take() else {
            return;
        };
        let _ = worker.join();
    }
}

impl Drop for CleanupHandle {
    /// Ensures the worker thread is stopped and joined.
    fn drop(&mut self) {
        self.stop();
    }
}
/// Shared-state handle given to each transaction as its
/// `TransactionEngineOperations` implementation.
///
/// Every field is an `Arc` clone of the matching `MVCCEngine` field (see
/// `EngineOperations::new`), so the handle and the engine observe the same
/// catalog, stores and registry.
struct EngineOperations {
    /// Transaction registry shared with the engine.
    registry: Arc<TransactionRegistry>,
    /// Catalog looked up by lowercase schema name, then lowercase table name.
    schemas: Arc<RwLock<FxHashMap<String, FxHashMap<String, Schema>>>>,
    /// Version store per lowercase table name.
    version_stores: Arc<RwLock<FxHashMap<String, Arc<VersionStore>>>>,
    /// Transaction-local version stores; key presumably (txn id, table name)
    /// — confirm against users of `TxnVersionStoreMap`.
    txn_version_stores: Arc<RwLock<TxnVersionStoreMap>>,
    /// Persistence manager, if the engine has one.
    persistence: Arc<Option<PersistenceManager>>,
    /// Flag read by `should_skip_wal`.
    loading_from_disk: Arc<AtomicBool>,
    /// Ring buffers backing the `system` telemetry tables, by lowercase name.
    // The nested type is spelled out to match the engine field exactly.
    #[allow(clippy::type_complexity)]
    ring_buffers: Arc<
        RwLock<
            FxHashMap<
                String,
                Arc<parking_lot::RwLock<std::collections::VecDeque<crate::core::Row>>>,
            >,
        >,
    >,
}
impl EngineOperations {
    /// Builds a handle that shares `engine`'s state by cloning each `Arc`.
    fn new(engine: &MVCCEngine) -> Self {
        let schemas = Arc::clone(&engine.schemas);
        let version_stores = Arc::clone(&engine.version_stores);
        let registry = Arc::clone(&engine.registry);
        let txn_version_stores = Arc::clone(&engine.txn_version_stores);
        let persistence = Arc::clone(&engine.persistence);
        let loading_from_disk = Arc::clone(&engine.loading_from_disk);
        let ring_buffers = Arc::clone(&engine.ring_buffers);
        Self {
            schemas,
            version_stores,
            registry,
            txn_version_stores,
            persistence,
            loading_from_disk,
            ring_buffers,
        }
    }

    /// Shared schema catalog.
    fn schemas(&self) -> &Arc<RwLock<FxHashMap<String, FxHashMap<String, Schema>>>> {
        &self.schemas
    }

    /// Table-name -> version store map.
    fn version_stores(&self) -> &RwLock<FxHashMap<String, Arc<VersionStore>>> {
        &*self.version_stores
    }

    /// Transaction-local version stores.
    fn txn_version_stores(&self) -> &RwLock<TxnVersionStoreMap> {
        &*self.txn_version_stores
    }

    /// Persistence manager, if any.
    fn persistence(&self) -> &Option<PersistenceManager> {
        &*self.persistence
    }

    /// Returns the current value of the `loading_from_disk` flag.
    fn should_skip_wal(&self) -> bool {
        self.loading_from_disk.load(Ordering::Acquire)
    }
}
impl TransactionEngineOperations for EngineOperations {
fn get_table_for_transaction(&self, txn_id: i64, table_name: &str) -> Result<Box<dyn Table>> {
let full_name = table_name;
let (schema_name, table_name_str) = if let Some(pos) = full_name.find('.') {
(
full_name[..pos].to_string(),
full_name[pos + 1..].to_string(),
)
} else {
("public".to_string(), full_name.to_string())
};
let schema_name_lower = schema_name.to_lowercase();
let table_name_lower = table_name_str.to_lowercase();
let schema = {
let schemas = self.schemas().read().unwrap();
let default_schema = schemas.get(&schema_name_lower).ok_or_else(|| {
tracing::debug!(
"TableNotFound because schema {} missing. Schemas available: {:?}",
schema_name_lower,
schemas.keys()
);
Error::TableNotFound
})?;
default_schema
.get(&table_name_lower)
.cloned()
.ok_or_else(|| {
tracing::debug!(
"TableNotFound because table {} missing in schema {}. Tables in schema: {:?}",
table_name_lower,
schema_name_lower,
default_schema.keys()
);
Error::TableNotFound
})?
};
let is_telemetry_table = schema_name_lower == "system"
&& (table_name_lower == "logs"
|| table_name_lower == "traces"
|| table_name_lower == "metrics");
if is_telemetry_table {
let buffer = {
let ring_buffers = (*self.ring_buffers).read().unwrap();
ring_buffers
.get(&table_name_lower)
.cloned()
.ok_or_else(|| {
tracing::debug!(
"TableNotFound because ring buffer missing for {}.",
table_name_lower
);
Error::TableNotFound
})?
};
return Ok(Box::new(crate::storage::mvcc::SystemRingBufferTable::new(
full_name, schema, 100_000, buffer,
)));
}
let stores = self.version_stores().read().unwrap();
let version_store = stores.get(&table_name_lower).cloned().ok_or_else(|| {
tracing::debug!(
"TableNotFound because version_store missing for {}. Stores available: {:?}",
table_name_lower,
stores.keys()
);
Error::TableNotFound
})?;
drop(stores);
let cache_key = (txn_id, table_name_lower.clone());
let txn_versions = {
let cache = self.txn_version_stores().read().unwrap();
if let Some(cached) = cache.get(&cache_key) {
Arc::clone(cached)
} else {
drop(cache);
let new_store = Arc::new(RwLock::new(TransactionVersionStore::new(
Arc::clone(&version_store),
txn_id,
)));
let mut cache = self.txn_version_stores().write().unwrap();
cache.insert(cache_key, Arc::clone(&new_store));
new_store
}
};
let table = MVCCTable::new_with_shared_store(txn_id, version_store, txn_versions);
Ok(Box::new(table))
}
fn create_table(&self, name: &str, schema: Schema) -> Result<Box<dyn Table>> {
let full_name = name;
let (schema_name, table_name_str) = if let Some(pos) = full_name.find('.') {
(
full_name[..pos].to_string(),
full_name[pos + 1..].to_string(),
)
} else {
("public".to_string(), full_name.to_string())
};
let schema_name_lower = schema_name.to_lowercase();
let table_name_lower = table_name_str.to_lowercase();
{
let schemas = self.schemas().read().unwrap();
let default_schema = schemas.get(&schema_name_lower);
if default_schema.is_some_and(|s| s.contains_key(&table_name_lower)) {
return Err(Error::TableAlreadyExists);
}
}
{
let mut schemas = (*self.schemas()).write().unwrap();
let default_schema = schemas.entry(schema_name_lower.clone()).or_default();
default_schema.insert(table_name_lower.clone(), schema.clone());
}
let is_telemetry_table = schema_name_lower == "system"
&& (table_name_lower == "logs"
|| table_name_lower == "traces"
|| table_name_lower == "metrics");
if is_telemetry_table {
let buffer = std::sync::Arc::new(parking_lot::RwLock::new(
std::collections::VecDeque::with_capacity(100_000),
));
{
let mut ring_buffers = (*self.ring_buffers).write().unwrap();
ring_buffers.insert(table_name_lower.clone(), std::sync::Arc::clone(&buffer));
}
Ok(Box::new(crate::storage::mvcc::SystemRingBufferTable::new(
full_name, schema, 100_000, buffer,
)))
} else {
let version_store = Arc::new(VersionStore::with_visibility_checker(
schema.table_name.clone(),
schema,
Arc::clone(&self.registry) as Arc<dyn VisibilityChecker>,
));
{
let mut stores = self.version_stores().write().unwrap();
stores.insert(table_name_lower.clone(), Arc::clone(&version_store));
}
let txn_versions = TransactionVersionStore::new(Arc::clone(&version_store), 0);
let table = MVCCTable::new(0, version_store, txn_versions);
Ok(Box::new(table))
}
}
fn drop_table(&self, name: &str) -> Result<()> {
let full_name = name;
let (schema_name, table_name) = if let Some(pos) = full_name.find('.') {
(
full_name[..pos].to_string(),
full_name[pos + 1..].to_string(),
)
} else {
("public".to_string(), full_name.to_string())
};
let schema_name_lower = schema_name.to_lowercase();
let table_name_lower = table_name.to_lowercase();
let is_telemetry_table = schema_name_lower == "system"
&& (table_name_lower == "logs"
|| table_name_lower == "traces"
|| table_name_lower == "metrics");
if is_telemetry_table {
return Err(Error::NotSupportedMessage(
"Cannot drop system tables".to_string(),
));
}
{
let mut schemas = self.schemas().write().unwrap();
let default_schema = schemas
.get_mut(&schema_name_lower)
.ok_or(Error::TableNotFound)?;
if default_schema.remove(&table_name_lower).is_none() {
return Err(Error::TableNotFound);
}
}
{
let mut stores = self.version_stores().write().unwrap();
if let Some(store) = stores.remove(&table_name_lower) {
store.close();
}
}
Ok(())
}
fn list_tables(&self) -> Result<Vec<String>> {
let schemas = self.schemas().read().unwrap();
let mut result = Vec::new();
for (schema_name, tables) in schemas.iter() {
for table_name in tables.keys() {
if schema_name == "public" {
result.push(table_name.clone());
} else {
result.push(format!("{}.{}", schema_name, table_name));
}
}
}
Ok(result)
}
fn rename_table(&self, old_name: &str, new_name: &str) -> Result<()> {
let full_old_name = old_name;
let (old_schema_name, old_table_name_str) = if let Some(pos) = full_old_name.find('.') {
(
full_old_name[..pos].to_string(),
full_old_name[pos + 1..].to_string(),
)
} else {
("public".to_string(), full_old_name.to_string())
};
let old_schema_name_lower = old_schema_name.to_lowercase();
let old_table_name_lower = old_table_name_str.to_lowercase();
let full_new_name = new_name;
let (new_schema_name, new_table_name_str) = if let Some(pos) = full_new_name.find('.') {
(
full_new_name[..pos].to_string(),
full_new_name[pos + 1..].to_string(),
)
} else {
("public".to_string(), full_new_name.to_string())
};
let new_schema_name_lower = new_schema_name.to_lowercase();
let new_table_name_lower = new_table_name_str.to_lowercase();
{
let schemas = (*self.schemas()).read().unwrap();
let old_schema = schemas
.get(&old_schema_name_lower)
.ok_or(Error::TableNotFound)?;
if !old_schema.contains_key(&old_table_name_lower) {
return Err(Error::TableNotFound);
}
let empty_map = FxHashMap::default();
let new_schema = schemas.get(&new_schema_name_lower).unwrap_or(&empty_map);
if new_schema.contains_key(&new_table_name_lower) {
return Err(Error::TableAlreadyExists);
}
}
{
let mut schemas = (*self.schemas()).write().unwrap();
let mut schema_to_move = None;
if let Some(old_schema) = schemas.get_mut(&old_schema_name_lower) {
if let Some(mut schema) = old_schema.remove(&old_table_name_lower) {
schema.table_name = new_table_name_str.clone();
schema.schema_name = new_schema_name.clone();
schema.table_name_lower = new_table_name_lower.clone();
schema.schema_name_lower = new_schema_name_lower.clone();
schema_to_move = Some(schema);
}
}
if let Some(schema) = schema_to_move {
let new_schema_map = schemas.entry(new_schema_name_lower.clone()).or_default();
new_schema_map.insert(new_table_name_lower.clone(), schema);
}
}
{
let mut stores = self.version_stores().write().unwrap();
if let Some(store) = stores.remove(&old_table_name_lower) {
{
let mut vs_schema = store.schema_mut();
vs_schema.table_name = new_table_name_str;
vs_schema.schema_name = new_schema_name;
vs_schema.table_name_lower = new_table_name_lower.clone();
vs_schema.schema_name_lower = new_schema_name_lower;
}
stores.insert(new_table_name_lower, store);
}
}
Ok(())
}
fn commit_table(&self, txn_id: i64, table: &dyn Table) -> Result<()> {
if self.should_skip_wal() {
return Ok(());
}
if let Some(ref pm) = self.persistence() {
if pm.is_enabled() {
let table_name = table.name();
let pending = table.get_pending_versions();
for (row_id, row_data, is_deleted, version_txn_id) in pending {
let version = RowVersion {
txn_id: version_txn_id,
deleted_at_txn_id: if is_deleted { version_txn_id } else { 0 },
data: row_data,
row_id,
create_time: 0, };
let op = if is_deleted {
WALOperationType::Delete
} else {
WALOperationType::Insert
};
if let Err(e) =
pm.record_dml_operation(txn_id, table_name, row_id, op, &version)
{
eprintln!("Warning: Failed to record DML in WAL: {}", e);
}
}
}
}
Ok(())
}
fn rollback_table(&self, _txn_id: i64, table: &dyn Table) {
let _ = table;
}
fn record_commit(&self, txn_id: i64) -> Result<()> {
if self.should_skip_wal() {
return Ok(());
}
if let Some(ref pm) = self.persistence() {
if pm.is_enabled() {
if let Err(e) = pm.record_commit(txn_id) {
eprintln!("Warning: Failed to record commit in WAL: {}", e);
}
}
}
Ok(())
}
fn record_rollback(&self, txn_id: i64) -> Result<()> {
if self.should_skip_wal() {
return Ok(());
}
if let Some(ref pm) = self.persistence() {
if pm.is_enabled() {
if let Err(e) = pm.record_rollback(txn_id) {
eprintln!("Warning: Failed to record rollback in WAL: {}", e);
}
}
}
Ok(())
}
fn get_tables_with_pending_changes(&self, txn_id: i64) -> Result<Vec<Box<dyn Table>>> {
let mut tables = Vec::new();
let cache = self.txn_version_stores().read().unwrap();
for ((cached_txn_id, table_name), txn_store) in cache.iter() {
if *cached_txn_id == txn_id {
let store = txn_store.read().unwrap();
if store.has_local_changes() {
drop(store);
let stores = self.version_stores().read().unwrap();
if let Some(version_store) = stores.get(table_name).cloned() {
drop(stores);
let table = MVCCTable::new_with_shared_store(
txn_id,
Arc::clone(&version_store),
Arc::clone(txn_store),
);
tables.push(Box::new(table) as Box<dyn Table>);
}
}
}
}
Ok(tables)
}
fn commit_all_tables(&self, txn_id: i64) -> Result<()> {
let cache = self.txn_version_stores().read().unwrap();
for ((cached_txn_id, table_name), txn_store) in cache.iter() {
if *cached_txn_id == txn_id {
let has_changes = {
let store = txn_store.read().unwrap();
store.has_local_changes()
};
if has_changes {
let stores = self.version_stores().read().unwrap();
if let Some(version_store) = stores.get(table_name).cloned() {
drop(stores);
let mut table = MVCCTable::new_with_shared_store(
txn_id,
Arc::clone(&version_store),
Arc::clone(txn_store),
);
table.commit()?;
}
}
}
}
drop(cache);
let mut cache = self.txn_version_stores().write().unwrap();
cache.retain(|(cached_txn_id, _), _| *cached_txn_id != txn_id);
Ok(())
}
fn rollback_all_tables(&self, txn_id: i64) -> Result<()> {
let cache = self.txn_version_stores().read().unwrap();
for ((cached_txn_id, _), txn_store) in cache.iter() {
if *cached_txn_id == txn_id {
txn_store.write().unwrap().rollback();
}
}
drop(cache);
let mut cache = self.txn_version_stores().write().unwrap();
cache.retain(|(cached_txn_id, _), _| *cached_txn_id != txn_id);
Ok(())
}
fn create_schema(&self, name: &str) -> Result<()> {
let name_lower = name.to_lowercase();
let mut schemas = self.schemas.write().unwrap();
schemas.insert(name_lower, FxHashMap::default());
Ok(())
}
fn drop_schema(&self, name: &str) -> Result<()> {
let name_lower = name.to_lowercase();
let mut schemas = self.schemas.write().unwrap();
schemas.remove(&name_lower);
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{DataType, Row, SchemaBuilder, Value};

    // A fresh in-memory engine starts closed and reports the `memory://` path.
    #[test]
    fn test_engine_creation() {
        let engine = MVCCEngine::in_memory();
        assert!(!engine.is_open());
        assert_eq!(engine.get_path(), "memory://");
    }

    // open_engine / close_engine toggle `is_open`.
    #[test]
    fn test_engine_open_close() {
        let engine = MVCCEngine::in_memory();
        engine.open_engine().unwrap();
        assert!(engine.is_open());
        engine.close_engine().unwrap();
        assert!(!engine.is_open());
    }

    // Table lookup is case-insensitive.
    #[test]
    fn test_engine_create_table() {
        let engine = MVCCEngine::in_memory();
        engine.open_engine().unwrap();
        let schema = SchemaBuilder::new("users")
            .column("id", DataType::Integer, false, true)
            .column("name", DataType::Text, true, false)
            .build();
        let created = engine.create_table(schema).unwrap();
        assert_eq!(created.table_name, "users");
        assert!(engine.table_exists("users").unwrap());
        assert!(engine.table_exists("USERS").unwrap());
        engine.close_engine().unwrap();
    }

    #[test]
    fn test_engine_drop_table() {
        let engine = MVCCEngine::in_memory();
        engine.open_engine().unwrap();
        let schema = SchemaBuilder::new("temp")
            .column("id", DataType::Integer, false, true)
            .build();
        engine.create_table(schema).unwrap();
        assert!(engine.table_exists("temp").unwrap());
        engine.drop_table_internal("temp").unwrap();
        assert!(!engine.table_exists("temp").unwrap());
        engine.close_engine().unwrap();
    }

    // Creating the same table twice must fail the second time.
    #[test]
    fn test_engine_duplicate_table_error() {
        let engine = MVCCEngine::in_memory();
        engine.open_engine().unwrap();
        let schema = SchemaBuilder::new("dup")
            .column("id", DataType::Integer, false, true)
            .build();
        engine.create_table(schema.clone()).unwrap();
        let result = engine.create_table(schema);
        assert!(result.is_err());
        engine.close_engine().unwrap();
    }

    #[test]
    fn test_engine_begin_transaction() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        let txn = engine.begin_transaction();
        assert!(txn.is_ok());
        let mut txn = txn.unwrap();
        assert!(txn.id() > 0);
        txn.rollback().unwrap();
        engine.close().unwrap();
    }

    #[test]
    fn test_engine_transaction_create_table() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        let mut txn = engine.begin_transaction().unwrap();
        let schema = SchemaBuilder::new("txn_table")
            .column("id", DataType::Integer, false, true)
            .column("value", DataType::Text, true, false)
            .build();
        let table = txn.create_table("txn_table", schema).unwrap();
        assert_eq!(table.name(), "txn_table");
        txn.commit().unwrap();
        engine.close().unwrap();
    }

    // A transaction sees its own uncommitted inserts.
    #[test]
    fn test_engine_transaction_insert_and_select() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        let schema = SchemaBuilder::new("data")
            .column("id", DataType::Integer, false, true)
            .column("name", DataType::Text, true, false)
            .build();
        engine.create_table(schema).unwrap();
        let mut txn = engine.begin_transaction().unwrap();
        let mut table = txn.get_table("data").unwrap();
        table
            .insert(Row::from_values(vec![
                Value::Integer(1),
                Value::text("Alice"),
            ]))
            .unwrap();
        table
            .insert(Row::from_values(vec![
                Value::Integer(2),
                Value::text("Bob"),
            ]))
            .unwrap();
        let mut scanner = table.scan(&[0, 1], None).unwrap();
        let mut count = 0;
        while scanner.next() {
            count += 1;
        }
        assert_eq!(count, 2);
        txn.commit().unwrap();
        engine.close().unwrap();
    }

    // Default isolation is ReadCommitted and can be changed.
    #[test]
    fn test_engine_isolation_level() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        assert_eq!(engine.get_isolation_level(), IsolationLevel::ReadCommitted);
        engine
            .set_isolation_level(IsolationLevel::SnapshotIsolation)
            .unwrap();
        assert_eq!(
            engine.get_isolation_level(),
            IsolationLevel::SnapshotIsolation
        );
        engine.close().unwrap();
    }

    #[test]
    fn test_engine_get_version_store() {
        let engine = MVCCEngine::in_memory();
        engine.open_engine().unwrap();
        let schema = SchemaBuilder::new("versioned")
            .column("id", DataType::Integer, false, true)
            .build();
        engine.create_table(schema).unwrap();
        let store = engine.get_version_store("versioned");
        assert!(store.is_ok());
        let store = engine.get_version_store("nonexistent");
        assert!(store.is_err());
        engine.close_engine().unwrap();
    }

    #[test]
    fn test_engine_get_table_schema() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        let schema = SchemaBuilder::new("test_schema")
            .column("id", DataType::Integer, false, true)
            .column("name", DataType::Text, true, false)
            .build();
        engine.create_table(schema).unwrap();
        let retrieved = engine.get_table_schema("test_schema").unwrap();
        assert_eq!(retrieved.columns.len(), 2);
        assert_eq!(retrieved.columns[0].name, "id");
        assert!(engine.get_table_schema("nonexistent").is_err());
        engine.close().unwrap();
    }

    #[test]
    fn test_engine_transaction_with_isolation_level() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        let txn = engine.begin_transaction_with_level(IsolationLevel::SnapshotIsolation);
        assert!(txn.is_ok());
        let mut txn = txn.unwrap();
        txn.rollback().unwrap();
        engine.close().unwrap();
    }

    // In-memory engines have no path; file-backed engines report the configured one.
    #[test]
    fn test_engine_path() {
        let engine = MVCCEngine::in_memory();
        assert!(engine.path().is_none());
        let config = Config::with_path("/tmp/test.db");
        let engine = MVCCEngine::new(config);
        assert_eq!(engine.path(), Some("/tmp/test.db"));
    }

    #[test]
    fn test_engine_create_snapshot() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        assert!(engine.create_snapshot().is_ok());
        engine.close().unwrap();
    }

    #[test]
    fn test_engine_list_table_indexes() {
        let mut engine = MVCCEngine::in_memory();
        engine.open().unwrap();
        let schema = SchemaBuilder::new("indexed")
            .column("id", DataType::Integer, false, true)
            .build();
        engine.create_table(schema).unwrap();
        let indexes = engine.list_table_indexes("indexed").unwrap();
        assert!(indexes.is_empty());
        engine.close().unwrap();
    }

    // Rows committed by one transaction are visible to a later one.
    #[test]
    fn test_cross_transaction_visibility() {
        let engine = MVCCEngine::in_memory();
        engine.open_engine().unwrap();
        let schema = SchemaBuilder::new("test_xact")
            .column("id", DataType::Integer, false, true)
            .column("name", DataType::Text, true, false)
            .build();
        engine.create_table(schema).unwrap();
        {
            let mut tx1 = engine.begin_transaction().unwrap();
            let mut table = tx1.get_table("test_xact").unwrap();
            table
                .insert(Row::from_values(vec![
                    Value::Integer(1),
                    Value::text("Alice"),
                ]))
                .unwrap();
            tx1.commit().unwrap();
        }
        {
            let tx2 = engine.begin_transaction().unwrap();
            let table = tx2.get_table("test_xact").unwrap();
            let mut scanner = table.scan(&[0, 1], None).unwrap();
            let mut count = 0;
            while scanner.next() {
                count += 1;
            }
            assert_eq!(
                count, 1,
                "Transaction 2 should see 1 row committed by Transaction 1"
            );
        }
        engine.close_engine().unwrap();
    }

    // A table created before a clean shutdown is still present after reopening.
    #[test]
    fn test_durability_ddl_survives_restart() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap().to_string();
        let config = Config::with_path(&path);
        let engine = MVCCEngine::new(config.clone());
        engine.open_engine().unwrap();
        let schema = SchemaBuilder::new("survivor_table")
            .column("id", DataType::Integer, false, true)
            .build();
        engine.create_table(schema).unwrap();
        engine.close_engine().unwrap();
        drop(engine);
        let new_engine = MVCCEngine::new(config);
        new_engine.open_engine().unwrap();
        assert!(new_engine.table_exists("survivor_table").unwrap());
        // Shut the reopened engine down too, mirroring the first engine, so the
        // database directory is released before `dir` is deleted.
        new_engine.close_engine().unwrap();
    }
}