nodedb_cluster/distributed_document/
merge_sort.rs1use serde::{Deserialize, Serialize};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct ShardRow {
15 pub payload: Vec<u8>,
17 pub sort_key: Vec<u8>,
20 pub shard_id: u16,
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
26pub enum SortDirection {
27 Ascending,
28 Descending,
29}
30
31pub struct OrderByMerger {
33 rows: Vec<ShardRow>,
35 direction: SortDirection,
37}
38
39impl OrderByMerger {
40 pub fn new(direction: SortDirection) -> Self {
41 Self {
42 rows: Vec::new(),
43 direction,
44 }
45 }
46
47 pub fn add_shard_rows(&mut self, rows: Vec<ShardRow>) {
49 self.rows.extend(rows);
50 }
51
52 pub fn merge(&mut self, global_limit: usize) -> Vec<ShardRow> {
58 match self.direction {
59 SortDirection::Ascending => {
60 self.rows.sort_by(|a, b| a.sort_key.cmp(&b.sort_key));
61 }
62 SortDirection::Descending => {
63 self.rows.sort_by(|a, b| b.sort_key.cmp(&a.sort_key));
64 }
65 }
66 self.rows.truncate(global_limit);
67 self.rows.clone()
68 }
69
70 pub fn total_rows(&self) -> usize {
72 self.rows.len()
73 }
74}
75
/// Encodes an `i64` as an 8-byte big-endian key whose lexicographic byte order
/// matches signed numeric order.
pub fn encode_sort_key_i64(value: i64) -> Vec<u8> {
    // Flipping the sign bit maps i64::MIN..=i64::MAX monotonically onto
    // 0..=u64::MAX, so negatives sort before zero and positives after it.
    const SIGN_BIT: u64 = 1 << 63;
    let biased = value as u64 ^ SIGN_BIT;
    Vec::from(biased.to_be_bytes())
}
85
/// Encodes an `f64` as an 8-byte big-endian key whose lexicographic byte order
/// matches numeric order.
///
/// Resulting order: `-inf < negatives < 0.0 < positives < +inf < NaN`.
///
/// * Every NaN, whatever its sign or payload bits, is collapsed onto a single
///   positive quiet NaN, so all NaNs get one key that sorts after `+inf`.
///   Without this, a NaN with its sign bit set would sort before `-inf`.
/// * `-0.0` and `0.0` compare equal as floats, so both get the `+0.0` key.
pub fn encode_sort_key_f64(value: f64) -> Vec<u8> {
    // Bit pattern of the positive quiet NaN that all NaNs are collapsed onto.
    const CANONICAL_NAN_BITS: u64 = 0x7FF8_0000_0000_0000;
    const SIGN_BIT: u64 = 1 << 63;

    let bits = if value.is_nan() {
        CANONICAL_NAN_BITS
    } else if value == 0.0 {
        // True for both +0.0 and -0.0; use the +0.0 bit pattern (all zeros).
        0
    } else {
        value.to_bits()
    };

    // Negative floats: flip every bit so larger magnitudes sort lower.
    // Non-negative floats: set the sign bit so they sort above all negatives.
    let ordered = if bits & SIGN_BIT != 0 {
        !bits
    } else {
        bits | SIGN_BIT
    };
    ordered.to_be_bytes().to_vec()
}
96
/// Encodes a string as its raw UTF-8 bytes.
///
/// Byte-wise comparison of UTF-8 agrees with comparison by Unicode scalar
/// value, so this gives binary (not locale-aware) collation.
///
/// NOTE: the encoding has no terminator, so it preserves order only when it is
/// the entire key, not when it is concatenated into a composite key.
pub fn encode_sort_key_string(value: &str) -> Vec<u8> {
    value.to_owned().into_bytes()
}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a row whose sort key is the order-preserving encoding of `key`.
    fn row(payload: &[u8], key: i64, shard_id: u16) -> ShardRow {
        ShardRow {
            payload: payload.to_vec(),
            sort_key: encode_sort_key_i64(key),
            shard_id,
        }
    }

    /// Borrows each row's payload so a whole result can be checked at once.
    fn payloads(rows: &[ShardRow]) -> Vec<&[u8]> {
        rows.iter().map(|r| r.payload.as_slice()).collect()
    }

    #[test]
    fn merge_sort_ascending() {
        let mut merger = OrderByMerger::new(SortDirection::Ascending);
        merger.add_shard_rows(vec![
            row(b"alice", 20, 0),
            row(b"bob", 30, 0),
            row(b"carol", 40, 0),
        ]);
        merger.add_shard_rows(vec![
            row(b"dave", 15, 1),
            row(b"eve", 25, 1),
            row(b"frank", 35, 1),
        ]);

        let result = merger.merge(3);
        assert_eq!(
            payloads(&result),
            vec![&b"dave"[..], &b"alice"[..], &b"eve"[..]]
        );
    }

    #[test]
    fn merge_sort_descending() {
        let mut merger = OrderByMerger::new(SortDirection::Descending);
        merger.add_shard_rows(vec![row(b"a", 100, 0), row(b"b", 50, 0)]);
        merger.add_shard_rows(vec![row(b"c", 90, 1), row(b"d", 10, 1)]);

        let result = merger.merge(2);
        assert_eq!(payloads(&result), vec![&b"a"[..], &b"c"[..]]);
    }

    #[test]
    fn merge_limit_larger_than_input_returns_all_rows() {
        let mut merger = OrderByMerger::new(SortDirection::Ascending);
        merger.add_shard_rows(vec![row(b"x", 3, 0), row(b"y", 1, 0)]);
        merger.add_shard_rows(vec![row(b"z", 2, 1)]);

        let result = merger.merge(10);
        assert_eq!(
            payloads(&result),
            vec![&b"y"[..], &b"z"[..], &b"x"[..]]
        );
    }

    #[test]
    fn merge_with_zero_limit_is_empty() {
        let mut merger = OrderByMerger::new(SortDirection::Ascending);
        merger.add_shard_rows(vec![row(b"x", 1, 0)]);
        assert!(merger.merge(0).is_empty());
    }

    #[test]
    fn sort_key_i64_ordering() {
        let neg = encode_sort_key_i64(-100);
        let zero = encode_sort_key_i64(0);
        let pos = encode_sort_key_i64(100);
        assert!(neg < zero);
        assert!(zero < pos);
    }

    #[test]
    fn sort_key_i64_extremes() {
        let keys = [i64::MIN, -1, 0, 1, i64::MAX].map(encode_sort_key_i64);
        assert!(keys.windows(2).all(|w| w[0] < w[1]));
        assert!(keys.iter().all(|k| k.len() == 8));
    }

    #[test]
    fn sort_key_f64_ordering() {
        let neg = encode_sort_key_f64(-1.5);
        let zero = encode_sort_key_f64(0.0);
        let pos = encode_sort_key_f64(1.5);
        assert!(neg < zero);
        assert!(zero < pos);
    }

    #[test]
    fn sort_key_f64_infinities() {
        let keys = [f64::NEG_INFINITY, -1.5, 0.0, 1.5, f64::INFINITY].map(encode_sort_key_f64);
        assert!(keys.windows(2).all(|w| w[0] < w[1]));
    }

    #[test]
    fn sort_key_string_ordering() {
        let a = encode_sort_key_string("alice");
        let b = encode_sort_key_string("bob");
        assert!(a < b);
    }

    #[test]
    fn sort_key_string_prefix_sorts_first() {
        assert!(encode_sort_key_string("") < encode_sort_key_string("a"));
        assert!(encode_sort_key_string("ab") < encode_sort_key_string("abc"));
    }
}