p2panda_stream/stream/
decode.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use futures_util::stream::{Fuse, FusedStream};
7use futures_util::task::{Context, Poll};
8use futures_util::{Sink, Stream, StreamExt, ready};
9use p2panda_core::cbor::{DecodeError, decode_cbor};
10use p2panda_core::{Body, Extensions, Header, RawOperation};
11use pin_project::pin_project;
12
13use crate::macros::{delegate_access_inner, delegate_sink};
14
15/// An extension trait for `Stream`s that provides a convenient [`decode`](DecodeExt::decode)
16/// method.
17pub trait DecodeExt<E>: Stream<Item = RawOperation> {
18    /// Decode byte streams into p2panda operations.
19    fn decode(self) -> Decode<Self, E>
20    where
21        E: Extensions,
22        Self: Sized,
23    {
24        Decode::new(self)
25    }
26}
27
28impl<T: ?Sized, E> DecodeExt<E> for T where T: Stream<Item = RawOperation> {}
29
30/// Stream for the [`decode`](DecodeExt::decode) method.
31#[derive(Debug)]
32#[pin_project]
33#[must_use = "streams do nothing unless polled"]
34pub struct Decode<St, E>
35where
36    St: Stream<Item = RawOperation>,
37    E: Extensions,
38{
39    #[pin]
40    stream: Fuse<St>,
41    _marker: PhantomData<E>,
42}
43
44impl<St, E> Decode<St, E>
45where
46    St: Stream<Item = RawOperation>,
47    E: Extensions,
48{
49    pub(super) fn new(stream: St) -> Decode<St, E> {
50        Decode {
51            stream: stream.fuse(),
52            _marker: PhantomData,
53        }
54    }
55
56    delegate_access_inner!(stream, St, (.));
57}
58
59impl<St, E> Stream for Decode<St, E>
60where
61    St: Stream<Item = RawOperation>,
62    E: Extensions,
63{
64    type Item = Result<(Header<E>, Option<Body>, Vec<u8>), DecodeError>;
65
66    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        let mut this = self.project();
68        let res = ready!(this.stream.as_mut().poll_next(cx));
69        Poll::Ready(res.map(|(header_bytes, body_bytes)| {
70            match decode_cbor::<Header<E>, _>(&header_bytes[..]) {
71                Ok(header) => Ok((header, body_bytes.map(Body::from), header_bytes)),
72                Err(err) => Err(err),
73            }
74        }))
75    }
76
77    fn size_hint(&self) -> (usize, Option<usize>) {
78        self.stream.size_hint()
79    }
80}
81
82impl<St: FusedStream, E> FusedStream for Decode<St, E>
83where
84    St: Stream<Item = RawOperation>,
85    E: Extensions,
86{
87    fn is_terminated(&self) -> bool {
88        self.stream.is_terminated()
89    }
90}
91
92impl<S, E> Sink<RawOperation> for Decode<S, E>
93where
94    S: Stream<Item = RawOperation> + Sink<RawOperation>,
95    E: Extensions,
96{
97    type Error = S::Error;
98
99    delegate_sink!(stream, RawOperation);
100}
101
102#[cfg(test)]
103mod tests {
104    use futures_util::{StreamExt, TryStreamExt};
105    use p2panda_core::{Body, Header};
106
107    use crate::test_utils::{Extensions, mock_stream};
108
109    use super::DecodeExt;
110
111    #[tokio::test]
112    async fn decode() {
113        let stream = mock_stream().decode();
114        let result: Vec<(Header<Extensions>, Option<Body>, Vec<u8>)> =
115            stream.take(5).try_collect().await.expect("not fail");
116        assert_eq!(result.len(), 5);
117    }
118}