Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Logical data model attributed to a collection in the catalog.
///
/// The same type carries both the model declared by a collection contract and
/// the model observed from stored entities. Within this file, `infer_model`
/// only ever produces `Table`, `Graph`, `Vector` or `Mixed`; the remaining
/// variants reach a descriptor through a contract's declared model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Kv,
    Config,
    Vault,
    /// Also the result of inference when the stored entities do not point at
    /// exactly one of table, graph or vector.
    Mixed,
    TimeSeries,
    Queue,
}
29
/// Schema strictness attributed to a collection.
///
/// Taken from a collection contract when one exists; otherwise derived from
/// the collection model by `infer_schema_mode` (tables are `Strict`, graph,
/// vector and time-series collections are `SemiStructured`, everything else is
/// `Dynamic`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
36
/// Kind of change a subscription can filter on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Returns the canonical upper-case keyword for this operation
    /// (`"INSERT"`, `"UPDATE"` or `"DELETE"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// Returns `None` when `value` is not one of the keywords produced by
    /// [`SubscriptionOperation::as_str`]. Surrounding whitespace is not
    /// trimmed, so `" insert"` does not parse.
    pub fn from_str(value: &str) -> Option<Self> {
        const ALL: [SubscriptionOperation; 3] = [
            SubscriptionOperation::Insert,
            SubscriptionOperation::Update,
            SubscriptionOperation::Delete,
        ];
        // Compare against the canonical keywords in place: no upper-cased copy
        // of `value` is allocated, and the keyword table lives only in `as_str`.
        ALL.iter()
            .copied()
            .find(|operation| value.eq_ignore_ascii_case(operation.as_str()))
    }
}
62
/// Catalog description of a subscription attached to a collection.
///
/// Within this file the descriptors are cloned from a collection contract's
/// `subscriptions` into the matching `CollectionDescriptor`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    /// Logical name for this subscription. Empty string for legacy unnamed subscriptions.
    pub name: String,
    // NOTE(review): presumably the collection whose changes are observed —
    // not established by this file; confirm against the subscription writer.
    pub source: String,
    /// Queue that receives the events; see `all_tenants` for how the name is
    /// namespaced.
    pub target_queue: String,
    /// Operations this subscription filters on.
    // NOTE(review): the meaning of an empty list is not visible here — confirm.
    pub ops_filter: Vec<SubscriptionOperation>,
    // NOTE(review): stored as an opaque string; syntax and evaluation live
    // outside this file.
    pub where_filter: Option<String>,
    // NOTE(review): presumably field names stripped from emitted events — confirm.
    pub redact_fields: Vec<String>,
    pub enabled: bool,
    /// When true, events are routed to the bare `target_queue` regardless of
    /// the current tenant — a cluster-wide subscription. When false (default),
    /// events are namespaced as `{tenant}/{target_queue}` whenever a tenant
    /// context is active, enforcing per-tenant isolation.
    pub all_tenants: bool,
}
79
/// Per-collection view of the catalog.
///
/// The field notes below describe how `snapshot_store_with_declarations`
/// populates each value.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as listed by the store.
    pub name: String,
    /// Effective model: the contract's declared model when a contract exists,
    /// otherwise the model inferred from stored entities.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's when present, otherwise inferred
    /// from the observed model.
    pub schema_mode: SchemaMode,
    /// Whether a collection contract with this name was supplied.
    pub contract_present: bool,
    /// Origin recorded on the contract, if any.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, if any.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the entities currently stored.
    pub observed_model: CollectionModel,
    /// Set only when `model` is `Queue`; read from the `queue_config` rows of
    /// the `red_queue_meta` collection, falling back to the default mode.
    pub queue_mode: Option<QueueMode>,
    /// Schema mode declared by the contract, if any.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode inferred from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities found in the collection.
    pub entities: usize,
    /// Sum of the cross-reference counts of all entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment count reported by the collection
    /// manager; 0 when the store returns no manager.
    pub segments: usize,
    /// Declared index names when any exist, otherwise enabled catalog indexes
    /// whose kind is relevant to `model`.
    pub indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, de-duplicated names of enabled indexes selected from the
    /// operational index catalog.
    pub operational_indices: Vec<String>,
    /// True when `declared_indices` and `operational_indices` name the same set.
    pub indexes_in_sync: bool,
    /// Declared index names with no operational counterpart.
    pub missing_operational_indices: Vec<String>,
    /// Operational index names that were never declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Index statuses of this collection flagged `queryable`.
    pub queryable_index_count: usize,
    /// Index statuses of this collection flagged `requires_rebuild`.
    pub indexes_requiring_rebuild_count: usize,
    /// Graph projections sourced from this collection flagged `queryable`.
    pub queryable_graph_projection_count: usize,
    /// Graph projections sourced from this collection flagged
    /// `requires_rematerialization`.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Analytics jobs (via their projection's source) flagged `executable`.
    pub executable_analytics_job_count: usize,
    /// Analytics jobs (via their projection's source) flagged `requires_rerun`.
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions cloned from the contract; empty without a contract.
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// True when index names agree and every local index, graph projection and
    /// analytics job status reports itself in sync.
    pub resources_in_sync: bool,
    /// True when `resources_in_sync` is false or any rebuild,
    /// rematerialization or rerun is pending.
    pub attention_required: bool,
    /// 3 per index needing rebuild + 4 per projection needing
    /// rematerialization + 2 per job needing rerun + 1 when resources are out
    /// of sync.
    pub attention_score: usize,
    /// Machine-readable reason tags such as `index_drift` or
    /// `analytics_jobs_require_rerun`.
    pub attention_reasons: Vec<String>,
}
113
/// Full catalog snapshot returned by `snapshot_store` and
/// `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-wide totals and per-collection statistics, stamped with the time
    /// the snapshot was built.
    pub summary: CatalogSnapshot,
    /// One descriptor per collection listed by the store, sorted by name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the operational index catalog; default when no catalog was
    /// supplied.
    pub indices: IndexCatalogSnapshot,
    // The six lists below are cloned verbatim from the supplied
    // `CatalogDeclarations` (empty when none was supplied).
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Reconciled status per index.
    pub index_statuses: Vec<CatalogIndexStatus>,
    /// Reconciled status per graph projection.
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    /// Reconciled status per analytics job.
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    // Store-wide counts of the corresponding flag across the status lists above.
    pub queryable_index_count: usize,
    pub indexes_requiring_rebuild_count: usize,
    pub queryable_graph_projection_count: usize,
    pub graph_projections_requiring_rematerialization_count: usize,
    pub executable_analytics_job_count: usize,
    pub analytics_jobs_requiring_rerun_count: usize,
}
135
/// Roll-up of catalog resources that need attention.
// NOTE(review): the code that builds this summary is not in the visible part
// of the file; the field notes are inferred from names and types — confirm
// against the constructor.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    // Presumably the highest-`attention_score` entry of each kind, `None` when
    // nothing needs attention — TODO confirm.
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
147
/// Declared and operational resource lists handed to the snapshot builder.
///
/// `snapshot_store_with_declarations` reconciles the `declared_*` lists
/// against their `operational_*` counterparts to derive the status entries,
/// and copies all six lists into the resulting `CatalogModelSnapshot`.
/// `Default` yields six empty lists.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
157
/// Reconciled status of one graph projection, as produced by
/// `graph_projection_statuses`.
///
/// "State" below means the `state` of the declared projection when one
/// exists, otherwise that of the operational projection.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name (unique across the returned list).
    pub name: String,
    /// Source of the declared projection, falling back to the operational one.
    pub source: Option<String>,
    /// One of `failed`, `stale`, `materializing`, `materialized`,
    /// `orphaned-operational`, `declared` or `unknown`.
    pub lifecycle_state: String,
    /// A declared projection with this name exists.
    pub declared: bool,
    /// An operational projection with this name exists.
    pub operational: bool,
    /// `declared == operational`.
    pub in_sync: bool,
    /// The declared projection's value when set, otherwise the operational one's.
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational, and in state `materialized`.
    pub queryable: bool,
    /// State is `declared`, `materializing`, `failed` or `stale`.
    pub requires_rematerialization: bool,
    /// Distinct job ids (declared or operational) that reference this projection.
    pub dependent_job_count: usize,
    /// Dependent jobs whose state is `queued`, `running` or `completed`.
    pub active_dependent_job_count: usize,
    /// Dependent jobs whose state is `stale`.
    pub stale_dependent_job_count: usize,
    /// No dependent job is stale.
    pub dependent_jobs_in_sync: bool,
    /// At least one dependent job is stale.
    pub rerun_required: bool,
    /// 2 per stale dependent job + 4 when rematerialization is required + 1
    /// when declared/operational presence differs + 1 when not queryable.
    pub attention_score: usize,
    /// Subset of `declaration_drift`, `requires_rematerialization`,
    /// `dependent_jobs_stale`, in that order.
    pub attention_reasons: Vec<String>,
}
177
/// Reconciled status of one index.
// NOTE(review): instances come from `index_statuses`, whose body is not in the
// visible part of the file. Only the uses noted below are established here;
// the remaining fields are undocumented on purpose — confirm their semantics
// against that function.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    pub name: String,
    /// Owning collection; per-collection readiness counts only consider
    /// statuses whose collection matches.
    pub collection: Option<String>,
    pub kind: String,
    pub declared: bool,
    pub operational: bool,
    pub enabled: bool,
    pub build_state: Option<String>,
    pub artifact_state: crate::physical::ArtifactState,
    pub lifecycle_state: String,
    /// Feeds the per-collection "locally in sync" flag.
    pub in_sync: bool,
    /// Counted into `queryable_index_count`.
    pub queryable: bool,
    /// Counted into `indexes_requiring_rebuild_count`.
    pub requires_rebuild: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
