Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_registry.rs

1use super::*;
2
3impl RedDB {
4    pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5        self.physical_metadata()
6            .map(|metadata| metadata.tree_definitions)
7            .unwrap_or_default()
8    }
9
10    pub fn tree_definition(
11        &self,
12        collection: &str,
13        name: &str,
14    ) -> Option<crate::physical::PhysicalTreeDefinition> {
15        self.tree_definitions()
16            .into_iter()
17            .find(|definition| definition.collection == collection && definition.name == name)
18    }
19
20    pub fn save_tree_definition(
21        &self,
22        definition: crate::physical::PhysicalTreeDefinition,
23    ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24        self.update_physical_metadata(|metadata| {
25            if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26                existing.collection == definition.collection && existing.name == definition.name
27            }) {
28                *existing = definition.clone();
29            } else {
30                metadata.tree_definitions.push(definition.clone());
31            }
32            metadata.tree_definitions.sort_by(|left, right| {
33                left.collection
34                    .cmp(&right.collection)
35                    .then_with(|| left.name.cmp(&right.name))
36            });
37            definition.clone()
38        })
39    }
40
41    pub fn remove_tree_definition(
42        &self,
43        collection: &str,
44        name: &str,
45    ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46        self.update_physical_metadata(|metadata| {
47            metadata
48                .tree_definitions
49                .iter()
50                .position(|definition| {
51                    definition.collection == collection && definition.name == name
52                })
53                .map(|index| metadata.tree_definitions.remove(index))
54        })
55    }
56
57    pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58        self.collection_ttl_defaults_ms
59            .read()
60            .ok()
61            .and_then(|defaults| defaults.get(collection).copied())
62    }
63
64    pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66            defaults.insert(collection.into(), ttl_ms);
67        }
68    }
69
70    pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72            defaults.remove(collection);
73        }
74    }
75
76    pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77        self.contract_cache_map()
78            .values()
79            .map(|arc| (**arc).clone())
80            .collect()
81    }
82
83    pub fn collection_contract(
84        &self,
85        collection: &str,
86    ) -> Option<crate::physical::CollectionContract> {
87        self.contract_cache_map()
88            .get(collection)
89            .map(|arc| (**arc).clone())
90    }
91
92    /// Arc-bumping variant — callers that only need read access skip
93    /// the per-call full `CollectionContract` clone. Deep clones cost
94    /// ~200-400 ns each (vec of declared columns + string allocs);
95    /// the UPDATE fast path hits this 2-3× per mutation.
96    pub fn collection_contract_arc(
97        &self,
98        collection: &str,
99    ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100        self.contract_cache_map()
101            .get(collection)
102            .map(std::sync::Arc::clone)
103    }
104
105    /// Return (or lazily build) the in-memory contract map.
106    /// Reparses PhysicalMetadataFile JSON only once per invalidation.
107    /// Called 3× per insert (ensure_model, enforce_uniqueness, normalize_fields)
108    /// so the cache is load-bearing for insert throughput.
109    fn contract_cache_map(
110        &self,
111    ) -> std::sync::Arc<
112        std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113    > {
114        if let Ok(guard) = self.collection_contract_cache.read() {
115            if let Some(map) = guard.as_ref() {
116                return std::sync::Arc::clone(map);
117            }
118        }
119        let contracts: Vec<crate::physical::CollectionContract> = self
120            .physical_metadata()
121            .map(|metadata| metadata.collection_contracts)
122            .unwrap_or_default();
123        let map: std::collections::HashMap<_, _> = contracts
124            .into_iter()
125            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126            .collect();
127        let arc = std::sync::Arc::new(map);
128        if let Ok(mut guard) = self.collection_contract_cache.write() {
129            *guard = Some(std::sync::Arc::clone(&arc));
130        }
131        arc
132    }
133
134    pub(crate) fn invalidate_collection_contract_cache(&self) {
135        if let Ok(mut guard) = self.collection_contract_cache.write() {
136            *guard = None;
137        }
138    }
139
140    /// Single-file artifact only: mirror the current contract set into the
141    /// store's auxiliary-metadata blob so it rides the binary dump. Without
142    /// this the contracts live only in the in-memory cache and a collection's
143    /// declared model (e.g. `kv`) is lost on reopen — recovery would re-infer
144    /// it from the stored entities and a KV collection would come back as a
145    /// table. No-op for other profiles, which persist contracts via the
146    /// metadata sidecar.
147    pub(crate) fn sync_single_file_contract_blob(&self) {
148        let contracts = self.collection_contracts();
149        let bytes = crate::physical::serialize_collection_contracts(&contracts);
150        self.store().set_aux_metadata(bytes);
151    }
152
153    /// Seed the in-memory contract cache from the store's auxiliary-metadata
154    /// blob (single-file artifact). Called on open so a collection's declared
155    /// model survives a restart. Does nothing when the blob is absent (older
156    /// V9 dumps) or empty.
157    pub(crate) fn seed_contract_cache_from_store_aux(&self) {
158        let bytes = self.store().aux_metadata();
159        if bytes.is_empty() {
160            return;
161        }
162        let contracts = match crate::physical::deserialize_collection_contracts(&bytes) {
163            Ok(contracts) => contracts,
164            Err(_) => return,
165        };
166        let map: std::collections::HashMap<_, _> = contracts
167            .into_iter()
168            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
169            .collect();
170        // Restore the per-collection state derived from contracts (context
171        // index enablement + default TTLs) so the full collection definition —
172        // not just the declared model — survives a restart.
173        let store = self.store();
174        let index = store.context_index();
175        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
176            for contract in map.values() {
177                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
178                if let Some(ttl_ms) = contract.default_ttl_ms {
179                    defaults.insert(contract.name.clone(), ttl_ms);
180                }
181            }
182        }
183        if let Ok(mut guard) = self.collection_contract_cache.write() {
184            *guard = Some(std::sync::Arc::new(map));
185        }
186    }
187
188    pub fn save_collection_contract(
189        &self,
190        contract: crate::physical::CollectionContract,
191    ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
192        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
193            if let Some(ttl_ms) = contract.default_ttl_ms {
194                defaults.insert(contract.name.clone(), ttl_ms);
195            } else {
196                defaults.remove(&contract.name);
197            }
198        }
199
200        self.store()
201            .context_index()
202            .set_collection_enabled(&contract.name, contract.context_index_enabled);
203        if let Some(manager) = self.store().get_collection(&contract.name) {
204            let columns = contract
205                .declared_columns
206                .iter()
207                .map(|column| column.name.clone())
208                .collect();
209            manager.set_column_schema_if_empty(columns);
210        }
211
212        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
213            && self.options.storage_profile.packaging
214                == crate::storage::StoragePackaging::SingleFile
215        {
216            if let Ok(mut guard) = self.collection_contract_cache.write() {
217                let mut contracts = guard
218                    .as_ref()
219                    .map(|existing| existing.as_ref().clone())
220                    .unwrap_or_default();
221                contracts.insert(contract.name.clone(), std::sync::Arc::new(contract.clone()));
222                *guard = Some(std::sync::Arc::new(contracts));
223            }
224            self.sync_single_file_contract_blob();
225            return Ok(contract);
226        }
227
228        self.invalidate_collection_contract_cache();
229
230        let saved = self.update_physical_metadata(|metadata| {
231            if let Some(existing) = metadata
232                .collection_contracts
233                .iter_mut()
234                .find(|existing| existing.name == contract.name)
235            {
236                *existing = contract.clone();
237            } else {
238                metadata.collection_contracts.push(contract.clone());
239            }
240            metadata
241                .collection_contracts
242                .sort_by(|left, right| left.name.cmp(&right.name));
243
244            if let Some(ttl_ms) = contract.default_ttl_ms {
245                metadata
246                    .collection_ttl_defaults_ms
247                    .insert(contract.name.clone(), ttl_ms);
248            } else {
249                metadata.collection_ttl_defaults_ms.remove(&contract.name);
250            }
251
252            contract.clone()
253        })?;
254        self.invalidate_collection_contract_cache();
255        Ok(saved)
256    }
257
258    pub fn remove_collection_contract(
259        &self,
260        collection: &str,
261    ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
262        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
263            defaults.remove(collection);
264        }
265
266        self.store()
267            .context_index()
268            .set_collection_enabled(collection, false);
269
270        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
271            && self.options.storage_profile.packaging
272                == crate::storage::StoragePackaging::SingleFile
273        {
274            let mut removed = None;
275            if let Ok(mut guard) = self.collection_contract_cache.write() {
276                let mut contracts = guard
277                    .as_ref()
278                    .map(|existing| existing.as_ref().clone())
279                    .unwrap_or_default();
280                removed = contracts
281                    .remove(collection)
282                    .map(|contract| (*contract).clone());
283                *guard = Some(std::sync::Arc::new(contracts));
284            }
285            self.sync_single_file_contract_blob();
286            return Ok(removed);
287        }
288
289        self.invalidate_collection_contract_cache();
290
291        let removed = self.update_physical_metadata(|metadata| {
292            let removed = metadata
293                .collection_contracts
294                .iter()
295                .position(|contract| contract.name == collection)
296                .map(|index| metadata.collection_contracts.remove(index));
297            metadata.collection_ttl_defaults_ms.remove(collection);
298            metadata
299                .indexes
300                .retain(|index| index.collection.as_deref() != Some(collection));
301            removed
302        })?;
303        self.invalidate_collection_contract_cache();
304        Ok(removed)
305    }
306
307    pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
308        self.collection_ttl_defaults_ms
309            .read()
310            .map(|defaults| {
311                defaults
312                    .iter()
313                    .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
314                    .collect()
315            })
316            .unwrap_or_default()
317    }
318
319    pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
320        let metadata = self.physical_metadata();
321        let defaults = metadata
322            .as_ref()
323            .map(|m| m.collection_ttl_defaults_ms.clone())
324            .unwrap_or_default();
325
326        if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
327            current.clear();
328            current.extend(defaults);
329        }
330
331        if let Some(metadata) = metadata {
332            let store = self.store();
333            let index = store.context_index();
334            for contract in &metadata.collection_contracts {
335                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
336            }
337        }
338    }
339
340    /// Snapshot the live hypertable registry (specs + every chunk)
341    /// into the sidecar-serialisable form. Called from
342    /// [`persist_metadata`] so the chunk routing spine is durable on
343    /// the same metadata path as collection contracts — issue #866.
344    /// Returns an empty vec (and touches nothing) when no hypertable
345    /// has ever been declared, so non-hypertable databases pay no
346    /// persist cost.
347    pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
348        let registry = self.hypertables();
349        if registry.is_empty() {
350            return Vec::new();
351        }
352        use std::collections::BTreeMap;
353        let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
354            BTreeMap::new();
355        for chunk in registry.snapshot_chunks() {
356            chunks_by_table
357                .entry(chunk.id.hypertable.clone())
358                .or_default()
359                .push(crate::physical::PhysicalHypertableChunk {
360                    start_ns: chunk.id.start_ns,
361                    end_ns_exclusive: chunk.end_ns_exclusive,
362                    row_count: chunk.row_count,
363                    min_ts_ns: chunk.min_ts_ns,
364                    max_ts_ns: chunk.max_ts_ns,
365                    sealed: chunk.sealed,
366                    ttl_override_ns: chunk.ttl_override_ns,
367                    columnar_page: chunk.columnar_page,
368                });
369        }
370        registry
371            .list()
372            .into_iter()
373            .map(|spec| crate::physical::PhysicalHypertable {
374                chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
375                name: spec.name,
376                time_column: spec.time_column,
377                chunk_interval_ns: spec.chunk_interval_ns,
378                default_ttl_ns: spec.default_ttl_ns,
379            })
380            .collect()
381    }
382
383    /// Rehydrate the hypertable registry from the physical metadata
384    /// sidecar at boot. Mirror of
385    /// [`load_collection_ttl_defaults_from_metadata`] — specs and
386    /// chunks are restored verbatim so bounds / routing / TTL are
387    /// identical to the pre-restart state (issue #866).
388    pub(crate) fn load_hypertables_from_metadata(&self) {
389        let Some(metadata) = self.physical_metadata() else {
390            return;
391        };
392        if metadata.hypertables.is_empty() {
393            return;
394        }
395        let registry = self.hypertables();
396        for hypertable in &metadata.hypertables {
397            let mut spec = crate::storage::timeseries::HypertableSpec::new(
398                hypertable.name.clone(),
399                hypertable.time_column.clone(),
400                hypertable.chunk_interval_ns,
401            );
402            if let Some(ttl) = hypertable.default_ttl_ns {
403                spec = spec.with_ttl_ns(ttl);
404            }
405            registry.register(spec);
406            for chunk in &hypertable.chunks {
407                registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
408                    id: crate::storage::timeseries::ChunkId {
409                        hypertable: hypertable.name.clone(),
410                        start_ns: chunk.start_ns,
411                    },
412                    end_ns_exclusive: chunk.end_ns_exclusive,
413                    row_count: chunk.row_count,
414                    min_ts_ns: chunk.min_ts_ns,
415                    max_ts_ns: chunk.max_ts_ns,
416                    sealed: chunk.sealed,
417                    ttl_override_ns: chunk.ttl_override_ns,
418                    columnar_page: chunk.columnar_page,
419                });
420            }
421        }
422    }
423
    /// Runs maintenance on the store and then persists the metadata.
    ///
    /// The two steps run in that order; if the store's `run_maintenance`
    /// fails, `persist_metadata` is not attempted.
    ///
    /// # Errors
    /// Propagates the first error raised by either step.
    pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
        self.store.run_maintenance()?;
        self.persist_metadata()?;
        Ok(())
    }
