sql_cli/sql/aggregates/
analytics.rs1use anyhow::{anyhow, Result};
4use std::collections::BTreeMap;
5
6use super::{AggregateFunction, AggregateState};
7use crate::data::datatable::DataValue;
8
/// Accumulator for the sequence-producing analytics aggregates.
///
/// Numeric inputs are collected in arrival order and turned into a
/// bracketed, comma-separated series when the state is finalized.
#[derive(Debug, Clone)]
pub struct AnalyticsState {
    /// Which analytics computation is applied to `values` on finalization.
    pub function_type: AnalyticsType,
    /// Numeric inputs in the order they were added (nulls are not stored).
    pub values: Vec<f64>,
    /// Window length used by the moving average; finalization falls back to
    /// 3 when this is `None`.
    pub window_size: Option<usize>,
}
16
/// Selects the series computation performed by an [`AnalyticsState`].
#[derive(Debug, Clone)]
pub enum AnalyticsType {
    /// First value followed by the difference between consecutive values.
    Deltas,
    /// Running (cumulative) sums.
    Sums,
    /// Trailing moving average over a window of values.
    Mavg,
    /// Percentage change relative to the previous value.
    PctChange,
    /// 1-based rank of each value among the distinct values, ascending.
    Rank,
    /// Running maximum.
    CumMax,
    /// Running minimum.
    CumMin,
}
27
28impl AnalyticsState {
29 #[must_use]
30 pub fn new(function_type: AnalyticsType, window_size: Option<usize>) -> Self {
31 Self {
32 function_type,
33 values: Vec::new(),
34 window_size,
35 }
36 }
37
38 pub fn add(&mut self, value: &DataValue) -> Result<()> {
39 let num = match value {
40 DataValue::Null => return Ok(()), DataValue::Integer(n) => *n as f64,
42 DataValue::Float(f) => *f,
43 _ => return Err(anyhow!("Analytics functions require numeric values")),
44 };
45 self.values.push(num);
46 Ok(())
47 }
48
49 #[must_use]
50 pub fn finalize(self) -> DataValue {
51 if self.values.is_empty() {
52 return DataValue::Null;
53 }
54
55 let result = match self.function_type {
56 AnalyticsType::Deltas => compute_deltas(&self.values),
57 AnalyticsType::Sums => compute_sums(&self.values),
58 AnalyticsType::Mavg => compute_mavg(&self.values, self.window_size.unwrap_or(3)),
59 AnalyticsType::PctChange => compute_pct_change(&self.values),
60 AnalyticsType::Rank => compute_rank(&self.values),
61 AnalyticsType::CumMax => compute_cummax(&self.values),
62 AnalyticsType::CumMin => compute_cummin(&self.values),
63 };
64
65 DataValue::String(format!("[{}]", result.join(", ")))
66 }
67}
68
69fn compute_deltas(values: &[f64]) -> Vec<String> {
70 if values.is_empty() {
71 return vec![];
72 }
73
74 let mut deltas = vec![format_number(values[0])];
75 for i in 1..values.len() {
76 let delta = values[i] - values[i - 1];
77 deltas.push(format_number(delta));
78 }
79 deltas
80}
81
82fn compute_sums(values: &[f64]) -> Vec<String> {
83 let mut sums = Vec::new();
84 let mut running_sum = 0.0;
85
86 for val in values {
87 running_sum += val;
88 sums.push(format_number(running_sum));
89 }
90 sums
91}
92
93fn compute_mavg(values: &[f64], window: usize) -> Vec<String> {
94 let mut results = Vec::new();
95
96 for i in 0..values.len() {
97 let start = if i >= window { i - window + 1 } else { 0 };
98 let end = i + 1;
99 let window_values = &values[start..end];
100 let avg = window_values.iter().sum::<f64>() / window_values.len() as f64;
101 results.push(format_number(avg));
102 }
103 results
104}
105
/// Returns the percentage change of each value relative to its predecessor.
///
/// The first entry is always `"null"` (there is no predecessor), and so is
/// any entry whose predecessor is zero. Other entries are rendered with two
/// decimals and a trailing `%`. An empty input yields an empty vector.
fn compute_pct_change(values: &[f64]) -> Vec<String> {
    if values.is_empty() {
        return Vec::new();
    }

    let leading = std::iter::once(String::from("null"));
    let following = values.windows(2).map(|pair| {
        let (prev, curr) = (pair[0], pair[1]);
        if prev == 0.0 {
            // Division by zero has no meaningful percentage.
            String::from("null")
        } else {
            let pct = ((curr - prev) / prev) * 100.0;
            format!("{pct:.2}%")
        }
    });

    leading.chain(following).collect()
}
122
/// Returns the 1-based dense rank of every value, in input order.
///
/// Ranks are assigned over the distinct values in ascending order, so equal
/// values share a rank and no rank numbers are skipped.
///
/// Values are canonicalised before ranking so the function cannot panic:
/// `-0.0` is folded into `0.0` (they compare equal, so they must share a
/// rank and a lookup key), and every NaN is folded into a single NaN that
/// sorts after all other values.
fn compute_rank(values: &[f64]) -> Vec<String> {
    let canonical = |v: f64| -> f64 {
        if v == 0.0 {
            0.0
        } else if v.is_nan() {
            f64::NAN
        } else {
            v
        }
    };

    let mut sorted_unique: Vec<f64> = values.iter().map(|&v| canonical(v)).collect();
    // `total_cmp` is a total order, so NaN cannot make the sort panic.
    sorted_unique.sort_by(f64::total_cmp);
    // Dedup on the same key the map uses so every input has an entry.
    sorted_unique.dedup_by_key(|v| v.to_bits());

    let rank_map: BTreeMap<u64, usize> = sorted_unique
        .iter()
        .enumerate()
        .map(|(i, v)| (v.to_bits(), i + 1))
        .collect();

    values
        .iter()
        .map(|&v| rank_map[&canonical(v).to_bits()].to_string())
        .collect()
}
140
141fn compute_cummax(values: &[f64]) -> Vec<String> {
142 let mut results = Vec::new();
143 let mut current_max = f64::NEG_INFINITY;
144
145 for val in values {
146 current_max = current_max.max(*val);
147 results.push(format_number(current_max));
148 }
149 results
150}
151
152fn compute_cummin(values: &[f64]) -> Vec<String> {
153 let mut results = Vec::new();
154 let mut current_min = f64::INFINITY;
155
156 for val in values {
157 current_min = current_min.min(*val);
158 results.push(format_number(current_min));
159 }
160 results
161}
162
/// Renders a number compactly for display in a series.
///
/// * Whole numbers with magnitude below `1e10` are printed as integers.
/// * Everything else is rounded to two decimals, then trailing zeros and a
///   dangling decimal point are stripped (`2.50` -> `2.5`, `3.00` -> `3`).
/// * Values whose formatted form has no decimal point are returned as
///   formatted.
///
/// A small negative value that rounds to zero (e.g. `-0.001`) would
/// otherwise come out as `"-0"`; it is normalised to `"0"`.
fn format_number(n: f64) -> String {
    if n.fract() == 0.0 && n.abs() < 1e10 {
        return (n as i64).to_string();
    }

    let formatted = format!("{n:.2}");
    if !formatted.contains('.') {
        return formatted;
    }

    // Strip zeros first, then the point: the point stops the zero trim from
    // eating significant zeros of the integer part.
    let trimmed = formatted.trim_end_matches('0').trim_end_matches('.');
    if trimmed == "-0" {
        "0".to_string()
    } else {
        trimmed.to_string()
    }
}
179
180pub struct DeltasFunction;
183impl AggregateFunction for DeltasFunction {
184 fn name(&self) -> &'static str {
185 "DELTAS"
186 }
187
188 fn init(&self) -> AggregateState {
189 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Deltas, None))
190 }
191
192 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
193 if let AggregateState::Analytics(ref mut analytics) = state {
194 analytics.add(value)
195 } else {
196 Err(anyhow!("Invalid state for DELTAS"))
197 }
198 }
199
200 fn finalize(&self, state: AggregateState) -> DataValue {
201 if let AggregateState::Analytics(analytics) = state {
202 analytics.finalize()
203 } else {
204 DataValue::Null
205 }
206 }
207}
208
209pub struct SumsFunction;
210impl AggregateFunction for SumsFunction {
211 fn name(&self) -> &'static str {
212 "SUMS"
213 }
214
215 fn init(&self) -> AggregateState {
216 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Sums, None))
217 }
218
219 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
220 if let AggregateState::Analytics(ref mut analytics) = state {
221 analytics.add(value)
222 } else {
223 Err(anyhow!("Invalid state for SUMS"))
224 }
225 }
226
227 fn finalize(&self, state: AggregateState) -> DataValue {
228 if let AggregateState::Analytics(analytics) = state {
229 analytics.finalize()
230 } else {
231 DataValue::Null
232 }
233 }
234}
235
236pub struct MavgFunction;
237impl AggregateFunction for MavgFunction {
238 fn name(&self) -> &'static str {
239 "MAVG"
240 }
241
242 fn init(&self) -> AggregateState {
243 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Mavg, Some(3)))
245 }
246
247 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
248 if let AggregateState::Analytics(ref mut analytics) = state {
249 analytics.add(value)
250 } else {
251 Err(anyhow!("Invalid state for MAVG"))
252 }
253 }
254
255 fn finalize(&self, state: AggregateState) -> DataValue {
256 if let AggregateState::Analytics(analytics) = state {
257 analytics.finalize()
258 } else {
259 DataValue::Null
260 }
261 }
262}
263
264pub struct PctChangeFunction;
265impl AggregateFunction for PctChangeFunction {
266 fn name(&self) -> &'static str {
267 "PCT_CHANGE"
268 }
269
270 fn init(&self) -> AggregateState {
271 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::PctChange, None))
272 }
273
274 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
275 if let AggregateState::Analytics(ref mut analytics) = state {
276 analytics.add(value)
277 } else {
278 Err(anyhow!("Invalid state for PCT_CHANGE"))
279 }
280 }
281
282 fn finalize(&self, state: AggregateState) -> DataValue {
283 if let AggregateState::Analytics(analytics) = state {
284 analytics.finalize()
285 } else {
286 DataValue::Null
287 }
288 }
289}
290
291pub struct RankFunction;
292impl AggregateFunction for RankFunction {
293 fn name(&self) -> &'static str {
294 "RANK"
295 }
296
297 fn init(&self) -> AggregateState {
298 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Rank, None))
299 }
300
301 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
302 if let AggregateState::Analytics(ref mut analytics) = state {
303 analytics.add(value)
304 } else {
305 Err(anyhow!("Invalid state for RANK"))
306 }
307 }
308
309 fn finalize(&self, state: AggregateState) -> DataValue {
310 if let AggregateState::Analytics(analytics) = state {
311 analytics.finalize()
312 } else {
313 DataValue::Null
314 }
315 }
316}
317
318pub struct CumMaxFunction;
319impl AggregateFunction for CumMaxFunction {
320 fn name(&self) -> &'static str {
321 "CUMMAX"
322 }
323
324 fn init(&self) -> AggregateState {
325 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMax, None))
326 }
327
328 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
329 if let AggregateState::Analytics(ref mut analytics) = state {
330 analytics.add(value)
331 } else {
332 Err(anyhow!("Invalid state for CUMMAX"))
333 }
334 }
335
336 fn finalize(&self, state: AggregateState) -> DataValue {
337 if let AggregateState::Analytics(analytics) = state {
338 analytics.finalize()
339 } else {
340 DataValue::Null
341 }
342 }
343}
344
345pub struct CumMinFunction;
346impl AggregateFunction for CumMinFunction {
347 fn name(&self) -> &'static str {
348 "CUMMIN"
349 }
350
351 fn init(&self) -> AggregateState {
352 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMin, None))
353 }
354
355 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
356 if let AggregateState::Analytics(ref mut analytics) = state {
357 analytics.add(value)
358 } else {
359 Err(anyhow!("Invalid state for CUMMIN"))
360 }
361 }
362
363 fn finalize(&self, state: AggregateState) -> DataValue {
364 if let AggregateState::Analytics(analytics) = state {
365 analytics.finalize()
366 } else {
367 DataValue::Null
368 }
369 }
370}