use crate::{
models::{ContainerReference, ThroughputControlGroupName},
options::PriorityLevel,
};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
/// Runtime-adjustable settings of a throughput control group.
///
/// Both fields are `None` in the `Default` value. Within this file the
/// settings are only stored behind the `Arc<RwLock<..>>` owned by
/// `ThroughputControlGroupOptions`.
#[derive(Clone, Debug, Default)]
struct ThroughputControlSettings {
/// Throughput bucket assigned to the group, if one has been set.
throughput_bucket: Option<u32>,
/// Priority level assigned to the group, if one has been set.
priority_level: Option<PriorityLevel>,
}
/// Configuration of a throughput control group scoped to one container.
///
/// `name`, `container` and `is_default` are fixed at construction; no method
/// in this file changes them afterwards. The throughput bucket and priority
/// level live behind a shared lock and can be changed through `&self`.
///
/// The derived `Clone` copies the `Arc`, not the settings, so every clone
/// reads and writes the same bucket / priority values.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ThroughputControlGroupOptions {
/// Name of the group; combined with `container` it forms the registry key.
name: ThroughputControlGroupName,
/// Container the group applies to.
container: ContainerReference,
/// Whether the group is registered as the default group of its container.
is_default: bool,
/// Settings that may change after construction, shared between clones.
mutable: Arc<RwLock<ThroughputControlSettings>>,
}
impl ThroughputControlGroupOptions {
pub fn new(
name: impl Into<ThroughputControlGroupName>,
container: ContainerReference,
is_default: bool,
) -> Self {
Self {
name: name.into(),
container,
is_default,
mutable: Arc::new(RwLock::new(ThroughputControlSettings::default())),
}
}
pub fn with_throughput_bucket(self, bucket: u32) -> Self {
self.mutable.write().unwrap().throughput_bucket = Some(bucket);
self
}
pub fn with_priority_level(self, level: PriorityLevel) -> Self {
self.mutable.write().unwrap().priority_level = Some(level);
self
}
pub fn name(&self) -> &ThroughputControlGroupName {
&self.name
}
pub fn container(&self) -> &ContainerReference {
&self.container
}
pub fn is_default(&self) -> bool {
self.is_default
}
pub(crate) fn key(&self) -> ThroughputControlGroupKey {
ThroughputControlGroupKey {
container: self.container.clone(),
name: self.name.clone(),
}
}
pub fn throughput_bucket(&self) -> Option<u32> {
self.mutable.read().unwrap().throughput_bucket
}
pub fn set_throughput_bucket(&self, bucket: u32) {
self.mutable.write().unwrap().throughput_bucket = Some(bucket);
}
pub fn priority_level(&self) -> Option<PriorityLevel> {
self.mutable.read().unwrap().priority_level
}
pub fn set_priority_level(&self, level: PriorityLevel) {
self.mutable.write().unwrap().priority_level = Some(level);
}
}
/// Identity of a throughput control group: the container it belongs to plus
/// its name.
///
/// Used as the key of the registry's group map, which is why it derives
/// `Eq` and `Hash`.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct ThroughputControlGroupKey {
/// Container the group applies to.
pub(crate) container: ContainerReference,
/// Name of the group within that container.
pub(crate) name: ThroughputControlGroupName,
}
// NOTE(review): `dead_code` is presumably allowed because `new` is only
// called from the tests visible in this file — confirm before removing.
#[allow(dead_code)]
impl ThroughputControlGroupKey {
    /// Builds the key identifying the group called `name` in `container`.
    pub(crate) fn new(
        container: ContainerReference,
        name: impl Into<ThroughputControlGroupName>,
    ) -> Self {
        let name = name.into();
        Self { container, name }
    }
}
/// Plain-value copy of a throughput control group.
///
/// Unlike `ThroughputControlGroupOptions`, the bucket and priority level are
/// stored directly in the struct rather than behind a shared lock, so a
/// snapshot does not change when the group it was taken from is modified.
#[non_exhaustive]
#[derive(Clone, Debug)]
#[allow(dead_code)] pub(crate) struct ThroughputControlGroupSnapshot {
/// Name of the group.
pub(crate) name: ThroughputControlGroupName,
/// Container the group applies to.
pub(crate) container: ContainerReference,
/// Whether the group is the default group of its container.
pub(crate) is_default: bool,
/// Throughput bucket captured in the snapshot, if one was set.
pub(crate) throughput_bucket: Option<u32>,
/// Priority level captured in the snapshot, if one was set.
pub(crate) priority_level: Option<PriorityLevel>,
}
// NOTE(review): `dead_code` is presumably allowed because some of these
// helpers have no non-test caller in this file — confirm before removing.
#[allow(dead_code)]
impl ThroughputControlGroupSnapshot {
    /// Creates a snapshot with no throughput bucket and no priority level.
    pub(crate) fn new(
        name: ThroughputControlGroupName,
        container: ContainerReference,
        is_default: bool,
    ) -> Self {
        let (throughput_bucket, priority_level) = (None, None);
        Self {
            name,
            container,
            is_default,
            throughput_bucket,
            priority_level,
        }
    }

