1mod provider;
30
31#[cfg(feature = "async-io")]
32pub use provider::async_io;
33
/// A [`GenTcpConfig`] specialised to the `async_io::Tcp` provider.
///
/// Only available when the `async-io` feature is enabled.
#[cfg(feature = "async-io")]
pub type TcpConfig = GenTcpConfig<async_io::Tcp>;
37
38#[cfg(feature = "tokio")]
39pub use provider::tokio;
40
/// A [`GenTcpConfig`] specialised to the `tokio::Tcp` provider.
///
/// Only available when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
pub type TokioTcpConfig = GenTcpConfig<tokio::Tcp>;
44
45use futures::{
46 future::{self, BoxFuture, Ready},
47 prelude::*,
48 ready,
49};
50use futures_timer::Delay;
51use tet_libp2p_core::{
52 address_translation,
53 multiaddr::{Multiaddr, Protocol},
54 transport::{ListenerEvent, Transport, TransportError},
55};
56use socket2::{Domain, Socket, Type};
57use std::{
58 collections::HashSet,
59 io,
60 net::{SocketAddr, IpAddr, TcpListener},
61 pin::Pin,
62 sync::{Arc, RwLock},
63 task::{Context, Poll},
64 time::Duration,
65};
66
67use provider::{Provider, IfEvent};
68
/// Configuration of a TCP transport, generic over the provider `T` that
/// supplies the concrete listener and stream types.
///
/// The options stored here are applied to every socket created by
/// `create_socket`, for listening as well as for dialing.
#[derive(Clone, Debug)]
pub struct GenTcpConfig<T> {
    /// Zero-sized marker binding the configuration to the provider type `T`.
    _impl: std::marker::PhantomData<T>,
    /// TTL applied to new sockets; `None` leaves the socket option untouched.
    ttl: Option<u32>,
    /// Value for the socket's nodelay option; `None` leaves it untouched.
    nodelay: Option<bool>,
    /// Backlog handed to `listen` when a listening socket is set up.
    backlog: u32,
    /// Whether (and with which registered listen addresses) port reuse is active.
    port_reuse: PortReuse,
}
88
/// A TCP port number.
type Port = u16;
90
/// The port-reuse setting of a [`GenTcpConfig`].
#[derive(Debug, Clone)]
enum PortReuse {
    /// Port reuse is off: nothing is registered and dial sockets are never
    /// bound to a listen address.
    Disabled,
    /// Port reuse is on.
    Enabled {
        /// The `(IP, port)` pairs currently registered by listeners.
        ///
        /// Held behind an `Arc`, so every clone of this value reads and
        /// writes the same set.
        listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>
    },
}
107
108impl PortReuse {
109 fn register(&mut self, ip: IpAddr, port: Port) {
113 if let PortReuse::Enabled { listen_addrs } = self {
114 log::trace!("Registering for port reuse: {}:{}", ip, port);
115 listen_addrs
116 .write()
117 .expect("`register()` and `unregister()` never panic while holding the lock")
118 .insert((ip, port));
119 }
120 }
121
122 fn unregister(&mut self, ip: IpAddr, port: Port) {
126 if let PortReuse::Enabled { listen_addrs } = self {
127 log::trace!("Unregistering for port reuse: {}:{}", ip, port);
128 listen_addrs
129 .write()
130 .expect("`register()` and `unregister()` never panic while holding the lock")
131 .remove(&(ip, port));
132 }
133 }
134
135 fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
145 if let PortReuse::Enabled { listen_addrs } = self {
146 for (ip, port) in listen_addrs
147 .read()
148 .expect("`register()` and `unregister()` never panic while holding the lock")
149 .iter()
150 {
151 if ip.is_ipv4() == remote_ip.is_ipv4()
152 && ip.is_loopback() == remote_ip.is_loopback()
153 {
154 return Some(SocketAddr::new(*ip, *port))
155 }
156 }
157 }
158
159 None
160 }
161}
162
163impl<T> GenTcpConfig<T>
164where
165 T: Provider + Send,
166{
167 pub fn new() -> Self {
178 Self {
179 ttl: None,
180 nodelay: None,
181 backlog: 1024,
182 port_reuse: PortReuse::Disabled,
183 _impl: std::marker::PhantomData,
184 }
185 }
186
187 pub fn ttl(mut self, value: u32) -> Self {
189 self.ttl = Some(value);
190 self
191 }
192
193 pub fn nodelay(mut self, value: bool) -> Self {
195 self.nodelay = Some(value);
196 self
197 }
198
199 pub fn listen_backlog(mut self, backlog: u32) -> Self {
201 self.backlog = backlog;
202 self
203 }
204
205 pub fn port_reuse(mut self, port_reuse: bool) -> Self {
303 self.port_reuse = if port_reuse {
304 PortReuse::Enabled {
305 listen_addrs: Arc::new(RwLock::new(HashSet::new()))
306 }
307 } else {
308 PortReuse::Disabled
309 };
310
311 self
312 }
313
314 fn create_socket(&self, socket_addr: &SocketAddr) -> io::Result<Socket> {
315 let domain = if socket_addr.is_ipv4() {
316 Domain::ipv4()
317 } else {
318 Domain::ipv6()
319 };
320 let socket = Socket::new(domain, Type::stream(), Some(socket2::Protocol::tcp()))?;
321 if socket_addr.is_ipv6() {
322 socket.set_only_v6(true)?;
323 }
324 if let Some(ttl) = self.ttl {
325 socket.set_ttl(ttl)?;
326 }
327 if let Some(nodelay) = self.nodelay {
328 socket.set_nodelay(nodelay)?;
329 }
330 socket.set_reuse_address(true)?;
331 #[cfg(unix)]
332 if let PortReuse::Enabled { .. } = &self.port_reuse {
333 socket.set_reuse_port(true)?;
334 }
335 Ok(socket)
336 }
337
338 fn do_listen(self, socket_addr: SocketAddr) -> io::Result<TcpListenStream<T>> {
339 let socket = self.create_socket(&socket_addr)?;
340 socket.bind(&socket_addr.into())?;
341 socket.listen(self.backlog as _)?;
342 socket.set_nonblocking(true)?;
343 TcpListenStream::<T>::new(socket.into_tcp_listener(), self.port_reuse)
344 }
345
346 async fn do_dial(self, socket_addr: SocketAddr) -> Result<T::Stream, io::Error> {
347 let socket = self.create_socket(&socket_addr)?;
348
349 if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) {
350 log::trace!("Binding dial socket to listen socket {}", addr);
351 socket.bind(&addr.into())?;
352 }
353
354 socket.set_nonblocking(true)?;
355
356 match socket.connect(&socket_addr.into()) {
357 Ok(()) => {}
358 Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
359 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
360 Err(err) => return Err(err),
361 };
362
363 let stream = T::new_stream(socket.into_tcp_stream()).await?;
364 Ok(stream)
365 }
366}
367
368impl<T> Transport for GenTcpConfig<T>
369where
370 T: Provider + Send + 'static,
371 T::Listener: Unpin,
372 T::IfWatcher: Unpin,
373 T::Stream: Unpin,
374{
375 type Output = T::Stream;
376 type Error = io::Error;
377 type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
378 type Listener = TcpListenStream<T>;
379 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
380
381 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
382 let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) {
383 sa
384 } else {
385 return Err(TransportError::MultiaddrNotSupported(addr));
386 };
387 log::debug!("listening on {}", socket_addr);
388 self.do_listen(socket_addr)
389 .map_err(|e| TransportError::Other(e))
390 }
391
392 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
393 let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
394 if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
395 return Err(TransportError::MultiaddrNotSupported(addr));
396 }
397 socket_addr
398 } else {
399 return Err(TransportError::MultiaddrNotSupported(addr));
400 };
401 log::debug!("dialing {}", socket_addr);
402 Ok(Box::pin(self.do_dial(socket_addr)))
403 }
404
405 fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
423 match &self.port_reuse {
424 PortReuse::Disabled => address_translation(listen, observed),
425 PortReuse::Enabled { .. } => Some(observed.clone()),
426 }
427 }
428}
429
/// The [`ListenerEvent`] type emitted by a [`TcpListenStream`]: upgrades are
/// immediately-ready futures yielding the stream `S`, and errors are
/// `io::Error`s.
type TcpListenerEvent<S> = ListenerEvent<Ready<Result<S, io::Error>>, io::Error>;
431
/// State of the interface watcher used when listening on an unspecified address.
enum IfWatch<TIfWatcher> {
    /// The watcher is still being created; the future resolves to it (or to an
    /// I/O error).
    Pending(BoxFuture<'static, io::Result<TIfWatcher>>),
    /// The watcher is available and can be polled for interface events.
    Ready(TIfWatcher),
}
436
/// The address(es) a [`TcpListenStream`] is listening on.
enum InAddr<TIfWatcher> {
    /// The listener is bound to one specific IP address.
    One {
        /// The IP address the listener is bound to.
        addr: IpAddr,
        /// The listen multiaddress that still has to be reported as a new
        /// address; `None` once it has been reported.
        out: Option<Multiaddr>
    },
    /// The listener is bound to the unspecified ("any") address.
    Any {
        /// The interface addresses reported as up so far.
        addrs: HashSet<IpAddr>,
        /// The watcher delivering interface up/down events.
        if_watch: IfWatch<TIfWatcher>,
    }
}
450
/// A stream of listener events (new/expired addresses, incoming connections
/// and errors) for a listening TCP socket.
pub struct TcpListenStream<T>
where
    T: Provider
{
    /// The local socket address of the listening socket.
    listen_addr: SocketAddr,
    /// The provider-specific listener that accepts incoming connections.
    listener: T::Listener,
    /// The IP address(es) the listener is reachable on.
    in_addr: InAddr<T::IfWatcher>,
    /// The port-reuse setting; listen addresses are registered with it as they
    /// are reported and unregistered when they expire or the stream is dropped.
    port_reuse: PortReuse,
    /// How long to pause before accepting again after an error.
    sleep_on_error: Duration,
    /// The currently active pause, if any.
    pause: Option<Delay>,
}
481
482impl<T> TcpListenStream<T>
483where
484 T: Provider
485{
486 fn new(listener: TcpListener, port_reuse: PortReuse) -> io::Result<Self> {
489 let listen_addr = listener.local_addr()?;
490
491 let in_addr = if match &listen_addr {
492 SocketAddr::V4(a) => a.ip().is_unspecified(),
493 SocketAddr::V6(a) => a.ip().is_unspecified(),
494 } {
495 InAddr::Any {
498 addrs: HashSet::new(),
499 if_watch: IfWatch::Pending(T::if_watcher()),
500 }
501 } else {
502 InAddr::One {
503 out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())),
504 addr: listen_addr.ip(),
505 }
506 };
507
508 let listener = T::new_listener(listener)?;
509
510 Ok(TcpListenStream {
511 port_reuse,
512 listener,
513 listen_addr,
514 in_addr,
515 pause: None,
516 sleep_on_error: Duration::from_millis(100),
517 })
518 }
519
520 fn disable_port_reuse(&mut self) {
527 match &self.in_addr {
528 InAddr::One { addr, .. } => {
529 self.port_reuse.unregister(*addr, self.listen_addr.port());
530 },
531 InAddr::Any { addrs, .. } => {
532 for addr in addrs {
533 self.port_reuse.unregister(*addr, self.listen_addr.port());
534 }
535 }
536 }
537 }
538}
539
/// Dropping the stream unregisters its listen addresses from the port-reuse
/// registry.
impl<T> Drop for TcpListenStream<T>
where
    T: Provider
{
    fn drop(&mut self) {
        self.disable_port_reuse();
    }
}
548
impl<T> Stream for TcpListenStream<T>
where
    T: Provider,
    T::Listener: Unpin,
    T::Stream: Unpin,
    T::IfWatcher: Unpin,
{
    type Item = Result<TcpListenerEvent<T::Stream>, io::Error>;

    /// Produces the next listener event.
    ///
    /// Each call works through three stages in order:
    ///
    /// 1. Address discovery: report new/expired listen addresses (or the single
    ///    bound address, once).
    /// 2. Pause: if an earlier error scheduled a delay, wait for it.
    /// 3. Accept: poll the listener for an incoming connection.
    ///
    /// Errors are delivered as `Ok(ListenerEvent::Error(..))` items; this
    /// function never yields `None` or an `Err` item.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let me = Pin::into_inner(self);

        loop {
            // Stage 1: address discovery.
            match &mut me.in_addr {
                InAddr::Any { if_watch, addrs } => match if_watch {
                    // The watcher is still being set up; `ready!` returns
                    // `Poll::Pending` from `poll_next` until the future resolves.
                    IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) {
                        Ok(w) => {
                            *if_watch = IfWatch::Ready(w);
                            // Start over so the new watcher is polled right away.
                            continue
                        }
                        Err(err) => {
                            log::debug! {
                                "Failed to begin observing interfaces: {:?}. Scheduling retry.",
                                err
                            };
                            // Prepare a fresh watcher future for the retry and
                            // schedule a pause before accepting again.
                            *if_watch = IfWatch::Pending(T::if_watcher());
                            me.pause = Some(Delay::new(me.sleep_on_error));
                            return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
                        }
                    },
                    // Drain interface events until the watcher has none ready.
                    IfWatch::Ready(watch) => while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) {
                        match ev {
                            Ok(IfEvent::Up(inet)) => {
                                let ip = inet.addr();
                                // Only addresses of the listener's IP version are relevant.
                                if me.listen_addr.is_ipv4() == ip.is_ipv4() {
                                    // `insert` returns `true` only for addresses not seen before.
                                    if addrs.insert(ip) {
                                        let ma = ip_to_multiaddr(ip, me.listen_addr.port());
                                        log::debug!("New listen address: {}", ma);
                                        me.port_reuse.register(ip, me.listen_addr.port());
                                        return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma))));
                                    }
                                }
                            }
                            Ok(IfEvent::Down(inet)) => {
                                let ip = inet.addr();
                                if me.listen_addr.is_ipv4() == ip.is_ipv4() {
                                    // `remove` returns `true` only for addresses that were known.
                                    if addrs.remove(&ip) {
                                        let ma = ip_to_multiaddr(ip, me.listen_addr.port());
                                        log::debug!("Expired listen address: {}", ma);
                                        me.port_reuse.unregister(ip, me.listen_addr.port());
                                        return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma))));
                                    }
                                }
                            }
                            Err(err) => {
                                log::debug! {
                                    "Failure polling interfaces: {:?}. Scheduling retry.",
                                    err
                                };
                                me.pause = Some(Delay::new(me.sleep_on_error));
                                return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
                            }
                        }
                    },
                },
                // A single bound address is reported exactly once: `take()`
                // leaves `None` behind for all later polls.
                InAddr::One { addr, out } => if let Some(multiaddr) = out.take() {
                    me.port_reuse.register(*addr, me.listen_addr.port());
                    return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(multiaddr))))
                }
            }

            // Stage 2: honour a pause scheduled by an earlier error.
            if let Some(mut pause) = me.pause.take() {
                match Pin::new(&mut pause).poll(cx) {
                    Poll::Ready(_) => {}
                    Poll::Pending => {
                        // Not elapsed yet: put the delay back and wait.
                        me.pause = Some(pause);
                        return Poll::Pending;
                    }
                }
            }

            // Stage 3: accept an incoming connection.
            let incoming = match T::poll_accept(&mut me.listener, cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Ok(incoming)) => incoming,
                Poll::Ready(Err(e)) => {
                    log::error!("error accepting incoming connection: {}", e);
                    // Pause before the next accept attempt.
                    me.pause = Some(Delay::new(me.sleep_on_error));
                    return Poll::Ready(Some(Ok(ListenerEvent::Error(e))));
                }
            };

            let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
            let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());

            log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);

            // The connection is already established, so the upgrade future is
            // immediately ready.
            return Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
                upgrade: future::ok(incoming.stream),
                local_addr,
                remote_addr,
            })));
        }
    }
}
659
660fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
662 let mut iter = addr.iter();
663 let proto1 = iter.next().ok_or(())?;
664 let proto2 = iter.next().ok_or(())?;
665
666 if iter.next().is_some() {
667 return Err(());
668 }
669
670 match (proto1, proto2) {
671 (Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
672 (Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
673 _ => Err(()),
674 }
675}
676
677fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
679 Multiaddr::empty()
680 .with(ip.into())
681 .with(Protocol::Tcp(port))
682}
683
#[cfg(test)]
mod tests {
    use futures::channel::mpsc;
    use super::*;

