term_guard/analyzers/basic/
size.rs

1//! Size analyzer for counting rows in a dataset.
2
3use async_trait::async_trait;
4use datafusion::prelude::*;
5use serde::{Deserialize, Serialize};
6use tracing::instrument;
7
8use crate::analyzers::{Analyzer, AnalyzerResult, AnalyzerState, MetricValue};
9use crate::core::current_validation_context;
10
/// Row-count analyzer: reports how many rows a dataset contains.
///
/// `SizeAnalyzer` is a unit struct with no configuration. It is among the
/// simplest analyzers and provides the row count that other metrics can
/// build upon. The resulting metric is a [`MetricValue::Long`].
///
/// # Example
///
/// ```rust,ignore
/// use term_guard::analyzers::basic::SizeAnalyzer;
/// use datafusion::prelude::*;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// // Register the table to be measured on `ctx` here.
///
/// let analyzer = SizeAnalyzer::new();
/// let state = analyzer.compute_state_from_data(&ctx).await?;
/// let metric = analyzer.compute_metric_from_state(&state)?;
///
/// if let MetricValue::Long(count) = metric {
///     println!("Dataset has {} rows", count);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct SizeAnalyzer;
38
39impl SizeAnalyzer {
40    /// Creates a new size analyzer.
41    pub fn new() -> Self {
42        Self
43    }
44}
45
46impl Default for SizeAnalyzer {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52/// State for the size analyzer containing the row count.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct SizeState {
55    /// Number of rows counted.
56    pub count: u64,
57}
58
59impl AnalyzerState for SizeState {
60    fn merge(states: Vec<Self>) -> AnalyzerResult<Self> {
61        let total_count = states.iter().map(|s| s.count).sum();
62        Ok(SizeState { count: total_count })
63    }
64
65    fn is_empty(&self) -> bool {
66        self.count == 0
67    }
68}
69
70#[async_trait]
71impl Analyzer for SizeAnalyzer {
72    type State = SizeState;
73    type Metric = MetricValue;
74
75    #[instrument(skip(ctx), fields(analyzer = "size"))]
76    async fn compute_state_from_data(&self, ctx: &SessionContext) -> AnalyzerResult<Self::State> {
77        // Get the table name from the validation context
78        let validation_ctx = current_validation_context();
79        let table_name = validation_ctx.table_name();
80
81        // Execute count query
82        let sql = format!("SELECT COUNT(*) as count FROM {table_name}");
83        let df = ctx.sql(&sql).await?;
84        let batches = df.collect().await?;
85
86        // Extract count from result
87        let count = if let Some(batch) = batches.first() {
88            if batch.num_rows() > 0 {
89                if let Some(array) = batch
90                    .column(0)
91                    .as_any()
92                    .downcast_ref::<arrow::array::Int64Array>()
93                {
94                    array.value(0) as u64
95                } else {
96                    0
97                }
98            } else {
99                0
100            }
101        } else {
102            0
103        };
104
105        Ok(SizeState { count })
106    }
107
108    fn compute_metric_from_state(&self, state: &Self::State) -> AnalyzerResult<Self::Metric> {
109        Ok(MetricValue::Long(state.count as i64))
110    }
111
112    fn name(&self) -> &str {
113        "size"
114    }
115
116    fn description(&self) -> &str {
117        "Computes the number of rows in the dataset"
118    }
119}