Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Data model of a logical collection.
///
/// Within this module, `infer_model` only ever derives `Table`, `Graph`,
/// `Vector` or `Mixed` from stored entities; every other variant reaches a
/// `CollectionDescriptor` through a collection contract's `declared_model`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    /// Inferred whenever a collection's entities show anything other than
    /// exactly one of the table-row / graph / vector kinds — including an
    /// empty collection or one holding only time-series points or queue
    /// messages.
    Mixed,
    TimeSeries,
    Queue,
    Metrics,
}
33
/// Per-collection analytical-storage seam (PRD #850, Phase 1).
///
/// Declares that a `Metrics`/`TimeSeries` collection's hypertable chunks
/// are backed by the **columnar** sealed-chunk layout
/// ([`column_block`](crate::storage::unified::column_block)) rather than
/// the default row engine. With `columnar = false`, or with no config at
/// all, the row engine is kept: the seal dispatch routes only
/// columnar-flagged chunks to the `ColumnBlock` writer.
///
/// `time_key` / `order_by_key` mirror the ClickHouse `ORDER BY` intent so
/// that downstream slices (#854 granules, #856 vectorized read) can lay the
/// columns out in their natural sort order. v1 persists both as the durable
/// contract but does not yet act on `order_by_key`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnalyticalStorageConfig {
    /// When true, sealing a chunk routes to the columnar `ColumnBlock`
    /// writer; otherwise the row engine stays the default.
    pub columnar: bool,
    /// Column carrying the time axis (the chunk's partition key).
    pub time_key: String,
    /// Optional secondary sort key (the tail of a ClickHouse `ORDER BY`).
    /// `None` orders by `time_key` alone.
    pub order_by_key: Option<String>,
}
58
/// How strictly a collection's records are expected to follow a schema.
///
/// When no contract declares a mode, the catalog derives one from the
/// collection model (`infer_schema_mode`): tables are `Strict`; graph,
/// vector, time-series and metrics collections are `SemiStructured`; every
/// other model is `Dynamic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
65
/// Row-level change kinds a subscription can filter on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Canonical upper-case SQL keyword for this operation.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// Returns `None` for anything that is not exactly one of the keywords
    /// produced by [`SubscriptionOperation::as_str`] (surrounding whitespace
    /// is not trimmed).
    pub fn from_str(value: &str) -> Option<Self> {
        // Case-insensitive comparison against each canonical keyword; this
        // avoids allocating an upper-cased copy of `value`.
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|operation| operation.as_str().eq_ignore_ascii_case(value))
    }
}
91
/// A change subscription declared on a collection.
///
/// The catalog snapshot copies these verbatim from the collection's contract
/// into `CollectionDescriptor::subscriptions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    /// Logical name for this subscription. Empty string for legacy unnamed subscriptions.
    pub name: String,
    /// Event source. NOTE(review): presumably the watched collection's name — confirm.
    pub source: String,
    /// Queue receiving the events; see `all_tenants` for tenant namespacing.
    pub target_queue: String,
    /// Operations the subscription filters on. NOTE(review): confirm whether
    /// an empty list means "all operations".
    pub ops_filter: Vec<SubscriptionOperation>,
    /// Optional filter kept as source text. NOTE(review): presumably a SQL
    /// `WHERE` predicate evaluated per event — confirm.
    pub where_filter: Option<String>,
    /// Field names to redact. NOTE(review): presumably stripped or masked in
    /// emitted payloads — confirm.
    pub redact_fields: Vec<String>,
    /// Enabled flag. NOTE(review): presumably disabled subscriptions emit no
    /// events — confirm.
    pub enabled: bool,
    /// When true, events are routed to the bare `target_queue` regardless of
    /// the current tenant — a cluster-wide subscription. When false (default),
    /// events are namespaced as `{tenant}/{target_queue}` whenever a tenant
    /// context is active, enforcing per-tenant isolation.
    pub all_tenants: bool,
}
108
/// A graph-analytics output declared by `CREATE GRAPH ... WITH ANALYTICS (...)`.
///
/// Each variant maps to a family of pure graph algorithms (issues #795-#797)
/// and resolves as a virtual `<graph>.<output>` view returning that family's
/// native row shape. The `using` option selects the concrete algorithm inside
/// the family (e.g. `centrality (using = pagerank)`); the remaining options are
/// algorithm parameters carried verbatim into the executor.
///
/// Variant order is significant: it defines the derived `Ord`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AnalyticsOutput {
    Communities,
    Components,
    Centrality,
}

