eventsrc-client 0.0.2

One-shot and replayable SSE / EventSource client for Rust.
Documentation
use ::reqwest::{
    Client, Request, RequestBuilder,
    header::{ACCEPT, HeaderValue},
};

use super::{LAST_EVENT_ID_HEADER, SSE_CONTENT_TYPE, into_body_stream, validate_response};
use crate::{
    error::{Error, ErrorKind},
    replayable::{ConnectFuture, Connector},
};

pub(crate) fn freeze_request(builder: RequestBuilder) -> Result<ReplayableRequest, Error> {
    let (client, request) = builder.build_split();
    let mut prototype = request.map_err(|err| {
        Error::new(ErrorKind::InvalidRequest, "request build failed").set_source(err)
    })?;

    if !prototype.headers().contains_key(ACCEPT) {
        prototype
            .headers_mut()
            .insert(ACCEPT, HeaderValue::from_static(SSE_CONTENT_TYPE));
    }

    ReplayableRequest::new(client, prototype)
}

#[derive(Debug)]
pub(crate) struct ReplayableRequest {
    client: Client,
    prototype: Request,
}

impl Clone for ReplayableRequest {
    fn clone(&self) -> Self {
        Self {
            client: self.client.clone(),
            prototype: self
                .prototype
                .try_clone()
                .expect("replayable requests must remain cloneable"),
        }
    }
}

impl ReplayableRequest {
    fn new(client: Client, prototype: Request) -> Result<Self, Error> {
        if prototype.headers().contains_key(LAST_EVENT_ID_HEADER) {
            return Err(Error::new(ErrorKind::InvalidRequest, "last event id header is reserved"));
        }

        if prototype.try_clone().is_none() {
            return Err(Error::new(ErrorKind::InvalidRequest, "request body is not replayable"));
        }

        Ok(Self { client, prototype })
    }

    pub(crate) fn prepare(&self, last_event_id: Option<&str>) -> Result<Request, Error> {
        let mut request = self
            .prototype
            .try_clone()
            .ok_or(Error::new(ErrorKind::InvalidRequest, "request body is not replayable"))?;

        request.headers_mut().remove(LAST_EVENT_ID_HEADER);

        if let Some(last_event_id) = last_event_id.filter(|value| !value.is_empty()) {
            let value = HeaderValue::from_str(last_event_id).map_err(|_| {
                Error::new(ErrorKind::InvalidRequest, "invalid `Last-Event-ID` header value")
            })?;
            request.headers_mut().insert(LAST_EVENT_ID_HEADER, value);
        }

        Ok(request)
    }
}

impl Connector for ReplayableRequest {
    fn connect(&self, last_event_id: Option<&str>) -> Result<ConnectFuture, Error> {
        let request = self.prepare(last_event_id)?;
        let client = self.client.clone();

        Ok(Box::pin(async move {
            let response = client.execute(request).await.map_err(|err| {
                Error::new(ErrorKind::Transport, "request execution failed").set_source(err)
            })?;

            validate_response(&response)?;

            Ok(into_body_stream(response))
        }))
    }
}