reqx 0.1.35

Rust HTTP transport client for API SDK libraries with retry, timeout, idempotency, proxy, and pluggable TLS backends
Documentation
use std::io::Read;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

use bytes::Bytes;
use http::{HeaderMap, Uri};

use crate::extensions::{BackoffSource, BodyCodec, Clock, EndpointSelector, OtelPathNormalizer};
use crate::metrics::ClientMetrics;
use crate::observe::Observer;
use crate::policy::{Interceptor, RedirectPolicy, StatusPolicy};
use crate::proxy::{NoProxyRule, ProxyConfig};
use crate::rate_limit::{RateLimitPolicy, RateLimiter, ServerThrottleScope};
use crate::resilience::{
    AdaptiveConcurrencyPolicy, AdaptiveConcurrencyState, CircuitBreaker, CircuitBreakerPolicy,
    RetryBudget, RetryBudgetPolicy,
};
use crate::retry::{RetryEligibility, RetryPolicy};
use crate::tls::{TlsBackend, TlsOptions};
use crate::util::lock_unpoisoned;

mod builder;
mod execute;
pub(crate) mod limiters;
mod request;
mod transport;

use crate::blocking_client::limiters::RequestLimiters;

pub use request::RequestBuilder;

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(90);
const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 8;
const DEFAULT_POOL_MAX_IDLE_CONNECTIONS: usize = 16;
const DEFAULT_CLIENT_NAME: &str = "reqx";
const DEFAULT_MAX_RESPONSE_BODY_BYTES: usize = 8 * 1024 * 1024;

struct AdaptiveConcurrencyController {
    policy: AdaptiveConcurrencyPolicy,
    state: Mutex<AdaptiveConcurrencyState>,
    clock: Arc<dyn Clock>,
    condvar: Condvar,
}

impl AdaptiveConcurrencyController {
    fn new(policy: AdaptiveConcurrencyPolicy, clock: Arc<dyn Clock>) -> Self {
        Self {
            policy,
            state: Mutex::new(AdaptiveConcurrencyState::new(policy)),
            clock,
            condvar: Condvar::new(),
        }
    }

    fn acquire(
        self: &Arc<Self>,
        deadline_at: Option<Instant>,
    ) -> Option<AdaptiveConcurrencyPermit> {
        let mut state = lock_unpoisoned(&self.state);
        while !state.try_acquire() {
            state = match deadline_at {
                Some(deadline_at) => {
                    let now = Instant::now();
                    if now >= deadline_at {
                        return None;
                    }
                    let wait_for = deadline_at.duration_since(now);
                    let (next_state, wait_result) = match self.condvar.wait_timeout(state, wait_for)
                    {
                        Ok(result) => result,
                        Err(poisoned) => poisoned.into_inner(),
                    };
                    if wait_result.timed_out() && Instant::now() >= deadline_at {
                        return None;
                    }
                    next_state
                }
                None => match self.condvar.wait(state) {
                    Ok(guard) => guard,
                    Err(poisoned) => poisoned.into_inner(),
                },
            };
        }
        drop(state);
        Some(AdaptiveConcurrencyPermit {
            controller: Arc::clone(self),
            started_at: self.clock.now_monotonic(),
            completed: false,
        })
    }

    fn release_and_record(&self, success: bool, latency: Duration) {
        let mut state = lock_unpoisoned(&self.state);
        state.release_and_record(self.policy, success, latency);
        self.condvar.notify_all();
    }

    fn release_without_record(&self) {
        let mut state = lock_unpoisoned(&self.state);
        state.release_without_record();
        self.condvar.notify_all();
    }
}

struct AdaptiveConcurrencyPermit {
    controller: Arc<AdaptiveConcurrencyController>,
    started_at: Instant,
    completed: bool,
}

impl AdaptiveConcurrencyPermit {
    fn latency(&self) -> Duration {
        self.controller
            .clock
            .now_monotonic()
            .saturating_duration_since(self.started_at)
    }

    fn mark_success(mut self) {
        self.controller.release_and_record(true, self.latency());
        self.completed = true;
    }

    fn mark_failure(mut self) {
        self.controller.release_and_record(false, self.latency());
        self.completed = true;
    }

    fn cancel(mut self) {
        self.controller.release_without_record();
        self.completed = true;
    }
}

impl Drop for AdaptiveConcurrencyPermit {
    fn drop(&mut self) {
        if !self.completed {
            self.controller.release_and_record(false, self.latency());
            self.completed = true;
        }
    }
}

