jacquard_common/
stream.rs

1//! Stream abstractions for HTTP request/response bodies
2//!
3//! This module provides platform-agnostic streaming types for handling large
4//! payloads efficiently without loading everything into memory.
5//!
6//! # Features
7//!
8//! - [`ByteStream`]: Streaming response bodies
9//! - [`ByteSink`]: Streaming request bodies
10//! - [`StreamError`]: Concrete error type for streaming operations
11//!
12//! # Platform Support
13//!
14//! Uses `n0-future` for platform-agnostic async streams that work on both
15//! native and WASM targets without requiring `Send` bounds on WASM.
16//!
17//! # Examples
18//!
19//! ## Streaming Download
20//!
21//! ```no_run
22//! # #[cfg(all(feature = "streaming", feature = "reqwest-client"))]
23//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24//! use jacquard_common::http_client::{HttpClient, HttpClientExt};
25//! use futures_lite::StreamExt;
26//!
27//! let client = reqwest::Client::new();
28//! let request = http::Request::builder()
29//!     .uri("https://example.com/large-file")
30//!     .body(vec![])
31//!     .unwrap();
32//!
33//! let response = client.send_http_streaming(request).await?;
34//! let (_parts, body) = response.into_parts();
35//! let mut stream = Box::pin(body.into_inner());
36//!
37//! // Use futures_lite::StreamExt for iteration
38//! while let Some(chunk) = stream.as_mut().try_next().await? {
39//!     // Process chunk without loading entire file into memory
40//! }
41//! # Ok(())
42//! # }
43//! ```
44
45use std::error::Error;
46use std::fmt;
47use std::pin::Pin;
48
/// Type-erased, thread-safe error used as the underlying cause of a streaming failure.
///
/// The trait object is implicitly `'static`, which is the default object
/// lifetime bound for `Box<dyn Trait>`.
pub type BoxError = Box<dyn Error + Send + Sync>;
51
52/// Error type for streaming operations
53#[derive(Debug, thiserror::Error, miette::Diagnostic)]
54pub struct StreamError {
55    kind: StreamErrorKind,
56    #[source]
57    source: Option<BoxError>,
58}
59
/// Broad classification of what went wrong during a streaming operation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamErrorKind {
    /// The network or another I/O layer failed.
    Transport,
    /// The stream or its connection was closed.
    Closed,
    /// The peer violated the protocol, or framing was invalid.
    Protocol,
    /// A received message could not be deserialized.
    Decode,
    /// An outgoing message could not be serialized.
    Encode,
    /// The message arrived in an unexpected format (e.g., a text frame where
    /// a binary frame was expected).
    WrongMessageFormat,
}
76
77impl StreamError {
78    /// Create a new streaming error
79    pub fn new(kind: StreamErrorKind, source: Option<BoxError>) -> Self {
80        Self { kind, source }
81    }
82
83    /// Get the error kind
84    pub fn kind(&self) -> &StreamErrorKind {
85        &self.kind
86    }
87
88    /// Get the underlying error source
89    pub fn source(&self) -> Option<&BoxError> {
90        self.source.as_ref()
91    }
92
93    /// Create a "connection closed" error
94    pub fn closed() -> Self {
95        Self {
96            kind: StreamErrorKind::Closed,
97            source: None,
98        }
99    }
100
101    /// Create a transport error with source
102    pub fn transport(source: impl Error + Send + Sync + 'static) -> Self {
103        Self {
104            kind: StreamErrorKind::Transport,
105            source: Some(Box::new(source)),
106        }
107    }
108
109    /// Create a protocol error
110    pub fn protocol(msg: impl Into<String>) -> Self {
111        Self {
112            kind: StreamErrorKind::Protocol,
113            source: Some(msg.into().into()),
114        }
115    }
116
117    /// Create a decode error with source
118    pub fn decode(source: impl Error + Send + Sync + 'static) -> Self {
119        Self {
120            kind: StreamErrorKind::Decode,
121            source: Some(Box::new(source)),
122        }
123    }
124
125    /// Create an encode error with source
126    pub fn encode(source: impl Error + Send + Sync + 'static) -> Self {
127        Self {
128            kind: StreamErrorKind::Encode,
129            source: Some(Box::new(source)),
130        }
131    }
132
133    /// Create a wrong message format error
134    pub fn wrong_message_format(msg: impl Into<String>) -> Self {
135        Self {
136            kind: StreamErrorKind::WrongMessageFormat,
137            source: Some(msg.into().into()),
138        }
139    }
140}
141
142impl fmt::Display for StreamError {
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        match self.kind {
145            StreamErrorKind::Transport => write!(f, "Transport error"),
146            StreamErrorKind::Closed => write!(f, "Stream closed"),
147            StreamErrorKind::Protocol => write!(f, "Protocol error"),
148            StreamErrorKind::Decode => write!(f, "Decode error"),
149            StreamErrorKind::Encode => write!(f, "Encode error"),
150            StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"),
151        }?;
152
153        if let Some(source) = &self.source {
154            write!(f, ": {}", source)?;
155        }
156
157        Ok(())
158    }
159}
160
161use bytes::Bytes;
162
163/// Boxed stream type with proper Send bounds for native, no Send for WASM
164#[cfg(not(target_arch = "wasm32"))]
165type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>;
166
167/// Boxed stream type without Send bound for WASM
168#[cfg(target_arch = "wasm32")]
169type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>;
170
171/// Platform-agnostic byte stream abstraction
172pub struct ByteStream {
173    inner: Boxed<Result<Bytes, StreamError>>,
174}
175
176impl ByteStream {
177    /// Create a new byte stream from any compatible stream
178    #[cfg(not(target_arch = "wasm32"))]
179    pub fn new<S>(stream: S) -> Self
180    where
181        S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static,
182    {
183        Self {
184            inner: Box::pin(stream),
185        }
186    }
187
188    /// Create a new byte stream from any compatible stream (WASM)
189    #[cfg(target_arch = "wasm32")]
190    pub fn new<S>(stream: S) -> Self
191    where
192        S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static,
193    {
194        Self {
195            inner: Box::pin(stream),
196        }
197    }
198
199    /// Check if stream is known to be empty (always false for dynamic streams)
200    pub fn is_empty(&self) -> bool {
201        false
202    }
203
204    /// Convert into the inner boxed stream
205    pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> {
206        self.inner
207    }
208
209    /// Split this stream into two streams that both receive all chunks
210    ///
211    /// Chunks are cloned (cheaply via Bytes rc). Spawns a forwarder task.
212    /// Both returned streams will receive all chunks from the original stream.
213    /// The forwarder continues as long as at least one stream is alive.
214    /// If the underlying stream errors, both teed streams will end.
215    pub fn tee(self) -> (ByteStream, ByteStream) {
216        use futures::channel::mpsc;
217        use n0_future::StreamExt as _;
218
219        let (tx1, rx1) = mpsc::unbounded();
220        let (tx2, rx2) = mpsc::unbounded();
221
222        n0_future::task::spawn(async move {
223            let mut stream = self.inner;
224            while let Some(result) = stream.next().await {
225                match result {
226                    Ok(chunk) => {
227                        // Clone chunk (cheap - Bytes is rc'd)
228                        let chunk2 = chunk.clone();
229
230                        // Send to both channels, continue if at least one succeeds
231                        let send1 = tx1.unbounded_send(Ok(chunk));
232                        let send2 = tx2.unbounded_send(Ok(chunk2));
233
234                        // Only stop if both channels are closed
235                        if send1.is_err() && send2.is_err() {
236                            break;
237                        }
238                    }
239                    Err(_e) => {
240                        // Underlying stream errored, stop forwarding.
241                        // Both channels will close, ending both streams.
242                        break;
243                    }
244                }
245            }
246        });
247
248        (ByteStream::new(rx1), ByteStream::new(rx2))
249    }
250}
251
252impl fmt::Debug for ByteStream {
253    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
254        f.debug_struct("ByteStream").finish_non_exhaustive()
255    }
256}
257
258/// Platform-agnostic byte sink abstraction
259pub struct ByteSink {
260    inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>,
261}
262
263impl ByteSink {
264    /// Create a new byte sink from any compatible sink
265    pub fn new<S>(sink: S) -> Self
266    where
267        S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
268    {
269        Self {
270            inner: Box::pin(sink),
271        }
272    }
273
274    /// Convert into the inner boxed sink
275    pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> {
276        self.inner
277    }
278}
279
280impl fmt::Debug for ByteSink {
281    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282        f.debug_struct("ByteSink").finish_non_exhaustive()
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// An error built with a cause exposes its kind and cause, and renders both.
    #[test]
    fn stream_error_carries_kind_and_source() {
        let cause = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe closed");
        let error = StreamError::new(StreamErrorKind::Transport, Some(Box::new(cause)));

        assert_eq!(*error.kind(), StreamErrorKind::Transport);
        assert!(error.source().is_some());
        assert_eq!(error.to_string(), "Transport error: pipe closed");
    }

    /// `closed()` produces a `Closed` error with no cause attached.
    #[test]
    fn stream_error_without_source() {
        let error = StreamError::closed();

        assert_eq!(*error.kind(), StreamErrorKind::Closed);
        assert!(error.source().is_none());
    }

    /// A `ByteStream` can wrap an in-memory stream and never reports itself empty.
    #[tokio::test]
    async fn byte_stream_can_be_created() {
        use futures::stream;

        let chunks = vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
        let byte_stream = ByteStream::new(stream::iter(chunks));

        assert!(!byte_stream.is_empty());
    }
}