Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_metadata.rs

1use super::*;
2use crate::storage::unified::metadata::{MetadataFilter, MetadataValue};
3
4impl RedDB {
5    pub fn is_replica_role(&self) -> bool {
6        matches!(
7            self.options.replication.role,
8            crate::replication::ReplicationRole::Replica { .. }
9        )
10    }
11
12    pub fn enforce_retention_policy(&self) -> Result<(), Box<dyn std::error::Error>> {
13        if self.options.read_only || self.is_replica_role() {
14            return Ok(());
15        }
16
17        // Export pruning is only meaningful for persistent mode where we
18        // have a metadata sidecar that tracks file-backed export artifacts.
19        if self.options.mode == StorageMode::Persistent {
20            let Some(path) = self.path() else {
21                return Ok(());
22            };
23
24            let Ok(mut metadata) = self.load_or_bootstrap_physical_metadata(true) else {
25                return Ok(());
26            };
27
28            self.prune_export_registry(&mut metadata.exports);
29            metadata.save_for_data_path(path)?;
30        }
31
32        let _ = self.sweep_ttl_expired_entities()?;
33
34        Ok(())
35    }
36
37    pub(crate) fn ttl_expired_entities_now(
38        &self,
39    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
40        self.ttl_expired_entities_at(current_unix_ms())
41    }
42
43    pub fn replica_allows_entity_at_read(
44        &self,
45        collection: &str,
46        entity: &crate::storage::UnifiedEntity,
47    ) -> bool {
48        if !self.is_replica_role() {
49            return true;
50        }
51        !self.entity_expired_at(collection, entity, current_unix_ms())
52    }
53
54    fn sweep_ttl_expired_entities(&self) -> Result<usize, Box<dyn std::error::Error>> {
55        let to_delete = self.ttl_expired_entities_now()?;
56
57        let mut deleted = 0usize;
58        for (collection, id) in to_delete {
59            match self.store.delete(&collection, id) {
60                Ok(true) => deleted = deleted.saturating_add(1),
61                Ok(false) => {}
62                Err(err) => {
63                    return Err(format!(
64                        "failed deleting expired entity {id} from collection '{collection}': {err:?}"
65                    )
66                    .into());
67                }
68            }
69        }
70
71        Ok(deleted)
72    }
73
74    fn ttl_expired_entities_at(
75        &self,
76        now_ms: u64,
77    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
78        let mut to_delete = Vec::<(String, EntityId)>::new();
79
80        let mut absolute_expired = self.expired_entities_by_expires_at(now_ms)?;
81        to_delete.append(&mut absolute_expired);
82
83        let mut relative_expired = self.expired_entities_by_ttl(now_ms)?;
84        to_delete.append(&mut relative_expired);
85
86        to_delete.sort_unstable();
87        to_delete.dedup();
88
89        Ok(to_delete)
90    }
91
92    fn entity_expired_at(
93        &self,
94        collection: &str,
95        entity: &crate::storage::UnifiedEntity,
96        now_ms: u64,
97    ) -> bool {
98        let Some(metadata) = self.store.get_metadata(collection, entity.id) else {
99            return false;
100        };
101
102        if metadata
103            .get("_expires_at")
104            .and_then(Self::metadata_u64)
105            .is_some_and(|expires_at_ms| expires_at_ms <= now_ms)
106        {
107            return true;
108        }
109
110        let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
111        let ttl_secs = if ttl_ms.is_none() {
112            metadata.get("_ttl").and_then(|value| {
113                Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
114            })
115        } else {
116            None
117        };
118
119        let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
120            return false;
121        };
122        entity
123            .created_at
124            .saturating_mul(1000)
125            .saturating_add(ttl_ms)
126            <= now_ms
127    }
128
129    fn expired_entities_by_expires_at(
130        &self,
131        now_ms: u64,
132    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
133        let mut ids = self.store.filter_metadata_all(&[(
134            "_expires_at".to_string(),
135            MetadataFilter::Le(MetadataValue::Timestamp(now_ms)),
136        )]);
137
138        if let Ok(now_ms_i64) = i64::try_from(now_ms) {
139            ids.extend(self.store.filter_metadata_all(&[(
140                "_expires_at".to_string(),
141                MetadataFilter::Le(MetadataValue::Int(now_ms_i64)),
142            )]));
143        }
144
145        let now_ms_f64 = now_ms as f64;
146        if now_ms_f64.is_finite() {
147            ids.extend(self.store.filter_metadata_all(&[(
148                "_expires_at".to_string(),
149                MetadataFilter::Le(MetadataValue::Float(now_ms_f64)),
150            )]));
151        }
152
153        Ok(ids)
154    }
155
156    fn expired_entities_by_ttl(
157        &self,
158        now_ms: u64,
159    ) -> Result<Vec<(String, EntityId)>, Box<dyn std::error::Error>> {
160        let mut candidates = Vec::<(String, EntityId)>::new();
161
162        let ttl_ms_candidates = self
163            .store
164            .filter_metadata_all(&[("_ttl_ms".to_string(), MetadataFilter::IsNotNull)]);
165        candidates.extend(ttl_ms_candidates);
166
167        let ttl_candidates = self
168            .store
169            .filter_metadata_all(&[("_ttl".to_string(), MetadataFilter::IsNotNull)]);
170        candidates.extend(ttl_candidates);
171
172        if candidates.is_empty() {
173            return Ok(Vec::new());
174        }
175
176        candidates.sort_unstable();
177        candidates.dedup();
178
179        let mut expired = Vec::<(String, EntityId)>::new();
180        for (collection, entity_id) in candidates {
181            let Some(entity) = self.store.get(&collection, entity_id) else {
182                continue;
183            };
184
185            let Some(metadata) = self.store.get_metadata(&collection, entity_id) else {
186                continue;
187            };
188
189            let ttl_ms = metadata.get("_ttl_ms").and_then(Self::metadata_u64);
190            let ttl_secs = if ttl_ms.is_none() {
191                metadata.get("_ttl").and_then(|value| {
192                    Self::metadata_u64(value).and_then(|value_secs| value_secs.checked_mul(1000))
193                })
194            } else {
195                None
196            };
197
198            let Some(ttl_ms) = ttl_ms.or(ttl_secs) else {
199                continue;
200            };
201
202            let created_at_ms = entity.created_at.saturating_mul(1000);
203            let expiry_ms = created_at_ms.saturating_add(ttl_ms);
204            if expiry_ms <= now_ms {
205                expired.push((collection, entity_id));
206            }
207        }
208
209        Ok(expired)
210    }
211
212    fn metadata_u64(value: &MetadataValue) -> Option<u64> {
213        match value {
214            MetadataValue::Int(v) if *v >= 0 => Some(*v as u64),
215            MetadataValue::Timestamp(v) => Some(*v),
216            MetadataValue::Float(v) => {
217                if !v.is_finite() || !v.is_sign_positive() || v.fract().abs() >= f64::EPSILON {
218                    return None;
219                }
220                if *v > u64::MAX as f64 {
221                    return None;
222                }
223                Some(v.trunc() as u64)
224            }
225            MetadataValue::String(v) => v.parse::<u64>().ok(),
226            _ => None,
227        }
228    }
229
230    // ========================================================================
231    // Builder Methods - Create Entities
232    // ========================================================================
233
234    /// Start building a graph node
235    ///
236    /// # Example
237    /// ```ignore
238    /// let host = db.node("hosts", "Host")
239    ///     .property("ip", "192.168.1.1")
240    ///     .save()?;
241    /// ```
242    pub fn node(&self, collection: impl Into<String>, label: impl Into<String>) -> NodeBuilder {
243        NodeBuilder::new(
244            self.store.clone(),
245            self.preprocessors.clone(),
246            collection,
247            label,
248        )
249    }
250
251    /// Start building a graph edge
252    ///
253    /// # Example
254    /// ```ignore
255    /// let edge = db.edge("connections", "CONNECTS_TO")
256    ///     .from(host_a)
257    ///     .to(host_b)
258    ///     .weight(0.95)
259    ///     .property("protocol", "TCP")
260    ///     .save()?;
261    /// ```
262    pub fn edge(&self, collection: impl Into<String>, label: impl Into<String>) -> EdgeBuilder {
263        EdgeBuilder::new(
264            self.store.clone(),
265            self.preprocessors.clone(),
266            collection,
267            label,
268        )
269    }
270
271    /// Start building a vector entry
272    ///
273    /// # Example
274    /// ```ignore
275    /// let vec = db.vector("embeddings")
276    ///     .dense(embedding)
277    ///     .content("Original text content")
278    ///     .metadata("source", "document.pdf")
279    ///     .save()?;
280    /// ```
281    pub fn vector(&self, collection: impl Into<String>) -> VectorBuilder {
282        VectorBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
283    }
284
285    /// Start building a table row
286    ///
287    /// # Example
288    /// ```ignore
289    /// let row = db.row("scans", vec![
290    ///     ("timestamp", Value::Timestamp(now)),
291    ///     ("target", Value::text("192.168.1.0/24")),
292    ///     ("findings", Value::Integer(42)),
293    /// ]).save()?;
294    /// ```
295    pub fn row(&self, table: impl Into<String>, columns: Vec<(&str, Value)>) -> RowBuilder {
296        RowBuilder::new(
297            self.store.clone(),
298            self.preprocessors.clone(),
299            table,
300            columns,
301        )
302    }
303
304    /// Start building a document
305    ///
306    /// Documents are stored as enriched table rows with a full JSON body
307    /// field and flattened top-level keys for filtering.
308    ///
309    /// # Example
310    /// ```ignore
311    /// let doc = db.doc("articles")
312    ///     .field("title", "Hello World")
313    ///     .field("views", 42)
314    ///     .metadata("source", "web")
315    ///     .save()?;
316    /// ```
317    pub fn doc(&self, collection: impl Into<String>) -> DocumentBuilder {
318        DocumentBuilder::new(self.store.clone(), self.preprocessors.clone(), collection)
319    }
320
321    /// Start building a key-value pair
322    ///
323    /// KV pairs are stored as table rows with named fields `key` and `value`.
324    ///
325    /// # Example
326    /// ```ignore
327    /// let id = db.kv("config", "theme", Value::text("dark"))
328    ///     .metadata("updated_by", "admin")
329    ///     .save()?;
330    /// ```
331    pub fn kv(
332        &self,
333        collection: impl Into<String>,
334        key: impl Into<String>,
335        value: Value,
336    ) -> KvBuilder {
337        KvBuilder::new(
338            self.store.clone(),
339            self.preprocessors.clone(),
340            collection,
341            key,
342            value,
343        )
344    }
345
346    /// Get a key-value pair by key, returning the value and entity id
347    ///
348    /// Scans the collection for an entity whose named field `key` matches.
349    pub fn get_kv(&self, collection: &str, key: &str) -> Option<(Value, EntityId)> {
350        let manager = self.store.get_collection(collection)?;
351        let entities = manager.query_all(|_| true);
352        for entity in entities {
353            if let EntityData::Row(ref row) = entity.data {
354                if let Some(ref named) = row.named {
355                    if let Some(Value::Text(ref k)) = named.get("key") {
356                        if &**k == key {
357                            let value = named.get("value").cloned().unwrap_or(Value::Null);
358                            return Some((value, entity.id));
359                        }
360                    }
361                }
362            }
363        }
364        None
365    }
366
367    /// Delete a key-value pair by key, returning whether it was found and removed
368    pub fn delete_kv(
369        &self,
370        collection: &str,
371        key: &str,
372    ) -> Result<bool, super::super::error::DevXError> {
373        let Some((_, id)) = self.get_kv(collection, key) else {
374            return Ok(false);
375        };
376        self.store
377            .delete(collection, id)
378            .map_err(|err| super::super::error::DevXError::Storage(format!("{err:?}")))?;
379        Ok(true)
380    }
381
382    pub(crate) fn with_initialized_metadata(self) -> Result<Self, Box<dyn std::error::Error>> {
383        if self.options.mode == StorageMode::Persistent {
384            // Load metadata without persisting (avoids blocking catalog snapshot on boot)
385            if let Ok(metadata) = self.load_or_bootstrap_physical_metadata(false) {
386                crate::reserved_fields::validate_physical_metadata_contracts(&metadata)
387                    .map_err(|err| err.to_string())?;
388            }
389            // Skip repair on boot — deferred to first explicit persist_metadata() call.
390            // This avoids the recursive catalog_model_snapshot → physical_metadata loop
391            // that caused stack overflow / 12-second hang on startup.
392        }
393        self.load_collection_ttl_defaults_from_metadata();
394        // Single-file artifact: collection contracts ride the store's binary
395        // dump, not the metadata sidecar (which is never written in this
396        // profile). Seed the contract cache from that blob so each
397        // collection's declared model survives a restart — otherwise recovery
398        // re-infers the model from the stored entities and a KV collection
399        // comes back as a table.
400        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
401            && self.options.storage_profile.packaging
402                == crate::storage::StoragePackaging::SingleFile
403        {
404            self.seed_contract_cache_from_store_aux();
405        }
406        // Issue #866 — rehydrate the hypertable chunk spine before the
407        // API opens so chunk routing / pruning / TTL work immediately
408        // after a restart.
409        self.load_hypertables_from_metadata();
410        self.recover_queue_pending_state();
411        Ok(self)
412    }
413
414    pub(crate) fn persist_metadata(&self) -> Result<(), Box<dyn std::error::Error>> {
415        if self.options.mode != StorageMode::Persistent || self.options.read_only {
416            return Ok(());
417        }
418        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
419            && self.options.storage_profile.packaging
420                == crate::storage::StoragePackaging::SingleFile
421        {
422            return Ok(());
423        }
424        let Some(path) = self.path() else {
425            return Ok(());
426        };
427
428        let previous = self.load_or_bootstrap_physical_metadata(false).ok();
429        let collection_roots = self.physical_collection_roots();
430        let indexes = self
431            .native_physical_state()
432            .map(|state| self.physical_index_state_from_native_state(&state, previous.as_ref()))
433            .unwrap_or_else(|| self.physical_index_state());
434        let mut metadata = PhysicalMetadataFile::from_state(
435            self.options.clone(),
436            self.catalog_snapshot(),
437            collection_roots,
438            indexes,
439            previous.as_ref(),
440        );
441        metadata.collection_ttl_defaults_ms = self.collection_ttl_defaults_snapshot();
442        // Issue #866 — persist the hypertable chunk spine (specs +
443        // chunk bounds / routing / TTL) onto the same metadata path as
444        // collection contracts so a restart recovers it identically.
445        metadata.hypertables = self.hypertable_registry_snapshot();
446        metadata.save_for_data_path(path)?;
447        self.persist_native_physical_header(&metadata)?;
448        Ok(())
449    }
450
451    fn bootstrap_metadata_from_native_state(&self) -> Result<bool, Box<dyn std::error::Error>> {
452        if self.options.mode != StorageMode::Persistent || self.options.read_only {
453            return Ok(false);
454        }
455        let Some(path) = self.path() else {
456            return Ok(false);
457        };
458        let Some(native_state) = self.native_physical_state() else {
459            return Ok(false);
460        };
461        if !Self::native_state_is_bootstrap_complete(&native_state) {
462            return Ok(false);
463        }
464
465        let previous = PhysicalMetadataFile::load_for_data_path(path).ok();
466        let metadata = self.metadata_from_native_state(&native_state, previous.as_ref());
467        metadata.save_for_data_path(path)?;
468        self.persist_native_physical_header(&metadata)?;
469        Ok(true)
470    }
471
472    /// Rebuild the external physical metadata view from the native state published in the
473    /// paged database file.
474    pub fn rebuild_physical_metadata_from_native_state(
475        &self,
476    ) -> Result<bool, Box<dyn std::error::Error>> {
477        self.bootstrap_metadata_from_native_state()
478    }
479
480    pub(crate) fn native_state_is_bootstrap_complete(native_state: &NativePhysicalState) -> bool {
481        let registry_complete = native_state.registry.as_ref().map(|registry| {
482            registry.collections_complete
483                && registry.indexes_complete
484                && registry.graph_projections_complete
485                && registry.analytics_jobs_complete
486                && registry.vector_artifacts_complete
487        });
488        let recovery_complete = native_state
489            .recovery
490            .as_ref()
491            .map(|recovery| recovery.snapshots_complete && recovery.exports_complete);
492        let catalog_complete = native_state
493            .catalog
494            .as_ref()
495            .map(|catalog| catalog.collections_complete);
496
497        registry_complete == Some(true)
498            && recovery_complete == Some(true)
499            && catalog_complete == Some(true)
500    }
501
502    pub(crate) fn load_or_bootstrap_physical_metadata(
503        &self,
504        persist_bootstrapped: bool,
505    ) -> Result<PhysicalMetadataFile, Box<dyn std::error::Error>> {
506        if self.options.mode != StorageMode::Persistent {
507            return Err("physical metadata requires persistent mode".into());
508        }
509        let Some(path) = self.path() else {
510            return Err("database path is not available".into());
511        };
512        let native_state = self.native_physical_state();
513
514        match PhysicalMetadataFile::load_for_data_path(path) {
515            Ok(metadata) => {
516                if let Some(native_state) = native_state.as_ref() {
517                    let inspection = Self::inspect_native_header_against_metadata(
518                        native_state.header,
519                        &metadata,
520                    );
521                    if Self::repair_policy_for_inspection(&inspection)
522                        == NativeHeaderRepairPolicy::NativeAheadOfMetadata
523                    {
524                        let bootstrapped =
525                            self.metadata_from_native_state(native_state, Some(&metadata));
526                        if persist_bootstrapped && !self.options.read_only {
527                            bootstrapped.save_for_data_path(path)?;
528                            self.persist_native_physical_header(&bootstrapped)?;
529                        }
530                        return Ok(bootstrapped);
531                    }
532                }
533                Ok(metadata)
534            }
535            Err(err) => {
536                let Some(native_state) = native_state else {
537                    return Err(err.into());
538                };
539                // Accept the bootstrap when the native state is either
540                // (a) fully populated and consistent (the original
541                // contract), or (b) trivially empty — a freshly created
542                // database with no collections written yet. Without (b)
543                // a brand-new data file can never reach
544                // `readiness_for_query = true`, because the bootstrap
545                // refuses to run until the registry/catalog/recovery
546                // structures are "complete", which they never become
547                // until the bootstrap has already run once.
548                //
549                // The emptiness check is conservative: header.sequence
550                // must still be at its initial value AND all three
551                // physical state summaries must be absent. Anything
552                // else falls through to the original error so we never
553                // paper over partially corrupted files.
554                let is_fresh_empty = native_state.header.sequence == 0
555                    && native_state.registry.is_none()
556                    && native_state.catalog.is_none()
557                    && native_state.recovery.is_none();
558                if !is_fresh_empty && !Self::native_state_is_bootstrap_complete(&native_state) {
559                    return Err(err.into());
560                }
561                let metadata = self.metadata_from_native_state(&native_state, None);
562                if persist_bootstrapped && !self.options.read_only {
563                    metadata.save_for_data_path(path)?;
564                    self.persist_native_physical_header(&metadata)?;
565                }
566                Ok(metadata)
567            }
568        }
569    }
570
571    pub(crate) fn physical_metadata_preference(&self) -> Option<&'static str> {
572        let path = self.path()?;
573        let native_state = self.native_physical_state();
574        let metadata = PhysicalMetadataFile::load_for_data_path(path).ok();
575
576        match (metadata, native_state) {
577            (Some(metadata), Some(native_state)) => {
578                let inspection =
579                    Self::inspect_native_header_against_metadata(native_state.header, &metadata);
580                match Self::repair_policy_for_inspection(&inspection) {
581                    NativeHeaderRepairPolicy::InSync => Some("sidecar_current"),
582                    NativeHeaderRepairPolicy::RepairNativeFromMetadata => Some("sidecar_current"),
583                    NativeHeaderRepairPolicy::NativeAheadOfMetadata => Some("native_ahead"),
584                }
585            }
586            (Some(_), None) => Some("sidecar_only"),
587            (None, Some(_)) => Some("sidecar_missing_native_available"),
588            (None, None) => Some("sidecar_missing_no_native"),
589        }
590    }
591
592    fn metadata_from_native_state(
593        &self,
594        native_state: &NativePhysicalState,
595        previous: Option<&PhysicalMetadataFile>,
596    ) -> PhysicalMetadataFile {
597        let now = SystemTime::now()
598            .duration_since(UNIX_EPOCH)
599            .unwrap_or_default()
600            .as_millis();
601        let catalog = self.catalog_snapshot();
602        let catalog_name = catalog.name.clone();
603        let catalog_total_entities = catalog.total_entities;
604        let catalog_total_collections = catalog.total_collections;
605        let indexes = self.physical_index_state();
606
607        let mut manifest =
608            crate::api::SchemaManifest::now(self.options.clone(), catalog.total_collections);
609        manifest.updated_at_unix_ms = now;
610
611        let manifest_events = native_state
612            .manifest
613            .as_ref()
614            .map(|summary| {
615                summary
616                    .recent_events
617                    .iter()
618                    .map(|event| crate::physical::ManifestEvent {
619                        collection: event.collection.clone(),
620                        object_key: event.object_key.clone(),
621                        kind: match event.kind.as_str() {
622                            "insert" => crate::physical::ManifestEventKind::Insert,
623                            "update" => crate::physical::ManifestEventKind::Update,
624                            "remove" => crate::physical::ManifestEventKind::Remove,
625                            _ => crate::physical::ManifestEventKind::Checkpoint,
626                        },
627                        block: crate::physical::BlockReference {
628                            index: event.block_index,
629                            checksum: event.block_checksum,
630                        },
631                        snapshot_min: event.snapshot_min,
632                        snapshot_max: event.snapshot_max,
633                    })
634                    .collect()
635            })
636            .unwrap_or_default();
637
638        let graph_projections = native_state
639            .registry
640            .as_ref()
641            .and_then(|registry| {
642                registry.graph_projections_complete.then(|| {
643                    registry
644                        .graph_projections
645                        .iter()
646                        .map(|projection| crate::physical::PhysicalGraphProjection {
647                            name: projection.name.clone(),
648                            created_at_unix_ms: projection.created_at_unix_ms,
649                            updated_at_unix_ms: projection.updated_at_unix_ms,
650                            state: "materialized".to_string(),
651                            source: projection.source.clone(),
652                            node_labels: projection.node_labels.clone(),
653                            node_types: projection.node_types.clone(),
654                            edge_labels: projection.edge_labels.clone(),
655                            last_materialized_sequence: projection.last_materialized_sequence,
656                        })
657                        .collect()
658                })
659            })
660            .or_else(|| previous.map(|metadata| metadata.graph_projections.clone()))
661            .unwrap_or_default();
662
663        let analytics_jobs = native_state
664            .registry
665            .as_ref()
666            .and_then(|registry| {
667                registry.analytics_jobs_complete.then(|| {
668                    registry
669                        .analytics_jobs
670                        .iter()
671                        .map(|job| crate::physical::PhysicalAnalyticsJob {
672                            id: job.id.clone(),
673                            kind: job.kind.clone(),
674                            state: job.state.clone(),
675                            projection: job.projection.clone(),
676                            created_at_unix_ms: job.created_at_unix_ms,
677                            updated_at_unix_ms: job.updated_at_unix_ms,
678                            last_run_sequence: job.last_run_sequence,
679                            metadata: job.metadata.clone(),
680                        })
681                        .collect()
682                })
683            })
684            .or_else(|| previous.map(|metadata| metadata.analytics_jobs.clone()))
685            .unwrap_or_default();
686
687        let exports = native_state
688            .recovery
689            .as_ref()
690            .and_then(|recovery| {
691                recovery.exports_complete.then(|| {
692                    recovery
693                        .exports
694                        .iter()
695                        .map(|export| crate::physical::ExportDescriptor {
696                            name: export.name.clone(),
697                            created_at_unix_ms: export.created_at_unix_ms,
698                            snapshot_id: export.snapshot_id,
699                            superblock_sequence: export.superblock_sequence,
700                            data_path: self
701                                .path()
702                                .map(|path| {
703                                    crate::physical::PhysicalMetadataFile::export_data_path_for(
704                                        path,
705                                        &export.name,
706                                    )
707                                    .display()
708                                    .to_string()
709                                })
710                                .unwrap_or_default(),
711                            metadata_path: self
712                                .path()
713                                .map(|path| {
714                                    let export_data_path =
715                                        crate::physical::PhysicalMetadataFile::export_data_path_for(
716                                            path,
717                                            &export.name,
718                                        );
719                                    crate::physical::PhysicalMetadataFile::metadata_path_for(
720                                        &export_data_path,
721                                    )
722                                    .display()
723                                    .to_string()
724                                })
725                                .unwrap_or_default(),
726                            collection_count: export.collection_count as usize,
727                            total_entities: export.total_entities as usize,
728                        })
729                        .collect()
730                })
731            })
732            .or_else(|| previous.map(|metadata| metadata.exports.clone()))
733            .unwrap_or_default();
734
735        let snapshots = native_state
736            .recovery
737            .as_ref()
738            .and_then(|recovery| {
739                recovery.snapshots_complete.then(|| {
740                    recovery
741                        .snapshots
742                        .iter()
743                        .map(|snapshot| crate::physical::SnapshotDescriptor {
744                            snapshot_id: snapshot.snapshot_id,
745                            created_at_unix_ms: snapshot.created_at_unix_ms,
746                            superblock_sequence: snapshot.superblock_sequence,
747                            collection_count: snapshot.collection_count as usize,
748                            total_entities: snapshot.total_entities as usize,
749                        })
750                        .collect()
751                })
752            })
753            .or_else(|| previous.map(|metadata| metadata.snapshots.clone()))
754            .unwrap_or_else(|| {
755                vec![crate::physical::SnapshotDescriptor {
756                    snapshot_id: native_state.header.sequence,
757                    created_at_unix_ms: now,
758                    superblock_sequence: native_state.header.sequence,
759                    collection_count: catalog_total_collections,
760                    total_entities: catalog_total_entities,
761                }]
762            });
763
764        let catalog_stats = native_state
765            .catalog
766            .as_ref()
767            .and_then(|native_catalog| {
768                native_catalog.collections_complete.then(|| {
769                    native_catalog
770                        .collections
771                        .iter()
772                        .map(|collection| {
773                            (
774                                collection.name.clone(),
775                                crate::api::CollectionStats {
776                                    entities: collection.entities as usize,
777                                    cross_refs: collection.cross_refs as usize,
778                                    segments: collection.segments as usize,
779                                },
780                            )
781                        })
782                        .collect::<BTreeMap<_, _>>()
783                })
784            })
785            .or_else(|| previous.map(|metadata| metadata.catalog.stats_by_collection.clone()))
786            .unwrap_or_else(|| catalog.stats_by_collection.clone());
787
788        PhysicalMetadataFile {
789            protocol_version: crate::physical::PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
790            generated_at_unix_ms: now,
791            last_loaded_from: Some("native_bootstrap".to_string()),
792            last_healed_at_unix_ms: Some(now),
793            manifest,
794            catalog: crate::api::CatalogSnapshot {
795                name: catalog_name,
796                total_entities: native_state
797                    .catalog
798                    .as_ref()
799                    .map(|summary| summary.total_entities as usize)
800                    .unwrap_or(catalog_total_entities),
801                total_collections: native_state
802                    .catalog
803                    .as_ref()
804                    .map(|summary| summary.collection_count as usize)
805                    .unwrap_or(catalog_total_collections),
806                stats_by_collection: catalog_stats,
807                updated_at: SystemTime::now(),
808            },
809            manifest_events,
810            collection_ttl_defaults_ms: previous
811                .map(|metadata| metadata.collection_ttl_defaults_ms.clone())
812                .unwrap_or_default(),
813            collection_contracts: previous
814                .map(|metadata| metadata.collection_contracts.clone())
815                .unwrap_or_default(),
816            hypertables: previous
817                .map(|metadata| metadata.hypertables.clone())
818                .unwrap_or_default(),
819            tree_definitions: previous
820                .map(|metadata| metadata.tree_definitions.clone())
821                .unwrap_or_default(),
822            indexes,
823            graph_projections,
824            analytics_jobs,
825            exports,
826            superblock: crate::physical::SuperblockHeader {
827                format_version: native_state.header.format_version,
828                sequence: native_state.header.sequence,
829                copies: crate::physical::DEFAULT_SUPERBLOCK_COPIES,
830                manifest: crate::physical::ManifestPointers {
831                    oldest: crate::physical::BlockReference {
832                        index: native_state.header.manifest_oldest_root,
833                        checksum: 0,
834                    },
835                    newest: crate::physical::BlockReference {
836                        index: native_state.header.manifest_root,
837                        checksum: 0,
838                    },
839                },
840                free_set: crate::physical::BlockReference {
841                    index: native_state.header.free_set_root,
842                    checksum: 0,
843                },
844                collection_roots: native_state.collection_roots.clone(),
845            },
846            snapshots,
847        }
848    }
849
850    pub(crate) fn reconcile_index_states_with_native_artifacts(
851        &self,
852        mut indexes: Vec<PhysicalIndexState>,
853    ) -> Vec<PhysicalIndexState> {
854        let native_artifacts = self
855            .native_physical_state()
856            .and_then(|state| state.registry)
857            .map(|registry| registry.vector_artifacts)
858            .unwrap_or_default();
859        for index in &mut indexes {
860            let Some(collection) = index.collection.as_deref() else {
861                continue;
862            };
863            let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
864                continue;
865            };
866            let Some(artifact) = native_artifacts.iter().find(|artifact| {
867                artifact.collection == collection && artifact.artifact_kind == artifact_kind
868            }) else {
869                index.build_state = "metadata-only".to_string();
870                continue;
871            };
872            index.entries = artifact.vector_count as usize;
873            index.estimated_memory_bytes = artifact.serialized_bytes;
874            index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
875            index.artifact_kind = Some(artifact.artifact_kind.clone());
876            index.artifact_checksum = Some(artifact.checksum);
877            index.build_state = "artifact-published".to_string();
878            if let Some(pages) = self.native_vector_artifact_pages() {
879                index.artifact_root_page = pages
880                    .into_iter()
881                    .find(|page| {
882                        page.collection == artifact.collection
883                            && page.artifact_kind == artifact.artifact_kind
884                    })
885                    .map(|page| page.root_page);
886            }
887        }
888        indexes
889    }
890
891    pub(crate) fn warmup_native_vector_artifact_for_index(
892        &self,
893        index: &PhysicalIndexState,
894    ) -> Result<(), String> {
895        let Some(collection) = index.collection.as_deref() else {
896            return Ok(());
897        };
898        let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
899            return Ok(());
900        };
901        self.warmup_native_vector_artifact(collection, Some(artifact_kind))?;
902        Ok(())
903    }
904
905    pub(crate) fn apply_runtime_native_artifact_to_index_state(
906        &self,
907        index: &mut PhysicalIndexState,
908    ) -> Result<(), String> {
909        let Some(collection) = index.collection.as_deref() else {
910            return Ok(());
911        };
912        let Some(artifact_kind) = Self::native_artifact_kind_for_index(index.kind) else {
913            return Ok(());
914        };
915        let artifact = self.inspect_native_vector_artifact(collection, Some(artifact_kind))?;
916        index.entries = artifact
917            .graph_edge_count
918            .or(artifact.text_posting_count)
919            .unwrap_or(artifact.node_count) as usize;
920        index.estimated_memory_bytes = artifact.byte_len;
921        index.backend = format!("{}+native-artifact", index_backend_name(index.kind));
922        index.artifact_kind = Some(artifact.artifact_kind.clone());
923        index.artifact_checksum = Some(artifact.checksum);
924        index.build_state = "ready".to_string();
925        index.artifact_root_page = self
926            .native_vector_artifact_pages()
927            .and_then(|pages| {
928                pages.into_iter().find(|page| {
929                    page.collection == artifact.collection
930                        && page.artifact_kind == artifact.artifact_kind
931                })
932            })
933            .map(|page| page.root_page);
934        Ok(())
935    }
936
937    pub(crate) fn physical_index_state_from_native_state(
938        &self,
939        native_state: &NativePhysicalState,
940        previous: Option<&PhysicalMetadataFile>,
941    ) -> Vec<PhysicalIndexState> {
942        let mut fresh = self.physical_index_state();
943        let Some(registry) = native_state.registry.as_ref() else {
944            if let Some(previous) = previous {
945                for index in &previous.indexes {
946                    if !fresh.iter().any(|candidate| candidate.name == index.name) {
947                        fresh.push(index.clone());
948                    }
949                }
950            }
951            return fresh;
952        };
953
954        for index in &mut fresh {
955            if let Some(native) = registry
956                .indexes
957                .iter()
958                .find(|candidate| candidate.name == index.name)
959            {
960                index.enabled = native.enabled;
961                index.last_refresh_ms = native.last_refresh_ms;
962                index.backend = native.backend.clone();
963                index.entries = native.entries as usize;
964                index.estimated_memory_bytes = native.estimated_memory_bytes;
965                if index.artifact_kind.is_none() {
966                    index.artifact_kind = Self::native_artifact_kind_for_index(index.kind)
967                        .map(|value| value.to_string());
968                }
969                if index.build_state == "catalog-derived" {
970                    index.build_state = "registry-loaded".to_string();
971                }
972            }
973        }
974
975        for native in &registry.indexes {
976            if fresh.iter().any(|index| index.name == native.name) {
977                continue;
978            }
979            let Some(kind) = Self::index_kind_from_str(&native.kind) else {
980                continue;
981            };
982            fresh.push(PhysicalIndexState {
983                name: native.name.clone(),
984                kind,
985                collection: native.collection.clone(),
986                enabled: native.enabled,
987                entries: native.entries as usize,
988                estimated_memory_bytes: native.estimated_memory_bytes,
989                last_refresh_ms: native.last_refresh_ms,
990                backend: native.backend.clone(),
991                artifact_kind: Self::native_artifact_kind_for_index(kind)
992                    .map(|value| value.to_string()),
993                artifact_root_page: None,
994                artifact_checksum: None,
995                build_state: "registry-loaded".to_string(),
996            });
997        }
998
999        if !registry.indexes_complete {
1000            if let Some(previous) = previous {
1001                for index in &previous.indexes {
1002                    if !fresh.iter().any(|candidate| candidate.name == index.name) {
1003                        fresh.push(index.clone());
1004                    }
1005                }
1006            }
1007        }
1008
1009        fresh
1010    }
1011
1012    pub(crate) fn graph_projections_from_native_state(
1013        &self,
1014        native_state: &NativePhysicalState,
1015    ) -> Vec<PhysicalGraphProjection> {
1016        native_state
1017            .registry
1018            .as_ref()
1019            .map(|registry| {
1020                registry
1021                    .graph_projections
1022                    .iter()
1023                    .map(|projection| PhysicalGraphProjection {
1024                        name: projection.name.clone(),
1025                        created_at_unix_ms: projection.created_at_unix_ms,
1026                        updated_at_unix_ms: projection.updated_at_unix_ms,
1027                        state: "materialized".to_string(),
1028                        source: projection.source.clone(),
1029                        node_labels: projection.node_labels.clone(),
1030                        node_types: projection.node_types.clone(),
1031                        edge_labels: projection.edge_labels.clone(),
1032                        last_materialized_sequence: projection.last_materialized_sequence,
1033                    })
1034                    .collect()
1035            })
1036            .unwrap_or_default()
1037    }
1038
1039    pub(crate) fn analytics_jobs_from_native_state(
1040        &self,
1041        native_state: &NativePhysicalState,
1042    ) -> Vec<PhysicalAnalyticsJob> {
1043        native_state
1044            .registry
1045            .as_ref()
1046            .map(|registry| {
1047                registry
1048                    .analytics_jobs
1049                    .iter()
1050                    .map(|job| PhysicalAnalyticsJob {
1051                        id: job.id.clone(),
1052                        kind: job.kind.clone(),
1053                        state: job.state.clone(),
1054                        projection: job.projection.clone(),
1055                        created_at_unix_ms: job.created_at_unix_ms,
1056                        updated_at_unix_ms: job.updated_at_unix_ms,
1057                        last_run_sequence: job.last_run_sequence,
1058                        metadata: job.metadata.clone(),
1059                    })
1060                    .collect()
1061            })
1062            .unwrap_or_default()
1063    }
1064
1065    pub(crate) fn exports_from_native_state(
1066        &self,
1067        native_state: &NativePhysicalState,
1068    ) -> Vec<ExportDescriptor> {
1069        native_state
1070            .recovery
1071            .as_ref()
1072            .map(|recovery| {
1073                recovery
1074                    .exports
1075                    .iter()
1076                    .map(|export| ExportDescriptor {
1077                        name: export.name.clone(),
1078                        created_at_unix_ms: export.created_at_unix_ms,
1079                        snapshot_id: export.snapshot_id,
1080                        superblock_sequence: export.superblock_sequence,
1081                        data_path: self
1082                            .path()
1083                            .map(|path| {
1084                                crate::physical::PhysicalMetadataFile::export_data_path_for(
1085                                    path,
1086                                    &export.name,
1087                                )
1088                                .display()
1089                                .to_string()
1090                            })
1091                            .unwrap_or_default(),
1092                        metadata_path: self
1093                            .path()
1094                            .map(|path| {
1095                                let export_data_path =
1096                                    crate::physical::PhysicalMetadataFile::export_data_path_for(
1097                                        path,
1098                                        &export.name,
1099                                    );
1100                                crate::physical::PhysicalMetadataFile::metadata_path_for(
1101                                    &export_data_path,
1102                                )
1103                                .display()
1104                                .to_string()
1105                            })
1106                            .unwrap_or_default(),
1107                        collection_count: export.collection_count as usize,
1108                        total_entities: export.total_entities as usize,
1109                    })
1110                    .collect()
1111            })
1112            .unwrap_or_default()
1113    }
1114
1115    pub(crate) fn snapshots_from_native_state(
1116        &self,
1117        native_state: &NativePhysicalState,
1118    ) -> Vec<crate::physical::SnapshotDescriptor> {
1119        let snapshots: Vec<_> = native_state
1120            .recovery
1121            .as_ref()
1122            .map(|recovery| {
1123                recovery
1124                    .snapshots
1125                    .iter()
1126                    .map(|snapshot| crate::physical::SnapshotDescriptor {
1127                        snapshot_id: snapshot.snapshot_id,
1128                        created_at_unix_ms: snapshot.created_at_unix_ms,
1129                        superblock_sequence: snapshot.superblock_sequence,
1130                        collection_count: snapshot.collection_count as usize,
1131                        total_entities: snapshot.total_entities as usize,
1132                    })
1133                    .collect()
1134            })
1135            .unwrap_or_default();
1136        if !snapshots.is_empty() {
1137            return snapshots;
1138        }
1139
1140        let now = SystemTime::now()
1141            .duration_since(UNIX_EPOCH)
1142            .unwrap_or_default()
1143            .as_millis();
1144        let (collection_count, total_entities) = native_state
1145            .catalog
1146            .as_ref()
1147            .map(|catalog| {
1148                (
1149                    catalog.collection_count as usize,
1150                    catalog.total_entities as usize,
1151                )
1152            })
1153            .unwrap_or_else(|| {
1154                let catalog = self.catalog_snapshot();
1155                (catalog.total_collections, catalog.total_entities)
1156            });
1157
1158        vec![crate::physical::SnapshotDescriptor {
1159            snapshot_id: native_state.header.sequence,
1160            created_at_unix_ms: now,
1161            superblock_sequence: native_state.header.sequence,
1162            collection_count,
1163            total_entities,
1164        }]
1165    }
1166
1167    fn index_kind_from_str(value: &str) -> Option<crate::index::IndexKind> {
1168        match value {
1169            "btree" => Some(crate::index::IndexKind::BTree),
1170            "vector.hnsw" => Some(crate::index::IndexKind::VectorHnsw),
1171            "vector.inverted" => Some(crate::index::IndexKind::VectorInverted),
1172            "vector.turbo" => Some(crate::index::IndexKind::VectorTurbo),
1173            "graph.adjacency" => Some(crate::index::IndexKind::GraphAdjacency),
1174            "text.fulltext" => Some(crate::index::IndexKind::FullText),
1175            "document.pathvalue" => Some(crate::index::IndexKind::DocumentPathValue),
1176            "search.hybrid" => Some(crate::index::IndexKind::HybridSearch),
1177            _ => None,
1178        }
1179    }
1180
1181    pub(crate) fn native_artifact_kind_for_index(kind: IndexKind) -> Option<&'static str> {
1182        match kind {
1183            IndexKind::VectorHnsw => Some("hnsw"),
1184            IndexKind::VectorInverted => Some("ivf"),
1185            IndexKind::VectorTurbo => Some("turboquant"),
1186            IndexKind::GraphAdjacency => Some("graph.adjacency"),
1187            IndexKind::FullText => Some("text.fulltext"),
1188            IndexKind::DocumentPathValue => Some("document.pathvalue"),
1189            _ => None,
1190        }
1191    }
1192
1193    fn index_is_declared(&self, name: &str) -> bool {
1194        self.physical_metadata()
1195            .map(|metadata| metadata.indexes.iter().any(|index| index.name == name))
1196            .unwrap_or(false)
1197    }
1198
1199    pub(crate) fn graph_projection_is_declared(&self, name: &str) -> bool {
1200        self.physical_metadata()
1201            .map(|metadata| {
1202                metadata
1203                    .graph_projections
1204                    .iter()
1205                    .any(|projection| projection.name == name)
1206            })
1207            .unwrap_or(false)
1208    }
1209
1210    pub(crate) fn graph_projection_is_operational(&self, name: &str) -> bool {
1211        self.operational_graph_projections()
1212            .into_iter()
1213            .any(|projection| projection.name == name && projection.state == "materialized")
1214    }
1215
1216    pub(crate) fn analytics_job_id(kind: &str, projection: Option<&str>) -> String {
1217        match projection {
1218            Some(projection) => format!("{kind}::{projection}"),
1219            None => format!("{kind}::global"),
1220        }
1221    }
1222
1223    pub(crate) fn update_physical_metadata<T, F>(
1224        &self,
1225        mutator: F,
1226    ) -> Result<T, Box<dyn std::error::Error>>
1227    where
1228        F: FnOnce(&mut PhysicalMetadataFile) -> T,
1229    {
1230        if self.options.mode != StorageMode::Persistent {
1231            return Err("physical metadata operations require persistent mode".into());
1232        }
1233        if self.options.read_only {
1234            return Err("physical metadata operations are not allowed in read-only mode".into());
1235        }
1236        let Some(path) = self.path() else {
1237            return Err("database path is not available".into());
1238        };
1239
1240        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1241
1242        if metadata.indexes.is_empty() {
1243            metadata.indexes = self.physical_index_state();
1244        }
1245        metadata.superblock.collection_roots = self.physical_collection_roots();
1246
1247        let result = mutator(&mut metadata);
1248        metadata.save_for_data_path(path)?;
1249        self.persist_native_physical_header(&metadata)?;
1250        Ok(result)
1251    }
1252
1253    pub(crate) fn persist_native_physical_header(
1254        &self,
1255        metadata: &PhysicalMetadataFile,
1256    ) -> Result<(), Box<dyn std::error::Error>> {
1257        if !self.paged_mode {
1258            return Ok(());
1259        }
1260
1261        let existing_page = self
1262            .store
1263            .physical_file_header()
1264            .map(|header| header.collection_roots_page)
1265            .filter(|page| *page != 0);
1266        let existing_registry_page = self
1267            .store
1268            .physical_file_header()
1269            .map(|header| header.registry_page)
1270            .filter(|page| *page != 0);
1271        let existing_recovery_page = self
1272            .store
1273            .physical_file_header()
1274            .map(|header| header.recovery_page)
1275            .filter(|page| *page != 0);
1276        let existing_catalog_page = self
1277            .store
1278            .physical_file_header()
1279            .map(|header| header.catalog_page)
1280            .filter(|page| *page != 0);
1281        let existing_metadata_state_page = self
1282            .store
1283            .physical_file_header()
1284            .map(|header| header.metadata_state_page)
1285            .filter(|page| *page != 0);
1286        let existing_vector_artifact_page = self
1287            .store
1288            .physical_file_header()
1289            .map(|header| header.vector_artifact_page)
1290            .filter(|page| *page != 0);
1291        let existing_manifest_page = self
1292            .store
1293            .physical_file_header()
1294            .map(|header| header.manifest_page)
1295            .filter(|page| *page != 0);
1296        let (manifest_page, manifest_checksum) = self.store.write_native_manifest_summary(
1297            metadata.superblock.sequence,
1298            &metadata.manifest_events,
1299            existing_manifest_page,
1300        )?;
1301        let (collection_roots_page, collection_roots_checksum) = self
1302            .store
1303            .write_native_collection_roots(&metadata.superblock.collection_roots, existing_page)?;
1304        let registry_summary = self.native_registry_summary_from_metadata(metadata);
1305        let (registry_page, registry_checksum) = self
1306            .store
1307            .write_native_registry_summary(&registry_summary, existing_registry_page)?;
1308        let recovery_summary = Self::native_recovery_summary_from_metadata(metadata);
1309        let (recovery_page, recovery_checksum) = self
1310            .store
1311            .write_native_recovery_summary(&recovery_summary, existing_recovery_page)?;
1312        let catalog_summary = Self::native_catalog_summary_from_metadata(metadata);
1313        let (catalog_page, catalog_checksum) = self
1314            .store
1315            .write_native_catalog_summary(&catalog_summary, existing_catalog_page)?;
1316        let metadata_state_summary = Self::native_metadata_state_summary_from_metadata(metadata);
1317        let (metadata_state_page, metadata_state_checksum) =
1318            self.store.write_native_metadata_state_summary(
1319                &metadata_state_summary,
1320                existing_metadata_state_page,
1321            )?;
1322        let vector_artifact_records = self.native_vector_artifact_records();
1323        let vector_artifact_payloads = vector_artifact_records
1324            .iter()
1325            .map(|(summary, bytes)| {
1326                (
1327                    summary.collection.clone(),
1328                    summary.artifact_kind.clone(),
1329                    bytes.clone(),
1330                )
1331            })
1332            .collect::<Vec<_>>();
1333        let (vector_artifact_page, vector_artifact_checksum, _vector_artifact_pages) =
1334            self.store.write_native_vector_artifact_store(
1335                &vector_artifact_payloads,
1336                existing_vector_artifact_page,
1337            )?;
1338        let mut header = Self::native_header_from_metadata(metadata);
1339        header.manifest_page = manifest_page;
1340        header.manifest_checksum = manifest_checksum;
1341        header.collection_roots_page = collection_roots_page;
1342        header.collection_roots_checksum = collection_roots_checksum;
1343        header.registry_page = registry_page;
1344        header.registry_checksum = registry_checksum;
1345        header.recovery_page = recovery_page;
1346        header.recovery_checksum = recovery_checksum;
1347        header.catalog_page = catalog_page;
1348        header.catalog_checksum = catalog_checksum;
1349        header.metadata_state_page = metadata_state_page;
1350        header.metadata_state_checksum = metadata_state_checksum;
1351        header.vector_artifact_page = vector_artifact_page;
1352        header.vector_artifact_checksum = vector_artifact_checksum;
1353        self.store.update_physical_file_header(header)?;
1354        self.store.persist()?;
1355        Ok(())
1356    }
1357
1358    pub(crate) fn native_header_from_metadata(
1359        metadata: &PhysicalMetadataFile,
1360    ) -> PhysicalFileHeader {
1361        PhysicalFileHeader {
1362            format_version: metadata.superblock.format_version,
1363            sequence: metadata.superblock.sequence,
1364            manifest_oldest_root: metadata.superblock.manifest.oldest.index,
1365            manifest_root: metadata.superblock.manifest.newest.index,
1366            free_set_root: metadata.superblock.free_set.index,
1367            manifest_page: 0,
1368            manifest_checksum: 0,
1369            collection_roots_page: 0,
1370            collection_roots_checksum: 0,
1371            collection_root_count: metadata.superblock.collection_roots.len() as u32,
1372            snapshot_count: metadata.snapshots.len() as u32,
1373            index_count: metadata.indexes.len() as u32,
1374            catalog_collection_count: metadata.catalog.total_collections as u32,
1375            catalog_total_entities: metadata.catalog.total_entities as u64,
1376            export_count: metadata.exports.len() as u32,
1377            graph_projection_count: metadata.graph_projections.len() as u32,
1378            analytics_job_count: metadata.analytics_jobs.len() as u32,
1379            manifest_event_count: metadata.manifest_events.len() as u32,
1380            registry_page: 0,
1381            registry_checksum: 0,
1382            recovery_page: 0,
1383            recovery_checksum: 0,
1384            catalog_page: 0,
1385            catalog_checksum: 0,
1386            metadata_state_page: 0,
1387            metadata_state_checksum: 0,
1388            vector_artifact_page: 0,
1389            vector_artifact_checksum: 0,
1390        }
1391    }
1392
1393    fn recover_queue_pending_state(&self) {
1394        const QUEUE_META_COLLECTION: &str = "red_queue_meta";
1395
1396        let Some(manager) = self.store.get_collection(QUEUE_META_COLLECTION) else {
1397            return;
1398        };
1399
1400        let pending_rows = manager.query_all(|entity| {
1401            entity.data.as_row().is_some_and(|row| {
1402                matches!(
1403                    row.get_field("kind"),
1404                    Some(crate::storage::schema::Value::Text(kind))
1405                        if matches!(&**kind, "queue_pending" | "queue_pending_lc")
1406                )
1407            })
1408        });
1409
1410        for row in pending_rows {
1411            let _ = self.store.delete(QUEUE_META_COLLECTION, row.id);
1412        }
1413    }
1414}
1415
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`, and a value that does not fit in
/// `u64` saturates at `u64::MAX`.
fn current_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let millis = since_epoch.as_millis();
    if millis > u128::from(u64::MAX) {
        u64::MAX
    } else {
        millis as u64
    }
}