// File: reddb_server/storage/unified/devx/reddb.rs

1//! RedDB - Main Entry Point
2//!
3//! Unified Database with best-in-class developer experience for Tables, Graphs, and Vectors.
4
5use std::collections::HashMap;
6use std::fs;
7use std::fs::File;
8use std::io::Read;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, RwLock};
11use std::time::{SystemTime, UNIX_EPOCH};
12use std::{
13    collections::{BTreeMap, BTreeSet},
14    fmt::Debug,
15};
16
17use super::super::{
18    EntityData, EntityId, EntityKind, StoreStats, UnifiedEntity, UnifiedStore, UnifiedStoreConfig,
19};
20use super::batch::BatchBuilder;
21use super::builders::{
22    DocumentBuilder, EdgeBuilder, KvBuilder, NodeBuilder, RowBuilder, VectorBuilder,
23};
24use super::helpers::cosine_similarity;
25use super::query::QueryBuilder;
26use super::refs::{NodeRef, TableRef, VectorRef};
27use super::types::{LinkedEntity, SimilarResult};
28use super::{IndexConfig, Preprocessor, SharedPreprocessors};
29use crate::api::{Capability, CatalogSnapshot, CollectionStats, RedDBOptions, StorageMode};
30use crate::catalog::{
31    consistency_report, snapshot_store_with_declarations, CatalogConsistencyReport,
32    CatalogDeclarations, CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
33    CollectionModel,
34};
35use crate::health::{storage_file_health, HealthReport, HealthState};
36use crate::index::{IndexCatalog, IndexConfig as RuntimeIndexConfig, IndexKind};
37use crate::physical::{
38    ExportDescriptor, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
39    PhysicalMetadataFile,
40};
41use crate::replication::{primary::PrimaryReplication, ReplicationRole};
42use crate::serde_json::Value as JsonValue;
43use crate::storage::engine::{HnswIndex, IvfConfig, IvfIndex, IvfStats, PhysicalFileHeader};
44use crate::storage::schema::Value;
45use crate::storage::unified::store::{
46    NativeCatalogCollectionSummary, NativeCatalogSummary, NativeExportSummary,
47    NativeManifestSummary, NativeMetadataStateSummary, NativePhysicalState, NativeRecoverySummary,
48    NativeRegistryIndexSummary, NativeRegistryJobSummary, NativeRegistryProjectionSummary,
49    NativeRegistrySummary, NativeSnapshotSummary, NativeVectorArtifactPageSummary,
50    NativeVectorArtifactSummary,
51};
52
53/// RedDB - Unified Database with Best-in-Class DevX
54///
55/// Single entry point for Tables, Graphs, and Vectors with full
56/// metadata support and cross-referencing.
57pub struct RedDB {
58    store: Arc<UnifiedStore>,
59    /// Preprocessing hooks
60    preprocessors: SharedPreprocessors,
61    /// Index configuration
62    index_config: IndexConfig,
63    /// Persistence path
64    path: Option<PathBuf>,
65    /// Construction/runtime options
66    options: RedDBOptions,
67    /// Whether the current persistence backend is page-based.
68    paged_mode: bool,
69    /// Per-collection HNSW vector index cache for fast approximate nearest neighbor search.
70    /// Lazily built on first vector similarity query per collection.
71    vector_indexes: RwLock<HashMap<String, CachedVectorIndex>>,
72    /// Default TTL policy declared at the collection level, in milliseconds.
73    collection_ttl_defaults_ms: RwLock<HashMap<String, u64>>,
74    /// In-memory cache of collection contracts keyed by collection name.
75    /// Populated lazily from `physical_metadata()` and invalidated on
76    /// `save_collection_contract` / `remove_collection_contract`.
77    /// Avoids reparsing the whole PhysicalMetadataFile JSON on every
78    /// `collection_contract(name)` lookup — which happens 3× per insert
79    /// (ensure_model, enforce_uniqueness, normalize_fields) and dominated
80    /// the insert hot path at ~30%.
81    pub(crate) collection_contract_cache:
82        RwLock<Option<Arc<HashMap<String, Arc<crate::physical::CollectionContract>>>>>,
83    /// Optional remote storage backend for snapshot transport.
84    pub(crate) remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
85    /// Optional CAS-capable handle for backends that implement
86    /// `AtomicRemoteBackend`. Mirrors `RedDBOptions::remote_backend_atomic`
87    /// — see that field for semantics.
88    pub(crate) remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
89    /// Remote object key used by the remote backend.
90    pub(crate) remote_key: Option<String>,
91    /// Primary replication state (only present when role is Primary).
92    pub(crate) replication: Option<Arc<PrimaryReplication>>,
93    /// Quorum coordinator for multi-region commits (Phase 2.6 PG parity).
94    ///
95    /// Only present when role is Primary. Write path calls
96    /// `quorum.wait_for_quorum(lsn)` after appending to the primary WAL
97    /// to block until the configured quorum of replicas has acked. When
98    /// the config is `Async` (default), this returns instantly — same
99    /// semantics as pre-Phase-2.6 RedDB.
100    pub(crate) quorum: Option<Arc<crate::replication::quorum::QuorumCoordinator>>,
101    /// Eventual consistency registry (embedded mode support).
102    pub(crate) ec_registry: Arc<crate::ec::config::EcRegistry>,
103    /// Lazily-initialised ML runtime (model registry + job queue +
104    /// semantic cache). Created on first access by the SQL layer so
105    /// `ML_CLASSIFY`, `SEMANTIC_CACHE_GET/PUT`, and friends have a
106    /// shared handle without forcing every instantiation path to
107    /// know about it.
108    pub(crate) ml_runtime: std::sync::OnceLock<crate::storage::ml::MlRuntime>,
109    /// Shared semantic cache used by `SEMANTIC_CACHE_*` scalars.
110    /// Separate from `MlRuntime` because cache config is runtime-only
111    /// and doesn't need the job queue — keep it a standalone `Arc`.
112    pub(crate) semantic_cache: std::sync::OnceLock<Arc<crate::storage::ml::SemanticCache>>,
113    /// Hypertable registry — populated by `CREATE HYPERTABLE` DDL,
114    /// consumed by chunk routing, retention sweeps, and `SHOW
115    /// HYPERTABLES`. Lazy so startup stays cheap when no hypertables
116    /// exist.
117    pub(crate) hypertables:
118        std::sync::OnceLock<Arc<crate::storage::timeseries::HypertableRegistry>>,
119    /// Continuous-aggregate engine — populated by `CA_REGISTER` and
120    /// queried by `CA_REFRESH` / `CA_STATE` scalars. Same lazy shape
121    /// as the other engine handles.
122    pub(crate) continuous_aggregates: std::sync::OnceLock<
123        Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine>,
124    >,
125    /// Per-collection `vector.turbo` runtime state (issue #693, PRD
126    /// #668). Lazily initialised: the entire map allocation is
127    /// deferred until the first turbo collection is created. Each
128    /// entry owns the in-memory `TurboQuantIndex` + optional
129    /// `TurboExtent` for the collection.
130    pub(crate) turbo_collections: std::sync::OnceLock<
131        Arc<
132            parking_lot::Mutex<
133                std::collections::HashMap<
134                    String,
135                    Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
136                >,
137            >,
138        >,
139    >,
140    /// Join handles for `vector.turbo` background-rebuild workers
141    /// (issue #673). Each call to `turbo_state` that materialises a
142    /// fresh `TurboCollectionState` registers a handle here; the
143    /// `Drop` impl below joins every handle before releasing
144    /// `store`, so a runtime restart on the same database path is
145    /// not racy with an in-flight rebuild holding the file lock.
146    pub(crate) turbo_rebuild_workers: parking_lot::Mutex<Vec<std::thread::JoinHandle<()>>>,
147}
148
149impl Drop for RedDB {
150    fn drop(&mut self) {
151        // Issue #673 — wait for every `vector.turbo` background
152        // rebuild worker to exit before our `Arc<UnifiedStore>` is
153        // released. The worker holds a strong handle to the store
154        // during its work phase, so without this join a fast
155        // `RedDBRuntime` restart on the same path observes the
156        // file lock as still held by the soon-to-exit worker.
157        let handles: Vec<_> = self.turbo_rebuild_workers.lock().drain(..).collect();
158        for h in handles {
159            let _ = h.join();
160        }
161        // Issue #674 — also join any in-flight `.tv` snapshot dump
162        // so the on-disk file is complete and renamed before a
163        // restart observes it. Without this, a fast reopen can race
164        // and see the `<path>.tv.tmp` before the atomic rename.
165        if let Some(map) = self.turbo_collections.get() {
166            let states: Vec<_> = map.lock().values().cloned().collect();
167            for state in states {
168                state.wait_snapshot();
169            }
170        }
171    }
172}
173
174/// A cached HNSW index together with the entity count at build time.
175/// When the live entity count diverges the cache is considered stale and
176/// is rebuilt on the next query.
177pub(crate) struct CachedVectorIndex {
178    pub index: Arc<RwLock<HnswIndex>>,
179    pub entity_count: usize,
180}
181
/// One field-level difference reported by a native header inspection.
///
/// Both values are carried as strings so that heterogeneous header fields
/// can be reported uniformly.
#[derive(Clone, Debug)]
pub struct NativeHeaderMismatch {
    /// Name of the header field that differs.
    pub field: &'static str,
    /// Value found in the native header, rendered as text.
    pub native: String,
    /// Value the inspection expected, rendered as text.
    pub expected: String,
}
188
189#[derive(Debug, Clone)]
190pub struct NativeHeaderInspection {
191    pub native: PhysicalFileHeader,
192    pub expected: PhysicalFileHeader,
193    pub consistent: bool,
194    pub mismatches: Vec<NativeHeaderMismatch>,
195}
196
/// How the native header relates to the metadata, and therefore what (if
/// anything) should be repaired.
///
/// Variant meanings below are read from their names; NOTE(review): confirm
/// against the code that selects the policy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NativeHeaderRepairPolicy {
    /// Native header and metadata agree.
    InSync,
    /// Native header should be rewritten from the metadata.
    RepairNativeFromMetadata,
    /// Native header is ahead of the metadata.
    NativeAheadOfMetadata,
}
203
/// Status report on which physical metadata sources (sidecar file and native
/// state) are available, and how they relate to each other.
///
/// The `Option` fields are `None` when the corresponding information could
/// not be determined; the exact producer rules live outside this struct.
#[derive(Clone, Debug)]
pub struct PhysicalAuthorityStatus {
    /// Preferred authority, as a free-form label.
    pub preference: String,
    /// Whether the sidecar metadata is available.
    pub sidecar_available: bool,
    /// Whether native physical state is available.
    pub native_state_available: bool,
    /// Whether the native state is ready to bootstrap from.
    pub native_bootstrap_ready: bool,
    /// Completeness of the native registry, if known.
    pub native_registry_complete: Option<bool>,
    /// Completeness of the native recovery information, if known.
    pub native_recovery_complete: Option<bool>,
    /// Completeness of the native catalog, if known.
    pub native_catalog_complete: Option<bool>,
    /// Where the sidecar was loaded from, if it was loaded.
    pub sidecar_loaded_from: Option<String>,
    /// Header repair policy, rendered as text, if one was determined.
    pub native_header_repair_policy: Option<String>,
    /// Sequence number recorded in the metadata, if known.
    pub metadata_sequence: Option<u64>,
    /// Sequence number recorded natively, if known.
    pub native_sequence: Option<u64>,
    /// Source of the most recent native metadata load, if any.
    pub native_metadata_last_loaded_from: Option<String>,
    /// Generation timestamp of the native metadata, in Unix milliseconds.
    pub native_metadata_generated_at_unix_ms: Option<u128>,
}
220
/// Inspection summary of one persisted native artifact of a collection.
///
/// The always-present fields describe the artifact's page span and an
/// HNSW-style graph shape; the `Option` groups (IVF, graph, text, document)
/// are presumably filled only for the matching `artifact_kind` —
/// NOTE(review): confirm at the producer.
#[derive(Clone, Debug)]
pub struct NativeVectorArtifactInspection {
    // --- identity and on-disk placement ---
    /// Owning collection name.
    pub collection: String,
    /// Kind of artifact, as a free-form label.
    pub artifact_kind: String,
    /// First page of the artifact.
    pub root_page: u32,
    /// Number of pages spanned.
    pub page_count: u32,
    /// Length of the artifact in bytes.
    pub byte_len: u64,
    /// Stored checksum of the artifact.
    pub checksum: u64,

