Skip to main content

reddb_server/storage/unified/
store.rs

1//! Unified Store
2//!
3//! High-level API for the unified storage layer that combines tables, graphs,
4//! and vectors into a single coherent interface.
5//!
6//! # Features
7//!
8//! - Multi-collection management
9//! - Cross-collection queries
10//! - Unified entity access
11//! - Automatic ID generation
12//! - Cross-reference management
13//! - **Binary persistence** with pages, indices, and efficient encoding
14//! - **Page-based storage** via Pager for ACID durability
15//!
16//! # Persistence Modes
17//!
18//! 1. **File Mode** (`save_to_file`/`load_from_file`): Simple binary dump
19//! 2. **Paged Mode** (`open`/`persist`): Full page-based storage with B-tree indices
20
21use std::collections::{BTreeMap, HashMap};
22use std::fs::File;
23use std::io::{BufReader, BufWriter, Read, Write};
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
26use std::sync::Arc;
27
28use parking_lot::RwLock;
29
30use super::context_index::ContextIndex;
31use super::entity::{
32    CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
33    GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
34};
35use super::entity_cache::EntityCache;
36use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
37use super::metadata::{Metadata, MetadataFilter, MetadataValue};
38use super::segment::SegmentError;
39use crate::api::{DurabilityMode, GroupCommitOptions};
40use crate::physical::{ManifestEvent, ManifestEventKind};
41use crate::storage::engine::pager::PagerError;
42use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
43use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
44use crate::storage::schema::types::Value;
45
// On-disk format identifiers.
//
// Every `*_MAGIC` constant is a four-byte ASCII tag and every
// `STORE_VERSION_*` is a plain `u32`. None of them is read in this part of the
// file, so any note below that goes beyond the literal value is a name-based
// assumption to confirm at the use site.

