use crate::models::effective_partition_key::EffectivePartitionKey;
use crate::models::partition_key_range::{PartitionKeyRange, PartitionKeyRangeStatus};
use crate::models::ETag;
use std::collections::{HashMap, HashSet};
use std::ops::Range;
/// Errors reported when a set of partition key ranges cannot form a valid routing map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub(crate) enum RoutingMapError {
    /// A live range starts before the previous live range ends.
    OverlappingRanges,
    /// The live ranges leave a gap, start after the minimum EPK, or stop before the
    /// maximum EPK.
    IncompleteRanges,
}

impl std::fmt::Display for RoutingMapError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant maps to a fixed message, so no formatting machinery is needed.
        let message = match self {
            RoutingMapError::OverlappingRanges => "Partition key ranges overlap",
            RoutingMapError::IncompleteRanges => {
                "Partition key ranges do not cover the full EPK space"
            }
        };
        f.write_str(message)
    }
}

impl std::error::Error for RoutingMapError {}
/// Maps effective partition keys (EPKs) of a container to its partition key ranges.
///
/// Maps produced by `try_create` / `try_combine` hold only live ranges (ranges whose id is
/// not named as a parent by another range). Those ranges are sorted and validated to cover
/// `EffectivePartitionKey::min()` .. `EffectivePartitionKey::max()` with no gaps or overlaps.
#[derive(Debug, Clone)]
pub(crate) struct ContainerRoutingMap {
/// Live ranges keyed by `id`; built from the same ranges as `ordered_ranges`.
range_by_id: HashMap<String, PartitionKeyRange>,
/// Live ranges in sorted order; lookups binary-search this by `min_inclusive`.
ordered_ranges: Vec<PartitionKeyRange>,
/// Ids that appeared in some range's `parents` list; ranges with these ids are excluded.
gone_ranges: HashSet<String>,
/// Largest id (parsed as `i32`) among non-offline live ranges, or `INVALID_PK_RANGE_ID`.
highest_non_offline_pk_range_id: i32,
/// ETag passed to `try_create`; `try_combine` copies it to the combined map unchanged.
pub etag: Option<ETag>,
/// Value passed to `try_create` / `try_combine`. Presumably the `If-None-Match` token for
/// the next change-feed read of partition key ranges — confirm against callers.
pub change_feed_next_if_none_match: Option<String>,
}
/// Sentinel reported by `highest_non_offline_pk_range_id` when no non-offline range has an
/// id that parses as `i32` (including for an empty map).
const INVALID_PK_RANGE_ID: i32 = -1;
impl ContainerRoutingMap {
    /// Creates a routing map that holds no partition key ranges.
    ///
    /// Lookups on the returned map yield nothing, and
    /// [`highest_non_offline_pk_range_id`](Self::highest_non_offline_pk_range_id) returns
    /// `INVALID_PK_RANGE_ID`.
    pub fn empty() -> Self {
        Self {
            range_by_id: HashMap::new(),
            ordered_ranges: Vec::new(),
            gone_ranges: HashSet::new(),
            highest_non_offline_pk_range_id: INVALID_PK_RANGE_ID,
            etag: None,
            change_feed_next_if_none_match: None,
        }
    }

    /// Builds a routing map from a complete set of partition key ranges.
    ///
    /// Every id listed in some range's `parents` is recorded as gone, and ranges carrying a
    /// gone id are dropped. The remaining ranges are sorted and must tile the EPK space from
    /// `EffectivePartitionKey::min()` to `EffectivePartitionKey::max()` exactly.
    ///
    /// Returns `Ok(None)` when `ranges` is empty or every range in it is gone.
    ///
    /// # Errors
    ///
    /// * [`RoutingMapError::IncompleteRanges`] if part of the EPK space is left uncovered.
    /// * [`RoutingMapError::OverlappingRanges`] if two live ranges overlap.
    pub fn try_create(
        ranges: Vec<PartitionKeyRange>,
        etag: Option<ETag>,
        change_feed_next_if_none_match: Option<String>,
    ) -> Result<Option<Self>, RoutingMapError> {
        if ranges.is_empty() {
            return Ok(None);
        }
        let gone = Self::parent_ids(&ranges);
        let mut live: Vec<PartitionKeyRange> = ranges
            .into_iter()
            .filter(|r| !gone.contains(&r.id))
            .collect();
        if live.is_empty() {
            return Ok(None);
        }
        live.sort();
        let (highest_non_offline_pk_range_id, range_by_id) =
            Self::validate_and_build_index(&live)?;
        Ok(Some(Self {
            range_by_id,
            ordered_ranges: live,
            gone_ranges: gone,
            highest_non_offline_pk_range_id,
            etag,
            change_feed_next_if_none_match,
        }))
    }

