1use super::*;
2
3impl RedDB {
4 pub fn tree_definitions(&self) -> Vec<crate::physical::PhysicalTreeDefinition> {
5 self.physical_metadata()
6 .map(|metadata| metadata.tree_definitions)
7 .unwrap_or_default()
8 }
9
10 pub fn tree_definition(
11 &self,
12 collection: &str,
13 name: &str,
14 ) -> Option<crate::physical::PhysicalTreeDefinition> {
15 self.tree_definitions()
16 .into_iter()
17 .find(|definition| definition.collection == collection && definition.name == name)
18 }
19
20 pub fn save_tree_definition(
21 &self,
22 definition: crate::physical::PhysicalTreeDefinition,
23 ) -> Result<crate::physical::PhysicalTreeDefinition, Box<dyn std::error::Error>> {
24 self.update_physical_metadata(|metadata| {
25 if let Some(existing) = metadata.tree_definitions.iter_mut().find(|existing| {
26 existing.collection == definition.collection && existing.name == definition.name
27 }) {
28 *existing = definition.clone();
29 } else {
30 metadata.tree_definitions.push(definition.clone());
31 }
32 metadata.tree_definitions.sort_by(|left, right| {
33 left.collection
34 .cmp(&right.collection)
35 .then_with(|| left.name.cmp(&right.name))
36 });
37 definition.clone()
38 })
39 }
40
41 pub fn remove_tree_definition(
42 &self,
43 collection: &str,
44 name: &str,
45 ) -> Result<Option<crate::physical::PhysicalTreeDefinition>, Box<dyn std::error::Error>> {
46 self.update_physical_metadata(|metadata| {
47 metadata
48 .tree_definitions
49 .iter()
50 .position(|definition| {
51 definition.collection == collection && definition.name == name
52 })
53 .map(|index| metadata.tree_definitions.remove(index))
54 })
55 }
56
57 pub fn collection_default_ttl_ms(&self, collection: &str) -> Option<u64> {
58 self.collection_ttl_defaults_ms
59 .read()
60 .ok()
61 .and_then(|defaults| defaults.get(collection).copied())
62 }
63
64 pub fn set_collection_default_ttl_ms(&self, collection: impl Into<String>, ttl_ms: u64) {
65 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
66 defaults.insert(collection.into(), ttl_ms);
67 }
68 }
69
70 pub fn clear_collection_default_ttl_ms(&self, collection: &str) {
71 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
72 defaults.remove(collection);
73 }
74 }
75
76 pub fn collection_contracts(&self) -> Vec<crate::physical::CollectionContract> {
77 self.contract_cache_map()
78 .values()
79 .map(|arc| (**arc).clone())
80 .collect()
81 }
82
83 pub fn collection_contract(
84 &self,
85 collection: &str,
86 ) -> Option<crate::physical::CollectionContract> {
87 self.contract_cache_map()
88 .get(collection)
89 .map(|arc| (**arc).clone())
90 }
91
92 pub fn collection_contract_arc(
97 &self,
98 collection: &str,
99 ) -> Option<std::sync::Arc<crate::physical::CollectionContract>> {
100 self.contract_cache_map()
101 .get(collection)
102 .map(std::sync::Arc::clone)
103 }
104
105 fn contract_cache_map(
110 &self,
111 ) -> std::sync::Arc<
112 std::collections::HashMap<String, std::sync::Arc<crate::physical::CollectionContract>>,
113 > {
114 if let Ok(guard) = self.collection_contract_cache.read() {
115 if let Some(map) = guard.as_ref() {
116 return std::sync::Arc::clone(map);
117 }
118 }
119 let contracts: Vec<crate::physical::CollectionContract> = self
120 .physical_metadata()
121 .map(|metadata| metadata.collection_contracts)
122 .unwrap_or_default();
123 let map: std::collections::HashMap<_, _> = contracts
124 .into_iter()
125 .map(|contract| (contract.name.clone(), std::sync::Arc::new(contract)))
126 .collect();
127 let arc = std::sync::Arc::new(map);
128 if let Ok(mut guard) = self.collection_contract_cache.write() {
129 *guard = Some(std::sync::Arc::clone(&arc));
130 }
131 arc
132 }
133
134 pub(crate) fn invalidate_collection_contract_cache(&self) {
135 if let Ok(mut guard) = self.collection_contract_cache.write() {
136 *guard = None;
137 }
138 }
139
140 pub fn save_collection_contract(
141 &self,
142 contract: crate::physical::CollectionContract,
143 ) -> Result<crate::physical::CollectionContract, Box<dyn std::error::Error>> {
144 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
145 if let Some(ttl_ms) = contract.default_ttl_ms {
146 defaults.insert(contract.name.clone(), ttl_ms);
147 } else {
148 defaults.remove(&contract.name);
149 }
150 }
151
152 self.store()
153 .context_index()
154 .set_collection_enabled(&contract.name, contract.context_index_enabled);
155 if let Some(manager) = self.store().get_collection(&contract.name) {
156 let columns = contract
157 .declared_columns
158 .iter()
159 .map(|column| column.name.clone())
160 .collect();
161 manager.set_column_schema_if_empty(columns);
162 }
163
164 self.invalidate_collection_contract_cache();
165
166 self.update_physical_metadata(|metadata| {
167 if let Some(existing) = metadata
168 .collection_contracts
169 .iter_mut()
170 .find(|existing| existing.name == contract.name)
171 {
172 *existing = contract.clone();
173 } else {
174 metadata.collection_contracts.push(contract.clone());
175 }
176 metadata
177 .collection_contracts
178 .sort_by(|left, right| left.name.cmp(&right.name));
179
180 if let Some(ttl_ms) = contract.default_ttl_ms {
181 metadata
182 .collection_ttl_defaults_ms
183 .insert(contract.name.clone(), ttl_ms);
184 } else {
185 metadata.collection_ttl_defaults_ms.remove(&contract.name);
186 }
187
188 contract.clone()
189 })
190 }
191
192 pub fn remove_collection_contract(
193 &self,
194 collection: &str,
195 ) -> Result<Option<crate::physical::CollectionContract>, Box<dyn std::error::Error>> {
196 if let Ok(mut defaults) = self.collection_ttl_defaults_ms.write() {
197 defaults.remove(collection);
198 }
199
200 self.store()
201 .context_index()
202 .set_collection_enabled(collection, false);
203
204 self.invalidate_collection_contract_cache();
205
206 self.update_physical_metadata(|metadata| {
207 let removed = metadata
208 .collection_contracts
209 .iter()
210 .position(|contract| contract.name == collection)
211 .map(|index| metadata.collection_contracts.remove(index));
212 metadata.collection_ttl_defaults_ms.remove(collection);
213 metadata
214 .indexes
215 .retain(|index| index.collection.as_deref() != Some(collection));
216 removed
217 })
218 }
219
220 pub(crate) fn collection_ttl_defaults_snapshot(&self) -> BTreeMap<String, u64> {
221 self.collection_ttl_defaults_ms
222 .read()
223 .map(|defaults| {
224 defaults
225 .iter()
226 .map(|(collection, ttl_ms)| (collection.clone(), *ttl_ms))
227 .collect()
228 })
229 .unwrap_or_default()
230 }
231
232 pub(crate) fn load_collection_ttl_defaults_from_metadata(&self) {
233 let metadata = self.physical_metadata();
234 let defaults = metadata
235 .as_ref()
236 .map(|m| m.collection_ttl_defaults_ms.clone())
237 .unwrap_or_default();
238
239 if let Ok(mut current) = self.collection_ttl_defaults_ms.write() {
240 current.clear();
241 current.extend(defaults);
242 }
243
244 if let Some(metadata) = metadata {
245 let store = self.store();
246 let index = store.context_index();
247 for contract in &metadata.collection_contracts {
248 index.set_collection_enabled(&contract.name, contract.context_index_enabled);
249 }
250 }
251 }
252
253 pub fn run_maintenance(&self) -> Result<(), Box<dyn std::error::Error>> {
254 self.store.run_maintenance()?;
255 self.persist_metadata()?;
256 Ok(())
257 }
258
259 pub fn metadata_path(&self) -> Option<PathBuf> {
261 self.path
262 .as_ref()
263 .map(|path| PhysicalMetadataFile::metadata_path_for(path))
264 }
265
266 pub fn physical_metadata(&self) -> Option<PhysicalMetadataFile> {
268 self.load_or_bootstrap_physical_metadata(!self.options.read_only)
269 .ok()
270 }
271
272 pub fn physical_indexes(&self) -> Vec<PhysicalIndexState> {
274 let indexes = self
275 .physical_metadata()
276 .map(|metadata| metadata.indexes)
277 .filter(|indexes| !indexes.is_empty())
278 .or_else(|| {
279 self.native_physical_state()
280 .map(|state| self.physical_index_state_from_native_state(&state, None))
281 })
282 .unwrap_or_else(|| self.physical_index_state());
283 self.reconcile_index_states_with_native_artifacts(indexes)
284 }
285
286 pub fn exports(&self) -> Vec<ExportDescriptor> {
288 self.physical_metadata()
289 .map(|metadata| metadata.exports)
290 .or_else(|| {
291 self.native_physical_state()
292 .map(|state| self.exports_from_native_state(&state))
293 })
294 .unwrap_or_default()
295 }
296
297 pub fn snapshots(&self) -> Vec<crate::physical::SnapshotDescriptor> {
299 self.physical_metadata()
300 .map(|metadata| metadata.snapshots)
301 .or_else(|| {
302 self.native_physical_state()
303 .map(|state| self.snapshots_from_native_state(&state))
304 })
305 .unwrap_or_default()
306 }
307
308 pub fn graph_projections(&self) -> Vec<PhysicalGraphProjection> {
310 self.physical_metadata()
311 .map(|metadata| metadata.graph_projections)
312 .or_else(|| {
313 self.native_physical_state()
314 .map(|state| self.graph_projections_from_native_state(&state))
315 })
316 .unwrap_or_default()
317 }
318
319 pub fn declared_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
321 self.catalog_model_snapshot().declared_graph_projections
322 }
323
324 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
326 self.graph_projections()
327 .into_iter()
328 .filter(|projection| {
329 projection.last_materialized_sequence.is_some()
330 || matches!(projection.state.as_str(), "materialized" | "stale")
331 })
332 .collect()
333 }
334
335 pub fn analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
337 self.physical_metadata()
338 .map(|metadata| metadata.analytics_jobs)
339 .or_else(|| {
340 self.native_physical_state()
341 .map(|state| self.analytics_jobs_from_native_state(&state))
342 })
343 .unwrap_or_default()
344 }
345
346 pub fn declared_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
348 self.catalog_model_snapshot().declared_analytics_jobs
349 }
350
351 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
353 self.analytics_jobs()
354 .into_iter()
355 .filter(|job| {
356 job.last_run_sequence.is_some()
357 || matches!(
358 job.state.as_str(),
359 "running" | "completed" | "failed" | "queued" | "stale"
360 )
361 })
362 .collect()
363 }
364
365 pub fn declared_indexes(&self) -> Vec<PhysicalIndexState> {
367 self.catalog_model_snapshot().declared_indexes
368 }
369
370 pub fn operational_indexes(&self) -> Vec<PhysicalIndexState> {
372 self.catalog_model_snapshot().operational_indexes
373 }
374
375 pub fn index_statuses(&self) -> Vec<CatalogIndexStatus> {
377 self.catalog_model_snapshot().index_statuses
378 }
379
380 pub fn index_status(&self, name: &str) -> Option<CatalogIndexStatus> {
382 self.catalog_model_snapshot()
383 .index_statuses
384 .into_iter()
385 .find(|status| status.name == name)
386 }
387
388 pub fn save_graph_projection(
390 &self,
391 name: impl Into<String>,
392 node_labels: Vec<String>,
393 node_types: Vec<String>,
394 edge_labels: Vec<String>,
395 source: impl Into<String>,
396 ) -> Result<PhysicalGraphProjection, Box<dyn std::error::Error>> {
397 let name = name.into();
398 let source = source.into();
399 self.update_physical_metadata(|metadata| {
400 let now = SystemTime::now()
401 .duration_since(UNIX_EPOCH)
402 .unwrap_or_default()
403 .as_millis();
404 let projection = if let Some(existing) = metadata
405 .graph_projections
406 .iter_mut()
407 .find(|projection| projection.name == name)
408 {
409 existing.updated_at_unix_ms = now;
410 existing.state = "declared".to_string();
411 existing.source = source.clone();
412 existing.node_labels = node_labels.clone();
413 existing.node_types = node_types.clone();
414 existing.edge_labels = edge_labels.clone();
415 existing.last_materialized_sequence = None;
416 existing.clone()
417 } else {
418 let projection = PhysicalGraphProjection {
419 name: name.clone(),
420 created_at_unix_ms: now,
421 updated_at_unix_ms: now,
422 state: "declared".to_string(),
423 source: source.clone(),
424 node_labels: node_labels.clone(),
425 node_types: node_types.clone(),
426 edge_labels: edge_labels.clone(),
427 last_materialized_sequence: None,
428 };
429 metadata.graph_projections.push(projection.clone());
430 projection
431 };
432
433 Self::mark_projection_dependent_jobs_stale(metadata, &name, now);
434
435 metadata
436 .graph_projections
437 .sort_by(|left, right| left.name.cmp(&right.name));
438 projection
439 })
440 }
441
442 pub fn materialize_graph_projection(
444 &self,
445 name: &str,
446 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
447 self.update_physical_metadata(|metadata| {
448 let now = SystemTime::now()
449 .duration_since(UNIX_EPOCH)
450 .unwrap_or_default()
451 .as_millis();
452 let idx = metadata
453 .graph_projections
454 .iter()
455 .position(|projection| projection.name == name);
456 if let Some(idx) = idx {
457 metadata.graph_projections[idx].updated_at_unix_ms = now;
458 metadata.graph_projections[idx].state = "materialized".to_string();
459 metadata.graph_projections[idx].last_materialized_sequence =
460 Some(metadata.superblock.sequence);
461 let result = metadata.graph_projections[idx].clone();
462 Self::rearm_projection_dependent_jobs_declared(metadata, name, now);
463 return Some(result);
464 }
465 None
466 })
467 }
468
469 pub fn mark_graph_projection_materializing(
471 &self,
472 name: &str,
473 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
474 self.update_physical_metadata(|metadata| {
475 let now = SystemTime::now()
476 .duration_since(UNIX_EPOCH)
477 .unwrap_or_default()
478 .as_millis();
479 let idx = metadata
480 .graph_projections
481 .iter()
482 .position(|projection| projection.name == name);
483 if let Some(idx) = idx {
484 metadata.graph_projections[idx].updated_at_unix_ms = now;
485 metadata.graph_projections[idx].state = "materializing".to_string();
486 metadata.graph_projections[idx].last_materialized_sequence = None;
487 let result = metadata.graph_projections[idx].clone();
488 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
489 return Some(result);
490 }
491 None
492 })
493 }
494
495 pub fn fail_graph_projection(
497 &self,
498 name: &str,
499 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
500 self.update_physical_metadata(|metadata| {
501 let now = SystemTime::now()
502 .duration_since(UNIX_EPOCH)
503 .unwrap_or_default()
504 .as_millis();
505 let idx = metadata
506 .graph_projections
507 .iter()
508 .position(|projection| projection.name == name);
509 if let Some(idx) = idx {
510 metadata.graph_projections[idx].updated_at_unix_ms = now;
511 metadata.graph_projections[idx].state = "failed".to_string();
512 metadata.graph_projections[idx].last_materialized_sequence = None;
513 let result = metadata.graph_projections[idx].clone();
514 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
515 return Some(result);
516 }
517 None
518 })
519 }
520
521 pub fn mark_graph_projection_stale(
523 &self,
524 name: &str,
525 ) -> Result<Option<PhysicalGraphProjection>, Box<dyn std::error::Error>> {
526 self.update_physical_metadata(|metadata| {
527 let now = SystemTime::now()
528 .duration_since(UNIX_EPOCH)
529 .unwrap_or_default()
530 .as_millis();
531 let idx = metadata
532 .graph_projections
533 .iter()
534 .position(|projection| projection.name == name);
535 if let Some(idx) = idx {
536 metadata.graph_projections[idx].updated_at_unix_ms = now;
537 metadata.graph_projections[idx].state = "stale".to_string();
538 let result = metadata.graph_projections[idx].clone();
539 Self::mark_projection_dependent_jobs_stale(metadata, name, now);
540 return Some(result);
541 }
542 None
543 })
544 }
545
546 fn mark_projection_dependent_jobs_stale(
547 metadata: &mut PhysicalMetadataFile,
548 projection_name: &str,
549 now: u128,
550 ) {
551 for job in metadata.analytics_jobs.iter_mut() {
552 if job.projection.as_deref() == Some(projection_name) && job.state != "declared" {
553 job.state = "stale".to_string();
554 job.updated_at_unix_ms = now;
555 }
556 }
557 }
558
559 fn rearm_projection_dependent_jobs_declared(
560 metadata: &mut PhysicalMetadataFile,
561 projection_name: &str,
562 now: u128,
563 ) {
564 for job in metadata.analytics_jobs.iter_mut() {
565 if job.projection.as_deref() == Some(projection_name) && job.state == "stale" {
566 job.state = "declared".to_string();
567 job.last_run_sequence = None;
568 job.updated_at_unix_ms = now;
569 }
570 }
571 }
572
573 pub fn save_analytics_job(
575 &self,
576 kind: impl Into<String>,
577 projection: Option<String>,
578 metadata_entries: BTreeMap<String, String>,
579 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
580 let kind = kind.into();
581 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
582 if let Some(projection_name) = projection.as_deref() {
583 if !self.graph_projection_is_declared(projection_name) {
584 return Err(format!(
585 "graph projection '{projection_name}' is not declared in physical metadata"
586 )
587 .into());
588 }
589 }
590
591 self.update_physical_metadata(|metadata| {
592 let now = SystemTime::now()
593 .duration_since(UNIX_EPOCH)
594 .unwrap_or_default()
595 .as_millis();
596
597 let job = if let Some(existing) = metadata
598 .analytics_jobs
599 .iter_mut()
600 .find(|job| job.id == job_id)
601 {
602 existing.kind = kind.clone();
603 existing.projection = projection.clone();
604 existing.updated_at_unix_ms = now;
605 existing.metadata = metadata_entries.clone();
606 if existing.last_run_sequence.is_none() {
607 existing.state = "declared".to_string();
608 }
609 existing.clone()
610 } else {
611 let job = PhysicalAnalyticsJob {
612 id: job_id.clone(),
613 kind: kind.clone(),
614 state: "declared".to_string(),
615 projection: projection.clone(),
616 created_at_unix_ms: now,
617 updated_at_unix_ms: now,
618 last_run_sequence: None,
619 metadata: metadata_entries.clone(),
620 };
621 metadata.analytics_jobs.push(job.clone());
622 job
623 };
624
625 metadata
626 .analytics_jobs
627 .sort_by(|left, right| left.id.cmp(&right.id));
628 job
629 })
630 }
631
632 pub fn record_analytics_job(
634 &self,
635 kind: impl Into<String>,
636 projection: Option<String>,
637 metadata_entries: BTreeMap<String, String>,
638 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
639 let kind = kind.into();
640 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
641 if let Some(projection_name) = projection.as_deref() {
642 if !self.graph_projection_is_declared(projection_name) {
643 return Err(format!(
644 "graph projection '{projection_name}' is not declared in physical metadata"
645 )
646 .into());
647 }
648 if !self.graph_projection_is_operational(projection_name) {
649 return Err(format!(
650 "graph projection '{projection_name}' is declared but not operationally materialized"
651 )
652 .into());
653 }
654 }
655
656 self.update_physical_metadata(|metadata| {
657 let now = SystemTime::now()
658 .duration_since(UNIX_EPOCH)
659 .unwrap_or_default()
660 .as_millis();
661
662 let existing = metadata
663 .analytics_jobs
664 .iter_mut()
665 .find(|job| job.id == job_id)?;
666 existing.state = "completed".to_string();
667 existing.kind = kind.clone();
668 existing.projection = projection.clone();
669 existing.updated_at_unix_ms = now;
670 existing.last_run_sequence = Some(metadata.superblock.sequence);
671 existing.metadata = metadata_entries.clone();
672 let job = existing.clone();
673
674 metadata
675 .analytics_jobs
676 .sort_by(|left, right| left.id.cmp(&right.id));
677 Some(job)
678 })
679 .and_then(|job| {
680 job.ok_or_else(|| {
681 format!("analytics job '{job_id}' is not declared in physical metadata").into()
682 })
683 })
684 }
685
686 pub fn queue_analytics_job(
688 &self,
689 kind: impl Into<String>,
690 projection: Option<String>,
691 metadata_entries: BTreeMap<String, String>,
692 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
693 let kind = kind.into();
694 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
695 if let Some(projection_name) = projection.as_deref() {
696 if !self.graph_projection_is_declared(projection_name) {
697 return Err(format!(
698 "graph projection '{projection_name}' is not declared in physical metadata"
699 )
700 .into());
701 }
702 if !self.graph_projection_is_operational(projection_name) {
703 return Err(format!(
704 "graph projection '{projection_name}' is declared but not operationally materialized"
705 )
706 .into());
707 }
708 }
709
710 self.update_physical_metadata(|metadata| {
711 let now = SystemTime::now()
712 .duration_since(UNIX_EPOCH)
713 .unwrap_or_default()
714 .as_millis();
715
716 let existing = metadata
717 .analytics_jobs
718 .iter_mut()
719 .find(|job| job.id == job_id)?;
720 existing.state = "queued".to_string();
721 existing.kind = kind.clone();
722 existing.projection = projection.clone();
723 existing.updated_at_unix_ms = now;
724 existing.metadata = metadata_entries.clone();
725 let job = existing.clone();
726
727 metadata
728 .analytics_jobs
729 .sort_by(|left, right| left.id.cmp(&right.id));
730 Some(job)
731 })
732 .and_then(|job| {
733 job.ok_or_else(|| {
734 format!("analytics job '{job_id}' is not declared in physical metadata").into()
735 })
736 })
737 }
738
739 pub fn start_analytics_job(
741 &self,
742 kind: impl Into<String>,
743 projection: Option<String>,
744 metadata_entries: BTreeMap<String, String>,
745 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
746 let kind = kind.into();
747 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
748 if let Some(projection_name) = projection.as_deref() {
749 if !self.graph_projection_is_declared(projection_name) {
750 return Err(format!(
751 "graph projection '{projection_name}' is not declared in physical metadata"
752 )
753 .into());
754 }
755 if !self.graph_projection_is_operational(projection_name) {
756 return Err(format!(
757 "graph projection '{projection_name}' is declared but not operationally materialized"
758 )
759 .into());
760 }
761 }
762
763 self.update_physical_metadata(|metadata| {
764 let now = SystemTime::now()
765 .duration_since(UNIX_EPOCH)
766 .unwrap_or_default()
767 .as_millis();
768
769 let existing = metadata
770 .analytics_jobs
771 .iter_mut()
772 .find(|job| job.id == job_id)?;
773 existing.state = "running".to_string();
774 existing.kind = kind.clone();
775 existing.projection = projection.clone();
776 existing.updated_at_unix_ms = now;
777 existing.metadata = metadata_entries.clone();
778 let job = existing.clone();
779
780 metadata
781 .analytics_jobs
782 .sort_by(|left, right| left.id.cmp(&right.id));
783 Some(job)
784 })
785 .and_then(|job| {
786 job.ok_or_else(|| {
787 format!("analytics job '{job_id}' is not declared in physical metadata").into()
788 })
789 })
790 }
791
792 pub fn fail_analytics_job(
794 &self,
795 kind: impl Into<String>,
796 projection: Option<String>,
797 metadata_entries: BTreeMap<String, String>,
798 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
799 let kind = kind.into();
800 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
801
802 self.update_physical_metadata(|metadata| {
803 let now = SystemTime::now()
804 .duration_since(UNIX_EPOCH)
805 .unwrap_or_default()
806 .as_millis();
807
808 let existing = metadata
809 .analytics_jobs
810 .iter_mut()
811 .find(|job| job.id == job_id)?;
812 existing.state = "failed".to_string();
813 existing.kind = kind.clone();
814 existing.projection = projection.clone();
815 existing.updated_at_unix_ms = now;
816 existing.metadata = metadata_entries.clone();
817 let job = existing.clone();
818
819 metadata
820 .analytics_jobs
821 .sort_by(|left, right| left.id.cmp(&right.id));
822 Some(job)
823 })
824 .and_then(|job| {
825 job.ok_or_else(|| {
826 format!("analytics job '{job_id}' is not declared in physical metadata").into()
827 })
828 })
829 }
830
831 pub fn mark_analytics_job_stale(
833 &self,
834 kind: impl Into<String>,
835 projection: Option<String>,
836 metadata_entries: BTreeMap<String, String>,
837 ) -> Result<PhysicalAnalyticsJob, Box<dyn std::error::Error>> {
838 let kind = kind.into();
839 let job_id = Self::analytics_job_id(&kind, projection.as_deref());
840
841 self.update_physical_metadata(|metadata| {
842 let now = SystemTime::now()
843 .duration_since(UNIX_EPOCH)
844 .unwrap_or_default()
845 .as_millis();
846
847 let existing = metadata
848 .analytics_jobs
849 .iter_mut()
850 .find(|job| job.id == job_id)?;
851 existing.state = "stale".to_string();
852 existing.kind = kind.clone();
853 existing.projection = projection.clone();
854 existing.updated_at_unix_ms = now;
855 existing.metadata = metadata_entries.clone();
856 let job = existing.clone();
857
858 metadata
859 .analytics_jobs
860 .sort_by(|left, right| left.id.cmp(&right.id));
861 Some(job)
862 })
863 .and_then(|job| {
864 job.ok_or_else(|| {
865 format!("analytics job '{job_id}' is not declared in physical metadata").into()
866 })
867 })
868 }
869
870 pub fn create_named_export(
872 &self,
873 name: impl Into<String>,
874 ) -> Result<ExportDescriptor, Box<dyn std::error::Error>> {
875 let name = name.into();
876 if self.options.mode != StorageMode::Persistent {
877 return Err("exports require persistent mode".into());
878 }
879 let Some(path) = self.path() else {
880 return Err("database path is not available".into());
881 };
882
883 self.flush()?;
884
885 let mut metadata = self.load_or_bootstrap_physical_metadata(true)?;
886 let export_data_path = PhysicalMetadataFile::export_data_path_for(path, &name);
887 let export_metadata_path = PhysicalMetadataFile::metadata_path_for(&export_data_path);
888 let export_metadata_binary_path =
889 PhysicalMetadataFile::metadata_binary_path_for(&export_data_path);
890
891 fs::copy(path, &export_data_path)?;
892
893 let descriptor = ExportDescriptor {
894 name: name.clone(),
895 created_at_unix_ms: SystemTime::now()
896 .duration_since(UNIX_EPOCH)
897 .unwrap_or_default()
898 .as_millis(),
899 snapshot_id: metadata
900 .snapshots
901 .last()
902 .map(|snapshot| snapshot.snapshot_id),
903 superblock_sequence: metadata.superblock.sequence,
904 data_path: export_data_path.display().to_string(),
905 metadata_path: export_metadata_path.display().to_string(),
906 collection_count: metadata.catalog.total_collections,
907 total_entities: metadata.catalog.total_entities,
908 };
909
910 metadata
911 .exports
912 .retain(|export| export.name != descriptor.name);
913 metadata.exports.push(descriptor.clone());
914 self.prune_export_registry(&mut metadata.exports);
915 metadata.save_for_data_path(path)?;
916 metadata.save_to_binary_path(&export_metadata_binary_path)?;
917 metadata.save_to_path(&export_metadata_path)?;
918
919 Ok(descriptor)
920 }
921
922 pub fn set_index_enabled(
924 &self,
925 name: &str,
926 enabled: bool,
927 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
928 let Some(status) = self.index_status(name) else {
929 return Err(format!("index '{name}' is not present in catalog status").into());
930 };
931 if !status.declared {
932 return Err(format!("index '{name}' is not declared in physical metadata").into());
933 }
934 self.update_physical_metadata(|metadata| {
935 let now = SystemTime::now()
936 .duration_since(UNIX_EPOCH)
937 .unwrap_or_default()
938 .as_millis();
939 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
940 index.enabled = enabled;
941 if !enabled {
942 index.build_state = "disabled".to_string();
943 } else if index.build_state == "disabled" {
944 index.build_state = if index.artifact_root_page.is_some() {
945 "ready".to_string()
946 } else {
947 "declared-unbuilt".to_string()
948 };
949 }
950 index.last_refresh_ms = Some(now);
951 return Some(index.clone());
952 }
953 None
954 })
955 }
956
957 pub fn mark_index_building(
959 &self,
960 name: &str,
961 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
962 let Some(status) = self.index_status(name) else {
963 return Err(format!("index '{name}' is not present in catalog status").into());
964 };
965 if !status.declared {
966 return Err(format!("index '{name}' is not declared in physical metadata").into());
967 }
968 if status.lifecycle_state == "disabled" {
969 return Err(format!("index '{name}' is disabled").into());
970 }
971 self.update_physical_metadata(|metadata| {
972 let now = SystemTime::now()
973 .duration_since(UNIX_EPOCH)
974 .unwrap_or_default()
975 .as_millis();
976 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
977 index.build_state = "building".to_string();
978 index.last_refresh_ms = Some(now);
979 return Some(index.clone());
980 }
981 None
982 })
983 }
984
985 pub fn fail_index(
987 &self,
988 name: &str,
989 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
990 let Some(status) = self.index_status(name) else {
991 return Err(format!("index '{name}' is not present in catalog status").into());
992 };
993 if !status.declared {
994 return Err(format!("index '{name}' is not declared in physical metadata").into());
995 }
996 self.update_physical_metadata(|metadata| {
997 let now = SystemTime::now()
998 .duration_since(UNIX_EPOCH)
999 .unwrap_or_default()
1000 .as_millis();
1001 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1002 index.build_state = "failed".to_string();
1003 index.last_refresh_ms = Some(now);
1004 return Some(index.clone());
1005 }
1006 None
1007 })
1008 }
1009
1010 pub fn mark_index_stale(
1012 &self,
1013 name: &str,
1014 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1015 let Some(status) = self.index_status(name) else {
1016 return Err(format!("index '{name}' is not present in catalog status").into());
1017 };
1018 if !status.declared {
1019 return Err(format!("index '{name}' is not declared in physical metadata").into());
1020 }
1021 self.update_physical_metadata(|metadata| {
1022 let now = SystemTime::now()
1023 .duration_since(UNIX_EPOCH)
1024 .unwrap_or_default()
1025 .as_millis();
1026 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1027 index.build_state = if index.enabled {
1028 "stale".to_string()
1029 } else {
1030 "disabled".to_string()
1031 };
1032 index.last_refresh_ms = Some(now);
1033 return Some(index.clone());
1034 }
1035 None
1036 })
1037 }
1038
1039 pub fn mark_index_ready(
1041 &self,
1042 name: &str,
1043 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1044 self.warmup_index(name)
1045 }
1046
1047 pub fn warmup_index(
1049 &self,
1050 name: &str,
1051 ) -> Result<Option<PhysicalIndexState>, Box<dyn std::error::Error>> {
1052 let Some(status) = self.index_status(name) else {
1053 return Err(format!("index '{name}' is not present in catalog status").into());
1054 };
1055 if !status.declared {
1056 return Err(format!("index '{name}' is not declared in physical metadata").into());
1057 }
1058 if status.lifecycle_state == "disabled" {
1059 return Err(format!("index '{name}' is disabled").into());
1060 }
1061 if !status.operational {
1062 return Err(
1063 format!("index '{name}' is declared but not operationally materialized").into(),
1064 );
1065 }
1066 let warmed_artifact = self
1067 .physical_indexes()
1068 .into_iter()
1069 .find(|index| index.name == name)
1070 .map(|mut index| {
1071 self.warmup_native_vector_artifact_for_index(&index)?;
1072 self.apply_runtime_native_artifact_to_index_state(&mut index)?;
1073 Ok::<_, String>(index)
1074 })
1075 .transpose()
1076 .map_err(|err| -> Box<dyn std::error::Error> { err.into() })?;
1077
1078 self.update_physical_metadata(|metadata| {
1079 let now = SystemTime::now()
1080 .duration_since(UNIX_EPOCH)
1081 .unwrap_or_default()
1082 .as_millis();
1083 if let Some(index) = metadata.indexes.iter_mut().find(|index| index.name == name) {
1084 if let Some(warmed) = warmed_artifact.as_ref() {
1085 index.entries = warmed.entries;
1086 index.estimated_memory_bytes = warmed.estimated_memory_bytes;
1087 index.backend = warmed.backend.clone();
1088 index.build_state = "ready".to_string();
1089 }
1090 index.last_refresh_ms = Some(now);
1091 return Some(index.clone());
1092 }
1093 None
1094 })
1095 }
1096
1097 pub fn rebuild_index_registry(
1099 &self,
1100 collection: Option<&str>,
1101 ) -> Result<Vec<PhysicalIndexState>, Box<dyn std::error::Error>> {
1102 let fresh = self.reconcile_index_states_with_native_artifacts(self.physical_index_state());
1103 self.update_physical_metadata(|metadata| {
1104 let now = SystemTime::now()
1105 .duration_since(UNIX_EPOCH)
1106 .unwrap_or_default()
1107 .as_millis();
1108
1109 let mut affected = Vec::new();
1110 let declared = metadata.indexes.clone();
1111 for declared_index in declared {
1112 let matches_collection = collection.is_none_or(|collection_name| {
1113 declared_index.collection.as_deref() == Some(collection_name)
1114 });
1115 if !matches_collection {
1116 continue;
1117 }
1118
1119 let mut rebuilt = fresh
1120 .iter()
1121 .find(|index| index.name == declared_index.name)
1122 .cloned()
1123 .unwrap_or_else(|| {
1124 let mut index = declared_index.clone();
1125 index.build_state = "declared-unbuilt".to_string();
1126 index
1127 });
1128 rebuilt.enabled = declared_index.enabled;
1129 rebuilt.artifact_kind = rebuilt
1130 .artifact_kind
1131 .or_else(|| declared_index.artifact_kind.clone());
1132 rebuilt.artifact_root_page = rebuilt
1133 .artifact_root_page
1134 .or(declared_index.artifact_root_page);
1135 rebuilt.artifact_checksum = rebuilt
1136 .artifact_checksum
1137 .or(declared_index.artifact_checksum);
1138 rebuilt.build_state =
1139 Self::finalize_rebuilt_index_build_state(&declared_index, &rebuilt);
1140 rebuilt.last_refresh_ms = Some(now);
1141
1142 if let Some(existing) = metadata
1143 .indexes
1144 .iter_mut()
1145 .find(|index| index.name == rebuilt.name)
1146 {
1147 *existing = rebuilt.clone();
1148 } else {
1149 metadata.indexes.push(rebuilt.clone());
1150 }
1151
1152 affected.push(rebuilt);
1153 }
1154
1155 affected
1156 })
1157 }
1158
1159 fn finalize_rebuilt_index_build_state(
1160 declared: &PhysicalIndexState,
1161 rebuilt: &PhysicalIndexState,
1162 ) -> String {
1163 if !rebuilt.enabled {
1164 return "disabled".to_string();
1165 }
1166
1167 if declared.build_state == "failed" || rebuilt.build_state == "failed" {
1168 return "failed".to_string();
1169 }
1170
1171 let native_artifact_family = Self::native_artifact_kind_for_index(rebuilt.kind).is_some();
1172 if native_artifact_family {
1173 if rebuilt.artifact_root_page.is_some() && rebuilt.artifact_checksum.is_some() {
1174 return "ready".to_string();
1175 }
1176 if declared.artifact_root_page.is_some()
1177 || declared.artifact_checksum.is_some()
1178 || declared.artifact_kind.is_some()
1179 {
1180 return "stale".to_string();
1181 }
1182 return "declared-unbuilt".to_string();
1183 }
1184
1185 if rebuilt.entries > 0 {
1186 return "ready".to_string();
1187 }
1188
1189 if matches!(
1190 declared.build_state.as_str(),
1191 "stale" | "artifact-published" | "registry-loaded"
1192 ) {
1193 return "stale".to_string();
1194 }
1195
1196 "declared-unbuilt".to_string()
1197 }
1198}