Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Data model of a collection.
///
/// The effective model of a collection is the one declared by its persisted
/// `CollectionContract` when a contract exists; otherwise it is inferred from
/// the kinds of entities it stores (`infer_model`, which only ever yields
/// `Table`, `Graph`, `Vector` or `Mixed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    /// More than one of the row / graph / vector entity kinds. Also the
    /// inferred fallback when none of those kinds is present.
    Mixed,
    TimeSeries,
    /// Message queue; only queue collections get the queue policy hot fields
    /// on `CollectionDescriptor`.
    Queue,
    Metrics,
}
33
/// How strictly a collection's records are expected to follow a schema.
///
/// Without a contract, the mode is derived from the collection model by
/// `infer_schema_mode`: tables are `Strict`; graph, vector, time-series and
/// metrics collections are `SemiStructured`; every other model is `Dynamic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
40
/// Change operation a subscription can be filtered on
/// (see `SubscriptionDescriptor::ops_filter`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Every operation, in declaration order. `from_str` matches against these.
    const ALL: [Self; 3] = [Self::Insert, Self::Update, Self::Delete];

    /// Canonical upper-case keyword for this operation.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case (`"insert"`,
    /// `"Insert"` and `"INSERT"` all yield [`Self::Insert`]).
    ///
    /// Returns `None` for any other input. The keyword is compared against
    /// [`Self::as_str`] so parsing and printing cannot drift apart, and no
    /// temporary upper-cased `String` is allocated.
    pub fn from_str(value: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|operation| operation.as_str().eq_ignore_ascii_case(value))
    }
}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SubscriptionDescriptor {
69    /// Logical name for this subscription. Empty string for legacy unnamed subscriptions.
70    pub name: String,
71    pub source: String,
72    pub target_queue: String,
73    pub ops_filter: Vec<SubscriptionOperation>,
74    pub where_filter: Option<String>,
75    pub redact_fields: Vec<String>,
76    pub enabled: bool,
77    /// When true, events are routed to the bare `target_queue` regardless of
78    /// the current tenant — a cluster-wide subscription. When false (default),
79    /// events are namespaced as `{tenant}/{target_queue}` whenever a tenant
80    /// context is active, enforcing per-tenant isolation.
81    pub all_tenants: bool,
82}
83
/// A graph-analytics output declared by `CREATE GRAPH ... WITH ANALYTICS (...)`.
///
/// Each variant maps to a family of pure graph algorithms (issues #795-#797)
/// and resolves as a virtual `<graph>.<output>` view returning that family's
/// native row shape. The `using` option selects the concrete algorithm inside
/// the family (e.g. `centrality (using = pagerank)`); the remaining options are
/// algorithm parameters carried verbatim into the executor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AnalyticsOutput {
    Communities,
    Components,
    Centrality,
}

impl AnalyticsOutput {
    /// Every output, in declaration order. `from_str` matches against these.
    const ALL: [Self; 3] = [Self::Communities, Self::Components, Self::Centrality];

