1use super::*;
2
3impl RedDB {
4 pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5 self.physical_metadata()
6 .map(|metadata| metadata.tree_definitions)
7 .unwrap_or_default()
8 }
9
10 pub fn tree_definition(
11 &self,
12 collection: &str,
13 name: &str,
14 ) -> Option<crate::physical::PhysicalTreeDefinition> {
15 self.tree_definitions()
16 .into_iter()
17 .find(|definition| definition.collection == collection && definition.name == name)
18 }
19
20 pub fn save_tree_definition(
21 &self,
22 definition: crate::physical::PhysicalTreeDefinition,
23 ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24 self.update_physical_metadata(|metadata| {
25 if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26 existing.collection == definition.collection && existing.name == definition.name
27 }) {
28 *existing = definition.clone();
29 } else {
30 metadata.tree_definitions.push(definition.clone());
31 }
32 metadata.tree_definitions.sort_by(|left, right| {
33 left.collection
34 .cmp(&right.collection)
35 .then_with(|| left.name.cmp(&right.name))
36 });
37 definition.clone()
38 })
39 }
40
41 pub fn remove_tree_definition(
42 &self,
43 collection: &str,
44 name: &str,
45 ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46 self.update_physical_metadata(|metadata| {
47 metadata
48 .tree_definitions
49 .iter()
50 .position(|definition| {
51 definition.collection == collection && definition.name == name
52 })
53 .map(|index| metadata.tree_definitions.remove(index))
54 })
55 }
56
57 pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58 self.collection_ttl_defaults_ms
59 .read()
60 .ok()
61 .and_then(|defaults| defaults.get(collection).copied())
62 }
63
64 pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66 defaults.insert(collection.into(), ttl_ms);
67 }
68 }
69
70 pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72 defaults.remove(collection);
73 }
74 }
75
76 pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77 self.contract_cache_map()
78 .values()
79 .map(|arc| (**arc).clone())
80 .collect()
81 }
82
83 pub fn collection_contract(
84 &self,
85 collection: &str,
86 ) -> Option<crate::physical::CollectionContract> {
87 self.contract_cache_map()
88 .get(collection)
89 .map(|arc| (**arc).clone())
90 }
91
92 pub fn collection_contract_arc(
97 &self,
98 collection: &str,
99 ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100 self.contract_cache_map()
101 .get(collection)
102 .map(std::sync::Arc::clone)
103 }
104
105 fn contract_cache_map(
110 &self,
111 ) -> std::sync::Arc<
112 std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113 > {
114 if let Ok(guard) = self.collection_contract_cache.read() {
115 if let Some(map) = guard.as_ref() {
116 return std::sync::Arc::clone(map);
117 }
118 }
119 let contracts: Vec<crate::physical::CollectionContract> = self
120 .physical_metadata()
121 .map(|metadata| metadata.collection_contracts)
122 .unwrap_or_default();
123 let map: std::collections::HashMap<_, _> = contracts
124 .into_iter()
125 .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126 .collect();
127 let arc = std::sync::Arc::new(map);
128 if let Ok(mut guard) = self.collection_contract_cache.write() {
129 *guard = Some(std::sync::Arc::clone(&arc));
130 }
131 arc
132 }
133
134 pub(crate) fn invalidate_collection_contract_cache(&self) {
135 if let Ok(mut guard) = self.collection_contract_cache.write() {
136 *guard = None;
137 }
138 }
139
140 pub fn save_collection_contract(
141 &self,
142 contract: crate::physical::CollectionContract,
143 ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145 if let Some(ttl_ms) = contract.default_ttl_ms {
146 defaults.insert(contract.name.clone(), ttl_ms);
147 } else {
148 defaults.remove(&contract.name);
149 }
150 }
151
152 self.store()
153 .context_index()
154 .set_collection_enabled(&contract.name, contract.context_index_enabled);
155 if let Some(manager) = self.store().get_collection(&contract.name) {
156 let columns = contract
157 .declared_columns
158 .iter()
159 .map(|column| column.name.clone())
160 .collect();
161 manager.set_column_schema_if_empty(columns);
162 }
163
164 self.invalidate_collection_contract_cache();
165
166 self.update_physical_metadata(|metadata| {
167 if let Some(existing) = metadata
168 .collection_contracts
169 .iter_mut()
170 .find(|existing| existing.name == contract.name)
171 {
172 *existing = contract.clone();
173 } else {
174 metadata.collection_contracts.push(contract.clone());
175 }
176 metadata
177 .collection_contracts
178 .sort_by(|left, right| left.name.cmp(&right.name));
179
180 if let Some(ttl_ms) = contract.default_ttl_ms {
181 metadata
182 .collection_ttl_defaults_ms
183 .insert(contract.name.clone(), ttl_ms);
184 } else {
185 metadata.collection_ttl_defaults_ms.remove(&contract.name);
186 }
187
188 contract.clone()
189 })
190 }
191
192 pub fn remove_collection_contract(
193 &self,
194 collection: &str,
195 ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
196 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
197 defaults.remove(collection);
198 }
199
200 self.store()
201 .context_index()
202 .set_collection_enabled(collection, false);
203
204 self.invalidate_collection_contract_cache();
205
206 self.update_physical_metadata(|metadata| {
207 let removed = metadata
208 .collection_contracts
209 .iter()
210 .position(|contract| contract.name == collection)
211 .map(|index| metadata.collection_contracts.remove(index));
212 metadata.collection_ttl_defaults_ms.remove(collection);
213 metadata
214 .indexes
215 .retain(|index| index.collection.as_deref() != Some(collection));
216 removed
217 })
218 }
219
220 pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
221 self.collection_ttl_defaults_ms
222 .read()
223 .map(|defaults| {
224 defaults
225 .iter()
226 .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
227 .collect()
228 })
229 .unwrap_or_default()
230 }
231
232 pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
233 let metadata = self.physical_metadata();
234 let defaults = metadata
235 .as_ref()
236 .map(|m| m.collection_ttl_defaults_ms.clone())
237 .unwrap_or_default();
238
239 if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
240 current.clear();
241 current.extend(defaults);
242 }
243
244 if let Some(metadata) = metadata {
245 let store = self.store();
246 let index = store.context_index();
247 for contract in &metadata.collection_contracts {
248 index.set_collection_enabled(&contract.name, contract.context_index_enabled);
249 }
250 }
251 }
252
253 pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
261 let registry = self.hypertables();
262 if registry.is_empty() {
263 return Vec::new();
264 }
265 use std::collections::BTreeMap;
266 let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
267 BTreeMap::new();
268 for chunk in registry.snapshot_chunks() {
269 chunks_by_table
270 .entry(chunk.id.hypertable.clone())
271 .or_default()
272 .push(crate::physical::PhysicalHypertableChunk {
273 start_ns: chunk.id.start_ns,
274 end_ns_exclusive: chunk.end_ns_exclusive,
275 row_count: chunk.row_count,
276 min_ts_ns: chunk.min_ts_ns,
277 max_ts_ns: chunk.max_ts_ns,
278 sealed: chunk.sealed,
279 ttl_override_ns: chunk.ttl_override_ns,
280 columnar_page: chunk.columnar_page,
281 });
282 }
283 registry
284 .list()
285 .into_iter()
286 .map(|spec| crate::physical::PhysicalHypertable {
287 chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
288 name: spec.name,
289 time_column: spec.time_column,
290 chunk_interval_ns: spec.chunk_interval_ns,
291 default_ttl_ns: spec.default_ttl_ns,
292 })
293 .collect()
294 }
295
296 pub(crate) fn load_hypertables_from_metadata(&self) {
302 let Some(metadata) = self.physical_metadata() else {
303 return;
304 };
305 if metadata.hypertables.is_empty() {
306 return;
307 }
308 let registry = self.hypertables();
309 for hypertable in &metadata.hypertables {
310 let mut spec = crate::storage::timeseries::HypertableSpec::new(
311 hypertable.name.clone(),
312 hypertable.time_column.clone(),
313 hypertable.chunk_interval_ns,
314 );
315 if let Some(ttl) = hypertable.default_ttl_ns {
316 spec = spec.with_ttl_ns(ttl);
317 }
318 registry.register(spec);
319 for chunk in &hypertable.chunks {
320 registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
321 id: crate::storage::timeseries::ChunkId {
322 hypertable: hypertable.name.clone(),
323 start_ns: chunk.start_ns,
324 },
325 end_ns_exclusive: chunk.end_ns_exclusive,
326 row_count: chunk.row_count,
327 min_ts_ns: chunk.min_ts_ns,
328 max_ts_ns: chunk.max_ts_ns,
329 sealed: chunk.sealed,
330 ttl_override_ns: chunk.ttl_override_ns,
331 columnar_page: chunk.columnar_page,
332 });
333 }
334 }
335 }
336
337 pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
338 self.store.run_maintenance()?;
339 self.persist_metadata()?;
340 Ok(())
341 }
342
343 pub fn metadata_path(&self) -> Option<PathBuf> {
345 self.path
346 .as_ref()
347 .map(|path| PhysicalMetadataFile::metadata_path_for(path))
348 }
349
350 pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
352 self.load_or_bootstrap_physical_metadata(!self.options.read_only)
353 .ok()
354 }
355
356 pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
358 let indexes = self
359 .physical_metadata()
360 .map(|metadata| metadata.indexes)
361 .filter(|indexes| !indexes.is_empty())
362 .or_else(|| {
363 self.native_physical_state()
364 .map(|state| self.physical_index_state_from_native_state(&state, None))
365 })
366 .unwrap_or_else(|| self.physical_index_state());
367 self.reconcile_index_states_with_native_artifacts(indexes)
368 }
369
370 pub fn exports(&self) -> Vec<ExportDescriptor> {
372 self.physical_metadata()
373 .map(|metadata| metadata.exports)
374 .or_else(|| {
375 self.native_physical_state()
376 .map(|state| self.exports_from_native_state(&state))
377 })
378 .unwrap_or_default()
379 }
380
381 pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
383 self.physical_metadata()
384 .map(|metadata| metadata.snapshots)
385 .or_else(|| {
386 self.native_physical_state()
387 .map(|state| self.snapshots_from_native_state(&state))
388 })
389 .unwrap_or_default()
390 }
391
392 pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
394 self.physical_metadata()
395 .map(|metadata| metadata.graph_projections)
396 .or_else(|| {
397 self.native_physical_state()
398 .map(|state| self.graph_projections_from_native_state(&state))
399 })
400 .unwrap_or_default()
401 }
402
403 pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
405 self.catalog_model_snapshot().declared_graph_projections
406 }
407
408 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
410 self.graph_projections()
411 .into_iter()
412 .filter(|projection| {
413 projection.last_materialized_sequence.is_some()
414 || matches!(projection.state.as_str(), "materialized" | "stale")
415 })
416 .collect()
417 }
418
419 pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
421 self.physical_metadata()
422 .map(|metadata| metadata.analytics_jobs)
423 .or_else(|| {
424 self.native_physical_state()
425 .map(|state| self.analytics_jobs_from_native_state(&state))
426 })
427 .unwrap_or_default()
428 }
429
430 pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
432 self.catalog_model_snapshot().declared_analytics_jobs
433 }
434
435 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
437 self.analytics_jobs()
438 .into_iter()
439 .filter(|job| {
440 job.last_run_sequence.is_some()
441 || matches!(
442 job.state.as_str(),
443 "running" | "completed" | "failed" | "queued" | "stale"
444 )
445 })
446 .collect()
447 }
448
449 pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
451 self.catalog_model_snapshot().declared_indexes
452 }
453
454 pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
456 self.catalog_model_snapshot().operational_indexes
457 }
458
459 pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
461 self.catalog_model_snapshot().index_statuses
462 }
463
464 pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
466 self.catalog_model_snapshot()
467 .index_statuses
468 .into_iter()
469 .find(|status| status.name == name)
470 }
471
472 pub fn save_graph_projection(
474 &self,
475 name: impl Into<String>,
476 node_labels: Vec<String>,
477 node_types: Vec<String>,
478 edge_labels: Vec<String>,
479 source: impl Into<String>,
480 ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
481 let name = name.into();
482 let source = source.into();
483 self.update_physical_metadata(|metadata| {
484 let now = SystemTime::now()
485 .duration_since(UNIX_EPOCH)
486 .unwrap_or_default()
487 .as_millis();
488 let projection = if let Some(existing) = metadata
489 .graph_projections
490 .iter_mut()
491 .find(|projection| projection.name == name)
492 {
493 existing.updated_at_unix_ms = now;
494 existing.state = "declared".to_string();
495 existing.source = source.clone();
496 existing.node_labels = node_labels.clone();
497 existing.node_types = node_types.clone();
498 existing.edge_labels = edge_labels.clone();
499 existing.last_materialized_sequence = None;
500 existing.clone()
501 } else {
502 let projection = PhysicalGraphProjection {
503 name: name.clone(),
504 created_at_unix_ms: now,
505 updated_at_unix_ms: now,
506 state: "declared".to_string(),
507 source: source.clone(),
508 node_labels: node_labels.clone(),
509 node_types: node_types.clone(),
510 edge_labels: edge_labels.clone(),
511 last_materialized_sequence: None,
512 };
513 metadata.graph_projections.push(projection.clone());
514 projection
515 };
516
517 Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
518
519 metadata
520 .graph_projections
521 .sort_by(|left, right| left.name.cmp(&right.name));
522 projection
523 })
524 }
525
526 pub fn materialize_graph_projection(
528 &self,
529 name: &str,
530 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
531 self.update_physical_metadata(|metadata| {
532 let now = SystemTime::now()
533 .duration_since(UNIX_EPOCH)
534 .unwrap_or_default()
535 .as_millis();
536 let idx = metadata
537 .graph_projections
538 .iter()
539 .position(|projection| projection.name == name);
540 if let Some(idx) = idx {
541 metadata.graph_projections[idx].updated_at_unix_ms = now;
542 metadata.graph_projections[idx].state = "materialized".to_string();
543 metadata.graph_projections[idx].last_materialized_sequence =
544 Some(metadata.superblock.sequence);
545 let result = metadata.graph_projections[idx].clone();
546 Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
547 return Some(result);
548 }
549 None
550 })
551 }
552
553 pub fn mark_graph_projection_materializing(
555 &self,
556 name: &str,
557 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
558 self.update_physical_metadata(|metadata| {
559 let now = SystemTime::now()
560 .duration_since(UNIX_EPOCH)
561 .unwrap_or_default()
562 .as_millis();
563 let idx = metadata
564 .graph_projections
565 .iter()
566 .position(|projection| projection.name == name);
567 if let Some(idx) = idx {
568 metadata.graph_projections[idx].updated_at_unix_ms = now;
569 metadata.graph_projections[idx].state = "materializing".to_string();
570 metadata.graph_projections[idx].last_materialized_sequence = None;
571 let result = metadata.graph_projections[idx].clone();
572 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
573 return Some(result);
574 }
575 None
576 })
577 }
578
579 pub fn fail_graph_projection(
581 &self,
582 name: &str,
583 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
584 self.update_physical_metadata(|metadata| {
585 let now = SystemTime::now()
586 .duration_since(UNIX_EPOCH)
587 .unwrap_or_default()
588 .as_millis();
589 let idx = metadata
590 .graph_projections
591 .iter()
592 .position(|projection| projection.name == name);
593 if let Some(idx) = idx {
594 metadata.graph_projections[idx].updated_at_unix_ms = now;
595 metadata.graph_projections[idx].state = "failed".to_string();
596 metadata.graph_projections[idx].last_materialized_sequence = None;
597 let result = metadata.graph_projections[idx].clone();
598 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
599 return Some(result);
600 }
601 None
602 })
603 }
604
605 pub fn mark_graph_projection_stale(
607 &self,
608 name: &str,
609 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
610 self.update_physical_metadata(|metadata| {
611 let now = SystemTime::now()
612 .duration_since(UNIX_EPOCH)
613 .unwrap_or_default()
614 .as_millis();
615 let idx = metadata
616 .graph_projections
617 .iter()
618 .position(|projection| projection.name == name);
619 if let Some(idx) = idx {
620 metadata.graph_projections[idx].updated_at_unix_ms = now;
621 metadata.graph_projections[idx].state = "stale".to_string();
622 let result = metadata.graph_projections[idx].clone();
623 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
624 return Some(result);
625 }
626 None
627 })
628 }
629
630 fn mark_projection_dependent_jobs_stale(
631 metadata: &mut PhysicalMetadataFile,
632 projection_name: &str,
633 now: u128,
634 ) {
635 for job in metadata.analytics_jobs.iter_mut() {
636 if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
637 job.state = "stale".to_string();
638 job.updated_at_unix_ms = now;
639 }
640 }
641 }
642
643 fn rearm_projection_dependent_jobs_declared(
644 metadata: &mut PhysicalMetadataFile,
645 projection_name: &str,
646 now: u128,
647 ) {
648 for job in metadata.analytics_jobs.iter_mut() {
649 if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
650 job.state = "declared".to_string();
651 job.last_run_sequence = None;
652 job.updated_at_unix_ms = now;
653 }
654 }
655 }
656
657 pub fn save_analytics_job(
659 &self,
660 kind: impl Into<String>,
661 projection: Option<String>,
662 metadata_entries: BTreeMap<String, String>,
663 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
664 let kind = kind.into();
665 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
666 if let Some(projection_name) = projection.as_deref() {
667 if !self.graph_projection_is_declared(projection_name) {
668 return Err(format!(
669 "graph projection '{projection_name}' is not declared in physical metadata"
670 )
671 .into());
672 }
673 }
674
675 self.update_physical_metadata(|metadata| {
676 let now = SystemTime::now()
677 .duration_since(UNIX_EPOCH)
678 .unwrap_or_default()
679 .as_millis();
680
681 let job = if let Some(existing) = metadata
682 .analytics_jobs
683 .iter_mut()
684 .find(|job| job.id == job_id)
685 {
686 existing.kind = kind.clone();
687 existing.projection = projection.clone();
688 existing.updated_at_unix_ms = now;
689 existing.metadata = metadata_entries.clone();
690 if existing.last_run_sequence.is_none() {
691 existing.state = "declared".to_string();
692 }
693 existing.clone()
694 } else {
695 let job = PhysicalAnalyticsJob {
696 id: job_id.clone(),
697 kind: kind.clone(),
698 state: "declared".to_string(),
699 projection: projection.clone(),
700 created_at_unix_ms: now,
701 updated_at_unix_ms: now,
702 last_run_sequence: None,
703 metadata: metadata_entries.clone(),
704 };
705 metadata.analytics_jobs.push(job.clone());
706 job
707 };
708
709 metadata
710 .analytics_jobs
711 .sort_by(|left, right| left.id.cmp(&right.id));
712 job
713 })
714 }
715
716 pub fn record_analytics_job(
718 &self,
719 kind: impl Into<String>,
720 projection: Option<String>,
721 metadata_entries: BTreeMap<String, String>,
722 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
723 let kind = kind.into();
724 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
725 if let Some(projection_name) = projection.as_deref() {
726 if !self.graph_projection_is_declared(projection_name) {
727 return Err(format!(
728 "graph projection '{projection_name}' is not declared in physical metadata"
729 )
730 .into());
731 }
732 if !self.graph_projection_is_operational(projection_name) {
733 return Err(format!(
734 "graph projection '{projection_name}' is declared but not operationally materialized"
735 )
736 .into());
737 }
738 }
739
740 self.update_physical_metadata(|metadata| {
741 let now = SystemTime::now()
742 .duration_since(UNIX_EPOCH)
743 .unwrap_or_default()
744 .as_millis();
745
746 let existing = metadata
747 .analytics_jobs
748 .iter_mut()
749 .find(|job| job.id == job_id)?;
750 existing.state = "completed".to_string();
751 existing.kind = kind.clone();
752 existing.projection = projection.clone();
753 existing.updated_at_unix_ms = now;
754 existing.last_run_sequence = Some(metadata.superblock.sequence);
755 existing.metadata = metadata_entries.clone();
756 let job = existing.clone();
757
758 metadata
759 .analytics_jobs
760 .sort_by(|left, right| left.id.cmp(&right.id));
761 Some(job)
762 })
763 .and_then(|job| {
764 job.ok_or_else(|| {
765 format!("analytics job '{job_id}' is not declared in physical metadata").into()
766 })
767 })
768 }
769
770 pub fn queue_analytics_job(
772 &self,
773 kind: impl Into<String>,
774 projection: Option<String>,
775 metadata_entries: BTreeMap<String, String>,
776 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
777 let kind = kind.into();
778 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
779 if let Some(projection_name) = projection.as_deref() {
780 if !self.graph_projection_is_declared(projection_name) {
781 return Err(format!(
782 "graph projection '{projection_name}' is not declared in physical metadata"
783 )
784 .into());
785 }
786 if !self.graph_projection_is_operational(projection_name) {
787 return Err(format!(
788 "graph projection '{projection_name}' is declared but not operationally materialized"
789 )
790 .into());
791 }
792 }
793
794 self.update_physical_metadata(|metadata| {
795 let now = SystemTime::now()
796 .duration_since(UNIX_EPOCH)
797 .unwrap_or_default()
798 .as_millis();
799
800 let existing = metadata
801 .analytics_jobs
802 .iter_mut()
803 .find(|job| job.id == job_id)?;
804 existing.state = "queued".to_string();
805 existing.kind = kind.clone();
806 existing.projection = projection.clone();
807 existing.updated_at_unix_ms = now;
808 existing.metadata = metadata_entries.clone();
809 let job = existing.clone();
810
811 metadata
812 .analytics_jobs
813 .sort_by(|left, right| left.id.cmp(&right.id));
814 Some(job)
815 })
816 .and_then(|job| {
817 job.ok_or_else(|| {
818 format!("analytics job '{job_id}' is not declared in physical metadata").into()
819 })
820 })
821 }
822
823 pub fn start_analytics_job(
825 &self,
826 kind: impl Into<String>,
827 projection: Option<String>,
828 metadata_entries: BTreeMap<String, String>,
829 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
830 let kind = kind.into();
831 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
832 if let Some(projection_name) = projection.as_deref() {
833 if !self.graph_projection_is_declared(projection_name) {
834 return Err(format!(
835 "graph projection '{projection_name}' is not declared in physical metadata"
836 )
837 .into());
838 }
839 if !self.graph_projection_is_operational(projection_name) {
840 return Err(format!(
841 "graph projection '{projection_name}' is declared but not operationally materialized"
842 )
843 .into());
844 }
845 }
846
847 self.update_physical_metadata(|metadata| {
848 let now = SystemTime::now()
849 .duration_since(UNIX_EPOCH)
850 .unwrap_or_default()
851 .as_millis();
852
853 let existing = metadata
854 .analytics_jobs
855 .iter_mut()
856 .find(|job| job.id == job_id)?;
857 existing.state = "running".to_string();
858 existing.kind = kind.clone();
859 existing.projection = projection.clone();
860 existing.updated_at_unix_ms = now;
861 existing.metadata = metadata_entries.clone();
862 let job = existing.clone();
863
864 metadata
865 .analytics_jobs
866 .sort_by(|left, right| left.id.cmp(&right.id));
867 Some(job)
868 })
869 .and_then(|job| {
870 job.ok_or_else(|| {
871 format!("analytics job '{job_id}' is not declared in physical metadata").into()
872 })
873 })
874 }
875
876 pub fn fail_analytics_job(
878 &self,
879 kind: impl Into<String>,
880 projection: Option<String>,
881 metadata_entries: BTreeMap<String, String>,
882 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
883 let kind = kind.into();
884 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
885
886 self.update_physical_metadata(|metadata| {
887 let now = SystemTime::now()
888 .duration_since(UNIX_EPOCH)
889 .unwrap_or_default()
890 .as_millis();
891
892 let existing = metadata
893 .analytics_jobs
894 .iter_mut()
895 .find(|job| job.id == job_id)?;
896 existing.state = "failed".to_string();
897 existing.kind = kind.clone();
898 existing.projection = projection.clone();
899 existing.updated_at_unix_ms = now;
900 existing.metadata = metadata_entries.clone();
901 let job = existing.clone();
902
903 metadata
904 .analytics_jobs
905 .sort_by(|left, right| left.id.cmp(&right.id));
906 Some(job)
907 })
908 .and_then(|job| {
909 job.ok_or_else(|| {
910 format!("analytics job '{job_id}' is not declared in physical metadata").into()
911 })
912 })
913 }
914
915 pub fn mark_analytics_job_stale(
917 &self,
918 kind: impl Into<String>,
919 projection: Option<String>,
920 metadata_entries: BTreeMap<String, String>,
921 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
922 let kind = kind.into();
923 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
924
925 self.update_physical_metadata(|metadata| {
926 let now = SystemTime::now()
927 .duration_since(UNIX_EPOCH)
928 .unwrap_or_default()
929 .as_millis();
930
931 let existing = metadata
932 .analytics_jobs
933 .iter_mut()
934 .find(|job| job.id == job_id)?;
935 existing.state = "stale".to_string();
936 existing.kind = kind.clone();
937 existing.projection = projection.clone();
938 existing.updated_at_unix_ms = now;
939 existing.metadata = metadata_entries.clone();
940 let job = existing.clone();
941
942 metadata
943 .analytics_jobs
944 .sort_by(|left, right| left.id.cmp(&right.id));
945 Some(job)
946 })
947 .and_then(|job| {
948 job.ok_or_else(|| {
949 format!("analytics job '{job_id}' is not declared in physical metadata").into()
950 })
951 })
952 }
953
954 pub fn create_named_export(
956 &self,
957 name: impl Into<String>,
958 ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
959 let name = name.into();
960 if self.options.mode != StorageMode::Persistent {
961 return Err("exports require persistent mode".into());
962 }
963 let Some(path) = self.path() else {
964 return Err("database path is not available".into());
965 };
966
967 self.flush()?;
968
969 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
970 let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
971 let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
972 let export_metadata_binary_path =
973 PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
974
975 fs::copy(path, &export_data_path)?;
976
977 let descriptor = ExportDescriptor {
978 name: name.clone(),
979 created_at_unix_ms: SystemTime::now()
980 .duration_since(UNIX_EPOCH)
981 .unwrap_or_default()
982 .as_millis(),
983 snapshot_id: metadata
984 .snapshots
985 .last()
986 .map(|snapshot| snapshot.snapshot_id),
987 superblock_sequence: metadata.superblock.sequence,
988 data_path: export_data_path.display().to_string(),
989 metadata_path: export_metadata_path.display().to_string(),
990 collection_count: metadata.catalog.total_collections,
991 total_entities: metadata.catalog.total_entities,
992 };
993
994 metadata
995 .exports
996 .retain(|export| export.name != descriptor.name);
997 metadata.exports.push(descriptor.clone());
998 self.prune_export_registry(&mut metadata.exports);
999 metadata.save_for_data_path(path)?;
1000 metadata.save_to_binary_path(&export_metadata_binary_path)?;
1001 metadata.save_to_path(&export_metadata_path)?;
1002
1003 Ok(descriptor)
1004 }
1005
1006 pub fn set_index_enabled(
1008 &self,
1009 name: &str,
1010 enabled: bool,
1011 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1012 let Some(status) = self.index_status(name) else {
1013 return Err(format!("index '{name}' is not present in catalog status").into());
1014 };
1015 if !status.declared {
1016 return Err(format!("index '{name}' is not declared in physical metadata").into());
1017 }
1018 self.update_physical_metadata(|metadata| {
1019 let now = SystemTime::now()
1020 .duration_since(UNIX_EPOCH)
1021 .unwrap_or_default()
1022 .as_millis();
1023 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1024 index.enabled = enabled;
1025 if !enabled {
1026 index.build_state = "disabled".to_string();
1027 } else if index.build_state == "disabled" {
1028 index.build_state = if index.artifact_root_page.is_some() {
1029 "ready".to_string()
1030 } else {
1031 "declared-unbuilt".to_string()
1032 };
1033 }
1034 index.last_refresh_ms = Some(now);
1035 return Some(index.clone());
1036 }
1037 None
1038 })
1039 }
1040
1041 pub fn mark_index_building(
1043 &self,
1044 name: &str,
1045 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1046 let Some(status) = self.index_status(name) else {
1047 return Err(format!("index '{name}' is not present in catalog status").into());
1048 };
1049 if !status.declared {
1050 return Err(format!("index '{name}' is not declared in physical metadata").into());
1051 }
1052 if status.lifecycle_state == "disabled" {
1053 return Err(format!("index '{name}' is disabled").into());
1054 }
1055 self.update_physical_metadata(|metadata| {
1056 let now = SystemTime::now()
1057 .duration_since(UNIX_EPOCH)
1058 .unwrap_or_default()
1059 .as_millis();
1060 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1061 index.build_state = "building".to_string();
1062 index.last_refresh_ms = Some(now);
1063 return Some(index.clone());
1064 }
1065 None
1066 })
1067 }
1068
1069 pub fn fail_index(
1071 &self,
1072 name: &str,
1073 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1074 let Some(status) = self.index_status(name) else {
1075 return Err(format!("index '{name}' is not present in catalog status").into());
1076 };
1077 if !status.declared {
1078 return Err(format!("index '{name}' is not declared in physical metadata").into());
1079 }
1080 self.update_physical_metadata(|metadata| {
1081 let now = SystemTime::now()
1082 .duration_since(UNIX_EPOCH)
1083 .unwrap_or_default()
1084 .as_millis();
1085 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1086 index.build_state = "failed".to_string();
1087 index.last_refresh_ms = Some(now);
1088 return Some(index.clone());
1089 }
1090 None
1091 })
1092 }
1093
1094 pub fn mark_index_stale(
1096 &self,
1097 name: &str,
1098 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1099 let Some(status) = self.index_status(name) else {
1100 return Err(format!("index '{name}' is not present in catalog status").into());
1101 };
1102 if !status.declared {
1103 return Err(format!("index '{name}' is not declared in physical metadata").into());
1104 }
1105 self.update_physical_metadata(|metadata| {
1106 let now = SystemTime::now()
1107 .duration_since(UNIX_EPOCH)
1108 .unwrap_or_default()
1109 .as_millis();
1110 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1111 index.build_state = if index.enabled {
1112 "stale".to_string()
1113 } else {
1114 "disabled".to_string()
1115 };
1116 index.last_refresh_ms = Some(now);
1117 return Some(index.clone());
1118 }
1119 None
1120 })
1121 }
1122
1123 pub fn mark_index_ready(
1125 &self,
1126 name: &str,
1127 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1128 self.warmup_index(name)
1129 }
1130
1131 pub fn warmup_index(
1133 &self,
1134 name: &str,
1135 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1136 let Some(status) = self.index_status(name) else {
1137 return Err(format!("index '{name}' is not present in catalog status").into());
1138 };
1139 if !status.declared {
1140 return Err(format!("index '{name}' is not declared in physical metadata").into());
1141 }
1142 if status.lifecycle_state == "disabled" {
1143 return Err(format!("index '{name}' is disabled").into());
1144 }
1145 if !status.operational {
1146 return Err(
1147 format!("index '{name}' is declared but not operationally materialized").into(),
1148 );
1149 }
1150 let warmed_artifact = self
1151 .physical_indexes()
1152 .into_iter()
1153 .find(|index| index.name == name)
1154 .map(|mut index| {
1155 self.warmup_native_vector_artifact_for_index(&index)?;
1156 self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1157 Ok::<_, String>(index)
1158 })
1159 .transpose()
1160 .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1161
1162 self.update_physical_metadata(|metadata| {
1163 let now = SystemTime::now()
1164 .duration_since(UNIX_EPOCH)
1165 .unwrap_or_default()
1166 .as_millis();
1167 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1168 if let Some(warmed) = warmed_artifact.as_ref() {
1169 index.entries = warmed.entries;
1170 index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1171 index.backend = warmed.backend.clone();
1172 index.build_state = "ready".to_string();
1173 }
1174 index.last_refresh_ms = Some(now);
1175 return Some(index.clone());
1176 }
1177 None
1178 })
1179 }
1180
1181 pub fn rebuild_index_registry(
1183 &self,
1184 collection: Option<&str>,
1185 ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1186 let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1187 self.update_physical_metadata(|metadata| {
1188 let now = SystemTime::now()
1189 .duration_since(UNIX_EPOCH)
1190 .unwrap_or_default()
1191 .as_millis();
1192
1193 let mut affected = Vec::new();
1194 let declared = metadata.indexes.clone();
1195 for declared_index in declared {
1196 let matches_collection = collection.is_none_or(|collection_name| {
1197 declared_index.collection.as_deref() == Some(collection_name)
1198 });
1199 if !matches_collection {
1200 continue;
1201 }
1202
1203 let mut rebuilt = fresh
1204 .iter()
1205 .find(|index| index.name == declared_index.name)
1206 .cloned()
1207 .unwrap_or_else(|| {
1208 let mut index = declared_index.clone();
1209 index.build_state = "declared-unbuilt".to_string();
1210 index
1211 });
1212 rebuilt.enabled = declared_index.enabled;
1213 rebuilt.artifact_kind = rebuilt
1214 .artifact_kind
1215 .or_else(|| declared_index.artifact_kind.clone());
1216 rebuilt.artifact_root_page = rebuilt
1217 .artifact_root_page
1218 .or(declared_index.artifact_root_page);
1219 rebuilt.artifact_checksum = rebuilt
1220 .artifact_checksum
1221 .or(declared_index.artifact_checksum);
1222 rebuilt.build_state =
1223 Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1224 rebuilt.last_refresh_ms = Some(now);
1225
1226 if let Some(existing) = metadata
1227 .indexes
1228 .iter_mut()
1229 .find(|index| index.name == rebuilt.name)
1230 {
1231 *existing = rebuilt.clone();
1232 } else {
1233 metadata.indexes.push(rebuilt.clone());
1234 }
1235
1236 affected.push(rebuilt);
1237 }
1238
1239 affected
1240 })
1241 }
1242
1243 fn finalize_rebuilt_index_build_state(
1244 declared: &PhysicalIndexState,
1245 rebuilt: &PhysicalIndexState,
1246 ) -> String {
1247 if !rebuilt.enabled {
1248 return "disabled".to_string();
1249 }
1250
1251 if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1252 return "failed".to_string();
1253 }
1254
1255 let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1256 if native_artifact_family {
1257 if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1258 return "ready".to_string();
1259 }
1260 if declared.artifact_root_page.is_some()
1261 || declared.artifact_checksum.is_some()
1262 || declared.artifact_kind.is_some()
1263 {
1264 return "stale".to_string();
1265 }
1266 return "declared-unbuilt".to_string();
1267 }
1268
1269 if rebuilt.entries > 0 {
1270 return "ready".to_string();
1271 }
1272
1273 if matches!(
1274 declared.build_state.as_str(),
1275 "stale" | "artifact-published" | "registry-loaded"
1276 ) {
1277 return "stale".to_string();
1278 }
1279
1280 "declared-unbuilt".to_string()
1281 }
1282}