Skip to main content

nodedb_cluster/distributed_document/
merge_sort.rs

1//! Distributed ORDER BY + LIMIT merge for document scans.
2//!
3//! Each shard applies ORDER BY + LIMIT locally and returns its top-N rows.
4//! The coordinator performs an N-way merge sort on the (shards × N) rows
5//! and returns the global top-N.
6//!
7//! This is NOT simple concatenation — Shard A's top-10 might all rank
8//! below Shard B's top-10 globally.
9
10use serde::{Deserialize, Serialize};
11
12/// A row from a shard, with a sort key for merge-sorting.
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct ShardRow {
15    /// The row payload (JSON bytes or MessagePack).
16    pub payload: Vec<u8>,
17    /// Sort key extracted from the ORDER BY column(s).
18    /// Encoded as comparable bytes (big-endian for numbers, UTF-8 for strings).
19    pub sort_key: Vec<u8>,
20    /// Which shard produced this row.
21    pub shard_id: u16,
22}
23
24/// Sort direction for ORDER BY.
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
26pub enum SortDirection {
27    Ascending,
28    Descending,
29}
30
31/// N-way merge-sort merger for distributed ORDER BY + LIMIT.
32pub struct OrderByMerger {
33    /// All rows from all shards, unsorted.
34    rows: Vec<ShardRow>,
35    /// Sort direction.
36    direction: SortDirection,
37}
38
39impl OrderByMerger {
40    pub fn new(direction: SortDirection) -> Self {
41        Self {
42            rows: Vec::new(),
43            direction,
44        }
45    }
46
47    /// Add a shard's locally-sorted, locally-limited rows.
48    pub fn add_shard_rows(&mut self, rows: Vec<ShardRow>) {
49        self.rows.extend(rows);
50    }
51
52    /// Perform the global merge sort and apply the global LIMIT.
53    ///
54    /// Each shard already applied `ORDER BY + LIMIT` locally, so we have
55    /// at most `num_shards × limit` rows. The global sort + limit produces
56    /// the correct result.
57    pub fn merge(&mut self, global_limit: usize) -> Vec<ShardRow> {
58        match self.direction {
59            SortDirection::Ascending => {
60                self.rows.sort_by(|a, b| a.sort_key.cmp(&b.sort_key));
61            }
62            SortDirection::Descending => {
63                self.rows.sort_by(|a, b| b.sort_key.cmp(&a.sort_key));
64            }
65        }
66        self.rows.truncate(global_limit);
67        self.rows.clone()
68    }
69
70    /// Total rows collected before merge.
71    pub fn total_rows(&self) -> usize {
72        self.rows.len()
73    }
74}
75
/// Encode an `i64` as 8 bytes whose lexicographic order matches numeric order.
///
/// The sign bit is inverted and the result is written big-endian, so that
/// negative values compare below non-negative ones when the bytes are
/// compared as unsigned.
pub fn encode_sort_key_i64(value: i64) -> Vec<u8> {
    // `i64::MIN` has only the sign bit set, so XOR-ing with it flips exactly
    // that bit.
    let biased = (value ^ i64::MIN) as u64;
    Vec::from(biased.to_be_bytes())
}
85
/// Encode an `f64` as 8 big-endian bytes whose lexicographic order matches
/// numeric order.
///
/// Values that compare equal produce identical keys: `-0.0` is encoded the
/// same as `0.0`. Every NaN, whatever its sign or payload bits, is mapped to a
/// single key that sorts after `+inf`, so NaN placement does not depend on how
/// the NaN was produced.
pub fn encode_sort_key_f64(value: f64) -> Vec<u8> {
    const SIGN_BIT: u64 = 1 << 63;
    // Positive quiet NaN; spelled out as bits so the result is independent of
    // the platform's default NaN pattern.
    const CANONICAL_NAN_BITS: u64 = 0x7FF8_0000_0000_0000;

    let bits = if value.is_nan() {
        CANONICAL_NAN_BITS
    } else if value == 0.0 {
        // Matches both +0.0 and -0.0; collapse them to +0.0.
        0
    } else {
        value.to_bits()
    };

    // IEEE 754 ordering trick: negative values have all bits inverted (which
    // reverses their magnitude order), non-negative values only get the sign
    // bit set (which places them above every negative value).
    let ordered = if bits & SIGN_BIT != 0 {
        !bits
    } else {
        bits | SIGN_BIT
    };
    ordered.to_be_bytes().to_vec()
}
96
/// Encode a string sort key as its raw UTF-8 bytes.
///
/// Keys are compared byte-wise; no collation or case folding is applied.
pub fn encode_sort_key_string(value: &str) -> Vec<u8> {
    Vec::from(value)
}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a row whose sort key is the encoded integer `key`.
    fn int_row(payload: &[u8], key: i64, shard_id: u16) -> ShardRow {
        ShardRow {
            payload: payload.to_vec(),
            sort_key: encode_sort_key_i64(key),
            shard_id,
        }
    }

    /// Collect the payloads of `rows` in order, for compact comparisons.
    fn payloads(rows: &[ShardRow]) -> Vec<&[u8]> {
        rows.iter().map(|row| row.payload.as_slice()).collect()
    }

    #[test]
    fn merge_sort_ascending() {
        let mut merger = OrderByMerger::new(SortDirection::Ascending);

        // Shard 0: ages [20, 30, 40] (locally sorted, limit 3).
        merger.add_shard_rows(vec![
            int_row(b"alice", 20, 0),
            int_row(b"bob", 30, 0),
            int_row(b"carol", 40, 0),
        ]);

        // Shard 1: ages [15, 25, 35] (locally sorted, limit 3).
        merger.add_shard_rows(vec![
            int_row(b"dave", 15, 1),
            int_row(b"eve", 25, 1),
            int_row(b"frank", 35, 1),
        ]);

        // Global LIMIT 3 keeps the youngest three: dave(15), alice(20), eve(25).
        let result = merger.merge(3);
        assert_eq!(result.len(), 3);
        let expected: [&[u8]; 3] = [b"dave", b"alice", b"eve"];
        assert_eq!(payloads(&result), expected);
    }

    #[test]
    fn merge_sort_descending() {
        let mut merger = OrderByMerger::new(SortDirection::Descending);

        merger.add_shard_rows(vec![int_row(b"a", 100, 0), int_row(b"b", 50, 0)]);
        merger.add_shard_rows(vec![int_row(b"c", 90, 1), int_row(b"d", 10, 1)]);

        // Highest two keys win: a(100), then c(90).
        let result = merger.merge(2);
        assert_eq!(result.len(), 2);
        let expected: [&[u8]; 2] = [b"a", b"c"];
        assert_eq!(payloads(&result), expected);
    }

    #[test]
    fn sort_key_i64_ordering() {
        let keys = [-100, 0, 100].map(encode_sort_key_i64);
        assert!(keys[0] < keys[1]);
        assert!(keys[1] < keys[2]);
    }

    #[test]
    fn sort_key_f64_ordering() {
        let keys = [-1.5, 0.0, 1.5].map(encode_sort_key_f64);
        assert!(keys[0] < keys[1]);
        assert!(keys[1] < keys[2]);
    }

    #[test]
    fn sort_key_string_ordering() {
        assert!(encode_sort_key_string("alice") < encode_sort_key_string("bob"));
    }
}