1use super::*;
2use crate::storage::unified::metadata::{MetadataFilter, MetadataValue};
3
4impl RedDB {
5 pub fn enforce_retention_policy(&self) -> Result<(), Box<dyn std::error::Error>> {
6 if self.options.read_only {
7 return Ok(());
8 }
9
10 if self.options.mode == StorageMode::Persistent {
13 let Some(path) = self.path() else {
14 return Ok(());
15 };
16
17 let Ok(mut metadata) = self.load_or_bootstrap_physical_metadata(true) else {
18 return Ok(());
19 };
20
21 self.prune_export_registry(&mut metadata.exports);
22 metadata.save_for_data_path(path)?;
23 }
24
25 let _ = self.sweep_ttl_expired_entities()?;
26
27 Ok(())
28 }
29
30 fn sweep_ttl_expired_entities(&self) -> Result<usize, Box<dyn std::error::Error>> {
31 let now_ms = SystemTime::now()
32 .duration_since(UNIX_EPOCH)
33 .unwrap_or_default()
34 .as_millis()
35 .min(u128::from(u64::MAX)) as u64;
36
37 let mut to_delete = Vec::<(String, EntityId)>::new();
38
39 let mut absolute_expired = self.expired_entities_by_expires_at(now_ms)?;
40 to_delete.append(&mut absolute_expired);
41
42 let mut relative_expired = self.expired_entities_by_ttl(now_ms)?;
43 to_delete.append(&mut relative_expired);
44
45 to_delete.sort_unstable();
46 to_delete.dedup();
47
48 let mut deleted = 0usize;
49 for (collection, id) in to_delete {
50 match self.store.delete(&collection, id) {
51 Ok(true) => deleted = deleted.saturating_add(1),
52 Ok(false) => {}
53 Err(err) => {
54 return Err(format!(
55 "failed deleting expired entity {id} from collection '{collection}': {err:?}"
56 )
57 .into());
58 }
59 }
60 }
61
62 Ok(deleted)
63 }
64
65 fn expired_entities_by_expires_at(
66 &self,
67 now_ms: u64,
68 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
69 let mut ids = self.store.filter_metadata_all(&[(
70 "_expires_at".to_string(),
71 MetadataFilter::Le(MetadataValue::Timestamp(now_ms)),
72 )]);
73
74 if let Ok(now_ms_i64) = i64::try_from(now_ms) {
75 ids.extend(self.store.filter_metadata_all(&[(
76 "_expires_at".to_string(),
77 MetadataFilter::Le(MetadataValue::Int(now_ms_i64)),
78 )]));
79 }
80
81 let now_ms_f64 = now_ms as f64;
82 if now_ms_f64.is_finite() {
83 ids.extend(self.store.filter_metadata_all(&[(
84 "_expires_at".to_string(),
85 MetadataFilter::Le(MetadataValue::Float(now_ms_f64)),
86 )]));
87 }
88
89 Ok(ids)
90 }
91
92 fn expired_entities_by_ttl(
93 &self,
94 now_ms: u64,
95 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
96 let mut candidates = Vec::<(String, EntityId)>::new();
97
98 let ttl_ms_candidates = self
99 .store
100 .filter_metadata_all(&[("_ttl_ms".to_string(), MetadataFilter::IsNotNull)]);
101 candidates.extend(ttl_ms_candidates);
102
103 let ttl_candidates = self
104 .store
105 .filter_metadata_all(&[("_ttl".to_string(), MetadataFilter::IsNotNull)]);
106 candidates.extend(ttl_candidates);
107
108 if candidates.is_empty() {
109 return Ok(Vec::new());
110 }
111
112 candidates.sort_unstable();
113 candidates.dedup();
114
115 let mut expired = Vec::<(String, EntityId)>::new();
116 for (collection, entity_id) in candidates {
117 let Some(entity) = self.store.get(&collection, entity_id) else {
118 continue;
119 };
120
121 let Some(metadata) = self.store.get_metadata(&collection, entity_id) else {
122 continue;
123 };
124
125 let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
126 let ttl_secs = if ttl_ms.is_none() {
127 metadata.get("_ttl").and_then(|value| {
128 Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
129 })
130 } else {
131 None
132 };
133
134 let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
135 continue;
136 };
137
138 let created_at_ms = entity.created_at.saturating_mul(1000);
139 let expiry_ms = created_at_ms.saturating_add(ttl_ms);
140 if expiry_ms <= now_ms {
141 expired.push((collection, entity_id));
142 }
143 }
144
145 Ok(expired)
146 }
147
148 fn metadata_u64(value: &MetadataValue) -> Option<u64> {
149 match value {
150 MetadataValue::Int(v) if *v >= 0 => Some(*v as u64),
151 MetadataValue::Timestamp(v) => Some(*v),
152 MetadataValue::Float(v) => {
153 if !v.is_finite() || !v.is_sign_positive() || v.fract().abs() >= f64::EPSILON {
154 return None;
155 }
156 if *v > u64::MAX as f64 {
157 return None;
158 }
159 Some(v.trunc() as u64)
160 }
161 MetadataValue::String(v) => v.parse::<u64>().ok(),
162 _ => None,
163 }
164 }
165
166 pub fn node(&self, collection: impl Into<String>, label: impl Into<String>) -> NodeBuilder {
179 NodeBuilder::new(
180 self.store.clone(),
181 self.preprocessors.clone(),
182 collection,
183 label,
184 )
185 }
186
187 pub fn edge(&self, collection: impl Into<String>, label: impl Into<String>) -> EdgeBuilder {
199 EdgeBuilder::new(
200 self.store.clone(),
201 self.preprocessors.clone(),
202 collection,
203 label,
204 )
205 }
206
207 pub fn vector(&self, collection: impl Into<String>) -> VectorBuilder {
218 VectorBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
219 }
220
221 pub fn row(&self, table: impl Into<String>, columns: Vec<(&str, Value)>) -> RowBuilder {
232 RowBuilder::new(
233 self.store.clone(),
234 self.preprocessors.clone(),
235 table,
236 columns,
237 )
238 }
239
240 pub fn doc(&self, collection: impl Into<String>) -> DocumentBuilder {
254 DocumentBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
255 }
256
257 pub fn kv(
268 &self,
269 collection: impl Into<String>,
270 key: impl Into<String>,
271 value: Value,
272 ) -> KvBuilder {
273 KvBuilder::new(
274 self.store.clone(),
275 self.preprocessors.clone(),
276 collection,
277 key,
278 value,
279 )
280 }
281
282 pub fn get_kv(&self, collection: &str, key: &str) -> Option<(Value, EntityId)> {
286 let manager = self.store.get_collection(collection)?;
287 let entities = manager.query_all(|_| true);
288 for entity in entities {
289 if let EntityData::Row(ref row) = entity.data {
290 if let Some(ref named) = row.named {
291 if let Some(Value::Text(ref k)) = named.get("key") {
292 if &**k == key {
293 let value = named.get("value").cloned().unwrap_or(Value::Null);
294 return Some((value, entity.id));
295 }
296 }
297 }
298 }
299 }
300 None
301 }
302
303 pub fn delete_kv(
305 &self,
306 collection: &str,
307 key: &str,
308 ) -> Result<bool, super::super::error::DevXError> {
309 let Some((_, id)) = self.get_kv(collection, key) else {
310 return Ok(false);
311 };
312 self.store
313 .delete(collection, id)
314 .map_err(|err| super::super::error::DevXError::Storage(format!("{err:?}")))?;
315 Ok(true)
316 }
317
318 pub(crate) fn with_initialized_metadata(self) -> Result<Self, Box<dyn std::error::Error>> {
319 if self.options.mode == StorageMode::Persistent {
320 if let Ok(metadata) = self.load_or_bootstrap_physical_metadata(false) {
322 crate::reserved_fields::validate_physical_metadata_contracts(&metadata)
323 .map_err(|err| err.to_string())?;
324 }
325 }
329 self.load_collection_ttl_defaults_from_metadata();
330 self.recover_queue_pending_state();
331 Ok(self)
332 }
333
334 pub(crate) fn persist_metadata(&self) -> Result<(), Box<dyn std::error::Error>> {
335 if self.options.mode != StorageMode::Persistent || self.options.read_only {
336 return Ok(());
337 }
338 let Some(path) = self.path() else {
339 return Ok(());
340 };
341
342 let previous = self.load_or_bootstrap_physical_metadata(false).ok();
343 let collection_roots = self.physical_collection_roots();
344 let indexes = self
345 .native_physical_state()
346 .map(|state| self.physical_index_state_from_native_state(&state, previous.as_ref()))
347 .unwrap_or_else(|| self.physical_index_state());
348 let mut metadata = PhysicalMetadataFile::from_state(
349 self.options.clone(),
350 self.catalog_snapshot(),
351 collection_roots,
352 indexes,
353 previous.as_ref(),
354 );
355 metadata.collection_ttl_defaults_ms = self.collection_ttl_defaults_snapshot();
356 metadata.save_for_data_path(path)?;
357 self.persist_native_physical_header(&metadata)?;
358 Ok(())
359 }
360
361 fn bootstrap_metadata_from_native_state(&self) -> Result<bool, Box<dyn std::error::Error>> {
362 if self.options.mode != StorageMode::Persistent || self.options.read_only {
363 return Ok(false);
364 }
365 let Some(path) = self.path() else {
366 return Ok(false);
367 };
368 let Some(native_state) = self.native_physical_state() else {
369 return Ok(false);
370 };
371 if !Self::native_state_is_bootstrap_complete(&native_state) {
372 return Ok(false);
373 }
374
375 let previous = PhysicalMetadataFile::load_for_data_path(path).ok();
376 let metadata = self.metadata_from_native_state(&native_state, previous.as_ref());
377 metadata.save_for_data_path(path)?;
378 self.persist_native_physical_header(&metadata)?;
379 Ok(true)
380 }
381
382 pub fn rebuild_physical_metadata_from_native_state(
385 &self,
386 ) -> Result<bool, Box<dyn std::error::Error>> {
387 self.bootstrap_metadata_from_native_state()
388 }
389
390 pub(crate) fn native_state_is_bootstrap_complete(native_state: &NativePhysicalState) -> bool {
391 let registry_complete = native_state.registry.as_ref().map(|registry| {
392 registry.collections_complete
393 && registry.indexes_complete
394 && registry.graph_projections_complete
395 && registry.analytics_jobs_complete
396 && registry.vector_artifacts_complete
397 });
398 let recovery_complete = native_state
399 .recovery
400 .as_ref()
401 .map(|recovery| recovery.snapshots_complete && recovery.exports_complete);
402 let catalog_complete = native_state
403 .catalog
404 .as_ref()
405 .map(|catalog| catalog.collections_complete);
406
407 registry_complete == Some(true)
408 && recovery_complete == Some(true)
409 && catalog_complete == Some(true)
410 }
411
412 pub(crate) fn load_or_bootstrap_physical_metadata(
413 &self,
414 persist_bootstrapped: bool,
415 ) -> Result<PhysicalMetadataFile, Box<dyn std::error::Error>> {
416 if self.options.mode != StorageMode::Persistent {
417 return Err("physical metadata requires persistent mode".into());
418 }
419 let Some(path) = self.path() else {
420 return Err("database path is not available".into());
421 };
422 let native_state = self.native_physical_state();
423
424 match PhysicalMetadataFile::load_for_data_path(path) {
425 Ok(metadata) => {
426 if let Some(native_state) = native_state.as_ref() {
427 let inspection = Self::inspect_native_header_against_metadata(
428 native_state.header,
429 &metadata,
430 );
431 if Self::repair_policy_for_inspection(&inspection)
432 == NativeHeaderRepairPolicy::NativeAheadOfMetadata
433 {
434 let bootstrapped =
435 self.metadata_from_native_state(native_state, Some(&metadata));
436 if persist_bootstrapped && !self.options.read_only {
437 bootstrapped.save_for_data_path(path)?;
438 self.persist_native_physical_header(&bootstrapped)?;
439 }
440 return Ok(bootstrapped);
441 }
442 }
443 Ok(metadata)
444 }
445 Err(err) => {
446 let Some(native_state) = native_state else {
447 return Err(err.into());
448 };
449 let is_fresh_empty = native_state.header.sequence == 0
465 && native_state.registry.is_none()
466 && native_state.catalog.is_none()
467 && native_state.recovery.is_none();
468 if !is_fresh_empty && !Self::native_state_is_bootstrap_complete(&native_state) {
469 return Err(err.into());
470 }
471 let metadata = self.metadata_from_native_state(&native_state, None);
472 if persist_bootstrapped && !self.options.read_only {
473 metadata.save_for_data_path(path)?;
474 self.persist_native_physical_header(&metadata)?;
475 }
476 Ok(metadata)
477 }
478 }
479 }
480
481 pub(crate) fn physical_metadata_preference(&self) -> Option<&'static str> {
482 let path = self.path()?;
483 let native_state = self.native_physical_state();
484 let metadata = PhysicalMetadataFile::load_for_data_path(path).ok();
485
486 match (metadata, native_state) {
487 (Some(metadata), Some(native_state)) => {
488 let inspection =
489 Self::inspect_native_header_against_metadata(native_state.header, &metadata);
490 match Self::repair_policy_for_inspection(&inspection) {
491 NativeHeaderRepairPolicy::InSync => Some("sidecar_current"),
492 NativeHeaderRepairPolicy::RepairNativeFromMetadata => Some("sidecar_current"),
493 NativeHeaderRepairPolicy::NativeAheadOfMetadata => Some("native_ahead"),
494 }
495 }
496 (Some(_), None) => Some("sidecar_only"),
497 (None, Some(_)) => Some("sidecar_missing_native_available"),
498 (None, None) => Some("sidecar_missing_no_native"),
499 }
500 }
501
502 fn metadata_from_native_state(
503 &self,
504 native_state: &NativePhysicalState,
505 previous: Option<&PhysicalMetadataFile>,
506 ) -> PhysicalMetadataFile {
507 let now = SystemTime::now()
508 .duration_since(UNIX_EPOCH)
509 .unwrap_or_default()
510 .as_millis();
511 let catalog = self.catalog_snapshot();
512 let catalog_name = catalog.name.clone();
513 let catalog_total_entities = catalog.total_entities;
514 let catalog_total_collections = catalog.total_collections;
515 let indexes = self.physical_index_state();
516
517 let mut manifest =
518 crate::api::SchemaManifest::now(self.options.clone(), catalog.total_collections);
519 manifest.updated_at_unix_ms = now;
520
521 let manifest_events = native_state
522 .manifest
523 .as_ref()
524 .map(|summary| {
525 summary
526 .recent_events
527 .iter()
528 .map(|event| crate::physical::ManifestEvent {
529 collection: event.collection.clone(),
530 object_key: event.object_key.clone(),
531 kind: match event.kind.as_str() {
532 "insert" => crate::physical::ManifestEventKind::Insert,
533 "update" => crate::physical::ManifestEventKind::Update,
534 "remove" => crate::physical::ManifestEventKind::Remove,
535 _ => crate::physical::ManifestEventKind::Checkpoint,
536 },
537 block: crate::physical::BlockReference {
538 index: event.block_index,
539 checksum: event.block_checksum,
540 },
541 snapshot_min: event.snapshot_min,
542 snapshot_max: event.snapshot_max,
543 })
544 .collect()
545 })
546 .unwrap_or_default();
547
548 let graph_projections = native_state
549 .registry
550 .as_ref()
551 .and_then(|registry| {
552 registry.graph_projections_complete.then(|| {
553 registry
554 .graph_projections
555 .iter()
556 .map(|projection| crate::physical::PhysicalGraphProjection {
557 name: projection.name.clone(),
558 created_at_unix_ms: projection.created_at_unix_ms,
559 updated_at_unix_ms: projection.updated_at_unix_ms,
560 state: "materialized".to_string(),
561 source: projection.source.clone(),
562 node_labels: projection.node_labels.clone(),
563 node_types: projection.node_types.clone(),
564 edge_labels: projection.edge_labels.clone(),
565 last_materialized_sequence: projection.last_materialized_sequence,
566 })
567 .collect()
568 })
569 })
570 .or_else(|| previous.map(|metadata| metadata.graph_projections.clone()))
571 .unwrap_or_default();
572
573 let analytics_jobs = native_state
574 .registry
575 .as_ref()
576 .and_then(|registry| {
577 registry.analytics_jobs_complete.then(|| {
578 registry
579 .analytics_jobs
580 .iter()
581 .map(|job| crate::physical::PhysicalAnalyticsJob {
582 id: job.id.clone(),
583 kind: job.kind.clone(),
584 state: job.state.clone(),
585 projection: job.projection.clone(),
586 created_at_unix_ms: job.created_at_unix_ms,
587 updated_at_unix_ms: job.updated_at_unix_ms,
588 last_run_sequence: job.last_run_sequence,
589 metadata: job.metadata.clone(),
590 })
591 .collect()
592 })
593 })
594 .or_else(|| previous.map(|metadata| metadata.analytics_jobs.clone()))
595 .unwrap_or_default();
596
597 let exports = native_state
598 .recovery
599 .as_ref()
600 .and_then(|recovery| {
601 recovery.exports_complete.then(|| {
602 recovery
603 .exports
604 .iter()
605 .map(|export| crate::physical::ExportDescriptor {
606 name: export.name.clone(),
607 created_at_unix_ms: export.created_at_unix_ms,
608 snapshot_id: export.snapshot_id,
609 superblock_sequence: export.superblock_sequence,
610 data_path: self
611 .path()
612 .map(|path| {
613 crate::physical::PhysicalMetadataFile::export_data_path_for(
614 path,
615 &export.name,
616 )
617 .display()
618 .to_string()
619 })
620 .unwrap_or_default(),
621 metadata_path: self
622 .path()
623 .map(|path| {
624 let export_data_path =
625 crate::physical::PhysicalMetadataFile::export_data_path_for(
626 path,
627 &export.name,
628 );
629 crate::physical::PhysicalMetadataFile::metadata_path_for(
630 &export_data_path,
631 )
632 .display()
633 .to_string()
634 })
635 .unwrap_or_default(),
636 collection_count: export.collection_count as usize,
637 total_entities: export.total_entities as usize,
638 })
639 .collect()
640 })
641 })
642 .or_else(|| previous.map(|metadata| metadata.exports.clone()))
643 .unwrap_or_default();
644
645 let snapshots = native_state
646 .recovery
647 .as_ref()
648 .and_then(|recovery| {
649 recovery.snapshots_complete.then(|| {
650 recovery
651 .snapshots
652 .iter()
653 .map(|snapshot| crate::physical::SnapshotDescriptor {
654 snapshot_id: snapshot.snapshot_id,
655 created_at_unix_ms: snapshot.created_at_unix_ms,
656 superblock_sequence: snapshot.superblock_sequence,
657 collection_count: snapshot.collection_count as usize,
658 total_entities: snapshot.total_entities as usize,
659 })
660 .collect()
661 })
662 })
663 .or_else(|| previous.map(|metadata| metadata.snapshots.clone()))
664 .unwrap_or_else(|| {
665 vec![crate::physical::SnapshotDescriptor {
666 snapshot_id: native_state.header.sequence,
667 created_at_unix_ms: now,
668 superblock_sequence: native_state.header.sequence,
669 collection_count: catalog_total_collections,
670 total_entities: catalog_total_entities,
671 }]
672 });
673
674 let catalog_stats = native_state
675 .catalog
676 .as_ref()
677 .and_then(|native_catalog| {
678 native_catalog.collections_complete.then(|| {
679 native_catalog
680 .collections
681 .iter()
682 .map(|collection| {
683 (
684 collection.name.clone(),
685 crate::api::CollectionStats {
686 entities: collection.entities as usize,
687 cross_refs: collection.cross_refs as usize,
688 segments: collection.segments as usize,
689 },
690 )
691 })
692 .collect::<BTreeMap<_, _>>()
693 })
694 })
695 .or_else(|| previous.map(|metadata| metadata.catalog.stats_by_collection.clone()))
696 .unwrap_or_else(|| catalog.stats_by_collection.clone());
697
698 PhysicalMetadataFile {
699 protocol_version: crate::physical::PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
700 generated_at_unix_ms: now,
701 last_loaded_from: Some("native_bootstrap".to_string()),
702 last_healed_at_unix_ms: Some(now),
703 manifest,
704 catalog: crate::api::CatalogSnapshot {
705 name: catalog_name,
706 total_entities: native_state
707 .catalog
708 .as_ref()
709 .map(|summary| summary.total_entities as usize)
710 .unwrap_or(catalog_total_entities),
711 total_collections: native_state
712 .catalog
713 .as_ref()
714 .map(|summary| summary.collection_count as usize)
715 .unwrap_or(catalog_total_collections),
716 stats_by_collection: catalog_stats,
717 updated_at: SystemTime::now(),
718 },
719 manifest_events,
720 collection_ttl_defaults_ms: previous
721 .map(|metadata| metadata.collection_ttl_defaults_ms.clone())
722 .unwrap_or_default(),
723 collection_contracts: previous
724 .map(|metadata| metadata.collection_contracts.clone())
725 .unwrap_or_default(),
726 tree_definitions: previous
727 .map(|metadata| metadata.tree_definitions.clone())
728 .unwrap_or_default(),
729 indexes,
730 graph_projections,
731 analytics_jobs,
732 exports,
733 superblock: crate::physical::SuperblockHeader {
734 format_version: native_state.header.format_version,
735 sequence: native_state.header.sequence,
736 copies: crate::physical::DEFAULT_SUPERBLOCK_COPIES,
737 manifest: crate::physical::ManifestPointers {
738 oldest: crate::physical::BlockReference {
739 index: native_state.header.manifest_oldest_root,
740 checksum: 0,
741 },
742 newest: crate::physical::BlockReference {
743 index: native_state.header.manifest_root,
744 checksum: 0,
745 },
746 },
747 free_set: crate::physical::BlockReference {
748 index: native_state.header.free_set_root,
749 checksum: 0,
750 },
751 collection_roots: native_state.collection_roots.clone(),
752 },
753 snapshots,
754 }
755 }
756
757 pub(crate) fn reconcile_index_states_with_native_artifacts(
758 &self,
759 mut indexes: Vec<PhysicalIndexState>,
760 ) -> Vec<PhysicalIndexState> {
761 let native_artifacts = self
762 .native_physical_state()
763 .and_then(|state| state.registry)
764 .map(|registry| registry.vector_artifacts)
765 .unwrap_or_default();
766 for index in &mut indexes {
767 let Some(collection) = index.collection.as_deref() else {
768 continue;
769 };
770 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
771 continue;
772 };
773 let Some(artifact) = native_artifacts.iter().find(|artifact| {
774 artifact.collection == collection && artifact.artifact_kind == artifact_kind
775 }) else {
776 index.build_state = "metadata-only".to_string();
777 continue;
778 };
779 index.entries = artifact.vector_count as usize;
780 index.estimated_memory_bytes = artifact.serialized_bytes;
781 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
782 index.artifact_kind = Some(artifact.artifact_kind.clone());
783 index.artifact_checksum = Some(artifact.checksum);
784 index.build_state = "artifact-published".to_string();
785 if let Some(pages) = self.native_vector_artifact_pages() {
786 index.artifact_root_page = pages
787 .into_iter()
788 .find(|page| {
789 page.collection == artifact.collection
790 && page.artifact_kind == artifact.artifact_kind
791 })
792 .map(|page| page.root_page);
793 }
794 }
795 indexes
796 }
797
798 pub(crate) fn warmup_native_vector_artifact_for_index(
799 &self,
800 index: &PhysicalIndexState,
801 ) -> Result<(), String> {
802 let Some(collection) = index.collection.as_deref() else {
803 return Ok(());
804 };
805 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
806 return Ok(());
807 };
808 self.warmup_native_vector_artifact(collection, Some(artifact_kind))?;
809 Ok(())
810 }
811
812 pub(crate) fn apply_runtime_native_artifact_to_index_state(
813 &self,
814 index: &mut PhysicalIndexState,
815 ) -> Result<(), String> {
816 let Some(collection) = index.collection.as_deref() else {
817 return Ok(());
818 };
819 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
820 return Ok(());
821 };
822 let artifact = self.inspect_native_vector_artifact(collection, Some(artifact_kind))?;
823 index.entries = artifact
824 .graph_edge_count
825 .or(artifact.text_posting_count)
826 .unwrap_or(artifact.node_count) as usize;
827 index.estimated_memory_bytes = artifact.byte_len;
828 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
829 index.artifact_kind = Some(artifact.artifact_kind.clone());
830 index.artifact_checksum = Some(artifact.checksum);
831 index.build_state = "ready".to_string();
832 index.artifact_root_page = self
833 .native_vector_artifact_pages()
834 .and_then(|pages| {
835 pages.into_iter().find(|page| {
836 page.collection == artifact.collection
837 && page.artifact_kind == artifact.artifact_kind
838 })
839 })
840 .map(|page| page.root_page);
841 Ok(())
842 }
843
844 pub(crate) fn physical_index_state_from_native_state(
845 &self,
846 native_state: &NativePhysicalState,
847 previous: Option<&PhysicalMetadataFile>,
848 ) -> Vec<PhysicalIndexState> {
849 let mut fresh = self.physical_index_state();
850 let Some(registry) = native_state.registry.as_ref() else {
851 if let Some(previous) = previous {
852 for index in &previous.indexes {
853 if !fresh.iter().any(|candidate| candidate.name == index.name) {
854 fresh.push(index.clone());
855 }
856 }
857 }
858 return fresh;
859 };
860
861 for index in &mut fresh {
862 if let Some(native) = registry
863 .indexes
864 .iter()
865 .find(|candidate| candidate.name == index.name)
866 {
867 index.enabled = native.enabled;
868 index.last_refresh_ms = native.last_refresh_ms;
869 index.backend = native.backend.clone();
870 index.entries = native.entries as usize;
871 index.estimated_memory_bytes = native.estimated_memory_bytes;
872 if index.artifact_kind.is_none() {
873 index.artifact_kind = Self::native_artifact_kind_for_index(index.kind)
874 .map(|value| value.to_string());
875 }
876 if index.build_state == "catalog-derived" {
877 index.build_state = "registry-loaded".to_string();
878 }
879 }
880 }
881
882 for native in ®istry.indexes {
883 if fresh.iter().any(|index| index.name == native.name) {
884 continue;
885 }
886 let Some(kind) = Self::index_kind_from_str(&native.kind) else {
887 continue;
888 };
889 fresh.push(PhysicalIndexState {
890 name: native.name.clone(),
891 kind,
892 collection: native.collection.clone(),
893 enabled: native.enabled,
894 entries: native.entries as usize,
895 estimated_memory_bytes: native.estimated_memory_bytes,
896 last_refresh_ms: native.last_refresh_ms,
897 backend: native.backend.clone(),
898 artifact_kind: Self::native_artifact_kind_for_index(kind)
899 .map(|value| value.to_string()),
900 artifact_root_page: None,
901 artifact_checksum: None,
902 build_state: "registry-loaded".to_string(),
903 });
904 }
905
906 if !registry.indexes_complete {
907 if let Some(previous) = previous {
908 for index in &previous.indexes {
909 if !fresh.iter().any(|candidate| candidate.name == index.name) {
910 fresh.push(index.clone());
911 }
912 }
913 }
914 }
915
916 fresh
917 }
918
919 pub(crate) fn graph_projections_from_native_state(
920 &self,
921 native_state: &NativePhysicalState,
922 ) -> Vec<PhysicalGraphProjection> {
923 native_state
924 .registry
925 .as_ref()
926 .map(|registry| {
927 registry
928 .graph_projections
929 .iter()
930 .map(|projection| PhysicalGraphProjection {
931 name: projection.name.clone(),
932 created_at_unix_ms: projection.created_at_unix_ms,
933 updated_at_unix_ms: projection.updated_at_unix_ms,
934 state: "materialized".to_string(),
935 source: projection.source.clone(),
936 node_labels: projection.node_labels.clone(),
937 node_types: projection.node_types.clone(),
938 edge_labels: projection.edge_labels.clone(),
939 last_materialized_sequence: projection.last_materialized_sequence,
940 })
941 .collect()
942 })
943 .unwrap_or_default()
944 }
945
946 pub(crate) fn analytics_jobs_from_native_state(
947 &self,
948 native_state: &NativePhysicalState,
949 ) -> Vec<PhysicalAnalyticsJob> {
950 native_state
951 .registry
952 .as_ref()
953 .map(|registry| {
954 registry
955 .analytics_jobs
956 .iter()
957 .map(|job| PhysicalAnalyticsJob {
958 id: job.id.clone(),
959 kind: job.kind.clone(),
960 state: job.state.clone(),
961 projection: job.projection.clone(),
962 created_at_unix_ms: job.created_at_unix_ms,
963 updated_at_unix_ms: job.updated_at_unix_ms,
964 last_run_sequence: job.last_run_sequence,
965 metadata: job.metadata.clone(),
966 })
967 .collect()
968 })
969 .unwrap_or_default()
970 }
971
972 pub(crate) fn exports_from_native_state(
973 &self,
974 native_state: &NativePhysicalState,
975 ) -> Vec<ExportDescriptor> {
976 native_state
977 .recovery
978 .as_ref()
979 .map(|recovery| {
980 recovery
981 .exports
982 .iter()
983 .map(|export| ExportDescriptor {
984 name: export.name.clone(),
985 created_at_unix_ms: export.created_at_unix_ms,
986 snapshot_id: export.snapshot_id,
987 superblock_sequence: export.superblock_sequence,
988 data_path: self
989 .path()
990 .map(|path| {
991 crate::physical::PhysicalMetadataFile::export_data_path_for(
992 path,
993 &export.name,
994 )
995 .display()
996 .to_string()
997 })
998 .unwrap_or_default(),
999 metadata_path: self
1000 .path()
1001 .map(|path| {
1002 let export_data_path =
1003 crate::physical::PhysicalMetadataFile::export_data_path_for(
1004 path,
1005 &export.name,
1006 );
1007 crate::physical::PhysicalMetadataFile::metadata_path_for(
1008 &export_data_path,
1009 )
1010 .display()
1011 .to_string()
1012 })
1013 .unwrap_or_default(),
1014 collection_count: export.collection_count as usize,
1015 total_entities: export.total_entities as usize,
1016 })
1017 .collect()
1018 })
1019 .unwrap_or_default()
1020 }
1021
1022 pub(crate) fn snapshots_from_native_state(
1023 &self,
1024 native_state: &NativePhysicalState,
1025 ) -> Vec<crate::physical::SnapshotDescriptor> {
1026 native_state
1027 .recovery
1028 .as_ref()
1029 .map(|recovery| {
1030 recovery
1031 .snapshots
1032 .iter()
1033 .map(|snapshot| crate::physical::SnapshotDescriptor {
1034 snapshot_id: snapshot.snapshot_id,
1035 created_at_unix_ms: snapshot.created_at_unix_ms,
1036 superblock_sequence: snapshot.superblock_sequence,
1037 collection_count: snapshot.collection_count as usize,
1038 total_entities: snapshot.total_entities as usize,
1039 })
1040 .collect()
1041 })
1042 .unwrap_or_default()
1043 }
1044
1045 fn index_kind_from_str(value: &str) -> Option<crate::index::IndexKind> {
1046 match value {
1047 "btree" => Some(crate::index::IndexKind::BTree),
1048 "vector.hnsw" => Some(crate::index::IndexKind::VectorHnsw),
1049 "vector.inverted" => Some(crate::index::IndexKind::VectorInverted),
1050 "graph.adjacency" => Some(crate::index::IndexKind::GraphAdjacency),
1051 "text.fulltext" => Some(crate::index::IndexKind::FullText),
1052 "document.pathvalue" => Some(crate::index::IndexKind::DocumentPathValue),
1053 "search.hybrid" => Some(crate::index::IndexKind::HybridSearch),
1054 _ => None,
1055 }
1056 }
1057
1058 pub(crate) fn native_artifact_kind_for_index(kind: IndexKind) -> Option<&'static str> {
1059 match kind {
1060 IndexKind::VectorHnsw => Some("hnsw"),
1061 IndexKind::VectorInverted => Some("ivf"),
1062 IndexKind::GraphAdjacency => Some("graph.adjacency"),
1063 IndexKind::FullText => Some("text.fulltext"),
1064 IndexKind::DocumentPathValue => Some("document.pathvalue"),
1065 _ => None,
1066 }
1067 }
1068
1069 fn index_is_declared(&self, name: &str) -> bool {
1070 self.physical_metadata()
1071 .map(|metadata| metadata.indexes.iter().any(|index| index.name == name))
1072 .unwrap_or(false)
1073 }
1074
1075 pub(crate) fn graph_projection_is_declared(&self, name: &str) -> bool {
1076 self.physical_metadata()
1077 .map(|metadata| {
1078 metadata
1079 .graph_projections
1080 .iter()
1081 .any(|projection| projection.name == name)
1082 })
1083 .unwrap_or(false)
1084 }
1085
1086 pub(crate) fn graph_projection_is_operational(&self, name: &str) -> bool {
1087 self.operational_graph_projections()
1088 .into_iter()
1089 .any(|projection| projection.name == name && projection.state == "materialized")
1090 }
1091
1092 pub(crate) fn analytics_job_id(kind: &str, projection: Option<&str>) -> String {
1093 match projection {
1094 Some(projection) => format!("{kind}::{projection}"),
1095 None => format!("{kind}::global"),
1096 }
1097 }
1098
1099 pub(crate) fn update_physical_metadata<T, F>(
1100 &self,
1101 mutator: F,
1102 ) -> Result<T, Box<dyn std::error::Error>>
1103 where
1104 F: FnOnce(&mut PhysicalMetadataFile) -> T,
1105 {
1106 if self.options.mode != StorageMode::Persistent {
1107 return Err("physical metadata operations require persistent mode".into());
1108 }
1109 if self.options.read_only {
1110 return Err("physical metadata operations are not allowed in read-only mode".into());
1111 }
1112 let Some(path) = self.path() else {
1113 return Err("database path is not available".into());
1114 };
1115
1116 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1117
1118 if metadata.indexes.is_empty() {
1119 metadata.indexes = self.physical_index_state();
1120 }
1121 metadata.superblock.collection_roots = self.physical_collection_roots();
1122
1123 let result = mutator(&mut metadata);
1124 metadata.save_for_data_path(path)?;
1125 self.persist_native_physical_header(&metadata)?;
1126 Ok(result)
1127 }
1128
1129 pub(crate) fn persist_native_physical_header(
1130 &self,
1131 metadata: &PhysicalMetadataFile,
1132 ) -> Result<(), Box<dyn std::error::Error>> {
1133 if !self.paged_mode {
1134 return Ok(());
1135 }
1136
1137 let existing_page = self
1138 .store
1139 .physical_file_header()
1140 .map(|header| header.collection_roots_page)
1141 .filter(|page| *page != 0);
1142 let existing_registry_page = self
1143 .store
1144 .physical_file_header()
1145 .map(|header| header.registry_page)
1146 .filter(|page| *page != 0);
1147 let existing_recovery_page = self
1148 .store
1149 .physical_file_header()
1150 .map(|header| header.recovery_page)
1151 .filter(|page| *page != 0);
1152 let existing_catalog_page = self
1153 .store
1154 .physical_file_header()
1155 .map(|header| header.catalog_page)
1156 .filter(|page| *page != 0);
1157 let existing_metadata_state_page = self
1158 .store
1159 .physical_file_header()
1160 .map(|header| header.metadata_state_page)
1161 .filter(|page| *page != 0);
1162 let existing_vector_artifact_page = self
1163 .store
1164 .physical_file_header()
1165 .map(|header| header.vector_artifact_page)
1166 .filter(|page| *page != 0);
1167 let existing_manifest_page = self
1168 .store
1169 .physical_file_header()
1170 .map(|header| header.manifest_page)
1171 .filter(|page| *page != 0);
1172 let (manifest_page, manifest_checksum) = self.store.write_native_manifest_summary(
1173 metadata.superblock.sequence,
1174 &metadata.manifest_events,
1175 existing_manifest_page,
1176 )?;
1177 let (collection_roots_page, collection_roots_checksum) = self
1178 .store
1179 .write_native_collection_roots(&metadata.superblock.collection_roots, existing_page)?;
1180 let registry_summary = self.native_registry_summary_from_metadata(metadata);
1181 let (registry_page, registry_checksum) = self
1182 .store
1183 .write_native_registry_summary(®istry_summary, existing_registry_page)?;
1184 let recovery_summary = Self::native_recovery_summary_from_metadata(metadata);
1185 let (recovery_page, recovery_checksum) = self
1186 .store
1187 .write_native_recovery_summary(&recovery_summary, existing_recovery_page)?;
1188 let catalog_summary = Self::native_catalog_summary_from_metadata(metadata);
1189 let (catalog_page, catalog_checksum) = self
1190 .store
1191 .write_native_catalog_summary(&catalog_summary, existing_catalog_page)?;
1192 let metadata_state_summary = Self::native_metadata_state_summary_from_metadata(metadata);
1193 let (metadata_state_page, metadata_state_checksum) =
1194 self.store.write_native_metadata_state_summary(
1195 &metadata_state_summary,
1196 existing_metadata_state_page,
1197 )?;
1198 let vector_artifact_records = self.native_vector_artifact_records();
1199 let vector_artifact_payloads = vector_artifact_records
1200 .iter()
1201 .map(|(summary, bytes)| {
1202 (
1203 summary.collection.clone(),
1204 summary.artifact_kind.clone(),
1205 bytes.clone(),
1206 )
1207 })
1208 .collect::<Vec<_>>();
1209 let (vector_artifact_page, vector_artifact_checksum, _vector_artifact_pages) =
1210 self.store.write_native_vector_artifact_store(
1211 &vector_artifact_payloads,
1212 existing_vector_artifact_page,
1213 )?;
1214 let mut header = Self::native_header_from_metadata(metadata);
1215 header.manifest_page = manifest_page;
1216 header.manifest_checksum = manifest_checksum;
1217 header.collection_roots_page = collection_roots_page;
1218 header.collection_roots_checksum = collection_roots_checksum;
1219 header.registry_page = registry_page;
1220 header.registry_checksum = registry_checksum;
1221 header.recovery_page = recovery_page;
1222 header.recovery_checksum = recovery_checksum;
1223 header.catalog_page = catalog_page;
1224 header.catalog_checksum = catalog_checksum;
1225 header.metadata_state_page = metadata_state_page;
1226 header.metadata_state_checksum = metadata_state_checksum;
1227 header.vector_artifact_page = vector_artifact_page;
1228 header.vector_artifact_checksum = vector_artifact_checksum;
1229 self.store.update_physical_file_header(header)?;
1230 self.store.persist()?;
1231 Ok(())
1232 }
1233
1234 pub(crate) fn native_header_from_metadata(
1235 metadata: &PhysicalMetadataFile,
1236 ) -> PhysicalFileHeader {
1237 PhysicalFileHeader {
1238 format_version: metadata.superblock.format_version,
1239 sequence: metadata.superblock.sequence,
1240 manifest_oldest_root: metadata.superblock.manifest.oldest.index,
1241 manifest_root: metadata.superblock.manifest.newest.index,
1242 free_set_root: metadata.superblock.free_set.index,
1243 manifest_page: 0,
1244 manifest_checksum: 0,
1245 collection_roots_page: 0,
1246 collection_roots_checksum: 0,
1247 collection_root_count: metadata.superblock.collection_roots.len() as u32,
1248 snapshot_count: metadata.snapshots.len() as u32,
1249 index_count: metadata.indexes.len() as u32,
1250 catalog_collection_count: metadata.catalog.total_collections as u32,
1251 catalog_total_entities: metadata.catalog.total_entities as u64,
1252 export_count: metadata.exports.len() as u32,
1253 graph_projection_count: metadata.graph_projections.len() as u32,
1254 analytics_job_count: metadata.analytics_jobs.len() as u32,
1255 manifest_event_count: metadata.manifest_events.len() as u32,
1256 registry_page: 0,
1257 registry_checksum: 0,
1258 recovery_page: 0,
1259 recovery_checksum: 0,
1260 catalog_page: 0,
1261 catalog_checksum: 0,
1262 metadata_state_page: 0,
1263 metadata_state_checksum: 0,
1264 vector_artifact_page: 0,
1265 vector_artifact_checksum: 0,
1266 }
1267 }
1268
1269 fn recover_queue_pending_state(&self) {
1270 const QUEUE_META_COLLECTION: &str = "red_queue_meta";
1271
1272 let Some(manager) = self.store.get_collection(QUEUE_META_COLLECTION) else {
1273 return;
1274 };
1275
1276 let pending_rows = manager.query_all(|entity| {
1277 entity.data.as_row().is_some_and(|row| {
1278 matches!(
1279 row.get_field("kind"),
1280 Some(crate::storage::schema::Value::Text(kind)) if &**kind == "queue_pending"
1281 )
1282 })
1283 });
1284
1285 for row in pending_rows {
1286 let _ = self.store.delete(QUEUE_META_COLLECTION, row.id);
1287 }
1288 }
1289}