Skip to main content

reddb_server/storage/unified/devx/
reddb.rs

1//! RedDB - Main Entry Point
2//!
3//! Unified Database with best-in-class developer experience for Tables, Graphs, and Vectors.
4
5use std::collections::HashMap;
6use std::fs;
7use std::fs::File;
8use std::io::Read;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, RwLock};
11use std::time::{SystemTime, UNIX_EPOCH};
12use std::{
13    collections::{BTreeMap, BTreeSet},
14    fmt::Debug,
15};
16
17use super::super::{
18    EntityData, EntityId, EntityKind, StoreStats, UnifiedEntity, UnifiedStore, UnifiedStoreConfig,
19};
20use super::batch::BatchBuilder;
21use super::builders::{
22    DocumentBuilder, EdgeBuilder, KvBuilder, NodeBuilder, RowBuilder, VectorBuilder,
23};
24use super::helpers::cosine_similarity;
25use super::query::QueryBuilder;
26use super::refs::{NodeRef, TableRef, VectorRef};
27use super::types::{LinkedEntity, SimilarResult};
28use super::{IndexConfig, Preprocessor, SharedPreprocessors};
29use crate::api::{Capability, CatalogSnapshot, CollectionStats, RedDBOptions, StorageMode};
30use crate::catalog::{
31    consistency_report, snapshot_store_with_declarations, CatalogConsistencyReport,
32    CatalogDeclarations, CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
33    CollectionModel,
34};
35use crate::health::{storage_file_health, HealthReport, HealthState};
36use crate::index::{IndexCatalog, IndexConfig as RuntimeIndexConfig, IndexKind};
37use crate::physical::{
38    ExportDescriptor, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
39    PhysicalMetadataFile,
40};
41use crate::replication::{primary::PrimaryReplication, ReplicationRole};
42use crate::serde_json::Value as JsonValue;
43use crate::storage::engine::{HnswIndex, IvfConfig, IvfIndex, IvfStats, PhysicalFileHeader};
44use crate::storage::schema::Value;
45use crate::storage::unified::store::{
46    NativeCatalogCollectionSummary, NativeCatalogSummary, NativeExportSummary,
47    NativeManifestSummary, NativeMetadataStateSummary, NativePhysicalState, NativeRecoverySummary,
48    NativeRegistryIndexSummary, NativeRegistryJobSummary, NativeRegistryProjectionSummary,
49    NativeRegistrySummary, NativeSnapshotSummary, NativeVectorArtifactPageSummary,
50    NativeVectorArtifactSummary,
51};
52
53/// RedDB - Unified Database with Best-in-Class DevX
54///
55/// Single entry point for Tables, Graphs, and Vectors with full
56/// metadata support and cross-referencing.
57pub struct RedDB {
58    store: Arc<UnifiedStore>,
59    /// Preprocessing hooks
60    preprocessors: SharedPreprocessors,
61    /// Index configuration
62    index_config: IndexConfig,
63    /// Persistence path
64    path: Option<PathBuf>,
65    /// Construction/runtime options
66    options: RedDBOptions,
67    /// Whether the current persistence backend is page-based.
68    paged_mode: bool,
69    /// Per-collection HNSW vector index cache for fast approximate nearest neighbor search.
70    /// Lazily built on first vector similarity query per collection.
71    vector_indexes: RwLock<HashMap<String, CachedVectorIndex>>,
72    /// Default TTL policy declared at the collection level, in milliseconds.
73    collection_ttl_defaults_ms: RwLock<HashMap<String, u64>>,
74    /// In-memory cache of collection contracts keyed by collection name.
75    /// Populated lazily from `physical_metadata()` and invalidated on
76    /// `save_collection_contract` / `remove_collection_contract`.
77    /// Avoids reparsing the whole PhysicalMetadataFile JSON on every
78    /// `collection_contract(name)` lookup — which happens 3× per insert
79    /// (ensure_model, enforce_uniqueness, normalize_fields) and dominated
80    /// the insert hot path at ~30%.
81    pub(crate) collection_contract_cache:
82        RwLock<Option<Arc<HashMap<String, Arc<crate::physical::CollectionContract>>>>>,
83    /// Optional remote storage backend for snapshot transport.
84    pub(crate) remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
85    /// Optional CAS-capable handle for backends that implement
86    /// `AtomicRemoteBackend`. Mirrors `RedDBOptions::remote_backend_atomic`
87    /// — see that field for semantics.
88    pub(crate) remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
89    /// Remote object key used by the remote backend.
90    pub(crate) remote_key: Option<String>,
91    /// Primary replication state (only present when role is Primary).
92    pub(crate) replication: Option<Arc<PrimaryReplication>>,
93    /// Quorum coordinator for multi-region commits (Phase 2.6 PG parity).
94    ///
95    /// Only present when role is Primary. Write path calls
96    /// `quorum.wait_for_quorum(lsn)` after appending to the primary WAL
97    /// to block until the configured quorum of replicas has acked. When
98    /// the config is `Async` (default), this returns instantly — same
99    /// semantics as pre-Phase-2.6 RedDB.
100    pub(crate) quorum: Option<Arc<crate::replication::quorum::QuorumCoordinator>>,
101    /// Eventual consistency registry (embedded mode support).
102    pub(crate) ec_registry: Arc<crate::ec::config::EcRegistry>,
103    /// Lazily-initialised ML runtime (model registry + job queue +
104    /// semantic cache). Created on first access by the SQL layer so
105    /// `ML_CLASSIFY`, `SEMANTIC_CACHE_GET/PUT`, and friends have a
106    /// shared handle without forcing every instantiation path to
107    /// know about it.
108    pub(crate) ml_runtime: std::sync::OnceLock<crate::storage::ml::MlRuntime>,
109    /// Shared semantic cache used by `SEMANTIC_CACHE_*` scalars.
110    /// Separate from `MlRuntime` because cache config is runtime-only
111    /// and doesn't need the job queue — keep it a standalone `Arc`.
112    pub(crate) semantic_cache: std::sync::OnceLock<Arc<crate::storage::ml::SemanticCache>>,
113    /// Hypertable registry — populated by `CREATE HYPERTABLE` DDL,
114    /// consumed by chunk routing, retention sweeps, and `SHOW
115    /// HYPERTABLES`. Lazy so startup stays cheap when no hypertables
116    /// exist.
117    pub(crate) hypertables:
118        std::sync::OnceLock<Arc<crate::storage::timeseries::HypertableRegistry>>,
119    /// Continuous-aggregate engine — populated by `CA_REGISTER` and
120    /// queried by `CA_REFRESH` / `CA_STATE` scalars. Same lazy shape
121    /// as the other engine handles.
122    pub(crate) continuous_aggregates: std::sync::OnceLock<
123        Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine>,
124    >,
125}
126
127/// A cached HNSW index together with the entity count at build time.
128/// When the live entity count diverges the cache is considered stale and
129/// is rebuilt on the next query.
130pub(crate) struct CachedVectorIndex {
131    pub index: Arc<RwLock<HnswIndex>>,
132    pub entity_count: usize,
133}
134
/// A single field on which the native header and the expected header
/// disagree, with both values rendered as strings.
#[derive(Debug, Clone)]
pub struct NativeHeaderMismatch {
    /// Name of the header field that differs.
    pub field: &'static str,
    /// The value carried by the native header.
    pub native: String,
    /// The value that was expected instead.
    pub expected: String,
}
141
142#[derive(Debug, Clone)]
143pub struct NativeHeaderInspection {
144    pub native: PhysicalFileHeader,
145    pub expected: PhysicalFileHeader,
146    pub consistent: bool,
147    pub mismatches: Vec<NativeHeaderMismatch>,
148}
149
/// Policy describing how the native header relates to the metadata and what,
/// if anything, should be repaired.
// NOTE(review): the per-variant notes are read off the variant names; confirm
// against the code that selects a policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NativeHeaderRepairPolicy {
    /// Native header and metadata agree (presumably: nothing to repair).
    InSync,
    /// The native header should be repaired using the metadata.
    RepairNativeFromMetadata,
    /// The native header is ahead of the metadata.
    NativeAheadOfMetadata,
}
156
/// Status report on the sources of physical state: the sidecar metadata and
/// the native state, plus sequencing and provenance details for each.
// NOTE(review): field semantics are not defined in this file; the notes below
// restate the field names and types only.
#[derive(Debug, Clone)]
pub struct PhysicalAuthorityStatus {
    /// Authority preference, as a string.
    pub preference: String,
    /// Whether a sidecar is available.
    pub sidecar_available: bool,
    /// Whether native state is available.
    pub native_state_available: bool,
    /// Whether the native state is ready for bootstrap.
    pub native_bootstrap_ready: bool,
    /// Completeness of the native registry; `None` when not reported.
    pub native_registry_complete: Option<bool>,
    /// Completeness of the native recovery data; `None` when not reported.
    pub native_recovery_complete: Option<bool>,
    /// Completeness of the native catalog; `None` when not reported.
    pub native_catalog_complete: Option<bool>,
    /// Where the sidecar was loaded from, if recorded.
    pub sidecar_loaded_from: Option<String>,
    /// Native header repair policy rendered as a string, if one applies.
    pub native_header_repair_policy: Option<String>,
    /// Sequence number on the metadata side, if known.
    pub metadata_sequence: Option<u64>,
    /// Sequence number on the native side, if known.
    pub native_sequence: Option<u64>,
    /// Where the native metadata was last loaded from, if recorded.
    pub native_metadata_last_loaded_from: Option<String>,
    /// Generation time of the native metadata in Unix milliseconds, if known.
    pub native_metadata_generated_at_unix_ms: Option<u128>,
}
173
/// Inspection record for one native artifact belonging to a collection.
///
/// The always-present fields describe the artifact's storage footprint and
/// graph-shaped statistics; the `Option` fields carry statistics that only
/// some artifacts report.
// NOTE(review): which `artifact_kind` fills which optional group is not
// visible in this file — the grouping below follows the field-name prefixes
// (`ivf_`, `graph_`, `text_`, `document_`); confirm against the producer.
#[derive(Debug, Clone)]
pub struct NativeVectorArtifactInspection {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Kind of artifact, as a string.
    pub artifact_kind: String,

