use std::collections::HashMap;
use crate::error::{ClusterError, Result};
use super::routing::array_vshard_for_tile;
/// One per-vShard group of put cells, as produced by [`partition_put_cells`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutBucket {
    /// Identifier of the vShard that every cell in this bucket maps to.
    pub vshard_id: u32,
    /// Hilbert prefix of one cell in the bucket. `partition_put_cells` stores
    /// the prefix of the first input cell that mapped to this vShard.
    pub representative_hilbert_prefix: u64,
    /// MessagePack encoding of the bucket's cell blobs (a `Vec<Vec<u8>>`).
    pub cells_msgpack: Vec<u8>,
}
/// One per-vShard group of delete coordinates, as produced by
/// [`partition_delete_coords`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteBucket {
    /// Identifier of the vShard that every coordinate in this bucket maps to.
    pub vshard_id: u32,
    /// Hilbert prefix of one coordinate in the bucket. `partition_delete_coords`
    /// stores the prefix of the first input entry that mapped to this vShard.
    pub representative_hilbert_prefix: u64,
    /// MessagePack encoding of the bucket's coordinate blobs (a `Vec<Vec<u8>>`).
    pub coords_msgpack: Vec<u8>,
}
/// Groups put cells by destination vShard and encodes each group as one
/// MessagePack payload.
///
/// Each input entry is `(hilbert_prefix, cell_bytes)`. The vShard for an entry
/// is obtained from `array_vshard_for_tile(hilbert_prefix, prefix_bits)`.
///
/// The returned buckets are sorted by ascending `vshard_id`, so identical
/// input always yields identical output. Within a bucket the cell blobs keep
/// their input order, and `representative_hilbert_prefix` is the prefix of the
/// first input cell that mapped to that vShard.
///
/// An empty `cells` slice returns an empty vector without consulting the
/// routing function, so `prefix_bits` is not validated in that case.
///
/// # Errors
///
/// Propagates any error from `array_vshard_for_tile` (the tests in this file
/// expect `prefix_bits` of 0 or 17 to be rejected) and returns
/// `ClusterError::Codec` if a bucket cannot be encoded.
pub fn partition_put_cells(cells: &[(u64, Vec<u8>)], prefix_bits: u8) -> Result<Vec<PutBucket>> {
    if cells.is_empty() {
        return Ok(Vec::new());
    }

    let mut buckets: HashMap<u32, (u64, Vec<Vec<u8>>)> = HashMap::new();
    for (hilbert_prefix, cell_bytes) in cells {
        let vshard_id = array_vshard_for_tile(*hilbert_prefix, prefix_bits)?;
        // The first cell routed to a vShard supplies the bucket's representative prefix.
        buckets
            .entry(vshard_id)
            .or_insert_with(|| (*hilbert_prefix, Vec::new()))
            .1
            .push(cell_bytes.clone());
    }

    // HashMap iteration order is arbitrary; sort by vShard id (keys are unique,
    // so an unstable sort is fully deterministic) before building the output.
    let mut grouped: Vec<(u32, (u64, Vec<Vec<u8>>))> = buckets.into_iter().collect();
    grouped.sort_unstable_by_key(|entry| entry.0);

    let mut out = Vec::with_capacity(grouped.len());
    for (vshard_id, (representative, cell_blobs)) in grouped {
        out.push(PutBucket {
            vshard_id,
            representative_hilbert_prefix: representative,
            cells_msgpack: encode_blob_vec(&cell_blobs)?,
        });
    }
    Ok(out)
}
/// Groups delete coordinates by destination vShard and encodes each group as
/// one MessagePack payload.
///
/// Each input entry is `(hilbert_prefix, coord_bytes)`. The vShard for an
/// entry is obtained from `array_vshard_for_tile(hilbert_prefix, prefix_bits)`.
///
/// The returned buckets are sorted by ascending `vshard_id`, so identical
/// input always yields identical output. Within a bucket the coordinate blobs
/// keep their input order, and `representative_hilbert_prefix` is the prefix
/// of the first input entry that mapped to that vShard.
///
/// An empty `coords` slice returns an empty vector without consulting the
/// routing function, so `prefix_bits` is not validated in that case.
///
/// # Errors
///
/// Propagates any error from `array_vshard_for_tile` and returns
/// `ClusterError::Codec` if a bucket cannot be encoded.
pub fn partition_delete_coords(
    coords: &[(u64, Vec<u8>)],
    prefix_bits: u8,
) -> Result<Vec<DeleteBucket>> {
    if coords.is_empty() {
        return Ok(Vec::new());
    }

    let mut buckets: HashMap<u32, (u64, Vec<Vec<u8>>)> = HashMap::new();
    for (hilbert_prefix, coord_bytes) in coords {
        let vshard_id = array_vshard_for_tile(*hilbert_prefix, prefix_bits)?;
        // The first entry routed to a vShard supplies the bucket's representative prefix.
        buckets
            .entry(vshard_id)
            .or_insert_with(|| (*hilbert_prefix, Vec::new()))
            .1
            .push(coord_bytes.clone());
    }

    // HashMap iteration order is arbitrary; sort by vShard id (keys are unique,
    // so an unstable sort is fully deterministic) before building the output.
    let mut grouped: Vec<(u32, (u64, Vec<Vec<u8>>))> = buckets.into_iter().collect();
    grouped.sort_unstable_by_key(|entry| entry.0);

    let mut out = Vec::with_capacity(grouped.len());
    for (vshard_id, (representative, coord_blobs)) in grouped {
        out.push(DeleteBucket {
            vshard_id,
            representative_hilbert_prefix: representative,
            coords_msgpack: encode_blob_vec(&coord_blobs)?,
        });
    }
    Ok(out)
}
/// Serializes a list of opaque byte blobs into a single MessagePack buffer.
///
/// The slice is copied into an owned `Vec<Vec<u8>>` before being handed to
/// `zerompk::to_msgpack_vec`.
///
/// # Errors
///
/// Returns `ClusterError::Codec` carrying the encoder's error text when
/// serialization fails.
fn encode_blob_vec(blobs: &[Vec<u8>]) -> Result<Vec<u8>> {
    let owned: Vec<Vec<u8>> = blobs.to_vec();
    match zerompk::to_msgpack_vec(&owned) {
        Ok(encoded) => Ok(encoded),
        Err(e) => Err(ClusterError::Codec {
            detail: format!("partition encode blob vec: {e}"),
        }),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a put entry whose payload is the single byte `tag`.
    fn cell(prefix: u64, tag: u8) -> (u64, Vec<u8>) {
        let payload = vec![tag];
        (prefix, payload)
    }

    /// Builds a delete entry whose payload is the single byte `tag`.
    fn coord(prefix: u64, tag: u8) -> (u64, Vec<u8>) {
        let payload = vec![tag];
        (prefix, payload)
    }

    /// Asks the routing function directly which vShard a prefix maps to.
    fn expected_vshard(prefix: u64, prefix_bits: u8) -> u32 {
        array_vshard_for_tile(prefix, prefix_bits).unwrap()
    }

    #[test]
    fn partition_put_empty_returns_empty() {
        assert!(partition_put_cells(&[], 8).unwrap().is_empty());
    }

    #[test]
    fn partition_delete_empty_returns_empty() {
        assert!(partition_delete_coords(&[], 8).unwrap().is_empty());
    }

    #[test]
    fn partition_put_single_cell_one_bucket() {
        let input = vec![cell(0, 0xAA)];
        let buckets = partition_put_cells(&input, 10).unwrap();
        assert_eq!(buckets.len(), 1);

        let only = &buckets[0];
        assert_eq!(only.vshard_id, 0);
        assert_eq!(only.representative_hilbert_prefix, 0);
    }

    #[test]
    fn partition_put_cells_by_tile_three_buckets() {
        let tile_a = 0x0000_0000_0000_0000u64;
        let tile_b = 0x0040_0000_0000_0000u64;
        let tile_c = 0x0080_0000_0000_0000u64;

        // Interleave the tiles so grouping cannot rely on adjacency.
        let input = vec![
            cell(tile_a, 0x01),
            cell(tile_b, 0x02),
            cell(tile_a, 0x03),
            cell(tile_c, 0x04),
            cell(tile_b, 0x05),
        ];

        let mut buckets = partition_put_cells(&input, 10).unwrap();
        buckets.sort_by_key(|b| b.vshard_id);
        assert_eq!(buckets.len(), 3);

        for (bucket, &tile) in buckets.iter().zip(&[tile_a, tile_b, tile_c]) {
            assert_eq!(bucket.vshard_id, expected_vshard(tile, 10));
        }

        // Decode every payload and compare the per-bucket blob counts.
        let blob_counts: Vec<usize> = buckets
            .iter()
            .map(|bucket| {
                let blobs: Vec<Vec<u8>> = zerompk::from_msgpack(&bucket.cells_msgpack).unwrap();
                blobs.len()
            })
            .collect();
        assert_eq!(blob_counts, vec![2usize, 2, 1]);
    }

    #[test]
    fn partition_delete_coords_by_tile() {
        let tile_a = 0x0000_0000_0000_0000u64;
        let tile_b = 0x0040_0000_0000_0000u64;
        let input = vec![
            coord(tile_a, 0xAA),
            coord(tile_b, 0xBB),
            coord(tile_a, 0xCC),
        ];

        let mut buckets = partition_delete_coords(&input, 10).unwrap();
        buckets.sort_by_key(|b| b.vshard_id);
        assert_eq!(buckets.len(), 2);

        for (bucket, &tile) in buckets.iter().zip(&[tile_a, tile_b]) {
            assert_eq!(bucket.vshard_id, expected_vshard(tile, 10));
        }

        // Decode every payload and compare the per-bucket blob counts.
        let blob_counts: Vec<usize> = buckets
            .iter()
            .map(|bucket| {
                let blobs: Vec<Vec<u8>> = zerompk::from_msgpack(&bucket.coords_msgpack).unwrap();
                blobs.len()
            })
            .collect();
        assert_eq!(blob_counts, vec![2usize, 1]);
    }

    #[test]
    fn partition_put_invalid_prefix_bits_errors() {
        let input = vec![cell(0, 0x01)];
        for &bad_bits in &[0u8, 17] {
            assert!(partition_put_cells(&input, bad_bits).is_err());
        }
    }
}