use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use super::context_index::ContextIndex;
use super::entity::{
CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
};
use super::entity_cache::EntityCache;
use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
use super::metadata::{Metadata, MetadataFilter, MetadataValue};
use super::segment::SegmentError;
use crate::api::{DurabilityMode, GroupCommitOptions};
use crate::physical::{ManifestEvent, ManifestEventKind};
use crate::storage::engine::pager::PagerError;
use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
use crate::storage::schema::types::Value;
// On-disk format tags and version numbers for the store. The readers/writers that
// consume them live in the child modules (not visible from this file), so the
// per-constant descriptions below are inferred from names — NOTE(review): confirm.

/// Magic prefix `RDST`; presumably marks a serialized store image.
const STORE_MAGIC: &[u8; 4] = b"RDST";
/// Store format version 1.
const STORE_VERSION_V1: u32 = 1;
/// Store format version 2.
const STORE_VERSION_V2: u32 = 2;
/// Store format version 3.
const STORE_VERSION_V3: u32 = 3;
/// Store format version 4.
const STORE_VERSION_V4: u32 = 4;
/// Store format version 5.
const STORE_VERSION_V5: u32 = 5;
/// Store format version 6.
const STORE_VERSION_V6: u32 = 6;
/// Store format version 7.
const STORE_VERSION_V7: u32 = 7;
/// Store format version 8.
const STORE_VERSION_V8: u32 = 8;
/// Store format version 9.
const STORE_VERSION_V9: u32 = 9;
/// Magic prefix `RDM2`; presumably tags a serialized metadata section.
const METADATA_MAGIC: &[u8; 4] = b"RDM2";
/// Magic prefix `RDRT`; presumably tags the native collection-roots record.
const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
/// Magic prefix `RDMF`; presumably tags the native manifest record.
const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
/// Magic prefix `RDRG`; presumably tags the native registry record.
const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
/// Magic prefix `RDRV`; presumably tags the native recovery record.
const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
/// Magic prefix `RDCL`; presumably tags the native catalog record.
const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
/// Magic prefix `RDMS`; presumably tags the native metadata-state record.
const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
/// Magic prefix `RDVA`; presumably tags a native vector-artifact record.
const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
/// Magic prefix `RDBL`; presumably tags a native blob record.
const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";
/// Sample size limit for manifest summaries; presumably caps how many events end up
/// in `NativeManifestSummary::recent_events` — TODO confirm against the writer.
const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;
/// One manifest event as reported inside a [`NativeManifestSummary`].
///
/// Derives `PartialEq`/`Eq` like the other `Native*Summary` types in this module, so
/// summaries can be compared directly (e.g. in tests or change detection).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeManifestEntrySummary {
    /// Collection the event belongs to.
    pub collection: String,
    /// Key of the object the event refers to.
    pub object_key: String,
    /// Event kind rendered as text (presumably from `ManifestEventKind` — confirm).
    pub kind: String,
    /// Block index the event points at.
    pub block_index: u64,
    /// Checksum recorded for that block.
    pub block_checksum: u128,
    /// Lower snapshot bound for the event.
    pub snapshot_min: u64,
    /// Upper snapshot bound; `None` presumably means open-ended — TODO confirm.
    pub snapshot_max: Option<u64>,
}

