use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use super::context_index::ContextIndex;
use super::entity::{
CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
};
use super::entity_cache::EntityCache;
use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
use super::metadata::{Metadata, MetadataFilter, MetadataValue};
use super::segment::SegmentError;
use crate::api::{DurabilityMode, GroupCommitOptions};
use crate::physical::{ManifestEvent, ManifestEventKind};
use crate::storage::engine::pager::PagerError;
use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
use crate::storage::schema::types::Value;
pub use reddb_file::{
is_supported_store_version, NativeCatalogCollectionSummary, NativeCatalogSummary,
NativeExportSummary, NativeManifestEntrySummary, NativeManifestSummary,
NativeMetadataStateSummary, NativeRecoverySummary, NativeRegistryIndexSummary,
NativeRegistryJobSummary, NativeRegistryProjectionSummary, NativeRegistrySummary,
NativeSnapshotSummary, NativeVectorArtifactPageSummary, NativeVectorArtifactSummary,
ENTITY_RECORD_MAGIC, METADATA_MAGIC, METADATA_OVERFLOW_MAGIC, NATIVE_BLOB_MAGIC,
NATIVE_CATALOG_MAGIC, NATIVE_COLLECTION_ROOTS_MAGIC, NATIVE_MANIFEST_MAGIC,
NATIVE_MANIFEST_SAMPLE_LIMIT, NATIVE_METADATA_STATE_MAGIC, NATIVE_RECOVERY_MAGIC,
NATIVE_REGISTRY_MAGIC, NATIVE_VECTOR_ARTIFACT_MAGIC, STORE_MAGIC, STORE_VERSION_CURRENT,
STORE_VERSION_V1, STORE_VERSION_V2, STORE_VERSION_V3, STORE_VERSION_V4, STORE_VERSION_V5,
STORE_VERSION_V6, STORE_VERSION_V7, STORE_VERSION_V8, STORE_VERSION_V9,
};
/// Counters reported by an MVCC vacuum pass.
///
/// Every field is an independent `u64` tally. [`MvccVacuumStats::add`] sums
/// two instances field by field, so results can be combined.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MvccVacuumStats {
    pub scanned_versions: u64,
    pub retained_versions: u64,
    pub reclaimed_versions: u64,
    pub retained_history_versions: u64,
    pub reclaimed_history_versions: u64,
    pub retained_tombstones: u64,
    pub reclaimed_tombstones: u64,
}

