use crate::{
models::{ContainerReference, ThroughputControlGroupName},
options::PriorityLevel,
};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
#[derive(Clone, Copy, Debug, PartialEq)]
#[non_exhaustive]
pub enum ThroughputTarget {
Absolute(u32),
Threshold(f64),
}
#[derive(Clone, Debug)]
pub(crate) struct ClientSideMutableValues {
target_throughput: ThroughputTarget,
priority_level: Option<PriorityLevel>,
}
impl ClientSideMutableValues {
pub(crate) fn new(
target_throughput: ThroughputTarget,
priority_level: Option<PriorityLevel>,
) -> Self {
Self {
target_throughput,
priority_level,
}
}
pub(crate) fn target_throughput(&self) -> ThroughputTarget {
self.target_throughput
}
pub(crate) fn priority_level(&self) -> Option<PriorityLevel> {
self.priority_level
}
}
#[derive(Clone, Debug)]
pub(crate) struct ServerSideThroughputBucketMutableValues {
throughput_bucket: u32,
}
impl ServerSideThroughputBucketMutableValues {
pub(crate) fn new(throughput_bucket: u32) -> Self {
Self { throughput_bucket }
}
pub(crate) fn throughput_bucket(&self) -> u32 {
self.throughput_bucket
}
}
#[derive(Clone, Debug)]
pub(crate) struct ServerSidePriorityMutableValues {
priority_level: PriorityLevel,
}
impl ServerSidePriorityMutableValues {
pub(crate) fn new(priority_level: PriorityLevel) -> Self {
Self { priority_level }
}
pub(crate) fn priority_level(&self) -> PriorityLevel {
self.priority_level
}
}
#[derive(Clone, Debug)]
#[non_exhaustive]
#[allow(private_interfaces)] pub enum ThroughputControlGroupOptions {
ClientSide {
name: ThroughputControlGroupName,
container: ContainerReference,
is_default: bool,
mutable: Arc<RwLock<ClientSideMutableValues>>,
},
ServerSideThroughputBucket {
name: ThroughputControlGroupName,
container: ContainerReference,
is_default: bool,
mutable: Arc<RwLock<ServerSideThroughputBucketMutableValues>>,
},
ServerSidePriorityBasedThrottling {
name: ThroughputControlGroupName,
container: ContainerReference,
is_default: bool,
mutable: Arc<RwLock<ServerSidePriorityMutableValues>>,
},
}
impl ThroughputControlGroupOptions {
pub fn client_side(
name: impl Into<ThroughputControlGroupName>,
container: ContainerReference,
target_throughput: ThroughputTarget,
priority_level: Option<PriorityLevel>,
is_default: bool,
) -> Self {
Self::ClientSide {
name: name.into(),
container,
is_default,
mutable: Arc::new(RwLock::new(ClientSideMutableValues::new(
target_throughput,
priority_level,
))),
}
}
pub fn server_side_throughput_bucket(
name: impl Into<ThroughputControlGroupName>,
container: ContainerReference,
throughput_bucket: u32,
is_default: bool,
) -> Self {
Self::ServerSideThroughputBucket {
name: name.into(),
container,
is_default,
mutable: Arc::new(RwLock::new(ServerSideThroughputBucketMutableValues::new(
throughput_bucket,
))),
}
}
pub fn server_side_priority_based_throttling(
name: impl Into<ThroughputControlGroupName>,
container: ContainerReference,
priority_level: PriorityLevel,
is_default: bool,
) -> Self {
Self::ServerSidePriorityBasedThrottling {
name: name.into(),
container,
is_default,
mutable: Arc::new(RwLock::new(ServerSidePriorityMutableValues::new(
priority_level,
))),
}
}
pub fn name(&self) -> &ThroughputControlGroupName {
match self {
Self::ClientSide { name, .. } => name,
Self::ServerSideThroughputBucket { name, .. } => name,
Self::ServerSidePriorityBasedThrottling { name, .. } => name,
}
}
pub fn container(&self) -> &ContainerReference {
match self {
Self::ClientSide { container, .. } => container,
Self::ServerSideThroughputBucket { container, .. } => container,
Self::ServerSidePriorityBasedThrottling { container, .. } => container,
}
}
pub fn is_default(&self) -> bool {
match self {
Self::ClientSide { is_default, .. } => *is_default,
Self::ServerSideThroughputBucket { is_default, .. } => *is_default,
Self::ServerSidePriorityBasedThrottling { is_default, .. } => *is_default,
}
}
pub fn key(&self) -> ThroughputControlGroupKey {
ThroughputControlGroupKey {
container: self.container().clone(),
name: self.name().clone(),
}
}
pub fn target_throughput(&self) -> Option<ThroughputTarget> {
match self {
Self::ClientSide { mutable, .. } => Some(mutable.read().unwrap().target_throughput()),
_ => None,
}
}
pub fn set_target_throughput(&self, target: ThroughputTarget) {
if let Self::ClientSide { mutable, .. } = self {
mutable.write().unwrap().target_throughput = target;
}
}
pub fn throughput_bucket(&self) -> Option<u32> {
match self {
Self::ServerSideThroughputBucket { mutable, .. } => {
Some(mutable.read().unwrap().throughput_bucket())
}
_ => None,
}
}
pub fn set_throughput_bucket(&self, bucket: u32) {
if let Self::ServerSideThroughputBucket { mutable, .. } = self {
mutable.write().unwrap().throughput_bucket = bucket;
}
}
pub fn priority_level(&self) -> Option<PriorityLevel> {
match self {
Self::ClientSide { mutable, .. } => mutable.read().unwrap().priority_level(),
Self::ServerSidePriorityBasedThrottling { mutable, .. } => {
Some(mutable.read().unwrap().priority_level())
}
Self::ServerSideThroughputBucket { .. } => None,
}
}
pub fn set_priority_level(&self, level: PriorityLevel) {
match self {
Self::ClientSide { mutable, .. } => {
mutable.write().unwrap().priority_level = Some(level);
}
Self::ServerSidePriorityBasedThrottling { mutable, .. } => {
mutable.write().unwrap().priority_level = level;
}
Self::ServerSideThroughputBucket { .. } => {}
}
}
}
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ThroughputControlGroupKey {
pub container: ContainerReference,
pub name: ThroughputControlGroupName,
}
impl ThroughputControlGroupKey {
pub fn new(container: ContainerReference, name: impl Into<ThroughputControlGroupName>) -> Self {
Self {
container,
name: name.into(),
}
}
}
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct ThroughputControlGroupSnapshot {
pub name: ThroughputControlGroupName,
pub container: ContainerReference,
pub is_default: bool,
pub target_throughput: Option<ThroughputTarget>,
pub throughput_bucket: Option<u32>,
pub priority_level: Option<PriorityLevel>,
pub is_client_side: bool,
}
impl ThroughputControlGroupSnapshot {
pub fn new(
name: ThroughputControlGroupName,
container: ContainerReference,
is_default: bool,
is_client_side: bool,
) -> Self {
Self {
name,
container,
is_default,
target_throughput: None,
throughput_bucket: None,
priority_level: None,
is_client_side,
}
}
pub fn with_target_throughput(mut self, target: ThroughputTarget) -> Self {
self.target_throughput = Some(target);
self
}
pub fn with_throughput_bucket(mut self, bucket: u32) -> Self {
self.throughput_bucket = Some(bucket);
self
}
pub fn with_priority_level(mut self, level: PriorityLevel) -> Self {
self.priority_level = Some(level);
self
}
}
impl From<&ThroughputControlGroupOptions> for ThroughputControlGroupSnapshot {
fn from(group: &ThroughputControlGroupOptions) -> Self {
let mut snapshot = Self::new(
group.name().clone(),
group.container().clone(),
group.is_default(),
matches!(group, ThroughputControlGroupOptions::ClientSide { .. }),
);
snapshot.target_throughput = group.target_throughput();
snapshot.throughput_bucket = group.throughput_bucket();
snapshot.priority_level = group.priority_level();
snapshot
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum ThroughputControlGroupRegistrationError {
DuplicateGroup(ThroughputControlGroupKey),
DuplicateDefault {
container: ContainerReference,
existing_default: ThroughputControlGroupName,
},
}
impl std::fmt::Display for ThroughputControlGroupRegistrationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::DuplicateGroup(key) => {
write!(
f,
"Throughput control group '{}' already registered for container",
key.name
)
}
Self::DuplicateDefault {
existing_default, ..
} => {
write!(
f,
"Container already has a default throughput control group: '{}'",
existing_default
)
}
}
}
}
impl std::error::Error for ThroughputControlGroupRegistrationError {}
#[non_exhaustive]
#[derive(Clone, Debug, Default)]
pub struct ThroughputControlGroupRegistry {
groups: HashMap<ThroughputControlGroupKey, Arc<ThroughputControlGroupOptions>>,
defaults: HashMap<ContainerReference, ThroughputControlGroupName>,
}
impl ThroughputControlGroupRegistry {
pub fn new() -> Self {
Self::default()
}
#[allow(clippy::result_large_err)]
pub fn register(
&mut self,
group: ThroughputControlGroupOptions,
) -> Result<(), ThroughputControlGroupRegistrationError> {
let key = group.key();
if self.groups.contains_key(&key) {
return Err(ThroughputControlGroupRegistrationError::DuplicateGroup(key));
}
if group.is_default() {
if let Some(existing_default) = self.defaults.get(group.container()) {
return Err(ThroughputControlGroupRegistrationError::DuplicateDefault {
container: group.container().clone(),
existing_default: existing_default.clone(),
});
}
self.defaults
.insert(group.container().clone(), group.name().clone());
}
self.groups.insert(key, Arc::new(group));
Ok(())
}
pub fn get(
&self,
key: &ThroughputControlGroupKey,
) -> Option<&Arc<ThroughputControlGroupOptions>> {
self.groups.get(key)
}
pub fn get_by_container_and_name(
&self,
container: &ContainerReference,
name: &ThroughputControlGroupName,
) -> Option<&Arc<ThroughputControlGroupOptions>> {
let key = ThroughputControlGroupKey {
container: container.clone(),
name: name.clone(),
};
self.groups.get(&key)
}
pub fn get_default_for_container(
&self,
container: &ContainerReference,
) -> Option<&Arc<ThroughputControlGroupOptions>> {
self.defaults.get(container).and_then(|name| {
let key = ThroughputControlGroupKey {
container: container.clone(),
name: name.clone(),
};
self.groups.get(&key)
})
}
pub fn groups_for_container(
&self,
container: &ContainerReference,
) -> Vec<&Arc<ThroughputControlGroupOptions>> {
self.groups
.iter()
.filter(|(key, _)| &key.container == container)
.map(|(_, group)| group)
.collect()
}
pub fn len(&self) -> usize {
self.groups.len()
}
pub fn is_empty(&self) -> bool {
self.groups.is_empty()
}
pub fn iter(
&self,
) -> impl Iterator<
Item = (
&ThroughputControlGroupKey,
&Arc<ThroughputControlGroupOptions>,
),
> {
self.groups.iter()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{AccountReference, PartitionKeyDefinition, SystemProperties};
use url::Url;
fn test_partition_key_definition(path: &str) -> PartitionKeyDefinition {
serde_json::from_str(&format!(r#"{{"paths":["{path}"]}}"#)).unwrap()
}
fn test_container_props() -> crate::models::ContainerProperties {
crate::models::ContainerProperties {
id: "testcontainer".into(),
partition_key: test_partition_key_definition("/pk"),
system_properties: SystemProperties::default(),
}
}
fn test_container() -> ContainerReference {
let account = AccountReference::with_master_key(
Url::parse("https://test.documents.azure.com:443/").unwrap(),
"test-key",
);
ContainerReference::new(
account,
"testdb",
"testdb_rid",
"testcontainer",
"testcontainer_rid",
&test_container_props(),
)
}
fn test_container_2() -> ContainerReference {
let account = AccountReference::with_master_key(
Url::parse("https://test.documents.azure.com:443/").unwrap(),
"test-key",
);
ContainerReference::new(
account,
"testdb",
"testdb_rid",
"container2",
"container2_rid",
&test_container_props(),
)
}
#[test]
fn client_side_group_creation() {
let container = test_container();
let group = ThroughputControlGroupOptions::client_side(
"my-group",
container.clone(),
ThroughputTarget::Threshold(0.5),
Some(PriorityLevel::High),
true,
);
assert_eq!(group.name().as_str(), "my-group");
assert_eq!(group.container(), &container);
assert!(group.is_default());
assert_eq!(
group.target_throughput(),
Some(ThroughputTarget::Threshold(0.5))
);
assert_eq!(group.priority_level(), Some(PriorityLevel::High));
assert!(group.throughput_bucket().is_none());
}
#[test]
fn server_side_bucket_group_creation() {
let container = test_container();
let group = ThroughputControlGroupOptions::server_side_throughput_bucket(
"bucket-group",
container.clone(),
100,
false,
);
assert_eq!(group.name().as_str(), "bucket-group");
assert!(!group.is_default());
assert!(group.target_throughput().is_none());
assert_eq!(group.throughput_bucket(), Some(100));
assert!(group.priority_level().is_none());
}
#[test]
fn server_side_priority_group_creation() {
let container = test_container();
let group = ThroughputControlGroupOptions::server_side_priority_based_throttling(
"priority-group",
container.clone(),
PriorityLevel::Low,
true,
);
assert_eq!(group.name().as_str(), "priority-group");
assert!(group.is_default());
assert!(group.target_throughput().is_none());
assert!(group.throughput_bucket().is_none());
assert_eq!(group.priority_level(), Some(PriorityLevel::Low));
}
#[test]
fn mutable_values_can_be_updated() {
let container = test_container();
let group = ThroughputControlGroupOptions::client_side(
"mutable-test",
container,
ThroughputTarget::Absolute(1000),
None,
false,
);
assert_eq!(
group.target_throughput(),
Some(ThroughputTarget::Absolute(1000))
);
assert!(group.priority_level().is_none());
group.set_target_throughput(ThroughputTarget::Threshold(0.75));
group.set_priority_level(PriorityLevel::High);
assert_eq!(
group.target_throughput(),
Some(ThroughputTarget::Threshold(0.75))
);
assert_eq!(group.priority_level(), Some(PriorityLevel::High));
}
#[test]
fn registry_registration() {
let mut registry = ThroughputControlGroupRegistry::new();
let container = test_container();
let group = ThroughputControlGroupOptions::client_side(
"test-group",
container.clone(),
ThroughputTarget::Threshold(0.5),
None,
false,
);
assert!(registry.register(group).is_ok());
assert_eq!(registry.len(), 1);
let key = ThroughputControlGroupKey::new(container.clone(), "test-group");
assert!(registry.get(&key).is_some());
}
#[test]
fn registry_rejects_duplicate_key() {
let mut registry = ThroughputControlGroupRegistry::new();
let container = test_container();
let group1 = ThroughputControlGroupOptions::client_side(
"same-name",
container.clone(),
ThroughputTarget::Threshold(0.5),
None,
false,
);
let group2 = ThroughputControlGroupOptions::server_side_throughput_bucket(
"same-name",
container.clone(),
100,
false,
);
assert!(registry.register(group1).is_ok());
let result = registry.register(group2);
assert!(matches!(
result,
Err(ThroughputControlGroupRegistrationError::DuplicateGroup(_))
));
}
#[test]
fn registry_rejects_duplicate_default() {
let mut registry = ThroughputControlGroupRegistry::new();
let container = test_container();
let group1 = ThroughputControlGroupOptions::client_side(
"default-1",
container.clone(),
ThroughputTarget::Threshold(0.5),
None,
true, );
let group2 = ThroughputControlGroupOptions::client_side(
"default-2",
container.clone(),
ThroughputTarget::Threshold(0.3),
None,
true, );
assert!(registry.register(group1).is_ok());
let result = registry.register(group2);
assert!(matches!(
result,
Err(ThroughputControlGroupRegistrationError::DuplicateDefault { .. })
));
}
#[test]
fn same_name_different_containers_allowed() {
let mut registry = ThroughputControlGroupRegistry::new();
let container1 = test_container();
let container2 = test_container_2();
let group1 = ThroughputControlGroupOptions::client_side(
"shared-name",
container1.clone(),
ThroughputTarget::Threshold(0.5),
None,
true,
);
let group2 = ThroughputControlGroupOptions::client_side(
"shared-name",
container2.clone(),
ThroughputTarget::Threshold(0.3),
None,
true, );
assert!(registry.register(group1).is_ok());
assert!(registry.register(group2).is_ok());
assert_eq!(registry.len(), 2);
}
#[test]
fn get_default_for_container() {
let mut registry = ThroughputControlGroupRegistry::new();
let container = test_container();
let default_group = ThroughputControlGroupOptions::client_side(
"default-group",
container.clone(),
ThroughputTarget::Threshold(0.5),
None,
true,
);
let other_group = ThroughputControlGroupOptions::client_side(
"other-group",
container.clone(),
ThroughputTarget::Threshold(0.3),
None,
false,
);
registry.register(default_group).unwrap();
registry.register(other_group).unwrap();
let default = registry.get_default_for_container(&container);
assert!(default.is_some());
assert_eq!(default.unwrap().name().as_str(), "default-group");
}
#[test]
fn groups_for_container() {
let mut registry = ThroughputControlGroupRegistry::new();
let container1 = test_container();
let container2 = test_container_2();
let group1 = ThroughputControlGroupOptions::client_side(
"group1",
container1.clone(),
ThroughputTarget::Threshold(0.5),
None,
false,
);
let group2 = ThroughputControlGroupOptions::client_side(
"group2",
container1.clone(),
ThroughputTarget::Threshold(0.3),
None,
false,
);
let group3 = ThroughputControlGroupOptions::client_side(
"group3",
container2.clone(),
ThroughputTarget::Threshold(0.4),
None,
false,
);
registry.register(group1).unwrap();
registry.register(group2).unwrap();
registry.register(group3).unwrap();
let c1_groups = registry.groups_for_container(&container1);
assert_eq!(c1_groups.len(), 2);
let c2_groups = registry.groups_for_container(&container2);
assert_eq!(c2_groups.len(), 1);
}
#[test]
fn snapshot_captures_current_state() {
let container = test_container();
let group = ThroughputControlGroupOptions::client_side(
"snapshot-test",
container.clone(),
ThroughputTarget::Absolute(500),
Some(PriorityLevel::Low),
true,
);
let snapshot = ThroughputControlGroupSnapshot::from(&group);
assert_eq!(snapshot.name.as_str(), "snapshot-test");
assert!(snapshot.is_default);
assert_eq!(
snapshot.target_throughput,
Some(ThroughputTarget::Absolute(500))
);
assert_eq!(snapshot.priority_level, Some(PriorityLevel::Low));
assert!(snapshot.is_client_side);
assert!(snapshot.throughput_bucket.is_none());
}
}