term_guard/analyzers/basic/
distinctness.rs

1//! Distinctness analyzer for measuring the fraction of distinct values.
2
3use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
/// Analyzer that computes the fraction of distinct values for a column.
///
/// The metric is `COUNT(DISTINCT column) / COUNT(column)`: the number of
/// distinct non-null values divided by the number of non-null values. A column
/// with no non-null values is reported as fully distinct (`1.0`).
///
/// Distinctness measures uniqueness in data and is useful for identifying
/// columns that might be identifiers or have high cardinality.
///
/// # Example
///
/// ```rust,ignore
/// use term_guard::analyzers::basic::DistinctnessAnalyzer;
/// use term_guard::analyzers::{Analyzer, MetricValue};
/// use datafusion::prelude::*;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// // Register your data under the table name used by the current validation
/// // context; the analyzer queries that table.
///
/// let analyzer = DistinctnessAnalyzer::new("user_id");
/// let state = analyzer.compute_state_from_data(&ctx).await?;
/// let metric = analyzer.compute_metric_from_state(&state)?;
///
/// if let MetricValue::Double(distinctness) = metric {
///     println!("Column distinctness: {:.2}%", distinctness * 100.0);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct DistinctnessAnalyzer {
    /// The column to analyze.
    column: String,
}

impl DistinctnessAnalyzer {
    /// Creates a new distinctness analyzer for the specified column.
    ///
    /// The name is stored exactly as given and is later interpolated into the
    /// SQL query that computes the counts.
    pub fn new(column: impl Into<String>) -> Self {
        Self {
            column: column.into(),
        }
    }

    /// Returns the column being analyzed.
    pub fn column(&self) -> &str {
        &self.column
    }
}
55
56/// State for the distinctness analyzer.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct DistinctnessState {
59    /// Total number of non-null values.
60    pub total_count: u64,
61    /// Number of distinct values.
62    pub distinct_count: u64,
63}
64
65impl DistinctnessState {
66    /// Calculates the distinctness fraction.
67    pub fn distinctness(&self) -> f64 {
68        if self.total_count == 0 {
69            1.0 // Empty dataset is considered fully distinct
70        } else {
71            self.distinct_count as f64 / self.total_count as f64
72        }
73    }
74}
75
76impl AnalyzerState for DistinctnessState {
77    fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
78        // Note: Merging distinct counts is approximate when done across partitions
79        // For exact results, we would need to track actual distinct values
80        // This implementation provides an upper bound
81        let total_count = states.iter().map(|s| s.total_count).sum();
82        let distinct_count = states
83            .iter()
84            .map(|s| s.distinct_count)
85            .sum::<u64>()
86            .min(total_count);
87
88        Ok(DistinctnessState {
89            total_count,
90            distinct_count,
91        })
92    }
93
94    fn is_empty(&self) -> bool {
95        self.total_count == 0
96    }
97}
98
99#[async_trait]
100impl Analyzer for DistinctnessAnalyzer {
101    type State = DistinctnessState;
102    type Metric = MetricValue;
103
104    #[instrument(skip(ctx), fields(analyzer = "distinctness", column = %self.column))]
105    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
106        // Build SQL query to count total non-null values and distinct values
107        // Get the table name from the validation context
108
109        let validation_ctx = current_validation_context();
110
111        let table_name = validation_ctx.table_name();
112
113        let sql = format!(
114            "SELECT COUNT({0}) as total_count, COUNT(DISTINCT {0}) as distinct_count FROM {table_name}",
115            self.column
116        );
117
118        // Execute query
119        let df = ctx.sql(&sql).await?;
120        let batches = df.collect().await?;
121
122        // Extract counts from result
123        let (total_count, distinct_count) = if let Some(batch) = batches.first() {
124            if batch.num_rows() > 0 {
125                let total_array = batch
126                    .column(0)
127                    .as_any()
128                    .downcast_ref::<arrow::array::Int64Array>()
129                    .ok_or_else(|| {
130                        AnalyzerError::invalid_data("Expected Int64 array for total count")
131                    })?;
132
133                let distinct_array = batch
134                    .column(1)
135                    .as_any()
136                    .downcast_ref::<arrow::array::Int64Array>()
137                    .ok_or_else(|| {
138                        AnalyzerError::invalid_data("Expected Int64 array for distinct count")
139                    })?;
140
141                (total_array.value(0) as u64, distinct_array.value(0) as u64)
142            } else {
143                (0, 0)
144            }
145        } else {
146            (0, 0)
147        };
148
149        Ok(DistinctnessState {
150            total_count,
151            distinct_count,
152        })
153    }
154
155    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
156        Ok(MetricValue::Double(state.distinctness()))
157    }
158
159    fn name(&self) -> &str {
160        "distinctness"
161    }
162
163    fn description(&self) -> &str {
164        "Computes the fraction of distinct values in a column"
165    }
166
167    fn metric_key(&self) -> String {
168        format!("{}.{}", self.name(), self.column)
169    }
170
171    fn columns(&self) -> Vec<&str> {
172        vec![&self.column]
173    }
174}