use async_trait::async_trait;
use datafusion::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::instrument;
use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
use crate::core::current_validation_context;
/// Partial aggregation state shared by [`MinAnalyzer`] and [`MaxAnalyzer`].
///
/// Both analyzers fill in both bounds so that states can be merged with
/// [`AnalyzerState::merge`]. A field is `None` when the aggregate query
/// returned SQL NULL for that bound or produced no rows.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinMaxState {
/// Smallest observed value, widened to `f64`; `None` if not observed.
pub min: Option<f64>,
/// Largest observed value, widened to `f64`; `None` if not observed.
pub max: Option<f64>,
}
impl AnalyzerState for MinMaxState {
    /// Combines partial states by taking the smallest `min` and the largest `max`.
    ///
    /// `None` bounds are skipped, so a merged bound is `None` only when no input
    /// carried a value for it. Values are ordered with [`f64::total_cmp`], which
    /// provides a total order: positive NaN sorts above every other value.
    /// A total order is required so that the result does not depend on the
    /// order of `states` when a NaN is present.
    ///
    /// # Errors
    ///
    /// Never fails; the `Result` is required by the trait signature.
    fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
        let min = states.iter().filter_map(|s| s.min).min_by(f64::total_cmp);
        let max = states.iter().filter_map(|s| s.max).max_by(f64::total_cmp);
        Ok(MinMaxState { min, max })
    }

    /// Returns `true` when neither bound has been observed.
    fn is_empty(&self) -> bool {
        self.min.is_none() && self.max.is_none()
    }
}
/// Analyzer that reports the smallest value of a numeric column.
#[derive(Debug, Clone)]
pub struct MinAnalyzer {
    /// Column name exactly as supplied to [`MinAnalyzer::new`].
    column: String,
}

impl MinAnalyzer {
    /// Builds an analyzer for `column`; accepts anything convertible into a `String`.
    pub fn new(column: impl Into<String>) -> Self {
        let column: String = column.into();
        Self { column }
    }

    /// Returns the name of the column this analyzer inspects.
    pub fn column(&self) -> &str {
        self.column.as_str()
    }
}
#[async_trait]
impl Analyzer for MinAnalyzer {
    type State = MinMaxState;
    type Metric = MetricValue;

    /// Runs `SELECT MIN(col), MAX(col)` against the table named by the current
    /// validation context and captures both bounds in a [`MinMaxState`].
    ///
    /// Both bounds are collected, not just the minimum, so the state can be
    /// merged with other `MinMaxState`s. Any numeric Arrow result type
    /// (signed/unsigned integers, floats, decimals) is widened to `f64`.
    ///
    /// # Errors
    ///
    /// Propagates errors from planning or executing the query. Returns an
    /// invalid-data error when an aggregate result is not numeric.
    #[instrument(skip(ctx), fields(analyzer = "min", column = %self.column))]
    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
        /// Reads row 0 of a single-row aggregate column as `f64`.
        ///
        /// Returns `Ok(None)` for SQL NULL. Numeric arrays are cast to Float64;
        /// anything else is rejected. `label` names the bound in error messages.
        fn scalar_as_f64(
            array: &dyn arrow::array::Array,
            label: &str,
        ) -> AnalyzerResult<Option<f64>> {
            if array.is_null(0) {
                return Ok(None);
            }
            if !array.data_type().is_numeric() {
                return Err(AnalyzerError::invalid_data(format!(
                    "Expected numeric array for {label}, got {:?}",
                    array.data_type()
                )));
            }
            // Casting Float64 -> Float64 is a cheap no-op; other numeric types are widened.
            let widened = arrow::compute::cast(array, &arrow::datatypes::DataType::Float64)
                .map_err(|e| {
                    AnalyzerError::invalid_data(format!(
                        "Failed to convert {label} to Float64: {e}"
                    ))
                })?;
            if widened.is_null(0) {
                return Ok(None);
            }
            let floats = widened
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| {
                    AnalyzerError::invalid_data(format!(
                        "Expected numeric array for {label}, got {:?}",
                        widened.data_type()
                    ))
                })?;
            Ok(Some(floats.value(0)))
        }

        let validation_ctx = current_validation_context();
        let table_name = validation_ctx.table_name();
        // NOTE(review): the column name is interpolated into SQL verbatim; callers
        // must pass a trusted identifier (quoted themselves if it needs quoting).
        let sql = format!(
            "SELECT MIN({0}) as min, MAX({0}) as max FROM {table_name}",
            self.column
        );
        let df = ctx.sql(&sql).await?;
        let batches = df.collect().await?;

        // The aggregate produces a single row; skip any empty batches ahead of it.
        let Some(batch) = batches.iter().find(|b| b.num_rows() > 0) else {
            return Ok(MinMaxState {
                min: None,
                max: None,
            });
        };
        let min = scalar_as_f64(batch.column(0).as_ref(), "min")?;
        let max = scalar_as_f64(batch.column(1).as_ref(), "max")?;
        Ok(MinMaxState { min, max })
    }

    /// Returns the state's minimum as [`MetricValue::Double`].
    ///
    /// # Errors
    ///
    /// [`AnalyzerError::NoData`] when no minimum was observed.
    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
        match state.min {
            Some(min) => Ok(MetricValue::Double(min)),
            None => Err(AnalyzerError::NoData),
        }
    }

    /// Short analyzer identifier, used as the prefix of [`Self::metric_key`].
    fn name(&self) -> &str {
        "min"
    }

    /// Human-readable description of the analyzer.
    fn description(&self) -> &str {
        "Computes the minimum value of a numeric column"
    }

    /// Metric key of the form `min.<column>`.
    fn metric_key(&self) -> String {
        format!("{}.{}", self.name(), self.column)
    }

    /// The single column this analyzer reads.
    fn columns(&self) -> Vec<&str> {
        vec![self.column.as_str()]
    }
}
/// Analyzer that reports the largest value of a numeric column.
#[derive(Debug, Clone)]
pub struct MaxAnalyzer {
    /// Column name exactly as supplied to [`MaxAnalyzer::new`].
    column: String,
}

