use crate::config::DBConfig;
use crate::index::btree::{BTree, BTreeConfig};
use crate::index::vamana::{DiskANNIndex, VamanaConfig};
use crate::index::{SpatialHybridIndex, SpatialHybridConfig, BoundingBoxF32};
use crate::index::text_fts::TextFTSIndex;
use crate::index::column_value::ColumnValueIndex;
use crate::storage::{LSMEngine, LSMConfig};
use crate::txn::coordinator::TransactionCoordinator;
use crate::txn::version_store::VersionStore;
use crate::txn::wal::{WALManager, WALRecord};
use crate::types::RowId;
use crate::catalog::TableRegistry;
use crate::cache::RowCache;
use crate::{Result, StorageError};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
/// Summary counters describing a database.
///
/// Nothing in this part of the file constructs the struct, so the field notes
/// below are read off the names and types only.
#[derive(Debug, Clone)]
pub struct DatabaseStats {
/// Row total, expressed in the `RowId` type.
/// NOTE(review): presumably the number of stored rows — confirm at the producer.
pub total_rows: RowId,
/// Partition count; same `u8` width as `MoteDB::num_partitions`.
pub num_partitions: u8,
}
/// Statistics reported for a vector index.
///
/// The code that fills this in is not visible here; units below are
/// assumptions to be confirmed against the producer.
#[derive(Debug, Clone)]
pub struct VectorIndexStats {
/// Number of vectors held by the index.
pub total_vectors: usize,
/// Dimensionality of the indexed vectors.
pub dimension: usize,
/// Cache hit rate.
/// NOTE(review): presumably a fraction in `0.0..=1.0` — confirm.
pub cache_hit_rate: f32,
/// Memory footprint.
/// NOTE(review): presumably bytes — confirm.
pub memory_usage: usize,
/// On-disk footprint.
/// NOTE(review): presumably bytes — confirm.
pub disk_usage: usize,
}
/// Statistics reported for a spatial index.
///
/// The code that fills this in is not visible here; units below are
/// assumptions to be confirmed against the producer.
#[derive(Debug, Clone)]
pub struct SpatialIndexStats {
/// Number of entries held by the index.
pub total_entries: usize,
/// Memory footprint.
/// NOTE(review): presumably bytes — confirm.
pub memory_usage: usize,
/// Per-entry size.
/// NOTE(review): presumably `memory_usage / total_entries` in bytes — confirm.
pub bytes_per_entry: usize,
}
/// Handle to an on-disk MoteDB database.
///
/// Every shared component is wrapped in an `Arc`, which lets
/// `clone_for_callback` build a second handle over the same state for the
/// LSM flush callback.
pub struct MoteDB {
/// Database directory: the caller-supplied path with its extension replaced by `mote`.
pub(crate) path: PathBuf,
/// Write-ahead log manager rooted at `<path>/wal`.
pub(crate) wal: Arc<WALManager>,
/// LSM storage engine rooted at `<path>/lsm`.
pub(crate) lsm_engine: Arc<LSMEngine>,
/// B-tree stored at `<path>/indexes/timestamp.idx`, configured with
/// non-unique keys and updates allowed.
pub(crate) timestamp_index: Arc<RwLock<BTree>>,
/// Row-id counter: starts at 0 for a new database; `open` derives it from
/// the timestamp index size and the recovered WAL inserts.
pub(crate) next_row_id: Arc<RwLock<RowId>>,
/// Partition count passed to the WAL manager (from `DBConfig` on create, fixed at 4 on open).
pub(crate) num_partitions: u8,
/// Transaction coordinator built over `version_store`.
pub(crate) txn_coordinator: Arc<TransactionCoordinator>,
/// Version store shared with `txn_coordinator`.
pub(crate) version_store: Arc<VersionStore>,
/// Counter initialised to 0; its consumers are outside this part of the file.
pub(crate) pending_updates: Arc<RwLock<usize>>,
/// Counter initialised to 0; its consumers are outside this part of the file.
pub(crate) pending_spatial_updates: Arc<RwLock<usize>>,
/// Vector indexes by name; on open, the name is the `vector_*` directory
/// entry name with the prefix removed.
pub(crate) vector_indexes: Arc<DashMap<String, Arc<RwLock<DiskANNIndex>>>>,
/// Spatial indexes by name; on open, the name is the `spatial_*` directory
/// entry name with the prefix removed.
pub(crate) spatial_indexes: Arc<DashMap<String, Arc<RwLock<SpatialHybridIndex>>>>,
/// Text indexes by name; on open, the name is the `text_*` directory entry
/// name with the prefix removed.
pub text_indexes: Arc<DashMap<String, Arc<RwLock<TextFTSIndex>>>>,
/// Column-value indexes by name; starts empty on both create and open.
pub column_indexes: Arc<DashMap<String, Arc<RwLock<ColumnValueIndex>>>>,
/// Table catalog constructed from the database directory.
pub(crate) table_registry: Arc<TableRegistry>,
/// Index metadata registry constructed from the database directory; `open` also calls `load()` on it.
pub(crate) index_registry: Arc<crate::database::index_metadata::IndexRegistry>,
/// Row cache; sized from `DBConfig::row_cache_size` (default 10000) on create, 10000 on open.
pub(crate) row_cache: Arc<RowCache>,
/// Table name -> low 32 bits of its `DefaultHasher` digest; always seeded with `"_default"`.
pub(crate) table_hash_cache: Arc<DashMap<String, u64>>,
/// Index update strategy (from `DBConfig` on create, the type's default on open).
pub(crate) index_update_strategy: crate::config::IndexUpdateStrategy,
/// Optional query timeout (from `DBConfig` on create, `None` on open).
/// NOTE(review): the name implies seconds — confirm where it is enforced.
pub(crate) query_timeout_secs: Option<u64>,
/// Flag initialised to `false`; its readers and writers are outside this part of the file.
pub(crate) is_flushing: Arc<AtomicBool>,
}
impl MoteDB {
/// Creates a new database at `path` using `DBConfig::default()`.
///
/// See `create_with_config` for the on-disk layout and the error conditions.
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
    let default_config = DBConfig::default();
    Self::create_with_config(path, default_config)
}
/// Creates a new database at `path` (extension replaced by `mote`) using `config`.
///
/// The database directory and its `wal/`, `indexes/` and `lsm/` children are
/// created in that order, each followed by the component that lives in it.
/// The table-hash cache is seeded with the `"_default"` table, and the LSM
/// engine gets a flush callback that rebuilds indexes from the flushed
/// memtable.
///
/// # Errors
/// Propagates any failure from directory creation or from constructing the
/// WAL manager, timestamp B-tree, LSM engine or table registry, and from
/// registering the flush callback.
pub fn create_with_config<P: AsRef<Path>>(path: P, config: DBConfig) -> Result<Self> {
    let root = path.as_ref().with_extension("mote");
    std::fs::create_dir_all(&root)?;

    let partitions = config.num_partitions;

    // Write-ahead log.
    let wal_dir = root.join("wal");
    std::fs::create_dir_all(&wal_dir)?;
    let wal_settings = crate::txn::wal::WALConfig::from(config.wal_config);
    let wal_manager = WALManager::create_with_config(&wal_dir, partitions, wal_settings)?;

    // Timestamp index: duplicate keys and in-place updates are both allowed.
    let index_dir = root.join("indexes");
    std::fs::create_dir_all(&index_dir)?;
    let timestamp_tree = BTree::with_config(
        index_dir.join("timestamp.idx"),
        BTreeConfig {
            unique_keys: false,
            allow_updates: true,
            ..Default::default()
        },
    )?;

    // LSM storage engine.
    let lsm_root = root.join("lsm");
    std::fs::create_dir_all(&lsm_root)?;
    let lsm_engine = Arc::new(LSMEngine::new(lsm_root, LSMConfig::default())?);

    let version_store = Arc::new(VersionStore::new());
    let txn_coordinator = Arc::new(TransactionCoordinator::new(Arc::clone(&version_store)));
    let table_registry = Arc::new(TableRegistry::new(&root)?);
    let index_registry = Arc::new(crate::database::index_metadata::IndexRegistry::new(&root));
    let row_cache = Arc::new(RowCache::new(config.row_cache_size.unwrap_or(10000)));

    // Seed the table-hash cache with the implicit default table.
    let table_hash_cache = Arc::new(DashMap::new());
    let default_table_hash = {
        use std::hash::{Hash, Hasher};
        let mut state = std::collections::hash_map::DefaultHasher::new();
        "_default".hash(&mut state);
        state.finish() & 0xFFFFFFFF
    };
    table_hash_cache.insert("_default".to_string(), default_table_hash);

    let database = Self {
        path: root,
        wal: Arc::new(wal_manager),
        lsm_engine: Arc::clone(&lsm_engine),
        timestamp_index: Arc::new(RwLock::new(timestamp_tree)),
        next_row_id: Arc::new(RwLock::new(0)),
        num_partitions: partitions,
        txn_coordinator,
        version_store,
        pending_updates: Arc::new(RwLock::new(0)),
        pending_spatial_updates: Arc::new(RwLock::new(0)),
        vector_indexes: Arc::new(DashMap::new()),
        spatial_indexes: Arc::new(DashMap::new()),
        text_indexes: Arc::new(DashMap::new()),
        column_indexes: Arc::new(DashMap::new()),
        table_registry,
        index_registry,
        row_cache,
        table_hash_cache,
        index_update_strategy: config.index_update_strategy.clone(),
        query_timeout_secs: config.query_timeout_secs,
        is_flushing: Arc::new(AtomicBool::new(false)),
    };

    // The callback owns its own handle over the same shared state.
    let callback_handle = database.clone_for_callback();
    lsm_engine.set_flush_callback(move |memtable| {
        callback_handle.batch_build_indexes_from_flush(memtable)
    })?;

    Ok(database)
}
pub(crate) fn clone_for_callback(&self) -> Self {
Self {
path: self.path.clone(),
wal: self.wal.clone(),
lsm_engine: self.lsm_engine.clone(),
timestamp_index: self.timestamp_index.clone(),
next_row_id: self.next_row_id.clone(),
num_partitions: self.num_partitions,
txn_coordinator: self.txn_coordinator.clone(),
version_store: self.version_store.clone(),
pending_updates: self.pending_updates.clone(),
pending_spatial_updates: self.pending_spatial_updates.clone(),
vector_indexes: self.vector_indexes.clone(),
spatial_indexes: self.spatial_indexes.clone(),
text_indexes: self.text_indexes.clone(),
column_indexes: self.column_indexes.clone(),
table_registry: self.table_registry.clone(),
index_registry: self.index_registry.clone(), row_cache: self.row_cache.clone(),
table_hash_cache: self.table_hash_cache.clone(), index_update_strategy: self.index_update_strategy.clone(), query_timeout_secs: self.query_timeout_secs, is_flushing: self.is_flushing.clone(), }
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
let db_path = path.with_extension("mote");
let wal_path = db_path.join("wal");
let lsm_dir = db_path.join("lsm");
let indexes_dir = db_path.join("indexes");
let num_partitions = 4;
let wal = if wal_path.exists() {
Arc::new(WALManager::open(&wal_path, num_partitions)?)
} else {
std::fs::create_dir_all(&wal_path)?;
Arc::new(WALManager::create(&wal_path, num_partitions)?)
};
let recovered_records = wal.recover()?;
std::fs::create_dir_all(&indexes_dir)?;
let timestamp_storage = indexes_dir.join("timestamp.idx");
let btree_config = BTreeConfig {
unique_keys: false,
allow_updates: true,
..Default::default()
};
let mut timestamp_idx = BTree::with_config(timestamp_storage, btree_config)?;
let persisted_count = timestamp_idx.len();
let mut max_row_id = if persisted_count > 0 {
(persisted_count - 1) as u64
} else {
0
};
for records in recovered_records.values() {
for record in records {
if let WALRecord::Insert { row_id, data, .. } = record {
max_row_id = max_row_id.max(*row_id);
if let Some(crate::types::Value::Timestamp(ts)) = data.first() {
let _ = timestamp_idx.insert(ts.as_micros() as u64, *row_id);
}
}
}
}
let timestamp_index = Arc::new(RwLock::new(timestamp_idx));
std::fs::create_dir_all(&lsm_dir)?;
let lsm_engine = Arc::new(LSMEngine::new(lsm_dir, LSMConfig::default())?);
debug_log!("[database] 恢复 WAL 记录到 LSM Engine...");
let mut recovered_count = 0;
for records in recovered_records.values() {
for record in records {
use std::hash::{Hash, Hasher};
match record {
WALRecord::Insert { table_name, row_id, data, .. } => {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
table_name.hash(&mut hasher);
let table_hash = hasher.finish() & 0xFFFFFFFF; let composite_key = (table_hash << 32) | (*row_id & 0xFFFFFFFF);
let row_data = bincode::serialize(data)?;
let value = crate::storage::lsm::Value::new(row_data, composite_key);
lsm_engine.put(composite_key, value)?;
recovered_count += 1;
}
WALRecord::Update { table_name, row_id, new_data, .. } => {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
table_name.hash(&mut hasher);
let table_hash = hasher.finish() & 0xFFFFFFFF; let composite_key = (table_hash << 32) | (*row_id & 0xFFFFFFFF);
let row_data = bincode::serialize(new_data)?;
let value = crate::storage::lsm::Value::new(row_data, composite_key);
lsm_engine.put(composite_key, value)?;
recovered_count += 1;
}
WALRecord::Delete { table_name, row_id, .. } => {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
table_name.hash(&mut hasher);
let table_hash = hasher.finish() & 0xFFFFFFFF; let composite_key = (table_hash << 32) | (*row_id & 0xFFFFFFFF);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| StorageError::InvalidData(e.to_string()))?
.as_micros() as u64;
lsm_engine.delete(composite_key, timestamp)?;
recovered_count += 1;
}
_ => {}
}
}
}
debug_log!("[database] WAL 恢复完成,恢复了 {} 条记录", recovered_count);
let version_store = Arc::new(VersionStore::new());
let txn_coordinator = Arc::new(TransactionCoordinator::new(version_store.clone()));
let vector_indexes = Self::load_vector_indexes(&db_path)?;
let spatial_indexes = Self::load_spatial_indexes(&db_path)?;
let text_indexes = Self::load_text_indexes(&db_path)?;
let table_registry = Arc::new(TableRegistry::new(&db_path)?);
let index_registry = Arc::new(crate::database::index_metadata::IndexRegistry::new(&db_path));
let _ = index_registry.load();
let row_cache = Arc::new(RowCache::new(10000));
let table_hash_cache = Arc::new(DashMap::new());
{
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
"_default".hash(&mut hasher);
let table_hash = hasher.finish() & 0xFFFFFFFF; table_hash_cache.insert("_default".to_string(), table_hash);
for table_name in table_registry.list_tables()? {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
table_name.hash(&mut hasher);
let table_hash = hasher.finish() & 0xFFFFFFFF; table_hash_cache.insert(table_name, table_hash);
}
}
let db = Self {
path: db_path,
wal,
lsm_engine: lsm_engine.clone(),
timestamp_index,
next_row_id: Arc::new(RwLock::new(max_row_id + 1)),
num_partitions,
txn_coordinator,
version_store,
pending_updates: Arc::new(RwLock::new(0)),
pending_spatial_updates: Arc::new(RwLock::new(0)),
vector_indexes: Arc::new(Self::hashmap_to_dashmap(vector_indexes)),
spatial_indexes: Arc::new(Self::hashmap_to_dashmap(spatial_indexes)),
text_indexes: Arc::new(Self::hashmap_to_dashmap(text_indexes)),
column_indexes: Arc::new(DashMap::new()), table_registry,
index_registry, row_cache,
table_hash_cache, index_update_strategy: crate::config::IndexUpdateStrategy::default(), query_timeout_secs: None, is_flushing: Arc::new(AtomicBool::new(false)), };
let db_clone = db.clone_for_callback();
lsm_engine.set_flush_callback(move |memtable| {
db_clone.batch_build_indexes_from_flush(memtable)
})?;
Ok(db)
}
/// Moves every entry of `map` into a freshly created `DashMap`.
fn hashmap_to_dashmap<K: std::hash::Hash + Eq, V>(map: HashMap<K, V>) -> DashMap<K, V> {
    let converted = DashMap::new();
    map.into_iter().for_each(|(key, value)| {
        converted.insert(key, value);
    });
    converted
}
/// Discovers the vector indexes persisted under `<db_path>/indexes`.
///
/// Every directory entry whose name starts with `vector_` is passed to
/// `DiskANNIndex::load` with `VamanaConfig::default()`; the rest of the entry
/// name becomes the map key. Entries with non-UTF-8 names are skipped.
///
/// Loading is best-effort: a missing or unreadable `indexes` directory yields
/// an empty map, and an index that fails to load is reported on stderr and
/// left out rather than aborting the whole open.
///
/// # Errors
/// Never returns `Err` at present; the `Result` is kept for the callers' `?`.
fn load_vector_indexes(db_path: &Path) -> Result<HashMap<String, Arc<RwLock<DiskANNIndex>>>> {
    let mut indexes = HashMap::new();
    let indexes_dir = db_path.join("indexes");

    let entries = match std::fs::read_dir(&indexes_dir) {
        Ok(entries) => entries,
        // Nothing has been persisted yet, or the directory cannot be read.
        Err(_) => return Ok(indexes),
    };

    for entry in entries.flatten() {
        let name = match entry.file_name().into_string() {
            Ok(name) => name,
            Err(_) => continue,
        };
        let index_name = match name.strip_prefix("vector_") {
            Some(rest) => rest,
            None => continue,
        };

        let index_path = entry.path();
        match DiskANNIndex::load(&index_path, VamanaConfig::default()) {
            Ok(index) => {
                indexes.insert(index_name.to_string(), Arc::new(RwLock::new(index)));
                println!("[MoteDB] Loaded vector index: {}", index_name);
            }
            // Surface the failure instead of silently dropping the index.
            Err(err) => {
                eprintln!("[MoteDB] Failed to load vector index {}: {:?}", index_name, err);
            }
        }
    }

    Ok(indexes)
}
/// Discovers the spatial indexes persisted under `<db_path>/indexes`.
///
/// Every directory entry whose name starts with `spatial_` is passed to
/// `SpatialHybridIndex::load`; the rest of the entry name becomes the map key.
/// The load configuration uses the bounding box `(0, 0, 1000, 1000)` with
/// mmap enabled on the entry's own path. Entries with non-UTF-8 names are
/// skipped.
///
/// Loading is best-effort: a missing or unreadable `indexes` directory yields
/// an empty map, and an index that fails to load is reported on stderr and
/// left out rather than aborting the whole open.
///
/// # Errors
/// Never returns `Err` at present; the `Result` is kept for the callers' `?`.
fn load_spatial_indexes(db_path: &Path) -> Result<HashMap<String, Arc<RwLock<SpatialHybridIndex>>>> {
    let mut indexes = HashMap::new();
    let indexes_dir = db_path.join("indexes");

    let entries = match std::fs::read_dir(&indexes_dir) {
        Ok(entries) => entries,
        // Nothing has been persisted yet, or the directory cannot be read.
        Err(_) => return Ok(indexes),
    };

    for entry in entries.flatten() {
        let name = match entry.file_name().into_string() {
            Ok(name) => name,
            Err(_) => continue,
        };
        let index_name = match name.strip_prefix("spatial_") {
            Some(rest) => rest,
            None => continue,
        };

        let index_path = entry.path();
        let default_config = SpatialHybridConfig::new(
            BoundingBoxF32::new(0.0, 0.0, 1000.0, 1000.0)
        ).with_mmap(true, Some(index_path.clone()));
        match SpatialHybridIndex::load(&index_path, default_config) {
            Ok(index) => {
                indexes.insert(index_name.to_string(), Arc::new(RwLock::new(index)));
                println!("[MoteDB] Loaded spatial index: {}", index_name);
            }
            // Surface the failure instead of silently dropping the index.
            Err(err) => {
                eprintln!("[MoteDB] Failed to load spatial index {}: {:?}", index_name, err);
            }
        }
    }

    Ok(indexes)
}
/// Discovers the full-text indexes persisted under `<db_path>/indexes`.
///
/// Every directory entry whose name starts with `text_` is passed to
/// `TextFTSIndex::new`; the rest of the entry name becomes the map key.
/// Entries with non-UTF-8 names are skipped.
///
/// Loading is best-effort: a missing or unreadable `indexes` directory yields
/// an empty map, and an index that fails to open is reported on stderr and
/// left out rather than aborting the whole open.
///
/// # Errors
/// Never returns `Err` at present; the `Result` is kept for the callers' `?`.
fn load_text_indexes(db_path: &Path) -> Result<HashMap<String, Arc<RwLock<TextFTSIndex>>>> {
    let mut indexes = HashMap::new();
    let indexes_dir = db_path.join("indexes");

    let entries = match std::fs::read_dir(&indexes_dir) {
        Ok(entries) => entries,
        // Nothing has been persisted yet, or the directory cannot be read.
        Err(_) => return Ok(indexes),
    };

    for entry in entries.flatten() {
        let name = match entry.file_name().into_string() {
            Ok(name) => name,
            Err(_) => continue,
        };
        let index_name = match name.strip_prefix("text_") {
            Some(rest) => rest,
            None => continue,
        };

        let index_path = entry.path();
        match TextFTSIndex::new(index_path) {
            Ok(index) => {
                indexes.insert(index_name.to_string(), Arc::new(RwLock::new(index)));
                println!("[MoteDB] Loaded text index: {}", index_name);
            }
            // Surface the failure instead of silently dropping the index.
            Err(err) => {
                eprintln!("[MoteDB] Failed to load text index {}: {:?}", index_name, err);
            }
        }
    }

    Ok(indexes)
}
}