1use std::cmp::Ordering;
5
6use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer};
7use reifydb_type::{
8 error::Diagnostic,
9 fragment::Fragment,
10 return_error,
11 value::{
12 container::{
13 blob::BlobContainer, bool::BoolContainer, identity_id::IdentityIdContainer,
14 number::NumberContainer, temporal::TemporalContainer, utf8::Utf8Container, uuid::UuidContainer,
15 },
16 decimal::Decimal,
17 int::Int,
18 is::{IsNumber, IsTemporal, IsUuid},
19 number::{compare::partial_cmp, promote::Promote},
20 r#type::Type,
21 uint::Uint,
22 },
23};
24
25use super::option::binary_op_unwrap_option;
26use crate::Result;
27
28macro_rules! dispatch_compare {
29
30 (
31 $left:expr, $right:expr;
32 $fragment:expr;
33 $($extra:tt)*
34 ) => {
35 dispatch_compare!(@rows
36 ($left, $right) ($fragment)
37 [(Float4, f32) (Float8, f64) (Int1, i8) (Int2, i16) (Int4, i32) (Int8, i64) (Int16, i128) (Uint1, u8) (Uint2, u16) (Uint4, u32) (Uint8, u64) (Uint16, u128)]
38 {$($extra)*}
39 {}
40 )
41 };
42
43
44 (@rows
45 ($left:expr, $right:expr) ($fragment:expr)
46 [($L:ident, $Lt:ty) $($rest:tt)*]
47 {$($extra:tt)*}
48 {$($acc:tt)*}
49 ) => {
50 dispatch_compare!(@rows
51 ($left, $right) ($fragment)
52 [$($rest)*]
53 {$($extra)*}
54 {
55 $($acc)*
56 (ColumnBuffer::$L(l), ColumnBuffer::Float4(r)) => { return Ok(compare_number::<Op, $Lt, f32>(l, r, $fragment)); },
57 (ColumnBuffer::$L(l), ColumnBuffer::Float8(r)) => { return Ok(compare_number::<Op, $Lt, f64>(l, r, $fragment)); },
58 (ColumnBuffer::$L(l), ColumnBuffer::Int1(r)) => { return Ok(compare_number::<Op, $Lt, i8>(l, r, $fragment)); },
59 (ColumnBuffer::$L(l), ColumnBuffer::Int2(r)) => { return Ok(compare_number::<Op, $Lt, i16>(l, r, $fragment)); },
60 (ColumnBuffer::$L(l), ColumnBuffer::Int4(r)) => { return Ok(compare_number::<Op, $Lt, i32>(l, r, $fragment)); },
61 (ColumnBuffer::$L(l), ColumnBuffer::Int8(r)) => { return Ok(compare_number::<Op, $Lt, i64>(l, r, $fragment)); },
62 (ColumnBuffer::$L(l), ColumnBuffer::Int16(r)) => { return Ok(compare_number::<Op, $Lt, i128>(l, r, $fragment)); },
63 (ColumnBuffer::$L(l), ColumnBuffer::Uint1(r)) => { return Ok(compare_number::<Op, $Lt, u8>(l, r, $fragment)); },
64 (ColumnBuffer::$L(l), ColumnBuffer::Uint2(r)) => { return Ok(compare_number::<Op, $Lt, u16>(l, r, $fragment)); },
65 (ColumnBuffer::$L(l), ColumnBuffer::Uint4(r)) => { return Ok(compare_number::<Op, $Lt, u32>(l, r, $fragment)); },
66 (ColumnBuffer::$L(l), ColumnBuffer::Uint8(r)) => { return Ok(compare_number::<Op, $Lt, u64>(l, r, $fragment)); },
67 (ColumnBuffer::$L(l), ColumnBuffer::Uint16(r)) => { return Ok(compare_number::<Op, $Lt, u128>(l, r, $fragment)); },
68 (ColumnBuffer::$L(l), ColumnBuffer::Int { container: r, .. }) => { return Ok(compare_number::<Op, $Lt, Int>(l, r, $fragment)); },
69 (ColumnBuffer::$L(l), ColumnBuffer::Uint { container: r, .. }) => { return Ok(compare_number::<Op, $Lt, Uint>(l, r, $fragment)); },
70 (ColumnBuffer::$L(l), ColumnBuffer::Decimal { container: r, .. }) => { return Ok(compare_number::<Op, $Lt, Decimal>(l, r, $fragment)); },
71 }
72 )
73 };
74
75
76 (@rows
77 ($left:expr, $right:expr) ($fragment:expr)
78 []
79 {$($extra:tt)*}
80 {$($acc:tt)*}
81 ) => {
82 match ($left, $right) {
83
84 $($acc)*
85
86
87 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Float4(r)) => { return Ok(compare_number::<Op, Int, f32>(l, r, $fragment)); },
88 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Float8(r)) => { return Ok(compare_number::<Op, Int, f64>(l, r, $fragment)); },
89 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Int1(r)) => { return Ok(compare_number::<Op, Int, i8>(l, r, $fragment)); },
90 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Int2(r)) => { return Ok(compare_number::<Op, Int, i16>(l, r, $fragment)); },
91 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Int4(r)) => { return Ok(compare_number::<Op, Int, i32>(l, r, $fragment)); },
92 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Int8(r)) => { return Ok(compare_number::<Op, Int, i64>(l, r, $fragment)); },
93 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Int16(r)) => { return Ok(compare_number::<Op, Int, i128>(l, r, $fragment)); },
94 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Uint1(r)) => { return Ok(compare_number::<Op, Int, u8>(l, r, $fragment)); },
95 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Uint2(r)) => { return Ok(compare_number::<Op, Int, u16>(l, r, $fragment)); },
96 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Uint4(r)) => { return Ok(compare_number::<Op, Int, u32>(l, r, $fragment)); },
97 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Uint8(r)) => { return Ok(compare_number::<Op, Int, u64>(l, r, $fragment)); },
98 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Uint16(r)) => { return Ok(compare_number::<Op, Int, u128>(l, r, $fragment)); },
99 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Int { container: r, .. }) => { return Ok(compare_number::<Op, Int, Int>(l, r, $fragment)); },
100 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Uint { container: r, .. }) => { return Ok(compare_number::<Op, Int, Uint>(l, r, $fragment)); },
101 (ColumnBuffer::Int { container: l, .. }, ColumnBuffer::Decimal { container: r, .. }) => { return Ok(compare_number::<Op, Int, Decimal>(l, r, $fragment)); },
102
103
104 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Float4(r)) => { return Ok(compare_number::<Op, Uint, f32>(l, r, $fragment)); },
105 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Float8(r)) => { return Ok(compare_number::<Op, Uint, f64>(l, r, $fragment)); },
106 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Int1(r)) => { return Ok(compare_number::<Op, Uint, i8>(l, r, $fragment)); },
107 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Int2(r)) => { return Ok(compare_number::<Op, Uint, i16>(l, r, $fragment)); },
108 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Int4(r)) => { return Ok(compare_number::<Op, Uint, i32>(l, r, $fragment)); },
109 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Int8(r)) => { return Ok(compare_number::<Op, Uint, i64>(l, r, $fragment)); },
110 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Int16(r)) => { return Ok(compare_number::<Op, Uint, i128>(l, r, $fragment)); },
111 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Uint1(r)) => { return Ok(compare_number::<Op, Uint, u8>(l, r, $fragment)); },
112 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Uint2(r)) => { return Ok(compare_number::<Op, Uint, u16>(l, r, $fragment)); },
113 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Uint4(r)) => { return Ok(compare_number::<Op, Uint, u32>(l, r, $fragment)); },
114 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Uint8(r)) => { return Ok(compare_number::<Op, Uint, u64>(l, r, $fragment)); },
115 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Uint16(r)) => { return Ok(compare_number::<Op, Uint, u128>(l, r, $fragment)); },
116 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Int { container: r, .. }) => { return Ok(compare_number::<Op, Uint, Int>(l, r, $fragment)); },
117 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Uint { container: r, .. }) => { return Ok(compare_number::<Op, Uint, Uint>(l, r, $fragment)); },
118 (ColumnBuffer::Uint { container: l, .. }, ColumnBuffer::Decimal { container: r, .. }) => { return Ok(compare_number::<Op, Uint, Decimal>(l, r, $fragment)); },
119
120
121 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Float4(r)) => { return Ok(compare_number::<Op, Decimal, f32>(l, r, $fragment)); },
122 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Float8(r)) => { return Ok(compare_number::<Op, Decimal, f64>(l, r, $fragment)); },
123 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Int1(r)) => { return Ok(compare_number::<Op, Decimal, i8>(l, r, $fragment)); },
124 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Int2(r)) => { return Ok(compare_number::<Op, Decimal, i16>(l, r, $fragment)); },
125 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Int4(r)) => { return Ok(compare_number::<Op, Decimal, i32>(l, r, $fragment)); },
126 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Int8(r)) => { return Ok(compare_number::<Op, Decimal, i64>(l, r, $fragment)); },
127 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Int16(r)) => { return Ok(compare_number::<Op, Decimal, i128>(l, r, $fragment)); },
128 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Uint1(r)) => { return Ok(compare_number::<Op, Decimal, u8>(l, r, $fragment)); },
129 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Uint2(r)) => { return Ok(compare_number::<Op, Decimal, u16>(l, r, $fragment)); },
130 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Uint4(r)) => { return Ok(compare_number::<Op, Decimal, u32>(l, r, $fragment)); },
131 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Uint8(r)) => { return Ok(compare_number::<Op, Decimal, u64>(l, r, $fragment)); },
132 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Uint16(r)) => { return Ok(compare_number::<Op, Decimal, u128>(l, r, $fragment)); },
133 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Int { container: r, .. }) => { return Ok(compare_number::<Op, Decimal, Int>(l, r, $fragment)); },
134 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Uint { container: r, .. }) => { return Ok(compare_number::<Op, Decimal, Uint>(l, r, $fragment)); },
135 (ColumnBuffer::Decimal { container: l, .. }, ColumnBuffer::Decimal { container: r, .. }) => { return Ok(compare_number::<Op, Decimal, Decimal>(l, r, $fragment)); },
136
137
138 $($extra)*
139 }
140 };
141}
142
143pub(crate) trait CompareOp {
144 fn compare_ordering(ordering: Option<Ordering>) -> bool;
145 fn compare_bool(_l: bool, _r: bool) -> Option<bool> {
146 None
147 }
148}
149
150pub(crate) struct Equal;
151pub(crate) struct NotEqual;
152pub(crate) struct GreaterThan;
153pub(crate) struct GreaterThanEqual;
154pub(crate) struct LessThan;
155pub(crate) struct LessThanEqual;
156
157impl CompareOp for Equal {
158 #[inline]
159 fn compare_ordering(o: Option<Ordering>) -> bool {
160 o == Some(Ordering::Equal)
161 }
162 #[inline]
163 fn compare_bool(l: bool, r: bool) -> Option<bool> {
164 Some(l == r)
165 }
166}
167
168impl CompareOp for NotEqual {
169 #[inline]
170 fn compare_ordering(o: Option<Ordering>) -> bool {
171 o != Some(Ordering::Equal)
172 }
173 #[inline]
174 fn compare_bool(l: bool, r: bool) -> Option<bool> {
175 Some(l != r)
176 }
177}
178
179impl CompareOp for GreaterThan {
180 #[inline]
181 fn compare_ordering(o: Option<Ordering>) -> bool {
182 o == Some(Ordering::Greater)
183 }
184}
185
186impl CompareOp for GreaterThanEqual {
187 #[inline]
188 fn compare_ordering(o: Option<Ordering>) -> bool {
189 matches!(o, Some(Ordering::Greater) | Some(Ordering::Equal))
190 }
191}
192
193impl CompareOp for LessThan {
194 #[inline]
195 fn compare_ordering(o: Option<Ordering>) -> bool {
196 o == Some(Ordering::Less)
197 }
198}
199
200impl CompareOp for LessThanEqual {
201 #[inline]
202 fn compare_ordering(o: Option<Ordering>) -> bool {
203 matches!(o, Some(Ordering::Less) | Some(Ordering::Equal))
204 }
205}
206
207#[inline]
208fn compare_number<Op: CompareOp, L, R>(
209 l: &NumberContainer<L>,
210 r: &NumberContainer<R>,
211 fragment: Fragment,
212) -> ColumnWithName
213where
214 L: Promote<R> + IsNumber,
215 R: IsNumber,
216 <L as Promote<R>>::Output: IsNumber,
217{
218 debug_assert_eq!(l.len(), r.len());
219
220 let data: Vec<bool> =
221 l.data().iter()
222 .zip(r.data().iter())
223 .map(|(l_val, r_val)| Op::compare_ordering(partial_cmp(l_val, r_val)))
224 .collect();
225
226 ColumnWithName::new(Fragment::internal(fragment.text()), ColumnBuffer::bool(data))
227}
228
229#[inline]
230fn compare_temporal<Op: CompareOp, T>(
231 l: &TemporalContainer<T>,
232 r: &TemporalContainer<T>,
233 fragment: Fragment,
234) -> ColumnWithName
235where
236 T: IsTemporal + Copy + PartialOrd,
237{
238 debug_assert_eq!(l.len(), r.len());
239
240 let data: Vec<bool> =
241 l.data().iter()
242 .zip(r.data().iter())
243 .map(|(l_val, r_val)| Op::compare_ordering(l_val.partial_cmp(r_val)))
244 .collect();
245
246 ColumnWithName::new(Fragment::internal(fragment.text()), ColumnBuffer::bool(data))
247}
248
249#[inline]
250fn compare_uuid<Op: CompareOp, T>(l: &UuidContainer<T>, r: &UuidContainer<T>, fragment: Fragment) -> ColumnWithName
251where
252 T: IsUuid + PartialOrd,
253{
254 debug_assert_eq!(l.len(), r.len());
255
256 let data: Vec<bool> =
257 l.data().iter()
258 .zip(r.data().iter())
259 .map(|(l_val, r_val)| Op::compare_ordering(l_val.partial_cmp(r_val)))
260 .collect();
261
262 ColumnWithName::new(Fragment::internal(fragment.text()), ColumnBuffer::bool(data))
263}
264
265#[inline]
266fn compare_identity_id<Op: CompareOp>(
267 l: &IdentityIdContainer,
268 r: &IdentityIdContainer,
269 fragment: Fragment,
270) -> ColumnWithName {
271 debug_assert_eq!(l.len(), r.len());
272
273 let data: Vec<bool> =
274 l.iter().zip(r.iter()).map(|(l_val, r_val)| Op::compare_ordering(l_val.partial_cmp(&r_val))).collect();
275
276 ColumnWithName::new(Fragment::internal(fragment.text()), ColumnBuffer::bool(data))
277}
278
279#[inline]
280fn compare_blob<Op: CompareOp>(l: &BlobContainer, r: &BlobContainer, fragment: Fragment) -> ColumnWithName {
281 debug_assert_eq!(l.len(), r.len());
282
283 let data: Vec<bool> = l
284 .iter_bytes()
285 .zip(r.iter_bytes())
286 .map(|(l_val, r_val)| Op::compare_ordering(l_val.partial_cmp(r_val)))
287 .collect();
288
289 ColumnWithName::new(Fragment::internal(fragment.text()), ColumnBuffer::bool(data))
290}
291
292#[inline]
293fn compare_utf8<Op: CompareOp>(l: &Utf8Container, r: &Utf8Container, fragment: Fragment) -> ColumnWithName {
294 debug_assert_eq!(l.len(), r.len());
295
296 let data: Vec<bool> = l
297 .iter_str()
298 .zip(r.iter_str())
299 .map(|(l_val, r_val)| Op::compare_ordering(l_val.partial_cmp(r_val)))
300 .collect();
301
302 ColumnWithName::new(Fragment::internal(fragment.text()), ColumnBuffer::bool(data))
303}
304
305#[inline]
306fn compare_bool<Op: CompareOp>(l: &BoolContainer, r: &BoolContainer, fragment: Fragment) -> Option<ColumnWithName> {
307 debug_assert_eq!(l.len(), r.len());
308
309 let data: Vec<bool> =
310 l.data().iter()
311 .zip(r.data().iter())
312 .filter_map(|(l_val, r_val)| Op::compare_bool(l_val, r_val))
313 .collect();
314
315 if data.len() == l.len() {
316 Some(ColumnWithName::new(Fragment::internal(fragment.text()), ColumnBuffer::bool(data)))
317 } else {
318 None
319 }
320}
321
322pub(crate) fn compare_columns<Op: CompareOp>(
323 left: &ColumnWithName,
324 right: &ColumnWithName,
325 fragment: Fragment,
326 error_fn: impl FnOnce(Fragment, Type, Type) -> Diagnostic,
327) -> Result<ColumnWithName> {
328 binary_op_unwrap_option(left, right, fragment.clone(), |left, right| {
329 dispatch_compare!(
330 &left.data(), &right.data();
331 fragment;
332
333 (ColumnBuffer::Bool(l), ColumnBuffer::Bool(r)) => {
334 if let Some(col) = compare_bool::<Op>(l, r, fragment.clone()) {
335 return Ok(col);
336 }
337 return_error!(error_fn(fragment, left.get_type(), right.get_type()))
338 }
339
340 (ColumnBuffer::Date(l), ColumnBuffer::Date(r)) => {
341 Ok(compare_temporal::<Op, _>(l, r, fragment))
342 },
343 (ColumnBuffer::DateTime(l), ColumnBuffer::DateTime(r)) => {
344 Ok(compare_temporal::<Op, _>(l, r, fragment))
345 },
346 (ColumnBuffer::Time(l), ColumnBuffer::Time(r)) => {
347 Ok(compare_temporal::<Op, _>(l, r, fragment))
348 },
349 (ColumnBuffer::Duration(l), ColumnBuffer::Duration(r)) => {
350 Ok(compare_temporal::<Op, _>(l, r, fragment))
351 },
352
353 (
354 ColumnBuffer::Utf8 {
355 container: l,
356 ..
357 },
358 ColumnBuffer::Utf8 {
359 container: r,
360 ..
361 },
362 ) => {
363 Ok(compare_utf8::<Op>(l, r, fragment))
364 },
365
366 (ColumnBuffer::Uuid4(l), ColumnBuffer::Uuid4(r)) => {
367 Ok(compare_uuid::<Op, _>(l, r, fragment))
368 },
369 (ColumnBuffer::Uuid7(l), ColumnBuffer::Uuid7(r)) => {
370 Ok(compare_uuid::<Op, _>(l, r, fragment))
371 },
372 (ColumnBuffer::IdentityId(l), ColumnBuffer::IdentityId(r)) => {
373 Ok(compare_identity_id::<Op>(l, r, fragment))
374 },
375 (
376 ColumnBuffer::Blob {
377 container: l,
378 ..
379 },
380 ColumnBuffer::Blob {
381 container: r,
382 ..
383 },
384 ) => {
385 Ok(compare_blob::<Op>(l, r, fragment))
386 },
387
388 _ => {
389 return_error!(error_fn(fragment, left.get_type(), right.get_type()))
390 },
391 )
392 })
393}