    /// Returns the snapshot with its throughput bucket set to `bucket`.
    pub(crate) fn with_throughput_bucket(self, bucket: u32) -> Self {
        Self {
            throughput_bucket: Some(bucket),
            ..self
        }
    }

    /// Returns the snapshot with its priority level set to `level`.
    pub(crate) fn with_priority_level(self, level: PriorityLevel) -> Self {
        Self {
            priority_level: Some(level),
            ..self
        }
    }

    /// Name of the captured group.
    pub(crate) fn name(&self) -> &ThroughputControlGroupName {
        &self.name
    }

    /// Container of the captured group.
    pub(crate) fn container(&self) -> &ContainerReference {
        &self.container
    }

    /// Whether the captured group is the default group of its container.
    pub(crate) fn is_default(&self) -> bool {
        self.is_default
    }

    /// Throughput bucket stored in the snapshot, if any.
    pub(crate) fn throughput_bucket(&self) -> Option<u32> {
        self.throughput_bucket
    }

    /// Priority level stored in the snapshot, if any.
    pub(crate) fn priority_level(&self) -> Option<PriorityLevel> {
        self.priority_level
    }
}
impl From<&ThroughputControlGroupOptions> for ThroughputControlGroupSnapshot {
    /// Captures the group's identity and its current bucket / priority level.
    ///
    /// Both settings are read under a single read guard, so a concurrent
    /// writer cannot slip in between reading the bucket and reading the
    /// priority level.
    fn from(group: &ThroughputControlGroupOptions) -> Self {
        // Tolerate a poisoned lock instead of panicking: the guarded value is
        // two `Copy` options, each replaced by a single assignment, so it
        // cannot be observed half-updated.
        let settings = group
            .mutable
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        Self {
            name: group.name().clone(),
            container: group.container().clone(),
            is_default: group.is_default(),
            throughput_bucket: settings.throughput_bucket,
            priority_level: settings.priority_level,
        }
    }
}
/// Reasons `ThroughputControlGroupRegistry::register` rejects a group.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum ThroughputControlGroupRegistrationError {
/// A group with the same container and name is already registered; carries
/// the key of the rejected group.
DuplicateGroup(ThroughputControlGroupKey),
/// The rejected group was marked as default, but its container already has
/// a default group.
DuplicateDefault {
/// Container for which a default group already exists.
container: ContainerReference,
/// Name of the default group that is already registered.
existing_default: ThroughputControlGroupName,
},
}
impl std::fmt::Display for ThroughputControlGroupRegistrationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::DuplicateGroup(key) => {
write!(
f,
"Throughput control group '{}' already registered for container",
key.name
)
}
Self::DuplicateDefault {
existing_default, ..
} => {
write!(
f,
"Container already has a default throughput control group: '{}'",
existing_default
)
}
}
}
}
// Marker impl: the default `std::error::Error` methods are used as-is, so the
// error reports no underlying source.
impl std::error::Error for ThroughputControlGroupRegistrationError {}
/// In-memory collection of throughput control groups.
///
/// Groups are keyed by `(container, name)`. A second map remembers, per
/// container, the name of the group that was registered as its default.
/// Cloning the registry clones the `Arc` handles, so the clone refers to the
/// same group values.
#[non_exhaustive]
#[derive(Clone, Debug, Default)]
pub(crate) struct ThroughputControlGroupRegistry {
/// Every registered group, keyed by container and group name.
groups: HashMap<ThroughputControlGroupKey, Arc<ThroughputControlGroupOptions>>,
/// Name of the default group for each container that has one.
defaults: HashMap<ContainerReference, ThroughputControlGroupName>,
}
// NOTE(review): `dead_code` is presumably allowed because several accessors
// have no non-test caller in this file — confirm before removing.
#[allow(dead_code)]
impl ThroughputControlGroupRegistry {
    /// Creates an empty registry.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Adds `group` to the registry.
    ///
    /// # Errors
    ///
    /// * `DuplicateGroup` when a group with the same container and name is
    ///   already registered.
    /// * `DuplicateDefault` when `group` is a default group and its container
    ///   already has one.
    ///
    /// Both checks run before anything is inserted, so a rejected group leaves
    /// the registry unchanged.
    #[allow(clippy::result_large_err)]
    pub(crate) fn register(
        &mut self,
        group: ThroughputControlGroupOptions,
    ) -> Result<(), ThroughputControlGroupRegistrationError> {
        let key = group.key();
        if self.groups.contains_key(&key) {
            return Err(ThroughputControlGroupRegistrationError::DuplicateGroup(key));
        }

        if group.is_default() {
            let container = group.container();
            // `cloned()` only clones when a default exists, i.e. on the error path.
            if let Some(existing_default) = self.defaults.get(container).cloned() {
                return Err(ThroughputControlGroupRegistrationError::DuplicateDefault {
                    container: container.clone(),
                    existing_default,
                });
            }
            self.defaults
                .insert(container.clone(), group.name().clone());
        }

        self.groups.insert(key, Arc::new(group));
        Ok(())
    }

