fluence-fork-libp2p-mplex 0.27.2

Mplex multiplexing protocol for libp2p
Documentation
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::Bytes;
use crate::{MplexConfig, MaxBufferBehaviour};
use crate::codec::{Codec, Frame, LocalStreamId, RemoteStreamId};
use log::{debug, trace};
use futures::{prelude::*, ready, stream::Fuse};
use futures::task::{AtomicWaker, ArcWake, waker_ref, WakerRef};
use asynchronous_codec::Framed;
use nohash_hasher::{IntMap, IntSet};
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::{cmp, fmt, io, mem, sync::Arc, task::{Context, Poll, Waker}};

pub use std::io::{Result, Error, ErrorKind};

/// A connection identifier.
///
/// Randomly generated and mainly intended to improve log output
/// by scoping substream IDs to a connection.
#[derive(Clone, Copy)]
struct ConnectionId(u64);

impl fmt::Debug for ConnectionId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:16x}", self.0)
    }
}

impl fmt::Display for ConnectionId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:16x}", self.0)
    }
}
/// A multiplexed I/O stream.
pub struct Multiplexed<C> {
    /// A unique ID for the multiplexed stream (i.e. connection).
    id: ConnectionId,
    /// The current operating status of the multiplex stream.
    status: Status,
    /// The underlying multiplexed I/O stream.
    io: Fuse<Framed<C, Codec>>,
    /// The configuration.
    config: MplexConfig,
    /// The buffer of new inbound substreams that have not yet
    /// been drained by `poll_next_stream`. This buffer is
    /// effectively bounded by `max_substreams - substreams.len()`.
    open_buffer: VecDeque<LocalStreamId>,
    /// Whether a flush is pending due to one or more new outbound
    /// `Open` frames, before reading frames can proceed.
    pending_flush_open: IntSet<LocalStreamId>,
    /// The stream that currently blocks reading for all streams
    /// due to a full buffer, if any. Only applicable for use
    /// with [`MaxBufferBehaviour::Block`].
    blocking_stream: Option<LocalStreamId>,
    /// Pending frames to send at the next opportunity.
    ///
    /// An opportunity for sending pending frames is every flush
    /// or read operation. In the former case, sending of all
    /// pending frames must complete before the flush can complete.
    /// In the latter case, the read operation can proceed even
    /// if some or all of the pending frames cannot be sent.
    pending_frames: VecDeque<Frame<LocalStreamId>>,
    /// The managed substreams.
    substreams: IntMap<LocalStreamId, SubstreamState>,
    /// The ID for the next outbound substream.
    next_outbound_stream_id: LocalStreamId,
    /// Registry of wakers for pending tasks interested in reading.
    notifier_read: Arc<NotifierRead>,
    /// Registry of wakers for pending tasks interested in writing.
    notifier_write: Arc<NotifierWrite>,
    /// Registry of wakers for pending tasks interested in opening
    /// an outbound substream, when the configured limit is reached.
    ///
    /// As soon as the number of substreams drops below this limit,
    /// these tasks are woken.
    notifier_open: NotifierOpen,
}

/// The operation status of a `Multiplexed` I/O stream.
#[derive(Debug)]
enum Status {
    /// The stream is considered open and healthy.
    Open,
    /// The stream has been actively closed.
    Closed,
    /// The stream has encountered a fatal error.
    Err(io::Error),
}

