Skip to main content

reddb_server/storage/unified/
store.rs

//! Unified Store
//!
//! High-level API for the unified storage layer that combines tables, graphs,
//! and vectors into a single coherent interface.
//!
//! # Features
//!
//! - Multi-collection management
//! - Cross-collection queries
//! - Unified entity access
//! - Automatic ID generation
//! - Cross-reference management
//! - **Binary persistence** with pages, indices, and efficient encoding
//! - **Page-based storage** via Pager for ACID durability
//!
//! # Persistence Modes
//!
//! 1. **File Mode** (`save_to_file`/`load_from_file`): Simple binary dump
//! 2. **Paged Mode** (`open`/`persist`): Full page-based storage with B-tree indices

21use std::collections::{BTreeMap, HashMap};
22use std::fs::File;
23use std::io::{BufReader, Read};
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
26use std::sync::Arc;
27
28use parking_lot::RwLock;
29
30use super::context_index::ContextIndex;
31use super::entity::{
32    CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
33    GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
34};
35use super::entity_cache::EntityCache;
36use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
37use super::metadata::{Metadata, MetadataFilter, MetadataValue};
38use super::segment::SegmentError;
39use crate::api::{DurabilityMode, GroupCommitOptions};
40use crate::physical::{ManifestEvent, ManifestEventKind};
41use crate::storage::engine::pager::PagerError;
42use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
43use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
44use crate::storage::schema::types::Value;
45
46pub use reddb_file::{
47    is_supported_store_version, NativeCatalogCollectionSummary, NativeCatalogSummary,
48    NativeExportSummary, NativeManifestEntrySummary, NativeManifestSummary,
49    NativeMetadataStateSummary, NativeRecoverySummary, NativeRegistryIndexSummary,
50    NativeRegistryJobSummary, NativeRegistryProjectionSummary, NativeRegistrySummary,
51    NativeSnapshotSummary, NativeVectorArtifactPageSummary, NativeVectorArtifactSummary,
52    ENTITY_RECORD_MAGIC, METADATA_MAGIC, METADATA_OVERFLOW_MAGIC, NATIVE_BLOB_MAGIC,
53    NATIVE_CATALOG_MAGIC, NATIVE_COLLECTION_ROOTS_MAGIC, NATIVE_MANIFEST_MAGIC,
54    NATIVE_MANIFEST_SAMPLE_LIMIT, NATIVE_METADATA_STATE_MAGIC, NATIVE_RECOVERY_MAGIC,
55    NATIVE_REGISTRY_MAGIC, NATIVE_VECTOR_ARTIFACT_MAGIC, STORE_MAGIC, STORE_VERSION_CURRENT,
56    STORE_VERSION_V1, STORE_VERSION_V2, STORE_VERSION_V3, STORE_VERSION_V4, STORE_VERSION_V5,
57    STORE_VERSION_V6, STORE_VERSION_V7, STORE_VERSION_V8, STORE_VERSION_V9,
58};
59
/// Counters describing the outcome of an MVCC vacuum pass.
///
/// The field names identify what is counted: versions scanned, plus
/// retained/reclaimed counts for live versions, history versions, and
/// tombstones. Results from several passes can be folded together with
/// [`MvccVacuumStats::add`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MvccVacuumStats {
    pub scanned_versions: u64,
    pub retained_versions: u64,
    pub reclaimed_versions: u64,
    pub retained_history_versions: u64,
    pub reclaimed_history_versions: u64,
    pub retained_tombstones: u64,
    pub reclaimed_tombstones: u64,
}

