1use super::*;
2
3impl RedDB {
4 pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5 self.physical_metadata()
6 .map(|metadata| metadata.tree_definitions)
7 .unwrap_or_default()
8 }
9
10 pub fn tree_definition(
11 &self,
12 collection: &str,
13 name: &str,
14 ) -> Option<crate::physical::PhysicalTreeDefinition> {
15 self.tree_definitions()
16 .into_iter()
17 .find(|definition| definition.collection == collection && definition.name == name)
18 }
19
20 pub fn save_tree_definition(
21 &self,
22 definition: crate::physical::PhysicalTreeDefinition,
23 ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24 self.update_physical_metadata(|metadata| {
25 if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26 existing.collection == definition.collection && existing.name == definition.name
27 }) {
28 *existing = definition.clone();
29 } else {
30 metadata.tree_definitions.push(definition.clone());
31 }
32 metadata.tree_definitions.sort_by(|left, right| {
33 left.collection
34 .cmp(&right.collection)
35 .then_with(|| left.name.cmp(&right.name))
36 });
37 definition.clone()
38 })
39 }
40
41 pub fn remove_tree_definition(
42 &self,
43 collection: &str,
44 name: &str,
45 ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46 self.update_physical_metadata(|metadata| {
47 metadata
48 .tree_definitions
49 .iter()
50 .position(|definition| {
51 definition.collection == collection && definition.name == name
52 })
53 .map(|index| metadata.tree_definitions.remove(index))
54 })
55 }
56
57 pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58 self.collection_ttl_defaults_ms
59 .read()
60 .ok()
61 .and_then(|defaults| defaults.get(collection).copied())
62 }
63
64 pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66 defaults.insert(collection.into(), ttl_ms);
67 }
68 }
69
70 pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72 defaults.remove(collection);
73 }
74 }
75
76 pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77 self.contract_cache_map()
78 .values()
79 .map(|arc| (**arc).clone())
80 .collect()
81 }
82
83 pub fn collection_contract(
84 &self,
85 collection: &str,
86 ) -> Option<crate::physical::CollectionContract> {
87 self.contract_cache_map()
88 .get(collection)
89 .map(|arc| (**arc).clone())
90 }
91
92 pub fn collection_contract_arc(
97 &self,
98 collection: &str,
99 ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100 self.contract_cache_map()
101 .get(collection)
102 .map(std::sync::Arc::clone)
103 }
104
105 fn contract_cache_map(
110 &self,
111 ) -> std::sync::Arc<
112 std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113 > {
114 if let Ok(guard) = self.collection_contract_cache.read() {
115 if let Some(map) = guard.as_ref() {
116 return std::sync::Arc::clone(map);
117 }
118 }
119 let contracts: Vec<crate::physical::CollectionContract> = self
120 .physical_metadata()
121 .map(|metadata| metadata.collection_contracts)
122 .unwrap_or_default();
123 let map: std::collections::HashMap<_, _> = contracts
124 .into_iter()
125 .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126 .collect();
127 let arc = std::sync::Arc::new(map);
128 if let Ok(mut guard) = self.collection_contract_cache.write() {
129 *guard = Some(std::sync::Arc::clone(&arc));
130 }
131 arc
132 }
133
134 pub(crate) fn invalidate_collection_contract_cache(&self) {
135 if let Ok(mut guard) = self.collection_contract_cache.write() {
136 *guard = None;
137 }
138 }
139
140 pub fn save_collection_contract(
141 &self,
142 contract: crate::physical::CollectionContract,
143 ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145 if let Some(ttl_ms) = contract.default_ttl_ms {
146 defaults.insert(contract.name.clone(), ttl_ms);
147 } else {
148 defaults.remove(&contract.name);
149 }
150 }
151
152 self.store()
153 .context_index()
154 .set_collection_enabled(&contract.name, contract.context_index_enabled);
155
156 self.invalidate_collection_contract_cache();
157
158 self.update_physical_metadata(|metadata| {
159 if let Some(existing) = metadata
160 .collection_contracts
161 .iter_mut()
162 .find(|existing| existing.name == contract.name)
163 {
164 *existing = contract.clone();
165 } else {
166 metadata.collection_contracts.push(contract.clone());
167 }
168 metadata
169 .collection_contracts
170 .sort_by(|left, right| left.name.cmp(&right.name));
171
172 if let Some(ttl_ms) = contract.default_ttl_ms {
173 metadata
174 .collection_ttl_defaults_ms
175 .insert(contract.name.clone(), ttl_ms);
176 } else {
177 metadata.collection_ttl_defaults_ms.remove(&contract.name);
178 }
179
180 contract.clone()
181 })
182 }
183
184 pub fn remove_collection_contract(
185 &self,
186 collection: &str,
187 ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
188 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
189 defaults.remove(collection);
190 }
191
192 self.store()
193 .context_index()
194 .set_collection_enabled(collection, false);
195
196 self.invalidate_collection_contract_cache();
197
198 self.update_physical_metadata(|metadata| {
199 let removed = metadata
200 .collection_contracts
201 .iter()
202 .position(|contract| contract.name == collection)
203 .map(|index| metadata.collection_contracts.remove(index));
204 metadata.collection_ttl_defaults_ms.remove(collection);
205 metadata
206 .indexes
207 .retain(|index| index.collection.as_deref() != Some(collection));
208 removed
209 })
210 }
211
212 pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
213 self.collection_ttl_defaults_ms
214 .read()
215 .map(|defaults| {
216 defaults
217 .iter()
218 .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
219 .collect()
220 })
221 .unwrap_or_default()
222 }
223
224 pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
225 let metadata = self.physical_metadata();
226 let defaults = metadata
227 .as_ref()
228 .map(|m| m.collection_ttl_defaults_ms.clone())
229 .unwrap_or_default();
230
231 if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
232 current.clear();
233 current.extend(defaults);
234 }
235
236 if let Some(metadata) = metadata {
237 let store = self.store();
238 let index = store.context_index();
239 for contract in &metadata.collection_contracts {
240 index.set_collection_enabled(&contract.name, contract.context_index_enabled);
241 }
242 }
243 }
244
/// Runs store maintenance and then persists the metadata.
///
/// # Errors
///
/// Returns the first error from either step; metadata is not persisted when
/// the store's maintenance fails.
pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
    self.store.run_maintenance()?;
    self.persist_metadata()?;
    Ok(())
}
250
251 pub fn metadata_path(&self) -> Option<PathBuf> {
253 self.path
254 .as_ref()
255 .map(|path| PhysicalMetadataFile::metadata_path_for(path))
256 }
257
258 pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
260 self.load_or_bootstrap_physical_metadata(!self.options.read_only)
261 .ok()
262 }
263
264 pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
266 let indexes = self
267 .physical_metadata()
268 .map(|metadata| metadata.indexes)
269 .filter(|indexes| !indexes.is_empty())
270 .or_else(|| {
271 self.native_physical_state()
272 .map(|state| self.physical_index_state_from_native_state(&state, None))
273 })
274 .unwrap_or_else(|| self.physical_index_state());
275 self.reconcile_index_states_with_native_artifacts(indexes)
276 }
277
278 pub fn exports(&self) -> Vec<ExportDescriptor> {
280 self.physical_metadata()
281 .map(|metadata| metadata.exports)
282 .or_else(|| {
283 self.native_physical_state()
284 .map(|state| self.exports_from_native_state(&state))
285 })
286 .unwrap_or_default()
287 }
288
289 pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
291 self.physical_metadata()
292 .map(|metadata| metadata.snapshots)
293 .or_else(|| {
294 self.native_physical_state()
295 .map(|state| self.snapshots_from_native_state(&state))
296 })
297 .unwrap_or_default()
298 }
299
300 pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
302 self.physical_metadata()
303 .map(|metadata| metadata.graph_projections)
304 .or_else(|| {
305 self.native_physical_state()
306 .map(|state| self.graph_projections_from_native_state(&state))
307 })
308 .unwrap_or_default()
309 }
310
311 pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
313 self.catalog_model_snapshot().declared_graph_projections
314 }
315
316 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
318 self.graph_projections()
319 .into_iter()
320 .filter(|projection| {
321 projection.last_materialized_sequence.is_some()
322 || matches!(projection.state.as_str(), "materialized" | "stale")
323 })
324 .collect()
325 }
326
327 pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
329 self.physical_metadata()
330 .map(|metadata| metadata.analytics_jobs)
331 .or_else(|| {
332 self.native_physical_state()
333 .map(|state| self.analytics_jobs_from_native_state(&state))
334 })
335 .unwrap_or_default()
336 }
337
338 pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
340 self.catalog_model_snapshot().declared_analytics_jobs
341 }
342
343 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
345 self.analytics_jobs()
346 .into_iter()
347 .filter(|job| {
348 job.last_run_sequence.is_some()
349 || matches!(
350 job.state.as_str(),
351 "running" | "completed" | "failed" | "queued" | "stale"
352 )
353 })
354 .collect()
355 }
356
357 pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
359 self.catalog_model_snapshot().declared_indexes
360 }
361
362 pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
364 self.catalog_model_snapshot().operational_indexes
365 }
366
367 pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
369 self.catalog_model_snapshot().index_statuses
370 }
371
372 pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
374 self.catalog_model_snapshot()
375 .index_statuses
376 .into_iter()
377 .find(|status| status.name == name)
378 }
379
380 pub fn save_graph_projection(
382 &self,
383 name: impl Into<String>,
384 node_labels: Vec<String>,
385 node_types: Vec<String>,
386 edge_labels: Vec<String>,
387 source: impl Into<String>,
388 ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
389 let name = name.into();
390 let source = source.into();
391 self.update_physical_metadata(|metadata| {
392 let now = SystemTime::now()
393 .duration_since(UNIX_EPOCH)
394 .unwrap_or_default()
395 .as_millis();
396 let projection = if let Some(existing) = metadata
397 .graph_projections
398 .iter_mut()
399 .find(|projection| projection.name == name)
400 {
401 existing.updated_at_unix_ms = now;
402 existing.state = "declared".to_string();
403 existing.source = source.clone();
404 existing.node_labels = node_labels.clone();
405 existing.node_types = node_types.clone();
406 existing.edge_labels = edge_labels.clone();
407 existing.last_materialized_sequence = None;
408 existing.clone()
409 } else {
410 let projection = PhysicalGraphProjection {
411 name: name.clone(),
412 created_at_unix_ms: now,
413 updated_at_unix_ms: now,
414 state: "declared".to_string(),
415 source: source.clone(),
416 node_labels: node_labels.clone(),
417 node_types: node_types.clone(),
418 edge_labels: edge_labels.clone(),
419 last_materialized_sequence: None,
420 };
421 metadata.graph_projections.push(projection.clone());
422 projection
423 };
424
425 Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
426
427 metadata
428 .graph_projections
429 .sort_by(|left, right| left.name.cmp(&right.name));
430 projection
431 })
432 }
433
434 pub fn materialize_graph_projection(
436 &self,
437 name: &str,
438 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
439 self.update_physical_metadata(|metadata| {
440 let now = SystemTime::now()
441 .duration_since(UNIX_EPOCH)
442 .unwrap_or_default()
443 .as_millis();
444 let idx = metadata
445 .graph_projections
446 .iter()
447 .position(|projection| projection.name == name);
448 if let Some(idx) = idx {
449 metadata.graph_projections[idx].updated_at_unix_ms = now;
450 metadata.graph_projections[idx].state = "materialized".to_string();
451 metadata.graph_projections[idx].last_materialized_sequence =
452 Some(metadata.superblock.sequence);
453 let result = metadata.graph_projections[idx].clone();
454 Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
455 return Some(result);
456 }
457 None
458 })
459 }
460
461 pub fn mark_graph_projection_materializing(
463 &self,
464 name: &str,
465 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
466 self.update_physical_metadata(|metadata| {
467 let now = SystemTime::now()
468 .duration_since(UNIX_EPOCH)
469 .unwrap_or_default()
470 .as_millis();
471 let idx = metadata
472 .graph_projections
473 .iter()
474 .position(|projection| projection.name == name);
475 if let Some(idx) = idx {
476 metadata.graph_projections[idx].updated_at_unix_ms = now;
477 metadata.graph_projections[idx].state = "materializing".to_string();
478 metadata.graph_projections[idx].last_materialized_sequence = None;
479 let result = metadata.graph_projections[idx].clone();
480 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
481 return Some(result);
482 }
483 None
484 })
485 }
486
487 pub fn fail_graph_projection(
489 &self,
490 name: &str,
491 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
492 self.update_physical_metadata(|metadata| {
493 let now = SystemTime::now()
494 .duration_since(UNIX_EPOCH)
495 .unwrap_or_default()
496 .as_millis();
497 let idx = metadata
498 .graph_projections
499 .iter()
500 .position(|projection| projection.name == name);
501 if let Some(idx) = idx {
502 metadata.graph_projections[idx].updated_at_unix_ms = now;
503 metadata.graph_projections[idx].state = "failed".to_string();
504 metadata.graph_projections[idx].last_materialized_sequence = None;
505 let result = metadata.graph_projections[idx].clone();
506 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
507 return Some(result);
508 }
509 None
510 })
511 }
512
513 pub fn mark_graph_projection_stale(
515 &self,
516 name: &str,
517 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
518 self.update_physical_metadata(|metadata| {
519 let now = SystemTime::now()
520 .duration_since(UNIX_EPOCH)
521 .unwrap_or_default()
522 .as_millis();
523 let idx = metadata
524 .graph_projections
525 .iter()
526 .position(|projection| projection.name == name);
527 if let Some(idx) = idx {
528 metadata.graph_projections[idx].updated_at_unix_ms = now;
529 metadata.graph_projections[idx].state = "stale".to_string();
530 let result = metadata.graph_projections[idx].clone();
531 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
532 return Some(result);
533 }
534 None
535 })
536 }
537
538 fn mark_projection_dependent_jobs_stale(
539 metadata: &mut PhysicalMetadataFile,
540 projection_name: &str,
541 now: u128,
542 ) {
543 for job in metadata.analytics_jobs.iter_mut() {
544 if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
545 job.state = "stale".to_string();
546 job.updated_at_unix_ms = now;
547 }
548 }
549 }
550
551 fn rearm_projection_dependent_jobs_declared(
552 metadata: &mut PhysicalMetadataFile,
553 projection_name: &str,
554 now: u128,
555 ) {
556 for job in metadata.analytics_jobs.iter_mut() {
557 if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
558 job.state = "declared".to_string();
559 job.last_run_sequence = None;
560 job.updated_at_unix_ms = now;
561 }
562 }
563 }
564
565 pub fn save_analytics_job(
567 &self,
568 kind: impl Into<String>,
569 projection: Option<String>,
570 metadata_entries: BTreeMap<String, String>,
571 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
572 let kind = kind.into();
573 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
574 if let Some(projection_name) = projection.as_deref() {
575 if !self.graph_projection_is_declared(projection_name) {
576 return Err(format!(
577 "graph projection '{projection_name}' is not declared in physical metadata"
578 )
579 .into());
580 }
581 }
582
583 self.update_physical_metadata(|metadata| {
584 let now = SystemTime::now()
585 .duration_since(UNIX_EPOCH)
586 .unwrap_or_default()
587 .as_millis();
588
589 let job = if let Some(existing) = metadata
590 .analytics_jobs
591 .iter_mut()
592 .find(|job| job.id == job_id)
593 {
594 existing.kind = kind.clone();
595 existing.projection = projection.clone();
596 existing.updated_at_unix_ms = now;
597 existing.metadata = metadata_entries.clone();
598 if existing.last_run_sequence.is_none() {
599 existing.state = "declared".to_string();
600 }
601 existing.clone()
602 } else {
603 let job = PhysicalAnalyticsJob {
604 id: job_id.clone(),
605 kind: kind.clone(),
606 state: "declared".to_string(),
607 projection: projection.clone(),
608 created_at_unix_ms: now,
609 updated_at_unix_ms: now,
610 last_run_sequence: None,
611 metadata: metadata_entries.clone(),
612 };
613 metadata.analytics_jobs.push(job.clone());
614 job
615 };
616
617 metadata
618 .analytics_jobs
619 .sort_by(|left, right| left.id.cmp(&right.id));
620 job
621 })
622 }
623
624 pub fn record_analytics_job(
626 &self,
627 kind: impl Into<String>,
628 projection: Option<String>,
629 metadata_entries: BTreeMap<String, String>,
630 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
631 let kind = kind.into();
632 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
633 if let Some(projection_name) = projection.as_deref() {
634 if !self.graph_projection_is_declared(projection_name) {
635 return Err(format!(
636 "graph projection '{projection_name}' is not declared in physical metadata"
637 )
638 .into());
639 }
640 if !self.graph_projection_is_operational(projection_name) {
641 return Err(format!(
642 "graph projection '{projection_name}' is declared but not operationally materialized"
643 )
644 .into());
645 }
646 }
647
648 self.update_physical_metadata(|metadata| {
649 let now = SystemTime::now()
650 .duration_since(UNIX_EPOCH)
651 .unwrap_or_default()
652 .as_millis();
653
654 let existing = metadata
655 .analytics_jobs
656 .iter_mut()
657 .find(|job| job.id == job_id)?;
658 existing.state = "completed".to_string();
659 existing.kind = kind.clone();
660 existing.projection = projection.clone();
661 existing.updated_at_unix_ms = now;
662 existing.last_run_sequence = Some(metadata.superblock.sequence);
663 existing.metadata = metadata_entries.clone();
664 let job = existing.clone();
665
666 metadata
667 .analytics_jobs
668 .sort_by(|left, right| left.id.cmp(&right.id));
669 Some(job)
670 })
671 .and_then(|job| {
672 job.ok_or_else(|| {
673 format!("analytics job '{job_id}' is not declared in physical metadata").into()
674 })
675 })
676 }
677
678 pub fn queue_analytics_job(
680 &self,
681 kind: impl Into<String>,
682 projection: Option<String>,
683 metadata_entries: BTreeMap<String, String>,
684 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
685 let kind = kind.into();
686 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
687 if let Some(projection_name) = projection.as_deref() {
688 if !self.graph_projection_is_declared(projection_name) {
689 return Err(format!(
690 "graph projection '{projection_name}' is not declared in physical metadata"
691 )
692 .into());
693 }
694 if !self.graph_projection_is_operational(projection_name) {
695 return Err(format!(
696 "graph projection '{projection_name}' is declared but not operationally materialized"
697 )
698 .into());
699 }
700 }
701
702 self.update_physical_metadata(|metadata| {
703 let now = SystemTime::now()
704 .duration_since(UNIX_EPOCH)
705 .unwrap_or_default()
706 .as_millis();
707
708 let existing = metadata
709 .analytics_jobs
710 .iter_mut()
711 .find(|job| job.id == job_id)?;
712 existing.state = "queued".to_string();
713 existing.kind = kind.clone();
714 existing.projection = projection.clone();
715 existing.updated_at_unix_ms = now;
716 existing.metadata = metadata_entries.clone();
717 let job = existing.clone();
718
719 metadata
720 .analytics_jobs
721 .sort_by(|left, right| left.id.cmp(&right.id));
722 Some(job)
723 })
724 .and_then(|job| {
725 job.ok_or_else(|| {
726 format!("analytics job '{job_id}' is not declared in physical metadata").into()
727 })
728 })
729 }
730
731 pub fn start_analytics_job(
733 &self,
734 kind: impl Into<String>,
735 projection: Option<String>,
736 metadata_entries: BTreeMap<String, String>,
737 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
738 let kind = kind.into();
739 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
740 if let Some(projection_name) = projection.as_deref() {
741 if !self.graph_projection_is_declared(projection_name) {
742 return Err(format!(
743 "graph projection '{projection_name}' is not declared in physical metadata"
744 )
745 .into());
746 }
747 if !self.graph_projection_is_operational(projection_name) {
748 return Err(format!(
749 "graph projection '{projection_name}' is declared but not operationally materialized"
750 )
751 .into());
752 }
753 }
754
755 self.update_physical_metadata(|metadata| {
756 let now = SystemTime::now()
757 .duration_since(UNIX_EPOCH)
758 .unwrap_or_default()
759 .as_millis();
760
761 let existing = metadata
762 .analytics_jobs
763 .iter_mut()
764 .find(|job| job.id == job_id)?;
765 existing.state = "running".to_string();
766 existing.kind = kind.clone();
767 existing.projection = projection.clone();
768 existing.updated_at_unix_ms = now;
769 existing.metadata = metadata_entries.clone();
770 let job = existing.clone();
771
772 metadata
773 .analytics_jobs
774 .sort_by(|left, right| left.id.cmp(&right.id));
775 Some(job)
776 })
777 .and_then(|job| {
778 job.ok_or_else(|| {
779 format!("analytics job '{job_id}' is not declared in physical metadata").into()
780 })
781 })
782 }
783
784 pub fn fail_analytics_job(
786 &self,
787 kind: impl Into<String>,
788 projection: Option<String>,
789 metadata_entries: BTreeMap<String, String>,
790 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
791 let kind = kind.into();
792 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
793
794 self.update_physical_metadata(|metadata| {
795 let now = SystemTime::now()
796 .duration_since(UNIX_EPOCH)
797 .unwrap_or_default()
798 .as_millis();
799
800 let existing = metadata
801 .analytics_jobs
802 .iter_mut()
803 .find(|job| job.id == job_id)?;
804 existing.state = "failed".to_string();
805 existing.kind = kind.clone();
806 existing.projection = projection.clone();
807 existing.updated_at_unix_ms = now;
808 existing.metadata = metadata_entries.clone();
809 let job = existing.clone();
810
811 metadata
812 .analytics_jobs
813 .sort_by(|left, right| left.id.cmp(&right.id));
814 Some(job)
815 })
816 .and_then(|job| {
817 job.ok_or_else(|| {
818 format!("analytics job '{job_id}' is not declared in physical metadata").into()
819 })
820 })
821 }
822
823 pub fn mark_analytics_job_stale(
825 &self,
826 kind: impl Into<String>,
827 projection: Option<String>,
828 metadata_entries: BTreeMap<String, String>,
829 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
830 let kind = kind.into();
831 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
832
833 self.update_physical_metadata(|metadata| {
834 let now = SystemTime::now()
835 .duration_since(UNIX_EPOCH)
836 .unwrap_or_default()
837 .as_millis();
838
839 let existing = metadata
840 .analytics_jobs
841 .iter_mut()
842 .find(|job| job.id == job_id)?;
843 existing.state = "stale".to_string();
844 existing.kind = kind.clone();
845 existing.projection = projection.clone();
846 existing.updated_at_unix_ms = now;
847 existing.metadata = metadata_entries.clone();
848 let job = existing.clone();
849
850 metadata
851 .analytics_jobs
852 .sort_by(|left, right| left.id.cmp(&right.id));
853 Some(job)
854 })
855 .and_then(|job| {
856 job.ok_or_else(|| {
857 format!("analytics job '{job_id}' is not declared in physical metadata").into()
858 })
859 })
860 }
861
862 pub fn create_named_export(
864 &self,
865 name: impl Into<String>,
866 ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
867 let name = name.into();
868 if self.options.mode != StorageMode::Persistent {
869 return Err("exports require persistent mode".into());
870 }
871 let Some(path) = self.path() else {
872 return Err("database path is not available".into());
873 };
874
875 self.flush()?;
876
877 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
878 let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
879 let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
880 let export_metadata_binary_path =
881 PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
882
883 fs::copy(path, &export_data_path)?;
884
885 let descriptor = ExportDescriptor {
886 name: name.clone(),
887 created_at_unix_ms: SystemTime::now()
888 .duration_since(UNIX_EPOCH)
889 .unwrap_or_default()
890 .as_millis(),
891 snapshot_id: metadata
892 .snapshots
893 .last()
894 .map(|snapshot| snapshot.snapshot_id),
895 superblock_sequence: metadata.superblock.sequence,
896 data_path: export_data_path.display().to_string(),
897 metadata_path: export_metadata_path.display().to_string(),
898 collection_count: metadata.catalog.total_collections,
899 total_entities: metadata.catalog.total_entities,
900 };
901
902 metadata
903 .exports
904 .retain(|export| export.name != descriptor.name);
905 metadata.exports.push(descriptor.clone());
906 self.prune_export_registry(&mut metadata.exports);
907 metadata.save_for_data_path(path)?;
908 metadata.save_to_binary_path(&export_metadata_binary_path)?;
909 metadata.save_to_path(&export_metadata_path)?;
910
911 Ok(descriptor)
912 }
913
914 pub fn set_index_enabled(
916 &self,
917 name: &str,
918 enabled: bool,
919 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
920 let Some(status) = self.index_status(name) else {
921 return Err(format!("index '{name}' is not present in catalog status").into());
922 };
923 if !status.declared {
924 return Err(format!("index '{name}' is not declared in physical metadata").into());
925 }
926 self.update_physical_metadata(|metadata| {
927 let now = SystemTime::now()
928 .duration_since(UNIX_EPOCH)
929 .unwrap_or_default()
930 .as_millis();
931 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
932 index.enabled = enabled;
933 if !enabled {
934 index.build_state = "disabled".to_string();
935 } else if index.build_state == "disabled" {
936 index.build_state = if index.artifact_root_page.is_some() {
937 "ready".to_string()
938 } else {
939 "declared-unbuilt".to_string()
940 };
941 }
942 index.last_refresh_ms = Some(now);
943 return Some(index.clone());
944 }
945 None
946 })
947 }
948
949 pub fn mark_index_building(
951 &self,
952 name: &str,
953 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
954 let Some(status) = self.index_status(name) else {
955 return Err(format!("index '{name}' is not present in catalog status").into());
956 };
957 if !status.declared {
958 return Err(format!("index '{name}' is not declared in physical metadata").into());
959 }
960 if status.lifecycle_state == "disabled" {
961 return Err(format!("index '{name}' is disabled").into());
962 }
963 self.update_physical_metadata(|metadata| {
964 let now = SystemTime::now()
965 .duration_since(UNIX_EPOCH)
966 .unwrap_or_default()
967 .as_millis();
968 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
969 index.build_state = "building".to_string();
970 index.last_refresh_ms = Some(now);
971 return Some(index.clone());
972 }
973 None
974 })
975 }
976
977 pub fn fail_index(
979 &self,
980 name: &str,
981 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
982 let Some(status) = self.index_status(name) else {
983 return Err(format!("index '{name}' is not present in catalog status").into());
984 };
985 if !status.declared {
986 return Err(format!("index '{name}' is not declared in physical metadata").into());
987 }
988 self.update_physical_metadata(|metadata| {
989 let now = SystemTime::now()
990 .duration_since(UNIX_EPOCH)
991 .unwrap_or_default()
992 .as_millis();
993 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
994 index.build_state = "failed".to_string();
995 index.last_refresh_ms = Some(now);
996 return Some(index.clone());
997 }
998 None
999 })
1000 }
1001
1002 pub fn mark_index_stale(
1004 &self,
1005 name: &str,
1006 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1007 let Some(status) = self.index_status(name) else {
1008 return Err(format!("index '{name}' is not present in catalog status").into());
1009 };
1010 if !status.declared {
1011 return Err(format!("index '{name}' is not declared in physical metadata").into());
1012 }
1013 self.update_physical_metadata(|metadata| {
1014 let now = SystemTime::now()
1015 .duration_since(UNIX_EPOCH)
1016 .unwrap_or_default()
1017 .as_millis();
1018 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1019 index.build_state = if index.enabled {
1020 "stale".to_string()
1021 } else {
1022 "disabled".to_string()
1023 };
1024 index.last_refresh_ms = Some(now);
1025 return Some(index.clone());
1026 }
1027 None
1028 })
1029 }
1030
/// Marks the index called `name` as ready.
///
/// This is a thin alias: it delegates directly to [`Self::warmup_index`] and
/// returns whatever that call returns, including its errors.
pub fn mark_index_ready(
    &self,
    name: &str,
) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
    self.warmup_index(name)
}
1038
1039 pub fn warmup_index(
1041 &self,
1042 name: &str,
1043 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1044 let Some(status) = self.index_status(name) else {
1045 return Err(format!("index '{name}' is not present in catalog status").into());
1046 };
1047 if !status.declared {
1048 return Err(format!("index '{name}' is not declared in physical metadata").into());
1049 }
1050 if status.lifecycle_state == "disabled" {
1051 return Err(format!("index '{name}' is disabled").into());
1052 }
1053 if !status.operational {
1054 return Err(
1055 format!("index '{name}' is declared but not operationally materialized").into(),
1056 );
1057 }
1058 let warmed_artifact = self
1059 .physical_indexes()
1060 .into_iter()
1061 .find(|index| index.name == name)
1062 .map(|mut index| {
1063 self.warmup_native_vector_artifact_for_index(&index)?;
1064 self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1065 Ok::<_, String>(index)
1066 })
1067 .transpose()
1068 .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1069
1070 self.update_physical_metadata(|metadata| {
1071 let now = SystemTime::now()
1072 .duration_since(UNIX_EPOCH)
1073 .unwrap_or_default()
1074 .as_millis();
1075 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1076 if let Some(warmed) = warmed_artifact.as_ref() {
1077 index.entries = warmed.entries;
1078 index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1079 index.backend = warmed.backend.clone();
1080 index.build_state = "ready".to_string();
1081 }
1082 index.last_refresh_ms = Some(now);
1083 return Some(index.clone());
1084 }
1085 None
1086 })
1087 }
1088
1089 pub fn rebuild_index_registry(
1091 &self,
1092 collection: Option<&str>,
1093 ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1094 let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1095 self.update_physical_metadata(|metadata| {
1096 let now = SystemTime::now()
1097 .duration_since(UNIX_EPOCH)
1098 .unwrap_or_default()
1099 .as_millis();
1100
1101 let mut affected = Vec::new();
1102 let declared = metadata.indexes.clone();
1103 for declared_index in declared {
1104 let matches_collection = collection.is_none_or(|collection_name| {
1105 declared_index.collection.as_deref() == Some(collection_name)
1106 });
1107 if !matches_collection {
1108 continue;
1109 }
1110
1111 let mut rebuilt = fresh
1112 .iter()
1113 .find(|index| index.name == declared_index.name)
1114 .cloned()
1115 .unwrap_or_else(|| {
1116 let mut index = declared_index.clone();
1117 index.build_state = "declared-unbuilt".to_string();
1118 index
1119 });
1120 rebuilt.enabled = declared_index.enabled;
1121 rebuilt.artifact_kind = rebuilt
1122 .artifact_kind
1123 .or_else(|| declared_index.artifact_kind.clone());
1124 rebuilt.artifact_root_page = rebuilt
1125 .artifact_root_page
1126 .or(declared_index.artifact_root_page);
1127 rebuilt.artifact_checksum = rebuilt
1128 .artifact_checksum
1129 .or(declared_index.artifact_checksum);
1130 rebuilt.build_state =
1131 Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1132 rebuilt.last_refresh_ms = Some(now);
1133
1134 if let Some(existing) = metadata
1135 .indexes
1136 .iter_mut()
1137 .find(|index| index.name == rebuilt.name)
1138 {
1139 *existing = rebuilt.clone();
1140 } else {
1141 metadata.indexes.push(rebuilt.clone());
1142 }
1143
1144 affected.push(rebuilt);
1145 }
1146
1147 affected
1148 })
1149 }
1150
1151 fn finalize_rebuilt_index_build_state(
1152 declared: &PhysicalIndexState,
1153 rebuilt: &PhysicalIndexState,
1154 ) -> String {
1155 if !rebuilt.enabled {
1156 return "disabled".to_string();
1157 }
1158
1159 if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1160 return "failed".to_string();
1161 }
1162
1163 let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1164 if native_artifact_family {
1165 if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1166 return "ready".to_string();
1167 }
1168 if declared.artifact_root_page.is_some()
1169 || declared.artifact_checksum.is_some()
1170 || declared.artifact_kind.is_some()
1171 {
1172 return "stale".to_string();
1173 }
1174 return "declared-unbuilt".to_string();
1175 }
1176
1177 if rebuilt.entries > 0 {
1178 return "ready".to_string();
1179 }
1180
1181 if matches!(
1182 declared.build_state.as_str(),
1183 "stale" | "artifact-published" | "registry-loaded"
1184 ) {
1185 return "stale".to_string();
1186 }
1187
1188 "declared-unbuilt".to_string()
1189 }
1190}