1use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
24use futures::{prelude::*, task::Context, task::Poll};
25use log::debug;
26use smallvec::SmallVec;
27use std::{collections::VecDeque, fmt, pin::Pin};
28
29pub struct ListenersStream<TTrans>
79where
80 TTrans: Transport,
81{
82 transport: TTrans,
84 listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
88 next_id: ListenerId
90}
91
92#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
97pub struct ListenerId(u64);
98
99#[pin_project::pin_project]
101#[derive(Debug)]
102struct Listener<TTrans>
103where
104 TTrans: Transport,
105{
106 id: ListenerId,
108 #[pin]
110 listener: TTrans::Listener,
111 addresses: SmallVec<[Multiaddr; 4]>
113}
114
115pub enum ListenersEvent<TTrans>
117where
118 TTrans: Transport,
119{
120 NewAddress {
122 listener_id: ListenerId,
124 listen_addr: Multiaddr
126 },
127 AddressExpired {
129 listener_id: ListenerId,
131 listen_addr: Multiaddr
133 },
134 Incoming {
136 listener_id: ListenerId,
138 upgrade: TTrans::ListenerUpgrade,
140 local_addr: Multiaddr,
142 send_back_addr: Multiaddr,
144 },
145 Closed {
147 listener_id: ListenerId,
149 addresses: Vec<Multiaddr>,
151 reason: Result<(), TTrans::Error>,
154 },
155 Error {
160 listener_id: ListenerId,
162 error: TTrans::Error,
164 }
165}
166
167impl<TTrans> ListenersStream<TTrans>
168where
169 TTrans: Transport,
170{
171 pub fn new(transport: TTrans) -> Self {
173 ListenersStream {
174 transport,
175 listeners: VecDeque::new(),
176 next_id: ListenerId(1)
177 }
178 }
179
180 pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
183 ListenersStream {
184 transport,
185 listeners: VecDeque::with_capacity(capacity),
186 next_id: ListenerId(1)
187 }
188 }
189
190 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>>
194 where
195 TTrans: Clone,
196 {
197 let listener = self.transport.clone().listen_on(addr)?;
198 self.listeners.push_back(Box::pin(Listener {
199 id: self.next_id,
200 listener,
201 addresses: SmallVec::new()
202 }));
203 let id = self.next_id;
204 self.next_id = ListenerId(self.next_id.0 + 1);
205 Ok(id)
206 }
207
208 pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
212 if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
213 self.listeners.remove(i);
214 Ok(())
215 } else {
216 Err(())
217 }
218 }
219
220 pub fn transport(&self) -> &TTrans {
222 &self.transport
223 }
224
225 pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
227 self.listeners.iter().flat_map(|l| l.addresses.iter())
228 }
229
230 pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
232 let mut remaining = self.listeners.len();
234 while let Some(mut listener) = self.listeners.pop_back() {
235 let mut listener_project = listener.as_mut().project();
236 match TryStream::try_poll_next(listener_project.listener.as_mut(), cx) {
237 Poll::Pending => {
238 self.listeners.push_front(listener);
239 remaining -= 1;
240 if remaining == 0 { break }
241 }
242 Poll::Ready(Some(Ok(ListenerEvent::Upgrade { upgrade, local_addr, remote_addr }))) => {
243 let id = *listener_project.id;
244 self.listeners.push_front(listener);
245 return Poll::Ready(ListenersEvent::Incoming {
246 listener_id: id,
247 upgrade,
248 local_addr,
249 send_back_addr: remote_addr
250 })
251 }
252 Poll::Ready(Some(Ok(ListenerEvent::NewAddress(a)))) => {
253 if listener_project.addresses.contains(&a) {
254 debug!("Transport has reported address {} multiple times", a)
255 }
256 if !listener_project.addresses.contains(&a) {
257 listener_project.addresses.push(a.clone());
258 }
259 let id = *listener_project.id;
260 self.listeners.push_front(listener);
261 return Poll::Ready(ListenersEvent::NewAddress {
262 listener_id: id,
263 listen_addr: a
264 })
265 }
266 Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(a)))) => {
267 listener_project.addresses.retain(|x| x != &a);
268 let id = *listener_project.id;
269 self.listeners.push_front(listener);
270 return Poll::Ready(ListenersEvent::AddressExpired {
271 listener_id: id,
272 listen_addr: a
273 })
274 }
275 Poll::Ready(Some(Ok(ListenerEvent::Error(error)))) => {
276 let id = *listener_project.id;
277 self.listeners.push_front(listener);
278 return Poll::Ready(ListenersEvent::Error {
279 listener_id: id,
280 error,
281 })
282 }
283 Poll::Ready(None) => {
284 return Poll::Ready(ListenersEvent::Closed {
285 listener_id: *listener_project.id,
286 addresses: listener_project.addresses.drain(..).collect(),
287 reason: Ok(()),
288 })
289 }
290 Poll::Ready(Some(Err(err))) => {
291 return Poll::Ready(ListenersEvent::Closed {
292 listener_id: *listener_project.id,
293 addresses: listener_project.addresses.drain(..).collect(),
294 reason: Err(err),
295 })
296 }
297 }
298 }
299
300 Poll::Pending
302 }
303}
304
305impl<TTrans> Stream for ListenersStream<TTrans>
306where
307 TTrans: Transport,
308{
309 type Item = ListenersEvent<TTrans>;
310
311 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312 ListenersStream::poll(self, cx).map(Option::Some)
313 }
314}
315
316impl<TTrans> Unpin for ListenersStream<TTrans>
317where
318 TTrans: Transport,
319{
320}
321
322impl<TTrans> fmt::Debug for ListenersStream<TTrans>
323where
324 TTrans: Transport + fmt::Debug,
325{
326 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
327 f.debug_struct("ListenersStream")
328 .field("transport", &self.transport)
329 .field("listen_addrs", &self.listen_addrs().collect::<Vec<_>>())
330 .finish()
331 }
332}
333
334impl<TTrans> fmt::Debug for ListenersEvent<TTrans>
335where
336 TTrans: Transport,
337 TTrans::Error: fmt::Debug,
338{
339 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
340 match self {
341 ListenersEvent::NewAddress { listener_id, listen_addr } => f
342 .debug_struct("ListenersEvent::NewAddress")
343 .field("listener_id", listener_id)
344 .field("listen_addr", listen_addr)
345 .finish(),
346 ListenersEvent::AddressExpired { listener_id, listen_addr } => f
347 .debug_struct("ListenersEvent::AddressExpired")
348 .field("listener_id", listener_id)
349 .field("listen_addr", listen_addr)
350 .finish(),
351 ListenersEvent::Incoming { listener_id, local_addr, .. } => f
352 .debug_struct("ListenersEvent::Incoming")
353 .field("listener_id", listener_id)
354 .field("local_addr", local_addr)
355 .finish(),
356 ListenersEvent::Closed { listener_id, addresses, reason } => f
357 .debug_struct("ListenersEvent::Closed")
358 .field("listener_id", listener_id)
359 .field("addresses", addresses)
360 .field("reason", reason)
361 .finish(),
362 ListenersEvent::Error { listener_id, error } => f
363 .debug_struct("ListenersEvent::Error")
364 .field("listener_id", listener_id)
365 .field("error", error)
366 .finish()
367 }
368 }
369}
370
371#[cfg(test)]
372mod tests {
373 use super::*;
374 use crate::transport;
375
376 #[test]
377 fn incoming_event() {
378 async_std::task::block_on(async move {
379 let mem_transport = transport::MemoryTransport::default();
380
381 let mut listeners = ListenersStream::new(mem_transport);
382 listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
383
384 let address = {
385 let event = listeners.next().await.unwrap();
386 if let ListenersEvent::NewAddress { listen_addr, .. } = event {
387 listen_addr
388 } else {
389 panic!("Was expecting the listen address to be reported")
390 }
391 };
392
393 let address2 = address.clone();
394 async_std::task::spawn(async move {
395 mem_transport.dial(address2).unwrap().await.unwrap();
396 });
397
398 match listeners.next().await.unwrap() {
399 ListenersEvent::Incoming { local_addr, send_back_addr, .. } => {
400 assert_eq!(local_addr, address);
401 assert!(send_back_addr != address);
402 },
403 _ => panic!()
404 }
405 });
406 }
407
408 #[test]
409 fn listener_event_error_isnt_fatal() {
410 #[derive(Clone)]
414 struct DummyTrans;
415 impl transport::Transport for DummyTrans {
416 type Output = ();
417 type Error = std::io::Error;
418 type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
419 type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
420 type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
421
422 fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
423 Ok(Box::pin(stream::unfold((), |()| async move {
424 Some((Ok(ListenerEvent::Error(std::io::Error::from(std::io::ErrorKind::Other))), ()))
425 })))
426 }
427
428 fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
429 panic!()
430 }
431
432 fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
433 }
434
435 async_std::task::block_on(async move {
436 let transport = DummyTrans;
437 let mut listeners = ListenersStream::new(transport);
438 listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
439
440 for _ in 0..10 {
441 match listeners.next().await.unwrap() {
442 ListenersEvent::Error { .. } => {},
443 _ => panic!()
444 }
445 }
446 });
447 }
448
449 #[test]
450 fn listener_error_is_fatal() {
451 #[derive(Clone)]
454 struct DummyTrans;
455 impl transport::Transport for DummyTrans {
456 type Output = ();
457 type Error = std::io::Error;
458 type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
459 type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
460 type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
461
462 fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
463 Ok(Box::pin(stream::unfold((), |()| async move {
464 Some((Err(std::io::Error::from(std::io::ErrorKind::Other)), ()))
465 })))
466 }
467
468 fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
469 panic!()
470 }
471
472 fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
473 }
474
475 async_std::task::block_on(async move {
476 let transport = DummyTrans;
477 let mut listeners = ListenersStream::new(transport);
478 listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
479
480 match listeners.next().await.unwrap() {
481 ListenersEvent::Closed { .. } => {},
482 _ => panic!()
483 }
484 });
485 }
486}