reddb-io-server 1.2.0

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
//! Unified Store
//!
//! High-level API for the unified storage layer that combines tables, graphs,
//! and vectors into a single coherent interface.
//!
//! # Features
//!
//! - Multi-collection management
//! - Cross-collection queries
//! - Unified entity access
//! - Automatic ID generation
//! - Cross-reference management
//! - **Binary persistence** with pages, indices, and efficient encoding
//! - **Page-based storage** via Pager for ACID durability
//!
//! # Persistence Modes
//!
//! 1. **File Mode** (`save_to_file`/`load_from_file`): Simple binary dump
//! 2. **Paged Mode** (`open`/`persist`): Full page-based storage with B-tree indices

use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;

use parking_lot::RwLock;

use super::context_index::ContextIndex;
use super::entity::{
    CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
    GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
};
use super::entity_cache::EntityCache;
use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
use super::metadata::{Metadata, MetadataFilter, MetadataValue};
use super::segment::SegmentError;
use crate::api::{DurabilityMode, GroupCommitOptions};
use crate::physical::{ManifestEvent, ManifestEventKind};
use crate::storage::engine::pager::PagerError;
use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
use crate::storage::schema::types::Value;

// On-disk format identifiers.
//
// NOTE(review): the code that reads/writes these values is not part of this
// section of the file; the grouping notes below are inferred from the
// constant names and should be confirmed against the `impl_*` submodules.

// Four-byte magic tag for the store format itself.
const STORE_MAGIC: &[u8; 4] = b"RDST";
// Store format version numbers. Only V7–V9 carry a recorded change note;
// the differences between V1–V6 are not documented here.
const STORE_VERSION_V1: u32 = 1;
const STORE_VERSION_V2: u32 = 2;
const STORE_VERSION_V3: u32 = 3;
const STORE_VERSION_V4: u32 = 4;
const STORE_VERSION_V5: u32 = 5;
const STORE_VERSION_V6: u32 = 6;
const STORE_VERSION_V7: u32 = 7; // entity records include metadata (serialize_entity_record format)
const STORE_VERSION_V8: u32 = 8; // table rows may carry explicit logical identity
const STORE_VERSION_V9: u32 = 9; // entity records persist MVCC xmin/xmax
// Four-byte magic tags, one per kind of persisted section (metadata,
// collection roots, manifest, registry, recovery, catalog, metadata state,
// vector artifacts, blobs) — presumably written at the start of each
// section so a reader can recognise it; TODO confirm.
const METADATA_MAGIC: &[u8; 4] = b"RDM2";
const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";
// Presumably the cap on how many sample entries a native summary keeps
// (cf. the `*_complete` / `omitted_*_count` fields on the summary structs
// below) — TODO confirm against the summary writers.
const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;

/// Summary of a single manifest entry, as carried in
/// `NativeManifestSummary::recent_events`.
///
/// NOTE(review): the code that fills these fields is not in this part of the
/// file; the per-field notes are inferred from names and types.
#[derive(Debug, Clone)]
pub struct NativeManifestEntrySummary {
    /// Name of the collection the entry refers to.
    pub collection: String,
    /// Key of the object the entry refers to.
    pub object_key: String,
    /// Entry kind, kept as a string (presumably a rendered
    /// `ManifestEventKind` — TODO confirm).
    pub kind: String,
    /// Index of the block referenced by the entry.
    pub block_index: u64,
    /// 128-bit checksum recorded for that block.
    pub block_checksum: u128,
    /// Lower snapshot bound of the entry.
    pub snapshot_min: u64,
    /// Upper snapshot bound; `None` when no upper bound is recorded.
    pub snapshot_max: Option<u64>,
}

/// Summary of the native manifest: a sequence number, an event count and a
/// list of recent entries.
///
/// NOTE(review): `events_complete` / `omitted_event_count` presumably tell
/// whether `recent_events` is the full event list or a truncated sample —
/// confirm against the code that builds this summary.
#[derive(Debug, Clone)]
pub struct NativeManifestSummary {
    /// Manifest sequence number.
    pub sequence: u64,
    /// Total number of manifest events.
    pub event_count: u32,
    /// Whether `recent_events` holds every event (see note above).
    pub events_complete: bool,
    /// Number of events left out of `recent_events`.
    pub omitted_event_count: u32,
    /// The event summaries that were retained.
    pub recent_events: Vec<NativeManifestEntrySummary>,
}

