1use super::*;
2use crate::storage::unified::metadata::{MetadataFilter, MetadataValue};
3
4impl RedDB {
5 pub fn is_replica_role(&self) -> bool {
6 matches!(
7 self.options.replication.role,
8 crate::replication::ReplicationRole::Replica { .. }
9 )
10 }
11
12 pub fn enforce_retention_policy(&self) -> Result<(), Box<dyn std::error::Error>> {
13 if self.options.read_only || self.is_replica_role() {
14 return Ok(());
15 }
16
17 if self.options.mode == StorageMode::Persistent {
20 let Some(path) = self.path() else {
21 return Ok(());
22 };
23
24 let Ok(mut metadata) = self.load_or_bootstrap_physical_metadata(true) else {
25 return Ok(());
26 };
27
28 self.prune_export_registry(&mut metadata.exports);
29 metadata.save_for_data_path(path)?;
30 }
31
32 let _ = self.sweep_ttl_expired_entities()?;
33
34 Ok(())
35 }
36
37 pub(crate) fn ttl_expired_entities_now(
38 &self,
39 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
40 self.ttl_expired_entities_at(current_unix_ms())
41 }
42
43 pub fn replica_allows_entity_at_read(
44 &self,
45 collection: &str,
46 entity: &crate::storage::UnifiedEntity,
47 ) -> bool {
48 if !self.is_replica_role() {
49 return true;
50 }
51 !self.entity_expired_at(collection, entity, current_unix_ms())
52 }
53
54 fn sweep_ttl_expired_entities(&self) -> Result<usize, Box<dyn std::error::Error>> {
55 let to_delete = self.ttl_expired_entities_now()?;
56
57 let mut deleted = 0usize;
58 for (collection, id) in to_delete {
59 match self.store.delete(&collection, id) {
60 Ok(true) => deleted = deleted.saturating_add(1),
61 Ok(false) => {}
62 Err(err) => {
63 return Err(format!(
64 "failed deleting expired entity {id} from collection '{collection}': {err:?}"
65 )
66 .into());
67 }
68 }
69 }
70
71 Ok(deleted)
72 }
73
74 fn ttl_expired_entities_at(
75 &self,
76 now_ms: u64,
77 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
78 let mut to_delete = Vec::<(String, EntityId)>::new();
79
80 let mut absolute_expired = self.expired_entities_by_expires_at(now_ms)?;
81 to_delete.append(&mut absolute_expired);
82
83 let mut relative_expired = self.expired_entities_by_ttl(now_ms)?;
84 to_delete.append(&mut relative_expired);
85
86 to_delete.sort_unstable();
87 to_delete.dedup();
88
89 Ok(to_delete)
90 }
91
92 fn entity_expired_at(
93 &self,
94 collection: &str,
95 entity: &crate::storage::UnifiedEntity,
96 now_ms: u64,
97 ) -> bool {
98 let Some(metadata) = self.store.get_metadata(collection, entity.id) else {
99 return false;
100 };
101
102 if metadata
103 .get("_expires_at")
104 .and_then(Self::metadata_u64)
105 .is_some_and(|expires_at_ms| expires_at_ms <= now_ms)
106 {
107 return true;
108 }
109
110 let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
111 let ttl_secs = if ttl_ms.is_none() {
112 metadata.get("_ttl").and_then(|value| {
113 Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
114 })
115 } else {
116 None
117 };
118
119 let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
120 return false;
121 };
122 entity
123 .created_at
124 .saturating_mul(1000)
125 .saturating_add(ttl_ms)
126 <= now_ms
127 }
128
129 fn expired_entities_by_expires_at(
130 &self,
131 now_ms: u64,
132 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
133 let mut ids = self.store.filter_metadata_all(&[(
134 "_expires_at".to_string(),
135 MetadataFilter::Le(MetadataValue::Timestamp(now_ms)),
136 )]);
137
138 if let Ok(now_ms_i64) = i64::try_from(now_ms) {
139 ids.extend(self.store.filter_metadata_all(&[(
140 "_expires_at".to_string(),
141 MetadataFilter::Le(MetadataValue::Int(now_ms_i64)),
142 )]));
143 }
144
145 let now_ms_f64 = now_ms as f64;
146 if now_ms_f64.is_finite() {
147 ids.extend(self.store.filter_metadata_all(&[(
148 "_expires_at".to_string(),
149 MetadataFilter::Le(MetadataValue::Float(now_ms_f64)),
150 )]));
151 }
152
153 Ok(ids)
154 }
155
156 fn expired_entities_by_ttl(
157 &self,
158 now_ms: u64,
159 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
160 let mut candidates = Vec::<(String, EntityId)>::new();
161
162 let ttl_ms_candidates = self
163 .store
164 .filter_metadata_all(&[("_ttl_ms".to_string(), MetadataFilter::IsNotNull)]);
165 candidates.extend(ttl_ms_candidates);
166
167 let ttl_candidates = self
168 .store
169 .filter_metadata_all(&[("_ttl".to_string(), MetadataFilter::IsNotNull)]);
170 candidates.extend(ttl_candidates);
171
172 if candidates.is_empty() {
173 return Ok(Vec::new());
174 }
175
176 candidates.sort_unstable();
177 candidates.dedup();
178
179 let mut expired = Vec::<(String, EntityId)>::new();
180 for (collection, entity_id) in candidates {
181 let Some(entity) = self.store.get(&collection, entity_id) else {
182 continue;
183 };
184
185 let Some(metadata) = self.store.get_metadata(&collection, entity_id) else {
186 continue;
187 };
188
189 let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
190 let ttl_secs = if ttl_ms.is_none() {
191 metadata.get("_ttl").and_then(|value| {
192 Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
193 })
194 } else {
195 None
196 };
197
198 let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
199 continue;
200 };
201
202 let created_at_ms = entity.created_at.saturating_mul(1000);
203 let expiry_ms = created_at_ms.saturating_add(ttl_ms);
204 if expiry_ms <= now_ms {
205 expired.push((collection, entity_id));
206 }
207 }
208
209 Ok(expired)
210 }
211
212 fn metadata_u64(value: &MetadataValue) -> Option<u64> {
213 match value {
214 MetadataValue::Int(v) if *v >= 0 => Some(*v as u64),
215 MetadataValue::Timestamp(v) => Some(*v),
216 MetadataValue::Float(v) => {
217 if !v.is_finite() || !v.is_sign_positive() || v.fract().abs() >= f64::EPSILON {
218 return None;
219 }
220 if *v > u64::MAX as f64 {
221 return None;
222 }
223 Some(v.trunc() as u64)
224 }
225 MetadataValue::String(v) => v.parse::<u64>().ok(),
226 _ => None,
227 }
228 }
229
230 pub fn node(&self, collection: impl Into<String>, label: impl Into<String>) -> NodeBuilder {
243 NodeBuilder::new(
244 self.store.clone(),
245 self.preprocessors.clone(),
246 collection,
247 label,
248 )
249 }
250
251 pub fn edge(&self, collection: impl Into<String>, label: impl Into<String>) -> EdgeBuilder {
263 EdgeBuilder::new(
264 self.store.clone(),
265 self.preprocessors.clone(),
266 collection,
267 label,
268 )
269 }
270
271 pub fn vector(&self, collection: impl Into<String>) -> VectorBuilder {
282 VectorBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
283 }
284
285 pub fn row(&self, table: impl Into<String>, columns: Vec<(&str, Value)>) -> RowBuilder {
296 RowBuilder::new(
297 self.store.clone(),
298 self.preprocessors.clone(),
299 table,
300 columns,
301 )
302 }
303
304 pub fn doc(&self, collection: impl Into<String>) -> DocumentBuilder {
318 DocumentBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
319 }
320
321 pub fn kv(
332 &self,
333 collection: impl Into<String>,
334 key: impl Into<String>,
335 value: Value,
336 ) -> KvBuilder {
337 KvBuilder::new(
338 self.store.clone(),
339 self.preprocessors.clone(),
340 collection,
341 key,
342 value,
343 )
344 }
345
346 pub fn get_kv(&self, collection: &str, key: &str) -> Option<(Value, EntityId)> {
350 let manager = self.store.get_collection(collection)?;
351 let entities = manager.query_all(|_| true);
352 for entity in entities {
353 if let EntityData::Row(ref row) = entity.data {
354 if let Some(ref named) = row.named {
355 if let Some(Value::Text(ref k)) = named.get("key") {
356 if &**k == key {
357 let value = named.get("value").cloned().unwrap_or(Value::Null);
358 return Some((value, entity.id));
359 }
360 }
361 }
362 }
363 }
364 None
365 }
366
367 pub fn delete_kv(
369 &self,
370 collection: &str,
371 key: &str,
372 ) -> Result<bool, super::super::error::DevXError> {
373 let Some((_, id)) = self.get_kv(collection, key) else {
374 return Ok(false);
375 };
376 self.store
377 .delete(collection, id)
378 .map_err(|err| super::super::error::DevXError::Storage(format!("{err:?}")))?;
379 Ok(true)
380 }
381
382 pub(crate) fn with_initialized_metadata(self) -> Result<Self, Box<dyn std::error::Error>> {
383 if self.options.mode == StorageMode::Persistent {
384 if let Ok(metadata) = self.load_or_bootstrap_physical_metadata(false) {
386 crate::reserved_fields::validate_physical_metadata_contracts(&metadata)
387 .map_err(|err| err.to_string())?;
388 }
389 }
393 self.load_collection_ttl_defaults_from_metadata();
394 self.recover_queue_pending_state();
395 Ok(self)
396 }
397
398 pub(crate) fn persist_metadata(&self) -> Result<(), Box<dyn std::error::Error>> {
399 if self.options.mode != StorageMode::Persistent || self.options.read_only {
400 return Ok(());
401 }
402 let Some(path) = self.path() else {
403 return Ok(());
404 };
405
406 let previous = self.load_or_bootstrap_physical_metadata(false).ok();
407 let collection_roots = self.physical_collection_roots();
408 let indexes = self
409 .native_physical_state()
410 .map(|state| self.physical_index_state_from_native_state(&state, previous.as_ref()))
411 .unwrap_or_else(|| self.physical_index_state());
412 let mut metadata = PhysicalMetadataFile::from_state(
413 self.options.clone(),
414 self.catalog_snapshot(),
415 collection_roots,
416 indexes,
417 previous.as_ref(),
418 );
419 metadata.collection_ttl_defaults_ms = self.collection_ttl_defaults_snapshot();
420 metadata.save_for_data_path(path)?;
421 self.persist_native_physical_header(&metadata)?;
422 Ok(())
423 }
424
425 fn bootstrap_metadata_from_native_state(&self) -> Result<bool, Box<dyn std::error::Error>> {
426 if self.options.mode != StorageMode::Persistent || self.options.read_only {
427 return Ok(false);
428 }
429 let Some(path) = self.path() else {
430 return Ok(false);
431 };
432 let Some(native_state) = self.native_physical_state() else {
433 return Ok(false);
434 };
435 if !Self::native_state_is_bootstrap_complete(&native_state) {
436 return Ok(false);
437 }
438
439 let previous = PhysicalMetadataFile::load_for_data_path(path).ok();
440 let metadata = self.metadata_from_native_state(&native_state, previous.as_ref());
441 metadata.save_for_data_path(path)?;
442 self.persist_native_physical_header(&metadata)?;
443 Ok(true)
444 }
445
446 pub fn rebuild_physical_metadata_from_native_state(
449 &self,
450 ) -> Result<bool, Box<dyn std::error::Error>> {
451 self.bootstrap_metadata_from_native_state()
452 }
453
454 pub(crate) fn native_state_is_bootstrap_complete(native_state: &NativePhysicalState) -> bool {
455 let registry_complete = native_state.registry.as_ref().map(|registry| {
456 registry.collections_complete
457 && registry.indexes_complete
458 && registry.graph_projections_complete
459 && registry.analytics_jobs_complete
460 && registry.vector_artifacts_complete
461 });
462 let recovery_complete = native_state
463 .recovery
464 .as_ref()
465 .map(|recovery| recovery.snapshots_complete && recovery.exports_complete);
466 let catalog_complete = native_state
467 .catalog
468 .as_ref()
469 .map(|catalog| catalog.collections_complete);
470
471 registry_complete == Some(true)
472 && recovery_complete == Some(true)
473 && catalog_complete == Some(true)
474 }
475
476 pub(crate) fn load_or_bootstrap_physical_metadata(
477 &self,
478 persist_bootstrapped: bool,
479 ) -> Result<PhysicalMetadataFile, Box<dyn std::error::Error>> {
480 if self.options.mode != StorageMode::Persistent {
481 return Err("physical metadata requires persistent mode".into());
482 }
483 let Some(path) = self.path() else {
484 return Err("database path is not available".into());
485 };
486 let native_state = self.native_physical_state();
487
488 match PhysicalMetadataFile::load_for_data_path(path) {
489 Ok(metadata) => {
490 if let Some(native_state) = native_state.as_ref() {
491 let inspection = Self::inspect_native_header_against_metadata(
492 native_state.header,
493 &metadata,
494 );
495 if Self::repair_policy_for_inspection(&inspection)
496 == NativeHeaderRepairPolicy::NativeAheadOfMetadata
497 {
498 let bootstrapped =
499 self.metadata_from_native_state(native_state, Some(&metadata));
500 if persist_bootstrapped && !self.options.read_only {
501 bootstrapped.save_for_data_path(path)?;
502 self.persist_native_physical_header(&bootstrapped)?;
503 }
504 return Ok(bootstrapped);
505 }
506 }
507 Ok(metadata)
508 }
509 Err(err) => {
510 let Some(native_state) = native_state else {
511 return Err(err.into());
512 };
513 let is_fresh_empty = native_state.header.sequence == 0
529 && native_state.registry.is_none()
530 && native_state.catalog.is_none()
531 && native_state.recovery.is_none();
532 if !is_fresh_empty && !Self::native_state_is_bootstrap_complete(&native_state) {
533 return Err(err.into());
534 }
535 let metadata = self.metadata_from_native_state(&native_state, None);
536 if persist_bootstrapped && !self.options.read_only {
537 metadata.save_for_data_path(path)?;
538 self.persist_native_physical_header(&metadata)?;
539 }
540 Ok(metadata)
541 }
542 }
543 }
544
545 pub(crate) fn physical_metadata_preference(&self) -> Option<&'static str> {
546 let path = self.path()?;
547 let native_state = self.native_physical_state();
548 let metadata = PhysicalMetadataFile::load_for_data_path(path).ok();
549
550 match (metadata, native_state) {
551 (Some(metadata), Some(native_state)) => {
552 let inspection =
553 Self::inspect_native_header_against_metadata(native_state.header, &metadata);
554 match Self::repair_policy_for_inspection(&inspection) {
555 NativeHeaderRepairPolicy::InSync => Some("sidecar_current"),
556 NativeHeaderRepairPolicy::RepairNativeFromMetadata => Some("sidecar_current"),
557 NativeHeaderRepairPolicy::NativeAheadOfMetadata => Some("native_ahead"),
558 }
559 }
560 (Some(_), None) => Some("sidecar_only"),
561 (None, Some(_)) => Some("sidecar_missing_native_available"),
562 (None, None) => Some("sidecar_missing_no_native"),
563 }
564 }
565
566 fn metadata_from_native_state(
567 &self,
568 native_state: &NativePhysicalState,
569 previous: Option<&PhysicalMetadataFile>,
570 ) -> PhysicalMetadataFile {
571 let now = SystemTime::now()
572 .duration_since(UNIX_EPOCH)
573 .unwrap_or_default()
574 .as_millis();
575 let catalog = self.catalog_snapshot();
576 let catalog_name = catalog.name.clone();
577 let catalog_total_entities = catalog.total_entities;
578 let catalog_total_collections = catalog.total_collections;
579 let indexes = self.physical_index_state();
580
581 let mut manifest =
582 crate::api::SchemaManifest::now(self.options.clone(), catalog.total_collections);
583 manifest.updated_at_unix_ms = now;
584
585 let manifest_events = native_state
586 .manifest
587 .as_ref()
588 .map(|summary| {
589 summary
590 .recent_events
591 .iter()
592 .map(|event| crate::physical::ManifestEvent {
593 collection: event.collection.clone(),
594 object_key: event.object_key.clone(),
595 kind: match event.kind.as_str() {
596 "insert" => crate::physical::ManifestEventKind::Insert,
597 "update" => crate::physical::ManifestEventKind::Update,
598 "remove" => crate::physical::ManifestEventKind::Remove,
599 _ => crate::physical::ManifestEventKind::Checkpoint,
600 },
601 block: crate::physical::BlockReference {
602 index: event.block_index,
603 checksum: event.block_checksum,
604 },
605 snapshot_min: event.snapshot_min,
606 snapshot_max: event.snapshot_max,
607 })
608 .collect()
609 })
610 .unwrap_or_default();
611
612 let graph_projections = native_state
613 .registry
614 .as_ref()
615 .and_then(|registry| {
616 registry.graph_projections_complete.then(|| {
617 registry
618 .graph_projections
619 .iter()
620 .map(|projection| crate::physical::PhysicalGraphProjection {
621 name: projection.name.clone(),
622 created_at_unix_ms: projection.created_at_unix_ms,
623 updated_at_unix_ms: projection.updated_at_unix_ms,
624 state: "materialized".to_string(),
625 source: projection.source.clone(),
626 node_labels: projection.node_labels.clone(),
627 node_types: projection.node_types.clone(),
628 edge_labels: projection.edge_labels.clone(),
629 last_materialized_sequence: projection.last_materialized_sequence,
630 })
631 .collect()
632 })
633 })
634 .or_else(|| previous.map(|metadata| metadata.graph_projections.clone()))
635 .unwrap_or_default();
636
637 let analytics_jobs = native_state
638 .registry
639 .as_ref()
640 .and_then(|registry| {
641 registry.analytics_jobs_complete.then(|| {
642 registry
643 .analytics_jobs
644 .iter()
645 .map(|job| crate::physical::PhysicalAnalyticsJob {
646 id: job.id.clone(),
647 kind: job.kind.clone(),
648 state: job.state.clone(),
649 projection: job.projection.clone(),
650 created_at_unix_ms: job.created_at_unix_ms,
651 updated_at_unix_ms: job.updated_at_unix_ms,
652 last_run_sequence: job.last_run_sequence,
653 metadata: job.metadata.clone(),
654 })
655 .collect()
656 })
657 })
658 .or_else(|| previous.map(|metadata| metadata.analytics_jobs.clone()))
659 .unwrap_or_default();
660
661 let exports = native_state
662 .recovery
663 .as_ref()
664 .and_then(|recovery| {
665 recovery.exports_complete.then(|| {
666 recovery
667 .exports
668 .iter()
669 .map(|export| crate::physical::ExportDescriptor {
670 name: export.name.clone(),
671 created_at_unix_ms: export.created_at_unix_ms,
672 snapshot_id: export.snapshot_id,
673 superblock_sequence: export.superblock_sequence,
674 data_path: self
675 .path()
676 .map(|path| {
677 crate::physical::PhysicalMetadataFile::export_data_path_for(
678 path,
679 &export.name,
680 )
681 .display()
682 .to_string()
683 })
684 .unwrap_or_default(),
685 metadata_path: self
686 .path()
687 .map(|path| {
688 let export_data_path =
689 crate::physical::PhysicalMetadataFile::export_data_path_for(
690 path,
691 &export.name,
692 );
693 crate::physical::PhysicalMetadataFile::metadata_path_for(
694 &export_data_path,
695 )
696 .display()
697 .to_string()
698 })
699 .unwrap_or_default(),
700 collection_count: export.collection_count as usize,
701 total_entities: export.total_entities as usize,
702 })
703 .collect()
704 })
705 })
706 .or_else(|| previous.map(|metadata| metadata.exports.clone()))
707 .unwrap_or_default();
708
709 let snapshots = native_state
710 .recovery
711 .as_ref()
712 .and_then(|recovery| {
713 recovery.snapshots_complete.then(|| {
714 recovery
715 .snapshots
716 .iter()
717 .map(|snapshot| crate::physical::SnapshotDescriptor {
718 snapshot_id: snapshot.snapshot_id,
719 created_at_unix_ms: snapshot.created_at_unix_ms,
720 superblock_sequence: snapshot.superblock_sequence,
721 collection_count: snapshot.collection_count as usize,
722 total_entities: snapshot.total_entities as usize,
723 })
724 .collect()
725 })
726 })
727 .or_else(|| previous.map(|metadata| metadata.snapshots.clone()))
728 .unwrap_or_else(|| {
729 vec![crate::physical::SnapshotDescriptor {
730 snapshot_id: native_state.header.sequence,
731 created_at_unix_ms: now,
732 superblock_sequence: native_state.header.sequence,
733 collection_count: catalog_total_collections,
734 total_entities: catalog_total_entities,
735 }]
736 });
737
738 let catalog_stats = native_state
739 .catalog
740 .as_ref()
741 .and_then(|native_catalog| {
742 native_catalog.collections_complete.then(|| {
743 native_catalog
744 .collections
745 .iter()
746 .map(|collection| {
747 (
748 collection.name.clone(),
749 crate::api::CollectionStats {
750 entities: collection.entities as usize,
751 cross_refs: collection.cross_refs as usize,
752 segments: collection.segments as usize,
753 },
754 )
755 })
756 .collect::<BTreeMap<_, _>>()
757 })
758 })
759 .or_else(|| previous.map(|metadata| metadata.catalog.stats_by_collection.clone()))
760 .unwrap_or_else(|| catalog.stats_by_collection.clone());
761
762 PhysicalMetadataFile {
763 protocol_version: crate::physical::PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
764 generated_at_unix_ms: now,
765 last_loaded_from: Some("native_bootstrap".to_string()),
766 last_healed_at_unix_ms: Some(now),
767 manifest,
768 catalog: crate::api::CatalogSnapshot {
769 name: catalog_name,
770 total_entities: native_state
771 .catalog
772 .as_ref()
773 .map(|summary| summary.total_entities as usize)
774 .unwrap_or(catalog_total_entities),
775 total_collections: native_state
776 .catalog
777 .as_ref()
778 .map(|summary| summary.collection_count as usize)
779 .unwrap_or(catalog_total_collections),
780 stats_by_collection: catalog_stats,
781 updated_at: SystemTime::now(),
782 },
783 manifest_events,
784 collection_ttl_defaults_ms: previous
785 .map(|metadata| metadata.collection_ttl_defaults_ms.clone())
786 .unwrap_or_default(),
787 collection_contracts: previous
788 .map(|metadata| metadata.collection_contracts.clone())
789 .unwrap_or_default(),
790 tree_definitions: previous
791 .map(|metadata| metadata.tree_definitions.clone())
792 .unwrap_or_default(),
793 indexes,
794 graph_projections,
795 analytics_jobs,
796 exports,
797 superblock: crate::physical::SuperblockHeader {
798 format_version: native_state.header.format_version,
799 sequence: native_state.header.sequence,
800 copies: crate::physical::DEFAULT_SUPERBLOCK_COPIES,
801 manifest: crate::physical::ManifestPointers {
802 oldest: crate::physical::BlockReference {
803 index: native_state.header.manifest_oldest_root,
804 checksum: 0,
805 },
806 newest: crate::physical::BlockReference {
807 index: native_state.header.manifest_root,
808 checksum: 0,
809 },
810 },
811 free_set: crate::physical::BlockReference {
812 index: native_state.header.free_set_root,
813 checksum: 0,
814 },
815 collection_roots: native_state.collection_roots.clone(),
816 },
817 snapshots,
818 }
819 }
820
821 pub(crate) fn reconcile_index_states_with_native_artifacts(
822 &self,
823 mut indexes: Vec<PhysicalIndexState>,
824 ) -> Vec<PhysicalIndexState> {
825 let native_artifacts = self
826 .native_physical_state()
827 .and_then(|state| state.registry)
828 .map(|registry| registry.vector_artifacts)
829 .unwrap_or_default();
830 for index in &mut indexes {
831 let Some(collection) = index.collection.as_deref() else {
832 continue;
833 };
834 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
835 continue;
836 };
837 let Some(artifact) = native_artifacts.iter().find(|artifact| {
838 artifact.collection == collection && artifact.artifact_kind == artifact_kind
839 }) else {
840 index.build_state = "metadata-only".to_string();
841 continue;
842 };
843 index.entries = artifact.vector_count as usize;
844 index.estimated_memory_bytes = artifact.serialized_bytes;
845 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
846 index.artifact_kind = Some(artifact.artifact_kind.clone());
847 index.artifact_checksum = Some(artifact.checksum);
848 index.build_state = "artifact-published".to_string();
849 if let Some(pages) = self.native_vector_artifact_pages() {
850 index.artifact_root_page = pages
851 .into_iter()
852 .find(|page| {
853 page.collection == artifact.collection
854 && page.artifact_kind == artifact.artifact_kind
855 })
856 .map(|page| page.root_page);
857 }
858 }
859 indexes
860 }
861
862 pub(crate) fn warmup_native_vector_artifact_for_index(
863 &self,
864 index: &PhysicalIndexState,
865 ) -> Result<(), String> {
866 let Some(collection) = index.collection.as_deref() else {
867 return Ok(());
868 };
869 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
870 return Ok(());
871 };
872 self.warmup_native_vector_artifact(collection, Some(artifact_kind))?;
873 Ok(())
874 }
875
876 pub(crate) fn apply_runtime_native_artifact_to_index_state(
877 &self,
878 index: &mut PhysicalIndexState,
879 ) -> Result<(), String> {
880 let Some(collection) = index.collection.as_deref() else {
881 return Ok(());
882 };
883 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
884 return Ok(());
885 };
886 let artifact = self.inspect_native_vector_artifact(collection, Some(artifact_kind))?;
887 index.entries = artifact
888 .graph_edge_count
889 .or(artifact.text_posting_count)
890 .unwrap_or(artifact.node_count) as usize;
891 index.estimated_memory_bytes = artifact.byte_len;
892 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
893 index.artifact_kind = Some(artifact.artifact_kind.clone());
894 index.artifact_checksum = Some(artifact.checksum);
895 index.build_state = "ready".to_string();
896 index.artifact_root_page = self
897 .native_vector_artifact_pages()
898 .and_then(|pages| {
899 pages.into_iter().find(|page| {
900 page.collection == artifact.collection
901 && page.artifact_kind == artifact.artifact_kind
902 })
903 })
904 .map(|page| page.root_page);
905 Ok(())
906 }
907
908 pub(crate) fn physical_index_state_from_native_state(
909 &self,
910 native_state: &NativePhysicalState,
911 previous: Option<&PhysicalMetadataFile>,
912 ) -> Vec<PhysicalIndexState> {
913 let mut fresh = self.physical_index_state();
914 let Some(registry) = native_state.registry.as_ref() else {
915 if let Some(previous) = previous {
916 for index in &previous.indexes {
917 if !fresh.iter().any(|candidate| candidate.name == index.name) {
918 fresh.push(index.clone());
919 }
920 }
921 }
922 return fresh;
923 };
924
925 for index in &mut fresh {
926 if let Some(native) = registry
927 .indexes
928 .iter()
929 .find(|candidate| candidate.name == index.name)
930 {
931 index.enabled = native.enabled;
932 index.last_refresh_ms = native.last_refresh_ms;
933 index.backend = native.backend.clone();
934 index.entries = native.entries as usize;
935 index.estimated_memory_bytes = native.estimated_memory_bytes;
936 if index.artifact_kind.is_none() {
937 index.artifact_kind = Self::native_artifact_kind_for_index(index.kind)
938 .map(|value| value.to_string());
939 }
940 if index.build_state == "catalog-derived" {
941 index.build_state = "registry-loaded".to_string();
942 }
943 }
944 }
945
946 for native in ®istry.indexes {
947 if fresh.iter().any(|index| index.name == native.name) {
948 continue;
949 }
950 let Some(kind) = Self::index_kind_from_str(&native.kind) else {
951 continue;
952 };
953 fresh.push(PhysicalIndexState {
954 name: native.name.clone(),
955 kind,
956 collection: native.collection.clone(),
957 enabled: native.enabled,
958 entries: native.entries as usize,
959 estimated_memory_bytes: native.estimated_memory_bytes,
960 last_refresh_ms: native.last_refresh_ms,
961 backend: native.backend.clone(),
962 artifact_kind: Self::native_artifact_kind_for_index(kind)
963 .map(|value| value.to_string()),
964 artifact_root_page: None,
965 artifact_checksum: None,
966 build_state: "registry-loaded".to_string(),
967 });
968 }
969
970 if !registry.indexes_complete {
971 if let Some(previous) = previous {
972 for index in &previous.indexes {
973 if !fresh.iter().any(|candidate| candidate.name == index.name) {
974 fresh.push(index.clone());
975 }
976 }
977 }
978 }
979
980 fresh
981 }
982
983 pub(crate) fn graph_projections_from_native_state(
984 &self,
985 native_state: &NativePhysicalState,
986 ) -> Vec<PhysicalGraphProjection> {
987 native_state
988 .registry
989 .as_ref()
990 .map(|registry| {
991 registry
992 .graph_projections
993 .iter()
994 .map(|projection| PhysicalGraphProjection {
995 name: projection.name.clone(),
996 created_at_unix_ms: projection.created_at_unix_ms,
997 updated_at_unix_ms: projection.updated_at_unix_ms,
998 state: "materialized".to_string(),
999 source: projection.source.clone(),
1000 node_labels: projection.node_labels.clone(),
1001 node_types: projection.node_types.clone(),
1002 edge_labels: projection.edge_labels.clone(),
1003 last_materialized_sequence: projection.last_materialized_sequence,
1004 })
1005 .collect()
1006 })
1007 .unwrap_or_default()
1008 }
1009
1010 pub(crate) fn analytics_jobs_from_native_state(
1011 &self,
1012 native_state: &NativePhysicalState,
1013 ) -> Vec<PhysicalAnalyticsJob> {
1014 native_state
1015 .registry
1016 .as_ref()
1017 .map(|registry| {
1018 registry
1019 .analytics_jobs
1020 .iter()
1021 .map(|job| PhysicalAnalyticsJob {
1022 id: job.id.clone(),
1023 kind: job.kind.clone(),
1024 state: job.state.clone(),
1025 projection: job.projection.clone(),
1026 created_at_unix_ms: job.created_at_unix_ms,
1027 updated_at_unix_ms: job.updated_at_unix_ms,
1028 last_run_sequence: job.last_run_sequence,
1029 metadata: job.metadata.clone(),
1030 })
1031 .collect()
1032 })
1033 .unwrap_or_default()
1034 }
1035
1036 pub(crate) fn exports_from_native_state(
1037 &self,
1038 native_state: &NativePhysicalState,
1039 ) -> Vec<ExportDescriptor> {
1040 native_state
1041 .recovery
1042 .as_ref()
1043 .map(|recovery| {
1044 recovery
1045 .exports
1046 .iter()
1047 .map(|export| ExportDescriptor {
1048 name: export.name.clone(),
1049 created_at_unix_ms: export.created_at_unix_ms,
1050 snapshot_id: export.snapshot_id,
1051 superblock_sequence: export.superblock_sequence,
1052 data_path: self
1053 .path()
1054 .map(|path| {
1055 crate::physical::PhysicalMetadataFile::export_data_path_for(
1056 path,
1057 &export.name,
1058 )
1059 .display()
1060 .to_string()
1061 })
1062 .unwrap_or_default(),
1063 metadata_path: self
1064 .path()
1065 .map(|path| {
1066 let export_data_path =
1067 crate::physical::PhysicalMetadataFile::export_data_path_for(
1068 path,
1069 &export.name,
1070 );
1071 crate::physical::PhysicalMetadataFile::metadata_path_for(
1072 &export_data_path,
1073 )
1074 .display()
1075 .to_string()
1076 })
1077 .unwrap_or_default(),
1078 collection_count: export.collection_count as usize,
1079 total_entities: export.total_entities as usize,
1080 })
1081 .collect()
1082 })
1083 .unwrap_or_default()
1084 }
1085
1086 pub(crate) fn snapshots_from_native_state(
1087 &self,
1088 native_state: &NativePhysicalState,
1089 ) -> Vec<crate::physical::SnapshotDescriptor> {
1090 native_state
1091 .recovery
1092 .as_ref()
1093 .map(|recovery| {
1094 recovery
1095 .snapshots
1096 .iter()
1097 .map(|snapshot| crate::physical::SnapshotDescriptor {
1098 snapshot_id: snapshot.snapshot_id,
1099 created_at_unix_ms: snapshot.created_at_unix_ms,
1100 superblock_sequence: snapshot.superblock_sequence,
1101 collection_count: snapshot.collection_count as usize,
1102 total_entities: snapshot.total_entities as usize,
1103 })
1104 .collect()
1105 })
1106 .unwrap_or_default()
1107 }
1108
1109 fn index_kind_from_str(value: &str) -> Option<crate::index::IndexKind> {
1110 match value {
1111 "btree" => Some(crate::index::IndexKind::BTree),
1112 "vector.hnsw" => Some(crate::index::IndexKind::VectorHnsw),
1113 "vector.inverted" => Some(crate::index::IndexKind::VectorInverted),
1114 "vector.turbo" => Some(crate::index::IndexKind::VectorTurbo),
1115 "graph.adjacency" => Some(crate::index::IndexKind::GraphAdjacency),
1116 "text.fulltext" => Some(crate::index::IndexKind::FullText),
1117 "document.pathvalue" => Some(crate::index::IndexKind::DocumentPathValue),
1118 "search.hybrid" => Some(crate::index::IndexKind::HybridSearch),
1119 _ => None,
1120 }
1121 }
1122
1123 pub(crate) fn native_artifact_kind_for_index(kind: IndexKind) -> Option<&'static str> {
1124 match kind {
1125 IndexKind::VectorHnsw => Some("hnsw"),
1126 IndexKind::VectorInverted => Some("ivf"),
1127 IndexKind::VectorTurbo => Some("turboquant"),
1128 IndexKind::GraphAdjacency => Some("graph.adjacency"),
1129 IndexKind::FullText => Some("text.fulltext"),
1130 IndexKind::DocumentPathValue => Some("document.pathvalue"),
1131 _ => None,
1132 }
1133 }
1134
1135 fn index_is_declared(&self, name: &str) -> bool {
1136 self.physical_metadata()
1137 .map(|metadata| metadata.indexes.iter().any(|index| index.name == name))
1138 .unwrap_or(false)
1139 }
1140
1141 pub(crate) fn graph_projection_is_declared(&self, name: &str) -> bool {
1142 self.physical_metadata()
1143 .map(|metadata| {
1144 metadata
1145 .graph_projections
1146 .iter()
1147 .any(|projection| projection.name == name)
1148 })
1149 .unwrap_or(false)
1150 }
1151
1152 pub(crate) fn graph_projection_is_operational(&self, name: &str) -> bool {
1153 self.operational_graph_projections()
1154 .into_iter()
1155 .any(|projection| projection.name == name && projection.state == "materialized")
1156 }
1157
1158 pub(crate) fn analytics_job_id(kind: &str, projection: Option<&str>) -> String {
1159 match projection {
1160 Some(projection) => format!("{kind}::{projection}"),
1161 None => format!("{kind}::global"),
1162 }
1163 }
1164
1165 pub(crate) fn update_physical_metadata<T, F>(
1166 &self,
1167 mutator: F,
1168 ) -> Result<T, Box<dyn std::error::Error>>
1169 where
1170 F: FnOnce(&mut PhysicalMetadataFile) -> T,
1171 {
1172 if self.options.mode != StorageMode::Persistent {
1173 return Err("physical metadata operations require persistent mode".into());
1174 }
1175 if self.options.read_only {
1176 return Err("physical metadata operations are not allowed in read-only mode".into());
1177 }
1178 let Some(path) = self.path() else {
1179 return Err("database path is not available".into());
1180 };
1181
1182 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1183
1184 if metadata.indexes.is_empty() {
1185 metadata.indexes = self.physical_index_state();
1186 }
1187 metadata.superblock.collection_roots = self.physical_collection_roots();
1188
1189 let result = mutator(&mut metadata);
1190 metadata.save_for_data_path(path)?;
1191 self.persist_native_physical_header(&metadata)?;
1192 Ok(result)
1193 }
1194
1195 pub(crate) fn persist_native_physical_header(
1196 &self,
1197 metadata: &PhysicalMetadataFile,
1198 ) -> Result<(), Box<dyn std::error::Error>> {
1199 if !self.paged_mode {
1200 return Ok(());
1201 }
1202
1203 let existing_page = self
1204 .store
1205 .physical_file_header()
1206 .map(|header| header.collection_roots_page)
1207 .filter(|page| *page != 0);
1208 let existing_registry_page = self
1209 .store
1210 .physical_file_header()
1211 .map(|header| header.registry_page)
1212 .filter(|page| *page != 0);
1213 let existing_recovery_page = self
1214 .store
1215 .physical_file_header()
1216 .map(|header| header.recovery_page)
1217 .filter(|page| *page != 0);
1218 let existing_catalog_page = self
1219 .store
1220 .physical_file_header()
1221 .map(|header| header.catalog_page)
1222 .filter(|page| *page != 0);
1223 let existing_metadata_state_page = self
1224 .store
1225 .physical_file_header()
1226 .map(|header| header.metadata_state_page)
1227 .filter(|page| *page != 0);
1228 let existing_vector_artifact_page = self
1229 .store
1230 .physical_file_header()
1231 .map(|header| header.vector_artifact_page)
1232 .filter(|page| *page != 0);
1233 let existing_manifest_page = self
1234 .store
1235 .physical_file_header()
1236 .map(|header| header.manifest_page)
1237 .filter(|page| *page != 0);
1238 let (manifest_page, manifest_checksum) = self.store.write_native_manifest_summary(
1239 metadata.superblock.sequence,
1240 &metadata.manifest_events,
1241 existing_manifest_page,
1242 )?;
1243 let (collection_roots_page, collection_roots_checksum) = self
1244 .store
1245 .write_native_collection_roots(&metadata.superblock.collection_roots, existing_page)?;
1246 let registry_summary = self.native_registry_summary_from_metadata(metadata);
1247 let (registry_page, registry_checksum) = self
1248 .store
1249 .write_native_registry_summary(®istry_summary, existing_registry_page)?;
1250 let recovery_summary = Self::native_recovery_summary_from_metadata(metadata);
1251 let (recovery_page, recovery_checksum) = self
1252 .store
1253 .write_native_recovery_summary(&recovery_summary, existing_recovery_page)?;
1254 let catalog_summary = Self::native_catalog_summary_from_metadata(metadata);
1255 let (catalog_page, catalog_checksum) = self
1256 .store
1257 .write_native_catalog_summary(&catalog_summary, existing_catalog_page)?;
1258 let metadata_state_summary = Self::native_metadata_state_summary_from_metadata(metadata);
1259 let (metadata_state_page, metadata_state_checksum) =
1260 self.store.write_native_metadata_state_summary(
1261 &metadata_state_summary,
1262 existing_metadata_state_page,
1263 )?;
1264 let vector_artifact_records = self.native_vector_artifact_records();
1265 let vector_artifact_payloads = vector_artifact_records
1266 .iter()
1267 .map(|(summary, bytes)| {
1268 (
1269 summary.collection.clone(),
1270 summary.artifact_kind.clone(),
1271 bytes.clone(),
1272 )
1273 })
1274 .collect::<Vec<_>>();
1275 let (vector_artifact_page, vector_artifact_checksum, _vector_artifact_pages) =
1276 self.store.write_native_vector_artifact_store(
1277 &vector_artifact_payloads,
1278 existing_vector_artifact_page,
1279 )?;
1280 let mut header = Self::native_header_from_metadata(metadata);
1281 header.manifest_page = manifest_page;
1282 header.manifest_checksum = manifest_checksum;
1283 header.collection_roots_page = collection_roots_page;
1284 header.collection_roots_checksum = collection_roots_checksum;
1285 header.registry_page = registry_page;
1286 header.registry_checksum = registry_checksum;
1287 header.recovery_page = recovery_page;
1288 header.recovery_checksum = recovery_checksum;
1289 header.catalog_page = catalog_page;
1290 header.catalog_checksum = catalog_checksum;
1291 header.metadata_state_page = metadata_state_page;
1292 header.metadata_state_checksum = metadata_state_checksum;
1293 header.vector_artifact_page = vector_artifact_page;
1294 header.vector_artifact_checksum = vector_artifact_checksum;
1295 self.store.update_physical_file_header(header)?;
1296 self.store.persist()?;
1297 Ok(())
1298 }
1299
1300 pub(crate) fn native_header_from_metadata(
1301 metadata: &PhysicalMetadataFile,
1302 ) -> PhysicalFileHeader {
1303 PhysicalFileHeader {
1304 format_version: metadata.superblock.format_version,
1305 sequence: metadata.superblock.sequence,
1306 manifest_oldest_root: metadata.superblock.manifest.oldest.index,
1307 manifest_root: metadata.superblock.manifest.newest.index,
1308 free_set_root: metadata.superblock.free_set.index,
1309 manifest_page: 0,
1310 manifest_checksum: 0,
1311 collection_roots_page: 0,
1312 collection_roots_checksum: 0,
1313 collection_root_count: metadata.superblock.collection_roots.len() as u32,
1314 snapshot_count: metadata.snapshots.len() as u32,
1315 index_count: metadata.indexes.len() as u32,
1316 catalog_collection_count: metadata.catalog.total_collections as u32,
1317 catalog_total_entities: metadata.catalog.total_entities as u64,
1318 export_count: metadata.exports.len() as u32,
1319 graph_projection_count: metadata.graph_projections.len() as u32,
1320 analytics_job_count: metadata.analytics_jobs.len() as u32,
1321 manifest_event_count: metadata.manifest_events.len() as u32,
1322 registry_page: 0,
1323 registry_checksum: 0,
1324 recovery_page: 0,
1325 recovery_checksum: 0,
1326 catalog_page: 0,
1327 catalog_checksum: 0,
1328 metadata_state_page: 0,
1329 metadata_state_checksum: 0,
1330 vector_artifact_page: 0,
1331 vector_artifact_checksum: 0,
1332 }
1333 }
1334
1335 fn recover_queue_pending_state(&self) {
1336 const QUEUE_META_COLLECTION: &str = "red_queue_meta";
1337
1338 let Some(manager) = self.store.get_collection(QUEUE_META_COLLECTION) else {
1339 return;
1340 };
1341
1342 let pending_rows = manager.query_all(|entity| {
1343 entity.data.as_row().is_some_and(|row| {
1344 matches!(
1345 row.get_field("kind"),
1346 Some(crate::storage::schema::Value::Text(kind)) if &**kind == "queue_pending"
1347 )
1348 })
1349 });
1350
1351 for row in pending_rows {
1352 let _ = self.store.delete(QUEUE_META_COLLECTION, row.id);
1353 }
1354 }
1355}
1356
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// A clock set before the epoch yields `0`, and a value too large for `u64` saturates at
/// `u64::MAX`.
fn current_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}