jacquard-common 0.10.0

Core AT Protocol types and utilities for Jacquard
Documentation
//! Streaming support for XRPC requests and responses

use crate::{IntoStatic, StreamError, stream::ByteStream, xrpc::XrpcRequest};
use alloc::boxed::Box;
use bytes::Bytes;
use core::{marker::PhantomData, pin::Pin};
use http::StatusCode;
use n0_future::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
#[cfg(all(not(target_arch = "wasm32"), feature = "std"))]
use std::path::Path;

/// Pinned, boxed, type-erased stream yielding items of type `T`.
///
/// Selected on every target other than `wasm32`; on these targets the trait
/// object carries a `Send` bound. The `wasm32` build uses a definition without it.
#[cfg(not(target_arch = "wasm32"))]
type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>;

/// Pinned, boxed, type-erased stream yielding items of type `T`.
///
/// Selected only when compiling for `wasm32`; unlike the non-WASM definition, the
/// trait object has no `Send` bound.
#[cfg(target_arch = "wasm32")]
type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>;

/// Trait for streaming XRPC procedures (bidirectional streaming).
///
/// Defines frame encoding/decoding for procedures that send/receive streams of data.
/// The provided methods use DAG-CBOR; implementors with a different wire format
/// override them.
pub trait XrpcProcedureStream {
    /// The NSID for this XRPC method
    const NSID: &'static str;
    /// The upload encoding
    const ENCODING: &'static str;

    /// Frame type for this streaming procedure.
    ///
    /// The `'de` lifetime lets a decoded frame borrow from the byte slice it was
    /// decoded from.
    type Frame<'de>;

    /// Associated request type
    type Request: XrpcRequest;

    /// Response type returned from the XRPC call (marker struct)
    type Response: XrpcStreamResp;

    /// Encode a frame into bytes for transmission.
    ///
    /// Default implementation uses DAG-CBOR encoding.
    ///
    /// # Errors
    ///
    /// Returns a [`StreamError`] built with `StreamError::encode` when
    /// serialization fails.
    fn encode_frame<'de>(data: Self::Frame<'de>) -> Result<Bytes, StreamError>
    where
        Self::Frame<'de>: Serialize,
    {
        let encoded = serde_ipld_dagcbor::to_vec(&data).map_err(StreamError::encode)?;
        // A `Vec<u8>` converts into `Bytes` directly via `From`, so the
        // general-purpose `from_owner` wrapper is unnecessary here.
        Ok(Bytes::from(encoded))
    }

    /// Decode a single frame from its encoded bytes.
    ///
    /// Default implementation deserializes from CBOR. Override for non-CBOR encodings.
    ///
    /// # Errors
    ///
    /// Returns a [`StreamError`] built with `StreamError::decode` when `frame`
    /// cannot be deserialized into `Self::Frame`.
    fn decode_frame<'de>(frame: &'de [u8]) -> Result<Self::Frame<'de>, StreamError>
    where
        Self::Frame<'de>: Deserialize<'de>,
    {
        serde_ipld_dagcbor::from_slice(frame).map_err(StreamError::decode)
    }
}

/// Trait for streaming XRPC response types.
///
/// Mirrors the method's NSID and carries the output encoding together with the
/// frame type produced by the response stream. The provided methods use
/// DAG-CBOR; implementors with a different wire format override them.
pub trait XrpcStreamResp {
    /// The NSID for this XRPC method
    const NSID: &'static str;

    /// Output encoding (MIME type)
    const ENCODING: &'static str;

    /// Frame type carried by the response stream.
    ///
    /// The `'de` lifetime lets a decoded frame borrow from the byte slice it was
    /// decoded from; the `IntoStatic` bound is required of every frame type.
    type Frame<'de>: IntoStatic;

    /// Encode a frame into bytes for transmission.
    ///
    /// Default implementation uses DAG-CBOR encoding.
    ///
    /// # Errors
    ///
    /// Returns a [`StreamError`] built with `StreamError::encode` when
    /// serialization fails.
    fn encode_frame<'de>(data: Self::Frame<'de>) -> Result<Bytes, StreamError>
    where
        Self::Frame<'de>: Serialize,
    {
        let encoded = serde_ipld_dagcbor::to_vec(&data).map_err(StreamError::encode)?;
        // A `Vec<u8>` converts into `Bytes` directly via `From`, so the
        // general-purpose `from_owner` wrapper is unnecessary here.
        Ok(Bytes::from(encoded))
    }

    /// Decode a single response frame from its encoded bytes.
    ///
    /// Default implementation deserializes from CBOR. Override for non-CBOR encodings.
    ///
    /// TODO: make this handle when frames are fragmented?
    ///
    /// # Errors
    ///
    /// Returns a [`StreamError`] built with `StreamError::decode` when `frame`
    /// cannot be deserialized into `Self::Frame`.
    fn decode_frame<'de>(frame: &'de [u8]) -> Result<Self::Frame<'de>, StreamError>
    where
        Self::Frame<'de>: Deserialize<'de>,
    {
        serde_ipld_dagcbor::from_slice(frame).map_err(StreamError::decode)
    }
}