/// Summary of one index entry in the native registry
/// (element type of `NativeRegistrySummary::indexes`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryIndexSummary {
    /// Index name.
    pub name: String,
    /// Index kind, kept as a string.
    pub kind: String,
    /// Collection the index is attached to, if any.
    pub collection: Option<String>,
    /// Whether the index is enabled.
    pub enabled: bool,
    /// Number of entries in the index.
    pub entries: u64,
    /// Estimated memory footprint in bytes.
    pub estimated_memory_bytes: u64,
    /// Time of the last refresh, if one is recorded — presumably Unix
    /// milliseconds, judging by the `_ms` suffix; TODO confirm.
    pub last_refresh_ms: Option<u128>,
    /// Backend identifier, kept as a string.
    pub backend: String,
}

/// Summary of one graph projection in the native registry
/// (element type of `NativeRegistrySummary::graph_projections`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryProjectionSummary {
    /// Projection name.
    pub name: String,
    /// Source the projection is derived from, kept as a string.
    pub source: String,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Last-update time (Unix milliseconds, per the field name).
    pub updated_at_unix_ms: u128,
    /// Node labels associated with the projection.
    pub node_labels: Vec<String>,
    /// Node types associated with the projection.
    pub node_types: Vec<String>,
    /// Edge labels associated with the projection.
    pub edge_labels: Vec<String>,
    /// Sequence at which the projection was last materialized, if ever.
    pub last_materialized_sequence: Option<u64>,
}

/// Summary of one analytics job in the native registry
/// (element type of `NativeRegistrySummary::analytics_jobs`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryJobSummary {
    /// Job identifier.
    pub id: String,
    /// Job kind, kept as a string.
    pub kind: String,
    /// Name of the projection the job is tied to, if any.
    pub projection: Option<String>,
    /// Job state, kept as a string.
    pub state: String,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Last-update time (Unix milliseconds, per the field name).
    pub updated_at_unix_ms: u128,
    /// Sequence of the last run, if the job has run.
    pub last_run_sequence: Option<u64>,
    /// Free-form string key/value metadata; a `BTreeMap`, so iteration is
    /// ordered by key.
    pub metadata: BTreeMap<String, String>,
}

/// Summary of one vector artifact in the native registry
/// (element type of `NativeRegistrySummary::vector_artifacts`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind, kept as a string.
    pub artifact_kind: String,
    /// Number of vectors covered by the artifact.
    pub vector_count: u64,
    /// Vector dimension.
    pub dimension: u32,
    /// Highest layer in the artifact — presumably of a layered (e.g. HNSW)
    /// index; TODO confirm.
    pub max_layer: u32,
    /// Size of the serialized artifact in bytes.
    pub serialized_bytes: u64,
    /// 64-bit checksum of the artifact.
    pub checksum: u64,
}

/// Page-level location summary for one vector artifact
/// (element type of `NativePhysicalState::vector_artifact_pages`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactPageSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind, kept as a string.
    pub artifact_kind: String,
    /// Page number of the artifact's root page.
    pub root_page: u32,
    /// Number of pages the artifact occupies.
    pub page_count: u32,
    /// Length of the artifact payload in bytes.
    pub byte_len: u64,
    /// 64-bit checksum of the artifact payload.
    pub checksum: u64,
}

/// Summary of the native registry: collections, indexes, graph projections,
/// analytics jobs and vector artifacts.
///
/// For each of the five categories the struct carries four things: a total
/// `*_count`, a `*_complete` flag, an `omitted_*_count`, and the list of
/// retained items.
///
/// NOTE(review): presumably the lists are truncated samples and the
/// `*_complete` / `omitted_*_count` pairs describe the truncation — confirm
/// against the code that builds this summary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistrySummary {
    // Totals per category.
    pub collection_count: u32,
    pub index_count: u32,
    pub graph_projection_count: u32,
    pub analytics_job_count: u32,
    pub vector_artifact_count: u32,
    // Whether the corresponding list below is complete.
    pub collections_complete: bool,
    pub indexes_complete: bool,
    pub graph_projections_complete: bool,
    pub analytics_jobs_complete: bool,
    pub vector_artifacts_complete: bool,
    // How many items were left out of the corresponding list below.
    pub omitted_collection_count: u32,
    pub omitted_index_count: u32,
    pub omitted_graph_projection_count: u32,
    pub omitted_analytics_job_count: u32,
    pub omitted_vector_artifact_count: u32,
    // Retained items per category.
    pub collection_names: Vec<String>,
    pub indexes: Vec<NativeRegistryIndexSummary>,
    pub graph_projections: Vec<NativeRegistryProjectionSummary>,
    pub analytics_jobs: Vec<NativeRegistryJobSummary>,
    pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
}