    /// Returns the live range containing `epk`, i.e. `min_inclusive <= epk < max_exclusive`.
    ///
    /// Returns `None` for an empty map or when `epk` is not below the last range's
    /// `max_exclusive`.
    pub fn get_range_by_effective_partition_key(
        &self,
        epk: &EffectivePartitionKey,
    ) -> Option<&PartitionKeyRange> {
        let first = self.ordered_ranges.first()?;
        if epk.as_str().is_empty() {
            // The empty EPK is the smallest key, so it always falls in the first range.
            return Some(first);
        }
        let range = &self.ordered_ranges[self.find_range_index(epk)];
        // The binary search only establishes the lower bound; the upper bound still has to
        // be checked because an EPK at or past the last range's end has no owner.
        (range.min_inclusive <= *epk && *epk < range.max_exclusive).then_some(range)
    }

    /// Returns the live range with the given id, if any.
    pub fn range(&self, id: &str) -> Option<&PartitionKeyRange> {
        self.range_by_id.get(id)
    }

    /// Returns all live ranges in sorted order.
    pub fn ranges(&self) -> &[PartitionKeyRange] {
        &self.ordered_ranges
    }

    /// Returns `true` if `partition_key_range_id` was named as a parent of some range seen
    /// while building this map.
    pub fn is_gone(&self, partition_key_range_id: &str) -> bool {
        self.gone_ranges.contains(partition_key_range_id)
    }

    /// Returns the live ranges covering the EPK interval `epk_range` (end exclusive), in
    /// sorted order.
    ///
    /// The result begins with the range containing `epk_range.start` and continues with
    /// every following range whose `min_inclusive` is strictly below `epk_range.end`.
    /// An empty map yields an empty vector.
    pub fn get_overlapping_ranges(
        &self,
        epk_range: Range<&EffectivePartitionKey>,
    ) -> Vec<&PartitionKeyRange> {
        if self.ordered_ranges.is_empty() {
            return Vec::new();
        }
        let Range {
            start: min_epk,
            end: max_epk,
        } = epk_range;
        let candidates = &self.ordered_ranges[self.find_range_index(min_epk)..];
        // Ranges are sorted and contiguous, so the ones starting before `max_epk` form a prefix.
        let count = candidates.partition_point(|r| r.min_inclusive < *max_epk);
        candidates[..count].iter().collect()
    }

    /// Returns the largest id (parsed as `i32`) among non-offline live ranges, or
    /// `INVALID_PK_RANGE_ID` if there is none.
    pub fn highest_non_offline_pk_range_id(&self) -> i32 {
        self.highest_non_offline_pk_range_id
    }