impl MvccVacuumStats {
    /// Adds every counter of `other` into the matching counter of `self`.
    ///
    /// Uses plain `+=`, so with default build settings an overflow panics in
    /// debug builds and wraps in release builds.
    pub fn add(&mut self, other: &Self) {
        // Destructuring without `..` turns a forgotten new field into a
        // compile error instead of a silently dropped counter.
        let Self {
            scanned_versions,
            retained_versions,
            reclaimed_versions,
            retained_history_versions,
            reclaimed_history_versions,
            retained_tombstones,
            reclaimed_tombstones,
        } = other;

        self.scanned_versions += *scanned_versions;
        self.retained_versions += *retained_versions;
        self.reclaimed_versions += *reclaimed_versions;
        self.retained_history_versions += *retained_history_versions;
        self.reclaimed_history_versions += *reclaimed_history_versions;
        self.retained_tombstones += *retained_tombstones;
        self.reclaimed_tombstones += *reclaimed_tombstones;
    }
}
/// Decoded view of the native physical-file state.
///
/// `header` and `collection_roots` are always present. Every other section is
/// an `Option`. NOTE(review): `None` presumably means the section is absent from
/// the file or was not loaded; confirm against the readers in `impl_native_*`.
#[derive(Debug, Clone)]
pub struct NativePhysicalState {
// Physical file header of the store.
pub header: PhysicalFileHeader,
// Collection name -> root value; ordered by name (BTreeMap).
// NOTE(review): the u64 is presumably a root page id — confirm.
pub collection_roots: BTreeMap<String, u64>,
// Optional manifest summary section.
pub manifest: Option<NativeManifestSummary>,
// Optional registry summary section.
pub registry: Option<NativeRegistrySummary>,
// Optional recovery summary section.
pub recovery: Option<NativeRecoverySummary>,
// Optional catalog summary section.
pub catalog: Option<NativeCatalogSummary>,
// Optional metadata-state summary section.
pub metadata_state: Option<NativeMetadataStateSummary>,
// Optional list of vector-artifact page summaries.
pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
}
/// Construction-time configuration for `UnifiedStore`.
///
/// Defaults come from `impl Default for UnifiedStoreConfig`. The `with_*`
/// builder methods adjust individual fields.
#[derive(Debug, Clone)]
pub struct UnifiedStoreConfig {
// Configuration of type `ManagerConfig` (from the segment manager module).
// NOTE(review): presumably applied to each per-collection SegmentManager — confirm.
pub manager_config: ManagerConfig,
// Flag for automatic indexing of cross-references (default `true`).
pub auto_index_refs: bool,
// Flag for automatic id indexing (default `true`).
pub auto_index_id: bool,
// Cap on cross-references (default `1000`; settable via `with_max_refs`).
// NOTE(review): presumably per entity, enforced via StoreError::TooManyRefs — confirm.
pub max_cross_refs: usize,
// Whether the WAL is enabled (default `false`; `with_wal` sets it to `true`).
pub enable_wal: bool,
// Durability mode (default `DurabilityMode::WalDurableGrouped`).
pub durability_mode: DurabilityMode,
// Group-commit tuning (default `GroupCommitOptions::default()`).
pub group_commit: GroupCommitOptions,
// Optional data directory (default `None`; set via `with_data_dir`).
pub data_dir: Option<std::path::PathBuf>,
// Optional explicit WAL path (default `None`; set via `with_embedded_wal_path`).
pub embedded_wal_path: Option<std::path::PathBuf>,
}
impl Default for UnifiedStoreConfig {
    /// Baseline configuration:
    /// - WAL disabled, with `WalDurableGrouped` durability and default group-commit options.
    /// - Automatic id and cross-reference indexing enabled.
    /// - At most 1000 cross-references.
    /// - No data directory or WAL path.
    fn default() -> Self {
        // Built up front, in the same order as before, so that any side
        // effects of the nested `default()` calls keep their ordering.
        let manager_config = ManagerConfig::default();
        let group_commit = GroupCommitOptions::default();

        Self {
            manager_config,
            auto_index_refs: true,
            auto_index_id: true,
            max_cross_refs: 1000,
            enable_wal: false,
            durability_mode: DurabilityMode::WalDurableGrouped,
            group_commit,
            data_dir: None,
            embedded_wal_path: None,
        }
    }
}
/// Consuming builder methods for `UnifiedStoreConfig`.
///
/// Each method takes `self` by value and returns the updated config. The
/// result must be used; otherwise the change is lost, hence `#[must_use]`.
impl UnifiedStoreConfig {
    /// Sets the data directory (`data_dir`).
    #[must_use = "builder methods return the updated config; the original is consumed"]
    pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
        self.data_dir = Some(path.into());
        self
    }

    /// Enables the write-ahead log (`enable_wal = true`).
    #[must_use = "builder methods return the updated config; the original is consumed"]
    pub fn with_wal(mut self) -> Self {
        self.enable_wal = true;
        self
    }

    /// Sets the durability mode.
    #[must_use = "builder methods return the updated config; the original is consumed"]
    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
        self.durability_mode = mode;
        self
    }

    /// Replaces the group-commit options.
    #[must_use = "builder methods return the updated config; the original is consumed"]
    pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
        self.group_commit = options;
        self
    }

    /// Sets an explicit WAL file path (`embedded_wal_path`).
    #[must_use = "builder methods return the updated config; the original is consumed"]
    pub fn with_embedded_wal_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
        self.embedded_wal_path = Some(path.into());
        self
    }

    /// Sets the cross-reference cap (`max_cross_refs`).
    #[must_use = "builder methods return the updated config; the original is consumed"]
    pub fn with_max_refs(mut self, max: usize) -> Self {
        self.max_cross_refs = max;
        self
    }
}
/// Errors returned by `UnifiedStore` operations.
#[derive(Debug)]
pub enum StoreError {
// A collection with the given name already exists.
CollectionExists(String),
// No collection with the given name exists.
CollectionNotFound(String),
// No entity with the given id was found.
EntityNotFound(EntityId),
// The entity has too many cross-references.
// NOTE(review): presumably relative to `max_cross_refs` — confirm.
TooManyRefs(EntityId),
// Error from the segment layer (produced by `From<SegmentError>`).
Segment(SegmentError),
// Underlying I/O failure (produced by `From<std::io::Error>`).
Io(std::io::Error),
// Encoding/decoding failure, with a description.
Serialization(String),
// Catch-all internal failure, with a description.
Internal(String),
}
impl std::fmt::Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
Self::Segment(e) => write!(f, "Segment error: {:?}", e),
Self::Io(e) => write!(f, "I/O error: {}", e),
Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
Self::Internal(msg) => write!(f, "Internal error: {}", msg),
}
}
}
impl std::error::Error for StoreError {}
impl From<SegmentError> for StoreError {
fn from(e: SegmentError) -> Self {
Self::Segment(e)
}
}
impl From<std::io::Error> for StoreError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}
/// Aggregate statistics for a `UnifiedStore`.
#[derive(Debug, Clone, Default)]
pub struct StoreStats {
// Number of collections counted.
pub collection_count: usize,
// Total entity count across collections.
pub total_entities: usize,
// Total memory estimate in bytes (see `memory_mb` for a MiB view).
pub total_memory_bytes: usize,
// Per-collection manager statistics.
// NOTE(review): keys are presumably collection names — confirm.
pub collections: HashMap<String, ManagerStats>,
// Number of cross-references counted.
pub cross_ref_count: usize,
}
impl StoreStats {
    /// Mean number of entities per collection.
    ///
    /// Returns `0.0` when `collection_count` is zero instead of dividing by zero.
    pub fn avg_entities_per_collection(&self) -> f64 {
        match self.collection_count {
            0 => 0.0,
            count => self.total_entities as f64 / count as f64,
        }
    }

