use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;

use parking_lot::RwLock;

use super::context_index::ContextIndex;
use super::entity::{
    CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
    GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
};
use super::entity_cache::EntityCache;
use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
use super::metadata::{Metadata, MetadataFilter, MetadataValue};
use super::segment::SegmentError;
use crate::api::{DurabilityMode, GroupCommitOptions};
use crate::physical::{ManifestEvent, ManifestEventKind};
use crate::storage::engine::pager::PagerError;
use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
use crate::storage::schema::types::Value;

/// Four-byte magic tag `RDST` for the store format.
const STORE_MAGIC: &[u8; 4] = b"RDST";

// Store format version numbers, one constant per version (1 through 9).
// NOTE(review): what changed between versions is not visible in this chunk.
const STORE_VERSION_V1: u32 = 1;
const STORE_VERSION_V2: u32 = 2;
const STORE_VERSION_V3: u32 = 3;
const STORE_VERSION_V4: u32 = 4;
const STORE_VERSION_V5: u32 = 5;
const STORE_VERSION_V6: u32 = 6;
const STORE_VERSION_V7: u32 = 7;
const STORE_VERSION_V8: u32 = 8;
const STORE_VERSION_V9: u32 = 9;

/// Four-byte magic tag `RDM2`.
/// NOTE(review): presumably marks a metadata section; confirm at the use site.
const METADATA_MAGIC: &[u8; 4] = b"RDM2";

// Four-byte magic tags for the "native" sections. Every tag value is distinct.
// NOTE(review): the section each tag guards is inferred from its name only.
const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";

/// Sample limit (16) for native manifests.
/// NOTE(review): presumably caps how many entries a manifest summary keeps; confirm.
const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;

/// Summary of a single native manifest entry.
///
/// NOTE(review): field meanings below are inferred from the field names;
/// confirm against the code that fills this struct.
#[derive(Debug, Clone)]
pub struct NativeManifestEntrySummary {
    /// Collection name associated with the entry.
    pub collection: String,
    /// Object key associated with the entry.
    pub object_key: String,
    /// Entry kind, carried as a string.
    pub kind: String,
    /// Block index recorded for the entry.
    pub block_index: u64,
    /// 128-bit block checksum recorded for the entry.
    pub block_checksum: u128,
    /// Lower snapshot bound.
    pub snapshot_min: u64,
    /// Upper snapshot bound, if any.
    pub snapshot_max: Option<u64>,
}

78#[derive(Debug, Clone)]
79pub struct NativeManifestSummary {
80 pub sequence: u64,
81 pub event_count: u32,
82 pub events_complete: bool,
83 pub omitted_event_count: u32,
84 pub recent_events: Vec<NativeManifestEntrySummary>,
85}
86
/// Summary of one index recorded in the native registry.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryIndexSummary {
    /// Index name.
    pub name: String,
    /// Index kind, carried as a string.
    pub kind: String,
    /// Collection the index is attached to, if any.
    pub collection: Option<String>,
    /// Whether the index is enabled.
    pub enabled: bool,
    /// Entry count.
    pub entries: u64,
    /// Estimated memory footprint in bytes.
    pub estimated_memory_bytes: u64,
    /// Last refresh time in milliseconds, if any.
    pub last_refresh_ms: Option<u128>,
    /// Backend name, carried as a string.
    pub backend: String,
}

/// Summary of one graph projection recorded in the native registry.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryProjectionSummary {
    /// Projection name.
    pub name: String,
    /// Projection source, carried as a string.
    pub source: String,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Last update time (Unix milliseconds, per the field name).
    pub updated_at_unix_ms: u128,
    /// Node labels.
    pub node_labels: Vec<String>,
    /// Node types.
    pub node_types: Vec<String>,
    /// Edge labels.
    pub edge_labels: Vec<String>,
    /// Sequence at which the projection was last materialized, if any.
    pub last_materialized_sequence: Option<u64>,
}

/// Summary of one analytics job recorded in the native registry.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeRegistryJobSummary {
    /// Job identifier.
    pub id: String,
    /// Job kind, carried as a string.
    pub kind: String,
    /// Projection the job refers to, if any.
    pub projection: Option<String>,
    /// Job state, carried as a string.
    pub state: String,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Last update time (Unix milliseconds, per the field name).
    pub updated_at_unix_ms: u128,
    /// Sequence of the last run, if any.
    pub last_run_sequence: Option<u64>,
    /// String key/value metadata, ordered by key.
    pub metadata: BTreeMap<String, String>,
}

/// Summary of one vector artifact recorded in the native registry.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind, carried as a string.
    pub artifact_kind: String,
    /// Number of vectors.
    pub vector_count: u64,
    /// Vector dimension.
    pub dimension: u32,
    /// Maximum layer.
    pub max_layer: u32,
    /// Serialized size in bytes.
    pub serialized_bytes: u64,
    /// 64-bit checksum.
    pub checksum: u64,
}

