Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_registry.rs

1use super::*;
2
3impl RedDB {
4    pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5        self.physical_metadata()
6            .map(|metadata| metadata.tree_definitions)
7            .unwrap_or_default()
8    }
9
10    pub fn tree_definition(
11        &self,
12        collection: &str,
13        name: &str,
14    ) -> Option<crate::physical::PhysicalTreeDefinition> {
15        self.tree_definitions()
16            .into_iter()
17            .find(|definition| definition.collection == collection && definition.name == name)
18    }
19
20    pub fn save_tree_definition(
21        &self,
22        definition: crate::physical::PhysicalTreeDefinition,
23    ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24        self.update_physical_metadata(|metadata| {
25            if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26                existing.collection == definition.collection && existing.name == definition.name
27            }) {
28                *existing = definition.clone();
29            } else {
30                metadata.tree_definitions.push(definition.clone());
31            }
32            metadata.tree_definitions.sort_by(|left, right| {
33                left.collection
34                    .cmp(&right.collection)
35                    .then_with(|| left.name.cmp(&right.name))
36            });
37            definition.clone()
38        })
39    }
40
41    pub fn remove_tree_definition(
42        &self,
43        collection: &str,
44        name: &str,
45    ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46        self.update_physical_metadata(|metadata| {
47            metadata
48                .tree_definitions
49                .iter()
50                .position(|definition| {
51                    definition.collection == collection && definition.name == name
52                })
53                .map(|index| metadata.tree_definitions.remove(index))
54        })
55    }
56
57    pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58        self.collection_ttl_defaults_ms
59            .read()
60            .ok()
61            .and_then(|defaults| defaults.get(collection).copied())
62    }
63
64    pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66            defaults.insert(collection.into(), ttl_ms);
67        }
68    }
69
70    pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72            defaults.remove(collection);
73        }
74    }
75
76    pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77        self.contract_cache_map()
78            .values()
79            .map(|arc| (**arc).clone())
80            .collect()
81    }
82
83    pub fn collection_contract(
84        &self,
85        collection: &str,
86    ) -> Option<crate::physical::CollectionContract> {
87        self.contract_cache_map()
88            .get(collection)
89            .map(|arc| (**arc).clone())
90    }
91
92    /// Arc-bumping variant — callers that only need read access skip
93    /// the per-call full `CollectionContract` clone. Deep clones cost
94    /// ~200-400 ns each (vec of declared columns + string allocs);
95    /// the UPDATE fast path hits this 2-3× per mutation.
96    pub fn collection_contract_arc(
97        &self,
98        collection: &str,
99    ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100        self.contract_cache_map()
101            .get(collection)
102            .map(std::sync::Arc::clone)
103    }
104
105    /// Return (or lazily build) the in-memory contract map.
106    /// Reparses PhysicalMetadataFile JSON only once per invalidation.
107    /// Called 3× per insert (ensure_model, enforce_uniqueness, normalize_fields)
108    /// so the cache is load-bearing for insert throughput.
109    fn contract_cache_map(
110        &self,
111    ) -> std::sync::Arc<
112        std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113    > {
114        if let Ok(guard) = self.collection_contract_cache.read() {
115            if let Some(map) = guard.as_ref() {
116                return std::sync::Arc::clone(map);
117            }
118        }
119        let contracts: Vec<crate::physical::CollectionContract> = self
120            .physical_metadata()
121            .map(|metadata| metadata.collection_contracts)
122            .unwrap_or_default();
123        let map: std::collections::HashMap<_, _> = contracts
124            .into_iter()
125            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126            .collect();
127        let arc = std::sync::Arc::new(map);
128        if let Ok(mut guard) = self.collection_contract_cache.write() {
129            *guard = Some(std::sync::Arc::clone(&arc));
130        }
131        arc
132    }
133
134    pub(crate) fn invalidate_collection_contract_cache(&self) {
135        if let Ok(mut guard) = self.collection_contract_cache.write() {
136            *guard = None;
137        }
138    }
139
140    pub fn save_collection_contract(
141        &self,
142        contract: crate::physical::CollectionContract,
143    ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145            if let Some(ttl_ms) = contract.default_ttl_ms {
146                defaults.insert(contract.name.clone(), ttl_ms);
147            } else {
148                defaults.remove(&contract.name);
149            }
150        }
151
152        self.store()
153            .context_index()
154            .set_collection_enabled(&contract.name, contract.context_index_enabled);
155
156        self.invalidate_collection_contract_cache();
157
158        self.update_physical_metadata(|metadata| {
159            if let Some(existing) = metadata
160                .collection_contracts
161                .iter_mut()
162                .find(|existing| existing.name == contract.name)
163            {
164                *existing = contract.clone();
165            } else {
166                metadata.collection_contracts.push(contract.clone());
167            }
168            metadata
169                .collection_contracts
170                .sort_by(|left, right| left.name.cmp(&right.name));
171
172            if let Some(ttl_ms) = contract.default_ttl_ms {
173                metadata
174                    .collection_ttl_defaults_ms
175                    .insert(contract.name.clone(), ttl_ms);
176            } else {
177                metadata.collection_ttl_defaults_ms.remove(&contract.name);
178            }
179
180            contract.clone()
181        })
182    }
183
184    pub fn remove_collection_contract(
185        &self,
186        collection: &str,
187    ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
188        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
189            defaults.remove(collection);
190        }
191
192        self.store()
193            .context_index()
194            .set_collection_enabled(collection, false);
195
196        self.invalidate_collection_contract_cache();
197
198        self.update_physical_metadata(|metadata| {
199            let removed = metadata
200                .collection_contracts
201                .iter()
202                .position(|contract| contract.name == collection)
203                .map(|index| metadata.collection_contracts.remove(index));
204            metadata.collection_ttl_defaults_ms.remove(collection);
205            metadata
206                .indexes
207                .retain(|index| index.collection.as_deref() != Some(collection));
208            removed
209        })
210    }
211
212    pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
213        self.collection_ttl_defaults_ms
214            .read()
215            .map(|defaults| {
216                defaults
217                    .iter()
218                    .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
219                    .collect()
220            })
221            .unwrap_or_default()
222    }
223
224    pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
225        let metadata = self.physical_metadata();
226        let defaults = metadata
227            .as_ref()
228            .map(|m| m.collection_ttl_defaults_ms.clone())
229            .unwrap_or_default();
230
231        if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
232            current.clear();
233            current.extend(defaults);
234        }
235
236        if let Some(metadata) = metadata {
237            let store = self.store();
238            let index = store.context_index();
239            for contract in &metadata.collection_contracts {
240                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
241            }
242        }
243    }
244
245    pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
246        self.store.run_maintenance()?;
247        self.persist_metadata()?;
248        Ok(())
249    }
250
251    /// Path to the physical metadata sidecar, if persistent.
252    pub fn metadata_path(&self) -> Option<PathBuf> {
253        self.path
254            .as_ref()
255            .map(|path| PhysicalMetadataFile::metadata_path_for(path))
256    }
257
258    /// Load the current physical metadata view, bootstrapping from native state when needed.
259    pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
260        self.load_or_bootstrap_physical_metadata(!self.options.read_only)
261            .ok()
262    }
263
264    /// Physical index registry derived for the current database state.
265    pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
266        let indexes = self
267            .physical_metadata()
268            .map(|metadata| metadata.indexes)
269            .filter(|indexes| !indexes.is_empty())
270            .or_else(|| {
271                self.native_physical_state()
272                    .map(|state| self.physical_index_state_from_native_state(&state, None))
273            })
274            .unwrap_or_else(|| self.physical_index_state());
275        self.reconcile_index_states_with_native_artifacts(indexes)
276    }
277
278    /// List registered named exports from the current physical metadata view.
279    pub fn exports(&self) -> Vec<ExportDescriptor> {
280        self.physical_metadata()
281            .map(|metadata| metadata.exports)
282            .or_else(|| {
283                self.native_physical_state()
284                    .map(|state| self.exports_from_native_state(&state))
285            })
286            .unwrap_or_default()
287    }
288
289    /// List recorded snapshots from the current physical metadata view.
290    pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
291        self.physical_metadata()
292            .map(|metadata| metadata.snapshots)
293            .or_else(|| {
294                self.native_physical_state()
295                    .map(|state| self.snapshots_from_native_state(&state))
296            })
297            .unwrap_or_default()
298    }
299
300    /// List persisted named graph projections from the current physical metadata view.
301    pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
302        self.physical_metadata()
303            .map(|metadata| metadata.graph_projections)
304            .or_else(|| {
305                self.native_physical_state()
306                    .map(|state| self.graph_projections_from_native_state(&state))
307            })
308            .unwrap_or_default()
309    }
310
311    /// List graph projections declared in the catalog view.
312    pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
313        self.catalog_model_snapshot().declared_graph_projections
314    }
315
316    /// List graph projections currently observed in the operational view.
317    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
318        self.graph_projections()
319            .into_iter()
320            .filter(|projection| {
321                projection.last_materialized_sequence.is_some()
322                    || matches!(projection.state.as_str(), "materialized" | "stale")
323            })
324            .collect()
325    }
326
327    /// List persisted analytics job metadata from the current physical metadata view.
328    pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
329        self.physical_metadata()
330            .map(|metadata| metadata.analytics_jobs)
331            .or_else(|| {
332                self.native_physical_state()
333                    .map(|state| self.analytics_jobs_from_native_state(&state))
334            })
335            .unwrap_or_default()
336    }
337
338    /// List analytics jobs declared in the catalog view.
339    pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
340        self.catalog_model_snapshot().declared_analytics_jobs
341    }
342
343    /// List analytics jobs currently observed in the operational view.
344    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
345        self.analytics_jobs()
346            .into_iter()
347            .filter(|job| {
348                job.last_run_sequence.is_some()
349                    || matches!(
350                        job.state.as_str(),
351                        "running" | "completed" | "failed" | "queued" | "stale"
352                    )
353            })
354            .collect()
355    }
356
357    /// List indexes declared in the catalog view.
358    pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
359        self.catalog_model_snapshot().declared_indexes
360    }
361
362    /// List indexes currently observed in the operational view.
363    pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
364        self.catalog_model_snapshot().operational_indexes
365    }
366
367    /// List reconciled index status entries from the catalog snapshot.
368    pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
369        self.catalog_model_snapshot().index_statuses
370    }
371
372    /// Resolve one index status entry from the catalog snapshot.
373    pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
374        self.catalog_model_snapshot()
375            .index_statuses
376            .into_iter()
377            .find(|status| status.name == name)
378    }
379
380    /// Upsert a named graph projection in the persisted physical metadata.
381    pub fn save_graph_projection(
382        &self,
383        name: impl Into<String>,
384        node_labels: Vec<String>,
385        node_types: Vec<String>,
386        edge_labels: Vec<String>,
387        source: impl Into<String>,
388    ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
389        let name = name.into();
390        let source = source.into();
391        self.update_physical_metadata(|metadata| {
392            let now = SystemTime::now()
393                .duration_since(UNIX_EPOCH)
394                .unwrap_or_default()
395                .as_millis();
396            let projection = if let Some(existing) = metadata
397                .graph_projections
398                .iter_mut()
399                .find(|projection| projection.name == name)
400            {
401                existing.updated_at_unix_ms = now;
402                existing.state = "declared".to_string();
403                existing.source = source.clone();
404                existing.node_labels = node_labels.clone();
405                existing.node_types = node_types.clone();
406                existing.edge_labels = edge_labels.clone();
407                existing.last_materialized_sequence = None;
408                existing.clone()
409            } else {
410                let projection = PhysicalGraphProjection {
411                    name: name.clone(),
412                    created_at_unix_ms: now,
413                    updated_at_unix_ms: now,
414                    state: "declared".to_string(),
415                    source: source.clone(),
416                    node_labels: node_labels.clone(),
417                    node_types: node_types.clone(),
418                    edge_labels: edge_labels.clone(),
419                    last_materialized_sequence: None,
420                };
421                metadata.graph_projections.push(projection.clone());
422                projection
423            };
424
425            Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
426
427            metadata
428                .graph_projections
429                .sort_by(|left, right| left.name.cmp(&right.name));
430            projection
431        })
432    }
433
434    /// Mark a declared graph projection as materialized in the current physical metadata.
435    pub fn materialize_graph_projection(
436        &self,
437        name: &str,
438    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
439        self.update_physical_metadata(|metadata| {
440            let now = SystemTime::now()
441                .duration_since(UNIX_EPOCH)
442                .unwrap_or_default()
443                .as_millis();
444            let idx = metadata
445                .graph_projections
446                .iter()
447                .position(|projection| projection.name == name);
448            if let Some(idx) = idx {
449                metadata.graph_projections[idx].updated_at_unix_ms = now;
450                metadata.graph_projections[idx].state = "materialized".to_string();
451                metadata.graph_projections[idx].last_materialized_sequence =
452                    Some(metadata.superblock.sequence);
453                let result = metadata.graph_projections[idx].clone();
454                Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
455                return Some(result);
456            }
457            None
458        })
459    }
460
461    /// Mark a declared graph projection as materializing.
462    pub fn mark_graph_projection_materializing(
463        &self,
464        name: &str,
465    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
466        self.update_physical_metadata(|metadata| {
467            let now = SystemTime::now()
468                .duration_since(UNIX_EPOCH)
469                .unwrap_or_default()
470                .as_millis();
471            let idx = metadata
472                .graph_projections
473                .iter()
474                .position(|projection| projection.name == name);
475            if let Some(idx) = idx {
476                metadata.graph_projections[idx].updated_at_unix_ms = now;
477                metadata.graph_projections[idx].state = "materializing".to_string();
478                metadata.graph_projections[idx].last_materialized_sequence = None;
479                let result = metadata.graph_projections[idx].clone();
480                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
481                return Some(result);
482            }
483            None
484        })
485    }
486
487    /// Mark a graph projection as failed.
488    pub fn fail_graph_projection(
489        &self,
490        name: &str,
491    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
492        self.update_physical_metadata(|metadata| {
493            let now = SystemTime::now()
494                .duration_since(UNIX_EPOCH)
495                .unwrap_or_default()
496                .as_millis();
497            let idx = metadata
498                .graph_projections
499                .iter()
500                .position(|projection| projection.name == name);
501            if let Some(idx) = idx {
502                metadata.graph_projections[idx].updated_at_unix_ms = now;
503                metadata.graph_projections[idx].state = "failed".to_string();
504                metadata.graph_projections[idx].last_materialized_sequence = None;
505                let result = metadata.graph_projections[idx].clone();
506                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
507                return Some(result);
508            }
509            None
510        })
511    }
512
513    /// Mark a graph projection as stale while preserving any last materialized sequence.
514    pub fn mark_graph_projection_stale(
515        &self,
516        name: &str,
517    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
518        self.update_physical_metadata(|metadata| {
519            let now = SystemTime::now()
520                .duration_since(UNIX_EPOCH)
521                .unwrap_or_default()
522                .as_millis();
523            let idx = metadata
524                .graph_projections
525                .iter()
526                .position(|projection| projection.name == name);
527            if let Some(idx) = idx {
528                metadata.graph_projections[idx].updated_at_unix_ms = now;
529                metadata.graph_projections[idx].state = "stale".to_string();
530                let result = metadata.graph_projections[idx].clone();
531                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
532                return Some(result);
533            }
534            None
535        })
536    }
537
538    fn mark_projection_dependent_jobs_stale(
539        metadata: &mut PhysicalMetadataFile,
540        projection_name: &str,
541        now: u128,
542    ) {
543        for job in metadata.analytics_jobs.iter_mut() {
544            if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
545                job.state = "stale".to_string();
546                job.updated_at_unix_ms = now;
547            }
548        }
549    }
550
551    fn rearm_projection_dependent_jobs_declared(
552        metadata: &mut PhysicalMetadataFile,
553        projection_name: &str,
554        now: u128,
555    ) {
556        for job in metadata.analytics_jobs.iter_mut() {
557            if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
558                job.state = "declared".to_string();
559                job.last_run_sequence = None;
560                job.updated_at_unix_ms = now;
561            }
562        }
563    }
564
565    /// Declare or update analytics job metadata without marking it as executed.
566    pub fn save_analytics_job(
567        &self,
568        kind: impl Into<String>,
569        projection: Option<String>,
570        metadata_entries: BTreeMap<String, String>,
571    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
572        let kind = kind.into();
573        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
574        if let Some(projection_name) = projection.as_deref() {
575            if !self.graph_projection_is_declared(projection_name) {
576                return Err(format!(
577                    "graph projection '{projection_name}' is not declared in physical metadata"
578                )
579                .into());
580            }
581        }
582
583        self.update_physical_metadata(|metadata| {
584            let now = SystemTime::now()
585                .duration_since(UNIX_EPOCH)
586                .unwrap_or_default()
587                .as_millis();
588
589            let job = if let Some(existing) = metadata
590                .analytics_jobs
591                .iter_mut()
592                .find(|job| job.id == job_id)
593            {
594                existing.kind = kind.clone();
595                existing.projection = projection.clone();
596                existing.updated_at_unix_ms = now;
597                existing.metadata = metadata_entries.clone();
598                if existing.last_run_sequence.is_none() {
599                    existing.state = "declared".to_string();
600                }
601                existing.clone()
602            } else {
603                let job = PhysicalAnalyticsJob {
604                    id: job_id.clone(),
605                    kind: kind.clone(),
606                    state: "declared".to_string(),
607                    projection: projection.clone(),
608                    created_at_unix_ms: now,
609                    updated_at_unix_ms: now,
610                    last_run_sequence: None,
611                    metadata: metadata_entries.clone(),
612                };
613                metadata.analytics_jobs.push(job.clone());
614                job
615            };
616
617            metadata
618                .analytics_jobs
619                .sort_by(|left, right| left.id.cmp(&right.id));
620            job
621        })
622    }
623
624    /// Record or update analytics job metadata in the persisted physical metadata.
625    pub fn record_analytics_job(
626        &self,
627        kind: impl Into<String>,
628        projection: Option<String>,
629        metadata_entries: BTreeMap<String, String>,
630    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
631        let kind = kind.into();
632        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
633        if let Some(projection_name) = projection.as_deref() {
634            if !self.graph_projection_is_declared(projection_name) {
635                return Err(format!(
636                    "graph projection '{projection_name}' is not declared in physical metadata"
637                )
638                .into());
639            }
640            if !self.graph_projection_is_operational(projection_name) {
641                return Err(format!(
642                    "graph projection '{projection_name}' is declared but not operationally materialized"
643                )
644                .into());
645            }
646        }
647
648        self.update_physical_metadata(|metadata| {
649            let now = SystemTime::now()
650                .duration_since(UNIX_EPOCH)
651                .unwrap_or_default()
652                .as_millis();
653
654            let existing = metadata
655                .analytics_jobs
656                .iter_mut()
657                .find(|job| job.id == job_id)?;
658            existing.state = "completed".to_string();
659            existing.kind = kind.clone();
660            existing.projection = projection.clone();
661            existing.updated_at_unix_ms = now;
662            existing.last_run_sequence = Some(metadata.superblock.sequence);
663            existing.metadata = metadata_entries.clone();
664            let job = existing.clone();
665
666            metadata
667                .analytics_jobs
668                .sort_by(|left, right| left.id.cmp(&right.id));
669            Some(job)
670        })
671        .and_then(|job| {
672            job.ok_or_else(|| {
673                format!("analytics job '{job_id}' is not declared in physical metadata").into()
674            })
675        })
676    }
677
678    /// Mark a declared analytics job as running.
679    pub fn queue_analytics_job(
680        &self,
681        kind: impl Into<String>,
682        projection: Option<String>,
683        metadata_entries: BTreeMap<String, String>,
684    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
685        let kind = kind.into();
686        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
687        if let Some(projection_name) = projection.as_deref() {
688            if !self.graph_projection_is_declared(projection_name) {
689                return Err(format!(
690                    "graph projection '{projection_name}' is not declared in physical metadata"
691                )
692                .into());
693            }
694            if !self.graph_projection_is_operational(projection_name) {
695                return Err(format!(
696                    "graph projection '{projection_name}' is declared but not operationally materialized"
697                )
698                .into());
699            }
700        }
701
702        self.update_physical_metadata(|metadata| {
703            let now = SystemTime::now()
704                .duration_since(UNIX_EPOCH)
705                .unwrap_or_default()
706                .as_millis();
707
708            let existing = metadata
709                .analytics_jobs
710                .iter_mut()
711                .find(|job| job.id == job_id)?;
712            existing.state = "queued".to_string();
713            existing.kind = kind.clone();
714            existing.projection = projection.clone();
715            existing.updated_at_unix_ms = now;
716            existing.metadata = metadata_entries.clone();
717            let job = existing.clone();
718
719            metadata
720                .analytics_jobs
721                .sort_by(|left, right| left.id.cmp(&right.id));
722            Some(job)
723        })
724        .and_then(|job| {
725            job.ok_or_else(|| {
726                format!("analytics job '{job_id}' is not declared in physical metadata").into()
727            })
728        })
729    }
730
731    /// Mark a declared analytics job as running.
732    pub fn start_analytics_job(
733        &self,
734        kind: impl Into<String>,
735        projection: Option<String>,
736        metadata_entries: BTreeMap<String, String>,
737    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
738        let kind = kind.into();
739        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
740        if let Some(projection_name) = projection.as_deref() {
741            if !self.graph_projection_is_declared(projection_name) {
742                return Err(format!(
743                    "graph projection '{projection_name}' is not declared in physical metadata"
744                )
745                .into());
746            }
747            if !self.graph_projection_is_operational(projection_name) {
748                return Err(format!(
749                    "graph projection '{projection_name}' is declared but not operationally materialized"
750                )
751                .into());
752            }
753        }
754
755        self.update_physical_metadata(|metadata| {
756            let now = SystemTime::now()
757                .duration_since(UNIX_EPOCH)
758                .unwrap_or_default()
759                .as_millis();
760
761            let existing = metadata
762                .analytics_jobs
763                .iter_mut()
764                .find(|job| job.id == job_id)?;
765            existing.state = "running".to_string();
766            existing.kind = kind.clone();
767            existing.projection = projection.clone();
768            existing.updated_at_unix_ms = now;
769            existing.metadata = metadata_entries.clone();
770            let job = existing.clone();
771
772            metadata
773                .analytics_jobs
774                .sort_by(|left, right| left.id.cmp(&right.id));
775            Some(job)
776        })
777        .and_then(|job| {
778            job.ok_or_else(|| {
779                format!("analytics job '{job_id}' is not declared in physical metadata").into()
780            })
781        })
782    }
783
784    /// Mark a declared analytics job as failed.
785    pub fn fail_analytics_job(
786        &self,
787        kind: impl Into<String>,
788        projection: Option<String>,
789        metadata_entries: BTreeMap<String, String>,
790    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
791        let kind = kind.into();
792        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
793
794        self.update_physical_metadata(|metadata| {
795            let now = SystemTime::now()
796                .duration_since(UNIX_EPOCH)
797                .unwrap_or_default()
798                .as_millis();
799
800            let existing = metadata
801                .analytics_jobs
802                .iter_mut()
803                .find(|job| job.id == job_id)?;
804            existing.state = "failed".to_string();
805            existing.kind = kind.clone();
806            existing.projection = projection.clone();
807            existing.updated_at_unix_ms = now;
808            existing.metadata = metadata_entries.clone();
809            let job = existing.clone();
810
811            metadata
812                .analytics_jobs
813                .sort_by(|left, right| left.id.cmp(&right.id));
814            Some(job)
815        })
816        .and_then(|job| {
817            job.ok_or_else(|| {
818                format!("analytics job '{job_id}' is not declared in physical metadata").into()
819            })
820        })
821    }
822
823    /// Mark a declared analytics job as stale.
824    pub fn mark_analytics_job_stale(
825        &self,
826        kind: impl Into<String>,
827        projection: Option<String>,
828        metadata_entries: BTreeMap<String, String>,
829    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
830        let kind = kind.into();
831        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
832
833        self.update_physical_metadata(|metadata| {
834            let now = SystemTime::now()
835                .duration_since(UNIX_EPOCH)
836                .unwrap_or_default()
837                .as_millis();
838
839            let existing = metadata
840                .analytics_jobs
841                .iter_mut()
842                .find(|job| job.id == job_id)?;
843            existing.state = "stale".to_string();
844            existing.kind = kind.clone();
845            existing.projection = projection.clone();
846            existing.updated_at_unix_ms = now;
847            existing.metadata = metadata_entries.clone();
848            let job = existing.clone();
849
850            metadata
851                .analytics_jobs
852                .sort_by(|left, right| left.id.cmp(&right.id));
853            Some(job)
854        })
855        .and_then(|job| {
856            job.ok_or_else(|| {
857                format!("analytics job '{job_id}' is not declared in physical metadata").into()
858            })
859        })
860    }
861
862    /// Create a named export by copying the current database file and metadata sidecar.
863    pub fn create_named_export(
864        &self,
865        name: impl Into<String>,
866    ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
867        let name = name.into();
868        if self.options.mode != StorageMode::Persistent {
869            return Err("exports require persistent mode".into());
870        }
871        let Some(path) = self.path() else {
872            return Err("database path is not available".into());
873        };
874
875        self.flush()?;
876
877        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
878        let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
879        let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
880        let export_metadata_binary_path =
881            PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
882
883        fs::copy(path, &export_data_path)?;
884
885        let descriptor = ExportDescriptor {
886            name: name.clone(),
887            created_at_unix_ms: SystemTime::now()
888                .duration_since(UNIX_EPOCH)
889                .unwrap_or_default()
890                .as_millis(),
891            snapshot_id: metadata
892                .snapshots
893                .last()
894                .map(|snapshot| snapshot.snapshot_id),
895            superblock_sequence: metadata.superblock.sequence,
896            data_path: export_data_path.display().to_string(),
897            metadata_path: export_metadata_path.display().to_string(),
898            collection_count: metadata.catalog.total_collections,
899            total_entities: metadata.catalog.total_entities,
900        };
901
902        metadata
903            .exports
904            .retain(|export| export.name != descriptor.name);
905        metadata.exports.push(descriptor.clone());
906        self.prune_export_registry(&mut metadata.exports);
907        metadata.save_for_data_path(path)?;
908        metadata.save_to_binary_path(&export_metadata_binary_path)?;
909        metadata.save_to_path(&export_metadata_path)?;
910
911        Ok(descriptor)
912    }
913
914    /// Enable or disable a physical index entry in the persisted registry.
915    pub fn set_index_enabled(
916        &self,
917        name: &str,
918        enabled: bool,
919    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
920        let Some(status) = self.index_status(name) else {
921            return Err(format!("index '{name}' is not present in catalog status").into());
922        };
923        if !status.declared {
924            return Err(format!("index '{name}' is not declared in physical metadata").into());
925        }
926        self.update_physical_metadata(|metadata| {
927            let now = SystemTime::now()
928                .duration_since(UNIX_EPOCH)
929                .unwrap_or_default()
930                .as_millis();
931            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
932                index.enabled = enabled;
933                if !enabled {
934                    index.build_state = "disabled".to_string();
935                } else if index.build_state == "disabled" {
936                    index.build_state = if index.artifact_root_page.is_some() {
937                        "ready".to_string()
938                    } else {
939                        "declared-unbuilt".to_string()
940                    };
941                }
942                index.last_refresh_ms = Some(now);
943                return Some(index.clone());
944            }
945            None
946        })
947    }
948
949    /// Mark a declared physical index as building in the persisted registry.
950    pub fn mark_index_building(
951        &self,
952        name: &str,
953    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
954        let Some(status) = self.index_status(name) else {
955            return Err(format!("index '{name}' is not present in catalog status").into());
956        };
957        if !status.declared {
958            return Err(format!("index '{name}' is not declared in physical metadata").into());
959        }
960        if status.lifecycle_state == "disabled" {
961            return Err(format!("index '{name}' is disabled").into());
962        }
963        self.update_physical_metadata(|metadata| {
964            let now = SystemTime::now()
965                .duration_since(UNIX_EPOCH)
966                .unwrap_or_default()
967                .as_millis();
968            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
969                index.build_state = "building".to_string();
970                index.last_refresh_ms = Some(now);
971                return Some(index.clone());
972            }
973            None
974        })
975    }
976
977    /// Mark a declared physical index as failed in the persisted registry.
978    pub fn fail_index(
979        &self,
980        name: &str,
981    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
982        let Some(status) = self.index_status(name) else {
983            return Err(format!("index '{name}' is not present in catalog status").into());
984        };
985        if !status.declared {
986            return Err(format!("index '{name}' is not declared in physical metadata").into());
987        }
988        self.update_physical_metadata(|metadata| {
989            let now = SystemTime::now()
990                .duration_since(UNIX_EPOCH)
991                .unwrap_or_default()
992                .as_millis();
993            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
994                index.build_state = "failed".to_string();
995                index.last_refresh_ms = Some(now);
996                return Some(index.clone());
997            }
998            None
999        })
1000    }
1001
1002    /// Mark a declared physical index as stale in the persisted registry.
1003    pub fn mark_index_stale(
1004        &self,
1005        name: &str,
1006    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1007        let Some(status) = self.index_status(name) else {
1008            return Err(format!("index '{name}' is not present in catalog status").into());
1009        };
1010        if !status.declared {
1011            return Err(format!("index '{name}' is not declared in physical metadata").into());
1012        }
1013        self.update_physical_metadata(|metadata| {
1014            let now = SystemTime::now()
1015                .duration_since(UNIX_EPOCH)
1016                .unwrap_or_default()
1017                .as_millis();
1018            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1019                index.build_state = if index.enabled {
1020                    "stale".to_string()
1021                } else {
1022                    "disabled".to_string()
1023                };
1024                index.last_refresh_ms = Some(now);
1025                return Some(index.clone());
1026            }
1027            None
1028        })
1029    }
1030
1031    /// Mark a declared physical index as ready in the persisted registry.
1032    pub fn mark_index_ready(
1033        &self,
1034        name: &str,
1035    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1036        self.warmup_index(name)
1037    }
1038
1039    /// Mark a physical index as warmed up/refreshed in the persisted registry.
1040    pub fn warmup_index(
1041        &self,
1042        name: &str,
1043    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1044        let Some(status) = self.index_status(name) else {
1045            return Err(format!("index '{name}' is not present in catalog status").into());
1046        };
1047        if !status.declared {
1048            return Err(format!("index '{name}' is not declared in physical metadata").into());
1049        }
1050        if status.lifecycle_state == "disabled" {
1051            return Err(format!("index '{name}' is disabled").into());
1052        }
1053        if !status.operational {
1054            return Err(
1055                format!("index '{name}' is declared but not operationally materialized").into(),
1056            );
1057        }
1058        let warmed_artifact = self
1059            .physical_indexes()
1060            .into_iter()
1061            .find(|index| index.name == name)
1062            .map(|mut index| {
1063                self.warmup_native_vector_artifact_for_index(&index)?;
1064                self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1065                Ok::<_, String>(index)
1066            })
1067            .transpose()
1068            .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1069
1070        self.update_physical_metadata(|metadata| {
1071            let now = SystemTime::now()
1072                .duration_since(UNIX_EPOCH)
1073                .unwrap_or_default()
1074                .as_millis();
1075            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1076                if let Some(warmed) = warmed_artifact.as_ref() {
1077                    index.entries = warmed.entries;
1078                    index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1079                    index.backend = warmed.backend.clone();
1080                    index.build_state = "ready".to_string();
1081                }
1082                index.last_refresh_ms = Some(now);
1083                return Some(index.clone());
1084            }
1085            None
1086        })
1087    }
1088
1089    /// Rebuild physical index metadata from the current catalog, optionally restricted to one collection.
1090    pub fn rebuild_index_registry(
1091        &self,
1092        collection: Option<&str>,
1093    ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1094        let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1095        self.update_physical_metadata(|metadata| {
1096            let now = SystemTime::now()
1097                .duration_since(UNIX_EPOCH)
1098                .unwrap_or_default()
1099                .as_millis();
1100
1101            let mut affected = Vec::new();
1102            let declared = metadata.indexes.clone();
1103            for declared_index in declared {
1104                let matches_collection = collection.is_none_or(|collection_name| {
1105                    declared_index.collection.as_deref() == Some(collection_name)
1106                });
1107                if !matches_collection {
1108                    continue;
1109                }
1110
1111                let mut rebuilt = fresh
1112                    .iter()
1113                    .find(|index| index.name == declared_index.name)
1114                    .cloned()
1115                    .unwrap_or_else(|| {
1116                        let mut index = declared_index.clone();
1117                        index.build_state = "declared-unbuilt".to_string();
1118                        index
1119                    });
1120                rebuilt.enabled = declared_index.enabled;
1121                rebuilt.artifact_kind = rebuilt
1122                    .artifact_kind
1123                    .or_else(|| declared_index.artifact_kind.clone());
1124                rebuilt.artifact_root_page = rebuilt
1125                    .artifact_root_page
1126                    .or(declared_index.artifact_root_page);
1127                rebuilt.artifact_checksum = rebuilt
1128                    .artifact_checksum
1129                    .or(declared_index.artifact_checksum);
1130                rebuilt.build_state =
1131                    Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1132                rebuilt.last_refresh_ms = Some(now);
1133
1134                if let Some(existing) = metadata
1135                    .indexes
1136                    .iter_mut()
1137                    .find(|index| index.name == rebuilt.name)
1138                {
1139                    *existing = rebuilt.clone();
1140                } else {
1141                    metadata.indexes.push(rebuilt.clone());
1142                }
1143
1144                affected.push(rebuilt);
1145            }
1146
1147            affected
1148        })
1149    }
1150
1151    fn finalize_rebuilt_index_build_state(
1152        declared: &PhysicalIndexState,
1153        rebuilt: &PhysicalIndexState,
1154    ) -> String {
1155        if !rebuilt.enabled {
1156            return "disabled".to_string();
1157        }
1158
1159        if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1160            return "failed".to_string();
1161        }
1162
1163        let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1164        if native_artifact_family {
1165            if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1166                return "ready".to_string();
1167            }
1168            if declared.artifact_root_page.is_some()
1169                || declared.artifact_checksum.is_some()
1170                || declared.artifact_kind.is_some()
1171            {
1172                return "stale".to_string();
1173            }
1174            return "declared-unbuilt".to_string();
1175        }
1176
1177        if rebuilt.entries > 0 {
1178            return "ready".to_string();
1179        }
1180
1181        if matches!(
1182            declared.build_state.as_str(),
1183            "stale" | "artifact-published" | "registry-loaded"
1184        ) {
1185            return "stale".to_string();
1186        }
1187
1188        "declared-unbuilt".to_string()
1189    }
1190}