    /// `total_memory_bytes` expressed in mebibytes (units of 2^20 bytes).
    pub fn memory_mb(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
        self.total_memory_bytes as f64 / BYTES_PER_MIB
    }
}
/// Multi-collection store combining segment managers, cross-reference maps,
/// B-tree indices, caches and an optional pager/commit coordinator.
///
/// Behaviour is implemented in the child modules declared below
/// (`impl_entities`, `impl_file`, `impl_native_*`, `impl_pages`, ...).
pub struct UnifiedStore {
// Configuration the store was built with.
config: UnifiedStoreConfig,
// NOTE(review): presumably the on-disk store format version (cf. STORE_VERSION_*) — confirm.
format_version: AtomicU32,
// NOTE(review): presumably the counter used to allocate new EntityIds — confirm.
next_entity_id: AtomicU64,
// Collection name -> segment manager, behind a read/write lock.
collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
// Outgoing references keyed by entity: (other entity, ref type, String).
// NOTE(review): the String is presumably a collection name — confirm.
cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
// Same shape as `cross_refs`; presumably the inverse (incoming) direction.
reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
// Optional page-level storage; presumably `None` for purely in-memory stores.
pager: Option<Arc<Pager>>,
// Optional database file path.
db_path: Option<PathBuf>,
// Named B-tree indices.
btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
context_index: ContextIndex,
entity_cache: EntityCache,
// (String, String) key -> entity ids.
// NOTE(review): key is presumably (collection, label) — confirm.
graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
// Dirty flag for the paged registry (presumably: needs to be re-persisted).
paged_registry_dirty: AtomicBool,
// Optional commit coordinator (see the `commit` submodule).
commit: Option<Arc<StoreCommitCoordinator>>,
// NOTE(review): presumably a counter of fast-path cross-ref unindexing — confirm.
unindex_cross_refs_fast_path: AtomicU64,
// String key -> list of (u64, vector) pairs recorded during replay.
// NOTE(review): semantics of key and u64 not visible here — confirm.
pub(crate) replayed_turbo_inserts: parking_lot::Mutex<HashMap<String, Vec<(u64, Vec<f32>)>>>,
}
mod builder;
mod commit;
mod impl_entities;
mod impl_file;
mod impl_native_a;
mod impl_native_b;
mod impl_native_c;
mod impl_pages;
mod native_helpers;
pub use self::builder::EntityBuilder;
pub(crate) use self::commit::DeferredStoreWalActions;
use self::commit::{StoreCommitCoordinator, StoreWalAction};
use self::native_helpers::*;