impl<C> Multiplexed<C>
where
    C: AsyncRead + AsyncWrite + Unpin
{
    /// Creates a new multiplexed I/O stream.
    pub fn new(io: C, config: MplexConfig) -> Self {
        let id = ConnectionId(rand::random());
        debug!("New multiplexed connection: {}", id);
        Multiplexed {
            id,
            config,
            status: Status::Open,
            io: Framed::new(io, Codec::new()).fuse(),
            open_buffer: Default::default(),
            substreams: Default::default(),
            pending_flush_open: Default::default(),
            pending_frames: Default::default(),
            blocking_stream: None,
            next_outbound_stream_id: LocalStreamId::dialer(0),
            notifier_read: Arc::new(NotifierRead {
                read_stream: Mutex::new(Default::default()),
                next_stream: AtomicWaker::new(),
            }),
            notifier_write: Arc::new(NotifierWrite {
                pending: Mutex::new(Default::default()),
            }),
            notifier_open: NotifierOpen {
                pending: Default::default()
            }
        }
    }

    /// Flushes the underlying I/O stream.
    pub fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match &self.status {
            Status::Closed => return Poll::Ready(Ok(())),
            Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
            Status::Open => {}
        }

        // Send any pending frames.
        ready!(self.send_pending_frames(cx))?;

        // Flush the underlying I/O stream.
        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
        match ready!(self.io.poll_flush_unpin(&mut Context::from_waker(&waker))) {
            Err(e) => Poll::Ready(self.on_error(e)),
            Ok(()) => {
                self.pending_flush_open = Default::default();
                Poll::Ready(Ok(()))
            }
        }
    }

    /// Closes the underlying I/O stream.
    ///
    /// > **Note**: No `Close` or `Reset` frames are sent on open substreams
    /// > before closing the underlying connection. However, the connection
    /// > close implies a flush of any frames already sent.
    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match &self.status {
            Status::Closed => return Poll::Ready(Ok(())),
            Status::Err(e) => return Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))),
            Status::Open => {}
        }

        // Note: We do not make the effort to send pending `Reset` frames
        // here, we only close (and thus flush) the underlying I/O stream.

        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
        match self.io.poll_close_unpin(&mut Context::from_waker(&waker)) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(e)) => Poll::Ready(self.on_error(e)),
            Poll::Ready(Ok(())) => {
                self.pending_frames = VecDeque::new();
                // We do not support read-after-close on the underlying
                // I/O stream, hence clearing the buffer and substreams.
                self.open_buffer = Default::default();
                self.substreams = Default::default();
                self.status = Status::Closed;
                Poll::Ready(Ok(()))
            }
        }
    }

    /// Waits for a new inbound substream, returning the corresponding `LocalStreamId`.
    ///
    /// If the number of already used substreams (i.e. substreams that have not
    /// yet been dropped via `drop_substream`) reaches the configured
    /// `max_substreams`, any further inbound substreams are immediately reset
    /// until existing substreams are dropped.
    ///
    /// Data frames read for existing substreams in the context of this
    /// method call are buffered and tasks interested in reading from
    /// these substreams are woken. If a substream buffer is full and
    /// [`MaxBufferBehaviour::Block`] is used, this method is blocked
    /// (i.e. `Pending`) on some task reading from the substream whose
    /// buffer is full.
    pub fn poll_next_stream(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<LocalStreamId>> {
        self.guard_open()?;

        // Try to read from the buffer first.
        if let Some(stream_id) = self.open_buffer.pop_back() {
            return Poll::Ready(Ok(stream_id));
        }

        debug_assert!(self.open_buffer.is_empty());
        let mut num_buffered = 0;

        loop {
            // Whenever we may have completely filled a substream
            // buffer while waiting for the next inbound stream,
            // yield to give the current task a chance to read
            // from the respective substreams.
            if num_buffered == self.config.max_buffer_len {
                cx.waker().clone().wake();
                return Poll::Pending
            }

            // Wait for the next inbound `Open` frame.
            match ready!(self.poll_read_frame(cx, None))? {
                Frame::Open { stream_id } => {
                    if let Some(id) = self.on_open(stream_id)? {
                        return Poll::Ready(Ok(id))
                    }
                }
                Frame::Data { stream_id, data } => {
                    self.buffer(stream_id.into_local(), data)?;
                    num_buffered += 1;
                }
                Frame::Close { stream_id } => {
                    self.on_close(stream_id.into_local())?;
                }
                Frame::Reset { stream_id } => {
                    self.on_reset(stream_id.into_local())
                }
            }
        }
    }

    /// Creates a new (outbound) substream, returning the allocated stream ID.
    pub fn poll_open_stream(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<LocalStreamId>> {
        self.guard_open()?;

        // Check the stream limits.
        if self.substreams.len() >= self.config.max_substreams {
            debug!("{}: Maximum number of substreams reached ({})",
                self.id, self.config.max_substreams);
            self.notifier_open.register(cx.waker());
            return Poll::Pending
        }

        // Send the `Open` frame.
        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
        match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
            Ok(()) => {
                let stream_id = self.next_outbound_stream_id();
                let frame = Frame::Open { stream_id };
                match self.io.start_send_unpin(frame) {
                    Ok(()) => {
                        self.substreams.insert(stream_id, SubstreamState::Open {
                            buf: Default::default()
                        });
                        debug!("{}: New outbound substream: {} (total {})",
                            self.id, stream_id, self.substreams.len());
                        // The flush is delayed and the `Open` frame may be sent
                        // together with other frames in the same transport packet.
                        self.pending_flush_open.insert(stream_id);
                        Poll::Ready(Ok(stream_id))
                    }
                    Err(e) => Poll::Ready(self.on_error(e)),
                }
            },
            Err(e) => Poll::Ready(self.on_error(e))
        }
    }

    /// Immediately drops a substream.
    ///
    /// All locally allocated resources for the dropped substream
    /// are freed and the substream becomes unavailable for both
    /// reading and writing immediately. The remote is informed
    /// based on the current state of the substream:
    ///
    /// * If the substream was open, a `Reset` frame is sent at
    ///   the next opportunity.
    /// * If the substream was half-closed, i.e. a `Close` frame
    ///   has already been sent, nothing further happens.
    /// * If the substream was half-closed by the remote, i.e.
    ///   a `Close` frame has already been received, a `Close`
    ///   frame is sent at the next opportunity.
    ///
    /// If the multiplexed stream is closed or encountered
    /// an error earlier, or there is no known substream with
    /// the given ID, this is a no-op.
    ///
    /// > **Note**: All substreams obtained via `poll_next_stream`
    /// > or `poll_open_stream` must eventually be "dropped" by
    /// > calling this method when they are no longer used.
    pub fn drop_stream(&mut self, id: LocalStreamId) {
        // Check if the underlying stream is ok.
        match self.status {
            Status::Closed | Status::Err(_) => return,
            Status::Open => {},
        }

        // If there is still a task waker interested in reading from that
        // stream, wake it to avoid leaving it dangling and notice that
        // the stream is gone. In contrast, wakers for write operations
        // are all woken on every new write opportunity.
        self.notifier_read.wake_read_stream(id);

        // Remove the substream, scheduling pending frames as necessary.
        match self.substreams.remove(&id) {
            None => {},
            Some(state) => {
                // If we fell below the substream limit, notify tasks that had
                // interest in opening an outbound substream earlier.
                let below_limit = self.substreams.len() == self.config.max_substreams - 1;
                if below_limit {
                    self.notifier_open.wake_all();
                }
                // Schedule any pending final frames to send, if necessary.
                match state {
                    SubstreamState::Closed { .. } => {}
                    SubstreamState::SendClosed { .. } => {}
                    SubstreamState::Reset { .. } => {}
                    SubstreamState::RecvClosed { .. } => {
                        if self.check_max_pending_frames().is_err() {
                            return
                        }
                        trace!("{}: Pending close for stream {}", self.id, id);
                        self.pending_frames.push_front(Frame::Close { stream_id: id });
                    }
                    SubstreamState::Open { .. } => {
                        if self.check_max_pending_frames().is_err() {
                            return
                        }
                        trace!("{}: Pending reset for stream {}", self.id, id);
                        self.pending_frames.push_front(Frame::Reset { stream_id: id });
                    }
                }
            }
        }
    }

    /// Writes data to a substream.
    pub fn poll_write_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId, buf: &[u8])
        -> Poll<io::Result<usize>>
    {
        self.guard_open()?;

        // Check if the stream is open for writing.
        match self.substreams.get(&id) {
            None | Some(SubstreamState::Reset { .. }) =>
                return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
            Some(SubstreamState::SendClosed { .. }) | Some(SubstreamState::Closed { .. }) =>
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
            Some(SubstreamState::Open { .. }) | Some(SubstreamState::RecvClosed { .. }) => {
                // Substream is writeable. Continue.
            }
        }

        // Determine the size of the frame to send.
        let frame_len = cmp::min(buf.len(), self.config.split_send_size);

        // Send the data frame.
        ready!(self.poll_send_frame(cx, || {
            let data = Bytes::copy_from_slice(&buf[.. frame_len]);
            Frame::Data { stream_id: id, data }
        }))?;

        Poll::Ready(Ok(frame_len))
    }

    /// Reads data from a substream.
    ///
    /// Data frames read for substreams other than `id` in the context
    /// of this method call are buffered and tasks interested in reading
    /// from these substreams are woken. If a substream buffer is full
    /// and [`MaxBufferBehaviour::Block`] is used, reading the next data
    /// frame for `id` is blocked on some task reading from the blocking
    /// stream's full buffer first.
    ///
    /// New inbound substreams (i.e. `Open` frames) read in the context of
    /// this method call are buffered up to the configured `max_substreams`
    /// and under consideration of the number of already used substreams,
    /// thereby waking the task that last called `poll_next_stream`, if any.
    /// Inbound substreams received in excess of that limit are immediately reset.
    pub fn poll_read_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId)
        -> Poll<io::Result<Option<Bytes>>>
    {
        self.guard_open()?;

        // Try to read from the buffer first.
        if let Some(state) = self.substreams.get_mut(&id) {
            let buf = state.recv_buf();
            if !buf.is_empty() {
                if self.blocking_stream == Some(id) {
                    // Unblock reading new frames.
                    self.blocking_stream = None;
                    ArcWake::wake_by_ref(&self.notifier_read);
                }
                let data = buf.remove(0);
                return Poll::Ready(Ok(Some(data)))
            }
            // If the stream buffer "spilled" onto the heap, free that memory.
            buf.shrink_to_fit();
        }

        let mut num_buffered = 0;

        loop {
            // Whenever we may have completely filled a substream
            // buffer of another substream while waiting for the
            // next frame for `id`, yield to give the current task
            // a chance to read from the other substream(s).
            if num_buffered == self.config.max_buffer_len {
                cx.waker().clone().wake();
                return Poll::Pending
            }

            // Check if the targeted substream (if any) reached EOF.
            if !self.can_read(&id) {
                // Note: Contrary to what is recommended by the spec, we must
                // return "EOF" also when the stream has been reset by the
                // remote, as the `StreamMuxer::read_substream` contract only
                // permits errors on "terminal" conditions, e.g. if the connection
                // has been closed or on protocol misbehaviour.
                return Poll::Ready(Ok(None))
            }

            // Read the next frame.
            match ready!(self.poll_read_frame(cx, Some(id)))? {
                Frame::Data { data, stream_id } if stream_id.into_local() == id => {
                    return Poll::Ready(Ok(Some(data)))
                },
                Frame::Data { stream_id, data } => {
                    // The data frame is for a different stream than the one
                    // currently being polled, so it needs to be buffered and
                    // the interested tasks notified.
                    self.buffer(stream_id.into_local(), data)?;
                    num_buffered += 1;
                }
                frame @ Frame::Open { .. } => {
                    if let Some(id) = self.on_open(frame.remote_id())? {
                        self.open_buffer.push_front(id);
                        trace!("{}: Buffered new inbound stream {} (total: {})", self.id, id, self.open_buffer.len());
                        self.notifier_read.wake_next_stream();
                    }
                }
                Frame::Close { stream_id } => {
                    let stream_id = stream_id.into_local();
                    self.on_close(stream_id)?;
                    if id == stream_id {
                        return Poll::Ready(Ok(None))
                    }
                }
                Frame::Reset { stream_id } => {
                    let stream_id = stream_id.into_local();
                    self.on_reset(stream_id);
                    if id == stream_id {
                        return Poll::Ready(Ok(None))
                    }
                }
            }
        }
    }

    /// Flushes a substream.
    ///
    /// > **Note**: This is equivalent to `poll_flush()`, i.e. to flushing
    /// > all substreams, except that this operation returns an error if
    /// > the underlying I/O stream is already closed.
    pub fn poll_flush_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId)
        -> Poll<io::Result<()>>
    {
        self.guard_open()?;

        ready!(self.poll_flush(cx))?;
        trace!("{}: Flushed substream {}", self.id, id);

        Poll::Ready(Ok(()))
    }

    /// Closes a stream for writing.
    ///
    /// > **Note**: As opposed to `poll_close()`, a flush it not implied.
    pub fn poll_close_stream(&mut self, cx: &mut Context<'_>, id: LocalStreamId)
        -> Poll<io::Result<()>>
    {
        self.guard_open()?;

        match self.substreams.remove(&id) {
            None => Poll::Ready(Ok(())),
            Some(SubstreamState::SendClosed { buf }) => {
                self.substreams.insert(id, SubstreamState::SendClosed { buf });
                Poll::Ready(Ok(()))
            }
            Some(SubstreamState::Closed { buf }) => {
                self.substreams.insert(id, SubstreamState::Closed { buf });
                Poll::Ready(Ok(()))
            }
            Some(SubstreamState::Reset { buf }) => {
                self.substreams.insert(id, SubstreamState::Reset { buf });
                Poll::Ready(Ok(()))
            }
            Some(SubstreamState::Open { buf }) => {
                if self.poll_send_frame(cx, || Frame::Close { stream_id: id })?.is_pending() {
                    self.substreams.insert(id, SubstreamState::Open { buf });
                    Poll::Pending
                } else {
                    debug!("{}: Closed substream {} (half-close)", self.id, id);
                    self.substreams.insert(id, SubstreamState::SendClosed { buf });
                    Poll::Ready(Ok(()))
                }
            }
            Some(SubstreamState::RecvClosed { buf }) => {
                if self.poll_send_frame(cx, || Frame::Close { stream_id: id })?.is_pending() {
                    self.substreams.insert(id, SubstreamState::RecvClosed { buf });
                    Poll::Pending
                } else {
                    debug!("{}: Closed substream {}", self.id, id);
                    self.substreams.insert(id, SubstreamState::Closed { buf });
                    Poll::Ready(Ok(()))
                }
            }
        }
    }

    /// Sends a (lazily constructed) mplex frame on the underlying I/O stream.
    ///
    /// The frame is only constructed if the underlying sink is ready to
    /// send another frame.
    fn poll_send_frame<F>(&mut self, cx: &mut Context<'_>, frame: F)
        -> Poll<io::Result<()>>
    where
        F: FnOnce() -> Frame<LocalStreamId>
    {
        let waker = NotifierWrite::register(&self.notifier_write, cx.waker());
        match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
            Ok(()) => {
                let frame = frame();
                trace!("{}: Sending {:?}", self.id, frame);
                match self.io.start_send_unpin(frame) {
                    Ok(()) => Poll::Ready(Ok(())),
                    Err(e) => Poll::Ready(self.on_error(e))
                }
            },
            Err(e) => Poll::Ready(self.on_error(e))
        }
    }

    /// Reads the next frame from the underlying I/O stream.
    ///
    /// The given `stream_id` identifies the substream in which
    /// the current task is interested and wants to be woken up for,
    /// in case new frames can be read. `None` means interest in
    /// frames for any substream.
    fn poll_read_frame(&mut self, cx: &mut Context<'_>, stream_id: Option<LocalStreamId>)
        -> Poll<io::Result<Frame<RemoteStreamId>>>
    {
        // Try to send pending frames, if there are any, without blocking,
        if let Poll::Ready(Err(e)) = self.send_pending_frames(cx) {
            return Poll::Ready(Err(e))
        }

        // Perform any pending flush before reading.
        if let Some(id) = &stream_id {
            if self.pending_flush_open.contains(id) {
                trace!("{}: Executing pending flush for {}.", self.id, id);
                ready!(self.poll_flush(cx))?;
                self.pending_flush_open = Default::default();
            }
        }

        // Check if there is a blocked stream.
        if let Some(blocked_id) = &self.blocking_stream {
            // We have a blocked stream and cannot continue reading
            // new frames until frames are taken from the blocked stream's
            // buffer.

            // Try to wake a pending reader of the blocked stream.
            if !self.notifier_read.wake_read_stream(*blocked_id) {
                // No task dedicated to the blocked stream woken, so schedule
                // this task again to have a chance at progress.
                trace!("{}: No task to read from blocked stream. Waking current task.", self.id);
                cx.waker().clone().wake();
            } else if let Some(id) = stream_id {
                // We woke some other task, but are still interested in
                // reading `Data` frames from the current stream when unblocked.
                debug_assert!(blocked_id != &id, "Unexpected attempt at reading a new \
                    frame from a substream with a full buffer.");
                let _ = NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id);
            } else {
                // We woke some other task but are still interested in
                // reading new `Open` frames when unblocked.
                let _ = NotifierRead::register_next_stream(&self.notifier_read, cx.waker());
            }

            return Poll::Pending
        }

        // Try to read another frame from the underlying I/O stream.
        let waker = match stream_id {
            Some(id) => NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id),
            None => NotifierRead::register_next_stream(&self.notifier_read, cx.waker())
        };
        match ready!(self.io.poll_next_unpin(&mut Context::from_waker(&waker))) {
            Some(Ok(frame)) => {
                trace!("{}: Received {:?}", self.id, frame);
                Poll::Ready(Ok(frame))
            }
            Some(Err(e)) => Poll::Ready(self.on_error(e)),
            None => Poll::Ready(self.on_error(io::ErrorKind::UnexpectedEof.into()))
        }
    }

    /// Processes an inbound `Open` frame.
    fn on_open(&mut self, id: RemoteStreamId) -> io::Result<Option<LocalStreamId>> {
        let id = id.into_local();

        if self.substreams.contains_key(&id) {
            debug!("{}: Received unexpected `Open` frame for open substream {}", self.id, id);
            return self.on_error(io::Error::new(io::ErrorKind::Other,
                "Protocol error: Received `Open` frame for open substream."))
        }

        if self.substreams.len() >= self.config.max_substreams {
            debug!("{}: Maximum number of substreams exceeded: {}",
                self.id, self.config.max_substreams);
            self.check_max_pending_frames()?;
            debug!("{}: Pending reset for new stream {}", self.id, id);
            self.pending_frames.push_front(Frame::Reset {
                stream_id: id
            });
            return Ok(None)
        }

        self.substreams.insert(id, SubstreamState::Open {
            buf: Default::default()
        });

        debug!("{}: New inbound substream: {} (total {})", self.id, id, self.substreams.len());

        Ok(Some(id))
    }

    /// Processes an inbound `Reset` frame.
    fn on_reset(&mut self, id: LocalStreamId) {
        if let Some(state) = self.substreams.remove(&id) {
            match state {
                SubstreamState::Closed { .. } => {
                    trace!("{}: Ignoring reset for mutually closed substream {}.", self.id, id);
                }
                SubstreamState::Reset { .. } => {
                    trace!("{}: Ignoring redundant reset for already reset substream {}",
                        self.id, id);
                }
                SubstreamState::RecvClosed { buf } |
                SubstreamState::SendClosed { buf } |
                SubstreamState::Open { buf } => {
                    debug!("{}: Substream {} reset by remote.", self.id, id);
                    self.substreams.insert(id, SubstreamState::Reset { buf });
                    // Notify tasks interested in reading from that stream,
                    // so they may read the EOF.
                    NotifierRead::wake_read_stream(&self.notifier_read, id);
                }
            }
        } else {
            trace!("{}: Ignoring `Reset` for unknown substream {}. Possibly dropped earlier.",
                self.id, id);
        }
    }

    /// Processes an inbound `Close` frame.
    fn on_close(&mut self, id: LocalStreamId) -> io::Result<()> {
        if let Some(state) = self.substreams.remove(&id) {
            match state {
                SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
                    debug!("{}: Ignoring `Close` frame for closed substream {}",
                        self.id, id);
                    self.substreams.insert(id, state);
                },
                SubstreamState::Reset { buf } => {
                    debug!("{}: Ignoring `Close` frame for already reset substream {}",
                        self.id, id);
                    self.substreams.insert(id, SubstreamState::Reset { buf });
                }
                SubstreamState::SendClosed { buf } => {
                    debug!("{}: Substream {} closed by remote (SendClosed -> Closed).",
                        self.id, id);
                    self.substreams.insert(id, SubstreamState::Closed { buf });
                    // Notify tasks interested in reading, so they may read the EOF.
                    self.notifier_read.wake_read_stream(id);
                },
                SubstreamState::Open { buf } => {
                    debug!("{}: Substream {} closed by remote (Open -> RecvClosed)",
                        self.id, id);
                    self.substreams.insert(id, SubstreamState::RecvClosed { buf });
                    // Notify tasks interested in reading, so they may read the EOF.
                    self.notifier_read.wake_read_stream(id);
                },
            }
        } else {
            trace!("{}: Ignoring `Close` for unknown substream {}. Possibly dropped earlier.",
                self.id, id);
        }

        Ok(())
    }

    /// Generates the next outbound stream ID.
    fn next_outbound_stream_id(&mut self) -> LocalStreamId {
        let id = self.next_outbound_stream_id;
        self.next_outbound_stream_id = self.next_outbound_stream_id.next();
        id
    }

    /// Checks whether a substream is open for reading.
    fn can_read(&self, id: &LocalStreamId) -> bool {
        match self.substreams.get(id) {
            Some(SubstreamState::Open { .. }) | Some(SubstreamState::SendClosed { .. }) => true,
            _ => false,
        }
    }

    /// Sends pending frames, without flushing.
    fn send_pending_frames(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        while let Some(frame) = self.pending_frames.pop_back() {
            if self.poll_send_frame(cx, || {
                frame.clone()
            })?.is_pending() {
                self.pending_frames.push_back(frame);
                return Poll::Pending
            }
        }

        Poll::Ready(Ok(()))
    }

    /// Records a fatal error for the multiplexed I/O stream.
    fn on_error<T>(&mut self, e: io::Error) -> io::Result<T> {
        debug!("{}: Multiplexed connection failed: {:?}", self.id, e);
        self.status = Status::Err(io::Error::new(e.kind(), e.to_string()));
        self.pending_frames =  Default::default();
        self.substreams = Default::default();
        self.open_buffer = Default::default();
        Err(e)
    }

    /// Checks that the multiplexed stream has status `Ok`,
    /// i.e. is not closed and did not encounter a fatal error.
    fn guard_open(&self) -> io::Result<()> {
        match &self.status {
            Status::Closed => Err(io::Error::new(io::ErrorKind::Other, "Connection is closed")),
            Status::Err(e) => Err(io::Error::new(e.kind(), e.to_string())),
            Status::Open => Ok(())
        }
    }

    /// Checks that the permissible limit for pending outgoing frames
    /// has not been reached.
    fn check_max_pending_frames(&mut self) -> io::Result<()> {
        if self.pending_frames.len() >= self.config.max_substreams + EXTRA_PENDING_FRAMES {
            return self.on_error(io::Error::new(io::ErrorKind::Other,
                "Too many pending frames."));
        }
        Ok(())
    }

    /// Buffers a data frame for a particular substream, if possible.
    ///
    /// If the new data frame exceeds the `max_buffer_len` for the buffer
    /// of the substream, the behaviour depends on the configured
    /// [`MaxBufferBehaviour`]. Note that the excess frame is still
    /// buffered in that case (but no further frames will be).
    ///
    /// Fails the entire multiplexed stream if too many pending `Reset`
    /// frames accumulate when using [`MaxBufferBehaviour::ResetStream`].
    fn buffer(&mut self, id: LocalStreamId, data: Bytes) -> io::Result<()> {
        let state = if let Some(state) = self.substreams.get_mut(&id) {
            state
        } else {
            trace!("{}: Dropping data {:?} for unknown substream {}", self.id, data, id);
            return Ok(())
        };

        let buf = if let Some(buf) = state.recv_buf_open() {
            buf
        } else {
            trace!("{}: Dropping data {:?} for closed or reset substream {}", self.id, data, id);
            return Ok(())
        };

        debug_assert!(buf.len() <= self.config.max_buffer_len);
        trace!("{}: Buffering {:?} for stream {} (total: {})", self.id, data, id, buf.len() + 1);
        buf.push(data);
        self.notifier_read.wake_read_stream(id);
        if buf.len() > self.config.max_buffer_len {
            debug!("{}: Frame buffer of stream {} is full.", self.id, id);
            match self.config.max_buffer_behaviour {
                MaxBufferBehaviour::ResetStream => {
                    let buf = buf.clone();
                    self.check_max_pending_frames()?;
                    self.substreams.insert(id, SubstreamState::Reset { buf });
                    debug!("{}: Pending reset for stream {}", self.id, id);
                    self.pending_frames.push_front(Frame::Reset {
                        stream_id: id
                    });
                }
                MaxBufferBehaviour::Block => {
                    self.blocking_stream = Some(id);
                }
            }
        }

        Ok(())
    }
}

