tetsy_libp2p_mplex/
lib.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod codec;
22mod config;
23mod io;
24
25pub use config::{MplexConfig, MaxBufferBehaviour};
26
27use codec::LocalStreamId;
28use std::{cmp, iter, task::Context, task::Poll};
29use bytes::Bytes;
30use tetsy_libp2p_core::{
31    StreamMuxer,
32    muxing::StreamMuxerEvent,
33    upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
34};
35use parking_lot::Mutex;
36use futures::{prelude::*, future, ready};
37
38impl UpgradeInfo for MplexConfig {
39    type Info = &'static [u8];
40    type InfoIter = iter::Once<Self::Info>;
41
42    fn protocol_info(&self) -> Self::InfoIter {
43        iter::once(b"/mplex/6.7.0")
44    }
45}
46
47impl<C> InboundUpgrade<C> for MplexConfig
48where
49    C: AsyncRead + AsyncWrite + Unpin,
50{
51    type Output = Multiplex<C>;
52    type Error = io::Error;
53    type Future = future::Ready<Result<Self::Output, io::Error>>;
54
55    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
56        future::ready(Ok(Multiplex {
57            io: Mutex::new(io::Multiplexed::new(socket, self)),
58        }))
59    }
60}
61
62impl<C> OutboundUpgrade<C> for MplexConfig
63where
64    C: AsyncRead + AsyncWrite + Unpin,
65{
66    type Output = Multiplex<C>;
67    type Error = io::Error;
68    type Future = future::Ready<Result<Self::Output, io::Error>>;
69
70    fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
71        future::ready(Ok(Multiplex {
72            io: Mutex::new(io::Multiplexed::new(socket, self))
73        }))
74    }
75}
76
77/// Multiplexer. Implements the `StreamMuxer` trait.
78///
79/// This implementation isn't capable of detecting when the underlying socket changes its address,
80/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted.
81pub struct Multiplex<C> {
82    io: Mutex<io::Multiplexed<C>>
83}
84
85impl<C> StreamMuxer for Multiplex<C>
86where
87    C: AsyncRead + AsyncWrite + Unpin
88{
89    type Substream = Substream;
90    type OutboundSubstream = OutboundSubstream;
91    type Error = io::Error;
92
93    fn poll_event(&self, cx: &mut Context<'_>)
94        -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>>
95    {
96        let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
97        let stream = Substream::new(stream_id);
98        Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
99    }
100
101    fn open_outbound(&self) -> Self::OutboundSubstream {
102        OutboundSubstream {}
103    }
104
105    fn poll_outbound(&self, cx: &mut Context<'_>, _: &mut Self::OutboundSubstream)
106        -> Poll<Result<Self::Substream, io::Error>>
107    {
108        let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
109        return Poll::Ready(Ok(Substream::new(stream_id)))
110    }
111
112    fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
113        // Nothing to do, since `open_outbound` creates no new local state.
114    }
115
116    fn read_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &mut [u8])
117        -> Poll<Result<usize, io::Error>>
118    {
119        loop {
120            // Try to read from the current (i.e. last received) frame.
121            if !substream.current_data.is_empty() {
122                let len = cmp::min(substream.current_data.len(), buf.len());
123                buf[..len].copy_from_slice(&substream.current_data.split_to(len));
124                return Poll::Ready(Ok(len));
125            }
126
127            // Read the next data frame from the multiplexed stream.
128            match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
129                Some(data) => { substream.current_data = data; }
130                None => { return Poll::Ready(Ok(0)) }
131            }
132        }
133    }
134
135    fn write_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &[u8])
136        -> Poll<Result<usize, io::Error>>
137    {
138        self.io.lock().poll_write_stream(cx, substream.id, buf)
139    }
140
141    fn flush_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
142        -> Poll<Result<(), io::Error>>
143    {
144        self.io.lock().poll_flush_stream(cx, substream.id)
145    }
146
147    fn shutdown_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
148        -> Poll<Result<(), io::Error>>
149    {
150        self.io.lock().poll_close_stream(cx, substream.id)
151    }
152
153    fn destroy_substream(&self, sub: Self::Substream) {
154        self.io.lock().drop_stream(sub.id);
155    }
156
157    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
158        self.io.lock().poll_close(cx)
159    }
160
161    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
162        self.io.lock().poll_flush(cx)
163    }
164}
165
/// Active attempt to open an outbound substream.
///
/// The type has no fields. `Debug` is derived so that values of this public
/// type can be formatted for diagnostics.
#[derive(Debug)]
pub struct OutboundSubstream {}
168
169/// Active substream to the remote.
170pub struct Substream {
171    /// The unique, local identifier of the substream.
172    id: LocalStreamId,
173    /// The current data frame the substream is reading from.
174    current_data: Bytes,
175}
176
177impl Substream {
178    fn new(id: LocalStreamId) -> Self {
179        Self { id, current_data: Bytes::new() }
180    }
181}