sql_cli/sql/aggregates/
analytics.rs

//! Analytics aggregate functions for time-series and statistical operations; each returns the transformed series as one bracketed list string.
2
3use anyhow::{anyhow, Result};
4use std::collections::BTreeMap;
5
6use super::{AggregateFunction, AggregateState};
7use crate::data::datatable::DataValue;
8
9/// State for analytics aggregates
10#[derive(Debug, Clone)]
11pub struct AnalyticsState {
12    pub function_type: AnalyticsType,
13    pub values: Vec<f64>,
14    pub window_size: Option<usize>,
15}
16
/// The series transformation an `AnalyticsState` applies when finalized.
#[derive(Debug, Clone)]
pub enum AnalyticsType {
    /// The first value, then each value's difference from its predecessor.
    Deltas,
    /// Running (cumulative) sum.
    Sums,
    /// Moving average over up to `window_size` trailing values (3 when unset).
    Mavg,
    /// Percentage change from the previous value; `null` for the first value or a zero predecessor.
    PctChange,
    /// Dense rank: 1 for the smallest distinct value, ties share a rank.
    Rank,
    /// Running maximum.
    CumMax,
    /// Running minimum.
    CumMin,
}
27
28impl AnalyticsState {
29    #[must_use]
30    pub fn new(function_type: AnalyticsType, window_size: Option<usize>) -> Self {
31        Self {
32            function_type,
33            values: Vec::new(),
34            window_size,
35        }
36    }
37
38    pub fn add(&mut self, value: &DataValue) -> Result<()> {
39        let num = match value {
40            DataValue::Null => return Ok(()), // Skip nulls
41            DataValue::Integer(n) => *n as f64,
42            DataValue::Float(f) => *f,
43            _ => return Err(anyhow!("Analytics functions require numeric values")),
44        };
45        self.values.push(num);
46        Ok(())
47    }
48
49    #[must_use]
50    pub fn finalize(self) -> DataValue {
51        if self.values.is_empty() {
52            return DataValue::Null;
53        }
54
55        let result = match self.function_type {
56            AnalyticsType::Deltas => compute_deltas(&self.values),
57            AnalyticsType::Sums => compute_sums(&self.values),
58            AnalyticsType::Mavg => compute_mavg(&self.values, self.window_size.unwrap_or(3)),
59            AnalyticsType::PctChange => compute_pct_change(&self.values),
60            AnalyticsType::Rank => compute_rank(&self.values),
61            AnalyticsType::CumMax => compute_cummax(&self.values),
62            AnalyticsType::CumMin => compute_cummin(&self.values),
63        };
64
65        DataValue::String(format!("[{}]", result.join(", ")))
66    }
67}
68
69fn compute_deltas(values: &[f64]) -> Vec<String> {
70    if values.is_empty() {
71        return vec![];
72    }
73
74    let mut deltas = vec![format_number(values[0])];
75    for i in 1..values.len() {
76        let delta = values[i] - values[i - 1];
77        deltas.push(format_number(delta));
78    }
79    deltas
80}
81
82fn compute_sums(values: &[f64]) -> Vec<String> {
83    let mut sums = Vec::new();
84    let mut running_sum = 0.0;
85
86    for val in values {
87        running_sum += val;
88        sums.push(format_number(running_sum));
89    }
90    sums
91}
92
93fn compute_mavg(values: &[f64], window: usize) -> Vec<String> {
94    let mut results = Vec::new();
95
96    for i in 0..values.len() {
97        let start = if i >= window { i - window + 1 } else { 0 };
98        let end = i + 1;
99        let window_values = &values[start..end];
100        let avg = window_values.iter().sum::<f64>() / window_values.len() as f64;
101        results.push(format_number(avg));
102    }
103    results
104}
105
/// Percentage change between consecutive values, formatted with two decimals and a `%` suffix.
///
/// The first entry is always `"null"` (it has no predecessor), as is any entry whose predecessor
/// is 0. A change that rounds to zero is written as `"0.00%"`, never `"-0.00%"`.
/// The negative form would otherwise appear for equal negative neighbours (`0 / -5 == -0.0`)
/// or for tiny negative changes.
fn compute_pct_change(values: &[f64]) -> Vec<String> {
    if values.is_empty() {
        return Vec::new();
    }

    let changes = values.windows(2).map(|pair| {
        let (prev, curr) = (pair[0], pair[1]);
        if prev == 0.0 {
            return "null".to_string();
        }
        let pct = format!("{:.2}", ((curr - prev) / prev) * 100.0);
        if pct == "-0.00" {
            "0.00%".to_string()
        } else {
            format!("{pct}%")
        }
    });

    std::iter::once("null".to_string()).chain(changes).collect()
}
122
/// Dense rank of every value, in input order.
///
/// The smallest distinct value gets rank 1, ties share a rank, and the next distinct value
/// gets the next integer (`[3, 1, 3, 2]` → `["3", "1", "3", "2"]`).
///
/// Ordering uses `f64::total_cmp`, so this never panics:
/// * `-0.0` and `0.0` are numerically equal and share one rank;
/// * a positive `NaN` ranks after `+inf` and a negative one before `-inf`;
///   distinct `NaN` bit patterns rank separately.
fn compute_rank(values: &[f64]) -> Vec<String> {
    // Numerically -0.0 == 0.0, so both are mapped to one key (and hence one rank).
    fn canonical(v: f64) -> f64 {
        if v == 0.0 {
            0.0
        } else {
            v
        }
    }

    let mut distinct: Vec<f64> = values.iter().map(|&v| canonical(v)).collect();
    distinct.sort_by(f64::total_cmp);
    // total_cmp reports Equal only for identical bit patterns, so duplicates are adjacent.
    distinct.dedup_by(|a, b| a.to_bits() == b.to_bits());

    let rank_by_bits: BTreeMap<u64, usize> = distinct
        .iter()
        .enumerate()
        .map(|(idx, v)| (v.to_bits(), idx + 1))
        .collect();

    values
        .iter()
        // Every canonical value was inserted above, so this lookup cannot miss.
        .map(|&v| rank_by_bits[&canonical(v).to_bits()].to_string())
        .collect()
}
140
141fn compute_cummax(values: &[f64]) -> Vec<String> {
142    let mut results = Vec::new();
143    let mut current_max = f64::NEG_INFINITY;
144
145    for val in values {
146        current_max = current_max.max(*val);
147        results.push(format_number(current_max));
148    }
149    results
150}
151
152fn compute_cummin(values: &[f64]) -> Vec<String> {
153    let mut results = Vec::new();
154    let mut current_min = f64::INFINITY;
155
156    for val in values {
157        current_min = current_min.min(*val);
158        results.push(format_number(current_min));
159    }
160    results
161}
162
/// Renders a number compactly for list output.
///
/// * Whole numbers with magnitude below `1e10` are printed as integers (`3`, not `3.0`).
/// * Everything else is rounded to two decimals, and trailing zeros plus any dangling `.`
///   are removed (`1.50` → `1.5`, `2.999` → `3`).
/// * Non-finite values keep Rust's spelling (`NaN`, `inf`, `-inf`).
/// * A negative value that rounds to zero (e.g. `-0.001`) is printed as `0`, not `-0`.
fn format_number(n: f64) -> String {
    if n.fract() == 0.0 && n.abs() < 1e10 {
        // |n| < 1e10 with no fractional part converts to i64 exactly.
        return (n as i64).to_string();
    }

    let rounded = format!("{n:.2}");
    if !rounded.contains('.') {
        // Under `{:.2}` only NaN and ±inf lack a decimal point.
        return rounded;
    }

    match rounded.trim_end_matches('0').trim_end_matches('.') {
        // e.g. -0.004 formats as "-0.00"; do not leak a signed zero into the output.
        "-0" => "0".to_string(),
        trimmed => trimmed.to_string(),
    }
}
179
// `AggregateFunction` implementations: one unit struct per SQL function name, all backed by `AnalyticsState`.
181
182pub struct DeltasFunction;
183impl AggregateFunction for DeltasFunction {
184    fn name(&self) -> &'static str {
185        "DELTAS"
186    }
187
188    fn init(&self) -> AggregateState {
189        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Deltas, None))
190    }
191
192    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
193        if let AggregateState::Analytics(ref mut analytics) = state {
194            analytics.add(value)
195        } else {
196            Err(anyhow!("Invalid state for DELTAS"))
197        }
198    }
199
200    fn finalize(&self, state: AggregateState) -> DataValue {
201        if let AggregateState::Analytics(analytics) = state {
202            analytics.finalize()
203        } else {
204            DataValue::Null
205        }
206    }
207}
208
209pub struct SumsFunction;
210impl AggregateFunction for SumsFunction {
211    fn name(&self) -> &'static str {
212        "SUMS"
213    }
214
215    fn init(&self) -> AggregateState {
216        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Sums, None))
217    }
218
219    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
220        if let AggregateState::Analytics(ref mut analytics) = state {
221            analytics.add(value)
222        } else {
223            Err(anyhow!("Invalid state for SUMS"))
224        }
225    }
226
227    fn finalize(&self, state: AggregateState) -> DataValue {
228        if let AggregateState::Analytics(analytics) = state {
229            analytics.finalize()
230        } else {
231            DataValue::Null
232        }
233    }
234}
235
236pub struct MavgFunction;
237impl AggregateFunction for MavgFunction {
238    fn name(&self) -> &'static str {
239        "MAVG"
240    }
241
242    fn init(&self) -> AggregateState {
243        // Default window size is 3
244        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Mavg, Some(3)))
245    }
246
247    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
248        if let AggregateState::Analytics(ref mut analytics) = state {
249            analytics.add(value)
250        } else {
251            Err(anyhow!("Invalid state for MAVG"))
252        }
253    }
254
255    fn finalize(&self, state: AggregateState) -> DataValue {
256        if let AggregateState::Analytics(analytics) = state {
257            analytics.finalize()
258        } else {
259            DataValue::Null
260        }
261    }
262}
263
264pub struct PctChangeFunction;
265impl AggregateFunction for PctChangeFunction {
266    fn name(&self) -> &'static str {
267        "PCT_CHANGE"
268    }
269
270    fn init(&self) -> AggregateState {
271        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::PctChange, None))
272    }
273
274    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
275        if let AggregateState::Analytics(ref mut analytics) = state {
276            analytics.add(value)
277        } else {
278            Err(anyhow!("Invalid state for PCT_CHANGE"))
279        }
280    }
281
282    fn finalize(&self, state: AggregateState) -> DataValue {
283        if let AggregateState::Analytics(analytics) = state {
284            analytics.finalize()
285        } else {
286            DataValue::Null
287        }
288    }
289}
290
291pub struct RankFunction;
292impl AggregateFunction for RankFunction {
293    fn name(&self) -> &'static str {
294        "RANK"
295    }
296
297    fn init(&self) -> AggregateState {
298        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Rank, None))
299    }
300
301    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
302        if let AggregateState::Analytics(ref mut analytics) = state {
303            analytics.add(value)
304        } else {
305            Err(anyhow!("Invalid state for RANK"))
306        }
307    }
308
309    fn finalize(&self, state: AggregateState) -> DataValue {
310        if let AggregateState::Analytics(analytics) = state {
311            analytics.finalize()
312        } else {
313            DataValue::Null
314        }
315    }
316}
317
318pub struct CumMaxFunction;
319impl AggregateFunction for CumMaxFunction {
320    fn name(&self) -> &'static str {
321        "CUMMAX"
322    }
323
324    fn init(&self) -> AggregateState {
325        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMax, None))
326    }
327
328    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
329        if let AggregateState::Analytics(ref mut analytics) = state {
330            analytics.add(value)
331        } else {
332            Err(anyhow!("Invalid state for CUMMAX"))
333        }
334    }
335
336    fn finalize(&self, state: AggregateState) -> DataValue {
337        if let AggregateState::Analytics(analytics) = state {
338            analytics.finalize()
339        } else {
340            DataValue::Null
341        }
342    }
343}
344
345pub struct CumMinFunction;
346impl AggregateFunction for CumMinFunction {
347    fn name(&self) -> &'static str {
348        "CUMMIN"
349    }
350
351    fn init(&self) -> AggregateState {
352        AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMin, None))
353    }
354
355    fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
356        if let AggregateState::Analytics(ref mut analytics) = state {
357            analytics.add(value)
358        } else {
359            Err(anyhow!("Invalid state for CUMMIN"))
360        }
361    }
362
363    fn finalize(&self, state: AggregateState) -> DataValue {
364        if let AggregateState::Analytics(analytics) = state {
365            analytics.finalize()
366        } else {
367            DataValue::Null
368        }
369    }
370}