Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_registry.rs

1use super::*;
2
3impl RedDB {
4    pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5        self.physical_metadata()
6            .map(|metadata| metadata.tree_definitions)
7            .unwrap_or_default()
8    }
9
10    pub fn tree_definition(
11        &self,
12        collection: &str,
13        name: &str,
14    ) -> Option<crate::physical::PhysicalTreeDefinition> {
15        self.tree_definitions()
16            .into_iter()
17            .find(|definition| definition.collection == collection && definition.name == name)
18    }
19
20    pub fn save_tree_definition(
21        &self,
22        definition: crate::physical::PhysicalTreeDefinition,
23    ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24        self.update_physical_metadata(|metadata| {
25            if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26                existing.collection == definition.collection && existing.name == definition.name
27            }) {
28                *existing = definition.clone();
29            } else {
30                metadata.tree_definitions.push(definition.clone());
31            }
32            metadata.tree_definitions.sort_by(|left, right| {
33                left.collection
34                    .cmp(&right.collection)
35                    .then_with(|| left.name.cmp(&right.name))
36            });
37            definition.clone()
38        })
39    }
40
41    pub fn remove_tree_definition(
42        &self,
43        collection: &str,
44        name: &str,
45    ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46        self.update_physical_metadata(|metadata| {
47            metadata
48                .tree_definitions
49                .iter()
50                .position(|definition| {
51                    definition.collection == collection && definition.name == name
52                })
53                .map(|index| metadata.tree_definitions.remove(index))
54        })
55    }
56
57    pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58        self.collection_ttl_defaults_ms
59            .read()
60            .ok()
61            .and_then(|defaults| defaults.get(collection).copied())
62    }
63
64    pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66            defaults.insert(collection.into(), ttl_ms);
67        }
68    }
69
70    pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72            defaults.remove(collection);
73        }
74    }
75
76    pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77        self.contract_cache_map()
78            .values()
79            .map(|arc| (**arc).clone())
80            .collect()
81    }
82
83    pub fn collection_contract(
84        &self,
85        collection: &str,
86    ) -> Option<crate::physical::CollectionContract> {
87        self.contract_cache_map()
88            .get(collection)
89            .map(|arc| (**arc).clone())
90    }
91
92    /// Arc-bumping variant — callers that only need read access skip
93    /// the per-call full `CollectionContract` clone. Deep clones cost
94    /// ~200-400 ns each (vec of declared columns + string allocs);
95    /// the UPDATE fast path hits this 2-3× per mutation.
96    pub fn collection_contract_arc(
97        &self,
98        collection: &str,
99    ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100        self.contract_cache_map()
101            .get(collection)
102            .map(std::sync::Arc::clone)
103    }
104
105    /// Return (or lazily build) the in-memory contract map.
106    /// Reparses PhysicalMetadataFile JSON only once per invalidation.
107    /// Called 3× per insert (ensure_model, enforce_uniqueness, normalize_fields)
108    /// so the cache is load-bearing for insert throughput.
109    fn contract_cache_map(
110        &self,
111    ) -> std::sync::Arc<
112        std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113    > {
114        if let Ok(guard) = self.collection_contract_cache.read() {
115            if let Some(map) = guard.as_ref() {
116                return std::sync::Arc::clone(map);
117            }
118        }
119        let contracts: Vec<crate::physical::CollectionContract> = self
120            .physical_metadata()
121            .map(|metadata| metadata.collection_contracts)
122            .unwrap_or_default();
123        let map: std::collections::HashMap<_, _> = contracts
124            .into_iter()
125            .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126            .collect();
127        let arc = std::sync::Arc::new(map);
128        if let Ok(mut guard) = self.collection_contract_cache.write() {
129            *guard = Some(std::sync::Arc::clone(&arc));
130        }
131        arc
132    }
133
134    pub(crate) fn invalidate_collection_contract_cache(&self) {
135        if let Ok(mut guard) = self.collection_contract_cache.write() {
136            *guard = None;
137        }
138    }
139
140    pub fn save_collection_contract(
141        &self,
142        contract: crate::physical::CollectionContract,
143    ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145            if let Some(ttl_ms) = contract.default_ttl_ms {
146                defaults.insert(contract.name.clone(), ttl_ms);
147            } else {
148                defaults.remove(&contract.name);
149            }
150        }
151
152        self.store()
153            .context_index()
154            .set_collection_enabled(&contract.name, contract.context_index_enabled);
155        if let Some(manager) = self.store().get_collection(&contract.name) {
156            let columns = contract
157                .declared_columns
158                .iter()
159                .map(|column| column.name.clone())
160                .collect();
161            manager.set_column_schema_if_empty(columns);
162        }
163
164        self.invalidate_collection_contract_cache();
165
166        self.update_physical_metadata(|metadata| {
167            if let Some(existing) = metadata
168                .collection_contracts
169                .iter_mut()
170                .find(|existing| existing.name == contract.name)
171            {
172                *existing = contract.clone();
173            } else {
174                metadata.collection_contracts.push(contract.clone());
175            }
176            metadata
177                .collection_contracts
178                .sort_by(|left, right| left.name.cmp(&right.name));
179
180            if let Some(ttl_ms) = contract.default_ttl_ms {
181                metadata
182                    .collection_ttl_defaults_ms
183                    .insert(contract.name.clone(), ttl_ms);
184            } else {
185                metadata.collection_ttl_defaults_ms.remove(&contract.name);
186            }
187
188            contract.clone()
189        })
190    }
191
192    pub fn remove_collection_contract(
193        &self,
194        collection: &str,
195    ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
196        if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
197            defaults.remove(collection);
198        }
199
200        self.store()
201            .context_index()
202            .set_collection_enabled(collection, false);
203
204        self.invalidate_collection_contract_cache();
205
206        self.update_physical_metadata(|metadata| {
207            let removed = metadata
208                .collection_contracts
209                .iter()
210                .position(|contract| contract.name == collection)
211                .map(|index| metadata.collection_contracts.remove(index));
212            metadata.collection_ttl_defaults_ms.remove(collection);
213            metadata
214                .indexes
215                .retain(|index| index.collection.as_deref() != Some(collection));
216            removed
217        })
218    }
219
220    pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
221        self.collection_ttl_defaults_ms
222            .read()
223            .map(|defaults| {
224                defaults
225                    .iter()
226                    .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
227                    .collect()
228            })
229            .unwrap_or_default()
230    }
231
232    pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
233        let metadata = self.physical_metadata();
234        let defaults = metadata
235            .as_ref()
236            .map(|m| m.collection_ttl_defaults_ms.clone())
237            .unwrap_or_default();
238
239        if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
240            current.clear();
241            current.extend(defaults);
242        }
243
244        if let Some(metadata) = metadata {
245            let store = self.store();
246            let index = store.context_index();
247            for contract in &metadata.collection_contracts {
248                index.set_collection_enabled(&contract.name, contract.context_index_enabled);
249            }
250        }
251    }
252
253    /// Snapshot the live hypertable registry (specs + every chunk)
254    /// into the sidecar-serialisable form. Called from
255    /// [`persist_metadata`] so the chunk routing spine is durable on
256    /// the same metadata path as collection contracts — issue #866.
257    /// Returns an empty vec (and touches nothing) when no hypertable
258    /// has ever been declared, so non-hypertable databases pay no
259    /// persist cost.
260    pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
261        let registry = self.hypertables();
262        if registry.is_empty() {
263            return Vec::new();
264        }
265        use std::collections::BTreeMap;
266        let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
267            BTreeMap::new();
268        for chunk in registry.snapshot_chunks() {
269            chunks_by_table
270                .entry(chunk.id.hypertable.clone())
271                .or_default()
272                .push(crate::physical::PhysicalHypertableChunk {
273                    start_ns: chunk.id.start_ns,
274                    end_ns_exclusive: chunk.end_ns_exclusive,
275                    row_count: chunk.row_count,
276                    min_ts_ns: chunk.min_ts_ns,
277                    max_ts_ns: chunk.max_ts_ns,
278                    sealed: chunk.sealed,
279                    ttl_override_ns: chunk.ttl_override_ns,
280                });
281        }
282        registry
283            .list()
284            .into_iter()
285            .map(|spec| crate::physical::PhysicalHypertable {
286                chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
287                name: spec.name,
288                time_column: spec.time_column,
289                chunk_interval_ns: spec.chunk_interval_ns,
290                default_ttl_ns: spec.default_ttl_ns,
291            })
292            .collect()
293    }
294
295    /// Rehydrate the hypertable registry from the physical metadata
296    /// sidecar at boot. Mirror of
297    /// [`load_collection_ttl_defaults_from_metadata`] — specs and
298    /// chunks are restored verbatim so bounds / routing / TTL are
299    /// identical to the pre-restart state (issue #866).
300    pub(crate) fn load_hypertables_from_metadata(&self) {
301        let Some(metadata) = self.physical_metadata() else {
302            return;
303        };
304        if metadata.hypertables.is_empty() {
305            return;
306        }
307        let registry = self.hypertables();
308        for hypertable in &metadata.hypertables {
309            let mut spec = crate::storage::timeseries::HypertableSpec::new(
310                hypertable.name.clone(),
311                hypertable.time_column.clone(),
312                hypertable.chunk_interval_ns,
313            );
314            if let Some(ttl) = hypertable.default_ttl_ns {
315                spec = spec.with_ttl_ns(ttl);
316            }
317            registry.register(spec);
318            for chunk in &hypertable.chunks {
319                registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
320                    id: crate::storage::timeseries::ChunkId {
321                        hypertable: hypertable.name.clone(),
322                        start_ns: chunk.start_ns,
323                    },
324                    end_ns_exclusive: chunk.end_ns_exclusive,
325                    row_count: chunk.row_count,
326                    min_ts_ns: chunk.min_ts_ns,
327                    max_ts_ns: chunk.max_ts_ns,
328                    sealed: chunk.sealed,
329                    ttl_override_ns: chunk.ttl_override_ns,
330                });
331            }
332        }
333    }
334
335    pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
336        self.store.run_maintenance()?;
337        self.persist_metadata()?;
338        Ok(())
339    }
340
341    /// Path to the physical metadata sidecar, if persistent.
342    pub fn metadata_path(&self) -> Option<PathBuf> {
343        self.path
344            .as_ref()
345            .map(|path| PhysicalMetadataFile::metadata_path_for(path))
346    }
347
348    /// Load the current physical metadata view, bootstrapping from native state when needed.
349    pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
350        self.load_or_bootstrap_physical_metadata(!self.options.read_only)
351            .ok()
352    }
353
354    /// Physical index registry derived for the current database state.
355    pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
356        let indexes = self
357            .physical_metadata()
358            .map(|metadata| metadata.indexes)
359            .filter(|indexes| !indexes.is_empty())
360            .or_else(|| {
361                self.native_physical_state()
362                    .map(|state| self.physical_index_state_from_native_state(&state, None))
363            })
364            .unwrap_or_else(|| self.physical_index_state());
365        self.reconcile_index_states_with_native_artifacts(indexes)
366    }
367
368    /// List registered named exports from the current physical metadata view.
369    pub fn exports(&self) -> Vec<ExportDescriptor> {
370        self.physical_metadata()
371            .map(|metadata| metadata.exports)
372            .or_else(|| {
373                self.native_physical_state()
374                    .map(|state| self.exports_from_native_state(&state))
375            })
376            .unwrap_or_default()
377    }
378
379    /// List recorded snapshots from the current physical metadata view.
380    pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
381        self.physical_metadata()
382            .map(|metadata| metadata.snapshots)
383            .or_else(|| {
384                self.native_physical_state()
385                    .map(|state| self.snapshots_from_native_state(&state))
386            })
387            .unwrap_or_default()
388    }
389
390    /// List persisted named graph projections from the current physical metadata view.
391    pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
392        self.physical_metadata()
393            .map(|metadata| metadata.graph_projections)
394            .or_else(|| {
395                self.native_physical_state()
396                    .map(|state| self.graph_projections_from_native_state(&state))
397            })
398            .unwrap_or_default()
399    }
400
401    /// List graph projections declared in the catalog view.
402    pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
403        self.catalog_model_snapshot().declared_graph_projections
404    }
405
406    /// List graph projections currently observed in the operational view.
407    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
408        self.graph_projections()
409            .into_iter()
410            .filter(|projection| {
411                projection.last_materialized_sequence.is_some()
412                    || matches!(projection.state.as_str(), "materialized" | "stale")
413            })
414            .collect()
415    }
416
417    /// List persisted analytics job metadata from the current physical metadata view.
418    pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
419        self.physical_metadata()
420            .map(|metadata| metadata.analytics_jobs)
421            .or_else(|| {
422                self.native_physical_state()
423                    .map(|state| self.analytics_jobs_from_native_state(&state))
424            })
425            .unwrap_or_default()
426    }
427
428    /// List analytics jobs declared in the catalog view.
429    pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
430        self.catalog_model_snapshot().declared_analytics_jobs
431    }
432
433    /// List analytics jobs currently observed in the operational view.
434    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
435        self.analytics_jobs()
436            .into_iter()
437            .filter(|job| {
438                job.last_run_sequence.is_some()
439                    || matches!(
440                        job.state.as_str(),
441                        "running" | "completed" | "failed" | "queued" | "stale"
442                    )
443            })
444            .collect()
445    }
446
447    /// List indexes declared in the catalog view.
448    pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
449        self.catalog_model_snapshot().declared_indexes
450    }
451
452    /// List indexes currently observed in the operational view.
453    pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
454        self.catalog_model_snapshot().operational_indexes
455    }
456
457    /// List reconciled index status entries from the catalog snapshot.
458    pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
459        self.catalog_model_snapshot().index_statuses
460    }
461
462    /// Resolve one index status entry from the catalog snapshot.
463    pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
464        self.catalog_model_snapshot()
465            .index_statuses
466            .into_iter()
467            .find(|status| status.name == name)
468    }
469
470    /// Upsert a named graph projection in the persisted physical metadata.
471    pub fn save_graph_projection(
472        &self,
473        name: impl Into<String>,
474        node_labels: Vec<String>,
475        node_types: Vec<String>,
476        edge_labels: Vec<String>,
477        source: impl Into<String>,
478    ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
479        let name = name.into();
480        let source = source.into();
481        self.update_physical_metadata(|metadata| {
482            let now = SystemTime::now()
483                .duration_since(UNIX_EPOCH)
484                .unwrap_or_default()
485                .as_millis();
486            let projection = if let Some(existing) = metadata
487                .graph_projections
488                .iter_mut()
489                .find(|projection| projection.name == name)
490            {
491                existing.updated_at_unix_ms = now;
492                existing.state = "declared".to_string();
493                existing.source = source.clone();
494                existing.node_labels = node_labels.clone();
495                existing.node_types = node_types.clone();
496                existing.edge_labels = edge_labels.clone();
497                existing.last_materialized_sequence = None;
498                existing.clone()
499            } else {
500                let projection = PhysicalGraphProjection {
501                    name: name.clone(),
502                    created_at_unix_ms: now,
503                    updated_at_unix_ms: now,
504                    state: "declared".to_string(),
505                    source: source.clone(),
506                    node_labels: node_labels.clone(),
507                    node_types: node_types.clone(),
508                    edge_labels: edge_labels.clone(),
509                    last_materialized_sequence: None,
510                };
511                metadata.graph_projections.push(projection.clone());
512                projection
513            };
514
515            Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
516
517            metadata
518                .graph_projections
519                .sort_by(|left, right| left.name.cmp(&right.name));
520            projection
521        })
522    }
523
524    /// Mark a declared graph projection as materialized in the current physical metadata.
525    pub fn materialize_graph_projection(
526        &self,
527        name: &str,
528    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
529        self.update_physical_metadata(|metadata| {
530            let now = SystemTime::now()
531                .duration_since(UNIX_EPOCH)
532                .unwrap_or_default()
533                .as_millis();
534            let idx = metadata
535                .graph_projections
536                .iter()
537                .position(|projection| projection.name == name);
538            if let Some(idx) = idx {
539                metadata.graph_projections[idx].updated_at_unix_ms = now;
540                metadata.graph_projections[idx].state = "materialized".to_string();
541                metadata.graph_projections[idx].last_materialized_sequence =
542                    Some(metadata.superblock.sequence);
543                let result = metadata.graph_projections[idx].clone();
544                Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
545                return Some(result);
546            }
547            None
548        })
549    }
550
551    /// Mark a declared graph projection as materializing.
552    pub fn mark_graph_projection_materializing(
553        &self,
554        name: &str,
555    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
556        self.update_physical_metadata(|metadata| {
557            let now = SystemTime::now()
558                .duration_since(UNIX_EPOCH)
559                .unwrap_or_default()
560                .as_millis();
561            let idx = metadata
562                .graph_projections
563                .iter()
564                .position(|projection| projection.name == name);
565            if let Some(idx) = idx {
566                metadata.graph_projections[idx].updated_at_unix_ms = now;
567                metadata.graph_projections[idx].state = "materializing".to_string();
568                metadata.graph_projections[idx].last_materialized_sequence = None;
569                let result = metadata.graph_projections[idx].clone();
570                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
571                return Some(result);
572            }
573            None
574        })
575    }
576
577    /// Mark a graph projection as failed.
578    pub fn fail_graph_projection(
579        &self,
580        name: &str,
581    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
582        self.update_physical_metadata(|metadata| {
583            let now = SystemTime::now()
584                .duration_since(UNIX_EPOCH)
585                .unwrap_or_default()
586                .as_millis();
587            let idx = metadata
588                .graph_projections
589                .iter()
590                .position(|projection| projection.name == name);
591            if let Some(idx) = idx {
592                metadata.graph_projections[idx].updated_at_unix_ms = now;
593                metadata.graph_projections[idx].state = "failed".to_string();
594                metadata.graph_projections[idx].last_materialized_sequence = None;
595                let result = metadata.graph_projections[idx].clone();
596                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
597                return Some(result);
598            }
599            None
600        })
601    }
602
603    /// Mark a graph projection as stale while preserving any last materialized sequence.
604    pub fn mark_graph_projection_stale(
605        &self,
606        name: &str,
607    ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
608        self.update_physical_metadata(|metadata| {
609            let now = SystemTime::now()
610                .duration_since(UNIX_EPOCH)
611                .unwrap_or_default()
612                .as_millis();
613            let idx = metadata
614                .graph_projections
615                .iter()
616                .position(|projection| projection.name == name);
617            if let Some(idx) = idx {
618                metadata.graph_projections[idx].updated_at_unix_ms = now;
619                metadata.graph_projections[idx].state = "stale".to_string();
620                let result = metadata.graph_projections[idx].clone();
621                Self::mark_projection_dependent_jobs_stale(metadata, name, now);
622                return Some(result);
623            }
624            None
625        })
626    }
627
628    fn mark_projection_dependent_jobs_stale(
629        metadata: &mut PhysicalMetadataFile,
630        projection_name: &str,
631        now: u128,
632    ) {
633        for job in metadata.analytics_jobs.iter_mut() {
634            if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
635                job.state = "stale".to_string();
636                job.updated_at_unix_ms = now;
637            }
638        }
639    }
640
641    fn rearm_projection_dependent_jobs_declared(
642        metadata: &mut PhysicalMetadataFile,
643        projection_name: &str,
644        now: u128,
645    ) {
646        for job in metadata.analytics_jobs.iter_mut() {
647            if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
648                job.state = "declared".to_string();
649                job.last_run_sequence = None;
650                job.updated_at_unix_ms = now;
651            }
652        }
653    }
654
655    /// Declare or update analytics job metadata without marking it as executed.
656    pub fn save_analytics_job(
657        &self,
658        kind: impl Into<String>,
659        projection: Option<String>,
660        metadata_entries: BTreeMap<String, String>,
661    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
662        let kind = kind.into();
663        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
664        if let Some(projection_name) = projection.as_deref() {
665            if !self.graph_projection_is_declared(projection_name) {
666                return Err(format!(
667                    "graph projection '{projection_name}' is not declared in physical metadata"
668                )
669                .into());
670            }
671        }
672
673        self.update_physical_metadata(|metadata| {
674            let now = SystemTime::now()
675                .duration_since(UNIX_EPOCH)
676                .unwrap_or_default()
677                .as_millis();
678
679            let job = if let Some(existing) = metadata
680                .analytics_jobs
681                .iter_mut()
682                .find(|job| job.id == job_id)
683            {
684                existing.kind = kind.clone();
685                existing.projection = projection.clone();
686                existing.updated_at_unix_ms = now;
687                existing.metadata = metadata_entries.clone();
688                if existing.last_run_sequence.is_none() {
689                    existing.state = "declared".to_string();
690                }
691                existing.clone()
692            } else {
693                let job = PhysicalAnalyticsJob {
694                    id: job_id.clone(),
695                    kind: kind.clone(),
696                    state: "declared".to_string(),
697                    projection: projection.clone(),
698                    created_at_unix_ms: now,
699                    updated_at_unix_ms: now,
700                    last_run_sequence: None,
701                    metadata: metadata_entries.clone(),
702                };
703                metadata.analytics_jobs.push(job.clone());
704                job
705            };
706
707            metadata
708                .analytics_jobs
709                .sort_by(|left, right| left.id.cmp(&right.id));
710            job
711        })
712    }
713
714    /// Record or update analytics job metadata in the persisted physical metadata.
715    pub fn record_analytics_job(
716        &self,
717        kind: impl Into<String>,
718        projection: Option<String>,
719        metadata_entries: BTreeMap<String, String>,
720    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
721        let kind = kind.into();
722        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
723        if let Some(projection_name) = projection.as_deref() {
724            if !self.graph_projection_is_declared(projection_name) {
725                return Err(format!(
726                    "graph projection '{projection_name}' is not declared in physical metadata"
727                )
728                .into());
729            }
730            if !self.graph_projection_is_operational(projection_name) {
731                return Err(format!(
732                    "graph projection '{projection_name}' is declared but not operationally materialized"
733                )
734                .into());
735            }
736        }
737
738        self.update_physical_metadata(|metadata| {
739            let now = SystemTime::now()
740                .duration_since(UNIX_EPOCH)
741                .unwrap_or_default()
742                .as_millis();
743
744            let existing = metadata
745                .analytics_jobs
746                .iter_mut()
747                .find(|job| job.id == job_id)?;
748            existing.state = "completed".to_string();
749            existing.kind = kind.clone();
750            existing.projection = projection.clone();
751            existing.updated_at_unix_ms = now;
752            existing.last_run_sequence = Some(metadata.superblock.sequence);
753            existing.metadata = metadata_entries.clone();
754            let job = existing.clone();
755
756            metadata
757                .analytics_jobs
758                .sort_by(|left, right| left.id.cmp(&right.id));
759            Some(job)
760        })
761        .and_then(|job| {
762            job.ok_or_else(|| {
763                format!("analytics job '{job_id}' is not declared in physical metadata").into()
764            })
765        })
766    }
767
768    /// Mark a declared analytics job as running.
769    pub fn queue_analytics_job(
770        &self,
771        kind: impl Into<String>,
772        projection: Option<String>,
773        metadata_entries: BTreeMap<String, String>,
774    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
775        let kind = kind.into();
776        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
777        if let Some(projection_name) = projection.as_deref() {
778            if !self.graph_projection_is_declared(projection_name) {
779                return Err(format!(
780                    "graph projection '{projection_name}' is not declared in physical metadata"
781                )
782                .into());
783            }
784            if !self.graph_projection_is_operational(projection_name) {
785                return Err(format!(
786                    "graph projection '{projection_name}' is declared but not operationally materialized"
787                )
788                .into());
789            }
790        }
791
792        self.update_physical_metadata(|metadata| {
793            let now = SystemTime::now()
794                .duration_since(UNIX_EPOCH)
795                .unwrap_or_default()
796                .as_millis();
797
798            let existing = metadata
799                .analytics_jobs
800                .iter_mut()
801                .find(|job| job.id == job_id)?;
802            existing.state = "queued".to_string();
803            existing.kind = kind.clone();
804            existing.projection = projection.clone();
805            existing.updated_at_unix_ms = now;
806            existing.metadata = metadata_entries.clone();
807            let job = existing.clone();
808
809            metadata
810                .analytics_jobs
811                .sort_by(|left, right| left.id.cmp(&right.id));
812            Some(job)
813        })
814        .and_then(|job| {
815            job.ok_or_else(|| {
816                format!("analytics job '{job_id}' is not declared in physical metadata").into()
817            })
818        })
819    }
820
821    /// Mark a declared analytics job as running.
822    pub fn start_analytics_job(
823        &self,
824        kind: impl Into<String>,
825        projection: Option<String>,
826        metadata_entries: BTreeMap<String, String>,
827    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
828        let kind = kind.into();
829        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
830        if let Some(projection_name) = projection.as_deref() {
831            if !self.graph_projection_is_declared(projection_name) {
832                return Err(format!(
833                    "graph projection '{projection_name}' is not declared in physical metadata"
834                )
835                .into());
836            }
837            if !self.graph_projection_is_operational(projection_name) {
838                return Err(format!(
839                    "graph projection '{projection_name}' is declared but not operationally materialized"
840                )
841                .into());
842            }
843        }
844
845        self.update_physical_metadata(|metadata| {
846            let now = SystemTime::now()
847                .duration_since(UNIX_EPOCH)
848                .unwrap_or_default()
849                .as_millis();
850
851            let existing = metadata
852                .analytics_jobs
853                .iter_mut()
854                .find(|job| job.id == job_id)?;
855            existing.state = "running".to_string();
856            existing.kind = kind.clone();
857            existing.projection = projection.clone();
858            existing.updated_at_unix_ms = now;
859            existing.metadata = metadata_entries.clone();
860            let job = existing.clone();
861
862            metadata
863                .analytics_jobs
864                .sort_by(|left, right| left.id.cmp(&right.id));
865            Some(job)
866        })
867        .and_then(|job| {
868            job.ok_or_else(|| {
869                format!("analytics job '{job_id}' is not declared in physical metadata").into()
870            })
871        })
872    }
873
874    /// Mark a declared analytics job as failed.
875    pub fn fail_analytics_job(
876        &self,
877        kind: impl Into<String>,
878        projection: Option<String>,
879        metadata_entries: BTreeMap<String, String>,
880    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
881        let kind = kind.into();
882        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
883
884        self.update_physical_metadata(|metadata| {
885            let now = SystemTime::now()
886                .duration_since(UNIX_EPOCH)
887                .unwrap_or_default()
888                .as_millis();
889
890            let existing = metadata
891                .analytics_jobs
892                .iter_mut()
893                .find(|job| job.id == job_id)?;
894            existing.state = "failed".to_string();
895            existing.kind = kind.clone();
896            existing.projection = projection.clone();
897            existing.updated_at_unix_ms = now;
898            existing.metadata = metadata_entries.clone();
899            let job = existing.clone();
900
901            metadata
902                .analytics_jobs
903                .sort_by(|left, right| left.id.cmp(&right.id));
904            Some(job)
905        })
906        .and_then(|job| {
907            job.ok_or_else(|| {
908                format!("analytics job '{job_id}' is not declared in physical metadata").into()
909            })
910        })
911    }
912
913    /// Mark a declared analytics job as stale.
914    pub fn mark_analytics_job_stale(
915        &self,
916        kind: impl Into<String>,
917        projection: Option<String>,
918        metadata_entries: BTreeMap<String, String>,
919    ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
920        let kind = kind.into();
921        let job_id = Self::analytics_job_id(&kind, projection.as_deref());
922
923        self.update_physical_metadata(|metadata| {
924            let now = SystemTime::now()
925                .duration_since(UNIX_EPOCH)
926                .unwrap_or_default()
927                .as_millis();
928
929            let existing = metadata
930                .analytics_jobs
931                .iter_mut()
932                .find(|job| job.id == job_id)?;
933            existing.state = "stale".to_string();
934            existing.kind = kind.clone();
935            existing.projection = projection.clone();
936            existing.updated_at_unix_ms = now;
937            existing.metadata = metadata_entries.clone();
938            let job = existing.clone();
939
940            metadata
941                .analytics_jobs
942                .sort_by(|left, right| left.id.cmp(&right.id));
943            Some(job)
944        })
945        .and_then(|job| {
946            job.ok_or_else(|| {
947                format!("analytics job '{job_id}' is not declared in physical metadata").into()
948            })
949        })
950    }
951
952    /// Create a named export by copying the current database file and metadata sidecar.
953    pub fn create_named_export(
954        &self,
955        name: impl Into<String>,
956    ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
957        let name = name.into();
958        if self.options.mode != StorageMode::Persistent {
959            return Err("exports require persistent mode".into());
960        }
961        let Some(path) = self.path() else {
962            return Err("database path is not available".into());
963        };
964
965        self.flush()?;
966
967        let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
968        let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
969        let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
970        let export_metadata_binary_path =
971            PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
972
973        fs::copy(path, &export_data_path)?;
974
975        let descriptor = ExportDescriptor {
976            name: name.clone(),
977            created_at_unix_ms: SystemTime::now()
978                .duration_since(UNIX_EPOCH)
979                .unwrap_or_default()
980                .as_millis(),
981            snapshot_id: metadata
982                .snapshots
983                .last()
984                .map(|snapshot| snapshot.snapshot_id),
985            superblock_sequence: metadata.superblock.sequence,
986            data_path: export_data_path.display().to_string(),
987            metadata_path: export_metadata_path.display().to_string(),
988            collection_count: metadata.catalog.total_collections,
989            total_entities: metadata.catalog.total_entities,
990        };
991
992        metadata
993            .exports
994            .retain(|export| export.name != descriptor.name);
995        metadata.exports.push(descriptor.clone());
996        self.prune_export_registry(&mut metadata.exports);
997        metadata.save_for_data_path(path)?;
998        metadata.save_to_binary_path(&export_metadata_binary_path)?;
999        metadata.save_to_path(&export_metadata_path)?;
1000
1001        Ok(descriptor)
1002    }
1003
1004    /// Enable or disable a physical index entry in the persisted registry.
1005    pub fn set_index_enabled(
1006        &self,
1007        name: &str,
1008        enabled: bool,
1009    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1010        let Some(status) = self.index_status(name) else {
1011            return Err(format!("index '{name}' is not present in catalog status").into());
1012        };
1013        if !status.declared {
1014            return Err(format!("index '{name}' is not declared in physical metadata").into());
1015        }
1016        self.update_physical_metadata(|metadata| {
1017            let now = SystemTime::now()
1018                .duration_since(UNIX_EPOCH)
1019                .unwrap_or_default()
1020                .as_millis();
1021            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1022                index.enabled = enabled;
1023                if !enabled {
1024                    index.build_state = "disabled".to_string();
1025                } else if index.build_state == "disabled" {
1026                    index.build_state = if index.artifact_root_page.is_some() {
1027                        "ready".to_string()
1028                    } else {
1029                        "declared-unbuilt".to_string()
1030                    };
1031                }
1032                index.last_refresh_ms = Some(now);
1033                return Some(index.clone());
1034            }
1035            None
1036        })
1037    }
1038
1039    /// Mark a declared physical index as building in the persisted registry.
1040    pub fn mark_index_building(
1041        &self,
1042        name: &str,
1043    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1044        let Some(status) = self.index_status(name) else {
1045            return Err(format!("index '{name}' is not present in catalog status").into());
1046        };
1047        if !status.declared {
1048            return Err(format!("index '{name}' is not declared in physical metadata").into());
1049        }
1050        if status.lifecycle_state == "disabled" {
1051            return Err(format!("index '{name}' is disabled").into());
1052        }
1053        self.update_physical_metadata(|metadata| {
1054            let now = SystemTime::now()
1055                .duration_since(UNIX_EPOCH)
1056                .unwrap_or_default()
1057                .as_millis();
1058            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1059                index.build_state = "building".to_string();
1060                index.last_refresh_ms = Some(now);
1061                return Some(index.clone());
1062            }
1063            None
1064        })
1065    }
1066
1067    /// Mark a declared physical index as failed in the persisted registry.
1068    pub fn fail_index(
1069        &self,
1070        name: &str,
1071    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1072        let Some(status) = self.index_status(name) else {
1073            return Err(format!("index '{name}' is not present in catalog status").into());
1074        };
1075        if !status.declared {
1076            return Err(format!("index '{name}' is not declared in physical metadata").into());
1077        }
1078        self.update_physical_metadata(|metadata| {
1079            let now = SystemTime::now()
1080                .duration_since(UNIX_EPOCH)
1081                .unwrap_or_default()
1082                .as_millis();
1083            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1084                index.build_state = "failed".to_string();
1085                index.last_refresh_ms = Some(now);
1086                return Some(index.clone());
1087            }
1088            None
1089        })
1090    }
1091
1092    /// Mark a declared physical index as stale in the persisted registry.
1093    pub fn mark_index_stale(
1094        &self,
1095        name: &str,
1096    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1097        let Some(status) = self.index_status(name) else {
1098            return Err(format!("index '{name}' is not present in catalog status").into());
1099        };
1100        if !status.declared {
1101            return Err(format!("index '{name}' is not declared in physical metadata").into());
1102        }
1103        self.update_physical_metadata(|metadata| {
1104            let now = SystemTime::now()
1105                .duration_since(UNIX_EPOCH)
1106                .unwrap_or_default()
1107                .as_millis();
1108            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1109                index.build_state = if index.enabled {
1110                    "stale".to_string()
1111                } else {
1112                    "disabled".to_string()
1113                };
1114                index.last_refresh_ms = Some(now);
1115                return Some(index.clone());
1116            }
1117            None
1118        })
1119    }
1120
1121    /// Mark a declared physical index as ready in the persisted registry.
1122    pub fn mark_index_ready(
1123        &self,
1124        name: &str,
1125    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1126        self.warmup_index(name)
1127    }
1128
1129    /// Mark a physical index as warmed up/refreshed in the persisted registry.
1130    pub fn warmup_index(
1131        &self,
1132        name: &str,
1133    ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1134        let Some(status) = self.index_status(name) else {
1135            return Err(format!("index '{name}' is not present in catalog status").into());
1136        };
1137        if !status.declared {
1138            return Err(format!("index '{name}' is not declared in physical metadata").into());
1139        }
1140        if status.lifecycle_state == "disabled" {
1141            return Err(format!("index '{name}' is disabled").into());
1142        }
1143        if !status.operational {
1144            return Err(
1145                format!("index '{name}' is declared but not operationally materialized").into(),
1146            );
1147        }
1148        let warmed_artifact = self
1149            .physical_indexes()
1150            .into_iter()
1151            .find(|index| index.name == name)
1152            .map(|mut index| {
1153                self.warmup_native_vector_artifact_for_index(&index)?;
1154                self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1155                Ok::<_, String>(index)
1156            })
1157            .transpose()
1158            .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1159
1160        self.update_physical_metadata(|metadata| {
1161            let now = SystemTime::now()
1162                .duration_since(UNIX_EPOCH)
1163                .unwrap_or_default()
1164                .as_millis();
1165            if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1166                if let Some(warmed) = warmed_artifact.as_ref() {
1167                    index.entries = warmed.entries;
1168                    index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1169                    index.backend = warmed.backend.clone();
1170                    index.build_state = "ready".to_string();
1171                }
1172                index.last_refresh_ms = Some(now);
1173                return Some(index.clone());
1174            }
1175            None
1176        })
1177    }
1178
1179    /// Rebuild physical index metadata from the current catalog, optionally restricted to one collection.
1180    pub fn rebuild_index_registry(
1181        &self,
1182        collection: Option<&str>,
1183    ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1184        let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1185        self.update_physical_metadata(|metadata| {
1186            let now = SystemTime::now()
1187                .duration_since(UNIX_EPOCH)
1188                .unwrap_or_default()
1189                .as_millis();
1190
1191            let mut affected = Vec::new();
1192            let declared = metadata.indexes.clone();
1193            for declared_index in declared {
1194                let matches_collection = collection.is_none_or(|collection_name| {
1195                    declared_index.collection.as_deref() == Some(collection_name)
1196                });
1197                if !matches_collection {
1198                    continue;
1199                }
1200
1201                let mut rebuilt = fresh
1202                    .iter()
1203                    .find(|index| index.name == declared_index.name)
1204                    .cloned()
1205                    .unwrap_or_else(|| {
1206                        let mut index = declared_index.clone();
1207                        index.build_state = "declared-unbuilt".to_string();
1208                        index
1209                    });
1210                rebuilt.enabled = declared_index.enabled;
1211                rebuilt.artifact_kind = rebuilt
1212                    .artifact_kind
1213                    .or_else(|| declared_index.artifact_kind.clone());
1214                rebuilt.artifact_root_page = rebuilt
1215                    .artifact_root_page
1216                    .or(declared_index.artifact_root_page);
1217                rebuilt.artifact_checksum = rebuilt
1218                    .artifact_checksum
1219                    .or(declared_index.artifact_checksum);
1220                rebuilt.build_state =
1221                    Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1222                rebuilt.last_refresh_ms = Some(now);
1223
1224                if let Some(existing) = metadata
1225                    .indexes
1226                    .iter_mut()
1227                    .find(|index| index.name == rebuilt.name)
1228                {
1229                    *existing = rebuilt.clone();
1230                } else {
1231                    metadata.indexes.push(rebuilt.clone());
1232                }
1233
1234                affected.push(rebuilt);
1235            }
1236
1237            affected
1238        })
1239    }
1240
1241    fn finalize_rebuilt_index_build_state(
1242        declared: &PhysicalIndexState,
1243        rebuilt: &PhysicalIndexState,
1244    ) -> String {
1245        if !rebuilt.enabled {
1246            return "disabled".to_string();
1247        }
1248
1249        if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1250            return "failed".to_string();
1251        }
1252
1253        let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1254        if native_artifact_family {
1255            if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1256                return "ready".to_string();
1257            }
1258            if declared.artifact_root_page.is_some()
1259                || declared.artifact_checksum.is_some()
1260                || declared.artifact_kind.is_some()
1261            {
1262                return "stale".to_string();
1263            }
1264            return "declared-unbuilt".to_string();
1265        }
1266
1267        if rebuilt.entries > 0 {
1268            return "ready".to_string();
1269        }
1270
1271        if matches!(
1272            declared.build_state.as_str(),
1273            "stale" | "artifact-published" | "registry-loaded"
1274        ) {
1275            return "stale".to_string();
1276        }
1277
1278        "declared-unbuilt".to_string()
1279    }
1280}