1use std::collections::{BTreeMap, HashMap};
22use std::fs::File;
23use std::io::{BufReader, Read};
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
26use std::sync::Arc;
27
28use parking_lot::RwLock;
29
30use super::context_index::ContextIndex;
31use super::entity::{
32 CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
33 GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
34};
35use super::entity_cache::EntityCache;
36use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
37use super::metadata::{Metadata, MetadataFilter, MetadataValue};
38use super::segment::SegmentError;
39use crate::api::{DurabilityMode, GroupCommitOptions};
40use crate::physical::{ManifestEvent, ManifestEventKind};
41use crate::storage::engine::pager::PagerError;
42use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
43use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
44use crate::storage::schema::types::Value;
45
46pub use reddb_file::{
47 is_supported_store_version, NativeCatalogCollectionSummary, NativeCatalogSummary,
48 NativeExportSummary, NativeManifestEntrySummary, NativeManifestSummary,
49 NativeMetadataStateSummary, NativeRecoverySummary, NativeRegistryIndexSummary,
50 NativeRegistryJobSummary, NativeRegistryProjectionSummary, NativeRegistrySummary,
51 NativeSnapshotSummary, NativeVectorArtifactPageSummary, NativeVectorArtifactSummary,
52 ENTITY_RECORD_MAGIC, METADATA_MAGIC, METADATA_OVERFLOW_MAGIC, NATIVE_BLOB_MAGIC,
53 NATIVE_CATALOG_MAGIC, NATIVE_COLLECTION_ROOTS_MAGIC, NATIVE_MANIFEST_MAGIC,
54 NATIVE_MANIFEST_SAMPLE_LIMIT, NATIVE_METADATA_STATE_MAGIC, NATIVE_RECOVERY_MAGIC,
55 NATIVE_REGISTRY_MAGIC, NATIVE_VECTOR_ARTIFACT_MAGIC, STORE_MAGIC, STORE_VERSION_CURRENT,
56 STORE_VERSION_V1, STORE_VERSION_V2, STORE_VERSION_V3, STORE_VERSION_V4, STORE_VERSION_V5,
57 STORE_VERSION_V6, STORE_VERSION_V7, STORE_VERSION_V8, STORE_VERSION_V9,
58};
59
/// Counters produced by an MVCC vacuum pass, or an aggregate of several passes.
///
/// Each counter records how many items were scanned, retained, or reclaimed,
/// split across plain versions, history versions, and tombstones.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MvccVacuumStats {
    pub scanned_versions: u64,
    pub retained_versions: u64,
    pub reclaimed_versions: u64,
    pub retained_history_versions: u64,
    pub reclaimed_history_versions: u64,
    pub retained_tombstones: u64,
    pub reclaimed_tombstones: u64,
}

