term_guard/analyzers/basic/
min_max.rs1use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct MinMaxState {
14 pub min: Option<f64>,
16 pub max: Option<f64>,
18}
19
20impl AnalyzerState for MinMaxState {
21 fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
22 let min = states
23 .iter()
24 .filter_map(|s| s.min)
25 .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
26
27 let max = states
28 .iter()
29 .filter_map(|s| s.max)
30 .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
31
32 Ok(MinMaxState { min, max })
33 }
34
35 fn is_empty(&self) -> bool {
36 self.min.is_none() && self.max.is_none()
37 }
38}
39
/// Analyzer that reports the minimum value of a single column.
#[derive(Debug, Clone)]
pub struct MinAnalyzer {
    /// Name of the column whose minimum is computed.
    column: String,
}
67
68impl MinAnalyzer {
69 pub fn new(column: impl Into<String>) -> Self {
71 Self {
72 column: column.into(),
73 }
74 }
75
76 pub fn column(&self) -> &str {
78 &self.column
79 }
80}
81
82#[async_trait]
83impl Analyzer for MinAnalyzer {
84 type State = MinMaxState;
85 type Metric = MetricValue;
86
87 #[instrument(skip(ctx), fields(analyzer = "min", column = %self.column))]
88 async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
89 let validation_ctx = current_validation_context();
93
94 let table_name = validation_ctx.table_name();
95
96 let sql = format!(
97 "SELECT MIN({0}) as min, MAX({0}) as max FROM {table_name}",
98 self.column
99 );
100
101 let df = ctx.sql(&sql).await?;
103 let batches = df.collect().await?;
104
105 let (min, max) = if let Some(batch) = batches.first() {
107 if batch.num_rows() > 0 {
108 let min = if batch.column(0).is_null(0) {
109 None
110 } else {
111 if let Some(arr) = batch
113 .column(0)
114 .as_any()
115 .downcast_ref::<arrow::array::Float64Array>()
116 {
117 Some(arr.value(0))
118 } else if let Some(arr) = batch
119 .column(0)
120 .as_any()
121 .downcast_ref::<arrow::array::Int64Array>()
122 {
123 Some(arr.value(0) as f64)
124 } else {
125 return Err(AnalyzerError::invalid_data(format!(
126 "Expected numeric array for min, got {:?}",
127 batch.column(0).data_type()
128 )));
129 }
130 };
131
132 let max = if batch.column(1).is_null(0) {
133 None
134 } else {
135 if let Some(arr) = batch
137 .column(1)
138 .as_any()
139 .downcast_ref::<arrow::array::Float64Array>()
140 {
141 Some(arr.value(0))
142 } else if let Some(arr) = batch
143 .column(1)
144 .as_any()
145 .downcast_ref::<arrow::array::Int64Array>()
146 {
147 Some(arr.value(0) as f64)
148 } else {
149 return Err(AnalyzerError::invalid_data(format!(
150 "Expected numeric array for max, got {:?}",
151 batch.column(1).data_type()
152 )));
153 }
154 };
155
156 (min, max)
157 } else {
158 (None, None)
159 }
160 } else {
161 (None, None)
162 };
163
164 Ok(MinMaxState { min, max })
165 }
166
167 fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
168 match state.min {
169 Some(min) => Ok(MetricValue::Double(min)),
170 None => Err(AnalyzerError::NoData),
171 }
172 }
173
174 fn name(&self) -> &str {
175 "min"
176 }
177
178 fn description(&self) -> &str {
179 "Computes the minimum value of a numeric column"
180 }
181
182 fn metric_key(&self) -> String {
183 format!("{}.{}", self.name(), self.column)
184 }
185
186 fn columns(&self) -> Vec<&str> {
187 vec![&self.column]
188 }
189}
190
/// Analyzer that reports the maximum value of a single column.
#[derive(Debug, Clone)]
pub struct MaxAnalyzer {
    /// Name of the column whose maximum is computed.
    column: String,
}
218
219impl MaxAnalyzer {
220 pub fn new(column: impl Into<String>) -> Self {
222 Self {
223 column: column.into(),
224 }
225 }
226
227 pub fn column(&self) -> &str {
229 &self.column
230 }
231}
232
233#[async_trait]
234impl Analyzer for MaxAnalyzer {
235 type State = MinMaxState;
236 type Metric = MetricValue;
237
238 #[instrument(skip(ctx), fields(analyzer = "max", column = %self.column))]
239 async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
240 let validation_ctx = current_validation_context();
242 let table_name = validation_ctx.table_name();
243
244 let sql = format!(
246 "SELECT MIN({0}) as min, MAX({0}) as max FROM {table_name}",
247 self.column
248 );
249
250 let df = ctx.sql(&sql).await?;
252 let batches = df.collect().await?;
253
254 let (min, max) = if let Some(batch) = batches.first() {
256 if batch.num_rows() > 0 {
257 let min = if batch.column(0).is_null(0) {
258 None
259 } else {
260 if let Some(arr) = batch
262 .column(0)
263 .as_any()
264 .downcast_ref::<arrow::array::Float64Array>()
265 {
266 Some(arr.value(0))
267 } else if let Some(arr) = batch
268 .column(0)
269 .as_any()
270 .downcast_ref::<arrow::array::Int64Array>()
271 {
272 Some(arr.value(0) as f64)
273 } else {
274 return Err(AnalyzerError::invalid_data(format!(
275 "Expected numeric array for min, got {:?}",
276 batch.column(0).data_type()
277 )));
278 }
279 };
280
281 let max = if batch.column(1).is_null(0) {
282 None
283 } else {
284 if let Some(arr) = batch
286 .column(1)
287 .as_any()
288 .downcast_ref::<arrow::array::Float64Array>()
289 {
290 Some(arr.value(0))
291 } else if let Some(arr) = batch
292 .column(1)
293 .as_any()
294 .downcast_ref::<arrow::array::Int64Array>()
295 {
296 Some(arr.value(0) as f64)
297 } else {
298 return Err(AnalyzerError::invalid_data(format!(
299 "Expected numeric array for max, got {:?}",
300 batch.column(1).data_type()
301 )));
302 }
303 };
304
305 (min, max)
306 } else {
307 (None, None)
308 }
309 } else {
310 (None, None)
311 };
312
313 Ok(MinMaxState { min, max })
314 }
315
316 fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
317 match state.max {
318 Some(max) => Ok(MetricValue::Double(max)),
319 None => Err(AnalyzerError::NoData),
320 }
321 }
322
323 fn name(&self) -> &str {
324 "max"
325 }
326
327 fn description(&self) -> &str {
328 "Computes the maximum value of a numeric column"
329 }
330
331 fn metric_key(&self) -> String {
332 format!("{}.{}", self.name(), self.column)
333 }
334
335 fn columns(&self) -> Vec<&str> {
336 vec![&self.column]
337 }
338}