mwc_libp2p_core/connection/
substream.rs1use crate::muxing::{StreamMuxer, StreamMuxerEvent, SubstreamRef, substream_from_ref};
22use futures::prelude::*;
23use multiaddr::Multiaddr;
24use smallvec::SmallVec;
25use std::sync::Arc;
26use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
27
/// Which side of a substream a value describes.
///
/// The `Dialer` variant carries a caller-chosen `TDialInfo` payload, while
/// `Listener` carries no data at all.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SubstreamEndpoint<TDialInfo> {
    /// Dialing side, together with its associated dial information.
    Dialer(TDialInfo),
    /// Listening side.
    Listener,
}

impl<TDialInfo> SubstreamEndpoint<TDialInfo> {
    /// Returns `true` for the `Dialer` variant, regardless of its payload.
    pub fn is_dialer(&self) -> bool {
        matches!(self, SubstreamEndpoint::Dialer(_))
    }

    /// Returns `true` for the `Listener` variant.
    pub fn is_listener(&self) -> bool {
        matches!(self, SubstreamEndpoint::Listener)
    }
}
52
53pub struct Muxing<TMuxer, TUserData>
61where
62 TMuxer: StreamMuxer,
63{
64 inner: Arc<TMuxer>,
66 outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
68}
69
/// Future that, when polled, asks the muxer to close.
///
/// Produced by `Muxing::close`.
pub struct Close<TMuxer> {
    /// Shared handle to the muxer being closed.
    muxer: Arc<TMuxer>,
}
75
76pub type Substream<TMuxer> = SubstreamRef<Arc<TMuxer>>;
78
79pub enum SubstreamEvent<TMuxer, TUserData>
81where
82 TMuxer: StreamMuxer,
83{
84 InboundSubstream {
86 substream: Substream<TMuxer>,
89 },
90
91 OutboundSubstream {
93 user_data: TUserData,
95 substream: Substream<TMuxer>,
98 },
99
100 AddressChange(Multiaddr),
105}
106
/// Opaque, `usize`-backed identifier for an outbound substream.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// consumes this type; confirm its users before changing it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct OutboundSubstreamId(usize);
110
111impl<TMuxer, TUserData> Muxing<TMuxer, TUserData>
112where
113 TMuxer: StreamMuxer,
114{
115 pub fn new(muxer: TMuxer) -> Self {
117 Muxing {
118 inner: Arc::new(muxer),
119 outbound_substreams: SmallVec::new(),
120 }
121 }
122
123 pub fn open_substream(&mut self, user_data: TUserData) {
129 let raw = self.inner.open_outbound();
130 self.outbound_substreams.push((user_data, raw));
131 }
132
133 #[must_use]
136 pub fn close(mut self) -> (Close<TMuxer>, Vec<TUserData>) {
137 let substreams = self.cancel_outgoing();
138 let close = Close { muxer: self.inner.clone() };
139 (close, substreams)
140 }
141
142 pub fn cancel_outgoing(&mut self) -> Vec<TUserData> {
144 let mut out = Vec::with_capacity(self.outbound_substreams.len());
145 for (user_data, outbound) in self.outbound_substreams.drain(..) {
146 out.push(user_data);
147 self.inner.destroy_outbound(outbound);
148 }
149 out
150 }
151
152 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
154 match self.inner.poll_event(cx) {
156 Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => {
157 let substream = substream_from_ref(self.inner.clone(), substream);
158 return Poll::Ready(Ok(SubstreamEvent::InboundSubstream {
159 substream,
160 }));
161 }
162 Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) =>
163 return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))),
164 Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
165 Poll::Pending => {}
166 }
167
168 for n in (0..self.outbound_substreams.len()).rev() {
171 let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n);
172 match self.inner.poll_outbound(cx, &mut outbound) {
173 Poll::Ready(Ok(substream)) => {
174 let substream = substream_from_ref(self.inner.clone(), substream);
175 self.inner.destroy_outbound(outbound);
176 return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
177 user_data,
178 substream,
179 }));
180 }
181 Poll::Pending => {
182 self.outbound_substreams.push((user_data, outbound));
183 }
184 Poll::Ready(Err(err)) => {
185 self.inner.destroy_outbound(outbound);
186 return Poll::Ready(Err(err.into()));
187 }
188 }
189 }
190
191 Poll::Pending
193 }
194}
195
196impl<TMuxer, TUserData> fmt::Debug for Muxing<TMuxer, TUserData>
197where
198 TMuxer: StreamMuxer,
199{
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
201 f.debug_struct("Muxing")
202 .field("outbound_substreams", &self.outbound_substreams.len())
203 .finish()
204 }
205}
206
207impl<TMuxer, TUserData> Drop for Muxing<TMuxer, TUserData>
208where
209 TMuxer: StreamMuxer,
210{
211 fn drop(&mut self) {
212 for (_, outbound) in self.outbound_substreams.drain(..) {
216 self.inner.destroy_outbound(outbound);
217 }
218 }
219}
220
221impl<TMuxer> Future for Close<TMuxer>
222where
223 TMuxer: StreamMuxer,
224{
225 type Output = Result<(), IoError>;
226
227 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228 match self.muxer.close(cx) {
229 Poll::Pending => Poll::Pending,
230 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
231 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
232 }
233 }
234}
235
236impl<TMuxer> fmt::Debug for Close<TMuxer>
237where
238 TMuxer: StreamMuxer,
239{
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
241 f.debug_struct("Close")
242 .finish()
243 }
244}
245
246impl<TMuxer, TUserData> fmt::Debug for SubstreamEvent<TMuxer, TUserData>
247where
248 TMuxer: StreamMuxer,
249 TMuxer::Substream: fmt::Debug,
250 TUserData: fmt::Debug,
251{
252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 match self {
254 SubstreamEvent::InboundSubstream { substream } => {
255 f.debug_struct("SubstreamEvent::OutboundClosed")
256 .field("substream", substream)
257 .finish()
258 },
259 SubstreamEvent::OutboundSubstream { user_data, substream } => {
260 f.debug_struct("SubstreamEvent::OutboundSubstream")
261 .field("user_data", user_data)
262 .field("substream", substream)
263 .finish()
264 },
265 SubstreamEvent::AddressChange(address) => {
266 f.debug_struct("SubstreamEvent::AddressChange")
267 .field("address", address)
268 .finish()
269 },
270 }
271 }
272}