async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
17//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
18//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
19//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
20//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
21//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
22//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
23//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
24//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
25//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
26//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
27//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
28//!
29//! Additionally, the following features can be enabled via Cargo:
30//!
31//! * `tokio-tungstenite`: Adds a dependency on the [`tokio-tungstenite`](https://docs.rs/tokio-tungstenite) crate and convenience methods for reading/writing [`Protocol`] types from/to its websockets.
32//! * `tungstenite`: Adds a dependency on the [`tungstenite`](https://docs.rs/tungstenite) crate and convenience methods for synchronously reading/writing [`Protocol`] types from/to its websockets.
33
34use {
35    std::{
36        future::Future,
37        io::{
38            self,
39            prelude::*,
40        },
41        pin::Pin,
42    },
43    tokio::io::{
44        AsyncRead,
45        AsyncWrite,
46    },
47};
48#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))] use {
49    std::{
50        iter,
51        mem,
52    },
53    fallible_collections::FallibleVec,
54    futures::{
55        Sink,
56        SinkExt as _,
57        future::{
58            self,
59            Either,
60        },
61        stream::{
62            self,
63            Stream,
64            StreamExt as _,
65            TryStreamExt as _,
66        },
67    },
68};
69#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
70#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
71pub use {
72    async_proto_derive::{
73        Protocol,
74        bitflags,
75    },
76    crate::error::*,
77};
78#[doc(hidden)] pub use tokio; // used in proc macro
79
80mod error;
81mod impls;
82
83/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
84#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
85
86/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
87pub trait Protocol: Sized {
88    /// Reads a value of this type from an async stream.
89    ///
90    /// # Cancellation safety
91    ///
92    /// Implementations of this method are generally not cancellation safe.
93    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
94    /// Writes a value of this type to an async sink.
95    ///
96    /// # Cancellation safety
97    ///
98    /// Implementations of this method are generally not cancellation safe.
99    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
100    /// Reads a value of this type from a sync stream.
101    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
102    /// Writes a value of this type to a sync sink.
103    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
104
105    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
106    ///
107    /// This can be used to get around drop glue issues that might arise with `read`.
108    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
109        Box::pin(async move {
110            let value = Self::read(&mut stream).await?;
111            Ok((stream, value))
112        })
113    }
114
115    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
116    ///
117    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
118    ///
119    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use {
125    ///     std::{
126    ///         io,
127    ///         net::TcpStream,
128    ///     },
129    ///     async_proto::Protocol,
130    /// };
131    ///
132    /// struct Client {
133    ///     tcp_stream: TcpStream,
134    ///     buf: Vec<u8>,
135    /// }
136    ///
137    /// impl Client {
138    ///     fn new(tcp_stream: TcpStream) -> Self {
139    ///         Self {
140    ///             tcp_stream,
141    ///             buf: Vec::default(),
142    ///         }
143    ///     }
144    ///
145    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
146    ///         self.tcp_stream.set_nonblocking(true)?;
147    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
148    ///     }
149    ///
150    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
151    ///         self.tcp_stream.set_nonblocking(false)?;
152    ///         msg.write_sync(&mut self.tcp_stream)?;
153    ///         Ok(())
154    ///     }
155    /// }
156    /// ```
157    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
158        let mut temp_buf = vec![0; 8];
159        loop {
160            let mut slice = &mut &**buf;
161            match Self::read_sync(&mut slice) {
162                Ok(value) => {
163                    let value_len = slice.len();
164                    buf.drain(..buf.len() - value_len);
165                    return Ok(Some(value))
166                }
167                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
168                Err(e) => return Err(e),
169            }
170            match stream.read(&mut temp_buf) {
171                Ok(0) => return Err(ReadError {
172                    context: ErrorContext::DefaultImpl,
173                    kind: ReadErrorKind::EndOfStream,
174                }),
175                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
176                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
177                Err(e) => return Err(ReadError {
178                    context: ErrorContext::DefaultImpl,
179                    kind: e.into(),
180                }),
181            }
182        }
183    }
184
185    #[cfg(feature = "tokio-tungstenite021")]
186    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
187    /// Reads a value of this type from a `tokio-tungstenite` websocket.
188    ///
189    /// # Cancellation safety
190    ///
191    /// The default implementation of this method is cancellation safe.
192    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
193        Box::pin(async move {
194            let packet = stream.try_next().await.map_err(|e| ReadError {
195                context: ErrorContext::DefaultImpl,
196                kind: e.into(),
197            })?.ok_or_else(|| ReadError {
198                context: ErrorContext::DefaultImpl,
199                kind: ReadErrorKind::EndOfStream,
200            })?;
201            match packet {
202                tungstenite021::Message::Text(data) => match data.chars().next() {
203                    Some('m') => {
204                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
205                            context: ErrorContext::DefaultImpl,
206                            kind: e.into(),
207                        })?;
208                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
209                            context: ErrorContext::DefaultImpl,
210                            kind: e.into(),
211                        })?;
212                        while buf.len() < len {
213                            let packet = stream.try_next().await.map_err(|e| ReadError {
214                                context: ErrorContext::DefaultImpl,
215                                kind: e.into(),
216                            })?.ok_or_else(|| ReadError {
217                                context: ErrorContext::DefaultImpl,
218                                kind: ReadErrorKind::EndOfStream,
219                            })?;
220                            if let tungstenite021::Message::Binary(data) = packet {
221                                buf.extend_from_slice(&data);
222                            } else {
223                                return Err(ReadError {
224                                    context: ErrorContext::DefaultImpl,
225                                    kind: ReadErrorKind::MessageKind021(packet),
226                                })
227                            }
228                        }
229                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
230                            context: ErrorContext::WebSocket {
231                                source: Box::new(context),
232                            },
233                            kind,
234                        })
235                    }
236                    _ => Err(ReadError {
237                        context: ErrorContext::DefaultImpl,
238                        kind: ReadErrorKind::WebSocketTextMessage(data),
239                    }),
240                },
241                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
242                    context: ErrorContext::WebSocket {
243                        source: Box::new(context),
244                    },
245                    kind,
246                }),
247                _ => Err(ReadError {
248                    context: ErrorContext::DefaultImpl,
249                    kind: ReadErrorKind::MessageKind021(packet),
250                }),
251            }
252        })
253    }
254
255    #[cfg(feature = "tokio-tungstenite024")]
256    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
257    /// Reads a value of this type from a `tokio-tungstenite` websocket.
258    ///
259    /// # Cancellation safety
260    ///
261    /// The default implementation of this method is cancellation safe.
262    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
263        Box::pin(async move {
264            let packet = stream.try_next().await.map_err(|e| ReadError {
265                context: ErrorContext::DefaultImpl,
266                kind: e.into(),
267            })?.ok_or_else(|| ReadError {
268                context: ErrorContext::DefaultImpl,
269                kind: ReadErrorKind::EndOfStream,
270            })?;
271            match packet {
272                tungstenite024::Message::Text(data) => match data.chars().next() {
273                    Some('m') => {
274                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
275                            context: ErrorContext::DefaultImpl,
276                            kind: e.into(),
277                        })?;
278                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
279                            context: ErrorContext::DefaultImpl,
280                            kind: e.into(),
281                        })?;
282                        while buf.len() < len {
283                            let packet = stream.try_next().await.map_err(|e| ReadError {
284                                context: ErrorContext::DefaultImpl,
285                                kind: e.into(),
286                            })?.ok_or_else(|| ReadError {
287                                context: ErrorContext::DefaultImpl,
288                                kind: ReadErrorKind::EndOfStream,
289                            })?;
290                            if let tungstenite024::Message::Binary(data) = packet {
291                                buf.extend_from_slice(&data);
292                            } else {
293                                return Err(ReadError {
294                                    context: ErrorContext::DefaultImpl,
295                                    kind: ReadErrorKind::MessageKind024(packet),
296                                })
297                            }
298                        }
299                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
300                            context: ErrorContext::WebSocket {
301                                source: Box::new(context),
302                            },
303                            kind,
304                        })
305                    }
306                    _ => Err(ReadError {
307                        context: ErrorContext::DefaultImpl,
308                        kind: ReadErrorKind::WebSocketTextMessage(data),
309                    }),
310                },
311                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
312                    context: ErrorContext::WebSocket {
313                        source: Box::new(context),
314                    },
315                    kind,
316                }),
317                _ => Err(ReadError {
318                    context: ErrorContext::DefaultImpl,
319                    kind: ReadErrorKind::MessageKind024(packet),
320                }),
321            }
322        })
323    }
324
325    #[cfg(feature = "tokio-tungstenite021")]
326    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
327    /// Writes a value of this type to a `tokio-tungstenite` websocket.
328    ///
329    /// # Cancellation safety
330    ///
331    /// The default implementation of this method is not cancellation safe.
332    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
333    where Self: Sync {
334        Box::pin(async move {
335            let mut buf = Vec::default();
336            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
337                context: ErrorContext::WebSocket {
338                    source: Box::new(context),
339                },
340                kind,
341            })?;
342            if buf.len() <= WS_MAX_MESSAGE_SIZE {
343                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
344                    context: ErrorContext::DefaultImpl,
345                    kind: e.into(),
346                })?;
347            } else {
348                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
349                    context: ErrorContext::DefaultImpl,
350                    kind: e.into(),
351                })?;
352                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
353                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
354                        context: ErrorContext::DefaultImpl,
355                        kind: e.into(),
356                    })?;
357                }
358            }
359            Ok(())
360        })
361    }
362
363    #[cfg(feature = "tokio-tungstenite024")]
364    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
365    /// Writes a value of this type to a `tokio-tungstenite` websocket.
366    ///
367    /// # Cancellation safety
368    ///
369    /// The default implementation of this method is not cancellation safe.
370    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
371    where Self: Sync {
372        Box::pin(async move {
373            let mut buf = Vec::default();
374            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
375                context: ErrorContext::WebSocket {
376                    source: Box::new(context),
377                },
378                kind,
379            })?;
380            if buf.len() <= WS_MAX_MESSAGE_SIZE {
381                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
382                    context: ErrorContext::DefaultImpl,
383                    kind: e.into(),
384                })?;
385            } else {
386                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
387                    context: ErrorContext::DefaultImpl,
388                    kind: e.into(),
389                })?;
390                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
391                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
392                        context: ErrorContext::DefaultImpl,
393                        kind: e.into(),
394                    })?;
395                }
396            }
397            Ok(())
398        })
399    }
400
401    #[cfg(feature = "tokio-tungstenite021")]
402    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
403    /// Reads a value of this type from a [`tungstenite021`] websocket.
404    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
405        let packet = websocket.read().map_err(|e| ReadError {
406            context: ErrorContext::DefaultImpl,
407            kind: e.into(),
408        })?;
409        match packet {
410            tungstenite021::Message::Text(data) => match data.chars().next() {
411                Some('m') => {
412                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
413                        context: ErrorContext::DefaultImpl,
414                        kind: e.into(),
415                    })?;
416                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
417                        context: ErrorContext::DefaultImpl,
418                        kind: e.into(),
419                    })?;
420                    while buf.len() < len {
421                        let packet = websocket.read().map_err(|e| ReadError {
422                            context: ErrorContext::DefaultImpl,
423                            kind: e.into(),
424                        })?;
425                        if let tungstenite021::Message::Binary(data) = packet {
426                            buf.extend_from_slice(&data);
427                        } else {
428                            return Err(ReadError {
429                                context: ErrorContext::DefaultImpl,
430                                kind: ReadErrorKind::MessageKind021(packet),
431                            })
432                        }
433                    }
434                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
435                        context: ErrorContext::WebSocket {
436                            source: Box::new(context),
437                        },
438                        kind,
439                    })
440                }
441                _ => return Err(ReadError {
442                    context: ErrorContext::DefaultImpl,
443                    kind: ReadErrorKind::WebSocketTextMessage(data),
444                }),
445            },
446            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
447                context: ErrorContext::WebSocket {
448                    source: Box::new(context),
449                },
450                kind,
451            }),
452            _ => Err(ReadError {
453                context: ErrorContext::DefaultImpl,
454                kind: ReadErrorKind::MessageKind021(packet),
455            }),
456        }
457    }
458
459    #[cfg(feature = "tokio-tungstenite024")]
460    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
461    /// Reads a value of this type from a [`tungstenite024`] websocket.
462    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
463        let packet = websocket.read().map_err(|e| ReadError {
464            context: ErrorContext::DefaultImpl,
465            kind: e.into(),
466        })?;
467        match packet {
468            tungstenite024::Message::Text(data) => match data.chars().next() {
469                Some('m') => {
470                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
471                        context: ErrorContext::DefaultImpl,
472                        kind: e.into(),
473                    })?;
474                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
475                        context: ErrorContext::DefaultImpl,
476                        kind: e.into(),
477                    })?;
478                    while buf.len() < len {
479                        let packet = websocket.read().map_err(|e| ReadError {
480                            context: ErrorContext::DefaultImpl,
481                            kind: e.into(),
482                        })?;
483                        if let tungstenite024::Message::Binary(data) = packet {
484                            buf.extend_from_slice(&data);
485                        } else {
486                            return Err(ReadError {
487                                context: ErrorContext::DefaultImpl,
488                                kind: ReadErrorKind::MessageKind024(packet),
489                            })
490                        }
491                    }
492                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
493                        context: ErrorContext::WebSocket {
494                            source: Box::new(context),
495                        },
496                        kind,
497                    })
498                }
499                _ => return Err(ReadError {
500                    context: ErrorContext::DefaultImpl,
501                    kind: ReadErrorKind::WebSocketTextMessage(data),
502                }),
503            },
504            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
505                context: ErrorContext::WebSocket {
506                    source: Box::new(context),
507                },
508                kind,
509            }),
510            _ => Err(ReadError {
511                context: ErrorContext::DefaultImpl,
512                kind: ReadErrorKind::MessageKind024(packet),
513            }),
514        }
515    }
516
517    #[cfg(feature = "tokio-tungstenite021")]
518    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
519    /// Writes a value of this type to a [`tungstenite021`] websocket.
520    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
521        let mut buf = Vec::default();
522        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
523            context: ErrorContext::WebSocket {
524                source: Box::new(context),
525            },
526            kind,
527        })?;
528        if buf.len() <= WS_MAX_MESSAGE_SIZE {
529            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
530                context: ErrorContext::DefaultImpl,
531                kind: e.into(),
532            })?;
533        } else {
534            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
535                context: ErrorContext::DefaultImpl,
536                kind: e.into(),
537            })?;
538            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
539                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
540                    context: ErrorContext::DefaultImpl,
541                    kind: e.into(),
542                })?;
543            }
544        }
545        websocket.flush().map_err(|e| WriteError {
546            context: ErrorContext::DefaultImpl,
547            kind: e.into(),
548        })?;
549        Ok(())
550    }
551
552    #[cfg(feature = "tokio-tungstenite024")]
553    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
554    /// Writes a value of this type to a [`tungstenite024`] websocket.
555    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
556        let mut buf = Vec::default();
557        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
558            context: ErrorContext::WebSocket {
559                source: Box::new(context),
560            },
561            kind,
562        })?;
563        if buf.len() <= WS_MAX_MESSAGE_SIZE {
564            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
565                context: ErrorContext::DefaultImpl,
566                kind: e.into(),
567            })?;
568        } else {
569            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
570                context: ErrorContext::DefaultImpl,
571                kind: e.into(),
572            })?;
573            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
574                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
575                    context: ErrorContext::DefaultImpl,
576                    kind: e.into(),
577                })?;
578            }
579        }
580        websocket.flush().map_err(|e| WriteError {
581            context: ErrorContext::DefaultImpl,
582            kind: e.into(),
583        })?;
584        Ok(())
585    }
586
587    #[cfg(feature = "tokio-tungstenite021")]
588    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
589    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
590    ///
591    /// This can be used to get around drop glue issues that might arise with `read_ws`.
592    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
593        Box::pin(async move {
594            let value = Self::read_ws021(&mut stream).await?;
595            Ok((stream, value))
596        })
597    }
598
599    #[cfg(feature = "tokio-tungstenite024")]
600    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
601    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
602    ///
603    /// This can be used to get around drop glue issues that might arise with `read_ws`.
604    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
605        Box::pin(async move {
606            let value = Self::read_ws024(&mut stream).await?;
607            Ok((stream, value))
608        })
609    }
610}
611
612/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
613///
614/// Useful for WebSocket connections where the message type per direction is always the same.
615#[cfg(feature = "tokio-tungstenite021")]
616#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
617pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
618    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
619    let (sink, stream) = sock.split();
620    Ok((
621        sink.sink_map_err(|e| WriteError {
622            context: ErrorContext::WebSocketSink,
623            kind: e.into(),
624        }).with_flat_map::<W, _, _>(|msg| {
625            let mut buf = Vec::default();
626            match msg.write_sync(&mut buf) {
627                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
628                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
629                } else {
630                    Either::Right(stream::iter(
631                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
632                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
633                        .collect::<Vec<_>>()
634                    ))
635                }.map(Ok)),
636                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
637                    context: ErrorContext::WebSocket {
638                        source: Box::new(context),
639                    },
640                    kind,
641                }))),
642            }
643        }),
644        /*
645        stream.map_err(|e| ReadError {
646            context: ErrorContext::WebSocketStream,
647            kind: e.into(),
648        }).and_then(|packet| async move {
649            if !packet.is_binary() {
650                return Err(ReadError {
651                    context: ErrorContext::WebSocketStream,
652                    kind: ReadErrorKind::MessageKind(packet),
653                })
654            }
655            R::read_sync(&mut &*packet.into_data()).map_err(|ReadError { context, kind }| ReadError {
656                context: ErrorContext::WebSocket {
657                    source: Box::new(context),
658                },
659                kind,
660            })
661        }),
662        */
663        stream.scan(None, |state, res| {
664            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>>, ReadError> {
665                let packet = res.map_err(|e| ReadError {
666                    context: ErrorContext::WebSocketStream,
667                    kind: e.into(),
668                })?;
669                Ok(if let Some((len, buf)) = state {
670                    if let tungstenite021::Message::Binary(data) = packet {
671                        buf.extend_from_slice(&data);
672                    } else {
673                        return Err(ReadError {
674                            context: ErrorContext::DefaultImpl,
675                            kind: ReadErrorKind::MessageKind021(packet),
676                        })
677                    }
678                    if buf.len() >= *len {
679                        let buf = mem::take(buf);
680                        *state = None;
681                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
682                            context: ErrorContext::WebSocket {
683                                source: Box::new(context),
684                            },
685                            kind,
686                        })?)))
687                    } else {
688                        Either::Left(stream::empty())
689                    }
690                } else {
691                    match packet {
692                        tungstenite021::Message::Text(data) => match data.chars().next() {
693                            Some('m') => {
694                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
695                                    context: ErrorContext::DefaultImpl,
696                                    kind: e.into(),
697                                })?;
698                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
699                                    context: ErrorContext::DefaultImpl,
700                                    kind: e.into(),
701                                })?;
702                                *state = Some((len, buf));
703                                Either::Left(stream::empty())
704                            }
705                            _ => return Err(ReadError {
706                                context: ErrorContext::DefaultImpl,
707                                kind: ReadErrorKind::WebSocketTextMessage(data),
708                            }),
709                        },
710                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
711                            context: ErrorContext::WebSocket {
712                                source: Box::new(context),
713                            },
714                            kind,
715                        })?))),
716                        _ => return Err(ReadError {
717                            context: ErrorContext::DefaultImpl,
718                            kind: ReadErrorKind::MessageKind021(packet),
719                        }),
720                    }
721                })
722            }
723
724            future::ready(Some(scanner(state, res)))
725        }).try_flatten(),
726    ))
727}
728
729/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
730///
731/// Useful for WebSocket connections where the message type per direction is always the same.
732#[cfg(feature = "tokio-tungstenite024")]
733#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
734pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
735    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
736    let (sink, stream) = sock.split();
737    Ok((
738        sink.sink_map_err(|e| WriteError {
739            context: ErrorContext::WebSocketSink,
740            kind: e.into(),
741        }).with_flat_map::<W, _, _>(|msg| {
742            let mut buf = Vec::default();
743            match msg.write_sync(&mut buf) {
744                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
745                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
746                } else {
747                    Either::Right(stream::iter(
748                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
749                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
750                        .collect::<Vec<_>>()
751                    ))
752                }.map(Ok)),
753                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
754                    context: ErrorContext::WebSocket {
755                        source: Box::new(context),
756                    },
757                    kind,
758                }))),
759            }
760        }),
761        /*
762        stream.map_err(|e| ReadError {
763            context: ErrorContext::WebSocketStream,
764            kind: e.into(),
765        }).and_then(|packet| async move {
766            if !packet.is_binary() {
767                return Err(ReadError {
768                    context: ErrorContext::WebSocketStream,
769                    kind: ReadErrorKind::MessageKind(packet),
770                })
771            }
772            R::read_sync(&mut &*packet.into_data()).map_err(|ReadError { context, kind }| ReadError {
773                context: ErrorContext::WebSocket {
774                    source: Box::new(context),
775                },
776                kind,
777            })
778        }),
779        */
780        stream.scan(None, |state, res| {
781            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>>, ReadError> {
782                let packet = res.map_err(|e| ReadError {
783                    context: ErrorContext::WebSocketStream,
784                    kind: e.into(),
785                })?;
786                Ok(if let Some((len, buf)) = state {
787                    if let tungstenite024::Message::Binary(data) = packet {
788                        buf.extend_from_slice(&data);
789                    } else {
790                        return Err(ReadError {
791                            context: ErrorContext::DefaultImpl,
792                            kind: ReadErrorKind::MessageKind024(packet),
793                        })
794                    }
795                    if buf.len() >= *len {
796                        let buf = mem::take(buf);
797                        *state = None;
798                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
799                            context: ErrorContext::WebSocket {
800                                source: Box::new(context),
801                            },
802                            kind,
803                        })?)))
804                    } else {
805                        Either::Left(stream::empty())
806                    }
807                } else {
808                    match packet {
809                        tungstenite024::Message::Text(data) => match data.chars().next() {
810                            Some('m') => {
811                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
812                                    context: ErrorContext::DefaultImpl,
813                                    kind: e.into(),
814                                })?;
815                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
816                                    context: ErrorContext::DefaultImpl,
817                                    kind: e.into(),
818                                })?;
819                                *state = Some((len, buf));
820                                Either::Left(stream::empty())
821                            }
822                            _ => return Err(ReadError {
823                                context: ErrorContext::DefaultImpl,
824                                kind: ReadErrorKind::WebSocketTextMessage(data),
825                            }),
826                        },
827                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
828                            context: ErrorContext::WebSocket {
829                                source: Box::new(context),
830                            },
831                            kind,
832                        })?))),
833                        _ => return Err(ReadError {
834                            context: ErrorContext::DefaultImpl,
835                            kind: ReadErrorKind::MessageKind024(packet),
836                        }),
837                    }
838                })
839            }
840
841            future::ready(Some(scanner(state, res)))
842        }).try_flatten(),
843    ))
844}