use sparrowdb_catalog::catalog::{Catalog, LabelId};
use sparrowdb_common::{EdgeId, NodeId};
use sparrowdb_storage::csr::CsrForward;
use sparrowdb_storage::edge_store::RelTableId;
use sparrowdb_storage::node_store::Value;
use sparrowdb_storage::wal::writer::WalWriter;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::{Arc, Mutex, RwLock};
pub(crate) type EdgePropsCache = Arc<RwLock<HashMap<u32, HashMap<(u64, u64), Vec<(u32, u64)>>>>>;
pub(crate) type VersionKey = (u64, u32);
#[derive(Clone)]
pub(crate) struct Version {
pub committed_at: u64,
pub value: Value,
}
#[derive(Default)]
pub(crate) struct VersionStore {
pub map: HashMap<VersionKey, Vec<Version>>,
}
impl VersionStore {
pub fn insert(&mut self, node_id: NodeId, col_id: u32, committed_at: u64, value: Value) {
let versions = self.map.entry((node_id.0, col_id)).or_default();
versions.push(Version {
committed_at,
value,
});
versions.sort_by_key(|v| v.committed_at);
}
pub fn get_at(&self, node_id: NodeId, col_id: u32, snapshot_txn_id: u64) -> Option<Value> {
let versions = self.map.get(&(node_id.0, col_id))?;
let idx = versions.partition_point(|v| v.committed_at <= snapshot_txn_id);
if idx == 0 {
None
} else {
Some(versions[idx - 1].value.clone())
}
}
pub fn gc(&mut self, min_active_txn_id: u64) -> usize {
let mut pruned = 0usize;
self.map.retain(|_, versions| {
let cutoff = versions.partition_point(|v| v.committed_at < min_active_txn_id);
if cutoff > 1 {
let keep_from = cutoff - 1;
pruned += keep_from;
versions.drain(..keep_from);
}
!versions.is_empty()
});
pruned
}
}
pub(crate) struct StagedUpdate {
pub before_image: Option<(u64, Value)>, pub new_value: Value,
pub key_name: String,
}
#[derive(Default)]
pub(crate) struct WriteBuffer {
pub updates: HashMap<VersionKey, StagedUpdate>,
}
pub(crate) enum WalMutation {
NodeCreate {
node_id: NodeId,
label_id: u32,
props: Vec<(u32, Value)>,
prop_names: Vec<String>,
},
NodeDelete { node_id: NodeId },
EdgeCreate {
edge_id: EdgeId,
src: NodeId,
dst: NodeId,
rel_type: String,
prop_entries: Vec<(String, Value)>,
},
EdgeDelete {
src: NodeId,
dst: NodeId,
rel_type: String,
},
}
pub(crate) enum PendingOp {
NodeCreate {
label_id: u32,
slot: u32,
props: Vec<(u32, Value)>,
},
NodeDelete { node_id: NodeId },
EdgeCreate {
src: NodeId,
dst: NodeId,
rel_table_id: RelTableId,
props: Vec<(u32, u64)>,
},
EdgeDelete {
src: NodeId,
dst: NodeId,
rel_table_id: RelTableId,
},
}
#[derive(Default)]
pub(crate) struct NodeVersions {
pub map: HashMap<u64, u64>,
}
impl NodeVersions {
pub fn set(&mut self, node_id: NodeId, txn_id: u64) {
self.map.insert(node_id.0, txn_id);
}
pub fn get(&self, node_id: NodeId) -> u64 {
self.map.get(&node_id.0).copied().unwrap_or(0)
}
}
pub(crate) struct DbInner {
pub path: PathBuf,
pub current_txn_id: AtomicU64,
pub write_locked: AtomicBool,
pub versions: RwLock<VersionStore>,
pub node_versions: RwLock<NodeVersions>,
#[allow(dead_code)]
pub encryption_key: Option<[u8; 32]>,
pub unique_constraints: RwLock<HashSet<(u32, u32)>>,
pub prop_index: RwLock<sparrowdb_storage::property_index::PropertyIndex>,
pub catalog: RwLock<Catalog>,
pub csr_map: RwLock<HashMap<u32, CsrForward>>,
pub label_row_counts: RwLock<HashMap<LabelId, usize>>,
pub wal_writer: Mutex<WalWriter>,
pub edge_props_cache: EdgePropsCache,
pub active_readers: Mutex<BTreeMap<u64, usize>>,
pub commits_since_gc: AtomicU64,
}
pub(crate) const GC_COMMIT_INTERVAL: u64 = 100;
impl DbInner {
pub fn invalidate_prop_index(&self) {
self.prop_index
.write()
.expect("prop_index RwLock poisoned")
.clear();
sparrowdb_storage::property_index::PropertyIndex::remove_persisted(&self.path);
}
pub fn persist_prop_index(&self) {
if let Ok(mut guard) = self.prop_index.write() {
guard.persist_if_grew(&self.path);
}
}
}
pub(crate) struct WriteGuard {
pub inner: Arc<DbInner>,
}
impl WriteGuard {
pub fn try_acquire(inner: &Arc<DbInner>) -> Option<Self> {
inner
.write_locked
.compare_exchange(
false,
true,
std::sync::atomic::Ordering::Acquire,
std::sync::atomic::Ordering::Relaxed,
)
.ok()
.map(|_| WriteGuard {
inner: Arc::clone(inner),
})
}
}
impl Drop for WriteGuard {
fn drop(&mut self) {
self.inner
.write_locked
.store(false, std::sync::atomic::Ordering::Release);
}
}