impl MaxAnalyzer {
    /// Builds an analyzer for `column`; accepts anything convertible into a `String`.
    pub fn new(column: impl Into<String>) -> Self {
        let column: String = column.into();
        Self { column }
    }

    /// Returns the name of the column this analyzer inspects.
    pub fn column(&self) -> &str {
        self.column.as_str()
    }
}
#[async_trait]
impl Analyzer for MaxAnalyzer {
    type State = MinMaxState;
    type Metric = MetricValue;

    /// Runs `SELECT MIN(col), MAX(col)` against the table named by the current
    /// validation context and captures both bounds in a [`MinMaxState`].
    ///
    /// Both bounds are collected, not just the maximum, so the state can be
    /// merged with other `MinMaxState`s. Any numeric Arrow result type
    /// (signed/unsigned integers, floats, decimals) is widened to `f64`.
    ///
    /// # Errors
    ///
    /// Propagates errors from planning or executing the query. Returns an
    /// invalid-data error when an aggregate result is not numeric.
    #[instrument(skip(ctx), fields(analyzer = "max", column = %self.column))]
    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
        /// Reads row 0 of a single-row aggregate column as `f64`.
        ///
        /// Returns `Ok(None)` for SQL NULL. Numeric arrays are cast to Float64;
        /// anything else is rejected. `label` names the bound in error messages.
        fn scalar_as_f64(
            array: &dyn arrow::array::Array,
            label: &str,
        ) -> AnalyzerResult<Option<f64>> {
            if array.is_null(0) {
                return Ok(None);
            }
            if !array.data_type().is_numeric() {
                return Err(AnalyzerError::invalid_data(format!(
                    "Expected numeric array for {label}, got {:?}",
                    array.data_type()
                )));
            }
            // Casting Float64 -> Float64 is a cheap no-op; other numeric types are widened.
            let widened = arrow::compute::cast(array, &arrow::datatypes::DataType::Float64)
                .map_err(|e| {
                    AnalyzerError::invalid_data(format!(
                        "Failed to convert {label} to Float64: {e}"
                    ))
                })?;
            if widened.is_null(0) {
                return Ok(None);
            }
            let floats = widened
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| {
                    AnalyzerError::invalid_data(format!(
                        "Expected numeric array for {label}, got {:?}",
                        widened.data_type()
                    ))
                })?;
            Ok(Some(floats.value(0)))
        }

        let validation_ctx = current_validation_context();
        let table_name = validation_ctx.table_name();
        // NOTE(review): the column name is interpolated into SQL verbatim; callers
        // must pass a trusted identifier (quoted themselves if it needs quoting).
        let sql = format!(
            "SELECT MIN({0}) as min, MAX({0}) as max FROM {table_name}",
            self.column
        );
        let df = ctx.sql(&sql).await?;
        let batches = df.collect().await?;

        // The aggregate produces a single row; skip any empty batches ahead of it.
        let Some(batch) = batches.iter().find(|b| b.num_rows() > 0) else {
            return Ok(MinMaxState {
                min: None,
                max: None,
            });
        };
        let min = scalar_as_f64(batch.column(0).as_ref(), "min")?;
        let max = scalar_as_f64(batch.column(1).as_ref(), "max")?;
        Ok(MinMaxState { min, max })
    }

    /// Returns the state's maximum as [`MetricValue::Double`].
    ///
    /// # Errors
    ///
    /// [`AnalyzerError::NoData`] when no maximum was observed.
    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
        match state.max {
            Some(max) => Ok(MetricValue::Double(max)),
            None => Err(AnalyzerError::NoData),
        }
    }

    /// Short analyzer identifier, used as the prefix of [`Self::metric_key`].
    fn name(&self) -> &str {
        "max"
    }

    /// Human-readable description of the analyzer.
    fn description(&self) -> &str {
        "Computes the maximum value of a numeric column"
    }

    /// Metric key of the form `max.<column>`.
    fn metric_key(&self) -> String {
        format!("{}.{}", self.name(), self.column)
    }

    /// The single column this analyzer reads.
    fn columns(&self) -> Vec<&str> {
        vec![self.column.as_str()]
    }
}