sql_cli/sql/aggregates/
analytics.rs

1//! Analytics aggregate functions for time series and statistical operations
2
3use anyhow::{anyhow, Result};
4use std::collections::BTreeMap;
5
6use super::{AggregateFunction, AggregateState};
7use crate::data::datatable::DataValue;
8
9/// State for analytics aggregates
10#[derive(Debug, Clone)]
11pub struct AnalyticsState {
12    pub function_type: AnalyticsType,
13    pub values: Vec<f64>,
14    pub window_size: Option<usize>,
15}
16
/// Selects which series computation an [`AnalyticsState`] performs when it
/// is finalized.
///
/// The enum carries no data, so it is `Copy` and comparable by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnalyticsType {
    /// First value followed by successive differences.
    Deltas,
    /// Running (cumulative) sum.
    Sums,
    /// Trailing moving average.
    Mavg,
    /// Percentage change relative to the previous value.
    PctChange,
    /// Dense rank of each value among the distinct values.
    Rank,
    /// Running maximum.
    CumMax,
    /// Running minimum.
    CumMin,
}
27
28impl AnalyticsState {
29    pub fn new(function_type: AnalyticsType, window_size: Option<usize>) -> Self {
30        Self {
31            function_type,
32            values: Vec::new(),
33            window_size,
34        }
35    }
36
37    pub fn add(&mut self, value: &DataValue) -> Result<()> {
38        let num = match value {
39            DataValue::Null => return Ok(()), // Skip nulls
40            DataValue::Integer(n) => *n as f64,
41            DataValue::Float(f) => *f,
42            _ => return Err(anyhow!("Analytics functions require numeric values")),
43        };
44        self.values.push(num);
45        Ok(())
46    }
47
48    pub fn finalize(self) -> DataValue {
49        if self.values.is_empty() {
50            return DataValue::Null;
51        }
52
53        let result = match self.function_type {
54            AnalyticsType::Deltas => compute_deltas(&self.values),
55            AnalyticsType::Sums => compute_sums(&self.values),
56            AnalyticsType::Mavg => compute_mavg(&self.values, self.window_size.unwrap_or(3)),
57            AnalyticsType::PctChange => compute_pct_change(&self.values),
58            AnalyticsType::Rank => compute_rank(&self.values),
59            AnalyticsType::CumMax => compute_cummax(&self.values),
60            AnalyticsType::CumMin => compute_cummin(&self.values),
61        };
62
63        DataValue::String(format!("[{}]", result.join(", ")))
64    }
65}
66
67fn compute_deltas(values: &[f64]) -> Vec<String> {
68    if values.is_empty() {
69        return vec![];
70    }
71
72    let mut deltas = vec![format_number(values[0])];
73    for i in 1..values.len() {
74        let delta = values[i] - values[i - 1];
75        deltas.push(format_number(delta));
76    }
77    deltas
78}
79
80fn compute_sums(values: &[f64]) -> Vec<String> {
81    let mut sums = Vec::new();
82    let mut running_sum = 0.0;
83
84    for val in values {
85        running_sum += val;
86        sums.push(format_number(running_sum));
87    }
88    sums
89}
90
91fn compute_mavg(values: &[f64], window: usize) -> Vec<String> {
92    let mut results = Vec::new();
93
94    for i in 0..values.len() {
95        let start = if i >= window { i - window + 1 } else { 0 };
96        let end = i + 1;
97        let window_values = &values[start..end];
98        let avg = window_values.iter().sum::<f64>() / window_values.len() as f64;
99        results.push(format_number(avg));
100    }
101    results
102}
103
/// Renders the percentage change of each element relative to its
/// predecessor, formatted with two decimals and a trailing `%`.
///
/// The first element has no predecessor and is rendered as `"null"`, as is
/// any element whose predecessor equals zero. An empty input yields an empty
/// vector.
fn compute_pct_change(values: &[f64]) -> Vec<String> {
    if values.is_empty() {
        return Vec::new();
    }

    let mut out = Vec::with_capacity(values.len());
    out.push(String::from("null"));
    out.extend(values.windows(2).map(|pair| {
        let (prev, cur) = (pair[0], pair[1]);
        if prev == 0.0 {
            // Relative change from zero is undefined.
            String::from("null")
        } else {
            format!("{:.2}%", ((cur - prev) / prev) * 100.0)
        }
    }));
    out
}
120
/// Renders the dense rank (1-based) of each element among the distinct
/// values of the input, smallest value first.
///
/// Equal values share a rank. `0.0` and `-0.0` are treated as the same
/// value, and every NaN is treated as one value that ranks after all
/// numbers. Canonicalising first avoids two panics: comparing NaN with
/// `partial_cmp(..).unwrap()`, and looking up a zero whose sign differs from
/// the one kept by deduplication.
fn compute_rank(values: &[f64]) -> Vec<String> {
    // Map every member of an equivalence class onto a single bit pattern so
    // that sorting, deduplication and lookup all agree.
    fn canonical(v: f64) -> f64 {
        if v.is_nan() {
            f64::NAN
        } else if v == 0.0 {
            0.0
        } else {
            v
        }
    }

    let mut sorted_unique: Vec<f64> = values.iter().map(|&v| canonical(v)).collect();
    sorted_unique.sort_by(|a, b| {
        // `partial_cmp` is only `None` when a NaN is involved; order NaN last.
        a.partial_cmp(b)
            .unwrap_or_else(|| a.is_nan().cmp(&b.is_nan()))
    });
    sorted_unique.dedup_by_key(|v| v.to_bits());

    // Keyed by bit pattern because `f64` is neither `Ord` nor `Hash`.
    let rank_map: BTreeMap<u64, usize> = sorted_unique
        .iter()
        .enumerate()
        .map(|(i, v)| (v.to_bits(), i + 1))
        .collect();

    values
        .iter()
        .map(|&v| rank_map[&canonical(v).to_bits()].to_string())
        .collect()
}
138
139fn compute_cummax(values: &[f64]) -> Vec<String> {
140    let mut results = Vec::new();
141    let mut current_max = f64::NEG_INFINITY;
142
143    for val in values {
144        current_max = current_max.max(*val);
145        results.push(format_number(current_max));
146    }
147    results
148}
149
150fn compute_cummin(values: &[f64]) -> Vec<String> {
151    let mut results = Vec::new();
152    let mut current_min = f64::INFINITY;
153
154    for val in values {
155        current_min = current_min.min(*val);
156        results.push(format_number(current_min));
157    }
158    results
159}
160
/// Formats a number compactly for display.
///
/// * Whole numbers with magnitude below `1e10` print as plain integers.
/// * Everything else is rounded to two decimals, then trailing zeros and a
///   dangling decimal point are removed.
/// * Non-finite values print as produced by the standard formatter
///   (`"NaN"`, `"inf"`, `"-inf"`).
///
/// A negative value that rounds to zero is rendered as `"0"` rather than
/// `"-0"`.
fn format_number(n: f64) -> String {
    if n.fract() == 0.0 && n.abs() < 1e10 {
        return (n as i64).to_string();
    }

    let formatted = format!("{:.2}", n);
    if !formatted.contains('.') {
        // NaN and infinities have no fractional part to trim.
        return formatted;
    }

    let trimmed = formatted.trim_end_matches('0').trim_end_matches('.');
    if trimmed == "-0" {
        // e.g. -0.001 rounds to "-0.00"; drop the meaningless sign.
        String::from("0")
    } else {
        trimmed.to_string()
    }
}
177
178// Aggregate function implementations
179
180pub struct DeltasFunction;
181impl AggregateFunction for DeltasFunction {
182    fn name(&self) -> &str {
183        "DELTAS"
184    }
185
186    fn init(&self) -> AggregateState {
187        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Deltas, None))
188    }
189
190    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
191        if let AggregateState::Analytics(ref mut analytics) = state {
192            analytics.add(value)
193        } else {
194            Err(anyhow!("Invalid state for DELTAS"))
195        }
196    }
197
198    fn finalize(&self, state: AggregateState) -> DataValue {
199        if let AggregateState::Analytics(analytics) = state {
200            analytics.finalize()
201        } else {
202            DataValue::Null
203        }
204    }
205}
206
207pub struct SumsFunction;
208impl AggregateFunction for SumsFunction {
209    fn name(&self) -> &str {
210        "SUMS"
211    }
212
213    fn init(&self) -> AggregateState {
214        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Sums, None))
215    }
216
217    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
218        if let AggregateState::Analytics(ref mut analytics) = state {
219            analytics.add(value)
220        } else {
221            Err(anyhow!("Invalid state for SUMS"))
222        }
223    }
224
225    fn finalize(&self, state: AggregateState) -> DataValue {
226        if let AggregateState::Analytics(analytics) = state {
227            analytics.finalize()
228        } else {
229            DataValue::Null
230        }
231    }
232}
233
234pub struct MavgFunction;
235impl AggregateFunction for MavgFunction {
236    fn name(&self) -> &str {
237        "MAVG"
238    }
239
240    fn init(&self) -> AggregateState {
241        // Default window size is 3
242        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Mavg, Some(3)))
243    }
244
245    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
246        if let AggregateState::Analytics(ref mut analytics) = state {
247            analytics.add(value)
248        } else {
249            Err(anyhow!("Invalid state for MAVG"))
250        }
251    }
252
253    fn finalize(&self, state: AggregateState) -> DataValue {
254        if let AggregateState::Analytics(analytics) = state {
255            analytics.finalize()
256        } else {
257            DataValue::Null
258        }
259    }
260}
261
262pub struct PctChangeFunction;
263impl AggregateFunction for PctChangeFunction {
264    fn name(&self) -> &str {
265        "PCT_CHANGE"
266    }
267
268    fn init(&self) -> AggregateState {
269        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::PctChange, None))
270    }
271
272    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
273        if let AggregateState::Analytics(ref mut analytics) = state {
274            analytics.add(value)
275        } else {
276            Err(anyhow!("Invalid state for PCT_CHANGE"))
277        }
278    }
279
280    fn finalize(&self, state: AggregateState) -> DataValue {
281        if let AggregateState::Analytics(analytics) = state {
282            analytics.finalize()
283        } else {
284            DataValue::Null
285        }
286    }
287}
288
289pub struct RankFunction;
290impl AggregateFunction for RankFunction {
291    fn name(&self) -> &str {
292        "RANK"
293    }
294
295    fn init(&self) -> AggregateState {
296        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Rank, None))
297    }
298
299    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
300        if let AggregateState::Analytics(ref mut analytics) = state {
301            analytics.add(value)
302        } else {
303            Err(anyhow!("Invalid state for RANK"))
304        }
305    }
306
307    fn finalize(&self, state: AggregateState) -> DataValue {
308        if let AggregateState::Analytics(analytics) = state {
309            analytics.finalize()
310        } else {
311            DataValue::Null
312        }
313    }
314}
315
316pub struct CumMaxFunction;
317impl AggregateFunction for CumMaxFunction {
318    fn name(&self) -> &str {
319        "CUMMAX"
320    }
321
322    fn init(&self) -> AggregateState {
323        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMax, None))
324    }
325
326    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
327        if let AggregateState::Analytics(ref mut analytics) = state {
328            analytics.add(value)
329        } else {
330            Err(anyhow!("Invalid state for CUMMAX"))
331        }
332    }
333
334    fn finalize(&self, state: AggregateState) -> DataValue {
335        if let AggregateState::Analytics(analytics) = state {
336            analytics.finalize()
337        } else {
338            DataValue::Null
339        }
340    }
341}
342
343pub struct CumMinFunction;
344impl AggregateFunction for CumMinFunction {
345    fn name(&self) -> &str {
346        "CUMMIN"
347    }
348
349    fn init(&self) -> AggregateState {
350        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMin, None))
351    }
352
353    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
354        if let AggregateState::Analytics(ref mut analytics) = state {
355            analytics.add(value)
356        } else {
357            Err(anyhow!("Invalid state for CUMMIN"))
358        }
359    }
360
361    fn finalize(&self, state: AggregateState) -> DataValue {
362        if let AggregateState::Analytics(analytics) = state {
363            analytics.finalize()
364        } else {
365            DataValue::Null
366        }
367    }
368}