impl AnalyticsOutput {
    /// Lower-case keyword naming this output (also its view suffix).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Communities => "communities",
            Self::Components => "components",
            Self::Centrality => "centrality",
        }
    }

    /// Parses an output keyword, ignoring ASCII case.
    ///
    /// Returns `None` unless `value` equals one of the keywords produced by
    /// [`AnalyticsOutput::as_str`] up to ASCII case (no trimming).
    pub fn from_str(value: &str) -> Option<Self> {
        // Compare case-insensitively against each keyword instead of
        // allocating a lower-cased copy of the input.
        [Self::Communities, Self::Components, Self::Centrality]
            .iter()
            .copied()
            .find(|output| output.as_str().eq_ignore_ascii_case(value))
    }
}
141
/// One enabled analytics output plus its declared options. Persisted in the
/// parent graph's `CollectionContract` (WAL-backed) and surfaced on the
/// `CollectionDescriptor` so the resolver can recognise `<graph>.<output>`.
///
/// `SHOW COLLECTIONS` behaviour (issue #800 HITL decision): analytics outputs
/// resolve as virtual `<graph>.<output>` views and are deliberately **not**
/// registered as top-level collections. They therefore never appear in
/// `SHOW COLLECTIONS` — not by default and not under `SHOW COLLECTIONS
/// INCLUDING INTERNAL` — keeping the parent graph's listing clean. Only the
/// parent graph collection is listed; its enabled outputs are introspectable
/// through this `analytics_config` on the parent's descriptor.
///
/// Derives `PartialEq` but not `Eq` because of the `f64` option fields.
#[derive(Debug, Clone, PartialEq)]
pub struct AnalyticsViewDescriptor {
    /// Which analytics family this view exposes.
    pub output: AnalyticsOutput,
    /// `using = <algorithm>` — concrete algorithm within the output family.
    /// `None` resolves to the family default (louvain / connected-components /
    /// pagerank).
    pub algorithm: Option<String>,
    /// `resolution = <f64>` — Louvain resolution (γ) for `communities`.
    pub resolution: Option<f64>,
    /// `max_iterations = <i64>` — iteration cap for iterative centralities.
    pub max_iterations: Option<i64>,
    /// `tolerance = <f64>` — convergence tolerance for iterative centralities.
    pub tolerance: Option<f64>,
}
167
/// Catalog view of one collection, produced by `snapshot_store_with_declarations`.
///
/// Combines what the store actually holds (observed model, entity counts,
/// segments) with the collection's contract (declared model/schema, vector,
/// session, retention, subscription and analytics settings) and the
/// readiness of its indexes, graph projections and analytics jobs.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as listed by the store.
    pub name: String,
    /// Effective model: the contract's `declared_model` when a contract
    /// exists, otherwise `observed_model`.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's mode when present, otherwise
    /// `observed_schema_mode`.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    /// Origin of that contract, if any.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, if any.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the kinds of entities stored in the collection.
    pub observed_model: CollectionModel,
    /// Queue delivery mode; `Some` only when `model` is `Queue` (falling back
    /// to `QueueMode::default()` when no queue config row exists).
    pub queue_mode: Option<QueueMode>,
    /// MAX_ATTEMPTS per-queue policy. Hot-fields tier — populated for
    /// `model = queue` from the catalog snapshot for sub-ms reads by
    /// `QueueLifecycle`. `None` for non-queue collections.
    pub queue_max_attempts: Option<u32>,
    /// LOCK_DEADLINE_MS per-queue policy. Hot-fields tier.
    pub queue_lock_deadline_ms: Option<u64>,
    /// IN_FLIGHT_CAP_PER_GROUP per-queue policy. Hot-fields tier.
    pub queue_in_flight_cap_per_group: Option<u32>,
    /// DLQ target collection name. `None` means on-max drop (and is always
    /// `None` for non-queue collections).
    pub queue_dlq_target: Option<String>,
    /// Vector dimension copied from the contract.
    pub vector_dimension: Option<usize>,
    /// Distance metric copied from the contract.
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    /// `WITH SESSION_KEY <col>` declared on the timeseries collection.
    /// `None` for non-timeseries and for timeseries without the clause.
    /// Issue #576 slice 1.
    pub session_key: Option<String>,
    /// `SESSION_GAP <duration>` in milliseconds declared on the
    /// timeseries collection. Issue #576 slice 1.
    pub session_gap_ms: Option<u64>,
    /// Declarative retention duration in milliseconds. `None` means no
    /// retention policy is enforced. Set via `ALTER COLLECTION ... SET
    /// RETENTION <duration>`. Issue #580 slice 1.
    pub retention_duration_ms: Option<u64>,
    /// Schema mode declared by the contract, if any.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode derived from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities stored in the collection.
    pub entities: usize,
    /// Total cross references across those entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segments; 0 when the store has no
    /// manager for the collection.
    pub segments: usize,
    /// Declared indices when any exist; otherwise enabled index-catalog
    /// entries whose kind suits `model`. NOTE(review): that fallback does not
    /// filter catalog entries by collection name — confirm this is intended.
    pub indices: Vec<String>,
    /// Declared index names for this collection, sorted and deduplicated.
    pub declared_indices: Vec<String>,
    /// Operational index names for this collection, sorted and deduplicated.
    pub operational_indices: Vec<String>,
    /// Result of comparing `declared_indices` with `operational_indices`.
    pub indexes_in_sync: bool,
    /// NOTE(review): presumably declared indices with no operational
    /// counterpart — confirm against `collection_index_consistency`.
    pub missing_operational_indices: Vec<String>,
    /// NOTE(review): presumably operational indices that were never
    /// declared — confirm against `collection_index_consistency`.
    pub undeclared_operational_indices: Vec<String>,
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions copied from the contract; empty without a contract.
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// Analytics views declared by `WITH ANALYTICS (...)`. Empty for
    /// collections without the clause. Sourced from the persisted
    /// `CollectionContract` so it survives restarts (issue #800).
    pub analytics_config: Vec<AnalyticsViewDescriptor>,
    /// True when index declarations match operations and indexes, graph
    /// projections and analytics jobs are all locally in sync.
    pub resources_in_sync: bool,
    /// True when resources are out of sync or anything needs a rebuild,
    /// rematerialization or rerun.
    pub attention_required: bool,
    /// Weighted priority: 3 per index rebuild, 4 per projection
    /// rematerialization, 2 per job rerun, plus 1 when out of sync.
    pub attention_score: usize,
    /// Machine-readable reason codes (e.g. `index_drift`,
    /// `indexes_require_rebuild`) explaining the attention state.
    pub attention_reasons: Vec<String>,
}
228
/// Complete catalog snapshot returned by `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Name, per-collection stats and totals, stamped with the build time.
    pub summary: CatalogSnapshot,
    /// One descriptor per store collection, sorted by name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog (default/empty when none was supplied).
    pub indices: IndexCatalogSnapshot,
    /// The following six lists are copied from the supplied
    /// `CatalogDeclarations` (empty when none were supplied).
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Per-resource readiness derived from the declarations.
    pub index_statuses: Vec<CatalogIndexStatus>,
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    /// Catalog-wide counts of statuses with the matching flag set
    /// (`queryable`, `requires_rebuild`, `requires_rematerialization`,
    /// `executable`, `requires_rerun`).
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
}
250
/// Aggregate view of catalog items that need operator attention.
///
/// NOTE(review): built outside this section; presumably the `*_requiring_attention`
/// fields count items with attention flagged and each `top_*` holds the
/// highest-`attention_score` item of its kind — confirm against the builder.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
262
/// Declared versus operational physical resources, supplied to
/// `snapshot_store_with_declarations` for drift and readiness reporting.
///
/// `Default` yields all-empty lists.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// When declarations are supplied, per-collection operational index
    /// names come from this list rather than from the index catalog.
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
272
/// Readiness of one graph projection, reconciled across declarations.
///
/// NOTE(review): populated by `graph_projection_statuses`, which is outside
/// this section; field meanings beyond their names should be confirmed there.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    pub name: String,
    /// Source collection, if the projection names one.
    pub source: Option<String>,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    pub in_sync: bool,
    pub last_materialized_sequence: Option<u64>,
    /// Counted into the snapshot's `queryable_graph_projection_count`.
    pub queryable: bool,
    /// Counted into `graph_projections_requiring_rematerialization_count`.
    pub requires_rematerialization: bool,
    pub dependent_job_count: usize,
    pub active_dependent_job_count: usize,
    pub stale_dependent_job_count: usize,
    pub dependent_jobs_in_sync: bool,
    pub rerun_required: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
292
/// Readiness of one index, reconciled across declarations.
///
/// NOTE(review): populated by `index_statuses`, which is outside this
/// section; field meanings beyond their names should be confirmed there.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    pub name: String,
    /// Owning collection, if the index is scoped to one.
    pub collection: Option<String>,
    pub kind: String,
    pub declared: bool,
    pub operational: bool,
    pub enabled: bool,
    pub build_state: Option<String>,
    pub artifact_state: crate::physical::ArtifactState,
    pub lifecycle_state: String,
    pub in_sync: bool,
    /// Counted into the snapshot's `queryable_index_count`.
    pub queryable: bool,
    /// Counted into the snapshot's `indexes_requiring_rebuild_count`.
    pub requires_rebuild: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
