1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Data model a collection is classified as.
///
/// When a snapshot is built, a collection's model comes from its contract if
/// one exists and is otherwise inferred from the kinds of entities it stores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    /// Also the result of inference whenever the stored entities are not
    /// exclusively table rows, exclusively graph elements, or exclusively
    /// vectors (this includes an empty collection).
    Mixed,
    TimeSeries,
    Queue,
    Metrics,
}
33
/// Analytical storage settings.
///
/// NOTE(review): nothing in the visible part of this file reads these fields;
/// the per-field notes are inferred from names only and should be confirmed
/// against the consumer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnalyticalStorageConfig {
    /// Presumably toggles a columnar layout — TODO confirm.
    pub columnar: bool,
    /// Presumably the name of the time field — TODO confirm.
    pub time_key: String,
    /// Optional ordering key; meaning of `None` is not visible here — TODO confirm.
    pub order_by_key: Option<String>,
}
58
/// Schema mode of a collection.
///
/// Either declared by a collection contract or derived from the collection's
/// model by `infer_schema_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    /// Inferred for `CollectionModel::Table`.
    Strict,
    /// Inferred for graph, vector, time-series and metrics models.
    SemiStructured,
    /// Inferred for every other model.
    Dynamic,
}
65
/// Kind of write a subscription can filter on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Every variant, in declaration order; drives the textual parser.
    const ALL: [Self; 3] = [Self::Insert, Self::Update, Self::Delete];

    /// Returns the canonical upper-case keyword for this operation.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// Returns `None` for anything that is not one of the three keywords.
    pub fn from_str(value: &str) -> Option<Self> {
        // Comparing against the canonical keywords avoids building an
        // upper-cased copy of the input.
        Self::ALL
            .iter()
            .copied()
            .find(|operation| operation.as_str().eq_ignore_ascii_case(value))
    }
}
91
/// Description of one subscription attached to a collection.
///
/// Snapshots copy these verbatim from a collection contract's
/// `subscriptions` list.
/// NOTE(review): how the filters are evaluated is not visible in this part of
/// the file; the field notes below describe shape only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    pub name: String,
    pub source: String,
    pub target_queue: String,
    /// Operations listed for this subscription.
    pub ops_filter: Vec<SubscriptionOperation>,
    /// Optional filter text; syntax is defined elsewhere.
    pub where_filter: Option<String>,
    pub redact_fields: Vec<String>,
    pub enabled: bool,
    pub all_tenants: bool,
}
108
/// Kind of result an analytics view produces.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AnalyticsOutput {
    Communities,
    Components,
    Centrality,
}

impl AnalyticsOutput {
    /// Every variant, in declaration order; drives the textual parser.
    const ALL: [Self; 3] = [Self::Communities, Self::Components, Self::Centrality];

