Skip to main content

reddb_server/
catalog.rs

1//! Logical catalog structures for the unified multi-structure model.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9    CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Logical data model of a collection.
///
/// Catalog snapshots carry both the model declared by a collection contract
/// (when one exists) and the model inferred from the stored entities; `Mixed`
/// is the inferred fallback when stored entities do not map to a single model.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CollectionModel {
    /// Collections holding table rows.
    Table,
    Document,
    /// Collections holding graph nodes and/or edges.
    Graph,
    /// Collections holding vector entities.
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    /// Fallback when the stored entities span several kinds.
    Mixed,
    TimeSeries,
    /// Queue collections; catalog snapshots report a `queue_mode` for these.
    Queue,
    Metrics,
}
33
/// How strictly the records of a collection follow a schema.
///
/// When no contract declares a mode, the catalog infers one from the
/// collection model: `Table` is `Strict`; `Graph`, `Vector`, `TimeSeries` and
/// `Metrics` are `SemiStructured`; everything else is `Dynamic`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
40
/// A data-change operation that a subscription can filter on.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Canonical upper-case keyword for this operation.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// Returns `None` for anything other than `insert`, `update` or `delete`
    /// in any ASCII casing. Comparing case-insensitively against the
    /// canonical spellings avoids allocating an upper-cased copy of `value`.
    pub fn from_str(value: &str) -> Option<Self> {
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|operation| value.eq_ignore_ascii_case(operation.as_str()))
    }
}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SubscriptionDescriptor {
69    /// Logical name for this subscription. Empty string for legacy unnamed subscriptions.
70    pub name: String,
71    pub source: String,
72    pub target_queue: String,
73    pub ops_filter: Vec<SubscriptionOperation>,
74    pub where_filter: Option<String>,
75    pub redact_fields: Vec<String>,
76    pub enabled: bool,
77    /// When true, events are routed to the bare `target_queue` regardless of
78    /// the current tenant — a cluster-wide subscription. When false (default),
79    /// events are namespaced as `{tenant}/{target_queue}` whenever a tenant
80    /// context is active, enforcing per-tenant isolation.
81    pub all_tenants: bool,
82}
83
84#[derive(Debug, Clone)]
85pub struct CollectionDescriptor {
86    pub name: String,
87    pub model: CollectionModel,
88    pub schema_mode: SchemaMode,
89    pub contract_present: bool,
90    pub contract_origin: Option<crate::physical::ContractOrigin>,
91    pub declared_model: Option<CollectionModel>,
92    pub observed_model: CollectionModel,
93    pub queue_mode: Option<QueueMode>,
94    pub vector_dimension: Option<usize>,
95    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
96    pub declared_schema_mode: Option<SchemaMode>,
97    pub observed_schema_mode: SchemaMode,
98    pub entities: usize,
99    pub cross_refs: usize,
100    pub segments: usize,
101    pub indices: Vec<String>,
102    pub declared_indices: Vec<String>,
103    pub operational_indices: Vec<String>,
104    pub indexes_in_sync: bool,
105    pub missing_operational_indices: Vec<String>,
106    pub undeclared_operational_indices: Vec<String>,
107    pub queryable_index_count: usize,
108    pub indexes_requiring_rebuild_count: usize,
109    pub queryable_graph_projection_count: usize,
110    pub graph_projections_requiring_rematerialization_count: usize,
111    pub executable_analytics_job_count: usize,
112    pub analytics_jobs_requiring_rerun_count: usize,
113    pub subscriptions: Vec<SubscriptionDescriptor>,
114    pub resources_in_sync: bool,
115    pub attention_required: bool,
116    pub attention_score: usize,
117    pub attention_reasons: Vec<String>,
118}
119
120#[derive(Debug, Clone)]
121pub struct CatalogModelSnapshot {
122    pub summary: CatalogSnapshot,
123    pub collections: Vec<CollectionDescriptor>,
124    pub indices: IndexCatalogSnapshot,
125    pub declared_indexes: Vec<PhysicalIndexState>,
126    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
127    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
128    pub operational_indexes: Vec<PhysicalIndexState>,
129    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
130    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
131    pub index_statuses: Vec<CatalogIndexStatus>,
132    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
133    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
134    pub queryable_index_count: usize,
135    pub indexes_requiring_rebuild_count: usize,
136    pub queryable_graph_projection_count: usize,
137    pub graph_projections_requiring_rematerialization_count: usize,
138    pub executable_analytics_job_count: usize,
139    pub analytics_jobs_requiring_rerun_count: usize,
140}
141
142#[derive(Debug, Clone)]
143pub struct CatalogAttentionSummary {
144    pub collections_requiring_attention: usize,
145    pub indexes_requiring_attention: usize,
146    pub graph_projections_requiring_attention: usize,
147    pub analytics_jobs_requiring_attention: usize,
148    pub top_collection: Option<CollectionDescriptor>,
149    pub top_index: Option<CatalogIndexStatus>,
150    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
151    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
152}
153
154#[derive(Debug, Clone, Default)]
155pub struct CatalogDeclarations {
156    pub declared_indexes: Vec<PhysicalIndexState>,
157    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
158    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
159    pub operational_indexes: Vec<PhysicalIndexState>,
160    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
161    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
162}
163
/// Reconciled status of one graph projection, merging its declared and
/// operational records.
#[derive(Clone, Debug)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name.
    pub name: String,
    /// Source collection of the projection, when recorded.
    pub source: Option<String>,
    /// Summary state such as "materialized", "stale", "declared" or "orphaned-operational".
    pub lifecycle_state: String,
    /// Whether a declared record exists.
    pub declared: bool,
    /// Whether an operational record exists.
    pub operational: bool,
    /// True when the projection is either both declared and operational, or neither.
    pub in_sync: bool,
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational and in the "materialized" state.
    pub queryable: bool,
    pub requires_rematerialization: bool,
    /// Distinct analytics jobs (by id) that read this projection.
    pub dependent_job_count: usize,
    pub active_dependent_job_count: usize,
    pub stale_dependent_job_count: usize,
    pub dependent_jobs_in_sync: bool,
    pub rerun_required: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
