async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bitvec`](https://docs.rs/bitvec): [`BitVec<u8, Lsb0>`](https://docs.rs/bitvec/latest/bitvec/vec/struct.BitVec.html)
17//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
18//! * [`bytesize`](https://docs.rs/bytesize): [`ByteSize`](https://docs.rs/bytesize/latest/bytesize/struct.ByteSize.html)
19//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
20//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
21//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
22//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
23//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
24//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
25//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
26//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
27//! * [`nonempty-collections`](https://docs.rs/nonempty-collections): [`NEMap`](https://docs.rs/nonempty-collections/latest/nonempty_collections/map/struct.NEMap.html), [`NESet`](https://docs.rs/nonempty-collections/latest/nonempty_collections/set/struct.NESet.html), and [`NEVec`](https://docs.rs/nonempty-collections/latest/nonempty_collections/vector/struct.NEVec.html)
28//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
29//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
30//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
31//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
32//! * [`hematite-nbt`](https://docs.rs/hematite-nbt): [`Blob`](https://docs.rs/hematite-nbt/latest/nbt/struct.Blob.html)
33//! * [`url`](https://docs.rs/url): [`Url`](https://docs.rs/url/latest/url/struct.Url.html)
34//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
35//!
36//! Additionally, this crate offers optional dependencies on the `tokio-tungstenite` crate to add convenience methods for reading/writing [`Protocol`] types from/to its websockets. The following versions are supported:
37//!
38//! * The latest release (currently [`tokio-tungstenite` 0.28](https://docs.rs/tokio-tungstenite/0.28), feature flag `tokio-tungstenite028`)
39//! * The version used by [the `master` branch of `rocket_ws` on GitHub](https://github.com/rwf2/Rocket/tree/master/contrib/ws) (currently [`tokio-tungstenite` 0.24](https://docs.rs/tokio-tungstenite/0.24), feature flag `tokio-tungstenite024`)
40//! * The version used by [the latest `rocket_ws` crates.io release](https://docs.rs/rocket_ws) (currently [`tokio-tungstenite` 0.21](https://docs.rs/tokio-tungstenite/0.21), feature flag `tokio-tungstenite021`)
41
42use {
43    std::{
44        future::Future,
45        io::{
46            self,
47            prelude::*,
48        },
49        pin::Pin,
50    },
51    tokio::io::{
52        AsyncRead,
53        AsyncWrite,
54    },
55};
56#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))] use {
57    std::{
58        iter,
59        mem,
60    },
61    fallible_collections::FallibleVec,
62    futures::{
63        Sink,
64        SinkExt as _,
65        future::{
66            self,
67            Either,
68        },
69        stream::{
70            self,
71            Stream,
72            StreamExt as _,
73            TryStreamExt as _,
74        },
75    },
76};
77#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
78#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
79#[cfg(feature = "tokio-tungstenite028")] use tokio_tungstenite028::tungstenite as tungstenite028;
80pub use {
81    async_proto_derive::{
82        Protocol,
83        bitflags,
84    },
85    crate::error::*,
86};
87#[doc(hidden)] pub use tokio; // used in proc macro
88
89mod error;
90mod impls;
91
92/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
93#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
94
95/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
96pub trait Protocol: Sized {
97    /// Reads a value of this type from an async stream.
98    ///
99    /// # Cancellation safety
100    ///
101    /// Implementations of this method are generally not cancellation safe.
102    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
103    /// Writes a value of this type to an async sink.
104    ///
105    /// # Cancellation safety
106    ///
107    /// Implementations of this method are generally not cancellation safe.
108    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
109    /// Reads a value of this type from a sync stream.
110    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
111    /// Writes a value of this type to a sync sink.
112    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
113
114    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
115    ///
116    /// This can be used to get around drop glue issues that might arise with `read`.
117    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
118        Box::pin(async move {
119            let value = Self::read(&mut stream).await?;
120            Ok((stream, value))
121        })
122    }
123
124    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
125    ///
126    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
127    ///
128    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
129    ///
130    /// # Example
131    ///
132    /// ```
133    /// use {
134    ///     std::{
135    ///         io,
136    ///         net::TcpStream,
137    ///     },
138    ///     async_proto::Protocol,
139    /// };
140    ///
141    /// struct Client {
142    ///     tcp_stream: TcpStream,
143    ///     buf: Vec<u8>,
144    /// }
145    ///
146    /// impl Client {
147    ///     fn new(tcp_stream: TcpStream) -> Self {
148    ///         Self {
149    ///             tcp_stream,
150    ///             buf: Vec::default(),
151    ///         }
152    ///     }
153    ///
154    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
155    ///         self.tcp_stream.set_nonblocking(true)?;
156    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
157    ///     }
158    ///
159    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
160    ///         self.tcp_stream.set_nonblocking(false)?;
161    ///         msg.write_sync(&mut self.tcp_stream)?;
162    ///         Ok(())
163    ///     }
164    /// }
165    /// ```
166    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
167        let mut temp_buf = vec![0; 8];
168        loop {
169            let mut slice = &mut &**buf;
170            match Self::read_sync(&mut slice) {
171                Ok(value) => {
172                    let value_len = slice.len();
173                    buf.drain(..buf.len() - value_len);
174                    return Ok(Some(value))
175                }
176                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
177                Err(e) => return Err(e),
178            }
179            match stream.read(&mut temp_buf) {
180                Ok(0) => return Err(ReadError {
181                    context: ErrorContext::DefaultImpl,
182                    kind: ReadErrorKind::EndOfStream,
183                }),
184                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
185                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
186                Err(e) => return Err(ReadError {
187                    context: ErrorContext::DefaultImpl,
188                    kind: e.into(),
189                }),
190            }
191        }
192    }
193
194    #[cfg(feature = "tokio-tungstenite021")]
195    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
196    /// Reads a value of this type from a `tokio-tungstenite` websocket.
197    ///
198    /// # Cancellation safety
199    ///
200    /// The default implementation of this method is not cancellation safe.
201    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
202        Box::pin(async move {
203            let packet = stream.try_next().await.map_err(|e| ReadError {
204                context: ErrorContext::DefaultImpl,
205                kind: e.into(),
206            })?.ok_or_else(|| ReadError {
207                context: ErrorContext::DefaultImpl,
208                kind: ReadErrorKind::EndOfStream,
209            })?;
210            match packet {
211                tungstenite021::Message::Text(data) => match data.chars().next() {
212                    Some('m') => {
213                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
214                            context: ErrorContext::DefaultImpl,
215                            kind: e.into(),
216                        })?;
217                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
218                            context: ErrorContext::DefaultImpl,
219                            kind: e.into(),
220                        })?;
221                        while buf.len() < len {
222                            let packet = stream.try_next().await.map_err(|e| ReadError {
223                                context: ErrorContext::DefaultImpl,
224                                kind: e.into(),
225                            })?.ok_or_else(|| ReadError {
226                                context: ErrorContext::DefaultImpl,
227                                kind: ReadErrorKind::EndOfStream,
228                            })?;
229                            if let tungstenite021::Message::Binary(data) = packet {
230                                buf.extend_from_slice(&data);
231                            } else {
232                                return Err(ReadError {
233                                    context: ErrorContext::DefaultImpl,
234                                    kind: ReadErrorKind::MessageKind021(packet),
235                                })
236                            }
237                        }
238                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
239                            context: ErrorContext::WebSocket {
240                                source: Box::new(context),
241                            },
242                            kind,
243                        })
244                    }
245                    _ => Err(ReadError {
246                        context: ErrorContext::DefaultImpl,
247                        kind: ReadErrorKind::WebSocketTextMessage024(data),
248                    }),
249                },
250                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
251                    context: ErrorContext::WebSocket {
252                        source: Box::new(context),
253                    },
254                    kind,
255                }),
256                _ => Err(ReadError {
257                    context: ErrorContext::DefaultImpl,
258                    kind: ReadErrorKind::MessageKind021(packet),
259                }),
260            }
261        })
262    }
263
264    #[cfg(feature = "tokio-tungstenite024")]
265    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
266    /// Reads a value of this type from a `tokio-tungstenite` websocket.
267    ///
268    /// # Cancellation safety
269    ///
270    /// The default implementation of this method is not cancellation safe.
271    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
272        Box::pin(async move {
273            let packet = stream.try_next().await.map_err(|e| ReadError {
274                context: ErrorContext::DefaultImpl,
275                kind: e.into(),
276            })?.ok_or_else(|| ReadError {
277                context: ErrorContext::DefaultImpl,
278                kind: ReadErrorKind::EndOfStream,
279            })?;
280            match packet {
281                tungstenite024::Message::Text(data) => match data.chars().next() {
282                    Some('m') => {
283                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
284                            context: ErrorContext::DefaultImpl,
285                            kind: e.into(),
286                        })?;
287                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
288                            context: ErrorContext::DefaultImpl,
289                            kind: e.into(),
290                        })?;
291                        while buf.len() < len {
292                            let packet = stream.try_next().await.map_err(|e| ReadError {
293                                context: ErrorContext::DefaultImpl,
294                                kind: e.into(),
295                            })?.ok_or_else(|| ReadError {
296                                context: ErrorContext::DefaultImpl,
297                                kind: ReadErrorKind::EndOfStream,
298                            })?;
299                            if let tungstenite024::Message::Binary(data) = packet {
300                                buf.extend_from_slice(&data);
301                            } else {
302                                return Err(ReadError {
303                                    context: ErrorContext::DefaultImpl,
304                                    kind: ReadErrorKind::MessageKind024(packet),
305                                })
306                            }
307                        }
308                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
309                            context: ErrorContext::WebSocket {
310                                source: Box::new(context),
311                            },
312                            kind,
313                        })
314                    }
315                    _ => Err(ReadError {
316                        context: ErrorContext::DefaultImpl,
317                        kind: ReadErrorKind::WebSocketTextMessage024(data),
318                    }),
319                },
320                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
321                    context: ErrorContext::WebSocket {
322                        source: Box::new(context),
323                    },
324                    kind,
325                }),
326                _ => Err(ReadError {
327                    context: ErrorContext::DefaultImpl,
328                    kind: ReadErrorKind::MessageKind024(packet),
329                }),
330            }
331        })
332    }
333
334    #[cfg(feature = "tokio-tungstenite028")]
335    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
336    /// Reads a value of this type from a `tokio-tungstenite` websocket.
337    ///
338    /// # Cancellation safety
339    ///
340    /// The default implementation of this method is not cancellation safe.
341    fn read_ws028<'a, R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
342        Box::pin(async move {
343            let packet = stream.try_next().await.map_err(|e| ReadError {
344                context: ErrorContext::DefaultImpl,
345                kind: e.into(),
346            })?.ok_or_else(|| ReadError {
347                context: ErrorContext::DefaultImpl,
348                kind: ReadErrorKind::EndOfStream,
349            })?;
350            match packet {
351                tungstenite028::Message::Text(data) => match data.chars().next() {
352                    Some('m') => {
353                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
354                            context: ErrorContext::DefaultImpl,
355                            kind: e.into(),
356                        })?;
357                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
358                            context: ErrorContext::DefaultImpl,
359                            kind: e.into(),
360                        })?;
361                        while buf.len() < len {
362                            let packet = stream.try_next().await.map_err(|e| ReadError {
363                                context: ErrorContext::DefaultImpl,
364                                kind: e.into(),
365                            })?.ok_or_else(|| ReadError {
366                                context: ErrorContext::DefaultImpl,
367                                kind: ReadErrorKind::EndOfStream,
368                            })?;
369                            if let tungstenite028::Message::Binary(data) = packet {
370                                buf.extend_from_slice(&data);
371                            } else {
372                                return Err(ReadError {
373                                    context: ErrorContext::DefaultImpl,
374                                    kind: ReadErrorKind::MessageKind028(packet),
375                                })
376                            }
377                        }
378                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
379                            context: ErrorContext::WebSocket {
380                                source: Box::new(context),
381                            },
382                            kind,
383                        })
384                    }
385                    _ => Err(ReadError {
386                        context: ErrorContext::DefaultImpl,
387                        kind: ReadErrorKind::WebSocketTextMessage028(data),
388                    }),
389                },
390                tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
391                    context: ErrorContext::WebSocket {
392                        source: Box::new(context),
393                    },
394                    kind,
395                }),
396                _ => Err(ReadError {
397                    context: ErrorContext::DefaultImpl,
398                    kind: ReadErrorKind::MessageKind028(packet),
399                }),
400            }
401        })
402    }
403
404    #[cfg(feature = "tokio-tungstenite021")]
405    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
406    /// Writes a value of this type to a `tokio-tungstenite` websocket.
407    ///
408    /// # Cancellation safety
409    ///
410    /// The default implementation of this method is not cancellation safe.
411    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
412    where Self: Sync {
413        Box::pin(async move {
414            let mut buf = Vec::default();
415            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
416                context: ErrorContext::WebSocket {
417                    source: Box::new(context),
418                },
419                kind,
420            })?;
421            if buf.len() <= WS_MAX_MESSAGE_SIZE {
422                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
423                    context: ErrorContext::DefaultImpl,
424                    kind: e.into(),
425                })?;
426            } else {
427                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
428                    context: ErrorContext::DefaultImpl,
429                    kind: e.into(),
430                })?;
431                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
432                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
433                        context: ErrorContext::DefaultImpl,
434                        kind: e.into(),
435                    })?;
436                }
437            }
438            Ok(())
439        })
440    }
441
442    #[cfg(feature = "tokio-tungstenite024")]
443    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
444    /// Writes a value of this type to a `tokio-tungstenite` websocket.
445    ///
446    /// # Cancellation safety
447    ///
448    /// The default implementation of this method is not cancellation safe.
449    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
450    where Self: Sync {
451        Box::pin(async move {
452            let mut buf = Vec::default();
453            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
454                context: ErrorContext::WebSocket {
455                    source: Box::new(context),
456                },
457                kind,
458            })?;
459            if buf.len() <= WS_MAX_MESSAGE_SIZE {
460                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
461                    context: ErrorContext::DefaultImpl,
462                    kind: e.into(),
463                })?;
464            } else {
465                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
466                    context: ErrorContext::DefaultImpl,
467                    kind: e.into(),
468                })?;
469                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
470                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
471                        context: ErrorContext::DefaultImpl,
472                        kind: e.into(),
473                    })?;
474                }
475            }
476            Ok(())
477        })
478    }
479
480    #[cfg(feature = "tokio-tungstenite028")]
481    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
482    /// Writes a value of this type to a `tokio-tungstenite` websocket.
483    ///
484    /// # Cancellation safety
485    ///
486    /// The default implementation of this method is not cancellation safe.
487    fn write_ws028<'a, W: Sink<tungstenite028::Message, Error = tungstenite028::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
488    where Self: Sync {
489        Box::pin(async move {
490            let mut buf = Vec::default();
491            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
492                context: ErrorContext::WebSocket {
493                    source: Box::new(context),
494                },
495                kind,
496            })?;
497            if buf.len() <= WS_MAX_MESSAGE_SIZE {
498                sink.send(tungstenite028::Message::binary(buf)).await.map_err(|e| WriteError {
499                    context: ErrorContext::DefaultImpl,
500                    kind: e.into(),
501                })?;
502            } else {
503                sink.send(tungstenite028::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
504                    context: ErrorContext::DefaultImpl,
505                    kind: e.into(),
506                })?;
507                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
508                    sink.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
509                        context: ErrorContext::DefaultImpl,
510                        kind: e.into(),
511                    })?;
512                }
513            }
514            Ok(())
515        })
516    }
517
518    #[cfg(feature = "tokio-tungstenite021")]
519    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
520    /// Reads a value of this type from a [`tungstenite021`] websocket.
521    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
522        let packet = websocket.read().map_err(|e| ReadError {
523            context: ErrorContext::DefaultImpl,
524            kind: e.into(),
525        })?;
526        match packet {
527            tungstenite021::Message::Text(data) => match data.chars().next() {
528                Some('m') => {
529                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
530                        context: ErrorContext::DefaultImpl,
531                        kind: e.into(),
532                    })?;
533                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
534                        context: ErrorContext::DefaultImpl,
535                        kind: e.into(),
536                    })?;
537                    while buf.len() < len {
538                        let packet = websocket.read().map_err(|e| ReadError {
539                            context: ErrorContext::DefaultImpl,
540                            kind: e.into(),
541                        })?;
542                        if let tungstenite021::Message::Binary(data) = packet {
543                            buf.extend_from_slice(&data);
544                        } else {
545                            return Err(ReadError {
546                                context: ErrorContext::DefaultImpl,
547                                kind: ReadErrorKind::MessageKind021(packet),
548                            })
549                        }
550                    }
551                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
552                        context: ErrorContext::WebSocket {
553                            source: Box::new(context),
554                        },
555                        kind,
556                    })
557                }
558                _ => return Err(ReadError {
559                    context: ErrorContext::DefaultImpl,
560                    kind: ReadErrorKind::WebSocketTextMessage024(data),
561                }),
562            },
563            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
564                context: ErrorContext::WebSocket {
565                    source: Box::new(context),
566                },
567                kind,
568            }),
569            _ => Err(ReadError {
570                context: ErrorContext::DefaultImpl,
571                kind: ReadErrorKind::MessageKind021(packet),
572            }),
573        }
574    }
575
576    #[cfg(feature = "tokio-tungstenite024")]
577    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
578    /// Reads a value of this type from a [`tungstenite024`] websocket.
579    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
580        let packet = websocket.read().map_err(|e| ReadError {
581            context: ErrorContext::DefaultImpl,
582            kind: e.into(),
583        })?;
584        match packet {
585            tungstenite024::Message::Text(data) => match data.chars().next() {
586                Some('m') => {
587                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
588                        context: ErrorContext::DefaultImpl,
589                        kind: e.into(),
590                    })?;
591                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
592                        context: ErrorContext::DefaultImpl,
593                        kind: e.into(),
594                    })?;
595                    while buf.len() < len {
596                        let packet = websocket.read().map_err(|e| ReadError {
597                            context: ErrorContext::DefaultImpl,
598                            kind: e.into(),
599                        })?;
600                        if let tungstenite024::Message::Binary(data) = packet {
601                            buf.extend_from_slice(&data);
602                        } else {
603                            return Err(ReadError {
604                                context: ErrorContext::DefaultImpl,
605                                kind: ReadErrorKind::MessageKind024(packet),
606                            })
607                        }
608                    }
609                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
610                        context: ErrorContext::WebSocket {
611                            source: Box::new(context),
612                        },
613                        kind,
614                    })
615                }
616                _ => return Err(ReadError {
617                    context: ErrorContext::DefaultImpl,
618                    kind: ReadErrorKind::WebSocketTextMessage024(data),
619                }),
620            },
621            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
622                context: ErrorContext::WebSocket {
623                    source: Box::new(context),
624                },
625                kind,
626            }),
627            _ => Err(ReadError {
628                context: ErrorContext::DefaultImpl,
629                kind: ReadErrorKind::MessageKind024(packet),
630            }),
631        }
632    }
633
634    #[cfg(feature = "tokio-tungstenite028")]
635    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
636    /// Reads a value of this type from a [`tungstenite028`] websocket.
637    fn read_ws_sync028(websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
638        let packet = websocket.read().map_err(|e| ReadError {
639            context: ErrorContext::DefaultImpl,
640            kind: e.into(),
641        })?;
642        match packet {
643            tungstenite028::Message::Text(data) => match data.chars().next() {
644                Some('m') => {
645                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
646                        context: ErrorContext::DefaultImpl,
647                        kind: e.into(),
648                    })?;
649                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
650                        context: ErrorContext::DefaultImpl,
651                        kind: e.into(),
652                    })?;
653                    while buf.len() < len {
654                        let packet = websocket.read().map_err(|e| ReadError {
655                            context: ErrorContext::DefaultImpl,
656                            kind: e.into(),
657                        })?;
658                        if let tungstenite028::Message::Binary(data) = packet {
659                            buf.extend_from_slice(&data);
660                        } else {
661                            return Err(ReadError {
662                                context: ErrorContext::DefaultImpl,
663                                kind: ReadErrorKind::MessageKind028(packet),
664                            })
665                        }
666                    }
667                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
668                        context: ErrorContext::WebSocket {
669                            source: Box::new(context),
670                        },
671                        kind,
672                    })
673                }
674                _ => return Err(ReadError {
675                    context: ErrorContext::DefaultImpl,
676                    kind: ReadErrorKind::WebSocketTextMessage028(data),
677                }),
678            },
679            tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
680                context: ErrorContext::WebSocket {
681                    source: Box::new(context),
682                },
683                kind,
684            }),
685            _ => Err(ReadError {
686                context: ErrorContext::DefaultImpl,
687                kind: ReadErrorKind::MessageKind028(packet),
688            }),
689        }
690    }
691
692    #[cfg(feature = "tokio-tungstenite021")]
693    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
694    /// Writes a value of this type to a [`tungstenite021`] websocket.
695    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
696        let mut buf = Vec::default();
697        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
698            context: ErrorContext::WebSocket {
699                source: Box::new(context),
700            },
701            kind,
702        })?;
703        if buf.len() <= WS_MAX_MESSAGE_SIZE {
704            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
705                context: ErrorContext::DefaultImpl,
706                kind: e.into(),
707            })?;
708        } else {
709            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
710                context: ErrorContext::DefaultImpl,
711                kind: e.into(),
712            })?;
713            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
714                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
715                    context: ErrorContext::DefaultImpl,
716                    kind: e.into(),
717                })?;
718            }
719        }
720        websocket.flush().map_err(|e| WriteError {
721            context: ErrorContext::DefaultImpl,
722            kind: e.into(),
723        })?;
724        Ok(())
725    }
726
727    #[cfg(feature = "tokio-tungstenite024")]
728    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
729    /// Writes a value of this type to a [`tungstenite024`] websocket.
730    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
731        let mut buf = Vec::default();
732        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
733            context: ErrorContext::WebSocket {
734                source: Box::new(context),
735            },
736            kind,
737        })?;
738        if buf.len() <= WS_MAX_MESSAGE_SIZE {
739            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
740                context: ErrorContext::DefaultImpl,
741                kind: e.into(),
742            })?;
743        } else {
744            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
745                context: ErrorContext::DefaultImpl,
746                kind: e.into(),
747            })?;
748            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
749                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
750                    context: ErrorContext::DefaultImpl,
751                    kind: e.into(),
752                })?;
753            }
754        }
755        websocket.flush().map_err(|e| WriteError {
756            context: ErrorContext::DefaultImpl,
757            kind: e.into(),
758        })?;
759        Ok(())
760    }
761
762    #[cfg(feature = "tokio-tungstenite028")]
763    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
764    /// Writes a value of this type to a [`tungstenite028`] websocket.
765    fn write_ws_sync028(&self, websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
766        let mut buf = Vec::default();
767        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
768            context: ErrorContext::WebSocket {
769                source: Box::new(context),
770            },
771            kind,
772        })?;
773        if buf.len() <= WS_MAX_MESSAGE_SIZE {
774            websocket.send(tungstenite028::Message::binary(buf)).map_err(|e| WriteError {
775                context: ErrorContext::DefaultImpl,
776                kind: e.into(),
777            })?;
778        } else {
779            websocket.send(tungstenite028::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
780                context: ErrorContext::DefaultImpl,
781                kind: e.into(),
782            })?;
783            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
784                websocket.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
785                    context: ErrorContext::DefaultImpl,
786                    kind: e.into(),
787                })?;
788            }
789        }
790        websocket.flush().map_err(|e| WriteError {
791            context: ErrorContext::DefaultImpl,
792            kind: e.into(),
793        })?;
794        Ok(())
795    }
796
797    #[cfg(feature = "tokio-tungstenite021")]
798    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
799    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
800    ///
801    /// This can be used to get around drop glue issues that might arise with `read_ws`.
802    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
803        Box::pin(async move {
804            let value = Self::read_ws021(&mut stream).await?;
805            Ok((stream, value))
806        })
807    }
808
809    #[cfg(feature = "tokio-tungstenite024")]
810    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
811    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
812    ///
813    /// This can be used to get around drop glue issues that might arise with `read_ws`.
814    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
815        Box::pin(async move {
816            let value = Self::read_ws024(&mut stream).await?;
817            Ok((stream, value))
818        })
819    }
820
821    #[cfg(feature = "tokio-tungstenite028")]
822    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
823    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
824    ///
825    /// This can be used to get around drop glue issues that might arise with `read_ws`.
826    fn read_ws_owned028<R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
827        Box::pin(async move {
828            let value = Self::read_ws028(&mut stream).await?;
829            Ok((stream, value))
830        })
831    }
832}
833
834/// This trait allows restricting the acceptable length of collection types.
835///
836/// By default, types from this crate implementing this trait represent their length as a `u64`. If the maximum length is limited (e.g. using the `#[async_proto(max_len = ...)]` attribute when deriving [`Protocol`]), the length may be represented using a smaller integer type.
837pub trait LengthPrefixed: Protocol {
838    /// Reads a value of this type from an async stream, limiting the length to the given value.
839    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
840    /// Writes a value of this type to an async sink, limiting the length to the given value.
841    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
842    /// Reads a value of this type from a sync stream, limiting the length to the given value.
843    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
844    /// Writes a value of this type to a sync sink, limiting the length to the given value.
845    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
846}
847
848/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
849///
850/// Useful for WebSocket connections where the message type per direction is always the same.
851#[cfg(feature = "tokio-tungstenite021")]
852#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
853pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
854    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
855    let (sink, stream) = sock.split();
856    Ok((
857        sink.sink_map_err(|e| WriteError {
858            context: ErrorContext::WebSocketSink,
859            kind: e.into(),
860        }).with_flat_map::<W, _, _>(|msg| {
861            let mut buf = Vec::default();
862            match msg.write_sync(&mut buf) {
863                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
864                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
865                } else {
866                    Either::Right(stream::iter(
867                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
868                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
869                        .collect::<Vec<_>>()
870                    ))
871                }.map(Ok)),
872                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
873                    context: ErrorContext::WebSocket {
874                        source: Box::new(context),
875                    },
876                    kind,
877                }))),
878            }
879        }),
880        stream.scan(None, |state, res| {
881            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
882                let packet = res.map_err(|e| ReadError {
883                    context: ErrorContext::WebSocketStream,
884                    kind: e.into(),
885                })?;
886                Ok(if let Some((len, buf)) = state {
887                    if let tungstenite021::Message::Binary(data) = packet {
888                        buf.extend_from_slice(&data);
889                    } else {
890                        return Err(ReadError {
891                            context: ErrorContext::DefaultImpl,
892                            kind: ReadErrorKind::MessageKind021(packet),
893                        })
894                    }
895                    if buf.len() >= *len {
896                        let buf = mem::take(buf);
897                        *state = None;
898                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
899                            context: ErrorContext::WebSocket {
900                                source: Box::new(context),
901                            },
902                            kind,
903                        })?)))
904                    } else {
905                        Either::Left(stream::empty())
906                    }
907                } else {
908                    match packet {
909                        tungstenite021::Message::Text(data) => match data.chars().next() {
910                            Some('m') => {
911                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
912                                    context: ErrorContext::DefaultImpl,
913                                    kind: e.into(),
914                                })?;
915                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
916                                    context: ErrorContext::DefaultImpl,
917                                    kind: e.into(),
918                                })?;
919                                *state = Some((len, buf));
920                                Either::Left(stream::empty())
921                            }
922                            _ => return Err(ReadError {
923                                context: ErrorContext::DefaultImpl,
924                                kind: ReadErrorKind::WebSocketTextMessage024(data),
925                            }),
926                        },
927                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
928                            context: ErrorContext::WebSocket {
929                                source: Box::new(context),
930                            },
931                            kind,
932                        })?))),
933                        _ => return Err(ReadError {
934                            context: ErrorContext::DefaultImpl,
935                            kind: ReadErrorKind::MessageKind021(packet),
936                        }),
937                    }
938                })
939            }
940
941            future::ready(Some(scanner(state, res)))
942        }).try_flatten(),
943    ))
944}
945
946/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
947///
948/// Useful for WebSocket connections where the message type per direction is always the same.
949#[cfg(feature = "tokio-tungstenite024")]
950#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
951pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
952    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
953    let (sink, stream) = sock.split();
954    Ok((
955        sink.sink_map_err(|e| WriteError {
956            context: ErrorContext::WebSocketSink,
957            kind: e.into(),
958        }).with_flat_map::<W, _, _>(|msg| {
959            let mut buf = Vec::default();
960            match msg.write_sync(&mut buf) {
961                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
962                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
963                } else {
964                    Either::Right(stream::iter(
965                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
966                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
967                        .collect::<Vec<_>>()
968                    ))
969                }.map(Ok)),
970                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
971                    context: ErrorContext::WebSocket {
972                        source: Box::new(context),
973                    },
974                    kind,
975                }))),
976            }
977        }),
978        stream.scan(None, |state, res| {
979            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
980                let packet = res.map_err(|e| ReadError {
981                    context: ErrorContext::WebSocketStream,
982                    kind: e.into(),
983                })?;
984                Ok(if let Some((len, buf)) = state {
985                    if let tungstenite024::Message::Binary(data) = packet {
986                        buf.extend_from_slice(&data);
987                    } else {
988                        return Err(ReadError {
989                            context: ErrorContext::DefaultImpl,
990                            kind: ReadErrorKind::MessageKind024(packet),
991                        })
992                    }
993                    if buf.len() >= *len {
994                        let buf = mem::take(buf);
995                        *state = None;
996                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
997                            context: ErrorContext::WebSocket {
998                                source: Box::new(context),
999                            },
1000                            kind,
1001                        })?)))
1002                    } else {
1003                        Either::Left(stream::empty())
1004                    }
1005                } else {
1006                    match packet {
1007                        tungstenite024::Message::Text(data) => match data.chars().next() {
1008                            Some('m') => {
1009                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1010                                    context: ErrorContext::DefaultImpl,
1011                                    kind: e.into(),
1012                                })?;
1013                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1014                                    context: ErrorContext::DefaultImpl,
1015                                    kind: e.into(),
1016                                })?;
1017                                *state = Some((len, buf));
1018                                Either::Left(stream::empty())
1019                            }
1020                            _ => return Err(ReadError {
1021                                context: ErrorContext::DefaultImpl,
1022                                kind: ReadErrorKind::WebSocketTextMessage024(data),
1023                            }),
1024                        },
1025                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1026                            context: ErrorContext::WebSocket {
1027                                source: Box::new(context),
1028                            },
1029                            kind,
1030                        })?))),
1031                        _ => return Err(ReadError {
1032                            context: ErrorContext::DefaultImpl,
1033                            kind: ReadErrorKind::MessageKind024(packet),
1034                        }),
1035                    }
1036                })
1037            }
1038
1039            future::ready(Some(scanner(state, res)))
1040        }).try_flatten(),
1041    ))
1042}
1043
1044/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
1045///
1046/// Useful for WebSocket connections where the message type per direction is always the same.
1047#[cfg(feature = "tokio-tungstenite028")]
1048#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
1049pub async fn websocket028<R: Protocol, W: Protocol>(request: impl tungstenite028::client::IntoClientRequest + Unpin) -> tungstenite028::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1050    let (sock, _) = tokio_tungstenite028::connect_async(request).await?;
1051    let (sink, stream) = sock.split();
1052    Ok((
1053        sink.sink_map_err(|e| WriteError {
1054            context: ErrorContext::WebSocketSink,
1055            kind: e.into(),
1056        }).with_flat_map::<W, _, _>(|msg| {
1057            let mut buf = Vec::default();
1058            match msg.write_sync(&mut buf) {
1059                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1060                    Either::Left(stream::once(future::ready(tungstenite028::Message::binary(buf))))
1061                } else {
1062                    Either::Right(stream::iter(
1063                        iter::once(tungstenite028::Message::text(format!("m{}", buf.len())))
1064                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))))
1065                        .collect::<Vec<_>>()
1066                    ))
1067                }.map(Ok)),
1068                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1069                    context: ErrorContext::WebSocket {
1070                        source: Box::new(context),
1071                    },
1072                    kind,
1073                }))),
1074            }
1075        }),
1076        stream.scan(None, |state, res| {
1077            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite028::Result<tungstenite028::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1078                let packet = res.map_err(|e| ReadError {
1079                    context: ErrorContext::WebSocketStream,
1080                    kind: e.into(),
1081                })?;
1082                Ok(if let Some((len, buf)) = state {
1083                    if let tungstenite028::Message::Binary(data) = packet {
1084                        buf.extend_from_slice(&data);
1085                    } else {
1086                        return Err(ReadError {
1087                            context: ErrorContext::DefaultImpl,
1088                            kind: ReadErrorKind::MessageKind028(packet),
1089                        })
1090                    }
1091                    if buf.len() >= *len {
1092                        let buf = mem::take(buf);
1093                        *state = None;
1094                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1095                            context: ErrorContext::WebSocket {
1096                                source: Box::new(context),
1097                            },
1098                            kind,
1099                        })?)))
1100                    } else {
1101                        Either::Left(stream::empty())
1102                    }
1103                } else {
1104                    match packet {
1105                        tungstenite028::Message::Text(data) => match data.chars().next() {
1106                            Some('m') => {
1107                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1108                                    context: ErrorContext::DefaultImpl,
1109                                    kind: e.into(),
1110                                })?;
1111                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1112                                    context: ErrorContext::DefaultImpl,
1113                                    kind: e.into(),
1114                                })?;
1115                                *state = Some((len, buf));
1116                                Either::Left(stream::empty())
1117                            }
1118                            _ => return Err(ReadError {
1119                                context: ErrorContext::DefaultImpl,
1120                                kind: ReadErrorKind::WebSocketTextMessage028(data),
1121                            }),
1122                        },
1123                        tungstenite028::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1124                            context: ErrorContext::WebSocket {
1125                                source: Box::new(context),
1126                            },
1127                            kind,
1128                        })?))),
1129                        _ => return Err(ReadError {
1130                            context: ErrorContext::DefaultImpl,
1131                            kind: ReadErrorKind::MessageKind028(packet),
1132                        }),
1133                    }
1134                })
1135            }
1136
1137            future::ready(Some(scanner(state, res)))
1138        }).try_flatten(),
1139    ))
1140}