reqx 0.1.35

Rust HTTP transport client for API SDK libraries with retry, timeout, idempotency, proxy, and pluggable TLS backends
Documentation
use bytes::Bytes;
#[cfg(any(feature = "_async", feature = "_blocking"))]
use http::header::{CONTENT_ENCODING, CONTENT_LENGTH};
use http::{HeaderMap, Method, StatusCode};
use serde::de::DeserializeOwned;
#[cfg(any(feature = "_async", feature = "_blocking"))]
use std::io;
#[cfg(any(feature = "_async", feature = "_blocking"))]
use std::time::{Duration, Instant};

#[cfg(any(feature = "_async", feature = "_blocking"))]
use crate::content_encoding::DecodeContentEncodingError;
use crate::error::Error;
#[cfg(any(feature = "_async", feature = "_blocking"))]
use crate::metrics::StreamCompletion;
use crate::util::truncate_body;

#[derive(Clone, Debug)]
/// Fully buffered HTTP response body and metadata.
pub struct Response {
    status: StatusCode,
    headers: HeaderMap,
    body: Bytes,
}

impl Response {
    pub(crate) fn new(status: StatusCode, headers: HeaderMap, body: Bytes) -> Self {
        Self {
            status,
            headers,
            body,
        }
    }

    /// Returns the HTTP status code.
    pub fn status(&self) -> StatusCode {
        self.status
    }

    /// Returns the response headers.
    pub fn headers(&self) -> &HeaderMap {
        &self.headers
    }

    /// Returns the raw response body bytes.
    pub fn body(&self) -> &Bytes {
        &self.body
    }

    /// Interprets the buffered body as UTF-8 text.
    pub fn text(&self) -> crate::Result<&str> {
        std::str::from_utf8(&self.body).map_err(|source| Error::DecodeText {
            source,
            body: truncate_body(&self.body),
        })
    }

    /// Decodes the buffered body as lossy UTF-8 text.
    pub fn text_lossy(&self) -> String {
        String::from_utf8_lossy(&self.body).into_owned()
    }

