async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bitvec`](https://docs.rs/bitvec): [`BitVec<u8, Lsb0>`](https://docs.rs/bitvec/latest/bitvec/vec/struct.BitVec.html)
17//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
18//! * [`bytesize`](https://docs.rs/bytesize): [`ByteSize`](https://docs.rs/bytesize/latest/bytesize/struct.ByteSize.html)
19//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
20//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
21//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
22//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
23//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
24//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
25//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
26//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
27//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
28//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
29//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
30//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
31//! * [`url`](https://docs.rs/url): [`Url`](https://docs.rs/url/latest/url/struct.Url.html)
32//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
33//!
34//! Additionally, this crate offers optional dependencies on the `tokio-tungstenite` crate to add convenience methods for reading/writing [`Protocol`] types from/to its websockets. The following versions are supported:
35//!
36//! * The latest release (currently [`tokio-tungstenite` 0.27](https://docs.rs/tokio-tungstenite/0.27), feature flag `tokio-tungstenite027`)
37//! * The version used by [the `master` branch of `rocket_ws` on GitHub](https://github.com/rwf2/Rocket/tree/master/contrib/ws) (currently [`tokio-tungstenite` 0.24](https://docs.rs/tokio-tungstenite/0.24), feature flag `tokio-tungstenite024`)
38//! * The version used by [the latest `rocket_ws` crates.io release](https://docs.rs/rocket_ws) (currently [`tokio-tungstenite` 0.21](https://docs.rs/tokio-tungstenite/0.21), feature flag `tokio-tungstenite021`)
39
40use {
41    std::{
42        future::Future,
43        io::{
44            self,
45            prelude::*,
46        },
47        pin::Pin,
48    },
49    tokio::io::{
50        AsyncRead,
51        AsyncWrite,
52    },
53};
54#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
55    std::{
56        iter,
57        mem,
58    },
59    fallible_collections::FallibleVec,
60    futures::{
61        Sink,
62        SinkExt as _,
63        future::{
64            self,
65            Either,
66        },
67        stream::{
68            self,
69            Stream,
70            StreamExt as _,
71            TryStreamExt as _,
72        },
73    },
74};
75#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
76#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
77#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
78pub use {
79    async_proto_derive::{
80        Protocol,
81        bitflags,
82    },
83    crate::error::*,
84};
85#[doc(hidden)] pub use tokio; // used in proc macro
86
87mod error;
88mod impls;
89
/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
///
/// This is 16 MiB. The `write_ws*` methods split any serialized value larger than this into multiple binary messages of at most this size.
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] const WS_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
92
93/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
94pub trait Protocol: Sized {
    /// Reads a value of this type from an async stream.
    ///
    /// The returned future is boxed and borrows `stream` for the lifetime `'a`. It resolves to the decoded value, or to a [`ReadError`] if reading or decoding fails.
    ///
    /// # Cancellation safety
    ///
    /// Implementations of this method are generally not cancellation safe.
    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Writes a value of this type to an async sink.
    ///
    /// The returned future is boxed and borrows both `self` and `sink` for the lifetime `'a`. It resolves to `()` on success, or to a [`WriteError`] if encoding or writing fails.
    ///
    /// # Cancellation safety
    ///
    /// Implementations of this method are generally not cancellation safe.
    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Reads a value of this type from a sync stream.
    ///
    /// This is the blocking counterpart of [`Protocol::read`]. It returns the decoded value, or a [`ReadError`] on failure.
    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
    /// Writes a value of this type to a sync sink.
    ///
    /// This is the blocking counterpart of [`Protocol::write`]. It returns a [`WriteError`] on failure.
    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