type RecvBuf = SmallVec<[Bytes; 10]>;

/// The operating states of a substream.
#[derive(Clone, Debug)]
enum SubstreamState {
    /// An `Open` frame has been received or sent.
    Open { buf: RecvBuf },
    /// A `Close` frame has been sent, but the stream is still open
    /// for reading (half-close).
    SendClosed { buf: RecvBuf },
    /// A `Close` frame has been received but the stream is still
    /// open for writing (remote half-close).
    RecvClosed { buf: RecvBuf },
    /// A `Close` frame has been sent and received but the stream
    /// has not yet been dropped and may still have buffered
    /// frames to read.
    Closed { buf: RecvBuf },
    /// The stream has been reset by the local or remote peer but has
    /// not yet been dropped and may still have buffered frames to read.
    Reset { buf: RecvBuf }
}

impl SubstreamState {
    /// Mutably borrows the substream's receive buffer.
    fn recv_buf(&mut self) -> &mut RecvBuf {
        match self {
            SubstreamState::Open { buf } => buf,
            SubstreamState::SendClosed { buf } => buf,
            SubstreamState::RecvClosed { buf } => buf,
            SubstreamState::Closed { buf } => buf,
            SubstreamState::Reset { buf } => buf,
        }
    }

    /// Mutably borrows the substream's receive buffer if the substream
    /// is still open for reading, `None` otherwise.
    fn recv_buf_open(&mut self) -> Option<&mut RecvBuf> {
        match self {
            SubstreamState::Open { buf } => Some(buf),
            SubstreamState::SendClosed { buf } => Some(buf),
            SubstreamState::RecvClosed { .. } => None,
            SubstreamState::Closed { .. } => None,
            SubstreamState::Reset { .. } => None,
        }
    }
}

