1use std::collections::{BTreeMap, HashMap};
22use std::fs::File;
23use std::io::{BufReader, BufWriter, Read, Write};
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
26use std::sync::Arc;
27
28use parking_lot::RwLock;
29
30use super::context_index::ContextIndex;
31use super::entity::{
32 CrossRef, EdgeData, EmbeddingSlot, EntityData, EntityId, EntityKind, GraphEdgeKind,
33 GraphNodeKind, NodeData, RefType, RowData, TimeSeriesPointKind, UnifiedEntity, VectorData,
34};
35use super::entity_cache::EntityCache;
36use super::manager::{ManagerConfig, ManagerStats, SegmentManager};
37use super::metadata::{Metadata, MetadataFilter, MetadataValue};
38use super::segment::SegmentError;
39use crate::api::{DurabilityMode, GroupCommitOptions};
40use crate::physical::{ManifestEvent, ManifestEventKind};
41use crate::storage::engine::pager::PagerError;
42use crate::storage::engine::{BTree, BTreeError, Pager, PagerConfig, PhysicalFileHeader};
43use crate::storage::primitives::encoding::{read_varu32, read_varu64, write_varu32, write_varu64};
44use crate::storage::schema::types::Value;
45
/// Four-byte magic tag for the unified store format (`RDST`).
const STORE_MAGIC: &[u8; 4] = b"RDST";

// Store format versions, from V1 (oldest) to V7 (newest).
const STORE_VERSION_V1: u32 = 1;
const STORE_VERSION_V2: u32 = 2;
const STORE_VERSION_V3: u32 = 3;
const STORE_VERSION_V4: u32 = 4;
const STORE_VERSION_V5: u32 = 5;
const STORE_VERSION_V6: u32 = 6;
const STORE_VERSION_V7: u32 = 7;

/// Four-byte magic tag used for metadata (`RDM2`).
const METADATA_MAGIC: &[u8; 4] = b"RDM2";

// Four-byte magic tags for the native physical-state sections, one per kind.
/// Native collection-roots data (`RDRT`).
const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
/// Native manifest data (`RDMF`).
const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
/// Native registry data (`RDRG`).
const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
/// Native recovery data (`RDRV`).
const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
/// Native catalog data (`RDCL`).
const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
/// Native metadata-state data (`RDMS`).
const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
/// Native vector-artifact data (`RDVA`).
const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
/// Native blob data (`RDBL`).
const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";

/// Sample size for manifest summaries. NOTE(review): presumably caps how many
/// events end up in `NativeManifestSummary::recent_events` — confirm at the use site.
const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;
64
/// One manifest event as reported inside a [`NativeManifestSummary`].
///
/// Derives `PartialEq`/`Eq` like every other `Native*Summary` type in this module,
/// so manifest summaries can be compared by value as well.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeManifestEntrySummary {
    /// Collection the event belongs to.
    pub collection: String,
    /// Key of the object the event refers to.
    pub object_key: String,
    /// Event kind as a string (presumably rendered from `ManifestEventKind` — confirm).
    pub kind: String,
    /// Index of the block the event points at.
    pub block_index: u64,
    /// Checksum recorded for that block.
    pub block_checksum: u128,
    /// Lower snapshot bound of the event.
    pub snapshot_min: u64,
    /// Upper snapshot bound; NOTE(review): `None` presumably means open-ended.
    pub snapshot_max: Option<u64>,
}

/// Summary of the native manifest: its sequence number plus a bounded sample of events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeManifestSummary {
    /// Manifest sequence number.
    pub sequence: u64,
    /// Number of events in the manifest.
    pub event_count: u32,
    /// Presumably `true` when `recent_events` holds every event (nothing omitted).
    pub events_complete: bool,
    /// Number of events not included in `recent_events`.
    pub omitted_event_count: u32,
    /// Sampled events.
    pub recent_events: Vec<NativeManifestEntrySummary>,
}
84
/// One index known to the native registry.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeRegistryIndexSummary {
    /// Index name.
    pub name: String,
    /// Index kind as a string.
    pub kind: String,
    /// Owning collection; NOTE(review): `None` presumably means not collection-scoped.
    pub collection: Option<String>,
    /// Whether the index is enabled.
    pub enabled: bool,
    /// Number of indexed entries.
    pub entries: u64,
    /// Estimated memory footprint in bytes.
    pub estimated_memory_bytes: u64,
    /// Time of the last refresh in milliseconds, if any (epoch unconfirmed).
    pub last_refresh_ms: Option<u128>,
    /// Name of the backend storing the index.
    pub backend: String,
}

