1use std::collections::HashMap;
6use std::fs;
7use std::fs::File;
8use std::io::Read;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, RwLock};
11use std::time::{SystemTime, UNIX_EPOCH};
12use std::{
13 collections::{BTreeMap, BTreeSet},
14 fmt::Debug,
15};
16
17use super::super::{
18 EntityData, EntityId, EntityKind, StoreStats, UnifiedEntity, UnifiedStore, UnifiedStoreConfig,
19};
20use super::batch::BatchBuilder;
21use super::builders::{
22 DocumentBuilder, EdgeBuilder, KvBuilder, NodeBuilder, RowBuilder, VectorBuilder,
23};
24use super::helpers::cosine_similarity;
25use super::query::QueryBuilder;
26use super::refs::{NodeRef, TableRef, VectorRef};
27use super::types::{LinkedEntity, SimilarResult};
28use super::{IndexConfig, Preprocessor, SharedPreprocessors};
29use crate::api::{Capability, CatalogSnapshot, CollectionStats, RedDBOptions, StorageMode};
30use crate::catalog::{
31 consistency_report, snapshot_store_with_declarations, CatalogConsistencyReport,
32 CatalogDeclarations, CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
33 CollectionModel,
34};
35use crate::health::{storage_file_health, HealthReport, HealthState};
36use crate::index::{IndexCatalog, IndexConfig as RuntimeIndexConfig, IndexKind};
37use crate::physical::{
38 ExportDescriptor, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
39 PhysicalMetadataFile,
40};
41use crate::replication::{primary::PrimaryReplication, ReplicationRole};
42use crate::serde_json::Value as JsonValue;
43use crate::storage::engine::{HnswIndex, IvfConfig, IvfIndex, IvfStats, PhysicalFileHeader};
44use crate::storage::schema::Value;
45use crate::storage::unified::store::{
46 NativeCatalogCollectionSummary, NativeCatalogSummary, NativeExportSummary,
47 NativeManifestSummary, NativeMetadataStateSummary, NativePhysicalState, NativeRecoverySummary,
48 NativeRegistryIndexSummary, NativeRegistryJobSummary, NativeRegistryProjectionSummary,
49 NativeRegistrySummary, NativeSnapshotSummary, NativeVectorArtifactPageSummary,
50 NativeVectorArtifactSummary,
51};
52
/// Top-level database handle.
///
/// Owns the shared [`UnifiedStore`] plus the runtime state layered on top of it:
/// preprocessors, index configuration, cached vector indexes, per-collection TTL
/// defaults, optional remote backends, replication/quorum coordinators, and several
/// lazily initialised (`OnceLock`) subsystems. On drop it may persist an embedded
/// snapshot and waits for background turbo work (see the `Drop` impl).
pub struct RedDB {
    /// The underlying unified entity store, shared via `Arc`.
    store: Arc<UnifiedStore>,
    /// Registered preprocessors (shared handle).
    preprocessors: SharedPreprocessors,
    /// Index configuration for this handle.
    index_config: IndexConfig,
    /// Location of the database on disk. `Drop` only writes a snapshot when this is
    /// `Some`; `None` presumably means an in-memory handle — TODO confirm.
    path: Option<PathBuf>,
    /// Options this handle was opened with. `Drop` reads `storage_profile` and
    /// `read_only` to decide whether to persist a snapshot.
    options: RedDBOptions,
    /// Whether the handle runs in paged storage mode (semantics defined elsewhere).
    paged_mode: bool,
    /// Cached HNSW vector indexes, keyed by a string (presumably the collection
    /// name — verify against callers).
    vector_indexes: RwLock<HashMap<String, CachedVectorIndex>>,
    /// Default TTL in milliseconds per key (presumably per collection name, as the
    /// field name suggests — verify).
    collection_ttl_defaults_ms: RwLock<HashMap<String, u64>>,
    /// Cached map of collection contracts. `None` presumably means "not built yet /
    /// invalidated" — TODO confirm with the code that fills it.
    pub(crate) collection_contract_cache:
        RwLock<Option<Arc<HashMap<String, Arc<crate::physical::CollectionContract>>>>>,
    /// Optional remote storage backend.
    pub(crate) remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
    /// Optional remote backend exposing the `AtomicRemoteBackend` interface.
    pub(crate) remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
    /// Key/object name used with the remote backend (assumed; verify).
    pub(crate) remote_key: Option<String>,
    /// Primary-side replication state, when replication is enabled.
    pub(crate) replication: Option<Arc<PrimaryReplication>>,
    /// Quorum coordinator, when configured.
    pub(crate) quorum: Option<Arc<crate::replication::quorum::QuorumCoordinator>>,
    /// Shared EC configuration registry.
    pub(crate) ec_registry: Arc<crate::ec::config::EcRegistry>,
    /// ML runtime, initialised on first use.
    pub(crate) ml_runtime: std::sync::OnceLock<crate::storage::ml::MlRuntime>,
    /// Semantic cache, initialised on first use.
    pub(crate) semantic_cache: std::sync::OnceLock<Arc<crate::storage::ml::SemanticCache>>,
    /// Time-series hypertable registry, initialised on first use.
    pub(crate) hypertables:
        std::sync::OnceLock<Arc<crate::storage::timeseries::HypertableRegistry>>,
    /// Continuous-aggregate engine, initialised on first use.
    pub(crate) continuous_aggregates: std::sync::OnceLock<
        Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine>,
    >,
    /// Per-collection turbo vector state, keyed by a string (presumably the
    /// collection name). Initialised on first use; `Drop` calls `wait_snapshot` on
    /// every entry if it was initialised.
    pub(crate) turbo_collections: std::sync::OnceLock<
        Arc<
            parking_lot::Mutex<
                std::collections::HashMap<
                    String,
                    Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
                >,
            >,
        >,
    >,
    /// Join handles of background turbo rebuild threads; all are joined in `Drop`.
    pub(crate) turbo_rebuild_workers: parking_lot::Mutex<Vec<std::thread::JoinHandle<()>>>,
}
148
149impl Drop for RedDB {
150 fn drop(&mut self) {
151 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
152 && self.options.storage_profile.packaging
153 == crate::storage::StoragePackaging::SingleFile
154 && !self.options.read_only
155 {
156 if let Some(path) = &self.path {
157 let snapshot = self.store.to_binary_dump_bytes();
158 let _ = crate::storage::EmbeddedRdbArtifact::write_snapshot(path, &snapshot);
159 }
160 }
161
162 let handles: Vec<_> = self.turbo_rebuild_workers.lock().drain(..).collect();
169 for h in handles {
170 let _ = h.join();
171 }
172 if let Some(map) = self.turbo_collections.get() {
177 let states: Vec<_> = map.lock().values().cloned().collect();
178 for state in states {
179 state.wait_snapshot();
180 }
181 }
182 }
183}
184
/// A cached HNSW index entry (stored in `RedDB::vector_indexes`).
pub(crate) struct CachedVectorIndex {
    /// The index itself, shared and guarded by a read/write lock.
    pub index: Arc<RwLock<HnswIndex>>,
    /// Entity count recorded alongside the index; presumably compared with the live
    /// count to detect a stale cache entry — TODO confirm with the code that reads it.
    pub entity_count: usize,
}
192
/// One field on which the native physical file header disagrees with the expected one.
#[derive(Debug, Clone)]
pub struct NativeHeaderMismatch {
    /// Name of the differing header field.
    pub field: &'static str,
    /// The field's value as found in the native header, rendered as a string.
    pub native: String,
    /// The value the field was expected to have, rendered as a string.
    pub expected: String,
}
199
/// Result of comparing the native [`PhysicalFileHeader`] against the expected header.
#[derive(Debug, Clone)]
pub struct NativeHeaderInspection {
    /// Header as read from the native file.
    pub native: PhysicalFileHeader,
    /// Header the inspection expected (presumably derived from metadata — verify).
    pub expected: PhysicalFileHeader,
    /// Whether the two headers were judged consistent.
    pub consistent: bool,
    /// Per-field differences found during the comparison.
    pub mismatches: Vec<NativeHeaderMismatch>,
}
207
/// Classification of how a native header relates to metadata, used to pick a repair action.
///
/// NOTE(review): variant meanings below are read from their names; confirm against
/// the code that produces and consumes this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NativeHeaderRepairPolicy {
    /// Native header and metadata agree; nothing to repair.
    InSync,
    /// The native header should be rewritten from metadata.
    RepairNativeFromMetadata,
    /// The native header appears newer than metadata.
    NativeAheadOfMetadata,
}
214
/// Status snapshot describing the sidecar metadata and native physical state, and
/// which of them is preferred.
///
/// NOTE(review): per-field meanings below are inferred from names; `Option` fields are
/// presumably `None` when the corresponding information is unavailable — confirm.
#[derive(Debug, Clone)]
pub struct PhysicalAuthorityStatus {
    /// Which source is preferred, as a free-form string.
    pub preference: String,
    /// Whether sidecar metadata is available.
    pub sidecar_available: bool,
    /// Whether native physical state is available.
    pub native_state_available: bool,
    /// Whether native state is ready to bootstrap from.
    pub native_bootstrap_ready: bool,
    /// Completeness of the native registry section, if known.
    pub native_registry_complete: Option<bool>,
    /// Completeness of the native recovery section, if known.
    pub native_recovery_complete: Option<bool>,
    /// Completeness of the native catalog section, if known.
    pub native_catalog_complete: Option<bool>,
    /// Where the sidecar was loaded from, if it was loaded.
    pub sidecar_loaded_from: Option<String>,
    /// Header repair policy rendered as a string, if computed.
    pub native_header_repair_policy: Option<String>,
    /// Sequence number recorded in metadata.
    pub metadata_sequence: Option<u64>,
    /// Sequence number recorded in native state.
    pub native_sequence: Option<u64>,
    /// Where native metadata was last loaded from.
    pub native_metadata_last_loaded_from: Option<String>,
    /// Generation time of native metadata, in Unix milliseconds.
    pub native_metadata_generated_at_unix_ms: Option<u128>,
}
231
/// Inspection report for one native index artifact stored in the physical file.
///
/// Despite the name, the optional fields cover IVF, graph, full-text and document
/// artifacts as well. Presumably only the group matching `artifact_kind` is populated
/// and the others are `None` — TODO confirm with the inspection code.
#[derive(Debug, Clone)]
pub struct NativeVectorArtifactInspection {
    /// Collection that owns the artifact.
    pub collection: String,
    /// Kind of artifact, as a string.
    pub artifact_kind: String,
    /// First page of the artifact in the physical file.
    pub root_page: u32,
    /// Number of pages the artifact occupies.
    pub page_count: u32,
    /// Serialized artifact length in bytes.
    pub byte_len: u64,
    /// Checksum of the artifact bytes (algorithm defined elsewhere).
    pub checksum: u64,
    // Graph-structured (HNSW-style) vector statistics.
    pub node_count: u64,
    pub dimension: u32,
    pub max_layer: u32,
    pub total_connections: u64,
    pub avg_connections: f64,
    pub entry_point: Option<u64>,
    // IVF statistics.
    pub ivf_n_lists: Option<u32>,
    pub ivf_non_empty_lists: Option<u32>,
    pub ivf_trained: Option<bool>,
    // Graph adjacency statistics.
    pub graph_edge_count: Option<u64>,
    pub graph_node_count: Option<u64>,
    pub graph_label_count: Option<u32>,
    // Full-text statistics.
    pub text_doc_count: Option<u64>,
    pub text_term_count: Option<u64>,
    pub text_posting_count: Option<u64>,
    // Document path/value statistics.
    pub document_doc_count: Option<u64>,
    pub document_path_count: Option<u64>,
    pub document_value_count: Option<u64>,
    pub document_unique_value_count: Option<u64>,
}
260
/// Aggregate result of inspecting several native artifacts.
#[derive(Debug, Clone)]
pub struct NativeVectorArtifactBatchInspection {
    /// Number of artifacts examined.
    pub inspected_count: usize,
    /// Number of artifacts that passed inspection.
    pub valid_count: usize,
    /// Reports for the artifacts that were inspected successfully.
    pub artifacts: Vec<NativeVectorArtifactInspection>,
    /// Failed inspections as string triples; presumably
    /// `(collection, artifact_kind, error message)` — TODO confirm with the producer.
    pub failures: Vec<(String, String, String)>,
}
268
269mod impl_access;
270mod impl_core_a;
271mod impl_core_b;
272mod impl_ec;
273mod impl_metadata;
274mod impl_registry;
275
276impl Default for RedDB {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282fn infer_collection_index_kind(model: CollectionModel, index_name: &str) -> IndexKind {
283 match index_name {
284 "graph-adjacency" => IndexKind::GraphAdjacency,
285 "vector-hnsw" => IndexKind::VectorHnsw,
286 "vector-inverted" => IndexKind::VectorInverted,
287 "vector-turbo" => IndexKind::VectorTurbo,
288 "text-fulltext" => IndexKind::FullText,
289 "document-pathvalue" => IndexKind::DocumentPathValue,
290 "search-hybrid" => IndexKind::HybridSearch,
291 _ => match model {
292 CollectionModel::Graph => IndexKind::GraphAdjacency,
293 CollectionModel::Vector => IndexKind::VectorHnsw,
294 CollectionModel::Document => IndexKind::DocumentPathValue,
295 CollectionModel::Kv | CollectionModel::Config | CollectionModel::Vault => {
296 IndexKind::Hash
297 }
298 _ => IndexKind::BTree,
299 },
300 }
301}
302
303fn estimate_index_entries(collection: &CollectionDescriptor, kind: IndexKind) -> usize {
304 match kind {
305 IndexKind::BTree | IndexKind::Hash | IndexKind::Bitmap | IndexKind::Spatial => {
306 collection.entities
307 }
308 IndexKind::GraphAdjacency => collection.cross_refs.max(collection.entities),
309 IndexKind::VectorHnsw | IndexKind::VectorInverted | IndexKind::VectorTurbo => {
310 collection.entities
311 }
312 IndexKind::FullText => collection.entities.saturating_mul(4),
313 IndexKind::DocumentPathValue => collection.entities.saturating_mul(6),
314 IndexKind::HybridSearch => collection.entities,
315 }
316}
317
318fn estimate_index_memory(entries: usize, kind: IndexKind) -> u64 {
319 let per_entry = match kind {
320 IndexKind::BTree => 64,
321 IndexKind::Hash => 48,
322 IndexKind::Bitmap => 2, IndexKind::Spatial => 40, IndexKind::GraphAdjacency => 96,
325 IndexKind::VectorHnsw => 256,
326 IndexKind::VectorInverted => 128,
327 IndexKind::VectorTurbo => 64,
328 IndexKind::FullText => 80,
329 IndexKind::DocumentPathValue => 104,
330 IndexKind::HybridSearch => 144,
331 };
332 (entries as u64).saturating_mul(per_entry)
333}
334
335fn index_backend_name(kind: IndexKind) -> &'static str {
336 match kind {
337 IndexKind::BTree => "page-btree",
338 IndexKind::Hash => "hash-map",
339 IndexKind::Bitmap => "roaring-bitmap",
340 IndexKind::Spatial => "rstar-rtree",
341 IndexKind::GraphAdjacency => "adjacency-map",
342 IndexKind::VectorHnsw => "vector-hnsw",
343 IndexKind::VectorInverted => "vector-ivf",
344 IndexKind::VectorTurbo => "vector-turbo",
345 IndexKind::FullText => "inverted-text",
346 IndexKind::DocumentPathValue => "document-pathvalue",
347 IndexKind::HybridSearch => "hybrid-score",
348 }
349}
350
/// Returns the 64-bit FNV-1a offset basis, the initial state for `fnv1a_hash_bytes`.
fn fnv1a_seed() -> u64 {
    const FNV1A_64_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    FNV1A_64_OFFSET_BASIS
}
354
/// Folds `bytes` into the running 64-bit FNV-1a state `hash`.
///
/// For each byte, the byte is XORed into the state, which is then multiplied by the
/// 64-bit FNV prime with wrapping arithmetic. The state is threaded through, so
/// hashing `a` then `b` gives the same result as hashing `a ++ b`. An empty slice
/// leaves `hash` unchanged.
fn fnv1a_hash_bytes(hash: &mut u64, bytes: &[u8]) {
    const FNV1A_64_PRIME: u64 = 0x0000_0100_0000_01b3;
    *hash = bytes.iter().fold(*hash, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV1A_64_PRIME)
    });
}
361
362fn fnv1a_hash_value<T: Debug>(hash: &mut u64, value: &T) {
363 let rendered = format!("{value:?}");
364 fnv1a_hash_bytes(hash, rendered.as_bytes());
365}