mwc_libp2p_core/muxing/
singleton.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{connection::Endpoint, muxing::{StreamMuxer, StreamMuxerEvent}};
22
23use futures::prelude::*;
24use parking_lot::Mutex;
25use std::{io, pin::Pin, sync::atomic::{AtomicBool, Ordering}, task::Context, task::Poll};
26
27/// Implementation of `StreamMuxer` that allows only one substream on top of a connection,
28/// yielding the connection itself.
29///
30/// Applying this muxer on a connection doesn't read or write any data on the connection itself.
31/// Most notably, no protocol is negotiated.
32pub struct SingletonMuxer<TSocket> {
33    /// The inner connection.
34    inner: Mutex<TSocket>,
35    /// If true, a substream has been produced and any further attempt should fail.
36    substream_extracted: AtomicBool,
37    /// Our local endpoint. Always the same value as was passed to `new`.
38    endpoint: Endpoint,
39}
40
41impl<TSocket> SingletonMuxer<TSocket> {
42    /// Creates a new `SingletonMuxer`.
43    ///
44    /// If `endpoint` is `Dialer`, then only one outbound substream will be permitted.
45    /// If `endpoint` is `Listener`, then only one inbound substream will be permitted.
46    pub fn new(inner: TSocket, endpoint: Endpoint) -> Self {
47        SingletonMuxer {
48            inner: Mutex::new(inner),
49            substream_extracted: AtomicBool::new(false),
50            endpoint,
51        }
52    }
53}
54
/// Substream of the `SingletonMuxer`.
///
/// This is a stateless handle: the muxer's substream I/O methods ignore the handle
/// they are given and operate directly on the wrapped connection.
#[derive(Debug)]
pub struct Substream {}
/// Outbound substream attempt of the `SingletonMuxer`.
///
/// This is a stateless token: it is created by `open_outbound` and carries no data
/// of its own; `poll_outbound` and `destroy_outbound` ignore its contents.
#[derive(Debug)]
pub struct OutboundSubstream {}
59
60impl<TSocket> StreamMuxer for SingletonMuxer<TSocket>
61where
62    TSocket: AsyncRead + AsyncWrite + Unpin,
63{
64    type Substream = Substream;
65    type OutboundSubstream = OutboundSubstream;
66    type Error = io::Error;
67
68    fn poll_event(&self, _: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, io::Error>> {
69        match self.endpoint {
70            Endpoint::Dialer => return Poll::Pending,
71            Endpoint::Listener => {}
72        }
73
74        if !self.substream_extracted.swap(true, Ordering::Relaxed) {
75            Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream {})))
76        } else {
77            Poll::Pending
78        }
79    }
80
81    fn open_outbound(&self) -> Self::OutboundSubstream {
82        OutboundSubstream {}
83    }
84
85    fn poll_outbound(&self, _: &mut Context<'_>, _: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, io::Error>> {
86        match self.endpoint {
87            Endpoint::Listener => return Poll::Pending,
88            Endpoint::Dialer => {}
89        }
90
91        if !self.substream_extracted.swap(true, Ordering::Relaxed) {
92            Poll::Ready(Ok(Substream {}))
93        } else {
94            Poll::Pending
95        }
96    }
97
98    fn destroy_outbound(&self, _: Self::OutboundSubstream) {
99    }
100
101    fn read_substream(&self, cx: &mut Context<'_>, _: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
102        AsyncRead::poll_read(Pin::new(&mut *self.inner.lock()), cx, buf)
103    }
104
105    fn write_substream(&self, cx: &mut Context<'_>, _: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
106        AsyncWrite::poll_write(Pin::new(&mut *self.inner.lock()), cx, buf)
107    }
108
109    fn flush_substream(&self, cx: &mut Context<'_>, _: &mut Self::Substream) -> Poll<Result<(), io::Error>> {
110        AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx)
111    }
112
113    fn shutdown_substream(&self, cx: &mut Context<'_>, _: &mut Self::Substream) -> Poll<Result<(), io::Error>> {
114        AsyncWrite::poll_close(Pin::new(&mut *self.inner.lock()), cx)
115    }
116
117    fn destroy_substream(&self, _: Self::Substream) {
118    }
119
120    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
121        // The `StreamMuxer` trait requires that `close()` implies `flush_all()`.
122        self.flush_all(cx)
123    }
124
125    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
126        AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx)
127    }
128}