async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bitvec`](https://docs.rs/bitvec): [`BitVec<u8, Lsb0>`](https://docs.rs/bitvec/latest/bitvec/vec/struct.BitVec.html)
17//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
18//! * [`bytesize`](https://docs.rs/bytesize): [`ByteSize`](https://docs.rs/bytesize/latest/bytesize/struct.ByteSize.html)
19//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
20//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
21//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
22//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
23//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
24//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
25//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
26//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
27//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
28//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
29//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
30//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
31//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
32//!
33//! Additionally, this crate offers optional dependencies on the `tokio-tungstenite` crate to add convenience methods for reading/writing [`Protocol`] types from/to its websockets. The following versions are supported:
34//!
35//! * The latest release (currently [`tokio-tungstenite` 0.27](https://docs.rs/tokio-tungstenite/0.27), feature flag `tokio-tungstenite027`)
36//! * The version used by [the `master` branch of `rocket_ws` on GitHub](https://github.com/rwf2/Rocket/tree/master/contrib/ws) (currently [`tokio-tungstenite` 0.24](https://docs.rs/tokio-tungstenite/0.24), feature flag `tokio-tungstenite024`)
37//! * The version used by [the latest `rocket_ws` crates.io release](https://docs.rs/rocket_ws) (currently [`tokio-tungstenite` 0.21](https://docs.rs/tokio-tungstenite/0.21), feature flag `tokio-tungstenite021`)
38
39use {
40    std::{
41        future::Future,
42        io::{
43            self,
44            prelude::*,
45        },
46        pin::Pin,
47    },
48    tokio::io::{
49        AsyncRead,
50        AsyncWrite,
51    },
52};
53#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
54    std::{
55        iter,
56        mem,
57    },
58    fallible_collections::FallibleVec,
59    futures::{
60        Sink,
61        SinkExt as _,
62        future::{
63            self,
64            Either,
65        },
66        stream::{
67            self,
68            Stream,
69            StreamExt as _,
70            TryStreamExt as _,
71        },
72    },
73};
74#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
75#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
76#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
77pub use {
78    async_proto_derive::{
79        Protocol,
80        bitflags,
81    },
82    crate::error::*,
83};
84#[doc(hidden)] pub use tokio; // used in proc macro
85
86mod error;
87mod impls;
88
89/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
90#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
91
92/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
93pub trait Protocol: Sized {
94    /// Reads a value of this type from an async stream.
95    ///
96    /// # Cancellation safety
97    ///
98    /// Implementations of this method are generally not cancellation safe.
99    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
100    /// Writes a value of this type to an async sink.
101    ///
102    /// # Cancellation safety
103    ///
104    /// Implementations of this method are generally not cancellation safe.
105    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
106    /// Reads a value of this type from a sync stream.
107    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
108    /// Writes a value of this type to a sync sink.
109    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
110
111    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
112    ///
113    /// This can be used to get around drop glue issues that might arise with `read`.
114    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
115        Box::pin(async move {
116            let value = Self::read(&mut stream).await?;
117            Ok((stream, value))
118        })
119    }
120
121    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
122    ///
123    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
124    ///
125    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
126    ///
127    /// # Example
128    ///
129    /// ```
130    /// use {
131    ///     std::{
132    ///         io,
133    ///         net::TcpStream,
134    ///     },
135    ///     async_proto::Protocol,
136    /// };
137    ///
138    /// struct Client {
139    ///     tcp_stream: TcpStream,
140    ///     buf: Vec<u8>,
141    /// }
142    ///
143    /// impl Client {
144    ///     fn new(tcp_stream: TcpStream) -> Self {
145    ///         Self {
146    ///             tcp_stream,
147    ///             buf: Vec::default(),
148    ///         }
149    ///     }
150    ///
151    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
152    ///         self.tcp_stream.set_nonblocking(true)?;
153    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
154    ///     }
155    ///
156    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
157    ///         self.tcp_stream.set_nonblocking(false)?;
158    ///         msg.write_sync(&mut self.tcp_stream)?;
159    ///         Ok(())
160    ///     }
161    /// }
162    /// ```
163    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
164        let mut temp_buf = vec![0; 8];
165        loop {
166            let mut slice = &mut &**buf;
167            match Self::read_sync(&mut slice) {
168                Ok(value) => {
169                    let value_len = slice.len();
170                    buf.drain(..buf.len() - value_len);
171                    return Ok(Some(value))
172                }
173                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
174                Err(e) => return Err(e),
175            }
176            match stream.read(&mut temp_buf) {
177                Ok(0) => return Err(ReadError {
178                    context: ErrorContext::DefaultImpl,
179                    kind: ReadErrorKind::EndOfStream,
180                }),
181                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
182                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
183                Err(e) => return Err(ReadError {
184                    context: ErrorContext::DefaultImpl,
185                    kind: e.into(),
186                }),
187            }
188        }
189    }
190
191    #[cfg(feature = "tokio-tungstenite021")]
192    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
193    /// Reads a value of this type from a `tokio-tungstenite` websocket.
194    ///
195    /// # Cancellation safety
196    ///
197    /// The default implementation of this method is not cancellation safe.
198    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
199        Box::pin(async move {
200            let packet = stream.try_next().await.map_err(|e| ReadError {
201                context: ErrorContext::DefaultImpl,
202                kind: e.into(),
203            })?.ok_or_else(|| ReadError {
204                context: ErrorContext::DefaultImpl,
205                kind: ReadErrorKind::EndOfStream,
206            })?;
207            match packet {
208                tungstenite021::Message::Text(data) => match data.chars().next() {
209                    Some('m') => {
210                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
211                            context: ErrorContext::DefaultImpl,
212                            kind: e.into(),
213                        })?;
214                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
215                            context: ErrorContext::DefaultImpl,
216                            kind: e.into(),
217                        })?;
218                        while buf.len() < len {
219                            let packet = stream.try_next().await.map_err(|e| ReadError {
220                                context: ErrorContext::DefaultImpl,
221                                kind: e.into(),
222                            })?.ok_or_else(|| ReadError {
223                                context: ErrorContext::DefaultImpl,
224                                kind: ReadErrorKind::EndOfStream,
225                            })?;
226                            if let tungstenite021::Message::Binary(data) = packet {
227                                buf.extend_from_slice(&data);
228                            } else {
229                                return Err(ReadError {
230                                    context: ErrorContext::DefaultImpl,
231                                    kind: ReadErrorKind::MessageKind021(packet),
232                                })
233                            }
234                        }
235                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
236                            context: ErrorContext::WebSocket {
237                                source: Box::new(context),
238                            },
239                            kind,
240                        })
241                    }
242                    _ => Err(ReadError {
243                        context: ErrorContext::DefaultImpl,
244                        kind: ReadErrorKind::WebSocketTextMessage024(data),
245                    }),
246                },
247                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
248                    context: ErrorContext::WebSocket {
249                        source: Box::new(context),
250                    },
251                    kind,
252                }),
253                _ => Err(ReadError {
254                    context: ErrorContext::DefaultImpl,
255                    kind: ReadErrorKind::MessageKind021(packet),
256                }),
257            }
258        })
259    }
260
261    #[cfg(feature = "tokio-tungstenite024")]
262    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
263    /// Reads a value of this type from a `tokio-tungstenite` websocket.
264    ///
265    /// # Cancellation safety
266    ///
267    /// The default implementation of this method is not cancellation safe.
268    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
269        Box::pin(async move {
270            let packet = stream.try_next().await.map_err(|e| ReadError {
271                context: ErrorContext::DefaultImpl,
272                kind: e.into(),
273            })?.ok_or_else(|| ReadError {
274                context: ErrorContext::DefaultImpl,
275                kind: ReadErrorKind::EndOfStream,
276            })?;
277            match packet {
278                tungstenite024::Message::Text(data) => match data.chars().next() {
279                    Some('m') => {
280                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
281                            context: ErrorContext::DefaultImpl,
282                            kind: e.into(),
283                        })?;
284                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
285                            context: ErrorContext::DefaultImpl,
286                            kind: e.into(),
287                        })?;
288                        while buf.len() < len {
289                            let packet = stream.try_next().await.map_err(|e| ReadError {
290                                context: ErrorContext::DefaultImpl,
291                                kind: e.into(),
292                            })?.ok_or_else(|| ReadError {
293                                context: ErrorContext::DefaultImpl,
294                                kind: ReadErrorKind::EndOfStream,
295                            })?;
296                            if let tungstenite024::Message::Binary(data) = packet {
297                                buf.extend_from_slice(&data);
298                            } else {
299                                return Err(ReadError {
300                                    context: ErrorContext::DefaultImpl,
301                                    kind: ReadErrorKind::MessageKind024(packet),
302                                })
303                            }
304                        }
305                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
306                            context: ErrorContext::WebSocket {
307                                source: Box::new(context),
308                            },
309                            kind,
310                        })
311                    }
312                    _ => Err(ReadError {
313                        context: ErrorContext::DefaultImpl,
314                        kind: ReadErrorKind::WebSocketTextMessage024(data),
315                    }),
316                },
317                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
318                    context: ErrorContext::WebSocket {
319                        source: Box::new(context),
320                    },
321                    kind,
322                }),
323                _ => Err(ReadError {
324                    context: ErrorContext::DefaultImpl,
325                    kind: ReadErrorKind::MessageKind024(packet),
326                }),
327            }
328        })
329    }
330
331    #[cfg(feature = "tokio-tungstenite027")]
332    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
333    /// Reads a value of this type from a `tokio-tungstenite` websocket.
334    ///
335    /// # Cancellation safety
336    ///
337    /// The default implementation of this method is not cancellation safe.
338    fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
339        Box::pin(async move {
340            let packet = stream.try_next().await.map_err(|e| ReadError {
341                context: ErrorContext::DefaultImpl,
342                kind: e.into(),
343            })?.ok_or_else(|| ReadError {
344                context: ErrorContext::DefaultImpl,
345                kind: ReadErrorKind::EndOfStream,
346            })?;
347            match packet {
348                tungstenite027::Message::Text(data) => match data.chars().next() {
349                    Some('m') => {
350                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
351                            context: ErrorContext::DefaultImpl,
352                            kind: e.into(),
353                        })?;
354                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
355                            context: ErrorContext::DefaultImpl,
356                            kind: e.into(),
357                        })?;
358                        while buf.len() < len {
359                            let packet = stream.try_next().await.map_err(|e| ReadError {
360                                context: ErrorContext::DefaultImpl,
361                                kind: e.into(),
362                            })?.ok_or_else(|| ReadError {
363                                context: ErrorContext::DefaultImpl,
364                                kind: ReadErrorKind::EndOfStream,
365                            })?;
366                            if let tungstenite027::Message::Binary(data) = packet {
367                                buf.extend_from_slice(&data);
368                            } else {
369                                return Err(ReadError {
370                                    context: ErrorContext::DefaultImpl,
371                                    kind: ReadErrorKind::MessageKind027(packet),
372                                })
373                            }
374                        }
375                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
376                            context: ErrorContext::WebSocket {
377                                source: Box::new(context),
378                            },
379                            kind,
380                        })
381                    }
382                    _ => Err(ReadError {
383                        context: ErrorContext::DefaultImpl,
384                        kind: ReadErrorKind::WebSocketTextMessage027(data),
385                    }),
386                },
387                tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
388                    context: ErrorContext::WebSocket {
389                        source: Box::new(context),
390                    },
391                    kind,
392                }),
393                _ => Err(ReadError {
394                    context: ErrorContext::DefaultImpl,
395                    kind: ReadErrorKind::MessageKind027(packet),
396                }),
397            }
398        })
399    }
400
401    #[cfg(feature = "tokio-tungstenite021")]
402    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
403    /// Writes a value of this type to a `tokio-tungstenite` websocket.
404    ///
405    /// # Cancellation safety
406    ///
407    /// The default implementation of this method is not cancellation safe.
408    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
409    where Self: Sync {
410        Box::pin(async move {
411            let mut buf = Vec::default();
412            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
413                context: ErrorContext::WebSocket {
414                    source: Box::new(context),
415                },
416                kind,
417            })?;
418            if buf.len() <= WS_MAX_MESSAGE_SIZE {
419                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
420                    context: ErrorContext::DefaultImpl,
421                    kind: e.into(),
422                })?;
423            } else {
424                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
425                    context: ErrorContext::DefaultImpl,
426                    kind: e.into(),
427                })?;
428                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
429                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
430                        context: ErrorContext::DefaultImpl,
431                        kind: e.into(),
432                    })?;
433                }
434            }
435            Ok(())
436        })
437    }
438
439    #[cfg(feature = "tokio-tungstenite024")]
440    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
441    /// Writes a value of this type to a `tokio-tungstenite` websocket.
442    ///
443    /// # Cancellation safety
444    ///
445    /// The default implementation of this method is not cancellation safe.
446    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
447    where Self: Sync {
448        Box::pin(async move {
449            let mut buf = Vec::default();
450            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
451                context: ErrorContext::WebSocket {
452                    source: Box::new(context),
453                },
454                kind,
455            })?;
456            if buf.len() <= WS_MAX_MESSAGE_SIZE {
457                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
458                    context: ErrorContext::DefaultImpl,
459                    kind: e.into(),
460                })?;
461            } else {
462                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
463                    context: ErrorContext::DefaultImpl,
464                    kind: e.into(),
465                })?;
466                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
467                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
468                        context: ErrorContext::DefaultImpl,
469                        kind: e.into(),
470                    })?;
471                }
472            }
473            Ok(())
474        })
475    }
476
477    #[cfg(feature = "tokio-tungstenite027")]
478    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
479    /// Writes a value of this type to a `tokio-tungstenite` websocket.
480    ///
481    /// # Cancellation safety
482    ///
483    /// The default implementation of this method is not cancellation safe.
484    fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
485    where Self: Sync {
486        Box::pin(async move {
487            let mut buf = Vec::default();
488            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
489                context: ErrorContext::WebSocket {
490                    source: Box::new(context),
491                },
492                kind,
493            })?;
494            if buf.len() <= WS_MAX_MESSAGE_SIZE {
495                sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
496                    context: ErrorContext::DefaultImpl,
497                    kind: e.into(),
498                })?;
499            } else {
500                sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
501                    context: ErrorContext::DefaultImpl,
502                    kind: e.into(),
503                })?;
504                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
505                    sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
506                        context: ErrorContext::DefaultImpl,
507                        kind: e.into(),
508                    })?;
509                }
510            }
511            Ok(())
512        })
513    }
514
515    #[cfg(feature = "tokio-tungstenite021")]
516    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
517    /// Reads a value of this type from a [`tungstenite021`] websocket.
518    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
519        let packet = websocket.read().map_err(|e| ReadError {
520            context: ErrorContext::DefaultImpl,
521            kind: e.into(),
522        })?;
523        match packet {
524            tungstenite021::Message::Text(data) => match data.chars().next() {
525                Some('m') => {
526                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
527                        context: ErrorContext::DefaultImpl,
528                        kind: e.into(),
529                    })?;
530                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
531                        context: ErrorContext::DefaultImpl,
532                        kind: e.into(),
533                    })?;
534                    while buf.len() < len {
535                        let packet = websocket.read().map_err(|e| ReadError {
536                            context: ErrorContext::DefaultImpl,
537                            kind: e.into(),
538                        })?;
539                        if let tungstenite021::Message::Binary(data) = packet {
540                            buf.extend_from_slice(&data);
541                        } else {
542                            return Err(ReadError {
543                                context: ErrorContext::DefaultImpl,
544                                kind: ReadErrorKind::MessageKind021(packet),
545                            })
546                        }
547                    }
548                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
549                        context: ErrorContext::WebSocket {
550                            source: Box::new(context),
551                        },
552                        kind,
553                    })
554                }
555                _ => return Err(ReadError {
556                    context: ErrorContext::DefaultImpl,
557                    kind: ReadErrorKind::WebSocketTextMessage024(data),
558                }),
559            },
560            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
561                context: ErrorContext::WebSocket {
562                    source: Box::new(context),
563                },
564                kind,
565            }),
566            _ => Err(ReadError {
567                context: ErrorContext::DefaultImpl,
568                kind: ReadErrorKind::MessageKind021(packet),
569            }),
570        }
571    }
572
573    #[cfg(feature = "tokio-tungstenite024")]
574    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
575    /// Reads a value of this type from a [`tungstenite024`] websocket.
576    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
577        let packet = websocket.read().map_err(|e| ReadError {
578            context: ErrorContext::DefaultImpl,
579            kind: e.into(),
580        })?;
581        match packet {
582            tungstenite024::Message::Text(data) => match data.chars().next() {
583                Some('m') => {
584                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
585                        context: ErrorContext::DefaultImpl,
586                        kind: e.into(),
587                    })?;
588                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
589                        context: ErrorContext::DefaultImpl,
590                        kind: e.into(),
591                    })?;
592                    while buf.len() < len {
593                        let packet = websocket.read().map_err(|e| ReadError {
594                            context: ErrorContext::DefaultImpl,
595                            kind: e.into(),
596                        })?;
597                        if let tungstenite024::Message::Binary(data) = packet {
598                            buf.extend_from_slice(&data);
599                        } else {
600                            return Err(ReadError {
601                                context: ErrorContext::DefaultImpl,
602                                kind: ReadErrorKind::MessageKind024(packet),
603                            })
604                        }
605                    }
606                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
607                        context: ErrorContext::WebSocket {
608                            source: Box::new(context),
609                        },
610                        kind,
611                    })
612                }
613                _ => return Err(ReadError {
614                    context: ErrorContext::DefaultImpl,
615                    kind: ReadErrorKind::WebSocketTextMessage024(data),
616                }),
617            },
618            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
619                context: ErrorContext::WebSocket {
620                    source: Box::new(context),
621                },
622                kind,
623            }),
624            _ => Err(ReadError {
625                context: ErrorContext::DefaultImpl,
626                kind: ReadErrorKind::MessageKind024(packet),
627            }),
628        }
629    }
630
631    #[cfg(feature = "tokio-tungstenite027")]
632    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
633    /// Reads a value of this type from a [`tungstenite027`] websocket.
634    fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
635        let packet = websocket.read().map_err(|e| ReadError {
636            context: ErrorContext::DefaultImpl,
637            kind: e.into(),
638        })?;
639        match packet {
640            tungstenite027::Message::Text(data) => match data.chars().next() {
641                Some('m') => {
642                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
643                        context: ErrorContext::DefaultImpl,
644                        kind: e.into(),
645                    })?;
646                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
647                        context: ErrorContext::DefaultImpl,
648                        kind: e.into(),
649                    })?;
650                    while buf.len() < len {
651                        let packet = websocket.read().map_err(|e| ReadError {
652                            context: ErrorContext::DefaultImpl,
653                            kind: e.into(),
654                        })?;
655                        if let tungstenite027::Message::Binary(data) = packet {
656                            buf.extend_from_slice(&data);
657                        } else {
658                            return Err(ReadError {
659                                context: ErrorContext::DefaultImpl,
660                                kind: ReadErrorKind::MessageKind027(packet),
661                            })
662                        }
663                    }
664                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
665                        context: ErrorContext::WebSocket {
666                            source: Box::new(context),
667                        },
668                        kind,
669                    })
670                }
671                _ => return Err(ReadError {
672                    context: ErrorContext::DefaultImpl,
673                    kind: ReadErrorKind::WebSocketTextMessage027(data),
674                }),
675            },
676            tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
677                context: ErrorContext::WebSocket {
678                    source: Box::new(context),
679                },
680                kind,
681            }),
682            _ => Err(ReadError {
683                context: ErrorContext::DefaultImpl,
684                kind: ReadErrorKind::MessageKind027(packet),
685            }),
686        }
687    }
688
689    #[cfg(feature = "tokio-tungstenite021")]
690    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
691    /// Writes a value of this type to a [`tungstenite021`] websocket.
692    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
693        let mut buf = Vec::default();
694        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
695            context: ErrorContext::WebSocket {
696                source: Box::new(context),
697            },
698            kind,
699        })?;
700        if buf.len() <= WS_MAX_MESSAGE_SIZE {
701            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
702                context: ErrorContext::DefaultImpl,
703                kind: e.into(),
704            })?;
705        } else {
706            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
707                context: ErrorContext::DefaultImpl,
708                kind: e.into(),
709            })?;
710            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
711                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
712                    context: ErrorContext::DefaultImpl,
713                    kind: e.into(),
714                })?;
715            }
716        }
717        websocket.flush().map_err(|e| WriteError {
718            context: ErrorContext::DefaultImpl,
719            kind: e.into(),
720        })?;
721        Ok(())
722    }
723
724    #[cfg(feature = "tokio-tungstenite024")]
725    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
726    /// Writes a value of this type to a [`tungstenite024`] websocket.
727    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
728        let mut buf = Vec::default();
729        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
730            context: ErrorContext::WebSocket {
731                source: Box::new(context),
732            },
733            kind,
734        })?;
735        if buf.len() <= WS_MAX_MESSAGE_SIZE {
736            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
737                context: ErrorContext::DefaultImpl,
738                kind: e.into(),
739            })?;
740        } else {
741            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
742                context: ErrorContext::DefaultImpl,
743                kind: e.into(),
744            })?;
745            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
746                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
747                    context: ErrorContext::DefaultImpl,
748                    kind: e.into(),
749                })?;
750            }
751        }
752        websocket.flush().map_err(|e| WriteError {
753            context: ErrorContext::DefaultImpl,
754            kind: e.into(),
755        })?;
756        Ok(())
757    }
758
759    #[cfg(feature = "tokio-tungstenite027")]
760    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
761    /// Writes a value of this type to a [`tungstenite027`] websocket.
762    fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
763        let mut buf = Vec::default();
764        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
765            context: ErrorContext::WebSocket {
766                source: Box::new(context),
767            },
768            kind,
769        })?;
770        if buf.len() <= WS_MAX_MESSAGE_SIZE {
771            websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
772                context: ErrorContext::DefaultImpl,
773                kind: e.into(),
774            })?;
775        } else {
776            websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
777                context: ErrorContext::DefaultImpl,
778                kind: e.into(),
779            })?;
780            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
781                websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
782                    context: ErrorContext::DefaultImpl,
783                    kind: e.into(),
784                })?;
785            }
786        }
787        websocket.flush().map_err(|e| WriteError {
788            context: ErrorContext::DefaultImpl,
789            kind: e.into(),
790        })?;
791        Ok(())
792    }
793
794    #[cfg(feature = "tokio-tungstenite021")]
795    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
796    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
797    ///
798    /// This can be used to get around drop glue issues that might arise with `read_ws`.
799    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
800        Box::pin(async move {
801            let value = Self::read_ws021(&mut stream).await?;
802            Ok((stream, value))
803        })
804    }
805
806    #[cfg(feature = "tokio-tungstenite024")]
807    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
808    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
809    ///
810    /// This can be used to get around drop glue issues that might arise with `read_ws`.
811    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
812        Box::pin(async move {
813            let value = Self::read_ws024(&mut stream).await?;
814            Ok((stream, value))
815        })
816    }
817
818    #[cfg(feature = "tokio-tungstenite027")]
819    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
820    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
821    ///
822    /// This can be used to get around drop glue issues that might arise with `read_ws`.
823    fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
824        Box::pin(async move {
825            let value = Self::read_ws027(&mut stream).await?;
826            Ok((stream, value))
827        })
828    }
829}
830
831/// This trait allows restricting the acceptable length of collection types.
832///
833/// By default, types from this crate implementing this trait represent their length as a `u64`. If the maximum length is limited (e.g. using the `#[async_proto(max_len = ...)]` attribute when deriving [`Protocol`]), the length may be represented using a smaller integer type.
834pub trait LengthPrefixed: Protocol {
835    /// Reads a value of this type from an async stream, limiting the length to the given value.
836    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
837    /// Writes a value of this type to an async sink, limiting the length to the given value.
838    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
839    /// Reads a value of this type from a sync stream, limiting the length to the given value.
840    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
841    /// Writes a value of this type to a sync sink, limiting the length to the given value.
842    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
843}
844
845/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
846///
847/// Useful for WebSocket connections where the message type per direction is always the same.
848#[cfg(feature = "tokio-tungstenite021")]
849#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
850pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
851    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
852    let (sink, stream) = sock.split();
853    Ok((
854        sink.sink_map_err(|e| WriteError {
855            context: ErrorContext::WebSocketSink,
856            kind: e.into(),
857        }).with_flat_map::<W, _, _>(|msg| {
858            let mut buf = Vec::default();
859            match msg.write_sync(&mut buf) {
860                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
861                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
862                } else {
863                    Either::Right(stream::iter(
864                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
865                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
866                        .collect::<Vec<_>>()
867                    ))
868                }.map(Ok)),
869                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
870                    context: ErrorContext::WebSocket {
871                        source: Box::new(context),
872                    },
873                    kind,
874                }))),
875            }
876        }),
877        stream.scan(None, |state, res| {
878            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
879                let packet = res.map_err(|e| ReadError {
880                    context: ErrorContext::WebSocketStream,
881                    kind: e.into(),
882                })?;
883                Ok(if let Some((len, buf)) = state {
884                    if let tungstenite021::Message::Binary(data) = packet {
885                        buf.extend_from_slice(&data);
886                    } else {
887                        return Err(ReadError {
888                            context: ErrorContext::DefaultImpl,
889                            kind: ReadErrorKind::MessageKind021(packet),
890                        })
891                    }
892                    if buf.len() >= *len {
893                        let buf = mem::take(buf);
894                        *state = None;
895                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
896                            context: ErrorContext::WebSocket {
897                                source: Box::new(context),
898                            },
899                            kind,
900                        })?)))
901                    } else {
902                        Either::Left(stream::empty())
903                    }
904                } else {
905                    match packet {
906                        tungstenite021::Message::Text(data) => match data.chars().next() {
907                            Some('m') => {
908                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
909                                    context: ErrorContext::DefaultImpl,
910                                    kind: e.into(),
911                                })?;
912                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
913                                    context: ErrorContext::DefaultImpl,
914                                    kind: e.into(),
915                                })?;
916                                *state = Some((len, buf));
917                                Either::Left(stream::empty())
918                            }
919                            _ => return Err(ReadError {
920                                context: ErrorContext::DefaultImpl,
921                                kind: ReadErrorKind::WebSocketTextMessage024(data),
922                            }),
923                        },
924                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
925                            context: ErrorContext::WebSocket {
926                                source: Box::new(context),
927                            },
928                            kind,
929                        })?))),
930                        _ => return Err(ReadError {
931                            context: ErrorContext::DefaultImpl,
932                            kind: ReadErrorKind::MessageKind021(packet),
933                        }),
934                    }
935                })
936            }
937
938            future::ready(Some(scanner(state, res)))
939        }).try_flatten(),
940    ))
941}
942
943/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
944///
945/// Useful for WebSocket connections where the message type per direction is always the same.
946#[cfg(feature = "tokio-tungstenite024")]
947#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
948pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
949    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
950    let (sink, stream) = sock.split();
951    Ok((
952        sink.sink_map_err(|e| WriteError {
953            context: ErrorContext::WebSocketSink,
954            kind: e.into(),
955        }).with_flat_map::<W, _, _>(|msg| {
956            let mut buf = Vec::default();
957            match msg.write_sync(&mut buf) {
958                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
959                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
960                } else {
961                    Either::Right(stream::iter(
962                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
963                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
964                        .collect::<Vec<_>>()
965                    ))
966                }.map(Ok)),
967                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
968                    context: ErrorContext::WebSocket {
969                        source: Box::new(context),
970                    },
971                    kind,
972                }))),
973            }
974        }),
975        stream.scan(None, |state, res| {
976            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
977                let packet = res.map_err(|e| ReadError {
978                    context: ErrorContext::WebSocketStream,
979                    kind: e.into(),
980                })?;
981                Ok(if let Some((len, buf)) = state {
982                    if let tungstenite024::Message::Binary(data) = packet {
983                        buf.extend_from_slice(&data);
984                    } else {
985                        return Err(ReadError {
986                            context: ErrorContext::DefaultImpl,
987                            kind: ReadErrorKind::MessageKind024(packet),
988                        })
989                    }
990                    if buf.len() >= *len {
991                        let buf = mem::take(buf);
992                        *state = None;
993                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
994                            context: ErrorContext::WebSocket {
995                                source: Box::new(context),
996                            },
997                            kind,
998                        })?)))
999                    } else {
1000                        Either::Left(stream::empty())
1001                    }
1002                } else {
1003                    match packet {
1004                        tungstenite024::Message::Text(data) => match data.chars().next() {
1005                            Some('m') => {
1006                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1007                                    context: ErrorContext::DefaultImpl,
1008                                    kind: e.into(),
1009                                })?;
1010                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1011                                    context: ErrorContext::DefaultImpl,
1012                                    kind: e.into(),
1013                                })?;
1014                                *state = Some((len, buf));
1015                                Either::Left(stream::empty())
1016                            }
1017                            _ => return Err(ReadError {
1018                                context: ErrorContext::DefaultImpl,
1019                                kind: ReadErrorKind::WebSocketTextMessage024(data),
1020                            }),
1021                        },
1022                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1023                            context: ErrorContext::WebSocket {
1024                                source: Box::new(context),
1025                            },
1026                            kind,
1027                        })?))),
1028                        _ => return Err(ReadError {
1029                            context: ErrorContext::DefaultImpl,
1030                            kind: ReadErrorKind::MessageKind024(packet),
1031                        }),
1032                    }
1033                })
1034            }
1035
1036            future::ready(Some(scanner(state, res)))
1037        }).try_flatten(),
1038    ))
1039}
1040
1041/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
1042///
1043/// Useful for WebSocket connections where the message type per direction is always the same.
1044#[cfg(feature = "tokio-tungstenite027")]
1045#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
1046pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1047    let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
1048    let (sink, stream) = sock.split();
1049    Ok((
1050        sink.sink_map_err(|e| WriteError {
1051            context: ErrorContext::WebSocketSink,
1052            kind: e.into(),
1053        }).with_flat_map::<W, _, _>(|msg| {
1054            let mut buf = Vec::default();
1055            match msg.write_sync(&mut buf) {
1056                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1057                    Either::Left(stream::once(future::ready(tungstenite027::Message::binary(buf))))
1058                } else {
1059                    Either::Right(stream::iter(
1060                        iter::once(tungstenite027::Message::text(format!("m{}", buf.len())))
1061                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))))
1062                        .collect::<Vec<_>>()
1063                    ))
1064                }.map(Ok)),
1065                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1066                    context: ErrorContext::WebSocket {
1067                        source: Box::new(context),
1068                    },
1069                    kind,
1070                }))),
1071            }
1072        }),
1073        stream.scan(None, |state, res| {
1074            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1075                let packet = res.map_err(|e| ReadError {
1076                    context: ErrorContext::WebSocketStream,
1077                    kind: e.into(),
1078                })?;
1079                Ok(if let Some((len, buf)) = state {
1080                    if let tungstenite027::Message::Binary(data) = packet {
1081                        buf.extend_from_slice(&data);
1082                    } else {
1083                        return Err(ReadError {
1084                            context: ErrorContext::DefaultImpl,
1085                            kind: ReadErrorKind::MessageKind027(packet),
1086                        })
1087                    }
1088                    if buf.len() >= *len {
1089                        let buf = mem::take(buf);
1090                        *state = None;
1091                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1092                            context: ErrorContext::WebSocket {
1093                                source: Box::new(context),
1094                            },
1095                            kind,
1096                        })?)))
1097                    } else {
1098                        Either::Left(stream::empty())
1099                    }
1100                } else {
1101                    match packet {
1102                        tungstenite027::Message::Text(data) => match data.chars().next() {
1103                            Some('m') => {
1104                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1105                                    context: ErrorContext::DefaultImpl,
1106                                    kind: e.into(),
1107                                })?;
1108                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1109                                    context: ErrorContext::DefaultImpl,
1110                                    kind: e.into(),
1111                                })?;
1112                                *state = Some((len, buf));
1113                                Either::Left(stream::empty())
1114                            }
1115                            _ => return Err(ReadError {
1116                                context: ErrorContext::DefaultImpl,
1117                                kind: ReadErrorKind::WebSocketTextMessage027(data),
1118                            }),
1119                        },
1120                        tungstenite027::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1121                            context: ErrorContext::WebSocket {
1122                                source: Box::new(context),
1123                            },
1124                            kind,
1125                        })?))),
1126                        _ => return Err(ReadError {
1127                            context: ErrorContext::DefaultImpl,
1128                            kind: ReadErrorKind::MessageKind027(packet),
1129                        }),
1130                    }
1131                })
1132            }
1133
1134            future::ready(Some(scanner(state, res)))
1135        }).try_flatten(),
1136    ))
1137}