1mod provider;
30
31#[cfg(feature = "async-io")]
32pub use provider::async_io;
33
34#[cfg(feature = "async-io")]
36pub type TcpConfig = GenTcpConfig<async_io::Tcp>;
37
38#[cfg(feature = "tokio")]
39pub use provider::tokio;
40
41#[cfg(feature = "tokio")]
43pub type TokioTcpConfig = GenTcpConfig<tokio::Tcp>;
44
45use futures::{
46 future::{self, BoxFuture, Ready},
47 prelude::*,
48 ready,
49};
50use futures_timer::Delay;
51use mwc_libp2p_core::{
52 address_translation,
53 multiaddr::{Multiaddr, Protocol},
54 transport::{ListenerEvent, Transport, TransportError},
55};
56use socket2::{Domain, Socket, Type};
57use std::{
58 collections::HashSet,
59 io,
60 net::{SocketAddr, IpAddr, TcpListener},
61 pin::Pin,
62 sync::{Arc, RwLock},
63 task::{Context, Poll},
64 time::Duration,
65};
66
67use provider::{Provider, IfEvent};
68
69#[derive(Clone, Debug)]
76pub struct GenTcpConfig<T> {
77 _impl: std::marker::PhantomData<T>,
79 ttl: Option<u32>,
81 nodelay: Option<bool>,
83 backlog: u32,
85 port_reuse: PortReuse,
87}
88
89type Port = u16;
90
91#[derive(Debug, Clone)]
93enum PortReuse {
94 Disabled,
97 Enabled {
102 listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>
105 },
106}
107
108impl PortReuse {
109 fn register(&mut self, ip: IpAddr, port: Port) {
113 if let PortReuse::Enabled { listen_addrs } = self {
114 log::trace!("Registering for port reuse: {}:{}", ip, port);
115 listen_addrs
116 .write()
117 .expect("`register()` and `unregister()` never panic while holding the lock")
118 .insert((ip, port));
119 }
120 }
121
122 fn unregister(&mut self, ip: IpAddr, port: Port) {
126 if let PortReuse::Enabled { listen_addrs } = self {
127 log::trace!("Unregistering for port reuse: {}:{}", ip, port);
128 listen_addrs
129 .write()
130 .expect("`register()` and `unregister()` never panic while holding the lock")
131 .remove(&(ip, port));
132 }
133 }
134
135 fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
145 if let PortReuse::Enabled { listen_addrs } = self {
146 for (ip, port) in listen_addrs
147 .read()
148 .expect("`register()` and `unregister()` never panic while holding the lock")
149 .iter()
150 {
151 if ip.is_ipv4() == remote_ip.is_ipv4()
152 && ip.is_loopback() == remote_ip.is_loopback()
153 {
154 return Some(SocketAddr::new(*ip, *port))
155 }
156 }
157 }
158
159 None
160 }
161}
162
163impl<T> GenTcpConfig<T>
164where
165 T: Provider + Send,
166{
167 pub fn new() -> Self {
178 Self {
179 ttl: None,
180 nodelay: None,
181 backlog: 1024,
182 port_reuse: PortReuse::Disabled,
183 _impl: std::marker::PhantomData,
184 }
185 }
186
187 pub fn ttl(mut self, value: u32) -> Self {
189 self.ttl = Some(value);
190 self
191 }
192
193 pub fn nodelay(mut self, value: bool) -> Self {
195 self.nodelay = Some(value);
196 self
197 }
198
199 pub fn listen_backlog(mut self, backlog: u32) -> Self {
201 self.backlog = backlog;
202 self
203 }
204
205 pub fn port_reuse(mut self, port_reuse: bool) -> Self {
303 self.port_reuse = if port_reuse {
304 PortReuse::Enabled {
305 listen_addrs: Arc::new(RwLock::new(HashSet::new()))
306 }
307 } else {
308 PortReuse::Disabled
309 };
310
311 self
312 }
313
314 fn create_socket(&self, socket_addr: &SocketAddr) -> io::Result<Socket> {
315 let domain = if socket_addr.is_ipv4() {
316 Domain::ipv4()
317 } else {
318 Domain::ipv6()
319 };
320 let socket = Socket::new(domain, Type::stream(), Some(socket2::Protocol::tcp()))?;
321 if socket_addr.is_ipv6() {
322 socket.set_only_v6(true)?;
323 }
324 if let Some(ttl) = self.ttl {
325 socket.set_ttl(ttl)?;
326 }
327 if let Some(nodelay) = self.nodelay {
328 socket.set_nodelay(nodelay)?;
329 }
330 socket.set_reuse_address(true)?;
331 #[cfg(unix)]
332 if let PortReuse::Enabled { .. } = &self.port_reuse {
333 socket.set_reuse_port(true)?;
334 }
335 Ok(socket)
336 }
337
338 fn do_listen(self, socket_addr: SocketAddr) -> io::Result<TcpListenStream<T>> {
339 let socket = self.create_socket(&socket_addr)?;
340 socket.bind(&socket_addr.into())?;
341 socket.listen(self.backlog as _)?;
342 socket.set_nonblocking(true)?;
343 TcpListenStream::<T>::new(socket.into_tcp_listener(), self.port_reuse)
344 }
345
346 async fn do_dial(self, socket_addr: SocketAddr) -> Result<T::Stream, io::Error> {
347 let socket = self.create_socket(&socket_addr)?;
348
349 if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) {
350 log::trace!("Binding dial socket to listen socket {}", addr);
351 socket.bind(&addr.into())?;
352 }
353
354 socket.set_nonblocking(true)?;
355
356 match socket.connect(&socket_addr.into()) {
357 Ok(()) => {}
358 Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
359 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
360 Err(err) => return Err(err),
361 };
362
363 let stream = T::new_stream(socket.into_tcp_stream()).await?;
364 Ok(stream)
365 }
366}
367
368impl<T> Transport for GenTcpConfig<T>
369where
370 T: Provider + Send + 'static,
371 T::Listener: Unpin,
372 T::IfWatcher: Unpin,
373 T::Stream: Unpin,
374{
375 type Output = T::Stream;
376 type Error = io::Error;
377 type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
378 type Listener = TcpListenStream<T>;
379 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
380
381 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
382 let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) {
383 sa
384 } else {
385 return Err(TransportError::MultiaddrNotSupported(addr));
386 };
387 log::debug!("listening on {}", socket_addr);
388 self.do_listen(socket_addr)
389 .map_err(TransportError::Other)
390 }
391
392 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
393 let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
394 if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
395 return Err(TransportError::MultiaddrNotSupported(addr));
396 }
397 socket_addr
398 } else {
399 return Err(TransportError::MultiaddrNotSupported(addr));
400 };
401 log::debug!("dialing {}", socket_addr);
402 Ok(Box::pin(self.do_dial(socket_addr)))
403 }
404
405 fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
423 match &self.port_reuse {
424 PortReuse::Disabled => address_translation(listen, observed),
425 PortReuse::Enabled { .. } => Some(observed.clone()),
426 }
427 }
428}
429
430type TcpListenerEvent<S> = ListenerEvent<Ready<Result<S, io::Error>>, io::Error>;
431
432enum IfWatch<TIfWatcher> {
433 Pending(BoxFuture<'static, io::Result<TIfWatcher>>),
434 Ready(TIfWatcher),
435}
436
437enum InAddr<TIfWatcher> {
439 One {
441 addr: IpAddr,
442 out: Option<Multiaddr>
443 },
444 Any {
446 addrs: HashSet<IpAddr>,
447 if_watch: IfWatch<TIfWatcher>,
448 }
449}
450
451pub struct TcpListenStream<T>
453where
454 T: Provider
455{
456 listen_addr: SocketAddr,
460 listener: T::Listener,
462 in_addr: InAddr<T::IfWatcher>,
468 port_reuse: PortReuse,
475 sleep_on_error: Duration,
478 pause: Option<Delay>,
480}
481
482impl<T> TcpListenStream<T>
483where
484 T: Provider
485{
486 fn new(listener: TcpListener, port_reuse: PortReuse) -> io::Result<Self> {
489 let listen_addr = listener.local_addr()?;
490
491 let in_addr = if match &listen_addr {
492 SocketAddr::V4(a) => a.ip().is_unspecified(),
493 SocketAddr::V6(a) => a.ip().is_unspecified(),
494 } {
495 InAddr::Any {
498 addrs: HashSet::new(),
499 if_watch: IfWatch::Pending(T::if_watcher()),
500 }
501 } else {
502 InAddr::One {
503 out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())),
504 addr: listen_addr.ip(),
505 }
506 };
507
508 let listener = T::new_listener(listener)?;
509
510 Ok(TcpListenStream {
511 port_reuse,
512 listener,
513 listen_addr,
514 in_addr,
515 pause: None,
516 sleep_on_error: Duration::from_millis(100),
517 })
518 }
519
520 fn disable_port_reuse(&mut self) {
527 match &self.in_addr {
528 InAddr::One { addr, .. } => {
529 self.port_reuse.unregister(*addr, self.listen_addr.port());
530 },
531 InAddr::Any { addrs, .. } => {
532 for addr in addrs {
533 self.port_reuse.unregister(*addr, self.listen_addr.port());
534 }
535 }
536 }
537 }
538}
539
540impl<T> Drop for TcpListenStream<T>
541where
542 T: Provider
543{
544 fn drop(&mut self) {
545 self.disable_port_reuse();
546 }
547}
548
549impl<T> Stream for TcpListenStream<T>
550where
551 T: Provider,
552 T::Listener: Unpin,
553 T::Stream: Unpin,
554 T::IfWatcher: Unpin,
555{
556 type Item = Result<TcpListenerEvent<T::Stream>, io::Error>;
557
558 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
559 let me = Pin::into_inner(self);
560
561 loop {
562 match &mut me.in_addr {
563 InAddr::Any { if_watch, addrs } => match if_watch {
564 IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) {
566 Ok(w) => {
567 *if_watch = IfWatch::Ready(w);
568 continue
569 }
570 Err(err) => {
571 log::debug! {
572 "Failed to begin observing interfaces: {:?}. Scheduling retry.",
573 err
574 };
575 *if_watch = IfWatch::Pending(T::if_watcher());
576 me.pause = Some(Delay::new(me.sleep_on_error));
577 return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
578 }
579 },
580 IfWatch::Ready(watch) => while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) {
582 match ev {
583 Ok(IfEvent::Up(inet)) => {
584 let ip = inet.addr();
585 if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) {
586 let ma = ip_to_multiaddr(ip, me.listen_addr.port());
587 log::debug!("New listen address: {}", ma);
588 me.port_reuse.register(ip, me.listen_addr.port());
589 return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma))));
590 }
591 }
592 Ok(IfEvent::Down(inet)) => {
593 let ip = inet.addr();
594 if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) {
595 let ma = ip_to_multiaddr(ip, me.listen_addr.port());
596 log::debug!("Expired listen address: {}", ma);
597 me.port_reuse.unregister(ip, me.listen_addr.port());
598 return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma))));
599 }
600 }
601 Err(err) => {
602 log::debug! {
603 "Failure polling interfaces: {:?}. Scheduling retry.",
604 err
605 };
606 me.pause = Some(Delay::new(me.sleep_on_error));
607 return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
608 }
609 }
610 },
611 },
612 InAddr::One { addr, out } => if let Some(multiaddr) = out.take() {
615 me.port_reuse.register(*addr, me.listen_addr.port());
616 return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(multiaddr))))
617 }
618 }
619
620 if let Some(mut pause) = me.pause.take() {
621 match Pin::new(&mut pause).poll(cx) {
622 Poll::Ready(_) => {}
623 Poll::Pending => {
624 me.pause = Some(pause);
625 return Poll::Pending;
626 }
627 }
628 }
629
630 let incoming = match T::poll_accept(&mut me.listener, cx) {
632 Poll::Pending => return Poll::Pending,
633 Poll::Ready(Ok(incoming)) => incoming,
634 Poll::Ready(Err(e)) => {
635 log::error!("error accepting incoming connection: {}", e);
637 me.pause = Some(Delay::new(me.sleep_on_error));
638 return Poll::Ready(Some(Ok(ListenerEvent::Error(e))));
639 }
640 };
641
642 let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
643 let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());
644
645 log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);
646
647 return Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
648 upgrade: future::ok(incoming.stream),
649 local_addr,
650 remote_addr,
651 })));
652 }
653 }
654}
655
656fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
658 let mut iter = addr.iter();
659 let proto1 = iter.next().ok_or(())?;
660 let proto2 = iter.next().ok_or(())?;
661
662 if iter.next().is_some() {
663 return Err(());
664 }
665
666 match (proto1, proto2) {
667 (Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
668 (Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
669 _ => Err(()),
670 }
671}
672
673fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
675 Multiaddr::empty()
676 .with(ip.into())
677 .with(Protocol::Tcp(port))
678}
679
680#[cfg(test)]
681mod tests {
682 use futures::channel::mpsc;
683 use super::*;
684
685 #[test]
686 fn multiaddr_to_tcp_conversion() {
687 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
688
689 assert!(
690 multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
691 .is_err()
692 );
693
694 assert_eq!(
695 multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
696 Ok(SocketAddr::new(
697 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
698 12345,
699 ))
700 );
701 assert_eq!(
702 multiaddr_to_socketaddr(
703 &"/ip4/255.255.255.255/tcp/8080"
704 .parse::<Multiaddr>()
705 .unwrap()
706 ),
707 Ok(SocketAddr::new(
708 IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
709 8080,
710 ))
711 );
712 assert_eq!(
713 multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
714 Ok(SocketAddr::new(
715 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
716 12345,
717 ))
718 );
719 assert_eq!(
720 multiaddr_to_socketaddr(
721 &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
722 .parse::<Multiaddr>()
723 .unwrap()
724 ),
725 Ok(SocketAddr::new(
726 IpAddr::V6(Ipv6Addr::new(
727 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
728 )),
729 8080,
730 ))
731 );
732 }
733
734 #[test]
735 fn communicating_between_dialer_and_listener() {
736 env_logger::try_init().ok();
737
738 async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
739 let tcp = GenTcpConfig::<T>::new();
740 let mut listener = tcp.listen_on(addr).unwrap();
741 loop {
742 match listener.next().await.unwrap().unwrap() {
743 ListenerEvent::NewAddress(listen_addr) => {
744 ready_tx.send(listen_addr).await.unwrap();
745 }
746 ListenerEvent::Upgrade { upgrade, .. } => {
747 let mut upgrade = upgrade.await.unwrap();
748 let mut buf = [0u8; 3];
749 upgrade.read_exact(&mut buf).await.unwrap();
750 assert_eq!(buf, [1, 2, 3]);
751 upgrade.write_all(&[4, 5, 6]).await.unwrap();
752 return
753 }
754 e => panic!("Unexpected listener event: {:?}", e),
755 }
756 }
757 }
758
759 async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
760 let addr = ready_rx.next().await.unwrap();
761 let tcp = GenTcpConfig::<T>::new();
762
763 let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
765 socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
766
767 let mut buf = [0u8; 3];
768 socket.read_exact(&mut buf).await.unwrap();
769 assert_eq!(buf, [4, 5, 6]);
770 }
771
772 fn test(addr: Multiaddr) {
773 #[cfg(feature = "async-io")]
774 {
775 let (ready_tx, ready_rx) = mpsc::channel(1);
776 let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
777 let dialer = dialer::<async_io::Tcp>(ready_rx);
778 let listener = async_std::task::spawn(listener);
779 async_std::task::block_on(dialer);
780 async_std::task::block_on(listener);
781 }
782
783 #[cfg(feature = "tokio")]
784 {
785 let (ready_tx, ready_rx) = mpsc::channel(1);
786 let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
787 let dialer = dialer::<tokio::Tcp>(ready_rx);
788 let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
789 let tasks = tokio_crate::task::LocalSet::new();
790 let listener = tasks.spawn_local(listener);
791 tasks.block_on(&rt, dialer);
792 tasks.block_on(&rt, listener).unwrap();
793 }
794 }
795
796 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
797 test("/ip6/::1/tcp/0".parse().unwrap());
798 }
799
800 #[test]
801 fn wildcard_expansion() {
802 env_logger::try_init().ok();
803
804 async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
805 let tcp = GenTcpConfig::<T>::new();
806 let mut listener = tcp.listen_on(addr).unwrap();
807
808 loop {
809 match listener.next().await.unwrap().unwrap() {
810 ListenerEvent::NewAddress(a) => {
811 let mut iter = a.iter();
812 match iter.next().expect("ip address") {
813 Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
814 Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
815 other => panic!("Unexpected protocol: {}", other),
816 }
817 if let Protocol::Tcp(port) = iter.next().expect("port") {
818 assert_ne!(0, port)
819 } else {
820 panic!("No TCP port in address: {}", a)
821 }
822 ready_tx.send(a).await.ok();
823 return
824 }
825 _ => {}
826 }
827 }
828 }
829
830 async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
831 let dest_addr = ready_rx.next().await.unwrap();
832 let tcp = GenTcpConfig::<T>::new();
833 tcp.dial(dest_addr).unwrap().await.unwrap();
834 }
835
836 fn test(addr: Multiaddr) {
837 #[cfg(feature = "async-io")]
838 {
839 let (ready_tx, ready_rx) = mpsc::channel(1);
840 let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
841 let dialer = dialer::<async_io::Tcp>(ready_rx);
842 let listener = async_std::task::spawn(listener);
843 async_std::task::block_on(dialer);
844 async_std::task::block_on(listener);
845 }
846
847 #[cfg(feature = "tokio")]
848 {
849 let (ready_tx, ready_rx) = mpsc::channel(1);
850 let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
851 let dialer = dialer::<tokio::Tcp>(ready_rx);
852 let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
853 let tasks = tokio_crate::task::LocalSet::new();
854 let listener = tasks.spawn_local(listener);
855 tasks.block_on(&rt, dialer);
856 tasks.block_on(&rt, listener).unwrap();
857 }
858 }
859
860 test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
861 test("/ip6/::1/tcp/0".parse().unwrap());
862 }
863
864 #[test]
865 fn port_reuse_dialing() {
866 env_logger::try_init().ok();
867
868 async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
869 let tcp = GenTcpConfig::<T>::new();
870 let mut listener = tcp.listen_on(addr).unwrap();
871 loop {
872 match listener.next().await.unwrap().unwrap() {
873 ListenerEvent::NewAddress(listen_addr) => {
874 ready_tx.send(listen_addr).await.ok();
875 }
876 ListenerEvent::Upgrade { upgrade, .. } => {
877 let mut upgrade = upgrade.await.unwrap();
878 let mut buf = [0u8; 3];
879 upgrade.read_exact(&mut buf).await.unwrap();
880 assert_eq!(buf, [1, 2, 3]);
881 upgrade.write_all(&[4, 5, 6]).await.unwrap();
882 return
883 }
884 e => panic!("Unexpected event: {:?}", e),
885 }
886 }
887 }
888
889 async fn dialer<T: Provider>(addr: Multiaddr, mut ready_rx: mpsc::Receiver<Multiaddr>) {
890 let dest_addr = ready_rx.next().await.unwrap();
891 let tcp = GenTcpConfig::<T>::new().port_reuse(true);
892 let mut listener = tcp.clone().listen_on(addr).unwrap();
893 match listener.next().await.unwrap().unwrap() {
894 ListenerEvent::NewAddress(_) => {
895 let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap();
897 socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
898 let mut buf = [0u8; 3];
900 socket.read_exact(&mut buf).await.unwrap();
901 assert_eq!(buf, [4, 5, 6]);
902 }
903 e => panic!("Unexpected listener event: {:?}", e)
904 }
905 }
906
907 fn test(addr: Multiaddr) {
908 #[cfg(feature = "async-io")]
909 {
910 let (ready_tx, ready_rx) = mpsc::channel(1);
911 let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
912 let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx);
913 let listener = async_std::task::spawn(listener);
914 async_std::task::block_on(dialer);
915 async_std::task::block_on(listener);
916 }
917
918 #[cfg(feature = "tokio")]
919 {
920 let (ready_tx, ready_rx) = mpsc::channel(1);
921 let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
922 let dialer = dialer::<tokio::Tcp>(addr.clone(), ready_rx);
923 let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
924 let tasks = tokio_crate::task::LocalSet::new();
925 let listener = tasks.spawn_local(listener);
926 tasks.block_on(&rt, dialer);
927 tasks.block_on(&rt, listener).unwrap();
928 }
929 }
930
931 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
932 test("/ip6/::1/tcp/0".parse().unwrap());
933 }
934
935 #[test]
936 fn port_reuse_listening() {
937 env_logger::try_init().ok();
938
939 async fn listen_twice<T: Provider>(addr: Multiaddr) {
940 let tcp = GenTcpConfig::<T>::new().port_reuse(true);
941 let mut listener1 = tcp.clone().listen_on(addr).unwrap();
942 match listener1.next().await.unwrap().unwrap() {
943 ListenerEvent::NewAddress(addr1) => {
944 let mut listener2 = tcp.clone().listen_on(addr1.clone()).unwrap();
946 match listener2.next().await.unwrap().unwrap() {
947 ListenerEvent::NewAddress(addr2) => {
948 assert_eq!(addr1, addr2);
949 return
950 }
951 e => panic!("Unexpected listener event: {:?}", e),
952 }
953 }
954 e => panic!("Unexpected listener event: {:?}", e),
955 }
956 }
957
958 fn test(addr: Multiaddr) {
959 #[cfg(feature = "async-io")]
960 {
961 let listener = listen_twice::<async_io::Tcp>(addr.clone());
962 async_std::task::block_on(listener);
963 }
964
965 #[cfg(feature = "tokio")]
966 {
967 let listener = listen_twice::<tokio::Tcp>(addr.clone());
968 let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
969 rt.block_on(listener);
970 }
971 }
972
973 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
974 }
975
976 #[test]
977 fn listen_port_0() {
978 env_logger::try_init().ok();
979
980 async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
981 GenTcpConfig::<T>::new()
982 .listen_on(addr)
983 .unwrap()
984 .next()
985 .await
986 .expect("some event")
987 .expect("no error")
988 .into_new_address()
989 .expect("listen address")
990 }
991
992 fn test(addr: Multiaddr) {
993 #[cfg(feature = "async-io")]
994 {
995 let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
996 assert!(!new_addr.to_string().contains("tcp/0"));
997 }
998
999 #[cfg(feature = "tokio")]
1000 {
1001 let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
1002 let new_addr = rt.block_on(listen::<tokio::Tcp>(addr.clone()));
1003 assert!(!new_addr.to_string().contains("tcp/0"));
1004 }
1005 }
1006
1007 test("/ip6/::1/tcp/0".parse().unwrap());
1008 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1009 }
1010
1011 #[test]
1012 fn listen_invalid_addr() {
1013 env_logger::try_init().ok();
1014
1015 fn test(addr: Multiaddr) {
1016 #[cfg(feature = "async-io")]
1017 {
1018 let tcp = TcpConfig::new();
1019 assert!(tcp.listen_on(addr.clone()).is_err());
1020 }
1021
1022 #[cfg(feature = "tokio")]
1023 {
1024 let tcp = TokioTcpConfig::new();
1025 assert!(tcp.listen_on(addr.clone()).is_err());
1026 }
1027 }
1028
1029 test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
1030 }
1031}