Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_registry.rs

1use super::*;
2
3impl RedDB {
4    pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5        self.physical_metadata()
6            .map(|metadata| metadata.tree_definitions)
7            .unwrap_or_default()
8    }
9
10    pub fn tree_definition(
11        &self,
12        collection: &str,
13        name: &str,
14    ) -> Option<crate::physical::PhysicalTreeDefinition> {
15        self.tree_definitions()
16            .into_iter()
17            .find(|definition| definition.collection == collection && definition.name == name)
18    }
19
20    pub fn save_tree_definition(
21        &self,
22        definition: crate::physical::PhysicalTreeDefinition,
23    ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24        self.update_physical_metadata(|metadata| {
25            if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26                existing.collection == definition.collection && existing.name == definition.name
27            }) {
28                *existing = definition.clone();
29            } else {
30                metadata.tree_definitions.push(definition.clone());
31            }
32            metadata.tree_definitions.sort_by(|left, right| {
33                left.collection
34                    .cmp(&right.collection)
35                    .then_with(|| left.name.cmp(&right.name))
36            });
37            definition.clone()
38        })
39    }
40
41    pub fn remove_tree_definition(
42        &self,
43        collection: &str,
44        name: &str,
45    ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46        self.update_physical_metadata(|metadata| {
47            metadata
48                .tree_definitions
49                .iter()
50                .position(|definition| {
51                    definition.collection == collection && definition.name == name
52                })
53                .map(|index| metadata.tree_definitions.remove(index))
54        })
55    }
56
57    pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58        self.collection_ttl_defaults_ms
59            .read()
60            .ok()
61            .and_then(|defaults| defaults.get(collection).copied())
62    }
63
64    pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66            defaults.insert(collection.into(), ttl_ms);
67        }
68    }
69
70    pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72            defaults.remove(collection);
73        }
74    }
75
76    pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77        self.contract_cache_map()
78            .values()
79            .map(|arc| (**arc).clone())
80            .collect()
81    }
82
83    pub fn collection_contract(
84        &self,
85        collection: &str,
86    ) -> Option<crate::physical::CollectionContract> {
87        self.contract_cache_map()
88            .get(collection)
89            .map(|arc| (**arc).clone())
90    }
91
92    /// Arc-bumping variant — callers that only need read access skip
93    /// the per-call full `CollectionContract` clone. Deep clones cost
94    /// ~200-400 ns each (vec of declared columns + string allocs);
95    /// the UPDATE fast path hits this 2-3× per mutation.
96    pub fn collection_contract_arc(
97        &self,
98        collection: &str,
99    ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100        self.contract_cache_map()
101            .get(collection)
102            .map(std::sync::Arc::clone)
103    }
104
105    /// Return (or lazily build) the in-memory contract map.
106    /// Reparses PhysicalMetadataFile JSON only once per invalidation.
107    /// Called 3× per insert (ensure_model, enforce_uniqueness, normalize_fields)
108    /// so the cache is load-bearing for insert throughput.
109    fn contract_cache_map(
110        &self,
111    ) -> std::sync::Arc<
112        std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113    > {
114        if let Ok(guard) = self.collection_contract_cache.read() {
115            if let Some(map) = guard.as_ref() {
116                return std::sync::Arc::clone(map);
117            }
118        }
119        let contracts: Vec<crate::physical::CollectionContract> = self
120            .physical_metadata()
121            .map(|metadata| metadata.collection_contracts)
122            .unwrap_or_default();
123        let map: std::collections::HashMap<_, _> = contracts
124            .into_iter()
125            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126            .collect();
127        let arc = std::sync::Arc::new(map);
128        if let Ok(mut guard) = self.collection_contract_cache.write() {
129            *guard = Some(std::sync::Arc::clone(&arc));
130        }
131        arc
132    }
133
134    pub(crate) fn invalidate_collection_contract_cache(&self) {
135        if let Ok(mut guard) = self.collection_contract_cache.write() {
136            *guard = None;
137        }
138    }
139
140    pub fn save_collection_contract(
141        &self,
142        contract: crate::physical::CollectionContract,
143    ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145            if let Some(ttl_ms) = contract.default_ttl_ms {
146                defaults.insert(contract.name.clone(), ttl_ms);
147            } else {
148                defaults.remove(&contract.name);
149            }
150        }
151
152        self.store()
153            .context_index()
154            .set_collection_enabled(&contract.name, contract.context_index_enabled);
155        if let Some(manager) = self.store().get_collection(&contract.name) {
156            let columns = contract
157                .declared_columns
158                .iter()
159                .map(|column| column.name.clone())
160                .collect();
161            manager.set_column_schema_if_empty(columns);
162        }
163
164        self.invalidate_collection_contract_cache();
165
166        self.update_physical_metadata(|metadata| {
167            if let Some(existing) = metadata
168                .collection_contracts
169                .iter_mut()
170                .find(|existing| existing.name == contract.name)
171            {
172                *existing = contract.clone();
173            } else {
174                metadata.collection_contracts.push(contract.clone());
175            }
176            metadata
177                .collection_contracts
178                .sort_by(|left, right| left.name.cmp(&right.name));
179
180            if let Some(ttl_ms) = contract.default_ttl_ms {
181                metadata
182                    .collection_ttl_defaults_ms
183                    .insert(contract.name.clone(), ttl_ms);
184            } else {
185                metadata.collection_ttl_defaults_ms.remove(&contract.name);
186            }
187
188            contract.clone()
189        })
190    }
191
192    pub fn remove_collection_contract(
193        &self,
194        collection: &str,
195    ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
196        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
197            defaults.remove(collection);
198        }
199
200        self.store()
201            .context_index()
202            .set_collection_enabled(collection, false);
203
204        self.invalidate_collection_contract_cache();
205
206        self.update_physical_metadata(|metadata| {
207            let removed = metadata
208                .collection_contracts
209                .iter()
210                .position(|contract| contract.name == collection)
211                .map(|index| metadata.collection_contracts.remove(index));
212            metadata.collection_ttl_defaults_ms.remove(collection);
213            metadata
214                .indexes
215                .retain(|index| index.collection.as_deref() != Some(collection));
216            removed
217        })
218    }
219
220    pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
221        self.collection_ttl_defaults_ms
222            .read()
223            .map(|defaults| {
224                defaults
225                    .iter()
226                    .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
227                    .collect()
228            })
229            .unwrap_or_default()
230    }
231
232    pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
233        let metadata = self.physical_metadata();
234        let defaults = metadata
235            .as_ref()
236            .map(|m| m.collection_ttl_defaults_ms.clone())
237            .unwrap_or_default();
238
239        if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
240            current.clear();
241            current.extend(defaults);
242        }
243
244        if let Some(metadata) = metadata {
245            let store = self.store();
246            let index = store.context_index();
247            for contract in &metadata.collection_contracts {
248                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
249            }
250        }
251    }
252
253    pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
254        self.store.run_maintenance()?;
255        self.persist_metadata()?;
256        Ok(())
257    }
258
259    /// Path to the physical metadata sidecar, if persistent.
260    pub fn metadata_path(&self) -> Option<PathBuf> {
261        self.path
262            .as_ref()
263            .map(|path| PhysicalMetadataFile::metadata_path_for(path))
264    }
265
266    /// Load the current physical metadata view, bootstrapping from native state when needed.
267    pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
268        self.load_or_bootstrap_physical_metadata(!self.options.read_only)
269            .ok()
270    }
271
272    /// Physical index registry derived for the current database state.
273    pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
274        let indexes = self
275            .physical_metadata()
276            .map(|metadata| metadata.indexes)
277            .filter(|indexes| !indexes.is_empty())
278            .or_else(|| {
279                self.native_physical_state()
280                    .map(|state| self.physical_index_state_from_native_state(&state, None))
281            })
282            .unwrap_or_else(|| self.physical_index_state());
283        self.reconcile_index_states_with_native_artifacts(indexes)
284    }
285
286    /// List registered named exports from the current physical metadata view.
287    pub fn exports(&self) -> Vec<ExportDescriptor> {
288        self.physical_metadata()
289            .map(|metadata| metadata.exports)
290            .or_else(|| {
291                self.native_physical_state()
292                    .map(|state| self.exports_from_native_state(&state))
293            })
294            .unwrap_or_default()
295    }
296
297    /// List recorded snapshots from the current physical metadata view.
298    pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
299        self.physical_metadata()
300            .map(|metadata| metadata.snapshots)
301            .or_else(|| {
302                self.native_physical_state()
303                    .map(|state| self.snapshots_from_native_state(&state))
304            })
305            .unwrap_or_default()
306    }
307
308    /// List persisted named graph projections from the current physical metadata view.
309    pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
310        self.physical_metadata()
311            .map(|metadata| metadata.graph_projections)
312            .or_else(|| {
313                self.native_physical_state()
314                    .map(|state| self.graph_projections_from_native_state(&state))
315            })
316            .unwrap_or_default()
317    }
318
319    /// List graph projections declared in the catalog view.
320    pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
321        self.catalog_model_snapshot().declared_graph_projections
322    }
323
324    /// List graph projections currently observed in the operational view.
325    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
326        self.graph_projections()
327            .into_iter()
328            .filter(|projection| {
329                projection.last_materialized_sequence.is_some()
330                    || matches!(projection.state.as_str(), "materialized" | "stale")
331            })
332            .collect()
333    }
334
335    /// List persisted analytics job metadata from the current physical metadata view.
336    pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
337        self.physical_metadata()
338            .map(|metadata| metadata.analytics_jobs)
339            .or_else(|| {
340                self.native_physical_state()
341                    .map(|state| self.analytics_jobs_from_native_state(&state))
342            })
343            .unwrap_or_default()
344    }
345
346    /// List analytics jobs declared in the catalog view.
347    pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
348        self.catalog_model_snapshot().declared_analytics_jobs
349    }
350
351    /// List analytics jobs currently observed in the operational view.
352    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
353        self.analytics_jobs()
354            .into_iter()
355            .filter(|job| {
356                job.last_run_sequence.is_some()
357                    || matches!(
358                        job.state.as_str(),
359                        "running" | "completed" | "failed" | "queued" | "stale"
360                    )
361            })
362            .collect()
363    }
364
365    /// List indexes declared in the catalog view.
366    pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
367        self.catalog_model_snapshot().declared_indexes
368    }
369
370    /// List indexes currently observed in the operational view.
371    pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
372        self.catalog_model_snapshot().operational_indexes
373    }
374
375    /// List reconciled index status entries from the catalog snapshot.
376    pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
377        self.catalog_model_snapshot().index_statuses
378    }
379
380    /// Resolve one index status entry from the catalog snapshot.
381    pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
382        self.catalog_model_snapshot()
383            .index_statuses
384            .into_iter()
385            .find(|status| status.name == name)
386    }
387
388    /// Upsert a named graph projection in the persisted physical metadata.
389    pub fn save_graph_projection(
390        &self,
391        name: impl Into<String>,
392        node_labels: Vec<String>,
393        node_types: Vec<String>,
394        edge_labels: Vec<String>,
395        source: impl Into<String>,
396    ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
397        let name = name.into();
398        let source = source.into();
399        self.update_physical_metadata(|metadata| {
400            let now = SystemTime::now()
401                .duration_since(UNIX_EPOCH)
402                .unwrap_or_default()
403                .as_millis();
404            let projection = if let Some(existing) = metadata
405                .graph_projections
406                .iter_mut()
407                .find(|projection| projection.name == name)
408            {
409                existing.updated_at_unix_ms = now;
410                existing.state = "declared".to_string();
411                existing.source = source.clone();
412                existing.node_labels = node_labels.clone();
413                existing.node_types = node_types.clone();
414                existing.edge_labels = edge_labels.clone();
415                existing.last_materialized_sequence = None;
416                existing.clone()
417            } else {
418                let projection = PhysicalGraphProjection {
419                    name: name.clone(),
420                    created_at_unix_ms: now,
421                    updated_at_unix_ms: now,
422                    state: "declared".to_string(),
423                    source: source.clone(),
424                    node_labels: node_labels.clone(),
425                    node_types: node_types.clone(),
426                    edge_labels: edge_labels.clone(),
427                    last_materialized_sequence: None,
428                };
429                metadata.graph_projections.push(projection.clone());
430                projection
431            };
432
433            Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
434
435            metadata
436                .graph_projections
437                .sort_by(|left, right| left.name.cmp(&right.name));
438            projection
439        })
440    }
441
442    /// Mark a declared graph projection as materialized in the current physical metadata.
443    pub fn materialize_graph_projection(
444        &self,
445        name: &str,
446    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
447        self.update_physical_metadata(|metadata| {
448            let now = SystemTime::now()
449                .duration_since(UNIX_EPOCH)
450                .unwrap_or_default()
451                .as_millis();
452            let idx = metadata
453                .graph_projections
454                .iter()
455                .position(|projection| projection.name == name);
456            if let Some(idx) = idx {
457                metadata.graph_projections[idx].updated_at_unix_ms = now;
458                metadata.graph_projections[idx].state = "materialized".to_string();
459                metadata.graph_projections[idx].last_materialized_sequence =
460                    Some(metadata.superblock.sequence);
461                let result = metadata.graph_projections[idx].clone();
462                Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
463                return Some(result);
464            }
465            None
466        })
467    }
468
469    /// Mark a declared graph projection as materializing.
470    pub fn mark_graph_projection_materializing(
471        &self,
472        name: &str,
473    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
474        self.update_physical_metadata(|metadata| {
475            let now = SystemTime::now()
476                .duration_since(UNIX_EPOCH)
477                .unwrap_or_default()
478                .as_millis();
479            let idx = metadata
480                .graph_projections
481                .iter()
482                .position(|projection| projection.name == name);
483            if let Some(idx) = idx {
484                metadata.graph_projections[idx].updated_at_unix_ms = now;
485                metadata.graph_projections[idx].state = "materializing".to_string();
486                metadata.graph_projections[idx].last_materialized_sequence = None;
487                let result = metadata.graph_projections[idx].clone();
488                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
489                return Some(result);
490            }
491            None
492        })
493    }
494
495    /// Mark a graph projection as failed.
496    pub fn fail_graph_projection(
497        &self,
498        name: &str,
499    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
500        self.update_physical_metadata(|metadata| {
501            let now = SystemTime::now()
502                .duration_since(UNIX_EPOCH)
503                .unwrap_or_default()
504                .as_millis();
505            let idx = metadata
506                .graph_projections
507                .iter()
508                .position(|projection| projection.name == name);
509            if let Some(idx) = idx {
510                metadata.graph_projections[idx].updated_at_unix_ms = now;
511                metadata.graph_projections[idx].state = "failed".to_string();
512                metadata.graph_projections[idx].last_materialized_sequence = None;
513                let result = metadata.graph_projections[idx].clone();
514                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
515                return Some(result);
516            }
517            None
518        })
519    }
520
521    /// Mark a graph projection as stale while preserving any last materialized sequence.
522    pub fn mark_graph_projection_stale(
523        &self,
524        name: &str,
525    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
526        self.update_physical_metadata(|metadata| {
527            let now = SystemTime::now()
528                .duration_since(UNIX_EPOCH)
529                .unwrap_or_default()
530                .as_millis();
531            let idx = metadata
532                .graph_projections
533                .iter()
534                .position(|projection| projection.name == name);
535            if let Some(idx) = idx {
536                metadata.graph_projections[idx].updated_at_unix_ms = now;
537                metadata.graph_projections[idx].state = "stale".to_string();
538                let result = metadata.graph_projections[idx].clone();
539                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
540                return Some(result);
541            }
542            None
543        })
544    }
545
546    fn mark_projection_dependent_jobs_stale(
547        metadata: &mut PhysicalMetadataFile,
548        projection_name: &str,
549        now: u128,
550    ) {
551        for job in metadata.analytics_jobs.iter_mut() {
552            if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
553                job.state = "stale".to_string();
554                job.updated_at_unix_ms = now;
555            }
556        }
557    }
558
559    fn rearm_projection_dependent_jobs_declared(
560        metadata: &mut PhysicalMetadataFile,
561        projection_name: &str,
562        now: u128,
563    ) {
564        for job in metadata.analytics_jobs.iter_mut() {
565            if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
566                job.state = "declared".to_string();
567                job.last_run_sequence = None;
568                job.updated_at_unix_ms = now;
569            }
570        }
571    }
572
573    /// Declare or update analytics job metadata without marking it as executed.
574    pub fn save_analytics_job(
575        &self,
576        kind: impl Into<String>,
577        projection: Option<String>,
578        metadata_entries: BTreeMap<String, String>,
579    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
580        let kind = kind.into();
581        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
582        if let Some(projection_name) = projection.as_deref() {
583            if !self.graph_projection_is_declared(projection_name) {
584                return Err(format!(
585                    "graph projection '{projection_name}' is not declared in physical metadata"
586                )
587                .into());
588            }
589        }
590
591        self.update_physical_metadata(|metadata| {
592            let now = SystemTime::now()
593                .duration_since(UNIX_EPOCH)
594                .unwrap_or_default()
595                .as_millis();
596
597            let job = if let Some(existing) = metadata
598                .analytics_jobs
599                .iter_mut()
600                .find(|job| job.id == job_id)
601            {
602                existing.kind = kind.clone();
603                existing.projection = projection.clone();
604                existing.updated_at_unix_ms = now;
605                existing.metadata = metadata_entries.clone();
606                if existing.last_run_sequence.is_none() {
607                    existing.state = "declared".to_string();
608                }
609                existing.clone()
610            } else {
611                let job = PhysicalAnalyticsJob {
612                    id: job_id.clone(),
613                    kind: kind.clone(),
614                    state: "declared".to_string(),
615                    projection: projection.clone(),
616                    created_at_unix_ms: now,
617                    updated_at_unix_ms: now,
618                    last_run_sequence: None,
619                    metadata: metadata_entries.clone(),
620                };
621                metadata.analytics_jobs.push(job.clone());
622                job
623            };
624
625            metadata
626                .analytics_jobs
627                .sort_by(|left, right| left.id.cmp(&right.id));
628            job
629        })
630    }
631
632    /// Record or update analytics job metadata in the persisted physical metadata.
633    pub fn record_analytics_job(
634        &self,
635        kind: impl Into<String>,
636        projection: Option<String>,
637        metadata_entries: BTreeMap<String, String>,
638    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
639        let kind = kind.into();
640        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
641        if let Some(projection_name) = projection.as_deref() {
642            if !self.graph_projection_is_declared(projection_name) {
643                return Err(format!(
644                    "graph projection '{projection_name}' is not declared in physical metadata"
645                )
646                .into());
647            }
648            if !self.graph_projection_is_operational(projection_name) {
649                return Err(format!(
650                    "graph projection '{projection_name}' is declared but not operationally materialized"
651                )
652                .into());
653            }
654        }
655
656        self.update_physical_metadata(|metadata| {
657            let now = SystemTime::now()
658                .duration_since(UNIX_EPOCH)
659                .unwrap_or_default()
660                .as_millis();
661
662            let existing = metadata
663                .analytics_jobs
664                .iter_mut()
665                .find(|job| job.id == job_id)?;
666            existing.state = "completed".to_string();
667            existing.kind = kind.clone();
668            existing.projection = projection.clone();
669            existing.updated_at_unix_ms = now;
670            existing.last_run_sequence = Some(metadata.superblock.sequence);
671            existing.metadata = metadata_entries.clone();
672            let job = existing.clone();
673
674            metadata
675                .analytics_jobs
676                .sort_by(|left, right| left.id.cmp(&right.id));
677            Some(job)
678        })
679        .and_then(|job| {
680            job.ok_or_else(|| {
681                format!("analytics job '{job_id}' is not declared in physical metadata").into()
682            })
683        })
684    }
685
686    /// Mark a declared analytics job as running.
687    pub fn queue_analytics_job(
688        &self,
689        kind: impl Into<String>,
690        projection: Option<String>,
691        metadata_entries: BTreeMap<String, String>,
692    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
693        let kind = kind.into();
694        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
695        if let Some(projection_name) = projection.as_deref() {
696            if !self.graph_projection_is_declared(projection_name) {
697                return Err(format!(
698                    "graph projection '{projection_name}' is not declared in physical metadata"
699                )
700                .into());
701            }
702            if !self.graph_projection_is_operational(projection_name) {
703                return Err(format!(
704                    "graph projection '{projection_name}' is declared but not operationally materialized"
705                )
706                .into());
707            }
708        }
709
710        self.update_physical_metadata(|metadata| {
711            let now = SystemTime::now()
712                .duration_since(UNIX_EPOCH)
713                .unwrap_or_default()
714                .as_millis();
715
716            let existing = metadata
717                .analytics_jobs
718                .iter_mut()
719                .find(|job| job.id == job_id)?;
720            existing.state = "queued".to_string();
721            existing.kind = kind.clone();
722            existing.projection = projection.clone();
723            existing.updated_at_unix_ms = now;
724            existing.metadata = metadata_entries.clone();
725            let job = existing.clone();
726
727            metadata
728                .analytics_jobs
729                .sort_by(|left, right| left.id.cmp(&right.id));
730            Some(job)
731        })
732        .and_then(|job| {
733            job.ok_or_else(|| {
734                format!("analytics job '{job_id}' is not declared in physical metadata").into()
735            })
736        })
737    }
738
739    /// Mark a declared analytics job as running.
740    pub fn start_analytics_job(
741        &self,
742        kind: impl Into<String>,
743        projection: Option<String>,
744        metadata_entries: BTreeMap<String, String>,
745    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
746        let kind = kind.into();
747        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
748        if let Some(projection_name) = projection.as_deref() {
749            if !self.graph_projection_is_declared(projection_name) {
750                return Err(format!(
751                    "graph projection '{projection_name}' is not declared in physical metadata"
752                )
753                .into());
754            }
755            if !self.graph_projection_is_operational(projection_name) {
756                return Err(format!(
757                    "graph projection '{projection_name}' is declared but not operationally materialized"
758                )
759                .into());
760            }
761        }
762
763        self.update_physical_metadata(|metadata| {
764            let now = SystemTime::now()
765                .duration_since(UNIX_EPOCH)
766                .unwrap_or_default()
767                .as_millis();
768
769            let existing = metadata
770                .analytics_jobs
771                .iter_mut()
772                .find(|job| job.id == job_id)?;
773            existing.state = "running".to_string();
774            existing.kind = kind.clone();
775            existing.projection = projection.clone();
776            existing.updated_at_unix_ms = now;
777            existing.metadata = metadata_entries.clone();
778            let job = existing.clone();
779
780            metadata
781                .analytics_jobs
782                .sort_by(|left, right| left.id.cmp(&right.id));
783            Some(job)
784        })
785        .and_then(|job| {
786            job.ok_or_else(|| {
787                format!("analytics job '{job_id}' is not declared in physical metadata").into()
788            })
789        })
790    }
791
792    /// Mark a declared analytics job as failed.
793    pub fn fail_analytics_job(
794        &self,
795        kind: impl Into<String>,
796        projection: Option<String>,
797        metadata_entries: BTreeMap<String, String>,
798    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
799        let kind = kind.into();
800        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
801
802        self.update_physical_metadata(|metadata| {
803            let now = SystemTime::now()
804                .duration_since(UNIX_EPOCH)
805                .unwrap_or_default()
806                .as_millis();
807
808            let existing = metadata
809                .analytics_jobs
810                .iter_mut()
811                .find(|job| job.id == job_id)?;
812            existing.state = "failed".to_string();
813            existing.kind = kind.clone();
814            existing.projection = projection.clone();
815            existing.updated_at_unix_ms = now;
816            existing.metadata = metadata_entries.clone();
817            let job = existing.clone();
818
819            metadata
820                .analytics_jobs
821                .sort_by(|left, right| left.id.cmp(&right.id));
822            Some(job)
823        })
824        .and_then(|job| {
825            job.ok_or_else(|| {
826                format!("analytics job '{job_id}' is not declared in physical metadata").into()
827            })
828        })
829    }
830
831    /// Mark a declared analytics job as stale.
832    pub fn mark_analytics_job_stale(
833        &self,
834        kind: impl Into<String>,
835        projection: Option<String>,
836        metadata_entries: BTreeMap<String, String>,
837    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
838        let kind = kind.into();
839        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
840
841        self.update_physical_metadata(|metadata| {
842            let now = SystemTime::now()
843                .duration_since(UNIX_EPOCH)
844                .unwrap_or_default()
845                .as_millis();
846
847            let existing = metadata
848                .analytics_jobs
849                .iter_mut()
850                .find(|job| job.id == job_id)?;
851            existing.state = "stale".to_string();
852            existing.kind = kind.clone();
853            existing.projection = projection.clone();
854            existing.updated_at_unix_ms = now;
855            existing.metadata = metadata_entries.clone();
856            let job = existing.clone();
857
858            metadata
859                .analytics_jobs
860                .sort_by(|left, right| left.id.cmp(&right.id));
861            Some(job)
862        })
863        .and_then(|job| {
864            job.ok_or_else(|| {
865                format!("analytics job '{job_id}' is not declared in physical metadata").into()
866            })
867        })
868    }
869
870    /// Create a named export by copying the current database file and metadata sidecar.
871    pub fn create_named_export(
872        &self,
873        name: impl Into<String>,
874    ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
875        let name = name.into();
876        if self.options.mode != StorageMode::Persistent {
877            return Err("exports require persistent mode".into());
878        }
879        let Some(path) = self.path() else {
880            return Err("database path is not available".into());
881        };
882
883        self.flush()?;
884
885        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
886        let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
887        let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
888        let export_metadata_binary_path =
889            PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
890
891        fs::copy(path, &export_data_path)?;
892
893        let descriptor = ExportDescriptor {
894            name: name.clone(),
895            created_at_unix_ms: SystemTime::now()
896                .duration_since(UNIX_EPOCH)
897                .unwrap_or_default()
898                .as_millis(),
899            snapshot_id: metadata
900                .snapshots
901                .last()
902                .map(|snapshot| snapshot.snapshot_id),
903            superblock_sequence: metadata.superblock.sequence,
904            data_path: export_data_path.display().to_string(),
905            metadata_path: export_metadata_path.display().to_string(),
906            collection_count: metadata.catalog.total_collections,
907            total_entities: metadata.catalog.total_entities,
908        };
909
910        metadata
911            .exports
912            .retain(|export| export.name != descriptor.name);
913        metadata.exports.push(descriptor.clone());
914        self.prune_export_registry(&mut metadata.exports);
915        metadata.save_for_data_path(path)?;
916        metadata.save_to_binary_path(&export_metadata_binary_path)?;
917        metadata.save_to_path(&export_metadata_path)?;
918
919        Ok(descriptor)
920    }
921
922    /// Enable or disable a physical index entry in the persisted registry.
923    pub fn set_index_enabled(
924        &self,
925        name: &str,
926        enabled: bool,
927    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
928        let Some(status) = self.index_status(name) else {
929            return Err(format!("index '{name}' is not present in catalog status").into());
930        };
931        if !status.declared {
932            return Err(format!("index '{name}' is not declared in physical metadata").into());
933        }
934        self.update_physical_metadata(|metadata| {
935            let now = SystemTime::now()
936                .duration_since(UNIX_EPOCH)
937                .unwrap_or_default()
938                .as_millis();
939            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
940                index.enabled = enabled;
941                if !enabled {
942                    index.build_state = "disabled".to_string();
943                } else if index.build_state == "disabled" {
944                    index.build_state = if index.artifact_root_page.is_some() {
945                        "ready".to_string()
946                    } else {
947                        "declared-unbuilt".to_string()
948                    };
949                }
950                index.last_refresh_ms = Some(now);
951                return Some(index.clone());
952            }
953            None
954        })
955    }
956
957    /// Mark a declared physical index as building in the persisted registry.
958    pub fn mark_index_building(
959        &self,
960        name: &str,
961    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
962        let Some(status) = self.index_status(name) else {
963            return Err(format!("index '{name}' is not present in catalog status").into());
964        };
965        if !status.declared {
966            return Err(format!("index '{name}' is not declared in physical metadata").into());
967        }
968        if status.lifecycle_state == "disabled" {
969            return Err(format!("index '{name}' is disabled").into());
970        }
971        self.update_physical_metadata(|metadata| {
972            let now = SystemTime::now()
973                .duration_since(UNIX_EPOCH)
974                .unwrap_or_default()
975                .as_millis();
976            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
977                index.build_state = "building".to_string();
978                index.last_refresh_ms = Some(now);
979                return Some(index.clone());
980            }
981            None
982        })
983    }
984
985    /// Mark a declared physical index as failed in the persisted registry.
986    pub fn fail_index(
987        &self,
988        name: &str,
989    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
990        let Some(status) = self.index_status(name) else {
991            return Err(format!("index '{name}' is not present in catalog status").into());
992        };
993        if !status.declared {
994            return Err(format!("index '{name}' is not declared in physical metadata").into());
995        }
996        self.update_physical_metadata(|metadata| {
997            let now = SystemTime::now()
998                .duration_since(UNIX_EPOCH)
999                .unwrap_or_default()
1000                .as_millis();
1001            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1002                index.build_state = "failed".to_string();
1003                index.last_refresh_ms = Some(now);
1004                return Some(index.clone());
1005            }
1006            None
1007        })
1008    }
1009
1010    /// Mark a declared physical index as stale in the persisted registry.
1011    pub fn mark_index_stale(
1012        &self,
1013        name: &str,
1014    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1015        let Some(status) = self.index_status(name) else {
1016            return Err(format!("index '{name}' is not present in catalog status").into());
1017        };
1018        if !status.declared {
1019            return Err(format!("index '{name}' is not declared in physical metadata").into());
1020        }
1021        self.update_physical_metadata(|metadata| {
1022            let now = SystemTime::now()
1023                .duration_since(UNIX_EPOCH)
1024                .unwrap_or_default()
1025                .as_millis();
1026            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1027                index.build_state = if index.enabled {
1028                    "stale".to_string()
1029                } else {
1030                    "disabled".to_string()
1031                };
1032                index.last_refresh_ms = Some(now);
1033                return Some(index.clone());
1034            }
1035            None
1036        })
1037    }
1038
1039    /// Mark a declared physical index as ready in the persisted registry.
1040    pub fn mark_index_ready(
1041        &self,
1042        name: &str,
1043    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1044        self.warmup_index(name)
1045    }
1046
1047    /// Mark a physical index as warmed up/refreshed in the persisted registry.
1048    pub fn warmup_index(
1049        &self,
1050        name: &str,
1051    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1052        let Some(status) = self.index_status(name) else {
1053            return Err(format!("index '{name}' is not present in catalog status").into());
1054        };
1055        if !status.declared {
1056            return Err(format!("index '{name}' is not declared in physical metadata").into());
1057        }
1058        if status.lifecycle_state == "disabled" {
1059            return Err(format!("index '{name}' is disabled").into());
1060        }
1061        if !status.operational {
1062            return Err(
1063                format!("index '{name}' is declared but not operationally materialized").into(),
1064            );
1065        }
1066        let warmed_artifact = self
1067            .physical_indexes()
1068            .into_iter()
1069            .find(|index| index.name == name)
1070            .map(|mut index| {
1071                self.warmup_native_vector_artifact_for_index(&index)?;
1072                self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1073                Ok::<_, String>(index)
1074            })
1075            .transpose()
1076            .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1077
1078        self.update_physical_metadata(|metadata| {
1079            let now = SystemTime::now()
1080                .duration_since(UNIX_EPOCH)
1081                .unwrap_or_default()
1082                .as_millis();
1083            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1084                if let Some(warmed) = warmed_artifact.as_ref() {
1085                    index.entries = warmed.entries;
1086                    index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1087                    index.backend = warmed.backend.clone();
1088                    index.build_state = "ready".to_string();
1089                }
1090                index.last_refresh_ms = Some(now);
1091                return Some(index.clone());
1092            }
1093            None
1094        })
1095    }
1096
1097    /// Rebuild physical index metadata from the current catalog, optionally restricted to one collection.
1098    pub fn rebuild_index_registry(
1099        &self,
1100        collection: Option<&str>,
1101    ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1102        let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1103        self.update_physical_metadata(|metadata| {
1104            let now = SystemTime::now()
1105                .duration_since(UNIX_EPOCH)
1106                .unwrap_or_default()
1107                .as_millis();
1108
1109            let mut affected = Vec::new();
1110            let declared = metadata.indexes.clone();
1111            for declared_index in declared {
1112                let matches_collection = collection.is_none_or(|collection_name| {
1113                    declared_index.collection.as_deref() == Some(collection_name)
1114                });
1115                if !matches_collection {
1116                    continue;
1117                }
1118
1119                let mut rebuilt = fresh
1120                    .iter()
1121                    .find(|index| index.name == declared_index.name)
1122                    .cloned()
1123                    .unwrap_or_else(|| {
1124                        let mut index = declared_index.clone();
1125                        index.build_state = "declared-unbuilt".to_string();
1126                        index
1127                    });
1128                rebuilt.enabled = declared_index.enabled;
1129                rebuilt.artifact_kind = rebuilt
1130                    .artifact_kind
1131                    .or_else(|| declared_index.artifact_kind.clone());
1132                rebuilt.artifact_root_page = rebuilt
1133                    .artifact_root_page
1134                    .or(declared_index.artifact_root_page);
1135                rebuilt.artifact_checksum = rebuilt
1136                    .artifact_checksum
1137                    .or(declared_index.artifact_checksum);
1138                rebuilt.build_state =
1139                    Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1140                rebuilt.last_refresh_ms = Some(now);
1141
1142                if let Some(existing) = metadata
1143                    .indexes
1144                    .iter_mut()
1145                    .find(|index| index.name == rebuilt.name)
1146                {
1147                    *existing = rebuilt.clone();
1148                } else {
1149                    metadata.indexes.push(rebuilt.clone());
1150                }
1151
1152                affected.push(rebuilt);
1153            }
1154
1155            affected
1156        })
1157    }
1158
1159    fn finalize_rebuilt_index_build_state(
1160        declared: &PhysicalIndexState,
1161        rebuilt: &PhysicalIndexState,
1162    ) -> String {
1163        if !rebuilt.enabled {
1164            return "disabled".to_string();
1165        }
1166
1167        if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1168            return "failed".to_string();
1169        }
1170
1171        let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1172        if native_artifact_family {
1173            if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1174                return "ready".to_string();
1175            }
1176            if declared.artifact_root_page.is_some()
1177                || declared.artifact_checksum.is_some()
1178                || declared.artifact_kind.is_some()
1179            {
1180                return "stale".to_string();
1181            }
1182            return "declared-unbuilt".to_string();
1183        }
1184
1185        if rebuilt.entries > 0 {
1186            return "ready".to_string();
1187        }
1188
1189        if matches!(
1190            declared.build_state.as_str(),
1191            "stale" | "artifact-published" | "registry-loaded"
1192        ) {
1193            return "stale".to_string();
1194        }
1195
1196        "declared-unbuilt".to_string()
1197    }
1198}