impl MvccVacuumStats {
    /// Adds every counter of `other` into the matching counter of `self`.
    pub fn add(&mut self, other: &Self) {
        // Destructure exhaustively so that a newly added counter cannot be
        // silently skipped here; it would fail to compile instead.
        let Self {
            scanned_versions,
            retained_versions,
            reclaimed_versions,
            retained_history_versions,
            reclaimed_history_versions,
            retained_tombstones,
            reclaimed_tombstones,
        } = *other;

        self.scanned_versions += scanned_versions;
        self.retained_versions += retained_versions;
        self.reclaimed_versions += reclaimed_versions;
        self.retained_history_versions += retained_history_versions;
        self.reclaimed_history_versions += reclaimed_history_versions;
        self.retained_tombstones += retained_tombstones;
        self.reclaimed_tombstones += reclaimed_tombstones;
    }
}
82
/// Aggregated native physical-file state: the file header, per-collection
/// roots, and the optional native summary sections.
///
/// NOTE(review): each `Option` field is presumably `None` when the
/// corresponding section is absent; confirm against the code that builds
/// this value.
#[derive(Debug, Clone)]
pub struct NativePhysicalState {
    /// Physical file header.
    pub header: PhysicalFileHeader,
    /// Per-collection root values, keyed by name (presumably collection
    /// name -> root page id; TODO confirm).
    pub collection_roots: BTreeMap<String, u64>,
    /// Native manifest summary, if available.
    pub manifest: Option<NativeManifestSummary>,
    /// Native registry summary, if available.
    pub registry: Option<NativeRegistrySummary>,
    /// Native recovery summary, if available.
    pub recovery: Option<NativeRecoverySummary>,
    /// Native catalog summary, if available.
    pub catalog: Option<NativeCatalogSummary>,
    /// Native metadata-state summary, if available.
    pub metadata_state: Option<NativeMetadataStateSummary>,
    /// Summaries of vector-artifact pages, if available.
    pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
}
94
/// Configuration for a [`UnifiedStore`].
///
/// Start from [`UnifiedStoreConfig::default`] and refine it with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct UnifiedStoreConfig {
    /// Configuration for the per-collection [`SegmentManager`]s
    /// (presumably applied to every collection; TODO confirm).
    pub manager_config: ManagerConfig,
    /// Whether cross-references are indexed automatically. Defaults to `true`.
    pub auto_index_refs: bool,
    /// Whether entity ids are indexed automatically. Defaults to `true`.
    pub auto_index_id: bool,
    /// Upper bound on cross-references. Defaults to `1000`.
    /// NOTE(review): presumably per entity, with violations reported as
    /// `StoreError::TooManyRefs`. Confirm.
    pub max_cross_refs: usize,
    /// Whether the write-ahead log is enabled. Defaults to `false`;
    /// `with_wal` turns it on.
    pub enable_wal: bool,
    /// Durability mode. Defaults to `DurabilityMode::WalDurableGrouped`.
    pub durability_mode: DurabilityMode,
    /// Group-commit tuning. Defaults to `GroupCommitOptions::default()`.
    pub group_commit: GroupCommitOptions,
    /// Optional data directory. Defaults to `None`.
    pub data_dir: Option<std::path::PathBuf>,
    /// Optional explicit path for the embedded WAL file. Defaults to `None`.
    /// TODO confirm how this interacts with `data_dir`.
    pub embedded_wal_path: Option<std::path::PathBuf>,
}
128
129impl Default for UnifiedStoreConfig {
130 fn default() -> Self {
131 Self {
132 manager_config: ManagerConfig::default(),
133 auto_index_refs: true,
134 auto_index_id: true,
135 max_cross_refs: 1000,
136 enable_wal: false,
137 durability_mode: DurabilityMode::WalDurableGrouped,
140 group_commit: GroupCommitOptions::default(),
141 data_dir: None,
142 embedded_wal_path: None,
143 }
144 }
145}
146
147impl UnifiedStoreConfig {
148 pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
150 self.data_dir = Some(path.into());
151 self
152 }
153
154 pub fn with_wal(mut self) -> Self {
156 self.enable_wal = true;
157 self
158 }
159
160 pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
161 self.durability_mode = mode;
162 self
163 }
164
165 pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
166 self.group_commit = options;
167 self
168 }
169
170 pub fn with_embedded_wal_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
171 self.embedded_wal_path = Some(path.into());
172 self
173 }
174
175 pub fn with_max_refs(mut self, max: usize) -> Self {
177 self.max_cross_refs = max;
178 self
179 }
180}
181
/// Errors returned by [`UnifiedStore`] operations.
#[derive(Debug)]
pub enum StoreError {
    /// A collection with the given name already exists.
    CollectionExists(String),
    /// No collection with the given name exists.
    CollectionNotFound(String),
    /// No entity with the given id was found.
    EntityNotFound(EntityId),
    /// The given entity has too many cross-references.
    /// NOTE(review): presumably raised when `UnifiedStoreConfig::max_cross_refs`
    /// is exceeded. Confirm.
    TooManyRefs(EntityId),
    /// An error propagated from the segment layer.
    Segment(SegmentError),
    /// An underlying I/O error.
    Io(std::io::Error),
    /// A serialization failure, described by the message.
    Serialization(String),
    /// An internal failure, described by the message.
    Internal(String),
}
206
207impl std::fmt::Display for StoreError {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 match self {
210 Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
211 Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
212 Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
213 Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
214 Self::Segment(e) => write!(f, "Segment error: {:?}", e),
215 Self::Io(e) => write!(f, "I/O error: {}", e),
216 Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
217 Self::Internal(msg) => write!(f, "Internal error: {}", msg),
218 }
219 }
220}
221
// `source()` intentionally keeps its default (`None`). The `Display` impl
// already embeds the wrapped I/O or segment error text, so also exposing it
// as a source could make error-chain reporters print it twice.
impl std::error::Error for StoreError {}
223
224impl From<SegmentError> for StoreError {
225 fn from(e: SegmentError) -> Self {
226 Self::Segment(e)
227 }
228}
229
230impl From<std::io::Error> for StoreError {
231 fn from(e: std::io::Error) -> Self {
232 Self::Io(e)
233 }
234}
235
/// Aggregate statistics for a [`UnifiedStore`].
#[derive(Debug, Clone, Default)]
pub struct StoreStats {
    /// Number of collections.
    pub collection_count: usize,
    /// Total number of entities across all collections.
    pub total_entities: usize,
    /// Total memory usage in bytes; `memory_mb` converts this to MiB.
    pub total_memory_bytes: usize,
    /// Per-collection segment-manager statistics (presumably keyed by
    /// collection name; TODO confirm).
    pub collections: HashMap<String, ManagerStats>,
    /// Number of cross-references.
    pub cross_ref_count: usize,
}
254
255impl StoreStats {
256 pub fn avg_entities_per_collection(&self) -> f64 {
258 if self.collection_count == 0 {
259 0.0
260 } else {
261 self.total_entities as f64 / self.collection_count as f64
262 }
263 }
264
265 pub fn memory_mb(&self) -> f64 {
267 self.total_memory_bytes as f64 / (1024.0 * 1024.0)
268 }
269}
270
/// The store that owns each collection's [`SegmentManager`] together with the
/// cross-reference maps and auxiliary indexes shared across collections.
///
/// NOTE(review): several field descriptions below are inferred from names and
/// types only. Confirm against the `impl_*` submodules that operate on them.
pub struct UnifiedStore {
    /// Configuration the store was created with.
    config: UnifiedStoreConfig,
    /// Store format version, held in an atomic.
    format_version: AtomicU32,
    /// Next entity id to allocate, held in an atomic counter.
    next_entity_id: AtomicU64,
    /// Collection name -> segment manager for that collection.
    collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
    /// Outgoing cross-references keyed by source entity. Each entry is
    /// (target id, ref type, String — presumably the target collection; TODO confirm).
    cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Incoming cross-references keyed by target entity (presumably the
    /// reverse of `cross_refs`; TODO confirm).
    reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    /// Page-based storage backend, if any (presumably `None` for an in-memory store).
    pager: Option<Arc<Pager>>,
    /// Path of the backing database file, if any.
    db_path: Option<PathBuf>,
    /// B-tree indexes keyed by name (presumably collection name; TODO confirm).
    btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
    /// Context index.
    context_index: ContextIndex,
    /// Entity cache.
    entity_cache: EntityCache,
    /// Entity ids grouped by a (String, String) key
    /// (presumably (collection, graph label); TODO confirm).
    graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
    /// Flag marking the paged registry as dirty (presumably needing to be
    /// rewritten on the next persist; TODO confirm).
    paged_registry_dirty: AtomicBool,
    /// Commit coordinator, if configured (presumably present only when the
    /// WAL is in use; TODO confirm).
    commit: Option<Arc<StoreCommitCoordinator>>,
    /// Atomic counter tied to the cross-ref un-indexing fast path.
    /// NOTE(review): confirm whether it counts fast-path hits.
    unindex_cross_refs_fast_path: AtomicU64,
    /// Turbo inserts captured during replay, keyed by String (presumably
    /// collection name). Each entry is (u64 id, vector) — TODO confirm.
    pub(crate) replayed_turbo_inserts: parking_lot::Mutex<HashMap<String, Vec<(u64, Vec<f32>)>>>,
}
357
358mod builder;
359mod commit;
360mod impl_entities;
361mod impl_file;
362mod impl_native_a;
363mod impl_native_b;
364mod impl_native_c;
365mod impl_pages;
366mod native_helpers;
367
368pub use self::builder::EntityBuilder;
369pub(crate) use self::commit::DeferredStoreWalActions;
370use self::commit::{StoreCommitCoordinator, StoreWalAction};
371use self::native_helpers::*;