Skip to main content

mezzenger_utils/
numbered.rs

1//! Wrapper transport attaching a number to messages.
2
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::{stream::FusedStream, Sink, Stream};
10use num::{traits::WrappingAdd, One, Zero};
11use pin_project::pin_project;
12use serde::{Deserialize, Serialize};
13
/// A message that carries a number alongside its content.
pub trait Number {
    /// Type of the number attached to the message.
    type Output;

    /// Returns the number attached to this message.
    fn number(&self) -> Self::Output;
}
22
/// A numbered message that can be converted back into the message it wraps.
pub trait Unwrap {
    /// Type of the wrapped message.
    type Output;

    /// Consumes the numbered message and returns the wrapped message.
    fn unwrap(self) -> Self::Output;
}
31
32/// Message wrapper used by [`Numbered`] transport.
33#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
34pub struct Wrapper<N, T> {
35    /// Message number.
36    pub number: N,
37
38    /// Wrapped message.
39    pub wrapped: T,
40}
41
42impl<N, T> Number for Wrapper<N, T>
43where
44    N: Clone,
45{
46    type Output = N;
47
48    fn number(&self) -> Self::Output {
49        self.number.clone()
50    }
51}
52
53impl<N, T> Unwrap for Wrapper<N, T> {
54    type Output = T;
55
56    fn unwrap(self) -> Self::Output {
57        self.wrapped
58    }
59}
60
61/// Wrapper transport attaching number to sent messages.
62///
63/// Received messages are of [`Wrapper`] type.
64///
65/// First message number is [zero].<br>
66/// Next message number = previous message number + [one].
67///
68/// Message numbers will [wrap] when reaching maximum value.
69///
70/// [zero]: num::Zero
71/// [one]: num::One
72/// [wrap]: num::traits::WrappingAdd
73#[pin_project]
74pub struct Numbered<N, T, E, Incoming, Outgoing>
75where
76    T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
77    N: Clone + Zero + One + WrappingAdd,
78{
79    #[pin]
80    inner: T,
81    current_number: N,
82    _error: PhantomData<E>,
83    _number: PhantomData<N>,
84    _incoming: PhantomData<Incoming>,
85    _outgoing: PhantomData<Outgoing>,
86}
87
88impl<N, T, E, Incoming, Outgoing> Numbered<N, T, E, Incoming, Outgoing>
89where
90    T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
91    N: Clone + Zero + One + WrappingAdd,
92{
93    /// Create new [numbered] transport wrapping provided transport.
94    ///
95    /// [numbered]: self::Number
96    pub fn new(transport: T) -> Self {
97        Numbered {
98            inner: transport,
99            current_number: N::zero(),
100            _error: PhantomData,
101            _number: PhantomData,
102            _incoming: PhantomData,
103            _outgoing: PhantomData,
104        }
105    }
106
107    /// Number that will be given attached the next sent message.
108    pub fn current_number(&self) -> N {
109        self.current_number.clone()
110    }
111}
112
113impl<T, E, Incoming, Outgoing> Numbered<usize, T, E, Incoming, Outgoing>
114where
115    T: mezzenger::Transport<Wrapper<usize, Incoming>, Wrapper<usize, Outgoing>, E>,
116{
117    /// Create new [`Numbered`] transport using [`usize`] type as  message number.
118    pub fn new_usize(transport: T) -> Self {
119        Numbered::new(transport)
120    }
121}
122
123impl<T, E, Incoming, Outgoing> Numbered<u32, T, E, Incoming, Outgoing>
124where
125    T: mezzenger::Transport<Wrapper<u32, Incoming>, Wrapper<u32, Outgoing>, E>,
126{
127    /// Create new [`Numbered`] transport using [`u32`] type as  message number.
128    pub fn new_u32(transport: T) -> Self {
129        Numbered::new(transport)
130    }
131}
132
133impl<T, E, Incoming, Outgoing> Numbered<u64, T, E, Incoming, Outgoing>
134where
135    T: mezzenger::Transport<Wrapper<u64, Incoming>, Wrapper<u64, Outgoing>, E>,
136{
137    /// Create new [`Numbered`] transport using [`u64`] type as  message number.
138    pub fn new_u64(transport: T) -> Self {
139        Numbered::new(transport)
140    }
141}
142
143impl<T, E, Incoming, Outgoing> Numbered<u128, T, E, Incoming, Outgoing>
144where
145    T: mezzenger::Transport<Wrapper<u128, Incoming>, Wrapper<u128, Outgoing>, E>,
146{
147    /// Create new [`Numbered`] transport using [`u128`] type as  message number.
148    pub fn new_u128(transport: T) -> Self {
149        Numbered::new(transport)
150    }
151}
152
153impl<N, T, E, Incoming, Outgoing> Sink<Outgoing> for Numbered<N, T, E, Incoming, Outgoing>
154where
155    T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
156    N: Clone + Zero + One + WrappingAdd,
157{
158    type Error = mezzenger::Error<E>;
159
160    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161        let me = self.project();
162        me.inner.poll_ready(cx)
163    }
164
165    fn start_send(self: Pin<&mut Self>, item: Outgoing) -> Result<(), Self::Error> {
166        let me = self.project();
167        let item = Wrapper {
168            number: me.current_number.clone(),
169            wrapped: item,
170        };
171        let result = me.inner.start_send(item);
172        if result.is_ok() {
173            *me.current_number = me.current_number.wrapping_add(&One::one());
174        }
175        result
176    }
177
178    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
179        let me = self.project();
180        me.inner.poll_flush(cx)
181    }
182
183    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
184        let me = self.project();
185        me.inner.poll_close(cx)
186    }
187}
188
189impl<N, T, E, Incoming, Outgoing> Stream for Numbered<N, T, E, Incoming, Outgoing>
190where
191    T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
192    N: Clone + Zero + One + WrappingAdd,
193{
194    type Item = Result<Wrapper<N, Incoming>, E>;
195
196    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197        let me = self.project();
198        me.inner.poll_next(cx)
199    }
200}
201
202impl<N, T, E, Incoming, Outgoing> FusedStream for Numbered<N, T, E, Incoming, Outgoing>
203where
204    T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E> + FusedStream,
205    N: Clone + Zero + One + WrappingAdd,
206{
207    fn is_terminated(&self) -> bool {
208        self.inner.is_terminated()
209    }
210}
211
#[cfg(test)]
mod tests {
    use futures::SinkExt;
    use mezzenger::Receive;
    use mezzenger_channel::transports;

    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};
    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test_configure!(run_in_browser);

    use crate::numbered::{Numbered, Wrapper};

    /// Sends three messages in each direction over a pair of numbered channel
    /// transports and checks that each arrives wrapped with numbers 0, 1, 2.
    async fn test_transport_inner() {
        let (left, right) = transports();

        let mut left = Numbered::new_usize(left);
        let mut right = Numbered::new_usize(right);

        // Left to right.
        for message in [1, 2, 3] {
            left.send(message).await.unwrap();
        }
        for (number, wrapped) in [(0, 1), (1, 2), (2, 3)] {
            assert_eq!(
                right.receive().await.unwrap(),
                Wrapper { number, wrapped }
            );
        }

        // Right to left: each side keeps its own counter, so numbering starts at 0 again.
        for message in [1, 2, 3] {
            right.send(message).await.unwrap();
        }
        for (number, wrapped) in [(0, 1), (1, 2), (2, 3)] {
            assert_eq!(
                left.receive().await.unwrap(),
                Wrapper { number, wrapped }
            );
        }
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn test_transport() {
        test_transport_inner().await
    }

    #[cfg(target_arch = "wasm32")]
    #[wasm_bindgen_test]
    async fn test_transport() {
        test_transport_inner().await
    }
}