struct NotifierRead {
    /// The waker of the currently pending task that last
    /// called `poll_next_stream`, if any.
    next_stream: AtomicWaker,
    /// The wakers of currently pending tasks that last
    /// called `poll_read_stream` for a particular substream.
    read_stream: Mutex<IntMap<LocalStreamId, Waker>>,
}

impl NotifierRead {
    /// Registers a task to be woken up when new `Data` frames for a particular
    /// stream can be read.
    ///
    /// The returned waker should be passed to an I/O read operation
    /// that schedules a wakeup, if the operation is pending.
    #[must_use]
    fn register_read_stream<'a>(self: &'a Arc<Self>, waker: &Waker, id: LocalStreamId)
        -> WakerRef<'a>
    {
        let mut pending = self.read_stream.lock();
        pending.insert(id, waker.clone());
        waker_ref(self)
    }

    /// Registers a task to be woken up when new `Open` frames can be read.
    ///
    /// The returned waker should be passed to an I/O read operation
    /// that schedules a wakeup, if the operation is pending.
    #[must_use]
    fn register_next_stream<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
        self.next_stream.register(waker);
        waker_ref(self)
    }

    /// Wakes the task pending on `poll_read_stream` for the
    /// specified stream, if any.
    fn wake_read_stream(&self, id: LocalStreamId) -> bool {
        let mut pending = self.read_stream.lock();

        if let Some(waker) = pending.remove(&id) {
            waker.wake();
            return true
        }

        false
    }

    /// Wakes the task pending on `poll_next_stream`, if any.
    fn wake_next_stream(&self) {
        self.next_stream.wake();
    }
}