    /// `multiaddr_to_socketaddr` rejects non-TCP addresses and converts
    /// IPv4/IPv6 + TCP addresses to the corresponding `SocketAddr`.
    #[test]
    fn multiaddr_to_tcp_conversion() {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

        // UDP is not a supported second component.
        assert!(
            multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
                .is_err()
        );

        assert_eq!(
            multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
            Ok(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                12345,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr(
                &"/ip4/255.255.255.255/tcp/8080"
                    .parse::<Multiaddr>()
                    .unwrap()
            ),
            Ok(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
                8080,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
            Ok(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
                12345,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr(
                &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
                    .parse::<Multiaddr>()
                    .unwrap()
            ),
            Ok(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(
                    65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
                )),
                8080,
            ))
        );
    }

    /// A dialer and a listener exchange three bytes in each direction over
    /// loopback, for every enabled runtime.
    #[test]
    fn communicating_between_dialer_and_listener() {
        env_logger::try_init().ok();

        // Listens on `addr`, announces each new listen address through
        // `ready_tx`, then serves exactly one connection.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let tcp = GenTcpConfig::<T>::new();
            let mut listener = tcp.listen_on(addr).unwrap();
            loop {
                match listener.next().await.unwrap().unwrap() {
                    ListenerEvent::NewAddress(listen_addr) => {
                        ready_tx.send(listen_addr).await.unwrap();
                    }
                    ListenerEvent::Upgrade { upgrade, .. } => {
                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return
                    }
                    e => panic!("Unexpected listener event: {:?}", e),
                }
            }
        }

        // Waits for the listener's address, dials it, and performs the
        // request/response exchange.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let addr = ready_rx.next().await.unwrap();
            let tcp = GenTcpConfig::<T>::new();

            let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
            socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

            let mut buf = [0u8; 3];
            socket.read_exact(&mut buf).await.unwrap();
            assert_eq!(buf, [4, 5, 6]);
        }