    /// Deserializes the buffered body from JSON.
    pub fn json<T>(&self) -> crate::Result<T>
    where
        T: DeserializeOwned,
    {
        serde_json::from_slice(&self.body).map_err(|source| Error::DeserializeJson {
            source,
            body: truncate_body(&self.body),
        })
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(crate) trait StreamOutcomeHooks {
    fn complete_success(&mut self);

    fn complete_error(&mut self, error: &Error);

    fn complete_canceled(&mut self);
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(crate) const DEFAULT_STREAM_DEADLINE_SLACK: Duration = Duration::from_millis(10);

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(crate) fn deadline_elapsed(deadline_at: Instant, now: Instant) -> bool {
    now >= deadline_at
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(crate) fn deadline_limits_wait(
    phase_timeout: Duration,
    deadline_at: Instant,
    now: Instant,
) -> bool {
    deadline_at.saturating_duration_since(now) <= phase_timeout
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(crate) fn deadline_within_slack(
    deadline_at: Instant,
    now: Instant,
    deadline_slack: Duration,
) -> bool {
    deadline_at.saturating_duration_since(now) <= deadline_slack
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StreamLifecycleState {
    Pending,
    Success,
    Error,
    Canceled,
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(crate) struct StreamLifecycle {
    completion: Option<StreamCompletion>,
    hooks: Option<Box<dyn StreamOutcomeHooks + Send>>,
    state: StreamLifecycleState,
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
impl StreamLifecycle {
    pub(crate) fn new(hooks: Option<Box<dyn StreamOutcomeHooks + Send>>) -> Self {
        Self {
            completion: None,
            hooks,
            state: StreamLifecycleState::Pending,
        }
    }

    pub(crate) fn attach_completion(&mut self, completion: StreamCompletion) {
        self.completion = Some(completion);
    }

    pub(crate) fn complete_success(&mut self) {
        if self.state != StreamLifecycleState::Pending {
            return;
        }
        self.state = StreamLifecycleState::Success;
        if let Some(hooks) = &mut self.hooks {
            hooks.complete_success();
        }
        if let Some(completion) = &mut self.completion {
            completion.complete_success();
        }
    }

    pub(crate) fn complete_error(&mut self, error: &Error) {
        if self.state != StreamLifecycleState::Pending {
            return;
        }
        self.state = StreamLifecycleState::Error;
        if let Some(hooks) = &mut self.hooks {
            hooks.complete_error(error);
        }
        if let Some(completion) = &mut self.completion {
            completion.complete_error(error);
        }
    }

    pub(crate) fn complete_canceled(&mut self) {
        if self.state != StreamLifecycleState::Pending {
            return;
        }
        self.state = StreamLifecycleState::Canceled;
        if let Some(hooks) = &mut self.hooks {
            hooks.complete_canceled();
        }
        if let Some(completion) = &mut self.completion {
            completion.complete_canceled();
        }
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
impl std::fmt::Debug for StreamLifecycle {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter
            .debug_struct("StreamLifecycle")
            .field("has_completion", &self.completion.is_some())
            .field("has_hooks", &self.hooks.is_some())
            .field("state", &self.state)
            .finish()
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
impl Drop for StreamLifecycle {
    fn drop(&mut self) {
        self.complete_canceled();
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(super) fn attach_completion(
    lifecycle: &mut Option<StreamLifecycle>,
    completion: StreamCompletion,
) {
    if let Some(lifecycle) = lifecycle {
        lifecycle.attach_completion(completion);
    } else {
        let mut new_lifecycle = StreamLifecycle::new(None);
        new_lifecycle.attach_completion(completion);
        *lifecycle = Some(new_lifecycle);
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(super) fn complete_success(lifecycle: &mut Option<StreamLifecycle>) {
    if let Some(lifecycle) = lifecycle {
        lifecycle.complete_success();
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(super) fn complete_error(lifecycle: &mut Option<StreamLifecycle>, error: &Error) {
    if let Some(lifecycle) = lifecycle {
        lifecycle.complete_error(error);
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(super) fn map_decode_body_error(
    error: DecodeContentEncodingError,
    method: &Method,
    uri: &str,
    max_bytes: usize,
) -> Error {
    match error {
        DecodeContentEncodingError::Decode { encoding, message } => Error::DecodeContentEncoding {
            encoding,
            method: method.clone(),
            uri: uri.to_owned(),
            message,
        },
        DecodeContentEncodingError::TooLarge { actual_bytes } => Error::ResponseBodyTooLarge {
            limit_bytes: max_bytes,
            actual_bytes,
            method: method.clone(),
            uri: uri.to_owned(),
        },
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
fn stream_read_io_error_kind(error: &Error) -> io::ErrorKind {
    #[cfg(any(feature = "_async", feature = "_blocking"))]
    fn io_error_kind_in_chain(error: &(dyn std::error::Error + 'static)) -> Option<io::ErrorKind> {
        let mut current = Some(error);
        while let Some(source) = current {
            if let Some(io_error) = source.downcast_ref::<io::Error>() {
                return Some(io_error.kind());
            }
            current = source.source();
        }
        None
    }

    #[cfg(feature = "_async")]
    fn async_stream_error_kind(error: &(dyn std::error::Error + 'static)) -> Option<io::ErrorKind> {
        if let Some(kind) = io_error_kind_in_chain(error) {
            return Some(kind);
        }

        let hyper_error = error.downcast_ref::<hyper::Error>()?;
        if hyper_error.is_timeout() {
            return Some(io::ErrorKind::TimedOut);
        }
        if hyper_error.is_incomplete_message()
            || hyper_error.is_closed()
            || hyper_error.is_shutdown()
        {
            return Some(io::ErrorKind::UnexpectedEof);
        }
        if hyper_error.is_body_write_aborted() {
            return Some(io::ErrorKind::BrokenPipe);
        }

        None
    }

    match error {
        Error::Timeout { .. } | Error::DeadlineExceeded { .. } => io::ErrorKind::TimedOut,
        Error::ReadBody { source } => {
            #[cfg(feature = "_async")]
            if let Some(kind) = async_stream_error_kind(source.as_ref()) {
                return kind;
            }

            io_error_kind_in_chain(source.as_ref()).unwrap_or(io::ErrorKind::Other)
        }
        Error::WriteBody { source, .. } => {
            io_error_kind_in_chain(source.as_ref()).unwrap_or(io::ErrorKind::Other)
        }
        _ => io::ErrorKind::Other,
    }
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(super) fn into_stream_read_io_error(error: Error) -> io::Error {
    let kind = stream_read_io_error_kind(&error);
    io::Error::new(kind, error)
}

#[cfg(any(feature = "_async", feature = "_blocking"))]
pub(super) fn write_body_error<E>(method: &Method, uri: &str, source: E) -> Error
where
    E: std::error::Error + Send + Sync + 'static,
{
    Error::WriteBody {
        method: method.clone(),
        uri: uri.to_owned(),
        source: Box::new(source),
    }
}

#[cfg(feature = "_async")]
mod async_stream;
#[cfg(feature = "_blocking")]
mod blocking_stream;

#[cfg(feature = "_async")]
#[cfg_attr(
    docsrs,
    doc(cfg(any(
        feature = "async-tls-rustls-ring",
        feature = "async-tls-rustls-aws-lc-rs",
        feature = "async-tls-native"
    )))
)]
pub use async_stream::ResponseStream;
#[cfg(feature = "_async")]
pub(crate) use async_stream::{ResponseStreamContext, StreamPermits};
#[cfg(feature = "_blocking")]
#[cfg_attr(
    docsrs,
    doc(cfg(any(
        feature = "blocking-tls-rustls-ring",
        feature = "blocking-tls-rustls-aws-lc-rs",
        feature = "blocking-tls-native"
    )))
)]
pub use blocking_stream::BlockingResponseStream;
#[cfg(feature = "_blocking")]
pub(crate) use blocking_stream::BlockingResponseStreamContext;