// Source path: reddb_server/storage/unified/devx/reddb.rs

1//! RedDB - Main Entry Point
2//!
3//! Unified Database with best-in-class developer experience for Tables, Graphs, and Vectors.
4
5use std::collections::HashMap;
6use std::fs;
7use std::fs::File;
8use std::io::Read;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, RwLock};
11use std::time::{SystemTime, UNIX_EPOCH};
12use std::{
13    collections::{BTreeMap, BTreeSet},
14    fmt::Debug,
15};
16
17use super::super::{
18    EntityData, EntityId, EntityKind, StoreStats, UnifiedEntity, UnifiedStore, UnifiedStoreConfig,
19};
20use super::batch::BatchBuilder;
21use super::builders::{
22    DocumentBuilder, EdgeBuilder, KvBuilder, NodeBuilder, RowBuilder, VectorBuilder,
23};
24use super::helpers::cosine_similarity;
25use super::query::QueryBuilder;
26use super::refs::{NodeRef, TableRef, VectorRef};
27use super::types::{LinkedEntity, SimilarResult};
28use super::{IndexConfig, Preprocessor, SharedPreprocessors};
29use crate::api::{Capability, CatalogSnapshot, CollectionStats, RedDBOptions, StorageMode};
30use crate::catalog::{
31    consistency_report, snapshot_store_with_declarations, CatalogConsistencyReport,
32    CatalogDeclarations, CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
33    CollectionModel,
34};
35use crate::health::{storage_file_health, HealthReport, HealthState};
36use crate::index::{IndexCatalog, IndexConfig as RuntimeIndexConfig, IndexKind};
37use crate::physical::{
38    ExportDescriptor, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
39    PhysicalMetadataFile,
40};
41use crate::replication::{primary::PrimaryReplication, ReplicationRole};
42use crate::serde_json::Value as JsonValue;
43use crate::storage::engine::{HnswIndex, IvfConfig, IvfIndex, IvfStats, PhysicalFileHeader};
44use crate::storage::schema::Value;
45use crate::storage::unified::store::{
46    NativeCatalogCollectionSummary, NativeCatalogSummary, NativeExportSummary,
47    NativeManifestSummary, NativeMetadataStateSummary, NativePhysicalState, NativeRecoverySummary,
48    NativeRegistryIndexSummary, NativeRegistryJobSummary, NativeRegistryProjectionSummary,
49    NativeRegistrySummary, NativeSnapshotSummary, NativeVectorArtifactPageSummary,
50    NativeVectorArtifactSummary,
51};
52
53/// RedDB - Unified Database with Best-in-Class DevX
54///
55/// Single entry point for Tables, Graphs, and Vectors with full
56/// metadata support and cross-referencing.
57pub struct RedDB {
58    store: Arc<UnifiedStore>,
59    /// Preprocessing hooks
60    preprocessors: SharedPreprocessors,
61    /// Index configuration
62    index_config: IndexConfig,
63    /// Persistence path
64    path: Option<PathBuf>,
65    /// Construction/runtime options
66    options: RedDBOptions,
67    /// Whether the current persistence backend is page-based.
68    paged_mode: bool,
69    /// Per-collection HNSW vector index cache for fast approximate nearest neighbor search.
70    /// Lazily built on first vector similarity query per collection.
71    vector_indexes: RwLock<HashMap<String, CachedVectorIndex>>,
72    /// Default TTL policy declared at the collection level, in milliseconds.
73    collection_ttl_defaults_ms: RwLock<HashMap<String, u64>>,
74    /// In-memory cache of collection contracts keyed by collection name.
75    /// Populated lazily from `physical_metadata()` and invalidated on
76    /// `save_collection_contract` / `remove_collection_contract`.
77    /// Avoids reparsing the whole PhysicalMetadataFile JSON on every
78    /// `collection_contract(name)` lookup — which happens 3× per insert
79    /// (ensure_model, enforce_uniqueness, normalize_fields) and dominated
80    /// the insert hot path at ~30%.
81    pub(crate) collection_contract_cache:
82        RwLock<Option<Arc<HashMap<String, Arc<crate::physical::CollectionContract>>>>>,
83    /// Optional remote storage backend for snapshot transport.
84    pub(crate) remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
85    /// Optional CAS-capable handle for backends that implement
86    /// `AtomicRemoteBackend`. Mirrors `RedDBOptions::remote_backend_atomic`
87    /// — see that field for semantics.
88    pub(crate) remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
89    /// Remote object key used by the remote backend.
90    pub(crate) remote_key: Option<String>,
91    /// Primary replication state (only present when role is Primary).
92    pub(crate) replication: Option<Arc<PrimaryReplication>>,
93    /// Quorum coordinator for multi-region commits (Phase 2.6 PG parity).
94    ///
95    /// Only present when role is Primary. Write path calls
96    /// `quorum.wait_for_quorum(lsn)` after appending to the primary WAL
97    /// to block until the configured quorum of replicas has acked. When
98    /// the config is `Async` (default), this returns instantly — same
99    /// semantics as pre-Phase-2.6 RedDB.
100    pub(crate) quorum: Option<Arc<crate::replication::quorum::QuorumCoordinator>>,
101    /// Eventual consistency registry (embedded mode support).
102    pub(crate) ec_registry: Arc<crate::ec::config::EcRegistry>,
103    /// Lazily-initialised ML runtime (model registry + job queue +
104    /// semantic cache). Created on first access by the SQL layer so
105    /// `ML_CLASSIFY`, `SEMANTIC_CACHE_GET/PUT`, and friends have a
106    /// shared handle without forcing every instantiation path to
107    /// know about it.
108    pub(crate) ml_runtime: std::sync::OnceLock<crate::storage::ml::MlRuntime>,
109    /// Shared semantic cache used by `SEMANTIC_CACHE_*` scalars.
110    /// Separate from `MlRuntime` because cache config is runtime-only
111    /// and doesn't need the job queue — keep it a standalone `Arc`.
112    pub(crate) semantic_cache: std::sync::OnceLock<Arc<crate::storage::ml::SemanticCache>>,
113    /// Hypertable registry — populated by `CREATE HYPERTABLE` DDL,
114    /// consumed by chunk routing, retention sweeps, and `SHOW
115    /// HYPERTABLES`. Lazy so startup stays cheap when no hypertables
116    /// exist.
117    pub(crate) hypertables:
118        std::sync::OnceLock<Arc<crate::storage::timeseries::HypertableRegistry>>,
119    /// Continuous-aggregate engine — populated by `CA_REGISTER` and
120    /// queried by `CA_REFRESH` / `CA_STATE` scalars. Same lazy shape
121    /// as the other engine handles.
122    pub(crate) continuous_aggregates: std::sync::OnceLock<
123        Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine>,
124    >,
125    /// Per-collection `vector.turbo` runtime state (issue #693, PRD
126    /// #668). Lazily initialised: the entire map allocation is
127    /// deferred until the first turbo collection is created. Each
128    /// entry owns the in-memory `TurboQuantIndex` + optional
129    /// `TurboExtent` for the collection.
130    pub(crate) turbo_collections: std::sync::OnceLock<
131        Arc<
132            parking_lot::Mutex<
133                std::collections::HashMap<
134                    String,
135                    Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
136                >,
137            >,
138        >,
139    >,
140    /// Join handles for `vector.turbo` background-rebuild workers
141    /// (issue #673). Each call to `turbo_state` that materialises a
142    /// fresh `TurboCollectionState` registers a handle here; the
143    /// `Drop` impl below joins every handle before releasing
144    /// `store`, so a runtime restart on the same database path is
145    /// not racy with an in-flight rebuild holding the file lock.
146    pub(crate) turbo_rebuild_workers: parking_lot::Mutex<Vec<std::thread::JoinHandle<()>>>,
147    _ephemeral_cleanup: Option<EphemeralDataPathCleanup>,
148}
149
150pub(super) struct EphemeralDataPathCleanup {
151    path: PathBuf,
152}
153
154impl EphemeralDataPathCleanup {
155    pub(super) fn new(path: PathBuf) -> Self {
156        Self { path }
157    }
158}
159
160impl Drop for EphemeralDataPathCleanup {
161    fn drop(&mut self) {
162        for path in ephemeral_data_artifacts(&self.path) {
163            if path.is_dir() {
164                let _ = fs::remove_dir_all(path);
165            } else {
166                let _ = fs::remove_file(path);
167            }
168        }
169    }
170}
171
172pub(super) fn is_ephemeral_data_path(path: &Path) -> bool {
173    let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
174        return false;
175    };
176    if !file_name.starts_with("reddb-ephemeral-") || !file_name.ends_with(".rdb") {
177        return false;
178    }
179    path.parent()
180        .map(|parent| parent == std::env::temp_dir())
181        .unwrap_or(false)
182}
183
184fn ephemeral_data_artifacts(data_path: &Path) -> Vec<PathBuf> {
185    let logical_wal_path = reddb_file::layout::logical_wal_path(data_path);
186    let result_cache_l2_path = reddb_file::layout::result_cache_l2_path(data_path);
187    let legacy_logical_slots_path = reddb_file::layout::legacy_logical_slots_path(data_path);
188    let mut operational_manifest_root = data_path.as_os_str().to_os_string();
189    operational_manifest_root.push(".ops");
190    let mut paths = vec![
191        data_path.to_path_buf(),
192        PathBuf::from(operational_manifest_root),
193        reddb_file::layout::unified_wal_path(data_path),
194        logical_wal_path.clone(),
195        reddb_file::layout::logical_wal_temp_path(&logical_wal_path),
196        reddb_file::layout::temp_path(data_path),
197        reddb_file::layout::atomic_temp_path(data_path),
198        result_cache_l2_path.clone(),
199        reddb_file::layout::pager_legacy_wal_path(data_path),
200        reddb_file::layout::engine_wal_path(data_path),
201        reddb_file::layout::pager_header_path(data_path),
202        reddb_file::layout::pager_meta_path(data_path),
203        reddb_file::layout::pager_dwb_path(data_path),
204        legacy_logical_slots_path.clone(),
205        reddb_file::layout::legacy_logical_slots_temp_path(&legacy_logical_slots_path),
206        reddb_file::layout::legacy_audit_log_path(data_path),
207        reddb_file::layout::shm_path(data_path),
208        reddb_file::layout::physical_metadata_json_path(data_path),
209        reddb_file::layout::physical_metadata_binary_path(data_path),
210        reddb_file::layout::rebootstrap_staging_root(data_path),
211        reddb_file::layout::rebootstrap_pending_path(data_path),
212        reddb_file::layout::rebootstrap_ready_marker_path(data_path),
213        reddb_file::layout::rebootstrap_intent_log_path(data_path),
214        reddb_file::layout::rebootstrap_previous_path(data_path),
215        reddb_file::layout::primary_replica_root(data_path),
216        reddb_file::layout::serverless_root(data_path),
217    ];
218    paths.extend(reddb_file::layout::pager_shadow_sidecar_paths(data_path));
219    paths.extend(reddb_file::layout::pager_shadow_sidecar_paths(
220        &result_cache_l2_path,
221    ));
222    if let Some(parent) = data_path.parent() {
223        paths.push(reddb_file::layout::legacy_slow_query_log_path(parent));
224    }
225    paths.push(reddb_file::layout::support_dir_for(data_path));
226    paths
227}
228
229impl Drop for RedDB {
230    fn drop(&mut self) {
231        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
232            && self.options.storage_profile.packaging
233                == crate::storage::StoragePackaging::SingleFile
234            && !self.paged_mode
235            && !self.options.read_only
236        {
237            if let Some(path) = &self.path {
238                let snapshot = self.store.to_binary_dump_bytes();
239                let _ = crate::storage::EmbeddedRdbArtifact::write_snapshot(path, &snapshot);
240            }
241        }
242
243        // Issue #673 — wait for every `vector.turbo` background
244        // rebuild worker to exit before our `Arc<UnifiedStore>` is
245        // released. The worker holds a strong handle to the store
246        // during its work phase, so without this join a fast
247        // `RedDBRuntime` restart on the same path observes the
248        // file lock as still held by the soon-to-exit worker.
249        let handles: Vec<_> = self.turbo_rebuild_workers.lock().drain(..).collect();
250        for h in handles {
251            let _ = h.join();
252        }
253        // Issue #674 — also join any in-flight `.tv` snapshot dump
254        // so the on-disk file is complete and renamed before a
255        // restart observes it. Without this, a fast reopen can race
256        // and see the `<path>.tv.tmp` before the atomic rename.
257        if let Some(map) = self.turbo_collections.get() {
258            let states: Vec<_> = map.lock().values().cloned().collect();
259            for state in states {
260                state.wait_snapshot();
261            }
262        }
263    }
264}
265
266/// A cached HNSW index together with the entity count at build time.
267/// When the live entity count diverges the cache is considered stale and
268/// is rebuilt on the next query.
269pub(crate) struct CachedVectorIndex {
270    pub index: Arc<RwLock<HnswIndex>>,
271    pub entity_count: usize,
272}
273
/// One differing header field, reported as the field name plus both values
/// rendered as strings.
#[derive(Debug, Clone)]
pub struct NativeHeaderMismatch {
    /// Name of the header field that differs.
    pub field: &'static str,
    /// Value found on the native side, as text.
    pub native: String,
    /// Value that was expected, as text.
    pub expected: String,
}
280
281#[derive(Debug, Clone)]
282pub struct NativeHeaderInspection {
283    pub native: PhysicalFileHeader,
284    pub expected: PhysicalFileHeader,
285    pub consistent: bool,
286    pub mismatches: Vec<NativeHeaderMismatch>,
287}
288
/// Outcome of deciding how to reconcile the native header with metadata.
///
/// NOTE(review): the variant descriptions below are read off the variant
/// names; the decision logic is outside this section — confirm against it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NativeHeaderRepairPolicy {
    /// Native header and metadata agree; nothing to repair.
    InSync,
    /// The native header should be rewritten from metadata.
    RepairNativeFromMetadata,
    /// The native header is newer than the metadata.
    NativeAheadOfMetadata,
}
295
/// Flat status report describing which physical-state sources (metadata
/// sidecar vs. native state) are available and how complete they are.
///
/// `Option` fields are `None` when the corresponding value was not available
/// to whoever built the report. NOTE(review): exact semantics of each flag
/// live with the producer of this struct, outside this section.
#[derive(Debug, Clone)]
pub struct PhysicalAuthorityStatus {
    pub preference: String,
    pub sidecar_available: bool,
    pub native_state_available: bool,
    pub native_bootstrap_ready: bool,
    pub native_registry_complete: Option<bool>,
    pub native_recovery_complete: Option<bool>,
    pub native_catalog_complete: Option<bool>,
    pub sidecar_loaded_from: Option<String>,
    /// Repair policy rendered as text rather than as the enum.
    pub native_header_repair_policy: Option<String>,
    pub metadata_sequence: Option<u64>,
    pub native_sequence: Option<u64>,
    pub native_metadata_last_loaded_from: Option<String>,
    /// Generation timestamp in milliseconds since the Unix epoch (per the field name).
    pub native_metadata_generated_at_unix_ms: Option<u128>,
}
312
/// Inspection record for one native index artifact of a collection.
///
/// The first group of fields is always present; the `ivf_*`, `graph_*`,
/// `text_*` and `document_*` groups are optional. NOTE(review): presumably
/// each optional group is only filled for the matching `artifact_kind` —
/// confirm with the code that builds this struct.
#[derive(Debug, Clone)]
pub struct NativeVectorArtifactInspection {
    pub collection: String,
    pub artifact_kind: String,
    pub root_page: u32,
    pub page_count: u32,
    pub byte_len: u64,
    pub checksum: u64,
    pub node_count: u64,
    pub dimension: u32,
    pub max_layer: u32,
    pub total_connections: u64,
    pub avg_connections: f64,
    pub entry_point: Option<u64>,
    pub ivf_n_lists: Option<u32>,
    pub ivf_non_empty_lists: Option<u32>,
    pub ivf_trained: Option<bool>,
    pub graph_edge_count: Option<u64>,
    pub graph_node_count: Option<u64>,
    pub graph_label_count: Option<u32>,
    pub text_doc_count: Option<u64>,
    pub text_term_count: Option<u64>,
    pub text_posting_count: Option<u64>,
    pub document_doc_count: Option<u64>,
    pub document_path_count: Option<u64>,
    pub document_value_count: Option<u64>,
    pub document_unique_value_count: Option<u64>,
}
341
342#[derive(Debug, Clone)]
343pub struct NativeVectorArtifactBatchInspection {
344    pub inspected_count: usize,
345    pub valid_count: usize,
346    pub artifacts: Vec<NativeVectorArtifactInspection>,
347    pub failures: Vec<(String, String, String)>,
348}
349
350mod impl_access;
351mod impl_core_a;
352mod impl_core_b;
353mod impl_ec;
354mod impl_metadata;
355mod impl_registry;
356
357impl Default for RedDB {
358    fn default() -> Self {
359        Self::new()
360    }
361}
362
363fn infer_collection_index_kind(model: CollectionModel, index_name: &str) -> IndexKind {
364    match index_name {
365        "graph-adjacency" => IndexKind::GraphAdjacency,
366        "vector-hnsw" => IndexKind::VectorHnsw,
367        "vector-inverted" => IndexKind::VectorInverted,
368        "vector-turbo" => IndexKind::VectorTurbo,
369        "text-fulltext" => IndexKind::FullText,
370        "document-pathvalue" => IndexKind::DocumentPathValue,
371        "search-hybrid" => IndexKind::HybridSearch,
372        _ => match model {
373            CollectionModel::Graph => IndexKind::GraphAdjacency,
374            CollectionModel::Vector => IndexKind::VectorHnsw,
375            CollectionModel::Document => IndexKind::DocumentPathValue,
376            CollectionModel::Kv | CollectionModel::Config | CollectionModel::Vault => {
377                IndexKind::Hash
378            }
379            _ => IndexKind::BTree,
380        },
381    }
382}
383
384fn estimate_index_entries(collection: &CollectionDescriptor, kind: IndexKind) -> usize {
385    match kind {
386        IndexKind::BTree | IndexKind::Hash | IndexKind::Bitmap | IndexKind::Spatial => {
387            collection.entities
388        }
389        IndexKind::GraphAdjacency => collection.cross_refs.max(collection.entities),
390        IndexKind::VectorHnsw | IndexKind::VectorInverted | IndexKind::VectorTurbo => {
391            collection.entities
392        }
393        IndexKind::FullText => collection.entities.saturating_mul(4),
394        IndexKind::DocumentPathValue => collection.entities.saturating_mul(6),
395        IndexKind::HybridSearch => collection.entities,
396    }
397}
398
399fn estimate_index_memory(entries: usize, kind: IndexKind) -> u64 {
400    let per_entry = match kind {
401        IndexKind::BTree => 64,
402        IndexKind::Hash => 48,
403        IndexKind::Bitmap => 2,   // Roaring bitmaps are very compact
404        IndexKind::Spatial => 40, // R-tree node: 2 floats + EntityId + overhead
405        IndexKind::GraphAdjacency => 96,
406        IndexKind::VectorHnsw => 256,
407        IndexKind::VectorInverted => 128,
408        IndexKind::VectorTurbo => 64,
409        IndexKind::FullText => 80,
410        IndexKind::DocumentPathValue => 104,
411        IndexKind::HybridSearch => 144,
412    };
413    (entries as u64).saturating_mul(per_entry)
414}
415
416fn index_backend_name(kind: IndexKind) -> &'static str {
417    match kind {
418        IndexKind::BTree => "page-btree",
419        IndexKind::Hash => "hash-map",
420        IndexKind::Bitmap => "roaring-bitmap",
421        IndexKind::Spatial => "rstar-rtree",
422        IndexKind::GraphAdjacency => "adjacency-map",
423        IndexKind::VectorHnsw => "vector-hnsw",
424        IndexKind::VectorInverted => "vector-ivf",
425        IndexKind::VectorTurbo => "vector-turbo",
426        IndexKind::FullText => "inverted-text",
427        IndexKind::DocumentPathValue => "document-pathvalue",
428        IndexKind::HybridSearch => "hybrid-score",
429    }
430}
431
/// Returns the 64-bit FNV offset basis, the initial state for an FNV-1a hash.
fn fnv1a_seed() -> u64 {
    0xcbf2_9ce4_8422_2325
}
435
/// Folds `bytes` into the running 64-bit FNV-1a state `hash`.
///
/// For each byte: XOR it into the state, then multiply by the FNV prime with
/// wrapping arithmetic. Calling this repeatedly with consecutive chunks gives
/// the same result as one call with the concatenated input; an empty slice
/// leaves `hash` unchanged.
fn fnv1a_hash_bytes(hash: &mut u64, bytes: &[u8]) {
    /// 64-bit FNV prime.
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    *hash = bytes
        .iter()
        .fold(*hash, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME));
}
442
443fn fnv1a_hash_value<T: Debug>(hash: &mut u64, value: &T) {
444    let rendered = format!("{value:?}");
445    fnv1a_hash_bytes(hash, rendered.as_bytes());
446}