/// Tag `RDST` (presumably the signature of the store's own file — TODO confirm).
const STORE_MAGIC: &[u8; 4] = b"RDST";
/// Store format version 1.
const STORE_VERSION_V1: u32 = 1;
/// Store format version 2.
const STORE_VERSION_V2: u32 = 2;
/// Store format version 3.
const STORE_VERSION_V3: u32 = 3;
/// Store format version 4.
const STORE_VERSION_V4: u32 = 4;
/// Store format version 5.
const STORE_VERSION_V5: u32 = 5;
/// Store format version 6.
const STORE_VERSION_V6: u32 = 6;
/// Store format version 7: entity records include metadata
/// (`serialize_entity_record` format).
const STORE_VERSION_V7: u32 = 7;
/// Tag `RDM2` (presumably marks a metadata section — TODO confirm).
const METADATA_MAGIC: &[u8; 4] = b"RDM2";
/// Tag `RDRT` (presumably the native collection-roots record — TODO confirm).
const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
/// Tag `RDMF` (presumably the native manifest record — TODO confirm).
const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
/// Tag `RDRG` (presumably the native registry record — TODO confirm).
const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
/// Tag `RDRV` (presumably the native recovery record — TODO confirm).
const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
/// Tag `RDCL` (presumably the native catalog record — TODO confirm).
const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
/// Tag `RDMS` (presumably the native metadata-state record — TODO confirm).
const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
/// Tag `RDVA` (presumably a native vector-artifact record — TODO confirm).
const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
/// Tag `RDBL` (presumably a native blob record — TODO confirm).
const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";
/// Cap of 16 (presumably the number of sample entries kept in a native
/// summary — TODO confirm).
const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;
64
/// Plain-data summary of a single manifest entry.
///
/// The type has no behaviour of its own here; whatever fills it in lives
/// outside this part of the file. Field notes are name-based assumptions.
#[derive(Debug, Clone)]
pub struct NativeManifestEntrySummary {
    /// Collection name (presumably the collection the entry belongs to).
    pub collection: String,
    /// Object key as a string.
    pub object_key: String,
    /// Entry kind, carried as free-form text rather than an enum.
    pub kind: String,
    /// 64-bit block index.
    pub block_index: u64,
    /// 128-bit block checksum.
    pub block_checksum: u128,
    /// Lower snapshot bound.
    pub snapshot_min: u64,
    /// Upper snapshot bound; `None` presumably means "open-ended" — TODO confirm.
    pub snapshot_max: Option<u64>,
}
75
76#[derive(Debug, Clone)]
77pub struct NativeManifestSummary {
78    pub sequence: u64,
79    pub event_count: u32,
80    pub events_complete: bool,
81    pub omitted_event_count: u32,
82    pub recent_events: Vec<NativeManifestEntrySummary>,
83}
84
/// Plain-data summary of one index known to the native registry.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryIndexSummary {
    /// Index name.
    pub name: String,
    /// Index kind, carried as text.
    pub kind: String,
    /// Collection name, if any (presumably `None` for indexes not tied to a
    /// single collection — TODO confirm).
    pub collection: Option<String>,
    /// Enabled flag.
    pub enabled: bool,
    /// Entry count.
    pub entries: u64,
    /// Estimated memory footprint in bytes.
    pub estimated_memory_bytes: u64,
    /// Last refresh time in milliseconds, when one is recorded.
    pub last_refresh_ms: Option<u128>,
    /// Backend identifier, carried as text.
    pub backend: String,
}
96
/// Plain-data summary of one graph projection known to the native registry.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryProjectionSummary {
    /// Projection name.
    pub name: String,
    /// Source identifier, carried as text.
    pub source: String,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Last-update time, Unix milliseconds.
    pub updated_at_unix_ms: u128,
    /// Node labels attached to the projection.
    pub node_labels: Vec<String>,
    /// Node types attached to the projection.
    pub node_types: Vec<String>,
    /// Edge labels attached to the projection.
    pub edge_labels: Vec<String>,
    /// Sequence at which the projection was last materialized, if ever.
    pub last_materialized_sequence: Option<u64>,
}
108
/// Plain-data summary of one analytics job known to the native registry.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryJobSummary {
    /// Job identifier.
    pub id: String,
    /// Job kind, carried as text.
    pub kind: String,
    /// Projection name, if the job has one.
    pub projection: Option<String>,
    /// Job state, carried as text rather than an enum.
    pub state: String,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Last-update time, Unix milliseconds.
    pub updated_at_unix_ms: u128,
    /// Sequence of the last run, if any.
    pub last_run_sequence: Option<u64>,
    /// String key/value metadata, kept in key order by the `BTreeMap`.
    pub metadata: BTreeMap<String, String>,
}
120
/// Plain-data summary of one vector artifact.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactSummary {
    /// Collection name.
    pub collection: String,
    /// Artifact kind, carried as text.
    pub artifact_kind: String,
    /// Number of vectors.
    pub vector_count: u64,
    /// Vector dimension.
    pub dimension: u32,
    /// Maximum layer (presumably of a layered index structure — TODO confirm).
    pub max_layer: u32,
    /// Serialized size in bytes.
    pub serialized_bytes: u64,
    /// 64-bit checksum.
    pub checksum: u64,
}
131
/// Plain-data description of where one vector artifact sits in pages.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactPageSummary {
    /// Collection name.
    pub collection: String,
    /// Artifact kind, carried as text.
    pub artifact_kind: String,
    /// Root page number.
    pub root_page: u32,
    /// Number of pages.
    pub page_count: u32,
    /// Length in bytes.
    pub byte_len: u64,
    /// 64-bit checksum.
    pub checksum: u64,
}
141
142#[derive(Debug, Clone, PartialEq, Eq)]
143pub struct NativeRegistrySummary {
144    pub collection_count: u32,
145    pub index_count: u32,
146    pub graph_projection_count: u32,
147    pub analytics_job_count: u32,
148    pub vector_artifact_count: u32,
149    pub collections_complete: bool,
150    pub indexes_complete: bool,
151    pub graph_projections_complete: bool,
152    pub analytics_jobs_complete: bool,
153    pub vector_artifacts_complete: bool,
154    pub omitted_collection_count: u32,
155    pub omitted_index_count: u32,
156    pub omitted_graph_projection_count: u32,
157    pub omitted_analytics_job_count: u32,
158    pub omitted_vector_artifact_count: u32,
159    pub collection_names: Vec<String>,
160    pub indexes: Vec<NativeRegistryIndexSummary>,
161    pub graph_projections: Vec<NativeRegistryProjectionSummary>,
162    pub analytics_jobs: Vec<NativeRegistryJobSummary>,
163    pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
164}
165
/// Plain-data summary of one snapshot.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeSnapshotSummary {
    /// Snapshot identifier.
    pub snapshot_id: u64,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Superblock sequence recorded with the snapshot.
    pub superblock_sequence: u64,
    /// Number of collections.
    pub collection_count: u32,
    /// Total number of entities.
    pub total_entities: u64,
}
174
/// Plain-data summary of one export.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeExportSummary {
    /// Export name.
    pub name: String,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Snapshot identifier, when the export has one.
    pub snapshot_id: Option<u64>,
    /// Superblock sequence recorded with the export.
    pub superblock_sequence: u64,
    /// Number of collections.
    pub collection_count: u32,
    /// Total number of entities.
    pub total_entities: u64,
}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct NativeRecoverySummary {
187    pub snapshot_count: u32,
188    pub export_count: u32,
189    pub snapshots_complete: bool,
190    pub exports_complete: bool,
191    pub omitted_snapshot_count: u32,
192    pub omitted_export_count: u32,
193    pub snapshots: Vec<NativeSnapshotSummary>,
194    pub exports: Vec<NativeExportSummary>,
195}
196
/// Plain-data catalog entry for one collection.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogCollectionSummary {
    /// Collection name.
    pub name: String,
    /// Number of entities.
    pub entities: u64,
    /// Number of cross-references.
    pub cross_refs: u64,
    /// Number of segments.
    pub segments: u32,
}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct NativeCatalogSummary {
207    pub collection_count: u32,
208    pub total_entities: u64,
209    pub collections_complete: bool,
210    pub omitted_collection_count: u32,
211    pub collections: Vec<NativeCatalogCollectionSummary>,
212}
213
/// Plain-data summary of the metadata state.
///
/// Comparable with `==`; carries no behaviour in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeMetadataStateSummary {
    /// Protocol version, carried as text.
    pub protocol_version: String,
    /// Generation time, Unix milliseconds.
    pub generated_at_unix_ms: u128,
    /// Where the state was last loaded from, if recorded.
    pub last_loaded_from: Option<String>,
    /// Time of the last heal, Unix milliseconds, if one is recorded.
    pub last_healed_at_unix_ms: Option<u128>,
}
221
222#[derive(Debug, Clone)]
223pub struct NativePhysicalState {
224    pub header: PhysicalFileHeader,
225    pub collection_roots: BTreeMap<String, u64>,
226    pub manifest: Option<NativeManifestSummary>,
227    pub registry: Option<NativeRegistrySummary>,
228    pub recovery: Option<NativeRecoverySummary>,
229    pub catalog: Option<NativeCatalogSummary>,
230    pub metadata_state: Option<NativeMetadataStateSummary>,
231    pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
232}
233
234// ============================================================================
235// Configuration
236// ============================================================================
237
238/// Configuration for UnifiedStore
239#[derive(Debug, Clone)]
240pub struct UnifiedStoreConfig {
241    /// Configuration for segment managers
242    pub manager_config: ManagerConfig,
243    /// Automatically index cross-references on insert
244    pub auto_index_refs: bool,
245    /// Automatically build a HASH index on a user `id` column the first
246    /// time a row carrying that column is inserted into a collection.
247    /// Mirrors PostgreSQL's implicit primary-key index and Mongo's `_id`
248    /// default index — without it, `WHERE id = N` falls through to a
249    /// full segment scan because RedDB has no concept of an automatic
250    /// primary-key index on user-declared columns. See `docs/perf/
251    /// delete-sequential-2026-05-06.md` for the perf rationale.
252    /// Defaults to `true`; set to `false` to opt out per workload.
253    pub auto_index_id: bool,
254    /// Maximum cross-references per entity
255    pub max_cross_refs: usize,
256    /// Enable write-ahead logging
257    pub enable_wal: bool,
258    /// Durability profile for paged writes.
259    pub durability_mode: DurabilityMode,
260    /// Group-commit batching knobs when using grouped durability.
261    pub group_commit: GroupCommitOptions,
262    /// Data directory path
263    pub data_dir: Option<std::path::PathBuf>,
264}
265
266impl Default for UnifiedStoreConfig {
267    fn default() -> Self {
268        Self {
269            manager_config: ManagerConfig::default(),
270            auto_index_refs: true,
271            auto_index_id: true,
272            max_cross_refs: 1000,
273            enable_wal: false,
274            // Mirrors `RedDBOptions::default().durability_mode` — see
275            // `src/api.rs` for the rationale.
276            durability_mode: DurabilityMode::WalDurableGrouped,
277            group_commit: GroupCommitOptions::default(),
278            data_dir: None,
279        }
280    }
281}
282
283impl UnifiedStoreConfig {
284    /// Create config with data directory
285    pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
286        self.data_dir = Some(path.into());
287        self
288    }
289
290    /// Enable WAL
291    pub fn with_wal(mut self) -> Self {
292        self.enable_wal = true;
293        self
294    }
295
296    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
297        self.durability_mode = mode;
298        self
299    }
300
301    pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
302        self.group_commit = options;
303        self
304    }
305
306    /// Set max cross-references
307    pub fn with_max_refs(mut self, max: usize) -> Self {
308        self.max_cross_refs = max;
309        self
310    }
311}
312
313// ============================================================================
314// Error Types
315// ============================================================================
316
317/// Errors from UnifiedStore operations
318#[derive(Debug)]
319pub enum StoreError {
320    /// Collection already exists
321    CollectionExists(String),
322    /// Collection not found
323    CollectionNotFound(String),
324    /// Entity not found
325    EntityNotFound(EntityId),
326    /// Too many cross-references
327    TooManyRefs(EntityId),
328    /// Segment error
329    Segment(SegmentError),
330    /// I/O error
331    Io(std::io::Error),
332    /// Serialization error
333    Serialization(String),
334    /// Internal error (lock poisoning, invariant violation)
335    Internal(String),
336}
337
338impl std::fmt::Display for StoreError {
339    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340        match self {
341            Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
342            Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
343            Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
344            Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
345            Self::Segment(e) => write!(f, "Segment error: {:?}", e),
346            Self::Io(e) => write!(f, "I/O error: {}", e),
347            Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
348            Self::Internal(msg) => write!(f, "Internal error: {}", msg),
349        }
350    }
351}
352
353impl std::error::Error for StoreError {}
354
355impl From<SegmentError> for StoreError {
356    fn from(e: SegmentError) -> Self {
357        Self::Segment(e)
358    }
359}
360
361impl From<std::io::Error> for StoreError {
362    fn from(e: std::io::Error) -> Self {
363        Self::Io(e)
364    }
365}
366
367// ============================================================================
368// Statistics
369// ============================================================================
370
371/// Statistics for UnifiedStore
372#[derive(Debug, Clone, Default)]
373pub struct StoreStats {
374    /// Number of collections
375    pub collection_count: usize,
376    /// Total entities across all collections
377    pub total_entities: usize,
378    /// Total memory usage in bytes
379    pub total_memory_bytes: usize,
380    /// Per-collection statistics
381    pub collections: HashMap<String, ManagerStats>,
382    /// Total cross-references
383    pub cross_ref_count: usize,
384}
385
386impl StoreStats {
387    /// Get average entities per collection
388    pub fn avg_entities_per_collection(&self) -> f64 {
389        if self.collection_count == 0 {
390            0.0
391        } else {
392            self.total_entities as f64 / self.collection_count as f64
393        }
394    }
395
396    /// Get memory in MB
397    pub fn memory_mb(&self) -> f64 {
398        self.total_memory_bytes as f64 / (1024.0 * 1024.0)
399    }
400}
401
402// ============================================================================
403// UnifiedStore - The Main API
404// ============================================================================
405
406/// Unified storage for tables, graphs, and vectors
407///
408/// UnifiedStore provides a single coherent interface for all data types:
409/// - **Tables**: Row-based data with columns
410/// - **Graphs**: Nodes and edges with labels
411/// - **Vectors**: Embeddings for similarity search
412///
413/// # Features
414///
415/// - Multi-collection management
416/// - Cross-collection queries
417/// - Cross-reference tracking between entities
418/// - Automatic ID generation
419/// - Segment-based storage with growing/sealed lifecycle
420///
421/// # Example
422///
423/// ```ignore
424/// use reddb::storage::{Entity, Store};
425///
426/// let store = Store::new();
427///
428/// // Create a collection
429/// store.create_collection("hosts")?;
430///
431/// // Insert an entity
432/// let entity = Entity::table_row(1, "hosts", 1, vec![]);
433/// let id = store.insert("hosts", entity)?;
434///
435/// // Query
436/// let found = store.get("hosts", id);
437/// ```
438pub struct UnifiedStore {
439    /// Store configuration
440    config: UnifiedStoreConfig,
441    /// File format version for serialization
442    format_version: AtomicU32,
443    /// Global entity ID counter
444    next_entity_id: AtomicU64,
445    /// Collections by name
446    collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
447    /// Forward cross-references: source_id → [(target_id, ref_type, target_collection)]
448    cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
449    /// Reverse cross-references: target_id → [(source_id, ref_type, source_collection)]
450    reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
451    /// Optional page-based storage via Pager
452    pager: Option<Arc<Pager>>,
453    /// Database file path (for paged mode)
454    db_path: Option<PathBuf>,
455    /// B-tree indices for O(log n) entity lookups by ID (per collection).
456    /// Stored as `Arc<BTree>` so hot-path callers can clone the handle out
457    /// under a read lock and release the map-level lock before doing the
458    /// actual insert — previously the outer RwLock was held for the whole
459    /// btree mutation, serialising every concurrent insert across every
460    /// collection into one global write lock.
461    btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
462    /// Cross-structure context index for unified search
463    context_index: ContextIndex,
464    /// Hot entity cache — sharded bounded LRU for `get_any` lookups.
465    /// See `entity_cache.rs` for the rationale; this replaced a single
466    /// `RwLock<HashMap>` that serialised every `delete_batch` invalidation.
467    entity_cache: EntityCache,
468    /// Graph node label index: (collection, label) → Vec<EntityId>.
469    /// O(1) lookup for MATCH (n:Label) graph patterns — avoids full collection scan.
470    graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
471    /// Whether the paged registry on page 1 must be rewritten before the next flush.
472    paged_registry_dirty: AtomicBool,
473    /// Logical store WAL / grouped durability coordinator for paged mode.
474    commit: Option<Arc<StoreCommitCoordinator>>,
475    /// Counts how often `unindex_cross_refs_batch` took the read-only fast
476    /// path (no inbound refs, no outbound refs for any deleted id) and so
477    /// avoided acquiring the `cross_refs` / `reverse_refs` write locks.
478    /// Used by tests to pin the early-exit; cheap relaxed counter otherwise.
479    unindex_cross_refs_fast_path: AtomicU64,
480}
481
482mod builder;
483mod commit;
484mod impl_entities;
485mod impl_file;
486mod impl_native_a;
487mod impl_native_b;
488mod impl_native_c;
489mod impl_pages;
490mod native_helpers;
491
492pub use self::builder::EntityBuilder;
493use self::commit::{StoreCommitCoordinator, StoreWalAction};
494use self::native_helpers::*;