drasi_core/evaluation/functions/aggregation/
min.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Debug, sync::Arc};
16
17use crate::{
18    evaluation::{
19        temporal_constants, variable_value::zoned_datetime::ZonedDateTime, FunctionError,
20        FunctionEvaluationError,
21    },
22    interface::ResultIndex,
23};
24
25use async_trait::async_trait;
26
27use drasi_query_ast::ast;
28
29use crate::evaluation::{
30    variable_value::duration::Duration, variable_value::float::Float,
31    variable_value::zoned_time::ZonedTime, variable_value::VariableValue,
32    ExpressionEvaluationContext,
33};
34
35use super::{super::AggregatingFunction, lazy_sorted_set::LazySortedSet, Accumulator};
36use chrono::{offset::LocalResult, DateTime, Duration as ChronoDuration};
37
/// The `Min` aggregating function.
///
/// The type carries no state of its own; everything it aggregates over is
/// kept in the accumulator that is handed to its methods.
#[derive(Clone)]
pub struct Min {}
40
41#[async_trait]
42impl AggregatingFunction for Min {
43    fn initialize_accumulator(
44        &self,
45        _context: &ExpressionEvaluationContext,
46        expression: &ast::FunctionExpression,
47        grouping_keys: &Vec<VariableValue>,
48        index: Arc<dyn ResultIndex>,
49    ) -> Accumulator {
50        Accumulator::LazySortedSet(LazySortedSet::new(
51            expression.position_in_query,
52            grouping_keys,
53            index,
54        ))
55    }
56
57    fn accumulator_is_lazy(&self) -> bool {
58        true
59    }
60
61    async fn apply(
62        &self,
63        _context: &ExpressionEvaluationContext,
64        args: Vec<VariableValue>,
65        accumulator: &mut Accumulator,
66    ) -> Result<VariableValue, FunctionError> {
67        if args.len() != 1 {
68            return Err(FunctionError {
69                function_name: "Min".to_string(),
70                error: FunctionEvaluationError::InvalidArgumentCount,
71            });
72        }
73
74        let accumulator = match accumulator {
75            Accumulator::LazySortedSet(accumulator) => accumulator,
76            _ => {
77                return Err(FunctionError {
78                    function_name: "Min".to_string(),
79                    error: FunctionEvaluationError::CorruptData,
80                })
81            }
82        };
83
84        match &args[0] {
85            VariableValue::Float(n) => {
86                let value = match n.as_f64() {
87                    Some(n) => n,
88                    None => {
89                        return Err(FunctionError {
90                            function_name: "Min".to_string(),
91                            error: FunctionEvaluationError::OverflowError,
92                        })
93                    }
94                };
95                accumulator.insert(value).await;
96                match accumulator.get_head().await {
97                    Ok(Some(head)) => match Float::from_f64(head) {
98                        Some(f) => Ok(VariableValue::Float(f)),
99                        None => Err(FunctionError {
100                            function_name: "Min".to_string(),
101                            error: FunctionEvaluationError::OverflowError,
102                        }),
103                    },
104                    Ok(None) => Ok(VariableValue::Null),
105                    Err(e) => Err(FunctionError {
106                        function_name: "Min".to_string(),
107                        error: FunctionEvaluationError::IndexError(e),
108                    }),
109                }
110            }
111            VariableValue::Integer(n) => {
112                let value = match n.as_i64() {
113                    Some(n) => n,
114                    None => {
115                        return Err(FunctionError {
116                            function_name: "Min".to_string(),
117                            error: FunctionEvaluationError::OverflowError,
118                        })
119                    }
120                };
121                accumulator.insert(value as f64).await;
122                match accumulator.get_head().await {
123                    Ok(Some(head)) => match Float::from_f64(head) {
124                        Some(f) => Ok(VariableValue::Float(f)),
125                        None => Err(FunctionError {
126                            function_name: "Min".to_string(),
127                            error: FunctionEvaluationError::OverflowError,
128                        }),
129                    },
130                    Ok(None) => Ok(VariableValue::Null),
131                    Err(e) => Err(FunctionError {
132                        function_name: "Min".to_string(),
133                        error: FunctionEvaluationError::IndexError(e),
134                    }),
135                }
136            }
137            VariableValue::ZonedDateTime(zdt) => {
138                let value = zdt.datetime().timestamp_millis() as f64;
139                accumulator.insert(value).await;
140                match accumulator.get_head().await {
141                    Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
142                        ZonedDateTime::from_epoch_millis(head as u64),
143                    )),
144                    Ok(None) => Ok(VariableValue::Null),
145                    Err(e) => Err(FunctionError {
146                        function_name: "Min".to_string(),
147                        error: FunctionEvaluationError::IndexError(e),
148                    }),
149                }
150            }
151            VariableValue::Duration(d) => {
152                let value = d.duration().num_milliseconds() as f64;
153                accumulator.insert(value).await;
154                match accumulator.get_head().await {
155                    Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
156                        ChronoDuration::milliseconds(head as i64),
157                        0,
158                        0,
159                    ))),
160                    Ok(None) => Ok(VariableValue::Null),
161                    Err(e) => Err(FunctionError {
162                        function_name: "Min".to_string(),
163                        error: FunctionEvaluationError::IndexError(e),
164                    }),
165                }
166            }
167            VariableValue::Date(d) => {
168                let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
169                let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
170                accumulator.insert(days_since_epoch).await;
171                match accumulator.get_head().await {
172                    Ok(Some(head)) => Ok(VariableValue::Date(
173                        reference_date + ChronoDuration::days(head as i64),
174                    )),
175                    Ok(None) => Ok(VariableValue::Null),
176                    Err(e) => Err(FunctionError {
177                        function_name: "Min".to_string(),
178                        error: FunctionEvaluationError::IndexError(e),
179                    }),
180                }
181            }
182            VariableValue::LocalTime(t) => {
183                let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
184                let seconds_since_midnight =
185                    t.signed_duration_since(reference_time).num_milliseconds() as f64;
186                accumulator.insert(seconds_since_midnight).await;
187                match accumulator.get_head().await {
188                    Ok(Some(head)) => Ok(VariableValue::LocalTime(
189                        reference_time + ChronoDuration::milliseconds(head as i64),
190                    )),
191                    Ok(None) => Ok(VariableValue::Null),
192                    Err(e) => Err(FunctionError {
193                        function_name: "Min".to_string(),
194                        error: FunctionEvaluationError::IndexError(e),
195                    }),
196                }
197            }
198            VariableValue::LocalDateTime(dt) => {
199                let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
200                accumulator.insert(duration_since_epoch).await;
201                match accumulator.get_head().await {
202                    Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
203                        DateTime::from_timestamp_millis(head as i64)
204                            .unwrap_or_default()
205                            .naive_local(),
206                    )),
207                    Ok(None) => Ok(VariableValue::Null),
208                    Err(e) => Err(FunctionError {
209                        function_name: "Min".to_string(),
210                        error: FunctionEvaluationError::IndexError(e),
211                    }),
212                }
213            }
214            VariableValue::ZonedTime(t) => {
215                let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
216                let epoch_datetime = match epoch_date
217                    .and_time(*t.time())
218                    .and_local_timezone(*t.offset())
219                {
220                    LocalResult::Single(dt) => dt,
221                    _ => {
222                        return Err(FunctionError {
223                            function_name: "Min".to_string(),
224                            error: FunctionEvaluationError::InvalidFormat {
225                                expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
226                                    .to_string(),
227                            },
228                        })
229                    }
230                };
231                let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
232                accumulator.insert(duration_since_epoch).await;
233                match accumulator.get_head().await {
234                    Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
235                        (epoch_datetime + ChronoDuration::milliseconds(head as i64)).time(),
236                        *temporal_constants::UTC_FIXED_OFFSET,
237                    ))),
238                    Ok(None) => Ok(VariableValue::Null),
239                    Err(e) => Err(FunctionError {
240                        function_name: "Min".to_string(),
241                        error: FunctionEvaluationError::IndexError(e),
242                    }),
243                }
244            }
245            VariableValue::Null => Ok(VariableValue::Null),
246            _ => Err(FunctionError {
247                function_name: "Min".to_string(),
248                error: FunctionEvaluationError::InvalidArgument(0),
249            }),
250        }
251    }
252
253    async fn revert(
254        &self,
255        _context: &ExpressionEvaluationContext,
256        args: Vec<VariableValue>,
257        accumulator: &mut Accumulator,
258    ) -> Result<VariableValue, FunctionError> {
259        if args.len() != 1 {
260            return Err(FunctionError {
261                function_name: "Min".to_string(),
262                error: FunctionEvaluationError::InvalidArgumentCount,
263            });
264        }
265        let accumulator = match accumulator {
266            Accumulator::LazySortedSet(accumulator) => accumulator,
267            _ => {
268                return Err(FunctionError {
269                    function_name: "Min".to_string(),
270                    error: FunctionEvaluationError::CorruptData,
271                })
272            }
273        };
274
275        match &args[0] {
276            VariableValue::Float(n) => {
277                let value = match n.as_f64() {
278                    Some(n) => n,
279                    None => {
280                        return Err(FunctionError {
281                            function_name: "Min".to_string(),
282                            error: FunctionEvaluationError::OverflowError,
283                        })
284                    }
285                };
286                accumulator.remove(value).await;
287                match accumulator.get_head().await {
288                    Ok(Some(head)) => match Float::from_f64(head) {
289                        Some(f) => Ok(VariableValue::Float(f)),
290                        None => Err(FunctionError {
291                            function_name: "Min".to_string(),
292                            error: FunctionEvaluationError::OverflowError,
293                        }),
294                    },
295                    Ok(None) => Ok(VariableValue::Null),
296                    Err(e) => Err(FunctionError {
297                        function_name: "Min".to_string(),
298                        error: FunctionEvaluationError::IndexError(e),
299                    }),
300                }
301            }
302            VariableValue::Integer(n) => {
303                let value = match n.as_i64() {
304                    Some(n) => n,
305                    None => {
306                        return Err(FunctionError {
307                            function_name: "Min".to_string(),
308                            error: FunctionEvaluationError::OverflowError,
309                        })
310                    }
311                };
312                accumulator.remove((value as f64) * 1.0).await;
313                match accumulator.get_head().await {
314                    Ok(Some(head)) => match Float::from_f64(head) {
315                        Some(f) => Ok(VariableValue::Float(f)),
316                        None => Err(FunctionError {
317                            function_name: "Min".to_string(),
318                            error: FunctionEvaluationError::OverflowError,
319                        }),
320                    },
321                    Ok(None) => Ok(VariableValue::Null),
322                    Err(e) => Err(FunctionError {
323                        function_name: "Min".to_string(),
324                        error: FunctionEvaluationError::IndexError(e),
325                    }),
326                }
327            }
328            VariableValue::ZonedDateTime(zdt) => {
329                let value = zdt.datetime().timestamp_millis() as f64;
330                accumulator.remove(value).await;
331                match accumulator.get_head().await {
332                    Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
333                        ZonedDateTime::from_epoch_millis(head as u64),
334                    )),
335                    Ok(None) => Ok(VariableValue::Null),
336                    Err(e) => Err(FunctionError {
337                        function_name: "Min".to_string(),
338                        error: FunctionEvaluationError::IndexError(e),
339                    }),
340                }
341            }
342            VariableValue::Duration(d) => {
343                let value = d.duration().num_milliseconds() as f64;
344                accumulator.remove(value).await;
345                match accumulator.get_head().await {
346                    Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
347                        ChronoDuration::milliseconds(head as i64),
348                        0,
349                        0,
350                    ))),
351                    Ok(None) => Ok(VariableValue::Null),
352                    Err(e) => Err(FunctionError {
353                        function_name: "Min".to_string(),
354                        error: FunctionEvaluationError::IndexError(e),
355                    }),
356                }
357            }
358            VariableValue::Date(d) => {
359                // For date (Chrono::NaiveDate), we can store the number of days since the epoch
360                let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
361                let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
362                accumulator.remove(days_since_epoch).await;
363                match accumulator.get_head().await {
364                    Ok(Some(head)) => Ok(VariableValue::Date(
365                        reference_date + ChronoDuration::days(head as i64),
366                    )),
367                    Ok(None) => Ok(VariableValue::Null),
368                    Err(e) => Err(FunctionError {
369                        function_name: "Min".to_string(),
370                        error: FunctionEvaluationError::IndexError(e),
371                    }),
372                }
373            }
374            VariableValue::LocalTime(t) => {
375                let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
376                let duration_since_midnight =
377                    t.signed_duration_since(reference_time).num_milliseconds() as f64;
378                accumulator.remove(duration_since_midnight).await;
379
380                match accumulator.get_head().await {
381                    Ok(Some(head)) => Ok(VariableValue::LocalTime(
382                        reference_time + ChronoDuration::milliseconds(head as i64),
383                    )),
384                    Ok(None) => Ok(VariableValue::Null),
385                    Err(e) => Err(FunctionError {
386                        function_name: "Min".to_string(),
387                        error: FunctionEvaluationError::IndexError(e),
388                    }),
389                }
390            }
391            VariableValue::LocalDateTime(dt) => {
392                let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
393                accumulator.remove(duration_since_epoch).await;
394                match accumulator.get_head().await {
395                    Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
396                        DateTime::from_timestamp_millis(head as i64)
397                            .unwrap_or_default()
398                            .naive_local(),
399                    )),
400                    Ok(None) => Ok(VariableValue::Null),
401                    Err(e) => Err(FunctionError {
402                        function_name: "Min".to_string(),
403                        error: FunctionEvaluationError::IndexError(e),
404                    }),
405                }
406            }
407            VariableValue::ZonedTime(t) => {
408                let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
409                let epoch_datetime = match epoch_date
410                    .and_time(*t.time())
411                    .and_local_timezone(*t.offset())
412                {
413                    LocalResult::Single(dt) => dt,
414                    _ => {
415                        return Err(FunctionError {
416                            function_name: "Min".to_string(),
417                            error: FunctionEvaluationError::InvalidFormat {
418                                expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
419                                    .to_string(),
420                            },
421                        })
422                    }
423                };
424                let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
425                accumulator.remove(duration_since_epoch).await;
426                match accumulator.get_head().await {
427                    Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
428                        (epoch_datetime + ChronoDuration::milliseconds(head as i64)).time(),
429                        *temporal_constants::UTC_FIXED_OFFSET,
430                    ))),
431                    Ok(None) => Ok(VariableValue::Null),
432                    Err(e) => Err(FunctionError {
433                        function_name: "Min".to_string(),
434                        error: FunctionEvaluationError::IndexError(e),
435                    }),
436                }
437            }
438            VariableValue::Null => Ok(VariableValue::Null),
439            _ => Err(FunctionError {
440                function_name: "Min".to_string(),
441                error: FunctionEvaluationError::InvalidArgument(0),
442            }),
443        }
444    }
445
446    async fn snapshot(
447        &self,
448        _context: &ExpressionEvaluationContext,
449        args: Vec<VariableValue>,
450        accumulator: &Accumulator,
451    ) -> Result<VariableValue, FunctionError> {
452        if args.len() != 1 {
453            return Err(FunctionError {
454                function_name: "Min".to_string(),
455                error: FunctionEvaluationError::InvalidArgumentCount,
456            });
457        }
458
459        let accumulator = match accumulator {
460            Accumulator::LazySortedSet(accumulator) => accumulator,
461            _ => {
462                return Err(FunctionError {
463                    function_name: "Min".to_string(),
464                    error: FunctionEvaluationError::CorruptData,
465                })
466            }
467        };
468
469        let value = match accumulator.get_head().await {
470            Ok(Some(head)) => head,
471            Ok(None) => return Ok(VariableValue::Null),
472            Err(e) => {
473                return Err(FunctionError {
474                    function_name: "Min".to_string(),
475                    error: FunctionEvaluationError::IndexError(e),
476                })
477            }
478        };
479
480        return match &args[0] {
481            VariableValue::Float(_) => Ok(VariableValue::Float(match Float::from_f64(value) {
482                Some(f) => f,
483                None => {
484                    return Err(FunctionError {
485                        function_name: "Min".to_string(),
486                        error: FunctionEvaluationError::OverflowError,
487                    })
488                }
489            })),
490            VariableValue::Integer(_) => Ok(VariableValue::Integer((value as i64).into())),
491            VariableValue::ZonedDateTime(_) => Ok(VariableValue::ZonedDateTime(
492                ZonedDateTime::from_epoch_millis(value as u64),
493            )),
494            VariableValue::Duration(_) => Ok(VariableValue::Duration(Duration::new(
495                ChronoDuration::milliseconds(value as i64),
496                0,
497                0,
498            ))),
499            VariableValue::Date(_) => {
500                let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
501                Ok(VariableValue::Date(
502                    reference_date + ChronoDuration::days(value as i64),
503                ))
504            }
505            VariableValue::LocalTime(_) => {
506                let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
507                Ok(VariableValue::LocalTime(
508                    reference_time + ChronoDuration::milliseconds(value as i64),
509                ))
510            }
511            VariableValue::LocalDateTime(_) => Ok(VariableValue::LocalDateTime(
512                DateTime::from_timestamp_millis(value as i64)
513                    .unwrap_or_default()
514                    .naive_local(),
515            )),
516            VariableValue::ZonedTime(_) => {
517                let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
518                let epoch_datetime = match epoch_date
519                    .and_time(*temporal_constants::MIDNIGHT_NAIVE_TIME)
520                    .and_local_timezone(*temporal_constants::UTC_FIXED_OFFSET)
521                {
522                    LocalResult::Single(dt) => dt,
523                    _ => {
524                        return Err(FunctionError {
525                            function_name: "Min".to_string(),
526                            error: FunctionEvaluationError::InvalidFormat {
527                                expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
528                                    .to_string(),
529                            },
530                        })
531                    }
532                };
533                Ok(VariableValue::ZonedTime(ZonedTime::new(
534                    (epoch_datetime + ChronoDuration::milliseconds(value as i64)).time(),
535                    *temporal_constants::UTC_FIXED_OFFSET,
536                )))
537            }
538            VariableValue::Null => Ok(VariableValue::Null),
539            _ => Err(FunctionError {
540                function_name: "Min".to_string(),
541                error: FunctionEvaluationError::InvalidArgument(0),
542            }),
543        };
544    }
545}
546
547impl Debug for Min {
548    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
549        write!(f, "Min")
550    }
551}