proof_of_sql/base/commitment/
column_bounds.rs

1use super::committable_column::CommittableColumn;
2use alloc::boxed::Box;
3use serde::{Deserialize, Serialize};
4use snafu::Snafu;
5
6/// Cannot construct bounds where min is greater than max.
7#[derive(Snafu, Debug)]
8#[snafu(display("cannot construct bounds where min is greater than max"))]
9pub struct NegativeBounds;
10
11/// Inner value for [`Bounds::Sharp`] and [`Bounds::Bounded`].
12///
13/// Creating a separate type for this provides two benefits.
14/// 1. reduced repeated code between the two variants
15/// 2. privatization of the min/max for these variants, preventing invalid states
16#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
17pub struct BoundsInner<T>
18where
19    T: Ord,
20{
21    /// The minimum value of the data.
22    min: T,
23    /// The maximum value of the data.
24    max: T,
25}
26
27impl<T> BoundsInner<T>
28where
29    T: Ord,
30{
31    /// Construct a new [`BoundsInner`].
32    ///
33    /// Returns an error if min > max.
34    pub fn try_new(min: T, max: T) -> Result<Self, NegativeBounds> {
35        if min > max {
36            return Err(NegativeBounds);
37        }
38
39        Ok(BoundsInner { min, max })
40    }
41
42    /// Immutable accessor for the minimum value.
43    pub fn min(&self) -> &T {
44        &self.min
45    }
46
47    /// Immutable accessor for the maximum value.
48    pub fn max(&self) -> &T {
49        &self.max
50    }
51
52    /// Combine two [`Bounds`]s as if their source collections are being unioned.
53    pub fn union(self, other: BoundsInner<T>) -> Self {
54        BoundsInner {
55            min: self.min.min(other.min),
56            max: self.max.max(other.max),
57        }
58    }
59
60    /// Returns true if the value is within these bounds.
61    ///
62    /// This doesn't necessarily mean that the source collection contains this value.
63    /// However, a `false` result implies that the source collection cannot contain this value.
64    pub fn surrounds(&self, value: &T) -> bool {
65        &self.min <= value && value <= &self.max
66    }
67}
68
69/// Minimum and maximum values (inclusive) of a collection of data, with some other variants for edge cases.
70#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
71pub enum Bounds<T>
72where
73    T: Ord,
74{
75    /// The source collection is empty so has no bounds.
76    #[default]
77    Empty,
78    /// After some operation (like `Bounds::difference`), the bounds cannot be determined exactly.
79    ///
80    /// Instead, this variant underestimates the minimum and overestimates the maximum.
81    Bounded(BoundsInner<T>),
82    /// The exact bounds of the values of the source collection (inclusive).
83    Sharp(BoundsInner<T>),
84}
85
86impl<T> Bounds<T>
87where
88    T: Ord,
89{
90    /// Construct a new [`Bounds::Sharp`].
91    ///
92    /// Returns an error if min > max.
93    pub fn sharp(min: T, max: T) -> Result<Self, NegativeBounds> {
94        Ok(Bounds::Sharp(BoundsInner::try_new(min, max)?))
95    }
96
97    /// Construct a new [`Bounds::bounded`].
98    ///
99    /// Returns an error if min > max.
100    pub fn bounded(min: T, max: T) -> Result<Self, NegativeBounds> {
101        Ok(Bounds::Bounded(BoundsInner::try_new(min, max)?))
102    }
103
104    /// Combine two [`Bounds`]s as if their source collections are being unioned.
105    fn union(self, other: Bounds<T>) -> Self {
106        match (self, other) {
107            (Bounds::Sharp(bounds_a), Bounds::Sharp(bounds_b)) => {
108                Bounds::Sharp(bounds_a.union(bounds_b))
109            }
110            (Bounds::Bounded(bounds_a) | Bounds::Sharp(bounds_a), Bounds::Bounded(bounds_b))
111            | (Bounds::Bounded(bounds_a), Bounds::Sharp(bounds_b)) => {
112                Bounds::Bounded(bounds_a.union(bounds_b))
113            }
114            (bounds, Bounds::Empty) | (Bounds::Empty, bounds) => bounds,
115        }
116    }
117
118    /// Combine two [`Bounds`]s as if their source collections are being differenced.
119    ///
120    /// This should be interpreted as the set difference of the two collections.
121    /// The result would be the rows in self that are not also rows in other.
122    ///
123    /// It can't be determined *which* values are being removed from self's source collection.
124    /// So, in most cases, the resulting [`Bounds`] is [`Bounds::Bounded`].
125    /// Exceptions to this are cases where it can be determined that *no* values are removed.
126    fn difference(self, other: Bounds<T>) -> Self {
127        match (self, other) {
128            (Bounds::Empty, _) => Bounds::Empty,
129            (bounds, Bounds::Empty) => bounds,
130            (Bounds::Sharp(bounds_a), Bounds::Sharp(bounds_b) | Bounds::Bounded(bounds_b))
131                if bounds_a.max() < bounds_b.min() || bounds_b.max() < bounds_a.min() =>
132            {
133                // source collections must be disjoint, so no rows are removed
134                Bounds::Sharp(bounds_a)
135            }
136            (Bounds::Bounded(bounds) | Bounds::Sharp(bounds), _) => Bounds::Bounded(bounds),
137        }
138    }
139
140    /// Returns true if the value is within these bounds.
141    ///
142    /// This doesn't necessarily mean that the source collection contains this value.
143    /// However, a `false` result implies that the source collection cannot contain this value.
144    pub fn surrounds(&self, value: &T) -> bool {
145        match self {
146            Bounds::Empty => false,
147            Bounds::Bounded(inner) | Bounds::Sharp(inner) => inner.surrounds(value),
148        }
149    }
150}
151
152impl<'a, T> FromIterator<&'a T> for Bounds<T>
153where
154    T: Ord + Copy + 'a,
155{
156    fn from_iter<I: IntoIterator<Item = &'a T>>(iter: I) -> Self {
157        let bounds_borrowed = iter
158            .into_iter()
159            .fold(Bounds::<&T>::Empty, |bounds, element| match bounds {
160                Bounds::Sharp(BoundsInner { min, max }) => Bounds::Sharp(BoundsInner {
161                    min: min.min(element),
162                    max: max.max(element),
163                }),
164                Bounds::Empty => Bounds::Sharp(BoundsInner {
165                    min: element,
166                    max: element,
167                }),
168                Bounds::Bounded(_) => {
169                    panic!("bounds should never be bounded in this function");
170                }
171            });
172
173        // Copy only on the final bounds values
174        match bounds_borrowed {
175            Bounds::Sharp(BoundsInner { min, max }) => Bounds::Sharp(BoundsInner {
176                min: *min,
177                max: *max,
178            }),
179            Bounds::Empty => Bounds::Empty,
180            Bounds::Bounded(_) => {
181                panic!("bounds should never be bounded in this function")
182            }
183        }
184    }
185}
186
187/// Columns with different [`ColumnBounds`] variants cannot operate with each other.
188#[derive(Debug, Snafu)]
189#[snafu(display(
190    "column with bounds {bounds_a:?} cannot operate with column with bounds {bounds_b:?}"
191))]
192pub struct ColumnBoundsMismatch {
193    bounds_a: Box<ColumnBounds>,
194    bounds_b: Box<ColumnBounds>,
195}
196
197/// Column metadata storing the bounds for column types that have order.
198///
199/// Other Ord column variants do exist (like Scalar/Boolean).
200/// However, bounding these is useless unless we are performing indexing on these columns.
201/// This functionality only be considered after we support them in the user-facing sql.
202#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
203pub enum ColumnBounds {
204    /// Column does not have order.
205    NoOrder,
206    /// The bounds of a `Uint8` column.
207    Uint8(Bounds<u8>),
208    /// The bounds of a `TinyInt` column.
209    TinyInt(Bounds<i8>),
210    /// The bounds of a `SmallInt` column.
211    SmallInt(Bounds<i16>),
212    /// The bounds of an Int column.
213    Int(Bounds<i32>),
214    /// The bounds of a `BigInt` column.
215    BigInt(Bounds<i64>),
216    /// The bounds of an Int128 column.
217    Int128(Bounds<i128>),
218    /// The bounds of a Timestamp column.
219    TimestampTZ(Bounds<i64>),
220}
221
222impl ColumnBounds {
223    /// Construct a [`ColumnBounds`] from a column by reference.
224    ///
225    /// If the column variant has order, only the minimum and maximum value will be copied.
226    #[must_use]
227    pub fn from_column(column: &CommittableColumn) -> ColumnBounds {
228        match column {
229            CommittableColumn::TinyInt(ints) => ColumnBounds::TinyInt(Bounds::from_iter(*ints)),
230            CommittableColumn::Uint8(ints) => ColumnBounds::Uint8(Bounds::from_iter(*ints)),
231            CommittableColumn::SmallInt(ints) => ColumnBounds::SmallInt(Bounds::from_iter(*ints)),
232            CommittableColumn::Int(ints) => ColumnBounds::Int(Bounds::from_iter(*ints)),
233            CommittableColumn::BigInt(ints) => ColumnBounds::BigInt(Bounds::from_iter(*ints)),
234            CommittableColumn::Int128(ints) => ColumnBounds::Int128(Bounds::from_iter(*ints)),
235            CommittableColumn::TimestampTZ(_, _, times) => {
236                ColumnBounds::TimestampTZ(Bounds::from_iter(*times))
237            }
238            CommittableColumn::Boolean(_)
239            | CommittableColumn::Decimal75(_, _, _)
240            | CommittableColumn::Scalar(_)
241            | CommittableColumn::VarBinary(_)
242            | CommittableColumn::VarChar(_) => ColumnBounds::NoOrder,
243        }
244    }
245
246    /// Combine two [`ColumnBounds`] as if their source collections are being unioned.
247    ///
248    /// Can error if the two values do not share the same [`ColumnBounds`] variant.
249    pub fn try_union(self, other: Self) -> Result<Self, ColumnBoundsMismatch> {
250        match (self, other) {
251            (ColumnBounds::NoOrder, ColumnBounds::NoOrder) => Ok(ColumnBounds::NoOrder),
252            (ColumnBounds::Uint8(bounds_a), ColumnBounds::Uint8(bounds_b)) => {
253                Ok(ColumnBounds::Uint8(bounds_a.union(bounds_b)))
254            }
255            (ColumnBounds::TinyInt(bounds_a), ColumnBounds::TinyInt(bounds_b)) => {
256                Ok(ColumnBounds::TinyInt(bounds_a.union(bounds_b)))
257            }
258            (ColumnBounds::SmallInt(bounds_a), ColumnBounds::SmallInt(bounds_b)) => {
259                Ok(ColumnBounds::SmallInt(bounds_a.union(bounds_b)))
260            }
261            (ColumnBounds::Int(bounds_a), ColumnBounds::Int(bounds_b)) => {
262                Ok(ColumnBounds::Int(bounds_a.union(bounds_b)))
263            }
264            (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => {
265                Ok(ColumnBounds::BigInt(bounds_a.union(bounds_b)))
266            }
267            (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => {
268                Ok(ColumnBounds::TimestampTZ(bounds_a.union(bounds_b)))
269            }
270            (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => {
271                Ok(ColumnBounds::Int128(bounds_a.union(bounds_b)))
272            }
273            (bounds_a, bounds_b) => Err(ColumnBoundsMismatch {
274                bounds_a: Box::new(bounds_a),
275                bounds_b: Box::new(bounds_b),
276            }),
277        }
278    }
279
280    /// Combine two [`ColumnBounds`] as if their source collections are being differenced.
281    ///
282    /// This should be interpreted as the set difference of the two collections.
283    /// The result would be the rows in self that are not also rows in other.
284    pub fn try_difference(self, other: Self) -> Result<Self, ColumnBoundsMismatch> {
285        match (self, other) {
286            (ColumnBounds::NoOrder, ColumnBounds::NoOrder) => Ok(self),
287            (ColumnBounds::Uint8(bounds_a), ColumnBounds::Uint8(bounds_b)) => {
288                Ok(ColumnBounds::Uint8(bounds_a.difference(bounds_b)))
289            }
290            (ColumnBounds::TinyInt(bounds_a), ColumnBounds::TinyInt(bounds_b)) => {
291                Ok(ColumnBounds::TinyInt(bounds_a.difference(bounds_b)))
292            }
293            (ColumnBounds::SmallInt(bounds_a), ColumnBounds::SmallInt(bounds_b)) => {
294                Ok(ColumnBounds::SmallInt(bounds_a.difference(bounds_b)))
295            }
296            (ColumnBounds::Int(bounds_a), ColumnBounds::Int(bounds_b)) => {
297                Ok(ColumnBounds::Int(bounds_a.difference(bounds_b)))
298            }
299            (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => {
300                Ok(ColumnBounds::BigInt(bounds_a.difference(bounds_b)))
301            }
302            (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => {
303                Ok(ColumnBounds::Int128(bounds_a.difference(bounds_b)))
304            }
305            (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => {
306                Ok(ColumnBounds::TimestampTZ(bounds_a.difference(bounds_b)))
307            }
308            (_, _) => Err(ColumnBoundsMismatch {
309                bounds_a: Box::new(self),
310                bounds_b: Box::new(other),
311            }),
312        }
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319    use crate::base::{
320        database::OwnedColumn,
321        math::decimal::Precision,
322        posql_time::{PoSQLTimeUnit, PoSQLTimeZone},
323        scalar::test_scalar::TestScalar,
324    };
325    use alloc::{string::String, vec};
326    use itertools::Itertools;
327
328    #[test]
329    fn we_can_construct_bounds_by_method() {
330        let sharp_bounds = Bounds::<i32>::sharp(-5, 10).unwrap();
331        assert_eq!(
332            sharp_bounds,
333            Bounds::Sharp(BoundsInner { min: -5, max: 10 })
334        );
335
336        let bounded_bounds = Bounds::<i32>::bounded(-15, -10).unwrap();
337        assert_eq!(
338            bounded_bounds,
339            Bounds::Bounded(BoundsInner { min: -15, max: -10 })
340        );
341    }
342
343    #[test]
344    fn we_cannot_construct_negative_bounds() {
345        let negative_sharp_bounds = Bounds::<i32>::sharp(10, 5);
346        assert!(matches!(negative_sharp_bounds, Err(NegativeBounds)));
347
348        let negative_bounded_bounds = Bounds::<i32>::bounded(-10, -15);
349        assert!(matches!(negative_bounded_bounds, Err(NegativeBounds)));
350    }
351
352    #[test]
353    fn we_can_construct_bounds_from_iterator() {
354        // empty case
355        let empty_bounds = Bounds::<i32>::from_iter([]);
356        assert_eq!(empty_bounds, Bounds::Empty);
357
358        // nonempty case
359        let ints = [1, 2, 3, 1, 0, -1];
360        let bounds = Bounds::from_iter(&ints);
361        assert_eq!(bounds, Bounds::Sharp(BoundsInner { min: -1, max: 3 }));
362    }
363
364    #[test]
365    fn we_can_determine_if_bounds_surround_value() {
366        // empty case
367        assert!(!Bounds::Empty.surrounds(&0));
368
369        let sharp = Bounds::Sharp(BoundsInner { min: 2, max: 4 });
370        assert!(!sharp.surrounds(&1));
371        assert!(sharp.surrounds(&2));
372        assert!(sharp.surrounds(&3));
373        assert!(sharp.surrounds(&4));
374        assert!(!sharp.surrounds(&5));
375
376        let bounded = Bounds::Bounded(BoundsInner { min: 2, max: 4 });
377        assert!(!bounded.surrounds(&1));
378        assert!(bounded.surrounds(&2));
379        assert!(bounded.surrounds(&3));
380        assert!(bounded.surrounds(&4));
381        assert!(!bounded.surrounds(&5));
382    }
383
384    #[test]
385    fn we_can_union_sharp_bounds() {
386        let bounds_a = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
387
388        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 2 });
389        assert_eq!(
390            bounds_a.union(bounds_b),
391            Bounds::Sharp(BoundsInner { min: 1, max: 6 })
392        );
393
394        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 4 });
395        assert_eq!(
396            bounds_a.union(bounds_b),
397            Bounds::Sharp(BoundsInner { min: 1, max: 6 })
398        );
399
400        let bounds_b = Bounds::Sharp(BoundsInner { min: 1, max: 7 });
401        assert_eq!(
402            bounds_a.union(bounds_b),
403            Bounds::Sharp(BoundsInner { min: 1, max: 7 })
404        );
405
406        let bounds_b = Bounds::Sharp(BoundsInner { min: 4, max: 5 });
407        assert_eq!(
408            bounds_a.union(bounds_b),
409            Bounds::Sharp(BoundsInner { min: 3, max: 6 })
410        );
411
412        let bounds_b = Bounds::Sharp(BoundsInner { min: 4, max: 7 });
413        assert_eq!(
414            bounds_a.union(bounds_b),
415            Bounds::Sharp(BoundsInner { min: 3, max: 7 })
416        );
417
418        let bounds_b = Bounds::Sharp(BoundsInner { min: 7, max: 8 });
419        assert_eq!(
420            bounds_a.union(bounds_b),
421            Bounds::Sharp(BoundsInner { min: 3, max: 8 })
422        );
423    }
424
425    #[test]
426    fn we_can_union_sharp_and_empty_bounds() {
427        let sharp = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
428        let empty = Bounds::Empty;
429
430        assert_eq!(sharp.union(empty), sharp);
431        assert_eq!(empty.union(sharp), sharp);
432        assert_eq!(empty.union(empty), empty);
433    }
434
435    #[test]
436    fn union_of_bounded_bounds_is_bounded() {
437        let sharp = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
438        let bounded = Bounds::Bounded(BoundsInner { min: 7, max: 10 });
439        let union = Bounds::Bounded(BoundsInner { min: 3, max: 10 });
440        let empty = Bounds::Empty;
441
442        assert_eq!(sharp.union(bounded), union);
443        assert_eq!(bounded.union(sharp), union);
444
445        assert_eq!(empty.union(bounded), bounded);
446        assert_eq!(bounded.union(empty), bounded);
447
448        assert_eq!(bounded.union(bounded), bounded);
449    }
450
451    #[test]
452    fn we_can_take_difference_of_disjoint_bounds() {
453        let bounds_a = Bounds::Sharp(BoundsInner { min: 3, max: 6 });
454        let bounds_b = Bounds::Sharp(BoundsInner { min: -6, max: -3 });
455        let bounded = Bounds::Bounded(BoundsInner { min: -6, max: -3 });
456
457        assert_eq!(bounds_a.difference(bounds_b), bounds_a);
458        assert_eq!(bounds_b.difference(bounds_a), bounds_b);
459
460        assert_eq!(bounds_a.difference(bounded), bounds_a);
461
462        let empty = Bounds::Empty;
463
464        assert_eq!(bounds_a.difference(empty), bounds_a);
465        assert_eq!(empty.difference(bounds_a), empty);
466
467        assert_eq!(empty.difference(empty), empty);
468    }
469
470    #[test]
471    fn difference_with_bounded_minuend_is_bounded() {
472        let sharp = Bounds::Sharp(BoundsInner { min: -5, max: 5 });
473        let bounded_a = Bounds::Bounded(BoundsInner { min: 6, max: 10 });
474        let bounded_b = Bounds::Bounded(BoundsInner { min: 11, max: 15 });
475        let empty = Bounds::Empty;
476
477        assert_eq!(bounded_a.difference(sharp), bounded_a);
478
479        assert_eq!(bounded_a.difference(bounded_b), bounded_a);
480        assert_eq!(bounded_b.difference(bounded_a), bounded_b);
481
482        assert_eq!(bounded_a.difference(empty), bounded_a);
483
484        // Still empty since there are still no rows in empty that are also in bounded
485        assert_eq!(empty.difference(bounded_a), empty);
486    }
487
488    #[test]
489    fn difference_of_overlapping_bounds_is_bounded() {
490        let bounds_a = BoundsInner { min: 3, max: 6 };
491        let sharp_a = Bounds::Sharp(bounds_a);
492        let bounded_a = Bounds::Bounded(bounds_a);
493
494        let bounds_b = BoundsInner { min: 1, max: 4 };
495        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
496        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
497
498        let bounds_b = BoundsInner { min: 1, max: 7 };
499        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
500        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
501
502        let bounds_b = BoundsInner { min: 4, max: 5 };
503        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
504        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
505
506        let bounds_b = BoundsInner { min: 4, max: 7 };
507        assert_eq!(sharp_a.difference(Bounds::Sharp(bounds_b)), bounded_a);
508        assert_eq!(sharp_a.difference(Bounds::Bounded(bounds_b)), bounded_a);
509    }
510
511    #[test]
512    fn we_can_construct_column_bounds_from_column() {
513        let varchar_column = OwnedColumn::<TestScalar>::VarChar(
514            ["Lorem", "ipsum", "dolor", "sit", "amet"]
515                .map(String::from)
516                .to_vec(),
517        );
518        let committable_varchar_column = CommittableColumn::from(&varchar_column);
519        let varchar_column_bounds = ColumnBounds::from_column(&committable_varchar_column);
520        assert_eq!(varchar_column_bounds, ColumnBounds::NoOrder);
521
522        let tinyint_column = OwnedColumn::<TestScalar>::TinyInt([1, 2, 3, 1, 0].to_vec());
523        let committable_tinyint_column = CommittableColumn::from(&tinyint_column);
524        let tinyint_column_bounds = ColumnBounds::from_column(&committable_tinyint_column);
525        assert_eq!(
526            tinyint_column_bounds,
527            ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
528        );
529
530        let smallint_column = OwnedColumn::<TestScalar>::SmallInt([1, 2, 3, 1, 0].to_vec());
531        let committable_smallint_column = CommittableColumn::from(&smallint_column);
532        let smallint_column_bounds = ColumnBounds::from_column(&committable_smallint_column);
533        assert_eq!(
534            smallint_column_bounds,
535            ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
536        );
537
538        let int_column = OwnedColumn::<TestScalar>::Int([1, 2, 3, 1, 0].to_vec());
539        let committable_int_column = CommittableColumn::from(&int_column);
540        let int_column_bounds = ColumnBounds::from_column(&committable_int_column);
541        assert_eq!(
542            int_column_bounds,
543            ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
544        );
545
546        let bigint_column = OwnedColumn::<TestScalar>::BigInt([1, 2, 3, 1, 0].to_vec());
547        let committable_bigint_column = CommittableColumn::from(&bigint_column);
548        let bigint_column_bounds = ColumnBounds::from_column(&committable_bigint_column);
549        assert_eq!(
550            bigint_column_bounds,
551            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
552        );
553
554        let int128_column = OwnedColumn::<TestScalar>::Int128([1, 2, 3, 1, 0].to_vec());
555        let committable_int128_column = CommittableColumn::from(&int128_column);
556        let int128_column_bounds = ColumnBounds::from_column(&committable_int128_column);
557        assert_eq!(
558            int128_column_bounds,
559            ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 0, max: 3 }))
560        );
561
562        let decimal75_column = OwnedColumn::<TestScalar>::Decimal75(
563            Precision::new(1).unwrap(),
564            0,
565            vec![
566                -TestScalar::from([1, 0, 0, 0]),
567                TestScalar::from([2, 0, 0, 0]),
568                TestScalar::from([3, 0, 0, 0]),
569            ],
570        );
571        let committable_decimal75_column = CommittableColumn::from(&decimal75_column);
572        let decimal75_column_bounds = ColumnBounds::from_column(&committable_decimal75_column);
573        assert_eq!(decimal75_column_bounds, ColumnBounds::NoOrder);
574
575        let timestamp_column = OwnedColumn::<TestScalar>::TimestampTZ(
576            PoSQLTimeUnit::Second,
577            PoSQLTimeZone::utc(),
578            vec![1_i64, 2, 3, 4],
579        );
580        let committable_timestamp_column = CommittableColumn::from(&timestamp_column);
581        let timestamp_column_bounds = ColumnBounds::from_column(&committable_timestamp_column);
582        assert_eq!(
583            timestamp_column_bounds,
584            ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 }))
585        );
586    }
587
588    #[test]
589    fn we_can_union_column_bounds_with_matching_variant() {
590        let no_order = ColumnBounds::NoOrder;
591        assert_eq!(no_order.try_union(no_order).unwrap(), no_order);
592
593        let tinyint_a = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
594        let tinyint_b = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
595        assert_eq!(
596            tinyint_a.try_union(tinyint_b).unwrap(),
597            ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
598        );
599
600        let smallint_a = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
601        let smallint_b = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
602        assert_eq!(
603            smallint_a.try_union(smallint_b).unwrap(),
604            ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
605        );
606
607        let int_a = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
608        let int_b = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
609        assert_eq!(
610            int_a.try_union(int_b).unwrap(),
611            ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
612        );
613
614        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
615        let bigint_b = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
616        assert_eq!(
617            bigint_a.try_union(bigint_b).unwrap(),
618            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
619        );
620
621        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
622        let bigint_b = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
623        assert_eq!(
624            bigint_a.try_union(bigint_b).unwrap(),
625            ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 6 }))
626        );
627
628        let int128_a = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
629        let int128_b = ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 4, max: 6 }));
630        assert_eq!(
631            int128_a.try_union(int128_b).unwrap(),
632            ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 6 }))
633        );
634
635        let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
636        let timestamp_b =
637            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 4, max: 6 }));
638        assert_eq!(
639            timestamp_a.try_union(timestamp_b).unwrap(),
640            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 6 }))
641        );
642    }
643
644    #[test]
645    fn we_cannot_union_mismatched_column_bounds() {
646        let no_order = ColumnBounds::NoOrder;
647        let tinyint = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: -3, max: 3 }));
648        let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: -5, max: 5 }));
649        let int = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: -10, max: 10 }));
650        let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
651        let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
652        let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
653
654        let bounds = [
655            (no_order, "NoOrder"),
656            (tinyint, "TinyInt"),
657            (smallint, "SmallInt"),
658            (int, "Int"),
659            (bigint, "BigInt"),
660            (int128, "Int128"),
661            (timestamp, "Timestamp"),
662        ];
663
664        for ((bound_a, name_a), (bound_b, name_b)) in bounds.iter().tuple_combinations() {
665            assert!(
666                bound_a.try_union(*bound_b).is_err(),
667                "Expected error when trying to union {name_a} with {name_b}"
668            );
669            assert!(
670                bound_b.try_union(*bound_a).is_err(),
671                "Expected error when trying to union {name_b} with {name_a}"
672            );
673        }
674    }
675
676    #[test]
677    fn we_can_difference_column_bounds_with_matching_variant() {
678        let no_order = ColumnBounds::NoOrder;
679        assert_eq!(no_order.try_difference(no_order).unwrap(), no_order);
680
681        let tinyint_a = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
682        let tinyint_b = ColumnBounds::TinyInt(Bounds::Empty);
683        assert_eq!(tinyint_a.try_difference(tinyint_b).unwrap(), tinyint_a);
684
685        let smallint_a = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
686        let smallint_b = ColumnBounds::SmallInt(Bounds::Empty);
687        assert_eq!(smallint_a.try_difference(smallint_b).unwrap(), smallint_a);
688
689        let int_a = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
690        let int_b = ColumnBounds::Int(Bounds::Empty);
691        assert_eq!(int_a.try_difference(int_b).unwrap(), int_a);
692
693        let bigint_a = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
694        let bigint_b = ColumnBounds::BigInt(Bounds::Empty);
695        assert_eq!(bigint_a.try_difference(bigint_b).unwrap(), bigint_a);
696
697        let int128_a = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 1, max: 4 }));
698        let int128_b = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 3, max: 6 }));
699        assert_eq!(
700            int128_a.try_difference(int128_b).unwrap(),
701            ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 4 }))
702        );
703
704        let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 }));
705        let timestamp_b = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 3, max: 6 }));
706        assert_eq!(
707            timestamp_a.try_difference(timestamp_b).unwrap(),
708            ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 4 }))
709        );
710    }
711
712    #[test]
713    fn we_cannot_difference_mismatched_column_bounds() {
714        let no_order = ColumnBounds::NoOrder;
715        let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
716        let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
717        let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 }));
718        let tinyint = ColumnBounds::TinyInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
719        let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 }));
720
721        assert!(no_order.try_difference(bigint).is_err());
722        assert!(bigint.try_difference(no_order).is_err());
723
724        assert!(no_order.try_difference(int128).is_err());
725        assert!(int128.try_difference(no_order).is_err());
726
727        assert!(bigint.try_difference(int128).is_err());
728        assert!(int128.try_difference(bigint).is_err());
729
730        assert!(tinyint.try_difference(timestamp).is_err());
731        assert!(timestamp.try_difference(tinyint).is_err());
732
733        assert!(smallint.try_difference(timestamp).is_err());
734        assert!(timestamp.try_difference(smallint).is_err());
735    }
736}