term_guard/analyzers/basic/
completeness.rs1use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerError, AnalyzerResult, AnalyzerState, MetricValue};
9
10use crate::core::current_validation_context;
/// Analyzer that measures completeness: the fraction of non-null values in a
/// single column.
#[derive(Debug, Clone)]
pub struct CompletenessAnalyzer {
    /// Name of the column whose non-null fraction is measured.
    column: String,
}

impl CompletenessAnalyzer {
    /// Builds an analyzer that inspects `column`.
    pub fn new(column: impl Into<String>) -> Self {
        let column = column.into();
        Self { column }
    }

    /// Returns the name of the column this analyzer inspects.
    pub fn column(&self) -> &str {
        self.column.as_str()
    }
}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct CompletenessState {
59 pub total_count: u64,
61 pub non_null_count: u64,
63}
64
65impl CompletenessState {
66 pub fn completeness(&self) -> f64 {
68 if self.total_count == 0 {
69 1.0 } else {
71 self.non_null_count as f64 / self.total_count as f64
72 }
73 }
74}
75
76impl AnalyzerState for CompletenessState {
77 fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
78 let total_count = states.iter().map(|s| s.total_count).sum();
79 let non_null_count = states.iter().map(|s| s.non_null_count).sum();
80
81 Ok(CompletenessState {
82 total_count,
83 non_null_count,
84 })
85 }
86
87 fn is_empty(&self) -> bool {
88 self.total_count == 0
89 }
90}
91
92#[async_trait]
93impl Analyzer for CompletenessAnalyzer {
94 type State = CompletenessState;
95 type Metric = MetricValue;
96
97 #[instrument(skip(ctx), fields(analyzer = "completeness", column = %self.column))]
98 async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
99 let validation_ctx = current_validation_context();
103
104 let table_name = validation_ctx.table_name();
105
106 let sql = format!(
107 "SELECT COUNT(*) as total_count, COUNT({}) as non_null_count FROM {table_name}",
108 self.column
109 );
110
111 let df = ctx.sql(&sql).await?;
113 let batches = df.collect().await?;
114
115 let (total_count, non_null_count) = if let Some(batch) = batches.first() {
117 if batch.num_rows() > 0 {
118 let total_array = batch
119 .column(0)
120 .as_any()
121 .downcast_ref::<arrow::array::Int64Array>()
122 .ok_or_else(|| {
123 AnalyzerError::invalid_data("Expected Int64 array for total count")
124 })?;
125
126 let non_null_array = batch
127 .column(1)
128 .as_any()
129 .downcast_ref::<arrow::array::Int64Array>()
130 .ok_or_else(|| {
131 AnalyzerError::invalid_data("Expected Int64 array for non-null count")
132 })?;
133
134 (total_array.value(0) as u64, non_null_array.value(0) as u64)
135 } else {
136 (0, 0)
137 }
138 } else {
139 (0, 0)
140 };
141
142 Ok(CompletenessState {
143 total_count,
144 non_null_count,
145 })
146 }
147
148 fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
149 Ok(MetricValue::Double(state.completeness()))
150 }
151
152 fn name(&self) -> &str {
153 "completeness"
154 }
155
156 fn description(&self) -> &str {
157 "Computes the fraction of non-null values in a column"
158 }
159
160 fn metric_key(&self) -> String {
161 format!("{}.{}", self.name(), self.column)
162 }
163
164 fn columns(&self) -> Vec<&str> {
165 vec![&self.column]
166 }
167}