1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum CollectionModel {
18 Table,
19 Document,
20 Graph,
21 Vector,
22 Hll,
23 Sketch,
24 Filter,
25 Kv,
26 Config,
27 Vault,
28 Mixed,
29 TimeSeries,
30 Queue,
31 Metrics,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum SchemaMode {
36 Strict,
37 SemiStructured,
38 Dynamic,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum SubscriptionOperation {
43 Insert,
44 Update,
45 Delete,
46}
47
48impl SubscriptionOperation {
49 pub fn as_str(self) -> &'static str {
50 match self {
51 Self::Insert => "INSERT",
52 Self::Update => "UPDATE",
53 Self::Delete => "DELETE",
54 }
55 }
56
57 pub fn from_str(value: &str) -> Option<Self> {
58 match value.to_ascii_uppercase().as_str() {
59 "INSERT" => Some(Self::Insert),
60 "UPDATE" => Some(Self::Update),
61 "DELETE" => Some(Self::Delete),
62 _ => None,
63 }
64 }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SubscriptionDescriptor {
69 pub name: String,
71 pub source: String,
72 pub target_queue: String,
73 pub ops_filter: Vec<SubscriptionOperation>,
74 pub where_filter: Option<String>,
75 pub redact_fields: Vec<String>,
76 pub enabled: bool,
77 pub all_tenants: bool,
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
92pub enum AnalyticsOutput {
93 Communities,
94 Components,
95 Centrality,
96}
97
98impl AnalyticsOutput {
99 pub fn as_str(self) -> &'static str {
100 match self {
101 Self::Communities => "communities",
102 Self::Components => "components",
103 Self::Centrality => "centrality",
104 }
105 }
106
107 pub fn from_str(value: &str) -> Option<Self> {
108 match value.to_ascii_lowercase().as_str() {
109 "communities" => Some(Self::Communities),
110 "components" => Some(Self::Components),
111 "centrality" => Some(Self::Centrality),
112 _ => None,
113 }
114 }
115}
116
117#[derive(Debug, Clone, PartialEq)]
129pub struct AnalyticsViewDescriptor {
130 pub output: AnalyticsOutput,
131 pub algorithm: Option<String>,
135 pub resolution: Option<f64>,
137 pub max_iterations: Option<i64>,
139 pub tolerance: Option<f64>,
141}
142
143#[derive(Debug, Clone)]
144pub struct CollectionDescriptor {
145 pub name: String,
146 pub model: CollectionModel,
147 pub schema_mode: SchemaMode,
148 pub contract_present: bool,
149 pub contract_origin: Option<crate::physical::ContractOrigin>,
150 pub declared_model: Option<CollectionModel>,
151 pub observed_model: CollectionModel,
152 pub queue_mode: Option<QueueMode>,
153 pub queue_max_attempts: Option<u32>,
157 pub queue_lock_deadline_ms: Option<u64>,
159 pub queue_in_flight_cap_per_group: Option<u32>,
161 pub queue_dlq_target: Option<String>,
163 pub vector_dimension: Option<usize>,
164 pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
165 pub session_key: Option<String>,
169 pub session_gap_ms: Option<u64>,
172 pub retention_duration_ms: Option<u64>,
176 pub declared_schema_mode: Option<SchemaMode>,
177 pub observed_schema_mode: SchemaMode,
178 pub entities: usize,
179 pub cross_refs: usize,
180 pub segments: usize,
181 pub indices: Vec<String>,
182 pub declared_indices: Vec<String>,
183 pub operational_indices: Vec<String>,
184 pub indexes_in_sync: bool,
185 pub missing_operational_indices: Vec<String>,
186 pub undeclared_operational_indices: Vec<String>,
187 pub queryable_index_count: usize,
188 pub indexes_requiring_rebuild_count: usize,
189 pub queryable_graph_projection_count: usize,
190 pub graph_projections_requiring_rematerialization_count: usize,
191 pub executable_analytics_job_count: usize,
192 pub analytics_jobs_requiring_rerun_count: usize,
193 pub subscriptions: Vec<SubscriptionDescriptor>,
194 pub analytics_config: Vec<AnalyticsViewDescriptor>,
198 pub resources_in_sync: bool,
199 pub attention_required: bool,
200 pub attention_score: usize,
201 pub attention_reasons: Vec<String>,
202}
203
204#[derive(Debug, Clone)]
205pub struct CatalogModelSnapshot {
206 pub summary: CatalogSnapshot,
207 pub collections: Vec<CollectionDescriptor>,
208 pub indices: IndexCatalogSnapshot,
209 pub declared_indexes: Vec<PhysicalIndexState>,
210 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
211 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
212 pub operational_indexes: Vec<PhysicalIndexState>,
213 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
214 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
215 pub index_statuses: Vec<CatalogIndexStatus>,
216 pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
217 pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
218 pub queryable_index_count: usize,
219 pub indexes_requiring_rebuild_count: usize,
220 pub queryable_graph_projection_count: usize,
221 pub graph_projections_requiring_rematerialization_count: usize,
222 pub executable_analytics_job_count: usize,
223 pub analytics_jobs_requiring_rerun_count: usize,
224}
225
226#[derive(Debug, Clone)]
227pub struct CatalogAttentionSummary {
228 pub collections_requiring_attention: usize,
229 pub indexes_requiring_attention: usize,
230 pub graph_projections_requiring_attention: usize,
231 pub analytics_jobs_requiring_attention: usize,
232 pub top_collection: Option<CollectionDescriptor>,
233 pub top_index: Option<CatalogIndexStatus>,
234 pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
235 pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
236}
237
238#[derive(Debug, Clone, Default)]
239pub struct CatalogDeclarations {
240 pub declared_indexes: Vec<PhysicalIndexState>,
241 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
242 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
243 pub operational_indexes: Vec<PhysicalIndexState>,
244 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
245 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
246}
247
248#[derive(Debug, Clone)]
249pub struct CatalogGraphProjectionStatus {
250 pub name: String,
251 pub source: Option<String>,
252 pub lifecycle_state: String,
253 pub declared: bool,
254 pub operational: bool,
255 pub in_sync: bool,
256 pub last_materialized_sequence: Option<u64>,
257 pub queryable: bool,
258 pub requires_rematerialization: bool,
259 pub dependent_job_count: usize,
260 pub active_dependent_job_count: usize,
261 pub stale_dependent_job_count: usize,
262 pub dependent_jobs_in_sync: bool,
263 pub rerun_required: bool,
264 pub attention_score: usize,
265 pub attention_reasons: Vec<String>,
266}
267
268#[derive(Debug, Clone)]
269pub struct CatalogIndexStatus {
270 pub name: String,
271 pub collection: Option<String>,
272 pub kind: String,
273 pub declared: bool,
274 pub operational: bool,
275 pub enabled: bool,
276 pub build_state: Option<String>,
277 pub artifact_state: crate::physical::ArtifactState,
278 pub lifecycle_state: String,
279 pub in_sync: bool,
280 pub queryable: bool,
281 pub requires_rebuild: bool,
282 pub attention_score: usize,
283 pub attention_reasons: Vec<String>,
284}
285
286#[derive(Debug, Clone)]
287pub struct CatalogAnalyticsJobStatus {
288 pub id: String,
289 pub kind: String,
290 pub projection: Option<String>,
291 pub state: String,
292 pub lifecycle_state: String,
293 pub declared: bool,
294 pub operational: bool,
295 pub in_sync: bool,
296 pub last_run_sequence: Option<u64>,
297 pub projection_declared: Option<bool>,
298 pub projection_operational: Option<bool>,
299 pub projection_lifecycle_state: Option<String>,
300 pub dependency_in_sync: Option<bool>,
301 pub executable: bool,
302 pub requires_rerun: bool,
303 pub attention_score: usize,
304 pub attention_reasons: Vec<String>,
305}
306
307#[derive(Debug, Clone, Default)]
308pub struct CatalogConsistencyReport {
309 pub declared_index_count: usize,
310 pub operational_index_count: usize,
311 pub declared_graph_projection_count: usize,
312 pub operational_graph_projection_count: usize,
313 pub declared_analytics_job_count: usize,
314 pub operational_analytics_job_count: usize,
315 pub missing_operational_indexes: Vec<String>,
316 pub undeclared_operational_indexes: Vec<String>,
317 pub missing_operational_graph_projections: Vec<String>,
318 pub undeclared_operational_graph_projections: Vec<String>,
319 pub missing_operational_analytics_jobs: Vec<String>,
320 pub undeclared_operational_analytics_jobs: Vec<String>,
321}
322
323pub fn snapshot_store(
324 name: &str,
325 store: &UnifiedStore,
326 index_catalog: Option<&IndexCatalog>,
327) -> CatalogModelSnapshot {
328 snapshot_store_with_declarations(name, store, index_catalog, None, None)
329}
330
331pub fn snapshot_store_with_declarations(
332 name: &str,
333 store: &UnifiedStore,
334 index_catalog: Option<&IndexCatalog>,
335 declarations: Option<&CatalogDeclarations>,
336 contracts: Option<&[CollectionContract]>,
337) -> CatalogModelSnapshot {
338 let index_statuses = index_statuses(declarations);
339 let graph_projection_statuses = graph_projection_statuses(declarations);
340 let analytics_job_statuses = analytics_job_statuses(declarations);
341
342 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
343 for (collection, entity) in store.query_all(|_| true) {
344 grouped.entry(collection).or_default().push(entity);
345 }
346 let queue_policies = queue_policies_from_grouped(&grouped);
347
348 let mut stats_by_collection = BTreeMap::new();
349 let mut collections = Vec::new();
350
351 for collection_name in store.list_collections() {
352 let entities = grouped.remove(&collection_name).unwrap_or_default();
353 let inferred_model = infer_model(&entities);
354 let inferred_schema_mode = infer_schema_mode(inferred_model);
355 let contract = collection_contract(&collection_name, contracts);
356 let model = contract
357 .map(|contract| contract.declared_model)
358 .unwrap_or(inferred_model);
359 let cross_refs = entities
360 .iter()
361 .map(|entity| entity.cross_refs().len())
362 .sum();
363 let entity_count = entities.len();
364 let manager_stats = store
365 .get_collection(&collection_name)
366 .map(|manager| manager.stats());
367 let segments = manager_stats
368 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
369 .unwrap_or(0);
370
371 stats_by_collection.insert(
372 collection_name.clone(),
373 CollectionStats {
374 entities: entity_count,
375 cross_refs,
376 segments,
377 },
378 );
379
380 let declared_indices = declared_indices(&collection_name, declarations);
381 let operational_indices =
382 operational_indices(&collection_name, index_catalog, declarations);
383 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
384 collection_index_consistency(&declared_indices, &operational_indices);
385 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
386 collection_index_readiness(&collection_name, &index_statuses);
387 let (
388 queryable_graph_projection_count,
389 graph_projections_requiring_rematerialization_count,
390 graph_projections_locally_in_sync,
391 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
392 let (
393 executable_analytics_job_count,
394 analytics_jobs_requiring_rerun_count,
395 analytics_jobs_locally_in_sync,
396 ) = collection_analytics_job_readiness(
397 &collection_name,
398 &graph_projection_statuses,
399 &analytics_job_statuses,
400 );
401 let resources_in_sync = indexes_in_sync
402 && indexes_locally_in_sync
403 && graph_projections_locally_in_sync
404 && analytics_jobs_locally_in_sync;
405 let attention_required = !resources_in_sync
406 || indexes_requiring_rebuild_count > 0
407 || graph_projections_requiring_rematerialization_count > 0
408 || analytics_jobs_requiring_rerun_count > 0;
409 let mut attention_reasons = Vec::new();
410 if !indexes_in_sync || !indexes_locally_in_sync {
411 attention_reasons.push("index_drift".to_string());
412 }
413 if indexes_requiring_rebuild_count > 0 {
414 attention_reasons.push("indexes_require_rebuild".to_string());
415 }
416 if !graph_projections_locally_in_sync {
417 attention_reasons.push("graph_projection_drift".to_string());
418 }
419 if graph_projections_requiring_rematerialization_count > 0 {
420 attention_reasons.push("graph_projections_require_rematerialization".to_string());
421 }
422 if !analytics_jobs_locally_in_sync {
423 attention_reasons.push("analytics_job_drift".to_string());
424 }
425 if analytics_jobs_requiring_rerun_count > 0 {
426 attention_reasons.push("analytics_jobs_require_rerun".to_string());
427 }
428 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
429 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
430 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
431 + usize::from(!resources_in_sync);
432
433 collections.push(CollectionDescriptor {
434 name: collection_name.clone(),
435 model,
436 schema_mode: contract
437 .map(|contract| contract.schema_mode)
438 .unwrap_or(inferred_schema_mode),
439 contract_present: contract.is_some(),
440 contract_origin: contract.map(|contract| contract.origin),
441 declared_model: contract.map(|contract| contract.declared_model),
442 observed_model: inferred_model,
443 queue_mode: if model == CollectionModel::Queue {
444 Some(
445 queue_policies
446 .get(&collection_name)
447 .map(|p| p.mode)
448 .unwrap_or_default(),
449 )
450 } else {
451 None
452 },
453 queue_max_attempts: if model == CollectionModel::Queue {
454 Some(
455 queue_policies
456 .get(&collection_name)
457 .map(|p| p.max_attempts)
458 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
459 )
460 } else {
461 None
462 },
463 queue_lock_deadline_ms: if model == CollectionModel::Queue {
464 Some(
465 queue_policies
466 .get(&collection_name)
467 .map(|p| p.lock_deadline_ms)
468 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
469 )
470 } else {
471 None
472 },
473 queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
474 Some(
475 queue_policies
476 .get(&collection_name)
477 .map(|p| p.in_flight_cap_per_group)
478 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
479 )
480 } else {
481 None
482 },
483 queue_dlq_target: if model == CollectionModel::Queue {
484 queue_policies
485 .get(&collection_name)
486 .and_then(|p| p.dlq_target.clone())
487 } else {
488 None
489 },
490 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
491 vector_metric: contract.and_then(|contract| contract.vector_metric),
492 session_key: contract.and_then(|contract| contract.session_key.clone()),
493 session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
494 retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
495 declared_schema_mode: contract.map(|contract| contract.schema_mode),
496 observed_schema_mode: inferred_schema_mode,
497 entities: entity_count,
498 cross_refs,
499 segments,
500 indices: infer_indices(&collection_name, model, index_catalog, declarations),
501 declared_indices,
502 operational_indices,
503 indexes_in_sync,
504 missing_operational_indices,
505 undeclared_operational_indices,
506 queryable_index_count,
507 indexes_requiring_rebuild_count,
508 queryable_graph_projection_count,
509 graph_projections_requiring_rematerialization_count,
510 executable_analytics_job_count,
511 analytics_jobs_requiring_rerun_count,
512 subscriptions: contract
513 .map(|contract| contract.subscriptions.clone())
514 .unwrap_or_default(),
515 analytics_config: contract
516 .map(|contract| contract.analytics_config.clone())
517 .unwrap_or_default(),
518 resources_in_sync,
519 attention_required,
520 attention_score,
521 attention_reasons,
522 });
523 }
524
525 collections.sort_by(|left, right| left.name.cmp(&right.name));
526
527 let summary = CatalogSnapshot {
528 name: name.to_string(),
529 total_entities: stats_by_collection
530 .values()
531 .map(|stats| stats.entities)
532 .sum(),
533 total_collections: stats_by_collection.len(),
534 stats_by_collection,
535 updated_at: SystemTime::now(),
536 };
537
538 CatalogModelSnapshot {
539 summary,
540 collections,
541 indices: index_catalog
542 .map(IndexCatalog::snapshot)
543 .unwrap_or_default(),
544 declared_indexes: declarations
545 .map(|declarations| declarations.declared_indexes.clone())
546 .unwrap_or_default(),
547 declared_graph_projections: declarations
548 .map(|declarations| declarations.declared_graph_projections.clone())
549 .unwrap_or_default(),
550 declared_analytics_jobs: declarations
551 .map(|declarations| declarations.declared_analytics_jobs.clone())
552 .unwrap_or_default(),
553 operational_indexes: declarations
554 .map(|declarations| declarations.operational_indexes.clone())
555 .unwrap_or_default(),
556 operational_graph_projections: declarations
557 .map(|declarations| declarations.operational_graph_projections.clone())
558 .unwrap_or_default(),
559 operational_analytics_jobs: declarations
560 .map(|declarations| declarations.operational_analytics_jobs.clone())
561 .unwrap_or_default(),
562 queryable_index_count: index_statuses
563 .iter()
564 .filter(|status| status.queryable)
565 .count(),
566 indexes_requiring_rebuild_count: index_statuses
567 .iter()
568 .filter(|status| status.requires_rebuild)
569 .count(),
570 queryable_graph_projection_count: graph_projection_statuses
571 .iter()
572 .filter(|status| status.queryable)
573 .count(),
574 graph_projections_requiring_rematerialization_count: graph_projection_statuses
575 .iter()
576 .filter(|status| status.requires_rematerialization)
577 .count(),
578 executable_analytics_job_count: analytics_job_statuses
579 .iter()
580 .filter(|status| status.executable)
581 .count(),
582 analytics_jobs_requiring_rerun_count: analytics_job_statuses
583 .iter()
584 .filter(|status| status.requires_rerun)
585 .count(),
586 index_statuses,
587 graph_projection_statuses,
588 analytics_job_statuses,
589 }
590}
591
592#[derive(Debug, Clone)]
596pub struct QueuePolicySnapshot {
597 pub mode: QueueMode,
598 pub max_attempts: u32,
599 pub lock_deadline_ms: u64,
600 pub in_flight_cap_per_group: u32,
601 pub dlq_target: Option<String>,
602}
603
604fn queue_policies_from_grouped(
605 grouped: &HashMap<String, Vec<UnifiedEntity>>,
606) -> HashMap<String, QueuePolicySnapshot> {
607 let Some(meta) = grouped.get("red_queue_meta") else {
608 return HashMap::new();
609 };
610
611 let mut policies = HashMap::new();
612 for entity in meta {
613 let Some(row) = entity.data.as_row() else {
614 continue;
615 };
616 if row_text(row, "kind").as_deref() != Some("queue_config") {
617 continue;
618 }
619 let Some(queue) = row_text(row, "queue") else {
620 continue;
621 };
622 let mode = row_text(row, "mode")
623 .as_deref()
624 .and_then(QueueMode::parse)
625 .unwrap_or_default();
626 let max_attempts = row_u64(row, "max_attempts")
627 .map(|v| v as u32)
628 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
629 let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
630 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
631 let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
632 .map(|v| v as u32)
633 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
634 let dlq_target = row_text(row, "dlq");
635 policies.insert(
636 queue,
637 QueuePolicySnapshot {
638 mode,
639 max_attempts,
640 lock_deadline_ms,
641 in_flight_cap_per_group,
642 dlq_target,
643 },
644 );
645 }
646 policies
647}
648
649fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
650 match row.get_field(key) {
651 Some(Value::UnsignedInteger(v)) => Some(*v),
652 Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
653 _ => None,
654 }
655}
656
657fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
658 match row.get_field(key) {
659 Some(Value::Text(value)) => Some(value.to_string()),
660 _ => None,
661 }
662}
663
664fn collection_contract<'a>(
665 collection_name: &str,
666 contracts: Option<&'a [CollectionContract]>,
667) -> Option<&'a CollectionContract> {
668 contracts.and_then(|contracts| {
669 contracts
670 .iter()
671 .find(|contract| contract.name == collection_name)
672 })
673}
674
675fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
676 let mut has_table = false;
677 let mut has_graph = false;
678 let mut has_vector = false;
679
680 for entity in entities {
681 match &entity.kind {
682 EntityKind::TableRow { .. } => has_table = true,
683 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
684 EntityKind::Vector { .. } => has_vector = true,
685 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
686 }
687 }
688
689 match (has_table, has_graph, has_vector) {
690 (true, false, false) => CollectionModel::Table,
691 (false, true, false) => CollectionModel::Graph,
692 (false, false, true) => CollectionModel::Vector,
693 _ => CollectionModel::Mixed,
694 }
695}
696
697fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
698 match model {
699 CollectionModel::Table => SchemaMode::Strict,
700 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
701 CollectionModel::Document
702 | CollectionModel::Hll
703 | CollectionModel::Sketch
704 | CollectionModel::Filter
705 | CollectionModel::Kv
706 | CollectionModel::Config
707 | CollectionModel::Vault
708 | CollectionModel::Mixed => SchemaMode::Dynamic,
709 CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
710 CollectionModel::Queue => SchemaMode::Dynamic,
711 }
712}
713
714fn infer_indices(
715 collection_name: &str,
716 model: CollectionModel,
717 index_catalog: Option<&IndexCatalog>,
718 declarations: Option<&CatalogDeclarations>,
719) -> Vec<String> {
720 let declared = declared_indices(collection_name, declarations);
721 if !declared.is_empty() {
722 return declared;
723 }
724
725 let available = index_catalog
726 .map(IndexCatalog::snapshot)
727 .unwrap_or_default();
728 let mut selected = Vec::new();
729
730 for metric in available {
731 let relevant = matches!(
732 (model, metric.kind),
733 (_, IndexKind::BTree)
734 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
735 | (CollectionModel::Vector, IndexKind::VectorHnsw)
736 | (CollectionModel::Vector, IndexKind::VectorInverted)
737 | (CollectionModel::Vector, IndexKind::VectorTurbo)
738 | (CollectionModel::Document, IndexKind::FullText)
739 | (CollectionModel::Document, IndexKind::DocumentPathValue)
740 | (CollectionModel::Kv, IndexKind::Hash)
741 | (CollectionModel::Config, IndexKind::Hash)
742 | (CollectionModel::Vault, IndexKind::Hash)
743 | (CollectionModel::Mixed, _)
744 );
745
746 if relevant && metric.enabled {
747 selected.push(metric.name);
748 }
749 }
750
751 selected
752}
753
754fn declared_indices(
755 collection_name: &str,
756 declarations: Option<&CatalogDeclarations>,
757) -> Vec<String> {
758 let mut selected = declarations
759 .map(|declarations| {
760 declarations
761 .declared_indexes
762 .iter()
763 .filter(|index| index.collection.as_deref() == Some(collection_name))
764 .map(|index| index.name.clone())
765 .collect::<Vec<_>>()
766 })
767 .unwrap_or_default();
768 selected.sort();
769 selected.dedup();
770 selected
771}
772
773fn operational_indices(
774 collection_name: &str,
775 index_catalog: Option<&IndexCatalog>,
776 declarations: Option<&CatalogDeclarations>,
777) -> Vec<String> {
778 if let Some(declarations) = declarations {
779 let mut selected = declarations
780 .operational_indexes
781 .iter()
782 .filter(|index| index.collection.as_deref() == Some(collection_name))
783 .map(|index| index.name.clone())
784 .collect::<Vec<_>>();
785 selected.sort();
786 selected.dedup();
787 return selected;
788 }
789
790 let mut selected = index_catalog
791 .map(IndexCatalog::snapshot)
792 .unwrap_or_default()
793 .into_iter()
794 .filter(|metric| metric.enabled)
795 .filter_map(|metric| {
796 if metric.name.starts_with(collection_name) {
797 Some(metric.name)
798 } else {
799 None
800 }
801 })
802 .collect::<Vec<_>>();
803 selected.sort();
804 selected.dedup();
805 selected
806}
807
808fn collection_index_consistency(
809 declared_indices: &[String],
810 operational_indices: &[String],
811) -> (bool, Vec<String>, Vec<String>) {
812 let declared = declared_indices.iter().cloned().collect::<BTreeSet<_>>();
813 let operational = operational_indices.iter().cloned().collect::<BTreeSet<_>>();
814 let missing_operational_indices = declared
815 .difference(&operational)
816 .cloned()
817 .collect::<Vec<_>>();
818 let undeclared_operational_indices = operational
819 .difference(&declared)
820 .cloned()
821 .collect::<Vec<_>>();
822 (
823 missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty(),
824 missing_operational_indices,
825 undeclared_operational_indices,
826 )
827}
828
829fn collection_index_readiness(
830 collection_name: &str,
831 statuses: &[CatalogIndexStatus],
832) -> (usize, usize, bool) {
833 let local = statuses
834 .iter()
835 .filter(|status| status.collection.as_deref() == Some(collection_name))
836 .collect::<Vec<_>>();
837 let queryable_count = local.iter().filter(|status| status.queryable).count();
838 let requires_rebuild_count = local
839 .iter()
840 .filter(|status| status.requires_rebuild)
841 .count();
842 let locally_in_sync = local.iter().all(|status| status.in_sync);
843 (queryable_count, requires_rebuild_count, locally_in_sync)
844}
845
846fn collection_graph_projection_readiness(
847 collection_name: &str,
848 statuses: &[CatalogGraphProjectionStatus],
849) -> (usize, usize, bool) {
850 let local = statuses
851 .iter()
852 .filter(|status| status.source.as_deref() == Some(collection_name))
853 .collect::<Vec<_>>();
854 let queryable_count = local.iter().filter(|status| status.queryable).count();
855 let requires_rematerialization_count = local
856 .iter()
857 .filter(|status| status.requires_rematerialization)
858 .count();
859 let locally_in_sync = local
860 .iter()
861 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
862 (
863 queryable_count,
864 requires_rematerialization_count,
865 locally_in_sync,
866 )
867}
868
869fn collection_analytics_job_readiness(
870 collection_name: &str,
871 projection_statuses: &[CatalogGraphProjectionStatus],
872 job_statuses: &[CatalogAnalyticsJobStatus],
873) -> (usize, usize, bool) {
874 let local = job_statuses
875 .iter()
876 .filter(|status| {
877 let Some(projection_name) = status.projection.as_deref() else {
878 return false;
879 };
880 projection_statuses
881 .iter()
882 .find(|projection| projection.name == projection_name)
883 .and_then(|projection| projection.source.as_deref())
884 == Some(collection_name)
885 })
886 .collect::<Vec<_>>();
887 let executable_count = local.iter().filter(|status| status.executable).count();
888 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
889 let locally_in_sync = local
890 .iter()
891 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
892 (executable_count, requires_rerun_count, locally_in_sync)
893}
894
895fn graph_projection_statuses(
896 declarations: Option<&CatalogDeclarations>,
897) -> Vec<CatalogGraphProjectionStatus> {
898 let declared = declarations
899 .map(|declarations| declarations.declared_graph_projections.as_slice())
900 .unwrap_or(&[]);
901 let operational = declarations
902 .map(|declarations| declarations.operational_graph_projections.as_slice())
903 .unwrap_or(&[]);
904 let declared_jobs = declarations
905 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
906 .unwrap_or(&[]);
907 let operational_jobs = declarations
908 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
909 .unwrap_or(&[]);
910
911 let mut names = declared
912 .iter()
913 .map(|projection| projection.name.clone())
914 .chain(operational.iter().map(|projection| projection.name.clone()))
915 .collect::<Vec<_>>();
916 names.sort();
917 names.dedup();
918
919 names
920 .into_iter()
921 .map(|name| {
922 let declared_projection = declared.iter().find(|projection| projection.name == name);
923 let operational_projection = operational
924 .iter()
925 .find(|projection| projection.name == name);
926 let declared_present = declared_projection.is_some();
927 let operational_present = operational_projection.is_some();
928 let mut dependent_job_ids = BTreeSet::new();
929 let mut dependent_job_count = 0usize;
930 let mut active_dependent_job_count = 0usize;
931 let mut stale_dependent_job_count = 0usize;
932 for job in declared_jobs
933 .iter()
934 .chain(operational_jobs.iter())
935 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
936 {
937 if !dependent_job_ids.insert(job.id.clone()) {
938 continue;
939 }
940 dependent_job_count += 1;
941 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
942 active_dependent_job_count += 1;
943 }
944 if job.state == "stale" {
945 stale_dependent_job_count += 1;
946 }
947 }
948 let lifecycle_state = match (
949 declared_present,
950 operational_present,
951 declared_projection
952 .map(|projection| projection.state.as_str())
953 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
954 .unwrap_or_default(),
955 ) {
956 (true, _, "failed") => "failed",
957 (true, true, "stale") => "stale",
958 (true, _, "materializing") => "materializing",
959 (true, true, "materialized") => "materialized",
960 (true, true, _) => "materialized",
961 (false, true, _) => "orphaned-operational",
962 (true, false, _) => "declared",
963 (false, false, _) => "unknown",
964 }
965 .to_string();
966 let queryable = declared_present
967 && operational_present
968 && matches!(
969 declared_projection
970 .map(|projection| projection.state.as_str())
971 .or_else(
972 || operational_projection.map(|projection| projection.state.as_str())
973 )
974 .unwrap_or_default(),
975 "materialized"
976 );
977 let requires_rematerialization = matches!(
978 declared_projection
979 .map(|projection| projection.state.as_str())
980 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
981 .unwrap_or_default(),
982 "declared" | "materializing" | "failed" | "stale"
983 );
984 let mut attention_reasons = Vec::new();
985 if !declared_present || !operational_present {
986 attention_reasons.push("declaration_drift".to_string());
987 }
988 if requires_rematerialization {
989 attention_reasons.push("requires_rematerialization".to_string());
990 }
991 if stale_dependent_job_count > 0 {
992 attention_reasons.push("dependent_jobs_stale".to_string());
993 }
994 let attention_score = stale_dependent_job_count.saturating_mul(2)
995 + usize::from(requires_rematerialization).saturating_mul(4)
996 + usize::from(!declared_present || !operational_present)
997 + usize::from(!queryable);
998 CatalogGraphProjectionStatus {
999 name,
1000 source: declared_projection
1001 .map(|projection| projection.source.clone())
1002 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
1003 lifecycle_state,
1004 declared: declared_present,
1005 operational: operational_present,
1006 in_sync: declared_present == operational_present,
1007 last_materialized_sequence: declared_projection
1008 .and_then(|projection| projection.last_materialized_sequence)
1009 .or_else(|| {
1010 operational_projection
1011 .and_then(|projection| projection.last_materialized_sequence)
1012 }),
1013 queryable,
1014 requires_rematerialization,
1015 dependent_job_count,
1016 active_dependent_job_count,
1017 stale_dependent_job_count,
1018 dependent_jobs_in_sync: stale_dependent_job_count == 0,
1019 rerun_required: stale_dependent_job_count > 0,
1020 attention_score,
1021 attention_reasons,
1022 }
1023 })
1024 .collect()
1025}
1026
1027fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
1028 let declared = declarations
1029 .map(|declarations| declarations.declared_indexes.as_slice())
1030 .unwrap_or(&[]);
1031 let operational = declarations
1032 .map(|declarations| declarations.operational_indexes.as_slice())
1033 .unwrap_or(&[]);
1034
1035 let mut names = declared
1036 .iter()
1037 .map(|index| index.name.clone())
1038 .chain(operational.iter().map(|index| index.name.clone()))
1039 .collect::<Vec<_>>();
1040 names.sort();
1041 names.dedup();
1042
1043 names
1044 .into_iter()
1045 .map(|name| {
1046 let declared_index = declared.iter().find(|index| index.name == name);
1047 let operational_index = operational.iter().find(|index| index.name == name);
1048 let collection = declared_index
1049 .and_then(|index| index.collection.clone())
1050 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
1051 let kind = declared_index
1052 .map(|index| index_kind_string(index.kind))
1053 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
1054 .unwrap_or_default();
1055 let enabled = declared_index
1056 .map(|index| index.enabled)
1057 .or_else(|| operational_index.map(|index| index.enabled))
1058 .unwrap_or(false);
1059 let build_state = operational_index
1060 .map(|index| index.build_state.clone())
1061 .or_else(|| declared_index.map(|index| index.build_state.clone()));
1062 let declared_present = declared_index.is_some();
1063 let operational_present = operational_index.is_some();
1064 let lifecycle_state = index_lifecycle_state(
1065 declared_present,
1066 operational_present,
1067 enabled,
1068 build_state.as_deref(),
1069 );
1070
1071 let mut attention_reasons = Vec::new();
1072 if declared_present != operational_present {
1073 attention_reasons.push("declaration_drift".to_string());
1074 }
1075 if !enabled && declared_present {
1076 attention_reasons.push("disabled".to_string());
1077 }
1078 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1079 attention_reasons.push("failed".to_string());
1080 }
1081 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1082 attention_reasons.push("stale".to_string());
1083 }
1084 if matches!(
1085 build_state.as_deref().unwrap_or_default(),
1086 "building" | "declared-unbuilt"
1087 ) {
1088 attention_reasons.push("requires_rebuild".to_string());
1089 }
1090 let queryable = declared_present
1091 && operational_present
1092 && enabled
1093 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1094 let requires_rebuild = matches!(
1095 build_state.as_deref().unwrap_or_default(),
1096 "declared-unbuilt" | "building" | "stale" | "failed"
1097 );
1098 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1099 + usize::from(!enabled && declared_present)
1100 + usize::from(declared_present != operational_present)
1101 + usize::from(!queryable);
1102
1103 let artifact_state = crate::physical::ArtifactState::from_build_state(
1104 build_state.as_deref().unwrap_or("declared-unbuilt"),
1105 enabled,
1106 );
1107 CatalogIndexStatus {
1108 name,
1109 collection,
1110 kind,
1111 declared: declared_present,
1112 operational: operational_present,
1113 enabled,
1114 build_state,
1115 artifact_state,
1116 lifecycle_state,
1117 in_sync: declared_present == operational_present,
1118 queryable,
1119 requires_rebuild,
1120 attention_score,
1121 attention_reasons,
1122 }
1123 })
1124 .collect()
1125}
1126
1127fn index_lifecycle_state(
1128 declared: bool,
1129 operational: bool,
1130 enabled: bool,
1131 build_state: Option<&str>,
1132) -> String {
1133 if !declared && operational {
1134 return "orphaned-operational".to_string();
1135 }
1136 if declared && !enabled {
1137 return "disabled".to_string();
1138 }
1139 if !declared {
1140 return "unknown".to_string();
1141 }
1142 if !operational {
1143 return "declared".to_string();
1144 }
1145
1146 match build_state.unwrap_or_default() {
1147 "ready" => "ready".to_string(),
1148 "failed" => "failed".to_string(),
1149 "stale" => "stale".to_string(),
1150 "declared-unbuilt" => "declared".to_string(),
1151 "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
1152 "building".to_string()
1153 }
1154 _ => "building".to_string(),
1155 }
1156}
1157
1158fn index_kind_string(kind: IndexKind) -> String {
1159 match kind {
1160 IndexKind::BTree => "btree",
1161 IndexKind::Hash => "hash",
1162 IndexKind::Bitmap => "bitmap",
1163 IndexKind::Spatial => "spatial.rtree",
1164 IndexKind::VectorHnsw => "vector.hnsw",
1165 IndexKind::VectorInverted => "vector.inverted",
1166 IndexKind::VectorTurbo => "vector.turbo",
1167 IndexKind::GraphAdjacency => "graph.adjacency",
1168 IndexKind::FullText => "text.fulltext",
1169 IndexKind::DocumentPathValue => "document.pathvalue",
1170 IndexKind::HybridSearch => "search.hybrid",
1171 }
1172 .to_string()
1173}
1174
1175fn analytics_job_statuses(
1176 declarations: Option<&CatalogDeclarations>,
1177) -> Vec<CatalogAnalyticsJobStatus> {
1178 let declared = declarations
1179 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1180 .unwrap_or(&[]);
1181 let operational = declarations
1182 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1183 .unwrap_or(&[]);
1184 let declared_projections = declarations
1185 .map(|declarations| declarations.declared_graph_projections.as_slice())
1186 .unwrap_or(&[]);
1187 let operational_projections = declarations
1188 .map(|declarations| declarations.operational_graph_projections.as_slice())
1189 .unwrap_or(&[]);
1190
1191 let mut ids = declared
1192 .iter()
1193 .map(|job| job.id.clone())
1194 .chain(operational.iter().map(|job| job.id.clone()))
1195 .collect::<Vec<_>>();
1196 ids.sort();
1197 ids.dedup();
1198
1199 ids.into_iter()
1200 .map(|id| {
1201 let declared_job = declared.iter().find(|job| job.id == id);
1202 let operational_job = operational.iter().find(|job| job.id == id);
1203 let kind = declared_job
1204 .map(|job| job.kind.clone())
1205 .or_else(|| operational_job.map(|job| job.kind.clone()))
1206 .unwrap_or_default();
1207 let projection = declared_job
1208 .and_then(|job| job.projection.clone())
1209 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1210 let state = declared_job
1211 .map(|job| job.state.clone())
1212 .or_else(|| operational_job.map(|job| job.state.clone()))
1213 .unwrap_or_default();
1214 let declared_present = declared_job.is_some();
1215 let operational_present = operational_job.is_some();
1216 let last_run_sequence = declared_job
1217 .and_then(|job| job.last_run_sequence)
1218 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1219 let projection_declared = projection.as_ref().map(|projection_name| {
1220 declared_projections
1221 .iter()
1222 .any(|projection| projection.name == *projection_name)
1223 });
1224 let projection_operational = projection.as_ref().map(|projection_name| {
1225 operational_projections
1226 .iter()
1227 .any(|projection| projection.name == *projection_name)
1228 });
1229 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1230 declared_projections
1231 .iter()
1232 .find(|projection| projection.name == *projection_name)
1233 .map(|projection| projection.state.as_str())
1234 .or_else(|| {
1235 operational_projections
1236 .iter()
1237 .find(|projection| projection.name == *projection_name)
1238 .map(|projection| projection.state.as_str())
1239 })
1240 });
1241 let dependency_in_sync = projection.as_ref().map(|_| {
1242 matches!(projection_lifecycle, Some("materialized"))
1243 && projection_operational == Some(true)
1244 });
1245 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1246 (true, false, _) => "declared",
1247 (true, true, _)
1248 if matches!(
1249 projection_lifecycle,
1250 Some("stale" | "failed" | "materializing" | "declared")
1251 ) =>
1252 {
1253 "stale"
1254 }
1255 (true, true, "completed") => "completed",
1256 (true, true, "running") => "running",
1257 (true, true, "failed") => "failed",
1258 (true, true, "queued") => "queued",
1259 (true, true, "stale") => "stale",
1260 (true, true, _) => "materialized",
1261 (false, true, _) => "orphaned-operational",
1262 (false, false, _) => "unknown",
1263 }
1264 .to_string();
1265 let executable = declared_present
1266 && operational_present
1267 && !matches!(state.as_str(), "failed" | "stale")
1268 && dependency_in_sync.unwrap_or(true);
1269 let requires_rerun =
1270 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1271 let mut attention_reasons = Vec::new();
1272 if declared_present != operational_present {
1273 attention_reasons.push("declaration_drift".to_string());
1274 }
1275 if matches!(state.as_str(), "failed") {
1276 attention_reasons.push("failed".to_string());
1277 }
1278 if matches!(state.as_str(), "stale") {
1279 attention_reasons.push("stale".to_string());
1280 }
1281 if dependency_in_sync == Some(false) {
1282 attention_reasons.push("dependency_drift".to_string());
1283 }
1284 if requires_rerun {
1285 attention_reasons.push("requires_rerun".to_string());
1286 }
1287 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1288 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1289 + usize::from(declared_present != operational_present)
1290 + usize::from(!executable);
1291 CatalogAnalyticsJobStatus {
1292 id,
1293 kind,
1294 projection,
1295 state: state.clone(),
1296 lifecycle_state,
1297 declared: declared_present,
1298 operational: operational_present,
1299 in_sync: declared_present == operational_present,
1300 last_run_sequence,
1301 projection_declared,
1302 projection_operational,
1303 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1304 dependency_in_sync,
1305 executable,
1306 requires_rerun,
1307 attention_score,
1308 attention_reasons,
1309 }
1310 })
1311 .collect()
1312}
1313
1314pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1315 let declared_indexes = snapshot
1316 .declared_indexes
1317 .iter()
1318 .map(|index| index.name.clone())
1319 .collect::<BTreeSet<_>>();
1320 let operational_indexes = snapshot
1321 .operational_indexes
1322 .iter()
1323 .map(|index| index.name.clone())
1324 .collect::<BTreeSet<_>>();
1325 let declared_graph_projections = snapshot
1326 .declared_graph_projections
1327 .iter()
1328 .map(|projection| projection.name.clone())
1329 .collect::<BTreeSet<_>>();
1330 let operational_graph_projections = snapshot
1331 .operational_graph_projections
1332 .iter()
1333 .map(|projection| projection.name.clone())
1334 .collect::<BTreeSet<_>>();
1335 let declared_analytics_jobs = snapshot
1336 .declared_analytics_jobs
1337 .iter()
1338 .map(|job| job.id.clone())
1339 .collect::<BTreeSet<_>>();
1340 let operational_analytics_jobs = snapshot
1341 .operational_analytics_jobs
1342 .iter()
1343 .map(|job| job.id.clone())
1344 .collect::<BTreeSet<_>>();
1345
1346 CatalogConsistencyReport {
1347 declared_index_count: declared_indexes.len(),
1348 operational_index_count: operational_indexes.len(),
1349 declared_graph_projection_count: declared_graph_projections.len(),
1350 operational_graph_projection_count: operational_graph_projections.len(),
1351 declared_analytics_job_count: declared_analytics_jobs.len(),
1352 operational_analytics_job_count: operational_analytics_jobs.len(),
1353 missing_operational_indexes: declared_indexes
1354 .difference(&operational_indexes)
1355 .cloned()
1356 .collect(),
1357 undeclared_operational_indexes: operational_indexes
1358 .difference(&declared_indexes)
1359 .cloned()
1360 .collect(),
1361 missing_operational_graph_projections: declared_graph_projections
1362 .difference(&operational_graph_projections)
1363 .cloned()
1364 .collect(),
1365 undeclared_operational_graph_projections: operational_graph_projections
1366 .difference(&declared_graph_projections)
1367 .cloned()
1368 .collect(),
1369 missing_operational_analytics_jobs: declared_analytics_jobs
1370 .difference(&operational_analytics_jobs)
1371 .cloned()
1372 .collect(),
1373 undeclared_operational_analytics_jobs: operational_analytics_jobs
1374 .difference(&declared_analytics_jobs)
1375 .cloned()
1376 .collect(),
1377 }
1378}
1379
1380pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1381 CatalogAttentionSummary {
1382 collections_requiring_attention: snapshot
1383 .collections
1384 .iter()
1385 .filter(|collection| collection.attention_required)
1386 .count(),
1387 indexes_requiring_attention: snapshot
1388 .index_statuses
1389 .iter()
1390 .filter(|status| status.attention_score > 0)
1391 .count(),
1392 graph_projections_requiring_attention: snapshot
1393 .graph_projection_statuses
1394 .iter()
1395 .filter(|status| status.attention_score > 0)
1396 .count(),
1397 analytics_jobs_requiring_attention: snapshot
1398 .analytics_job_statuses
1399 .iter()
1400 .filter(|status| status.attention_score > 0)
1401 .count(),
1402 top_collection: snapshot
1403 .collections
1404 .iter()
1405 .filter(|collection| collection.attention_score > 0)
1406 .max_by_key(|collection| collection.attention_score)
1407 .cloned(),
1408 top_index: snapshot
1409 .index_statuses
1410 .iter()
1411 .filter(|status| status.attention_score > 0)
1412 .max_by_key(|status| status.attention_score)
1413 .cloned(),
1414 top_graph_projection: snapshot
1415 .graph_projection_statuses
1416 .iter()
1417 .filter(|status| status.attention_score > 0)
1418 .max_by_key(|status| status.attention_score)
1419 .cloned(),
1420 top_analytics_job: snapshot
1421 .analytics_job_statuses
1422 .iter()
1423 .filter(|status| status.attention_score > 0)
1424 .max_by_key(|status| status.attention_score)
1425 .cloned(),
1426 }
1427}
1428
1429pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1430 let mut collections = snapshot
1431 .collections
1432 .iter()
1433 .filter(|collection| collection.attention_required)
1434 .cloned()
1435 .collect::<Vec<_>>();
1436 collections.sort_by(|left, right| {
1437 right
1438 .attention_score
1439 .cmp(&left.attention_score)
1440 .then_with(|| left.name.cmp(&right.name))
1441 });
1442 collections
1443}
1444
1445pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1446 let mut statuses = snapshot
1447 .index_statuses
1448 .iter()
1449 .filter(|status| status.attention_score > 0)
1450 .cloned()
1451 .collect::<Vec<_>>();
1452 statuses.sort_by(|left, right| {
1453 right
1454 .attention_score
1455 .cmp(&left.attention_score)
1456 .then_with(|| left.name.cmp(&right.name))
1457 });
1458 statuses
1459}
1460
1461pub fn graph_projection_attention(
1462 snapshot: &CatalogModelSnapshot,
1463) -> Vec<CatalogGraphProjectionStatus> {
1464 let mut statuses = snapshot
1465 .graph_projection_statuses
1466 .iter()
1467 .filter(|status| status.attention_score > 0)
1468 .cloned()
1469 .collect::<Vec<_>>();
1470 statuses.sort_by(|left, right| {
1471 right
1472 .attention_score
1473 .cmp(&left.attention_score)
1474 .then_with(|| left.name.cmp(&right.name))
1475 });
1476 statuses
1477}
1478
1479pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1480 let mut statuses = snapshot
1481 .analytics_job_statuses
1482 .iter()
1483 .filter(|status| status.attention_score > 0)
1484 .cloned()
1485 .collect::<Vec<_>>();
1486 statuses.sort_by(|left, right| {
1487 right
1488 .attention_score
1489 .cmp(&left.attention_score)
1490 .then_with(|| left.id.cmp(&right.id))
1491 });
1492 statuses
1493}