1use std::collections::{BTreeMap, BTreeSet, HashMap};
4use std::time::SystemTime;
5
6use crate::api::{CatalogSnapshot, CollectionStats};
7use crate::index::{IndexCatalog, IndexCatalogSnapshot, IndexKind};
8use crate::physical::{
9 CollectionContract, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState,
10};
11use crate::storage::queue::QueueMode;
12use crate::storage::schema::Value;
13use crate::storage::unified::UnifiedStore;
14use crate::storage::{EntityKind, UnifiedEntity};
15
/// Data model attributed to a collection.
///
/// The same enum describes both the model a collection contract declares and
/// the model observed from stored entities. Within this module, inference from
/// entities only ever yields `Table`, `Graph`, `Vector` or `Mixed`; the other
/// variants can only come from a contract.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CollectionModel {
    Table,
    Document,
    Graph,
    Vector,
    Hll,
    Sketch,
    Filter,
    Kv,
    Config,
    Vault,
    Mixed,
    TimeSeries,
    Queue,
    Metrics,
}
33
/// How strictly a collection's schema is enforced.
///
/// Either declared by a collection contract or derived from the collection's
/// model when no contract is present.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchemaMode {
    Strict,
    SemiStructured,
    Dynamic,
}
40
/// Operation kinds that a subscription's operation filter can list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOperation {
    Insert,
    Update,
    Delete,
}

impl SubscriptionOperation {
    /// Every variant, in declaration order. Parsing walks this table and
    /// compares against [`Self::as_str`], so the textual form is defined in
    /// exactly one place.
    const ALL: [Self; 3] = [Self::Insert, Self::Update, Self::Delete];

