jacquard_common/
stream.rs1use std::error::Error;
46use std::fmt;
47use std::pin::Pin;
48
49pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
51
52#[derive(Debug, thiserror::Error, miette::Diagnostic)]
54pub struct StreamError {
55 kind: StreamErrorKind,
56 #[source]
57 source: Option<BoxError>,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum StreamErrorKind {
63 Transport,
65 Closed,
67 Protocol,
69 Decode,
71 Encode,
73 WrongMessageFormat,
75}
76
77impl StreamError {
78 pub fn new(kind: StreamErrorKind, source: Option<BoxError>) -> Self {
80 Self { kind, source }
81 }
82
83 pub fn kind(&self) -> &StreamErrorKind {
85 &self.kind
86 }
87
88 pub fn source(&self) -> Option<&BoxError> {
90 self.source.as_ref()
91 }
92
93 pub fn closed() -> Self {
95 Self {
96 kind: StreamErrorKind::Closed,
97 source: None,
98 }
99 }
100
101 pub fn transport(source: impl Error + Send + Sync + 'static) -> Self {
103 Self {
104 kind: StreamErrorKind::Transport,
105 source: Some(Box::new(source)),
106 }
107 }
108
109 pub fn protocol(msg: impl Into<String>) -> Self {
111 Self {
112 kind: StreamErrorKind::Protocol,
113 source: Some(msg.into().into()),
114 }
115 }
116
117 pub fn decode(source: impl Error + Send + Sync + 'static) -> Self {
119 Self {
120 kind: StreamErrorKind::Decode,
121 source: Some(Box::new(source)),
122 }
123 }
124
125 pub fn encode(source: impl Error + Send + Sync + 'static) -> Self {
127 Self {
128 kind: StreamErrorKind::Encode,
129 source: Some(Box::new(source)),
130 }
131 }
132
133 pub fn wrong_message_format(msg: impl Into<String>) -> Self {
135 Self {
136 kind: StreamErrorKind::WrongMessageFormat,
137 source: Some(msg.into().into()),
138 }
139 }
140}
141
142impl fmt::Display for StreamError {
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 match self.kind {
145 StreamErrorKind::Transport => write!(f, "Transport error"),
146 StreamErrorKind::Closed => write!(f, "Stream closed"),
147 StreamErrorKind::Protocol => write!(f, "Protocol error"),
148 StreamErrorKind::Decode => write!(f, "Decode error"),
149 StreamErrorKind::Encode => write!(f, "Encode error"),
150 StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"),
151 }?;
152
153 if let Some(source) = &self.source {
154 write!(f, ": {}", source)?;
155 }
156
157 Ok(())
158 }
159}
160
161use bytes::Bytes;
162
163#[cfg(not(target_arch = "wasm32"))]
165type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>;
166
167#[cfg(target_arch = "wasm32")]
169type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>;
170
171pub struct ByteStream {
173 inner: Boxed<Result<Bytes, StreamError>>,
174}
175
176impl ByteStream {
177 #[cfg(not(target_arch = "wasm32"))]
179 pub fn new<S>(stream: S) -> Self
180 where
181 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static,
182 {
183 Self {
184 inner: Box::pin(stream),
185 }
186 }
187
188 #[cfg(target_arch = "wasm32")]
190 pub fn new<S>(stream: S) -> Self
191 where
192 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static,
193 {
194 Self {
195 inner: Box::pin(stream),
196 }
197 }
198
199 pub fn is_empty(&self) -> bool {
201 false
202 }
203
204 pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> {
206 self.inner
207 }
208
209 pub fn tee(self) -> (ByteStream, ByteStream) {
216 use futures::channel::mpsc;
217 use n0_future::StreamExt as _;
218
219 let (tx1, rx1) = mpsc::unbounded();
220 let (tx2, rx2) = mpsc::unbounded();
221
222 n0_future::task::spawn(async move {
223 let mut stream = self.inner;
224 while let Some(result) = stream.next().await {
225 match result {
226 Ok(chunk) => {
227 let chunk2 = chunk.clone();
229
230 let send1 = tx1.unbounded_send(Ok(chunk));
232 let send2 = tx2.unbounded_send(Ok(chunk2));
233
234 if send1.is_err() && send2.is_err() {
236 break;
237 }
238 }
239 Err(_e) => {
240 break;
243 }
244 }
245 }
246 });
247
248 (ByteStream::new(rx1), ByteStream::new(rx2))
249 }
250}
251
252impl fmt::Debug for ByteStream {
253 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
254 f.debug_struct("ByteStream").finish_non_exhaustive()
255 }
256}
257
258pub struct ByteSink {
260 inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>,
261}
262
263impl ByteSink {
264 pub fn new<S>(sink: S) -> Self
266 where
267 S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
268 {
269 Self {
270 inner: Box::pin(sink),
271 }
272 }
273
274 pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> {
276 self.inner
277 }
278}
279
280impl fmt::Debug for ByteSink {
281 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282 f.debug_struct("ByteSink").finish_non_exhaustive()
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289 use bytes::Bytes;
290
291 #[test]
292 fn stream_error_carries_kind_and_source() {
293 let source = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe closed");
294 let err = StreamError::new(StreamErrorKind::Transport, Some(Box::new(source)));
295
296 assert_eq!(err.kind(), &StreamErrorKind::Transport);
297 assert!(err.source().is_some());
298 assert_eq!(format!("{}", err), "Transport error: pipe closed");
299 }
300
301 #[test]
302 fn stream_error_without_source() {
303 let err = StreamError::closed();
304
305 assert_eq!(err.kind(), &StreamErrorKind::Closed);
306 assert!(err.source().is_none());
307 }
308
309 #[tokio::test]
310 async fn byte_stream_can_be_created() {
311 use futures::stream;
312
313 let data = vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
314 let stream = stream::iter(data);
315
316 let byte_stream = ByteStream::new(stream);
317 assert!(!byte_stream.is_empty());
318 }
319}