195
/// Reconciled status of one analytics job.
// NOTE(review): instances come from `analytics_job_statuses`, which is not in
// the visible part of the file. Only the uses noted below are established
// here — confirm the remaining fields against that function.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    pub id: String,
    pub kind: String,
    /// Name of the graph projection the job depends on; jobs without one are
    /// never attributed to a collection.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    /// Feeds the per-collection "locally in sync" flag.
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// Treated as `true` when absent by the per-collection readiness check.
    pub dependency_in_sync: Option<bool>,
    /// Counted into `executable_analytics_job_count`.
    pub executable: bool,
    /// Counted into `analytics_jobs_requiring_rerun_count`.
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
216
/// Declared-versus-operational consistency report for indexes, graph
/// projections and analytics jobs. `Default` yields zero counts and empty lists.
// NOTE(review): the code that fills this report is not in the visible part of
// the file. By analogy with `collection_index_consistency`, the `missing_*`
// lists are presumably declared-but-not-operational names and the
// `undeclared_*` lists the reverse — TODO confirm.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
232
/// Builds a catalog snapshot of `store` using observed state only.
///
/// Equivalent to [`snapshot_store_with_declarations`] with no declarations and
/// no collection contracts: every model and schema mode is inferred from the
/// stored entities.
pub fn snapshot_store(
    name: &str,
    store: &UnifiedStore,
    index_catalog: Option<&IndexCatalog>,
) -> CatalogModelSnapshot {
    snapshot_store_with_declarations(name, store, index_catalog, None, None)
}
240
/// Builds a catalog snapshot of `store`, reconciling what is stored with the
/// optional declarations and collection contracts.
///
/// * `name` — label copied into the snapshot summary.
/// * `store` — source of collections and entities; every entity is read.
/// * `index_catalog` — operational index catalog, if available.
/// * `declarations` — declared/operational resource lists used to derive the
///   index, graph-projection and analytics-job statuses.
/// * `contracts` — collection contracts, matched to collections by name; a
///   contract's declared model and schema mode override the inferred ones.
///
/// Returns one `CollectionDescriptor` per collection listed by the store
/// (sorted by name) plus store-wide status lists and counts. The summary's
/// `updated_at` is the wall-clock time at which the snapshot was built.
pub fn snapshot_store_with_declarations(
    name: &str,
    store: &UnifiedStore,
    index_catalog: Option<&IndexCatalog>,
    declarations: Option<&CatalogDeclarations>,
    contracts: Option<&[CollectionContract]>,
) -> CatalogModelSnapshot {
    // From here on these locals shadow the helper functions of the same name.
    let index_statuses = index_statuses(declarations);
    let graph_projection_statuses = graph_projection_statuses(declarations);
    let analytics_job_statuses = analytics_job_statuses(declarations);

    // Bucket every entity in the store by its collection name.
    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
    for (collection, entity) in store.query_all(|_| true) {
        grouped.entry(collection).or_default().push(entity);
    }
    // Must run before the loop below starts removing buckets from `grouped`.
    let queue_modes = queue_modes_from_grouped(&grouped);

    let mut stats_by_collection = BTreeMap::new();
    let mut collections = Vec::new();

    for collection_name in store.list_collections() {
        // A listed collection without entities gets an empty bucket.
        let entities = grouped.remove(&collection_name).unwrap_or_default();
        let inferred_model = infer_model(&entities);
        let inferred_schema_mode = infer_schema_mode(inferred_model);
        let contract = collection_contract(&collection_name, contracts);
        // A contract's declared model wins over the inferred one.
        let model = contract
            .map(|contract| contract.declared_model)
            .unwrap_or(inferred_model);
        let cross_refs = entities
            .iter()
            .map(|entity| entity.cross_refs().len())
            .sum();
        let entity_count = entities.len();
        let manager_stats = store
            .get_collection(&collection_name)
            .map(|manager| manager.stats());
        let segments = manager_stats
            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
            .unwrap_or(0);

        stats_by_collection.insert(
            collection_name.clone(),
            CollectionStats {
                entities: entity_count,
                cross_refs,
                segments,
            },
        );

        // Name-level comparison of declared versus operational indexes.
        let declared_indices = declared_indices(&collection_name, declarations);
        let operational_indices = operational_indices(&collection_name, index_catalog);
        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
            collection_index_consistency(&declared_indices, &operational_indices);
        // Status-level readiness of the resources attached to this collection.
        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
            collection_index_readiness(&collection_name, &index_statuses);
        let (
            queryable_graph_projection_count,
            graph_projections_requiring_rematerialization_count,
            graph_projections_locally_in_sync,
        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
        let (
            executable_analytics_job_count,
            analytics_jobs_requiring_rerun_count,
            analytics_jobs_locally_in_sync,
        ) = collection_analytics_job_readiness(
            &collection_name,
            &graph_projection_statuses,
            &analytics_job_statuses,
        );
        let resources_in_sync = indexes_in_sync
            && indexes_locally_in_sync
            && graph_projections_locally_in_sync
            && analytics_jobs_locally_in_sync;
        let attention_required = !resources_in_sync
            || indexes_requiring_rebuild_count > 0
            || graph_projections_requiring_rematerialization_count > 0
            || analytics_jobs_requiring_rerun_count > 0;
        // Reason tags are pushed in a fixed order: index, graph projection,
        // analytics job; drift before pending work within each kind.
        let mut attention_reasons = Vec::new();
        if !indexes_in_sync || !indexes_locally_in_sync {
            attention_reasons.push("index_drift".to_string());
        }
        if indexes_requiring_rebuild_count > 0 {
            attention_reasons.push("indexes_require_rebuild".to_string());
        }
        if !graph_projections_locally_in_sync {
            attention_reasons.push("graph_projection_drift".to_string());
        }
        if graph_projections_requiring_rematerialization_count > 0 {
            attention_reasons.push("graph_projections_require_rematerialization".to_string());
        }
        if !analytics_jobs_locally_in_sync {
            attention_reasons.push("analytics_job_drift".to_string());
        }
        if analytics_jobs_requiring_rerun_count > 0 {
            attention_reasons.push("analytics_jobs_require_rerun".to_string());
        }
        // Weighted score: rebuilds x3, rematerializations x4, reruns x2, plus
        // one point when anything is out of sync.
        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
            + usize::from(!resources_in_sync);

        collections.push(CollectionDescriptor {
            name: collection_name.clone(),
            model,
            schema_mode: contract
                .map(|contract| contract.schema_mode)
                .unwrap_or(inferred_schema_mode),
            contract_present: contract.is_some(),
            contract_origin: contract.map(|contract| contract.origin),
            declared_model: contract.map(|contract| contract.declared_model),
            observed_model: inferred_model,
            // Only queue collections carry a mode; a queue without a config
            // row gets the default mode.
            queue_mode: if model == CollectionModel::Queue {
                Some(
                    queue_modes
                        .get(&collection_name)
                        .copied()
                        .unwrap_or_default(),
                )
            } else {
                None
            },
            declared_schema_mode: contract.map(|contract| contract.schema_mode),
            observed_schema_mode: inferred_schema_mode,
            entities: entity_count,
            cross_refs,
            segments,
            indices: infer_indices(&collection_name, model, index_catalog, declarations),
            declared_indices,
            operational_indices,
            indexes_in_sync,
            missing_operational_indices,
            undeclared_operational_indices,
            queryable_index_count,
            indexes_requiring_rebuild_count,
            queryable_graph_projection_count,
            graph_projections_requiring_rematerialization_count,
            executable_analytics_job_count,
            analytics_jobs_requiring_rerun_count,
            subscriptions: contract
                .map(|contract| contract.subscriptions.clone())
                .unwrap_or_default(),
            resources_in_sync,
            attention_required,
            attention_score,
            attention_reasons,
        });
    }

    // Callers get a stable, name-ordered listing.
    collections.sort_by(|left, right| left.name.cmp(&right.name));

    let summary = CatalogSnapshot {
        name: name.to_string(),
        total_entities: stats_by_collection
            .values()
            .map(|stats| stats.entities)
            .sum(),
        total_collections: stats_by_collection.len(),
        stats_by_collection,
        updated_at: SystemTime::now(),
    };

    CatalogModelSnapshot {
        summary,
        collections,
        indices: index_catalog
            .map(IndexCatalog::snapshot)
            .unwrap_or_default(),
        declared_indexes: declarations
            .map(|declarations| declarations.declared_indexes.clone())
            .unwrap_or_default(),
        declared_graph_projections: declarations
            .map(|declarations| declarations.declared_graph_projections.clone())
            .unwrap_or_default(),
        declared_analytics_jobs: declarations
            .map(|declarations| declarations.declared_analytics_jobs.clone())
            .unwrap_or_default(),
        operational_indexes: declarations
            .map(|declarations| declarations.operational_indexes.clone())
            .unwrap_or_default(),
        operational_graph_projections: declarations
            .map(|declarations| declarations.operational_graph_projections.clone())
            .unwrap_or_default(),
        operational_analytics_jobs: declarations
            .map(|declarations| declarations.operational_analytics_jobs.clone())
            .unwrap_or_default(),
        // The counts are taken before the status vectors are moved into the
        // struct at the end of this literal.
        queryable_index_count: index_statuses
            .iter()
            .filter(|status| status.queryable)
            .count(),
        indexes_requiring_rebuild_count: index_statuses
            .iter()
            .filter(|status| status.requires_rebuild)
            .count(),
        queryable_graph_projection_count: graph_projection_statuses
            .iter()
            .filter(|status| status.queryable)
            .count(),
        graph_projections_requiring_rematerialization_count: graph_projection_statuses
            .iter()
            .filter(|status| status.requires_rematerialization)
            .count(),
        executable_analytics_job_count: analytics_job_statuses
            .iter()
            .filter(|status| status.executable)
            .count(),
        analytics_jobs_requiring_rerun_count: analytics_job_statuses
            .iter()
            .filter(|status| status.requires_rerun)
            .count(),
        index_statuses,
        graph_projection_statuses,
        analytics_job_statuses,
    }
}
455
456fn queue_modes_from_grouped(
457    grouped: &HashMap<String, Vec<UnifiedEntity>>,
458) -> HashMap<String, QueueMode> {
459    let Some(meta) = grouped.get("red_queue_meta") else {
460        return HashMap::new();
461    };
462
463    let mut modes = HashMap::new();
464    for entity in meta {
465        let Some(row) = entity.data.as_row() else {
466            continue;
467        };
468        if row_text(row, "kind").as_deref() != Some("queue_config") {
469            continue;
470        }
471        let Some(queue) = row_text(row, "queue") else {
472            continue;
473        };
474        let mode = row_text(row, "mode")
475            .as_deref()
476            .and_then(QueueMode::parse)
477            .unwrap_or_default();
478        modes.insert(queue, mode);
479    }
480    modes
481}
482
483fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
484    match row.get_field(key) {
485        Some(Value::Text(value)) => Some(value.to_string()),
486        _ => None,
487    }
488}
489
490fn collection_contract<'a>(
491    collection_name: &str,
492    contracts: Option<&'a [CollectionContract]>,
493) -> Option<&'a CollectionContract> {
494    contracts.and_then(|contracts| {
495        contracts
496            .iter()
497            .find(|contract| contract.name == collection_name)
498    })
499}
500
501fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
502    let mut has_table = false;
503    let mut has_graph = false;
504    let mut has_vector = false;
505
506    for entity in entities {
507        match &entity.kind {
508            EntityKind::TableRow { .. } => has_table = true,
509            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
510            EntityKind::Vector { .. } => has_vector = true,
511            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
512        }
513    }
514
515    match (has_table, has_graph, has_vector) {
516        (true, false, false) => CollectionModel::Table,
517        (false, true, false) => CollectionModel::Graph,
518        (false, false, true) => CollectionModel::Vector,
519        _ => CollectionModel::Mixed,
520    }
521}
522
523fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
524    match model {
525        CollectionModel::Table => SchemaMode::Strict,
526        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
527        CollectionModel::Document
528        | CollectionModel::Kv
529        | CollectionModel::Config
530        | CollectionModel::Vault
531        | CollectionModel::Mixed => SchemaMode::Dynamic,
532        CollectionModel::TimeSeries => SchemaMode::SemiStructured,
533        CollectionModel::Queue => SchemaMode::Dynamic,
534    }
535}
536
537fn infer_indices(
538    collection_name: &str,
539    model: CollectionModel,
540    index_catalog: Option<&IndexCatalog>,
541    declarations: Option<&CatalogDeclarations>,
542) -> Vec<String> {
543    let declared = declared_indices(collection_name, declarations);
544    if !declared.is_empty() {
545        return declared;
546    }
547
548    let available = index_catalog
549        .map(IndexCatalog::snapshot)
550        .unwrap_or_default();
551    let mut selected = Vec::new();
552
553    for metric in available {
554        let relevant = matches!(
555            (model, metric.kind),
556            (_, IndexKind::BTree)
557                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
558                | (CollectionModel::Vector, IndexKind::VectorHnsw)
559                | (CollectionModel::Vector, IndexKind::VectorInverted)
560                | (CollectionModel::Document, IndexKind::FullText)
561                | (CollectionModel::Document, IndexKind::DocumentPathValue)
562                | (CollectionModel::Kv, IndexKind::Hash)
563                | (CollectionModel::Config, IndexKind::Hash)
564                | (CollectionModel::Vault, IndexKind::Hash)
565                | (CollectionModel::Mixed, _)
566        );
567
568        if relevant && metric.enabled {
569            selected.push(metric.name);
570        }
571    }
572
573    selected
574}
575
576fn declared_indices(
577    collection_name: &str,
578    declarations: Option<&CatalogDeclarations>,
579) -> Vec<String> {
580    let mut selected = declarations
581        .map(|declarations| {
582            declarations
583                .declared_indexes
584                .iter()
585                .filter(|index| index.collection.as_deref() == Some(collection_name))
586                .map(|index| index.name.clone())
587                .collect::<Vec<_>>()
588        })
589        .unwrap_or_default();
590    selected.sort();
591    selected.dedup();
592    selected
593}
594
595fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
596    let mut selected = index_catalog
597        .map(IndexCatalog::snapshot)
598        .unwrap_or_default()
599        .into_iter()
600        .filter(|metric| metric.enabled)
601        .filter_map(|metric| {
602            if metric.name.starts_with(collection_name)
603                || matches!(
604                    metric.kind,
605                    IndexKind::BTree
606                        | IndexKind::GraphAdjacency
607                        | IndexKind::VectorHnsw
608                        | IndexKind::VectorInverted
609                        | IndexKind::FullText
610                        | IndexKind::DocumentPathValue
611                        | IndexKind::HybridSearch
612                )
613            {
614                Some(metric.name)
615            } else {
616                None
617            }
618        })
619        .collect::<Vec<_>>();
620    selected.sort();
621    selected.dedup();
622    selected
623}
624
/// Compares declared index names against operational ones.
///
/// Returns `(in_sync, missing, undeclared)` where `missing` holds names that
/// are declared but not operational, `undeclared` holds names that are
/// operational but not declared, and `in_sync` is true when both are empty.
/// Both lists are sorted and free of duplicates.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    // Borrowed sets: only the names that end up in a result are copied.
    let declared_set: BTreeSet<&str> = declared_indices.iter().map(String::as_str).collect();
    let operational_set: BTreeSet<&str> =
        operational_indices.iter().map(String::as_str).collect();

    let only_declared: Vec<String> = declared_set
        .difference(&operational_set)
        .map(|name| (*name).to_owned())
        .collect();
    let only_operational: Vec<String> = operational_set
        .difference(&declared_set)
        .map(|name| (*name).to_owned())
        .collect();

    let in_sync = only_declared.is_empty() && only_operational.is_empty();
    (in_sync, only_declared, only_operational)
}
645
646fn collection_index_readiness(
647    collection_name: &str,
648    statuses: &[CatalogIndexStatus],
649) -> (usize, usize, bool) {
650    let local = statuses
651        .iter()
652        .filter(|status| status.collection.as_deref() == Some(collection_name))
653        .collect::<Vec<_>>();
654    let queryable_count = local.iter().filter(|status| status.queryable).count();
655    let requires_rebuild_count = local
656        .iter()
657        .filter(|status| status.requires_rebuild)
658        .count();
659    let locally_in_sync = local.iter().all(|status| status.in_sync);
660    (queryable_count, requires_rebuild_count, locally_in_sync)
661}
662
663fn collection_graph_projection_readiness(
664    collection_name: &str,
665    statuses: &[CatalogGraphProjectionStatus],
666) -> (usize, usize, bool) {
667    let local = statuses
668        .iter()
669        .filter(|status| status.source.as_deref() == Some(collection_name))
670        .collect::<Vec<_>>();
671    let queryable_count = local.iter().filter(|status| status.queryable).count();
672    let requires_rematerialization_count = local
673        .iter()
674        .filter(|status| status.requires_rematerialization)
675        .count();
676    let locally_in_sync = local
677        .iter()
678        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
679    (
680        queryable_count,
681        requires_rematerialization_count,
682        locally_in_sync,
683    )
684}
685
686fn collection_analytics_job_readiness(
687    collection_name: &str,
688    projection_statuses: &[CatalogGraphProjectionStatus],
689    job_statuses: &[CatalogAnalyticsJobStatus],
690) -> (usize, usize, bool) {
691    let local = job_statuses
692        .iter()
693        .filter(|status| {
694            let Some(projection_name) = status.projection.as_deref() else {
695                return false;
696            };
697            projection_statuses
698                .iter()
699                .find(|projection| projection.name == projection_name)
700                .and_then(|projection| projection.source.as_deref())
701                == Some(collection_name)
702        })
703        .collect::<Vec<_>>();
704    let executable_count = local.iter().filter(|status| status.executable).count();
705    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
706    let locally_in_sync = local
707        .iter()
708        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
709    (executable_count, requires_rerun_count, locally_in_sync)
710}
711
712fn graph_projection_statuses(
713    declarations: Option<&CatalogDeclarations>,
714) -> Vec<CatalogGraphProjectionStatus> {
715    let declared = declarations
716        .map(|declarations| declarations.declared_graph_projections.as_slice())
717        .unwrap_or(&[]);
718    let operational = declarations
719        .map(|declarations| declarations.operational_graph_projections.as_slice())
720        .unwrap_or(&[]);
721    let declared_jobs = declarations
722        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
723        .unwrap_or(&[]);
724    let operational_jobs = declarations
725        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
726        .unwrap_or(&[]);
727
728    let mut names = declared
729        .iter()
730        .map(|projection| projection.name.clone())
731        .chain(operational.iter().map(|projection| projection.name.clone()))
732        .collect::<Vec<_>>();
733    names.sort();
734    names.dedup();
735
736    names
737        .into_iter()
738        .map(|name| {
739            let declared_projection = declared.iter().find(|projection| projection.name == name);
740            let operational_projection = operational
741                .iter()
742                .find(|projection| projection.name == name);
743            let declared_present = declared_projection.is_some();
744            let operational_present = operational_projection.is_some();
745            let mut dependent_job_ids = BTreeSet::new();
746            let mut dependent_job_count = 0usize;
747            let mut active_dependent_job_count = 0usize;
748            let mut stale_dependent_job_count = 0usize;
749            for job in declared_jobs
750                .iter()
751                .chain(operational_jobs.iter())
752                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
753            {
754                if !dependent_job_ids.insert(job.id.clone()) {
755                    continue;
756                }
757                dependent_job_count += 1;
758                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
759                    active_dependent_job_count += 1;
760                }
761                if job.state == "stale" {
762                    stale_dependent_job_count += 1;
763                }
764            }
765            let lifecycle_state = match (
766                declared_present,
767                operational_present,
768                declared_projection
769                    .map(|projection| projection.state.as_str())
770                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
771                    .unwrap_or_default(),
772            ) {
773                (true, _, "failed") => "failed",
774                (true, true, "stale") => "stale",
775                (true, _, "materializing") => "materializing",
776                (true, true, "materialized") => "materialized",
777                (true, true, _) => "materialized",
778                (false, true, _) => "orphaned-operational",
779                (true, false, _) => "declared",
780                (false, false, _) => "unknown",
781            }
782            .to_string();
783            let queryable = declared_present
784                && operational_present
785                && matches!(
786                    declared_projection
787                        .map(|projection| projection.state.as_str())
788                        .or_else(
789                            || operational_projection.map(|projection| projection.state.as_str())
790                        )
791                        .unwrap_or_default(),
792                    "materialized"
793                );
794            let requires_rematerialization = matches!(
795                declared_projection
796                    .map(|projection| projection.state.as_str())
797                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
798                    .unwrap_or_default(),
799                "declared" | "materializing" | "failed" | "stale"
800            );
801            let mut attention_reasons = Vec::new();
802            if !declared_present || !operational_present {
803                attention_reasons.push("declaration_drift".to_string());
804            }
805            if requires_rematerialization {
806                attention_reasons.push("requires_rematerialization".to_string());
807            }
808            if stale_dependent_job_count > 0 {
809                attention_reasons.push("dependent_jobs_stale".to_string());
810            }
811            let attention_score = stale_dependent_job_count.saturating_mul(2)
812                + usize::from(requires_rematerialization).saturating_mul(4)
813                + usize::from(!declared_present || !operational_present)
814                + usize::from(!queryable);
815            CatalogGraphProjectionStatus {
816                name,
817                source: declared_projection
818                    .map(|projection| projection.source.clone())
819                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
820                lifecycle_state,
821                declared: declared_present,
822                operational: operational_present,
823                in_sync: declared_present == operational_present,
824                last_materialized_sequence: declared_projection
825                    .and_then(|projection| projection.last_materialized_sequence)
826                    .or_else(|| {
827                        operational_projection
828                            .and_then(|projection| projection.last_materialized_sequence)
829                    }),
830                queryable,
831                requires_rematerialization,
832                dependent_job_count,
833                active_dependent_job_count,
834                stale_dependent_job_count,
835                dependent_jobs_in_sync: stale_dependent_job_count == 0,
836                rerun_required: stale_dependent_job_count > 0,
837                attention_score,
838                attention_reasons,
839            }
840        })
841        .collect()
842}
843
844fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
845    let declared = declarations
846        .map(|declarations| declarations.declared_indexes.as_slice())
847        .unwrap_or(&[]);
848    let operational = declarations
849        .map(|declarations| declarations.operational_indexes.as_slice())
850        .unwrap_or(&[]);
851
852    let mut names = declared
853        .iter()
854        .map(|index| index.name.clone())
855        .chain(operational.iter().map(|index| index.name.clone()))
856        .collect::<Vec<_>>();
857    names.sort();
858    names.dedup();
859
860    names
861        .into_iter()
862        .map(|name| {
863            let declared_index = declared.iter().find(|index| index.name == name);
864            let operational_index = operational.iter().find(|index| index.name == name);
865            let collection = declared_index
866                .and_then(|index| index.collection.clone())
867                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
868            let kind = declared_index
869                .map(|index| index_kind_string(index.kind))
870                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
871                .unwrap_or_default();
872            let enabled = declared_index
873                .map(|index| index.enabled)
874                .or_else(|| operational_index.map(|index| index.enabled))
875                .unwrap_or(false);
876            let build_state = operational_index
877                .map(|index| index.build_state.clone())
878                .or_else(|| declared_index.map(|index| index.build_state.clone()));
879            let declared_present = declared_index.is_some();
880            let operational_present = operational_index.is_some();
881            let lifecycle_state = index_lifecycle_state(
882                declared_present,
883                operational_present,
884                enabled,
885                build_state.as_deref(),
886            );
887
888            let mut attention_reasons = Vec::new();
889            if declared_present != operational_present {
890                attention_reasons.push("declaration_drift".to_string());
891            }
892            if !enabled && declared_present {
893                attention_reasons.push("disabled".to_string());
894            }
895            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
896                attention_reasons.push("failed".to_string());
897            }
898            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
899                attention_reasons.push("stale".to_string());
900            }
901            if matches!(
902                build_state.as_deref().unwrap_or_default(),
903                "building" | "declared-unbuilt"
904            ) {
905                attention_reasons.push("requires_rebuild".to_string());
906            }
907            let queryable = declared_present
908                && operational_present
909                && enabled
910                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
911            let requires_rebuild = matches!(
912                build_state.as_deref().unwrap_or_default(),
913                "declared-unbuilt" | "building" | "stale" | "failed"
914            );
915            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
916                + usize::from(!enabled && declared_present)
917                + usize::from(declared_present != operational_present)
918                + usize::from(!queryable);
919
920            let artifact_state = crate::physical::ArtifactState::from_build_state(
921                build_state.as_deref().unwrap_or("declared-unbuilt"),
922                enabled,
923            );
924            CatalogIndexStatus {
925                name,
926                collection,
927                kind,
928                declared: declared_present,
929                operational: operational_present,
930                enabled,
931                build_state,
932                artifact_state,
933                lifecycle_state,
934                in_sync: declared_present == operational_present,
935                queryable,
936                requires_rebuild,
937                attention_score,
938                attention_reasons,
939            }
940        })
941        .collect()
942}
943
/// Derives the lifecycle label reported for an index.
///
/// Presence and enablement decide the label first:
/// - operational but not declared: `"orphaned-operational"`
/// - neither declared nor operational: `"unknown"`
/// - declared but not enabled: `"disabled"`
/// - declared and enabled but not operational: `"declared"`
///
/// Only an index that is declared, operational and enabled is classified by
/// `build_state`: `"ready"`, `"failed"` and `"stale"` are reported as-is,
/// `"declared-unbuilt"` is reported as `"declared"`, and every other value
/// (including `None`) is reported as `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational, enabled) {
        (false, true, _) => "orphaned-operational",
        (false, false, _) => "unknown",
        (true, _, false) => "disabled",
        (true, false, true) => "declared",
        (true, true, true) => match build_state {
            Some("ready") => "ready",
            Some("failed") => "failed",
            Some("stale") => "stale",
            Some("declared-unbuilt") => "declared",
            // Covers the intermediate states "catalog-derived", "metadata-only",
            // "artifact-published" and "registry-loaded", plus a missing or
            // unrecognised build state.
            _ => "building",
        },
    };
    label.to_string()
}
974
975fn index_kind_string(kind: IndexKind) -> String {
976    match kind {
977        IndexKind::BTree => "btree",
978        IndexKind::Hash => "hash",
979        IndexKind::Bitmap => "bitmap",
980        IndexKind::Spatial => "spatial.rtree",
981        IndexKind::VectorHnsw => "vector.hnsw",
982        IndexKind::VectorInverted => "vector.inverted",
983        IndexKind::GraphAdjacency => "graph.adjacency",
984        IndexKind::FullText => "text.fulltext",
985        IndexKind::DocumentPathValue => "document.pathvalue",
986        IndexKind::HybridSearch => "search.hybrid",
987    }
988    .to_string()
989}
990
991fn analytics_job_statuses(
992    declarations: Option<&CatalogDeclarations>,
993) -> Vec<CatalogAnalyticsJobStatus> {
994    let declared = declarations
995        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
996        .unwrap_or(&[]);
997    let operational = declarations
998        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
999        .unwrap_or(&[]);
1000    let declared_projections = declarations
1001        .map(|declarations| declarations.declared_graph_projections.as_slice())
1002        .unwrap_or(&[]);
1003    let operational_projections = declarations
1004        .map(|declarations| declarations.operational_graph_projections.as_slice())
1005        .unwrap_or(&[]);
1006
1007    let mut ids = declared
1008        .iter()
1009        .map(|job| job.id.clone())
1010        .chain(operational.iter().map(|job| job.id.clone()))
1011        .collect::<Vec<_>>();
1012    ids.sort();
1013    ids.dedup();
1014
1015    ids.into_iter()
1016        .map(|id| {
1017            let declared_job = declared.iter().find(|job| job.id == id);
1018            let operational_job = operational.iter().find(|job| job.id == id);
1019            let kind = declared_job
1020                .map(|job| job.kind.clone())
1021                .or_else(|| operational_job.map(|job| job.kind.clone()))
1022                .unwrap_or_default();
1023            let projection = declared_job
1024                .and_then(|job| job.projection.clone())
1025                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1026            let state = declared_job
1027                .map(|job| job.state.clone())
1028                .or_else(|| operational_job.map(|job| job.state.clone()))
1029                .unwrap_or_default();
1030            let declared_present = declared_job.is_some();
1031            let operational_present = operational_job.is_some();
1032            let last_run_sequence = declared_job
1033                .and_then(|job| job.last_run_sequence)
1034                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1035            let projection_declared = projection.as_ref().map(|projection_name| {
1036                declared_projections
1037                    .iter()
1038                    .any(|projection| projection.name == *projection_name)
1039            });
1040            let projection_operational = projection.as_ref().map(|projection_name| {
1041                operational_projections
1042                    .iter()
1043                    .any(|projection| projection.name == *projection_name)
1044            });
1045            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1046                declared_projections
1047                    .iter()
1048                    .find(|projection| projection.name == *projection_name)
1049                    .map(|projection| projection.state.as_str())
1050                    .or_else(|| {
1051                        operational_projections
1052                            .iter()
1053                            .find(|projection| projection.name == *projection_name)
1054                            .map(|projection| projection.state.as_str())
1055                    })
1056            });
1057            let dependency_in_sync = projection.as_ref().map(|_| {
1058                matches!(projection_lifecycle, Some("materialized"))
1059                    && projection_operational == Some(true)
1060            });
1061            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1062                (true, false, _) => "declared",
1063                (true, true, _)
1064                    if matches!(
1065                        projection_lifecycle,
1066                        Some("stale" | "failed" | "materializing" | "declared")
1067                    ) =>
1068                {
1069                    "stale"
1070                }
1071                (true, true, "completed") => "completed",
1072                (true, true, "running") => "running",
1073                (true, true, "failed") => "failed",
1074                (true, true, "queued") => "queued",
1075                (true, true, "stale") => "stale",
1076                (true, true, _) => "materialized",
1077                (false, true, _) => "orphaned-operational",
1078                (false, false, _) => "unknown",
1079            }
1080            .to_string();
1081            let executable = declared_present
1082                && operational_present
1083                && !matches!(state.as_str(), "failed" | "stale")
1084                && dependency_in_sync.unwrap_or(true);
1085            let requires_rerun =
1086                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1087            let mut attention_reasons = Vec::new();
1088            if declared_present != operational_present {
1089                attention_reasons.push("declaration_drift".to_string());
1090            }
1091            if matches!(state.as_str(), "failed") {
1092                attention_reasons.push("failed".to_string());
1093            }
1094            if matches!(state.as_str(), "stale") {
1095                attention_reasons.push("stale".to_string());
1096            }
1097            if dependency_in_sync == Some(false) {
1098                attention_reasons.push("dependency_drift".to_string());
1099            }
1100            if requires_rerun {
1101                attention_reasons.push("requires_rerun".to_string());
1102            }
1103            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1104                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1105                + usize::from(declared_present != operational_present)
1106                + usize::from(!executable);
1107            CatalogAnalyticsJobStatus {
1108                id,
1109                kind,
1110                projection,
1111                state: state.clone(),
1112                lifecycle_state,
1113                declared: declared_present,
1114                operational: operational_present,
1115                in_sync: declared_present == operational_present,
1116                last_run_sequence,
1117                projection_declared,
1118                projection_operational,
1119                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1120                dependency_in_sync,
1121                executable,
1122                requires_rerun,
1123                attention_score,
1124                attention_reasons,
1125            }
1126        })
1127        .collect()
1128}
1129
1130pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1131    let declared_indexes = snapshot
1132        .declared_indexes
1133        .iter()
1134        .map(|index| index.name.clone())
1135        .collect::<BTreeSet<_>>();
1136    let operational_indexes = snapshot
1137        .operational_indexes
1138        .iter()
1139        .map(|index| index.name.clone())
1140        .collect::<BTreeSet<_>>();
1141    let declared_graph_projections = snapshot
1142        .declared_graph_projections
1143        .iter()
1144        .map(|projection| projection.name.clone())
1145        .collect::<BTreeSet<_>>();
1146    let operational_graph_projections = snapshot
1147        .operational_graph_projections
1148        .iter()
1149        .map(|projection| projection.name.clone())
1150        .collect::<BTreeSet<_>>();
1151    let declared_analytics_jobs = snapshot
1152        .declared_analytics_jobs
1153        .iter()
1154        .map(|job| job.id.clone())
1155        .collect::<BTreeSet<_>>();
1156    let operational_analytics_jobs = snapshot
1157        .operational_analytics_jobs
1158        .iter()
1159        .map(|job| job.id.clone())
1160        .collect::<BTreeSet<_>>();
1161
1162    CatalogConsistencyReport {
1163        declared_index_count: declared_indexes.len(),
1164        operational_index_count: operational_indexes.len(),
1165        declared_graph_projection_count: declared_graph_projections.len(),
1166        operational_graph_projection_count: operational_graph_projections.len(),
1167        declared_analytics_job_count: declared_analytics_jobs.len(),
1168        operational_analytics_job_count: operational_analytics_jobs.len(),
1169        missing_operational_indexes: declared_indexes
1170            .difference(&operational_indexes)
1171            .cloned()
1172            .collect(),
1173        undeclared_operational_indexes: operational_indexes
1174            .difference(&declared_indexes)
1175            .cloned()
1176            .collect(),
1177        missing_operational_graph_projections: declared_graph_projections
1178            .difference(&operational_graph_projections)
1179            .cloned()
1180            .collect(),
1181        undeclared_operational_graph_projections: operational_graph_projections
1182            .difference(&declared_graph_projections)
1183            .cloned()
1184            .collect(),
1185        missing_operational_analytics_jobs: declared_analytics_jobs
1186            .difference(&operational_analytics_jobs)
1187            .cloned()
1188            .collect(),
1189        undeclared_operational_analytics_jobs: operational_analytics_jobs
1190            .difference(&declared_analytics_jobs)
1191            .cloned()
1192            .collect(),
1193    }
1194}
1195
1196pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1197    CatalogAttentionSummary {
1198        collections_requiring_attention: snapshot
1199            .collections
1200            .iter()
1201            .filter(|collection| collection.attention_required)
1202            .count(),
1203        indexes_requiring_attention: snapshot
1204            .index_statuses
1205            .iter()
1206            .filter(|status| status.attention_score > 0)
1207            .count(),
1208        graph_projections_requiring_attention: snapshot
1209            .graph_projection_statuses
1210            .iter()
1211            .filter(|status| status.attention_score > 0)
1212            .count(),
1213        analytics_jobs_requiring_attention: snapshot
1214            .analytics_job_statuses
1215            .iter()
1216            .filter(|status| status.attention_score > 0)
1217            .count(),
1218        top_collection: snapshot
1219            .collections
1220            .iter()
1221            .filter(|collection| collection.attention_score > 0)
1222            .max_by_key(|collection| collection.attention_score)
1223            .cloned(),
1224        top_index: snapshot
1225            .index_statuses
1226            .iter()
1227            .filter(|status| status.attention_score > 0)
1228            .max_by_key(|status| status.attention_score)
1229            .cloned(),
1230        top_graph_projection: snapshot
1231            .graph_projection_statuses
1232            .iter()
1233            .filter(|status| status.attention_score > 0)
1234            .max_by_key(|status| status.attention_score)
1235            .cloned(),
1236        top_analytics_job: snapshot
1237            .analytics_job_statuses
1238            .iter()
1239            .filter(|status| status.attention_score > 0)
1240            .max_by_key(|status| status.attention_score)
1241            .cloned(),
1242    }
1243}
1244
1245pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1246    let mut collections = snapshot
1247        .collections
1248        .iter()
1249        .filter(|collection| collection.attention_required)
1250        .cloned()
1251        .collect::<Vec<_>>();
1252    collections.sort_by(|left, right| {
1253        right
1254            .attention_score
1255            .cmp(&left.attention_score)
1256            .then_with(|| left.name.cmp(&right.name))
1257    });
1258    collections
1259}
1260
1261pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1262    let mut statuses = snapshot
1263        .index_statuses
1264        .iter()
1265        .filter(|status| status.attention_score > 0)
1266        .cloned()
1267        .collect::<Vec<_>>();
1268    statuses.sort_by(|left, right| {
1269        right
1270            .attention_score
1271            .cmp(&left.attention_score)
1272            .then_with(|| left.name.cmp(&right.name))
1273    });
1274    statuses
1275}
1276
1277pub fn graph_projection_attention(
1278    snapshot: &CatalogModelSnapshot,
1279) -> Vec<CatalogGraphProjectionStatus> {
1280    let mut statuses = snapshot
1281        .graph_projection_statuses
1282        .iter()
1283        .filter(|status| status.attention_score > 0)
1284        .cloned()
1285        .collect::<Vec<_>>();
1286    statuses.sort_by(|left, right| {
1287        right
1288            .attention_score
1289            .cmp(&left.attention_score)
1290            .then_with(|| left.name.cmp(&right.name))
1291    });
1292    statuses
1293}
1294
1295pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1296    let mut statuses = snapshot
1297        .analytics_job_statuses
1298        .iter()
1299        .filter(|status| status.attention_score > 0)
1300        .cloned()
1301        .collect::<Vec<_>>();
1302    statuses.sort_by(|left, right| {
1303        right
1304            .attention_score
1305            .cmp(&left.attention_score)
1306            .then_with(|| left.id.cmp(&right.id))
1307    });
1308    statuses
1309}