1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Data model attributed to a collection.
///
/// The same enum carries both the model declared by a collection contract and
/// the model inferred from the entities actually stored in the collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    /// Also the result of model inference whenever the stored entities are not
    /// exclusively table rows, graph elements, or vectors.
    Mixed,
    TimeSeries,
    Queue,
    Metrics,
}
33
/// Schema mode attributed to a collection, either taken from its contract or
/// inferred from its [`CollectionModel`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    /// Inferred for `Table` collections.
    Strict,
    /// Inferred for `Graph`, `Vector`, `TimeSeries` and `Metrics` collections.
    SemiStructured,
    /// Inferred for every other model, including `Mixed` and `Queue`.
    Dynamic,
}
40
/// Operation kinds that can appear in a subscription's `ops_filter`.
///
/// The textual forms are `INSERT`, `UPDATE` and `DELETE`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}
47
48impl SubscriptionOperation {
49 pub fn as_str(self) -> &'static str {
50 match self {
51 Self::Insert => "INSERT",
52 Self::Update => "UPDATE",
53 Self::Delete => "DELETE",
54 }
55 }
56
57 pub fn from_str(value: &str) -> Option<Self> {
58 match value.to_ascii_uppercase().as_str() {
59 "INSERT" => Some(Self::Insert),
60 "UPDATE" => Some(Self::Update),
61 "DELETE" => Some(Self::Delete),
62 _ => None,
63 }
64 }
65}
66
/// Description of a subscription attached to a collection contract.
///
/// `snapshot_store_with_declarations` copies these from the matching
/// contract's `subscriptions` into `CollectionDescriptor::subscriptions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    /// Subscription name.
    pub name: String,
    // NOTE(review): presumably the collection whose changes are observed — confirm.
    pub source: String,
    // NOTE(review): presumably the queue collection that receives events — confirm.
    pub target_queue: String,
    /// Operations listed in the subscription's filter.
    // NOTE(review): meaning of an empty list is not visible here — confirm.
    pub ops_filter: Vec<SubscriptionOperation>,
    /// Optional filter expression, stored as text.
    pub where_filter: Option<String>,
    // NOTE(review): presumably field names removed from emitted payloads — confirm.
    pub redact_fields: Vec<String>,
    pub enabled: bool,
    pub all_tenants: bool,
}
83
/// Per-collection entry of a [`CatalogModelSnapshot`].
///
/// Field descriptions below reflect how `snapshot_store_with_declarations`
/// populates the struct.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as listed by the store.
    pub name: String,
    /// Effective model: the contract's declared model, else the inferred one.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's, else the one inferred from the
    /// observed model.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, if any.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the stored entities.
    pub observed_model: CollectionModel,
    /// Queue settings. The four fields below are `Some` only when `model` is
    /// `Queue`; values come from the stored queue policy or fall back to the
    /// crate defaults.
    pub queue_mode: Option<QueueMode>,
    pub queue_max_attempts: Option<u32>,
    pub queue_lock_deadline_ms: Option<u64>,
    pub queue_in_flight_cap_per_group: Option<u32>,
    /// Dead-letter target from the stored queue policy; `None` for non-queue
    /// collections or when the policy has none.
    pub queue_dlq_target: Option<String>,
    /// Contract-provided settings; `None` without a contract.
    pub vector_dimension: Option<usize>,
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    pub session_key: Option<String>,
    pub session_gap_ms: Option<u64>,
    pub retention_duration_ms: Option<u64>,
    /// Schema mode declared by the contract, if any.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode inferred from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities stored in the collection.
    pub entities: usize,
    /// Sum of the cross-reference counts of all entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment count (0 when the collection
    /// manager is unavailable).
    pub segments: usize,
    /// Declared index names when any exist; otherwise the enabled catalog
    /// indexes whose kind is relevant to `model`.
    pub indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, de-duplicated names of operational indexes for this collection.
    pub operational_indices: Vec<String>,
    /// True when the declared and operational name sets are equal.
    pub indexes_in_sync: bool,
    /// Declared but not operational.
    pub missing_operational_indices: Vec<String>,
    /// Operational but not declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Readiness counters restricted to resources tied to this collection.
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions copied from the contract (empty without one).
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// True when index names match and every local index, graph projection
    /// and analytics job status reports itself in sync.
    pub resources_in_sync: bool,
    /// True when `resources_in_sync` is false or any rebuild /
    /// rematerialization / rerun counter is non-zero.
    pub attention_required: bool,
    /// 3 per index needing rebuild + 4 per projection needing
    /// rematerialization + 2 per job needing rerun + 1 if out of sync.
    pub attention_score: usize,
    /// Machine-readable reason tags such as `index_drift`.
    pub attention_reasons: Vec<String>,
}
140
/// Complete catalog snapshot produced by `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-wide totals and per-collection stats, stamped with the time the
    /// snapshot was built.
    pub summary: CatalogSnapshot,
    /// One descriptor per collection, sorted by name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the live index catalog (default value when none was given).
    pub indices: IndexCatalogSnapshot,
    /// Copies of the lists supplied through [`CatalogDeclarations`]; all six
    /// are empty when no declarations were given.
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Derived per-resource statuses.
    pub index_statuses: Vec<CatalogIndexStatus>,
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    /// Store-wide counters: number of statuses with the corresponding flag set.
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
}
162
/// Aggregated view of catalog resources that need attention.
// NOTE(review): the code that builds this summary is outside the reviewed
// portion of the file; the field notes below are inferred from names and
// types only — confirm against the producer.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    // Presumably counts of resources flagged as requiring attention.
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    // Presumably the highest-scoring entry of each kind, if any.
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
174
/// Declared and operational resource lists fed into the snapshot builders.
///
/// The status builders pair entries of the `declared_*` and `operational_*`
/// lists by name (by `id` for analytics-job dependency counting) to detect
/// drift between the two.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
184
/// Derived status of one graph projection, as built by
/// `graph_projection_statuses`.
///
/// "State" below means the declared projection's state string, or the
/// operational one's when the projection is not declared.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name (unique within the status list).
    pub name: String,
    /// Source taken from the declared entry, else from the operational entry.
    pub source: Option<String>,
    /// One of `failed`, `stale`, `materializing`, `materialized`,
    /// `orphaned-operational`, `declared`, `unknown`.
    pub lifecycle_state: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// `declared == operational`.
    pub in_sync: bool,
    /// Declared entry's value, falling back to the operational entry's.
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational, and state is `materialized`.
    pub queryable: bool,
    /// State is `declared`, `materializing`, `failed` or `stale`.
    pub requires_rematerialization: bool,
    /// Distinct analytics jobs (by id, declared or operational) that name
    /// this projection.
    pub dependent_job_count: usize,
    /// Dependent jobs whose state is `queued`, `running` or `completed`.
    pub active_dependent_job_count: usize,
    /// Dependent jobs whose state is `stale`.
    pub stale_dependent_job_count: usize,
    /// True when no dependent job is stale.
    pub dependent_jobs_in_sync: bool,
    /// True when at least one dependent job is stale.
    pub rerun_required: bool,
    /// 2 per stale dependent job + 4 if rematerialization is required
    /// + 1 if declared/operational drift + 1 if not queryable.
    pub attention_score: usize,
    /// Reason tags: `declaration_drift`, `requires_rematerialization`,
    /// `dependent_jobs_stale`.
    pub attention_reasons: Vec<String>,
}
204
/// Derived status of one index.
// NOTE(review): the builder (`index_statuses`) is only partly visible in the
// reviewed portion of the file; the notes below describe the values it
// computes and assume each lands in the like-named field — confirm.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    /// Index name.
    pub name: String,
    /// Collection from the declared entry, else from the operational entry.
    pub collection: Option<String>,
    /// Index kind rendered as a string.
    pub kind: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// Declared entry's flag, else the operational entry's, else `false`.
    pub enabled: bool,
    /// Operational entry's build state, else the declared entry's.
    pub build_state: Option<String>,
    pub artifact_state: crate::physical::ArtifactState,
    pub lifecycle_state: String,
    pub in_sync: bool,
    /// Declared, operational, enabled, and build state is `ready`.
    pub queryable: bool,
    /// Build state is `declared-unbuilt`, `building`, `stale` or `failed`.
    pub requires_rebuild: bool,
    /// 3 if a rebuild is required + 1 if declared but disabled
    /// + 1 if declared/operational drift + 1 if not queryable.
    pub attention_score: usize,
    /// Reason tags: `declaration_drift`, `disabled`, `failed`, `stale`,
    /// `requires_rebuild`.
    pub attention_reasons: Vec<String>,
}
222
/// Derived status of one analytics job.
// NOTE(review): the builder (`analytics_job_statuses`) is outside the reviewed
// portion of the file; only the fields read by visible code are described.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    pub id: String,
    pub kind: String,
    /// Name of the graph projection the job depends on, if any. Jobs without
    /// a projection are never attributed to a collection.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// `None` is treated as "in sync" by the per-collection readiness check.
    pub dependency_in_sync: Option<bool>,
    /// Counted into the `executable_analytics_job_count` totals.
    pub executable: bool,
    /// Counted into the `analytics_jobs_requiring_rerun_count` totals.
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
243
/// Declared-versus-operational consistency figures for the whole catalog.
// NOTE(review): the code that fills this report is outside the reviewed
// portion of the file. By analogy with the per-collection fields,
// `missing_operational_*` presumably lists names that are declared but not
// operational and `undeclared_operational_*` the reverse — confirm.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
259
260pub fn snapshot_store(
261 name: &str,
262 store: &UnifiedStore,
263 index_catalog: Option<&IndexCatalog>,
264) -> CatalogModelSnapshot {
265 snapshot_store_with_declarations(name, store, index_catalog, None, None)
266}
267
268pub fn snapshot_store_with_declarations(
269 name: &str,
270 store: &UnifiedStore,
271 index_catalog: Option<&IndexCatalog>,
272 declarations: Option<&CatalogDeclarations>,
273 contracts: Option<&[CollectionContract]>,
274) -> CatalogModelSnapshot {
275 let index_statuses = index_statuses(declarations);
276 let graph_projection_statuses = graph_projection_statuses(declarations);
277 let analytics_job_statuses = analytics_job_statuses(declarations);
278
279 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
280 for (collection, entity) in store.query_all(|_| true) {
281 grouped.entry(collection).or_default().push(entity);
282 }
283 let queue_policies = queue_policies_from_grouped(&grouped);
284
285 let mut stats_by_collection = BTreeMap::new();
286 let mut collections = Vec::new();
287
288 for collection_name in store.list_collections() {
289 let entities = grouped.remove(&collection_name).unwrap_or_default();
290 let inferred_model = infer_model(&entities);
291 let inferred_schema_mode = infer_schema_mode(inferred_model);
292 let contract = collection_contract(&collection_name, contracts);
293 let model = contract
294 .map(|contract| contract.declared_model)
295 .unwrap_or(inferred_model);
296 let cross_refs = entities
297 .iter()
298 .map(|entity| entity.cross_refs().len())
299 .sum();
300 let entity_count = entities.len();
301 let manager_stats = store
302 .get_collection(&collection_name)
303 .map(|manager| manager.stats());
304 let segments = manager_stats
305 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
306 .unwrap_or(0);
307
308 stats_by_collection.insert(
309 collection_name.clone(),
310 CollectionStats {
311 entities: entity_count,
312 cross_refs,
313 segments,
314 },
315 );
316
317 let declared_indices = declared_indices(&collection_name, declarations);
318 let operational_indices =
319 operational_indices(&collection_name, index_catalog, declarations);
320 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
321 collection_index_consistency(&declared_indices, &operational_indices);
322 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
323 collection_index_readiness(&collection_name, &index_statuses);
324 let (
325 queryable_graph_projection_count,
326 graph_projections_requiring_rematerialization_count,
327 graph_projections_locally_in_sync,
328 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
329 let (
330 executable_analytics_job_count,
331 analytics_jobs_requiring_rerun_count,
332 analytics_jobs_locally_in_sync,
333 ) = collection_analytics_job_readiness(
334 &collection_name,
335 &graph_projection_statuses,
336 &analytics_job_statuses,
337 );
338 let resources_in_sync = indexes_in_sync
339 && indexes_locally_in_sync
340 && graph_projections_locally_in_sync
341 && analytics_jobs_locally_in_sync;
342 let attention_required = !resources_in_sync
343 || indexes_requiring_rebuild_count > 0
344 || graph_projections_requiring_rematerialization_count > 0
345 || analytics_jobs_requiring_rerun_count > 0;
346 let mut attention_reasons = Vec::new();
347 if !indexes_in_sync || !indexes_locally_in_sync {
348 attention_reasons.push("index_drift".to_string());
349 }
350 if indexes_requiring_rebuild_count > 0 {
351 attention_reasons.push("indexes_require_rebuild".to_string());
352 }
353 if !graph_projections_locally_in_sync {
354 attention_reasons.push("graph_projection_drift".to_string());
355 }
356 if graph_projections_requiring_rematerialization_count > 0 {
357 attention_reasons.push("graph_projections_require_rematerialization".to_string());
358 }
359 if !analytics_jobs_locally_in_sync {
360 attention_reasons.push("analytics_job_drift".to_string());
361 }
362 if analytics_jobs_requiring_rerun_count > 0 {
363 attention_reasons.push("analytics_jobs_require_rerun".to_string());
364 }
365 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
366 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
367 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
368 + usize::from(!resources_in_sync);
369
370 collections.push(CollectionDescriptor {
371 name: collection_name.clone(),
372 model,
373 schema_mode: contract
374 .map(|contract| contract.schema_mode)
375 .unwrap_or(inferred_schema_mode),
376 contract_present: contract.is_some(),
377 contract_origin: contract.map(|contract| contract.origin),
378 declared_model: contract.map(|contract| contract.declared_model),
379 observed_model: inferred_model,
380 queue_mode: if model == CollectionModel::Queue {
381 Some(
382 queue_policies
383 .get(&collection_name)
384 .map(|p| p.mode)
385 .unwrap_or_default(),
386 )
387 } else {
388 None
389 },
390 queue_max_attempts: if model == CollectionModel::Queue {
391 Some(
392 queue_policies
393 .get(&collection_name)
394 .map(|p| p.max_attempts)
395 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
396 )
397 } else {
398 None
399 },
400 queue_lock_deadline_ms: if model == CollectionModel::Queue {
401 Some(
402 queue_policies
403 .get(&collection_name)
404 .map(|p| p.lock_deadline_ms)
405 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
406 )
407 } else {
408 None
409 },
410 queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
411 Some(
412 queue_policies
413 .get(&collection_name)
414 .map(|p| p.in_flight_cap_per_group)
415 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
416 )
417 } else {
418 None
419 },
420 queue_dlq_target: if model == CollectionModel::Queue {
421 queue_policies
422 .get(&collection_name)
423 .and_then(|p| p.dlq_target.clone())
424 } else {
425 None
426 },
427 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
428 vector_metric: contract.and_then(|contract| contract.vector_metric),
429 session_key: contract.and_then(|contract| contract.session_key.clone()),
430 session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
431 retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
432 declared_schema_mode: contract.map(|contract| contract.schema_mode),
433 observed_schema_mode: inferred_schema_mode,
434 entities: entity_count,
435 cross_refs,
436 segments,
437 indices: infer_indices(&collection_name, model, index_catalog, declarations),
438 declared_indices,
439 operational_indices,
440 indexes_in_sync,
441 missing_operational_indices,
442 undeclared_operational_indices,
443 queryable_index_count,
444 indexes_requiring_rebuild_count,
445 queryable_graph_projection_count,
446 graph_projections_requiring_rematerialization_count,
447 executable_analytics_job_count,
448 analytics_jobs_requiring_rerun_count,
449 subscriptions: contract
450 .map(|contract| contract.subscriptions.clone())
451 .unwrap_or_default(),
452 resources_in_sync,
453 attention_required,
454 attention_score,
455 attention_reasons,
456 });
457 }
458
459 collections.sort_by(|left, right| left.name.cmp(&right.name));
460
461 let summary = CatalogSnapshot {
462 name: name.to_string(),
463 total_entities: stats_by_collection
464 .values()
465 .map(|stats| stats.entities)
466 .sum(),
467 total_collections: stats_by_collection.len(),
468 stats_by_collection,
469 updated_at: SystemTime::now(),
470 };
471
472 CatalogModelSnapshot {
473 summary,
474 collections,
475 indices: index_catalog
476 .map(IndexCatalog::snapshot)
477 .unwrap_or_default(),
478 declared_indexes: declarations
479 .map(|declarations| declarations.declared_indexes.clone())
480 .unwrap_or_default(),
481 declared_graph_projections: declarations
482 .map(|declarations| declarations.declared_graph_projections.clone())
483 .unwrap_or_default(),
484 declared_analytics_jobs: declarations
485 .map(|declarations| declarations.declared_analytics_jobs.clone())
486 .unwrap_or_default(),
487 operational_indexes: declarations
488 .map(|declarations| declarations.operational_indexes.clone())
489 .unwrap_or_default(),
490 operational_graph_projections: declarations
491 .map(|declarations| declarations.operational_graph_projections.clone())
492 .unwrap_or_default(),
493 operational_analytics_jobs: declarations
494 .map(|declarations| declarations.operational_analytics_jobs.clone())
495 .unwrap_or_default(),
496 queryable_index_count: index_statuses
497 .iter()
498 .filter(|status| status.queryable)
499 .count(),
500 indexes_requiring_rebuild_count: index_statuses
501 .iter()
502 .filter(|status| status.requires_rebuild)
503 .count(),
504 queryable_graph_projection_count: graph_projection_statuses
505 .iter()
506 .filter(|status| status.queryable)
507 .count(),
508 graph_projections_requiring_rematerialization_count: graph_projection_statuses
509 .iter()
510 .filter(|status| status.requires_rematerialization)
511 .count(),
512 executable_analytics_job_count: analytics_job_statuses
513 .iter()
514 .filter(|status| status.executable)
515 .count(),
516 analytics_jobs_requiring_rerun_count: analytics_job_statuses
517 .iter()
518 .filter(|status| status.requires_rerun)
519 .count(),
520 index_statuses,
521 graph_projection_statuses,
522 analytics_job_statuses,
523 }
524}
525
/// Queue policy read from a `queue_config` row of the `red_queue_meta`
/// collection.
///
/// Row fields that are absent or of an unexpected type fall back to the
/// crate-level queue defaults (or to `None` for the dead-letter target).
#[derive(Debug, Clone)]
pub struct QueuePolicySnapshot {
    /// Parsed from the row's `mode` text.
    pub mode: QueueMode,
    /// From the row's `max_attempts` field.
    pub max_attempts: u32,
    /// From the row's `lock_deadline_ms` field.
    pub lock_deadline_ms: u64,
    /// From the row's `in_flight_cap_per_group` field.
    pub in_flight_cap_per_group: u32,
    /// From the row's `dlq` text field, if present.
    pub dlq_target: Option<String>,
}
537
538fn queue_policies_from_grouped(
539 grouped: &HashMap<String, Vec<UnifiedEntity>>,
540) -> HashMap<String, QueuePolicySnapshot> {
541 let Some(meta) = grouped.get("red_queue_meta") else {
542 return HashMap::new();
543 };
544
545 let mut policies = HashMap::new();
546 for entity in meta {
547 let Some(row) = entity.data.as_row() else {
548 continue;
549 };
550 if row_text(row, "kind").as_deref() != Some("queue_config") {
551 continue;
552 }
553 let Some(queue) = row_text(row, "queue") else {
554 continue;
555 };
556 let mode = row_text(row, "mode")
557 .as_deref()
558 .and_then(QueueMode::parse)
559 .unwrap_or_default();
560 let max_attempts = row_u64(row, "max_attempts")
561 .map(|v| v as u32)
562 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
563 let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
564 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
565 let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
566 .map(|v| v as u32)
567 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
568 let dlq_target = row_text(row, "dlq");
569 policies.insert(
570 queue,
571 QueuePolicySnapshot {
572 mode,
573 max_attempts,
574 lock_deadline_ms,
575 in_flight_cap_per_group,
576 dlq_target,
577 },
578 );
579 }
580 policies
581}
582
583fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
584 match row.get_field(key) {
585 Some(Value::UnsignedInteger(v)) => Some(*v),
586 Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
587 _ => None,
588 }
589}
590
591fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
592 match row.get_field(key) {
593 Some(Value::Text(value)) => Some(value.to_string()),
594 _ => None,
595 }
596}
597
598fn collection_contract<'a>(
599 collection_name: &str,
600 contracts: Option<&'a [CollectionContract]>,
601) -> Option<&'a CollectionContract> {
602 contracts.and_then(|contracts| {
603 contracts
604 .iter()
605 .find(|contract| contract.name == collection_name)
606 })
607}
608
609fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
610 let mut has_table = false;
611 let mut has_graph = false;
612 let mut has_vector = false;
613
614 for entity in entities {
615 match &entity.kind {
616 EntityKind::TableRow { .. } => has_table = true,
617 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
618 EntityKind::Vector { .. } => has_vector = true,
619 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
620 }
621 }
622
623 match (has_table, has_graph, has_vector) {
624 (true, false, false) => CollectionModel::Table,
625 (false, true, false) => CollectionModel::Graph,
626 (false, false, true) => CollectionModel::Vector,
627 _ => CollectionModel::Mixed,
628 }
629}
630
631fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
632 match model {
633 CollectionModel::Table => SchemaMode::Strict,
634 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
635 CollectionModel::Document
636 | CollectionModel::Hll
637 | CollectionModel::Sketch
638 | CollectionModel::Filter
639 | CollectionModel::Kv
640 | CollectionModel::Config
641 | CollectionModel::Vault
642 | CollectionModel::Mixed => SchemaMode::Dynamic,
643 CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
644 CollectionModel::Queue => SchemaMode::Dynamic,
645 }
646}
647
648fn infer_indices(
649 collection_name: &str,
650 model: CollectionModel,
651 index_catalog: Option<&IndexCatalog>,
652 declarations: Option<&CatalogDeclarations>,
653) -> Vec<String> {
654 let declared = declared_indices(collection_name, declarations);
655 if !declared.is_empty() {
656 return declared;
657 }
658
659 let available = index_catalog
660 .map(IndexCatalog::snapshot)
661 .unwrap_or_default();
662 let mut selected = Vec::new();
663
664 for metric in available {
665 let relevant = matches!(
666 (model, metric.kind),
667 (_, IndexKind::BTree)
668 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
669 | (CollectionModel::Vector, IndexKind::VectorHnsw)
670 | (CollectionModel::Vector, IndexKind::VectorInverted)
671 | (CollectionModel::Vector, IndexKind::VectorTurbo)
672 | (CollectionModel::Document, IndexKind::FullText)
673 | (CollectionModel::Document, IndexKind::DocumentPathValue)
674 | (CollectionModel::Kv, IndexKind::Hash)
675 | (CollectionModel::Config, IndexKind::Hash)
676 | (CollectionModel::Vault, IndexKind::Hash)
677 | (CollectionModel::Mixed, _)
678 );
679
680 if relevant && metric.enabled {
681 selected.push(metric.name);
682 }
683 }
684
685 selected
686}
687
688fn declared_indices(
689 collection_name: &str,
690 declarations: Option<&CatalogDeclarations>,
691) -> Vec<String> {
692 let mut selected = declarations
693 .map(|declarations| {
694 declarations
695 .declared_indexes
696 .iter()
697 .filter(|index| index.collection.as_deref() == Some(collection_name))
698 .map(|index| index.name.clone())
699 .collect::<Vec<_>>()
700 })
701 .unwrap_or_default();
702 selected.sort();
703 selected.dedup();
704 selected
705}
706
707fn operational_indices(
708 collection_name: &str,
709 index_catalog: Option<&IndexCatalog>,
710 declarations: Option<&CatalogDeclarations>,
711) -> Vec<String> {
712 if let Some(declarations) = declarations {
713 let mut selected = declarations
714 .operational_indexes
715 .iter()
716 .filter(|index| index.collection.as_deref() == Some(collection_name))
717 .map(|index| index.name.clone())
718 .collect::<Vec<_>>();
719 selected.sort();
720 selected.dedup();
721 return selected;
722 }
723
724 let mut selected = index_catalog
725 .map(IndexCatalog::snapshot)
726 .unwrap_or_default()
727 .into_iter()
728 .filter(|metric| metric.enabled)
729 .filter_map(|metric| {
730 if metric.name.starts_with(collection_name) {
731 Some(metric.name)
732 } else {
733 None
734 }
735 })
736 .collect::<Vec<_>>();
737 selected.sort();
738 selected.dedup();
739 selected
740}
741
/// Compares declared and operational index names for one collection.
///
/// Returns `(in_sync, missing, undeclared)` where `missing` holds names that
/// are declared but not operational, `undeclared` holds names that are
/// operational but not declared (both sorted and unique), and `in_sync` is
/// true when both lists are empty.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared_set: BTreeSet<&str> = declared_indices.iter().map(String::as_str).collect();
    let operational_set: BTreeSet<&str> =
        operational_indices.iter().map(String::as_str).collect();

    let missing: Vec<String> = declared_set
        .difference(&operational_set)
        .map(|name| (*name).to_string())
        .collect();
    let undeclared: Vec<String> = operational_set
        .difference(&declared_set)
        .map(|name| (*name).to_string())
        .collect();

    let in_sync = missing.is_empty() && undeclared.is_empty();
    (in_sync, missing, undeclared)
}
762
763fn collection_index_readiness(
764 collection_name: &str,
765 statuses: &[CatalogIndexStatus],
766) -> (usize, usize, bool) {
767 let local = statuses
768 .iter()
769 .filter(|status| status.collection.as_deref() == Some(collection_name))
770 .collect::<Vec<_>>();
771 let queryable_count = local.iter().filter(|status| status.queryable).count();
772 let requires_rebuild_count = local
773 .iter()
774 .filter(|status| status.requires_rebuild)
775 .count();
776 let locally_in_sync = local.iter().all(|status| status.in_sync);
777 (queryable_count, requires_rebuild_count, locally_in_sync)
778}
779
780fn collection_graph_projection_readiness(
781 collection_name: &str,
782 statuses: &[CatalogGraphProjectionStatus],
783) -> (usize, usize, bool) {
784 let local = statuses
785 .iter()
786 .filter(|status| status.source.as_deref() == Some(collection_name))
787 .collect::<Vec<_>>();
788 let queryable_count = local.iter().filter(|status| status.queryable).count();
789 let requires_rematerialization_count = local
790 .iter()
791 .filter(|status| status.requires_rematerialization)
792 .count();
793 let locally_in_sync = local
794 .iter()
795 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
796 (
797 queryable_count,
798 requires_rematerialization_count,
799 locally_in_sync,
800 )
801}
802
803fn collection_analytics_job_readiness(
804 collection_name: &str,
805 projection_statuses: &[CatalogGraphProjectionStatus],
806 job_statuses: &[CatalogAnalyticsJobStatus],
807) -> (usize, usize, bool) {
808 let local = job_statuses
809 .iter()
810 .filter(|status| {
811 let Some(projection_name) = status.projection.as_deref() else {
812 return false;
813 };
814 projection_statuses
815 .iter()
816 .find(|projection| projection.name == projection_name)
817 .and_then(|projection| projection.source.as_deref())
818 == Some(collection_name)
819 })
820 .collect::<Vec<_>>();
821 let executable_count = local.iter().filter(|status| status.executable).count();
822 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
823 let locally_in_sync = local
824 .iter()
825 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
826 (executable_count, requires_rerun_count, locally_in_sync)
827}
828
829fn graph_projection_statuses(
830 declarations: Option<&CatalogDeclarations>,
831) -> Vec<CatalogGraphProjectionStatus> {
832 let declared = declarations
833 .map(|declarations| declarations.declared_graph_projections.as_slice())
834 .unwrap_or(&[]);
835 let operational = declarations
836 .map(|declarations| declarations.operational_graph_projections.as_slice())
837 .unwrap_or(&[]);
838 let declared_jobs = declarations
839 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
840 .unwrap_or(&[]);
841 let operational_jobs = declarations
842 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
843 .unwrap_or(&[]);
844
845 let mut names = declared
846 .iter()
847 .map(|projection| projection.name.clone())
848 .chain(operational.iter().map(|projection| projection.name.clone()))
849 .collect::<Vec<_>>();
850 names.sort();
851 names.dedup();
852
853 names
854 .into_iter()
855 .map(|name| {
856 let declared_projection = declared.iter().find(|projection| projection.name == name);
857 let operational_projection = operational
858 .iter()
859 .find(|projection| projection.name == name);
860 let declared_present = declared_projection.is_some();
861 let operational_present = operational_projection.is_some();
862 let mut dependent_job_ids = BTreeSet::new();
863 let mut dependent_job_count = 0usize;
864 let mut active_dependent_job_count = 0usize;
865 let mut stale_dependent_job_count = 0usize;
866 for job in declared_jobs
867 .iter()
868 .chain(operational_jobs.iter())
869 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
870 {
871 if !dependent_job_ids.insert(job.id.clone()) {
872 continue;
873 }
874 dependent_job_count += 1;
875 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
876 active_dependent_job_count += 1;
877 }
878 if job.state == "stale" {
879 stale_dependent_job_count += 1;
880 }
881 }
882 let lifecycle_state = match (
883 declared_present,
884 operational_present,
885 declared_projection
886 .map(|projection| projection.state.as_str())
887 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
888 .unwrap_or_default(),
889 ) {
890 (true, _, "failed") => "failed",
891 (true, true, "stale") => "stale",
892 (true, _, "materializing") => "materializing",
893 (true, true, "materialized") => "materialized",
894 (true, true, _) => "materialized",
895 (false, true, _) => "orphaned-operational",
896 (true, false, _) => "declared",
897 (false, false, _) => "unknown",
898 }
899 .to_string();
900 let queryable = declared_present
901 && operational_present
902 && matches!(
903 declared_projection
904 .map(|projection| projection.state.as_str())
905 .or_else(
906 || operational_projection.map(|projection| projection.state.as_str())
907 )
908 .unwrap_or_default(),
909 "materialized"
910 );
911 let requires_rematerialization = matches!(
912 declared_projection
913 .map(|projection| projection.state.as_str())
914 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
915 .unwrap_or_default(),
916 "declared" | "materializing" | "failed" | "stale"
917 );
918 let mut attention_reasons = Vec::new();
919 if !declared_present || !operational_present {
920 attention_reasons.push("declaration_drift".to_string());
921 }
922 if requires_rematerialization {
923 attention_reasons.push("requires_rematerialization".to_string());
924 }
925 if stale_dependent_job_count > 0 {
926 attention_reasons.push("dependent_jobs_stale".to_string());
927 }
928 let attention_score = stale_dependent_job_count.saturating_mul(2)
929 + usize::from(requires_rematerialization).saturating_mul(4)
930 + usize::from(!declared_present || !operational_present)
931 + usize::from(!queryable);
932 CatalogGraphProjectionStatus {
933 name,
934 source: declared_projection
935 .map(|projection| projection.source.clone())
936 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
937 lifecycle_state,
938 declared: declared_present,
939 operational: operational_present,
940 in_sync: declared_present == operational_present,
941 last_materialized_sequence: declared_projection
942 .and_then(|projection| projection.last_materialized_sequence)
943 .or_else(|| {
944 operational_projection
945 .and_then(|projection| projection.last_materialized_sequence)
946 }),
947 queryable,
948 requires_rematerialization,
949 dependent_job_count,
950 active_dependent_job_count,
951 stale_dependent_job_count,
952 dependent_jobs_in_sync: stale_dependent_job_count == 0,
953 rerun_required: stale_dependent_job_count > 0,
954 attention_score,
955 attention_reasons,
956 }
957 })
958 .collect()
959}
960
961fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
962 let declared = declarations
963 .map(|declarations| declarations.declared_indexes.as_slice())
964 .unwrap_or(&[]);
965 let operational = declarations
966 .map(|declarations| declarations.operational_indexes.as_slice())
967 .unwrap_or(&[]);
968
969 let mut names = declared
970 .iter()
971 .map(|index| index.name.clone())
972 .chain(operational.iter().map(|index| index.name.clone()))
973 .collect::<Vec<_>>();
974 names.sort();
975 names.dedup();
976
977 names
978 .into_iter()
979 .map(|name| {
980 let declared_index = declared.iter().find(|index| index.name == name);
981 let operational_index = operational.iter().find(|index| index.name == name);
982 let collection = declared_index
983 .and_then(|index| index.collection.clone())
984 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
985 let kind = declared_index
986 .map(|index| index_kind_string(index.kind))
987 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
988 .unwrap_or_default();
989 let enabled = declared_index
990 .map(|index| index.enabled)
991 .or_else(|| operational_index.map(|index| index.enabled))
992 .unwrap_or(false);
993 let build_state = operational_index
994 .map(|index| index.build_state.clone())
995 .or_else(|| declared_index.map(|index| index.build_state.clone()));
996 let declared_present = declared_index.is_some();
997 let operational_present = operational_index.is_some();
998 let lifecycle_state = index_lifecycle_state(
999 declared_present,
1000 operational_present,
1001 enabled,
1002 build_state.as_deref(),
1003 );
1004
1005 let mut attention_reasons = Vec::new();
1006 if declared_present != operational_present {
1007 attention_reasons.push("declaration_drift".to_string());
1008 }
1009 if !enabled && declared_present {
1010 attention_reasons.push("disabled".to_string());
1011 }
1012 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1013 attention_reasons.push("failed".to_string());
1014 }
1015 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1016 attention_reasons.push("stale".to_string());
1017 }
1018 if matches!(
1019 build_state.as_deref().unwrap_or_default(),
1020 "building" | "declared-unbuilt"
1021 ) {
1022 attention_reasons.push("requires_rebuild".to_string());
1023 }
1024 let queryable = declared_present
1025 && operational_present
1026 && enabled
1027 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1028 let requires_rebuild = matches!(
1029 build_state.as_deref().unwrap_or_default(),
1030 "declared-unbuilt" | "building" | "stale" | "failed"
1031 );
1032 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1033 + usize::from(!enabled && declared_present)
1034 + usize::from(declared_present != operational_present)
1035 + usize::from(!queryable);
1036
1037 let artifact_state = crate::physical::ArtifactState::from_build_state(
1038 build_state.as_deref().unwrap_or("declared-unbuilt"),
1039 enabled,
1040 );
1041 CatalogIndexStatus {
1042 name,
1043 collection,
1044 kind,
1045 declared: declared_present,
1046 operational: operational_present,
1047 enabled,
1048 build_state,
1049 artifact_state,
1050 lifecycle_state,
1051 in_sync: declared_present == operational_present,
1052 queryable,
1053 requires_rebuild,
1054 attention_score,
1055 attention_reasons,
1056 }
1057 })
1058 .collect()
1059}
1060
/// Derives the lifecycle label of an index from its presence flags and build state.
///
/// * operational but not declared -> `"orphaned-operational"`
/// * declared but not enabled -> `"disabled"`
/// * neither declared nor operational -> `"unknown"`
/// * declared (and enabled) but not operational -> `"declared"`
/// * otherwise the label follows `build_state`; every state other than
///   `"ready"`, `"failed"`, `"stale"` and `"declared-unbuilt"` (including a
///   missing one) is reported as `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (true, _) if !enabled => "disabled",
        (false, false) => "unknown",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Intermediate states such as "catalog-derived", "metadata-only",
            // "artifact-published" and "registry-loaded" all land here.
            _ => "building",
        },
    };
    label.to_string()
}
1091
1092fn index_kind_string(kind: IndexKind) -> String {
1093 match kind {
1094 IndexKind::BTree => "btree",
1095 IndexKind::Hash => "hash",
1096 IndexKind::Bitmap => "bitmap",
1097 IndexKind::Spatial => "spatial.rtree",
1098 IndexKind::VectorHnsw => "vector.hnsw",
1099 IndexKind::VectorInverted => "vector.inverted",
1100 IndexKind::VectorTurbo => "vector.turbo",
1101 IndexKind::GraphAdjacency => "graph.adjacency",
1102 IndexKind::FullText => "text.fulltext",
1103 IndexKind::DocumentPathValue => "document.pathvalue",
1104 IndexKind::HybridSearch => "search.hybrid",
1105 }
1106 .to_string()
1107}
1108
1109fn analytics_job_statuses(
1110 declarations: Option<&CatalogDeclarations>,
1111) -> Vec<CatalogAnalyticsJobStatus> {
1112 let declared = declarations
1113 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1114 .unwrap_or(&[]);
1115 let operational = declarations
1116 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1117 .unwrap_or(&[]);
1118 let declared_projections = declarations
1119 .map(|declarations| declarations.declared_graph_projections.as_slice())
1120 .unwrap_or(&[]);
1121 let operational_projections = declarations
1122 .map(|declarations| declarations.operational_graph_projections.as_slice())
1123 .unwrap_or(&[]);
1124
1125 let mut ids = declared
1126 .iter()
1127 .map(|job| job.id.clone())
1128 .chain(operational.iter().map(|job| job.id.clone()))
1129 .collect::<Vec<_>>();
1130 ids.sort();
1131 ids.dedup();
1132
1133 ids.into_iter()
1134 .map(|id| {
1135 let declared_job = declared.iter().find(|job| job.id == id);
1136 let operational_job = operational.iter().find(|job| job.id == id);
1137 let kind = declared_job
1138 .map(|job| job.kind.clone())
1139 .or_else(|| operational_job.map(|job| job.kind.clone()))
1140 .unwrap_or_default();
1141 let projection = declared_job
1142 .and_then(|job| job.projection.clone())
1143 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1144 let state = declared_job
1145 .map(|job| job.state.clone())
1146 .or_else(|| operational_job.map(|job| job.state.clone()))
1147 .unwrap_or_default();
1148 let declared_present = declared_job.is_some();
1149 let operational_present = operational_job.is_some();
1150 let last_run_sequence = declared_job
1151 .and_then(|job| job.last_run_sequence)
1152 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1153 let projection_declared = projection.as_ref().map(|projection_name| {
1154 declared_projections
1155 .iter()
1156 .any(|projection| projection.name == *projection_name)
1157 });
1158 let projection_operational = projection.as_ref().map(|projection_name| {
1159 operational_projections
1160 .iter()
1161 .any(|projection| projection.name == *projection_name)
1162 });
1163 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1164 declared_projections
1165 .iter()
1166 .find(|projection| projection.name == *projection_name)
1167 .map(|projection| projection.state.as_str())
1168 .or_else(|| {
1169 operational_projections
1170 .iter()
1171 .find(|projection| projection.name == *projection_name)
1172 .map(|projection| projection.state.as_str())
1173 })
1174 });
1175 let dependency_in_sync = projection.as_ref().map(|_| {
1176 matches!(projection_lifecycle, Some("materialized"))
1177 && projection_operational == Some(true)
1178 });
1179 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1180 (true, false, _) => "declared",
1181 (true, true, _)
1182 if matches!(
1183 projection_lifecycle,
1184 Some("stale" | "failed" | "materializing" | "declared")
1185 ) =>
1186 {
1187 "stale"
1188 }
1189 (true, true, "completed") => "completed",
1190 (true, true, "running") => "running",
1191 (true, true, "failed") => "failed",
1192 (true, true, "queued") => "queued",
1193 (true, true, "stale") => "stale",
1194 (true, true, _) => "materialized",
1195 (false, true, _) => "orphaned-operational",
1196 (false, false, _) => "unknown",
1197 }
1198 .to_string();
1199 let executable = declared_present
1200 && operational_present
1201 && !matches!(state.as_str(), "failed" | "stale")
1202 && dependency_in_sync.unwrap_or(true);
1203 let requires_rerun =
1204 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1205 let mut attention_reasons = Vec::new();
1206 if declared_present != operational_present {
1207 attention_reasons.push("declaration_drift".to_string());
1208 }
1209 if matches!(state.as_str(), "failed") {
1210 attention_reasons.push("failed".to_string());
1211 }
1212 if matches!(state.as_str(), "stale") {
1213 attention_reasons.push("stale".to_string());
1214 }
1215 if dependency_in_sync == Some(false) {
1216 attention_reasons.push("dependency_drift".to_string());
1217 }
1218 if requires_rerun {
1219 attention_reasons.push("requires_rerun".to_string());
1220 }
1221 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1222 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1223 + usize::from(declared_present != operational_present)
1224 + usize::from(!executable);
1225 CatalogAnalyticsJobStatus {
1226 id,
1227 kind,
1228 projection,
1229 state: state.clone(),
1230 lifecycle_state,
1231 declared: declared_present,
1232 operational: operational_present,
1233 in_sync: declared_present == operational_present,
1234 last_run_sequence,
1235 projection_declared,
1236 projection_operational,
1237 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1238 dependency_in_sync,
1239 executable,
1240 requires_rerun,
1241 attention_score,
1242 attention_reasons,
1243 }
1244 })
1245 .collect()
1246}
1247
1248pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1249 let declared_indexes = snapshot
1250 .declared_indexes
1251 .iter()
1252 .map(|index| index.name.clone())
1253 .collect::<BTreeSet<_>>();
1254 let operational_indexes = snapshot
1255 .operational_indexes
1256 .iter()
1257 .map(|index| index.name.clone())
1258 .collect::<BTreeSet<_>>();
1259 let declared_graph_projections = snapshot
1260 .declared_graph_projections
1261 .iter()
1262 .map(|projection| projection.name.clone())
1263 .collect::<BTreeSet<_>>();
1264 let operational_graph_projections = snapshot
1265 .operational_graph_projections
1266 .iter()
1267 .map(|projection| projection.name.clone())
1268 .collect::<BTreeSet<_>>();
1269 let declared_analytics_jobs = snapshot
1270 .declared_analytics_jobs
1271 .iter()
1272 .map(|job| job.id.clone())
1273 .collect::<BTreeSet<_>>();
1274 let operational_analytics_jobs = snapshot
1275 .operational_analytics_jobs
1276 .iter()
1277 .map(|job| job.id.clone())
1278 .collect::<BTreeSet<_>>();
1279
1280 CatalogConsistencyReport {
1281 declared_index_count: declared_indexes.len(),
1282 operational_index_count: operational_indexes.len(),
1283 declared_graph_projection_count: declared_graph_projections.len(),
1284 operational_graph_projection_count: operational_graph_projections.len(),
1285 declared_analytics_job_count: declared_analytics_jobs.len(),
1286 operational_analytics_job_count: operational_analytics_jobs.len(),
1287 missing_operational_indexes: declared_indexes
1288 .difference(&operational_indexes)
1289 .cloned()
1290 .collect(),
1291 undeclared_operational_indexes: operational_indexes
1292 .difference(&declared_indexes)
1293 .cloned()
1294 .collect(),
1295 missing_operational_graph_projections: declared_graph_projections
1296 .difference(&operational_graph_projections)
1297 .cloned()
1298 .collect(),
1299 undeclared_operational_graph_projections: operational_graph_projections
1300 .difference(&declared_graph_projections)
1301 .cloned()
1302 .collect(),
1303 missing_operational_analytics_jobs: declared_analytics_jobs
1304 .difference(&operational_analytics_jobs)
1305 .cloned()
1306 .collect(),
1307 undeclared_operational_analytics_jobs: operational_analytics_jobs
1308 .difference(&declared_analytics_jobs)
1309 .cloned()
1310 .collect(),
1311 }
1312}
1313
1314pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1315 CatalogAttentionSummary {
1316 collections_requiring_attention: snapshot
1317 .collections
1318 .iter()
1319 .filter(|collection| collection.attention_required)
1320 .count(),
1321 indexes_requiring_attention: snapshot
1322 .index_statuses
1323 .iter()
1324 .filter(|status| status.attention_score > 0)
1325 .count(),
1326 graph_projections_requiring_attention: snapshot
1327 .graph_projection_statuses
1328 .iter()
1329 .filter(|status| status.attention_score > 0)
1330 .count(),
1331 analytics_jobs_requiring_attention: snapshot
1332 .analytics_job_statuses
1333 .iter()
1334 .filter(|status| status.attention_score > 0)
1335 .count(),
1336 top_collection: snapshot
1337 .collections
1338 .iter()
1339 .filter(|collection| collection.attention_score > 0)
1340 .max_by_key(|collection| collection.attention_score)
1341 .cloned(),
1342 top_index: snapshot
1343 .index_statuses
1344 .iter()
1345 .filter(|status| status.attention_score > 0)
1346 .max_by_key(|status| status.attention_score)
1347 .cloned(),
1348 top_graph_projection: snapshot
1349 .graph_projection_statuses
1350 .iter()
1351 .filter(|status| status.attention_score > 0)
1352 .max_by_key(|status| status.attention_score)
1353 .cloned(),
1354 top_analytics_job: snapshot
1355 .analytics_job_statuses
1356 .iter()
1357 .filter(|status| status.attention_score > 0)
1358 .max_by_key(|status| status.attention_score)
1359 .cloned(),
1360 }
1361}
1362
1363pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1364 let mut collections = snapshot
1365 .collections
1366 .iter()
1367 .filter(|collection| collection.attention_required)
1368 .cloned()
1369 .collect::<Vec<_>>();
1370 collections.sort_by(|left, right| {
1371 right
1372 .attention_score
1373 .cmp(&left.attention_score)
1374 .then_with(|| left.name.cmp(&right.name))
1375 });
1376 collections
1377}
1378
1379pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1380 let mut statuses = snapshot
1381 .index_statuses
1382 .iter()
1383 .filter(|status| status.attention_score > 0)
1384 .cloned()
1385 .collect::<Vec<_>>();
1386 statuses.sort_by(|left, right| {
1387 right
1388 .attention_score
1389 .cmp(&left.attention_score)
1390 .then_with(|| left.name.cmp(&right.name))
1391 });
1392 statuses
1393}
1394
1395pub fn graph_projection_attention(
1396 snapshot: &CatalogModelSnapshot,
1397) -> Vec<CatalogGraphProjectionStatus> {
1398 let mut statuses = snapshot
1399 .graph_projection_statuses
1400 .iter()
1401 .filter(|status| status.attention_score > 0)
1402 .cloned()
1403 .collect::<Vec<_>>();
1404 statuses.sort_by(|left, right| {
1405 right
1406 .attention_score
1407 .cmp(&left.attention_score)
1408 .then_with(|| left.name.cmp(&right.name))
1409 });
1410 statuses
1411}
1412
1413pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1414 let mut statuses = snapshot
1415 .analytics_job_statuses
1416 .iter()
1417 .filter(|status| status.attention_score > 0)
1418 .cloned()
1419 .collect::<Vec<_>>();
1420 statuses.sort_by(|left, right| {
1421 right
1422 .attention_score
1423 .cmp(&left.attention_score)
1424 .then_with(|| left.id.cmp(&right.id))
1425 });
1426 statuses
1427}