mod authorization_policy;
mod cosmos_headers_policy;
mod signature_target;
use crate::cosmos_request::CosmosRequest;
use crate::handler::retry_handler::{BackOffRetryHandler, RetryHandler};
use crate::models::CosmosResponse;
use crate::resource_context::ResourceLink;
use crate::routing::global_endpoint_manager::GlobalEndpointManager;
use crate::routing::global_partition_endpoint_manager::GlobalPartitionEndpointManager;
use crate::CosmosClientOptions;
pub(crate) use authorization_policy::AuthorizationPolicy;
use azure_core::error::CheckSuccessOptions;
use azure_core::http::{response::Response, Context, PipelineSendOptions, RawResponse};
pub(crate) use cosmos_headers_policy::CosmosHeadersPolicy;
use std::sync::Arc;
use url::Url;
const SUCCESS_CODES: [u16; 101] = {
let mut codes = [0u16; 101];
let mut i = 0;
while i < 100 {
codes[i] = 200 + i as u16;
i += 1;
}
codes[100] = 304;
codes
};
#[derive(Debug, Clone)]
pub(crate) struct GatewayPipeline {
pub endpoint: Url,
pipeline: azure_core::http::Pipeline,
retry_handler: BackOffRetryHandler,
options: CosmosClientOptions,
#[allow(dead_code)]
pub fault_injection_enabled: bool,
}
impl GatewayPipeline {
pub fn new(
endpoint: Url,
pipeline: azure_core::http::Pipeline,
global_endpoint_manager: Arc<GlobalEndpointManager>,
global_partition_endpoint_manager: Arc<GlobalPartitionEndpointManager>,
options: CosmosClientOptions,
fault_injection_enabled: bool,
) -> Self {
let retry_handler =
BackOffRetryHandler::new(global_endpoint_manager, global_partition_endpoint_manager);
GatewayPipeline {
endpoint,
pipeline,
retry_handler,
options,
fault_injection_enabled,
}
}
#[allow(dead_code)]
pub(crate) fn url(&self, link: &ResourceLink) -> Url {
link.url(&self.endpoint)
}
pub async fn send<T>(
&self,
mut cosmos_request: CosmosRequest,
context: Context<'_>,
) -> azure_core::Result<CosmosResponse<T>> {
self.options.apply_headers(&mut cosmos_request.headers);
let sender = |req: &mut CosmosRequest| {
let pipeline = self.pipeline.clone();
let ctx = context.clone();
let success_options = CheckSuccessOptions {
success_codes: &SUCCESS_CODES,
};
let pipeline_send_options = PipelineSendOptions {
skip_checks: false,
check_success: success_options,
};
let resource_link = req.resource_link.clone();
let mut raw_req = req.clone().into_raw_request();
async move {
let ctx_owned = ctx.with_value(resource_link).into_owned();
pipeline
.send(&ctx_owned, &mut raw_req, Some(pipeline_send_options))
.await
}
};
#[cfg(feature = "fault_injection")]
if self.fault_injection_enabled {
cosmos_request.add_fault_injection_headers();
}
let raw_response: RawResponse =
self.retry_handler.send(&mut cosmos_request, sender).await?;
let typed_response: Response<T> = raw_response.into();
Ok(CosmosResponse::new(typed_response, cosmos_request))
}
}