/// Summary of the native manifest, as exposed via [`NativePhysicalState::manifest`].
///
/// Derives `PartialEq`/`Eq` for consistency with the sibling summary types; this relies
/// on [`NativeManifestEntrySummary`] deriving them too.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeManifestSummary {
    /// Manifest sequence number.
    pub sequence: u64,
    /// Total number of events reported by the manifest.
    pub event_count: u32,
    /// Presumably `true` when `recent_events` contains every event (nothing omitted).
    pub events_complete: bool,
    /// Presumably the number of events left out of `recent_events`.
    pub omitted_event_count: u32,
    /// Sample of recent events; presumably capped by `NATIVE_MANIFEST_SAMPLE_LIMIT`.
    pub recent_events: Vec<NativeManifestEntrySummary>,
}
/// One index entry of the native registry (element type of
/// [`NativeRegistrySummary::indexes`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryIndexSummary {
    /// Index name.
    pub name: String,
    /// Index kind rendered as text.
    pub kind: String,
    /// Collection the index belongs to, if any (presumably `None` for global indexes).
    pub collection: Option<String>,
    /// Whether the index is enabled.
    pub enabled: bool,
    /// Number of entries in the index.
    pub entries: u64,
    /// Estimated memory footprint in bytes.
    pub estimated_memory_bytes: u64,
    /// Last refresh timestamp in milliseconds; `None` presumably means never refreshed.
    pub last_refresh_ms: Option<u128>,
    /// Backend name rendered as text.
    pub backend: String,
}
/// One graph projection of the native registry (element type of
/// [`NativeRegistrySummary::graph_projections`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryProjectionSummary {
    /// Projection name.
    pub name: String,
    /// Source the projection is built from (presumably a collection name — confirm).
    pub source: String,
    /// Creation time, Unix epoch milliseconds.
    pub created_at_unix_ms: u128,
    /// Last update time, Unix epoch milliseconds.
    pub updated_at_unix_ms: u128,
    /// Node labels included in the projection.
    pub node_labels: Vec<String>,
    /// Node types included in the projection.
    pub node_types: Vec<String>,
    /// Edge labels included in the projection.
    pub edge_labels: Vec<String>,
    /// Sequence at which the projection was last materialized; `None` if never.
    pub last_materialized_sequence: Option<u64>,
}
/// One analytics job of the native registry (element type of
/// [`NativeRegistrySummary::analytics_jobs`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryJobSummary {
    /// Job identifier.
    pub id: String,
    /// Job kind rendered as text.
    pub kind: String,
    /// Name of the projection the job runs against, if any.
    pub projection: Option<String>,
    /// Job state rendered as text.
    pub state: String,
    /// Creation time, Unix epoch milliseconds.
    pub created_at_unix_ms: u128,
    /// Last update time, Unix epoch milliseconds.
    pub updated_at_unix_ms: u128,
    /// Sequence of the last run; `None` presumably means the job has not run yet.
    pub last_run_sequence: Option<u64>,
    /// Free-form string metadata, kept sorted by key (`BTreeMap`).
    pub metadata: BTreeMap<String, String>,
}
/// One vector artifact of the native registry (element type of
/// [`NativeRegistrySummary::vector_artifacts`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind rendered as text.
    pub artifact_kind: String,
    /// Number of vectors in the artifact.
    pub vector_count: u64,
    /// Vector dimensionality.
    pub dimension: u32,
    /// Highest layer (presumably of a layered/HNSW-style index — confirm).
    pub max_layer: u32,
    /// Size of the serialized artifact in bytes.
    pub serialized_bytes: u64,
    /// Checksum of the serialized artifact.
    pub checksum: u64,
}
/// Page-level placement of one vector artifact (element type of
/// [`NativePhysicalState::vector_artifact_pages`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactPageSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind rendered as text.
    pub artifact_kind: String,
    /// First page of the stored artifact.
    pub root_page: u32,
    /// Number of pages the artifact occupies.
    pub page_count: u32,
    /// Length of the stored artifact in bytes.
    pub byte_len: u64,
    /// Checksum of the stored bytes.
    pub checksum: u64,
}
/// Summary of the native registry, as exposed via [`NativePhysicalState::registry`].
///
/// Each item category follows one pattern: a total `*_count`, a `*_complete` flag, an
/// `omitted_*_count`, and a vector of the included items. Presumably the vectors may be
/// truncated samples, with `*_complete == true` meaning nothing was omitted — NOTE(review):
/// confirm against the code that builds this summary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistrySummary {
    /// Total number of collections.
    pub collection_count: u32,
    /// Total number of indexes.
    pub index_count: u32,
    /// Total number of graph projections.
    pub graph_projection_count: u32,
    /// Total number of analytics jobs.
    pub analytics_job_count: u32,
    /// Total number of vector artifacts.
    pub vector_artifact_count: u32,
    /// Whether `collection_names` lists every collection.
    pub collections_complete: bool,
    /// Whether `indexes` lists every index.
    pub indexes_complete: bool,
    /// Whether `graph_projections` lists every projection.
    pub graph_projections_complete: bool,
    /// Whether `analytics_jobs` lists every job.
    pub analytics_jobs_complete: bool,
    /// Whether `vector_artifacts` lists every artifact.
    pub vector_artifacts_complete: bool,
    /// Collections not included in `collection_names`.
    pub omitted_collection_count: u32,
    /// Indexes not included in `indexes`.
    pub omitted_index_count: u32,
    /// Projections not included in `graph_projections`.
    pub omitted_graph_projection_count: u32,
    /// Jobs not included in `analytics_jobs`.
    pub omitted_analytics_job_count: u32,
    /// Artifacts not included in `vector_artifacts`.
    pub omitted_vector_artifact_count: u32,
    /// Names of the included collections.
    pub collection_names: Vec<String>,
    /// Included index summaries.
    pub indexes: Vec<NativeRegistryIndexSummary>,
    /// Included graph projection summaries.
    pub graph_projections: Vec<NativeRegistryProjectionSummary>,
    /// Included analytics job summaries.
    pub analytics_jobs: Vec<NativeRegistryJobSummary>,
    /// Included vector artifact summaries.
    pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
}
/// One snapshot record (element type of [`NativeRecoverySummary::snapshots`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeSnapshotSummary {
    /// Snapshot identifier.
    pub snapshot_id: u64,
    /// Creation time, Unix epoch milliseconds.
    pub created_at_unix_ms: u128,
    /// Superblock sequence captured by the snapshot.
    pub superblock_sequence: u64,
    /// Number of collections in the snapshot.
    pub collection_count: u32,
    /// Total entity count in the snapshot.
    pub total_entities: u64,
}
/// One export record (element type of [`NativeRecoverySummary::exports`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeExportSummary {
    /// Export name.
    pub name: String,
    /// Creation time, Unix epoch milliseconds.
    pub created_at_unix_ms: u128,
    /// Snapshot the export was taken from, if any.
    pub snapshot_id: Option<u64>,
    /// Superblock sequence captured by the export.
    pub superblock_sequence: u64,
    /// Number of collections in the export.
    pub collection_count: u32,
    /// Total entity count in the export.
    pub total_entities: u64,
}
/// Snapshot/export inventory, as exposed via [`NativePhysicalState::recovery`].
///
/// Same count/complete/omitted pattern as [`NativeRegistrySummary`]; presumably the
/// vectors may be truncated samples — NOTE(review): confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRecoverySummary {
    /// Total number of snapshots.
    pub snapshot_count: u32,
    /// Total number of exports.
    pub export_count: u32,
    /// Whether `snapshots` lists every snapshot.
    pub snapshots_complete: bool,
    /// Whether `exports` lists every export.
    pub exports_complete: bool,
    /// Snapshots not included in `snapshots`.
    pub omitted_snapshot_count: u32,
    /// Exports not included in `exports`.
    pub omitted_export_count: u32,
    /// Included snapshot summaries.
    pub snapshots: Vec<NativeSnapshotSummary>,
    /// Included export summaries.
    pub exports: Vec<NativeExportSummary>,
}
/// Per-collection catalog entry (element type of [`NativeCatalogSummary::collections`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogCollectionSummary {
    /// Collection name.
    pub name: String,
    /// Entity count in the collection.
    pub entities: u64,
    /// Cross-reference count attributed to the collection.
    pub cross_refs: u64,
    /// Number of segments in the collection.
    pub segments: u32,
}
/// Catalog overview, as exposed via [`NativePhysicalState::catalog`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogSummary {
    /// Total number of collections.
    pub collection_count: u32,
    /// Total entity count across collections.
    pub total_entities: u64,
    /// Presumably `true` when `collections` lists every collection.
    pub collections_complete: bool,
    /// Presumably the number of collections left out of `collections`.
    pub omitted_collection_count: u32,
    /// Included per-collection entries.
    pub collections: Vec<NativeCatalogCollectionSummary>,
}
/// Counters reported by an MVCC vacuum pass.
///
/// The `retained_*` / `reclaimed_*` pairs presumably split the examined versions by
/// outcome (kept vs. removed) — NOTE(review): confirm against the vacuum routine.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MvccVacuumStats {
    /// Versions examined.
    pub scanned_versions: u64,
    /// Versions kept.
    pub retained_versions: u64,
    /// Versions removed.
    pub reclaimed_versions: u64,
    /// History versions kept.
    pub retained_history_versions: u64,
    /// History versions removed.
    pub reclaimed_history_versions: u64,
    /// Tombstones kept.
    pub retained_tombstones: u64,
    /// Tombstones removed.
    pub reclaimed_tombstones: u64,
}
impl MvccVacuumStats {
    /// Accumulates `other` into `self`, field by field.
    ///
    /// Uses plain `+=`, so an overflowing counter panics in debug builds and wraps in
    /// release builds.
    pub fn add(&mut self, other: &Self) {
        // Exhaustive destructuring: adding a counter to the struct makes this stop
        // compiling until the new counter is accumulated as well.
        let Self {
            scanned_versions,
            retained_versions,
            reclaimed_versions,
            retained_history_versions,
            reclaimed_history_versions,
            retained_tombstones,
            reclaimed_tombstones,
        } = *other;
        self.scanned_versions += scanned_versions;
        self.retained_versions += retained_versions;
        self.reclaimed_versions += reclaimed_versions;
        self.retained_history_versions += retained_history_versions;
        self.reclaimed_history_versions += reclaimed_history_versions;
        self.retained_tombstones += retained_tombstones;
        self.reclaimed_tombstones += reclaimed_tombstones;
    }
}
/// Metadata-state record, as exposed via [`NativePhysicalState::metadata_state`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeMetadataStateSummary {
    /// Protocol version rendered as text.
    pub protocol_version: String,
    /// Generation time, Unix epoch milliseconds.
    pub generated_at_unix_ms: u128,
    /// Where the state was last loaded from, if recorded (format not visible here).
    pub last_loaded_from: Option<String>,
    /// Last heal time, Unix epoch milliseconds; `None` presumably means never healed.
    pub last_healed_at_unix_ms: Option<u128>,
}
/// Aggregated view of the native physical file: header, collection roots, and
/// optional summaries of each native section.
///
/// Each `Option` section is presumably `None` when that section is absent or was not
/// loaded — NOTE(review): confirm against the loader.
#[derive(Debug, Clone)]
pub struct NativePhysicalState {
    /// Physical file header.
    pub header: PhysicalFileHeader,
    /// Root location per collection name (unit of the `u64` not visible here).
    pub collection_roots: BTreeMap<String, u64>,
    /// Manifest summary.
    pub manifest: Option<NativeManifestSummary>,
    /// Registry summary.
    pub registry: Option<NativeRegistrySummary>,
    /// Snapshot/export summary.
    pub recovery: Option<NativeRecoverySummary>,
    /// Catalog summary.
    pub catalog: Option<NativeCatalogSummary>,
    /// Metadata-state summary.
    pub metadata_state: Option<NativeMetadataStateSummary>,
    /// Page placement of vector artifacts.
    pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
}
/// Configuration for [`UnifiedStore`]; build from `Default` and the `with_*` methods.
#[derive(Debug, Clone)]
pub struct UnifiedStoreConfig {
    /// Settings forwarded to each collection's segment manager (presumably).
    pub manager_config: ManagerConfig,
    /// Whether cross-references are indexed automatically.
    pub auto_index_refs: bool,
    /// Whether entity ids are indexed automatically.
    pub auto_index_id: bool,
    /// Cross-reference cap per entity (presumably enforced via `StoreError::TooManyRefs`).
    pub max_cross_refs: usize,
    /// Whether the write-ahead log is enabled.
    pub enable_wal: bool,
    /// WAL durability mode.
    pub durability_mode: DurabilityMode,
    /// Group-commit tuning.
    pub group_commit: GroupCommitOptions,
    /// Data directory; `None` in the default configuration.
    pub data_dir: Option<std::path::PathBuf>,
}
impl Default for UnifiedStoreConfig {
    /// Baseline configuration:
    /// - default manager and group-commit settings;
    /// - automatic ref and id indexing on;
    /// - a cross-reference cap of 1000;
    /// - WAL disabled, with `DurabilityMode::WalDurableGrouped` as the durability mode;
    /// - no data directory.
    fn default() -> Self {
        let manager_config = ManagerConfig::default();
        let group_commit = GroupCommitOptions::default();
        Self {
            manager_config,
            auto_index_refs: true,
            auto_index_id: true,
            max_cross_refs: 1000,
            enable_wal: false,
            durability_mode: DurabilityMode::WalDurableGrouped,
            group_commit,
            data_dir: None,
        }
    }
}
// Builder-style setters. Each consumes `self` and returns the updated config, so they
// are `#[must_use]`: calling one as a statement would silently discard the setting.
impl UnifiedStoreConfig {
    /// Returns the config with `data_dir` set to `path`.
    #[must_use]
    pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
        self.data_dir = Some(path.into());
        self
    }

    /// Returns the config with `enable_wal` set to `true`.
    ///
    /// There is no setter that disables it; assign `enable_wal` directly for that.
    #[must_use]
    pub fn with_wal(mut self) -> Self {
        self.enable_wal = true;
        self
    }

    /// Returns the config with `durability_mode` replaced by `mode`.
    #[must_use]
    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
        self.durability_mode = mode;
        self
    }

    /// Returns the config with `group_commit` replaced by `options`.
    #[must_use]
    pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
        self.group_commit = options;
        self
    }

    /// Returns the config with `max_cross_refs` set to `max`.
    #[must_use]
    pub fn with_max_refs(mut self, max: usize) -> Self {
        self.max_cross_refs = max;
        self
    }
}
/// Error type for [`UnifiedStore`] operations.
#[derive(Debug)]
pub enum StoreError {
    /// A collection with this name already exists.
    CollectionExists(String),
    /// No collection with this name exists.
    CollectionNotFound(String),
    /// No entity with this id was found.
    EntityNotFound(EntityId),
    /// The entity has too many cross-references (presumably beyond `max_cross_refs`).
    TooManyRefs(EntityId),
    /// Wrapped segment-layer error (produced by `From<SegmentError>`).
    Segment(SegmentError),
    /// Wrapped I/O error (produced by `From<std::io::Error>`).
    Io(std::io::Error),
    /// Serialization failure, with a description.
    Serialization(String),
    /// Internal failure, with a description.
    Internal(String),
}
impl std::fmt::Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
Self::Segment(e) => write!(f, "Segment error: {:?}", e),
Self::Io(e) => write!(f, "I/O error: {}", e),
Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
Self::Internal(msg) => write!(f, "Internal error: {}", msg),
}
}
}
impl std::error::Error for StoreError {}
impl From<SegmentError> for StoreError {
fn from(e: SegmentError) -> Self {
Self::Segment(e)
}
}
impl From<std::io::Error> for StoreError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}
/// Store-wide statistics snapshot.
#[derive(Debug, Clone, Default)]
pub struct StoreStats {
    /// Number of collections.
    pub collection_count: usize,
    /// Entity count summed over collections.
    pub total_entities: usize,
    /// Memory usage in bytes (presumably an estimate summed over collections).
    pub total_memory_bytes: usize,
    /// Per-collection segment-manager statistics, keyed by collection name.
    pub collections: HashMap<String, ManagerStats>,
    /// Number of cross-references.
    pub cross_ref_count: usize,
}
impl StoreStats {
    /// Mean number of entities per collection, or `0.0` when there are no collections.
    pub fn avg_entities_per_collection(&self) -> f64 {
        match self.collection_count {
            0 => 0.0,
            count => self.total_entities as f64 / count as f64,
        }
    }

