nodedb_cluster/distributed_array/
partition.rs1use std::collections::HashMap;
28
29use crate::error::{ClusterError, Result};
30
31use super::routing::array_vshard_for_tile;
32
/// Cells from a distributed-array put, grouped under the single vShard that
/// owns all of them.
///
/// Produced by [`partition_put_cells`], which emits one bucket per distinct
/// vShard among its input cells.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutBucket {
    /// vShard that every cell in this bucket routes to, as computed by
    /// `array_vshard_for_tile`.
    pub vshard_id: u32,
    /// Hilbert prefix of the first input cell (in input order) that routed to
    /// this vShard. NOTE(review): presumably consumed downstream as a routing
    /// hint for the whole bucket — confirm against callers.
    pub representative_hilbert_prefix: u64,
    /// msgpack encoding of this bucket's cell payloads: a list with one byte
    /// blob per cell, in input order.
    pub cells_msgpack: Vec<u8>,
}
42
/// Coordinates from a distributed-array delete, grouped under the single
/// vShard that owns all of them.
///
/// Produced by [`partition_delete_coords`], which emits one bucket per
/// distinct vShard among its input coordinates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteBucket {
    /// vShard that every coordinate in this bucket routes to, as computed by
    /// `array_vshard_for_tile`.
    pub vshard_id: u32,
    /// Hilbert prefix of the first input coordinate (in input order) that
    /// routed to this vShard. NOTE(review): presumably consumed downstream as
    /// a routing hint for the whole bucket — confirm against callers.
    pub representative_hilbert_prefix: u64,
    /// msgpack encoding of this bucket's coordinate payloads: a list with one
    /// byte blob per coordinate, in input order.
    pub coords_msgpack: Vec<u8>,
}
52
53pub fn partition_put_cells(cells: &[(u64, Vec<u8>)], prefix_bits: u8) -> Result<Vec<PutBucket>> {
61 if cells.is_empty() {
62 return Ok(Vec::new());
63 }
64
65 let mut buckets: HashMap<u32, (u64, Vec<Vec<u8>>)> = HashMap::new();
67
68 for (hilbert_prefix, cell_bytes) in cells {
69 let vshard_id = array_vshard_for_tile(*hilbert_prefix, prefix_bits)?;
70 let entry = buckets
71 .entry(vshard_id)
72 .or_insert((*hilbert_prefix, Vec::new()));
73 entry.1.push(cell_bytes.clone());
74 }
75
76 buckets
77 .into_iter()
78 .map(|(vshard_id, (representative, cell_blobs))| {
79 let cells_msgpack = encode_blob_vec(&cell_blobs)?;
80 Ok(PutBucket {
81 vshard_id,
82 representative_hilbert_prefix: representative,
83 cells_msgpack,
84 })
85 })
86 .collect()
87}
88
89pub fn partition_delete_coords(
96 coords: &[(u64, Vec<u8>)],
97 prefix_bits: u8,
98) -> Result<Vec<DeleteBucket>> {
99 if coords.is_empty() {
100 return Ok(Vec::new());
101 }
102
103 let mut buckets: HashMap<u32, (u64, Vec<Vec<u8>>)> = HashMap::new();
105
106 for (hilbert_prefix, coord_bytes) in coords {
107 let vshard_id = array_vshard_for_tile(*hilbert_prefix, prefix_bits)?;
108 let entry = buckets
109 .entry(vshard_id)
110 .or_insert((*hilbert_prefix, Vec::new()));
111 entry.1.push(coord_bytes.clone());
112 }
113
114 buckets
115 .into_iter()
116 .map(|(vshard_id, (representative, coord_blobs))| {
117 let coords_msgpack = encode_blob_vec(&coord_blobs)?;
118 Ok(DeleteBucket {
119 vshard_id,
120 representative_hilbert_prefix: representative,
121 coords_msgpack,
122 })
123 })
124 .collect()
125}
126
127fn encode_blob_vec(blobs: &[Vec<u8>]) -> Result<Vec<u8>> {
134 let owned: Vec<Vec<u8>> = blobs.to_vec();
136 zerompk::to_msgpack_vec(&owned).map_err(|e| ClusterError::Codec {
137 detail: format!("partition encode blob vec: {e}"),
138 })
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `(hilbert_prefix, cell_bytes)` pair whose payload is the single
    /// byte `tag`.
    fn cell(prefix: u64, tag: u8) -> (u64, Vec<u8>) {
        (prefix, vec![tag])
    }

    /// Builds a `(hilbert_prefix, coord_bytes)` pair whose payload is the
    /// single byte `tag`.
    fn coord(prefix: u64, tag: u8) -> (u64, Vec<u8>) {
        (prefix, vec![tag])
    }

    /// The vShard the routing layer assigns to `prefix`. Tests compare against
    /// this rather than hard-coding the router's output.
    fn expected_vshard(prefix: u64, prefix_bits: u8) -> u32 {
        array_vshard_for_tile(prefix, prefix_bits).unwrap()
    }

    /// Decodes a bucket's msgpack payload back into its list of byte blobs.
    fn decode_blobs(bytes: &[u8]) -> Vec<Vec<u8>> {
        zerompk::from_msgpack(bytes).unwrap()
    }

    #[test]
    fn partition_put_empty_returns_empty() {
        let buckets = partition_put_cells(&[], 8).unwrap();
        assert!(buckets.is_empty());
    }

    #[test]
    fn partition_delete_empty_returns_empty() {
        let buckets = partition_delete_coords(&[], 8).unwrap();
        assert!(buckets.is_empty());
    }

    #[test]
    fn partition_put_single_cell_one_bucket() {
        let cells = vec![cell(0, 0xAA)];
        let buckets = partition_put_cells(&cells, 10).unwrap();
        assert_eq!(buckets.len(), 1);
        assert_eq!(buckets[0].vshard_id, 0);
        assert_eq!(buckets[0].representative_hilbert_prefix, 0);
        assert_eq!(decode_blobs(&buckets[0].cells_msgpack), vec![vec![0xAAu8]]);
    }

    #[test]
    fn partition_put_cells_by_tile_three_buckets() {
        // With prefix_bits = 10 these three prefixes differ only in the top 10
        // bits (tiles 0, 1, 2). This assumes the router keys on the top
        // `prefix_bits` bits; the `len() == 3` check below confirms they land
        // on distinct vShards.
        let p0 = 0x0000_0000_0000_0000u64;
        let p1 = 0x0040_0000_0000_0000u64;
        let p2 = 0x0080_0000_0000_0000u64;

        let cells = vec![
            cell(p0, 0x01),
            cell(p1, 0x02),
            cell(p0, 0x03),
            cell(p2, 0x04),
            cell(p1, 0x05),
        ];

        let mut buckets = partition_put_cells(&cells, 10).unwrap();
        buckets.sort_by_key(|b| b.vshard_id);
        assert_eq!(buckets.len(), 3);

        // Each bucket must hold exactly its own tile's cells, in input order,
        // and report the first prefix seen for that tile.
        let expected = [
            (p0, vec![vec![0x01u8], vec![0x03]]),
            (p1, vec![vec![0x02u8], vec![0x05]]),
            (p2, vec![vec![0x04u8]]),
        ];
        for (bucket, (prefix, blobs)) in buckets.iter().zip(expected.iter()) {
            assert_eq!(bucket.vshard_id, expected_vshard(*prefix, 10));
            assert_eq!(bucket.representative_hilbert_prefix, *prefix);
            assert_eq!(decode_blobs(&bucket.cells_msgpack), *blobs);
        }
    }

    #[test]
    fn partition_delete_coords_by_tile() {
        let p0 = 0x0000_0000_0000_0000u64;
        let p1 = 0x0040_0000_0000_0000u64;

        let coords = vec![coord(p0, 0xAA), coord(p1, 0xBB), coord(p0, 0xCC)];

        let mut buckets = partition_delete_coords(&coords, 10).unwrap();
        buckets.sort_by_key(|b| b.vshard_id);
        assert_eq!(buckets.len(), 2);

        let expected = [
            (p0, vec![vec![0xAAu8], vec![0xCC]]),
            (p1, vec![vec![0xBBu8]]),
        ];
        for (bucket, (prefix, blobs)) in buckets.iter().zip(expected.iter()) {
            assert_eq!(bucket.vshard_id, expected_vshard(*prefix, 10));
            assert_eq!(bucket.representative_hilbert_prefix, *prefix);
            assert_eq!(decode_blobs(&bucket.coords_msgpack), *blobs);
        }
    }

    #[test]
    fn partition_put_invalid_prefix_bits_errors() {
        let cells = vec![cell(0, 0x01)];
        assert!(partition_put_cells(&cells, 0).is_err());
        assert!(partition_put_cells(&cells, 17).is_err());
    }

    #[test]
    fn partition_delete_invalid_prefix_bits_errors() {
        let coords = vec![coord(0, 0x01)];
        assert!(partition_delete_coords(&coords, 0).is_err());
        assert!(partition_delete_coords(&coords, 17).is_err());
    }
}