mwc_libp2p_core/
muxing.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Muxing is the process of splitting a connection into multiple substreams.
22//!
23//! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer`
24//! has ownership of a connection, lets you open and close substreams, and read/write data
25//! on open substreams.
26//!
27//! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this
28//! >           is managed by the library's internals.
29//!
30//! Each substream of a connection is an isolated stream of data. All the substreams are muxed
31//! together so that the data read from or written to each substream doesn't influence the other
32//! substreams.
33//!
34//! In the context of libp2p, each substream can use a different protocol. Contrary to opening a
35//! connection, opening a substream is almost free in terms of resources. This means that you
36//! shouldn't hesitate to rapidly open and close substreams, and to design protocols that don't
37//! require maintaining long-lived channels of communication.
38//!
39//! > **Example**: The Kademlia protocol opens a new substream for each request it wants to
40//! >              perform. Multiple requests can be performed simultaneously by opening multiple
41//! >              substreams, without having to worry about associating responses with the
42//! >              right request.
43//!
44//! # Implementing a muxing protocol
45//!
46//! In order to implement a muxing protocol, create an object that implements the `UpgradeInfo`,
47//! `InboundUpgrade` and `OutboundUpgrade` traits. See the `upgrade` module for more information.
48//! The `Output` associated type of the `InboundUpgrade` and `OutboundUpgrade` traits should be
49//! identical, and should be an object that implements the `StreamMuxer` trait.
50//!
51//! The upgrade process will take ownership of the connection, which makes it possible for the
52//! implementation of `StreamMuxer` to control everything that happens on the wire.
53
54use fnv::FnvHashMap;
55use futures::{future, prelude::*, task::Context, task::Poll};
56use multiaddr::Multiaddr;
57use parking_lot::Mutex;
58use std::{io, ops::Deref, fmt, pin::Pin, sync::atomic::{AtomicUsize, Ordering}};
59
60pub use self::singleton::SingletonMuxer;
61
62mod singleton;
63
64/// Implemented on objects that can open and manage substreams.
65///
66/// The state of a muxer, as exposed by this API, is the following:
67///
68/// - A connection to the remote. The `poll_event`, `flush_all` and `close` methods operate
69///   on this.
70/// - A list of substreams that are open. The `poll_outbound`, `read_substream`, `write_substream`,
71///   `flush_substream`, `shutdown_substream` and `destroy_substream` methods allow controlling
72///   these entries.
73/// - A list of outbound substreams being opened. The `open_outbound`, `poll_outbound` and
74///   `destroy_outbound` methods allow controlling these entries.
75///
76pub trait StreamMuxer {
77    /// Type of the object that represents the raw substream where data can be read and written.
78    type Substream;
79
80    /// Future that will be resolved when the outgoing substream is open.
81    type OutboundSubstream;
82
83    /// Error type of the muxer
84    type Error: Into<io::Error>;
85
86    /// Polls for a connection-wide event.
87    ///
88    /// This function behaves the same as a `Stream`.
89    ///
90    /// If `Pending` is returned, then the current task will be notified once the muxer
91    /// is ready to be polled, similar to the API of `Stream::poll()`.
92    /// Only the latest task that was used to call this method may be notified.
93    ///
94    /// It is permissible and common to use this method to perform background
95    /// work, such as processing incoming packets and polling timers.
96    ///
97    /// An error can be generated if the connection has been closed.
98    fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
99
100    /// Opens a new outgoing substream, and produces the equivalent to a future that will be
101    /// resolved when it becomes available.
102    ///
103    /// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced
104    /// through the methods on the `StreamMuxer` trait.
105    fn open_outbound(&self) -> Self::OutboundSubstream;
106
107    /// Polls the outbound substream.
108    ///
109    /// If `Pending` is returned, then the current task will be notified once the substream
110    /// is ready to be polled, similar to the API of `Future::poll()`.
111    /// However, for each individual outbound substream, only the latest task that was used to
112    /// call this method may be notified.
113    ///
114    /// May panic or produce an undefined result if an earlier polling of the same substream
115    /// returned `Ready` or `Err`.
116    fn poll_outbound(&self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream)
117        -> Poll<Result<Self::Substream, Self::Error>>;
118
119    /// Destroys an outbound substream future. Use this after the outbound substream has finished,
120    /// or if you want to interrupt it.
121    fn destroy_outbound(&self, s: Self::OutboundSubstream);
122
123    /// Reads data from a substream. The behaviour is the same as `futures::AsyncRead::poll_read`.
124    ///
125    /// If `Pending` is returned, then the current task will be notified once the substream
126    /// is ready to be read. However, for each individual substream, only the latest task that
127    /// was used to call this method may be notified.
128    ///
129    /// If `Async::Ready(0)` is returned, the substream has been closed by the remote and should
130    /// no longer be read afterwards.
131    ///
132    /// An error can be generated if the connection has been closed, or if a protocol misbehaviour
133    /// happened.
134    fn read_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8])
135        -> Poll<Result<usize, Self::Error>>;
136
137    /// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`.
138    ///
139    /// If `Pending` is returned, then the current task will be notified once the substream
140    /// is ready to be read. For each individual substream, only the latest task that was used to
141    /// call this method may be notified.
142    ///
143    /// Calling `write_substream` does not guarantee that data will arrive to the remote. To
144    /// ensure that, you should call `flush_substream`.
145    ///
146    /// It is incorrect to call this method on a substream if you called `shutdown_substream` on
147    /// this substream earlier.
148    fn write_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8])
149        -> Poll<Result<usize, Self::Error>>;
150
151    /// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`.
152    ///
153    /// After this method has been called, data written earlier on the substream is guaranteed to
154    /// be received by the remote.
155    ///
156    /// If `Pending` is returned, then the current task will be notified once the substream
157    /// is ready to be read. For each individual substream, only the latest task that was used to
158    /// call this method may be notified.
159    ///
160    /// > **Note**: This method may be implemented as a call to `flush_all`.
161    fn flush_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream)
162        -> Poll<Result<(), Self::Error>>;
163
164    /// Attempts to shut down the writing side of a substream. The behaviour is similar to
165    /// `AsyncWrite::poll_close`.
166    ///
167    /// Contrary to `AsyncWrite::poll_close`, shutting down a substream does not imply
168    /// `flush_substream`. If you want to make sure that the remote is immediately informed about
169    /// the shutdown, use `flush_substream` or `flush_all`.
170    ///
171    /// After this method has been called, you should no longer attempt to write to this substream.
172    ///
173    /// An error can be generated if the connection has been closed, or if a protocol misbehaviour
174    /// happened.
175    fn shutdown_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream)
176        -> Poll<Result<(), Self::Error>>;
177
178    /// Destroys a substream.
179    fn destroy_substream(&self, s: Self::Substream);
180
181    /// Returns `true` if the remote has shown any sign of activity after the muxer has been open.
182    ///
183    /// For optimisation purposes, the connection handshake of libp2p can be very optimistic and is
184    /// allowed to assume that the handshake has succeeded when it didn't in fact succeed. This
185    /// method can be called in order to determine whether the remote has accepted our handshake or
186    /// has potentially not received it yet.
187    #[deprecated(note = "This method is unused and will be removed in the future")]
188    fn is_remote_acknowledged(&self) -> bool {
189        true
190    }
191
192    /// Closes this `StreamMuxer`.
193    ///
194    /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All
195    /// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns,
196    /// or polls must generate an error or be ignored.
197    ///
198    /// Calling this method implies `flush_all`.
199    ///
200    /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
201    /// >           that the remote is properly informed of the shutdown. However, apart from
202    /// >           properly informing the remote, there is no difference between this and
203    /// >           immediately dropping the muxer.
204    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
205
206    /// Flush this `StreamMuxer`.
207    ///
208    /// This drains any write buffers of substreams and delivers any pending shutdown notifications
209    /// due to `shutdown_substream` or `close`. One may thus shutdown groups of substreams
210    /// followed by a final `flush_all` instead of having to do `flush_substream` for each.
211    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
212}
213
214/// Event about a connection, reported by an implementation of [`StreamMuxer`].
215#[derive(Debug, Clone, PartialEq, Eq)]
216pub enum StreamMuxerEvent<T> {
217    /// Remote has opened a new substream. Contains the substream in question.
218    InboundSubstream(T),
219
220    /// Address to the remote has changed. The previous one is now obsolete.
221    ///
222    /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
223    /// >           can change their IP address while retaining the same QUIC connection.
224    AddressChange(Multiaddr),
225}
226
227impl<T> StreamMuxerEvent<T> {
228    /// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise
229    /// returns `None`.
230    pub fn into_inbound_substream(self) -> Option<T> {
231        if let StreamMuxerEvent::InboundSubstream(s) = self {
232            Some(s)
233        } else {
234            None
235        }
236    }
237}
238
239/// Polls for an event from the muxer and, if an inbound substream, wraps this substream in an
240/// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`.
241pub fn event_from_ref_and_wrap<P>(
242    muxer: P,
243) -> impl Future<Output = Result<StreamMuxerEvent<SubstreamRef<P>>, <P::Target as StreamMuxer>::Error>>
244where
245    P: Deref + Clone,
246    P::Target: StreamMuxer,
247{
248    let muxer2 = muxer.clone();
249    future::poll_fn(move |cx| muxer.poll_event(cx))
250        .map_ok(|event| {
251            match event {
252                StreamMuxerEvent::InboundSubstream(substream) =>
253                    StreamMuxerEvent::InboundSubstream(substream_from_ref(muxer2, substream)),
254                StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
255            }
256        })
257}
258
259/// Same as `outbound_from_ref`, but wraps the output in an object that
260/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`.
261pub fn outbound_from_ref_and_wrap<P>(muxer: P) -> OutboundSubstreamRefWrapFuture<P>
262where
263    P: Deref + Clone,
264    P::Target: StreamMuxer,
265{
266    let inner = outbound_from_ref(muxer);
267    OutboundSubstreamRefWrapFuture { inner }
268}
269
270/// Future returned by `outbound_from_ref_and_wrap`.
271pub struct OutboundSubstreamRefWrapFuture<P>
272where
273    P: Deref + Clone,
274    P::Target: StreamMuxer,
275{
276    inner: OutboundSubstreamRefFuture<P>,
277}
278
279impl<P> Future for OutboundSubstreamRefWrapFuture<P>
280where
281    P: Deref + Clone,
282    P::Target: StreamMuxer,
283{
284    type Output = Result<SubstreamRef<P>, <P::Target as StreamMuxer>::Error>;
285
286    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287        match Future::poll(Pin::new(&mut self.inner), cx) {
288            Poll::Ready(Ok(substream)) => {
289                let out = substream_from_ref(self.inner.muxer.clone(), substream);
290                Poll::Ready(Ok(out))
291            }
292            Poll::Pending => Poll::Pending,
293            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
294        }
295    }
296}
297
298/// Builds a new future for an outbound substream, where the muxer is a reference.
299pub fn outbound_from_ref<P>(muxer: P) -> OutboundSubstreamRefFuture<P>
300where
301    P: Deref,
302    P::Target: StreamMuxer,
303{
304    let outbound = muxer.open_outbound();
305    OutboundSubstreamRefFuture {
306        muxer,
307        outbound: Some(outbound),
308    }
309}
310
311/// Future returned by `outbound_from_ref`.
312pub struct OutboundSubstreamRefFuture<P>
313where
314    P: Deref,
315    P::Target: StreamMuxer,
316{
317    muxer: P,
318    outbound: Option<<P::Target as StreamMuxer>::OutboundSubstream>,
319}
320
321impl<P> Unpin for OutboundSubstreamRefFuture<P>
322where
323    P: Deref,
324    P::Target: StreamMuxer,
325{
326}
327
328impl<P> Future for OutboundSubstreamRefFuture<P>
329where
330    P: Deref,
331    P::Target: StreamMuxer,
332{
333    type Output = Result<<P::Target as StreamMuxer>::Substream, <P::Target as StreamMuxer>::Error>;
334
335    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
336        // We use a `this` because the compiler isn't smart enough to allow mutably borrowing
337        // multiple different fields from the `Pin` at the same time.
338        let this = &mut *self;
339        this.muxer.poll_outbound(cx, this.outbound.as_mut().expect("outbound was empty"))
340    }
341}
342
343impl<P> Drop for OutboundSubstreamRefFuture<P>
344where
345    P: Deref,
346    P::Target: StreamMuxer,
347{
348    fn drop(&mut self) {
349        self.muxer
350            .destroy_outbound(self.outbound.take().expect("outbound was empty"))
351    }
352}
353
354/// Builds an implementation of `Read`/`Write`/`AsyncRead`/`AsyncWrite` from an `Arc` to the
355/// muxer and a substream.
356pub fn substream_from_ref<P>(
357    muxer: P,
358    substream: <P::Target as StreamMuxer>::Substream,
359) -> SubstreamRef<P>
360where
361    P: Deref,
362    P::Target: StreamMuxer,
363{
364    SubstreamRef {
365        muxer,
366        substream: Some(substream),
367        shutdown_state: ShutdownState::Shutdown,
368    }
369}
370
371/// Stream returned by `substream_from_ref`.
372pub struct SubstreamRef<P>
373where
374    P: Deref,
375    P::Target: StreamMuxer,
376{
377    muxer: P,
378    substream: Option<<P::Target as StreamMuxer>::Substream>,
379    shutdown_state: ShutdownState,
380}
381
382enum ShutdownState {
383    Shutdown,
384    Flush,
385    Done,
386}
387
388impl<P> fmt::Debug for SubstreamRef<P>
389where
390    P: Deref,
391    P::Target: StreamMuxer,
392    <P::Target as StreamMuxer>::Substream: fmt::Debug,
393{
394    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
395        write!(f, "Substream({:?})", self.substream)
396    }
397}
398
399impl<P> Unpin for SubstreamRef<P>
400where
401    P: Deref,
402    P::Target: StreamMuxer,
403{
404}
405
406impl<P> AsyncRead for SubstreamRef<P>
407where
408    P: Deref,
409    P::Target: StreamMuxer,
410{
411    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
412        // We use a `this` because the compiler isn't smart enough to allow mutably borrowing
413        // multiple different fields from the `Pin` at the same time.
414        let this = &mut *self;
415
416        let s = this.substream.as_mut().expect("substream was empty");
417        this.muxer.read_substream(cx, s, buf).map_err(|e| e.into())
418    }
419}
420
421impl<P> AsyncWrite for SubstreamRef<P>
422where
423    P: Deref,
424    P::Target: StreamMuxer,
425{
426    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
427        // We use a `this` because the compiler isn't smart enough to allow mutably borrowing
428        // multiple different fields from the `Pin` at the same time.
429        let this = &mut *self;
430
431        let s = this.substream.as_mut().expect("substream was empty");
432        this.muxer.write_substream(cx, s, buf).map_err(|e| e.into())
433    }
434
435    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
436        // We use a `this` because the compiler isn't smart enough to allow mutably borrowing
437        // multiple different fields from the `Pin` at the same time.
438        let this = &mut *self;
439
440        let s = this.substream.as_mut().expect("substream was empty");
441        loop {
442            match this.shutdown_state {
443                ShutdownState::Shutdown => {
444                    match this.muxer.shutdown_substream(cx, s) {
445                        Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush,
446                        Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
447                        Poll::Pending => return Poll::Pending,
448                    }
449                }
450                ShutdownState::Flush => {
451                    match this.muxer.flush_substream(cx, s) {
452                        Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done,
453                        Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
454                        Poll::Pending => return Poll::Pending,
455                    }
456                }
457                ShutdownState::Done => {
458                    return Poll::Ready(Ok(()));
459                }
460            }
461        }
462    }
463
464    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
465        // We use a `this` because the compiler isn't smart enough to allow mutably borrowing
466        // multiple different fields from the `Pin` at the same time.
467        let this = &mut *self;
468
469        let s = this.substream.as_mut().expect("substream was empty");
470        this.muxer.flush_substream(cx, s).map_err(|e| e.into())
471    }
472}
473
474impl<P> Drop for SubstreamRef<P>
475where
476    P: Deref,
477    P::Target: StreamMuxer,
478{
479    fn drop(&mut self) {
480        self.muxer.destroy_substream(self.substream.take().expect("substream was empty"))
481    }
482}
483
484/// Abstract `StreamMuxer`.
485pub struct StreamMuxerBox {
486    inner: Box<dyn StreamMuxer<Substream = usize, OutboundSubstream = usize, Error = io::Error> + Send + Sync>,
487}
488
489impl StreamMuxerBox {
490    /// Turns a stream muxer into a `StreamMuxerBox`.
491    pub fn new<T>(muxer: T) -> StreamMuxerBox
492    where
493        T: StreamMuxer + Send + Sync + 'static,
494        T::OutboundSubstream: Send,
495        T::Substream: Send,
496    {
497        let wrap = Wrap {
498            inner: muxer,
499            substreams: Mutex::new(Default::default()),
500            next_substream: AtomicUsize::new(0),
501            outbound: Mutex::new(Default::default()),
502            next_outbound: AtomicUsize::new(0),
503        };
504
505        StreamMuxerBox {
506            inner: Box::new(wrap),
507        }
508    }
509}
510
511impl StreamMuxer for StreamMuxerBox {
512    type Substream = usize; // TODO: use a newtype
513    type OutboundSubstream = usize; // TODO: use a newtype
514    type Error = io::Error;
515
516    #[inline]
517    fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
518        self.inner.poll_event(cx)
519    }
520
521    #[inline]
522    fn open_outbound(&self) -> Self::OutboundSubstream {
523        self.inner.open_outbound()
524    }
525
526    #[inline]
527    fn poll_outbound(&self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, Self::Error>> {
528        self.inner.poll_outbound(cx, s)
529    }
530
531    #[inline]
532    fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
533        self.inner.destroy_outbound(substream)
534    }
535
536    #[inline]
537    fn read_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
538        self.inner.read_substream(cx, s, buf)
539    }
540
541    #[inline]
542    fn write_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, Self::Error>> {
543        self.inner.write_substream(cx, s, buf)
544    }
545
546    #[inline]
547    fn flush_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
548        self.inner.flush_substream(cx, s)
549    }
550
551    #[inline]
552    fn shutdown_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
553        self.inner.shutdown_substream(cx, s)
554    }
555
556    #[inline]
557    fn destroy_substream(&self, s: Self::Substream) {
558        self.inner.destroy_substream(s)
559    }
560
561    #[inline]
562    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
563        self.inner.close(cx)
564    }
565
566    #[inline]
567    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
568        self.inner.flush_all(cx)
569    }
570}
571
572struct Wrap<T> where T: StreamMuxer {
573    inner: T,
574    substreams: Mutex<FnvHashMap<usize, T::Substream>>,
575    next_substream: AtomicUsize,
576    outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
577    next_outbound: AtomicUsize,
578}
579
580impl<T> StreamMuxer for Wrap<T>
581where
582    T: StreamMuxer,
583{
584    type Substream = usize; // TODO: use a newtype
585    type OutboundSubstream = usize; // TODO: use a newtype
586    type Error = io::Error;
587
588    #[inline]
589    fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
590        let substream = match self.inner.poll_event(cx) {
591            Poll::Pending => return Poll::Pending,
592            Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) =>
593                return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))),
594            Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s,
595            Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
596        };
597
598        let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
599        self.substreams.lock().insert(id, substream);
600        Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id)))
601    }
602
603    #[inline]
604    fn open_outbound(&self) -> Self::OutboundSubstream {
605        let outbound = self.inner.open_outbound();
606        let id = self.next_outbound.fetch_add(1, Ordering::Relaxed);
607        self.outbound.lock().insert(id, outbound);
608        id
609    }
610
611    #[inline]
612    fn poll_outbound(
613        &self,
614        cx: &mut Context<'_>,
615        substream: &mut Self::OutboundSubstream,
616    ) -> Poll<Result<Self::Substream, Self::Error>> {
617        let mut list = self.outbound.lock();
618        let substream = match self.inner.poll_outbound(cx, list.get_mut(substream).unwrap()) {
619            Poll::Pending => return Poll::Pending,
620            Poll::Ready(Ok(s)) => s,
621            Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
622        };
623        let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
624        self.substreams.lock().insert(id, substream);
625        Poll::Ready(Ok(id))
626    }
627
628    #[inline]
629    fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
630        let mut list = self.outbound.lock();
631        self.inner.destroy_outbound(list.remove(&substream).unwrap())
632    }
633
634    #[inline]
635    fn read_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
636        let mut list = self.substreams.lock();
637        self.inner.read_substream(cx, list.get_mut(s).unwrap(), buf).map_err(|e| e.into())
638    }
639
640    #[inline]
641    fn write_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, Self::Error>> {
642        let mut list = self.substreams.lock();
643        self.inner.write_substream(cx, list.get_mut(s).unwrap(), buf).map_err(|e| e.into())
644    }
645
646    #[inline]
647    fn flush_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
648        let mut list = self.substreams.lock();
649        self.inner.flush_substream(cx, list.get_mut(s).unwrap()).map_err(|e| e.into())
650    }
651
652    #[inline]
653    fn shutdown_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
654        let mut list = self.substreams.lock();
655        self.inner.shutdown_substream(cx, list.get_mut(s).unwrap()).map_err(|e| e.into())
656    }
657
658    #[inline]
659    fn destroy_substream(&self, substream: Self::Substream) {
660        let mut list = self.substreams.lock();
661        self.inner.destroy_substream(list.remove(&substream).unwrap())
662    }
663
664    #[inline]
665    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
666        self.inner.close(cx).map_err(|e| e.into())
667    }
668
669    #[inline]
670    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
671        self.inner.flush_all(cx).map_err(|e| e.into())
672    }
673}