310
/// Readiness of one analytics job, including its graph-projection dependency.
///
/// NOTE(review): populated by `analytics_job_statuses`, which is outside this
/// section; the `projection_*` / `dependency_in_sync` fields are presumably
/// `None` when the job has no projection — confirm there.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    pub id: String,
    pub kind: String,
    /// Graph projection the job reads from, if any.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    pub dependency_in_sync: Option<bool>,
    /// Counted into the snapshot's `executable_analytics_job_count`.
    pub executable: bool,
    /// Counted into the snapshot's `analytics_jobs_requiring_rerun_count`.
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
331
/// Catalog-wide comparison of declared versus operational resources.
///
/// NOTE(review): built outside this section; presumably `missing_*` lists
/// declared names lacking an operational counterpart and `undeclared_*`
/// lists operational names never declared — confirm against the builder.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
347
348pub fn snapshot_store(
349    name: &str,
350    store: &UnifiedStore,
351    index_catalog: Option<&IndexCatalog>,
352) -> CatalogModelSnapshot {
353    snapshot_store_with_declarations(name, store, index_catalog, None, None)
354}
355
356pub fn snapshot_store_with_declarations(
357    name: &str,
358    store: &UnifiedStore,
359    index_catalog: Option<&IndexCatalog>,
360    declarations: Option<&CatalogDeclarations>,
361    contracts: Option<&[CollectionContract]>,
362) -> CatalogModelSnapshot {
363    let index_statuses = index_statuses(declarations);
364    let graph_projection_statuses = graph_projection_statuses(declarations);
365    let analytics_job_statuses = analytics_job_statuses(declarations);
366
367    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
368    for (collection, entity) in store.query_all(|_| true) {
369        grouped.entry(collection).or_default().push(entity);
370    }
371    let queue_policies = queue_policies_from_grouped(&grouped);
372
373    let mut stats_by_collection = BTreeMap::new();
374    let mut collections = Vec::new();
375
376    for collection_name in store.list_collections() {
377        let entities = grouped.remove(&collection_name).unwrap_or_default();
378        let inferred_model = infer_model(&entities);
379        let inferred_schema_mode = infer_schema_mode(inferred_model);
380        let contract = collection_contract(&collection_name, contracts);
381        let model = contract
382            .map(|contract| contract.declared_model)
383            .unwrap_or(inferred_model);
384        let cross_refs = entities
385            .iter()
386            .map(|entity| entity.cross_refs().len())
387            .sum();
388        let entity_count = entities.len();
389        let manager_stats = store
390            .get_collection(&collection_name)
391            .map(|manager| manager.stats());
392        let segments = manager_stats
393            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
394            .unwrap_or(0);
395
396        stats_by_collection.insert(
397            collection_name.clone(),
398            CollectionStats {
399                entities: entity_count,
400                cross_refs,
401                segments,
402            },
403        );
404
405        let declared_indices = declared_indices(&collection_name, declarations);
406        let operational_indices =
407            operational_indices(&collection_name, index_catalog, declarations);
408        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
409            collection_index_consistency(&declared_indices, &operational_indices);
410        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
411            collection_index_readiness(&collection_name, &index_statuses);
412        let (
413            queryable_graph_projection_count,
414            graph_projections_requiring_rematerialization_count,
415            graph_projections_locally_in_sync,
416        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
417        let (
418            executable_analytics_job_count,
419            analytics_jobs_requiring_rerun_count,
420            analytics_jobs_locally_in_sync,
421        ) = collection_analytics_job_readiness(
422            &collection_name,
423            &graph_projection_statuses,
424            &analytics_job_statuses,
425        );
426        let resources_in_sync = indexes_in_sync
427            && indexes_locally_in_sync
428            && graph_projections_locally_in_sync
429            && analytics_jobs_locally_in_sync;
430        let attention_required = !resources_in_sync
431            || indexes_requiring_rebuild_count > 0
432            || graph_projections_requiring_rematerialization_count > 0
433            || analytics_jobs_requiring_rerun_count > 0;
434        let mut attention_reasons = Vec::new();
435        if !indexes_in_sync || !indexes_locally_in_sync {
436            attention_reasons.push("index_drift".to_string());
437        }
438        if indexes_requiring_rebuild_count > 0 {
439            attention_reasons.push("indexes_require_rebuild".to_string());
440        }
441        if !graph_projections_locally_in_sync {
442            attention_reasons.push("graph_projection_drift".to_string());
443        }
444        if graph_projections_requiring_rematerialization_count > 0 {
445            attention_reasons.push("graph_projections_require_rematerialization".to_string());
446        }
447        if !analytics_jobs_locally_in_sync {
448            attention_reasons.push("analytics_job_drift".to_string());
449        }
450        if analytics_jobs_requiring_rerun_count > 0 {
451            attention_reasons.push("analytics_jobs_require_rerun".to_string());
452        }
453        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
454            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
455            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
456            + usize::from(!resources_in_sync);
457
458        collections.push(CollectionDescriptor {
459            name: collection_name.clone(),
460            model,
461            schema_mode: contract
462                .map(|contract| contract.schema_mode)
463                .unwrap_or(inferred_schema_mode),
464            contract_present: contract.is_some(),
465            contract_origin: contract.map(|contract| contract.origin),
466            declared_model: contract.map(|contract| contract.declared_model),
467            observed_model: inferred_model,
468            queue_mode: if model == CollectionModel::Queue {
469                Some(
470                    queue_policies
471                        .get(&collection_name)
472                        .map(|p| p.mode)
473                        .unwrap_or_default(),
474                )
475            } else {
476                None
477            },
478            queue_max_attempts: if model == CollectionModel::Queue {
479                Some(
480                    queue_policies
481                        .get(&collection_name)
482                        .map(|p| p.max_attempts)
483                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS),
484                )
485            } else {
486                None
487            },
488            queue_lock_deadline_ms: if model == CollectionModel::Queue {
489                Some(
490                    queue_policies
491                        .get(&collection_name)
492                        .map(|p| p.lock_deadline_ms)
493                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS),
494                )
495            } else {
496                None
497            },
498            queue_in_flight_cap_per_group: if model == CollectionModel::Queue {
499                Some(
500                    queue_policies
501                        .get(&collection_name)
502                        .map(|p| p.in_flight_cap_per_group)
503                        .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP),
504                )
505            } else {
506                None
507            },
508            queue_dlq_target: if model == CollectionModel::Queue {
509                queue_policies
510                    .get(&collection_name)
511                    .and_then(|p| p.dlq_target.clone())
512            } else {
513                None
514            },
515            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
516            vector_metric: contract.and_then(|contract| contract.vector_metric),
517            session_key: contract.and_then(|contract| contract.session_key.clone()),
518            session_gap_ms: contract.and_then(|contract| contract.session_gap_ms),
519            retention_duration_ms: contract.and_then(|contract| contract.retention_duration_ms),
520            declared_schema_mode: contract.map(|contract| contract.schema_mode),
521            observed_schema_mode: inferred_schema_mode,
522            entities: entity_count,
523            cross_refs,
524            segments,
525            indices: infer_indices(&collection_name, model, index_catalog, declarations),
526            declared_indices,
527            operational_indices,
528            indexes_in_sync,
529            missing_operational_indices,
530            undeclared_operational_indices,
531            queryable_index_count,
532            indexes_requiring_rebuild_count,
533            queryable_graph_projection_count,
534            graph_projections_requiring_rematerialization_count,
535            executable_analytics_job_count,
536            analytics_jobs_requiring_rerun_count,
537            subscriptions: contract
538                .map(|contract| contract.subscriptions.clone())
539                .unwrap_or_default(),
540            analytics_config: contract
541                .map(|contract| contract.analytics_config.clone())
542                .unwrap_or_default(),
543            resources_in_sync,
544            attention_required,
545            attention_score,
546            attention_reasons,
547        });
548    }
549
550    collections.sort_by(|left, right| left.name.cmp(&right.name));
551
552    let summary = CatalogSnapshot {
553        name: name.to_string(),
554        total_entities: stats_by_collection
555            .values()
556            .map(|stats| stats.entities)
557            .sum(),
558        total_collections: stats_by_collection.len(),
559        stats_by_collection,
560        updated_at: SystemTime::now(),
561    };
562
563    CatalogModelSnapshot {
564        summary,
565        collections,
566        indices: index_catalog
567            .map(IndexCatalog::snapshot)
568            .unwrap_or_default(),
569        declared_indexes: declarations
570            .map(|declarations| declarations.declared_indexes.clone())
571            .unwrap_or_default(),
572        declared_graph_projections: declarations
573            .map(|declarations| declarations.declared_graph_projections.clone())
574            .unwrap_or_default(),
575        declared_analytics_jobs: declarations
576            .map(|declarations| declarations.declared_analytics_jobs.clone())
577            .unwrap_or_default(),
578        operational_indexes: declarations
579            .map(|declarations| declarations.operational_indexes.clone())
580            .unwrap_or_default(),
581        operational_graph_projections: declarations
582            .map(|declarations| declarations.operational_graph_projections.clone())
583            .unwrap_or_default(),
584        operational_analytics_jobs: declarations
585            .map(|declarations| declarations.operational_analytics_jobs.clone())
586            .unwrap_or_default(),
587        queryable_index_count: index_statuses
588            .iter()
589            .filter(|status| status.queryable)
590            .count(),
591        indexes_requiring_rebuild_count: index_statuses
592            .iter()
593            .filter(|status| status.requires_rebuild)
594            .count(),
595        queryable_graph_projection_count: graph_projection_statuses
596            .iter()
597            .filter(|status| status.queryable)
598            .count(),
599        graph_projections_requiring_rematerialization_count: graph_projection_statuses
600            .iter()
601            .filter(|status| status.requires_rematerialization)
602            .count(),
603        executable_analytics_job_count: analytics_job_statuses
604            .iter()
605            .filter(|status| status.executable)
606            .count(),
607        analytics_jobs_requiring_rerun_count: analytics_job_statuses
608            .iter()
609            .filter(|status| status.requires_rerun)
610            .count(),
611        index_statuses,
612        graph_projection_statuses,
613        analytics_job_statuses,
614    }
615}
616
/// Hot-fields snapshot of per-queue policy, materialised at catalog
/// snapshot time so `QueueLifecycle` (and the descriptor) can read
/// values without re-scanning `red_queue_meta`.
#[derive(Debug, Clone)]
pub struct QueuePolicySnapshot {
    /// Parsed from the row's `mode` text; `QueueMode::default()` when the
    /// field is absent or unparsable.
    pub mode: QueueMode,
    /// From `max_attempts`; `DEFAULT_QUEUE_MAX_ATTEMPTS` when absent.
    pub max_attempts: u32,
    /// From `lock_deadline_ms`; `DEFAULT_QUEUE_LOCK_DEADLINE_MS` when absent.
    pub lock_deadline_ms: u64,
    /// From `in_flight_cap_per_group`;
    /// `DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP` when absent.
    pub in_flight_cap_per_group: u32,
    /// From the row's `dlq` text field; `None` when absent.
    pub dlq_target: Option<String>,
}
628
629fn queue_policies_from_grouped(
630    grouped: &HashMap<String, Vec<UnifiedEntity>>,
631) -> HashMap<String, QueuePolicySnapshot> {
632    let Some(meta) = grouped.get("red_queue_meta") else {
633        return HashMap::new();
634    };
635
636    let mut policies = HashMap::new();
637    for entity in meta {
638        let Some(row) = entity.data.as_row() else {
639            continue;
640        };
641        if row_text(row, "kind").as_deref() != Some("queue_config") {
642            continue;
643        }
644        let Some(queue) = row_text(row, "queue") else {
645            continue;
646        };
647        let mode = row_text(row, "mode")
648            .as_deref()
649            .and_then(QueueMode::parse)
650            .unwrap_or_default();
651        let max_attempts = row_u64(row, "max_attempts")
652            .map(|v| v as u32)
653            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_MAX_ATTEMPTS);
654        let lock_deadline_ms = row_u64(row, "lock_deadline_ms")
655            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_LOCK_DEADLINE_MS);
656        let in_flight_cap_per_group = row_u64(row, "in_flight_cap_per_group")
657            .map(|v| v as u32)
658            .unwrap_or(crate::storage::query::DEFAULT_QUEUE_IN_FLIGHT_CAP_PER_GROUP);
659        let dlq_target = row_text(row, "dlq");
660        policies.insert(
661            queue,
662            QueuePolicySnapshot {
663                mode,
664                max_attempts,
665                lock_deadline_ms,
666                in_flight_cap_per_group,
667                dlq_target,
668            },
669        );
670    }
671    policies
672}
673
674fn row_u64(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<u64> {
675    match row.get_field(key) {
676        Some(Value::UnsignedInteger(v)) => Some(*v),
677        Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
678        _ => None,
679    }
680}
681
682fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
683    match row.get_field(key) {
684        Some(Value::Text(value)) => Some(value.to_string()),
685        _ => None,
686    }
687}
688
689fn collection_contract<'a>(
690    collection_name: &str,
691    contracts: Option<&'a [CollectionContract]>,
692) -> Option<&'a CollectionContract> {
693    contracts.and_then(|contracts| {
694        contracts
695            .iter()
696            .find(|contract| contract.name == collection_name)
697    })
698}
699
700fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
701    let mut has_table = false;
702    let mut has_graph = false;
703    let mut has_vector = false;
704
705    for entity in entities {
706        match &entity.kind {
707            EntityKind::TableRow { .. } => has_table = true,
708            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
709            EntityKind::Vector { .. } => has_vector = true,
710            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
711        }
712    }
713
714    match (has_table, has_graph, has_vector) {
715        (true, false, false) => CollectionModel::Table,
716        (false, true, false) => CollectionModel::Graph,
717        (false, false, true) => CollectionModel::Vector,
718        _ => CollectionModel::Mixed,
719    }
720}
721
722fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
723    match model {
724        CollectionModel::Table => SchemaMode::Strict,
725        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
726        CollectionModel::Document
727        | CollectionModel::Hll
728        | CollectionModel::Sketch
729        | CollectionModel::Filter
730        | CollectionModel::Kv
731        | CollectionModel::Config
732        | CollectionModel::Vault
733        | CollectionModel::Mixed => SchemaMode::Dynamic,
734        CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
735        CollectionModel::Queue => SchemaMode::Dynamic,
736    }
737}
738
739fn infer_indices(
740    collection_name: &str,
741    model: CollectionModel,
742    index_catalog: Option<&IndexCatalog>,
743    declarations: Option<&CatalogDeclarations>,
744) -> Vec<String> {
745    let declared = declared_indices(collection_name, declarations);
746    if !declared.is_empty() {
747        return declared;
748    }
749
750    let available = index_catalog
751        .map(IndexCatalog::snapshot)
752        .unwrap_or_default();
753    let mut selected = Vec::new();
754
755    for metric in available {
756        let relevant = matches!(
757            (model, metric.kind),
758            (_, IndexKind::BTree)
759                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
760                | (CollectionModel::Vector, IndexKind::VectorHnsw)
761                | (CollectionModel::Vector, IndexKind::VectorInverted)
762                | (CollectionModel::Vector, IndexKind::VectorTurbo)
763                | (CollectionModel::Document, IndexKind::FullText)
764                | (CollectionModel::Document, IndexKind::DocumentPathValue)
765                | (CollectionModel::Kv, IndexKind::Hash)
766                | (CollectionModel::Config, IndexKind::Hash)
767                | (CollectionModel::Vault, IndexKind::Hash)
768                | (CollectionModel::Mixed, _)
769        );
770
771        if relevant && metric.enabled {
772            selected.push(metric.name);
773        }
774    }
775
776    selected
777}
778
779fn declared_indices(
780    collection_name: &str,
781    declarations: Option<&CatalogDeclarations>,
782) -> Vec<String> {
783    let mut selected = declarations
784        .map(|declarations| {
785            declarations
786                .declared_indexes
787                .iter()
788                .filter(|index| index.collection.as_deref() == Some(collection_name))
789                .map(|index| index.name.clone())
790                .collect::<Vec<_>>()
791        })
792        .unwrap_or_default();
793    selected.sort();
794    selected.dedup();
795    selected
796}
797
798fn operational_indices(
799    collection_name: &str,
800    index_catalog: Option<&IndexCatalog>,
801    declarations: Option<&CatalogDeclarations>,
802) -> Vec<String> {
803    if let Some(declarations) = declarations {
804        let mut selected = declarations
805            .operational_indexes
806            .iter()
807            .filter(|index| index.collection.as_deref() == Some(collection_name))
808            .map(|index| index.name.clone())
809            .collect::<Vec<_>>();
810        selected.sort();
811        selected.dedup();
812        return selected;
813    }
814
815    let mut selected = index_catalog
816        .map(IndexCatalog::snapshot)
817        .unwrap_or_default()
818        .into_iter()
819        .filter(|metric| metric.enabled)
820        .filter_map(|metric| {
821            if metric.name.starts_with(collection_name) {
822                Some(metric.name)
823            } else {
824                None
825            }
826        })
827        .collect::<Vec<_>>();
828    selected.sort();
829    selected.dedup();
830    selected
831}
832
/// Compares a collection's declared index names with its operational ones.
///
/// Returns `(consistent, missing_operational, undeclared_operational)`:
/// - `missing_operational` holds names declared but not operational.
/// - `undeclared_operational` holds names operational but not declared.
///
/// Both lists are sorted and de-duplicated. `consistent` is true exactly when
/// both lists are empty.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared: BTreeSet<&String> = declared_indices.iter().collect();
    let operational: BTreeSet<&String> = operational_indices.iter().collect();

    let missing: Vec<String> = declared
        .difference(&operational)
        .map(|&name| name.clone())
        .collect();
    let undeclared: Vec<String> = operational
        .difference(&declared)
        .map(|&name| name.clone())
        .collect();

    let consistent = missing.is_empty() && undeclared.is_empty();
    (consistent, missing, undeclared)
}
853
854fn collection_index_readiness(
855    collection_name: &str,
856    statuses: &[CatalogIndexStatus],
857) -> (usize, usize, bool) {
858    let local = statuses
859        .iter()
860        .filter(|status| status.collection.as_deref() == Some(collection_name))
861        .collect::<Vec<_>>();
862    let queryable_count = local.iter().filter(|status| status.queryable).count();
863    let requires_rebuild_count = local
864        .iter()
865        .filter(|status| status.requires_rebuild)
866        .count();
867    let locally_in_sync = local.iter().all(|status| status.in_sync);
868    (queryable_count, requires_rebuild_count, locally_in_sync)
869}
870
871fn collection_graph_projection_readiness(
872    collection_name: &str,
873    statuses: &[CatalogGraphProjectionStatus],
874) -> (usize, usize, bool) {
875    let local = statuses
876        .iter()
877        .filter(|status| status.source.as_deref() == Some(collection_name))
878        .collect::<Vec<_>>();
879    let queryable_count = local.iter().filter(|status| status.queryable).count();
880    let requires_rematerialization_count = local
881        .iter()
882        .filter(|status| status.requires_rematerialization)
883        .count();
884    let locally_in_sync = local
885        .iter()
886        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
887    (
888        queryable_count,
889        requires_rematerialization_count,
890        locally_in_sync,
891    )
892}
893
894fn collection_analytics_job_readiness(
895    collection_name: &str,
896    projection_statuses: &[CatalogGraphProjectionStatus],
897    job_statuses: &[CatalogAnalyticsJobStatus],
898) -> (usize, usize, bool) {
899    let local = job_statuses
900        .iter()
901        .filter(|status| {
902            let Some(projection_name) = status.projection.as_deref() else {
903                return false;
904            };
905            projection_statuses
906                .iter()
907                .find(|projection| projection.name == projection_name)
908                .and_then(|projection| projection.source.as_deref())
909                == Some(collection_name)
910        })
911        .collect::<Vec<_>>();
912    let executable_count = local.iter().filter(|status| status.executable).count();
913    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
914    let locally_in_sync = local
915        .iter()
916        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
917    (executable_count, requires_rerun_count, locally_in_sync)
918}
919
920fn graph_projection_statuses(
921    declarations: Option<&CatalogDeclarations>,
922) -> Vec<CatalogGraphProjectionStatus> {
923    let declared = declarations
924        .map(|declarations| declarations.declared_graph_projections.as_slice())
925        .unwrap_or(&[]);
926    let operational = declarations
927        .map(|declarations| declarations.operational_graph_projections.as_slice())
928        .unwrap_or(&[]);
929    let declared_jobs = declarations
930        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
931        .unwrap_or(&[]);
932    let operational_jobs = declarations
933        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
934        .unwrap_or(&[]);
935
936    let mut names = declared
937        .iter()
938        .map(|projection| projection.name.clone())
939        .chain(operational.iter().map(|projection| projection.name.clone()))
940        .collect::<Vec<_>>();
941    names.sort();
942    names.dedup();
943
944    names
945        .into_iter()
946        .map(|name| {
947            let declared_projection = declared.iter().find(|projection| projection.name == name);
948            let operational_projection = operational
949                .iter()
950                .find(|projection| projection.name == name);
951            let declared_present = declared_projection.is_some();
952            let operational_present = operational_projection.is_some();
953            let mut dependent_job_ids = BTreeSet::new();
954            let mut dependent_job_count = 0usize;
955            let mut active_dependent_job_count = 0usize;
956            let mut stale_dependent_job_count = 0usize;
957            for job in declared_jobs
958                .iter()
959                .chain(operational_jobs.iter())
960                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
961            {
962                if !dependent_job_ids.insert(job.id.clone()) {
963                    continue;
964                }
965                dependent_job_count += 1;
966                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
967                    active_dependent_job_count += 1;
968                }
969                if job.state == "stale" {
970                    stale_dependent_job_count += 1;
971                }
972            }
973            let lifecycle_state = match (
974                declared_present,
975                operational_present,
976                declared_projection
977                    .map(|projection| projection.state.as_str())
978                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
979                    .unwrap_or_default(),
980            ) {
981                (true, _, "failed") => "failed",
982                (true, true, "stale") => "stale",
983                (true, _, "materializing") => "materializing",
984                (true, true, "materialized") => "materialized",
985                (true, true, _) => "materialized",
986                (false, true, _) => "orphaned-operational",
987                (true, false, _) => "declared",
988                (false, false, _) => "unknown",
989            }
990            .to_string();
991            let queryable = declared_present
992                && operational_present
993                && matches!(
994                    declared_projection
995                        .map(|projection| projection.state.as_str())
996                        .or_else(
997                            || operational_projection.map(|projection| projection.state.as_str())
998                        )
999                        .unwrap_or_default(),
1000                    "materialized"
1001                );
1002            let requires_rematerialization = matches!(
1003                declared_projection
1004                    .map(|projection| projection.state.as_str())
1005                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
1006                    .unwrap_or_default(),
1007                "declared" | "materializing" | "failed" | "stale"
1008            );
1009            let mut attention_reasons = Vec::new();
1010            if !declared_present || !operational_present {
1011                attention_reasons.push("declaration_drift".to_string());
1012            }
1013            if requires_rematerialization {
1014                attention_reasons.push("requires_rematerialization".to_string());
1015            }
1016            if stale_dependent_job_count > 0 {
1017                attention_reasons.push("dependent_jobs_stale".to_string());
1018            }
1019            let attention_score = stale_dependent_job_count.saturating_mul(2)
1020                + usize::from(requires_rematerialization).saturating_mul(4)
1021                + usize::from(!declared_present || !operational_present)
1022                + usize::from(!queryable);
1023            CatalogGraphProjectionStatus {
1024                name,
1025                source: declared_projection
1026                    .map(|projection| projection.source.clone())
1027                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
1028                lifecycle_state,
1029                declared: declared_present,
1030                operational: operational_present,
1031                in_sync: declared_present == operational_present,
1032                last_materialized_sequence: declared_projection
1033                    .and_then(|projection| projection.last_materialized_sequence)
1034                    .or_else(|| {
1035                        operational_projection
1036                            .and_then(|projection| projection.last_materialized_sequence)
1037                    }),
1038                queryable,
1039                requires_rematerialization,
1040                dependent_job_count,
1041                active_dependent_job_count,
1042                stale_dependent_job_count,
1043                dependent_jobs_in_sync: stale_dependent_job_count == 0,
1044                rerun_required: stale_dependent_job_count > 0,
1045                attention_score,
1046                attention_reasons,
1047            }
1048        })
1049        .collect()
1050}
1051
1052fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
1053    let declared = declarations
1054        .map(|declarations| declarations.declared_indexes.as_slice())
1055        .unwrap_or(&[]);
1056    let operational = declarations
1057        .map(|declarations| declarations.operational_indexes.as_slice())
1058        .unwrap_or(&[]);
1059
1060    let mut names = declared
1061        .iter()
1062        .map(|index| index.name.clone())
1063        .chain(operational.iter().map(|index| index.name.clone()))
1064        .collect::<Vec<_>>();
1065    names.sort();
1066    names.dedup();
1067
1068    names
1069        .into_iter()
1070        .map(|name| {
1071            let declared_index = declared.iter().find(|index| index.name == name);
1072            let operational_index = operational.iter().find(|index| index.name == name);
1073            let collection = declared_index
1074                .and_then(|index| index.collection.clone())
1075                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
1076            let kind = declared_index
1077                .map(|index| index_kind_string(index.kind))
1078                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
1079                .unwrap_or_default();
1080            let enabled = declared_index
1081                .map(|index| index.enabled)
1082                .or_else(|| operational_index.map(|index| index.enabled))
1083                .unwrap_or(false);
1084            let build_state = operational_index
1085                .map(|index| index.build_state.clone())
1086                .or_else(|| declared_index.map(|index| index.build_state.clone()));
1087            let declared_present = declared_index.is_some();
1088            let operational_present = operational_index.is_some();
1089            let lifecycle_state = index_lifecycle_state(
1090                declared_present,
1091                operational_present,
1092                enabled,
1093                build_state.as_deref(),
1094            );
1095
1096            let mut attention_reasons = Vec::new();
1097            if declared_present != operational_present {
1098                attention_reasons.push("declaration_drift".to_string());
1099            }
1100            if !enabled && declared_present {
1101                attention_reasons.push("disabled".to_string());
1102            }
1103            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
1104                attention_reasons.push("failed".to_string());
1105            }
1106            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
1107                attention_reasons.push("stale".to_string());
1108            }
1109            if matches!(
1110                build_state.as_deref().unwrap_or_default(),
1111                "building" | "declared-unbuilt"
1112            ) {
1113                attention_reasons.push("requires_rebuild".to_string());
1114            }
1115            let queryable = declared_present
1116                && operational_present
1117                && enabled
1118                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
1119            let requires_rebuild = matches!(
1120                build_state.as_deref().unwrap_or_default(),
1121                "declared-unbuilt" | "building" | "stale" | "failed"
1122            );
1123            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
1124                + usize::from(!enabled && declared_present)
1125                + usize::from(declared_present != operational_present)
1126                + usize::from(!queryable);
1127
1128            let artifact_state = crate::physical::ArtifactState::from_build_state(
1129                build_state.as_deref().unwrap_or("declared-unbuilt"),
1130                enabled,
1131            );
1132            CatalogIndexStatus {
1133                name,
1134                collection,
1135                kind,
1136                declared: declared_present,
1137                operational: operational_present,
1138                enabled,
1139                build_state,
1140                artifact_state,
1141                lifecycle_state,
1142                in_sync: declared_present == operational_present,
1143                queryable,
1144                requires_rebuild,
1145                attention_score,
1146                attention_reasons,
1147            }
1148        })
1149        .collect()
1150}
1151
/// Lifecycle label for an index, given where it is known and how it was built.
///
/// Checks are applied in this order:
/// 1. Operational-only indexes are `"orphaned-operational"`.
/// 2. Declared-but-disabled indexes are `"disabled"`.
/// 3. Indexes known to neither side are `"unknown"`.
/// 4. Declared-only indexes are `"declared"`.
///
/// Otherwise the build state decides: `"ready"`, `"failed"` and `"stale"` map
/// to themselves, `"declared-unbuilt"` maps to `"declared"`, and anything
/// else (including no build state) is `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational, enabled) {
        (false, true, _) => "orphaned-operational",
        (true, _, false) => "disabled",
        (false, false, _) => "unknown",
        (true, false, true) => "declared",
        (true, true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            _ => "building",
        },
    };
    label.to_string()
}
1182
1183fn index_kind_string(kind: IndexKind) -> String {
1184    match kind {
1185        IndexKind::BTree => "btree",
1186        IndexKind::Hash => "hash",
1187        IndexKind::Bitmap => "bitmap",
1188        IndexKind::Spatial => "spatial.rtree",
1189        IndexKind::VectorHnsw => "vector.hnsw",
1190        IndexKind::VectorInverted => "vector.inverted",
1191        IndexKind::VectorTurbo => "vector.turbo",
1192        IndexKind::GraphAdjacency => "graph.adjacency",
1193        IndexKind::FullText => "text.fulltext",
1194        IndexKind::DocumentPathValue => "document.pathvalue",
1195        IndexKind::HybridSearch => "search.hybrid",
1196    }
1197    .to_string()
1198}
1199
1200fn analytics_job_statuses(
1201    declarations: Option<&CatalogDeclarations>,
1202) -> Vec<CatalogAnalyticsJobStatus> {
1203    let declared = declarations
1204        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1205        .unwrap_or(&[]);
1206    let operational = declarations
1207        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1208        .unwrap_or(&[]);
1209    let declared_projections = declarations
1210        .map(|declarations| declarations.declared_graph_projections.as_slice())
1211        .unwrap_or(&[]);
1212    let operational_projections = declarations
1213        .map(|declarations| declarations.operational_graph_projections.as_slice())
1214        .unwrap_or(&[]);
1215
1216    let mut ids = declared
1217        .iter()
1218        .map(|job| job.id.clone())
1219        .chain(operational.iter().map(|job| job.id.clone()))
1220        .collect::<Vec<_>>();
1221    ids.sort();
1222    ids.dedup();
1223
1224    ids.into_iter()
1225        .map(|id| {
1226            let declared_job = declared.iter().find(|job| job.id == id);
1227            let operational_job = operational.iter().find(|job| job.id == id);
1228            let kind = declared_job
1229                .map(|job| job.kind.clone())
1230                .or_else(|| operational_job.map(|job| job.kind.clone()))
1231                .unwrap_or_default();
1232            let projection = declared_job
1233                .and_then(|job| job.projection.clone())
1234                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1235            let state = declared_job
1236                .map(|job| job.state.clone())
1237                .or_else(|| operational_job.map(|job| job.state.clone()))
1238                .unwrap_or_default();
1239            let declared_present = declared_job.is_some();
1240            let operational_present = operational_job.is_some();
1241            let last_run_sequence = declared_job
1242                .and_then(|job| job.last_run_sequence)
1243                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1244            let projection_declared = projection.as_ref().map(|projection_name| {
1245                declared_projections
1246                    .iter()
1247                    .any(|projection| projection.name == *projection_name)
1248            });
1249            let projection_operational = projection.as_ref().map(|projection_name| {
1250                operational_projections
1251                    .iter()
1252                    .any(|projection| projection.name == *projection_name)
1253            });
1254            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1255                declared_projections
1256                    .iter()
1257                    .find(|projection| projection.name == *projection_name)
1258                    .map(|projection| projection.state.as_str())
1259                    .or_else(|| {
1260                        operational_projections
1261                            .iter()
1262                            .find(|projection| projection.name == *projection_name)
1263                            .map(|projection| projection.state.as_str())
1264                    })
1265            });
1266            let dependency_in_sync = projection.as_ref().map(|_| {
1267                matches!(projection_lifecycle, Some("materialized"))
1268                    && projection_operational == Some(true)
1269            });
1270            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1271                (true, false, _) => "declared",
1272                (true, true, _)
1273                    if matches!(
1274                        projection_lifecycle,
1275                        Some("stale" | "failed" | "materializing" | "declared")
1276                    ) =>
1277                {
1278                    "stale"
1279                }
1280                (true, true, "completed") => "completed",
1281                (true, true, "running") => "running",
1282                (true, true, "failed") => "failed",
1283                (true, true, "queued") => "queued",
1284                (true, true, "stale") => "stale",
1285                (true, true, _) => "materialized",
1286                (false, true, _) => "orphaned-operational",
1287                (false, false, _) => "unknown",
1288            }
1289            .to_string();
1290            let executable = declared_present
1291                && operational_present
1292                && !matches!(state.as_str(), "failed" | "stale")
1293                && dependency_in_sync.unwrap_or(true);
1294            let requires_rerun =
1295                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1296            let mut attention_reasons = Vec::new();
1297            if declared_present != operational_present {
1298                attention_reasons.push("declaration_drift".to_string());
1299            }
1300            if matches!(state.as_str(), "failed") {
1301                attention_reasons.push("failed".to_string());
1302            }
1303            if matches!(state.as_str(), "stale") {
1304                attention_reasons.push("stale".to_string());
1305            }
1306            if dependency_in_sync == Some(false) {
1307                attention_reasons.push("dependency_drift".to_string());
1308            }
1309            if requires_rerun {
1310                attention_reasons.push("requires_rerun".to_string());
1311            }
1312            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1313                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1314                + usize::from(declared_present != operational_present)
1315                + usize::from(!executable);
1316            CatalogAnalyticsJobStatus {
1317                id,
1318                kind,
1319                projection,
1320                state: state.clone(),
1321                lifecycle_state,
1322                declared: declared_present,
1323                operational: operational_present,
1324                in_sync: declared_present == operational_present,
1325                last_run_sequence,
1326                projection_declared,
1327                projection_operational,
1328                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1329                dependency_in_sync,
1330                executable,
1331                requires_rerun,
1332                attention_score,
1333                attention_reasons,
1334            }
1335        })
1336        .collect()
1337}
1338
1339pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1340    let declared_indexes = snapshot
1341        .declared_indexes
1342        .iter()
1343        .map(|index| index.name.clone())
1344        .collect::<BTreeSet<_>>();
1345    let operational_indexes = snapshot
1346        .operational_indexes
1347        .iter()
1348        .map(|index| index.name.clone())
1349        .collect::<BTreeSet<_>>();
1350    let declared_graph_projections = snapshot
1351        .declared_graph_projections
1352        .iter()
1353        .map(|projection| projection.name.clone())
1354        .collect::<BTreeSet<_>>();
1355    let operational_graph_projections = snapshot
1356        .operational_graph_projections
1357        .iter()
1358        .map(|projection| projection.name.clone())
1359        .collect::<BTreeSet<_>>();
1360    let declared_analytics_jobs = snapshot
1361        .declared_analytics_jobs
1362        .iter()
1363        .map(|job| job.id.clone())
1364        .collect::<BTreeSet<_>>();
1365    let operational_analytics_jobs = snapshot
1366        .operational_analytics_jobs
1367        .iter()
1368        .map(|job| job.id.clone())
1369        .collect::<BTreeSet<_>>();
1370
1371    CatalogConsistencyReport {
1372        declared_index_count: declared_indexes.len(),
1373        operational_index_count: operational_indexes.len(),
1374        declared_graph_projection_count: declared_graph_projections.len(),
1375        operational_graph_projection_count: operational_graph_projections.len(),
1376        declared_analytics_job_count: declared_analytics_jobs.len(),
1377        operational_analytics_job_count: operational_analytics_jobs.len(),
1378        missing_operational_indexes: declared_indexes
1379            .difference(&operational_indexes)
1380            .cloned()
1381            .collect(),
1382        undeclared_operational_indexes: operational_indexes
1383            .difference(&declared_indexes)
1384            .cloned()
1385            .collect(),
1386        missing_operational_graph_projections: declared_graph_projections
1387            .difference(&operational_graph_projections)
1388            .cloned()
1389            .collect(),
1390        undeclared_operational_graph_projections: operational_graph_projections
1391            .difference(&declared_graph_projections)
1392            .cloned()
1393            .collect(),
1394        missing_operational_analytics_jobs: declared_analytics_jobs
1395            .difference(&operational_analytics_jobs)
1396            .cloned()
1397            .collect(),
1398        undeclared_operational_analytics_jobs: operational_analytics_jobs
1399            .difference(&declared_analytics_jobs)
1400            .cloned()
1401            .collect(),
1402    }
1403}
1404
1405pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1406    CatalogAttentionSummary {
1407        collections_requiring_attention: snapshot
1408            .collections
1409            .iter()
1410            .filter(|collection| collection.attention_required)
1411            .count(),
1412        indexes_requiring_attention: snapshot
1413            .index_statuses
1414            .iter()
1415            .filter(|status| status.attention_score > 0)
1416            .count(),
1417        graph_projections_requiring_attention: snapshot
1418            .graph_projection_statuses
1419            .iter()
1420            .filter(|status| status.attention_score > 0)
1421            .count(),
1422        analytics_jobs_requiring_attention: snapshot
1423            .analytics_job_statuses
1424            .iter()
1425            .filter(|status| status.attention_score > 0)
1426            .count(),
1427        top_collection: snapshot
1428            .collections
1429            .iter()
1430            .filter(|collection| collection.attention_score > 0)
1431            .max_by_key(|collection| collection.attention_score)
1432            .cloned(),
1433        top_index: snapshot
1434            .index_statuses
1435            .iter()
1436            .filter(|status| status.attention_score > 0)
1437            .max_by_key(|status| status.attention_score)
1438            .cloned(),
1439        top_graph_projection: snapshot
1440            .graph_projection_statuses
1441            .iter()
1442            .filter(|status| status.attention_score > 0)
1443            .max_by_key(|status| status.attention_score)
1444            .cloned(),
1445        top_analytics_job: snapshot
1446            .analytics_job_statuses
1447            .iter()
1448            .filter(|status| status.attention_score > 0)
1449            .max_by_key(|status| status.attention_score)
1450            .cloned(),
1451    }
1452}
1453
1454pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1455    let mut collections = snapshot
1456        .collections
1457        .iter()
1458        .filter(|collection| collection.attention_required)
1459        .cloned()
1460        .collect::<Vec<_>>();
1461    collections.sort_by(|left, right| {
1462        right
1463            .attention_score
1464            .cmp(&left.attention_score)
1465            .then_with(|| left.name.cmp(&right.name))
1466    });
1467    collections
1468}
1469
1470pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1471    let mut statuses = snapshot
1472        .index_statuses
1473        .iter()
1474        .filter(|status| status.attention_score > 0)
1475        .cloned()
1476        .collect::<Vec<_>>();
1477    statuses.sort_by(|left, right| {
1478        right
1479            .attention_score
1480            .cmp(&left.attention_score)
1481            .then_with(|| left.name.cmp(&right.name))
1482    });
1483    statuses
1484}
1485
1486pub fn graph_projection_attention(
1487    snapshot: &CatalogModelSnapshot,
1488) -> Vec<CatalogGraphProjectionStatus> {
1489    let mut statuses = snapshot
1490        .graph_projection_statuses
1491        .iter()
1492        .filter(|status| status.attention_score > 0)
1493        .cloned()
1494        .collect::<Vec<_>>();
1495    statuses.sort_by(|left, right| {
1496        right
1497            .attention_score
1498            .cmp(&left.attention_score)
1499            .then_with(|| left.name.cmp(&right.name))
1500    });
1501    statuses
1502}
1503
1504pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1505    let mut statuses = snapshot
1506        .analytics_job_statuses
1507        .iter()
1508        .filter(|status| status.attention_score > 0)
1509        .cloned()
1510        .collect::<Vec<_>>();
1511    statuses.sort_by(|left, right| {
1512        right
1513            .attention_score
1514            .cmp(&left.attention_score)
1515            .then_with(|| left.id.cmp(&right.id))
1516    });
1517    statuses
1518}