Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_metadata.rs

1use super::*;
2use crate::storage::unified::metadata::{MetadataFilter, MetadataValue};
3
4impl RedDB {
5    pub fn enforce_retention_policy(&self) -> Result<(), Box<dyn std::error::Error>> {
6        if self.options.read_only {
7            return Ok(());
8        }
9
10        // Export pruning is only meaningful for persistent mode where we
11        // have a metadata sidecar that tracks file-backed export artifacts.
12        if self.options.mode == StorageMode::Persistent {
13            let Some(path) = self.path() else {
14                return Ok(());
15            };
16
17            let Ok(mut metadata) = self.load_or_bootstrap_physical_metadata(true) else {
18                return Ok(());
19            };
20
21            self.prune_export_registry(&mut metadata.exports);
22            metadata.save_for_data_path(path)?;
23        }
24
25        let _ = self.sweep_ttl_expired_entities()?;
26
27        Ok(())
28    }
29
30    fn sweep_ttl_expired_entities(&self) -> Result<usize, Box<dyn std::error::Error>> {
31        let now_ms = SystemTime::now()
32            .duration_since(UNIX_EPOCH)
33            .unwrap_or_default()
34            .as_millis()
35            .min(u128::from(u64::MAX)) as u64;
36
37        let mut to_delete = Vec::<(String, EntityId)>::new();
38
39        let mut absolute_expired = self.expired_entities_by_expires_at(now_ms)?;
40        to_delete.append(&mut absolute_expired);
41
42        let mut relative_expired = self.expired_entities_by_ttl(now_ms)?;
43        to_delete.append(&mut relative_expired);
44
45        to_delete.sort_unstable();
46        to_delete.dedup();
47
48        let mut deleted = 0usize;
49        for (collection, id) in to_delete {
50            match self.store.delete(&collection, id) {
51                Ok(true) => deleted = deleted.saturating_add(1),
52                Ok(false) => {}
53                Err(err) => {
54                    return Err(format!(
55                        "failed deleting expired entity {id} from collection '{collection}': {err:?}"
56                    )
57                    .into());
58                }
59            }
60        }
61
62        Ok(deleted)
63    }
64
65    fn expired_entities_by_expires_at(
66        &self,
67        now_ms: u64,
68    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
69        let mut ids = self.store.filter_metadata_all(&[(
70            "_expires_at".to_string(),
71            MetadataFilter::Le(MetadataValue::Timestamp(now_ms)),
72        )]);
73
74        if let Ok(now_ms_i64) = i64::try_from(now_ms) {
75            ids.extend(self.store.filter_metadata_all(&[(
76                "_expires_at".to_string(),
77                MetadataFilter::Le(MetadataValue::Int(now_ms_i64)),
78            )]));
79        }
80
81        let now_ms_f64 = now_ms as f64;
82        if now_ms_f64.is_finite() {
83            ids.extend(self.store.filter_metadata_all(&[(
84                "_expires_at".to_string(),
85                MetadataFilter::Le(MetadataValue::Float(now_ms_f64)),
86            )]));
87        }
88
89        Ok(ids)
90    }
91
92    fn expired_entities_by_ttl(
93        &self,
94        now_ms: u64,
95    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
96        let mut candidates = Vec::<(String, EntityId)>::new();
97
98        let ttl_ms_candidates = self
99            .store
100            .filter_metadata_all(&[("_ttl_ms".to_string(), MetadataFilter::IsNotNull)]);
101        candidates.extend(ttl_ms_candidates);
102
103        let ttl_candidates = self
104            .store
105            .filter_metadata_all(&[("_ttl".to_string(), MetadataFilter::IsNotNull)]);
106        candidates.extend(ttl_candidates);
107
108        if candidates.is_empty() {
109            return Ok(Vec::new());
110        }
111
112        candidates.sort_unstable();
113        candidates.dedup();
114
115        let mut expired = Vec::<(String, EntityId)>::new();
116        for (collection, entity_id) in candidates {
117            let Some(entity) = self.store.get(&collection, entity_id) else {
118                continue;
119            };
120
121            let Some(metadata) = self.store.get_metadata(&collection, entity_id) else {
122                continue;
123            };
124
125            let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
126            let ttl_secs = if ttl_ms.is_none() {
127                metadata.get("_ttl").and_then(|value| {
128                    Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
129                })
130            } else {
131                None
132            };
133
134            let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
135                continue;
136            };
137
138            let created_at_ms = entity.created_at.saturating_mul(1000);
139            let expiry_ms = created_at_ms.saturating_add(ttl_ms);
140            if expiry_ms <= now_ms {
141                expired.push((collection, entity_id));
142            }
143        }
144
145        Ok(expired)
146    }
147
148    fn metadata_u64(value: &MetadataValue) -> Option<u64> {
149        match value {
150            MetadataValue::Int(v) if *v >= 0 => Some(*v as u64),
151            MetadataValue::Timestamp(v) => Some(*v),
152            MetadataValue::Float(v) => {
153                if !v.is_finite() || !v.is_sign_positive() || v.fract().abs() >= f64::EPSILON {
154                    return None;
155                }
156                if *v > u64::MAX as f64 {
157                    return None;
158                }
159                Some(v.trunc() as u64)
160            }
161            MetadataValue::String(v) => v.parse::<u64>().ok(),
162            _ => None,
163        }
164    }
165
166    // ========================================================================
167    // Builder Methods - Create Entities
168    // ========================================================================
169
170    /// Start building a graph node
171    ///
172    /// # Example
173    /// ```ignore
174    /// let host = db.node("hosts", "Host")
175    ///     .property("ip", "192.168.1.1")
176    ///     .save()?;
177    /// ```
178    pub fn node(&self, collection: impl Into<String>, label: impl Into<String>) -> NodeBuilder {
179        NodeBuilder::new(
180            self.store.clone(),
181            self.preprocessors.clone(),
182            collection,
183            label,
184        )
185    }
186
187    /// Start building a graph edge
188    ///
189    /// # Example
190    /// ```ignore
191    /// let edge = db.edge("connections", "CONNECTS_TO")
192    ///     .from(host_a)
193    ///     .to(host_b)
194    ///     .weight(0.95)
195    ///     .property("protocol", "TCP")
196    ///     .save()?;
197    /// ```
198    pub fn edge(&self, collection: impl Into<String>, label: impl Into<String>) -> EdgeBuilder {
199        EdgeBuilder::new(
200            self.store.clone(),
201            self.preprocessors.clone(),
202            collection,
203            label,
204        )
205    }
206
207    /// Start building a vector entry
208    ///
209    /// # Example
210    /// ```ignore
211    /// let vec = db.vector("embeddings")
212    ///     .dense(embedding)
213    ///     .content("Original text content")
214    ///     .metadata("source", "document.pdf")
215    ///     .save()?;
216    /// ```
217    pub fn vector(&self, collection: impl Into<String>) -> VectorBuilder {
218        VectorBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
219    }
220
221    /// Start building a table row
222    ///
223    /// # Example
224    /// ```ignore
225    /// let row = db.row("scans", vec![
226    ///     ("timestamp", Value::Timestamp(now)),
227    ///     ("target", Value::text("192.168.1.0/24")),
228    ///     ("findings", Value::Integer(42)),
229    /// ]).save()?;
230    /// ```
231    pub fn row(&self, table: impl Into<String>, columns: Vec<(&str, Value)>) -> RowBuilder {
232        RowBuilder::new(
233            self.store.clone(),
234            self.preprocessors.clone(),
235            table,
236            columns,
237        )
238    }
239
240    /// Start building a document
241    ///
242    /// Documents are stored as enriched table rows with a full JSON body
243    /// field and flattened top-level keys for filtering.
244    ///
245    /// # Example
246    /// ```ignore
247    /// let doc = db.doc("articles")
248    ///     .field("title", "Hello World")
249    ///     .field("views", 42)
250    ///     .metadata("source", "web")
251    ///     .save()?;
252    /// ```
253    pub fn doc(&self, collection: impl Into<String>) -> DocumentBuilder {
254        DocumentBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
255    }
256
257    /// Start building a key-value pair
258    ///
259    /// KV pairs are stored as table rows with named fields `key` and `value`.
260    ///
261    /// # Example
262    /// ```ignore
263    /// let id = db.kv("config", "theme", Value::text("dark"))
264    ///     .metadata("updated_by", "admin")
265    ///     .save()?;
266    /// ```
267    pub fn kv(
268        &self,
269        collection: impl Into<String>,
270        key: impl Into<String>,
271        value: Value,
272    ) -> KvBuilder {
273        KvBuilder::new(
274            self.store.clone(),
275            self.preprocessors.clone(),
276            collection,
277            key,
278            value,
279        )
280    }
281
282    /// Get a key-value pair by key, returning the value and entity id
283    ///
284    /// Scans the collection for an entity whose named field `key` matches.
285    pub fn get_kv(&self, collection: &str, key: &str) -> Option<(Value, EntityId)> {
286        let manager = self.store.get_collection(collection)?;
287        let entities = manager.query_all(|_| true);
288        for entity in entities {
289            if let EntityData::Row(ref row) = entity.data {
290                if let Some(ref named) = row.named {
291                    if let Some(Value::Text(ref k)) = named.get("key") {
292                        if &**k == key {
293                            let value = named.get("value").cloned().unwrap_or(Value::Null);
294                            return Some((value, entity.id));
295                        }
296                    }
297                }
298            }
299        }
300        None
301    }
302
303    /// Delete a key-value pair by key, returning whether it was found and removed
304    pub fn delete_kv(
305        &self,
306        collection: &str,
307        key: &str,
308    ) -> Result<bool, super::super::error::DevXError> {
309        let Some((_, id)) = self.get_kv(collection, key) else {
310            return Ok(false);
311        };
312        self.store
313            .delete(collection, id)
314            .map_err(|err| super::super::error::DevXError::Storage(format!("{err:?}")))?;
315        Ok(true)
316    }
317
318    pub(crate) fn with_initialized_metadata(self) -> Result<Self, Box<dyn std::error::Error>> {
319        if self.options.mode == StorageMode::Persistent && !self.options.read_only {
320            // Load metadata without persisting (avoids blocking catalog snapshot on boot)
321            let _ = self.load_or_bootstrap_physical_metadata(false);
322            // Skip repair on boot — deferred to first explicit persist_metadata() call.
323            // This avoids the recursive catalog_model_snapshot → physical_metadata loop
324            // that caused stack overflow / 12-second hang on startup.
325        }
326        self.load_collection_ttl_defaults_from_metadata();
327        self.recover_queue_pending_state();
328        Ok(self)
329    }
330
331    pub(crate) fn persist_metadata(&self) -> Result<(), Box<dyn std::error::Error>> {
332        if self.options.mode != StorageMode::Persistent || self.options.read_only {
333            return Ok(());
334        }
335        let Some(path) = self.path() else {
336            return Ok(());
337        };
338
339        let previous = self.load_or_bootstrap_physical_metadata(false).ok();
340        let collection_roots = self.physical_collection_roots();
341        let indexes = self
342            .native_physical_state()
343            .map(|state| self.physical_index_state_from_native_state(&state, previous.as_ref()))
344            .unwrap_or_else(|| self.physical_index_state());
345        let mut metadata = PhysicalMetadataFile::from_state(
346            self.options.clone(),
347            self.catalog_snapshot(),
348            collection_roots,
349            indexes,
350            previous.as_ref(),
351        );
352        metadata.collection_ttl_defaults_ms = self.collection_ttl_defaults_snapshot();
353        metadata.save_for_data_path(path)?;
354        self.persist_native_physical_header(&metadata)?;
355        Ok(())
356    }
357
358    fn bootstrap_metadata_from_native_state(&self) -> Result<bool, Box<dyn std::error::Error>> {
359        if self.options.mode != StorageMode::Persistent || self.options.read_only {
360            return Ok(false);
361        }
362        let Some(path) = self.path() else {
363            return Ok(false);
364        };
365        let Some(native_state) = self.native_physical_state() else {
366            return Ok(false);
367        };
368        if !Self::native_state_is_bootstrap_complete(&native_state) {
369            return Ok(false);
370        }
371
372        let previous = PhysicalMetadataFile::load_for_data_path(path).ok();
373        let metadata = self.metadata_from_native_state(&native_state, previous.as_ref());
374        metadata.save_for_data_path(path)?;
375        self.persist_native_physical_header(&metadata)?;
376        Ok(true)
377    }
378
379    /// Rebuild the external physical metadata view from the native state published in the
380    /// paged database file.
381    pub fn rebuild_physical_metadata_from_native_state(
382        &self,
383    ) -> Result<bool, Box<dyn std::error::Error>> {
384        self.bootstrap_metadata_from_native_state()
385    }
386
387    pub(crate) fn native_state_is_bootstrap_complete(native_state: &NativePhysicalState) -> bool {
388        let registry_complete = native_state.registry.as_ref().map(|registry| {
389            registry.collections_complete
390                && registry.indexes_complete
391                && registry.graph_projections_complete
392                && registry.analytics_jobs_complete
393                && registry.vector_artifacts_complete
394        });
395        let recovery_complete = native_state
396            .recovery
397            .as_ref()
398            .map(|recovery| recovery.snapshots_complete && recovery.exports_complete);
399        let catalog_complete = native_state
400            .catalog
401            .as_ref()
402            .map(|catalog| catalog.collections_complete);
403
404        registry_complete == Some(true)
405            && recovery_complete == Some(true)
406            && catalog_complete == Some(true)
407    }
408
409    pub(crate) fn load_or_bootstrap_physical_metadata(
410        &self,
411        persist_bootstrapped: bool,
412    ) -> Result<PhysicalMetadataFile, Box<dyn std::error::Error>> {
413        if self.options.mode != StorageMode::Persistent {
414            return Err("physical metadata requires persistent mode".into());
415        }
416        let Some(path) = self.path() else {
417            return Err("database path is not available".into());
418        };
419        let native_state = self.native_physical_state();
420
421        match PhysicalMetadataFile::load_for_data_path(path) {
422            Ok(metadata) => {
423                if let Some(native_state) = native_state.as_ref() {
424                    let inspection = Self::inspect_native_header_against_metadata(
425                        native_state.header,
426                        &metadata,
427                    );
428                    if Self::repair_policy_for_inspection(&inspection)
429                        == NativeHeaderRepairPolicy::NativeAheadOfMetadata
430                    {
431                        let bootstrapped =
432                            self.metadata_from_native_state(native_state, Some(&metadata));
433                        if persist_bootstrapped && !self.options.read_only {
434                            bootstrapped.save_for_data_path(path)?;
435                            self.persist_native_physical_header(&bootstrapped)?;
436                        }
437                        return Ok(bootstrapped);
438                    }
439                }
440                Ok(metadata)
441            }
442            Err(err) => {
443                let Some(native_state) = native_state else {
444                    return Err(err.into());
445                };
446                // Accept the bootstrap when the native state is either
447                // (a) fully populated and consistent (the original
448                // contract), or (b) trivially empty — a freshly created
449                // database with no collections written yet. Without (b)
450                // a brand-new data file can never reach
451                // `readiness_for_query = true`, because the bootstrap
452                // refuses to run until the registry/catalog/recovery
453                // structures are "complete", which they never become
454                // until the bootstrap has already run once.
455                //
456                // The emptiness check is conservative: header.sequence
457                // must still be at its initial value AND all three
458                // physical state summaries must be absent. Anything
459                // else falls through to the original error so we never
460                // paper over partially corrupted files.
461                let is_fresh_empty = native_state.header.sequence == 0
462                    && native_state.registry.is_none()
463                    && native_state.catalog.is_none()
464                    && native_state.recovery.is_none();
465                if !is_fresh_empty && !Self::native_state_is_bootstrap_complete(&native_state) {
466                    return Err(err.into());
467                }
468                let metadata = self.metadata_from_native_state(&native_state, None);
469                if persist_bootstrapped && !self.options.read_only {
470                    metadata.save_for_data_path(path)?;
471                    self.persist_native_physical_header(&metadata)?;
472                }
473                Ok(metadata)
474            }
475        }
476    }
477
478    pub(crate) fn physical_metadata_preference(&self) -> Option<&'static str> {
479        let path = self.path()?;
480        let native_state = self.native_physical_state();
481        let metadata = PhysicalMetadataFile::load_for_data_path(path).ok();
482
483        match (metadata, native_state) {
484            (Some(metadata), Some(native_state)) => {
485                let inspection =
486                    Self::inspect_native_header_against_metadata(native_state.header, &metadata);
487                match Self::repair_policy_for_inspection(&inspection) {
488                    NativeHeaderRepairPolicy::InSync => Some("sidecar_current"),
489                    NativeHeaderRepairPolicy::RepairNativeFromMetadata => Some("sidecar_current"),
490                    NativeHeaderRepairPolicy::NativeAheadOfMetadata => Some("native_ahead"),
491                }
492            }
493            (Some(_), None) => Some("sidecar_only"),
494            (None, Some(_)) => Some("sidecar_missing_native_available"),
495            (None, None) => Some("sidecar_missing_no_native"),
496        }
497    }
498
499    fn metadata_from_native_state(
500        &self,
501        native_state: &NativePhysicalState,
502        previous: Option<&PhysicalMetadataFile>,
503    ) -> PhysicalMetadataFile {
504        let now = SystemTime::now()
505            .duration_since(UNIX_EPOCH)
506            .unwrap_or_default()
507            .as_millis();
508        let catalog = self.catalog_snapshot();
509        let catalog_name = catalog.name.clone();
510        let catalog_total_entities = catalog.total_entities;
511        let catalog_total_collections = catalog.total_collections;
512        let indexes = self.physical_index_state();
513
514        let mut manifest =
515            crate::api::SchemaManifest::now(self.options.clone(), catalog.total_collections);
516        manifest.updated_at_unix_ms = now;
517
518        let manifest_events = native_state
519            .manifest
520            .as_ref()
521            .map(|summary| {
522                summary
523                    .recent_events
524                    .iter()
525                    .map(|event| crate::physical::ManifestEvent {
526                        collection: event.collection.clone(),
527                        object_key: event.object_key.clone(),
528                        kind: match event.kind.as_str() {
529                            "insert" => crate::physical::ManifestEventKind::Insert,
530                            "update" => crate::physical::ManifestEventKind::Update,
531                            "remove" => crate::physical::ManifestEventKind::Remove,
532                            _ => crate::physical::ManifestEventKind::Checkpoint,
533                        },
534                        block: crate::physical::BlockReference {
535                            index: event.block_index,
536                            checksum: event.block_checksum,
537                        },
538                        snapshot_min: event.snapshot_min,
539                        snapshot_max: event.snapshot_max,
540                    })
541                    .collect()
542            })
543            .unwrap_or_default();
544
545        let graph_projections = native_state
546            .registry
547            .as_ref()
548            .and_then(|registry| {
549                registry.graph_projections_complete.then(|| {
550                    registry
551                        .graph_projections
552                        .iter()
553                        .map(|projection| crate::physical::PhysicalGraphProjection {
554                            name: projection.name.clone(),
555                            created_at_unix_ms: projection.created_at_unix_ms,
556                            updated_at_unix_ms: projection.updated_at_unix_ms,
557                            state: "materialized".to_string(),
558                            source: projection.source.clone(),
559                            node_labels: projection.node_labels.clone(),
560                            node_types: projection.node_types.clone(),
561                            edge_labels: projection.edge_labels.clone(),
562                            last_materialized_sequence: projection.last_materialized_sequence,
563                        })
564                        .collect()
565                })
566            })
567            .or_else(|| previous.map(|metadata| metadata.graph_projections.clone()))
568            .unwrap_or_default();
569
570        let analytics_jobs = native_state
571            .registry
572            .as_ref()
573            .and_then(|registry| {
574                registry.analytics_jobs_complete.then(|| {
575                    registry
576                        .analytics_jobs
577                        .iter()
578                        .map(|job| crate::physical::PhysicalAnalyticsJob {
579                            id: job.id.clone(),
580                            kind: job.kind.clone(),
581                            state: job.state.clone(),
582                            projection: job.projection.clone(),
583                            created_at_unix_ms: job.created_at_unix_ms,
584                            updated_at_unix_ms: job.updated_at_unix_ms,
585                            last_run_sequence: job.last_run_sequence,
586                            metadata: job.metadata.clone(),
587                        })
588                        .collect()
589                })
590            })
591            .or_else(|| previous.map(|metadata| metadata.analytics_jobs.clone()))
592            .unwrap_or_default();
593
594        let exports = native_state
595            .recovery
596            .as_ref()
597            .and_then(|recovery| {
598                recovery.exports_complete.then(|| {
599                    recovery
600                        .exports
601                        .iter()
602                        .map(|export| crate::physical::ExportDescriptor {
603                            name: export.name.clone(),
604                            created_at_unix_ms: export.created_at_unix_ms,
605                            snapshot_id: export.snapshot_id,
606                            superblock_sequence: export.superblock_sequence,
607                            data_path: self
608                                .path()
609                                .map(|path| {
610                                    crate::physical::PhysicalMetadataFile::export_data_path_for(
611                                        path,
612                                        &export.name,
613                                    )
614                                    .display()
615                                    .to_string()
616                                })
617                                .unwrap_or_default(),
618                            metadata_path: self
619                                .path()
620                                .map(|path| {
621                                    let export_data_path =
622                                        crate::physical::PhysicalMetadataFile::export_data_path_for(
623                                            path,
624                                            &export.name,
625                                        );
626                                    crate::physical::PhysicalMetadataFile::metadata_path_for(
627                                        &export_data_path,
628                                    )
629                                    .display()
630                                    .to_string()
631                                })
632                                .unwrap_or_default(),
633                            collection_count: export.collection_count as usize,
634                            total_entities: export.total_entities as usize,
635                        })
636                        .collect()
637                })
638            })
639            .or_else(|| previous.map(|metadata| metadata.exports.clone()))
640            .unwrap_or_default();
641
642        let snapshots = native_state
643            .recovery
644            .as_ref()
645            .and_then(|recovery| {
646                recovery.snapshots_complete.then(|| {
647                    recovery
648                        .snapshots
649                        .iter()
650                        .map(|snapshot| crate::physical::SnapshotDescriptor {
651                            snapshot_id: snapshot.snapshot_id,
652                            created_at_unix_ms: snapshot.created_at_unix_ms,
653                            superblock_sequence: snapshot.superblock_sequence,
654                            collection_count: snapshot.collection_count as usize,
655                            total_entities: snapshot.total_entities as usize,
656                        })
657                        .collect()
658                })
659            })
660            .or_else(|| previous.map(|metadata| metadata.snapshots.clone()))
661            .unwrap_or_else(|| {
662                vec![crate::physical::SnapshotDescriptor {
663                    snapshot_id: native_state.header.sequence,
664                    created_at_unix_ms: now,
665                    superblock_sequence: native_state.header.sequence,
666                    collection_count: catalog_total_collections,
667                    total_entities: catalog_total_entities,
668                }]
669            });
670
671        let catalog_stats = native_state
672            .catalog
673            .as_ref()
674            .and_then(|native_catalog| {
675                native_catalog.collections_complete.then(|| {
676                    native_catalog
677                        .collections
678                        .iter()
679                        .map(|collection| {
680                            (
681                                collection.name.clone(),
682                                crate::api::CollectionStats {
683                                    entities: collection.entities as usize,
684                                    cross_refs: collection.cross_refs as usize,
685                                    segments: collection.segments as usize,
686                                },
687                            )
688                        })
689                        .collect::<BTreeMap<_, _>>()
690                })
691            })
692            .or_else(|| previous.map(|metadata| metadata.catalog.stats_by_collection.clone()))
693            .unwrap_or_else(|| catalog.stats_by_collection.clone());
694
695        PhysicalMetadataFile {
696            protocol_version: crate::physical::PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
697            generated_at_unix_ms: now,
698            last_loaded_from: Some("native_bootstrap".to_string()),
699            last_healed_at_unix_ms: Some(now),
700            manifest,
701            catalog: crate::api::CatalogSnapshot {
702                name: catalog_name,
703                total_entities: native_state
704                    .catalog
705                    .as_ref()
706                    .map(|summary| summary.total_entities as usize)
707                    .unwrap_or(catalog_total_entities),
708                total_collections: native_state
709                    .catalog
710                    .as_ref()
711                    .map(|summary| summary.collection_count as usize)
712                    .unwrap_or(catalog_total_collections),
713                stats_by_collection: catalog_stats,
714                updated_at: SystemTime::now(),
715            },
716            manifest_events,
717            collection_ttl_defaults_ms: previous
718                .map(|metadata| metadata.collection_ttl_defaults_ms.clone())
719                .unwrap_or_default(),
720            collection_contracts: previous
721                .map(|metadata| metadata.collection_contracts.clone())
722                .unwrap_or_default(),
723            tree_definitions: previous
724                .map(|metadata| metadata.tree_definitions.clone())
725                .unwrap_or_default(),
726            indexes,
727            graph_projections,
728            analytics_jobs,
729            exports,
730            superblock: crate::physical::SuperblockHeader {
731                format_version: native_state.header.format_version,
732                sequence: native_state.header.sequence,
733                copies: crate::physical::DEFAULT_SUPERBLOCK_COPIES,
734                manifest: crate::physical::ManifestPointers {
735                    oldest: crate::physical::BlockReference {
736                        index: native_state.header.manifest_oldest_root,
737                        checksum: 0,
738                    },
739                    newest: crate::physical::BlockReference {
740                        index: native_state.header.manifest_root,
741                        checksum: 0,
742                    },
743                },
744                free_set: crate::physical::BlockReference {
745                    index: native_state.header.free_set_root,
746                    checksum: 0,
747                },
748                collection_roots: native_state.collection_roots.clone(),
749            },
750            snapshots,
751        }
752    }
753
754    pub(crate) fn reconcile_index_states_with_native_artifacts(
755        &self,
756        mut indexes: Vec<PhysicalIndexState>,
757    ) -> Vec<PhysicalIndexState> {
758        let native_artifacts = self
759            .native_physical_state()
760            .and_then(|state| state.registry)
761            .map(|registry| registry.vector_artifacts)
762            .unwrap_or_default();
763        for index in &mut indexes {
764            let Some(collection) = index.collection.as_deref() else {
765                continue;
766            };
767            let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
768                continue;
769            };
770            let Some(artifact) = native_artifacts.iter().find(|artifact| {
771                artifact.collection == collection && artifact.artifact_kind == artifact_kind
772            }) else {
773                index.build_state = "metadata-only".to_string();
774                continue;
775            };
776            index.entries = artifact.vector_count as usize;
777            index.estimated_memory_bytes = artifact.serialized_bytes;
778            index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
779            index.artifact_kind = Some(artifact.artifact_kind.clone());
780            index.artifact_checksum = Some(artifact.checksum);
781            index.build_state = "artifact-published".to_string();
782            if let Some(pages) = self.native_vector_artifact_pages() {
783                index.artifact_root_page = pages
784                    .into_iter()
785                    .find(|page| {
786                        page.collection == artifact.collection
787                            && page.artifact_kind == artifact.artifact_kind
788                    })
789                    .map(|page| page.root_page);
790            }
791        }
792        indexes
793    }
794
795    pub(crate) fn warmup_native_vector_artifact_for_index(
796        &self,
797        index: &PhysicalIndexState,
798    ) -> Result<(), String> {
799        let Some(collection) = index.collection.as_deref() else {
800            return Ok(());
801        };
802        let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
803            return Ok(());
804        };
805        self.warmup_native_vector_artifact(collection, Some(artifact_kind))?;
806        Ok(())
807    }
808
809    pub(crate) fn apply_runtime_native_artifact_to_index_state(
810        &self,
811        index: &mut PhysicalIndexState,
812    ) -> Result<(), String> {
813        let Some(collection) = index.collection.as_deref() else {
814            return Ok(());
815        };
816        let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
817            return Ok(());
818        };
819        let artifact = self.inspect_native_vector_artifact(collection, Some(artifact_kind))?;
820        index.entries = artifact
821            .graph_edge_count
822            .or(artifact.text_posting_count)
823            .unwrap_or(artifact.node_count) as usize;
824        index.estimated_memory_bytes = artifact.byte_len;
825        index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
826        index.artifact_kind = Some(artifact.artifact_kind.clone());
827        index.artifact_checksum = Some(artifact.checksum);
828        index.build_state = "ready".to_string();
829        index.artifact_root_page = self
830            .native_vector_artifact_pages()
831            .and_then(|pages| {
832                pages.into_iter().find(|page| {
833                    page.collection == artifact.collection
834                        && page.artifact_kind == artifact.artifact_kind
835                })
836            })
837            .map(|page| page.root_page);
838        Ok(())
839    }
840
841    pub(crate) fn physical_index_state_from_native_state(
842        &self,
843        native_state: &NativePhysicalState,
844        previous: Option<&PhysicalMetadataFile>,
845    ) -> Vec<PhysicalIndexState> {
846        let mut fresh = self.physical_index_state();
847        let Some(registry) = native_state.registry.as_ref() else {
848            if let Some(previous) = previous {
849                for index in &previous.indexes {
850                    if !fresh.iter().any(|candidate| candidate.name == index.name) {
851                        fresh.push(index.clone());
852                    }
853                }
854            }
855            return fresh;
856        };
857
858        for index in &mut fresh {
859            if let Some(native) = registry
860                .indexes
861                .iter()
862                .find(|candidate| candidate.name == index.name)
863            {
864                index.enabled = native.enabled;
865                index.last_refresh_ms = native.last_refresh_ms;
866                index.backend = native.backend.clone();
867                index.entries = native.entries as usize;
868                index.estimated_memory_bytes = native.estimated_memory_bytes;
869                if index.artifact_kind.is_none() {
870                    index.artifact_kind = Self::native_artifact_kind_for_index(index.kind)
871                        .map(|value| value.to_string());
872                }
873                if index.build_state == "catalog-derived" {
874                    index.build_state = "registry-loaded".to_string();
875                }
876            }
877        }
878
879        for native in &registry.indexes {
880            if fresh.iter().any(|index| index.name == native.name) {
881                continue;
882            }
883            let Some(kind) = Self::index_kind_from_str(&native.kind) else {
884                continue;
885            };
886            fresh.push(PhysicalIndexState {
887                name: native.name.clone(),
888                kind,
889                collection: native.collection.clone(),
890                enabled: native.enabled,
891                entries: native.entries as usize,
892                estimated_memory_bytes: native.estimated_memory_bytes,
893                last_refresh_ms: native.last_refresh_ms,
894                backend: native.backend.clone(),
895                artifact_kind: Self::native_artifact_kind_for_index(kind)
896                    .map(|value| value.to_string()),
897                artifact_root_page: None,
898                artifact_checksum: None,
899                build_state: "registry-loaded".to_string(),
900            });
901        }
902
903        if !registry.indexes_complete {
904            if let Some(previous) = previous {
905                for index in &previous.indexes {
906                    if !fresh.iter().any(|candidate| candidate.name == index.name) {
907                        fresh.push(index.clone());
908                    }
909                }
910            }
911        }
912
913        fresh
914    }
915
916    pub(crate) fn graph_projections_from_native_state(
917        &self,
918        native_state: &NativePhysicalState,
919    ) -> Vec<PhysicalGraphProjection> {
920        native_state
921            .registry
922            .as_ref()
923            .map(|registry| {
924                registry
925                    .graph_projections
926                    .iter()
927                    .map(|projection| PhysicalGraphProjection {
928                        name: projection.name.clone(),
929                        created_at_unix_ms: projection.created_at_unix_ms,
930                        updated_at_unix_ms: projection.updated_at_unix_ms,
931                        state: "materialized".to_string(),
932                        source: projection.source.clone(),
933                        node_labels: projection.node_labels.clone(),
934                        node_types: projection.node_types.clone(),
935                        edge_labels: projection.edge_labels.clone(),
936                        last_materialized_sequence: projection.last_materialized_sequence,
937                    })
938                    .collect()
939            })
940            .unwrap_or_default()
941    }
942
943    pub(crate) fn analytics_jobs_from_native_state(
944        &self,
945        native_state: &NativePhysicalState,
946    ) -> Vec<PhysicalAnalyticsJob> {
947        native_state
948            .registry
949            .as_ref()
950            .map(|registry| {
951                registry
952                    .analytics_jobs
953                    .iter()
954                    .map(|job| PhysicalAnalyticsJob {
955                        id: job.id.clone(),
956                        kind: job.kind.clone(),
957                        state: job.state.clone(),
958                        projection: job.projection.clone(),
959                        created_at_unix_ms: job.created_at_unix_ms,
960                        updated_at_unix_ms: job.updated_at_unix_ms,
961                        last_run_sequence: job.last_run_sequence,
962                        metadata: job.metadata.clone(),
963                    })
964                    .collect()
965            })
966            .unwrap_or_default()
967    }
968
969    pub(crate) fn exports_from_native_state(
970        &self,
971        native_state: &NativePhysicalState,
972    ) -> Vec<ExportDescriptor> {
973        native_state
974            .recovery
975            .as_ref()
976            .map(|recovery| {
977                recovery
978                    .exports
979                    .iter()
980                    .map(|export| ExportDescriptor {
981                        name: export.name.clone(),
982                        created_at_unix_ms: export.created_at_unix_ms,
983                        snapshot_id: export.snapshot_id,
984                        superblock_sequence: export.superblock_sequence,
985                        data_path: self
986                            .path()
987                            .map(|path| {
988                                crate::physical::PhysicalMetadataFile::export_data_path_for(
989                                    path,
990                                    &export.name,
991                                )
992                                .display()
993                                .to_string()
994                            })
995                            .unwrap_or_default(),
996                        metadata_path: self
997                            .path()
998                            .map(|path| {
999                                let export_data_path =
1000                                    crate::physical::PhysicalMetadataFile::export_data_path_for(
1001                                        path,
1002                                        &export.name,
1003                                    );
1004                                crate::physical::PhysicalMetadataFile::metadata_path_for(
1005                                    &export_data_path,
1006                                )
1007                                .display()
1008                                .to_string()
1009                            })
1010                            .unwrap_or_default(),
1011                        collection_count: export.collection_count as usize,
1012                        total_entities: export.total_entities as usize,
1013                    })
1014                    .collect()
1015            })
1016            .unwrap_or_default()
1017    }
1018
1019    pub(crate) fn snapshots_from_native_state(
1020        &self,
1021        native_state: &NativePhysicalState,
1022    ) -> Vec<crate::physical::SnapshotDescriptor> {
1023        native_state
1024            .recovery
1025            .as_ref()
1026            .map(|recovery| {
1027                recovery
1028                    .snapshots
1029                    .iter()
1030                    .map(|snapshot| crate::physical::SnapshotDescriptor {
1031                        snapshot_id: snapshot.snapshot_id,
1032                        created_at_unix_ms: snapshot.created_at_unix_ms,
1033                        superblock_sequence: snapshot.superblock_sequence,
1034                        collection_count: snapshot.collection_count as usize,
1035                        total_entities: snapshot.total_entities as usize,
1036                    })
1037                    .collect()
1038            })
1039            .unwrap_or_default()
1040    }
1041
1042    fn index_kind_from_str(value: &str) -> Option<crate::index::IndexKind> {
1043        match value {
1044            "btree" => Some(crate::index::IndexKind::BTree),
1045            "vector.hnsw" => Some(crate::index::IndexKind::VectorHnsw),
1046            "vector.inverted" => Some(crate::index::IndexKind::VectorInverted),
1047            "graph.adjacency" => Some(crate::index::IndexKind::GraphAdjacency),
1048            "text.fulltext" => Some(crate::index::IndexKind::FullText),
1049            "document.pathvalue" => Some(crate::index::IndexKind::DocumentPathValue),
1050            "search.hybrid" => Some(crate::index::IndexKind::HybridSearch),
1051            _ => None,
1052        }
1053    }
1054
1055    pub(crate) fn native_artifact_kind_for_index(kind: IndexKind) -> Option<&'static str> {
1056        match kind {
1057            IndexKind::VectorHnsw => Some("hnsw"),
1058            IndexKind::VectorInverted => Some("ivf"),
1059            IndexKind::GraphAdjacency => Some("graph.adjacency"),
1060            IndexKind::FullText => Some("text.fulltext"),
1061            IndexKind::DocumentPathValue => Some("document.pathvalue"),
1062            _ => None,
1063        }
1064    }
1065
1066    fn index_is_declared(&self, name: &str) -> bool {
1067        self.physical_metadata()
1068            .map(|metadata| metadata.indexes.iter().any(|index| index.name == name))
1069            .unwrap_or(false)
1070    }
1071
1072    pub(crate) fn graph_projection_is_declared(&self, name: &str) -> bool {
1073        self.physical_metadata()
1074            .map(|metadata| {
1075                metadata
1076                    .graph_projections
1077                    .iter()
1078                    .any(|projection| projection.name == name)
1079            })
1080            .unwrap_or(false)
1081    }
1082
1083    pub(crate) fn graph_projection_is_operational(&self, name: &str) -> bool {
1084        self.operational_graph_projections()
1085            .into_iter()
1086            .any(|projection| projection.name == name && projection.state == "materialized")
1087    }
1088
1089    pub(crate) fn analytics_job_id(kind: &str, projection: Option<&str>) -> String {
1090        match projection {
1091            Some(projection) => format!("{kind}::{projection}"),
1092            None => format!("{kind}::global"),
1093        }
1094    }
1095
1096    pub(crate) fn update_physical_metadata<T, F>(
1097        &self,
1098        mutator: F,
1099    ) -> Result<T, Box<dyn std::error::Error>>
1100    where
1101        F: FnOnce(&mut PhysicalMetadataFile) -> T,
1102    {
1103        if self.options.mode != StorageMode::Persistent {
1104            return Err("physical metadata operations require persistent mode".into());
1105        }
1106        if self.options.read_only {
1107            return Err("physical metadata operations are not allowed in read-only mode".into());
1108        }
1109        let Some(path) = self.path() else {
1110            return Err("database path is not available".into());
1111        };
1112
1113        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1114
1115        if metadata.indexes.is_empty() {
1116            metadata.indexes = self.physical_index_state();
1117        }
1118        metadata.superblock.collection_roots = self.physical_collection_roots();
1119
1120        let result = mutator(&mut metadata);
1121        metadata.save_for_data_path(path)?;
1122        self.persist_native_physical_header(&metadata)?;
1123        Ok(result)
1124    }
1125
1126    pub(crate) fn persist_native_physical_header(
1127        &self,
1128        metadata: &PhysicalMetadataFile,
1129    ) -> Result<(), Box<dyn std::error::Error>> {
1130        if !self.paged_mode {
1131            return Ok(());
1132        }
1133
1134        let existing_page = self
1135            .store
1136            .physical_file_header()
1137            .map(|header| header.collection_roots_page)
1138            .filter(|page| *page != 0);
1139        let existing_registry_page = self
1140            .store
1141            .physical_file_header()
1142            .map(|header| header.registry_page)
1143            .filter(|page| *page != 0);
1144        let existing_recovery_page = self
1145            .store
1146            .physical_file_header()
1147            .map(|header| header.recovery_page)
1148            .filter(|page| *page != 0);
1149        let existing_catalog_page = self
1150            .store
1151            .physical_file_header()
1152            .map(|header| header.catalog_page)
1153            .filter(|page| *page != 0);
1154        let existing_metadata_state_page = self
1155            .store
1156            .physical_file_header()
1157            .map(|header| header.metadata_state_page)
1158            .filter(|page| *page != 0);
1159        let existing_vector_artifact_page = self
1160            .store
1161            .physical_file_header()
1162            .map(|header| header.vector_artifact_page)
1163            .filter(|page| *page != 0);
1164        let existing_manifest_page = self
1165            .store
1166            .physical_file_header()
1167            .map(|header| header.manifest_page)
1168            .filter(|page| *page != 0);
1169        let (manifest_page, manifest_checksum) = self.store.write_native_manifest_summary(
1170            metadata.superblock.sequence,
1171            &metadata.manifest_events,
1172            existing_manifest_page,
1173        )?;
1174        let (collection_roots_page, collection_roots_checksum) = self
1175            .store
1176            .write_native_collection_roots(&metadata.superblock.collection_roots, existing_page)?;
1177        let registry_summary = self.native_registry_summary_from_metadata(metadata);
1178        let (registry_page, registry_checksum) = self
1179            .store
1180            .write_native_registry_summary(&registry_summary, existing_registry_page)?;
1181        let recovery_summary = Self::native_recovery_summary_from_metadata(metadata);
1182        let (recovery_page, recovery_checksum) = self
1183            .store
1184            .write_native_recovery_summary(&recovery_summary, existing_recovery_page)?;
1185        let catalog_summary = Self::native_catalog_summary_from_metadata(metadata);
1186        let (catalog_page, catalog_checksum) = self
1187            .store
1188            .write_native_catalog_summary(&catalog_summary, existing_catalog_page)?;
1189        let metadata_state_summary = Self::native_metadata_state_summary_from_metadata(metadata);
1190        let (metadata_state_page, metadata_state_checksum) =
1191            self.store.write_native_metadata_state_summary(
1192                &metadata_state_summary,
1193                existing_metadata_state_page,
1194            )?;
1195        let vector_artifact_records = self.native_vector_artifact_records();
1196        let vector_artifact_payloads = vector_artifact_records
1197            .iter()
1198            .map(|(summary, bytes)| {
1199                (
1200                    summary.collection.clone(),
1201                    summary.artifact_kind.clone(),
1202                    bytes.clone(),
1203                )
1204            })
1205            .collect::<Vec<_>>();
1206        let (vector_artifact_page, vector_artifact_checksum, _vector_artifact_pages) =
1207            self.store.write_native_vector_artifact_store(
1208                &vector_artifact_payloads,
1209                existing_vector_artifact_page,
1210            )?;
1211        let mut header = Self::native_header_from_metadata(metadata);
1212        header.manifest_page = manifest_page;
1213        header.manifest_checksum = manifest_checksum;
1214        header.collection_roots_page = collection_roots_page;
1215        header.collection_roots_checksum = collection_roots_checksum;
1216        header.registry_page = registry_page;
1217        header.registry_checksum = registry_checksum;
1218        header.recovery_page = recovery_page;
1219        header.recovery_checksum = recovery_checksum;
1220        header.catalog_page = catalog_page;
1221        header.catalog_checksum = catalog_checksum;
1222        header.metadata_state_page = metadata_state_page;
1223        header.metadata_state_checksum = metadata_state_checksum;
1224        header.vector_artifact_page = vector_artifact_page;
1225        header.vector_artifact_checksum = vector_artifact_checksum;
1226        self.store.update_physical_file_header(header)?;
1227        self.store.persist()?;
1228        Ok(())
1229    }
1230
1231    pub(crate) fn native_header_from_metadata(
1232        metadata: &PhysicalMetadataFile,
1233    ) -> PhysicalFileHeader {
1234        PhysicalFileHeader {
1235            format_version: metadata.superblock.format_version,
1236            sequence: metadata.superblock.sequence,
1237            manifest_oldest_root: metadata.superblock.manifest.oldest.index,
1238            manifest_root: metadata.superblock.manifest.newest.index,
1239            free_set_root: metadata.superblock.free_set.index,
1240            manifest_page: 0,
1241            manifest_checksum: 0,
1242            collection_roots_page: 0,
1243            collection_roots_checksum: 0,
1244            collection_root_count: metadata.superblock.collection_roots.len() as u32,
1245            snapshot_count: metadata.snapshots.len() as u32,
1246            index_count: metadata.indexes.len() as u32,
1247            catalog_collection_count: metadata.catalog.total_collections as u32,
1248            catalog_total_entities: metadata.catalog.total_entities as u64,
1249            export_count: metadata.exports.len() as u32,
1250            graph_projection_count: metadata.graph_projections.len() as u32,
1251            analytics_job_count: metadata.analytics_jobs.len() as u32,
1252            manifest_event_count: metadata.manifest_events.len() as u32,
1253            registry_page: 0,
1254            registry_checksum: 0,
1255            recovery_page: 0,
1256            recovery_checksum: 0,
1257            catalog_page: 0,
1258            catalog_checksum: 0,
1259            metadata_state_page: 0,
1260            metadata_state_checksum: 0,
1261            vector_artifact_page: 0,
1262            vector_artifact_checksum: 0,
1263        }
1264    }
1265
1266    fn recover_queue_pending_state(&self) {
1267        const QUEUE_META_COLLECTION: &str = "red_queue_meta";
1268
1269        let Some(manager) = self.store.get_collection(QUEUE_META_COLLECTION) else {
1270            return;
1271        };
1272
1273        let pending_rows = manager.query_all(|entity| {
1274            entity.data.as_row().is_some_and(|row| {
1275                matches!(
1276                    row.get_field("kind"),
1277                    Some(crate::storage::schema::Value::Text(kind)) if &**kind == "queue_pending"
1278                )
1279            })
1280        });
1281
1282        for row in pending_rows {
1283            let _ = self.store.delete(QUEUE_META_COLLECTION, row.id);
1284        }
1285    }
1286}