111
112    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
113    ///
114    /// This can be used to get around drop glue issues that might arise with `read`.
115    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
116        Box::pin(async move {
117            let value = Self::read(&mut stream).await?;
118            Ok((stream, value))
119        })
120    }
121
122    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
123    ///
124    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
125    ///
126    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
127    ///
128    /// # Example
129    ///
130    /// ```
131    /// use {
132    ///     std::{
133    ///         io,
134    ///         net::TcpStream,
135    ///     },
136    ///     async_proto::Protocol,
137    /// };
138    ///
139    /// struct Client {
140    ///     tcp_stream: TcpStream,
141    ///     buf: Vec<u8>,
142    /// }
143    ///
144    /// impl Client {
145    ///     fn new(tcp_stream: TcpStream) -> Self {
146    ///         Self {
147    ///             tcp_stream,
148    ///             buf: Vec::default(),
149    ///         }
150    ///     }
151    ///
152    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
153    ///         self.tcp_stream.set_nonblocking(true)?;
154    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
155    ///     }
156    ///
157    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
158    ///         self.tcp_stream.set_nonblocking(false)?;
159    ///         msg.write_sync(&mut self.tcp_stream)?;
160    ///         Ok(())
161    ///     }
162    /// }
163    /// ```
164    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
165        let mut temp_buf = vec![0; 8];
166        loop {
167            let mut slice = &mut &**buf;
168            match Self::read_sync(&mut slice) {
169                Ok(value) => {
170                    let value_len = slice.len();
171                    buf.drain(..buf.len() - value_len);
172                    return Ok(Some(value))
173                }
174                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
175                Err(e) => return Err(e),
176            }
177            match stream.read(&mut temp_buf) {
178                Ok(0) => return Err(ReadError {
179                    context: ErrorContext::DefaultImpl,
180                    kind: ReadErrorKind::EndOfStream,
181                }),
182                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
183                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
184                Err(e) => return Err(ReadError {
185                    context: ErrorContext::DefaultImpl,
186                    kind: e.into(),
187                }),
188            }
189        }
190    }
191
192    #[cfg(feature = "tokio-tungstenite021")]
193    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
194    /// Reads a value of this type from a `tokio-tungstenite` websocket.
195    ///
196    /// # Cancellation safety
197    ///
198    /// The default implementation of this method is not cancellation safe.
199    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
200        Box::pin(async move {
201            let packet = stream.try_next().await.map_err(|e| ReadError {
202                context: ErrorContext::DefaultImpl,
203                kind: e.into(),
204            })?.ok_or_else(|| ReadError {
205                context: ErrorContext::DefaultImpl,
206                kind: ReadErrorKind::EndOfStream,
207            })?;
208            match packet {
209                tungstenite021::Message::Text(data) => match data.chars().next() {
210                    Some('m') => {
211                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
212                            context: ErrorContext::DefaultImpl,
213                            kind: e.into(),
214                        })?;
215                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
216                            context: ErrorContext::DefaultImpl,
217                            kind: e.into(),
218                        })?;
219                        while buf.len() < len {
220                            let packet = stream.try_next().await.map_err(|e| ReadError {
221                                context: ErrorContext::DefaultImpl,
222                                kind: e.into(),
223                            })?.ok_or_else(|| ReadError {
224                                context: ErrorContext::DefaultImpl,
225                                kind: ReadErrorKind::EndOfStream,
226                            })?;
227                            if let tungstenite021::Message::Binary(data) = packet {
228                                buf.extend_from_slice(&data);
229                            } else {
230                                return Err(ReadError {
231                                    context: ErrorContext::DefaultImpl,
232                                    kind: ReadErrorKind::MessageKind021(packet),
233                                })
234                            }
235                        }
236                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
237                            context: ErrorContext::WebSocket {
238                                source: Box::new(context),
239                            },
240                            kind,
241                        })
242                    }
243                    _ => Err(ReadError {
244                        context: ErrorContext::DefaultImpl,
245                        kind: ReadErrorKind::WebSocketTextMessage024(data),
246                    }),
247                },
248                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
249                    context: ErrorContext::WebSocket {
250                        source: Box::new(context),
251                    },
252                    kind,
253                }),
254                _ => Err(ReadError {
255                    context: ErrorContext::DefaultImpl,
256                    kind: ReadErrorKind::MessageKind021(packet),
257                }),
258            }
259        })
260    }
261
262    #[cfg(feature = "tokio-tungstenite024")]
263    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
264    /// Reads a value of this type from a `tokio-tungstenite` websocket.
265    ///
266    /// # Cancellation safety
267    ///
268    /// The default implementation of this method is not cancellation safe.
269    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
270        Box::pin(async move {
271            let packet = stream.try_next().await.map_err(|e| ReadError {
272                context: ErrorContext::DefaultImpl,
273                kind: e.into(),
274            })?.ok_or_else(|| ReadError {
275                context: ErrorContext::DefaultImpl,
276                kind: ReadErrorKind::EndOfStream,
277            })?;
278            match packet {
279                tungstenite024::Message::Text(data) => match data.chars().next() {
280                    Some('m') => {
281                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
282                            context: ErrorContext::DefaultImpl,
283                            kind: e.into(),
284                        })?;
285                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
286                            context: ErrorContext::DefaultImpl,
287                            kind: e.into(),
288                        })?;
289                        while buf.len() < len {
290                            let packet = stream.try_next().await.map_err(|e| ReadError {
291                                context: ErrorContext::DefaultImpl,
292                                kind: e.into(),
293                            })?.ok_or_else(|| ReadError {
294                                context: ErrorContext::DefaultImpl,
295                                kind: ReadErrorKind::EndOfStream,
296                            })?;
297                            if let tungstenite024::Message::Binary(data) = packet {
298                                buf.extend_from_slice(&data);
299                            } else {
300                                return Err(ReadError {
301                                    context: ErrorContext::DefaultImpl,
302                                    kind: ReadErrorKind::MessageKind024(packet),
303                                })
304                            }
305                        }
306                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
307                            context: ErrorContext::WebSocket {
308                                source: Box::new(context),
309                            },
310                            kind,
311                        })
312                    }
313                    _ => Err(ReadError {
314                        context: ErrorContext::DefaultImpl,
315                        kind: ReadErrorKind::WebSocketTextMessage024(data),
316                    }),
317                },
318                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
319                    context: ErrorContext::WebSocket {
320                        source: Box::new(context),
321                    },
322                    kind,
323                }),
324                _ => Err(ReadError {
325                    context: ErrorContext::DefaultImpl,
326                    kind: ReadErrorKind::MessageKind024(packet),
327                }),
328            }
329        })
330    }
331
332    #[cfg(feature = "tokio-tungstenite027")]
333    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
334    /// Reads a value of this type from a `tokio-tungstenite` websocket.
335    ///
336    /// # Cancellation safety
337    ///
338    /// The default implementation of this method is not cancellation safe.
339    fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
340        Box::pin(async move {
341            let packet = stream.try_next().await.map_err(|e| ReadError {
342                context: ErrorContext::DefaultImpl,
343                kind: e.into(),
344            })?.ok_or_else(|| ReadError {
345                context: ErrorContext::DefaultImpl,
346                kind: ReadErrorKind::EndOfStream,
347            })?;
348            match packet {
349                tungstenite027::Message::Text(data) => match data.chars().next() {
350                    Some('m') => {
351                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
352                            context: ErrorContext::DefaultImpl,
353                            kind: e.into(),
354                        })?;
355                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
356                            context: ErrorContext::DefaultImpl,
357                            kind: e.into(),
358                        })?;
359                        while buf.len() < len {
360                            let packet = stream.try_next().await.map_err(|e| ReadError {
361                                context: ErrorContext::DefaultImpl,
362                                kind: e.into(),
363                            })?.ok_or_else(|| ReadError {
364                                context: ErrorContext::DefaultImpl,
365                                kind: ReadErrorKind::EndOfStream,
366                            })?;
367                            if let tungstenite027::Message::Binary(data) = packet {
368                                buf.extend_from_slice(&data);
369                            } else {
370                                return Err(ReadError {
371                                    context: ErrorContext::DefaultImpl,
372                                    kind: ReadErrorKind::MessageKind027(packet),
373                                })
374                            }
375                        }
376                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
377                            context: ErrorContext::WebSocket {
378                                source: Box::new(context),
379                            },
380                            kind,
381                        })
382                    }
383                    _ => Err(ReadError {
384                        context: ErrorContext::DefaultImpl,
385                        kind: ReadErrorKind::WebSocketTextMessage027(data),
386                    }),
387                },
388                tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
389                    context: ErrorContext::WebSocket {
390                        source: Box::new(context),
391                    },
392                    kind,
393                }),
394                _ => Err(ReadError {
395                    context: ErrorContext::DefaultImpl,
396                    kind: ReadErrorKind::MessageKind027(packet),
397                }),
398            }
399        })
400    }
401
402    #[cfg(feature = "tokio-tungstenite021")]
403    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
404    /// Writes a value of this type to a `tokio-tungstenite` websocket.
405    ///
406    /// # Cancellation safety
407    ///
408    /// The default implementation of this method is not cancellation safe.
409    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
410    where Self: Sync {
411        Box::pin(async move {
412            let mut buf = Vec::default();
413            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
414                context: ErrorContext::WebSocket {
415                    source: Box::new(context),
416                },
417                kind,
418            })?;
419            if buf.len() <= WS_MAX_MESSAGE_SIZE {
420                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
421                    context: ErrorContext::DefaultImpl,
422                    kind: e.into(),
423                })?;
424            } else {
425                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
426                    context: ErrorContext::DefaultImpl,
427                    kind: e.into(),
428                })?;
429                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
430                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
431                        context: ErrorContext::DefaultImpl,
432                        kind: e.into(),
433                    })?;
434                }
435            }
436            Ok(())
437        })
438    }
439
440    #[cfg(feature = "tokio-tungstenite024")]
441    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
442    /// Writes a value of this type to a `tokio-tungstenite` websocket.
443    ///
444    /// # Cancellation safety
445    ///
446    /// The default implementation of this method is not cancellation safe.
447    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
448    where Self: Sync {
449        Box::pin(async move {
450            let mut buf = Vec::default();
451            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
452                context: ErrorContext::WebSocket {
453                    source: Box::new(context),
454                },
455                kind,
456            })?;
457            if buf.len() <= WS_MAX_MESSAGE_SIZE {
458                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
459                    context: ErrorContext::DefaultImpl,
460                    kind: e.into(),
461                })?;
462            } else {
463                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
464                    context: ErrorContext::DefaultImpl,
465                    kind: e.into(),
466                })?;
467                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
468                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
469                        context: ErrorContext::DefaultImpl,
470                        kind: e.into(),
471                    })?;
472                }
473            }
474            Ok(())
475        })
476    }
477
478    #[cfg(feature = "tokio-tungstenite027")]
479    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
480    /// Writes a value of this type to a `tokio-tungstenite` websocket.
481    ///
482    /// # Cancellation safety
483    ///
484    /// The default implementation of this method is not cancellation safe.
485    fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
486    where Self: Sync {
487        Box::pin(async move {
488            let mut buf = Vec::default();
489            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
490                context: ErrorContext::WebSocket {
491                    source: Box::new(context),
492                },
493                kind,
494            })?;
495            if buf.len() <= WS_MAX_MESSAGE_SIZE {
496                sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
497                    context: ErrorContext::DefaultImpl,
498                    kind: e.into(),
499                })?;
500            } else {
501                sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
502                    context: ErrorContext::DefaultImpl,
503                    kind: e.into(),
504                })?;
505                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
506                    sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
507                        context: ErrorContext::DefaultImpl,
508                        kind: e.into(),
509                    })?;
510                }
511            }
512            Ok(())
513        })
514    }
515
516    #[cfg(feature = "tokio-tungstenite021")]
517    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
518    /// Reads a value of this type from a [`tungstenite021`] websocket.
519    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
520        let packet = websocket.read().map_err(|e| ReadError {
521            context: ErrorContext::DefaultImpl,
522            kind: e.into(),
523        })?;
524        match packet {
525            tungstenite021::Message::Text(data) => match data.chars().next() {
526                Some('m') => {
527                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
528                        context: ErrorContext::DefaultImpl,
529                        kind: e.into(),
530                    })?;
531                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
532                        context: ErrorContext::DefaultImpl,
533                        kind: e.into(),
534                    })?;
535                    while buf.len() < len {
536                        let packet = websocket.read().map_err(|e| ReadError {
537                            context: ErrorContext::DefaultImpl,
538                            kind: e.into(),
539                        })?;
540                        if let tungstenite021::Message::Binary(data) = packet {
541                            buf.extend_from_slice(&data);
542                        } else {
543                            return Err(ReadError {
544                                context: ErrorContext::DefaultImpl,
545                                kind: ReadErrorKind::MessageKind021(packet),
546                            })
547                        }
548                    }
549                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
550                        context: ErrorContext::WebSocket {
551                            source: Box::new(context),
552                        },
553                        kind,
554                    })
555                }
556                _ => return Err(ReadError {
557                    context: ErrorContext::DefaultImpl,
558                    kind: ReadErrorKind::WebSocketTextMessage024(data),
559                }),
560            },
561            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
562                context: ErrorContext::WebSocket {
563                    source: Box::new(context),
564                },
565                kind,
566            }),
567            _ => Err(ReadError {
568                context: ErrorContext::DefaultImpl,
569                kind: ReadErrorKind::MessageKind021(packet),
570            }),
571        }
572    }
573
574    #[cfg(feature = "tokio-tungstenite024")]
575    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
576    /// Reads a value of this type from a [`tungstenite024`] websocket.
577    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
578        let packet = websocket.read().map_err(|e| ReadError {
579            context: ErrorContext::DefaultImpl,
580            kind: e.into(),
581        })?;
582        match packet {
583            tungstenite024::Message::Text(data) => match data.chars().next() {
584                Some('m') => {
585                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
586                        context: ErrorContext::DefaultImpl,
587                        kind: e.into(),
588                    })?;
589                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
590                        context: ErrorContext::DefaultImpl,
591                        kind: e.into(),
592                    })?;
593                    while buf.len() < len {
594                        let packet = websocket.read().map_err(|e| ReadError {
595                            context: ErrorContext::DefaultImpl,
596                            kind: e.into(),
597                        })?;
598                        if let tungstenite024::Message::Binary(data) = packet {
599                            buf.extend_from_slice(&data);
600                        } else {
601                            return Err(ReadError {
602                                context: ErrorContext::DefaultImpl,
603                                kind: ReadErrorKind::MessageKind024(packet),
604                            })
605                        }
606                    }
607                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
608                        context: ErrorContext::WebSocket {
609                            source: Box::new(context),
610                        },
611                        kind,
612                    })
613                }
614                _ => return Err(ReadError {
615                    context: ErrorContext::DefaultImpl,
616                    kind: ReadErrorKind::WebSocketTextMessage024(data),
617                }),
618            },
619            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
620                context: ErrorContext::WebSocket {
621                    source: Box::new(context),
622                },
623                kind,
624            }),
625            _ => Err(ReadError {
626                context: ErrorContext::DefaultImpl,
627                kind: ReadErrorKind::MessageKind024(packet),
628            }),
629        }
630    }
631
632    #[cfg(feature = "tokio-tungstenite027")]
633    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
634    /// Reads a value of this type from a [`tungstenite027`] websocket.
635    fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
636        let packet = websocket.read().map_err(|e| ReadError {
637            context: ErrorContext::DefaultImpl,
638            kind: e.into(),
639        })?;
640        match packet {
641            tungstenite027::Message::Text(data) => match data.chars().next() {
642                Some('m') => {
643                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
644                        context: ErrorContext::DefaultImpl,
645                        kind: e.into(),
646                    })?;
647                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
648                        context: ErrorContext::DefaultImpl,
649                        kind: e.into(),
650                    })?;
651                    while buf.len() < len {
652                        let packet = websocket.read().map_err(|e| ReadError {
653                            context: ErrorContext::DefaultImpl,
654                            kind: e.into(),
655                        })?;
656                        if let tungstenite027::Message::Binary(data) = packet {
657                            buf.extend_from_slice(&data);
658                        } else {
659                            return Err(ReadError {
660                                context: ErrorContext::DefaultImpl,
661                                kind: ReadErrorKind::MessageKind027(packet),
662                            })
663                        }
664                    }
665                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
666                        context: ErrorContext::WebSocket {
667                            source: Box::new(context),
668                        },
669                        kind,
670                    })
671                }
672                _ => return Err(ReadError {
673                    context: ErrorContext::DefaultImpl,
674                    kind: ReadErrorKind::WebSocketTextMessage027(data),
675                }),
676            },
677            tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
678                context: ErrorContext::WebSocket {
679                    source: Box::new(context),
680                },
681                kind,
682            }),
683            _ => Err(ReadError {
684                context: ErrorContext::DefaultImpl,
685                kind: ReadErrorKind::MessageKind027(packet),
686            }),
687        }
688    }
689
690    #[cfg(feature = "tokio-tungstenite021")]
691    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
692    /// Writes a value of this type to a [`tungstenite021`] websocket.
693    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
694        let mut buf = Vec::default();
695        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
696            context: ErrorContext::WebSocket {
697                source: Box::new(context),
698            },
699            kind,
700        })?;
701        if buf.len() <= WS_MAX_MESSAGE_SIZE {
702            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
703                context: ErrorContext::DefaultImpl,
704                kind: e.into(),
705            })?;
706        } else {
707            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
708                context: ErrorContext::DefaultImpl,
709                kind: e.into(),
710            })?;
711            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
712                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
713                    context: ErrorContext::DefaultImpl,
714                    kind: e.into(),
715                })?;
716            }
717        }
718        websocket.flush().map_err(|e| WriteError {
719            context: ErrorContext::DefaultImpl,
720            kind: e.into(),
721        })?;
722        Ok(())
723    }
724
725    #[cfg(feature = "tokio-tungstenite024")]
726    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
727    /// Writes a value of this type to a [`tungstenite024`] websocket.
728    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
729        let mut buf = Vec::default();
730        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
731            context: ErrorContext::WebSocket {
732                source: Box::new(context),
733            },
734            kind,
735        })?;
736        if buf.len() <= WS_MAX_MESSAGE_SIZE {
737            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
738                context: ErrorContext::DefaultImpl,
739                kind: e.into(),
740            })?;
741        } else {
742            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
743                context: ErrorContext::DefaultImpl,
744                kind: e.into(),
745            })?;
746            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
747                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
748                    context: ErrorContext::DefaultImpl,
749                    kind: e.into(),
750                })?;
751            }
752        }
753        websocket.flush().map_err(|e| WriteError {
754            context: ErrorContext::DefaultImpl,
755            kind: e.into(),
756        })?;
757        Ok(())
758    }
759
760    #[cfg(feature = "tokio-tungstenite027")]
761    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
762    /// Writes a value of this type to a [`tungstenite027`] websocket.
763    fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
764        let mut buf = Vec::default();
765        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
766            context: ErrorContext::WebSocket {
767                source: Box::new(context),
768            },
769            kind,
770        })?;
771        if buf.len() <= WS_MAX_MESSAGE_SIZE {
772            websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
773                context: ErrorContext::DefaultImpl,
774                kind: e.into(),
775            })?;
776        } else {
777            websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
778                context: ErrorContext::DefaultImpl,
779                kind: e.into(),
780            })?;
781            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
782                websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
783                    context: ErrorContext::DefaultImpl,
784                    kind: e.into(),
785                })?;
786            }
787        }
788        websocket.flush().map_err(|e| WriteError {
789            context: ErrorContext::DefaultImpl,
790            kind: e.into(),
791        })?;
792        Ok(())
793    }
794
795    #[cfg(feature = "tokio-tungstenite021")]
796    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
797    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
798    ///
799    /// This can be used to get around drop glue issues that might arise with `read_ws`.
800    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
801        Box::pin(async move {
802            let value = Self::read_ws021(&mut stream).await?;
803            Ok((stream, value))
804        })
805    }
806
807    #[cfg(feature = "tokio-tungstenite024")]
808    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
809    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
810    ///
811    /// This can be used to get around drop glue issues that might arise with `read_ws`.
812    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
813        Box::pin(async move {
814            let value = Self::read_ws024(&mut stream).await?;
815            Ok((stream, value))
816        })
817    }
818
819    #[cfg(feature = "tokio-tungstenite027")]
820    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
821    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
822    ///
823    /// This can be used to get around drop glue issues that might arise with `read_ws`.
824    fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
825        Box::pin(async move {
826            let value = Self::read_ws027(&mut stream).await?;
827            Ok((stream, value))
828        })
829    }
830}
831
/// This trait allows restricting the acceptable length of collection types.
///
/// By default, types from this crate implementing this trait represent their length as a `u64`. If the maximum length is limited (e.g. using the `#[async_proto(max_len = ...)]` attribute when deriving [`Protocol`]), the length may be represented using a smaller integer type.
///
/// The trait offers the same four entry points as the read/write methods of [`Protocol`] (async and sync, read and write), each taking the limit as an additional `max_len` parameter.
// NOTE(review): how an implementation reacts to a length above `max_len` (error vs. truncation) is not visible from this declaration — confirm against the implementations.
pub trait LengthPrefixed: Protocol {
    /// Reads a value of this type from an async stream, limiting the length to the given value.
    ///
    /// The returned future borrows `stream` for `'a` and resolves to the value read or a [`ReadError`].
    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Writes a value of this type to an async sink, limiting the length to the given value.
    ///
    /// The returned future borrows both `self` and `sink` for `'a` and resolves to `()` or a [`WriteError`].
    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Reads a value of this type from a sync stream, limiting the length to the given value.
    ///
    /// This is the blocking counterpart of `read_length_prefixed`.
    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
    /// Writes a value of this type to a sync sink, limiting the length to the given value.
    ///
    /// This is the blocking counterpart of `write_length_prefixed`.
    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
}
845
/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
///
/// Useful for WebSocket connections where the message type per direction is always the same.
///
/// `request` may be any value implementing `IntoClientRequest`. Values of type `W` pushed into the returned sink are serialized and sent; incoming messages are decoded into values of type `R` and yielded by the returned stream.
///
/// Payloads longer than `WS_MAX_MESSAGE_SIZE` bytes are sent as a text message `m<len>` followed by binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each, and are reassembled the same way on the receiving side.
///
/// # Errors
///
/// Returns the error from `connect_async` if the connection cannot be established. Later failures are reported through the sink's error type and as `Err` items of the stream.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // the handshake response is not needed
    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
    let (sink, stream) = sock.split();
    Ok((
        // writing half: turn each `W` into one or more websocket messages
        sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|msg| {
            let mut buf = Vec::default();
            match msg.write_sync(&mut buf) {
                // both branches of the inner `Either` yield bare messages, which `.map(Ok)` wraps for the sink
                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
                    // small enough to be sent as a single binary message
                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
                } else {
                    // announce the total length, then send the payload in chunks
                    Either::Right(stream::iter(
                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
                        // collected so the resulting stream owns its messages instead of borrowing `buf`
                        .collect::<Vec<_>>()
                    ))
                }.map(Ok)),
                // serialization failed: emit a single error, keeping the original context as the source
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        }),
        // reading half: `state` is `Some((len, buf))` while a chunked payload is being reassembled, `None` otherwise
        stream.scan(None, |state, res| {
            // Handles one incoming message and returns a stream of zero or one decoded values.
            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
                let packet = res.map_err(|e| ReadError {
                    context: ErrorContext::WebSocketStream,
                    kind: e.into(),
                })?;
                Ok(if let Some((len, buf)) = state {
                    // in the middle of a chunked payload: only binary messages are acceptable
                    if let tungstenite021::Message::Binary(data) = packet {
                        buf.extend_from_slice(&data);
                    } else {
                        // note that `state` is left untouched on this path
                        return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind021(packet),
                        })
                    }
                    if buf.len() >= *len {
                        // payload complete: take the buffer, leave chunked mode, and decode
                        let buf = mem::take(buf);
                        *state = None;
                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?)))
                    } else {
                        // more chunks needed, nothing to yield yet
                        Either::Left(stream::empty())
                    }
                } else {
                    match packet {
                        tungstenite021::Message::Text(data) => match data.chars().next() {
                            // `m<len>` announces a chunked payload of `len` bytes
                            Some('m') => {
                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                *state = Some((len, buf));
                                Either::Left(stream::empty())
                            }
                            // NOTE(review): this is the 0.21 code path but it uses the `024`-suffixed variant; presumably that variant is shared between both versions — confirm against the `ReadErrorKind` definition.
                            _ => return Err(ReadError {
                                context: ErrorContext::DefaultImpl,
                                kind: ReadErrorKind::WebSocketTextMessage024(data),
                            }),
                        },
                        // a lone binary message holds one complete value
                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?))),
                        // any other kind of message is rejected
                        _ => return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind021(packet),
                        }),
                    }
                })
            }

            // always `Some`, so `scan` itself never ends the stream; failures are passed on as `Err` items
            future::ready(Some(scanner(state, res)))
        }).try_flatten(),
    ))
}
943
/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
///
/// Useful for WebSocket connections where the message type per direction is always the same.
///
/// `request` may be any value implementing `IntoClientRequest`. Values of type `W` pushed into the returned sink are serialized and sent; incoming messages are decoded into values of type `R` and yielded by the returned stream.
///
/// Payloads longer than `WS_MAX_MESSAGE_SIZE` bytes are sent as a text message `m<len>` followed by binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each, and are reassembled the same way on the receiving side.
///
/// # Errors
///
/// Returns the error from `connect_async` if the connection cannot be established. Later failures are reported through the sink's error type and as `Err` items of the stream.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // the handshake response is not needed
    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
    let (sink, stream) = sock.split();
    Ok((
        // writing half: turn each `W` into one or more websocket messages
        sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|msg| {
            let mut buf = Vec::default();
            match msg.write_sync(&mut buf) {
                // both branches of the inner `Either` yield bare messages, which `.map(Ok)` wraps for the sink
                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
                    // small enough to be sent as a single binary message
                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
                } else {
                    // announce the total length, then send the payload in chunks
                    Either::Right(stream::iter(
                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
                        // collected so the resulting stream owns its messages instead of borrowing `buf`
                        .collect::<Vec<_>>()
                    ))
                }.map(Ok)),
                // serialization failed: emit a single error, keeping the original context as the source
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        }),
        // reading half: `state` is `Some((len, buf))` while a chunked payload is being reassembled, `None` otherwise
        stream.scan(None, |state, res| {
            // Handles one incoming message and returns a stream of zero or one decoded values.
            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
                let packet = res.map_err(|e| ReadError {
                    context: ErrorContext::WebSocketStream,
                    kind: e.into(),
                })?;
                Ok(if let Some((len, buf)) = state {
                    // in the middle of a chunked payload: only binary messages are acceptable
                    if let tungstenite024::Message::Binary(data) = packet {
                        buf.extend_from_slice(&data);
                    } else {
                        // note that `state` is left untouched on this path
                        return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind024(packet),
                        })
                    }
                    if buf.len() >= *len {
                        // payload complete: take the buffer, leave chunked mode, and decode
                        let buf = mem::take(buf);
                        *state = None;
                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?)))
                    } else {
                        // more chunks needed, nothing to yield yet
                        Either::Left(stream::empty())
                    }
                } else {
                    match packet {
                        tungstenite024::Message::Text(data) => match data.chars().next() {
                            // `m<len>` announces a chunked payload of `len` bytes
                            Some('m') => {
                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                *state = Some((len, buf));
                                Either::Left(stream::empty())
                            }
                            // any other text message is not part of the protocol
                            _ => return Err(ReadError {
                                context: ErrorContext::DefaultImpl,
                                kind: ReadErrorKind::WebSocketTextMessage024(data),
                            }),
                        },
                        // a lone binary message holds one complete value
                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?))),
                        // any other kind of message is rejected
                        _ => return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind024(packet),
                        }),
                    }
                })
            }

            // always `Some`, so `scan` itself never ends the stream; failures are passed on as `Err` items
            future::ready(Some(scanner(state, res)))
        }).try_flatten(),
    ))
}
1041
/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
///
/// Useful for WebSocket connections where the message type per direction is always the same.
///
/// `request` may be any value implementing `IntoClientRequest`. Values of type `W` pushed into the returned sink are serialized and sent; incoming messages are decoded into values of type `R` and yielded by the returned stream.
///
/// Payloads longer than `WS_MAX_MESSAGE_SIZE` bytes are sent as a text message `m<len>` followed by binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each, and are reassembled the same way on the receiving side.
///
/// # Errors
///
/// Returns the error from `connect_async` if the connection cannot be established. Later failures are reported through the sink's error type and as `Err` items of the stream.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // the handshake response is not needed
    let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
    let (sink, stream) = sock.split();
    Ok((
        // writing half: turn each `W` into one or more websocket messages
        sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|msg| {
            let mut buf = Vec::default();
            match msg.write_sync(&mut buf) {
                // both branches of the inner `Either` yield bare messages, which `.map(Ok)` wraps for the sink
                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
                    // small enough to be sent as a single binary message
                    Either::Left(stream::once(future::ready(tungstenite027::Message::binary(buf))))
                } else {
                    // announce the total length, then send the payload in chunks, each copied into its own `Bytes`
                    Either::Right(stream::iter(
                        iter::once(tungstenite027::Message::text(format!("m{}", buf.len())))
                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))))
                        // collected so the resulting stream owns its messages instead of borrowing `buf`
                        .collect::<Vec<_>>()
                    ))
                }.map(Ok)),
                // serialization failed: emit a single error, keeping the original context as the source
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        }),
        // reading half: `state` is `Some((len, buf))` while a chunked payload is being reassembled, `None` otherwise
        stream.scan(None, |state, res| {
            // Handles one incoming message and returns a stream of zero or one decoded values.
            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
                let packet = res.map_err(|e| ReadError {
                    context: ErrorContext::WebSocketStream,
                    kind: e.into(),
                })?;
                Ok(if let Some((len, buf)) = state {
                    // in the middle of a chunked payload: only binary messages are acceptable
                    if let tungstenite027::Message::Binary(data) = packet {
                        buf.extend_from_slice(&data);
                    } else {
                        // note that `state` is left untouched on this path
                        return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind027(packet),
                        })
                    }
                    if buf.len() >= *len {
                        // payload complete: take the buffer, leave chunked mode, and decode
                        let buf = mem::take(buf);
                        *state = None;
                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?)))
                    } else {
                        // more chunks needed, nothing to yield yet
                        Either::Left(stream::empty())
                    }
                } else {
                    match packet {
                        tungstenite027::Message::Text(data) => match data.chars().next() {
                            // `m<len>` announces a chunked payload of `len` bytes
                            Some('m') => {
                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                *state = Some((len, buf));
                                Either::Left(stream::empty())
                            }
                            // any other text message is not part of the protocol
                            _ => return Err(ReadError {
                                context: ErrorContext::DefaultImpl,
                                kind: ReadErrorKind::WebSocketTextMessage027(data),
                            }),
                        },
                        // a lone binary message holds one complete value
                        tungstenite027::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?))),
                        // any other kind of message is rejected
                        _ => return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind027(packet),
                        }),
                    }
                })
            }

            // always `Some`, so `scan` itself never ends the stream; failures are passed on as `Err` items
            future::ready(Some(scanner(state, res)))
        }).try_flatten(),
    ))
}