Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_registry.rs

1use super::*;
2
3impl RedDB {
4    pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5        self.physical_metadata()
6            .map(|metadata| metadata.tree_definitions)
7            .unwrap_or_default()
8    }
9
10    pub fn tree_definition(
11        &self,
12        collection: &str,
13        name: &str,
14    ) -> Option<crate::physical::PhysicalTreeDefinition> {
15        self.tree_definitions()
16            .into_iter()
17            .find(|definition| definition.collection == collection && definition.name == name)
18    }
19
20    pub fn save_tree_definition(
21        &self,
22        definition: crate::physical::PhysicalTreeDefinition,
23    ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24        self.update_physical_metadata(|metadata| {
25            if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26                existing.collection == definition.collection && existing.name == definition.name
27            }) {
28                *existing = definition.clone();
29            } else {
30                metadata.tree_definitions.push(definition.clone());
31            }
32            metadata.tree_definitions.sort_by(|left, right| {
33                left.collection
34                    .cmp(&right.collection)
35                    .then_with(|| left.name.cmp(&right.name))
36            });
37            definition.clone()
38        })
39    }
40
41    pub fn remove_tree_definition(
42        &self,
43        collection: &str,
44        name: &str,
45    ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46        self.update_physical_metadata(|metadata| {
47            metadata
48                .tree_definitions
49                .iter()
50                .position(|definition| {
51                    definition.collection == collection && definition.name == name
52                })
53                .map(|index| metadata.tree_definitions.remove(index))
54        })
55    }
56
57    pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58        self.collection_ttl_defaults_ms
59            .read()
60            .ok()
61            .and_then(|defaults| defaults.get(collection).copied())
62    }
63
64    pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66            defaults.insert(collection.into(), ttl_ms);
67        }
68    }
69
70    pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72            defaults.remove(collection);
73        }
74    }
75
76    pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77        self.contract_cache_map()
78            .values()
79            .map(|arc| (**arc).clone())
80            .collect()
81    }
82
83    pub fn collection_contract(
84        &self,
85        collection: &str,
86    ) -> Option<crate::physical::CollectionContract> {
87        self.contract_cache_map()
88            .get(collection)
89            .map(|arc| (**arc).clone())
90    }
91
92    /// Arc-bumping variant — callers that only need read access skip
93    /// the per-call full `CollectionContract` clone. Deep clones cost
94    /// ~200-400 ns each (vec of declared columns + string allocs);
95    /// the UPDATE fast path hits this 2-3× per mutation.
96    pub fn collection_contract_arc(
97        &self,
98        collection: &str,
99    ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100        self.contract_cache_map()
101            .get(collection)
102            .map(std::sync::Arc::clone)
103    }
104
105    /// Return (or lazily build) the in-memory contract map.
106    /// Reparses PhysicalMetadataFile JSON only once per invalidation.
107    /// Called 3× per insert (ensure_model, enforce_uniqueness, normalize_fields)
108    /// so the cache is load-bearing for insert throughput.
109    fn contract_cache_map(
110        &self,
111    ) -> std::sync::Arc<
112        std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113    > {
114        if let Ok(guard) = self.collection_contract_cache.read() {
115            if let Some(map) = guard.as_ref() {
116                return std::sync::Arc::clone(map);
117            }
118        }
119        let contracts: Vec<crate::physical::CollectionContract> = self
120            .physical_metadata()
121            .map(|metadata| metadata.collection_contracts)
122            .unwrap_or_default();
123        let map: std::collections::HashMap<_, _> = contracts
124            .into_iter()
125            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126            .collect();
127        let arc = std::sync::Arc::new(map);
128        if let Ok(mut guard) = self.collection_contract_cache.write() {
129            *guard = Some(std::sync::Arc::clone(&arc));
130        }
131        arc
132    }
133
134    pub(crate) fn invalidate_collection_contract_cache(&self) {
135        if let Ok(mut guard) = self.collection_contract_cache.write() {
136            *guard = None;
137        }
138    }
139
140    pub fn save_collection_contract(
141        &self,
142        contract: crate::physical::CollectionContract,
143    ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145            if let Some(ttl_ms) = contract.default_ttl_ms {
146                defaults.insert(contract.name.clone(), ttl_ms);
147            } else {
148                defaults.remove(&contract.name);
149            }
150        }
151
152        self.store()
153            .context_index()
154            .set_collection_enabled(&contract.name, contract.context_index_enabled);
155        if let Some(manager) = self.store().get_collection(&contract.name) {
156            let columns = contract
157                .declared_columns
158                .iter()
159                .map(|column| column.name.clone())
160                .collect();
161            manager.set_column_schema_if_empty(columns);
162        }
163
164        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
165            && self.options.storage_profile.packaging
166                == crate::storage::StoragePackaging::SingleFile
167        {
168            if let Ok(mut guard) = self.collection_contract_cache.write() {
169                let mut contracts = guard
170                    .as_ref()
171                    .map(|existing| existing.as_ref().clone())
172                    .unwrap_or_default();
173                contracts.insert(contract.name.clone(), std::sync::Arc::new(contract.clone()));
174                *guard = Some(std::sync::Arc::new(contracts));
175            }
176            return Ok(contract);
177        }
178
179        self.invalidate_collection_contract_cache();
180
181        let saved = self.update_physical_metadata(|metadata| {
182            if let Some(existing) = metadata
183                .collection_contracts
184                .iter_mut()
185                .find(|existing| existing.name == contract.name)
186            {
187                *existing = contract.clone();
188            } else {
189                metadata.collection_contracts.push(contract.clone());
190            }
191            metadata
192                .collection_contracts
193                .sort_by(|left, right| left.name.cmp(&right.name));
194
195            if let Some(ttl_ms) = contract.default_ttl_ms {
196                metadata
197                    .collection_ttl_defaults_ms
198                    .insert(contract.name.clone(), ttl_ms);
199            } else {
200                metadata.collection_ttl_defaults_ms.remove(&contract.name);
201            }
202
203            contract.clone()
204        })?;
205        self.invalidate_collection_contract_cache();
206        Ok(saved)
207    }
208
209    pub fn remove_collection_contract(
210        &self,
211        collection: &str,
212    ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
213        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
214            defaults.remove(collection);
215        }
216
217        self.store()
218            .context_index()
219            .set_collection_enabled(collection, false);
220
221        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
222            && self.options.storage_profile.packaging
223                == crate::storage::StoragePackaging::SingleFile
224        {
225            let mut removed = None;
226            if let Ok(mut guard) = self.collection_contract_cache.write() {
227                let mut contracts = guard
228                    .as_ref()
229                    .map(|existing| existing.as_ref().clone())
230                    .unwrap_or_default();
231                removed = contracts
232                    .remove(collection)
233                    .map(|contract| (*contract).clone());
234                *guard = Some(std::sync::Arc::new(contracts));
235            }
236            return Ok(removed);
237        }
238
239        self.invalidate_collection_contract_cache();
240
241        let removed = self.update_physical_metadata(|metadata| {
242            let removed = metadata
243                .collection_contracts
244                .iter()
245                .position(|contract| contract.name == collection)
246                .map(|index| metadata.collection_contracts.remove(index));
247            metadata.collection_ttl_defaults_ms.remove(collection);
248            metadata
249                .indexes
250                .retain(|index| index.collection.as_deref() != Some(collection));
251            removed
252        })?;
253        self.invalidate_collection_contract_cache();
254        Ok(removed)
255    }
256
257    pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
258        self.collection_ttl_defaults_ms
259            .read()
260            .map(|defaults| {
261                defaults
262                    .iter()
263                    .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
264                    .collect()
265            })
266            .unwrap_or_default()
267    }
268
269    pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
270        let metadata = self.physical_metadata();
271        let defaults = metadata
272            .as_ref()
273            .map(|m| m.collection_ttl_defaults_ms.clone())
274            .unwrap_or_default();
275
276        if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
277            current.clear();
278            current.extend(defaults);
279        }
280
281        if let Some(metadata) = metadata {
282            let store = self.store();
283            let index = store.context_index();
284            for contract in &metadata.collection_contracts {
285                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
286            }
287        }
288    }
289
290    /// Snapshot the live hypertable registry (specs + every chunk)
291    /// into the sidecar-serialisable form. Called from
292    /// [`persist_metadata`] so the chunk routing spine is durable on
293    /// the same metadata path as collection contracts — issue #866.
294    /// Returns an empty vec (and touches nothing) when no hypertable
295    /// has ever been declared, so non-hypertable databases pay no
296    /// persist cost.
297    pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
298        let registry = self.hypertables();
299        if registry.is_empty() {
300            return Vec::new();
301        }
302        use std::collections::BTreeMap;
303        let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
304            BTreeMap::new();
305        for chunk in registry.snapshot_chunks() {
306            chunks_by_table
307                .entry(chunk.id.hypertable.clone())
308                .or_default()
309                .push(crate::physical::PhysicalHypertableChunk {
310                    start_ns: chunk.id.start_ns,
311                    end_ns_exclusive: chunk.end_ns_exclusive,
312                    row_count: chunk.row_count,
313                    min_ts_ns: chunk.min_ts_ns,
314                    max_ts_ns: chunk.max_ts_ns,
315                    sealed: chunk.sealed,
316                    ttl_override_ns: chunk.ttl_override_ns,
317                    columnar_page: chunk.columnar_page,
318                });
319        }
320        registry
321            .list()
322            .into_iter()
323            .map(|spec| crate::physical::PhysicalHypertable {
324                chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
325                name: spec.name,
326                time_column: spec.time_column,
327                chunk_interval_ns: spec.chunk_interval_ns,
328                default_ttl_ns: spec.default_ttl_ns,
329            })
330            .collect()
331    }
332
333    /// Rehydrate the hypertable registry from the physical metadata
334    /// sidecar at boot. Mirror of
335    /// [`load_collection_ttl_defaults_from_metadata`] — specs and
336    /// chunks are restored verbatim so bounds / routing / TTL are
337    /// identical to the pre-restart state (issue #866).
338    pub(crate) fn load_hypertables_from_metadata(&self) {
339        let Some(metadata) = self.physical_metadata() else {
340            return;
341        };
342        if metadata.hypertables.is_empty() {
343            return;
344        }
345        let registry = self.hypertables();
346        for hypertable in &metadata.hypertables {
347            let mut spec = crate::storage::timeseries::HypertableSpec::new(
348                hypertable.name.clone(),
349                hypertable.time_column.clone(),
350                hypertable.chunk_interval_ns,
351            );
352            if let Some(ttl) = hypertable.default_ttl_ns {
353                spec = spec.with_ttl_ns(ttl);
354            }
355            registry.register(spec);
356            for chunk in &hypertable.chunks {
357                registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
358                    id: crate::storage::timeseries::ChunkId {
359                        hypertable: hypertable.name.clone(),
360                        start_ns: chunk.start_ns,
361                    },
362                    end_ns_exclusive: chunk.end_ns_exclusive,
363                    row_count: chunk.row_count,
364                    min_ts_ns: chunk.min_ts_ns,
365                    max_ts_ns: chunk.max_ts_ns,
366                    sealed: chunk.sealed,
367                    ttl_override_ns: chunk.ttl_override_ns,
368                    columnar_page: chunk.columnar_page,
369                });
370            }
371        }
372    }
373
374    pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
375        self.store.run_maintenance()?;
376        self.persist_metadata()?;
377        Ok(())
378    }
379
380    /// Path to the physical metadata sidecar, if persistent.
381    pub fn metadata_path(&self) -> Option<PathBuf> {
382        self.path
383            .as_ref()
384            .map(|path| PhysicalMetadataFile::metadata_path_for(path))
385    }
386
387    /// Load the current physical metadata view, bootstrapping from native state when needed.
388    pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
389        self.load_or_bootstrap_physical_metadata(!self.options.read_only)
390            .ok()
391    }
392
393    /// Physical index registry derived for the current database state.
394    pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
395        let indexes = self
396            .physical_metadata()
397            .map(|metadata| metadata.indexes)
398            .filter(|indexes| !indexes.is_empty())
399            .or_else(|| {
400                self.native_physical_state()
401                    .map(|state| self.physical_index_state_from_native_state(&state, None))
402            })
403            .unwrap_or_else(|| self.physical_index_state());
404        self.reconcile_index_states_with_native_artifacts(indexes)
405    }
406
407    /// List registered named exports from the current physical metadata view.
408    pub fn exports(&self) -> Vec<ExportDescriptor> {
409        self.physical_metadata()
410            .map(|metadata| metadata.exports)
411            .or_else(|| {
412                self.native_physical_state()
413                    .map(|state| self.exports_from_native_state(&state))
414            })
415            .unwrap_or_default()
416    }
417
418    /// List recorded snapshots from the current physical metadata view.
419    pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
420        self.physical_metadata()
421            .map(|metadata| metadata.snapshots)
422            .or_else(|| {
423                self.native_physical_state()
424                    .map(|state| self.snapshots_from_native_state(&state))
425            })
426            .unwrap_or_default()
427    }
428
429    /// List persisted named graph projections from the current physical metadata view.
430    pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
431        self.physical_metadata()
432            .map(|metadata| metadata.graph_projections)
433            .or_else(|| {
434                self.native_physical_state()
435                    .map(|state| self.graph_projections_from_native_state(&state))
436            })
437            .unwrap_or_default()
438    }
439
440    /// List graph projections declared in the catalog view.
441    pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
442        self.catalog_model_snapshot().declared_graph_projections
443    }
444
445    /// List graph projections currently observed in the operational view.
446    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
447        self.graph_projections()
448            .into_iter()
449            .filter(|projection| {
450                projection.last_materialized_sequence.is_some()
451                    || matches!(projection.state.as_str(), "materialized" | "stale")
452            })
453            .collect()
454    }
455
456    /// List persisted analytics job metadata from the current physical metadata view.
457    pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
458        self.physical_metadata()
459            .map(|metadata| metadata.analytics_jobs)
460            .or_else(|| {
461                self.native_physical_state()
462                    .map(|state| self.analytics_jobs_from_native_state(&state))
463            })
464            .unwrap_or_default()
465    }
466
467    /// List analytics jobs declared in the catalog view.
468    pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
469        self.catalog_model_snapshot().declared_analytics_jobs
470    }
471
472    /// List analytics jobs currently observed in the operational view.
473    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
474        self.analytics_jobs()
475            .into_iter()
476            .filter(|job| {
477                job.last_run_sequence.is_some()
478                    || matches!(
479                        job.state.as_str(),
480                        "running" | "completed" | "failed" | "queued" | "stale"
481                    )
482            })
483            .collect()
484    }
485
486    /// List indexes declared in the catalog view.
487    pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
488        self.catalog_model_snapshot().declared_indexes
489    }
490
491    /// List indexes currently observed in the operational view.
492    pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
493        self.catalog_model_snapshot().operational_indexes
494    }
495
496    /// List reconciled index status entries from the catalog snapshot.
497    pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
498        self.catalog_model_snapshot().index_statuses
499    }
500
501    /// Resolve one index status entry from the catalog snapshot.
502    pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
503        self.catalog_model_snapshot()
504            .index_statuses
505            .into_iter()
506            .find(|status| status.name == name)
507    }
508
509    /// Upsert a named graph projection in the persisted physical metadata.
510    pub fn save_graph_projection(
511        &self,
512        name: impl Into<String>,
513        node_labels: Vec<String>,
514        node_types: Vec<String>,
515        edge_labels: Vec<String>,
516        source: impl Into<String>,
517    ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
518        let name = name.into();
519        let source = source.into();
520        self.update_physical_metadata(|metadata| {
521            let now = SystemTime::now()
522                .duration_since(UNIX_EPOCH)
523                .unwrap_or_default()
524                .as_millis();
525            let projection = if let Some(existing) = metadata
526                .graph_projections
527                .iter_mut()
528                .find(|projection| projection.name == name)
529            {
530                existing.updated_at_unix_ms = now;
531                existing.state = "declared".to_string();
532                existing.source = source.clone();
533                existing.node_labels = node_labels.clone();
534                existing.node_types = node_types.clone();
535                existing.edge_labels = edge_labels.clone();
536                existing.last_materialized_sequence = None;
537                existing.clone()
538            } else {
539                let projection = PhysicalGraphProjection {
540                    name: name.clone(),
541                    created_at_unix_ms: now,
542                    updated_at_unix_ms: now,
543                    state: "declared".to_string(),
544                    source: source.clone(),
545                    node_labels: node_labels.clone(),
546                    node_types: node_types.clone(),
547                    edge_labels: edge_labels.clone(),
548                    last_materialized_sequence: None,
549                };
550                metadata.graph_projections.push(projection.clone());
551                projection
552            };
553
554            Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
555
556            metadata
557                .graph_projections
558                .sort_by(|left, right| left.name.cmp(&right.name));
559            projection
560        })
561    }
562
563    /// Mark a declared graph projection as materialized in the current physical metadata.
564    pub fn materialize_graph_projection(
565        &self,
566        name: &str,
567    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
568        self.update_physical_metadata(|metadata| {
569            let now = SystemTime::now()
570                .duration_since(UNIX_EPOCH)
571                .unwrap_or_default()
572                .as_millis();
573            let idx = metadata
574                .graph_projections
575                .iter()
576                .position(|projection| projection.name == name);
577            if let Some(idx) = idx {
578                metadata.graph_projections[idx].updated_at_unix_ms = now;
579                metadata.graph_projections[idx].state = "materialized".to_string();
580                metadata.graph_projections[idx].last_materialized_sequence =
581                    Some(metadata.superblock.sequence);
582                let result = metadata.graph_projections[idx].clone();
583                Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
584                return Some(result);
585            }
586            None
587        })
588    }
589
590    /// Mark a declared graph projection as materializing.
591    pub fn mark_graph_projection_materializing(
592        &self,
593        name: &str,
594    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
595        self.update_physical_metadata(|metadata| {
596            let now = SystemTime::now()
597                .duration_since(UNIX_EPOCH)
598                .unwrap_or_default()
599                .as_millis();
600            let idx = metadata
601                .graph_projections
602                .iter()
603                .position(|projection| projection.name == name);
604            if let Some(idx) = idx {
605                metadata.graph_projections[idx].updated_at_unix_ms = now;
606                metadata.graph_projections[idx].state = "materializing".to_string();
607                metadata.graph_projections[idx].last_materialized_sequence = None;
608                let result = metadata.graph_projections[idx].clone();
609                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
610                return Some(result);
611            }
612            None
613        })
614    }
615
616    /// Mark a graph projection as failed.
617    pub fn fail_graph_projection(
618        &self,
619        name: &str,
620    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
621        self.update_physical_metadata(|metadata| {
622            let now = SystemTime::now()
623                .duration_since(UNIX_EPOCH)
624                .unwrap_or_default()
625                .as_millis();
626            let idx = metadata
627                .graph_projections
628                .iter()
629                .position(|projection| projection.name == name);
630            if let Some(idx) = idx {
631                metadata.graph_projections[idx].updated_at_unix_ms = now;
632                metadata.graph_projections[idx].state = "failed".to_string();
633                metadata.graph_projections[idx].last_materialized_sequence = None;
634                let result = metadata.graph_projections[idx].clone();
635                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
636                return Some(result);
637            }
638            None
639        })
640    }
641
642    /// Mark a graph projection as stale while preserving any last materialized sequence.
643    pub fn mark_graph_projection_stale(
644        &self,
645        name: &str,
646    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
647        self.update_physical_metadata(|metadata| {
648            let now = SystemTime::now()
649                .duration_since(UNIX_EPOCH)
650                .unwrap_or_default()
651                .as_millis();
652            let idx = metadata
653                .graph_projections
654                .iter()
655                .position(|projection| projection.name == name);
656            if let Some(idx) = idx {
657                metadata.graph_projections[idx].updated_at_unix_ms = now;
658                metadata.graph_projections[idx].state = "stale".to_string();
659                let result = metadata.graph_projections[idx].clone();
660                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
661                return Some(result);
662            }
663            None
664        })
665    }
666
667    fn mark_projection_dependent_jobs_stale(
668        metadata: &mut PhysicalMetadataFile,
669        projection_name: &str,
670        now: u128,
671    ) {
672        for job in metadata.analytics_jobs.iter_mut() {
673            if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
674                job.state = "stale".to_string();
675                job.updated_at_unix_ms = now;
676            }
677        }
678    }
679
680    fn rearm_projection_dependent_jobs_declared(
681        metadata: &mut PhysicalMetadataFile,
682        projection_name: &str,
683        now: u128,
684    ) {
685        for job in metadata.analytics_jobs.iter_mut() {
686            if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
687                job.state = "declared".to_string();
688                job.last_run_sequence = None;
689                job.updated_at_unix_ms = now;
690            }
691        }
692    }
693
694    /// Declare or update analytics job metadata without marking it as executed.
695    pub fn save_analytics_job(
696        &self,
697        kind: impl Into<String>,
698        projection: Option<String>,
699        metadata_entries: BTreeMap<String, String>,
700    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
701        let kind = kind.into();
702        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
703        if let Some(projection_name) = projection.as_deref() {
704            if !self.graph_projection_is_declared(projection_name) {
705                return Err(format!(
706                    "graph projection '{projection_name}' is not declared in physical metadata"
707                )
708                .into());
709            }
710        }
711
712        self.update_physical_metadata(|metadata| {
713            let now = SystemTime::now()
714                .duration_since(UNIX_EPOCH)
715                .unwrap_or_default()
716                .as_millis();
717
718            let job = if let Some(existing) = metadata
719                .analytics_jobs
720                .iter_mut()
721                .find(|job| job.id == job_id)
722            {
723                existing.kind = kind.clone();
724                existing.projection = projection.clone();
725                existing.updated_at_unix_ms = now;
726                existing.metadata = metadata_entries.clone();
727                if existing.last_run_sequence.is_none() {
728                    existing.state = "declared".to_string();
729                }
730                existing.clone()
731            } else {
732                let job = PhysicalAnalyticsJob {
733                    id: job_id.clone(),
734                    kind: kind.clone(),
735                    state: "declared".to_string(),
736                    projection: projection.clone(),
737                    created_at_unix_ms: now,
738                    updated_at_unix_ms: now,
739                    last_run_sequence: None,
740                    metadata: metadata_entries.clone(),
741                };
742                metadata.analytics_jobs.push(job.clone());
743                job
744            };
745
746            metadata
747                .analytics_jobs
748                .sort_by(|left, right| left.id.cmp(&right.id));
749            job
750        })
751    }
752
753    /// Record or update analytics job metadata in the persisted physical metadata.
754    pub fn record_analytics_job(
755        &self,
756        kind: impl Into<String>,
757        projection: Option<String>,
758        metadata_entries: BTreeMap<String, String>,
759    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
760        let kind = kind.into();
761        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
762        if let Some(projection_name) = projection.as_deref() {
763            if !self.graph_projection_is_declared(projection_name) {
764                return Err(format!(
765                    "graph projection '{projection_name}' is not declared in physical metadata"
766                )
767                .into());
768            }
769            if !self.graph_projection_is_operational(projection_name) {
770                return Err(format!(
771                    "graph projection '{projection_name}' is declared but not operationally materialized"
772                )
773                .into());
774            }
775        }
776
777        self.update_physical_metadata(|metadata| {
778            let now = SystemTime::now()
779                .duration_since(UNIX_EPOCH)
780                .unwrap_or_default()
781                .as_millis();
782
783            let existing = metadata
784                .analytics_jobs
785                .iter_mut()
786                .find(|job| job.id == job_id)?;
787            existing.state = "completed".to_string();
788            existing.kind = kind.clone();
789            existing.projection = projection.clone();
790            existing.updated_at_unix_ms = now;
791            existing.last_run_sequence = Some(metadata.superblock.sequence);
792            existing.metadata = metadata_entries.clone();
793            let job = existing.clone();
794
795            metadata
796                .analytics_jobs
797                .sort_by(|left, right| left.id.cmp(&right.id));
798            Some(job)
799        })
800        .and_then(|job| {
801            job.ok_or_else(|| {
802                format!("analytics job '{job_id}' is not declared in physical metadata").into()
803            })
804        })
805    }
806
807    /// Mark a declared analytics job as running.
808    pub fn queue_analytics_job(
809        &self,
810        kind: impl Into<String>,
811        projection: Option<String>,
812        metadata_entries: BTreeMap<String, String>,
813    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
814        let kind = kind.into();
815        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
816        if let Some(projection_name) = projection.as_deref() {
817            if !self.graph_projection_is_declared(projection_name) {
818                return Err(format!(
819                    "graph projection '{projection_name}' is not declared in physical metadata"
820                )
821                .into());
822            }
823            if !self.graph_projection_is_operational(projection_name) {
824                return Err(format!(
825                    "graph projection '{projection_name}' is declared but not operationally materialized"
826                )
827                .into());
828            }
829        }
830
831        self.update_physical_metadata(|metadata| {
832            let now = SystemTime::now()
833                .duration_since(UNIX_EPOCH)
834                .unwrap_or_default()
835                .as_millis();
836
837            let existing = metadata
838                .analytics_jobs
839                .iter_mut()
840                .find(|job| job.id == job_id)?;
841            existing.state = "queued".to_string();
842            existing.kind = kind.clone();
843            existing.projection = projection.clone();
844            existing.updated_at_unix_ms = now;
845            existing.metadata = metadata_entries.clone();
846            let job = existing.clone();
847
848            metadata
849                .analytics_jobs
850                .sort_by(|left, right| left.id.cmp(&right.id));
851            Some(job)
852        })
853        .and_then(|job| {
854            job.ok_or_else(|| {
855                format!("analytics job '{job_id}' is not declared in physical metadata").into()
856            })
857        })
858    }
859
860    /// Mark a declared analytics job as running.
861    pub fn start_analytics_job(
862        &self,
863        kind: impl Into<String>,
864        projection: Option<String>,
865        metadata_entries: BTreeMap<String, String>,
866    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
867        let kind = kind.into();
868        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
869        if let Some(projection_name) = projection.as_deref() {
870            if !self.graph_projection_is_declared(projection_name) {
871                return Err(format!(
872                    "graph projection '{projection_name}' is not declared in physical metadata"
873                )
874                .into());
875            }
876            if !self.graph_projection_is_operational(projection_name) {
877                return Err(format!(
878                    "graph projection '{projection_name}' is declared but not operationally materialized"
879                )
880                .into());
881            }
882        }
883
884        self.update_physical_metadata(|metadata| {
885            let now = SystemTime::now()
886                .duration_since(UNIX_EPOCH)
887                .unwrap_or_default()
888                .as_millis();
889
890            let existing = metadata
891                .analytics_jobs
892                .iter_mut()
893                .find(|job| job.id == job_id)?;
894            existing.state = "running".to_string();
895            existing.kind = kind.clone();
896            existing.projection = projection.clone();
897            existing.updated_at_unix_ms = now;
898            existing.metadata = metadata_entries.clone();
899            let job = existing.clone();
900
901            metadata
902                .analytics_jobs
903                .sort_by(|left, right| left.id.cmp(&right.id));
904            Some(job)
905        })
906        .and_then(|job| {
907            job.ok_or_else(|| {
908                format!("analytics job '{job_id}' is not declared in physical metadata").into()
909            })
910        })
911    }
912
913    /// Mark a declared analytics job as failed.
914    pub fn fail_analytics_job(
915        &self,
916        kind: impl Into<String>,
917        projection: Option<String>,
918        metadata_entries: BTreeMap<String, String>,
919    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
920        let kind = kind.into();
921        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
922
923        self.update_physical_metadata(|metadata| {
924            let now = SystemTime::now()
925                .duration_since(UNIX_EPOCH)
926                .unwrap_or_default()
927                .as_millis();
928
929            let existing = metadata
930                .analytics_jobs
931                .iter_mut()
932                .find(|job| job.id == job_id)?;
933            existing.state = "failed".to_string();
934            existing.kind = kind.clone();
935            existing.projection = projection.clone();
936            existing.updated_at_unix_ms = now;
937            existing.metadata = metadata_entries.clone();
938            let job = existing.clone();
939
940            metadata
941                .analytics_jobs
942                .sort_by(|left, right| left.id.cmp(&right.id));
943            Some(job)
944        })
945        .and_then(|job| {
946            job.ok_or_else(|| {
947                format!("analytics job '{job_id}' is not declared in physical metadata").into()
948            })
949        })
950    }
951
952    /// Mark a declared analytics job as stale.
953    pub fn mark_analytics_job_stale(
954        &self,
955        kind: impl Into<String>,
956        projection: Option<String>,
957        metadata_entries: BTreeMap<String, String>,
958    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
959        let kind = kind.into();
960        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
961
962        self.update_physical_metadata(|metadata| {
963            let now = SystemTime::now()
964                .duration_since(UNIX_EPOCH)
965                .unwrap_or_default()
966                .as_millis();
967
968            let existing = metadata
969                .analytics_jobs
970                .iter_mut()
971                .find(|job| job.id == job_id)?;
972            existing.state = "stale".to_string();
973            existing.kind = kind.clone();
974            existing.projection = projection.clone();
975            existing.updated_at_unix_ms = now;
976            existing.metadata = metadata_entries.clone();
977            let job = existing.clone();
978
979            metadata
980                .analytics_jobs
981                .sort_by(|left, right| left.id.cmp(&right.id));
982            Some(job)
983        })
984        .and_then(|job| {
985            job.ok_or_else(|| {
986                format!("analytics job '{job_id}' is not declared in physical metadata").into()
987            })
988        })
989    }
990
991    /// Create a named export by copying the current database file and metadata sidecar.
992    pub fn create_named_export(
993        &self,
994        name: impl Into<String>,
995    ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
996        let name = name.into();
997        if self.options.mode != StorageMode::Persistent {
998            return Err("exports require persistent mode".into());
999        }
1000        let Some(path) = self.path() else {
1001            return Err("database path is not available".into());
1002        };
1003
1004        self.flush()?;
1005
1006        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1007        let export_data_path = reddb_file::copy_physical_export_data_file(path, &name)?;
1008        let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
1009        let export_metadata_binary_path =
1010            PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
1011
1012        let descriptor = ExportDescriptor {
1013            name: name.clone(),
1014            created_at_unix_ms: SystemTime::now()
1015                .duration_since(UNIX_EPOCH)
1016                .unwrap_or_default()
1017                .as_millis(),
1018            snapshot_id: metadata
1019                .snapshots
1020                .last()
1021                .map(|snapshot| snapshot.snapshot_id),
1022            superblock_sequence: metadata.superblock.sequence,
1023            data_path: export_data_path.display().to_string(),
1024            metadata_path: export_metadata_path.display().to_string(),
1025            collection_count: metadata.catalog.total_collections,
1026            total_entities: metadata.catalog.total_entities,
1027        };
1028
1029        metadata
1030            .exports
1031            .retain(|export| export.name != descriptor.name);
1032        metadata.exports.push(descriptor.clone());
1033        self.prune_export_registry(&mut metadata.exports);
1034        metadata.save_for_data_path(path)?;
1035        metadata.save_to_binary_path(&export_metadata_binary_path)?;
1036        metadata.save_to_path(&export_metadata_path)?;
1037
1038        Ok(descriptor)
1039    }
1040
1041    /// Enable or disable a physical index entry in the persisted registry.
1042    pub fn set_index_enabled(
1043        &self,
1044        name: &str,
1045        enabled: bool,
1046    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1047        let Some(status) = self.index_status(name) else {
1048            return Err(format!("index '{name}' is not present in catalog status").into());
1049        };
1050        if !status.declared {
1051            return Err(format!("index '{name}' is not declared in physical metadata").into());
1052        }
1053        self.update_physical_metadata(|metadata| {
1054            let now = SystemTime::now()
1055                .duration_since(UNIX_EPOCH)
1056                .unwrap_or_default()
1057                .as_millis();
1058            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1059                index.enabled = enabled;
1060                if !enabled {
1061                    index.build_state = "disabled".to_string();
1062                } else if index.build_state == "disabled" {
1063                    index.build_state = if index.artifact_root_page.is_some() {
1064                        "ready".to_string()
1065                    } else {
1066                        "declared-unbuilt".to_string()
1067                    };
1068                }
1069                index.last_refresh_ms = Some(now);
1070                return Some(index.clone());
1071            }
1072            None
1073        })
1074    }
1075
1076    /// Mark a declared physical index as building in the persisted registry.
1077    pub fn mark_index_building(
1078        &self,
1079        name: &str,
1080    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1081        let Some(status) = self.index_status(name) else {
1082            return Err(format!("index '{name}' is not present in catalog status").into());
1083        };
1084        if !status.declared {
1085            return Err(format!("index '{name}' is not declared in physical metadata").into());
1086        }
1087        if status.lifecycle_state == "disabled" {
1088            return Err(format!("index '{name}' is disabled").into());
1089        }
1090        self.update_physical_metadata(|metadata| {
1091            let now = SystemTime::now()
1092                .duration_since(UNIX_EPOCH)
1093                .unwrap_or_default()
1094                .as_millis();
1095            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1096                index.build_state = "building".to_string();
1097                index.last_refresh_ms = Some(now);
1098                return Some(index.clone());
1099            }
1100            None
1101        })
1102    }
1103
1104    /// Mark a declared physical index as failed in the persisted registry.
1105    pub fn fail_index(
1106        &self,
1107        name: &str,
1108    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1109        let Some(status) = self.index_status(name) else {
1110            return Err(format!("index '{name}' is not present in catalog status").into());
1111        };
1112        if !status.declared {
1113            return Err(format!("index '{name}' is not declared in physical metadata").into());
1114        }
1115        self.update_physical_metadata(|metadata| {
1116            let now = SystemTime::now()
1117                .duration_since(UNIX_EPOCH)
1118                .unwrap_or_default()
1119                .as_millis();
1120            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1121                index.build_state = "failed".to_string();
1122                index.last_refresh_ms = Some(now);
1123                return Some(index.clone());
1124            }
1125            None
1126        })
1127    }
1128
1129    /// Mark a declared physical index as stale in the persisted registry.
1130    pub fn mark_index_stale(
1131        &self,
1132        name: &str,
1133    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1134        let Some(status) = self.index_status(name) else {
1135            return Err(format!("index '{name}' is not present in catalog status").into());
1136        };
1137        if !status.declared {
1138            return Err(format!("index '{name}' is not declared in physical metadata").into());
1139        }
1140        self.update_physical_metadata(|metadata| {
1141            let now = SystemTime::now()
1142                .duration_since(UNIX_EPOCH)
1143                .unwrap_or_default()
1144                .as_millis();
1145            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1146                index.build_state = if index.enabled {
1147                    "stale".to_string()
1148                } else {
1149                    "disabled".to_string()
1150                };
1151                index.last_refresh_ms = Some(now);
1152                return Some(index.clone());
1153            }
1154            None
1155        })
1156    }
1157
    /// Mark a declared physical index as ready in the persisted registry.
    ///
    /// Thin alias: forwards `name` to `warmup_index` and returns its result unchanged, so
    /// all validation and error conditions are those of `warmup_index`.
    pub fn mark_index_ready(
        &self,
        name: &str,
    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
        self.warmup_index(name)
    }