    /// Returns the canonical lower-case name of this output.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Communities => "communities",
            Self::Components => "components",
            Self::Centrality => "centrality",
        }
    }

    /// Parses an output name, ignoring ASCII case.
    ///
    /// Returns `None` for anything that is not one of the three names.
    pub fn from_str(value: &str) -> Option<Self> {
        // Comparing against the canonical names avoids building a
        // lower-cased copy of the input.
        Self::ALL
            .iter()
            .copied()
            .find(|output| output.as_str().eq_ignore_ascii_case(value))
    }
}
141
/// Configuration of one analytics view on a collection.
///
/// Snapshots copy these verbatim from a collection contract's
/// `analytics_config` list. Only `PartialEq` is derived (not `Eq`) because
/// the struct holds `f64` fields.
#[derive(Debug, Clone, PartialEq)]
pub struct AnalyticsViewDescriptor {
    /// Which kind of result the view produces.
    pub output: AnalyticsOutput,
    /// Optional algorithm name; accepted values are defined elsewhere.
    pub algorithm: Option<String>,
    // NOTE(review): the three tuning knobs below are presumably
    // algorithm-specific; their defaults are not visible here.
    pub resolution: Option<f64>,
    pub max_iterations: Option<i64>,
    pub tolerance: Option<f64>,
}
167
/// Per-collection entry of a catalog snapshot.
///
/// Built by `snapshot_store_with_declarations`; the field notes describe how
/// that function fills each value.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    pub name: String,
    /// Contract's declared model when a contract exists, otherwise the
    /// observed (inferred) model.
    pub model: CollectionModel,
    /// Contract's schema mode when a contract exists, otherwise the observed
    /// (inferred) mode.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the stored entities.
    pub observed_model: CollectionModel,
    // The four queue_* settings below are `Some` only when `model` is `Queue`;
    // they fall back to the crate defaults when no policy row exists for the
    // collection.
    pub queue_mode: Option<QueueMode>,
    pub queue_max_attempts: Option<u32>,
    pub queue_lock_deadline_ms: Option<u64>,
    pub queue_in_flight_cap_per_group: Option<u32>,
    /// `None` for non-queue collections and for queues without a policy row
    /// or without a configured target.
    pub queue_dlq_target: Option<String>,
    // The next five values are copied from the contract and are `None` when
    // there is no contract.
    pub vector_dimension: Option<usize>,
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    pub session_key: Option<String>,
    pub session_gap_ms: Option<u64>,
    pub retention_duration_ms: Option<u64>,
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode derived from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities returned by the store for this collection.
    pub entities: usize,
    /// Sum of the cross-reference counts of all entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment counts; 0 when the store has no
    /// manager for the collection.
    pub segments: usize,
    /// Declared index names when any exist, otherwise enabled catalog indexes
    /// judged relevant to `model`.
    pub indices: Vec<String>,
    pub declared_indices: Vec<String>,
    pub operational_indices: Vec<String>,
    /// True when the declared and operational name sets are equal.
    pub indexes_in_sync: bool,
    /// Declared names with no operational counterpart.
    pub missing_operational_indices: Vec<String>,
    /// Operational names with no declaration.
    pub undeclared_operational_indices: Vec<String>,
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Copied from the contract; empty without one.
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// Copied from the contract; empty without one.
    pub analytics_config: Vec<AnalyticsViewDescriptor>,
    /// True when index name sets match and every per-collection index,
    /// projection and job status reports itself in sync.
    pub resources_in_sync: bool,
    /// True when resources are out of sync or any rebuild,
    /// rematerialization or rerun is pending.
    pub attention_required: bool,
    /// 3 per index needing rebuild + 4 per projection needing
    /// rematerialization + 2 per job needing rerun + 1 when out of sync.
    pub attention_score: usize,
    /// Machine-readable reason tags such as `index_drift`.
    pub attention_reasons: Vec<String>,
}
228
/// Full catalog snapshot returned by `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-wide totals and per-collection statistics.
    pub summary: CatalogSnapshot,
    /// One descriptor per collection, sorted by name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog; the default value when none was given.
    pub indices: IndexCatalogSnapshot,
    // The six lists below are clones of the supplied `CatalogDeclarations`
    // and are empty when no declarations were given.
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub index_statuses: Vec<CatalogIndexStatus>,
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    // Each count below is the number of entries in the matching status list
    // whose corresponding flag is set.
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
}
250
/// Roll-up of catalog resources that need attention.
///
/// NOTE(review): the code that builds this summary is outside the visible
/// part of the file; presumably the `top_*` fields hold the entry with the
/// highest `attention_score` in each category — confirm against the builder.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
262
/// Declared and operational resource lists fed into a catalog snapshot.
///
/// The snapshot compares each `declared_*` list with its `operational_*`
/// counterpart by name (by id for analytics jobs in the dependent-job scan)
/// to derive status entries. `Default` yields six empty lists.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// When declarations are supplied, per-collection operational index names
    /// come from this list rather than from the live index catalog.
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
272
/// Reconciled status of one graph projection.
///
/// Produced by `graph_projection_statuses`, one entry per distinct
/// projection name found in the declared or operational lists. Where both
/// sides exist, values are taken from the declared entry.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    pub name: String,
    /// Source of the declared entry, else of the operational entry.
    pub source: Option<String>,
    /// One of `failed`, `stale`, `materializing`, `materialized`,
    /// `orphaned-operational`, `declared` or `unknown`.
    pub lifecycle_state: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// True when `declared` and `operational` agree.
    pub in_sync: bool,
    /// Declared entry's value when set, else the operational entry's.
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational, and in state `materialized`.
    pub queryable: bool,
    /// State is `declared`, `materializing`, `failed` or `stale`.
    pub requires_rematerialization: bool,
    /// Distinct job ids (declared or operational) that reference this projection.
    pub dependent_job_count: usize,
    /// Dependent jobs whose state is `queued`, `running` or `completed`.
    pub active_dependent_job_count: usize,
    /// Dependent jobs whose state is `stale`.
    pub stale_dependent_job_count: usize,
    /// True when no dependent job is stale.
    pub dependent_jobs_in_sync: bool,
    /// True when at least one dependent job is stale.
    pub rerun_required: bool,
    /// 2 per stale dependent job + 4 if rematerialization is required
    /// + 1 if declared/operational presence differs + 1 if not queryable.
    pub attention_score: usize,
    /// Tags: `declaration_drift`, `requires_rematerialization`,
    /// `dependent_jobs_stale`.
    pub attention_reasons: Vec<String>,
}
292
/// Reconciled status of one index.
///
/// Produced by `index_statuses`, one entry per distinct index name found in
/// the declared or operational lists.
/// NOTE(review): only the start of `index_statuses` is visible here, so the
/// derivation of the lifecycle, sync and attention fields is not documented —
/// confirm against that function.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    pub name: String,
    /// Owning collection; per-collection readiness counts match on this.
    pub collection: Option<String>,
    pub kind: String,
    pub declared: bool,
    pub operational: bool,
    pub enabled: bool,
    pub build_state: Option<String>,
    pub artifact_state: crate::physical::ArtifactState,
    pub lifecycle_state: String,
    pub in_sync: bool,
    pub queryable: bool,
    pub requires_rebuild: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