    /// Canonical lower-case name, used as the `<output>` in `<graph>.<output>`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Communities => "communities",
            Self::Components => "components",
            Self::Centrality => "centrality",
        }
    }

    /// Parses an output name, ignoring ASCII case.
    ///
    /// Returns `None` for unknown names. The name is compared against
    /// [`Self::as_str`] so parsing and printing cannot drift apart, and no
    /// temporary lower-cased `String` is allocated.
    pub fn from_str(value: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|output| output.as_str().eq_ignore_ascii_case(value))
    }
}
116
117/// One enabled analytics output plus its declared options. Persisted in the
118/// parent graph's `CollectionContract` (WAL-backed) and surfaced on the
119/// `CollectionDescriptor` so the resolver can recognise `<graph>.<output>`.
120///
121/// `SHOW COLLECTIONS` behaviour (issue #800 HITL decision): analytics outputs
122/// resolve as virtual `<graph>.<output>` views and are deliberately **not**
123/// registered as top-level collections. They therefore never appear in
124/// `SHOW COLLECTIONS` — not by default and not under `SHOW COLLECTIONS
125/// INCLUDING INTERNAL` — keeping the parent graph's listing clean. Only the
126/// parent graph collection is listed; its enabled outputs are introspectable
127/// through this `analytics_config` on the parent's descriptor.
128#[derive(Debug, Clone, PartialEq)]
129pub struct AnalyticsViewDescriptor {
130    pub output: AnalyticsOutput,
131    /// `using = <algorithm>` — concrete algorithm within the output family.
132    /// `None` resolves to the family default (louvain / connected-components /
133    /// pagerank).
134    pub algorithm: Option<String>,
135    /// `resolution = <f64>` — Louvain resolution (γ) for `communities`.
136    pub resolution: Option<f64>,
137    /// `max_iterations = <i64>` — iteration cap for iterative centralities.
138    pub max_iterations: Option<i64>,
139    /// `tolerance = <f64>` — convergence tolerance for iterative centralities.
140    pub tolerance: Option<f64>,
141}
142
143#[derive(Debug, Clone)]
144pub struct CollectionDescriptor {
145    pub name: String,
146    pub model: CollectionModel,
147    pub schema_mode: SchemaMode,
148    pub contract_present: bool,
149    pub contract_origin: Option<crate::physical::ContractOrigin>,
150    pub declared_model: Option<CollectionModel>,
151    pub observed_model: CollectionModel,
152    pub queue_mode: Option<QueueMode>,
153    /// MAX_ATTEMPTS per-queue policy. Hot-fields tier — populated for
154    /// `model = queue` from the catalog snapshot for sub-ms reads by
155    /// `QueueLifecycle`. `None` for non-queue collections.
156    pub queue_max_attempts: Option<u32>,
157    /// LOCK_DEADLINE_MS per-queue policy. Hot-fields tier.
158    pub queue_lock_deadline_ms: Option<u64>,
159    /// IN_FLIGHT_CAP_PER_GROUP per-queue policy. Hot-fields tier.
160    pub queue_in_flight_cap_per_group: Option<u32>,
161    /// DLQ target collection name. `None` means on-max drop.
162    pub queue_dlq_target: Option<String>,
163    pub vector_dimension: Option<usize>,
164    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
165    /// `WITH SESSION_KEY <col>` declared on the timeseries collection.
166    /// `None` for non-timeseries and for timeseries without the clause.
167    /// Issue #576 slice 1.
168    pub session_key: Option<String>,
169    /// `SESSION_GAP <duration>` in milliseconds declared on the
170    /// timeseries collection. Issue #576 slice 1.
171    pub session_gap_ms: Option<u64>,
172    /// Declarative retention duration in milliseconds. `None` means no
173    /// retention policy is enforced. Set via `ALTER COLLECTION ... SET
174    /// RETENTION <duration>`. Issue #580 slice 1.
175    pub retention_duration_ms: Option<u64>,
176    pub declared_schema_mode: Option<SchemaMode>,
177    pub observed_schema_mode: SchemaMode,
178    pub entities: usize,
179    pub cross_refs: usize,
180    pub segments: usize,
181    pub indices: Vec<String>,
182    pub declared_indices: Vec<String>,
183    pub operational_indices: Vec<String>,
184    pub indexes_in_sync: bool,
185    pub missing_operational_indices: Vec<String>,
186    pub undeclared_operational_indices: Vec<String>,
187    pub queryable_index_count: usize,
188    pub indexes_requiring_rebuild_count: usize,
189    pub queryable_graph_projection_count: usize,
190    pub graph_projections_requiring_rematerialization_count: usize,
191    pub executable_analytics_job_count: usize,
192    pub analytics_jobs_requiring_rerun_count: usize,
193    pub subscriptions: Vec<SubscriptionDescriptor>,
194    /// Analytics views declared by `WITH ANALYTICS (...)`. Empty for
195    /// collections without the clause. Sourced from the persisted
196    /// `CollectionContract` so it survives restarts (issue #800).
197    pub analytics_config: Vec<AnalyticsViewDescriptor>,
198    pub resources_in_sync: bool,
199    pub attention_required: bool,
200    pub attention_score: usize,
201    pub attention_reasons: Vec<String>,
202}
203
204#[derive(Debug, Clone)]
205pub struct CatalogModelSnapshot {
206    pub summary: CatalogSnapshot,
207    pub collections: Vec<CollectionDescriptor>,
208    pub indices: IndexCatalogSnapshot,
209    pub declared_indexes: Vec<PhysicalIndexState>,
210    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
211    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
212    pub operational_indexes: Vec<PhysicalIndexState>,
213    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
214    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
215    pub index_statuses: Vec<CatalogIndexStatus>,
216    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
217    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
218    pub queryable_index_count: usize,
219    pub indexes_requiring_rebuild_count: usize,
220    pub queryable_graph_projection_count: usize,
221    pub graph_projections_requiring_rematerialization_count: usize,
222    pub executable_analytics_job_count: usize,
223    pub analytics_jobs_requiring_rerun_count: usize,
224}
225
226#[derive(Debug, Clone)]
227pub struct CatalogAttentionSummary {
228    pub collections_requiring_attention: usize,
229    pub indexes_requiring_attention: usize,
230    pub graph_projections_requiring_attention: usize,
231    pub analytics_jobs_requiring_attention: usize,
232    pub top_collection: Option<CollectionDescriptor>,
233    pub top_index: Option<CatalogIndexStatus>,
234    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
235    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
236}
237
238#[derive(Debug, Clone, Default)]
239pub struct CatalogDeclarations {
240    pub declared_indexes: Vec<PhysicalIndexState>,
241    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
242    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
243    pub operational_indexes: Vec<PhysicalIndexState>,
244    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
245    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
246}
247
/// Reconciled status of one graph projection (declared vs operational).
///
/// NOTE(review): built by `graph_projection_statuses`, which is outside this
/// part of the file; field semantics beyond the names below are unconfirmed.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name.
    pub name: String,
    /// Source the projection is built from, when known.
    pub source: Option<String>,
    pub lifecycle_state: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    pub in_sync: bool,
    pub last_materialized_sequence: Option<u64>,
    /// Counted into `queryable_graph_projection_count`.
    pub queryable: bool,
    /// Counted into `graph_projections_requiring_rematerialization_count`.
    pub requires_rematerialization: bool,
    pub dependent_job_count: usize,
    pub active_dependent_job_count: usize,
    pub stale_dependent_job_count: usize,
    pub dependent_jobs_in_sync: bool,
    pub rerun_required: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