/// Summary of where one vector artifact sits in pages.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeVectorArtifactPageSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind, carried as a string.
    pub artifact_kind: String,
    /// Root page number.
    pub root_page: u32,
    /// Number of pages.
    pub page_count: u32,
    /// Length in bytes.
    pub byte_len: u64,
    /// 64-bit checksum.
    pub checksum: u64,
}

144#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct NativeRegistrySummary {
146 pub collection_count: u32,
147 pub index_count: u32,
148 pub graph_projection_count: u32,
149 pub analytics_job_count: u32,
150 pub vector_artifact_count: u32,
151 pub collections_complete: bool,
152 pub indexes_complete: bool,
153 pub graph_projections_complete: bool,
154 pub analytics_jobs_complete: bool,
155 pub vector_artifacts_complete: bool,
156 pub omitted_collection_count: u32,
157 pub omitted_index_count: u32,
158 pub omitted_graph_projection_count: u32,
159 pub omitted_analytics_job_count: u32,
160 pub omitted_vector_artifact_count: u32,
161 pub collection_names: Vec<String>,
162 pub indexes: Vec<NativeRegistryIndexSummary>,
163 pub graph_projections: Vec<NativeRegistryProjectionSummary>,
164 pub analytics_jobs: Vec<NativeRegistryJobSummary>,
165 pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
166}
167
/// Summary of one snapshot.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeSnapshotSummary {
    /// Snapshot identifier.
    pub snapshot_id: u64,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Superblock sequence recorded with the snapshot.
    pub superblock_sequence: u64,
    /// Number of collections.
    pub collection_count: u32,
    /// Total number of entities.
    pub total_entities: u64,
}

/// Summary of one export.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeExportSummary {
    /// Export name.
    pub name: String,
    /// Creation time (Unix milliseconds, per the field name).
    pub created_at_unix_ms: u128,
    /// Snapshot the export refers to, if any.
    pub snapshot_id: Option<u64>,
    /// Superblock sequence recorded with the export.
    pub superblock_sequence: u64,
    /// Number of collections.
    pub collection_count: u32,
    /// Total number of entities.
    pub total_entities: u64,
}

187#[derive(Debug, Clone, PartialEq, Eq)]
188pub struct NativeRecoverySummary {
189 pub snapshot_count: u32,
190 pub export_count: u32,
191 pub snapshots_complete: bool,
192 pub exports_complete: bool,
193 pub omitted_snapshot_count: u32,
194 pub omitted_export_count: u32,
195 pub snapshots: Vec<NativeSnapshotSummary>,
196 pub exports: Vec<NativeExportSummary>,
197}
198
/// Per-collection entry of the native catalog summary.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeCatalogCollectionSummary {
    /// Collection name.
    pub name: String,
    /// Number of entities.
    pub entities: u64,
    /// Number of cross-references.
    pub cross_refs: u64,
    /// Number of segments.
    pub segments: u32,
}

207#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct NativeCatalogSummary {
209 pub collection_count: u32,
210 pub total_entities: u64,
211 pub collections_complete: bool,
212 pub omitted_collection_count: u32,
213 pub collections: Vec<NativeCatalogCollectionSummary>,
214}
215
/// Counters describing an MVCC vacuum pass.
///
/// All counters start at zero via `Default` and can be merged with
/// [`MvccVacuumStats::add`].
///
/// NOTE(review): the exact definition of each counter is inferred from its
/// name; confirm against the vacuum implementation.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MvccVacuumStats {
    /// Versions scanned.
    pub scanned_versions: u64,
    /// Versions retained.
    pub retained_versions: u64,
    /// Versions reclaimed.
    pub reclaimed_versions: u64,
    /// History versions retained.
    pub retained_history_versions: u64,
    /// History versions reclaimed.
    pub reclaimed_history_versions: u64,
    /// Tombstones retained.
    pub retained_tombstones: u64,
    /// Tombstones reclaimed.
    pub reclaimed_tombstones: u64,
}

impl MvccVacuumStats {
    /// Adds every counter of `other` onto the matching counter of `self`.
    ///
    /// Uses plain `+=`, so overflow follows the build's normal integer
    /// overflow rules.
    pub fn add(&mut self, other: &Self) {
        // Exhaustive destructuring (no `..`): adding a counter to the struct
        // fails to compile here until it is merged as well.
        let Self {
            scanned_versions,
            retained_versions,
            reclaimed_versions,
            retained_history_versions,
            reclaimed_history_versions,
            retained_tombstones,
            reclaimed_tombstones,
        } = other;

        self.scanned_versions += *scanned_versions;
        self.retained_versions += *retained_versions;
        self.reclaimed_versions += *reclaimed_versions;
        self.retained_history_versions += *retained_history_versions;
        self.reclaimed_history_versions += *reclaimed_history_versions;
        self.retained_tombstones += *retained_tombstones;
        self.reclaimed_tombstones += *reclaimed_tombstones;
    }
}