310
/// Reconciled status of one analytics job.
///
/// NOTE(review): the function that builds these entries is outside the
/// visible part of the file; the notes below cover only how the fields are
/// consumed here.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    pub id: String,
    pub kind: String,
    /// Name of the graph projection the job runs on. Jobs with `None` are
    /// never attributed to a collection by the per-collection readiness scan.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// `None` is treated as "in sync" by the per-collection readiness scan.
    pub dependency_in_sync: Option<bool>,
    pub executable: bool,
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
331
/// Counts and name lists describing drift between declared and operational
/// catalog resources.
///
/// NOTE(review): the code that fills this report is outside the visible part
/// of the file; presumably `missing_*` lists are declared-but-not-operational
/// names and `undeclared_*` lists are the reverse — confirm against the
/// builder. `Default` yields zero counts and empty lists.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
347
/// Snapshots `store` without declarations or collection contracts.
///
/// Delegates to [`snapshot_store_with_declarations`] passing `None` for both,
/// so every collection's model and schema mode come from inference and the
/// declared/operational status lists are empty.
pub fn snapshot_store(
    name: &str,
    store: &UnifiedStore,
    index_catalog: Option<&IndexCatalog>,
) -> CatalogModelSnapshot {
    snapshot_store_with_declarations(name, store, index_catalog, None, None)
}
355
356pub fn snapshot_store_with_declarations(
357 name: &str,
358 store: &UnifiedStore,
359 index_catalog: Option<&IndexCatalog>,
360 declarations: Option<&CatalogDeclarations>,
361 contracts: Option<&[CollectionContract]>,
362) -> CatalogModelSnapshot {
363 let index_statuses = index_statuses(declarations);
364 let graph_projection_statuses = graph_projection_statuses(declarations);
365 let analytics_job_statuses = analytics_job_statuses(declarations);
366
367 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
368 for (collection, entity) in store.query_all(|_| true) {
369 grouped.entry(collection).or_default().push(entity);
370 }
371 let queue_policies = queue_policies_from_grouped(&grouped);
372
373 let mut stats_by_collection = BTreeMap::new();
374 let mut collections = Vec::new();
375
376 for collection_name in store.list_collections() {
377 let entities = grouped.remove(&collection_name).unwrap_or_default();
378 let inferred_model = infer_model(&entities);
379 let inferred_schema_mode = infer_schema_mode(inferred_model);
380 let contract = collection_contract(&collection_name, contracts);
381 let model = contract
382 .map(|contract| contract.declared_model)
383 .unwrap_or(inferred_model);
384 let cross_refs = entities
385 .iter()
386 .map(|entity| entity.cross_refs().len())
387 .sum();
388 let entity_count = entities.len();
389 let manager_stats = store
390 .get_collection(&collection_name)
391 .map(|manager| manager.stats());
392 let segments = manager_stats
393 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
394 .unwrap_or(0);
395
396 stats_by_collection.insert(
397 collection_name.clone(),
398 CollectionStats {
399 entities: entity_count,
400 cross_refs,
401 segments,
402 },
403 );
404
405 let declared_indices = declared_indices(&collection_name, declarations);
406 let operational_indices =
407 operational_indices(&collection_name, index_catalog, declarations);
408 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
409 collection_index_consistency(&declared_indices, &operational_indices);
410 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
411 collection_index_readiness(&collection_name, &index_statuses);
412 let (
413 queryable_graph_projection_count,
414 graph_projections_requiring_rematerialization_count,
415 graph_projections_locally_in_sync,
416 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
417 let (
418 executable_analytics_job_count,
419 analytics_jobs_requiring_rerun_count,
420 analytics_jobs_locally_in_sync,
421 ) = collection_analytics_job_readiness(
422 &collection_name,
423 &graph_projection_statuses,
424 &analytics_job_statuses,
425 );
426 let resources_in_sync = indexes_in_sync
427 && indexes_locally_in_sync
428 && graph_projections_locally_in_sync
429 && analytics_jobs_locally_in_sync;
430 let attention_required = !resources_in_sync
431 || indexes_requiring_rebuild_count > 0
432 || graph_projections_requiring_rematerialization_count > 0
433 || analytics_jobs_requiring_rerun_count > 0;
434 let mut attention_reasons = Vec::new();
435 if !indexes_in_sync || !indexes_locally_in_sync {
436 attention_reasons.push("index_drift".to_string());
437 }
438 if indexes_requiring_rebuild_count > 0 {
439 attention_reasons.push("indexes_require_rebuild".to_string());
440 }
441 if !graph_projections_locally_in_sync {
442 attention_reasons.push("graph_projection_drift".to_string());
443 }
444 if graph_projections_requiring_rematerialization_count > 0 {
445 attention_reasons.push("graph_projections_require_rematerialization".to_string());
446 }
447 if !analytics_jobs_locally_in_sync {
448 attention_reasons.push("analytics_job_drift".to_string());
449 }
450 if analytics_jobs_requiring_rerun_count > 0 {
451 attention_reasons.push("analytics_jobs_require_rerun".to_string());
452 }
453 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
454 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
455 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
456 + usize::from(!resources_in_sync);
457
458 collections.push(CollectionDescriptor {
459 name: collection_name.clone(),
460 model,
461 schema_mode: contract
462 .map(|contract| contract.schema_mode)
463 .unwrap_or(inferred_schema_mode),
464 contract_present: contract.is_some(),
465 contract_origin: contract.map(|contract| contract.origin),
466 declared_model: contract.map(|contract| contract.declared_model),
467 observed_model: inferred_model,
468 queue_mode: if model == CollectionModel::Queue {
469 Some(
470 queue_policies
471 .get(&collection_name)
472 .map(|p| p.mode)
473 .unwrap_or_default(),
474 )
475 } else {
476 None
477 },
478 queue_max_attempts: if model == CollectionModel::Queue {
479 Some(
480 queue_policies
481 .get(&collection_name)
482 .map(|p| p.max_attempts)
483 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
484 )
485 } else {
486 None
487 },
488 queue_lock_deadline_ms: if model == CollectionModel::Queue {
489 Some(
490 queue_policies
491 .get(&collection_name)
492 .map(|p| p.lock_deadline_ms)
493 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
494 )
495 } else {
496 None
497 },
498 queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
499 Some(
500 queue_policies
501 .get(&collection_name)
502 .map(|p| p.in_flight_cap_per_group)
503 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
504 )
505 } else {
506 None
507 },
508 queue_dlq_target: if model == CollectionModel::Queue {
509 queue_policies
510 .get(&collection_name)
511 .and_then(|p| p.dlq_target.clone())
512 } else {
513 None
514 },
515 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
516 vector_metric: contract.and_then(|contract| contract.vector_metric),
517 session_key: contract.and_then(|contract| contract.session_key.clone()),
518 session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
519 retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
520 declared_schema_mode: contract.map(|contract| contract.schema_mode),
521 observed_schema_mode: inferred_schema_mode,
522 entities: entity_count,
523 cross_refs,
524 segments,
525 indices: infer_indices(&collection_name, model, index_catalog, declarations),
526 declared_indices,
527 operational_indices,
528 indexes_in_sync,
529 missing_operational_indices,
530 undeclared_operational_indices,
531 queryable_index_count,
532 indexes_requiring_rebuild_count,
533 queryable_graph_projection_count,
534 graph_projections_requiring_rematerialization_count,
535 executable_analytics_job_count,
536 analytics_jobs_requiring_rerun_count,
537 subscriptions: contract
538 .map(|contract| contract.subscriptions.clone())
539 .unwrap_or_default(),
540 analytics_config: contract
541 .map(|contract| contract.analytics_config.clone())
542 .unwrap_or_default(),
543 resources_in_sync,
544 attention_required,
545 attention_score,
546 attention_reasons,
547 });
548 }
549
550 collections.sort_by(|left, right| left.name.cmp(&right.name));
551
552 let summary = CatalogSnapshot {
553 name: name.to_string(),
554 total_entities: stats_by_collection
555 .values()
556 .map(|stats| stats.entities)
557 .sum(),
558 total_collections: stats_by_collection.len(),
559 stats_by_collection,
560 updated_at: SystemTime::now(),
561 };
562
563 CatalogModelSnapshot {
564 summary,
565 collections,
566 indices: index_catalog
567 .map(IndexCatalog::snapshot)
568 .unwrap_or_default(),
569 declared_indexes: declarations
570 .map(|declarations| declarations.declared_indexes.clone())
571 .unwrap_or_default(),
572 declared_graph_projections: declarations
573 .map(|declarations| declarations.declared_graph_projections.clone())
574 .unwrap_or_default(),
575 declared_analytics_jobs: declarations
576 .map(|declarations| declarations.declared_analytics_jobs.clone())
577 .unwrap_or_default(),
578 operational_indexes: declarations
579 .map(|declarations| declarations.operational_indexes.clone())
580 .unwrap_or_default(),
581 operational_graph_projections: declarations
582 .map(|declarations| declarations.operational_graph_projections.clone())
583 .unwrap_or_default(),
584 operational_analytics_jobs: declarations
585 .map(|declarations| declarations.operational_analytics_jobs.clone())
586 .unwrap_or_default(),
587 queryable_index_count: index_statuses
588 .iter()
589 .filter(|status| status.queryable)
590 .count(),
591 indexes_requiring_rebuild_count: index_statuses
592 .iter()
593 .filter(|status| status.requires_rebuild)
594 .count(),
595 queryable_graph_projection_count: graph_projection_statuses
596 .iter()
597 .filter(|status| status.queryable)
598 .count(),
599 graph_projections_requiring_rematerialization_count: graph_projection_statuses
600 .iter()
601 .filter(|status| status.requires_rematerialization)
602 .count(),
603 executable_analytics_job_count: analytics_job_statuses
604 .iter()
605 .filter(|status| status.executable)
606 .count(),
607 analytics_jobs_requiring_rerun_count: analytics_job_statuses
608 .iter()
609 .filter(|status| status.requires_rerun)
610 .count(),
611 index_statuses,
612 graph_projection_statuses,
613 analytics_job_statuses,
614 }
615}
616
/// Queue settings read from one `queue_config` row of the `red_queue_meta`
/// collection.
///
/// Built by `queue_policies_from_grouped`; missing or unparsable values fall
/// back to the crate-level queue defaults.
#[derive(Debug, Clone)]
pub struct QueuePolicySnapshot {
    /// Parsed from the row's `mode` text.
    pub mode: QueueMode,
    /// From the row's `max_attempts` field.
    pub max_attempts: u32,
    /// From the row's `lock_deadline_ms` field.
    pub lock_deadline_ms: u64,
    /// From the row's `in_flight_cap_per_group` field.
    pub in_flight_cap_per_group: u32,
    /// From the row's `dlq` text field; `None` when absent.
    pub dlq_target: Option<String>,
}
628
629fn queue_policies_from_grouped(
630 grouped: &HashMap<String, Vec<UnifiedEntity>>,
631) -> HashMap<String, QueuePolicySnapshot> {
632 let Some(meta) = grouped.get("red_queue_meta") else {
633 return HashMap::new();
634 };
635
636 let mut policies = HashMap::new();
637 for entity in meta {
638 let Some(row) = entity.data.as_row() else {
639 continue;
640 };
641 if row_text(row, "kind").as_deref() != Some("queue_config") {
642 continue;
643 }
644 let Some(queue) = row_text(row, "queue") else {
645 continue;
646 };
647 let mode = row_text(row, "mode")
648 .as_deref()
649 .and_then(QueueMode::parse)
650 .unwrap_or_default();
651 let max_attempts = row_u64(row, "max_attempts")
652 .map(|v| v as u32)
653 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
654 let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
655 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
656 let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
657 .map(|v| v as u32)
658 .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
659 let dlq_target = row_text(row, "dlq");
660 policies.insert(
661 queue,
662 QueuePolicySnapshot {
663 mode,
664 max_attempts,
665 lock_deadline_ms,
666 in_flight_cap_per_group,
667 dlq_target,
668 },
669 );
670 }
671 policies
672}
673
674fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
675 match row.get_field(key) {
676 Some(Value::UnsignedInteger(v)) => Some(*v),
677 Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
678 _ => None,
679 }
680}
681
682fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
683 match row.get_field(key) {
684 Some(Value::Text(value)) => Some(value.to_string()),
685 _ => None,
686 }
687}
688
689fn collection_contract<'a>(
690 collection_name: &str,
691 contracts: Option<&'a [CollectionContract]>,
692) -> Option<&'a CollectionContract> {
693 contracts.and_then(|contracts| {
694 contracts
695 .iter()
696 .find(|contract| contract.name == collection_name)
697 })
698}
699
700fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
701 let mut has_table = false;
702 let mut has_graph = false;
703 let mut has_vector = false;
704
705 for entity in entities {
706 match &entity.kind {
707 EntityKind::TableRow { .. } => has_table = true,
708 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
709 EntityKind::Vector { .. } => has_vector = true,
710 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
711 }
712 }
713
714 match (has_table, has_graph, has_vector) {
715 (true, false, false) => CollectionModel::Table,
716 (false, true, false) => CollectionModel::Graph,
717 (false, false, true) => CollectionModel::Vector,
718 _ => CollectionModel::Mixed,
719 }
720}
721
722fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
723 match model {
724 CollectionModel::Table => SchemaMode::Strict,
725 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
726 CollectionModel::Document
727 | CollectionModel::Hll
728 | CollectionModel::Sketch
729 | CollectionModel::Filter
730 | CollectionModel::Kv
731 | CollectionModel::Config
732 | CollectionModel::Vault
733 | CollectionModel::Mixed => SchemaMode::Dynamic,
734 CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
735 CollectionModel::Queue => SchemaMode::Dynamic,
736 }
737}
738
739fn infer_indices(
740 collection_name: &str,
741 model: CollectionModel,
742 index_catalog: Option<&IndexCatalog>,
743 declarations: Option<&CatalogDeclarations>,
744) -> Vec<String> {
745 let declared = declared_indices(collection_name, declarations);
746 if !declared.is_empty() {
747 return declared;
748 }
749
750 let available = index_catalog
751 .map(IndexCatalog::snapshot)
752 .unwrap_or_default();
753 let mut selected = Vec::new();
754
755 for metric in available {
756 let relevant = matches!(
757 (model, metric.kind),
758 (_, IndexKind::BTree)
759 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
760 | (CollectionModel::Vector, IndexKind::VectorHnsw)
761 | (CollectionModel::Vector, IndexKind::VectorInverted)
762 | (CollectionModel::Vector, IndexKind::VectorTurbo)
763 | (CollectionModel::Document, IndexKind::FullText)
764 | (CollectionModel::Document, IndexKind::DocumentPathValue)
765 | (CollectionModel::Kv, IndexKind::Hash)
766 | (CollectionModel::Config, IndexKind::Hash)
767 | (CollectionModel::Vault, IndexKind::Hash)
768 | (CollectionModel::Mixed, _)
769 );
770
771 if relevant && metric.enabled {
772 selected.push(metric.name);
773 }
774 }
775
776 selected
777}
778
779fn declared_indices(
780 collection_name: &str,
781 declarations: Option<&CatalogDeclarations>,
782) -> Vec<String> {
783 let mut selected = declarations
784 .map(|declarations| {
785 declarations
786 .declared_indexes
787 .iter()
788 .filter(|index| index.collection.as_deref() == Some(collection_name))
789 .map(|index| index.name.clone())
790 .collect::<Vec<_>>()
791 })
792 .unwrap_or_default();
793 selected.sort();
794 selected.dedup();
795 selected
796}
797
798fn operational_indices(
799 collection_name: &str,
800 index_catalog: Option<&IndexCatalog>,
801 declarations: Option<&CatalogDeclarations>,
802) -> Vec<String> {
803 if let Some(declarations) = declarations {
804 let mut selected = declarations
805 .operational_indexes
806 .iter()
807 .filter(|index| index.collection.as_deref() == Some(collection_name))
808 .map(|index| index.name.clone())
809 .collect::<Vec<_>>();
810 selected.sort();
811 selected.dedup();
812 return selected;
813 }
814
815 let mut selected = index_catalog
816 .map(IndexCatalog::snapshot)
817 .unwrap_or_default()
818 .into_iter()
819 .filter(|metric| metric.enabled)
820 .filter_map(|metric| {
821 if metric.name.starts_with(collection_name) {
822 Some(metric.name)
823 } else {
824 None
825 }
826 })
827 .collect::<Vec<_>>();
828 selected.sort();
829 selected.dedup();
830 selected
831}
832
/// Compares declared and operational index names as sets.
///
/// Returns `(in_sync, missing, undeclared)` where `missing` holds declared
/// names that are not operational, `undeclared` holds operational names that
/// are not declared, and `in_sync` is true when both lists are empty. Both
/// lists are sorted ascending and free of duplicates.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    // Borrowed sets: only names that end up in a result list are cloned.
    let declared_set: BTreeSet<&String> = declared_indices.iter().collect();
    let operational_set: BTreeSet<&String> = operational_indices.iter().collect();

    let missing: Vec<String> = declared_set
        .difference(&operational_set)
        .map(|name| (*name).clone())
        .collect();
    let undeclared: Vec<String> = operational_set
        .difference(&declared_set)
        .map(|name| (*name).clone())
        .collect();

    let in_sync = missing.is_empty() && undeclared.is_empty();
    (in_sync, missing, undeclared)
}
853
854fn collection_index_readiness(
855 collection_name: &str,
856 statuses: &[CatalogIndexStatus],
857) -> (usize, usize, bool) {
858 let local = statuses
859 .iter()
860 .filter(|status| status.collection.as_deref() == Some(collection_name))
861 .collect::<Vec<_>>();
862 let queryable_count = local.iter().filter(|status| status.queryable).count();
863 let requires_rebuild_count = local
864 .iter()
865 .filter(|status| status.requires_rebuild)
866 .count();
867 let locally_in_sync = local.iter().all(|status| status.in_sync);
868 (queryable_count, requires_rebuild_count, locally_in_sync)
869}
870
871fn collection_graph_projection_readiness(
872 collection_name: &str,
873 statuses: &[CatalogGraphProjectionStatus],
874) -> (usize, usize, bool) {
875 let local = statuses
876 .iter()
877 .filter(|status| status.source.as_deref() == Some(collection_name))
878 .collect::<Vec<_>>();
879 let queryable_count = local.iter().filter(|status| status.queryable).count();
880 let requires_rematerialization_count = local
881 .iter()
882 .filter(|status| status.requires_rematerialization)
883 .count();
884 let locally_in_sync = local
885 .iter()
886 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
887 (
888 queryable_count,
889 requires_rematerialization_count,
890 locally_in_sync,
891 )
892}
893
894fn collection_analytics_job_readiness(
895 collection_name: &str,
896 projection_statuses: &[CatalogGraphProjectionStatus],
897 job_statuses: &[CatalogAnalyticsJobStatus],
898) -> (usize, usize, bool) {
899 let local = job_statuses
900 .iter()
901 .filter(|status| {
902 let Some(projection_name) = status.projection.as_deref() else {
903 return false;
904 };
905 projection_statuses
906 .iter()
907 .find(|projection| projection.name == projection_name)
908 .and_then(|projection| projection.source.as_deref())
909 == Some(collection_name)
910 })
911 .collect::<Vec<_>>();
912 let executable_count = local.iter().filter(|status| status.executable).count();
913 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
914 let locally_in_sync = local
915 .iter()
916 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
917 (executable_count, requires_rerun_count, locally_in_sync)
918}
919
920fn graph_projection_statuses(
921 declarations: Option<&CatalogDeclarations>,
922) -> Vec<CatalogGraphProjectionStatus> {
923 let declared = declarations
924 .map(|declarations| declarations.declared_graph_projections.as_slice())
925 .unwrap_or(&[]);
926 let operational = declarations
927 .map(|declarations| declarations.operational_graph_projections.as_slice())
928 .unwrap_or(&[]);
929 let declared_jobs = declarations
930 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
931 .unwrap_or(&[]);
932 let operational_jobs = declarations
933 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
934 .unwrap_or(&[]);
935
936 let mut names = declared
937 .iter()
938 .map(|projection| projection.name.clone())
939 .chain(operational.iter().map(|projection| projection.name.clone()))
940 .collect::<Vec<_>>();
941 names.sort();
942 names.dedup();
943
944 names
945 .into_iter()
946 .map(|name| {
947 let declared_projection = declared.iter().find(|projection| projection.name == name);
948 let operational_projection = operational
949 .iter()
950 .find(|projection| projection.name == name);
951 let declared_present = declared_projection.is_some();
952 let operational_present = operational_projection.is_some();
953 let mut dependent_job_ids = BTreeSet::new();
954 let mut dependent_job_count = 0usize;
955 let mut active_dependent_job_count = 0usize;
956 let mut stale_dependent_job_count = 0usize;
957 for job in declared_jobs
958 .iter()
959 .chain(operational_jobs.iter())
960 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
961 {
962 if !dependent_job_ids.insert(job.id.clone()) {
963 continue;
964 }
965 dependent_job_count += 1;
966 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
967 active_dependent_job_count += 1;
968 }
969 if job.state == "stale" {
970 stale_dependent_job_count += 1;
971 }
972 }
973 let lifecycle_state = match (
974 declared_present,
975 operational_present,
976 declared_projection
977 .map(|projection| projection.state.as_str())
978 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
979 .unwrap_or_default(),
980 ) {
981 (true, _, "failed") => "failed",
982 (true, true, "stale") => "stale",
983 (true, _, "materializing") => "materializing",
984 (true, true, "materialized") => "materialized",
985 (true, true, _) => "materialized",
986 (false, true, _) => "orphaned-operational",
987 (true, false, _) => "declared",
988 (false, false, _) => "unknown",
989 }
990 .to_string();
991 let queryable = declared_present
992 && operational_present
993 && matches!(
994 declared_projection
995 .map(|projection| projection.state.as_str())
996 .or_else(
997 || operational_projection.map(|projection| projection.state.as_str())
998 )
999 .unwrap_or_default(),
1000 "materialized"
1001 );
1002 let requires_rematerialization = matches!(
1003 declared_projection
1004 .map(|projection| projection.state.as_str())
1005 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
1006 .unwrap_or_default(),
1007 "declared" | "materializing" | "failed" | "stale"
1008 );
1009 let mut attention_reasons = Vec::new();
1010 if !declared_present || !operational_present {
1011 attention_reasons.push("declaration_drift".to_string());
1012 }
1013 if requires_rematerialization {
1014 attention_reasons.push("requires_rematerialization".to_string());
1015 }
1016 if stale_dependent_job_count > 0 {
1017 attention_reasons.push("dependent_jobs_stale".to_string());
1018 }
1019 let attention_score = stale_dependent_job_count.saturating_mul(2)
1020 + usize::from(requires_rematerialization).saturating_mul(4)
1021 + usize::from(!declared_present || !operational_present)
1022 + usize::from(!queryable);
1023 CatalogGraphProjectionStatus {
1024 name,
1025 source: declared_projection
1026 .map(|projection| projection.source.clone())
1027 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
1028 lifecycle_state,
1029 declared: declared_present,
1030 operational: operational_present,
1031 in_sync: declared_present == operational_present,
1032 last_materialized_sequence: declared_projection
1033 .and_then(|projection| projection.last_materialized_sequence)
1034 .or_else(|| {
1035 operational_projection
1036 .and_then(|projection| projection.last_materialized_sequence)
1037 }),
1038 queryable,
1039 requires_rematerialization,
1040 dependent_job_count,
1041 active_dependent_job_count,
1042 stale_dependent_job_count,
1043 dependent_jobs_in_sync: stale_dependent_job_count == 0,
1044 rerun_required: stale_dependent_job_count > 0,
1045 attention_score,
1046 attention_reasons,
1047 }
1048 })
1049 .collect()
1050}
1051
1052fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
1053 let declared = declarations
1054 .map(|declarations| declarations.declared_indexes.as_slice())
1055 .unwrap_or(&[]);
1056 let operational = declarations
1057 .map(|declarations| declarations.operational_indexes.as_slice())
1058 .unwrap_or(&[]);
1059
1060 let mut names = declared
1061 .iter()
1062 .map(|index| index.name.clone())
1063 .chain(operational.iter().map(|index| index.name.clone()))
1064 .collect::<Vec<_>>();
1065 names.sort();
1066 names.dedup();
1067
1068 names
1069 .into_iter()
1070 .map(|name| {
1071 let declared_index = declared.iter().find(|index| index.name == name);
1072 let operational_index = operational.iter().find(|index| index.name == name);
1073 let collection = declared_index
1074 .and_then(|index| index.collection.clone())
1075 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
1076 let kind = declared_index
1077 .map(|index| index_kind_string(index.kind))
1078 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
1079 .unwrap_or_default();
1080 let enabled = declared_index
1081 .map(|index| index.enabled)
1082 .or_else(|| operational_index.map(|index| index.enabled))
1083 .unwrap_or(false);
1084 let build_state = operational_index
1085 .map(|index| index.build_state.clone())
1086 .or_else(|| declared_index.map(|index| index.build_state.clone()));
1087 let declared_present = declared_index.is_some();
1088 let operational_present = operational_index.is_some();
1089 let lifecycle_state = index_lifecycle_state(
1090 declared_present,
1091 operational_present,
1092 enabled,
1093 build_state.as_deref(),
1094 );
1095
1096 let mut attention_reasons = Vec::new();
1097 if declared_present != operational_present {
1098 attention_reasons.push("declaration_drift".to_string());
1099 }
1100 if !enabled && declared_present {
1101 attention_reasons.push("disabled".to_string());
1102 }
1103 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1104 attention_reasons.push("failed".to_string());
1105 }
1106 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1107 attention_reasons.push("stale".to_string());
1108 }
1109 if matches!(
1110 build_state.as_deref().unwrap_or_default(),
1111 "building" | "declared-unbuilt"
1112 ) {
1113 attention_reasons.push("requires_rebuild".to_string());
1114 }
1115 let queryable = declared_present
1116 && operational_present
1117 && enabled
1118 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1119 let requires_rebuild = matches!(
1120 build_state.as_deref().unwrap_or_default(),
1121 "declared-unbuilt" | "building" | "stale" | "failed"
1122 );
1123 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1124 + usize::from(!enabled && declared_present)
1125 + usize::from(declared_present != operational_present)
1126 + usize::from(!queryable);
1127
1128 let artifact_state = crate::physical::ArtifactState::from_build_state(
1129 build_state.as_deref().unwrap_or("declared-unbuilt"),
1130 enabled,
1131 );
1132 CatalogIndexStatus {
1133 name,
1134 collection,
1135 kind,
1136 declared: declared_present,
1137 operational: operational_present,
1138 enabled,
1139 build_state,
1140 artifact_state,
1141 lifecycle_state,
1142 in_sync: declared_present == operational_present,
1143 queryable,
1144 requires_rebuild,
1145 attention_score,
1146 attention_reasons,
1147 }
1148 })
1149 .collect()
1150}
1151
/// Derives the lifecycle label of an index from its presence flags, its
/// `enabled` flag and its build state.
///
/// Presence is decided first: an index that is not declared is
/// `"orphaned-operational"` when it exists operationally and `"unknown"`
/// otherwise. A declared index that is not enabled is `"disabled"`, and a
/// declared, enabled index without an operational record is `"declared"`.
/// Only when all three flags are true is `build_state` consulted; a missing
/// build state is treated as the empty string.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (false, false) => "unknown",
        // The disabled check outranks the operational check for declared indexes.
        (true, _) if !enabled => "disabled",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Named intermediate states, listed explicitly for the reader.
            "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
                "building"
            }
            // Any other value, including the empty string, also reads as building.
            _ => "building",
        },
    };
    label.to_string()
}
1182
1183fn index_kind_string(kind: IndexKind) -> String {
1184 match kind {
1185 IndexKind::BTree => "btree",
1186 IndexKind::Hash => "hash",
1187 IndexKind::Bitmap => "bitmap",
1188 IndexKind::Spatial => "spatial.rtree",
1189 IndexKind::VectorHnsw => "vector.hnsw",
1190 IndexKind::VectorInverted => "vector.inverted",
1191 IndexKind::VectorTurbo => "vector.turbo",
1192 IndexKind::GraphAdjacency => "graph.adjacency",
1193 IndexKind::FullText => "text.fulltext",
1194 IndexKind::DocumentPathValue => "document.pathvalue",
1195 IndexKind::HybridSearch => "search.hybrid",
1196 }
1197 .to_string()
1198}
1199
1200fn analytics_job_statuses(
1201 declarations: Option<&CatalogDeclarations>,
1202) -> Vec<CatalogAnalyticsJobStatus> {
1203 let declared = declarations
1204 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1205 .unwrap_or(&[]);
1206 let operational = declarations
1207 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1208 .unwrap_or(&[]);
1209 let declared_projections = declarations
1210 .map(|declarations| declarations.declared_graph_projections.as_slice())
1211 .unwrap_or(&[]);
1212 let operational_projections = declarations
1213 .map(|declarations| declarations.operational_graph_projections.as_slice())
1214 .unwrap_or(&[]);
1215
1216 let mut ids = declared
1217 .iter()
1218 .map(|job| job.id.clone())
1219 .chain(operational.iter().map(|job| job.id.clone()))
1220 .collect::<Vec<_>>();
1221 ids.sort();
1222 ids.dedup();
1223
1224 ids.into_iter()
1225 .map(|id| {
1226 let declared_job = declared.iter().find(|job| job.id == id);
1227 let operational_job = operational.iter().find(|job| job.id == id);
1228 let kind = declared_job
1229 .map(|job| job.kind.clone())
1230 .or_else(|| operational_job.map(|job| job.kind.clone()))
1231 .unwrap_or_default();
1232 let projection = declared_job
1233 .and_then(|job| job.projection.clone())
1234 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1235 let state = declared_job
1236 .map(|job| job.state.clone())
1237 .or_else(|| operational_job.map(|job| job.state.clone()))
1238 .unwrap_or_default();
1239 let declared_present = declared_job.is_some();
1240 let operational_present = operational_job.is_some();
1241 let last_run_sequence = declared_job
1242 .and_then(|job| job.last_run_sequence)
1243 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1244 let projection_declared = projection.as_ref().map(|projection_name| {
1245 declared_projections
1246 .iter()
1247 .any(|projection| projection.name == *projection_name)
1248 });
1249 let projection_operational = projection.as_ref().map(|projection_name| {
1250 operational_projections
1251 .iter()
1252 .any(|projection| projection.name == *projection_name)
1253 });
1254 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1255 declared_projections
1256 .iter()
1257 .find(|projection| projection.name == *projection_name)
1258 .map(|projection| projection.state.as_str())
1259 .or_else(|| {
1260 operational_projections
1261 .iter()
1262 .find(|projection| projection.name == *projection_name)
1263 .map(|projection| projection.state.as_str())
1264 })
1265 });
1266 let dependency_in_sync = projection.as_ref().map(|_| {
1267 matches!(projection_lifecycle, Some("materialized"))
1268 && projection_operational == Some(true)
1269 });
1270 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1271 (true, false, _) => "declared",
1272 (true, true, _)
1273 if matches!(
1274 projection_lifecycle,
1275 Some("stale" | "failed" | "materializing" | "declared")
1276 ) =>
1277 {
1278 "stale"
1279 }
1280 (true, true, "completed") => "completed",
1281 (true, true, "running") => "running",
1282 (true, true, "failed") => "failed",
1283 (true, true, "queued") => "queued",
1284 (true, true, "stale") => "stale",
1285 (true, true, _) => "materialized",
1286 (false, true, _) => "orphaned-operational",
1287 (false, false, _) => "unknown",
1288 }
1289 .to_string();
1290 let executable = declared_present
1291 && operational_present
1292 && !matches!(state.as_str(), "failed" | "stale")
1293 && dependency_in_sync.unwrap_or(true);
1294 let requires_rerun =
1295 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1296 let mut attention_reasons = Vec::new();
1297 if declared_present != operational_present {
1298 attention_reasons.push("declaration_drift".to_string());
1299 }
1300 if matches!(state.as_str(), "failed") {
1301 attention_reasons.push("failed".to_string());
1302 }
1303 if matches!(state.as_str(), "stale") {
1304 attention_reasons.push("stale".to_string());
1305 }
1306 if dependency_in_sync == Some(false) {
1307 attention_reasons.push("dependency_drift".to_string());
1308 }
1309 if requires_rerun {
1310 attention_reasons.push("requires_rerun".to_string());
1311 }
1312 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1313 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1314 + usize::from(declared_present != operational_present)
1315 + usize::from(!executable);
1316 CatalogAnalyticsJobStatus {
1317 id,
1318 kind,
1319 projection,
1320 state: state.clone(),
1321 lifecycle_state,
1322 declared: declared_present,
1323 operational: operational_present,
1324 in_sync: declared_present == operational_present,
1325 last_run_sequence,
1326 projection_declared,
1327 projection_operational,
1328 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1329 dependency_in_sync,
1330 executable,
1331 requires_rerun,
1332 attention_score,
1333 attention_reasons,
1334 }
1335 })
1336 .collect()
1337}
1338
1339pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1340 let declared_indexes = snapshot
1341 .declared_indexes
1342 .iter()
1343 .map(|index| index.name.clone())
1344 .collect::<BTreeSet<_>>();
1345 let operational_indexes = snapshot
1346 .operational_indexes
1347 .iter()
1348 .map(|index| index.name.clone())
1349 .collect::<BTreeSet<_>>();
1350 let declared_graph_projections = snapshot
1351 .declared_graph_projections
1352 .iter()
1353 .map(|projection| projection.name.clone())
1354 .collect::<BTreeSet<_>>();
1355 let operational_graph_projections = snapshot
1356 .operational_graph_projections
1357 .iter()
1358 .map(|projection| projection.name.clone())
1359 .collect::<BTreeSet<_>>();
1360 let declared_analytics_jobs = snapshot
1361 .declared_analytics_jobs
1362 .iter()
1363 .map(|job| job.id.clone())
1364 .collect::<BTreeSet<_>>();
1365 let operational_analytics_jobs = snapshot
1366 .operational_analytics_jobs
1367 .iter()
1368 .map(|job| job.id.clone())
1369 .collect::<BTreeSet<_>>();
1370
1371 CatalogConsistencyReport {
1372 declared_index_count: declared_indexes.len(),
1373 operational_index_count: operational_indexes.len(),
1374 declared_graph_projection_count: declared_graph_projections.len(),
1375 operational_graph_projection_count: operational_graph_projections.len(),
1376 declared_analytics_job_count: declared_analytics_jobs.len(),
1377 operational_analytics_job_count: operational_analytics_jobs.len(),
1378 missing_operational_indexes: declared_indexes
1379 .difference(&operational_indexes)
1380 .cloned()
1381 .collect(),
1382 undeclared_operational_indexes: operational_indexes
1383 .difference(&declared_indexes)
1384 .cloned()
1385 .collect(),
1386 missing_operational_graph_projections: declared_graph_projections
1387 .difference(&operational_graph_projections)
1388 .cloned()
1389 .collect(),
1390 undeclared_operational_graph_projections: operational_graph_projections
1391 .difference(&declared_graph_projections)
1392 .cloned()
1393 .collect(),
1394 missing_operational_analytics_jobs: declared_analytics_jobs
1395 .difference(&operational_analytics_jobs)
1396 .cloned()
1397 .collect(),
1398 undeclared_operational_analytics_jobs: operational_analytics_jobs
1399 .difference(&declared_analytics_jobs)
1400 .cloned()
1401 .collect(),
1402 }
1403}
1404
1405pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1406 CatalogAttentionSummary {
1407 collections_requiring_attention: snapshot
1408 .collections
1409 .iter()
1410 .filter(|collection| collection.attention_required)
1411 .count(),
1412 indexes_requiring_attention: snapshot
1413 .index_statuses
1414 .iter()
1415 .filter(|status| status.attention_score > 0)
1416 .count(),
1417 graph_projections_requiring_attention: snapshot
1418 .graph_projection_statuses
1419 .iter()
1420 .filter(|status| status.attention_score > 0)
1421 .count(),
1422 analytics_jobs_requiring_attention: snapshot
1423 .analytics_job_statuses
1424 .iter()
1425 .filter(|status| status.attention_score > 0)
1426 .count(),
1427 top_collection: snapshot
1428 .collections
1429 .iter()
1430 .filter(|collection| collection.attention_score > 0)
1431 .max_by_key(|collection| collection.attention_score)
1432 .cloned(),
1433 top_index: snapshot
1434 .index_statuses
1435 .iter()
1436 .filter(|status| status.attention_score > 0)
1437 .max_by_key(|status| status.attention_score)
1438 .cloned(),
1439 top_graph_projection: snapshot
1440 .graph_projection_statuses
1441 .iter()
1442 .filter(|status| status.attention_score > 0)
1443 .max_by_key(|status| status.attention_score)
1444 .cloned(),
1445 top_analytics_job: snapshot
1446 .analytics_job_statuses
1447 .iter()
1448 .filter(|status| status.attention_score > 0)
1449 .max_by_key(|status| status.attention_score)
1450 .cloned(),
1451 }
1452}
1453
1454pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1455 let mut collections = snapshot
1456 .collections
1457 .iter()
1458 .filter(|collection| collection.attention_required)
1459 .cloned()
1460 .collect::<Vec<_>>();
1461 collections.sort_by(|left, right| {
1462 right
1463 .attention_score
1464 .cmp(&left.attention_score)
1465 .then_with(|| left.name.cmp(&right.name))
1466 });
1467 collections
1468}
1469
1470pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1471 let mut statuses = snapshot
1472 .index_statuses
1473 .iter()
1474 .filter(|status| status.attention_score > 0)
1475 .cloned()
1476 .collect::<Vec<_>>();
1477 statuses.sort_by(|left, right| {
1478 right
1479 .attention_score
1480 .cmp(&left.attention_score)
1481 .then_with(|| left.name.cmp(&right.name))
1482 });
1483 statuses
1484}
1485
1486pub fn graph_projection_attention(
1487 snapshot: &CatalogModelSnapshot,
1488) -> Vec<CatalogGraphProjectionStatus> {
1489 let mut statuses = snapshot
1490 .graph_projection_statuses
1491 .iter()
1492 .filter(|status| status.attention_score > 0)
1493 .cloned()
1494 .collect::<Vec<_>>();
1495 statuses.sort_by(|left, right| {
1496 right
1497 .attention_score
1498 .cmp(&left.attention_score)
1499 .then_with(|| left.name.cmp(&right.name))
1500 });
1501 statuses
1502}
1503
1504pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1505 let mut statuses = snapshot
1506 .analytics_job_statuses
1507 .iter()
1508 .filter(|status| status.attention_score > 0)
1509 .cloned()
1510 .collect::<Vec<_>>();
1511 statuses.sort_by(|left, right| {
1512 right
1513 .attention_score
1514 .cmp(&left.attention_score)
1515 .then_with(|| left.id.cmp(&right.id))
1516 });
1517 statuses
1518}