// Source: reifydb_function/math/scalar/max.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::data::ColumnData;
5use reifydb_type::value::r#type::Type;
6
7use crate::{ScalarFunction, ScalarFunctionContext, error::ScalarFunctionError, propagate_options};
8
/// Scalar `max` function object.
///
/// The type carries no state; all behaviour lives in its `ScalarFunction`
/// implementation. `Default` is derived so that `Max::default()` and
/// `Max::new()` are interchangeable.
#[derive(Debug, Clone, Copy, Default)]
pub struct Max;

impl Max {
	/// Creates the (stateless) function object.
	pub fn new() -> Self {
		Self
	}
}
16
17impl ScalarFunction for Max {
18	fn scalar(&self, ctx: ScalarFunctionContext) -> crate::error::ScalarFunctionResult<ColumnData> {
19		if let Some(result) = propagate_options(self, &ctx) {
20			return result;
21		}
22		let columns = ctx.columns;
23		let row_count = ctx.row_count;
24
25		// Validate at least 1 argument
26		if columns.is_empty() {
27			return Err(ScalarFunctionError::ArityMismatch {
28				function: ctx.fragment.clone(),
29				expected: 1,
30				actual: 0,
31			});
32		}
33
34		// For max function, we need to find the maximum value across all columns for each row
35		let first_column = columns.get(0).unwrap();
36
37		match first_column.data() {
38			ColumnData::Int1(_) => {
39				let mut result = Vec::with_capacity(row_count);
40				let mut bitvec = Vec::with_capacity(row_count);
41
42				for row_idx in 0..row_count {
43					let mut max_value: Option<i8> = None;
44
45					// Check all columns for this row
46					for column in columns.iter() {
47						if let ColumnData::Int1(container) = column.data() {
48							if let Some(value) = container.get(row_idx) {
49								max_value = Some(match max_value {
50									None => *value,
51									Some(current_max) => current_max.max(*value),
52								});
53							}
54						}
55					}
56
57					match max_value {
58						Some(v) => {
59							result.push(v);
60							bitvec.push(true);
61						}
62						None => {
63							result.push(0);
64							bitvec.push(false);
65						}
66					}
67				}
68
69				Ok(ColumnData::int1_with_bitvec(result, bitvec))
70			}
71			ColumnData::Int2(_) => {
72				let mut result = Vec::with_capacity(row_count);
73				let mut bitvec = Vec::with_capacity(row_count);
74
75				for row_idx in 0..row_count {
76					let mut max_value: Option<i16> = None;
77
78					for column in columns.iter() {
79						if let ColumnData::Int2(container) = column.data() {
80							if let Some(value) = container.get(row_idx) {
81								max_value = Some(match max_value {
82									None => *value,
83									Some(current_max) => current_max.max(*value),
84								});
85							}
86						}
87					}
88
89					match max_value {
90						Some(v) => {
91							result.push(v);
92							bitvec.push(true);
93						}
94						None => {
95							result.push(0);
96							bitvec.push(false);
97						}
98					}
99				}
100
101				Ok(ColumnData::int2_with_bitvec(result, bitvec))
102			}
103			ColumnData::Int4(_) => {
104				let mut result = Vec::with_capacity(row_count);
105				let mut bitvec = Vec::with_capacity(row_count);
106
107				for row_idx in 0..row_count {
108					let mut max_value: Option<i32> = None;
109
110					for column in columns.iter() {
111						if let ColumnData::Int4(container) = column.data() {
112							if let Some(value) = container.get(row_idx) {
113								max_value = Some(match max_value {
114									None => *value,
115									Some(current_max) => current_max.max(*value),
116								});
117							}
118						}
119					}
120
121					match max_value {
122						Some(v) => {
123							result.push(v);
124							bitvec.push(true);
125						}
126						None => {
127							result.push(0);
128							bitvec.push(false);
129						}
130					}
131				}
132
133				Ok(ColumnData::int4_with_bitvec(result, bitvec))
134			}
135			ColumnData::Int8(_) => {
136				let mut result = Vec::with_capacity(row_count);
137				let mut bitvec = Vec::with_capacity(row_count);
138
139				for row_idx in 0..row_count {
140					let mut max_value: Option<i64> = None;
141
142					for column in columns.iter() {
143						if let ColumnData::Int8(container) = column.data() {
144							if let Some(value) = container.get(row_idx) {
145								max_value = Some(match max_value {
146									None => *value,
147									Some(current_max) => current_max.max(*value),
148								});
149							}
150						}
151					}
152
153					match max_value {
154						Some(v) => {
155							result.push(v);
156							bitvec.push(true);
157						}
158						None => {
159							result.push(0);
160							bitvec.push(false);
161						}
162					}
163				}
164
165				Ok(ColumnData::int8_with_bitvec(result, bitvec))
166			}
167			ColumnData::Int16(_) => {
168				let mut result = Vec::with_capacity(row_count);
169				let mut bitvec = Vec::with_capacity(row_count);
170
171				for row_idx in 0..row_count {
172					let mut max_value: Option<i128> = None;
173
174					for column in columns.iter() {
175						if let ColumnData::Int16(container) = column.data() {
176							if let Some(value) = container.get(row_idx) {
177								max_value = Some(match max_value {
178									None => *value,
179									Some(current_max) => current_max.max(*value),
180								});
181							}
182						}
183					}
184
185					match max_value {
186						Some(v) => {
187							result.push(v);
188							bitvec.push(true);
189						}
190						None => {
191							result.push(0);
192							bitvec.push(false);
193						}
194					}
195				}
196
197				Ok(ColumnData::int16_with_bitvec(result, bitvec))
198			}
199			ColumnData::Uint1(_) => {
200				let mut result = Vec::with_capacity(row_count);
201				let mut bitvec = Vec::with_capacity(row_count);
202
203				for row_idx in 0..row_count {
204					let mut max_value: Option<u8> = None;
205
206					for column in columns.iter() {
207						if let ColumnData::Uint1(container) = column.data() {
208							if let Some(value) = container.get(row_idx) {
209								max_value = Some(match max_value {
210									None => *value,
211									Some(current_max) => current_max.max(*value),
212								});
213							}
214						}
215					}
216
217					match max_value {
218						Some(v) => {
219							result.push(v);
220							bitvec.push(true);
221						}
222						None => {
223							result.push(0);
224							bitvec.push(false);
225						}
226					}
227				}
228
229				Ok(ColumnData::uint1_with_bitvec(result, bitvec))
230			}
231			ColumnData::Uint2(_) => {
232				let mut result = Vec::with_capacity(row_count);
233				let mut bitvec = Vec::with_capacity(row_count);
234
235				for row_idx in 0..row_count {
236					let mut max_value: Option<u16> = None;
237
238					for column in columns.iter() {
239						if let ColumnData::Uint2(container) = column.data() {
240							if let Some(value) = container.get(row_idx) {
241								max_value = Some(match max_value {
242									None => *value,
243									Some(current_max) => current_max.max(*value),
244								});
245							}
246						}
247					}
248
249					match max_value {
250						Some(v) => {
251							result.push(v);
252							bitvec.push(true);
253						}
254						None => {
255							result.push(0);
256							bitvec.push(false);
257						}
258					}
259				}
260
261				Ok(ColumnData::uint2_with_bitvec(result, bitvec))
262			}
263			ColumnData::Uint4(_) => {
264				let mut result = Vec::with_capacity(row_count);
265				let mut bitvec = Vec::with_capacity(row_count);
266
267				for row_idx in 0..row_count {
268					let mut max_value: Option<u32> = None;
269
270					for column in columns.iter() {
271						if let ColumnData::Uint4(container) = column.data() {
272							if let Some(value) = container.get(row_idx) {
273								max_value = Some(match max_value {
274									None => *value,
275									Some(current_max) => current_max.max(*value),
276								});
277							}
278						}
279					}
280
281					match max_value {
282						Some(v) => {
283							result.push(v);
284							bitvec.push(true);
285						}
286						None => {
287							result.push(0);
288							bitvec.push(false);
289						}
290					}
291				}
292
293				Ok(ColumnData::uint4_with_bitvec(result, bitvec))
294			}
295			ColumnData::Uint8(_) => {
296				let mut result = Vec::with_capacity(row_count);
297				let mut bitvec = Vec::with_capacity(row_count);
298
299				for row_idx in 0..row_count {
300					let mut max_value: Option<u64> = None;
301
302					for column in columns.iter() {
303						if let ColumnData::Uint8(container) = column.data() {
304							if let Some(value) = container.get(row_idx) {
305								max_value = Some(match max_value {
306									None => *value,
307									Some(current_max) => current_max.max(*value),
308								});
309							}
310						}
311					}
312
313					match max_value {
314						Some(v) => {
315							result.push(v);
316							bitvec.push(true);
317						}
318						None => {
319							result.push(0);
320							bitvec.push(false);
321						}
322					}
323				}
324
325				Ok(ColumnData::uint8_with_bitvec(result, bitvec))
326			}
327			ColumnData::Uint16(_) => {
328				let mut result = Vec::with_capacity(row_count);
329				let mut bitvec = Vec::with_capacity(row_count);
330
331				for row_idx in 0..row_count {
332					let mut max_value: Option<u128> = None;
333
334					for column in columns.iter() {
335						if let ColumnData::Uint16(container) = column.data() {
336							if let Some(value) = container.get(row_idx) {
337								max_value = Some(match max_value {
338									None => *value,
339									Some(current_max) => current_max.max(*value),
340								});
341							}
342						}
343					}
344
345					match max_value {
346						Some(v) => {
347							result.push(v);
348							bitvec.push(true);
349						}
350						None => {
351							result.push(0);
352							bitvec.push(false);
353						}
354					}
355				}
356
357				Ok(ColumnData::uint16_with_bitvec(result, bitvec))
358			}
359			ColumnData::Float4(_) => {
360				let mut result = Vec::with_capacity(row_count);
361				let mut bitvec = Vec::with_capacity(row_count);
362
363				for row_idx in 0..row_count {
364					let mut max_value: Option<f32> = None;
365
366					for column in columns.iter() {
367						if let ColumnData::Float4(container) = column.data() {
368							if let Some(value) = container.get(row_idx) {
369								max_value = Some(match max_value {
370									None => *value,
371									Some(current_max) => current_max.max(*value),
372								});
373							}
374						}
375					}
376
377					match max_value {
378						Some(v) => {
379							result.push(v);
380							bitvec.push(true);
381						}
382						None => {
383							result.push(0.0);
384							bitvec.push(false);
385						}
386					}
387				}
388
389				Ok(ColumnData::float4_with_bitvec(result, bitvec))
390			}
391			ColumnData::Float8(_) => {
392				let mut result = Vec::with_capacity(row_count);
393				let mut bitvec = Vec::with_capacity(row_count);
394
395				for row_idx in 0..row_count {
396					let mut max_value: Option<f64> = None;
397
398					for column in columns.iter() {
399						if let ColumnData::Float8(container) = column.data() {
400							if let Some(value) = container.get(row_idx) {
401								max_value = Some(match max_value {
402									None => *value,
403									Some(current_max) => current_max.max(*value),
404								});
405							}
406						}
407					}
408
409					match max_value {
410						Some(v) => {
411							result.push(v);
412							bitvec.push(true);
413						}
414						None => {
415							result.push(0.0);
416							bitvec.push(false);
417						}
418					}
419				}
420
421				Ok(ColumnData::float8_with_bitvec(result, bitvec))
422			}
423			ColumnData::Int {
424				max_bytes,
425				..
426			} => {
427				use reifydb_type::value::int::Int;
428				let mut result = Vec::with_capacity(row_count);
429				let mut bitvec = Vec::with_capacity(row_count);
430
431				for row_idx in 0..row_count {
432					let mut max_value: Option<Int> = None;
433
434					for column in columns.iter() {
435						if let ColumnData::Int {
436							container,
437							..
438						} = column.data()
439						{
440							if let Some(value) = container.get(row_idx) {
441								max_value = Some(match max_value {
442									None => value.clone(),
443									Some(current_max) => {
444										if value > &current_max {
445											value.clone()
446										} else {
447											current_max
448										}
449									}
450								});
451							}
452						}
453					}
454
455					match max_value {
456						Some(v) => {
457							result.push(v);
458							bitvec.push(true);
459						}
460						None => {
461							result.push(Int::default());
462							bitvec.push(false);
463						}
464					}
465				}
466
467				Ok(ColumnData::Int {
468					container: reifydb_type::value::container::number::NumberContainer::new(result),
469					max_bytes: *max_bytes,
470				})
471			}
472			ColumnData::Uint {
473				max_bytes,
474				..
475			} => {
476				use reifydb_type::value::uint::Uint;
477				let mut result = Vec::with_capacity(row_count);
478				let mut bitvec = Vec::with_capacity(row_count);
479
480				for row_idx in 0..row_count {
481					let mut max_value: Option<Uint> = None;
482
483					for column in columns.iter() {
484						if let ColumnData::Uint {
485							container,
486							..
487						} = column.data()
488						{
489							if let Some(value) = container.get(row_idx) {
490								max_value = Some(match max_value {
491									None => value.clone(),
492									Some(current_max) => {
493										if value > &current_max {
494											value.clone()
495										} else {
496											current_max
497										}
498									}
499								});
500							}
501						}
502					}
503
504					match max_value {
505						Some(v) => {
506							result.push(v);
507							bitvec.push(true);
508						}
509						None => {
510							result.push(Uint::default());
511							bitvec.push(false);
512						}
513					}
514				}
515
516				Ok(ColumnData::Uint {
517					container: reifydb_type::value::container::number::NumberContainer::new(result),
518					max_bytes: *max_bytes,
519				})
520			}
521			ColumnData::Decimal {
522				precision,
523				scale,
524				..
525			} => {
526				use reifydb_type::value::decimal::Decimal;
527				let mut result = Vec::with_capacity(row_count);
528				let mut bitvec = Vec::with_capacity(row_count);
529
530				for row_idx in 0..row_count {
531					let mut max_value: Option<Decimal> = None;
532
533					for column in columns.iter() {
534						if let ColumnData::Decimal {
535							container,
536							..
537						} = column.data()
538						{
539							if let Some(value) = container.get(row_idx) {
540								max_value = Some(match max_value {
541									None => value.clone(),
542									Some(current_max) => {
543										if value > &current_max {
544											value.clone()
545										} else {
546											current_max
547										}
548									}
549								});
550							}
551						}
552					}
553
554					match max_value {
555						Some(v) => {
556							result.push(v);
557							bitvec.push(true);
558						}
559						None => {
560							result.push(Decimal::default());
561							bitvec.push(false);
562						}
563					}
564				}
565
566				Ok(ColumnData::Decimal {
567					container: reifydb_type::value::container::number::NumberContainer::new(result),
568					precision: *precision,
569					scale: *scale,
570				})
571			}
572			other => Err(ScalarFunctionError::InvalidArgumentType {
573				function: ctx.fragment.clone(),
574				argument_index: 0,
575				expected: vec![
576					Type::Int1,
577					Type::Int2,
578					Type::Int4,
579					Type::Int8,
580					Type::Int16,
581					Type::Uint1,
582					Type::Uint2,
583					Type::Uint4,
584					Type::Uint8,
585					Type::Uint16,
586					Type::Float4,
587					Type::Float8,
588					Type::Int,
589					Type::Uint,
590					Type::Decimal,
591				],
592				actual: other.get_type(),
593			}),
594		}
595	}
596
597	fn return_type(&self, input_types: &[Type]) -> Type {
598		input_types[0].clone()
599	}
600}