struct RequestExecutionOptions {
    request_timeout: Option<Duration>,
    total_timeout: Option<Duration>,
    retry_policy: Option<RetryPolicy>,
    max_response_body_bytes: Option<usize>,
    redirect_policy: Option<RedirectPolicy>,
    status_policy: Option<StatusPolicy>,
    auto_accept_encoding: Option<bool>,
}

enum RequestBody {
    Buffered(Bytes),
    Reader(Box<dyn Read + Send>),
}

/// Builds a blocking [`Client`] with transport, timeout, retry, TLS, and
/// observability settings.
#[cfg_attr(
    docsrs,
    doc(cfg(any(
        feature = "blocking-tls-rustls-ring",
        feature = "blocking-tls-rustls-aws-lc-rs",
        feature = "blocking-tls-native"
    )))
)]
pub struct ClientBuilder {
    base_url: String,
    default_headers: HeaderMap,
    buffered_auto_accept_encoding: bool,
    stream_auto_accept_encoding: bool,
    request_timeout: Duration,
    total_timeout: Option<Duration>,
    stream_deadline_slack: Duration,
    max_response_body_bytes: usize,
    connect_timeout: Duration,
    pool_idle_timeout: Duration,
    pool_max_idle_per_host: usize,
    pool_max_idle_connections: usize,
    http_proxy: Option<Uri>,
    proxy_authorization: Option<http::HeaderValue>,
    no_proxy_rules: Vec<NoProxyRule>,
    invalid_no_proxy_rules: Vec<String>,
    retry_policy: RetryPolicy,
    retry_eligibility: Arc<dyn RetryEligibility>,
    retry_budget_policy: Option<RetryBudgetPolicy>,
    circuit_breaker_policy: Option<CircuitBreakerPolicy>,
    adaptive_concurrency_policy: Option<AdaptiveConcurrencyPolicy>,
    global_rate_limit_policy: Option<RateLimitPolicy>,
    per_host_rate_limit_policy: Option<RateLimitPolicy>,
    server_throttle_scope: ServerThrottleScope,
    redirect_policy: RedirectPolicy,
    default_status_policy: StatusPolicy,
    tls_backend: TlsBackend,
    tls_options: TlsOptions,
    endpoint_selector: Arc<dyn EndpointSelector>,
    body_codec: Arc<dyn BodyCodec>,
    clock: Arc<dyn Clock>,
    backoff_source: Arc<dyn BackoffSource>,
    client_name: String,
    max_in_flight: Option<usize>,
    max_in_flight_per_host: Option<usize>,
    metrics_enabled: bool,
    otel_enabled: bool,
    otel_path_normalizer: Arc<dyn OtelPathNormalizer>,
    interceptors: Vec<Arc<dyn Interceptor>>,
    observers: Vec<Arc<dyn Observer>>,
}

/// Reusable blocking HTTP client for SDK transports.
#[cfg_attr(
    docsrs,
    doc(cfg(any(
        feature = "blocking-tls-rustls-ring",
        feature = "blocking-tls-rustls-aws-lc-rs",
        feature = "blocking-tls-native"
    )))
)]
pub struct Client {
    base_url: String,
    default_headers: HeaderMap,
    buffered_auto_accept_encoding: bool,
    stream_auto_accept_encoding: bool,
    request_timeout: Duration,
    total_timeout: Option<Duration>,
    stream_deadline_slack: Duration,
    max_response_body_bytes: usize,
    retry_policy: RetryPolicy,
    retry_eligibility: Arc<dyn RetryEligibility>,
    retry_budget: Option<Arc<RetryBudget>>,
    circuit_breaker: Option<Arc<CircuitBreaker>>,
    adaptive_concurrency: Option<Arc<AdaptiveConcurrencyController>>,
    rate_limiter: Option<Arc<RateLimiter>>,
    server_throttle_scope: ServerThrottleScope,
    redirect_policy: RedirectPolicy,
    default_status_policy: StatusPolicy,
    tls_backend: TlsBackend,
    transport: transport::TransportAgents,
    proxy_config: Option<ProxyConfig>,
    endpoint_selector: Arc<dyn EndpointSelector>,
    body_codec: Arc<dyn BodyCodec>,
    clock: Arc<dyn Clock>,
    backoff_source: Arc<dyn BackoffSource>,
    connect_timeout: Duration,
    request_limiters: Option<RequestLimiters>,
    metrics: ClientMetrics,
    interceptors: Vec<Arc<dyn Interceptor>>,
    observers: Vec<Arc<dyn Observer>>,
}