Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
16/// Per-collection analytical-storage seam (PRD #850, Phase 1).
17///
18/// Declares that a `Metrics`/`TimeSeries` collection's hypertable chunks
19/// are backed by the **columnar** sealed-chunk layout
20/// ([`column_block`](crate::storage::unified::column_block)) rather than
21/// the default row engine. `columnar = false` (or the whole config
22/// absent) keeps the row engine — the seal dispatch routes only
23/// columnar-flagged chunks to the `ColumnBlock` writer.
24///
25/// `time_key` / `order_by_key` mirror the ClickHouse `ORDER BY` intent so
26/// downstream slices (#854 granules, #856 vectorized read) can lay the
27/// columns out in their natural sort order; v1 records them as the durable
28/// contract without yet acting on `order_by_key`.
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct AnalyticalStorageConfig {
31    /// When true, sealing a chunk routes to the columnar `ColumnBlock`
32    /// writer; otherwise the row engine stays the default.
33    pub columnar: bool,
34    /// Column carrying the time axis (the chunk's partition key).
35    pub time_key: String,
36    /// Optional secondary sort key (ClickHouse `ORDER BY` tail). `None`
37    /// orders by `time_key` alone.
38    pub order_by_key: Option<String>,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum SchemaMode {
43    Strict,
44    SemiStructured,
45    Dynamic,
46}
47
48// Catalog AST-leaf descriptors re-homed to the neutral keystone crate (ADR
49// 0053, RQL Phase 2 S4b) so the canonical SQL AST
50// (`CreateTableQuery.{subscriptions, analytics_config}`) resolves them without a
51// `reddb-server` edge. This shim keeps `crate::catalog::{...}` valid for
52// existing call-sites.
53pub use reddb_types::catalog::{
54    AnalyticsOutput, AnalyticsViewDescriptor, CollectionModel, SubscriptionDescriptor,
55    SubscriptionOperation,
56};
57
58#[derive(Debug, Clone)]
59pub struct CollectionDescriptor {
60    pub name: String,
61    pub model: CollectionModel,
62    pub schema_mode: SchemaMode,
63    pub contract_present: bool,
64    pub contract_origin: Option<crate::physical::ContractOrigin>,
65    pub declared_model: Option<CollectionModel>,
66    pub observed_model: CollectionModel,
67    pub queue_mode: Option<QueueMode>,
68    /// MAX_ATTEMPTS per-queue policy. Hot-fields tier — populated for
69    /// `model = queue` from the catalog snapshot for sub-ms reads by
70    /// `QueueLifecycle`. `None` for non-queue collections.
71    pub queue_max_attempts: Option<u32>,
72    /// LOCK_DEADLINE_MS per-queue policy. Hot-fields tier.
73    pub queue_lock_deadline_ms: Option<u64>,
74    /// IN_FLIGHT_CAP_PER_GROUP per-queue policy. Hot-fields tier.
75    pub queue_in_flight_cap_per_group: Option<u32>,
76    /// DLQ target collection name. `None` means on-max drop.
77    pub queue_dlq_target: Option<String>,
78    pub vector_dimension: Option<usize>,
79    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
80    /// `WITH SESSION_KEY <col>` declared on the timeseries collection.
81    /// `None` for non-timeseries and for timeseries without the clause.
82    /// Issue #576 slice 1.
83    pub session_key: Option<String>,
84    /// `SESSION_GAP <duration>` in milliseconds declared on the
85    /// timeseries collection. Issue #576 slice 1.
86    pub session_gap_ms: Option<u64>,
87    /// Declarative retention duration in milliseconds. `None` means no
88    /// retention policy is enforced. Set via `ALTER COLLECTION ... SET
89    /// RETENTION <duration>`. Issue #580 slice 1.
90    pub retention_duration_ms: Option<u64>,
91    pub declared_schema_mode: Option<SchemaMode>,
92    pub observed_schema_mode: SchemaMode,
93    pub entities: usize,
94    pub cross_refs: usize,
95    pub segments: usize,
96    pub indices: Vec<String>,
97    pub declared_indices: Vec<String>,
98    pub operational_indices: Vec<String>,
99    pub indexes_in_sync: bool,
100    pub missing_operational_indices: Vec<String>,
101    pub undeclared_operational_indices: Vec<String>,
102    pub queryable_index_count: usize,
103    pub indexes_requiring_rebuild_count: usize,
104    pub queryable_graph_projection_count: usize,
105    pub graph_projections_requiring_rematerialization_count: usize,
106    pub executable_analytics_job_count: usize,
107    pub analytics_jobs_requiring_rerun_count: usize,
108    pub subscriptions: Vec<SubscriptionDescriptor>,
109    /// Analytics views declared by `WITH ANALYTICS (...)`. Empty for
110    /// collections without the clause. Sourced from the persisted
111    /// `CollectionContract` so it survives restarts (issue #800).
112    pub analytics_config: Vec<AnalyticsViewDescriptor>,
113    pub resources_in_sync: bool,
114    pub attention_required: bool,
115    pub attention_score: usize,
116    pub attention_reasons: Vec<String>,
117}
118
119#[derive(Debug, Clone)]
120pub struct CatalogModelSnapshot {
121    pub summary: CatalogSnapshot,
122    pub collections: Vec<CollectionDescriptor>,
123    pub indices: IndexCatalogSnapshot,
124    pub declared_indexes: Vec<PhysicalIndexState>,
125    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
126    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
127    pub operational_indexes: Vec<PhysicalIndexState>,
128    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
129    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
130    pub index_statuses: Vec<CatalogIndexStatus>,
131    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
132    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
133    pub queryable_index_count: usize,
134    pub indexes_requiring_rebuild_count: usize,
135    pub queryable_graph_projection_count: usize,
136    pub graph_projections_requiring_rematerialization_count: usize,
137    pub executable_analytics_job_count: usize,
138    pub analytics_jobs_requiring_rerun_count: usize,
139}
140
141#[derive(Debug, Clone)]
142pub struct CatalogAttentionSummary {
143    pub collections_requiring_attention: usize,
144    pub indexes_requiring_attention: usize,
145    pub graph_projections_requiring_attention: usize,
146    pub analytics_jobs_requiring_attention: usize,
147    pub top_collection: Option<CollectionDescriptor>,
148    pub top_index: Option<CatalogIndexStatus>,
149    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
150    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
151}
152
153#[derive(Debug, Clone, Default)]
154pub struct CatalogDeclarations {
155    pub declared_indexes: Vec<PhysicalIndexState>,
156    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
157    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
158    pub operational_indexes: Vec<PhysicalIndexState>,
159    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
160    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
161}
162
163#[derive(Debug, Clone)]
164pub struct CatalogGraphProjectionStatus {
165    pub name: String,
166    pub source: Option<String>,
167    pub lifecycle_state: String,
168    pub declared: bool,
169    pub operational: bool,
170    pub in_sync: bool,
171    pub last_materialized_sequence: Option<u64>,
172    pub queryable: bool,
173    pub requires_rematerialization: bool,
174    pub dependent_job_count: usize,
175    pub active_dependent_job_count: usize,
176    pub stale_dependent_job_count: usize,
177    pub dependent_jobs_in_sync: bool,
178    pub rerun_required: bool,
179    pub attention_score: usize,
180    pub attention_reasons: Vec<String>,
181}
182
183#[derive(Debug, Clone)]
184pub struct CatalogIndexStatus {
185    pub name: String,
186    pub collection: Option<String>,
187    pub kind: String,
188    pub declared: bool,
189    pub operational: bool,
190    pub enabled: bool,
191    pub build_state: Option<String>,
192    pub artifact_state: crate::physical::ArtifactState,
193    pub lifecycle_state: String,
194    pub in_sync: bool,
195    pub queryable: bool,
196    pub requires_rebuild: bool,
197    pub attention_score: usize,
198    pub attention_reasons: Vec<String>,
199}
200
201#[derive(Debug, Clone)]
202pub struct CatalogAnalyticsJobStatus {
203    pub id: String,
204    pub kind: String,
205    pub projection: Option<String>,
206    pub state: String,
207    pub lifecycle_state: String,
208    pub declared: bool,
209    pub operational: bool,
210    pub in_sync: bool,
211    pub last_run_sequence: Option<u64>,
212    pub projection_declared: Option<bool>,
213    pub projection_operational: Option<bool>,
214    pub projection_lifecycle_state: Option<String>,
215    pub dependency_in_sync: Option<bool>,
216    pub executable: bool,
217    pub requires_rerun: bool,
218    pub attention_score: usize,
219    pub attention_reasons: Vec<String>,
220}
221
222#[derive(Debug, Clone, Default)]
223pub struct CatalogConsistencyReport {
224    pub declared_index_count: usize,
225    pub operational_index_count: usize,
226    pub declared_graph_projection_count: usize,
227    pub operational_graph_projection_count: usize,
228    pub declared_analytics_job_count: usize,
229    pub operational_analytics_job_count: usize,
230    pub missing_operational_indexes: Vec<String>,
231    pub undeclared_operational_indexes: Vec<String>,
232    pub missing_operational_graph_projections: Vec<String>,
233    pub undeclared_operational_graph_projections: Vec<String>,
234    pub missing_operational_analytics_jobs: Vec<String>,
235    pub undeclared_operational_analytics_jobs: Vec<String>,
236}
237
238pub fn snapshot_store(
239    name: &str,
240    store: &UnifiedStore,
241    index_catalog: Option<&IndexCatalog>,
242) -> CatalogModelSnapshot {
243    snapshot_store_with_declarations(name, store, index_catalog, None, None)
244}
245
246pub fn snapshot_store_with_declarations(
247    name: &str,
248    store: &UnifiedStore,
249    index_catalog: Option<&IndexCatalog>,
250    declarations: Option<&CatalogDeclarations>,
251    contracts: Option<&[CollectionContract]>,
252) -> CatalogModelSnapshot {
253    let index_statuses = index_statuses(declarations);
254    let graph_projection_statuses = graph_projection_statuses(declarations);
255    let analytics_job_statuses = analytics_job_statuses(declarations);
256
257    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
258    for (collection, entity) in store.query_all(|_| true) {
259        grouped.entry(collection).or_default().push(entity);
260    }
261    let queue_policies = queue_policies_from_grouped(&grouped);
262
263    let mut stats_by_collection = BTreeMap::new();
264    let mut collections = Vec::new();
265
266    for collection_name in store.list_collections() {
267        let entities = grouped.remove(&collection_name).unwrap_or_default();
268        let inferred_model = infer_model(&entities);
269        let inferred_schema_mode = infer_schema_mode(inferred_model);
270        let contract = collection_contract(&collection_name, contracts);
271        let model = contract
272            .map(|contract| contract.declared_model)
273            .unwrap_or(inferred_model);
274        let cross_refs = entities
275            .iter()
276            .map(|entity| entity.cross_refs().len())
277            .sum();
278        let entity_count = entities.len();
279        let manager_stats = store
280            .get_collection(&collection_name)
281            .map(|manager| manager.stats());
282        let segments = manager_stats
283            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
284            .unwrap_or(0);
285
286        stats_by_collection.insert(
287            collection_name.clone(),
288            CollectionStats {
289                entities: entity_count,
290                cross_refs,
291                segments,
292            },
293        );
294
295        let declared_indices = declared_indices(&collection_name, declarations);
296        let operational_indices =
297            operational_indices(&collection_name, index_catalog, declarations);
298        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
299            collection_index_consistency(&declared_indices, &operational_indices);
300        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
301            collection_index_readiness(&collection_name, &index_statuses);
302        let (
303            queryable_graph_projection_count,
304            graph_projections_requiring_rematerialization_count,
305            graph_projections_locally_in_sync,
306        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
307        let (
308            executable_analytics_job_count,
309            analytics_jobs_requiring_rerun_count,
310            analytics_jobs_locally_in_sync,
311        ) = collection_analytics_job_readiness(
312            &collection_name,
313            &graph_projection_statuses,
314            &analytics_job_statuses,
315        );
316        let resources_in_sync = indexes_in_sync
317            && indexes_locally_in_sync
318            && graph_projections_locally_in_sync
319            && analytics_jobs_locally_in_sync;
320        let attention_required = !resources_in_sync
321            || indexes_requiring_rebuild_count > 0
322            || graph_projections_requiring_rematerialization_count > 0
323            || analytics_jobs_requiring_rerun_count > 0;
324        let mut attention_reasons = Vec::new();
325        if !indexes_in_sync || !indexes_locally_in_sync {
326            attention_reasons.push("index_drift".to_string());
327        }
328        if indexes_requiring_rebuild_count > 0 {
329            attention_reasons.push("indexes_require_rebuild".to_string());
330        }
331        if !graph_projections_locally_in_sync {
332            attention_reasons.push("graph_projection_drift".to_string());
333        }
334        if graph_projections_requiring_rematerialization_count > 0 {
335            attention_reasons.push("graph_projections_require_rematerialization".to_string());
336        }
337        if !analytics_jobs_locally_in_sync {
338            attention_reasons.push("analytics_job_drift".to_string());
339        }
340        if analytics_jobs_requiring_rerun_count > 0 {
341            attention_reasons.push("analytics_jobs_require_rerun".to_string());
342        }
343        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
344            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
345            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
346            + usize::from(!resources_in_sync);
347
348        collections.push(CollectionDescriptor {
349            name: collection_name.clone(),
350            model,
351            schema_mode: contract
352                .map(|contract| contract.schema_mode)
353                .unwrap_or(inferred_schema_mode),
354            contract_present: contract.is_some(),
355            contract_origin: contract.map(|contract| contract.origin),
356            declared_model: contract.map(|contract| contract.declared_model),
357            observed_model: inferred_model,
358            queue_mode: if model == CollectionModel::Queue {
359                Some(
360                    queue_policies
361                        .get(&collection_name)
362                        .map(|p| p.mode)
363                        .unwrap_or_default(),
364                )
365            } else {
366                None
367            },
368            queue_max_attempts: if model == CollectionModel::Queue {
369                Some(
370                    queue_policies
371                        .get(&collection_name)
372                        .map(|p| p.max_attempts)
373                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
374                )
375            } else {
376                None
377            },
378            queue_lock_deadline_ms: if model == CollectionModel::Queue {
379                Some(
380                    queue_policies
381                        .get(&collection_name)
382                        .map(|p| p.lock_deadline_ms)
383                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
384                )
385            } else {
386                None
387            },
388            queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
389                Some(
390                    queue_policies
391                        .get(&collection_name)
392                        .map(|p| p.in_flight_cap_per_group)
393                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
394                )
395            } else {
396                None
397            },
398            queue_dlq_target: if model == CollectionModel::Queue {
399                queue_policies
400                    .get(&collection_name)
401                    .and_then(|p| p.dlq_target.clone())
402            } else {
403                None
404            },
405            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
406            vector_metric: contract.and_then(|contract| contract.vector_metric),
407            session_key: contract.and_then(|contract| contract.session_key.clone()),
408            session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
409            retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
410            declared_schema_mode: contract.map(|contract| contract.schema_mode),
411            observed_schema_mode: inferred_schema_mode,
412            entities: entity_count,
413            cross_refs,
414            segments,
415            indices: infer_indices(&collection_name, model, index_catalog, declarations),
416            declared_indices,
417            operational_indices,
418            indexes_in_sync,
419            missing_operational_indices,
420            undeclared_operational_indices,
421            queryable_index_count,
422            indexes_requiring_rebuild_count,
423            queryable_graph_projection_count,
424            graph_projections_requiring_rematerialization_count,
425            executable_analytics_job_count,
426            analytics_jobs_requiring_rerun_count,
427            subscriptions: contract
428                .map(|contract| contract.subscriptions.clone())
429                .unwrap_or_default(),
430            analytics_config: contract
431                .map(|contract| contract.analytics_config.clone())
432                .unwrap_or_default(),
433            resources_in_sync,
434            attention_required,
435            attention_score,
436            attention_reasons,
437        });
438    }
439
440    collections.sort_by(|left, right| left.name.cmp(&right.name));
441
442    let summary = CatalogSnapshot {
443        name: name.to_string(),
444        total_entities: stats_by_collection
445            .values()
446            .map(|stats| stats.entities)
447            .sum(),
448        total_collections: stats_by_collection.len(),
449        stats_by_collection,
450        updated_at: SystemTime::now(),
451    };
452
453    CatalogModelSnapshot {
454        summary,
455        collections,
456        indices: index_catalog
457            .map(IndexCatalog::snapshot)
458            .unwrap_or_default(),
459        declared_indexes: declarations
460            .map(|declarations| declarations.declared_indexes.clone())
461            .unwrap_or_default(),
462        declared_graph_projections: declarations
463            .map(|declarations| declarations.declared_graph_projections.clone())
464            .unwrap_or_default(),
465        declared_analytics_jobs: declarations
466            .map(|declarations| declarations.declared_analytics_jobs.clone())
467            .unwrap_or_default(),
468        operational_indexes: declarations
469            .map(|declarations| declarations.operational_indexes.clone())
470            .unwrap_or_default(),
471        operational_graph_projections: declarations
472            .map(|declarations| declarations.operational_graph_projections.clone())
473            .unwrap_or_default(),
474        operational_analytics_jobs: declarations
475            .map(|declarations| declarations.operational_analytics_jobs.clone())
476            .unwrap_or_default(),
477        queryable_index_count: index_statuses
478            .iter()
479            .filter(|status| status.queryable)
480            .count(),
481        indexes_requiring_rebuild_count: index_statuses
482            .iter()
483            .filter(|status| status.requires_rebuild)
484            .count(),
485        queryable_graph_projection_count: graph_projection_statuses
486            .iter()
487            .filter(|status| status.queryable)
488            .count(),
489        graph_projections_requiring_rematerialization_count: graph_projection_statuses
490            .iter()
491            .filter(|status| status.requires_rematerialization)
492            .count(),
493        executable_analytics_job_count: analytics_job_statuses
494            .iter()
495            .filter(|status| status.executable)
496            .count(),
497        analytics_jobs_requiring_rerun_count: analytics_job_statuses
498            .iter()
499            .filter(|status| status.requires_rerun)
500            .count(),
501        index_statuses,
502        graph_projection_statuses,
503        analytics_job_statuses,
504    }
505}
506
507/// Hot-fields snapshot of per-queue policy, materialised at catalog
508/// snapshot time so `QueueLifecycle` (and the descriptor) can read
509/// values without re-scanning `red_queue_meta`.
510#[derive(Debug, Clone)]
511pub struct QueuePolicySnapshot {
512    pub mode: QueueMode,
513    pub max_attempts: u32,
514    pub lock_deadline_ms: u64,
515    pub in_flight_cap_per_group: u32,
516    pub dlq_target: Option<String>,
517}
518
519fn queue_policies_from_grouped(
520    grouped: &HashMap<String, Vec<UnifiedEntity>>,
521) -> HashMap<String, QueuePolicySnapshot> {
522    let Some(meta) = grouped.get("red_queue_meta") else {
523        return HashMap::new();
524    };
525
526    let mut policies = HashMap::new();
527    for entity in meta {
528        let Some(row) = entity.data.as_row() else {
529            continue;
530        };
531        if row_text(row, "kind").as_deref() != Some("queue_config") {
532            continue;
533        }
534        let Some(queue) = row_text(row, "queue") else {
535            continue;
536        };
537        let mode = row_text(row, "mode")
538            .as_deref()
539            .and_then(QueueMode::parse)
540            .unwrap_or_default();
541        let max_attempts = row_u64(row, "max_attempts")
542            .map(|v| v as u32)
543            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
544        let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
545            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
546        let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
547            .map(|v| v as u32)
548            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
549        let dlq_target = row_text(row, "dlq");
550        policies.insert(
551            queue,
552            QueuePolicySnapshot {
553                mode,
554                max_attempts,
555                lock_deadline_ms,
556                in_flight_cap_per_group,
557                dlq_target,
558            },
559        );
560    }
561    policies
562}
563
564fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
565    match row.get_field(key) {
566        Some(Value::UnsignedInteger(v)) => Some(*v),
567        Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
568        _ => None,
569    }
570}
571
572fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
573    match row.get_field(key) {
574        Some(Value::Text(value)) => Some(value.to_string()),
575        _ => None,
576    }
577}
578
579fn collection_contract<'a>(
580    collection_name: &str,
581    contracts: Option<&'a [CollectionContract]>,
582) -> Option<&'a CollectionContract> {
583    contracts.and_then(|contracts| {
584        contracts
585            .iter()
586            .find(|contract| contract.name == collection_name)
587    })
588}
589
590fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
591    let mut has_table = false;
592    let mut has_graph = false;
593    let mut has_vector = false;
594
595    for entity in entities {
596        match &entity.kind {
597            EntityKind::TableRow { .. } => has_table = true,
598            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
599            EntityKind::Vector { .. } => has_vector = true,
600            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
601        }
602    }
603
604    match (has_table, has_graph, has_vector) {
605        (true, false, false) => CollectionModel::Table,
606        (false, true, false) => CollectionModel::Graph,
607        (false, false, true) => CollectionModel::Vector,
608        _ => CollectionModel::Mixed,
609    }
610}
611
612fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
613    match model {
614        CollectionModel::Table => SchemaMode::Strict,
615        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
616        CollectionModel::Document
617        | CollectionModel::Hll
618        | CollectionModel::Sketch
619        | CollectionModel::Filter
620        | CollectionModel::Kv
621        | CollectionModel::Config
622        | CollectionModel::Vault
623        | CollectionModel::Mixed => SchemaMode::Dynamic,
624        CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
625        CollectionModel::Queue => SchemaMode::Dynamic,
626    }
627}
628
629fn infer_indices(
630    collection_name: &str,
631    model: CollectionModel,
632    index_catalog: Option<&IndexCatalog>,
633    declarations: Option<&CatalogDeclarations>,
634) -> Vec<String> {
635    let declared = declared_indices(collection_name, declarations);
636    if !declared.is_empty() {
637        return declared;
638    }
639
640    let available = index_catalog
641        .map(IndexCatalog::snapshot)
642        .unwrap_or_default();
643    let mut selected = Vec::new();
644
645    for metric in available {
646        let relevant = matches!(
647            (model, metric.kind),
648            (_, IndexKind::BTree)
649                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
650                | (CollectionModel::Vector, IndexKind::VectorHnsw)
651                | (CollectionModel::Vector, IndexKind::VectorInverted)
652                | (CollectionModel::Vector, IndexKind::VectorTurbo)
653                | (CollectionModel::Document, IndexKind::FullText)
654                | (CollectionModel::Document, IndexKind::DocumentPathValue)
655                | (CollectionModel::Kv, IndexKind::Hash)
656                | (CollectionModel::Config, IndexKind::Hash)
657                | (CollectionModel::Vault, IndexKind::Hash)
658                | (CollectionModel::Mixed, _)
659        );
660
661        if relevant && metric.enabled {
662            selected.push(metric.name);
663        }
664    }
665
666    selected
667}
668
669fn declared_indices(
670    collection_name: &str,
671    declarations: Option<&CatalogDeclarations>,
672) -> Vec<String> {
673    let mut selected = declarations
674        .map(|declarations| {
675            declarations
676                .declared_indexes
677                .iter()
678                .filter(|index| index.collection.as_deref() == Some(collection_name))
679                .map(|index| index.name.clone())
680                .collect::<Vec<_>>()
681        })
682        .unwrap_or_default();
683    selected.sort();
684    selected.dedup();
685    selected
686}
687
688fn operational_indices(
689    collection_name: &str,
690    index_catalog: Option<&IndexCatalog>,
691    declarations: Option<&CatalogDeclarations>,
692) -> Vec<String> {
693    if let Some(declarations) = declarations {
694        let mut selected = declarations
695            .operational_indexes
696            .iter()
697            .filter(|index| index.collection.as_deref() == Some(collection_name))
698            .map(|index| index.name.clone())
699            .collect::<Vec<_>>();
700        selected.sort();
701        selected.dedup();
702        return selected;
703    }
704
705    let mut selected = index_catalog
706        .map(IndexCatalog::snapshot)
707        .unwrap_or_default()
708        .into_iter()
709        .filter(|metric| metric.enabled)
710        .filter_map(|metric| {
711            if metric.name.starts_with(collection_name) {
712                Some(metric.name)
713            } else {
714                None
715            }
716        })
717        .collect::<Vec<_>>();
718    selected.sort();
719    selected.dedup();
720    selected
721}
722
723fn collection_index_consistency(
724    declared_indices: &[String],
725    operational_indices: &[String],
726) -> (bool, Vec<String>, Vec<String>) {
727    let declared = declared_indices.iter().cloned().collect::<BTreeSet<_>>();
728    let operational = operational_indices.iter().cloned().collect::<BTreeSet<_>>();
729    let missing_operational_indices = declared
730        .difference(&operational)
731        .cloned()
732        .collect::<Vec<_>>();
733    let undeclared_operational_indices = operational
734        .difference(&declared)
735        .cloned()
736        .collect::<Vec<_>>();
737    (
738        missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty(),
739        missing_operational_indices,
740        undeclared_operational_indices,
741    )
742}
743
744fn collection_index_readiness(
745    collection_name: &str,
746    statuses: &[CatalogIndexStatus],
747) -> (usize, usize, bool) {
748    let local = statuses
749        .iter()
750        .filter(|status| status.collection.as_deref() == Some(collection_name))
751        .collect::<Vec<_>>();
752    let queryable_count = local.iter().filter(|status| status.queryable).count();
753    let requires_rebuild_count = local
754        .iter()
755        .filter(|status| status.requires_rebuild)
756        .count();
757    let locally_in_sync = local.iter().all(|status| status.in_sync);
758    (queryable_count, requires_rebuild_count, locally_in_sync)
759}
760
761fn collection_graph_projection_readiness(
762    collection_name: &str,
763    statuses: &[CatalogGraphProjectionStatus],
764) -> (usize, usize, bool) {
765    let local = statuses
766        .iter()
767        .filter(|status| status.source.as_deref() == Some(collection_name))
768        .collect::<Vec<_>>();
769    let queryable_count = local.iter().filter(|status| status.queryable).count();
770    let requires_rematerialization_count = local
771        .iter()
772        .filter(|status| status.requires_rematerialization)
773        .count();
774    let locally_in_sync = local
775        .iter()
776        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
777    (
778        queryable_count,
779        requires_rematerialization_count,
780        locally_in_sync,
781    )
782}
783
784fn collection_analytics_job_readiness(
785    collection_name: &str,
786    projection_statuses: &[CatalogGraphProjectionStatus],
787    job_statuses: &[CatalogAnalyticsJobStatus],
788) -> (usize, usize, bool) {
789    let local = job_statuses
790        .iter()
791        .filter(|status| {
792            let Some(projection_name) = status.projection.as_deref() else {
793                return false;
794            };
795            projection_statuses
796                .iter()
797                .find(|projection| projection.name == projection_name)
798                .and_then(|projection| projection.source.as_deref())
799                == Some(collection_name)
800        })
801        .collect::<Vec<_>>();
802    let executable_count = local.iter().filter(|status| status.executable).count();
803    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
804    let locally_in_sync = local
805        .iter()
806        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
807    (executable_count, requires_rerun_count, locally_in_sync)
808}
809
810fn graph_projection_statuses(
811    declarations: Option<&CatalogDeclarations>,
812) -> Vec<CatalogGraphProjectionStatus> {
813    let declared = declarations
814        .map(|declarations| declarations.declared_graph_projections.as_slice())
815        .unwrap_or(&[]);
816    let operational = declarations
817        .map(|declarations| declarations.operational_graph_projections.as_slice())
818        .unwrap_or(&[]);
819    let declared_jobs = declarations
820        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
821        .unwrap_or(&[]);
822    let operational_jobs = declarations
823        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
824        .unwrap_or(&[]);
825
826    let mut names = declared
827        .iter()
828        .map(|projection| projection.name.clone())
829        .chain(operational.iter().map(|projection| projection.name.clone()))
830        .collect::<Vec<_>>();
831    names.sort();
832    names.dedup();
833
834    names
835        .into_iter()
836        .map(|name| {
837            let declared_projection = declared.iter().find(|projection| projection.name == name);
838            let operational_projection = operational
839                .iter()
840                .find(|projection| projection.name == name);
841            let declared_present = declared_projection.is_some();
842            let operational_present = operational_projection.is_some();
843            let mut dependent_job_ids = BTreeSet::new();
844            let mut dependent_job_count = 0usize;
845            let mut active_dependent_job_count = 0usize;
846            let mut stale_dependent_job_count = 0usize;
847            for job in declared_jobs
848                .iter()
849                .chain(operational_jobs.iter())
850                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
851            {
852                if !dependent_job_ids.insert(job.id.clone()) {
853                    continue;
854                }
855                dependent_job_count += 1;
856                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
857                    active_dependent_job_count += 1;
858                }
859                if job.state == "stale" {
860                    stale_dependent_job_count += 1;
861                }
862            }
863            let lifecycle_state = match (
864                declared_present,
865                operational_present,
866                declared_projection
867                    .map(|projection| projection.state.as_str())
868                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
869                    .unwrap_or_default(),
870            ) {
871                (true, _, "failed") => "failed",
872                (true, true, "stale") => "stale",
873                (true, _, "materializing") => "materializing",
874                (true, true, "materialized") => "materialized",
875                (true, true, _) => "materialized",
876                (false, true, _) => "orphaned-operational",
877                (true, false, _) => "declared",
878                (false, false, _) => "unknown",
879            }
880            .to_string();
881            let queryable = declared_present
882                && operational_present
883                && matches!(
884                    declared_projection
885                        .map(|projection| projection.state.as_str())
886                        .or_else(
887                            || operational_projection.map(|projection| projection.state.as_str())
888                        )
889                        .unwrap_or_default(),
890                    "materialized"
891                );
892            let requires_rematerialization = matches!(
893                declared_projection
894                    .map(|projection| projection.state.as_str())
895                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
896                    .unwrap_or_default(),
897                "declared" | "materializing" | "failed" | "stale"
898            );
899            let mut attention_reasons = Vec::new();
900            if !declared_present || !operational_present {
901                attention_reasons.push("declaration_drift".to_string());
902            }
903            if requires_rematerialization {
904                attention_reasons.push("requires_rematerialization".to_string());
905            }
906            if stale_dependent_job_count > 0 {
907                attention_reasons.push("dependent_jobs_stale".to_string());
908            }
909            let attention_score = stale_dependent_job_count.saturating_mul(2)
910                + usize::from(requires_rematerialization).saturating_mul(4)
911                + usize::from(!declared_present || !operational_present)
912                + usize::from(!queryable);
913            CatalogGraphProjectionStatus {
914                name,
915                source: declared_projection
916                    .map(|projection| projection.source.clone())
917                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
918                lifecycle_state,
919                declared: declared_present,
920                operational: operational_present,
921                in_sync: declared_present == operational_present,
922                last_materialized_sequence: declared_projection
923                    .and_then(|projection| projection.last_materialized_sequence)
924                    .or_else(|| {
925                        operational_projection
926                            .and_then(|projection| projection.last_materialized_sequence)
927                    }),
928                queryable,
929                requires_rematerialization,
930                dependent_job_count,
931                active_dependent_job_count,
932                stale_dependent_job_count,
933                dependent_jobs_in_sync: stale_dependent_job_count == 0,
934                rerun_required: stale_dependent_job_count > 0,
935                attention_score,
936                attention_reasons,
937            }
938        })
939        .collect()
940}
941
942fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
943    let declared = declarations
944        .map(|declarations| declarations.declared_indexes.as_slice())
945        .unwrap_or(&[]);
946    let operational = declarations
947        .map(|declarations| declarations.operational_indexes.as_slice())
948        .unwrap_or(&[]);
949
950    let mut names = declared
951        .iter()
952        .map(|index| index.name.clone())
953        .chain(operational.iter().map(|index| index.name.clone()))
954        .collect::<Vec<_>>();
955    names.sort();
956    names.dedup();
957
958    names
959        .into_iter()
960        .map(|name| {
961            let declared_index = declared.iter().find(|index| index.name == name);
962            let operational_index = operational.iter().find(|index| index.name == name);
963            let collection = declared_index
964                .and_then(|index| index.collection.clone())
965                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
966            let kind = declared_index
967                .map(|index| index_kind_string(index.kind))
968                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
969                .unwrap_or_default();
970            let enabled = declared_index
971                .map(|index| index.enabled)
972                .or_else(|| operational_index.map(|index| index.enabled))
973                .unwrap_or(false);
974            let build_state = operational_index
975                .map(|index| index.build_state.clone())
976                .or_else(|| declared_index.map(|index| index.build_state.clone()));
977            let declared_present = declared_index.is_some();
978            let operational_present = operational_index.is_some();
979            let lifecycle_state = index_lifecycle_state(
980                declared_present,
981                operational_present,
982                enabled,
983                build_state.as_deref(),
984            );
985
986            let mut attention_reasons = Vec::new();
987            if declared_present != operational_present {
988                attention_reasons.push("declaration_drift".to_string());
989            }
990            if !enabled && declared_present {
991                attention_reasons.push("disabled".to_string());
992            }
993            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
994                attention_reasons.push("failed".to_string());
995            }
996            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
997                attention_reasons.push("stale".to_string());
998            }
999            if matches!(
1000                build_state.as_deref().unwrap_or_default(),
1001                "building" | "declared-unbuilt"
1002            ) {
1003                attention_reasons.push("requires_rebuild".to_string());
1004            }
1005            let queryable = declared_present
1006                && operational_present
1007                && enabled
1008                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1009            let requires_rebuild = matches!(
1010                build_state.as_deref().unwrap_or_default(),
1011                "declared-unbuilt" | "building" | "stale" | "failed"
1012            );
1013            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1014                + usize::from(!enabled && declared_present)
1015                + usize::from(declared_present != operational_present)
1016                + usize::from(!queryable);
1017
1018            let artifact_state = crate::physical::ArtifactState::from_build_state(
1019                build_state.as_deref().unwrap_or("declared-unbuilt"),
1020                enabled,
1021            );
1022            CatalogIndexStatus {
1023                name,
1024                collection,
1025                kind,
1026                declared: declared_present,
1027                operational: operational_present,
1028                enabled,
1029                build_state,
1030                artifact_state,
1031                lifecycle_state,
1032                in_sync: declared_present == operational_present,
1033                queryable,
1034                requires_rebuild,
1035                attention_score,
1036                attention_reasons,
1037            }
1038        })
1039        .collect()
1040}
1041
1042fn index_lifecycle_state(
1043    declared: bool,
1044    operational: bool,
1045    enabled: bool,
1046    build_state: Option<&str>,
1047) -> String {
1048    if !declared && operational {
1049        return "orphaned-operational".to_string();
1050    }
1051    if declared && !enabled {
1052        return "disabled".to_string();
1053    }
1054    if !declared {
1055        return "unknown".to_string();
1056    }
1057    if !operational {
1058        return "declared".to_string();
1059    }
1060
1061    match build_state.unwrap_or_default() {
1062        "ready" => "ready".to_string(),
1063        "failed" => "failed".to_string(),
1064        "stale" => "stale".to_string(),
1065        "declared-unbuilt" => "declared".to_string(),
1066        "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
1067            "building".to_string()
1068        }
1069        _ => "building".to_string(),
1070    }
1071}
1072
1073fn index_kind_string(kind: IndexKind) -> String {
1074    match kind {
1075        IndexKind::BTree => "btree",
1076        IndexKind::Hash => "hash",
1077        IndexKind::Bitmap => "bitmap",
1078        IndexKind::Spatial => "spatial.rtree",
1079        IndexKind::VectorHnsw => "vector.hnsw",
1080        IndexKind::VectorInverted => "vector.inverted",
1081        IndexKind::VectorTurbo => "vector.turbo",
1082        IndexKind::GraphAdjacency => "graph.adjacency",
1083        IndexKind::FullText => "text.fulltext",
1084        IndexKind::DocumentPathValue => "document.pathvalue",
1085        IndexKind::HybridSearch => "search.hybrid",
1086    }
1087    .to_string()
1088}
1089
1090fn analytics_job_statuses(
1091    declarations: Option<&CatalogDeclarations>,
1092) -> Vec<CatalogAnalyticsJobStatus> {
1093    let declared = declarations
1094        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1095        .unwrap_or(&[]);
1096    let operational = declarations
1097        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1098        .unwrap_or(&[]);
1099    let declared_projections = declarations
1100        .map(|declarations| declarations.declared_graph_projections.as_slice())
1101        .unwrap_or(&[]);
1102    let operational_projections = declarations
1103        .map(|declarations| declarations.operational_graph_projections.as_slice())
1104        .unwrap_or(&[]);
1105
1106    let mut ids = declared
1107        .iter()
1108        .map(|job| job.id.clone())
1109        .chain(operational.iter().map(|job| job.id.clone()))
1110        .collect::<Vec<_>>();
1111    ids.sort();
1112    ids.dedup();
1113
1114    ids.into_iter()
1115        .map(|id| {
1116            let declared_job = declared.iter().find(|job| job.id == id);
1117            let operational_job = operational.iter().find(|job| job.id == id);
1118            let kind = declared_job
1119                .map(|job| job.kind.clone())
1120                .or_else(|| operational_job.map(|job| job.kind.clone()))
1121                .unwrap_or_default();
1122            let projection = declared_job
1123                .and_then(|job| job.projection.clone())
1124                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1125            let state = declared_job
1126                .map(|job| job.state.clone())
1127                .or_else(|| operational_job.map(|job| job.state.clone()))
1128                .unwrap_or_default();
1129            let declared_present = declared_job.is_some();
1130            let operational_present = operational_job.is_some();
1131            let last_run_sequence = declared_job
1132                .and_then(|job| job.last_run_sequence)
1133                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1134            let projection_declared = projection.as_ref().map(|projection_name| {
1135                declared_projections
1136                    .iter()
1137                    .any(|projection| projection.name == *projection_name)
1138            });
1139            let projection_operational = projection.as_ref().map(|projection_name| {
1140                operational_projections
1141                    .iter()
1142                    .any(|projection| projection.name == *projection_name)
1143            });
1144            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1145                declared_projections
1146                    .iter()
1147                    .find(|projection| projection.name == *projection_name)
1148                    .map(|projection| projection.state.as_str())
1149                    .or_else(|| {
1150                        operational_projections
1151                            .iter()
1152                            .find(|projection| projection.name == *projection_name)
1153                            .map(|projection| projection.state.as_str())
1154                    })
1155            });
1156            let dependency_in_sync = projection.as_ref().map(|_| {
1157                matches!(projection_lifecycle, Some("materialized"))
1158                    && projection_operational == Some(true)
1159            });
1160            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1161                (true, false, _) => "declared",
1162                (true, true, _)
1163                    if matches!(
1164                        projection_lifecycle,
1165                        Some("stale" | "failed" | "materializing" | "declared")
1166                    ) =>
1167                {
1168                    "stale"
1169                }
1170                (true, true, "completed") => "completed",
1171                (true, true, "running") => "running",
1172                (true, true, "failed") => "failed",
1173                (true, true, "queued") => "queued",
1174                (true, true, "stale") => "stale",
1175                (true, true, _) => "materialized",
1176                (false, true, _) => "orphaned-operational",
1177                (false, false, _) => "unknown",
1178            }
1179            .to_string();
1180            let executable = declared_present
1181                && operational_present
1182                && !matches!(state.as_str(), "failed" | "stale")
1183                && dependency_in_sync.unwrap_or(true);
1184            let requires_rerun =
1185                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1186            let mut attention_reasons = Vec::new();
1187            if declared_present != operational_present {
1188                attention_reasons.push("declaration_drift".to_string());
1189            }
1190            if matches!(state.as_str(), "failed") {
1191                attention_reasons.push("failed".to_string());
1192            }
1193            if matches!(state.as_str(), "stale") {
1194                attention_reasons.push("stale".to_string());
1195            }
1196            if dependency_in_sync == Some(false) {
1197                attention_reasons.push("dependency_drift".to_string());
1198            }
1199            if requires_rerun {
1200                attention_reasons.push("requires_rerun".to_string());
1201            }
1202            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1203                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1204                + usize::from(declared_present != operational_present)
1205                + usize::from(!executable);
1206            CatalogAnalyticsJobStatus {
1207                id,
1208                kind,
1209                projection,
1210                state: state.clone(),
1211                lifecycle_state,
1212                declared: declared_present,
1213                operational: operational_present,
1214                in_sync: declared_present == operational_present,
1215                last_run_sequence,
1216                projection_declared,
1217                projection_operational,
1218                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1219                dependency_in_sync,
1220                executable,
1221                requires_rerun,
1222                attention_score,
1223                attention_reasons,
1224            }
1225        })
1226        .collect()
1227}
1228
1229pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1230    let declared_indexes = snapshot
1231        .declared_indexes
1232        .iter()
1233        .map(|index| index.name.clone())
1234        .collect::<BTreeSet<_>>();
1235    let operational_indexes = snapshot
1236        .operational_indexes
1237        .iter()
1238        .map(|index| index.name.clone())
1239        .collect::<BTreeSet<_>>();
1240    let declared_graph_projections = snapshot
1241        .declared_graph_projections
1242        .iter()
1243        .map(|projection| projection.name.clone())
1244        .collect::<BTreeSet<_>>();
1245    let operational_graph_projections = snapshot
1246        .operational_graph_projections
1247        .iter()
1248        .map(|projection| projection.name.clone())
1249        .collect::<BTreeSet<_>>();
1250    let declared_analytics_jobs = snapshot
1251        .declared_analytics_jobs
1252        .iter()
1253        .map(|job| job.id.clone())
1254        .collect::<BTreeSet<_>>();
1255    let operational_analytics_jobs = snapshot
1256        .operational_analytics_jobs
1257        .iter()
1258        .map(|job| job.id.clone())
1259        .collect::<BTreeSet<_>>();
1260
1261    CatalogConsistencyReport {
1262        declared_index_count: declared_indexes.len(),
1263        operational_index_count: operational_indexes.len(),
1264        declared_graph_projection_count: declared_graph_projections.len(),
1265        operational_graph_projection_count: operational_graph_projections.len(),
1266        declared_analytics_job_count: declared_analytics_jobs.len(),
1267        operational_analytics_job_count: operational_analytics_jobs.len(),
1268        missing_operational_indexes: declared_indexes
1269            .difference(&operational_indexes)
1270            .cloned()
1271            .collect(),
1272        undeclared_operational_indexes: operational_indexes
1273            .difference(&declared_indexes)
1274            .cloned()
1275            .collect(),
1276        missing_operational_graph_projections: declared_graph_projections
1277            .difference(&operational_graph_projections)
1278            .cloned()
1279            .collect(),
1280        undeclared_operational_graph_projections: operational_graph_projections
1281            .difference(&declared_graph_projections)
1282            .cloned()
1283            .collect(),
1284        missing_operational_analytics_jobs: declared_analytics_jobs
1285            .difference(&operational_analytics_jobs)
1286            .cloned()
1287            .collect(),
1288        undeclared_operational_analytics_jobs: operational_analytics_jobs
1289            .difference(&declared_analytics_jobs)
1290            .cloned()
1291            .collect(),
1292    }
1293}
1294
1295pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1296    CatalogAttentionSummary {
1297        collections_requiring_attention: snapshot
1298            .collections
1299            .iter()
1300            .filter(|collection| collection.attention_required)
1301            .count(),
1302        indexes_requiring_attention: snapshot
1303            .index_statuses
1304            .iter()
1305            .filter(|status| status.attention_score > 0)
1306            .count(),
1307        graph_projections_requiring_attention: snapshot
1308            .graph_projection_statuses
1309            .iter()
1310            .filter(|status| status.attention_score > 0)
1311            .count(),
1312        analytics_jobs_requiring_attention: snapshot
1313            .analytics_job_statuses
1314            .iter()
1315            .filter(|status| status.attention_score > 0)
1316            .count(),
1317        top_collection: snapshot
1318            .collections
1319            .iter()
1320            .filter(|collection| collection.attention_score > 0)
1321            .max_by_key(|collection| collection.attention_score)
1322            .cloned(),
1323        top_index: snapshot
1324            .index_statuses
1325            .iter()
1326            .filter(|status| status.attention_score > 0)
1327            .max_by_key(|status| status.attention_score)
1328            .cloned(),
1329        top_graph_projection: snapshot
1330            .graph_projection_statuses
1331            .iter()
1332            .filter(|status| status.attention_score > 0)
1333            .max_by_key(|status| status.attention_score)
1334            .cloned(),
1335        top_analytics_job: snapshot
1336            .analytics_job_statuses
1337            .iter()
1338            .filter(|status| status.attention_score > 0)
1339            .max_by_key(|status| status.attention_score)
1340            .cloned(),
1341    }
1342}
1343
1344pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1345    let mut collections = snapshot
1346        .collections
1347        .iter()
1348        .filter(|collection| collection.attention_required)
1349        .cloned()
1350        .collect::<Vec<_>>();
1351    collections.sort_by(|left, right| {
1352        right
1353            .attention_score
1354            .cmp(&left.attention_score)
1355            .then_with(|| left.name.cmp(&right.name))
1356    });
1357    collections
1358}
1359
1360pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1361    let mut statuses = snapshot
1362        .index_statuses
1363        .iter()
1364        .filter(|status| status.attention_score > 0)
1365        .cloned()
1366        .collect::<Vec<_>>();
1367    statuses.sort_by(|left, right| {
1368        right
1369            .attention_score
1370            .cmp(&left.attention_score)
1371            .then_with(|| left.name.cmp(&right.name))
1372    });
1373    statuses
1374}
1375
1376pub fn graph_projection_attention(
1377    snapshot: &CatalogModelSnapshot,
1378) -> Vec<CatalogGraphProjectionStatus> {
1379    let mut statuses = snapshot
1380        .graph_projection_statuses
1381        .iter()
1382        .filter(|status| status.attention_score > 0)
1383        .cloned()
1384        .collect::<Vec<_>>();
1385    statuses.sort_by(|left, right| {
1386        right
1387            .attention_score
1388            .cmp(&left.attention_score)
1389            .then_with(|| left.name.cmp(&right.name))
1390    });
1391    statuses
1392}
1393
1394pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1395    let mut statuses = snapshot
1396        .analytics_job_statuses
1397        .iter()
1398        .filter(|status| status.attention_score > 0)
1399        .cloned()
1400        .collect::<Vec<_>>();
1401    statuses.sort_by(|left, right| {
1402        right
1403            .attention_score
1404            .cmp(&left.attention_score)
1405            .then_with(|| left.id.cmp(&right.id))
1406    });
1407    statuses
1408}