use azure_core::time::Duration;
use crate::background_task_manager::BackgroundTaskManager;
use crate::constants;
use crate::cosmos_request::CosmosRequest;
use crate::operation_context::OperationType;
use crate::resource_context::ResourceType;
use crate::routing::global_endpoint_manager::GlobalEndpointManager;
use crate::routing::partition_key_range::PartitionKeyRange;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::Instant;
use tracing::info;
use url::Url;
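// Defaults for partition-level failover tuning. Each value can be overridden
// at runtime through an environment variable; see the `std::env::var` lookups
// below.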
const DEFAULT_ALLOWED_PARTITION_UNAVAILABILITY_DURATION_SECS: i64 = 5;
const DEFAULT_STALE_PARTITION_UNAVAILABILITY_REFRESH_INTERVAL_SECS: i64 = 300;
const DEFAULT_CIRCUIT_BREAKER_CONSECUTIVE_FAILURE_COUNT_FOR_READS: i32 = 2;
const DEFAULT_CIRCUIT_BREAKER_CONSECUTIVE_FAILURE_COUNT_FOR_WRITES: i32 = 5;
const DEFAULT_CIRCUIT_BREAKER_TIMEOUT_COUNTER_RESET_WINDOW_MINS: i64 = 5;
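/// Health status of a partition's endpoint, used by the background failback
/// loop when deciding whether a previously failed partition can be routed
/// back to its original location.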
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(i32)]
pub enum PartitionHealthStatus {
Healthy = 100,
Unhealthy = 200,
}
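/// Tracks per-partition endpoint overrides for partition-level automatic
/// failover (writes against single-master accounts) and the partition-level
/// circuit breaker (reads and writes against multi-master accounts), and owns
/// the background task that periodically fails stale entries back to their
/// original location.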
#[derive(Debug)]
pub struct GlobalPartitionEndpointManager {
global_endpoint_manager: Arc<GlobalEndpointManager>,
partition_unavailability_duration_secs: i64,
background_connection_init_interval_secs: i64,
partition_key_range_to_location_for_write:
Arc<RwLock<HashMap<PartitionKeyRange, PartitionKeyRangeFailoverInfo>>>,
partition_key_range_to_location_for_read_and_write:
Arc<RwLock<HashMap<PartitionKeyRange, PartitionKeyRangeFailoverInfo>>>,
background_connection_init_active: AtomicBool,
partition_level_automatic_failover_enabled: AtomicBool,
partition_level_circuit_breaker_enabled: AtomicBool,
background_task_manager: BackgroundTaskManager,
}
impl GlobalPartitionEndpointManager {
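    /// Creates the manager with the given feature flags and immediately
    /// starts the background failback task. The instance is returned as
    /// `Arc<Self>` because the background task holds a `Weak` handle back to
    /// it.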
pub fn new(
global_endpoint_manager: Arc<GlobalEndpointManager>,
partition_level_failover_enabled: bool,
partition_level_circuit_breaker_enabled: bool,
) -> Arc<Self> {
let instance = Arc::new(Self {
global_endpoint_manager,
partition_unavailability_duration_secs:
Self::allowed_partition_unavailability_duration_secs(
DEFAULT_ALLOWED_PARTITION_UNAVAILABILITY_DURATION_SECS,
),
background_connection_init_interval_secs:
Self::stale_partition_unavailability_refresh_interval_secs(
DEFAULT_STALE_PARTITION_UNAVAILABILITY_REFRESH_INTERVAL_SECS,
),
partition_key_range_to_location_for_write: Arc::new(RwLock::new(HashMap::new())),
partition_key_range_to_location_for_read_and_write: Arc::new(RwLock::new(
HashMap::new(),
)),
background_connection_init_active: AtomicBool::new(false),
partition_level_automatic_failover_enabled: AtomicBool::new(
partition_level_failover_enabled,
),
partition_level_circuit_breaker_enabled: AtomicBool::new(
partition_level_circuit_breaker_enabled,
),
background_task_manager: BackgroundTaskManager::new(),
});
instance.initialize_and_start_circuit_breaker_failback_background_refresh();
instance
}
fn allowed_partition_unavailability_duration_secs(default: i64) -> i64 {
std::env::var(constants::AZURE_COSMOS_ALLOWED_PARTITION_UNAVAILABILITY_DURATION_IN_SECONDS)
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}
fn stale_partition_unavailability_refresh_interval_secs(default: i64) -> i64 {
std::env::var(
constants::AZURE_COSMOS_PPCB_STALE_PARTITION_UNAVAILABILITY_REFRESH_INTERVAL_IN_SECONDS,
)
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}
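    /// Spawns the failback loop exactly once; the compare-exchange on
    /// `background_connection_init_active` makes repeated calls no-ops.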
fn initialize_and_start_circuit_breaker_failback_background_refresh(self: &Arc<Self>) {
if self
.background_connection_init_active
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return;
}
let weak_self = Arc::downgrade(self);
self.background_task_manager.spawn(Box::pin(async move {
Self::initiate_circuit_breaker_failback_loop(weak_self).await;
}));
}
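    /// Periodically attempts to fail stale unavailable partitions back to
    /// their original location. Only a `Weak` reference to the manager is
    /// held, so the loop exits once the owning client is dropped.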
async fn initiate_circuit_breaker_failback_loop(weak_self: Weak<Self>) {
let interval = match weak_self.upgrade() {
Some(strong) => Duration::seconds(strong.background_connection_init_interval_secs),
None => return,
};
loop {
azure_core::async_runtime::get_async_runtime()
.sleep(interval)
.await;
let strong = match weak_self.upgrade() {
Some(s) => s,
None => {
info!("GlobalPartitionEndpointManager: background failback loop exiting because the client has been dropped.");
return;
}
};
info!("GlobalPartitionEndpointManager: initiate_circuit_breaker_failback_loop() un-deterministically marking the failed partitions back to healthy.");
if let Err(e) = strong.initiate_failback_to_unhealthy_endpoints().await {
tracing::error!("GlobalPartitionEndpointManager: initiate_circuit_breaker_failback_loop() - failed to mark the failed partitions back to healthy. Exception: {}", e);
}
}
}
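    /// Scans the circuit-breaker map for entries whose first failure is older
    /// than the allowed unavailability window, marks them healthy, and
    /// removes them so traffic can return to the original location.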
async fn initiate_failback_to_unhealthy_endpoints(
&self,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("GlobalPartitionEndpointManager: initiate_circuit_breaker_failback_loop() - Attempting to mark the failed partitions back to healthy and initiate failback.");
let mut pk_range_to_endpoint_mappings: HashMap<
PartitionKeyRange,
(String, String, PartitionHealthStatus),
> = HashMap::new();
{
let guard = self
.partition_key_range_to_location_for_read_and_write
.read()
.map_err(|e| e.to_string())?;
for (pk_range, partition_failover) in guard.iter() {
let pk_range = pk_range.clone();
let (first_request_failure_time, _) =
partition_failover.snapshot_partition_failover_timestamps();
if Instant::now().duration_since(first_request_failure_time)
> Duration::seconds(self.partition_unavailability_duration_secs)
{
let original_failed_location = partition_failover.first_failed_location.clone();
pk_range_to_endpoint_mappings.insert(
pk_range,
(
partition_failover.collection_rid.clone(),
original_failed_location,
PartitionHealthStatus::Unhealthy,
),
);
}
}
}
if !pk_range_to_endpoint_mappings.is_empty() {
Self::mark_endpoints_to_healthy(&mut pk_range_to_endpoint_mappings);
for (pk_range, (_, original_failed_location, current_health_state)) in
pk_range_to_endpoint_mappings
{
if current_health_state == PartitionHealthStatus::Healthy {
info!(
"Initiating failback to endpoint: {}, for partition key range: {:?}",
original_failed_location, pk_range
);
self.partition_key_range_to_location_for_read_and_write
.write()
.unwrap()
.remove(&pk_range);
}
}
}
Ok(())
}
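    /// Flips every entry in the given map to `Healthy`, logging each
    /// transition.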
fn mark_endpoints_to_healthy(
pk_range_uri_mappings: &mut HashMap<
PartitionKeyRange,
(String, String, PartitionHealthStatus),
>,
) {
for (pk_range, mapping) in pk_range_uri_mappings.iter_mut() {
info!(
"Un-deterministically marking the original failed endpoint: {}, for the PkRange: {}, collectionRid: {} back to healthy.",
mapping.1,
pk_range.id,
mapping.0
);
mapping.2 = PartitionHealthStatus::Healthy;
}
}
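    /// A request qualifies for per-partition automatic failover only when the
    /// feature is enabled, the request is a write, and the account cannot
    /// serve writes from multiple locations (i.e. it is single-master).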
pub fn is_request_eligible_for_per_partition_automatic_failover(
&self,
request: &CosmosRequest,
) -> bool {
self.partition_level_automatic_failover_enabled
.load(Ordering::SeqCst)
&& !request.is_read_only_request()
&& !self
.global_endpoint_manager
.can_support_multiple_write_locations(request.resource_type, request.operation_type)
}
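    /// A request qualifies for the partition-level circuit breaker when the
    /// feature is enabled, the request targets documents (or executes a
    /// stored procedure), and it is either a read or a write that the account
    /// can serve from multiple write locations.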
pub fn is_request_eligible_for_partition_level_circuit_breaker(
&self,
request: &CosmosRequest,
) -> bool {
self.partition_level_circuit_breaker_enabled
.load(Ordering::SeqCst)
&& (request.resource_type == ResourceType::Documents
|| (request.resource_type == ResourceType::StoredProcedures
&& request.operation_type == OperationType::Execute))
&& (request.is_read_only_request()
|| (!request.is_read_only_request()
&& self
.global_endpoint_manager
.can_support_multiple_write_locations(
request.resource_type,
request.operation_type,
)))
}
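    /// Common eligibility gate for both partition-level features. Returns the
    /// resolved partition key range and, when `should_validate_failed_location`
    /// is set, the endpoint the request was routed to; returns `None` if the
    /// request cannot participate in partition-level failover.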
fn is_request_eligible_for_partition_failover(
&self,
request: &CosmosRequest,
should_validate_failed_location: bool,
) -> Option<(PartitionKeyRange, Option<Url>)> {
if !self.partition_level_automatic_failover_enabled()
&& !self.partition_level_circuit_breaker_enabled()
{
return None;
}
let request_context = &request.request_context;
if !self.can_use_partition_level_failover_locations(request) {
return None;
}
let partition_key_range = request_context.resolved_partition_key_range.clone()?;
        let failed_location = if should_validate_failed_location {
            Some(request_context.location_endpoint_to_route.clone()?)
        } else {
            None
        };
Some((partition_key_range, failed_location))
}
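    /// Partition-level failover requires more than one read endpoint and only
    /// applies to document operations and stored-procedure execution.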
fn can_use_partition_level_failover_locations(&self, request: &CosmosRequest) -> bool {
if self.global_endpoint_manager.read_endpoints().len() <= 1 {
return false;
}
matches!(request.resource_type, ResourceType::Documents)
|| (request.resource_type == ResourceType::StoredProcedures
&& request.operation_type == OperationType::Execute)
}
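    /// Routes the request to the override location recorded for its partition
    /// key range, if one exists and (on the circuit-breaker path) the failure
    /// threshold has been crossed. Returns `true` only when the request was
    /// actually rerouted.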
fn try_route_request_for_partition_level_override(
&self,
partition_key_range: &PartitionKeyRange,
request: &mut CosmosRequest,
partition_key_range_to_location_mapping: &Arc<
RwLock<HashMap<PartitionKeyRange, PartitionKeyRangeFailoverInfo>>,
>,
) -> bool {
if let Some(partition_key_range_failover) = partition_key_range_to_location_mapping
.read()
.unwrap()
.get(partition_key_range)
{
if self.is_request_eligible_for_partition_level_circuit_breaker(request)
&& !partition_key_range_failover
.can_circuit_breaker_trigger_partition_failover(request.is_read_only_request())
{
return false;
}
let triggered_by = if self
.partition_level_automatic_failover_enabled
.load(Ordering::SeqCst)
{
"Automatic Failover"
} else {
"Circuit Breaker"
};
info!(
"Attempting to route request for partition level override triggered by {}, for operation type: {:?}. URI: {}, PartitionKeyRange: {:?}",
triggered_by,
request.operation_type,
partition_key_range_failover.current,
partition_key_range.id
);
if let Ok(endpoint) = partition_key_range_failover.current.parse() {
request.request_context.route_to_location_endpoint(endpoint);
return true;
} else {
                tracing::warn!(
                    "Skipping partition level override due to invalid URI in failover info: {}",
                    partition_key_range_failover.current
                );
}
}
false
}
pub fn partition_level_automatic_failover_enabled(&self) -> bool {
self.partition_level_automatic_failover_enabled
.load(Ordering::SeqCst)
}
pub fn partition_level_circuit_breaker_enabled(&self) -> bool {
self.partition_level_circuit_breaker_enabled
.load(Ordering::SeqCst)
}
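    /// True when either partition-level feature (automatic failover or
    /// circuit breaker) is enabled.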
pub fn partition_level_failover_enabled(&self) -> bool {
self.partition_level_circuit_breaker_enabled()
|| self.partition_level_automatic_failover_enabled()
}
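    /// Applies a partition-level location override to the request, if one is
    /// recorded for its resolved partition key range.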
pub fn try_add_partition_level_location_override(&self, request: &mut CosmosRequest) -> bool {
let Some((partition_key_range, _)) =
self.is_request_eligible_for_partition_failover(request, false)
else {
return false;
};
if self.is_request_eligible_for_partition_level_circuit_breaker(request) {
return self.try_route_request_for_partition_level_override(
&partition_key_range,
request,
&self.partition_key_range_to_location_for_read_and_write,
);
} else if self.is_request_eligible_for_per_partition_automatic_failover(request) {
return self.try_route_request_for_partition_level_override(
&partition_key_range,
request,
&self.partition_key_range_to_location_for_write,
);
}
false
}
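    /// Records the request's current endpoint as failed for its partition key
    /// range and advances the override to the next available location.
    /// Returns `false` when the request is ineligible or every location has
    /// already failed.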
pub fn try_mark_endpoint_unavailable_for_partition_key_range(
&self,
request: &CosmosRequest,
) -> bool {
let Some((partition_key_range, failed_location)) =
self.is_request_eligible_for_partition_failover(request, true)
else {
return false;
};
let Some(failed_location) = failed_location else {
return false;
};
let failed_location_str = failed_location.as_str();
if self.is_request_eligible_for_partition_level_circuit_breaker(request) {
let next_locations: Vec<String> = self
.global_endpoint_manager
.read_endpoints()
.iter()
.map(|u| u.to_string())
.collect();
return self.try_add_or_update_partition_failover_info_and_move_to_next_location(
&partition_key_range,
failed_location_str,
&next_locations,
request,
&self.partition_key_range_to_location_for_read_and_write,
);
} else if self.is_request_eligible_for_per_partition_automatic_failover(request) {
let next_locations: Vec<String> = self
.global_endpoint_manager
.account_read_endpoints()
.iter()
.map(|u| u.to_string())
.collect();
return self.try_add_or_update_partition_failover_info_and_move_to_next_location(
&partition_key_range,
failed_location_str,
&next_locations,
request,
&self.partition_key_range_to_location_for_write,
);
}
        info!(
            "Partition level override skipped: the request is eligible for neither automatic failover nor the circuit breaker."
        );
false
}
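    /// Inserts or updates the failover info for the partition key range and
    /// advances it past `failed_location`. When every location in
    /// `next_locations` has already failed, the entry is removed so routing
    /// falls back to the default behavior.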
fn try_add_or_update_partition_failover_info_and_move_to_next_location(
&self,
partition_key_range: &PartitionKeyRange,
failed_location: &str,
next_locations: &[String],
request: &CosmosRequest,
partition_key_range_to_location_mapping: &Arc<
RwLock<HashMap<PartitionKeyRange, PartitionKeyRangeFailoverInfo>>,
>,
) -> bool {
        let Some(collection_rid) = request.request_context.resolved_collection_rid.clone() else {
            return false;
        };
let triggered_by = if self
.partition_level_automatic_failover_enabled
.load(Ordering::SeqCst)
{
"Automatic Failover"
} else {
"Circuit Breaker"
};
let mut guard = partition_key_range_to_location_mapping.write().unwrap();
let partition_failover = guard.entry(partition_key_range.clone()).or_insert_with(|| {
PartitionKeyRangeFailoverInfo::new(collection_rid, failed_location.to_string())
});
if partition_failover.try_move_next_location(next_locations, failed_location) {
            info!(
                "Partition level override triggered by {} for {:?}. \
                 PartitionKeyRange: {:?}, failed location: {}, new location: {}",
                triggered_by,
                request.operation_type,
                partition_key_range,
                failed_location,
                partition_failover.current
            );
return true;
}
        info!(
            "Partition level override removed for {:?}. PartitionKeyRange: {:?}, failed location: {}",
            request.operation_type,
            partition_key_range,
            failed_location
        );
guard.remove(partition_key_range);
false
}
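    /// Increments the consecutive failure counter for the request's partition
    /// key range (in the write-override map for automatic-failover-eligible
    /// requests, otherwise in the read/write map) and reports whether the
    /// circuit breaker threshold has been crossed.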
pub(crate) fn increment_request_failure_counter_and_check_if_partition_can_failover(
&self,
request: &CosmosRequest,
) -> bool {
let Some((partition_key_range, Some(failed_location))) =
self.is_request_eligible_for_partition_failover(request, true)
else {
return false;
};
        let Some(collection_rid) = request.request_context.resolved_collection_rid.clone() else {
            return false;
        };
let is_read_only = request.is_read_only_request();
if self.is_request_eligible_for_per_partition_automatic_failover(request) {
let mut guard = self
.partition_key_range_to_location_for_write
.write()
.unwrap();
let partition_failover = guard.entry(partition_key_range).or_insert_with(|| {
PartitionKeyRangeFailoverInfo::new(
collection_rid.clone(),
failed_location.to_string(),
)
});
partition_failover.increment_request_failure_counts(is_read_only, Instant::now());
partition_failover.can_circuit_breaker_trigger_partition_failover(is_read_only)
} else {
let mut guard = self
.partition_key_range_to_location_for_read_and_write
.write()
.unwrap();
let partition_failover = guard.entry(partition_key_range).or_insert_with(|| {
PartitionKeyRangeFailoverInfo::new(collection_rid, failed_location.to_string())
});
partition_failover.increment_request_failure_counts(is_read_only, Instant::now());
partition_failover.can_circuit_breaker_trigger_partition_failover(is_read_only)
}
}
pub fn configure_partition_level_automatic_failover(&self, is_enabled: bool) {
let previous = self
.partition_level_automatic_failover_enabled
.swap(is_enabled, Ordering::SeqCst);
if previous != is_enabled {
info!(
"Per partition automatic failover enablement flag changed: {} -> {}",
previous, is_enabled
);
}
}
pub fn configure_per_partition_circuit_breaker(&self, is_enabled: bool) {
let previous = self
.partition_level_circuit_breaker_enabled
.swap(is_enabled, Ordering::SeqCst);
if previous != is_enabled {
info!(
"Per partition circuit breaker enablement flag changed: {} -> {}",
previous, is_enabled
);
}
}
}
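/// Per-partition failover state: the current override location, the set of
/// locations that have already failed, and consecutive read/write failure
/// counters that reset after a configurable quiet window.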
#[derive(Debug)]
pub struct PartitionKeyRangeFailoverInfo {
failed_locations: Mutex<HashMap<String, Instant>>,
timeout_counter_reset_window: Duration,
read_request_failure_counter_threshold: i32,
write_request_failure_counter_threshold: i32,
last_request_failure_time: RwLock<Instant>,
consecutive_read_request_failure_count: AtomicI32,
consecutive_write_request_failure_count: AtomicI32,
pub current: String,
pub first_failed_location: String,
pub collection_rid: String,
pub first_request_failure_time: Instant,
}
impl PartitionKeyRangeFailoverInfo {
pub fn new(collection_rid: String, current_location: String) -> Self {
Self {
collection_rid,
current: current_location.clone(),
first_failed_location: current_location,
failed_locations: Mutex::new(HashMap::new()),
consecutive_read_request_failure_count: AtomicI32::new(0),
consecutive_write_request_failure_count: AtomicI32::new(0),
read_request_failure_counter_threshold:
Self::circuit_breaker_consecutive_failure_count_for_reads(
DEFAULT_CIRCUIT_BREAKER_CONSECUTIVE_FAILURE_COUNT_FOR_READS,
),
write_request_failure_counter_threshold:
Self::circuit_breaker_consecutive_failure_count_for_writes(
DEFAULT_CIRCUIT_BREAKER_CONSECUTIVE_FAILURE_COUNT_FOR_WRITES,
),
timeout_counter_reset_window: Duration::seconds(
Self::circuit_breaker_timeout_counter_reset_window_mins(
DEFAULT_CIRCUIT_BREAKER_TIMEOUT_COUNTER_RESET_WINDOW_MINS,
) * 60,
),
first_request_failure_time: Instant::now(),
last_request_failure_time: RwLock::new(Instant::now()),
}
}
fn circuit_breaker_consecutive_failure_count_for_reads(default: i32) -> i32 {
std::env::var(constants::AZURE_COSMOS_CIRCUIT_BREAKER_CONSECUTIVE_FAILURE_COUNT_FOR_READS)
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}
fn circuit_breaker_consecutive_failure_count_for_writes(default: i32) -> i32 {
std::env::var(constants::AZURE_COSMOS_CIRCUIT_BREAKER_CONSECUTIVE_FAILURE_COUNT_FOR_WRITES)
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}
fn circuit_breaker_timeout_counter_reset_window_mins(default: i64) -> i64 {
std::env::var(
constants::AZURE_COSMOS_CIRCUIT_BREAKER_TIMEOUT_COUNTER_RESET_WINDOW_IN_MINUTES,
)
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}
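    /// Advances `current` to the next location that has not already failed,
    /// recording `failed_location` as failed. Returns `true` if `current`
    /// already points away from `failed_location` or a new location was
    /// found, and `false` once all locations are exhausted.
    ///
    /// A minimal sketch of the expected behavior (it mirrors the unit tests
    /// below; the URLs are placeholders):
    ///
    /// ```ignore
    /// let mut info = PartitionKeyRangeFailoverInfo::new("rid".into(), "https://a/".into());
    /// let locations = vec!["https://a/".to_string(), "https://b/".to_string()];
    /// assert!(info.try_move_next_location(&locations, "https://a/"));
    /// assert_eq!(info.current, "https://b/");
    /// ```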
pub fn try_move_next_location(&mut self, locations: &[String], failed_location: &str) -> bool {
if failed_location != self.current {
return true;
}
        let mut guard = self.failed_locations.lock().unwrap();
for location in locations {
if self.current == *location {
continue;
}
if guard.contains_key(location) {
continue;
}
guard.insert(failed_location.to_string(), Instant::now());
self.current = location.clone();
return true;
}
false
}
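    /// Returns `true` once the consecutive failure count for the request kind
    /// (read or write) exceeds its configured threshold.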
pub fn can_circuit_breaker_trigger_partition_failover(
&self,
is_read_only_request: bool,
) -> bool {
let (read_count, write_count) = self.snapshot_consecutive_request_failure_count();
if is_read_only_request {
read_count > self.read_request_failure_counter_threshold
} else {
write_count > self.write_request_failure_counter_threshold
}
}
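    /// Records a failure at `current_time`, first resetting both counters if
    /// the previous failure is older than the reset window.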
pub fn increment_request_failure_counts(
&self,
is_read_only_request: bool,
current_time: Instant,
) {
let (_, last_failure_time) = self.snapshot_partition_failover_timestamps();
if current_time.duration_since(last_failure_time) > self.timeout_counter_reset_window {
self.consecutive_read_request_failure_count
.store(0, Ordering::SeqCst);
self.consecutive_write_request_failure_count
.store(0, Ordering::SeqCst);
}
if is_read_only_request {
self.consecutive_read_request_failure_count
.fetch_add(1, Ordering::SeqCst);
} else {
self.consecutive_write_request_failure_count
.fetch_add(1, Ordering::SeqCst);
}
*self.last_request_failure_time.write().unwrap() = current_time;
}
pub fn snapshot_partition_failover_timestamps(&self) -> (Instant, Instant) {
(
self.first_request_failure_time,
*self.last_request_failure_time.read().unwrap(),
)
}
pub fn snapshot_consecutive_request_failure_count(&self) -> (i32, i32) {
(
self.consecutive_read_request_failure_count
.load(Ordering::SeqCst),
self.consecutive_write_request_failure_count
.load(Ordering::SeqCst),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cosmos_request::CosmosRequest;
use crate::models::AccountRegion;
use crate::operation_context::OperationType;
use crate::regions::Region;
use crate::resource_context::{ResourceLink, ResourceType};
use crate::routing::global_endpoint_manager::GlobalEndpointManager;
use crate::routing::partition_key_range::PartitionKeyRange;
use azure_core::http::Pipeline;
use std::sync::Arc;
use std::time::Instant;
fn create_test_pipeline() -> Pipeline {
Pipeline::new(
option_env!("CARGO_PKG_NAME"),
option_env!("CARGO_PKG_VERSION"),
azure_core::http::ClientOptions::default(),
Vec::new(),
Vec::new(),
None,
)
}
fn create_single_region_manager() -> Arc<GlobalEndpointManager> {
GlobalEndpointManager::new(
"https://test.documents.azure.com".parse().unwrap(),
vec![Region::from("West US")],
vec![],
create_test_pipeline(),
)
}
fn create_multi_region_manager() -> Arc<GlobalEndpointManager> {
let manager = GlobalEndpointManager::new(
"https://test.documents.azure.com".parse().unwrap(),
vec![Region::from("West US"), Region::from("East US")],
vec![],
create_test_pipeline(),
);
let west = AccountRegion {
name: Region::from("West US"),
database_account_endpoint: "https://test-westus.documents.azure.com".parse().unwrap(),
};
let east = AccountRegion {
name: Region::from("East US"),
database_account_endpoint: "https://test-eastus.documents.azure.com".parse().unwrap(),
};
manager.update_location_cache(vec![west.clone(), east.clone()], vec![west, east]);
manager
}
fn create_three_region_manager() -> Arc<GlobalEndpointManager> {
let manager = GlobalEndpointManager::new(
"https://test.documents.azure.com".parse().unwrap(),
vec![
Region::from("West US"),
Region::from("East US"),
Region::from("Central US"),
],
vec![],
create_test_pipeline(),
);
let west = AccountRegion {
name: Region::from("West US"),
database_account_endpoint: "https://test-westus.documents.azure.com".parse().unwrap(),
};
let east = AccountRegion {
name: Region::from("East US"),
database_account_endpoint: "https://test-eastus.documents.azure.com".parse().unwrap(),
};
let central = AccountRegion {
name: Region::from("Central US"),
database_account_endpoint: "https://test-centralus.documents.azure.com"
.parse()
.unwrap(),
};
manager.update_location_cache(
vec![west.clone(), east.clone(), central.clone()],
vec![west, east, central],
);
manager
}
fn create_single_master_multi_region_manager() -> Arc<GlobalEndpointManager> {
let manager = GlobalEndpointManager::new(
"https://test.documents.azure.com".parse().unwrap(),
vec![Region::from("West US"), Region::from("East US")],
vec![],
create_test_pipeline(),
);
let west = AccountRegion {
name: Region::from("West US"),
database_account_endpoint: "https://test-westus.documents.azure.com".parse().unwrap(),
};
let east = AccountRegion {
name: Region::from("East US"),
database_account_endpoint: "https://test-eastus.documents.azure.com".parse().unwrap(),
};
manager.update_location_cache(vec![west.clone()], vec![west, east]);
manager
}
fn create_read_request() -> CosmosRequest {
let resource_link = ResourceLink::root(ResourceType::Documents);
let mut request = CosmosRequest::builder(OperationType::Read, resource_link)
.build()
.unwrap();
request.request_context.location_endpoint_to_route =
Some("https://test-westus.documents.azure.com/".parse().unwrap());
request.request_context.resolved_partition_key_range =
Some(PartitionKeyRange::new("0".into(), "".into(), "FF".into()));
request.request_context.resolved_collection_rid = Some("dbs/db1/colls/coll1".into());
request
}
fn create_write_request() -> CosmosRequest {
let resource_link = ResourceLink::root(ResourceType::Documents);
let mut request = CosmosRequest::builder(OperationType::Create, resource_link)
.build()
.unwrap();
request.request_context.location_endpoint_to_route =
Some("https://test-westus.documents.azure.com/".parse().unwrap());
request.request_context.resolved_partition_key_range =
Some(PartitionKeyRange::new("0".into(), "".into(), "FF".into()));
request.request_context.resolved_collection_rid = Some("dbs/db1/colls/coll1".into());
request
}
fn create_stored_procedure_execute_request() -> CosmosRequest {
let resource_link = ResourceLink::root(ResourceType::StoredProcedures);
let mut request = CosmosRequest::builder(OperationType::Execute, resource_link)
.build()
.unwrap();
request.request_context.location_endpoint_to_route =
Some("https://test-westus.documents.azure.com/".parse().unwrap());
request.request_context.resolved_partition_key_range =
Some(PartitionKeyRange::new("0".into(), "".into(), "FF".into()));
request.request_context.resolved_collection_rid = Some("dbs/db1/colls/coll1".into());
request
}
fn create_database_request() -> CosmosRequest {
let resource_link = ResourceLink::root(ResourceType::Databases);
let mut request = CosmosRequest::builder(OperationType::Read, resource_link)
.build()
.unwrap();
request.request_context.location_endpoint_to_route =
Some("https://test-westus.documents.azure.com/".parse().unwrap());
request.request_context.resolved_partition_key_range =
Some(PartitionKeyRange::new("0".into(), "".into(), "FF".into()));
request.request_context.resolved_collection_rid = Some("dbs/db1/colls/coll1".into());
request
}
#[tokio::test]
async fn test_health_status_values() {
assert_eq!(PartitionHealthStatus::Healthy as i32, 100);
assert_eq!(PartitionHealthStatus::Unhealthy as i32, 200);
}
#[tokio::test]
async fn test_health_status_equality() {
assert_eq!(
PartitionHealthStatus::Healthy,
PartitionHealthStatus::Healthy
);
assert_ne!(
PartitionHealthStatus::Healthy,
PartitionHealthStatus::Unhealthy
);
}
#[tokio::test]
async fn test_failover_info_new_initializes_correctly() {
let info = PartitionKeyRangeFailoverInfo::new(
"rid1".to_string(),
"https://loc1.documents.azure.com/".to_string(),
);
assert_eq!(info.collection_rid, "rid1");
assert_eq!(info.current, "https://loc1.documents.azure.com/");
assert_eq!(
info.first_failed_location,
"https://loc1.documents.azure.com/"
);
let (read_count, write_count) = info.snapshot_consecutive_request_failure_count();
assert_eq!(read_count, 0);
assert_eq!(write_count, 0);
}
#[tokio::test]
async fn test_failover_info_timestamps_initialized_to_now() {
let before = Instant::now();
let info = PartitionKeyRangeFailoverInfo::new("rid".into(), "https://loc.com/".into());
let after = Instant::now();
let (first, last) = info.snapshot_partition_failover_timestamps();
assert!(first >= before && first <= after);
assert!(last >= before && last <= after);
}
#[tokio::test]
async fn test_try_move_next_location_moves_to_first_available() {
let mut info =
PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let locations = vec![
"https://loc1.com/".to_string(),
"https://loc2.com/".to_string(),
"https://loc3.com/".to_string(),
];
let result = info.try_move_next_location(&locations, "https://loc1.com/");
assert!(result);
assert_eq!(info.current, "https://loc2.com/");
}
#[tokio::test]
async fn test_try_move_next_location_skips_current_location() {
let mut info =
PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let locations = vec!["https://loc1.com/".to_string()];
let result = info.try_move_next_location(&locations, "https://loc1.com/");
assert!(!result);
assert_eq!(info.current, "https://loc1.com/");
}
#[tokio::test]
async fn test_try_move_next_location_returns_true_if_already_moved() {
let mut info =
PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let locations = vec![
"https://loc1.com/".to_string(),
"https://loc2.com/".to_string(),
];
info.try_move_next_location(&locations, "https://loc1.com/");
assert_eq!(info.current, "https://loc2.com/");
let result = info.try_move_next_location(&locations, "https://loc1.com/");
assert!(result);
assert_eq!(info.current, "https://loc2.com/");
}
#[tokio::test]
async fn test_try_move_next_location_sequential_failover() {
let mut info =
PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let locations = vec![
"https://loc1.com/".to_string(),
"https://loc2.com/".to_string(),
"https://loc3.com/".to_string(),
];
assert!(info.try_move_next_location(&locations, "https://loc1.com/"));
assert_eq!(info.current, "https://loc2.com/");
assert!(info.try_move_next_location(&locations, "https://loc2.com/"));
assert_eq!(info.current, "https://loc3.com/");
assert!(!info.try_move_next_location(&locations, "https://loc3.com/"));
assert_eq!(info.current, "https://loc3.com/");
}
#[tokio::test]
async fn test_try_move_next_location_empty_locations() {
let mut info =
PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let result = info.try_move_next_location(&[], "https://loc1.com/");
assert!(!result);
}
#[tokio::test]
async fn test_can_circuit_breaker_trigger_reads_below_threshold() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
assert!(!info.can_circuit_breaker_trigger_partition_failover(true));
}
#[tokio::test]
async fn test_can_circuit_breaker_trigger_reads_at_threshold() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
info.consecutive_read_request_failure_count
.store(2, Ordering::SeqCst);
assert!(!info.can_circuit_breaker_trigger_partition_failover(true));
}
#[tokio::test]
async fn test_can_circuit_breaker_trigger_reads_above_threshold() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
info.consecutive_read_request_failure_count
.store(11, Ordering::SeqCst);
assert!(info.can_circuit_breaker_trigger_partition_failover(true));
}
#[tokio::test]
async fn test_can_circuit_breaker_trigger_writes_below_threshold() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
assert!(!info.can_circuit_breaker_trigger_partition_failover(false));
}
#[tokio::test]
async fn test_can_circuit_breaker_trigger_writes_at_threshold() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
info.consecutive_write_request_failure_count
.store(5, Ordering::SeqCst);
assert!(!info.can_circuit_breaker_trigger_partition_failover(false));
}
#[tokio::test]
async fn test_can_circuit_breaker_trigger_writes_above_threshold() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
info.consecutive_write_request_failure_count
.store(6, Ordering::SeqCst);
assert!(info.can_circuit_breaker_trigger_partition_failover(false));
}
#[tokio::test]
async fn test_can_circuit_breaker_read_count_does_not_affect_write_check() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
info.consecutive_read_request_failure_count
.store(100, Ordering::SeqCst);
assert!(!info.can_circuit_breaker_trigger_partition_failover(false));
info.consecutive_write_request_failure_count
.store(100, Ordering::SeqCst);
info.consecutive_read_request_failure_count
.store(0, Ordering::SeqCst);
assert!(!info.can_circuit_breaker_trigger_partition_failover(true));
}
#[tokio::test]
async fn test_increment_read_failure_count() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let now = Instant::now();
info.increment_request_failure_counts(true, now);
info.increment_request_failure_counts(true, now);
info.increment_request_failure_counts(true, now);
let (read_count, write_count) = info.snapshot_consecutive_request_failure_count();
assert_eq!(read_count, 3);
assert_eq!(write_count, 0);
}
#[tokio::test]
async fn test_increment_write_failure_count() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let now = Instant::now();
info.increment_request_failure_counts(false, now);
info.increment_request_failure_counts(false, now);
let (read_count, write_count) = info.snapshot_consecutive_request_failure_count();
assert_eq!(read_count, 0);
assert_eq!(write_count, 2);
}
#[tokio::test]
async fn test_increment_mixed_read_and_write_failures() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let now = Instant::now();
info.increment_request_failure_counts(true, now);
info.increment_request_failure_counts(false, now);
info.increment_request_failure_counts(true, now);
info.increment_request_failure_counts(false, now);
info.increment_request_failure_counts(false, now);
let (read_count, write_count) = info.snapshot_consecutive_request_failure_count();
assert_eq!(read_count, 2);
assert_eq!(write_count, 3);
}
#[tokio::test]
async fn test_increment_updates_last_failure_time() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let (_, time_before) = info.snapshot_partition_failover_timestamps();
std::thread::sleep(std::time::Duration::from_millis(10));
let later = Instant::now();
info.increment_request_failure_counts(true, later);
let (_, time_after) = info.snapshot_partition_failover_timestamps();
assert!(time_after > time_before);
}
#[tokio::test]
async fn test_increment_resets_counters_when_timeout_window_exceeded() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let now = Instant::now();
info.increment_request_failure_counts(true, now);
info.increment_request_failure_counts(true, now);
info.increment_request_failure_counts(false, now);
let (read_count, write_count) = info.snapshot_consecutive_request_failure_count();
assert_eq!(read_count, 2);
assert_eq!(write_count, 1);
let far_future = now + std::time::Duration::from_secs(400);
info.increment_request_failure_counts(true, far_future);
let (read_count, write_count) = info.snapshot_consecutive_request_failure_count();
assert_eq!(read_count, 1);
assert_eq!(write_count, 0);
}
#[tokio::test]
async fn test_increment_does_not_reset_within_timeout_window() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let now = Instant::now();
info.increment_request_failure_counts(true, now);
info.increment_request_failure_counts(true, now);
let soon = now + std::time::Duration::from_secs(100);
info.increment_request_failure_counts(true, soon);
let (read_count, _) = info.snapshot_consecutive_request_failure_count();
assert_eq!(read_count, 3);
}
#[tokio::test]
async fn test_snapshot_consecutive_count_returns_current_values() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
info.consecutive_read_request_failure_count
.store(7, Ordering::SeqCst);
info.consecutive_write_request_failure_count
.store(3, Ordering::SeqCst);
let (r, w) = info.snapshot_consecutive_request_failure_count();
assert_eq!(r, 7);
assert_eq!(w, 3);
}
#[tokio::test]
async fn test_mark_endpoints_to_healthy_marks_all_as_healthy() {
let pk1 = PartitionKeyRange::new("0".into(), "".into(), "AA".into());
let pk2 = PartitionKeyRange::new("1".into(), "AA".into(), "FF".into());
let mut mappings = HashMap::new();
mappings.insert(
pk1.clone(),
(
"rid1".to_string(),
"https://loc1.com/".to_string(),
PartitionHealthStatus::Unhealthy,
),
);
mappings.insert(
pk2.clone(),
(
"rid2".to_string(),
"https://loc2.com/".to_string(),
PartitionHealthStatus::Unhealthy,
),
);
GlobalPartitionEndpointManager::mark_endpoints_to_healthy(&mut mappings);
assert_eq!(mappings[&pk1].2, PartitionHealthStatus::Healthy);
assert_eq!(mappings[&pk2].2, PartitionHealthStatus::Healthy);
}
#[tokio::test]
async fn test_mark_endpoints_to_healthy_empty_map() {
let mut mappings: HashMap<PartitionKeyRange, (String, String, PartitionHealthStatus)> =
HashMap::new();
GlobalPartitionEndpointManager::mark_endpoints_to_healthy(&mut mappings);
assert!(mappings.is_empty());
}
#[tokio::test]
async fn test_mark_endpoints_to_healthy_already_healthy() {
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
let mut mappings = HashMap::new();
mappings.insert(
pk.clone(),
(
"rid1".to_string(),
"https://loc1.com/".to_string(),
PartitionHealthStatus::Healthy,
),
);
GlobalPartitionEndpointManager::mark_endpoints_to_healthy(&mut mappings);
assert_eq!(mappings[&pk].2, PartitionHealthStatus::Healthy);
}
#[tokio::test]
async fn test_new_both_flags_disabled() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
assert!(!manager.partition_level_automatic_failover_enabled());
assert!(!manager.partition_level_circuit_breaker_enabled());
assert!(!manager.partition_level_failover_enabled());
}
#[tokio::test]
async fn test_new_auto_failover_enabled_only() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
assert!(manager.partition_level_automatic_failover_enabled());
assert!(!manager.partition_level_circuit_breaker_enabled());
assert!(manager.partition_level_failover_enabled());
}
#[tokio::test]
async fn test_new_circuit_breaker_enabled_only() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
assert!(!manager.partition_level_automatic_failover_enabled());
assert!(manager.partition_level_circuit_breaker_enabled());
assert!(manager.partition_level_failover_enabled());
}
#[tokio::test]
async fn test_new_both_flags_enabled() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
assert!(manager.partition_level_automatic_failover_enabled());
assert!(manager.partition_level_circuit_breaker_enabled());
assert!(manager.partition_level_failover_enabled());
}
#[tokio::test]
async fn test_can_use_failover_locations_with_single_endpoint() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
let request = create_read_request();
assert!(!manager.can_use_partition_level_failover_locations(&request));
}
#[tokio::test]
async fn test_can_use_failover_locations_with_multiple_endpoints_documents() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
let request = create_read_request();
assert!(manager.can_use_partition_level_failover_locations(&request));
}
#[tokio::test]
async fn test_can_use_failover_locations_with_stored_procedure_execute() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
let request = create_stored_procedure_execute_request();
assert!(manager.can_use_partition_level_failover_locations(&request));
}
#[tokio::test]
async fn test_can_use_failover_locations_with_database_resource() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
let request = create_database_request();
assert!(!manager.can_use_partition_level_failover_locations(&request));
}
#[tokio::test]
async fn test_auto_failover_eligible_write_on_single_master() {
let gem = create_single_master_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
let request = create_write_request();
assert!(manager.is_request_eligible_for_per_partition_automatic_failover(&request));
}
#[tokio::test]
async fn test_auto_failover_not_eligible_when_disabled() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
let request = create_write_request();
assert!(!manager.is_request_eligible_for_per_partition_automatic_failover(&request));
}
#[tokio::test]
async fn test_auto_failover_not_eligible_for_read_request() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
let request = create_read_request();
assert!(!manager.is_request_eligible_for_per_partition_automatic_failover(&request));
}
#[tokio::test]
async fn test_circuit_breaker_eligible_for_read_request() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
assert!(manager.is_request_eligible_for_partition_level_circuit_breaker(&request));
}
#[tokio::test]
async fn test_circuit_breaker_not_eligible_when_disabled() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
let request = create_read_request();
assert!(!manager.is_request_eligible_for_partition_level_circuit_breaker(&request));
}
#[tokio::test]
async fn test_circuit_breaker_not_eligible_write_on_single_master() {
let gem = create_single_master_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_write_request();
assert!(!manager.is_request_eligible_for_partition_level_circuit_breaker(&request));
}
#[tokio::test]
async fn test_partition_failover_returns_none_when_both_disabled() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
let request = create_read_request();
assert!(manager
.is_request_eligible_for_partition_failover(&request, false)
.is_none());
}
#[tokio::test]
async fn test_partition_failover_returns_some_when_eligible() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
let result = manager.is_request_eligible_for_partition_failover(&request, false);
assert!(result.is_some());
let (pk_range, failed_loc) = result.unwrap();
assert_eq!(pk_range.id, "0");
        assert!(failed_loc.is_none());
    }
#[tokio::test]
async fn test_partition_failover_validates_failed_location() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
let result = manager.is_request_eligible_for_partition_failover(&request, true);
assert!(result.is_some());
let (pk_range, failed_loc) = result.unwrap();
assert_eq!(pk_range.id, "0");
assert!(failed_loc.is_some());
assert_eq!(
failed_loc.unwrap().as_str(),
"https://test-westus.documents.azure.com/"
);
}
#[tokio::test]
async fn test_partition_failover_returns_none_without_partition_key_range() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let mut request = create_read_request();
request.request_context.resolved_partition_key_range = None;
assert!(manager
.is_request_eligible_for_partition_failover(&request, false)
.is_none());
}
#[tokio::test]
async fn test_partition_failover_returns_none_for_ineligible_resource_type() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_database_request();
assert!(manager
.is_request_eligible_for_partition_failover(&request, false)
.is_none());
}
#[tokio::test]
async fn test_partition_failover_returns_none_without_failed_location() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let mut request = create_read_request();
request.request_context.location_endpoint_to_route = None;
assert!(manager
.is_request_eligible_for_partition_failover(&request, true)
.is_none());
}
#[tokio::test]
async fn test_mark_endpoint_unavailable_circuit_breaker_path() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
let result = manager.try_mark_endpoint_unavailable_for_partition_key_range(&request);
assert!(result);
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
assert!(guard.contains_key(&pk));
let failover_info = guard.get(&pk).unwrap();
assert_ne!(
failover_info.current,
"https://test-westus.documents.azure.com/"
);
}
#[tokio::test]
async fn test_mark_endpoint_unavailable_auto_failover_path() {
let gem = create_single_master_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
let request = create_write_request();
let result = manager.try_mark_endpoint_unavailable_for_partition_key_range(&request);
assert!(result);
let guard = manager
.partition_key_range_to_location_for_write
.read()
.unwrap();
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
assert!(guard.contains_key(&pk));
}
#[tokio::test]
async fn test_mark_endpoint_unavailable_returns_false_when_disabled() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
let request = create_read_request();
assert!(!manager.try_mark_endpoint_unavailable_for_partition_key_range(&request));
}
#[tokio::test]
async fn test_mark_endpoint_unavailable_returns_false_without_failed_location() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let mut request = create_read_request();
request.request_context.location_endpoint_to_route = None;
assert!(!manager.try_mark_endpoint_unavailable_for_partition_key_range(&request));
}
#[tokio::test]
async fn test_mark_endpoint_unavailable_sequential_failover_removes_on_exhaust() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
let request = create_read_request();
assert!(manager.try_mark_endpoint_unavailable_for_partition_key_range(&request));
{
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
let info = guard.get(&pk).unwrap();
assert_eq!(info.current, "https://test-eastus.documents.azure.com/");
}
let mut request2 = create_read_request();
request2.request_context.location_endpoint_to_route =
Some("https://test-eastus.documents.azure.com/".parse().unwrap());
let result = manager.try_mark_endpoint_unavailable_for_partition_key_range(&request2);
assert!(!result);
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
assert!(!guard.contains_key(&pk));
}
#[tokio::test]
async fn test_add_override_returns_false_when_no_override_exists() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let mut request = create_read_request();
assert!(!manager.try_add_partition_level_location_override(&mut request));
}
#[tokio::test]
async fn test_add_override_routes_to_override_location() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
manager.try_mark_endpoint_unavailable_for_partition_key_range(&request);
{
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
let info = guard.get(&pk).unwrap();
info.consecutive_read_request_failure_count
.store(11, Ordering::SeqCst);
}
let mut request2 = create_read_request();
let result = manager.try_add_partition_level_location_override(&mut request2);
assert!(result);
assert_eq!(
request2
.request_context
.location_endpoint_to_route
.unwrap()
.as_str(),
"https://test-eastus.documents.azure.com/"
);
}
#[tokio::test]
async fn test_add_override_returns_false_when_disabled() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
let mut request = create_read_request();
assert!(!manager.try_add_partition_level_location_override(&mut request));
}
#[tokio::test]
async fn test_add_override_circuit_breaker_below_threshold_returns_false() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
manager.try_mark_endpoint_unavailable_for_partition_key_range(&request);
let mut request2 = create_read_request();
let result = manager.try_add_partition_level_location_override(&mut request2);
assert!(!result);
}
#[tokio::test]
async fn test_add_override_auto_failover_path() {
let gem = create_single_master_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
let request = create_write_request();
manager.try_mark_endpoint_unavailable_for_partition_key_range(&request);
let mut request2 = create_write_request();
let result = manager.try_add_partition_level_location_override(&mut request2);
assert!(result);
}
#[tokio::test]
async fn test_increment_failure_counter_returns_false_when_disabled() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
let request = create_read_request();
assert!(!manager
.increment_request_failure_counter_and_check_if_partition_can_failover(&request));
}
#[tokio::test]
async fn test_increment_failure_counter_creates_entry_on_first_call() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
let _ =
manager.increment_request_failure_counter_and_check_if_partition_can_failover(&request);
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
assert!(guard.contains_key(&pk));
}
#[tokio::test]
async fn test_increment_failure_counter_below_threshold_returns_false() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
let result =
manager.increment_request_failure_counter_and_check_if_partition_can_failover(&request);
assert!(!result);
}
#[tokio::test]
async fn test_increment_failure_counter_above_threshold_returns_true() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
for _ in 0..11 {
manager.increment_request_failure_counter_and_check_if_partition_can_failover(&request);
}
let result =
manager.increment_request_failure_counter_and_check_if_partition_can_failover(&request);
assert!(result);
}
#[tokio::test]
async fn test_increment_failure_counter_auto_failover_path_for_writes() {
let gem = create_single_master_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
let request = create_write_request();
let _ =
manager.increment_request_failure_counter_and_check_if_partition_can_failover(&request);
let guard = manager
.partition_key_range_to_location_for_write
.read()
.unwrap();
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
assert!(guard.contains_key(&pk));
let guard2 = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
assert!(!guard2.contains_key(&pk));
}
#[tokio::test]
async fn test_add_or_update_moves_to_next_location() {
let gem = create_three_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
let request = create_read_request();
let map = Arc::new(RwLock::new(HashMap::new()));
let next_locations = vec![
"https://test-westus.documents.azure.com/".to_string(),
"https://test-eastus.documents.azure.com/".to_string(),
"https://test-centralus.documents.azure.com/".to_string(),
];
let result = manager.try_add_or_update_partition_failover_info_and_move_to_next_location(
&pk,
"https://test-westus.documents.azure.com/",
&next_locations,
&request,
&map,
);
assert!(result);
let guard = map.read().unwrap();
let info = guard.get(&pk).unwrap();
assert_eq!(info.current, "https://test-eastus.documents.azure.com/");
}
#[tokio::test]
async fn test_add_or_update_removes_on_all_exhausted() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
let request = create_read_request();
let map = Arc::new(RwLock::new(HashMap::new()));
let next_locations = vec![
"https://test-westus.documents.azure.com/".to_string(),
"https://test-eastus.documents.azure.com/".to_string(),
];
let result = manager.try_add_or_update_partition_failover_info_and_move_to_next_location(
&pk,
"https://test-westus.documents.azure.com/",
&next_locations,
&request,
&map,
);
assert!(result);
let result2 = manager.try_add_or_update_partition_failover_info_and_move_to_next_location(
&pk,
"https://test-eastus.documents.azure.com/",
&next_locations,
&request,
&map,
);
assert!(!result2);
let guard = map.read().unwrap();
assert!(!guard.contains_key(&pk));
}
#[tokio::test]
async fn test_different_partition_key_ranges_tracked_independently() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let mut request1 = create_read_request();
request1.request_context.resolved_partition_key_range =
Some(PartitionKeyRange::new("0".into(), "".into(), "AA".into()));
let mut request2 = create_read_request();
request2.request_context.resolved_partition_key_range =
Some(PartitionKeyRange::new("1".into(), "AA".into(), "FF".into()));
assert!(manager.try_mark_endpoint_unavailable_for_partition_key_range(&request1));
assert!(manager.try_mark_endpoint_unavailable_for_partition_key_range(&request2));
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
assert_eq!(guard.len(), 2);
}
#[tokio::test]
async fn test_debug_formatting() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
let debug_str = format!("{:?}", manager);
assert!(debug_str.contains("GlobalPartitionEndpointManager"));
assert!(debug_str.contains("partition_level_automatic_failover_enabled"));
assert!(debug_str.contains("partition_level_circuit_breaker_enabled"));
}
#[tokio::test]
async fn test_failover_info_debug_formatting() {
let info = PartitionKeyRangeFailoverInfo::new("rid1".into(), "https://loc1.com/".into());
let debug_str = format!("{:?}", info);
assert!(debug_str.contains("PartitionKeyRangeFailoverInfo"));
assert!(debug_str.contains("rid1"));
}
#[tokio::test]
async fn test_three_region_sequential_failover() {
let gem = create_three_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
let request1 = create_read_request();
assert!(manager.try_mark_endpoint_unavailable_for_partition_key_range(&request1));
{
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
assert_eq!(
guard.get(&pk).unwrap().current,
"https://test-eastus.documents.azure.com/"
);
}
let mut request2 = create_read_request();
request2.request_context.location_endpoint_to_route =
Some("https://test-eastus.documents.azure.com/".parse().unwrap());
assert!(manager.try_mark_endpoint_unavailable_for_partition_key_range(&request2));
{
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
assert_eq!(
guard.get(&pk).unwrap().current,
"https://test-centralus.documents.azure.com/"
);
}
let mut request3 = create_read_request();
request3.request_context.location_endpoint_to_route = Some(
"https://test-centralus.documents.azure.com/"
.parse()
.unwrap(),
);
assert!(!manager.try_mark_endpoint_unavailable_for_partition_key_range(&request3));
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
assert!(!guard.contains_key(&pk));
}
#[tokio::test]
async fn test_background_init_flag_set_on_construction() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
assert!(manager
.background_connection_init_active
.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_second_background_init_is_noop() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
assert!(manager
.background_connection_init_active
.load(Ordering::SeqCst));
manager.initialize_and_start_circuit_breaker_failback_background_refresh();
assert!(manager
.background_connection_init_active
.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_end_to_end_failover_and_override_routing() {
let gem = create_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, true);
let request = create_read_request();
assert!(manager.try_mark_endpoint_unavailable_for_partition_key_range(&request));
{
let guard = manager
.partition_key_range_to_location_for_read_and_write
.read()
.unwrap();
let pk = PartitionKeyRange::new("0".into(), "".into(), "FF".into());
let info = guard.get(&pk).unwrap();
info.consecutive_read_request_failure_count
.store(11, Ordering::SeqCst);
}
let mut new_request = create_read_request();
assert!(manager.try_add_partition_level_location_override(&mut new_request));
assert_eq!(
new_request
.request_context
.location_endpoint_to_route
.unwrap()
.as_str(),
"https://test-eastus.documents.azure.com/"
);
}
#[tokio::test]
async fn test_end_to_end_auto_failover_write_request() {
let gem = create_single_master_multi_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, false);
let request = create_write_request();
assert!(manager.try_mark_endpoint_unavailable_for_partition_key_range(&request));
let mut new_request = create_write_request();
assert!(manager.try_add_partition_level_location_override(&mut new_request));
assert_eq!(
new_request
.request_context
.location_endpoint_to_route
.unwrap()
.as_str(),
"https://test-eastus.documents.azure.com/"
);
}
#[tokio::test]
async fn test_configure_partition_level_automatic_failover_toggles_flag() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
assert!(!manager.partition_level_automatic_failover_enabled());
manager.configure_partition_level_automatic_failover(true);
assert!(manager.partition_level_automatic_failover_enabled());
manager.configure_partition_level_automatic_failover(false);
assert!(!manager.partition_level_automatic_failover_enabled());
}
#[tokio::test]
async fn test_configure_per_partition_circuit_breaker_toggles_flag() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, false, false);
assert!(!manager.partition_level_circuit_breaker_enabled());
manager.configure_per_partition_circuit_breaker(true);
assert!(manager.partition_level_circuit_breaker_enabled());
manager.configure_per_partition_circuit_breaker(false);
assert!(!manager.partition_level_circuit_breaker_enabled());
}
#[tokio::test]
async fn test_configure_idempotent_same_value() {
let gem = create_single_region_manager();
let manager = GlobalPartitionEndpointManager::new(gem, true, true);
assert!(manager.partition_level_automatic_failover_enabled());
manager.configure_partition_level_automatic_failover(true);
assert!(manager.partition_level_automatic_failover_enabled());
assert!(manager.partition_level_circuit_breaker_enabled());
manager.configure_per_partition_circuit_breaker(true);
assert!(manager.partition_level_circuit_breaker_enabled());
}
}