1use std::{fmt::Debug, sync::Arc};
16
17use crate::{
18 evaluation::{
19 temporal_constants, variable_value::zoned_datetime::ZonedDateTime, FunctionError,
20 FunctionEvaluationError,
21 },
22 interface::ResultIndex,
23};
24
25use async_trait::async_trait;
26
27use drasi_query_ast::ast;
28
29use crate::evaluation::{
30 variable_value::duration::Duration, variable_value::float::Float,
31 variable_value::zoned_time::ZonedTime, variable_value::VariableValue,
32 ExpressionEvaluationContext,
33};
34
35use super::{super::AggregatingFunction, lazy_sorted_set::LazySortedSet, Accumulator};
36use chrono::{offset::LocalResult, DateTime, Duration as ChronoDuration};
37
38#[derive(Clone)]
39pub struct Min {}
40
41#[async_trait]
42impl AggregatingFunction for Min {
43 fn initialize_accumulator(
44 &self,
45 _context: &ExpressionEvaluationContext,
46 expression: &ast::FunctionExpression,
47 grouping_keys: &Vec<VariableValue>,
48 index: Arc<dyn ResultIndex>,
49 ) -> Accumulator {
50 Accumulator::LazySortedSet(LazySortedSet::new(
51 expression.position_in_query,
52 grouping_keys,
53 index,
54 ))
55 }
56
57 fn accumulator_is_lazy(&self) -> bool {
58 true
59 }
60
61 async fn apply(
62 &self,
63 _context: &ExpressionEvaluationContext,
64 args: Vec<VariableValue>,
65 accumulator: &mut Accumulator,
66 ) -> Result<VariableValue, FunctionError> {
67 if args.len() != 1 {
68 return Err(FunctionError {
69 function_name: "Min".to_string(),
70 error: FunctionEvaluationError::InvalidArgumentCount,
71 });
72 }
73
74 let accumulator = match accumulator {
75 Accumulator::LazySortedSet(accumulator) => accumulator,
76 _ => {
77 return Err(FunctionError {
78 function_name: "Min".to_string(),
79 error: FunctionEvaluationError::CorruptData,
80 })
81 }
82 };
83
84 match &args[0] {
85 VariableValue::Float(n) => {
86 let value = match n.as_f64() {
87 Some(n) => n,
88 None => {
89 return Err(FunctionError {
90 function_name: "Min".to_string(),
91 error: FunctionEvaluationError::OverflowError,
92 })
93 }
94 };
95 accumulator.insert(value).await;
96 match accumulator.get_head().await {
97 Ok(Some(head)) => match Float::from_f64(head) {
98 Some(f) => Ok(VariableValue::Float(f)),
99 None => Err(FunctionError {
100 function_name: "Min".to_string(),
101 error: FunctionEvaluationError::OverflowError,
102 }),
103 },
104 Ok(None) => Ok(VariableValue::Null),
105 Err(e) => Err(FunctionError {
106 function_name: "Min".to_string(),
107 error: FunctionEvaluationError::IndexError(e),
108 }),
109 }
110 }
111 VariableValue::Integer(n) => {
112 let value = match n.as_i64() {
113 Some(n) => n,
114 None => {
115 return Err(FunctionError {
116 function_name: "Min".to_string(),
117 error: FunctionEvaluationError::OverflowError,
118 })
119 }
120 };
121 accumulator.insert(value as f64).await;
122 match accumulator.get_head().await {
123 Ok(Some(head)) => match Float::from_f64(head) {
124 Some(f) => Ok(VariableValue::Float(f)),
125 None => Err(FunctionError {
126 function_name: "Min".to_string(),
127 error: FunctionEvaluationError::OverflowError,
128 }),
129 },
130 Ok(None) => Ok(VariableValue::Null),
131 Err(e) => Err(FunctionError {
132 function_name: "Min".to_string(),
133 error: FunctionEvaluationError::IndexError(e),
134 }),
135 }
136 }
137 VariableValue::ZonedDateTime(zdt) => {
138 let value = zdt.datetime().timestamp_millis() as f64;
139 accumulator.insert(value).await;
140 match accumulator.get_head().await {
141 Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
142 ZonedDateTime::from_epoch_millis(head as u64),
143 )),
144 Ok(None) => Ok(VariableValue::Null),
145 Err(e) => Err(FunctionError {
146 function_name: "Min".to_string(),
147 error: FunctionEvaluationError::IndexError(e),
148 }),
149 }
150 }
151 VariableValue::Duration(d) => {
152 let value = d.duration().num_milliseconds() as f64;
153 accumulator.insert(value).await;
154 match accumulator.get_head().await {
155 Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
156 ChronoDuration::milliseconds(head as i64),
157 0,
158 0,
159 ))),
160 Ok(None) => Ok(VariableValue::Null),
161 Err(e) => Err(FunctionError {
162 function_name: "Min".to_string(),
163 error: FunctionEvaluationError::IndexError(e),
164 }),
165 }
166 }
167 VariableValue::Date(d) => {
168 let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
169 let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
170 accumulator.insert(days_since_epoch).await;
171 match accumulator.get_head().await {
172 Ok(Some(head)) => Ok(VariableValue::Date(
173 reference_date + ChronoDuration::days(head as i64),
174 )),
175 Ok(None) => Ok(VariableValue::Null),
176 Err(e) => Err(FunctionError {
177 function_name: "Min".to_string(),
178 error: FunctionEvaluationError::IndexError(e),
179 }),
180 }
181 }
182 VariableValue::LocalTime(t) => {
183 let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
184 let seconds_since_midnight =
185 t.signed_duration_since(reference_time).num_milliseconds() as f64;
186 accumulator.insert(seconds_since_midnight).await;
187 match accumulator.get_head().await {
188 Ok(Some(head)) => Ok(VariableValue::LocalTime(
189 reference_time + ChronoDuration::milliseconds(head as i64),
190 )),
191 Ok(None) => Ok(VariableValue::Null),
192 Err(e) => Err(FunctionError {
193 function_name: "Min".to_string(),
194 error: FunctionEvaluationError::IndexError(e),
195 }),
196 }
197 }
198 VariableValue::LocalDateTime(dt) => {
199 let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
200 accumulator.insert(duration_since_epoch).await;
201 match accumulator.get_head().await {
202 Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
203 DateTime::from_timestamp_millis(head as i64)
204 .unwrap_or_default()
205 .naive_local(),
206 )),
207 Ok(None) => Ok(VariableValue::Null),
208 Err(e) => Err(FunctionError {
209 function_name: "Min".to_string(),
210 error: FunctionEvaluationError::IndexError(e),
211 }),
212 }
213 }
214 VariableValue::ZonedTime(t) => {
215 let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
216 let epoch_datetime = match epoch_date
217 .and_time(*t.time())
218 .and_local_timezone(*t.offset())
219 {
220 LocalResult::Single(dt) => dt,
221 _ => {
222 return Err(FunctionError {
223 function_name: "Min".to_string(),
224 error: FunctionEvaluationError::InvalidFormat {
225 expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
226 .to_string(),
227 },
228 })
229 }
230 };
231 let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
232 accumulator.insert(duration_since_epoch).await;
233 match accumulator.get_head().await {
234 Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
235 (epoch_datetime + ChronoDuration::milliseconds(head as i64)).time(),
236 *temporal_constants::UTC_FIXED_OFFSET,
237 ))),
238 Ok(None) => Ok(VariableValue::Null),
239 Err(e) => Err(FunctionError {
240 function_name: "Min".to_string(),
241 error: FunctionEvaluationError::IndexError(e),
242 }),
243 }
244 }
245 VariableValue::Null => Ok(VariableValue::Null),
246 _ => Err(FunctionError {
247 function_name: "Min".to_string(),
248 error: FunctionEvaluationError::InvalidArgument(0),
249 }),
250 }
251 }
252
253 async fn revert(
254 &self,
255 _context: &ExpressionEvaluationContext,
256 args: Vec<VariableValue>,
257 accumulator: &mut Accumulator,
258 ) -> Result<VariableValue, FunctionError> {
259 if args.len() != 1 {
260 return Err(FunctionError {
261 function_name: "Min".to_string(),
262 error: FunctionEvaluationError::InvalidArgumentCount,
263 });
264 }
265 let accumulator = match accumulator {
266 Accumulator::LazySortedSet(accumulator) => accumulator,
267 _ => {
268 return Err(FunctionError {
269 function_name: "Min".to_string(),
270 error: FunctionEvaluationError::CorruptData,
271 })
272 }
273 };
274
275 match &args[0] {
276 VariableValue::Float(n) => {
277 let value = match n.as_f64() {
278 Some(n) => n,
279 None => {
280 return Err(FunctionError {
281 function_name: "Min".to_string(),
282 error: FunctionEvaluationError::OverflowError,
283 })
284 }
285 };
286 accumulator.remove(value).await;
287 match accumulator.get_head().await {
288 Ok(Some(head)) => match Float::from_f64(head) {
289 Some(f) => Ok(VariableValue::Float(f)),
290 None => Err(FunctionError {
291 function_name: "Min".to_string(),
292 error: FunctionEvaluationError::OverflowError,
293 }),
294 },
295 Ok(None) => Ok(VariableValue::Null),
296 Err(e) => Err(FunctionError {
297 function_name: "Min".to_string(),
298 error: FunctionEvaluationError::IndexError(e),
299 }),
300 }
301 }
302 VariableValue::Integer(n) => {
303 let value = match n.as_i64() {
304 Some(n) => n,
305 None => {
306 return Err(FunctionError {
307 function_name: "Min".to_string(),
308 error: FunctionEvaluationError::OverflowError,
309 })
310 }
311 };
312 accumulator.remove((value as f64) * 1.0).await;
313 match accumulator.get_head().await {
314 Ok(Some(head)) => match Float::from_f64(head) {
315 Some(f) => Ok(VariableValue::Float(f)),
316 None => Err(FunctionError {
317 function_name: "Min".to_string(),
318 error: FunctionEvaluationError::OverflowError,
319 }),
320 },
321 Ok(None) => Ok(VariableValue::Null),
322 Err(e) => Err(FunctionError {
323 function_name: "Min".to_string(),
324 error: FunctionEvaluationError::IndexError(e),
325 }),
326 }
327 }
328 VariableValue::ZonedDateTime(zdt) => {
329 let value = zdt.datetime().timestamp_millis() as f64;
330 accumulator.remove(value).await;
331 match accumulator.get_head().await {
332 Ok(Some(head)) => Ok(VariableValue::ZonedDateTime(
333 ZonedDateTime::from_epoch_millis(head as u64),
334 )),
335 Ok(None) => Ok(VariableValue::Null),
336 Err(e) => Err(FunctionError {
337 function_name: "Min".to_string(),
338 error: FunctionEvaluationError::IndexError(e),
339 }),
340 }
341 }
342 VariableValue::Duration(d) => {
343 let value = d.duration().num_milliseconds() as f64;
344 accumulator.remove(value).await;
345 match accumulator.get_head().await {
346 Ok(Some(head)) => Ok(VariableValue::Duration(Duration::new(
347 ChronoDuration::milliseconds(head as i64),
348 0,
349 0,
350 ))),
351 Ok(None) => Ok(VariableValue::Null),
352 Err(e) => Err(FunctionError {
353 function_name: "Min".to_string(),
354 error: FunctionEvaluationError::IndexError(e),
355 }),
356 }
357 }
358 VariableValue::Date(d) => {
359 let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
361 let days_since_epoch = d.signed_duration_since(reference_date).num_days() as f64;
362 accumulator.remove(days_since_epoch).await;
363 match accumulator.get_head().await {
364 Ok(Some(head)) => Ok(VariableValue::Date(
365 reference_date + ChronoDuration::days(head as i64),
366 )),
367 Ok(None) => Ok(VariableValue::Null),
368 Err(e) => Err(FunctionError {
369 function_name: "Min".to_string(),
370 error: FunctionEvaluationError::IndexError(e),
371 }),
372 }
373 }
374 VariableValue::LocalTime(t) => {
375 let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
376 let duration_since_midnight =
377 t.signed_duration_since(reference_time).num_milliseconds() as f64;
378 accumulator.remove(duration_since_midnight).await;
379
380 match accumulator.get_head().await {
381 Ok(Some(head)) => Ok(VariableValue::LocalTime(
382 reference_time + ChronoDuration::milliseconds(head as i64),
383 )),
384 Ok(None) => Ok(VariableValue::Null),
385 Err(e) => Err(FunctionError {
386 function_name: "Min".to_string(),
387 error: FunctionEvaluationError::IndexError(e),
388 }),
389 }
390 }
391 VariableValue::LocalDateTime(dt) => {
392 let duration_since_epoch = dt.and_utc().timestamp_millis() as f64;
393 accumulator.remove(duration_since_epoch).await;
394 match accumulator.get_head().await {
395 Ok(Some(head)) => Ok(VariableValue::LocalDateTime(
396 DateTime::from_timestamp_millis(head as i64)
397 .unwrap_or_default()
398 .naive_local(),
399 )),
400 Ok(None) => Ok(VariableValue::Null),
401 Err(e) => Err(FunctionError {
402 function_name: "Min".to_string(),
403 error: FunctionEvaluationError::IndexError(e),
404 }),
405 }
406 }
407 VariableValue::ZonedTime(t) => {
408 let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
409 let epoch_datetime = match epoch_date
410 .and_time(*t.time())
411 .and_local_timezone(*t.offset())
412 {
413 LocalResult::Single(dt) => dt,
414 _ => {
415 return Err(FunctionError {
416 function_name: "Min".to_string(),
417 error: FunctionEvaluationError::InvalidFormat {
418 expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
419 .to_string(),
420 },
421 })
422 }
423 };
424 let duration_since_epoch = epoch_datetime.timestamp_millis() as f64;
425 accumulator.remove(duration_since_epoch).await;
426 match accumulator.get_head().await {
427 Ok(Some(head)) => Ok(VariableValue::ZonedTime(ZonedTime::new(
428 (epoch_datetime + ChronoDuration::milliseconds(head as i64)).time(),
429 *temporal_constants::UTC_FIXED_OFFSET,
430 ))),
431 Ok(None) => Ok(VariableValue::Null),
432 Err(e) => Err(FunctionError {
433 function_name: "Min".to_string(),
434 error: FunctionEvaluationError::IndexError(e),
435 }),
436 }
437 }
438 VariableValue::Null => Ok(VariableValue::Null),
439 _ => Err(FunctionError {
440 function_name: "Min".to_string(),
441 error: FunctionEvaluationError::InvalidArgument(0),
442 }),
443 }
444 }
445
446 async fn snapshot(
447 &self,
448 _context: &ExpressionEvaluationContext,
449 args: Vec<VariableValue>,
450 accumulator: &Accumulator,
451 ) -> Result<VariableValue, FunctionError> {
452 if args.len() != 1 {
453 return Err(FunctionError {
454 function_name: "Min".to_string(),
455 error: FunctionEvaluationError::InvalidArgumentCount,
456 });
457 }
458
459 let accumulator = match accumulator {
460 Accumulator::LazySortedSet(accumulator) => accumulator,
461 _ => {
462 return Err(FunctionError {
463 function_name: "Min".to_string(),
464 error: FunctionEvaluationError::CorruptData,
465 })
466 }
467 };
468
469 let value = match accumulator.get_head().await {
470 Ok(Some(head)) => head,
471 Ok(None) => return Ok(VariableValue::Null),
472 Err(e) => {
473 return Err(FunctionError {
474 function_name: "Min".to_string(),
475 error: FunctionEvaluationError::IndexError(e),
476 })
477 }
478 };
479
480 return match &args[0] {
481 VariableValue::Float(_) => Ok(VariableValue::Float(match Float::from_f64(value) {
482 Some(f) => f,
483 None => {
484 return Err(FunctionError {
485 function_name: "Min".to_string(),
486 error: FunctionEvaluationError::OverflowError,
487 })
488 }
489 })),
490 VariableValue::Integer(_) => Ok(VariableValue::Integer((value as i64).into())),
491 VariableValue::ZonedDateTime(_) => Ok(VariableValue::ZonedDateTime(
492 ZonedDateTime::from_epoch_millis(value as u64),
493 )),
494 VariableValue::Duration(_) => Ok(VariableValue::Duration(Duration::new(
495 ChronoDuration::milliseconds(value as i64),
496 0,
497 0,
498 ))),
499 VariableValue::Date(_) => {
500 let reference_date = *temporal_constants::EPOCH_NAIVE_DATE;
501 Ok(VariableValue::Date(
502 reference_date + ChronoDuration::days(value as i64),
503 ))
504 }
505 VariableValue::LocalTime(_) => {
506 let reference_time = *temporal_constants::MIDNIGHT_NAIVE_TIME;
507 Ok(VariableValue::LocalTime(
508 reference_time + ChronoDuration::milliseconds(value as i64),
509 ))
510 }
511 VariableValue::LocalDateTime(_) => Ok(VariableValue::LocalDateTime(
512 DateTime::from_timestamp_millis(value as i64)
513 .unwrap_or_default()
514 .naive_local(),
515 )),
516 VariableValue::ZonedTime(_) => {
517 let epoch_date = *temporal_constants::EPOCH_NAIVE_DATE;
518 let epoch_datetime = match epoch_date
519 .and_time(*temporal_constants::MIDNIGHT_NAIVE_TIME)
520 .and_local_timezone(*temporal_constants::UTC_FIXED_OFFSET)
521 {
522 LocalResult::Single(dt) => dt,
523 _ => {
524 return Err(FunctionError {
525 function_name: "Min".to_string(),
526 error: FunctionEvaluationError::InvalidFormat {
527 expected: temporal_constants::INVALID_ZONED_TIME_FORMAT_ERROR
528 .to_string(),
529 },
530 })
531 }
532 };
533 Ok(VariableValue::ZonedTime(ZonedTime::new(
534 (epoch_datetime + ChronoDuration::milliseconds(value as i64)).time(),
535 *temporal_constants::UTC_FIXED_OFFSET,
536 )))
537 }
538 VariableValue::Null => Ok(VariableValue::Null),
539 _ => Err(FunctionError {
540 function_name: "Min".to_string(),
541 error: FunctionEvaluationError::InvalidArgument(0),
542 }),
543 };
544 }
545}
546
547impl Debug for Min {
548 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
549 write!(f, "Min")
550 }
551}