use azure_core::fmt::SafeDebug;
use base64::Engine;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;
use azure_data_cosmos_driver::models::partition_key_range::PartitionKeyRange;
use crate::hash::EffectivePartitionKey;
use crate::hash::{MAX_EXCLUSIVE_EFFECTIVE_PARTITION_KEY, MIN_INCLUSIVE_EFFECTIVE_PARTITION_KEY};
use crate::routing::range::Range;
#[derive(Clone, SafeDebug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct FeedRange {
pub(crate) min_inclusive: EffectivePartitionKey,
pub(crate) max_exclusive: EffectivePartitionKey,
}
#[derive(Serialize, Deserialize)]
struct FeedRangeJson {
#[serde(rename = "Range")]
range: RangeJson,
}
#[derive(Serialize, Deserialize)]
struct RangeJson {
min: String,
max: String,
#[serde(rename = "isMinInclusive")]
is_min_inclusive: bool,
#[serde(rename = "isMaxInclusive")]
is_max_inclusive: bool,
}
impl FeedRange {
pub fn full() -> Self {
Self {
min_inclusive: EffectivePartitionKey::from(MIN_INCLUSIVE_EFFECTIVE_PARTITION_KEY),
max_exclusive: EffectivePartitionKey::from(MAX_EXCLUSIVE_EFFECTIVE_PARTITION_KEY),
}
}
pub(crate) fn is_subset_of(&self, other: &FeedRange) -> bool {
other.min_inclusive <= self.min_inclusive && other.max_exclusive >= self.max_exclusive
}
pub(crate) fn overlaps(&self, other: &FeedRange) -> bool {
self.min_inclusive < other.max_exclusive && other.min_inclusive < self.max_exclusive
}
pub(crate) fn can_merge(&self, other: &FeedRange) -> bool {
self.max_exclusive >= other.min_inclusive && other.max_exclusive >= self.min_inclusive
}
pub(crate) fn merge_with(&self, other: &FeedRange) -> FeedRange {
debug_assert!(
self.can_merge(other),
"merge_with called on disjoint ranges"
);
FeedRange {
min_inclusive: std::cmp::min(self.min_inclusive.clone(), other.min_inclusive.clone()),
max_exclusive: std::cmp::max(self.max_exclusive.clone(), other.max_exclusive.clone()),
}
}
#[allow(
dead_code,
reason = "will be used when query/change-feed gain FeedRange support"
)]
pub(crate) fn from_range(range: &Range<String>) -> azure_core::Result<Self> {
if !range.is_min_inclusive || range.is_max_inclusive {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::DataConversion,
"FeedRange requires [min, max) semantics (isMinInclusive=true, isMaxInclusive=false)",
));
}
Ok(Self {
min_inclusive: EffectivePartitionKey::from(range.min.as_str()),
max_exclusive: EffectivePartitionKey::from(range.max.as_str()),
})
}
#[allow(
dead_code,
reason = "will be used when query/change-feed gain FeedRange support"
)]
pub(crate) fn to_range(&self) -> Range<String> {
Range::new(
self.min_inclusive.as_str().to_owned(),
self.max_exclusive.as_str().to_owned(),
true,
false,
)
}
#[allow(
dead_code,
reason = "will be used when feed range methods route through the driver's routing map"
)]
pub(crate) fn from_partition_key_range(pkr: &PartitionKeyRange) -> azure_core::Result<Self> {
if pkr.min_inclusive > pkr.max_exclusive {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::DataConversion,
"partition key range min_inclusive must be <= max_exclusive",
));
}
Ok(Self {
min_inclusive: EffectivePartitionKey::from(pkr.min_inclusive.as_str()),
max_exclusive: EffectivePartitionKey::from(pkr.max_exclusive.as_str()),
})
}
pub(crate) fn from_sdk_partition_key_range(
pkr: &crate::routing::partition_key_range::PartitionKeyRange,
) -> Self {
debug_assert!(
pkr.min_inclusive.as_str() <= pkr.max_exclusive.as_str(),
"partition key range min_inclusive must be <= max_exclusive"
);
Self {
min_inclusive: EffectivePartitionKey::from(pkr.min_inclusive.as_str()),
max_exclusive: EffectivePartitionKey::from(pkr.max_exclusive.as_str()),
}
}
fn to_json(&self) -> FeedRangeJson {
FeedRangeJson {
range: RangeJson {
min: self.min_inclusive.as_str().to_owned(),
max: self.max_exclusive.as_str().to_owned(),
is_min_inclusive: true,
is_max_inclusive: false,
},
}
}
fn from_json(json: FeedRangeJson) -> azure_core::Result<Self> {
if !json.range.is_min_inclusive || json.range.is_max_inclusive {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::DataConversion,
"feed range must have [min, max) semantics (isMinInclusive=true, isMaxInclusive=false)",
));
}
let min = EffectivePartitionKey::from(json.range.min);
let max = EffectivePartitionKey::from(json.range.max);
if min > max {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::DataConversion,
"feed range min must be less than or equal to max",
));
}
Ok(Self {
min_inclusive: min,
max_exclusive: max,
})
}
}
impl fmt::Display for FeedRange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let json_str = serde_json::to_string(&self.to_json()).map_err(|_| fmt::Error)?;
let encoded = base64::engine::general_purpose::STANDARD.encode(json_str.as_bytes());
f.write_str(&encoded)
}
}
impl FromStr for FeedRange {
type Err = azure_core::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let decoded_bytes = base64::engine::general_purpose::STANDARD
.decode(s)
.map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::DataConversion, e))?;
let json: FeedRangeJson = serde_json::from_slice(&decoded_bytes)
.map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::DataConversion, e))?;
Self::from_json(json)
}
}
impl Serialize for FeedRange {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.to_json().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for FeedRange {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let json = FeedRangeJson::deserialize(deserializer)?;
Self::from_json(json).map_err(|e| serde::de::Error::custom(e.to_string()))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn full_range() {
let full = FeedRange::full();
assert_eq!(full.min_inclusive.as_str(), "");
assert_eq!(full.max_exclusive.as_str(), "FF");
}
#[test]
fn is_subset_of_full() {
let full = FeedRange::full();
let sub = FeedRange {
min_inclusive: EffectivePartitionKey::from("00"),
max_exclusive: EffectivePartitionKey::from("80"),
};
assert!(sub.is_subset_of(&full));
assert!(!full.is_subset_of(&sub));
}
#[test]
fn is_subset_of_self() {
let range = FeedRange {
min_inclusive: EffectivePartitionKey::from("20"),
max_exclusive: EffectivePartitionKey::from("80"),
};
assert!(range.is_subset_of(&range));
}
#[test]
fn overlaps_basic() {
let a = FeedRange {
min_inclusive: EffectivePartitionKey::from("00"),
max_exclusive: EffectivePartitionKey::from("50"),
};
let b = FeedRange {
min_inclusive: EffectivePartitionKey::from("30"),
max_exclusive: EffectivePartitionKey::from("80"),
};
assert!(a.overlaps(&b));
assert!(b.overlaps(&a));
}
#[test]
fn overlaps_adjacent_no_overlap() {
let a = FeedRange {
min_inclusive: EffectivePartitionKey::from("00"),
max_exclusive: EffectivePartitionKey::from("50"),
};
let b = FeedRange {
min_inclusive: EffectivePartitionKey::from("50"),
max_exclusive: EffectivePartitionKey::from("FF"),
};
assert!(!a.overlaps(&b));
assert!(!b.overlaps(&a));
}
#[test]
fn overlaps_disjoint() {
let a = FeedRange {
min_inclusive: EffectivePartitionKey::from("00"),
max_exclusive: EffectivePartitionKey::from("30"),
};
let b = FeedRange {
min_inclusive: EffectivePartitionKey::from("50"),
max_exclusive: EffectivePartitionKey::from("FF"),
};
assert!(!a.overlaps(&b));
assert!(!b.overlaps(&a));
}
#[test]
fn display_produces_expected_base64_full_range() {
let range = FeedRange {
min_inclusive: EffectivePartitionKey::from(""),
max_exclusive: EffectivePartitionKey::from("FF"),
};
assert_eq!(
range.to_string(),
"eyJSYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiIsImlzTWluSW5jbHVzaXZlIjp0cnVlLCJpc01heEluY2x1c2l2ZSI6ZmFsc2V9fQ=="
);
}
#[test]
fn display_produces_expected_base64_sub_range() {
let range = FeedRange {
min_inclusive: EffectivePartitionKey::from("3FFFFFFFFFFF"),
max_exclusive: EffectivePartitionKey::from("7FFFFFFFFFFF"),
};
assert_eq!(
range.to_string(),
"eyJSYW5nZSI6eyJtaW4iOiIzRkZGRkZGRkZGRkYiLCJtYXgiOiI3RkZGRkZGRkZGRkYiLCJpc01pbkluY2x1c2l2ZSI6dHJ1ZSwiaXNNYXhJbmNsdXNpdmUiOmZhbHNlfX0="
);
}
#[test]
fn from_str_parses_full_range() {
let input = "eyJSYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiIsImlzTWluSW5jbHVzaXZlIjp0cnVlLCJpc01heEluY2x1c2l2ZSI6ZmFsc2V9fQ==";
let range: FeedRange = input.parse().unwrap();
assert_eq!(range.min_inclusive.as_str(), "");
assert_eq!(range.max_exclusive.as_str(), "FF");
}
#[test]
fn from_str_parses_sub_range() {
let input = "eyJSYW5nZSI6eyJtaW4iOiIzRkZGRkZGRkZGRkYiLCJtYXgiOiI3RkZGRkZGRkZGRkYiLCJpc01pbkluY2x1c2l2ZSI6dHJ1ZSwiaXNNYXhJbmNsdXNpdmUiOmZhbHNlfX0=";
let range: FeedRange = input.parse().unwrap();
assert_eq!(range.min_inclusive.as_str(), "3FFFFFFFFFFF");
assert_eq!(range.max_exclusive.as_str(), "7FFFFFFFFFFF");
}
#[test]
fn serde_json_serializes_to_cross_sdk_format() {
let range = FeedRange {
min_inclusive: EffectivePartitionKey::from(""),
max_exclusive: EffectivePartitionKey::from("FF"),
};
let json = serde_json::to_string(&range).unwrap();
let value: serde_json::Value = serde_json::from_str(&json).unwrap();
let inner = value.get("Range").expect("expected 'Range' key");
assert_eq!(inner.get("min").unwrap().as_str().unwrap(), "");
assert_eq!(inner.get("max").unwrap().as_str().unwrap(), "FF");
assert!(inner.get("isMinInclusive").unwrap().as_bool().unwrap());
assert!(!inner.get("isMaxInclusive").unwrap().as_bool().unwrap());
}
#[test]
fn serde_json_deserializes_cross_sdk_format() {
let json =
r#"{"Range":{"min":"","max":"FF","isMinInclusive":true,"isMaxInclusive":false}}"#;
let range: FeedRange = serde_json::from_str(json).unwrap();
assert_eq!(range.min_inclusive.as_str(), "");
assert_eq!(range.max_exclusive.as_str(), "FF");
}
#[test]
fn from_str_invalid_base64() {
let result = "not-valid-base64!!!".parse::<FeedRange>();
assert!(result.is_err());
}
#[test]
fn from_str_invalid_json() {
let encoded = base64::engine::general_purpose::STANDARD.encode(b"not json");
let result = encoded.parse::<FeedRange>();
assert!(result.is_err());
}
#[test]
fn from_partition_key_range() {
let pkr = PartitionKeyRange::new("0".to_string(), "".to_string(), "FF".to_string());
let feed_range = FeedRange::from_partition_key_range(&pkr).unwrap();
assert_eq!(feed_range.min_inclusive.as_str(), "");
assert_eq!(feed_range.max_exclusive.as_str(), "FF");
}
#[test]
fn to_range_produces_expected_fields() {
let feed_range = FeedRange {
min_inclusive: EffectivePartitionKey::from("20"),
max_exclusive: EffectivePartitionKey::from("80"),
};
let range = feed_range.to_range();
assert_eq!(range.min, "20");
assert_eq!(range.max, "80");
assert!(range.is_min_inclusive);
assert!(!range.is_max_inclusive);
}
#[test]
fn from_range_parses_expected_fields() {
let range = Range::new("20".to_owned(), "80".to_owned(), true, false);
let feed_range = FeedRange::from_range(&range).unwrap();
assert_eq!(feed_range.min_inclusive.as_str(), "20");
assert_eq!(feed_range.max_exclusive.as_str(), "80");
}
#[test]
fn cross_sdk_compatibility() {
let full = FeedRange::full();
let serialized = full.to_string();
let decoded = base64::engine::general_purpose::STANDARD
.decode(&serialized)
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&decoded).unwrap();
let range = json.get("Range").unwrap();
assert_eq!(range.get("min").unwrap().as_str().unwrap(), "");
assert_eq!(range.get("max").unwrap().as_str().unwrap(), "FF");
assert!(range.get("isMinInclusive").unwrap().as_bool().unwrap());
assert!(!range.get("isMaxInclusive").unwrap().as_bool().unwrap());
}
#[test]
fn from_str_rejects_max_inclusive() {
let json = r#"{"Range":{"min":"","max":"FF","isMinInclusive":true,"isMaxInclusive":true}}"#;
let encoded = base64::engine::general_purpose::STANDARD.encode(json.as_bytes());
assert!(encoded.parse::<FeedRange>().is_err());
}
#[test]
fn serde_rejects_min_not_inclusive() {
let json =
r#"{"Range":{"min":"","max":"FF","isMinInclusive":false,"isMaxInclusive":false}}"#;
assert!(serde_json::from_str::<FeedRange>(json).is_err());
}
#[test]
fn from_str_rejects_inverted_range() {
let json =
r#"{"Range":{"min":"FF","max":"","isMinInclusive":true,"isMaxInclusive":false}}"#;
let encoded = base64::engine::general_purpose::STANDARD.encode(json.as_bytes());
assert!(encoded.parse::<FeedRange>().is_err());
}
#[test]
fn serde_rejects_inverted_range() {
let json =
r#"{"Range":{"min":"FF","max":"","isMinInclusive":true,"isMaxInclusive":false}}"#;
assert!(serde_json::from_str::<FeedRange>(json).is_err());
}
#[test]
fn from_range_rejects_wrong_inclusivity() {
let range = Range::new("".to_string(), "FF".to_string(), false, true);
assert!(FeedRange::from_range(&range).is_err());
}
}