/// One graph projection known to the native registry.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeRegistryProjectionSummary {
    /// Projection name.
    pub name: String,
    /// Source the projection is built from.
    pub source: String,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Last update time, Unix milliseconds.
    pub updated_at_unix_ms: u128,
    /// Node labels included in the projection.
    pub node_labels: Vec<String>,
    /// Node types included in the projection.
    pub node_types: Vec<String>,
    /// Edge labels included in the projection.
    pub edge_labels: Vec<String>,
    /// Sequence at which the projection was last materialized, if ever.
    pub last_materialized_sequence: Option<u64>,
}

/// One analytics job known to the native registry.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeRegistryJobSummary {
    /// Job identifier.
    pub id: String,
    /// Job kind as a string.
    pub kind: String,
    /// Projection the job runs against, if any.
    pub projection: Option<String>,
    /// Job state as a string.
    pub state: String,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Last update time, Unix milliseconds.
    pub updated_at_unix_ms: u128,
    /// Sequence of the last run, if the job has run.
    pub last_run_sequence: Option<u64>,
    /// Free-form key/value metadata (sorted by key).
    pub metadata: BTreeMap<String, String>,
}
120
/// Registry-level description of one serialized vector artifact.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeVectorArtifactSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind as a string.
    pub artifact_kind: String,
    /// Number of vectors covered by the artifact.
    pub vector_count: u64,
    /// Vector dimension.
    pub dimension: u32,
    /// Highest layer index (NOTE(review): presumably for layered indexes such as HNSW).
    pub max_layer: u32,
    /// Size of the serialized artifact in bytes.
    pub serialized_bytes: u64,
    /// Checksum of the serialized artifact.
    pub checksum: u64,
}

/// Page-level location of one vector artifact.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeVectorArtifactPageSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind as a string.
    pub artifact_kind: String,
    /// First page holding the artifact.
    pub root_page: u32,
    /// Number of pages the artifact occupies.
    pub page_count: u32,
    /// Artifact length in bytes.
    pub byte_len: u64,
    /// Checksum of the stored bytes.
    pub checksum: u64,
}
141
142#[derive(Debug, Clone, PartialEq, Eq)]
143pub struct NativeRegistrySummary {
144 pub collection_count: u32,
145 pub index_count: u32,
146 pub graph_projection_count: u32,
147 pub analytics_job_count: u32,
148 pub vector_artifact_count: u32,
149 pub collections_complete: bool,
150 pub indexes_complete: bool,
151 pub graph_projections_complete: bool,
152 pub analytics_jobs_complete: bool,
153 pub vector_artifacts_complete: bool,
154 pub omitted_collection_count: u32,
155 pub omitted_index_count: u32,
156 pub omitted_graph_projection_count: u32,
157 pub omitted_analytics_job_count: u32,
158 pub omitted_vector_artifact_count: u32,
159 pub collection_names: Vec<String>,
160 pub indexes: Vec<NativeRegistryIndexSummary>,
161 pub graph_projections: Vec<NativeRegistryProjectionSummary>,
162 pub analytics_jobs: Vec<NativeRegistryJobSummary>,
163 pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
164}
165
/// One snapshot recorded in the native recovery section.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeSnapshotSummary {
    /// Snapshot identifier.
    pub snapshot_id: u64,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Superblock sequence at snapshot time.
    pub superblock_sequence: u64,
    /// Number of collections captured.
    pub collection_count: u32,
    /// Number of entities captured.
    pub total_entities: u64,
}

/// One named export recorded in the native recovery section.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeExportSummary {
    /// Export name.
    pub name: String,
    /// Creation time, Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Snapshot the export was taken from, if any.
    pub snapshot_id: Option<u64>,
    /// Superblock sequence at export time.
    pub superblock_sequence: u64,
    /// Number of collections exported.
    pub collection_count: u32,
    /// Number of entities exported.
    pub total_entities: u64,
}

