jacquard_common/
stream.rs1use alloc::boxed::Box;
46use alloc::string::String;
47use core::error::Error;
48use core::fmt;
49use core::pin::Pin;
50
/// Type-erased, heap-allocated error that is `Send + Sync + 'static`.
///
/// Used as the storage type for the optional underlying cause carried by
/// [`StreamError`].
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
53
54#[derive(Debug, thiserror::Error, miette::Diagnostic)]
56pub struct StreamError {
57 kind: StreamErrorKind,
58 #[source]
59 source: Option<BoxError>,
60}
61
/// Category of a [`StreamError`].
///
/// Marked `#[non_exhaustive]`, so code in other crates that matches on this
/// enum must include a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamErrorKind {
    /// Transport-level failure (see `StreamError::transport`).
    Transport,
    /// The stream is closed (see `StreamError::closed`).
    Closed,
    /// Protocol-level failure described by a message (see `StreamError::protocol`).
    Protocol,
    /// Failure while decoding (see `StreamError::decode`).
    Decode,
    /// Failure while encoding (see `StreamError::encode`).
    Encode,
    /// A message arrived in an unexpected format (see `StreamError::wrong_message_format`).
    WrongMessageFormat,
}
79
80impl StreamError {
81 pub fn new(kind: StreamErrorKind, source: Option<BoxError>) -> Self {
83 Self { kind, source }
84 }
85
86 pub fn kind(&self) -> &StreamErrorKind {
88 &self.kind
89 }
90
91 pub fn source(&self) -> Option<&BoxError> {
93 self.source.as_ref()
94 }
95
96 pub fn closed() -> Self {
98 Self {
99 kind: StreamErrorKind::Closed,
100 source: None,
101 }
102 }
103
104 pub fn transport(source: impl Error + Send + Sync + 'static) -> Self {
106 Self {
107 kind: StreamErrorKind::Transport,
108 source: Some(Box::new(source)),
109 }
110 }
111
112 pub fn protocol(msg: impl Into<String>) -> Self {
114 Self {
115 kind: StreamErrorKind::Protocol,
116 source: Some(msg.into().into()),
117 }
118 }
119
120 pub fn decode(source: impl Error + Send + Sync + 'static) -> Self {
122 Self {
123 kind: StreamErrorKind::Decode,
124 source: Some(Box::new(source)),
125 }
126 }
127
128 pub fn encode(source: impl Error + Send + Sync + 'static) -> Self {
130 Self {
131 kind: StreamErrorKind::Encode,
132 source: Some(Box::new(source)),
133 }
134 }
135
136 pub fn wrong_message_format(msg: impl Into<String>) -> Self {
138 Self {
139 kind: StreamErrorKind::WrongMessageFormat,
140 source: Some(msg.into().into()),
141 }
142 }
143}
144
145impl fmt::Display for StreamError {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 match self.kind {
148 StreamErrorKind::Transport => write!(f, "Transport error"),
149 StreamErrorKind::Closed => write!(f, "Stream closed"),
150 StreamErrorKind::Protocol => write!(f, "Protocol error"),
151 StreamErrorKind::Decode => write!(f, "Decode error"),
152 StreamErrorKind::Encode => write!(f, "Encode error"),
153 StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"),
154 }?;
155
156 if let Some(source) = &self.source {
157 write!(f, ": {}", source)?;
158 }
159
160 Ok(())
161 }
162}
163
164use bytes::Bytes;
165
166#[cfg(not(target_arch = "wasm32"))]
168type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>;
169
170#[cfg(target_arch = "wasm32")]
172type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>;
173
174pub struct ByteStream {
176 inner: Boxed<Result<Bytes, StreamError>>,
177}
178
179impl ByteStream {
180 #[cfg(not(target_arch = "wasm32"))]
182 pub fn new<S>(stream: S) -> Self
183 where
184 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static,
185 {
186 Self {
187 inner: Box::pin(stream),
188 }
189 }
190
191 #[cfg(target_arch = "wasm32")]
193 pub fn new<S>(stream: S) -> Self
194 where
195 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static,
196 {
197 Self {
198 inner: Box::pin(stream),
199 }
200 }
201
202 pub fn is_empty(&self) -> bool {
204 false
205 }
206
207 pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> {
209 self.inner
210 }
211
212 pub fn tee(self) -> (ByteStream, ByteStream) {
219 use futures::channel::mpsc;
220 use n0_future::StreamExt as _;
221
222 let (tx1, rx1) = mpsc::unbounded();
223 let (tx2, rx2) = mpsc::unbounded();
224
225 n0_future::task::spawn(async move {
226 let mut stream = self.inner;
227 while let Some(result) = stream.next().await {
228 match result {
229 Ok(chunk) => {
230 let chunk2 = chunk.clone();
232
233 let send1 = tx1.unbounded_send(Ok(chunk));
235 let send2 = tx2.unbounded_send(Ok(chunk2));
236
237 if send1.is_err() && send2.is_err() {
239 break;
240 }
241 }
242 Err(_e) => {
243 break;
246 }
247 }
248 }
249 });
250
251 (ByteStream::new(rx1), ByteStream::new(rx2))
252 }
253}
254
255impl fmt::Debug for ByteStream {
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 f.debug_struct("ByteStream").finish_non_exhaustive()
258 }
259}
260
261pub struct ByteSink {
263 inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>,
264}
265
266impl ByteSink {
267 pub fn new<S>(sink: S) -> Self
269 where
270 S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
271 {
272 Self {
273 inner: Box::pin(sink),
274 }
275 }
276
277 pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> {
279 self.inner
280 }
281}
282
283impl fmt::Debug for ByteSink {
284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285 f.debug_struct("ByteSink").finish_non_exhaustive()
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// An error built with an explicit cause exposes its kind and cause, and
    /// renders as "<kind label>: <cause>".
    #[test]
    fn stream_error_carries_kind_and_source() {
        let cause = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe closed");
        let boxed: BoxError = Box::new(cause);
        let err = StreamError::new(StreamErrorKind::Transport, Some(boxed));

        assert_eq!(err.kind(), &StreamErrorKind::Transport);
        assert!(err.source().is_some());

        let rendered = format!("{}", err);
        assert_eq!(rendered, "Transport error: pipe closed");
    }

    /// `StreamError::closed` has the `Closed` kind and no cause.
    #[test]
    fn stream_error_without_source() {
        let err = StreamError::closed();

        assert_eq!(err.kind(), &StreamErrorKind::Closed);
        assert!(err.source().is_none());
    }

    /// A `ByteStream` can be built from an in-memory stream of chunks and
    /// never reports itself as empty.
    #[tokio::test]
    async fn byte_stream_can_be_created() {
        use futures::stream;

        let chunks = vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
        let byte_stream = ByteStream::new(stream::iter(chunks));

        assert!(!byte_stream.is_empty());
    }
}