arcly-http 0.1.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! High-performance Server-Sent Events engine.
//!
//! `SseStream` adapts any `futures::Stream` of [`SseEvent`]s into a compliant
//! `text/event-stream` response. The conversion to the transport layer happens
//! once, lazily, inside `IntoResponse` — handler code only ever touches
//! arcly-owned types (`SseStream`, `SseEvent`).
//!
//! Back-pressure and flushing are delegated to the underlying async runtime:
//! the stream is polled only as fast as the client drains the socket, so a slow
//! consumer never buffers unboundedly inside the framework.

use std::convert::Infallible;
use std::time::Duration;

use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::{BoxStream, Stream, StreamExt};
use serde::Serialize;

/// A single Server-Sent Event. Builder-style; all fields optional except data.
///
/// Start from `SseEvent::default()` (an event with an empty payload and no
/// optional fields) and chain the setter methods to fill it in.
#[derive(Debug, Default, Clone)]
pub struct SseEvent {
    /// Value for the `event:` field (named event type); `None` leaves it unset.
    event: Option<String>,
    /// Payload for the `data:` field. Always sent; empty unless a setter ran.
    data: String,
    /// Value for the `id:` field; `None` leaves it unset.
    id: Option<String>,
    /// Client reconnection hint in milliseconds; `None` leaves it unset.
    retry: Option<u64>,
}

impl SseEvent {
    /// Set the raw `data:` payload, replacing any previous payload.
    ///
    /// Multi-line payloads are fine. Carriage returns (`\r\n` or a lone `\r`)
    /// are normalised to `\n` when the event is sent, because the wire format
    /// treats all three as the same line terminator.
    pub fn data(mut self, data: impl Into<String>) -> Self {
        self.data = data.into();
        self
    }

    /// Serialise `value` as JSON into the `data:` field, replacing any
    /// previous payload.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` produced when `value` cannot be
    /// serialised; the partially built event is dropped in that case.
    pub fn json_data<T: Serialize>(mut self, value: T) -> Result<Self, serde_json::Error> {
        self.data = serde_json::to_string(&value)?;
        Ok(self)
    }

    /// Set the `event:` (named event type) field.
    ///
    /// Line breaks cannot be represented in this field; any `\r` or `\n` in
    /// `name` is removed when the event is sent.
    pub fn event(mut self, name: impl Into<String>) -> Self {
        self.event = Some(name.into());
        self
    }

    /// Set the `id:` field (used by clients for `Last-Event-ID` resumption).
    ///
    /// Line breaks and NUL cannot be represented in this field; any `\r`,
    /// `\n` or `\0` in `id` is removed when the event is sent.
    pub fn id(mut self, id: impl Into<String>) -> Self {
        self.id = Some(id.into());
        self
    }

    /// Set the client reconnection hint, in milliseconds.
    pub fn retry(mut self, millis: u64) -> Self {
        self.retry = Some(millis);
        self
    }

    /// Remove every character listed in `forbidden` from `value`, in place.
    fn strip_chars(mut value: String, forbidden: &[char]) -> String {
        value.retain(|c| !forbidden.contains(&c));
        value
    }

    /// Lower into the transport event. Private — never escapes this module.
    ///
    /// The transport's setters panic on characters that cannot be framed on
    /// the wire (`\r` in data, line breaks in `event`, line breaks or NUL in
    /// `id`). Lowering runs lazily while the response body is being polled,
    /// so such a panic would tear down a live stream. The fields are therefore
    /// sanitised here instead of being passed through verbatim.
    fn into_transport(self) -> Event {
        let Self {
            event,
            data,
            id,
            retry,
        } = self;

        // Only allocate a normalised copy when a carriage return is present;
        // `\r\n` must be collapsed first so it does not become two newlines.
        let data = if data.contains('\r') {
            data.replace("\r\n", "\n").replace('\r', "\n")
        } else {
            data
        };

        let mut e = Event::default().data(data);
        if let Some(name) = event {
            e = e.event(Self::strip_chars(name, &['\r', '\n']));
        }
        if let Some(id) = id {
            e = e.id(Self::strip_chars(id, &['\r', '\n', '\0']));
        }
        if let Some(ms) = retry {
            e = e.retry(Duration::from_millis(ms));
        }
        e
    }
}

/// A streaming `text/event-stream` response body.
///
/// Construct with `SseStream::new` from any `Send + 'static` stream yielding
/// `Result<SseEvent, Infallible>`. A periodic keep-alive comment is injected
/// automatically to hold idle connections open through proxies.
pub struct SseStream {
    /// Type-erased (boxed) source of events.
    inner: BoxStream<'static, Result<SseEvent, Infallible>>,
    /// Interval handed to the transport's keep-alive when the response is built.
    keep_alive: Duration,
}

impl SseStream {
    /// Keep-alive interval used until [`SseStream::keep_alive`] overrides it.
    const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(15);

    /// Build a response body from a stream of events.
    ///
    /// The stream is boxed so the concrete producer type does not leak into
    /// `SseStream`. Its error type is pinned to [`Infallible`]; a producer
    /// that can fail should turn the failure into a final `SseEvent` itself.
    pub fn new<S>(stream: S) -> Self
    where
        S: Stream<Item = Result<SseEvent, Infallible>> + Send + 'static,
    {
        let inner = stream.boxed();
        let keep_alive = Self::DEFAULT_KEEP_ALIVE;
        Self { inner, keep_alive }
    }

    /// Replace the keep-alive interval (15 seconds unless overridden).
    pub fn keep_alive(self, interval: Duration) -> Self {
        Self {
            keep_alive: interval,
            ..self
        }
    }
}

/// Converts the arcly-owned stream into the transport-level SSE response.
impl axum::response::IntoResponse for SseStream {
    /// Lowers each `SseEvent` to a transport event as it is yielded and
    /// attaches the configured keep-alive interval.
    fn into_response(self) -> axum::response::Response {
        let Self { inner, keep_alive } = self;

        // Lowering happens per item as the stream is polled.
        let events = inner.map(|item| match item {
            Ok(event) => Ok(event.into_transport()),
            Err(never) => Err(never),
        });

        let policy = KeepAlive::new().interval(keep_alive);
        Sse::new(events).keep_alive(policy).into_response()
    }
}