1pub mod connection;
22pub mod error;
23mod frame;
24mod pause;
25
26use async_trait::async_trait;
27use futures::{AsyncRead, AsyncWrite, FutureExt};
28use log::{info, trace};
29use std::fmt;
30
31use libp2prs_core::muxing::{IReadWrite, IStreamMuxer, ReadWriteEx, StreamInfo, StreamMuxer, StreamMuxerEx};
32use libp2prs_core::transport::{ConnectionInfo, TransportError};
33use libp2prs_core::upgrade::{UpgradeInfo, Upgrader};
34
35use crate::connection::Connection;
36use connection::{control::Control, stream::Stream, Id};
37use error::ConnectionError;
38use futures::future::BoxFuture;
39use libp2prs_core::identity::Keypair;
40use libp2prs_core::secure_io::SecureInfo;
41use libp2prs_core::{Multiaddr, PeerId, PublicKey};
42use parking_lot::Mutex;
43use std::sync::Arc;
44
45#[derive(Clone)]
46pub struct Config {}
47
48impl Config {
49 pub fn new() -> Self {
50 Config {}
51 }
52}
53
54impl Default for Config {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60pub struct Mplex<C> {
65 conn: Arc<Mutex<Option<Connection<C>>>>,
67 ctrl: Control,
69 id: Id,
71 pub la: Multiaddr,
76 pub ra: Multiaddr,
78 pub local_priv_key: Keypair,
80 pub local_peer_id: PeerId,
82 pub remote_pub_key: PublicKey,
84 pub remote_peer_id: PeerId,
86}
87
88impl<C> Clone for Mplex<C> {
89 fn clone(&self) -> Self {
90 Mplex {
91 conn: self.conn.clone(),
92 ctrl: self.ctrl.clone(),
93 id: self.id,
94 la: self.la.clone(),
95 ra: self.ra.clone(),
96 local_priv_key: self.local_priv_key.clone(),
97 local_peer_id: self.local_peer_id,
98 remote_pub_key: self.remote_pub_key.clone(),
99 remote_peer_id: self.remote_peer_id,
100 }
101 }
102}
103
104impl<C> fmt::Debug for Mplex<C> {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 f.debug_struct("Mplex")
107 .field("Id", &self.id)
108 .field("Ra", &self.ra)
109 .field("Rid", &self.remote_peer_id)
110 .finish()
111 }
112}
113
114impl<C: ConnectionInfo + SecureInfo + AsyncRead + AsyncWrite + Send + Unpin + 'static> Mplex<C> {
115 pub fn new(io: C) -> Self {
116 let la = io.local_multiaddr();
118 let ra = io.remote_multiaddr();
119 let local_priv_key = io.local_priv_key();
120 let local_peer_id = io.local_peer();
121 let remote_pub_key = io.remote_pub_key();
122 let remote_peer_id = io.remote_peer();
123
124 let conn = Connection::new(io);
125 let id = conn.id();
126 let ctrl = conn.control();
127 Mplex {
128 conn: Arc::new(Mutex::new(Some(conn))),
129 ctrl,
130 id,
131 la,
132 ra,
133 local_priv_key,
134 local_peer_id,
135 remote_pub_key,
136 remote_peer_id,
137 }
138 }
139}
140
141impl<C: Send> ConnectionInfo for Mplex<C> {
142 fn local_multiaddr(&self) -> Multiaddr {
143 self.la.clone()
144 }
145 fn remote_multiaddr(&self) -> Multiaddr {
146 self.ra.clone()
147 }
148}
149
150impl<C> SecureInfo for Mplex<C> {
151 fn local_peer(&self) -> PeerId {
152 self.local_peer_id
153 }
154
155 fn remote_peer(&self) -> PeerId {
156 self.remote_peer_id
157 }
158
159 fn local_priv_key(&self) -> Keypair {
160 self.local_priv_key.clone()
161 }
162
163 fn remote_pub_key(&self) -> PublicKey {
164 self.remote_pub_key.clone()
165 }
166}
167
168impl StreamInfo for Stream {
170 fn id(&self) -> usize {
171 self.val() as usize
172 }
173}
174
175#[async_trait]
176impl ReadWriteEx for Stream {
177 fn box_clone(&self) -> IReadWrite {
178 Box::new(self.clone())
179 }
180}
181
182impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> StreamMuxerEx for Mplex<C> {}
183
184#[async_trait]
185impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> StreamMuxer for Mplex<C> {
186 async fn open_stream(&mut self) -> Result<IReadWrite, TransportError> {
187 trace!("opening a new outbound substream for mplex...");
188 let s = self.ctrl.open_stream().await?;
189 Ok(Box::new(s))
190 }
191
192 async fn accept_stream(&mut self) -> Result<IReadWrite, TransportError> {
193 trace!("waiting for a new inbound substream for yamux...");
194 let s = self.ctrl.accept_stream().await?;
195 Ok(Box::new(s))
196 }
197
198 async fn close(&mut self) -> Result<(), TransportError> {
199 let _ = self.ctrl.close().await?;
200 Ok(())
201 }
202
203 fn task(&mut self) -> Option<BoxFuture<'static, ()>> {
204 if let Some(mut conn) = self.conn.lock().take() {
205 return Some(
206 async move {
207 while conn.next_stream().await.is_ok() {}
208 info!("connection is closed");
209 }
210 .boxed(),
211 );
212 }
213 None
214 }
215
216 fn box_clone(&self) -> IStreamMuxer {
217 Box::new(self.clone())
218 }
219}
220
221impl UpgradeInfo for Config {
222 type Info = &'static [u8];
223
224 fn protocol_info(&self) -> Vec<Self::Info> {
225 vec![b"/mplex/6.7.0"]
226 }
227}
228
229#[async_trait]
230impl<T> Upgrader<T> for Config
231where
232 T: ConnectionInfo + SecureInfo + AsyncRead + AsyncWrite + Send + Unpin + 'static,
233{
234 type Output = Mplex<T>;
235
236 async fn upgrade_inbound(self, socket: T, _info: <Self as UpgradeInfo>::Info) -> Result<Self::Output, TransportError> {
237 trace!("upgrading mplex inbound");
238 Ok(Mplex::new(socket))
239 }
240
241 async fn upgrade_outbound(self, socket: T, _info: <Self as UpgradeInfo>::Info) -> Result<Self::Output, TransportError> {
242 trace!("upgrading mplex outbound");
243 Ok(Mplex::new(socket))
244 }
245}
246
247impl From<ConnectionError> for TransportError {
248 fn from(e: ConnectionError) -> Self {
249 TransportError::StreamMuxerError(Box::new(e))
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 #[test]
257 fn it_works() {
258 assert_eq!(2 + 2, 4);
259 }
260}