Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_registry.rs

1use super::*;
2
3impl RedDB {
4    pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5        self.physical_metadata()
6            .map(|metadata| metadata.tree_definitions)
7            .unwrap_or_default()
8    }
9
10    pub fn tree_definition(
11        &self,
12        collection: &str,
13        name: &str,
14    ) -> Option<crate::physical::PhysicalTreeDefinition> {
15        self.tree_definitions()
16            .into_iter()
17            .find(|definition| definition.collection == collection && definition.name == name)
18    }
19
20    pub fn save_tree_definition(
21        &self,
22        definition: crate::physical::PhysicalTreeDefinition,
23    ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24        self.update_physical_metadata(|metadata| {
25            if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26                existing.collection == definition.collection && existing.name == definition.name
27            }) {
28                *existing = definition.clone();
29            } else {
30                metadata.tree_definitions.push(definition.clone());
31            }
32            metadata.tree_definitions.sort_by(|left, right| {
33                left.collection
34                    .cmp(&right.collection)
35                    .then_with(|| left.name.cmp(&right.name))
36            });
37            definition.clone()
38        })
39    }
40
41    pub fn remove_tree_definition(
42        &self,
43        collection: &str,
44        name: &str,
45    ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46        self.update_physical_metadata(|metadata| {
47            metadata
48                .tree_definitions
49                .iter()
50                .position(|definition| {
51                    definition.collection == collection && definition.name == name
52                })
53                .map(|index| metadata.tree_definitions.remove(index))
54        })
55    }
56
57    pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58        self.collection_ttl_defaults_ms
59            .read()
60            .ok()
61            .and_then(|defaults| defaults.get(collection).copied())
62    }
63
64    pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66            defaults.insert(collection.into(), ttl_ms);
67        }
68    }
69
70    pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72            defaults.remove(collection);
73        }
74    }
75
76    pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77        self.contract_cache_map()
78            .values()
79            .map(|arc| (**arc).clone())
80            .collect()
81    }
82
83    pub fn collection_contract(
84        &self,
85        collection: &str,
86    ) -> Option<crate::physical::CollectionContract> {
87        self.contract_cache_map()
88            .get(collection)
89            .map(|arc| (**arc).clone())
90    }
91
92    /// Arc-bumping variant — callers that only need read access skip
93    /// the per-call full `CollectionContract` clone. Deep clones cost
94    /// ~200-400 ns each (vec of declared columns + string allocs);
95    /// the UPDATE fast path hits this 2-3× per mutation.
96    pub fn collection_contract_arc(
97        &self,
98        collection: &str,
99    ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100        self.contract_cache_map()
101            .get(collection)
102            .map(std::sync::Arc::clone)
103    }
104
105    /// Return (or lazily build) the in-memory contract map.
106    /// Reparses PhysicalMetadataFile JSON only once per invalidation.
107    /// Called 3× per insert (ensure_model, enforce_uniqueness, normalize_fields)
108    /// so the cache is load-bearing for insert throughput.
109    fn contract_cache_map(
110        &self,
111    ) -> std::sync::Arc<
112        std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113    > {
114        if let Ok(guard) = self.collection_contract_cache.read() {
115            if let Some(map) = guard.as_ref() {
116                return std::sync::Arc::clone(map);
117            }
118        }
119        let contracts: Vec<crate::physical::CollectionContract> = self
120            .physical_metadata()
121            .map(|metadata| metadata.collection_contracts)
122            .unwrap_or_default();
123        let map: std::collections::HashMap<_, _> = contracts
124            .into_iter()
125            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126            .collect();
127        let arc = std::sync::Arc::new(map);
128        if let Ok(mut guard) = self.collection_contract_cache.write() {
129            *guard = Some(std::sync::Arc::clone(&arc));
130        }
131        arc
132    }
133
134    pub(crate) fn invalidate_collection_contract_cache(&self) {
135        if let Ok(mut guard) = self.collection_contract_cache.write() {
136            *guard = None;
137        }
138    }
139
140    pub fn save_collection_contract(
141        &self,
142        contract: crate::physical::CollectionContract,
143    ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145            if let Some(ttl_ms) = contract.default_ttl_ms {
146                defaults.insert(contract.name.clone(), ttl_ms);
147            } else {
148                defaults.remove(&contract.name);
149            }
150        }
151
152        self.store()
153            .context_index()
154            .set_collection_enabled(&contract.name, contract.context_index_enabled);
155        if let Some(manager) = self.store().get_collection(&contract.name) {
156            let columns = contract
157                .declared_columns
158                .iter()
159                .map(|column| column.name.clone())
160                .collect();
161            manager.set_column_schema_if_empty(columns);
162        }
163
164        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
165            && self.options.storage_profile.packaging
166                == crate::storage::StoragePackaging::SingleFile
167        {
168            if let Ok(mut guard) = self.collection_contract_cache.write() {
169                let mut contracts = guard
170                    .as_ref()
171                    .map(|existing| existing.as_ref().clone())
172                    .unwrap_or_default();
173                contracts.insert(contract.name.clone(), std::sync::Arc::new(contract.clone()));
174                *guard = Some(std::sync::Arc::new(contracts));
175            }
176            return Ok(contract);
177        }
178
179        self.invalidate_collection_contract_cache();
180
181        let saved = self.update_physical_metadata(|metadata| {
182            if let Some(existing) = metadata
183                .collection_contracts
184                .iter_mut()
185                .find(|existing| existing.name == contract.name)
186            {
187                *existing = contract.clone();
188            } else {
189                metadata.collection_contracts.push(contract.clone());
190            }
191            metadata
192                .collection_contracts
193                .sort_by(|left, right| left.name.cmp(&right.name));
194
195            if let Some(ttl_ms) = contract.default_ttl_ms {
196                metadata
197                    .collection_ttl_defaults_ms
198                    .insert(contract.name.clone(), ttl_ms);
199            } else {
200                metadata.collection_ttl_defaults_ms.remove(&contract.name);
201            }
202
203            contract.clone()
204        })?;
205        self.invalidate_collection_contract_cache();
206        Ok(saved)
207    }
208
209    pub fn remove_collection_contract(
210        &self,
211        collection: &str,
212    ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
213        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
214            defaults.remove(collection);
215        }
216
217        self.store()
218            .context_index()
219            .set_collection_enabled(collection, false);
220
221        if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
222            && self.options.storage_profile.packaging
223                == crate::storage::StoragePackaging::SingleFile
224        {
225            let mut removed = None;
226            if let Ok(mut guard) = self.collection_contract_cache.write() {
227                let mut contracts = guard
228                    .as_ref()
229                    .map(|existing| existing.as_ref().clone())
230                    .unwrap_or_default();
231                removed = contracts
232                    .remove(collection)
233                    .map(|contract| (*contract).clone());
234                *guard = Some(std::sync::Arc::new(contracts));
235            }
236            return Ok(removed);
237        }
238
239        self.invalidate_collection_contract_cache();
240
241        let removed = self.update_physical_metadata(|metadata| {
242            let removed = metadata
243                .collection_contracts
244                .iter()
245                .position(|contract| contract.name == collection)
246                .map(|index| metadata.collection_contracts.remove(index));
247            metadata.collection_ttl_defaults_ms.remove(collection);
248            metadata
249                .indexes
250                .retain(|index| index.collection.as_deref() != Some(collection));
251            removed
252        })?;
253        self.invalidate_collection_contract_cache();
254        Ok(removed)
255    }
256
257    pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
258        self.collection_ttl_defaults_ms
259            .read()
260            .map(|defaults| {
261                defaults
262                    .iter()
263                    .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
264                    .collect()
265            })
266            .unwrap_or_default()
267    }
268
269    pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
270        let metadata = self.physical_metadata();
271        let defaults = metadata
272            .as_ref()
273            .map(|m| m.collection_ttl_defaults_ms.clone())
274            .unwrap_or_default();
275
276        if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
277            current.clear();
278            current.extend(defaults);
279        }
280
281        if let Some(metadata) = metadata {
282            let store = self.store();
283            let index = store.context_index();
284            for contract in &metadata.collection_contracts {
285                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
286            }
287        }
288    }
289
290    /// Snapshot the live hypertable registry (specs + every chunk)
291    /// into the sidecar-serialisable form. Called from
292    /// [`persist_metadata`] so the chunk routing spine is durable on
293    /// the same metadata path as collection contracts — issue #866.
294    /// Returns an empty vec (and touches nothing) when no hypertable
295    /// has ever been declared, so non-hypertable databases pay no
296    /// persist cost.
297    pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
298        let registry = self.hypertables();
299        if registry.is_empty() {
300            return Vec::new();
301        }
302        use std::collections::BTreeMap;
303        let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
304            BTreeMap::new();
305        for chunk in registry.snapshot_chunks() {
306            chunks_by_table
307                .entry(chunk.id.hypertable.clone())
308                .or_default()
309                .push(crate::physical::PhysicalHypertableChunk {
310                    start_ns: chunk.id.start_ns,
311                    end_ns_exclusive: chunk.end_ns_exclusive,
312                    row_count: chunk.row_count,
313                    min_ts_ns: chunk.min_ts_ns,
314                    max_ts_ns: chunk.max_ts_ns,
315                    sealed: chunk.sealed,
316                    ttl_override_ns: chunk.ttl_override_ns,
317                    columnar_page: chunk.columnar_page,
318                });
319        }
320        registry
321            .list()
322            .into_iter()
323            .map(|spec| crate::physical::PhysicalHypertable {
324                chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
325                name: spec.name,
326                time_column: spec.time_column,
327                chunk_interval_ns: spec.chunk_interval_ns,
328                default_ttl_ns: spec.default_ttl_ns,
329            })
330            .collect()
331    }
332
333    /// Rehydrate the hypertable registry from the physical metadata
334    /// sidecar at boot. Mirror of
335    /// [`load_collection_ttl_defaults_from_metadata`] — specs and
336    /// chunks are restored verbatim so bounds / routing / TTL are
337    /// identical to the pre-restart state (issue #866).
338    pub(crate) fn load_hypertables_from_metadata(&self) {
339        let Some(metadata) = self.physical_metadata() else {
340            return;
341        };
342        if metadata.hypertables.is_empty() {
343            return;
344        }
345        let registry = self.hypertables();
346        for hypertable in &metadata.hypertables {
347            let mut spec = crate::storage::timeseries::HypertableSpec::new(
348                hypertable.name.clone(),
349                hypertable.time_column.clone(),
350                hypertable.chunk_interval_ns,
351            );
352            if let Some(ttl) = hypertable.default_ttl_ns {
353                spec = spec.with_ttl_ns(ttl);
354            }
355            registry.register(spec);
356            for chunk in &hypertable.chunks {
357                registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
358                    id: crate::storage::timeseries::ChunkId {
359                        hypertable: hypertable.name.clone(),
360                        start_ns: chunk.start_ns,
361                    },
362                    end_ns_exclusive: chunk.end_ns_exclusive,
363                    row_count: chunk.row_count,
364                    min_ts_ns: chunk.min_ts_ns,
365                    max_ts_ns: chunk.max_ts_ns,
366                    sealed: chunk.sealed,
367                    ttl_override_ns: chunk.ttl_override_ns,
368                    columnar_page: chunk.columnar_page,
369                });
370            }
371        }
372    }
373
374    pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
375        self.store.run_maintenance()?;
376        self.persist_metadata()?;
377        Ok(())
378    }
379
380    /// Path to the physical metadata sidecar, if persistent.
381    pub fn metadata_path(&self) -> Option<PathBuf> {
382        self.path
383            .as_ref()
384            .map(|path| PhysicalMetadataFile::metadata_path_for(path))
385    }
386
387    /// Load the current physical metadata view, bootstrapping from native state when needed.
388    pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
389        self.load_or_bootstrap_physical_metadata(!self.options.read_only)
390            .ok()
391    }
392
393    /// Physical index registry derived for the current database state.
394    pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
395        let indexes = self
396            .physical_metadata()
397            .map(|metadata| metadata.indexes)
398            .filter(|indexes| !indexes.is_empty())
399            .or_else(|| {
400                self.native_physical_state()
401                    .map(|state| self.physical_index_state_from_native_state(&state, None))
402            })
403            .unwrap_or_else(|| self.physical_index_state());
404        self.reconcile_index_states_with_native_artifacts(indexes)
405    }
406
407    /// List registered named exports from the current physical metadata view.
408    pub fn exports(&self) -> Vec<ExportDescriptor> {
409        self.physical_metadata()
410            .map(|metadata| metadata.exports)
411            .or_else(|| {
412                self.native_physical_state()
413                    .map(|state| self.exports_from_native_state(&state))
414            })
415            .unwrap_or_default()
416    }
417
418    /// List recorded snapshots from the current physical metadata view.
419    pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
420        if let Some(metadata) = self.physical_metadata() {
421            if !metadata.snapshots.is_empty() {
422                return metadata.snapshots;
423            }
424        }
425        self.native_physical_state()
426            .map(|state| self.snapshots_from_native_state(&state))
427            .unwrap_or_default()
428    }
429
430    /// List persisted named graph projections from the current physical metadata view.
431    pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
432        self.physical_metadata()
433            .map(|metadata| metadata.graph_projections)
434            .or_else(|| {
435                self.native_physical_state()
436                    .map(|state| self.graph_projections_from_native_state(&state))
437            })
438            .unwrap_or_default()
439    }
440
441    /// List graph projections declared in the catalog view.
442    pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
443        self.catalog_model_snapshot().declared_graph_projections
444    }
445
446    /// List graph projections currently observed in the operational view.
447    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
448        self.graph_projections()
449            .into_iter()
450            .filter(|projection| {
451                projection.last_materialized_sequence.is_some()
452                    || matches!(projection.state.as_str(), "materialized" | "stale")
453            })
454            .collect()
455    }
456
457    /// List persisted analytics job metadata from the current physical metadata view.
458    pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
459        self.physical_metadata()
460            .map(|metadata| metadata.analytics_jobs)
461            .or_else(|| {
462                self.native_physical_state()
463                    .map(|state| self.analytics_jobs_from_native_state(&state))
464            })
465            .unwrap_or_default()
466    }
467
468    /// List analytics jobs declared in the catalog view.
469    pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
470        self.catalog_model_snapshot().declared_analytics_jobs
471    }
472
473    /// List analytics jobs currently observed in the operational view.
474    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
475        self.analytics_jobs()
476            .into_iter()
477            .filter(|job| {
478                job.last_run_sequence.is_some()
479                    || matches!(
480                        job.state.as_str(),
481                        "running" | "completed" | "failed" | "queued" | "stale"
482                    )
483            })
484            .collect()
485    }
486
487    /// List indexes declared in the catalog view.
488    pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
489        self.catalog_model_snapshot().declared_indexes
490    }
491
492    /// List indexes currently observed in the operational view.
493    pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
494        self.catalog_model_snapshot().operational_indexes
495    }
496
497    /// List reconciled index status entries from the catalog snapshot.
498    pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
499        self.catalog_model_snapshot().index_statuses
500    }
501
502    /// Resolve one index status entry from the catalog snapshot.
503    pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
504        self.catalog_model_snapshot()
505            .index_statuses
506            .into_iter()
507            .find(|status| status.name == name)
508    }
509
510    /// Upsert a named graph projection in the persisted physical metadata.
511    pub fn save_graph_projection(
512        &self,
513        name: impl Into<String>,
514        node_labels: Vec<String>,
515        node_types: Vec<String>,
516        edge_labels: Vec<String>,
517        source: impl Into<String>,
518    ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
519        let name = name.into();
520        let source = source.into();
521        self.update_physical_metadata(|metadata| {
522            let now = SystemTime::now()
523                .duration_since(UNIX_EPOCH)
524                .unwrap_or_default()
525                .as_millis();
526            let projection = if let Some(existing) = metadata
527                .graph_projections
528                .iter_mut()
529                .find(|projection| projection.name == name)
530            {
531                existing.updated_at_unix_ms = now;
532                existing.state = "declared".to_string();
533                existing.source = source.clone();
534                existing.node_labels = node_labels.clone();
535                existing.node_types = node_types.clone();
536                existing.edge_labels = edge_labels.clone();
537                existing.last_materialized_sequence = None;
538                existing.clone()
539            } else {
540                let projection = PhysicalGraphProjection {
541                    name: name.clone(),
542                    created_at_unix_ms: now,
543                    updated_at_unix_ms: now,
544                    state: "declared".to_string(),
545                    source: source.clone(),
546                    node_labels: node_labels.clone(),
547                    node_types: node_types.clone(),
548                    edge_labels: edge_labels.clone(),
549                    last_materialized_sequence: None,
550                };
551                metadata.graph_projections.push(projection.clone());
552                projection
553            };
554
555            Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
556
557            metadata
558                .graph_projections
559                .sort_by(|left, right| left.name.cmp(&right.name));
560            projection
561        })
562    }
563
564    /// Mark a declared graph projection as materialized in the current physical metadata.
565    pub fn materialize_graph_projection(
566        &self,
567        name: &str,
568    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
569        self.update_physical_metadata(|metadata| {
570            let now = SystemTime::now()
571                .duration_since(UNIX_EPOCH)
572                .unwrap_or_default()
573                .as_millis();
574            let idx = metadata
575                .graph_projections
576                .iter()
577                .position(|projection| projection.name == name);
578            if let Some(idx) = idx {
579                metadata.graph_projections[idx].updated_at_unix_ms = now;
580                metadata.graph_projections[idx].state = "materialized".to_string();
581                metadata.graph_projections[idx].last_materialized_sequence =
582                    Some(metadata.superblock.sequence);
583                let result = metadata.graph_projections[idx].clone();
584                Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
585                return Some(result);
586            }
587            None
588        })
589    }
590
591    /// Mark a declared graph projection as materializing.
592    pub fn mark_graph_projection_materializing(
593        &self,
594        name: &str,
595    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
596        self.update_physical_metadata(|metadata| {
597            let now = SystemTime::now()
598                .duration_since(UNIX_EPOCH)
599                .unwrap_or_default()
600                .as_millis();
601            let idx = metadata
602                .graph_projections
603                .iter()
604                .position(|projection| projection.name == name);
605            if let Some(idx) = idx {
606                metadata.graph_projections[idx].updated_at_unix_ms = now;
607                metadata.graph_projections[idx].state = "materializing".to_string();
608                metadata.graph_projections[idx].last_materialized_sequence = None;
609                let result = metadata.graph_projections[idx].clone();
610                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
611                return Some(result);
612            }
613            None
614        })
615    }
616
617    /// Mark a graph projection as failed.
618    pub fn fail_graph_projection(
619        &self,
620        name: &str,
621    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
622        self.update_physical_metadata(|metadata| {
623            let now = SystemTime::now()
624                .duration_since(UNIX_EPOCH)
625                .unwrap_or_default()
626                .as_millis();
627            let idx = metadata
628                .graph_projections
629                .iter()
630                .position(|projection| projection.name == name);
631            if let Some(idx) = idx {
632                metadata.graph_projections[idx].updated_at_unix_ms = now;
633                metadata.graph_projections[idx].state = "failed".to_string();
634                metadata.graph_projections[idx].last_materialized_sequence = None;
635                let result = metadata.graph_projections[idx].clone();
636                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
637                return Some(result);
638            }
639            None
640        })
641    }
642
643    /// Mark a graph projection as stale while preserving any last materialized sequence.
644    pub fn mark_graph_projection_stale(
645        &self,
646        name: &str,
647    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
648        self.update_physical_metadata(|metadata| {
649            let now = SystemTime::now()
650                .duration_since(UNIX_EPOCH)
651                .unwrap_or_default()
652                .as_millis();
653            let idx = metadata
654                .graph_projections
655                .iter()
656                .position(|projection| projection.name == name);
657            if let Some(idx) = idx {
658                metadata.graph_projections[idx].updated_at_unix_ms = now;
659                metadata.graph_projections[idx].state = "stale".to_string();
660                let result = metadata.graph_projections[idx].clone();
661                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
662                return Some(result);
663            }
664            None
665        })
666    }
667
668    fn mark_projection_dependent_jobs_stale(
669        metadata: &mut PhysicalMetadataFile,
670        projection_name: &str,
671        now: u128,
672    ) {
673        for job in metadata.analytics_jobs.iter_mut() {
674            if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
675                job.state = "stale".to_string();
676                job.updated_at_unix_ms = now;
677            }
678        }
679    }
680
681    fn rearm_projection_dependent_jobs_declared(
682        metadata: &mut PhysicalMetadataFile,
683        projection_name: &str,
684        now: u128,
685    ) {
686        for job in metadata.analytics_jobs.iter_mut() {
687            if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
688                job.state = "declared".to_string();
689                job.last_run_sequence = None;
690                job.updated_at_unix_ms = now;
691            }
692        }
693    }
694
695    /// Declare or update analytics job metadata without marking it as executed.
696    pub fn save_analytics_job(
697        &self,
698        kind: impl Into<String>,
699        projection: Option<String>,
700        metadata_entries: BTreeMap<String, String>,
701    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
702        let kind = kind.into();
703        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
704        if let Some(projection_name) = projection.as_deref() {
705            if !self.graph_projection_is_declared(projection_name) {
706                return Err(format!(
707                    "graph projection '{projection_name}' is not declared in physical metadata"
708                )
709                .into());
710            }
711        }
712
713        self.update_physical_metadata(|metadata| {
714            let now = SystemTime::now()
715                .duration_since(UNIX_EPOCH)
716                .unwrap_or_default()
717                .as_millis();
718
719            let job = if let Some(existing) = metadata
720                .analytics_jobs
721                .iter_mut()
722                .find(|job| job.id == job_id)
723            {
724                existing.kind = kind.clone();
725                existing.projection = projection.clone();
726                existing.updated_at_unix_ms = now;
727                existing.metadata = metadata_entries.clone();
728                if existing.last_run_sequence.is_none() {
729                    existing.state = "declared".to_string();
730                }
731                existing.clone()
732            } else {
733                let job = PhysicalAnalyticsJob {
734                    id: job_id.clone(),
735                    kind: kind.clone(),
736                    state: "declared".to_string(),
737                    projection: projection.clone(),
738                    created_at_unix_ms: now,
739                    updated_at_unix_ms: now,
740                    last_run_sequence: None,
741                    metadata: metadata_entries.clone(),
742                };
743                metadata.analytics_jobs.push(job.clone());
744                job
745            };
746
747            metadata
748                .analytics_jobs
749                .sort_by(|left, right| left.id.cmp(&right.id));
750            job
751        })
752    }
753
754    /// Record or update analytics job metadata in the persisted physical metadata.
755    pub fn record_analytics_job(
756        &self,
757        kind: impl Into<String>,
758        projection: Option<String>,
759        metadata_entries: BTreeMap<String, String>,
760    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
761        let kind = kind.into();
762        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
763        if let Some(projection_name) = projection.as_deref() {
764            if !self.graph_projection_is_declared(projection_name) {
765                return Err(format!(
766                    "graph projection '{projection_name}' is not declared in physical metadata"
767                )
768                .into());
769            }
770            if !self.graph_projection_is_operational(projection_name) {
771                return Err(format!(
772                    "graph projection '{projection_name}' is declared but not operationally materialized"
773                )
774                .into());
775            }
776        }
777
778        self.update_physical_metadata(|metadata| {
779            let now = SystemTime::now()
780                .duration_since(UNIX_EPOCH)
781                .unwrap_or_default()
782                .as_millis();
783
784            let existing = metadata
785                .analytics_jobs
786                .iter_mut()
787                .find(|job| job.id == job_id)?;
788            existing.state = "completed".to_string();
789            existing.kind = kind.clone();
790            existing.projection = projection.clone();
791            existing.updated_at_unix_ms = now;
792            existing.last_run_sequence = Some(metadata.superblock.sequence);
793            existing.metadata = metadata_entries.clone();
794            let job = existing.clone();
795
796            metadata
797                .analytics_jobs
798                .sort_by(|left, right| left.id.cmp(&right.id));
799            Some(job)
800        })
801        .and_then(|job| {
802            job.ok_or_else(|| {
803                format!("analytics job '{job_id}' is not declared in physical metadata").into()
804            })
805        })
806    }
807
808    /// Mark a declared analytics job as running.
809    pub fn queue_analytics_job(
810        &self,
811        kind: impl Into<String>,
812        projection: Option<String>,
813        metadata_entries: BTreeMap<String, String>,
814    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
815        let kind = kind.into();
816        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
817        if let Some(projection_name) = projection.as_deref() {
818            if !self.graph_projection_is_declared(projection_name) {
819                return Err(format!(
820                    "graph projection '{projection_name}' is not declared in physical metadata"
821                )
822                .into());
823            }
824            if !self.graph_projection_is_operational(projection_name) {
825                return Err(format!(
826                    "graph projection '{projection_name}' is declared but not operationally materialized"
827                )
828                .into());
829            }
830        }
831
832        self.update_physical_metadata(|metadata| {
833            let now = SystemTime::now()
834                .duration_since(UNIX_EPOCH)
835                .unwrap_or_default()
836                .as_millis();
837
838            let existing = metadata
839                .analytics_jobs
840                .iter_mut()
841                .find(|job| job.id == job_id)?;
842            existing.state = "queued".to_string();
843            existing.kind = kind.clone();
844            existing.projection = projection.clone();
845            existing.updated_at_unix_ms = now;
846            existing.metadata = metadata_entries.clone();
847            let job = existing.clone();
848
849            metadata
850                .analytics_jobs
851                .sort_by(|left, right| left.id.cmp(&right.id));
852            Some(job)
853        })
854        .and_then(|job| {
855            job.ok_or_else(|| {
856                format!("analytics job '{job_id}' is not declared in physical metadata").into()
857            })
858        })
859    }
860
861    /// Mark a declared analytics job as running.
862    pub fn start_analytics_job(
863        &self,
864        kind: impl Into<String>,
865        projection: Option<String>,
866        metadata_entries: BTreeMap<String, String>,
867    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
868        let kind = kind.into();
869        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
870        if let Some(projection_name) = projection.as_deref() {
871            if !self.graph_projection_is_declared(projection_name) {
872                return Err(format!(
873                    "graph projection '{projection_name}' is not declared in physical metadata"
874                )
875                .into());
876            }
877            if !self.graph_projection_is_operational(projection_name) {
878                return Err(format!(
879                    "graph projection '{projection_name}' is declared but not operationally materialized"
880                )
881                .into());
882            }
883        }
884
885        self.update_physical_metadata(|metadata| {
886            let now = SystemTime::now()
887                .duration_since(UNIX_EPOCH)
888                .unwrap_or_default()
889                .as_millis();
890
891            let existing = metadata
892                .analytics_jobs
893                .iter_mut()
894                .find(|job| job.id == job_id)?;
895            existing.state = "running".to_string();
896            existing.kind = kind.clone();
897            existing.projection = projection.clone();
898            existing.updated_at_unix_ms = now;
899            existing.metadata = metadata_entries.clone();
900            let job = existing.clone();
901
902            metadata
903                .analytics_jobs
904                .sort_by(|left, right| left.id.cmp(&right.id));
905            Some(job)
906        })
907        .and_then(|job| {
908            job.ok_or_else(|| {
909                format!("analytics job '{job_id}' is not declared in physical metadata").into()
910            })
911        })
912    }
913
914    /// Mark a declared analytics job as failed.
915    pub fn fail_analytics_job(
916        &self,
917        kind: impl Into<String>,
918        projection: Option<String>,
919        metadata_entries: BTreeMap<String, String>,
920    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
921        let kind = kind.into();
922        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
923
924        self.update_physical_metadata(|metadata| {
925            let now = SystemTime::now()
926                .duration_since(UNIX_EPOCH)
927                .unwrap_or_default()
928                .as_millis();
929
930            let existing = metadata
931                .analytics_jobs
932                .iter_mut()
933                .find(|job| job.id == job_id)?;
934            existing.state = "failed".to_string();
935            existing.kind = kind.clone();
936            existing.projection = projection.clone();
937            existing.updated_at_unix_ms = now;
938            existing.metadata = metadata_entries.clone();
939            let job = existing.clone();
940
941            metadata
942                .analytics_jobs
943                .sort_by(|left, right| left.id.cmp(&right.id));
944            Some(job)
945        })
946        .and_then(|job| {
947            job.ok_or_else(|| {
948                format!("analytics job '{job_id}' is not declared in physical metadata").into()
949            })
950        })
951    }
952
953    /// Mark a declared analytics job as stale.
954    pub fn mark_analytics_job_stale(
955        &self,
956        kind: impl Into<String>,
957        projection: Option<String>,
958        metadata_entries: BTreeMap<String, String>,
959    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
960        let kind = kind.into();
961        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
962
963        self.update_physical_metadata(|metadata| {
964            let now = SystemTime::now()
965                .duration_since(UNIX_EPOCH)
966                .unwrap_or_default()
967                .as_millis();
968
969            let existing = metadata
970                .analytics_jobs
971                .iter_mut()
972                .find(|job| job.id == job_id)?;
973            existing.state = "stale".to_string();
974            existing.kind = kind.clone();
975            existing.projection = projection.clone();
976            existing.updated_at_unix_ms = now;
977            existing.metadata = metadata_entries.clone();
978            let job = existing.clone();
979
980            metadata
981                .analytics_jobs
982                .sort_by(|left, right| left.id.cmp(&right.id));
983            Some(job)
984        })
985        .and_then(|job| {
986            job.ok_or_else(|| {
987                format!("analytics job '{job_id}' is not declared in physical metadata").into()
988            })
989        })
990    }
991
992    /// Create a named export by copying the current database file and metadata sidecar.
993    pub fn create_named_export(
994        &self,
995        name: impl Into<String>,
996    ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
997        let name = name.into();
998        if self.options.mode != StorageMode::Persistent {
999            return Err("exports require persistent mode".into());
1000        }
1001        let Some(path) = self.path() else {
1002            return Err("database path is not available".into());
1003        };
1004
1005        self.flush()?;
1006
1007        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1008        let export_data_path = reddb_file::copy_physical_export_data_file(path, &name)?;
1009        let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
1010        let export_metadata_binary_path =
1011            PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
1012
1013        let descriptor = ExportDescriptor {
1014            name: name.clone(),
1015            created_at_unix_ms: SystemTime::now()
1016                .duration_since(UNIX_EPOCH)
1017                .unwrap_or_default()
1018                .as_millis(),
1019            snapshot_id: metadata
1020                .snapshots
1021                .last()
1022                .map(|snapshot| snapshot.snapshot_id),
1023            superblock_sequence: metadata.superblock.sequence,
1024            data_path: export_data_path.display().to_string(),
1025            metadata_path: export_metadata_path.display().to_string(),
1026            collection_count: metadata.catalog.total_collections,
1027            total_entities: metadata.catalog.total_entities,
1028        };
1029
1030        metadata
1031            .exports
1032            .retain(|export| export.name != descriptor.name);
1033        metadata.exports.push(descriptor.clone());
1034        self.prune_export_registry(&mut metadata.exports);
1035        metadata.save_for_data_path(path)?;
1036        metadata.save_to_binary_path(&export_metadata_binary_path)?;
1037        metadata.save_to_path(&export_metadata_path)?;
1038
1039        Ok(descriptor)
1040    }
1041
1042    /// Enable or disable a physical index entry in the persisted registry.
1043    pub fn set_index_enabled(
1044        &self,
1045        name: &str,
1046        enabled: bool,
1047    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1048        let Some(status) = self.index_status(name) else {
1049            return Err(format!("index '{name}' is not present in catalog status").into());
1050        };
1051        if !status.declared {
1052            return Err(format!("index '{name}' is not declared in physical metadata").into());
1053        }
1054        self.update_physical_metadata(|metadata| {
1055            let now = SystemTime::now()
1056                .duration_since(UNIX_EPOCH)
1057                .unwrap_or_default()
1058                .as_millis();
1059            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1060                index.enabled = enabled;
1061                if !enabled {
1062                    index.build_state = "disabled".to_string();
1063                } else if index.build_state == "disabled" {
1064                    index.build_state = if index.artifact_root_page.is_some() {
1065                        "ready".to_string()
1066                    } else {
1067                        "declared-unbuilt".to_string()
1068                    };
1069                }
1070                index.last_refresh_ms = Some(now);
1071                return Some(index.clone());
1072            }
1073            None
1074        })
1075    }
1076
1077    /// Mark a declared physical index as building in the persisted registry.
1078    pub fn mark_index_building(
1079        &self,
1080        name: &str,
1081    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1082        let Some(status) = self.index_status(name) else {
1083            return Err(format!("index '{name}' is not present in catalog status").into());
1084        };
1085        if !status.declared {
1086            return Err(format!("index '{name}' is not declared in physical metadata").into());
1087        }
1088        if status.lifecycle_state == "disabled" {
1089            return Err(format!("index '{name}' is disabled").into());
1090        }
1091        self.update_physical_metadata(|metadata| {
1092            let now = SystemTime::now()
1093                .duration_since(UNIX_EPOCH)
1094                .unwrap_or_default()
1095                .as_millis();
1096            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1097                index.build_state = "building".to_string();
1098                index.last_refresh_ms = Some(now);
1099                return Some(index.clone());
1100            }
1101            None
1102        })
1103    }
1104
1105    /// Mark a declared physical index as failed in the persisted registry.
1106    pub fn fail_index(
1107        &self,
1108        name: &str,
1109    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1110        let Some(status) = self.index_status(name) else {
1111            return Err(format!("index '{name}' is not present in catalog status").into());
1112        };
1113        if !status.declared {
1114            return Err(format!("index '{name}' is not declared in physical metadata").into());
1115        }
1116        self.update_physical_metadata(|metadata| {
1117            let now = SystemTime::now()
1118                .duration_since(UNIX_EPOCH)
1119                .unwrap_or_default()
1120                .as_millis();
1121            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1122                index.build_state = "failed".to_string();
1123                index.last_refresh_ms = Some(now);
1124                return Some(index.clone());
1125            }
1126            None
1127        })
1128    }
1129
1130    /// Mark a declared physical index as stale in the persisted registry.
1131    pub fn mark_index_stale(
1132        &self,
1133        name: &str,
1134    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1135        let Some(status) = self.index_status(name) else {
1136            return Err(format!("index '{name}' is not present in catalog status").into());
1137        };
1138        if !status.declared {
1139            return Err(format!("index '{name}' is not declared in physical metadata").into());
1140        }
1141        self.update_physical_metadata(|metadata| {
1142            let now = SystemTime::now()
1143                .duration_since(UNIX_EPOCH)
1144                .unwrap_or_default()
1145                .as_millis();
1146            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1147                index.build_state = if index.enabled {
1148                    "stale".to_string()
1149                } else {
1150                    "disabled".to_string()
1151                };
1152                index.last_refresh_ms = Some(now);
1153                return Some(index.clone());
1154            }
1155            None
1156        })
1157    }
1158
1159    /// Mark a declared physical index as ready in the persisted registry.
1160    pub fn mark_index_ready(
1161        &self,
1162        name: &str,
1163    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1164        self.warmup_index(name)
1165    }
1166
1167    /// Mark a physical index as warmed up/refreshed in the persisted registry.
1168    pub fn warmup_index(
1169        &self,
1170        name: &str,
1171    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1172        let Some(status) = self.index_status(name) else {
1173            return Err(format!("index '{name}' is not present in catalog status").into());
1174        };
1175        if !status.declared {
1176            return Err(format!("index '{name}' is not declared in physical metadata").into());
1177        }
1178        if status.lifecycle_state == "disabled" {
1179            return Err(format!("index '{name}' is disabled").into());
1180        }
1181        if !status.operational {
1182            return Err(
1183                format!("index '{name}' is declared but not operationally materialized").into(),
1184            );
1185        }
1186        let warmed_artifact = self
1187            .physical_indexes()
1188            .into_iter()
1189            .find(|index| index.name == name)
1190            .map(|mut index| {
1191                self.warmup_native_vector_artifact_for_index(&index)?;
1192                self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1193                Ok::<_, String>(index)
1194            })
1195            .transpose()
1196            .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1197
1198        self.update_physical_metadata(|metadata| {
1199            let now = SystemTime::now()
1200                .duration_since(UNIX_EPOCH)
1201                .unwrap_or_default()
1202                .as_millis();
1203            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1204                if let Some(warmed) = warmed_artifact.as_ref() {
1205                    index.entries = warmed.entries;
1206                    index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1207                    index.backend = warmed.backend.clone();
1208                    index.build_state = "ready".to_string();
1209                }
1210                index.last_refresh_ms = Some(now);
1211                return Some(index.clone());
1212            }
1213            None
1214        })
1215    }
1216
1217    /// Rebuild physical index metadata from the current catalog, optionally restricted to one collection.
1218    pub fn rebuild_index_registry(
1219        &self,
1220        collection: Option<&str>,
1221    ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1222        let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1223        self.update_physical_metadata(|metadata| {
1224            let now = SystemTime::now()
1225                .duration_since(UNIX_EPOCH)
1226                .unwrap_or_default()
1227                .as_millis();
1228
1229            let mut affected = Vec::new();
1230            let declared = metadata.indexes.clone();
1231            for declared_index in declared {
1232                let matches_collection = collection.is_none_or(|collection_name| {
1233                    declared_index.collection.as_deref() == Some(collection_name)
1234                });
1235                if !matches_collection {
1236                    continue;
1237                }
1238
1239                let mut rebuilt = fresh
1240                    .iter()
1241                    .find(|index| index.name == declared_index.name)
1242                    .cloned()
1243                    .unwrap_or_else(|| {
1244                        let mut index = declared_index.clone();
1245                        index.build_state = "declared-unbuilt".to_string();
1246                        index
1247                    });
1248                rebuilt.enabled = declared_index.enabled;
1249                rebuilt.artifact_kind = rebuilt
1250                    .artifact_kind
1251                    .or_else(|| declared_index.artifact_kind.clone());
1252                rebuilt.artifact_root_page = rebuilt
1253                    .artifact_root_page
1254                    .or(declared_index.artifact_root_page);
1255                rebuilt.artifact_checksum = rebuilt
1256                    .artifact_checksum
1257                    .or(declared_index.artifact_checksum);
1258                rebuilt.build_state =
1259                    Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1260                rebuilt.last_refresh_ms = Some(now);
1261
1262                if let Some(existing) = metadata
1263                    .indexes
1264                    .iter_mut()
1265                    .find(|index| index.name == rebuilt.name)
1266                {
1267                    *existing = rebuilt.clone();
1268                } else {
1269                    metadata.indexes.push(rebuilt.clone());
1270                }
1271
1272                affected.push(rebuilt);
1273            }
1274
1275            affected
1276        })
1277    }
1278
1279    fn finalize_rebuilt_index_build_state(
1280        declared: &PhysicalIndexState,
1281        rebuilt: &PhysicalIndexState,
1282    ) -> String {
1283        if !rebuilt.enabled {
1284            return "disabled".to_string();
1285        }
1286
1287        if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1288            return "failed".to_string();
1289        }
1290
1291        let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1292        if native_artifact_family {
1293            if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1294                return "ready".to_string();
1295            }
1296            if declared.artifact_root_page.is_some()
1297                || declared.artifact_checksum.is_some()
1298                || declared.artifact_kind.is_some()
1299            {
1300                return "stale".to_string();
1301            }
1302            return "declared-unbuilt".to_string();
1303        }
1304
1305        if rebuilt.entries > 0 {
1306            return "ready".to_string();
1307        }
1308
1309        if matches!(
1310            declared.build_state.as_str(),
1311            "stale" | "artifact-published" | "registry-loaded"
1312        ) {
1313            return "stale".to_string();
1314        }
1315
1316        "declared-unbuilt".to_string()
1317    }
1318}