use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use parking_lot::{Mutex, RwLock};
use crate::cache::manager::CacheManager;
use crate::constants::*;
use crate::core::pager::FilePager;
use crate::core::snapshot::reader::SnapshotData;
use crate::core::wal::buffer::WalBuffer;
use crate::types::*;
use crate::vector::types::VectorManifest;
mod check;
mod checkpoint;
mod compactor;
mod iter;
mod open;
mod read;
mod recovery;
mod schema;
mod transaction;
mod vector;
mod write;
#[cfg(test)]
mod stress;
pub use compactor::{SingleFileOptimizeOptions, VacuumOptions};
pub use iter::*;
pub use open::{close_single_file, open_single_file, SingleFileOpenOptions, SyncMode};
pub use recovery::replay_wal_record;
/// State record for one transaction on a single-file database.
///
/// `SingleFileDB::current_tx` holds at most one of these at a time
/// (`Mutex<Option<SingleFileTxState>>`).
#[derive(Debug)]
pub struct SingleFileTxState {
/// Identifier of this transaction.
pub txid: TxId,
/// Read-only flag for this transaction.
/// NOTE(review): how it is enforced is not visible in this file.
pub read_only: bool,
/// Snapshot timestamp recorded for the transaction.
/// NOTE(review): clock source and units are not visible here — confirm in the transaction module.
pub snapshot_ts: u64,
/// Optional `DeltaState` carried with the transaction.
/// NOTE(review): presumably a copy of the delta taken at begin, e.g. for rollback — confirm.
pub delta_snapshot: Option<DeltaState>,
}
impl SingleFileTxState {
pub fn new(
txid: TxId,
read_only: bool,
snapshot_ts: u64,
delta_snapshot: Option<DeltaState>,
) -> Self {
Self {
txid,
read_only,
snapshot_ts,
delta_snapshot,
}
}
}
/// Handle to a single-file database.
///
/// Mutable state is wrapped in `parking_lot` locks or atomics, so the methods
/// defined in this file only need `&self`.
pub struct SingleFileDB {
/// Filesystem path associated with this database.
/// NOTE(review): presumably the path it was opened from — confirm in `open`.
pub path: PathBuf,
/// Read-only flag for the database handle.
pub read_only: bool,
/// File pager, behind a mutex.
pub pager: Mutex<FilePager>,
/// Database header (`DbHeaderV1`), behind a read/write lock.
pub header: RwLock<DbHeaderV1>,
/// Write-ahead-log buffer, behind a mutex.
pub wal_buffer: Mutex<WalBuffer>,
/// Snapshot data, if any. `node_exists` / `edge_exists` fall back to it when
/// the delta has no entry for the queried node or edge.
pub snapshot: RwLock<Option<SnapshotData>>,
/// Delta state. `node_exists` / `edge_exists` consult it first for
/// deleted/created nodes and deleted/added edges.
pub delta: RwLock<DeltaState>,
/// Counter behind `alloc_node_id`.
pub(crate) next_node_id: AtomicU64,
/// Counter behind `alloc_label_id`.
pub(crate) next_label_id: AtomicU32,
/// Counter behind `alloc_etype_id`.
pub(crate) next_etype_id: AtomicU32,
/// Counter behind `alloc_propkey_id`.
pub(crate) next_propkey_id: AtomicU32,
/// Counter behind `alloc_tx_id`.
pub(crate) next_tx_id: AtomicU64,
/// State of the current transaction, or `None` when there is none.
pub current_tx: Mutex<Option<SingleFileTxState>>,
/// Label name -> label ID.
pub(crate) label_names: RwLock<HashMap<String, LabelId>>,
/// Label ID -> label name.
pub(crate) label_ids: RwLock<HashMap<LabelId, String>>,
/// Edge-type name -> edge-type ID.
pub(crate) etype_names: RwLock<HashMap<String, ETypeId>>,
/// Edge-type ID -> edge-type name.
pub(crate) etype_ids: RwLock<HashMap<ETypeId, String>>,
/// Property-key name -> property-key ID.
pub(crate) propkey_names: RwLock<HashMap<String, PropKeyId>>,
/// Property-key ID -> property-key name.
pub(crate) propkey_ids: RwLock<HashMap<PropKeyId, String>>,
/// Auto-checkpoint flag.
/// NOTE(review): consumed outside this file (presumably by the `checkpoint` module).
pub(crate) auto_checkpoint: bool,
/// Checkpoint threshold.
/// NOTE(review): unit/meaning of the `f64` is not visible here (possibly a WAL fill ratio) — confirm.
pub(crate) checkpoint_threshold: f64,
/// Background-checkpoint flag.
/// NOTE(review): consumed outside this file.
pub(crate) background_checkpoint: bool,
/// Current checkpoint status.
pub(crate) checkpoint_status: Mutex<CheckpointStatus>,
/// Vector manifests keyed by property-key ID.
pub(crate) vector_stores: RwLock<HashMap<PropKeyId, VectorManifest>>,
/// Optional cache manager. When `None`, `cache_is_enabled` returns `false`,
/// `cache_stats` returns `None`, and the other `cache_*` methods do nothing.
pub cache: RwLock<Option<CacheManager>>,
/// Sync mode (`open::SyncMode`) for this handle.
pub(crate) sync_mode: open::SyncMode,
}
/// Checkpoint status values, stored in `SingleFileDB::checkpoint_status`.
///
/// NOTE(review): the transitions between these states are driven outside this
/// file (presumably by the `checkpoint` module); the variant descriptions below
/// are inferred from the names only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointStatus {
/// Presumably: no checkpoint in progress.
Idle,
/// Presumably: a checkpoint is in progress.
Running,
/// Presumably: a checkpoint is in its final phase.
Completing,
}
impl SingleFileDB {
pub fn alloc_node_id(&self) -> NodeId {
self.next_node_id.fetch_add(1, Ordering::SeqCst)
}
pub fn alloc_label_id(&self) -> LabelId {
self.next_label_id.fetch_add(1, Ordering::SeqCst)
}
pub fn alloc_etype_id(&self) -> ETypeId {
self.next_etype_id.fetch_add(1, Ordering::SeqCst)
}
pub fn alloc_propkey_id(&self) -> PropKeyId {
self.next_propkey_id.fetch_add(1, Ordering::SeqCst)
}
pub fn alloc_tx_id(&self) -> TxId {
self.next_tx_id.fetch_add(1, Ordering::SeqCst)
}
pub fn node_exists(&self, node_id: NodeId) -> bool {
let delta = self.delta.read();
if delta.is_node_deleted(node_id) {
return false;
}
if delta.is_node_created(node_id) {
return true;
}
if let Some(ref snapshot) = *self.snapshot.read() {
return snapshot.has_node(node_id);
}
false
}
pub fn edge_exists(&self, src: NodeId, etype: ETypeId, dst: NodeId) -> bool {
let delta = self.delta.read();
if delta.is_edge_deleted(src, etype, dst) {
return false;
}
if delta.is_edge_added(src, etype, dst) {
return true;
}
if let Some(ref snapshot) = *self.snapshot.read() {
if let (Some(src_phys), Some(dst_phys)) =
(snapshot.get_phys_node(src), snapshot.get_phys_node(dst))
{
return snapshot.has_edge(src_phys, etype, dst_phys);
}
}
false
}
pub fn cache_is_enabled(&self) -> bool {
self
.cache
.read()
.as_ref()
.map(|c| c.is_enabled())
.unwrap_or(false)
}
pub fn cache_invalidate_node(&self, node_id: NodeId) {
if let Some(ref mut cache) = *self.cache.write() {
cache.invalidate_node(node_id);
}
}
pub fn cache_invalidate_edge(&self, src: NodeId, etype: ETypeId, dst: NodeId) {
if let Some(ref mut cache) = *self.cache.write() {
cache.invalidate_edge(src, etype, dst);
}
}
pub fn cache_invalidate_key(&self, key: &str) {
if let Some(ref mut cache) = *self.cache.write() {
cache.invalidate_key(key);
}
}
pub fn cache_clear(&self) {
if let Some(ref mut cache) = *self.cache.write() {
cache.clear();
}
}
pub fn cache_clear_query(&self) {
if let Some(ref mut cache) = *self.cache.write() {
cache.clear_query_cache();
}
}
pub fn cache_clear_key(&self) {
if let Some(ref mut cache) = *self.cache.write() {
cache.clear_key_cache();
}
}
pub fn cache_clear_property(&self) {
if let Some(ref mut cache) = *self.cache.write() {
cache.clear_property_cache();
}
}
pub fn cache_clear_traversal(&self) {
if let Some(ref mut cache) = *self.cache.write() {
cache.clear_traversal_cache();
}
}
pub fn cache_stats(&self) -> Option<CacheStats> {
self.cache.read().as_ref().map(|c| c.get_stats())
}
pub fn cache_reset_stats(&self) {
if let Some(ref mut cache) = *self.cache.write() {
cache.reset_stats();
}
}
}
/// Reports whether `path` carries the extension `kitedb`.
///
/// The comparison is exact and case-sensitive. A path with no extension
/// (including a bare dot-file such as `.kitedb`, which `Path::extension`
/// treats as having none) or with any other extension yields `false`.
pub fn is_single_file_path<P: AsRef<Path>>(path: P) -> bool {
  matches!(path.as_ref().extension(), Some(ext) if ext == "kitedb")
}
/// Returns the `EXT_KITEDB` constant.
///
/// NOTE(review): whether the constant includes a leading dot is not visible in
/// this file — check its definition before concatenating it onto a file name.
pub fn single_file_extension() -> &'static str {
EXT_KITEDB
}