tetsy_libp2p_mplex/
lib.rs1mod codec;
22mod config;
23mod io;
24
25pub use config::{MplexConfig, MaxBufferBehaviour};
26
27use codec::LocalStreamId;
28use std::{cmp, iter, task::Context, task::Poll};
29use bytes::Bytes;
30use tetsy_libp2p_core::{
31 StreamMuxer,
32 muxing::StreamMuxerEvent,
33 upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
34};
35use parking_lot::Mutex;
36use futures::{prelude::*, future, ready};
37
38impl UpgradeInfo for MplexConfig {
39 type Info = &'static [u8];
40 type InfoIter = iter::Once<Self::Info>;
41
42 fn protocol_info(&self) -> Self::InfoIter {
43 iter::once(b"/mplex/6.7.0")
44 }
45}
46
47impl<C> InboundUpgrade<C> for MplexConfig
48where
49 C: AsyncRead + AsyncWrite + Unpin,
50{
51 type Output = Multiplex<C>;
52 type Error = io::Error;
53 type Future = future::Ready<Result<Self::Output, io::Error>>;
54
55 fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
56 future::ready(Ok(Multiplex {
57 io: Mutex::new(io::Multiplexed::new(socket, self)),
58 }))
59 }
60}
61
62impl<C> OutboundUpgrade<C> for MplexConfig
63where
64 C: AsyncRead + AsyncWrite + Unpin,
65{
66 type Output = Multiplex<C>;
67 type Error = io::Error;
68 type Future = future::Ready<Result<Self::Output, io::Error>>;
69
70 fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
71 future::ready(Ok(Multiplex {
72 io: Mutex::new(io::Multiplexed::new(socket, self))
73 }))
74 }
75}
76
77pub struct Multiplex<C> {
82 io: Mutex<io::Multiplexed<C>>
83}
84
85impl<C> StreamMuxer for Multiplex<C>
86where
87 C: AsyncRead + AsyncWrite + Unpin
88{
89 type Substream = Substream;
90 type OutboundSubstream = OutboundSubstream;
91 type Error = io::Error;
92
93 fn poll_event(&self, cx: &mut Context<'_>)
94 -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>>
95 {
96 let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
97 let stream = Substream::new(stream_id);
98 Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
99 }
100
101 fn open_outbound(&self) -> Self::OutboundSubstream {
102 OutboundSubstream {}
103 }
104
105 fn poll_outbound(&self, cx: &mut Context<'_>, _: &mut Self::OutboundSubstream)
106 -> Poll<Result<Self::Substream, io::Error>>
107 {
108 let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
109 return Poll::Ready(Ok(Substream::new(stream_id)))
110 }
111
112 fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
113 }
115
116 fn read_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &mut [u8])
117 -> Poll<Result<usize, io::Error>>
118 {
119 loop {
120 if !substream.current_data.is_empty() {
122 let len = cmp::min(substream.current_data.len(), buf.len());
123 buf[..len].copy_from_slice(&substream.current_data.split_to(len));
124 return Poll::Ready(Ok(len));
125 }
126
127 match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
129 Some(data) => { substream.current_data = data; }
130 None => { return Poll::Ready(Ok(0)) }
131 }
132 }
133 }
134
135 fn write_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream, buf: &[u8])
136 -> Poll<Result<usize, io::Error>>
137 {
138 self.io.lock().poll_write_stream(cx, substream.id, buf)
139 }
140
141 fn flush_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
142 -> Poll<Result<(), io::Error>>
143 {
144 self.io.lock().poll_flush_stream(cx, substream.id)
145 }
146
147 fn shutdown_substream(&self, cx: &mut Context<'_>, substream: &mut Self::Substream)
148 -> Poll<Result<(), io::Error>>
149 {
150 self.io.lock().poll_close_stream(cx, substream.id)
151 }
152
153 fn destroy_substream(&self, sub: Self::Substream) {
154 self.io.lock().drop_stream(sub.id);
155 }
156
157 fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
158 self.io.lock().poll_close(cx)
159 }
160
161 fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
162 self.io.lock().poll_flush(cx)
163 }
164}
165
/// Token representing an outbound substream that is being opened.
///
/// It carries no data; opening is driven entirely by the muxer.
pub struct OutboundSubstream {}
168
169pub struct Substream {
171 id: LocalStreamId,
173 current_data: Bytes,
175}
176
177impl Substream {
178 fn new(id: LocalStreamId) -> Self {
179 Self { id, current_data: Bytes::new() }
180 }
181}