/// Summary of one snapshot
/// (element type of `NativeRecoverySummary::snapshots`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeSnapshotSummary {
    /// Snapshot identifier.
    pub snapshot_id: u64,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Superblock sequence associated with the snapshot.
    pub superblock_sequence: u64,
    /// Number of collections at snapshot time.
    pub collection_count: u32,
    /// Total number of entities at snapshot time.
    pub total_entities: u64,
}

/// Summary of one export
/// (element type of `NativeRecoverySummary::exports`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeExportSummary {
    /// Export name.
    pub name: String,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Snapshot the export is tied to, if any.
    pub snapshot_id: Option<u64>,
    /// Superblock sequence associated with the export.
    pub superblock_sequence: u64,
    /// Number of collections covered by the export.
    pub collection_count: u32,
    /// Total number of entities covered by the export.
    pub total_entities: u64,
}

/// Summary of recovery-related state: known snapshots and exports.
///
/// NOTE(review): as with the other native summaries, the `*_complete` and
/// `omitted_*_count` fields presumably describe whether `snapshots` /
/// `exports` were truncated — confirm against the code that builds this.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRecoverySummary {
    /// Total number of snapshots.
    pub snapshot_count: u32,
    /// Total number of exports.
    pub export_count: u32,
    /// Whether `snapshots` is complete.
    pub snapshots_complete: bool,
    /// Whether `exports` is complete.
    pub exports_complete: bool,
    /// Number of snapshots left out of `snapshots`.
    pub omitted_snapshot_count: u32,
    /// Number of exports left out of `exports`.
    pub omitted_export_count: u32,
    /// Retained snapshot summaries.
    pub snapshots: Vec<NativeSnapshotSummary>,
    /// Retained export summaries.
    pub exports: Vec<NativeExportSummary>,
}

/// Per-collection counts in the native catalog
/// (element type of `NativeCatalogSummary::collections`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogCollectionSummary {
    /// Collection name.
    pub name: String,
    /// Number of entities in the collection.
    pub entities: u64,
    /// Number of cross-references recorded for the collection.
    pub cross_refs: u64,
    /// Number of segments in the collection.
    pub segments: u32,
}

/// Summary of the native catalog: totals plus a per-collection breakdown.
///
/// NOTE(review): `collections_complete` / `omitted_collection_count`
/// presumably describe whether `collections` was truncated — confirm against
/// the code that builds this summary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogSummary {
    /// Total number of collections.
    pub collection_count: u32,
    /// Total number of entities across collections.
    pub total_entities: u64,
    /// Whether `collections` is complete.
    pub collections_complete: bool,
    /// Number of collections left out of `collections`.
    pub omitted_collection_count: u32,
    /// Retained per-collection summaries.
    pub collections: Vec<NativeCatalogCollectionSummary>,
}

/// Counters describing the outcome of MVCC vacuum work.
///
/// All counters start at zero (`Default`) and are combined with
/// [`MvccVacuumStats::add`].
///
/// NOTE(review): the vacuum code that increments these counters is not in
/// this part of the file; the split into plain versions, history versions
/// and tombstones is taken from the field names.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MvccVacuumStats {
    /// Versions examined.
    pub scanned_versions: u64,
    /// Versions kept.
    pub retained_versions: u64,
    /// Versions reclaimed.
    pub reclaimed_versions: u64,
    /// History versions kept.
    pub retained_history_versions: u64,
    /// History versions reclaimed.
    pub reclaimed_history_versions: u64,
    /// Tombstones kept.
    pub retained_tombstones: u64,
    /// Tombstones reclaimed.
    pub reclaimed_tombstones: u64,
}