    /// Builds a new map by overlaying `new_ranges` onto this map's live ranges.
    ///
    /// Ids listed as parents in `new_ranges`, plus ids this map already considers gone, are
    /// dropped. A new range replaces an existing range with the same id. The result keeps
    /// this map's `etag` and takes the supplied `change_feed_next_if_none_match`.
    ///
    /// Returns `Ok(None)` when no ranges remain or the remaining ranges do not cover the
    /// whole EPK space (for example, only some children of a split have arrived).
    ///
    /// # Errors
    ///
    /// [`RoutingMapError::OverlappingRanges`] if the combined ranges overlap.
    pub fn try_combine(
        &self,
        new_ranges: Vec<PartitionKeyRange>,
        change_feed_next_if_none_match: Option<String>,
    ) -> Result<Option<Self>, RoutingMapError> {
        let mut combined_gone = Self::parent_ids(&new_ranges);
        combined_gone.extend(self.gone_ranges.iter().cloned());

        let mut merged: HashMap<String, PartitionKeyRange> = self
            .range_by_id
            .iter()
            .filter(|(id, _)| !combined_gone.contains(*id))
            .map(|(id, r)| (id.clone(), r.clone()))
            .collect();
        for range in new_ranges {
            if !combined_gone.contains(&range.id) {
                merged.insert(range.id.clone(), range);
            }
        }
        if merged.is_empty() {
            return Ok(None);
        }

        let mut sorted: Vec<PartitionKeyRange> = merged.into_values().collect();
        sorted.sort();
        let (highest_non_offline_pk_range_id, range_by_id) =
            match Self::validate_and_build_index(&sorted) {
                Ok(result) => result,
                // A partial update is expected while a split/merge is still propagating;
                // the caller should fall back to a full refresh rather than fail.
                Err(RoutingMapError::IncompleteRanges) => return Ok(None),
                Err(e) => return Err(e),
            };
        Ok(Some(Self {
            range_by_id,
            ordered_ranges: sorted,
            gone_ranges: combined_gone,
            highest_non_offline_pk_range_id,
            etag: self.etag.clone(),
            change_feed_next_if_none_match,
        }))
    }

    /// Collects every id named in the `parents` list of any range in `ranges`.
    fn parent_ids(ranges: &[PartitionKeyRange]) -> HashSet<String> {
        ranges
            .iter()
            .filter_map(|r| r.parents.as_ref())
            .flat_map(|parents| parents.iter().cloned())
            .collect()
    }

    /// Returns the index of the last range whose `min_inclusive` is `<= epk`.
    ///
    /// # Panics
    ///
    /// Panics if `epk` sorts before the first range. Validation requires the first range to
    /// start at `EffectivePartitionKey::min()`, so this cannot happen on a non-empty map
    /// built by `try_create` / `try_combine`; callers must not use it on an empty map.
    fn find_range_index(&self, epk: &EffectivePartitionKey) -> usize {
        match self
            .ordered_ranges
            .binary_search_by(|r| r.min_inclusive.cmp(epk))
        {
            Ok(i) => i,
            Err(i) if i > 0 => i - 1,
            Err(_) => unreachable!("EPK before first range; constructor guarantees full coverage"),
        }
    }

