1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum CollectionModel {
18 Table,
19 Document,
20 Graph,
21 Vector,
22 Hll,
23 Sketch,
24 Filter,
25 Kv,
26 Config,
27 Vault,
28 Mixed,
29 TimeSeries,
30 Queue,
31 Metrics,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum SchemaMode {
36 Strict,
37 SemiStructured,
38 Dynamic,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum SubscriptionOperation {
43 Insert,
44 Update,
45 Delete,
46}
47
48impl SubscriptionOperation {
49 pub fn as_str(self) -> &'static str {
50 match self {
51 Self::Insert => "INSERT",
52 Self::Update => "UPDATE",
53 Self::Delete => "DELETE",
54 }
55 }
56
57 pub fn from_str(value: &str) -> Option<Self> {
58 match value.to_ascii_uppercase().as_str() {
59 "INSERT" => Some(Self::Insert),
60 "UPDATE" => Some(Self::Update),
61 "DELETE" => Some(Self::Delete),
62 _ => None,
63 }
64 }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SubscriptionDescriptor {
69 pub name: String,
71 pub source: String,
72 pub target_queue: String,
73 pub ops_filter: Vec<SubscriptionOperation>,
74 pub where_filter: Option<String>,
75 pub redact_fields: Vec<String>,
76 pub enabled: bool,
77 pub all_tenants: bool,
82}
83
84#[derive(Debug, Clone)]
85pub struct CollectionDescriptor {
86 pub name: String,
87 pub model: CollectionModel,
88 pub schema_mode: SchemaMode,
89 pub contract_present: bool,
90 pub contract_origin: Option<crate::physical::ContractOrigin>,
91 pub declared_model: Option<CollectionModel>,
92 pub observed_model: CollectionModel,
93 pub queue_mode: Option<QueueMode>,
94 pub queue_max_attempts: Option<u32>,
98 pub queue_lock_deadline_ms: Option<u64>,
100 pub queue_in_flight_cap_per_group: Option<u32>,
102 pub queue_dlq_target: Option<String>,
104 pub vector_dimension: Option<usize>,
105 pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
106 pub session_key: Option<String>,
110 pub session_gap_ms: Option<u64>,
113 pub retention_duration_ms: Option<u64>,
117 pub declared_schema_mode: Option<SchemaMode>,
118 pub observed_schema_mode: SchemaMode,
119 pub entities: usize,
120 pub cross_refs: usize,
121 pub segments: usize,
122 pub indices: Vec<String>,
123 pub declared_indices: Vec<String>,
124 pub operational_indices: Vec<String>,
125 pub indexes_in_sync: bool,
126 pub missing_operational_indices: Vec<String>,
127 pub undeclared_operational_indices: Vec<String>,
128 pub queryable_index_count: usize,
129 pub indexes_requiring_rebuild_count: usize,
130 pub queryable_graph_projection_count: usize,
131 pub graph_projections_requiring_rematerialization_count: usize,
132 pub executable_analytics_job_count: usize,
133 pub analytics_jobs_requiring_rerun_count: usize,
134 pub subscriptions: Vec<SubscriptionDescriptor>,
135 pub resources_in_sync: bool,
136 pub attention_required: bool,
137 pub attention_score: usize,
138 pub attention_reasons: Vec<String>,
139}
140
141#[derive(Debug, Clone)]
142pub struct CatalogModelSnapshot {
143 pub summary: CatalogSnapshot,
144 pub collections: Vec<CollectionDescriptor>,
145 pub indices: IndexCatalogSnapshot,
146 pub declared_indexes: Vec<PhysicalIndexState>,
147 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
148 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
149 pub operational_indexes: Vec<PhysicalIndexState>,
150 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
151 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
152 pub index_statuses: Vec<CatalogIndexStatus>,
153 pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
154 pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
155 pub queryable_index_count: usize,
156 pub indexes_requiring_rebuild_count: usize,
157 pub queryable_graph_projection_count: usize,
158 pub graph_projections_requiring_rematerialization_count: usize,
159 pub executable_analytics_job_count: usize,
160 pub analytics_jobs_requiring_rerun_count: usize,
161}
162
163#[derive(Debug, Clone)]
164pub struct CatalogAttentionSummary {
165 pub collections_requiring_attention: usize,
166 pub indexes_requiring_attention: usize,
167 pub graph_projections_requiring_attention: usize,
168 pub analytics_jobs_requiring_attention: usize,
169 pub top_collection: Option<CollectionDescriptor>,
170 pub top_index: Option<CatalogIndexStatus>,
171 pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
172 pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
173}
174
175#[derive(Debug, Clone, Default)]
176pub struct CatalogDeclarations {
177 pub declared_indexes: Vec<PhysicalIndexState>,
178 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
179 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
180 pub operational_indexes: Vec<PhysicalIndexState>,
181 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
182 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
183}
184
185#[derive(Debug, Clone)]
186pub struct CatalogGraphProjectionStatus {
187 pub name: String,
188 pub source: Option<String>,
189 pub lifecycle_state: String,
190 pub declared: bool,
191 pub operational: bool,
192 pub in_sync: bool,
193 pub last_materialized_sequence: Option<u64>,
194 pub queryable: bool,
195 pub requires_rematerialization: bool,
196 pub dependent_job_count: usize,
197 pub active_dependent_job_count: usize,
198 pub stale_dependent_job_count: usize,
199 pub dependent_jobs_in_sync: bool,
200 pub rerun_required: bool,
201 pub attention_score: usize,
202 pub attention_reasons: Vec<String>,
203}
204
205#[derive(Debug, Clone)]
206pub struct CatalogIndexStatus {
207 pub name: String,
208 pub collection: Option<String>,
209 pub kind: String,
210 pub declared: bool,
211 pub operational: bool,
212 pub enabled: bool,
213 pub build_state: Option<String>,
214 pub artifact_state: crate::physical::ArtifactState,
215 pub lifecycle_state: String,
216 pub in_sync: bool,
217 pub queryable: bool,
218 pub requires_rebuild: bool,
219 pub attention_score: usize,
220 pub attention_reasons: Vec<String>,
221}
222
223#[derive(Debug, Clone)]
224pub struct CatalogAnalyticsJobStatus {
225 pub id: String,
226 pub kind: String,
227 pub projection: Option<String>,
228 pub state: String,
229 pub lifecycle_state: String,
230 pub declared: bool,
231 pub operational: bool,
232 pub in_sync: bool,
233 pub last_run_sequence: Option<u64>,
234 pub projection_declared: Option<bool>,
235 pub projection_operational: Option<bool>,
236 pub projection_lifecycle_state: Option<String>,
237 pub dependency_in_sync: Option<bool>,
238 pub executable: bool,
239 pub requires_rerun: bool,
240 pub attention_score: usize,
241 pub attention_reasons: Vec<String>,
242}
243
244#[derive(Debug, Clone, Default)]
245pub struct CatalogConsistencyReport {
246 pub declared_index_count: usize,
247 pub operational_index_count: usize,
248 pub declared_graph_projection_count: usize,
249 pub operational_graph_projection_count: usize,
250 pub declared_analytics_job_count: usize,
251 pub operational_analytics_job_count: usize,
252 pub missing_operational_indexes: Vec<String>,
253 pub undeclared_operational_indexes: Vec<String>,
254 pub missing_operational_graph_projections: Vec<String>,
255 pub undeclared_operational_graph_projections: Vec<String>,
256 pub missing_operational_analytics_jobs: Vec<String>,
257 pub undeclared_operational_analytics_jobs: Vec<String>,
258}
259
260pub fn snapshot_store(
261 name: &str,
262 store: &UnifiedStore,
263 index_catalog: Option<&IndexCatalog>,
264) -> CatalogModelSnapshot {
265 snapshot_store_with_declarations(name, store, index_catalog, None, None)
266}
267
268pub fn snapshot_store_with_declarations(
269 name: &str,
270 store: &UnifiedStore,
271 index_catalog: Option<&IndexCatalog>,
272 declarations: Option<&CatalogDeclarations>,
273 contracts: Option<&[CollectionContract]>,
274) -> CatalogModelSnapshot {
275 let index_statuses = index_statuses(declarations);
276 let graph_projection_statuses = graph_projection_statuses(declarations);
277 let analytics_job_statuses = analytics_job_statuses(declarations);
278
279 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
280 for (collection, entity) in store.query_all(|_| true) {
281 grouped.entry(collection).or_default().push(entity);
282 }
283 let queue_policies = queue_policies_from_grouped(&grouped);
284
285 let mut stats_by_collection = BTreeMap::new();
286 let mut collections = Vec::new();
287
288 for collection_name in store.list_collections() {
289 let entities = grouped.remove(&collection_name).unwrap_or_default();
290 let inferred_model = infer_model(&entities);
291 let inferred_schema_mode = infer_schema_mode(inferred_model);
292 let contract = collection_contract(&collection_name, contracts);
293 let model = contract
294 .map(|contract| contract.declared_model)
295 .unwrap_or(inferred_model);
296 let cross_refs = entities
297 .iter()
298 .map(|entity| entity.cross_refs().len())
299 .sum();
300 let entity_count = entities.len();
301 let manager_stats = store
302 .get_collection(&collection_name)
303 .map(|manager| manager.stats());
304 let segments = manager_stats
305 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
306 .unwrap_or(0);
307
308 stats_by_collection.insert(
309 collection_name.clone(),
310 CollectionStats {
311 entities: entity_count,
312 cross_refs,
313 segments,
314 },
315 );
316
317 let declared_indices = declared_indices(&collection_name, declarations);
318 let operational_indices = operational_indices(&collection_name, index_catalog);
319 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
320 collection_index_consistency(&declared_indices, &operational_indices);
321 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
322 collection_index_readiness(&collection_name, &index_statuses);
323 let (
324 queryable_graph_projection_count,
325 graph_projections_requiring_rematerialization_count,
326 graph_projections_locally_in_sync,
327 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
328 let (
329 executable_analytics_job_count,
330 analytics_jobs_requiring_rerun_count,
331 analytics_jobs_locally_in_sync,
332 ) = collection_analytics_job_readiness(
333 &collection_name,
334 &graph_projection_statuses,
335 &analytics_job_statuses,
336 );
337 let resources_in_sync = indexes_in_sync
338 && indexes_locally_in_sync
339 && graph_projections_locally_in_sync
340 && analytics_jobs_locally_in_sync;
341 let attention_required = !resources_in_sync
342 || indexes_requiring_rebuild_count > 0
343 || graph_projections_requiring_rematerialization_count > 0
344 || analytics_jobs_requiring_rerun_count > 0;
345 let mut attention_reasons = Vec::new();
346 if !indexes_in_sync || !indexes_locally_in_sync {
347 attention_reasons.push("index_drift".to_string());
348 }
349 if indexes_requiring_rebuild_count > 0 {
350 attention_reasons.push("indexes_require_rebuild".to_string());
351 }
352 if !graph_projections_locally_in_sync {
353 attention_reasons.push("graph_projection_drift".to_string());
354 }
355 if graph_projections_requiring_rematerialization_count > 0 {
356 attention_reasons.push("graph_projections_require_rematerialization".to_string());
357 }
358 if !analytics_jobs_locally_in_sync {
359 attention_reasons.push("analytics_job_drift".to_string());
360 }
361 if analytics_jobs_requiring_rerun_count > 0 {
362 attention_reasons.push("analytics_jobs_require_rerun".to_string());
363 }
364 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
365 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
366 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
367 + usize::from(!resources_in_sync);
368
369 collections.push(CollectionDescriptor {
370 name: collection_name.clone(),
371 model,
372 schema_mode: contract
373 .map(|contract| contract.schema_mode)
374 .unwrap_or(inferred_schema_mode),
375 contract_present: contract.is_some(),
376 contract_origin: contract.map(|contract| contract.origin),
377 declared_model: contract.map(|contract| contract.declared_model),
378 observed_model: inferred_model,
379 queue_mode: if model == CollectionModel::Queue {
380 Some(
381 queue_policies
382 .get(&collection_name)
383 .map(|p| p.mode)
384 .unwrap_or_default(),
385 )
386 } else {
387 None
388 },
389 queue_max_attempts: if model == CollectionModel::Queue {
390 Some(
391 queue_policies
392 .get(&collection_name)
393 .map(|p| p.max_attempts)
394 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
395 )
396 } else {
397 None
398 },
399 queue_lock_deadline_ms: if model == CollectionModel::Queue {
400 Some(
401 queue_policies
402 .get(&collection_name)
403 .map(|p| p.lock_deadline_ms)
404 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
405 )
406 } else {
407 None
408 },
409 queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
410 Some(
411 queue_policies
412 .get(&collection_name)
413 .map(|p| p.in_flight_cap_per_group)
414 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
415 )
416 } else {
417 None
418 },
419 queue_dlq_target: if model == CollectionModel::Queue {
420 queue_policies
421 .get(&collection_name)
422 .and_then(|p| p.dlq_target.clone())
423 } else {
424 None
425 },
426 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
427 vector_metric: contract.and_then(|contract| contract.vector_metric),
428 session_key: contract.and_then(|contract| contract.session_key.clone()),
429 session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
430 retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
431 declared_schema_mode: contract.map(|contract| contract.schema_mode),
432 observed_schema_mode: inferred_schema_mode,
433 entities: entity_count,
434 cross_refs,
435 segments,
436 indices: infer_indices(&collection_name, model, index_catalog, declarations),
437 declared_indices,
438 operational_indices,
439 indexes_in_sync,
440 missing_operational_indices,
441 undeclared_operational_indices,
442 queryable_index_count,
443 indexes_requiring_rebuild_count,
444 queryable_graph_projection_count,
445 graph_projections_requiring_rematerialization_count,
446 executable_analytics_job_count,
447 analytics_jobs_requiring_rerun_count,
448 subscriptions: contract
449 .map(|contract| contract.subscriptions.clone())
450 .unwrap_or_default(),
451 resources_in_sync,
452 attention_required,
453 attention_score,
454 attention_reasons,
455 });
456 }
457
458 collections.sort_by(|left, right| left.name.cmp(&right.name));
459
460 let summary = CatalogSnapshot {
461 name: name.to_string(),
462 total_entities: stats_by_collection
463 .values()
464 .map(|stats| stats.entities)
465 .sum(),
466 total_collections: stats_by_collection.len(),
467 stats_by_collection,
468 updated_at: SystemTime::now(),
469 };
470
471 CatalogModelSnapshot {
472 summary,
473 collections,
474 indices: index_catalog
475 .map(IndexCatalog::snapshot)
476 .unwrap_or_default(),
477 declared_indexes: declarations
478 .map(|declarations| declarations.declared_indexes.clone())
479 .unwrap_or_default(),
480 declared_graph_projections: declarations
481 .map(|declarations| declarations.declared_graph_projections.clone())
482 .unwrap_or_default(),
483 declared_analytics_jobs: declarations
484 .map(|declarations| declarations.declared_analytics_jobs.clone())
485 .unwrap_or_default(),
486 operational_indexes: declarations
487 .map(|declarations| declarations.operational_indexes.clone())
488 .unwrap_or_default(),
489 operational_graph_projections: declarations
490 .map(|declarations| declarations.operational_graph_projections.clone())
491 .unwrap_or_default(),
492 operational_analytics_jobs: declarations
493 .map(|declarations| declarations.operational_analytics_jobs.clone())
494 .unwrap_or_default(),
495 queryable_index_count: index_statuses
496 .iter()
497 .filter(|status| status.queryable)
498 .count(),
499 indexes_requiring_rebuild_count: index_statuses
500 .iter()
501 .filter(|status| status.requires_rebuild)
502 .count(),
503 queryable_graph_projection_count: graph_projection_statuses
504 .iter()
505 .filter(|status| status.queryable)
506 .count(),
507 graph_projections_requiring_rematerialization_count: graph_projection_statuses
508 .iter()
509 .filter(|status| status.requires_rematerialization)
510 .count(),
511 executable_analytics_job_count: analytics_job_statuses
512 .iter()
513 .filter(|status| status.executable)
514 .count(),
515 analytics_jobs_requiring_rerun_count: analytics_job_statuses
516 .iter()
517 .filter(|status| status.requires_rerun)
518 .count(),
519 index_statuses,
520 graph_projection_statuses,
521 analytics_job_statuses,
522 }
523}
524
525#[derive(Debug, Clone)]
529pub struct QueuePolicySnapshot {
530 pub mode: QueueMode,
531 pub max_attempts: u32,
532 pub lock_deadline_ms: u64,
533 pub in_flight_cap_per_group: u32,
534 pub dlq_target: Option<String>,
535}
536
537fn queue_policies_from_grouped(
538 grouped: &HashMap<String, Vec<UnifiedEntity>>,
539) -> HashMap<String, QueuePolicySnapshot> {
540 let Some(meta) = grouped.get("red_queue_meta") else {
541 return HashMap::new();
542 };
543
544 let mut policies = HashMap::new();
545 for entity in meta {
546 let Some(row) = entity.data.as_row() else {
547 continue;
548 };
549 if row_text(row, "kind").as_deref() != Some("queue_config") {
550 continue;
551 }
552 let Some(queue) = row_text(row, "queue") else {
553 continue;
554 };
555 let mode = row_text(row, "mode")
556 .as_deref()
557 .and_then(QueueMode::parse)
558 .unwrap_or_default();
559 let max_attempts = row_u64(row, "max_attempts")
560 .map(|v| v as u32)
561 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
562 let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
563 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
564 let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
565 .map(|v| v as u32)
566 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
567 let dlq_target = row_text(row, "dlq");
568 policies.insert(
569 queue,
570 QueuePolicySnapshot {
571 mode,
572 max_attempts,
573 lock_deadline_ms,
574 in_flight_cap_per_group,
575 dlq_target,
576 },
577 );
578 }
579 policies
580}
581
582fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
583 match row.get_field(key) {
584 Some(Value::UnsignedInteger(v)) => Some(*v),
585 Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
586 _ => None,
587 }
588}
589
590fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
591 match row.get_field(key) {
592 Some(Value::Text(value)) => Some(value.to_string()),
593 _ => None,
594 }
595}
596
597fn collection_contract<'a>(
598 collection_name: &str,
599 contracts: Option<&'a [CollectionContract]>,
600) -> Option<&'a CollectionContract> {
601 contracts.and_then(|contracts| {
602 contracts
603 .iter()
604 .find(|contract| contract.name == collection_name)
605 })
606}
607
608fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
609 let mut has_table = false;
610 let mut has_graph = false;
611 let mut has_vector = false;
612
613 for entity in entities {
614 match &entity.kind {
615 EntityKind::TableRow { .. } => has_table = true,
616 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
617 EntityKind::Vector { .. } => has_vector = true,
618 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
619 }
620 }
621
622 match (has_table, has_graph, has_vector) {
623 (true, false, false) => CollectionModel::Table,
624 (false, true, false) => CollectionModel::Graph,
625 (false, false, true) => CollectionModel::Vector,
626 _ => CollectionModel::Mixed,
627 }
628}
629
630fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
631 match model {
632 CollectionModel::Table => SchemaMode::Strict,
633 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
634 CollectionModel::Document
635 | CollectionModel::Hll
636 | CollectionModel::Sketch
637 | CollectionModel::Filter
638 | CollectionModel::Kv
639 | CollectionModel::Config
640 | CollectionModel::Vault
641 | CollectionModel::Mixed => SchemaMode::Dynamic,
642 CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
643 CollectionModel::Queue => SchemaMode::Dynamic,
644 }
645}
646
647fn infer_indices(
648 collection_name: &str,
649 model: CollectionModel,
650 index_catalog: Option<&IndexCatalog>,
651 declarations: Option<&CatalogDeclarations>,
652) -> Vec<String> {
653 let declared = declared_indices(collection_name, declarations);
654 if !declared.is_empty() {
655 return declared;
656 }
657
658 let available = index_catalog
659 .map(IndexCatalog::snapshot)
660 .unwrap_or_default();
661 let mut selected = Vec::new();
662
663 for metric in available {
664 let relevant = matches!(
665 (model, metric.kind),
666 (_, IndexKind::BTree)
667 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
668 | (CollectionModel::Vector, IndexKind::VectorHnsw)
669 | (CollectionModel::Vector, IndexKind::VectorInverted)
670 | (CollectionModel::Document, IndexKind::FullText)
671 | (CollectionModel::Document, IndexKind::DocumentPathValue)
672 | (CollectionModel::Kv, IndexKind::Hash)
673 | (CollectionModel::Config, IndexKind::Hash)
674 | (CollectionModel::Vault, IndexKind::Hash)
675 | (CollectionModel::Mixed, _)
676 );
677
678 if relevant && metric.enabled {
679 selected.push(metric.name);
680 }
681 }
682
683 selected
684}
685
686fn declared_indices(
687 collection_name: &str,
688 declarations: Option<&CatalogDeclarations>,
689) -> Vec<String> {
690 let mut selected = declarations
691 .map(|declarations| {
692 declarations
693 .declared_indexes
694 .iter()
695 .filter(|index| index.collection.as_deref() == Some(collection_name))
696 .map(|index| index.name.clone())
697 .collect::<Vec<_>>()
698 })
699 .unwrap_or_default();
700 selected.sort();
701 selected.dedup();
702 selected
703}
704
705fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
706 let mut selected = index_catalog
707 .map(IndexCatalog::snapshot)
708 .unwrap_or_default()
709 .into_iter()
710 .filter(|metric| metric.enabled)
711 .filter_map(|metric| {
712 if metric.name.starts_with(collection_name)
713 || matches!(
714 metric.kind,
715 IndexKind::BTree
716 | IndexKind::GraphAdjacency
717 | IndexKind::VectorHnsw
718 | IndexKind::VectorInverted
719 | IndexKind::FullText
720 | IndexKind::DocumentPathValue
721 | IndexKind::HybridSearch
722 )
723 {
724 Some(metric.name)
725 } else {
726 None
727 }
728 })
729 .collect::<Vec<_>>();
730 selected.sort();
731 selected.dedup();
732 selected
733}
734
735fn collection_index_consistency(
736 declared_indices: &[String],
737 operational_indices: &[String],
738) -> (bool, Vec<String>, Vec<String>) {
739 let declared = declared_indices.iter().cloned().collect::<BTreeSet<_>>();
740 let operational = operational_indices.iter().cloned().collect::<BTreeSet<_>>();
741 let missing_operational_indices = declared
742 .difference(&operational)
743 .cloned()
744 .collect::<Vec<_>>();
745 let undeclared_operational_indices = operational
746 .difference(&declared)
747 .cloned()
748 .collect::<Vec<_>>();
749 (
750 missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty(),
751 missing_operational_indices,
752 undeclared_operational_indices,
753 )
754}
755
756fn collection_index_readiness(
757 collection_name: &str,
758 statuses: &[CatalogIndexStatus],
759) -> (usize, usize, bool) {
760 let local = statuses
761 .iter()
762 .filter(|status| status.collection.as_deref() == Some(collection_name))
763 .collect::<Vec<_>>();
764 let queryable_count = local.iter().filter(|status| status.queryable).count();
765 let requires_rebuild_count = local
766 .iter()
767 .filter(|status| status.requires_rebuild)
768 .count();
769 let locally_in_sync = local.iter().all(|status| status.in_sync);
770 (queryable_count, requires_rebuild_count, locally_in_sync)
771}
772
773fn collection_graph_projection_readiness(
774 collection_name: &str,
775 statuses: &[CatalogGraphProjectionStatus],
776) -> (usize, usize, bool) {
777 let local = statuses
778 .iter()
779 .filter(|status| status.source.as_deref() == Some(collection_name))
780 .collect::<Vec<_>>();
781 let queryable_count = local.iter().filter(|status| status.queryable).count();
782 let requires_rematerialization_count = local
783 .iter()
784 .filter(|status| status.requires_rematerialization)
785 .count();
786 let locally_in_sync = local
787 .iter()
788 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
789 (
790 queryable_count,
791 requires_rematerialization_count,
792 locally_in_sync,
793 )
794}
795
796fn collection_analytics_job_readiness(
797 collection_name: &str,
798 projection_statuses: &[CatalogGraphProjectionStatus],
799 job_statuses: &[CatalogAnalyticsJobStatus],
800) -> (usize, usize, bool) {
801 let local = job_statuses
802 .iter()
803 .filter(|status| {
804 let Some(projection_name) = status.projection.as_deref() else {
805 return false;
806 };
807 projection_statuses
808 .iter()
809 .find(|projection| projection.name == projection_name)
810 .and_then(|projection| projection.source.as_deref())
811 == Some(collection_name)
812 })
813 .collect::<Vec<_>>();
814 let executable_count = local.iter().filter(|status| status.executable).count();
815 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
816 let locally_in_sync = local
817 .iter()
818 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
819 (executable_count, requires_rerun_count, locally_in_sync)
820}
821
822fn graph_projection_statuses(
823 declarations: Option<&CatalogDeclarations>,
824) -> Vec<CatalogGraphProjectionStatus> {
825 let declared = declarations
826 .map(|declarations| declarations.declared_graph_projections.as_slice())
827 .unwrap_or(&[]);
828 let operational = declarations
829 .map(|declarations| declarations.operational_graph_projections.as_slice())
830 .unwrap_or(&[]);
831 let declared_jobs = declarations
832 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
833 .unwrap_or(&[]);
834 let operational_jobs = declarations
835 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
836 .unwrap_or(&[]);
837
838 let mut names = declared
839 .iter()
840 .map(|projection| projection.name.clone())
841 .chain(operational.iter().map(|projection| projection.name.clone()))
842 .collect::<Vec<_>>();
843 names.sort();
844 names.dedup();
845
846 names
847 .into_iter()
848 .map(|name| {
849 let declared_projection = declared.iter().find(|projection| projection.name == name);
850 let operational_projection = operational
851 .iter()
852 .find(|projection| projection.name == name);
853 let declared_present = declared_projection.is_some();
854 let operational_present = operational_projection.is_some();
855 let mut dependent_job_ids = BTreeSet::new();
856 let mut dependent_job_count = 0usize;
857 let mut active_dependent_job_count = 0usize;
858 let mut stale_dependent_job_count = 0usize;
859 for job in declared_jobs
860 .iter()
861 .chain(operational_jobs.iter())
862 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
863 {
864 if !dependent_job_ids.insert(job.id.clone()) {
865 continue;
866 }
867 dependent_job_count += 1;
868 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
869 active_dependent_job_count += 1;
870 }
871 if job.state == "stale" {
872 stale_dependent_job_count += 1;
873 }
874 }
875 let lifecycle_state = match (
876 declared_present,
877 operational_present,
878 declared_projection
879 .map(|projection| projection.state.as_str())
880 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
881 .unwrap_or_default(),
882 ) {
883 (true, _, "failed") => "failed",
884 (true, true, "stale") => "stale",
885 (true, _, "materializing") => "materializing",
886 (true, true, "materialized") => "materialized",
887 (true, true, _) => "materialized",
888 (false, true, _) => "orphaned-operational",
889 (true, false, _) => "declared",
890 (false, false, _) => "unknown",
891 }
892 .to_string();
893 let queryable = declared_present
894 && operational_present
895 && matches!(
896 declared_projection
897 .map(|projection| projection.state.as_str())
898 .or_else(
899 || operational_projection.map(|projection| projection.state.as_str())
900 )
901 .unwrap_or_default(),
902 "materialized"
903 );
904 let requires_rematerialization = matches!(
905 declared_projection
906 .map(|projection| projection.state.as_str())
907 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
908 .unwrap_or_default(),
909 "declared" | "materializing" | "failed" | "stale"
910 );
911 let mut attention_reasons = Vec::new();
912 if !declared_present || !operational_present {
913 attention_reasons.push("declaration_drift".to_string());
914 }
915 if requires_rematerialization {
916 attention_reasons.push("requires_rematerialization".to_string());
917 }
918 if stale_dependent_job_count > 0 {
919 attention_reasons.push("dependent_jobs_stale".to_string());
920 }
921 let attention_score = stale_dependent_job_count.saturating_mul(2)
922 + usize::from(requires_rematerialization).saturating_mul(4)
923 + usize::from(!declared_present || !operational_present)
924 + usize::from(!queryable);
925 CatalogGraphProjectionStatus {
926 name,
927 source: declared_projection
928 .map(|projection| projection.source.clone())
929 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
930 lifecycle_state,
931 declared: declared_present,
932 operational: operational_present,
933 in_sync: declared_present == operational_present,
934 last_materialized_sequence: declared_projection
935 .and_then(|projection| projection.last_materialized_sequence)
936 .or_else(|| {
937 operational_projection
938 .and_then(|projection| projection.last_materialized_sequence)
939 }),
940 queryable,
941 requires_rematerialization,
942 dependent_job_count,
943 active_dependent_job_count,
944 stale_dependent_job_count,
945 dependent_jobs_in_sync: stale_dependent_job_count == 0,
946 rerun_required: stale_dependent_job_count > 0,
947 attention_score,
948 attention_reasons,
949 }
950 })
951 .collect()
952}
953
954fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
955 let declared = declarations
956 .map(|declarations| declarations.declared_indexes.as_slice())
957 .unwrap_or(&[]);
958 let operational = declarations
959 .map(|declarations| declarations.operational_indexes.as_slice())
960 .unwrap_or(&[]);
961
962 let mut names = declared
963 .iter()
964 .map(|index| index.name.clone())
965 .chain(operational.iter().map(|index| index.name.clone()))
966 .collect::<Vec<_>>();
967 names.sort();
968 names.dedup();
969
970 names
971 .into_iter()
972 .map(|name| {
973 let declared_index = declared.iter().find(|index| index.name == name);
974 let operational_index = operational.iter().find(|index| index.name == name);
975 let collection = declared_index
976 .and_then(|index| index.collection.clone())
977 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
978 let kind = declared_index
979 .map(|index| index_kind_string(index.kind))
980 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
981 .unwrap_or_default();
982 let enabled = declared_index
983 .map(|index| index.enabled)
984 .or_else(|| operational_index.map(|index| index.enabled))
985 .unwrap_or(false);
986 let build_state = operational_index
987 .map(|index| index.build_state.clone())
988 .or_else(|| declared_index.map(|index| index.build_state.clone()));
989 let declared_present = declared_index.is_some();
990 let operational_present = operational_index.is_some();
991 let lifecycle_state = index_lifecycle_state(
992 declared_present,
993 operational_present,
994 enabled,
995 build_state.as_deref(),
996 );
997
998 let mut attention_reasons = Vec::new();
999 if declared_present != operational_present {
1000 attention_reasons.push("declaration_drift".to_string());
1001 }
1002 if !enabled && declared_present {
1003 attention_reasons.push("disabled".to_string());
1004 }
1005 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1006 attention_reasons.push("failed".to_string());
1007 }
1008 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1009 attention_reasons.push("stale".to_string());
1010 }
1011 if matches!(
1012 build_state.as_deref().unwrap_or_default(),
1013 "building" | "declared-unbuilt"
1014 ) {
1015 attention_reasons.push("requires_rebuild".to_string());
1016 }
1017 let queryable = declared_present
1018 && operational_present
1019 && enabled
1020 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1021 let requires_rebuild = matches!(
1022 build_state.as_deref().unwrap_or_default(),
1023 "declared-unbuilt" | "building" | "stale" | "failed"
1024 );
1025 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1026 + usize::from(!enabled && declared_present)
1027 + usize::from(declared_present != operational_present)
1028 + usize::from(!queryable);
1029
1030 let artifact_state = crate::physical::ArtifactState::from_build_state(
1031 build_state.as_deref().unwrap_or("declared-unbuilt"),
1032 enabled,
1033 );
1034 CatalogIndexStatus {
1035 name,
1036 collection,
1037 kind,
1038 declared: declared_present,
1039 operational: operational_present,
1040 enabled,
1041 build_state,
1042 artifact_state,
1043 lifecycle_state,
1044 in_sync: declared_present == operational_present,
1045 queryable,
1046 requires_rebuild,
1047 attention_score,
1048 attention_reasons,
1049 }
1050 })
1051 .collect()
1052}
1053
1054fn index_lifecycle_state(
1055 declared: bool,
1056 operational: bool,
1057 enabled: bool,
1058 build_state: Option<&str>,
1059) -> String {
1060 if !declared && operational {
1061 return "orphaned-operational".to_string();
1062 }
1063 if declared && !enabled {
1064 return "disabled".to_string();
1065 }
1066 if !declared {
1067 return "unknown".to_string();
1068 }
1069 if !operational {
1070 return "declared".to_string();
1071 }
1072
1073 match build_state.unwrap_or_default() {
1074 "ready" => "ready".to_string(),
1075 "failed" => "failed".to_string(),
1076 "stale" => "stale".to_string(),
1077 "declared-unbuilt" => "declared".to_string(),
1078 "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
1079 "building".to_string()
1080 }
1081 _ => "building".to_string(),
1082 }
1083}
1084
1085fn index_kind_string(kind: IndexKind) -> String {
1086 match kind {
1087 IndexKind::BTree => "btree",
1088 IndexKind::Hash => "hash",
1089 IndexKind::Bitmap => "bitmap",
1090 IndexKind::Spatial => "spatial.rtree",
1091 IndexKind::VectorHnsw => "vector.hnsw",
1092 IndexKind::VectorInverted => "vector.inverted",
1093 IndexKind::GraphAdjacency => "graph.adjacency",
1094 IndexKind::FullText => "text.fulltext",
1095 IndexKind::DocumentPathValue => "document.pathvalue",
1096 IndexKind::HybridSearch => "search.hybrid",
1097 }
1098 .to_string()
1099}
1100
1101fn analytics_job_statuses(
1102 declarations: Option<&CatalogDeclarations>,
1103) -> Vec<CatalogAnalyticsJobStatus> {
1104 let declared = declarations
1105 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1106 .unwrap_or(&[]);
1107 let operational = declarations
1108 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1109 .unwrap_or(&[]);
1110 let declared_projections = declarations
1111 .map(|declarations| declarations.declared_graph_projections.as_slice())
1112 .unwrap_or(&[]);
1113 let operational_projections = declarations
1114 .map(|declarations| declarations.operational_graph_projections.as_slice())
1115 .unwrap_or(&[]);
1116
1117 let mut ids = declared
1118 .iter()
1119 .map(|job| job.id.clone())
1120 .chain(operational.iter().map(|job| job.id.clone()))
1121 .collect::<Vec<_>>();
1122 ids.sort();
1123 ids.dedup();
1124
1125 ids.into_iter()
1126 .map(|id| {
1127 let declared_job = declared.iter().find(|job| job.id == id);
1128 let operational_job = operational.iter().find(|job| job.id == id);
1129 let kind = declared_job
1130 .map(|job| job.kind.clone())
1131 .or_else(|| operational_job.map(|job| job.kind.clone()))
1132 .unwrap_or_default();
1133 let projection = declared_job
1134 .and_then(|job| job.projection.clone())
1135 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1136 let state = declared_job
1137 .map(|job| job.state.clone())
1138 .or_else(|| operational_job.map(|job| job.state.clone()))
1139 .unwrap_or_default();
1140 let declared_present = declared_job.is_some();
1141 let operational_present = operational_job.is_some();
1142 let last_run_sequence = declared_job
1143 .and_then(|job| job.last_run_sequence)
1144 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1145 let projection_declared = projection.as_ref().map(|projection_name| {
1146 declared_projections
1147 .iter()
1148 .any(|projection| projection.name == *projection_name)
1149 });
1150 let projection_operational = projection.as_ref().map(|projection_name| {
1151 operational_projections
1152 .iter()
1153 .any(|projection| projection.name == *projection_name)
1154 });
1155 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1156 declared_projections
1157 .iter()
1158 .find(|projection| projection.name == *projection_name)
1159 .map(|projection| projection.state.as_str())
1160 .or_else(|| {
1161 operational_projections
1162 .iter()
1163 .find(|projection| projection.name == *projection_name)
1164 .map(|projection| projection.state.as_str())
1165 })
1166 });
1167 let dependency_in_sync = projection.as_ref().map(|_| {
1168 matches!(projection_lifecycle, Some("materialized"))
1169 && projection_operational == Some(true)
1170 });
1171 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1172 (true, false, _) => "declared",
1173 (true, true, _)
1174 if matches!(
1175 projection_lifecycle,
1176 Some("stale" | "failed" | "materializing" | "declared")
1177 ) =>
1178 {
1179 "stale"
1180 }
1181 (true, true, "completed") => "completed",
1182 (true, true, "running") => "running",
1183 (true, true, "failed") => "failed",
1184 (true, true, "queued") => "queued",
1185 (true, true, "stale") => "stale",
1186 (true, true, _) => "materialized",
1187 (false, true, _) => "orphaned-operational",
1188 (false, false, _) => "unknown",
1189 }
1190 .to_string();
1191 let executable = declared_present
1192 && operational_present
1193 && !matches!(state.as_str(), "failed" | "stale")
1194 && dependency_in_sync.unwrap_or(true);
1195 let requires_rerun =
1196 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1197 let mut attention_reasons = Vec::new();
1198 if declared_present != operational_present {
1199 attention_reasons.push("declaration_drift".to_string());
1200 }
1201 if matches!(state.as_str(), "failed") {
1202 attention_reasons.push("failed".to_string());
1203 }
1204 if matches!(state.as_str(), "stale") {
1205 attention_reasons.push("stale".to_string());
1206 }
1207 if dependency_in_sync == Some(false) {
1208 attention_reasons.push("dependency_drift".to_string());
1209 }
1210 if requires_rerun {
1211 attention_reasons.push("requires_rerun".to_string());
1212 }
1213 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1214 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1215 + usize::from(declared_present != operational_present)
1216 + usize::from(!executable);
1217 CatalogAnalyticsJobStatus {
1218 id,
1219 kind,
1220 projection,
1221 state: state.clone(),
1222 lifecycle_state,
1223 declared: declared_present,
1224 operational: operational_present,
1225 in_sync: declared_present == operational_present,
1226 last_run_sequence,
1227 projection_declared,
1228 projection_operational,
1229 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1230 dependency_in_sync,
1231 executable,
1232 requires_rerun,
1233 attention_score,
1234 attention_reasons,
1235 }
1236 })
1237 .collect()
1238}
1239
1240pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1241 let declared_indexes = snapshot
1242 .declared_indexes
1243 .iter()
1244 .map(|index| index.name.clone())
1245 .collect::<BTreeSet<_>>();
1246 let operational_indexes = snapshot
1247 .operational_indexes
1248 .iter()
1249 .map(|index| index.name.clone())
1250 .collect::<BTreeSet<_>>();
1251 let declared_graph_projections = snapshot
1252 .declared_graph_projections
1253 .iter()
1254 .map(|projection| projection.name.clone())
1255 .collect::<BTreeSet<_>>();
1256 let operational_graph_projections = snapshot
1257 .operational_graph_projections
1258 .iter()
1259 .map(|projection| projection.name.clone())
1260 .collect::<BTreeSet<_>>();
1261 let declared_analytics_jobs = snapshot
1262 .declared_analytics_jobs
1263 .iter()
1264 .map(|job| job.id.clone())
1265 .collect::<BTreeSet<_>>();
1266 let operational_analytics_jobs = snapshot
1267 .operational_analytics_jobs
1268 .iter()
1269 .map(|job| job.id.clone())
1270 .collect::<BTreeSet<_>>();
1271
1272 CatalogConsistencyReport {
1273 declared_index_count: declared_indexes.len(),
1274 operational_index_count: operational_indexes.len(),
1275 declared_graph_projection_count: declared_graph_projections.len(),
1276 operational_graph_projection_count: operational_graph_projections.len(),
1277 declared_analytics_job_count: declared_analytics_jobs.len(),
1278 operational_analytics_job_count: operational_analytics_jobs.len(),
1279 missing_operational_indexes: declared_indexes
1280 .difference(&operational_indexes)
1281 .cloned()
1282 .collect(),
1283 undeclared_operational_indexes: operational_indexes
1284 .difference(&declared_indexes)
1285 .cloned()
1286 .collect(),
1287 missing_operational_graph_projections: declared_graph_projections
1288 .difference(&operational_graph_projections)
1289 .cloned()
1290 .collect(),
1291 undeclared_operational_graph_projections: operational_graph_projections
1292 .difference(&declared_graph_projections)
1293 .cloned()
1294 .collect(),
1295 missing_operational_analytics_jobs: declared_analytics_jobs
1296 .difference(&operational_analytics_jobs)
1297 .cloned()
1298 .collect(),
1299 undeclared_operational_analytics_jobs: operational_analytics_jobs
1300 .difference(&declared_analytics_jobs)
1301 .cloned()
1302 .collect(),
1303 }
1304}
1305
1306pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1307 CatalogAttentionSummary {
1308 collections_requiring_attention: snapshot
1309 .collections
1310 .iter()
1311 .filter(|collection| collection.attention_required)
1312 .count(),
1313 indexes_requiring_attention: snapshot
1314 .index_statuses
1315 .iter()
1316 .filter(|status| status.attention_score > 0)
1317 .count(),
1318 graph_projections_requiring_attention: snapshot
1319 .graph_projection_statuses
1320 .iter()
1321 .filter(|status| status.attention_score > 0)
1322 .count(),
1323 analytics_jobs_requiring_attention: snapshot
1324 .analytics_job_statuses
1325 .iter()
1326 .filter(|status| status.attention_score > 0)
1327 .count(),
1328 top_collection: snapshot
1329 .collections
1330 .iter()
1331 .filter(|collection| collection.attention_score > 0)
1332 .max_by_key(|collection| collection.attention_score)
1333 .cloned(),
1334 top_index: snapshot
1335 .index_statuses
1336 .iter()
1337 .filter(|status| status.attention_score > 0)
1338 .max_by_key(|status| status.attention_score)
1339 .cloned(),
1340 top_graph_projection: snapshot
1341 .graph_projection_statuses
1342 .iter()
1343 .filter(|status| status.attention_score > 0)
1344 .max_by_key(|status| status.attention_score)
1345 .cloned(),
1346 top_analytics_job: snapshot
1347 .analytics_job_statuses
1348 .iter()
1349 .filter(|status| status.attention_score > 0)
1350 .max_by_key(|status| status.attention_score)
1351 .cloned(),
1352 }
1353}
1354
1355pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1356 let mut collections = snapshot
1357 .collections
1358 .iter()
1359 .filter(|collection| collection.attention_required)
1360 .cloned()
1361 .collect::<Vec<_>>();
1362 collections.sort_by(|left, right| {
1363 right
1364 .attention_score
1365 .cmp(&left.attention_score)
1366 .then_with(|| left.name.cmp(&right.name))
1367 });
1368 collections
1369}
1370
1371pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1372 let mut statuses = snapshot
1373 .index_statuses
1374 .iter()
1375 .filter(|status| status.attention_score > 0)
1376 .cloned()
1377 .collect::<Vec<_>>();
1378 statuses.sort_by(|left, right| {
1379 right
1380 .attention_score
1381 .cmp(&left.attention_score)
1382 .then_with(|| left.name.cmp(&right.name))
1383 });
1384 statuses
1385}
1386
1387pub fn graph_projection_attention(
1388 snapshot: &CatalogModelSnapshot,
1389) -> Vec<CatalogGraphProjectionStatus> {
1390 let mut statuses = snapshot
1391 .graph_projection_statuses
1392 .iter()
1393 .filter(|status| status.attention_score > 0)
1394 .cloned()
1395 .collect::<Vec<_>>();
1396 statuses.sort_by(|left, right| {
1397 right
1398 .attention_score
1399 .cmp(&left.attention_score)
1400 .then_with(|| left.name.cmp(&right.name))
1401 });
1402 statuses
1403}
1404
1405pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1406 let mut statuses = snapshot
1407 .analytics_job_statuses
1408 .iter()
1409 .filter(|status| status.attention_score > 0)
1410 .cloned()
1411 .collect::<Vec<_>>();
1412 statuses.sort_by(|left, right| {
1413 right
1414 .attention_score
1415 .cmp(&left.attention_score)
1416 .then_with(|| left.id.cmp(&right.id))
1417 });
1418 statuses
1419}