Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Logical data model assigned to a collection.
///
/// A collection's effective model is taken from its contract when one is
/// supplied; otherwise it is inferred from the stored entities, in which case
/// only `Table`, `Graph`, `Vector`, or `Mixed` are ever produced.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    /// Also the inference fallback when entities do not map to exactly one of
    /// table, graph, or vector (including an empty collection).
    Mixed,
    TimeSeries,
    Queue,
    Metrics,
}
33
/// Schema strictness associated with a collection.
///
/// When no contract declares a mode, the catalog derives one from the
/// collection model: tables map to `Strict`; graph, vector, time-series and
/// metrics collections map to `SemiStructured`; every other model maps to
/// `Dynamic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
40
/// Kind of data change a subscription can filter on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Returns the canonical upper-case keyword for this operation
    /// (`"INSERT"`, `"UPDATE"`, or `"DELETE"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// Returns `None` for anything that is not exactly one of the keywords
    /// produced by [`Self::as_str`]; surrounding whitespace is not trimmed.
    pub fn from_str(value: &str) -> Option<Self> {
        // Compare in place against the canonical keywords instead of
        // allocating an upper-cased copy of `value` on every call. Driving
        // the comparison from `as_str` keeps the two directions in lockstep.
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|operation| value.eq_ignore_ascii_case(operation.as_str()))
    }
}
66
/// Description of one subscription carried by a collection contract and
/// surfaced on the collection's catalog descriptor.
///
/// NOTE(review): the code that evaluates these fields is not in this part of
/// the file; the undocumented fields below are presumably the event source,
/// the operation/predicate filters, and the fields to redact — confirm against
/// the subscription dispatcher.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    /// Logical name for this subscription. Empty string for legacy unnamed subscriptions.
    pub name: String,
    pub source: String,
    pub target_queue: String,
    pub ops_filter: Vec<SubscriptionOperation>,
    pub where_filter: Option<String>,
    pub redact_fields: Vec<String>,
    pub enabled: bool,
    /// When true, events are routed to the bare `target_queue` regardless of
    /// the current tenant — a cluster-wide subscription. When false (default),
    /// events are namespaced as `{tenant}/{target_queue}` whenever a tenant
    /// context is active, enforcing per-tenant isolation.
    pub all_tenants: bool,
}
83
/// Per-collection entry of a catalog snapshot: effective and observed
/// model/schema, size counters, index reconciliation, and attention flags.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as listed by the store.
    pub name: String,
    /// Effective model: the contract's declared model when a contract exists,
    /// otherwise the model inferred from stored entities.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's mode when a contract exists,
    /// otherwise the mode derived from the inferred model.
    pub schema_mode: SchemaMode,
    /// Whether a contract matching this collection's name was supplied.
    pub contract_present: bool,
    /// Origin recorded on the contract, if a contract exists.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, if a contract exists.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the entities currently stored.
    pub observed_model: CollectionModel,
    /// Queue mode; `Some` only when `model` is `Queue` (the default mode is
    /// used when no stored policy is found for the queue).
    pub queue_mode: Option<QueueMode>,
    /// MAX_ATTEMPTS per-queue policy. Hot-fields tier — populated for
    /// `model = queue` from the catalog snapshot for sub-ms reads by
    /// `QueueLifecycle`. `None` for non-queue collections.
    pub queue_max_attempts: Option<u32>,
    /// LOCK_DEADLINE_MS per-queue policy. Hot-fields tier.
    pub queue_lock_deadline_ms: Option<u64>,
    /// IN_FLIGHT_CAP_PER_GROUP per-queue policy. Hot-fields tier.
    pub queue_in_flight_cap_per_group: Option<u32>,
    /// DLQ target collection name. `None` means on-max drop.
    pub queue_dlq_target: Option<String>,
    /// Vector dimension copied from the contract, if declared there.
    pub vector_dimension: Option<usize>,
    /// Distance metric copied from the contract, if declared there.
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    /// `WITH SESSION_KEY <col>` declared on the timeseries collection.
    /// `None` for non-timeseries and for timeseries without the clause.
    /// Issue #576 slice 1.
    pub session_key: Option<String>,
    /// `SESSION_GAP <duration>` in milliseconds declared on the
    /// timeseries collection. Issue #576 slice 1.
    pub session_gap_ms: Option<u64>,
    /// Declarative retention duration in milliseconds. `None` means no
    /// retention policy is enforced. Set via `ALTER COLLECTION ... SET
    /// RETENTION <duration>`. Issue #580 slice 1.
    pub retention_duration_ms: Option<u64>,
    /// Schema mode declared by the contract, if a contract exists.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode derived from the inferred (observed) model.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities found in the collection.
    pub entities: usize,
    /// Sum of cross-reference counts over all entities in the collection.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment count reported by the collection
    /// manager; 0 when the store returns no manager.
    pub segments: usize,
    /// Declared index names when any exist; otherwise the enabled catalog
    /// indexes whose kind is considered relevant to `model`.
    pub indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, de-duplicated names of operational indexes for this collection.
    pub operational_indices: Vec<String>,
    /// True when the declared and operational index name sets are equal.
    pub indexes_in_sync: bool,
    /// Index names that are declared but not operational.
    pub missing_operational_indices: Vec<String>,
    /// Index names that are operational but not declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Number of this collection's index statuses flagged `queryable`.
    pub queryable_index_count: usize,
    /// Number of this collection's index statuses flagged `requires_rebuild`.
    pub indexes_requiring_rebuild_count: usize,
    /// Number of graph projections sourced from this collection that are
    /// flagged `queryable`.
    pub queryable_graph_projection_count: usize,
    /// Number of graph projections sourced from this collection that are
    /// flagged `requires_rematerialization`.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Number of analytics jobs (reached through a projection sourced from
    /// this collection) flagged `executable`.
    pub executable_analytics_job_count: usize,
    /// Number of those analytics jobs flagged `requires_rerun`.
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions copied from the contract; empty without a contract.
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// True when index names match and every index, graph projection and
    /// analytics job status local to this collection reports in-sync.
    pub resources_in_sync: bool,
    /// True when resources are out of sync or anything requires a rebuild,
    /// rematerialization, or rerun.
    pub attention_required: bool,
    /// Weighted score: 3 per index needing rebuild, 4 per projection needing
    /// rematerialization, 2 per job needing rerun, plus 1 when out of sync.
    pub attention_score: usize,
    /// Reason codes explaining the attention state, e.g. `index_drift` or
    /// `indexes_require_rebuild`.
    pub attention_reasons: Vec<String>,
}
140
/// Complete catalog snapshot of a store: summary statistics, per-collection
/// descriptors, and declared/operational resource state with derived statuses.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-wide summary (name, totals, per-collection stats, timestamp).
    pub summary: CatalogSnapshot,
    /// One descriptor per collection listed by the store, sorted by name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog; the default value when none was given.
    pub indices: IndexCatalogSnapshot,
    // The six lists below are copied from the supplied declarations and are
    // empty when no declarations were supplied.
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Derived status per index.
    pub index_statuses: Vec<CatalogIndexStatus>,
    /// Derived status per graph projection.
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    /// Derived status per analytics job.
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    // Store-wide counters: each is the number of entries in the matching
    // status list with the corresponding flag set.
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
}
162
/// Roll-up of catalog items needing attention: a count per resource kind and
/// an optional "top" item of each kind.
///
/// NOTE(review): the function that fills this struct is not in this part of
/// the file; "top" presumably means the highest `attention_score` — confirm
/// against the producer.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
174
/// Declared and operational resource lists handed to snapshot building.
///
/// Each resource kind (indexes, graph projections, analytics jobs) has a
/// `declared_*` list and an `operational_*` list; the snapshot copies all six
/// and derives per-resource statuses from them. `Default` yields six empty
/// lists.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
184
/// Derived status of a single graph projection.
///
/// Per-collection readiness attributes a projection to the collection named
/// in `source`, counts `queryable` and `requires_rematerialization`, and
/// treats the projection as locally in sync only when both `in_sync` and
/// `dependent_jobs_in_sync` are true.
///
/// NOTE(review): how the remaining fields are computed is not visible in this
/// part of the file.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    pub name: String,
    /// Source collection name, when known.
    pub source: Option<String>,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    pub in_sync: bool,
    pub last_materialized_sequence: Option<u64>,
    pub queryable: bool,
    pub requires_rematerialization: bool,
    pub dependent_job_count: usize,
    pub active_dependent_job_count: usize,
    pub stale_dependent_job_count: usize,
    pub dependent_jobs_in_sync: bool,
    pub rerun_required: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
