async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
17//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
18//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
19//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
20//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
21//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
22//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
23//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
24//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
25//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
26//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
27//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
28//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
29//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
30//!
31//! Additionally, this crate offers optional dependencies on the `tokio-tungstenite` crate to add convenience methods for reading/writing [`Protocol`] types from/to its websockets. The following versions are supported:
32//!
33//! * The latest release (currently [`tokio-tungstenite` 0.26](https://docs.rs/tokio-tungstenite/0.26), feature flag `tokio-tungstenite026`)
34//! * The version used by [the `master` branch of `rocket_ws` on GitHub](https://github.com/rwf2/Rocket/tree/master/contrib/ws) (currently [`tokio-tungstenite` 0.24](https://docs.rs/tokio-tungstenite/0.24), feature flag `tokio-tungstenite024`)
35//! * The version used by [the latest `rocket_ws` crates.io release](https://docs.rs/rocket_ws) (currently [`tokio-tungstenite` 0.21](https://docs.rs/tokio-tungstenite/0.21), feature flag `tokio-tungstenite021`)
36
37use {
38    std::{
39        future::Future,
40        io::{
41            self,
42            prelude::*,
43        },
44        pin::Pin,
45    },
46    tokio::io::{
47        AsyncRead,
48        AsyncWrite,
49    },
50};
51#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite026"))] use {
52    std::{
53        iter,
54        mem,
55    },
56    fallible_collections::FallibleVec,
57    futures::{
58        Sink,
59        SinkExt as _,
60        future::{
61            self,
62            Either,
63        },
64        stream::{
65            self,
66            Stream,
67            StreamExt as _,
68            TryStreamExt as _,
69        },
70    },
71};
72#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
73#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
74#[cfg(feature = "tokio-tungstenite026")] use tokio_tungstenite026::tungstenite as tungstenite026;
75pub use {
76    async_proto_derive::{
77        Protocol,
78        bitflags,
79    },
80    crate::error::*,
81};
82#[doc(hidden)] pub use tokio; // used in proc macro
83
84mod error;
85mod impls;
86
87/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
88#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite026"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
89
90/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
91pub trait Protocol: Sized {
92    /// Reads a value of this type from an async stream.
93    ///
94    /// # Cancellation safety
95    ///
96    /// Implementations of this method are generally not cancellation safe.
97    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
98    /// Writes a value of this type to an async sink.
99    ///
100    /// # Cancellation safety
101    ///
102    /// Implementations of this method are generally not cancellation safe.
103    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
104    /// Reads a value of this type from a sync stream.
105    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
106    /// Writes a value of this type to a sync sink.
107    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
108
109    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
110    ///
111    /// This can be used to get around drop glue issues that might arise with `read`.
112    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
113        Box::pin(async move {
114            let value = Self::read(&mut stream).await?;
115            Ok((stream, value))
116        })
117    }
118
119    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
120    ///
121    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
122    ///
123    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
124    ///
125    /// # Example
126    ///
127    /// ```
128    /// use {
129    ///     std::{
130    ///         io,
131    ///         net::TcpStream,
132    ///     },
133    ///     async_proto::Protocol,
134    /// };
135    ///
136    /// struct Client {
137    ///     tcp_stream: TcpStream,
138    ///     buf: Vec<u8>,
139    /// }
140    ///
141    /// impl Client {
142    ///     fn new(tcp_stream: TcpStream) -> Self {
143    ///         Self {
144    ///             tcp_stream,
145    ///             buf: Vec::default(),
146    ///         }
147    ///     }
148    ///
149    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
150    ///         self.tcp_stream.set_nonblocking(true)?;
151    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
152    ///     }
153    ///
154    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
155    ///         self.tcp_stream.set_nonblocking(false)?;
156    ///         msg.write_sync(&mut self.tcp_stream)?;
157    ///         Ok(())
158    ///     }
159    /// }
160    /// ```
161    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
162        let mut temp_buf = vec![0; 8];
163        loop {
164            let mut slice = &mut &**buf;
165            match Self::read_sync(&mut slice) {
166                Ok(value) => {
167                    let value_len = slice.len();
168                    buf.drain(..buf.len() - value_len);
169                    return Ok(Some(value))
170                }
171                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
172                Err(e) => return Err(e),
173            }
174            match stream.read(&mut temp_buf) {
175                Ok(0) => return Err(ReadError {
176                    context: ErrorContext::DefaultImpl,
177                    kind: ReadErrorKind::EndOfStream,
178                }),
179                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
180                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
181                Err(e) => return Err(ReadError {
182                    context: ErrorContext::DefaultImpl,
183                    kind: e.into(),
184                }),
185            }
186        }
187    }
188
189    #[cfg(feature = "tokio-tungstenite021")]
190    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
191    /// Reads a value of this type from a `tokio-tungstenite` websocket.
192    ///
193    /// # Cancellation safety
194    ///
195    /// The default implementation of this method is cancellation safe.
196    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
197        Box::pin(async move {
198            let packet = stream.try_next().await.map_err(|e| ReadError {
199                context: ErrorContext::DefaultImpl,
200                kind: e.into(),
201            })?.ok_or_else(|| ReadError {
202                context: ErrorContext::DefaultImpl,
203                kind: ReadErrorKind::EndOfStream,
204            })?;
205            match packet {
206                tungstenite021::Message::Text(data) => match data.chars().next() {
207                    Some('m') => {
208                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
209                            context: ErrorContext::DefaultImpl,
210                            kind: e.into(),
211                        })?;
212                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
213                            context: ErrorContext::DefaultImpl,
214                            kind: e.into(),
215                        })?;
216                        while buf.len() < len {
217                            let packet = stream.try_next().await.map_err(|e| ReadError {
218                                context: ErrorContext::DefaultImpl,
219                                kind: e.into(),
220                            })?.ok_or_else(|| ReadError {
221                                context: ErrorContext::DefaultImpl,
222                                kind: ReadErrorKind::EndOfStream,
223                            })?;
224                            if let tungstenite021::Message::Binary(data) = packet {
225                                buf.extend_from_slice(&data);
226                            } else {
227                                return Err(ReadError {
228                                    context: ErrorContext::DefaultImpl,
229                                    kind: ReadErrorKind::MessageKind021(packet),
230                                })
231                            }
232                        }
233                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
234                            context: ErrorContext::WebSocket {
235                                source: Box::new(context),
236                            },
237                            kind,
238                        })
239                    }
240                    _ => Err(ReadError {
241                        context: ErrorContext::DefaultImpl,
242                        kind: ReadErrorKind::WebSocketTextMessage024(data),
243                    }),
244                },
245                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
246                    context: ErrorContext::WebSocket {
247                        source: Box::new(context),
248                    },
249                    kind,
250                }),
251                _ => Err(ReadError {
252                    context: ErrorContext::DefaultImpl,
253                    kind: ReadErrorKind::MessageKind021(packet),
254                }),
255            }
256        })
257    }
258
259    #[cfg(feature = "tokio-tungstenite024")]
260    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
261    /// Reads a value of this type from a `tokio-tungstenite` websocket.
262    ///
263    /// # Cancellation safety
264    ///
265    /// The default implementation of this method is cancellation safe.
266    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
267        Box::pin(async move {
268            let packet = stream.try_next().await.map_err(|e| ReadError {
269                context: ErrorContext::DefaultImpl,
270                kind: e.into(),
271            })?.ok_or_else(|| ReadError {
272                context: ErrorContext::DefaultImpl,
273                kind: ReadErrorKind::EndOfStream,
274            })?;
275            match packet {
276                tungstenite024::Message::Text(data) => match data.chars().next() {
277                    Some('m') => {
278                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
279                            context: ErrorContext::DefaultImpl,
280                            kind: e.into(),
281                        })?;
282                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
283                            context: ErrorContext::DefaultImpl,
284                            kind: e.into(),
285                        })?;
286                        while buf.len() < len {
287                            let packet = stream.try_next().await.map_err(|e| ReadError {
288                                context: ErrorContext::DefaultImpl,
289                                kind: e.into(),
290                            })?.ok_or_else(|| ReadError {
291                                context: ErrorContext::DefaultImpl,
292                                kind: ReadErrorKind::EndOfStream,
293                            })?;
294                            if let tungstenite024::Message::Binary(data) = packet {
295                                buf.extend_from_slice(&data);
296                            } else {
297                                return Err(ReadError {
298                                    context: ErrorContext::DefaultImpl,
299                                    kind: ReadErrorKind::MessageKind024(packet),
300                                })
301                            }
302                        }
303                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
304                            context: ErrorContext::WebSocket {
305                                source: Box::new(context),
306                            },
307                            kind,
308                        })
309                    }
310                    _ => Err(ReadError {
311                        context: ErrorContext::DefaultImpl,
312                        kind: ReadErrorKind::WebSocketTextMessage024(data),
313                    }),
314                },
315                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
316                    context: ErrorContext::WebSocket {
317                        source: Box::new(context),
318                    },
319                    kind,
320                }),
321                _ => Err(ReadError {
322                    context: ErrorContext::DefaultImpl,
323                    kind: ReadErrorKind::MessageKind024(packet),
324                }),
325            }
326        })
327    }
328
329    #[cfg(feature = "tokio-tungstenite026")]
330    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
331    /// Reads a value of this type from a `tokio-tungstenite` websocket.
332    ///
333    /// # Cancellation safety
334    ///
335    /// The default implementation of this method is cancellation safe.
336    fn read_ws026<'a, R: Stream<Item = Result<tungstenite026::Message, tungstenite026::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
337        Box::pin(async move {
338            let packet = stream.try_next().await.map_err(|e| ReadError {
339                context: ErrorContext::DefaultImpl,
340                kind: e.into(),
341            })?.ok_or_else(|| ReadError {
342                context: ErrorContext::DefaultImpl,
343                kind: ReadErrorKind::EndOfStream,
344            })?;
345            match packet {
346                tungstenite026::Message::Text(data) => match data.chars().next() {
347                    Some('m') => {
348                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
349                            context: ErrorContext::DefaultImpl,
350                            kind: e.into(),
351                        })?;
352                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
353                            context: ErrorContext::DefaultImpl,
354                            kind: e.into(),
355                        })?;
356                        while buf.len() < len {
357                            let packet = stream.try_next().await.map_err(|e| ReadError {
358                                context: ErrorContext::DefaultImpl,
359                                kind: e.into(),
360                            })?.ok_or_else(|| ReadError {
361                                context: ErrorContext::DefaultImpl,
362                                kind: ReadErrorKind::EndOfStream,
363                            })?;
364                            if let tungstenite026::Message::Binary(data) = packet {
365                                buf.extend_from_slice(&data);
366                            } else {
367                                return Err(ReadError {
368                                    context: ErrorContext::DefaultImpl,
369                                    kind: ReadErrorKind::MessageKind026(packet),
370                                })
371                            }
372                        }
373                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
374                            context: ErrorContext::WebSocket {
375                                source: Box::new(context),
376                            },
377                            kind,
378                        })
379                    }
380                    _ => Err(ReadError {
381                        context: ErrorContext::DefaultImpl,
382                        kind: ReadErrorKind::WebSocketTextMessage026(data),
383                    }),
384                },
385                tungstenite026::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
386                    context: ErrorContext::WebSocket {
387                        source: Box::new(context),
388                    },
389                    kind,
390                }),
391                _ => Err(ReadError {
392                    context: ErrorContext::DefaultImpl,
393                    kind: ReadErrorKind::MessageKind026(packet),
394                }),
395            }
396        })
397    }
398
399    #[cfg(feature = "tokio-tungstenite021")]
400    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
401    /// Writes a value of this type to a `tokio-tungstenite` websocket.
402    ///
403    /// # Cancellation safety
404    ///
405    /// The default implementation of this method is not cancellation safe.
406    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
407    where Self: Sync {
408        Box::pin(async move {
409            let mut buf = Vec::default();
410            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
411                context: ErrorContext::WebSocket {
412                    source: Box::new(context),
413                },
414                kind,
415            })?;
416            if buf.len() <= WS_MAX_MESSAGE_SIZE {
417                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
418                    context: ErrorContext::DefaultImpl,
419                    kind: e.into(),
420                })?;
421            } else {
422                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
423                    context: ErrorContext::DefaultImpl,
424                    kind: e.into(),
425                })?;
426                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
427                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
428                        context: ErrorContext::DefaultImpl,
429                        kind: e.into(),
430                    })?;
431                }
432            }
433            Ok(())
434        })
435    }
436
437    #[cfg(feature = "tokio-tungstenite024")]
438    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
439    /// Writes a value of this type to a `tokio-tungstenite` websocket.
440    ///
441    /// # Cancellation safety
442    ///
443    /// The default implementation of this method is not cancellation safe.
444    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
445    where Self: Sync {
446        Box::pin(async move {
447            let mut buf = Vec::default();
448            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
449                context: ErrorContext::WebSocket {
450                    source: Box::new(context),
451                },
452                kind,
453            })?;
454            if buf.len() <= WS_MAX_MESSAGE_SIZE {
455                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
456                    context: ErrorContext::DefaultImpl,
457                    kind: e.into(),
458                })?;
459            } else {
460                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
461                    context: ErrorContext::DefaultImpl,
462                    kind: e.into(),
463                })?;
464                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
465                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
466                        context: ErrorContext::DefaultImpl,
467                        kind: e.into(),
468                    })?;
469                }
470            }
471            Ok(())
472        })
473    }
474
475    #[cfg(feature = "tokio-tungstenite026")]
476    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
477    /// Writes a value of this type to a `tokio-tungstenite` websocket.
478    ///
479    /// # Cancellation safety
480    ///
481    /// The default implementation of this method is not cancellation safe.
482    fn write_ws026<'a, W: Sink<tungstenite026::Message, Error = tungstenite026::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
483    where Self: Sync {
484        Box::pin(async move {
485            let mut buf = Vec::default();
486            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
487                context: ErrorContext::WebSocket {
488                    source: Box::new(context),
489                },
490                kind,
491            })?;
492            if buf.len() <= WS_MAX_MESSAGE_SIZE {
493                sink.send(tungstenite026::Message::binary(buf)).await.map_err(|e| WriteError {
494                    context: ErrorContext::DefaultImpl,
495                    kind: e.into(),
496                })?;
497            } else {
498                sink.send(tungstenite026::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
499                    context: ErrorContext::DefaultImpl,
500                    kind: e.into(),
501                })?;
502                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
503                    sink.send(tungstenite026::Message::binary(tungstenite026::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
504                        context: ErrorContext::DefaultImpl,
505                        kind: e.into(),
506                    })?;
507                }
508            }
509            Ok(())
510        })
511    }
512
513    #[cfg(feature = "tokio-tungstenite021")]
514    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
515    /// Reads a value of this type from a [`tungstenite021`] websocket.
516    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
517        let packet = websocket.read().map_err(|e| ReadError {
518            context: ErrorContext::DefaultImpl,
519            kind: e.into(),
520        })?;
521        match packet {
522            tungstenite021::Message::Text(data) => match data.chars().next() {
523                Some('m') => {
524                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
525                        context: ErrorContext::DefaultImpl,
526                        kind: e.into(),
527                    })?;
528                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
529                        context: ErrorContext::DefaultImpl,
530                        kind: e.into(),
531                    })?;
532                    while buf.len() < len {
533                        let packet = websocket.read().map_err(|e| ReadError {
534                            context: ErrorContext::DefaultImpl,
535                            kind: e.into(),
536                        })?;
537                        if let tungstenite021::Message::Binary(data) = packet {
538                            buf.extend_from_slice(&data);
539                        } else {
540                            return Err(ReadError {
541                                context: ErrorContext::DefaultImpl,
542                                kind: ReadErrorKind::MessageKind021(packet),
543                            })
544                        }
545                    }
546                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
547                        context: ErrorContext::WebSocket {
548                            source: Box::new(context),
549                        },
550                        kind,
551                    })
552                }
553                _ => return Err(ReadError {
554                    context: ErrorContext::DefaultImpl,
555                    kind: ReadErrorKind::WebSocketTextMessage024(data),
556                }),
557            },
558            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
559                context: ErrorContext::WebSocket {
560                    source: Box::new(context),
561                },
562                kind,
563            }),
564            _ => Err(ReadError {
565                context: ErrorContext::DefaultImpl,
566                kind: ReadErrorKind::MessageKind021(packet),
567            }),
568        }
569    }
570
571    #[cfg(feature = "tokio-tungstenite024")]
572    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
573    /// Reads a value of this type from a [`tungstenite024`] websocket.
574    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
575        let packet = websocket.read().map_err(|e| ReadError {
576            context: ErrorContext::DefaultImpl,
577            kind: e.into(),
578        })?;
579        match packet {
580            tungstenite024::Message::Text(data) => match data.chars().next() {
581                Some('m') => {
582                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
583                        context: ErrorContext::DefaultImpl,
584                        kind: e.into(),
585                    })?;
586                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
587                        context: ErrorContext::DefaultImpl,
588                        kind: e.into(),
589                    })?;
590                    while buf.len() < len {
591                        let packet = websocket.read().map_err(|e| ReadError {
592                            context: ErrorContext::DefaultImpl,
593                            kind: e.into(),
594                        })?;
595                        if let tungstenite024::Message::Binary(data) = packet {
596                            buf.extend_from_slice(&data);
597                        } else {
598                            return Err(ReadError {
599                                context: ErrorContext::DefaultImpl,
600                                kind: ReadErrorKind::MessageKind024(packet),
601                            })
602                        }
603                    }
604                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
605                        context: ErrorContext::WebSocket {
606                            source: Box::new(context),
607                        },
608                        kind,
609                    })
610                }
611                _ => return Err(ReadError {
612                    context: ErrorContext::DefaultImpl,
613                    kind: ReadErrorKind::WebSocketTextMessage024(data),
614                }),
615            },
616            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
617                context: ErrorContext::WebSocket {
618                    source: Box::new(context),
619                },
620                kind,
621            }),
622            _ => Err(ReadError {
623                context: ErrorContext::DefaultImpl,
624                kind: ReadErrorKind::MessageKind024(packet),
625            }),
626        }
627    }
628
629    #[cfg(feature = "tokio-tungstenite026")]
630    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
631    /// Reads a value of this type from a [`tungstenite026`] websocket.
632    fn read_ws_sync026(websocket: &mut tungstenite026::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
633        let packet = websocket.read().map_err(|e| ReadError {
634            context: ErrorContext::DefaultImpl,
635            kind: e.into(),
636        })?;
637        match packet {
638            tungstenite026::Message::Text(data) => match data.chars().next() {
639                Some('m') => {
640                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
641                        context: ErrorContext::DefaultImpl,
642                        kind: e.into(),
643                    })?;
644                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
645                        context: ErrorContext::DefaultImpl,
646                        kind: e.into(),
647                    })?;
648                    while buf.len() < len {
649                        let packet = websocket.read().map_err(|e| ReadError {
650                            context: ErrorContext::DefaultImpl,
651                            kind: e.into(),
652                        })?;
653                        if let tungstenite026::Message::Binary(data) = packet {
654                            buf.extend_from_slice(&data);
655                        } else {
656                            return Err(ReadError {
657                                context: ErrorContext::DefaultImpl,
658                                kind: ReadErrorKind::MessageKind026(packet),
659                            })
660                        }
661                    }
662                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
663                        context: ErrorContext::WebSocket {
664                            source: Box::new(context),
665                        },
666                        kind,
667                    })
668                }
669                _ => return Err(ReadError {
670                    context: ErrorContext::DefaultImpl,
671                    kind: ReadErrorKind::WebSocketTextMessage026(data),
672                }),
673            },
674            tungstenite026::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
675                context: ErrorContext::WebSocket {
676                    source: Box::new(context),
677                },
678                kind,
679            }),
680            _ => Err(ReadError {
681                context: ErrorContext::DefaultImpl,
682                kind: ReadErrorKind::MessageKind026(packet),
683            }),
684        }
685    }
686
687    #[cfg(feature = "tokio-tungstenite021")]
688    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
689    /// Writes a value of this type to a [`tungstenite021`] websocket.
690    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
691        let mut buf = Vec::default();
692        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
693            context: ErrorContext::WebSocket {
694                source: Box::new(context),
695            },
696            kind,
697        })?;
698        if buf.len() <= WS_MAX_MESSAGE_SIZE {
699            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
700                context: ErrorContext::DefaultImpl,
701                kind: e.into(),
702            })?;
703        } else {
704            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
705                context: ErrorContext::DefaultImpl,
706                kind: e.into(),
707            })?;
708            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
709                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
710                    context: ErrorContext::DefaultImpl,
711                    kind: e.into(),
712                })?;
713            }
714        }
715        websocket.flush().map_err(|e| WriteError {
716            context: ErrorContext::DefaultImpl,
717            kind: e.into(),
718        })?;
719        Ok(())
720    }
721
722    #[cfg(feature = "tokio-tungstenite024")]
723    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
724    /// Writes a value of this type to a [`tungstenite024`] websocket.
725    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
726        let mut buf = Vec::default();
727        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
728            context: ErrorContext::WebSocket {
729                source: Box::new(context),
730            },
731            kind,
732        })?;
733        if buf.len() <= WS_MAX_MESSAGE_SIZE {
734            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
735                context: ErrorContext::DefaultImpl,
736                kind: e.into(),
737            })?;
738        } else {
739            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
740                context: ErrorContext::DefaultImpl,
741                kind: e.into(),
742            })?;
743            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
744                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
745                    context: ErrorContext::DefaultImpl,
746                    kind: e.into(),
747                })?;
748            }
749        }
750        websocket.flush().map_err(|e| WriteError {
751            context: ErrorContext::DefaultImpl,
752            kind: e.into(),
753        })?;
754        Ok(())
755    }
756
757    #[cfg(feature = "tokio-tungstenite026")]
758    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
759    /// Writes a value of this type to a [`tungstenite026`] websocket.
760    fn write_ws_sync026(&self, websocket: &mut tungstenite026::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
761        let mut buf = Vec::default();
762        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
763            context: ErrorContext::WebSocket {
764                source: Box::new(context),
765            },
766            kind,
767        })?;
768        if buf.len() <= WS_MAX_MESSAGE_SIZE {
769            websocket.send(tungstenite026::Message::binary(buf)).map_err(|e| WriteError {
770                context: ErrorContext::DefaultImpl,
771                kind: e.into(),
772            })?;
773        } else {
774            websocket.send(tungstenite026::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
775                context: ErrorContext::DefaultImpl,
776                kind: e.into(),
777            })?;
778            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
779                websocket.send(tungstenite026::Message::binary(tungstenite026::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
780                    context: ErrorContext::DefaultImpl,
781                    kind: e.into(),
782                })?;
783            }
784        }
785        websocket.flush().map_err(|e| WriteError {
786            context: ErrorContext::DefaultImpl,
787            kind: e.into(),
788        })?;
789        Ok(())
790    }
791
792    #[cfg(feature = "tokio-tungstenite021")]
793    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
794    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
795    ///
796    /// This can be used to get around drop glue issues that might arise with `read_ws`.
797    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
798        Box::pin(async move {
799            let value = Self::read_ws021(&mut stream).await?;
800            Ok((stream, value))
801        })
802    }
803
804    #[cfg(feature = "tokio-tungstenite024")]
805    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
806    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
807    ///
808    /// This can be used to get around drop glue issues that might arise with `read_ws`.
809    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
810        Box::pin(async move {
811            let value = Self::read_ws024(&mut stream).await?;
812            Ok((stream, value))
813        })
814    }
815
816    #[cfg(feature = "tokio-tungstenite026")]
817    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
818    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
819    ///
820    /// This can be used to get around drop glue issues that might arise with `read_ws`.
821    fn read_ws_owned026<R: Stream<Item = Result<tungstenite026::Message, tungstenite026::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
822        Box::pin(async move {
823            let value = Self::read_ws026(&mut stream).await?;
824            Ok((stream, value))
825        })
826    }
827}
828
829/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
830///
831/// Useful for WebSocket connections where the message type per direction is always the same.
832#[cfg(feature = "tokio-tungstenite021")]
833#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
834pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
835    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
836    let (sink, stream) = sock.split();
837    Ok((
838        sink.sink_map_err(|e| WriteError {
839            context: ErrorContext::WebSocketSink,
840            kind: e.into(),
841        }).with_flat_map::<W, _, _>(|msg| {
842            let mut buf = Vec::default();
843            match msg.write_sync(&mut buf) {
844                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
845                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
846                } else {
847                    Either::Right(stream::iter(
848                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
849                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
850                        .collect::<Vec<_>>()
851                    ))
852                }.map(Ok)),
853                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
854                    context: ErrorContext::WebSocket {
855                        source: Box::new(context),
856                    },
857                    kind,
858                }))),
859            }
860        }),
861        stream.scan(None, |state, res| {
862            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
863                let packet = res.map_err(|e| ReadError {
864                    context: ErrorContext::WebSocketStream,
865                    kind: e.into(),
866                })?;
867                Ok(if let Some((len, buf)) = state {
868                    if let tungstenite021::Message::Binary(data) = packet {
869                        buf.extend_from_slice(&data);
870                    } else {
871                        return Err(ReadError {
872                            context: ErrorContext::DefaultImpl,
873                            kind: ReadErrorKind::MessageKind021(packet),
874                        })
875                    }
876                    if buf.len() >= *len {
877                        let buf = mem::take(buf);
878                        *state = None;
879                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
880                            context: ErrorContext::WebSocket {
881                                source: Box::new(context),
882                            },
883                            kind,
884                        })?)))
885                    } else {
886                        Either::Left(stream::empty())
887                    }
888                } else {
889                    match packet {
890                        tungstenite021::Message::Text(data) => match data.chars().next() {
891                            Some('m') => {
892                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
893                                    context: ErrorContext::DefaultImpl,
894                                    kind: e.into(),
895                                })?;
896                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
897                                    context: ErrorContext::DefaultImpl,
898                                    kind: e.into(),
899                                })?;
900                                *state = Some((len, buf));
901                                Either::Left(stream::empty())
902                            }
903                            _ => return Err(ReadError {
904                                context: ErrorContext::DefaultImpl,
905                                kind: ReadErrorKind::WebSocketTextMessage024(data),
906                            }),
907                        },
908                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
909                            context: ErrorContext::WebSocket {
910                                source: Box::new(context),
911                            },
912                            kind,
913                        })?))),
914                        _ => return Err(ReadError {
915                            context: ErrorContext::DefaultImpl,
916                            kind: ReadErrorKind::MessageKind021(packet),
917                        }),
918                    }
919                })
920            }
921
922            future::ready(Some(scanner(state, res)))
923        }).try_flatten(),
924    ))
925}
926
927/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
928///
929/// Useful for WebSocket connections where the message type per direction is always the same.
930#[cfg(feature = "tokio-tungstenite024")]
931#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
932pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
933    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
934    let (sink, stream) = sock.split();
935    Ok((
936        sink.sink_map_err(|e| WriteError {
937            context: ErrorContext::WebSocketSink,
938            kind: e.into(),
939        }).with_flat_map::<W, _, _>(|msg| {
940            let mut buf = Vec::default();
941            match msg.write_sync(&mut buf) {
942                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
943                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
944                } else {
945                    Either::Right(stream::iter(
946                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
947                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
948                        .collect::<Vec<_>>()
949                    ))
950                }.map(Ok)),
951                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
952                    context: ErrorContext::WebSocket {
953                        source: Box::new(context),
954                    },
955                    kind,
956                }))),
957            }
958        }),
959        stream.scan(None, |state, res| {
960            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
961                let packet = res.map_err(|e| ReadError {
962                    context: ErrorContext::WebSocketStream,
963                    kind: e.into(),
964                })?;
965                Ok(if let Some((len, buf)) = state {
966                    if let tungstenite024::Message::Binary(data) = packet {
967                        buf.extend_from_slice(&data);
968                    } else {
969                        return Err(ReadError {
970                            context: ErrorContext::DefaultImpl,
971                            kind: ReadErrorKind::MessageKind024(packet),
972                        })
973                    }
974                    if buf.len() >= *len {
975                        let buf = mem::take(buf);
976                        *state = None;
977                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
978                            context: ErrorContext::WebSocket {
979                                source: Box::new(context),
980                            },
981                            kind,
982                        })?)))
983                    } else {
984                        Either::Left(stream::empty())
985                    }
986                } else {
987                    match packet {
988                        tungstenite024::Message::Text(data) => match data.chars().next() {
989                            Some('m') => {
990                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
991                                    context: ErrorContext::DefaultImpl,
992                                    kind: e.into(),
993                                })?;
994                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
995                                    context: ErrorContext::DefaultImpl,
996                                    kind: e.into(),
997                                })?;
998                                *state = Some((len, buf));
999                                Either::Left(stream::empty())
1000                            }
1001                            _ => return Err(ReadError {
1002                                context: ErrorContext::DefaultImpl,
1003                                kind: ReadErrorKind::WebSocketTextMessage024(data),
1004                            }),
1005                        },
1006                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1007                            context: ErrorContext::WebSocket {
1008                                source: Box::new(context),
1009                            },
1010                            kind,
1011                        })?))),
1012                        _ => return Err(ReadError {
1013                            context: ErrorContext::DefaultImpl,
1014                            kind: ReadErrorKind::MessageKind024(packet),
1015                        }),
1016                    }
1017                })
1018            }
1019
1020            future::ready(Some(scanner(state, res)))
1021        }).try_flatten(),
1022    ))
1023}
1024
1025/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
1026///
1027/// Useful for WebSocket connections where the message type per direction is always the same.
1028#[cfg(feature = "tokio-tungstenite026")]
1029#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
1030pub async fn websocket026<R: Protocol, W: Protocol>(request: impl tungstenite026::client::IntoClientRequest + Unpin) -> tungstenite026::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1031    let (sock, _) = tokio_tungstenite026::connect_async(request).await?;
1032    let (sink, stream) = sock.split();
1033    Ok((
1034        sink.sink_map_err(|e| WriteError {
1035            context: ErrorContext::WebSocketSink,
1036            kind: e.into(),
1037        }).with_flat_map::<W, _, _>(|msg| {
1038            let mut buf = Vec::default();
1039            match msg.write_sync(&mut buf) {
1040                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1041                    Either::Left(stream::once(future::ready(tungstenite026::Message::binary(buf))))
1042                } else {
1043                    Either::Right(stream::iter(
1044                        iter::once(tungstenite026::Message::text(format!("m{}", buf.len())))
1045                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite026::Message::binary(tungstenite026::Bytes::copy_from_slice(chunk))))
1046                        .collect::<Vec<_>>()
1047                    ))
1048                }.map(Ok)),
1049                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1050                    context: ErrorContext::WebSocket {
1051                        source: Box::new(context),
1052                    },
1053                    kind,
1054                }))),
1055            }
1056        }),
1057        stream.scan(None, |state, res| {
1058            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite026::Result<tungstenite026::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1059                let packet = res.map_err(|e| ReadError {
1060                    context: ErrorContext::WebSocketStream,
1061                    kind: e.into(),
1062                })?;
1063                Ok(if let Some((len, buf)) = state {
1064                    if let tungstenite026::Message::Binary(data) = packet {
1065                        buf.extend_from_slice(&data);
1066                    } else {
1067                        return Err(ReadError {
1068                            context: ErrorContext::DefaultImpl,
1069                            kind: ReadErrorKind::MessageKind026(packet),
1070                        })
1071                    }
1072                    if buf.len() >= *len {
1073                        let buf = mem::take(buf);
1074                        *state = None;
1075                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1076                            context: ErrorContext::WebSocket {
1077                                source: Box::new(context),
1078                            },
1079                            kind,
1080                        })?)))
1081                    } else {
1082                        Either::Left(stream::empty())
1083                    }
1084                } else {
1085                    match packet {
1086                        tungstenite026::Message::Text(data) => match data.chars().next() {
1087                            Some('m') => {
1088                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1089                                    context: ErrorContext::DefaultImpl,
1090                                    kind: e.into(),
1091                                })?;
1092                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1093                                    context: ErrorContext::DefaultImpl,
1094                                    kind: e.into(),
1095                                })?;
1096                                *state = Some((len, buf));
1097                                Either::Left(stream::empty())
1098                            }
1099                            _ => return Err(ReadError {
1100                                context: ErrorContext::DefaultImpl,
1101                                kind: ReadErrorKind::WebSocketTextMessage026(data),
1102                            }),
1103                        },
1104                        tungstenite026::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1105                            context: ErrorContext::WebSocket {
1106                                source: Box::new(context),
1107                            },
1108                            kind,
1109                        })?))),
1110                        _ => return Err(ReadError {
1111                            context: ErrorContext::DefaultImpl,
1112                            kind: ReadErrorKind::MessageKind026(packet),
1113                        }),
1114                    }
1115                })
1116            }
1117
1118            future::ready(Some(scanner(state, res)))
1119        }).try_flatten(),
1120    ))
1121}