1165
1166    /// Mark a physical index as warmed up/refreshed in the persisted registry.
1167    pub fn warmup_index(
1168        &self,
1169        name: &str,
1170    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1171        let Some(status) = self.index_status(name) else {
1172            return Err(format!("index '{name}' is not present in catalog status").into());
1173        };
1174        if !status.declared {
1175            return Err(format!("index '{name}' is not declared in physical metadata").into());
1176        }
1177        if status.lifecycle_state == "disabled" {
1178            return Err(format!("index '{name}' is disabled").into());
1179        }
1180        if !status.operational {
1181            return Err(
1182                format!("index '{name}' is declared but not operationally materialized").into(),
1183            );
1184        }
1185        let warmed_artifact = self
1186            .physical_indexes()
1187            .into_iter()
1188            .find(|index| index.name == name)
1189            .map(|mut index| {
1190                self.warmup_native_vector_artifact_for_index(&index)?;
1191                self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1192                Ok::<_, String>(index)
1193            })
1194            .transpose()
1195            .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1196
1197        self.update_physical_metadata(|metadata| {
1198            let now = SystemTime::now()
1199                .duration_since(UNIX_EPOCH)
1200                .unwrap_or_default()
1201                .as_millis();
1202            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1203                if let Some(warmed) = warmed_artifact.as_ref() {
1204                    index.entries = warmed.entries;
1205                    index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1206                    index.backend = warmed.backend.clone();
1207                    index.build_state = "ready".to_string();
1208                }
1209                index.last_refresh_ms = Some(now);
1210                return Some(index.clone());
1211            }
1212            None
1213        })
1214    }
1215
1216    /// Rebuild physical index metadata from the current catalog, optionally restricted to one collection.
1217    pub fn rebuild_index_registry(
1218        &self,
1219        collection: Option<&str>,
1220    ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1221        let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1222        self.update_physical_metadata(|metadata| {
1223            let now = SystemTime::now()
1224                .duration_since(UNIX_EPOCH)
1225                .unwrap_or_default()
1226                .as_millis();
1227
1228            let mut affected = Vec::new();
1229            let declared = metadata.indexes.clone();
1230            for declared_index in declared {
1231                let matches_collection = collection.is_none_or(|collection_name| {
1232                    declared_index.collection.as_deref() == Some(collection_name)
1233                });
1234                if !matches_collection {
1235                    continue;
1236                }
1237
1238                let mut rebuilt = fresh
1239                    .iter()
1240                    .find(|index| index.name == declared_index.name)
1241                    .cloned()
1242                    .unwrap_or_else(|| {
1243                        let mut index = declared_index.clone();
1244                        index.build_state = "declared-unbuilt".to_string();
1245                        index
1246                    });
1247                rebuilt.enabled = declared_index.enabled;
1248                rebuilt.artifact_kind = rebuilt
1249                    .artifact_kind
1250                    .or_else(|| declared_index.artifact_kind.clone());
1251                rebuilt.artifact_root_page = rebuilt
1252                    .artifact_root_page
1253                    .or(declared_index.artifact_root_page);
1254                rebuilt.artifact_checksum = rebuilt
1255                    .artifact_checksum
1256                    .or(declared_index.artifact_checksum);
1257                rebuilt.build_state =
1258                    Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1259                rebuilt.last_refresh_ms = Some(now);
1260
1261                if let Some(existing) = metadata
1262                    .indexes
1263                    .iter_mut()
1264                    .find(|index| index.name == rebuilt.name)
1265                {
1266                    *existing = rebuilt.clone();
1267                } else {
1268                    metadata.indexes.push(rebuilt.clone());
1269                }
1270
1271                affected.push(rebuilt);
1272            }
1273
1274            affected
1275        })
1276    }
1277
1278    fn finalize_rebuilt_index_build_state(
1279        declared: &PhysicalIndexState,
1280        rebuilt: &PhysicalIndexState,
1281    ) -> String {
1282        if !rebuilt.enabled {
1283            return "disabled".to_string();
1284        }
1285
1286        if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1287            return "failed".to_string();
1288        }
1289
1290        let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1291        if native_artifact_family {
1292            if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1293                return "ready".to_string();
1294            }
1295            if declared.artifact_root_page.is_some()
1296                || declared.artifact_checksum.is_some()
1297                || declared.artifact_kind.is_some()
1298            {
1299                return "stale".to_string();
1300            }
1301            return "declared-unbuilt".to_string();
1302        }
1303
1304        if rebuilt.entries > 0 {
1305            return "ready".to_string();
1306        }
1307
1308        if matches!(
1309            declared.build_state.as_str(),
1310            "stale" | "artifact-published" | "registry-loaded"
1311        ) {
1312            return "stale".to_string();
1313        }
1314
1315        "declared-unbuilt".to_string()
1316    }
1317}