term_guard/analyzers/basic/
distinctness.rs1use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
/// Analyzer that measures how distinct the values of a single column are.
///
/// The metric it produces is the ratio of distinct values to counted values for the
/// configured column (see [`DistinctnessState::distinctness`]).
#[derive(Debug, Clone)]
pub struct DistinctnessAnalyzer {
    /// Column the analyzer reads; it is placed verbatim into the generated SQL.
    column: String,
}

impl DistinctnessAnalyzer {
    /// Creates an analyzer targeting `column`.
    pub fn new(column: impl Into<String>) -> Self {
        let column: String = column.into();
        Self { column }
    }

    /// Returns the column this analyzer targets.
    pub fn column(&self) -> &str {
        self.column.as_str()
    }
}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct DistinctnessState {
59 pub total_count: u64,
61 pub distinct_count: u64,
63}
64
65impl DistinctnessState {
66 pub fn distinctness(&self) -> f64 {
68 if self.total_count == 0 {
69 1.0 } else {
71 self.distinct_count as f64 / self.total_count as f64
72 }
73 }
74}
75
76impl AnalyzerState for DistinctnessState {
77 fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
78 let total_count = states.iter().map(|s| s.total_count).sum();
82 let distinct_count = states
83 .iter()
84 .map(|s| s.distinct_count)
85 .sum::<u64>()
86 .min(total_count);
87
88 Ok(DistinctnessState {
89 total_count,
90 distinct_count,
91 })
92 }
93
94 fn is_empty(&self) -> bool {
95 self.total_count == 0
96 }
97}
98
99#[async_trait]
100impl Analyzer for DistinctnessAnalyzer {
101 type State = DistinctnessState;
102 type Metric = MetricValue;
103
104 #[instrument(skip(ctx), fields(analyzer = "distinctness", column = %self.column))]
105 async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
106 let validation_ctx = current_validation_context();
110
111 let table_name = validation_ctx.table_name();
112
113 let sql = format!(
114 "SELECT COUNT({0}) as total_count, COUNT(DISTINCT {0}) as distinct_count FROM {table_name}",
115 self.column
116 );
117
118 let df = ctx.sql(&sql).await?;
120 let batches = df.collect().await?;
121
122 let (total_count, distinct_count) = if let Some(batch) = batches.first() {
124 if batch.num_rows() > 0 {
125 let total_array = batch
126 .column(0)
127 .as_any()
128 .downcast_ref::<arrow::array::Int64Array>()
129 .ok_or_else(|| {
130 AnalyzerError::invalid_data("Expected Int64 array for total count")
131 })?;
132
133 let distinct_array = batch
134 .column(1)
135 .as_any()
136 .downcast_ref::<arrow::array::Int64Array>()
137 .ok_or_else(|| {
138 AnalyzerError::invalid_data("Expected Int64 array for distinct count")
139 })?;
140
141 (total_array.value(0) as u64, distinct_array.value(0) as u64)
142 } else {
143 (0, 0)
144 }
145 } else {
146 (0, 0)
147 };
148
149 Ok(DistinctnessState {
150 total_count,
151 distinct_count,
152 })
153 }
154
155 fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
156 Ok(MetricValue::Double(state.distinctness()))
157 }
158
159 fn name(&self) -> &str {
160 "distinctness"
161 }
162
163 fn description(&self) -> &str {
164 "Computes the fraction of distinct values in a column"
165 }
166
167 fn metric_key(&self) -> String {
168 format!("{}.{}", self.name(), self.column)
169 }
170
171 fn columns(&self) -> Vec<&str> {
172 vec![&self.column]
173 }
174}