1use super::*;
2
3impl RedDB {
4 pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5 self.physical_metadata()
6 .map(|metadata| metadata.tree_definitions)
7 .unwrap_or_default()
8 }
9
10 pub fn tree_definition(
11 &self,
12 collection: &str,
13 name: &str,
14 ) -> Option<crate::physical::PhysicalTreeDefinition> {
15 self.tree_definitions()
16 .into_iter()
17 .find(|definition| definition.collection == collection && definition.name == name)
18 }
19
20 pub fn save_tree_definition(
21 &self,
22 definition: crate::physical::PhysicalTreeDefinition,
23 ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24 self.update_physical_metadata(|metadata| {
25 if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26 existing.collection == definition.collection && existing.name == definition.name
27 }) {
28 *existing = definition.clone();
29 } else {
30 metadata.tree_definitions.push(definition.clone());
31 }
32 metadata.tree_definitions.sort_by(|left, right| {
33 left.collection
34 .cmp(&right.collection)
35 .then_with(|| left.name.cmp(&right.name))
36 });
37 definition.clone()
38 })
39 }
40
41 pub fn remove_tree_definition(
42 &self,
43 collection: &str,
44 name: &str,
45 ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46 self.update_physical_metadata(|metadata| {
47 metadata
48 .tree_definitions
49 .iter()
50 .position(|definition| {
51 definition.collection == collection && definition.name == name
52 })
53 .map(|index| metadata.tree_definitions.remove(index))
54 })
55 }
56
57 pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58 self.collection_ttl_defaults_ms
59 .read()
60 .ok()
61 .and_then(|defaults| defaults.get(collection).copied())
62 }
63
64 pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66 defaults.insert(collection.into(), ttl_ms);
67 }
68 }
69
70 pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72 defaults.remove(collection);
73 }
74 }
75
76 pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77 self.contract_cache_map()
78 .values()
79 .map(|arc| (**arc).clone())
80 .collect()
81 }
82
83 pub fn collection_contract(
84 &self,
85 collection: &str,
86 ) -> Option<crate::physical::CollectionContract> {
87 self.contract_cache_map()
88 .get(collection)
89 .map(|arc| (**arc).clone())
90 }
91
92 pub fn collection_contract_arc(
97 &self,
98 collection: &str,
99 ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100 self.contract_cache_map()
101 .get(collection)
102 .map(std::sync::Arc::clone)
103 }
104
105 fn contract_cache_map(
110 &self,
111 ) -> std::sync::Arc<
112 std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113 > {
114 if let Ok(guard) = self.collection_contract_cache.read() {
115 if let Some(map) = guard.as_ref() {
116 return std::sync::Arc::clone(map);
117 }
118 }
119 let contracts: Vec<crate::physical::CollectionContract> = self
120 .physical_metadata()
121 .map(|metadata| metadata.collection_contracts)
122 .unwrap_or_default();
123 let map: std::collections::HashMap<_, _> = contracts
124 .into_iter()
125 .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126 .collect();
127 let arc = std::sync::Arc::new(map);
128 if let Ok(mut guard) = self.collection_contract_cache.write() {
129 *guard = Some(std::sync::Arc::clone(&arc));
130 }
131 arc
132 }
133
134 pub(crate) fn invalidate_collection_contract_cache(&self) {
135 if let Ok(mut guard) = self.collection_contract_cache.write() {
136 *guard = None;
137 }
138 }
139
140 pub fn save_collection_contract(
141 &self,
142 contract: crate::physical::CollectionContract,
143 ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145 if let Some(ttl_ms) = contract.default_ttl_ms {
146 defaults.insert(contract.name.clone(), ttl_ms);
147 } else {
148 defaults.remove(&contract.name);
149 }
150 }
151
152 self.store()
153 .context_index()
154 .set_collection_enabled(&contract.name, contract.context_index_enabled);
155 if let Some(manager) = self.store().get_collection(&contract.name) {
156 let columns = contract
157 .declared_columns
158 .iter()
159 .map(|column| column.name.clone())
160 .collect();
161 manager.set_column_schema_if_empty(columns);
162 }
163
164 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
165 && self.options.storage_profile.packaging
166 == crate::storage::StoragePackaging::SingleFile
167 {
168 if let Ok(mut guard) = self.collection_contract_cache.write() {
169 let mut contracts = guard
170 .as_ref()
171 .map(|existing| existing.as_ref().clone())
172 .unwrap_or_default();
173 contracts.insert(contract.name.clone(), std::sync::Arc::new(contract.clone()));
174 *guard = Some(std::sync::Arc::new(contracts));
175 }
176 return Ok(contract);
177 }
178
179 self.invalidate_collection_contract_cache();
180
181 let saved = self.update_physical_metadata(|metadata| {
182 if let Some(existing) = metadata
183 .collection_contracts
184 .iter_mut()
185 .find(|existing| existing.name == contract.name)
186 {
187 *existing = contract.clone();
188 } else {
189 metadata.collection_contracts.push(contract.clone());
190 }
191 metadata
192 .collection_contracts
193 .sort_by(|left, right| left.name.cmp(&right.name));
194
195 if let Some(ttl_ms) = contract.default_ttl_ms {
196 metadata
197 .collection_ttl_defaults_ms
198 .insert(contract.name.clone(), ttl_ms);
199 } else {
200 metadata.collection_ttl_defaults_ms.remove(&contract.name);
201 }
202
203 contract.clone()
204 })?;
205 self.invalidate_collection_contract_cache();
206 Ok(saved)
207 }
208
209 pub fn remove_collection_contract(
210 &self,
211 collection: &str,
212 ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
213 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
214 defaults.remove(collection);
215 }
216
217 self.store()
218 .context_index()
219 .set_collection_enabled(collection, false);
220
221 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
222 && self.options.storage_profile.packaging
223 == crate::storage::StoragePackaging::SingleFile
224 {
225 let mut removed = None;
226 if let Ok(mut guard) = self.collection_contract_cache.write() {
227 let mut contracts = guard
228 .as_ref()
229 .map(|existing| existing.as_ref().clone())
230 .unwrap_or_default();
231 removed = contracts
232 .remove(collection)
233 .map(|contract| (*contract).clone());
234 *guard = Some(std::sync::Arc::new(contracts));
235 }
236 return Ok(removed);
237 }
238
239 self.invalidate_collection_contract_cache();
240
241 let removed = self.update_physical_metadata(|metadata| {
242 let removed = metadata
243 .collection_contracts
244 .iter()
245 .position(|contract| contract.name == collection)
246 .map(|index| metadata.collection_contracts.remove(index));
247 metadata.collection_ttl_defaults_ms.remove(collection);
248 metadata
249 .indexes
250 .retain(|index| index.collection.as_deref() != Some(collection));
251 removed
252 })?;
253 self.invalidate_collection_contract_cache();
254 Ok(removed)
255 }
256
257 pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
258 self.collection_ttl_defaults_ms
259 .read()
260 .map(|defaults| {
261 defaults
262 .iter()
263 .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
264 .collect()
265 })
266 .unwrap_or_default()
267 }
268
269 pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
270 let metadata = self.physical_metadata();
271 let defaults = metadata
272 .as_ref()
273 .map(|m| m.collection_ttl_defaults_ms.clone())
274 .unwrap_or_default();
275
276 if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
277 current.clear();
278 current.extend(defaults);
279 }
280
281 if let Some(metadata) = metadata {
282 let store = self.store();
283 let index = store.context_index();
284 for contract in &metadata.collection_contracts {
285 index.set_collection_enabled(&contract.name, contract.context_index_enabled);
286 }
287 }
288 }
289
290 pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
298 let registry = self.hypertables();
299 if registry.is_empty() {
300 return Vec::new();
301 }
302 use std::collections::BTreeMap;
303 let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
304 BTreeMap::new();
305 for chunk in registry.snapshot_chunks() {
306 chunks_by_table
307 .entry(chunk.id.hypertable.clone())
308 .or_default()
309 .push(crate::physical::PhysicalHypertableChunk {
310 start_ns: chunk.id.start_ns,
311 end_ns_exclusive: chunk.end_ns_exclusive,
312 row_count: chunk.row_count,
313 min_ts_ns: chunk.min_ts_ns,
314 max_ts_ns: chunk.max_ts_ns,
315 sealed: chunk.sealed,
316 ttl_override_ns: chunk.ttl_override_ns,
317 columnar_page: chunk.columnar_page,
318 });
319 }
320 registry
321 .list()
322 .into_iter()
323 .map(|spec| crate::physical::PhysicalHypertable {
324 chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
325 name: spec.name,
326 time_column: spec.time_column,
327 chunk_interval_ns: spec.chunk_interval_ns,
328 default_ttl_ns: spec.default_ttl_ns,
329 })
330 .collect()
331 }
332
333 pub(crate) fn load_hypertables_from_metadata(&self) {
339 let Some(metadata) = self.physical_metadata() else {
340 return;
341 };
342 if metadata.hypertables.is_empty() {
343 return;
344 }
345 let registry = self.hypertables();
346 for hypertable in &metadata.hypertables {
347 let mut spec = crate::storage::timeseries::HypertableSpec::new(
348 hypertable.name.clone(),
349 hypertable.time_column.clone(),
350 hypertable.chunk_interval_ns,
351 );
352 if let Some(ttl) = hypertable.default_ttl_ns {
353 spec = spec.with_ttl_ns(ttl);
354 }
355 registry.register(spec);
356 for chunk in &hypertable.chunks {
357 registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
358 id: crate::storage::timeseries::ChunkId {
359 hypertable: hypertable.name.clone(),
360 start_ns: chunk.start_ns,
361 },
362 end_ns_exclusive: chunk.end_ns_exclusive,
363 row_count: chunk.row_count,
364 min_ts_ns: chunk.min_ts_ns,
365 max_ts_ns: chunk.max_ts_ns,
366 sealed: chunk.sealed,
367 ttl_override_ns: chunk.ttl_override_ns,
368 columnar_page: chunk.columnar_page,
369 });
370 }
371 }
372 }
373
374 pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
375 self.store.run_maintenance()?;
376 self.persist_metadata()?;
377 Ok(())
378 }
379
380 pub fn metadata_path(&self) -> Option<PathBuf> {
382 self.path
383 .as_ref()
384 .map(|path| PhysicalMetadataFile::metadata_path_for(path))
385 }
386
387 pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
389 self.load_or_bootstrap_physical_metadata(!self.options.read_only)
390 .ok()
391 }
392
393 pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
395 let indexes = self
396 .physical_metadata()
397 .map(|metadata| metadata.indexes)
398 .filter(|indexes| !indexes.is_empty())
399 .or_else(|| {
400 self.native_physical_state()
401 .map(|state| self.physical_index_state_from_native_state(&state, None))
402 })
403 .unwrap_or_else(|| self.physical_index_state());
404 self.reconcile_index_states_with_native_artifacts(indexes)
405 }
406
407 pub fn exports(&self) -> Vec<ExportDescriptor> {
409 self.physical_metadata()
410 .map(|metadata| metadata.exports)
411 .or_else(|| {
412 self.native_physical_state()
413 .map(|state| self.exports_from_native_state(&state))
414 })
415 .unwrap_or_default()
416 }
417
418 pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
420 if let Some(metadata) = self.physical_metadata() {
421 if !metadata.snapshots.is_empty() {
422 return metadata.snapshots;
423 }
424 }
425 self.native_physical_state()
426 .map(|state| self.snapshots_from_native_state(&state))
427 .unwrap_or_default()
428 }
429
430 pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
432 self.physical_metadata()
433 .map(|metadata| metadata.graph_projections)
434 .or_else(|| {
435 self.native_physical_state()
436 .map(|state| self.graph_projections_from_native_state(&state))
437 })
438 .unwrap_or_default()
439 }
440
441 pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
443 self.catalog_model_snapshot().declared_graph_projections
444 }
445
446 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
448 self.graph_projections()
449 .into_iter()
450 .filter(|projection| {
451 projection.last_materialized_sequence.is_some()
452 || matches!(projection.state.as_str(), "materialized" | "stale")
453 })
454 .collect()
455 }
456
457 pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
459 self.physical_metadata()
460 .map(|metadata| metadata.analytics_jobs)
461 .or_else(|| {
462 self.native_physical_state()
463 .map(|state| self.analytics_jobs_from_native_state(&state))
464 })
465 .unwrap_or_default()
466 }
467
468 pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
470 self.catalog_model_snapshot().declared_analytics_jobs
471 }
472
473 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
475 self.analytics_jobs()
476 .into_iter()
477 .filter(|job| {
478 job.last_run_sequence.is_some()
479 || matches!(
480 job.state.as_str(),
481 "running" | "completed" | "failed" | "queued" | "stale"
482 )
483 })
484 .collect()
485 }
486
487 pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
489 self.catalog_model_snapshot().declared_indexes
490 }
491
492 pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
494 self.catalog_model_snapshot().operational_indexes
495 }
496
497 pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
499 self.catalog_model_snapshot().index_statuses
500 }
501
502 pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
504 self.catalog_model_snapshot()
505 .index_statuses
506 .into_iter()
507 .find(|status| status.name == name)
508 }
509
510 pub fn save_graph_projection(
512 &self,
513 name: impl Into<String>,
514 node_labels: Vec<String>,
515 node_types: Vec<String>,
516 edge_labels: Vec<String>,
517 source: impl Into<String>,
518 ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
519 let name = name.into();
520 let source = source.into();
521 self.update_physical_metadata(|metadata| {
522 let now = SystemTime::now()
523 .duration_since(UNIX_EPOCH)
524 .unwrap_or_default()
525 .as_millis();
526 let projection = if let Some(existing) = metadata
527 .graph_projections
528 .iter_mut()
529 .find(|projection| projection.name == name)
530 {
531 existing.updated_at_unix_ms = now;
532 existing.state = "declared".to_string();
533 existing.source = source.clone();
534 existing.node_labels = node_labels.clone();
535 existing.node_types = node_types.clone();
536 existing.edge_labels = edge_labels.clone();
537 existing.last_materialized_sequence = None;
538 existing.clone()
539 } else {
540 let projection = PhysicalGraphProjection {
541 name: name.clone(),
542 created_at_unix_ms: now,
543 updated_at_unix_ms: now,
544 state: "declared".to_string(),
545 source: source.clone(),
546 node_labels: node_labels.clone(),
547 node_types: node_types.clone(),
548 edge_labels: edge_labels.clone(),
549 last_materialized_sequence: None,
550 };
551 metadata.graph_projections.push(projection.clone());
552 projection
553 };
554
555 Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
556
557 metadata
558 .graph_projections
559 .sort_by(|left, right| left.name.cmp(&right.name));
560 projection
561 })
562 }
563
564 pub fn materialize_graph_projection(
566 &self,
567 name: &str,
568 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
569 self.update_physical_metadata(|metadata| {
570 let now = SystemTime::now()
571 .duration_since(UNIX_EPOCH)
572 .unwrap_or_default()
573 .as_millis();
574 let idx = metadata
575 .graph_projections
576 .iter()
577 .position(|projection| projection.name == name);
578 if let Some(idx) = idx {
579 metadata.graph_projections[idx].updated_at_unix_ms = now;
580 metadata.graph_projections[idx].state = "materialized".to_string();
581 metadata.graph_projections[idx].last_materialized_sequence =
582 Some(metadata.superblock.sequence);
583 let result = metadata.graph_projections[idx].clone();
584 Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
585 return Some(result);
586 }
587 None
588 })
589 }
590
591 pub fn mark_graph_projection_materializing(
593 &self,
594 name: &str,
595 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
596 self.update_physical_metadata(|metadata| {
597 let now = SystemTime::now()
598 .duration_since(UNIX_EPOCH)
599 .unwrap_or_default()
600 .as_millis();
601 let idx = metadata
602 .graph_projections
603 .iter()
604 .position(|projection| projection.name == name);
605 if let Some(idx) = idx {
606 metadata.graph_projections[idx].updated_at_unix_ms = now;
607 metadata.graph_projections[idx].state = "materializing".to_string();
608 metadata.graph_projections[idx].last_materialized_sequence = None;
609 let result = metadata.graph_projections[idx].clone();
610 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
611 return Some(result);
612 }
613 None
614 })
615 }
616
617 pub fn fail_graph_projection(
619 &self,
620 name: &str,
621 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
622 self.update_physical_metadata(|metadata| {
623 let now = SystemTime::now()
624 .duration_since(UNIX_EPOCH)
625 .unwrap_or_default()
626 .as_millis();
627 let idx = metadata
628 .graph_projections
629 .iter()
630 .position(|projection| projection.name == name);
631 if let Some(idx) = idx {
632 metadata.graph_projections[idx].updated_at_unix_ms = now;
633 metadata.graph_projections[idx].state = "failed".to_string();
634 metadata.graph_projections[idx].last_materialized_sequence = None;
635 let result = metadata.graph_projections[idx].clone();
636 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
637 return Some(result);
638 }
639 None
640 })
641 }
642
643 pub fn mark_graph_projection_stale(
645 &self,
646 name: &str,
647 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
648 self.update_physical_metadata(|metadata| {
649 let now = SystemTime::now()
650 .duration_since(UNIX_EPOCH)
651 .unwrap_or_default()
652 .as_millis();
653 let idx = metadata
654 .graph_projections
655 .iter()
656 .position(|projection| projection.name == name);
657 if let Some(idx) = idx {
658 metadata.graph_projections[idx].updated_at_unix_ms = now;
659 metadata.graph_projections[idx].state = "stale".to_string();
660 let result = metadata.graph_projections[idx].clone();
661 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
662 return Some(result);
663 }
664 None
665 })
666 }
667
668 fn mark_projection_dependent_jobs_stale(
669 metadata: &mut PhysicalMetadataFile,
670 projection_name: &str,
671 now: u128,
672 ) {
673 for job in metadata.analytics_jobs.iter_mut() {
674 if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
675 job.state = "stale".to_string();
676 job.updated_at_unix_ms = now;
677 }
678 }
679 }
680
681 fn rearm_projection_dependent_jobs_declared(
682 metadata: &mut PhysicalMetadataFile,
683 projection_name: &str,
684 now: u128,
685 ) {
686 for job in metadata.analytics_jobs.iter_mut() {
687 if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
688 job.state = "declared".to_string();
689 job.last_run_sequence = None;
690 job.updated_at_unix_ms = now;
691 }
692 }
693 }
694
695 pub fn save_analytics_job(
697 &self,
698 kind: impl Into<String>,
699 projection: Option<String>,
700 metadata_entries: BTreeMap<String, String>,
701 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
702 let kind = kind.into();
703 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
704 if let Some(projection_name) = projection.as_deref() {
705 if !self.graph_projection_is_declared(projection_name) {
706 return Err(format!(
707 "graph projection '{projection_name}' is not declared in physical metadata"
708 )
709 .into());
710 }
711 }
712
713 self.update_physical_metadata(|metadata| {
714 let now = SystemTime::now()
715 .duration_since(UNIX_EPOCH)
716 .unwrap_or_default()
717 .as_millis();
718
719 let job = if let Some(existing) = metadata
720 .analytics_jobs
721 .iter_mut()
722 .find(|job| job.id == job_id)
723 {
724 existing.kind = kind.clone();
725 existing.projection = projection.clone();
726 existing.updated_at_unix_ms = now;
727 existing.metadata = metadata_entries.clone();
728 if existing.last_run_sequence.is_none() {
729 existing.state = "declared".to_string();
730 }
731 existing.clone()
732 } else {
733 let job = PhysicalAnalyticsJob {
734 id: job_id.clone(),
735 kind: kind.clone(),
736 state: "declared".to_string(),
737 projection: projection.clone(),
738 created_at_unix_ms: now,
739 updated_at_unix_ms: now,
740 last_run_sequence: None,
741 metadata: metadata_entries.clone(),
742 };
743 metadata.analytics_jobs.push(job.clone());
744 job
745 };
746
747 metadata
748 .analytics_jobs
749 .sort_by(|left, right| left.id.cmp(&right.id));
750 job
751 })
752 }
753
754 pub fn record_analytics_job(
756 &self,
757 kind: impl Into<String>,
758 projection: Option<String>,
759 metadata_entries: BTreeMap<String, String>,
760 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
761 let kind = kind.into();
762 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
763 if let Some(projection_name) = projection.as_deref() {
764 if !self.graph_projection_is_declared(projection_name) {
765 return Err(format!(
766 "graph projection '{projection_name}' is not declared in physical metadata"
767 )
768 .into());
769 }
770 if !self.graph_projection_is_operational(projection_name) {
771 return Err(format!(
772 "graph projection '{projection_name}' is declared but not operationally materialized"
773 )
774 .into());
775 }
776 }
777
778 self.update_physical_metadata(|metadata| {
779 let now = SystemTime::now()
780 .duration_since(UNIX_EPOCH)
781 .unwrap_or_default()
782 .as_millis();
783
784 let existing = metadata
785 .analytics_jobs
786 .iter_mut()
787 .find(|job| job.id == job_id)?;
788 existing.state = "completed".to_string();
789 existing.kind = kind.clone();
790 existing.projection = projection.clone();
791 existing.updated_at_unix_ms = now;
792 existing.last_run_sequence = Some(metadata.superblock.sequence);
793 existing.metadata = metadata_entries.clone();
794 let job = existing.clone();
795
796 metadata
797 .analytics_jobs
798 .sort_by(|left, right| left.id.cmp(&right.id));
799 Some(job)
800 })
801 .and_then(|job| {
802 job.ok_or_else(|| {
803 format!("analytics job '{job_id}' is not declared in physical metadata").into()
804 })
805 })
806 }
807
808 pub fn queue_analytics_job(
810 &self,
811 kind: impl Into<String>,
812 projection: Option<String>,
813 metadata_entries: BTreeMap<String, String>,
814 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
815 let kind = kind.into();
816 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
817 if let Some(projection_name) = projection.as_deref() {
818 if !self.graph_projection_is_declared(projection_name) {
819 return Err(format!(
820 "graph projection '{projection_name}' is not declared in physical metadata"
821 )
822 .into());
823 }
824 if !self.graph_projection_is_operational(projection_name) {
825 return Err(format!(
826 "graph projection '{projection_name}' is declared but not operationally materialized"
827 )
828 .into());
829 }
830 }
831
832 self.update_physical_metadata(|metadata| {
833 let now = SystemTime::now()
834 .duration_since(UNIX_EPOCH)
835 .unwrap_or_default()
836 .as_millis();
837
838 let existing = metadata
839 .analytics_jobs
840 .iter_mut()
841 .find(|job| job.id == job_id)?;
842 existing.state = "queued".to_string();
843 existing.kind = kind.clone();
844 existing.projection = projection.clone();
845 existing.updated_at_unix_ms = now;
846 existing.metadata = metadata_entries.clone();
847 let job = existing.clone();
848
849 metadata
850 .analytics_jobs
851 .sort_by(|left, right| left.id.cmp(&right.id));
852 Some(job)
853 })
854 .and_then(|job| {
855 job.ok_or_else(|| {
856 format!("analytics job '{job_id}' is not declared in physical metadata").into()
857 })
858 })
859 }
860
861 pub fn start_analytics_job(
863 &self,
864 kind: impl Into<String>,
865 projection: Option<String>,
866 metadata_entries: BTreeMap<String, String>,
867 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
868 let kind = kind.into();
869 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
870 if let Some(projection_name) = projection.as_deref() {
871 if !self.graph_projection_is_declared(projection_name) {
872 return Err(format!(
873 "graph projection '{projection_name}' is not declared in physical metadata"
874 )
875 .into());
876 }
877 if !self.graph_projection_is_operational(projection_name) {
878 return Err(format!(
879 "graph projection '{projection_name}' is declared but not operationally materialized"
880 )
881 .into());
882 }
883 }
884
885 self.update_physical_metadata(|metadata| {
886 let now = SystemTime::now()
887 .duration_since(UNIX_EPOCH)
888 .unwrap_or_default()
889 .as_millis();
890
891 let existing = metadata
892 .analytics_jobs
893 .iter_mut()
894 .find(|job| job.id == job_id)?;
895 existing.state = "running".to_string();
896 existing.kind = kind.clone();
897 existing.projection = projection.clone();
898 existing.updated_at_unix_ms = now;
899 existing.metadata = metadata_entries.clone();
900 let job = existing.clone();
901
902 metadata
903 .analytics_jobs
904 .sort_by(|left, right| left.id.cmp(&right.id));
905 Some(job)
906 })
907 .and_then(|job| {
908 job.ok_or_else(|| {
909 format!("analytics job '{job_id}' is not declared in physical metadata").into()
910 })
911 })
912 }
913
914 pub fn fail_analytics_job(
916 &self,
917 kind: impl Into<String>,
918 projection: Option<String>,
919 metadata_entries: BTreeMap<String, String>,
920 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
921 let kind = kind.into();
922 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
923
924 self.update_physical_metadata(|metadata| {
925 let now = SystemTime::now()
926 .duration_since(UNIX_EPOCH)
927 .unwrap_or_default()
928 .as_millis();
929
930 let existing = metadata
931 .analytics_jobs
932 .iter_mut()
933 .find(|job| job.id == job_id)?;
934 existing.state = "failed".to_string();
935 existing.kind = kind.clone();
936 existing.projection = projection.clone();
937 existing.updated_at_unix_ms = now;
938 existing.metadata = metadata_entries.clone();
939 let job = existing.clone();
940
941 metadata
942 .analytics_jobs
943 .sort_by(|left, right| left.id.cmp(&right.id));
944 Some(job)
945 })
946 .and_then(|job| {
947 job.ok_or_else(|| {
948 format!("analytics job '{job_id}' is not declared in physical metadata").into()
949 })
950 })
951 }
952
953 pub fn mark_analytics_job_stale(
955 &self,
956 kind: impl Into<String>,
957 projection: Option<String>,
958 metadata_entries: BTreeMap<String, String>,
959 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
960 let kind = kind.into();
961 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
962
963 self.update_physical_metadata(|metadata| {
964 let now = SystemTime::now()
965 .duration_since(UNIX_EPOCH)
966 .unwrap_or_default()
967 .as_millis();
968
969 let existing = metadata
970 .analytics_jobs
971 .iter_mut()
972 .find(|job| job.id == job_id)?;
973 existing.state = "stale".to_string();
974 existing.kind = kind.clone();
975 existing.projection = projection.clone();
976 existing.updated_at_unix_ms = now;
977 existing.metadata = metadata_entries.clone();
978 let job = existing.clone();
979
980 metadata
981 .analytics_jobs
982 .sort_by(|left, right| left.id.cmp(&right.id));
983 Some(job)
984 })
985 .and_then(|job| {
986 job.ok_or_else(|| {
987 format!("analytics job '{job_id}' is not declared in physical metadata").into()
988 })
989 })
990 }
991
992 pub fn create_named_export(
994 &self,
995 name: impl Into<String>,
996 ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
997 let name = name.into();
998 if self.options.mode != StorageMode::Persistent {
999 return Err("exports require persistent mode".into());
1000 }
1001 let Some(path) = self.path() else {
1002 return Err("database path is not available".into());
1003 };
1004
1005 self.flush()?;
1006
1007 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1008 let export_data_path = reddb_file::copy_physical_export_data_file(path, &name)?;
1009 let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
1010 let export_metadata_binary_path =
1011 PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
1012
1013 let descriptor = ExportDescriptor {
1014 name: name.clone(),
1015 created_at_unix_ms: SystemTime::now()
1016 .duration_since(UNIX_EPOCH)
1017 .unwrap_or_default()
1018 .as_millis(),
1019 snapshot_id: metadata
1020 .snapshots
1021 .last()
1022 .map(|snapshot| snapshot.snapshot_id),
1023 superblock_sequence: metadata.superblock.sequence,
1024 data_path: export_data_path.display().to_string(),
1025 metadata_path: export_metadata_path.display().to_string(),
1026 collection_count: metadata.catalog.total_collections,
1027 total_entities: metadata.catalog.total_entities,
1028 };
1029
1030 metadata
1031 .exports
1032 .retain(|export| export.name != descriptor.name);
1033 metadata.exports.push(descriptor.clone());
1034 self.prune_export_registry(&mut metadata.exports);
1035 metadata.save_for_data_path(path)?;
1036 metadata.save_to_binary_path(&export_metadata_binary_path)?;
1037 metadata.save_to_path(&export_metadata_path)?;
1038
1039 Ok(descriptor)
1040 }
1041
1042 pub fn set_index_enabled(
1044 &self,
1045 name: &str,
1046 enabled: bool,
1047 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1048 let Some(status) = self.index_status(name) else {
1049 return Err(format!("index '{name}' is not present in catalog status").into());
1050 };
1051 if !status.declared {
1052 return Err(format!("index '{name}' is not declared in physical metadata").into());
1053 }
1054 self.update_physical_metadata(|metadata| {
1055 let now = SystemTime::now()
1056 .duration_since(UNIX_EPOCH)
1057 .unwrap_or_default()
1058 .as_millis();
1059 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1060 index.enabled = enabled;
1061 if !enabled {
1062 index.build_state = "disabled".to_string();
1063 } else if index.build_state == "disabled" {
1064 index.build_state = if index.artifact_root_page.is_some() {
1065 "ready".to_string()
1066 } else {
1067 "declared-unbuilt".to_string()
1068 };
1069 }
1070 index.last_refresh_ms = Some(now);
1071 return Some(index.clone());
1072 }
1073 None
1074 })
1075 }
1076
1077 pub fn mark_index_building(
1079 &self,
1080 name: &str,
1081 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1082 let Some(status) = self.index_status(name) else {
1083 return Err(format!("index '{name}' is not present in catalog status").into());
1084 };
1085 if !status.declared {
1086 return Err(format!("index '{name}' is not declared in physical metadata").into());
1087 }
1088 if status.lifecycle_state == "disabled" {
1089 return Err(format!("index '{name}' is disabled").into());
1090 }
1091 self.update_physical_metadata(|metadata| {
1092 let now = SystemTime::now()
1093 .duration_since(UNIX_EPOCH)
1094 .unwrap_or_default()
1095 .as_millis();
1096 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1097 index.build_state = "building".to_string();
1098 index.last_refresh_ms = Some(now);
1099 return Some(index.clone());
1100 }
1101 None
1102 })
1103 }
1104
1105 pub fn fail_index(
1107 &self,
1108 name: &str,
1109 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1110 let Some(status) = self.index_status(name) else {
1111 return Err(format!("index '{name}' is not present in catalog status").into());
1112 };
1113 if !status.declared {
1114 return Err(format!("index '{name}' is not declared in physical metadata").into());
1115 }
1116 self.update_physical_metadata(|metadata| {
1117 let now = SystemTime::now()
1118 .duration_since(UNIX_EPOCH)
1119 .unwrap_or_default()
1120 .as_millis();
1121 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1122 index.build_state = "failed".to_string();
1123 index.last_refresh_ms = Some(now);
1124 return Some(index.clone());
1125 }
1126 None
1127 })
1128 }
1129
1130 pub fn mark_index_stale(
1132 &self,
1133 name: &str,
1134 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1135 let Some(status) = self.index_status(name) else {
1136 return Err(format!("index '{name}' is not present in catalog status").into());
1137 };
1138 if !status.declared {
1139 return Err(format!("index '{name}' is not declared in physical metadata").into());
1140 }
1141 self.update_physical_metadata(|metadata| {
1142 let now = SystemTime::now()
1143 .duration_since(UNIX_EPOCH)
1144 .unwrap_or_default()
1145 .as_millis();
1146 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1147 index.build_state = if index.enabled {
1148 "stale".to_string()
1149 } else {
1150 "disabled".to_string()
1151 };
1152 index.last_refresh_ms = Some(now);
1153 return Some(index.clone());
1154 }
1155 None
1156 })
1157 }
1158
1159 pub fn mark_index_ready(
1161 &self,
1162 name: &str,
1163 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1164 self.warmup_index(name)
1165 }
1166
1167 pub fn warmup_index(
1169 &self,
1170 name: &str,
1171 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1172 let Some(status) = self.index_status(name) else {
1173 return Err(format!("index '{name}' is not present in catalog status").into());
1174 };
1175 if !status.declared {
1176 return Err(format!("index '{name}' is not declared in physical metadata").into());
1177 }
1178 if status.lifecycle_state == "disabled" {
1179 return Err(format!("index '{name}' is disabled").into());
1180 }
1181 if !status.operational {
1182 return Err(
1183 format!("index '{name}' is declared but not operationally materialized").into(),
1184 );
1185 }
1186 let warmed_artifact = self
1187 .physical_indexes()
1188 .into_iter()
1189 .find(|index| index.name == name)
1190 .map(|mut index| {
1191 self.warmup_native_vector_artifact_for_index(&index)?;
1192 self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1193 Ok::<_, String>(index)
1194 })
1195 .transpose()
1196 .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1197
1198 self.update_physical_metadata(|metadata| {
1199 let now = SystemTime::now()
1200 .duration_since(UNIX_EPOCH)
1201 .unwrap_or_default()
1202 .as_millis();
1203 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1204 if let Some(warmed) = warmed_artifact.as_ref() {
1205 index.entries = warmed.entries;
1206 index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1207 index.backend = warmed.backend.clone();
1208 index.build_state = "ready".to_string();
1209 }
1210 index.last_refresh_ms = Some(now);
1211 return Some(index.clone());
1212 }
1213 None
1214 })
1215 }
1216
1217 pub fn rebuild_index_registry(
1219 &self,
1220 collection: Option<&str>,
1221 ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1222 let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1223 self.update_physical_metadata(|metadata| {
1224 let now = SystemTime::now()
1225 .duration_since(UNIX_EPOCH)
1226 .unwrap_or_default()
1227 .as_millis();
1228
1229 let mut affected = Vec::new();
1230 let declared = metadata.indexes.clone();
1231 for declared_index in declared {
1232 let matches_collection = collection.is_none_or(|collection_name| {
1233 declared_index.collection.as_deref() == Some(collection_name)
1234 });
1235 if !matches_collection {
1236 continue;
1237 }
1238
1239 let mut rebuilt = fresh
1240 .iter()
1241 .find(|index| index.name == declared_index.name)
1242 .cloned()
1243 .unwrap_or_else(|| {
1244 let mut index = declared_index.clone();
1245 index.build_state = "declared-unbuilt".to_string();
1246 index
1247 });
1248 rebuilt.enabled = declared_index.enabled;
1249 rebuilt.artifact_kind = rebuilt
1250 .artifact_kind
1251 .or_else(|| declared_index.artifact_kind.clone());
1252 rebuilt.artifact_root_page = rebuilt
1253 .artifact_root_page
1254 .or(declared_index.artifact_root_page);
1255 rebuilt.artifact_checksum = rebuilt
1256 .artifact_checksum
1257 .or(declared_index.artifact_checksum);
1258 rebuilt.build_state =
1259 Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1260 rebuilt.last_refresh_ms = Some(now);
1261
1262 if let Some(existing) = metadata
1263 .indexes
1264 .iter_mut()
1265 .find(|index| index.name == rebuilt.name)
1266 {
1267 *existing = rebuilt.clone();
1268 } else {
1269 metadata.indexes.push(rebuilt.clone());
1270 }
1271
1272 affected.push(rebuilt);
1273 }
1274
1275 affected
1276 })
1277 }
1278
1279 fn finalize_rebuilt_index_build_state(
1280 declared: &PhysicalIndexState,
1281 rebuilt: &PhysicalIndexState,
1282 ) -> String {
1283 if !rebuilt.enabled {
1284 return "disabled".to_string();
1285 }
1286
1287 if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1288 return "failed".to_string();
1289 }
1290
1291 let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1292 if native_artifact_family {
1293 if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1294 return "ready".to_string();
1295 }
1296 if declared.artifact_root_page.is_some()
1297 || declared.artifact_checksum.is_some()
1298 || declared.artifact_kind.is_some()
1299 {
1300 return "stale".to_string();
1301 }
1302 return "declared-unbuilt".to_string();
1303 }
1304
1305 if rebuilt.entries > 0 {
1306 return "ready".to_string();
1307 }
1308
1309 if matches!(
1310 declared.build_state.as_str(),
1311 "stale" | "artifact-published" | "registry-loaded"
1312 ) {
1313 return "stale".to_string();
1314 }
1315
1316 "declared-unbuilt".to_string()
1317 }
1318}