async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bitvec`](https://docs.rs/bitvec): [`BitVec<u8, Lsb0>`](https://docs.rs/bitvec/latest/bitvec/vec/struct.BitVec.html)
17//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
18//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
19//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
20//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
21//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
22//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
23//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
24//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
25//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
26//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
27//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
28//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
29//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
30//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
31//!
32//! Additionally, this crate offers optional dependencies on the `tokio-tungstenite` crate to add convenience methods for reading/writing [`Protocol`] types from/to its websockets. The following versions are supported:
33//!
34//! * The latest release (currently [`tokio-tungstenite` 0.27](https://docs.rs/tokio-tungstenite/0.27), feature flag `tokio-tungstenite027`)
35//! * The version used by [the `master` branch of `rocket_ws` on GitHub](https://github.com/rwf2/Rocket/tree/master/contrib/ws) (currently [`tokio-tungstenite` 0.24](https://docs.rs/tokio-tungstenite/0.24), feature flag `tokio-tungstenite024`)
36//! * The version used by [the latest `rocket_ws` crates.io release](https://docs.rs/rocket_ws) (currently [`tokio-tungstenite` 0.21](https://docs.rs/tokio-tungstenite/0.21), feature flag `tokio-tungstenite021`)
37
38use {
39    std::{
40        future::Future,
41        io::{
42            self,
43            prelude::*,
44        },
45        pin::Pin,
46    },
47    tokio::io::{
48        AsyncRead,
49        AsyncWrite,
50    },
51};
52#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
53    std::{
54        iter,
55        mem,
56    },
57    fallible_collections::FallibleVec,
58    futures::{
59        Sink,
60        SinkExt as _,
61        future::{
62            self,
63            Either,
64        },
65        stream::{
66            self,
67            Stream,
68            StreamExt as _,
69            TryStreamExt as _,
70        },
71    },
72};
73#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
74#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
75#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
76pub use {
77    async_proto_derive::{
78        Protocol,
79        bitflags,
80    },
81    crate::error::*,
82};
83#[doc(hidden)] pub use tokio; // used in proc macro
84
85mod error;
86mod impls;
87
88/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
89#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
90
91/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
92pub trait Protocol: Sized {
93    /// Reads a value of this type from an async stream.
94    ///
95    /// # Cancellation safety
96    ///
97    /// Implementations of this method are generally not cancellation safe.
98    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
99    /// Writes a value of this type to an async sink.
100    ///
101    /// # Cancellation safety
102    ///
103    /// Implementations of this method are generally not cancellation safe.
104    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
105    /// Reads a value of this type from a sync stream.
106    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
107    /// Writes a value of this type to a sync sink.
108    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
109
110    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
111    ///
112    /// This can be used to get around drop glue issues that might arise with `read`.
113    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
114        Box::pin(async move {
115            let value = Self::read(&mut stream).await?;
116            Ok((stream, value))
117        })
118    }
119
120    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
121    ///
122    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
123    ///
124    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
125    ///
126    /// # Example
127    ///
128    /// ```
129    /// use {
130    ///     std::{
131    ///         io,
132    ///         net::TcpStream,
133    ///     },
134    ///     async_proto::Protocol,
135    /// };
136    ///
137    /// struct Client {
138    ///     tcp_stream: TcpStream,
139    ///     buf: Vec<u8>,
140    /// }
141    ///
142    /// impl Client {
143    ///     fn new(tcp_stream: TcpStream) -> Self {
144    ///         Self {
145    ///             tcp_stream,
146    ///             buf: Vec::default(),
147    ///         }
148    ///     }
149    ///
150    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
151    ///         self.tcp_stream.set_nonblocking(true)?;
152    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
153    ///     }
154    ///
155    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
156    ///         self.tcp_stream.set_nonblocking(false)?;
157    ///         msg.write_sync(&mut self.tcp_stream)?;
158    ///         Ok(())
159    ///     }
160    /// }
161    /// ```
162    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
163        let mut temp_buf = vec![0; 8];
164        loop {
165            let mut slice = &mut &**buf;
166            match Self::read_sync(&mut slice) {
167                Ok(value) => {
168                    let value_len = slice.len();
169                    buf.drain(..buf.len() - value_len);
170                    return Ok(Some(value))
171                }
172                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
173                Err(e) => return Err(e),
174            }
175            match stream.read(&mut temp_buf) {
176                Ok(0) => return Err(ReadError {
177                    context: ErrorContext::DefaultImpl,
178                    kind: ReadErrorKind::EndOfStream,
179                }),
180                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
181                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
182                Err(e) => return Err(ReadError {
183                    context: ErrorContext::DefaultImpl,
184                    kind: e.into(),
185                }),
186            }
187        }
188    }
189
190    #[cfg(feature = "tokio-tungstenite021")]
191    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
192    /// Reads a value of this type from a `tokio-tungstenite` websocket.
193    ///
194    /// # Cancellation safety
195    ///
196    /// The default implementation of this method is not cancellation safe.
197    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
198        Box::pin(async move {
199            let packet = stream.try_next().await.map_err(|e| ReadError {
200                context: ErrorContext::DefaultImpl,
201                kind: e.into(),
202            })?.ok_or_else(|| ReadError {
203                context: ErrorContext::DefaultImpl,
204                kind: ReadErrorKind::EndOfStream,
205            })?;
206            match packet {
207                tungstenite021::Message::Text(data) => match data.chars().next() {
208                    Some('m') => {
209                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
210                            context: ErrorContext::DefaultImpl,
211                            kind: e.into(),
212                        })?;
213                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
214                            context: ErrorContext::DefaultImpl,
215                            kind: e.into(),
216                        })?;
217                        while buf.len() < len {
218                            let packet = stream.try_next().await.map_err(|e| ReadError {
219                                context: ErrorContext::DefaultImpl,
220                                kind: e.into(),
221                            })?.ok_or_else(|| ReadError {
222                                context: ErrorContext::DefaultImpl,
223                                kind: ReadErrorKind::EndOfStream,
224                            })?;
225                            if let tungstenite021::Message::Binary(data) = packet {
226                                buf.extend_from_slice(&data);
227                            } else {
228                                return Err(ReadError {
229                                    context: ErrorContext::DefaultImpl,
230                                    kind: ReadErrorKind::MessageKind021(packet),
231                                })
232                            }
233                        }
234                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
235                            context: ErrorContext::WebSocket {
236                                source: Box::new(context),
237                            },
238                            kind,
239                        })
240                    }
241                    _ => Err(ReadError {
242                        context: ErrorContext::DefaultImpl,
243                        kind: ReadErrorKind::WebSocketTextMessage024(data),
244                    }),
245                },
246                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
247                    context: ErrorContext::WebSocket {
248                        source: Box::new(context),
249                    },
250                    kind,
251                }),
252                _ => Err(ReadError {
253                    context: ErrorContext::DefaultImpl,
254                    kind: ReadErrorKind::MessageKind021(packet),
255                }),
256            }
257        })
258    }
259
260    #[cfg(feature = "tokio-tungstenite024")]
261    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
262    /// Reads a value of this type from a `tokio-tungstenite` websocket.
263    ///
264    /// # Cancellation safety
265    ///
266    /// The default implementation of this method is not cancellation safe.
267    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
268        Box::pin(async move {
269            let packet = stream.try_next().await.map_err(|e| ReadError {
270                context: ErrorContext::DefaultImpl,
271                kind: e.into(),
272            })?.ok_or_else(|| ReadError {
273                context: ErrorContext::DefaultImpl,
274                kind: ReadErrorKind::EndOfStream,
275            })?;
276            match packet {
277                tungstenite024::Message::Text(data) => match data.chars().next() {
278                    Some('m') => {
279                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
280                            context: ErrorContext::DefaultImpl,
281                            kind: e.into(),
282                        })?;
283                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
284                            context: ErrorContext::DefaultImpl,
285                            kind: e.into(),
286                        })?;
287                        while buf.len() < len {
288                            let packet = stream.try_next().await.map_err(|e| ReadError {
289                                context: ErrorContext::DefaultImpl,
290                                kind: e.into(),
291                            })?.ok_or_else(|| ReadError {
292                                context: ErrorContext::DefaultImpl,
293                                kind: ReadErrorKind::EndOfStream,
294                            })?;
295                            if let tungstenite024::Message::Binary(data) = packet {
296                                buf.extend_from_slice(&data);
297                            } else {
298                                return Err(ReadError {
299                                    context: ErrorContext::DefaultImpl,
300                                    kind: ReadErrorKind::MessageKind024(packet),
301                                })
302                            }
303                        }
304                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
305                            context: ErrorContext::WebSocket {
306                                source: Box::new(context),
307                            },
308                            kind,
309                        })
310                    }
311                    _ => Err(ReadError {
312                        context: ErrorContext::DefaultImpl,
313                        kind: ReadErrorKind::WebSocketTextMessage024(data),
314                    }),
315                },
316                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
317                    context: ErrorContext::WebSocket {
318                        source: Box::new(context),
319                    },
320                    kind,
321                }),
322                _ => Err(ReadError {
323                    context: ErrorContext::DefaultImpl,
324                    kind: ReadErrorKind::MessageKind024(packet),
325                }),
326            }
327        })
328    }
329
330    #[cfg(feature = "tokio-tungstenite027")]
331    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
332    /// Reads a value of this type from a `tokio-tungstenite` websocket.
333    ///
334    /// # Cancellation safety
335    ///
336    /// The default implementation of this method is not cancellation safe.
337    fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
338        Box::pin(async move {
339            let packet = stream.try_next().await.map_err(|e| ReadError {
340                context: ErrorContext::DefaultImpl,
341                kind: e.into(),
342            })?.ok_or_else(|| ReadError {
343                context: ErrorContext::DefaultImpl,
344                kind: ReadErrorKind::EndOfStream,
345            })?;
346            match packet {
347                tungstenite027::Message::Text(data) => match data.chars().next() {
348                    Some('m') => {
349                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
350                            context: ErrorContext::DefaultImpl,
351                            kind: e.into(),
352                        })?;
353                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
354                            context: ErrorContext::DefaultImpl,
355                            kind: e.into(),
356                        })?;
357                        while buf.len() < len {
358                            let packet = stream.try_next().await.map_err(|e| ReadError {
359                                context: ErrorContext::DefaultImpl,
360                                kind: e.into(),
361                            })?.ok_or_else(|| ReadError {
362                                context: ErrorContext::DefaultImpl,
363                                kind: ReadErrorKind::EndOfStream,
364                            })?;
365                            if let tungstenite027::Message::Binary(data) = packet {
366                                buf.extend_from_slice(&data);
367                            } else {
368                                return Err(ReadError {
369                                    context: ErrorContext::DefaultImpl,
370                                    kind: ReadErrorKind::MessageKind027(packet),
371                                })
372                            }
373                        }
374                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
375                            context: ErrorContext::WebSocket {
376                                source: Box::new(context),
377                            },
378                            kind,
379                        })
380                    }
381                    _ => Err(ReadError {
382                        context: ErrorContext::DefaultImpl,
383                        kind: ReadErrorKind::WebSocketTextMessage027(data),
384                    }),
385                },
386                tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
387                    context: ErrorContext::WebSocket {
388                        source: Box::new(context),
389                    },
390                    kind,
391                }),
392                _ => Err(ReadError {
393                    context: ErrorContext::DefaultImpl,
394                    kind: ReadErrorKind::MessageKind027(packet),
395                }),
396            }
397        })
398    }
399
400    #[cfg(feature = "tokio-tungstenite021")]
401    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
402    /// Writes a value of this type to a `tokio-tungstenite` websocket.
403    ///
404    /// # Cancellation safety
405    ///
406    /// The default implementation of this method is not cancellation safe.
407    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
408    where Self: Sync {
409        Box::pin(async move {
410            let mut buf = Vec::default();
411            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
412                context: ErrorContext::WebSocket {
413                    source: Box::new(context),
414                },
415                kind,
416            })?;
417            if buf.len() <= WS_MAX_MESSAGE_SIZE {
418                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
419                    context: ErrorContext::DefaultImpl,
420                    kind: e.into(),
421                })?;
422            } else {
423                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
424                    context: ErrorContext::DefaultImpl,
425                    kind: e.into(),
426                })?;
427                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
428                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
429                        context: ErrorContext::DefaultImpl,
430                        kind: e.into(),
431                    })?;
432                }
433            }
434            Ok(())
435        })
436    }
437
438    #[cfg(feature = "tokio-tungstenite024")]
439    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
440    /// Writes a value of this type to a `tokio-tungstenite` websocket.
441    ///
442    /// # Cancellation safety
443    ///
444    /// The default implementation of this method is not cancellation safe.
445    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
446    where Self: Sync {
447        Box::pin(async move {
448            let mut buf = Vec::default();
449            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
450                context: ErrorContext::WebSocket {
451                    source: Box::new(context),
452                },
453                kind,
454            })?;
455            if buf.len() <= WS_MAX_MESSAGE_SIZE {
456                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
457                    context: ErrorContext::DefaultImpl,
458                    kind: e.into(),
459                })?;
460            } else {
461                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
462                    context: ErrorContext::DefaultImpl,
463                    kind: e.into(),
464                })?;
465                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
466                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
467                        context: ErrorContext::DefaultImpl,
468                        kind: e.into(),
469                    })?;
470                }
471            }
472            Ok(())
473        })
474    }
475
476    #[cfg(feature = "tokio-tungstenite027")]
477    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
478    /// Writes a value of this type to a `tokio-tungstenite` websocket.
479    ///
480    /// # Cancellation safety
481    ///
482    /// The default implementation of this method is not cancellation safe.
483    fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
484    where Self: Sync {
485        Box::pin(async move {
486            let mut buf = Vec::default();
487            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
488                context: ErrorContext::WebSocket {
489                    source: Box::new(context),
490                },
491                kind,
492            })?;
493            if buf.len() <= WS_MAX_MESSAGE_SIZE {
494                sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
495                    context: ErrorContext::DefaultImpl,
496                    kind: e.into(),
497                })?;
498            } else {
499                sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
500                    context: ErrorContext::DefaultImpl,
501                    kind: e.into(),
502                })?;
503                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
504                    sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
505                        context: ErrorContext::DefaultImpl,
506                        kind: e.into(),
507                    })?;
508                }
509            }
510            Ok(())
511        })
512    }
513
514    #[cfg(feature = "tokio-tungstenite021")]
515    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
516    /// Reads a value of this type from a [`tungstenite021`] websocket.
517    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
518        let packet = websocket.read().map_err(|e| ReadError {
519            context: ErrorContext::DefaultImpl,
520            kind: e.into(),
521        })?;
522        match packet {
523            tungstenite021::Message::Text(data) => match data.chars().next() {
524                Some('m') => {
525                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
526                        context: ErrorContext::DefaultImpl,
527                        kind: e.into(),
528                    })?;
529                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
530                        context: ErrorContext::DefaultImpl,
531                        kind: e.into(),
532                    })?;
533                    while buf.len() < len {
534                        let packet = websocket.read().map_err(|e| ReadError {
535                            context: ErrorContext::DefaultImpl,
536                            kind: e.into(),
537                        })?;
538                        if let tungstenite021::Message::Binary(data) = packet {
539                            buf.extend_from_slice(&data);
540                        } else {
541                            return Err(ReadError {
542                                context: ErrorContext::DefaultImpl,
543                                kind: ReadErrorKind::MessageKind021(packet),
544                            })
545                        }
546                    }
547                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
548                        context: ErrorContext::WebSocket {
549                            source: Box::new(context),
550                        },
551                        kind,
552                    })
553                }
554                _ => return Err(ReadError {
555                    context: ErrorContext::DefaultImpl,
556                    kind: ReadErrorKind::WebSocketTextMessage024(data),
557                }),
558            },
559            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
560                context: ErrorContext::WebSocket {
561                    source: Box::new(context),
562                },
563                kind,
564            }),
565            _ => Err(ReadError {
566                context: ErrorContext::DefaultImpl,
567                kind: ReadErrorKind::MessageKind021(packet),
568            }),
569        }
570    }
571
572    #[cfg(feature = "tokio-tungstenite024")]
573    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
574    /// Reads a value of this type from a [`tungstenite024`] websocket.
575    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
576        let packet = websocket.read().map_err(|e| ReadError {
577            context: ErrorContext::DefaultImpl,
578            kind: e.into(),
579        })?;
580        match packet {
581            tungstenite024::Message::Text(data) => match data.chars().next() {
582                Some('m') => {
583                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
584                        context: ErrorContext::DefaultImpl,
585                        kind: e.into(),
586                    })?;
587                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
588                        context: ErrorContext::DefaultImpl,
589                        kind: e.into(),
590                    })?;
591                    while buf.len() < len {
592                        let packet = websocket.read().map_err(|e| ReadError {
593                            context: ErrorContext::DefaultImpl,
594                            kind: e.into(),
595                        })?;
596                        if let tungstenite024::Message::Binary(data) = packet {
597                            buf.extend_from_slice(&data);
598                        } else {
599                            return Err(ReadError {
600                                context: ErrorContext::DefaultImpl,
601                                kind: ReadErrorKind::MessageKind024(packet),
602                            })
603                        }
604                    }
605                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
606                        context: ErrorContext::WebSocket {
607                            source: Box::new(context),
608                        },
609                        kind,
610                    })
611                }
612                _ => return Err(ReadError {
613                    context: ErrorContext::DefaultImpl,
614                    kind: ReadErrorKind::WebSocketTextMessage024(data),
615                }),
616            },
617            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
618                context: ErrorContext::WebSocket {
619                    source: Box::new(context),
620                },
621                kind,
622            }),
623            _ => Err(ReadError {
624                context: ErrorContext::DefaultImpl,
625                kind: ReadErrorKind::MessageKind024(packet),
626            }),
627        }
628    }
629
630    #[cfg(feature = "tokio-tungstenite027")]
631    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
632    /// Reads a value of this type from a [`tungstenite027`] websocket.
633    fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
634        let packet = websocket.read().map_err(|e| ReadError {
635            context: ErrorContext::DefaultImpl,
636            kind: e.into(),
637        })?;
638        match packet {
639            tungstenite027::Message::Text(data) => match data.chars().next() {
640                Some('m') => {
641                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
642                        context: ErrorContext::DefaultImpl,
643                        kind: e.into(),
644                    })?;
645                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
646                        context: ErrorContext::DefaultImpl,
647                        kind: e.into(),
648                    })?;
649                    while buf.len() < len {
650                        let packet = websocket.read().map_err(|e| ReadError {
651                            context: ErrorContext::DefaultImpl,
652                            kind: e.into(),
653                        })?;
654                        if let tungstenite027::Message::Binary(data) = packet {
655                            buf.extend_from_slice(&data);
656                        } else {
657                            return Err(ReadError {
658                                context: ErrorContext::DefaultImpl,
659                                kind: ReadErrorKind::MessageKind027(packet),
660                            })
661                        }
662                    }
663                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
664                        context: ErrorContext::WebSocket {
665                            source: Box::new(context),
666                        },
667                        kind,
668                    })
669                }
670                _ => return Err(ReadError {
671                    context: ErrorContext::DefaultImpl,
672                    kind: ReadErrorKind::WebSocketTextMessage027(data),
673                }),
674            },
675            tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
676                context: ErrorContext::WebSocket {
677                    source: Box::new(context),
678                },
679                kind,
680            }),
681            _ => Err(ReadError {
682                context: ErrorContext::DefaultImpl,
683                kind: ReadErrorKind::MessageKind027(packet),
684            }),
685        }
686    }
687
688    #[cfg(feature = "tokio-tungstenite021")]
689    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
690    /// Writes a value of this type to a [`tungstenite021`] websocket.
691    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
692        let mut buf = Vec::default();
693        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
694            context: ErrorContext::WebSocket {
695                source: Box::new(context),
696            },
697            kind,
698        })?;
699        if buf.len() <= WS_MAX_MESSAGE_SIZE {
700            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
701                context: ErrorContext::DefaultImpl,
702                kind: e.into(),
703            })?;
704        } else {
705            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
706                context: ErrorContext::DefaultImpl,
707                kind: e.into(),
708            })?;
709            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
710                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
711                    context: ErrorContext::DefaultImpl,
712                    kind: e.into(),
713                })?;
714            }
715        }
716        websocket.flush().map_err(|e| WriteError {
717            context: ErrorContext::DefaultImpl,
718            kind: e.into(),
719        })?;
720        Ok(())
721    }
722
723    #[cfg(feature = "tokio-tungstenite024")]
724    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
725    /// Writes a value of this type to a [`tungstenite024`] websocket.
726    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
727        let mut buf = Vec::default();
728        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
729            context: ErrorContext::WebSocket {
730                source: Box::new(context),
731            },
732            kind,
733        })?;
734        if buf.len() <= WS_MAX_MESSAGE_SIZE {
735            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
736                context: ErrorContext::DefaultImpl,
737                kind: e.into(),
738            })?;
739        } else {
740            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
741                context: ErrorContext::DefaultImpl,
742                kind: e.into(),
743            })?;
744            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
745                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
746                    context: ErrorContext::DefaultImpl,
747                    kind: e.into(),
748                })?;
749            }
750        }
751        websocket.flush().map_err(|e| WriteError {
752            context: ErrorContext::DefaultImpl,
753            kind: e.into(),
754        })?;
755        Ok(())
756    }
757
758    #[cfg(feature = "tokio-tungstenite027")]
759    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
760    /// Writes a value of this type to a [`tungstenite027`] websocket.
761    fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
762        let mut buf = Vec::default();
763        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
764            context: ErrorContext::WebSocket {
765                source: Box::new(context),
766            },
767            kind,
768        })?;
769        if buf.len() <= WS_MAX_MESSAGE_SIZE {
770            websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
771                context: ErrorContext::DefaultImpl,
772                kind: e.into(),
773            })?;
774        } else {
775            websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
776                context: ErrorContext::DefaultImpl,
777                kind: e.into(),
778            })?;
779            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
780                websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
781                    context: ErrorContext::DefaultImpl,
782                    kind: e.into(),
783                })?;
784            }
785        }
786        websocket.flush().map_err(|e| WriteError {
787            context: ErrorContext::DefaultImpl,
788            kind: e.into(),
789        })?;
790        Ok(())
791    }
792
793    #[cfg(feature = "tokio-tungstenite021")]
794    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
795    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
796    ///
797    /// This can be used to get around drop glue issues that might arise with `read_ws`.
798    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
799        Box::pin(async move {
800            let value = Self::read_ws021(&mut stream).await?;
801            Ok((stream, value))
802        })
803    }
804
805    #[cfg(feature = "tokio-tungstenite024")]
806    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
807    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
808    ///
809    /// This can be used to get around drop glue issues that might arise with `read_ws`.
810    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
811        Box::pin(async move {
812            let value = Self::read_ws024(&mut stream).await?;
813            Ok((stream, value))
814        })
815    }
816
817    #[cfg(feature = "tokio-tungstenite027")]
818    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
819    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
820    ///
821    /// This can be used to get around drop glue issues that might arise with `read_ws`.
822    fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
823        Box::pin(async move {
824            let value = Self::read_ws027(&mut stream).await?;
825            Ok((stream, value))
826        })
827    }
828}
829
830/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
831///
832/// Useful for WebSocket connections where the message type per direction is always the same.
833#[cfg(feature = "tokio-tungstenite021")]
834#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
835pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
836    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
837    let (sink, stream) = sock.split();
838    Ok((
839        sink.sink_map_err(|e| WriteError {
840            context: ErrorContext::WebSocketSink,
841            kind: e.into(),
842        }).with_flat_map::<W, _, _>(|msg| {
843            let mut buf = Vec::default();
844            match msg.write_sync(&mut buf) {
845                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
846                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
847                } else {
848                    Either::Right(stream::iter(
849                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
850                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
851                        .collect::<Vec<_>>()
852                    ))
853                }.map(Ok)),
854                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
855                    context: ErrorContext::WebSocket {
856                        source: Box::new(context),
857                    },
858                    kind,
859                }))),
860            }
861        }),
862        stream.scan(None, |state, res| {
863            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
864                let packet = res.map_err(|e| ReadError {
865                    context: ErrorContext::WebSocketStream,
866                    kind: e.into(),
867                })?;
868                Ok(if let Some((len, buf)) = state {
869                    if let tungstenite021::Message::Binary(data) = packet {
870                        buf.extend_from_slice(&data);
871                    } else {
872                        return Err(ReadError {
873                            context: ErrorContext::DefaultImpl,
874                            kind: ReadErrorKind::MessageKind021(packet),
875                        })
876                    }
877                    if buf.len() >= *len {
878                        let buf = mem::take(buf);
879                        *state = None;
880                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
881                            context: ErrorContext::WebSocket {
882                                source: Box::new(context),
883                            },
884                            kind,
885                        })?)))
886                    } else {
887                        Either::Left(stream::empty())
888                    }
889                } else {
890                    match packet {
891                        tungstenite021::Message::Text(data) => match data.chars().next() {
892                            Some('m') => {
893                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
894                                    context: ErrorContext::DefaultImpl,
895                                    kind: e.into(),
896                                })?;
897                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
898                                    context: ErrorContext::DefaultImpl,
899                                    kind: e.into(),
900                                })?;
901                                *state = Some((len, buf));
902                                Either::Left(stream::empty())
903                            }
904                            _ => return Err(ReadError {
905                                context: ErrorContext::DefaultImpl,
906                                kind: ReadErrorKind::WebSocketTextMessage024(data),
907                            }),
908                        },
909                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
910                            context: ErrorContext::WebSocket {
911                                source: Box::new(context),
912                            },
913                            kind,
914                        })?))),
915                        _ => return Err(ReadError {
916                            context: ErrorContext::DefaultImpl,
917                            kind: ReadErrorKind::MessageKind021(packet),
918                        }),
919                    }
920                })
921            }
922
923            future::ready(Some(scanner(state, res)))
924        }).try_flatten(),
925    ))
926}
927
928/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
929///
930/// Useful for WebSocket connections where the message type per direction is always the same.
931#[cfg(feature = "tokio-tungstenite024")]
932#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
933pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
934    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
935    let (sink, stream) = sock.split();
936    Ok((
937        sink.sink_map_err(|e| WriteError {
938            context: ErrorContext::WebSocketSink,
939            kind: e.into(),
940        }).with_flat_map::<W, _, _>(|msg| {
941            let mut buf = Vec::default();
942            match msg.write_sync(&mut buf) {
943                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
944                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
945                } else {
946                    Either::Right(stream::iter(
947                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
948                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
949                        .collect::<Vec<_>>()
950                    ))
951                }.map(Ok)),
952                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
953                    context: ErrorContext::WebSocket {
954                        source: Box::new(context),
955                    },
956                    kind,
957                }))),
958            }
959        }),
960        stream.scan(None, |state, res| {
961            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
962                let packet = res.map_err(|e| ReadError {
963                    context: ErrorContext::WebSocketStream,
964                    kind: e.into(),
965                })?;
966                Ok(if let Some((len, buf)) = state {
967                    if let tungstenite024::Message::Binary(data) = packet {
968                        buf.extend_from_slice(&data);
969                    } else {
970                        return Err(ReadError {
971                            context: ErrorContext::DefaultImpl,
972                            kind: ReadErrorKind::MessageKind024(packet),
973                        })
974                    }
975                    if buf.len() >= *len {
976                        let buf = mem::take(buf);
977                        *state = None;
978                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
979                            context: ErrorContext::WebSocket {
980                                source: Box::new(context),
981                            },
982                            kind,
983                        })?)))
984                    } else {
985                        Either::Left(stream::empty())
986                    }
987                } else {
988                    match packet {
989                        tungstenite024::Message::Text(data) => match data.chars().next() {
990                            Some('m') => {
991                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
992                                    context: ErrorContext::DefaultImpl,
993                                    kind: e.into(),
994                                })?;
995                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
996                                    context: ErrorContext::DefaultImpl,
997                                    kind: e.into(),
998                                })?;
999                                *state = Some((len, buf));
1000                                Either::Left(stream::empty())
1001                            }
1002                            _ => return Err(ReadError {
1003                                context: ErrorContext::DefaultImpl,
1004                                kind: ReadErrorKind::WebSocketTextMessage024(data),
1005                            }),
1006                        },
1007                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1008                            context: ErrorContext::WebSocket {
1009                                source: Box::new(context),
1010                            },
1011                            kind,
1012                        })?))),
1013                        _ => return Err(ReadError {
1014                            context: ErrorContext::DefaultImpl,
1015                            kind: ReadErrorKind::MessageKind024(packet),
1016                        }),
1017                    }
1018                })
1019            }
1020
1021            future::ready(Some(scanner(state, res)))
1022        }).try_flatten(),
1023    ))
1024}
1025
1026/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
1027///
1028/// Useful for WebSocket connections where the message type per direction is always the same.
1029#[cfg(feature = "tokio-tungstenite027")]
1030#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
1031pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1032    let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
1033    let (sink, stream) = sock.split();
1034    Ok((
1035        sink.sink_map_err(|e| WriteError {
1036            context: ErrorContext::WebSocketSink,
1037            kind: e.into(),
1038        }).with_flat_map::<W, _, _>(|msg| {
1039            let mut buf = Vec::default();
1040            match msg.write_sync(&mut buf) {
1041                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1042                    Either::Left(stream::once(future::ready(tungstenite027::Message::binary(buf))))
1043                } else {
1044                    Either::Right(stream::iter(
1045                        iter::once(tungstenite027::Message::text(format!("m{}", buf.len())))
1046                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))))
1047                        .collect::<Vec<_>>()
1048                    ))
1049                }.map(Ok)),
1050                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1051                    context: ErrorContext::WebSocket {
1052                        source: Box::new(context),
1053                    },
1054                    kind,
1055                }))),
1056            }
1057        }),
1058        stream.scan(None, |state, res| {
1059            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1060                let packet = res.map_err(|e| ReadError {
1061                    context: ErrorContext::WebSocketStream,
1062                    kind: e.into(),
1063                })?;
1064                Ok(if let Some((len, buf)) = state {
1065                    if let tungstenite027::Message::Binary(data) = packet {
1066                        buf.extend_from_slice(&data);
1067                    } else {
1068                        return Err(ReadError {
1069                            context: ErrorContext::DefaultImpl,
1070                            kind: ReadErrorKind::MessageKind027(packet),
1071                        })
1072                    }
1073                    if buf.len() >= *len {
1074                        let buf = mem::take(buf);
1075                        *state = None;
1076                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1077                            context: ErrorContext::WebSocket {
1078                                source: Box::new(context),
1079                            },
1080                            kind,
1081                        })?)))
1082                    } else {
1083                        Either::Left(stream::empty())
1084                    }
1085                } else {
1086                    match packet {
1087                        tungstenite027::Message::Text(data) => match data.chars().next() {
1088                            Some('m') => {
1089                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1090                                    context: ErrorContext::DefaultImpl,
1091                                    kind: e.into(),
1092                                })?;
1093                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1094                                    context: ErrorContext::DefaultImpl,
1095                                    kind: e.into(),
1096                                })?;
1097                                *state = Some((len, buf));
1098                                Either::Left(stream::empty())
1099                            }
1100                            _ => return Err(ReadError {
1101                                context: ErrorContext::DefaultImpl,
1102                                kind: ReadErrorKind::WebSocketTextMessage027(data),
1103                            }),
1104                        },
1105                        tungstenite027::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1106                            context: ErrorContext::WebSocket {
1107                                source: Box::new(context),
1108                            },
1109                            kind,
1110                        })?))),
1111                        _ => return Err(ReadError {
1112                            context: ErrorContext::DefaultImpl,
1113                            kind: ReadErrorKind::MessageKind027(packet),
1114                        }),
1115                    }
1116                })
1117            }
1118
1119            future::ready(Some(scanner(state, res)))
1120        }).try_flatten(),
1121    ))
1122}