429
430    /// Path to the physical metadata sidecar, if persistent.
431    pub fn metadata_path(&self) -> Option<PathBuf> {
432        self.path
433            .as_ref()
434            .map(|path| PhysicalMetadataFile::metadata_path_for(path))
435    }
436
437    /// Load the current physical metadata view, bootstrapping from native state when needed.
438    pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
439        self.load_or_bootstrap_physical_metadata(!self.options.read_only)
440            .ok()
441    }
442
443    /// Physical index registry derived for the current database state.
444    pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
445        let indexes = self
446            .physical_metadata()
447            .map(|metadata| metadata.indexes)
448            .filter(|indexes| !indexes.is_empty())
449            .or_else(|| {
450                self.native_physical_state()
451                    .map(|state| self.physical_index_state_from_native_state(&state, None))
452            })
453            .unwrap_or_else(|| self.physical_index_state());
454        self.reconcile_index_states_with_native_artifacts(indexes)
455    }
456
457    /// List registered named exports from the current physical metadata view.
458    pub fn exports(&self) -> Vec<ExportDescriptor> {
459        self.physical_metadata()
460            .map(|metadata| metadata.exports)
461            .or_else(|| {
462                self.native_physical_state()
463                    .map(|state| self.exports_from_native_state(&state))
464            })
465            .unwrap_or_default()
466    }
467
468    /// List recorded snapshots from the current physical metadata view.
469    pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
470        if let Some(metadata) = self.physical_metadata() {
471            if !metadata.snapshots.is_empty() {
472                return metadata.snapshots;
473            }
474        }
475        self.native_physical_state()
476            .map(|state| self.snapshots_from_native_state(&state))
477            .unwrap_or_default()
478    }
479
480    /// List persisted named graph projections from the current physical metadata view.
481    pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
482        self.physical_metadata()
483            .map(|metadata| metadata.graph_projections)
484            .or_else(|| {
485                self.native_physical_state()
486                    .map(|state| self.graph_projections_from_native_state(&state))
487            })
488            .unwrap_or_default()
489    }
490
491    /// List graph projections declared in the catalog view.
492    pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
493        self.catalog_model_snapshot().declared_graph_projections
494    }
495
496    /// List graph projections currently observed in the operational view.
497    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
498        self.graph_projections()
499            .into_iter()
500            .filter(|projection| {
501                projection.last_materialized_sequence.is_some()
502                    || matches!(projection.state.as_str(), "materialized" | "stale")
503            })
504            .collect()
505    }
506
507    /// List persisted analytics job metadata from the current physical metadata view.
508    pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
509        self.physical_metadata()
510            .map(|metadata| metadata.analytics_jobs)
511            .or_else(|| {
512                self.native_physical_state()
513                    .map(|state| self.analytics_jobs_from_native_state(&state))
514            })
515            .unwrap_or_default()
516    }
517
518    /// List analytics jobs declared in the catalog view.
519    pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
520        self.catalog_model_snapshot().declared_analytics_jobs
521    }
522
523    /// List analytics jobs currently observed in the operational view.
524    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
525        self.analytics_jobs()
526            .into_iter()
527            .filter(|job| {
528                job.last_run_sequence.is_some()
529                    || matches!(
530                        job.state.as_str(),
531                        "running" | "completed" | "failed" | "queued" | "stale"
532                    )
533            })
534            .collect()
535    }
536
537    /// List indexes declared in the catalog view.
538    pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
539        self.catalog_model_snapshot().declared_indexes
540    }
541
542    /// List indexes currently observed in the operational view.
543    pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
544        self.catalog_model_snapshot().operational_indexes
545    }
546
547    /// List reconciled index status entries from the catalog snapshot.
548    pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
549        self.catalog_model_snapshot().index_statuses
550    }
551
552    /// Resolve one index status entry from the catalog snapshot.
553    pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
554        self.catalog_model_snapshot()
555            .index_statuses
556            .into_iter()
557            .find(|status| status.name == name)
558    }
559
560    /// Upsert a named graph projection in the persisted physical metadata.
561    pub fn save_graph_projection(
562        &self,
563        name: impl Into<String>,
564        node_labels: Vec<String>,
565        node_types: Vec<String>,
566        edge_labels: Vec<String>,
567        source: impl Into<String>,
568    ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
569        let name = name.into();
570        let source = source.into();
571        self.update_physical_metadata(|metadata| {
572            let now = SystemTime::now()
573                .duration_since(UNIX_EPOCH)
574                .unwrap_or_default()
575                .as_millis();
576            let projection = if let Some(existing) = metadata
577                .graph_projections
578                .iter_mut()
579                .find(|projection| projection.name == name)
580            {
581                existing.updated_at_unix_ms = now;
582                existing.state = "declared".to_string();
583                existing.source = source.clone();
584                existing.node_labels = node_labels.clone();
585                existing.node_types = node_types.clone();
586                existing.edge_labels = edge_labels.clone();
587                existing.last_materialized_sequence = None;
588                existing.clone()
589            } else {
590                let projection = PhysicalGraphProjection {
591                    name: name.clone(),
592                    created_at_unix_ms: now,
593                    updated_at_unix_ms: now,
594                    state: "declared".to_string(),
595                    source: source.clone(),
596                    node_labels: node_labels.clone(),
597                    node_types: node_types.clone(),
598                    edge_labels: edge_labels.clone(),
599                    last_materialized_sequence: None,
600                };
601                metadata.graph_projections.push(projection.clone());
602                projection
603            };
604
605            Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
606
607            metadata
608                .graph_projections
609                .sort_by(|left, right| left.name.cmp(&right.name));
610            projection
611        })
612    }
613
614    /// Mark a declared graph projection as materialized in the current physical metadata.
615    pub fn materialize_graph_projection(
616        &self,
617        name: &str,
618    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
619        self.update_physical_metadata(|metadata| {
620            let now = SystemTime::now()
621                .duration_since(UNIX_EPOCH)
622                .unwrap_or_default()
623                .as_millis();
624            let idx = metadata
625                .graph_projections
626                .iter()
627                .position(|projection| projection.name == name);
628            if let Some(idx) = idx {
629                metadata.graph_projections[idx].updated_at_unix_ms = now;
630                metadata.graph_projections[idx].state = "materialized".to_string();
631                metadata.graph_projections[idx].last_materialized_sequence =
632                    Some(metadata.superblock.sequence);
633                let result = metadata.graph_projections[idx].clone();
634                Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
635                return Some(result);
636            }
637            None
638        })
639    }
640
641    /// Mark a declared graph projection as materializing.
642    pub fn mark_graph_projection_materializing(
643        &self,
644        name: &str,
645    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
646        self.update_physical_metadata(|metadata| {
647            let now = SystemTime::now()
648                .duration_since(UNIX_EPOCH)
649                .unwrap_or_default()
650                .as_millis();
651            let idx = metadata
652                .graph_projections
653                .iter()
654                .position(|projection| projection.name == name);
655            if let Some(idx) = idx {
656                metadata.graph_projections[idx].updated_at_unix_ms = now;
657                metadata.graph_projections[idx].state = "materializing".to_string();
658                metadata.graph_projections[idx].last_materialized_sequence = None;
659                let result = metadata.graph_projections[idx].clone();
660                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
661                return Some(result);
662            }
663            None
664        })
665    }
666
667    /// Mark a graph projection as failed.
668    pub fn fail_graph_projection(
669        &self,
670        name: &str,
671    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
672        self.update_physical_metadata(|metadata| {
673            let now = SystemTime::now()
674                .duration_since(UNIX_EPOCH)
675                .unwrap_or_default()
676                .as_millis();
677            let idx = metadata
678                .graph_projections
679                .iter()
680                .position(|projection| projection.name == name);
681            if let Some(idx) = idx {
682                metadata.graph_projections[idx].updated_at_unix_ms = now;
683                metadata.graph_projections[idx].state = "failed".to_string();
684                metadata.graph_projections[idx].last_materialized_sequence = None;
685                let result = metadata.graph_projections[idx].clone();
686                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
687                return Some(result);
688            }
689            None
690        })
691    }
692
693    /// Mark a graph projection as stale while preserving any last materialized sequence.
694    pub fn mark_graph_projection_stale(
695        &self,
696        name: &str,
697    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
698        self.update_physical_metadata(|metadata| {
699            let now = SystemTime::now()
700                .duration_since(UNIX_EPOCH)
701                .unwrap_or_default()
702                .as_millis();
703            let idx = metadata
704                .graph_projections
705                .iter()
706                .position(|projection| projection.name == name);
707            if let Some(idx) = idx {
708                metadata.graph_projections[idx].updated_at_unix_ms = now;
709                metadata.graph_projections[idx].state = "stale".to_string();
710                let result = metadata.graph_projections[idx].clone();
711                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
712                return Some(result);
713            }
714            None
715        })
716    }
717
718    fn mark_projection_dependent_jobs_stale(
719        metadata: &mut PhysicalMetadataFile,
720        projection_name: &str,
721        now: u128,
722    ) {
723        for job in metadata.analytics_jobs.iter_mut() {
724            if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
725                job.state = "stale".to_string();
726                job.updated_at_unix_ms = now;
727            }
728        }
729    }
730
731    fn rearm_projection_dependent_jobs_declared(
732        metadata: &mut PhysicalMetadataFile,
733        projection_name: &str,
734        now: u128,
735    ) {
736        for job in metadata.analytics_jobs.iter_mut() {
737            if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
738                job.state = "declared".to_string();
739                job.last_run_sequence = None;
740                job.updated_at_unix_ms = now;
741            }
742        }
743    }
744
745    /// Declare or update analytics job metadata without marking it as executed.
746    pub fn save_analytics_job(
747        &self,
748        kind: impl Into<String>,
749        projection: Option<String>,
750        metadata_entries: BTreeMap<String, String>,
751    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
752        let kind = kind.into();
753        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
754        if let Some(projection_name) = projection.as_deref() {
755            if !self.graph_projection_is_declared(projection_name) {
756                return Err(format!(
757                    "graph projection '{projection_name}' is not declared in physical metadata"
758                )
759                .into());
760            }
761        }
762
763        self.update_physical_metadata(|metadata| {
764            let now = SystemTime::now()
765                .duration_since(UNIX_EPOCH)
766                .unwrap_or_default()
767                .as_millis();
768
769            let job = if let Some(existing) = metadata
770                .analytics_jobs
771                .iter_mut()
772                .find(|job| job.id == job_id)
773            {
774                existing.kind = kind.clone();
775                existing.projection = projection.clone();
776                existing.updated_at_unix_ms = now;
777                existing.metadata = metadata_entries.clone();
778                if existing.last_run_sequence.is_none() {
779                    existing.state = "declared".to_string();
780                }
781                existing.clone()
782            } else {
783                let job = PhysicalAnalyticsJob {
784                    id: job_id.clone(),
785                    kind: kind.clone(),
786                    state: "declared".to_string(),
787                    projection: projection.clone(),
788                    created_at_unix_ms: now,
789                    updated_at_unix_ms: now,
790                    last_run_sequence: None,
791                    metadata: metadata_entries.clone(),
792                };
793                metadata.analytics_jobs.push(job.clone());
794                job
795            };
796
797            metadata
798                .analytics_jobs
799                .sort_by(|left, right| left.id.cmp(&right.id));
800            job
801        })
802    }
803
804    /// Record or update analytics job metadata in the persisted physical metadata.
805    pub fn record_analytics_job(
806        &self,
807        kind: impl Into<String>,
808        projection: Option<String>,
809        metadata_entries: BTreeMap<String, String>,
810    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
811        let kind = kind.into();
812        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
813        if let Some(projection_name) = projection.as_deref() {
814            if !self.graph_projection_is_declared(projection_name) {
815                return Err(format!(
816                    "graph projection '{projection_name}' is not declared in physical metadata"
817                )
818                .into());
819            }
820            if !self.graph_projection_is_operational(projection_name) {
821                return Err(format!(
822                    "graph projection '{projection_name}' is declared but not operationally materialized"
823                )
824                .into());
825            }
826        }
827
828        self.update_physical_metadata(|metadata| {
829            let now = SystemTime::now()
830                .duration_since(UNIX_EPOCH)
831                .unwrap_or_default()
832                .as_millis();
833
834            let existing = metadata
835                .analytics_jobs
836                .iter_mut()
837                .find(|job| job.id == job_id)?;
838            existing.state = "completed".to_string();
839            existing.kind = kind.clone();
840            existing.projection = projection.clone();
841            existing.updated_at_unix_ms = now;
842            existing.last_run_sequence = Some(metadata.superblock.sequence);
843            existing.metadata = metadata_entries.clone();
844            let job = existing.clone();
845
846            metadata
847                .analytics_jobs
848                .sort_by(|left, right| left.id.cmp(&right.id));
849            Some(job)
850        })
851        .and_then(|job| {
852            job.ok_or_else(|| {
853                format!("analytics job '{job_id}' is not declared in physical metadata").into()
854            })
855        })
856    }
857
858    /// Mark a declared analytics job as running.
859    pub fn queue_analytics_job(
860        &self,
861        kind: impl Into<String>,
862        projection: Option<String>,
863        metadata_entries: BTreeMap<String, String>,
864    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
865        let kind = kind.into();
866        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
867        if let Some(projection_name) = projection.as_deref() {
868            if !self.graph_projection_is_declared(projection_name) {
869                return Err(format!(
870                    "graph projection '{projection_name}' is not declared in physical metadata"
871                )
872                .into());
873            }
874            if !self.graph_projection_is_operational(projection_name) {
875                return Err(format!(
876                    "graph projection '{projection_name}' is declared but not operationally materialized"
877                )
878                .into());
879            }
880        }
881
882        self.update_physical_metadata(|metadata| {
883            let now = SystemTime::now()
884                .duration_since(UNIX_EPOCH)
885                .unwrap_or_default()
886                .as_millis();
887
888            let existing = metadata
889                .analytics_jobs
890                .iter_mut()
891                .find(|job| job.id == job_id)?;
892            existing.state = "queued".to_string();
893            existing.kind = kind.clone();
894            existing.projection = projection.clone();
895            existing.updated_at_unix_ms = now;
896            existing.metadata = metadata_entries.clone();
897            let job = existing.clone();
898
899            metadata
900                .analytics_jobs
901                .sort_by(|left, right| left.id.cmp(&right.id));
902            Some(job)
903        })
904        .and_then(|job| {
905            job.ok_or_else(|| {
906                format!("analytics job '{job_id}' is not declared in physical metadata").into()
907            })
908        })
909    }
910
911    /// Mark a declared analytics job as running.
912    pub fn start_analytics_job(
913        &self,
914        kind: impl Into<String>,
915        projection: Option<String>,
916        metadata_entries: BTreeMap<String, String>,
917    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
918        let kind = kind.into();
919        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
920        if let Some(projection_name) = projection.as_deref() {
921            if !self.graph_projection_is_declared(projection_name) {
922                return Err(format!(
923                    "graph projection '{projection_name}' is not declared in physical metadata"
924                )
925                .into());
926            }
927            if !self.graph_projection_is_operational(projection_name) {
928                return Err(format!(
929                    "graph projection '{projection_name}' is declared but not operationally materialized"
930                )
931                .into());
932            }
933        }
934
935        self.update_physical_metadata(|metadata| {
936            let now = SystemTime::now()
937                .duration_since(UNIX_EPOCH)
938                .unwrap_or_default()
939                .as_millis();
940
941            let existing = metadata
942                .analytics_jobs
943                .iter_mut()
944                .find(|job| job.id == job_id)?;
945            existing.state = "running".to_string();
946            existing.kind = kind.clone();
947            existing.projection = projection.clone();
948            existing.updated_at_unix_ms = now;
949            existing.metadata = metadata_entries.clone();
950            let job = existing.clone();
951
952            metadata
953                .analytics_jobs
954                .sort_by(|left, right| left.id.cmp(&right.id));
955            Some(job)
956        })
957        .and_then(|job| {
958            job.ok_or_else(|| {
959                format!("analytics job '{job_id}' is not declared in physical metadata").into()
960            })
961        })
962    }
963
964    /// Mark a declared analytics job as failed.
965    pub fn fail_analytics_job(
966        &self,
967        kind: impl Into<String>,
968        projection: Option<String>,
969        metadata_entries: BTreeMap<String, String>,
970    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
971        let kind = kind.into();
972        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
973
974        self.update_physical_metadata(|metadata| {
975            let now = SystemTime::now()
976                .duration_since(UNIX_EPOCH)
977                .unwrap_or_default()
978                .as_millis();
979
980            let existing = metadata
981                .analytics_jobs
982                .iter_mut()
983                .find(|job| job.id == job_id)?;
984            existing.state = "failed".to_string();
985            existing.kind = kind.clone();
986            existing.projection = projection.clone();
987            existing.updated_at_unix_ms = now;
988            existing.metadata = metadata_entries.clone();
989            let job = existing.clone();
990
991            metadata
992                .analytics_jobs
993                .sort_by(|left, right| left.id.cmp(&right.id));
994            Some(job)
995        })
996        .and_then(|job| {
997            job.ok_or_else(|| {
998                format!("analytics job '{job_id}' is not declared in physical metadata").into()
999            })
1000        })
1001    }
1002
1003    /// Mark a declared analytics job as stale.
1004    pub fn mark_analytics_job_stale(
1005        &self,
1006        kind: impl Into<String>,
1007        projection: Option<String>,
1008        metadata_entries: BTreeMap<String, String>,
1009    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
1010        let kind = kind.into();
1011        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
1012
1013        self.update_physical_metadata(|metadata| {
1014            let now = SystemTime::now()
1015                .duration_since(UNIX_EPOCH)
1016                .unwrap_or_default()
1017                .as_millis();
1018
1019            let existing = metadata
1020                .analytics_jobs
1021                .iter_mut()
1022                .find(|job| job.id == job_id)?;
1023            existing.state = "stale".to_string();
1024            existing.kind = kind.clone();
1025            existing.projection = projection.clone();
1026            existing.updated_at_unix_ms = now;
1027            existing.metadata = metadata_entries.clone();
1028            let job = existing.clone();
1029
1030            metadata
1031                .analytics_jobs
1032                .sort_by(|left, right| left.id.cmp(&right.id));
1033            Some(job)
1034        })
1035        .and_then(|job| {
1036            job.ok_or_else(|| {
1037                format!("analytics job '{job_id}' is not declared in physical metadata").into()
1038            })
1039        })
1040    }
1041
1042    /// Create a named export by copying the current database file and metadata sidecar.
1043    pub fn create_named_export(
1044        &self,
1045        name: impl Into<String>,
1046    ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
1047        let name = name.into();
1048        if self.options.mode != StorageMode::Persistent {
1049            return Err("exports require persistent mode".into());
1050        }
1051        let Some(path) = self.path() else {
1052            return Err("database path is not available".into());
1053        };
1054
1055        self.flush()?;
1056
1057        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1058        let export_data_path = reddb_file::copy_physical_export_data_file(path, &name)?;
1059        let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
1060        let export_metadata_binary_path =
1061            PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
1062
1063        let descriptor = ExportDescriptor {
1064            name: name.clone(),
1065            created_at_unix_ms: SystemTime::now()
1066                .duration_since(UNIX_EPOCH)
1067                .unwrap_or_default()
1068                .as_millis(),
1069            snapshot_id: metadata
1070                .snapshots
1071                .last()
1072                .map(|snapshot| snapshot.snapshot_id),
1073            superblock_sequence: metadata.superblock.sequence,
1074            data_path: export_data_path.display().to_string(),
1075            metadata_path: export_metadata_path.display().to_string(),
1076            collection_count: metadata.catalog.total_collections,
1077            total_entities: metadata.catalog.total_entities,
1078        };
1079
1080        metadata
1081            .exports
1082            .retain(|export| export.name != descriptor.name);
1083        metadata.exports.push(descriptor.clone());
1084        self.prune_export_registry(&mut metadata.exports);
1085        metadata.save_for_data_path(path)?;
1086        metadata.save_to_binary_path(&export_metadata_binary_path)?;
1087        metadata.save_to_path(&export_metadata_path)?;
1088
1089        Ok(descriptor)
1090    }
1091
1092    /// Enable or disable a physical index entry in the persisted registry.
1093    pub fn set_index_enabled(
1094        &self,
1095        name: &str,
1096        enabled: bool,
1097    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1098        let Some(status) = self.index_status(name) else {
1099            return Err(format!("index '{name}' is not present in catalog status").into());
1100        };
1101        if !status.declared {
1102            return Err(format!("index '{name}' is not declared in physical metadata").into());
1103        }
1104        self.update_physical_metadata(|metadata| {
1105            let now = SystemTime::now()
1106                .duration_since(UNIX_EPOCH)
1107                .unwrap_or_default()
1108                .as_millis();
1109            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1110                index.enabled = enabled;
1111                if !enabled {
1112                    index.build_state = "disabled".to_string();
1113                } else if index.build_state == "disabled" {
1114                    index.build_state = if index.artifact_root_page.is_some() {
1115                        "ready".to_string()
1116                    } else {
1117                        "declared-unbuilt".to_string()
1118                    };
1119                }
1120                index.last_refresh_ms = Some(now);
1121                return Some(index.clone());
1122            }
1123            None
1124        })
1125    }
1126
1127    /// Mark a declared physical index as building in the persisted registry.
1128    pub fn mark_index_building(
1129        &self,
1130        name: &str,
1131    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1132        let Some(status) = self.index_status(name) else {
1133            return Err(format!("index '{name}' is not present in catalog status").into());
1134        };
1135        if !status.declared {
1136            return Err(format!("index '{name}' is not declared in physical metadata").into());
1137        }
1138        if status.lifecycle_state == "disabled" {
1139            return Err(format!("index '{name}' is disabled").into());
1140        }
1141        self.update_physical_metadata(|metadata| {
1142            let now = SystemTime::now()
1143                .duration_since(UNIX_EPOCH)
1144                .unwrap_or_default()
1145                .as_millis();
1146            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1147                index.build_state = "building".to_string();
1148                index.last_refresh_ms = Some(now);
1149                return Some(index.clone());
1150            }
1151            None
1152        })
1153    }
1154
1155    /// Mark a declared physical index as failed in the persisted registry.
1156    pub fn fail_index(
1157        &self,
1158        name: &str,
1159    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1160        let Some(status) = self.index_status(name) else {
1161            return Err(format!("index '{name}' is not present in catalog status").into());
1162        };
1163        if !status.declared {
1164            return Err(format!("index '{name}' is not declared in physical metadata").into());
1165        }
1166        self.update_physical_metadata(|metadata| {
1167            let now = SystemTime::now()
1168                .duration_since(UNIX_EPOCH)
1169                .unwrap_or_default()
1170                .as_millis();
1171            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1172                index.build_state = "failed".to_string();
1173                index.last_refresh_ms = Some(now);
1174                return Some(index.clone());
1175            }
1176            None
1177        })
1178    }
1179
1180    /// Mark a declared physical index as stale in the persisted registry.
1181    pub fn mark_index_stale(
1182        &self,
1183        name: &str,
1184    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1185        let Some(status) = self.index_status(name) else {
1186            return Err(format!("index '{name}' is not present in catalog status").into());
1187        };
1188        if !status.declared {
1189            return Err(format!("index '{name}' is not declared in physical metadata").into());
1190        }
1191        self.update_physical_metadata(|metadata| {
1192            let now = SystemTime::now()
1193                .duration_since(UNIX_EPOCH)
1194                .unwrap_or_default()
1195                .as_millis();
1196            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1197                index.build_state = if index.enabled {
1198                    "stale".to_string()
1199                } else {
1200                    "disabled".to_string()
1201                };
1202                index.last_refresh_ms = Some(now);
1203                return Some(index.clone());
1204            }
1205            None
1206        })
1207    }
1208
1209    /// Mark a declared physical index as ready in the persisted registry.
1210    pub fn mark_index_ready(
1211        &self,
1212        name: &str,
1213    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1214        self.warmup_index(name)
1215    }
1216
1217    /// Mark a physical index as warmed up/refreshed in the persisted registry.
1218    pub fn warmup_index(
1219        &self,
1220        name: &str,
1221    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1222        let Some(status) = self.index_status(name) else {
1223            return Err(format!("index '{name}' is not present in catalog status").into());
1224        };
1225        if !status.declared {
1226            return Err(format!("index '{name}' is not declared in physical metadata").into());
1227        }
1228        if status.lifecycle_state == "disabled" {
1229            return Err(format!("index '{name}' is disabled").into());
1230        }
1231        if !status.operational {
1232            return Err(
1233                format!("index '{name}' is declared but not operationally materialized").into(),
1234            );
1235        }
1236        let warmed_artifact = self
1237            .physical_indexes()
1238            .into_iter()
1239            .find(|index| index.name == name)
1240            .map(|mut index| {
1241                self.warmup_native_vector_artifact_for_index(&index)?;
1242                self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1243                Ok::<_, String>(index)
1244            })
1245            .transpose()
1246            .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1247
1248        self.update_physical_metadata(|metadata| {
1249            let now = SystemTime::now()
1250                .duration_since(UNIX_EPOCH)
1251                .unwrap_or_default()
1252                .as_millis();
1253            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1254                if let Some(warmed) = warmed_artifact.as_ref() {
1255                    index.entries = warmed.entries;
1256                    index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1257                    index.backend = warmed.backend.clone();
1258                    index.build_state = "ready".to_string();
1259                }
1260                index.last_refresh_ms = Some(now);
1261                return Some(index.clone());
1262            }
1263            None
1264        })
1265    }
1266
1267    /// Rebuild physical index metadata from the current catalog, optionally restricted to one collection.
1268    pub fn rebuild_index_registry(
1269        &self,
1270        collection: Option<&str>,
1271    ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1272        let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1273        self.update_physical_metadata(|metadata| {
1274            let now = SystemTime::now()
1275                .duration_since(UNIX_EPOCH)
1276                .unwrap_or_default()
1277                .as_millis();
1278
1279            let mut affected = Vec::new();
1280            let declared = metadata.indexes.clone();
1281            for declared_index in declared {
1282                let matches_collection = collection.is_none_or(|collection_name| {
1283                    declared_index.collection.as_deref() == Some(collection_name)
1284                });
1285                if !matches_collection {
1286                    continue;
1287                }
1288
1289                let mut rebuilt = fresh
1290                    .iter()
1291                    .find(|index| index.name == declared_index.name)
1292                    .cloned()
1293                    .unwrap_or_else(|| {
1294                        let mut index = declared_index.clone();
1295                        index.build_state = "declared-unbuilt".to_string();
1296                        index
1297                    });
1298                rebuilt.enabled = declared_index.enabled;
1299                rebuilt.artifact_kind = rebuilt
1300                    .artifact_kind
1301                    .or_else(|| declared_index.artifact_kind.clone());
1302                rebuilt.artifact_root_page = rebuilt
1303                    .artifact_root_page
1304                    .or(declared_index.artifact_root_page);
1305                rebuilt.artifact_checksum = rebuilt
1306                    .artifact_checksum
1307                    .or(declared_index.artifact_checksum);
1308                rebuilt.build_state =
1309                    Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1310                rebuilt.last_refresh_ms = Some(now);
1311
1312                if let Some(existing) = metadata
1313                    .indexes
1314                    .iter_mut()
1315                    .find(|index| index.name == rebuilt.name)
1316                {
1317                    *existing = rebuilt.clone();
1318                } else {
1319                    metadata.indexes.push(rebuilt.clone());
1320                }
1321
1322                affected.push(rebuilt);
1323            }
1324
1325            affected
1326        })
1327    }
1328
1329    fn finalize_rebuilt_index_build_state(
1330        declared: &PhysicalIndexState,
1331        rebuilt: &PhysicalIndexState,
1332    ) -> String {
1333        if !rebuilt.enabled {
1334            return "disabled".to_string();
1335        }
1336
1337        if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1338            return "failed".to_string();
1339        }
1340
1341        let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1342        if native_artifact_family {
1343            if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1344                return "ready".to_string();
1345            }
1346            if declared.artifact_root_page.is_some()
1347                || declared.artifact_checksum.is_some()
1348                || declared.artifact_kind.is_some()
1349            {
1350                return "stale".to_string();
1351            }
1352            return "declared-unbuilt".to_string();
1353        }
1354
1355        if rebuilt.entries > 0 {
1356            return "ready".to_string();
1357        }
1358
1359        if matches!(
1360            declared.build_state.as_str(),
1361            "stale" | "artifact-published" | "registry-loaded"
1362        ) {
1363            return "stale".to_string();
1364        }
1365
1366        "declared-unbuilt".to_string()
1367    }
1368}