1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Storage options for an analytical collection.
///
/// NOTE(review): no consumer of this type is visible in this part of the file;
/// the field notes below are read off the field names — confirm against usage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnalyticalStorageConfig {
    /// Presumably toggles a columnar storage layout — TODO confirm.
    pub columnar: bool,
    /// Name of the key treated as the time key.
    pub time_key: String,
    /// Optional key to order by; `None` when not configured.
    pub order_by_key: Option<String>,
}
40
/// Schema mode recorded for a collection.
///
/// A collection descriptor carries the mode declared by its contract when one
/// exists, and otherwise the mode `infer_schema_mode` derives from the model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    /// Inferred for `Table` collections.
    Strict,
    /// Inferred for `Graph`, `Vector`, `TimeSeries` and `Metrics` collections.
    SemiStructured,
    /// Inferred for the remaining models handled by `infer_schema_mode`
    /// (documents, key-value style models, queues, mixed, ...).
    Dynamic,
}
47
48pub use reddb_types::catalog::{
54 AnalyticsOutput, AnalyticsViewDescriptor, CollectionModel, SubscriptionDescriptor,
55 SubscriptionOperation,
56};
57
/// Per-collection entry of a [`CatalogModelSnapshot`], as assembled by
/// `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as reported by the store.
    pub name: String,
    /// Effective model: the contract's declared model when a contract exists,
    /// otherwise the model inferred from the stored entities.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's mode when a contract exists,
    /// otherwise the mode inferred from the observed model.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    /// Origin recorded on the contract, if any.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, if any.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the entities currently stored.
    pub observed_model: CollectionModel,
    /// Queue mode; `Some` only when `model` is `Queue` (stored config or default).
    pub queue_mode: Option<QueueMode>,
    /// Queue max attempts; `Some` only when `model` is `Queue`.
    pub queue_max_attempts: Option<u32>,
    /// Queue lock deadline in milliseconds; `Some` only when `model` is `Queue`.
    pub queue_lock_deadline_ms: Option<u64>,
    /// Queue in-flight cap per group; `Some` only when `model` is `Queue`.
    pub queue_in_flight_cap_per_group: Option<u32>,
    /// Dead-letter target read from the queue config; `None` for non-queue
    /// collections or when no target is configured.
    pub queue_dlq_target: Option<String>,
    /// Vector dimension copied from the contract, if any.
    pub vector_dimension: Option<usize>,
    /// Vector distance metric copied from the contract, if any.
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    /// Session key copied from the contract, if any.
    pub session_key: Option<String>,
    /// Session gap (milliseconds) copied from the contract, if any.
    pub session_gap_ms: Option<u64>,
    /// Retention duration (milliseconds) copied from the contract, if any.
    pub retention_duration_ms: Option<u64>,
    /// Schema mode declared by the contract, if any.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode inferred from the observed model.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities found in the collection at snapshot time.
    pub entities: usize,
    /// Sum of the cross-reference counts of all entities in the collection.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment count (0 when no manager is found).
    pub segments: usize,
    /// Index names selected by `infer_indices` (declared names when present,
    /// otherwise enabled catalog indexes relevant to the model).
    pub indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, de-duplicated names of operational indexes for this collection.
    pub operational_indices: Vec<String>,
    /// True when the declared and operational name sets are identical.
    pub indexes_in_sync: bool,
    /// Declared index names that have no operational counterpart.
    pub missing_operational_indices: Vec<String>,
    /// Operational index names that were never declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Number of this collection's index statuses flagged `queryable`.
    pub queryable_index_count: usize,
    /// Number of this collection's index statuses flagged `requires_rebuild`.
    pub indexes_requiring_rebuild_count: usize,
    /// Number of projections sourced from this collection flagged `queryable`.
    pub queryable_graph_projection_count: usize,
    /// Number of projections sourced from this collection that require
    /// rematerialization.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Number of analytics jobs tied to this collection flagged `executable`.
    pub executable_analytics_job_count: usize,
    /// Number of analytics jobs tied to this collection flagged `requires_rerun`.
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions copied from the contract (empty without a contract).
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// Analytics view configuration copied from the contract (empty without one).
    pub analytics_config: Vec<AnalyticsViewDescriptor>,
    /// True when indexes, graph projections and analytics jobs are all in sync.
    pub resources_in_sync: bool,
    /// True when resources are out of sync or any rebuild / rematerialization /
    /// rerun is pending.
    pub attention_required: bool,
    /// Weighted score: rebuilds x3 + rematerializations x4 + reruns x2, plus 1
    /// when resources are out of sync.
    pub attention_score: usize,
    /// Machine-readable reason tags explaining why attention is required.
    pub attention_reasons: Vec<String>,
}
118
/// Complete catalog snapshot produced by `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-level summary (name, totals, per-collection stats, timestamp).
    pub summary: CatalogSnapshot,
    /// One descriptor per collection, sorted by collection name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog, or its default value when no catalog was given.
    pub indices: IndexCatalogSnapshot,
    /// Copy of the declared indexes from the supplied declarations.
    pub declared_indexes: Vec<PhysicalIndexState>,
    /// Copy of the declared graph projections from the supplied declarations.
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    /// Copy of the declared analytics jobs from the supplied declarations.
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Copy of the operational indexes from the supplied declarations.
    pub operational_indexes: Vec<PhysicalIndexState>,
    /// Copy of the operational graph projections from the supplied declarations.
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    /// Copy of the operational analytics jobs from the supplied declarations.
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Reconciled status for every declared or operational index.
    pub index_statuses: Vec<CatalogIndexStatus>,
    /// Reconciled status for every declared or operational graph projection.
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    /// Status for every analytics job.
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    /// Count of entries in `index_statuses` flagged `queryable`.
    pub queryable_index_count: usize,
    /// Count of entries in `index_statuses` flagged `requires_rebuild`.
    pub indexes_requiring_rebuild_count: usize,
    /// Count of entries in `graph_projection_statuses` flagged `queryable`.
    pub queryable_graph_projection_count: usize,
    /// Count of entries in `graph_projection_statuses` flagged
    /// `requires_rematerialization`.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Count of entries in `analytics_job_statuses` flagged `executable`.
    pub executable_analytics_job_count: usize,
    /// Count of entries in `analytics_job_statuses` flagged `requires_rerun`.
    pub analytics_jobs_requiring_rerun_count: usize,
}
140
/// Aggregate view of catalog resources that need attention.
///
/// NOTE(review): the code that fills this struct is not visible in this part
/// of the file; the "top" entries presumably carry the highest attention
/// score — confirm against the producer.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    /// Number of collections requiring attention.
    pub collections_requiring_attention: usize,
    /// Number of indexes requiring attention.
    pub indexes_requiring_attention: usize,
    /// Number of graph projections requiring attention.
    pub graph_projections_requiring_attention: usize,
    /// Number of analytics jobs requiring attention.
    pub analytics_jobs_requiring_attention: usize,
    /// Selected collection, if any.
    pub top_collection: Option<CollectionDescriptor>,
    /// Selected index status, if any.
    pub top_index: Option<CatalogIndexStatus>,
    /// Selected graph projection status, if any.
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    /// Selected analytics job status, if any.
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
152
/// Declared and operational resource lists fed into the snapshot builders.
///
/// The status functions in this file reconcile each `declared_*` list against
/// its `operational_*` counterpart by resource name (or job id).
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    /// Indexes that have been declared.
    pub declared_indexes: Vec<PhysicalIndexState>,
    /// Graph projections that have been declared.
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    /// Analytics jobs that have been declared.
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Indexes reported as operational.
    pub operational_indexes: Vec<PhysicalIndexState>,
    /// Graph projections reported as operational.
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    /// Analytics jobs reported as operational.
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
162
/// Reconciled declared/operational status of one graph projection, as built
/// by `graph_projection_statuses`.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name.
    pub name: String,
    /// Source of the projection, taken from the declared entry when present,
    /// otherwise from the operational entry.
    pub source: Option<String>,
    /// Derived lifecycle label (e.g. `"materialized"`, `"stale"`, `"declared"`,
    /// `"orphaned-operational"`).
    pub lifecycle_state: String,
    /// Whether a declared entry with this name exists.
    pub declared: bool,
    /// Whether an operational entry with this name exists.
    pub operational: bool,
    /// True when `declared` and `operational` agree.
    pub in_sync: bool,
    /// Last materialized sequence, preferring the declared entry's value.
    pub last_materialized_sequence: Option<u64>,
    /// True when declared, operational and in the `"materialized"` state.
    pub queryable: bool,
    /// True when the state is `"declared"`, `"materializing"`, `"failed"` or `"stale"`.
    pub requires_rematerialization: bool,
    /// Number of distinct analytics jobs (by id) that reference this projection.
    pub dependent_job_count: usize,
    /// Dependent jobs whose state is `"queued"`, `"running"` or `"completed"`.
    pub active_dependent_job_count: usize,
    /// Dependent jobs whose state is `"stale"`.
    pub stale_dependent_job_count: usize,
    /// True when no dependent job is stale.
    pub dependent_jobs_in_sync: bool,
    /// True when at least one dependent job is stale.
    pub rerun_required: bool,
    /// Weighted score: stale jobs x2 + rematerialization x4 + drift + not queryable.
    pub attention_score: usize,
    /// Machine-readable reason tags backing the score.
    pub attention_reasons: Vec<String>,
}
182
/// Reconciled declared/operational status of one index, as built by
/// `index_statuses`.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    /// Index name.
    pub name: String,
    /// Owning collection, preferring the declared entry's value.
    pub collection: Option<String>,
    /// Index kind rendered as a string.
    pub kind: String,
    /// Whether a declared entry with this name exists.
    pub declared: bool,
    /// Whether an operational entry with this name exists.
    pub operational: bool,
    /// Enabled flag, taken from the declared entry when present.
    pub enabled: bool,
    /// Build state, taken from the operational entry when present.
    pub build_state: Option<String>,
    /// Artifact state derived from the build state and the enabled flag.
    pub artifact_state: crate::physical::ArtifactState,
    /// Lifecycle label computed by `index_lifecycle_state`.
    pub lifecycle_state: String,
    /// True when `declared` and `operational` agree.
    pub in_sync: bool,
    /// True when declared, operational, enabled and in the `"ready"` build state.
    pub queryable: bool,
    /// True when the build state is `"declared-unbuilt"`, `"building"`,
    /// `"stale"` or `"failed"`.
    pub requires_rebuild: bool,
    /// Weighted score: rebuild x3 + disabled-but-declared + drift + not queryable.
    pub attention_score: usize,
    /// Machine-readable reason tags backing the score.
    pub attention_reasons: Vec<String>,
}
200
/// Status of one analytics job within the catalog.
///
/// NOTE(review): the function that fills this struct is not visible in this
/// part of the file; field notes are limited to what the visible readers use.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    /// Job identifier.
    pub id: String,
    /// Job kind.
    pub kind: String,
    /// Name of the graph projection the job runs against, if any; used to tie
    /// the job to a collection through the projection's source.
    pub projection: Option<String>,
    /// Job state string.
    pub state: String,
    /// Derived lifecycle label.
    pub lifecycle_state: String,
    /// Whether the job is declared.
    pub declared: bool,
    /// Whether the job is operational.
    pub operational: bool,
    /// Whether the job itself is in sync.
    pub in_sync: bool,
    /// Sequence of the last run, if any.
    pub last_run_sequence: Option<u64>,
    /// Whether the referenced projection is declared, when known.
    pub projection_declared: Option<bool>,
    /// Whether the referenced projection is operational, when known.
    pub projection_operational: Option<bool>,
    /// Lifecycle label of the referenced projection, when known.
    pub projection_lifecycle_state: Option<String>,
    /// Whether the job's dependency is in sync; readers treat `None` as in sync.
    pub dependency_in_sync: Option<bool>,
    /// Whether the job counts as executable.
    pub executable: bool,
    /// Whether the job must be rerun.
    pub requires_rerun: bool,
    /// Attention score for this job.
    pub attention_score: usize,
    /// Machine-readable reason tags backing the score.
    pub attention_reasons: Vec<String>,
}
221
/// Declared-versus-operational consistency totals for the whole catalog.
///
/// NOTE(review): the producer of this report is not visible in this part of
/// the file; field notes follow the field names.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    /// Number of declared indexes.
    pub declared_index_count: usize,
    /// Number of operational indexes.
    pub operational_index_count: usize,
    /// Number of declared graph projections.
    pub declared_graph_projection_count: usize,
    /// Number of operational graph projections.
    pub operational_graph_projection_count: usize,
    /// Number of declared analytics jobs.
    pub declared_analytics_job_count: usize,
    /// Number of operational analytics jobs.
    pub operational_analytics_job_count: usize,
    /// Declared indexes without an operational counterpart.
    pub missing_operational_indexes: Vec<String>,
    /// Operational indexes that were never declared.
    pub undeclared_operational_indexes: Vec<String>,
    /// Declared graph projections without an operational counterpart.
    pub missing_operational_graph_projections: Vec<String>,
    /// Operational graph projections that were never declared.
    pub undeclared_operational_graph_projections: Vec<String>,
    /// Declared analytics jobs without an operational counterpart.
    pub missing_operational_analytics_jobs: Vec<String>,
    /// Operational analytics jobs that were never declared.
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
237
238pub fn snapshot_store(
239 name: &str,
240 store: &UnifiedStore,
241 index_catalog: Option<&IndexCatalog>,
242) -> CatalogModelSnapshot {
243 snapshot_store_with_declarations(name, store, index_catalog, None, None)
244}
245
246pub fn snapshot_store_with_declarations(
247 name: &str,
248 store: &UnifiedStore,
249 index_catalog: Option<&IndexCatalog>,
250 declarations: Option<&CatalogDeclarations>,
251 contracts: Option<&[CollectionContract]>,
252) -> CatalogModelSnapshot {
253 let index_statuses = index_statuses(declarations);
254 let graph_projection_statuses = graph_projection_statuses(declarations);
255 let analytics_job_statuses = analytics_job_statuses(declarations);
256
257 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
258 for (collection, entity) in store.query_all(|_| true) {
259 grouped.entry(collection).or_default().push(entity);
260 }
261 let queue_policies = queue_policies_from_grouped(&grouped);
262
263 let mut stats_by_collection = BTreeMap::new();
264 let mut collections = Vec::new();
265
266 for collection_name in store.list_collections() {
267 let entities = grouped.remove(&collection_name).unwrap_or_default();
268 let inferred_model = infer_model(&entities);
269 let inferred_schema_mode = infer_schema_mode(inferred_model);
270 let contract = collection_contract(&collection_name, contracts);
271 let model = contract
272 .map(|contract| contract.declared_model)
273 .unwrap_or(inferred_model);
274 let cross_refs = entities
275 .iter()
276 .map(|entity| entity.cross_refs().len())
277 .sum();
278 let entity_count = entities.len();
279 let manager_stats = store
280 .get_collection(&collection_name)
281 .map(|manager| manager.stats());
282 let segments = manager_stats
283 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
284 .unwrap_or(0);
285
286 stats_by_collection.insert(
287 collection_name.clone(),
288 CollectionStats {
289 entities: entity_count,
290 cross_refs,
291 segments,
292 },
293 );
294
295 let declared_indices = declared_indices(&collection_name, declarations);
296 let operational_indices =
297 operational_indices(&collection_name, index_catalog, declarations);
298 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
299 collection_index_consistency(&declared_indices, &operational_indices);
300 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
301 collection_index_readiness(&collection_name, &index_statuses);
302 let (
303 queryable_graph_projection_count,
304 graph_projections_requiring_rematerialization_count,
305 graph_projections_locally_in_sync,
306 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
307 let (
308 executable_analytics_job_count,
309 analytics_jobs_requiring_rerun_count,
310 analytics_jobs_locally_in_sync,
311 ) = collection_analytics_job_readiness(
312 &collection_name,
313 &graph_projection_statuses,
314 &analytics_job_statuses,
315 );
316 let resources_in_sync = indexes_in_sync
317 && indexes_locally_in_sync
318 && graph_projections_locally_in_sync
319 && analytics_jobs_locally_in_sync;
320 let attention_required = !resources_in_sync
321 || indexes_requiring_rebuild_count > 0
322 || graph_projections_requiring_rematerialization_count > 0
323 || analytics_jobs_requiring_rerun_count > 0;
324 let mut attention_reasons = Vec::new();
325 if !indexes_in_sync || !indexes_locally_in_sync {
326 attention_reasons.push("index_drift".to_string());
327 }
328 if indexes_requiring_rebuild_count > 0 {
329 attention_reasons.push("indexes_require_rebuild".to_string());
330 }
331 if !graph_projections_locally_in_sync {
332 attention_reasons.push("graph_projection_drift".to_string());
333 }
334 if graph_projections_requiring_rematerialization_count > 0 {
335 attention_reasons.push("graph_projections_require_rematerialization".to_string());
336 }
337 if !analytics_jobs_locally_in_sync {
338 attention_reasons.push("analytics_job_drift".to_string());
339 }
340 if analytics_jobs_requiring_rerun_count > 0 {
341 attention_reasons.push("analytics_jobs_require_rerun".to_string());
342 }
343 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
344 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
345 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
346 + usize::from(!resources_in_sync);
347
348 collections.push(CollectionDescriptor {
349 name: collection_name.clone(),
350 model,
351 schema_mode: contract
352 .map(|contract| contract.schema_mode)
353 .unwrap_or(inferred_schema_mode),
354 contract_present: contract.is_some(),
355 contract_origin: contract.map(|contract| contract.origin),
356 declared_model: contract.map(|contract| contract.declared_model),
357 observed_model: inferred_model,
358 queue_mode: if model == CollectionModel::Queue {
359 Some(
360 queue_policies
361 .get(&collection_name)
362 .map(|p| p.mode)
363 .unwrap_or_default(),
364 )
365 } else {
366 None
367 },
368 queue_max_attempts: if model == CollectionModel::Queue {
369 Some(
370 queue_policies
371 .get(&collection_name)
372 .map(|p| p.max_attempts)
373 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
374 )
375 } else {
376 None
377 },
378 queue_lock_deadline_ms: if model == CollectionModel::Queue {
379 Some(
380 queue_policies
381 .get(&collection_name)
382 .map(|p| p.lock_deadline_ms)
383 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
384 )
385 } else {
386 None
387 },
388 queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
389 Some(
390 queue_policies
391 .get(&collection_name)
392 .map(|p| p.in_flight_cap_per_group)
393 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
394 )
395 } else {
396 None
397 },
398 queue_dlq_target: if model == CollectionModel::Queue {
399 queue_policies
400 .get(&collection_name)
401 .and_then(|p| p.dlq_target.clone())
402 } else {
403 None
404 },
405 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
406 vector_metric: contract.and_then(|contract| contract.vector_metric),
407 session_key: contract.and_then(|contract| contract.session_key.clone()),
408 session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
409 retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
410 declared_schema_mode: contract.map(|contract| contract.schema_mode),
411 observed_schema_mode: inferred_schema_mode,
412 entities: entity_count,
413 cross_refs,
414 segments,
415 indices: infer_indices(&collection_name, model, index_catalog, declarations),
416 declared_indices,
417 operational_indices,
418 indexes_in_sync,
419 missing_operational_indices,
420 undeclared_operational_indices,
421 queryable_index_count,
422 indexes_requiring_rebuild_count,
423 queryable_graph_projection_count,
424 graph_projections_requiring_rematerialization_count,
425 executable_analytics_job_count,
426 analytics_jobs_requiring_rerun_count,
427 subscriptions: contract
428 .map(|contract| contract.subscriptions.clone())
429 .unwrap_or_default(),
430 analytics_config: contract
431 .map(|contract| contract.analytics_config.clone())
432 .unwrap_or_default(),
433 resources_in_sync,
434 attention_required,
435 attention_score,
436 attention_reasons,
437 });
438 }
439
440 collections.sort_by(|left, right| left.name.cmp(&right.name));
441
442 let summary = CatalogSnapshot {
443 name: name.to_string(),
444 total_entities: stats_by_collection
445 .values()
446 .map(|stats| stats.entities)
447 .sum(),
448 total_collections: stats_by_collection.len(),
449 stats_by_collection,
450 updated_at: SystemTime::now(),
451 };
452
453 CatalogModelSnapshot {
454 summary,
455 collections,
456 indices: index_catalog
457 .map(IndexCatalog::snapshot)
458 .unwrap_or_default(),
459 declared_indexes: declarations
460 .map(|declarations| declarations.declared_indexes.clone())
461 .unwrap_or_default(),
462 declared_graph_projections: declarations
463 .map(|declarations| declarations.declared_graph_projections.clone())
464 .unwrap_or_default(),
465 declared_analytics_jobs: declarations
466 .map(|declarations| declarations.declared_analytics_jobs.clone())
467 .unwrap_or_default(),
468 operational_indexes: declarations
469 .map(|declarations| declarations.operational_indexes.clone())
470 .unwrap_or_default(),
471 operational_graph_projections: declarations
472 .map(|declarations| declarations.operational_graph_projections.clone())
473 .unwrap_or_default(),
474 operational_analytics_jobs: declarations
475 .map(|declarations| declarations.operational_analytics_jobs.clone())
476 .unwrap_or_default(),
477 queryable_index_count: index_statuses
478 .iter()
479 .filter(|status| status.queryable)
480 .count(),
481 indexes_requiring_rebuild_count: index_statuses
482 .iter()
483 .filter(|status| status.requires_rebuild)
484 .count(),
485 queryable_graph_projection_count: graph_projection_statuses
486 .iter()
487 .filter(|status| status.queryable)
488 .count(),
489 graph_projections_requiring_rematerialization_count: graph_projection_statuses
490 .iter()
491 .filter(|status| status.requires_rematerialization)
492 .count(),
493 executable_analytics_job_count: analytics_job_statuses
494 .iter()
495 .filter(|status| status.executable)
496 .count(),
497 analytics_jobs_requiring_rerun_count: analytics_job_statuses
498 .iter()
499 .filter(|status| status.requires_rerun)
500 .count(),
501 index_statuses,
502 graph_projection_statuses,
503 analytics_job_statuses,
504 }
505}
506
/// Queue policy values read from a `queue_config` row of the `red_queue_meta`
/// collection (see `queue_policies_from_grouped`).
#[derive(Debug, Clone)]
pub struct QueuePolicySnapshot {
    /// Queue mode parsed from the row's `mode` field, or the default mode.
    pub mode: QueueMode,
    /// Value of the row's `max_attempts` field, or the crate default.
    pub max_attempts: u32,
    /// Value of the row's `lock_deadline_ms` field, or the crate default.
    pub lock_deadline_ms: u64,
    /// Value of the row's `in_flight_cap_per_group` field, or the crate default.
    pub in_flight_cap_per_group: u32,
    /// Value of the row's `dlq` field, if it is present as text.
    pub dlq_target: Option<String>,
}
518
519fn queue_policies_from_grouped(
520 grouped: &HashMap<String, Vec<UnifiedEntity>>,
521) -> HashMap<String, QueuePolicySnapshot> {
522 let Some(meta) = grouped.get("red_queue_meta") else {
523 return HashMap::new();
524 };
525
526 let mut policies = HashMap::new();
527 for entity in meta {
528 let Some(row) = entity.data.as_row() else {
529 continue;
530 };
531 if row_text(row, "kind").as_deref() != Some("queue_config") {
532 continue;
533 }
534 let Some(queue) = row_text(row, "queue") else {
535 continue;
536 };
537 let mode = row_text(row, "mode")
538 .as_deref()
539 .and_then(QueueMode::parse)
540 .unwrap_or_default();
541 let max_attempts = row_u64(row, "max_attempts")
542 .map(|v| v as u32)
543 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
544 let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
545 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
546 let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
547 .map(|v| v as u32)
548 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
549 let dlq_target = row_text(row, "dlq");
550 policies.insert(
551 queue,
552 QueuePolicySnapshot {
553 mode,
554 max_attempts,
555 lock_deadline_ms,
556 in_flight_cap_per_group,
557 dlq_target,
558 },
559 );
560 }
561 policies
562}
563
564fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
565 match row.get_field(key) {
566 Some(Value::UnsignedInteger(v)) => Some(*v),
567 Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
568 _ => None,
569 }
570}
571
572fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
573 match row.get_field(key) {
574 Some(Value::Text(value)) => Some(value.to_string()),
575 _ => None,
576 }
577}
578
579fn collection_contract<'a>(
580 collection_name: &str,
581 contracts: Option<&'a [CollectionContract]>,
582) -> Option<&'a CollectionContract> {
583 contracts.and_then(|contracts| {
584 contracts
585 .iter()
586 .find(|contract| contract.name == collection_name)
587 })
588}
589
590fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
591 let mut has_table = false;
592 let mut has_graph = false;
593 let mut has_vector = false;
594
595 for entity in entities {
596 match &entity.kind {
597 EntityKind::TableRow { .. } => has_table = true,
598 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
599 EntityKind::Vector { .. } => has_vector = true,
600 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
601 }
602 }
603
604 match (has_table, has_graph, has_vector) {
605 (true, false, false) => CollectionModel::Table,
606 (false, true, false) => CollectionModel::Graph,
607 (false, false, true) => CollectionModel::Vector,
608 _ => CollectionModel::Mixed,
609 }
610}
611
612fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
613 match model {
614 CollectionModel::Table => SchemaMode::Strict,
615 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
616 CollectionModel::Document
617 | CollectionModel::Hll
618 | CollectionModel::Sketch
619 | CollectionModel::Filter
620 | CollectionModel::Kv
621 | CollectionModel::Config
622 | CollectionModel::Vault
623 | CollectionModel::Mixed => SchemaMode::Dynamic,
624 CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
625 CollectionModel::Queue => SchemaMode::Dynamic,
626 }
627}
628
629fn infer_indices(
630 collection_name: &str,
631 model: CollectionModel,
632 index_catalog: Option<&IndexCatalog>,
633 declarations: Option<&CatalogDeclarations>,
634) -> Vec<String> {
635 let declared = declared_indices(collection_name, declarations);
636 if !declared.is_empty() {
637 return declared;
638 }
639
640 let available = index_catalog
641 .map(IndexCatalog::snapshot)
642 .unwrap_or_default();
643 let mut selected = Vec::new();
644
645 for metric in available {
646 let relevant = matches!(
647 (model, metric.kind),
648 (_, IndexKind::BTree)
649 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
650 | (CollectionModel::Vector, IndexKind::VectorHnsw)
651 | (CollectionModel::Vector, IndexKind::VectorInverted)
652 | (CollectionModel::Vector, IndexKind::VectorTurbo)
653 | (CollectionModel::Document, IndexKind::FullText)
654 | (CollectionModel::Document, IndexKind::DocumentPathValue)
655 | (CollectionModel::Kv, IndexKind::Hash)
656 | (CollectionModel::Config, IndexKind::Hash)
657 | (CollectionModel::Vault, IndexKind::Hash)
658 | (CollectionModel::Mixed, _)
659 );
660
661 if relevant && metric.enabled {
662 selected.push(metric.name);
663 }
664 }
665
666 selected
667}
668
669fn declared_indices(
670 collection_name: &str,
671 declarations: Option<&CatalogDeclarations>,
672) -> Vec<String> {
673 let mut selected = declarations
674 .map(|declarations| {
675 declarations
676 .declared_indexes
677 .iter()
678 .filter(|index| index.collection.as_deref() == Some(collection_name))
679 .map(|index| index.name.clone())
680 .collect::<Vec<_>>()
681 })
682 .unwrap_or_default();
683 selected.sort();
684 selected.dedup();
685 selected
686}
687
688fn operational_indices(
689 collection_name: &str,
690 index_catalog: Option<&IndexCatalog>,
691 declarations: Option<&CatalogDeclarations>,
692) -> Vec<String> {
693 if let Some(declarations) = declarations {
694 let mut selected = declarations
695 .operational_indexes
696 .iter()
697 .filter(|index| index.collection.as_deref() == Some(collection_name))
698 .map(|index| index.name.clone())
699 .collect::<Vec<_>>();
700 selected.sort();
701 selected.dedup();
702 return selected;
703 }
704
705 let mut selected = index_catalog
706 .map(IndexCatalog::snapshot)
707 .unwrap_or_default()
708 .into_iter()
709 .filter(|metric| metric.enabled)
710 .filter_map(|metric| {
711 if metric.name.starts_with(collection_name) {
712 Some(metric.name)
713 } else {
714 None
715 }
716 })
717 .collect::<Vec<_>>();
718 selected.sort();
719 selected.dedup();
720 selected
721}
722
/// Compares declared index names with operational ones.
///
/// Returns `(in_sync, missing_operational, undeclared_operational)`:
/// `missing_operational` lists declared names with no operational
/// counterpart, `undeclared_operational` lists operational names that were
/// never declared (both sorted and unique), and `in_sync` is true when both
/// lists are empty.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared_set: BTreeSet<&str> = declared_indices.iter().map(String::as_str).collect();
    let operational_set: BTreeSet<&str> =
        operational_indices.iter().map(String::as_str).collect();

    let mut missing = Vec::new();
    for name in declared_set.difference(&operational_set) {
        missing.push((*name).to_owned());
    }
    let mut undeclared = Vec::new();
    for name in operational_set.difference(&declared_set) {
        undeclared.push((*name).to_owned());
    }

    let in_sync = missing.is_empty() && undeclared.is_empty();
    (in_sync, missing, undeclared)
}
743
744fn collection_index_readiness(
745 collection_name: &str,
746 statuses: &[CatalogIndexStatus],
747) -> (usize, usize, bool) {
748 let local = statuses
749 .iter()
750 .filter(|status| status.collection.as_deref() == Some(collection_name))
751 .collect::<Vec<_>>();
752 let queryable_count = local.iter().filter(|status| status.queryable).count();
753 let requires_rebuild_count = local
754 .iter()
755 .filter(|status| status.requires_rebuild)
756 .count();
757 let locally_in_sync = local.iter().all(|status| status.in_sync);
758 (queryable_count, requires_rebuild_count, locally_in_sync)
759}
760
761fn collection_graph_projection_readiness(
762 collection_name: &str,
763 statuses: &[CatalogGraphProjectionStatus],
764) -> (usize, usize, bool) {
765 let local = statuses
766 .iter()
767 .filter(|status| status.source.as_deref() == Some(collection_name))
768 .collect::<Vec<_>>();
769 let queryable_count = local.iter().filter(|status| status.queryable).count();
770 let requires_rematerialization_count = local
771 .iter()
772 .filter(|status| status.requires_rematerialization)
773 .count();
774 let locally_in_sync = local
775 .iter()
776 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
777 (
778 queryable_count,
779 requires_rematerialization_count,
780 locally_in_sync,
781 )
782}
783
784fn collection_analytics_job_readiness(
785 collection_name: &str,
786 projection_statuses: &[CatalogGraphProjectionStatus],
787 job_statuses: &[CatalogAnalyticsJobStatus],
788) -> (usize, usize, bool) {
789 let local = job_statuses
790 .iter()
791 .filter(|status| {
792 let Some(projection_name) = status.projection.as_deref() else {
793 return false;
794 };
795 projection_statuses
796 .iter()
797 .find(|projection| projection.name == projection_name)
798 .and_then(|projection| projection.source.as_deref())
799 == Some(collection_name)
800 })
801 .collect::<Vec<_>>();
802 let executable_count = local.iter().filter(|status| status.executable).count();
803 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
804 let locally_in_sync = local
805 .iter()
806 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
807 (executable_count, requires_rerun_count, locally_in_sync)
808}
809
810fn graph_projection_statuses(
811 declarations: Option<&CatalogDeclarations>,
812) -> Vec<CatalogGraphProjectionStatus> {
813 let declared = declarations
814 .map(|declarations| declarations.declared_graph_projections.as_slice())
815 .unwrap_or(&[]);
816 let operational = declarations
817 .map(|declarations| declarations.operational_graph_projections.as_slice())
818 .unwrap_or(&[]);
819 let declared_jobs = declarations
820 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
821 .unwrap_or(&[]);
822 let operational_jobs = declarations
823 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
824 .unwrap_or(&[]);
825
826 let mut names = declared
827 .iter()
828 .map(|projection| projection.name.clone())
829 .chain(operational.iter().map(|projection| projection.name.clone()))
830 .collect::<Vec<_>>();
831 names.sort();
832 names.dedup();
833
834 names
835 .into_iter()
836 .map(|name| {
837 let declared_projection = declared.iter().find(|projection| projection.name == name);
838 let operational_projection = operational
839 .iter()
840 .find(|projection| projection.name == name);
841 let declared_present = declared_projection.is_some();
842 let operational_present = operational_projection.is_some();
843 let mut dependent_job_ids = BTreeSet::new();
844 let mut dependent_job_count = 0usize;
845 let mut active_dependent_job_count = 0usize;
846 let mut stale_dependent_job_count = 0usize;
847 for job in declared_jobs
848 .iter()
849 .chain(operational_jobs.iter())
850 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
851 {
852 if !dependent_job_ids.insert(job.id.clone()) {
853 continue;
854 }
855 dependent_job_count += 1;
856 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
857 active_dependent_job_count += 1;
858 }
859 if job.state == "stale" {
860 stale_dependent_job_count += 1;
861 }
862 }
863 let lifecycle_state = match (
864 declared_present,
865 operational_present,
866 declared_projection
867 .map(|projection| projection.state.as_str())
868 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
869 .unwrap_or_default(),
870 ) {
871 (true, _, "failed") => "failed",
872 (true, true, "stale") => "stale",
873 (true, _, "materializing") => "materializing",
874 (true, true, "materialized") => "materialized",
875 (true, true, _) => "materialized",
876 (false, true, _) => "orphaned-operational",
877 (true, false, _) => "declared",
878 (false, false, _) => "unknown",
879 }
880 .to_string();
881 let queryable = declared_present
882 && operational_present
883 && matches!(
884 declared_projection
885 .map(|projection| projection.state.as_str())
886 .or_else(
887 || operational_projection.map(|projection| projection.state.as_str())
888 )
889 .unwrap_or_default(),
890 "materialized"
891 );
892 let requires_rematerialization = matches!(
893 declared_projection
894 .map(|projection| projection.state.as_str())
895 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
896 .unwrap_or_default(),
897 "declared" | "materializing" | "failed" | "stale"
898 );
899 let mut attention_reasons = Vec::new();
900 if !declared_present || !operational_present {
901 attention_reasons.push("declaration_drift".to_string());
902 }
903 if requires_rematerialization {
904 attention_reasons.push("requires_rematerialization".to_string());
905 }
906 if stale_dependent_job_count > 0 {
907 attention_reasons.push("dependent_jobs_stale".to_string());
908 }
909 let attention_score = stale_dependent_job_count.saturating_mul(2)
910 + usize::from(requires_rematerialization).saturating_mul(4)
911 + usize::from(!declared_present || !operational_present)
912 + usize::from(!queryable);
913 CatalogGraphProjectionStatus {
914 name,
915 source: declared_projection
916 .map(|projection| projection.source.clone())
917 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
918 lifecycle_state,
919 declared: declared_present,
920 operational: operational_present,
921 in_sync: declared_present == operational_present,
922 last_materialized_sequence: declared_projection
923 .and_then(|projection| projection.last_materialized_sequence)
924 .or_else(|| {
925 operational_projection
926 .and_then(|projection| projection.last_materialized_sequence)
927 }),
928 queryable,
929 requires_rematerialization,
930 dependent_job_count,
931 active_dependent_job_count,
932 stale_dependent_job_count,
933 dependent_jobs_in_sync: stale_dependent_job_count == 0,
934 rerun_required: stale_dependent_job_count > 0,
935 attention_score,
936 attention_reasons,
937 }
938 })
939 .collect()
940}
941
942fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
943 let declared = declarations
944 .map(|declarations| declarations.declared_indexes.as_slice())
945 .unwrap_or(&[]);
946 let operational = declarations
947 .map(|declarations| declarations.operational_indexes.as_slice())
948 .unwrap_or(&[]);
949
950 let mut names = declared
951 .iter()
952 .map(|index| index.name.clone())
953 .chain(operational.iter().map(|index| index.name.clone()))
954 .collect::<Vec<_>>();
955 names.sort();
956 names.dedup();
957
958 names
959 .into_iter()
960 .map(|name| {
961 let declared_index = declared.iter().find(|index| index.name == name);
962 let operational_index = operational.iter().find(|index| index.name == name);
963 let collection = declared_index
964 .and_then(|index| index.collection.clone())
965 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
966 let kind = declared_index
967 .map(|index| index_kind_string(index.kind))
968 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
969 .unwrap_or_default();
970 let enabled = declared_index
971 .map(|index| index.enabled)
972 .or_else(|| operational_index.map(|index| index.enabled))
973 .unwrap_or(false);
974 let build_state = operational_index
975 .map(|index| index.build_state.clone())
976 .or_else(|| declared_index.map(|index| index.build_state.clone()));
977 let declared_present = declared_index.is_some();
978 let operational_present = operational_index.is_some();
979 let lifecycle_state = index_lifecycle_state(
980 declared_present,
981 operational_present,
982 enabled,
983 build_state.as_deref(),
984 );
985
986 let mut attention_reasons = Vec::new();
987 if declared_present != operational_present {
988 attention_reasons.push("declaration_drift".to_string());
989 }
990 if !enabled && declared_present {
991 attention_reasons.push("disabled".to_string());
992 }
993 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
994 attention_reasons.push("failed".to_string());
995 }
996 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
997 attention_reasons.push("stale".to_string());
998 }
999 if matches!(
1000 build_state.as_deref().unwrap_or_default(),
1001 "building" | "declared-unbuilt"
1002 ) {
1003 attention_reasons.push("requires_rebuild".to_string());
1004 }
1005 let queryable = declared_present
1006 && operational_present
1007 && enabled
1008 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1009 let requires_rebuild = matches!(
1010 build_state.as_deref().unwrap_or_default(),
1011 "declared-unbuilt" | "building" | "stale" | "failed"
1012 );
1013 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1014 + usize::from(!enabled && declared_present)
1015 + usize::from(declared_present != operational_present)
1016 + usize::from(!queryable);
1017
1018 let artifact_state = crate::physical::ArtifactState::from_build_state(
1019 build_state.as_deref().unwrap_or("declared-unbuilt"),
1020 enabled,
1021 );
1022 CatalogIndexStatus {
1023 name,
1024 collection,
1025 kind,
1026 declared: declared_present,
1027 operational: operational_present,
1028 enabled,
1029 build_state,
1030 artifact_state,
1031 lifecycle_state,
1032 in_sync: declared_present == operational_present,
1033 queryable,
1034 requires_rebuild,
1035 attention_score,
1036 attention_reasons,
1037 }
1038 })
1039 .collect()
1040}
1041
/// Derives the lifecycle label of an index from its presence flags and build state.
///
/// Presence and enablement are decided first: an operational index without a declaration
/// is `"orphaned-operational"`, a declared but disabled index is `"disabled"`, an index
/// that is neither declared nor operational is `"unknown"`, and a declared, enabled index
/// with no operational counterpart is `"declared"`. Only when the index is declared,
/// enabled and operational is `build_state` consulted; a missing build state is treated
/// as the empty string and, like every unrecognised value, maps to `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (false, false) => "unknown",
        (true, _) if !enabled => "disabled",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Known intermediate states, listed explicitly for documentation.
            "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
                "building"
            }
            // Anything else is also reported as still building.
            _ => "building",
        },
    };
    label.to_string()
}
1072
1073fn index_kind_string(kind: IndexKind) -> String {
1074 match kind {
1075 IndexKind::BTree => "btree",
1076 IndexKind::Hash => "hash",
1077 IndexKind::Bitmap => "bitmap",
1078 IndexKind::Spatial => "spatial.rtree",
1079 IndexKind::VectorHnsw => "vector.hnsw",
1080 IndexKind::VectorInverted => "vector.inverted",
1081 IndexKind::VectorTurbo => "vector.turbo",
1082 IndexKind::GraphAdjacency => "graph.adjacency",
1083 IndexKind::FullText => "text.fulltext",
1084 IndexKind::DocumentPathValue => "document.pathvalue",
1085 IndexKind::HybridSearch => "search.hybrid",
1086 }
1087 .to_string()
1088}
1089
1090fn analytics_job_statuses(
1091 declarations: Option<&CatalogDeclarations>,
1092) -> Vec<CatalogAnalyticsJobStatus> {
1093 let declared = declarations
1094 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1095 .unwrap_or(&[]);
1096 let operational = declarations
1097 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1098 .unwrap_or(&[]);
1099 let declared_projections = declarations
1100 .map(|declarations| declarations.declared_graph_projections.as_slice())
1101 .unwrap_or(&[]);
1102 let operational_projections = declarations
1103 .map(|declarations| declarations.operational_graph_projections.as_slice())
1104 .unwrap_or(&[]);
1105
1106 let mut ids = declared
1107 .iter()
1108 .map(|job| job.id.clone())
1109 .chain(operational.iter().map(|job| job.id.clone()))
1110 .collect::<Vec<_>>();
1111 ids.sort();
1112 ids.dedup();
1113
1114 ids.into_iter()
1115 .map(|id| {
1116 let declared_job = declared.iter().find(|job| job.id == id);
1117 let operational_job = operational.iter().find(|job| job.id == id);
1118 let kind = declared_job
1119 .map(|job| job.kind.clone())
1120 .or_else(|| operational_job.map(|job| job.kind.clone()))
1121 .unwrap_or_default();
1122 let projection = declared_job
1123 .and_then(|job| job.projection.clone())
1124 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1125 let state = declared_job
1126 .map(|job| job.state.clone())
1127 .or_else(|| operational_job.map(|job| job.state.clone()))
1128 .unwrap_or_default();
1129 let declared_present = declared_job.is_some();
1130 let operational_present = operational_job.is_some();
1131 let last_run_sequence = declared_job
1132 .and_then(|job| job.last_run_sequence)
1133 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1134 let projection_declared = projection.as_ref().map(|projection_name| {
1135 declared_projections
1136 .iter()
1137 .any(|projection| projection.name == *projection_name)
1138 });
1139 let projection_operational = projection.as_ref().map(|projection_name| {
1140 operational_projections
1141 .iter()
1142 .any(|projection| projection.name == *projection_name)
1143 });
1144 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1145 declared_projections
1146 .iter()
1147 .find(|projection| projection.name == *projection_name)
1148 .map(|projection| projection.state.as_str())
1149 .or_else(|| {
1150 operational_projections
1151 .iter()
1152 .find(|projection| projection.name == *projection_name)
1153 .map(|projection| projection.state.as_str())
1154 })
1155 });
1156 let dependency_in_sync = projection.as_ref().map(|_| {
1157 matches!(projection_lifecycle, Some("materialized"))
1158 && projection_operational == Some(true)
1159 });
1160 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1161 (true, false, _) => "declared",
1162 (true, true, _)
1163 if matches!(
1164 projection_lifecycle,
1165 Some("stale" | "failed" | "materializing" | "declared")
1166 ) =>
1167 {
1168 "stale"
1169 }
1170 (true, true, "completed") => "completed",
1171 (true, true, "running") => "running",
1172 (true, true, "failed") => "failed",
1173 (true, true, "queued") => "queued",
1174 (true, true, "stale") => "stale",
1175 (true, true, _) => "materialized",
1176 (false, true, _) => "orphaned-operational",
1177 (false, false, _) => "unknown",
1178 }
1179 .to_string();
1180 let executable = declared_present
1181 && operational_present
1182 && !matches!(state.as_str(), "failed" | "stale")
1183 && dependency_in_sync.unwrap_or(true);
1184 let requires_rerun =
1185 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1186 let mut attention_reasons = Vec::new();
1187 if declared_present != operational_present {
1188 attention_reasons.push("declaration_drift".to_string());
1189 }
1190 if matches!(state.as_str(), "failed") {
1191 attention_reasons.push("failed".to_string());
1192 }
1193 if matches!(state.as_str(), "stale") {
1194 attention_reasons.push("stale".to_string());
1195 }
1196 if dependency_in_sync == Some(false) {
1197 attention_reasons.push("dependency_drift".to_string());
1198 }
1199 if requires_rerun {
1200 attention_reasons.push("requires_rerun".to_string());
1201 }
1202 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1203 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1204 + usize::from(declared_present != operational_present)
1205 + usize::from(!executable);
1206 CatalogAnalyticsJobStatus {
1207 id,
1208 kind,
1209 projection,
1210 state: state.clone(),
1211 lifecycle_state,
1212 declared: declared_present,
1213 operational: operational_present,
1214 in_sync: declared_present == operational_present,
1215 last_run_sequence,
1216 projection_declared,
1217 projection_operational,
1218 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1219 dependency_in_sync,
1220 executable,
1221 requires_rerun,
1222 attention_score,
1223 attention_reasons,
1224 }
1225 })
1226 .collect()
1227}
1228
1229pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1230 let declared_indexes = snapshot
1231 .declared_indexes
1232 .iter()
1233 .map(|index| index.name.clone())
1234 .collect::<BTreeSet<_>>();
1235 let operational_indexes = snapshot
1236 .operational_indexes
1237 .iter()
1238 .map(|index| index.name.clone())
1239 .collect::<BTreeSet<_>>();
1240 let declared_graph_projections = snapshot
1241 .declared_graph_projections
1242 .iter()
1243 .map(|projection| projection.name.clone())
1244 .collect::<BTreeSet<_>>();
1245 let operational_graph_projections = snapshot
1246 .operational_graph_projections
1247 .iter()
1248 .map(|projection| projection.name.clone())
1249 .collect::<BTreeSet<_>>();
1250 let declared_analytics_jobs = snapshot
1251 .declared_analytics_jobs
1252 .iter()
1253 .map(|job| job.id.clone())
1254 .collect::<BTreeSet<_>>();
1255 let operational_analytics_jobs = snapshot
1256 .operational_analytics_jobs
1257 .iter()
1258 .map(|job| job.id.clone())
1259 .collect::<BTreeSet<_>>();
1260
1261 CatalogConsistencyReport {
1262 declared_index_count: declared_indexes.len(),
1263 operational_index_count: operational_indexes.len(),
1264 declared_graph_projection_count: declared_graph_projections.len(),
1265 operational_graph_projection_count: operational_graph_projections.len(),
1266 declared_analytics_job_count: declared_analytics_jobs.len(),
1267 operational_analytics_job_count: operational_analytics_jobs.len(),
1268 missing_operational_indexes: declared_indexes
1269 .difference(&operational_indexes)
1270 .cloned()
1271 .collect(),
1272 undeclared_operational_indexes: operational_indexes
1273 .difference(&declared_indexes)
1274 .cloned()
1275 .collect(),
1276 missing_operational_graph_projections: declared_graph_projections
1277 .difference(&operational_graph_projections)
1278 .cloned()
1279 .collect(),
1280 undeclared_operational_graph_projections: operational_graph_projections
1281 .difference(&declared_graph_projections)
1282 .cloned()
1283 .collect(),
1284 missing_operational_analytics_jobs: declared_analytics_jobs
1285 .difference(&operational_analytics_jobs)
1286 .cloned()
1287 .collect(),
1288 undeclared_operational_analytics_jobs: operational_analytics_jobs
1289 .difference(&declared_analytics_jobs)
1290 .cloned()
1291 .collect(),
1292 }
1293}
1294
1295pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1296 CatalogAttentionSummary {
1297 collections_requiring_attention: snapshot
1298 .collections
1299 .iter()
1300 .filter(|collection| collection.attention_required)
1301 .count(),
1302 indexes_requiring_attention: snapshot
1303 .index_statuses
1304 .iter()
1305 .filter(|status| status.attention_score > 0)
1306 .count(),
1307 graph_projections_requiring_attention: snapshot
1308 .graph_projection_statuses
1309 .iter()
1310 .filter(|status| status.attention_score > 0)
1311 .count(),
1312 analytics_jobs_requiring_attention: snapshot
1313 .analytics_job_statuses
1314 .iter()
1315 .filter(|status| status.attention_score > 0)
1316 .count(),
1317 top_collection: snapshot
1318 .collections
1319 .iter()
1320 .filter(|collection| collection.attention_score > 0)
1321 .max_by_key(|collection| collection.attention_score)
1322 .cloned(),
1323 top_index: snapshot
1324 .index_statuses
1325 .iter()
1326 .filter(|status| status.attention_score > 0)
1327 .max_by_key(|status| status.attention_score)
1328 .cloned(),
1329 top_graph_projection: snapshot
1330 .graph_projection_statuses
1331 .iter()
1332 .filter(|status| status.attention_score > 0)
1333 .max_by_key(|status| status.attention_score)
1334 .cloned(),
1335 top_analytics_job: snapshot
1336 .analytics_job_statuses
1337 .iter()
1338 .filter(|status| status.attention_score > 0)
1339 .max_by_key(|status| status.attention_score)
1340 .cloned(),
1341 }
1342}
1343
1344pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1345 let mut collections = snapshot
1346 .collections
1347 .iter()
1348 .filter(|collection| collection.attention_required)
1349 .cloned()
1350 .collect::<Vec<_>>();
1351 collections.sort_by(|left, right| {
1352 right
1353 .attention_score
1354 .cmp(&left.attention_score)
1355 .then_with(|| left.name.cmp(&right.name))
1356 });
1357 collections
1358}
1359
1360pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1361 let mut statuses = snapshot
1362 .index_statuses
1363 .iter()
1364 .filter(|status| status.attention_score > 0)
1365 .cloned()
1366 .collect::<Vec<_>>();
1367 statuses.sort_by(|left, right| {
1368 right
1369 .attention_score
1370 .cmp(&left.attention_score)
1371 .then_with(|| left.name.cmp(&right.name))
1372 });
1373 statuses
1374}
1375
1376pub fn graph_projection_attention(
1377 snapshot: &CatalogModelSnapshot,
1378) -> Vec<CatalogGraphProjectionStatus> {
1379 let mut statuses = snapshot
1380 .graph_projection_statuses
1381 .iter()
1382 .filter(|status| status.attention_score > 0)
1383 .cloned()
1384 .collect::<Vec<_>>();
1385 statuses.sort_by(|left, right| {
1386 right
1387 .attention_score
1388 .cmp(&left.attention_score)
1389 .then_with(|| left.name.cmp(&right.name))
1390 });
1391 statuses
1392}
1393
1394pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1395 let mut statuses = snapshot
1396 .analytics_job_statuses
1397 .iter()
1398 .filter(|status| status.attention_score > 0)
1399 .cloned()
1400 .collect::<Vec<_>>();
1401 statuses.sort_by(|left, right| {
1402 right
1403 .attention_score
1404 .cmp(&left.attention_score)
1405 .then_with(|| left.id.cmp(&right.id))
1406 });
1407 statuses
1408}