Skip to main content

reddb_server/storage/unified/
store.rs

1//! Unified Store
2//!
3//! High-level API for the unified storage layer that combines tables, graphs,
4//! and vectors into a single coherent interface.
5//!
6//! # Features
7//!
8//! - Multi-collection management
9//! - Cross-collection queries
10//! - Unified entity access
11//! - Automatic ID generation
12//! - Cross-reference management
13//! - **Binary persistence** with pages, indices, and efficient encoding
14//! - **Page-based storage** via Pager for ACID durability
15//!
16//! # Persistence Modes
17//!
18//! 1. **File Mode** (`save_to_file`/`load_from_file`): Simple binary dump
19//! 2. **Paged Mode** (`open`/`persist`): Full page-based storage with B-tree indices
20
21use std::collections::{BTreeMap, HashMap};
22use std::fs::File;
23use std::io::{BufReader, BufWriter, Read, Write};
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
26use std::sync::Arc;
27
28use parking_lot::RwLock;
29
30use super::context_index::ContextIndex;
31use super::entity::{
32    CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
33    GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
34};
35use super::entity_cache::EntityCache;
36use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
37use super::metadata::{Metadata, MetadataFilter, MetadataValue};
38use super::segment::SegmentError;
39use crate::api::{DurabilityMode, GroupCommitOptions};
40use crate::physical::{ManifestEvent, ManifestEventKind};
41use crate::storage::engine::pager::PagerError;
42use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
43use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
44use crate::storage::schema::types::Value;
45
// ---------------------------------------------------------------------------
// Format identifiers
//
// NOTE(review): these magic tags and version numbers look like markers that
// are written to / compared against persisted bytes; if so, changing an
// existing value would break reading data written earlier — add a new
// constant instead of editing one. Confirm against the encoders/decoders in
// the `impl_*` submodules.
// ---------------------------------------------------------------------------