impl ArcWake for NotifierRead {
    fn wake_by_ref(this: &Arc<Self>) {
        let wakers = mem::take(&mut *this.read_stream.lock());
        for (_, waker) in wakers {
            waker.wake();
        }
        this.wake_next_stream();
    }
}

struct NotifierWrite {
    /// List of wakers to wake when write operations on the
    /// underlying I/O stream can proceed.
    pending: Mutex<Vec<Waker>>,
}

impl NotifierWrite {
    /// Registers interest of a task in writing to some substream.
    ///
    /// The returned waker should be passed to an I/O write operation
    /// that schedules a wakeup if the operation is pending.
    #[must_use]
    fn register<'a>(self: &'a Arc<Self>, waker: &Waker) -> WakerRef<'a> {
        let mut pending = self.pending.lock();
        if pending.iter().all(|w| !w.will_wake(waker)) {
            pending.push(waker.clone());
        }
        waker_ref(self)
    }
}

impl ArcWake for NotifierWrite {
    fn wake_by_ref(this: &Arc<Self>) {
        let wakers = mem::take(&mut *this.pending.lock());
        for waker in wakers {
            waker.wake();
        }
    }
}

struct NotifierOpen {
    /// Wakers of pending tasks interested in creating new
    /// outbound substreams.
    pending: Vec<Waker>,
}

