sql_cli/sql/aggregates/
analytics.rs1use anyhow::{anyhow, Result};
4use std::collections::BTreeMap;
5
6use super::{AggregateFunction, AggregateState};
7use crate::data::datatable::DataValue;
8
9#[derive(Debug, Clone)]
11pub struct AnalyticsState {
12 pub function_type: AnalyticsType,
13 pub values: Vec<f64>,
14 pub window_size: Option<usize>,
15}
16
/// The transformation an analytics accumulator applies when finalized.
///
/// Every variant produces exactly one output element per accumulated value.
/// The enum is fieldless, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnalyticsType {
    /// The first value, then the difference between each value and its predecessor.
    Deltas,
    /// Running (cumulative) sum.
    Sums,
    /// Trailing moving average over `window_size` values.
    Mavg,
    /// Percentage change from the previous value ("null" for the first value or a zero base).
    PctChange,
    /// Dense ascending rank (1 = smallest; equal values share a rank).
    Rank,
    /// Running maximum.
    CumMax,
    /// Running minimum.
    CumMin,
}
27
28impl AnalyticsState {
29 pub fn new(function_type: AnalyticsType, window_size: Option<usize>) -> Self {
30 Self {
31 function_type,
32 values: Vec::new(),
33 window_size,
34 }
35 }
36
37 pub fn add(&mut self, value: &DataValue) -> Result<()> {
38 let num = match value {
39 DataValue::Null => return Ok(()), DataValue::Integer(n) => *n as f64,
41 DataValue::Float(f) => *f,
42 _ => return Err(anyhow!("Analytics functions require numeric values")),
43 };
44 self.values.push(num);
45 Ok(())
46 }
47
48 pub fn finalize(self) -> DataValue {
49 if self.values.is_empty() {
50 return DataValue::Null;
51 }
52
53 let result = match self.function_type {
54 AnalyticsType::Deltas => compute_deltas(&self.values),
55 AnalyticsType::Sums => compute_sums(&self.values),
56 AnalyticsType::Mavg => compute_mavg(&self.values, self.window_size.unwrap_or(3)),
57 AnalyticsType::PctChange => compute_pct_change(&self.values),
58 AnalyticsType::Rank => compute_rank(&self.values),
59 AnalyticsType::CumMax => compute_cummax(&self.values),
60 AnalyticsType::CumMin => compute_cummin(&self.values),
61 };
62
63 DataValue::String(format!("[{}]", result.join(", ")))
64 }
65}
66
67fn compute_deltas(values: &[f64]) -> Vec<String> {
68 if values.is_empty() {
69 return vec![];
70 }
71
72 let mut deltas = vec![format_number(values[0])];
73 for i in 1..values.len() {
74 let delta = values[i] - values[i - 1];
75 deltas.push(format_number(delta));
76 }
77 deltas
78}
79
80fn compute_sums(values: &[f64]) -> Vec<String> {
81 let mut sums = Vec::new();
82 let mut running_sum = 0.0;
83
84 for val in values {
85 running_sum += val;
86 sums.push(format_number(running_sum));
87 }
88 sums
89}
90
91fn compute_mavg(values: &[f64], window: usize) -> Vec<String> {
92 let mut results = Vec::new();
93
94 for i in 0..values.len() {
95 let start = if i >= window { i - window + 1 } else { 0 };
96 let end = i + 1;
97 let window_values = &values[start..end];
98 let avg = window_values.iter().sum::<f64>() / window_values.len() as f64;
99 results.push(format_number(avg));
100 }
101 results
102}
103
/// Percentage change between consecutive values, formatted as `"{:.2}%"`.
///
/// The first element is always `"null"` because it has no predecessor. Any
/// element whose predecessor is exactly `0.0` is also `"null"`, which avoids
/// a division by zero. Empty input yields an empty vector.
fn compute_pct_change(values: &[f64]) -> Vec<String> {
    if values.is_empty() {
        return Vec::new();
    }

    let mut out = Vec::with_capacity(values.len());
    out.push(String::from("null"));
    out.extend(values.windows(2).map(|pair| {
        let (prev, curr) = (pair[0], pair[1]);
        if prev == 0.0 {
            String::from("null")
        } else {
            format!("{:.2}%", (curr - prev) / prev * 100.0)
        }
    }));
    out
}
120
/// Dense ascending rank of each value (1 = smallest).
///
/// Equal values share a rank, and ranks have no gaps. Ordering follows
/// `f64::total_cmp`, so NaN inputs get a deterministic rank instead of
/// panicking; a positive NaN ranks above every number. `-0.0` and `0.0` are
/// treated as the same value. Returns one string per input, in input order.
fn compute_rank(values: &[f64]) -> Vec<String> {
    // Collapse -0.0 onto 0.0. Otherwise a numeric dedup merges them while the
    // bit-keyed lookup below still distinguishes them, and the lookup misses.
    fn canonical(v: f64) -> f64 {
        if v == 0.0 {
            0.0
        } else {
            v
        }
    }

    let mut distinct: Vec<f64> = values.iter().map(|&v| canonical(v)).collect();
    // total_cmp is a total order, so NaN cannot make the sort panic.
    distinct.sort_by(f64::total_cmp);
    // Dedup on exact bits so every canonical input value keeps a map entry.
    distinct.dedup_by(|a, b| a.to_bits() == b.to_bits());

    let rank_of: BTreeMap<u64, usize> = distinct
        .iter()
        .enumerate()
        .map(|(i, v)| (v.to_bits(), i + 1))
        .collect();

    values
        .iter()
        .map(|&v| rank_of[&canonical(v).to_bits()].to_string())
        .collect()
}
138
139fn compute_cummax(values: &[f64]) -> Vec<String> {
140 let mut results = Vec::new();
141 let mut current_max = f64::NEG_INFINITY;
142
143 for val in values {
144 current_max = current_max.max(*val);
145 results.push(format_number(current_max));
146 }
147 results
148}
149
150fn compute_cummin(values: &[f64]) -> Vec<String> {
151 let mut results = Vec::new();
152 let mut current_min = f64::INFINITY;
153
154 for val in values {
155 current_min = current_min.min(*val);
156 results.push(format_number(current_min));
157 }
158 results
159}
160
/// Renders a number for the analytics list output.
///
/// - Integral values with magnitude below `1e10` print without a decimal
///   point (`2.0` -> `"2"`).
/// - Everything else is rounded to two decimals, then trailing zeros and a
///   dangling `.` are removed (`1.50` -> `"1.5"`).
/// - Non-finite values print as `{:.2}` renders them (`"NaN"`, `"inf"`, `"-inf"`).
/// - Tiny negatives that round to zero (e.g. `-0.001`) render as `"0"`,
///   not as a signed `"-0"`.
fn format_number(n: f64) -> String {
    if n.fract() == 0.0 && n.abs() < 1e10 {
        return (n as i64).to_string();
    }

    let fixed = format!("{:.2}", n);
    if !fixed.contains('.') {
        // Non-finite values have no fractional digits to trim.
        return fixed;
    }

    let trimmed = fixed.trim_end_matches('0').trim_end_matches('.');
    // "-0.00" trims to "-0"; report an unsigned zero instead.
    if trimmed == "-0" {
        "0".to_string()
    } else {
        trimmed.to_string()
    }
}
177
178pub struct DeltasFunction;
181impl AggregateFunction for DeltasFunction {
182 fn name(&self) -> &str {
183 "DELTAS"
184 }
185
186 fn init(&self) -> AggregateState {
187 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Deltas, None))
188 }
189
190 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
191 if let AggregateState::Analytics(ref mut analytics) = state {
192 analytics.add(value)
193 } else {
194 Err(anyhow!("Invalid state for DELTAS"))
195 }
196 }
197
198 fn finalize(&self, state: AggregateState) -> DataValue {
199 if let AggregateState::Analytics(analytics) = state {
200 analytics.finalize()
201 } else {
202 DataValue::Null
203 }
204 }
205}
206
207pub struct SumsFunction;
208impl AggregateFunction for SumsFunction {
209 fn name(&self) -> &str {
210 "SUMS"
211 }
212
213 fn init(&self) -> AggregateState {
214 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Sums, None))
215 }
216
217 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
218 if let AggregateState::Analytics(ref mut analytics) = state {
219 analytics.add(value)
220 } else {
221 Err(anyhow!("Invalid state for SUMS"))
222 }
223 }
224
225 fn finalize(&self, state: AggregateState) -> DataValue {
226 if let AggregateState::Analytics(analytics) = state {
227 analytics.finalize()
228 } else {
229 DataValue::Null
230 }
231 }
232}
233
234pub struct MavgFunction;
235impl AggregateFunction for MavgFunction {
236 fn name(&self) -> &str {
237 "MAVG"
238 }
239
240 fn init(&self) -> AggregateState {
241 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Mavg, Some(3)))
243 }
244
245 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
246 if let AggregateState::Analytics(ref mut analytics) = state {
247 analytics.add(value)
248 } else {
249 Err(anyhow!("Invalid state for MAVG"))
250 }
251 }
252
253 fn finalize(&self, state: AggregateState) -> DataValue {
254 if let AggregateState::Analytics(analytics) = state {
255 analytics.finalize()
256 } else {
257 DataValue::Null
258 }
259 }
260}
261
262pub struct PctChangeFunction;
263impl AggregateFunction for PctChangeFunction {
264 fn name(&self) -> &str {
265 "PCT_CHANGE"
266 }
267
268 fn init(&self) -> AggregateState {
269 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::PctChange, None))
270 }
271
272 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
273 if let AggregateState::Analytics(ref mut analytics) = state {
274 analytics.add(value)
275 } else {
276 Err(anyhow!("Invalid state for PCT_CHANGE"))
277 }
278 }
279
280 fn finalize(&self, state: AggregateState) -> DataValue {
281 if let AggregateState::Analytics(analytics) = state {
282 analytics.finalize()
283 } else {
284 DataValue::Null
285 }
286 }
287}
288
289pub struct RankFunction;
290impl AggregateFunction for RankFunction {
291 fn name(&self) -> &str {
292 "RANK"
293 }
294
295 fn init(&self) -> AggregateState {
296 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::Rank, None))
297 }
298
299 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
300 if let AggregateState::Analytics(ref mut analytics) = state {
301 analytics.add(value)
302 } else {
303 Err(anyhow!("Invalid state for RANK"))
304 }
305 }
306
307 fn finalize(&self, state: AggregateState) -> DataValue {
308 if let AggregateState::Analytics(analytics) = state {
309 analytics.finalize()
310 } else {
311 DataValue::Null
312 }
313 }
314}
315
316pub struct CumMaxFunction;
317impl AggregateFunction for CumMaxFunction {
318 fn name(&self) -> &str {
319 "CUMMAX"
320 }
321
322 fn init(&self) -> AggregateState {
323 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMax, None))
324 }
325
326 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
327 if let AggregateState::Analytics(ref mut analytics) = state {
328 analytics.add(value)
329 } else {
330 Err(anyhow!("Invalid state for CUMMAX"))
331 }
332 }
333
334 fn finalize(&self, state: AggregateState) -> DataValue {
335 if let AggregateState::Analytics(analytics) = state {
336 analytics.finalize()
337 } else {
338 DataValue::Null
339 }
340 }
341}
342
343pub struct CumMinFunction;
344impl AggregateFunction for CumMinFunction {
345 fn name(&self) -> &str {
346 "CUMMIN"
347 }
348
349 fn init(&self) -> AggregateState {
350 AggregateState::Analytics(AnalyticsState::new(AnalyticsType::CumMin, None))
351 }
352
353 fn accumulate(&self, state: &mut AggregateState, value: &DataValue) -> Result<()> {
354 if let AggregateState::Analytics(ref mut analytics) = state {
355 analytics.add(value)
356 } else {
357 Err(anyhow!("Invalid state for CUMMIN"))
358 }
359 }
360
361 fn finalize(&self, state: AggregateState) -> DataValue {
362 if let AggregateState::Analytics(analytics) = state {
363 analytics.finalize()
364 } else {
365 DataValue::Null
366 }
367 }
368}