libp2prs_mplex/
lib.rs

1// Copyright 2020 Netwarps Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21pub mod connection;
22pub mod error;
23mod frame;
24mod pause;
25
26use async_trait::async_trait;
27use futures::{AsyncRead, AsyncWrite, FutureExt};
28use log::{info, trace};
29use std::fmt;
30
31use libp2prs_core::muxing::{IReadWrite, IStreamMuxer, ReadWriteEx, StreamInfo, StreamMuxer, StreamMuxerEx};
32use libp2prs_core::transport::{ConnectionInfo, TransportError};
33use libp2prs_core::upgrade::{UpgradeInfo, Upgrader};
34
35use crate::connection::Connection;
36use connection::{control::Control, stream::Stream, Id};
37use error::ConnectionError;
38use futures::future::BoxFuture;
39use libp2prs_core::identity::Keypair;
40use libp2prs_core::secure_io::SecureInfo;
41use libp2prs_core::{Multiaddr, PeerId, PublicKey};
42use parking_lot::Mutex;
43use std::sync::Arc;
44
/// Configuration for the mplex stream muxer upgrade.
///
/// The configuration currently carries no options. It is the type on which the
/// upgrade traits of this crate are implemented.
#[derive(Clone, Debug, Default)]
pub struct Config {}

impl Config {
    /// Creates a new (empty) mplex configuration.
    ///
    /// Equivalent to [`Config::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
59
/// A Mplex connection.
///
/// The addresses and key material exposed by this type are copied from the
/// underlying socket once, when the muxer is constructed. Nothing in this type
/// refreshes them afterwards, so a later change of the socket's address is not
/// reflected here.
pub struct Mplex<C> {
    /// The underlying mplex connection.
    ///
    /// It is shared between clones through the `Arc`, and kept in an `Option` so
    /// that `task()` can take it out; once taken, later `task()` calls yield `None`.
    conn: Arc<Mutex<Option<Connection<C>>>>,
    /// Handle to control the connection (open/accept substreams, close).
    ctrl: Control,
    /// Connection id, kept for debugging purposes (printed by the `Debug` impl).
    id: Id,
    // The secure & connection info provided by the underlying socket.
    // The socket is moved into `Connection`, so copies of this information
    // are kept in the fields below.
    //
    /// The local multiaddr of this connection
    pub la: Multiaddr,
    /// The remote multiaddr of this connection
    pub ra: Multiaddr,
    /// The private key of the local peer
    pub local_priv_key: Keypair,
    /// For convenience, the local peer ID as reported by the underlying socket
    pub local_peer_id: PeerId,
    /// The public key of the remote.
    pub remote_pub_key: PublicKey,
    /// For convenience, the remote peer ID as reported by the underlying socket
    pub remote_peer_id: PeerId,
}
87
88impl<C> Clone for Mplex<C> {
89    fn clone(&self) -> Self {
90        Mplex {
91            conn: self.conn.clone(),
92            ctrl: self.ctrl.clone(),
93            id: self.id,
94            la: self.la.clone(),
95            ra: self.ra.clone(),
96            local_priv_key: self.local_priv_key.clone(),
97            local_peer_id: self.local_peer_id,
98            remote_pub_key: self.remote_pub_key.clone(),
99            remote_peer_id: self.remote_peer_id,
100        }
101    }
102}
103
104impl<C> fmt::Debug for Mplex<C> {
105    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106        f.debug_struct("Mplex")
107            .field("Id", &self.id)
108            .field("Ra", &self.ra)
109            .field("Rid", &self.remote_peer_id)
110            .finish()
111    }
112}
113
114impl<C: ConnectionInfo + SecureInfo + AsyncRead + AsyncWrite + Send + Unpin + 'static> Mplex<C> {
115    pub fn new(io: C) -> Self {
116        // `io` will be moved into Connection soon, make a copy of the connection & secure info
117        let la = io.local_multiaddr();
118        let ra = io.remote_multiaddr();
119        let local_priv_key = io.local_priv_key();
120        let local_peer_id = io.local_peer();
121        let remote_pub_key = io.remote_pub_key();
122        let remote_peer_id = io.remote_peer();
123
124        let conn = Connection::new(io);
125        let id = conn.id();
126        let ctrl = conn.control();
127        Mplex {
128            conn: Arc::new(Mutex::new(Some(conn))),
129            ctrl,
130            id,
131            la,
132            ra,
133            local_priv_key,
134            local_peer_id,
135            remote_pub_key,
136            remote_peer_id,
137        }
138    }
139}
140
141impl<C: Send> ConnectionInfo for Mplex<C> {
142    fn local_multiaddr(&self) -> Multiaddr {
143        self.la.clone()
144    }
145    fn remote_multiaddr(&self) -> Multiaddr {
146        self.ra.clone()
147    }
148}
149
150impl<C> SecureInfo for Mplex<C> {
151    fn local_peer(&self) -> PeerId {
152        self.local_peer_id
153    }
154
155    fn remote_peer(&self) -> PeerId {
156        self.remote_peer_id
157    }
158
159    fn local_priv_key(&self) -> Keypair {
160        self.local_priv_key.clone()
161    }
162
163    fn remote_pub_key(&self) -> PublicKey {
164        self.remote_pub_key.clone()
165    }
166}
167
168/// StreamInfo for Mplex::Stream
169impl StreamInfo for Stream {
170    fn id(&self) -> usize {
171        self.val() as usize
172    }
173}
174
175#[async_trait]
176impl ReadWriteEx for Stream {
177    fn box_clone(&self) -> IReadWrite {
178        Box::new(self.clone())
179    }
180}
181
182impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> StreamMuxerEx for Mplex<C> {}
183
184#[async_trait]
185impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> StreamMuxer for Mplex<C> {
186    async fn open_stream(&mut self) -> Result<IReadWrite, TransportError> {
187        trace!("opening a new outbound substream for mplex...");
188        let s = self.ctrl.open_stream().await?;
189        Ok(Box::new(s))
190    }
191
192    async fn accept_stream(&mut self) -> Result<IReadWrite, TransportError> {
193        trace!("waiting for a new inbound substream for yamux...");
194        let s = self.ctrl.accept_stream().await?;
195        Ok(Box::new(s))
196    }
197
198    async fn close(&mut self) -> Result<(), TransportError> {
199        let _ = self.ctrl.close().await?;
200        Ok(())
201    }
202
203    fn task(&mut self) -> Option<BoxFuture<'static, ()>> {
204        if let Some(mut conn) = self.conn.lock().take() {
205            return Some(
206                async move {
207                    while conn.next_stream().await.is_ok() {}
208                    info!("connection is closed");
209                }
210                .boxed(),
211            );
212        }
213        None
214    }
215
216    fn box_clone(&self) -> IStreamMuxer {
217        Box::new(self.clone())
218    }
219}
220
221impl UpgradeInfo for Config {
222    type Info = &'static [u8];
223
224    fn protocol_info(&self) -> Vec<Self::Info> {
225        vec![b"/mplex/6.7.0"]
226    }
227}
228
229#[async_trait]
230impl<T> Upgrader<T> for Config
231where
232    T: ConnectionInfo + SecureInfo + AsyncRead + AsyncWrite + Send + Unpin + 'static,
233{
234    type Output = Mplex<T>;
235
236    async fn upgrade_inbound(self, socket: T, _info: <Self as UpgradeInfo>::Info) -> Result<Self::Output, TransportError> {
237        trace!("upgrading mplex inbound");
238        Ok(Mplex::new(socket))
239    }
240
241    async fn upgrade_outbound(self, socket: T, _info: <Self as UpgradeInfo>::Info) -> Result<Self::Output, TransportError> {
242        trace!("upgrading mplex outbound");
243        Ok(Mplex::new(socket))
244    }
245}
246
247impl From<ConnectionError> for TransportError {
248    fn from(e: ConnectionError) -> Self {
249        // TODO: make a mux error catalog for secio
250        TransportError::StreamMuxerError(Box::new(e))
251    }
252}
253
#[cfg(test)]
mod tests {
    /// Placeholder sanity check; it does not exercise any mplex code.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}