    /// Checks that `sorted` tiles the EPK space exactly, then builds the id index and
    /// computes the highest non-offline range id.
    ///
    /// # Errors
    ///
    /// * [`RoutingMapError::IncompleteRanges`] on a gap, or if the ranges do not start at
    ///   `EffectivePartitionKey::min()` and end at `EffectivePartitionKey::max()`.
    /// * [`RoutingMapError::OverlappingRanges`] if a range starts before its predecessor ends.
    fn validate_and_build_index(
        sorted: &[PartitionKeyRange],
    ) -> Result<(i32, HashMap<String, PartitionKeyRange>), RoutingMapError> {
        let min_epk: &EffectivePartitionKey = &EffectivePartitionKey::min();
        let max_epk: &EffectivePartitionKey = &EffectivePartitionKey::max();

        // Compare boundaries with `EffectivePartitionKey`'s ordering — the same ordering
        // `find_range_index` searches with — instead of raw strings. That ordering treats
        // trailing zero padding as insignificant, so boundaries such as "06AB…AB" and
        // "06AB…AB000…0" are recognised as contiguous rather than as a gap or overlap.
        let mut expected_min: &EffectivePartitionKey = min_epk;
        for range in sorted {
            match range.min_inclusive.cmp(expected_min) {
                std::cmp::Ordering::Greater => return Err(RoutingMapError::IncompleteRanges),
                std::cmp::Ordering::Less => return Err(RoutingMapError::OverlappingRanges),
                std::cmp::Ordering::Equal => {}
            }
            expected_min = &range.max_exclusive;
        }
        if expected_min.cmp(max_epk) != std::cmp::Ordering::Equal {
            return Err(RoutingMapError::IncompleteRanges);
        }

        let range_by_id: HashMap<String, PartitionKeyRange> =
            sorted.iter().map(|r| (r.id.clone(), r.clone())).collect();
        // Ids that do not parse as i32 are ignored rather than treated as an error.
        let highest_non_offline_pk_range_id = sorted
            .iter()
            .filter(|r| r.status != PartitionKeyRangeStatus::Offline)
            .filter_map(|r| r.id.parse::<i32>().ok())
            .max()
            .unwrap_or(INVALID_PK_RANGE_ID);
        Ok((highest_non_offline_pk_range_id, range_by_id))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// Builds an EPK from a hex string.
    fn epk(s: &str) -> EffectivePartitionKey {
        EffectivePartitionKey::from(s.to_string())
    }

    /// Builds a range with the given id, bounds and parents; other fields are placeholders.
    fn make_range(
        id: &str,
        min_inclusive: &str,
        max_exclusive: &str,
        parents: Option<Vec<String>>,
    ) -> PartitionKeyRange {
        PartitionKeyRange {
            id: id.into(),
            resource_id: None,
            self_link: None,
            etag: None,
            timestamp: None,
            min_inclusive: min_inclusive.into(),
            max_exclusive: max_exclusive.into(),
            rid_prefix: None,
            throughput_fraction: 0.0,
            target_throughput: None,
            status: Default::default(),
            lsn: 0,
            parents,
            owned_archival_pk_range_ids: None,
        }
    }

    /// Builds a routing map from ranges that are expected to be valid and non-empty.
    fn build(ranges: Vec<PartitionKeyRange>) -> ContainerRoutingMap {
        ContainerRoutingMap::try_create(ranges, None, None)
            .expect("ranges should be valid")
            .expect("ranges should produce a map")
    }

    /// Collects range ids in iteration order.
    fn ids<'a>(ranges: impl IntoIterator<Item = &'a PartitionKeyRange>) -> Vec<&'a str> {
        ranges.into_iter().map(|r| r.id.as_str()).collect()
    }

    /// Returns the id of the range containing `key`, panicking if there is none.
    fn lookup<'a>(map: &'a ContainerRoutingMap, key: &str) -> &'a str {
        map.get_range_by_effective_partition_key(&epk(key))
            .unwrap_or_else(|| panic!("EPK {:?} should map to a range", key))
            .id
            .as_str()
    }

    /// Returns the ids of the ranges overlapping `start..end`.
    fn overlap_ids<'a>(map: &'a ContainerRoutingMap, start: &str, end: &str) -> Vec<&'a str> {
        ids(map.get_overlapping_ranges(&epk(start)..&epk(end)))
    }

    fn single_range() -> Vec<PartitionKeyRange> {
        vec![make_range("0", "", "FF", None)]
    }

    fn three_ranges() -> Vec<PartitionKeyRange> {
        vec![
            make_range("1", "", "3F", Some(vec!["0".into()])),
            make_range("2", "3F", "7F", Some(vec!["0".into()])),
            make_range("3", "7F", "FF", Some(vec!["0".into()])),
        ]
    }

    #[test]
    fn create_single_range() {
        let map = build(single_range());
        let ranges = map.ranges();
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0].id, "0");
        assert_eq!(ranges[0].min_inclusive, "");
        assert_eq!(ranges[0].max_exclusive, "FF");
    }

    #[test]
    fn create_three_ranges() {
        let map = build(three_ranges());
        assert_eq!(ids(map.ranges()), ["1", "2", "3"]);
        let expected = [("", "3F"), ("3F", "7F"), ("7F", "FF")];
        for (range, (min, max)) in map.ranges().iter().zip(expected) {
            assert_eq!(range.min_inclusive, min);
            assert_eq!(range.max_exclusive, max);
        }
    }

    #[test]
    fn lookup_in_single_range() {
        let map = build(single_range());
        assert_eq!(lookup(&map, "7A"), "0");
    }

    #[test]
    fn lookup_in_three_ranges() {
        let map = build(three_ranges());
        let cases = [
            ("", "1"),
            ("20", "1"),
            ("3F", "2"),
            ("50", "2"),
            ("7F", "3"),
            ("A0", "3"),
        ];
        for (key, expected) in cases {
            assert_eq!(lookup(&map, key), expected, "EPK {:?}", key);
        }
    }

    #[test]
    fn lookup_at_max_returns_none() {
        let map = build(three_ranges());
        assert!(map.get_range_by_effective_partition_key(&epk("FF")).is_none());
    }

    #[test]
    fn lookup_by_id() {
        let map = build(three_ranges());
        let r = map.range("2").unwrap();
        assert_eq!(r.id, "2");
        assert_eq!(r.min_inclusive, "3F");
        assert_eq!(r.max_exclusive, "7F");
        // "0" is the parent of every range, so it is gone and not indexed.
        assert!(map.range("0").is_none());
    }

    #[test]
    fn incomplete_range_returns_error() {
        let ranges = vec![make_range("0", "", "7F", None)];
        let result = ContainerRoutingMap::try_create(ranges, None, None);
        assert!(matches!(result, Err(RoutingMapError::IncompleteRanges)));
    }

    #[test]
    fn overlapping_ranges_returns_error() {
        let ranges = vec![
            make_range("0", "", "80", None),
            // Starts at 7F, before the previous range ends at 80.
            make_range("1", "7F", "FF", None),
        ];
        let result = ContainerRoutingMap::try_create(ranges, None, None);
        assert!(matches!(result, Err(RoutingMapError::OverlappingRanges)));
    }

    #[test]
    fn filters_gone_parent_ranges() {
        let mut ranges = three_ranges();
        ranges.push(make_range("0", "", "FF", None));
        let map = build(ranges);
        assert_eq!(ids(map.ranges()), ["1", "2", "3"]);
        assert!(map.range("0").is_none());
    }

    #[test]
    fn all_ranges_gone_returns_none() {
        let ranges = vec![make_range("0", "", "FF", Some(vec!["0".into()]))];
        let result = ContainerRoutingMap::try_create(ranges, None, None).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn is_gone_tracks_parent_ranges() {
        let map = build(three_ranges());
        assert!(map.is_gone("0"));
        assert!(!map.is_gone("1"));
        assert!(!map.is_gone("2"));
        assert!(!map.is_gone("3"));
    }

    #[test]
    fn get_overlapping_ranges_full_span() {
        let map = build(three_ranges());
        assert_eq!(overlap_ids(&map, "", "FF"), ["1", "2", "3"]);
    }

    #[test]
    fn get_overlapping_ranges_partial() {
        let map = build(three_ranges());
        assert_eq!(overlap_ids(&map, "30", "50"), ["1", "2"]);
    }

    #[test]
    fn get_overlapping_ranges_single() {
        let map = build(three_ranges());
        assert_eq!(overlap_ids(&map, "40", "50"), ["2"]);
    }

    #[test]
    fn get_overlapping_ranges_end_is_exclusive() {
        let map = build(three_ranges());
        // Range "2" starts exactly at the exclusive end, so it is not included.
        assert_eq!(overlap_ids(&map, "30", "3F"), ["1"]);
    }

    #[test]
    fn empty_input_returns_none() {
        let result = ContainerRoutingMap::try_create(vec![], None, None).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn empty_map_returns_nothing() {
        let map = ContainerRoutingMap::empty();
        assert!(map.ranges().is_empty());
        assert!(map.get_range_by_effective_partition_key(&epk("50")).is_none());
        assert!(map.get_overlapping_ranges(&epk("")..&epk("FF")).is_empty());
        assert_eq!(map.highest_non_offline_pk_range_id(), INVALID_PK_RANGE_ID);
        assert!(!map.is_gone("0"));
    }

    #[test]
    fn try_combine_split_produces_valid_map() {
        let map = build(single_range());
        let new_ranges = vec![
            make_range("1", "", "7F", Some(vec!["0".into()])),
            make_range("2", "7F", "FF", Some(vec!["0".into()])),
        ];
        let merged = map
            .try_combine(new_ranges, Some("new-etag".into()))
            .unwrap()
            .unwrap();
        assert_eq!(ids(merged.ranges()), ["1", "2"]);
        assert!(merged.is_gone("0"));
        assert_eq!(lookup(&merged, "30"), "1");
        assert_eq!(lookup(&merged, "A0"), "2");
        assert_eq!(
            merged.change_feed_next_if_none_match.as_deref(),
            Some("new-etag")
        );
    }

    #[test]
    fn try_combine_keeps_untouched_ranges() {
        let map = build(three_ranges());
        let new_ranges = vec![
            make_range("4", "3F", "5F", Some(vec!["2".into()])),
            make_range("5", "5F", "7F", Some(vec!["2".into()])),
        ];
        let merged = map.try_combine(new_ranges, None).unwrap().unwrap();
        assert_eq!(ids(merged.ranges()), ["1", "4", "5", "3"]);
        assert!(merged.is_gone("0"));
        assert!(merged.is_gone("2"));
        assert_eq!(lookup(&merged, "50"), "4");
        assert_eq!(lookup(&merged, "60"), "5");
    }

    #[test]
    fn try_combine_incomplete_returns_none() {
        let map = build(single_range());
        let new_ranges = vec![make_range("1", "", "7F", Some(vec!["0".into()]))];
        let result = map.try_combine(new_ranges, Some("etag".into())).unwrap();
        assert!(result.is_none(), "Incomplete merge should return None");
    }

    #[test]
    fn try_combine_overlapping_returns_error() {
        let map = build(single_range());
        let new_ranges = vec![
            make_range("1", "", "80", Some(vec!["0".into()])),
            make_range("2", "7F", "FF", Some(vec!["0".into()])),
        ];
        let result = map.try_combine(new_ranges, Some("etag".into()));
        assert!(matches!(result, Err(RoutingMapError::OverlappingRanges)));
    }

    #[test]
    fn epk_cmp_equal_strings() {
        assert_eq!(epk("06AB34CF").cmp(&epk("06AB34CF")), Ordering::Equal);
    }

    #[test]
    fn epk_cmp_shorter_less_than_longer_nonzero_suffix() {
        assert_eq!(
            epk("06AB34CF").cmp(&epk("06AB34CF11223344")),
            Ordering::Less
        );
    }

    #[test]
    fn epk_cmp_prefix_with_partial_zero_suffix_is_equal() {
        assert_eq!(
            epk("06AB34CF").cmp(&epk("06AB34CF00000000")),
            Ordering::Equal
        );
    }

    #[test]
    fn epk_cmp_prefix_with_zero_suffix_is_equal() {
        assert_eq!(
            epk("06AB34CFE4E482236BCACBBF50E234AB").cmp(&epk(
                "06AB34CFE4E482236BCACBBF50E234AB00000000000000000000000000000000"
            )),
            Ordering::Equal
        );
    }

    #[test]
    fn epk_cmp_zero_padded_first_arg() {
        assert_eq!(
            epk("06AB34CFE4E482236BCACBBF50E234AB00000000000000000000000000000000")
                .cmp(&epk("06AB34CFE4E482236BCACBBF50E234AB")),
            Ordering::Equal
        );
    }

    #[test]
    fn epk_cmp_different_prefixes() {
        assert_eq!(epk("06AB34CF").cmp(&epk("07AB34CF")), Ordering::Less);
        assert_eq!(epk("07AB34CF").cmp(&epk("06AB34CF")), Ordering::Greater);
    }

    /// Ranges mixing partial (32 hex chars) and full (64 hex chars) hierarchical-key EPK
    /// boundaries, in ascending order.
    fn hpk_ranges() -> Vec<PartitionKeyRange> {
        vec![
            make_range("0", "", "03559A67F2724111B5E565DFA8711A00", None),
            make_range(
                "1",
                "03559A67F2724111B5E565DFA8711A00",
                "06AB34CFE4E482236BCACBBF50E234AB00000000000000000000000000000000",
                None,
            ),
            make_range(
                "2",
                "06AB34CFE4E482236BCACBBF50E234AB00000000000000000000000000000000",
                "0BD3FBE846AF75790CE63F78B1A81620",
                None,
            ),
            make_range(
                "3",
                "0BD3FBE846AF75790CE63F78B1A81620",
                "0BD3FBE846AF75790CE63F78B1A8163100000000000000000000000000000000",
                None,
            ),
            make_range(
                "11",
                "0BD3FBE846AF75790CE63F78B1A8163100000000000000000000000000000000",
                "0BD3FBE846AF75790CE63F78B1A81631FF",
                None,
            ),
            make_range(
                "12",
                "0BD3FBE846AF75790CE63F78B1A81631FF",
                "0D4DC2CD8F49C65A8E0C5306B61B4343",
                None,
            ),
            make_range(
                "4",
                "0D4DC2CD8F49C65A8E0C5306B61B4343",
                "0D4EC2CD8F49C65A8E0C5306B61B4343",
                None,
            ),
            make_range(
                "44",
                "0D4EC2CD8F49C65A8E0C5306B61B4343",
                "0D5DC2CD8F49C65A8E0C5306B61B4343",
                None,
            ),
            make_range(
                "24",
                "0D5DC2CD8F49C65A8E0C5306B61B4343",
                "0DCEB8CE51C6BFE84F4BD9409F69B9BB2164DEBD78C50C850E0C1E3E3F0579ED",
                None,
            ),
            make_range(
                "5",
                "0DCEB8CE51C6BFE84F4BD9409F69B9BB2164DEBD78C50C850E0C1E3E3F0579ED",
                "FF",
                None,
            ),
        ]
    }

    #[test]
    fn hpk_partial_epk_on_boundary_returns_correct_range() {
        let map = build(hpk_ranges());
        assert_eq!(
            overlap_ids(
                &map,
                "06AB34CFE4E482236BCACBBF50E234AB",
                "06AB34CFE4E482236BCACBBF50E234ABFF",
            ),
            ["2"]
        );
    }

    #[test]
    fn hpk_partial_epk_boundary_second_split() {
        let map = build(hpk_ranges());
        assert_eq!(
            overlap_ids(
                &map,
                "0BD3FBE846AF75790CE63F78B1A81631",
                "0BD3FBE846AF75790CE63F78B1A81631FF",
            ),
            ["11"]
        );
    }

    #[test]
    fn hpk_full_epk_within_single_range() {
        let map = build(hpk_ranges());
        assert_eq!(
            overlap_ids(
                &map,
                "0D4DC2CD8F49C65A8E0C5306B61B43440D4DC2CD8F49C65A8E0C5306B61B4343",
                "0D4DC2CD8F49C65A8E0C5306B61B43440D4DC2CD8F49C65A8E0C5306B61B4344",
            ),
            ["4"]
        );
    }

    #[test]
    fn hpk_range_inside_range_3() {
        let map = build(hpk_ranges());
        assert_eq!(
            overlap_ids(
                &map,
                "0BD3FBE846AF75790CE63F78B1A81620",
                "0BD3FBE846AF75790CE63F78B1A81631",
            ),
            ["3"]
        );
    }

    #[test]
    fn hpk_partial_epk_spans_two_ranges() {
        let map = build(hpk_ranges());
        assert_eq!(
            overlap_ids(
                &map,
                "0DCEB8CE51C6BFE84F4BD9409F69B9BB",
                "0DCEB8CE51C6BFE84F4BD9409F69B9BBFF",
            ),
            ["24", "5"]
        );
    }

    #[test]
    fn hpk_partial_point_epk_in_middle() {
        let map = build(hpk_ranges());
        assert_eq!(lookup(&map, "02559A67F2724111B5E565DFA8711A00"), "0");
    }

    #[test]
    fn hpk_partial_point_epk_in_partial_range() {
        let map = build(hpk_ranges());
        assert_eq!(lookup(&map, "0D4DC2CD8F49C65A8E0C5306B61B4345"), "4");
    }

    #[test]
    fn hpk_full_epk_against_partial_backend_range() {
        let map = build(hpk_ranges());
        assert_eq!(
            overlap_ids(
                &map,
                "0D4DC2CD8F49C65A8E0C5306B61B434300000000000000000000000000000000",
                "0D4EC2CD8F49C65A8E0C5306B61B434300000000000000000000000000000000",
            ),
            ["4"]
        );
    }
}