1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct AnalyticalStorageConfig {
31 pub columnar: bool,
34 pub time_key: String,
36 pub order_by_key: Option<String>,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum SchemaMode {
43 Strict,
44 SemiStructured,
45 Dynamic,
46}
47
48pub use reddb_types::catalog::{
54 AiPolicy, AnalyticsOutput, AnalyticsViewDescriptor, CollectionModel, EmbedPolicy,
55 ModerateDegradedMode, ModeratePolicy, ModerateRejectAction, SubscriptionDescriptor,
56 SubscriptionOperation, VisionPolicy,
57};
58
59#[derive(Debug, Clone)]
60pub struct CollectionDescriptor {
61 pub name: String,
62 pub model: CollectionModel,
63 pub schema_mode: SchemaMode,
64 pub contract_present: bool,
65 pub contract_origin: Option<crate::physical::ContractOrigin>,
66 pub declared_model: Option<CollectionModel>,
67 pub observed_model: CollectionModel,
68 pub queue_mode: Option<QueueMode>,
69 pub queue_max_attempts: Option<u32>,
73 pub queue_lock_deadline_ms: Option<u64>,
75 pub queue_in_flight_cap_per_group: Option<u32>,
77 pub queue_dlq_target: Option<String>,
79 pub vector_dimension: Option<usize>,
80 pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
81 pub session_key: Option<String>,
85 pub session_gap_ms: Option<u64>,
88 pub retention_duration_ms: Option<u64>,
92 pub declared_schema_mode: Option<SchemaMode>,
93 pub observed_schema_mode: SchemaMode,
94 pub entities: usize,
95 pub cross_refs: usize,
96 pub segments: usize,
97 pub indices: Vec<String>,
98 pub declared_indices: Vec<String>,
99 pub operational_indices: Vec<String>,
100 pub indexes_in_sync: bool,
101 pub missing_operational_indices: Vec<String>,
102 pub undeclared_operational_indices: Vec<String>,
103 pub queryable_index_count: usize,
104 pub indexes_requiring_rebuild_count: usize,
105 pub queryable_graph_projection_count: usize,
106 pub graph_projections_requiring_rematerialization_count: usize,
107 pub executable_analytics_job_count: usize,
108 pub analytics_jobs_requiring_rerun_count: usize,
109 pub subscriptions: Vec<SubscriptionDescriptor>,
110 pub analytics_config: Vec<AnalyticsViewDescriptor>,
114 pub ai_policy: Option<AiPolicy>,
119 pub resources_in_sync: bool,
120 pub attention_required: bool,
121 pub attention_score: usize,
122 pub attention_reasons: Vec<String>,
123}
124
125#[derive(Debug, Clone)]
126pub struct CatalogModelSnapshot {
127 pub summary: CatalogSnapshot,
128 pub collections: Vec<CollectionDescriptor>,
129 pub indices: IndexCatalogSnapshot,
130 pub declared_indexes: Vec<PhysicalIndexState>,
131 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
132 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
133 pub operational_indexes: Vec<PhysicalIndexState>,
134 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
135 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
136 pub index_statuses: Vec<CatalogIndexStatus>,
137 pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
138 pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
139 pub queryable_index_count: usize,
140 pub indexes_requiring_rebuild_count: usize,
141 pub queryable_graph_projection_count: usize,
142 pub graph_projections_requiring_rematerialization_count: usize,
143 pub executable_analytics_job_count: usize,
144 pub analytics_jobs_requiring_rerun_count: usize,
145}
146
147#[derive(Debug, Clone)]
148pub struct CatalogAttentionSummary {
149 pub collections_requiring_attention: usize,
150 pub indexes_requiring_attention: usize,
151 pub graph_projections_requiring_attention: usize,
152 pub analytics_jobs_requiring_attention: usize,
153 pub top_collection: Option<CollectionDescriptor>,
154 pub top_index: Option<CatalogIndexStatus>,
155 pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
156 pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
157}
158
159#[derive(Debug, Clone, Default)]
160pub struct CatalogDeclarations {
161 pub declared_indexes: Vec<PhysicalIndexState>,
162 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
163 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
164 pub operational_indexes: Vec<PhysicalIndexState>,
165 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
166 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
167}
168
169#[derive(Debug, Clone)]
170pub struct CatalogGraphProjectionStatus {
171 pub name: String,
172 pub source: Option<String>,
173 pub lifecycle_state: String,
174 pub declared: bool,
175 pub operational: bool,
176 pub in_sync: bool,
177 pub last_materialized_sequence: Option<u64>,
178 pub queryable: bool,
179 pub requires_rematerialization: bool,
180 pub dependent_job_count: usize,
181 pub active_dependent_job_count: usize,
182 pub stale_dependent_job_count: usize,
183 pub dependent_jobs_in_sync: bool,
184 pub rerun_required: bool,
185 pub attention_score: usize,
186 pub attention_reasons: Vec<String>,
187}
188
189#[derive(Debug, Clone)]
190pub struct CatalogIndexStatus {
191 pub name: String,
192 pub collection: Option<String>,
193 pub kind: String,
194 pub declared: bool,
195 pub operational: bool,
196 pub enabled: bool,
197 pub build_state: Option<String>,
198 pub artifact_state: crate::physical::ArtifactState,
199 pub lifecycle_state: String,
200 pub in_sync: bool,
201 pub queryable: bool,
202 pub requires_rebuild: bool,
203 pub attention_score: usize,
204 pub attention_reasons: Vec<String>,
205}
206
207#[derive(Debug, Clone)]
208pub struct CatalogAnalyticsJobStatus {
209 pub id: String,
210 pub kind: String,
211 pub projection: Option<String>,
212 pub state: String,
213 pub lifecycle_state: String,
214 pub declared: bool,
215 pub operational: bool,
216 pub in_sync: bool,
217 pub last_run_sequence: Option<u64>,
218 pub projection_declared: Option<bool>,
219 pub projection_operational: Option<bool>,
220 pub projection_lifecycle_state: Option<String>,
221 pub dependency_in_sync: Option<bool>,
222 pub executable: bool,
223 pub requires_rerun: bool,
224 pub attention_score: usize,
225 pub attention_reasons: Vec<String>,
226}
227
228#[derive(Debug, Clone, Default)]
229pub struct CatalogConsistencyReport {
230 pub declared_index_count: usize,
231 pub operational_index_count: usize,
232 pub declared_graph_projection_count: usize,
233 pub operational_graph_projection_count: usize,
234 pub declared_analytics_job_count: usize,
235 pub operational_analytics_job_count: usize,
236 pub missing_operational_indexes: Vec<String>,
237 pub undeclared_operational_indexes: Vec<String>,
238 pub missing_operational_graph_projections: Vec<String>,
239 pub undeclared_operational_graph_projections: Vec<String>,
240 pub missing_operational_analytics_jobs: Vec<String>,
241 pub undeclared_operational_analytics_jobs: Vec<String>,
242}
243
244pub fn snapshot_store(
245 name: &str,
246 store: &UnifiedStore,
247 index_catalog: Option<&IndexCatalog>,
248) -> CatalogModelSnapshot {
249 snapshot_store_with_declarations(name, store, index_catalog, None, None)
250}
251
252pub fn snapshot_store_with_declarations(
253 name: &str,
254 store: &UnifiedStore,
255 index_catalog: Option<&IndexCatalog>,
256 declarations: Option<&CatalogDeclarations>,
257 contracts: Option<&[CollectionContract]>,
258) -> CatalogModelSnapshot {
259 let index_statuses = index_statuses(declarations);
260 let graph_projection_statuses = graph_projection_statuses(declarations);
261 let analytics_job_statuses = analytics_job_statuses(declarations);
262
263 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
264 for (collection, entity) in store.query_all(|_| true) {
265 grouped.entry(collection).or_default().push(entity);
266 }
267 let queue_policies = queue_policies_from_grouped(&grouped);
268
269 let mut stats_by_collection = BTreeMap::new();
270 let mut collections = Vec::new();
271
272 for collection_name in store.list_collections() {
273 let entities = grouped.remove(&collection_name).unwrap_or_default();
274 let inferred_model = infer_model(&entities);
275 let inferred_schema_mode = infer_schema_mode(inferred_model);
276 let contract = collection_contract(&collection_name, contracts);
277 let model = contract
278 .map(|contract| contract.declared_model)
279 .unwrap_or(inferred_model);
280 let cross_refs = entities
281 .iter()
282 .map(|entity| entity.cross_refs().len())
283 .sum();
284 let entity_count = entities.len();
285 let manager_stats = store
286 .get_collection(&collection_name)
287 .map(|manager| manager.stats());
288 let segments = manager_stats
289 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
290 .unwrap_or(0);
291
292 stats_by_collection.insert(
293 collection_name.clone(),
294 CollectionStats {
295 entities: entity_count,
296 cross_refs,
297 segments,
298 },
299 );
300
301 let declared_indices = declared_indices(&collection_name, declarations);
302 let operational_indices =
303 operational_indices(&collection_name, index_catalog, declarations);
304 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
305 collection_index_consistency(&declared_indices, &operational_indices);
306 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
307 collection_index_readiness(&collection_name, &index_statuses);
308 let (
309 queryable_graph_projection_count,
310 graph_projections_requiring_rematerialization_count,
311 graph_projections_locally_in_sync,
312 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
313 let (
314 executable_analytics_job_count,
315 analytics_jobs_requiring_rerun_count,
316 analytics_jobs_locally_in_sync,
317 ) = collection_analytics_job_readiness(
318 &collection_name,
319 &graph_projection_statuses,
320 &analytics_job_statuses,
321 );
322 let resources_in_sync = indexes_in_sync
323 && indexes_locally_in_sync
324 && graph_projections_locally_in_sync
325 && analytics_jobs_locally_in_sync;
326 let attention_required = !resources_in_sync
327 || indexes_requiring_rebuild_count > 0
328 || graph_projections_requiring_rematerialization_count > 0
329 || analytics_jobs_requiring_rerun_count > 0;
330 let mut attention_reasons = Vec::new();
331 if !indexes_in_sync || !indexes_locally_in_sync {
332 attention_reasons.push("index_drift".to_string());
333 }
334 if indexes_requiring_rebuild_count > 0 {
335 attention_reasons.push("indexes_require_rebuild".to_string());
336 }
337 if !graph_projections_locally_in_sync {
338 attention_reasons.push("graph_projection_drift".to_string());
339 }
340 if graph_projections_requiring_rematerialization_count > 0 {
341 attention_reasons.push("graph_projections_require_rematerialization".to_string());
342 }
343 if !analytics_jobs_locally_in_sync {
344 attention_reasons.push("analytics_job_drift".to_string());
345 }
346 if analytics_jobs_requiring_rerun_count > 0 {
347 attention_reasons.push("analytics_jobs_require_rerun".to_string());
348 }
349 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
350 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
351 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
352 + usize::from(!resources_in_sync);
353
354 collections.push(CollectionDescriptor {
355 name: collection_name.clone(),
356 model,
357 schema_mode: contract
358 .map(|contract| contract.schema_mode)
359 .unwrap_or(inferred_schema_mode),
360 contract_present: contract.is_some(),
361 contract_origin: contract.map(|contract| contract.origin),
362 declared_model: contract.map(|contract| contract.declared_model),
363 observed_model: inferred_model,
364 queue_mode: if model == CollectionModel::Queue {
365 Some(
366 queue_policies
367 .get(&collection_name)
368 .map(|p| p.mode)
369 .unwrap_or_default(),
370 )
371 } else {
372 None
373 },
374 queue_max_attempts: if model == CollectionModel::Queue {
375 Some(
376 queue_policies
377 .get(&collection_name)
378 .map(|p| p.max_attempts)
379 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
380 )
381 } else {
382 None
383 },
384 queue_lock_deadline_ms: if model == CollectionModel::Queue {
385 Some(
386 queue_policies
387 .get(&collection_name)
388 .map(|p| p.lock_deadline_ms)
389 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
390 )
391 } else {
392 None
393 },
394 queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
395 Some(
396 queue_policies
397 .get(&collection_name)
398 .map(|p| p.in_flight_cap_per_group)
399 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
400 )
401 } else {
402 None
403 },
404 queue_dlq_target: if model == CollectionModel::Queue {
405 queue_policies
406 .get(&collection_name)
407 .and_then(|p| p.dlq_target.clone())
408 } else {
409 None
410 },
411 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
412 vector_metric: contract.and_then(|contract| contract.vector_metric),
413 session_key: contract.and_then(|contract| contract.session_key.clone()),
414 session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
415 retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
416 declared_schema_mode: contract.map(|contract| contract.schema_mode),
417 observed_schema_mode: inferred_schema_mode,
418 entities: entity_count,
419 cross_refs,
420 segments,
421 indices: infer_indices(&collection_name, model, index_catalog, declarations),
422 declared_indices,
423 operational_indices,
424 indexes_in_sync,
425 missing_operational_indices,
426 undeclared_operational_indices,
427 queryable_index_count,
428 indexes_requiring_rebuild_count,
429 queryable_graph_projection_count,
430 graph_projections_requiring_rematerialization_count,
431 executable_analytics_job_count,
432 analytics_jobs_requiring_rerun_count,
433 subscriptions: contract
434 .map(|contract| contract.subscriptions.clone())
435 .unwrap_or_default(),
436 analytics_config: contract
437 .map(|contract| contract.analytics_config.clone())
438 .unwrap_or_default(),
439 ai_policy: contract.and_then(|contract| contract.ai_policy.clone()),
440 resources_in_sync,
441 attention_required,
442 attention_score,
443 attention_reasons,
444 });
445 }
446
447 collections.sort_by(|left, right| left.name.cmp(&right.name));
448
449 let summary = CatalogSnapshot {
450 name: name.to_string(),
451 total_entities: stats_by_collection
452 .values()
453 .map(|stats| stats.entities)
454 .sum(),
455 total_collections: stats_by_collection.len(),
456 stats_by_collection,
457 updated_at: SystemTime::now(),
458 };
459
460 CatalogModelSnapshot {
461 summary,
462 collections,
463 indices: index_catalog
464 .map(IndexCatalog::snapshot)
465 .unwrap_or_default(),
466 declared_indexes: declarations
467 .map(|declarations| declarations.declared_indexes.clone())
468 .unwrap_or_default(),
469 declared_graph_projections: declarations
470 .map(|declarations| declarations.declared_graph_projections.clone())
471 .unwrap_or_default(),
472 declared_analytics_jobs: declarations
473 .map(|declarations| declarations.declared_analytics_jobs.clone())
474 .unwrap_or_default(),
475 operational_indexes: declarations
476 .map(|declarations| declarations.operational_indexes.clone())
477 .unwrap_or_default(),
478 operational_graph_projections: declarations
479 .map(|declarations| declarations.operational_graph_projections.clone())
480 .unwrap_or_default(),
481 operational_analytics_jobs: declarations
482 .map(|declarations| declarations.operational_analytics_jobs.clone())
483 .unwrap_or_default(),
484 queryable_index_count: index_statuses
485 .iter()
486 .filter(|status| status.queryable)
487 .count(),
488 indexes_requiring_rebuild_count: index_statuses
489 .iter()
490 .filter(|status| status.requires_rebuild)
491 .count(),
492 queryable_graph_projection_count: graph_projection_statuses
493 .iter()
494 .filter(|status| status.queryable)
495 .count(),
496 graph_projections_requiring_rematerialization_count: graph_projection_statuses
497 .iter()
498 .filter(|status| status.requires_rematerialization)
499 .count(),
500 executable_analytics_job_count: analytics_job_statuses
501 .iter()
502 .filter(|status| status.executable)
503 .count(),
504 analytics_jobs_requiring_rerun_count: analytics_job_statuses
505 .iter()
506 .filter(|status| status.requires_rerun)
507 .count(),
508 index_statuses,
509 graph_projection_statuses,
510 analytics_job_statuses,
511 }
512}
513
514#[derive(Debug, Clone)]
518pub struct QueuePolicySnapshot {
519 pub mode: QueueMode,
520 pub max_attempts: u32,
521 pub lock_deadline_ms: u64,
522 pub in_flight_cap_per_group: u32,
523 pub dlq_target: Option<String>,
524}
525
526fn queue_policies_from_grouped(
527 grouped: &HashMap<String, Vec<UnifiedEntity>>,
528) -> HashMap<String, QueuePolicySnapshot> {
529 let Some(meta) = grouped.get("red_queue_meta") else {
530 return HashMap::new();
531 };
532
533 let mut policies = HashMap::new();
534 for entity in meta {
535 let Some(row) = entity.data.as_row() else {
536 continue;
537 };
538 if row_text(row, "kind").as_deref() != Some("queue_config") {
539 continue;
540 }
541 let Some(queue) = row_text(row, "queue") else {
542 continue;
543 };
544 let mode = row_text(row, "mode")
545 .as_deref()
546 .and_then(QueueMode::parse)
547 .unwrap_or_default();
548 let max_attempts = row_u64(row, "max_attempts")
549 .map(|v| v as u32)
550 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
551 let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
552 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
553 let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
554 .map(|v| v as u32)
555 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
556 let dlq_target = row_text(row, "dlq");
557 policies.insert(
558 queue,
559 QueuePolicySnapshot {
560 mode,
561 max_attempts,
562 lock_deadline_ms,
563 in_flight_cap_per_group,
564 dlq_target,
565 },
566 );
567 }
568 policies
569}
570
571fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
572 match row.get_field(key) {
573 Some(Value::UnsignedInteger(v)) => Some(*v),
574 Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
575 _ => None,
576 }
577}
578
579fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
580 match row.get_field(key) {
581 Some(Value::Text(value)) => Some(value.to_string()),
582 _ => None,
583 }
584}
585
586fn collection_contract<'a>(
587 collection_name: &str,
588 contracts: Option<&'a [CollectionContract]>,
589) -> Option<&'a CollectionContract> {
590 contracts.and_then(|contracts| {
591 contracts
592 .iter()
593 .find(|contract| contract.name == collection_name)
594 })
595}
596
597fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
598 let mut has_table = false;
599 let mut has_graph = false;
600 let mut has_vector = false;
601
602 for entity in entities {
603 match &entity.kind {
604 EntityKind::TableRow { .. } => has_table = true,
605 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
606 EntityKind::Vector { .. } => has_vector = true,
607 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
608 }
609 }
610
611 match (has_table, has_graph, has_vector) {
612 (true, false, false) => CollectionModel::Table,
613 (false, true, false) => CollectionModel::Graph,
614 (false, false, true) => CollectionModel::Vector,
615 _ => CollectionModel::Mixed,
616 }
617}
618
619fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
620 match model {
621 CollectionModel::Table => SchemaMode::Strict,
622 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
623 CollectionModel::Document
624 | CollectionModel::Hll
625 | CollectionModel::Sketch
626 | CollectionModel::Filter
627 | CollectionModel::Kv
628 | CollectionModel::Config
629 | CollectionModel::Vault
630 | CollectionModel::Mixed => SchemaMode::Dynamic,
631 CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
632 CollectionModel::Queue => SchemaMode::Dynamic,
633 }
634}
635
636fn infer_indices(
637 collection_name: &str,
638 model: CollectionModel,
639 index_catalog: Option<&IndexCatalog>,
640 declarations: Option<&CatalogDeclarations>,
641) -> Vec<String> {
642 let declared = declared_indices(collection_name, declarations);
643 if !declared.is_empty() {
644 return declared;
645 }
646
647 let available = index_catalog
648 .map(IndexCatalog::snapshot)
649 .unwrap_or_default();
650 let mut selected = Vec::new();
651
652 for metric in available {
653 let relevant = matches!(
654 (model, metric.kind),
655 (_, IndexKind::BTree)
656 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
657 | (CollectionModel::Vector, IndexKind::VectorHnsw)
658 | (CollectionModel::Vector, IndexKind::VectorInverted)
659 | (CollectionModel::Vector, IndexKind::VectorTurbo)
660 | (CollectionModel::Document, IndexKind::FullText)
661 | (CollectionModel::Document, IndexKind::DocumentPathValue)
662 | (CollectionModel::Kv, IndexKind::Hash)
663 | (CollectionModel::Config, IndexKind::Hash)
664 | (CollectionModel::Vault, IndexKind::Hash)
665 | (CollectionModel::Mixed, _)
666 );
667
668 if relevant && metric.enabled {
669 selected.push(metric.name);
670 }
671 }
672
673 selected
674}
675
676fn declared_indices(
677 collection_name: &str,
678 declarations: Option<&CatalogDeclarations>,
679) -> Vec<String> {
680 let mut selected = declarations
681 .map(|declarations| {
682 declarations
683 .declared_indexes
684 .iter()
685 .filter(|index| index.collection.as_deref() == Some(collection_name))
686 .map(|index| index.name.clone())
687 .collect::<Vec<_>>()
688 })
689 .unwrap_or_default();
690 selected.sort();
691 selected.dedup();
692 selected
693}
694
695fn operational_indices(
696 collection_name: &str,
697 index_catalog: Option<&IndexCatalog>,
698 declarations: Option<&CatalogDeclarations>,
699) -> Vec<String> {
700 if let Some(declarations) = declarations {
701 let mut selected = declarations
702 .operational_indexes
703 .iter()
704 .filter(|index| index.collection.as_deref() == Some(collection_name))
705 .map(|index| index.name.clone())
706 .collect::<Vec<_>>();
707 selected.sort();
708 selected.dedup();
709 return selected;
710 }
711
712 let mut selected = index_catalog
713 .map(IndexCatalog::snapshot)
714 .unwrap_or_default()
715 .into_iter()
716 .filter(|metric| metric.enabled)
717 .filter_map(|metric| {
718 if metric.name.starts_with(collection_name) {
719 Some(metric.name)
720 } else {
721 None
722 }
723 })
724 .collect::<Vec<_>>();
725 selected.sort();
726 selected.dedup();
727 selected
728}
729
730fn collection_index_consistency(
731 declared_indices: &[String],
732 operational_indices: &[String],
733) -> (bool, Vec<String>, Vec<String>) {
734 let declared = declared_indices.iter().cloned().collect::<BTreeSet<_>>();
735 let operational = operational_indices.iter().cloned().collect::<BTreeSet<_>>();
736 let missing_operational_indices = declared
737 .difference(&operational)
738 .cloned()
739 .collect::<Vec<_>>();
740 let undeclared_operational_indices = operational
741 .difference(&declared)
742 .cloned()
743 .collect::<Vec<_>>();
744 (
745 missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty(),
746 missing_operational_indices,
747 undeclared_operational_indices,
748 )
749}
750
751fn collection_index_readiness(
752 collection_name: &str,
753 statuses: &[CatalogIndexStatus],
754) -> (usize, usize, bool) {
755 let local = statuses
756 .iter()
757 .filter(|status| status.collection.as_deref() == Some(collection_name))
758 .collect::<Vec<_>>();
759 let queryable_count = local.iter().filter(|status| status.queryable).count();
760 let requires_rebuild_count = local
761 .iter()
762 .filter(|status| status.requires_rebuild)
763 .count();
764 let locally_in_sync = local.iter().all(|status| status.in_sync);
765 (queryable_count, requires_rebuild_count, locally_in_sync)
766}
767
768fn collection_graph_projection_readiness(
769 collection_name: &str,
770 statuses: &[CatalogGraphProjectionStatus],
771) -> (usize, usize, bool) {
772 let local = statuses
773 .iter()
774 .filter(|status| status.source.as_deref() == Some(collection_name))
775 .collect::<Vec<_>>();
776 let queryable_count = local.iter().filter(|status| status.queryable).count();
777 let requires_rematerialization_count = local
778 .iter()
779 .filter(|status| status.requires_rematerialization)
780 .count();
781 let locally_in_sync = local
782 .iter()
783 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
784 (
785 queryable_count,
786 requires_rematerialization_count,
787 locally_in_sync,
788 )
789}
790
791fn collection_analytics_job_readiness(
792 collection_name: &str,
793 projection_statuses: &[CatalogGraphProjectionStatus],
794 job_statuses: &[CatalogAnalyticsJobStatus],
795) -> (usize, usize, bool) {
796 let local = job_statuses
797 .iter()
798 .filter(|status| {
799 let Some(projection_name) = status.projection.as_deref() else {
800 return false;
801 };
802 projection_statuses
803 .iter()
804 .find(|projection| projection.name == projection_name)
805 .and_then(|projection| projection.source.as_deref())
806 == Some(collection_name)
807 })
808 .collect::<Vec<_>>();
809 let executable_count = local.iter().filter(|status| status.executable).count();
810 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
811 let locally_in_sync = local
812 .iter()
813 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
814 (executable_count, requires_rerun_count, locally_in_sync)
815}
816
817fn graph_projection_statuses(
818 declarations: Option<&CatalogDeclarations>,
819) -> Vec<CatalogGraphProjectionStatus> {
820 let declared = declarations
821 .map(|declarations| declarations.declared_graph_projections.as_slice())
822 .unwrap_or(&[]);
823 let operational = declarations
824 .map(|declarations| declarations.operational_graph_projections.as_slice())
825 .unwrap_or(&[]);
826 let declared_jobs = declarations
827 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
828 .unwrap_or(&[]);
829 let operational_jobs = declarations
830 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
831 .unwrap_or(&[]);
832
833 let mut names = declared
834 .iter()
835 .map(|projection| projection.name.clone())
836 .chain(operational.iter().map(|projection| projection.name.clone()))
837 .collect::<Vec<_>>();
838 names.sort();
839 names.dedup();
840
841 names
842 .into_iter()
843 .map(|name| {
844 let declared_projection = declared.iter().find(|projection| projection.name == name);
845 let operational_projection = operational
846 .iter()
847 .find(|projection| projection.name == name);
848 let declared_present = declared_projection.is_some();
849 let operational_present = operational_projection.is_some();
850 let mut dependent_job_ids = BTreeSet::new();
851 let mut dependent_job_count = 0usize;
852 let mut active_dependent_job_count = 0usize;
853 let mut stale_dependent_job_count = 0usize;
854 for job in declared_jobs
855 .iter()
856 .chain(operational_jobs.iter())
857 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
858 {
859 if !dependent_job_ids.insert(job.id.clone()) {
860 continue;
861 }
862 dependent_job_count += 1;
863 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
864 active_dependent_job_count += 1;
865 }
866 if job.state == "stale" {
867 stale_dependent_job_count += 1;
868 }
869 }
870 let lifecycle_state = match (
871 declared_present,
872 operational_present,
873 declared_projection
874 .map(|projection| projection.state.as_str())
875 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
876 .unwrap_or_default(),
877 ) {
878 (true, _, "failed") => "failed",
879 (true, true, "stale") => "stale",
880 (true, _, "materializing") => "materializing",
881 (true, true, "materialized") => "materialized",
882 (true, true, _) => "materialized",
883 (false, true, _) => "orphaned-operational",
884 (true, false, _) => "declared",
885 (false, false, _) => "unknown",
886 }
887 .to_string();
888 let queryable = declared_present
889 && operational_present
890 && matches!(
891 declared_projection
892 .map(|projection| projection.state.as_str())
893 .or_else(
894 || operational_projection.map(|projection| projection.state.as_str())
895 )
896 .unwrap_or_default(),
897 "materialized"
898 );
899 let requires_rematerialization = matches!(
900 declared_projection
901 .map(|projection| projection.state.as_str())
902 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
903 .unwrap_or_default(),
904 "declared" | "materializing" | "failed" | "stale"
905 );
906 let mut attention_reasons = Vec::new();
907 if !declared_present || !operational_present {
908 attention_reasons.push("declaration_drift".to_string());
909 }
910 if requires_rematerialization {
911 attention_reasons.push("requires_rematerialization".to_string());
912 }
913 if stale_dependent_job_count > 0 {
914 attention_reasons.push("dependent_jobs_stale".to_string());
915 }
916 let attention_score = stale_dependent_job_count.saturating_mul(2)
917 + usize::from(requires_rematerialization).saturating_mul(4)
918 + usize::from(!declared_present || !operational_present)
919 + usize::from(!queryable);
920 CatalogGraphProjectionStatus {
921 name,
922 source: declared_projection
923 .map(|projection| projection.source.clone())
924 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
925 lifecycle_state,
926 declared: declared_present,
927 operational: operational_present,
928 in_sync: declared_present == operational_present,
929 last_materialized_sequence: declared_projection
930 .and_then(|projection| projection.last_materialized_sequence)
931 .or_else(|| {
932 operational_projection
933 .and_then(|projection| projection.last_materialized_sequence)
934 }),
935 queryable,
936 requires_rematerialization,
937 dependent_job_count,
938 active_dependent_job_count,
939 stale_dependent_job_count,
940 dependent_jobs_in_sync: stale_dependent_job_count == 0,
941 rerun_required: stale_dependent_job_count > 0,
942 attention_score,
943 attention_reasons,
944 }
945 })
946 .collect()
947}
948
949fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
950 let declared = declarations
951 .map(|declarations| declarations.declared_indexes.as_slice())
952 .unwrap_or(&[]);
953 let operational = declarations
954 .map(|declarations| declarations.operational_indexes.as_slice())
955 .unwrap_or(&[]);
956
957 let mut names = declared
958 .iter()
959 .map(|index| index.name.clone())
960 .chain(operational.iter().map(|index| index.name.clone()))
961 .collect::<Vec<_>>();
962 names.sort();
963 names.dedup();
964
965 names
966 .into_iter()
967 .map(|name| {
968 let declared_index = declared.iter().find(|index| index.name == name);
969 let operational_index = operational.iter().find(|index| index.name == name);
970 let collection = declared_index
971 .and_then(|index| index.collection.clone())
972 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
973 let kind = declared_index
974 .map(|index| index_kind_string(index.kind))
975 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
976 .unwrap_or_default();
977 let enabled = declared_index
978 .map(|index| index.enabled)
979 .or_else(|| operational_index.map(|index| index.enabled))
980 .unwrap_or(false);
981 let build_state = operational_index
982 .map(|index| index.build_state.clone())
983 .or_else(|| declared_index.map(|index| index.build_state.clone()));
984 let declared_present = declared_index.is_some();
985 let operational_present = operational_index.is_some();
986 let lifecycle_state = index_lifecycle_state(
987 declared_present,
988 operational_present,
989 enabled,
990 build_state.as_deref(),
991 );
992
993 let mut attention_reasons = Vec::new();
994 if declared_present != operational_present {
995 attention_reasons.push("declaration_drift".to_string());
996 }
997 if !enabled && declared_present {
998 attention_reasons.push("disabled".to_string());
999 }
1000 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1001 attention_reasons.push("failed".to_string());
1002 }
1003 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1004 attention_reasons.push("stale".to_string());
1005 }
1006 if matches!(
1007 build_state.as_deref().unwrap_or_default(),
1008 "building" | "declared-unbuilt"
1009 ) {
1010 attention_reasons.push("requires_rebuild".to_string());
1011 }
1012 let queryable = declared_present
1013 && operational_present
1014 && enabled
1015 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1016 let requires_rebuild = matches!(
1017 build_state.as_deref().unwrap_or_default(),
1018 "declared-unbuilt" | "building" | "stale" | "failed"
1019 );
1020 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1021 + usize::from(!enabled && declared_present)
1022 + usize::from(declared_present != operational_present)
1023 + usize::from(!queryable);
1024
1025 let artifact_state = crate::physical::ArtifactState::from_build_state(
1026 build_state.as_deref().unwrap_or("declared-unbuilt"),
1027 enabled,
1028 );
1029 CatalogIndexStatus {
1030 name,
1031 collection,
1032 kind,
1033 declared: declared_present,
1034 operational: operational_present,
1035 enabled,
1036 build_state,
1037 artifact_state,
1038 lifecycle_state,
1039 in_sync: declared_present == operational_present,
1040 queryable,
1041 requires_rebuild,
1042 attention_score,
1043 attention_reasons,
1044 }
1045 })
1046 .collect()
1047}
1048
1049fn index_lifecycle_state(
1050 declared: bool,
1051 operational: bool,
1052 enabled: bool,
1053 build_state: Option<&str>,
1054) -> String {
1055 if !declared && operational {
1056 return "orphaned-operational".to_string();
1057 }
1058 if declared && !enabled {
1059 return "disabled".to_string();
1060 }
1061 if !declared {
1062 return "unknown".to_string();
1063 }
1064 if !operational {
1065 return "declared".to_string();
1066 }
1067
1068 match build_state.unwrap_or_default() {
1069 "ready" => "ready".to_string(),
1070 "failed" => "failed".to_string(),
1071 "stale" => "stale".to_string(),
1072 "declared-unbuilt" => "declared".to_string(),
1073 "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
1074 "building".to_string()
1075 }
1076 _ => "building".to_string(),
1077 }
1078}
1079
1080fn index_kind_string(kind: IndexKind) -> String {
1081 match kind {
1082 IndexKind::BTree => "btree",
1083 IndexKind::Hash => "hash",
1084 IndexKind::Bitmap => "bitmap",
1085 IndexKind::Spatial => "spatial.rtree",
1086 IndexKind::VectorHnsw => "vector.hnsw",
1087 IndexKind::VectorInverted => "vector.inverted",
1088 IndexKind::VectorTurbo => "vector.turbo",
1089 IndexKind::GraphAdjacency => "graph.adjacency",
1090 IndexKind::FullText => "text.fulltext",
1091 IndexKind::DocumentPathValue => "document.pathvalue",
1092 IndexKind::HybridSearch => "search.hybrid",
1093 }
1094 .to_string()
1095}
1096
1097fn analytics_job_statuses(
1098 declarations: Option<&CatalogDeclarations>,
1099) -> Vec<CatalogAnalyticsJobStatus> {
1100 let declared = declarations
1101 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1102 .unwrap_or(&[]);
1103 let operational = declarations
1104 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1105 .unwrap_or(&[]);
1106 let declared_projections = declarations
1107 .map(|declarations| declarations.declared_graph_projections.as_slice())
1108 .unwrap_or(&[]);
1109 let operational_projections = declarations
1110 .map(|declarations| declarations.operational_graph_projections.as_slice())
1111 .unwrap_or(&[]);
1112
1113 let mut ids = declared
1114 .iter()
1115 .map(|job| job.id.clone())
1116 .chain(operational.iter().map(|job| job.id.clone()))
1117 .collect::<Vec<_>>();
1118 ids.sort();
1119 ids.dedup();
1120
1121 ids.into_iter()
1122 .map(|id| {
1123 let declared_job = declared.iter().find(|job| job.id == id);
1124 let operational_job = operational.iter().find(|job| job.id == id);
1125 let kind = declared_job
1126 .map(|job| job.kind.clone())
1127 .or_else(|| operational_job.map(|job| job.kind.clone()))
1128 .unwrap_or_default();
1129 let projection = declared_job
1130 .and_then(|job| job.projection.clone())
1131 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1132 let state = declared_job
1133 .map(|job| job.state.clone())
1134 .or_else(|| operational_job.map(|job| job.state.clone()))
1135 .unwrap_or_default();
1136 let declared_present = declared_job.is_some();
1137 let operational_present = operational_job.is_some();
1138 let last_run_sequence = declared_job
1139 .and_then(|job| job.last_run_sequence)
1140 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1141 let projection_declared = projection.as_ref().map(|projection_name| {
1142 declared_projections
1143 .iter()
1144 .any(|projection| projection.name == *projection_name)
1145 });
1146 let projection_operational = projection.as_ref().map(|projection_name| {
1147 operational_projections
1148 .iter()
1149 .any(|projection| projection.name == *projection_name)
1150 });
1151 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1152 declared_projections
1153 .iter()
1154 .find(|projection| projection.name == *projection_name)
1155 .map(|projection| projection.state.as_str())
1156 .or_else(|| {
1157 operational_projections
1158 .iter()
1159 .find(|projection| projection.name == *projection_name)
1160 .map(|projection| projection.state.as_str())
1161 })
1162 });
1163 let dependency_in_sync = projection.as_ref().map(|_| {
1164 matches!(projection_lifecycle, Some("materialized"))
1165 && projection_operational == Some(true)
1166 });
1167 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1168 (true, false, _) => "declared",
1169 (true, true, _)
1170 if matches!(
1171 projection_lifecycle,
1172 Some("stale" | "failed" | "materializing" | "declared")
1173 ) =>
1174 {
1175 "stale"
1176 }
1177 (true, true, "completed") => "completed",
1178 (true, true, "running") => "running",
1179 (true, true, "failed") => "failed",
1180 (true, true, "queued") => "queued",
1181 (true, true, "stale") => "stale",
1182 (true, true, _) => "materialized",
1183 (false, true, _) => "orphaned-operational",
1184 (false, false, _) => "unknown",
1185 }
1186 .to_string();
1187 let executable = declared_present
1188 && operational_present
1189 && !matches!(state.as_str(), "failed" | "stale")
1190 && dependency_in_sync.unwrap_or(true);
1191 let requires_rerun =
1192 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1193 let mut attention_reasons = Vec::new();
1194 if declared_present != operational_present {
1195 attention_reasons.push("declaration_drift".to_string());
1196 }
1197 if matches!(state.as_str(), "failed") {
1198 attention_reasons.push("failed".to_string());
1199 }
1200 if matches!(state.as_str(), "stale") {
1201 attention_reasons.push("stale".to_string());
1202 }
1203 if dependency_in_sync == Some(false) {
1204 attention_reasons.push("dependency_drift".to_string());
1205 }
1206 if requires_rerun {
1207 attention_reasons.push("requires_rerun".to_string());
1208 }
1209 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1210 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1211 + usize::from(declared_present != operational_present)
1212 + usize::from(!executable);
1213 CatalogAnalyticsJobStatus {
1214 id,
1215 kind,
1216 projection,
1217 state: state.clone(),
1218 lifecycle_state,
1219 declared: declared_present,
1220 operational: operational_present,
1221 in_sync: declared_present == operational_present,
1222 last_run_sequence,
1223 projection_declared,
1224 projection_operational,
1225 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1226 dependency_in_sync,
1227 executable,
1228 requires_rerun,
1229 attention_score,
1230 attention_reasons,
1231 }
1232 })
1233 .collect()
1234}
1235
1236pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1237 let declared_indexes = snapshot
1238 .declared_indexes
1239 .iter()
1240 .map(|index| index.name.clone())
1241 .collect::<BTreeSet<_>>();
1242 let operational_indexes = snapshot
1243 .operational_indexes
1244 .iter()
1245 .map(|index| index.name.clone())
1246 .collect::<BTreeSet<_>>();
1247 let declared_graph_projections = snapshot
1248 .declared_graph_projections
1249 .iter()
1250 .map(|projection| projection.name.clone())
1251 .collect::<BTreeSet<_>>();
1252 let operational_graph_projections = snapshot
1253 .operational_graph_projections
1254 .iter()
1255 .map(|projection| projection.name.clone())
1256 .collect::<BTreeSet<_>>();
1257 let declared_analytics_jobs = snapshot
1258 .declared_analytics_jobs
1259 .iter()
1260 .map(|job| job.id.clone())
1261 .collect::<BTreeSet<_>>();
1262 let operational_analytics_jobs = snapshot
1263 .operational_analytics_jobs
1264 .iter()
1265 .map(|job| job.id.clone())
1266 .collect::<BTreeSet<_>>();
1267
1268 CatalogConsistencyReport {
1269 declared_index_count: declared_indexes.len(),
1270 operational_index_count: operational_indexes.len(),
1271 declared_graph_projection_count: declared_graph_projections.len(),
1272 operational_graph_projection_count: operational_graph_projections.len(),
1273 declared_analytics_job_count: declared_analytics_jobs.len(),
1274 operational_analytics_job_count: operational_analytics_jobs.len(),
1275 missing_operational_indexes: declared_indexes
1276 .difference(&operational_indexes)
1277 .cloned()
1278 .collect(),
1279 undeclared_operational_indexes: operational_indexes
1280 .difference(&declared_indexes)
1281 .cloned()
1282 .collect(),
1283 missing_operational_graph_projections: declared_graph_projections
1284 .difference(&operational_graph_projections)
1285 .cloned()
1286 .collect(),
1287 undeclared_operational_graph_projections: operational_graph_projections
1288 .difference(&declared_graph_projections)
1289 .cloned()
1290 .collect(),
1291 missing_operational_analytics_jobs: declared_analytics_jobs
1292 .difference(&operational_analytics_jobs)
1293 .cloned()
1294 .collect(),
1295 undeclared_operational_analytics_jobs: operational_analytics_jobs
1296 .difference(&declared_analytics_jobs)
1297 .cloned()
1298 .collect(),
1299 }
1300}
1301
1302pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1303 CatalogAttentionSummary {
1304 collections_requiring_attention: snapshot
1305 .collections
1306 .iter()
1307 .filter(|collection| collection.attention_required)
1308 .count(),
1309 indexes_requiring_attention: snapshot
1310 .index_statuses
1311 .iter()
1312 .filter(|status| status.attention_score > 0)
1313 .count(),
1314 graph_projections_requiring_attention: snapshot
1315 .graph_projection_statuses
1316 .iter()
1317 .filter(|status| status.attention_score > 0)
1318 .count(),
1319 analytics_jobs_requiring_attention: snapshot
1320 .analytics_job_statuses
1321 .iter()
1322 .filter(|status| status.attention_score > 0)
1323 .count(),
1324 top_collection: snapshot
1325 .collections
1326 .iter()
1327 .filter(|collection| collection.attention_score > 0)
1328 .max_by_key(|collection| collection.attention_score)
1329 .cloned(),
1330 top_index: snapshot
1331 .index_statuses
1332 .iter()
1333 .filter(|status| status.attention_score > 0)
1334 .max_by_key(|status| status.attention_score)
1335 .cloned(),
1336 top_graph_projection: snapshot
1337 .graph_projection_statuses
1338 .iter()
1339 .filter(|status| status.attention_score > 0)
1340 .max_by_key(|status| status.attention_score)
1341 .cloned(),
1342 top_analytics_job: snapshot
1343 .analytics_job_statuses
1344 .iter()
1345 .filter(|status| status.attention_score > 0)
1346 .max_by_key(|status| status.attention_score)
1347 .cloned(),
1348 }
1349}
1350
1351pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1352 let mut collections = snapshot
1353 .collections
1354 .iter()
1355 .filter(|collection| collection.attention_required)
1356 .cloned()
1357 .collect::<Vec<_>>();
1358 collections.sort_by(|left, right| {
1359 right
1360 .attention_score
1361 .cmp(&left.attention_score)
1362 .then_with(|| left.name.cmp(&right.name))
1363 });
1364 collections
1365}
1366
1367pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1368 let mut statuses = snapshot
1369 .index_statuses
1370 .iter()
1371 .filter(|status| status.attention_score > 0)
1372 .cloned()
1373 .collect::<Vec<_>>();
1374 statuses.sort_by(|left, right| {
1375 right
1376 .attention_score
1377 .cmp(&left.attention_score)
1378 .then_with(|| left.name.cmp(&right.name))
1379 });
1380 statuses
1381}
1382
1383pub fn graph_projection_attention(
1384 snapshot: &CatalogModelSnapshot,
1385) -> Vec<CatalogGraphProjectionStatus> {
1386 let mut statuses = snapshot
1387 .graph_projection_statuses
1388 .iter()
1389 .filter(|status| status.attention_score > 0)
1390 .cloned()
1391 .collect::<Vec<_>>();
1392 statuses.sort_by(|left, right| {
1393 right
1394 .attention_score
1395 .cmp(&left.attention_score)
1396 .then_with(|| left.name.cmp(&right.name))
1397 });
1398 statuses
1399}
1400
1401pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1402 let mut statuses = snapshot
1403 .analytics_job_statuses
1404 .iter()
1405 .filter(|status| status.attention_score > 0)
1406 .cloned()
1407 .collect::<Vec<_>>();
1408 statuses.sort_by(|left, right| {
1409 right
1410 .attention_score
1411 .cmp(&left.attention_score)
1412 .then_with(|| left.id.cmp(&right.id))
1413 });
1414 statuses
1415}