term_guard/analyzers/basic/
min_max.rs

1//! Min and Max analyzers for finding extreme values.
2
3use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
11/// Shared state for min/max analyzers.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct MinMaxState {
14    /// Minimum value found.
15    pub min: Option<f64>,
16    /// Maximum value found.
17    pub max: Option<f64>,
18}
19
20impl AnalyzerState for MinMaxState {
21    fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
22        let min = states
23            .iter()
24            .filter_map(|s| s.min)
25            .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
26
27        let max = states
28            .iter()
29            .filter_map(|s| s.max)
30            .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
31
32        Ok(MinMaxState { min, max })
33    }
34
35    fn is_empty(&self) -> bool {
36        self.min.is_none() && self.max.is_none()
37    }
38}
39
/// Analyzer that computes the minimum value of a numeric column.
///
/// The query runs against the table named by the current validation context.
/// It computes both `MIN` and `MAX`, so the resulting [`MinMaxState`] has the
/// same shape as the one produced by [`MaxAnalyzer`]. Only the `min` bound is
/// reported as this analyzer's metric.
///
/// # Example
///
/// ```rust,ignore
/// use term_guard::analyzers::basic::MinAnalyzer;
/// use term_guard::analyzers::{Analyzer, MetricValue};
/// use datafusion::prelude::*;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// // Register your data table under the name expected by the current
/// // validation context.
///
/// let analyzer = MinAnalyzer::new("price");
/// let state = analyzer.compute_state_from_data(&ctx).await?;
/// let metric = analyzer.compute_metric_from_state(&state)?;
///
/// if let MetricValue::Double(min) = metric {
///     println!("Minimum price: ${:.2}", min);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct MinAnalyzer {
    /// Name of the column whose minimum is computed.
    column: String,
}
67
68impl MinAnalyzer {
69    /// Creates a new min analyzer for the specified column.
70    pub fn new(column: impl Into<String>) -> Self {
71        Self {
72            column: column.into(),
73        }
74    }
75
76    /// Returns the column being analyzed.
77    pub fn column(&self) -> &str {
78        &self.column
79    }
80}
81
82#[async_trait]
83impl Analyzer for MinAnalyzer {
84    type State = MinMaxState;
85    type Metric = MetricValue;
86
87    #[instrument(skip(ctx), fields(analyzer = "min", column = %self.column))]
88    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
89        // Build SQL query to compute min
90        // Get the table name from the validation context
91
92        let validation_ctx = current_validation_context();
93
94        let table_name = validation_ctx.table_name();
95
96        let sql = format!(
97            "SELECT MIN({0}) as min, MAX({0}) as max FROM {table_name}",
98            self.column
99        );
100
101        // Execute query
102        let df = ctx.sql(&sql).await?;
103        let batches = df.collect().await?;
104
105        // Extract min/max from result
106        let (min, max) = if let Some(batch) = batches.first() {
107            if batch.num_rows() > 0 {
108                let min = if batch.column(0).is_null(0) {
109                    None
110                } else {
111                    // Try Float64 first, then Int64
112                    if let Some(arr) = batch
113                        .column(0)
114                        .as_any()
115                        .downcast_ref::<arrow::array::Float64Array>()
116                    {
117                        Some(arr.value(0))
118                    } else if let Some(arr) = batch
119                        .column(0)
120                        .as_any()
121                        .downcast_ref::<arrow::array::Int64Array>()
122                    {
123                        Some(arr.value(0) as f64)
124                    } else {
125                        return Err(AnalyzerError::invalid_data(format!(
126                            "Expected numeric array for min, got {:?}",
127                            batch.column(0).data_type()
128                        )));
129                    }
130                };
131
132                let max = if batch.column(1).is_null(0) {
133                    None
134                } else {
135                    // Try Float64 first, then Int64
136                    if let Some(arr) = batch
137                        .column(1)
138                        .as_any()
139                        .downcast_ref::<arrow::array::Float64Array>()
140                    {
141                        Some(arr.value(0))
142                    } else if let Some(arr) = batch
143                        .column(1)
144                        .as_any()
145                        .downcast_ref::<arrow::array::Int64Array>()
146                    {
147                        Some(arr.value(0) as f64)
148                    } else {
149                        return Err(AnalyzerError::invalid_data(format!(
150                            "Expected numeric array for max, got {:?}",
151                            batch.column(1).data_type()
152                        )));
153                    }
154                };
155
156                (min, max)
157            } else {
158                (None, None)
159            }
160        } else {
161            (None, None)
162        };
163
164        Ok(MinMaxState { min, max })
165    }
166
167    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
168        match state.min {
169            Some(min) => Ok(MetricValue::Double(min)),
170            None => Err(AnalyzerError::NoData),
171        }
172    }
173
174    fn name(&self) -> &str {
175        "min"
176    }
177
178    fn description(&self) -> &str {
179        "Computes the minimum value of a numeric column"
180    }
181
182    fn metric_key(&self) -> String {
183        format!("{}.{}", self.name(), self.column)
184    }
185
186    fn columns(&self) -> Vec<&str> {
187        vec![&self.column]
188    }
189}
190
/// Analyzer that computes the maximum value of a numeric column.
///
/// The query runs against the table named by the current validation context.
/// It computes both `MIN` and `MAX`, so the resulting [`MinMaxState`] has the
/// same shape as the one produced by [`MinAnalyzer`]. Only the `max` bound is
/// reported as this analyzer's metric.
///
/// # Example
///
/// ```rust,ignore
/// use term_guard::analyzers::basic::MaxAnalyzer;
/// use term_guard::analyzers::{Analyzer, MetricValue};
/// use datafusion::prelude::*;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// // Register your data table under the name expected by the current
/// // validation context.
///
/// let analyzer = MaxAnalyzer::new("price");
/// let state = analyzer.compute_state_from_data(&ctx).await?;
/// let metric = analyzer.compute_metric_from_state(&state)?;
///
/// if let MetricValue::Double(max) = metric {
///     println!("Maximum price: ${:.2}", max);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct MaxAnalyzer {
    /// Name of the column whose maximum is computed.
    column: String,
}
218
219impl MaxAnalyzer {
220    /// Creates a new max analyzer for the specified column.
221    pub fn new(column: impl Into<String>) -> Self {
222        Self {
223            column: column.into(),
224        }
225    }
226
227    /// Returns the column being analyzed.
228    pub fn column(&self) -> &str {
229        &self.column
230    }
231}
232
233#[async_trait]
234impl Analyzer for MaxAnalyzer {
235    type State = MinMaxState;
236    type Metric = MetricValue;
237
238    #[instrument(skip(ctx), fields(analyzer = "max", column = %self.column))]
239    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
240        // Get the table name from the validation context
241        let validation_ctx = current_validation_context();
242        let table_name = validation_ctx.table_name();
243
244        // Build SQL query to compute max (we compute both for efficiency)
245        let sql = format!(
246            "SELECT MIN({0}) as min, MAX({0}) as max FROM {table_name}",
247            self.column
248        );
249
250        // Execute query
251        let df = ctx.sql(&sql).await?;
252        let batches = df.collect().await?;
253
254        // Extract min/max from result
255        let (min, max) = if let Some(batch) = batches.first() {
256            if batch.num_rows() > 0 {
257                let min = if batch.column(0).is_null(0) {
258                    None
259                } else {
260                    // Try Float64 first, then Int64
261                    if let Some(arr) = batch
262                        .column(0)
263                        .as_any()
264                        .downcast_ref::<arrow::array::Float64Array>()
265                    {
266                        Some(arr.value(0))
267                    } else if let Some(arr) = batch
268                        .column(0)
269                        .as_any()
270                        .downcast_ref::<arrow::array::Int64Array>()
271                    {
272                        Some(arr.value(0) as f64)
273                    } else {
274                        return Err(AnalyzerError::invalid_data(format!(
275                            "Expected numeric array for min, got {:?}",
276                            batch.column(0).data_type()
277                        )));
278                    }
279                };
280
281                let max = if batch.column(1).is_null(0) {
282                    None
283                } else {
284                    // Try Float64 first, then Int64
285                    if let Some(arr) = batch
286                        .column(1)
287                        .as_any()
288                        .downcast_ref::<arrow::array::Float64Array>()
289                    {
290                        Some(arr.value(0))
291                    } else if let Some(arr) = batch
292                        .column(1)
293                        .as_any()
294                        .downcast_ref::<arrow::array::Int64Array>()
295                    {
296                        Some(arr.value(0) as f64)
297                    } else {
298                        return Err(AnalyzerError::invalid_data(format!(
299                            "Expected numeric array for max, got {:?}",
300                            batch.column(1).data_type()
301                        )));
302                    }
303                };
304
305                (min, max)
306            } else {
307                (None, None)
308            }
309        } else {
310            (None, None)
311        };
312
313        Ok(MinMaxState { min, max })
314    }
315
316    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
317        match state.max {
318            Some(max) => Ok(MetricValue::Double(max)),
319            None => Err(AnalyzerError::NoData),
320        }
321    }
322
323    fn name(&self) -> &str {
324        "max"
325    }
326
327    fn description(&self) -> &str {
328        "Computes the maximum value of a numeric column"
329    }
330
331    fn metric_key(&self) -> String {
332        format!("{}.{}", self.name(), self.column)
333    }
334
335    fn columns(&self) -> Vec<&str> {
336        vec![&self.column]
337    }
338}