impl NotifierOpen {
    /// Registers interest of a task in opening a new outbound substream.
    fn register(&mut self, waker: &Waker) {
        if self.pending.iter().all(|w| !w.will_wake(waker)) {
            self.pending.push(waker.clone());
        }
    }

    fn wake_all(&mut self) {
        let wakers = mem::take(&mut self.pending);
        for waker in wakers {
            waker.wake();
        }
    }
}

/// The maximum number of pending reset or close frames to send
/// we are willing to buffer beyond the configured substream limit.
/// This extra leeway bounds resource usage while allowing some
/// back-pressure when sending out these frames.
///
/// If too many pending frames accumulate, the multiplexed stream is
/// considered unhealthy and terminates with an error.
const EXTRA_PENDING_FRAMES: usize = 1000;

#[cfg(test)]
mod tests {
    use async_std::task;
    use bytes::BytesMut;
    use futures::prelude::*;
    use asynchronous_codec::{Decoder, Encoder};
    use quickcheck::*;
    use rand::prelude::*;
    use std::collections::HashSet;
    use std::num::NonZeroU8;
    use std::ops::DerefMut;
    use std::pin::Pin;
    use super::*;

    impl Arbitrary for MaxBufferBehaviour {
        fn arbitrary<G: Gen>(g: &mut G) -> MaxBufferBehaviour {
            *[MaxBufferBehaviour::Block, MaxBufferBehaviour::ResetStream].choose(g).unwrap()
        }
    }

