Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_metadata.rs

1use super::*;
2use crate::storage::unified::metadata::{MetadataFilter, MetadataValue};
3
4impl RedDB {
5    pub fn enforce_retention_policy(&self) -> Result<(), Box<dyn std::error::Error>> {
6        if self.options.read_only {
7            return Ok(());
8        }
9
10        // Export pruning is only meaningful for persistent mode where we
11        // have a metadata sidecar that tracks file-backed export artifacts.
12        if self.options.mode == StorageMode::Persistent {
13            let Some(path) = self.path() else {
14                return Ok(());
15            };
16
17            let Ok(mut metadata) = self.load_or_bootstrap_physical_metadata(true) else {
18                return Ok(());
19            };
20
21            self.prune_export_registry(&mut metadata.exports);
22            metadata.save_for_data_path(path)?;
23        }
24
25        let _ = self.sweep_ttl_expired_entities()?;
26
27        Ok(())
28    }
29
30    fn sweep_ttl_expired_entities(&self) -> Result<usize, Box<dyn std::error::Error>> {
31        let now_ms = SystemTime::now()
32            .duration_since(UNIX_EPOCH)
33            .unwrap_or_default()
34            .as_millis()
35            .min(u128::from(u64::MAX)) as u64;
36
37        let mut to_delete = Vec::<(String, EntityId)>::new();
38
39        let mut absolute_expired = self.expired_entities_by_expires_at(now_ms)?;
40        to_delete.append(&mut absolute_expired);
41
42        let mut relative_expired = self.expired_entities_by_ttl(now_ms)?;
43        to_delete.append(&mut relative_expired);
44
45        to_delete.sort_unstable();
46        to_delete.dedup();
47
48        let mut deleted = 0usize;
49        for (collection, id) in to_delete {
50            match self.store.delete(&collection, id) {
51                Ok(true) => deleted = deleted.saturating_add(1),
52                Ok(false) => {}
53                Err(err) => {
54                    return Err(format!(
55                        "failed deleting expired entity {id} from collection '{collection}': {err:?}"
56                    )
57                    .into());
58                }
59            }
60        }
61
62        Ok(deleted)
63    }
64
65    fn expired_entities_by_expires_at(
66        &self,
67        now_ms: u64,
68    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
69        let mut ids = self.store.filter_metadata_all(&[(
70            "_expires_at".to_string(),
71            MetadataFilter::Le(MetadataValue::Timestamp(now_ms)),
72        )]);
73
74        if let Ok(now_ms_i64) = i64::try_from(now_ms) {
75            ids.extend(self.store.filter_metadata_all(&[(
76                "_expires_at".to_string(),
77                MetadataFilter::Le(MetadataValue::Int(now_ms_i64)),
78            )]));
79        }
80
81        let now_ms_f64 = now_ms as f64;
82        if now_ms_f64.is_finite() {
83            ids.extend(self.store.filter_metadata_all(&[(
84                "_expires_at".to_string(),
85                MetadataFilter::Le(MetadataValue::Float(now_ms_f64)),
86            )]));
87        }
88
89        Ok(ids)
90    }
91
92    fn expired_entities_by_ttl(
93        &self,
94        now_ms: u64,
95    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
96        let mut candidates = Vec::<(String, EntityId)>::new();
97
98        let ttl_ms_candidates = self
99            .store
100            .filter_metadata_all(&[("_ttl_ms".to_string(), MetadataFilter::IsNotNull)]);
101        candidates.extend(ttl_ms_candidates);
102
103        let ttl_candidates = self
104            .store
105            .filter_metadata_all(&[("_ttl".to_string(), MetadataFilter::IsNotNull)]);
106        candidates.extend(ttl_candidates);
107
108        if candidates.is_empty() {
109            return Ok(Vec::new());
110        }
111
112        candidates.sort_unstable();
113        candidates.dedup();
114
115        let mut expired = Vec::<(String, EntityId)>::new();
116        for (collection, entity_id) in candidates {
117            let Some(entity) = self.store.get(&collection, entity_id) else {
118                continue;
119            };
120
121            let Some(metadata) = self.store.get_metadata(&collection, entity_id) else {
122                continue;
123            };
124
125            let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
126            let ttl_secs = if ttl_ms.is_none() {
127                metadata.get("_ttl").and_then(|value| {
128                    Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
129                })
130            } else {
131                None
132            };
133
134            let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
135                continue;
136            };
137
138            let created_at_ms = entity.created_at.saturating_mul(1000);
139            let expiry_ms = created_at_ms.saturating_add(ttl_ms);
140            if expiry_ms <= now_ms {
141                expired.push((collection, entity_id));
142            }
143        }
144
145        Ok(expired)
146    }
147
148    fn metadata_u64(value: &MetadataValue) -> Option<u64> {
149        match value {
150            MetadataValue::Int(v) if *v >= 0 => Some(*v as u64),
151            MetadataValue::Timestamp(v) => Some(*v),
152            MetadataValue::Float(v) => {
153                if !v.is_finite() || !v.is_sign_positive() || v.fract().abs() >= f64::EPSILON {
154                    return None;
155                }
156                if *v > u64::MAX as f64 {
157                    return None;
158                }
159                Some(v.trunc() as u64)
160            }
161            MetadataValue::String(v) => v.parse::<u64>().ok(),
162            _ => None,
163        }
164    }
165
166    // ========================================================================
167    // Builder Methods - Create Entities
168    // ========================================================================
169
170    /// Start building a graph node
171    ///
172    /// # Example
173    /// ```ignore
174    /// let host = db.node("hosts", "Host")
175    ///     .property("ip", "192.168.1.1")
176    ///     .save()?;
177    /// ```
178    pub fn node(&self, collection: impl Into<String>, label: impl Into<String>) -> NodeBuilder {
179        NodeBuilder::new(
180            self.store.clone(),
181            self.preprocessors.clone(),
182            collection,
183            label,
184        )
185    }
186
187    /// Start building a graph edge
188    ///
189    /// # Example
190    /// ```ignore
191    /// let edge = db.edge("connections", "CONNECTS_TO")
192    ///     .from(host_a)
193    ///     .to(host_b)
194    ///     .weight(0.95)
195    ///     .property("protocol", "TCP")
196    ///     .save()?;
197    /// ```
198    pub fn edge(&self, collection: impl Into<String>, label: impl Into<String>) -> EdgeBuilder {
199        EdgeBuilder::new(
200            self.store.clone(),
201            self.preprocessors.clone(),
202            collection,
203            label,
204        )
205    }
206
207    /// Start building a vector entry
208    ///
209    /// # Example
210    /// ```ignore
211    /// let vec = db.vector("embeddings")
212    ///     .dense(embedding)
213    ///     .content("Original text content")
214    ///     .metadata("source", "document.pdf")
215    ///     .save()?;
216    /// ```
217    pub fn vector(&self, collection: impl Into<String>) -> VectorBuilder {
218        VectorBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
219    }
220
221    /// Start building a table row
222    ///
223    /// # Example
224    /// ```ignore
225    /// let row = db.row("scans", vec![
226    ///     ("timestamp", Value::Timestamp(now)),
227    ///     ("target", Value::text("192.168.1.0/24")),
228    ///     ("findings", Value::Integer(42)),
229    /// ]).save()?;
230    /// ```
231    pub fn row(&self, table: impl Into<String>, columns: Vec<(&str, Value)>) -> RowBuilder {
232        RowBuilder::new(
233            self.store.clone(),
234            self.preprocessors.clone(),
235            table,
236            columns,
237        )
238    }
239
240    /// Start building a document
241    ///
242    /// Documents are stored as enriched table rows with a full JSON body
243    /// field and flattened top-level keys for filtering.
244    ///
245    /// # Example
246    /// ```ignore
247    /// let doc = db.doc("articles")
248    ///     .field("title", "Hello World")
249    ///     .field("views", 42)
250    ///     .metadata("source", "web")
251    ///     .save()?;
252    /// ```
253    pub fn doc(&self, collection: impl Into<String>) -> DocumentBuilder {
254        DocumentBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
255    }
256
257    /// Start building a key-value pair
258    ///
259    /// KV pairs are stored as table rows with named fields `key` and `value`.
260    ///
261    /// # Example
262    /// ```ignore
263    /// let id = db.kv("config", "theme", Value::text("dark"))
264    ///     .metadata("updated_by", "admin")
265    ///     .save()?;
266    /// ```
267    pub fn kv(
268        &self,
269        collection: impl Into<String>,
270        key: impl Into<String>,
271        value: Value,
272    ) -> KvBuilder {
273        KvBuilder::new(
274            self.store.clone(),
275            self.preprocessors.clone(),
276            collection,
277            key,
278            value,
279        )
280    }
281
282    /// Get a key-value pair by key, returning the value and entity id
283    ///
284    /// Scans the collection for an entity whose named field `key` matches.
285    pub fn get_kv(&self, collection: &str, key: &str) -> Option<(Value, EntityId)> {
286        let manager = self.store.get_collection(collection)?;
287        let entities = manager.query_all(|_| true);
288        for entity in entities {
289            if let EntityData::Row(ref row) = entity.data {
290                if let Some(ref named) = row.named {
291                    if let Some(Value::Text(ref k)) = named.get("key") {
292                        if &**k == key {
293                            let value = named.get("value").cloned().unwrap_or(Value::Null);
294                            return Some((value, entity.id));
295                        }
296                    }
297                }
298            }
299        }
300        None
301    }
302
303    /// Delete a key-value pair by key, returning whether it was found and removed
304    pub fn delete_kv(
305        &self,
306        collection: &str,
307        key: &str,
308    ) -> Result<bool, super::super::error::DevXError> {
309        let Some((_, id)) = self.get_kv(collection, key) else {
310            return Ok(false);
311        };
312        self.store
313            .delete(collection, id)
314            .map_err(|err| super::super::error::DevXError::Storage(format!("{err:?}")))?;
315        Ok(true)
316    }
317
318    pub(crate) fn with_initialized_metadata(self) -> Result<Self, Box<dyn std::error::Error>> {
319        if self.options.mode == StorageMode::Persistent {
320            // Load metadata without persisting (avoids blocking catalog snapshot on boot)
321            if let Ok(metadata) = self.load_or_bootstrap_physical_metadata(false) {
322                crate::reserved_fields::validate_physical_metadata_contracts(&metadata)
323                    .map_err(|err| err.to_string())?;
324            }
325            // Skip repair on boot — deferred to first explicit persist_metadata() call.
326            // This avoids the recursive catalog_model_snapshot → physical_metadata loop
327            // that caused stack overflow / 12-second hang on startup.
328        }
329        self.load_collection_ttl_defaults_from_metadata();
330        self.recover_queue_pending_state();
331        Ok(self)
332    }
333
334    pub(crate) fn persist_metadata(&self) -> Result<(), Box<dyn std::error::Error>> {
335        if self.options.mode != StorageMode::Persistent || self.options.read_only {
336            return Ok(());
337        }
338        let Some(path) = self.path() else {
339            return Ok(());
340        };
341
342        let previous = self.load_or_bootstrap_physical_metadata(false).ok();
343        let collection_roots = self.physical_collection_roots();
344        let indexes = self
345            .native_physical_state()
346            .map(|state| self.physical_index_state_from_native_state(&state, previous.as_ref()))
347            .unwrap_or_else(|| self.physical_index_state());
348        let mut metadata = PhysicalMetadataFile::from_state(
349            self.options.clone(),
350            self.catalog_snapshot(),
351            collection_roots,
352            indexes,
353            previous.as_ref(),
354        );
355        metadata.collection_ttl_defaults_ms = self.collection_ttl_defaults_snapshot();
356        metadata.save_for_data_path(path)?;
357        self.persist_native_physical_header(&metadata)?;
358        Ok(())
359    }
360
361    fn bootstrap_metadata_from_native_state(&self) -> Result<bool, Box<dyn std::error::Error>> {
362        if self.options.mode != StorageMode::Persistent || self.options.read_only {
363            return Ok(false);
364        }
365        let Some(path) = self.path() else {
366            return Ok(false);
367        };
368        let Some(native_state) = self.native_physical_state() else {
369            return Ok(false);
370        };
371        if !Self::native_state_is_bootstrap_complete(&native_state) {
372            return Ok(false);
373        }
374
375        let previous = PhysicalMetadataFile::load_for_data_path(path).ok();
376        let metadata = self.metadata_from_native_state(&native_state, previous.as_ref());
377        metadata.save_for_data_path(path)?;
378        self.persist_native_physical_header(&metadata)?;
379        Ok(true)
380    }
381
382    /// Rebuild the external physical metadata view from the native state published in the
383    /// paged database file.
384    pub fn rebuild_physical_metadata_from_native_state(
385        &self,
386    ) -> Result<bool, Box<dyn std::error::Error>> {
387        self.bootstrap_metadata_from_native_state()
388    }
389
390    pub(crate) fn native_state_is_bootstrap_complete(native_state: &NativePhysicalState) -> bool {
391        let registry_complete = native_state.registry.as_ref().map(|registry| {
392            registry.collections_complete
393                && registry.indexes_complete
394                && registry.graph_projections_complete
395                && registry.analytics_jobs_complete
396                && registry.vector_artifacts_complete
397        });
398        let recovery_complete = native_state
399            .recovery
400            .as_ref()
401            .map(|recovery| recovery.snapshots_complete && recovery.exports_complete);
402        let catalog_complete = native_state
403            .catalog
404            .as_ref()
405            .map(|catalog| catalog.collections_complete);
406
407        registry_complete == Some(true)
408            && recovery_complete == Some(true)
409            && catalog_complete == Some(true)
410    }
411
412    pub(crate) fn load_or_bootstrap_physical_metadata(
413        &self,
414        persist_bootstrapped: bool,
415    ) -> Result<PhysicalMetadataFile, Box<dyn std::error::Error>> {
416        if self.options.mode != StorageMode::Persistent {
417            return Err("physical metadata requires persistent mode".into());
418        }
419        let Some(path) = self.path() else {
420            return Err("database path is not available".into());
421        };
422        let native_state = self.native_physical_state();
423
424        match PhysicalMetadataFile::load_for_data_path(path) {
425            Ok(metadata) => {
426                if let Some(native_state) = native_state.as_ref() {
427                    let inspection = Self::inspect_native_header_against_metadata(
428                        native_state.header,
429                        &metadata,
430                    );
431                    if Self::repair_policy_for_inspection(&inspection)
432                        == NativeHeaderRepairPolicy::NativeAheadOfMetadata
433                    {
434                        let bootstrapped =
435                            self.metadata_from_native_state(native_state, Some(&metadata));
436                        if persist_bootstrapped && !self.options.read_only {
437                            bootstrapped.save_for_data_path(path)?;
438                            self.persist_native_physical_header(&bootstrapped)?;
439                        }
440                        return Ok(bootstrapped);
441                    }
442                }
443                Ok(metadata)
444            }
445            Err(err) => {
446                let Some(native_state) = native_state else {
447                    return Err(err.into());
448                };
449                // Accept the bootstrap when the native state is either
450                // (a) fully populated and consistent (the original
451                // contract), or (b) trivially empty — a freshly created
452                // database with no collections written yet. Without (b)
453                // a brand-new data file can never reach
454                // `readiness_for_query = true`, because the bootstrap
455                // refuses to run until the registry/catalog/recovery
456                // structures are "complete", which they never become
457                // until the bootstrap has already run once.
458                //
459                // The emptiness check is conservative: header.sequence
460                // must still be at its initial value AND all three
461                // physical state summaries must be absent. Anything
462                // else falls through to the original error so we never
463                // paper over partially corrupted files.
464                let is_fresh_empty = native_state.header.sequence == 0
465                    && native_state.registry.is_none()
466                    && native_state.catalog.is_none()
467                    && native_state.recovery.is_none();
468                if !is_fresh_empty && !Self::native_state_is_bootstrap_complete(&native_state) {
469                    return Err(err.into());
470                }
471                let metadata = self.metadata_from_native_state(&native_state, None);
472                if persist_bootstrapped && !self.options.read_only {
473                    metadata.save_for_data_path(path)?;
474                    self.persist_native_physical_header(&metadata)?;
475                }
476                Ok(metadata)
477            }
478        }
479    }
480
481    pub(crate) fn physical_metadata_preference(&self) -> Option<&'static str> {
482        let path = self.path()?;
483        let native_state = self.native_physical_state();
484        let metadata = PhysicalMetadataFile::load_for_data_path(path).ok();
485
486        match (metadata, native_state) {
487            (Some(metadata), Some(native_state)) => {
488                let inspection =
489                    Self::inspect_native_header_against_metadata(native_state.header, &metadata);
490                match Self::repair_policy_for_inspection(&inspection) {
491                    NativeHeaderRepairPolicy::InSync => Some("sidecar_current"),
492                    NativeHeaderRepairPolicy::RepairNativeFromMetadata => Some("sidecar_current"),
493                    NativeHeaderRepairPolicy::NativeAheadOfMetadata => Some("native_ahead"),
494                }
495            }
496            (Some(_), None) => Some("sidecar_only"),
497            (None, Some(_)) => Some("sidecar_missing_native_available"),
498            (None, None) => Some("sidecar_missing_no_native"),
499        }
500    }
501
502    fn metadata_from_native_state(
503        &self,
504        native_state: &NativePhysicalState,
505        previous: Option<&PhysicalMetadataFile>,
506    ) -> PhysicalMetadataFile {
507        let now = SystemTime::now()
508            .duration_since(UNIX_EPOCH)
509            .unwrap_or_default()
510            .as_millis();
511        let catalog = self.catalog_snapshot();
512        let catalog_name = catalog.name.clone();
513        let catalog_total_entities = catalog.total_entities;
514        let catalog_total_collections = catalog.total_collections;
515        let indexes = self.physical_index_state();
516
517        let mut manifest =
518            crate::api::SchemaManifest::now(self.options.clone(), catalog.total_collections);
519        manifest.updated_at_unix_ms = now;
520
521        let manifest_events = native_state
522            .manifest
523            .as_ref()
524            .map(|summary| {
525                summary
526                    .recent_events
527                    .iter()
528                    .map(|event| crate::physical::ManifestEvent {
529                        collection: event.collection.clone(),
530                        object_key: event.object_key.clone(),
531                        kind: match event.kind.as_str() {
532                            "insert" => crate::physical::ManifestEventKind::Insert,
533                            "update" => crate::physical::ManifestEventKind::Update,
534                            "remove" => crate::physical::ManifestEventKind::Remove,
535                            _ => crate::physical::ManifestEventKind::Checkpoint,
536                        },
537                        block: crate::physical::BlockReference {
538                            index: event.block_index,
539                            checksum: event.block_checksum,
540                        },
541                        snapshot_min: event.snapshot_min,
542                        snapshot_max: event.snapshot_max,
543                    })
544                    .collect()
545            })
546            .unwrap_or_default();
547
548        let graph_projections = native_state
549            .registry
550            .as_ref()
551            .and_then(|registry| {
552                registry.graph_projections_complete.then(|| {
553                    registry
554                        .graph_projections
555                        .iter()
556                        .map(|projection| crate::physical::PhysicalGraphProjection {
557                            name: projection.name.clone(),
558                            created_at_unix_ms: projection.created_at_unix_ms,
559                            updated_at_unix_ms: projection.updated_at_unix_ms,
560                            state: "materialized".to_string(),
561                            source: projection.source.clone(),
562                            node_labels: projection.node_labels.clone(),
563                            node_types: projection.node_types.clone(),
564                            edge_labels: projection.edge_labels.clone(),
565                            last_materialized_sequence: projection.last_materialized_sequence,
566                        })
567                        .collect()
568                })
569            })
570            .or_else(|| previous.map(|metadata| metadata.graph_projections.clone()))
571            .unwrap_or_default();
572
573        let analytics_jobs = native_state
574            .registry
575            .as_ref()
576            .and_then(|registry| {
577                registry.analytics_jobs_complete.then(|| {
578                    registry
579                        .analytics_jobs
580                        .iter()
581                        .map(|job| crate::physical::PhysicalAnalyticsJob {
582                            id: job.id.clone(),
583                            kind: job.kind.clone(),
584                            state: job.state.clone(),
585                            projection: job.projection.clone(),
586                            created_at_unix_ms: job.created_at_unix_ms,
587                            updated_at_unix_ms: job.updated_at_unix_ms,
588                            last_run_sequence: job.last_run_sequence,
589                            metadata: job.metadata.clone(),
590                        })
591                        .collect()
592                })
593            })
594            .or_else(|| previous.map(|metadata| metadata.analytics_jobs.clone()))
595            .unwrap_or_default();
596
597        let exports = native_state
598            .recovery
599            .as_ref()
600            .and_then(|recovery| {
601                recovery.exports_complete.then(|| {
602                    recovery
603                        .exports
604                        .iter()
605                        .map(|export| crate::physical::ExportDescriptor {
606                            name: export.name.clone(),
607                            created_at_unix_ms: export.created_at_unix_ms,
608                            snapshot_id: export.snapshot_id,
609                            superblock_sequence: export.superblock_sequence,
610                            data_path: self
611                                .path()
612                                .map(|path| {
613                                    crate::physical::PhysicalMetadataFile::export_data_path_for(
614                                        path,
615                                        &export.name,
616                                    )
617                                    .display()
618                                    .to_string()
619                                })
620                                .unwrap_or_default(),
621                            metadata_path: self
622                                .path()
623                                .map(|path| {
624                                    let export_data_path =
625                                        crate::physical::PhysicalMetadataFile::export_data_path_for(
626                                            path,
627                                            &export.name,
628                                        );
629                                    crate::physical::PhysicalMetadataFile::metadata_path_for(
630                                        &export_data_path,
631                                    )
632                                    .display()
633                                    .to_string()
634                                })
635                                .unwrap_or_default(),
636                            collection_count: export.collection_count as usize,
637                            total_entities: export.total_entities as usize,
638                        })
639                        .collect()
640                })
641            })
642            .or_else(|| previous.map(|metadata| metadata.exports.clone()))
643            .unwrap_or_default();
644
645        let snapshots = native_state
646            .recovery
647            .as_ref()
648            .and_then(|recovery| {
649                recovery.snapshots_complete.then(|| {
650                    recovery
651                        .snapshots
652                        .iter()
653                        .map(|snapshot| crate::physical::SnapshotDescriptor {
654                            snapshot_id: snapshot.snapshot_id,
655                            created_at_unix_ms: snapshot.created_at_unix_ms,
656                            superblock_sequence: snapshot.superblock_sequence,
657                            collection_count: snapshot.collection_count as usize,
658                            total_entities: snapshot.total_entities as usize,
659                        })
660                        .collect()
661                })
662            })
663            .or_else(|| previous.map(|metadata| metadata.snapshots.clone()))
664            .unwrap_or_else(|| {
665                vec![crate::physical::SnapshotDescriptor {
666                    snapshot_id: native_state.header.sequence,
667                    created_at_unix_ms: now,
668                    superblock_sequence: native_state.header.sequence,
669                    collection_count: catalog_total_collections,
670                    total_entities: catalog_total_entities,
671                }]
672            });
673
674        let catalog_stats = native_state
675            .catalog
676            .as_ref()
677            .and_then(|native_catalog| {
678                native_catalog.collections_complete.then(|| {
679                    native_catalog
680                        .collections
681                        .iter()
682                        .map(|collection| {
683                            (
684                                collection.name.clone(),
685                                crate::api::CollectionStats {
686                                    entities: collection.entities as usize,
687                                    cross_refs: collection.cross_refs as usize,
688                                    segments: collection.segments as usize,
689                                },
690                            )
691                        })
692                        .collect::<BTreeMap<_, _>>()
693                })
694            })
695            .or_else(|| previous.map(|metadata| metadata.catalog.stats_by_collection.clone()))
696            .unwrap_or_else(|| catalog.stats_by_collection.clone());
697
698        PhysicalMetadataFile {
699            protocol_version: crate::physical::PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
700            generated_at_unix_ms: now,
701            last_loaded_from: Some("native_bootstrap".to_string()),
702            last_healed_at_unix_ms: Some(now),
703            manifest,
704            catalog: crate::api::CatalogSnapshot {
705                name: catalog_name,
706                total_entities: native_state
707                    .catalog
708                    .as_ref()
709                    .map(|summary| summary.total_entities as usize)
710                    .unwrap_or(catalog_total_entities),
711                total_collections: native_state
712                    .catalog
713                    .as_ref()
714                    .map(|summary| summary.collection_count as usize)
715                    .unwrap_or(catalog_total_collections),
716                stats_by_collection: catalog_stats,
717                updated_at: SystemTime::now(),
718            },
719            manifest_events,
720            collection_ttl_defaults_ms: previous
721                .map(|metadata| metadata.collection_ttl_defaults_ms.clone())
722                .unwrap_or_default(),
723            collection_contracts: previous
724                .map(|metadata| metadata.collection_contracts.clone())
725                .unwrap_or_default(),
726            tree_definitions: previous
727                .map(|metadata| metadata.tree_definitions.clone())
728                .unwrap_or_default(),
729            indexes,
730            graph_projections,
731            analytics_jobs,
732            exports,
733            superblock: crate::physical::SuperblockHeader {
734                format_version: native_state.header.format_version,
735                sequence: native_state.header.sequence,
736                copies: crate::physical::DEFAULT_SUPERBLOCK_COPIES,
737                manifest: crate::physical::ManifestPointers {
738                    oldest: crate::physical::BlockReference {
739                        index: native_state.header.manifest_oldest_root,
740                        checksum: 0,
741                    },
742                    newest: crate::physical::BlockReference {
743                        index: native_state.header.manifest_root,
744                        checksum: 0,
745                    },
746                },
747                free_set: crate::physical::BlockReference {
748                    index: native_state.header.free_set_root,
749                    checksum: 0,
750                },
751                collection_roots: native_state.collection_roots.clone(),
752            },
753            snapshots,
754        }
755    }
756
757    pub(crate) fn reconcile_index_states_with_native_artifacts(
758        &self,
759        mut indexes: Vec<PhysicalIndexState>,
760    ) -> Vec<PhysicalIndexState> {
761        let native_artifacts = self
762            .native_physical_state()
763            .and_then(|state| state.registry)
764            .map(|registry| registry.vector_artifacts)
765            .unwrap_or_default();
766        for index in &mut indexes {
767            let Some(collection) = index.collection.as_deref() else {
768                continue;
769            };
770            let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
771                continue;
772            };
773            let Some(artifact) = native_artifacts.iter().find(|artifact| {
774                artifact.collection == collection && artifact.artifact_kind == artifact_kind
775            }) else {
776                index.build_state = "metadata-only".to_string();
777                continue;
778            };
779            index.entries = artifact.vector_count as usize;
780            index.estimated_memory_bytes = artifact.serialized_bytes;
781            index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
782            index.artifact_kind = Some(artifact.artifact_kind.clone());
783            index.artifact_checksum = Some(artifact.checksum);
784            index.build_state = "artifact-published".to_string();
785            if let Some(pages) = self.native_vector_artifact_pages() {
786                index.artifact_root_page = pages
787                    .into_iter()
788                    .find(|page| {
789                        page.collection == artifact.collection
790                            && page.artifact_kind == artifact.artifact_kind
791                    })
792                    .map(|page| page.root_page);
793            }
794        }
795        indexes
796    }
797
798    pub(crate) fn warmup_native_vector_artifact_for_index(
799        &self,
800        index: &PhysicalIndexState,
801    ) -> Result<(), String> {
802        let Some(collection) = index.collection.as_deref() else {
803            return Ok(());
804        };
805        let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
806            return Ok(());
807        };
808        self.warmup_native_vector_artifact(collection, Some(artifact_kind))?;
809        Ok(())
810    }
811
812    pub(crate) fn apply_runtime_native_artifact_to_index_state(
813        &self,
814        index: &mut PhysicalIndexState,
815    ) -> Result<(), String> {
816        let Some(collection) = index.collection.as_deref() else {
817            return Ok(());
818        };
819        let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
820            return Ok(());
821        };
822        let artifact = self.inspect_native_vector_artifact(collection, Some(artifact_kind))?;
823        index.entries = artifact
824            .graph_edge_count
825            .or(artifact.text_posting_count)
826            .unwrap_or(artifact.node_count) as usize;
827        index.estimated_memory_bytes = artifact.byte_len;
828        index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
829        index.artifact_kind = Some(artifact.artifact_kind.clone());
830        index.artifact_checksum = Some(artifact.checksum);
831        index.build_state = "ready".to_string();
832        index.artifact_root_page = self
833            .native_vector_artifact_pages()
834            .and_then(|pages| {
835                pages.into_iter().find(|page| {
836                    page.collection == artifact.collection
837                        && page.artifact_kind == artifact.artifact_kind
838                })
839            })
840            .map(|page| page.root_page);
841        Ok(())
842    }
843
844    pub(crate) fn physical_index_state_from_native_state(
845        &self,
846        native_state: &NativePhysicalState,
847        previous: Option<&PhysicalMetadataFile>,
848    ) -> Vec<PhysicalIndexState> {
849        let mut fresh = self.physical_index_state();
850        let Some(registry) = native_state.registry.as_ref() else {
851            if let Some(previous) = previous {
852                for index in &previous.indexes {
853                    if !fresh.iter().any(|candidate| candidate.name == index.name) {
854                        fresh.push(index.clone());
855                    }
856                }
857            }
858            return fresh;
859        };
860
861        for index in &mut fresh {
862            if let Some(native) = registry
863                .indexes
864                .iter()
865                .find(|candidate| candidate.name == index.name)
866            {
867                index.enabled = native.enabled;
868                index.last_refresh_ms = native.last_refresh_ms;
869                index.backend = native.backend.clone();
870                index.entries = native.entries as usize;
871                index.estimated_memory_bytes = native.estimated_memory_bytes;
872                if index.artifact_kind.is_none() {
873                    index.artifact_kind = Self::native_artifact_kind_for_index(index.kind)
874                        .map(|value| value.to_string());
875                }
876                if index.build_state == "catalog-derived" {
877                    index.build_state = "registry-loaded".to_string();
878                }
879            }
880        }
881
882        for native in &registry.indexes {
883            if fresh.iter().any(|index| index.name == native.name) {
884                continue;
885            }
886            let Some(kind) = Self::index_kind_from_str(&native.kind) else {
887                continue;
888            };
889            fresh.push(PhysicalIndexState {
890                name: native.name.clone(),
891                kind,
892                collection: native.collection.clone(),
893                enabled: native.enabled,
894                entries: native.entries as usize,
895                estimated_memory_bytes: native.estimated_memory_bytes,
896                last_refresh_ms: native.last_refresh_ms,
897                backend: native.backend.clone(),
898                artifact_kind: Self::native_artifact_kind_for_index(kind)
899                    .map(|value| value.to_string()),
900                artifact_root_page: None,
901                artifact_checksum: None,
902                build_state: "registry-loaded".to_string(),
903            });
904        }
905
906        if !registry.indexes_complete {
907            if let Some(previous) = previous {
908                for index in &previous.indexes {
909                    if !fresh.iter().any(|candidate| candidate.name == index.name) {
910                        fresh.push(index.clone());
911                    }
912                }
913            }
914        }
915
916        fresh
917    }
918
919    pub(crate) fn graph_projections_from_native_state(
920        &self,
921        native_state: &NativePhysicalState,
922    ) -> Vec<PhysicalGraphProjection> {
923        native_state
924            .registry
925            .as_ref()
926            .map(|registry| {
927                registry
928                    .graph_projections
929                    .iter()
930                    .map(|projection| PhysicalGraphProjection {
931                        name: projection.name.clone(),
932                        created_at_unix_ms: projection.created_at_unix_ms,
933                        updated_at_unix_ms: projection.updated_at_unix_ms,
934                        state: "materialized".to_string(),
935                        source: projection.source.clone(),
936                        node_labels: projection.node_labels.clone(),
937                        node_types: projection.node_types.clone(),
938                        edge_labels: projection.edge_labels.clone(),
939                        last_materialized_sequence: projection.last_materialized_sequence,
940                    })
941                    .collect()
942            })
943            .unwrap_or_default()
944    }
945
946    pub(crate) fn analytics_jobs_from_native_state(
947        &self,
948        native_state: &NativePhysicalState,
949    ) -> Vec<PhysicalAnalyticsJob> {
950        native_state
951            .registry
952            .as_ref()
953            .map(|registry| {
954                registry
955                    .analytics_jobs
956                    .iter()
957                    .map(|job| PhysicalAnalyticsJob {
958                        id: job.id.clone(),
959                        kind: job.kind.clone(),
960                        state: job.state.clone(),
961                        projection: job.projection.clone(),
962                        created_at_unix_ms: job.created_at_unix_ms,
963                        updated_at_unix_ms: job.updated_at_unix_ms,
964                        last_run_sequence: job.last_run_sequence,
965                        metadata: job.metadata.clone(),
966                    })
967                    .collect()
968            })
969            .unwrap_or_default()
970    }
971
972    pub(crate) fn exports_from_native_state(
973        &self,
974        native_state: &NativePhysicalState,
975    ) -> Vec<ExportDescriptor> {
976        native_state
977            .recovery
978            .as_ref()
979            .map(|recovery| {
980                recovery
981                    .exports
982                    .iter()
983                    .map(|export| ExportDescriptor {
984                        name: export.name.clone(),
985                        created_at_unix_ms: export.created_at_unix_ms,
986                        snapshot_id: export.snapshot_id,
987                        superblock_sequence: export.superblock_sequence,
988                        data_path: self
989                            .path()
990                            .map(|path| {
991                                crate::physical::PhysicalMetadataFile::export_data_path_for(
992                                    path,
993                                    &export.name,
994                                )
995                                .display()
996                                .to_string()
997                            })
998                            .unwrap_or_default(),
999                        metadata_path: self
1000                            .path()
1001                            .map(|path| {
1002                                let export_data_path =
1003                                    crate::physical::PhysicalMetadataFile::export_data_path_for(
1004                                        path,
1005                                        &export.name,
1006                                    );
1007                                crate::physical::PhysicalMetadataFile::metadata_path_for(
1008                                    &export_data_path,
1009                                )
1010                                .display()
1011                                .to_string()
1012                            })
1013                            .unwrap_or_default(),
1014                        collection_count: export.collection_count as usize,
1015                        total_entities: export.total_entities as usize,
1016                    })
1017                    .collect()
1018            })
1019            .unwrap_or_default()
1020    }
1021
1022    pub(crate) fn snapshots_from_native_state(
1023        &self,
1024        native_state: &NativePhysicalState,
1025    ) -> Vec<crate::physical::SnapshotDescriptor> {
1026        native_state
1027            .recovery
1028            .as_ref()
1029            .map(|recovery| {
1030                recovery
1031                    .snapshots
1032                    .iter()
1033                    .map(|snapshot| crate::physical::SnapshotDescriptor {
1034                        snapshot_id: snapshot.snapshot_id,
1035                        created_at_unix_ms: snapshot.created_at_unix_ms,
1036                        superblock_sequence: snapshot.superblock_sequence,
1037                        collection_count: snapshot.collection_count as usize,
1038                        total_entities: snapshot.total_entities as usize,
1039                    })
1040                    .collect()
1041            })
1042            .unwrap_or_default()
1043    }
1044
1045    fn index_kind_from_str(value: &str) -> Option<crate::index::IndexKind> {
1046        match value {
1047            "btree" => Some(crate::index::IndexKind::BTree),
1048            "vector.hnsw" => Some(crate::index::IndexKind::VectorHnsw),
1049            "vector.inverted" => Some(crate::index::IndexKind::VectorInverted),
1050            "graph.adjacency" => Some(crate::index::IndexKind::GraphAdjacency),
1051            "text.fulltext" => Some(crate::index::IndexKind::FullText),
1052            "document.pathvalue" => Some(crate::index::IndexKind::DocumentPathValue),
1053            "search.hybrid" => Some(crate::index::IndexKind::HybridSearch),
1054            _ => None,
1055        }
1056    }
1057
1058    pub(crate) fn native_artifact_kind_for_index(kind: IndexKind) -> Option<&'static str> {
1059        match kind {
1060            IndexKind::VectorHnsw => Some("hnsw"),
1061            IndexKind::VectorInverted => Some("ivf"),
1062            IndexKind::GraphAdjacency => Some("graph.adjacency"),
1063            IndexKind::FullText => Some("text.fulltext"),
1064            IndexKind::DocumentPathValue => Some("document.pathvalue"),
1065            _ => None,
1066        }
1067    }
1068
1069    fn index_is_declared(&self, name: &str) -> bool {
1070        self.physical_metadata()
1071            .map(|metadata| metadata.indexes.iter().any(|index| index.name == name))
1072            .unwrap_or(false)
1073    }
1074
1075    pub(crate) fn graph_projection_is_declared(&self, name: &str) -> bool {
1076        self.physical_metadata()
1077            .map(|metadata| {
1078                metadata
1079                    .graph_projections
1080                    .iter()
1081                    .any(|projection| projection.name == name)
1082            })
1083            .unwrap_or(false)
1084    }
1085
1086    pub(crate) fn graph_projection_is_operational(&self, name: &str) -> bool {
1087        self.operational_graph_projections()
1088            .into_iter()
1089            .any(|projection| projection.name == name && projection.state == "materialized")
1090    }
1091
1092    pub(crate) fn analytics_job_id(kind: &str, projection: Option<&str>) -> String {
1093        match projection {
1094            Some(projection) => format!("{kind}::{projection}"),
1095            None => format!("{kind}::global"),
1096        }
1097    }
1098
1099    pub(crate) fn update_physical_metadata<T, F>(
1100        &self,
1101        mutator: F,
1102    ) -> Result<T, Box<dyn std::error::Error>>
1103    where
1104        F: FnOnce(&mut PhysicalMetadataFile) -> T,
1105    {
1106        if self.options.mode != StorageMode::Persistent {
1107            return Err("physical metadata operations require persistent mode".into());
1108        }
1109        if self.options.read_only {
1110            return Err("physical metadata operations are not allowed in read-only mode".into());
1111        }
1112        let Some(path) = self.path() else {
1113            return Err("database path is not available".into());
1114        };
1115
1116        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1117
1118        if metadata.indexes.is_empty() {
1119            metadata.indexes = self.physical_index_state();
1120        }
1121        metadata.superblock.collection_roots = self.physical_collection_roots();
1122
1123        let result = mutator(&mut metadata);
1124        metadata.save_for_data_path(path)?;
1125        self.persist_native_physical_header(&metadata)?;
1126        Ok(result)
1127    }
1128
1129    pub(crate) fn persist_native_physical_header(
1130        &self,
1131        metadata: &PhysicalMetadataFile,
1132    ) -> Result<(), Box<dyn std::error::Error>> {
1133        if !self.paged_mode {
1134            return Ok(());
1135        }
1136
1137        let existing_page = self
1138            .store
1139            .physical_file_header()
1140            .map(|header| header.collection_roots_page)
1141            .filter(|page| *page != 0);
1142        let existing_registry_page = self
1143            .store
1144            .physical_file_header()
1145            .map(|header| header.registry_page)
1146            .filter(|page| *page != 0);
1147        let existing_recovery_page = self
1148            .store
1149            .physical_file_header()
1150            .map(|header| header.recovery_page)
1151            .filter(|page| *page != 0);
1152        let existing_catalog_page = self
1153            .store
1154            .physical_file_header()
1155            .map(|header| header.catalog_page)
1156            .filter(|page| *page != 0);
1157        let existing_metadata_state_page = self
1158            .store
1159            .physical_file_header()
1160            .map(|header| header.metadata_state_page)
1161            .filter(|page| *page != 0);
1162        let existing_vector_artifact_page = self
1163            .store
1164            .physical_file_header()
1165            .map(|header| header.vector_artifact_page)
1166            .filter(|page| *page != 0);
1167        let existing_manifest_page = self
1168            .store
1169            .physical_file_header()
1170            .map(|header| header.manifest_page)
1171            .filter(|page| *page != 0);
1172        let (manifest_page, manifest_checksum) = self.store.write_native_manifest_summary(
1173            metadata.superblock.sequence,
1174            &metadata.manifest_events,
1175            existing_manifest_page,
1176        )?;
1177        let (collection_roots_page, collection_roots_checksum) = self
1178            .store
1179            .write_native_collection_roots(&metadata.superblock.collection_roots, existing_page)?;
1180        let registry_summary = self.native_registry_summary_from_metadata(metadata);
1181        let (registry_page, registry_checksum) = self
1182            .store
1183            .write_native_registry_summary(&registry_summary, existing_registry_page)?;
1184        let recovery_summary = Self::native_recovery_summary_from_metadata(metadata);
1185        let (recovery_page, recovery_checksum) = self
1186            .store
1187            .write_native_recovery_summary(&recovery_summary, existing_recovery_page)?;
1188        let catalog_summary = Self::native_catalog_summary_from_metadata(metadata);
1189        let (catalog_page, catalog_checksum) = self
1190            .store
1191            .write_native_catalog_summary(&catalog_summary, existing_catalog_page)?;
1192        let metadata_state_summary = Self::native_metadata_state_summary_from_metadata(metadata);
1193        let (metadata_state_page, metadata_state_checksum) =
1194            self.store.write_native_metadata_state_summary(
1195                &metadata_state_summary,
1196                existing_metadata_state_page,
1197            )?;
1198        let vector_artifact_records = self.native_vector_artifact_records();
1199        let vector_artifact_payloads = vector_artifact_records
1200            .iter()
1201            .map(|(summary, bytes)| {
1202                (
1203                    summary.collection.clone(),
1204                    summary.artifact_kind.clone(),
1205                    bytes.clone(),
1206                )
1207            })
1208            .collect::<Vec<_>>();
1209        let (vector_artifact_page, vector_artifact_checksum, _vector_artifact_pages) =
1210            self.store.write_native_vector_artifact_store(
1211                &vector_artifact_payloads,
1212                existing_vector_artifact_page,
1213            )?;
1214        let mut header = Self::native_header_from_metadata(metadata);
1215        header.manifest_page = manifest_page;
1216        header.manifest_checksum = manifest_checksum;
1217        header.collection_roots_page = collection_roots_page;
1218        header.collection_roots_checksum = collection_roots_checksum;
1219        header.registry_page = registry_page;
1220        header.registry_checksum = registry_checksum;
1221        header.recovery_page = recovery_page;
1222        header.recovery_checksum = recovery_checksum;
1223        header.catalog_page = catalog_page;
1224        header.catalog_checksum = catalog_checksum;
1225        header.metadata_state_page = metadata_state_page;
1226        header.metadata_state_checksum = metadata_state_checksum;
1227        header.vector_artifact_page = vector_artifact_page;
1228        header.vector_artifact_checksum = vector_artifact_checksum;
1229        self.store.update_physical_file_header(header)?;
1230        self.store.persist()?;
1231        Ok(())
1232    }
1233
1234    pub(crate) fn native_header_from_metadata(
1235        metadata: &PhysicalMetadataFile,
1236    ) -> PhysicalFileHeader {
1237        PhysicalFileHeader {
1238            format_version: metadata.superblock.format_version,
1239            sequence: metadata.superblock.sequence,
1240            manifest_oldest_root: metadata.superblock.manifest.oldest.index,
1241            manifest_root: metadata.superblock.manifest.newest.index,
1242            free_set_root: metadata.superblock.free_set.index,
1243            manifest_page: 0,
1244            manifest_checksum: 0,
1245            collection_roots_page: 0,
1246            collection_roots_checksum: 0,
1247            collection_root_count: metadata.superblock.collection_roots.len() as u32,
1248            snapshot_count: metadata.snapshots.len() as u32,
1249            index_count: metadata.indexes.len() as u32,
1250            catalog_collection_count: metadata.catalog.total_collections as u32,
1251            catalog_total_entities: metadata.catalog.total_entities as u64,
1252            export_count: metadata.exports.len() as u32,
1253            graph_projection_count: metadata.graph_projections.len() as u32,
1254            analytics_job_count: metadata.analytics_jobs.len() as u32,
1255            manifest_event_count: metadata.manifest_events.len() as u32,
1256            registry_page: 0,
1257            registry_checksum: 0,
1258            recovery_page: 0,
1259            recovery_checksum: 0,
1260            catalog_page: 0,
1261            catalog_checksum: 0,
1262            metadata_state_page: 0,
1263            metadata_state_checksum: 0,
1264            vector_artifact_page: 0,
1265            vector_artifact_checksum: 0,
1266        }
1267    }
1268
1269    fn recover_queue_pending_state(&self) {
1270        const QUEUE_META_COLLECTION: &str = "red_queue_meta";
1271
1272        let Some(manager) = self.store.get_collection(QUEUE_META_COLLECTION) else {
1273            return;
1274        };
1275
1276        let pending_rows = manager.query_all(|entity| {
1277            entity.data.as_row().is_some_and(|row| {
1278                matches!(
1279                    row.get_field("kind"),
1280                    Some(crate::storage::schema::Value::Text(kind)) if &**kind == "queue_pending"
1281                )
1282            })
1283        });
1284
1285        for row in pending_rows {
1286            let _ = self.store.delete(QUEUE_META_COLLECTION, row.id);
1287        }
1288    }
1289}