proof_of_sql/base/commitment/
column_bounds.rs

1use super::committable_column::CommittableColumn;
2use alloc::boxed::Box;
3use serde::{Deserialize, Serialize};
4use snafu::Snafu;
5
6/// Cannot construct bounds where min is greater than max.
7#[derive(Snafu, Debug)]
8#[snafu(display("cannot construct bounds where min is greater than max"))]
9pub struct NegativeBounds;
10
11/// Inner value for [`Bounds::Sharp`] and [`Bounds::Bounded`].
12///
13/// Creating a separate type for this provides two benefits.
14/// 1. reduced repeated code between the two variants
15/// 2. privatization of the min/max for these variants, preventing invalid states
16#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
17pub struct BoundsInner<T>
18where
19    T: Ord,
20{
21    /// The minimum value of the data.
22    min: T,
23    /// The maximum value of the data.
24    max: T,
25}
26
27impl<T> BoundsInner<T>
28where
29    T: Ord,
30{
31    /// Construct a new [`BoundsInner`].
32    ///
33    /// Returns an error if min > max.
34    pub fn try_new(min: T, max: T) -> Result<Self, NegativeBounds> {
35        if min > max {
36            return Err(NegativeBounds);
37        }
38
39        Ok(BoundsInner { min, max })
40    }
41
42    /// Immutable accessor for the minimum value.
43    pub fn min(&self) -> &T {
44        &self.min
45    }
46
47    /// Immutable accessor for the maximum value.
48    pub fn max(&self) -> &T {
49        &self.max
50    }
51
52    /// Combine two [`Bounds`]s as if their source collections are being unioned.
53    pub fn union(self, other: BoundsInner<T>) -> Self {
54        BoundsInner {
55            min: self.min.min(other.min),
56            max: self.max.max(other.max),
57        }
58    }
59
60    /// Returns true if the value is within these bounds.
61    ///
62    /// This doesn't necessarily mean that the source collection contains this value.
63    /// However, a `false` result implies that the source collection cannot contain this value.
64    pub fn surrounds(&self, value: &T) -> bool {
65        &self.min <= value && value <= &self.max
66    }
67}
68
69/// Minimum and maximum values (inclusive) of a collection of data, with some other variants for edge cases.
70#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
71pub enum Bounds<T>
72where
73    T: Ord,
74{
75    /// The source collection is empty so has no bounds.
76    #[default]
77    Empty,
78    /// After some operation (like `Bounds::difference`), the bounds cannot be determined exactly.
79    ///
80    /// Instead, this variant underestimates the minimum and overestimates the maximum.
81    Bounded(BoundsInner<T>),
82    /// The exact bounds of the values of the source collection (inclusive).
83    Sharp(BoundsInner<T>),
84}
85
86impl<T> Bounds<T>
87where
88    T: Ord,
89{
90    /// Construct a new [`Bounds::Sharp`].
91    ///
92    /// Returns an error if min > max.
93    pub fn sharp(min: T, max: T) -> Result<Self, NegativeBounds> {
94        Ok(Bounds::Sharp(BoundsInner::try_new(min, max)?))
95    }
96
97    /// Construct a new [`Bounds::bounded`].
98    ///
99    /// Returns an error if min > max.
100    pub fn bounded(min: T, max: T) -> Result<Self, NegativeBounds> {
101        Ok(Bounds::Bounded(BoundsInner::try_new(min, max)?))
102    }
103
104    /// Combine two [`Bounds`]s as if their source collections are being unioned.
105    fn union(self, other: Bounds<T>) -> Self {
106        match (self, other) {
107            (Bounds::Sharp(bounds_a), Bounds::Sharp(bounds_b)) => {
108                Bounds::Sharp(bounds_a.union(bounds_b))
109            }
110            (Bounds::Bounded(bounds_a) | Bounds::Sharp(bounds_a), Bounds::Bounded(bounds_b))
111            | (Bounds::Bounded(bounds_a), Bounds::Sharp(bounds_b)) => {
112                Bounds::Bounded(bounds_a.union(bounds_b))
113            }
114            (bounds, Bounds::Empty) | (Bounds::Empty, bounds) => bounds,
115        }
116    }
117
118    /// Combine two [`Bounds`]s as if their source collections are being differenced.
119    ///
120    /// This should be interpreted as the set difference of the two collections.
121    /// The result would be the rows in self that are not also rows in other.
122    ///
123    /// It can't be determined *which* values are being removed from self's source collection.
124    /// So, in most cases, the resulting [`Bounds`] is [`Bounds::Bounded`].
125    /// Exceptions to this are cases where it can be determined that *no* values are removed.
126    fn difference(self, other: Bounds<T>) -> Self {
127        match (self, other) {
128            (Bounds::Empty, _) => Bounds::Empty,
129            (bounds, Bounds::Empty) => bounds,
130            (Bounds::Sharp(bounds_a), Bounds::Sharp(bounds_b) | Bounds::Bounded(bounds_b))
131                if bounds_a.max() < bounds_b.min() || bounds_b.max() < bounds_a.min() =>
132            {
133                // source collections must be disjoint, so no rows are removed
134                Bounds::Sharp(bounds_a)
135            }
136            (Bounds::Bounded(bounds) | Bounds::Sharp(bounds), _) => Bounds::Bounded(bounds),
137        }
138    }
139
140    /// Returns true if the value is within these bounds.
141    ///
142    /// This doesn't necessarily mean that the source collection contains this value.
143    /// However, a `false` result implies that the source collection cannot contain this value.
144    pub fn surrounds(&self, value: &T) -> bool {
145        match self {
146            Bounds::Empty => false,
147            Bounds::Bounded(inner) | Bounds::Sharp(inner) => inner.surrounds(value),
148        }
149    }
150}
151
152impl<'a, T> FromIterator<&'a T> for Bounds<T>
153where
154    T: Ord + Copy + 'a,
155{
156    fn from_iter<I: IntoIterator<Item = &'a T>>(iter: I) -> Self {
157        let bounds_borrowed = iter
158            .into_iter()
159            .fold(Bounds::<&T>::Empty, |bounds, element| match bounds {
160                Bounds::Sharp(BoundsInner { min, max }) => Bounds::Sharp(BoundsInner {
161                    min: min.min(element),
162                    max: max.max(element),
163                }),
164                Bounds::Empty => Bounds::Sharp(BoundsInner {
165                    min: element,
166                    max: element,
167                }),
168                Bounds::Bounded(_) => {
169                    panic!("bounds should never be bounded in this function");
170                }
171            });
172
173        // Copy only on the final bounds values
174        match bounds_borrowed {
175            Bounds::Sharp(BoundsInner { min, max }) => Bounds::Sharp(BoundsInner {
176                min: *min,
177                max: *max,
178            }),
179            Bounds::Empty => Bounds::Empty,
180            Bounds::Bounded(_) => {
181                panic!("bounds should never be bounded in this function")
182            }
183        }
184    }
185}
186
187/// Columns with different [`ColumnBounds`] variants cannot operate with each other.
188#[derive(Debug, Snafu)]
189#[snafu(display(
190    "column with bounds {bounds_a:?} cannot operate with column with bounds {bounds_b:?}"
191))]
192pub struct ColumnBoundsMismatch {
193    bounds_a: Box<ColumnBounds>,
194    bounds_b: Box<ColumnBounds>,
195}
196
197/// Column metadata storing the bounds for column types that have order.
198///
199/// Other Ord column variants do exist (like Scalar/Boolean).
200/// However, bounding these is useless unless we are performing indexing on these columns.
201/// This functionality only be considered after we support them in the user-facing sql.
202#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
203pub enum ColumnBounds {
204    /// Column does not have order.
205    NoOrder,
206    /// The bounds of a `Uint8` column.
207    Uint8(Bounds<u8>),
208    /// The bounds of a `TinyInt` column.
209    TinyInt(Bounds<i8>),
210    /// The bounds of a `SmallInt` column.
211    SmallInt(Bounds<i16>),
212    /// The bounds of an Int column.
213    Int(Bounds<i32>),
214    /// The bounds of a `BigInt` column.
215    BigInt(Bounds<i64>),
216    /// The bounds of an Int128 column.
217    Int128(Bounds<i128>),
218    /// The bounds of a Timestamp column.
219    TimestampTZ(Bounds<i64>),
220}
221
222impl ColumnBounds {
223    /// Construct a [`ColumnBounds`] from a column by reference.
224    ///
225    /// If the column variant has order, only the minimum and maximum value will be copied.
226    #[must_use]
227    pub fn from_column(column: &CommittableColumn) -> ColumnBounds {
228        match column {
229            CommittableColumn::TinyInt(ints) => ColumnBounds::TinyInt(Bounds::from_iter(*ints)),
230            CommittableColumn::Uint8(ints) => ColumnBounds::Uint8(Bounds::from_iter(*ints)),
231            CommittableColumn::SmallInt(ints) => ColumnBounds::SmallInt(Bounds::from_iter(*ints)),
232            CommittableColumn::Int(ints) => ColumnBounds::Int(Bounds::from_iter(*ints)),
233            CommittableColumn::BigInt(ints) => ColumnBounds::BigInt(Bounds::from_iter(*ints)),
234            CommittableColumn::Int128(ints) => ColumnBounds::Int128(Bounds::from_iter(*ints)),
235            CommittableColumn::TimestampTZ(_, _, times) => {
236                ColumnBounds::TimestampTZ(Bounds::from_iter(*times))
237            }
238            CommittableColumn::Boolean(_)
239            | CommittableColumn::Decimal75(_, _, _)
240            | CommittableColumn::Scalar(_)
241            | CommittableColumn::VarBinary(_)
242            | CommittableColumn::VarChar(_) => ColumnBounds::NoOrder,
243        }
244    }
245
246    /// Combine two [`ColumnBounds`] as if their source collections are being unioned.
247    ///
248    /// Can error if the two values do not share the same [`ColumnBounds`] variant.
249    pub fn try_union(self, other: Self) -> Result<Self, ColumnBoundsMismatch> {
250        match (self, other) {
251            (ColumnBounds::NoOrder, ColumnBounds::NoOrder) => Ok(ColumnBounds::NoOrder),
252            (ColumnBounds::Uint8(bounds_a), ColumnBounds::Uint8(bounds_b)) => {
253                Ok(ColumnBounds::Uint8(bounds_a.union(bounds_b)))
254            }
255            (ColumnBounds::TinyInt(bounds_a), ColumnBounds::TinyInt(bounds_b)) => {
256                Ok(ColumnBounds::TinyInt(bounds_a.union(bounds_b)))
257            }
258            (ColumnBounds::SmallInt(bounds_a), ColumnBounds::SmallInt(bounds_b)) => {
259                Ok(ColumnBounds::SmallInt(bounds_a.union(bounds_b)))
260            }
261            (ColumnBounds::Int(bounds_a), ColumnBounds::Int(bounds_b)) => {
262                Ok(ColumnBounds::Int(bounds_a.union(bounds_b)))
263            }
264            (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => {
265                Ok(ColumnBounds::BigInt(bounds_a.union(bounds_b)))
266            }
267            (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => {
268                Ok(ColumnBounds::TimestampTZ(bounds_a.union(bounds_b)))
269            }
270            (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => {
271                Ok(ColumnBounds::Int128(bounds_a.union(bounds_b)))
272            }
273            (bounds_a, bounds_b) => Err(ColumnBoundsMismatch {
274                bounds_a: Box::new(bounds_a),
275                bounds_b: Box::new(bounds_b),
276            }),
277        }
278    }
279
280    /// Combine two [`ColumnBounds`] as if their source collections are being differenced.
281    ///
282    /// This should be interpreted as the set difference of the two collections.
283    /// The result would be the rows in self that are not also rows in other.
284    pub fn try_difference(self, other: Self) -> Result<Self, ColumnBoundsMismatch> {
285        match (self, other) {
286            (ColumnBounds::NoOrder, ColumnBounds::NoOrder) => Ok(self),
287            (ColumnBounds::Uint8(bounds_a), ColumnBounds::Uint8(bounds_b)) => {
288                Ok(ColumnBounds::Uint8(bounds_a.difference(bounds_b)))
289            }
290            (ColumnBounds::TinyInt(bounds_a), ColumnBounds::TinyInt(bounds_b)) => {
291                Ok(ColumnBounds::TinyInt(bounds_a.difference(bounds_b)))
292            }
293            (ColumnBounds::SmallInt(bounds_a), ColumnBounds::SmallInt(bounds_b)) => {
294                Ok(ColumnBounds::SmallInt(bounds_a.difference(bounds_b)))
295            }
296            (ColumnBounds::Int(bounds_a), ColumnBounds::Int(bounds_b)) => {
297                Ok(ColumnBounds::Int(bounds_a.difference(bounds_b)))
298            }
299            (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => {
300                Ok(ColumnBounds::BigInt(bounds_a.difference(bounds_b)))
301            }
302            (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => {
303                Ok(ColumnBounds::Int128(bounds_a.difference(bounds_b)))
304            }
305            (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => {
306                Ok(ColumnBounds::TimestampTZ(bounds_a.difference(bounds_b)))
307            }
308            (_, _) => Err(ColumnBoundsMismatch {
309                bounds_a: Box::new(self),
310                bounds_b: Box::new(other),
311            }),
312        }
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319    use crate::base::{
320        database::OwnedColumn, math::decimal::Precision, scalar::test_scalar::TestScalar,
321    };
322    use alloc::{string::String, vec};
323    use itertools::Itertools;
324    use proof_of_sql_parser::posql_time::{PoSQLTimeUnit, PoSQLTimeZone};
325
326    #[test]
327    fn we_can_construct_bounds_by_method() {
328        let sharp_bounds = Bounds::<i32>::sharp(-5, 10).unwrap();
329        assert_eq!(
330            sharp_bounds,
331            Bounds::Sharp(BoundsInner { min: -5, max: 10 })
332        );
333
334        let bounded_bounds = Bounds::<i32>::bounded(-15, -10).unwrap();
335        assert_eq!(
336            bounded_bounds,
337            Bounds::Bounded(BoundsInner { min: -15, max: -10 })
338        );
339    }
340
341    #[test]
342    fn we_cannot_construct_negative_bounds() {
343        let negative_sharp_bounds = Bounds::<i32>::sharp(10, 5);
344        assert!(matches!(negative_sharp_bounds, Err(NegativeBounds)));
345
346        let negative_bounded_bounds = Bounds::<i32>::bounded(-10, -15);
347        assert!(matches!(negative_bounded_bounds, Err(NegativeBounds)));
348    }
349
350    #[test]
351    fn we_can_construct_bounds_from_iterator() {
352        // empty case
353        let empty_bounds = Bounds::<i32>::from_iter([]);
354        assert_eq!(empty_bounds, Bounds::Empty);
355
356        // nonempty case
357        let ints = [1, 2, 3, 1, 0, -1];
358        let bounds = Bounds::from_iter(&ints);
359        assert_eq!(bounds, Bounds::Sharp(BoundsInner { min: -1, max: 3 }));
360    }
361
362    #[test]
363    fn we_can_determine_if_bounds_surround_value() {
364        // empty case
365        assert!(!Bounds::Empty.surrounds(&0));
366
367        let sharp = Bounds::Sharp(BoundsInner { min: 2, max: 4 });
368        assert!(!sharp.surrounds(&1));
369        assert!(sharp.surrounds(&2));
370        assert!(sharp.surrounds(&3));
371        assert!(sharp.surrounds(&4));
372        assert!(!sharp.surrounds(&5));
373
374        let bounded = Bounds::Bounded(BoundsInner { min: 2, max: 4 });
375        assert!(!bounded.surrounds(&1));
376        assert!(bounded.surrounds(&2));
377        assert!(bounded.surrounds(&3));
378        assert!(bounded.surrounds(&4));
379        assert!(!bounded.surrounds(&5));
380    }
381
382    #[test]
383    fn we_can_union_sharp_bounds() {
384        let bounds_a = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
385
386        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 2 });
387        assert_eq!(
388            bounds_a.union(bounds_b),
389            Bounds::Sharp(BoundsInner { min: 1, max: 6 })
390        );
391
392        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 4 });
393        assert_eq!(
394            bounds_a.union(bounds_b),
395            Bounds::Sharp(BoundsInner { min: 1, max: 6 })
396        );
397
398        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 7 });
399        assert_eq!(
400            bounds_a.union(bounds_b),
401            Bounds::Sharp(BoundsInner { min: 1, max: 7 })
402        );
403
404        let bounds_b = Bounds::Sharp(BoundsInner { min: 4, max: 5 });
405        assert_eq!(
406            bounds_a.union(bounds_b),
407            Bounds::Sharp(BoundsInner { min: 3, max: 6 })
408        );
409
410        let bounds_b = Bounds::Sharp(BoundsInner { min: 4, max: 7 });
411        assert_eq!(
412            bounds_a.union(bounds_b),
413            Bounds::Sharp(BoundsInner { min: 3, max: 7 })
414        );
415
416        let bounds_b = Bounds::Sharp(BoundsInner { min: 7, max: 8 });
417        assert_eq!(
418            bounds_a.union(bounds_b),
419            Bounds::Sharp(BoundsInner { min: 3, max: 8 })
420        );
421    }
422
423    #[test]
424    fn we_can_union_sharp_and_empty_bounds() {
425        let sharp = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
426        let empty = Bounds::Empty;
427
428        assert_eq!(sharp.union(empty), sharp);
429        assert_eq!(empty.union(sharp), sharp);
430        assert_eq!(empty.union(empty), empty);
431    }
432
433    #[test]
434    fn union_of_bounded_bounds_is_bounded() {
435        let sharp = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
436        let bounded = Bounds::Bounded(BoundsInner { min: 7, max: 10 });
437        let union = Bounds::Bounded(BoundsInner { min: 3, max: 10 });
438        let empty = Bounds::Empty;
439
440        assert_eq!(sharp.union(bounded), union);
441        assert_eq!(bounded.union(sharp), union);
442
443        assert_eq!(empty.union(bounded), bounded);
444        assert_eq!(bounded.union(empty), bounded);
445
446        assert_eq!(bounded.union(bounded), bounded);
447    }
448
449    #[test]
450    fn we_can_take_difference_of_disjoint_bounds() {
451        let bounds_a = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
452        let bounds_b = Bounds::Sharp(BoundsInner { min: -6, max: -3 });
453        let bounded = Bounds::Bounded(BoundsInner { min: -6, max: -3 });
454
455        assert_eq!(bounds_a.difference(bounds_b), bounds_a);
456        assert_eq!(bounds_b.difference(bounds_a), bounds_b);
457
458        assert_eq!(bounds_a.difference(bounded), bounds_a);
459
460        let empty = Bounds::Empty;
461
462        assert_eq!(bounds_a.difference(empty), bounds_a);
463        assert_eq!(empty.difference(bounds_a), empty);
464
465        assert_eq!(empty.difference(empty), empty);
466    }
467
468    #[test]
469    fn difference_with_bounded_minuend_is_bounded() {
470        let sharp = Bounds::Sharp(BoundsInner { min: -5, max: 5 });
471        let bounded_a = Bounds::Bounded(BoundsInner { min: 6, max: 10 });
472        let bounded_b = Bounds::Bounded(BoundsInner { min: 11, max: 15 });
473        let empty = Bounds::Empty;
474
475        assert_eq!(bounded_a.difference(sharp), bounded_a);
476
477        assert_eq!(bounded_a.difference(bounded_b), bounded_a);
478        assert_eq!(bounded_b.difference(bounded_a), bounded_b);
479
480        assert_eq!(bounded_a.difference(empty), bounded_a);
481
482        // Still empty since there are still no rows in empty that are also in bounded
483        assert_eq!(empty.difference(bounded_a), empty);
484    }
485
486    #[test]
487    fn difference_of_overlapping_bounds_is_bounded() {
488        let bounds_a = BoundsInner { min: 3, max: 6 };
489        let sharp_a = Bounds::Sharp(bounds_a);
490        let bounded_a = Bounds::Bounded(bounds_a);
491
492        let bounds_b = BoundsInner { min: 1, max: 4 };
493        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
494        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
495
496        let bounds_b = BoundsInner { min: 1, max: 7 };
497        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
498        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
499
500        let bounds_b = BoundsInner { min: 4, max: 5 };
501        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
502        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
503
504        let bounds_b = BoundsInner { min: 4, max: 7 };
505        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
506        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
507    }
508
509    #[test]
510    fn we_can_construct_column_bounds_from_column() {
511        let varchar_column = OwnedColumn::<TestScalar>::VarChar(
512            ["Lorem", "ipsum", "dolor", "sit", "amet"]
513                .map(String::from)
514                .to_vec(),
515        );
516        let committable_varchar_column = CommittableColumn::from(&varchar_column);
517        let varchar_column_bounds = ColumnBounds::from_column(&committable_varchar_column);
518        assert_eq!(varchar_column_bounds, ColumnBounds::NoOrder);
519
520        let tinyint_column = OwnedColumn::<TestScalar>::TinyInt([1, 2, 3, 1, 0].to_vec());
521        let committable_tinyint_column = CommittableColumn::from(&tinyint_column);
522        let tinyint_column_bounds = ColumnBounds::from_column(&committable_tinyint_column);
523        assert_eq!(
524            tinyint_column_bounds,
525            ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
526        );
527
528        let smallint_column = OwnedColumn::<TestScalar>::SmallInt([1, 2, 3, 1, 0].to_vec());
529        let committable_smallint_column = CommittableColumn::from(&smallint_column);
530        let smallint_column_bounds = ColumnBounds::from_column(&committable_smallint_column);
531        assert_eq!(
532            smallint_column_bounds,
533            ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
534        );
535
536        let int_column = OwnedColumn::<TestScalar>::Int([1, 2, 3, 1, 0].to_vec());
537        let committable_int_column = CommittableColumn::from(&int_column);
538        let int_column_bounds = ColumnBounds::from_column(&committable_int_column);
539        assert_eq!(
540            int_column_bounds,
541            ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
542        );
543
544        let bigint_column = OwnedColumn::<TestScalar>::BigInt([1, 2, 3, 1, 0].to_vec());
545        let committable_bigint_column = CommittableColumn::from(&bigint_column);
546        let bigint_column_bounds = ColumnBounds::from_column(&committable_bigint_column);
547        assert_eq!(
548            bigint_column_bounds,
549            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
550        );
551
552        let int128_column = OwnedColumn::<TestScalar>::Int128([1, 2, 3, 1, 0].to_vec());
553        let committable_int128_column = CommittableColumn::from(&int128_column);
554        let int128_column_bounds = ColumnBounds::from_column(&committable_int128_column);
555        assert_eq!(
556            int128_column_bounds,
557            ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
558        );
559
560        let decimal75_column = OwnedColumn::<TestScalar>::Decimal75(
561            Precision::new(1).unwrap(),
562            0,
563            vec![
564                -TestScalar::from([1, 0, 0, 0]),
565                TestScalar::from([2, 0, 0, 0]),
566                TestScalar::from([3, 0, 0, 0]),
567            ],
568        );
569        let committable_decimal75_column = CommittableColumn::from(&decimal75_column);
570        let decimal75_column_bounds = ColumnBounds::from_column(&committable_decimal75_column);
571        assert_eq!(decimal75_column_bounds, ColumnBounds::NoOrder);
572
573        let timestamp_column = OwnedColumn::<TestScalar>::TimestampTZ(
574            PoSQLTimeUnit::Second,
575            PoSQLTimeZone::utc(),
576            vec![1_i64, 2, 3, 4],
577        );
578        let committable_timestamp_column = CommittableColumn::from(&timestamp_column);
579        let timestamp_column_bounds = ColumnBounds::from_column(&committable_timestamp_column);
580        assert_eq!(
581            timestamp_column_bounds,
582            ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 }))
583        );
584    }
585
586    #[test]
587    fn we_can_union_column_bounds_with_matching_variant() {
588        let no_order = ColumnBounds::NoOrder;
589        assert_eq!(no_order.try_union(no_order).unwrap(), no_order);
590
591        let tinyint_a = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
592        let tinyint_b = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
593        assert_eq!(
594            tinyint_a.try_union(tinyint_b).unwrap(),
595            ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
596        );
597
598        let smallint_a = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
599        let smallint_b = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
600        assert_eq!(
601            smallint_a.try_union(smallint_b).unwrap(),
602            ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
603        );
604
605        let int_a = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
606        let int_b = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
607        assert_eq!(
608            int_a.try_union(int_b).unwrap(),
609            ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
610        );
611
612        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
613        let bigint_b = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
614        assert_eq!(
615            bigint_a.try_union(bigint_b).unwrap(),
616            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
617        );
618
619        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
620        let bigint_b = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
621        assert_eq!(
622            bigint_a.try_union(bigint_b).unwrap(),
623            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
624        );
625
626        let int128_a = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
627        let int128_b = ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 4, max: 6 }));
628        assert_eq!(
629            int128_a.try_union(int128_b).unwrap(),
630            ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 6 }))
631        );
632
633        let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
634        let timestamp_b =
635            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 4, max: 6 }));
636        assert_eq!(
637            timestamp_a.try_union(timestamp_b).unwrap(),
638            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 6 }))
639        );
640    }
641
642    #[test]
643    fn we_cannot_union_mismatched_column_bounds() {
644        let no_order = ColumnBounds::NoOrder;
645        let tinyint = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: -3, max: 3 }));
646        let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: -5, max: 5 }));
647        let int = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: -10, max: 10 }));
648        let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
649        let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
650        let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
651
652        let bounds = [
653            (no_order, "NoOrder"),
654            (tinyint, "TinyInt"),
655            (smallint, "SmallInt"),
656            (int, "Int"),
657            (bigint, "BigInt"),
658            (int128, "Int128"),
659            (timestamp, "Timestamp"),
660        ];
661
662        for ((bound_a, name_a), (bound_b, name_b)) in bounds.iter().tuple_combinations() {
663            assert!(
664                bound_a.try_union(*bound_b).is_err(),
665                "Expected error when trying to union {name_a} with {name_b}"
666            );
667            assert!(
668                bound_b.try_union(*bound_a).is_err(),
669                "Expected error when trying to union {name_b} with {name_a}"
670            );
671        }
672    }
673
674    #[test]
675    fn we_can_difference_column_bounds_with_matching_variant() {
676        let no_order = ColumnBounds::NoOrder;
677        assert_eq!(no_order.try_difference(no_order).unwrap(), no_order);
678
679        let tinyint_a = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
680        let tinyint_b = ColumnBounds::TinyInt(Bounds::Empty);
681        assert_eq!(tinyint_a.try_difference(tinyint_b).unwrap(), tinyint_a);
682
683        let smallint_a = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
684        let smallint_b = ColumnBounds::SmallInt(Bounds::Empty);
685        assert_eq!(smallint_a.try_difference(smallint_b).unwrap(), smallint_a);
686
687        let int_a = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
688        let int_b = ColumnBounds::Int(Bounds::Empty);
689        assert_eq!(int_a.try_difference(int_b).unwrap(), int_a);
690
691        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
692        let bigint_b = ColumnBounds::BigInt(Bounds::Empty);
693        assert_eq!(bigint_a.try_difference(bigint_b).unwrap(), bigint_a);
694
695        let int128_a = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 1, max: 4 }));
696        let int128_b = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 3, max: 6 }));
697        assert_eq!(
698            int128_a.try_difference(int128_b).unwrap(),
699            ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 4 }))
700        );
701
702        let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 }));
703        let timestamp_b = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 3, max: 6 }));
704        assert_eq!(
705            timestamp_a.try_difference(timestamp_b).unwrap(),
706            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 4 }))
707        );
708    }
709
710    #[test]
711    fn we_cannot_difference_mismatched_column_bounds() {
712        let no_order = ColumnBounds::NoOrder;
713        let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
714        let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
715        let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
716        let tinyint = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
717        let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
718
719        assert!(no_order.try_difference(bigint).is_err());
720        assert!(bigint.try_difference(no_order).is_err());
721
722        assert!(no_order.try_difference(int128).is_err());
723        assert!(int128.try_difference(no_order).is_err());
724
725        assert!(bigint.try_difference(int128).is_err());
726        assert!(int128.try_difference(bigint).is_err());
727
728        assert!(tinyint.try_difference(timestamp).is_err());
729        assert!(timestamp.try_difference(tinyint).is_err());
730
731        assert!(smallint.try_difference(timestamp).is_err());
732        assert!(timestamp.try_difference(smallint).is_err());
733    }
734}