plexus_conformance/
merge.rs1use plexus_engine::Row;
2
3use crate::types::MergeSortDir;
4
5pub fn normalize_rows(rows: &[Row]) -> Vec<String> {
6 let mut out: Vec<String> = rows.iter().map(|r| format!("{r:?}")).collect();
7 out.sort();
8 out
9}
10
11pub fn deterministic_fanout_merge_by<T, K, Tiebreak, FKey, FTie>(
15 shards: &[Vec<T>],
16 sort_dir: MergeSortDir,
17 key_of: FKey,
18 tiebreak_of: FTie,
19) -> Vec<T>
20where
21 T: Clone,
22 K: Ord,
23 Tiebreak: Ord,
24 FKey: Fn(&T) -> K,
25 FTie: Fn(&T) -> Tiebreak,
26{
27 deterministic_fanout_merge_by_with_fallback(shards, sort_dir, key_of, tiebreak_of, |_| ())
28}
29
30pub fn deterministic_fanout_merge_by_with_fallback<
33 T,
34 K,
35 Tiebreak,
36 Fallback,
37 FKey,
38 FTie,
39 FFallback,
40>(
41 shards: &[Vec<T>],
42 sort_dir: MergeSortDir,
43 key_of: FKey,
44 tiebreak_of: FTie,
45 fallback_of: FFallback,
46) -> Vec<T>
47where
48 T: Clone,
49 K: Ord,
50 Tiebreak: Ord,
51 Fallback: Ord,
52 FKey: Fn(&T) -> K,
53 FTie: Fn(&T) -> Tiebreak,
54 FFallback: Fn(&T) -> Fallback,
55{
56 let mut out = shards
57 .iter()
58 .flat_map(|s| s.iter().cloned())
59 .collect::<Vec<_>>();
60 out.sort_by(|a, b| {
61 let mut ord = key_of(a).cmp(&key_of(b));
62 if ord == std::cmp::Ordering::Equal {
63 ord = tiebreak_of(a).cmp(&tiebreak_of(b));
64 }
65 if ord == std::cmp::Ordering::Equal {
66 ord = fallback_of(a).cmp(&fallback_of(b));
67 }
68 match sort_dir {
69 MergeSortDir::Asc => ord,
70 MergeSortDir::Desc => ord.reverse(),
71 }
72 });
73 out
74}