Skip to main content

nodedb_cluster/distributed_array/
partition.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cell-to-tile partitioning for distributed array writes.
4//!
5//! Both `coord_put` and `coord_delete` receive a flat list of cells (or
6//! coord tuples) paired with their pre-computed Hilbert prefixes. This
7//! module groups them by tile bucket (the top `prefix_bits` of the Hilbert
8//! prefix) and maps each bucket to its owning vShard via
9//! `array_vshard_for_tile`.
10//!
11//! Callers pre-attach Hilbert prefixes because the query planner already
12//! holds the schema and can compute them at planning time; doing the same
13//! computation again inside the coordinator would require an additional
14//! schema lookup that `nodedb-cluster` intentionally avoids (it carries no
15//! dependency on `nodedb-array`).
16//!
17//! ## Output shape
18//!
//! `partition_put_cells` returns a `Vec<PutBucket>`; each bucket carries
//! `vshard_id`, `representative_hilbert_prefix` and `cells_msgpack`, where
//! `cells_msgpack` is a zerompk-encoded `Vec<raw-cell-bytes>` (the same bytes
//! the shard handler passes to `exec_put`).
23//!
//! `partition_delete_coords` returns a `Vec<DeleteBucket>`; each bucket carries
//! `vshard_id`, `representative_hilbert_prefix` and `coords_msgpack`, where
//! `coords_msgpack` is a zerompk-encoded `Vec<raw-coord-bytes>`.
26
27use std::collections::HashMap;
28
29use crate::error::{ClusterError, Result};
30
31use super::routing::array_vshard_for_tile;
32
/// One bucket produced by `partition_put_cells`.
///
/// Every field is plain owned data, so the common traits are derived: `Debug`
/// for logging and test failure output, `Clone` for retries, and
/// `PartialEq`/`Eq` so buckets can be compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutBucket {
    /// vShard that owns every cell in this bucket.
    pub vshard_id: u32,
    /// Hilbert prefix of the first cell added to this bucket. Used by the
    /// shard handler for routing validation (`validate_put_routing`).
    pub representative_hilbert_prefix: u64,
    /// zerompk-encoded `Vec<raw-cell-bytes>` for all cells in this bucket.
    pub cells_msgpack: Vec<u8>,
}
42
/// One bucket produced by `partition_delete_coords`.
///
/// Every field is plain owned data, so the common traits are derived: `Debug`
/// for logging and test failure output, `Clone` for retries, and
/// `PartialEq`/`Eq` so buckets can be compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteBucket {
    /// vShard that owns every coord in this bucket.
    pub vshard_id: u32,
    /// Hilbert prefix of the first coord added to this bucket. Used by the
    /// shard handler for routing validation (`validate_delete_routing`).
    pub representative_hilbert_prefix: u64,
    /// zerompk-encoded `Vec<raw-coord-bytes>` for all coords in this bucket.
    pub coords_msgpack: Vec<u8>,
}
52
53/// Group `(hilbert_prefix, cell_msgpack)` pairs by tile bucket.
54///
55/// `cells` — each element is `(hilbert_prefix, zerompk-encoded single cell)`.
56/// `prefix_bits` — routing granularity (1–16).
57///
58/// Returns one `PutBucket` per unique owning vShard. The order of buckets
59/// in the output is unspecified but deterministic within a call.
60pub fn partition_put_cells(cells: &[(u64, Vec<u8>)], prefix_bits: u8) -> Result<Vec<PutBucket>> {
61    if cells.is_empty() {
62        return Ok(Vec::new());
63    }
64
65    // vshard_id → (representative_hilbert_prefix, per-cell byte blobs)
66    let mut buckets: HashMap<u32, (u64, Vec<Vec<u8>>)> = HashMap::new();
67
68    for (hilbert_prefix, cell_bytes) in cells {
69        let vshard_id = array_vshard_for_tile(*hilbert_prefix, prefix_bits)?;
70        let entry = buckets
71            .entry(vshard_id)
72            .or_insert((*hilbert_prefix, Vec::new()));
73        entry.1.push(cell_bytes.clone());
74    }
75
76    buckets
77        .into_iter()
78        .map(|(vshard_id, (representative, cell_blobs))| {
79            let cells_msgpack = encode_blob_vec(&cell_blobs)?;
80            Ok(PutBucket {
81                vshard_id,
82                representative_hilbert_prefix: representative,
83                cells_msgpack,
84            })
85        })
86        .collect()
87}
88
89/// Group `(hilbert_prefix, coord_msgpack)` pairs by tile bucket.
90///
91/// `coords` — each element is `(hilbert_prefix, zerompk-encoded single coord)`.
92/// `prefix_bits` — routing granularity (1–16).
93///
94/// Returns one `DeleteBucket` per unique owning vShard.
95pub fn partition_delete_coords(
96    coords: &[(u64, Vec<u8>)],
97    prefix_bits: u8,
98) -> Result<Vec<DeleteBucket>> {
99    if coords.is_empty() {
100        return Ok(Vec::new());
101    }
102
103    // vshard_id → (representative_hilbert_prefix, coord byte blobs)
104    let mut buckets: HashMap<u32, (u64, Vec<Vec<u8>>)> = HashMap::new();
105
106    for (hilbert_prefix, coord_bytes) in coords {
107        let vshard_id = array_vshard_for_tile(*hilbert_prefix, prefix_bits)?;
108        let entry = buckets
109            .entry(vshard_id)
110            .or_insert((*hilbert_prefix, Vec::new()));
111        entry.1.push(coord_bytes.clone());
112    }
113
114    buckets
115        .into_iter()
116        .map(|(vshard_id, (representative, coord_blobs))| {
117            let coords_msgpack = encode_blob_vec(&coord_blobs)?;
118            Ok(DeleteBucket {
119                vshard_id,
120                representative_hilbert_prefix: representative,
121                coords_msgpack,
122            })
123        })
124        .collect()
125}
126
127/// Encode a `Vec<Vec<u8>>` (collection of opaque blobs) as a zerompk array.
128///
129/// Each blob is emitted as raw msgpack bytes (already encoded by the caller)
130/// wrapped in a msgpack bin or str value. We re-encode the entire collection
131/// as a msgpack array of byte-arrays so the shard handler can deserialise it
132/// as `Vec<raw-bytes>` and forward to the engine.
133fn encode_blob_vec(blobs: &[Vec<u8>]) -> Result<Vec<u8>> {
134    // zerompk::to_msgpack_vec requires Sized, so pass as a Vec reference.
135    let owned: Vec<Vec<u8>> = blobs.to_vec();
136    zerompk::to_msgpack_vec(&owned).map_err(|e| ClusterError::Codec {
137        detail: format!("partition encode blob vec: {e}"),
138    })
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    // Build a fake (hilbert_prefix, cell_bytes) pair. The cell_bytes are
    // opaque for the partitioner — any non-empty Vec<u8> works.
    fn cell(prefix: u64, tag: u8) -> (u64, Vec<u8>) {
        (prefix, vec![tag])
    }

    // Build a fake coord pair. Same shape as a cell; kept as a separate name
    // so the tests read in terms of the API under test.
    fn coord(prefix: u64, tag: u8) -> (u64, Vec<u8>) {
        (prefix, vec![tag])
    }

    // Helper: compute expected vshard for a given hilbert prefix and prefix_bits.
    fn expected_vshard(prefix: u64, prefix_bits: u8) -> u32 {
        array_vshard_for_tile(prefix, prefix_bits).unwrap()
    }

    #[test]
    fn partition_put_empty_returns_empty() {
        let buckets = partition_put_cells(&[], 8).unwrap();
        assert!(buckets.is_empty());
    }

    #[test]
    fn partition_delete_empty_returns_empty() {
        let buckets = partition_delete_coords(&[], 8).unwrap();
        assert!(buckets.is_empty());
    }

    #[test]
    fn partition_put_single_cell_one_bucket() {
        // prefix_bits=10, stride=1 → vshard == bucket == top 10 bits.
        // All-zero hilbert prefix → bucket 0 → vshard 0.
        let cells = vec![cell(0, 0xAA)];
        let buckets = partition_put_cells(&cells, 10).unwrap();
        assert_eq!(buckets.len(), 1);
        assert_eq!(buckets[0].vshard_id, 0);
        assert_eq!(buckets[0].representative_hilbert_prefix, 0);

        // The single cell must survive the encode/decode round trip intact.
        let blobs: Vec<Vec<u8>> = zerompk::from_msgpack(&buckets[0].cells_msgpack).unwrap();
        assert_eq!(blobs, vec![vec![0xAA]]);
    }

    #[test]
    fn partition_put_cells_by_tile_three_buckets() {
        // With prefix_bits=10 (stride=1), every distinct top-10-bit value
        // is its own vshard. Use three well-separated prefixes.
        //
        //  prefix 0x0000_0000_0000_0000 → bucket 0  → vshard 0
        //  prefix 0x0040_0000_0000_0000 → top 10 bits = 1 → vshard 1
        //  prefix 0x0080_0000_0000_0000 → top 10 bits = 2 → vshard 2
        let p0 = 0x0000_0000_0000_0000u64;
        let p1 = 0x0040_0000_0000_0000u64;
        let p2 = 0x0080_0000_0000_0000u64;

        let cells = vec![
            cell(p0, 0x01),
            cell(p1, 0x02),
            cell(p0, 0x03), // same bucket as first
            cell(p2, 0x04),
            cell(p1, 0x05), // same bucket as second
        ];

        let mut buckets = partition_put_cells(&cells, 10).unwrap();
        // Sort by vshard_id for deterministic assertion.
        buckets.sort_by_key(|b| b.vshard_id);
        assert_eq!(buckets.len(), 3);

        assert_eq!(buckets[0].vshard_id, expected_vshard(p0, 10));
        assert_eq!(buckets[1].vshard_id, expected_vshard(p1, 10));
        assert_eq!(buckets[2].vshard_id, expected_vshard(p2, 10));

        // Each bucket reports the prefix of the first cell routed to it.
        assert_eq!(buckets[0].representative_hilbert_prefix, p0);
        assert_eq!(buckets[1].representative_hilbert_prefix, p1);
        assert_eq!(buckets[2].representative_hilbert_prefix, p2);

        // Bucket 0 and 1 each have 2 cells, bucket 2 has 1; cells keep their
        // input order inside a bucket.
        let blobs0: Vec<Vec<u8>> = zerompk::from_msgpack(&buckets[0].cells_msgpack).unwrap();
        let blobs1: Vec<Vec<u8>> = zerompk::from_msgpack(&buckets[1].cells_msgpack).unwrap();
        let blobs2: Vec<Vec<u8>> = zerompk::from_msgpack(&buckets[2].cells_msgpack).unwrap();
        assert_eq!(blobs0, vec![vec![0x01], vec![0x03]]);
        assert_eq!(blobs1, vec![vec![0x02], vec![0x05]]);
        assert_eq!(blobs2, vec![vec![0x04]]);
    }

    #[test]
    fn partition_delete_coords_by_tile() {
        let p0 = 0x0000_0000_0000_0000u64;
        let p1 = 0x0040_0000_0000_0000u64;

        let coords = vec![coord(p0, 0xAA), coord(p1, 0xBB), coord(p0, 0xCC)];

        let mut buckets = partition_delete_coords(&coords, 10).unwrap();
        buckets.sort_by_key(|b| b.vshard_id);
        assert_eq!(buckets.len(), 2);
        assert_eq!(buckets[0].vshard_id, expected_vshard(p0, 10));
        assert_eq!(buckets[1].vshard_id, expected_vshard(p1, 10));

        // Each bucket reports the prefix of the first coord routed to it.
        assert_eq!(buckets[0].representative_hilbert_prefix, p0);
        assert_eq!(buckets[1].representative_hilbert_prefix, p1);

        let blobs0: Vec<Vec<u8>> = zerompk::from_msgpack(&buckets[0].coords_msgpack).unwrap();
        let blobs1: Vec<Vec<u8>> = zerompk::from_msgpack(&buckets[1].coords_msgpack).unwrap();
        assert_eq!(blobs0, vec![vec![0xAA], vec![0xCC]]);
        assert_eq!(blobs1, vec![vec![0xBB]]);
    }

    #[test]
    fn partition_put_invalid_prefix_bits_errors() {
        let cells = vec![cell(0, 0x01)];
        assert!(partition_put_cells(&cells, 0).is_err());
        assert!(partition_put_cells(&cells, 17).is_err());
    }

    #[test]
    fn partition_delete_invalid_prefix_bits_errors() {
        // The delete path routes through the same `array_vshard_for_tile`
        // call, so it must reject the same out-of-range granularities.
        let coords = vec![coord(0, 0x01)];
        assert!(partition_delete_coords(&coords, 0).is_err());
        assert!(partition_delete_coords(&coords, 17).is_err());
    }
}