/// Summary of the native recovery section: counts plus bounded samples.
///
/// NOTE(review): `*_complete` / `omitted_*_count` presumably describe whether each
/// sample vector is exhaustive — confirm against the encoder.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeRecoverySummary {
    pub snapshot_count: u32,
    pub export_count: u32,
    pub snapshots_complete: bool,
    pub exports_complete: bool,
    pub omitted_snapshot_count: u32,
    pub omitted_export_count: u32,
    pub snapshots: Vec<NativeSnapshotSummary>,
    pub exports: Vec<NativeExportSummary>,
}
196
/// Per-collection entry of the native catalog.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeCatalogCollectionSummary {
    /// Collection name.
    pub name: String,
    /// Entities stored in the collection.
    pub entities: u64,
    /// Cross-references attributed to the collection.
    pub cross_refs: u64,
    /// Segments backing the collection.
    pub segments: u32,
}

/// Summary of the native catalog: totals plus a bounded sample of collections.
///
/// NOTE(review): `collections_complete` / `omitted_collection_count` presumably say
/// whether `collections` lists every collection — confirm against the encoder.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeCatalogSummary {
    pub collection_count: u32,
    pub total_entities: u64,
    pub collections_complete: bool,
    pub omitted_collection_count: u32,
    pub collections: Vec<NativeCatalogCollectionSummary>,
}
213
/// Summary of the native metadata-state section.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeMetadataStateSummary {
    /// Protocol version string.
    pub protocol_version: String,
    /// Generation time, Unix milliseconds.
    pub generated_at_unix_ms: u128,
    /// Where the state was last loaded from, if recorded.
    pub last_loaded_from: Option<String>,
    /// Time of the last heal, Unix milliseconds, if any.
    pub last_healed_at_unix_ms: Option<u128>,
}
221
222#[derive(Debug, Clone)]
223pub struct NativePhysicalState {
224 pub header: PhysicalFileHeader,
225 pub collection_roots: BTreeMap<String, u64>,
226 pub manifest: Option<NativeManifestSummary>,
227 pub registry: Option<NativeRegistrySummary>,
228 pub recovery: Option<NativeRecoverySummary>,
229 pub catalog: Option<NativeCatalogSummary>,
230 pub metadata_state: Option<NativeMetadataStateSummary>,
231 pub vector_artifact_pages: Option<Vec<NativeVectorArtifactPageSummary>>,
232}
233
234#[derive(Debug, Clone)]
240pub struct UnifiedStoreConfig {
241 pub manager_config: ManagerConfig,
243 pub auto_index_refs: bool,
245 pub auto_index_id: bool,
254 pub max_cross_refs: usize,
256 pub enable_wal: bool,
258 pub durability_mode: DurabilityMode,
260 pub group_commit: GroupCommitOptions,
262 pub data_dir: Option<std::path::PathBuf>,
264}
265
266impl Default for UnifiedStoreConfig {
267 fn default() -> Self {
268 Self {
269 manager_config: ManagerConfig::default(),
270 auto_index_refs: true,
271 auto_index_id: true,
272 max_cross_refs: 1000,
273 enable_wal: false,
274 durability_mode: DurabilityMode::WalDurableGrouped,
277 group_commit: GroupCommitOptions::default(),
278 data_dir: None,
279 }
280 }
281}
282
283impl UnifiedStoreConfig {
284 pub fn with_data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
286 self.data_dir = Some(path.into());
287 self
288 }
289
290 pub fn with_wal(mut self) -> Self {
292 self.enable_wal = true;
293 self
294 }
295
296 pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
297 self.durability_mode = mode;
298 self
299 }
300
301 pub fn with_group_commit(mut self, options: GroupCommitOptions) -> Self {
302 self.group_commit = options;
303 self
304 }
305
306 pub fn with_max_refs(mut self, max: usize) -> Self {
308 self.max_cross_refs = max;
309 self
310 }
311}
312
/// Error type for unified-store operations.
#[derive(Debug)]
pub enum StoreError {
    /// A collection with the given name already exists.
    CollectionExists(String),
    /// No collection with the given name exists.
    CollectionNotFound(String),
    /// No entity with the given id was found.
    EntityNotFound(EntityId),
    /// The entity has too many cross-references.
    /// NOTE(review): the limit is presumably `UnifiedStoreConfig::max_cross_refs` — confirm.
    TooManyRefs(EntityId),
    /// Error propagated from the segment layer.
    Segment(SegmentError),
    /// Underlying I/O failure.
    Io(std::io::Error),
    /// Serialization or deserialization failure, with a description.
    Serialization(String),
    /// Catch-all internal failure, with a description.
    Internal(String),
}
337
338impl std::fmt::Display for StoreError {
339 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340 match self {
341 Self::CollectionExists(name) => write!(f, "Collection already exists: {}", name),
342 Self::CollectionNotFound(name) => write!(f, "Collection not found: {}", name),
343 Self::EntityNotFound(id) => write!(f, "Entity not found: {}", id),
344 Self::TooManyRefs(id) => write!(f, "Too many cross-references for entity: {}", id),
345 Self::Segment(e) => write!(f, "Segment error: {:?}", e),
346 Self::Io(e) => write!(f, "I/O error: {}", e),
347 Self::Serialization(msg) => write!(f, "Serialization error: {}", msg),
348 Self::Internal(msg) => write!(f, "Internal error: {}", msg),
349 }
350 }
351}
352
353impl std::error::Error for StoreError {}
354
355impl From<SegmentError> for StoreError {
356 fn from(e: SegmentError) -> Self {
357 Self::Segment(e)
358 }
359}
360
361impl From<std::io::Error> for StoreError {
362 fn from(e: std::io::Error) -> Self {
363 Self::Io(e)
364 }
365}
366
367#[derive(Debug, Clone, Default)]
373pub struct StoreStats {
374 pub collection_count: usize,
376 pub total_entities: usize,
378 pub total_memory_bytes: usize,
380 pub collections: HashMap<String, ManagerStats>,
382 pub cross_ref_count: usize,
384}
385
386impl StoreStats {
387 pub fn avg_entities_per_collection(&self) -> f64 {
389 if self.collection_count == 0 {
390 0.0
391 } else {
392 self.total_entities as f64 / self.collection_count as f64
393 }
394 }
395
396 pub fn memory_mb(&self) -> f64 {
398 self.total_memory_bytes as f64 / (1024.0 * 1024.0)
399 }
400}
401
/// Unified store holding its collections (one `SegmentManager` each), the
/// cross-reference maps, B-tree indices, caches and optional paged persistence.
///
/// Fields are dropped in declaration order, so reordering them changes teardown
/// order (e.g. `pager` relative to `commit`).
pub struct UnifiedStore {
    config: UnifiedStoreConfig,
    // Store format version; NOTE(review): presumably one of the STORE_VERSION_V* values.
    format_version: AtomicU32,
    // Presumably the next id to hand out for new entities — confirm in impl_entities.
    next_entity_id: AtomicU64,
    // Collections by name, each backed by a shared `SegmentManager`.
    collections: RwLock<HashMap<String, Arc<SegmentManager>>>,
    // Cross-references keyed by entity: (other entity, ref type, String).
    // NOTE(review): the String is presumably the other entity's collection — confirm.
    cross_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    // Reverse direction of `cross_refs`, same tuple layout (presumably).
    reverse_refs: RwLock<HashMap<EntityId, Vec<(EntityId, RefType, String)>>>,
    // Shared pager for paged storage, when present.
    pager: Option<Arc<Pager>>,
    // Database file path, when present.
    db_path: Option<PathBuf>,
    // B-tree indices by name.
    btree_indices: RwLock<HashMap<String, Arc<BTree>>>,
    context_index: ContextIndex,
    entity_cache: EntityCache,
    // Entity ids per (String, String) key; NOTE(review): presumably (collection, label).
    graph_label_index: RwLock<HashMap<(String, String), Vec<EntityId>>>,
    // Flag presumably marking the paged registry as needing a flush — confirm.
    paged_registry_dirty: AtomicBool,
    // Commit coordinator, when present.
    commit: Option<Arc<StoreCommitCoordinator>>,
    // Counter; NOTE(review): looks like a hit counter for a cross-ref unindex fast path.
    unindex_cross_refs_fast_path: AtomicU64,
}
481
482mod builder;
483mod commit;
484mod impl_entities;
485mod impl_file;
486mod impl_native_a;
487mod impl_native_b;
488mod impl_native_c;
489mod impl_pages;
490mod native_helpers;
491
492pub use self::builder::EntityBuilder;
493use self::commit::{StoreCommitCoordinator, StoreWalAction};
494use self::native_helpers::*;