    // --- graph-shaped vector index statistics ---
    pub node_count: u64,
    pub dimension: u32,
    pub max_layer: u32,
    pub total_connections: u64,
    pub avg_connections: f64,
    pub entry_point: Option<u64>,

    // --- IVF statistics ---
    pub ivf_n_lists: Option<u32>,
    pub ivf_non_empty_lists: Option<u32>,
    pub ivf_trained: Option<bool>,

    // --- graph statistics ---
    pub graph_edge_count: Option<u64>,
    pub graph_node_count: Option<u64>,
    pub graph_label_count: Option<u32>,

    // --- full-text statistics ---
    pub text_doc_count: Option<u64>,
    pub text_term_count: Option<u64>,
    pub text_posting_count: Option<u64>,

    // --- document path/value statistics ---
    pub document_doc_count: Option<u64>,
    pub document_path_count: Option<u64>,
    pub document_value_count: Option<u64>,
    pub document_unique_value_count: Option<u64>,
}
249
250#[derive(Debug, Clone)]
251pub struct NativeVectorArtifactBatchInspection {
252    pub inspected_count: usize,
253    pub valid_count: usize,
254    pub artifacts: Vec<NativeVectorArtifactInspection>,
255    pub failures: Vec<(String, String, String)>,
256}
257
258mod impl_access;
259mod impl_core_a;
260mod impl_core_b;
261mod impl_ec;
262mod impl_metadata;
263mod impl_registry;
264
265impl Default for RedDB {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
271fn infer_collection_index_kind(model: CollectionModel, index_name: &str) -> IndexKind {
272    match index_name {
273        "graph-adjacency" => IndexKind::GraphAdjacency,
274        "vector-hnsw" => IndexKind::VectorHnsw,
275        "vector-inverted" => IndexKind::VectorInverted,
276        "vector-turbo" => IndexKind::VectorTurbo,
277        "text-fulltext" => IndexKind::FullText,
278        "document-pathvalue" => IndexKind::DocumentPathValue,
279        "search-hybrid" => IndexKind::HybridSearch,
280        _ => match model {
281            CollectionModel::Graph => IndexKind::GraphAdjacency,
282            CollectionModel::Vector => IndexKind::VectorHnsw,
283            CollectionModel::Document => IndexKind::DocumentPathValue,
284            CollectionModel::Kv | CollectionModel::Config | CollectionModel::Vault => {
285                IndexKind::Hash
286            }
287            _ => IndexKind::BTree,
288        },
289    }
290}
291
292fn estimate_index_entries(collection: &CollectionDescriptor, kind: IndexKind) -> usize {
293    match kind {
294        IndexKind::BTree | IndexKind::Hash | IndexKind::Bitmap | IndexKind::Spatial => {
295            collection.entities
296        }
297        IndexKind::GraphAdjacency => collection.cross_refs.max(collection.entities),
298        IndexKind::VectorHnsw | IndexKind::VectorInverted | IndexKind::VectorTurbo => {
299            collection.entities
300        }
301        IndexKind::FullText => collection.entities.saturating_mul(4),
302        IndexKind::DocumentPathValue => collection.entities.saturating_mul(6),
303        IndexKind::HybridSearch => collection.entities,
304    }
305}
306
307fn estimate_index_memory(entries: usize, kind: IndexKind) -> u64 {
308    let per_entry = match kind {
309        IndexKind::BTree => 64,
310        IndexKind::Hash => 48,
311        IndexKind::Bitmap => 2,   // Roaring bitmaps are very compact
312        IndexKind::Spatial => 40, // R-tree node: 2 floats + EntityId + overhead
313        IndexKind::GraphAdjacency => 96,
314        IndexKind::VectorHnsw => 256,
315        IndexKind::VectorInverted => 128,
316        IndexKind::VectorTurbo => 64,
317        IndexKind::FullText => 80,
318        IndexKind::DocumentPathValue => 104,
319        IndexKind::HybridSearch => 144,
320    };
321    (entries as u64).saturating_mul(per_entry)
322}
323
324fn index_backend_name(kind: IndexKind) -> &'static str {
325    match kind {
326        IndexKind::BTree => "page-btree",
327        IndexKind::Hash => "hash-map",
328        IndexKind::Bitmap => "roaring-bitmap",
329        IndexKind::Spatial => "rstar-rtree",
330        IndexKind::GraphAdjacency => "adjacency-map",
331        IndexKind::VectorHnsw => "vector-hnsw",
332        IndexKind::VectorInverted => "vector-ivf",
333        IndexKind::VectorTurbo => "vector-turbo",
334        IndexKind::FullText => "inverted-text",
335        IndexKind::DocumentPathValue => "document-pathvalue",
336        IndexKind::HybridSearch => "hybrid-score",
337    }
338}
339
/// Initial state for a 64-bit FNV-1a hash (the FNV-64 offset basis).
fn fnv1a_seed() -> u64 {
    const FNV64_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    FNV64_OFFSET_BASIS
}
343
/// Folds `bytes` into the running 64-bit FNV-1a state stored in `hash`.
///
/// For each byte, the state is XOR-ed with the byte and then multiplied,
/// with wrapping, by the 64-bit FNV prime. Because the fold is sequential,
/// hashing input in several calls gives the same result as one call over
/// the concatenation.
fn fnv1a_hash_bytes(hash: &mut u64, bytes: &[u8]) {
    const FNV64_PRIME: u64 = 0x0000_0100_0000_01b3;
    *hash = bytes
        .iter()
        .fold(*hash, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(FNV64_PRIME));
}
350
351fn fnv1a_hash_value<T: Debug>(hash: &mut u64, value: &T) {
352    let rendered = format!("{value:?}");
353    fnv1a_hash_bytes(hash, rendered.as_bytes());
354}