Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Per-collection analytical-storage seam (PRD #850, Phase 1).
///
/// Declares that a `Metrics`/`TimeSeries` collection's hypertable chunks
/// are backed by the **columnar** sealed-chunk layout
/// ([`column_block`](crate::storage::unified::column_block)) rather than
/// the default row engine. `columnar = false` (or the whole config
/// absent) keeps the row engine — the seal dispatch routes only
/// columnar-flagged chunks to the `ColumnBlock` writer.
///
/// `time_key` / `order_by_key` mirror the ClickHouse `ORDER BY` intent so
/// downstream slices (#854 granules, #856 vectorized read) can lay the
/// columns out in their natural sort order; v1 records them as the durable
/// contract without yet acting on `order_by_key`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnalyticalStorageConfig {
    /// When true, sealing a chunk routes to the columnar `ColumnBlock`
    /// writer; otherwise the row engine stays the default.
    pub columnar: bool,
    /// Column carrying the time axis (the chunk's partition key).
    pub time_key: String,
    /// Optional secondary sort key (ClickHouse `ORDER BY` tail). `None`
    /// orders by `time_key` alone. Per the type-level note, v1 only
    /// records this value; nothing acts on it yet.
    pub order_by_key: Option<String>,
}
40
/// Schema mode recorded for a collection in the catalog.
///
/// A collection contract supplies the mode when one exists; otherwise
/// `infer_schema_mode` derives it from the collection model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    /// Inferred for `CollectionModel::Table`.
    Strict,
    /// Inferred for graph, vector, time-series and metrics collections.
    SemiStructured,
    /// Inferred for every other model (document, kv, queue, mixed, ...).
    Dynamic,
}
47
48// Catalog AST-leaf descriptors re-homed to the neutral keystone crate (ADR
49// 0053, RQL Phase 2 S4b) so the canonical SQL AST
50// (`CreateTableQuery.{subscriptions, analytics_config}`) resolves them without a
51// `reddb-server` edge. This shim keeps `crate::catalog::{...}` valid for
52// existing call-sites.
53pub use reddb_types::catalog::{
54    AiPolicy, AnalyticsOutput, AnalyticsViewDescriptor, CollectionModel, EmbedPolicy,
55    ModerateDegradedMode, ModeratePolicy, ModerateRejectAction, SubscriptionDescriptor,
56    SubscriptionOperation, VisionPolicy,
57};
58
/// Catalog view of a single collection, assembled by
/// `snapshot_store_with_declarations` from the stored entities, the optional
/// collection contract and the index / projection / job status lists.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as reported by the store.
    pub name: String,
    /// Effective model: the contract's declared model when a contract
    /// exists, otherwise the model inferred from the stored entities.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's mode when a contract exists,
    /// otherwise the mode inferred from the inferred model.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    /// Origin recorded on the contract; `None` without a contract.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract; `None` without a contract.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the entities currently stored.
    pub observed_model: CollectionModel,
    /// Queue mode. `Some` only when `model` is `Queue`; taken from the
    /// queue's policy row, or the `QueueMode` default when no row exists.
    pub queue_mode: Option<QueueMode>,
    /// MAX_ATTEMPTS per-queue policy. Hot-fields tier — populated for
    /// `model = queue` from the catalog snapshot for sub-ms reads by
    /// `QueueLifecycle`. `None` for non-queue collections.
    pub queue_max_attempts: Option<u32>,
    /// LOCK_DEADLINE_MS per-queue policy. Hot-fields tier.
    pub queue_lock_deadline_ms: Option<u64>,
    /// IN_FLIGHT_CAP_PER_GROUP per-queue policy. Hot-fields tier.
    pub queue_in_flight_cap_per_group: Option<u32>,
    /// DLQ target collection name. `None` means on-max drop.
    pub queue_dlq_target: Option<String>,
    /// Vector dimension copied from the contract, when present.
    pub vector_dimension: Option<usize>,
    /// Vector distance metric copied from the contract, when present.
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    /// `WITH SESSION_KEY <col>` declared on the timeseries collection.
    /// `None` for non-timeseries and for timeseries without the clause.
    /// Issue #576 slice 1.
    pub session_key: Option<String>,
    /// `SESSION_GAP <duration>` in milliseconds declared on the
    /// timeseries collection. Issue #576 slice 1.
    pub session_gap_ms: Option<u64>,
    /// Declarative retention duration in milliseconds. `None` means no
    /// retention policy is enforced. Set via `ALTER COLLECTION ... SET
    /// RETENTION <duration>`. Issue #580 slice 1.
    pub retention_duration_ms: Option<u64>,
    /// Schema mode declared by the contract; `None` without a contract.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode inferred from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities found in the collection.
    pub entities: usize,
    /// Sum of the cross-reference counts of all entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment counts from the collection
    /// manager's stats; `0` when the store returns no manager.
    pub segments: usize,
    /// Index names chosen by `infer_indices`: the declared names when any
    /// exist, otherwise enabled catalog indexes relevant to `model`.
    pub indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes considered operational for
    /// this collection.
    pub operational_indices: Vec<String>,
    /// True when `declared_indices` and `operational_indices` hold the
    /// same set of names.
    pub indexes_in_sync: bool,
    /// Declared index names that are not operational.
    pub missing_operational_indices: Vec<String>,
    /// Operational index names that are not declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Number of this collection's index statuses flagged `queryable`.
    pub queryable_index_count: usize,
    /// Number of this collection's index statuses flagged `requires_rebuild`.
    pub indexes_requiring_rebuild_count: usize,
    /// Number of graph projections sourced from this collection that are
    /// flagged `queryable`.
    pub queryable_graph_projection_count: usize,
    /// Number of graph projections sourced from this collection that are
    /// flagged `requires_rematerialization`.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Number of analytics jobs (reached through a projection sourced from
    /// this collection) flagged `executable`.
    pub executable_analytics_job_count: usize,
    /// Number of those analytics jobs flagged `requires_rerun`.
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions copied from the contract; empty without a contract.
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// Analytics views declared by `WITH ANALYTICS (...)`. Empty for
    /// collections without the clause. Sourced from the persisted
    /// `CollectionContract` so it survives restarts (issue #800).
    pub analytics_config: Vec<AnalyticsViewDescriptor>,
    /// Per-collection AI policy declared by `WITH (EMBED|MODERATE|VISION
    /// (...))`. `None` when no AI clause is present. Sourced from the
    /// persisted `CollectionContract` so introspection survives restarts
    /// (issue #1271).
    pub ai_policy: Option<AiPolicy>,
    /// True when the index name sets match and every local index,
    /// graph-projection and analytics-job status reports itself in sync.
    pub resources_in_sync: bool,
    /// True when `resources_in_sync` is false or any rebuild /
    /// rematerialization / rerun count is non-zero.
    pub attention_required: bool,
    /// Weighted score: 3 per index requiring rebuild, 4 per graph
    /// projection requiring rematerialization, 2 per analytics job
    /// requiring rerun, plus 1 when `resources_in_sync` is false.
    pub attention_score: usize,
    /// Machine-readable reason tags such as `"index_drift"` or
    /// `"analytics_jobs_require_rerun"`.
    pub attention_reasons: Vec<String>,
}
124
/// Full catalog snapshot returned by `snapshot_store` /
/// `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-level summary: name, entity / collection totals, per-collection
    /// stats and the time the snapshot was taken.
    pub summary: CatalogSnapshot,
    /// One descriptor per collection, sorted by collection name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog; the default value when no catalog
    /// was supplied.
    pub indices: IndexCatalogSnapshot,
    // The six lists below are cloned from the supplied `CatalogDeclarations`
    // and are empty when none was supplied.
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Per-index status derived from the declarations.
    pub index_statuses: Vec<CatalogIndexStatus>,
    /// Per-graph-projection status derived from the declarations.
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    /// Per-analytics-job status derived from the declarations.
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    // Store-wide tallies: each counts the entries of the matching status
    // list whose correspondingly named flag is set.
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
}
146
/// Roll-up of catalog resources that need attention.
///
/// NOTE(review): the code that builds this summary is not in the visible
/// part of the file; the field notes below are read from the names and
/// types only and should be confirmed against the builder.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    // Presumably the number of resources of each kind flagged for attention.
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    // Presumably the highest-scoring resource of each kind, if any — TODO confirm.
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
158
/// Declared and operational resource lists handed to
/// `snapshot_store_with_declarations`.
///
/// The snapshot copies all six lists verbatim and derives the per-resource
/// status lists from them.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    /// Indexes that have been declared; matched to collections through
    /// each entry's `collection` field.
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Indexes reported as operational. When declarations are supplied this
    /// list, not the index catalog, decides a collection's operational
    /// index names.
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
168
/// Status of one graph projection, produced by `graph_projection_statuses`.
///
/// NOTE(review): the builder is only partly visible from here; the field
/// notes below describe how this file consumes the fields, not how they
/// are computed.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name; analytics jobs are linked to a projection by it.
    pub name: String,
    /// Source collection name; used to attribute the projection (and its
    /// jobs) to a collection.
    pub source: Option<String>,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    /// Checked, together with `dependent_jobs_in_sync`, when deciding
    /// whether a collection's projections are in sync.
    pub in_sync: bool,
    pub last_materialized_sequence: Option<u64>,
    /// Counted into the `queryable_graph_projection_count` tallies.
    pub queryable: bool,
    /// Counted into the
    /// `graph_projections_requiring_rematerialization_count` tallies.
    pub requires_rematerialization: bool,
    pub dependent_job_count: usize,
    pub active_dependent_job_count: usize,
    pub stale_dependent_job_count: usize,
    pub dependent_jobs_in_sync: bool,
    pub rerun_required: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
