p2panda_stream/stream/
decode.rs1use std::marker::PhantomData;
4use std::pin::Pin;
5
6use futures_util::stream::{Fuse, FusedStream};
7use futures_util::task::{Context, Poll};
8use futures_util::{Sink, Stream, StreamExt, ready};
9use p2panda_core::cbor::{DecodeError, decode_cbor};
10use p2panda_core::{Body, Extensions, Header, RawOperation};
11use pin_project::pin_project;
12
13use crate::macros::{delegate_access_inner, delegate_sink};
14
15pub trait DecodeExt<E>: Stream<Item = RawOperation> {
18 fn decode(self) -> Decode<Self, E>
20 where
21 E: Extensions,
22 Self: Sized,
23 {
24 Decode::new(self)
25 }
26}
27
28impl<T: ?Sized, E> DecodeExt<E> for T where T: Stream<Item = RawOperation> {}
29
30#[derive(Debug)]
32#[pin_project]
33#[must_use = "streams do nothing unless polled"]
34pub struct Decode<St, E>
35where
36 St: Stream<Item = RawOperation>,
37 E: Extensions,
38{
39 #[pin]
40 stream: Fuse<St>,
41 _marker: PhantomData<E>,
42}
43
44impl<St, E> Decode<St, E>
45where
46 St: Stream<Item = RawOperation>,
47 E: Extensions,
48{
49 pub(super) fn new(stream: St) -> Decode<St, E> {
50 Decode {
51 stream: stream.fuse(),
52 _marker: PhantomData,
53 }
54 }
55
56 delegate_access_inner!(stream, St, (.));
57}
58
59impl<St, E> Stream for Decode<St, E>
60where
61 St: Stream<Item = RawOperation>,
62 E: Extensions,
63{
64 type Item = Result<(Header<E>, Option<Body>, Vec<u8>), DecodeError>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 let mut this = self.project();
68 let res = ready!(this.stream.as_mut().poll_next(cx));
69 Poll::Ready(res.map(|(header_bytes, body_bytes)| {
70 match decode_cbor::<Header<E>, _>(&header_bytes[..]) {
71 Ok(header) => Ok((header, body_bytes.map(Body::from), header_bytes)),
72 Err(err) => Err(err),
73 }
74 }))
75 }
76
77 fn size_hint(&self) -> (usize, Option<usize>) {
78 self.stream.size_hint()
79 }
80}
81
82impl<St: FusedStream, E> FusedStream for Decode<St, E>
83where
84 St: Stream<Item = RawOperation>,
85 E: Extensions,
86{
87 fn is_terminated(&self) -> bool {
88 self.stream.is_terminated()
89 }
90}
91
92impl<S, E> Sink<RawOperation> for Decode<S, E>
93where
94 S: Stream<Item = RawOperation> + Sink<RawOperation>,
95 E: Extensions,
96{
97 type Error = S::Error;
98
99 delegate_sink!(stream, RawOperation);
100}
101
102#[cfg(test)]
103mod tests {
104 use futures_util::{StreamExt, TryStreamExt};
105 use p2panda_core::{Body, Header};
106
107 use crate::test_utils::{Extensions, mock_stream};
108
109 use super::DecodeExt;
110
111 #[tokio::test]
112 async fn decode() {
113 let stream = mock_stream().decode();
114 let result: Vec<(Header<Extensions>, Option<Body>, Vec<u8>)> =
115 stream.take(5).try_collect().await.expect("not fail");
116 assert_eq!(result.len(), 5);
117 }
118}