// plexus_conformance/merge.rs

1use plexus_engine::Row;
2
3use crate::types::MergeSortDir;
4
5pub fn normalize_rows(rows: &[Row]) -> Vec<String> {
6    let mut out: Vec<String> = rows.iter().map(|r| format!("{r:?}")).collect();
7    out.sort();
8    out
9}
10
11/// Deterministically merge fanout/sharded batches using explicit key + tie-break extractors.
12/// Shard-order invariance requires `(key, tie-break)` to be a total order.
13/// If ties remain, output can depend on shard input ordering.
14pub fn deterministic_fanout_merge_by<T, K, Tiebreak, FKey, FTie>(
15    shards: &[Vec<T>],
16    sort_dir: MergeSortDir,
17    key_of: FKey,
18    tiebreak_of: FTie,
19) -> Vec<T>
20where
21    T: Clone,
22    K: Ord,
23    Tiebreak: Ord,
24    FKey: Fn(&T) -> K,
25    FTie: Fn(&T) -> Tiebreak,
26{
27    deterministic_fanout_merge_by_with_fallback(shards, sort_dir, key_of, tiebreak_of, |_| ())
28}
29
30/// Deterministically merge fanout/sharded batches with an additional fallback key.
31/// If `(key, tie-break, fallback)` is a total order, output is shard-order invariant.
32pub fn deterministic_fanout_merge_by_with_fallback<
33    T,
34    K,
35    Tiebreak,
36    Fallback,
37    FKey,
38    FTie,
39    FFallback,
40>(
41    shards: &[Vec<T>],
42    sort_dir: MergeSortDir,
43    key_of: FKey,
44    tiebreak_of: FTie,
45    fallback_of: FFallback,
46) -> Vec<T>
47where
48    T: Clone,
49    K: Ord,
50    Tiebreak: Ord,
51    Fallback: Ord,
52    FKey: Fn(&T) -> K,
53    FTie: Fn(&T) -> Tiebreak,
54    FFallback: Fn(&T) -> Fallback,
55{
56    let mut out = shards
57        .iter()
58        .flat_map(|s| s.iter().cloned())
59        .collect::<Vec<_>>();
60    out.sort_by(|a, b| {
61        let mut ord = key_of(a).cmp(&key_of(b));
62        if ord == std::cmp::Ordering::Equal {
63            ord = tiebreak_of(a).cmp(&tiebreak_of(b));
64        }
65        if ord == std::cmp::Ordering::Equal {
66            ord = fallback_of(a).cmp(&fallback_of(b));
67        }
68        match sort_dir {
69            MergeSortDir::Asc => ord,
70            MergeSortDir::Desc => ord.reverse(),
71        }
72    });
73    out
74}