188
/// Status of one index, produced by `index_statuses`.
///
/// NOTE(review): the builder is outside the visible part of the file; the
/// field notes below describe how this file consumes the fields, not how
/// they are computed.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    pub name: String,
    /// Owning collection name; used to attribute the index to a collection.
    pub collection: Option<String>,
    pub kind: String,
    pub declared: bool,
    pub operational: bool,
    pub enabled: bool,
    pub build_state: Option<String>,
    pub artifact_state: crate::physical::ArtifactState,
    pub lifecycle_state: String,
    /// Every index of a collection must be in sync for that collection's
    /// indexes to count as locally in sync.
    pub in_sync: bool,
    /// Counted into the `queryable_index_count` tallies.
    pub queryable: bool,
    /// Counted into the `indexes_requiring_rebuild_count` tallies.
    pub requires_rebuild: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
206
/// Status of one analytics job, produced by `analytics_job_statuses`.
///
/// NOTE(review): the builder is outside the visible part of the file; the
/// field notes below describe how this file consumes the fields, not how
/// they are computed.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    pub id: String,
    pub kind: String,
    /// Name of the graph projection the job runs on. Jobs without a
    /// projection are never attributed to a collection.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    /// Checked, together with `dependency_in_sync`, when deciding whether
    /// a collection's jobs are in sync.
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// `None` is treated as "in sync" by the per-collection readiness check.
    pub dependency_in_sync: Option<bool>,
    /// Counted into the `executable_analytics_job_count` tallies.
    pub executable: bool,
    /// Counted into the `analytics_jobs_requiring_rerun_count` tallies.
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
227
/// Declared-versus-operational comparison across indexes, graph
/// projections and analytics jobs.
///
/// NOTE(review): the code that fills this report is not in the visible part
/// of the file. The `missing_*` / `undeclared_*` naming matches the
/// per-collection index fields on `CollectionDescriptor` (declared but not
/// operational / operational but not declared) — confirm against the builder.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
243
244pub fn snapshot_store(
245    name: &str,
246    store: &UnifiedStore,
247    index_catalog: Option<&IndexCatalog>,
248) -> CatalogModelSnapshot {
249    snapshot_store_with_declarations(name, store, index_catalog, None, None)
250}
251
/// Builds a full catalog snapshot of `store`.
///
/// * `name` — label copied into the summary.
/// * `store` — source of collections and entities; every entity is read.
/// * `index_catalog` — optional live index catalog, used for the `indices`
///   snapshot and as a fallback source of index names.
/// * `declarations` — optional declared / operational resource lists; the
///   per-resource status lists are derived from them.
/// * `contracts` — optional collection contracts, matched to collections by
///   name; a contract overrides the inferred model and schema mode.
///
/// Returns one `CollectionDescriptor` per collection (sorted by name) plus
/// the store-wide summary, status lists and readiness tallies.
pub fn snapshot_store_with_declarations(
    name: &str,
    store: &UnifiedStore,
    index_catalog: Option<&IndexCatalog>,
    declarations: Option<&CatalogDeclarations>,
    contracts: Option<&[CollectionContract]>,
) -> CatalogModelSnapshot {
    // Status lists are computed once and reused for every collection below.
    let index_statuses = index_statuses(declarations);
    let graph_projection_statuses = graph_projection_statuses(declarations);
    let analytics_job_statuses = analytics_job_statuses(declarations);

    // Bucket every entity in the store by its collection name.
    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
    for (collection, entity) in store.query_all(|_| true) {
        grouped.entry(collection).or_default().push(entity);
    }
    // Must run before the loop below starts removing buckets from `grouped`.
    let queue_policies = queue_policies_from_grouped(&grouped);

    let mut stats_by_collection = BTreeMap::new();
    let mut collections = Vec::new();

    for collection_name in store.list_collections() {
        // A collection with no entities gets an empty bucket.
        let entities = grouped.remove(&collection_name).unwrap_or_default();
        let inferred_model = infer_model(&entities);
        let inferred_schema_mode = infer_schema_mode(inferred_model);
        let contract = collection_contract(&collection_name, contracts);
        // The contract's declared model wins over the inferred one.
        let model = contract
            .map(|contract| contract.declared_model)
            .unwrap_or(inferred_model);
        let cross_refs = entities
            .iter()
            .map(|entity| entity.cross_refs().len())
            .sum();
        let entity_count = entities.len();
        let manager_stats = store
            .get_collection(&collection_name)
            .map(|manager| manager.stats());
        // No manager means no segment information: report zero.
        let segments = manager_stats
            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
            .unwrap_or(0);

        stats_by_collection.insert(
            collection_name.clone(),
            CollectionStats {
                entities: entity_count,
                cross_refs,
                segments,
            },
        );

        // Name-level comparison of declared vs. operational indexes.
        let declared_indices = declared_indices(&collection_name, declarations);
        let operational_indices =
            operational_indices(&collection_name, index_catalog, declarations);
        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
            collection_index_consistency(&declared_indices, &operational_indices);
        // Status-level readiness for the resources attributed to this collection.
        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
            collection_index_readiness(&collection_name, &index_statuses);
        let (
            queryable_graph_projection_count,
            graph_projections_requiring_rematerialization_count,
            graph_projections_locally_in_sync,
        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
        let (
            executable_analytics_job_count,
            analytics_jobs_requiring_rerun_count,
            analytics_jobs_locally_in_sync,
        ) = collection_analytics_job_readiness(
            &collection_name,
            &graph_projection_statuses,
            &analytics_job_statuses,
        );
        let resources_in_sync = indexes_in_sync
            && indexes_locally_in_sync
            && graph_projections_locally_in_sync
            && analytics_jobs_locally_in_sync;
        let attention_required = !resources_in_sync
            || indexes_requiring_rebuild_count > 0
            || graph_projections_requiring_rematerialization_count > 0
            || analytics_jobs_requiring_rerun_count > 0;
        // Reason tags are pushed in a fixed order: index, projection, job.
        let mut attention_reasons = Vec::new();
        if !indexes_in_sync || !indexes_locally_in_sync {
            attention_reasons.push("index_drift".to_string());
        }
        if indexes_requiring_rebuild_count > 0 {
            attention_reasons.push("indexes_require_rebuild".to_string());
        }
        if !graph_projections_locally_in_sync {
            attention_reasons.push("graph_projection_drift".to_string());
        }
        if graph_projections_requiring_rematerialization_count > 0 {
            attention_reasons.push("graph_projections_require_rematerialization".to_string());
        }
        if !analytics_jobs_locally_in_sync {
            attention_reasons.push("analytics_job_drift".to_string());
        }
        if analytics_jobs_requiring_rerun_count > 0 {
            attention_reasons.push("analytics_jobs_require_rerun".to_string());
        }
        // Weights: rebuild x3, rematerialization x4, rerun x2, +1 for any drift.
        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
            + usize::from(!resources_in_sync);

        collections.push(CollectionDescriptor {
            name: collection_name.clone(),
            model,
            schema_mode: contract
                .map(|contract| contract.schema_mode)
                .unwrap_or(inferred_schema_mode),
            contract_present: contract.is_some(),
            contract_origin: contract.map(|contract| contract.origin),
            declared_model: contract.map(|contract| contract.declared_model),
            observed_model: inferred_model,
            // Queue fields are populated only for queue collections; each
            // falls back to its default when the queue has no policy row.
            queue_mode: if model == CollectionModel::Queue {
                Some(
                    queue_policies
                        .get(&collection_name)
                        .map(|p| p.mode)
                        .unwrap_or_default(),
                )
            } else {
                None
            },
            queue_max_attempts: if model == CollectionModel::Queue {
                Some(
                    queue_policies
                        .get(&collection_name)
                        .map(|p| p.max_attempts)
                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
                )
            } else {
                None
            },
            queue_lock_deadline_ms: if model == CollectionModel::Queue {
                Some(
                    queue_policies
                        .get(&collection_name)
                        .map(|p| p.lock_deadline_ms)
                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
                )
            } else {
                None
            },
            queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
                Some(
                    queue_policies
                        .get(&collection_name)
                        .map(|p| p.in_flight_cap_per_group)
                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
                )
            } else {
                None
            },
            // No default here: a missing policy row or missing DLQ stays `None`.
            queue_dlq_target: if model == CollectionModel::Queue {
                queue_policies
                    .get(&collection_name)
                    .and_then(|p| p.dlq_target.clone())
            } else {
                None
            },
            // The fields below are copied straight from the contract.
            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
            vector_metric: contract.and_then(|contract| contract.vector_metric),
            session_key: contract.and_then(|contract| contract.session_key.clone()),
            session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
            retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
            declared_schema_mode: contract.map(|contract| contract.schema_mode),
            observed_schema_mode: inferred_schema_mode,
            entities: entity_count,
            cross_refs,
            segments,
            indices: infer_indices(&collection_name, model, index_catalog, declarations),
            declared_indices,
            operational_indices,
            indexes_in_sync,
            missing_operational_indices,
            undeclared_operational_indices,
            queryable_index_count,
            indexes_requiring_rebuild_count,
            queryable_graph_projection_count,
            graph_projections_requiring_rematerialization_count,
            executable_analytics_job_count,
            analytics_jobs_requiring_rerun_count,
            subscriptions: contract
                .map(|contract| contract.subscriptions.clone())
                .unwrap_or_default(),
            analytics_config: contract
                .map(|contract| contract.analytics_config.clone())
                .unwrap_or_default(),
            ai_policy: contract.and_then(|contract| contract.ai_policy.clone()),
            resources_in_sync,
            attention_required,
            attention_score,
            attention_reasons,
        });
    }

    // Present collections in a stable, name-sorted order.
    collections.sort_by(|left, right| left.name.cmp(&right.name));

    let summary = CatalogSnapshot {
        name: name.to_string(),
        total_entities: stats_by_collection
            .values()
            .map(|stats| stats.entities)
            .sum(),
        total_collections: stats_by_collection.len(),
        stats_by_collection,
        updated_at: SystemTime::now(),
    };

    CatalogModelSnapshot {
        summary,
        collections,
        indices: index_catalog
            .map(IndexCatalog::snapshot)
            .unwrap_or_default(),
        // Declaration lists are copied verbatim; all are empty when no
        // declarations were supplied.
        declared_indexes: declarations
            .map(|declarations| declarations.declared_indexes.clone())
            .unwrap_or_default(),
        declared_graph_projections: declarations
            .map(|declarations| declarations.declared_graph_projections.clone())
            .unwrap_or_default(),
        declared_analytics_jobs: declarations
            .map(|declarations| declarations.declared_analytics_jobs.clone())
            .unwrap_or_default(),
        operational_indexes: declarations
            .map(|declarations| declarations.operational_indexes.clone())
            .unwrap_or_default(),
        operational_graph_projections: declarations
            .map(|declarations| declarations.operational_graph_projections.clone())
            .unwrap_or_default(),
        operational_analytics_jobs: declarations
            .map(|declarations| declarations.operational_analytics_jobs.clone())
            .unwrap_or_default(),
        // Store-wide tallies; computed before the status lists are moved
        // into the snapshot at the end of this literal.
        queryable_index_count: index_statuses
            .iter()
            .filter(|status| status.queryable)
            .count(),
        indexes_requiring_rebuild_count: index_statuses
            .iter()
            .filter(|status| status.requires_rebuild)
            .count(),
        queryable_graph_projection_count: graph_projection_statuses
            .iter()
            .filter(|status| status.queryable)
            .count(),
        graph_projections_requiring_rematerialization_count: graph_projection_statuses
            .iter()
            .filter(|status| status.requires_rematerialization)
            .count(),
        executable_analytics_job_count: analytics_job_statuses
            .iter()
            .filter(|status| status.executable)
            .count(),
        analytics_jobs_requiring_rerun_count: analytics_job_statuses
            .iter()
            .filter(|status| status.requires_rerun)
            .count(),
        index_statuses,
        graph_projection_statuses,
        analytics_job_statuses,
    }
}
513
/// Hot-fields snapshot of per-queue policy, materialised at catalog
/// snapshot time so `QueueLifecycle` (and the descriptor) can read
/// values without re-scanning `red_queue_meta`.
///
/// Built from `queue_config` rows; a field missing from the row takes the
/// corresponding default.
#[derive(Debug, Clone)]
pub struct QueuePolicySnapshot {
    /// Parsed from the row's `mode` text; the `QueueMode` default when the
    /// field is absent or does not parse.
    pub mode: QueueMode,
    /// From the row's `max_attempts` field.
    pub max_attempts: u32,
    /// From the row's `lock_deadline_ms` field.
    pub lock_deadline_ms: u64,
    /// From the row's `in_flight_cap_per_group` field.
    pub in_flight_cap_per_group: u32,
    /// From the row's `dlq` text field; `None` when absent.
    pub dlq_target: Option<String>,
}
525
526fn queue_policies_from_grouped(
527    grouped: &HashMap<String, Vec<UnifiedEntity>>,
528) -> HashMap<String, QueuePolicySnapshot> {
529    let Some(meta) = grouped.get("red_queue_meta") else {
530        return HashMap::new();
531    };
532
533    let mut policies = HashMap::new();
534    for entity in meta {
535        let Some(row) = entity.data.as_row() else {
536            continue;
537        };
538        if row_text(row, "kind").as_deref() != Some("queue_config") {
539            continue;
540        }
541        let Some(queue) = row_text(row, "queue") else {
542            continue;
543        };
544        let mode = row_text(row, "mode")
545            .as_deref()
546            .and_then(QueueMode::parse)
547            .unwrap_or_default();
548        let max_attempts = row_u64(row, "max_attempts")
549            .map(|v| v as u32)
550            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
551        let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
552            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
553        let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
554            .map(|v| v as u32)
555            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
556        let dlq_target = row_text(row, "dlq");
557        policies.insert(
558            queue,
559            QueuePolicySnapshot {
560                mode,
561                max_attempts,
562                lock_deadline_ms,
563                in_flight_cap_per_group,
564                dlq_target,
565            },
566        );
567    }
568    policies
569}
570
571fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
572    match row.get_field(key) {
573        Some(Value::UnsignedInteger(v)) => Some(*v),
574        Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
575        _ => None,
576    }
577}
578
579fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
580    match row.get_field(key) {
581        Some(Value::Text(value)) => Some(value.to_string()),
582        _ => None,
583    }
584}
585
586fn collection_contract<'a>(
587    collection_name: &str,
588    contracts: Option<&'a [CollectionContract]>,
589) -> Option<&'a CollectionContract> {
590    contracts.and_then(|contracts| {
591        contracts
592            .iter()
593            .find(|contract| contract.name == collection_name)
594    })
595}
596
597fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
598    let mut has_table = false;
599    let mut has_graph = false;
600    let mut has_vector = false;
601
602    for entity in entities {
603        match &entity.kind {
604            EntityKind::TableRow { .. } => has_table = true,
605            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
606            EntityKind::Vector { .. } => has_vector = true,
607            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
608        }
609    }
610
611    match (has_table, has_graph, has_vector) {
612        (true, false, false) => CollectionModel::Table,
613        (false, true, false) => CollectionModel::Graph,
614        (false, false, true) => CollectionModel::Vector,
615        _ => CollectionModel::Mixed,
616    }
617}
618
619fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
620    match model {
621        CollectionModel::Table => SchemaMode::Strict,
622        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
623        CollectionModel::Document
624        | CollectionModel::Hll
625        | CollectionModel::Sketch
626        | CollectionModel::Filter
627        | CollectionModel::Kv
628        | CollectionModel::Config
629        | CollectionModel::Vault
630        | CollectionModel::Mixed => SchemaMode::Dynamic,
631        CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
632        CollectionModel::Queue => SchemaMode::Dynamic,
633    }
634}
635
636fn infer_indices(
637    collection_name: &str,
638    model: CollectionModel,
639    index_catalog: Option<&IndexCatalog>,
640    declarations: Option<&CatalogDeclarations>,
641) -> Vec<String> {
642    let declared = declared_indices(collection_name, declarations);
643    if !declared.is_empty() {
644        return declared;
645    }
646
647    let available = index_catalog
648        .map(IndexCatalog::snapshot)
649        .unwrap_or_default();
650    let mut selected = Vec::new();
651
652    for metric in available {
653        let relevant = matches!(
654            (model, metric.kind),
655            (_, IndexKind::BTree)
656                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
657                | (CollectionModel::Vector, IndexKind::VectorHnsw)
658                | (CollectionModel::Vector, IndexKind::VectorInverted)
659                | (CollectionModel::Vector, IndexKind::VectorTurbo)
660                | (CollectionModel::Document, IndexKind::FullText)
661                | (CollectionModel::Document, IndexKind::DocumentPathValue)
662                | (CollectionModel::Kv, IndexKind::Hash)
663                | (CollectionModel::Config, IndexKind::Hash)
664                | (CollectionModel::Vault, IndexKind::Hash)
665                | (CollectionModel::Mixed, _)
666        );
667
668        if relevant && metric.enabled {
669            selected.push(metric.name);
670        }
671    }
672
673    selected
674}
675
676fn declared_indices(
677    collection_name: &str,
678    declarations: Option<&CatalogDeclarations>,
679) -> Vec<String> {
680    let mut selected = declarations
681        .map(|declarations| {
682            declarations
683                .declared_indexes
684                .iter()
685                .filter(|index| index.collection.as_deref() == Some(collection_name))
686                .map(|index| index.name.clone())
687                .collect::<Vec<_>>()
688        })
689        .unwrap_or_default();
690    selected.sort();
691    selected.dedup();
692    selected
693}
694
695fn operational_indices(
696    collection_name: &str,
697    index_catalog: Option<&IndexCatalog>,
698    declarations: Option<&CatalogDeclarations>,
699) -> Vec<String> {
700    if let Some(declarations) = declarations {
701        let mut selected = declarations
702            .operational_indexes
703            .iter()
704            .filter(|index| index.collection.as_deref() == Some(collection_name))
705            .map(|index| index.name.clone())
706            .collect::<Vec<_>>();
707        selected.sort();
708        selected.dedup();
709        return selected;
710    }
711
712    let mut selected = index_catalog
713        .map(IndexCatalog::snapshot)
714        .unwrap_or_default()
715        .into_iter()
716        .filter(|metric| metric.enabled)
717        .filter_map(|metric| {
718            if metric.name.starts_with(collection_name) {
719                Some(metric.name)
720            } else {
721                None
722            }
723        })
724        .collect::<Vec<_>>();
725    selected.sort();
726    selected.dedup();
727    selected
728}
729
/// Compares declared and operational index names as sets.
///
/// Returns `(in_sync, missing, undeclared)` where `missing` holds the names
/// that are declared but not operational, `undeclared` the names that are
/// operational but not declared, and `in_sync` is true when both lists are
/// empty. Both lists are sorted ascending and free of duplicates.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared: BTreeSet<&String> = declared_indices.iter().collect();
    let operational: BTreeSet<&String> = operational_indices.iter().collect();

    let missing: Vec<String> = declared
        .iter()
        .copied()
        .filter(|name| !operational.contains(name))
        .cloned()
        .collect();
    let undeclared: Vec<String> = operational
        .iter()
        .copied()
        .filter(|name| !declared.contains(name))
        .cloned()
        .collect();

    let in_sync = missing.is_empty() && undeclared.is_empty();
    (in_sync, missing, undeclared)
}
750
751fn collection_index_readiness(
752    collection_name: &str,
753    statuses: &[CatalogIndexStatus],
754) -> (usize, usize, bool) {
755    let local = statuses
756        .iter()
757        .filter(|status| status.collection.as_deref() == Some(collection_name))
758        .collect::<Vec<_>>();
759    let queryable_count = local.iter().filter(|status| status.queryable).count();
760    let requires_rebuild_count = local
761        .iter()
762        .filter(|status| status.requires_rebuild)
763        .count();
764    let locally_in_sync = local.iter().all(|status| status.in_sync);
765    (queryable_count, requires_rebuild_count, locally_in_sync)
766}
767
768fn collection_graph_projection_readiness(
769    collection_name: &str,
770    statuses: &[CatalogGraphProjectionStatus],
771) -> (usize, usize, bool) {
772    let local = statuses
773        .iter()
774        .filter(|status| status.source.as_deref() == Some(collection_name))
775        .collect::<Vec<_>>();
776    let queryable_count = local.iter().filter(|status| status.queryable).count();
777    let requires_rematerialization_count = local
778        .iter()
779        .filter(|status| status.requires_rematerialization)
780        .count();
781    let locally_in_sync = local
782        .iter()
783        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
784    (
785        queryable_count,
786        requires_rematerialization_count,
787        locally_in_sync,
788    )
789}
790
791fn collection_analytics_job_readiness(
792    collection_name: &str,
793    projection_statuses: &[CatalogGraphProjectionStatus],
794    job_statuses: &[CatalogAnalyticsJobStatus],
795) -> (usize, usize, bool) {
796    let local = job_statuses
797        .iter()
798        .filter(|status| {
799            let Some(projection_name) = status.projection.as_deref() else {
800                return false;
801            };
802            projection_statuses
803                .iter()
804                .find(|projection| projection.name == projection_name)
805                .and_then(|projection| projection.source.as_deref())
806                == Some(collection_name)
807        })
808        .collect::<Vec<_>>();
809    let executable_count = local.iter().filter(|status| status.executable).count();
810    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
811    let locally_in_sync = local
812        .iter()
813        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
814    (executable_count, requires_rerun_count, locally_in_sync)
815}
816
817fn graph_projection_statuses(
818    declarations: Option<&CatalogDeclarations>,
819) -> Vec<CatalogGraphProjectionStatus> {
820    let declared = declarations
821        .map(|declarations| declarations.declared_graph_projections.as_slice())
822        .unwrap_or(&[]);
823    let operational = declarations
824        .map(|declarations| declarations.operational_graph_projections.as_slice())
825        .unwrap_or(&[]);
826    let declared_jobs = declarations
827        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
828        .unwrap_or(&[]);
829    let operational_jobs = declarations
830        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
831        .unwrap_or(&[]);
832
833    let mut names = declared
834        .iter()
835        .map(|projection| projection.name.clone())
836        .chain(operational.iter().map(|projection| projection.name.clone()))
837        .collect::<Vec<_>>();
838    names.sort();
839    names.dedup();
840
841    names
842        .into_iter()
843        .map(|name| {
844            let declared_projection = declared.iter().find(|projection| projection.name == name);
845            let operational_projection = operational
846                .iter()
847                .find(|projection| projection.name == name);
848            let declared_present = declared_projection.is_some();
849            let operational_present = operational_projection.is_some();
850            let mut dependent_job_ids = BTreeSet::new();
851            let mut dependent_job_count = 0usize;
852            let mut active_dependent_job_count = 0usize;
853            let mut stale_dependent_job_count = 0usize;
854            for job in declared_jobs
855                .iter()
856                .chain(operational_jobs.iter())
857                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
858            {
859                if !dependent_job_ids.insert(job.id.clone()) {
860                    continue;
861                }
862                dependent_job_count += 1;
863                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
864                    active_dependent_job_count += 1;
865                }
866                if job.state == "stale" {
867                    stale_dependent_job_count += 1;
868                }
869            }
870            let lifecycle_state = match (
871                declared_present,
872                operational_present,
873                declared_projection
874                    .map(|projection| projection.state.as_str())
875                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
876                    .unwrap_or_default(),
877            ) {
878                (true, _, "failed") => "failed",
879                (true, true, "stale") => "stale",
880                (true, _, "materializing") => "materializing",
881                (true, true, "materialized") => "materialized",
882                (true, true, _) => "materialized",
883                (false, true, _) => "orphaned-operational",
884                (true, false, _) => "declared",
885                (false, false, _) => "unknown",
886            }
887            .to_string();
888            let queryable = declared_present
889                && operational_present
890                && matches!(
891                    declared_projection
892                        .map(|projection| projection.state.as_str())
893                        .or_else(
894                            || operational_projection.map(|projection| projection.state.as_str())
895                        )
896                        .unwrap_or_default(),
897                    "materialized"
898                );
899            let requires_rematerialization = matches!(
900                declared_projection
901                    .map(|projection| projection.state.as_str())
902                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
903                    .unwrap_or_default(),
904                "declared" | "materializing" | "failed" | "stale"
905            );
906            let mut attention_reasons = Vec::new();
907            if !declared_present || !operational_present {
908                attention_reasons.push("declaration_drift".to_string());
909            }
910            if requires_rematerialization {
911                attention_reasons.push("requires_rematerialization".to_string());
912            }
913            if stale_dependent_job_count > 0 {
914                attention_reasons.push("dependent_jobs_stale".to_string());
915            }
916            let attention_score = stale_dependent_job_count.saturating_mul(2)
917                + usize::from(requires_rematerialization).saturating_mul(4)
918                + usize::from(!declared_present || !operational_present)
919                + usize::from(!queryable);
920            CatalogGraphProjectionStatus {
921                name,
922                source: declared_projection
923                    .map(|projection| projection.source.clone())
924                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
925                lifecycle_state,
926                declared: declared_present,
927                operational: operational_present,
928                in_sync: declared_present == operational_present,
929                last_materialized_sequence: declared_projection
930                    .and_then(|projection| projection.last_materialized_sequence)
931                    .or_else(|| {
932                        operational_projection
933                            .and_then(|projection| projection.last_materialized_sequence)
934                    }),
935                queryable,
936                requires_rematerialization,
937                dependent_job_count,
938                active_dependent_job_count,
939                stale_dependent_job_count,
940                dependent_jobs_in_sync: stale_dependent_job_count == 0,
941                rerun_required: stale_dependent_job_count > 0,
942                attention_score,
943                attention_reasons,
944            }
945        })
946        .collect()
947}
948
949fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
950    let declared = declarations
951        .map(|declarations| declarations.declared_indexes.as_slice())
952        .unwrap_or(&[]);
953    let operational = declarations
954        .map(|declarations| declarations.operational_indexes.as_slice())
955        .unwrap_or(&[]);
956
957    let mut names = declared
958        .iter()
959        .map(|index| index.name.clone())
960        .chain(operational.iter().map(|index| index.name.clone()))
961        .collect::<Vec<_>>();
962    names.sort();
963    names.dedup();
964
965    names
966        .into_iter()
967        .map(|name| {
968            let declared_index = declared.iter().find(|index| index.name == name);
969            let operational_index = operational.iter().find(|index| index.name == name);
970            let collection = declared_index
971                .and_then(|index| index.collection.clone())
972                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
973            let kind = declared_index
974                .map(|index| index_kind_string(index.kind))
975                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
976                .unwrap_or_default();
977            let enabled = declared_index
978                .map(|index| index.enabled)
979                .or_else(|| operational_index.map(|index| index.enabled))
980                .unwrap_or(false);
981            let build_state = operational_index
982                .map(|index| index.build_state.clone())
983                .or_else(|| declared_index.map(|index| index.build_state.clone()));
984            let declared_present = declared_index.is_some();
985            let operational_present = operational_index.is_some();
986            let lifecycle_state = index_lifecycle_state(
987                declared_present,
988                operational_present,
989                enabled,
990                build_state.as_deref(),
991            );
992
993            let mut attention_reasons = Vec::new();
994            if declared_present != operational_present {
995                attention_reasons.push("declaration_drift".to_string());
996            }
997            if !enabled && declared_present {
998                attention_reasons.push("disabled".to_string());
999            }
1000            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1001                attention_reasons.push("failed".to_string());
1002            }
1003            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1004                attention_reasons.push("stale".to_string());
1005            }
1006            if matches!(
1007                build_state.as_deref().unwrap_or_default(),
1008                "building" | "declared-unbuilt"
1009            ) {
1010                attention_reasons.push("requires_rebuild".to_string());
1011            }
1012            let queryable = declared_present
1013                && operational_present
1014                && enabled
1015                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1016            let requires_rebuild = matches!(
1017                build_state.as_deref().unwrap_or_default(),
1018                "declared-unbuilt" | "building" | "stale" | "failed"
1019            );
1020            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1021                + usize::from(!enabled && declared_present)
1022                + usize::from(declared_present != operational_present)
1023                + usize::from(!queryable);
1024
1025            let artifact_state = crate::physical::ArtifactState::from_build_state(
1026                build_state.as_deref().unwrap_or("declared-unbuilt"),
1027                enabled,
1028            );
1029            CatalogIndexStatus {
1030                name,
1031                collection,
1032                kind,
1033                declared: declared_present,
1034                operational: operational_present,
1035                enabled,
1036                build_state,
1037                artifact_state,
1038                lifecycle_state,
1039                in_sync: declared_present == operational_present,
1040                queryable,
1041                requires_rebuild,
1042                attention_score,
1043                attention_reasons,
1044            }
1045        })
1046        .collect()
1047}
1048
/// Derives the lifecycle label of an index from its presence flags, its
/// `enabled` flag and its build state.
///
/// Resolution order:
///
/// 1. operational but not declared → `"orphaned-operational"`;
/// 2. declared but not enabled → `"disabled"`;
/// 3. neither declared nor operational → `"unknown"`;
/// 4. declared but not operational → `"declared"`;
/// 5. otherwise the build state decides: `"ready"`, `"failed"` and `"stale"`
///    map to themselves, `"declared-unbuilt"` maps to `"declared"`, and every
///    other value (including a missing build state) maps to `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (false, false) => "unknown",
        (true, _) if !enabled => "disabled",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Known intermediate build states, listed explicitly for
            // readers; they share the label of the catch-all arm below.
            "catalog-derived" | "metadata-only" | "artifact-published" | "registry-loaded" => {
                "building"
            }
            _ => "building",
        },
    };
    label.to_string()
}
1079
1080fn index_kind_string(kind: IndexKind) -> String {
1081    match kind {
1082        IndexKind::BTree => "btree",
1083        IndexKind::Hash => "hash",
1084        IndexKind::Bitmap => "bitmap",
1085        IndexKind::Spatial => "spatial.rtree",
1086        IndexKind::VectorHnsw => "vector.hnsw",
1087        IndexKind::VectorInverted => "vector.inverted",
1088        IndexKind::VectorTurbo => "vector.turbo",
1089        IndexKind::GraphAdjacency => "graph.adjacency",
1090        IndexKind::FullText => "text.fulltext",
1091        IndexKind::DocumentPathValue => "document.pathvalue",
1092        IndexKind::HybridSearch => "search.hybrid",
1093    }
1094    .to_string()
1095}
1096
1097fn analytics_job_statuses(
1098    declarations: Option<&CatalogDeclarations>,
1099) -> Vec<CatalogAnalyticsJobStatus> {
1100    let declared = declarations
1101        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1102        .unwrap_or(&[]);
1103    let operational = declarations
1104        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1105        .unwrap_or(&[]);
1106    let declared_projections = declarations
1107        .map(|declarations| declarations.declared_graph_projections.as_slice())
1108        .unwrap_or(&[]);
1109    let operational_projections = declarations
1110        .map(|declarations| declarations.operational_graph_projections.as_slice())
1111        .unwrap_or(&[]);
1112
1113    let mut ids = declared
1114        .iter()
1115        .map(|job| job.id.clone())
1116        .chain(operational.iter().map(|job| job.id.clone()))
1117        .collect::<Vec<_>>();
1118    ids.sort();
1119    ids.dedup();
1120
1121    ids.into_iter()
1122        .map(|id| {
1123            let declared_job = declared.iter().find(|job| job.id == id);
1124            let operational_job = operational.iter().find(|job| job.id == id);
1125            let kind = declared_job
1126                .map(|job| job.kind.clone())
1127                .or_else(|| operational_job.map(|job| job.kind.clone()))
1128                .unwrap_or_default();
1129            let projection = declared_job
1130                .and_then(|job| job.projection.clone())
1131                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1132            let state = declared_job
1133                .map(|job| job.state.clone())
1134                .or_else(|| operational_job.map(|job| job.state.clone()))
1135                .unwrap_or_default();
1136            let declared_present = declared_job.is_some();
1137            let operational_present = operational_job.is_some();
1138            let last_run_sequence = declared_job
1139                .and_then(|job| job.last_run_sequence)
1140                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1141            let projection_declared = projection.as_ref().map(|projection_name| {
1142                declared_projections
1143                    .iter()
1144                    .any(|projection| projection.name == *projection_name)
1145            });
1146            let projection_operational = projection.as_ref().map(|projection_name| {
1147                operational_projections
1148                    .iter()
1149                    .any(|projection| projection.name == *projection_name)
1150            });
1151            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1152                declared_projections
1153                    .iter()
1154                    .find(|projection| projection.name == *projection_name)
1155                    .map(|projection| projection.state.as_str())
1156                    .or_else(|| {
1157                        operational_projections
1158                            .iter()
1159                            .find(|projection| projection.name == *projection_name)
1160                            .map(|projection| projection.state.as_str())
1161                    })
1162            });
1163            let dependency_in_sync = projection.as_ref().map(|_| {
1164                matches!(projection_lifecycle, Some("materialized"))
1165                    && projection_operational == Some(true)
1166            });
1167            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1168                (true, false, _) => "declared",
1169                (true, true, _)
1170                    if matches!(
1171                        projection_lifecycle,
1172                        Some("stale" | "failed" | "materializing" | "declared")
1173                    ) =>
1174                {
1175                    "stale"
1176                }
1177                (true, true, "completed") => "completed",
1178                (true, true, "running") => "running",
1179                (true, true, "failed") => "failed",
1180                (true, true, "queued") => "queued",
1181                (true, true, "stale") => "stale",
1182                (true, true, _) => "materialized",
1183                (false, true, _) => "orphaned-operational",
1184                (false, false, _) => "unknown",
1185            }
1186            .to_string();
1187            let executable = declared_present
1188                && operational_present
1189                && !matches!(state.as_str(), "failed" | "stale")
1190                && dependency_in_sync.unwrap_or(true);
1191            let requires_rerun =
1192                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1193            let mut attention_reasons = Vec::new();
1194            if declared_present != operational_present {
1195                attention_reasons.push("declaration_drift".to_string());
1196            }
1197            if matches!(state.as_str(), "failed") {
1198                attention_reasons.push("failed".to_string());
1199            }
1200            if matches!(state.as_str(), "stale") {
1201                attention_reasons.push("stale".to_string());
1202            }
1203            if dependency_in_sync == Some(false) {
1204                attention_reasons.push("dependency_drift".to_string());
1205            }
1206            if requires_rerun {
1207                attention_reasons.push("requires_rerun".to_string());
1208            }
1209            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1210                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1211                + usize::from(declared_present != operational_present)
1212                + usize::from(!executable);
1213            CatalogAnalyticsJobStatus {
1214                id,
1215                kind,
1216                projection,
1217                state: state.clone(),
1218                lifecycle_state,
1219                declared: declared_present,
1220                operational: operational_present,
1221                in_sync: declared_present == operational_present,
1222                last_run_sequence,
1223                projection_declared,
1224                projection_operational,
1225                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1226                dependency_in_sync,
1227                executable,
1228                requires_rerun,
1229                attention_score,
1230                attention_reasons,
1231            }
1232        })
1233        .collect()
1234}
1235
1236pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1237    let declared_indexes = snapshot
1238        .declared_indexes
1239        .iter()
1240        .map(|index| index.name.clone())
1241        .collect::<BTreeSet<_>>();
1242    let operational_indexes = snapshot
1243        .operational_indexes
1244        .iter()
1245        .map(|index| index.name.clone())
1246        .collect::<BTreeSet<_>>();
1247    let declared_graph_projections = snapshot
1248        .declared_graph_projections
1249        .iter()
1250        .map(|projection| projection.name.clone())
1251        .collect::<BTreeSet<_>>();
1252    let operational_graph_projections = snapshot
1253        .operational_graph_projections
1254        .iter()
1255        .map(|projection| projection.name.clone())
1256        .collect::<BTreeSet<_>>();
1257    let declared_analytics_jobs = snapshot
1258        .declared_analytics_jobs
1259        .iter()
1260        .map(|job| job.id.clone())
1261        .collect::<BTreeSet<_>>();
1262    let operational_analytics_jobs = snapshot
1263        .operational_analytics_jobs
1264        .iter()
1265        .map(|job| job.id.clone())
1266        .collect::<BTreeSet<_>>();
1267
1268    CatalogConsistencyReport {
1269        declared_index_count: declared_indexes.len(),
1270        operational_index_count: operational_indexes.len(),
1271        declared_graph_projection_count: declared_graph_projections.len(),
1272        operational_graph_projection_count: operational_graph_projections.len(),
1273        declared_analytics_job_count: declared_analytics_jobs.len(),
1274        operational_analytics_job_count: operational_analytics_jobs.len(),
1275        missing_operational_indexes: declared_indexes
1276            .difference(&operational_indexes)
1277            .cloned()
1278            .collect(),
1279        undeclared_operational_indexes: operational_indexes
1280            .difference(&declared_indexes)
1281            .cloned()
1282            .collect(),
1283        missing_operational_graph_projections: declared_graph_projections
1284            .difference(&operational_graph_projections)
1285            .cloned()
1286            .collect(),
1287        undeclared_operational_graph_projections: operational_graph_projections
1288            .difference(&declared_graph_projections)
1289            .cloned()
1290            .collect(),
1291        missing_operational_analytics_jobs: declared_analytics_jobs
1292            .difference(&operational_analytics_jobs)
1293            .cloned()
1294            .collect(),
1295        undeclared_operational_analytics_jobs: operational_analytics_jobs
1296            .difference(&declared_analytics_jobs)
1297            .cloned()
1298            .collect(),
1299    }
1300}
1301
1302pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1303    CatalogAttentionSummary {
1304        collections_requiring_attention: snapshot
1305            .collections
1306            .iter()
1307            .filter(|collection| collection.attention_required)
1308            .count(),
1309        indexes_requiring_attention: snapshot
1310            .index_statuses
1311            .iter()
1312            .filter(|status| status.attention_score > 0)
1313            .count(),
1314        graph_projections_requiring_attention: snapshot
1315            .graph_projection_statuses
1316            .iter()
1317            .filter(|status| status.attention_score > 0)
1318            .count(),
1319        analytics_jobs_requiring_attention: snapshot
1320            .analytics_job_statuses
1321            .iter()
1322            .filter(|status| status.attention_score > 0)
1323            .count(),
1324        top_collection: snapshot
1325            .collections
1326            .iter()
1327            .filter(|collection| collection.attention_score > 0)
1328            .max_by_key(|collection| collection.attention_score)
1329            .cloned(),
1330        top_index: snapshot
1331            .index_statuses
1332            .iter()
1333            .filter(|status| status.attention_score > 0)
1334            .max_by_key(|status| status.attention_score)
1335            .cloned(),
1336        top_graph_projection: snapshot
1337            .graph_projection_statuses
1338            .iter()
1339            .filter(|status| status.attention_score > 0)
1340            .max_by_key(|status| status.attention_score)
1341            .cloned(),
1342        top_analytics_job: snapshot
1343            .analytics_job_statuses
1344            .iter()
1345            .filter(|status| status.attention_score > 0)
1346            .max_by_key(|status| status.attention_score)
1347            .cloned(),
1348    }
1349}
1350
1351pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1352    let mut collections = snapshot
1353        .collections
1354        .iter()
1355        .filter(|collection| collection.attention_required)
1356        .cloned()
1357        .collect::<Vec<_>>();
1358    collections.sort_by(|left, right| {
1359        right
1360            .attention_score
1361            .cmp(&left.attention_score)
1362            .then_with(|| left.name.cmp(&right.name))
1363    });
1364    collections
1365}
1366
1367pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1368    let mut statuses = snapshot
1369        .index_statuses
1370        .iter()
1371        .filter(|status| status.attention_score > 0)
1372        .cloned()
1373        .collect::<Vec<_>>();
1374    statuses.sort_by(|left, right| {
1375        right
1376            .attention_score
1377            .cmp(&left.attention_score)
1378            .then_with(|| left.name.cmp(&right.name))
1379    });
1380    statuses
1381}
1382
1383pub fn graph_projection_attention(
1384    snapshot: &CatalogModelSnapshot,
1385) -> Vec<CatalogGraphProjectionStatus> {
1386    let mut statuses = snapshot
1387        .graph_projection_statuses
1388        .iter()
1389        .filter(|status| status.attention_score > 0)
1390        .cloned()
1391        .collect::<Vec<_>>();
1392    statuses.sort_by(|left, right| {
1393        right
1394            .attention_score
1395            .cmp(&left.attention_score)
1396            .then_with(|| left.name.cmp(&right.name))
1397    });
1398    statuses
1399}
1400
1401pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1402    let mut statuses = snapshot
1403        .analytics_job_statuses
1404        .iter()
1405        .filter(|status| status.attention_score > 0)
1406        .cloned()
1407        .collect::<Vec<_>>();
1408    statuses.sort_by(|left, right| {
1409        right
1410            .attention_score
1411            .cmp(&left.attention_score)
1412            .then_with(|| left.id.cmp(&right.id))
1413    });
1414    statuses
1415}