hypercore_protocol/
stream.rs1use std::io;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use futures::{Sink, Stream};
11
12pub struct BoxedStream {
17 inner: Box<dyn StreamSink + Send>,
18}
19
20impl std::fmt::Debug for BoxedStream {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("BoxedStream").finish_non_exhaustive()
23 }
24}
25
/// Object-safe facade over the polling methods of a `Vec<u8>` stream and sink.
///
/// Every method takes `&mut self` instead of `Pin<&mut Self>` and all sink errors are
/// expressed as [`io::Error`], which lets the trait be used as `dyn StreamSink`.
/// Implementors must be `Send + Sync`.
trait StreamSink: Send + Sync {
    /// Polls for the next incoming message (counterpart of `Stream::poll_next`).
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>>;

    /// Polls whether the sink can accept a message (counterpart of `Sink::poll_ready`).
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;

    /// Hands `item` to the sink (counterpart of `Sink::start_send`).
    fn start_send(&mut self, item: Vec<u8>) -> io::Result<()>;

    /// Polls the sink to flush its pending output (counterpart of `Sink::poll_flush`).
    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;

    /// Polls the sink to flush and close (counterpart of `Sink::poll_close`).
    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
34
/// Newtype around a concrete stream/sink value `S`.
///
/// It exists so that `StreamSink` can be implemented for the wrapped value and the
/// result stored as a trait object.
struct StreamSinkWrapper<S>(S);
37
38impl<S> StreamSink for StreamSinkWrapper<S>
39where
40 S: Stream<Item = Vec<u8>> + Sink<Vec<u8>> + Unpin + Send + Sync,
41 <S as Sink<Vec<u8>>>::Error: Into<io::Error>,
42{
43 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
44 Pin::new(&mut self.0).poll_next(cx)
45 }
46
47 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
48 Pin::new(&mut self.0).poll_ready(cx).map_err(Into::into)
49 }
50
51 fn start_send(&mut self, item: Vec<u8>) -> io::Result<()> {
52 Pin::new(&mut self.0).start_send(item).map_err(Into::into)
53 }
54
55 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
56 Pin::new(&mut self.0).poll_flush(cx).map_err(Into::into)
57 }
58
59 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
60 Pin::new(&mut self.0).poll_close(cx).map_err(Into::into)
61 }
62}
63
64impl BoxedStream {
65 pub fn new<S>(stream: S) -> Self
72 where
73 S: Stream<Item = Vec<u8>> + Sink<Vec<u8>> + Unpin + Send + Sync + 'static,
74 <S as Sink<Vec<u8>>>::Error: Into<io::Error>,
75 {
76 BoxedStream {
77 inner: Box::new(StreamSinkWrapper(stream)),
78 }
79 }
80}
81
82impl Stream for BoxedStream {
83 type Item = Vec<u8>;
84
85 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86 self.inner.poll_next(cx)
87 }
88}
89
90impl Sink<Vec<u8>> for BoxedStream {
91 type Error = io::Error;
92
93 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94 self.inner.poll_ready(cx)
95 }
96
97 fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
98 self.inner.start_send(item)
99 }
100
101 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102 self.inner.poll_flush(cx)
103 }
104
105 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 self.inner.poll_close(cx)
107 }
108}