1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Data model attributed to a collection in the catalog.
///
/// A collection's effective model comes from its contract when one is
/// supplied; otherwise it is inferred from the entities it holds (see
/// `infer_model`, which only ever yields `Table`, `Graph`, `Vector` or
/// `Mixed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    /// Inferred when a collection holds table rows and nothing graph- or
    /// vector-shaped.
    Table,
    /// Not produced by inference in this file; can be declared by a contract.
    Document,
    /// Inferred when a collection holds graph nodes/edges only.
    Graph,
    /// Inferred when a collection holds vectors only.
    Vector,
    /// Not produced by inference in this file; can be declared by a contract.
    Kv,
    /// Not produced by inference in this file; can be declared by a contract.
    Config,
    /// Not produced by inference in this file; can be declared by a contract.
    Vault,
    /// Inference fallback: several entity families at once, or none of the
    /// table/graph/vector families at all (including an empty collection).
    Mixed,
    /// Not produced by inference in this file; can be declared by a contract.
    TimeSeries,
    /// Not produced by inference in this file; can be declared by a contract.
    /// Collections with this model additionally report a queue mode.
    Queue,
}
29
/// How rigid a collection's schema is considered to be.
///
/// Taken from the collection contract when present, otherwise derived from
/// the collection model by `infer_schema_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    /// Default for `Table` collections.
    Strict,
    /// Default for `Graph`, `Vector` and `TimeSeries` collections.
    SemiStructured,
    /// Default for every other collection model.
    Dynamic,
}
36
/// Kind of data change a subscription can filter on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    /// Rendered as `"INSERT"`.
    Insert,
    /// Rendered as `"UPDATE"`.
    Update,
    /// Rendered as `"DELETE"`.
    Delete,
}

