pub(crate) trait RangePartitionedKeySpace {
fn partitions(&self) -> usize;
fn partition_first_key(&self, partition: usize) -> &[u8];
}
fn partition_point<T: RangePartitionedKeySpace, P: Fn(&[u8]) -> bool>(
keyspace: &T,
pred: P,
) -> usize {
if keyspace.partitions() == 0 {
return 0;
}
let mut low = 0;
let mut high = keyspace.partitions() - 1;
let mut part_point = 0;
while low <= high {
let mid = low + (high - low) / 2;
let mid_part_first_key = keyspace.partition_first_key(mid);
if pred(mid_part_first_key) {
low = mid + 1;
part_point = mid + 1;
} else if mid > low {
high = mid - 1;
} else {
break;
}
}
part_point
}
pub(crate) fn first_partition_including_or_after_key<T: RangePartitionedKeySpace>(
keyspace: &T,
key: &[u8],
) -> usize {
first_partition_including_key(keyspace, key).unwrap_or(0)
}
pub(crate) fn first_partition_including_key<T: RangePartitionedKeySpace>(
keyspace: &T,
key: &[u8],
) -> Option<usize> {
let part_point = partition_point(keyspace, |first_key| first_key < key);
if part_point > 0 {
return Some(part_point - 1);
}
if keyspace.partition_first_key(0) == key {
return Some(0);
}
None
}
pub(crate) fn last_partition_including_key<T: RangePartitionedKeySpace>(
keyspace: &T,
key: &[u8],
) -> Option<usize> {
let part_point = partition_point(keyspace, |first_key| first_key <= key);
if part_point == 0 {
return None;
}
Some(part_point - 1)
}
#[cfg(test)]
mod tests {
use crate::partitioned_keyspace::{
first_partition_including_key, last_partition_including_key, partition_point,
RangePartitionedKeySpace,
};
use rstest::rstest;
#[test]
fn test_partition_point() {
for len in 0..20 {
let mut keyspace = TestKeyspace {
partitions: vec![b"x"; len],
};
assert_eq!(0, partition_point(&keyspace, |k| k.is_empty()));
for i in 0..len {
keyspace.partitions[i] = b"";
let partition_point = partition_point(&keyspace, |k| k.is_empty());
assert_eq!(i + 1, partition_point);
}
}
}
struct FirstPartitionIncludingKeyTestCase {
first_keys: Vec<&'static [u8]>,
}
#[rstest]
#[case::one_partition(FirstPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa"]
})]
#[case::two_partitions(FirstPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb"]
})]
#[case::three_partitions(FirstPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb", b"cccccc"]
})]
#[case::five_partitions(FirstPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb", b"cccccccc", b"ddd", b"eeeee"]
})]
#[case::five_partitions_same_first_key(FirstPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb", b"bbbb", b"bbbb", b"bbbb"]
})]
#[case::all_same_first_key(FirstPartitionIncludingKeyTestCase{
first_keys: vec![b"bbbb", b"bbbb", b"bbbb"]
})]
fn test_first_partition_including_key(#[case] case: FirstPartitionIncludingKeyTestCase) {
let first_keys = &case.first_keys;
let keyspace = TestKeyspace {
partitions: case.first_keys.clone(),
};
let found_partition = first_partition_including_key(&keyspace, b"\x00\x00\x00");
assert_eq!(None, found_partition);
let mut prev_key = None;
let mut expected_found_partition = 0;
for i in 0..first_keys.len() {
let key = first_keys[i];
let found_partition = first_partition_including_key(&keyspace, key);
expected_found_partition = match prev_key {
Some(prev_key) if prev_key == key => expected_found_partition,
_ => {
if i == 0 {
0
} else {
i - 1
}
}
};
assert_eq!(Some(expected_found_partition), found_partition);
prev_key = Some(key);
if i == first_keys.len() - 1 || key != first_keys[i + 1] {
let key = [key, b"bla"].concat();
let found_partition = first_partition_including_key(&keyspace, &key);
assert_eq!(Some(i), found_partition);
}
}
}
struct LastPartitionIncludingKeyTestCase {
first_keys: Vec<&'static [u8]>,
}
#[rstest]
#[case::one_partition(LastPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa"]
})]
#[case::two_partitions(LastPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb"]
})]
#[case::three_partitions(LastPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb", b"cccccc"]
})]
#[case::five_partitions(LastPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb", b"cccccccc", b"ddd", b"eeeee"]
})]
#[case::five_partitions_same_first_key(LastPartitionIncludingKeyTestCase{
first_keys: vec![b"aaaa", b"bbbb", b"bbbb", b"bbbb", b"bbbb"]
})]
#[case::all_same_first_key(LastPartitionIncludingKeyTestCase{
first_keys: vec![b"bbbb", b"bbbb", b"bbbb"]
})]
fn test_last_partition_including_key(#[case] case: LastPartitionIncludingKeyTestCase) {
let first_keys = &case.first_keys;
let keyspace = TestKeyspace {
partitions: case.first_keys.clone(),
};
let found_partition = last_partition_including_key(&keyspace, b"\x00\x00\x00");
assert_eq!(None, found_partition);
for i in 0..first_keys.len() {
let key = first_keys[i];
let found_partition = last_partition_including_key(&keyspace, key);
let mut expected_found_partition = i;
for p in i..first_keys.len() {
if keyspace.partition_first_key(p) == key {
expected_found_partition = p;
}
}
assert_eq!(Some(expected_found_partition), found_partition);
if i == first_keys.len() - 1 || key != first_keys[i + 1] {
let key = [key, b"bla"].concat();
let found_partition = last_partition_including_key(&keyspace, &key);
assert_eq!(Some(i), found_partition);
}
}
}
struct TestKeyspace<'a> {
partitions: Vec<&'a [u8]>,
}
impl RangePartitionedKeySpace for TestKeyspace<'_> {
fn partitions(&self) -> usize {
self.partitions.len()
}
fn partition_first_key(&self, partition: usize) -> &[u8] {
self.partitions[partition]
}
}
}