async_proto/impls/
nonempty_collections.rs

1use {
2    std::{
3        hash::Hash,
4        io::prelude::*,
5        num::NonZero,
6        pin::Pin,
7    },
8    nonempty_collections::{
9        NEMap,
10        NESet,
11        NEVec,
12    },
13    tokio::io::{
14        AsyncRead,
15        AsyncWrite,
16    },
17    crate::{
18        ErrorContext,
19        LengthPrefixed,
20        Protocol,
21        ReadError,
22        ReadErrorKind,
23        WriteError,
24    },
25};
26
27/// A vector is prefixed with the length as a [`u64`].
28#[cfg_attr(docsrs, doc(cfg(feature = "nonempty-collections")))]
29impl<T: Protocol + Send + Sync> Protocol for NEVec<T> {
30        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
31        Self::read_length_prefixed(stream, u64::MAX)
32    }
33
34    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
35        self.write_length_prefixed(sink, u64::MAX)
36    }
37
38    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
39        Self::read_length_prefixed_sync(stream, u64::MAX)
40    }
41
42    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
43        self.write_length_prefixed_sync(sink, u64::MAX)
44    }
45}
46
47#[cfg_attr(docsrs, doc(cfg(feature = "nonempty-collections")))]
48impl<T: Protocol + Send + Sync> LengthPrefixed for NEVec<T> {
49    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
50        Box::pin(async move {
51            let len = super::read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "NEVec" }).await?;
52            let len = NonZero::new(len).ok_or_else(|| ReadError {
53                context: ErrorContext::BuiltIn { for_type: "NEVec" },
54                kind: ReadErrorKind::UnknownVariant64(0),
55            })?;
56            let mut buf = Self::with_capacity(len, T::read(stream).await?); //TODO use fallible allocation once available
57            for _ in 1..len.get() {
58                buf.push(T::read(stream).await?);
59            }
60            Ok(buf)
61        })
62    }
63
64    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
65        Box::pin(async move {
66            super::write_len(sink, self.len().get(), max_len, || ErrorContext::BuiltIn { for_type: "NEVec" }).await?;
67            for elt in self {
68                elt.write(sink).await?;
69            }
70            Ok(())
71        })
72    }
73
74    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
75        let len = super::read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "NEVec" })?;
76        let len = NonZero::new(len).ok_or_else(|| ReadError {
77            context: ErrorContext::BuiltIn { for_type: "NEVec" },
78            kind: ReadErrorKind::UnknownVariant64(0),
79        })?;
80        let mut buf = Self::with_capacity(len, T::read_sync(stream)?); //TODO use fallible allocation once available
81        for _ in 1..len.get() {
82            buf.push(T::read_sync(stream)?);
83        }
84        Ok(buf)
85    }
86
87    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
88        super::write_len_sync(sink, self.len().get(), max_len, || ErrorContext::BuiltIn { for_type: "NEVec" })?;
89        for elt in self {
90            elt.write_sync(sink)?;
91        }
92        Ok(())
93    }
94}
95
96#[cfg_attr(docsrs, doc(cfg(feature = "nonempty-collections")))]
97impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for NESet<T> {
98    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
99        Self::read_length_prefixed(stream, u64::MAX)
100    }
101
102    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
103        self.write_length_prefixed(sink, u64::MAX)
104    }
105
106    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
107        Self::read_length_prefixed_sync(stream, u64::MAX)
108    }
109
110    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
111        self.write_length_prefixed_sync(sink, u64::MAX)
112    }
113}
114
115/// A set is prefixed with the length as a [`u64`].
116#[cfg_attr(docsrs, doc(cfg(feature = "nonempty-collections")))]
117impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for NESet<T> {
118    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
119        Box::pin(async move {
120            let len = super::read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "NESet" }).await?;
121            let len = NonZero::new(len).ok_or_else(|| ReadError {
122                context: ErrorContext::BuiltIn { for_type: "NESet" },
123                kind: ReadErrorKind::UnknownVariant64(0),
124            })?;
125            let mut set = Self::with_capacity(len, T::read(stream).await?); //TODO use fallible allocation once available
126            for _ in 1..len.get() {
127                set.insert(T::read(stream).await?);
128            }
129            Ok(set)
130        })
131    }
132
133    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
134        Box::pin(async move {
135            super::write_len(sink, self.len().get(), max_len, || ErrorContext::BuiltIn { for_type: "NESet" }).await?;
136            for elt in self {
137                elt.write(sink).await?;
138            }
139            Ok(())
140        })
141    }
142
143    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
144        let len = super::read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "NESet" })?;
145        let len = NonZero::new(len).ok_or_else(|| ReadError {
146            context: ErrorContext::BuiltIn { for_type: "NESet" },
147            kind: ReadErrorKind::UnknownVariant64(0),
148        })?;
149        let mut set = Self::with_capacity(len, T::read_sync(stream)?); //TODO use fallible allocation once available
150        for _ in 1..len.get() {
151            set.insert(T::read_sync(stream)?);
152        }
153        Ok(set)
154    }
155
156    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
157        super::write_len_sync(sink, self.len().get(), max_len, || ErrorContext::BuiltIn { for_type: "NESet" })?;
158        for elt in self {
159            elt.write_sync(sink)?;
160        }
161        Ok(())
162    }
163}
164
165/// A map is prefixed with the length as a [`u64`].
166#[cfg_attr(docsrs, doc(cfg(feature = "nonempty-collections")))]
167impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for NEMap<K, V> {
168    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
169        Self::read_length_prefixed(stream, u64::MAX)
170    }
171
172    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
173        self.write_length_prefixed(sink, u64::MAX)
174    }
175
176    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
177        Self::read_length_prefixed_sync(stream, u64::MAX)
178    }
179
180    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
181        self.write_length_prefixed_sync(sink, u64::MAX)
182    }
183}
184
185#[cfg_attr(docsrs, doc(cfg(feature = "nonempty-collections")))]
186impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for NEMap<K, V> {
187    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
188        Box::pin(async move {
189            let len = super::read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "NEMap" }).await?;
190            let len = NonZero::new(len).ok_or_else(|| ReadError {
191                context: ErrorContext::BuiltIn { for_type: "NEMap" },
192                kind: ReadErrorKind::UnknownVariant64(0),
193            })?;
194            let mut map = Self::with_capacity(len, K::read(stream).await?, V::read(stream).await?); //TODO use fallible allocation once available
195            for _ in 1..len.get() {
196                map.insert(K::read(stream).await?, V::read(stream).await?);
197            }
198            Ok(map)
199        })
200    }
201
202    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
203        Box::pin(async move {
204            super::write_len(sink, self.len().get(), max_len, || ErrorContext::BuiltIn { for_type: "NEMap" }).await?;
205            for (k, v) in self {
206                k.write(sink).await?;
207                v.write(sink).await?;
208            }
209            Ok(())
210        })
211    }
212
213    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
214        let len = super::read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "NEMap" })?;
215        let len = NonZero::new(len).ok_or_else(|| ReadError {
216            context: ErrorContext::BuiltIn { for_type: "NEMap" },
217            kind: ReadErrorKind::UnknownVariant64(0),
218        })?;
219        let mut map = Self::with_capacity(len, K::read_sync(stream)?, V::read_sync(stream)?); //TODO use fallible allocation once available
220        for _ in 1..len.get() {
221            map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
222        }
223        Ok(map)
224    }
225
226    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
227        super::write_len_sync(sink, self.len().get(), max_len, || ErrorContext::BuiltIn { for_type: "NEMap" })?;
228        for (k, v) in self {
229            k.write_sync(sink)?;
230            v.write_sync(sink)?;
231        }
232        Ok(())
233    }
234}