// term_guard/analyzers/basic/size.rs

use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::instrument;

use crate::analyzers::{Analyzer, AnalyzerResult, AnalyzerState, MetricValue};
use crate::core::current_validation_context;

/// Analyzer that measures the size of a dataset as its number of rows.
///
/// It is a unit struct with no configuration, so every instance behaves
/// identically.
#[derive(Clone, Debug)]
pub struct SizeAnalyzer;

39impl SizeAnalyzer {
40 pub fn new() -> Self {
42 Self
43 }
44}
45
46impl Default for SizeAnalyzer {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct SizeState {
55 pub count: u64,
57}
58
59impl AnalyzerState for SizeState {
60 fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
61 let total_count = states.iter().map(|s| s.count).sum();
62 Ok(SizeState { count: total_count })
63 }
64
65 fn is_empty(&self) -> bool {
66 self.count == 0
67 }
68}
69
70#[async_trait]
71impl Analyzer for SizeAnalyzer {
72 type State = SizeState;
73 type Metric = MetricValue;
74
75 #[instrument(skip(ctx), fields(analyzer = "size"))]
76 async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
77 let validation_ctx = current_validation_context();
79 let table_name = validation_ctx.table_name();
80
81 let sql = format!("SELECT COUNT(*) as count FROM {table_name}");
83 let df = ctx.sql(&sql).await?;
84 let batches = df.collect().await?;
85
86 let count = if let Some(batch) = batches.first() {
88 if batch.num_rows() > 0 {
89 if let Some(array) = batch
90 .column(0)
91 .as_any()
92 .downcast_ref::<arrow::array::Int64Array>()
93 {
94 array.value(0) as u64
95 } else {
96 0
97 }
98 } else {
99 0
100 }
101 } else {
102 0
103 };
104
105 Ok(SizeState { count })
106 }
107
108 fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
109 Ok(MetricValue::Long(state.count as i64))
110 }
111
112 fn name(&self) -> &str {
113 "size"
114 }
115
116 fn description(&self) -> &str {
117 "Computes the number of rows in the dataset"
118 }
119}