mwc_libp2p_core/connection/
substream.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::muxing::{StreamMuxer, StreamMuxerEvent, SubstreamRef, substream_from_ref};
22use futures::prelude::*;
23use multiaddr::Multiaddr;
24use smallvec::SmallVec;
25use std::sync::Arc;
26use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
27
28/// Endpoint for a received substream.
29#[derive(Debug, Copy, Clone, PartialEq, Eq)]
30pub enum SubstreamEndpoint<TDialInfo> {
31    Dialer(TDialInfo),
32    Listener,
33}
34
35impl<TDialInfo> SubstreamEndpoint<TDialInfo> {
36    /// Returns true for `Dialer`.
37    pub fn is_dialer(&self) -> bool {
38        match self {
39            SubstreamEndpoint::Dialer(_) => true,
40            SubstreamEndpoint::Listener => false,
41        }
42    }
43
44    /// Returns true for `Listener`.
45    pub fn is_listener(&self) -> bool {
46        match self {
47            SubstreamEndpoint::Dialer(_) => false,
48            SubstreamEndpoint::Listener => true,
49        }
50    }
51}
52
53/// Implementation of `Stream` that handles substream multiplexing.
54///
55/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying
56/// the `Muxing` will **not** close the existing substreams.
57///
58/// The stream will close once both the inbound and outbound channels are closed, and no more
59/// outbound substream attempt is pending.
60pub struct Muxing<TMuxer, TUserData>
61where
62    TMuxer: StreamMuxer,
63{
64    /// The muxer used to manage substreams.
65    inner: Arc<TMuxer>,
66    /// List of substreams we are currently opening.
67    outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
68}
69
70/// Future that signals the remote that we have closed the connection.
71pub struct Close<TMuxer> {
72    /// Muxer to close.
73    muxer: Arc<TMuxer>,
74}
75
76/// A successfully opened substream.
77pub type Substream<TMuxer> = SubstreamRef<Arc<TMuxer>>;
78
79/// Event that can happen on the `Muxing`.
80pub enum SubstreamEvent<TMuxer, TUserData>
81where
82    TMuxer: StreamMuxer,
83{
84    /// A new inbound substream arrived.
85    InboundSubstream {
86        /// The newly-opened substream. Will return EOF of an error if the `Muxing` is
87        /// destroyed or `close_graceful` is called.
88        substream: Substream<TMuxer>,
89    },
90
91    /// An outbound substream has successfully been opened.
92    OutboundSubstream {
93        /// User data that has been passed to the `open_substream` method.
94        user_data: TUserData,
95        /// The newly-opened substream. Will return EOF of an error if the `Muxing` is
96        /// destroyed or `close_graceful` is called.
97        substream: Substream<TMuxer>,
98    },
99
100    /// Address to the remote has changed. The previous one is now obsolete.
101    ///
102    /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
103    /// >           can change their IP address while retaining the same QUIC connection.
104    AddressChange(Multiaddr),
105}
106
107/// Identifier for a substream being opened.
108#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
109pub struct OutboundSubstreamId(usize);
110
111impl<TMuxer, TUserData> Muxing<TMuxer, TUserData>
112where
113    TMuxer: StreamMuxer,
114{
115    /// Creates a new node events stream.
116    pub fn new(muxer: TMuxer) -> Self {
117        Muxing {
118            inner: Arc::new(muxer),
119            outbound_substreams: SmallVec::new(),
120        }
121    }
122
123    /// Starts the process of opening a new outbound substream.
124    ///
125    /// After calling this method, polling the stream should eventually produce either an
126    /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has
127    /// been passed to this method.
128    pub fn open_substream(&mut self, user_data: TUserData) {
129        let raw = self.inner.open_outbound();
130        self.outbound_substreams.push((user_data, raw));
131    }
132
133    /// Destroys the node stream and returns all the pending outbound substreams, plus an object
134    /// that signals the remote that we shut down the connection.
135    #[must_use]
136    pub fn close(mut self) -> (Close<TMuxer>, Vec<TUserData>) {
137        let substreams = self.cancel_outgoing();
138        let close = Close { muxer: self.inner.clone() };
139        (close, substreams)
140    }
141
142    /// Destroys all outbound streams and returns the corresponding user data.
143    pub fn cancel_outgoing(&mut self) -> Vec<TUserData> {
144        let mut out = Vec::with_capacity(self.outbound_substreams.len());
145        for (user_data, outbound) in self.outbound_substreams.drain(..) {
146            out.push(user_data);
147            self.inner.destroy_outbound(outbound);
148        }
149        out
150    }
151
152    /// Provides an API similar to `Future`.
153    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
154        // Polling inbound substream.
155        match self.inner.poll_event(cx) {
156            Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => {
157                let substream = substream_from_ref(self.inner.clone(), substream);
158                return Poll::Ready(Ok(SubstreamEvent::InboundSubstream {
159                    substream,
160                }));
161            }
162            Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) =>
163                return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))),
164            Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
165            Poll::Pending => {}
166        }
167
168        // Polling outbound substreams.
169        // We remove each element from `outbound_substreams` one by one and add them back.
170        for n in (0..self.outbound_substreams.len()).rev() {
171            let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n);
172            match self.inner.poll_outbound(cx, &mut outbound) {
173                Poll::Ready(Ok(substream)) => {
174                    let substream = substream_from_ref(self.inner.clone(), substream);
175                    self.inner.destroy_outbound(outbound);
176                    return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
177                        user_data,
178                        substream,
179                    }));
180                }
181                Poll::Pending => {
182                    self.outbound_substreams.push((user_data, outbound));
183                }
184                Poll::Ready(Err(err)) => {
185                    self.inner.destroy_outbound(outbound);
186                    return Poll::Ready(Err(err.into()));
187                }
188            }
189        }
190
191        // Nothing happened. Register our task to be notified and return.
192        Poll::Pending
193    }
194}
195
196impl<TMuxer, TUserData> fmt::Debug for Muxing<TMuxer, TUserData>
197where
198    TMuxer: StreamMuxer,
199{
200    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
201        f.debug_struct("Muxing")
202            .field("outbound_substreams", &self.outbound_substreams.len())
203            .finish()
204    }
205}
206
207impl<TMuxer, TUserData> Drop for Muxing<TMuxer, TUserData>
208where
209    TMuxer: StreamMuxer,
210{
211    fn drop(&mut self) {
212        // The substreams that were produced will continue to work, as the muxer is held in an Arc.
213        // However we will no longer process any further inbound or outbound substream, and we
214        // therefore close everything.
215        for (_, outbound) in self.outbound_substreams.drain(..) {
216            self.inner.destroy_outbound(outbound);
217        }
218    }
219}
220
221impl<TMuxer> Future for Close<TMuxer>
222where
223    TMuxer: StreamMuxer,
224{
225    type Output = Result<(), IoError>;
226
227    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228        match self.muxer.close(cx) {
229            Poll::Pending => Poll::Pending,
230            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
231            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
232        }
233    }
234}
235
236impl<TMuxer> fmt::Debug for Close<TMuxer>
237where
238    TMuxer: StreamMuxer,
239{
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
241        f.debug_struct("Close")
242            .finish()
243    }
244}
245
246impl<TMuxer, TUserData> fmt::Debug for SubstreamEvent<TMuxer, TUserData>
247where
248    TMuxer: StreamMuxer,
249    TMuxer::Substream: fmt::Debug,
250    TUserData: fmt::Debug,
251{
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        match self {
254            SubstreamEvent::InboundSubstream { substream } => {
255                f.debug_struct("SubstreamEvent::OutboundClosed")
256                    .field("substream", substream)
257                    .finish()
258            },
259            SubstreamEvent::OutboundSubstream { user_data, substream } => {
260                f.debug_struct("SubstreamEvent::OutboundSubstream")
261                    .field("user_data", user_data)
262                    .field("substream", substream)
263                    .finish()
264            },
265            SubstreamEvent::AddressChange(address) => {
266                f.debug_struct("SubstreamEvent::AddressChange")
267                    .field("address", address)
268                    .finish()
269            },
270        }
271    }
272}