1use crate::config::DetectConfig;
14use crate::{calibrate, Detector, Report, ScanContext};
15use ax_core::finding::Handle;
16use ax_core::{AnomalyClass, ColType, Finding, RecordSet};
17
/// Zero-sized detector type for schema/structure checks.
///
/// It carries no state, so `SchemaDetector`, `SchemaDetector::default()` and a
/// clone of either are interchangeable.
#[derive(Clone, Debug, Default)]
pub struct SchemaDetector;
20
21impl SchemaDetector {
22 fn col_handle(name: &str) -> Handle {
23 Handle::Column {
24 name: name.to_string(),
25 }
26 }
27
28 fn check_shape(&self, current: &RecordSet, cfg: &DetectConfig, out: &mut Report) {
30 for col in ¤t.columns {
31 if col.ty == ColType::Mixed {
32 out.push(
33 Finding::new(
34 self.id(),
35 AnomalyClass::Structural,
36 Self::col_handle(&col.name),
37 0.7,
38 0.0,
39 format!("column '{}' mixes conflicting cell types", col.name),
40 )
41 .with_col_type(ColType::Mixed),
42 );
43 }
44 if col.is_empty() {
45 continue;
46 }
47 let frac = col.null_count() as f64 / col.len() as f64;
48 if frac > cfg.struct_null_rate {
49 out.push(
50 Finding::new(
51 self.id(),
52 AnomalyClass::Structural,
53 Self::col_handle(&col.name),
54 calibrate::from_exceedance(frac, cfg.struct_null_rate),
55 frac,
56 format!("column '{}' is {:.0}% null", col.name, frac * 100.0),
57 )
58 .with_col_type(col.ty),
59 );
60 }
61 }
62 }
63
64 fn check_schema_diff(&self, current: &RecordSet, baseline: &RecordSet, out: &mut Report) {
66 for col in ¤t.columns {
67 match baseline.column(&col.name) {
68 None => out.push(
69 Finding::new(
70 self.id(),
71 AnomalyClass::Structural,
72 Self::col_handle(&col.name),
73 0.9,
74 1.0,
75 format!("column '{}' is new (absent in baseline)", col.name),
76 )
77 .with_col_type(col.ty),
78 ),
79 Some(bcol) if bcol.ty != col.ty => out.push(
80 Finding::new(
81 self.id(),
82 AnomalyClass::Structural,
83 Self::col_handle(&col.name),
84 0.85,
85 1.0,
86 format!(
87 "column '{}' type changed {:?} → {:?}",
88 col.name, bcol.ty, col.ty
89 ),
90 )
91 .with_col_type(col.ty),
92 ),
93 Some(_) => {}
94 }
95 }
96 for bcol in &baseline.columns {
97 if current.column(&bcol.name).is_none() {
98 out.push(
99 Finding::new(
100 self.id(),
101 AnomalyClass::Structural,
102 Self::col_handle(&bcol.name),
103 0.9,
104 1.0,
105 format!("column '{}' was dropped (present in baseline)", bcol.name),
106 )
107 .with_col_type(bcol.ty),
108 );
109 }
110 }
111 }
112}
113
114impl Detector for SchemaDetector {
115 fn id(&self) -> &'static str {
116 "struct.schema"
117 }
118 fn class(&self) -> AnomalyClass {
119 AnomalyClass::Structural
120 }
121 fn detect(&self, ctx: &ScanContext, cfg: &DetectConfig, out: &mut Report) {
122 self.check_shape(ctx.current, cfg, out);
123 if let Some(baseline) = ctx.baseline {
124 self.check_schema_diff(ctx.current, baseline, out);
125 }
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::{Column, Value};

    /// Wraps `cols` in a `RecordSet` using the fixed "-" / "t" leading arguments.
    fn rs(cols: Vec<Column>) -> RecordSet {
        RecordSet::new("-", "t", cols)
    }

    /// Runs `SchemaDetector` over `ctx` with the default config and returns
    /// everything it reported.
    fn run(ctx: &ScanContext) -> Report {
        let cfg = DetectConfig::default();
        let mut report = Report::new();
        SchemaDetector.detect(ctx, &cfg, &mut report);
        report
    }

    /// Scans a lone column named "x" holding `cells`, with no baseline.
    fn scan_x(cells: Vec<Value>) -> Report {
        let records = rs(vec![Column::new("x", cells)]);
        let ctx = ScanContext::single(&records);
        run(&ctx)
    }

    #[test]
    fn clean_single_corpus_has_no_findings_and_no_absence() {
        let report = scan_x(vec![Value::Int(1), Value::Int(2)]);
        assert!(report.findings.is_empty());
        assert!(report.absent.is_empty(), "structural detector always runs");
    }

    #[test]
    fn mixed_type_column_is_flagged() {
        let cells = vec![Value::Int(1), Value::Str("oops".into()), Value::Bool(true)];
        let report = scan_x(cells);
        assert_eq!(report.findings.len(), 1);
        let only = &report.findings[0];
        assert!(only.reason.contains("conflicting"));
    }

    #[test]
    fn high_null_rate_is_flagged_low_is_not() {
        // 3 of 4 cells null: reported.
        let mostly_null = scan_x(vec![Value::Int(1), Value::Null, Value::Null, Value::Null]);
        assert_eq!(mostly_null.findings.len(), 1);
        assert!(mostly_null.findings[0].score > 0.5);

        // 1 of 4 cells null: quiet.
        let rarely_null = scan_x(vec![
            Value::Int(1),
            Value::Int(2),
            Value::Int(3),
            Value::Null,
        ]);
        assert!(rarely_null.findings.is_empty());

        // 2 of 4 cells null: still quiet under the default config
        // (presumably exactly at the default threshold — confirm in DetectConfig).
        let half_null = scan_x(vec![Value::Int(1), Value::Int(2), Value::Null, Value::Null]);
        assert!(half_null.findings.is_empty());
    }

    #[test]
    fn added_and_dropped_columns_detected() {
        let baseline = rs(vec![
            Column::new("a", vec![Value::Int(1)]),
            Column::new("gone", vec![Value::Int(2)]),
        ]);
        let current = rs(vec![
            Column::new("a", vec![Value::Int(1)]),
            Column::new("fresh", vec![Value::Int(9)]),
        ]);
        let ctx = ScanContext::compared(&baseline, &current);
        let report = run(&ctx);

        // True when any reported reason contains `needle`.
        let mentions = |needle: &str| {
            report
                .findings
                .iter()
                .any(|f| f.reason.as_str().contains(needle))
        };
        assert!(mentions("'fresh' is new"));
        assert!(mentions("'gone' was dropped"));
    }

    #[test]
    fn type_change_detected() {
        let baseline = rs(vec![Column::new("v", vec![Value::Int(1), Value::Int(2)])]);
        let text_cells = vec![Value::Str("a".into()), Value::Str("b".into())];
        let current = rs(vec![Column::new("v", text_cells)]);

        let ctx = ScanContext::compared(&baseline, &current);
        let report = run(&ctx);
        assert_eq!(report.findings.len(), 1);
        let only = &report.findings[0];
        assert!(only.reason.contains("type changed"));
    }

    #[test]
    fn matching_schema_yields_no_schema_findings() {
        let baseline = rs(vec![Column::new("v", vec![Value::Int(1), Value::Int(2)])]);
        let current = rs(vec![Column::new("v", vec![Value::Int(5), Value::Int(6)])]);

        let ctx = ScanContext::compared(&baseline, &current);
        let report = run(&ctx);
        assert!(report.findings.is_empty());
    }
}