term_guard/analyzers/basic/
completeness.rs

1//! Completeness analyzer for measuring the fraction of non-null values.
2
3use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
/// Measures how complete a single column is: the share of its values that are non-null.
///
/// The analyzer itself only remembers which column to inspect. Completeness
/// is one of the most basic data quality signals, since it tells how much of
/// the expected data is actually present.
///
/// # Example
///
/// ```rust,ignore
/// use term_guard::analyzers::basic::CompletenessAnalyzer;
/// use term_guard::analyzers::{Analyzer, MetricValue};
/// use datafusion::prelude::*;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// // Register the table to analyze with `ctx` here.
///
/// let analyzer = CompletenessAnalyzer::new("user_id");
/// let state = analyzer.compute_state_from_data(&ctx).await?;
/// let metric = analyzer.compute_metric_from_state(&state)?;
///
/// if let MetricValue::Double(completeness) = metric {
///     println!("Column completeness: {:.2}%", completeness * 100.0);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct CompletenessAnalyzer {
    /// Name of the column whose non-null fraction is measured.
    column: String,
}
41
42impl CompletenessAnalyzer {
43    /// Creates a new completeness analyzer for the specified column.
44    pub fn new(column: impl Into<String>) -> Self {
45        Self {
46            column: column.into(),
47        }
48    }
49
50    /// Returns the column being analyzed.
51    pub fn column(&self) -> &str {
52        &self.column
53    }
54}
55
56/// State for the completeness analyzer.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct CompletenessState {
59    /// Total number of rows.
60    pub total_count: u64,
61    /// Number of non-null values.
62    pub non_null_count: u64,
63}
64
65impl CompletenessState {
66    /// Calculates the completeness fraction.
67    pub fn completeness(&self) -> f64 {
68        if self.total_count == 0 {
69            1.0 // Empty dataset is considered complete
70        } else {
71            self.non_null_count as f64 / self.total_count as f64
72        }
73    }
74}
75
76impl AnalyzerState for CompletenessState {
77    fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
78        let total_count = states.iter().map(|s| s.total_count).sum();
79        let non_null_count = states.iter().map(|s| s.non_null_count).sum();
80
81        Ok(CompletenessState {
82            total_count,
83            non_null_count,
84        })
85    }
86
87    fn is_empty(&self) -> bool {
88        self.total_count == 0
89    }
90}
91
92#[async_trait]
93impl Analyzer for CompletenessAnalyzer {
94    type State = CompletenessState;
95    type Metric = MetricValue;
96
97    #[instrument(skip(ctx), fields(analyzer = "completeness", column = %self.column))]
98    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
99        // Build SQL query to count total rows and non-null values
100        // Get the table name from the validation context
101
102        let validation_ctx = current_validation_context();
103
104        let table_name = validation_ctx.table_name();
105
106        let sql = format!(
107            "SELECT COUNT(*) as total_count, COUNT({}) as non_null_count FROM {table_name}",
108            self.column
109        );
110
111        // Execute query
112        let df = ctx.sql(&sql).await?;
113        let batches = df.collect().await?;
114
115        // Extract counts from result
116        let (total_count, non_null_count) = if let Some(batch) = batches.first() {
117            if batch.num_rows() > 0 {
118                let total_array = batch
119                    .column(0)
120                    .as_any()
121                    .downcast_ref::<arrow::array::Int64Array>()
122                    .ok_or_else(|| {
123                        AnalyzerError::invalid_data("Expected Int64 array for total count")
124                    })?;
125
126                let non_null_array = batch
127                    .column(1)
128                    .as_any()
129                    .downcast_ref::<arrow::array::Int64Array>()
130                    .ok_or_else(|| {
131                        AnalyzerError::invalid_data("Expected Int64 array for non-null count")
132                    })?;
133
134                (total_array.value(0) as u64, non_null_array.value(0) as u64)
135            } else {
136                (0, 0)
137            }
138        } else {
139            (0, 0)
140        };
141
142        Ok(CompletenessState {
143            total_count,
144            non_null_count,
145        })
146    }
147
148    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
149        Ok(MetricValue::Double(state.completeness()))
150    }
151
152    fn name(&self) -> &str {
153        "completeness"
154    }
155
156    fn description(&self) -> &str {
157        "Computes the fraction of non-null values in a column"
158    }
159
160    fn metric_key(&self) -> String {
161        format!("{}.{}", self.name(), self.column)
162    }
163
164    fn columns(&self) -> Vec<&str> {
165        vec![&self.column]
166    }
167}