Skip to main content

jacquard_common/
stream.rs

1//! Stream abstractions for HTTP request/response bodies
2//!
3//! This module provides platform-agnostic streaming types for handling large
4//! payloads efficiently without loading everything into memory.
5//!
6//! # Features
7//!
8//! - [`ByteStream`]: Streaming response bodies
9//! - [`ByteSink`]: Streaming request bodies
10//! - [`StreamError`]: Concrete error type for streaming operations
11//!
12//! # Platform Support
13//!
14//! Uses `n0-future` for platform-agnostic async streams that work on both
15//! native and WASM targets without requiring `Send` bounds on WASM.
16//!
17//! # Examples
18//!
19//! ## Streaming Download
20//!
21//! ```no_run
22//! # #[cfg(all(feature = "streaming", feature = "reqwest-client"))]
23//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24//! use jacquard_common::http_client::{HttpClient, HttpClientExt};
25//! use futures_lite::StreamExt;
26//!
27//! let client = reqwest::Client::new();
28//! let request = http::Request::builder()
29//!     .uri("https://example.com/large-file")
30//!     .body(vec![])
31//!     .unwrap();
32//!
33//! let response = client.send_http_streaming(request).await?;
34//! let (_parts, body) = response.into_parts();
35//! let mut stream = Box::pin(body.into_inner());
36//!
37//! // Use futures_lite::StreamExt for iteration
38//! while let Some(chunk) = stream.as_mut().try_next().await? {
39//!     // Process chunk without loading entire file into memory
40//! }
41//! # Ok(())
42//! # }
43//! ```
44
45use alloc::boxed::Box;
46use alloc::string::String;
47use core::error::Error;
48use core::fmt;
49use core::pin::Pin;
50
/// Type-erased, heap-allocated error used as the optional cause of a [`StreamError`].
///
/// The `Send + Sync + 'static` bounds allow the boxed error to be moved
/// between threads and stored without borrowing from its origin.
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
53
54/// Error type for streaming operations
55#[derive(Debug, thiserror::Error, miette::Diagnostic)]
56pub struct StreamError {
57    kind: StreamErrorKind,
58    #[source]
59    source: Option<BoxError>,
60}
61
/// Coarse classification of a streaming failure.
///
/// The enum is `#[non_exhaustive]`: code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamErrorKind {
    /// A network or I/O failure.
    Transport,
    /// The stream or connection was closed.
    Closed,
    /// A protocol violation or framing error.
    Protocol,
    /// A message could not be deserialized.
    Decode,
    /// A message could not be serialized.
    Encode,
    /// The message had the wrong format (e.g., a text frame where binary was expected).
    WrongMessageFormat,
}
79
80impl StreamError {
81    /// Create a new streaming error
82    pub fn new(kind: StreamErrorKind, source: Option<BoxError>) -> Self {
83        Self { kind, source }
84    }
85
86    /// Get the error kind
87    pub fn kind(&self) -> &StreamErrorKind {
88        &self.kind
89    }
90
91    /// Get the underlying error source
92    pub fn source(&self) -> Option<&BoxError> {
93        self.source.as_ref()
94    }
95
96    /// Create a "connection closed" error
97    pub fn closed() -> Self {
98        Self {
99            kind: StreamErrorKind::Closed,
100            source: None,
101        }
102    }
103
104    /// Create a transport error with source
105    pub fn transport(source: impl Error + Send + Sync + 'static) -> Self {
106        Self {
107            kind: StreamErrorKind::Transport,
108            source: Some(Box::new(source)),
109        }
110    }
111
112    /// Create a protocol error
113    pub fn protocol(msg: impl Into<String>) -> Self {
114        Self {
115            kind: StreamErrorKind::Protocol,
116            source: Some(msg.into().into()),
117        }
118    }
119
120    /// Create a decode error with source
121    pub fn decode(source: impl Error + Send + Sync + 'static) -> Self {
122        Self {
123            kind: StreamErrorKind::Decode,
124            source: Some(Box::new(source)),
125        }
126    }
127
128    /// Create an encode error with source
129    pub fn encode(source: impl Error + Send + Sync + 'static) -> Self {
130        Self {
131            kind: StreamErrorKind::Encode,
132            source: Some(Box::new(source)),
133        }
134    }
135
136    /// Create a wrong message format error
137    pub fn wrong_message_format(msg: impl Into<String>) -> Self {
138        Self {
139            kind: StreamErrorKind::WrongMessageFormat,
140            source: Some(msg.into().into()),
141        }
142    }
143}
144
145impl fmt::Display for StreamError {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        match self.kind {
148            StreamErrorKind::Transport => write!(f, "Transport error"),
149            StreamErrorKind::Closed => write!(f, "Stream closed"),
150            StreamErrorKind::Protocol => write!(f, "Protocol error"),
151            StreamErrorKind::Decode => write!(f, "Decode error"),
152            StreamErrorKind::Encode => write!(f, "Encode error"),
153            StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"),
154        }?;
155
156        if let Some(source) = &self.source {
157            write!(f, ": {}", source)?;
158        }
159
160        Ok(())
161    }
162}
163
164use bytes::Bytes;
165
166/// Boxed stream type with proper Send bounds for native, no Send for WASM
167#[cfg(not(target_arch = "wasm32"))]
168type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>;
169
170/// Boxed stream type without Send bound for WASM
171#[cfg(target_arch = "wasm32")]
172type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>;
173
174/// Platform-agnostic byte stream abstraction
175pub struct ByteStream {
176    inner: Boxed<Result<Bytes, StreamError>>,
177}
178
179impl ByteStream {
180    /// Create a new byte stream from any compatible stream
181    #[cfg(not(target_arch = "wasm32"))]
182    pub fn new<S>(stream: S) -> Self
183    where
184        S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static,
185    {
186        Self {
187            inner: Box::pin(stream),
188        }
189    }
190
191    /// Create a new byte stream from any compatible stream (WASM)
192    #[cfg(target_arch = "wasm32")]
193    pub fn new<S>(stream: S) -> Self
194    where
195        S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static,
196    {
197        Self {
198            inner: Box::pin(stream),
199        }
200    }
201
202    /// Check if stream is known to be empty (always false for dynamic streams)
203    pub fn is_empty(&self) -> bool {
204        false
205    }
206
207    /// Convert into the inner boxed stream
208    pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> {
209        self.inner
210    }
211
212    /// Split this stream into two streams that both receive all chunks
213    ///
214    /// Chunks are cloned (cheaply via Bytes rc). Spawns a forwarder task.
215    /// Both returned streams will receive all chunks from the original stream.
216    /// The forwarder continues as long as at least one stream is alive.
217    /// If the underlying stream errors, both teed streams will end.
218    pub fn tee(self) -> (ByteStream, ByteStream) {
219        use futures::channel::mpsc;
220        use n0_future::StreamExt as _;
221
222        let (tx1, rx1) = mpsc::unbounded();
223        let (tx2, rx2) = mpsc::unbounded();
224
225        n0_future::task::spawn(async move {
226            let mut stream = self.inner;
227            while let Some(result) = stream.next().await {
228                match result {
229                    Ok(chunk) => {
230                        // Clone chunk (cheap - Bytes is rc'd)
231                        let chunk2 = chunk.clone();
232
233                        // Send to both channels, continue if at least one succeeds
234                        let send1 = tx1.unbounded_send(Ok(chunk));
235                        let send2 = tx2.unbounded_send(Ok(chunk2));
236
237                        // Only stop if both channels are closed
238                        if send1.is_err() && send2.is_err() {
239                            break;
240                        }
241                    }
242                    Err(_e) => {
243                        // Underlying stream errored, stop forwarding.
244                        // Both channels will close, ending both streams.
245                        break;
246                    }
247                }
248            }
249        });
250
251        (ByteStream::new(rx1), ByteStream::new(rx2))
252    }
253}
254
255impl fmt::Debug for ByteStream {
256    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257        f.debug_struct("ByteStream").finish_non_exhaustive()
258    }
259}
260
261/// Platform-agnostic byte sink abstraction
262pub struct ByteSink {
263    inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>,
264}
265
266impl ByteSink {
267    /// Create a new byte sink from any compatible sink
268    pub fn new<S>(sink: S) -> Self
269    where
270        S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
271    {
272        Self {
273            inner: Box::pin(sink),
274        }
275    }
276
277    /// Convert into the inner boxed sink
278    pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> {
279        self.inner
280    }
281}
282
283impl fmt::Debug for ByteSink {
284    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285        f.debug_struct("ByteSink").finish_non_exhaustive()
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// An error built with `new` exposes its kind and cause, and renders both.
    #[test]
    fn stream_error_carries_kind_and_source() {
        let io_err = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe closed");
        let cause: BoxError = Box::new(io_err);
        let err = StreamError::new(StreamErrorKind::Transport, Some(cause));

        assert_eq!(*err.kind(), StreamErrorKind::Transport);
        assert!(err.source().is_some());

        let rendered = format!("{}", err);
        assert_eq!(rendered, "Transport error: pipe closed");
    }

    /// `closed()` yields a `Closed` error with no cause attached.
    #[test]
    fn stream_error_without_source() {
        let err = StreamError::closed();

        assert_eq!(*err.kind(), StreamErrorKind::Closed);
        assert!(err.source().is_none());
    }

    /// A `ByteStream` can wrap an in-memory stream of chunks.
    #[tokio::test]
    async fn byte_stream_can_be_created() {
        use futures::stream;

        let chunks = vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
        let byte_stream = ByteStream::new(stream::iter(chunks));

        assert!(!byte_stream.is_empty());
    }
}