    impl Arbitrary for MplexConfig {
        fn arbitrary<G: Gen>(g: &mut G) -> MplexConfig {
            MplexConfig {
                max_substreams: g.gen_range(1, 100),
                max_buffer_len: g.gen_range(1, 1000),
                max_buffer_behaviour: MaxBufferBehaviour::arbitrary(g),
                split_send_size: g.gen_range(1, 10000),
            }
        }
    }

    /// Memory-backed "connection".
    struct Connection {
        /// The buffer that the `Multiplexed` stream reads from.
        r_buf: BytesMut,
        /// The buffer that the `Multiplexed` stream writes to.
        w_buf: BytesMut,
        /// Whether the connection should return EOF on the next read.
        eof: bool,
    }

    impl AsyncRead for Connection {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _: &mut Context<'_>,
            buf: &mut [u8]
        ) -> Poll<io::Result<usize>> {
            if self.eof {
                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()))
            }
            let n = std::cmp::min(buf.len(), self.r_buf.len());
            let data = self.r_buf.split_to(n);
            buf[..n].copy_from_slice(&data[..]);
            if n == 0 {
                Poll::Pending
            } else {
                Poll::Ready(Ok(n))
            }
        }
    }

    impl AsyncWrite for Connection {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context<'_>,
            buf: &[u8]
        ) -> Poll<io::Result<usize>> {
            self.w_buf.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            _: &mut Context<'_>
        ) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(
            self: Pin<&mut Self>,
            _: &mut Context<'_>
        ) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    #[test]
    fn max_buffer_behaviour() {
        let _ = env_logger::try_init();

        fn prop(cfg: MplexConfig, overflow: NonZeroU8) {
            let mut r_buf = BytesMut::new();
            let mut codec = Codec::new();

            // Open the maximum number of inbound streams.
            for i in 0 .. cfg.max_substreams {
                let stream_id = LocalStreamId::dialer(i as u32);
                codec.encode(Frame::Open { stream_id  }, &mut r_buf).unwrap();
            }

            // Send more data on stream 0 than the buffer permits.
            let stream_id = LocalStreamId::dialer(0);
            let data = Bytes::from("Hello world");
            for _ in 0 .. cfg.max_buffer_len + overflow.get() as usize {
                codec.encode(Frame::Data { stream_id, data: data.clone() }, &mut r_buf).unwrap();
            }

            // Setup the multiplexed connection.
            let conn = Connection { r_buf, w_buf: BytesMut::new(), eof: false };
            let mut m = Multiplexed::new(conn, cfg.clone());

            task::block_on(future::poll_fn(move |cx| {
                // Receive all inbound streams.
                for i in 0 .. cfg.max_substreams {
                    match m.poll_next_stream(cx) {
                        Poll::Pending => panic!("Expected new inbound stream."),
                        Poll::Ready(Err(e)) => panic!("{:?}", e),
                        Poll::Ready(Ok(id)) => {
                            assert_eq!(id, LocalStreamId::listener(i as u32));
                        }
                    };
                }

                // Polling again for an inbound stream should yield `Pending`
                // after reading and buffering data frames up to the limit.
                let id = LocalStreamId::listener(0);
                match m.poll_next_stream(cx) {
                    Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
                    Poll::Pending => {
                        // We expect the implementation to yield when the buffer
                        // is full but before it is exceeded and the max buffer
                        // behaviour takes effect, giving the current task a
                        // chance to read from the buffer. Here we just read
                        // again to provoke the max buffer behaviour.
                        assert_eq!(
                            m.substreams.get_mut(&id).unwrap().recv_buf().len(),
                            cfg.max_buffer_len
                        );
                        match m.poll_next_stream(cx) {
                            Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
                            Poll::Pending => {
                                // Expect the buffer for stream 0 to be exceeded, triggering
                                // the max. buffer behaviour.
                                assert_eq!(
                                    m.substreams.get_mut(&id).unwrap().recv_buf().len(),
                                    cfg.max_buffer_len + 1
                                );
                            }
                        }
                    }
                }

                // Expect either a `Reset` to be sent or all reads to be
                // blocked `Pending`, depending on the `MaxBufferBehaviour`.
                match cfg.max_buffer_behaviour {
                    MaxBufferBehaviour::ResetStream => {
                        let _ = m.poll_flush_stream(cx, id);
                        let w_buf = &mut m.io.get_mut().deref_mut().w_buf;
                        let frame = codec.decode(w_buf).unwrap();
                        let stream_id = stream_id.into_remote();
                        assert_eq!(frame, Some(Frame::Reset { stream_id }));
                    }
                    MaxBufferBehaviour::Block => {
                        assert!(m.poll_next_stream(cx).is_pending());
                        for i in 1 .. cfg.max_substreams {
                            let id = LocalStreamId::listener(i as u32);
                            assert!(m.poll_read_stream(cx, id).is_pending());
                        }
                    }
                }

                // Drain the buffer by reading from the stream.
                for _ in 0 .. cfg.max_buffer_len + 1 {
                    match m.poll_read_stream(cx, id) {
                        Poll::Ready(Ok(Some(bytes))) => {
                            assert_eq!(bytes, data);
                        }
                        x => panic!("Unexpected: {:?}", x)
                    }
                }

                // Read from the stream after the buffer has been drained,
                // expecting either EOF or further data, depending on
                // the `MaxBufferBehaviour`.
                match cfg.max_buffer_behaviour {
                    MaxBufferBehaviour::ResetStream => {
                        // Expect to read EOF
                        match m.poll_read_stream(cx, id) {
                            Poll::Ready(Ok(None)) => {},
                            poll => panic!("Unexpected: {:?}", poll)
                        }
                    }
                    MaxBufferBehaviour::Block => {
                        // Expect to be able to continue reading.
                        match m.poll_read_stream(cx, id) {
                            Poll::Ready(Ok(Some(bytes))) => assert_eq!(bytes, data),
                            Poll::Pending => assert_eq!(overflow.get(), 1),
                            poll => panic!("Unexpected: {:?}", poll)
                        }
                    }
                }

                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_,_))
    }

    #[test]
    fn close_on_error() {
        let _ = env_logger::try_init();

        fn prop(cfg: MplexConfig, num_streams: NonZeroU8) {
            let num_streams = cmp::min(cfg.max_substreams, num_streams.get() as usize);

            // Setup the multiplexed connection.
            let conn = Connection {
                r_buf: BytesMut::new(),
                w_buf: BytesMut::new(),
                eof: false
            };
            let mut m = Multiplexed::new(conn, cfg.clone());

            // Run the test.
            let mut opened = HashSet::new();
            task::block_on(future::poll_fn(move |cx| {
                // Open a number of streams.
                for _ in 0 .. num_streams {
                    let id = ready!(m.poll_open_stream(cx)).unwrap();
                    assert!(opened.insert(id));
                    assert!(m.poll_read_stream(cx, id).is_pending());
                }

                // Abruptly "close" the connection.
                m.io.get_mut().deref_mut().eof = true;

                // Reading from any stream should yield an error and all streams
                // should be closed due to the failed connection.
                assert!(opened.iter().all(|id| match m.poll_read_stream(cx, *id) {
                    Poll::Ready(Err(e)) => e.kind() == io::ErrorKind::UnexpectedEof,
                    _ => false
                }));

                assert!(m.substreams.is_empty());

                Poll::Ready(())
            }))
        }

        quickcheck(prop as fn(_,_))
    }
}