1use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures::{stream::FusedStream, Sink, Stream};
10use num::{traits::WrappingAdd, One, Zero};
11use pin_project::pin_project;
12use serde::{Deserialize, Serialize};
13
14pub trait Number {
16 type Output;
18
19 fn number(&self) -> Self::Output;
21}
22
23pub trait Unwrap {
25 type Output;
27
28 fn unwrap(self) -> Self::Output;
30}
31
32#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
34pub struct Wrapper<N, T> {
35 pub number: N,
37
38 pub wrapped: T,
40}
41
42impl<N, T> Number for Wrapper<N, T>
43where
44 N: Clone,
45{
46 type Output = N;
47
48 fn number(&self) -> Self::Output {
49 self.number.clone()
50 }
51}
52
53impl<N, T> Unwrap for Wrapper<N, T> {
54 type Output = T;
55
56 fn unwrap(self) -> Self::Output {
57 self.wrapped
58 }
59}
60
61#[pin_project]
74pub struct Numbered<N, T, E, Incoming, Outgoing>
75where
76 T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
77 N: Clone + Zero + One + WrappingAdd,
78{
79 #[pin]
80 inner: T,
81 current_number: N,
82 _error: PhantomData<E>,
83 _number: PhantomData<N>,
84 _incoming: PhantomData<Incoming>,
85 _outgoing: PhantomData<Outgoing>,
86}
87
88impl<N, T, E, Incoming, Outgoing> Numbered<N, T, E, Incoming, Outgoing>
89where
90 T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
91 N: Clone + Zero + One + WrappingAdd,
92{
93 pub fn new(transport: T) -> Self {
97 Numbered {
98 inner: transport,
99 current_number: N::zero(),
100 _error: PhantomData,
101 _number: PhantomData,
102 _incoming: PhantomData,
103 _outgoing: PhantomData,
104 }
105 }
106
107 pub fn current_number(&self) -> N {
109 self.current_number.clone()
110 }
111}
112
113impl<T, E, Incoming, Outgoing> Numbered<usize, T, E, Incoming, Outgoing>
114where
115 T: mezzenger::Transport<Wrapper<usize, Incoming>, Wrapper<usize, Outgoing>, E>,
116{
117 pub fn new_usize(transport: T) -> Self {
119 Numbered::new(transport)
120 }
121}
122
123impl<T, E, Incoming, Outgoing> Numbered<u32, T, E, Incoming, Outgoing>
124where
125 T: mezzenger::Transport<Wrapper<u32, Incoming>, Wrapper<u32, Outgoing>, E>,
126{
127 pub fn new_u32(transport: T) -> Self {
129 Numbered::new(transport)
130 }
131}
132
133impl<T, E, Incoming, Outgoing> Numbered<u64, T, E, Incoming, Outgoing>
134where
135 T: mezzenger::Transport<Wrapper<u64, Incoming>, Wrapper<u64, Outgoing>, E>,
136{
137 pub fn new_u64(transport: T) -> Self {
139 Numbered::new(transport)
140 }
141}
142
143impl<T, E, Incoming, Outgoing> Numbered<u128, T, E, Incoming, Outgoing>
144where
145 T: mezzenger::Transport<Wrapper<u128, Incoming>, Wrapper<u128, Outgoing>, E>,
146{
147 pub fn new_u128(transport: T) -> Self {
149 Numbered::new(transport)
150 }
151}
152
153impl<N, T, E, Incoming, Outgoing> Sink<Outgoing> for Numbered<N, T, E, Incoming, Outgoing>
154where
155 T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
156 N: Clone + Zero + One + WrappingAdd,
157{
158 type Error = mezzenger::Error<E>;
159
160 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161 let me = self.project();
162 me.inner.poll_ready(cx)
163 }
164
165 fn start_send(self: Pin<&mut Self>, item: Outgoing) -> Result<(), Self::Error> {
166 let me = self.project();
167 let item = Wrapper {
168 number: me.current_number.clone(),
169 wrapped: item,
170 };
171 let result = me.inner.start_send(item);
172 if result.is_ok() {
173 *me.current_number = me.current_number.wrapping_add(&One::one());
174 }
175 result
176 }
177
178 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
179 let me = self.project();
180 me.inner.poll_flush(cx)
181 }
182
183 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
184 let me = self.project();
185 me.inner.poll_close(cx)
186 }
187}
188
189impl<N, T, E, Incoming, Outgoing> Stream for Numbered<N, T, E, Incoming, Outgoing>
190where
191 T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E>,
192 N: Clone + Zero + One + WrappingAdd,
193{
194 type Item = Result<Wrapper<N, Incoming>, E>;
195
196 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197 let me = self.project();
198 me.inner.poll_next(cx)
199 }
200}
201
202impl<N, T, E, Incoming, Outgoing> FusedStream for Numbered<N, T, E, Incoming, Outgoing>
203where
204 T: mezzenger::Transport<Wrapper<N, Incoming>, Wrapper<N, Outgoing>, E> + FusedStream,
205 N: Clone + Zero + One + WrappingAdd,
206{
207 fn is_terminated(&self) -> bool {
208 self.inner.is_terminated()
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use futures::SinkExt;
215 use mezzenger::Receive;
216 use mezzenger_channel::transports;
217
218 #[cfg(target_arch = "wasm32")]
219 use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};
220 #[cfg(target_arch = "wasm32")]
221 wasm_bindgen_test_configure!(run_in_browser);
222
223 use crate::numbered::{Numbered, Wrapper};
224
225 async fn test_transport_inner() {
226 let (left, right) = transports();
227
228 let mut left = Numbered::new_usize(left);
229 let mut right = Numbered::new_usize(right);
230
231 left.send(1).await.unwrap();
232 left.send(2).await.unwrap();
233 left.send(3).await.unwrap();
234
235 assert_eq!(
236 right.receive().await.unwrap(),
237 Wrapper {
238 number: 0,
239 wrapped: 1,
240 }
241 );
242 assert_eq!(
243 right.receive().await.unwrap(),
244 Wrapper {
245 number: 1,
246 wrapped: 2,
247 }
248 );
249 assert_eq!(
250 right.receive().await.unwrap(),
251 Wrapper {
252 number: 2,
253 wrapped: 3,
254 }
255 );
256
257 right.send(1).await.unwrap();
258 right.send(2).await.unwrap();
259 right.send(3).await.unwrap();
260
261 assert_eq!(
262 left.receive().await.unwrap(),
263 Wrapper {
264 number: 0,
265 wrapped: 1,
266 }
267 );
268 assert_eq!(
269 left.receive().await.unwrap(),
270 Wrapper {
271 number: 1,
272 wrapped: 2,
273 }
274 );
275 assert_eq!(
276 left.receive().await.unwrap(),
277 Wrapper {
278 number: 2,
279 wrapped: 3,
280 }
281 );
282 }
283
284 #[cfg(not(target_arch = "wasm32"))]
285 #[tokio::test]
286 async fn test_transport() {
287 test_transport_inner().await
288 }
289
290 #[cfg(target_arch = "wasm32")]
291 #[wasm_bindgen_test]
292 async fn test_transport() {
293 test_transport_inner().await
294 }
295}