Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum CollectionModel {
18    Table,
19    Document,
20    Graph,
21    Vector,
22    Hll,
23    Sketch,
24    Filter,
25    Kv,
26    Config,
27    Vault,
28    Mixed,
29    TimeSeries,
30    Queue,
31    Metrics,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum SchemaMode {
36    Strict,
37    SemiStructured,
38    Dynamic,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum SubscriptionOperation {
43    Insert,
44    Update,
45    Delete,
46}
47
48impl SubscriptionOperation {
49    pub fn as_str(self) -> &'static str {
50        match self {
51            Self::Insert => "INSERT",
52            Self::Update => "UPDATE",
53            Self::Delete => "DELETE",
54        }
55    }
56
57    pub fn from_str(value: &str) -> Option<Self> {
58        match value.to_ascii_uppercase().as_str() {
59            "INSERT" => Some(Self::Insert),
60            "UPDATE" => Some(Self::Update),
61            "DELETE" => Some(Self::Delete),
62            _ => None,
63        }
64    }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SubscriptionDescriptor {
69    /// Logical name for this subscription. Empty string for legacy unnamed subscriptions.
70    pub name: String,
71    pub source: String,
72    pub target_queue: String,
73    pub ops_filter: Vec<SubscriptionOperation>,
74    pub where_filter: Option<String>,
75    pub redact_fields: Vec<String>,
76    pub enabled: bool,
77    /// When true, events are routed to the bare `target_queue` regardless of
78    /// the current tenant — a cluster-wide subscription. When false (default),
79    /// events are namespaced as `{tenant}/{target_queue}` whenever a tenant
80    /// context is active, enforcing per-tenant isolation.
81    pub all_tenants: bool,
82}
83
84#[derive(Debug, Clone)]
85pub struct CollectionDescriptor {
86    pub name: String,
87    pub model: CollectionModel,
88    pub schema_mode: SchemaMode,
89    pub contract_present: bool,
90    pub contract_origin: Option<crate::physical::ContractOrigin>,
91    pub declared_model: Option<CollectionModel>,
92    pub observed_model: CollectionModel,
93    pub queue_mode: Option<QueueMode>,
94    /// MAX_ATTEMPTS per-queue policy. Hot-fields tier — populated for
95    /// `model = queue` from the catalog snapshot for sub-ms reads by
96    /// `QueueLifecycle`. `None` for non-queue collections.
97    pub queue_max_attempts: Option<u32>,
98    /// LOCK_DEADLINE_MS per-queue policy. Hot-fields tier.
99    pub queue_lock_deadline_ms: Option<u64>,
100    /// IN_FLIGHT_CAP_PER_GROUP per-queue policy. Hot-fields tier.
101    pub queue_in_flight_cap_per_group: Option<u32>,
102    /// DLQ target collection name. `None` means on-max drop.
103    pub queue_dlq_target: Option<String>,
104    pub vector_dimension: Option<usize>,
105    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
106    /// `WITH SESSION_KEY <col>` declared on the timeseries collection.
107    /// `None` for non-timeseries and for timeseries without the clause.
108    /// Issue #576 slice 1.
109    pub session_key: Option<String>,
110    /// `SESSION_GAP <duration>` in milliseconds declared on the
111    /// timeseries collection. Issue #576 slice 1.
112    pub session_gap_ms: Option<u64>,
113    /// Declarative retention duration in milliseconds. `None` means no
114    /// retention policy is enforced. Set via `ALTER COLLECTION ... SET
115    /// RETENTION <duration>`. Issue #580 slice 1.
116    pub retention_duration_ms: Option<u64>,
117    pub declared_schema_mode: Option<SchemaMode>,
118    pub observed_schema_mode: SchemaMode,
119    pub entities: usize,
120    pub cross_refs: usize,
121    pub segments: usize,
122    pub indices: Vec<String>,
123    pub declared_indices: Vec<String>,
124    pub operational_indices: Vec<String>,
125    pub indexes_in_sync: bool,
126    pub missing_operational_indices: Vec<String>,
127    pub undeclared_operational_indices: Vec<String>,
128    pub queryable_index_count: usize,
129    pub indexes_requiring_rebuild_count: usize,
130    pub queryable_graph_projection_count: usize,
131    pub graph_projections_requiring_rematerialization_count: usize,
132    pub executable_analytics_job_count: usize,
133    pub analytics_jobs_requiring_rerun_count: usize,
134    pub subscriptions: Vec<SubscriptionDescriptor>,
135    pub resources_in_sync: bool,
136    pub attention_required: bool,
137    pub attention_score: usize,
138    pub attention_reasons: Vec<String>,
139}
140
141#[derive(Debug, Clone)]
142pub struct CatalogModelSnapshot {
143    pub summary: CatalogSnapshot,
144    pub collections: Vec<CollectionDescriptor>,
145    pub indices: IndexCatalogSnapshot,
146    pub declared_indexes: Vec<PhysicalIndexState>,
147    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
148    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
149    pub operational_indexes: Vec<PhysicalIndexState>,
150    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
151    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
152    pub index_statuses: Vec<CatalogIndexStatus>,
153    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
154    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
155    pub queryable_index_count: usize,
156    pub indexes_requiring_rebuild_count: usize,
157    pub queryable_graph_projection_count: usize,
158    pub graph_projections_requiring_rematerialization_count: usize,
159    pub executable_analytics_job_count: usize,
160    pub analytics_jobs_requiring_rerun_count: usize,
161}
162
163#[derive(Debug, Clone)]
164pub struct CatalogAttentionSummary {
165    pub collections_requiring_attention: usize,
166    pub indexes_requiring_attention: usize,
167    pub graph_projections_requiring_attention: usize,
168    pub analytics_jobs_requiring_attention: usize,
169    pub top_collection: Option<CollectionDescriptor>,
170    pub top_index: Option<CatalogIndexStatus>,
171    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
172    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
173}
174
175#[derive(Debug, Clone, Default)]
176pub struct CatalogDeclarations {
177    pub declared_indexes: Vec<PhysicalIndexState>,
178    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
179    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
180    pub operational_indexes: Vec<PhysicalIndexState>,
181    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
182    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
183}
184
185#[derive(Debug, Clone)]
186pub struct CatalogGraphProjectionStatus {
187    pub name: String,
188    pub source: Option<String>,
189    pub lifecycle_state: String,
190    pub declared: bool,
191    pub operational: bool,
192    pub in_sync: bool,
193    pub last_materialized_sequence: Option<u64>,
194    pub queryable: bool,
195    pub requires_rematerialization: bool,
196    pub dependent_job_count: usize,
197    pub active_dependent_job_count: usize,
198    pub stale_dependent_job_count: usize,
199    pub dependent_jobs_in_sync: bool,
200    pub rerun_required: bool,
201    pub attention_score: usize,
202    pub attention_reasons: Vec<String>,
203}
204
205#[derive(Debug, Clone)]
206pub struct CatalogIndexStatus {
207    pub name: String,
208    pub collection: Option<String>,
209    pub kind: String,
210    pub declared: bool,
211    pub operational: bool,
212    pub enabled: bool,
213    pub build_state: Option<String>,
214    pub artifact_state: crate::physical::ArtifactState,
215    pub lifecycle_state: String,
216    pub in_sync: bool,
217    pub queryable: bool,
218    pub requires_rebuild: bool,
219    pub attention_score: usize,
220    pub attention_reasons: Vec<String>,
221}
222
223#[derive(Debug, Clone)]
224pub struct CatalogAnalyticsJobStatus {
225    pub id: String,
226    pub kind: String,
227    pub projection: Option<String>,
228    pub state: String,
229    pub lifecycle_state: String,
230    pub declared: bool,
231    pub operational: bool,
232    pub in_sync: bool,
233    pub last_run_sequence: Option<u64>,
234    pub projection_declared: Option<bool>,
235    pub projection_operational: Option<bool>,
236    pub projection_lifecycle_state: Option<String>,
237    pub dependency_in_sync: Option<bool>,
238    pub executable: bool,
239    pub requires_rerun: bool,
240    pub attention_score: usize,
241    pub attention_reasons: Vec<String>,
242}
243
244#[derive(Debug, Clone, Default)]
245pub struct CatalogConsistencyReport {
246    pub declared_index_count: usize,
247    pub operational_index_count: usize,
248    pub declared_graph_projection_count: usize,
249    pub operational_graph_projection_count: usize,
250    pub declared_analytics_job_count: usize,
251    pub operational_analytics_job_count: usize,
252    pub missing_operational_indexes: Vec<String>,
253    pub undeclared_operational_indexes: Vec<String>,
254    pub missing_operational_graph_projections: Vec<String>,
255    pub undeclared_operational_graph_projections: Vec<String>,
256    pub missing_operational_analytics_jobs: Vec<String>,
257    pub undeclared_operational_analytics_jobs: Vec<String>,
258}
259
260pub fn snapshot_store(
261    name: &str,
262    store: &UnifiedStore,
263    index_catalog: Option<&IndexCatalog>,
264) -> CatalogModelSnapshot {
265    snapshot_store_with_declarations(name, store, index_catalog, None, None)
266}
267
268pub fn snapshot_store_with_declarations(
269    name: &str,
270    store: &UnifiedStore,
271    index_catalog: Option<&IndexCatalog>,
272    declarations: Option<&CatalogDeclarations>,
273    contracts: Option<&[CollectionContract]>,
274) -> CatalogModelSnapshot {
275    let index_statuses = index_statuses(declarations);
276    let graph_projection_statuses = graph_projection_statuses(declarations);
277    let analytics_job_statuses = analytics_job_statuses(declarations);
278
279    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
280    for (collection, entity) in store.query_all(|_| true) {
281        grouped.entry(collection).or_default().push(entity);
282    }
283    let queue_policies = queue_policies_from_grouped(&grouped);
284
285    let mut stats_by_collection = BTreeMap::new();
286    let mut collections = Vec::new();
287
288    for collection_name in store.list_collections() {
289        let entities = grouped.remove(&collection_name).unwrap_or_default();
290        let inferred_model = infer_model(&entities);
291        let inferred_schema_mode = infer_schema_mode(inferred_model);
292        let contract = collection_contract(&collection_name, contracts);
293        let model = contract
294            .map(|contract| contract.declared_model)
295            .unwrap_or(inferred_model);
296        let cross_refs = entities
297            .iter()
298            .map(|entity| entity.cross_refs().len())
299            .sum();
300        let entity_count = entities.len();
301        let manager_stats = store
302            .get_collection(&collection_name)
303            .map(|manager| manager.stats());
304        let segments = manager_stats
305            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
306            .unwrap_or(0);
307
308        stats_by_collection.insert(
309            collection_name.clone(),
310            CollectionStats {
311                entities: entity_count,
312                cross_refs,
313                segments,
314            },
315        );
316
317        let declared_indices = declared_indices(&collection_name, declarations);
318        let operational_indices = operational_indices(&collection_name, index_catalog);
319        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
320            collection_index_consistency(&declared_indices, &operational_indices);
321        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
322            collection_index_readiness(&collection_name, &index_statuses);
323        let (
324            queryable_graph_projection_count,
325            graph_projections_requiring_rematerialization_count,
326            graph_projections_locally_in_sync,
327        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
328        let (
329            executable_analytics_job_count,
330            analytics_jobs_requiring_rerun_count,
331            analytics_jobs_locally_in_sync,
332        ) = collection_analytics_job_readiness(
333            &collection_name,
334            &graph_projection_statuses,
335            &analytics_job_statuses,
336        );
337        let resources_in_sync = indexes_in_sync
338            && indexes_locally_in_sync
339            && graph_projections_locally_in_sync
340            && analytics_jobs_locally_in_sync;
341        let attention_required = !resources_in_sync
342            || indexes_requiring_rebuild_count > 0
343            || graph_projections_requiring_rematerialization_count > 0
344            || analytics_jobs_requiring_rerun_count > 0;
345        let mut attention_reasons = Vec::new();
346        if !indexes_in_sync || !indexes_locally_in_sync {
347            attention_reasons.push("index_drift".to_string());
348        }
349        if indexes_requiring_rebuild_count > 0 {
350            attention_reasons.push("indexes_require_rebuild".to_string());
351        }
352        if !graph_projections_locally_in_sync {
353            attention_reasons.push("graph_projection_drift".to_string());
354        }
355        if graph_projections_requiring_rematerialization_count > 0 {
356            attention_reasons.push("graph_projections_require_rematerialization".to_string());
357        }
358        if !analytics_jobs_locally_in_sync {
359            attention_reasons.push("analytics_job_drift".to_string());
360        }
361        if analytics_jobs_requiring_rerun_count > 0 {
362            attention_reasons.push("analytics_jobs_require_rerun".to_string());
363        }
364        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
365            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
366            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
367            + usize::from(!resources_in_sync);
368
369        collections.push(CollectionDescriptor {
370            name: collection_name.clone(),
371            model,
372            schema_mode: contract
373                .map(|contract| contract.schema_mode)
374                .unwrap_or(inferred_schema_mode),
375            contract_present: contract.is_some(),
376            contract_origin: contract.map(|contract| contract.origin),
377            declared_model: contract.map(|contract| contract.declared_model),
378            observed_model: inferred_model,
379            queue_mode: if model == CollectionModel::Queue {
380                Some(
381                    queue_policies
382                        .get(&collection_name)
383                        .map(|p| p.mode)
384                        .unwrap_or_default(),
385                )
386            } else {
387                None
388            },
389            queue_max_attempts: if model == CollectionModel::Queue {
390                Some(
391                    queue_policies
392                        .get(&collection_name)
393                        .map(|p| p.max_attempts)
394                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
395                )
396            } else {
397                None
398            },
399            queue_lock_deadline_ms: if model == CollectionModel::Queue {
400                Some(
401                    queue_policies
402                        .get(&collection_name)
403                        .map(|p| p.lock_deadline_ms)
404                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
405                )
406            } else {
407                None
408            },
409            queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
410                Some(
411                    queue_policies
412                        .get(&collection_name)
413                        .map(|p| p.in_flight_cap_per_group)
414                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
415                )
416            } else {
417                None
418            },
419            queue_dlq_target: if model == CollectionModel::Queue {
420                queue_policies
421                    .get(&collection_name)
422                    .and_then(|p| p.dlq_target.clone())
423            } else {
424                None
425            },
426            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
427            vector_metric: contract.and_then(|contract| contract.vector_metric),
428            session_key: contract.and_then(|contract| contract.session_key.clone()),
429            session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
430            retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
431            declared_schema_mode: contract.map(|contract| contract.schema_mode),
432            observed_schema_mode: inferred_schema_mode,
433            entities: entity_count,
434            cross_refs,
435            segments,
436            indices: infer_indices(&collection_name, model, index_catalog, declarations),
437            declared_indices,
438            operational_indices,
439            indexes_in_sync,
440            missing_operational_indices,
441            undeclared_operational_indices,
442            queryable_index_count,
443            indexes_requiring_rebuild_count,
444            queryable_graph_projection_count,
445            graph_projections_requiring_rematerialization_count,
446            executable_analytics_job_count,
447            analytics_jobs_requiring_rerun_count,
448            subscriptions: contract
449                .map(|contract| contract.subscriptions.clone())
450                .unwrap_or_default(),
451            resources_in_sync,
452            attention_required,
453            attention_score,
454            attention_reasons,
455        });
456    }
457
458    collections.sort_by(|left, right| left.name.cmp(&right.name));
459
460    let summary = CatalogSnapshot {
461        name: name.to_string(),
462        total_entities: stats_by_collection
463            .values()
464            .map(|stats| stats.entities)
465            .sum(),
466        total_collections: stats_by_collection.len(),
467        stats_by_collection,
468        updated_at: SystemTime::now(),
469    };
470
471    CatalogModelSnapshot {
472        summary,
473        collections,
474        indices: index_catalog
475            .map(IndexCatalog::snapshot)
476            .unwrap_or_default(),
477        declared_indexes: declarations
478            .map(|declarations| declarations.declared_indexes.clone())
479            .unwrap_or_default(),
480        declared_graph_projections: declarations
481            .map(|declarations| declarations.declared_graph_projections.clone())
482            .unwrap_or_default(),
483        declared_analytics_jobs: declarations
484            .map(|declarations| declarations.declared_analytics_jobs.clone())
485            .unwrap_or_default(),
486        operational_indexes: declarations
487            .map(|declarations| declarations.operational_indexes.clone())
488            .unwrap_or_default(),
489        operational_graph_projections: declarations
490            .map(|declarations| declarations.operational_graph_projections.clone())
491            .unwrap_or_default(),
492        operational_analytics_jobs: declarations
493            .map(|declarations| declarations.operational_analytics_jobs.clone())
494            .unwrap_or_default(),
495        queryable_index_count: index_statuses
496            .iter()
497            .filter(|status| status.queryable)
498            .count(),
499        indexes_requiring_rebuild_count: index_statuses
500            .iter()
501            .filter(|status| status.requires_rebuild)
502            .count(),
503        queryable_graph_projection_count: graph_projection_statuses
504            .iter()
505            .filter(|status| status.queryable)
506            .count(),
507        graph_projections_requiring_rematerialization_count: graph_projection_statuses
508            .iter()
509            .filter(|status| status.requires_rematerialization)
510            .count(),
511        executable_analytics_job_count: analytics_job_statuses
512            .iter()
513            .filter(|status| status.executable)
514            .count(),
515        analytics_jobs_requiring_rerun_count: analytics_job_statuses
516            .iter()
517            .filter(|status| status.requires_rerun)
518            .count(),
519        index_statuses,
520        graph_projection_statuses,
521        analytics_job_statuses,
522    }
523}
524
525/// Hot-fields snapshot of per-queue policy, materialised at catalog
526/// snapshot time so `QueueLifecycle` (and the descriptor) can read
527/// values without re-scanning `red_queue_meta`.
528#[derive(Debug, Clone)]
529pub struct QueuePolicySnapshot {
530    pub mode: QueueMode,
531    pub max_attempts: u32,
532    pub lock_deadline_ms: u64,
533    pub in_flight_cap_per_group: u32,
534    pub dlq_target: Option<String>,
535}
536
537fn queue_policies_from_grouped(
538    grouped: &HashMap<String, Vec<UnifiedEntity>>,
539) -> HashMap<String, QueuePolicySnapshot> {
540    let Some(meta) = grouped.get("red_queue_meta") else {
541        return HashMap::new();
542    };
543
544    let mut policies = HashMap::new();
545    for entity in meta {
546        let Some(row) = entity.data.as_row() else {
547            continue;
548        };
549        if row_text(row, "kind").as_deref() != Some("queue_config") {
550            continue;
551        }
552        let Some(queue) = row_text(row, "queue") else {
553            continue;
554        };
555        let mode = row_text(row, "mode")
556            .as_deref()
557            .and_then(QueueMode::parse)
558            .unwrap_or_default();
559        let max_attempts = row_u64(row, "max_attempts")
560            .map(|v| v as u32)
561            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
562        let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
563            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
564        let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
565            .map(|v| v as u32)
566            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
567        let dlq_target = row_text(row, "dlq");
568        policies.insert(
569            queue,
570            QueuePolicySnapshot {
571                mode,
572                max_attempts,
573                lock_deadline_ms,
574                in_flight_cap_per_group,
575                dlq_target,
576            },
577        );
578    }
579    policies
580}
581
582fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
583    match row.get_field(key) {
584        Some(Value::UnsignedInteger(v)) => Some(*v),
585        Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
586        _ => None,
587    }
588}
589
590fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
591    match row.get_field(key) {
592        Some(Value::Text(value)) => Some(value.to_string()),
593        _ => None,
594    }
595}
596
597fn collection_contract<'a>(
598    collection_name: &str,
599    contracts: Option<&'a [CollectionContract]>,
600) -> Option<&'a CollectionContract> {
601    contracts.and_then(|contracts| {
602        contracts
603            .iter()
604            .find(|contract| contract.name == collection_name)
605    })
606}
607
608fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
609    let mut has_table = false;
610    let mut has_graph = false;
611    let mut has_vector = false;
612
613    for entity in entities {
614        match &entity.kind {
615            EntityKind::TableRow { .. } => has_table = true,
616            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
617            EntityKind::Vector { .. } => has_vector = true,
618            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
619        }
620    }
621
622    match (has_table, has_graph, has_vector) {
623        (true, false, false) => CollectionModel::Table,
624        (false, true, false) => CollectionModel::Graph,
625        (false, false, true) => CollectionModel::Vector,
626        _ => CollectionModel::Mixed,
627    }
628}
629
630fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
631    match model {
632        CollectionModel::Table => SchemaMode::Strict,
633        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
634        CollectionModel::Document
635        | CollectionModel::Hll
636        | CollectionModel::Sketch
637        | CollectionModel::Filter
638        | CollectionModel::Kv
639        | CollectionModel::Config
640        | CollectionModel::Vault
641        | CollectionModel::Mixed => SchemaMode::Dynamic,
642        CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
643        CollectionModel::Queue => SchemaMode::Dynamic,
644    }
645}
646
647fn infer_indices(
648    collection_name: &str,
649    model: CollectionModel,
650    index_catalog: Option<&IndexCatalog>,
651    declarations: Option<&CatalogDeclarations>,
652) -> Vec<String> {
653    let declared = declared_indices(collection_name, declarations);
654    if !declared.is_empty() {
655        return declared;
656    }
657
658    let available = index_catalog
659        .map(IndexCatalog::snapshot)
660        .unwrap_or_default();
661    let mut selected = Vec::new();
662
663    for metric in available {
664        let relevant = matches!(
665            (model, metric.kind),
666            (_, IndexKind::BTree)
667                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
668                | (CollectionModel::Vector, IndexKind::VectorHnsw)
669                | (CollectionModel::Vector, IndexKind::VectorInverted)
670                | (CollectionModel::Document, IndexKind::FullText)
671                | (CollectionModel::Document, IndexKind::DocumentPathValue)
672                | (CollectionModel::Kv, IndexKind::Hash)
673                | (CollectionModel::Config, IndexKind::Hash)
674                | (CollectionModel::Vault, IndexKind::Hash)
675                | (CollectionModel::Mixed, _)
676        );
677
678        if relevant && metric.enabled {
679            selected.push(metric.name);
680        }
681    }
682
683    selected
684}
685
686fn declared_indices(
687    collection_name: &str,
688    declarations: Option<&CatalogDeclarations>,
689) -> Vec<String> {
690    let mut selected = declarations
691        .map(|declarations| {
692            declarations
693                .declared_indexes
694                .iter()
695                .filter(|index| index.collection.as_deref() == Some(collection_name))
696                .map(|index| index.name.clone())
697                .collect::<Vec<_>>()
698        })
699        .unwrap_or_default();
700    selected.sort();
701    selected.dedup();
702    selected
703}
704
705fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
706    let mut selected = index_catalog
707        .map(IndexCatalog::snapshot)
708        .unwrap_or_default()
709        .into_iter()
710        .filter(|metric| metric.enabled)
711        .filter_map(|metric| {
712            if metric.name.starts_with(collection_name)
713                || matches!(
714                    metric.kind,
715                    IndexKind::BTree
716                        | IndexKind::GraphAdjacency
717                        | IndexKind::VectorHnsw
718                        | IndexKind::VectorInverted
719                        | IndexKind::FullText
720                        | IndexKind::DocumentPathValue
721                        | IndexKind::HybridSearch
722                )
723            {
724                Some(metric.name)
725            } else {
726                None
727            }
728        })
729        .collect::<Vec<_>>();
730    selected.sort();
731    selected.dedup();
732    selected
733}
734
735fn collection_index_consistency(
736    declared_indices: &[String],
737    operational_indices: &[String],
738) -> (bool, Vec<String>, Vec<String>) {
739    let declared = declared_indices.iter().cloned().collect::<BTreeSet<_>>();
740    let operational = operational_indices.iter().cloned().collect::<BTreeSet<_>>();
741    let missing_operational_indices = declared
742        .difference(&operational)
743        .cloned()
744        .collect::<Vec<_>>();
745    let undeclared_operational_indices = operational
746        .difference(&declared)
747        .cloned()
748        .collect::<Vec<_>>();
749    (
750        missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty(),
751        missing_operational_indices,
752        undeclared_operational_indices,
753    )
754}
755
756fn collection_index_readiness(
757    collection_name: &str,
758    statuses: &[CatalogIndexStatus],
759) -> (usize, usize, bool) {
760    let local = statuses
761        .iter()
762        .filter(|status| status.collection.as_deref() == Some(collection_name))
763        .collect::<Vec<_>>();
764    let queryable_count = local.iter().filter(|status| status.queryable).count();
765    let requires_rebuild_count = local
766        .iter()
767        .filter(|status| status.requires_rebuild)
768        .count();
769    let locally_in_sync = local.iter().all(|status| status.in_sync);
770    (queryable_count, requires_rebuild_count, locally_in_sync)
771}
772
773fn collection_graph_projection_readiness(
774    collection_name: &str,
775    statuses: &[CatalogGraphProjectionStatus],
776) -> (usize, usize, bool) {
777    let local = statuses
778        .iter()
779        .filter(|status| status.source.as_deref() == Some(collection_name))
780        .collect::<Vec<_>>();
781    let queryable_count = local.iter().filter(|status| status.queryable).count();
782    let requires_rematerialization_count = local
783        .iter()
784        .filter(|status| status.requires_rematerialization)
785        .count();
786    let locally_in_sync = local
787        .iter()
788        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
789    (
790        queryable_count,
791        requires_rematerialization_count,
792        locally_in_sync,
793    )
794}
795
796fn collection_analytics_job_readiness(
797    collection_name: &str,
798    projection_statuses: &[CatalogGraphProjectionStatus],
799    job_statuses: &[CatalogAnalyticsJobStatus],
800) -> (usize, usize, bool) {
801    let local = job_statuses
802        .iter()
803        .filter(|status| {
804            let Some(projection_name) = status.projection.as_deref() else {
805                return false;
806            };
807            projection_statuses
808                .iter()
809                .find(|projection| projection.name == projection_name)
810                .and_then(|projection| projection.source.as_deref())
811                == Some(collection_name)
812        })
813        .collect::<Vec<_>>();
814    let executable_count = local.iter().filter(|status| status.executable).count();
815    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
816    let locally_in_sync = local
817        .iter()
818        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
819    (executable_count, requires_rerun_count, locally_in_sync)
820}
821
822fn graph_projection_statuses(
823    declarations: Option<&CatalogDeclarations>,
824) -> Vec<CatalogGraphProjectionStatus> {
825    let declared = declarations
826        .map(|declarations| declarations.declared_graph_projections.as_slice())
827        .unwrap_or(&[]);
828    let operational = declarations
829        .map(|declarations| declarations.operational_graph_projections.as_slice())
830        .unwrap_or(&[]);
831    let declared_jobs = declarations
832        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
833        .unwrap_or(&[]);
834    let operational_jobs = declarations
835        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
836        .unwrap_or(&[]);
837
838    let mut names = declared
839        .iter()
840        .map(|projection| projection.name.clone())
841        .chain(operational.iter().map(|projection| projection.name.clone()))
842        .collect::<Vec<_>>();
843    names.sort();
844    names.dedup();
845
846    names
847        .into_iter()
848        .map(|name| {
849            let declared_projection = declared.iter().find(|projection| projection.name == name);
850            let operational_projection = operational
851                .iter()
852                .find(|projection| projection.name == name);
853            let declared_present = declared_projection.is_some();
854            let operational_present = operational_projection.is_some();
855            let mut dependent_job_ids = BTreeSet::new();
856            let mut dependent_job_count = 0usize;
857            let mut active_dependent_job_count = 0usize;
858            let mut stale_dependent_job_count = 0usize;
859            for job in declared_jobs
860                .iter()
861                .chain(operational_jobs.iter())
862                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
863            {
864                if !dependent_job_ids.insert(job.id.clone()) {
865                    continue;
866                }
867                dependent_job_count += 1;
868                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
869                    active_dependent_job_count += 1;
870                }
871                if job.state == "stale" {
872                    stale_dependent_job_count += 1;
873                }
874            }
875            let lifecycle_state = match (
876                declared_present,
877                operational_present,
878                declared_projection
879                    .map(|projection| projection.state.as_str())
880                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
881                    .unwrap_or_default(),
882            ) {
883                (true, _, "failed") => "failed",
884                (true, true, "stale") => "stale",
885                (true, _, "materializing") => "materializing",
886                (true, true, "materialized") => "materialized",
887                (true, true, _) => "materialized",
888                (false, true, _) => "orphaned-operational",
889                (true, false, _) => "declared",
890                (false, false, _) => "unknown",
891            }
892            .to_string();
893            let queryable = declared_present
894                && operational_present
895                && matches!(
896                    declared_projection
897                        .map(|projection| projection.state.as_str())
898                        .or_else(
899                            || operational_projection.map(|projection| projection.state.as_str())
900                        )
901                        .unwrap_or_default(),
902                    "materialized"
903                );
904            let requires_rematerialization = matches!(
905                declared_projection
906                    .map(|projection| projection.state.as_str())
907                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
908                    .unwrap_or_default(),
909                "declared" | "materializing" | "failed" | "stale"
910            );
911            let mut attention_reasons = Vec::new();
912            if !declared_present || !operational_present {
913                attention_reasons.push("declaration_drift".to_string());
914            }
915            if requires_rematerialization {
916                attention_reasons.push("requires_rematerialization".to_string());
917            }
918            if stale_dependent_job_count > 0 {
919                attention_reasons.push("dependent_jobs_stale".to_string());
920            }
921            let attention_score = stale_dependent_job_count.saturating_mul(2)
922                + usize::from(requires_rematerialization).saturating_mul(4)
923                + usize::from(!declared_present || !operational_present)
924                + usize::from(!queryable);
925            CatalogGraphProjectionStatus {
926                name,
927                source: declared_projection
928                    .map(|projection| projection.source.clone())
929                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
930                lifecycle_state,
931                declared: declared_present,
932                operational: operational_present,
933                in_sync: declared_present == operational_present,
934                last_materialized_sequence: declared_projection
935                    .and_then(|projection| projection.last_materialized_sequence)
936                    .or_else(|| {
937                        operational_projection
938                            .and_then(|projection| projection.last_materialized_sequence)
939                    }),
940                queryable,
941                requires_rematerialization,
942                dependent_job_count,
943                active_dependent_job_count,
944                stale_dependent_job_count,
945                dependent_jobs_in_sync: stale_dependent_job_count == 0,
946                rerun_required: stale_dependent_job_count > 0,
947                attention_score,
948                attention_reasons,
949            }
950        })
951        .collect()
952}
953
954fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
955    let declared = declarations
956        .map(|declarations| declarations.declared_indexes.as_slice())
957        .unwrap_or(&[]);
958    let operational = declarations
959        .map(|declarations| declarations.operational_indexes.as_slice())
960        .unwrap_or(&[]);
961
962    let mut names = declared
963        .iter()
964        .map(|index| index.name.clone())
965        .chain(operational.iter().map(|index| index.name.clone()))
966        .collect::<Vec<_>>();
967    names.sort();
968    names.dedup();
969
970    names
971        .into_iter()
972        .map(|name| {
973            let declared_index = declared.iter().find(|index| index.name == name);
974            let operational_index = operational.iter().find(|index| index.name == name);
975            let collection = declared_index
976                .and_then(|index| index.collection.clone())
977                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
978            let kind = declared_index
979                .map(|index| index_kind_string(index.kind))
980                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
981                .unwrap_or_default();
982            let enabled = declared_index
983                .map(|index| index.enabled)
984                .or_else(|| operational_index.map(|index| index.enabled))
985                .unwrap_or(false);
986            let build_state = operational_index
987                .map(|index| index.build_state.clone())
988                .or_else(|| declared_index.map(|index| index.build_state.clone()));
989            let declared_present = declared_index.is_some();
990            let operational_present = operational_index.is_some();
991            let lifecycle_state = index_lifecycle_state(
992                declared_present,
993                operational_present,
994                enabled,
995                build_state.as_deref(),
996            );
997
998            let mut attention_reasons = Vec::new();
999            if declared_present != operational_present {
1000                attention_reasons.push("declaration_drift".to_string());
1001            }
1002            if !enabled && declared_present {
1003                attention_reasons.push("disabled".to_string());
1004            }
1005            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1006                attention_reasons.push("failed".to_string());
1007            }
1008            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1009                attention_reasons.push("stale".to_string());
1010            }
1011            if matches!(
1012                build_state.as_deref().unwrap_or_default(),
1013                "building" | "declared-unbuilt"
1014            ) {
1015                attention_reasons.push("requires_rebuild".to_string());
1016            }
1017            let queryable = declared_present
1018                && operational_present
1019                && enabled
1020                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1021            let requires_rebuild = matches!(
1022                build_state.as_deref().unwrap_or_default(),
1023                "declared-unbuilt" | "building" | "stale" | "failed"
1024            );
1025            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1026                + usize::from(!enabled && declared_present)
1027                + usize::from(declared_present != operational_present)
1028                + usize::from(!queryable);
1029
1030            let artifact_state = crate::physical::ArtifactState::from_build_state(
1031                build_state.as_deref().unwrap_or("declared-unbuilt"),
1032                enabled,
1033            );
1034            CatalogIndexStatus {
1035                name,
1036                collection,
1037                kind,
1038                declared: declared_present,
1039                operational: operational_present,
1040                enabled,
1041                build_state,
1042                artifact_state,
1043                lifecycle_state,
1044                in_sync: declared_present == operational_present,
1045                queryable,
1046                requires_rebuild,
1047                attention_score,
1048                attention_reasons,
1049            }
1050        })
1051        .collect()
1052}
1053
1054fn index_lifecycle_state(
1055    declared: bool,
1056    operational: bool,
1057    enabled: bool,
1058    build_state: Option<&str>,
1059) -> String {
1060    if !declared && operational {
1061        return "orphaned-operational".to_string();
1062    }
1063    if declared && !enabled {
1064        return "disabled".to_string();
1065    }
1066    if !declared {
1067        return "unknown".to_string();
1068    }
1069    if !operational {
1070        return "declared".to_string();
1071    }
1072
1073    match build_state.unwrap_or_default() {
1074        "ready" => "ready".to_string(),
1075        "failed" => "failed".to_string(),
1076        "stale" => "stale".to_string(),
1077        "declared-unbuilt" => "declared".to_string(),
1078        "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
1079            "building".to_string()
1080        }
1081        _ => "building".to_string(),
1082    }
1083}
1084
1085fn index_kind_string(kind: IndexKind) -> String {
1086    match kind {
1087        IndexKind::BTree => "btree",
1088        IndexKind::Hash => "hash",
1089        IndexKind::Bitmap => "bitmap",
1090        IndexKind::Spatial => "spatial.rtree",
1091        IndexKind::VectorHnsw => "vector.hnsw",
1092        IndexKind::VectorInverted => "vector.inverted",
1093        IndexKind::GraphAdjacency => "graph.adjacency",
1094        IndexKind::FullText => "text.fulltext",
1095        IndexKind::DocumentPathValue => "document.pathvalue",
1096        IndexKind::HybridSearch => "search.hybrid",
1097    }
1098    .to_string()
1099}
1100
1101fn analytics_job_statuses(
1102    declarations: Option<&CatalogDeclarations>,
1103) -> Vec<CatalogAnalyticsJobStatus> {
1104    let declared = declarations
1105        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1106        .unwrap_or(&[]);
1107    let operational = declarations
1108        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1109        .unwrap_or(&[]);
1110    let declared_projections = declarations
1111        .map(|declarations| declarations.declared_graph_projections.as_slice())
1112        .unwrap_or(&[]);
1113    let operational_projections = declarations
1114        .map(|declarations| declarations.operational_graph_projections.as_slice())
1115        .unwrap_or(&[]);
1116
1117    let mut ids = declared
1118        .iter()
1119        .map(|job| job.id.clone())
1120        .chain(operational.iter().map(|job| job.id.clone()))
1121        .collect::<Vec<_>>();
1122    ids.sort();
1123    ids.dedup();
1124
1125    ids.into_iter()
1126        .map(|id| {
1127            let declared_job = declared.iter().find(|job| job.id == id);
1128            let operational_job = operational.iter().find(|job| job.id == id);
1129            let kind = declared_job
1130                .map(|job| job.kind.clone())
1131                .or_else(|| operational_job.map(|job| job.kind.clone()))
1132                .unwrap_or_default();
1133            let projection = declared_job
1134                .and_then(|job| job.projection.clone())
1135                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1136            let state = declared_job
1137                .map(|job| job.state.clone())
1138                .or_else(|| operational_job.map(|job| job.state.clone()))
1139                .unwrap_or_default();
1140            let declared_present = declared_job.is_some();
1141            let operational_present = operational_job.is_some();
1142            let last_run_sequence = declared_job
1143                .and_then(|job| job.last_run_sequence)
1144                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1145            let projection_declared = projection.as_ref().map(|projection_name| {
1146                declared_projections
1147                    .iter()
1148                    .any(|projection| projection.name == *projection_name)
1149            });
1150            let projection_operational = projection.as_ref().map(|projection_name| {
1151                operational_projections
1152                    .iter()
1153                    .any(|projection| projection.name == *projection_name)
1154            });
1155            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1156                declared_projections
1157                    .iter()
1158                    .find(|projection| projection.name == *projection_name)
1159                    .map(|projection| projection.state.as_str())
1160                    .or_else(|| {
1161                        operational_projections
1162                            .iter()
1163                            .find(|projection| projection.name == *projection_name)
1164                            .map(|projection| projection.state.as_str())
1165                    })
1166            });
1167            let dependency_in_sync = projection.as_ref().map(|_| {
1168                matches!(projection_lifecycle, Some("materialized"))
1169                    && projection_operational == Some(true)
1170            });
1171            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1172                (true, false, _) => "declared",
1173                (true, true, _)
1174                    if matches!(
1175                        projection_lifecycle,
1176                        Some("stale" | "failed" | "materializing" | "declared")
1177                    ) =>
1178                {
1179                    "stale"
1180                }
1181                (true, true, "completed") => "completed",
1182                (true, true, "running") => "running",
1183                (true, true, "failed") => "failed",
1184                (true, true, "queued") => "queued",
1185                (true, true, "stale") => "stale",
1186                (true, true, _) => "materialized",
1187                (false, true, _) => "orphaned-operational",
1188                (false, false, _) => "unknown",
1189            }
1190            .to_string();
1191            let executable = declared_present
1192                && operational_present
1193                && !matches!(state.as_str(), "failed" | "stale")
1194                && dependency_in_sync.unwrap_or(true);
1195            let requires_rerun =
1196                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1197            let mut attention_reasons = Vec::new();
1198            if declared_present != operational_present {
1199                attention_reasons.push("declaration_drift".to_string());
1200            }
1201            if matches!(state.as_str(), "failed") {
1202                attention_reasons.push("failed".to_string());
1203            }
1204            if matches!(state.as_str(), "stale") {
1205                attention_reasons.push("stale".to_string());
1206            }
1207            if dependency_in_sync == Some(false) {
1208                attention_reasons.push("dependency_drift".to_string());
1209            }
1210            if requires_rerun {
1211                attention_reasons.push("requires_rerun".to_string());
1212            }
1213            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1214                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1215                + usize::from(declared_present != operational_present)
1216                + usize::from(!executable);
1217            CatalogAnalyticsJobStatus {
1218                id,
1219                kind,
1220                projection,
1221                state: state.clone(),
1222                lifecycle_state,
1223                declared: declared_present,
1224                operational: operational_present,
1225                in_sync: declared_present == operational_present,
1226                last_run_sequence,
1227                projection_declared,
1228                projection_operational,
1229                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1230                dependency_in_sync,
1231                executable,
1232                requires_rerun,
1233                attention_score,
1234                attention_reasons,
1235            }
1236        })
1237        .collect()
1238}
1239
1240pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1241    let declared_indexes = snapshot
1242        .declared_indexes
1243        .iter()
1244        .map(|index| index.name.clone())
1245        .collect::<BTreeSet<_>>();
1246    let operational_indexes = snapshot
1247        .operational_indexes
1248        .iter()
1249        .map(|index| index.name.clone())
1250        .collect::<BTreeSet<_>>();
1251    let declared_graph_projections = snapshot
1252        .declared_graph_projections
1253        .iter()
1254        .map(|projection| projection.name.clone())
1255        .collect::<BTreeSet<_>>();
1256    let operational_graph_projections = snapshot
1257        .operational_graph_projections
1258        .iter()
1259        .map(|projection| projection.name.clone())
1260        .collect::<BTreeSet<_>>();
1261    let declared_analytics_jobs = snapshot
1262        .declared_analytics_jobs
1263        .iter()
1264        .map(|job| job.id.clone())
1265        .collect::<BTreeSet<_>>();
1266    let operational_analytics_jobs = snapshot
1267        .operational_analytics_jobs
1268        .iter()
1269        .map(|job| job.id.clone())
1270        .collect::<BTreeSet<_>>();
1271
1272    CatalogConsistencyReport {
1273        declared_index_count: declared_indexes.len(),
1274        operational_index_count: operational_indexes.len(),
1275        declared_graph_projection_count: declared_graph_projections.len(),
1276        operational_graph_projection_count: operational_graph_projections.len(),
1277        declared_analytics_job_count: declared_analytics_jobs.len(),
1278        operational_analytics_job_count: operational_analytics_jobs.len(),
1279        missing_operational_indexes: declared_indexes
1280            .difference(&operational_indexes)
1281            .cloned()
1282            .collect(),
1283        undeclared_operational_indexes: operational_indexes
1284            .difference(&declared_indexes)
1285            .cloned()
1286            .collect(),
1287        missing_operational_graph_projections: declared_graph_projections
1288            .difference(&operational_graph_projections)
1289            .cloned()
1290            .collect(),
1291        undeclared_operational_graph_projections: operational_graph_projections
1292            .difference(&declared_graph_projections)
1293            .cloned()
1294            .collect(),
1295        missing_operational_analytics_jobs: declared_analytics_jobs
1296            .difference(&operational_analytics_jobs)
1297            .cloned()
1298            .collect(),
1299        undeclared_operational_analytics_jobs: operational_analytics_jobs
1300            .difference(&declared_analytics_jobs)
1301            .cloned()
1302            .collect(),
1303    }
1304}
1305
1306pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1307    CatalogAttentionSummary {
1308        collections_requiring_attention: snapshot
1309            .collections
1310            .iter()
1311            .filter(|collection| collection.attention_required)
1312            .count(),
1313        indexes_requiring_attention: snapshot
1314            .index_statuses
1315            .iter()
1316            .filter(|status| status.attention_score > 0)
1317            .count(),
1318        graph_projections_requiring_attention: snapshot
1319            .graph_projection_statuses
1320            .iter()
1321            .filter(|status| status.attention_score > 0)
1322            .count(),
1323        analytics_jobs_requiring_attention: snapshot
1324            .analytics_job_statuses
1325            .iter()
1326            .filter(|status| status.attention_score > 0)
1327            .count(),
1328        top_collection: snapshot
1329            .collections
1330            .iter()
1331            .filter(|collection| collection.attention_score > 0)
1332            .max_by_key(|collection| collection.attention_score)
1333            .cloned(),
1334        top_index: snapshot
1335            .index_statuses
1336            .iter()
1337            .filter(|status| status.attention_score > 0)
1338            .max_by_key(|status| status.attention_score)
1339            .cloned(),
1340        top_graph_projection: snapshot
1341            .graph_projection_statuses
1342            .iter()
1343            .filter(|status| status.attention_score > 0)
1344            .max_by_key(|status| status.attention_score)
1345            .cloned(),
1346        top_analytics_job: snapshot
1347            .analytics_job_statuses
1348            .iter()
1349            .filter(|status| status.attention_score > 0)
1350            .max_by_key(|status| status.attention_score)
1351            .cloned(),
1352    }
1353}
1354
1355pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1356    let mut collections = snapshot
1357        .collections
1358        .iter()
1359        .filter(|collection| collection.attention_required)
1360        .cloned()
1361        .collect::<Vec<_>>();
1362    collections.sort_by(|left, right| {
1363        right
1364            .attention_score
1365            .cmp(&left.attention_score)
1366            .then_with(|| left.name.cmp(&right.name))
1367    });
1368    collections
1369}
1370
1371pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1372    let mut statuses = snapshot
1373        .index_statuses
1374        .iter()
1375        .filter(|status| status.attention_score > 0)
1376        .cloned()
1377        .collect::<Vec<_>>();
1378    statuses.sort_by(|left, right| {
1379        right
1380            .attention_score
1381            .cmp(&left.attention_score)
1382            .then_with(|| left.name.cmp(&right.name))
1383    });
1384    statuses
1385}
1386
1387pub fn graph_projection_attention(
1388    snapshot: &CatalogModelSnapshot,
1389) -> Vec<CatalogGraphProjectionStatus> {
1390    let mut statuses = snapshot
1391        .graph_projection_statuses
1392        .iter()
1393        .filter(|status| status.attention_score > 0)
1394        .cloned()
1395        .collect::<Vec<_>>();
1396    statuses.sort_by(|left, right| {
1397        right
1398            .attention_score
1399            .cmp(&left.attention_score)
1400            .then_with(|| left.name.cmp(&right.name))
1401    });
1402    statuses
1403}
1404
1405pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1406    let mut statuses = snapshot
1407        .analytics_job_statuses
1408        .iter()
1409        .filter(|status| status.attention_score > 0)
1410        .cloned()
1411        .collect::<Vec<_>>();
1412    statuses.sort_by(|left, right| {
1413        right
1414            .attention_score
1415            .cmp(&left.attention_score)
1416            .then_with(|| left.id.cmp(&right.id))
1417    });
1418    statuses
1419}