1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
30
31mod provider;
32
33use std::{
34 collections::{HashSet, VecDeque},
35 io,
36 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
37 pin::Pin,
38 sync::{Arc, RwLock},
39 task::{Context, Poll, Waker},
40 time::Duration,
41};
42
43use futures::{future::Ready, prelude::*, stream::SelectAll};
44use futures_timer::Delay;
45use if_watch::IfEvent;
46use libp2p_core::{
47 multiaddr::{Multiaddr, Protocol},
48 transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
49};
50#[cfg(feature = "tokio")]
51pub use provider::tokio;
52use provider::{Incoming, Provider};
53use socket2::{Domain, Socket, Type};
54
/// Options applied to the TCP sockets created by this transport.
#[derive(Clone, Debug)]
pub struct Config {
    /// TTL handed to `set_ttl` on every new socket; `None` leaves the TTL untouched.
    ttl: Option<u32>,
    /// Value handed to `set_nodelay` on every new socket.
    nodelay: bool,
    /// Backlog handed to `listen` when a listening socket is set up.
    backlog: u32,
}
65
/// A TCP port number.
type Port = u16;
67
/// Registry of local `(ip, port)` pairs that dial sockets may bind to for port reuse.
///
/// Cloning is cheap and every clone refers to the same underlying set.
#[derive(Debug, Clone, Default)]
struct PortReuse {
    /// Registered listen addresses, shared between the transport and its listen streams.
    listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
}
75
76impl PortReuse {
77 fn register(&mut self, ip: IpAddr, port: Port) {
81 tracing::trace!(%ip, %port, "Registering for port reuse");
82 self.listen_addrs
83 .write()
84 .expect("`register()` and `unregister()` never panic while holding the lock")
85 .insert((ip, port));
86 }
87
88 fn unregister(&mut self, ip: IpAddr, port: Port) {
92 tracing::trace!(%ip, %port, "Unregistering for port reuse");
93 self.listen_addrs
94 .write()
95 .expect("`register()` and `unregister()` never panic while holding the lock")
96 .remove(&(ip, port));
97 }
98
99 fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
109 for (ip, port) in self
110 .listen_addrs
111 .read()
112 .expect("`local_dial_addr` never panic while holding the lock")
113 .iter()
114 {
115 if ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback() {
116 if remote_ip.is_ipv4() {
117 return Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port));
118 } else {
119 return Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port));
120 }
121 }
122 }
123
124 None
125 }
126}
127
128impl Config {
129 pub fn new() -> Self {
137 Self {
138 ttl: None,
139 nodelay: true, backlog: 1024,
141 }
142 }
143
144 pub fn ttl(mut self, value: u32) -> Self {
146 self.ttl = Some(value);
147 self
148 }
149
150 pub fn nodelay(mut self, value: bool) -> Self {
152 self.nodelay = value;
153 self
154 }
155
156 pub fn listen_backlog(mut self, backlog: u32) -> Self {
158 self.backlog = backlog;
159 self
160 }
161
162 #[deprecated(
180 since = "0.42.0",
181 note = "This option does nothing now, since the port reuse policy is now decided on a per-connection basis by the behaviour. The function will be removed in a future release."
182 )]
183 pub fn port_reuse(self, _port_reuse: bool) -> Self {
184 self
185 }
186
187 fn create_socket(&self, socket_addr: SocketAddr, port_use: PortUse) -> io::Result<Socket> {
188 let socket = Socket::new(
189 Domain::for_address(socket_addr),
190 Type::STREAM,
191 Some(socket2::Protocol::TCP),
192 )?;
193 if socket_addr.is_ipv6() {
194 socket.set_only_v6(true)?;
195 }
196 if let Some(ttl) = self.ttl {
197 socket.set_ttl(ttl)?;
198 }
199 socket.set_nodelay(self.nodelay)?;
200 socket.set_reuse_address(true)?;
201 #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
202 if port_use == PortUse::Reuse {
203 socket.set_reuse_port(true)?;
204 }
205
206 #[cfg(not(all(unix, not(any(target_os = "solaris", target_os = "illumos")))))]
207 let _ = port_use; socket.set_nonblocking(true)?;
210
211 Ok(socket)
212 }
213}
214
215impl Default for Config {
216 fn default() -> Self {
217 Self::new()
218 }
219}
220
/// A TCP transport, generic over the [`Provider`] that supplies the
/// runtime-specific listener and stream types.
pub struct Transport<T>
where
    T: Provider + Send,
{
    /// Socket options applied to every socket created for listening or dialing.
    config: Config,

    /// Registry of listen addresses whose port may be reused for outgoing connections.
    port_reuse: PortReuse,
    /// All listen streams of this transport, polled together.
    listeners: SelectAll<ListenStream<T>>,
    /// Events queued by the transport itself; `poll` returns these before
    /// polling the listeners.
    pending_events:
        VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
}
242
243impl<T> Transport<T>
244where
245 T: Provider + Send,
246{
247 pub fn new(config: Config) -> Self {
255 Transport {
256 config,
257 ..Default::default()
258 }
259 }
260
261 fn do_listen(
262 &mut self,
263 id: ListenerId,
264 socket_addr: SocketAddr,
265 ) -> io::Result<ListenStream<T>> {
266 let socket = self.config.create_socket(socket_addr, PortUse::Reuse)?;
267 socket.bind(&socket_addr.into())?;
268 socket.listen(self.config.backlog as _)?;
269 socket.set_nonblocking(true)?;
270 let listener: TcpListener = socket.into();
271 let local_addr = listener.local_addr()?;
272
273 if local_addr.ip().is_unspecified() {
274 return ListenStream::<T>::new(
275 id,
276 listener,
277 Some(T::new_if_watcher()?),
278 self.port_reuse.clone(),
279 );
280 }
281
282 self.port_reuse.register(local_addr.ip(), local_addr.port());
283 let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
284 self.pending_events.push_back(TransportEvent::NewAddress {
285 listener_id: id,
286 listen_addr,
287 });
288 ListenStream::<T>::new(id, listener, None, self.port_reuse.clone())
289 }
290}
291
292impl<T> Default for Transport<T>
293where
294 T: Provider + Send,
295{
296 fn default() -> Self {
300 Transport {
301 port_reuse: PortReuse::default(),
302 config: Config::default(),
303 listeners: SelectAll::new(),
304 pending_events: VecDeque::new(),
305 }
306 }
307}
308
309impl<T> libp2p_core::Transport for Transport<T>
310where
311 T: Provider + Send + 'static,
312 T::Listener: Unpin,
313 T::Stream: Unpin,
314{
315 type Output = T::Stream;
316 type Error = io::Error;
317 type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
318 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
319
320 fn listen_on(
321 &mut self,
322 id: ListenerId,
323 addr: Multiaddr,
324 ) -> Result<(), TransportError<Self::Error>> {
325 let socket_addr = multiaddr_to_socketaddr(addr.clone())
326 .map_err(|_| TransportError::MultiaddrNotSupported(addr))?;
327 tracing::debug!("listening on {}", socket_addr);
328 let listener = self
329 .do_listen(id, socket_addr)
330 .map_err(TransportError::Other)?;
331 self.listeners.push(listener);
332 Ok(())
333 }
334
335 fn remove_listener(&mut self, id: ListenerId) -> bool {
336 if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
337 listener.close(Ok(()));
338 true
339 } else {
340 false
341 }
342 }
343
344 fn dial(
345 &mut self,
346 addr: Multiaddr,
347 opts: DialOpts,
348 ) -> Result<Self::Dial, TransportError<Self::Error>> {
349 let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
350 if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
351 return Err(TransportError::MultiaddrNotSupported(addr));
352 }
353 socket_addr
354 } else {
355 return Err(TransportError::MultiaddrNotSupported(addr));
356 };
357 tracing::debug!(address=%socket_addr, "dialing address");
358
359 let socket = self
360 .config
361 .create_socket(socket_addr, opts.port_use)
362 .map_err(TransportError::Other)?;
363
364 let bind_addr = match self.port_reuse.local_dial_addr(&socket_addr.ip()) {
365 Some(socket_addr) if opts.port_use == PortUse::Reuse => {
366 tracing::trace!(address=%addr, "Binding dial socket to listen socket address");
367 Some(socket_addr)
368 }
369 _ => None,
370 };
371
372 let local_config = self.config.clone();
373
374 Ok(async move {
375 if let Some(bind_addr) = bind_addr {
376 socket.bind(&bind_addr.into())?;
377 }
378
379 let socket = match (socket.connect(&socket_addr.into()), bind_addr) {
382 (Ok(()), _) => socket,
383 (Err(err), _) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
384 (Err(err), _) if err.kind() == io::ErrorKind::WouldBlock => socket,
385 (Err(err), Some(bind_addr)) if err.kind() == io::ErrorKind::AddrNotAvailable => {
386 tracing::debug!(connect_addr = %socket_addr, ?bind_addr, "Failed to connect using existing socket because we already have a connection, re-dialing with new port");
389 std::mem::drop(socket);
390 let socket = local_config.create_socket(socket_addr, PortUse::New)?;
391 match socket.connect(&socket_addr.into()) {
392 Ok(()) => socket,
393 Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
394 Err(err) if err.kind() == io::ErrorKind::WouldBlock => socket,
395 Err(err) => return Err(err),
396 }
397 }
398 (Err(err), _) => return Err(err),
399 };
400
401 let stream = T::new_stream(socket.into()).await?;
402 Ok(stream)
403 }
404 .boxed())
405 }
406
407 #[tracing::instrument(level = "trace", name = "Transport::poll", skip(self, cx))]
409 fn poll(
410 mut self: Pin<&mut Self>,
411 cx: &mut Context<'_>,
412 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
413 if let Some(event) = self.pending_events.pop_front() {
415 return Poll::Ready(event);
416 }
417
418 match self.listeners.poll_next_unpin(cx) {
419 Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
420 _ => Poll::Pending,
421 }
422 }
423}
424
/// A stream of transport events produced by a single listening socket.
struct ListenStream<T>
where
    T: Provider,
{
    /// Id under which this listener was registered with the transport.
    listener_id: ListenerId,
    /// Local address of the listening socket, as reported by the socket itself.
    listen_addr: SocketAddr,
    /// The provider-specific listener that accepts incoming connections.
    listener: T::Listener,
    /// Interface watcher; present when the transport created this stream for a
    /// socket bound to an unspecified address, `None` otherwise.
    if_watcher: Option<T::IfWatcher>,
    /// Handle to the transport's port-reuse registry, updated as addresses come
    /// and go and cleaned up when the stream is dropped.
    port_reuse: PortReuse,
    /// How long polling is suspended after the listener or watcher reports an error.
    sleep_on_error: Duration,
    /// Active suspension timer; while it is pending the stream yields nothing.
    pause: Option<Delay>,
    /// Event to hand out on the next poll (used for the `ListenerClosed` event).
    pending_event: Option<<Self as Stream>::Item>,
    /// Set once the listener has been closed; the stream ends after the
    /// pending event has been delivered.
    is_closed: bool,
    /// Waker stored when a poll found nothing to report; woken by `close`.
    close_listener_waker: Option<Waker>,
}
464
465impl<T> ListenStream<T>
466where
467 T: Provider,
468{
469 fn new(
472 listener_id: ListenerId,
473 listener: TcpListener,
474 if_watcher: Option<T::IfWatcher>,
475 port_reuse: PortReuse,
476 ) -> io::Result<Self> {
477 let listen_addr = listener.local_addr()?;
478 let listener = T::new_listener(listener)?;
479
480 Ok(ListenStream {
481 port_reuse,
482 listener,
483 listener_id,
484 listen_addr,
485 if_watcher,
486 pause: None,
487 sleep_on_error: Duration::from_millis(100),
488 pending_event: None,
489 is_closed: false,
490 close_listener_waker: None,
491 })
492 }
493
494 fn disable_port_reuse(&mut self) {
501 match &self.if_watcher {
502 Some(if_watcher) => {
503 for ip_net in T::addrs(if_watcher) {
504 self.port_reuse
505 .unregister(ip_net.addr(), self.listen_addr.port());
506 }
507 }
508 None => self
509 .port_reuse
510 .unregister(self.listen_addr.ip(), self.listen_addr.port()),
511 }
512 }
513
514 fn close(&mut self, reason: Result<(), io::Error>) {
519 if self.is_closed {
520 return;
521 }
522 self.pending_event = Some(TransportEvent::ListenerClosed {
523 listener_id: self.listener_id,
524 reason,
525 });
526 self.is_closed = true;
527
528 if let Some(waker) = self.close_listener_waker.take() {
530 waker.wake();
531 }
532 }
533
534 fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
536 let Some(if_watcher) = self.if_watcher.as_mut() else {
537 return Poll::Pending;
538 };
539
540 let my_listen_addr_port = self.listen_addr.port();
541
542 while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
543 match event {
544 Ok(IfEvent::Up(inet)) => {
545 let ip = inet.addr();
546 if self.listen_addr.is_ipv4() == ip.is_ipv4() {
547 let ma = ip_to_multiaddr(ip, my_listen_addr_port);
548 tracing::debug!(address=%ma, "New listen address");
549 self.port_reuse.register(ip, my_listen_addr_port);
550 return Poll::Ready(TransportEvent::NewAddress {
551 listener_id: self.listener_id,
552 listen_addr: ma,
553 });
554 }
555 }
556 Ok(IfEvent::Down(inet)) => {
557 let ip = inet.addr();
558 if self.listen_addr.is_ipv4() == ip.is_ipv4() {
559 let ma = ip_to_multiaddr(ip, my_listen_addr_port);
560 tracing::debug!(address=%ma, "Expired listen address");
561 self.port_reuse.unregister(ip, my_listen_addr_port);
562 return Poll::Ready(TransportEvent::AddressExpired {
563 listener_id: self.listener_id,
564 listen_addr: ma,
565 });
566 }
567 }
568 Err(error) => {
569 self.pause = Some(Delay::new(self.sleep_on_error));
570 return Poll::Ready(TransportEvent::ListenerError {
571 listener_id: self.listener_id,
572 error,
573 });
574 }
575 }
576 }
577
578 Poll::Pending
579 }
580}
581
582impl<T> Drop for ListenStream<T>
583where
584 T: Provider,
585{
586 fn drop(&mut self) {
587 self.disable_port_reuse();
588 }
589}
590
591impl<T> Stream for ListenStream<T>
592where
593 T: Provider,
594 T::Listener: Unpin,
595 T::Stream: Unpin,
596{
597 type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;
598
599 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
600 if let Some(mut pause) = self.pause.take() {
601 match pause.poll_unpin(cx) {
602 Poll::Ready(_) => {}
603 Poll::Pending => {
604 self.pause = Some(pause);
605 return Poll::Pending;
606 }
607 }
608 }
609
610 if let Some(event) = self.pending_event.take() {
611 return Poll::Ready(Some(event));
612 }
613
614 if self.is_closed {
615 return Poll::Ready(None);
618 }
619
620 if let Poll::Ready(event) = self.poll_if_addr(cx) {
621 return Poll::Ready(Some(event));
622 }
623
624 match T::poll_accept(&mut self.listener, cx) {
626 Poll::Ready(Ok(Incoming {
627 local_addr,
628 remote_addr,
629 stream,
630 })) => {
631 let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
632 let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());
633
634 tracing::debug!(
635 remote_address=%remote_addr,
636 local_address=%local_addr,
637 "Incoming connection from remote at local"
638 );
639
640 return Poll::Ready(Some(TransportEvent::Incoming {
641 listener_id: self.listener_id,
642 upgrade: future::ok(stream),
643 local_addr,
644 send_back_addr: remote_addr,
645 }));
646 }
647 Poll::Ready(Err(error)) => {
648 self.pause = Some(Delay::new(self.sleep_on_error));
650 return Poll::Ready(Some(TransportEvent::ListenerError {
651 listener_id: self.listener_id,
652 error,
653 }));
654 }
655 Poll::Pending => {}
656 }
657
658 self.close_listener_waker = Some(cx.waker().clone());
659 Poll::Pending
660 }
661}
662
663fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
668 let mut port = None;
672 while let Some(proto) = addr.pop() {
673 match proto {
674 Protocol::Ip4(ipv4) => match port {
675 Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
676 None => return Err(()),
677 },
678 Protocol::Ip6(ipv6) => match port {
679 Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
680 None => return Err(()),
681 },
682 Protocol::Tcp(portnum) => match port {
683 Some(_) => return Err(()),
684 None => port = Some(portnum),
685 },
686 Protocol::P2p(_) => {}
687 _ => return Err(()),
688 }
689 }
690 Err(())
691}
692
693fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
695 Multiaddr::empty().with(ip.into()).with(Protocol::Tcp(port))
696}
697
// Tests for the TCP transport; compiled only for test builds with the `tokio` feature.
#[cfg(all(test, feature = "tokio"))]
mod tests {
    use futures::{
        channel::{mpsc, oneshot},
        future::poll_fn,
    };
    use libp2p_core::{Endpoint, Transport as _};

