Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_registry.rs

1use super::*;
2
3impl RedDB {
4    pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5        self.physical_metadata()
6            .map(|metadata| metadata.tree_definitions)
7            .unwrap_or_default()
8    }
9
10    pub fn tree_definition(
11        &self,
12        collection: &str,
13        name: &str,
14    ) -> Option<crate::physical::PhysicalTreeDefinition> {
15        self.tree_definitions()
16            .into_iter()
17            .find(|definition| definition.collection == collection && definition.name == name)
18    }
19
20    pub fn save_tree_definition(
21        &self,
22        definition: crate::physical::PhysicalTreeDefinition,
23    ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24        self.update_physical_metadata(|metadata| {
25            if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26                existing.collection == definition.collection && existing.name == definition.name
27            }) {
28                *existing = definition.clone();
29            } else {
30                metadata.tree_definitions.push(definition.clone());
31            }
32            metadata.tree_definitions.sort_by(|left, right| {
33                left.collection
34                    .cmp(&right.collection)
35                    .then_with(|| left.name.cmp(&right.name))
36            });
37            definition.clone()
38        })
39    }
40
41    pub fn remove_tree_definition(
42        &self,
43        collection: &str,
44        name: &str,
45    ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46        self.update_physical_metadata(|metadata| {
47            metadata
48                .tree_definitions
49                .iter()
50                .position(|definition| {
51                    definition.collection == collection && definition.name == name
52                })
53                .map(|index| metadata.tree_definitions.remove(index))
54        })
55    }
56
57    pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58        self.collection_ttl_defaults_ms
59            .read()
60            .ok()
61            .and_then(|defaults| defaults.get(collection).copied())
62    }
63
64    pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66            defaults.insert(collection.into(), ttl_ms);
67        }
68    }
69
70    pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72            defaults.remove(collection);
73        }
74    }
75
76    pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77        self.contract_cache_map()
78            .values()
79            .map(|arc| (**arc).clone())
80            .collect()
81    }
82
83    pub fn collection_contract(
84        &self,
85        collection: &str,
86    ) -> Option<crate::physical::CollectionContract> {
87        self.contract_cache_map()
88            .get(collection)
89            .map(|arc| (**arc).clone())
90    }
91
92    /// Arc-bumping variant — callers that only need read access skip
93    /// the per-call full `CollectionContract` clone. Deep clones cost
94    /// ~200-400 ns each (vec of declared columns + string allocs);
95    /// the UPDATE fast path hits this 2-3× per mutation.
96    pub fn collection_contract_arc(
97        &self,
98        collection: &str,
99    ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100        self.contract_cache_map()
101            .get(collection)
102            .map(std::sync::Arc::clone)
103    }
104
105    /// Return (or lazily build) the in-memory contract map.
106    /// Reparses PhysicalMetadataFile JSON only once per invalidation.
107    /// Called 3× per insert (ensure_model, enforce_uniqueness, normalize_fields)
108    /// so the cache is load-bearing for insert throughput.
109    fn contract_cache_map(
110        &self,
111    ) -> std::sync::Arc<
112        std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113    > {
114        if let Ok(guard) = self.collection_contract_cache.read() {
115            if let Some(map) = guard.as_ref() {
116                return std::sync::Arc::clone(map);
117            }
118        }
119        let contracts: Vec<crate::physical::CollectionContract> = self
120            .physical_metadata()
121            .map(|metadata| metadata.collection_contracts)
122            .unwrap_or_default();
123        let map: std::collections::HashMap<_, _> = contracts
124            .into_iter()
125            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126            .collect();
127        let arc = std::sync::Arc::new(map);
128        if let Ok(mut guard) = self.collection_contract_cache.write() {
129            *guard = Some(std::sync::Arc::clone(&arc));
130        }
131        arc
132    }
133
134    pub(crate) fn invalidate_collection_contract_cache(&self) {
135        if let Ok(mut guard) = self.collection_contract_cache.write() {
136            *guard = None;
137        }
138    }
139
140    pub fn save_collection_contract(
141        &self,
142        contract: crate::physical::CollectionContract,
143    ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145            if let Some(ttl_ms) = contract.default_ttl_ms {
146                defaults.insert(contract.name.clone(), ttl_ms);
147            } else {
148                defaults.remove(&contract.name);
149            }
150        }
151
152        self.store()
153            .context_index()
154            .set_collection_enabled(&contract.name, contract.context_index_enabled);
155        if let Some(manager) = self.store().get_collection(&contract.name) {
156            let columns = contract
157                .declared_columns
158                .iter()
159                .map(|column| column.name.clone())
160                .collect();
161            manager.set_column_schema_if_empty(columns);
162        }
163
164        self.invalidate_collection_contract_cache();
165
166        self.update_physical_metadata(|metadata| {
167            if let Some(existing) = metadata
168                .collection_contracts
169                .iter_mut()
170                .find(|existing| existing.name == contract.name)
171            {
172                *existing = contract.clone();
173            } else {
174                metadata.collection_contracts.push(contract.clone());
175            }
176            metadata
177                .collection_contracts
178                .sort_by(|left, right| left.name.cmp(&right.name));
179
180            if let Some(ttl_ms) = contract.default_ttl_ms {
181                metadata
182                    .collection_ttl_defaults_ms
183                    .insert(contract.name.clone(), ttl_ms);
184            } else {
185                metadata.collection_ttl_defaults_ms.remove(&contract.name);
186            }
187
188            contract.clone()
189        })
190    }
191
192    pub fn remove_collection_contract(
193        &self,
194        collection: &str,
195    ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
196        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
197            defaults.remove(collection);
198        }
199
200        self.store()
201            .context_index()
202            .set_collection_enabled(collection, false);
203
204        self.invalidate_collection_contract_cache();
205
206        self.update_physical_metadata(|metadata| {
207            let removed = metadata
208                .collection_contracts
209                .iter()
210                .position(|contract| contract.name == collection)
211                .map(|index| metadata.collection_contracts.remove(index));
212            metadata.collection_ttl_defaults_ms.remove(collection);
213            metadata
214                .indexes
215                .retain(|index| index.collection.as_deref() != Some(collection));
216            removed
217        })
218    }
219
220    pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
221        self.collection_ttl_defaults_ms
222            .read()
223            .map(|defaults| {
224                defaults
225                    .iter()
226                    .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
227                    .collect()
228            })
229            .unwrap_or_default()
230    }
231
232    pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
233        let metadata = self.physical_metadata();
234        let defaults = metadata
235            .as_ref()
236            .map(|m| m.collection_ttl_defaults_ms.clone())
237            .unwrap_or_default();
238
239        if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
240            current.clear();
241            current.extend(defaults);
242        }
243
244        if let Some(metadata) = metadata {
245            let store = self.store();
246            let index = store.context_index();
247            for contract in &metadata.collection_contracts {
248                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
249            }
250        }
251    }
252
253    /// Snapshot the live hypertable registry (specs + every chunk)
254    /// into the sidecar-serialisable form. Called from
255    /// [`persist_metadata`] so the chunk routing spine is durable on
256    /// the same metadata path as collection contracts — issue #866.
257    /// Returns an empty vec (and touches nothing) when no hypertable
258    /// has ever been declared, so non-hypertable databases pay no
259    /// persist cost.
260    pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
261        let registry = self.hypertables();
262        if registry.is_empty() {
263            return Vec::new();
264        }
265        use std::collections::BTreeMap;
266        let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
267            BTreeMap::new();
268        for chunk in registry.snapshot_chunks() {
269            chunks_by_table
270                .entry(chunk.id.hypertable.clone())
271                .or_default()
272                .push(crate::physical::PhysicalHypertableChunk {
273                    start_ns: chunk.id.start_ns,
274                    end_ns_exclusive: chunk.end_ns_exclusive,
275                    row_count: chunk.row_count,
276                    min_ts_ns: chunk.min_ts_ns,
277                    max_ts_ns: chunk.max_ts_ns,
278                    sealed: chunk.sealed,
279                    ttl_override_ns: chunk.ttl_override_ns,
280                    columnar_page: chunk.columnar_page,
281                });
282        }
283        registry
284            .list()
285            .into_iter()
286            .map(|spec| crate::physical::PhysicalHypertable {
287                chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
288                name: spec.name,
289                time_column: spec.time_column,
290                chunk_interval_ns: spec.chunk_interval_ns,
291                default_ttl_ns: spec.default_ttl_ns,
292            })
293            .collect()
294    }
295
296    /// Rehydrate the hypertable registry from the physical metadata
297    /// sidecar at boot. Mirror of
298    /// [`load_collection_ttl_defaults_from_metadata`] — specs and
299    /// chunks are restored verbatim so bounds / routing / TTL are
300    /// identical to the pre-restart state (issue #866).
301    pub(crate) fn load_hypertables_from_metadata(&self) {
302        let Some(metadata) = self.physical_metadata() else {
303            return;
304        };
305        if metadata.hypertables.is_empty() {
306            return;
307        }
308        let registry = self.hypertables();
309        for hypertable in &metadata.hypertables {
310            let mut spec = crate::storage::timeseries::HypertableSpec::new(
311                hypertable.name.clone(),
312                hypertable.time_column.clone(),
313                hypertable.chunk_interval_ns,
314            );
315            if let Some(ttl) = hypertable.default_ttl_ns {
316                spec = spec.with_ttl_ns(ttl);
317            }
318            registry.register(spec);
319            for chunk in &hypertable.chunks {
320                registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
321                    id: crate::storage::timeseries::ChunkId {
322                        hypertable: hypertable.name.clone(),
323                        start_ns: chunk.start_ns,
324                    },
325                    end_ns_exclusive: chunk.end_ns_exclusive,
326                    row_count: chunk.row_count,
327                    min_ts_ns: chunk.min_ts_ns,
328                    max_ts_ns: chunk.max_ts_ns,
329                    sealed: chunk.sealed,
330                    ttl_override_ns: chunk.ttl_override_ns,
331                    columnar_page: chunk.columnar_page,
332                });
333            }
334        }
335    }
336
337    pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
338        self.store.run_maintenance()?;
339        self.persist_metadata()?;
340        Ok(())
341    }
342
343    /// Path to the physical metadata sidecar, if persistent.
344    pub fn metadata_path(&self) -> Option<PathBuf> {
345        self.path
346            .as_ref()
347            .map(|path| PhysicalMetadataFile::metadata_path_for(path))
348    }
349
350    /// Load the current physical metadata view, bootstrapping from native state when needed.
351    pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
352        self.load_or_bootstrap_physical_metadata(!self.options.read_only)
353            .ok()
354    }
355
356    /// Physical index registry derived for the current database state.
357    pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
358        let indexes = self
359            .physical_metadata()
360            .map(|metadata| metadata.indexes)
361            .filter(|indexes| !indexes.is_empty())
362            .or_else(|| {
363                self.native_physical_state()
364                    .map(|state| self.physical_index_state_from_native_state(&state, None))
365            })
366            .unwrap_or_else(|| self.physical_index_state());
367        self.reconcile_index_states_with_native_artifacts(indexes)
368    }
369
370    /// List registered named exports from the current physical metadata view.
371    pub fn exports(&self) -> Vec<ExportDescriptor> {
372        self.physical_metadata()
373            .map(|metadata| metadata.exports)
374            .or_else(|| {
375                self.native_physical_state()
376                    .map(|state| self.exports_from_native_state(&state))
377            })
378            .unwrap_or_default()
379    }
380
381    /// List recorded snapshots from the current physical metadata view.
382    pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
383        self.physical_metadata()
384            .map(|metadata| metadata.snapshots)
385            .or_else(|| {
386                self.native_physical_state()
387                    .map(|state| self.snapshots_from_native_state(&state))
388            })
389            .unwrap_or_default()
390    }
391
392    /// List persisted named graph projections from the current physical metadata view.
393    pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
394        self.physical_metadata()
395            .map(|metadata| metadata.graph_projections)
396            .or_else(|| {
397                self.native_physical_state()
398                    .map(|state| self.graph_projections_from_native_state(&state))
399            })
400            .unwrap_or_default()
401    }
402
403    /// List graph projections declared in the catalog view.
404    pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
405        self.catalog_model_snapshot().declared_graph_projections
406    }
407
408    /// List graph projections currently observed in the operational view.
409    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
410        self.graph_projections()
411            .into_iter()
412            .filter(|projection| {
413                projection.last_materialized_sequence.is_some()
414                    || matches!(projection.state.as_str(), "materialized" | "stale")
415            })
416            .collect()
417    }
418
419    /// List persisted analytics job metadata from the current physical metadata view.
420    pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
421        self.physical_metadata()
422            .map(|metadata| metadata.analytics_jobs)
423            .or_else(|| {
424                self.native_physical_state()
425                    .map(|state| self.analytics_jobs_from_native_state(&state))
426            })
427            .unwrap_or_default()
428    }
429
430    /// List analytics jobs declared in the catalog view.
431    pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
432        self.catalog_model_snapshot().declared_analytics_jobs
433    }
434
435    /// List analytics jobs currently observed in the operational view.
436    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
437        self.analytics_jobs()
438            .into_iter()
439            .filter(|job| {
440                job.last_run_sequence.is_some()
441                    || matches!(
442                        job.state.as_str(),
443                        "running" | "completed" | "failed" | "queued" | "stale"
444                    )
445            })
446            .collect()
447    }
448
449    /// List indexes declared in the catalog view.
450    pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
451        self.catalog_model_snapshot().declared_indexes
452    }
453
454    /// List indexes currently observed in the operational view.
455    pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
456        self.catalog_model_snapshot().operational_indexes
457    }
458
459    /// List reconciled index status entries from the catalog snapshot.
460    pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
461        self.catalog_model_snapshot().index_statuses
462    }
463
464    /// Resolve one index status entry from the catalog snapshot.
465    pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
466        self.catalog_model_snapshot()
467            .index_statuses
468            .into_iter()
469            .find(|status| status.name == name)
470    }
471
472    /// Upsert a named graph projection in the persisted physical metadata.
473    pub fn save_graph_projection(
474        &self,
475        name: impl Into<String>,
476        node_labels: Vec<String>,
477        node_types: Vec<String>,
478        edge_labels: Vec<String>,
479        source: impl Into<String>,
480    ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
481        let name = name.into();
482        let source = source.into();
483        self.update_physical_metadata(|metadata| {
484            let now = SystemTime::now()
485                .duration_since(UNIX_EPOCH)
486                .unwrap_or_default()
487                .as_millis();
488            let projection = if let Some(existing) = metadata
489                .graph_projections
490                .iter_mut()
491                .find(|projection| projection.name == name)
492            {
493                existing.updated_at_unix_ms = now;
494                existing.state = "declared".to_string();
495                existing.source = source.clone();
496                existing.node_labels = node_labels.clone();
497                existing.node_types = node_types.clone();
498                existing.edge_labels = edge_labels.clone();
499                existing.last_materialized_sequence = None;
500                existing.clone()
501            } else {
502                let projection = PhysicalGraphProjection {
503                    name: name.clone(),
504                    created_at_unix_ms: now,
505                    updated_at_unix_ms: now,
506                    state: "declared".to_string(),
507                    source: source.clone(),
508                    node_labels: node_labels.clone(),
509                    node_types: node_types.clone(),
510                    edge_labels: edge_labels.clone(),
511                    last_materialized_sequence: None,
512                };
513                metadata.graph_projections.push(projection.clone());
514                projection
515            };
516
517            Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
518
519            metadata
520                .graph_projections
521                .sort_by(|left, right| left.name.cmp(&right.name));
522            projection
523        })
524    }
525
526    /// Mark a declared graph projection as materialized in the current physical metadata.
527    pub fn materialize_graph_projection(
528        &self,
529        name: &str,
530    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
531        self.update_physical_metadata(|metadata| {
532            let now = SystemTime::now()
533                .duration_since(UNIX_EPOCH)
534                .unwrap_or_default()
535                .as_millis();
536            let idx = metadata
537                .graph_projections
538                .iter()
539                .position(|projection| projection.name == name);
540            if let Some(idx) = idx {
541                metadata.graph_projections[idx].updated_at_unix_ms = now;
542                metadata.graph_projections[idx].state = "materialized".to_string();
543                metadata.graph_projections[idx].last_materialized_sequence =
544                    Some(metadata.superblock.sequence);
545                let result = metadata.graph_projections[idx].clone();
546                Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
547                return Some(result);
548            }
549            None
550        })
551    }
552
553    /// Mark a declared graph projection as materializing.
554    pub fn mark_graph_projection_materializing(
555        &self,
556        name: &str,
557    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
558        self.update_physical_metadata(|metadata| {
559            let now = SystemTime::now()
560                .duration_since(UNIX_EPOCH)
561                .unwrap_or_default()
562                .as_millis();
563            let idx = metadata
564                .graph_projections
565                .iter()
566                .position(|projection| projection.name == name);
567            if let Some(idx) = idx {
568                metadata.graph_projections[idx].updated_at_unix_ms = now;
569                metadata.graph_projections[idx].state = "materializing".to_string();
570                metadata.graph_projections[idx].last_materialized_sequence = None;
571                let result = metadata.graph_projections[idx].clone();
572                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
573                return Some(result);
574            }
575            None
576        })
577    }
578
579    /// Mark a graph projection as failed.
580    pub fn fail_graph_projection(
581        &self,
582        name: &str,
583    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
584        self.update_physical_metadata(|metadata| {
585            let now = SystemTime::now()
586                .duration_since(UNIX_EPOCH)
587                .unwrap_or_default()
588                .as_millis();
589            let idx = metadata
590                .graph_projections
591                .iter()
592                .position(|projection| projection.name == name);
593            if let Some(idx) = idx {
594                metadata.graph_projections[idx].updated_at_unix_ms = now;
595                metadata.graph_projections[idx].state = "failed".to_string();
596                metadata.graph_projections[idx].last_materialized_sequence = None;
597                let result = metadata.graph_projections[idx].clone();
598                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
599                return Some(result);
600            }
601            None
602        })
603    }
604
605    /// Mark a graph projection as stale while preserving any last materialized sequence.
606    pub fn mark_graph_projection_stale(
607        &self,
608        name: &str,
609    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
610        self.update_physical_metadata(|metadata| {
611            let now = SystemTime::now()
612                .duration_since(UNIX_EPOCH)
613                .unwrap_or_default()
614                .as_millis();
615            let idx = metadata
616                .graph_projections
617                .iter()
618                .position(|projection| projection.name == name);
619            if let Some(idx) = idx {
620                metadata.graph_projections[idx].updated_at_unix_ms = now;
621                metadata.graph_projections[idx].state = "stale".to_string();
622                let result = metadata.graph_projections[idx].clone();
623                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
624                return Some(result);
625            }
626            None
627        })
628    }
629
630    fn mark_projection_dependent_jobs_stale(
631        metadata: &mut PhysicalMetadataFile,
632        projection_name: &str,
633        now: u128,
634    ) {
635        for job in metadata.analytics_jobs.iter_mut() {
636            if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
637                job.state = "stale".to_string();
638                job.updated_at_unix_ms = now;
639            }
640        }
641    }
642
643    fn rearm_projection_dependent_jobs_declared(
644        metadata: &mut PhysicalMetadataFile,
645        projection_name: &str,
646        now: u128,
647    ) {
648        for job in metadata.analytics_jobs.iter_mut() {
649            if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
650                job.state = "declared".to_string();
651                job.last_run_sequence = None;
652                job.updated_at_unix_ms = now;
653            }
654        }
655    }
656
657    /// Declare or update analytics job metadata without marking it as executed.
658    pub fn save_analytics_job(
659        &self,
660        kind: impl Into<String>,
661        projection: Option<String>,
662        metadata_entries: BTreeMap<String, String>,
663    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
664        let kind = kind.into();
665        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
666        if let Some(projection_name) = projection.as_deref() {
667            if !self.graph_projection_is_declared(projection_name) {
668                return Err(format!(
669                    "graph projection '{projection_name}' is not declared in physical metadata"
670                )
671                .into());
672            }
673        }
674
675        self.update_physical_metadata(|metadata| {
676            let now = SystemTime::now()
677                .duration_since(UNIX_EPOCH)
678                .unwrap_or_default()
679                .as_millis();
680
681            let job = if let Some(existing) = metadata
682                .analytics_jobs
683                .iter_mut()
684                .find(|job| job.id == job_id)
685            {
686                existing.kind = kind.clone();
687                existing.projection = projection.clone();
688                existing.updated_at_unix_ms = now;
689                existing.metadata = metadata_entries.clone();
690                if existing.last_run_sequence.is_none() {
691                    existing.state = "declared".to_string();
692                }
693                existing.clone()
694            } else {
695                let job = PhysicalAnalyticsJob {
696                    id: job_id.clone(),
697                    kind: kind.clone(),
698                    state: "declared".to_string(),
699                    projection: projection.clone(),
700                    created_at_unix_ms: now,
701                    updated_at_unix_ms: now,
702                    last_run_sequence: None,
703                    metadata: metadata_entries.clone(),
704                };
705                metadata.analytics_jobs.push(job.clone());
706                job
707            };
708
709            metadata
710                .analytics_jobs
711                .sort_by(|left, right| left.id.cmp(&right.id));
712            job
713        })
714    }
715
716    /// Record or update analytics job metadata in the persisted physical metadata.
717    pub fn record_analytics_job(
718        &self,
719        kind: impl Into<String>,
720        projection: Option<String>,
721        metadata_entries: BTreeMap<String, String>,
722    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
723        let kind = kind.into();
724        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
725        if let Some(projection_name) = projection.as_deref() {
726            if !self.graph_projection_is_declared(projection_name) {
727                return Err(format!(
728                    "graph projection '{projection_name}' is not declared in physical metadata"
729                )
730                .into());
731            }
732            if !self.graph_projection_is_operational(projection_name) {
733                return Err(format!(
734                    "graph projection '{projection_name}' is declared but not operationally materialized"
735                )
736                .into());
737            }
738        }
739
740        self.update_physical_metadata(|metadata| {
741            let now = SystemTime::now()
742                .duration_since(UNIX_EPOCH)
743                .unwrap_or_default()
744                .as_millis();
745
746            let existing = metadata
747                .analytics_jobs
748                .iter_mut()
749                .find(|job| job.id == job_id)?;
750            existing.state = "completed".to_string();
751            existing.kind = kind.clone();
752            existing.projection = projection.clone();
753            existing.updated_at_unix_ms = now;
754            existing.last_run_sequence = Some(metadata.superblock.sequence);
755            existing.metadata = metadata_entries.clone();
756            let job = existing.clone();
757
758            metadata
759                .analytics_jobs
760                .sort_by(|left, right| left.id.cmp(&right.id));
761            Some(job)
762        })
763        .and_then(|job| {
764            job.ok_or_else(|| {
765                format!("analytics job '{job_id}' is not declared in physical metadata").into()
766            })
767        })
768    }
769
770    /// Mark a declared analytics job as running.
771    pub fn queue_analytics_job(
772        &self,
773        kind: impl Into<String>,
774        projection: Option<String>,
775        metadata_entries: BTreeMap<String, String>,
776    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
777        let kind = kind.into();
778        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
779        if let Some(projection_name) = projection.as_deref() {
780            if !self.graph_projection_is_declared(projection_name) {
781                return Err(format!(
782                    "graph projection '{projection_name}' is not declared in physical metadata"
783                )
784                .into());
785            }
786            if !self.graph_projection_is_operational(projection_name) {
787                return Err(format!(
788                    "graph projection '{projection_name}' is declared but not operationally materialized"
789                )
790                .into());
791            }
792        }
793
794        self.update_physical_metadata(|metadata| {
795            let now = SystemTime::now()
796                .duration_since(UNIX_EPOCH)
797                .unwrap_or_default()
798                .as_millis();
799
800            let existing = metadata
801                .analytics_jobs
802                .iter_mut()
803                .find(|job| job.id == job_id)?;
804            existing.state = "queued".to_string();
805            existing.kind = kind.clone();
806            existing.projection = projection.clone();
807            existing.updated_at_unix_ms = now;
808            existing.metadata = metadata_entries.clone();
809            let job = existing.clone();
810
811            metadata
812                .analytics_jobs
813                .sort_by(|left, right| left.id.cmp(&right.id));
814            Some(job)
815        })
816        .and_then(|job| {
817            job.ok_or_else(|| {
818                format!("analytics job '{job_id}' is not declared in physical metadata").into()
819            })
820        })
821    }
822
823    /// Mark a declared analytics job as running.
824    pub fn start_analytics_job(
825        &self,
826        kind: impl Into<String>,
827        projection: Option<String>,
828        metadata_entries: BTreeMap<String, String>,
829    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
830        let kind = kind.into();
831        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
832        if let Some(projection_name) = projection.as_deref() {
833            if !self.graph_projection_is_declared(projection_name) {
834                return Err(format!(
835                    "graph projection '{projection_name}' is not declared in physical metadata"
836                )
837                .into());
838            }
839            if !self.graph_projection_is_operational(projection_name) {
840                return Err(format!(
841                    "graph projection '{projection_name}' is declared but not operationally materialized"
842                )
843                .into());
844            }
845        }
846
847        self.update_physical_metadata(|metadata| {
848            let now = SystemTime::now()
849                .duration_since(UNIX_EPOCH)
850                .unwrap_or_default()
851                .as_millis();
852
853            let existing = metadata
854                .analytics_jobs
855                .iter_mut()
856                .find(|job| job.id == job_id)?;
857            existing.state = "running".to_string();
858            existing.kind = kind.clone();
859            existing.projection = projection.clone();
860            existing.updated_at_unix_ms = now;
861            existing.metadata = metadata_entries.clone();
862            let job = existing.clone();
863
864            metadata
865                .analytics_jobs
866                .sort_by(|left, right| left.id.cmp(&right.id));
867            Some(job)
868        })
869        .and_then(|job| {
870            job.ok_or_else(|| {
871                format!("analytics job '{job_id}' is not declared in physical metadata").into()
872            })
873        })
874    }
875
876    /// Mark a declared analytics job as failed.
877    pub fn fail_analytics_job(
878        &self,
879        kind: impl Into<String>,
880        projection: Option<String>,
881        metadata_entries: BTreeMap<String, String>,
882    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
883        let kind = kind.into();
884        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
885
886        self.update_physical_metadata(|metadata| {
887            let now = SystemTime::now()
888                .duration_since(UNIX_EPOCH)
889                .unwrap_or_default()
890                .as_millis();
891
892            let existing = metadata
893                .analytics_jobs
894                .iter_mut()
895                .find(|job| job.id == job_id)?;
896            existing.state = "failed".to_string();
897            existing.kind = kind.clone();
898            existing.projection = projection.clone();
899            existing.updated_at_unix_ms = now;
900            existing.metadata = metadata_entries.clone();
901            let job = existing.clone();
902
903            metadata
904                .analytics_jobs
905                .sort_by(|left, right| left.id.cmp(&right.id));
906            Some(job)
907        })
908        .and_then(|job| {
909            job.ok_or_else(|| {
910                format!("analytics job '{job_id}' is not declared in physical metadata").into()
911            })
912        })
913    }
914
915    /// Mark a declared analytics job as stale.
916    pub fn mark_analytics_job_stale(
917        &self,
918        kind: impl Into<String>,
919        projection: Option<String>,
920        metadata_entries: BTreeMap<String, String>,
921    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
922        let kind = kind.into();
923        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
924
925        self.update_physical_metadata(|metadata| {
926            let now = SystemTime::now()
927                .duration_since(UNIX_EPOCH)
928                .unwrap_or_default()
929                .as_millis();
930
931            let existing = metadata
932                .analytics_jobs
933                .iter_mut()
934                .find(|job| job.id == job_id)?;
935            existing.state = "stale".to_string();
936            existing.kind = kind.clone();
937            existing.projection = projection.clone();
938            existing.updated_at_unix_ms = now;
939            existing.metadata = metadata_entries.clone();
940            let job = existing.clone();
941
942            metadata
943                .analytics_jobs
944                .sort_by(|left, right| left.id.cmp(&right.id));
945            Some(job)
946        })
947        .and_then(|job| {
948            job.ok_or_else(|| {
949                format!("analytics job '{job_id}' is not declared in physical metadata").into()
950            })
951        })
952    }
953
954    /// Create a named export by copying the current database file and metadata sidecar.
955    pub fn create_named_export(
956        &self,
957        name: impl Into<String>,
958    ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
959        let name = name.into();
960        if self.options.mode != StorageMode::Persistent {
961            return Err("exports require persistent mode".into());
962        }
963        let Some(path) = self.path() else {
964            return Err("database path is not available".into());
965        };
966
967        self.flush()?;
968
969        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
970        let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
971        let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
972        let export_metadata_binary_path =
973            PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
974
975        fs::copy(path, &export_data_path)?;
976
977        let descriptor = ExportDescriptor {
978            name: name.clone(),
979            created_at_unix_ms: SystemTime::now()
980                .duration_since(UNIX_EPOCH)
981                .unwrap_or_default()
982                .as_millis(),
983            snapshot_id: metadata
984                .snapshots
985                .last()
986                .map(|snapshot| snapshot.snapshot_id),
987            superblock_sequence: metadata.superblock.sequence,
988            data_path: export_data_path.display().to_string(),
989            metadata_path: export_metadata_path.display().to_string(),
990            collection_count: metadata.catalog.total_collections,
991            total_entities: metadata.catalog.total_entities,
992        };
993
994        metadata
995            .exports
996            .retain(|export| export.name != descriptor.name);
997        metadata.exports.push(descriptor.clone());
998        self.prune_export_registry(&mut metadata.exports);
999        metadata.save_for_data_path(path)?;
1000        metadata.save_to_binary_path(&export_metadata_binary_path)?;
1001        metadata.save_to_path(&export_metadata_path)?;
1002
1003        Ok(descriptor)
1004    }
1005
1006    /// Enable or disable a physical index entry in the persisted registry.
1007    pub fn set_index_enabled(
1008        &self,
1009        name: &str,
1010        enabled: bool,
1011    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1012        let Some(status) = self.index_status(name) else {
1013            return Err(format!("index '{name}' is not present in catalog status").into());
1014        };
1015        if !status.declared {
1016            return Err(format!("index '{name}' is not declared in physical metadata").into());
1017        }
1018        self.update_physical_metadata(|metadata| {
1019            let now = SystemTime::now()
1020                .duration_since(UNIX_EPOCH)
1021                .unwrap_or_default()
1022                .as_millis();
1023            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1024                index.enabled = enabled;
1025                if !enabled {
1026                    index.build_state = "disabled".to_string();
1027                } else if index.build_state == "disabled" {
1028                    index.build_state = if index.artifact_root_page.is_some() {
1029                        "ready".to_string()
1030                    } else {
1031                        "declared-unbuilt".to_string()
1032                    };
1033                }
1034                index.last_refresh_ms = Some(now);
1035                return Some(index.clone());
1036            }
1037            None
1038        })
1039    }
1040
1041    /// Mark a declared physical index as building in the persisted registry.
1042    pub fn mark_index_building(
1043        &self,
1044        name: &str,
1045    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1046        let Some(status) = self.index_status(name) else {
1047            return Err(format!("index '{name}' is not present in catalog status").into());
1048        };
1049        if !status.declared {
1050            return Err(format!("index '{name}' is not declared in physical metadata").into());
1051        }
1052        if status.lifecycle_state == "disabled" {
1053            return Err(format!("index '{name}' is disabled").into());
1054        }
1055        self.update_physical_metadata(|metadata| {
1056            let now = SystemTime::now()
1057                .duration_since(UNIX_EPOCH)
1058                .unwrap_or_default()
1059                .as_millis();
1060            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1061                index.build_state = "building".to_string();
1062                index.last_refresh_ms = Some(now);
1063                return Some(index.clone());
1064            }
1065            None
1066        })
1067    }
1068
1069    /// Mark a declared physical index as failed in the persisted registry.
1070    pub fn fail_index(
1071        &self,
1072        name: &str,
1073    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1074        let Some(status) = self.index_status(name) else {
1075            return Err(format!("index '{name}' is not present in catalog status").into());
1076        };
1077        if !status.declared {
1078            return Err(format!("index '{name}' is not declared in physical metadata").into());
1079        }
1080        self.update_physical_metadata(|metadata| {
1081            let now = SystemTime::now()
1082                .duration_since(UNIX_EPOCH)
1083                .unwrap_or_default()
1084                .as_millis();
1085            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1086                index.build_state = "failed".to_string();
1087                index.last_refresh_ms = Some(now);
1088                return Some(index.clone());
1089            }
1090            None
1091        })
1092    }
1093
1094    /// Mark a declared physical index as stale in the persisted registry.
1095    pub fn mark_index_stale(
1096        &self,
1097        name: &str,
1098    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1099        let Some(status) = self.index_status(name) else {
1100            return Err(format!("index '{name}' is not present in catalog status").into());
1101        };
1102        if !status.declared {
1103            return Err(format!("index '{name}' is not declared in physical metadata").into());
1104        }
1105        self.update_physical_metadata(|metadata| {
1106            let now = SystemTime::now()
1107                .duration_since(UNIX_EPOCH)
1108                .unwrap_or_default()
1109                .as_millis();
1110            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1111                index.build_state = if index.enabled {
1112                    "stale".to_string()
1113                } else {
1114                    "disabled".to_string()
1115                };
1116                index.last_refresh_ms = Some(now);
1117                return Some(index.clone());
1118            }
1119            None
1120        })
1121    }
1122
1123    /// Mark a declared physical index as ready in the persisted registry.
1124    pub fn mark_index_ready(
1125        &self,
1126        name: &str,
1127    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1128        self.warmup_index(name)
1129    }
1130
1131    /// Mark a physical index as warmed up/refreshed in the persisted registry.
1132    pub fn warmup_index(
1133        &self,
1134        name: &str,
1135    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1136        let Some(status) = self.index_status(name) else {
1137            return Err(format!("index '{name}' is not present in catalog status").into());
1138        };
1139        if !status.declared {
1140            return Err(format!("index '{name}' is not declared in physical metadata").into());
1141        }
1142        if status.lifecycle_state == "disabled" {
1143            return Err(format!("index '{name}' is disabled").into());
1144        }
1145        if !status.operational {
1146            return Err(
1147                format!("index '{name}' is declared but not operationally materialized").into(),
1148            );
1149        }
1150        let warmed_artifact = self
1151            .physical_indexes()
1152            .into_iter()
1153            .find(|index| index.name == name)
1154            .map(|mut index| {
1155                self.warmup_native_vector_artifact_for_index(&index)?;
1156                self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1157                Ok::<_, String>(index)
1158            })
1159            .transpose()
1160            .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1161
1162        self.update_physical_metadata(|metadata| {
1163            let now = SystemTime::now()
1164                .duration_since(UNIX_EPOCH)
1165                .unwrap_or_default()
1166                .as_millis();
1167            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1168                if let Some(warmed) = warmed_artifact.as_ref() {
1169                    index.entries = warmed.entries;
1170                    index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1171                    index.backend = warmed.backend.clone();
1172                    index.build_state = "ready".to_string();
1173                }
1174                index.last_refresh_ms = Some(now);
1175                return Some(index.clone());
1176            }
1177            None
1178        })
1179    }
1180
1181    /// Rebuild physical index metadata from the current catalog, optionally restricted to one collection.
1182    pub fn rebuild_index_registry(
1183        &self,
1184        collection: Option<&str>,
1185    ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1186        let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1187        self.update_physical_metadata(|metadata| {
1188            let now = SystemTime::now()
1189                .duration_since(UNIX_EPOCH)
1190                .unwrap_or_default()
1191                .as_millis();
1192
1193            let mut affected = Vec::new();
1194            let declared = metadata.indexes.clone();
1195            for declared_index in declared {
1196                let matches_collection = collection.is_none_or(|collection_name| {
1197                    declared_index.collection.as_deref() == Some(collection_name)
1198                });
1199                if !matches_collection {
1200                    continue;
1201                }
1202
1203                let mut rebuilt = fresh
1204                    .iter()
1205                    .find(|index| index.name == declared_index.name)
1206                    .cloned()
1207                    .unwrap_or_else(|| {
1208                        let mut index = declared_index.clone();
1209                        index.build_state = "declared-unbuilt".to_string();
1210                        index
1211                    });
1212                rebuilt.enabled = declared_index.enabled;
1213                rebuilt.artifact_kind = rebuilt
1214                    .artifact_kind
1215                    .or_else(|| declared_index.artifact_kind.clone());
1216                rebuilt.artifact_root_page = rebuilt
1217                    .artifact_root_page
1218                    .or(declared_index.artifact_root_page);
1219                rebuilt.artifact_checksum = rebuilt
1220                    .artifact_checksum
1221                    .or(declared_index.artifact_checksum);
1222                rebuilt.build_state =
1223                    Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1224                rebuilt.last_refresh_ms = Some(now);
1225
1226                if let Some(existing) = metadata
1227                    .indexes
1228                    .iter_mut()
1229                    .find(|index| index.name == rebuilt.name)
1230                {
1231                    *existing = rebuilt.clone();
1232                } else {
1233                    metadata.indexes.push(rebuilt.clone());
1234                }
1235
1236                affected.push(rebuilt);
1237            }
1238
1239            affected
1240        })
1241    }
1242
1243    fn finalize_rebuilt_index_build_state(
1244        declared: &PhysicalIndexState,
1245        rebuilt: &PhysicalIndexState,
1246    ) -> String {
1247        if !rebuilt.enabled {
1248            return "disabled".to_string();
1249        }
1250
1251        if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1252            return "failed".to_string();
1253        }
1254
1255        let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1256        if native_artifact_family {
1257            if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1258                return "ready".to_string();
1259            }
1260            if declared.artifact_root_page.is_some()
1261                || declared.artifact_checksum.is_some()
1262                || declared.artifact_kind.is_some()
1263            {
1264                return "stale".to_string();
1265            }
1266            return "declared-unbuilt".to_string();
1267        }
1268
1269        if rebuilt.entries > 0 {
1270            return "ready".to_string();
1271        }
1272
1273        if matches!(
1274            declared.build_state.as_str(),
1275            "stale" | "artifact-published" | "registry-loaded"
1276        ) {
1277            return "stale".to_string();
1278        }
1279
1280        "declared-unbuilt".to_string()
1281    }
1282}