ratmom 0.1.0

Sensible, async, curl-based HTTP client
use event_listener::Event;
use http::HeaderMap;
use once_cell::sync::OnceCell;
use std::{sync::Arc, time::Duration};

/// Holds the current state of a trailer for a response.
///
/// This object acts as a shared handle that can be cloned and polled from
/// multiple threads to wait for and act on the response trailer.
///
/// There are two typical workflows for accessing trailer headers:
///
/// - If you are consuming the response body and then accessing the headers
///   afterward, then all trailers are guaranteed to have arrived (if any).
///   [`Trailer::try_get`] will allow you to access them without extra overhead.
/// - If you are handling trailers in a separate task, callback, or thread, then
///   either [`Trailer::wait`] or [`Trailer::wait_async`] will allow you to wait
///   for the trailer headers to arrive and then handle them.
///
/// Note that in either approach, trailer headers are delivered to your
/// application as a single [`HeaderMap`]; it is not possible to handle
/// individual headers as they arrive.
#[derive(Clone, Debug)]
pub struct Trailer {
    shared: Arc<Shared>,
}

#[derive(Debug)]
struct Shared {
    headers: OnceCell<HeaderMap>,
    ready: Event,
}

impl Trailer {
    /// Get a populated trailer handle containing no headers.
    pub(crate) fn empty() -> &'static Self {
        static EMPTY: OnceCell<Trailer> = OnceCell::new();

        EMPTY.get_or_init(|| Self {
            shared: Arc::new(Shared {
                headers: OnceCell::from(HeaderMap::new()),
                ready: Event::new(),
            }),
        })
    }

    /// Returns true if the trailer has been received (if any).
    ///
    /// The trailer will not be received until the body stream associated with
    /// this response has been fully consumed.
    #[inline]
    pub fn is_ready(&self) -> bool {
        self.try_get().is_some()
    }

    /// Attempt to get the trailer headers without blocking. Returns `None` if
    /// the trailer has not been received yet.
    #[inline]
    pub fn try_get(&self) -> Option<&HeaderMap> {
        self.shared.headers.get()
    }

    /// Block the current thread until the trailer headers arrive, and then
    /// return them.
    ///
    /// This is a blocking operation! If you are writing an asynchronous
    /// application, then you probably want to use [`Trailer::wait_async`]
    /// instead.
    pub fn wait(&self) -> &HeaderMap {
        loop {
            // Fast path: If the headers are already set, return them.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Headers not set, jump into the slow path by creating a new
            // listener for the ready event.
            let listener = self.shared.ready.listen();

            // Double-check that the headers are not set.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Otherwise, block until they are set.
            listener.wait();

            // If we got the notification, then the headers are likely to be
            // set.
            if let Some(headers) = self.try_get() {
                return headers;
            }
        }
    }

    /// Block the current thread until the trailer headers arrive or a timeout
    /// expires.
    ///
    /// If the given timeout expired before the trailer arrived then `None` is
    /// returned.
    ///
    /// This is a blocking operation! If you are writing an asynchronous
    /// application, then you probably want to use [`Trailer::wait_async`]
    /// instead.
    pub fn wait_timeout(&self, timeout: Duration) -> Option<&HeaderMap> {
        // Fast path: If the headers are already set, return them.
        if let Some(headers) = self.try_get() {
            return Some(headers);
        }

        // Headers not set, jump into the slow path by creating a new listener
        // for the ready event.
        let listener = self.shared.ready.listen();

        // Double-check that the headers are not set.
        if let Some(headers) = self.try_get() {
            return Some(headers);
        }

        // Otherwise, block with a timeout.
        if listener.wait_timeout(timeout) {
            self.try_get()
        } else {
            None
        }
    }

    /// Wait asynchronously until the trailer headers arrive, and then return
    /// them.
    pub async fn wait_async(&self) -> &HeaderMap {
        loop {
            // Fast path: If the headers are already set, return them.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Headers not set, jump into the slow path by creating a new
            // listener for the ready event.
            let listener = self.shared.ready.listen();

            // Double-check that the headers are not set.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Otherwise, wait asynchronously until they are.
            listener.await;

            // If we got the notification, then the headers are likely to be
            // set.
            if let Some(headers) = self.try_get() {
                return headers;
            }
        }
    }
}

pub(crate) struct TrailerWriter {
    shared: Arc<Shared>,
    headers: Option<HeaderMap>,
}

impl TrailerWriter {
    pub(crate) fn new() -> Self {
        Self {
            shared: Arc::new(Shared {
                headers: Default::default(),
                ready: Event::new(),
            }),
            headers: Some(HeaderMap::new()),
        }
    }

    pub(crate) fn trailer(&self) -> Trailer {
        Trailer {
            shared: self.shared.clone(),
        }
    }

    pub(crate) fn get_mut(&mut self) -> Option<&mut HeaderMap> {
        self.headers.as_mut()
    }

    #[inline]
    pub(crate) fn flush(&mut self) {
        if !self.flush_impl() {
            tracing::warn!("tried to flush trailer multiple times");
        }
    }

    fn flush_impl(&mut self) -> bool {
        if let Some(headers) = self.headers.take() {
            let _ = self.shared.headers.set(headers);

            // Wake up any calls waiting for the headers.
            self.shared.ready.notify(usize::max_value());

            true
        } else {
            false
        }
    }
}

impl Drop for TrailerWriter {
    fn drop(&mut self) {
        self.flush_impl();
    }
}