183
184#[derive(Debug, Clone)]
185pub struct CatalogIndexStatus {
186    pub name: String,
187    pub collection: Option<String>,
188    pub kind: String,
189    pub declared: bool,
190    pub operational: bool,
191    pub enabled: bool,
192    pub build_state: Option<String>,
193    pub artifact_state: crate::physical::ArtifactState,
194    pub lifecycle_state: String,
195    pub in_sync: bool,
196    pub queryable: bool,
197    pub requires_rebuild: bool,
198    pub attention_score: usize,
199    pub attention_reasons: Vec<String>,
200}
201
/// Reconciled status of one analytics job, including the state of the graph
/// projection it depends on (when it has one).
#[derive(Clone, Debug)]
pub struct CatalogAnalyticsJobStatus {
    /// Job identifier.
    pub id: String,
    pub kind: String,
    /// Name of the graph projection the job reads, if any; jobs without one
    /// are not attributed to any collection.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    pub declared: bool,
    pub operational: bool,
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    pub projection_declared: Option<bool>,
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// `None` when there is no dependency to compare; treated as in sync by readiness checks.
    pub dependency_in_sync: Option<bool>,
    pub executable: bool,
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
222
/// Declared-vs-operational counts and name-level drift for indexes, graph
/// projections and analytics jobs.
#[derive(Clone, Debug, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    /// Declared names with no operational counterpart, per resource kind.
    pub missing_operational_indexes: Vec<String>,
    /// Operational names with no declaration, per resource kind.
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
238
239pub fn snapshot_store(
240    name: &str,
241    store: &UnifiedStore,
242    index_catalog: Option<&IndexCatalog>,
243) -> CatalogModelSnapshot {
244    snapshot_store_with_declarations(name, store, index_catalog, None, None)
245}
246
247pub fn snapshot_store_with_declarations(
248    name: &str,
249    store: &UnifiedStore,
250    index_catalog: Option<&IndexCatalog>,
251    declarations: Option<&CatalogDeclarations>,
252    contracts: Option<&[CollectionContract]>,
253) -> CatalogModelSnapshot {
254    let index_statuses = index_statuses(declarations);
255    let graph_projection_statuses = graph_projection_statuses(declarations);
256    let analytics_job_statuses = analytics_job_statuses(declarations);
257
258    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
259    for (collection, entity) in store.query_all(|_| true) {
260        grouped.entry(collection).or_default().push(entity);
261    }
262    let queue_modes = queue_modes_from_grouped(&grouped);
263
264    let mut stats_by_collection = BTreeMap::new();
265    let mut collections = Vec::new();
266
267    for collection_name in store.list_collections() {
268        let entities = grouped.remove(&collection_name).unwrap_or_default();
269        let inferred_model = infer_model(&entities);
270        let inferred_schema_mode = infer_schema_mode(inferred_model);
271        let contract = collection_contract(&collection_name, contracts);
272        let model = contract
273            .map(|contract| contract.declared_model)
274            .unwrap_or(inferred_model);
275        let cross_refs = entities
276            .iter()
277            .map(|entity| entity.cross_refs().len())
278            .sum();
279        let entity_count = entities.len();
280        let manager_stats = store
281            .get_collection(&collection_name)
282            .map(|manager| manager.stats());
283        let segments = manager_stats
284            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
285            .unwrap_or(0);
286
287        stats_by_collection.insert(
288            collection_name.clone(),
289            CollectionStats {
290                entities: entity_count,
291                cross_refs,
292                segments,
293            },
294        );
295
296        let declared_indices = declared_indices(&collection_name, declarations);
297        let operational_indices = operational_indices(&collection_name, index_catalog);
298        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
299            collection_index_consistency(&declared_indices, &operational_indices);
300        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
301            collection_index_readiness(&collection_name, &index_statuses);
302        let (
303            queryable_graph_projection_count,
304            graph_projections_requiring_rematerialization_count,
305            graph_projections_locally_in_sync,
306        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
307        let (
308            executable_analytics_job_count,
309            analytics_jobs_requiring_rerun_count,
310            analytics_jobs_locally_in_sync,
311        ) = collection_analytics_job_readiness(
312            &collection_name,
313            &graph_projection_statuses,
314            &analytics_job_statuses,
315        );
316        let resources_in_sync = indexes_in_sync
317            && indexes_locally_in_sync
318            && graph_projections_locally_in_sync
319            && analytics_jobs_locally_in_sync;
320        let attention_required = !resources_in_sync
321            || indexes_requiring_rebuild_count > 0
322            || graph_projections_requiring_rematerialization_count > 0
323            || analytics_jobs_requiring_rerun_count > 0;
324        let mut attention_reasons = Vec::new();
325        if !indexes_in_sync || !indexes_locally_in_sync {
326            attention_reasons.push("index_drift".to_string());
327        }
328        if indexes_requiring_rebuild_count > 0 {
329            attention_reasons.push("indexes_require_rebuild".to_string());
330        }
331        if !graph_projections_locally_in_sync {
332            attention_reasons.push("graph_projection_drift".to_string());
333        }
334        if graph_projections_requiring_rematerialization_count > 0 {
335            attention_reasons.push("graph_projections_require_rematerialization".to_string());
336        }
337        if !analytics_jobs_locally_in_sync {
338            attention_reasons.push("analytics_job_drift".to_string());
339        }
340        if analytics_jobs_requiring_rerun_count > 0 {
341            attention_reasons.push("analytics_jobs_require_rerun".to_string());
342        }
343        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
344            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
345            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
346            + usize::from(!resources_in_sync);
347
348        collections.push(CollectionDescriptor {
349            name: collection_name.clone(),
350            model,
351            schema_mode: contract
352                .map(|contract| contract.schema_mode)
353                .unwrap_or(inferred_schema_mode),
354            contract_present: contract.is_some(),
355            contract_origin: contract.map(|contract| contract.origin),
356            declared_model: contract.map(|contract| contract.declared_model),
357            observed_model: inferred_model,
358            queue_mode: if model == CollectionModel::Queue {
359                Some(
360                    queue_modes
361                        .get(&collection_name)
362                        .copied()
363                        .unwrap_or_default(),
364                )
365            } else {
366                None
367            },
368            vector_dimension: contract.and_then(|contract| contract.vector_dimension),
369            vector_metric: contract.and_then(|contract| contract.vector_metric),
370            declared_schema_mode: contract.map(|contract| contract.schema_mode),
371            observed_schema_mode: inferred_schema_mode,
372            entities: entity_count,
373            cross_refs,
374            segments,
375            indices: infer_indices(&collection_name, model, index_catalog, declarations),
376            declared_indices,
377            operational_indices,
378            indexes_in_sync,
379            missing_operational_indices,
380            undeclared_operational_indices,
381            queryable_index_count,
382            indexes_requiring_rebuild_count,
383            queryable_graph_projection_count,
384            graph_projections_requiring_rematerialization_count,
385            executable_analytics_job_count,
386            analytics_jobs_requiring_rerun_count,
387            subscriptions: contract
388                .map(|contract| contract.subscriptions.clone())
389                .unwrap_or_default(),
390            resources_in_sync,
391            attention_required,
392            attention_score,
393            attention_reasons,
394        });
395    }
396
397    collections.sort_by(|left, right| left.name.cmp(&right.name));
398
399    let summary = CatalogSnapshot {
400        name: name.to_string(),
401        total_entities: stats_by_collection
402            .values()
403            .map(|stats| stats.entities)
404            .sum(),
405        total_collections: stats_by_collection.len(),
406        stats_by_collection,
407        updated_at: SystemTime::now(),
408    };
409
410    CatalogModelSnapshot {
411        summary,
412        collections,
413        indices: index_catalog
414            .map(IndexCatalog::snapshot)
415            .unwrap_or_default(),
416        declared_indexes: declarations
417            .map(|declarations| declarations.declared_indexes.clone())
418            .unwrap_or_default(),
419        declared_graph_projections: declarations
420            .map(|declarations| declarations.declared_graph_projections.clone())
421            .unwrap_or_default(),
422        declared_analytics_jobs: declarations
423            .map(|declarations| declarations.declared_analytics_jobs.clone())
424            .unwrap_or_default(),
425        operational_indexes: declarations
426            .map(|declarations| declarations.operational_indexes.clone())
427            .unwrap_or_default(),
428        operational_graph_projections: declarations
429            .map(|declarations| declarations.operational_graph_projections.clone())
430            .unwrap_or_default(),
431        operational_analytics_jobs: declarations
432            .map(|declarations| declarations.operational_analytics_jobs.clone())
433            .unwrap_or_default(),
434        queryable_index_count: index_statuses
435            .iter()
436            .filter(|status| status.queryable)
437            .count(),
438        indexes_requiring_rebuild_count: index_statuses
439            .iter()
440            .filter(|status| status.requires_rebuild)
441            .count(),
442        queryable_graph_projection_count: graph_projection_statuses
443            .iter()
444            .filter(|status| status.queryable)
445            .count(),
446        graph_projections_requiring_rematerialization_count: graph_projection_statuses
447            .iter()
448            .filter(|status| status.requires_rematerialization)
449            .count(),
450        executable_analytics_job_count: analytics_job_statuses
451            .iter()
452            .filter(|status| status.executable)
453            .count(),
454        analytics_jobs_requiring_rerun_count: analytics_job_statuses
455            .iter()
456            .filter(|status| status.requires_rerun)
457            .count(),
458        index_statuses,
459        graph_projection_statuses,
460        analytics_job_statuses,
461    }
462}
463
464fn queue_modes_from_grouped(
465    grouped: &HashMap<String, Vec<UnifiedEntity>>,
466) -> HashMap<String, QueueMode> {
467    let Some(meta) = grouped.get("red_queue_meta") else {
468        return HashMap::new();
469    };
470
471    let mut modes = HashMap::new();
472    for entity in meta {
473        let Some(row) = entity.data.as_row() else {
474            continue;
475        };
476        if row_text(row, "kind").as_deref() != Some("queue_config") {
477            continue;
478        }
479        let Some(queue) = row_text(row, "queue") else {
480            continue;
481        };
482        let mode = row_text(row, "mode")
483            .as_deref()
484            .and_then(QueueMode::parse)
485            .unwrap_or_default();
486        modes.insert(queue, mode);
487    }
488    modes
489}
490
491fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
492    match row.get_field(key) {
493        Some(Value::Text(value)) => Some(value.to_string()),
494        _ => None,
495    }
496}
497
498fn collection_contract<'a>(
499    collection_name: &str,
500    contracts: Option<&'a [CollectionContract]>,
501) -> Option<&'a CollectionContract> {
502    contracts.and_then(|contracts| {
503        contracts
504            .iter()
505            .find(|contract| contract.name == collection_name)
506    })
507}
508
509fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
510    let mut has_table = false;
511    let mut has_graph = false;
512    let mut has_vector = false;
513
514    for entity in entities {
515        match &entity.kind {
516            EntityKind::TableRow { .. } => has_table = true,
517            EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
518            EntityKind::Vector { .. } => has_vector = true,
519            EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
520        }
521    }
522
523    match (has_table, has_graph, has_vector) {
524        (true, false, false) => CollectionModel::Table,
525        (false, true, false) => CollectionModel::Graph,
526        (false, false, true) => CollectionModel::Vector,
527        _ => CollectionModel::Mixed,
528    }
529}
530
531fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
532    match model {
533        CollectionModel::Table => SchemaMode::Strict,
534        CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
535        CollectionModel::Document
536        | CollectionModel::Hll
537        | CollectionModel::Sketch
538        | CollectionModel::Filter
539        | CollectionModel::Kv
540        | CollectionModel::Config
541        | CollectionModel::Vault
542        | CollectionModel::Mixed => SchemaMode::Dynamic,
543        CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
544        CollectionModel::Queue => SchemaMode::Dynamic,
545    }
546}
547
548fn infer_indices(
549    collection_name: &str,
550    model: CollectionModel,
551    index_catalog: Option<&IndexCatalog>,
552    declarations: Option<&CatalogDeclarations>,
553) -> Vec<String> {
554    let declared = declared_indices(collection_name, declarations);
555    if !declared.is_empty() {
556        return declared;
557    }
558
559    let available = index_catalog
560        .map(IndexCatalog::snapshot)
561        .unwrap_or_default();
562    let mut selected = Vec::new();
563
564    for metric in available {
565        let relevant = matches!(
566            (model, metric.kind),
567            (_, IndexKind::BTree)
568                | (CollectionModel::Graph, IndexKind::GraphAdjacency)
569                | (CollectionModel::Vector, IndexKind::VectorHnsw)
570                | (CollectionModel::Vector, IndexKind::VectorInverted)
571                | (CollectionModel::Document, IndexKind::FullText)
572                | (CollectionModel::Document, IndexKind::DocumentPathValue)
573                | (CollectionModel::Kv, IndexKind::Hash)
574                | (CollectionModel::Config, IndexKind::Hash)
575                | (CollectionModel::Vault, IndexKind::Hash)
576                | (CollectionModel::Mixed, _)
577        );
578
579        if relevant && metric.enabled {
580            selected.push(metric.name);
581        }
582    }
583
584    selected
585}
586
587fn declared_indices(
588    collection_name: &str,
589    declarations: Option<&CatalogDeclarations>,
590) -> Vec<String> {
591    let mut selected = declarations
592        .map(|declarations| {
593            declarations
594                .declared_indexes
595                .iter()
596                .filter(|index| index.collection.as_deref() == Some(collection_name))
597                .map(|index| index.name.clone())
598                .collect::<Vec<_>>()
599        })
600        .unwrap_or_default();
601    selected.sort();
602    selected.dedup();
603    selected
604}
605
606fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
607    let mut selected = index_catalog
608        .map(IndexCatalog::snapshot)
609        .unwrap_or_default()
610        .into_iter()
611        .filter(|metric| metric.enabled)
612        .filter_map(|metric| {
613            if metric.name.starts_with(collection_name)
614                || matches!(
615                    metric.kind,
616                    IndexKind::BTree
617                        | IndexKind::GraphAdjacency
618                        | IndexKind::VectorHnsw
619                        | IndexKind::VectorInverted
620                        | IndexKind::FullText
621                        | IndexKind::DocumentPathValue
622                        | IndexKind::HybridSearch
623                )
624            {
625                Some(metric.name)
626            } else {
627                None
628            }
629        })
630        .collect::<Vec<_>>();
631    selected.sort();
632    selected.dedup();
633    selected
634}
635
/// Compares declared and operational index names as sets.
///
/// Returns `(in_sync, missing, undeclared)`: `missing` holds declared names
/// with no operational counterpart, `undeclared` holds operational names
/// nobody declared (both sorted and de-duplicated), and `in_sync` is true only
/// when both lists are empty.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared: BTreeSet<&String> = declared_indices.iter().collect();
    let operational: BTreeSet<&String> = operational_indices.iter().collect();

    let missing: Vec<String> = declared
        .difference(&operational)
        .map(|&name| name.clone())
        .collect();
    let undeclared: Vec<String> = operational
        .difference(&declared)
        .map(|&name| name.clone())
        .collect();

    let in_sync = missing.is_empty() && undeclared.is_empty();
    (in_sync, missing, undeclared)
}
656
657fn collection_index_readiness(
658    collection_name: &str,
659    statuses: &[CatalogIndexStatus],
660) -> (usize, usize, bool) {
661    let local = statuses
662        .iter()
663        .filter(|status| status.collection.as_deref() == Some(collection_name))
664        .collect::<Vec<_>>();
665    let queryable_count = local.iter().filter(|status| status.queryable).count();
666    let requires_rebuild_count = local
667        .iter()
668        .filter(|status| status.requires_rebuild)
669        .count();
670    let locally_in_sync = local.iter().all(|status| status.in_sync);
671    (queryable_count, requires_rebuild_count, locally_in_sync)
672}
673
674fn collection_graph_projection_readiness(
675    collection_name: &str,
676    statuses: &[CatalogGraphProjectionStatus],
677) -> (usize, usize, bool) {
678    let local = statuses
679        .iter()
680        .filter(|status| status.source.as_deref() == Some(collection_name))
681        .collect::<Vec<_>>();
682    let queryable_count = local.iter().filter(|status| status.queryable).count();
683    let requires_rematerialization_count = local
684        .iter()
685        .filter(|status| status.requires_rematerialization)
686        .count();
687    let locally_in_sync = local
688        .iter()
689        .all(|status| status.in_sync && status.dependent_jobs_in_sync);
690    (
691        queryable_count,
692        requires_rematerialization_count,
693        locally_in_sync,
694    )
695}
696
697fn collection_analytics_job_readiness(
698    collection_name: &str,
699    projection_statuses: &[CatalogGraphProjectionStatus],
700    job_statuses: &[CatalogAnalyticsJobStatus],
701) -> (usize, usize, bool) {
702    let local = job_statuses
703        .iter()
704        .filter(|status| {
705            let Some(projection_name) = status.projection.as_deref() else {
706                return false;
707            };
708            projection_statuses
709                .iter()
710                .find(|projection| projection.name == projection_name)
711                .and_then(|projection| projection.source.as_deref())
712                == Some(collection_name)
713        })
714        .collect::<Vec<_>>();
715    let executable_count = local.iter().filter(|status| status.executable).count();
716    let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
717    let locally_in_sync = local
718        .iter()
719        .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
720    (executable_count, requires_rerun_count, locally_in_sync)
721}
722
723fn graph_projection_statuses(
724    declarations: Option<&CatalogDeclarations>,
725) -> Vec<CatalogGraphProjectionStatus> {
726    let declared = declarations
727        .map(|declarations| declarations.declared_graph_projections.as_slice())
728        .unwrap_or(&[]);
729    let operational = declarations
730        .map(|declarations| declarations.operational_graph_projections.as_slice())
731        .unwrap_or(&[]);
732    let declared_jobs = declarations
733        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
734        .unwrap_or(&[]);
735    let operational_jobs = declarations
736        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
737        .unwrap_or(&[]);
738
739    let mut names = declared
740        .iter()
741        .map(|projection| projection.name.clone())
742        .chain(operational.iter().map(|projection| projection.name.clone()))
743        .collect::<Vec<_>>();
744    names.sort();
745    names.dedup();
746
747    names
748        .into_iter()
749        .map(|name| {
750            let declared_projection = declared.iter().find(|projection| projection.name == name);
751            let operational_projection = operational
752                .iter()
753                .find(|projection| projection.name == name);
754            let declared_present = declared_projection.is_some();
755            let operational_present = operational_projection.is_some();
756            let mut dependent_job_ids = BTreeSet::new();
757            let mut dependent_job_count = 0usize;
758            let mut active_dependent_job_count = 0usize;
759            let mut stale_dependent_job_count = 0usize;
760            for job in declared_jobs
761                .iter()
762                .chain(operational_jobs.iter())
763                .filter(|job| job.projection.as_deref() == Some(name.as_str()))
764            {
765                if !dependent_job_ids.insert(job.id.clone()) {
766                    continue;
767                }
768                dependent_job_count += 1;
769                if matches!(job.state.as_str(), "queued" | "running" | "completed") {
770                    active_dependent_job_count += 1;
771                }
772                if job.state == "stale" {
773                    stale_dependent_job_count += 1;
774                }
775            }
776            let lifecycle_state = match (
777                declared_present,
778                operational_present,
779                declared_projection
780                    .map(|projection| projection.state.as_str())
781                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
782                    .unwrap_or_default(),
783            ) {
784                (true, _, "failed") => "failed",
785                (true, true, "stale") => "stale",
786                (true, _, "materializing") => "materializing",
787                (true, true, "materialized") => "materialized",
788                (true, true, _) => "materialized",
789                (false, true, _) => "orphaned-operational",
790                (true, false, _) => "declared",
791                (false, false, _) => "unknown",
792            }
793            .to_string();
794            let queryable = declared_present
795                && operational_present
796                && matches!(
797                    declared_projection
798                        .map(|projection| projection.state.as_str())
799                        .or_else(
800                            || operational_projection.map(|projection| projection.state.as_str())
801                        )
802                        .unwrap_or_default(),
803                    "materialized"
804                );
805            let requires_rematerialization = matches!(
806                declared_projection
807                    .map(|projection| projection.state.as_str())
808                    .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
809                    .unwrap_or_default(),
810                "declared" | "materializing" | "failed" | "stale"
811            );
812            let mut attention_reasons = Vec::new();
813            if !declared_present || !operational_present {
814                attention_reasons.push("declaration_drift".to_string());
815            }
816            if requires_rematerialization {
817                attention_reasons.push("requires_rematerialization".to_string());
818            }
819            if stale_dependent_job_count > 0 {
820                attention_reasons.push("dependent_jobs_stale".to_string());
821            }
822            let attention_score = stale_dependent_job_count.saturating_mul(2)
823                + usize::from(requires_rematerialization).saturating_mul(4)
824                + usize::from(!declared_present || !operational_present)
825                + usize::from(!queryable);
826            CatalogGraphProjectionStatus {
827                name,
828                source: declared_projection
829                    .map(|projection| projection.source.clone())
830                    .or_else(|| operational_projection.map(|projection| projection.source.clone())),
831                lifecycle_state,
832                declared: declared_present,
833                operational: operational_present,
834                in_sync: declared_present == operational_present,
835                last_materialized_sequence: declared_projection
836                    .and_then(|projection| projection.last_materialized_sequence)
837                    .or_else(|| {
838                        operational_projection
839                            .and_then(|projection| projection.last_materialized_sequence)
840                    }),
841                queryable,
842                requires_rematerialization,
843                dependent_job_count,
844                active_dependent_job_count,
845                stale_dependent_job_count,
846                dependent_jobs_in_sync: stale_dependent_job_count == 0,
847                rerun_required: stale_dependent_job_count > 0,
848                attention_score,
849                attention_reasons,
850            }
851        })
852        .collect()
853}
854
855fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
856    let declared = declarations
857        .map(|declarations| declarations.declared_indexes.as_slice())
858        .unwrap_or(&[]);
859    let operational = declarations
860        .map(|declarations| declarations.operational_indexes.as_slice())
861        .unwrap_or(&[]);
862
863    let mut names = declared
864        .iter()
865        .map(|index| index.name.clone())
866        .chain(operational.iter().map(|index| index.name.clone()))
867        .collect::<Vec<_>>();
868    names.sort();
869    names.dedup();
870
871    names
872        .into_iter()
873        .map(|name| {
874            let declared_index = declared.iter().find(|index| index.name == name);
875            let operational_index = operational.iter().find(|index| index.name == name);
876            let collection = declared_index
877                .and_then(|index| index.collection.clone())
878                .or_else(|| operational_index.and_then(|index| index.collection.clone()));
879            let kind = declared_index
880                .map(|index| index_kind_string(index.kind))
881                .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
882                .unwrap_or_default();
883            let enabled = declared_index
884                .map(|index| index.enabled)
885                .or_else(|| operational_index.map(|index| index.enabled))
886                .unwrap_or(false);
887            let build_state = operational_index
888                .map(|index| index.build_state.clone())
889                .or_else(|| declared_index.map(|index| index.build_state.clone()));
890            let declared_present = declared_index.is_some();
891            let operational_present = operational_index.is_some();
892            let lifecycle_state = index_lifecycle_state(
893                declared_present,
894                operational_present,
895                enabled,
896                build_state.as_deref(),
897            );
898
899            let mut attention_reasons = Vec::new();
900            if declared_present != operational_present {
901                attention_reasons.push("declaration_drift".to_string());
902            }
903            if !enabled && declared_present {
904                attention_reasons.push("disabled".to_string());
905            }
906            if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
907                attention_reasons.push("failed".to_string());
908            }
909            if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
910                attention_reasons.push("stale".to_string());
911            }
912            if matches!(
913                build_state.as_deref().unwrap_or_default(),
914                "building" | "declared-unbuilt"
915            ) {
916                attention_reasons.push("requires_rebuild".to_string());
917            }
918            let queryable = declared_present
919                && operational_present
920                && enabled
921                && matches!(build_state.as_deref().unwrap_or_default(), "ready");
922            let requires_rebuild = matches!(
923                build_state.as_deref().unwrap_or_default(),
924                "declared-unbuilt" | "building" | "stale" | "failed"
925            );
926            let attention_score = usize::from(requires_rebuild).saturating_mul(3)
927                + usize::from(!enabled && declared_present)
928                + usize::from(declared_present != operational_present)
929                + usize::from(!queryable);
930
931            let artifact_state = crate::physical::ArtifactState::from_build_state(
932                build_state.as_deref().unwrap_or("declared-unbuilt"),
933                enabled,
934            );
935            CatalogIndexStatus {
936                name,
937                collection,
938                kind,
939                declared: declared_present,
940                operational: operational_present,
941                enabled,
942                build_state,
943                artifact_state,
944                lifecycle_state,
945                in_sync: declared_present == operational_present,
946                queryable,
947                requires_rebuild,
948                attention_score,
949                attention_reasons,
950            }
951        })
952        .collect()
953}
954
/// Derives the lifecycle label of an index from its catalog presence, its enabled
/// flag and its build state.
///
/// Precedence, highest first:
/// 1. operational but not declared: `"orphaned-operational"`
/// 2. declared but disabled: `"disabled"`
/// 3. present nowhere: `"unknown"`
/// 4. declared but not operational: `"declared"`
/// 5. otherwise the build state decides (see the inner match).
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational, enabled) {
        (false, true, _) => "orphaned-operational",
        (true, _, false) => "disabled",
        (false, false, _) => "unknown",
        (true, false, true) => "declared",
        (true, true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // "catalog-derived", "metadata-only", "artifact-published",
            // "registry-loaded", a missing state, and anything unrecognised.
            _ => "building",
        },
    };
    String::from(label)
}
985
986fn index_kind_string(kind: IndexKind) -> String {
987    match kind {
988        IndexKind::BTree => "btree",
989        IndexKind::Hash => "hash",
990        IndexKind::Bitmap => "bitmap",
991        IndexKind::Spatial => "spatial.rtree",
992        IndexKind::VectorHnsw => "vector.hnsw",
993        IndexKind::VectorInverted => "vector.inverted",
994        IndexKind::GraphAdjacency => "graph.adjacency",
995        IndexKind::FullText => "text.fulltext",
996        IndexKind::DocumentPathValue => "document.pathvalue",
997        IndexKind::HybridSearch => "search.hybrid",
998    }
999    .to_string()
1000}
1001
1002fn analytics_job_statuses(
1003    declarations: Option<&CatalogDeclarations>,
1004) -> Vec<CatalogAnalyticsJobStatus> {
1005    let declared = declarations
1006        .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1007        .unwrap_or(&[]);
1008    let operational = declarations
1009        .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1010        .unwrap_or(&[]);
1011    let declared_projections = declarations
1012        .map(|declarations| declarations.declared_graph_projections.as_slice())
1013        .unwrap_or(&[]);
1014    let operational_projections = declarations
1015        .map(|declarations| declarations.operational_graph_projections.as_slice())
1016        .unwrap_or(&[]);
1017
1018    let mut ids = declared
1019        .iter()
1020        .map(|job| job.id.clone())
1021        .chain(operational.iter().map(|job| job.id.clone()))
1022        .collect::<Vec<_>>();
1023    ids.sort();
1024    ids.dedup();
1025
1026    ids.into_iter()
1027        .map(|id| {
1028            let declared_job = declared.iter().find(|job| job.id == id);
1029            let operational_job = operational.iter().find(|job| job.id == id);
1030            let kind = declared_job
1031                .map(|job| job.kind.clone())
1032                .or_else(|| operational_job.map(|job| job.kind.clone()))
1033                .unwrap_or_default();
1034            let projection = declared_job
1035                .and_then(|job| job.projection.clone())
1036                .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1037            let state = declared_job
1038                .map(|job| job.state.clone())
1039                .or_else(|| operational_job.map(|job| job.state.clone()))
1040                .unwrap_or_default();
1041            let declared_present = declared_job.is_some();
1042            let operational_present = operational_job.is_some();
1043            let last_run_sequence = declared_job
1044                .and_then(|job| job.last_run_sequence)
1045                .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1046            let projection_declared = projection.as_ref().map(|projection_name| {
1047                declared_projections
1048                    .iter()
1049                    .any(|projection| projection.name == *projection_name)
1050            });
1051            let projection_operational = projection.as_ref().map(|projection_name| {
1052                operational_projections
1053                    .iter()
1054                    .any(|projection| projection.name == *projection_name)
1055            });
1056            let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1057                declared_projections
1058                    .iter()
1059                    .find(|projection| projection.name == *projection_name)
1060                    .map(|projection| projection.state.as_str())
1061                    .or_else(|| {
1062                        operational_projections
1063                            .iter()
1064                            .find(|projection| projection.name == *projection_name)
1065                            .map(|projection| projection.state.as_str())
1066                    })
1067            });
1068            let dependency_in_sync = projection.as_ref().map(|_| {
1069                matches!(projection_lifecycle, Some("materialized"))
1070                    && projection_operational == Some(true)
1071            });
1072            let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1073                (true, false, _) => "declared",
1074                (true, true, _)
1075                    if matches!(
1076                        projection_lifecycle,
1077                        Some("stale" | "failed" | "materializing" | "declared")
1078                    ) =>
1079                {
1080                    "stale"
1081                }
1082                (true, true, "completed") => "completed",
1083                (true, true, "running") => "running",
1084                (true, true, "failed") => "failed",
1085                (true, true, "queued") => "queued",
1086                (true, true, "stale") => "stale",
1087                (true, true, _) => "materialized",
1088                (false, true, _) => "orphaned-operational",
1089                (false, false, _) => "unknown",
1090            }
1091            .to_string();
1092            let executable = declared_present
1093                && operational_present
1094                && !matches!(state.as_str(), "failed" | "stale")
1095                && dependency_in_sync.unwrap_or(true);
1096            let requires_rerun =
1097                matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1098            let mut attention_reasons = Vec::new();
1099            if declared_present != operational_present {
1100                attention_reasons.push("declaration_drift".to_string());
1101            }
1102            if matches!(state.as_str(), "failed") {
1103                attention_reasons.push("failed".to_string());
1104            }
1105            if matches!(state.as_str(), "stale") {
1106                attention_reasons.push("stale".to_string());
1107            }
1108            if dependency_in_sync == Some(false) {
1109                attention_reasons.push("dependency_drift".to_string());
1110            }
1111            if requires_rerun {
1112                attention_reasons.push("requires_rerun".to_string());
1113            }
1114            let attention_score = usize::from(requires_rerun).saturating_mul(3)
1115                + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1116                + usize::from(declared_present != operational_present)
1117                + usize::from(!executable);
1118            CatalogAnalyticsJobStatus {
1119                id,
1120                kind,
1121                projection,
1122                state: state.clone(),
1123                lifecycle_state,
1124                declared: declared_present,
1125                operational: operational_present,
1126                in_sync: declared_present == operational_present,
1127                last_run_sequence,
1128                projection_declared,
1129                projection_operational,
1130                projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1131                dependency_in_sync,
1132                executable,
1133                requires_rerun,
1134                attention_score,
1135                attention_reasons,
1136            }
1137        })
1138        .collect()
1139}
1140
1141pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1142    let declared_indexes = snapshot
1143        .declared_indexes
1144        .iter()
1145        .map(|index| index.name.clone())
1146        .collect::<BTreeSet<_>>();
1147    let operational_indexes = snapshot
1148        .operational_indexes
1149        .iter()
1150        .map(|index| index.name.clone())
1151        .collect::<BTreeSet<_>>();
1152    let declared_graph_projections = snapshot
1153        .declared_graph_projections
1154        .iter()
1155        .map(|projection| projection.name.clone())
1156        .collect::<BTreeSet<_>>();
1157    let operational_graph_projections = snapshot
1158        .operational_graph_projections
1159        .iter()
1160        .map(|projection| projection.name.clone())
1161        .collect::<BTreeSet<_>>();
1162    let declared_analytics_jobs = snapshot
1163        .declared_analytics_jobs
1164        .iter()
1165        .map(|job| job.id.clone())
1166        .collect::<BTreeSet<_>>();
1167    let operational_analytics_jobs = snapshot
1168        .operational_analytics_jobs
1169        .iter()
1170        .map(|job| job.id.clone())
1171        .collect::<BTreeSet<_>>();
1172
1173    CatalogConsistencyReport {
1174        declared_index_count: declared_indexes.len(),
1175        operational_index_count: operational_indexes.len(),
1176        declared_graph_projection_count: declared_graph_projections.len(),
1177        operational_graph_projection_count: operational_graph_projections.len(),
1178        declared_analytics_job_count: declared_analytics_jobs.len(),
1179        operational_analytics_job_count: operational_analytics_jobs.len(),
1180        missing_operational_indexes: declared_indexes
1181            .difference(&operational_indexes)
1182            .cloned()
1183            .collect(),
1184        undeclared_operational_indexes: operational_indexes
1185            .difference(&declared_indexes)
1186            .cloned()
1187            .collect(),
1188        missing_operational_graph_projections: declared_graph_projections
1189            .difference(&operational_graph_projections)
1190            .cloned()
1191            .collect(),
1192        undeclared_operational_graph_projections: operational_graph_projections
1193            .difference(&declared_graph_projections)
1194            .cloned()
1195            .collect(),
1196        missing_operational_analytics_jobs: declared_analytics_jobs
1197            .difference(&operational_analytics_jobs)
1198            .cloned()
1199            .collect(),
1200        undeclared_operational_analytics_jobs: operational_analytics_jobs
1201            .difference(&declared_analytics_jobs)
1202            .cloned()
1203            .collect(),
1204    }
1205}
1206
1207pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1208    CatalogAttentionSummary {
1209        collections_requiring_attention: snapshot
1210            .collections
1211            .iter()
1212            .filter(|collection| collection.attention_required)
1213            .count(),
1214        indexes_requiring_attention: snapshot
1215            .index_statuses
1216            .iter()
1217            .filter(|status| status.attention_score > 0)
1218            .count(),
1219        graph_projections_requiring_attention: snapshot
1220            .graph_projection_statuses
1221            .iter()
1222            .filter(|status| status.attention_score > 0)
1223            .count(),
1224        analytics_jobs_requiring_attention: snapshot
1225            .analytics_job_statuses
1226            .iter()
1227            .filter(|status| status.attention_score > 0)
1228            .count(),
1229        top_collection: snapshot
1230            .collections
1231            .iter()
1232            .filter(|collection| collection.attention_score > 0)
1233            .max_by_key(|collection| collection.attention_score)
1234            .cloned(),
1235        top_index: snapshot
1236            .index_statuses
1237            .iter()
1238            .filter(|status| status.attention_score > 0)
1239            .max_by_key(|status| status.attention_score)
1240            .cloned(),
1241        top_graph_projection: snapshot
1242            .graph_projection_statuses
1243            .iter()
1244            .filter(|status| status.attention_score > 0)
1245            .max_by_key(|status| status.attention_score)
1246            .cloned(),
1247        top_analytics_job: snapshot
1248            .analytics_job_statuses
1249            .iter()
1250            .filter(|status| status.attention_score > 0)
1251            .max_by_key(|status| status.attention_score)
1252            .cloned(),
1253    }
1254}
1255
1256pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1257    let mut collections = snapshot
1258        .collections
1259        .iter()
1260        .filter(|collection| collection.attention_required)
1261        .cloned()
1262        .collect::<Vec<_>>();
1263    collections.sort_by(|left, right| {
1264        right
1265            .attention_score
1266            .cmp(&left.attention_score)
1267            .then_with(|| left.name.cmp(&right.name))
1268    });
1269    collections
1270}
1271
1272pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1273    let mut statuses = snapshot
1274        .index_statuses
1275        .iter()
1276        .filter(|status| status.attention_score > 0)
1277        .cloned()
1278        .collect::<Vec<_>>();
1279    statuses.sort_by(|left, right| {
1280        right
1281            .attention_score
1282            .cmp(&left.attention_score)
1283            .then_with(|| left.name.cmp(&right.name))
1284    });
1285    statuses
1286}
1287
1288pub fn graph_projection_attention(
1289    snapshot: &CatalogModelSnapshot,
1290) -> Vec<CatalogGraphProjectionStatus> {
1291    let mut statuses = snapshot
1292        .graph_projection_statuses
1293        .iter()
1294        .filter(|status| status.attention_score > 0)
1295        .cloned()
1296        .collect::<Vec<_>>();
1297    statuses.sort_by(|left, right| {
1298        right
1299            .attention_score
1300            .cmp(&left.attention_score)
1301            .then_with(|| left.name.cmp(&right.name))
1302    });
1303    statuses
1304}
1305
1306pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1307    let mut statuses = snapshot
1308        .analytics_job_statuses
1309        .iter()
1310        .filter(|status| status.attention_score > 0)
1311        .cloned()
1312        .collect::<Vec<_>>();
1313    statuses.sort_by(|left, right| {
1314        right
1315            .attention_score
1316            .cmp(&left.attention_score)
1317            .then_with(|| left.id.cmp(&right.id))
1318    });
1319    statuses
1320}