/// 4-byte magic tag identifying the store's serialized format.
/// NOTE(review): presumably the header of the file-mode dump
/// (`save_to_file`/`load_from_file`) — confirm in `impl_file`.
const STORE_MAGIC: &[u8; 4] = b"RDST";
// Store format versions, in increasing order. Only V7–V9 carry notes on what
// they changed; NOTE(review): document the V1–V6 deltas if they are known.
const STORE_VERSION_V1: u32 = 1;
const STORE_VERSION_V2: u32 = 2;
const STORE_VERSION_V3: u32 = 3;
const STORE_VERSION_V4: u32 = 4;
const STORE_VERSION_V5: u32 = 5;
const STORE_VERSION_V6: u32 = 6;
const STORE_VERSION_V7: u32 = 7; // entity records include metadata (serialize_entity_record format)
const STORE_VERSION_V8: u32 = 8; // table rows may carry explicit logical identity
const STORE_VERSION_V9: u32 = 9; // entity records persist MVCC xmin/xmax
// 4-byte tags for the individual native sections; the suffix of each name
// says which section the tag belongs to.
const METADATA_MAGIC: &[u8; 4] = b"RDM2";
const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";
/// Upper bound on sampled manifest entries.
/// NOTE(review): presumably caps `NativeManifestSummary::recent_events`, with
/// the remainder reported via `omitted_event_count` — confirm.
const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;
66
/// Summary of one manifest event, as listed in
/// `NativeManifestSummary::recent_events`.
#[derive(Debug, Clone)]
pub struct NativeManifestEntrySummary {
    pub collection: String,
    pub object_key: String,
    /// Event kind rendered as a string.
    /// NOTE(review): presumably derived from `ManifestEventKind` — confirm.
    pub kind: String,
    pub block_index: u64,
    pub block_checksum: u128,
    pub snapshot_min: u64,
    /// Upper snapshot bound; NOTE(review): `None` presumably means the entry
    /// is still live (open-ended range) — confirm.
    pub snapshot_max: Option<u64>,
}
77
/// Summary of the native manifest section.
///
/// `recent_events` holds a sample of events. `events_complete` and
/// `omitted_event_count` report whether (and how many) events were left out.
/// NOTE(review): the sample size is presumably bounded by
/// `NATIVE_MANIFEST_SAMPLE_LIMIT` — confirm.
#[derive(Debug, Clone)]
pub struct NativeManifestSummary {
    pub sequence: u64,
    pub event_count: u32,
    pub events_complete: bool,
    pub omitted_event_count: u32,
    pub recent_events: Vec<NativeManifestEntrySummary>,
}
86
/// Registry entry describing one index, as listed in
/// `NativeRegistrySummary::indexes`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryIndexSummary {
    pub name: String,
    pub kind: String,
    /// Owning collection; NOTE(review): `None` presumably for indexes not
    /// scoped to a single collection — confirm.
    pub collection: Option<String>,
    pub enabled: bool,
    pub entries: u64,
    pub estimated_memory_bytes: u64,
    /// Last refresh time; `None` when never recorded.
    /// NOTE(review): the `_ms` suffix suggests milliseconds — confirm the epoch.
    pub last_refresh_ms: Option<u128>,
    pub backend: String,
}
98
/// Registry entry describing one graph projection, as listed in
/// `NativeRegistrySummary::graph_projections`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryProjectionSummary {
    pub name: String,
    pub source: String,
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
    pub node_labels: Vec<String>,
    pub node_types: Vec<String>,
    pub edge_labels: Vec<String>,
    /// NOTE(review): `None` presumably means the projection has never been
    /// materialized — confirm.
    pub last_materialized_sequence: Option<u64>,
}
110
/// Registry entry describing one analytics job, as listed in
/// `NativeRegistrySummary::analytics_jobs`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryJobSummary {
    pub id: String,
    pub kind: String,
    /// Graph projection the job runs against, if any.
    pub projection: Option<String>,
    pub state: String,
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
    /// NOTE(review): `None` presumably means the job has not run yet — confirm.
    pub last_run_sequence: Option<u64>,
    /// Free-form string key/value pairs attached to the job.
    pub metadata: BTreeMap<String, String>,
}
122
/// Registry entry describing one vector artifact of a collection, as listed in
/// `NativeRegistrySummary::vector_artifacts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactSummary {
    pub collection: String,
    pub artifact_kind: String,
    pub vector_count: u64,
    pub dimension: u32,
    /// NOTE(review): presumably the top layer of a layered (graph) vector
    /// index — confirm against the artifact encoder.
    pub max_layer: u32,
    pub serialized_bytes: u64,
    pub checksum: u64,
}
133
/// Page-level location of a vector artifact's bytes.
///
/// NOTE(review): presumably the artifact occupies `page_count` pages starting
/// at (or reachable from) `root_page`, holding `byte_len` bytes — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactPageSummary {
    pub collection: String,
    pub artifact_kind: String,
    pub root_page: u32,
    pub page_count: u32,
    pub byte_len: u64,
    pub checksum: u64,
}
143
/// Summary of the native registry section.
///
/// It covers five categories: collections, indexes, graph projections,
/// analytics jobs and vector artifacts. Each category has the same four parts:
/// a `*_count` total, a `*_complete` flag, an `omitted_*_count`, and a list of
/// entries.
/// NOTE(review): presumably the lists may be truncated, in which case
/// `*_complete` is false and `omitted_*_count` says how many entries were
/// dropped — confirm against the encoder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistrySummary {
    pub collection_count: u32,
    pub index_count: u32,
    pub graph_projection_count: u32,
    pub analytics_job_count: u32,
    pub vector_artifact_count: u32,
    pub collections_complete: bool,
    pub indexes_complete: bool,
    pub graph_projections_complete: bool,
    pub analytics_jobs_complete: bool,
    pub vector_artifacts_complete: bool,
    pub omitted_collection_count: u32,
    pub omitted_index_count: u32,
    pub omitted_graph_projection_count: u32,
    pub omitted_analytics_job_count: u32,
    pub omitted_vector_artifact_count: u32,
    pub collection_names: Vec<String>,
    pub indexes: Vec<NativeRegistryIndexSummary>,
    pub graph_projections: Vec<NativeRegistryProjectionSummary>,
    pub analytics_jobs: Vec<NativeRegistryJobSummary>,
    pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
}
167
/// Recovery-section entry describing one snapshot, as listed in
/// `NativeRecoverySummary::snapshots`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeSnapshotSummary {
    pub snapshot_id: u64,
    pub created_at_unix_ms: u128,
    pub superblock_sequence: u64,
    pub collection_count: u32,
    pub total_entities: u64,
}
176
/// Recovery-section entry describing one named export, as listed in
/// `NativeRecoverySummary::exports`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeExportSummary {
    pub name: String,
    pub created_at_unix_ms: u128,
    /// NOTE(review): `None` presumably when the export is not tied to a
    /// snapshot — confirm.
    pub snapshot_id: Option<u64>,
    pub superblock_sequence: u64,
    pub collection_count: u32,
    pub total_entities: u64,
}
186
/// Summary of the native recovery section (snapshots and exports).
///
/// Each category has a count, a `*_complete` flag, an `omitted_*_count`, and
/// the listed entries.
/// NOTE(review): presumably the lists may be truncated, with the omitted
/// counts reporting how many were dropped — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRecoverySummary {
    pub snapshot_count: u32,
    pub export_count: u32,
    pub snapshots_complete: bool,
    pub exports_complete: bool,
    pub omitted_snapshot_count: u32,
    pub omitted_export_count: u32,
    pub snapshots: Vec<NativeSnapshotSummary>,
    pub exports: Vec<NativeExportSummary>,
}
198
/// Catalog entry for one collection, as listed in
/// `NativeCatalogSummary::collections`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogCollectionSummary {
    pub name: String,
    pub entities: u64,
    pub cross_refs: u64,
    pub segments: u32,
}
206
/// Summary of the native catalog section.
///
/// NOTE(review): `collections` may presumably be truncated, with
/// `collections_complete` false and `omitted_collection_count` giving how many
/// were dropped — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogSummary {
    pub collection_count: u32,
    pub total_entities: u64,
    pub collections_complete: bool,
    pub omitted_collection_count: u32,
    pub collections: Vec<NativeCatalogCollectionSummary>,
}
215
/// Counters describing the outcome of an MVCC vacuum.
///
/// Two values can be merged with [`MvccVacuumStats::add`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MvccVacuumStats {
    pub scanned_versions: u64,
    pub retained_versions: u64,
    pub reclaimed_versions: u64,
    pub retained_history_versions: u64,
    pub reclaimed_history_versions: u64,
    pub retained_tombstones: u64,
    pub reclaimed_tombstones: u64,
}