        // Runs the scenario once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<tokio::Tcp>(ready_rx);
                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
                let tasks = tokio_crate::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }

    /// The first address reported by a listener is concrete (neither an
    /// unspecified IP nor port `0`) and can be dialed.
    #[test]
    fn wildcard_expansion() {
        env_logger::try_init().ok();

        // Listens on `addr`, validates the first reported listen address and
        // forwards it to the dialer.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let tcp = GenTcpConfig::<T>::new();
            let mut listener = tcp.listen_on(addr).unwrap();

            loop {
                match listener.next().await.unwrap().unwrap() {
                    ListenerEvent::NewAddress(a) => {
                        let mut iter = a.iter();
                        match iter.next().expect("ip address") {
                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
                            other => panic!("Unexpected protocol: {}", other),
                        }
                        if let Protocol::Tcp(port) = iter.next().expect("port") {
                            assert_ne!(0, port)
                        } else {
                            panic!("No TCP port in address: {}", a)
                        }
                        // The dialer may already be gone; a failed send is ignored.
                        ready_tx.send(a).await.ok();
                        return
                    }
                    // Other events are irrelevant for this test.
                    _ => {}
                }
            }
        }

        // Dials the address announced by the listener.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let dest_addr = ready_rx.next().await.unwrap();
            let tcp = GenTcpConfig::<T>::new();
            tcp.dial(dest_addr).unwrap().await.unwrap();
        }

        // Runs the scenario once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<tokio::Tcp>(ready_rx);
                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
                let tasks = tokio_crate::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }

    /// A transport with port reuse enabled can dial a remote listener after
    /// it has started listening itself.
    #[test]
    fn port_reuse_dialing() {
        env_logger::try_init().ok();

        // Remote side: a listener with the default configuration that serves
        // one connection.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let tcp = GenTcpConfig::<T>::new();
            let mut listener = tcp.listen_on(addr).unwrap();
            loop {
                match listener.next().await.unwrap().unwrap() {
                    ListenerEvent::NewAddress(listen_addr) => {
                        ready_tx.send(listen_addr).await.ok();
                    }
                    ListenerEvent::Upgrade { upgrade, .. } => {
                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return
                    }
                    e => panic!("Unexpected event: {:?}", e),
                }
            }
        }

        // Local side: enables port reuse, starts its own listener and waits
        // for its first address to be reported (which registers it for port
        // reuse) before dialing the remote listener.
        async fn dialer<T: Provider>(addr: Multiaddr, mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let dest_addr = ready_rx.next().await.unwrap();
            let tcp = GenTcpConfig::<T>::new().port_reuse(true);
            let mut listener = tcp.clone().listen_on(addr).unwrap();
            match listener.next().await.unwrap().unwrap() {
                ListenerEvent::NewAddress(_) => {
                    let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap();
                    socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
                    let mut buf = [0u8; 3];
                    socket.read_exact(&mut buf).await.unwrap();
                    assert_eq!(buf, [4, 5, 6]);
                }
                e => panic!("Unexpected listener event: {:?}", e)
            }
        }

        // Runs the scenario once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<tokio::Tcp>(addr.clone(), ready_rx);
                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
                let tasks = tokio_crate::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }

    /// With port reuse enabled, a second listener can be started on the
    /// address reported by the first one and reports the same address.
    #[test]
    fn port_reuse_listening() {
        env_logger::try_init().ok();

        // Starts one listener, then a second one on the first listener's
        // reported address, and compares the two reported addresses.
        async fn listen_twice<T: Provider>(addr: Multiaddr) {
            let tcp = GenTcpConfig::<T>::new().port_reuse(true);
            let mut listener1 = tcp.clone().listen_on(addr).unwrap();
            match listener1.next().await.unwrap().unwrap() {
                ListenerEvent::NewAddress(addr1) => {
                    let mut listener2 = tcp.clone().listen_on(addr1.clone()).unwrap();
                    match listener2.next().await.unwrap().unwrap() {
                        ListenerEvent::NewAddress(addr2) => {
                            assert_eq!(addr1, addr2);
                            return
                        }
                        e => panic!("Unexpected listener event: {:?}", e),
                    }
                }
                e => panic!("Unexpected listener event: {:?}", e),
            }
        }

        // Runs the scenario once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let listener = listen_twice::<async_io::Tcp>(addr.clone());
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let listener = listen_twice::<tokio::Tcp>(addr.clone());
                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
                rt.block_on(listener);
            }
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }

    /// Listening on port `0` reports an address that does not contain `tcp/0`.
    #[test]
    fn listen_port_0() {
        env_logger::try_init().ok();

        // Returns the first event of a new listener, which must be a new
        // listen address.
        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
            GenTcpConfig::<T>::new()
                .listen_on(addr)
                .unwrap()
                .next()
                .await
                .expect("some event")
                .expect("no error")
                .into_new_address()
                .expect("listen address")
        }

        // Runs the scenario once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
                assert!(!new_addr.to_string().contains("tcp/0"));
            }

            #[cfg(feature = "tokio")]
            {
                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
                let new_addr = rt.block_on(listen::<tokio::Tcp>(addr.clone()));
                assert!(!new_addr.to_string().contains("tcp/0"));
            }
        }

        test("/ip6/::1/tcp/0".parse().unwrap());
        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }

    /// Listening on a multiaddress with trailing components is rejected.
    #[test]
    fn listen_invalid_addr() {
        env_logger::try_init().ok();

        // Checks the rejection once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let tcp = TcpConfig::new();
                assert!(tcp.listen_on(addr.clone()).is_err());
            }

            #[cfg(feature = "tokio")]
            {
                let tcp = TokioTcpConfig::new();
                assert!(tcp.listen_on(addr.clone()).is_err());
            }
        }

        test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
    }
}