    /// `total_memory_bytes` expressed in mebibytes (bytes / 2^20).
    pub fn memory_mb(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
        self.total_memory_bytes as f64 / BYTES_PER_MIB
    }
}
/// Multi-collection entity store.
///
/// Holds one [`SegmentManager`] per named collection, forward and reverse
/// cross-reference maps, an optional pager-backed file, and several indexes/caches.
/// Methods live in the child modules declared below this struct.
pub struct UnifiedStore {
    /// Configuration the store was created with.
    config: UnifiedStoreConfig,
    /// Current format version (presumably one of the `STORE_VERSION_*` values — confirm).
    format_version: AtomicU32,
    /// Counter for allocating new entity ids (presumably monotonically increasing).
    next_entity_id: AtomicU64,
    /// Collections keyed by name.
    collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
    /// Outgoing references per entity: `(target, ref type, String)`.
    /// NOTE(review): meaning of the `String` element is not visible here — confirm.
    cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Reverse direction of `cross_refs` (presumably keyed by the referenced entity).
    reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Page-file backend; presumably `None` for purely in-memory stores.
    pager: Option<Arc<Pager>>,
    /// Path of the backing database file, if any.
    db_path: Option<PathBuf>,
    /// Named B-tree indices (key meaning — e.g. collection name — to be confirmed).
    btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
    /// Context index.
    context_index: ContextIndex,
    /// Entity cache.
    entity_cache: EntityCache,
    /// Entity ids grouped by a `(String, String)` key — presumably (collection, label).
    graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
    /// Dirty flag for the paged registry (presumably set when it must be rewritten).
    paged_registry_dirty: AtomicBool,
    /// Commit/WAL coordinator; presumably `None` when the WAL is disabled.
    commit: Option<Arc<StoreCommitCoordinator>>,
    /// Counter associated with the cross-ref unindexing fast path (per its name).
    unindex_cross_refs_fast_path: AtomicU64,
}
mod builder;
mod commit;
mod impl_entities;
mod impl_file;
mod impl_native_a;
mod impl_native_b;
mod impl_native_c;
mod impl_pages;
mod native_helpers;
pub use self::builder::EntityBuilder;
pub(crate) use self::commit::DeferredStoreWalActions;
use self::commit::{StoreCommitCoordinator, StoreWalAction};
use self::native_helpers::*;