1use super::*;
2
3impl RedDB {
4 pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5 self.physical_metadata()
6 .map(|metadata| metadata.tree_definitions)
7 .unwrap_or_default()
8 }
9
10 pub fn tree_definition(
11 &self,
12 collection: &str,
13 name: &str,
14 ) -> Option<crate::physical::PhysicalTreeDefinition> {
15 self.tree_definitions()
16 .into_iter()
17 .find(|definition| definition.collection == collection && definition.name == name)
18 }
19
20 pub fn save_tree_definition(
21 &self,
22 definition: crate::physical::PhysicalTreeDefinition,
23 ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24 self.update_physical_metadata(|metadata| {
25 if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26 existing.collection == definition.collection && existing.name == definition.name
27 }) {
28 *existing = definition.clone();
29 } else {
30 metadata.tree_definitions.push(definition.clone());
31 }
32 metadata.tree_definitions.sort_by(|left, right| {
33 left.collection
34 .cmp(&right.collection)
35 .then_with(|| left.name.cmp(&right.name))
36 });
37 definition.clone()
38 })
39 }
40
41 pub fn remove_tree_definition(
42 &self,
43 collection: &str,
44 name: &str,
45 ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46 self.update_physical_metadata(|metadata| {
47 metadata
48 .tree_definitions
49 .iter()
50 .position(|definition| {
51 definition.collection == collection && definition.name == name
52 })
53 .map(|index| metadata.tree_definitions.remove(index))
54 })
55 }
56
57 pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58 self.collection_ttl_defaults_ms
59 .read()
60 .ok()
61 .and_then(|defaults| defaults.get(collection).copied())
62 }
63
64 pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66 defaults.insert(collection.into(), ttl_ms);
67 }
68 }
69
70 pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72 defaults.remove(collection);
73 }
74 }
75
76 pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77 self.contract_cache_map()
78 .values()
79 .map(|arc| (**arc).clone())
80 .collect()
81 }
82
83 pub fn collection_contract(
84 &self,
85 collection: &str,
86 ) -> Option<crate::physical::CollectionContract> {
87 self.contract_cache_map()
88 .get(collection)
89 .map(|arc| (**arc).clone())
90 }
91
92 pub fn collection_contract_arc(
97 &self,
98 collection: &str,
99 ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100 self.contract_cache_map()
101 .get(collection)
102 .map(std::sync::Arc::clone)
103 }
104
105 fn contract_cache_map(
110 &self,
111 ) -> std::sync::Arc<
112 std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113 > {
114 if let Ok(guard) = self.collection_contract_cache.read() {
115 if let Some(map) = guard.as_ref() {
116 return std::sync::Arc::clone(map);
117 }
118 }
119 let contracts: Vec<crate::physical::CollectionContract> = self
120 .physical_metadata()
121 .map(|metadata| metadata.collection_contracts)
122 .unwrap_or_default();
123 let map: std::collections::HashMap<_, _> = contracts
124 .into_iter()
125 .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126 .collect();
127 let arc = std::sync::Arc::new(map);
128 if let Ok(mut guard) = self.collection_contract_cache.write() {
129 *guard = Some(std::sync::Arc::clone(&arc));
130 }
131 arc
132 }
133
134 pub(crate) fn invalidate_collection_contract_cache(&self) {
135 if let Ok(mut guard) = self.collection_contract_cache.write() {
136 *guard = None;
137 }
138 }
139
140 pub fn save_collection_contract(
141 &self,
142 contract: crate::physical::CollectionContract,
143 ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145 if let Some(ttl_ms) = contract.default_ttl_ms {
146 defaults.insert(contract.name.clone(), ttl_ms);
147 } else {
148 defaults.remove(&contract.name);
149 }
150 }
151
152 self.store()
153 .context_index()
154 .set_collection_enabled(&contract.name, contract.context_index_enabled);
155 if let Some(manager) = self.store().get_collection(&contract.name) {
156 let columns = contract
157 .declared_columns
158 .iter()
159 .map(|column| column.name.clone())
160 .collect();
161 manager.set_column_schema_if_empty(columns);
162 }
163
164 self.invalidate_collection_contract_cache();
165
166 self.update_physical_metadata(|metadata| {
167 if let Some(existing) = metadata
168 .collection_contracts
169 .iter_mut()
170 .find(|existing| existing.name == contract.name)
171 {
172 *existing = contract.clone();
173 } else {
174 metadata.collection_contracts.push(contract.clone());
175 }
176 metadata
177 .collection_contracts
178 .sort_by(|left, right| left.name.cmp(&right.name));
179
180 if let Some(ttl_ms) = contract.default_ttl_ms {
181 metadata
182 .collection_ttl_defaults_ms
183 .insert(contract.name.clone(), ttl_ms);
184 } else {
185 metadata.collection_ttl_defaults_ms.remove(&contract.name);
186 }
187
188 contract.clone()
189 })
190 }
191
192 pub fn remove_collection_contract(
193 &self,
194 collection: &str,
195 ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
196 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
197 defaults.remove(collection);
198 }
199
200 self.store()
201 .context_index()
202 .set_collection_enabled(collection, false);
203
204 self.invalidate_collection_contract_cache();
205
206 self.update_physical_metadata(|metadata| {
207 let removed = metadata
208 .collection_contracts
209 .iter()
210 .position(|contract| contract.name == collection)
211 .map(|index| metadata.collection_contracts.remove(index));
212 metadata.collection_ttl_defaults_ms.remove(collection);
213 metadata
214 .indexes
215 .retain(|index| index.collection.as_deref() != Some(collection));
216 removed
217 })
218 }
219
220 pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
221 self.collection_ttl_defaults_ms
222 .read()
223 .map(|defaults| {
224 defaults
225 .iter()
226 .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
227 .collect()
228 })
229 .unwrap_or_default()
230 }
231
232 pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
233 let metadata = self.physical_metadata();
234 let defaults = metadata
235 .as_ref()
236 .map(|m| m.collection_ttl_defaults_ms.clone())
237 .unwrap_or_default();
238
239 if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
240 current.clear();
241 current.extend(defaults);
242 }
243
244 if let Some(metadata) = metadata {
245 let store = self.store();
246 let index = store.context_index();
247 for contract in &metadata.collection_contracts {
248 index.set_collection_enabled(&contract.name, contract.context_index_enabled);
249 }
250 }
251 }
252
253 pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
261 let registry = self.hypertables();
262 if registry.is_empty() {
263 return Vec::new();
264 }
265 use std::collections::BTreeMap;
266 let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
267 BTreeMap::new();
268 for chunk in registry.snapshot_chunks() {
269 chunks_by_table
270 .entry(chunk.id.hypertable.clone())
271 .or_default()
272 .push(crate::physical::PhysicalHypertableChunk {
273 start_ns: chunk.id.start_ns,
274 end_ns_exclusive: chunk.end_ns_exclusive,
275 row_count: chunk.row_count,
276 min_ts_ns: chunk.min_ts_ns,
277 max_ts_ns: chunk.max_ts_ns,
278 sealed: chunk.sealed,
279 ttl_override_ns: chunk.ttl_override_ns,
280 });
281 }
282 registry
283 .list()
284 .into_iter()
285 .map(|spec| crate::physical::PhysicalHypertable {
286 chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
287 name: spec.name,
288 time_column: spec.time_column,
289 chunk_interval_ns: spec.chunk_interval_ns,
290 default_ttl_ns: spec.default_ttl_ns,
291 })
292 .collect()
293 }
294
295 pub(crate) fn load_hypertables_from_metadata(&self) {
301 let Some(metadata) = self.physical_metadata() else {
302 return;
303 };
304 if metadata.hypertables.is_empty() {
305 return;
306 }
307 let registry = self.hypertables();
308 for hypertable in &metadata.hypertables {
309 let mut spec = crate::storage::timeseries::HypertableSpec::new(
310 hypertable.name.clone(),
311 hypertable.time_column.clone(),
312 hypertable.chunk_interval_ns,
313 );
314 if let Some(ttl) = hypertable.default_ttl_ns {
315 spec = spec.with_ttl_ns(ttl);
316 }
317 registry.register(spec);
318 for chunk in &hypertable.chunks {
319 registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
320 id: crate::storage::timeseries::ChunkId {
321 hypertable: hypertable.name.clone(),
322 start_ns: chunk.start_ns,
323 },
324 end_ns_exclusive: chunk.end_ns_exclusive,
325 row_count: chunk.row_count,
326 min_ts_ns: chunk.min_ts_ns,
327 max_ts_ns: chunk.max_ts_ns,
328 sealed: chunk.sealed,
329 ttl_override_ns: chunk.ttl_override_ns,
330 });
331 }
332 }
333 }
334
335 pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
336 self.store.run_maintenance()?;
337 self.persist_metadata()?;
338 Ok(())
339 }
340
341 pub fn metadata_path(&self) -> Option<PathBuf> {
343 self.path
344 .as_ref()
345 .map(|path| PhysicalMetadataFile::metadata_path_for(path))
346 }
347
348 pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
350 self.load_or_bootstrap_physical_metadata(!self.options.read_only)
351 .ok()
352 }
353
354 pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
356 let indexes = self
357 .physical_metadata()
358 .map(|metadata| metadata.indexes)
359 .filter(|indexes| !indexes.is_empty())
360 .or_else(|| {
361 self.native_physical_state()
362 .map(|state| self.physical_index_state_from_native_state(&state, None))
363 })
364 .unwrap_or_else(|| self.physical_index_state());
365 self.reconcile_index_states_with_native_artifacts(indexes)
366 }
367
368 pub fn exports(&self) -> Vec<ExportDescriptor> {
370 self.physical_metadata()
371 .map(|metadata| metadata.exports)
372 .or_else(|| {
373 self.native_physical_state()
374 .map(|state| self.exports_from_native_state(&state))
375 })
376 .unwrap_or_default()
377 }
378
379 pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
381 self.physical_metadata()
382 .map(|metadata| metadata.snapshots)
383 .or_else(|| {
384 self.native_physical_state()
385 .map(|state| self.snapshots_from_native_state(&state))
386 })
387 .unwrap_or_default()
388 }
389
390 pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
392 self.physical_metadata()
393 .map(|metadata| metadata.graph_projections)
394 .or_else(|| {
395 self.native_physical_state()
396 .map(|state| self.graph_projections_from_native_state(&state))
397 })
398 .unwrap_or_default()
399 }
400
401 pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
403 self.catalog_model_snapshot().declared_graph_projections
404 }
405
406 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
408 self.graph_projections()
409 .into_iter()
410 .filter(|projection| {
411 projection.last_materialized_sequence.is_some()
412 || matches!(projection.state.as_str(), "materialized" | "stale")
413 })
414 .collect()
415 }
416
417 pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
419 self.physical_metadata()
420 .map(|metadata| metadata.analytics_jobs)
421 .or_else(|| {
422 self.native_physical_state()
423 .map(|state| self.analytics_jobs_from_native_state(&state))
424 })
425 .unwrap_or_default()
426 }
427
428 pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
430 self.catalog_model_snapshot().declared_analytics_jobs
431 }
432
433 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
435 self.analytics_jobs()
436 .into_iter()
437 .filter(|job| {
438 job.last_run_sequence.is_some()
439 || matches!(
440 job.state.as_str(),
441 "running" | "completed" | "failed" | "queued" | "stale"
442 )
443 })
444 .collect()
445 }
446
447 pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
449 self.catalog_model_snapshot().declared_indexes
450 }
451
452 pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
454 self.catalog_model_snapshot().operational_indexes
455 }
456
457 pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
459 self.catalog_model_snapshot().index_statuses
460 }
461
462 pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
464 self.catalog_model_snapshot()
465 .index_statuses
466 .into_iter()
467 .find(|status| status.name == name)
468 }
469
470 pub fn save_graph_projection(
472 &self,
473 name: impl Into<String>,
474 node_labels: Vec<String>,
475 node_types: Vec<String>,
476 edge_labels: Vec<String>,
477 source: impl Into<String>,
478 ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
479 let name = name.into();
480 let source = source.into();
481 self.update_physical_metadata(|metadata| {
482 let now = SystemTime::now()
483 .duration_since(UNIX_EPOCH)
484 .unwrap_or_default()
485 .as_millis();
486 let projection = if let Some(existing) = metadata
487 .graph_projections
488 .iter_mut()
489 .find(|projection| projection.name == name)
490 {
491 existing.updated_at_unix_ms = now;
492 existing.state = "declared".to_string();
493 existing.source = source.clone();
494 existing.node_labels = node_labels.clone();
495 existing.node_types = node_types.clone();
496 existing.edge_labels = edge_labels.clone();
497 existing.last_materialized_sequence = None;
498 existing.clone()
499 } else {
500 let projection = PhysicalGraphProjection {
501 name: name.clone(),
502 created_at_unix_ms: now,
503 updated_at_unix_ms: now,
504 state: "declared".to_string(),
505 source: source.clone(),
506 node_labels: node_labels.clone(),
507 node_types: node_types.clone(),
508 edge_labels: edge_labels.clone(),
509 last_materialized_sequence: None,
510 };
511 metadata.graph_projections.push(projection.clone());
512 projection
513 };
514
515 Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
516
517 metadata
518 .graph_projections
519 .sort_by(|left, right| left.name.cmp(&right.name));
520 projection
521 })
522 }
523
524 pub fn materialize_graph_projection(
526 &self,
527 name: &str,
528 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
529 self.update_physical_metadata(|metadata| {
530 let now = SystemTime::now()
531 .duration_since(UNIX_EPOCH)
532 .unwrap_or_default()
533 .as_millis();
534 let idx = metadata
535 .graph_projections
536 .iter()
537 .position(|projection| projection.name == name);
538 if let Some(idx) = idx {
539 metadata.graph_projections[idx].updated_at_unix_ms = now;
540 metadata.graph_projections[idx].state = "materialized".to_string();
541 metadata.graph_projections[idx].last_materialized_sequence =
542 Some(metadata.superblock.sequence);
543 let result = metadata.graph_projections[idx].clone();
544 Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
545 return Some(result);
546 }
547 None
548 })
549 }
550
551 pub fn mark_graph_projection_materializing(
553 &self,
554 name: &str,
555 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
556 self.update_physical_metadata(|metadata| {
557 let now = SystemTime::now()
558 .duration_since(UNIX_EPOCH)
559 .unwrap_or_default()
560 .as_millis();
561 let idx = metadata
562 .graph_projections
563 .iter()
564 .position(|projection| projection.name == name);
565 if let Some(idx) = idx {
566 metadata.graph_projections[idx].updated_at_unix_ms = now;
567 metadata.graph_projections[idx].state = "materializing".to_string();
568 metadata.graph_projections[idx].last_materialized_sequence = None;
569 let result = metadata.graph_projections[idx].clone();
570 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
571 return Some(result);
572 }
573 None
574 })
575 }
576
577 pub fn fail_graph_projection(
579 &self,
580 name: &str,
581 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
582 self.update_physical_metadata(|metadata| {
583 let now = SystemTime::now()
584 .duration_since(UNIX_EPOCH)
585 .unwrap_or_default()
586 .as_millis();
587 let idx = metadata
588 .graph_projections
589 .iter()
590 .position(|projection| projection.name == name);
591 if let Some(idx) = idx {
592 metadata.graph_projections[idx].updated_at_unix_ms = now;
593 metadata.graph_projections[idx].state = "failed".to_string();
594 metadata.graph_projections[idx].last_materialized_sequence = None;
595 let result = metadata.graph_projections[idx].clone();
596 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
597 return Some(result);
598 }
599 None
600 })
601 }
602
603 pub fn mark_graph_projection_stale(
605 &self,
606 name: &str,
607 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
608 self.update_physical_metadata(|metadata| {
609 let now = SystemTime::now()
610 .duration_since(UNIX_EPOCH)
611 .unwrap_or_default()
612 .as_millis();
613 let idx = metadata
614 .graph_projections
615 .iter()
616 .position(|projection| projection.name == name);
617 if let Some(idx) = idx {
618 metadata.graph_projections[idx].updated_at_unix_ms = now;
619 metadata.graph_projections[idx].state = "stale".to_string();
620 let result = metadata.graph_projections[idx].clone();
621 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
622 return Some(result);
623 }
624 None
625 })
626 }
627
628 fn mark_projection_dependent_jobs_stale(
629 metadata: &mut PhysicalMetadataFile,
630 projection_name: &str,
631 now: u128,
632 ) {
633 for job in metadata.analytics_jobs.iter_mut() {
634 if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
635 job.state = "stale".to_string();
636 job.updated_at_unix_ms = now;
637 }
638 }
639 }
640
641 fn rearm_projection_dependent_jobs_declared(
642 metadata: &mut PhysicalMetadataFile,
643 projection_name: &str,
644 now: u128,
645 ) {
646 for job in metadata.analytics_jobs.iter_mut() {
647 if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
648 job.state = "declared".to_string();
649 job.last_run_sequence = None;
650 job.updated_at_unix_ms = now;
651 }
652 }
653 }
654
655 pub fn save_analytics_job(
657 &self,
658 kind: impl Into<String>,
659 projection: Option<String>,
660 metadata_entries: BTreeMap<String, String>,
661 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
662 let kind = kind.into();
663 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
664 if let Some(projection_name) = projection.as_deref() {
665 if !self.graph_projection_is_declared(projection_name) {
666 return Err(format!(
667 "graph projection '{projection_name}' is not declared in physical metadata"
668 )
669 .into());
670 }
671 }
672
673 self.update_physical_metadata(|metadata| {
674 let now = SystemTime::now()
675 .duration_since(UNIX_EPOCH)
676 .unwrap_or_default()
677 .as_millis();
678
679 let job = if let Some(existing) = metadata
680 .analytics_jobs
681 .iter_mut()
682 .find(|job| job.id == job_id)
683 {
684 existing.kind = kind.clone();
685 existing.projection = projection.clone();
686 existing.updated_at_unix_ms = now;
687 existing.metadata = metadata_entries.clone();
688 if existing.last_run_sequence.is_none() {
689 existing.state = "declared".to_string();
690 }
691 existing.clone()
692 } else {
693 let job = PhysicalAnalyticsJob {
694 id: job_id.clone(),
695 kind: kind.clone(),
696 state: "declared".to_string(),
697 projection: projection.clone(),
698 created_at_unix_ms: now,
699 updated_at_unix_ms: now,
700 last_run_sequence: None,
701 metadata: metadata_entries.clone(),
702 };
703 metadata.analytics_jobs.push(job.clone());
704 job
705 };
706
707 metadata
708 .analytics_jobs
709 .sort_by(|left, right| left.id.cmp(&right.id));
710 job
711 })
712 }
713
714 pub fn record_analytics_job(
716 &self,
717 kind: impl Into<String>,
718 projection: Option<String>,
719 metadata_entries: BTreeMap<String, String>,
720 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
721 let kind = kind.into();
722 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
723 if let Some(projection_name) = projection.as_deref() {
724 if !self.graph_projection_is_declared(projection_name) {
725 return Err(format!(
726 "graph projection '{projection_name}' is not declared in physical metadata"
727 )
728 .into());
729 }
730 if !self.graph_projection_is_operational(projection_name) {
731 return Err(format!(
732 "graph projection '{projection_name}' is declared but not operationally materialized"
733 )
734 .into());
735 }
736 }
737
738 self.update_physical_metadata(|metadata| {
739 let now = SystemTime::now()
740 .duration_since(UNIX_EPOCH)
741 .unwrap_or_default()
742 .as_millis();
743
744 let existing = metadata
745 .analytics_jobs
746 .iter_mut()
747 .find(|job| job.id == job_id)?;
748 existing.state = "completed".to_string();
749 existing.kind = kind.clone();
750 existing.projection = projection.clone();
751 existing.updated_at_unix_ms = now;
752 existing.last_run_sequence = Some(metadata.superblock.sequence);
753 existing.metadata = metadata_entries.clone();
754 let job = existing.clone();
755
756 metadata
757 .analytics_jobs
758 .sort_by(|left, right| left.id.cmp(&right.id));
759 Some(job)
760 })
761 .and_then(|job| {
762 job.ok_or_else(|| {
763 format!("analytics job '{job_id}' is not declared in physical metadata").into()
764 })
765 })
766 }
767
768 pub fn queue_analytics_job(
770 &self,
771 kind: impl Into<String>,
772 projection: Option<String>,
773 metadata_entries: BTreeMap<String, String>,
774 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
775 let kind = kind.into();
776 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
777 if let Some(projection_name) = projection.as_deref() {
778 if !self.graph_projection_is_declared(projection_name) {
779 return Err(format!(
780 "graph projection '{projection_name}' is not declared in physical metadata"
781 )
782 .into());
783 }
784 if !self.graph_projection_is_operational(projection_name) {
785 return Err(format!(
786 "graph projection '{projection_name}' is declared but not operationally materialized"
787 )
788 .into());
789 }
790 }
791
792 self.update_physical_metadata(|metadata| {
793 let now = SystemTime::now()
794 .duration_since(UNIX_EPOCH)
795 .unwrap_or_default()
796 .as_millis();
797
798 let existing = metadata
799 .analytics_jobs
800 .iter_mut()
801 .find(|job| job.id == job_id)?;
802 existing.state = "queued".to_string();
803 existing.kind = kind.clone();
804 existing.projection = projection.clone();
805 existing.updated_at_unix_ms = now;
806 existing.metadata = metadata_entries.clone();
807 let job = existing.clone();
808
809 metadata
810 .analytics_jobs
811 .sort_by(|left, right| left.id.cmp(&right.id));
812 Some(job)
813 })
814 .and_then(|job| {
815 job.ok_or_else(|| {
816 format!("analytics job '{job_id}' is not declared in physical metadata").into()
817 })
818 })
819 }
820
821 pub fn start_analytics_job(
823 &self,
824 kind: impl Into<String>,
825 projection: Option<String>,
826 metadata_entries: BTreeMap<String, String>,
827 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
828 let kind = kind.into();
829 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
830 if let Some(projection_name) = projection.as_deref() {
831 if !self.graph_projection_is_declared(projection_name) {
832 return Err(format!(
833 "graph projection '{projection_name}' is not declared in physical metadata"
834 )
835 .into());
836 }
837 if !self.graph_projection_is_operational(projection_name) {
838 return Err(format!(
839 "graph projection '{projection_name}' is declared but not operationally materialized"
840 )
841 .into());
842 }
843 }
844
845 self.update_physical_metadata(|metadata| {
846 let now = SystemTime::now()
847 .duration_since(UNIX_EPOCH)
848 .unwrap_or_default()
849 .as_millis();
850
851 let existing = metadata
852 .analytics_jobs
853 .iter_mut()
854 .find(|job| job.id == job_id)?;
855 existing.state = "running".to_string();
856 existing.kind = kind.clone();
857 existing.projection = projection.clone();
858 existing.updated_at_unix_ms = now;
859 existing.metadata = metadata_entries.clone();
860 let job = existing.clone();
861
862 metadata
863 .analytics_jobs
864 .sort_by(|left, right| left.id.cmp(&right.id));
865 Some(job)
866 })
867 .and_then(|job| {
868 job.ok_or_else(|| {
869 format!("analytics job '{job_id}' is not declared in physical metadata").into()
870 })
871 })
872 }
873
874 pub fn fail_analytics_job(
876 &self,
877 kind: impl Into<String>,
878 projection: Option<String>,
879 metadata_entries: BTreeMap<String, String>,
880 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
881 let kind = kind.into();
882 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
883
884 self.update_physical_metadata(|metadata| {
885 let now = SystemTime::now()
886 .duration_since(UNIX_EPOCH)
887 .unwrap_or_default()
888 .as_millis();
889
890 let existing = metadata
891 .analytics_jobs
892 .iter_mut()
893 .find(|job| job.id == job_id)?;
894 existing.state = "failed".to_string();
895 existing.kind = kind.clone();
896 existing.projection = projection.clone();
897 existing.updated_at_unix_ms = now;
898 existing.metadata = metadata_entries.clone();
899 let job = existing.clone();
900
901 metadata
902 .analytics_jobs
903 .sort_by(|left, right| left.id.cmp(&right.id));
904 Some(job)
905 })
906 .and_then(|job| {
907 job.ok_or_else(|| {
908 format!("analytics job '{job_id}' is not declared in physical metadata").into()
909 })
910 })
911 }
912
913 pub fn mark_analytics_job_stale(
915 &self,
916 kind: impl Into<String>,
917 projection: Option<String>,
918 metadata_entries: BTreeMap<String, String>,
919 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
920 let kind = kind.into();
921 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
922
923 self.update_physical_metadata(|metadata| {
924 let now = SystemTime::now()
925 .duration_since(UNIX_EPOCH)
926 .unwrap_or_default()
927 .as_millis();
928
929 let existing = metadata
930 .analytics_jobs
931 .iter_mut()
932 .find(|job| job.id == job_id)?;
933 existing.state = "stale".to_string();
934 existing.kind = kind.clone();
935 existing.projection = projection.clone();
936 existing.updated_at_unix_ms = now;
937 existing.metadata = metadata_entries.clone();
938 let job = existing.clone();
939
940 metadata
941 .analytics_jobs
942 .sort_by(|left, right| left.id.cmp(&right.id));
943 Some(job)
944 })
945 .and_then(|job| {
946 job.ok_or_else(|| {
947 format!("analytics job '{job_id}' is not declared in physical metadata").into()
948 })
949 })
950 }
951
952 pub fn create_named_export(
954 &self,
955 name: impl Into<String>,
956 ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
957 let name = name.into();
958 if self.options.mode != StorageMode::Persistent {
959 return Err("exports require persistent mode".into());
960 }
961 let Some(path) = self.path() else {
962 return Err("database path is not available".into());
963 };
964
965 self.flush()?;
966
967 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
968 let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
969 let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
970 let export_metadata_binary_path =
971 PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
972
973 fs::copy(path, &export_data_path)?;
974
975 let descriptor = ExportDescriptor {
976 name: name.clone(),
977 created_at_unix_ms: SystemTime::now()
978 .duration_since(UNIX_EPOCH)
979 .unwrap_or_default()
980 .as_millis(),
981 snapshot_id: metadata
982 .snapshots
983 .last()
984 .map(|snapshot| snapshot.snapshot_id),
985 superblock_sequence: metadata.superblock.sequence,
986 data_path: export_data_path.display().to_string(),
987 metadata_path: export_metadata_path.display().to_string(),
988 collection_count: metadata.catalog.total_collections,
989 total_entities: metadata.catalog.total_entities,
990 };
991
992 metadata
993 .exports
994 .retain(|export| export.name != descriptor.name);
995 metadata.exports.push(descriptor.clone());
996 self.prune_export_registry(&mut metadata.exports);
997 metadata.save_for_data_path(path)?;
998 metadata.save_to_binary_path(&export_metadata_binary_path)?;
999 metadata.save_to_path(&export_metadata_path)?;
1000
1001 Ok(descriptor)
1002 }
1003
1004 pub fn set_index_enabled(
1006 &self,
1007 name: &str,
1008 enabled: bool,
1009 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1010 let Some(status) = self.index_status(name) else {
1011 return Err(format!("index '{name}' is not present in catalog status").into());
1012 };
1013 if !status.declared {
1014 return Err(format!("index '{name}' is not declared in physical metadata").into());
1015 }
1016 self.update_physical_metadata(|metadata| {
1017 let now = SystemTime::now()
1018 .duration_since(UNIX_EPOCH)
1019 .unwrap_or_default()
1020 .as_millis();
1021 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1022 index.enabled = enabled;
1023 if !enabled {
1024 index.build_state = "disabled".to_string();
1025 } else if index.build_state == "disabled" {
1026 index.build_state = if index.artifact_root_page.is_some() {
1027 "ready".to_string()
1028 } else {
1029 "declared-unbuilt".to_string()
1030 };
1031 }
1032 index.last_refresh_ms = Some(now);
1033 return Some(index.clone());
1034 }
1035 None
1036 })
1037 }
1038
1039 pub fn mark_index_building(
1041 &self,
1042 name: &str,
1043 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1044 let Some(status) = self.index_status(name) else {
1045 return Err(format!("index '{name}' is not present in catalog status").into());
1046 };
1047 if !status.declared {
1048 return Err(format!("index '{name}' is not declared in physical metadata").into());
1049 }
1050 if status.lifecycle_state == "disabled" {
1051 return Err(format!("index '{name}' is disabled").into());
1052 }
1053 self.update_physical_metadata(|metadata| {
1054 let now = SystemTime::now()
1055 .duration_since(UNIX_EPOCH)
1056 .unwrap_or_default()
1057 .as_millis();
1058 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1059 index.build_state = "building".to_string();
1060 index.last_refresh_ms = Some(now);
1061 return Some(index.clone());
1062 }
1063 None
1064 })
1065 }
1066
1067 pub fn fail_index(
1069 &self,
1070 name: &str,
1071 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1072 let Some(status) = self.index_status(name) else {
1073 return Err(format!("index '{name}' is not present in catalog status").into());
1074 };
1075 if !status.declared {
1076 return Err(format!("index '{name}' is not declared in physical metadata").into());
1077 }
1078 self.update_physical_metadata(|metadata| {
1079 let now = SystemTime::now()
1080 .duration_since(UNIX_EPOCH)
1081 .unwrap_or_default()
1082 .as_millis();
1083 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1084 index.build_state = "failed".to_string();
1085 index.last_refresh_ms = Some(now);
1086 return Some(index.clone());
1087 }
1088 None
1089 })
1090 }
1091
1092 pub fn mark_index_stale(
1094 &self,
1095 name: &str,
1096 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1097 let Some(status) = self.index_status(name) else {
1098 return Err(format!("index '{name}' is not present in catalog status").into());
1099 };
1100 if !status.declared {
1101 return Err(format!("index '{name}' is not declared in physical metadata").into());
1102 }
1103 self.update_physical_metadata(|metadata| {
1104 let now = SystemTime::now()
1105 .duration_since(UNIX_EPOCH)
1106 .unwrap_or_default()
1107 .as_millis();
1108 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1109 index.build_state = if index.enabled {
1110 "stale".to_string()
1111 } else {
1112 "disabled".to_string()
1113 };
1114 index.last_refresh_ms = Some(now);
1115 return Some(index.clone());
1116 }
1117 None
1118 })
1119 }
1120
1121 pub fn mark_index_ready(
1123 &self,
1124 name: &str,
1125 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1126 self.warmup_index(name)
1127 }
1128
1129 pub fn warmup_index(
1131 &self,
1132 name: &str,
1133 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1134 let Some(status) = self.index_status(name) else {
1135 return Err(format!("index '{name}' is not present in catalog status").into());
1136 };
1137 if !status.declared {
1138 return Err(format!("index '{name}' is not declared in physical metadata").into());
1139 }
1140 if status.lifecycle_state == "disabled" {
1141 return Err(format!("index '{name}' is disabled").into());
1142 }
1143 if !status.operational {
1144 return Err(
1145 format!("index '{name}' is declared but not operationally materialized").into(),
1146 );
1147 }
1148 let warmed_artifact = self
1149 .physical_indexes()
1150 .into_iter()
1151 .find(|index| index.name == name)
1152 .map(|mut index| {
1153 self.warmup_native_vector_artifact_for_index(&index)?;
1154 self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1155 Ok::<_, String>(index)
1156 })
1157 .transpose()
1158 .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1159
1160 self.update_physical_metadata(|metadata| {
1161 let now = SystemTime::now()
1162 .duration_since(UNIX_EPOCH)
1163 .unwrap_or_default()
1164 .as_millis();
1165 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1166 if let Some(warmed) = warmed_artifact.as_ref() {
1167 index.entries = warmed.entries;
1168 index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1169 index.backend = warmed.backend.clone();
1170 index.build_state = "ready".to_string();
1171 }
1172 index.last_refresh_ms = Some(now);
1173 return Some(index.clone());
1174 }
1175 None
1176 })
1177 }
1178
1179 pub fn rebuild_index_registry(
1181 &self,
1182 collection: Option<&str>,
1183 ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1184 let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1185 self.update_physical_metadata(|metadata| {
1186 let now = SystemTime::now()
1187 .duration_since(UNIX_EPOCH)
1188 .unwrap_or_default()
1189 .as_millis();
1190
1191 let mut affected = Vec::new();
1192 let declared = metadata.indexes.clone();
1193 for declared_index in declared {
1194 let matches_collection = collection.is_none_or(|collection_name| {
1195 declared_index.collection.as_deref() == Some(collection_name)
1196 });
1197 if !matches_collection {
1198 continue;
1199 }
1200
1201 let mut rebuilt = fresh
1202 .iter()
1203 .find(|index| index.name == declared_index.name)
1204 .cloned()
1205 .unwrap_or_else(|| {
1206 let mut index = declared_index.clone();
1207 index.build_state = "declared-unbuilt".to_string();
1208 index
1209 });
1210 rebuilt.enabled = declared_index.enabled;
1211 rebuilt.artifact_kind = rebuilt
1212 .artifact_kind
1213 .or_else(|| declared_index.artifact_kind.clone());
1214 rebuilt.artifact_root_page = rebuilt
1215 .artifact_root_page
1216 .or(declared_index.artifact_root_page);
1217 rebuilt.artifact_checksum = rebuilt
1218 .artifact_checksum
1219 .or(declared_index.artifact_checksum);
1220 rebuilt.build_state =
1221 Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1222 rebuilt.last_refresh_ms = Some(now);
1223
1224 if let Some(existing) = metadata
1225 .indexes
1226 .iter_mut()
1227 .find(|index| index.name == rebuilt.name)
1228 {
1229 *existing = rebuilt.clone();
1230 } else {
1231 metadata.indexes.push(rebuilt.clone());
1232 }
1233
1234 affected.push(rebuilt);
1235 }
1236
1237 affected
1238 })
1239 }
1240
1241 fn finalize_rebuilt_index_build_state(
1242 declared: &PhysicalIndexState,
1243 rebuilt: &PhysicalIndexState,
1244 ) -> String {
1245 if !rebuilt.enabled {
1246 return "disabled".to_string();
1247 }
1248
1249 if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1250 return "failed".to_string();
1251 }
1252
1253 let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1254 if native_artifact_family {
1255 if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1256 return "ready".to_string();
1257 }
1258 if declared.artifact_root_page.is_some()
1259 || declared.artifact_checksum.is_some()
1260 || declared.artifact_kind.is_some()
1261 {
1262 return "stale".to_string();
1263 }
1264 return "declared-unbuilt".to_string();
1265 }
1266
1267 if rebuilt.entries > 0 {
1268 return "ready".to_string();
1269 }
1270
1271 if matches!(
1272 declared.build_state.as_str(),
1273 "stale" | "artifact-published" | "registry-loaded"
1274 ) {
1275 return "stale".to_string();
1276 }
1277
1278 "declared-unbuilt".to_string()
1279 }
1280}