term_guard/analyzers/advanced/
approx_count_distinct.rs

1//! Approximate count distinct analyzer using HyperLogLog algorithm.
2
3use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
/// Analyzer that computes an approximate count of distinct values using HyperLogLog.
///
/// This analyzer provides memory-efficient cardinality estimation for high-cardinality
/// columns. It delegates to DataFusion's built-in `APPROX_DISTINCT` aggregate function,
/// which implements the HyperLogLog algorithm. The analyzer itself exposes no precision
/// setting; accuracy is whatever DataFusion's `APPROX_DISTINCT` implementation provides.
///
/// Alongside the estimate, the number of non-NULL values of the column (`COUNT(column)`)
/// is recorded in [`ApproxCountDistinctState`] so a distinctness ratio can be derived.
///
/// # Example
///
/// ```rust,ignore
/// use term_guard::analyzers::advanced::ApproxCountDistinctAnalyzer;
/// use term_guard::analyzers::{Analyzer, MetricValue};
/// use datafusion::prelude::*;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// // Register your data table. The analyzer reads the table name from the current
/// // validation context, so make sure one is active when computing the state.
///
/// let analyzer = ApproxCountDistinctAnalyzer::new("user_id");
/// let state = analyzer.compute_state_from_data(&ctx).await?;
/// let metric = analyzer.compute_metric_from_state(&state)?;
///
/// if let MetricValue::Long(approx_distinct) = metric {
///     println!("Approximate distinct users: {}", approx_distinct);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct ApproxCountDistinctAnalyzer {
    /// Name of the column whose distinct values are estimated.
    column: String,
}

impl ApproxCountDistinctAnalyzer {
    /// Creates a new approximate count distinct analyzer for the specified column.
    ///
    /// # Arguments
    ///
    /// * `column` - Name of the column to analyze; anything convertible into a `String`.
    pub fn new(column: impl Into<String>) -> Self {
        Self {
            column: column.into(),
        }
    }

    /// Returns the name of the column being analyzed.
    pub fn column(&self) -> &str {
        &self.column
    }
}
56
57/// State for the approximate count distinct analyzer.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct ApproxCountDistinctState {
60    /// Approximate count of distinct values.
61    pub approx_distinct_count: u64,
62    /// Total count of non-null values for calculating distinctness ratio.
63    pub total_count: u64,
64}
65
66impl ApproxCountDistinctState {
67    /// Calculates the approximate distinctness ratio.
68    pub fn distinctness_ratio(&self) -> f64 {
69        if self.total_count == 0 {
70            1.0
71        } else {
72            self.approx_distinct_count as f64 / self.total_count as f64
73        }
74    }
75}
76
77impl AnalyzerState for ApproxCountDistinctState {
78    fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
79        // Note: Merging HyperLogLog states would require access to the actual HLL sketches
80        // Since we only have the final counts, we take the max as an approximation
81        // In a production system, we'd store and merge the actual HLL data structures
82        let approx_distinct_count = states
83            .iter()
84            .map(|s| s.approx_distinct_count)
85            .max()
86            .unwrap_or(0);
87        let total_count = states.iter().map(|s| s.total_count).sum();
88
89        Ok(ApproxCountDistinctState {
90            approx_distinct_count,
91            total_count,
92        })
93    }
94
95    fn is_empty(&self) -> bool {
96        self.total_count == 0
97    }
98}
99
100#[async_trait]
101impl Analyzer for ApproxCountDistinctAnalyzer {
102    type State = ApproxCountDistinctState;
103    type Metric = MetricValue;
104
105    #[instrument(skip(ctx), fields(analyzer = "approx_count_distinct", column = %self.column))]
106    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
107        // Build SQL query using APPROX_DISTINCT function
108        // Get the table name from the validation context
109
110        let validation_ctx = current_validation_context();
111
112        let table_name = validation_ctx.table_name();
113
114        let sql = format!(
115            "SELECT APPROX_DISTINCT({0}) as approx_distinct, COUNT({0}) as total FROM {table_name}",
116            self.column
117        );
118
119        // Execute query
120        let df = ctx.sql(&sql).await?;
121        let batches = df.collect().await?;
122
123        // Extract counts from result
124        let (approx_distinct_count, total_count) = if let Some(batch) = batches.first() {
125            if batch.num_rows() > 0 {
126                // APPROX_DISTINCT returns UInt64
127                let approx_distinct_array = batch
128                    .column(0)
129                    .as_any()
130                    .downcast_ref::<arrow::array::UInt64Array>()
131                    .ok_or_else(|| {
132                        AnalyzerError::invalid_data("Expected UInt64 array for approx_distinct")
133                    })?;
134                let approx_distinct = approx_distinct_array.value(0);
135
136                let total_array = batch
137                    .column(1)
138                    .as_any()
139                    .downcast_ref::<arrow::array::Int64Array>()
140                    .ok_or_else(|| {
141                        AnalyzerError::invalid_data("Expected Int64 array for total count")
142                    })?;
143                let total = total_array.value(0) as u64;
144
145                (approx_distinct, total)
146            } else {
147                (0, 0)
148            }
149        } else {
150            (0, 0)
151        };
152
153        Ok(ApproxCountDistinctState {
154            approx_distinct_count,
155            total_count,
156        })
157    }
158
159    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
160        Ok(MetricValue::Long(state.approx_distinct_count as i64))
161    }
162
163    fn name(&self) -> &str {
164        "approx_count_distinct"
165    }
166
167    fn description(&self) -> &str {
168        "Computes approximate count of distinct values using HyperLogLog"
169    }
170
171    fn metric_key(&self) -> String {
172        format!("{}.{}", self.name(), self.column)
173    }
174
175    fn columns(&self) -> Vec<&str> {
176        vec![&self.column]
177    }
178}