impl MvccVacuumStats {
    /// Accumulates every counter of `other` into `self`.
    ///
    /// Each counter saturates at `u64::MAX` rather than overflowing: a plain
    /// `+=` would panic in builds with overflow checks and silently wrap in
    /// release builds, and neither is acceptable for a statistics
    /// accumulator.
    pub fn add(&mut self, other: &Self) {
        // Exhaustive destructuring: adding a field to the struct without
        // handling it here becomes a compile error instead of a silently
        // dropped counter.
        let Self {
            scanned_versions,
            retained_versions,
            reclaimed_versions,
            retained_history_versions,
            reclaimed_history_versions,
            retained_tombstones,
            reclaimed_tombstones,
        } = other;

        self.scanned_versions = self.scanned_versions.saturating_add(*scanned_versions);
        self.retained_versions = self.retained_versions.saturating_add(*retained_versions);
        self.reclaimed_versions = self.reclaimed_versions.saturating_add(*reclaimed_versions);
        self.retained_history_versions = self
            .retained_history_versions
            .saturating_add(*retained_history_versions);
        self.reclaimed_history_versions = self
            .reclaimed_history_versions
            .saturating_add(*reclaimed_history_versions);
        self.retained_tombstones = self.retained_tombstones.saturating_add(*retained_tombstones);
        self.reclaimed_tombstones = self
            .reclaimed_tombstones
            .saturating_add(*reclaimed_tombstones);
    }
}

/// Summary of the native metadata state
/// (carried in `NativePhysicalState::metadata_state`).
///
/// NOTE(review): field notes are inferred from names and types only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeMetadataStateSummary {
    /// Protocol version, kept as a string.
    pub protocol_version: String,
    /// Time this state was generated (Unix milliseconds, per the field name).
    pub generated_at_unix_ms: u128,
    /// Where the state was last loaded from, if recorded.
    pub last_loaded_from: Option<String>,
    /// Time of the last heal, if any (Unix milliseconds, per the field name).
    pub last_healed_at_unix_ms: Option<u128>,
}

/// Aggregate view of the native physical state: the physical file header,
/// the per-collection roots, and one optional summary per native section.
///
/// Every section summary is an `Option`.
///
/// NOTE(review): presumably `None` means the section was absent or could not
/// be read — confirm against the loader.
#[derive(Debug, Clone)]
pub struct NativePhysicalState {
    /// Physical file header.
    pub header: PhysicalFileHeader,
    /// Collection name → root value (presumably a root page/offset — TODO
    /// confirm); a `BTreeMap`, so iteration is ordered by collection name.
    pub collection_roots: BTreeMap<String, u64>,
    /// Manifest summary, if available.
    pub manifest: Option<NativeManifestSummary>,
    /// Registry summary, if available.
    pub registry: Option<NativeRegistrySummary>,
    /// Recovery summary, if available.
    pub recovery: Option<NativeRecoverySummary>,
    /// Catalog summary, if available.
    pub catalog: Option<NativeCatalogSummary>,
    /// Metadata-state summary, if available.
    pub metadata_state: Option<NativeMetadataStateSummary>,
    /// Page-level summaries of vector artifacts, if available.
    pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
}

// ============================================================================
// Configuration
// ============================================================================

/// Configuration for UnifiedStore
///
/// Construct with `UnifiedStoreConfig::default()` and adjust via the
/// `with_*` builder methods; the per-field defaults noted below are the
/// values set by the `Default` impl.
#[derive(Debug, Clone)]
pub struct UnifiedStoreConfig {
    /// Configuration for segment managers.
    /// Defaults to `ManagerConfig::default()`.
    pub manager_config: ManagerConfig,
    /// Automatically index cross-references on insert.
    /// Defaults to `true`.
    pub auto_index_refs: bool,
    /// Automatically build a HASH index on a user `id` column the first
    /// time a row carrying that column is inserted into a collection.
    /// Mirrors PostgreSQL's implicit primary-key index and Mongo's `_id`
    /// default index — without it, `WHERE id = N` falls through to a
    /// full segment scan because RedDB has no concept of an automatic
    /// primary-key index on user-declared columns. See `docs/perf/
    /// delete-sequential-2026-05-06.md` for the perf rationale.
    /// Defaults to `true`; set to `false` to opt out per workload.
    pub auto_index_id: bool,
    /// Maximum cross-references per entity.
    /// Defaults to `1000`.
    pub max_cross_refs: usize,
    /// Enable write-ahead logging.
    /// Defaults to `false`.
    pub enable_wal: bool,
    /// Durability profile for paged writes.
    /// Defaults to `DurabilityMode::WalDurableGrouped`.
    pub durability_mode: DurabilityMode,
    /// Group-commit batching knobs when using grouped durability.
    /// Defaults to `GroupCommitOptions::default()`.
    pub group_commit: GroupCommitOptions,
    /// Data directory path.
    /// Defaults to `None`.
    pub data_dir: Option<std::path::PathBuf>,
}

