use crate::cosmos_request::CosmosRequest;
use crate::retry_policies::client_retry_policy::ClientRetryPolicy;
use crate::retry_policies::metadata_request_retry_policy::MetadataRequestRetryPolicy;
use crate::retry_policies::{RetryPolicy, RetryResult};
use crate::routing::global_endpoint_manager::GlobalEndpointManager;
use crate::routing::global_partition_endpoint_manager::GlobalPartitionEndpointManager;
use async_trait::async_trait;
use azure_core::{async_runtime::get_async_runtime, http::RawResponse};
use std::sync::Arc;
use tracing::debug;
#[allow(dead_code)]
#[async_trait]
pub trait RetryHandler: Send + Sync {
async fn send<Sender, Fut>(
&self,
request: &mut CosmosRequest,
sender: Sender,
) -> azure_core::Result<RawResponse>
where
Sender: Fn(&mut CosmosRequest) -> Fut + Send + Sync,
Fut: std::future::Future<Output = azure_core::Result<RawResponse>> + Send;
}
#[derive(Debug, Clone)]
pub(crate) struct BackOffRetryHandler {
global_endpoint_manager: Arc<GlobalEndpointManager>,
global_partition_endpoint_manager: Arc<GlobalPartitionEndpointManager>,
}
impl BackOffRetryHandler {
pub fn retry_policy_for_request(&self, request: &CosmosRequest) -> RetryPolicy {
if request.resource_type.is_meta_data() {
RetryPolicy::Metadata(MetadataRequestRetryPolicy::new(
self.global_endpoint_manager.clone(),
))
} else {
RetryPolicy::Client(Box::from(ClientRetryPolicy::new(
self.global_endpoint_manager.clone(),
self.global_partition_endpoint_manager.clone(),
request.excluded_regions.clone().map(|e| e.0),
)))
}
}
pub fn new(
global_endpoint_manager: Arc<GlobalEndpointManager>,
global_partition_endpoint_manager: Arc<GlobalPartitionEndpointManager>,
) -> Self {
Self {
global_endpoint_manager,
global_partition_endpoint_manager,
}
}
}
#[async_trait]
impl RetryHandler for BackOffRetryHandler {
async fn send<Sender, Fut>(
&self,
request: &mut CosmosRequest,
sender: Sender,
) -> azure_core::Result<RawResponse>
where
Sender: Fn(&mut CosmosRequest) -> Fut + Send + Sync,
Fut: std::future::Future<Output = azure_core::Result<RawResponse>> + Send,
{
let mut retry_policy = self.retry_policy_for_request(request);
loop {
retry_policy.before_send_request(request).await;
debug!(
target: "azure_data_cosmos::retry_handler",
"Sending request - endpoint: {:?}, region: {:?}, operation: {:?}, resource: {:?}",
request.request_context.location_endpoint_to_route,
request.request_context.region_name,
request.operation_type,
request.resource_type
);
let result = sender(request).await;
let retry_result = retry_policy.should_retry(&result).await;
match retry_result {
RetryResult::DoNotRetry => return result,
RetryResult::Retry { after } => get_async_runtime().sleep(after).await,
}
}
}
}