1use std::collections::HashMap;
6use std::fs;
7use std::fs::File;
8use std::io::Read;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, RwLock};
11use std::time::{SystemTime, UNIX_EPOCH};
12use std::{
13 collections::{BTreeMap, BTreeSet},
14 fmt::Debug,
15};
16
17use super::super::{
18 EntityData, EntityId, EntityKind, StoreStats, UnifiedEntity, UnifiedStore, UnifiedStoreConfig,
19};
20use super::batch::BatchBuilder;
21use super::builders::{
22 DocumentBuilder, EdgeBuilder, KvBuilder, NodeBuilder, RowBuilder, VectorBuilder,
23};
24use super::helpers::cosine_similarity;
25use super::query::QueryBuilder;
26use super::refs::{NodeRef, TableRef, VectorRef};
27use super::types::{LinkedEntity, SimilarResult};
28use super::{IndexConfig, Preprocessor, SharedPreprocessors};
29use crate::api::{Capability, CatalogSnapshot, CollectionStats, RedDBOptions, StorageMode};
30use crate::catalog::{
31 consistency_report, snapshot_store_with_declarations, CatalogConsistencyReport,
32 CatalogDeclarations, CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
33 CollectionModel,
34};
35use crate::health::{storage_file_health, HealthReport, HealthState};
36use crate::index::{IndexCatalog, IndexConfig as RuntimeIndexConfig, IndexKind};
37use crate::physical::{
38 ExportDescriptor, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
39 PhysicalMetadataFile,
40};
41use crate::replication::{primary::PrimaryReplication, ReplicationRole};
42use crate::serde_json::Value as JsonValue;
43use crate::storage::engine::{HnswIndex, IvfConfig, IvfIndex, IvfStats, PhysicalFileHeader};
44use crate::storage::schema::Value;
45use crate::storage::unified::store::{
46 NativeCatalogCollectionSummary, NativeCatalogSummary, NativeExportSummary,
47 NativeManifestSummary, NativeMetadataStateSummary, NativePhysicalState, NativeRecoverySummary,
48 NativeRegistryIndexSummary, NativeRegistryJobSummary, NativeRegistryProjectionSummary,
49 NativeRegistrySummary, NativeSnapshotSummary, NativeVectorArtifactPageSummary,
50 NativeVectorArtifactSummary,
51};
52
/// Top-level database handle: wraps a shared [`UnifiedStore`] together with the
/// runtime state built around it (index caches, remote backends, replication,
/// ML and time-series subsystems, vector-turbo workers).
///
/// Field declaration order matters: Rust drops fields in declaration order, and
/// `_ephemeral_cleanup` is declared last so that it runs after every other
/// field has been dropped.
pub struct RedDB {
    /// Shared entity store. `Drop for RedDB` serializes it with
    /// `to_binary_dump_bytes` for the embedded single-file snapshot.
    store: Arc<UnifiedStore>,
    /// Registered preprocessors (see `SharedPreprocessors` in the parent module).
    preprocessors: SharedPreprocessors,
    /// Index configuration for this instance.
    index_config: IndexConfig,
    /// On-disk data path, if any; `Drop` writes the single-file snapshot here.
    path: Option<PathBuf>,
    /// Options the database was opened with; `Drop` reads `storage_profile`
    /// and `read_only` from them.
    options: RedDBOptions,
    /// When `true`, `Drop` skips writing the embedded single-file snapshot.
    paged_mode: bool,
    /// Cached HNSW indexes keyed by name (presumably collection name — confirm).
    vector_indexes: RwLock<HashMap<String, CachedVectorIndex>>,
    /// Per-collection default TTLs; the `_ms` suffix suggests milliseconds —
    /// confirm against the writers of this map.
    collection_ttl_defaults_ms: RwLock<HashMap<String, u64>>,
    /// Cached collection contracts; `None` presumably means "not built yet /
    /// invalidated" — confirm against the code that fills it.
    pub(crate) collection_contract_cache:
        RwLock<Option<Arc<HashMap<String, Arc<crate::physical::CollectionContract>>>>>,
    /// Optional remote storage backend.
    pub(crate) remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
    /// Optional remote backend implementing `AtomicRemoteBackend`.
    pub(crate) remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
    /// Key used with the remote backend (assumed from the name — confirm).
    pub(crate) remote_key: Option<String>,
    /// Primary-side replication handle, when configured.
    pub(crate) replication: Option<Arc<PrimaryReplication>>,
    /// Quorum coordinator, when configured.
    pub(crate) quorum: Option<Arc<crate::replication::quorum::QuorumCoordinator>>,
    /// Shared registry from `crate::ec::config` (presumably erasure-coding
    /// configuration — confirm).
    pub(crate) ec_registry: Arc<crate::ec::config::EcRegistry>,
    /// ML runtime, initialized at most once on demand (`OnceLock`).
    pub(crate) ml_runtime: std::sync::OnceLock<crate::storage::ml::MlRuntime>,
    /// Semantic cache, initialized at most once on demand (`OnceLock`).
    pub(crate) semantic_cache: std::sync::OnceLock<Arc<crate::storage::ml::SemanticCache>>,
    /// Time-series hypertable registry, initialized at most once on demand.
    pub(crate) hypertables:
        std::sync::OnceLock<Arc<crate::storage::timeseries::HypertableRegistry>>,
    /// Continuous-aggregate engine, initialized at most once on demand.
    pub(crate) continuous_aggregates: std::sync::OnceLock<
        Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine>,
    >,
    /// Lazily created map of vector-turbo collection states keyed by name;
    /// `Drop` calls `wait_snapshot` on every state it finds here.
    pub(crate) turbo_collections: std::sync::OnceLock<
        Arc<
            parking_lot::Mutex<
                std::collections::HashMap<
                    String,
                    Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
                >,
            >,
        >,
    >,
    /// Handles of background rebuild threads; `Drop` drains and joins them.
    pub(crate) turbo_rebuild_workers: parking_lot::Mutex<Vec<std::thread::JoinHandle<()>>>,
    /// Removes an ephemeral database's files when dropped. Must stay the last
    /// field so it is dropped after everything else (including `store`).
    _ephemeral_cleanup: Option<EphemeralDataPathCleanup>,
}
149
150pub(super) struct EphemeralDataPathCleanup {
151 path: PathBuf,
152}
153
154impl EphemeralDataPathCleanup {
155 pub(super) fn new(path: PathBuf) -> Self {
156 Self { path }
157 }
158}
159
160impl Drop for EphemeralDataPathCleanup {
161 fn drop(&mut self) {
162 for path in ephemeral_data_artifacts(&self.path) {
163 if path.is_dir() {
164 let _ = fs::remove_dir_all(path);
165 } else {
166 let _ = fs::remove_file(path);
167 }
168 }
169 if let Some(parent) = self.path.parent() {
173 if is_ephemeral_owner_dir(parent) {
174 let _ = fs::remove_dir_all(parent);
175 }
176 }
177 }
178}
179
/// Returns `true` when `dir` is a per-database owner directory: its final
/// component is valid UTF-8 starting with `reddb-ephemeral-`, and its parent is
/// exactly `std::env::temp_dir()`.
fn is_ephemeral_owner_dir(dir: &Path) -> bool {
    let has_owner_name = dir
        .file_name()
        .and_then(|component| component.to_str())
        .is_some_and(|component| component.starts_with("reddb-ephemeral-"));
    has_owner_name
        && dir
            .parent()
            .is_some_and(|grandparent| grandparent == std::env::temp_dir())
}
193
194pub(super) fn is_ephemeral_data_path(path: &Path) -> bool {
195 path.parent().map(is_ephemeral_owner_dir).unwrap_or(false)
200}
201
202fn ephemeral_data_artifacts(data_path: &Path) -> Vec<PathBuf> {
203 let logical_wal_path = reddb_file::layout::logical_wal_path(data_path);
204 let result_cache_l2_path = reddb_file::layout::result_cache_l2_path(data_path);
205 let legacy_logical_slots_path = reddb_file::layout::legacy_logical_slots_path(data_path);
206 let mut operational_manifest_root = data_path.as_os_str().to_os_string();
207 operational_manifest_root.push(".ops");
208 let mut paths = vec![
209 data_path.to_path_buf(),
210 PathBuf::from(operational_manifest_root),
211 reddb_file::layout::unified_wal_path(data_path),
212 logical_wal_path.clone(),
213 reddb_file::layout::logical_wal_temp_path(&logical_wal_path),
214 reddb_file::layout::temp_path(data_path),
215 reddb_file::layout::atomic_temp_path(data_path),
216 result_cache_l2_path.clone(),
217 reddb_file::layout::pager_legacy_wal_path(data_path),
218 reddb_file::layout::engine_wal_path(data_path),
219 reddb_file::layout::pager_header_path(data_path),
220 reddb_file::layout::pager_meta_path(data_path),
221 reddb_file::layout::pager_dwb_path(data_path),
222 legacy_logical_slots_path.clone(),
223 reddb_file::layout::legacy_logical_slots_temp_path(&legacy_logical_slots_path),
224 reddb_file::layout::legacy_audit_log_path(data_path),
225 reddb_file::layout::shm_path(data_path),
226 reddb_file::layout::physical_metadata_json_path(data_path),
227 reddb_file::layout::physical_metadata_binary_path(data_path),
228 reddb_file::layout::rebootstrap_staging_root(data_path),
229 reddb_file::layout::rebootstrap_pending_path(data_path),
230 reddb_file::layout::rebootstrap_ready_marker_path(data_path),
231 reddb_file::layout::rebootstrap_intent_log_path(data_path),
232 reddb_file::layout::rebootstrap_previous_path(data_path),
233 reddb_file::layout::primary_replica_root(data_path),
234 reddb_file::layout::serverless_root(data_path),
235 ];
236 paths.extend(reddb_file::layout::pager_shadow_sidecar_paths(data_path));
237 paths.extend(reddb_file::layout::pager_shadow_sidecar_paths(
238 &result_cache_l2_path,
239 ));
240 if let Some(parent) = data_path.parent() {
241 paths.push(reddb_file::layout::legacy_slow_query_log_path(parent));
242 }
243 paths.push(reddb_file::layout::support_dir_for(data_path));
244 paths
245}
246
247impl Drop for RedDB {
248 fn drop(&mut self) {
249 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
250 && self.options.storage_profile.packaging
251 == crate::storage::StoragePackaging::SingleFile
252 && !self.paged_mode
253 && !self.options.read_only
254 {
255 if let Some(path) = &self.path {
256 let snapshot = self.store.to_binary_dump_bytes();
257 let _ = crate::storage::EmbeddedRdbArtifact::write_snapshot(path, &snapshot);
258 }
259 }
260
261 let handles: Vec<_> = self.turbo_rebuild_workers.lock().drain(..).collect();
268 for h in handles {
269 let _ = h.join();
270 }
271 if let Some(map) = self.turbo_collections.get() {
276 let states: Vec<_> = map.lock().values().cloned().collect();
277 for state in states {
278 state.wait_snapshot();
279 }
280 }
281 }
282}
283
/// Cache entry pairing a shared HNSW index with an entity count.
pub(crate) struct CachedVectorIndex {
    /// Shared, `RwLock`-protected HNSW index.
    pub index: Arc<RwLock<HnswIndex>>,
    /// Entity count stored with the entry. Presumably the number of entities the
    /// index was built from (e.g. for staleness checks) — confirm at call sites.
    pub entity_count: usize,
}
291
/// A single header field whose native value differs from the expected value.
#[derive(Debug, Clone)]
pub struct NativeHeaderMismatch {
    /// Name of the differing header field.
    pub field: &'static str,
    /// Value found in the native header, rendered as a string.
    pub native: String,
    /// Expected value, rendered as a string.
    pub expected: String,
}
298
/// Result of comparing a native physical file header against an expected header.
#[derive(Debug, Clone)]
pub struct NativeHeaderInspection {
    /// Header as found natively (presumably read from the data file — confirm).
    pub native: PhysicalFileHeader,
    /// Header the inspection expected (presumably derived from metadata — confirm).
    pub expected: PhysicalFileHeader,
    /// Whether the headers were judged consistent.
    pub consistent: bool,
    /// Per-field differences; presumably empty when `consistent` is `true`.
    pub mismatches: Vec<NativeHeaderMismatch>,
}
306
/// Relationship between the native header and metadata, used to pick a repair.
///
/// NOTE(review): variant meanings below are inferred from their names; confirm
/// against the code that selects a policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NativeHeaderRepairPolicy {
    /// Native header and metadata agree; no repair needed.
    InSync,
    /// The native header should be rewritten from metadata.
    RepairNativeFromMetadata,
    /// The native header is ahead of metadata (presumably by sequence number).
    NativeAheadOfMetadata,
}
313
/// Report on which physical-metadata sources (sidecar vs. native state) are
/// available and which one is preferred.
///
/// NOTE(review): field meanings below are inferred from names and types; confirm
/// against the code that builds this struct.
#[derive(Debug, Clone)]
pub struct PhysicalAuthorityStatus {
    /// Preferred authority, as a free-form string.
    pub preference: String,
    /// Whether the metadata sidecar is available.
    pub sidecar_available: bool,
    /// Whether native physical state is available.
    pub native_state_available: bool,
    /// Whether native state is ready to bootstrap from.
    pub native_bootstrap_ready: bool,
    /// Native registry completeness; `None` presumably when not determined.
    pub native_registry_complete: Option<bool>,
    /// Native recovery completeness; `None` presumably when not determined.
    pub native_recovery_complete: Option<bool>,
    /// Native catalog completeness; `None` presumably when not determined.
    pub native_catalog_complete: Option<bool>,
    /// Where the sidecar was loaded from, if it was loaded.
    pub sidecar_loaded_from: Option<String>,
    /// Header repair policy rendered as a string (presumably a
    /// `NativeHeaderRepairPolicy` — confirm).
    pub native_header_repair_policy: Option<String>,
    /// Sequence number recorded in metadata, if known.
    pub metadata_sequence: Option<u64>,
    /// Sequence number recorded in native state, if known.
    pub native_sequence: Option<u64>,
    /// Source from which native metadata was last loaded, if any.
    pub native_metadata_last_loaded_from: Option<String>,
    /// Native metadata generation time; the name indicates Unix epoch milliseconds.
    pub native_metadata_generated_at_unix_ms: Option<u128>,
}
330
/// Inspection summary of one persisted native index artifact of a collection.
///
/// The `ivf_*`, `graph_*`, `text_*` and `document_*` fields are `Option`s,
/// presumably populated only for artifacts of the matching kind — confirm
/// against the inspector that fills them.
#[derive(Debug, Clone)]
pub struct NativeVectorArtifactInspection {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind, as a string.
    pub artifact_kind: String,
    /// First page of the artifact's storage (assumed from the name).
    pub root_page: u32,
    /// Number of pages the artifact occupies.
    pub page_count: u32,
    /// Serialized size in bytes.
    pub byte_len: u64,
    /// Checksum of the artifact bytes (algorithm not visible here).
    pub checksum: u64,
    /// Number of nodes in the index.
    pub node_count: u64,
    /// Vector dimension.
    pub dimension: u32,
    /// Highest layer of the graph (HNSW-style layering assumed — confirm).
    pub max_layer: u32,
    /// Total number of connections in the graph.
    pub total_connections: u64,
    /// Average connections per node.
    pub avg_connections: f64,
    /// Entry-point node id, if any.
    pub entry_point: Option<u64>,
    /// IVF: configured number of lists.
    pub ivf_n_lists: Option<u32>,
    /// IVF: number of non-empty lists.
    pub ivf_non_empty_lists: Option<u32>,
    /// IVF: whether the index has been trained.
    pub ivf_trained: Option<bool>,
    /// Graph artifact: edge count.
    pub graph_edge_count: Option<u64>,
    /// Graph artifact: node count.
    pub graph_node_count: Option<u64>,
    /// Graph artifact: label count.
    pub graph_label_count: Option<u32>,
    /// Full-text artifact: document count.
    pub text_doc_count: Option<u64>,
    /// Full-text artifact: distinct term count.
    pub text_term_count: Option<u64>,
    /// Full-text artifact: posting count.
    pub text_posting_count: Option<u64>,
    /// Document path/value artifact: document count.
    pub document_doc_count: Option<u64>,
    /// Document path/value artifact: path count.
    pub document_path_count: Option<u64>,
    /// Document path/value artifact: value count.
    pub document_value_count: Option<u64>,
    /// Document path/value artifact: distinct value count.
    pub document_unique_value_count: Option<u64>,
}
359
/// Aggregate result of inspecting several native index artifacts.
#[derive(Debug, Clone)]
pub struct NativeVectorArtifactBatchInspection {
    /// Number of artifacts inspected.
    pub inspected_count: usize,
    /// Number of artifacts that passed inspection.
    pub valid_count: usize,
    /// Per-artifact summaries (presumably for the valid artifacts — confirm).
    pub artifacts: Vec<NativeVectorArtifactInspection>,
    /// Failed inspections. NOTE(review): tuple layout is presumably
    /// (collection, artifact kind, error message) — confirm at the producer.
    pub failures: Vec<(String, String, String)>,
}
367
368mod impl_access;
369mod impl_core_a;
370mod impl_core_b;
371mod impl_ec;
372mod impl_metadata;
373mod impl_registry;
374
375impl Default for RedDB {
376 fn default() -> Self {
377 Self::new()
378 }
379}
380
381fn infer_collection_index_kind(model: CollectionModel, index_name: &str) -> IndexKind {
382 match index_name {
383 "graph-adjacency" => IndexKind::GraphAdjacency,
384 "vector-hnsw" => IndexKind::VectorHnsw,
385 "vector-inverted" => IndexKind::VectorInverted,
386 "vector-turbo" => IndexKind::VectorTurbo,
387 "text-fulltext" => IndexKind::FullText,
388 "document-pathvalue" => IndexKind::DocumentPathValue,
389 "search-hybrid" => IndexKind::HybridSearch,
390 _ => match model {
391 CollectionModel::Graph => IndexKind::GraphAdjacency,
392 CollectionModel::Vector => IndexKind::VectorHnsw,
393 CollectionModel::Document => IndexKind::DocumentPathValue,
394 CollectionModel::Kv | CollectionModel::Config | CollectionModel::Vault => {
395 IndexKind::Hash
396 }
397 _ => IndexKind::BTree,
398 },
399 }
400}
401
402fn estimate_index_entries(collection: &CollectionDescriptor, kind: IndexKind) -> usize {
403 match kind {
404 IndexKind::BTree | IndexKind::Hash | IndexKind::Bitmap | IndexKind::Spatial => {
405 collection.entities
406 }
407 IndexKind::GraphAdjacency => collection.cross_refs.max(collection.entities),
408 IndexKind::VectorHnsw | IndexKind::VectorInverted | IndexKind::VectorTurbo => {
409 collection.entities
410 }
411 IndexKind::FullText => collection.entities.saturating_mul(4),
412 IndexKind::DocumentPathValue => collection.entities.saturating_mul(6),
413 IndexKind::HybridSearch => collection.entities,
414 }
415}
416
417fn estimate_index_memory(entries: usize, kind: IndexKind) -> u64 {
418 let per_entry = match kind {
419 IndexKind::BTree => 64,
420 IndexKind::Hash => 48,
421 IndexKind::Bitmap => 2, IndexKind::Spatial => 40, IndexKind::GraphAdjacency => 96,
424 IndexKind::VectorHnsw => 256,
425 IndexKind::VectorInverted => 128,
426 IndexKind::VectorTurbo => 64,
427 IndexKind::FullText => 80,
428 IndexKind::DocumentPathValue => 104,
429 IndexKind::HybridSearch => 144,
430 };
431 (entries as u64).saturating_mul(per_entry)
432}
433
434fn index_backend_name(kind: IndexKind) -> &'static str {
435 match kind {
436 IndexKind::BTree => "page-btree",
437 IndexKind::Hash => "hash-map",
438 IndexKind::Bitmap => "roaring-bitmap",
439 IndexKind::Spatial => "rstar-rtree",
440 IndexKind::GraphAdjacency => "adjacency-map",
441 IndexKind::VectorHnsw => "vector-hnsw",
442 IndexKind::VectorInverted => "vector-ivf",
443 IndexKind::VectorTurbo => "vector-turbo",
444 IndexKind::FullText => "inverted-text",
445 IndexKind::DocumentPathValue => "document-pathvalue",
446 IndexKind::HybridSearch => "hybrid-score",
447 }
448}
449
/// Initial state for a 64-bit FNV-1a hash: the FNV-64 offset basis.
fn fnv1a_seed() -> u64 {
    // 64-bit FNV offset basis.
    const FNV64_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    FNV64_OFFSET_BASIS
}
453
/// Feeds `bytes` into a running 64-bit FNV-1a state held in `hash`.
///
/// For each byte, the state is XORed with the byte and then multiplied
/// (wrapping) by the FNV-64 prime. Because the update is streaming, hashing
/// `a` and then `b` gives the same state as hashing their concatenation.
fn fnv1a_hash_bytes(hash: &mut u64, bytes: &[u8]) {
    // 64-bit FNV prime (2^40 + 0x1b3).
    const FNV64_PRIME: u64 = 0x0000_0100_0000_01b3;
    *hash = bytes
        .iter()
        .fold(*hash, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(FNV64_PRIME));
}
460
461fn fnv1a_hash_value<T: Debug>(hash: &mut u64, value: &T) {
462 let rendered = format!("{value:?}");
463 fnv1a_hash_bytes(hash, rendered.as_bytes());
464}