Skip to main content

async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bitvec`](https://docs.rs/bitvec): [`BitVec<u8, Lsb0>`](https://docs.rs/bitvec/latest/bitvec/vec/struct.BitVec.html)
17//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
18//! * [`bytesize`](https://docs.rs/bytesize): [`ByteSize`](https://docs.rs/bytesize/latest/bytesize/struct.ByteSize.html)
19//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
20//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
21//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
22//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
23//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
24//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
25//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
26//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
27//! * [`nonempty-collections`](https://docs.rs/nonempty-collections): [`NEMap`](https://docs.rs/nonempty-collections/latest/nonempty_collections/map/struct.NEMap.html), [`NESet`](https://docs.rs/nonempty-collections/latest/nonempty_collections/set/struct.NESet.html), and [`NEVec`](https://docs.rs/nonempty-collections/latest/nonempty_collections/vector/struct.NEVec.html)
28//! * [`os_info`](https://docs.rs/os_info): [`Type`](https://docs.rs/os_info/latest/os_info/enum.Type.html) and [`Version`](https://docs.rs/os_info/latest/os_info/enum.Version.html)
29//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
30//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
31//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
32//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
33//! * [`hematite-nbt`](https://docs.rs/hematite-nbt): [`Blob`](https://docs.rs/hematite-nbt/latest/nbt/struct.Blob.html)
34//! * [`url`](https://docs.rs/url): [`Url`](https://docs.rs/url/latest/url/struct.Url.html)
35//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
36//!
37//! Additionally, this crate offers optional dependencies on the `tokio-tungstenite` crate to add convenience methods for reading/writing [`Protocol`] types from/to its websockets. The following versions are supported:
38//!
39//! * The latest release (currently [`tokio-tungstenite` 0.28](https://docs.rs/tokio-tungstenite/0.28), feature flag `tokio-tungstenite028`)
40//! * The version used by [the `master` branch of `rocket_ws` on GitHub](https://github.com/rwf2/Rocket/tree/master/contrib/ws) (currently [`tokio-tungstenite` 0.24](https://docs.rs/tokio-tungstenite/0.24), feature flag `tokio-tungstenite024`)
41//! * The version used by [the latest `rocket_ws` crates.io release](https://docs.rs/rocket_ws) (currently [`tokio-tungstenite` 0.21](https://docs.rs/tokio-tungstenite/0.21), feature flag `tokio-tungstenite021`)
42
43use {
44    std::{
45        future::Future,
46        io::{
47            self,
48            prelude::*,
49        },
50        pin::Pin,
51    },
52    tokio::io::{
53        AsyncRead,
54        AsyncWrite,
55    },
56};
57#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))] use {
58    std::{
59        iter,
60        mem,
61    },
62    fallible_collections::FallibleVec,
63    futures::{
64        Sink,
65        SinkExt as _,
66        future::{
67            self,
68            Either,
69        },
70        stream::{
71            self,
72            Stream,
73            StreamExt as _,
74            TryStreamExt as _,
75        },
76    },
77};
78#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
79#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
80#[cfg(feature = "tokio-tungstenite028")] use tokio_tungstenite028::tungstenite as tungstenite028;
81pub use {
82    async_proto_derive::{
83        Protocol,
84        bitflags,
85    },
86    crate::error::*,
87};
88#[doc(hidden)] pub use tokio; // used in proc macro
89
90mod error;
91mod impls;
92
93/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
94#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
95
96/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
97pub trait Protocol: Sized {
98    /// Reads a value of this type from an async stream.
99    ///
100    /// # Cancellation safety
101    ///
102    /// Implementations of this method are generally not cancellation safe.
103    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
104    /// Writes a value of this type to an async sink.
105    ///
106    /// # Cancellation safety
107    ///
108    /// Implementations of this method are generally not cancellation safe.
109    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
110    /// Reads a value of this type from a sync stream.
111    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
112    /// Writes a value of this type to a sync sink.
113    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
114
115    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
116    ///
117    /// This can be used to get around drop glue issues that might arise with `read`.
118    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
119        Box::pin(async move {
120            let value = Self::read(&mut stream).await?;
121            Ok((stream, value))
122        })
123    }
124
125    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
126    ///
127    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
128    ///
129    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
130    ///
131    /// # Example
132    ///
133    /// ```
134    /// use {
135    ///     std::{
136    ///         io,
137    ///         net::TcpStream,
138    ///     },
139    ///     async_proto::Protocol,
140    /// };
141    ///
142    /// struct Client {
143    ///     tcp_stream: TcpStream,
144    ///     buf: Vec<u8>,
145    /// }
146    ///
147    /// impl Client {
148    ///     fn new(tcp_stream: TcpStream) -> Self {
149    ///         Self {
150    ///             tcp_stream,
151    ///             buf: Vec::default(),
152    ///         }
153    ///     }
154    ///
155    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
156    ///         self.tcp_stream.set_nonblocking(true)?;
157    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
158    ///     }
159    ///
160    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
161    ///         self.tcp_stream.set_nonblocking(false)?;
162    ///         msg.write_sync(&mut self.tcp_stream)?;
163    ///         Ok(())
164    ///     }
165    /// }
166    /// ```
167    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
168        let mut temp_buf = vec![0; 8];
169        loop {
170            let mut slice = &mut &**buf;
171            match Self::read_sync(&mut slice) {
172                Ok(value) => {
173                    let value_len = slice.len();
174                    buf.drain(..buf.len() - value_len);
175                    return Ok(Some(value))
176                }
177                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
178                Err(e) => return Err(e),
179            }
180            match stream.read(&mut temp_buf) {
181                Ok(0) => return Err(ReadError {
182                    context: ErrorContext::DefaultImpl,
183                    kind: ReadErrorKind::EndOfStream,
184                }),
185                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
186                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
187                Err(e) => return Err(ReadError {
188                    context: ErrorContext::DefaultImpl,
189                    kind: e.into(),
190                }),
191            }
192        }
193    }
194
195    #[cfg(feature = "tokio-tungstenite021")]
196    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
197    /// Reads a value of this type from a `tokio-tungstenite` websocket.
198    ///
199    /// # Cancellation safety
200    ///
201    /// The default implementation of this method is not cancellation safe.
202    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
203        Box::pin(async move {
204            let packet = stream.try_next().await.map_err(|e| ReadError {
205                context: ErrorContext::DefaultImpl,
206                kind: e.into(),
207            })?.ok_or_else(|| ReadError {
208                context: ErrorContext::DefaultImpl,
209                kind: ReadErrorKind::EndOfStream,
210            })?;
211            match packet {
212                tungstenite021::Message::Text(data) => match data.chars().next() {
213                    Some('m') => {
214                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
215                            context: ErrorContext::DefaultImpl,
216                            kind: e.into(),
217                        })?;
218                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
219                            context: ErrorContext::DefaultImpl,
220                            kind: e.into(),
221                        })?;
222                        while buf.len() < len {
223                            let packet = stream.try_next().await.map_err(|e| ReadError {
224                                context: ErrorContext::DefaultImpl,
225                                kind: e.into(),
226                            })?.ok_or_else(|| ReadError {
227                                context: ErrorContext::DefaultImpl,
228                                kind: ReadErrorKind::EndOfStream,
229                            })?;
230                            if let tungstenite021::Message::Binary(data) = packet {
231                                buf.extend_from_slice(&data);
232                            } else {
233                                return Err(ReadError {
234                                    context: ErrorContext::DefaultImpl,
235                                    kind: ReadErrorKind::MessageKind021(packet),
236                                })
237                            }
238                        }
239                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
240                            context: ErrorContext::WebSocket {
241                                source: Box::new(context),
242                            },
243                            kind,
244                        })
245                    }
246                    _ => Err(ReadError {
247                        context: ErrorContext::DefaultImpl,
248                        kind: ReadErrorKind::WebSocketTextMessage024(data),
249                    }),
250                },
251                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
252                    context: ErrorContext::WebSocket {
253                        source: Box::new(context),
254                    },
255                    kind,
256                }),
257                _ => Err(ReadError {
258                    context: ErrorContext::DefaultImpl,
259                    kind: ReadErrorKind::MessageKind021(packet),
260                }),
261            }
262        })
263    }
264
265    #[cfg(feature = "tokio-tungstenite024")]
266    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
267    /// Reads a value of this type from a `tokio-tungstenite` websocket.
268    ///
269    /// # Cancellation safety
270    ///
271    /// The default implementation of this method is not cancellation safe.
272    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
273        Box::pin(async move {
274            let packet = stream.try_next().await.map_err(|e| ReadError {
275                context: ErrorContext::DefaultImpl,
276                kind: e.into(),
277            })?.ok_or_else(|| ReadError {
278                context: ErrorContext::DefaultImpl,
279                kind: ReadErrorKind::EndOfStream,
280            })?;
281            match packet {
282                tungstenite024::Message::Text(data) => match data.chars().next() {
283                    Some('m') => {
284                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
285                            context: ErrorContext::DefaultImpl,
286                            kind: e.into(),
287                        })?;
288                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
289                            context: ErrorContext::DefaultImpl,
290                            kind: e.into(),
291                        })?;
292                        while buf.len() < len {
293                            let packet = stream.try_next().await.map_err(|e| ReadError {
294                                context: ErrorContext::DefaultImpl,
295                                kind: e.into(),
296                            })?.ok_or_else(|| ReadError {
297                                context: ErrorContext::DefaultImpl,
298                                kind: ReadErrorKind::EndOfStream,
299                            })?;
300                            if let tungstenite024::Message::Binary(data) = packet {
301                                buf.extend_from_slice(&data);
302                            } else {
303                                return Err(ReadError {
304                                    context: ErrorContext::DefaultImpl,
305                                    kind: ReadErrorKind::MessageKind024(packet),
306                                })
307                            }
308                        }
309                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
310                            context: ErrorContext::WebSocket {
311                                source: Box::new(context),
312                            },
313                            kind,
314                        })
315                    }
316                    _ => Err(ReadError {
317                        context: ErrorContext::DefaultImpl,
318                        kind: ReadErrorKind::WebSocketTextMessage024(data),
319                    }),
320                },
321                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
322                    context: ErrorContext::WebSocket {
323                        source: Box::new(context),
324                    },
325                    kind,
326                }),
327                _ => Err(ReadError {
328                    context: ErrorContext::DefaultImpl,
329                    kind: ReadErrorKind::MessageKind024(packet),
330                }),
331            }
332        })
333    }
334
335    #[cfg(feature = "tokio-tungstenite028")]
336    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
337    /// Reads a value of this type from a `tokio-tungstenite` websocket.
338    ///
339    /// # Cancellation safety
340    ///
341    /// The default implementation of this method is not cancellation safe.
342    fn read_ws028<'a, R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
343        Box::pin(async move {
344            let packet = stream.try_next().await.map_err(|e| ReadError {
345                context: ErrorContext::DefaultImpl,
346                kind: e.into(),
347            })?.ok_or_else(|| ReadError {
348                context: ErrorContext::DefaultImpl,
349                kind: ReadErrorKind::EndOfStream,
350            })?;
351            match packet {
352                tungstenite028::Message::Text(data) => match data.chars().next() {
353                    Some('m') => {
354                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
355                            context: ErrorContext::DefaultImpl,
356                            kind: e.into(),
357                        })?;
358                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
359                            context: ErrorContext::DefaultImpl,
360                            kind: e.into(),
361                        })?;
362                        while buf.len() < len {
363                            let packet = stream.try_next().await.map_err(|e| ReadError {
364                                context: ErrorContext::DefaultImpl,
365                                kind: e.into(),
366                            })?.ok_or_else(|| ReadError {
367                                context: ErrorContext::DefaultImpl,
368                                kind: ReadErrorKind::EndOfStream,
369                            })?;
370                            if let tungstenite028::Message::Binary(data) = packet {
371                                buf.extend_from_slice(&data);
372                            } else {
373                                return Err(ReadError {
374                                    context: ErrorContext::DefaultImpl,
375                                    kind: ReadErrorKind::MessageKind028(packet),
376                                })
377                            }
378                        }
379                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
380                            context: ErrorContext::WebSocket {
381                                source: Box::new(context),
382                            },
383                            kind,
384                        })
385                    }
386                    _ => Err(ReadError {
387                        context: ErrorContext::DefaultImpl,
388                        kind: ReadErrorKind::WebSocketTextMessage028(data),
389                    }),
390                },
391                tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
392                    context: ErrorContext::WebSocket {
393                        source: Box::new(context),
394                    },
395                    kind,
396                }),
397                _ => Err(ReadError {
398                    context: ErrorContext::DefaultImpl,
399                    kind: ReadErrorKind::MessageKind028(packet),
400                }),
401            }
402        })
403    }
404
405    #[cfg(feature = "tokio-tungstenite021")]
406    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
407    /// Writes a value of this type to a `tokio-tungstenite` websocket.
408    ///
409    /// # Cancellation safety
410    ///
411    /// The default implementation of this method is not cancellation safe.
412    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
413    where Self: Sync {
414        Box::pin(async move {
415            let mut buf = Vec::default();
416            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
417                context: ErrorContext::WebSocket {
418                    source: Box::new(context),
419                },
420                kind,
421            })?;
422            if buf.len() <= WS_MAX_MESSAGE_SIZE {
423                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
424                    context: ErrorContext::DefaultImpl,
425                    kind: e.into(),
426                })?;
427            } else {
428                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
429                    context: ErrorContext::DefaultImpl,
430                    kind: e.into(),
431                })?;
432                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
433                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
434                        context: ErrorContext::DefaultImpl,
435                        kind: e.into(),
436                    })?;
437                }
438            }
439            Ok(())
440        })
441    }
442
443    #[cfg(feature = "tokio-tungstenite024")]
444    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
445    /// Writes a value of this type to a `tokio-tungstenite` websocket.
446    ///
447    /// # Cancellation safety
448    ///
449    /// The default implementation of this method is not cancellation safe.
450    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
451    where Self: Sync {
452        Box::pin(async move {
453            let mut buf = Vec::default();
454            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
455                context: ErrorContext::WebSocket {
456                    source: Box::new(context),
457                },
458                kind,
459            })?;
460            if buf.len() <= WS_MAX_MESSAGE_SIZE {
461                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
462                    context: ErrorContext::DefaultImpl,
463                    kind: e.into(),
464                })?;
465            } else {
466                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
467                    context: ErrorContext::DefaultImpl,
468                    kind: e.into(),
469                })?;
470                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
471                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
472                        context: ErrorContext::DefaultImpl,
473                        kind: e.into(),
474                    })?;
475                }
476            }
477            Ok(())
478        })
479    }
480
481    #[cfg(feature = "tokio-tungstenite028")]
482    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
483    /// Writes a value of this type to a `tokio-tungstenite` websocket.
484    ///
485    /// # Cancellation safety
486    ///
487    /// The default implementation of this method is not cancellation safe.
488    fn write_ws028<'a, W: Sink<tungstenite028::Message, Error = tungstenite028::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
489    where Self: Sync {
490        Box::pin(async move {
491            let mut buf = Vec::default();
492            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
493                context: ErrorContext::WebSocket {
494                    source: Box::new(context),
495                },
496                kind,
497            })?;
498            if buf.len() <= WS_MAX_MESSAGE_SIZE {
499                sink.send(tungstenite028::Message::binary(buf)).await.map_err(|e| WriteError {
500                    context: ErrorContext::DefaultImpl,
501                    kind: e.into(),
502                })?;
503            } else {
504                sink.send(tungstenite028::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
505                    context: ErrorContext::DefaultImpl,
506                    kind: e.into(),
507                })?;
508                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
509                    sink.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
510                        context: ErrorContext::DefaultImpl,
511                        kind: e.into(),
512                    })?;
513                }
514            }
515            Ok(())
516        })
517    }
518
519    #[cfg(feature = "tokio-tungstenite021")]
520    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
521    /// Reads a value of this type from a [`tungstenite021`] websocket.
522    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
523        let packet = websocket.read().map_err(|e| ReadError {
524            context: ErrorContext::DefaultImpl,
525            kind: e.into(),
526        })?;
527        match packet {
528            tungstenite021::Message::Text(data) => match data.chars().next() {
529                Some('m') => {
530                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
531                        context: ErrorContext::DefaultImpl,
532                        kind: e.into(),
533                    })?;
534                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
535                        context: ErrorContext::DefaultImpl,
536                        kind: e.into(),
537                    })?;
538                    while buf.len() < len {
539                        let packet = websocket.read().map_err(|e| ReadError {
540                            context: ErrorContext::DefaultImpl,
541                            kind: e.into(),
542                        })?;
543                        if let tungstenite021::Message::Binary(data) = packet {
544                            buf.extend_from_slice(&data);
545                        } else {
546                            return Err(ReadError {
547                                context: ErrorContext::DefaultImpl,
548                                kind: ReadErrorKind::MessageKind021(packet),
549                            })
550                        }
551                    }
552                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
553                        context: ErrorContext::WebSocket {
554                            source: Box::new(context),
555                        },
556                        kind,
557                    })
558                }
559                _ => return Err(ReadError {
560                    context: ErrorContext::DefaultImpl,
561                    kind: ReadErrorKind::WebSocketTextMessage024(data),
562                }),
563            },
564            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
565                context: ErrorContext::WebSocket {
566                    source: Box::new(context),
567                },
568                kind,
569            }),
570            _ => Err(ReadError {
571                context: ErrorContext::DefaultImpl,
572                kind: ReadErrorKind::MessageKind021(packet),
573            }),
574        }
575    }
576
577    #[cfg(feature = "tokio-tungstenite024")]
578    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
579    /// Reads a value of this type from a [`tungstenite024`] websocket.
580    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
581        let packet = websocket.read().map_err(|e| ReadError {
582            context: ErrorContext::DefaultImpl,
583            kind: e.into(),
584        })?;
585        match packet {
586            tungstenite024::Message::Text(data) => match data.chars().next() {
587                Some('m') => {
588                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
589                        context: ErrorContext::DefaultImpl,
590                        kind: e.into(),
591                    })?;
592                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
593                        context: ErrorContext::DefaultImpl,
594                        kind: e.into(),
595                    })?;
596                    while buf.len() < len {
597                        let packet = websocket.read().map_err(|e| ReadError {
598                            context: ErrorContext::DefaultImpl,
599                            kind: e.into(),
600                        })?;
601                        if let tungstenite024::Message::Binary(data) = packet {
602                            buf.extend_from_slice(&data);
603                        } else {
604                            return Err(ReadError {
605                                context: ErrorContext::DefaultImpl,
606                                kind: ReadErrorKind::MessageKind024(packet),
607                            })
608                        }
609                    }
610                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
611                        context: ErrorContext::WebSocket {
612                            source: Box::new(context),
613                        },
614                        kind,
615                    })
616                }
617                _ => return Err(ReadError {
618                    context: ErrorContext::DefaultImpl,
619                    kind: ReadErrorKind::WebSocketTextMessage024(data),
620                }),
621            },
622            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
623                context: ErrorContext::WebSocket {
624                    source: Box::new(context),
625                },
626                kind,
627            }),
628            _ => Err(ReadError {
629                context: ErrorContext::DefaultImpl,
630                kind: ReadErrorKind::MessageKind024(packet),
631            }),
632        }
633    }
634
635    #[cfg(feature = "tokio-tungstenite028")]
636    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
637    /// Reads a value of this type from a [`tungstenite028`] websocket.
638    fn read_ws_sync028(websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
639        let packet = websocket.read().map_err(|e| ReadError {
640            context: ErrorContext::DefaultImpl,
641            kind: e.into(),
642        })?;
643        match packet {
644            tungstenite028::Message::Text(data) => match data.chars().next() {
645                Some('m') => {
646                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
647                        context: ErrorContext::DefaultImpl,
648                        kind: e.into(),
649                    })?;
650                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
651                        context: ErrorContext::DefaultImpl,
652                        kind: e.into(),
653                    })?;
654                    while buf.len() < len {
655                        let packet = websocket.read().map_err(|e| ReadError {
656                            context: ErrorContext::DefaultImpl,
657                            kind: e.into(),
658                        })?;
659                        if let tungstenite028::Message::Binary(data) = packet {
660                            buf.extend_from_slice(&data);
661                        } else {
662                            return Err(ReadError {
663                                context: ErrorContext::DefaultImpl,
664                                kind: ReadErrorKind::MessageKind028(packet),
665                            })
666                        }
667                    }
668                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
669                        context: ErrorContext::WebSocket {
670                            source: Box::new(context),
671                        },
672                        kind,
673                    })
674                }
675                _ => return Err(ReadError {
676                    context: ErrorContext::DefaultImpl,
677                    kind: ReadErrorKind::WebSocketTextMessage028(data),
678                }),
679            },
680            tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
681                context: ErrorContext::WebSocket {
682                    source: Box::new(context),
683                },
684                kind,
685            }),
686            _ => Err(ReadError {
687                context: ErrorContext::DefaultImpl,
688                kind: ReadErrorKind::MessageKind028(packet),
689            }),
690        }
691    }
692
693    #[cfg(feature = "tokio-tungstenite021")]
694    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
695    /// Writes a value of this type to a [`tungstenite021`] websocket.
696    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
697        let mut buf = Vec::default();
698        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
699            context: ErrorContext::WebSocket {
700                source: Box::new(context),
701            },
702            kind,
703        })?;
704        if buf.len() <= WS_MAX_MESSAGE_SIZE {
705            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
706                context: ErrorContext::DefaultImpl,
707                kind: e.into(),
708            })?;
709        } else {
710            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
711                context: ErrorContext::DefaultImpl,
712                kind: e.into(),
713            })?;
714            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
715                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
716                    context: ErrorContext::DefaultImpl,
717                    kind: e.into(),
718                })?;
719            }
720        }
721        websocket.flush().map_err(|e| WriteError {
722            context: ErrorContext::DefaultImpl,
723            kind: e.into(),
724        })?;
725        Ok(())
726    }
727
728    #[cfg(feature = "tokio-tungstenite024")]
729    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
730    /// Writes a value of this type to a [`tungstenite024`] websocket.
731    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
732        let mut buf = Vec::default();
733        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
734            context: ErrorContext::WebSocket {
735                source: Box::new(context),
736            },
737            kind,
738        })?;
739        if buf.len() <= WS_MAX_MESSAGE_SIZE {
740            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
741                context: ErrorContext::DefaultImpl,
742                kind: e.into(),
743            })?;
744        } else {
745            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
746                context: ErrorContext::DefaultImpl,
747                kind: e.into(),
748            })?;
749            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
750                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
751                    context: ErrorContext::DefaultImpl,
752                    kind: e.into(),
753                })?;
754            }
755        }
756        websocket.flush().map_err(|e| WriteError {
757            context: ErrorContext::DefaultImpl,
758            kind: e.into(),
759        })?;
760        Ok(())
761    }
762
763    #[cfg(feature = "tokio-tungstenite028")]
764    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
765    /// Writes a value of this type to a [`tungstenite028`] websocket.
766    fn write_ws_sync028(&self, websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
767        let mut buf = Vec::default();
768        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
769            context: ErrorContext::WebSocket {
770                source: Box::new(context),
771            },
772            kind,
773        })?;
774        if buf.len() <= WS_MAX_MESSAGE_SIZE {
775            websocket.send(tungstenite028::Message::binary(buf)).map_err(|e| WriteError {
776                context: ErrorContext::DefaultImpl,
777                kind: e.into(),
778            })?;
779        } else {
780            websocket.send(tungstenite028::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
781                context: ErrorContext::DefaultImpl,
782                kind: e.into(),
783            })?;
784            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
785                websocket.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
786                    context: ErrorContext::DefaultImpl,
787                    kind: e.into(),
788                })?;
789            }
790        }
791        websocket.flush().map_err(|e| WriteError {
792            context: ErrorContext::DefaultImpl,
793            kind: e.into(),
794        })?;
795        Ok(())
796    }
797
798    #[cfg(feature = "tokio-tungstenite021")]
799    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
800    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
801    ///
802    /// This can be used to get around drop glue issues that might arise with `read_ws`.
803    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
804        Box::pin(async move {
805            let value = Self::read_ws021(&mut stream).await?;
806            Ok((stream, value))
807        })
808    }
809
810    #[cfg(feature = "tokio-tungstenite024")]
811    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
812    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
813    ///
814    /// This can be used to get around drop glue issues that might arise with `read_ws`.
815    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
816        Box::pin(async move {
817            let value = Self::read_ws024(&mut stream).await?;
818            Ok((stream, value))
819        })
820    }
821
822    #[cfg(feature = "tokio-tungstenite028")]
823    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
824    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
825    ///
826    /// This can be used to get around drop glue issues that might arise with `read_ws`.
827    fn read_ws_owned028<R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
828        Box::pin(async move {
829            let value = Self::read_ws028(&mut stream).await?;
830            Ok((stream, value))
831        })
832    }
833}
834
835/// This trait allows restricting the acceptable length of collection types.
836///
837/// By default, types from this crate implementing this trait represent their length as a `u64`. If the maximum length is limited (e.g. using the `#[async_proto(max_len = ...)]` attribute when deriving [`Protocol`]), the length may be represented using a smaller integer type.
838pub trait LengthPrefixed: Protocol {
839    /// Reads a value of this type from an async stream, limiting the length to the given value.
840    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
841    /// Writes a value of this type to an async sink, limiting the length to the given value.
842    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
843    /// Reads a value of this type from a sync stream, limiting the length to the given value.
844    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
845    /// Writes a value of this type to a sync sink, limiting the length to the given value.
846    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
847}
848
849/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
850///
851/// Useful for WebSocket connections where the message type per direction is always the same.
852#[cfg(feature = "tokio-tungstenite021")]
853#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
854pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
855    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
856    let (sink, stream) = sock.split();
857    Ok((
858        sink.sink_map_err(|e| WriteError {
859            context: ErrorContext::WebSocketSink,
860            kind: e.into(),
861        }).with_flat_map::<W, _, _>(|msg| {
862            let mut buf = Vec::default();
863            match msg.write_sync(&mut buf) {
864                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
865                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
866                } else {
867                    Either::Right(stream::iter(
868                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
869                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
870                        .collect::<Vec<_>>()
871                    ))
872                }.map(Ok)),
873                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
874                    context: ErrorContext::WebSocket {
875                        source: Box::new(context),
876                    },
877                    kind,
878                }))),
879            }
880        }),
881        stream.scan(None, |state, res| {
882            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
883                let packet = res.map_err(|e| ReadError {
884                    context: ErrorContext::WebSocketStream,
885                    kind: e.into(),
886                })?;
887                Ok(if let Some((len, buf)) = state {
888                    if let tungstenite021::Message::Binary(data) = packet {
889                        buf.extend_from_slice(&data);
890                    } else {
891                        return Err(ReadError {
892                            context: ErrorContext::DefaultImpl,
893                            kind: ReadErrorKind::MessageKind021(packet),
894                        })
895                    }
896                    if buf.len() >= *len {
897                        let buf = mem::take(buf);
898                        *state = None;
899                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
900                            context: ErrorContext::WebSocket {
901                                source: Box::new(context),
902                            },
903                            kind,
904                        })?)))
905                    } else {
906                        Either::Left(stream::empty())
907                    }
908                } else {
909                    match packet {
910                        tungstenite021::Message::Text(data) => match data.chars().next() {
911                            Some('m') => {
912                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
913                                    context: ErrorContext::DefaultImpl,
914                                    kind: e.into(),
915                                })?;
916                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
917                                    context: ErrorContext::DefaultImpl,
918                                    kind: e.into(),
919                                })?;
920                                *state = Some((len, buf));
921                                Either::Left(stream::empty())
922                            }
923                            _ => return Err(ReadError {
924                                context: ErrorContext::DefaultImpl,
925                                kind: ReadErrorKind::WebSocketTextMessage024(data),
926                            }),
927                        },
928                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
929                            context: ErrorContext::WebSocket {
930                                source: Box::new(context),
931                            },
932                            kind,
933                        })?))),
934                        _ => return Err(ReadError {
935                            context: ErrorContext::DefaultImpl,
936                            kind: ReadErrorKind::MessageKind021(packet),
937                        }),
938                    }
939                })
940            }
941
942            future::ready(Some(scanner(state, res)))
943        }).try_flatten(),
944    ))
945}
946
947/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
948///
949/// Useful for WebSocket connections where the message type per direction is always the same.
950#[cfg(feature = "tokio-tungstenite024")]
951#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
952pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
953    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
954    let (sink, stream) = sock.split();
955    Ok((
956        sink.sink_map_err(|e| WriteError {
957            context: ErrorContext::WebSocketSink,
958            kind: e.into(),
959        }).with_flat_map::<W, _, _>(|msg| {
960            let mut buf = Vec::default();
961            match msg.write_sync(&mut buf) {
962                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
963                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
964                } else {
965                    Either::Right(stream::iter(
966                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
967                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
968                        .collect::<Vec<_>>()
969                    ))
970                }.map(Ok)),
971                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
972                    context: ErrorContext::WebSocket {
973                        source: Box::new(context),
974                    },
975                    kind,
976                }))),
977            }
978        }),
979        stream.scan(None, |state, res| {
980            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
981                let packet = res.map_err(|e| ReadError {
982                    context: ErrorContext::WebSocketStream,
983                    kind: e.into(),
984                })?;
985                Ok(if let Some((len, buf)) = state {
986                    if let tungstenite024::Message::Binary(data) = packet {
987                        buf.extend_from_slice(&data);
988                    } else {
989                        return Err(ReadError {
990                            context: ErrorContext::DefaultImpl,
991                            kind: ReadErrorKind::MessageKind024(packet),
992                        })
993                    }
994                    if buf.len() >= *len {
995                        let buf = mem::take(buf);
996                        *state = None;
997                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
998                            context: ErrorContext::WebSocket {
999                                source: Box::new(context),
1000                            },
1001                            kind,
1002                        })?)))
1003                    } else {
1004                        Either::Left(stream::empty())
1005                    }
1006                } else {
1007                    match packet {
1008                        tungstenite024::Message::Text(data) => match data.chars().next() {
1009                            Some('m') => {
1010                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1011                                    context: ErrorContext::DefaultImpl,
1012                                    kind: e.into(),
1013                                })?;
1014                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1015                                    context: ErrorContext::DefaultImpl,
1016                                    kind: e.into(),
1017                                })?;
1018                                *state = Some((len, buf));
1019                                Either::Left(stream::empty())
1020                            }
1021                            _ => return Err(ReadError {
1022                                context: ErrorContext::DefaultImpl,
1023                                kind: ReadErrorKind::WebSocketTextMessage024(data),
1024                            }),
1025                        },
1026                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1027                            context: ErrorContext::WebSocket {
1028                                source: Box::new(context),
1029                            },
1030                            kind,
1031                        })?))),
1032                        _ => return Err(ReadError {
1033                            context: ErrorContext::DefaultImpl,
1034                            kind: ReadErrorKind::MessageKind024(packet),
1035                        }),
1036                    }
1037                })
1038            }
1039
1040            future::ready(Some(scanner(state, res)))
1041        }).try_flatten(),
1042    ))
1043}
1044
1045/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
1046///
1047/// Useful for WebSocket connections where the message type per direction is always the same.
1048#[cfg(feature = "tokio-tungstenite028")]
1049#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
1050pub async fn websocket028<R: Protocol, W: Protocol>(request: impl tungstenite028::client::IntoClientRequest + Unpin) -> tungstenite028::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1051    let (sock, _) = tokio_tungstenite028::connect_async(request).await?;
1052    let (sink, stream) = sock.split();
1053    Ok((
1054        sink.sink_map_err(|e| WriteError {
1055            context: ErrorContext::WebSocketSink,
1056            kind: e.into(),
1057        }).with_flat_map::<W, _, _>(|msg| {
1058            let mut buf = Vec::default();
1059            match msg.write_sync(&mut buf) {
1060                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1061                    Either::Left(stream::once(future::ready(tungstenite028::Message::binary(buf))))
1062                } else {
1063                    Either::Right(stream::iter(
1064                        iter::once(tungstenite028::Message::text(format!("m{}", buf.len())))
1065                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))))
1066                        .collect::<Vec<_>>()
1067                    ))
1068                }.map(Ok)),
1069                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1070                    context: ErrorContext::WebSocket {
1071                        source: Box::new(context),
1072                    },
1073                    kind,
1074                }))),
1075            }
1076        }),
1077        stream.scan(None, |state, res| {
1078            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite028::Result<tungstenite028::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1079                let packet = res.map_err(|e| ReadError {
1080                    context: ErrorContext::WebSocketStream,
1081                    kind: e.into(),
1082                })?;
1083                Ok(if let Some((len, buf)) = state {
1084                    if let tungstenite028::Message::Binary(data) = packet {
1085                        buf.extend_from_slice(&data);
1086                    } else {
1087                        return Err(ReadError {
1088                            context: ErrorContext::DefaultImpl,
1089                            kind: ReadErrorKind::MessageKind028(packet),
1090                        })
1091                    }
1092                    if buf.len() >= *len {
1093                        let buf = mem::take(buf);
1094                        *state = None;
1095                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1096                            context: ErrorContext::WebSocket {
1097                                source: Box::new(context),
1098                            },
1099                            kind,
1100                        })?)))
1101                    } else {
1102                        Either::Left(stream::empty())
1103                    }
1104                } else {
1105                    match packet {
1106                        tungstenite028::Message::Text(data) => match data.chars().next() {
1107                            Some('m') => {
1108                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1109                                    context: ErrorContext::DefaultImpl,
1110                                    kind: e.into(),
1111                                })?;
1112                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1113                                    context: ErrorContext::DefaultImpl,
1114                                    kind: e.into(),
1115                                })?;
1116                                *state = Some((len, buf));
1117                                Either::Left(stream::empty())
1118                            }
1119                            _ => return Err(ReadError {
1120                                context: ErrorContext::DefaultImpl,
1121                                kind: ReadErrorKind::WebSocketTextMessage028(data),
1122                            }),
1123                        },
1124                        tungstenite028::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1125                            context: ErrorContext::WebSocket {
1126                                source: Box::new(context),
1127                            },
1128                            kind,
1129                        })?))),
1130                        _ => return Err(ReadError {
1131                            context: ErrorContext::DefaultImpl,
1132                            kind: ReadErrorKind::MessageKind028(packet),
1133                        }),
1134                    }
1135                })
1136            }
1137
1138            future::ready(Some(scanner(state, res)))
1139        }).try_flatten(),
1140    ))
1141}