Skip to main content

drasi_core/evaluation/functions/aggregation/
max.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Debug, sync::Arc};
16
17use crate::{
18    evaluation::{
19        temporal_constants,
20        variable_value::{
21            duration::Duration, zoned_datetime::ZonedDateTime, zoned_time::ZonedTime,
22        },
23        {FunctionError, FunctionEvaluationError},
24    },
25    interface::ResultIndex,
26};
27
28use async_trait::async_trait;
29
30use drasi_query_ast::ast;
31
32use crate::evaluation::{
33    variable_value::float::Float, variable_value::VariableValue, ExpressionEvaluationContext,
34};
35
36use chrono::{DateTime, Duration as ChronoDuration, LocalResult};
37
38use super::{super::AggregatingFunction, lazy_sorted_set::LazySortedSet, Accumulator};
39
/// Stateless marker type for the `max` aggregating function.
///
/// The type carries no fields; the running state of an aggregation is kept in
/// the accumulator handed to the trait methods implemented for this type.
#[derive(Clone)]
pub struct Max {}
42
43#[async_trait]
44impl AggregatingFunction for Max {
45    fn initialize_accumulator(
46        &self,
47        _context: &ExpressionEvaluationContext,
48        expression: &ast::FunctionExpression,
49        grouping_keys: &Vec<VariableValue>,
50        index: Arc<dyn ResultIndex>,
51    ) -> Accumulator {
52        Accumulator::LazySortedSet(LazySortedSet::new(
53            expression.position_in_query,
54            grouping_keys,
55            index,
56        ))
57    }
58
59    fn accumulator_is_lazy(&self) -> bool {
60        true
61    }
62
63    async fn apply(
64        &self,
65        _context: &ExpressionEvaluationContext,
66        args: Vec<VariableValue>,
67        accumulator: &mut Accumulator,
68    ) -> Result<VariableValue, FunctionError> {
69        if args.len() != 1 {
70            return Err(FunctionError {
71                function_name: "Max".to_string(),
72                error: FunctionEvaluationError::InvalidArgumentCount,
73            });
74        }
75
76        log::info!("Applying Max with args: {args:?}");
77        let accumulator = match accumulator {
78            Accumulator::LazySortedSet(accumulator) => accumulator,
79            _ => {
80                return Err(FunctionError {
81                    function_name: "Max".to_string(),
82                    error: FunctionEvaluationError::CorruptData,
83                })
84            }
85        };
86
87        match &args[0] {
88            VariableValue::Float(n) => {
89                let value = match n.as_f64() {
90                    Some(n) => n,
91                    None => {
92                        return Err(FunctionError {
93                            function_name: "Max".to_string(),
94                            error: FunctionEvaluationError::OverflowError,
95                        })
96                    }
97                };
98                accumulator.insert(-value).await;
99                match accumulator.get_head().await {
100                    Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
101                        Some(f) => f,
102                        None => {
103                            return Err(FunctionError {
104                                function_name: "Max".to_string(),
105                                error: FunctionEvaluationError::CorruptData,
106                            })
107                        }
108                    })),
109                    Ok(None) => Ok(VariableValue::Null),
110                    Err(e) => Err(FunctionError {
111                        function_name: "Max".to_string(),
112                        error: FunctionEvaluationError::IndexError(e),
113                    }),
114                }
115            }
116            VariableValue::Integer(n) => {
117                let value = match n.as_i64() {
118                    Some(n) => n,
119                    None => {
120                        return Err(FunctionError {
121                            function_name: "Max".to_string(),
122                            error: FunctionEvaluationError::OverflowError,
123                        })
124                    }
125                };
126                accumulator.insert(-(value as f64)).await;
127                match accumulator.get_head().await {
128                    Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
129                        Some(f) => f,
130                        None => {
131                            return Err(FunctionError {
132                                function_name: "Max".to_string(),
133                                error: FunctionEvaluationError::CorruptData,
134                            })
135                        }
136                    })),
137                    Ok(None) => Ok(VariableValue::Null),
138                    Err(e) => Err(FunctionError {
139                        function_name: "Max".to_string(),
140                        error: FunctionEvaluationError::IndexError(e),
141                    }),
142                }
143            }
144            VariableValue::ZonedDateTime(zdt) => {
145                let value = zdt.datetime().timestamp_millis() as f64;
146                accumulator.insert(-value).await;
147                match accumulator.get_head().await {
148                    Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
149                        ZonedDateTime::from_epoch_millis(-head as i64),
150                    )),
151                    Ok(None) => Ok(VariableValue::Null),
152                    Err(e) => Err(FunctionError {
153                        function_name: "Max".to_string(),
154                        error: FunctionEvaluationError::IndexError(e),
155                    }),
156                }
157            }
158            VariableValue::Duration(d) => {
159                let value = d.duration().num_milliseconds() as f64;
160                accumulator.insert(-value).await;
161                match accumulator.get_head().await {
162                    Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
163                        ChronoDuration::milliseconds(-head as i64),
164                        0,
165                        0,
166                    ))),
167                    Ok(None) => Ok(VariableValue::Null),
168                    Err(e) => Err(FunctionError {
169                        function_name: "Max".to_string(),
170                        error: FunctionEvaluationError::IndexError(e),
171                    }),
172                }
173            }
174            VariableValue::Date(d) => {
175                // For date (Chrono::NaiveDate), we can store the number of days since the epoch
176                let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
177                let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
178                accumulator.insert(-days_since_epoch).await;
179                match accumulator.get_head().await {
180                    Ok(Some(head)) => Ok(VariableValue::Date(
181                        reference_date + ChronoDuration::days(-head as i64),
182                    )),
183                    Ok(None) => Ok(VariableValue::Null),
184                    Err(e) => Err(FunctionError {
185                        function_name: "Max".to_string(),
186                        error: FunctionEvaluationError::IndexError(e),
187                    }),
188                }
189            }
190            VariableValue::LocalTime(t) => {
191                let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
192                let duration_since_midnight =
193                    t.signed_duration_since(reference_time).num_milliseconds() as f64;
194                accumulator.insert(-duration_since_midnight).await;
195
196                match accumulator.get_head().await {
197                    Ok(Some(head)) => Ok(VariableValue::LocalTime(
198                        reference_time + ChronoDuration::milliseconds(-head as i64),
199                    )),
200                    Ok(None) => Ok(VariableValue::Null),
201                    Err(e) => Err(FunctionError {
202                        function_name: "Max".to_string(),
203                        error: FunctionEvaluationError::IndexError(e),
204                    }),
205                }
206            }
207            VariableValue::LocalDateTime(dt) => {
208                let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
209                accumulator.insert(-duration_since_epoch).await;
210                match accumulator.get_head().await {
211                    Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
212                        DateTime::from_timestamp_millis(head as i64 * -1.0 as i64)
213                            .unwrap_or_default()
214                            .naive_local(),
215                    )),
216                    Ok(None) => Ok(VariableValue::Null),
217                    Err(e) => Err(FunctionError {
218                        function_name: "Max".to_string(),
219                        error: FunctionEvaluationError::IndexError(e),
220                    }),
221                }
222            }
223            VariableValue::ZonedTime(t) => {
224                let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
225                let epoch_datetime = match epoch_date
226                    .and_time(*t.time())
227                    .and_local_timezone(*t.offset())
228                {
229                    LocalResult::Single(dt) => dt,
230                    _ => {
231                        return Err(FunctionError {
232                            function_name: "Max".to_string(),
233                            error: FunctionEvaluationError::InvalidFormat {
234                                expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
235                                    .to_string(),
236                            },
237                        })
238                    }
239                };
240                let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
241                accumulator.insert(-duration_since_epoch).await;
242                match accumulator.get_head().await {
243                    Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
244                        (epoch_datetime + ChronoDuration::milliseconds(-head as i64)).time(),
245                        *temporal_constants::UTC_FIXED_OFFSET,
246                    ))),
247                    Ok(None) => Ok(VariableValue::Null),
248                    Err(e) => Err(FunctionError {
249                        function_name: "Max".to_string(),
250                        error: FunctionEvaluationError::IndexError(e),
251                    }),
252                }
253            }
254            VariableValue::Null => Ok(VariableValue::Null),
255            _ => Err(FunctionError {
256                function_name: "Max".to_string(),
257                error: FunctionEvaluationError::InvalidArgument(0),
258            }),
259        }
260    }
261
262    async fn revert(
263        &self,
264        _context: &ExpressionEvaluationContext,
265        args: Vec<VariableValue>,
266        accumulator: &mut Accumulator,
267    ) -> Result<VariableValue, FunctionError> {
268        if args.len() != 1 {
269            return Err(FunctionError {
270                function_name: "Max".to_string(),
271                error: FunctionEvaluationError::InvalidArgumentCount,
272            });
273        }
274        let accumulator = match accumulator {
275            Accumulator::LazySortedSet(accumulator) => accumulator,
276            _ => {
277                return Err(FunctionError {
278                    function_name: "Max".to_string(),
279                    error: FunctionEvaluationError::CorruptData,
280                })
281            }
282        };
283
284        match &args[0] {
285            VariableValue::Float(n) => {
286                let value = match n.as_f64() {
287                    Some(n) => n,
288                    None => {
289                        return Err(FunctionError {
290                            function_name: "Max".to_string(),
291                            error: FunctionEvaluationError::OverflowError,
292                        })
293                    }
294                };
295                accumulator.remove(-value).await;
296                match accumulator.get_head().await {
297                    Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
298                        Some(f) => f,
299                        None => {
300                            return Err(FunctionError {
301                                function_name: "Max".to_string(),
302                                error: FunctionEvaluationError::CorruptData,
303                            })
304                        }
305                    })),
306                    Ok(None) => Ok(VariableValue::Null),
307                    Err(e) => Err(FunctionError {
308                        function_name: "Max".to_string(),
309                        error: FunctionEvaluationError::IndexError(e),
310                    }),
311                }
312            }
313            VariableValue::Integer(n) => {
314                let value = match n.as_i64() {
315                    Some(n) => n,
316                    None => {
317                        return Err(FunctionError {
318                            function_name: "Max".to_string(),
319                            error: FunctionEvaluationError::OverflowError,
320                        })
321                    }
322                };
323                accumulator.remove(-(value as f64)).await;
324                match accumulator.get_head().await {
325                    Ok(Some(head)) => Ok(VariableValue::Float(match Float::from_f64(-head) {
326                        Some(f) => f,
327                        None => {
328                            return Err(FunctionError {
329                                function_name: "Max".to_string(),
330                                error: FunctionEvaluationError::CorruptData,
331                            })
332                        }
333                    })),
334                    Ok(None) => Ok(VariableValue::Null),
335                    Err(e) => Err(FunctionError {
336                        function_name: "Max".to_string(),
337                        error: FunctionEvaluationError::IndexError(e),
338                    }),
339                }
340            }
341            VariableValue::ZonedDateTime(zdt) => {
342                let value = zdt.datetime().timestamp_millis() as f64;
343                accumulator.remove(-value).await;
344                match accumulator.get_head().await {
345                    Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
346                        ZonedDateTime::from_epoch_millis(-head as i64),
347                    )),
348                    Ok(None) => Ok(VariableValue::Null),
349                    Err(e) => Err(FunctionError {
350                        function_name: "Max".to_string(),
351                        error: FunctionEvaluationError::IndexError(e),
352                    }),
353                }
354            }
355            VariableValue::Duration(d) => {
356                let value = d.duration().num_milliseconds() as f64;
357                accumulator.remove(-value).await;
358                match accumulator.get_head().await {
359                    Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
360                        ChronoDuration::milliseconds(-head as i64),
361                        0,
362                        0,
363                    ))),
364                    Ok(None) => Ok(VariableValue::Null),
365                    Err(e) => Err(FunctionError {
366                        function_name: "Max".to_string(),
367                        error: FunctionEvaluationError::IndexError(e),
368                    }),
369                }
370            }
371            VariableValue::Date(d) => {
372                // For date (Chrono::NaiveDate), we can store the number of days since the epoch
373                let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
374                let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
375                accumulator.remove(-days_since_epoch).await;
376                match accumulator.get_head().await {
377                    Ok(Some(head)) => Ok(VariableValue::Date(
378                        reference_date + ChronoDuration::days(-head as i64),
379                    )),
380                    Ok(None) => Ok(VariableValue::Null),
381                    Err(e) => Err(FunctionError {
382                        function_name: "Max".to_string(),
383                        error: FunctionEvaluationError::IndexError(e),
384                    }),
385                }
386            }
387            VariableValue::LocalTime(t) => {
388                let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
389                let duration_since_midnight =
390                    t.signed_duration_since(reference_time).num_milliseconds() as f64;
391                accumulator.remove(-duration_since_midnight).await;
392
393                match accumulator.get_head().await {
394                    Ok(Some(head)) => Ok(VariableValue::LocalTime(
395                        reference_time + ChronoDuration::milliseconds(-head as i64),
396                    )),
397                    Ok(None) => Ok(VariableValue::Null),
398                    Err(e) => Err(FunctionError {
399                        function_name: "Max".to_string(),
400                        error: FunctionEvaluationError::IndexError(e),
401                    }),
402                }
403            }
404            VariableValue::LocalDateTime(dt) => {
405                let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
406                accumulator.remove(-duration_since_epoch).await;
407                match accumulator.get_head().await {
408                    Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
409                        DateTime::from_timestamp_millis(head as i64 * -1.0 as i64)
410                            .unwrap_or_default()
411                            .naive_local(),
412                    )),
413                    Ok(None) => Ok(VariableValue::Null),
414                    Err(e) => Err(FunctionError {
415                        function_name: "Max".to_string(),
416                        error: FunctionEvaluationError::IndexError(e),
417                    }),
418                }
419            }
420            VariableValue::ZonedTime(t) => {
421                let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
422                let epoch_datetime = match epoch_date
423                    .and_time(*t.time())
424                    .and_local_timezone(*t.offset())
425                {
426                    LocalResult::Single(dt) => dt,
427                    _ => {
428                        return Err(FunctionError {
429                            function_name: "Max".to_string(),
430                            error: FunctionEvaluationError::InvalidFormat {
431                                expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
432                                    .to_string(),
433                            },
434                        })
435                    }
436                };
437                let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
438                accumulator.remove(-duration_since_epoch).await;
439                match accumulator.get_head().await {
440                    Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
441                        (epoch_datetime + ChronoDuration::milliseconds(-head as i64)).time(),
442                        *temporal_constants::UTC_FIXED_OFFSET,
443                    ))),
444                    Ok(None) => Ok(VariableValue::Null),
445                    Err(e) => Err(FunctionError {
446                        function_name: "Max".to_string(),
447                        error: FunctionEvaluationError::IndexError(e),
448                    }),
449                }
450            }
451            VariableValue::Null => Ok(VariableValue::Null),
452            _ => Err(FunctionError {
453                function_name: "Max".to_string(),
454                error: FunctionEvaluationError::InvalidArgument(0),
455            }),
456        }
457    }
458
459    async fn snapshot(
460        &self,
461        _context: &ExpressionEvaluationContext,
462        args: Vec<VariableValue>,
463        accumulator: &Accumulator,
464    ) -> Result<VariableValue, FunctionError> {
465        if args.len() != 1 {
466            return Err(FunctionError {
467                function_name: "Max".to_string(),
468                error: FunctionEvaluationError::InvalidArgumentCount,
469            });
470        }
471
472        let accumulator = match accumulator {
473            Accumulator::LazySortedSet(accumulator) => accumulator,
474            _ => {
475                return Err(FunctionError {
476                    function_name: "Max".to_string(),
477                    error: FunctionEvaluationError::CorruptData,
478                })
479            }
480        };
481
482        let value = match accumulator.get_head().await {
483            Ok(Some(head)) => -head,
484            Ok(None) => return Ok(VariableValue::Null),
485            Err(e) => {
486                return Err(FunctionError {
487                    function_name: "Max".to_string(),
488                    error: FunctionEvaluationError::IndexError(e),
489                })
490            }
491        };
492
493        return match &args[0] {
494            VariableValue::Float(_) => Ok(VariableValue::Float(match Float::from_f64(value) {
495                Some(f) => f,
496                None => {
497                    return Err(FunctionError {
498                        function_name: "Max".to_string(),
499                        error: FunctionEvaluationError::OverflowError,
500                    })
501                }
502            })),
503            VariableValue::Integer(_) => Ok(VariableValue::Integer((value as i64).into())),
504            VariableValue::ZonedDateTime(_) => Ok(VariableValue::ZonedDateTime(
505                ZonedDateTime::from_epoch_millis(value as i64),
506            )),
507            VariableValue::Duration(_) => Ok(VariableValue::Duration(Duration::new(
508                ChronoDuration::milliseconds(value as i64),
509                0,
510                0,
511            ))),
512            VariableValue::Date(_) => {
513                let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
514                Ok(VariableValue::Date(
515                    reference_date + ChronoDuration::days(value as i64),
516                ))
517            }
518            VariableValue::LocalTime(_) => {
519                let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
520                Ok(VariableValue::LocalTime(
521                    reference_time + ChronoDuration::milliseconds(value as i64),
522                ))
523            }
524            VariableValue::LocalDateTime(_) => Ok(VariableValue::LocalDateTime(
525                DateTime::from_timestamp_millis(value as i64)
526                    .unwrap_or_default()
527                    .naive_local(),
528            )),
529            VariableValue::ZonedTime(_) => {
530                let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
531                let epoch_datetime = match epoch_date
532                    .and_time(*temporal_constants::MIDNIGHT_NAIVE_TIME)
533                    .and_local_timezone(*temporal_constants::UTC_FIXED_OFFSET)
534                {
535                    LocalResult::Single(dt) => dt,
536                    _ => {
537                        return Err(FunctionError {
538                            function_name: "Max".to_string(),
539                            error: FunctionEvaluationError::InvalidFormat {
540                                expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
541                                    .to_string(),
542                            },
543                        })
544                    }
545                };
546                Ok(VariableValue::ZonedTime(ZonedTime::new(
547                    (epoch_datetime + ChronoDuration::milliseconds(value as i64)).time(),
548                    *temporal_constants::UTC_FIXED_OFFSET,
549                )))
550            }
551            VariableValue::Null => Ok(VariableValue::Null),
552            _ => Err(FunctionError {
553                function_name: "Max".to_string(),
554                error: FunctionEvaluationError::InvalidArgument(0),
555            }),
556        };
557    }
558}
559
560impl Debug for Max {
561    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
562        write!(f, "Max")
563    }
564}