tetsy_libp2p_mplex/
lib.rs1mod codec;
22mod config;
23mod io;
24
25pub use config::{MplexConfig, MaxBufferBehaviour};
26
27use codec::LocalStreamId;
28use std::{cmp, iter, task::Context, task::Poll};
29use bytes::Bytes;
30use tetsy_libp2p_core::{
31    StreamMuxer,
32    muxing::StreamMuxerEvent,
33    upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
34};
35use parking_lot::Mutex;
36use futures::{prelude::*, future, ready};
37
38impl UpgradeInfo for MplexConfig {
39    type Info = &'static [u8];
40    type InfoIter = iter::Once<Self::Info>;
41
42    fn protocol_info(&self) -> Self::InfoIter {
43        iter::once(b"/mplex/6.7.0")
44    }
45}
46
47impl<C> InboundUpgrade<C> for MplexConfig
48where
49    C: AsyncRead + AsyncWrite + Unpin,
50{
51    type Output = Multiplex<C>;
52    type Error = io::Error;
53    type Future = future::Ready<Result<Self::Output, io::Error>>;
54
55    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
56        future::ready(Ok(Multiplex {
57            io: Mutex::new(io::Multiplexed::new(socket, self)),
58        }))
59    }
60}
61
62impl<C> OutboundUpgrade<C> for MplexConfig
63where
64    C: AsyncRead + AsyncWrite + Unpin,
65{
66    type Output = Multiplex<C>;
67    type Error = io::Error;
68    type Future = future::Ready<Result<Self::Output, io::Error>>;
69
70    fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
71        future::ready(Ok(Multiplex {
72            io: Mutex::new(io::Multiplexed::new(socket, self))
73        }))
74    }
75}
76
77pub struct Multiplex<C> {
82    io: Mutex<io::Multiplexed<C>>
83}
84
85impl<C> StreamMuxer for Multiplex<C>
86where
87    C: AsyncRead + AsyncWrite + Unpin
88{
89    type Substream = Substream;
90    type OutboundSubstream = OutboundSubstream;
91    type Error = io::Error;
92
93    fn poll_event(&self, cx: &mut Context<'_>)
94        -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>>
95    {
96        let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
97        let stream = Substream::new(stream_id);
98        Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
99    }
100
101    fn open_outbound(&self) -> Self::OutboundSubstream {
102        OutboundSubstream {}
103    }
104
105    fn poll_outbound(&self, cx: &mut Context<'_>, _: &mut Self::OutboundSubstream)
106        -> Poll<Result<Self::Substream, io::Error>>
107    {
108        let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
109        return Poll::Ready(Ok(Substream::new(stream_id)))
110    }
111
112    fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
113        }
115
116    fn read_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &mut [u8])
117        -> Poll<Result<usize, io::Error>>
118    {
119        loop {
120            if !substream.current_data.is_empty() {
122                let len = cmp::min(substream.current_data.len(), buf.len());
123                buf[..len].copy_from_slice(&substream.current_data.split_to(len));
124                return Poll::Ready(Ok(len));
125            }
126
127            match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
129                Some(data) => { substream.current_data = data; }
130                None => { return Poll::Ready(Ok(0)) }
131            }
132        }
133    }
134
135    fn write_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &[u8])
136        -> Poll<Result<usize, io::Error>>
137    {
138        self.io.lock().poll_write_stream(cx, substream.id, buf)
139    }
140
141    fn flush_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
142        -> Poll<Result<(), io::Error>>
143    {
144        self.io.lock().poll_flush_stream(cx, substream.id)
145    }
146
147    fn shutdown_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
148        -> Poll<Result<(), io::Error>>
149    {
150        self.io.lock().poll_close_stream(cx, substream.id)
151    }
152
153    fn destroy_substream(&self, sub: Self::Substream) {
154        self.io.lock().drop_stream(sub.id);
155    }
156
157    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
158        self.io.lock().poll_close(cx)
159    }
160
161    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
162        self.io.lock().poll_flush(cx)
163    }
164}
165
/// Token representing an outbound substream that is still being opened.
///
/// The type has no fields; it exists only to fill the `OutboundSubstream`
/// associated type of the `StreamMuxer` implementation.
#[derive(Debug)]
pub struct OutboundSubstream {}
168
169pub struct Substream {
171    id: LocalStreamId,
173    current_data: Bytes,
175}
176
177impl Substream {
178    fn new(id: LocalStreamId) -> Self {
179        Self { id, current_data: Bytes::new() }
180    }
181}