1use super::*;
2use crate::storage::unified::metadata::{MetadataFilter, MetadataValue};
3
4impl RedDB {
5 pub fn enforce_retention_policy(&self) -> Result<(), Box<dyn std::error::Error>> {
6 if self.options.read_only {
7 return Ok(());
8 }
9
10 if self.options.mode == StorageMode::Persistent {
13 let Some(path) = self.path() else {
14 return Ok(());
15 };
16
17 let Ok(mut metadata) = self.load_or_bootstrap_physical_metadata(true) else {
18 return Ok(());
19 };
20
21 self.prune_export_registry(&mut metadata.exports);
22 metadata.save_for_data_path(path)?;
23 }
24
25 let _ = self.sweep_ttl_expired_entities()?;
26
27 Ok(())
28 }
29
30 fn sweep_ttl_expired_entities(&self) -> Result<usize, Box<dyn std::error::Error>> {
31 let now_ms = SystemTime::now()
32 .duration_since(UNIX_EPOCH)
33 .unwrap_or_default()
34 .as_millis()
35 .min(u128::from(u64::MAX)) as u64;
36
37 let mut to_delete = Vec::<(String, EntityId)>::new();
38
39 let mut absolute_expired = self.expired_entities_by_expires_at(now_ms)?;
40 to_delete.append(&mut absolute_expired);
41
42 let mut relative_expired = self.expired_entities_by_ttl(now_ms)?;
43 to_delete.append(&mut relative_expired);
44
45 to_delete.sort_unstable();
46 to_delete.dedup();
47
48 let mut deleted = 0usize;
49 for (collection, id) in to_delete {
50 match self.store.delete(&collection, id) {
51 Ok(true) => deleted = deleted.saturating_add(1),
52 Ok(false) => {}
53 Err(err) => {
54 return Err(format!(
55 "failed deleting expired entity {id} from collection '{collection}': {err:?}"
56 )
57 .into());
58 }
59 }
60 }
61
62 Ok(deleted)
63 }
64
65 fn expired_entities_by_expires_at(
66 &self,
67 now_ms: u64,
68 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
69 let mut ids = self.store.filter_metadata_all(&[(
70 "_expires_at".to_string(),
71 MetadataFilter::Le(MetadataValue::Timestamp(now_ms)),
72 )]);
73
74 if let Ok(now_ms_i64) = i64::try_from(now_ms) {
75 ids.extend(self.store.filter_metadata_all(&[(
76 "_expires_at".to_string(),
77 MetadataFilter::Le(MetadataValue::Int(now_ms_i64)),
78 )]));
79 }
80
81 let now_ms_f64 = now_ms as f64;
82 if now_ms_f64.is_finite() {
83 ids.extend(self.store.filter_metadata_all(&[(
84 "_expires_at".to_string(),
85 MetadataFilter::Le(MetadataValue::Float(now_ms_f64)),
86 )]));
87 }
88
89 Ok(ids)
90 }
91
92 fn expired_entities_by_ttl(
93 &self,
94 now_ms: u64,
95 ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
96 let mut candidates = Vec::<(String, EntityId)>::new();
97
98 let ttl_ms_candidates = self
99 .store
100 .filter_metadata_all(&[("_ttl_ms".to_string(), MetadataFilter::IsNotNull)]);
101 candidates.extend(ttl_ms_candidates);
102
103 let ttl_candidates = self
104 .store
105 .filter_metadata_all(&[("_ttl".to_string(), MetadataFilter::IsNotNull)]);
106 candidates.extend(ttl_candidates);
107
108 if candidates.is_empty() {
109 return Ok(Vec::new());
110 }
111
112 candidates.sort_unstable();
113 candidates.dedup();
114
115 let mut expired = Vec::<(String, EntityId)>::new();
116 for (collection, entity_id) in candidates {
117 let Some(entity) = self.store.get(&collection, entity_id) else {
118 continue;
119 };
120
121 let Some(metadata) = self.store.get_metadata(&collection, entity_id) else {
122 continue;
123 };
124
125 let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
126 let ttl_secs = if ttl_ms.is_none() {
127 metadata.get("_ttl").and_then(|value| {
128 Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
129 })
130 } else {
131 None
132 };
133
134 let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
135 continue;
136 };
137
138 let created_at_ms = entity.created_at.saturating_mul(1000);
139 let expiry_ms = created_at_ms.saturating_add(ttl_ms);
140 if expiry_ms <= now_ms {
141 expired.push((collection, entity_id));
142 }
143 }
144
145 Ok(expired)
146 }
147
148 fn metadata_u64(value: &MetadataValue) -> Option<u64> {
149 match value {
150 MetadataValue::Int(v) if *v >= 0 => Some(*v as u64),
151 MetadataValue::Timestamp(v) => Some(*v),
152 MetadataValue::Float(v) => {
153 if !v.is_finite() || !v.is_sign_positive() || v.fract().abs() >= f64::EPSILON {
154 return None;
155 }
156 if *v > u64::MAX as f64 {
157 return None;
158 }
159 Some(v.trunc() as u64)
160 }
161 MetadataValue::String(v) => v.parse::<u64>().ok(),
162 _ => None,
163 }
164 }
165
166 pub fn node(&self, collection: impl Into<String>, label: impl Into<String>) -> NodeBuilder {
179 NodeBuilder::new(
180 self.store.clone(),
181 self.preprocessors.clone(),
182 collection,
183 label,
184 )
185 }
186
187 pub fn edge(&self, collection: impl Into<String>, label: impl Into<String>) -> EdgeBuilder {
199 EdgeBuilder::new(
200 self.store.clone(),
201 self.preprocessors.clone(),
202 collection,
203 label,
204 )
205 }
206
207 pub fn vector(&self, collection: impl Into<String>) -> VectorBuilder {
218 VectorBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
219 }
220
221 pub fn row(&self, table: impl Into<String>, columns: Vec<(&str, Value)>) -> RowBuilder {
232 RowBuilder::new(
233 self.store.clone(),
234 self.preprocessors.clone(),
235 table,
236 columns,
237 )
238 }
239
240 pub fn doc(&self, collection: impl Into<String>) -> DocumentBuilder {
254 DocumentBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
255 }
256
257 pub fn kv(
268 &self,
269 collection: impl Into<String>,
270 key: impl Into<String>,
271 value: Value,
272 ) -> KvBuilder {
273 KvBuilder::new(
274 self.store.clone(),
275 self.preprocessors.clone(),
276 collection,
277 key,
278 value,
279 )
280 }
281
282 pub fn get_kv(&self, collection: &str, key: &str) -> Option<(Value, EntityId)> {
286 let manager = self.store.get_collection(collection)?;
287 let entities = manager.query_all(|_| true);
288 for entity in entities {
289 if let EntityData::Row(ref row) = entity.data {
290 if let Some(ref named) = row.named {
291 if let Some(Value::Text(ref k)) = named.get("key") {
292 if &**k == key {
293 let value = named.get("value").cloned().unwrap_or(Value::Null);
294 return Some((value, entity.id));
295 }
296 }
297 }
298 }
299 }
300 None
301 }
302
303 pub fn delete_kv(
305 &self,
306 collection: &str,
307 key: &str,
308 ) -> Result<bool, super::super::error::DevXError> {
309 let Some((_, id)) = self.get_kv(collection, key) else {
310 return Ok(false);
311 };
312 self.store
313 .delete(collection, id)
314 .map_err(|err| super::super::error::DevXError::Storage(format!("{err:?}")))?;
315 Ok(true)
316 }
317
318 pub(crate) fn with_initialized_metadata(self) -> Result<Self, Box<dyn std::error::Error>> {
319 if self.options.mode == StorageMode::Persistent && !self.options.read_only {
320 let _ = self.load_or_bootstrap_physical_metadata(false);
322 }
326 self.load_collection_ttl_defaults_from_metadata();
327 self.recover_queue_pending_state();
328 Ok(self)
329 }
330
331 pub(crate) fn persist_metadata(&self) -> Result<(), Box<dyn std::error::Error>> {
332 if self.options.mode != StorageMode::Persistent || self.options.read_only {
333 return Ok(());
334 }
335 let Some(path) = self.path() else {
336 return Ok(());
337 };
338
339 let previous = self.load_or_bootstrap_physical_metadata(false).ok();
340 let collection_roots = self.physical_collection_roots();
341 let indexes = self
342 .native_physical_state()
343 .map(|state| self.physical_index_state_from_native_state(&state, previous.as_ref()))
344 .unwrap_or_else(|| self.physical_index_state());
345 let mut metadata = PhysicalMetadataFile::from_state(
346 self.options.clone(),
347 self.catalog_snapshot(),
348 collection_roots,
349 indexes,
350 previous.as_ref(),
351 );
352 metadata.collection_ttl_defaults_ms = self.collection_ttl_defaults_snapshot();
353 metadata.save_for_data_path(path)?;
354 self.persist_native_physical_header(&metadata)?;
355 Ok(())
356 }
357
358 fn bootstrap_metadata_from_native_state(&self) -> Result<bool, Box<dyn std::error::Error>> {
359 if self.options.mode != StorageMode::Persistent || self.options.read_only {
360 return Ok(false);
361 }
362 let Some(path) = self.path() else {
363 return Ok(false);
364 };
365 let Some(native_state) = self.native_physical_state() else {
366 return Ok(false);
367 };
368 if !Self::native_state_is_bootstrap_complete(&native_state) {
369 return Ok(false);
370 }
371
372 let previous = PhysicalMetadataFile::load_for_data_path(path).ok();
373 let metadata = self.metadata_from_native_state(&native_state, previous.as_ref());
374 metadata.save_for_data_path(path)?;
375 self.persist_native_physical_header(&metadata)?;
376 Ok(true)
377 }
378
379 pub fn rebuild_physical_metadata_from_native_state(
382 &self,
383 ) -> Result<bool, Box<dyn std::error::Error>> {
384 self.bootstrap_metadata_from_native_state()
385 }
386
387 pub(crate) fn native_state_is_bootstrap_complete(native_state: &NativePhysicalState) -> bool {
388 let registry_complete = native_state.registry.as_ref().map(|registry| {
389 registry.collections_complete
390 && registry.indexes_complete
391 && registry.graph_projections_complete
392 && registry.analytics_jobs_complete
393 && registry.vector_artifacts_complete
394 });
395 let recovery_complete = native_state
396 .recovery
397 .as_ref()
398 .map(|recovery| recovery.snapshots_complete && recovery.exports_complete);
399 let catalog_complete = native_state
400 .catalog
401 .as_ref()
402 .map(|catalog| catalog.collections_complete);
403
404 registry_complete == Some(true)
405 && recovery_complete == Some(true)
406 && catalog_complete == Some(true)
407 }
408
    /// Loads the physical metadata sidecar, reconciling it with the native (paged) state.
    ///
    /// - Sidecar loads and the native header is ahead of it
    ///   (`NativeHeaderRepairPolicy::NativeAheadOfMetadata`): metadata is rebuilt from the
    ///   native state, with the loaded sidecar passed as `previous`.
    /// - Sidecar loads otherwise (or there is no native state): it is returned as loaded.
    /// - Sidecar fails to load: metadata is rebuilt from the native state when that state is
    ///   either completely fresh (sequence 0 and no registry/catalog/recovery sections) or
    ///   fully bootstrap-complete. Otherwise the original load error is returned.
    ///
    /// When `persist_bootstrapped` is true and the database is not read-only, rebuilt
    /// metadata is saved to the sidecar and to the native header before being returned.
    ///
    /// # Errors
    /// Fails outside persistent mode, when no data path is available, when the sidecar
    /// cannot be loaded and native state cannot substitute for it, or when persisting fails.
    pub(crate) fn load_or_bootstrap_physical_metadata(
        &self,
        persist_bootstrapped: bool,
    ) -> Result<PhysicalMetadataFile, Box<dyn std::error::Error>> {
        if self.options.mode != StorageMode::Persistent {
            return Err("physical metadata requires persistent mode".into());
        }
        let Some(path) = self.path() else {
            return Err("database path is not available".into());
        };
        let native_state = self.native_physical_state();

        match PhysicalMetadataFile::load_for_data_path(path) {
            Ok(metadata) => {
                if let Some(native_state) = native_state.as_ref() {
                    let inspection = Self::inspect_native_header_against_metadata(
                        native_state.header,
                        &metadata,
                    );
                    // The native header has moved past what the sidecar describes: the
                    // sidecar is stale, so rebuild from native state (keeping sidecar-only
                    // data via `previous`).
                    if Self::repair_policy_for_inspection(&inspection)
                        == NativeHeaderRepairPolicy::NativeAheadOfMetadata
                    {
                        let bootstrapped =
                            self.metadata_from_native_state(native_state, Some(&metadata));
                        if persist_bootstrapped && !self.options.read_only {
                            bootstrapped.save_for_data_path(path)?;
                            self.persist_native_physical_header(&bootstrapped)?;
                        }
                        return Ok(bootstrapped);
                    }
                }
                Ok(metadata)
            }
            Err(err) => {
                // Without native state there is nothing to bootstrap from.
                let Some(native_state) = native_state else {
                    return Err(err.into());
                };
                // A brand-new file has no sections yet; that is safe to bootstrap from even
                // though it is not "complete".
                let is_fresh_empty = native_state.header.sequence == 0
                    && native_state.registry.is_none()
                    && native_state.catalog.is_none()
                    && native_state.recovery.is_none();
                // Partially written native state cannot be trusted to replace the sidecar.
                if !is_fresh_empty && !Self::native_state_is_bootstrap_complete(&native_state) {
                    return Err(err.into());
                }
                let metadata = self.metadata_from_native_state(&native_state, None);
                if persist_bootstrapped && !self.options.read_only {
                    metadata.save_for_data_path(path)?;
                    self.persist_native_physical_header(&metadata)?;
                }
                Ok(metadata)
            }
        }
    }
