1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Data model of a collection as reported in catalog snapshots.
///
/// `infer_model` only ever produces `Table`, `Graph`, `Vector`, or `Mixed`;
/// within this file every other variant reaches a snapshot through a
/// `CollectionContract`'s `declared_model`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    /// Inferred when entities of several kinds are present, or when none of the
    /// entities are table rows, graph nodes/edges, or vectors.
    Mixed,
    TimeSeries,
    /// `CollectionDescriptor::queue_mode` is only populated for this model.
    Queue,
}
32
/// How strictly a collection's records follow a schema.
///
/// `infer_schema_mode` maps `Table` to `Strict`; `Graph`, `Vector`, and
/// `TimeSeries` to `SemiStructured`; and every other model to `Dynamic`.
/// A collection contract's schema mode overrides the inferred one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
39
/// Kind of data change a subscription can be filtered on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Canonical upper-case keyword for this operation.
    pub fn as_str(self) -> &'static str {
        let keyword = match self {
            Self::Delete => "DELETE",
            Self::Update => "UPDATE",
            Self::Insert => "INSERT",
        };
        keyword
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// This is the inverse of [`Self::as_str`]: it returns `None` for anything
    /// other than `insert`, `update`, or `delete` in any ASCII casing.
    pub fn from_str(value: &str) -> Option<Self> {
        // Comparing case-insensitively avoids allocating an upper-cased copy.
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|operation| operation.as_str().eq_ignore_ascii_case(value))
    }
}
65
/// A change subscription attached to a collection contract.
///
/// Catalog snapshots copy these verbatim from `CollectionContract::subscriptions`.
/// NOTE(review): the per-field descriptions below are inferred from field names
/// and are not established by this file — confirm against the subscription runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionDescriptor {
    /// Subscription name.
    pub name: String,
    /// Presumably the collection whose changes are observed.
    pub source: String,
    /// Presumably the queue that receives change events.
    pub target_queue: String,
    /// Operations this subscription is filtered on.
    pub ops_filter: Vec<SubscriptionOperation>,
    /// Optional filter expression kept as raw text.
    pub where_filter: Option<String>,
    /// Presumably field names stripped from delivered payloads.
    pub redact_fields: Vec<String>,
    /// Whether the subscription is enabled.
    pub enabled: bool,
    /// Presumably delivers changes across all tenants rather than one.
    pub all_tenants: bool,
}
82
/// Per-collection view produced by `snapshot_store_with_declarations`.
///
/// Combines what was observed in the store (entity kinds, counts, segments),
/// what an optional `CollectionContract` declares, and how the collection's
/// indexes, graph projections, and analytics jobs line up between declared and
/// operational metadata.
#[derive(Debug, Clone)]
pub struct CollectionDescriptor {
    /// Collection name.
    pub name: String,
    /// Effective model: the contract's declared model if present, else `observed_model`.
    pub model: CollectionModel,
    /// Effective schema mode: the contract's schema mode if present, else `observed_schema_mode`.
    pub schema_mode: SchemaMode,
    /// Whether a contract with this collection's name was supplied.
    pub contract_present: bool,
    /// Origin recorded on the contract, if any.
    pub contract_origin: Option<crate::physical::ContractOrigin>,
    /// Model declared by the contract, if any.
    pub declared_model: Option<CollectionModel>,
    /// Model inferred from the stored entities.
    pub observed_model: CollectionModel,
    /// Queue mode; `Some` only when `model` is `Queue`. Falls back to
    /// `QueueMode::default()` when no `queue_config` row names this queue.
    pub queue_mode: Option<QueueMode>,
    /// Vector dimension declared by the contract, if any.
    pub vector_dimension: Option<usize>,
    /// Vector distance metric declared by the contract, if any.
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    /// Schema mode declared by the contract, if any.
    pub declared_schema_mode: Option<SchemaMode>,
    /// Schema mode derived from `observed_model`.
    pub observed_schema_mode: SchemaMode,
    /// Number of entities stored in the collection.
    pub entities: usize,
    /// Sum of cross references over all entities.
    pub cross_refs: usize,
    /// Growing + sealed + archived segments (0 when no collection manager exists).
    pub segments: usize,
    /// Declared index names, or names picked from the index catalog by model
    /// when nothing is declared.
    pub indices: Vec<String>,
    /// Sorted, de-duplicated names of indexes declared for this collection.
    pub declared_indices: Vec<String>,
    /// Sorted, de-duplicated names of enabled catalog indexes attributed to this collection.
    pub operational_indices: Vec<String>,
    /// `true` when `declared_indices` and `operational_indices` hold the same names.
    pub indexes_in_sync: bool,
    /// Declared index names that are not operational.
    pub missing_operational_indices: Vec<String>,
    /// Operational index names that are not declared.
    pub undeclared_operational_indices: Vec<String>,
    /// Index statuses for this collection that are queryable.
    pub queryable_index_count: usize,
    /// Index statuses for this collection that require a rebuild.
    pub indexes_requiring_rebuild_count: usize,
    /// Graph projections sourced from this collection that are queryable.
    pub queryable_graph_projection_count: usize,
    /// Graph projections sourced from this collection that require rematerialization.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Analytics jobs on projections sourced from this collection that are executable.
    pub executable_analytics_job_count: usize,
    /// Analytics jobs on projections sourced from this collection that require a rerun.
    pub analytics_jobs_requiring_rerun_count: usize,
    /// Subscriptions from the contract (empty without a contract).
    pub subscriptions: Vec<SubscriptionDescriptor>,
    /// Index lists agree and every local index, projection, and job status is in sync.
    pub resources_in_sync: bool,
    /// Set on drift or when any rebuild, rematerialization, or rerun is pending.
    pub attention_required: bool,
    /// Weighted score: 3 per index rebuild, 4 per projection rematerialization,
    /// 2 per job rerun, plus 1 when resources are out of sync.
    pub attention_score: usize,
    /// Machine-readable reason codes such as `"index_drift"`.
    pub attention_reasons: Vec<String>,
}
118
/// Complete catalog view returned by `snapshot_store` and
/// `snapshot_store_with_declarations`.
#[derive(Debug, Clone)]
pub struct CatalogModelSnapshot {
    /// Store-wide totals plus per-collection stats.
    pub summary: CatalogSnapshot,
    /// One descriptor per collection, sorted by name.
    pub collections: Vec<CollectionDescriptor>,
    /// Snapshot of the index catalog (default/empty when none was supplied).
    pub indices: IndexCatalogSnapshot,
    /// Copy of `CatalogDeclarations::declared_indexes` (empty without declarations).
    pub declared_indexes: Vec<PhysicalIndexState>,
    /// Copy of `CatalogDeclarations::declared_graph_projections`.
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    /// Copy of `CatalogDeclarations::declared_analytics_jobs`.
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Copy of `CatalogDeclarations::operational_indexes`.
    pub operational_indexes: Vec<PhysicalIndexState>,
    /// Copy of `CatalogDeclarations::operational_graph_projections`.
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    /// Copy of `CatalogDeclarations::operational_analytics_jobs`.
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    /// Reconciled index statuses, sorted by name.
    pub index_statuses: Vec<CatalogIndexStatus>,
    /// Reconciled graph projection statuses, sorted by name.
    pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
    /// Reconciled analytics job statuses, sorted by id.
    pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
    /// Store-wide count of queryable index statuses.
    pub queryable_index_count: usize,
    /// Store-wide count of index statuses requiring a rebuild.
    pub indexes_requiring_rebuild_count: usize,
    /// Store-wide count of queryable graph projections.
    pub queryable_graph_projection_count: usize,
    /// Store-wide count of graph projections requiring rematerialization.
    pub graph_projections_requiring_rematerialization_count: usize,
    /// Store-wide count of executable analytics jobs.
    pub executable_analytics_job_count: usize,
    /// Store-wide count of analytics jobs requiring a rerun.
    pub analytics_jobs_requiring_rerun_count: usize,
}
140
/// Roll-up of catalog resources that need operator attention.
///
/// NOTE(review): this struct is not populated in this part of the file;
/// presumably the counts tally items with `attention_required` / a non-empty
/// attention reason list, and each `top_*` holds the highest-scoring item of
/// that kind — confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct CatalogAttentionSummary {
    pub collections_requiring_attention: usize,
    pub indexes_requiring_attention: usize,
    pub graph_projections_requiring_attention: usize,
    pub analytics_jobs_requiring_attention: usize,
    pub top_collection: Option<CollectionDescriptor>,
    pub top_index: Option<CatalogIndexStatus>,
    pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
    pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
}
152
/// Physical metadata used to reconcile a catalog snapshot.
///
/// `declared_*` lists describe what should exist and `operational_*` lists
/// what is reported as existing. Resources are matched across the two lists by
/// name (by id for analytics jobs) to detect drift. A default (all-empty) value
/// produces the same statuses as passing no declarations at all.
#[derive(Debug, Clone, Default)]
pub struct CatalogDeclarations {
    pub declared_indexes: Vec<PhysicalIndexState>,
    pub declared_graph_projections: Vec<PhysicalGraphProjection>,
    pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub operational_indexes: Vec<PhysicalIndexState>,
    pub operational_graph_projections: Vec<PhysicalGraphProjection>,
    pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
}
162
/// Reconciled status of one graph projection, built by `graph_projection_statuses`.
///
/// The effective state is the declared record's state, falling back to the
/// operational record's.
#[derive(Debug, Clone)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name.
    pub name: String,
    /// Source collection (declared record first, then operational).
    pub source: Option<String>,
    /// One of `"failed"`, `"stale"`, `"materializing"`, `"materialized"`,
    /// `"orphaned-operational"`, `"declared"`, or `"unknown"`.
    pub lifecycle_state: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// `declared == operational`.
    pub in_sync: bool,
    /// Declared record's sequence, falling back to the operational record's.
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational, and in state `"materialized"`.
    pub queryable: bool,
    /// State is `"declared"`, `"materializing"`, `"failed"`, or `"stale"`.
    pub requires_rematerialization: bool,
    /// Distinct (by id) analytics jobs naming this projection.
    pub dependent_job_count: usize,
    /// Dependent jobs in state `"queued"`, `"running"`, or `"completed"`.
    pub active_dependent_job_count: usize,
    /// Dependent jobs in state `"stale"`.
    pub stale_dependent_job_count: usize,
    /// No dependent job is stale.
    pub dependent_jobs_in_sync: bool,
    /// At least one dependent job is stale.
    pub rerun_required: bool,
    /// 4 if rematerialization is required, 2 per stale dependent job, plus 1
    /// each for declared/operational drift and for not being queryable.
    pub attention_score: usize,
    /// Reason codes: `"declaration_drift"`, `"requires_rematerialization"`,
    /// `"dependent_jobs_stale"`.
    pub attention_reasons: Vec<String>,
}
182
/// Reconciled status of one index, built by `index_statuses`.
#[derive(Debug, Clone)]
pub struct CatalogIndexStatus {
    /// Index name.
    pub name: String,
    /// Owning collection (declared record first, then operational).
    pub collection: Option<String>,
    /// Kind label from `index_kind_string`, e.g. `"btree"`.
    pub kind: String,
    /// Present in the declared list.
    pub declared: bool,
    /// Present in the operational list.
    pub operational: bool,
    /// Enabled flag (declared record first, then operational; `false` if neither).
    pub enabled: bool,
    /// Build state (operational record first, then declared).
    pub build_state: Option<String>,
    /// Derived from `build_state` (treated as `"declared-unbuilt"` when missing) and `enabled`.
    pub artifact_state: crate::physical::ArtifactState,
    /// Label from `index_lifecycle_state`.
    pub lifecycle_state: String,
    /// `declared == operational`.
    pub in_sync: bool,
    /// Declared, operational, enabled, and build state `"ready"`.
    pub queryable: bool,
    /// Build state is `"declared-unbuilt"`, `"building"`, `"stale"`, or `"failed"`.
    pub requires_rebuild: bool,
    /// 3 if a rebuild is required, plus 1 each for disabled-while-declared,
    /// declared/operational drift, and not being queryable.
    pub attention_score: usize,
    /// Reason codes such as `"declaration_drift"`, `"disabled"`, `"failed"`,
    /// `"stale"`, `"requires_rebuild"`.
    pub attention_reasons: Vec<String>,
}
200
/// Reconciled status of one analytics job, built by `analytics_job_statuses`
/// (one entry per distinct job id across declared and operational jobs,
/// sorted by id).
///
/// NOTE(review): the code that fills most fields lies outside this excerpt;
/// descriptions marked "presumably" are inferred from names — confirm.
#[derive(Debug, Clone)]
pub struct CatalogAnalyticsJobStatus {
    /// Job id.
    pub id: String,
    pub kind: String,
    /// Graph projection the job runs against, if any. `collection_analytics_job_readiness`
    /// attributes the job to that projection's source collection.
    pub projection: Option<String>,
    pub state: String,
    pub lifecycle_state: String,
    /// Presumably present in the declared job list.
    pub declared: bool,
    /// Presumably present in the operational job list.
    pub operational: bool,
    pub in_sync: bool,
    pub last_run_sequence: Option<u64>,
    /// Presumably whether the referenced projection is declared (`None` without a projection).
    pub projection_declared: Option<bool>,
    /// Presumably whether the referenced projection is operational.
    pub projection_operational: Option<bool>,
    pub projection_lifecycle_state: Option<String>,
    /// `None` is treated as in sync by `collection_analytics_job_readiness`.
    pub dependency_in_sync: Option<bool>,
    pub executable: bool,
    pub requires_rerun: bool,
    pub attention_score: usize,
    pub attention_reasons: Vec<String>,
}
221
/// Declared-vs-operational consistency counts and drift lists for the whole catalog.
///
/// NOTE(review): not populated in this part of the file; presumably `missing_*`
/// lists declared names absent from the operational side and `undeclared_*`
/// the reverse, mirroring `collection_index_consistency` — confirm.
#[derive(Debug, Clone, Default)]
pub struct CatalogConsistencyReport {
    pub declared_index_count: usize,
    pub operational_index_count: usize,
    pub declared_graph_projection_count: usize,
    pub operational_graph_projection_count: usize,
    pub declared_analytics_job_count: usize,
    pub operational_analytics_job_count: usize,
    pub missing_operational_indexes: Vec<String>,
    pub undeclared_operational_indexes: Vec<String>,
    pub missing_operational_graph_projections: Vec<String>,
    pub undeclared_operational_graph_projections: Vec<String>,
    pub missing_operational_analytics_jobs: Vec<String>,
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
237
238pub fn snapshot_store(
239 name: &str,
240 store: &UnifiedStore,
241 index_catalog: Option<&IndexCatalog>,
242) -> CatalogModelSnapshot {
243 snapshot_store_with_declarations(name, store, index_catalog, None, None)
244}
245
246pub fn snapshot_store_with_declarations(
247 name: &str,
248 store: &UnifiedStore,
249 index_catalog: Option<&IndexCatalog>,
250 declarations: Option<&CatalogDeclarations>,
251 contracts: Option<&[CollectionContract]>,
252) -> CatalogModelSnapshot {
253 let index_statuses = index_statuses(declarations);
254 let graph_projection_statuses = graph_projection_statuses(declarations);
255 let analytics_job_statuses = analytics_job_statuses(declarations);
256
257 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
258 for (collection, entity) in store.query_all(|_| true) {
259 grouped.entry(collection).or_default().push(entity);
260 }
261 let queue_modes = queue_modes_from_grouped(&grouped);
262
263 let mut stats_by_collection = BTreeMap::new();
264 let mut collections = Vec::new();
265
266 for collection_name in store.list_collections() {
267 let entities = grouped.remove(&collection_name).unwrap_or_default();
268 let inferred_model = infer_model(&entities);
269 let inferred_schema_mode = infer_schema_mode(inferred_model);
270 let contract = collection_contract(&collection_name, contracts);
271 let model = contract
272 .map(|contract| contract.declared_model)
273 .unwrap_or(inferred_model);
274 let cross_refs = entities
275 .iter()
276 .map(|entity| entity.cross_refs().len())
277 .sum();
278 let entity_count = entities.len();
279 let manager_stats = store
280 .get_collection(&collection_name)
281 .map(|manager| manager.stats());
282 let segments = manager_stats
283 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
284 .unwrap_or(0);
285
286 stats_by_collection.insert(
287 collection_name.clone(),
288 CollectionStats {
289 entities: entity_count,
290 cross_refs,
291 segments,
292 },
293 );
294
295 let declared_indices = declared_indices(&collection_name, declarations);
296 let operational_indices = operational_indices(&collection_name, index_catalog);
297 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
298 collection_index_consistency(&declared_indices, &operational_indices);
299 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
300 collection_index_readiness(&collection_name, &index_statuses);
301 let (
302 queryable_graph_projection_count,
303 graph_projections_requiring_rematerialization_count,
304 graph_projections_locally_in_sync,
305 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
306 let (
307 executable_analytics_job_count,
308 analytics_jobs_requiring_rerun_count,
309 analytics_jobs_locally_in_sync,
310 ) = collection_analytics_job_readiness(
311 &collection_name,
312 &graph_projection_statuses,
313 &analytics_job_statuses,
314 );
315 let resources_in_sync = indexes_in_sync
316 && indexes_locally_in_sync
317 && graph_projections_locally_in_sync
318 && analytics_jobs_locally_in_sync;
319 let attention_required = !resources_in_sync
320 || indexes_requiring_rebuild_count > 0
321 || graph_projections_requiring_rematerialization_count > 0
322 || analytics_jobs_requiring_rerun_count > 0;
323 let mut attention_reasons = Vec::new();
324 if !indexes_in_sync || !indexes_locally_in_sync {
325 attention_reasons.push("index_drift".to_string());
326 }
327 if indexes_requiring_rebuild_count > 0 {
328 attention_reasons.push("indexes_require_rebuild".to_string());
329 }
330 if !graph_projections_locally_in_sync {
331 attention_reasons.push("graph_projection_drift".to_string());
332 }
333 if graph_projections_requiring_rematerialization_count > 0 {
334 attention_reasons.push("graph_projections_require_rematerialization".to_string());
335 }
336 if !analytics_jobs_locally_in_sync {
337 attention_reasons.push("analytics_job_drift".to_string());
338 }
339 if analytics_jobs_requiring_rerun_count > 0 {
340 attention_reasons.push("analytics_jobs_require_rerun".to_string());
341 }
342 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
343 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
344 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
345 + usize::from(!resources_in_sync);
346
347 collections.push(CollectionDescriptor {
348 name: collection_name.clone(),
349 model,
350 schema_mode: contract
351 .map(|contract| contract.schema_mode)
352 .unwrap_or(inferred_schema_mode),
353 contract_present: contract.is_some(),
354 contract_origin: contract.map(|contract| contract.origin),
355 declared_model: contract.map(|contract| contract.declared_model),
356 observed_model: inferred_model,
357 queue_mode: if model == CollectionModel::Queue {
358 Some(
359 queue_modes
360 .get(&collection_name)
361 .copied()
362 .unwrap_or_default(),
363 )
364 } else {
365 None
366 },
367 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
368 vector_metric: contract.and_then(|contract| contract.vector_metric),
369 declared_schema_mode: contract.map(|contract| contract.schema_mode),
370 observed_schema_mode: inferred_schema_mode,
371 entities: entity_count,
372 cross_refs,
373 segments,
374 indices: infer_indices(&collection_name, model, index_catalog, declarations),
375 declared_indices,
376 operational_indices,
377 indexes_in_sync,
378 missing_operational_indices,
379 undeclared_operational_indices,
380 queryable_index_count,
381 indexes_requiring_rebuild_count,
382 queryable_graph_projection_count,
383 graph_projections_requiring_rematerialization_count,
384 executable_analytics_job_count,
385 analytics_jobs_requiring_rerun_count,
386 subscriptions: contract
387 .map(|contract| contract.subscriptions.clone())
388 .unwrap_or_default(),
389 resources_in_sync,
390 attention_required,
391 attention_score,
392 attention_reasons,
393 });
394 }
395
396 collections.sort_by(|left, right| left.name.cmp(&right.name));
397
398 let summary = CatalogSnapshot {
399 name: name.to_string(),
400 total_entities: stats_by_collection
401 .values()
402 .map(|stats| stats.entities)
403 .sum(),
404 total_collections: stats_by_collection.len(),
405 stats_by_collection,
406 updated_at: SystemTime::now(),
407 };
408
409 CatalogModelSnapshot {
410 summary,
411 collections,
412 indices: index_catalog
413 .map(IndexCatalog::snapshot)
414 .unwrap_or_default(),
415 declared_indexes: declarations
416 .map(|declarations| declarations.declared_indexes.clone())
417 .unwrap_or_default(),
418 declared_graph_projections: declarations
419 .map(|declarations| declarations.declared_graph_projections.clone())
420 .unwrap_or_default(),
421 declared_analytics_jobs: declarations
422 .map(|declarations| declarations.declared_analytics_jobs.clone())
423 .unwrap_or_default(),
424 operational_indexes: declarations
425 .map(|declarations| declarations.operational_indexes.clone())
426 .unwrap_or_default(),
427 operational_graph_projections: declarations
428 .map(|declarations| declarations.operational_graph_projections.clone())
429 .unwrap_or_default(),
430 operational_analytics_jobs: declarations
431 .map(|declarations| declarations.operational_analytics_jobs.clone())
432 .unwrap_or_default(),
433 queryable_index_count: index_statuses
434 .iter()
435 .filter(|status| status.queryable)
436 .count(),
437 indexes_requiring_rebuild_count: index_statuses
438 .iter()
439 .filter(|status| status.requires_rebuild)
440 .count(),
441 queryable_graph_projection_count: graph_projection_statuses
442 .iter()
443 .filter(|status| status.queryable)
444 .count(),
445 graph_projections_requiring_rematerialization_count: graph_projection_statuses
446 .iter()
447 .filter(|status| status.requires_rematerialization)
448 .count(),
449 executable_analytics_job_count: analytics_job_statuses
450 .iter()
451 .filter(|status| status.executable)
452 .count(),
453 analytics_jobs_requiring_rerun_count: analytics_job_statuses
454 .iter()
455 .filter(|status| status.requires_rerun)
456 .count(),
457 index_statuses,
458 graph_projection_statuses,
459 analytics_job_statuses,
460 }
461}
462
463fn queue_modes_from_grouped(
464 grouped: &HashMap<String, Vec<UnifiedEntity>>,
465) -> HashMap<String, QueueMode> {
466 let Some(meta) = grouped.get("red_queue_meta") else {
467 return HashMap::new();
468 };
469
470 let mut modes = HashMap::new();
471 for entity in meta {
472 let Some(row) = entity.data.as_row() else {
473 continue;
474 };
475 if row_text(row, "kind").as_deref() != Some("queue_config") {
476 continue;
477 }
478 let Some(queue) = row_text(row, "queue") else {
479 continue;
480 };
481 let mode = row_text(row, "mode")
482 .as_deref()
483 .and_then(QueueMode::parse)
484 .unwrap_or_default();
485 modes.insert(queue, mode);
486 }
487 modes
488}
489
490fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
491 match row.get_field(key) {
492 Some(Value::Text(value)) => Some(value.to_string()),
493 _ => None,
494 }
495}
496
497fn collection_contract<'a>(
498 collection_name: &str,
499 contracts: Option<&'a [CollectionContract]>,
500) -> Option<&'a CollectionContract> {
501 contracts.and_then(|contracts| {
502 contracts
503 .iter()
504 .find(|contract| contract.name == collection_name)
505 })
506}
507
508fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
509 let mut has_table = false;
510 let mut has_graph = false;
511 let mut has_vector = false;
512
513 for entity in entities {
514 match &entity.kind {
515 EntityKind::TableRow { .. } => has_table = true,
516 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
517 EntityKind::Vector { .. } => has_vector = true,
518 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
519 }
520 }
521
522 match (has_table, has_graph, has_vector) {
523 (true, false, false) => CollectionModel::Table,
524 (false, true, false) => CollectionModel::Graph,
525 (false, false, true) => CollectionModel::Vector,
526 _ => CollectionModel::Mixed,
527 }
528}
529
530fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
531 match model {
532 CollectionModel::Table => SchemaMode::Strict,
533 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
534 CollectionModel::Document
535 | CollectionModel::Hll
536 | CollectionModel::Sketch
537 | CollectionModel::Filter
538 | CollectionModel::Kv
539 | CollectionModel::Config
540 | CollectionModel::Vault
541 | CollectionModel::Mixed => SchemaMode::Dynamic,
542 CollectionModel::TimeSeries => SchemaMode::SemiStructured,
543 CollectionModel::Queue => SchemaMode::Dynamic,
544 }
545}
546
547fn infer_indices(
548 collection_name: &str,
549 model: CollectionModel,
550 index_catalog: Option<&IndexCatalog>,
551 declarations: Option<&CatalogDeclarations>,
552) -> Vec<String> {
553 let declared = declared_indices(collection_name, declarations);
554 if !declared.is_empty() {
555 return declared;
556 }
557
558 let available = index_catalog
559 .map(IndexCatalog::snapshot)
560 .unwrap_or_default();
561 let mut selected = Vec::new();
562
563 for metric in available {
564 let relevant = matches!(
565 (model, metric.kind),
566 (_, IndexKind::BTree)
567 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
568 | (CollectionModel::Vector, IndexKind::VectorHnsw)
569 | (CollectionModel::Vector, IndexKind::VectorInverted)
570 | (CollectionModel::Document, IndexKind::FullText)
571 | (CollectionModel::Document, IndexKind::DocumentPathValue)
572 | (CollectionModel::Kv, IndexKind::Hash)
573 | (CollectionModel::Config, IndexKind::Hash)
574 | (CollectionModel::Vault, IndexKind::Hash)
575 | (CollectionModel::Mixed, _)
576 );
577
578 if relevant && metric.enabled {
579 selected.push(metric.name);
580 }
581 }
582
583 selected
584}
585
586fn declared_indices(
587 collection_name: &str,
588 declarations: Option<&CatalogDeclarations>,
589) -> Vec<String> {
590 let mut selected = declarations
591 .map(|declarations| {
592 declarations
593 .declared_indexes
594 .iter()
595 .filter(|index| index.collection.as_deref() == Some(collection_name))
596 .map(|index| index.name.clone())
597 .collect::<Vec<_>>()
598 })
599 .unwrap_or_default();
600 selected.sort();
601 selected.dedup();
602 selected
603}
604
605fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
606 let mut selected = index_catalog
607 .map(IndexCatalog::snapshot)
608 .unwrap_or_default()
609 .into_iter()
610 .filter(|metric| metric.enabled)
611 .filter_map(|metric| {
612 if metric.name.starts_with(collection_name)
613 || matches!(
614 metric.kind,
615 IndexKind::BTree
616 | IndexKind::GraphAdjacency
617 | IndexKind::VectorHnsw
618 | IndexKind::VectorInverted
619 | IndexKind::FullText
620 | IndexKind::DocumentPathValue
621 | IndexKind::HybridSearch
622 )
623 {
624 Some(metric.name)
625 } else {
626 None
627 }
628 })
629 .collect::<Vec<_>>();
630 selected.sort();
631 selected.dedup();
632 selected
633}
634
/// Compares declared and operational index names as sets.
///
/// Returns `(in_sync, missing_operational, undeclared_operational)`:
/// - `missing_operational` holds declared names that are not operational;
/// - `undeclared_operational` holds operational names that are not declared.
///
/// Both lists are sorted and de-duplicated. `in_sync` is `true` iff both are empty.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared: BTreeSet<&String> = declared_indices.iter().collect();
    let operational: BTreeSet<&String> = operational_indices.iter().collect();

    let missing: Vec<String> = declared.difference(&operational).copied().cloned().collect();
    let undeclared: Vec<String> = operational.difference(&declared).copied().cloned().collect();

    let in_sync = missing.is_empty() && undeclared.is_empty();
    (in_sync, missing, undeclared)
}
655
656fn collection_index_readiness(
657 collection_name: &str,
658 statuses: &[CatalogIndexStatus],
659) -> (usize, usize, bool) {
660 let local = statuses
661 .iter()
662 .filter(|status| status.collection.as_deref() == Some(collection_name))
663 .collect::<Vec<_>>();
664 let queryable_count = local.iter().filter(|status| status.queryable).count();
665 let requires_rebuild_count = local
666 .iter()
667 .filter(|status| status.requires_rebuild)
668 .count();
669 let locally_in_sync = local.iter().all(|status| status.in_sync);
670 (queryable_count, requires_rebuild_count, locally_in_sync)
671}
672
673fn collection_graph_projection_readiness(
674 collection_name: &str,
675 statuses: &[CatalogGraphProjectionStatus],
676) -> (usize, usize, bool) {
677 let local = statuses
678 .iter()
679 .filter(|status| status.source.as_deref() == Some(collection_name))
680 .collect::<Vec<_>>();
681 let queryable_count = local.iter().filter(|status| status.queryable).count();
682 let requires_rematerialization_count = local
683 .iter()
684 .filter(|status| status.requires_rematerialization)
685 .count();
686 let locally_in_sync = local
687 .iter()
688 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
689 (
690 queryable_count,
691 requires_rematerialization_count,
692 locally_in_sync,
693 )
694}
695
696fn collection_analytics_job_readiness(
697 collection_name: &str,
698 projection_statuses: &[CatalogGraphProjectionStatus],
699 job_statuses: &[CatalogAnalyticsJobStatus],
700) -> (usize, usize, bool) {
701 let local = job_statuses
702 .iter()
703 .filter(|status| {
704 let Some(projection_name) = status.projection.as_deref() else {
705 return false;
706 };
707 projection_statuses
708 .iter()
709 .find(|projection| projection.name == projection_name)
710 .and_then(|projection| projection.source.as_deref())
711 == Some(collection_name)
712 })
713 .collect::<Vec<_>>();
714 let executable_count = local.iter().filter(|status| status.executable).count();
715 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
716 let locally_in_sync = local
717 .iter()
718 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
719 (executable_count, requires_rerun_count, locally_in_sync)
720}
721
722fn graph_projection_statuses(
723 declarations: Option<&CatalogDeclarations>,
724) -> Vec<CatalogGraphProjectionStatus> {
725 let declared = declarations
726 .map(|declarations| declarations.declared_graph_projections.as_slice())
727 .unwrap_or(&[]);
728 let operational = declarations
729 .map(|declarations| declarations.operational_graph_projections.as_slice())
730 .unwrap_or(&[]);
731 let declared_jobs = declarations
732 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
733 .unwrap_or(&[]);
734 let operational_jobs = declarations
735 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
736 .unwrap_or(&[]);
737
738 let mut names = declared
739 .iter()
740 .map(|projection| projection.name.clone())
741 .chain(operational.iter().map(|projection| projection.name.clone()))
742 .collect::<Vec<_>>();
743 names.sort();
744 names.dedup();
745
746 names
747 .into_iter()
748 .map(|name| {
749 let declared_projection = declared.iter().find(|projection| projection.name == name);
750 let operational_projection = operational
751 .iter()
752 .find(|projection| projection.name == name);
753 let declared_present = declared_projection.is_some();
754 let operational_present = operational_projection.is_some();
755 let mut dependent_job_ids = BTreeSet::new();
756 let mut dependent_job_count = 0usize;
757 let mut active_dependent_job_count = 0usize;
758 let mut stale_dependent_job_count = 0usize;
759 for job in declared_jobs
760 .iter()
761 .chain(operational_jobs.iter())
762 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
763 {
764 if !dependent_job_ids.insert(job.id.clone()) {
765 continue;
766 }
767 dependent_job_count += 1;
768 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
769 active_dependent_job_count += 1;
770 }
771 if job.state == "stale" {
772 stale_dependent_job_count += 1;
773 }
774 }
775 let lifecycle_state = match (
776 declared_present,
777 operational_present,
778 declared_projection
779 .map(|projection| projection.state.as_str())
780 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
781 .unwrap_or_default(),
782 ) {
783 (true, _, "failed") => "failed",
784 (true, true, "stale") => "stale",
785 (true, _, "materializing") => "materializing",
786 (true, true, "materialized") => "materialized",
787 (true, true, _) => "materialized",
788 (false, true, _) => "orphaned-operational",
789 (true, false, _) => "declared",
790 (false, false, _) => "unknown",
791 }
792 .to_string();
793 let queryable = declared_present
794 && operational_present
795 && matches!(
796 declared_projection
797 .map(|projection| projection.state.as_str())
798 .or_else(
799 || operational_projection.map(|projection| projection.state.as_str())
800 )
801 .unwrap_or_default(),
802 "materialized"
803 );
804 let requires_rematerialization = matches!(
805 declared_projection
806 .map(|projection| projection.state.as_str())
807 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
808 .unwrap_or_default(),
809 "declared" | "materializing" | "failed" | "stale"
810 );
811 let mut attention_reasons = Vec::new();
812 if !declared_present || !operational_present {
813 attention_reasons.push("declaration_drift".to_string());
814 }
815 if requires_rematerialization {
816 attention_reasons.push("requires_rematerialization".to_string());
817 }
818 if stale_dependent_job_count > 0 {
819 attention_reasons.push("dependent_jobs_stale".to_string());
820 }
821 let attention_score = stale_dependent_job_count.saturating_mul(2)
822 + usize::from(requires_rematerialization).saturating_mul(4)
823 + usize::from(!declared_present || !operational_present)
824 + usize::from(!queryable);
825 CatalogGraphProjectionStatus {
826 name,
827 source: declared_projection
828 .map(|projection| projection.source.clone())
829 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
830 lifecycle_state,
831 declared: declared_present,
832 operational: operational_present,
833 in_sync: declared_present == operational_present,
834 last_materialized_sequence: declared_projection
835 .and_then(|projection| projection.last_materialized_sequence)
836 .or_else(|| {
837 operational_projection
838 .and_then(|projection| projection.last_materialized_sequence)
839 }),
840 queryable,
841 requires_rematerialization,
842 dependent_job_count,
843 active_dependent_job_count,
844 stale_dependent_job_count,
845 dependent_jobs_in_sync: stale_dependent_job_count == 0,
846 rerun_required: stale_dependent_job_count > 0,
847 attention_score,
848 attention_reasons,
849 }
850 })
851 .collect()
852}
853
854fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
855 let declared = declarations
856 .map(|declarations| declarations.declared_indexes.as_slice())
857 .unwrap_or(&[]);
858 let operational = declarations
859 .map(|declarations| declarations.operational_indexes.as_slice())
860 .unwrap_or(&[]);
861
862 let mut names = declared
863 .iter()
864 .map(|index| index.name.clone())
865 .chain(operational.iter().map(|index| index.name.clone()))
866 .collect::<Vec<_>>();
867 names.sort();
868 names.dedup();
869
870 names
871 .into_iter()
872 .map(|name| {
873 let declared_index = declared.iter().find(|index| index.name == name);
874 let operational_index = operational.iter().find(|index| index.name == name);
875 let collection = declared_index
876 .and_then(|index| index.collection.clone())
877 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
878 let kind = declared_index
879 .map(|index| index_kind_string(index.kind))
880 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
881 .unwrap_or_default();
882 let enabled = declared_index
883 .map(|index| index.enabled)
884 .or_else(|| operational_index.map(|index| index.enabled))
885 .unwrap_or(false);
886 let build_state = operational_index
887 .map(|index| index.build_state.clone())
888 .or_else(|| declared_index.map(|index| index.build_state.clone()));
889 let declared_present = declared_index.is_some();
890 let operational_present = operational_index.is_some();
891 let lifecycle_state = index_lifecycle_state(
892 declared_present,
893 operational_present,
894 enabled,
895 build_state.as_deref(),
896 );
897
898 let mut attention_reasons = Vec::new();
899 if declared_present != operational_present {
900 attention_reasons.push("declaration_drift".to_string());
901 }
902 if !enabled && declared_present {
903 attention_reasons.push("disabled".to_string());
904 }
905 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
906 attention_reasons.push("failed".to_string());
907 }
908 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
909 attention_reasons.push("stale".to_string());
910 }
911 if matches!(
912 build_state.as_deref().unwrap_or_default(),
913 "building" | "declared-unbuilt"
914 ) {
915 attention_reasons.push("requires_rebuild".to_string());
916 }
917 let queryable = declared_present
918 && operational_present
919 && enabled
920 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
921 let requires_rebuild = matches!(
922 build_state.as_deref().unwrap_or_default(),
923 "declared-unbuilt" | "building" | "stale" | "failed"
924 );
925 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
926 + usize::from(!enabled && declared_present)
927 + usize::from(declared_present != operational_present)
928 + usize::from(!queryable);
929
930 let artifact_state = crate::physical::ArtifactState::from_build_state(
931 build_state.as_deref().unwrap_or("declared-unbuilt"),
932 enabled,
933 );
934 CatalogIndexStatus {
935 name,
936 collection,
937 kind,
938 declared: declared_present,
939 operational: operational_present,
940 enabled,
941 build_state,
942 artifact_state,
943 lifecycle_state,
944 in_sync: declared_present == operational_present,
945 queryable,
946 requires_rebuild,
947 attention_score,
948 attention_reasons,
949 }
950 })
951 .collect()
952}
953
/// Lifecycle label for an index, from its declared/operational presence,
/// enabled flag, and build state.
///
/// Precedence:
/// 1. operational-only → `"orphaned-operational"`;
/// 2. declared but disabled → `"disabled"`;
/// 3. neither → `"unknown"`;
/// 4. declared-only → `"declared"`;
/// 5. otherwise by build state: `"ready"`, `"failed"`, and `"stale"` pass
///    through, `"declared-unbuilt"` maps to `"declared"`, and anything else
///    (including a missing state) maps to `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (true, _) if !enabled => "disabled",
        (false, false) => "unknown",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            // Includes "catalog-derived", "metadata-only", "artifact-published",
            // "registry-loaded", and any unknown state.
            _ => "building",
        },
    };
    label.to_string()
}
984
985fn index_kind_string(kind: IndexKind) -> String {
986 match kind {
987 IndexKind::BTree => "btree",
988 IndexKind::Hash => "hash",
989 IndexKind::Bitmap => "bitmap",
990 IndexKind::Spatial => "spatial.rtree",
991 IndexKind::VectorHnsw => "vector.hnsw",
992 IndexKind::VectorInverted => "vector.inverted",
993 IndexKind::GraphAdjacency => "graph.adjacency",
994 IndexKind::FullText => "text.fulltext",
995 IndexKind::DocumentPathValue => "document.pathvalue",
996 IndexKind::HybridSearch => "search.hybrid",
997 }
998 .to_string()
999}
1000
1001fn analytics_job_statuses(
1002 declarations: Option<&CatalogDeclarations>,
1003) -> Vec<CatalogAnalyticsJobStatus> {
1004 let declared = declarations
1005 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1006 .unwrap_or(&[]);
1007 let operational = declarations
1008 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1009 .unwrap_or(&[]);
1010 let declared_projections = declarations
1011 .map(|declarations| declarations.declared_graph_projections.as_slice())
1012 .unwrap_or(&[]);
1013 let operational_projections = declarations
1014 .map(|declarations| declarations.operational_graph_projections.as_slice())
1015 .unwrap_or(&[]);
1016
1017 let mut ids = declared
1018 .iter()
1019 .map(|job| job.id.clone())
1020 .chain(operational.iter().map(|job| job.id.clone()))
1021 .collect::<Vec<_>>();
1022 ids.sort();
1023 ids.dedup();
1024
1025 ids.into_iter()
1026 .map(|id| {
1027 let declared_job = declared.iter().find(|job| job.id == id);
1028 let operational_job = operational.iter().find(|job| job.id == id);
1029 let kind = declared_job
1030 .map(|job| job.kind.clone())
1031 .or_else(|| operational_job.map(|job| job.kind.clone()))
1032 .unwrap_or_default();
1033 let projection = declared_job
1034 .and_then(|job| job.projection.clone())
1035 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1036 let state = declared_job
1037 .map(|job| job.state.clone())
1038 .or_else(|| operational_job.map(|job| job.state.clone()))
1039 .unwrap_or_default();
1040 let declared_present = declared_job.is_some();
1041 let operational_present = operational_job.is_some();
1042 let last_run_sequence = declared_job
1043 .and_then(|job| job.last_run_sequence)
1044 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1045 let projection_declared = projection.as_ref().map(|projection_name| {
1046 declared_projections
1047 .iter()
1048 .any(|projection| projection.name == *projection_name)
1049 });
1050 let projection_operational = projection.as_ref().map(|projection_name| {
1051 operational_projections
1052 .iter()
1053 .any(|projection| projection.name == *projection_name)
1054 });
1055 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1056 declared_projections
1057 .iter()
1058 .find(|projection| projection.name == *projection_name)
1059 .map(|projection| projection.state.as_str())
1060 .or_else(|| {
1061 operational_projections
1062 .iter()
1063 .find(|projection| projection.name == *projection_name)
1064 .map(|projection| projection.state.as_str())
1065 })
1066 });
1067 let dependency_in_sync = projection.as_ref().map(|_| {
1068 matches!(projection_lifecycle, Some("materialized"))
1069 && projection_operational == Some(true)
1070 });
1071 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1072 (true, false, _) => "declared",
1073 (true, true, _)
1074 if matches!(
1075 projection_lifecycle,
1076 Some("stale" | "failed" | "materializing" | "declared")
1077 ) =>
1078 {
1079 "stale"
1080 }
1081 (true, true, "completed") => "completed",
1082 (true, true, "running") => "running",
1083 (true, true, "failed") => "failed",
1084 (true, true, "queued") => "queued",
1085 (true, true, "stale") => "stale",
1086 (true, true, _) => "materialized",
1087 (false, true, _) => "orphaned-operational",
1088 (false, false, _) => "unknown",
1089 }
1090 .to_string();
1091 let executable = declared_present
1092 && operational_present
1093 && !matches!(state.as_str(), "failed" | "stale")
1094 && dependency_in_sync.unwrap_or(true);
1095 let requires_rerun =
1096 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1097 let mut attention_reasons = Vec::new();
1098 if declared_present != operational_present {
1099 attention_reasons.push("declaration_drift".to_string());
1100 }
1101 if matches!(state.as_str(), "failed") {
1102 attention_reasons.push("failed".to_string());
1103 }
1104 if matches!(state.as_str(), "stale") {
1105 attention_reasons.push("stale".to_string());
1106 }
1107 if dependency_in_sync == Some(false) {
1108 attention_reasons.push("dependency_drift".to_string());
1109 }
1110 if requires_rerun {
1111 attention_reasons.push("requires_rerun".to_string());
1112 }
1113 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1114 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1115 + usize::from(declared_present != operational_present)
1116 + usize::from(!executable);
1117 CatalogAnalyticsJobStatus {
1118 id,
1119 kind,
1120 projection,
1121 state: state.clone(),
1122 lifecycle_state,
1123 declared: declared_present,
1124 operational: operational_present,
1125 in_sync: declared_present == operational_present,
1126 last_run_sequence,
1127 projection_declared,
1128 projection_operational,
1129 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1130 dependency_in_sync,
1131 executable,
1132 requires_rerun,
1133 attention_score,
1134 attention_reasons,
1135 }
1136 })
1137 .collect()
1138}
1139
1140pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1141 let declared_indexes = snapshot
1142 .declared_indexes
1143 .iter()
1144 .map(|index| index.name.clone())
1145 .collect::<BTreeSet<_>>();
1146 let operational_indexes = snapshot
1147 .operational_indexes
1148 .iter()
1149 .map(|index| index.name.clone())
1150 .collect::<BTreeSet<_>>();
1151 let declared_graph_projections = snapshot
1152 .declared_graph_projections
1153 .iter()
1154 .map(|projection| projection.name.clone())
1155 .collect::<BTreeSet<_>>();
1156 let operational_graph_projections = snapshot
1157 .operational_graph_projections
1158 .iter()
1159 .map(|projection| projection.name.clone())
1160 .collect::<BTreeSet<_>>();
1161 let declared_analytics_jobs = snapshot
1162 .declared_analytics_jobs
1163 .iter()
1164 .map(|job| job.id.clone())
1165 .collect::<BTreeSet<_>>();
1166 let operational_analytics_jobs = snapshot
1167 .operational_analytics_jobs
1168 .iter()
1169 .map(|job| job.id.clone())
1170 .collect::<BTreeSet<_>>();
1171
1172 CatalogConsistencyReport {
1173 declared_index_count: declared_indexes.len(),
1174 operational_index_count: operational_indexes.len(),
1175 declared_graph_projection_count: declared_graph_projections.len(),
1176 operational_graph_projection_count: operational_graph_projections.len(),
1177 declared_analytics_job_count: declared_analytics_jobs.len(),
1178 operational_analytics_job_count: operational_analytics_jobs.len(),
1179 missing_operational_indexes: declared_indexes
1180 .difference(&operational_indexes)
1181 .cloned()
1182 .collect(),
1183 undeclared_operational_indexes: operational_indexes
1184 .difference(&declared_indexes)
1185 .cloned()
1186 .collect(),
1187 missing_operational_graph_projections: declared_graph_projections
1188 .difference(&operational_graph_projections)
1189 .cloned()
1190 .collect(),
1191 undeclared_operational_graph_projections: operational_graph_projections
1192 .difference(&declared_graph_projections)
1193 .cloned()
1194 .collect(),
1195 missing_operational_analytics_jobs: declared_analytics_jobs
1196 .difference(&operational_analytics_jobs)
1197 .cloned()
1198 .collect(),
1199 undeclared_operational_analytics_jobs: operational_analytics_jobs
1200 .difference(&declared_analytics_jobs)
1201 .cloned()
1202 .collect(),
1203 }
1204}
1205
1206pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1207 CatalogAttentionSummary {
1208 collections_requiring_attention: snapshot
1209 .collections
1210 .iter()
1211 .filter(|collection| collection.attention_required)
1212 .count(),
1213 indexes_requiring_attention: snapshot
1214 .index_statuses
1215 .iter()
1216 .filter(|status| status.attention_score > 0)
1217 .count(),
1218 graph_projections_requiring_attention: snapshot
1219 .graph_projection_statuses
1220 .iter()
1221 .filter(|status| status.attention_score > 0)
1222 .count(),
1223 analytics_jobs_requiring_attention: snapshot
1224 .analytics_job_statuses
1225 .iter()
1226 .filter(|status| status.attention_score > 0)
1227 .count(),
1228 top_collection: snapshot
1229 .collections
1230 .iter()
1231 .filter(|collection| collection.attention_score > 0)
1232 .max_by_key(|collection| collection.attention_score)
1233 .cloned(),
1234 top_index: snapshot
1235 .index_statuses
1236 .iter()
1237 .filter(|status| status.attention_score > 0)
1238 .max_by_key(|status| status.attention_score)
1239 .cloned(),
1240 top_graph_projection: snapshot
1241 .graph_projection_statuses
1242 .iter()
1243 .filter(|status| status.attention_score > 0)
1244 .max_by_key(|status| status.attention_score)
1245 .cloned(),
1246 top_analytics_job: snapshot
1247 .analytics_job_statuses
1248 .iter()
1249 .filter(|status| status.attention_score > 0)
1250 .max_by_key(|status| status.attention_score)
1251 .cloned(),
1252 }
1253}
1254
1255pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1256 let mut collections = snapshot
1257 .collections
1258 .iter()
1259 .filter(|collection| collection.attention_required)
1260 .cloned()
1261 .collect::<Vec<_>>();
1262 collections.sort_by(|left, right| {
1263 right
1264 .attention_score
1265 .cmp(&left.attention_score)
1266 .then_with(|| left.name.cmp(&right.name))
1267 });
1268 collections
1269}
1270
1271pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1272 let mut statuses = snapshot
1273 .index_statuses
1274 .iter()
1275 .filter(|status| status.attention_score > 0)
1276 .cloned()
1277 .collect::<Vec<_>>();
1278 statuses.sort_by(|left, right| {
1279 right
1280 .attention_score
1281 .cmp(&left.attention_score)
1282 .then_with(|| left.name.cmp(&right.name))
1283 });
1284 statuses
1285}
1286
1287pub fn graph_projection_attention(
1288 snapshot: &CatalogModelSnapshot,
1289) -> Vec<CatalogGraphProjectionStatus> {
1290 let mut statuses = snapshot
1291 .graph_projection_statuses
1292 .iter()
1293 .filter(|status| status.attention_score > 0)
1294 .cloned()
1295 .collect::<Vec<_>>();
1296 statuses.sort_by(|left, right| {
1297 right
1298 .attention_score
1299 .cmp(&left.attention_score)
1300 .then_with(|| left.name.cmp(&right.name))
1301 });
1302 statuses
1303}
1304
1305pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1306 let mut statuses = snapshot
1307 .analytics_job_statuses
1308 .iter()
1309 .filter(|status| status.attention_score > 0)
1310 .cloned()
1311 .collect::<Vec<_>>();
1312 statuses.sort_by(|left, right| {
1313 right
1314 .attention_score
1315 .cmp(&left.attention_score)
1316 .then_with(|| left.id.cmp(&right.id))
1317 });
1318 statuses
1319}