    /// Returns the canonical upper-case keyword for this operation.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        }
    }

    /// Parses an operation keyword, ignoring ASCII case.
    ///
    /// Returns `None` when `value` is not one of `INSERT`, `UPDATE` or
    /// `DELETE`. No trimming is performed, so surrounding whitespace makes the
    /// input unrecognised.
    pub fn from_str(value: &str) -> Option<Self> {
        // Case-insensitive comparison avoids allocating an upper-cased copy of
        // the input on every call.
        Self::ALL
            .iter()
            .copied()
            .find(|operation| value.eq_ignore_ascii_case(operation.as_str()))
    }
}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SubscriptionDescriptor {
69 pub name: String,
71 pub source: String,
72 pub target_queue: String,
73 pub ops_filter: Vec<SubscriptionOperation>,
74 pub where_filter: Option<String>,
75 pub redact_fields: Vec<String>,
76 pub enabled: bool,
77 pub all_tenants: bool,
82}
83
84#[derive(Debug, Clone)]
85pub struct CollectionDescriptor {
86 pub name: String,
87 pub model: CollectionModel,
88 pub schema_mode: SchemaMode,
89 pub contract_present: bool,
90 pub contract_origin: Option<crate::physical::ContractOrigin>,
91 pub declared_model: Option<CollectionModel>,
92 pub observed_model: CollectionModel,
93 pub queue_mode: Option<QueueMode>,
94 pub vector_dimension: Option<usize>,
95 pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
96 pub declared_schema_mode: Option<SchemaMode>,
97 pub observed_schema_mode: SchemaMode,
98 pub entities: usize,
99 pub cross_refs: usize,
100 pub segments: usize,
101 pub indices: Vec<String>,
102 pub declared_indices: Vec<String>,
103 pub operational_indices: Vec<String>,
104 pub indexes_in_sync: bool,
105 pub missing_operational_indices: Vec<String>,
106 pub undeclared_operational_indices: Vec<String>,
107 pub queryable_index_count: usize,
108 pub indexes_requiring_rebuild_count: usize,
109 pub queryable_graph_projection_count: usize,
110 pub graph_projections_requiring_rematerialization_count: usize,
111 pub executable_analytics_job_count: usize,
112 pub analytics_jobs_requiring_rerun_count: usize,
113 pub subscriptions: Vec<SubscriptionDescriptor>,
114 pub resources_in_sync: bool,
115 pub attention_required: bool,
116 pub attention_score: usize,
117 pub attention_reasons: Vec<String>,
118}
119
120#[derive(Debug, Clone)]
121pub struct CatalogModelSnapshot {
122 pub summary: CatalogSnapshot,
123 pub collections: Vec<CollectionDescriptor>,
124 pub indices: IndexCatalogSnapshot,
125 pub declared_indexes: Vec<PhysicalIndexState>,
126 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
127 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
128 pub operational_indexes: Vec<PhysicalIndexState>,
129 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
130 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
131 pub index_statuses: Vec<CatalogIndexStatus>,
132 pub graph_projection_statuses: Vec<CatalogGraphProjectionStatus>,
133 pub analytics_job_statuses: Vec<CatalogAnalyticsJobStatus>,
134 pub queryable_index_count: usize,
135 pub indexes_requiring_rebuild_count: usize,
136 pub queryable_graph_projection_count: usize,
137 pub graph_projections_requiring_rematerialization_count: usize,
138 pub executable_analytics_job_count: usize,
139 pub analytics_jobs_requiring_rerun_count: usize,
140}
141
142#[derive(Debug, Clone)]
143pub struct CatalogAttentionSummary {
144 pub collections_requiring_attention: usize,
145 pub indexes_requiring_attention: usize,
146 pub graph_projections_requiring_attention: usize,
147 pub analytics_jobs_requiring_attention: usize,
148 pub top_collection: Option<CollectionDescriptor>,
149 pub top_index: Option<CatalogIndexStatus>,
150 pub top_graph_projection: Option<CatalogGraphProjectionStatus>,
151 pub top_analytics_job: Option<CatalogAnalyticsJobStatus>,
152}
153
154#[derive(Debug, Clone, Default)]
155pub struct CatalogDeclarations {
156 pub declared_indexes: Vec<PhysicalIndexState>,
157 pub declared_graph_projections: Vec<PhysicalGraphProjection>,
158 pub declared_analytics_jobs: Vec<PhysicalAnalyticsJob>,
159 pub operational_indexes: Vec<PhysicalIndexState>,
160 pub operational_graph_projections: Vec<PhysicalGraphProjection>,
161 pub operational_analytics_jobs: Vec<PhysicalAnalyticsJob>,
162}
163
/// Merged declared/operational status of one graph projection.
///
/// Field notes describe how `graph_projection_statuses` in this module fills
/// the struct.
#[derive(Clone, Debug)]
pub struct CatalogGraphProjectionStatus {
    /// Projection name.
    pub name: String,
    /// Source of the projection, taken from the declared entry when present,
    /// otherwise from the operational one.
    pub source: Option<String>,
    /// Derived lifecycle label such as `"materialized"`, `"stale"`,
    /// `"declared"` or `"orphaned-operational"`.
    pub lifecycle_state: String,
    /// Present on the declared side.
    pub declared: bool,
    /// Present on the operational side.
    pub operational: bool,
    /// True when declared and operational presence agree.
    pub in_sync: bool,
    /// Last materialized sequence from the declared entry, falling back to the
    /// operational entry.
    pub last_materialized_sequence: Option<u64>,
    /// Declared, operational and in state `"materialized"`.
    pub queryable: bool,
    /// State is `"declared"`, `"materializing"`, `"failed"` or `"stale"`.
    pub requires_rematerialization: bool,
    /// Distinct analytics jobs (by id) that reference this projection.
    pub dependent_job_count: usize,
    /// Dependent jobs in state `"queued"`, `"running"` or `"completed"`.
    pub active_dependent_job_count: usize,
    /// Dependent jobs in state `"stale"`.
    pub stale_dependent_job_count: usize,
    /// True when no dependent job is stale.
    pub dependent_jobs_in_sync: bool,
    /// True when at least one dependent job is stale.
    pub rerun_required: bool,
    /// Weighted score derived from the flags above.
    pub attention_score: usize,
    /// Machine-readable reason tags behind the score.
    pub attention_reasons: Vec<String>,
}
183
184#[derive(Debug, Clone)]
185pub struct CatalogIndexStatus {
186 pub name: String,
187 pub collection: Option<String>,
188 pub kind: String,
189 pub declared: bool,
190 pub operational: bool,
191 pub enabled: bool,
192 pub build_state: Option<String>,
193 pub artifact_state: crate::physical::ArtifactState,
194 pub lifecycle_state: String,
195 pub in_sync: bool,
196 pub queryable: bool,
197 pub requires_rebuild: bool,
198 pub attention_score: usize,
199 pub attention_reasons: Vec<String>,
200}
201
/// Merged declared/operational status of one analytics job.
// NOTE(review): the function that fills this struct is cut off in the visible
// part of the file; field notes are limited to what names, types and the
// visible readers of this struct establish — confirm the rest.
#[derive(Clone, Debug)]
pub struct CatalogAnalyticsJobStatus {
    /// Job identifier.
    pub id: String,
    /// Textual job kind.
    pub kind: String,
    /// Name of the graph projection the job refers to, if any. Used to
    /// attribute the job to the collection that sources that projection.
    pub projection: Option<String>,
    /// Job state string.
    pub state: String,
    /// Derived lifecycle label.
    pub lifecycle_state: String,
    /// Present on the declared side.
    pub declared: bool,
    /// Present on the operational side.
    pub operational: bool,
    /// Sync flag; combined with `dependency_in_sync` for per-collection checks.
    pub in_sync: bool,
    /// Sequence of the last run, if any.
    pub last_run_sequence: Option<u64>,
    /// Whether the referenced projection is declared, when known.
    pub projection_declared: Option<bool>,
    /// Whether the referenced projection is operational, when known.
    pub projection_operational: Option<bool>,
    /// Lifecycle label of the referenced projection, when known.
    pub projection_lifecycle_state: Option<String>,
    /// Dependency sync flag; `None` is treated as in sync by per-collection
    /// readiness checks.
    pub dependency_in_sync: Option<bool>,
    /// Counted towards executable job totals when true.
    pub executable: bool,
    /// Counted towards rerun-required totals when true.
    pub requires_rerun: bool,
    /// Attention score for this job.
    pub attention_score: usize,
    /// Machine-readable reason tags behind the score.
    pub attention_reasons: Vec<String>,
}
222
/// Counts and name lists comparing declared against operational resources.
///
/// The default value has all counts at zero and all lists empty.
// NOTE(review): the code that fills this report is not visible in this part of
// the file; field notes follow names and types only — confirm.
#[derive(Clone, Debug, Default)]
pub struct CatalogConsistencyReport {
    /// Number of declared indexes.
    pub declared_index_count: usize,
    /// Number of operational indexes.
    pub operational_index_count: usize,
    /// Number of declared graph projections.
    pub declared_graph_projection_count: usize,
    /// Number of operational graph projections.
    pub operational_graph_projection_count: usize,
    /// Number of declared analytics jobs.
    pub declared_analytics_job_count: usize,
    /// Number of operational analytics jobs.
    pub operational_analytics_job_count: usize,
    /// Indexes listed as missing on the operational side.
    pub missing_operational_indexes: Vec<String>,
    /// Indexes listed as operational but undeclared.
    pub undeclared_operational_indexes: Vec<String>,
    /// Graph projections listed as missing on the operational side.
    pub missing_operational_graph_projections: Vec<String>,
    /// Graph projections listed as operational but undeclared.
    pub undeclared_operational_graph_projections: Vec<String>,
    /// Analytics jobs listed as missing on the operational side.
    pub missing_operational_analytics_jobs: Vec<String>,
    /// Analytics jobs listed as operational but undeclared.
    pub undeclared_operational_analytics_jobs: Vec<String>,
}
238
239pub fn snapshot_store(
240 name: &str,
241 store: &UnifiedStore,
242 index_catalog: Option<&IndexCatalog>,
243) -> CatalogModelSnapshot {
244 snapshot_store_with_declarations(name, store, index_catalog, None, None)
245}
246
247pub fn snapshot_store_with_declarations(
248 name: &str,
249 store: &UnifiedStore,
250 index_catalog: Option<&IndexCatalog>,
251 declarations: Option<&CatalogDeclarations>,
252 contracts: Option<&[CollectionContract]>,
253) -> CatalogModelSnapshot {
254 let index_statuses = index_statuses(declarations);
255 let graph_projection_statuses = graph_projection_statuses(declarations);
256 let analytics_job_statuses = analytics_job_statuses(declarations);
257
258 let mut grouped: HashMap<String, Vec<UnifiedEntity>> = HashMap::new();
259 for (collection, entity) in store.query_all(|_| true) {
260 grouped.entry(collection).or_default().push(entity);
261 }
262 let queue_modes = queue_modes_from_grouped(&grouped);
263
264 let mut stats_by_collection = BTreeMap::new();
265 let mut collections = Vec::new();
266
267 for collection_name in store.list_collections() {
268 let entities = grouped.remove(&collection_name).unwrap_or_default();
269 let inferred_model = infer_model(&entities);
270 let inferred_schema_mode = infer_schema_mode(inferred_model);
271 let contract = collection_contract(&collection_name, contracts);
272 let model = contract
273 .map(|contract| contract.declared_model)
274 .unwrap_or(inferred_model);
275 let cross_refs = entities
276 .iter()
277 .map(|entity| entity.cross_refs().len())
278 .sum();
279 let entity_count = entities.len();
280 let manager_stats = store
281 .get_collection(&collection_name)
282 .map(|manager| manager.stats());
283 let segments = manager_stats
284 .map(|stats| stats.growing_count + stats.sealed_count + stats.archived_count)
285 .unwrap_or(0);
286
287 stats_by_collection.insert(
288 collection_name.clone(),
289 CollectionStats {
290 entities: entity_count,
291 cross_refs,
292 segments,
293 },
294 );
295
296 let declared_indices = declared_indices(&collection_name, declarations);
297 let operational_indices = operational_indices(&collection_name, index_catalog);
298 let (indexes_in_sync, missing_operational_indices, undeclared_operational_indices) =
299 collection_index_consistency(&declared_indices, &operational_indices);
300 let (queryable_index_count, indexes_requiring_rebuild_count, indexes_locally_in_sync) =
301 collection_index_readiness(&collection_name, &index_statuses);
302 let (
303 queryable_graph_projection_count,
304 graph_projections_requiring_rematerialization_count,
305 graph_projections_locally_in_sync,
306 ) = collection_graph_projection_readiness(&collection_name, &graph_projection_statuses);
307 let (
308 executable_analytics_job_count,
309 analytics_jobs_requiring_rerun_count,
310 analytics_jobs_locally_in_sync,
311 ) = collection_analytics_job_readiness(
312 &collection_name,
313 &graph_projection_statuses,
314 &analytics_job_statuses,
315 );
316 let resources_in_sync = indexes_in_sync
317 && indexes_locally_in_sync
318 && graph_projections_locally_in_sync
319 && analytics_jobs_locally_in_sync;
320 let attention_required = !resources_in_sync
321 || indexes_requiring_rebuild_count > 0
322 || graph_projections_requiring_rematerialization_count > 0
323 || analytics_jobs_requiring_rerun_count > 0;
324 let mut attention_reasons = Vec::new();
325 if !indexes_in_sync || !indexes_locally_in_sync {
326 attention_reasons.push("index_drift".to_string());
327 }
328 if indexes_requiring_rebuild_count > 0 {
329 attention_reasons.push("indexes_require_rebuild".to_string());
330 }
331 if !graph_projections_locally_in_sync {
332 attention_reasons.push("graph_projection_drift".to_string());
333 }
334 if graph_projections_requiring_rematerialization_count > 0 {
335 attention_reasons.push("graph_projections_require_rematerialization".to_string());
336 }
337 if !analytics_jobs_locally_in_sync {
338 attention_reasons.push("analytics_job_drift".to_string());
339 }
340 if analytics_jobs_requiring_rerun_count > 0 {
341 attention_reasons.push("analytics_jobs_require_rerun".to_string());
342 }
343 let attention_score = indexes_requiring_rebuild_count.saturating_mul(3)
344 + graph_projections_requiring_rematerialization_count.saturating_mul(4)
345 + analytics_jobs_requiring_rerun_count.saturating_mul(2)
346 + usize::from(!resources_in_sync);
347
348 collections.push(CollectionDescriptor {
349 name: collection_name.clone(),
350 model,
351 schema_mode: contract
352 .map(|contract| contract.schema_mode)
353 .unwrap_or(inferred_schema_mode),
354 contract_present: contract.is_some(),
355 contract_origin: contract.map(|contract| contract.origin),
356 declared_model: contract.map(|contract| contract.declared_model),
357 observed_model: inferred_model,
358 queue_mode: if model == CollectionModel::Queue {
359 Some(
360 queue_modes
361 .get(&collection_name)
362 .copied()
363 .unwrap_or_default(),
364 )
365 } else {
366 None
367 },
368 vector_dimension: contract.and_then(|contract| contract.vector_dimension),
369 vector_metric: contract.and_then(|contract| contract.vector_metric),
370 declared_schema_mode: contract.map(|contract| contract.schema_mode),
371 observed_schema_mode: inferred_schema_mode,
372 entities: entity_count,
373 cross_refs,
374 segments,
375 indices: infer_indices(&collection_name, model, index_catalog, declarations),
376 declared_indices,
377 operational_indices,
378 indexes_in_sync,
379 missing_operational_indices,
380 undeclared_operational_indices,
381 queryable_index_count,
382 indexes_requiring_rebuild_count,
383 queryable_graph_projection_count,
384 graph_projections_requiring_rematerialization_count,
385 executable_analytics_job_count,
386 analytics_jobs_requiring_rerun_count,
387 subscriptions: contract
388 .map(|contract| contract.subscriptions.clone())
389 .unwrap_or_default(),
390 resources_in_sync,
391 attention_required,
392 attention_score,
393 attention_reasons,
394 });
395 }
396
397 collections.sort_by(|left, right| left.name.cmp(&right.name));
398
399 let summary = CatalogSnapshot {
400 name: name.to_string(),
401 total_entities: stats_by_collection
402 .values()
403 .map(|stats| stats.entities)
404 .sum(),
405 total_collections: stats_by_collection.len(),
406 stats_by_collection,
407 updated_at: SystemTime::now(),
408 };
409
410 CatalogModelSnapshot {
411 summary,
412 collections,
413 indices: index_catalog
414 .map(IndexCatalog::snapshot)
415 .unwrap_or_default(),
416 declared_indexes: declarations
417 .map(|declarations| declarations.declared_indexes.clone())
418 .unwrap_or_default(),
419 declared_graph_projections: declarations
420 .map(|declarations| declarations.declared_graph_projections.clone())
421 .unwrap_or_default(),
422 declared_analytics_jobs: declarations
423 .map(|declarations| declarations.declared_analytics_jobs.clone())
424 .unwrap_or_default(),
425 operational_indexes: declarations
426 .map(|declarations| declarations.operational_indexes.clone())
427 .unwrap_or_default(),
428 operational_graph_projections: declarations
429 .map(|declarations| declarations.operational_graph_projections.clone())
430 .unwrap_or_default(),
431 operational_analytics_jobs: declarations
432 .map(|declarations| declarations.operational_analytics_jobs.clone())
433 .unwrap_or_default(),
434 queryable_index_count: index_statuses
435 .iter()
436 .filter(|status| status.queryable)
437 .count(),
438 indexes_requiring_rebuild_count: index_statuses
439 .iter()
440 .filter(|status| status.requires_rebuild)
441 .count(),
442 queryable_graph_projection_count: graph_projection_statuses
443 .iter()
444 .filter(|status| status.queryable)
445 .count(),
446 graph_projections_requiring_rematerialization_count: graph_projection_statuses
447 .iter()
448 .filter(|status| status.requires_rematerialization)
449 .count(),
450 executable_analytics_job_count: analytics_job_statuses
451 .iter()
452 .filter(|status| status.executable)
453 .count(),
454 analytics_jobs_requiring_rerun_count: analytics_job_statuses
455 .iter()
456 .filter(|status| status.requires_rerun)
457 .count(),
458 index_statuses,
459 graph_projection_statuses,
460 analytics_job_statuses,
461 }
462}
463
464fn queue_modes_from_grouped(
465 grouped: &HashMap<String, Vec<UnifiedEntity>>,
466) -> HashMap<String, QueueMode> {
467 let Some(meta) = grouped.get("red_queue_meta") else {
468 return HashMap::new();
469 };
470
471 let mut modes = HashMap::new();
472 for entity in meta {
473 let Some(row) = entity.data.as_row() else {
474 continue;
475 };
476 if row_text(row, "kind").as_deref() != Some("queue_config") {
477 continue;
478 }
479 let Some(queue) = row_text(row, "queue") else {
480 continue;
481 };
482 let mode = row_text(row, "mode")
483 .as_deref()
484 .and_then(QueueMode::parse)
485 .unwrap_or_default();
486 modes.insert(queue, mode);
487 }
488 modes
489}
490
491fn row_text(row: &crate::storage::unified::entity::RowData, key: &str) -> Option<String> {
492 match row.get_field(key) {
493 Some(Value::Text(value)) => Some(value.to_string()),
494 _ => None,
495 }
496}
497
498fn collection_contract<'a>(
499 collection_name: &str,
500 contracts: Option<&'a [CollectionContract]>,
501) -> Option<&'a CollectionContract> {
502 contracts.and_then(|contracts| {
503 contracts
504 .iter()
505 .find(|contract| contract.name == collection_name)
506 })
507}
508
509fn infer_model(entities: &[UnifiedEntity]) -> CollectionModel {
510 let mut has_table = false;
511 let mut has_graph = false;
512 let mut has_vector = false;
513
514 for entity in entities {
515 match &entity.kind {
516 EntityKind::TableRow { .. } => has_table = true,
517 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_) => has_graph = true,
518 EntityKind::Vector { .. } => has_vector = true,
519 EntityKind::TimeSeriesPoint(_) | EntityKind::QueueMessage { .. } => {}
520 }
521 }
522
523 match (has_table, has_graph, has_vector) {
524 (true, false, false) => CollectionModel::Table,
525 (false, true, false) => CollectionModel::Graph,
526 (false, false, true) => CollectionModel::Vector,
527 _ => CollectionModel::Mixed,
528 }
529}
530
531fn infer_schema_mode(model: CollectionModel) -> SchemaMode {
532 match model {
533 CollectionModel::Table => SchemaMode::Strict,
534 CollectionModel::Graph | CollectionModel::Vector => SchemaMode::SemiStructured,
535 CollectionModel::Document
536 | CollectionModel::Hll
537 | CollectionModel::Sketch
538 | CollectionModel::Filter
539 | CollectionModel::Kv
540 | CollectionModel::Config
541 | CollectionModel::Vault
542 | CollectionModel::Mixed => SchemaMode::Dynamic,
543 CollectionModel::TimeSeries | CollectionModel::Metrics => SchemaMode::SemiStructured,
544 CollectionModel::Queue => SchemaMode::Dynamic,
545 }
546}
547
548fn infer_indices(
549 collection_name: &str,
550 model: CollectionModel,
551 index_catalog: Option<&IndexCatalog>,
552 declarations: Option<&CatalogDeclarations>,
553) -> Vec<String> {
554 let declared = declared_indices(collection_name, declarations);
555 if !declared.is_empty() {
556 return declared;
557 }
558
559 let available = index_catalog
560 .map(IndexCatalog::snapshot)
561 .unwrap_or_default();
562 let mut selected = Vec::new();
563
564 for metric in available {
565 let relevant = matches!(
566 (model, metric.kind),
567 (_, IndexKind::BTree)
568 | (CollectionModel::Graph, IndexKind::GraphAdjacency)
569 | (CollectionModel::Vector, IndexKind::VectorHnsw)
570 | (CollectionModel::Vector, IndexKind::VectorInverted)
571 | (CollectionModel::Document, IndexKind::FullText)
572 | (CollectionModel::Document, IndexKind::DocumentPathValue)
573 | (CollectionModel::Kv, IndexKind::Hash)
574 | (CollectionModel::Config, IndexKind::Hash)
575 | (CollectionModel::Vault, IndexKind::Hash)
576 | (CollectionModel::Mixed, _)
577 );
578
579 if relevant && metric.enabled {
580 selected.push(metric.name);
581 }
582 }
583
584 selected
585}
586
587fn declared_indices(
588 collection_name: &str,
589 declarations: Option<&CatalogDeclarations>,
590) -> Vec<String> {
591 let mut selected = declarations
592 .map(|declarations| {
593 declarations
594 .declared_indexes
595 .iter()
596 .filter(|index| index.collection.as_deref() == Some(collection_name))
597 .map(|index| index.name.clone())
598 .collect::<Vec<_>>()
599 })
600 .unwrap_or_default();
601 selected.sort();
602 selected.dedup();
603 selected
604}
605
606fn operational_indices(collection_name: &str, index_catalog: Option<&IndexCatalog>) -> Vec<String> {
607 let mut selected = index_catalog
608 .map(IndexCatalog::snapshot)
609 .unwrap_or_default()
610 .into_iter()
611 .filter(|metric| metric.enabled)
612 .filter_map(|metric| {
613 if metric.name.starts_with(collection_name)
614 || matches!(
615 metric.kind,
616 IndexKind::BTree
617 | IndexKind::GraphAdjacency
618 | IndexKind::VectorHnsw
619 | IndexKind::VectorInverted
620 | IndexKind::FullText
621 | IndexKind::DocumentPathValue
622 | IndexKind::HybridSearch
623 )
624 {
625 Some(metric.name)
626 } else {
627 None
628 }
629 })
630 .collect::<Vec<_>>();
631 selected.sort();
632 selected.dedup();
633 selected
634}
635
/// Compares declared index names against operational index names.
///
/// Returns `(in_sync, missing, undeclared)` where `missing` holds the declared
/// names that are not operational, `undeclared` holds the operational names
/// that are not declared, and `in_sync` is true when both lists are empty.
/// Both lists are sorted and free of duplicates.
fn collection_index_consistency(
    declared_indices: &[String],
    operational_indices: &[String],
) -> (bool, Vec<String>, Vec<String>) {
    let declared: BTreeSet<&str> = declared_indices.iter().map(String::as_str).collect();
    let operational: BTreeSet<&str> = operational_indices.iter().map(String::as_str).collect();

    let missing: Vec<String> = declared
        .difference(&operational)
        .map(|name| (*name).to_owned())
        .collect();
    let undeclared: Vec<String> = operational
        .difference(&declared)
        .map(|name| (*name).to_owned())
        .collect();

    let in_sync = missing.is_empty() && undeclared.is_empty();
    (in_sync, missing, undeclared)
}
656
657fn collection_index_readiness(
658 collection_name: &str,
659 statuses: &[CatalogIndexStatus],
660) -> (usize, usize, bool) {
661 let local = statuses
662 .iter()
663 .filter(|status| status.collection.as_deref() == Some(collection_name))
664 .collect::<Vec<_>>();
665 let queryable_count = local.iter().filter(|status| status.queryable).count();
666 let requires_rebuild_count = local
667 .iter()
668 .filter(|status| status.requires_rebuild)
669 .count();
670 let locally_in_sync = local.iter().all(|status| status.in_sync);
671 (queryable_count, requires_rebuild_count, locally_in_sync)
672}
673
674fn collection_graph_projection_readiness(
675 collection_name: &str,
676 statuses: &[CatalogGraphProjectionStatus],
677) -> (usize, usize, bool) {
678 let local = statuses
679 .iter()
680 .filter(|status| status.source.as_deref() == Some(collection_name))
681 .collect::<Vec<_>>();
682 let queryable_count = local.iter().filter(|status| status.queryable).count();
683 let requires_rematerialization_count = local
684 .iter()
685 .filter(|status| status.requires_rematerialization)
686 .count();
687 let locally_in_sync = local
688 .iter()
689 .all(|status| status.in_sync && status.dependent_jobs_in_sync);
690 (
691 queryable_count,
692 requires_rematerialization_count,
693 locally_in_sync,
694 )
695}
696
697fn collection_analytics_job_readiness(
698 collection_name: &str,
699 projection_statuses: &[CatalogGraphProjectionStatus],
700 job_statuses: &[CatalogAnalyticsJobStatus],
701) -> (usize, usize, bool) {
702 let local = job_statuses
703 .iter()
704 .filter(|status| {
705 let Some(projection_name) = status.projection.as_deref() else {
706 return false;
707 };
708 projection_statuses
709 .iter()
710 .find(|projection| projection.name == projection_name)
711 .and_then(|projection| projection.source.as_deref())
712 == Some(collection_name)
713 })
714 .collect::<Vec<_>>();
715 let executable_count = local.iter().filter(|status| status.executable).count();
716 let requires_rerun_count = local.iter().filter(|status| status.requires_rerun).count();
717 let locally_in_sync = local
718 .iter()
719 .all(|status| status.in_sync && status.dependency_in_sync.unwrap_or(true));
720 (executable_count, requires_rerun_count, locally_in_sync)
721}
722
723fn graph_projection_statuses(
724 declarations: Option<&CatalogDeclarations>,
725) -> Vec<CatalogGraphProjectionStatus> {
726 let declared = declarations
727 .map(|declarations| declarations.declared_graph_projections.as_slice())
728 .unwrap_or(&[]);
729 let operational = declarations
730 .map(|declarations| declarations.operational_graph_projections.as_slice())
731 .unwrap_or(&[]);
732 let declared_jobs = declarations
733 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
734 .unwrap_or(&[]);
735 let operational_jobs = declarations
736 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
737 .unwrap_or(&[]);
738
739 let mut names = declared
740 .iter()
741 .map(|projection| projection.name.clone())
742 .chain(operational.iter().map(|projection| projection.name.clone()))
743 .collect::<Vec<_>>();
744 names.sort();
745 names.dedup();
746
747 names
748 .into_iter()
749 .map(|name| {
750 let declared_projection = declared.iter().find(|projection| projection.name == name);
751 let operational_projection = operational
752 .iter()
753 .find(|projection| projection.name == name);
754 let declared_present = declared_projection.is_some();
755 let operational_present = operational_projection.is_some();
756 let mut dependent_job_ids = BTreeSet::new();
757 let mut dependent_job_count = 0usize;
758 let mut active_dependent_job_count = 0usize;
759 let mut stale_dependent_job_count = 0usize;
760 for job in declared_jobs
761 .iter()
762 .chain(operational_jobs.iter())
763 .filter(|job| job.projection.as_deref() == Some(name.as_str()))
764 {
765 if !dependent_job_ids.insert(job.id.clone()) {
766 continue;
767 }
768 dependent_job_count += 1;
769 if matches!(job.state.as_str(), "queued" | "running" | "completed") {
770 active_dependent_job_count += 1;
771 }
772 if job.state == "stale" {
773 stale_dependent_job_count += 1;
774 }
775 }
776 let lifecycle_state = match (
777 declared_present,
778 operational_present,
779 declared_projection
780 .map(|projection| projection.state.as_str())
781 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
782 .unwrap_or_default(),
783 ) {
784 (true, _, "failed") => "failed",
785 (true, true, "stale") => "stale",
786 (true, _, "materializing") => "materializing",
787 (true, true, "materialized") => "materialized",
788 (true, true, _) => "materialized",
789 (false, true, _) => "orphaned-operational",
790 (true, false, _) => "declared",
791 (false, false, _) => "unknown",
792 }
793 .to_string();
794 let queryable = declared_present
795 && operational_present
796 && matches!(
797 declared_projection
798 .map(|projection| projection.state.as_str())
799 .or_else(
800 || operational_projection.map(|projection| projection.state.as_str())
801 )
802 .unwrap_or_default(),
803 "materialized"
804 );
805 let requires_rematerialization = matches!(
806 declared_projection
807 .map(|projection| projection.state.as_str())
808 .or_else(|| operational_projection.map(|projection| projection.state.as_str()))
809 .unwrap_or_default(),
810 "declared" | "materializing" | "failed" | "stale"
811 );
812 let mut attention_reasons = Vec::new();
813 if !declared_present || !operational_present {
814 attention_reasons.push("declaration_drift".to_string());
815 }
816 if requires_rematerialization {
817 attention_reasons.push("requires_rematerialization".to_string());
818 }
819 if stale_dependent_job_count > 0 {
820 attention_reasons.push("dependent_jobs_stale".to_string());
821 }
822 let attention_score = stale_dependent_job_count.saturating_mul(2)
823 + usize::from(requires_rematerialization).saturating_mul(4)
824 + usize::from(!declared_present || !operational_present)
825 + usize::from(!queryable);
826 CatalogGraphProjectionStatus {
827 name,
828 source: declared_projection
829 .map(|projection| projection.source.clone())
830 .or_else(|| operational_projection.map(|projection| projection.source.clone())),
831 lifecycle_state,
832 declared: declared_present,
833 operational: operational_present,
834 in_sync: declared_present == operational_present,
835 last_materialized_sequence: declared_projection
836 .and_then(|projection| projection.last_materialized_sequence)
837 .or_else(|| {
838 operational_projection
839 .and_then(|projection| projection.last_materialized_sequence)
840 }),
841 queryable,
842 requires_rematerialization,
843 dependent_job_count,
844 active_dependent_job_count,
845 stale_dependent_job_count,
846 dependent_jobs_in_sync: stale_dependent_job_count == 0,
847 rerun_required: stale_dependent_job_count > 0,
848 attention_score,
849 attention_reasons,
850 }
851 })
852 .collect()
853}
854
855fn index_statuses(declarations: Option<&CatalogDeclarations>) -> Vec<CatalogIndexStatus> {
856 let declared = declarations
857 .map(|declarations| declarations.declared_indexes.as_slice())
858 .unwrap_or(&[]);
859 let operational = declarations
860 .map(|declarations| declarations.operational_indexes.as_slice())
861 .unwrap_or(&[]);
862
863 let mut names = declared
864 .iter()
865 .map(|index| index.name.clone())
866 .chain(operational.iter().map(|index| index.name.clone()))
867 .collect::<Vec<_>>();
868 names.sort();
869 names.dedup();
870
871 names
872 .into_iter()
873 .map(|name| {
874 let declared_index = declared.iter().find(|index| index.name == name);
875 let operational_index = operational.iter().find(|index| index.name == name);
876 let collection = declared_index
877 .and_then(|index| index.collection.clone())
878 .or_else(|| operational_index.and_then(|index| index.collection.clone()));
879 let kind = declared_index
880 .map(|index| index_kind_string(index.kind))
881 .or_else(|| operational_index.map(|index| index_kind_string(index.kind)))
882 .unwrap_or_default();
883 let enabled = declared_index
884 .map(|index| index.enabled)
885 .or_else(|| operational_index.map(|index| index.enabled))
886 .unwrap_or(false);
887 let build_state = operational_index
888 .map(|index| index.build_state.clone())
889 .or_else(|| declared_index.map(|index| index.build_state.clone()));
890 let declared_present = declared_index.is_some();
891 let operational_present = operational_index.is_some();
892 let lifecycle_state = index_lifecycle_state(
893 declared_present,
894 operational_present,
895 enabled,
896 build_state.as_deref(),
897 );
898
899 let mut attention_reasons = Vec::new();
900 if declared_present != operational_present {
901 attention_reasons.push("declaration_drift".to_string());
902 }
903 if !enabled && declared_present {
904 attention_reasons.push("disabled".to_string());
905 }
906 if matches!(build_state.as_deref().unwrap_or_default(), "failed") {
907 attention_reasons.push("failed".to_string());
908 }
909 if matches!(build_state.as_deref().unwrap_or_default(), "stale") {
910 attention_reasons.push("stale".to_string());
911 }
912 if matches!(
913 build_state.as_deref().unwrap_or_default(),
914 "building" | "declared-unbuilt"
915 ) {
916 attention_reasons.push("requires_rebuild".to_string());
917 }
918 let queryable = declared_present
919 && operational_present
920 && enabled
921 && matches!(build_state.as_deref().unwrap_or_default(), "ready");
922 let requires_rebuild = matches!(
923 build_state.as_deref().unwrap_or_default(),
924 "declared-unbuilt" | "building" | "stale" | "failed"
925 );
926 let attention_score = usize::from(requires_rebuild).saturating_mul(3)
927 + usize::from(!enabled && declared_present)
928 + usize::from(declared_present != operational_present)
929 + usize::from(!queryable);
930
931 let artifact_state = crate::physical::ArtifactState::from_build_state(
932 build_state.as_deref().unwrap_or("declared-unbuilt"),
933 enabled,
934 );
935 CatalogIndexStatus {
936 name,
937 collection,
938 kind,
939 declared: declared_present,
940 operational: operational_present,
941 enabled,
942 build_state,
943 artifact_state,
944 lifecycle_state,
945 in_sync: declared_present == operational_present,
946 queryable,
947 requires_rebuild,
948 attention_score,
949 attention_reasons,
950 }
951 })
952 .collect()
953}
954
/// Derives the lifecycle label of an index.
///
/// Precedence, highest first:
/// 1. operational but not declared -> `"orphaned-operational"`
/// 2. declared but disabled -> `"disabled"`
/// 3. neither declared nor operational -> `"unknown"`
/// 4. declared but not operational -> `"declared"`
/// 5. otherwise the build state decides: `"ready"`, `"failed"` and `"stale"`
///    map to themselves, `"declared-unbuilt"` maps to `"declared"`, and any
///    other value (including `"catalog-derived"`, `"metadata-only"`,
///    `"artifact-published"`, `"registry-loaded"` and a missing state) maps
///    to `"building"`.
fn index_lifecycle_state(
    declared: bool,
    operational: bool,
    enabled: bool,
    build_state: Option<&str>,
) -> String {
    let label = match (declared, operational) {
        (false, true) => "orphaned-operational",
        (false, false) => "unknown",
        (true, _) if !enabled => "disabled",
        (true, false) => "declared",
        (true, true) => match build_state.unwrap_or_default() {
            "ready" => "ready",
            "failed" => "failed",
            "stale" => "stale",
            "declared-unbuilt" => "declared",
            _ => "building",
        },
    };
    label.to_string()
}
985
986fn index_kind_string(kind: IndexKind) -> String {
987 match kind {
988 IndexKind::BTree => "btree",
989 IndexKind::Hash => "hash",
990 IndexKind::Bitmap => "bitmap",
991 IndexKind::Spatial => "spatial.rtree",
992 IndexKind::VectorHnsw => "vector.hnsw",
993 IndexKind::VectorInverted => "vector.inverted",
994 IndexKind::GraphAdjacency => "graph.adjacency",
995 IndexKind::FullText => "text.fulltext",
996 IndexKind::DocumentPathValue => "document.pathvalue",
997 IndexKind::HybridSearch => "search.hybrid",
998 }
999 .to_string()
1000}
1001
1002fn analytics_job_statuses(
1003 declarations: Option<&CatalogDeclarations>,
1004) -> Vec<CatalogAnalyticsJobStatus> {
1005 let declared = declarations
1006 .map(|declarations| declarations.declared_analytics_jobs.as_slice())
1007 .unwrap_or(&[]);
1008 let operational = declarations
1009 .map(|declarations| declarations.operational_analytics_jobs.as_slice())
1010 .unwrap_or(&[]);
1011 let declared_projections = declarations
1012 .map(|declarations| declarations.declared_graph_projections.as_slice())
1013 .unwrap_or(&[]);
1014 let operational_projections = declarations
1015 .map(|declarations| declarations.operational_graph_projections.as_slice())
1016 .unwrap_or(&[]);
1017
1018 let mut ids = declared
1019 .iter()
1020 .map(|job| job.id.clone())
1021 .chain(operational.iter().map(|job| job.id.clone()))
1022 .collect::<Vec<_>>();
1023 ids.sort();
1024 ids.dedup();
1025
1026 ids.into_iter()
1027 .map(|id| {
1028 let declared_job = declared.iter().find(|job| job.id == id);
1029 let operational_job = operational.iter().find(|job| job.id == id);
1030 let kind = declared_job
1031 .map(|job| job.kind.clone())
1032 .or_else(|| operational_job.map(|job| job.kind.clone()))
1033 .unwrap_or_default();
1034 let projection = declared_job
1035 .and_then(|job| job.projection.clone())
1036 .or_else(|| operational_job.and_then(|job| job.projection.clone()));
1037 let state = declared_job
1038 .map(|job| job.state.clone())
1039 .or_else(|| operational_job.map(|job| job.state.clone()))
1040 .unwrap_or_default();
1041 let declared_present = declared_job.is_some();
1042 let operational_present = operational_job.is_some();
1043 let last_run_sequence = declared_job
1044 .and_then(|job| job.last_run_sequence)
1045 .or_else(|| operational_job.and_then(|job| job.last_run_sequence));
1046 let projection_declared = projection.as_ref().map(|projection_name| {
1047 declared_projections
1048 .iter()
1049 .any(|projection| projection.name == *projection_name)
1050 });
1051 let projection_operational = projection.as_ref().map(|projection_name| {
1052 operational_projections
1053 .iter()
1054 .any(|projection| projection.name == *projection_name)
1055 });
1056 let projection_lifecycle = projection.as_ref().and_then(|projection_name| {
1057 declared_projections
1058 .iter()
1059 .find(|projection| projection.name == *projection_name)
1060 .map(|projection| projection.state.as_str())
1061 .or_else(|| {
1062 operational_projections
1063 .iter()
1064 .find(|projection| projection.name == *projection_name)
1065 .map(|projection| projection.state.as_str())
1066 })
1067 });
1068 let dependency_in_sync = projection.as_ref().map(|_| {
1069 matches!(projection_lifecycle, Some("materialized"))
1070 && projection_operational == Some(true)
1071 });
1072 let lifecycle_state = match (declared_present, operational_present, state.as_str()) {
1073 (true, false, _) => "declared",
1074 (true, true, _)
1075 if matches!(
1076 projection_lifecycle,
1077 Some("stale" | "failed" | "materializing" | "declared")
1078 ) =>
1079 {
1080 "stale"
1081 }
1082 (true, true, "completed") => "completed",
1083 (true, true, "running") => "running",
1084 (true, true, "failed") => "failed",
1085 (true, true, "queued") => "queued",
1086 (true, true, "stale") => "stale",
1087 (true, true, _) => "materialized",
1088 (false, true, _) => "orphaned-operational",
1089 (false, false, _) => "unknown",
1090 }
1091 .to_string();
1092 let executable = declared_present
1093 && operational_present
1094 && !matches!(state.as_str(), "failed" | "stale")
1095 && dependency_in_sync.unwrap_or(true);
1096 let requires_rerun =
1097 matches!(state.as_str(), "stale" | "failed") || dependency_in_sync == Some(false);
1098 let mut attention_reasons = Vec::new();
1099 if declared_present != operational_present {
1100 attention_reasons.push("declaration_drift".to_string());
1101 }
1102 if matches!(state.as_str(), "failed") {
1103 attention_reasons.push("failed".to_string());
1104 }
1105 if matches!(state.as_str(), "stale") {
1106 attention_reasons.push("stale".to_string());
1107 }
1108 if dependency_in_sync == Some(false) {
1109 attention_reasons.push("dependency_drift".to_string());
1110 }
1111 if requires_rerun {
1112 attention_reasons.push("requires_rerun".to_string());
1113 }
1114 let attention_score = usize::from(requires_rerun).saturating_mul(3)
1115 + usize::from(dependency_in_sync == Some(false)).saturating_mul(2)
1116 + usize::from(declared_present != operational_present)
1117 + usize::from(!executable);
1118 CatalogAnalyticsJobStatus {
1119 id,
1120 kind,
1121 projection,
1122 state: state.clone(),
1123 lifecycle_state,
1124 declared: declared_present,
1125 operational: operational_present,
1126 in_sync: declared_present == operational_present,
1127 last_run_sequence,
1128 projection_declared,
1129 projection_operational,
1130 projection_lifecycle_state: projection_lifecycle.map(str::to_string),
1131 dependency_in_sync,
1132 executable,
1133 requires_rerun,
1134 attention_score,
1135 attention_reasons,
1136 }
1137 })
1138 .collect()
1139}
1140
1141pub fn consistency_report(snapshot: &CatalogModelSnapshot) -> CatalogConsistencyReport {
1142 let declared_indexes = snapshot
1143 .declared_indexes
1144 .iter()
1145 .map(|index| index.name.clone())
1146 .collect::<BTreeSet<_>>();
1147 let operational_indexes = snapshot
1148 .operational_indexes
1149 .iter()
1150 .map(|index| index.name.clone())
1151 .collect::<BTreeSet<_>>();
1152 let declared_graph_projections = snapshot
1153 .declared_graph_projections
1154 .iter()
1155 .map(|projection| projection.name.clone())
1156 .collect::<BTreeSet<_>>();
1157 let operational_graph_projections = snapshot
1158 .operational_graph_projections
1159 .iter()
1160 .map(|projection| projection.name.clone())
1161 .collect::<BTreeSet<_>>();
1162 let declared_analytics_jobs = snapshot
1163 .declared_analytics_jobs
1164 .iter()
1165 .map(|job| job.id.clone())
1166 .collect::<BTreeSet<_>>();
1167 let operational_analytics_jobs = snapshot
1168 .operational_analytics_jobs
1169 .iter()
1170 .map(|job| job.id.clone())
1171 .collect::<BTreeSet<_>>();
1172
1173 CatalogConsistencyReport {
1174 declared_index_count: declared_indexes.len(),
1175 operational_index_count: operational_indexes.len(),
1176 declared_graph_projection_count: declared_graph_projections.len(),
1177 operational_graph_projection_count: operational_graph_projections.len(),
1178 declared_analytics_job_count: declared_analytics_jobs.len(),
1179 operational_analytics_job_count: operational_analytics_jobs.len(),
1180 missing_operational_indexes: declared_indexes
1181 .difference(&operational_indexes)
1182 .cloned()
1183 .collect(),
1184 undeclared_operational_indexes: operational_indexes
1185 .difference(&declared_indexes)
1186 .cloned()
1187 .collect(),
1188 missing_operational_graph_projections: declared_graph_projections
1189 .difference(&operational_graph_projections)
1190 .cloned()
1191 .collect(),
1192 undeclared_operational_graph_projections: operational_graph_projections
1193 .difference(&declared_graph_projections)
1194 .cloned()
1195 .collect(),
1196 missing_operational_analytics_jobs: declared_analytics_jobs
1197 .difference(&operational_analytics_jobs)
1198 .cloned()
1199 .collect(),
1200 undeclared_operational_analytics_jobs: operational_analytics_jobs
1201 .difference(&declared_analytics_jobs)
1202 .cloned()
1203 .collect(),
1204 }
1205}
1206
1207pub fn attention_summary(snapshot: &CatalogModelSnapshot) -> CatalogAttentionSummary {
1208 CatalogAttentionSummary {
1209 collections_requiring_attention: snapshot
1210 .collections
1211 .iter()
1212 .filter(|collection| collection.attention_required)
1213 .count(),
1214 indexes_requiring_attention: snapshot
1215 .index_statuses
1216 .iter()
1217 .filter(|status| status.attention_score > 0)
1218 .count(),
1219 graph_projections_requiring_attention: snapshot
1220 .graph_projection_statuses
1221 .iter()
1222 .filter(|status| status.attention_score > 0)
1223 .count(),
1224 analytics_jobs_requiring_attention: snapshot
1225 .analytics_job_statuses
1226 .iter()
1227 .filter(|status| status.attention_score > 0)
1228 .count(),
1229 top_collection: snapshot
1230 .collections
1231 .iter()
1232 .filter(|collection| collection.attention_score > 0)
1233 .max_by_key(|collection| collection.attention_score)
1234 .cloned(),
1235 top_index: snapshot
1236 .index_statuses
1237 .iter()
1238 .filter(|status| status.attention_score > 0)
1239 .max_by_key(|status| status.attention_score)
1240 .cloned(),
1241 top_graph_projection: snapshot
1242 .graph_projection_statuses
1243 .iter()
1244 .filter(|status| status.attention_score > 0)
1245 .max_by_key(|status| status.attention_score)
1246 .cloned(),
1247 top_analytics_job: snapshot
1248 .analytics_job_statuses
1249 .iter()
1250 .filter(|status| status.attention_score > 0)
1251 .max_by_key(|status| status.attention_score)
1252 .cloned(),
1253 }
1254}
1255
1256pub fn collection_attention(snapshot: &CatalogModelSnapshot) -> Vec<CollectionDescriptor> {
1257 let mut collections = snapshot
1258 .collections
1259 .iter()
1260 .filter(|collection| collection.attention_required)
1261 .cloned()
1262 .collect::<Vec<_>>();
1263 collections.sort_by(|left, right| {
1264 right
1265 .attention_score
1266 .cmp(&left.attention_score)
1267 .then_with(|| left.name.cmp(&right.name))
1268 });
1269 collections
1270}
1271
1272pub fn index_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogIndexStatus> {
1273 let mut statuses = snapshot
1274 .index_statuses
1275 .iter()
1276 .filter(|status| status.attention_score > 0)
1277 .cloned()
1278 .collect::<Vec<_>>();
1279 statuses.sort_by(|left, right| {
1280 right
1281 .attention_score
1282 .cmp(&left.attention_score)
1283 .then_with(|| left.name.cmp(&right.name))
1284 });
1285 statuses
1286}
1287
1288pub fn graph_projection_attention(
1289 snapshot: &CatalogModelSnapshot,
1290) -> Vec<CatalogGraphProjectionStatus> {
1291 let mut statuses = snapshot
1292 .graph_projection_statuses
1293 .iter()
1294 .filter(|status| status.attention_score > 0)
1295 .cloned()
1296 .collect::<Vec<_>>();
1297 statuses.sort_by(|left, right| {
1298 right
1299 .attention_score
1300 .cmp(&left.attention_score)
1301 .then_with(|| left.name.cmp(&right.name))
1302 });
1303 statuses
1304}
1305
1306pub fn analytics_job_attention(snapshot: &CatalogModelSnapshot) -> Vec<CatalogAnalyticsJobStatus> {
1307 let mut statuses = snapshot
1308 .analytics_job_statuses
1309 .iter()
1310 .filter(|status| status.attention_score > 0)
1311 .cloned()
1312 .collect::<Vec<_>>();
1313 statuses.sort_by(|left, right| {
1314 right
1315 .attention_score
1316 .cmp(&left.attention_score)
1317 .then_with(|| left.id.cmp(&right.id))
1318 });
1319 statuses
1320}