/// A single frame in a streaming XRPC request or response.
///
/// Wraps a buffer of bytes with optional type tagging via the phantom parameter
/// `F`, which defaults to `()` for untyped frames. `F` is never stored: the only
/// data held is `buffer`, and `#[repr(transparent)]` gives the struct the layout
/// of that single `Bytes` field.
#[repr(transparent)]
pub struct XrpcStreamFrame<F = ()> {
    /// The frame data
    pub buffer: Bytes,
    /// Zero-sized marker carrying the frame's type tag `F`.
    _marker: PhantomData<F>,
}

impl XrpcStreamFrame {
    /// Wrap `buffer` in an untyped frame (tag type `()`).
    pub fn new(buffer: Bytes) -> Self {
        let _marker = PhantomData;
        Self { buffer, _marker }
    }
}

impl<F> XrpcStreamFrame<F> {
    /// Wrap `buffer` in a frame tagged with the impl-level type `F`.
    ///
    /// The method-level parameter `G` appears in neither the arguments, the
    /// return type, nor the body, so callers have to name it explicitly
    /// (e.g. with a turbofish); it has no effect on the frame that is produced.
    pub fn new_typed<G>(buffer: Bytes) -> Self {
        let _marker = PhantomData::<F>;
        Self { buffer, _marker }
    }
}

/// Stream a file from disk as untyped XRPC frames.
///
/// Opens `file` with tokio, reads it through a `ReaderStream`, and turns every
/// chunk into an [`XrpcStreamFrame`]; read failures surface in the stream as
/// `StreamError::transport` errors.
///
/// Unavailable on wasm and no_std due to use of tokio I/O
///
/// # Errors
///
/// Returns the I/O error reported when the file cannot be opened.
#[cfg(all(not(target_arch = "wasm32"), feature = "std"))]
pub async fn upload_stream(file: impl AsRef<Path>) -> Result<XrpcProcedureSend, tokio::io::Error> {
    use tokio_util::io::ReaderStream;

    let handle = tokio::fs::File::open(file).await?;
    let frames = ReaderStream::new(handle).map(|chunk| {
        chunk
            .map(XrpcStreamFrame::new)
            .map_err(StreamError::transport)
    });

    Ok(XrpcProcedureSend(frames.boxed()))
}

