Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Logical model a collection is classified under in the catalog.
///
/// Within this module, `infer_model` only ever yields `Table`, `Graph`,
/// `Vector` or `Mixed`; every other variant reaches a descriptor only when a
/// collection contract declares it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    Mixed,
    TimeSeries,
    Queue,
}
32
/// Schema mode reported for a collection.
///
/// When no contract supplies one, `infer_schema_mode` derives it from the
/// collection model: `Table` maps to `Strict`, `Graph`/`Vector`/`TimeSeries`
/// map to `SemiStructured`, and every other model maps to `Dynamic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
39
/// Kind of write operation a subscription can filter on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Returns the canonical upper-case name of the operation
    /// (`"INSERT"`, `"UPDATE"` or `"DELETE"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation name, ignoring ASCII case.
    ///
    /// Returns `None` when `value` is not one of the names produced by
    /// [`SubscriptionOperation::as_str`]. The comparison is done in place, so
    /// no temporary upper-cased string is allocated.
    pub fn from_str(value: &str) -> Option<Self> {
        // Driving the lookup from `as_str` keeps both directions in agreement.
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|operation| value.eq_ignore_ascii_case(operation.as_str()))
    }
}
65
/// Description of one subscription attached to a collection.
///
/// Descriptors are copied from a collection contract into
/// `CollectionDescriptor::subscriptions` when a snapshot is built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    /// Logical name for this subscription. Empty string for legacy unnamed subscriptions.
    pub name: String,
    // NOTE(review): presumably the collection whose changes are observed — confirm.
    pub source: String,
    /// Queue that events are routed to (see `all_tenants` for namespacing).
    pub target_queue: String,
    // NOTE(review): presumably the operations that trigger the subscription — confirm
    // how an empty list is interpreted.
    pub ops_filter: Vec<SubscriptionOperation>,
    // NOTE(review): presumably an optional predicate expression kept as text — confirm.
    pub where_filter: Option<String>,
    // NOTE(review): presumably field names removed from emitted events — confirm.
    pub redact_fields: Vec<String>,
    pub enabled: bool,
    /// When true, events are routed to the bare `target_queue` regardless of
    /// the current tenant — a cluster-wide subscription. When false (default),
    /// events are namespaced as `{tenant}/{target_queue}` whenever a tenant
    /// context is active, enforcing per-tenant isolation.
    pub all_tenants: bool,
}
82
/// Per-collection view produced by `snapshot_store_with_declarations`.
///
/// Combines what a collection contract declares with what is observed in the
/// store, the index catalog and the declared/operational resource lists.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as listed by the store.
    pub name: String,
    /// Contract-declared model when a contract exists, otherwise the inferred one.
    pub model: CollectionModel,
    /// Contract schema mode when a contract exists, otherwise the inferred one.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    /// Origin recorded on the contract, if one exists.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, if one exists.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the entities currently stored in the collection.
    pub observed_model: CollectionModel,
    /// Set only when `model` is `Queue`: the mode found in the queue metadata,
    /// or the default mode when no metadata entry exists.
    pub queue_mode: Option<QueueMode>,
    /// Vector dimension taken from the contract, if any.
    pub vector_dimension: Option<usize>,
    /// Vector distance metric taken from the contract, if any.
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    /// Schema mode declared by the contract, if one exists.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode inferred from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities in the collection.
    pub entities: usize,
    /// Sum of the cross-reference counts of all entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment count; 0 when the store has no
    /// manager for the collection.
    pub segments: usize,
    /// Declared index names when any exist; otherwise the enabled catalog
    /// indexes considered relevant for `model`.
    pub indices: Vec<String>,
    /// Sorted, deduplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, deduplicated names of enabled catalog indexes attributed to
    /// this collection.
    pub operational_indices: Vec<String>,
    /// True when `declared_indices` and `operational_indices` contain the same names.
    pub indexes_in_sync: bool,
    /// Declared index names that are not operational.
    pub missing_operational_indices: Vec<String>,
    /// Operational index names that are not declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Index statuses of this collection flagged `queryable`.
    pub queryable_index_count: usize,
    /// Index statuses of this collection flagged `requires_rebuild`.
    pub indexes_requiring_rebuild_count: usize,
    /// Graph projection statuses sourced from this collection flagged `queryable`.
    pub queryable_graph_projection_count: usize,
    /// Graph projection statuses sourced from this collection flagged
    /// `requires_rematerialization`.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Analytics jobs (on projections sourced from this collection) flagged `executable`.
    pub executable_analytics_job_count: usize,
    /// Analytics jobs (on projections sourced from this collection) flagged `requires_rerun`.
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions copied from the contract; empty when there is no contract.
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// True when indexes, graph projections and analytics jobs all report in-sync.
    pub resources_in_sync: bool,
    /// True when `resources_in_sync` is false or any rebuild /
    /// rematerialization / rerun count is non-zero.
    pub attention_required: bool,
    /// Weighted score: 3 per index needing rebuild, 4 per projection needing
    /// rematerialization, 2 per job needing rerun, plus 1 when resources are
    /// out of sync.
    pub attention_score: usize,
    /// Machine-readable tags (e.g. `"index_drift"`) explaining the score.
    pub attention_reasons: Vec<String>,
}
118
/// Full catalog snapshot returned by `snapshot_store` and
/// `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-wide totals and per-collection statistics.
    pub summary: CatalogSnapshot,
    /// One descriptor per collection, sorted by collection name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog, or the default value when none was given.
    pub indices: IndexCatalogSnapshot,
    // The six lists below are cloned from the supplied `CatalogDeclarations`
    // and are empty when no declarations were passed.
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Per-index status derived from the declarations.
    pub index_statuses: Vec<CatalogIndexStatus>,
    /// Per-projection status derived from the declarations.
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    /// Per-job status derived from the declarations.
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    // The counts below are tallies over the corresponding status list of the
    // entries whose matching flag is set.
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
}
140
/// Aggregated "needs attention" counters plus the top entry of each resource kind.
// NOTE(review): the code that populates this struct is not in this part of
// the file; "top" presumably means the highest `attention_score` — confirm.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
152
/// Declared and operational resource lists fed into
/// `snapshot_store_with_declarations`.
///
/// The snapshot compares the `declared_*` lists against their `operational_*`
/// counterparts to derive per-resource statuses, and copies all six lists
/// into the resulting `CatalogModelSnapshot`.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
162
/// Status of one graph projection, built by `graph_projection_statuses` from
/// the declared and operational projection lists.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name; one status is produced per distinct name.
    pub name: String,
    /// Source taken from the declared projection, else from the operational one.
    pub source: Option<String>,
    /// One of `"failed"`, `"stale"`, `"materializing"`, `"materialized"`,
    /// `"orphaned-operational"`, `"declared"` or `"unknown"`.
    pub lifecycle_state: String,
    /// A declared projection with this name exists.
    pub declared: bool,
    /// An operational projection with this name exists.
    pub operational: bool,
    /// True when `declared` and `operational` are equal.
    pub in_sync: bool,
    /// Sequence from the declared projection, else from the operational one.
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational, and in state `"materialized"`.
    pub queryable: bool,
    /// State is `"declared"`, `"materializing"`, `"failed"` or `"stale"`.
    pub requires_rematerialization: bool,
    // NOTE(review): the three counts below presumably carry the tallies of
    // distinct job ids (all / state queued|running|completed / state stale)
    // referencing this projection — confirm against the constructor.
    pub dependent_job_count: usize,
    pub active_dependent_job_count: usize,
    pub stale_dependent_job_count: usize,
    pub dependent_jobs_in_sync: bool,
    pub rerun_required: bool,
    /// Weighted score; higher means more urgent.
    pub attention_score: usize,
    /// Machine-readable tags such as `"declaration_drift"`.
    pub attention_reasons: Vec<String>,
}
182
/// Status of one index as reported in a catalog snapshot.
// NOTE(review): the function that builds these values is not in this part of
// the file; field meanings below are limited to how this module reads them.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    pub name: String,
    /// Owning collection; used to attribute the status to a `CollectionDescriptor`.
    pub collection: Option<String>,
    pub kind: String,
    pub declared: bool,
    pub operational: bool,
    pub enabled: bool,
    pub build_state: Option<String>,
    pub artifact_state: crate::physical::ArtifactState,
    pub lifecycle_state: String,
    /// Feeds the per-collection "indexes locally in sync" check.
    pub in_sync: bool,
    /// Counted into `queryable_index_count`.
    pub queryable: bool,
    /// Counted into `indexes_requiring_rebuild_count`.
    pub requires_rebuild: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
