flowlog_runtime/sort.rs
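//! Sort/merge helpers for `ORDER BY` / `LIMIT` on generated IDB output.
//!
//! Used by both binary mode (drain → sort → write file) and library mode
//! (drain → sort → populate `BatchResults`). Pure algorithms over
//! user-supplied comparators. Inputs are taken by value and are either
//! consumed or reordered in place. Extra allocation is limited to O(k)
//! bookkeeping in `k_way_merge` and the scratch buffer used by the standard
//! library's stable sort.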
6
7use std::cmp::Ordering;
8
/// Merge per-worker buffers that are each already sorted by `cmp`, feeding the
/// combined ordered stream to `sink`.
///
/// At every step the smallest remaining head element (under `cmp`) is moved
/// into `sink`. It is then replaced by the next element of the buffer it came
/// from. This repeats until all buffers are empty. When heads compare equal,
/// the one from the lowest-indexed buffer is emitted first. The sortedness
/// precondition on each `per_worker[i]` is not checked.
///
/// The smallest head is found by scanning all k heads, which costs O(k)
/// comparisons per emitted element. That is cheap when k is the (small)
/// number of timely workers.
pub fn k_way_merge<T, F, S>(per_worker: Vec<Vec<T>>, cmp: F, mut sink: S)
where
    F: Fn(&T, &T) -> Ordering,
    S: FnMut(T),
{
    // Each lane holds (current head, remaining elements of that worker's buffer).
    let mut lanes: Vec<(Option<T>, std::vec::IntoIter<T>)> = per_worker
        .into_iter()
        .map(|buffer| {
            let mut rest = buffer.into_iter();
            let head = rest.next();
            (head, rest)
        })
        .collect();

    loop {
        // Find the lane holding the smallest head. The current pick is only
        // displaced when it compares strictly greater than a later candidate,
        // so ties resolve to the earliest lane.
        let mut pick: Option<(usize, &T)> = None;
        for (idx, (head, _)) in lanes.iter().enumerate() {
            if let Some(candidate) = head {
                pick = match pick {
                    Some((_, current)) if cmp(current, candidate) != Ordering::Greater => pick,
                    _ => Some((idx, candidate)),
                };
            }
        }

        let idx = match pick {
            Some((idx, _)) => idx,
            // Every lane is exhausted.
            None => break,
        };

        let lane = &mut lanes[idx];
        // The scan above only selects lanes whose head is `Some`.
        sink(lane.0.take().unwrap());
        lane.0 = lane.1.next();
    }
}
34
/// Return the `k` smallest elements of `rows` under `cmp`, sorted ascending by `cmp`.
///
/// The result equals the first `k` elements of a full sort of `rows` by `cmp`.
/// When several elements compare equal at the cut-off position, which of them
/// are kept is unspecified. If `k == 0` the result is empty.
///
/// When `rows.len() > k`, `select_nth_unstable_by` partitions the input in
/// O(n) average time so that the `k` smallest elements occupy the prefix. Only
/// that prefix is then sorted. Otherwise (`rows.len() <= k`) the whole vector
/// is sorted with a stable sort, so equal elements keep their input order.
///
/// After truncation the backing allocation is shrunk. This stops a small
/// `LIMIT` over a large input from keeping the input's full capacity alive in
/// the returned vector.
pub fn topk<T, F>(mut rows: Vec<T>, k: usize, cmp: F) -> Vec<T>
where
    F: Fn(&T, &T) -> Ordering,
{
    if k == 0 {
        // Dropping `rows` frees its storage; an empty `Vec` does not allocate.
        return Vec::new();
    }
    if rows.len() > k {
        // Afterwards every element of `rows[..k]` is <= `rows[k]` under `cmp`,
        // so the prefix holds the k smallest elements, in arbitrary order.
        rows.select_nth_unstable_by(k, |a, b| cmp(a, b));
        rows.truncate(k);
        // `truncate` keeps the old capacity; release it so the result only
        // holds storage for about `k` elements.
        rows.shrink_to_fit();
    }
    rows.sort_by(|a, b| cmp(a, b));
    rows
}