1use super::*;
2
3impl RedDB {
4 pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5 self.physical_metadata()
6 .map(|metadata| metadata.tree_definitions)
7 .unwrap_or_default()
8 }
9
10 pub fn tree_definition(
11 &self,
12 collection: &str,
13 name: &str,
14 ) -> Option<crate::physical::PhysicalTreeDefinition> {
15 self.tree_definitions()
16 .into_iter()
17 .find(|definition| definition.collection == collection && definition.name == name)
18 }
19
20 pub fn save_tree_definition(
21 &self,
22 definition: crate::physical::PhysicalTreeDefinition,
23 ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24 self.update_physical_metadata(|metadata| {
25 if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26 existing.collection == definition.collection && existing.name == definition.name
27 }) {
28 *existing = definition.clone();
29 } else {
30 metadata.tree_definitions.push(definition.clone());
31 }
32 metadata.tree_definitions.sort_by(|left, right| {
33 left.collection
34 .cmp(&right.collection)
35 .then_with(|| left.name.cmp(&right.name))
36 });
37 definition.clone()
38 })
39 }
40
41 pub fn remove_tree_definition(
42 &self,
43 collection: &str,
44 name: &str,
45 ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46 self.update_physical_metadata(|metadata| {
47 metadata
48 .tree_definitions
49 .iter()
50 .position(|definition| {
51 definition.collection == collection && definition.name == name
52 })
53 .map(|index| metadata.tree_definitions.remove(index))
54 })
55 }
56
57 pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58 self.collection_ttl_defaults_ms
59 .read()
60 .ok()
61 .and_then(|defaults| defaults.get(collection).copied())
62 }
63
64 pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66 defaults.insert(collection.into(), ttl_ms);
67 }
68 }
69
70 pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72 defaults.remove(collection);
73 }
74 }
75
76 pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77 self.contract_cache_map()
78 .values()
79 .map(|arc| (**arc).clone())
80 .collect()
81 }
82
83 pub fn collection_contract(
84 &self,
85 collection: &str,
86 ) -> Option<crate::physical::CollectionContract> {
87 self.contract_cache_map()
88 .get(collection)
89 .map(|arc| (**arc).clone())
90 }
91
92 pub fn collection_contract_arc(
97 &self,
98 collection: &str,
99 ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100 self.contract_cache_map()
101 .get(collection)
102 .map(std::sync::Arc::clone)
103 }
104
105 fn contract_cache_map(
110 &self,
111 ) -> std::sync::Arc<
112 std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113 > {
114 if let Ok(guard) = self.collection_contract_cache.read() {
115 if let Some(map) = guard.as_ref() {
116 return std::sync::Arc::clone(map);
117 }
118 }
119 let contracts: Vec<crate::physical::CollectionContract> = self
120 .physical_metadata()
121 .map(|metadata| metadata.collection_contracts)
122 .unwrap_or_default();
123 let map: std::collections::HashMap<_, _> = contracts
124 .into_iter()
125 .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126 .collect();
127 let arc = std::sync::Arc::new(map);
128 if let Ok(mut guard) = self.collection_contract_cache.write() {
129 *guard = Some(std::sync::Arc::clone(&arc));
130 }
131 arc
132 }
133
134 pub(crate) fn invalidate_collection_contract_cache(&self) {
135 if let Ok(mut guard) = self.collection_contract_cache.write() {
136 *guard = None;
137 }
138 }
139
140 pub(crate) fn sync_single_file_contract_blob(&self) {
148 let contracts = self.collection_contracts();
149 let bytes = crate::physical::serialize_collection_contracts(&contracts);
150 self.store().set_aux_metadata(bytes);
151 }
152
153 pub(crate) fn seed_contract_cache_from_store_aux(&self) {
158 let bytes = self.store().aux_metadata();
159 if bytes.is_empty() {
160 return;
161 }
162 let contracts = match crate::physical::deserialize_collection_contracts(&bytes) {
163 Ok(contracts) => contracts,
164 Err(_) => return,
165 };
166 let map: std::collections::HashMap<_, _> = contracts
167 .into_iter()
168 .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
169 .collect();
170 let store = self.store();
174 let index = store.context_index();
175 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
176 for contract in map.values() {
177 index.set_collection_enabled(&contract.name, contract.context_index_enabled);
178 if let Some(ttl_ms) = contract.default_ttl_ms {
179 defaults.insert(contract.name.clone(), ttl_ms);
180 }
181 }
182 }
183 if let Ok(mut guard) = self.collection_contract_cache.write() {
184 *guard = Some(std::sync::Arc::new(map));
185 }
186 }
187
188 pub fn save_collection_contract(
189 &self,
190 contract: crate::physical::CollectionContract,
191 ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
192 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
193 if let Some(ttl_ms) = contract.default_ttl_ms {
194 defaults.insert(contract.name.clone(), ttl_ms);
195 } else {
196 defaults.remove(&contract.name);
197 }
198 }
199
200 self.store()
201 .context_index()
202 .set_collection_enabled(&contract.name, contract.context_index_enabled);
203 if let Some(manager) = self.store().get_collection(&contract.name) {
204 let columns = contract
205 .declared_columns
206 .iter()
207 .map(|column| column.name.clone())
208 .collect();
209 manager.set_column_schema_if_empty(columns);
210 }
211
212 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
213 && self.options.storage_profile.packaging
214 == crate::storage::StoragePackaging::SingleFile
215 {
216 if let Ok(mut guard) = self.collection_contract_cache.write() {
217 let mut contracts = guard
218 .as_ref()
219 .map(|existing| existing.as_ref().clone())
220 .unwrap_or_default();
221 contracts.insert(contract.name.clone(), std::sync::Arc::new(contract.clone()));
222 *guard = Some(std::sync::Arc::new(contracts));
223 }
224 self.sync_single_file_contract_blob();
225 return Ok(contract);
226 }
227
228 self.invalidate_collection_contract_cache();
229
230 let saved = self.update_physical_metadata(|metadata| {
231 if let Some(existing) = metadata
232 .collection_contracts
233 .iter_mut()
234 .find(|existing| existing.name == contract.name)
235 {
236 *existing = contract.clone();
237 } else {
238 metadata.collection_contracts.push(contract.clone());
239 }
240 metadata
241 .collection_contracts
242 .sort_by(|left, right| left.name.cmp(&right.name));
243
244 if let Some(ttl_ms) = contract.default_ttl_ms {
245 metadata
246 .collection_ttl_defaults_ms
247 .insert(contract.name.clone(), ttl_ms);
248 } else {
249 metadata.collection_ttl_defaults_ms.remove(&contract.name);
250 }
251
252 contract.clone()
253 })?;
254 self.invalidate_collection_contract_cache();
255 Ok(saved)
256 }
257
258 pub fn remove_collection_contract(
259 &self,
260 collection: &str,
261 ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
262 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
263 defaults.remove(collection);
264 }
265
266 self.store()
267 .context_index()
268 .set_collection_enabled(collection, false);
269
270 if self.options.storage_profile.deploy_profile == crate::storage::DeployProfile::Embedded
271 && self.options.storage_profile.packaging
272 == crate::storage::StoragePackaging::SingleFile
273 {
274 let mut removed = None;
275 if let Ok(mut guard) = self.collection_contract_cache.write() {
276 let mut contracts = guard
277 .as_ref()
278 .map(|existing| existing.as_ref().clone())
279 .unwrap_or_default();
280 removed = contracts
281 .remove(collection)
282 .map(|contract| (*contract).clone());
283 *guard = Some(std::sync::Arc::new(contracts));
284 }
285 self.sync_single_file_contract_blob();
286 return Ok(removed);
287 }
288
289 self.invalidate_collection_contract_cache();
290
291 let removed = self.update_physical_metadata(|metadata| {
292 let removed = metadata
293 .collection_contracts
294 .iter()
295 .position(|contract| contract.name == collection)
296 .map(|index| metadata.collection_contracts.remove(index));
297 metadata.collection_ttl_defaults_ms.remove(collection);
298 metadata
299 .indexes
300 .retain(|index| index.collection.as_deref() != Some(collection));
301 removed
302 })?;
303 self.invalidate_collection_contract_cache();
304 Ok(removed)
305 }
306
307 pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
308 self.collection_ttl_defaults_ms
309 .read()
310 .map(|defaults| {
311 defaults
312 .iter()
313 .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
314 .collect()
315 })
316 .unwrap_or_default()
317 }
318
319 pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
320 let metadata = self.physical_metadata();
321 let defaults = metadata
322 .as_ref()
323 .map(|m| m.collection_ttl_defaults_ms.clone())
324 .unwrap_or_default();
325
326 if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
327 current.clear();
328 current.extend(defaults);
329 }
330
331 if let Some(metadata) = metadata {
332 let store = self.store();
333 let index = store.context_index();
334 for contract in &metadata.collection_contracts {
335 index.set_collection_enabled(&contract.name, contract.context_index_enabled);
336 }
337 }
338 }
339
340 pub(crate) fn hypertable_registry_snapshot(&self) -> Vec<crate::physical::PhysicalHypertable> {
348 let registry = self.hypertables();
349 if registry.is_empty() {
350 return Vec::new();
351 }
352 use std::collections::BTreeMap;
353 let mut chunks_by_table: BTreeMap<String, Vec<crate::physical::PhysicalHypertableChunk>> =
354 BTreeMap::new();
355 for chunk in registry.snapshot_chunks() {
356 chunks_by_table
357 .entry(chunk.id.hypertable.clone())
358 .or_default()
359 .push(crate::physical::PhysicalHypertableChunk {
360 start_ns: chunk.id.start_ns,
361 end_ns_exclusive: chunk.end_ns_exclusive,
362 row_count: chunk.row_count,
363 min_ts_ns: chunk.min_ts_ns,
364 max_ts_ns: chunk.max_ts_ns,
365 sealed: chunk.sealed,
366 ttl_override_ns: chunk.ttl_override_ns,
367 columnar_page: chunk.columnar_page,
368 });
369 }
370 registry
371 .list()
372 .into_iter()
373 .map(|spec| crate::physical::PhysicalHypertable {
374 chunks: chunks_by_table.remove(&spec.name).unwrap_or_default(),
375 name: spec.name,
376 time_column: spec.time_column,
377 chunk_interval_ns: spec.chunk_interval_ns,
378 default_ttl_ns: spec.default_ttl_ns,
379 })
380 .collect()
381 }
382
383 pub(crate) fn load_hypertables_from_metadata(&self) {
389 let Some(metadata) = self.physical_metadata() else {
390 return;
391 };
392 if metadata.hypertables.is_empty() {
393 return;
394 }
395 let registry = self.hypertables();
396 for hypertable in &metadata.hypertables {
397 let mut spec = crate::storage::timeseries::HypertableSpec::new(
398 hypertable.name.clone(),
399 hypertable.time_column.clone(),
400 hypertable.chunk_interval_ns,
401 );
402 if let Some(ttl) = hypertable.default_ttl_ns {
403 spec = spec.with_ttl_ns(ttl);
404 }
405 registry.register(spec);
406 for chunk in &hypertable.chunks {
407 registry.restore_chunk(crate::storage::timeseries::ChunkMeta {
408 id: crate::storage::timeseries::ChunkId {
409 hypertable: hypertable.name.clone(),
410 start_ns: chunk.start_ns,
411 },
412 end_ns_exclusive: chunk.end_ns_exclusive,
413 row_count: chunk.row_count,
414 min_ts_ns: chunk.min_ts_ns,
415 max_ts_ns: chunk.max_ts_ns,
416 sealed: chunk.sealed,
417 ttl_override_ns: chunk.ttl_override_ns,
418 columnar_page: chunk.columnar_page,
419 });
420 }
421 }
422 }
423
424 pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
425 self.store.run_maintenance()?;
426 self.persist_metadata()?;
427 Ok(())
428 }
429
430 pub fn metadata_path(&self) -> Option<PathBuf> {
432 self.path
433 .as_ref()
434 .map(|path| PhysicalMetadataFile::metadata_path_for(path))
435 }
436
437 pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
439 self.load_or_bootstrap_physical_metadata(!self.options.read_only)
440 .ok()
441 }
442
443 pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
445 let indexes = self
446 .physical_metadata()
447 .map(|metadata| metadata.indexes)
448 .filter(|indexes| !indexes.is_empty())
449 .or_else(|| {
450 self.native_physical_state()
451 .map(|state| self.physical_index_state_from_native_state(&state, None))
452 })
453 .unwrap_or_else(|| self.physical_index_state());
454 self.reconcile_index_states_with_native_artifacts(indexes)
455 }
456
457 pub fn exports(&self) -> Vec<ExportDescriptor> {
459 self.physical_metadata()
460 .map(|metadata| metadata.exports)
461 .or_else(|| {
462 self.native_physical_state()
463 .map(|state| self.exports_from_native_state(&state))
464 })
465 .unwrap_or_default()
466 }
467
468 pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
470 if let Some(metadata) = self.physical_metadata() {
471 if !metadata.snapshots.is_empty() {
472 return metadata.snapshots;
473 }
474 }
475 self.native_physical_state()
476 .map(|state| self.snapshots_from_native_state(&state))
477 .unwrap_or_default()
478 }
479
480 pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
482 self.physical_metadata()
483 .map(|metadata| metadata.graph_projections)
484 .or_else(|| {
485 self.native_physical_state()
486 .map(|state| self.graph_projections_from_native_state(&state))
487 })
488 .unwrap_or_default()
489 }
490
491 pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
493 self.catalog_model_snapshot().declared_graph_projections
494 }
495
496 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
498 self.graph_projections()
499 .into_iter()
500 .filter(|projection| {
501 projection.last_materialized_sequence.is_some()
502 || matches!(projection.state.as_str(), "materialized" | "stale")
503 })
504 .collect()
505 }
506
507 pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
509 self.physical_metadata()
510 .map(|metadata| metadata.analytics_jobs)
511 .or_else(|| {
512 self.native_physical_state()
513 .map(|state| self.analytics_jobs_from_native_state(&state))
514 })
515 .unwrap_or_default()
516 }
517
518 pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
520 self.catalog_model_snapshot().declared_analytics_jobs
521 }
522
523 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
525 self.analytics_jobs()
526 .into_iter()
527 .filter(|job| {
528 job.last_run_sequence.is_some()
529 || matches!(
530 job.state.as_str(),
531 "running" | "completed" | "failed" | "queued" | "stale"
532 )
533 })
534 .collect()
535 }
536
537 pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
539 self.catalog_model_snapshot().declared_indexes
540 }
541
542 pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
544 self.catalog_model_snapshot().operational_indexes
545 }
546
547 pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
549 self.catalog_model_snapshot().index_statuses
550 }
551
552 pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
554 self.catalog_model_snapshot()
555 .index_statuses
556 .into_iter()
557 .find(|status| status.name == name)
558 }
559
560 pub fn save_graph_projection(
562 &self,
563 name: impl Into<String>,
564 node_labels: Vec<String>,
565 node_types: Vec<String>,
566 edge_labels: Vec<String>,
567 source: impl Into<String>,
568 ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
569 let name = name.into();
570 let source = source.into();
571 self.update_physical_metadata(|metadata| {
572 let now = SystemTime::now()
573 .duration_since(UNIX_EPOCH)
574 .unwrap_or_default()
575 .as_millis();
576 let projection = if let Some(existing) = metadata
577 .graph_projections
578 .iter_mut()
579 .find(|projection| projection.name == name)
580 {
581 existing.updated_at_unix_ms = now;
582 existing.state = "declared".to_string();
583 existing.source = source.clone();
584 existing.node_labels = node_labels.clone();
585 existing.node_types = node_types.clone();
586 existing.edge_labels = edge_labels.clone();
587 existing.last_materialized_sequence = None;
588 existing.clone()
589 } else {
590 let projection = PhysicalGraphProjection {
591 name: name.clone(),
592 created_at_unix_ms: now,
593 updated_at_unix_ms: now,
594 state: "declared".to_string(),
595 source: source.clone(),
596 node_labels: node_labels.clone(),
597 node_types: node_types.clone(),
598 edge_labels: edge_labels.clone(),
599 last_materialized_sequence: None,
600 };
601 metadata.graph_projections.push(projection.clone());
602 projection
603 };
604
605 Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
606
607 metadata
608 .graph_projections
609 .sort_by(|left, right| left.name.cmp(&right.name));
610 projection
611 })
612 }
613
614 pub fn materialize_graph_projection(
616 &self,
617 name: &str,
618 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
619 self.update_physical_metadata(|metadata| {
620 let now = SystemTime::now()
621 .duration_since(UNIX_EPOCH)
622 .unwrap_or_default()
623 .as_millis();
624 let idx = metadata
625 .graph_projections
626 .iter()
627 .position(|projection| projection.name == name);
628 if let Some(idx) = idx {
629 metadata.graph_projections[idx].updated_at_unix_ms = now;
630 metadata.graph_projections[idx].state = "materialized".to_string();
631 metadata.graph_projections[idx].last_materialized_sequence =
632 Some(metadata.superblock.sequence);
633 let result = metadata.graph_projections[idx].clone();
634 Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
635 return Some(result);
636 }
637 None
638 })
639 }
640
641 pub fn mark_graph_projection_materializing(
643 &self,
644 name: &str,
645 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
646 self.update_physical_metadata(|metadata| {
647 let now = SystemTime::now()
648 .duration_since(UNIX_EPOCH)
649 .unwrap_or_default()
650 .as_millis();
651 let idx = metadata
652 .graph_projections
653 .iter()
654 .position(|projection| projection.name == name);
655 if let Some(idx) = idx {
656 metadata.graph_projections[idx].updated_at_unix_ms = now;
657 metadata.graph_projections[idx].state = "materializing".to_string();
658 metadata.graph_projections[idx].last_materialized_sequence = None;
659 let result = metadata.graph_projections[idx].clone();
660 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
661 return Some(result);
662 }
663 None
664 })
665 }
666
667 pub fn fail_graph_projection(
669 &self,
670 name: &str,
671 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
672 self.update_physical_metadata(|metadata| {
673 let now = SystemTime::now()
674 .duration_since(UNIX_EPOCH)
675 .unwrap_or_default()
676 .as_millis();
677 let idx = metadata
678 .graph_projections
679 .iter()
680 .position(|projection| projection.name == name);
681 if let Some(idx) = idx {
682 metadata.graph_projections[idx].updated_at_unix_ms = now;
683 metadata.graph_projections[idx].state = "failed".to_string();
684 metadata.graph_projections[idx].last_materialized_sequence = None;
685 let result = metadata.graph_projections[idx].clone();
686 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
687 return Some(result);
688 }
689 None
690 })
691 }
692
693 pub fn mark_graph_projection_stale(
695 &self,
696 name: &str,
697 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
698 self.update_physical_metadata(|metadata| {
699 let now = SystemTime::now()
700 .duration_since(UNIX_EPOCH)
701 .unwrap_or_default()
702 .as_millis();
703 let idx = metadata
704 .graph_projections
705 .iter()
706 .position(|projection| projection.name == name);
707 if let Some(idx) = idx {
708 metadata.graph_projections[idx].updated_at_unix_ms = now;
709 metadata.graph_projections[idx].state = "stale".to_string();
710 let result = metadata.graph_projections[idx].clone();
711 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
712 return Some(result);
713 }
714 None
715 })
716 }
717
718 fn mark_projection_dependent_jobs_stale(
719 metadata: &mut PhysicalMetadataFile,
720 projection_name: &str,
721 now: u128,
722 ) {
723 for job in metadata.analytics_jobs.iter_mut() {
724 if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
725 job.state = "stale".to_string();
726 job.updated_at_unix_ms = now;
727 }
728 }
729 }
730
731 fn rearm_projection_dependent_jobs_declared(
732 metadata: &mut PhysicalMetadataFile,
733 projection_name: &str,
734 now: u128,
735 ) {
736 for job in metadata.analytics_jobs.iter_mut() {
737 if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
738 job.state = "declared".to_string();
739 job.last_run_sequence = None;
740 job.updated_at_unix_ms = now;
741 }
742 }
743 }
744
745 pub fn save_analytics_job(
747 &self,
748 kind: impl Into<String>,
749 projection: Option<String>,
750 metadata_entries: BTreeMap<String, String>,
751 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
752 let kind = kind.into();
753 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
754 if let Some(projection_name) = projection.as_deref() {
755 if !self.graph_projection_is_declared(projection_name) {
756 return Err(format!(
757 "graph projection '{projection_name}' is not declared in physical metadata"
758 )
759 .into());
760 }
761 }
762
763 self.update_physical_metadata(|metadata| {
764 let now = SystemTime::now()
765 .duration_since(UNIX_EPOCH)
766 .unwrap_or_default()
767 .as_millis();
768
769 let job = if let Some(existing) = metadata
770 .analytics_jobs
771 .iter_mut()
772 .find(|job| job.id == job_id)
773 {
774 existing.kind = kind.clone();
775 existing.projection = projection.clone();
776 existing.updated_at_unix_ms = now;
777 existing.metadata = metadata_entries.clone();
778 if existing.last_run_sequence.is_none() {
779 existing.state = "declared".to_string();
780 }
781 existing.clone()
782 } else {
783 let job = PhysicalAnalyticsJob {
784 id: job_id.clone(),
785 kind: kind.clone(),
786 state: "declared".to_string(),
787 projection: projection.clone(),
788 created_at_unix_ms: now,
789 updated_at_unix_ms: now,
790 last_run_sequence: None,
791 metadata: metadata_entries.clone(),
792 };
793 metadata.analytics_jobs.push(job.clone());
794 job
795 };
796
797 metadata
798 .analytics_jobs
799 .sort_by(|left, right| left.id.cmp(&right.id));
800 job
801 })
802 }
803
804 pub fn record_analytics_job(
806 &self,
807 kind: impl Into<String>,
808 projection: Option<String>,
809 metadata_entries: BTreeMap<String, String>,
810 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
811 let kind = kind.into();
812 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
813 if let Some(projection_name) = projection.as_deref() {
814 if !self.graph_projection_is_declared(projection_name) {
815 return Err(format!(
816 "graph projection '{projection_name}' is not declared in physical metadata"
817 )
818 .into());
819 }
820 if !self.graph_projection_is_operational(projection_name) {
821 return Err(format!(
822 "graph projection '{projection_name}' is declared but not operationally materialized"
823 )
824 .into());
825 }
826 }
827
828 self.update_physical_metadata(|metadata| {
829 let now = SystemTime::now()
830 .duration_since(UNIX_EPOCH)
831 .unwrap_or_default()
832 .as_millis();
833
834 let existing = metadata
835 .analytics_jobs
836 .iter_mut()
837 .find(|job| job.id == job_id)?;
838 existing.state = "completed".to_string();
839 existing.kind = kind.clone();
840 existing.projection = projection.clone();
841 existing.updated_at_unix_ms = now;
842 existing.last_run_sequence = Some(metadata.superblock.sequence);
843 existing.metadata = metadata_entries.clone();
844 let job = existing.clone();
845
846 metadata
847 .analytics_jobs
848 .sort_by(|left, right| left.id.cmp(&right.id));
849 Some(job)
850 })
851 .and_then(|job| {
852 job.ok_or_else(|| {
853 format!("analytics job '{job_id}' is not declared in physical metadata").into()
854 })
855 })
856 }
857
858 pub fn queue_analytics_job(
860 &self,
861 kind: impl Into<String>,
862 projection: Option<String>,
863 metadata_entries: BTreeMap<String, String>,
864 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
865 let kind = kind.into();
866 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
867 if let Some(projection_name) = projection.as_deref() {
868 if !self.graph_projection_is_declared(projection_name) {
869 return Err(format!(
870 "graph projection '{projection_name}' is not declared in physical metadata"
871 )
872 .into());
873 }
874 if !self.graph_projection_is_operational(projection_name) {
875 return Err(format!(
876 "graph projection '{projection_name}' is declared but not operationally materialized"
877 )
878 .into());
879 }
880 }
881
882 self.update_physical_metadata(|metadata| {
883 let now = SystemTime::now()
884 .duration_since(UNIX_EPOCH)
885 .unwrap_or_default()
886 .as_millis();
887
888 let existing = metadata
889 .analytics_jobs
890 .iter_mut()
891 .find(|job| job.id == job_id)?;
892 existing.state = "queued".to_string();
893 existing.kind = kind.clone();
894 existing.projection = projection.clone();
895 existing.updated_at_unix_ms = now;
896 existing.metadata = metadata_entries.clone();
897 let job = existing.clone();
898
899 metadata
900 .analytics_jobs
901 .sort_by(|left, right| left.id.cmp(&right.id));
902 Some(job)
903 })
904 .and_then(|job| {
905 job.ok_or_else(|| {
906 format!("analytics job '{job_id}' is not declared in physical metadata").into()
907 })
908 })
909 }
910
911 pub fn start_analytics_job(
913 &self,
914 kind: impl Into<String>,
915 projection: Option<String>,
916 metadata_entries: BTreeMap<String, String>,
917 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
918 let kind = kind.into();
919 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
920 if let Some(projection_name) = projection.as_deref() {
921 if !self.graph_projection_is_declared(projection_name) {
922 return Err(format!(
923 "graph projection '{projection_name}' is not declared in physical metadata"
924 )
925 .into());
926 }
927 if !self.graph_projection_is_operational(projection_name) {
928 return Err(format!(
929 "graph projection '{projection_name}' is declared but not operationally materialized"
930 )
931 .into());
932 }
933 }
934
935 self.update_physical_metadata(|metadata| {
936 let now = SystemTime::now()
937 .duration_since(UNIX_EPOCH)
938 .unwrap_or_default()
939 .as_millis();
940
941 let existing = metadata
942 .analytics_jobs
943 .iter_mut()
944 .find(|job| job.id == job_id)?;
945 existing.state = "running".to_string();
946 existing.kind = kind.clone();
947 existing.projection = projection.clone();
948 existing.updated_at_unix_ms = now;
949 existing.metadata = metadata_entries.clone();
950 let job = existing.clone();
951
952 metadata
953 .analytics_jobs
954 .sort_by(|left, right| left.id.cmp(&right.id));
955 Some(job)
956 })
957 .and_then(|job| {
958 job.ok_or_else(|| {
959 format!("analytics job '{job_id}' is not declared in physical metadata").into()
960 })
961 })
962 }
963
964 pub fn fail_analytics_job(
966 &self,
967 kind: impl Into<String>,
968 projection: Option<String>,
969 metadata_entries: BTreeMap<String, String>,
970 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
971 let kind = kind.into();
972 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
973
974 self.update_physical_metadata(|metadata| {
975 let now = SystemTime::now()
976 .duration_since(UNIX_EPOCH)
977 .unwrap_or_default()
978 .as_millis();
979
980 let existing = metadata
981 .analytics_jobs
982 .iter_mut()
983 .find(|job| job.id == job_id)?;
984 existing.state = "failed".to_string();
985 existing.kind = kind.clone();
986 existing.projection = projection.clone();
987 existing.updated_at_unix_ms = now;
988 existing.metadata = metadata_entries.clone();
989 let job = existing.clone();
990
991 metadata
992 .analytics_jobs
993 .sort_by(|left, right| left.id.cmp(&right.id));
994 Some(job)
995 })
996 .and_then(|job| {
997 job.ok_or_else(|| {
998 format!("analytics job '{job_id}' is not declared in physical metadata").into()
999 })
1000 })
1001 }
1002
1003 pub fn mark_analytics_job_stale(
1005 &self,
1006 kind: impl Into<String>,
1007 projection: Option<String>,
1008 metadata_entries: BTreeMap<String, String>,
1009 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
1010 let kind = kind.into();
1011 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
1012
1013 self.update_physical_metadata(|metadata| {
1014 let now = SystemTime::now()
1015 .duration_since(UNIX_EPOCH)
1016 .unwrap_or_default()
1017 .as_millis();
1018
1019 let existing = metadata
1020 .analytics_jobs
1021 .iter_mut()
1022 .find(|job| job.id == job_id)?;
1023 existing.state = "stale".to_string();
1024 existing.kind = kind.clone();
1025 existing.projection = projection.clone();
1026 existing.updated_at_unix_ms = now;
1027 existing.metadata = metadata_entries.clone();
1028 let job = existing.clone();
1029
1030 metadata
1031 .analytics_jobs
1032 .sort_by(|left, right| left.id.cmp(&right.id));
1033 Some(job)
1034 })
1035 .and_then(|job| {
1036 job.ok_or_else(|| {
1037 format!("analytics job '{job_id}' is not declared in physical metadata").into()
1038 })
1039 })
1040 }
1041
1042 pub fn create_named_export(
1044 &self,
1045 name: impl Into<String>,
1046 ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
1047 let name = name.into();
1048 if self.options.mode != StorageMode::Persistent {
1049 return Err("exports require persistent mode".into());
1050 }
1051 let Some(path) = self.path() else {
1052 return Err("database path is not available".into());
1053 };
1054
1055 self.flush()?;
1056
1057 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
1058 let export_data_path = reddb_file::copy_physical_export_data_file(path, &name)?;
1059 let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
1060 let export_metadata_binary_path =
1061 PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
1062
1063 let descriptor = ExportDescriptor {
1064 name: name.clone(),
1065 created_at_unix_ms: SystemTime::now()
1066 .duration_since(UNIX_EPOCH)
1067 .unwrap_or_default()
1068 .as_millis(),
1069 snapshot_id: metadata
1070 .snapshots
1071 .last()
1072 .map(|snapshot| snapshot.snapshot_id),
1073 superblock_sequence: metadata.superblock.sequence,
1074 data_path: export_data_path.display().to_string(),
1075 metadata_path: export_metadata_path.display().to_string(),
1076 collection_count: metadata.catalog.total_collections,
1077 total_entities: metadata.catalog.total_entities,
1078 };
1079
1080 metadata
1081 .exports
1082 .retain(|export| export.name != descriptor.name);
1083 metadata.exports.push(descriptor.clone());
1084 self.prune_export_registry(&mut metadata.exports);
1085 metadata.save_for_data_path(path)?;
1086 metadata.save_to_binary_path(&export_metadata_binary_path)?;
1087 metadata.save_to_path(&export_metadata_path)?;
1088
1089 Ok(descriptor)
1090 }
1091
1092 pub fn set_index_enabled(
1094 &self,
1095 name: &str,
1096 enabled: bool,
1097 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1098 let Some(status) = self.index_status(name) else {
1099 return Err(format!("index '{name}' is not present in catalog status").into());
1100 };
1101 if !status.declared {
1102 return Err(format!("index '{name}' is not declared in physical metadata").into());
1103 }
1104 self.update_physical_metadata(|metadata| {
1105 let now = SystemTime::now()
1106 .duration_since(UNIX_EPOCH)
1107 .unwrap_or_default()
1108 .as_millis();
1109 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1110 index.enabled = enabled;
1111 if !enabled {
1112 index.build_state = "disabled".to_string();
1113 } else if index.build_state == "disabled" {
1114 index.build_state = if index.artifact_root_page.is_some() {
1115 "ready".to_string()
1116 } else {
1117 "declared-unbuilt".to_string()
1118 };
1119 }
1120 index.last_refresh_ms = Some(now);
1121 return Some(index.clone());
1122 }
1123 None
1124 })
1125 }
1126
1127 pub fn mark_index_building(
1129 &self,
1130 name: &str,
1131 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1132 let Some(status) = self.index_status(name) else {
1133 return Err(format!("index '{name}' is not present in catalog status").into());
1134 };
1135 if !status.declared {
1136 return Err(format!("index '{name}' is not declared in physical metadata").into());
1137 }
1138 if status.lifecycle_state == "disabled" {
1139 return Err(format!("index '{name}' is disabled").into());
1140 }
1141 self.update_physical_metadata(|metadata| {
1142 let now = SystemTime::now()
1143 .duration_since(UNIX_EPOCH)
1144 .unwrap_or_default()
1145 .as_millis();
1146 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1147 index.build_state = "building".to_string();
1148 index.last_refresh_ms = Some(now);
1149 return Some(index.clone());
1150 }
1151 None
1152 })
1153 }
1154
1155 pub fn fail_index(
1157 &self,
1158 name: &str,
1159 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1160 let Some(status) = self.index_status(name) else {
1161 return Err(format!("index '{name}' is not present in catalog status").into());
1162 };
1163 if !status.declared {
1164 return Err(format!("index '{name}' is not declared in physical metadata").into());
1165 }
1166 self.update_physical_metadata(|metadata| {
1167 let now = SystemTime::now()
1168 .duration_since(UNIX_EPOCH)
1169 .unwrap_or_default()
1170 .as_millis();
1171 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1172 index.build_state = "failed".to_string();
1173 index.last_refresh_ms = Some(now);
1174 return Some(index.clone());
1175 }
1176 None
1177 })
1178 }
1179
1180 pub fn mark_index_stale(
1182 &self,
1183 name: &str,
1184 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1185 let Some(status) = self.index_status(name) else {
1186 return Err(format!("index '{name}' is not present in catalog status").into());
1187 };
1188 if !status.declared {
1189 return Err(format!("index '{name}' is not declared in physical metadata").into());
1190 }
1191 self.update_physical_metadata(|metadata| {
1192 let now = SystemTime::now()
1193 .duration_since(UNIX_EPOCH)
1194 .unwrap_or_default()
1195 .as_millis();
1196 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1197 index.build_state = if index.enabled {
1198 "stale".to_string()
1199 } else {
1200 "disabled".to_string()
1201 };
1202 index.last_refresh_ms = Some(now);
1203 return Some(index.clone());
1204 }
1205 None
1206 })
1207 }
1208
1209 pub fn mark_index_ready(
1211 &self,
1212 name: &str,
1213 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1214 self.warmup_index(name)
1215 }
1216
1217 pub fn warmup_index(
1219 &self,
1220 name: &str,
1221 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1222 let Some(status) = self.index_status(name) else {
1223 return Err(format!("index '{name}' is not present in catalog status").into());
1224 };
1225 if !status.declared {
1226 return Err(format!("index '{name}' is not declared in physical metadata").into());
1227 }
1228 if status.lifecycle_state == "disabled" {
1229 return Err(format!("index '{name}' is disabled").into());
1230 }
1231 if !status.operational {
1232 return Err(
1233 format!("index '{name}' is declared but not operationally materialized").into(),
1234 );
1235 }
1236 let warmed_artifact = self
1237 .physical_indexes()
1238 .into_iter()
1239 .find(|index| index.name == name)
1240 .map(|mut index| {
1241 self.warmup_native_vector_artifact_for_index(&index)?;
1242 self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1243 Ok::<_, String>(index)
1244 })
1245 .transpose()
1246 .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1247
1248 self.update_physical_metadata(|metadata| {
1249 let now = SystemTime::now()
1250 .duration_since(UNIX_EPOCH)
1251 .unwrap_or_default()
1252 .as_millis();
1253 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1254 if let Some(warmed) = warmed_artifact.as_ref() {
1255 index.entries = warmed.entries;
1256 index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1257 index.backend = warmed.backend.clone();
1258 index.build_state = "ready".to_string();
1259 }
1260 index.last_refresh_ms = Some(now);
1261 return Some(index.clone());
1262 }
1263 None
1264 })
1265 }
1266
1267 pub fn rebuild_index_registry(
1269 &self,
1270 collection: Option<&str>,
1271 ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1272 let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1273 self.update_physical_metadata(|metadata| {
1274 let now = SystemTime::now()
1275 .duration_since(UNIX_EPOCH)
1276 .unwrap_or_default()
1277 .as_millis();
1278
1279 let mut affected = Vec::new();
1280 let declared = metadata.indexes.clone();
1281 for declared_index in declared {
1282 let matches_collection = collection.is_none_or(|collection_name| {
1283 declared_index.collection.as_deref() == Some(collection_name)
1284 });
1285 if !matches_collection {
1286 continue;
1287 }
1288
1289 let mut rebuilt = fresh
1290 .iter()
1291 .find(|index| index.name == declared_index.name)
1292 .cloned()
1293 .unwrap_or_else(|| {
1294 let mut index = declared_index.clone();
1295 index.build_state = "declared-unbuilt".to_string();
1296 index
1297 });
1298 rebuilt.enabled = declared_index.enabled;
1299 rebuilt.artifact_kind = rebuilt
1300 .artifact_kind
1301 .or_else(|| declared_index.artifact_kind.clone());
1302 rebuilt.artifact_root_page = rebuilt
1303 .artifact_root_page
1304 .or(declared_index.artifact_root_page);
1305 rebuilt.artifact_checksum = rebuilt
1306 .artifact_checksum
1307 .or(declared_index.artifact_checksum);
1308 rebuilt.build_state =
1309 Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1310 rebuilt.last_refresh_ms = Some(now);
1311
1312 if let Some(existing) = metadata
1313 .indexes
1314 .iter_mut()
1315 .find(|index| index.name == rebuilt.name)
1316 {
1317 *existing = rebuilt.clone();
1318 } else {
1319 metadata.indexes.push(rebuilt.clone());
1320 }
1321
1322 affected.push(rebuilt);
1323 }
1324
1325 affected
1326 })
1327 }
1328
1329 fn finalize_rebuilt_index_build_state(
1330 declared: &PhysicalIndexState,
1331 rebuilt: &PhysicalIndexState,
1332 ) -> String {
1333 if !rebuilt.enabled {
1334 return "disabled".to_string();
1335 }
1336
1337 if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1338 return "failed".to_string();
1339 }
1340
1341 let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1342 if native_artifact_family {
1343 if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1344 return "ready".to_string();
1345 }
1346 if declared.artifact_root_page.is_some()
1347 || declared.artifact_checksum.is_some()
1348 || declared.artifact_kind.is_some()
1349 {
1350 return "stale".to_string();
1351 }
1352 return "declared-unbuilt".to_string();
1353 }
1354
1355 if rebuilt.entries > 0 {
1356 return "ready".to_string();
1357 }
1358
1359 if matches!(
1360 declared.build_state.as_str(),
1361 "stale" | "artifact-published" | "registry-loaded"
1362 ) {
1363 return "stale".to_string();
1364 }
1365
1366 "declared-unbuilt".to_string()
1367 }
1368}