    // Storage footprint.
    pub root_page: u32,
    pub page_count: u32,
    pub byte_len: u64,
    pub checksum: u64,

    // Structural statistics present on every record.
    pub node_count: u64,
    pub dimension: u32,
    pub max_layer: u32,
    pub total_connections: u64,
    pub avg_connections: f64,
    pub entry_point: Option<u64>,

    // `ivf_*` statistics.
    pub ivf_n_lists: Option<u32>,
    pub ivf_non_empty_lists: Option<u32>,
    pub ivf_trained: Option<bool>,

    // `graph_*` statistics.
    pub graph_edge_count: Option<u64>,
    pub graph_node_count: Option<u64>,
    pub graph_label_count: Option<u32>,

    // `text_*` statistics.
    pub text_doc_count: Option<u64>,
    pub text_term_count: Option<u64>,
    pub text_posting_count: Option<u64>,

    // `document_*` statistics.
    pub document_doc_count: Option<u64>,
    pub document_path_count: Option<u64>,
    pub document_value_count: Option<u64>,
    pub document_unique_value_count: Option<u64>,
}
202
203#[derive(Debug, Clone)]
204pub struct NativeVectorArtifactBatchInspection {
205    pub inspected_count: usize,
206    pub valid_count: usize,
207    pub artifacts: Vec<NativeVectorArtifactInspection>,
208    pub failures: Vec<(String, String, String)>,
209}
210
211mod impl_access;
212mod impl_core_a;
213mod impl_core_b;
214mod impl_ec;
215mod impl_metadata;
216mod impl_registry;
217
218impl Default for RedDB {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224fn infer_collection_index_kind(model: CollectionModel, index_name: &str) -> IndexKind {
225    match index_name {
226        "graph-adjacency" => IndexKind::GraphAdjacency,
227        "vector-hnsw" => IndexKind::VectorHnsw,
228        "vector-inverted" => IndexKind::VectorInverted,
229        "text-fulltext" => IndexKind::FullText,
230        "document-pathvalue" => IndexKind::DocumentPathValue,
231        "search-hybrid" => IndexKind::HybridSearch,
232        _ => match model {
233            CollectionModel::Graph => IndexKind::GraphAdjacency,
234            CollectionModel::Vector => IndexKind::VectorHnsw,
235            CollectionModel::Document => IndexKind::DocumentPathValue,
236            CollectionModel::Kv | CollectionModel::Config | CollectionModel::Vault => {
237                IndexKind::Hash
238            }
239            _ => IndexKind::BTree,
240        },
241    }
242}
243
244fn estimate_index_entries(collection: &CollectionDescriptor, kind: IndexKind) -> usize {
245    match kind {
246        IndexKind::BTree | IndexKind::Hash | IndexKind::Bitmap | IndexKind::Spatial => {
247            collection.entities
248        }
249        IndexKind::GraphAdjacency => collection.cross_refs.max(collection.entities),
250        IndexKind::VectorHnsw | IndexKind::VectorInverted => collection.entities,
251        IndexKind::FullText => collection.entities.saturating_mul(4),
252        IndexKind::DocumentPathValue => collection.entities.saturating_mul(6),
253        IndexKind::HybridSearch => collection.entities,
254    }
255}
256
257fn estimate_index_memory(entries: usize, kind: IndexKind) -> u64 {
258    let per_entry = match kind {
259        IndexKind::BTree => 64,
260        IndexKind::Hash => 48,
261        IndexKind::Bitmap => 2,   // Roaring bitmaps are very compact
262        IndexKind::Spatial => 40, // R-tree node: 2 floats + EntityId + overhead
263        IndexKind::GraphAdjacency => 96,
264        IndexKind::VectorHnsw => 256,
265        IndexKind::VectorInverted => 128,
266        IndexKind::FullText => 80,
267        IndexKind::DocumentPathValue => 104,
268        IndexKind::HybridSearch => 144,
269    };
270    (entries as u64).saturating_mul(per_entry)
271}
272
273fn index_backend_name(kind: IndexKind) -> &'static str {
274    match kind {
275        IndexKind::BTree => "page-btree",
276        IndexKind::Hash => "hash-map",
277        IndexKind::Bitmap => "roaring-bitmap",
278        IndexKind::Spatial => "rstar-rtree",
279        IndexKind::GraphAdjacency => "adjacency-map",
280        IndexKind::VectorHnsw => "vector-hnsw",
281        IndexKind::VectorInverted => "vector-ivf",
282        IndexKind::FullText => "inverted-text",
283        IndexKind::DocumentPathValue => "document-pathvalue",
284        IndexKind::HybridSearch => "hybrid-score",
285    }
286}
287
/// Returns the initial state for a 64-bit FNV-1a hash (the FNV offset basis).
///
/// Declared `const fn` so the seed can also initialise constants and statics;
/// ordinary runtime callers are unaffected.
const fn fnv1a_seed() -> u64 {
    /// 64-bit FNV offset basis.
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    FNV_OFFSET_BASIS
}
291
/// Folds `bytes` into the running 64-bit FNV-1a state in `hash`.
///
/// For each byte the state is XORed with the byte and then multiplied by the
/// FNV prime with wrapping arithmetic. The state is updated in place, so
/// successive calls hash the concatenation of their inputs; an empty slice
/// leaves `hash` untouched.
fn fnv1a_hash_bytes(hash: &mut u64, bytes: &[u8]) {
    /// 64-bit FNV prime.
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    *hash = bytes
        .iter()
        .fold(*hash, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME));
}
298
299fn fnv1a_hash_value<T: Debug>(hash: &mut u64, value: &T) {
300    let rendered = format!("{value:?}");
301    fnv1a_hash_bytes(hash, rendered.as_bytes());
302}