impl MvccVacuumStats {
    /// Accumulates every counter of `other` into `self`.
    ///
    /// `other` is destructured exhaustively on purpose. Adding a field to
    /// [`MvccVacuumStats`] without aggregating it here is then a compile
    /// error instead of a counter that is silently dropped.
    ///
    /// # Panics
    ///
    /// Like plain `+=` on `u64`, this panics on overflow when overflow checks
    /// are enabled (the default for debug builds).
    pub fn add(&mut self, other: &Self) {
        let Self {
            scanned_versions,
            retained_versions,
            reclaimed_versions,
            retained_history_versions,
            reclaimed_history_versions,
            retained_tombstones,
            reclaimed_tombstones,
        } = other;

        self.scanned_versions += *scanned_versions;
        self.retained_versions += *retained_versions;
        self.reclaimed_versions += *reclaimed_versions;
        self.retained_history_versions += *retained_history_versions;
        self.reclaimed_history_versions += *reclaimed_history_versions;
        self.retained_tombstones += *retained_tombstones;
        self.reclaimed_tombstones += *reclaimed_tombstones;
    }
}
82
83#[derive(Debug, Clone)]
84pub struct NativePhysicalState {
85    pub header: PhysicalFileHeader,
86    pub collection_roots: BTreeMap<String, u64>,
87    pub manifest: Option<NativeManifestSummary>,
88    pub registry: Option<NativeRegistrySummary>,
89    pub recovery: Option<NativeRecoverySummary>,
90    pub catalog: Option<NativeCatalogSummary>,
91    pub metadata_state: Option<NativeMetadataStateSummary>,
92    pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
93}
94
95// ============================================================================
96// Configuration
97// ============================================================================
98
99/// Configuration for UnifiedStore
100#[derive(Debug, Clone)]
101pub struct UnifiedStoreConfig {
102    /// Configuration for segment managers
103    pub manager_config: ManagerConfig,
104    /// Automatically index cross-references on insert
105    pub auto_index_refs: bool,
106    /// Automatically build a HASH index on a user `id` column the first
107    /// time a row carrying that column is inserted into a collection.
108    /// Mirrors PostgreSQL's implicit primary-key index and Mongo's `_id`
109    /// default index — without it, `WHERE id = N` falls through to a
110    /// full segment scan because RedDB has no concept of an automatic
111    /// primary-key index on user-declared columns. See `docs/perf/
112    /// delete-sequential-2026-05-06.md` for the perf rationale.
113    /// Defaults to `true`; set to `false` to opt out per workload.
114    pub auto_index_id: bool,
115    /// Maximum cross-references per entity
116    pub max_cross_refs: usize,
117    /// Enable write-ahead logging
118    pub enable_wal: bool,
119    /// Durability profile for paged writes.
120    pub durability_mode: DurabilityMode,
121    /// Group-commit batching knobs when using grouped durability.
122    pub group_commit: GroupCommitOptions,
123    /// Data directory path
124    pub data_dir: Option<std::path::PathBuf>,
125    /// Embedded single-file artifact used for the internal WAL stream.
126    pub embedded_wal_path: Option<std::path::PathBuf>,
127}
128
129impl Default for UnifiedStoreConfig {
130    fn default() -> Self {
131        Self {
132            manager_config: ManagerConfig::default(),
133            auto_index_refs: true,
134            auto_index_id: true,
135            max_cross_refs: 1000,
136            enable_wal: false,
137            // Mirrors `RedDBOptions::default().durability_mode` — see
138            // `src/api.rs` for the rationale.
139            durability_mode: DurabilityMode::WalDurableGrouped,
140            group_commit: GroupCommitOptions::default(),
141            data_dir: None,
142            embedded_wal_path: None,
143        }
144    }
145}
146
147impl UnifiedStoreConfig {
148    /// Create config with data directory
149    pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
150        self.data_dir = Some(path.into());
151        self
152    }
153
154    /// Enable WAL
155    pub fn with_wal(mut self) -> Self {
156        self.enable_wal = true;
157        self
158    }
159
160    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
161        self.durability_mode = mode;
162        self
163    }
164
165    pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
166        self.group_commit = options;
167        self
168    }
169
170    pub fn with_embedded_wal_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
171        self.embedded_wal_path = Some(path.into());
172        self
173    }
174
175    /// Set max cross-references
176    pub fn with_max_refs(mut self, max: usize) -> Self {
177        self.max_cross_refs = max;
178        self
179    }
180}
181
182// ============================================================================
183// Error Types
184// ============================================================================
185
186/// Errors from UnifiedStore operations
187#[derive(Debug)]
188pub enum StoreError {
189    /// Collection already exists
190    CollectionExists(String),
191    /// Collection not found
192    CollectionNotFound(String),
193    /// Entity not found
194    EntityNotFound(EntityId),
195    /// Too many cross-references
196    TooManyRefs(EntityId),
197    /// Segment error
198    Segment(SegmentError),
199    /// I/O error
200    Io(std::io::Error),
201    /// Serialization error
202    Serialization(String),
203    /// Internal error (lock poisoning, invariant violation)
204    Internal(String),
205}
206
207impl std::fmt::Display for StoreError {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        match self {
210            Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
211            Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
212            Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
213            Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
214            Self::Segment(e) => write!(f, "Segment error: {:?}", e),
215            Self::Io(e) => write!(f, "I/O error: {}", e),
216            Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
217            Self::Internal(msg) => write!(f, "Internal error: {}", msg),
218        }
219    }
220}
221
222impl std::error::Error for StoreError {}
223
224impl From<SegmentError> for StoreError {
225    fn from(e: SegmentError) -> Self {
226        Self::Segment(e)
227    }
228}
229
230impl From<std::io::Error> for StoreError {
231    fn from(e: std::io::Error) -> Self {
232        Self::Io(e)
233    }
234}
235
236// ============================================================================
237// Statistics
238// ============================================================================
239
240/// Statistics for UnifiedStore
241#[derive(Debug, Clone, Default)]
242pub struct StoreStats {
243    /// Number of collections
244    pub collection_count: usize,
245    /// Total entities across all collections
246    pub total_entities: usize,
247    /// Total memory usage in bytes
248    pub total_memory_bytes: usize,
249    /// Per-collection statistics
250    pub collections: HashMap<String, ManagerStats>,
251    /// Total cross-references
252    pub cross_ref_count: usize,
253}
254
255impl StoreStats {
256    /// Get average entities per collection
257    pub fn avg_entities_per_collection(&self) -> f64 {
258        if self.collection_count == 0 {
259            0.0
260        } else {
261            self.total_entities as f64 / self.collection_count as f64
262        }
263    }
264
265    /// Get memory in MB
266    pub fn memory_mb(&self) -> f64 {
267        self.total_memory_bytes as f64 / (1024.0 * 1024.0)
268    }
269}
270
271// ============================================================================
272// UnifiedStore - The Main API
273// ============================================================================
274
275/// Unified storage for tables, graphs, and vectors
276///
277/// UnifiedStore provides a single coherent interface for all data types:
278/// - **Tables**: Row-based data with columns
279/// - **Graphs**: Nodes and edges with labels
280/// - **Vectors**: Embeddings for similarity search
281///
282/// # Features
283///
284/// - Multi-collection management
285/// - Cross-collection queries
286/// - Cross-reference tracking between entities
287/// - Automatic ID generation
288/// - Segment-based storage with growing/sealed lifecycle
289///
290/// # Example
291///
292/// ```ignore
293/// use reddb::storage::{Entity, Store};
294///
295/// let store = Store::new();
296///
297/// // Create a collection
298/// store.create_collection("hosts")?;
299///
300/// // Insert an entity
301/// let entity = Entity::table_row(1, "hosts", 1, vec![]);
302/// let id = store.insert("hosts", entity)?;
303///
304/// // Query
305/// let found = store.get("hosts", id);
306/// ```
307pub struct UnifiedStore {
308    /// Store configuration
309    config: UnifiedStoreConfig,
310    /// File format version for serialization
311    format_version: AtomicU32,
312    /// Global entity ID counter
313    next_entity_id: AtomicU64,
314    /// Collections by name
315    collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
316    /// Forward cross-references: source_id → [(target_id, ref_type, target_collection)]
317    cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
318    /// Reverse cross-references: target_id → [(source_id, ref_type, source_collection)]
319    reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
320    /// Optional page-based storage via Pager
321    pager: Option<Arc<Pager>>,
322    /// Database file path (for paged mode)
323    db_path: Option<PathBuf>,
324    /// B-tree indices for O(log n) entity lookups by ID (per collection).
325    /// Stored as `Arc<BTree>` so hot-path callers can clone the handle out
326    /// under a read lock and release the map-level lock before doing the
327    /// actual insert — previously the outer RwLock was held for the whole
328    /// btree mutation, serialising every concurrent insert across every
329    /// collection into one global write lock.
330    btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
331    /// Cross-structure context index for unified search
332    context_index: ContextIndex,
333    /// Hot entity cache — sharded bounded LRU for `get_any` lookups.
334    /// See `entity_cache.rs` for the rationale; this replaced a single
335    /// `RwLock<HashMap>` that serialised every `delete_batch` invalidation.
336    entity_cache: EntityCache,
337    /// Graph node label index: (collection, label) → Vec<EntityId>.
338    /// O(1) lookup for MATCH (n:Label) graph patterns — avoids full collection scan.
339    graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
340    /// Whether the paged registry on page 1 must be rewritten before the next flush.
341    paged_registry_dirty: AtomicBool,
342    /// Logical store WAL / grouped durability coordinator for paged mode.
343    commit: Option<Arc<StoreCommitCoordinator>>,
344    /// Counts how often `unindex_cross_refs_batch` took the read-only fast
345    /// path (no inbound refs, no outbound refs for any deleted id) and so
346    /// avoided acquiring the `cross_refs` / `reverse_refs` write locks.
347    /// Used by tests to pin the early-exit; cheap relaxed counter otherwise.
348    unindex_cross_refs_fast_path: AtomicU64,
349    /// WAL-replayed `VectorInsert` records, captured at open time and
350    /// drained per-collection on first `vector.turbo` access (issue
351    /// #694). Boot-time recovery: the in-memory TurboQuant index is
352    /// rebuilt by replaying these FP32 vectors in WAL order, so the
353    /// rebuilt state is byte-deterministic against the pre-restart
354    /// state under a fixed codec seed.
355    pub(crate) replayed_turbo_inserts: parking_lot::Mutex<HashMap<String, Vec<(u64, Vec<f32>)>>>,
356}
357
358mod builder;
359mod commit;
360mod impl_entities;
361mod impl_file;
362mod impl_native_a;
363mod impl_native_b;
364mod impl_native_c;
365mod impl_pages;
366mod native_helpers;
367
368pub use self::builder::EntityBuilder;
369pub(crate) use self::commit::DeferredStoreWalActions;
370use self::commit::{StoreCommitCoordinator, StoreWalAction};
371use self::native_helpers::*;