1use super::*;
2use crate::storage::unified::metadata::{MetadataFilter, MetadataValue};
3
4impl RedDB {
5 pub fn is_replica_role(&self) -> bool {
6 matches!(
7 self.options.replication.role,
8 crate::replication::ReplicationRole::Replica { .. }
9 )
10 }
11
12 pub fn enforce_retention_policy(&self) -> Result<(), Box<dyn std::error::Error>> {
13 if self.options.read_only || self.is_replica_role() {
14 return Ok(());
15 }
16
17 if self.options.mode == StorageMode::Persistent {
20 let Some(path) = self.path() else {
21 return Ok(());
22 };
23
24 let Ok(mut metadata) = self.load_or_bootstrap_physical_metadata(true) else {
25 return Ok(());
26 };
27
28 self.prune_export_registry(&mut metadata.exports);
29 metadata.save_for_data_path(path)?;
30 }
31
32 let _ = self.sweep_ttl_expired_entities()?;
33
34 Ok(())
35 }
36
37 pub(crate) fn ttl_expired_entities_now(
38 &self,
39 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
40 self.ttl_expired_entities_at(current_unix_ms())
41 }
42
43 pub fn replica_allows_entity_at_read(
44 &self,
45 collection: &str,
46 entity: &crate::storage::UnifiedEntity,
47 ) -> bool {
48 if !self.is_replica_role() {
49 return true;
50 }
51 !self.entity_expired_at(collection, entity, current_unix_ms())
52 }
53
54 fn sweep_ttl_expired_entities(&self) -> Result<usize, Box<dyn std::error::Error>> {
55 let to_delete = self.ttl_expired_entities_now()?;
56
57 let mut deleted = 0usize;
58 for (collection, id) in to_delete {
59 match self.store.delete(&collection, id) {
60 Ok(true) => deleted = deleted.saturating_add(1),
61 Ok(false) => {}
62 Err(err) => {
63 return Err(format!(
64 "failed deleting expired entity {id} from collection '{collection}': {err:?}"
65 )
66 .into());
67 }
68 }
69 }
70
71 Ok(deleted)
72 }
73
74 fn ttl_expired_entities_at(
75 &self,
76 now_ms: u64,
77 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
78 let mut to_delete = Vec::<(String, EntityId)>::new();
79
80 let mut absolute_expired = self.expired_entities_by_expires_at(now_ms)?;
81 to_delete.append(&mut absolute_expired);
82
83 let mut relative_expired = self.expired_entities_by_ttl(now_ms)?;
84 to_delete.append(&mut relative_expired);
85
86 to_delete.sort_unstable();
87 to_delete.dedup();
88
89 Ok(to_delete)
90 }
91
92 fn entity_expired_at(
93 &self,
94 collection: &str,
95 entity: &crate::storage::UnifiedEntity,
96 now_ms: u64,
97 ) -> bool {
98 let Some(metadata) = self.store.get_metadata(collection, entity.id) else {
99 return false;
100 };
101
102 if metadata
103 .get("_expires_at")
104 .and_then(Self::metadata_u64)
105 .is_some_and(|expires_at_ms| expires_at_ms <= now_ms)
106 {
107 return true;
108 }
109
110 let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
111 let ttl_secs = if ttl_ms.is_none() {
112 metadata.get("_ttl").and_then(|value| {
113 Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
114 })
115 } else {
116 None
117 };
118
119 let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
120 return false;
121 };
122 entity
123 .created_at
124 .saturating_mul(1000)
125 .saturating_add(ttl_ms)
126 <= now_ms
127 }
128
129 fn expired_entities_by_expires_at(
130 &self,
131 now_ms: u64,
132 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
133 let mut ids = self.store.filter_metadata_all(&[(
134 "_expires_at".to_string(),
135 MetadataFilter::Le(MetadataValue::Timestamp(now_ms)),
136 )]);
137
138 if let Ok(now_ms_i64) = i64::try_from(now_ms) {
139 ids.extend(self.store.filter_metadata_all(&[(
140 "_expires_at".to_string(),
141 MetadataFilter::Le(MetadataValue::Int(now_ms_i64)),
142 )]));
143 }
144
145 let now_ms_f64 = now_ms as f64;
146 if now_ms_f64.is_finite() {
147 ids.extend(self.store.filter_metadata_all(&[(
148 "_expires_at".to_string(),
149 MetadataFilter::Le(MetadataValue::Float(now_ms_f64)),
150 )]));
151 }
152
153 Ok(ids)
154 }
155
156 fn expired_entities_by_ttl(
157 &self,
158 now_ms: u64,
159 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
160 let mut candidates = Vec::<(String, EntityId)>::new();
161
162 let ttl_ms_candidates = self
163 .store
164 .filter_metadata_all(&[("_ttl_ms".to_string(), MetadataFilter::IsNotNull)]);
165 candidates.extend(ttl_ms_candidates);
166
167 let ttl_candidates = self
168 .store
169 .filter_metadata_all(&[("_ttl".to_string(), MetadataFilter::IsNotNull)]);
170 candidates.extend(ttl_candidates);
171
172 if candidates.is_empty() {
173 return Ok(Vec::new());
174 }
175
176 candidates.sort_unstable();
177 candidates.dedup();
178
179 let mut expired = Vec::<(String, EntityId)>::new();
180 for (collection, entity_id) in candidates {
181 let Some(entity) = self.store.get(&collection, entity_id) else {
182 continue;
183 };
184
185 let Some(metadata) = self.store.get_metadata(&collection, entity_id) else {
186 continue;
187 };
188
189 let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
190 let ttl_secs = if ttl_ms.is_none() {
191 metadata.get("_ttl").and_then(|value| {
192 Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
193 })
194 } else {
195 None
196 };
197
198 let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
199 continue;
200 };
201
202 let created_at_ms = entity.created_at.saturating_mul(1000);
203 let expiry_ms = created_at_ms.saturating_add(ttl_ms);
204 if expiry_ms <= now_ms {
205 expired.push((collection, entity_id));
206 }
207 }
208
209 Ok(expired)
210 }
211
212 fn metadata_u64(value: &MetadataValue) -> Option<u64> {
213 match value {
214 MetadataValue::Int(v) if *v >= 0 => Some(*v as u64),
215 MetadataValue::Timestamp(v) => Some(*v),
216 MetadataValue::Float(v) => {
217 if !v.is_finite() || !v.is_sign_positive() || v.fract().abs() >= f64::EPSILON {
218 return None;
219 }
220 if *v > u64::MAX as f64 {
221 return None;
222 }
223 Some(v.trunc() as u64)
224 }
225 MetadataValue::String(v) => v.parse::<u64>().ok(),
226 _ => None,
227 }
228 }
229
230 pub fn node(&self, collection: impl Into<String>, label: impl Into<String>) -> NodeBuilder {
243 NodeBuilder::new(
244 self.store.clone(),
245 self.preprocessors.clone(),
246 collection,
247 label,
248 )
249 }
250
251 pub fn edge(&self, collection: impl Into<String>, label: impl Into<String>) -> EdgeBuilder {
263 EdgeBuilder::new(
264 self.store.clone(),
265 self.preprocessors.clone(),
266 collection,
267 label,
268 )
269 }
270
271 pub fn vector(&self, collection: impl Into<String>) -> VectorBuilder {
282 VectorBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
283 }
284
285 pub fn row(&self, table: impl Into<String>, columns: Vec<(&str, Value)>) -> RowBuilder {
296 RowBuilder::new(
297 self.store.clone(),
298 self.preprocessors.clone(),
299 table,
300 columns,
301 )
302 }
303
304 pub fn doc(&self, collection: impl Into<String>) -> DocumentBuilder {
318 DocumentBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
319 }
320
321 pub fn kv(
332 &self,
333 collection: impl Into<String>,
334 key: impl Into<String>,
335 value: Value,
336 ) -> KvBuilder {
337 KvBuilder::new(
338 self.store.clone(),
339 self.preprocessors.clone(),
340 collection,
341 key,
342 value,
343 )
344 }
345
346 pub fn get_kv(&self, collection: &str, key: &str) -> Option<(Value, EntityId)> {
350 let manager = self.store.get_collection(collection)?;
351 let entities = manager.query_all(|_| true);
352 for entity in entities {
353 if let EntityData::Row(ref row) = entity.data {
354 if let Some(ref named) = row.named {
355 if let Some(Value::Text(ref k)) = named.get("key") {
356 if &**k == key {
357 let value = named.get("value").cloned().unwrap_or(Value::Null);
358 return Some((value, entity.id));
359 }
360 }
361 }
362 }
363 }
364 None
365 }
366
367 pub fn delete_kv(
369 &self,
370 collection: &str,
371 key: &str,
372 ) -> Result<bool, super::super::error::DevXError> {
373 let Some((_, id)) = self.get_kv(collection, key) else {
374 return Ok(false);
375 };
376 self.store
377 .delete(collection, id)
378 .map_err(|err| super::super::error::DevXError::Storage(format!("{err:?}")))?;
379 Ok(true)
380 }
381
382 pub(crate) fn with_initialized_metadata(self) -> Result<Self, Box<dyn std::error::Error>> {
383 if self.options.mode == StorageMode::Persistent {
384 if let Ok(metadata) = self.load_or_bootstrap_physical_metadata(false) {
386 crate::reserved_fields::validate_physical_metadata_contracts(&metadata)
387 .map_err(|err| err.to_string())?;
388 }
389 }
393 self.load_collection_ttl_defaults_from_metadata();
394 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
401 && self.options.storage_profile.packaging
402 == crate::storage::StoragePackaging::SingleFile
403 {
404 self.seed_contract_cache_from_store_aux();
405 }
406 self.load_hypertables_from_metadata();
410 self.recover_queue_pending_state();
411 Ok(self)
412 }
413
414 pub(crate) fn persist_metadata(&self) -> Result<(), Box<dyn std::error::Error>> {
415 if self.options.mode != StorageMode::Persistent || self.options.read_only {
416 return Ok(());
417 }
418 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
419 && self.options.storage_profile.packaging
420 == crate::storage::StoragePackaging::SingleFile
421 {
422 return Ok(());
423 }
424 let Some(path) = self.path() else {
425 return Ok(());
426 };
427
428 let previous = self.load_or_bootstrap_physical_metadata(false).ok();
429 let collection_roots = self.physical_collection_roots();
430 let indexes = self
431 .native_physical_state()
432 .map(|state| self.physical_index_state_from_native_state(&state, previous.as_ref()))
433 .unwrap_or_else(|| self.physical_index_state());
434 let mut metadata = PhysicalMetadataFile::from_state(
435 self.options.clone(),
436 self.catalog_snapshot(),
437 collection_roots,
438 indexes,
439 previous.as_ref(),
440 );
441 metadata.collection_ttl_defaults_ms = self.collection_ttl_defaults_snapshot();
442 metadata.hypertables = self.hypertable_registry_snapshot();
446 metadata.save_for_data_path(path)?;
447 self.persist_native_physical_header(&metadata)?;
448 Ok(())
449 }
450
451 fn bootstrap_metadata_from_native_state(&self) -> Result<bool, Box<dyn std::error::Error>> {
452 if self.options.mode != StorageMode::Persistent || self.options.read_only {
453 return Ok(false);
454 }
455 let Some(path) = self.path() else {
456 return Ok(false);
457 };
458 let Some(native_state) = self.native_physical_state() else {
459 return Ok(false);
460 };
461 if !Self::native_state_is_bootstrap_complete(&native_state) {
462 return Ok(false);
463 }
464
465 let previous = PhysicalMetadataFile::load_for_data_path(path).ok();
466 let metadata = self.metadata_from_native_state(&native_state, previous.as_ref());
467 metadata.save_for_data_path(path)?;
468 self.persist_native_physical_header(&metadata)?;
469 Ok(true)
470 }
471
472 pub fn rebuild_physical_metadata_from_native_state(
475 &self,
476 ) -> Result<bool, Box<dyn std::error::Error>> {
477 self.bootstrap_metadata_from_native_state()
478 }
479
480 pub(crate) fn native_state_is_bootstrap_complete(native_state: &NativePhysicalState) -> bool {
481 let registry_complete = native_state.registry.as_ref().map(|registry| {
482 registry.collections_complete
483 && registry.indexes_complete
484 && registry.graph_projections_complete
485 && registry.analytics_jobs_complete
486 && registry.vector_artifacts_complete
487 });
488 let recovery_complete = native_state
489 .recovery
490 .as_ref()
491 .map(|recovery| recovery.snapshots_complete && recovery.exports_complete);
492 let catalog_complete = native_state
493 .catalog
494 .as_ref()
495 .map(|catalog| catalog.collections_complete);
496
497 registry_complete == Some(true)
498 && recovery_complete == Some(true)
499 && catalog_complete == Some(true)
500 }
501
502 pub(crate) fn load_or_bootstrap_physical_metadata(
503 &self,
504 persist_bootstrapped: bool,
505 ) -> Result<PhysicalMetadataFile, Box<dyn std::error::Error>> {
506 if self.options.mode != StorageMode::Persistent {
507 return Err("physical metadata requires persistent mode".into());
508 }
509 let Some(path) = self.path() else {
510 return Err("database path is not available".into());
511 };
512 let native_state = self.native_physical_state();
513
514 match PhysicalMetadataFile::load_for_data_path(path) {
515 Ok(metadata) => {
516 if let Some(native_state) = native_state.as_ref() {
517 let inspection = Self::inspect_native_header_against_metadata(
518 native_state.header,
519 &metadata,
520 );
521 if Self::repair_policy_for_inspection(&inspection)
522 == NativeHeaderRepairPolicy::NativeAheadOfMetadata
523 {
524 let bootstrapped =
525 self.metadata_from_native_state(native_state, Some(&metadata));
526 if persist_bootstrapped && !self.options.read_only {
527 bootstrapped.save_for_data_path(path)?;
528 self.persist_native_physical_header(&bootstrapped)?;
529 }
530 return Ok(bootstrapped);
531 }
532 }
533 Ok(metadata)
534 }
535 Err(err) => {
536 let Some(native_state) = native_state else {
537 return Err(err.into());
538 };
539 let is_fresh_empty = native_state.header.sequence == 0
555 && native_state.registry.is_none()
556 && native_state.catalog.is_none()
557 && native_state.recovery.is_none();
558 if !is_fresh_empty && !Self::native_state_is_bootstrap_complete(&native_state) {
559 return Err(err.into());
560 }
561 let metadata = self.metadata_from_native_state(&native_state, None);
562 if persist_bootstrapped && !self.options.read_only {
563 metadata.save_for_data_path(path)?;
564 self.persist_native_physical_header(&metadata)?;
565 }
566 Ok(metadata)
567 }
568 }
569 }
570
571 pub(crate) fn physical_metadata_preference(&self) -> Option<&'static str> {
572 let path = self.path()?;
573 let native_state = self.native_physical_state();
574 let metadata = PhysicalMetadataFile::load_for_data_path(path).ok();
575
576 match (metadata, native_state) {
577 (Some(metadata), Some(native_state)) => {
578 let inspection =
579 Self::inspect_native_header_against_metadata(native_state.header, &metadata);
580 match Self::repair_policy_for_inspection(&inspection) {
581 NativeHeaderRepairPolicy::InSync => Some("sidecar_current"),
582 NativeHeaderRepairPolicy::RepairNativeFromMetadata => Some("sidecar_current"),
583 NativeHeaderRepairPolicy::NativeAheadOfMetadata => Some("native_ahead"),
584 }
585 }
586 (Some(_), None) => Some("sidecar_only"),
587 (None, Some(_)) => Some("sidecar_missing_native_available"),
588 (None, None) => Some("sidecar_missing_no_native"),
589 }
590 }
591
592 fn metadata_from_native_state(
593 &self,
594 native_state: &NativePhysicalState,
595 previous: Option<&PhysicalMetadataFile>,
596 ) -> PhysicalMetadataFile {
597 let now = SystemTime::now()
598 .duration_since(UNIX_EPOCH)
599 .unwrap_or_default()
600 .as_millis();
601 let catalog = self.catalog_snapshot();
602 let catalog_name = catalog.name.clone();
603 let catalog_total_entities = catalog.total_entities;
604 let catalog_total_collections = catalog.total_collections;
605 let indexes = self.physical_index_state();
606
607 let mut manifest =
608 crate::api::SchemaManifest::now(self.options.clone(), catalog.total_collections);
609 manifest.updated_at_unix_ms = now;
610
611 let manifest_events = native_state
612 .manifest
613 .as_ref()
614 .map(|summary| {
615 summary
616 .recent_events
617 .iter()
618 .map(|event| crate::physical::ManifestEvent {
619 collection: event.collection.clone(),
620 object_key: event.object_key.clone(),
621 kind: match event.kind.as_str() {
622 "insert" => crate::physical::ManifestEventKind::Insert,
623 "update" => crate::physical::ManifestEventKind::Update,
624 "remove" => crate::physical::ManifestEventKind::Remove,
625 _ => crate::physical::ManifestEventKind::Checkpoint,
626 },
627 block: crate::physical::BlockReference {
628 index: event.block_index,
629 checksum: event.block_checksum,
630 },
631 snapshot_min: event.snapshot_min,
632 snapshot_max: event.snapshot_max,
633 })
634 .collect()
635 })
636 .unwrap_or_default();
637
638 let graph_projections = native_state
639 .registry
640 .as_ref()
641 .and_then(|registry| {
642 registry.graph_projections_complete.then(|| {
643 registry
644 .graph_projections
645 .iter()
646 .map(|projection| crate::physical::PhysicalGraphProjection {
647 name: projection.name.clone(),
648 created_at_unix_ms: projection.created_at_unix_ms,
649 updated_at_unix_ms: projection.updated_at_unix_ms,
650 state: "materialized".to_string(),
651 source: projection.source.clone(),
652 node_labels: projection.node_labels.clone(),
653 node_types: projection.node_types.clone(),
654 edge_labels: projection.edge_labels.clone(),
655 last_materialized_sequence: projection.last_materialized_sequence,
656 })
657 .collect()
658 })
659 })
660 .or_else(|| previous.map(|metadata| metadata.graph_projections.clone()))
661 .unwrap_or_default();
662
663 let analytics_jobs = native_state
664 .registry
665 .as_ref()
666 .and_then(|registry| {
667 registry.analytics_jobs_complete.then(|| {
668 registry
669 .analytics_jobs
670 .iter()
671 .map(|job| crate::physical::PhysicalAnalyticsJob {
672 id: job.id.clone(),
673 kind: job.kind.clone(),
674 state: job.state.clone(),
675 projection: job.projection.clone(),
676 created_at_unix_ms: job.created_at_unix_ms,
677 updated_at_unix_ms: job.updated_at_unix_ms,
678 last_run_sequence: job.last_run_sequence,
679 metadata: job.metadata.clone(),
680 })
681 .collect()
682 })
683 })
684 .or_else(|| previous.map(|metadata| metadata.analytics_jobs.clone()))
685 .unwrap_or_default();
686
687 let exports = native_state
688 .recovery
689 .as_ref()
690 .and_then(|recovery| {
691 recovery.exports_complete.then(|| {
692 recovery
693 .exports
694 .iter()
695 .map(|export| crate::physical::ExportDescriptor {
696 name: export.name.clone(),
697 created_at_unix_ms: export.created_at_unix_ms,
698 snapshot_id: export.snapshot_id,
699 superblock_sequence: export.superblock_sequence,
700 data_path: self
701 .path()
702 .map(|path| {
703 crate::physical::PhysicalMetadataFile::export_data_path_for(
704 path,
705 &export.name,
706 )
707 .display()
708 .to_string()
709 })
710 .unwrap_or_default(),
711 metadata_path: self
712 .path()
713 .map(|path| {
714 let export_data_path =
715 crate::physical::PhysicalMetadataFile::export_data_path_for(
716 path,
717 &export.name,
718 );
719 crate::physical::PhysicalMetadataFile::metadata_path_for(
720 &export_data_path,
721 )
722 .display()
723 .to_string()
724 })
725 .unwrap_or_default(),
726 collection_count: export.collection_count as usize,
727 total_entities: export.total_entities as usize,
728 })
729 .collect()
730 })
731 })
732 .or_else(|| previous.map(|metadata| metadata.exports.clone()))
733 .unwrap_or_default();
734
735 let snapshots = native_state
736 .recovery
737 .as_ref()
738 .and_then(|recovery| {
739 recovery.snapshots_complete.then(|| {
740 recovery
741 .snapshots
742 .iter()
743 .map(|snapshot| crate::physical::SnapshotDescriptor {
744 snapshot_id: snapshot.snapshot_id,
745 created_at_unix_ms: snapshot.created_at_unix_ms,
746 superblock_sequence: snapshot.superblock_sequence,
747 collection_count: snapshot.collection_count as usize,
748 total_entities: snapshot.total_entities as usize,
749 })
750 .collect()
751 })
752 })
753 .or_else(|| previous.map(|metadata| metadata.snapshots.clone()))
754 .unwrap_or_else(|| {
755 vec![crate::physical::SnapshotDescriptor {
756 snapshot_id: native_state.header.sequence,
757 created_at_unix_ms: now,
758 superblock_sequence: native_state.header.sequence,
759 collection_count: catalog_total_collections,
760 total_entities: catalog_total_entities,
761 }]
762 });
763
764 let catalog_stats = native_state
765 .catalog
766 .as_ref()
767 .and_then(|native_catalog| {
768 native_catalog.collections_complete.then(|| {
769 native_catalog
770 .collections
771 .iter()
772 .map(|collection| {
773 (
774 collection.name.clone(),
775 crate::api::CollectionStats {
776 entities: collection.entities as usize,
777 cross_refs: collection.cross_refs as usize,
778 segments: collection.segments as usize,
779 },
780 )
781 })
782 .collect::<BTreeMap<_, _>>()
783 })
784 })
785 .or_else(|| previous.map(|metadata| metadata.catalog.stats_by_collection.clone()))
786 .unwrap_or_else(|| catalog.stats_by_collection.clone());
787
788 PhysicalMetadataFile {
789 protocol_version: crate::physical::PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
790 generated_at_unix_ms: now,
791 last_loaded_from: Some("native_bootstrap".to_string()),
792 last_healed_at_unix_ms: Some(now),
793 manifest,
794 catalog: crate::api::CatalogSnapshot {
795 name: catalog_name,
796 total_entities: native_state
797 .catalog
798 .as_ref()
799 .map(|summary| summary.total_entities as usize)
800 .unwrap_or(catalog_total_entities),
801 total_collections: native_state
802 .catalog
803 .as_ref()
804 .map(|summary| summary.collection_count as usize)
805 .unwrap_or(catalog_total_collections),
806 stats_by_collection: catalog_stats,
807 updated_at: SystemTime::now(),
808 },
809 manifest_events,
810 collection_ttl_defaults_ms: previous
811 .map(|metadata| metadata.collection_ttl_defaults_ms.clone())
812 .unwrap_or_default(),
813 collection_contracts: previous
814 .map(|metadata| metadata.collection_contracts.clone())
815 .unwrap_or_default(),
816 hypertables: previous
817 .map(|metadata| metadata.hypertables.clone())
818 .unwrap_or_default(),
819 tree_definitions: previous
820 .map(|metadata| metadata.tree_definitions.clone())
821 .unwrap_or_default(),
822 indexes,
823 graph_projections,
824 analytics_jobs,
825 exports,
826 superblock: crate::physical::SuperblockHeader {
827 format_version: native_state.header.format_version,
828 sequence: native_state.header.sequence,
829 copies: crate::physical::DEFAULT_SUPERBLOCK_COPIES,
830 manifest: crate::physical::ManifestPointers {
831 oldest: crate::physical::BlockReference {
832 index: native_state.header.manifest_oldest_root,
833 checksum: 0,
834 },
835 newest: crate::physical::BlockReference {
836 index: native_state.header.manifest_root,
837 checksum: 0,
838 },
839 },
840 free_set: crate::physical::BlockReference {
841 index: native_state.header.free_set_root,
842 checksum: 0,
843 },
844 collection_roots: native_state.collection_roots.clone(),
845 },
846 snapshots,
847 }
848 }
849
850 pub(crate) fn reconcile_index_states_with_native_artifacts(
851 &self,
852 mut indexes: Vec<PhysicalIndexState>,
853 ) -> Vec<PhysicalIndexState> {
854 let native_artifacts = self
855 .native_physical_state()
856 .and_then(|state| state.registry)
857 .map(|registry| registry.vector_artifacts)
858 .unwrap_or_default();
859 for index in &mut indexes {
860 let Some(collection) = index.collection.as_deref() else {
861 continue;
862 };
863 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
864 continue;
865 };
866 let Some(artifact) = native_artifacts.iter().find(|artifact| {
867 artifact.collection == collection && artifact.artifact_kind == artifact_kind
868 }) else {
869 index.build_state = "metadata-only".to_string();
870 continue;
871 };
872 index.entries = artifact.vector_count as usize;
873 index.estimated_memory_bytes = artifact.serialized_bytes;
874 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
875 index.artifact_kind = Some(artifact.artifact_kind.clone());
876 index.artifact_checksum = Some(artifact.checksum);
877 index.build_state = "artifact-published".to_string();
878 if let Some(pages) = self.native_vector_artifact_pages() {
879 index.artifact_root_page = pages
880 .into_iter()
881 .find(|page| {
882 page.collection == artifact.collection
883 && page.artifact_kind == artifact.artifact_kind
884 })
885 .map(|page| page.root_page);
886 }
887 }
888 indexes
889 }
890
891 pub(crate) fn warmup_native_vector_artifact_for_index(
892 &self,
893 index: &PhysicalIndexState,
894 ) -> Result<(), String> {
895 let Some(collection) = index.collection.as_deref() else {
896 return Ok(());
897 };
898 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
899 return Ok(());
900 };
901 self.warmup_native_vector_artifact(collection, Some(artifact_kind))?;
902 Ok(())
903 }
904
905 pub(crate) fn apply_runtime_native_artifact_to_index_state(
906 &self,
907 index: &mut PhysicalIndexState,
908 ) -> Result<(), String> {
909 let Some(collection) = index.collection.as_deref() else {
910 return Ok(());
911 };
912 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
913 return Ok(());
914 };
915 let artifact = self.inspect_native_vector_artifact(collection, Some(artifact_kind))?;
916 index.entries = artifact
917 .graph_edge_count
918 .or(artifact.text_posting_count)
919 .unwrap_or(artifact.node_count) as usize;
920 index.estimated_memory_bytes = artifact.byte_len;
921 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
922 index.artifact_kind = Some(artifact.artifact_kind.clone());
923 index.artifact_checksum = Some(artifact.checksum);
924 index.build_state = "ready".to_string();
925 index.artifact_root_page = self
926 .native_vector_artifact_pages()
927 .and_then(|pages| {
928 pages.into_iter().find(|page| {
929 page.collection == artifact.collection
930 && page.artifact_kind == artifact.artifact_kind
931 })
932 })
933 .map(|page| page.root_page);
934 Ok(())
935 }
936
937 pub(crate) fn physical_index_state_from_native_state(
938 &self,
939 native_state: &NativePhysicalState,
940 previous: Option<&PhysicalMetadataFile>,
941 ) -> Vec<PhysicalIndexState> {
942 let mut fresh = self.physical_index_state();
943 let Some(registry) = native_state.registry.as_ref() else {
944 if let Some(previous) = previous {
945 for index in &previous.indexes {
946 if !fresh.iter().any(|candidate| candidate.name == index.name) {
947 fresh.push(index.clone());
948 }
949 }
950 }
951 return fresh;
952 };
953
954 for index in &mut fresh {
955 if let Some(native) = registry
956 .indexes
957 .iter()
958 .find(|candidate| candidate.name == index.name)
959 {
960 index.enabled = native.enabled;
961 index.last_refresh_ms = native.last_refresh_ms;
962 index.backend = native.backend.clone();
963 index.entries = native.entries as usize;
964 index.estimated_memory_bytes = native.estimated_memory_bytes;
965 if index.artifact_kind.is_none() {
966 index.artifact_kind = Self::native_artifact_kind_for_index(index.kind)
967 .map(|value| value.to_string());
968 }
969 if index.build_state == "catalog-derived" {
970 index.build_state = "registry-loaded".to_string();
971 }
972 }
973 }
974
975 for native in ®istry.indexes {
976 if fresh.iter().any(|index| index.name == native.name) {
977 continue;
978 }
979 let Some(kind) = Self::index_kind_from_str(&native.kind) else {
980 continue;
981 };
982 fresh.push(PhysicalIndexState {
983 name: native.name.clone(),
984 kind,
985 collection: native.collection.clone(),
986 enabled: native.enabled,
987 entries: native.entries as usize,
988 estimated_memory_bytes: native.estimated_memory_bytes,
989 last_refresh_ms: native.last_refresh_ms,
990 backend: native.backend.clone(),
991 artifact_kind: Self::native_artifact_kind_for_index(kind)
992 .map(|value| value.to_string()),
993 artifact_root_page: None,
994 artifact_checksum: None,
995 build_state: "registry-loaded".to_string(),
996 });
997 }
998
999 if !registry.indexes_complete {
1000 if let Some(previous) = previous {
1001 for index in &previous.indexes {
1002 if !fresh.iter().any(|candidate| candidate.name == index.name) {
1003 fresh.push(index.clone());
1004 }
1005 }
1006 }
1007 }
1008
1009 fresh
1010 }
1011
1012 pub(crate) fn graph_projections_from_native_state(
1013 &self,
1014 native_state: &NativePhysicalState,
1015 ) -> Vec<PhysicalGraphProjection> {
1016 native_state
1017 .registry
1018 .as_ref()
1019 .map(|registry| {
1020 registry
1021 .graph_projections
1022 .iter()
1023 .map(|projection| PhysicalGraphProjection {
1024 name: projection.name.clone(),
1025 created_at_unix_ms: projection.created_at_unix_ms,
1026 updated_at_unix_ms: projection.updated_at_unix_ms,
1027 state: "materialized".to_string(),
1028 source: projection.source.clone(),
1029 node_labels: projection.node_labels.clone(),
1030 node_types: projection.node_types.clone(),
1031 edge_labels: projection.edge_labels.clone(),
1032 last_materialized_sequence: projection.last_materialized_sequence,
1033 })
1034 .collect()
1035 })
1036 .unwrap_or_default()
1037 }
1038
1039 pub(crate) fn analytics_jobs_from_native_state(
1040 &self,
1041 native_state: &NativePhysicalState,
1042 ) -> Vec<PhysicalAnalyticsJob> {
1043 native_state
1044 .registry
1045 .as_ref()
1046 .map(|registry| {
1047 registry
1048 .analytics_jobs
1049 .iter()
1050 .map(|job| PhysicalAnalyticsJob {
1051 id: job.id.clone(),
1052 kind: job.kind.clone(),
1053 state: job.state.clone(),
1054 projection: job.projection.clone(),
1055 created_at_unix_ms: job.created_at_unix_ms,
1056 updated_at_unix_ms: job.updated_at_unix_ms,
1057 last_run_sequence: job.last_run_sequence,
1058 metadata: job.metadata.clone(),
1059 })
1060 .collect()
1061 })
1062 .unwrap_or_default()
1063 }
1064
1065 pub(crate) fn exports_from_native_state(
1066 &self,
1067 native_state: &NativePhysicalState,
1068 ) -> Vec<ExportDescriptor> {
1069 native_state
1070 .recovery
1071 .as_ref()
1072 .map(|recovery| {
1073 recovery
1074 .exports
1075 .iter()
1076 .map(|export| ExportDescriptor {
1077 name: export.name.clone(),
1078 created_at_unix_ms: export.created_at_unix_ms,
1079 snapshot_id: export.snapshot_id,
1080 superblock_sequence: export.superblock_sequence,
1081 data_path: self
1082 .path()
1083 .map(|path| {
1084 crate::physical::PhysicalMetadataFile::export_data_path_for(
1085 path,
1086 &export.name,
1087 )
1088 .display()
1089 .to_string()
1090 })
1091 .unwrap_or_default(),
1092 metadata_path: self
1093 .path()
1094 .map(|path| {
1095 let export_data_path =
1096 crate::physical::PhysicalMetadataFile::export_data_path_for(
1097 path,
1098 &export.name,
1099 );
1100 crate::physical::PhysicalMetadataFile::metadata_path_for(
1101 &export_data_path,
1102 )
1103 .display()
1104 .to_string()
1105 })
1106 .unwrap_or_default(),
1107 collection_count: export.collection_count as usize,
1108 total_entities: export.total_entities as usize,
1109 })
1110 .collect()
1111 })
1112 .unwrap_or_default()
1113 }
1114
1115 pub(crate) fn snapshots_from_native_state(
1116 &self,
1117 native_state: &NativePhysicalState,
1118 ) -> Vec<crate::physical::SnapshotDescriptor> {
1119 let snapshots: Vec<_> = native_state
1120 .recovery
1121 .as_ref()
1122 .map(|recovery| {
1123 recovery
1124 .snapshots
1125 .iter()
1126 .map(|snapshot| crate::physical::SnapshotDescriptor {
1127 snapshot_id: snapshot.snapshot_id,
1128 created_at_unix_ms: snapshot.created_at_unix_ms,
1129 superblock_sequence: snapshot.superblock_sequence,
1130 collection_count: snapshot.collection_count as usize,
1131 total_entities: snapshot.total_entities as usize,
1132 })
1133 .collect()
1134 })
1135 .unwrap_or_default();
1136 if !snapshots.is_empty() {
1137 return snapshots;
1138 }
1139
1140 let now = SystemTime::now()
1141 .duration_since(UNIX_EPOCH)
1142 .unwrap_or_default()
1143 .as_millis();
1144 let (collection_count, total_entities) = native_state
1145 .catalog
1146 .as_ref()
1147 .map(|catalog| {
1148 (
1149 catalog.collection_count as usize,
1150 catalog.total_entities as usize,
1151 )
1152 })
1153 .unwrap_or_else(|| {
1154 let catalog = self.catalog_snapshot();
1155 (catalog.total_collections, catalog.total_entities)
1156 });
1157
1158 vec![crate::physical::SnapshotDescriptor {
1159 snapshot_id: native_state.header.sequence,
1160 created_at_unix_ms: now,
1161 superblock_sequence: native_state.header.sequence,
1162 collection_count,
1163 total_entities,
1164 }]
1165 }
1166
1167 fn index_kind_from_str(value: &str) -> Option<crate::index::IndexKind> {
1168 match value {
1169 "btree" => Some(crate::index::IndexKind::BTree),
1170 "vector.hnsw" => Some(crate::index::IndexKind::VectorHnsw),
1171 "vector.inverted" => Some(crate::index::IndexKind::VectorInverted),
1172 "vector.turbo" => Some(crate::index::IndexKind::VectorTurbo),
1173 "graph.adjacency" => Some(crate::index::IndexKind::GraphAdjacency),
1174 "text.fulltext" => Some(crate::index::IndexKind::FullText),
1175 "document.pathvalue" => Some(crate::index::IndexKind::DocumentPathValue),
1176 "search.hybrid" => Some(crate::index::IndexKind::HybridSearch),
1177 _ => None,
1178 }
1179 }
1180
1181 pub(crate) fn native_artifact_kind_for_index(kind: IndexKind) -> Option<&'static str> {
1182 match kind {
1183 IndexKind::VectorHnsw => Some("hnsw"),
1184 IndexKind::VectorInverted => Some("ivf"),
1185 IndexKind::VectorTurbo => Some("turboquant"),
1186 IndexKind::GraphAdjacency => Some("graph.adjacency"),
1187 IndexKind::FullText => Some("text.fulltext"),
1188 IndexKind::DocumentPathValue => Some("document.pathvalue"),
1189 _ => None,
1190 }
1191 }
1192
1193 fn index_is_declared(&self, name: &str) -> bool {
1194 self.physical_metadata()
1195 .map(|metadata| metadata.indexes.iter().any(|index| index.name == name))
1196 .unwrap_or(false)
1197 }
1198
1199 pub(crate) fn graph_projection_is_declared(&self, name: &str) -> bool {
1200 self.physical_metadata()
1201 .map(|metadata| {
1202 metadata
1203 .graph_projections
1204 .iter()
1205 .any(|projection| projection.name == name)
1206 })
1207 .unwrap_or(false)
1208 }
1209
1210 pub(crate) fn graph_projection_is_operational(&self, name: &str) -> bool {
1211 self.operational_graph_projections()
1212 .into_iter()
1213 .any(|projection| projection.name == name && projection.state == "materialized")
1214 }
1215
/// Builds the identifier of an analytics job as `"{kind}::{scope}"`.
///
/// The scope is the projection name when one is given, and the literal
/// `global` otherwise.
pub(crate) fn analytics_job_id(kind: &str, projection: Option<&str>) -> String {
    let scope = projection.unwrap_or("global");
    format!("{kind}::{scope}")
}
1222
1223 pub(crate) fn update_physical_metadata<T, F>(
1224 &self,
1225 mutator: F,
1226 ) -> Result<T, Box<dyn std::error::Error>>
1227 where
1228 F: FnOnce(&mut PhysicalMetadataFile) -> T,
1229 {
1230 if self.options.mode != StorageMode::Persistent {
1231 return Err("physical metadata operations require persistent mode".into());
1232 }
1233 if self.options.read_only {
1234 return Err("physical metadata operations are not allowed in read-only mode".into());
1235 }
1236 let Some(path) = self.path() else {
1237 return Err("database path is not available".into());
1238 };
1239
1240 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1241
1242 if metadata.indexes.is_empty() {
1243 metadata.indexes = self.physical_index_state();
1244 }
1245 metadata.superblock.collection_roots = self.physical_collection_roots();
1246
1247 let result = mutator(&mut metadata);
1248 metadata.save_for_data_path(path)?;
1249 self.persist_native_physical_header(&metadata)?;
1250 Ok(result)
1251 }
1252
1253 pub(crate) fn persist_native_physical_header(
1254 &self,
1255 metadata: &PhysicalMetadataFile,
1256 ) -> Result<(), Box<dyn std::error::Error>> {
1257 if !self.paged_mode {
1258 return Ok(());
1259 }
1260
1261 let existing_page = self
1262 .store
1263 .physical_file_header()
1264 .map(|header| header.collection_roots_page)
1265 .filter(|page| *page != 0);
1266 let existing_registry_page = self
1267 .store
1268 .physical_file_header()
1269 .map(|header| header.registry_page)
1270 .filter(|page| *page != 0);
1271 let existing_recovery_page = self
1272 .store
1273 .physical_file_header()
1274 .map(|header| header.recovery_page)
1275 .filter(|page| *page != 0);
1276 let existing_catalog_page = self
1277 .store
1278 .physical_file_header()
1279 .map(|header| header.catalog_page)
1280 .filter(|page| *page != 0);
1281 let existing_metadata_state_page = self
1282 .store
1283 .physical_file_header()
1284 .map(|header| header.metadata_state_page)
1285 .filter(|page| *page != 0);
1286 let existing_vector_artifact_page = self
1287 .store
1288 .physical_file_header()
1289 .map(|header| header.vector_artifact_page)
1290 .filter(|page| *page != 0);
1291 let existing_manifest_page = self
1292 .store
1293 .physical_file_header()
1294 .map(|header| header.manifest_page)
1295 .filter(|page| *page != 0);
1296 let (manifest_page, manifest_checksum) = self.store.write_native_manifest_summary(
1297 metadata.superblock.sequence,
1298 &metadata.manifest_events,
1299 existing_manifest_page,
1300 )?;
1301 let (collection_roots_page, collection_roots_checksum) = self
1302 .store
1303 .write_native_collection_roots(&metadata.superblock.collection_roots, existing_page)?;
1304 let registry_summary = self.native_registry_summary_from_metadata(metadata);
1305 let (registry_page, registry_checksum) = self
1306 .store
1307 .write_native_registry_summary(®istry_summary, existing_registry_page)?;
1308 let recovery_summary = Self::native_recovery_summary_from_metadata(metadata);
1309 let (recovery_page, recovery_checksum) = self
1310 .store
1311 .write_native_recovery_summary(&recovery_summary, existing_recovery_page)?;
1312 let catalog_summary = Self::native_catalog_summary_from_metadata(metadata);
1313 let (catalog_page, catalog_checksum) = self
1314 .store
1315 .write_native_catalog_summary(&catalog_summary, existing_catalog_page)?;
1316 let metadata_state_summary = Self::native_metadata_state_summary_from_metadata(metadata);
1317 let (metadata_state_page, metadata_state_checksum) =
1318 self.store.write_native_metadata_state_summary(
1319 &metadata_state_summary,
1320 existing_metadata_state_page,
1321 )?;
1322 let vector_artifact_records = self.native_vector_artifact_records();
1323 let vector_artifact_payloads = vector_artifact_records
1324 .iter()
1325 .map(|(summary, bytes)| {
1326 (
1327 summary.collection.clone(),
1328 summary.artifact_kind.clone(),
1329 bytes.clone(),
1330 )
1331 })
1332 .collect::<Vec<_>>();
1333 let (vector_artifact_page, vector_artifact_checksum, _vector_artifact_pages) =
1334 self.store.write_native_vector_artifact_store(
1335 &vector_artifact_payloads,
1336 existing_vector_artifact_page,
1337 )?;
1338 let mut header = Self::native_header_from_metadata(metadata);
1339 header.manifest_page = manifest_page;
1340 header.manifest_checksum = manifest_checksum;
1341 header.collection_roots_page = collection_roots_page;
1342 header.collection_roots_checksum = collection_roots_checksum;
1343 header.registry_page = registry_page;
1344 header.registry_checksum = registry_checksum;
1345 header.recovery_page = recovery_page;
1346 header.recovery_checksum = recovery_checksum;
1347 header.catalog_page = catalog_page;
1348 header.catalog_checksum = catalog_checksum;
1349 header.metadata_state_page = metadata_state_page;
1350 header.metadata_state_checksum = metadata_state_checksum;
1351 header.vector_artifact_page = vector_artifact_page;
1352 header.vector_artifact_checksum = vector_artifact_checksum;
1353 self.store.update_physical_file_header(header)?;
1354 self.store.persist()?;
1355 Ok(())
1356 }
1357
1358 pub(crate) fn native_header_from_metadata(
1359 metadata: &PhysicalMetadataFile,
1360 ) -> PhysicalFileHeader {
1361 PhysicalFileHeader {
1362 format_version: metadata.superblock.format_version,
1363 sequence: metadata.superblock.sequence,
1364 manifest_oldest_root: metadata.superblock.manifest.oldest.index,
1365 manifest_root: metadata.superblock.manifest.newest.index,
1366 free_set_root: metadata.superblock.free_set.index,
1367 manifest_page: 0,
1368 manifest_checksum: 0,
1369 collection_roots_page: 0,
1370 collection_roots_checksum: 0,
1371 collection_root_count: metadata.superblock.collection_roots.len() as u32,
1372 snapshot_count: metadata.snapshots.len() as u32,
1373 index_count: metadata.indexes.len() as u32,
1374 catalog_collection_count: metadata.catalog.total_collections as u32,
1375 catalog_total_entities: metadata.catalog.total_entities as u64,
1376 export_count: metadata.exports.len() as u32,
1377 graph_projection_count: metadata.graph_projections.len() as u32,
1378 analytics_job_count: metadata.analytics_jobs.len() as u32,
1379 manifest_event_count: metadata.manifest_events.len() as u32,
1380 registry_page: 0,
1381 registry_checksum: 0,
1382 recovery_page: 0,
1383 recovery_checksum: 0,
1384 catalog_page: 0,
1385 catalog_checksum: 0,
1386 metadata_state_page: 0,
1387 metadata_state_checksum: 0,
1388 vector_artifact_page: 0,
1389 vector_artifact_checksum: 0,
1390 }
1391 }
1392
1393 fn recover_queue_pending_state(&self) {
1394 const QUEUE_META_COLLECTION: &str = "red_queue_meta";
1395
1396 let Some(manager) = self.store.get_collection(QUEUE_META_COLLECTION) else {
1397 return;
1398 };
1399
1400 let pending_rows = manager.query_all(|entity| {
1401 entity.data.as_row().is_some_and(|row| {
1402 matches!(
1403 row.get_field("kind"),
1404 Some(crate::storage::schema::Value::Text(kind))
1405 if matches!(&**kind, "queue_pending" | "queue_pending_lc")
1406 )
1407 })
1408 });
1409
1410 for row in pending_rows {
1411 let _ = self.store.delete(QUEUE_META_COLLECTION, row.id);
1412 }
1413 }
1414}
1415
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch yields `0`; a millisecond count
/// that does not fit in `u64` yields `u64::MAX`.
fn current_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let millis = since_epoch.as_millis();

    if millis > u128::from(u64::MAX) {
        u64::MAX
    } else {
        millis as u64
    }
}