/// Turn a stream of procedure frames into the matching XRPC uplink stream.
///
/// Every item of `s` is passed through `P::encode_frame`; a successful encoding
/// is wrapped in a typed [`XrpcStreamFrame`], while an encoding failure is
/// forwarded unchanged as the stream item's error.
pub fn encode_stream<P: XrpcProcedureStream + 'static>(
    s: Boxed<P::Frame<'static>>,
) -> XrpcProcedureSend<P::Frame<'static>>
where
    <P as XrpcProcedureStream>::Frame<'static>: Serialize,
{
    let encoded = s.map(|item| match P::encode_frame(item) {
        Ok(bytes) => Ok(XrpcStreamFrame::new_typed::<P::Frame<'_>>(bytes)),
        Err(err) => Err(err),
    });

    XrpcProcedureSend(Box::pin(encoded))
}

/// Sending stream for streaming XRPC procedure uplink.
///
/// Newtype over a boxed stream whose items are either an `XrpcStreamFrame`
/// tagged with `F` or a `StreamError`. `F` defaults to `()`.
pub struct XrpcProcedureSend<F = ()>(pub Boxed<Result<XrpcStreamFrame<F>, StreamError>>);

/// Sink half of XRPC procedure uplink stream, for use in pipe scenarios.
///
/// Newtype over a pinned, boxed `Sink` that accepts `XrpcStreamFrame<F>` items
/// and reports failures as `StreamError`. `F` defaults to `()`.
///
/// NOTE(review): the trait object here is `Send`-bound on every target, whereas
/// the `Boxed` stream alias drops `Send` on `wasm32` — confirm this asymmetry
/// is intended.
pub struct XrpcProcedureSink<F = ()>(
    pub Pin<Box<dyn n0_future::Sink<XrpcStreamFrame<F>, Error = StreamError> + Send>>,
);

/// Typed streaming XRPC response.
///
/// Similar to `StreamingResponse` but with optional type-level frame tagging:
/// the body yields `XrpcStreamFrame<F>` items instead of raw bytes. `F` defaults
/// to `()` for an untyped stream.
pub struct XrpcResponseStream<F = ()> {
    /// HTTP response head (status, version, headers, ...).
    parts: http::response::Parts,
    /// Boxed stream of frames, each either a tagged frame or a `StreamError`.
    body: Boxed<Result<XrpcStreamFrame<F>, StreamError>>,
}

impl XrpcResponseStream {
    /// Build an untyped response stream out of a `StreamingResponse`.
    pub fn from_bytestream(response: StreamingResponse) -> Self {
        let StreamingResponse { parts, body } = response;
        Self::from_parts(parts, body)
    }

    /// Build an untyped response stream from a response head and a byte stream.
    ///
    /// Each successful chunk of `body` becomes one untyped frame; errors pass
    /// through untouched.
    pub fn from_parts(parts: http::response::Parts, body: ByteStream) -> Self {
        let frames = body.into_inner().map_ok(XrpcStreamFrame::new);
        Self {
            parts,
            body: Box::pin(frames),
        }
    }

    /// Split into the response head and a byte stream of the frame buffers.
    pub fn into_parts(self) -> (http::response::Parts, ByteStream) {
        let Self { parts, body } = self;
        let raw = body.map_ok(|frame| frame.buffer);
        (parts, ByteStream::new(Box::pin(raw)))
    }

    /// Discard the response head and return the frame buffers as a byte stream.
    pub fn into_bytestream(self) -> ByteStream {
        let raw = self.body.map_ok(|frame| frame.buffer);
        ByteStream::new(Box::pin(raw))
    }
}

impl<F: XrpcStreamResp> XrpcResponseStream<F> {
    /// Build a typed response stream out of a `StreamingResponse`.
    pub fn from_stream(response: StreamingResponse) -> Self {
        let StreamingResponse { parts, body } = response;
        Self::from_typed_parts(parts, body)
    }

    /// Build a typed response stream from a response head and a byte stream.
    ///
    /// Each successful chunk of `body` becomes one frame tagged with `F`; errors
    /// pass through untouched.
    pub fn from_typed_parts(parts: http::response::Parts, body: ByteStream) -> Self {
        let tagged = body
            .into_inner()
            .map_ok(|bytes| XrpcStreamFrame::new_typed::<F::Frame<'_>>(bytes));
        Self {
            parts,
            body: Box::pin(tagged),
        }
    }
}

impl<F: XrpcStreamResp + 'static> XrpcResponseStream<F> {
    /// Drop the type tag and response head, yielding the raw frame buffers as a
    /// byte stream.
    pub fn into_bytestream(self) -> ByteStream {
        let Self { body, .. } = self;
        let raw = body.map_ok(|frame| frame.buffer);
        ByteStream::new(Box::pin(raw))
    }
}

/// HTTP streaming response
///
/// Similar to `Response<R>` but holds a streaming body instead of a buffer.
pub struct StreamingResponse {
    /// HTTP response head; the source of the status, version and headers accessors.
    parts: http::response::Parts,
    /// The response body as a stream of bytes.
    body: ByteStream,
}

impl StreamingResponse {
    /// Create a new streaming response
    pub fn new(parts: http::response::Parts, body: ByteStream) -> Self {
        Self { parts, body }
    }

    /// Get the HTTP status code
    pub fn status(&self) -> StatusCode {
        self.parts.status
    }

    /// Get the response headers
    pub fn headers(&self) -> &http::HeaderMap {
        &self.parts.headers
    }

    /// Get the response version
    pub fn version(&self) -> http::Version {
        self.parts.version
    }

    /// Consume the response and return parts and body separately
    pub fn into_parts(self) -> (http::response::Parts, ByteStream) {
        (self.parts, self.body)
    }

    /// Get mutable access to the body stream
    pub fn body_mut(&mut self) -> &mut ByteStream {
        &mut self.body
    }

    /// Get a reference to the body stream
    pub fn body(&self) -> &ByteStream {
        &self.body
    }
}

impl core::fmt::Debug for StreamingResponse {
    /// Formats the response head (status, version, headers); the body stream is
    /// omitted and the output is marked non-exhaustive.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let head = &self.parts;
        let mut out = f.debug_struct("StreamingResponse");
        out.field("status", &head.status);
        out.field("version", &head.version);
        out.field("headers", &head.headers);
        out.finish_non_exhaustive()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;

    /// A `StreamingResponse` built from a response head and a body stream
    /// reports the head's status. The body is constructed but never polled.
    #[test]
    fn streaming_response_holds_parts_and_body() {
        // Obtain `Parts` by building a response and splitting it
        let (parts, ()) = http::Response::builder()
            .status(StatusCode::OK)
            .body(())
            .unwrap()
            .into_parts();

        let chunks = vec![Ok(Bytes::from("test"))];
        let body = ByteStream::new(stream::iter(chunks));

        let response = StreamingResponse::new(parts, body);
        assert_eq!(response.status(), StatusCode::OK);
    }
}