1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
#[cfg(not(target_arch = "wasm32"))]
use crate::policies::TransportPolicy;
use crate::policies::{CustomHeadersPolicy, Policy, TelemetryPolicy};
use crate::{ClientOptions, Context, HttpClient, Request, Response};
use std::sync::Arc;
/// Execution pipeline.
///
/// A pipeline follows a precise flow:
///
/// 1. Client library-specified per-call policies are executed. Per-call policies can fail and bail out of the pipeline
/// immediately.
/// 2. User-specified per-call policies are executed.
/// 3. Telemetry policy.
/// 4. Retry policy. It allows to re-execute the following policies.
/// 5. Client library-specified per-retry policies. Per-retry polices are always executed at least once but are re-executed
/// in case of retries.
/// 6. User-specified per-retry policies are executed.
/// 7. Authorization policy. Authorization can depend on the HTTP headers and/or the request body so it
/// must be executed right before sending the request to the transport. Also, the authorization
/// can depend on the current time so it must be executed at every retry.
/// 8. Transport policy. Transport policy is always the last policy and is the policy that
/// actually constructs the `Response` to be passed up the pipeline.
///
/// A pipeline is immutable. In other words a policy can either succeed and call the following
/// policy of fail and return to the calling policy. Arbitrary policy "skip" must be avoided (but
/// cannot be enforced by code). All policies except Transport policy can assume there is another following policy (so
/// `self.pipeline[0]` is always valid).
///
/// The `C` generic contains the pipeline-specific context. Different crates can pass
/// different contexts using this generic. This way each crate can have its own specific pipeline
/// context. For example, in CosmosDB, the generic carries the operation-specific information used by
/// the authorization policy.
#[derive(Debug, Clone)]
pub struct Pipeline {
http_client: Arc<dyn HttpClient>,
pipeline: Vec<Arc<dyn Policy>>,
}
impl Pipeline {
/// Creates a new pipeline given the client library crate name and version,
/// alone with user-specified and client library-specified policies.
///
/// Crates can simply pass `option_env!("CARGO_PKG_NAME")` and `option_env!("CARGO_PKG_VERSION")` for the
/// `crate_name` and `crate_version` arguments respectively.
pub fn new(
crate_name: Option<&'static str>,
crate_version: Option<&'static str>,
options: ClientOptions,
per_call_policies: Vec<Arc<dyn Policy>>,
per_retry_policies: Vec<Arc<dyn Policy>>,
) -> Self {
let mut pipeline: Vec<Arc<dyn Policy>> = Vec::with_capacity(
options.per_call_policies.len()
+ per_call_policies.len()
+ options.per_retry_policies.len()
+ per_retry_policies.len()
+ 3,
);
pipeline.extend_from_slice(&per_call_policies);
pipeline.extend_from_slice(&options.per_call_policies);
let telemetry_policy = TelemetryPolicy::new(crate_name, crate_version, &options.telemetry);
pipeline.push(Arc::new(telemetry_policy));
pipeline.push(Arc::new(CustomHeadersPolicy::default()));
let retry_policy = options.retry.to_policy();
pipeline.push(retry_policy);
pipeline.extend_from_slice(&per_retry_policies);
pipeline.extend_from_slice(&options.per_retry_policies);
let http_client = options.transport.http_client.clone();
// TODO: Add transport policy for WASM once https://github.com/Azure/azure-sdk-for-rust/issues/293 is resolved.
#[cfg(not(target_arch = "wasm32"))]
{
#[allow(unused_mut)]
let mut policy: Arc<dyn Policy> =
Arc::new(TransportPolicy::new(options.transport.clone()));
#[cfg(feature = "mock_transport_framework")]
crate::mock::set_mock_transport_policy(&mut policy, options.transport);
pipeline.push(policy);
}
Self {
http_client,
pipeline,
}
}
/// Gets the `HttpClient` used by the pipeline.
pub fn http_client(&self) -> &dyn HttpClient {
// TODO: Request methods should be defined directly on the pipeline instead of exposing the HttpClient.
self.http_client.as_ref()
}
pub fn replace_policy(&mut self, policy: Arc<dyn Policy>, position: usize) -> Arc<dyn Policy> {
std::mem::replace(&mut self.pipeline[position], policy)
}
pub fn policies(&self) -> &[Arc<dyn Policy>] {
&self.pipeline
}
pub async fn send(
&self,
ctx: &mut Context,
request: &mut Request,
) -> crate::error::Result<Response> {
let res = self.pipeline[0]
.send(ctx, request, &self.pipeline[1..])
.await?;
Ok(res)
}
}