/// Summary of the native metadata state.
///
/// NOTE(review): field meanings are inferred from the field names; confirm
/// against the code that fills this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeMetadataStateSummary {
    /// Protocol version, carried as a string.
    pub protocol_version: String,
    /// Generation time (Unix milliseconds, per the field name).
    pub generated_at_unix_ms: u128,
    /// Where the state was last loaded from, if recorded.
    pub last_loaded_from: Option<String>,
    /// Time of the last heal (Unix milliseconds, per the field name), if any.
    pub last_healed_at_unix_ms: Option<u128>,
}

247#[derive(Debug, Clone)]
248pub struct NativePhysicalState {
249 pub header: PhysicalFileHeader,
250 pub collection_roots: BTreeMap<String, u64>,
251 pub manifest: Option<NativeManifestSummary>,
252 pub registry: Option<NativeRegistrySummary>,
253 pub recovery: Option<NativeRecoverySummary>,
254 pub catalog: Option<NativeCatalogSummary>,
255 pub metadata_state: Option<NativeMetadataStateSummary>,
256 pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
257}
258
259#[derive(Debug, Clone)]
265pub struct UnifiedStoreConfig {
266 pub manager_config: ManagerConfig,
268 pub auto_index_refs: bool,
270 pub auto_index_id: bool,
279 pub max_cross_refs: usize,
281 pub enable_wal: bool,
283 pub durability_mode: DurabilityMode,
285 pub group_commit: GroupCommitOptions,
287 pub data_dir: Option<std::path::PathBuf>,
289}
290
291impl Default for UnifiedStoreConfig {
292 fn default() -> Self {
293 Self {
294 manager_config: ManagerConfig::default(),
295 auto_index_refs: true,
296 auto_index_id: true,
297 max_cross_refs: 1000,
298 enable_wal: false,
299 durability_mode: DurabilityMode::WalDurableGrouped,
302 group_commit: GroupCommitOptions::default(),
303 data_dir: None,
304 }
305 }
306}
307
308impl UnifiedStoreConfig {
309 pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
311 self.data_dir = Some(path.into());
312 self
313 }
314
315 pub fn with_wal(mut self) -> Self {
317 self.enable_wal = true;
318 self
319 }
320
321 pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
322 self.durability_mode = mode;
323 self
324 }
325
326 pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
327 self.group_commit = options;
328 self
329 }
330
331 pub fn with_max_refs(mut self, max: usize) -> Self {
333 self.max_cross_refs = max;
334 self
335 }
336}
337
338#[derive(Debug)]
344pub enum StoreError {
345 CollectionExists(String),
347 CollectionNotFound(String),
349 EntityNotFound(EntityId),
351 TooManyRefs(EntityId),
353 Segment(SegmentError),
355 Io(std::io::Error),
357 Serialization(String),
359 Internal(String),
361}
362
363impl std::fmt::Display for StoreError {
364 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365 match self {
366 Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
367 Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
368 Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
369 Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
370 Self::Segment(e) => write!(f, "Segment error: {:?}", e),
371 Self::Io(e) => write!(f, "I/O error: {}", e),
372 Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
373 Self::Internal(msg) => write!(f, "Internal error: {}", msg),
374 }
375 }
376}
377
378impl std::error::Error for StoreError {}
379
380impl From<SegmentError> for StoreError {
381 fn from(e: SegmentError) -> Self {
382 Self::Segment(e)
383 }
384}
385
386impl From<std::io::Error> for StoreError {
387 fn from(e: std::io::Error) -> Self {
388 Self::Io(e)
389 }
390}
391
392#[derive(Debug, Clone, Default)]
398pub struct StoreStats {
399 pub collection_count: usize,
401 pub total_entities: usize,
403 pub total_memory_bytes: usize,
405 pub collections: HashMap<String, ManagerStats>,
407 pub cross_ref_count: usize,
409}
410
411impl StoreStats {
412 pub fn avg_entities_per_collection(&self) -> f64 {
414 if self.collection_count == 0 {
415 0.0
416 } else {
417 self.total_entities as f64 / self.collection_count as f64
418 }
419 }
420
421 pub fn memory_mb(&self) -> f64 {
423 self.total_memory_bytes as f64 / (1024.0 * 1024.0)
424 }
425}
426
427pub struct UnifiedStore {
464 config: UnifiedStoreConfig,
466 format_version: AtomicU32,
468 next_entity_id: AtomicU64,
470 collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
472 cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
474 reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
476 pager: Option<Arc<Pager>>,
478 db_path: Option<PathBuf>,
480 btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
487 context_index: ContextIndex,
489 entity_cache: EntityCache,
493 graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
496 paged_registry_dirty: AtomicBool,
498 commit: Option<Arc<StoreCommitCoordinator>>,
500 unindex_cross_refs_fast_path: AtomicU64,
505}
506
507mod builder;
508mod commit;
509mod impl_entities;
510mod impl_file;
511mod impl_native_a;
512mod impl_native_b;
513mod impl_native_c;
514mod impl_pages;
515mod native_helpers;
516
517pub use self::builder::EntityBuilder;
518pub(crate) use self::commit::DeferredStoreWalActions;
519use self::commit::{StoreCommitCoordinator, StoreWalAction};
520use self::native_helpers::*;