Skip to main content

hypercore_protocol/
stream.rs

1//! Type-erased bidirectional byte stream for the protocol.
2//!
3//! This module provides [`BoxedStream`], which hides the concrete stream type
4//! from the public API while still supporting any `Stream<Item = Vec<u8>> + Sink<Vec<u8>>`.
5
6use std::io;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use futures::{Sink, Stream};
11
12/// A type-erased bidirectional byte stream for protocol communication.
13///
14/// This wrapper allows [`Protocol`](crate::Protocol) to have a non-generic public interface
15/// while still accepting any stream type that implements the required traits.
16pub struct BoxedStream {
17    inner: Box<dyn StreamSink + Send>,
18}
19
20impl std::fmt::Debug for BoxedStream {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("BoxedStream").finish_non_exhaustive()
23    }
24}
25
26/// Internal trait combining Stream + Sink operations for type erasure.
27trait StreamSink: Send + Sync {
28    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>>;
29    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
30    fn start_send(&mut self, item: Vec<u8>) -> io::Result<()>;
31    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
32    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
33}
34
35/// Wrapper to implement StreamSink for any compatible type.
36struct StreamSinkWrapper<S>(S);
37
38impl<S> StreamSink for StreamSinkWrapper<S>
39where
40    S: Stream<Item = Vec<u8>> + Sink<Vec<u8>> + Unpin + Send + Sync,
41    <S as Sink<Vec<u8>>>::Error: Into<io::Error>,
42{
43    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
44        Pin::new(&mut self.0).poll_next(cx)
45    }
46
47    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
48        Pin::new(&mut self.0).poll_ready(cx).map_err(Into::into)
49    }
50
51    fn start_send(&mut self, item: Vec<u8>) -> io::Result<()> {
52        Pin::new(&mut self.0).start_send(item).map_err(Into::into)
53    }
54
55    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
56        Pin::new(&mut self.0).poll_flush(cx).map_err(Into::into)
57    }
58
59    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
60        Pin::new(&mut self.0).poll_close(cx).map_err(Into::into)
61    }
62}
63
64impl BoxedStream {
65    /// Create a new `BoxedStream` from any compatible stream.
66    ///
67    /// The stream must implement:
68    /// - `Stream<Item = Vec<u8>>` for receiving messages
69    /// - `Sink<Vec<u8>>` for sending messages
70    /// - `Unpin` and `Send`
71    pub fn new<S>(stream: S) -> Self
72    where
73        S: Stream<Item = Vec<u8>> + Sink<Vec<u8>> + Unpin + Send + Sync + 'static,
74        <S as Sink<Vec<u8>>>::Error: Into<io::Error>,
75    {
76        BoxedStream {
77            inner: Box::new(StreamSinkWrapper(stream)),
78        }
79    }
80}
81
82impl Stream for BoxedStream {
83    type Item = Vec<u8>;
84
85    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86        self.inner.poll_next(cx)
87    }
88}
89
90impl Sink<Vec<u8>> for BoxedStream {
91    type Error = io::Error;
92
93    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94        self.inner.poll_ready(cx)
95    }
96
97    fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
98        self.inner.start_send(item)
99    }
100
101    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102        self.inner.poll_flush(cx)
103    }
104
105    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106        self.inner.poll_close(cx)
107    }
108}