term_guard/analyzers/advanced/
approx_count_distinct.rs1use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
/// Analyzer that estimates how many distinct values a single column contains,
/// using the SQL `APPROX_DISTINCT` aggregate.
#[derive(Debug, Clone)]
pub struct ApproxCountDistinctAnalyzer {
    /// Name of the column being analyzed.
    column: String,
}

impl ApproxCountDistinctAnalyzer {
    /// Creates an analyzer for the column called `column`.
    pub fn new(column: impl Into<String>) -> Self {
        let column = column.into();
        Self { column }
    }

    /// Returns the name of the analyzed column.
    pub fn column(&self) -> &str {
        self.column.as_str()
    }
}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct ApproxCountDistinctState {
60 pub approx_distinct_count: u64,
62 pub total_count: u64,
64}
65
66impl ApproxCountDistinctState {
67 pub fn distinctness_ratio(&self) -> f64 {
69 if self.total_count == 0 {
70 1.0
71 } else {
72 self.approx_distinct_count as f64 / self.total_count as f64
73 }
74 }
75}
76
77impl AnalyzerState for ApproxCountDistinctState {
78 fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
79 let approx_distinct_count = states
83 .iter()
84 .map(|s| s.approx_distinct_count)
85 .max()
86 .unwrap_or(0);
87 let total_count = states.iter().map(|s| s.total_count).sum();
88
89 Ok(ApproxCountDistinctState {
90 approx_distinct_count,
91 total_count,
92 })
93 }
94
95 fn is_empty(&self) -> bool {
96 self.total_count == 0
97 }
98}
99
100#[async_trait]
101impl Analyzer for ApproxCountDistinctAnalyzer {
102 type State = ApproxCountDistinctState;
103 type Metric = MetricValue;
104
105 #[instrument(skip(ctx), fields(analyzer = "approx_count_distinct", column = %self.column))]
106 async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
107 let validation_ctx = current_validation_context();
111
112 let table_name = validation_ctx.table_name();
113
114 let sql = format!(
115 "SELECT APPROX_DISTINCT({0}) as approx_distinct, COUNT({0}) as total FROM {table_name}",
116 self.column
117 );
118
119 let df = ctx.sql(&sql).await?;
121 let batches = df.collect().await?;
122
123 let (approx_distinct_count, total_count) = if let Some(batch) = batches.first() {
125 if batch.num_rows() > 0 {
126 let approx_distinct_array = batch
128 .column(0)
129 .as_any()
130 .downcast_ref::<arrow::array::UInt64Array>()
131 .ok_or_else(|| {
132 AnalyzerError::invalid_data("Expected UInt64 array for approx_distinct")
133 })?;
134 let approx_distinct = approx_distinct_array.value(0);
135
136 let total_array = batch
137 .column(1)
138 .as_any()
139 .downcast_ref::<arrow::array::Int64Array>()
140 .ok_or_else(|| {
141 AnalyzerError::invalid_data("Expected Int64 array for total count")
142 })?;
143 let total = total_array.value(0) as u64;
144
145 (approx_distinct, total)
146 } else {
147 (0, 0)
148 }
149 } else {
150 (0, 0)
151 };
152
153 Ok(ApproxCountDistinctState {
154 approx_distinct_count,
155 total_count,
156 })
157 }
158
159 fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
160 Ok(MetricValue::Long(state.approx_distinct_count as i64))
161 }
162
163 fn name(&self) -> &str {
164 "approx_count_distinct"
165 }
166
167 fn description(&self) -> &str {
168 "Computes approximate count of distinct values using HyperLogLog"
169 }
170
171 fn metric_key(&self) -> String {
172 format!("{}.{}", self.name(), self.column)
173 }
174
175 fn columns(&self) -> Vec<&str> {
176 vec![&self.column]
177 }
178}