    /// Looks up a group by its key.
    pub(crate) fn get(
        &self,
        key: &ThroughputControlGroupKey,
    ) -> Option<&Arc<ThroughputControlGroupOptions>> {
        self.groups.get(key)
    }

    /// Looks up a group by container and name.
    ///
    /// Clones both arguments to build the temporary lookup key.
    pub(crate) fn get_by_container_and_name(
        &self,
        container: &ContainerReference,
        name: &ThroughputControlGroupName,
    ) -> Option<&Arc<ThroughputControlGroupOptions>> {
        let lookup = ThroughputControlGroupKey {
            name: name.clone(),
            container: container.clone(),
        };
        self.get(&lookup)
    }

    /// Returns the default group of `container`, or `None` when the container
    /// has no default group.
    pub(crate) fn get_default_for_container(
        &self,
        container: &ContainerReference,
    ) -> Option<&Arc<ThroughputControlGroupOptions>> {
        let default_name = self.defaults.get(container)?;
        self.get_by_container_and_name(container, default_name)
    }

    /// Collects every group registered for `container`.
    ///
    /// The result follows the iteration order of the underlying `HashMap`,
    /// which is unspecified.
    pub(crate) fn groups_for_container(
        &self,
        container: &ContainerReference,
    ) -> Vec<&Arc<ThroughputControlGroupOptions>> {
        let mut matching = Vec::new();
        for (key, group) in &self.groups {
            if key.container == *container {
                matching.push(group);
            }
        }
        matching
    }

    /// Number of registered groups.
    pub(crate) fn len(&self) -> usize {
        self.groups.len()
    }

    /// Returns `true` when no group is registered.
    pub(crate) fn is_empty(&self) -> bool {
        self.groups.is_empty()
    }

