async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bitvec`](https://docs.rs/bitvec): [`BitVec<u8, Lsb0>`](https://docs.rs/bitvec/latest/bitvec/vec/struct.BitVec.html)
17//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
18//! * [`bytesize`](https://docs.rs/bytesize): [`ByteSize`](https://docs.rs/bytesize/latest/bytesize/struct.ByteSize.html)
19//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
20//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
21//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
22//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
23//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
24//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
25//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
26//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
27//! * [`nonempty-collections`](https://docs.rs/nonempty-collections): [`NEMap`](https://docs.rs/nonempty-collections/latest/nonempty_collections/map/struct.NEMap.html), [`NESet`](https://docs.rs/nonempty-collections/latest/nonempty_collections/set/struct.NESet.html), and [`NEVec`](https://docs.rs/nonempty-collections/latest/nonempty_collections/vector/struct.NEVec.html)
28//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
29//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
30//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
31//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
32//! * [`url`](https://docs.rs/url): [`Url`](https://docs.rs/url/latest/url/struct.Url.html)
33//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
34//!
35//! Additionally, this crate offers optional dependencies on the `tokio-tungstenite` crate to add convenience methods for reading/writing [`Protocol`] types from/to its websockets. The following versions are supported:
36//!
37//! * The latest release (currently [`tokio-tungstenite` 0.27](https://docs.rs/tokio-tungstenite/0.27), feature flag `tokio-tungstenite027`)
38//! * The version used by [the `master` branch of `rocket_ws` on GitHub](https://github.com/rwf2/Rocket/tree/master/contrib/ws) (currently [`tokio-tungstenite` 0.24](https://docs.rs/tokio-tungstenite/0.24), feature flag `tokio-tungstenite024`)
39//! * The version used by [the latest `rocket_ws` crates.io release](https://docs.rs/rocket_ws) (currently [`tokio-tungstenite` 0.21](https://docs.rs/tokio-tungstenite/0.21), feature flag `tokio-tungstenite021`)
40
41use {
42    std::{
43        future::Future,
44        io::{
45            self,
46            prelude::*,
47        },
48        pin::Pin,
49    },
50    tokio::io::{
51        AsyncRead,
52        AsyncWrite,
53    },
54};
55#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
56    std::{
57        iter,
58        mem,
59    },
60    fallible_collections::FallibleVec,
61    futures::{
62        Sink,
63        SinkExt as _,
64        future::{
65            self,
66            Either,
67        },
68        stream::{
69            self,
70            Stream,
71            StreamExt as _,
72            TryStreamExt as _,
73        },
74    },
75};
76#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
77#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
78#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
79pub use {
80    async_proto_derive::{
81        Protocol,
82        bitflags,
83    },
84    crate::error::*,
85};
86#[doc(hidden)] pub use tokio; // used in proc macro
87
88mod error;
89mod impls;
90
91/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
92#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
93
94/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
95pub trait Protocol: Sized {
96    /// Reads a value of this type from an async stream.
97    ///
98    /// # Cancellation safety
99    ///
100    /// Implementations of this method are generally not cancellation safe.
101    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
102    /// Writes a value of this type to an async sink.
103    ///
104    /// # Cancellation safety
105    ///
106    /// Implementations of this method are generally not cancellation safe.
107    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
108    /// Reads a value of this type from a sync stream.
109    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
110    /// Writes a value of this type to a sync sink.
111    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
112
113    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
114    ///
115    /// This can be used to get around drop glue issues that might arise with `read`.
116    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
117        Box::pin(async move {
118            let value = Self::read(&mut stream).await?;
119            Ok((stream, value))
120        })
121    }
122
123    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
124    ///
125    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
126    ///
127    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
128    ///
129    /// # Example
130    ///
131    /// ```
132    /// use {
133    ///     std::{
134    ///         io,
135    ///         net::TcpStream,
136    ///     },
137    ///     async_proto::Protocol,
138    /// };
139    ///
140    /// struct Client {
141    ///     tcp_stream: TcpStream,
142    ///     buf: Vec<u8>,
143    /// }
144    ///
145    /// impl Client {
146    ///     fn new(tcp_stream: TcpStream) -> Self {
147    ///         Self {
148    ///             tcp_stream,
149    ///             buf: Vec::default(),
150    ///         }
151    ///     }
152    ///
153    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
154    ///         self.tcp_stream.set_nonblocking(true)?;
155    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
156    ///     }
157    ///
158    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
159    ///         self.tcp_stream.set_nonblocking(false)?;
160    ///         msg.write_sync(&mut self.tcp_stream)?;
161    ///         Ok(())
162    ///     }
163    /// }
164    /// ```
165    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
166        let mut temp_buf = vec![0; 8];
167        loop {
168            let mut slice = &mut &**buf;
169            match Self::read_sync(&mut slice) {
170                Ok(value) => {
171                    let value_len = slice.len();
172                    buf.drain(..buf.len() - value_len);
173                    return Ok(Some(value))
174                }
175                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
176                Err(e) => return Err(e),
177            }
178            match stream.read(&mut temp_buf) {
179                Ok(0) => return Err(ReadError {
180                    context: ErrorContext::DefaultImpl,
181                    kind: ReadErrorKind::EndOfStream,
182                }),
183                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
184                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
185                Err(e) => return Err(ReadError {
186                    context: ErrorContext::DefaultImpl,
187                    kind: e.into(),
188                }),
189            }
190        }
191    }
192
193    #[cfg(feature = "tokio-tungstenite021")]
194    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
195    /// Reads a value of this type from a `tokio-tungstenite` websocket.
196    ///
197    /// # Cancellation safety
198    ///
199    /// The default implementation of this method is not cancellation safe.
200    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
201        Box::pin(async move {
202            let packet = stream.try_next().await.map_err(|e| ReadError {
203                context: ErrorContext::DefaultImpl,
204                kind: e.into(),
205            })?.ok_or_else(|| ReadError {
206                context: ErrorContext::DefaultImpl,
207                kind: ReadErrorKind::EndOfStream,
208            })?;
209            match packet {
210                tungstenite021::Message::Text(data) => match data.chars().next() {
211                    Some('m') => {
212                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
213                            context: ErrorContext::DefaultImpl,
214                            kind: e.into(),
215                        })?;
216                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
217                            context: ErrorContext::DefaultImpl,
218                            kind: e.into(),
219                        })?;
220                        while buf.len() < len {
221                            let packet = stream.try_next().await.map_err(|e| ReadError {
222                                context: ErrorContext::DefaultImpl,
223                                kind: e.into(),
224                            })?.ok_or_else(|| ReadError {
225                                context: ErrorContext::DefaultImpl,
226                                kind: ReadErrorKind::EndOfStream,
227                            })?;
228                            if let tungstenite021::Message::Binary(data) = packet {
229                                buf.extend_from_slice(&data);
230                            } else {
231                                return Err(ReadError {
232                                    context: ErrorContext::DefaultImpl,
233                                    kind: ReadErrorKind::MessageKind021(packet),
234                                })
235                            }
236                        }
237                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
238                            context: ErrorContext::WebSocket {
239                                source: Box::new(context),
240                            },
241                            kind,
242                        })
243                    }
244                    _ => Err(ReadError {
245                        context: ErrorContext::DefaultImpl,
246                        kind: ReadErrorKind::WebSocketTextMessage024(data),
247                    }),
248                },
249                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
250                    context: ErrorContext::WebSocket {
251                        source: Box::new(context),
252                    },
253                    kind,
254                }),
255                _ => Err(ReadError {
256                    context: ErrorContext::DefaultImpl,
257                    kind: ReadErrorKind::MessageKind021(packet),
258                }),
259            }
260        })
261    }
262
263    #[cfg(feature = "tokio-tungstenite024")]
264    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
265    /// Reads a value of this type from a `tokio-tungstenite` websocket.
266    ///
267    /// # Cancellation safety
268    ///
269    /// The default implementation of this method is not cancellation safe.
270    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
271        Box::pin(async move {
272            let packet = stream.try_next().await.map_err(|e| ReadError {
273                context: ErrorContext::DefaultImpl,
274                kind: e.into(),
275            })?.ok_or_else(|| ReadError {
276                context: ErrorContext::DefaultImpl,
277                kind: ReadErrorKind::EndOfStream,
278            })?;
279            match packet {
280                tungstenite024::Message::Text(data) => match data.chars().next() {
281                    Some('m') => {
282                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
283                            context: ErrorContext::DefaultImpl,
284                            kind: e.into(),
285                        })?;
286                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
287                            context: ErrorContext::DefaultImpl,
288                            kind: e.into(),
289                        })?;
290                        while buf.len() < len {
291                            let packet = stream.try_next().await.map_err(|e| ReadError {
292                                context: ErrorContext::DefaultImpl,
293                                kind: e.into(),
294                            })?.ok_or_else(|| ReadError {
295                                context: ErrorContext::DefaultImpl,
296                                kind: ReadErrorKind::EndOfStream,
297                            })?;
298                            if let tungstenite024::Message::Binary(data) = packet {
299                                buf.extend_from_slice(&data);
300                            } else {
301                                return Err(ReadError {
302                                    context: ErrorContext::DefaultImpl,
303                                    kind: ReadErrorKind::MessageKind024(packet),
304                                })
305                            }
306                        }
307                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
308                            context: ErrorContext::WebSocket {
309                                source: Box::new(context),
310                            },
311                            kind,
312                        })
313                    }
314                    _ => Err(ReadError {
315                        context: ErrorContext::DefaultImpl,
316                        kind: ReadErrorKind::WebSocketTextMessage024(data),
317                    }),
318                },
319                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
320                    context: ErrorContext::WebSocket {
321                        source: Box::new(context),
322                    },
323                    kind,
324                }),
325                _ => Err(ReadError {
326                    context: ErrorContext::DefaultImpl,
327                    kind: ReadErrorKind::MessageKind024(packet),
328                }),
329            }
330        })
331    }
332
333    #[cfg(feature = "tokio-tungstenite027")]
334    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
335    /// Reads a value of this type from a `tokio-tungstenite` websocket.
336    ///
337    /// # Cancellation safety
338    ///
339    /// The default implementation of this method is not cancellation safe.
340    fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
341        Box::pin(async move {
342            let packet = stream.try_next().await.map_err(|e| ReadError {
343                context: ErrorContext::DefaultImpl,
344                kind: e.into(),
345            })?.ok_or_else(|| ReadError {
346                context: ErrorContext::DefaultImpl,
347                kind: ReadErrorKind::EndOfStream,
348            })?;
349            match packet {
350                tungstenite027::Message::Text(data) => match data.chars().next() {
351                    Some('m') => {
352                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
353                            context: ErrorContext::DefaultImpl,
354                            kind: e.into(),
355                        })?;
356                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
357                            context: ErrorContext::DefaultImpl,
358                            kind: e.into(),
359                        })?;
360                        while buf.len() < len {
361                            let packet = stream.try_next().await.map_err(|e| ReadError {
362                                context: ErrorContext::DefaultImpl,
363                                kind: e.into(),
364                            })?.ok_or_else(|| ReadError {
365                                context: ErrorContext::DefaultImpl,
366                                kind: ReadErrorKind::EndOfStream,
367                            })?;
368                            if let tungstenite027::Message::Binary(data) = packet {
369                                buf.extend_from_slice(&data);
370                            } else {
371                                return Err(ReadError {
372                                    context: ErrorContext::DefaultImpl,
373                                    kind: ReadErrorKind::MessageKind027(packet),
374                                })
375                            }
376                        }
377                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
378                            context: ErrorContext::WebSocket {
379                                source: Box::new(context),
380                            },
381                            kind,
382                        })
383                    }
384                    _ => Err(ReadError {
385                        context: ErrorContext::DefaultImpl,
386                        kind: ReadErrorKind::WebSocketTextMessage027(data),
387                    }),
388                },
389                tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
390                    context: ErrorContext::WebSocket {
391                        source: Box::new(context),
392                    },
393                    kind,
394                }),
395                _ => Err(ReadError {
396                    context: ErrorContext::DefaultImpl,
397                    kind: ReadErrorKind::MessageKind027(packet),
398                }),
399            }
400        })
401    }
402
403    #[cfg(feature = "tokio-tungstenite021")]
404    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
405    /// Writes a value of this type to a `tokio-tungstenite` websocket.
406    ///
407    /// # Cancellation safety
408    ///
409    /// The default implementation of this method is not cancellation safe.
410    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
411    where Self: Sync {
412        Box::pin(async move {
413            let mut buf = Vec::default();
414            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
415                context: ErrorContext::WebSocket {
416                    source: Box::new(context),
417                },
418                kind,
419            })?;
420            if buf.len() <= WS_MAX_MESSAGE_SIZE {
421                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
422                    context: ErrorContext::DefaultImpl,
423                    kind: e.into(),
424                })?;
425            } else {
426                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
427                    context: ErrorContext::DefaultImpl,
428                    kind: e.into(),
429                })?;
430                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
431                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
432                        context: ErrorContext::DefaultImpl,
433                        kind: e.into(),
434                    })?;
435                }
436            }
437            Ok(())
438        })
439    }
440
441    #[cfg(feature = "tokio-tungstenite024")]
442    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
443    /// Writes a value of this type to a `tokio-tungstenite` websocket.
444    ///
445    /// # Cancellation safety
446    ///
447    /// The default implementation of this method is not cancellation safe.
448    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
449    where Self: Sync {
450        Box::pin(async move {
451            let mut buf = Vec::default();
452            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
453                context: ErrorContext::WebSocket {
454                    source: Box::new(context),
455                },
456                kind,
457            })?;
458            if buf.len() <= WS_MAX_MESSAGE_SIZE {
459                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
460                    context: ErrorContext::DefaultImpl,
461                    kind: e.into(),
462                })?;
463            } else {
464                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
465                    context: ErrorContext::DefaultImpl,
466                    kind: e.into(),
467                })?;
468                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
469                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
470                        context: ErrorContext::DefaultImpl,
471                        kind: e.into(),
472                    })?;
473                }
474            }
475            Ok(())
476        })
477    }
478
479    #[cfg(feature = "tokio-tungstenite027")]
480    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
481    /// Writes a value of this type to a `tokio-tungstenite` websocket.
482    ///
483    /// # Cancellation safety
484    ///
485    /// The default implementation of this method is not cancellation safe.
486    fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
487    where Self: Sync {
488        Box::pin(async move {
489            let mut buf = Vec::default();
490            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
491                context: ErrorContext::WebSocket {
492                    source: Box::new(context),
493                },
494                kind,
495            })?;
496            if buf.len() <= WS_MAX_MESSAGE_SIZE {
497                sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
498                    context: ErrorContext::DefaultImpl,
499                    kind: e.into(),
500                })?;
501            } else {
502                sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
503                    context: ErrorContext::DefaultImpl,
504                    kind: e.into(),
505                })?;
506                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
507                    sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
508                        context: ErrorContext::DefaultImpl,
509                        kind: e.into(),
510                    })?;
511                }
512            }
513            Ok(())
514        })
515    }
516
517    #[cfg(feature = "tokio-tungstenite021")]
518    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
519    /// Reads a value of this type from a [`tungstenite021`] websocket.
520    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
521        let packet = websocket.read().map_err(|e| ReadError {
522            context: ErrorContext::DefaultImpl,
523            kind: e.into(),
524        })?;
525        match packet {
526            tungstenite021::Message::Text(data) => match data.chars().next() {
527                Some('m') => {
528                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
529                        context: ErrorContext::DefaultImpl,
530                        kind: e.into(),
531                    })?;
532                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
533                        context: ErrorContext::DefaultImpl,
534                        kind: e.into(),
535                    })?;
536                    while buf.len() < len {
537                        let packet = websocket.read().map_err(|e| ReadError {
538                            context: ErrorContext::DefaultImpl,
539                            kind: e.into(),
540                        })?;
541                        if let tungstenite021::Message::Binary(data) = packet {
542                            buf.extend_from_slice(&data);
543                        } else {
544                            return Err(ReadError {
545                                context: ErrorContext::DefaultImpl,
546                                kind: ReadErrorKind::MessageKind021(packet),
547                            })
548                        }
549                    }
550                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
551                        context: ErrorContext::WebSocket {
552                            source: Box::new(context),
553                        },
554                        kind,
555                    })
556                }
557                _ => return Err(ReadError {
558                    context: ErrorContext::DefaultImpl,
559                    kind: ReadErrorKind::WebSocketTextMessage024(data),
560                }),
561            },
562            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
563                context: ErrorContext::WebSocket {
564                    source: Box::new(context),
565                },
566                kind,
567            }),
568            _ => Err(ReadError {
569                context: ErrorContext::DefaultImpl,
570                kind: ReadErrorKind::MessageKind021(packet),
571            }),
572        }
573    }
574
575    #[cfg(feature = "tokio-tungstenite024")]
576    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
577    /// Reads a value of this type from a [`tungstenite024`] websocket.
578    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
579        let packet = websocket.read().map_err(|e| ReadError {
580            context: ErrorContext::DefaultImpl,
581            kind: e.into(),
582        })?;
583        match packet {
584            tungstenite024::Message::Text(data) => match data.chars().next() {
585                Some('m') => {
586                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
587                        context: ErrorContext::DefaultImpl,
588                        kind: e.into(),
589                    })?;
590                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
591                        context: ErrorContext::DefaultImpl,
592                        kind: e.into(),
593                    })?;
594                    while buf.len() < len {
595                        let packet = websocket.read().map_err(|e| ReadError {
596                            context: ErrorContext::DefaultImpl,
597                            kind: e.into(),
598                        })?;
599                        if let tungstenite024::Message::Binary(data) = packet {
600                            buf.extend_from_slice(&data);
601                        } else {
602                            return Err(ReadError {
603                                context: ErrorContext::DefaultImpl,
604                                kind: ReadErrorKind::MessageKind024(packet),
605                            })
606                        }
607                    }
608                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
609                        context: ErrorContext::WebSocket {
610                            source: Box::new(context),
611                        },
612                        kind,
613                    })
614                }
615                _ => return Err(ReadError {
616                    context: ErrorContext::DefaultImpl,
617                    kind: ReadErrorKind::WebSocketTextMessage024(data),
618                }),
619            },
620            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
621                context: ErrorContext::WebSocket {
622                    source: Box::new(context),
623                },
624                kind,
625            }),
626            _ => Err(ReadError {
627                context: ErrorContext::DefaultImpl,
628                kind: ReadErrorKind::MessageKind024(packet),
629            }),
630        }
631    }
632
633    #[cfg(feature = "tokio-tungstenite027")]
634    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
635    /// Reads a value of this type from a [`tungstenite027`] websocket.
636    fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
637        let packet = websocket.read().map_err(|e| ReadError {
638            context: ErrorContext::DefaultImpl,
639            kind: e.into(),
640        })?;
641        match packet {
642            tungstenite027::Message::Text(data) => match data.chars().next() {
643                Some('m') => {
644                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
645                        context: ErrorContext::DefaultImpl,
646                        kind: e.into(),
647                    })?;
648                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
649                        context: ErrorContext::DefaultImpl,
650                        kind: e.into(),
651                    })?;
652                    while buf.len() < len {
653                        let packet = websocket.read().map_err(|e| ReadError {
654                            context: ErrorContext::DefaultImpl,
655                            kind: e.into(),
656                        })?;
657                        if let tungstenite027::Message::Binary(data) = packet {
658                            buf.extend_from_slice(&data);
659                        } else {
660                            return Err(ReadError {
661                                context: ErrorContext::DefaultImpl,
662                                kind: ReadErrorKind::MessageKind027(packet),
663                            })
664                        }
665                    }
666                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
667                        context: ErrorContext::WebSocket {
668                            source: Box::new(context),
669                        },
670                        kind,
671                    })
672                }
673                _ => return Err(ReadError {
674                    context: ErrorContext::DefaultImpl,
675                    kind: ReadErrorKind::WebSocketTextMessage027(data),
676                }),
677            },
678            tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
679                context: ErrorContext::WebSocket {
680                    source: Box::new(context),
681                },
682                kind,
683            }),
684            _ => Err(ReadError {
685                context: ErrorContext::DefaultImpl,
686                kind: ReadErrorKind::MessageKind027(packet),
687            }),
688        }
689    }
690
691    #[cfg(feature = "tokio-tungstenite021")]
692    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
693    /// Writes a value of this type to a [`tungstenite021`] websocket.
694    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
695        let mut buf = Vec::default();
696        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
697            context: ErrorContext::WebSocket {
698                source: Box::new(context),
699            },
700            kind,
701        })?;
702        if buf.len() <= WS_MAX_MESSAGE_SIZE {
703            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
704                context: ErrorContext::DefaultImpl,
705                kind: e.into(),
706            })?;
707        } else {
708            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
709                context: ErrorContext::DefaultImpl,
710                kind: e.into(),
711            })?;
712            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
713                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
714                    context: ErrorContext::DefaultImpl,
715                    kind: e.into(),
716                })?;
717            }
718        }
719        websocket.flush().map_err(|e| WriteError {
720            context: ErrorContext::DefaultImpl,
721            kind: e.into(),
722        })?;
723        Ok(())
724    }
725
726    #[cfg(feature = "tokio-tungstenite024")]
727    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
728    /// Writes a value of this type to a [`tungstenite024`] websocket.
729    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
730        let mut buf = Vec::default();
731        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
732            context: ErrorContext::WebSocket {
733                source: Box::new(context),
734            },
735            kind,
736        })?;
737        if buf.len() <= WS_MAX_MESSAGE_SIZE {
738            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
739                context: ErrorContext::DefaultImpl,
740                kind: e.into(),
741            })?;
742        } else {
743            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
744                context: ErrorContext::DefaultImpl,
745                kind: e.into(),
746            })?;
747            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
748                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
749                    context: ErrorContext::DefaultImpl,
750                    kind: e.into(),
751                })?;
752            }
753        }
754        websocket.flush().map_err(|e| WriteError {
755            context: ErrorContext::DefaultImpl,
756            kind: e.into(),
757        })?;
758        Ok(())
759    }
760
761    #[cfg(feature = "tokio-tungstenite027")]
762    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
763    /// Writes a value of this type to a [`tungstenite027`] websocket.
764    fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
765        let mut buf = Vec::default();
766        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
767            context: ErrorContext::WebSocket {
768                source: Box::new(context),
769            },
770            kind,
771        })?;
772        if buf.len() <= WS_MAX_MESSAGE_SIZE {
773            websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
774                context: ErrorContext::DefaultImpl,
775                kind: e.into(),
776            })?;
777        } else {
778            websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
779                context: ErrorContext::DefaultImpl,
780                kind: e.into(),
781            })?;
782            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
783                websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
784                    context: ErrorContext::DefaultImpl,
785                    kind: e.into(),
786                })?;
787            }
788        }
789        websocket.flush().map_err(|e| WriteError {
790            context: ErrorContext::DefaultImpl,
791            kind: e.into(),
792        })?;
793        Ok(())
794    }
795
796    #[cfg(feature = "tokio-tungstenite021")]
797    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
798    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
799    ///
800    /// This can be used to get around drop glue issues that might arise with `read_ws`.
801    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
802        Box::pin(async move {
803            let value = Self::read_ws021(&mut stream).await?;
804            Ok((stream, value))
805        })
806    }
807
808    #[cfg(feature = "tokio-tungstenite024")]
809    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
810    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
811    ///
812    /// This can be used to get around drop glue issues that might arise with `read_ws`.
813    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
814        Box::pin(async move {
815            let value = Self::read_ws024(&mut stream).await?;
816            Ok((stream, value))
817        })
818    }
819
820    #[cfg(feature = "tokio-tungstenite027")]
821    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
822    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
823    ///
824    /// This can be used to get around drop glue issues that might arise with `read_ws`.
825    fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
826        Box::pin(async move {
827            let value = Self::read_ws027(&mut stream).await?;
828            Ok((stream, value))
829        })
830    }
831}
832
833/// This trait allows restricting the acceptable length of collection types.
834///
835/// By default, types from this crate implementing this trait represent their length as a `u64`. If the maximum length is limited (e.g. using the `#[async_proto(max_len = ...)]` attribute when deriving [`Protocol`]), the length may be represented using a smaller integer type.
836pub trait LengthPrefixed: Protocol {
837    /// Reads a value of this type from an async stream, limiting the length to the given value.
838    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
839    /// Writes a value of this type to an async sink, limiting the length to the given value.
840    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
841    /// Reads a value of this type from a sync stream, limiting the length to the given value.
842    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
843    /// Writes a value of this type to a sync sink, limiting the length to the given value.
844    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
845}
846
847/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
848///
849/// Useful for WebSocket connections where the message type per direction is always the same.
850#[cfg(feature = "tokio-tungstenite021")]
851#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
852pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
853    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
854    let (sink, stream) = sock.split();
855    Ok((
856        sink.sink_map_err(|e| WriteError {
857            context: ErrorContext::WebSocketSink,
858            kind: e.into(),
859        }).with_flat_map::<W, _, _>(|msg| {
860            let mut buf = Vec::default();
861            match msg.write_sync(&mut buf) {
862                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
863                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
864                } else {
865                    Either::Right(stream::iter(
866                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
867                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
868                        .collect::<Vec<_>>()
869                    ))
870                }.map(Ok)),
871                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
872                    context: ErrorContext::WebSocket {
873                        source: Box::new(context),
874                    },
875                    kind,
876                }))),
877            }
878        }),
879        stream.scan(None, |state, res| {
880            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
881                let packet = res.map_err(|e| ReadError {
882                    context: ErrorContext::WebSocketStream,
883                    kind: e.into(),
884                })?;
885                Ok(if let Some((len, buf)) = state {
886                    if let tungstenite021::Message::Binary(data) = packet {
887                        buf.extend_from_slice(&data);
888                    } else {
889                        return Err(ReadError {
890                            context: ErrorContext::DefaultImpl,
891                            kind: ReadErrorKind::MessageKind021(packet),
892                        })
893                    }
894                    if buf.len() >= *len {
895                        let buf = mem::take(buf);
896                        *state = None;
897                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
898                            context: ErrorContext::WebSocket {
899                                source: Box::new(context),
900                            },
901                            kind,
902                        })?)))
903                    } else {
904                        Either::Left(stream::empty())
905                    }
906                } else {
907                    match packet {
908                        tungstenite021::Message::Text(data) => match data.chars().next() {
909                            Some('m') => {
910                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
911                                    context: ErrorContext::DefaultImpl,
912                                    kind: e.into(),
913                                })?;
914                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
915                                    context: ErrorContext::DefaultImpl,
916                                    kind: e.into(),
917                                })?;
918                                *state = Some((len, buf));
919                                Either::Left(stream::empty())
920                            }
921                            _ => return Err(ReadError {
922                                context: ErrorContext::DefaultImpl,
923                                kind: ReadErrorKind::WebSocketTextMessage024(data),
924                            }),
925                        },
926                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
927                            context: ErrorContext::WebSocket {
928                                source: Box::new(context),
929                            },
930                            kind,
931                        })?))),
932                        _ => return Err(ReadError {
933                            context: ErrorContext::DefaultImpl,
934                            kind: ReadErrorKind::MessageKind021(packet),
935                        }),
936                    }
937                })
938            }
939
940            future::ready(Some(scanner(state, res)))
941        }).try_flatten(),
942    ))
943}
944
945/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
946///
947/// Useful for WebSocket connections where the message type per direction is always the same.
948#[cfg(feature = "tokio-tungstenite024")]
949#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
950pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
951    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
952    let (sink, stream) = sock.split();
953    Ok((
954        sink.sink_map_err(|e| WriteError {
955            context: ErrorContext::WebSocketSink,
956            kind: e.into(),
957        }).with_flat_map::<W, _, _>(|msg| {
958            let mut buf = Vec::default();
959            match msg.write_sync(&mut buf) {
960                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
961                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
962                } else {
963                    Either::Right(stream::iter(
964                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
965                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
966                        .collect::<Vec<_>>()
967                    ))
968                }.map(Ok)),
969                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
970                    context: ErrorContext::WebSocket {
971                        source: Box::new(context),
972                    },
973                    kind,
974                }))),
975            }
976        }),
977        stream.scan(None, |state, res| {
978            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
979                let packet = res.map_err(|e| ReadError {
980                    context: ErrorContext::WebSocketStream,
981                    kind: e.into(),
982                })?;
983                Ok(if let Some((len, buf)) = state {
984                    if let tungstenite024::Message::Binary(data) = packet {
985                        buf.extend_from_slice(&data);
986                    } else {
987                        return Err(ReadError {
988                            context: ErrorContext::DefaultImpl,
989                            kind: ReadErrorKind::MessageKind024(packet),
990                        })
991                    }
992                    if buf.len() >= *len {
993                        let buf = mem::take(buf);
994                        *state = None;
995                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
996                            context: ErrorContext::WebSocket {
997                                source: Box::new(context),
998                            },
999                            kind,
1000                        })?)))
1001                    } else {
1002                        Either::Left(stream::empty())
1003                    }
1004                } else {
1005                    match packet {
1006                        tungstenite024::Message::Text(data) => match data.chars().next() {
1007                            Some('m') => {
1008                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1009                                    context: ErrorContext::DefaultImpl,
1010                                    kind: e.into(),
1011                                })?;
1012                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1013                                    context: ErrorContext::DefaultImpl,
1014                                    kind: e.into(),
1015                                })?;
1016                                *state = Some((len, buf));
1017                                Either::Left(stream::empty())
1018                            }
1019                            _ => return Err(ReadError {
1020                                context: ErrorContext::DefaultImpl,
1021                                kind: ReadErrorKind::WebSocketTextMessage024(data),
1022                            }),
1023                        },
1024                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1025                            context: ErrorContext::WebSocket {
1026                                source: Box::new(context),
1027                            },
1028                            kind,
1029                        })?))),
1030                        _ => return Err(ReadError {
1031                            context: ErrorContext::DefaultImpl,
1032                            kind: ReadErrorKind::MessageKind024(packet),
1033                        }),
1034                    }
1035                })
1036            }
1037
1038            future::ready(Some(scanner(state, res)))
1039        }).try_flatten(),
1040    ))
1041}
1042
1043/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
1044///
1045/// Useful for WebSocket connections where the message type per direction is always the same.
1046#[cfg(feature = "tokio-tungstenite027")]
1047#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
1048pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1049    let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
1050    let (sink, stream) = sock.split();
1051    Ok((
1052        sink.sink_map_err(|e| WriteError {
1053            context: ErrorContext::WebSocketSink,
1054            kind: e.into(),
1055        }).with_flat_map::<W, _, _>(|msg| {
1056            let mut buf = Vec::default();
1057            match msg.write_sync(&mut buf) {
1058                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1059                    Either::Left(stream::once(future::ready(tungstenite027::Message::binary(buf))))
1060                } else {
1061                    Either::Right(stream::iter(
1062                        iter::once(tungstenite027::Message::text(format!("m{}", buf.len())))
1063                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))))
1064                        .collect::<Vec<_>>()
1065                    ))
1066                }.map(Ok)),
1067                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1068                    context: ErrorContext::WebSocket {
1069                        source: Box::new(context),
1070                    },
1071                    kind,
1072                }))),
1073            }
1074        }),
1075        stream.scan(None, |state, res| {
1076            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1077                let packet = res.map_err(|e| ReadError {
1078                    context: ErrorContext::WebSocketStream,
1079                    kind: e.into(),
1080                })?;
1081                Ok(if let Some((len, buf)) = state {
1082                    if let tungstenite027::Message::Binary(data) = packet {
1083                        buf.extend_from_slice(&data);
1084                    } else {
1085                        return Err(ReadError {
1086                            context: ErrorContext::DefaultImpl,
1087                            kind: ReadErrorKind::MessageKind027(packet),
1088                        })
1089                    }
1090                    if buf.len() >= *len {
1091                        let buf = mem::take(buf);
1092                        *state = None;
1093                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1094                            context: ErrorContext::WebSocket {
1095                                source: Box::new(context),
1096                            },
1097                            kind,
1098                        })?)))
1099                    } else {
1100                        Either::Left(stream::empty())
1101                    }
1102                } else {
1103                    match packet {
1104                        tungstenite027::Message::Text(data) => match data.chars().next() {
1105                            Some('m') => {
1106                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1107                                    context: ErrorContext::DefaultImpl,
1108                                    kind: e.into(),
1109                                })?;
1110                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1111                                    context: ErrorContext::DefaultImpl,
1112                                    kind: e.into(),
1113                                })?;
1114                                *state = Some((len, buf));
1115                                Either::Left(stream::empty())
1116                            }
1117                            _ => return Err(ReadError {
1118                                context: ErrorContext::DefaultImpl,
1119                                kind: ReadErrorKind::WebSocketTextMessage027(data),
1120                            }),
1121                        },
1122                        tungstenite027::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1123                            context: ErrorContext::WebSocket {
1124                                source: Box::new(context),
1125                            },
1126                            kind,
1127                        })?))),
1128                        _ => return Err(ReadError {
1129                            context: ErrorContext::DefaultImpl,
1130                            kind: ReadErrorKind::MessageKind027(packet),
1131                        }),
1132                    }
1133                })
1134            }
1135
1136            future::ready(Some(scanner(state, res)))
1137        }).try_flatten(),
1138    ))
1139}