200
/// Status of one analytics job as reported in a catalog snapshot.
// NOTE(review): the function that builds these values is not in this part of
// the file; field meanings below are limited to how this module reads them.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    pub id: String,
    pub kind: String,
    /// Name of the graph projection the job runs on; a job is attributed to
    /// the collection that is the source of that projection.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    /// Feeds the per-collection "analytics jobs locally in sync" check.
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// `None` is treated as in sync by the per-collection readiness check.
    pub dependency_in_sync: Option<bool>,
    /// Counted into `executable_analytics_job_count`.
    pub executable: bool,
    /// Counted into `analytics_jobs_requiring_rerun_count`.
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
221
/// Declared-versus-operational counts and name differences for indexes,
/// graph projections and analytics jobs.
// NOTE(review): the producer is not in this part of the file. By naming
// symmetry with `CollectionDescriptor`, `missing_operational_*` presumably
// lists declared-but-not-operational names and `undeclared_operational_*`
// the reverse — confirm.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
237
/// Builds a catalog snapshot for `store` without declarations or collection
/// contracts.
///
/// Delegates to `snapshot_store_with_declarations`, passing `None` for both
/// the declarations and the contracts. `name` becomes the snapshot summary's
/// name; `index_catalog` is optional.
pub fn snapshot_store(
    name: &str,
    store: &UnifiedStore,
    index_catalog: Option<&IndexCatalog>,
) -> CatalogModelSnapshot {
    snapshot_store_with_declarations(name, store, index_catalog, None, None)
}
245
246pub fn snapshot_store_with_declarations(
247    name: &str,
248    store: &UnifiedStore,
249    index_catalog: Option<&IndexCatalog>,
250    declarations: Option<&CatalogDeclarations>,
251    contracts: Option<&[CollectionContract]>,
252) -> CatalogModelSnapshot {
253    let index_statuses = index_statuses(declarations);
254    let graph_projection_statuses = graph_projection_statuses(declarations);
255    let analytics_job_statuses = analytics_job_statuses(declarations);
256
257    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
258    for (collection, entity) in store.query_all(|_| true) {
259        grouped.entry(collection).or_default().push(entity);
260    }
261    let queue_modes = queue_modes_from_grouped(&grouped);
262
263    let mut stats_by_collection = BTreeMap::new();
264    let mut collections = Vec::new();
265
266    for collection_name in store.list_collections() {
267        let entities = grouped.remove(&collection_name).unwrap_or_default();
268        let inferred_model = infer_model(&entities);
269        let inferred_schema_mode = infer_schema_mode(inferred_model);
270        let contract = collection_contract(&collection_name, contracts);
271        let model = contract
272            .map(|contract| contract.declared_model)
273            .unwrap_or(inferred_model);
274        let cross_refs = entities
275            .iter()
276            .map(|entity| entity.cross_refs().len())
277            .sum();
278        let entity_count = entities.len();
279        let manager_stats = store
280            .get_collection(&collection_name)
281            .map(|manager| manager.stats());
282        let segments = manager_stats
283            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
284            .unwrap_or(0);
285
286        stats_by_collection.insert(
287            collection_name.clone(),
288            CollectionStats {
289                entities: entity_count,
290                cross_refs,
291                segments,
292            },
293        );
294
295        let declared_indices = declared_indices(&collection_name, declarations);
296        let operational_indices = operational_indices(&collection_name, index_catalog);
297        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
298            collection_index_consistency(&declared_indices, &operational_indices);
299        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
300            collection_index_readiness(&collection_name, &index_statuses);
301        let (
302            queryable_graph_projection_count,
303            graph_projections_requiring_rematerialization_count,
304            graph_projections_locally_in_sync,
305        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
306        let (
307            executable_analytics_job_count,
308            analytics_jobs_requiring_rerun_count,
309            analytics_jobs_locally_in_sync,
310        ) = collection_analytics_job_readiness(
311            &collection_name,
312            &graph_projection_statuses,
313            &analytics_job_statuses,
314        );
315        let resources_in_sync = indexes_in_sync
316            && indexes_locally_in_sync
317            && graph_projections_locally_in_sync
318            && analytics_jobs_locally_in_sync;
319        let attention_required = !resources_in_sync
320            || indexes_requiring_rebuild_count > 0
321            || graph_projections_requiring_rematerialization_count > 0
322            || analytics_jobs_requiring_rerun_count > 0;
323        let mut attention_reasons = Vec::new();
324        if !indexes_in_sync || !indexes_locally_in_sync {
325            attention_reasons.push("index_drift".to_string());
326        }
327        if indexes_requiring_rebuild_count > 0 {
328            attention_reasons.push("indexes_require_rebuild".to_string());
329        }
330        if !graph_projections_locally_in_sync {
331            attention_reasons.push("graph_projection_drift".to_string());
332        }
333        if graph_projections_requiring_rematerialization_count > 0 {
334            attention_reasons.push("graph_projections_require_rematerialization".to_string());
335        }
336        if !analytics_jobs_locally_in_sync {
337            attention_reasons.push("analytics_job_drift".to_string());
338        }
339        if analytics_jobs_requiring_rerun_count > 0 {
340            attention_reasons.push("analytics_jobs_require_rerun".to_string());
341        }
342        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
343            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
344            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
345            + usize::from(!resources_in_sync);
346
347        collections.push(CollectionDescriptor {
348            name: collection_name.clone(),
349            model,
350            schema_mode: contract
351                .map(|contract| contract.schema_mode)
352                .unwrap_or(inferred_schema_mode),
353            contract_present: contract.is_some(),
354            contract_origin: contract.map(|contract| contract.origin),
355            declared_model: contract.map(|contract| contract.declared_model),
356            observed_model: inferred_model,
357            queue_mode: if model == CollectionModel::Queue {
358                Some(
359                    queue_modes
360                        .get(&collection_name)
361                        .copied()
362                        .unwrap_or_default(),
363                )
364            } else {
365                None
366            },
367            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
368            vector_metric: contract.and_then(|contract| contract.vector_metric),
369            declared_schema_mode: contract.map(|contract| contract.schema_mode),
370            observed_schema_mode: inferred_schema_mode,
371            entities: entity_count,
372            cross_refs,
373            segments,
374            indices: infer_indices(&collection_name, model, index_catalog, declarations),
375            declared_indices,
376            operational_indices,
377            indexes_in_sync,
378            missing_operational_indices,
379            undeclared_operational_indices,
380            queryable_index_count,
381            indexes_requiring_rebuild_count,
382            queryable_graph_projection_count,
383            graph_projections_requiring_rematerialization_count,
384            executable_analytics_job_count,
385            analytics_jobs_requiring_rerun_count,
386            subscriptions: contract
387                .map(|contract| contract.subscriptions.clone())
388                .unwrap_or_default(),
389            resources_in_sync,
390            attention_required,
391            attention_score,
392            attention_reasons,
393        });
394    }
395
396    collections.sort_by(|left, right| left.name.cmp(&right.name));
397
398    let summary = CatalogSnapshot {
399        name: name.to_string(),
400        total_entities: stats_by_collection
401            .values()
402            .map(|stats| stats.entities)
403            .sum(),
404        total_collections: stats_by_collection.len(),
405        stats_by_collection,
406        updated_at: SystemTime::now(),
407    };
408
409    CatalogModelSnapshot {
410        summary,
411        collections,
412        indices: index_catalog
413            .map(IndexCatalog::snapshot)
414            .unwrap_or_default(),
415        declared_indexes: declarations
416            .map(|declarations| declarations.declared_indexes.clone())
417            .unwrap_or_default(),
418        declared_graph_projections: declarations
419            .map(|declarations| declarations.declared_graph_projections.clone())
420            .unwrap_or_default(),
421        declared_analytics_jobs: declarations
422            .map(|declarations| declarations.declared_analytics_jobs.clone())
423            .unwrap_or_default(),
424        operational_indexes: declarations
425            .map(|declarations| declarations.operational_indexes.clone())
426            .unwrap_or_default(),
427        operational_graph_projections: declarations
428            .map(|declarations| declarations.operational_graph_projections.clone())
429            .unwrap_or_default(),
430        operational_analytics_jobs: declarations
431            .map(|declarations| declarations.operational_analytics_jobs.clone())
432            .unwrap_or_default(),
433        queryable_index_count: index_statuses
434            .iter()
435            .filter(|status| status.queryable)
436            .count(),
437        indexes_requiring_rebuild_count: index_statuses
438            .iter()
439            .filter(|status| status.requires_rebuild)
440            .count(),
441        queryable_graph_projection_count: graph_projection_statuses
442            .iter()
443            .filter(|status| status.queryable)
444            .count(),
445        graph_projections_requiring_rematerialization_count: graph_projection_statuses
446            .iter()
447            .filter(|status| status.requires_rematerialization)
448            .count(),
449        executable_analytics_job_count: analytics_job_statuses
450            .iter()
451            .filter(|status| status.executable)
452            .count(),
453        analytics_jobs_requiring_rerun_count: analytics_job_statuses
454            .iter()
455            .filter(|status| status.requires_rerun)
456            .count(),
457        index_statuses,
458        graph_projection_statuses,
459        analytics_job_statuses,
460    }
461}
462
463fn queue_modes_from_grouped(
464    grouped: &HashMap<String, Vec<UnifiedEntity>>,
465) -> HashMap<String, QueueMode> {
466    let Some(meta) = grouped.get("red_queue_meta") else {
467        return HashMap::new();
468    };
469
470    let mut modes = HashMap::new();
471    for entity in meta {
472        let Some(row) = entity.data.as_row() else {
473            continue;
474        };
475        if row_text(row, "kind").as_deref() != Some("queue_config") {
476            continue;
477        }
478        let Some(queue) = row_text(row, "queue") else {
479            continue;
480        };
481        let mode = row_text(row, "mode")
482            .as_deref()
483            .and_then(QueueMode::parse)
484            .unwrap_or_default();
485        modes.insert(queue, mode);
486    }
487    modes
488}
489
490fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
491    match row.get_field(key) {
492        Some(Value::Text(value)) => Some(value.to_string()),
493        _ => None,
494    }
495}
496
497fn collection_contract<'a>(
498    collection_name: &str,
499    contracts: Option<&'a [CollectionContract]>,
500) -> Option<&'a CollectionContract> {
501    contracts.and_then(|contracts| {
502        contracts
503            .iter()
504            .find(|contract| contract.name == collection_name)
505    })
506}
507
508fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
509    let mut has_table = false;
510    let mut has_graph = false;
511    let mut has_vector = false;
512
513    for entity in entities {
514        match &entity.kind {
515            EntityKind::TableRow { .. } => has_table = true,
516            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
517            EntityKind::Vector { .. } => has_vector = true,
518            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
519        }
520    }
521
522    match (has_table, has_graph, has_vector) {
523        (true, false, false) => CollectionModel::Table,
524        (false, true, false) => CollectionModel::Graph,
525        (false, false, true) => CollectionModel::Vector,
526        _ => CollectionModel::Mixed,
527    }
528}
529
530fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
531    match model {
532        CollectionModel::Table => SchemaMode::Strict,
533        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
534        CollectionModel::Document
535        | CollectionModel::Hll
536        | CollectionModel::Sketch
537        | CollectionModel::Filter
538        | CollectionModel::Kv
539        | CollectionModel::Config
540        | CollectionModel::Vault
541        | CollectionModel::Mixed => SchemaMode::Dynamic,
542        CollectionModel::TimeSeries => SchemaMode::SemiStructured,
543        CollectionModel::Queue => SchemaMode::Dynamic,
544    }
545}
546
547fn infer_indices(
548    collection_name: &str,
549    model: CollectionModel,
550    index_catalog: Option<&IndexCatalog>,
551    declarations: Option<&CatalogDeclarations>,
552) -> Vec<String> {
553    let declared = declared_indices(collection_name, declarations);
554    if !declared.is_empty() {
555        return declared;
556    }
557
558    let available = index_catalog
559        .map(IndexCatalog::snapshot)
560        .unwrap_or_default();
561    let mut selected = Vec::new();
562
563    for metric in available {
564        let relevant = matches!(
565            (model, metric.kind),
566            (_, IndexKind::BTree)
567                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
568                | (CollectionModel::Vector, IndexKind::VectorHnsw)
569                | (CollectionModel::Vector, IndexKind::VectorInverted)
570                | (CollectionModel::Document, IndexKind::FullText)
571                | (CollectionModel::Document, IndexKind::DocumentPathValue)
572                | (CollectionModel::Kv, IndexKind::Hash)
573                | (CollectionModel::Config, IndexKind::Hash)
574                | (CollectionModel::Vault, IndexKind::Hash)
575                | (CollectionModel::Mixed, _)
576        );
577
578        if relevant && metric.enabled {
579            selected.push(metric.name);
580        }
581    }
582
583    selected
584}
585
586fn declared_indices(
587    collection_name: &str,
588    declarations: Option<&CatalogDeclarations>,
589) -> Vec<String> {
590    let mut selected = declarations
591        .map(|declarations| {
592            declarations
593                .declared_indexes
594                .iter()
595                .filter(|index| index.collection.as_deref() == Some(collection_name))
596                .map(|index| index.name.clone())
597                .collect::<Vec<_>>()
598        })
599        .unwrap_or_default();
600    selected.sort();
601    selected.dedup();
602    selected
603}
604
605fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
606    let mut selected = index_catalog
607        .map(IndexCatalog::snapshot)
608        .unwrap_or_default()
609        .into_iter()
610        .filter(|metric| metric.enabled)
611        .filter_map(|metric| {
612            if metric.name.starts_with(collection_name)
613                || matches!(
614                    metric.kind,
615                    IndexKind::BTree
616                        | IndexKind::GraphAdjacency
617                        | IndexKind::VectorHnsw
618                        | IndexKind::VectorInverted
619                        | IndexKind::FullText
620                        | IndexKind::DocumentPathValue
621                        | IndexKind::HybridSearch
622                )
623            {
624                Some(metric.name)
625            } else {
626                None
627            }
628        })
629        .collect::<Vec<_>>();
630    selected.sort();
631    selected.dedup();
632    selected
633}
634
/// Compares the declared and operational index names of one collection.
///
/// Returns `(in_sync, missing_operational, undeclared_operational)` where
/// * `missing_operational` — names declared but not operational,
/// * `undeclared_operational` — names operational but not declared,
/// * `in_sync` — true when both lists are empty.
///
/// Both lists are sorted ascending and free of duplicates, regardless of the
/// order or duplication of the inputs.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    // The sets borrow the names; only names that end up in a result are cloned.
    let declared: BTreeSet<&String> = declared_indices.iter().collect();
    let operational: BTreeSet<&String> = operational_indices.iter().collect();

    let missing_operational_indices: Vec<String> = declared
        .difference(&operational)
        .map(|name| (*name).clone())
        .collect();
    let undeclared_operational_indices: Vec<String> = operational
        .difference(&declared)
        .map(|name| (*name).clone())
        .collect();

    (
        missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty(),
        missing_operational_indices,
        undeclared_operational_indices,
    )
}
655
656fn collection_index_readiness(
657    collection_name: &str,
658    statuses: &[CatalogIndexStatus],
659) -> (usize, usize, bool) {
660    let local = statuses
661        .iter()
662        .filter(|status| status.collection.as_deref() == Some(collection_name))
663        .collect::<Vec<_>>();
664    let queryable_count = local.iter().filter(|status| status.queryable).count();
665    let requires_rebuild_count = local
666        .iter()
667        .filter(|status| status.requires_rebuild)
668        .count();
669    let locally_in_sync = local.iter().all(|status| status.in_sync);
670    (queryable_count, requires_rebuild_count, locally_in_sync)
671}
672
673fn collection_graph_projection_readiness(
674    collection_name: &str,
675    statuses: &[CatalogGraphProjectionStatus],
676) -> (usize, usize, bool) {
677    let local = statuses
678        .iter()
679        .filter(|status| status.source.as_deref() == Some(collection_name))
680        .collect::<Vec<_>>();
681    let queryable_count = local.iter().filter(|status| status.queryable).count();
682    let requires_rematerialization_count = local
683        .iter()
684        .filter(|status| status.requires_rematerialization)
685        .count();
686    let locally_in_sync = local
687        .iter()
688        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
689    (
690        queryable_count,
691        requires_rematerialization_count,
692        locally_in_sync,
693    )
694}
695
696fn collection_analytics_job_readiness(
697    collection_name: &str,
698    projection_statuses: &[CatalogGraphProjectionStatus],
699    job_statuses: &[CatalogAnalyticsJobStatus],
700) -> (usize, usize, bool) {
701    let local = job_statuses
702        .iter()
703        .filter(|status| {
704            let Some(projection_name) = status.projection.as_deref() else {
705                return false;
706            };
707            projection_statuses
708                .iter()
709                .find(|projection| projection.name == projection_name)
710                .and_then(|projection| projection.source.as_deref())
711                == Some(collection_name)
712        })
713        .collect::<Vec<_>>();
714    let executable_count = local.iter().filter(|status| status.executable).count();
715    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
716    let locally_in_sync = local
717        .iter()
718        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
719    (executable_count, requires_rerun_count, locally_in_sync)
720}
721
722fn graph_projection_statuses(
723    declarations: Option<&CatalogDeclarations>,
724) -> Vec<CatalogGraphProjectionStatus> {
725    let declared = declarations
726        .map(|declarations| declarations.declared_graph_projections.as_slice())
727        .unwrap_or(&[]);
728    let operational = declarations
729        .map(|declarations| declarations.operational_graph_projections.as_slice())
730        .unwrap_or(&[]);
731    let declared_jobs = declarations
732        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
733        .unwrap_or(&[]);
734    let operational_jobs = declarations
735        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
736        .unwrap_or(&[]);
737
738    let mut names = declared
739        .iter()
740        .map(|projection| projection.name.clone())
741        .chain(operational.iter().map(|projection| projection.name.clone()))
742        .collect::<Vec<_>>();
743    names.sort();
744    names.dedup();
745
746    names
747        .into_iter()
748        .map(|name| {
749            let declared_projection = declared.iter().find(|projection| projection.name == name);
750            let operational_projection = operational
751                .iter()
752                .find(|projection| projection.name == name);
753            let declared_present = declared_projection.is_some();
754            let operational_present = operational_projection.is_some();
755            let mut dependent_job_ids = BTreeSet::new();
756            let mut dependent_job_count = 0usize;
757            let mut active_dependent_job_count = 0usize;
758            let mut stale_dependent_job_count = 0usize;
759            for job in declared_jobs
760                .iter()
761                .chain(operational_jobs.iter())
762                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
763            {
764                if !dependent_job_ids.insert(job.id.clone()) {
765                    continue;
766                }
767                dependent_job_count += 1;
768                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
769                    active_dependent_job_count += 1;
770                }
771                if job.state == "stale" {
772                    stale_dependent_job_count += 1;
773                }
774            }
775            let lifecycle_state = match (
776                declared_present,
777                operational_present,
778                declared_projection
779                    .map(|projection| projection.state.as_str())
780                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
781                    .unwrap_or_default(),
782            ) {
783                (true, _, "failed") => "failed",
784                (true, true, "stale") => "stale",
785                (true, _, "materializing") => "materializing",
786                (true, true, "materialized") => "materialized",
787                (true, true, _) => "materialized",
788                (false, true, _) => "orphaned-operational",
789                (true, false, _) => "declared",
790                (false, false, _) => "unknown",
791            }
792            .to_string();
793            let queryable = declared_present
794                && operational_present
795                && matches!(
796                    declared_projection
797                        .map(|projection| projection.state.as_str())
798                        .or_else(
799                            || operational_projection.map(|projection| projection.state.as_str())
800                        )
801                        .unwrap_or_default(),
802                    "materialized"
803                );
804            let requires_rematerialization = matches!(
805                declared_projection
806                    .map(|projection| projection.state.as_str())
807                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
808                    .unwrap_or_default(),
809                "declared" | "materializing" | "failed" | "stale"
810            );
811            let mut attention_reasons = Vec::new();
812            if !declared_present || !operational_present {
813                attention_reasons.push("declaration_drift".to_string());
814            }
815            if requires_rematerialization {
816                attention_reasons.push("requires_rematerialization".to_string());
817            }
818            if stale_dependent_job_count > 0 {
819                attention_reasons.push("dependent_jobs_stale".to_string());
820            }
821            let attention_score = stale_dependent_job_count.saturating_mul(2)
822                + usize::from(requires_rematerialization).saturating_mul(4)
823                + usize::from(!declared_present || !operational_present)
824                + usize::from(!queryable);
825            CatalogGraphProjectionStatus {
826                name,
827                source: declared_projection
828                    .map(|projection| projection.source.clone())
829                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
830                lifecycle_state,
831                declared: declared_present,
832                operational: operational_present,
833                in_sync: declared_present == operational_present,
834                last_materialized_sequence: declared_projection
835                    .and_then(|projection| projection.last_materialized_sequence)
836                    .or_else(|| {
837                        operational_projection
838                            .and_then(|projection| projection.last_materialized_sequence)
839                    }),
840                queryable,
841                requires_rematerialization,
842                dependent_job_count,
843                active_dependent_job_count,
844                stale_dependent_job_count,
845                dependent_jobs_in_sync: stale_dependent_job_count == 0,
846                rerun_required: stale_dependent_job_count > 0,
847                attention_score,
848                attention_reasons,
849            }
850        })
851        .collect()
852}
853
854fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
855    let declared = declarations
856        .map(|declarations| declarations.declared_indexes.as_slice())
857        .unwrap_or(&[]);
858    let operational = declarations
859        .map(|declarations| declarations.operational_indexes.as_slice())
860        .unwrap_or(&[]);
861
862    let mut names = declared
863        .iter()
864        .map(|index| index.name.clone())
865        .chain(operational.iter().map(|index| index.name.clone()))
866        .collect::<Vec<_>>();
867    names.sort();
868    names.dedup();
869
870    names
871        .into_iter()
872        .map(|name| {
873            let declared_index = declared.iter().find(|index| index.name == name);
874            let operational_index = operational.iter().find(|index| index.name == name);
875            let collection = declared_index
876                .and_then(|index| index.collection.clone())
877                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
878            let kind = declared_index
879                .map(|index| index_kind_string(index.kind))
880                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
881                .unwrap_or_default();
882            let enabled = declared_index
883                .map(|index| index.enabled)
884                .or_else(|| operational_index.map(|index| index.enabled))
885                .unwrap_or(false);
886            let build_state = operational_index
887                .map(|index| index.build_state.clone())
888                .or_else(|| declared_index.map(|index| index.build_state.clone()));
889            let declared_present = declared_index.is_some();
890            let operational_present = operational_index.is_some();
891            let lifecycle_state = index_lifecycle_state(
892                declared_present,
893                operational_present,
894                enabled,
895                build_state.as_deref(),
896            );
897
898            let mut attention_reasons = Vec::new();
899            if declared_present != operational_present {
900                attention_reasons.push("declaration_drift".to_string());
901            }
902            if !enabled && declared_present {
903                attention_reasons.push("disabled".to_string());
904            }
905            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
906                attention_reasons.push("failed".to_string());
907            }
908            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
909                attention_reasons.push("stale".to_string());
910            }
911            if matches!(
912                build_state.as_deref().unwrap_or_default(),
913                "building" | "declared-unbuilt"
914            ) {
915                attention_reasons.push("requires_rebuild".to_string());
916            }
917            let queryable = declared_present
918                && operational_present
919                && enabled
920                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
921            let requires_rebuild = matches!(
922                build_state.as_deref().unwrap_or_default(),
923                "declared-unbuilt" | "building" | "stale" | "failed"
924            );
925            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
926                + usize::from(!enabled && declared_present)
927                + usize::from(declared_present != operational_present)
928                + usize::from(!queryable);
929
930            let artifact_state = crate::physical::ArtifactState::from_build_state(
931                build_state.as_deref().unwrap_or("declared-unbuilt"),
932                enabled,
933            );
934            CatalogIndexStatus {
935                name,
936                collection,
937                kind,
938                declared: declared_present,
939                operational: operational_present,
940                enabled,
941                build_state,
942                artifact_state,
943                lifecycle_state,
944                in_sync: declared_present == operational_present,
945                queryable,
946                requires_rebuild,
947                attention_score,
948                attention_reasons,
949            }
950        })
951        .collect()
952}
953
/// Maps an index's presence flags, enabled flag and build state onto a
/// lifecycle label.
///
/// Resolution order:
/// * operational but not declared -> `"orphaned-operational"`
/// * declared but not enabled -> `"disabled"`
/// * neither declared nor operational -> `"unknown"`
/// * declared, enabled, not operational -> `"declared"`
/// * otherwise the label follows `build_state`: `"ready"`, `"failed"` and
///   `"stale"` pass through, `"declared-unbuilt"` becomes `"declared"`, and
///   anything else (including a missing build state) becomes `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (false, false) => "unknown",
        // A declared-but-disabled index is reported as disabled regardless of
        // whether an operational counterpart exists.
        (true, _) if !enabled => "disabled",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Covers "catalog-derived", "metadata-only", "artifact-published",
            // "registry-loaded" and every other unlisted or absent state.
            _ => "building",
        },
    };
    label.to_string()
}
984
985fn index_kind_string(kind: IndexKind) -> String {
986    match kind {
987        IndexKind::BTree => "btree",
988        IndexKind::Hash => "hash",
989        IndexKind::Bitmap => "bitmap",
990        IndexKind::Spatial => "spatial.rtree",
991        IndexKind::VectorHnsw => "vector.hnsw",
992        IndexKind::VectorInverted => "vector.inverted",
993        IndexKind::GraphAdjacency => "graph.adjacency",
994        IndexKind::FullText => "text.fulltext",
995        IndexKind::DocumentPathValue => "document.pathvalue",
996        IndexKind::HybridSearch => "search.hybrid",
997    }
998    .to_string()
999}
1000
1001fn analytics_job_statuses(
1002    declarations: Option<&CatalogDeclarations>,
1003) -> Vec<CatalogAnalyticsJobStatus> {
1004    let declared = declarations
1005        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1006        .unwrap_or(&[]);
1007    let operational = declarations
1008        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1009        .unwrap_or(&[]);
1010    let declared_projections = declarations
1011        .map(|declarations| declarations.declared_graph_projections.as_slice())
1012        .unwrap_or(&[]);
1013    let operational_projections = declarations
1014        .map(|declarations| declarations.operational_graph_projections.as_slice())
1015        .unwrap_or(&[]);
1016
1017    let mut ids = declared
1018        .iter()
1019        .map(|job| job.id.clone())
1020        .chain(operational.iter().map(|job| job.id.clone()))
1021        .collect::<Vec<_>>();
1022    ids.sort();
1023    ids.dedup();
1024
1025    ids.into_iter()
1026        .map(|id| {
1027            let declared_job = declared.iter().find(|job| job.id == id);
1028            let operational_job = operational.iter().find(|job| job.id == id);
1029            let kind = declared_job
1030                .map(|job| job.kind.clone())
1031                .or_else(|| operational_job.map(|job| job.kind.clone()))
1032                .unwrap_or_default();
1033            let projection = declared_job
1034                .and_then(|job| job.projection.clone())
1035                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1036            let state = declared_job
1037                .map(|job| job.state.clone())
1038                .or_else(|| operational_job.map(|job| job.state.clone()))
1039                .unwrap_or_default();
1040            let declared_present = declared_job.is_some();
1041            let operational_present = operational_job.is_some();
1042            let last_run_sequence = declared_job
1043                .and_then(|job| job.last_run_sequence)
1044                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1045            let projection_declared = projection.as_ref().map(|projection_name| {
1046                declared_projections
1047                    .iter()
1048                    .any(|projection| projection.name == *projection_name)
1049            });
1050            let projection_operational = projection.as_ref().map(|projection_name| {
1051                operational_projections
1052                    .iter()
1053                    .any(|projection| projection.name == *projection_name)
1054            });
1055            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1056                declared_projections
1057                    .iter()
1058                    .find(|projection| projection.name == *projection_name)
1059                    .map(|projection| projection.state.as_str())
1060                    .or_else(|| {
1061                        operational_projections
1062                            .iter()
1063                            .find(|projection| projection.name == *projection_name)
1064                            .map(|projection| projection.state.as_str())
1065                    })
1066            });
1067            let dependency_in_sync = projection.as_ref().map(|_| {
1068                matches!(projection_lifecycle, Some("materialized"))
1069                    && projection_operational == Some(true)
1070            });
1071            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1072                (true, false, _) => "declared",
1073                (true, true, _)
1074                    if matches!(
1075                        projection_lifecycle,
1076                        Some("stale" | "failed" | "materializing" | "declared")
1077                    ) =>
1078                {
1079                    "stale"
1080                }
1081                (true, true, "completed") => "completed",
1082                (true, true, "running") => "running",
1083                (true, true, "failed") => "failed",
1084                (true, true, "queued") => "queued",
1085                (true, true, "stale") => "stale",
1086                (true, true, _) => "materialized",
1087                (false, true, _) => "orphaned-operational",
1088                (false, false, _) => "unknown",
1089            }
1090            .to_string();
1091            let executable = declared_present
1092                && operational_present
1093                && !matches!(state.as_str(), "failed" | "stale")
1094                && dependency_in_sync.unwrap_or(true);
1095            let requires_rerun =
1096                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1097            let mut attention_reasons = Vec::new();
1098            if declared_present != operational_present {
1099                attention_reasons.push("declaration_drift".to_string());
1100            }
1101            if matches!(state.as_str(), "failed") {
1102                attention_reasons.push("failed".to_string());
1103            }
1104            if matches!(state.as_str(), "stale") {
1105                attention_reasons.push("stale".to_string());
1106            }
1107            if dependency_in_sync == Some(false) {
1108                attention_reasons.push("dependency_drift".to_string());
1109            }
1110            if requires_rerun {
1111                attention_reasons.push("requires_rerun".to_string());
1112            }
1113            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1114                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1115                + usize::from(declared_present != operational_present)
1116                + usize::from(!executable);
1117            CatalogAnalyticsJobStatus {
1118                id,
1119                kind,
1120                projection,
1121                state: state.clone(),
1122                lifecycle_state,
1123                declared: declared_present,
1124                operational: operational_present,
1125                in_sync: declared_present == operational_present,
1126                last_run_sequence,
1127                projection_declared,
1128                projection_operational,
1129                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1130                dependency_in_sync,
1131                executable,
1132                requires_rerun,
1133                attention_score,
1134                attention_reasons,
1135            }
1136        })
1137        .collect()
1138}
1139
1140pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1141    let declared_indexes = snapshot
1142        .declared_indexes
1143        .iter()
1144        .map(|index| index.name.clone())
1145        .collect::<BTreeSet<_>>();
1146    let operational_indexes = snapshot
1147        .operational_indexes
1148        .iter()
1149        .map(|index| index.name.clone())
1150        .collect::<BTreeSet<_>>();
1151    let declared_graph_projections = snapshot
1152        .declared_graph_projections
1153        .iter()
1154        .map(|projection| projection.name.clone())
1155        .collect::<BTreeSet<_>>();
1156    let operational_graph_projections = snapshot
1157        .operational_graph_projections
1158        .iter()
1159        .map(|projection| projection.name.clone())
1160        .collect::<BTreeSet<_>>();
1161    let declared_analytics_jobs = snapshot
1162        .declared_analytics_jobs
1163        .iter()
1164        .map(|job| job.id.clone())
1165        .collect::<BTreeSet<_>>();
1166    let operational_analytics_jobs = snapshot
1167        .operational_analytics_jobs
1168        .iter()
1169        .map(|job| job.id.clone())
1170        .collect::<BTreeSet<_>>();
1171
1172    CatalogConsistencyReport {
1173        declared_index_count: declared_indexes.len(),
1174        operational_index_count: operational_indexes.len(),
1175        declared_graph_projection_count: declared_graph_projections.len(),
1176        operational_graph_projection_count: operational_graph_projections.len(),
1177        declared_analytics_job_count: declared_analytics_jobs.len(),
1178        operational_analytics_job_count: operational_analytics_jobs.len(),
1179        missing_operational_indexes: declared_indexes
1180            .difference(&operational_indexes)
1181            .cloned()
1182            .collect(),
1183        undeclared_operational_indexes: operational_indexes
1184            .difference(&declared_indexes)
1185            .cloned()
1186            .collect(),
1187        missing_operational_graph_projections: declared_graph_projections
1188            .difference(&operational_graph_projections)
1189            .cloned()
1190            .collect(),
1191        undeclared_operational_graph_projections: operational_graph_projections
1192            .difference(&declared_graph_projections)
1193            .cloned()
1194            .collect(),
1195        missing_operational_analytics_jobs: declared_analytics_jobs
1196            .difference(&operational_analytics_jobs)
1197            .cloned()
1198            .collect(),
1199        undeclared_operational_analytics_jobs: operational_analytics_jobs
1200            .difference(&declared_analytics_jobs)
1201            .cloned()
1202            .collect(),
1203    }
1204}
1205
1206pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1207    CatalogAttentionSummary {
1208        collections_requiring_attention: snapshot
1209            .collections
1210            .iter()
1211            .filter(|collection| collection.attention_required)
1212            .count(),
1213        indexes_requiring_attention: snapshot
1214            .index_statuses
1215            .iter()
1216            .filter(|status| status.attention_score > 0)
1217            .count(),
1218        graph_projections_requiring_attention: snapshot
1219            .graph_projection_statuses
1220            .iter()
1221            .filter(|status| status.attention_score > 0)
1222            .count(),
1223        analytics_jobs_requiring_attention: snapshot
1224            .analytics_job_statuses
1225            .iter()
1226            .filter(|status| status.attention_score > 0)
1227            .count(),
1228        top_collection: snapshot
1229            .collections
1230            .iter()
1231            .filter(|collection| collection.attention_score > 0)
1232            .max_by_key(|collection| collection.attention_score)
1233            .cloned(),
1234        top_index: snapshot
1235            .index_statuses
1236            .iter()
1237            .filter(|status| status.attention_score > 0)
1238            .max_by_key(|status| status.attention_score)
1239            .cloned(),
1240        top_graph_projection: snapshot
1241            .graph_projection_statuses
1242            .iter()
1243            .filter(|status| status.attention_score > 0)
1244            .max_by_key(|status| status.attention_score)
1245            .cloned(),
1246        top_analytics_job: snapshot
1247            .analytics_job_statuses
1248            .iter()
1249            .filter(|status| status.attention_score > 0)
1250            .max_by_key(|status| status.attention_score)
1251            .cloned(),
1252    }
1253}
1254
1255pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1256    let mut collections = snapshot
1257        .collections
1258        .iter()
1259        .filter(|collection| collection.attention_required)
1260        .cloned()
1261        .collect::<Vec<_>>();
1262    collections.sort_by(|left, right| {
1263        right
1264            .attention_score
1265            .cmp(&left.attention_score)
1266            .then_with(|| left.name.cmp(&right.name))
1267    });
1268    collections
1269}
1270
1271pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1272    let mut statuses = snapshot
1273        .index_statuses
1274        .iter()
1275        .filter(|status| status.attention_score > 0)
1276        .cloned()
1277        .collect::<Vec<_>>();
1278    statuses.sort_by(|left, right| {
1279        right
1280            .attention_score
1281            .cmp(&left.attention_score)
1282            .then_with(|| left.name.cmp(&right.name))
1283    });
1284    statuses
1285}
1286
1287pub fn graph_projection_attention(
1288    snapshot: &CatalogModelSnapshot,
1289) -> Vec<CatalogGraphProjectionStatus> {
1290    let mut statuses = snapshot
1291        .graph_projection_statuses
1292        .iter()
1293        .filter(|status| status.attention_score > 0)
1294        .cloned()
1295        .collect::<Vec<_>>();
1296    statuses.sort_by(|left, right| {
1297        right
1298            .attention_score
1299            .cmp(&left.attention_score)
1300            .then_with(|| left.name.cmp(&right.name))
1301    });
1302    statuses
1303}
1304
1305pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1306    let mut statuses = snapshot
1307        .analytics_job_statuses
1308        .iter()
1309        .filter(|status| status.attention_score > 0)
1310        .cloned()
1311        .collect::<Vec<_>>();
1312    statuses.sort_by(|left, right| {
1313        right
1314            .attention_score
1315            .cmp(&left.attention_score)
1316            .then_with(|| left.id.cmp(&right.id))
1317    });
1318    statuses
1319}