204
/// Derived status of a single index.
///
/// Per-collection readiness attributes an index to the collection named in
/// `collection` and reads `queryable`, `requires_rebuild`, and `in_sync`.
///
/// NOTE(review): how the remaining fields are computed is not visible in this
/// part of the file.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    pub name: String,
    /// Owning collection name, when known.
    pub collection: Option<String>,
    pub kind: String,
    pub declared: bool,
    pub operational: bool,
    pub enabled: bool,
    pub build_state: Option<String>,
    pub artifact_state: crate::physical::ArtifactState,
    pub lifecycle_state: String,
    pub in_sync: bool,
    pub queryable: bool,
    pub requires_rebuild: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
222
/// Derived status of a single analytics job.
///
/// Per-collection readiness links a job to a collection through the graph
/// projection named in `projection`, counts `executable` and
/// `requires_rerun`, and treats the job as locally in sync when `in_sync` is
/// true and `dependency_in_sync` is true or absent.
///
/// NOTE(review): how the remaining fields are computed is not visible in this
/// part of the file.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    pub id: String,
    pub kind: String,
    /// Name of the graph projection the job depends on, if any.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// `None` is treated as "in sync" by per-collection readiness.
    pub dependency_in_sync: Option<bool>,
    pub executable: bool,
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
243
/// Declared-versus-operational consistency report across indexes, graph
/// projections, and analytics jobs: a count per side plus the names present
/// on only one side.
///
/// NOTE(review): the function that fills this report is not in this part of
/// the file; presumably `missing_operational_*` lists declared-but-not-
/// operational names and `undeclared_operational_*` the reverse — confirm
/// against the producer.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
259
260pub fn snapshot_store(
261    name: &str,
262    store: &UnifiedStore,
263    index_catalog: Option<&IndexCatalog>,
264) -> CatalogModelSnapshot {
265    snapshot_store_with_declarations(name, store, index_catalog, None, None)
266}
267
268pub fn snapshot_store_with_declarations(
269    name: &str,
270    store: &UnifiedStore,
271    index_catalog: Option<&IndexCatalog>,
272    declarations: Option<&CatalogDeclarations>,
273    contracts: Option<&[CollectionContract]>,
274) -> CatalogModelSnapshot {
275    let index_statuses = index_statuses(declarations);
276    let graph_projection_statuses = graph_projection_statuses(declarations);
277    let analytics_job_statuses = analytics_job_statuses(declarations);
278
279    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
280    for (collection, entity) in store.query_all(|_| true) {
281        grouped.entry(collection).or_default().push(entity);
282    }
283    let queue_policies = queue_policies_from_grouped(&grouped);
284
285    let mut stats_by_collection = BTreeMap::new();
286    let mut collections = Vec::new();
287
288    for collection_name in store.list_collections() {
289        let entities = grouped.remove(&collection_name).unwrap_or_default();
290        let inferred_model = infer_model(&entities);
291        let inferred_schema_mode = infer_schema_mode(inferred_model);
292        let contract = collection_contract(&collection_name, contracts);
293        let model = contract
294            .map(|contract| contract.declared_model)
295            .unwrap_or(inferred_model);
296        let cross_refs = entities
297            .iter()
298            .map(|entity| entity.cross_refs().len())
299            .sum();
300        let entity_count = entities.len();
301        let manager_stats = store
302            .get_collection(&collection_name)
303            .map(|manager| manager.stats());
304        let segments = manager_stats
305            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
306            .unwrap_or(0);
307
308        stats_by_collection.insert(
309            collection_name.clone(),
310            CollectionStats {
311                entities: entity_count,
312                cross_refs,
313                segments,
314            },
315        );
316
317        let declared_indices = declared_indices(&collection_name, declarations);
318        let operational_indices =
319            operational_indices(&collection_name, index_catalog, declarations);
320        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
321            collection_index_consistency(&declared_indices, &operational_indices);
322        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
323            collection_index_readiness(&collection_name, &index_statuses);
324        let (
325            queryable_graph_projection_count,
326            graph_projections_requiring_rematerialization_count,
327            graph_projections_locally_in_sync,
328        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
329        let (
330            executable_analytics_job_count,
331            analytics_jobs_requiring_rerun_count,
332            analytics_jobs_locally_in_sync,
333        ) = collection_analytics_job_readiness(
334            &collection_name,
335            &graph_projection_statuses,
336            &analytics_job_statuses,
337        );
338        let resources_in_sync = indexes_in_sync
339            && indexes_locally_in_sync
340            && graph_projections_locally_in_sync
341            && analytics_jobs_locally_in_sync;
342        let attention_required = !resources_in_sync
343            || indexes_requiring_rebuild_count > 0
344            || graph_projections_requiring_rematerialization_count > 0
345            || analytics_jobs_requiring_rerun_count > 0;
346        let mut attention_reasons = Vec::new();
347        if !indexes_in_sync || !indexes_locally_in_sync {
348            attention_reasons.push("index_drift".to_string());
349        }
350        if indexes_requiring_rebuild_count > 0 {
351            attention_reasons.push("indexes_require_rebuild".to_string());
352        }
353        if !graph_projections_locally_in_sync {
354            attention_reasons.push("graph_projection_drift".to_string());
355        }
356        if graph_projections_requiring_rematerialization_count > 0 {
357            attention_reasons.push("graph_projections_require_rematerialization".to_string());
358        }
359        if !analytics_jobs_locally_in_sync {
360            attention_reasons.push("analytics_job_drift".to_string());
361        }
362        if analytics_jobs_requiring_rerun_count > 0 {
363            attention_reasons.push("analytics_jobs_require_rerun".to_string());
364        }
365        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
366            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
367            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
368            + usize::from(!resources_in_sync);
369
370        collections.push(CollectionDescriptor {
371            name: collection_name.clone(),
372            model,
373            schema_mode: contract
374                .map(|contract| contract.schema_mode)
375                .unwrap_or(inferred_schema_mode),
376            contract_present: contract.is_some(),
377            contract_origin: contract.map(|contract| contract.origin),
378            declared_model: contract.map(|contract| contract.declared_model),
379            observed_model: inferred_model,
380            queue_mode: if model == CollectionModel::Queue {
381                Some(
382                    queue_policies
383                        .get(&collection_name)
384                        .map(|p| p.mode)
385                        .unwrap_or_default(),
386                )
387            } else {
388                None
389            },
390            queue_max_attempts: if model == CollectionModel::Queue {
391                Some(
392                    queue_policies
393                        .get(&collection_name)
394                        .map(|p| p.max_attempts)
395                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
396                )
397            } else {
398                None
399            },
400            queue_lock_deadline_ms: if model == CollectionModel::Queue {
401                Some(
402                    queue_policies
403                        .get(&collection_name)
404                        .map(|p| p.lock_deadline_ms)
405                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
406                )
407            } else {
408                None
409            },
410            queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
411                Some(
412                    queue_policies
413                        .get(&collection_name)
414                        .map(|p| p.in_flight_cap_per_group)
415                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
416                )
417            } else {
418                None
419            },
420            queue_dlq_target: if model == CollectionModel::Queue {
421                queue_policies
422                    .get(&collection_name)
423                    .and_then(|p| p.dlq_target.clone())
424            } else {
425                None
426            },
427            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
428            vector_metric: contract.and_then(|contract| contract.vector_metric),
429            session_key: contract.and_then(|contract| contract.session_key.clone()),
430            session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
431            retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
432            declared_schema_mode: contract.map(|contract| contract.schema_mode),
433            observed_schema_mode: inferred_schema_mode,
434            entities: entity_count,
435            cross_refs,
436            segments,
437            indices: infer_indices(&collection_name, model, index_catalog, declarations),
438            declared_indices,
439            operational_indices,
440            indexes_in_sync,
441            missing_operational_indices,
442            undeclared_operational_indices,
443            queryable_index_count,
444            indexes_requiring_rebuild_count,
445            queryable_graph_projection_count,
446            graph_projections_requiring_rematerialization_count,
447            executable_analytics_job_count,
448            analytics_jobs_requiring_rerun_count,
449            subscriptions: contract
450                .map(|contract| contract.subscriptions.clone())
451                .unwrap_or_default(),
452            resources_in_sync,
453            attention_required,
454            attention_score,
455            attention_reasons,
456        });
457    }
458
459    collections.sort_by(|left, right| left.name.cmp(&right.name));
460
461    let summary = CatalogSnapshot {
462        name: name.to_string(),
463        total_entities: stats_by_collection
464            .values()
465            .map(|stats| stats.entities)
466            .sum(),
467        total_collections: stats_by_collection.len(),
468        stats_by_collection,
469        updated_at: SystemTime::now(),
470    };
471
472    CatalogModelSnapshot {
473        summary,
474        collections,
475        indices: index_catalog
476            .map(IndexCatalog::snapshot)
477            .unwrap_or_default(),
478        declared_indexes: declarations
479            .map(|declarations| declarations.declared_indexes.clone())
480            .unwrap_or_default(),
481        declared_graph_projections: declarations
482            .map(|declarations| declarations.declared_graph_projections.clone())
483            .unwrap_or_default(),
484        declared_analytics_jobs: declarations
485            .map(|declarations| declarations.declared_analytics_jobs.clone())
486            .unwrap_or_default(),
487        operational_indexes: declarations
488            .map(|declarations| declarations.operational_indexes.clone())
489            .unwrap_or_default(),
490        operational_graph_projections: declarations
491            .map(|declarations| declarations.operational_graph_projections.clone())
492            .unwrap_or_default(),
493        operational_analytics_jobs: declarations
494            .map(|declarations| declarations.operational_analytics_jobs.clone())
495            .unwrap_or_default(),
496        queryable_index_count: index_statuses
497            .iter()
498            .filter(|status| status.queryable)
499            .count(),
500        indexes_requiring_rebuild_count: index_statuses
501            .iter()
502            .filter(|status| status.requires_rebuild)
503            .count(),
504        queryable_graph_projection_count: graph_projection_statuses
505            .iter()
506            .filter(|status| status.queryable)
507            .count(),
508        graph_projections_requiring_rematerialization_count: graph_projection_statuses
509            .iter()
510            .filter(|status| status.requires_rematerialization)
511            .count(),
512        executable_analytics_job_count: analytics_job_statuses
513            .iter()
514            .filter(|status| status.executable)
515            .count(),
516        analytics_jobs_requiring_rerun_count: analytics_job_statuses
517            .iter()
518            .filter(|status| status.requires_rerun)
519            .count(),
520        index_statuses,
521        graph_projection_statuses,
522        analytics_job_statuses,
523    }
524}
525
/// Hot-fields snapshot of per-queue policy, materialised at catalog
/// snapshot time so `QueueLifecycle` (and the descriptor) can read
/// values without re-scanning `red_queue_meta`.
///
/// Each value is read from a `queue_config` row of `red_queue_meta`.
#[derive(Debug, Clone)]
pub struct QueuePolicySnapshot {
    /// Parsed from the row's `mode` text; the default mode when the field is
    /// missing or does not parse.
    pub mode: QueueMode,
    /// From the row's `max_attempts`; the crate default when missing.
    pub max_attempts: u32,
    /// From the row's `lock_deadline_ms`; the crate default when missing.
    pub lock_deadline_ms: u64,
    /// From the row's `in_flight_cap_per_group`; the crate default when missing.
    pub in_flight_cap_per_group: u32,
    /// From the row's `dlq` text field; `None` when absent.
    pub dlq_target: Option<String>,
}
537
538fn queue_policies_from_grouped(
539    grouped: &HashMap<String, Vec<UnifiedEntity>>,
540) -> HashMap<String, QueuePolicySnapshot> {
541    let Some(meta) = grouped.get("red_queue_meta") else {
542        return HashMap::new();
543    };
544
545    let mut policies = HashMap::new();
546    for entity in meta {
547        let Some(row) = entity.data.as_row() else {
548            continue;
549        };
550        if row_text(row, "kind").as_deref() != Some("queue_config") {
551            continue;
552        }
553        let Some(queue) = row_text(row, "queue") else {
554            continue;
555        };
556        let mode = row_text(row, "mode")
557            .as_deref()
558            .and_then(QueueMode::parse)
559            .unwrap_or_default();
560        let max_attempts = row_u64(row, "max_attempts")
561            .map(|v| v as u32)
562            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
563        let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
564            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
565        let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
566            .map(|v| v as u32)
567            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
568        let dlq_target = row_text(row, "dlq");
569        policies.insert(
570            queue,
571            QueuePolicySnapshot {
572                mode,
573                max_attempts,
574                lock_deadline_ms,
575                in_flight_cap_per_group,
576                dlq_target,
577            },
578        );
579    }
580    policies
581}
582
583fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
584    match row.get_field(key) {
585        Some(Value::UnsignedInteger(v)) => Some(*v),
586        Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
587        _ => None,
588    }
589}
590
591fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
592    match row.get_field(key) {
593        Some(Value::Text(value)) => Some(value.to_string()),
594        _ => None,
595    }
596}
597
598fn collection_contract<'a>(
599    collection_name: &str,
600    contracts: Option<&'a [CollectionContract]>,
601) -> Option<&'a CollectionContract> {
602    contracts.and_then(|contracts| {
603        contracts
604            .iter()
605            .find(|contract| contract.name == collection_name)
606    })
607}
608
609fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
610    let mut has_table = false;
611    let mut has_graph = false;
612    let mut has_vector = false;
613
614    for entity in entities {
615        match &entity.kind {
616            EntityKind::TableRow { .. } => has_table = true,
617            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
618            EntityKind::Vector { .. } => has_vector = true,
619            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
620        }
621    }
622
623    match (has_table, has_graph, has_vector) {
624        (true, false, false) => CollectionModel::Table,
625        (false, true, false) => CollectionModel::Graph,
626        (false, false, true) => CollectionModel::Vector,
627        _ => CollectionModel::Mixed,
628    }
629}
630
631fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
632    match model {
633        CollectionModel::Table => SchemaMode::Strict,
634        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
635        CollectionModel::Document
636        | CollectionModel::Hll
637        | CollectionModel::Sketch
638        | CollectionModel::Filter
639        | CollectionModel::Kv
640        | CollectionModel::Config
641        | CollectionModel::Vault
642        | CollectionModel::Mixed => SchemaMode::Dynamic,
643        CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
644        CollectionModel::Queue => SchemaMode::Dynamic,
645    }
646}
647
648fn infer_indices(
649    collection_name: &str,
650    model: CollectionModel,
651    index_catalog: Option<&IndexCatalog>,
652    declarations: Option<&CatalogDeclarations>,
653) -> Vec<String> {
654    let declared = declared_indices(collection_name, declarations);
655    if !declared.is_empty() {
656        return declared;
657    }
658
659    let available = index_catalog
660        .map(IndexCatalog::snapshot)
661        .unwrap_or_default();
662    let mut selected = Vec::new();
663
664    for metric in available {
665        let relevant = matches!(
666            (model, metric.kind),
667            (_, IndexKind::BTree)
668                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
669                | (CollectionModel::Vector, IndexKind::VectorHnsw)
670                | (CollectionModel::Vector, IndexKind::VectorInverted)
671                | (CollectionModel::Vector, IndexKind::VectorTurbo)
672                | (CollectionModel::Document, IndexKind::FullText)
673                | (CollectionModel::Document, IndexKind::DocumentPathValue)
674                | (CollectionModel::Kv, IndexKind::Hash)
675                | (CollectionModel::Config, IndexKind::Hash)
676                | (CollectionModel::Vault, IndexKind::Hash)
677                | (CollectionModel::Mixed, _)
678        );
679
680        if relevant && metric.enabled {
681            selected.push(metric.name);
682        }
683    }
684
685    selected
686}
687
688fn declared_indices(
689    collection_name: &str,
690    declarations: Option<&CatalogDeclarations>,
691) -> Vec<String> {
692    let mut selected = declarations
693        .map(|declarations| {
694            declarations
695                .declared_indexes
696                .iter()
697                .filter(|index| index.collection.as_deref() == Some(collection_name))
698                .map(|index| index.name.clone())
699                .collect::<Vec<_>>()
700        })
701        .unwrap_or_default();
702    selected.sort();
703    selected.dedup();
704    selected
705}
706
707fn operational_indices(
708    collection_name: &str,
709    index_catalog: Option<&IndexCatalog>,
710    declarations: Option<&CatalogDeclarations>,
711) -> Vec<String> {
712    if let Some(declarations) = declarations {
713        let mut selected = declarations
714            .operational_indexes
715            .iter()
716            .filter(|index| index.collection.as_deref() == Some(collection_name))
717            .map(|index| index.name.clone())
718            .collect::<Vec<_>>();
719        selected.sort();
720        selected.dedup();
721        return selected;
722    }
723
724    let mut selected = index_catalog
725        .map(IndexCatalog::snapshot)
726        .unwrap_or_default()
727        .into_iter()
728        .filter(|metric| metric.enabled)
729        .filter_map(|metric| {
730            if metric.name.starts_with(collection_name) {
731                Some(metric.name)
732            } else {
733                None
734            }
735        })
736        .collect::<Vec<_>>();
737    selected.sort();
738    selected.dedup();
739    selected
740}
741
/// Compares declared and operational index names for one collection.
///
/// Returns `(in_sync, missing_operational, undeclared_operational)` where
/// `missing_operational` holds names that are declared but not operational,
/// `undeclared_operational` holds the reverse, and `in_sync` is true when
/// both are empty. Both lists are sorted and free of duplicates, regardless
/// of the order or repetition in the inputs.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    // Borrow the names for the set comparison; only the names that actually
    // differ are cloned into the result.
    let declared: BTreeSet<&str> = declared_indices.iter().map(String::as_str).collect();
    let operational: BTreeSet<&str> = operational_indices.iter().map(String::as_str).collect();

    let missing_operational_indices: Vec<String> = declared
        .difference(&operational)
        .map(|name| (*name).to_owned())
        .collect();
    let undeclared_operational_indices: Vec<String> = operational
        .difference(&declared)
        .map(|name| (*name).to_owned())
        .collect();

    let in_sync =
        missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty();
    (
        in_sync,
        missing_operational_indices,
        undeclared_operational_indices,
    )
}
762
763fn collection_index_readiness(
764    collection_name: &str,
765    statuses: &[CatalogIndexStatus],
766) -> (usize, usize, bool) {
767    let local = statuses
768        .iter()
769        .filter(|status| status.collection.as_deref() == Some(collection_name))
770        .collect::<Vec<_>>();
771    let queryable_count = local.iter().filter(|status| status.queryable).count();
772    let requires_rebuild_count = local
773        .iter()
774        .filter(|status| status.requires_rebuild)
775        .count();
776    let locally_in_sync = local.iter().all(|status| status.in_sync);
777    (queryable_count, requires_rebuild_count, locally_in_sync)
778}
779
780fn collection_graph_projection_readiness(
781    collection_name: &str,
782    statuses: &[CatalogGraphProjectionStatus],
783) -> (usize, usize, bool) {
784    let local = statuses
785        .iter()
786        .filter(|status| status.source.as_deref() == Some(collection_name))
787        .collect::<Vec<_>>();
788    let queryable_count = local.iter().filter(|status| status.queryable).count();
789    let requires_rematerialization_count = local
790        .iter()
791        .filter(|status| status.requires_rematerialization)
792        .count();
793    let locally_in_sync = local
794        .iter()
795        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
796    (
797        queryable_count,
798        requires_rematerialization_count,
799        locally_in_sync,
800    )
801}
802
803fn collection_analytics_job_readiness(
804    collection_name: &str,
805    projection_statuses: &[CatalogGraphProjectionStatus],
806    job_statuses: &[CatalogAnalyticsJobStatus],
807) -> (usize, usize, bool) {
808    let local = job_statuses
809        .iter()
810        .filter(|status| {
811            let Some(projection_name) = status.projection.as_deref() else {
812                return false;
813            };
814            projection_statuses
815                .iter()
816                .find(|projection| projection.name == projection_name)
817                .and_then(|projection| projection.source.as_deref())
818                == Some(collection_name)
819        })
820        .collect::<Vec<_>>();
821    let executable_count = local.iter().filter(|status| status.executable).count();
822    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
823    let locally_in_sync = local
824        .iter()
825        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
826    (executable_count, requires_rerun_count, locally_in_sync)
827}
828
829fn graph_projection_statuses(
830    declarations: Option<&CatalogDeclarations>,
831) -> Vec<CatalogGraphProjectionStatus> {
832    let declared = declarations
833        .map(|declarations| declarations.declared_graph_projections.as_slice())
834        .unwrap_or(&[]);
835    let operational = declarations
836        .map(|declarations| declarations.operational_graph_projections.as_slice())
837        .unwrap_or(&[]);
838    let declared_jobs = declarations
839        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
840        .unwrap_or(&[]);
841    let operational_jobs = declarations
842        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
843        .unwrap_or(&[]);
844
845    let mut names = declared
846        .iter()
847        .map(|projection| projection.name.clone())
848        .chain(operational.iter().map(|projection| projection.name.clone()))
849        .collect::<Vec<_>>();
850    names.sort();
851    names.dedup();
852
853    names
854        .into_iter()
855        .map(|name| {
856            let declared_projection = declared.iter().find(|projection| projection.name == name);
857            let operational_projection = operational
858                .iter()
859                .find(|projection| projection.name == name);
860            let declared_present = declared_projection.is_some();
861            let operational_present = operational_projection.is_some();
862            let mut dependent_job_ids = BTreeSet::new();
863            let mut dependent_job_count = 0usize;
864            let mut active_dependent_job_count = 0usize;
865            let mut stale_dependent_job_count = 0usize;
866            for job in declared_jobs
867                .iter()
868                .chain(operational_jobs.iter())
869                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
870            {
871                if !dependent_job_ids.insert(job.id.clone()) {
872                    continue;
873                }
874                dependent_job_count += 1;
875                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
876                    active_dependent_job_count += 1;
877                }
878                if job.state == "stale" {
879                    stale_dependent_job_count += 1;
880                }
881            }
882            let lifecycle_state = match (
883                declared_present,
884                operational_present,
885                declared_projection
886                    .map(|projection| projection.state.as_str())
887                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
888                    .unwrap_or_default(),
889            ) {
890                (true, _, "failed") => "failed",
891                (true, true, "stale") => "stale",
892                (true, _, "materializing") => "materializing",
893                (true, true, "materialized") => "materialized",
894                (true, true, _) => "materialized",
895                (false, true, _) => "orphaned-operational",
896                (true, false, _) => "declared",
897                (false, false, _) => "unknown",
898            }
899            .to_string();
900            let queryable = declared_present
901                && operational_present
902                && matches!(
903                    declared_projection
904                        .map(|projection| projection.state.as_str())
905                        .or_else(
906                            || operational_projection.map(|projection| projection.state.as_str())
907                        )
908                        .unwrap_or_default(),
909                    "materialized"
910                );
911            let requires_rematerialization = matches!(
912                declared_projection
913                    .map(|projection| projection.state.as_str())
914                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
915                    .unwrap_or_default(),
916                "declared" | "materializing" | "failed" | "stale"
917            );
918            let mut attention_reasons = Vec::new();
919            if !declared_present || !operational_present {
920                attention_reasons.push("declaration_drift".to_string());
921            }
922            if requires_rematerialization {
923                attention_reasons.push("requires_rematerialization".to_string());
924            }
925            if stale_dependent_job_count > 0 {
926                attention_reasons.push("dependent_jobs_stale".to_string());
927            }
928            let attention_score = stale_dependent_job_count.saturating_mul(2)
929                + usize::from(requires_rematerialization).saturating_mul(4)
930                + usize::from(!declared_present || !operational_present)
931                + usize::from(!queryable);
932            CatalogGraphProjectionStatus {
933                name,
934                source: declared_projection
935                    .map(|projection| projection.source.clone())
936                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
937                lifecycle_state,
938                declared: declared_present,
939                operational: operational_present,
940                in_sync: declared_present == operational_present,
941                last_materialized_sequence: declared_projection
942                    .and_then(|projection| projection.last_materialized_sequence)
943                    .or_else(|| {
944                        operational_projection
945                            .and_then(|projection| projection.last_materialized_sequence)
946                    }),
947                queryable,
948                requires_rematerialization,
949                dependent_job_count,
950                active_dependent_job_count,
951                stale_dependent_job_count,
952                dependent_jobs_in_sync: stale_dependent_job_count == 0,
953                rerun_required: stale_dependent_job_count > 0,
954                attention_score,
955                attention_reasons,
956            }
957        })
958        .collect()
959}
960
961fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
962    let declared = declarations
963        .map(|declarations| declarations.declared_indexes.as_slice())
964        .unwrap_or(&[]);
965    let operational = declarations
966        .map(|declarations| declarations.operational_indexes.as_slice())
967        .unwrap_or(&[]);
968
969    let mut names = declared
970        .iter()
971        .map(|index| index.name.clone())
972        .chain(operational.iter().map(|index| index.name.clone()))
973        .collect::<Vec<_>>();
974    names.sort();
975    names.dedup();
976
977    names
978        .into_iter()
979        .map(|name| {
980            let declared_index = declared.iter().find(|index| index.name == name);
981            let operational_index = operational.iter().find(|index| index.name == name);
982            let collection = declared_index
983                .and_then(|index| index.collection.clone())
984                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
985            let kind = declared_index
986                .map(|index| index_kind_string(index.kind))
987                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
988                .unwrap_or_default();
989            let enabled = declared_index
990                .map(|index| index.enabled)
991                .or_else(|| operational_index.map(|index| index.enabled))
992                .unwrap_or(false);
993            let build_state = operational_index
994                .map(|index| index.build_state.clone())
995                .or_else(|| declared_index.map(|index| index.build_state.clone()));
996            let declared_present = declared_index.is_some();
997            let operational_present = operational_index.is_some();
998            let lifecycle_state = index_lifecycle_state(
999                declared_present,
1000                operational_present,
1001                enabled,
1002                build_state.as_deref(),
1003            );
1004
1005            let mut attention_reasons = Vec::new();
1006            if declared_present != operational_present {
1007                attention_reasons.push("declaration_drift".to_string());
1008            }
1009            if !enabled && declared_present {
1010                attention_reasons.push("disabled".to_string());
1011            }
1012            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1013                attention_reasons.push("failed".to_string());
1014            }
1015            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1016                attention_reasons.push("stale".to_string());
1017            }
1018            if matches!(
1019                build_state.as_deref().unwrap_or_default(),
1020                "building" | "declared-unbuilt"
1021            ) {
1022                attention_reasons.push("requires_rebuild".to_string());
1023            }
1024            let queryable = declared_present
1025                && operational_present
1026                && enabled
1027                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1028            let requires_rebuild = matches!(
1029                build_state.as_deref().unwrap_or_default(),
1030                "declared-unbuilt" | "building" | "stale" | "failed"
1031            );
1032            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1033                + usize::from(!enabled && declared_present)
1034                + usize::from(declared_present != operational_present)
1035                + usize::from(!queryable);
1036
1037            let artifact_state = crate::physical::ArtifactState::from_build_state(
1038                build_state.as_deref().unwrap_or("declared-unbuilt"),
1039                enabled,
1040            );
1041            CatalogIndexStatus {
1042                name,
1043                collection,
1044                kind,
1045                declared: declared_present,
1046                operational: operational_present,
1047                enabled,
1048                build_state,
1049                artifact_state,
1050                lifecycle_state,
1051                in_sync: declared_present == operational_present,
1052                queryable,
1053                requires_rebuild,
1054                attention_score,
1055                attention_reasons,
1056            }
1057        })
1058        .collect()
1059}
1060
/// Maps an index's presence flags and build state to a lifecycle label.
///
/// Precedence, highest first:
/// 1. operational but not declared -> `"orphaned-operational"`
/// 2. declared but disabled -> `"disabled"`
/// 3. neither declared nor operational -> `"unknown"`
/// 4. declared but not operational -> `"declared"`
/// 5. otherwise the label follows `build_state`; a missing or unrecognized
///    build state is reported as `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (false, false) => "unknown",
        (true, _) if !enabled => "disabled",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Known intermediate build states, listed explicitly for readers.
            "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
                "building"
            }
            _ => "building",
        },
    };
    label.to_string()
}
1091
1092fn index_kind_string(kind: IndexKind) -> String {
1093    match kind {
1094        IndexKind::BTree => "btree",
1095        IndexKind::Hash => "hash",
1096        IndexKind::Bitmap => "bitmap",
1097        IndexKind::Spatial => "spatial.rtree",
1098        IndexKind::VectorHnsw => "vector.hnsw",
1099        IndexKind::VectorInverted => "vector.inverted",
1100        IndexKind::VectorTurbo => "vector.turbo",
1101        IndexKind::GraphAdjacency => "graph.adjacency",
1102        IndexKind::FullText => "text.fulltext",
1103        IndexKind::DocumentPathValue => "document.pathvalue",
1104        IndexKind::HybridSearch => "search.hybrid",
1105    }
1106    .to_string()
1107}
1108
1109fn analytics_job_statuses(
1110    declarations: Option<&CatalogDeclarations>,
1111) -> Vec<CatalogAnalyticsJobStatus> {
1112    let declared = declarations
1113        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1114        .unwrap_or(&[]);
1115    let operational = declarations
1116        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1117        .unwrap_or(&[]);
1118    let declared_projections = declarations
1119        .map(|declarations| declarations.declared_graph_projections.as_slice())
1120        .unwrap_or(&[]);
1121    let operational_projections = declarations
1122        .map(|declarations| declarations.operational_graph_projections.as_slice())
1123        .unwrap_or(&[]);
1124
1125    let mut ids = declared
1126        .iter()
1127        .map(|job| job.id.clone())
1128        .chain(operational.iter().map(|job| job.id.clone()))
1129        .collect::<Vec<_>>();
1130    ids.sort();
1131    ids.dedup();
1132
1133    ids.into_iter()
1134        .map(|id| {
1135            let declared_job = declared.iter().find(|job| job.id == id);
1136            let operational_job = operational.iter().find(|job| job.id == id);
1137            let kind = declared_job
1138                .map(|job| job.kind.clone())
1139                .or_else(|| operational_job.map(|job| job.kind.clone()))
1140                .unwrap_or_default();
1141            let projection = declared_job
1142                .and_then(|job| job.projection.clone())
1143                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1144            let state = declared_job
1145                .map(|job| job.state.clone())
1146                .or_else(|| operational_job.map(|job| job.state.clone()))
1147                .unwrap_or_default();
1148            let declared_present = declared_job.is_some();
1149            let operational_present = operational_job.is_some();
1150            let last_run_sequence = declared_job
1151                .and_then(|job| job.last_run_sequence)
1152                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1153            let projection_declared = projection.as_ref().map(|projection_name| {
1154                declared_projections
1155                    .iter()
1156                    .any(|projection| projection.name == *projection_name)
1157            });
1158            let projection_operational = projection.as_ref().map(|projection_name| {
1159                operational_projections
1160                    .iter()
1161                    .any(|projection| projection.name == *projection_name)
1162            });
1163            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1164                declared_projections
1165                    .iter()
1166                    .find(|projection| projection.name == *projection_name)
1167                    .map(|projection| projection.state.as_str())
1168                    .or_else(|| {
1169                        operational_projections
1170                            .iter()
1171                            .find(|projection| projection.name == *projection_name)
1172                            .map(|projection| projection.state.as_str())
1173                    })
1174            });
1175            let dependency_in_sync = projection.as_ref().map(|_| {
1176                matches!(projection_lifecycle, Some("materialized"))
1177                    && projection_operational == Some(true)
1178            });
1179            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1180                (true, false, _) => "declared",
1181                (true, true, _)
1182                    if matches!(
1183                        projection_lifecycle,
1184                        Some("stale" | "failed" | "materializing" | "declared")
1185                    ) =>
1186                {
1187                    "stale"
1188                }
1189                (true, true, "completed") => "completed",
1190                (true, true, "running") => "running",
1191                (true, true, "failed") => "failed",
1192                (true, true, "queued") => "queued",
1193                (true, true, "stale") => "stale",
1194                (true, true, _) => "materialized",
1195                (false, true, _) => "orphaned-operational",
1196                (false, false, _) => "unknown",
1197            }
1198            .to_string();
1199            let executable = declared_present
1200                && operational_present
1201                && !matches!(state.as_str(), "failed" | "stale")
1202                && dependency_in_sync.unwrap_or(true);
1203            let requires_rerun =
1204                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1205            let mut attention_reasons = Vec::new();
1206            if declared_present != operational_present {
1207                attention_reasons.push("declaration_drift".to_string());
1208            }
1209            if matches!(state.as_str(), "failed") {
1210                attention_reasons.push("failed".to_string());
1211            }
1212            if matches!(state.as_str(), "stale") {
1213                attention_reasons.push("stale".to_string());
1214            }
1215            if dependency_in_sync == Some(false) {
1216                attention_reasons.push("dependency_drift".to_string());
1217            }
1218            if requires_rerun {
1219                attention_reasons.push("requires_rerun".to_string());
1220            }
1221            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1222                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1223                + usize::from(declared_present != operational_present)
1224                + usize::from(!executable);
1225            CatalogAnalyticsJobStatus {
1226                id,
1227                kind,
1228                projection,
1229                state: state.clone(),
1230                lifecycle_state,
1231                declared: declared_present,
1232                operational: operational_present,
1233                in_sync: declared_present == operational_present,
1234                last_run_sequence,
1235                projection_declared,
1236                projection_operational,
1237                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1238                dependency_in_sync,
1239                executable,
1240                requires_rerun,
1241                attention_score,
1242                attention_reasons,
1243            }
1244        })
1245        .collect()
1246}
1247
1248pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1249    let declared_indexes = snapshot
1250        .declared_indexes
1251        .iter()
1252        .map(|index| index.name.clone())
1253        .collect::<BTreeSet<_>>();
1254    let operational_indexes = snapshot
1255        .operational_indexes
1256        .iter()
1257        .map(|index| index.name.clone())
1258        .collect::<BTreeSet<_>>();
1259    let declared_graph_projections = snapshot
1260        .declared_graph_projections
1261        .iter()
1262        .map(|projection| projection.name.clone())
1263        .collect::<BTreeSet<_>>();
1264    let operational_graph_projections = snapshot
1265        .operational_graph_projections
1266        .iter()
1267        .map(|projection| projection.name.clone())
1268        .collect::<BTreeSet<_>>();
1269    let declared_analytics_jobs = snapshot
1270        .declared_analytics_jobs
1271        .iter()
1272        .map(|job| job.id.clone())
1273        .collect::<BTreeSet<_>>();
1274    let operational_analytics_jobs = snapshot
1275        .operational_analytics_jobs
1276        .iter()
1277        .map(|job| job.id.clone())
1278        .collect::<BTreeSet<_>>();
1279
1280    CatalogConsistencyReport {
1281        declared_index_count: declared_indexes.len(),
1282        operational_index_count: operational_indexes.len(),
1283        declared_graph_projection_count: declared_graph_projections.len(),
1284        operational_graph_projection_count: operational_graph_projections.len(),
1285        declared_analytics_job_count: declared_analytics_jobs.len(),
1286        operational_analytics_job_count: operational_analytics_jobs.len(),
1287        missing_operational_indexes: declared_indexes
1288            .difference(&operational_indexes)
1289            .cloned()
1290            .collect(),
1291        undeclared_operational_indexes: operational_indexes
1292            .difference(&declared_indexes)
1293            .cloned()
1294            .collect(),
1295        missing_operational_graph_projections: declared_graph_projections
1296            .difference(&operational_graph_projections)
1297            .cloned()
1298            .collect(),
1299        undeclared_operational_graph_projections: operational_graph_projections
1300            .difference(&declared_graph_projections)
1301            .cloned()
1302            .collect(),
1303        missing_operational_analytics_jobs: declared_analytics_jobs
1304            .difference(&operational_analytics_jobs)
1305            .cloned()
1306            .collect(),
1307        undeclared_operational_analytics_jobs: operational_analytics_jobs
1308            .difference(&declared_analytics_jobs)
1309            .cloned()
1310            .collect(),
1311    }
1312}
1313
1314pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1315    CatalogAttentionSummary {
1316        collections_requiring_attention: snapshot
1317            .collections
1318            .iter()
1319            .filter(|collection| collection.attention_required)
1320            .count(),
1321        indexes_requiring_attention: snapshot
1322            .index_statuses
1323            .iter()
1324            .filter(|status| status.attention_score > 0)
1325            .count(),
1326        graph_projections_requiring_attention: snapshot
1327            .graph_projection_statuses
1328            .iter()
1329            .filter(|status| status.attention_score > 0)
1330            .count(),
1331        analytics_jobs_requiring_attention: snapshot
1332            .analytics_job_statuses
1333            .iter()
1334            .filter(|status| status.attention_score > 0)
1335            .count(),
1336        top_collection: snapshot
1337            .collections
1338            .iter()
1339            .filter(|collection| collection.attention_score > 0)
1340            .max_by_key(|collection| collection.attention_score)
1341            .cloned(),
1342        top_index: snapshot
1343            .index_statuses
1344            .iter()
1345            .filter(|status| status.attention_score > 0)
1346            .max_by_key(|status| status.attention_score)
1347            .cloned(),
1348        top_graph_projection: snapshot
1349            .graph_projection_statuses
1350            .iter()
1351            .filter(|status| status.attention_score > 0)
1352            .max_by_key(|status| status.attention_score)
1353            .cloned(),
1354        top_analytics_job: snapshot
1355            .analytics_job_statuses
1356            .iter()
1357            .filter(|status| status.attention_score > 0)
1358            .max_by_key(|status| status.attention_score)
1359            .cloned(),
1360    }
1361}
1362
1363pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1364    let mut collections = snapshot
1365        .collections
1366        .iter()
1367        .filter(|collection| collection.attention_required)
1368        .cloned()
1369        .collect::<Vec<_>>();
1370    collections.sort_by(|left, right| {
1371        right
1372            .attention_score
1373            .cmp(&left.attention_score)
1374            .then_with(|| left.name.cmp(&right.name))
1375    });
1376    collections
1377}
1378
1379pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1380    let mut statuses = snapshot
1381        .index_statuses
1382        .iter()
1383        .filter(|status| status.attention_score > 0)
1384        .cloned()
1385        .collect::<Vec<_>>();
1386    statuses.sort_by(|left, right| {
1387        right
1388            .attention_score
1389            .cmp(&left.attention_score)
1390            .then_with(|| left.name.cmp(&right.name))
1391    });
1392    statuses
1393}
1394
1395pub fn graph_projection_attention(
1396    snapshot: &CatalogModelSnapshot,
1397) -> Vec<CatalogGraphProjectionStatus> {
1398    let mut statuses = snapshot
1399        .graph_projection_statuses
1400        .iter()
1401        .filter(|status| status.attention_score > 0)
1402        .cloned()
1403        .collect::<Vec<_>>();
1404    statuses.sort_by(|left, right| {
1405        right
1406            .attention_score
1407            .cmp(&left.attention_score)
1408            .then_with(|| left.name.cmp(&right.name))
1409    });
1410    statuses
1411}
1412
1413pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1414    let mut statuses = snapshot
1415        .analytics_job_statuses
1416        .iter()
1417        .filter(|status| status.attention_score > 0)
1418        .cloned()
1419        .collect::<Vec<_>>();
1420    statuses.sort_by(|left, right| {
1421        right
1422            .attention_score
1423            .cmp(&left.attention_score)
1424            .then_with(|| left.id.cmp(&right.id))
1425    });
1426    statuses
1427}