477
478 pub(crate) fn physical_metadata_preference(&self) -> Option<&'static str> {
479 let path = self.path()?;
480 let native_state = self.native_physical_state();
481 let metadata = PhysicalMetadataFile::load_for_data_path(path).ok();
482
483 match (metadata, native_state) {
484 (Some(metadata), Some(native_state)) => {
485 let inspection =
486 Self::inspect_native_header_against_metadata(native_state.header, &metadata);
487 match Self::repair_policy_for_inspection(&inspection) {
488 NativeHeaderRepairPolicy::InSync => Some("sidecar_current"),
489 NativeHeaderRepairPolicy::RepairNativeFromMetadata => Some("sidecar_current"),
490 NativeHeaderRepairPolicy::NativeAheadOfMetadata => Some("native_ahead"),
491 }
492 }
493 (Some(_), None) => Some("sidecar_only"),
494 (None, Some(_)) => Some("sidecar_missing_native_available"),
495 (None, None) => Some("sidecar_missing_no_native"),
496 }
497 }
498
499 fn metadata_from_native_state(
500 &self,
501 native_state: &NativePhysicalState,
502 previous: Option<&PhysicalMetadataFile>,
503 ) -> PhysicalMetadataFile {
504 let now = SystemTime::now()
505 .duration_since(UNIX_EPOCH)
506 .unwrap_or_default()
507 .as_millis();
508 let catalog = self.catalog_snapshot();
509 let catalog_name = catalog.name.clone();
510 let catalog_total_entities = catalog.total_entities;
511 let catalog_total_collections = catalog.total_collections;
512 let indexes = self.physical_index_state();
513
514 let mut manifest =
515 crate::api::SchemaManifest::now(self.options.clone(), catalog.total_collections);
516 manifest.updated_at_unix_ms = now;
517
518 let manifest_events = native_state
519 .manifest
520 .as_ref()
521 .map(|summary| {
522 summary
523 .recent_events
524 .iter()
525 .map(|event| crate::physical::ManifestEvent {
526 collection: event.collection.clone(),
527 object_key: event.object_key.clone(),
528 kind: match event.kind.as_str() {
529 "insert" => crate::physical::ManifestEventKind::Insert,
530 "update" => crate::physical::ManifestEventKind::Update,
531 "remove" => crate::physical::ManifestEventKind::Remove,
532 _ => crate::physical::ManifestEventKind::Checkpoint,
533 },
534 block: crate::physical::BlockReference {
535 index: event.block_index,
536 checksum: event.block_checksum,
537 },
538 snapshot_min: event.snapshot_min,
539 snapshot_max: event.snapshot_max,
540 })
541 .collect()
542 })
543 .unwrap_or_default();
544
545 let graph_projections = native_state
546 .registry
547 .as_ref()
548 .and_then(|registry| {
549 registry.graph_projections_complete.then(|| {
550 registry
551 .graph_projections
552 .iter()
553 .map(|projection| crate::physical::PhysicalGraphProjection {
554 name: projection.name.clone(),
555 created_at_unix_ms: projection.created_at_unix_ms,
556 updated_at_unix_ms: projection.updated_at_unix_ms,
557 state: "materialized".to_string(),
558 source: projection.source.clone(),
559 node_labels: projection.node_labels.clone(),
560 node_types: projection.node_types.clone(),
561 edge_labels: projection.edge_labels.clone(),
562 last_materialized_sequence: projection.last_materialized_sequence,
563 })
564 .collect()
565 })
566 })
567 .or_else(|| previous.map(|metadata| metadata.graph_projections.clone()))
568 .unwrap_or_default();
569
570 let analytics_jobs = native_state
571 .registry
572 .as_ref()
573 .and_then(|registry| {
574 registry.analytics_jobs_complete.then(|| {
575 registry
576 .analytics_jobs
577 .iter()
578 .map(|job| crate::physical::PhysicalAnalyticsJob {
579 id: job.id.clone(),
580 kind: job.kind.clone(),
581 state: job.state.clone(),
582 projection: job.projection.clone(),
583 created_at_unix_ms: job.created_at_unix_ms,
584 updated_at_unix_ms: job.updated_at_unix_ms,
585 last_run_sequence: job.last_run_sequence,
586 metadata: job.metadata.clone(),
587 })
588 .collect()
589 })
590 })
591 .or_else(|| previous.map(|metadata| metadata.analytics_jobs.clone()))
592 .unwrap_or_default();
593
594 let exports = native_state
595 .recovery
596 .as_ref()
597 .and_then(|recovery| {
598 recovery.exports_complete.then(|| {
599 recovery
600 .exports
601 .iter()
602 .map(|export| crate::physical::ExportDescriptor {
603 name: export.name.clone(),
604 created_at_unix_ms: export.created_at_unix_ms,
605 snapshot_id: export.snapshot_id,
606 superblock_sequence: export.superblock_sequence,
607 data_path: self
608 .path()
609 .map(|path| {
610 crate::physical::PhysicalMetadataFile::export_data_path_for(
611 path,
612 &export.name,
613 )
614 .display()
615 .to_string()
616 })
617 .unwrap_or_default(),
618 metadata_path: self
619 .path()
620 .map(|path| {
621 let export_data_path =
622 crate::physical::PhysicalMetadataFile::export_data_path_for(
623 path,
624 &export.name,
625 );
626 crate::physical::PhysicalMetadataFile::metadata_path_for(
627 &export_data_path,
628 )
629 .display()
630 .to_string()
631 })
632 .unwrap_or_default(),
633 collection_count: export.collection_count as usize,
634 total_entities: export.total_entities as usize,
635 })
636 .collect()
637 })
638 })
639 .or_else(|| previous.map(|metadata| metadata.exports.clone()))
640 .unwrap_or_default();
641
642 let snapshots = native_state
643 .recovery
644 .as_ref()
645 .and_then(|recovery| {
646 recovery.snapshots_complete.then(|| {
647 recovery
648 .snapshots
649 .iter()
650 .map(|snapshot| crate::physical::SnapshotDescriptor {
651 snapshot_id: snapshot.snapshot_id,
652 created_at_unix_ms: snapshot.created_at_unix_ms,
653 superblock_sequence: snapshot.superblock_sequence,
654 collection_count: snapshot.collection_count as usize,
655 total_entities: snapshot.total_entities as usize,
656 })
657 .collect()
658 })
659 })
660 .or_else(|| previous.map(|metadata| metadata.snapshots.clone()))
661 .unwrap_or_else(|| {
662 vec![crate::physical::SnapshotDescriptor {
663 snapshot_id: native_state.header.sequence,
664 created_at_unix_ms: now,
665 superblock_sequence: native_state.header.sequence,
666 collection_count: catalog_total_collections,
667 total_entities: catalog_total_entities,
668 }]
669 });
670
671 let catalog_stats = native_state
672 .catalog
673 .as_ref()
674 .and_then(|native_catalog| {
675 native_catalog.collections_complete.then(|| {
676 native_catalog
677 .collections
678 .iter()
679 .map(|collection| {
680 (
681 collection.name.clone(),
682 crate::api::CollectionStats {
683 entities: collection.entities as usize,
684 cross_refs: collection.cross_refs as usize,
685 segments: collection.segments as usize,
686 },
687 )
688 })
689 .collect::<BTreeMap<_, _>>()
690 })
691 })
692 .or_else(|| previous.map(|metadata| metadata.catalog.stats_by_collection.clone()))
693 .unwrap_or_else(|| catalog.stats_by_collection.clone());
694
695 PhysicalMetadataFile {
696 protocol_version: crate::physical::PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
697 generated_at_unix_ms: now,
698 last_loaded_from: Some("native_bootstrap".to_string()),
699 last_healed_at_unix_ms: Some(now),
700 manifest,
701 catalog: crate::api::CatalogSnapshot {
702 name: catalog_name,
703 total_entities: native_state
704 .catalog
705 .as_ref()
706 .map(|summary| summary.total_entities as usize)
707 .unwrap_or(catalog_total_entities),
708 total_collections: native_state
709 .catalog
710 .as_ref()
711 .map(|summary| summary.collection_count as usize)
712 .unwrap_or(catalog_total_collections),
713 stats_by_collection: catalog_stats,
714 updated_at: SystemTime::now(),
715 },
716 manifest_events,
717 collection_ttl_defaults_ms: previous
718 .map(|metadata| metadata.collection_ttl_defaults_ms.clone())
719 .unwrap_or_default(),
720 collection_contracts: previous
721 .map(|metadata| metadata.collection_contracts.clone())
722 .unwrap_or_default(),
723 tree_definitions: previous
724 .map(|metadata| metadata.tree_definitions.clone())
725 .unwrap_or_default(),
726 indexes,
727 graph_projections,
728 analytics_jobs,
729 exports,
730 superblock: crate::physical::SuperblockHeader {
731 format_version: native_state.header.format_version,
732 sequence: native_state.header.sequence,
733 copies: crate::physical::DEFAULT_SUPERBLOCK_COPIES,
734 manifest: crate::physical::ManifestPointers {
735 oldest: crate::physical::BlockReference {
736 index: native_state.header.manifest_oldest_root,
737 checksum: 0,
738 },
739 newest: crate::physical::BlockReference {
740 index: native_state.header.manifest_root,
741 checksum: 0,
742 },
743 },
744 free_set: crate::physical::BlockReference {
745 index: native_state.header.free_set_root,
746 checksum: 0,
747 },
748 collection_roots: native_state.collection_roots.clone(),
749 },
750 snapshots,
751 }
752 }
753
754 pub(crate) fn reconcile_index_states_with_native_artifacts(
755 &self,
756 mut indexes: Vec<PhysicalIndexState>,
757 ) -> Vec<PhysicalIndexState> {
758 let native_artifacts = self
759 .native_physical_state()
760 .and_then(|state| state.registry)
761 .map(|registry| registry.vector_artifacts)
762 .unwrap_or_default();
763 for index in &mut indexes {
764 let Some(collection) = index.collection.as_deref() else {
765 continue;
766 };
767 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
768 continue;
769 };
770 let Some(artifact) = native_artifacts.iter().find(|artifact| {
771 artifact.collection == collection && artifact.artifact_kind == artifact_kind
772 }) else {
773 index.build_state = "metadata-only".to_string();
774 continue;
775 };
776 index.entries = artifact.vector_count as usize;
777 index.estimated_memory_bytes = artifact.serialized_bytes;
778 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
779 index.artifact_kind = Some(artifact.artifact_kind.clone());
780 index.artifact_checksum = Some(artifact.checksum);
781 index.build_state = "artifact-published".to_string();
782 if let Some(pages) = self.native_vector_artifact_pages() {
783 index.artifact_root_page = pages
784 .into_iter()
785 .find(|page| {
786 page.collection == artifact.collection
787 && page.artifact_kind == artifact.artifact_kind
788 })
789 .map(|page| page.root_page);
790 }
791 }
792 indexes
793 }
794
795 pub(crate) fn warmup_native_vector_artifact_for_index(
796 &self,
797 index: &PhysicalIndexState,
798 ) -> Result<(), String> {
799 let Some(collection) = index.collection.as_deref() else {
800 return Ok(());
801 };
802 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
803 return Ok(());
804 };
805 self.warmup_native_vector_artifact(collection, Some(artifact_kind))?;
806 Ok(())
807 }
808
809 pub(crate) fn apply_runtime_native_artifact_to_index_state(
810 &self,
811 index: &mut PhysicalIndexState,
812 ) -> Result<(), String> {
813 let Some(collection) = index.collection.as_deref() else {
814 return Ok(());
815 };
816 let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
817 return Ok(());
818 };
819 let artifact = self.inspect_native_vector_artifact(collection, Some(artifact_kind))?;
820 index.entries = artifact
821 .graph_edge_count
822 .or(artifact.text_posting_count)
823 .unwrap_or(artifact.node_count) as usize;
824 index.estimated_memory_bytes = artifact.byte_len;
825 index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
826 index.artifact_kind = Some(artifact.artifact_kind.clone());
827 index.artifact_checksum = Some(artifact.checksum);
828 index.build_state = "ready".to_string();
829 index.artifact_root_page = self
830 .native_vector_artifact_pages()
831 .and_then(|pages| {
832 pages.into_iter().find(|page| {
833 page.collection == artifact.collection
834 && page.artifact_kind == artifact.artifact_kind
835 })
836 })
837 .map(|page| page.root_page);
838 Ok(())
839 }
840
841 pub(crate) fn physical_index_state_from_native_state(
842 &self,
843 native_state: &NativePhysicalState,
844 previous: Option<&PhysicalMetadataFile>,
845 ) -> Vec<PhysicalIndexState> {
846 let mut fresh = self.physical_index_state();
847 let Some(registry) = native_state.registry.as_ref() else {
848 if let Some(previous) = previous {
849 for index in &previous.indexes {
850 if !fresh.iter().any(|candidate| candidate.name == index.name) {
851 fresh.push(index.clone());
852 }
853 }
854 }
855 return fresh;
856 };
857
858 for index in &mut fresh {
859 if let Some(native) = registry
860 .indexes
861 .iter()
862 .find(|candidate| candidate.name == index.name)
863 {
864 index.enabled = native.enabled;
865 index.last_refresh_ms = native.last_refresh_ms;
866 index.backend = native.backend.clone();
867 index.entries = native.entries as usize;
868 index.estimated_memory_bytes = native.estimated_memory_bytes;
869 if index.artifact_kind.is_none() {
870 index.artifact_kind = Self::native_artifact_kind_for_index(index.kind)
871 .map(|value| value.to_string());
872 }
873 if index.build_state == "catalog-derived" {
874 index.build_state = "registry-loaded".to_string();
875 }
876 }
877 }
878
879 for native in ®istry.indexes {
880 if fresh.iter().any(|index| index.name == native.name) {
881 continue;
882 }
883 let Some(kind) = Self::index_kind_from_str(&native.kind) else {
884 continue;
885 };
886 fresh.push(PhysicalIndexState {
887 name: native.name.clone(),
888 kind,
889 collection: native.collection.clone(),
890 enabled: native.enabled,
891 entries: native.entries as usize,
892 estimated_memory_bytes: native.estimated_memory_bytes,
893 last_refresh_ms: native.last_refresh_ms,
894 backend: native.backend.clone(),
895 artifact_kind: Self::native_artifact_kind_for_index(kind)
896 .map(|value| value.to_string()),
897 artifact_root_page: None,
898 artifact_checksum: None,
899 build_state: "registry-loaded".to_string(),
900 });
901 }
902
903 if !registry.indexes_complete {
904 if let Some(previous) = previous {
905 for index in &previous.indexes {
906 if !fresh.iter().any(|candidate| candidate.name == index.name) {
907 fresh.push(index.clone());
908 }
909 }
910 }
911 }
912
913 fresh
914 }
915
916 pub(crate) fn graph_projections_from_native_state(
917 &self,
918 native_state: &NativePhysicalState,
919 ) -> Vec<PhysicalGraphProjection> {
920 native_state
921 .registry
922 .as_ref()
923 .map(|registry| {
924 registry
925 .graph_projections
926 .iter()
927 .map(|projection| PhysicalGraphProjection {
928 name: projection.name.clone(),
929 created_at_unix_ms: projection.created_at_unix_ms,
930 updated_at_unix_ms: projection.updated_at_unix_ms,
931 state: "materialized".to_string(),
932 source: projection.source.clone(),
933 node_labels: projection.node_labels.clone(),
934 node_types: projection.node_types.clone(),
935 edge_labels: projection.edge_labels.clone(),
936 last_materialized_sequence: projection.last_materialized_sequence,
937 })
938 .collect()
939 })
940 .unwrap_or_default()
941 }
942
943 pub(crate) fn analytics_jobs_from_native_state(
944 &self,
945 native_state: &NativePhysicalState,
946 ) -> Vec<PhysicalAnalyticsJob> {
947 native_state
948 .registry
949 .as_ref()
950 .map(|registry| {
951 registry
952 .analytics_jobs
953 .iter()
954 .map(|job| PhysicalAnalyticsJob {
955 id: job.id.clone(),
956 kind: job.kind.clone(),
957 state: job.state.clone(),
958 projection: job.projection.clone(),
959 created_at_unix_ms: job.created_at_unix_ms,
960 updated_at_unix_ms: job.updated_at_unix_ms,
961 last_run_sequence: job.last_run_sequence,
962 metadata: job.metadata.clone(),
963 })
964 .collect()
965 })
966 .unwrap_or_default()
967 }
968
969 pub(crate) fn exports_from_native_state(
970 &self,
971 native_state: &NativePhysicalState,
972 ) -> Vec<ExportDescriptor> {
973 native_state
974 .recovery
975 .as_ref()
976 .map(|recovery| {
977 recovery
978 .exports
979 .iter()
980 .map(|export| ExportDescriptor {
981 name: export.name.clone(),
982 created_at_unix_ms: export.created_at_unix_ms,
983 snapshot_id: export.snapshot_id,
984 superblock_sequence: export.superblock_sequence,
985 data_path: self
986 .path()
987 .map(|path| {
988 crate::physical::PhysicalMetadataFile::export_data_path_for(
989 path,
990 &export.name,
991 )
992 .display()
993 .to_string()
994 })
995 .unwrap_or_default(),
996 metadata_path: self
997 .path()
998 .map(|path| {
999 let export_data_path =
1000 crate::physical::PhysicalMetadataFile::export_data_path_for(
1001 path,
1002 &export.name,
1003 );
1004 crate::physical::PhysicalMetadataFile::metadata_path_for(
1005 &export_data_path,
1006 )
1007 .display()
1008 .to_string()
1009 })
1010 .unwrap_or_default(),
1011 collection_count: export.collection_count as usize,
1012 total_entities: export.total_entities as usize,
1013 })
1014 .collect()
1015 })
1016 .unwrap_or_default()
1017 }
1018
1019 pub(crate) fn snapshots_from_native_state(
1020 &self,
1021 native_state: &NativePhysicalState,
1022 ) -> Vec<crate::physical::SnapshotDescriptor> {
1023 native_state
1024 .recovery
1025 .as_ref()
1026 .map(|recovery| {
1027 recovery
1028 .snapshots
1029 .iter()
1030 .map(|snapshot| crate::physical::SnapshotDescriptor {
1031 snapshot_id: snapshot.snapshot_id,
1032 created_at_unix_ms: snapshot.created_at_unix_ms,
1033 superblock_sequence: snapshot.superblock_sequence,
1034 collection_count: snapshot.collection_count as usize,
1035 total_entities: snapshot.total_entities as usize,
1036 })
1037 .collect()
1038 })
1039 .unwrap_or_default()
1040 }
1041
1042 fn index_kind_from_str(value: &str) -> Option<crate::index::IndexKind> {
1043 match value {
1044 "btree" => Some(crate::index::IndexKind::BTree),
1045 "vector.hnsw" => Some(crate::index::IndexKind::VectorHnsw),
1046 "vector.inverted" => Some(crate::index::IndexKind::VectorInverted),
1047 "graph.adjacency" => Some(crate::index::IndexKind::GraphAdjacency),
1048 "text.fulltext" => Some(crate::index::IndexKind::FullText),
1049 "document.pathvalue" => Some(crate::index::IndexKind::DocumentPathValue),
1050 "search.hybrid" => Some(crate::index::IndexKind::HybridSearch),
1051 _ => None,
1052 }
1053 }
1054
1055 pub(crate) fn native_artifact_kind_for_index(kind: IndexKind) -> Option<&'static str> {
1056 match kind {
1057 IndexKind::VectorHnsw => Some("hnsw"),
1058 IndexKind::VectorInverted => Some("ivf"),
1059 IndexKind::GraphAdjacency => Some("graph.adjacency"),
1060 IndexKind::FullText => Some("text.fulltext"),
1061 IndexKind::DocumentPathValue => Some("document.pathvalue"),
1062 _ => None,
1063 }
1064 }
1065
1066 fn index_is_declared(&self, name: &str) -> bool {
1067 self.physical_metadata()
1068 .map(|metadata| metadata.indexes.iter().any(|index| index.name == name))
1069 .unwrap_or(false)
1070 }
1071
1072 pub(crate) fn graph_projection_is_declared(&self, name: &str) -> bool {
1073 self.physical_metadata()
1074 .map(|metadata| {
1075 metadata
1076 .graph_projections
1077 .iter()
1078 .any(|projection| projection.name == name)
1079 })
1080 .unwrap_or(false)
1081 }
1082
1083 pub(crate) fn graph_projection_is_operational(&self, name: &str) -> bool {
1084 self.operational_graph_projections()
1085 .into_iter()
1086 .any(|projection| projection.name == name && projection.state == "materialized")
1087 }
1088
1089 pub(crate) fn analytics_job_id(kind: &str, projection: Option<&str>) -> String {
1090 match projection {
1091 Some(projection) => format!("{kind}::{projection}"),
1092 None => format!("{kind}::global"),
1093 }
1094 }
1095
1096 pub(crate) fn update_physical_metadata<T, F>(
1097 &self,
1098 mutator: F,
1099 ) -> Result<T, Box<dyn std::error::Error>>
1100 where
1101 F: FnOnce(&mut PhysicalMetadataFile) -> T,
1102 {
1103 if self.options.mode != StorageMode::Persistent {
1104 return Err("physical metadata operations require persistent mode".into());
1105 }
1106 if self.options.read_only {
1107 return Err("physical metadata operations are not allowed in read-only mode".into());
1108 }
1109 let Some(path) = self.path() else {
1110 return Err("database path is not available".into());
1111 };
1112
1113 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1114
1115 if metadata.indexes.is_empty() {
1116 metadata.indexes = self.physical_index_state();
1117 }
1118 metadata.superblock.collection_roots = self.physical_collection_roots();
1119
1120 let result = mutator(&mut metadata);
1121 metadata.save_for_data_path(path)?;
1122 self.persist_native_physical_header(&metadata)?;
1123 Ok(result)
1124 }
1125
1126 pub(crate) fn persist_native_physical_header(
1127 &self,
1128 metadata: &PhysicalMetadataFile,
1129 ) -> Result<(), Box<dyn std::error::Error>> {
1130 if !self.paged_mode {
1131 return Ok(());
1132 }
1133
1134 let existing_page = self
1135 .store
1136 .physical_file_header()
1137 .map(|header| header.collection_roots_page)
1138 .filter(|page| *page != 0);
1139 let existing_registry_page = self
1140 .store
1141 .physical_file_header()
1142 .map(|header| header.registry_page)
1143 .filter(|page| *page != 0);
1144 let existing_recovery_page = self
1145 .store
1146 .physical_file_header()
1147 .map(|header| header.recovery_page)
1148 .filter(|page| *page != 0);
1149 let existing_catalog_page = self
1150 .store
1151 .physical_file_header()
1152 .map(|header| header.catalog_page)
1153 .filter(|page| *page != 0);
1154 let existing_metadata_state_page = self
1155 .store
1156 .physical_file_header()
1157 .map(|header| header.metadata_state_page)
1158 .filter(|page| *page != 0);
1159 let existing_vector_artifact_page = self
1160 .store
1161 .physical_file_header()
1162 .map(|header| header.vector_artifact_page)
1163 .filter(|page| *page != 0);
1164 let existing_manifest_page = self
1165 .store
1166 .physical_file_header()
1167 .map(|header| header.manifest_page)
1168 .filter(|page| *page != 0);
1169 let (manifest_page, manifest_checksum) = self.store.write_native_manifest_summary(
1170 metadata.superblock.sequence,
1171 &metadata.manifest_events,
1172 existing_manifest_page,
1173 )?;
1174 let (collection_roots_page, collection_roots_checksum) = self
1175 .store
1176 .write_native_collection_roots(&metadata.superblock.collection_roots, existing_page)?;
1177 let registry_summary = self.native_registry_summary_from_metadata(metadata);
1178 let (registry_page, registry_checksum) = self
1179 .store
1180 .write_native_registry_summary(®istry_summary, existing_registry_page)?;
1181 let recovery_summary = Self::native_recovery_summary_from_metadata(metadata);
1182 let (recovery_page, recovery_checksum) = self
1183 .store
1184 .write_native_recovery_summary(&recovery_summary, existing_recovery_page)?;
1185 let catalog_summary = Self::native_catalog_summary_from_metadata(metadata);
1186 let (catalog_page, catalog_checksum) = self
1187 .store
1188 .write_native_catalog_summary(&catalog_summary, existing_catalog_page)?;
1189 let metadata_state_summary = Self::native_metadata_state_summary_from_metadata(metadata);
1190 let (metadata_state_page, metadata_state_checksum) =
1191 self.store.write_native_metadata_state_summary(
1192 &metadata_state_summary,
1193 existing_metadata_state_page,
1194 )?;
1195 let vector_artifact_records = self.native_vector_artifact_records();
1196 let vector_artifact_payloads = vector_artifact_records
1197 .iter()
1198 .map(|(summary, bytes)| {
1199 (
1200 summary.collection.clone(),
1201 summary.artifact_kind.clone(),
1202 bytes.clone(),
1203 )
1204 })
1205 .collect::<Vec<_>>();
1206 let (vector_artifact_page, vector_artifact_checksum, _vector_artifact_pages) =
1207 self.store.write_native_vector_artifact_store(
1208 &vector_artifact_payloads,
1209 existing_vector_artifact_page,
1210 )?;
1211 let mut header = Self::native_header_from_metadata(metadata);
1212 header.manifest_page = manifest_page;
1213 header.manifest_checksum = manifest_checksum;
1214 header.collection_roots_page = collection_roots_page;
1215 header.collection_roots_checksum = collection_roots_checksum;
1216 header.registry_page = registry_page;
1217 header.registry_checksum = registry_checksum;
1218 header.recovery_page = recovery_page;
1219 header.recovery_checksum = recovery_checksum;
1220 header.catalog_page = catalog_page;
1221 header.catalog_checksum = catalog_checksum;
1222 header.metadata_state_page = metadata_state_page;
1223 header.metadata_state_checksum = metadata_state_checksum;
1224 header.vector_artifact_page = vector_artifact_page;
1225 header.vector_artifact_checksum = vector_artifact_checksum;
1226 self.store.update_physical_file_header(header)?;
1227 self.store.persist()?;
1228 Ok(())
1229 }
1230
1231 pub(crate) fn native_header_from_metadata(
1232 metadata: &PhysicalMetadataFile,
1233 ) -> PhysicalFileHeader {
1234 PhysicalFileHeader {
1235 format_version: metadata.superblock.format_version,
1236 sequence: metadata.superblock.sequence,
1237 manifest_oldest_root: metadata.superblock.manifest.oldest.index,
1238 manifest_root: metadata.superblock.manifest.newest.index,
1239 free_set_root: metadata.superblock.free_set.index,
1240 manifest_page: 0,
1241 manifest_checksum: 0,
1242 collection_roots_page: 0,
1243 collection_roots_checksum: 0,
1244 collection_root_count: metadata.superblock.collection_roots.len() as u32,
1245 snapshot_count: metadata.snapshots.len() as u32,
1246 index_count: metadata.indexes.len() as u32,
1247 catalog_collection_count: metadata.catalog.total_collections as u32,
1248 catalog_total_entities: metadata.catalog.total_entities as u64,
1249 export_count: metadata.exports.len() as u32,
1250 graph_projection_count: metadata.graph_projections.len() as u32,
1251 analytics_job_count: metadata.analytics_jobs.len() as u32,
1252 manifest_event_count: metadata.manifest_events.len() as u32,
1253 registry_page: 0,
1254 registry_checksum: 0,
1255 recovery_page: 0,
1256 recovery_checksum: 0,
1257 catalog_page: 0,
1258 catalog_checksum: 0,
1259 metadata_state_page: 0,
1260 metadata_state_checksum: 0,
1261 vector_artifact_page: 0,
1262 vector_artifact_checksum: 0,
1263 }
1264 }
1265
1266 fn recover_queue_pending_state(&self) {
1267 const QUEUE_META_COLLECTION: &str = "red_queue_meta";
1268
1269 let Some(manager) = self.store.get_collection(QUEUE_META_COLLECTION) else {
1270 return;
1271 };
1272
1273 let pending_rows = manager.query_all(|entity| {
1274 entity.data.as_row().is_some_and(|row| {
1275 matches!(
1276 row.get_field("kind"),
1277 Some(crate::storage::schema::Value::Text(kind)) if &**kind == "queue_pending"
1278 )
1279 })
1280 });
1281
1282 for row in pending_rows {
1283 let _ = self.store.delete(QUEUE_META_COLLECTION, row.id);
1284 }
1285 }
1286}