async_proto/
lib.rs

1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! This is `async-proto`, a library crate facilitating simple binary network protocols with `async` support.
7//!
8//! The main feature is the [`Protocol`] trait, which allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
9//!
10//! [`Protocol`] can be derived for `enum`s and `struct`s if all fields implement [`Protocol`].
11//!
12//! # Features
13//!
14//! This crate offers optional dependencies on the following crates to enable [`Protocol`] implementations for some of their types:
15//!
16//! * [`bytes`](https://docs.rs/bytes): [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html)
17//! * [`chrono`](https://docs.rs/chrono): [`NaiveDate`](https://docs.rs/chrono/latest/chrono/naive/struct.NaiveDate.html), [`DateTime`](https://docs.rs/chrono/latest/chrono/struct.DateTime.html), [`Utc`](https://docs.rs/chrono/latest/chrono/offset/struct.Utc.html), and [`FixedOffset`](https://docs.rs/chrono/latest/chrono/offset/struct.FixedOffset.html)
18//! * [`chrono-tz`](https://docs.rs/chrono-tz): [`Tz`](https://docs.rs/chrono-tz/latest/chrono_tz/enum.Tz.html)
19//! * [`doubloon`](https://docs.rs/doubloon): [`Money`](https://docs.rs/doubloon/latest/doubloon/struct.Money.html) and all [ISO currencies](https://docs.rs/doubloon/latest/doubloon/iso_currencies/index.html)
20//! * [`either`](https://docs.rs/either): [`Either`](https://docs.rs/either/latest/either/enum.Either.html)
21//! * [`enumset`](https://docs.rs/enumset): [`EnumSet`](https://docs.rs/enumset/latest/enumset/struct.EnumSet.html)
22//! * [`git2`](https://docs.rs/git2): [`Oid`](https://docs.rs/git2/latest/git2/struct.Oid.html)
23//! * [`gix-hash`](https://docs.rs/gix-hash): [`ObjectId`](https://docs.rs/gix-hash/latest/gix_hash/enum.ObjectId.html)
24//! * [`noisy_float`](https://docs.rs/noisy_float): [`NoisyFloat`](https://docs.rs/noisy_float/latest/noisy_float/struct.NoisyFloat.html)
25//! * [`rust_decimal`](https://docs.rs/rust_decimal): [`Decimal`](https://docs.rs/rust_decimal/latest/rust_decimal/struct.Decimal.html)
26//! * [`semver`](https://docs.rs/semver): [`Version`](https://docs.rs/semver/latest/semver/struct.Version.html), [`Prerelease`](https://docs.rs/semver/latest/semver/struct.Prerelease.html), and [`BuildMetadata`](https://docs.rs/semver/latest/semver/struct.BuildMetadata.html)
27//! * [`serde_json`](https://docs.rs/serde_json): [`Value`](https://docs.rs/serde_json/latest/serde_json/enum.Value.html), [`Map`](https://docs.rs/serde_json/latest/serde_json/struct.Map.html), and [`Number`](https://docs.rs/serde_json/latest/serde_json/struct.Number.html)
28//! * [`serenity`](https://docs.rs/serenity): The [ID types](https://docs.rs/serenity/latest/serenity/model/id/index.html), not including [`ShardId`](https://docs.rs/serenity/latest/serenity/model/id/struct.ShardId.html)
29//! * [`uuid`](https://docs.rs/uuid): [`Uuid`](https://docs.rs/uuid/latest/uuid/struct.Uuid.html)
30//!
31//! Additionally, the following features can be enabled via Cargo:
32//!
33//! * `tokio-tungstenite`: Adds a dependency on the [`tokio-tungstenite`](https://docs.rs/tokio-tungstenite) crate and convenience methods for reading/writing [`Protocol`] types from/to its websockets.
34//! * `tungstenite`: Adds a dependency on the [`tungstenite`](https://docs.rs/tungstenite) crate and convenience methods for synchronously reading/writing [`Protocol`] types from/to its websockets.
35
36use {
37    std::{
38        future::Future,
39        io::{
40            self,
41            prelude::*,
42        },
43        pin::Pin,
44    },
45    tokio::io::{
46        AsyncRead,
47        AsyncWrite,
48    },
49};
50#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))] use {
51    std::{
52        iter,
53        mem,
54    },
55    fallible_collections::FallibleVec,
56    futures::{
57        Sink,
58        SinkExt as _,
59        future::{
60            self,
61            Either,
62        },
63        stream::{
64            self,
65            Stream,
66            StreamExt as _,
67            TryStreamExt as _,
68        },
69    },
70};
71#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
72#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
73pub use {
74    async_proto_derive::{
75        Protocol,
76        bitflags,
77    },
78    crate::error::*,
79};
80#[doc(hidden)] pub use tokio; // used in proc macro
81
82mod error;
83mod impls;
84
85/// The maximum message size that can be sent and received by tokio-tungstenite without errors on the default configuration.
86#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
87
88/// This trait allows reading a value of an implementing type from an async or sync stream, as well as writing one to an async or sync sink.
89pub trait Protocol: Sized {
90    /// Reads a value of this type from an async stream.
91    ///
92    /// # Cancellation safety
93    ///
94    /// Implementations of this method are generally not cancellation safe.
95    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
96    /// Writes a value of this type to an async sink.
97    ///
98    /// # Cancellation safety
99    ///
100    /// Implementations of this method are generally not cancellation safe.
101    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
102    /// Reads a value of this type from a sync stream.
103    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
104    /// Writes a value of this type to a sync sink.
105    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
106
107    /// Takes ownership of an async stream, reads a value of this type from it, then returns it along with the stream.
108    ///
109    /// This can be used to get around drop glue issues that might arise with `read`.
110    fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
111        Box::pin(async move {
112            let value = Self::read(&mut stream).await?;
113            Ok((stream, value))
114        })
115    }
116
117    /// Attempts to read a value of this type from a prefix in a buffer and a suffix in a sync stream.
118    ///
119    /// If [`io::ErrorKind::WouldBlock`] is encountered, `Ok(None)` is returned and the portion read successfully is appended to `buf`. Otherwise, the prefix representing the returned value is removed from `buf`.
120    ///
121    /// Callers, not implementations, should ensure that `stream` is non-blocking if desired.
122    ///
123    /// # Example
124    ///
125    /// ```
126    /// use {
127    ///     std::{
128    ///         io,
129    ///         net::TcpStream,
130    ///     },
131    ///     async_proto::Protocol,
132    /// };
133    ///
134    /// struct Client {
135    ///     tcp_stream: TcpStream,
136    ///     buf: Vec<u8>,
137    /// }
138    ///
139    /// impl Client {
140    ///     fn new(tcp_stream: TcpStream) -> Self {
141    ///         Self {
142    ///             tcp_stream,
143    ///             buf: Vec::default(),
144    ///         }
145    ///     }
146    ///
147    ///     fn try_read<T: Protocol>(&mut self) -> io::Result<Option<T>> {
148    ///         self.tcp_stream.set_nonblocking(true)?;
149    ///         Ok(T::try_read(&mut self.tcp_stream, &mut self.buf)?)
150    ///     }
151    ///
152    ///     fn write<T: Protocol>(&mut self, msg: &T) -> io::Result<()> {
153    ///         self.tcp_stream.set_nonblocking(false)?;
154    ///         msg.write_sync(&mut self.tcp_stream)?;
155    ///         Ok(())
156    ///     }
157    /// }
158    /// ```
159    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
160        let mut temp_buf = vec![0; 8];
161        loop {
162            let mut slice = &mut &**buf;
163            match Self::read_sync(&mut slice) {
164                Ok(value) => {
165                    let value_len = slice.len();
166                    buf.drain(..buf.len() - value_len);
167                    return Ok(Some(value))
168                }
169                Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
170                Err(e) => return Err(e),
171            }
172            match stream.read(&mut temp_buf) {
173                Ok(0) => return Err(ReadError {
174                    context: ErrorContext::DefaultImpl,
175                    kind: ReadErrorKind::EndOfStream,
176                }),
177                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
178                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
179                Err(e) => return Err(ReadError {
180                    context: ErrorContext::DefaultImpl,
181                    kind: e.into(),
182                }),
183            }
184        }
185    }
186
187    #[cfg(feature = "tokio-tungstenite021")]
188    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
189    /// Reads a value of this type from a `tokio-tungstenite` websocket.
190    ///
191    /// # Cancellation safety
192    ///
193    /// The default implementation of this method is cancellation safe.
194    fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
195        Box::pin(async move {
196            let packet = stream.try_next().await.map_err(|e| ReadError {
197                context: ErrorContext::DefaultImpl,
198                kind: e.into(),
199            })?.ok_or_else(|| ReadError {
200                context: ErrorContext::DefaultImpl,
201                kind: ReadErrorKind::EndOfStream,
202            })?;
203            match packet {
204                tungstenite021::Message::Text(data) => match data.chars().next() {
205                    Some('m') => {
206                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
207                            context: ErrorContext::DefaultImpl,
208                            kind: e.into(),
209                        })?;
210                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
211                            context: ErrorContext::DefaultImpl,
212                            kind: e.into(),
213                        })?;
214                        while buf.len() < len {
215                            let packet = stream.try_next().await.map_err(|e| ReadError {
216                                context: ErrorContext::DefaultImpl,
217                                kind: e.into(),
218                            })?.ok_or_else(|| ReadError {
219                                context: ErrorContext::DefaultImpl,
220                                kind: ReadErrorKind::EndOfStream,
221                            })?;
222                            if let tungstenite021::Message::Binary(data) = packet {
223                                buf.extend_from_slice(&data);
224                            } else {
225                                return Err(ReadError {
226                                    context: ErrorContext::DefaultImpl,
227                                    kind: ReadErrorKind::MessageKind021(packet),
228                                })
229                            }
230                        }
231                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
232                            context: ErrorContext::WebSocket {
233                                source: Box::new(context),
234                            },
235                            kind,
236                        })
237                    }
238                    _ => Err(ReadError {
239                        context: ErrorContext::DefaultImpl,
240                        kind: ReadErrorKind::WebSocketTextMessage(data),
241                    }),
242                },
243                tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
244                    context: ErrorContext::WebSocket {
245                        source: Box::new(context),
246                    },
247                    kind,
248                }),
249                _ => Err(ReadError {
250                    context: ErrorContext::DefaultImpl,
251                    kind: ReadErrorKind::MessageKind021(packet),
252                }),
253            }
254        })
255    }
256
257    #[cfg(feature = "tokio-tungstenite024")]
258    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
259    /// Reads a value of this type from a `tokio-tungstenite` websocket.
260    ///
261    /// # Cancellation safety
262    ///
263    /// The default implementation of this method is cancellation safe.
264    fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
265        Box::pin(async move {
266            let packet = stream.try_next().await.map_err(|e| ReadError {
267                context: ErrorContext::DefaultImpl,
268                kind: e.into(),
269            })?.ok_or_else(|| ReadError {
270                context: ErrorContext::DefaultImpl,
271                kind: ReadErrorKind::EndOfStream,
272            })?;
273            match packet {
274                tungstenite024::Message::Text(data) => match data.chars().next() {
275                    Some('m') => {
276                        let len = data[1..].parse::<usize>().map_err(|e| ReadError {
277                            context: ErrorContext::DefaultImpl,
278                            kind: e.into(),
279                        })?;
280                        let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
281                            context: ErrorContext::DefaultImpl,
282                            kind: e.into(),
283                        })?;
284                        while buf.len() < len {
285                            let packet = stream.try_next().await.map_err(|e| ReadError {
286                                context: ErrorContext::DefaultImpl,
287                                kind: e.into(),
288                            })?.ok_or_else(|| ReadError {
289                                context: ErrorContext::DefaultImpl,
290                                kind: ReadErrorKind::EndOfStream,
291                            })?;
292                            if let tungstenite024::Message::Binary(data) = packet {
293                                buf.extend_from_slice(&data);
294                            } else {
295                                return Err(ReadError {
296                                    context: ErrorContext::DefaultImpl,
297                                    kind: ReadErrorKind::MessageKind024(packet),
298                                })
299                            }
300                        }
301                        Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
302                            context: ErrorContext::WebSocket {
303                                source: Box::new(context),
304                            },
305                            kind,
306                        })
307                    }
308                    _ => Err(ReadError {
309                        context: ErrorContext::DefaultImpl,
310                        kind: ReadErrorKind::WebSocketTextMessage(data),
311                    }),
312                },
313                tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
314                    context: ErrorContext::WebSocket {
315                        source: Box::new(context),
316                    },
317                    kind,
318                }),
319                _ => Err(ReadError {
320                    context: ErrorContext::DefaultImpl,
321                    kind: ReadErrorKind::MessageKind024(packet),
322                }),
323            }
324        })
325    }
326
327    #[cfg(feature = "tokio-tungstenite021")]
328    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
329    /// Writes a value of this type to a `tokio-tungstenite` websocket.
330    ///
331    /// # Cancellation safety
332    ///
333    /// The default implementation of this method is not cancellation safe.
334    fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
335    where Self: Sync {
336        Box::pin(async move {
337            let mut buf = Vec::default();
338            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
339                context: ErrorContext::WebSocket {
340                    source: Box::new(context),
341                },
342                kind,
343            })?;
344            if buf.len() <= WS_MAX_MESSAGE_SIZE {
345                sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
346                    context: ErrorContext::DefaultImpl,
347                    kind: e.into(),
348                })?;
349            } else {
350                sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
351                    context: ErrorContext::DefaultImpl,
352                    kind: e.into(),
353                })?;
354                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
355                    sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
356                        context: ErrorContext::DefaultImpl,
357                        kind: e.into(),
358                    })?;
359                }
360            }
361            Ok(())
362        })
363    }
364
365    #[cfg(feature = "tokio-tungstenite024")]
366    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
367    /// Writes a value of this type to a `tokio-tungstenite` websocket.
368    ///
369    /// # Cancellation safety
370    ///
371    /// The default implementation of this method is not cancellation safe.
372    fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
373    where Self: Sync {
374        Box::pin(async move {
375            let mut buf = Vec::default();
376            self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
377                context: ErrorContext::WebSocket {
378                    source: Box::new(context),
379                },
380                kind,
381            })?;
382            if buf.len() <= WS_MAX_MESSAGE_SIZE {
383                sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
384                    context: ErrorContext::DefaultImpl,
385                    kind: e.into(),
386                })?;
387            } else {
388                sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
389                    context: ErrorContext::DefaultImpl,
390                    kind: e.into(),
391                })?;
392                for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
393                    sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
394                        context: ErrorContext::DefaultImpl,
395                        kind: e.into(),
396                    })?;
397                }
398            }
399            Ok(())
400        })
401    }
402
403    #[cfg(feature = "tokio-tungstenite021")]
404    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
405    /// Reads a value of this type from a [`tungstenite021`] websocket.
406    fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
407        let packet = websocket.read().map_err(|e| ReadError {
408            context: ErrorContext::DefaultImpl,
409            kind: e.into(),
410        })?;
411        match packet {
412            tungstenite021::Message::Text(data) => match data.chars().next() {
413                Some('m') => {
414                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
415                        context: ErrorContext::DefaultImpl,
416                        kind: e.into(),
417                    })?;
418                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
419                        context: ErrorContext::DefaultImpl,
420                        kind: e.into(),
421                    })?;
422                    while buf.len() < len {
423                        let packet = websocket.read().map_err(|e| ReadError {
424                            context: ErrorContext::DefaultImpl,
425                            kind: e.into(),
426                        })?;
427                        if let tungstenite021::Message::Binary(data) = packet {
428                            buf.extend_from_slice(&data);
429                        } else {
430                            return Err(ReadError {
431                                context: ErrorContext::DefaultImpl,
432                                kind: ReadErrorKind::MessageKind021(packet),
433                            })
434                        }
435                    }
436                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
437                        context: ErrorContext::WebSocket {
438                            source: Box::new(context),
439                        },
440                        kind,
441                    })
442                }
443                _ => return Err(ReadError {
444                    context: ErrorContext::DefaultImpl,
445                    kind: ReadErrorKind::WebSocketTextMessage(data),
446                }),
447            },
448            tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
449                context: ErrorContext::WebSocket {
450                    source: Box::new(context),
451                },
452                kind,
453            }),
454            _ => Err(ReadError {
455                context: ErrorContext::DefaultImpl,
456                kind: ReadErrorKind::MessageKind021(packet),
457            }),
458        }
459    }
460
461    #[cfg(feature = "tokio-tungstenite024")]
462    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
463    /// Reads a value of this type from a [`tungstenite024`] websocket.
464    fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
465        let packet = websocket.read().map_err(|e| ReadError {
466            context: ErrorContext::DefaultImpl,
467            kind: e.into(),
468        })?;
469        match packet {
470            tungstenite024::Message::Text(data) => match data.chars().next() {
471                Some('m') => {
472                    let len = data[1..].parse::<usize>().map_err(|e| ReadError {
473                        context: ErrorContext::DefaultImpl,
474                        kind: e.into(),
475                    })?;
476                    let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
477                        context: ErrorContext::DefaultImpl,
478                        kind: e.into(),
479                    })?;
480                    while buf.len() < len {
481                        let packet = websocket.read().map_err(|e| ReadError {
482                            context: ErrorContext::DefaultImpl,
483                            kind: e.into(),
484                        })?;
485                        if let tungstenite024::Message::Binary(data) = packet {
486                            buf.extend_from_slice(&data);
487                        } else {
488                            return Err(ReadError {
489                                context: ErrorContext::DefaultImpl,
490                                kind: ReadErrorKind::MessageKind024(packet),
491                            })
492                        }
493                    }
494                    Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
495                        context: ErrorContext::WebSocket {
496                            source: Box::new(context),
497                        },
498                        kind,
499                    })
500                }
501                _ => return Err(ReadError {
502                    context: ErrorContext::DefaultImpl,
503                    kind: ReadErrorKind::WebSocketTextMessage(data),
504                }),
505            },
506            tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
507                context: ErrorContext::WebSocket {
508                    source: Box::new(context),
509                },
510                kind,
511            }),
512            _ => Err(ReadError {
513                context: ErrorContext::DefaultImpl,
514                kind: ReadErrorKind::MessageKind024(packet),
515            }),
516        }
517    }
518
519    #[cfg(feature = "tokio-tungstenite021")]
520    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
521    /// Writes a value of this type to a [`tungstenite021`] websocket.
522    fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
523        let mut buf = Vec::default();
524        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
525            context: ErrorContext::WebSocket {
526                source: Box::new(context),
527            },
528            kind,
529        })?;
530        if buf.len() <= WS_MAX_MESSAGE_SIZE {
531            websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
532                context: ErrorContext::DefaultImpl,
533                kind: e.into(),
534            })?;
535        } else {
536            websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
537                context: ErrorContext::DefaultImpl,
538                kind: e.into(),
539            })?;
540            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
541                websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
542                    context: ErrorContext::DefaultImpl,
543                    kind: e.into(),
544                })?;
545            }
546        }
547        websocket.flush().map_err(|e| WriteError {
548            context: ErrorContext::DefaultImpl,
549            kind: e.into(),
550        })?;
551        Ok(())
552    }
553
554    #[cfg(feature = "tokio-tungstenite024")]
555    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
556    /// Writes a value of this type to a [`tungstenite024`] websocket.
557    fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
558        let mut buf = Vec::default();
559        self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
560            context: ErrorContext::WebSocket {
561                source: Box::new(context),
562            },
563            kind,
564        })?;
565        if buf.len() <= WS_MAX_MESSAGE_SIZE {
566            websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
567                context: ErrorContext::DefaultImpl,
568                kind: e.into(),
569            })?;
570        } else {
571            websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
572                context: ErrorContext::DefaultImpl,
573                kind: e.into(),
574            })?;
575            for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
576                websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
577                    context: ErrorContext::DefaultImpl,
578                    kind: e.into(),
579                })?;
580            }
581        }
582        websocket.flush().map_err(|e| WriteError {
583            context: ErrorContext::DefaultImpl,
584            kind: e.into(),
585        })?;
586        Ok(())
587    }
588
589    #[cfg(feature = "tokio-tungstenite021")]
590    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
591    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
592    ///
593    /// This can be used to get around drop glue issues that might arise with `read_ws`.
594    fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
595        Box::pin(async move {
596            let value = Self::read_ws021(&mut stream).await?;
597            Ok((stream, value))
598        })
599    }
600
601    #[cfg(feature = "tokio-tungstenite024")]
602    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
603    /// Takes ownership of an async websocket stream, reads a value of this type from it, then returns it along with the stream.
604    ///
605    /// This can be used to get around drop glue issues that might arise with `read_ws`.
606    fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
607        Box::pin(async move {
608            let value = Self::read_ws024(&mut stream).await?;
609            Ok((stream, value))
610        })
611    }
612}
613
614/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
615///
616/// Useful for WebSocket connections where the message type per direction is always the same.
617#[cfg(feature = "tokio-tungstenite021")]
618#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
619pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
620    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
621    let (sink, stream) = sock.split();
622    Ok((
623        sink.sink_map_err(|e| WriteError {
624            context: ErrorContext::WebSocketSink,
625            kind: e.into(),
626        }).with_flat_map::<W, _, _>(|msg| {
627            let mut buf = Vec::default();
628            match msg.write_sync(&mut buf) {
629                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
630                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
631                } else {
632                    Either::Right(stream::iter(
633                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
634                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
635                        .collect::<Vec<_>>()
636                    ))
637                }.map(Ok)),
638                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
639                    context: ErrorContext::WebSocket {
640                        source: Box::new(context),
641                    },
642                    kind,
643                }))),
644            }
645        }),
646        /*
647        stream.map_err(|e| ReadError {
648            context: ErrorContext::WebSocketStream,
649            kind: e.into(),
650        }).and_then(|packet| async move {
651            if !packet.is_binary() {
652                return Err(ReadError {
653                    context: ErrorContext::WebSocketStream,
654                    kind: ReadErrorKind::MessageKind(packet),
655                })
656            }
657            R::read_sync(&mut &*packet.into_data()).map_err(|ReadError { context, kind }| ReadError {
658                context: ErrorContext::WebSocket {
659                    source: Box::new(context),
660                },
661                kind,
662            })
663        }),
664        */
665        stream.scan(None, |state, res| {
666            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
667                let packet = res.map_err(|e| ReadError {
668                    context: ErrorContext::WebSocketStream,
669                    kind: e.into(),
670                })?;
671                Ok(if let Some((len, buf)) = state {
672                    if let tungstenite021::Message::Binary(data) = packet {
673                        buf.extend_from_slice(&data);
674                    } else {
675                        return Err(ReadError {
676                            context: ErrorContext::DefaultImpl,
677                            kind: ReadErrorKind::MessageKind021(packet),
678                        })
679                    }
680                    if buf.len() >= *len {
681                        let buf = mem::take(buf);
682                        *state = None;
683                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
684                            context: ErrorContext::WebSocket {
685                                source: Box::new(context),
686                            },
687                            kind,
688                        })?)))
689                    } else {
690                        Either::Left(stream::empty())
691                    }
692                } else {
693                    match packet {
694                        tungstenite021::Message::Text(data) => match data.chars().next() {
695                            Some('m') => {
696                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
697                                    context: ErrorContext::DefaultImpl,
698                                    kind: e.into(),
699                                })?;
700                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
701                                    context: ErrorContext::DefaultImpl,
702                                    kind: e.into(),
703                                })?;
704                                *state = Some((len, buf));
705                                Either::Left(stream::empty())
706                            }
707                            _ => return Err(ReadError {
708                                context: ErrorContext::DefaultImpl,
709                                kind: ReadErrorKind::WebSocketTextMessage(data),
710                            }),
711                        },
712                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
713                            context: ErrorContext::WebSocket {
714                                source: Box::new(context),
715                            },
716                            kind,
717                        })?))),
718                        _ => return Err(ReadError {
719                            context: ErrorContext::DefaultImpl,
720                            kind: ReadErrorKind::MessageKind021(packet),
721                        }),
722                    }
723                })
724            }
725
726            future::ready(Some(scanner(state, res)))
727        }).try_flatten(),
728    ))
729}
730
731/// Establishes a WebSocket connection to the given URL and returns a typed sink/stream pair.
732///
733/// Useful for WebSocket connections where the message type per direction is always the same.
734#[cfg(feature = "tokio-tungstenite024")]
735#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
736pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
737    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
738    let (sink, stream) = sock.split();
739    Ok((
740        sink.sink_map_err(|e| WriteError {
741            context: ErrorContext::WebSocketSink,
742            kind: e.into(),
743        }).with_flat_map::<W, _, _>(|msg| {
744            let mut buf = Vec::default();
745            match msg.write_sync(&mut buf) {
746                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
747                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
748                } else {
749                    Either::Right(stream::iter(
750                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
751                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
752                        .collect::<Vec<_>>()
753                    ))
754                }.map(Ok)),
755                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
756                    context: ErrorContext::WebSocket {
757                        source: Box::new(context),
758                    },
759                    kind,
760                }))),
761            }
762        }),
763        /*
764        stream.map_err(|e| ReadError {
765            context: ErrorContext::WebSocketStream,
766            kind: e.into(),
767        }).and_then(|packet| async move {
768            if !packet.is_binary() {
769                return Err(ReadError {
770                    context: ErrorContext::WebSocketStream,
771                    kind: ReadErrorKind::MessageKind(packet),
772                })
773            }
774            R::read_sync(&mut &*packet.into_data()).map_err(|ReadError { context, kind }| ReadError {
775                context: ErrorContext::WebSocket {
776                    source: Box::new(context),
777                },
778                kind,
779            })
780        }),
781        */
782        stream.scan(None, |state, res| {
783            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
784                let packet = res.map_err(|e| ReadError {
785                    context: ErrorContext::WebSocketStream,
786                    kind: e.into(),
787                })?;
788                Ok(if let Some((len, buf)) = state {
789                    if let tungstenite024::Message::Binary(data) = packet {
790                        buf.extend_from_slice(&data);
791                    } else {
792                        return Err(ReadError {
793                            context: ErrorContext::DefaultImpl,
794                            kind: ReadErrorKind::MessageKind024(packet),
795                        })
796                    }
797                    if buf.len() >= *len {
798                        let buf = mem::take(buf);
799                        *state = None;
800                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
801                            context: ErrorContext::WebSocket {
802                                source: Box::new(context),
803                            },
804                            kind,
805                        })?)))
806                    } else {
807                        Either::Left(stream::empty())
808                    }
809                } else {
810                    match packet {
811                        tungstenite024::Message::Text(data) => match data.chars().next() {
812                            Some('m') => {
813                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
814                                    context: ErrorContext::DefaultImpl,
815                                    kind: e.into(),
816                                })?;
817                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
818                                    context: ErrorContext::DefaultImpl,
819                                    kind: e.into(),
820                                })?;
821                                *state = Some((len, buf));
822                                Either::Left(stream::empty())
823                            }
824                            _ => return Err(ReadError {
825                                context: ErrorContext::DefaultImpl,
826                                kind: ReadErrorKind::WebSocketTextMessage(data),
827                            }),
828                        },
829                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
830                            context: ErrorContext::WebSocket {
831                                source: Box::new(context),
832                            },
833                            kind,
834                        })?))),
835                        _ => return Err(ReadError {
836                            context: ErrorContext::DefaultImpl,
837                            kind: ReadErrorKind::MessageKind024(packet),
838                        }),
839                    }
840                })
841            }
842
843            future::ready(Some(scanner(state, res)))
844        }).try_flatten(),
845    ))
846}