267
268#[derive(Debug, Clone)]
269pub struct CatalogIndexStatus {
270    pub name: String,
271    pub collection: Option<String>,
272    pub kind: String,
273    pub declared: bool,
274    pub operational: bool,
275    pub enabled: bool,
276    pub build_state: Option<String>,
277    pub artifact_state: crate::physical::ArtifactState,
278    pub lifecycle_state: String,
279    pub in_sync: bool,
280    pub queryable: bool,
281    pub requires_rebuild: bool,
282    pub attention_score: usize,
283    pub attention_reasons: Vec<String>,
284}
285
/// Reconciled status of one analytics job, including the state of the graph
/// projection it depends on (when it has one).
///
/// NOTE(review): built by `analytics_job_statuses`, which is outside this
/// part of the file; field semantics beyond the notes below are unconfirmed.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    /// Job identifier.
    pub id: String,
    pub kind: String,
    /// Graph projection the job runs against, if any.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    /// Dependency details; `None` when the job has no projection.
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    pub dependency_in_sync: Option<bool>,
    /// Counted into `executable_analytics_job_count`.
    pub executable: bool,
    /// Counted into `analytics_jobs_requiring_rerun_count`.
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
306
/// Declared-vs-operational counts and drift lists for each resource type.
///
/// "Missing operational" names were declared but are not operational;
/// "undeclared operational" names are operational but were never declared.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
322
323pub fn snapshot_store(
324    name: &str,
325    store: &UnifiedStore,
326    index_catalog: Option<&IndexCatalog>,
327) -> CatalogModelSnapshot {
328    snapshot_store_with_declarations(name, store, index_catalog, None, None)
329}
330
331pub fn snapshot_store_with_declarations(
332    name: &str,
333    store: &UnifiedStore,
334    index_catalog: Option<&IndexCatalog>,
335    declarations: Option<&CatalogDeclarations>,
336    contracts: Option<&[CollectionContract]>,
337) -> CatalogModelSnapshot {
338    let index_statuses = index_statuses(declarations);
339    let graph_projection_statuses = graph_projection_statuses(declarations);
340    let analytics_job_statuses = analytics_job_statuses(declarations);
341
342    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
343    for (collection, entity) in store.query_all(|_| true) {
344        grouped.entry(collection).or_default().push(entity);
345    }
346    let queue_policies = queue_policies_from_grouped(&grouped);
347
348    let mut stats_by_collection = BTreeMap::new();
349    let mut collections = Vec::new();
350
351    for collection_name in store.list_collections() {
352        let entities = grouped.remove(&collection_name).unwrap_or_default();
353        let inferred_model = infer_model(&entities);
354        let inferred_schema_mode = infer_schema_mode(inferred_model);
355        let contract = collection_contract(&collection_name, contracts);
356        let model = contract
357            .map(|contract| contract.declared_model)
358            .unwrap_or(inferred_model);
359        let cross_refs = entities
360            .iter()
361            .map(|entity| entity.cross_refs().len())
362            .sum();
363        let entity_count = entities.len();
364        let manager_stats = store
365            .get_collection(&collection_name)
366            .map(|manager| manager.stats());
367        let segments = manager_stats
368            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
369            .unwrap_or(0);
370
371        stats_by_collection.insert(
372            collection_name.clone(),
373            CollectionStats {
374                entities: entity_count,
375                cross_refs,
376                segments,
377            },
378        );
379
380        let declared_indices = declared_indices(&collection_name, declarations);
381        let operational_indices =
382            operational_indices(&collection_name, index_catalog, declarations);
383        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
384            collection_index_consistency(&declared_indices, &operational_indices);
385        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
386            collection_index_readiness(&collection_name, &index_statuses);
387        let (
388            queryable_graph_projection_count,
389            graph_projections_requiring_rematerialization_count,
390            graph_projections_locally_in_sync,
391        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
392        let (
393            executable_analytics_job_count,
394            analytics_jobs_requiring_rerun_count,
395            analytics_jobs_locally_in_sync,
396        ) = collection_analytics_job_readiness(
397            &collection_name,
398            &graph_projection_statuses,
399            &analytics_job_statuses,
400        );
401        let resources_in_sync = indexes_in_sync
402            && indexes_locally_in_sync
403            && graph_projections_locally_in_sync
404            && analytics_jobs_locally_in_sync;
405        let attention_required = !resources_in_sync
406            || indexes_requiring_rebuild_count > 0
407            || graph_projections_requiring_rematerialization_count > 0
408            || analytics_jobs_requiring_rerun_count > 0;
409        let mut attention_reasons = Vec::new();
410        if !indexes_in_sync || !indexes_locally_in_sync {
411            attention_reasons.push("index_drift".to_string());
412        }
413        if indexes_requiring_rebuild_count > 0 {
414            attention_reasons.push("indexes_require_rebuild".to_string());
415        }
416        if !graph_projections_locally_in_sync {
417            attention_reasons.push("graph_projection_drift".to_string());
418        }
419        if graph_projections_requiring_rematerialization_count > 0 {
420            attention_reasons.push("graph_projections_require_rematerialization".to_string());
421        }
422        if !analytics_jobs_locally_in_sync {
423            attention_reasons.push("analytics_job_drift".to_string());
424        }
425        if analytics_jobs_requiring_rerun_count > 0 {
426            attention_reasons.push("analytics_jobs_require_rerun".to_string());
427        }
428        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
429            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
430            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
431            + usize::from(!resources_in_sync);
432
433        collections.push(CollectionDescriptor {
434            name: collection_name.clone(),
435            model,
436            schema_mode: contract
437                .map(|contract| contract.schema_mode)
438                .unwrap_or(inferred_schema_mode),
439            contract_present: contract.is_some(),
440            contract_origin: contract.map(|contract| contract.origin),
441            declared_model: contract.map(|contract| contract.declared_model),
442            observed_model: inferred_model,
443            queue_mode: if model == CollectionModel::Queue {
444                Some(
445                    queue_policies
446                        .get(&collection_name)
447                        .map(|p| p.mode)
448                        .unwrap_or_default(),
449                )
450            } else {
451                None
452            },
453            queue_max_attempts: if model == CollectionModel::Queue {
454                Some(
455                    queue_policies
456                        .get(&collection_name)
457                        .map(|p| p.max_attempts)
458                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
459                )
460            } else {
461                None
462            },
463            queue_lock_deadline_ms: if model == CollectionModel::Queue {
464                Some(
465                    queue_policies
466                        .get(&collection_name)
467                        .map(|p| p.lock_deadline_ms)
468                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
469                )
470            } else {
471                None
472            },
473            queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
474                Some(
475                    queue_policies
476                        .get(&collection_name)
477                        .map(|p| p.in_flight_cap_per_group)
478                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
479                )
480            } else {
481                None
482            },
483            queue_dlq_target: if model == CollectionModel::Queue {
484                queue_policies
485                    .get(&collection_name)
486                    .and_then(|p| p.dlq_target.clone())
487            } else {
488                None
489            },
490            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
491            vector_metric: contract.and_then(|contract| contract.vector_metric),
492            session_key: contract.and_then(|contract| contract.session_key.clone()),
493            session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
494            retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
495            declared_schema_mode: contract.map(|contract| contract.schema_mode),
496            observed_schema_mode: inferred_schema_mode,
497            entities: entity_count,
498            cross_refs,
499            segments,
500            indices: infer_indices(&collection_name, model, index_catalog, declarations),
501            declared_indices,
502            operational_indices,
503            indexes_in_sync,
504            missing_operational_indices,
505            undeclared_operational_indices,
506            queryable_index_count,
507            indexes_requiring_rebuild_count,
508            queryable_graph_projection_count,
509            graph_projections_requiring_rematerialization_count,
510            executable_analytics_job_count,
511            analytics_jobs_requiring_rerun_count,
512            subscriptions: contract
513                .map(|contract| contract.subscriptions.clone())
514                .unwrap_or_default(),
515            analytics_config: contract
516                .map(|contract| contract.analytics_config.clone())
517                .unwrap_or_default(),
518            resources_in_sync,
519            attention_required,
520            attention_score,
521            attention_reasons,
522        });
523    }
524
525    collections.sort_by(|left, right| left.name.cmp(&right.name));
526
527    let summary = CatalogSnapshot {
528        name: name.to_string(),
529        total_entities: stats_by_collection
530            .values()
531            .map(|stats| stats.entities)
532            .sum(),
533        total_collections: stats_by_collection.len(),
534        stats_by_collection,
535        updated_at: SystemTime::now(),
536    };
537
538    CatalogModelSnapshot {
539        summary,
540        collections,
541        indices: index_catalog
542            .map(IndexCatalog::snapshot)
543            .unwrap_or_default(),
544        declared_indexes: declarations
545            .map(|declarations| declarations.declared_indexes.clone())
546            .unwrap_or_default(),
547        declared_graph_projections: declarations
548            .map(|declarations| declarations.declared_graph_projections.clone())
549            .unwrap_or_default(),
550        declared_analytics_jobs: declarations
551            .map(|declarations| declarations.declared_analytics_jobs.clone())
552            .unwrap_or_default(),
553        operational_indexes: declarations
554            .map(|declarations| declarations.operational_indexes.clone())
555            .unwrap_or_default(),
556        operational_graph_projections: declarations
557            .map(|declarations| declarations.operational_graph_projections.clone())
558            .unwrap_or_default(),
559        operational_analytics_jobs: declarations
560            .map(|declarations| declarations.operational_analytics_jobs.clone())
561            .unwrap_or_default(),
562        queryable_index_count: index_statuses
563            .iter()
564            .filter(|status| status.queryable)
565            .count(),
566        indexes_requiring_rebuild_count: index_statuses
567            .iter()
568            .filter(|status| status.requires_rebuild)
569            .count(),
570        queryable_graph_projection_count: graph_projection_statuses
571            .iter()
572            .filter(|status| status.queryable)
573            .count(),
574        graph_projections_requiring_rematerialization_count: graph_projection_statuses
575            .iter()
576            .filter(|status| status.requires_rematerialization)
577            .count(),
578        executable_analytics_job_count: analytics_job_statuses
579            .iter()
580            .filter(|status| status.executable)
581            .count(),
582        analytics_jobs_requiring_rerun_count: analytics_job_statuses
583            .iter()
584            .filter(|status| status.requires_rerun)
585            .count(),
586        index_statuses,
587        graph_projection_statuses,
588        analytics_job_statuses,
589    }
590}
591
592/// Hot-fields snapshot of per-queue policy, materialised at catalog
593/// snapshot time so `QueueLifecycle` (and the descriptor) can read
594/// values without re-scanning `red_queue_meta`.
595#[derive(Debug, Clone)]
596pub struct QueuePolicySnapshot {
597    pub mode: QueueMode,
598    pub max_attempts: u32,
599    pub lock_deadline_ms: u64,
600    pub in_flight_cap_per_group: u32,
601    pub dlq_target: Option<String>,
602}
603
604fn queue_policies_from_grouped(
605    grouped: &HashMap<String, Vec<UnifiedEntity>>,
606) -> HashMap<String, QueuePolicySnapshot> {
607    let Some(meta) = grouped.get("red_queue_meta") else {
608        return HashMap::new();
609    };
610
611    let mut policies = HashMap::new();
612    for entity in meta {
613        let Some(row) = entity.data.as_row() else {
614            continue;
615        };
616        if row_text(row, "kind").as_deref() != Some("queue_config") {
617            continue;
618        }
619        let Some(queue) = row_text(row, "queue") else {
620            continue;
621        };
622        let mode = row_text(row, "mode")
623            .as_deref()
624            .and_then(QueueMode::parse)
625            .unwrap_or_default();
626        let max_attempts = row_u64(row, "max_attempts")
627            .map(|v| v as u32)
628            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
629        let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
630            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
631        let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
632            .map(|v| v as u32)
633            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
634        let dlq_target = row_text(row, "dlq");
635        policies.insert(
636            queue,
637            QueuePolicySnapshot {
638                mode,
639                max_attempts,
640                lock_deadline_ms,
641                in_flight_cap_per_group,
642                dlq_target,
643            },
644        );
645    }
646    policies
647}
648
649fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
650    match row.get_field(key) {
651        Some(Value::UnsignedInteger(v)) => Some(*v),
652        Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
653        _ => None,
654    }
655}
656
657fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
658    match row.get_field(key) {
659        Some(Value::Text(value)) => Some(value.to_string()),
660        _ => None,
661    }
662}
663
664fn collection_contract<'a>(
665    collection_name: &str,
666    contracts: Option<&'a [CollectionContract]>,
667) -> Option<&'a CollectionContract> {
668    contracts.and_then(|contracts| {
669        contracts
670            .iter()
671            .find(|contract| contract.name == collection_name)
672    })
673}
674
675fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
676    let mut has_table = false;
677    let mut has_graph = false;
678    let mut has_vector = false;
679
680    for entity in entities {
681        match &entity.kind {
682            EntityKind::TableRow { .. } => has_table = true,
683            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
684            EntityKind::Vector { .. } => has_vector = true,
685            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
686        }
687    }
688
689    match (has_table, has_graph, has_vector) {
690        (true, false, false) => CollectionModel::Table,
691        (false, true, false) => CollectionModel::Graph,
692        (false, false, true) => CollectionModel::Vector,
693        _ => CollectionModel::Mixed,
694    }
695}
696
697fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
698    match model {
699        CollectionModel::Table => SchemaMode::Strict,
700        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
701        CollectionModel::Document
702        | CollectionModel::Hll
703        | CollectionModel::Sketch
704        | CollectionModel::Filter
705        | CollectionModel::Kv
706        | CollectionModel::Config
707        | CollectionModel::Vault
708        | CollectionModel::Mixed => SchemaMode::Dynamic,
709        CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
710        CollectionModel::Queue => SchemaMode::Dynamic,
711    }
712}
713
714fn infer_indices(
715    collection_name: &str,
716    model: CollectionModel,
717    index_catalog: Option<&IndexCatalog>,
718    declarations: Option<&CatalogDeclarations>,
719) -> Vec<String> {
720    let declared = declared_indices(collection_name, declarations);
721    if !declared.is_empty() {
722        return declared;
723    }
724
725    let available = index_catalog
726        .map(IndexCatalog::snapshot)
727        .unwrap_or_default();
728    let mut selected = Vec::new();
729
730    for metric in available {
731        let relevant = matches!(
732            (model, metric.kind),
733            (_, IndexKind::BTree)
734                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
735                | (CollectionModel::Vector, IndexKind::VectorHnsw)
736                | (CollectionModel::Vector, IndexKind::VectorInverted)
737                | (CollectionModel::Vector, IndexKind::VectorTurbo)
738                | (CollectionModel::Document, IndexKind::FullText)
739                | (CollectionModel::Document, IndexKind::DocumentPathValue)
740                | (CollectionModel::Kv, IndexKind::Hash)
741                | (CollectionModel::Config, IndexKind::Hash)
742                | (CollectionModel::Vault, IndexKind::Hash)
743                | (CollectionModel::Mixed, _)
744        );
745
746        if relevant && metric.enabled {
747            selected.push(metric.name);
748        }
749    }
750
751    selected
752}
753
754fn declared_indices(
755    collection_name: &str,
756    declarations: Option<&CatalogDeclarations>,
757) -> Vec<String> {
758    let mut selected = declarations
759        .map(|declarations| {
760            declarations
761                .declared_indexes
762                .iter()
763                .filter(|index| index.collection.as_deref() == Some(collection_name))
764                .map(|index| index.name.clone())
765                .collect::<Vec<_>>()
766        })
767        .unwrap_or_default();
768    selected.sort();
769    selected.dedup();
770    selected
771}
772
773fn operational_indices(
774    collection_name: &str,
775    index_catalog: Option<&IndexCatalog>,
776    declarations: Option<&CatalogDeclarations>,
777) -> Vec<String> {
778    if let Some(declarations) = declarations {
779        let mut selected = declarations
780            .operational_indexes
781            .iter()
782            .filter(|index| index.collection.as_deref() == Some(collection_name))
783            .map(|index| index.name.clone())
784            .collect::<Vec<_>>();
785        selected.sort();
786        selected.dedup();
787        return selected;
788    }
789
790    let mut selected = index_catalog
791        .map(IndexCatalog::snapshot)
792        .unwrap_or_default()
793        .into_iter()
794        .filter(|metric| metric.enabled)
795        .filter_map(|metric| {
796            if metric.name.starts_with(collection_name) {
797                Some(metric.name)
798            } else {
799                None
800            }
801        })
802        .collect::<Vec<_>>();
803    selected.sort();
804    selected.dedup();
805    selected
806}
807
/// Compares declared and operational index names for one collection.
///
/// Returns `(consistent, missing_operational, undeclared_operational)`:
/// - `missing_operational` holds names that are declared but not operational.
/// - `undeclared_operational` holds names that are operational but not declared.
/// - `consistent` is `true` exactly when both lists are empty.
///
/// Both lists are sorted and free of duplicates.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared: BTreeSet<&String> = declared_indices.iter().collect();
    let operational: BTreeSet<&String> = operational_indices.iter().collect();

    let missing: Vec<String> = declared
        .iter()
        .copied()
        .filter(|name| !operational.contains(name))
        .cloned()
        .collect();
    let undeclared: Vec<String> = operational
        .iter()
        .copied()
        .filter(|name| !declared.contains(name))
        .cloned()
        .collect();

    let consistent = missing.is_empty() && undeclared.is_empty();
    (consistent, missing, undeclared)
}
828
829fn collection_index_readiness(
830    collection_name: &str,
831    statuses: &[CatalogIndexStatus],
832) -> (usize, usize, bool) {
833    let local = statuses
834        .iter()
835        .filter(|status| status.collection.as_deref() == Some(collection_name))
836        .collect::<Vec<_>>();
837    let queryable_count = local.iter().filter(|status| status.queryable).count();
838    let requires_rebuild_count = local
839        .iter()
840        .filter(|status| status.requires_rebuild)
841        .count();
842    let locally_in_sync = local.iter().all(|status| status.in_sync);
843    (queryable_count, requires_rebuild_count, locally_in_sync)
844}
845
846fn collection_graph_projection_readiness(
847    collection_name: &str,
848    statuses: &[CatalogGraphProjectionStatus],
849) -> (usize, usize, bool) {
850    let local = statuses
851        .iter()
852        .filter(|status| status.source.as_deref() == Some(collection_name))
853        .collect::<Vec<_>>();
854    let queryable_count = local.iter().filter(|status| status.queryable).count();
855    let requires_rematerialization_count = local
856        .iter()
857        .filter(|status| status.requires_rematerialization)
858        .count();
859    let locally_in_sync = local
860        .iter()
861        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
862    (
863        queryable_count,
864        requires_rematerialization_count,
865        locally_in_sync,
866    )
867}
868
869fn collection_analytics_job_readiness(
870    collection_name: &str,
871    projection_statuses: &[CatalogGraphProjectionStatus],
872    job_statuses: &[CatalogAnalyticsJobStatus],
873) -> (usize, usize, bool) {
874    let local = job_statuses
875        .iter()
876        .filter(|status| {
877            let Some(projection_name) = status.projection.as_deref() else {
878                return false;
879            };
880            projection_statuses
881                .iter()
882                .find(|projection| projection.name == projection_name)
883                .and_then(|projection| projection.source.as_deref())
884                == Some(collection_name)
885        })
886        .collect::<Vec<_>>();
887    let executable_count = local.iter().filter(|status| status.executable).count();
888    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
889    let locally_in_sync = local
890        .iter()
891        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
892    (executable_count, requires_rerun_count, locally_in_sync)
893}
894
895fn graph_projection_statuses(
896    declarations: Option<&CatalogDeclarations>,
897) -> Vec<CatalogGraphProjectionStatus> {
898    let declared = declarations
899        .map(|declarations| declarations.declared_graph_projections.as_slice())
900        .unwrap_or(&[]);
901    let operational = declarations
902        .map(|declarations| declarations.operational_graph_projections.as_slice())
903        .unwrap_or(&[]);
904    let declared_jobs = declarations
905        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
906        .unwrap_or(&[]);
907    let operational_jobs = declarations
908        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
909        .unwrap_or(&[]);
910
911    let mut names = declared
912        .iter()
913        .map(|projection| projection.name.clone())
914        .chain(operational.iter().map(|projection| projection.name.clone()))
915        .collect::<Vec<_>>();
916    names.sort();
917    names.dedup();
918
919    names
920        .into_iter()
921        .map(|name| {
922            let declared_projection = declared.iter().find(|projection| projection.name == name);
923            let operational_projection = operational
924                .iter()
925                .find(|projection| projection.name == name);
926            let declared_present = declared_projection.is_some();
927            let operational_present = operational_projection.is_some();
928            let mut dependent_job_ids = BTreeSet::new();
929            let mut dependent_job_count = 0usize;
930            let mut active_dependent_job_count = 0usize;
931            let mut stale_dependent_job_count = 0usize;
932            for job in declared_jobs
933                .iter()
934                .chain(operational_jobs.iter())
935                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
936            {
937                if !dependent_job_ids.insert(job.id.clone()) {
938                    continue;
939                }
940                dependent_job_count += 1;
941                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
942                    active_dependent_job_count += 1;
943                }
944                if job.state == "stale" {
945                    stale_dependent_job_count += 1;
946                }
947            }
948            let lifecycle_state = match (
949                declared_present,
950                operational_present,
951                declared_projection
952                    .map(|projection| projection.state.as_str())
953                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
954                    .unwrap_or_default(),
955            ) {
956                (true, _, "failed") => "failed",
957                (true, true, "stale") => "stale",
958                (true, _, "materializing") => "materializing",
959                (true, true, "materialized") => "materialized",
960                (true, true, _) => "materialized",
961                (false, true, _) => "orphaned-operational",
962                (true, false, _) => "declared",
963                (false, false, _) => "unknown",
964            }
965            .to_string();
966            let queryable = declared_present
967                && operational_present
968                && matches!(
969                    declared_projection
970                        .map(|projection| projection.state.as_str())
971                        .or_else(
972                            || operational_projection.map(|projection| projection.state.as_str())
973                        )
974                        .unwrap_or_default(),
975                    "materialized"
976                );
977            let requires_rematerialization = matches!(
978                declared_projection
979                    .map(|projection| projection.state.as_str())
980                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
981                    .unwrap_or_default(),
982                "declared" | "materializing" | "failed" | "stale"
983            );
984            let mut attention_reasons = Vec::new();
985            if !declared_present || !operational_present {
986                attention_reasons.push("declaration_drift".to_string());
987            }
988            if requires_rematerialization {
989                attention_reasons.push("requires_rematerialization".to_string());
990            }
991            if stale_dependent_job_count > 0 {
992                attention_reasons.push("dependent_jobs_stale".to_string());
993            }
994            let attention_score = stale_dependent_job_count.saturating_mul(2)
995                + usize::from(requires_rematerialization).saturating_mul(4)
996                + usize::from(!declared_present || !operational_present)
997                + usize::from(!queryable);
998            CatalogGraphProjectionStatus {
999                name,
1000                source: declared_projection
1001                    .map(|projection| projection.source.clone())
1002                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
1003                lifecycle_state,
1004                declared: declared_present,
1005                operational: operational_present,
1006                in_sync: declared_present == operational_present,
1007                last_materialized_sequence: declared_projection
1008                    .and_then(|projection| projection.last_materialized_sequence)
1009                    .or_else(|| {
1010                        operational_projection
1011                            .and_then(|projection| projection.last_materialized_sequence)
1012                    }),
1013                queryable,
1014                requires_rematerialization,
1015                dependent_job_count,
1016                active_dependent_job_count,
1017                stale_dependent_job_count,
1018                dependent_jobs_in_sync: stale_dependent_job_count == 0,
1019                rerun_required: stale_dependent_job_count > 0,
1020                attention_score,
1021                attention_reasons,
1022            }
1023        })
1024        .collect()
1025}
1026
1027fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
1028    let declared = declarations
1029        .map(|declarations| declarations.declared_indexes.as_slice())
1030        .unwrap_or(&[]);
1031    let operational = declarations
1032        .map(|declarations| declarations.operational_indexes.as_slice())
1033        .unwrap_or(&[]);
1034
1035    let mut names = declared
1036        .iter()
1037        .map(|index| index.name.clone())
1038        .chain(operational.iter().map(|index| index.name.clone()))
1039        .collect::<Vec<_>>();
1040    names.sort();
1041    names.dedup();
1042
1043    names
1044        .into_iter()
1045        .map(|name| {
1046            let declared_index = declared.iter().find(|index| index.name == name);
1047            let operational_index = operational.iter().find(|index| index.name == name);
1048            let collection = declared_index
1049                .and_then(|index| index.collection.clone())
1050                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
1051            let kind = declared_index
1052                .map(|index| index_kind_string(index.kind))
1053                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
1054                .unwrap_or_default();
1055            let enabled = declared_index
1056                .map(|index| index.enabled)
1057                .or_else(|| operational_index.map(|index| index.enabled))
1058                .unwrap_or(false);
1059            let build_state = operational_index
1060                .map(|index| index.build_state.clone())
1061                .or_else(|| declared_index.map(|index| index.build_state.clone()));
1062            let declared_present = declared_index.is_some();
1063            let operational_present = operational_index.is_some();
1064            let lifecycle_state = index_lifecycle_state(
1065                declared_present,
1066                operational_present,
1067                enabled,
1068                build_state.as_deref(),
1069            );
1070
1071            let mut attention_reasons = Vec::new();
1072            if declared_present != operational_present {
1073                attention_reasons.push("declaration_drift".to_string());
1074            }
1075            if !enabled && declared_present {
1076                attention_reasons.push("disabled".to_string());
1077            }
1078            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1079                attention_reasons.push("failed".to_string());
1080            }
1081            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1082                attention_reasons.push("stale".to_string());
1083            }
1084            if matches!(
1085                build_state.as_deref().unwrap_or_default(),
1086                "building" | "declared-unbuilt"
1087            ) {
1088                attention_reasons.push("requires_rebuild".to_string());
1089            }
1090            let queryable = declared_present
1091                && operational_present
1092                && enabled
1093                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1094            let requires_rebuild = matches!(
1095                build_state.as_deref().unwrap_or_default(),
1096                "declared-unbuilt" | "building" | "stale" | "failed"
1097            );
1098            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1099                + usize::from(!enabled && declared_present)
1100                + usize::from(declared_present != operational_present)
1101                + usize::from(!queryable);
1102
1103            let artifact_state = crate::physical::ArtifactState::from_build_state(
1104                build_state.as_deref().unwrap_or("declared-unbuilt"),
1105                enabled,
1106            );
1107            CatalogIndexStatus {
1108                name,
1109                collection,
1110                kind,
1111                declared: declared_present,
1112                operational: operational_present,
1113                enabled,
1114                build_state,
1115                artifact_state,
1116                lifecycle_state,
1117                in_sync: declared_present == operational_present,
1118                queryable,
1119                requires_rebuild,
1120                attention_score,
1121                attention_reasons,
1122            }
1123        })
1124        .collect()
1125}
1126
/// Derives the lifecycle label of an index from its presence flags and build state.
///
/// Precedence, from highest to lowest:
/// 1. Operational but not declared gives `"orphaned-operational"`.
/// 2. Declared but disabled gives `"disabled"`.
/// 3. Neither declared nor operational gives `"unknown"`.
/// 4. Declared but not operational gives `"declared"`.
/// 5. Otherwise the label follows the build state: `ready`, `failed` and `stale` are
///    reported as-is, `declared-unbuilt` maps to `"declared"`, and every other value,
///    including a missing one, maps to `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (true, _) if !enabled => "disabled",
        (false, false) => "unknown",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Covers "catalog-derived", "metadata-only", "artifact-published",
            // "registry-loaded", a missing state, and any unrecognised value.
            _ => "building",
        },
    };
    label.to_string()
}
1157
1158fn index_kind_string(kind: IndexKind) -> String {
1159    match kind {
1160        IndexKind::BTree => "btree",
1161        IndexKind::Hash => "hash",
1162        IndexKind::Bitmap => "bitmap",
1163        IndexKind::Spatial => "spatial.rtree",
1164        IndexKind::VectorHnsw => "vector.hnsw",
1165        IndexKind::VectorInverted => "vector.inverted",
1166        IndexKind::VectorTurbo => "vector.turbo",
1167        IndexKind::GraphAdjacency => "graph.adjacency",
1168        IndexKind::FullText => "text.fulltext",
1169        IndexKind::DocumentPathValue => "document.pathvalue",
1170        IndexKind::HybridSearch => "search.hybrid",
1171    }
1172    .to_string()
1173}
1174
1175fn analytics_job_statuses(
1176    declarations: Option<&CatalogDeclarations>,
1177) -> Vec<CatalogAnalyticsJobStatus> {
1178    let declared = declarations
1179        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1180        .unwrap_or(&[]);
1181    let operational = declarations
1182        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1183        .unwrap_or(&[]);
1184    let declared_projections = declarations
1185        .map(|declarations| declarations.declared_graph_projections.as_slice())
1186        .unwrap_or(&[]);
1187    let operational_projections = declarations
1188        .map(|declarations| declarations.operational_graph_projections.as_slice())
1189        .unwrap_or(&[]);
1190
1191    let mut ids = declared
1192        .iter()
1193        .map(|job| job.id.clone())
1194        .chain(operational.iter().map(|job| job.id.clone()))
1195        .collect::<Vec<_>>();
1196    ids.sort();
1197    ids.dedup();
1198
1199    ids.into_iter()
1200        .map(|id| {
1201            let declared_job = declared.iter().find(|job| job.id == id);
1202            let operational_job = operational.iter().find(|job| job.id == id);
1203            let kind = declared_job
1204                .map(|job| job.kind.clone())
1205                .or_else(|| operational_job.map(|job| job.kind.clone()))
1206                .unwrap_or_default();
1207            let projection = declared_job
1208                .and_then(|job| job.projection.clone())
1209                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1210            let state = declared_job
1211                .map(|job| job.state.clone())
1212                .or_else(|| operational_job.map(|job| job.state.clone()))
1213                .unwrap_or_default();
1214            let declared_present = declared_job.is_some();
1215            let operational_present = operational_job.is_some();
1216            let last_run_sequence = declared_job
1217                .and_then(|job| job.last_run_sequence)
1218                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1219            let projection_declared = projection.as_ref().map(|projection_name| {
1220                declared_projections
1221                    .iter()
1222                    .any(|projection| projection.name == *projection_name)
1223            });
1224            let projection_operational = projection.as_ref().map(|projection_name| {
1225                operational_projections
1226                    .iter()
1227                    .any(|projection| projection.name == *projection_name)
1228            });
1229            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1230                declared_projections
1231                    .iter()
1232                    .find(|projection| projection.name == *projection_name)
1233                    .map(|projection| projection.state.as_str())
1234                    .or_else(|| {
1235                        operational_projections
1236                            .iter()
1237                            .find(|projection| projection.name == *projection_name)
1238                            .map(|projection| projection.state.as_str())
1239                    })
1240            });
1241            let dependency_in_sync = projection.as_ref().map(|_| {
1242                matches!(projection_lifecycle, Some("materialized"))
1243                    && projection_operational == Some(true)
1244            });
1245            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1246                (true, false, _) => "declared",
1247                (true, true, _)
1248                    if matches!(
1249                        projection_lifecycle,
1250                        Some("stale" | "failed" | "materializing" | "declared")
1251                    ) =>
1252                {
1253                    "stale"
1254                }
1255                (true, true, "completed") => "completed",
1256                (true, true, "running") => "running",
1257                (true, true, "failed") => "failed",
1258                (true, true, "queued") => "queued",
1259                (true, true, "stale") => "stale",
1260                (true, true, _) => "materialized",
1261                (false, true, _) => "orphaned-operational",
1262                (false, false, _) => "unknown",
1263            }
1264            .to_string();
1265            let executable = declared_present
1266                && operational_present
1267                && !matches!(state.as_str(), "failed" | "stale")
1268                && dependency_in_sync.unwrap_or(true);
1269            let requires_rerun =
1270                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1271            let mut attention_reasons = Vec::new();
1272            if declared_present != operational_present {
1273                attention_reasons.push("declaration_drift".to_string());
1274            }
1275            if matches!(state.as_str(), "failed") {
1276                attention_reasons.push("failed".to_string());
1277            }
1278            if matches!(state.as_str(), "stale") {
1279                attention_reasons.push("stale".to_string());
1280            }
1281            if dependency_in_sync == Some(false) {
1282                attention_reasons.push("dependency_drift".to_string());
1283            }
1284            if requires_rerun {
1285                attention_reasons.push("requires_rerun".to_string());
1286            }
1287            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1288                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1289                + usize::from(declared_present != operational_present)
1290                + usize::from(!executable);
1291            CatalogAnalyticsJobStatus {
1292                id,
1293                kind,
1294                projection,
1295                state: state.clone(),
1296                lifecycle_state,
1297                declared: declared_present,
1298                operational: operational_present,
1299                in_sync: declared_present == operational_present,
1300                last_run_sequence,
1301                projection_declared,
1302                projection_operational,
1303                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1304                dependency_in_sync,
1305                executable,
1306                requires_rerun,
1307                attention_score,
1308                attention_reasons,
1309            }
1310        })
1311        .collect()
1312}
1313
1314pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1315    let declared_indexes = snapshot
1316        .declared_indexes
1317        .iter()
1318        .map(|index| index.name.clone())
1319        .collect::<BTreeSet<_>>();
1320    let operational_indexes = snapshot
1321        .operational_indexes
1322        .iter()
1323        .map(|index| index.name.clone())
1324        .collect::<BTreeSet<_>>();
1325    let declared_graph_projections = snapshot
1326        .declared_graph_projections
1327        .iter()
1328        .map(|projection| projection.name.clone())
1329        .collect::<BTreeSet<_>>();
1330    let operational_graph_projections = snapshot
1331        .operational_graph_projections
1332        .iter()
1333        .map(|projection| projection.name.clone())
1334        .collect::<BTreeSet<_>>();
1335    let declared_analytics_jobs = snapshot
1336        .declared_analytics_jobs
1337        .iter()
1338        .map(|job| job.id.clone())
1339        .collect::<BTreeSet<_>>();
1340    let operational_analytics_jobs = snapshot
1341        .operational_analytics_jobs
1342        .iter()
1343        .map(|job| job.id.clone())
1344        .collect::<BTreeSet<_>>();
1345
1346    CatalogConsistencyReport {
1347        declared_index_count: declared_indexes.len(),
1348        operational_index_count: operational_indexes.len(),
1349        declared_graph_projection_count: declared_graph_projections.len(),
1350        operational_graph_projection_count: operational_graph_projections.len(),
1351        declared_analytics_job_count: declared_analytics_jobs.len(),
1352        operational_analytics_job_count: operational_analytics_jobs.len(),
1353        missing_operational_indexes: declared_indexes
1354            .difference(&operational_indexes)
1355            .cloned()
1356            .collect(),
1357        undeclared_operational_indexes: operational_indexes
1358            .difference(&declared_indexes)
1359            .cloned()
1360            .collect(),
1361        missing_operational_graph_projections: declared_graph_projections
1362            .difference(&operational_graph_projections)
1363            .cloned()
1364            .collect(),
1365        undeclared_operational_graph_projections: operational_graph_projections
1366            .difference(&declared_graph_projections)
1367            .cloned()
1368            .collect(),
1369        missing_operational_analytics_jobs: declared_analytics_jobs
1370            .difference(&operational_analytics_jobs)
1371            .cloned()
1372            .collect(),
1373        undeclared_operational_analytics_jobs: operational_analytics_jobs
1374            .difference(&declared_analytics_jobs)
1375            .cloned()
1376            .collect(),
1377    }
1378}
1379
1380pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1381    CatalogAttentionSummary {
1382        collections_requiring_attention: snapshot
1383            .collections
1384            .iter()
1385            .filter(|collection| collection.attention_required)
1386            .count(),
1387        indexes_requiring_attention: snapshot
1388            .index_statuses
1389            .iter()
1390            .filter(|status| status.attention_score > 0)
1391            .count(),
1392        graph_projections_requiring_attention: snapshot
1393            .graph_projection_statuses
1394            .iter()
1395            .filter(|status| status.attention_score > 0)
1396            .count(),
1397        analytics_jobs_requiring_attention: snapshot
1398            .analytics_job_statuses
1399            .iter()
1400            .filter(|status| status.attention_score > 0)
1401            .count(),
1402        top_collection: snapshot
1403            .collections
1404            .iter()
1405            .filter(|collection| collection.attention_score > 0)
1406            .max_by_key(|collection| collection.attention_score)
1407            .cloned(),
1408        top_index: snapshot
1409            .index_statuses
1410            .iter()
1411            .filter(|status| status.attention_score > 0)
1412            .max_by_key(|status| status.attention_score)
1413            .cloned(),
1414        top_graph_projection: snapshot
1415            .graph_projection_statuses
1416            .iter()
1417            .filter(|status| status.attention_score > 0)
1418            .max_by_key(|status| status.attention_score)
1419            .cloned(),
1420        top_analytics_job: snapshot
1421            .analytics_job_statuses
1422            .iter()
1423            .filter(|status| status.attention_score > 0)
1424            .max_by_key(|status| status.attention_score)
1425            .cloned(),
1426    }
1427}
1428
1429pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1430    let mut collections = snapshot
1431        .collections
1432        .iter()
1433        .filter(|collection| collection.attention_required)
1434        .cloned()
1435        .collect::<Vec<_>>();
1436    collections.sort_by(|left, right| {
1437        right
1438            .attention_score
1439            .cmp(&left.attention_score)
1440            .then_with(|| left.name.cmp(&right.name))
1441    });
1442    collections
1443}
1444
1445pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1446    let mut statuses = snapshot
1447        .index_statuses
1448        .iter()
1449        .filter(|status| status.attention_score > 0)
1450        .cloned()
1451        .collect::<Vec<_>>();
1452    statuses.sort_by(|left, right| {
1453        right
1454            .attention_score
1455            .cmp(&left.attention_score)
1456            .then_with(|| left.name.cmp(&right.name))
1457    });
1458    statuses
1459}
1460
1461pub fn graph_projection_attention(
1462    snapshot: &CatalogModelSnapshot,
1463) -> Vec<CatalogGraphProjectionStatus> {
1464    let mut statuses = snapshot
1465        .graph_projection_statuses
1466        .iter()
1467        .filter(|status| status.attention_score > 0)
1468        .cloned()
1469        .collect::<Vec<_>>();
1470    statuses.sort_by(|left, right| {
1471        right
1472            .attention_score
1473            .cmp(&left.attention_score)
1474            .then_with(|| left.name.cmp(&right.name))
1475    });
1476    statuses
1477}
1478
1479pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1480    let mut statuses = snapshot
1481        .analytics_job_statuses
1482        .iter()
1483        .filter(|status| status.attention_score > 0)
1484        .cloned()
1485        .collect::<Vec<_>>();
1486    statuses.sort_by(|left, right| {
1487        right
1488            .attention_score
1489            .cmp(&left.attention_score)
1490            .then_with(|| left.id.cmp(&right.id))
1491    });
1492    statuses
1493}