impl SubscriptionOperation {
    /// Returns the canonical upper-case keyword for this operation.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// Returns `None` for anything other than `INSERT`, `UPDATE` or `DELETE`
    /// (in any ASCII casing). No surrounding whitespace is trimmed.
    ///
    /// The keyword is compared in place against [`Self::as_str`], so parsing
    /// does not allocate an upper-cased copy of `value` and the two methods
    /// cannot drift apart.
    pub fn from_str(value: &str) -> Option<Self> {
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|operation| value.eq_ignore_ascii_case(operation.as_str()))
    }
}
62
/// Description of a change subscription attached to a collection.
///
/// Descriptors are carried on a collection contract and copied verbatim into
/// the collection's `CollectionDescriptor` when a catalog snapshot is built.
// NOTE(review): nothing in this file interprets the individual fields; the
// per-field notes below are read off the names and types only — confirm
// against the code that evaluates subscriptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    /// Subscription name.
    pub name: String,
    /// Presumably the collection whose changes are observed — TODO confirm.
    pub source: String,
    /// Presumably the queue that receives matching change events — TODO confirm.
    pub target_queue: String,
    /// Operations this subscription is restricted to.
    pub ops_filter: Vec<SubscriptionOperation>,
    /// Optional filter expression, stored as text.
    pub where_filter: Option<String>,
    /// Field names to redact (presumably from emitted events — TODO confirm).
    pub redact_fields: Vec<String>,
    /// Whether the subscription is enabled.
    pub enabled: bool,
    /// Presumably widens the subscription across tenants — TODO confirm.
    pub all_tenants: bool,
}
79
/// Per-collection view assembled by `snapshot_store_with_declarations`.
///
/// Combines what a contract declares about a collection, what was observed
/// in the store, and how the collection's indexes, graph projections and
/// analytics jobs line up with their declarations.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name as reported by the store.
    pub name: String,
    /// Effective model: the contract's declared model, else the inferred one.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's, else the inferred one.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    /// Origin recorded on the contract, when a contract is present.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, when a contract is present.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the entities currently in the collection.
    pub observed_model: CollectionModel,
    /// Set only when `model` is `Queue`: the mode found in queue metadata,
    /// or the default mode when no metadata entry exists for this queue.
    pub queue_mode: Option<QueueMode>,
    /// Schema mode declared by the contract, when a contract is present.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode derived from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities in the collection.
    pub entities: usize,
    /// Sum of cross-reference counts over all entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segment count; 0 when the store returns
    /// no manager for the collection.
    pub segments: usize,
    /// Declared index names when any exist for this collection; otherwise
    /// the enabled catalog indexes judged relevant to `model`.
    pub indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, de-duplicated names of enabled catalog indexes attributed to
    /// this collection.
    pub operational_indices: Vec<String>,
    /// True when `declared_indices` and `operational_indices` hold the same
    /// set of names.
    pub indexes_in_sync: bool,
    /// Declared index names that are not operational.
    pub missing_operational_indices: Vec<String>,
    /// Operational index names that are not declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Number of this collection's index statuses flagged queryable.
    pub queryable_index_count: usize,
    /// Number of this collection's index statuses flagged as needing rebuild.
    pub indexes_requiring_rebuild_count: usize,
    /// Number of queryable graph projections sourced from this collection.
    pub queryable_graph_projection_count: usize,
    /// Number of graph projections sourced from this collection that need
    /// rematerialization.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Number of executable analytics jobs whose projection is sourced from
    /// this collection.
    pub executable_analytics_job_count: usize,
    /// Number of such analytics jobs that need a rerun.
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions copied from the contract; empty without a contract.
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// True when indexes, graph projections and analytics jobs are all in
    /// sync for this collection.
    pub resources_in_sync: bool,
    /// True when anything is out of sync or needs rebuild /
    /// rematerialization / rerun.
    pub attention_required: bool,
    /// Weighted urgency: 3 per index needing rebuild, 4 per projection
    /// needing rematerialization, 2 per job needing rerun, plus 1 when
    /// resources are out of sync.
    pub attention_score: usize,
    /// Machine-readable reason tags (e.g. `index_drift`,
    /// `indexes_require_rebuild`) explaining `attention_required`.
    pub attention_reasons: Vec<String>,
}
113
/// Full catalog snapshot returned by `snapshot_store` and
/// `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-level totals and per-collection entity/cross-ref/segment stats.
    pub summary: CatalogSnapshot,
    /// One descriptor per collection, sorted by collection name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog, or its default value when no catalog
    /// was supplied.
    pub indices: IndexCatalogSnapshot,
    /// Copy of the declared indexes from the supplied declarations.
    pub declared_indexes: Vec<PhysicalIndexState>,
    /// Copy of the declared graph projections from the supplied declarations.
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    /// Copy of the declared analytics jobs from the supplied declarations.
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Copy of the operational indexes from the supplied declarations.
    pub operational_indexes: Vec<PhysicalIndexState>,
    /// Copy of the operational graph projections from the supplied declarations.
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    /// Copy of the operational analytics jobs from the supplied declarations.
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Per-index status derived from declared vs. operational indexes.
    pub index_statuses: Vec<CatalogIndexStatus>,
    /// Per-projection status derived from declared vs. operational projections.
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    /// Per-job status derived from declared vs. operational analytics jobs.
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    /// Number of entries in `index_statuses` flagged queryable.
    pub queryable_index_count: usize,
    /// Number of entries in `index_statuses` flagged as needing rebuild.
    pub indexes_requiring_rebuild_count: usize,
    /// Number of entries in `graph_projection_statuses` flagged queryable.
    pub queryable_graph_projection_count: usize,
    /// Number of entries in `graph_projection_statuses` needing
    /// rematerialization.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Number of entries in `analytics_job_statuses` flagged executable.
    pub executable_analytics_job_count: usize,
    /// Number of entries in `analytics_job_statuses` needing a rerun.
    pub analytics_jobs_requiring_rerun_count: usize,
}
135
/// Roll-up of catalog resources that need attention.
// NOTE(review): the code that builds this summary is not next to this
// definition; the field notes below follow the names and types only —
// confirm against the producer.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    /// Count of collections needing attention.
    pub collections_requiring_attention: usize,
    /// Count of indexes needing attention.
    pub indexes_requiring_attention: usize,
    /// Count of graph projections needing attention.
    pub graph_projections_requiring_attention: usize,
    /// Count of analytics jobs needing attention.
    pub analytics_jobs_requiring_attention: usize,
    /// Presumably the collection ranked highest by attention score — TODO confirm.
    pub top_collection: Option<CollectionDescriptor>,
    /// Presumably the index ranked highest by attention score — TODO confirm.
    pub top_index: Option<CatalogIndexStatus>,
    /// Presumably the projection ranked highest by attention score — TODO confirm.
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    /// Presumably the job ranked highest by attention score — TODO confirm.
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
147
/// Declared and operational resource lists fed into snapshot building.
///
/// For each resource family the "declared" list says what should exist and
/// the "operational" list says what does; the status builders in this file
/// pair entries up by name (indexes, projections) or id (analytics jobs).
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    /// Indexes that are declared to exist.
    pub declared_indexes: Vec<PhysicalIndexState>,
    /// Graph projections that are declared to exist.
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    /// Analytics jobs that are declared to exist.
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Indexes that are operationally present.
    pub operational_indexes: Vec<PhysicalIndexState>,
    /// Graph projections that are operationally present.
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    /// Analytics jobs that are operationally present.
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
157
/// Status of one graph projection, built by `graph_projection_statuses` from
/// the declared and operational projection lists.
///
/// Wherever a value exists on both sides, the declared entry wins and the
/// operational entry is the fallback.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name (the key used to pair declared and operational entries).
    pub name: String,
    /// Source recorded on the projection.
    pub source: Option<String>,
    /// One of `failed`, `stale`, `materializing`, `materialized`,
    /// `orphaned-operational`, `declared` or `unknown`.
    pub lifecycle_state: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// True when `declared` and `operational` agree.
    pub in_sync: bool,
    /// Last materialized sequence, from the declared entry if it has one,
    /// else from the operational entry.
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational, and in state `materialized`.
    pub queryable: bool,
    /// State is one of `declared`, `materializing`, `failed` or `stale`.
    pub requires_rematerialization: bool,
    /// Distinct analytics jobs (by id, across declared and operational
    /// lists) that reference this projection.
    pub dependent_job_count: usize,
    /// Dependent jobs in state `queued`, `running` or `completed`.
    pub active_dependent_job_count: usize,
    /// Dependent jobs in state `stale`.
    pub stale_dependent_job_count: usize,
    /// True when no dependent job is stale.
    pub dependent_jobs_in_sync: bool,
    /// True when at least one dependent job is stale.
    pub rerun_required: bool,
    /// Weighted urgency: 2 per stale dependent job, 4 when rematerialization
    /// is required, 1 for declaration drift, 1 when not queryable.
    pub attention_score: usize,
    /// Reason tags: `declaration_drift`, `requires_rematerialization`,
    /// `dependent_jobs_stale`.
    pub attention_reasons: Vec<String>,
}
177
/// Status of one index, built by `index_statuses` from the declared and
/// operational index lists.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    /// Index name (the key used to pair declared and operational entries).
    pub name: String,
    /// Owning collection, from the declared entry if it has one, else from
    /// the operational entry.
    pub collection: Option<String>,
    /// Textual index kind as produced by `index_kind_string`; empty when
    /// neither entry exists.
    pub kind: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// Enabled flag, from the declared entry if present, else operational.
    pub enabled: bool,
    /// Build state, from the operational entry if present, else declared.
    pub build_state: Option<String>,
    /// Artifact state derived from `build_state` (treated as
    /// `declared-unbuilt` when absent) and `enabled`.
    pub artifact_state: crate::physical::ArtifactState,
    /// Lifecycle label computed by `index_lifecycle_state`.
    pub lifecycle_state: String,
    /// True when `declared` and `operational` agree.
    pub in_sync: bool,
    /// Declared, operational, enabled, and build state `ready`.
    pub queryable: bool,
    /// Build state is one of `declared-unbuilt`, `building`, `stale`, `failed`.
    pub requires_rebuild: bool,
    /// Weighted urgency: 3 when a rebuild is required, 1 when declared but
    /// disabled, 1 for declaration drift, 1 when not queryable.
    pub attention_score: usize,
    /// Reason tags: `declaration_drift`, `disabled`, `failed`, `stale`,
    /// `requires_rebuild`.
    pub attention_reasons: Vec<String>,
}
195
/// Status of one analytics job, built by `analytics_job_statuses` from the
/// declared and operational job lists.
// NOTE(review): only `id`, `kind`, `projection` and `state` are documented
// from the builder's visible code; the remaining notes follow the field names
// and the way the per-collection readiness helpers read them — confirm
// against the full builder.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    /// Job id (the key used to pair declared and operational entries).
    pub id: String,
    /// Job kind, from the declared entry if present, else operational.
    pub kind: String,
    /// Name of the graph projection the job runs against, if any.
    pub projection: Option<String>,
    /// Job state, from the declared entry if present, else operational.
    pub state: String,
    /// Lifecycle label for the job.
    pub lifecycle_state: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// Whether declared and operational views of the job agree.
    pub in_sync: bool,
    /// Sequence of the job's last run, if recorded.
    pub last_run_sequence: Option<u64>,
    /// Whether the referenced projection is declared, if known.
    pub projection_declared: Option<bool>,
    /// Whether the referenced projection is operational, if known.
    pub projection_operational: Option<bool>,
    /// Lifecycle label of the referenced projection, if known.
    pub projection_lifecycle_state: Option<String>,
    /// Whether the job's projection dependency is in sync; per-collection
    /// readiness treats `None` as in sync.
    pub dependency_in_sync: Option<bool>,
    /// Whether the job counts as executable.
    pub executable: bool,
    /// Whether the job needs to be rerun.
    pub requires_rerun: bool,
    /// Weighted urgency score for this job.
    pub attention_score: usize,
    /// Machine-readable reason tags explaining the score.
    pub attention_reasons: Vec<String>,
}
216
/// Catalog-wide comparison of declared versus operational resources.
// NOTE(review): the code that fills this report is not next to this
// definition; the field notes follow the names only — confirm against the
// producer.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    /// Number of declared indexes.
    pub declared_index_count: usize,
    /// Number of operational indexes.
    pub operational_index_count: usize,
    /// Number of declared graph projections.
    pub declared_graph_projection_count: usize,
    /// Number of operational graph projections.
    pub operational_graph_projection_count: usize,
    /// Number of declared analytics jobs.
    pub declared_analytics_job_count: usize,
    /// Number of operational analytics jobs.
    pub operational_analytics_job_count: usize,
    /// Presumably indexes declared but not operational — TODO confirm.
    pub missing_operational_indexes: Vec<String>,
    /// Presumably indexes operational but not declared — TODO confirm.
    pub undeclared_operational_indexes: Vec<String>,
    /// Presumably projections declared but not operational — TODO confirm.
    pub missing_operational_graph_projections: Vec<String>,
    /// Presumably projections operational but not declared — TODO confirm.
    pub undeclared_operational_graph_projections: Vec<String>,
    /// Presumably jobs declared but not operational — TODO confirm.
    pub missing_operational_analytics_jobs: Vec<String>,
    /// Presumably jobs operational but not declared — TODO confirm.
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
232
233pub fn snapshot_store(
234 name: &str,
235 store: &UnifiedStore,
236 index_catalog: Option<&IndexCatalog>,
237) -> CatalogModelSnapshot {
238 snapshot_store_with_declarations(name, store, index_catalog, None, None)
239}
240
/// Builds a full catalog snapshot of `store`.
///
/// * `name` — label stored on the snapshot summary.
/// * `store` — source of collections and entities.
/// * `index_catalog` — optional live index catalog used to work out which
///   indexes are operational.
/// * `declarations` — optional declared/operational resource lists used to
///   derive index, graph-projection and analytics-job statuses.
/// * `contracts` — optional collection contracts; a contract whose name
///   matches a collection overrides the inferred model and schema mode and
///   supplies the subscriptions.
///
/// Returns one `CollectionDescriptor` per collection (sorted by name), the
/// store-level summary, copies of the declaration lists, and catalog-wide
/// readiness counters.
pub fn snapshot_store_with_declarations(
    name: &str,
    store: &UnifiedStore,
    index_catalog: Option<&IndexCatalog>,
    declarations: Option<&CatalogDeclarations>,
    contracts: Option<&[CollectionContract]>,
) -> CatalogModelSnapshot {
    // Catalog-wide statuses are computed once and then filtered per collection.
    let index_statuses = index_statuses(declarations);
    let graph_projection_statuses = graph_projection_statuses(declarations);
    let analytics_job_statuses = analytics_job_statuses(declarations);

    // Bucket every entity in the store by the collection it was returned with.
    let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
    for (collection, entity) in store.query_all(|_| true) {
        grouped.entry(collection).or_default().push(entity);
    }
    // Must run before the loop below, which drains `grouped` entry by entry.
    let queue_modes = queue_modes_from_grouped(&grouped);

    let mut stats_by_collection = BTreeMap::new();
    let mut collections = Vec::new();

    for collection_name in store.list_collections() {
        let entities = grouped.remove(&collection_name).unwrap_or_default();
        let inferred_model = infer_model(&entities);
        let inferred_schema_mode = infer_schema_mode(inferred_model);
        let contract = collection_contract(&collection_name, contracts);
        // A contract's declared model takes precedence over inference.
        let model = contract
            .map(|contract| contract.declared_model)
            .unwrap_or(inferred_model);
        let cross_refs = entities
            .iter()
            .map(|entity| entity.cross_refs().len())
            .sum();
        let entity_count = entities.len();
        let manager_stats = store
            .get_collection(&collection_name)
            .map(|manager| manager.stats());
        // Segment total across all segment states; 0 when no manager exists.
        let segments = manager_stats
            .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
            .unwrap_or(0);

        stats_by_collection.insert(
            collection_name.clone(),
            CollectionStats {
                entities: entity_count,
                cross_refs,
                segments,
            },
        );

        // Name-level comparison: declared index names vs. enabled catalog indexes.
        let declared_indices = declared_indices(&collection_name, declarations);
        let operational_indices = operational_indices(&collection_name, index_catalog);
        let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
            collection_index_consistency(&declared_indices, &operational_indices);
        // Status-level readiness: derived from the declaration lists only.
        let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
            collection_index_readiness(&collection_name, &index_statuses);
        let (
            queryable_graph_projection_count,
            graph_projections_requiring_rematerialization_count,
            graph_projections_locally_in_sync,
        ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
        let (
            executable_analytics_job_count,
            analytics_jobs_requiring_rerun_count,
            analytics_jobs_locally_in_sync,
        ) = collection_analytics_job_readiness(
            &collection_name,
            &graph_projection_statuses,
            &analytics_job_statuses,
        );
        let resources_in_sync = indexes_in_sync
            && indexes_locally_in_sync
            && graph_projections_locally_in_sync
            && analytics_jobs_locally_in_sync;
        let attention_required = !resources_in_sync
            || indexes_requiring_rebuild_count > 0
            || graph_projections_requiring_rematerialization_count > 0
            || analytics_jobs_requiring_rerun_count > 0;
        // Reason tags are pushed in a fixed order: index, projection, job.
        let mut attention_reasons = Vec::new();
        if !indexes_in_sync || !indexes_locally_in_sync {
            attention_reasons.push("index_drift".to_string());
        }
        if indexes_requiring_rebuild_count > 0 {
            attention_reasons.push("indexes_require_rebuild".to_string());
        }
        if !graph_projections_locally_in_sync {
            attention_reasons.push("graph_projection_drift".to_string());
        }
        if graph_projections_requiring_rematerialization_count > 0 {
            attention_reasons.push("graph_projections_require_rematerialization".to_string());
        }
        if !analytics_jobs_locally_in_sync {
            attention_reasons.push("analytics_job_drift".to_string());
        }
        if analytics_jobs_requiring_rerun_count > 0 {
            attention_reasons.push("analytics_jobs_require_rerun".to_string());
        }
        // Weights: rebuild = 3, rematerialization = 4, rerun = 2, drift = 1.
        let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
            + graph_projections_requiring_rematerialization_count.saturating_mul(4)
            + analytics_jobs_requiring_rerun_count.saturating_mul(2)
            + usize::from(!resources_in_sync);

        collections.push(CollectionDescriptor {
            name: collection_name.clone(),
            model,
            schema_mode: contract
                .map(|contract| contract.schema_mode)
                .unwrap_or(inferred_schema_mode),
            contract_present: contract.is_some(),
            contract_origin: contract.map(|contract| contract.origin),
            declared_model: contract.map(|contract| contract.declared_model),
            observed_model: inferred_model,
            // Only queue collections report a mode; a queue with no metadata
            // entry gets the default mode.
            queue_mode: if model == CollectionModel::Queue {
                Some(
                    queue_modes
                        .get(&collection_name)
                        .copied()
                        .unwrap_or_default(),
                )
            } else {
                None
            },
            declared_schema_mode: contract.map(|contract| contract.schema_mode),
            observed_schema_mode: inferred_schema_mode,
            entities: entity_count,
            cross_refs,
            segments,
            indices: infer_indices(&collection_name, model, index_catalog, declarations),
            declared_indices,
            operational_indices,
            indexes_in_sync,
            missing_operational_indices,
            undeclared_operational_indices,
            queryable_index_count,
            indexes_requiring_rebuild_count,
            queryable_graph_projection_count,
            graph_projections_requiring_rematerialization_count,
            executable_analytics_job_count,
            analytics_jobs_requiring_rerun_count,
            subscriptions: contract
                .map(|contract| contract.subscriptions.clone())
                .unwrap_or_default(),
            resources_in_sync,
            attention_required,
            attention_score,
            attention_reasons,
        });
    }

    collections.sort_by(|left, right| left.name.cmp(&right.name));

    let summary = CatalogSnapshot {
        name: name.to_string(),
        total_entities: stats_by_collection
            .values()
            .map(|stats| stats.entities)
            .sum(),
        total_collections: stats_by_collection.len(),
        stats_by_collection,
        updated_at: SystemTime::now(),
    };

    CatalogModelSnapshot {
        summary,
        collections,
        indices: index_catalog
            .map(IndexCatalog::snapshot)
            .unwrap_or_default(),
        declared_indexes: declarations
            .map(|declarations| declarations.declared_indexes.clone())
            .unwrap_or_default(),
        declared_graph_projections: declarations
            .map(|declarations| declarations.declared_graph_projections.clone())
            .unwrap_or_default(),
        declared_analytics_jobs: declarations
            .map(|declarations| declarations.declared_analytics_jobs.clone())
            .unwrap_or_default(),
        operational_indexes: declarations
            .map(|declarations| declarations.operational_indexes.clone())
            .unwrap_or_default(),
        operational_graph_projections: declarations
            .map(|declarations| declarations.operational_graph_projections.clone())
            .unwrap_or_default(),
        operational_analytics_jobs: declarations
            .map(|declarations| declarations.operational_analytics_jobs.clone())
            .unwrap_or_default(),
        // The counters below borrow the status lists, so they are listed
        // before the fields that move those lists into the snapshot.
        queryable_index_count: index_statuses
            .iter()
            .filter(|status| status.queryable)
            .count(),
        indexes_requiring_rebuild_count: index_statuses
            .iter()
            .filter(|status| status.requires_rebuild)
            .count(),
        queryable_graph_projection_count: graph_projection_statuses
            .iter()
            .filter(|status| status.queryable)
            .count(),
        graph_projections_requiring_rematerialization_count: graph_projection_statuses
            .iter()
            .filter(|status| status.requires_rematerialization)
            .count(),
        executable_analytics_job_count: analytics_job_statuses
            .iter()
            .filter(|status| status.executable)
            .count(),
        analytics_jobs_requiring_rerun_count: analytics_job_statuses
            .iter()
            .filter(|status| status.requires_rerun)
            .count(),
        index_statuses,
        graph_projection_statuses,
        analytics_job_statuses,
    }
}
455
456fn queue_modes_from_grouped(
457 grouped: &HashMap<String, Vec<UnifiedEntity>>,
458) -> HashMap<String, QueueMode> {
459 let Some(meta) = grouped.get("red_queue_meta") else {
460 return HashMap::new();
461 };
462
463 let mut modes = HashMap::new();
464 for entity in meta {
465 let Some(row) = entity.data.as_row() else {
466 continue;
467 };
468 if row_text(row, "kind").as_deref() != Some("queue_config") {
469 continue;
470 }
471 let Some(queue) = row_text(row, "queue") else {
472 continue;
473 };
474 let mode = row_text(row, "mode")
475 .as_deref()
476 .and_then(QueueMode::parse)
477 .unwrap_or_default();
478 modes.insert(queue, mode);
479 }
480 modes
481}
482
483fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
484 match row.get_field(key) {
485 Some(Value::Text(value)) => Some(value.to_string()),
486 _ => None,
487 }
488}
489
490fn collection_contract<'a>(
491 collection_name: &str,
492 contracts: Option<&'a [CollectionContract]>,
493) -> Option<&'a CollectionContract> {
494 contracts.and_then(|contracts| {
495 contracts
496 .iter()
497 .find(|contract| contract.name == collection_name)
498 })
499}
500
501fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
502 let mut has_table = false;
503 let mut has_graph = false;
504 let mut has_vector = false;
505
506 for entity in entities {
507 match &entity.kind {
508 EntityKind::TableRow { .. } => has_table = true,
509 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
510 EntityKind::Vector { .. } => has_vector = true,
511 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
512 }
513 }
514
515 match (has_table, has_graph, has_vector) {
516 (true, false, false) => CollectionModel::Table,
517 (false, true, false) => CollectionModel::Graph,
518 (false, false, true) => CollectionModel::Vector,
519 _ => CollectionModel::Mixed,
520 }
521}
522
523fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
524 match model {
525 CollectionModel::Table => SchemaMode::Strict,
526 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
527 CollectionModel::Document
528 | CollectionModel::Kv
529 | CollectionModel::Config
530 | CollectionModel::Vault
531 | CollectionModel::Mixed => SchemaMode::Dynamic,
532 CollectionModel::TimeSeries => SchemaMode::SemiStructured,
533 CollectionModel::Queue => SchemaMode::Dynamic,
534 }
535}
536
537fn infer_indices(
538 collection_name: &str,
539 model: CollectionModel,
540 index_catalog: Option<&IndexCatalog>,
541 declarations: Option<&CatalogDeclarations>,
542) -> Vec<String> {
543 let declared = declared_indices(collection_name, declarations);
544 if !declared.is_empty() {
545 return declared;
546 }
547
548 let available = index_catalog
549 .map(IndexCatalog::snapshot)
550 .unwrap_or_default();
551 let mut selected = Vec::new();
552
553 for metric in available {
554 let relevant = matches!(
555 (model, metric.kind),
556 (_, IndexKind::BTree)
557 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
558 | (CollectionModel::Vector, IndexKind::VectorHnsw)
559 | (CollectionModel::Vector, IndexKind::VectorInverted)
560 | (CollectionModel::Document, IndexKind::FullText)
561 | (CollectionModel::Document, IndexKind::DocumentPathValue)
562 | (CollectionModel::Kv, IndexKind::Hash)
563 | (CollectionModel::Config, IndexKind::Hash)
564 | (CollectionModel::Vault, IndexKind::Hash)
565 | (CollectionModel::Mixed, _)
566 );
567
568 if relevant && metric.enabled {
569 selected.push(metric.name);
570 }
571 }
572
573 selected
574}
575
576fn declared_indices(
577 collection_name: &str,
578 declarations: Option<&CatalogDeclarations>,
579) -> Vec<String> {
580 let mut selected = declarations
581 .map(|declarations| {
582 declarations
583 .declared_indexes
584 .iter()
585 .filter(|index| index.collection.as_deref() == Some(collection_name))
586 .map(|index| index.name.clone())
587 .collect::<Vec<_>>()
588 })
589 .unwrap_or_default();
590 selected.sort();
591 selected.dedup();
592 selected
593}
594
595fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
596 let mut selected = index_catalog
597 .map(IndexCatalog::snapshot)
598 .unwrap_or_default()
599 .into_iter()
600 .filter(|metric| metric.enabled)
601 .filter_map(|metric| {
602 if metric.name.starts_with(collection_name)
603 || matches!(
604 metric.kind,
605 IndexKind::BTree
606 | IndexKind::GraphAdjacency
607 | IndexKind::VectorHnsw
608 | IndexKind::VectorInverted
609 | IndexKind::FullText
610 | IndexKind::DocumentPathValue
611 | IndexKind::HybridSearch
612 )
613 {
614 Some(metric.name)
615 } else {
616 None
617 }
618 })
619 .collect::<Vec<_>>();
620 selected.sort();
621 selected.dedup();
622 selected
623}
624
/// Compares declared index names against operational ones.
///
/// Returns `(in_sync, missing_operational, undeclared_operational)`:
///
/// * `missing_operational` — names that are declared but not operational;
/// * `undeclared_operational` — names that are operational but not declared;
/// * `in_sync` — true exactly when both lists are empty.
///
/// Both result lists are sorted ascending and free of duplicates, whatever
/// the order or multiplicity of the inputs.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    // The sets only borrow the names; owned strings are created solely for
    // the names that end up in one of the difference lists.
    let declared: BTreeSet<&str> = declared_indices.iter().map(String::as_str).collect();
    let operational: BTreeSet<&str> = operational_indices.iter().map(String::as_str).collect();

    let missing_operational_indices: Vec<String> = declared
        .difference(&operational)
        .map(|name| (*name).to_owned())
        .collect();
    let undeclared_operational_indices: Vec<String> = operational
        .difference(&declared)
        .map(|name| (*name).to_owned())
        .collect();

    (
        missing_operational_indices.is_empty() && undeclared_operational_indices.is_empty(),
        missing_operational_indices,
        undeclared_operational_indices,
    )
}
645
646fn collection_index_readiness(
647 collection_name: &str,
648 statuses: &[CatalogIndexStatus],
649) -> (usize, usize, bool) {
650 let local = statuses
651 .iter()
652 .filter(|status| status.collection.as_deref() == Some(collection_name))
653 .collect::<Vec<_>>();
654 let queryable_count = local.iter().filter(|status| status.queryable).count();
655 let requires_rebuild_count = local
656 .iter()
657 .filter(|status| status.requires_rebuild)
658 .count();
659 let locally_in_sync = local.iter().all(|status| status.in_sync);
660 (queryable_count, requires_rebuild_count, locally_in_sync)
661}
662
663fn collection_graph_projection_readiness(
664 collection_name: &str,
665 statuses: &[CatalogGraphProjectionStatus],
666) -> (usize, usize, bool) {
667 let local = statuses
668 .iter()
669 .filter(|status| status.source.as_deref() == Some(collection_name))
670 .collect::<Vec<_>>();
671 let queryable_count = local.iter().filter(|status| status.queryable).count();
672 let requires_rematerialization_count = local
673 .iter()
674 .filter(|status| status.requires_rematerialization)
675 .count();
676 let locally_in_sync = local
677 .iter()
678 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
679 (
680 queryable_count,
681 requires_rematerialization_count,
682 locally_in_sync,
683 )
684}
685
686fn collection_analytics_job_readiness(
687 collection_name: &str,
688 projection_statuses: &[CatalogGraphProjectionStatus],
689 job_statuses: &[CatalogAnalyticsJobStatus],
690) -> (usize, usize, bool) {
691 let local = job_statuses
692 .iter()
693 .filter(|status| {
694 let Some(projection_name) = status.projection.as_deref() else {
695 return false;
696 };
697 projection_statuses
698 .iter()
699 .find(|projection| projection.name == projection_name)
700 .and_then(|projection| projection.source.as_deref())
701 == Some(collection_name)
702 })
703 .collect::<Vec<_>>();
704 let executable_count = local.iter().filter(|status| status.executable).count();
705 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
706 let locally_in_sync = local
707 .iter()
708 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
709 (executable_count, requires_rerun_count, locally_in_sync)
710}
711
712fn graph_projection_statuses(
713 declarations: Option<&CatalogDeclarations>,
714) -> Vec<CatalogGraphProjectionStatus> {
715 let declared = declarations
716 .map(|declarations| declarations.declared_graph_projections.as_slice())
717 .unwrap_or(&[]);
718 let operational = declarations
719 .map(|declarations| declarations.operational_graph_projections.as_slice())
720 .unwrap_or(&[]);
721 let declared_jobs = declarations
722 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
723 .unwrap_or(&[]);
724 let operational_jobs = declarations
725 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
726 .unwrap_or(&[]);
727
728 let mut names = declared
729 .iter()
730 .map(|projection| projection.name.clone())
731 .chain(operational.iter().map(|projection| projection.name.clone()))
732 .collect::<Vec<_>>();
733 names.sort();
734 names.dedup();
735
736 names
737 .into_iter()
738 .map(|name| {
739 let declared_projection = declared.iter().find(|projection| projection.name == name);
740 let operational_projection = operational
741 .iter()
742 .find(|projection| projection.name == name);
743 let declared_present = declared_projection.is_some();
744 let operational_present = operational_projection.is_some();
745 let mut dependent_job_ids = BTreeSet::new();
746 let mut dependent_job_count = 0usize;
747 let mut active_dependent_job_count = 0usize;
748 let mut stale_dependent_job_count = 0usize;
749 for job in declared_jobs
750 .iter()
751 .chain(operational_jobs.iter())
752 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
753 {
754 if !dependent_job_ids.insert(job.id.clone()) {
755 continue;
756 }
757 dependent_job_count += 1;
758 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
759 active_dependent_job_count += 1;
760 }
761 if job.state == "stale" {
762 stale_dependent_job_count += 1;
763 }
764 }
765 let lifecycle_state = match (
766 declared_present,
767 operational_present,
768 declared_projection
769 .map(|projection| projection.state.as_str())
770 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
771 .unwrap_or_default(),
772 ) {
773 (true, _, "failed") => "failed",
774 (true, true, "stale") => "stale",
775 (true, _, "materializing") => "materializing",
776 (true, true, "materialized") => "materialized",
777 (true, true, _) => "materialized",
778 (false, true, _) => "orphaned-operational",
779 (true, false, _) => "declared",
780 (false, false, _) => "unknown",
781 }
782 .to_string();
783 let queryable = declared_present
784 && operational_present
785 && matches!(
786 declared_projection
787 .map(|projection| projection.state.as_str())
788 .or_else(
789 || operational_projection.map(|projection| projection.state.as_str())
790 )
791 .unwrap_or_default(),
792 "materialized"
793 );
794 let requires_rematerialization = matches!(
795 declared_projection
796 .map(|projection| projection.state.as_str())
797 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
798 .unwrap_or_default(),
799 "declared" | "materializing" | "failed" | "stale"
800 );
801 let mut attention_reasons = Vec::new();
802 if !declared_present || !operational_present {
803 attention_reasons.push("declaration_drift".to_string());
804 }
805 if requires_rematerialization {
806 attention_reasons.push("requires_rematerialization".to_string());
807 }
808 if stale_dependent_job_count > 0 {
809 attention_reasons.push("dependent_jobs_stale".to_string());
810 }
811 let attention_score = stale_dependent_job_count.saturating_mul(2)
812 + usize::from(requires_rematerialization).saturating_mul(4)
813 + usize::from(!declared_present || !operational_present)
814 + usize::from(!queryable);
815 CatalogGraphProjectionStatus {
816 name,
817 source: declared_projection
818 .map(|projection| projection.source.clone())
819 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
820 lifecycle_state,
821 declared: declared_present,
822 operational: operational_present,
823 in_sync: declared_present == operational_present,
824 last_materialized_sequence: declared_projection
825 .and_then(|projection| projection.last_materialized_sequence)
826 .or_else(|| {
827 operational_projection
828 .and_then(|projection| projection.last_materialized_sequence)
829 }),
830 queryable,
831 requires_rematerialization,
832 dependent_job_count,
833 active_dependent_job_count,
834 stale_dependent_job_count,
835 dependent_jobs_in_sync: stale_dependent_job_count == 0,
836 rerun_required: stale_dependent_job_count > 0,
837 attention_score,
838 attention_reasons,
839 }
840 })
841 .collect()
842}
843
844fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
845 let declared = declarations
846 .map(|declarations| declarations.declared_indexes.as_slice())
847 .unwrap_or(&[]);
848 let operational = declarations
849 .map(|declarations| declarations.operational_indexes.as_slice())
850 .unwrap_or(&[]);
851
852 let mut names = declared
853 .iter()
854 .map(|index| index.name.clone())
855 .chain(operational.iter().map(|index| index.name.clone()))
856 .collect::<Vec<_>>();
857 names.sort();
858 names.dedup();
859
860 names
861 .into_iter()
862 .map(|name| {
863 let declared_index = declared.iter().find(|index| index.name == name);
864 let operational_index = operational.iter().find(|index| index.name == name);
865 let collection = declared_index
866 .and_then(|index| index.collection.clone())
867 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
868 let kind = declared_index
869 .map(|index| index_kind_string(index.kind))
870 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
871 .unwrap_or_default();
872 let enabled = declared_index
873 .map(|index| index.enabled)
874 .or_else(|| operational_index.map(|index| index.enabled))
875 .unwrap_or(false);
876 let build_state = operational_index
877 .map(|index| index.build_state.clone())
878 .or_else(|| declared_index.map(|index| index.build_state.clone()));
879 let declared_present = declared_index.is_some();
880 let operational_present = operational_index.is_some();
881 let lifecycle_state = index_lifecycle_state(
882 declared_present,
883 operational_present,
884 enabled,
885 build_state.as_deref(),
886 );
887
888 let mut attention_reasons = Vec::new();
889 if declared_present != operational_present {
890 attention_reasons.push("declaration_drift".to_string());
891 }
892 if !enabled && declared_present {
893 attention_reasons.push("disabled".to_string());
894 }
895 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
896 attention_reasons.push("failed".to_string());
897 }
898 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
899 attention_reasons.push("stale".to_string());
900 }
901 if matches!(
902 build_state.as_deref().unwrap_or_default(),
903 "building" | "declared-unbuilt"
904 ) {
905 attention_reasons.push("requires_rebuild".to_string());
906 }
907 let queryable = declared_present
908 && operational_present
909 && enabled
910 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
911 let requires_rebuild = matches!(
912 build_state.as_deref().unwrap_or_default(),
913 "declared-unbuilt" | "building" | "stale" | "failed"
914 );
915 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
916 + usize::from(!enabled && declared_present)
917 + usize::from(declared_present != operational_present)
918 + usize::from(!queryable);
919
920 let artifact_state = crate::physical::ArtifactState::from_build_state(
921 build_state.as_deref().unwrap_or("declared-unbuilt"),
922 enabled,
923 );
924 CatalogIndexStatus {
925 name,
926 collection,
927 kind,
928 declared: declared_present,
929 operational: operational_present,
930 enabled,
931 build_state,
932 artifact_state,
933 lifecycle_state,
934 in_sync: declared_present == operational_present,
935 queryable,
936 requires_rebuild,
937 attention_score,
938 attention_reasons,
939 }
940 })
941 .collect()
942}
943
/// Derives the lifecycle label of an index.
///
/// Presence and the enabled flag are decided first, in this precedence:
///
/// 1. operational but not declared → `orphaned-operational`
/// 2. neither declared nor operational → `unknown`
/// 3. declared but disabled → `disabled`
/// 4. declared but not operational → `declared`
///
/// Only an enabled index present on both sides is labelled from its build
/// state: `ready`, `failed` and `stale` map to themselves,
/// `declared-unbuilt` maps to `declared`, and every other value (including
/// an absent build state and intermediate states such as `catalog-derived`,
/// `metadata-only`, `artifact-published` or `registry-loaded`) maps to
/// `building`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (false, false) => "unknown",
        (true, _) if !enabled => "disabled",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            _ => "building",
        },
    };
    label.to_string()
}
974
975fn index_kind_string(kind: IndexKind) -> String {
976 match kind {
977 IndexKind::BTree => "btree",
978 IndexKind::Hash => "hash",
979 IndexKind::Bitmap => "bitmap",
980 IndexKind::Spatial => "spatial.rtree",
981 IndexKind::VectorHnsw => "vector.hnsw",
982 IndexKind::VectorInverted => "vector.inverted",
983 IndexKind::GraphAdjacency => "graph.adjacency",
984 IndexKind::FullText => "text.fulltext",
985 IndexKind::DocumentPathValue => "document.pathvalue",
986 IndexKind::HybridSearch => "search.hybrid",
987 }
988 .to_string()
989}
990
991fn analytics_job_statuses(
992 declarations: Option<&CatalogDeclarations>,
993) -> Vec<CatalogAnalyticsJobStatus> {
994 let declared = declarations
995 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
996 .unwrap_or(&[]);
997 let operational = declarations
998 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
999 .unwrap_or(&[]);
1000 let declared_projections = declarations
1001 .map(|declarations| declarations.declared_graph_projections.as_slice())
1002 .unwrap_or(&[]);
1003 let operational_projections = declarations
1004 .map(|declarations| declarations.operational_graph_projections.as_slice())
1005 .unwrap_or(&[]);
1006
1007 let mut ids = declared
1008 .iter()
1009 .map(|job| job.id.clone())
1010 .chain(operational.iter().map(|job| job.id.clone()))
1011 .collect::<Vec<_>>();
1012 ids.sort();
1013 ids.dedup();
1014
1015 ids.into_iter()
1016 .map(|id| {
1017 let declared_job = declared.iter().find(|job| job.id == id);
1018 let operational_job = operational.iter().find(|job| job.id == id);
1019 let kind = declared_job
1020 .map(|job| job.kind.clone())
1021 .or_else(|| operational_job.map(|job| job.kind.clone()))
1022 .unwrap_or_default();
1023 let projection = declared_job
1024 .and_then(|job| job.projection.clone())
1025 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1026 let state = declared_job
1027 .map(|job| job.state.clone())
1028 .or_else(|| operational_job.map(|job| job.state.clone()))
1029 .unwrap_or_default();
1030 let declared_present = declared_job.is_some();
1031 let operational_present = operational_job.is_some();
1032 let last_run_sequence = declared_job
1033 .and_then(|job| job.last_run_sequence)
1034 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1035 let projection_declared = projection.as_ref().map(|projection_name| {
1036 declared_projections
1037 .iter()
1038 .any(|projection| projection.name == *projection_name)
1039 });
1040 let projection_operational = projection.as_ref().map(|projection_name| {
1041 operational_projections
1042 .iter()
1043 .any(|projection| projection.name == *projection_name)
1044 });
1045 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1046 declared_projections
1047 .iter()
1048 .find(|projection| projection.name == *projection_name)
1049 .map(|projection| projection.state.as_str())
1050 .or_else(|| {
1051 operational_projections
1052 .iter()
1053 .find(|projection| projection.name == *projection_name)
1054 .map(|projection| projection.state.as_str())
1055 })
1056 });
1057 let dependency_in_sync = projection.as_ref().map(|_| {
1058 matches!(projection_lifecycle, Some("materialized"))
1059 && projection_operational == Some(true)
1060 });
1061 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1062 (true, false, _) => "declared",
1063 (true, true, _)
1064 if matches!(
1065 projection_lifecycle,
1066 Some("stale" | "failed" | "materializing" | "declared")
1067 ) =>
1068 {
1069 "stale"
1070 }
1071 (true, true, "completed") => "completed",
1072 (true, true, "running") => "running",
1073 (true, true, "failed") => "failed",
1074 (true, true, "queued") => "queued",
1075 (true, true, "stale") => "stale",
1076 (true, true, _) => "materialized",
1077 (false, true, _) => "orphaned-operational",
1078 (false, false, _) => "unknown",
1079 }
1080 .to_string();
1081 let executable = declared_present
1082 && operational_present
1083 && !matches!(state.as_str(), "failed" | "stale")
1084 && dependency_in_sync.unwrap_or(true);
1085 let requires_rerun =
1086 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1087 let mut attention_reasons = Vec::new();
1088 if declared_present != operational_present {
1089 attention_reasons.push("declaration_drift".to_string());
1090 }
1091 if matches!(state.as_str(), "failed") {
1092 attention_reasons.push("failed".to_string());
1093 }
1094 if matches!(state.as_str(), "stale") {
1095 attention_reasons.push("stale".to_string());
1096 }
1097 if dependency_in_sync == Some(false) {
1098 attention_reasons.push("dependency_drift".to_string());
1099 }
1100 if requires_rerun {
1101 attention_reasons.push("requires_rerun".to_string());
1102 }
1103 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1104 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1105 + usize::from(declared_present != operational_present)
1106 + usize::from(!executable);
1107 CatalogAnalyticsJobStatus {
1108 id,
1109 kind,
1110 projection,
1111 state: state.clone(),
1112 lifecycle_state,
1113 declared: declared_present,
1114 operational: operational_present,
1115 in_sync: declared_present == operational_present,
1116 last_run_sequence,
1117 projection_declared,
1118 projection_operational,
1119 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1120 dependency_in_sync,
1121 executable,
1122 requires_rerun,
1123 attention_score,
1124 attention_reasons,
1125 }
1126 })
1127 .collect()
1128}
1129
1130pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1131 let declared_indexes = snapshot
1132 .declared_indexes
1133 .iter()
1134 .map(|index| index.name.clone())
1135 .collect::<BTreeSet<_>>();
1136 let operational_indexes = snapshot
1137 .operational_indexes
1138 .iter()
1139 .map(|index| index.name.clone())
1140 .collect::<BTreeSet<_>>();
1141 let declared_graph_projections = snapshot
1142 .declared_graph_projections
1143 .iter()
1144 .map(|projection| projection.name.clone())
1145 .collect::<BTreeSet<_>>();
1146 let operational_graph_projections = snapshot
1147 .operational_graph_projections
1148 .iter()
1149 .map(|projection| projection.name.clone())
1150 .collect::<BTreeSet<_>>();
1151 let declared_analytics_jobs = snapshot
1152 .declared_analytics_jobs
1153 .iter()
1154 .map(|job| job.id.clone())
1155 .collect::<BTreeSet<_>>();
1156 let operational_analytics_jobs = snapshot
1157 .operational_analytics_jobs
1158 .iter()
1159 .map(|job| job.id.clone())
1160 .collect::<BTreeSet<_>>();
1161
1162 CatalogConsistencyReport {
1163 declared_index_count: declared_indexes.len(),
1164 operational_index_count: operational_indexes.len(),
1165 declared_graph_projection_count: declared_graph_projections.len(),
1166 operational_graph_projection_count: operational_graph_projections.len(),
1167 declared_analytics_job_count: declared_analytics_jobs.len(),
1168 operational_analytics_job_count: operational_analytics_jobs.len(),
1169 missing_operational_indexes: declared_indexes
1170 .difference(&operational_indexes)
1171 .cloned()
1172 .collect(),
1173 undeclared_operational_indexes: operational_indexes
1174 .difference(&declared_indexes)
1175 .cloned()
1176 .collect(),
1177 missing_operational_graph_projections: declared_graph_projections
1178 .difference(&operational_graph_projections)
1179 .cloned()
1180 .collect(),
1181 undeclared_operational_graph_projections: operational_graph_projections
1182 .difference(&declared_graph_projections)
1183 .cloned()
1184 .collect(),
1185 missing_operational_analytics_jobs: declared_analytics_jobs
1186 .difference(&operational_analytics_jobs)
1187 .cloned()
1188 .collect(),
1189 undeclared_operational_analytics_jobs: operational_analytics_jobs
1190 .difference(&declared_analytics_jobs)
1191 .cloned()
1192 .collect(),
1193 }
1194}
1195
1196pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1197 CatalogAttentionSummary {
1198 collections_requiring_attention: snapshot
1199 .collections
1200 .iter()
1201 .filter(|collection| collection.attention_required)
1202 .count(),
1203 indexes_requiring_attention: snapshot
1204 .index_statuses
1205 .iter()
1206 .filter(|status| status.attention_score > 0)
1207 .count(),
1208 graph_projections_requiring_attention: snapshot
1209 .graph_projection_statuses
1210 .iter()
1211 .filter(|status| status.attention_score > 0)
1212 .count(),
1213 analytics_jobs_requiring_attention: snapshot
1214 .analytics_job_statuses
1215 .iter()
1216 .filter(|status| status.attention_score > 0)
1217 .count(),
1218 top_collection: snapshot
1219 .collections
1220 .iter()
1221 .filter(|collection| collection.attention_score > 0)
1222 .max_by_key(|collection| collection.attention_score)
1223 .cloned(),
1224 top_index: snapshot
1225 .index_statuses
1226 .iter()
1227 .filter(|status| status.attention_score > 0)
1228 .max_by_key(|status| status.attention_score)
1229 .cloned(),
1230 top_graph_projection: snapshot
1231 .graph_projection_statuses
1232 .iter()
1233 .filter(|status| status.attention_score > 0)
1234 .max_by_key(|status| status.attention_score)
1235 .cloned(),
1236 top_analytics_job: snapshot
1237 .analytics_job_statuses
1238 .iter()
1239 .filter(|status| status.attention_score > 0)
1240 .max_by_key(|status| status.attention_score)
1241 .cloned(),
1242 }
1243}
1244
1245pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1246 let mut collections = snapshot
1247 .collections
1248 .iter()
1249 .filter(|collection| collection.attention_required)
1250 .cloned()
1251 .collect::<Vec<_>>();
1252 collections.sort_by(|left, right| {
1253 right
1254 .attention_score
1255 .cmp(&left.attention_score)
1256 .then_with(|| left.name.cmp(&right.name))
1257 });
1258 collections
1259}
1260
1261pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1262 let mut statuses = snapshot
1263 .index_statuses
1264 .iter()
1265 .filter(|status| status.attention_score > 0)
1266 .cloned()
1267 .collect::<Vec<_>>();
1268 statuses.sort_by(|left, right| {
1269 right
1270 .attention_score
1271 .cmp(&left.attention_score)
1272 .then_with(|| left.name.cmp(&right.name))
1273 });
1274 statuses
1275}
1276
1277pub fn graph_projection_attention(
1278 snapshot: &CatalogModelSnapshot,
1279) -> Vec<CatalogGraphProjectionStatus> {
1280 let mut statuses = snapshot
1281 .graph_projection_statuses
1282 .iter()
1283 .filter(|status| status.attention_score > 0)
1284 .cloned()
1285 .collect::<Vec<_>>();
1286 statuses.sort_by(|left, right| {
1287 right
1288 .attention_score
1289 .cmp(&left.attention_score)
1290 .then_with(|| left.name.cmp(&right.name))
1291 });
1292 statuses
1293}
1294
1295pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1296 let mut statuses = snapshot
1297 .analytics_job_statuses
1298 .iter()
1299 .filter(|status| status.attention_score > 0)
1300 .cloned()
1301 .collect::<Vec<_>>();
1302 statuses.sort_by(|left, right| {
1303 right
1304 .attention_score
1305 .cmp(&left.attention_score)
1306 .then_with(|| left.id.cmp(&right.id))
1307 });
1308 statuses
1309}