    /// Iterates over all `(key, group)` pairs in unspecified order.
    pub(crate) fn iter(
        &self,
    ) -> impl Iterator<
        Item = (
            &ThroughputControlGroupKey,
            &Arc<ThroughputControlGroupOptions>,
        ),
    > {
        self.groups.iter()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{AccountReference, PartitionKeyDefinition, SystemProperties};
    use url::Url;

    /// Deserializes a partition key definition with the single path `path`.
    fn test_partition_key_definition(path: &str) -> PartitionKeyDefinition {
        serde_json::from_str(&format!(r#"{{"paths":["{path}"]}}"#)).unwrap()
    }

    /// Container properties shared by every test container.
    fn test_container_props() -> crate::models::ContainerProperties {
        crate::models::ContainerProperties {
            id: "testcontainer".into(),
            partition_key: test_partition_key_definition("/pk"),
            system_properties: SystemProperties::default(),
        }
    }

    /// Builds a container reference in the shared test account and database.
    ///
    /// Takes `&'static str` so the arguments have exactly the type of the
    /// string literals the fixtures pass to `ContainerReference::new`.
    fn container_reference(name: &'static str, rid: &'static str) -> ContainerReference {
        let account = AccountReference::with_master_key(
            Url::parse("https://test.documents.azure.com:443/").unwrap(),
            "test-key",
        );
        ContainerReference::new(
            account,
            "testdb",
            "testdb_rid",
            name,
            rid,
            &test_container_props(),
        )
    }

    /// First test container.
    fn test_container() -> ContainerReference {
        container_reference("testcontainer", "testcontainer_rid")
    }

    /// Second test container, distinct from [`test_container`].
    fn test_container_2() -> ContainerReference {
        container_reference("container2", "container2_rid")
    }

    #[test]
    fn bucket_group_creation() {
        let group = ThroughputControlGroupOptions::new("bucket-group", test_container(), false)
            .with_throughput_bucket(100);

        assert_eq!(group.name().as_str(), "bucket-group");
        assert!(!group.is_default());
        assert_eq!(group.throughput_bucket(), Some(100));
        assert!(group.priority_level().is_none());
    }

    #[test]
    fn priority_group_creation() {
        let group = ThroughputControlGroupOptions::new("priority-group", test_container(), true)
            .with_priority_level(PriorityLevel::Low);

        assert_eq!(group.name().as_str(), "priority-group");
        assert!(group.is_default());
        assert!(group.throughput_bucket().is_none());
        assert_eq!(group.priority_level(), Some(PriorityLevel::Low));
    }

    #[test]
    fn new_registry_is_empty() {
        let registry = ThroughputControlGroupRegistry::new();

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert_eq!(registry.iter().count(), 0);
    }

    #[test]
    fn registry_registration() {
        let mut registry = ThroughputControlGroupRegistry::new();
        let container = test_container();
        let group = ThroughputControlGroupOptions::new("test-group", container.clone(), false)
            .with_throughput_bucket(100);

        assert!(registry.register(group).is_ok());
        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        let key = ThroughputControlGroupKey::new(container, "test-group");
        assert!(registry.get(&key).is_some());
    }

    #[test]
    fn registry_rejects_duplicate_key() {
        let mut registry = ThroughputControlGroupRegistry::new();
        let container = test_container();
        let first = ThroughputControlGroupOptions::new("same-name", container.clone(), false)
            .with_throughput_bucket(100);
        let second = ThroughputControlGroupOptions::new("same-name", container.clone(), false)
            .with_throughput_bucket(100);

        assert!(registry.register(first).is_ok());

        // The error must carry the key of the rejected group.
        let expected_key = ThroughputControlGroupKey::new(container, "same-name");
        assert_eq!(
            registry.register(second),
            Err(ThroughputControlGroupRegistrationError::DuplicateGroup(
                expected_key
            ))
        );
        // A rejected registration leaves the registry unchanged.
        assert_eq!(registry.len(), 1);
    }

    #[test]
    fn registry_rejects_duplicate_default() {
        let mut registry = ThroughputControlGroupRegistry::new();
        let container = test_container();
        let first = ThroughputControlGroupOptions::new("default-1", container.clone(), true)
            .with_throughput_bucket(100);
        let second = ThroughputControlGroupOptions::new("default-2", container.clone(), true)
            .with_throughput_bucket(200);

        assert!(registry.register(first).is_ok());

        // The error must name the container and the default already in place.
        assert_eq!(
            registry.register(second),
            Err(ThroughputControlGroupRegistrationError::DuplicateDefault {
                container: container.clone(),
                existing_default: ThroughputControlGroupName::new("default-1"),
            })
        );

        // The rejected group was not stored and the original default remains.
        assert_eq!(registry.len(), 1);
        let default = registry.get_default_for_container(&container);
        assert_eq!(default.unwrap().name().as_str(), "default-1");
    }

    #[test]
    fn same_name_different_containers_allowed() {
        let mut registry = ThroughputControlGroupRegistry::new();
        let group1 = ThroughputControlGroupOptions::new("shared-name", test_container(), true)
            .with_throughput_bucket(100);
        let group2 = ThroughputControlGroupOptions::new("shared-name", test_container_2(), true)
            .with_throughput_bucket(200);

        assert!(registry.register(group1).is_ok());
        assert!(registry.register(group2).is_ok());
        assert_eq!(registry.len(), 2);
    }

    #[test]
    fn get_default_for_container() {
        let mut registry = ThroughputControlGroupRegistry::new();
        let container = test_container();
        let default_group =
            ThroughputControlGroupOptions::new("default-group", container.clone(), true)
                .with_throughput_bucket(100);
        let other_group =
            ThroughputControlGroupOptions::new("other-group", container.clone(), false)
                .with_throughput_bucket(200);
        registry.register(default_group).unwrap();
        registry.register(other_group).unwrap();

        let default = registry.get_default_for_container(&container);
        assert!(default.is_some());
        assert_eq!(default.unwrap().name().as_str(), "default-group");

        // A container without any registered default yields `None`.
        assert!(registry
            .get_default_for_container(&test_container_2())
            .is_none());
    }

    #[test]
    fn groups_for_container() {
        let mut registry = ThroughputControlGroupRegistry::new();
        let container1 = test_container();
        let container2 = test_container_2();
        let group1 = ThroughputControlGroupOptions::new("group1", container1.clone(), false)
            .with_throughput_bucket(100);
        let group2 = ThroughputControlGroupOptions::new("group2", container1.clone(), false)
            .with_throughput_bucket(200);
        let group3 = ThroughputControlGroupOptions::new("group3", container2.clone(), false)
            .with_throughput_bucket(300);
        registry.register(group1).unwrap();
        registry.register(group2).unwrap();
        registry.register(group3).unwrap();

        let c1_groups = registry.groups_for_container(&container1);
        assert_eq!(c1_groups.len(), 2);
        let c2_groups = registry.groups_for_container(&container2);
        assert_eq!(c2_groups.len(), 1);
    }

    #[test]
    fn snapshot_captures_current_state() {
        let group = ThroughputControlGroupOptions::new("snapshot-test", test_container(), true)
            .with_priority_level(PriorityLevel::Low);

        let snapshot = ThroughputControlGroupSnapshot::from(&group);

        assert_eq!(snapshot.name().as_str(), "snapshot-test");
        assert!(snapshot.is_default());
        assert_eq!(snapshot.priority_level(), Some(PriorityLevel::Low));
        assert!(snapshot.throughput_bucket().is_none());
    }

    #[test]
    fn registry_lookup_by_container_and_name() {
        let mut registry = ThroughputControlGroupRegistry::new();
        let container = test_container();
        let group = ThroughputControlGroupOptions::new("lookup-test", container.clone(), false)
            .with_throughput_bucket(10);
        registry.register(group).unwrap();

        let name = ThroughputControlGroupName::new("lookup-test");
        let found = registry.get_by_container_and_name(&container, &name);
        assert!(found.is_some());
        assert_eq!(found.unwrap().throughput_bucket(), Some(10));

        let missing = ThroughputControlGroupName::new("no-such-group");
        assert!(registry
            .get_by_container_and_name(&container, &missing)
            .is_none());
    }

    #[test]
    fn set_throughput_bucket_succeeds() {
        let group = ThroughputControlGroupOptions::new("bucket-group", test_container(), false)
            .with_throughput_bucket(100);

        group.set_throughput_bucket(200);

        assert_eq!(group.throughput_bucket(), Some(200));
    }

    #[test]
    fn set_priority_level_succeeds() {
        let group = ThroughputControlGroupOptions::new("priority-group", test_container(), false)
            .with_priority_level(PriorityLevel::Low);

        group.set_priority_level(PriorityLevel::High);

        assert_eq!(group.priority_level(), Some(PriorityLevel::High));
    }

    #[test]
    fn group_with_both_bucket_and_priority() {
        let group = ThroughputControlGroupOptions::new("both-group", test_container(), false)
            .with_throughput_bucket(42)
            .with_priority_level(PriorityLevel::Low);

        assert_eq!(group.throughput_bucket(), Some(42));
        assert_eq!(group.priority_level(), Some(PriorityLevel::Low));
    }

    #[test]
    fn snapshot_reflects_mutation() {
        let group = ThroughputControlGroupOptions::new("mutate-test", test_container(), false)
            .with_throughput_bucket(100)
            .with_priority_level(PriorityLevel::Low);
        group.set_throughput_bucket(200);
        group.set_priority_level(PriorityLevel::High);

        let snapshot = ThroughputControlGroupSnapshot::from(&group);

        assert_eq!(snapshot.throughput_bucket(), Some(200));
        assert_eq!(snapshot.priority_level(), Some(PriorityLevel::High));
    }

    #[test]
    fn snapshot_is_detached_from_later_mutation() {
        let group = ThroughputControlGroupOptions::new("detached-test", test_container(), false)
            .with_throughput_bucket(100);
        let snapshot = ThroughputControlGroupSnapshot::from(&group);

        // Changing the group afterwards must not alter the captured values.
        group.set_throughput_bucket(200);
        group.set_priority_level(PriorityLevel::High);

        assert_eq!(snapshot.throughput_bucket(), Some(100));
        assert!(snapshot.priority_level().is_none());
    }
}