    use super::*;

    // Checks the multiaddr -> socket address conversion for a rejected
    // (UDP) address and for several valid IPv4 and IPv6 TCP addresses.
    #[test]
    fn multiaddr_to_tcp_conversion() {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

        // A UDP address is not a TCP address.
        assert!(
            multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
                .is_err()
        );

        assert_eq!(
            multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
            Ok(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                12345,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr(
                "/ip4/255.255.255.255/tcp/8080"
                    .parse::<Multiaddr>()
                    .unwrap()
            ),
            Ok(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
                8080,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
            Ok(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
                12345,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr(
                "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
                    .parse::<Multiaddr>()
                    .unwrap()
            ),
            Ok(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(
                    65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
                )),
                8080,
            ))
        );
    }

    // A listener and a dialer exchange three bytes in each direction over
    // loopback, for both IPv4 and IPv6.
    #[test]
    fn communicating_between_dialer_and_listener() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, forwards the actual listen address through
        // `ready_tx`, then serves exactly one connection.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.unwrap();
                    }
                    TransportEvent::Incoming { upgrade, .. } => {
                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return;
                    }
                    e => panic!("Unexpected transport event: {e:?}"),
                }
            }
        }

        // Waits for the listener's address, dials it and runs the byte exchange.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::default();

            // Obtain a future socket through dialing.
            let mut socket = tcp
                .dial(
                    addr.clone(),
                    DialOpts {
                        role: Endpoint::Dialer,
                        port_use: PortUse::Reuse,
                    },
                )
                .unwrap()
                .await
                .unwrap();
            socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

            let mut buf = [0u8; 3];
            socket.read_exact(&mut buf).await.unwrap();
            assert_eq!(buf, [4, 5, 6]);
        }

        // Runs listener and dialer on one current-thread runtime.
        fn test(addr: Multiaddr) {
            let (ready_tx, ready_rx) = mpsc::channel(1);
            let listener = listener::<tokio::Tcp>(addr, ready_tx);
            let dialer = dialer::<tokio::Tcp>(ready_rx);
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let tasks = ::tokio::task::LocalSet::new();
            let listener = tasks.spawn_local(listener);
            tasks.block_on(&rt, dialer);
            tasks.block_on(&rt, listener).unwrap();
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }

    // Every reported listen address must carry a concrete (non-unspecified)
    // IP and a non-zero port, and must be dialable.
    #[test]
    fn wildcard_expansion() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Validates every `NewAddress` event and stops at the first incoming
        // connection.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();

            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        let mut iter = listen_addr.iter();
                        match iter.next().expect("ip address") {
                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
                            other => panic!("Unexpected protocol: {other}"),
                        }
                        if let Protocol::Tcp(port) = iter.next().expect("port") {
                            assert_ne!(0, port)
                        } else {
                            panic!("No TCP port in address: {listen_addr}")
                        }
                        // The send result is ignored: the dialer only takes
                        // the first address.
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming { .. } => {
                        return;
                    }
                    _ => {}
                }
            }
        }

        // Dials the first address announced by the listener.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::default();
            tcp.dial(
                dest_addr,
                DialOpts {
                    role: Endpoint::Dialer,
                    port_use: PortUse::New,
                },
            )
            .unwrap()
            .await
            .unwrap();
        }

        // Runs listener and dialer on one current-thread runtime.
        fn test(addr: Multiaddr) {
            let (ready_tx, ready_rx) = mpsc::channel(1);
            let listener = listener::<tokio::Tcp>(addr, ready_tx);
            let dialer = dialer::<tokio::Tcp>(ready_rx);
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let tasks = ::tokio::task::LocalSet::new();
            let listener = tasks.spawn_local(listener);
            tasks.block_on(&rt, dialer);
            tasks.block_on(&rt, listener).unwrap();
        }

        test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }

    // A transport that is itself listening dials out with `PortUse::Reuse`;
    // the remote must see the connection coming from the listen port.
    #[test]
    fn port_reuse_dialing() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Accepts one connection and checks that its source port equals the
        // port received through `port_reuse_rx`.
        async fn listener<T: Provider>(
            addr: Multiaddr,
            mut ready_tx: mpsc::Sender<Multiaddr>,
            port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
        ) {
            let mut tcp = Transport::<T>::new(Config::new()).boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming {
                        upgrade,
                        mut send_back_addr,
                        ..
                    } => {
                        // Receive the dialer's TCP port for the comparison.
                        let remote_port_reuse = port_reuse_rx.await.unwrap();
                        // The last component of the send-back address is the
                        // remote TCP port.
                        assert_eq!(send_back_addr.pop().unwrap(), remote_port_reuse);

                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return;
                    }
                    e => panic!("Unexpected event: {e:?}"),
                }
            }
        }

        // Listens on `addr`, reports the reusable port through
        // `port_reuse_tx`, then dials the remote listener with port reuse.
        async fn dialer<T: Provider>(
            addr: Multiaddr,
            mut ready_rx: mpsc::Receiver<Multiaddr>,
            port_reuse_tx: oneshot::Sender<Protocol<'_>>,
        ) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::new(Config::new());
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress { .. } => {
                    // The transport and its listen stream must agree on the
                    // address offered for port reuse.
                    let listener = tcp.listeners.iter().next().unwrap();
                    let port_reuse_tcp = tcp.port_reuse.local_dial_addr(&listener.listen_addr.ip());
                    let port_reuse_listener = listener
                        .port_reuse
                        .local_dial_addr(&listener.listen_addr.ip());
                    assert!(port_reuse_tcp.is_some());
                    assert_eq!(port_reuse_tcp, port_reuse_listener);

                    // Tell the remote listener which source port to expect.
                    port_reuse_tx
                        .send(Protocol::Tcp(port_reuse_tcp.unwrap().port()))
                        .ok();

                    // Obtain a future socket through dialing.
                    let mut socket = tcp
                        .dial(
                            dest_addr,
                            DialOpts {
                                role: Endpoint::Dialer,
                                port_use: PortUse::Reuse,
                            },
                        )
                        .unwrap()
                        .await
                        .unwrap();
                    socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
                    let mut buf = [0u8; 3];
                    socket.read_exact(&mut buf).await.unwrap();
                    assert_eq!(buf, [4, 5, 6]);
                }
                e => panic!("Unexpected transport event: {e:?}"),
            }
        }

        // Runs listener and dialer on one current-thread runtime.
        fn test(addr: Multiaddr) {
            let (ready_tx, ready_rx) = mpsc::channel(1);
            let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
            let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
            let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let tasks = ::tokio::task::LocalSet::new();
            let listener = tasks.spawn_local(listener);
            tasks.block_on(&rt, dialer);
            tasks.block_on(&rt, listener).unwrap();
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }

    // Listening a second time on the exact address reported by the first
    // listener must succeed and report the same address.
    #[test]
    fn port_reuse_listening() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        async fn listen_twice<T: Provider>(addr: Multiaddr) {
            let mut tcp = Transport::<T>::new(Config::new());
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress {
                    listen_addr: addr1, ..
                } => {
                    // The transport and its listen stream must agree on the
                    // address offered for port reuse.
                    let listener1 = tcp.listeners.iter().next().unwrap();
                    let port_reuse_tcp =
                        tcp.port_reuse.local_dial_addr(&listener1.listen_addr.ip());
                    let port_reuse_listener1 = listener1
                        .port_reuse
                        .local_dial_addr(&listener1.listen_addr.ip());
                    assert!(port_reuse_tcp.is_some());
                    assert_eq!(port_reuse_tcp, port_reuse_listener1);

                    // Listen on the same address a second time.
                    tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
                    match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                        TransportEvent::NewAddress {
                            listen_addr: addr2, ..
                        } => assert_eq!(addr1, addr2),
                        e => panic!("Unexpected transport event: {e:?}"),
                    }
                }
                e => panic!("Unexpected transport event: {e:?}"),
            }
        }

        fn test(addr: Multiaddr) {
            let listener = listen_twice::<tokio::Tcp>(addr);
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            rt.block_on(listener);
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }

    // Listening on port 0 must report an address with a concrete port.
    #[test]
    fn listen_port_0() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Returns the first address announced after listening on `addr`.
        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            tcp.select_next_some()
                .await
                .into_new_address()
                .expect("listen address")
        }

        fn test(addr: Multiaddr) {
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let new_addr = rt.block_on(listen::<tokio::Tcp>(addr));
            assert!(!new_addr.to_string().contains("tcp/0"));
        }

        test("/ip6/::1/tcp/0".parse().unwrap());
        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }

    // An address with two TCP components must be rejected by `listen_on`.
    #[test]
    fn listen_invalid_addr() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        fn test(addr: Multiaddr) {
            let mut tcp = tokio::Transport::default();
            assert!(tcp.listen_on(ListenerId::next(), addr).is_err());
        }

        test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
    }

    // Removing a listener that was just added must report success.
    #[test]
    fn test_remove_listener() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Adds a listener and removes it again, returning the removal result.
        async fn cycle_listeners<T: Provider>() -> bool {
            let mut tcp = Transport::<T>::default().boxed();
            let listener_id = ListenerId::next();
            tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap())
                .unwrap();
            tcp.remove_listener(listener_id)
        }

        #[cfg(feature = "tokio")]
        {
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            assert!(rt.block_on(cycle_listeners::<tokio::Tcp>()));
        }
    }

    // Listening on the IPv4 and the IPv6 wildcard address with the same port
    // must both succeed.
    #[test]
    fn test_listens_ipv4_ipv6_separately() {
        fn test<T: Provider>() {
            // Pick a port by binding an ephemeral listener, which is dropped
            // at the end of this block.
            let port = {
                let listener = TcpListener::bind("127.0.0.1:0").unwrap();
                listener.local_addr().unwrap().port()
            };
            let mut tcp = Transport::<T>::default().boxed();
            let listener_id = ListenerId::next();
            tcp.listen_on(
                listener_id,
                format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
            tcp.listen_on(
                ListenerId::next(),
                format!("/ip6/::/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
        }
        #[cfg(feature = "tokio")]
        {
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            rt.block_on(async {
                test::<tokio::Tcp>();
            });
        }
    }
}