impl Default for UnifiedStoreConfig {
    fn default() -> Self {
        Self {
            manager_config: ManagerConfig::default(),
            auto_index_refs: true,
            auto_index_id: true,
            max_cross_refs: 1000,
            enable_wal: false,
            // Mirrors `RedDBOptions::default().durability_mode` — see
            // `src/api.rs` for the rationale.
            durability_mode: DurabilityMode::WalDurableGrouped,
            group_commit: GroupCommitOptions::default(),
            data_dir: None,
        }
    }
}

impl UnifiedStoreConfig {
    /// Returns the config with `data_dir` set to `path`.
    pub fn with_data_dir(self, path: impl Into<std::path::PathBuf>) -> Self {
        Self {
            data_dir: Some(path.into()),
            ..self
        }
    }

    /// Returns the config with write-ahead logging switched on.
    pub fn with_wal(self) -> Self {
        Self {
            enable_wal: true,
            ..self
        }
    }

    /// Returns the config with the durability profile replaced by `mode`.
    pub fn with_durability_mode(self, mode: DurabilityMode) -> Self {
        Self {
            durability_mode: mode,
            ..self
        }
    }

    /// Returns the config with the group-commit knobs replaced by `options`.
    pub fn with_group_commit(self, options: GroupCommitOptions) -> Self {
        Self {
            group_commit: options,
            ..self
        }
    }

    /// Returns the config with the per-entity cross-reference cap set to `max`.
    pub fn with_max_refs(self, max: usize) -> Self {
        Self {
            max_cross_refs: max,
            ..self
        }
    }
}

// ============================================================================
// Error Types
// ============================================================================

/// Errors from UnifiedStore operations
///
/// `SegmentError` and `std::io::Error` convert into this type through the
/// `From` impls below, so both can be propagated with `?`. The `Display`
/// impl renders each variant as a short prefix followed by its payload.
#[derive(Debug)]
pub enum StoreError {
    /// Collection already exists (payload: the collection name)
    CollectionExists(String),
    /// Collection not found (payload: the collection name)
    CollectionNotFound(String),
    /// Entity not found (payload: the entity ID)
    EntityNotFound(EntityId),
    /// Too many cross-references (payload: the entity ID)
    TooManyRefs(EntityId),
    /// Segment error (wraps the underlying `SegmentError`)
    Segment(SegmentError),
    /// I/O error (wraps the underlying `std::io::Error`)
    Io(std::io::Error),
    /// Serialization error (payload: a descriptive message)
    Serialization(String),
    /// Internal error (lock poisoning, invariant violation)
    Internal(String),
}

impl std::fmt::Display for StoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
            Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
            Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
            Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
            Self::Segment(e) => write!(f, "Segment error: {:?}", e),
            Self::Io(e) => write!(f, "I/O error: {}", e),
            Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
            Self::Internal(msg) => write!(f, "Internal error: {}", msg),
        }
    }
}

// Empty impl: relies on the trait's default methods, so `source()` returns
// `None` and wrapped `Io` / `Segment` errors are surfaced only through the
// `Display` / `Debug` output.
impl std::error::Error for StoreError {}

impl From<SegmentError> for StoreError {
    fn from(e: SegmentError) -> Self {
        Self::Segment(e)
    }
}

impl From<std::io::Error> for StoreError {
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}

// ============================================================================
// Statistics
// ============================================================================

/// Aggregate statistics reported by UnifiedStore.
///
/// `Default` yields an all-zero snapshot with no per-collection entries.
#[derive(Debug, Clone, Default)]
pub struct StoreStats {
    /// How many collections the store holds
    pub collection_count: usize,
    /// Entity count summed over every collection
    pub total_entities: usize,
    /// Memory footprint in bytes, summed over the store
    pub total_memory_bytes: usize,
    /// Manager statistics keyed by collection name
    pub collections: HashMap<String, ManagerStats>,
    /// Number of cross-references tracked
    pub cross_ref_count: usize,
}

impl StoreStats {
    /// Mean number of entities per collection.
    ///
    /// Returns `0.0` when there are no collections, so the result is never
    /// NaN.
    pub fn avg_entities_per_collection(&self) -> f64 {
        match self.collection_count {
            0 => 0.0,
            collections => self.total_entities as f64 / collections as f64,
        }
    }

    /// Total memory usage expressed in mebibytes (1 MB = 1024 × 1024 bytes).
    pub fn memory_mb(&self) -> f64 {
        const BYTES_PER_MB: f64 = 1024.0 * 1024.0;
        let bytes = self.total_memory_bytes as f64;
        bytes / BYTES_PER_MB
    }
}

