proof_of_sql/base/commitment/
column_bounds.rs

1use super::committable_column::CommittableColumn;
2use alloc::boxed::Box;
3use serde::{Deserialize, Serialize};
4use snafu::Snafu;
5
/// Cannot construct bounds where min is greater than max.
///
/// This is the error returned by [`BoundsInner::try_new`], and therefore by
/// [`Bounds::sharp`] and [`Bounds::bounded`], which delegate to it.
#[derive(Snafu, Debug)]
#[snafu(display("cannot construct bounds where min is greater than max"))]
pub struct NegativeBounds;
10
/// Inner value for [`Bounds::Sharp`] and [`Bounds::Bounded`].
///
/// Creating a separate type for this provides two benefits.
/// 1. reduced repeated code between the two variants
/// 2. privatization of the min/max for these variants, preventing invalid states
///
/// Because the fields are private, code outside this module has to go through
/// [`BoundsInner::try_new`], which rejects `min > max`.
// NOTE(review): `Deserialize` is derived without any validation attribute, so a
// deserialized value presumably does not pass through the `try_new` check —
// confirm whether untrusted input can reach this type.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BoundsInner<T>
where
    T: Ord,
{
    /// The minimum value of the data (inclusive).
    min: T,
    /// The maximum value of the data (inclusive).
    max: T,
}
26
27impl<T> BoundsInner<T>
28where
29    T: Ord,
30{
31    /// Construct a new [`BoundsInner`].
32    ///
33    /// Returns an error if min > max.
34    pub fn try_new(min: T, max: T) -> Result<Self, NegativeBounds> {
35        if min > max {
36            return Err(NegativeBounds);
37        }
38
39        Ok(BoundsInner { min, max })
40    }
41
42    /// Immutable accessor for the minimum value.
43    pub fn min(&self) -> &T {
44        &self.min
45    }
46
47    /// Immutable accessor for the maximum value.
48    pub fn max(&self) -> &T {
49        &self.max
50    }
51
52    /// Combine two [`Bounds`]s as if their source collections are being unioned.
53    pub fn union(self, other: BoundsInner<T>) -> Self {
54        BoundsInner {
55            min: self.min.min(other.min),
56            max: self.max.max(other.max),
57        }
58    }
59
60    /// Returns true if the value is within these bounds.
61    ///
62    /// This doesn't necessarily mean that the source collection contains this value.
63    /// However, a `false` result implies that the source collection cannot contain this value.
64    pub fn surrounds(&self, value: &T) -> bool {
65        &self.min <= value && value <= &self.max
66    }
67}
68
/// Minimum and maximum values (inclusive) of a collection of data, with some other variants for edge cases.
///
/// The default value is [`Bounds::Empty`].
#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Bounds<T>
where
    T: Ord,
{
    /// The source collection is empty so has no bounds.
    #[default]
    Empty,
    /// After some operation (like `Bounds::difference`), the bounds cannot be determined exactly.
    ///
    /// Instead, this variant underestimates the minimum and overestimates the maximum.
    Bounded(BoundsInner<T>),
    /// The exact bounds of the values of the source collection (inclusive).
    Sharp(BoundsInner<T>),
}
85
86impl<T> Bounds<T>
87where
88    T: Ord,
89{
90    /// Construct a new [`Bounds::Sharp`].
91    ///
92    /// Returns an error if min > max.
93    pub fn sharp(min: T, max: T) -> Result<Self, NegativeBounds> {
94        Ok(Bounds::Sharp(BoundsInner::try_new(min, max)?))
95    }
96
97    /// Construct a new [`Bounds::bounded`].
98    ///
99    /// Returns an error if min > max.
100    pub fn bounded(min: T, max: T) -> Result<Self, NegativeBounds> {
101        Ok(Bounds::Bounded(BoundsInner::try_new(min, max)?))
102    }
103
104    /// Combine two [`Bounds`]s as if their source collections are being unioned.
105    fn union(self, other: Bounds<T>) -> Self {
106        match (self, other) {
107            (Bounds::Sharp(bounds_a), Bounds::Sharp(bounds_b)) => {
108                Bounds::Sharp(bounds_a.union(bounds_b))
109            }
110            (Bounds::Bounded(bounds_a) | Bounds::Sharp(bounds_a), Bounds::Bounded(bounds_b))
111            | (Bounds::Bounded(bounds_a), Bounds::Sharp(bounds_b)) => {
112                Bounds::Bounded(bounds_a.union(bounds_b))
113            }
114            (bounds, Bounds::Empty) | (Bounds::Empty, bounds) => bounds,
115        }
116    }
117
118    /// Combine two [`Bounds`]s as if their source collections are being differenced.
119    ///
120    /// This should be interpreted as the set difference of the two collections.
121    /// The result would be the rows in self that are not also rows in other.
122    ///
123    /// It can't be determined *which* values are being removed from self's source collection.
124    /// So, in most cases, the resulting [`Bounds`] is [`Bounds::Bounded`].
125    /// Exceptions to this are cases where it can be determined that *no* values are removed.
126    fn difference(self, other: Bounds<T>) -> Self {
127        match (self, other) {
128            (Bounds::Empty, _) => Bounds::Empty,
129            (bounds, Bounds::Empty) => bounds,
130            (Bounds::Sharp(bounds_a), Bounds::Sharp(bounds_b) | Bounds::Bounded(bounds_b))
131                if bounds_a.max() < bounds_b.min() || bounds_b.max() < bounds_a.min() =>
132            {
133                // source collections must be disjoint, so no rows are removed
134                Bounds::Sharp(bounds_a)
135            }
136            (Bounds::Bounded(bounds) | Bounds::Sharp(bounds), _) => Bounds::Bounded(bounds),
137        }
138    }
139
140    /// Returns true if the value is within these bounds.
141    ///
142    /// This doesn't necessarily mean that the source collection contains this value.
143    /// However, a `false` result implies that the source collection cannot contain this value.
144    pub fn surrounds(&self, value: &T) -> bool {
145        match self {
146            Bounds::Empty => false,
147            Bounds::Bounded(inner) | Bounds::Sharp(inner) => inner.surrounds(value),
148        }
149    }
150}
151
152impl<'a, T> FromIterator<&'a T> for Bounds<T>
153where
154    T: Ord + Copy + 'a,
155{
156    fn from_iter<I: IntoIterator<Item = &'a T>>(iter: I) -> Self {
157        let bounds_borrowed = iter
158            .into_iter()
159            .fold(Bounds::<&T>::Empty, |bounds, element| match bounds {
160                Bounds::Sharp(BoundsInner { min, max }) => Bounds::Sharp(BoundsInner {
161                    min: min.min(element),
162                    max: max.max(element),
163                }),
164                Bounds::Empty => Bounds::Sharp(BoundsInner {
165                    min: element,
166                    max: element,
167                }),
168                Bounds::Bounded(_) => {
169                    panic!("bounds should never be bounded in this function");
170                }
171            });
172
173        // Copy only on the final bounds values
174        match bounds_borrowed {
175            Bounds::Sharp(BoundsInner { min, max }) => Bounds::Sharp(BoundsInner {
176                min: *min,
177                max: *max,
178            }),
179            Bounds::Empty => Bounds::Empty,
180            Bounds::Bounded(_) => {
181                panic!("bounds should never be bounded in this function")
182            }
183        }
184    }
185}
186
/// Columns with different [`ColumnBounds`] variants cannot operate with each other.
///
/// Returned by [`ColumnBounds::try_union`] and [`ColumnBounds::try_difference`] when the
/// two operands are not the same variant. Both operands are kept (boxed) so they can be
/// shown in the error message.
#[derive(Debug, Snafu)]
#[snafu(display(
    "column with bounds {bounds_a:?} cannot operate with column with bounds {bounds_b:?}"
))]
pub struct ColumnBoundsMismatch {
    /// The bounds the operation was called on (the left-hand operand).
    bounds_a: Box<ColumnBounds>,
    /// The bounds passed in as the other operand (the right-hand operand).
    bounds_b: Box<ColumnBounds>,
}
196
/// Column metadata storing the bounds for column types that have order.
///
/// Other Ord column variants do exist (like Scalar/Boolean).
/// However, bounding these is useless unless we are performing indexing on these columns.
/// This functionality should only be considered after we support them in the user-facing sql.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ColumnBounds {
    /// Column does not have order.
    NoOrder,
    /// The bounds of a `Uint8` column.
    Uint8(Bounds<u8>),
    /// The bounds of a `TinyInt` column.
    TinyInt(Bounds<i8>),
    /// The bounds of a `SmallInt` column.
    SmallInt(Bounds<i16>),
    /// The bounds of an `Int` column.
    Int(Bounds<i32>),
    /// The bounds of a `BigInt` column.
    BigInt(Bounds<i64>),
    /// The bounds of an `Int128` column.
    Int128(Bounds<i128>),
    /// The bounds of a `TimestampTZ` column.
    TimestampTZ(Bounds<i64>),
}
221
222impl ColumnBounds {
223    /// Construct a [`ColumnBounds`] from a column by reference.
224    ///
225    /// If the column variant has order, only the minimum and maximum value will be copied.
226    #[must_use]
227    pub fn from_column(column: &CommittableColumn) -> ColumnBounds {
228        match column {
229            CommittableColumn::TinyInt(ints) => ColumnBounds::TinyInt(Bounds::from_iter(*ints)),
230            CommittableColumn::Uint8(ints) => ColumnBounds::Uint8(Bounds::from_iter(*ints)),
231            CommittableColumn::SmallInt(ints) => ColumnBounds::SmallInt(Bounds::from_iter(*ints)),
232            CommittableColumn::Int(ints) => ColumnBounds::Int(Bounds::from_iter(*ints)),
233            CommittableColumn::BigInt(ints) => ColumnBounds::BigInt(Bounds::from_iter(*ints)),
234            CommittableColumn::Int128(ints) => ColumnBounds::Int128(Bounds::from_iter(*ints)),
235            CommittableColumn::TimestampTZ(_, _, times) => {
236                ColumnBounds::TimestampTZ(Bounds::from_iter(*times))
237            }
238            CommittableColumn::Boolean(_)
239            | CommittableColumn::Decimal75(_, _, _)
240            | CommittableColumn::Scalar(_)
241            | CommittableColumn::VarChar(_) => ColumnBounds::NoOrder,
242        }
243    }
244
245    /// Combine two [`ColumnBounds`] as if their source collections are being unioned.
246    ///
247    /// Can error if the two values do not share the same [`ColumnBounds`] variant.
248    pub fn try_union(self, other: Self) -> Result<Self, ColumnBoundsMismatch> {
249        match (self, other) {
250            (ColumnBounds::NoOrder, ColumnBounds::NoOrder) => Ok(ColumnBounds::NoOrder),
251            (ColumnBounds::Uint8(bounds_a), ColumnBounds::Uint8(bounds_b)) => {
252                Ok(ColumnBounds::Uint8(bounds_a.union(bounds_b)))
253            }
254            (ColumnBounds::TinyInt(bounds_a), ColumnBounds::TinyInt(bounds_b)) => {
255                Ok(ColumnBounds::TinyInt(bounds_a.union(bounds_b)))
256            }
257            (ColumnBounds::SmallInt(bounds_a), ColumnBounds::SmallInt(bounds_b)) => {
258                Ok(ColumnBounds::SmallInt(bounds_a.union(bounds_b)))
259            }
260            (ColumnBounds::Int(bounds_a), ColumnBounds::Int(bounds_b)) => {
261                Ok(ColumnBounds::Int(bounds_a.union(bounds_b)))
262            }
263            (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => {
264                Ok(ColumnBounds::BigInt(bounds_a.union(bounds_b)))
265            }
266            (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => {
267                Ok(ColumnBounds::TimestampTZ(bounds_a.union(bounds_b)))
268            }
269            (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => {
270                Ok(ColumnBounds::Int128(bounds_a.union(bounds_b)))
271            }
272            (bounds_a, bounds_b) => Err(ColumnBoundsMismatch {
273                bounds_a: Box::new(bounds_a),
274                bounds_b: Box::new(bounds_b),
275            }),
276        }
277    }
278
279    /// Combine two [`ColumnBounds`] as if their source collections are being differenced.
280    ///
281    /// This should be interpreted as the set difference of the two collections.
282    /// The result would be the rows in self that are not also rows in other.
283    pub fn try_difference(self, other: Self) -> Result<Self, ColumnBoundsMismatch> {
284        match (self, other) {
285            (ColumnBounds::NoOrder, ColumnBounds::NoOrder) => Ok(self),
286            (ColumnBounds::Uint8(bounds_a), ColumnBounds::Uint8(bounds_b)) => {
287                Ok(ColumnBounds::Uint8(bounds_a.difference(bounds_b)))
288            }
289            (ColumnBounds::TinyInt(bounds_a), ColumnBounds::TinyInt(bounds_b)) => {
290                Ok(ColumnBounds::TinyInt(bounds_a.difference(bounds_b)))
291            }
292            (ColumnBounds::SmallInt(bounds_a), ColumnBounds::SmallInt(bounds_b)) => {
293                Ok(ColumnBounds::SmallInt(bounds_a.difference(bounds_b)))
294            }
295            (ColumnBounds::Int(bounds_a), ColumnBounds::Int(bounds_b)) => {
296                Ok(ColumnBounds::Int(bounds_a.difference(bounds_b)))
297            }
298            (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => {
299                Ok(ColumnBounds::BigInt(bounds_a.difference(bounds_b)))
300            }
301            (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => {
302                Ok(ColumnBounds::Int128(bounds_a.difference(bounds_b)))
303            }
304            (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => {
305                Ok(ColumnBounds::TimestampTZ(bounds_a.difference(bounds_b)))
306            }
307            (_, _) => Err(ColumnBoundsMismatch {
308                bounds_a: Box::new(self),
309                bounds_b: Box::new(other),
310            }),
311        }
312    }
313}
314
/// Unit tests for [`BoundsInner`], [`Bounds`] and [`ColumnBounds`].
#[cfg(test)]
mod tests {
    use super::*;
    use crate::base::{
        database::OwnedColumn, math::decimal::Precision, scalar::test_scalar::TestScalar,
    };
    use alloc::{string::String, vec};
    use itertools::Itertools;
    use proof_of_sql_parser::posql_time::{PoSQLTimeUnit, PoSQLTimeZone};

    #[test]
    fn we_can_construct_bounds_by_method() {
        let sharp_bounds = Bounds::<i32>::sharp(-5, 10).unwrap();
        assert_eq!(
            sharp_bounds,
            Bounds::Sharp(BoundsInner { min: -5, max: 10 })
        );

        let bounded_bounds = Bounds::<i32>::bounded(-15, -10).unwrap();
        assert_eq!(
            bounded_bounds,
            Bounds::Bounded(BoundsInner { min: -15, max: -10 })
        );
    }

    #[test]
    fn we_cannot_construct_negative_bounds() {
        let negative_sharp_bounds = Bounds::<i32>::sharp(10, 5);
        assert!(matches!(negative_sharp_bounds, Err(NegativeBounds)));

        let negative_bounded_bounds = Bounds::<i32>::bounded(-10, -15);
        assert!(matches!(negative_bounded_bounds, Err(NegativeBounds)));
    }

    #[test]
    fn we_can_construct_bounds_from_iterator() {
        // empty case
        let empty_bounds = Bounds::<i32>::from_iter([]);
        assert_eq!(empty_bounds, Bounds::Empty);

        // nonempty case
        let ints = [1, 2, 3, 1, 0, -1];
        let bounds = Bounds::from_iter(&ints);
        assert_eq!(bounds, Bounds::Sharp(BoundsInner { min: -1, max: 3 }));
    }

    #[test]
    fn we_can_determine_if_bounds_surround_value() {
        // empty case
        assert!(!Bounds::Empty.surrounds(&0));

        // both ends of the range are inclusive
        let sharp = Bounds::Sharp(BoundsInner { min: 2, max: 4 });
        assert!(!sharp.surrounds(&1));
        assert!(sharp.surrounds(&2));
        assert!(sharp.surrounds(&3));
        assert!(sharp.surrounds(&4));
        assert!(!sharp.surrounds(&5));

        let bounded = Bounds::Bounded(BoundsInner { min: 2, max: 4 });
        assert!(!bounded.surrounds(&1));
        assert!(bounded.surrounds(&2));
        assert!(bounded.surrounds(&3));
        assert!(bounded.surrounds(&4));
        assert!(!bounded.surrounds(&5));
    }

    #[test]
    fn we_can_union_sharp_bounds() {
        let bounds_a = Bounds::Sharp(BoundsInner { min: 3, max: 6 });

        // b entirely below a
        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 2 });
        assert_eq!(
            bounds_a.union(bounds_b),
            Bounds::Sharp(BoundsInner { min: 1, max: 6 })
        );

        // b overlaps the lower end of a
        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 4 });
        assert_eq!(
            bounds_a.union(bounds_b),
            Bounds::Sharp(BoundsInner { min: 1, max: 6 })
        );

        // b contains a
        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 7 });
        assert_eq!(
            bounds_a.union(bounds_b),
            Bounds::Sharp(BoundsInner { min: 1, max: 7 })
        );

        // a contains b
        let bounds_b = Bounds::Sharp(BoundsInner { min: 4, max: 5 });
        assert_eq!(
            bounds_a.union(bounds_b),
            Bounds::Sharp(BoundsInner { min: 3, max: 6 })
        );

        // b overlaps the upper end of a
        let bounds_b = Bounds::Sharp(BoundsInner { min: 4, max: 7 });
        assert_eq!(
            bounds_a.union(bounds_b),
            Bounds::Sharp(BoundsInner { min: 3, max: 7 })
        );

        // b entirely above a
        let bounds_b = Bounds::Sharp(BoundsInner { min: 7, max: 8 });
        assert_eq!(
            bounds_a.union(bounds_b),
            Bounds::Sharp(BoundsInner { min: 3, max: 8 })
        );
    }

    #[test]
    fn we_can_union_sharp_and_empty_bounds() {
        let sharp = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
        let empty = Bounds::Empty;

        assert_eq!(sharp.union(empty), sharp);
        assert_eq!(empty.union(sharp), sharp);
        assert_eq!(empty.union(empty), empty);
    }

    #[test]
    fn union_of_bounded_bounds_is_bounded() {
        let sharp = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
        let bounded = Bounds::Bounded(BoundsInner { min: 7, max: 10 });
        let union = Bounds::Bounded(BoundsInner { min: 3, max: 10 });
        let empty = Bounds::Empty;

        assert_eq!(sharp.union(bounded), union);
        assert_eq!(bounded.union(sharp), union);

        assert_eq!(empty.union(bounded), bounded);
        assert_eq!(bounded.union(empty), bounded);

        assert_eq!(bounded.union(bounded), bounded);
    }

    #[test]
    fn we_can_take_difference_of_disjoint_bounds() {
        let bounds_a = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
        let bounds_b = Bounds::Sharp(BoundsInner { min: -6, max: -3 });
        let bounded = Bounds::Bounded(BoundsInner { min: -6, max: -3 });

        assert_eq!(bounds_a.difference(bounds_b), bounds_a);
        assert_eq!(bounds_b.difference(bounds_a), bounds_b);

        assert_eq!(bounds_a.difference(bounded), bounds_a);

        let empty = Bounds::Empty;

        assert_eq!(bounds_a.difference(empty), bounds_a);
        assert_eq!(empty.difference(bounds_a), empty);

        assert_eq!(empty.difference(empty), empty);
    }

    #[test]
    fn difference_with_bounded_minuend_is_bounded() {
        let sharp = Bounds::Sharp(BoundsInner { min: -5, max: 5 });
        let bounded_a = Bounds::Bounded(BoundsInner { min: 6, max: 10 });
        let bounded_b = Bounds::Bounded(BoundsInner { min: 11, max: 15 });
        let empty = Bounds::Empty;

        assert_eq!(bounded_a.difference(sharp), bounded_a);

        assert_eq!(bounded_a.difference(bounded_b), bounded_a);
        assert_eq!(bounded_b.difference(bounded_a), bounded_b);

        assert_eq!(bounded_a.difference(empty), bounded_a);

        // Still empty since there are still no rows in empty that are also in bounded
        assert_eq!(empty.difference(bounded_a), empty);
    }

    #[test]
    fn difference_of_overlapping_bounds_is_bounded() {
        let bounds_a = BoundsInner { min: 3, max: 6 };
        let sharp_a = Bounds::Sharp(bounds_a);
        let bounded_a = Bounds::Bounded(bounds_a);

        let bounds_b = BoundsInner { min: 1, max: 4 };
        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);

        let bounds_b = BoundsInner { min: 1, max: 7 };
        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);

        let bounds_b = BoundsInner { min: 4, max: 5 };
        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);

        let bounds_b = BoundsInner { min: 4, max: 7 };
        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
    }

    #[test]
    fn we_can_construct_column_bounds_from_column() {
        let varchar_column = OwnedColumn::<TestScalar>::VarChar(
            ["Lorem", "ipsum", "dolor", "sit", "amet"]
                .map(String::from)
                .to_vec(),
        );
        let committable_varchar_column = CommittableColumn::from(&varchar_column);
        let varchar_column_bounds = ColumnBounds::from_column(&committable_varchar_column);
        assert_eq!(varchar_column_bounds, ColumnBounds::NoOrder);

        let tinyint_column = OwnedColumn::<TestScalar>::TinyInt([1, 2, 3, 1, 0].to_vec());
        let committable_tinyint_column = CommittableColumn::from(&tinyint_column);
        let tinyint_column_bounds = ColumnBounds::from_column(&committable_tinyint_column);
        assert_eq!(
            tinyint_column_bounds,
            ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
        );

        let smallint_column = OwnedColumn::<TestScalar>::SmallInt([1, 2, 3, 1, 0].to_vec());
        let committable_smallint_column = CommittableColumn::from(&smallint_column);
        let smallint_column_bounds = ColumnBounds::from_column(&committable_smallint_column);
        assert_eq!(
            smallint_column_bounds,
            ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
        );

        let int_column = OwnedColumn::<TestScalar>::Int([1, 2, 3, 1, 0].to_vec());
        let committable_int_column = CommittableColumn::from(&int_column);
        let int_column_bounds = ColumnBounds::from_column(&committable_int_column);
        assert_eq!(
            int_column_bounds,
            ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
        );

        let bigint_column = OwnedColumn::<TestScalar>::BigInt([1, 2, 3, 1, 0].to_vec());
        let committable_bigint_column = CommittableColumn::from(&bigint_column);
        let bigint_column_bounds = ColumnBounds::from_column(&committable_bigint_column);
        assert_eq!(
            bigint_column_bounds,
            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
        );

        let int128_column = OwnedColumn::<TestScalar>::Int128([1, 2, 3, 1, 0].to_vec());
        let committable_int128_column = CommittableColumn::from(&int128_column);
        let int128_column_bounds = ColumnBounds::from_column(&committable_int128_column);
        assert_eq!(
            int128_column_bounds,
            ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
        );

        let decimal75_column = OwnedColumn::<TestScalar>::Decimal75(
            Precision::new(1).unwrap(),
            0,
            vec![
                -TestScalar::from([1, 0, 0, 0]),
                TestScalar::from([2, 0, 0, 0]),
                TestScalar::from([3, 0, 0, 0]),
            ],
        );
        let committable_decimal75_column = CommittableColumn::from(&decimal75_column);
        let decimal75_column_bounds = ColumnBounds::from_column(&committable_decimal75_column);
        assert_eq!(decimal75_column_bounds, ColumnBounds::NoOrder);

        let timestamp_column = OwnedColumn::<TestScalar>::TimestampTZ(
            PoSQLTimeUnit::Second,
            PoSQLTimeZone::utc(),
            vec![1_i64, 2, 3, 4],
        );
        let committable_timestamp_column = CommittableColumn::from(&timestamp_column);
        let timestamp_column_bounds = ColumnBounds::from_column(&committable_timestamp_column);
        assert_eq!(
            timestamp_column_bounds,
            ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 }))
        );
    }

    #[test]
    fn we_can_union_column_bounds_with_matching_variant() {
        let no_order = ColumnBounds::NoOrder;
        assert_eq!(no_order.try_union(no_order).unwrap(), no_order);

        let uint8_a = ColumnBounds::Uint8(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let uint8_b = ColumnBounds::Uint8(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        assert_eq!(
            uint8_a.try_union(uint8_b).unwrap(),
            ColumnBounds::Uint8(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
        );

        let tinyint_a = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let tinyint_b = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        assert_eq!(
            tinyint_a.try_union(tinyint_b).unwrap(),
            ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
        );

        let smallint_a = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let smallint_b = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        assert_eq!(
            smallint_a.try_union(smallint_b).unwrap(),
            ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
        );

        let int_a = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let int_b = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        assert_eq!(
            int_a.try_union(int_b).unwrap(),
            ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
        );

        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let bigint_b = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        assert_eq!(
            bigint_a.try_union(bigint_b).unwrap(),
            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
        );

        // A sharp/bounded mix must come out bounded.
        let int128_a = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let int128_b = ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 4, max: 6 }));
        assert_eq!(
            int128_a.try_union(int128_b).unwrap(),
            ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 6 }))
        );

        let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let timestamp_b =
            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 4, max: 6 }));
        assert_eq!(
            timestamp_a.try_union(timestamp_b).unwrap(),
            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 6 }))
        );
    }

    #[test]
    fn we_cannot_union_mismatched_column_bounds() {
        let no_order = ColumnBounds::NoOrder;
        let uint8 = ColumnBounds::Uint8(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let tinyint = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: -3, max: 3 }));
        let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: -5, max: 5 }));
        let int = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: -10, max: 10 }));
        let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));

        let bounds = [
            (no_order, "NoOrder"),
            (uint8, "Uint8"),
            (tinyint, "TinyInt"),
            (smallint, "SmallInt"),
            (int, "Int"),
            (bigint, "BigInt"),
            (int128, "Int128"),
            (timestamp, "Timestamp"),
        ];

        // Every pair of distinct variants must be rejected, in both operand orders.
        for ((bound_a, name_a), (bound_b, name_b)) in bounds.iter().tuple_combinations() {
            assert!(
                bound_a.try_union(*bound_b).is_err(),
                "Expected error when trying to union {name_a} with {name_b}"
            );
            assert!(
                bound_b.try_union(*bound_a).is_err(),
                "Expected error when trying to union {name_b} with {name_a}"
            );
        }
    }

    #[test]
    fn we_can_difference_column_bounds_with_matching_variant() {
        let no_order = ColumnBounds::NoOrder;
        assert_eq!(no_order.try_difference(no_order).unwrap(), no_order);

        let uint8_a = ColumnBounds::Uint8(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let uint8_b = ColumnBounds::Uint8(Bounds::Empty);
        assert_eq!(uint8_a.try_difference(uint8_b).unwrap(), uint8_a);

        let tinyint_a = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let tinyint_b = ColumnBounds::TinyInt(Bounds::Empty);
        assert_eq!(tinyint_a.try_difference(tinyint_b).unwrap(), tinyint_a);

        let smallint_a = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let smallint_b = ColumnBounds::SmallInt(Bounds::Empty);
        assert_eq!(smallint_a.try_difference(smallint_b).unwrap(), smallint_a);

        let int_a = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let int_b = ColumnBounds::Int(Bounds::Empty);
        assert_eq!(int_a.try_difference(int_b).unwrap(), int_a);

        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let bigint_b = ColumnBounds::BigInt(Bounds::Empty);
        assert_eq!(bigint_a.try_difference(bigint_b).unwrap(), bigint_a);

        // Overlapping sharp bounds degrade to bounded.
        let int128_a = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 1, max: 4 }));
        let int128_b = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 3, max: 6 }));
        assert_eq!(
            int128_a.try_difference(int128_b).unwrap(),
            ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 4 }))
        );

        let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 }));
        let timestamp_b = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 3, max: 6 }));
        assert_eq!(
            timestamp_a.try_difference(timestamp_b).unwrap(),
            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 4 }))
        );
    }

    #[test]
    fn we_cannot_difference_mismatched_column_bounds() {
        let no_order = ColumnBounds::NoOrder;
        let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
        let uint8 = ColumnBounds::Uint8(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let tinyint = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
        let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));

        assert!(no_order.try_difference(bigint).is_err());
        assert!(bigint.try_difference(no_order).is_err());

        assert!(no_order.try_difference(int128).is_err());
        assert!(int128.try_difference(no_order).is_err());

        assert!(bigint.try_difference(int128).is_err());
        assert!(int128.try_difference(bigint).is_err());

        assert!(tinyint.try_difference(timestamp).is_err());
        assert!(timestamp.try_difference(tinyint).is_err());

        assert!(smallint.try_difference(timestamp).is_err());
        assert!(timestamp.try_difference(smallint).is_err());

        assert!(uint8.try_difference(tinyint).is_err());
        assert!(tinyint.try_difference(uint8).is_err());
    }
}