impl MvccVacuumStats {
    /// Accumulates every counter of `other` into `self`, field by field.
    ///
    /// `other` is destructured exhaustively, so adding a counter to the struct
    /// without updating this method is a compile error rather than a silently
    /// skipped field.
    pub fn add(&mut self, other: &Self) {
        let Self {
            scanned_versions,
            retained_versions,
            reclaimed_versions,
            retained_history_versions,
            reclaimed_history_versions,
            retained_tombstones,
            reclaimed_tombstones,
        } = other;

        self.scanned_versions += *scanned_versions;
        self.retained_versions += *retained_versions;
        self.reclaimed_versions += *reclaimed_versions;
        self.retained_history_versions += *retained_history_versions;
        self.reclaimed_history_versions += *reclaimed_history_versions;
        self.retained_tombstones += *retained_tombstones;
        self.reclaimed_tombstones += *reclaimed_tombstones;
    }
}
238
/// Summary of the native metadata-state section.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeMetadataStateSummary {
    pub protocol_version: String,
    pub generated_at_unix_ms: u128,
    /// NOTE(review): presumably the source the state was last loaded from;
    /// `None` when not recorded — confirm.
    pub last_loaded_from: Option<String>,
    /// `None` when no heal time is recorded.
    pub last_healed_at_unix_ms: Option<u128>,
}
246
/// Physical file state: the file header plus a summary of each native section.
///
/// NOTE(review): each `Option` section is presumably `None` when that section
/// is absent (or could not be decoded) — confirm against the loader.
#[derive(Debug, Clone)]
pub struct NativePhysicalState {
    pub header: PhysicalFileHeader,
    /// Collection name → root value.
    /// NOTE(review): presumably a page id/offset of the collection's root — confirm.
    pub collection_roots: BTreeMap<String, u64>,
    pub manifest: Option<NativeManifestSummary>,
    pub registry: Option<NativeRegistrySummary>,
    pub recovery: Option<NativeRecoverySummary>,
    pub catalog: Option<NativeCatalogSummary>,
    pub metadata_state: Option<NativeMetadataStateSummary>,
    pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
}
258
259// ============================================================================
260// Configuration
261// ============================================================================
262
/// Configuration for UnifiedStore
///
/// Field defaults are given by the `Default` impl; the `with_*` builder
/// methods override individual fields.
#[derive(Debug, Clone)]
pub struct UnifiedStoreConfig {
    /// Configuration for segment managers
    pub manager_config: ManagerConfig,
    /// Automatically index cross-references on insert (default: `true`).
    pub auto_index_refs: bool,
    /// Automatically build a HASH index on a user `id` column the first
    /// time a row carrying that column is inserted into a collection.
    /// Mirrors PostgreSQL's implicit primary-key index and Mongo's `_id`
    /// default index — without it, `WHERE id = N` falls through to a
    /// full segment scan because RedDB has no concept of an automatic
    /// primary-key index on user-declared columns. See
    /// `docs/perf/delete-sequential-2026-05-06.md` for the perf rationale.
    /// Defaults to `true`; set to `false` to opt out per workload.
    pub auto_index_id: bool,
    /// Maximum cross-references per entity (default: `1000`).
    pub max_cross_refs: usize,
    /// Enable write-ahead logging (default: `false`; see `with_wal`).
    pub enable_wal: bool,
    /// Durability profile for paged writes (default:
    /// `DurabilityMode::WalDurableGrouped`).
    pub durability_mode: DurabilityMode,
    /// Group-commit batching knobs when using grouped durability.
    pub group_commit: GroupCommitOptions,
    /// Data directory path (default: `None`).
    pub data_dir: Option<std::path::PathBuf>,
}
290
291impl Default for UnifiedStoreConfig {
292    fn default() -> Self {
293        Self {
294            manager_config: ManagerConfig::default(),
295            auto_index_refs: true,
296            auto_index_id: true,
297            max_cross_refs: 1000,
298            enable_wal: false,
299            // Mirrors `RedDBOptions::default().durability_mode` — see
300            // `src/api.rs` for the rationale.
301            durability_mode: DurabilityMode::WalDurableGrouped,
302            group_commit: GroupCommitOptions::default(),
303            data_dir: None,
304        }
305    }
306}
307
308impl UnifiedStoreConfig {
309    /// Create config with data directory
310    pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
311        self.data_dir = Some(path.into());
312        self
313    }
314
315    /// Enable WAL
316    pub fn with_wal(mut self) -> Self {
317        self.enable_wal = true;
318        self
319    }
320
321    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
322        self.durability_mode = mode;
323        self
324    }
325
326    pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
327        self.group_commit = options;
328        self
329    }
330
331    /// Set max cross-references
332    pub fn with_max_refs(mut self, max: usize) -> Self {
333        self.max_cross_refs = max;
334        self
335    }
336}
337
338// ============================================================================
339// Error Types
340// ============================================================================
341
/// Errors from UnifiedStore operations
///
/// `Segment` and `Io` can be produced with `?` through the `From` impls for
/// `SegmentError` and `std::io::Error`.
#[derive(Debug)]
pub enum StoreError {
    /// Collection already exists (payload: collection name)
    CollectionExists(String),
    /// Collection not found (payload: collection name)
    CollectionNotFound(String),
    /// Entity not found
    EntityNotFound(EntityId),
    /// Too many cross-references (compare `UnifiedStoreConfig::max_cross_refs`)
    TooManyRefs(EntityId),
    /// Segment error
    Segment(SegmentError),
    /// I/O error
    Io(std::io::Error),
    /// Serialization error (payload: human-readable message)
    Serialization(String),
    /// Internal error (lock poisoning, invariant violation)
    Internal(String),
}
362
363impl std::fmt::Display for StoreError {
364    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365        match self {
366            Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
367            Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
368            Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
369            Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
370            Self::Segment(e) => write!(f, "Segment error: {:?}", e),
371            Self::Io(e) => write!(f, "I/O error: {}", e),
372            Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
373            Self::Internal(msg) => write!(f, "Internal error: {}", msg),
374        }
375    }
376}
377
378impl std::error::Error for StoreError {}
379
380impl From<SegmentError> for StoreError {
381    fn from(e: SegmentError) -> Self {
382        Self::Segment(e)
383    }
384}
385
386impl From<std::io::Error> for StoreError {
387    fn from(e: std::io::Error) -> Self {
388        Self::Io(e)
389    }
390}
391
392// ============================================================================
393// Statistics
394// ============================================================================
395
/// Statistics for UnifiedStore
#[derive(Debug, Clone, Default)]
pub struct StoreStats {
    /// Number of collections
    pub collection_count: usize,
    /// Total entities across all collections
    pub total_entities: usize,
    /// Total memory usage in bytes
    pub total_memory_bytes: usize,
    /// Per-collection statistics
    /// (NOTE(review): presumably keyed by collection name — confirm)
    pub collections: HashMap<String, ManagerStats>,
    /// Total cross-references
    pub cross_ref_count: usize,
}
410
411impl StoreStats {
412    /// Get average entities per collection
413    pub fn avg_entities_per_collection(&self) -> f64 {
414        if self.collection_count == 0 {
415            0.0
416        } else {
417            self.total_entities as f64 / self.collection_count as f64
418        }
419    }
420
421    /// Get memory in MB
422    pub fn memory_mb(&self) -> f64 {
423        self.total_memory_bytes as f64 / (1024.0 * 1024.0)
424    }
425}
426
427// ============================================================================
428// UnifiedStore - The Main API
429// ============================================================================
430
/// Unified storage for tables, graphs, and vectors
///
/// UnifiedStore provides a single coherent interface for all data types:
/// - **Tables**: Row-based data with columns
/// - **Graphs**: Nodes and edges with labels
/// - **Vectors**: Embeddings for similarity search
///
/// # Features
///
/// - Multi-collection management
/// - Cross-collection queries
/// - Cross-reference tracking between entities
/// - Automatic ID generation
/// - Segment-based storage with growing/sealed lifecycle
///
/// # Example
///
/// ```ignore
/// use reddb::storage::{Entity, Store};
///
/// let store = Store::new();
///
/// // Create a collection
/// store.create_collection("hosts")?;
///
/// // Insert an entity
/// let entity = Entity::table_row(1, "hosts", 1, vec![]);
/// let id = store.insert("hosts", entity)?;
///
/// // Query
/// let found = store.get("hosts", id);
/// ```
pub struct UnifiedStore {
    /// Store configuration
    config: UnifiedStoreConfig,
    /// File format version for serialization
    /// (held in an atomic, so it can change through a shared `&self`).
    format_version: AtomicU32,
    /// Global entity ID counter
    next_entity_id: AtomicU64,
    /// Collections by name
    collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
    /// Forward cross-references: source_id → [(target_id, ref_type, target_collection)]
    cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Reverse cross-references: target_id → [(source_id, ref_type, source_collection)]
    reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Optional page-based storage via Pager
    /// (NOTE(review): presumably `None` outside paged mode — confirm).
    pager: Option<Arc<Pager>>,
    /// Database file path (for paged mode)
    db_path: Option<PathBuf>,
    /// B-tree indices for O(log n) entity lookups by ID (per collection).
    /// Stored as `Arc<BTree>` so hot-path callers can clone the handle out
    /// under a read lock and release the map-level lock before doing the
    /// actual insert — previously the outer RwLock was held for the whole
    /// btree mutation, serialising every concurrent insert across every
    /// collection into one global write lock.
    btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
    /// Cross-structure context index for unified search
    context_index: ContextIndex,
    /// Hot entity cache — sharded bounded LRU for `get_any` lookups.
    /// See `entity_cache.rs` for the rationale; this replaced a single
    /// `RwLock<HashMap>` that serialised every `delete_batch` invalidation.
    entity_cache: EntityCache,
    /// Graph node label index: (collection, label) → Vec<EntityId>.
    /// O(1) lookup for MATCH (n:Label) graph patterns — avoids full collection scan.
    graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
    /// Whether the paged registry on page 1 must be rewritten before the next flush.
    paged_registry_dirty: AtomicBool,
    /// Logical store WAL / grouped durability coordinator for paged mode.
    commit: Option<Arc<StoreCommitCoordinator>>,
    /// Counts how often `unindex_cross_refs_batch` took the read-only fast
    /// path (no inbound refs, no outbound refs for any deleted id) and so
    /// avoided acquiring the `cross_refs` / `reverse_refs` write locks.
    /// Used by tests to pin the early-exit; cheap relaxed counter otherwise.
    unindex_cross_refs_fast_path: AtomicU64,
}
506
507mod builder;
508mod commit;
509mod impl_entities;
510mod impl_file;
511mod impl_native_a;
512mod impl_native_b;
513mod impl_native_c;
514mod impl_pages;
515mod native_helpers;
516
517pub use self::builder::EntityBuilder;
518pub(crate) use self::commit::DeferredStoreWalActions;
519use self::commit::{StoreCommitCoordinator, StoreWalAction};
520use self::native_helpers::*;