// ============================================================================
// UnifiedStore - The Main API
// ============================================================================

/// Unified storage for tables, graphs, and vectors
///
/// UnifiedStore provides a single coherent interface for all data types:
/// - **Tables**: Row-based data with columns
/// - **Graphs**: Nodes and edges with labels
/// - **Vectors**: Embeddings for similarity search
///
/// # Features
///
/// - Multi-collection management
/// - Cross-collection queries
/// - Cross-reference tracking between entities
/// - Automatic ID generation
/// - Segment-based storage with growing/sealed lifecycle
///
/// # Internal layout
///
/// The collection map, both cross-reference maps, the B-tree handle map and
/// the graph label index are each guarded by their own `parking_lot::RwLock`;
/// counters and flags are atomics. `pager`, `db_path` and `commit` are
/// `Option`s (presumably `None` outside paged mode — confirm against the
/// constructors in the `impl_*` submodules).
///
/// # Example
///
/// ```ignore
/// use reddb::storage::{Entity, Store};
///
/// let store = Store::new();
///
/// // Create a collection
/// store.create_collection("hosts")?;
///
/// // Insert an entity
/// let entity = Entity::table_row(1, "hosts", 1, vec![]);
/// let id = store.insert("hosts", entity)?;
///
/// // Query
/// let found = store.get("hosts", id);
/// ```
pub struct UnifiedStore {
    /// Store configuration
    config: UnifiedStoreConfig,
    /// File format version for serialization
    /// (presumably one of the `STORE_VERSION_V*` constants — TODO confirm)
    format_version: AtomicU32,
    /// Global entity ID counter
    next_entity_id: AtomicU64,
    /// Collections by name
    collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
    /// Forward cross-references: source_id → [(target_id, ref_type, target_collection)]
    cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Reverse cross-references: target_id → [(source_id, ref_type, source_collection)]
    reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Optional page-based storage via Pager
    pager: Option<Arc<Pager>>,
    /// Database file path (for paged mode)
    db_path: Option<PathBuf>,
    /// B-tree indices for O(log n) entity lookups by ID (per collection).
    /// Stored as `Arc<BTree>` so hot-path callers can clone the handle out
    /// under a read lock and release the map-level lock before doing the
    /// actual insert — previously the outer RwLock was held for the whole
    /// btree mutation, serialising every concurrent insert across every
    /// collection into one global write lock.
    btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
    /// Cross-structure context index for unified search
    context_index: ContextIndex,
    /// Hot entity cache — sharded bounded LRU for `get_any` lookups.
    /// See `entity_cache.rs` for the rationale; this replaced a single
    /// `RwLock<HashMap>` that serialised every `delete_batch` invalidation.
    entity_cache: EntityCache,
    /// Graph node label index: (collection, label) → Vec<EntityId>.
    /// O(1) lookup for MATCH (n:Label) graph patterns — avoids full collection scan.
    graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
    /// Whether the paged registry on page 1 must be rewritten before the next flush.
    paged_registry_dirty: AtomicBool,
    /// Logical store WAL / grouped durability coordinator for paged mode.
    commit: Option<Arc<StoreCommitCoordinator>>,
    /// Counts how often `unindex_cross_refs_batch` took the read-only fast
    /// path (no inbound refs, no outbound refs for any deleted id) and so
    /// avoided acquiring the `cross_refs` / `reverse_refs` write locks.
    /// Used by tests to pin the early-exit; cheap relaxed counter otherwise.
    unindex_cross_refs_fast_path: AtomicU64,
}

// Private submodules of the unified store. They are declared after the
// `UnifiedStore` definition; presumably the `impl_*` modules hold its method
// implementations, split by concern — confirm in the respective files.
mod builder;
mod commit;
mod impl_entities;
mod impl_file;
mod impl_native_a;
mod impl_native_b;
mod impl_native_c;
mod impl_pages;
mod native_helpers;

// Public surface: only `EntityBuilder` is re-exported publicly;
// `DeferredStoreWalActions` is visible crate-wide, the rest stays private
// to this module. `native_helpers` is glob-imported for local use.
pub use self::builder::EntityBuilder;
pub(crate) use self::commit::DeferredStoreWalActions;
use self::commit::{StoreCommitCoordinator, StoreWalAction};
use self::native_helpers::*;