1use std::{fmt::Debug, sync::Arc};
16
17use crate::{
18 evaluation::{
19 temporal_constants,
20 variable_value::{
21 duration::Duration, zoned_datetime::ZonedDateTime, zoned_time::ZonedTime,
22 },
23 {FunctionError, FunctionEvaluationError},
24 },
25 interface::ResultIndex,
26};
27
28use async_trait::async_trait;
29
30use drasi_query_ast::ast;
31
32use crate::evaluation::{
33 variable_value::float::Float, variable_value::VariableValue, ExpressionEvaluationContext,
34};
35
36use chrono::{DateTime, Duration as ChronoDuration, LocalResult};
37
38use super::{super::AggregatingFunction, lazy_sorted_set::LazySortedSet, Accumulator};
39
40#[derive(Clone)]
41pub struct Max {}
42
43#[async_trait]
44impl AggregatingFunction for Max {
45 fn initialize_accumulator(
46 &self,
47 _context: &ExpressionEvaluationContext,
48 expression: &ast::FunctionExpression,
49 grouping_keys: &Vec<VariableValue>,
50 index: Arc<dyn ResultIndex>,
51 ) -> Accumulator {
52 Accumulator::LazySortedSet(LazySortedSet::new(
53 expression.position_in_query,
54 grouping_keys,
55 index,
56 ))
57 }
58
59 fn accumulator_is_lazy(&self) -> bool {
60 true
61 }
62
63 async fn apply(
64 &self,
65 _context: &ExpressionEvaluationContext,
66 args: Vec<VariableValue>,
67 accumulator: &mut Accumulator,
68 ) -> Result<VariableValue, FunctionError> {
69 if args.len() != 1 {
70 return Err(FunctionError {
71 function_name: "Max".to_string(),
72 error: FunctionEvaluationError::InvalidArgumentCount,
73 });
74 }
75
76 log::info!("Applying Max with args: {args:?}");
77 let accumulator = match accumulator {
78 Accumulator::LazySortedSet(accumulator) => accumulator,
79 _ => {
80 return Err(FunctionError {
81 function_name: "Max".to_string(),
82 error: FunctionEvaluationError::CorruptData,
83 })
84 }
85 };
86
87 match &args[0] {
88 VariableValue::Float(n) => {
89 let value = match n.as_f64() {
90 Some(n) => n,
91 None => {
92 return Err(FunctionError {
93 function_name: "Max".to_string(),
94 error: FunctionEvaluationError::OverflowError,
95 })
96 }
97 };
98 accumulator.insert(-value).await;
99 match accumulator.get_head().await {
100 Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
101 Some(f) => f,
102 None => {
103 return Err(FunctionError {
104 function_name: "Max".to_string(),
105 error: FunctionEvaluationError::CorruptData,
106 })
107 }
108 })),
109 Ok(None) => Ok(VariableValue::Null),
110 Err(e) => Err(FunctionError {
111 function_name: "Max".to_string(),
112 error: FunctionEvaluationError::IndexError(e),
113 }),
114 }
115 }
116 VariableValue::Integer(n) => {
117 let value = match n.as_i64() {
118 Some(n) => n,
119 None => {
120 return Err(FunctionError {
121 function_name: "Max".to_string(),
122 error: FunctionEvaluationError::OverflowError,
123 })
124 }
125 };
126 accumulator.insert(-(value as f64)).await;
127 match accumulator.get_head().await {
128 Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
129 Some(f) => f,
130 None => {
131 return Err(FunctionError {
132 function_name: "Max".to_string(),
133 error: FunctionEvaluationError::CorruptData,
134 })
135 }
136 })),
137 Ok(None) => Ok(VariableValue::Null),
138 Err(e) => Err(FunctionError {
139 function_name: "Max".to_string(),
140 error: FunctionEvaluationError::IndexError(e),
141 }),
142 }
143 }
144 VariableValue::ZonedDateTime(zdt) => {
145 let value = zdt.datetime().timestamp_millis() as f64;
146 accumulator.insert(-value).await;
147 match accumulator.get_head().await {
148 Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
149 ZonedDateTime::from_epoch_millis(-head as i64),
150 )),
151 Ok(None) => Ok(VariableValue::Null),
152 Err(e) => Err(FunctionError {
153 function_name: "Max".to_string(),
154 error: FunctionEvaluationError::IndexError(e),
155 }),
156 }
157 }
158 VariableValue::Duration(d) => {
159 let value = d.duration().num_milliseconds() as f64;
160 accumulator.insert(-value).await;
161 match accumulator.get_head().await {
162 Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
163 ChronoDuration::milliseconds(-head as i64),
164 0,
165 0,
166 ))),
167 Ok(None) => Ok(VariableValue::Null),
168 Err(e) => Err(FunctionError {
169 function_name: "Max".to_string(),
170 error: FunctionEvaluationError::IndexError(e),
171 }),
172 }
173 }
174 VariableValue::Date(d) => {
175 let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
177 let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
178 accumulator.insert(-days_since_epoch).await;
179 match accumulator.get_head().await {
180 Ok(Some(head)) => Ok(VariableValue::Date(
181 reference_date + ChronoDuration::days(-head as i64),
182 )),
183 Ok(None) => Ok(VariableValue::Null),
184 Err(e) => Err(FunctionError {
185 function_name: "Max".to_string(),
186 error: FunctionEvaluationError::IndexError(e),
187 }),
188 }
189 }
190 VariableValue::LocalTime(t) => {
191 let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
192 let duration_since_midnight =
193 t.signed_duration_since(reference_time).num_milliseconds() as f64;
194 accumulator.insert(-duration_since_midnight).await;
195
196 match accumulator.get_head().await {
197 Ok(Some(head)) => Ok(VariableValue::LocalTime(
198 reference_time + ChronoDuration::milliseconds(-head as i64),
199 )),
200 Ok(None) => Ok(VariableValue::Null),
201 Err(e) => Err(FunctionError {
202 function_name: "Max".to_string(),
203 error: FunctionEvaluationError::IndexError(e),
204 }),
205 }
206 }
207 VariableValue::LocalDateTime(dt) => {
208 let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
209 accumulator.insert(-duration_since_epoch).await;
210 match accumulator.get_head().await {
211 Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
212 DateTime::from_timestamp_millis(head as i64 * -1.0 as i64)
213 .unwrap_or_default()
214 .naive_local(),
215 )),
216 Ok(None) => Ok(VariableValue::Null),
217 Err(e) => Err(FunctionError {
218 function_name: "Max".to_string(),
219 error: FunctionEvaluationError::IndexError(e),
220 }),
221 }
222 }
223 VariableValue::ZonedTime(t) => {
224 let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
225 let epoch_datetime = match epoch_date
226 .and_time(*t.time())
227 .and_local_timezone(*t.offset())
228 {
229 LocalResult::Single(dt) => dt,
230 _ => {
231 return Err(FunctionError {
232 function_name: "Max".to_string(),
233 error: FunctionEvaluationError::InvalidFormat {
234 expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
235 .to_string(),
236 },
237 })
238 }
239 };
240 let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
241 accumulator.insert(-duration_since_epoch).await;
242 match accumulator.get_head().await {
243 Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
244 (epoch_datetime + ChronoDuration::milliseconds(-head as i64)).time(),
245 *temporal_constants::UTC_FIXED_OFFSET,
246 ))),
247 Ok(None) => Ok(VariableValue::Null),
248 Err(e) => Err(FunctionError {
249 function_name: "Max".to_string(),
250 error: FunctionEvaluationError::IndexError(e),
251 }),
252 }
253 }
254 VariableValue::Null => Ok(VariableValue::Null),
255 _ => Err(FunctionError {
256 function_name: "Max".to_string(),
257 error: FunctionEvaluationError::InvalidArgument(0),
258 }),
259 }
260 }
261
262 async fn revert(
263 &self,
264 _context: &ExpressionEvaluationContext,
265 args: Vec<VariableValue>,
266 accumulator: &mut Accumulator,
267 ) -> Result<VariableValue, FunctionError> {
268 if args.len() != 1 {
269 return Err(FunctionError {
270 function_name: "Max".to_string(),
271 error: FunctionEvaluationError::InvalidArgumentCount,
272 });
273 }
274 let accumulator = match accumulator {
275 Accumulator::LazySortedSet(accumulator) => accumulator,
276 _ => {
277 return Err(FunctionError {
278 function_name: "Max".to_string(),
279 error: FunctionEvaluationError::CorruptData,
280 })
281 }
282 };
283
284 match &args[0] {
285 VariableValue::Float(n) => {
286 let value = match n.as_f64() {
287 Some(n) => n,
288 None => {
289 return Err(FunctionError {
290 function_name: "Max".to_string(),
291 error: FunctionEvaluationError::OverflowError,
292 })
293 }
294 };
295 accumulator.remove(-value).await;
296 match accumulator.get_head().await {
297 Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
298 Some(f) => f,
299 None => {
300 return Err(FunctionError {
301 function_name: "Max".to_string(),
302 error: FunctionEvaluationError::CorruptData,
303 })
304 }
305 })),
306 Ok(None) => Ok(VariableValue::Null),
307 Err(e) => Err(FunctionError {
308 function_name: "Max".to_string(),
309 error: FunctionEvaluationError::IndexError(e),
310 }),
311 }
312 }
313 VariableValue::Integer(n) => {
314 let value = match n.as_i64() {
315 Some(n) => n,
316 None => {
317 return Err(FunctionError {
318 function_name: "Max".to_string(),
319 error: FunctionEvaluationError::OverflowError,
320 })
321 }
322 };
323 accumulator.remove(-(value as f64)).await;
324 match accumulator.get_head().await {
325 Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
326 Some(f) => f,
327 None => {
328 return Err(FunctionError {
329 function_name: "Max".to_string(),
330 error: FunctionEvaluationError::CorruptData,
331 })
332 }
333 })),
334 Ok(None) => Ok(VariableValue::Null),
335 Err(e) => Err(FunctionError {
336 function_name: "Max".to_string(),
337 error: FunctionEvaluationError::IndexError(e),
338 }),
339 }
340 }
341 VariableValue::ZonedDateTime(zdt) => {
342 let value = zdt.datetime().timestamp_millis() as f64;
343 accumulator.remove(-value).await;
344 match accumulator.get_head().await {
345 Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
346 ZonedDateTime::from_epoch_millis(-head as i64),
347 )),
348 Ok(None) => Ok(VariableValue::Null),
349 Err(e) => Err(FunctionError {
350 function_name: "Max".to_string(),
351 error: FunctionEvaluationError::IndexError(e),
352 }),
353 }
354 }
355 VariableValue::Duration(d) => {
356 let value = d.duration().num_milliseconds() as f64;
357 accumulator.remove(-value).await;
358 match accumulator.get_head().await {
359 Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
360 ChronoDuration::milliseconds(-head as i64),
361 0,
362 0,
363 ))),
364 Ok(None) => Ok(VariableValue::Null),
365 Err(e) => Err(FunctionError {
366 function_name: "Max".to_string(),
367 error: FunctionEvaluationError::IndexError(e),
368 }),
369 }
370 }
371 VariableValue::Date(d) => {
372 let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
374 let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
375 accumulator.remove(-days_since_epoch).await;
376 match accumulator.get_head().await {
377 Ok(Some(head)) => Ok(VariableValue::Date(
378 reference_date + ChronoDuration::days(-head as i64),
379 )),
380 Ok(None) => Ok(VariableValue::Null),
381 Err(e) => Err(FunctionError {
382 function_name: "Max".to_string(),
383 error: FunctionEvaluationError::IndexError(e),
384 }),
385 }
386 }
387 VariableValue::LocalTime(t) => {
388 let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
389 let duration_since_midnight =
390 t.signed_duration_since(reference_time).num_milliseconds() as f64;
391 accumulator.remove(-duration_since_midnight).await;
392
393 match accumulator.get_head().await {
394 Ok(Some(head)) => Ok(VariableValue::LocalTime(
395 reference_time + ChronoDuration::milliseconds(-head as i64),
396 )),
397 Ok(None) => Ok(VariableValue::Null),
398 Err(e) => Err(FunctionError {
399 function_name: "Max".to_string(),
400 error: FunctionEvaluationError::IndexError(e),
401 }),
402 }
403 }
404 VariableValue::LocalDateTime(dt) => {
405 let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
406 accumulator.remove(-duration_since_epoch).await;
407 match accumulator.get_head().await {
408 Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
409 DateTime::from_timestamp_millis(head as i64 * -1.0 as i64)
410 .unwrap_or_default()
411 .naive_local(),
412 )),
413 Ok(None) => Ok(VariableValue::Null),
414 Err(e) => Err(FunctionError {
415 function_name: "Max".to_string(),
416 error: FunctionEvaluationError::IndexError(e),
417 }),
418 }
419 }
420 VariableValue::ZonedTime(t) => {
421 let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
422 let epoch_datetime = match epoch_date
423 .and_time(*t.time())
424 .and_local_timezone(*t.offset())
425 {
426 LocalResult::Single(dt) => dt,
427 _ => {
428 return Err(FunctionError {
429 function_name: "Max".to_string(),
430 error: FunctionEvaluationError::InvalidFormat {
431 expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
432 .to_string(),
433 },
434 })
435 }
436 };
437 let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
438 accumulator.remove(-duration_since_epoch).await;
439 match accumulator.get_head().await {
440 Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
441 (epoch_datetime + ChronoDuration::milliseconds(-head as i64)).time(),
442 *temporal_constants::UTC_FIXED_OFFSET,
443 ))),
444 Ok(None) => Ok(VariableValue::Null),
445 Err(e) => Err(FunctionError {
446 function_name: "Max".to_string(),
447 error: FunctionEvaluationError::IndexError(e),
448 }),
449 }
450 }
451 VariableValue::Null => Ok(VariableValue::Null),
452 _ => Err(FunctionError {
453 function_name: "Max".to_string(),
454 error: FunctionEvaluationError::InvalidArgument(0),
455 }),
456 }
457 }
458
459 async fn snapshot(
460 &self,
461 _context: &ExpressionEvaluationContext,
462 args: Vec<VariableValue>,
463 accumulator: &Accumulator,
464 ) -> Result<VariableValue, FunctionError> {
465 if args.len() != 1 {
466 return Err(FunctionError {
467 function_name: "Max".to_string(),
468 error: FunctionEvaluationError::InvalidArgumentCount,
469 });
470 }
471
472 let accumulator = match accumulator {
473 Accumulator::LazySortedSet(accumulator) => accumulator,
474 _ => {
475 return Err(FunctionError {
476 function_name: "Max".to_string(),
477 error: FunctionEvaluationError::CorruptData,
478 })
479 }
480 };
481
482 let value = match accumulator.get_head().await {
483 Ok(Some(head)) => -head,
484 Ok(None) => return Ok(VariableValue::Null),
485 Err(e) => {
486 return Err(FunctionError {
487 function_name: "Max".to_string(),
488 error: FunctionEvaluationError::IndexError(e),
489 })
490 }
491 };
492
493 return match &args[0] {
494 VariableValue::Float(_) => Ok(VariableValue::Float(match Float::from_f64(value) {
495 Some(f) => f,
496 None => {
497 return Err(FunctionError {
498 function_name: "Max".to_string(),
499 error: FunctionEvaluationError::OverflowError,
500 })
501 }
502 })),
503 VariableValue::Integer(_) => Ok(VariableValue::Integer((value as i64).into())),
504 VariableValue::ZonedDateTime(_) => Ok(VariableValue::ZonedDateTime(
505 ZonedDateTime::from_epoch_millis(value as i64),
506 )),
507 VariableValue::Duration(_) => Ok(VariableValue::Duration(Duration::new(
508 ChronoDuration::milliseconds(value as i64),
509 0,
510 0,
511 ))),
512 VariableValue::Date(_) => {
513 let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
514 Ok(VariableValue::Date(
515 reference_date + ChronoDuration::days(value as i64),
516 ))
517 }
518 VariableValue::LocalTime(_) => {
519 let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
520 Ok(VariableValue::LocalTime(
521 reference_time + ChronoDuration::milliseconds(value as i64),
522 ))
523 }
524 VariableValue::LocalDateTime(_) => Ok(VariableValue::LocalDateTime(
525 DateTime::from_timestamp_millis(value as i64)
526 .unwrap_or_default()
527 .naive_local(),
528 )),
529 VariableValue::ZonedTime(_) => {
530 let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
531 let epoch_datetime = match epoch_date
532 .and_time(*temporal_constants::MIDNIGHT_NAIVE_TIME)
533 .and_local_timezone(*temporal_constants::UTC_FIXED_OFFSET)
534 {
535 LocalResult::Single(dt) => dt,
536 _ => {
537 return Err(FunctionError {
538 function_name: "Max".to_string(),
539 error: FunctionEvaluationError::InvalidFormat {
540 expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
541 .to_string(),
542 },
543 })
544 }
545 };
546 Ok(VariableValue::ZonedTime(ZonedTime::new(
547 (epoch_datetime + ChronoDuration::milliseconds(value as i64)).time(),
548 *temporal_constants::UTC_FIXED_OFFSET,
549 )))
550 }
551 VariableValue::Null => Ok(VariableValue::Null),
552 _ => Err(FunctionError {
553 function_name: "Max".to_string(),
554 error: FunctionEvaluationError::InvalidArgument(0),
555 }),
556 };
557 }
558}
559
560impl Debug for Max {
561 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
562 write!(f, "Max")
563 }
564}