1use std::{
17 collections::HashSet,
18 fmt,
19 io,
20 net::{IpAddr, SocketAddr},
21 ops::Deref,
22 sync::{
23 Arc,
24 atomic::{AtomicUsize, Ordering::*},
25 },
26 time::{Duration, Instant},
27};
28
29use anyhow::anyhow;
30#[cfg(feature = "locktick")]
31use locktick::parking_lot::Mutex;
32use once_cell::sync::OnceCell;
33#[cfg(not(feature = "locktick"))]
34use parking_lot::Mutex;
35use tokio::{
36 io::split,
37 net::{TcpListener, TcpSocket, TcpStream},
38 sync::oneshot,
39 task::{JoinHandle, JoinSet},
40 time::timeout,
41};
42use tracing::*;
43
44use crate::{
45 BannedPeers,
46 Config,
47 KnownPeers,
48 Stats,
49 connections::{Connection, ConnectionSide, Connections, create_connection_span},
50 protocols::{Protocol, Protocols},
51};
52
53static SEQUENTIAL_NODE_ID: AtomicUsize = AtomicUsize::new(0);
55
56#[derive(Clone)]
58pub struct Tcp(Arc<InnerTcp>);
59
60impl Deref for Tcp {
61 type Target = Arc<InnerTcp>;
62
63 fn deref(&self) -> &Self::Target {
64 &self.0
65 }
66}
67
68pub trait ApplicationError: Send + Sync + std::fmt::Debug + std::fmt::Display + 'static {}
70
71#[allow(missing_docs)]
73#[derive(thiserror::Error, Debug)]
74pub enum ConnectError {
75 #[error("already reached the maximum number of {limit} connections")]
76 MaximumConnectionsReached { limit: u16 },
77 #[error("already connecting to node at {address:?}")]
78 AlreadyConnecting { address: SocketAddr },
79 #[error("already connected to node at {address:?}")]
80 AlreadyConnected { address: SocketAddr },
81 #[error("attempt to self-connect (at address {address:?}")]
82 SelfConnect { address: SocketAddr },
83 #[error("rejected a connection attempt from a banned IP '{ip}'")]
84 BannedIp { ip: IpAddr },
85 #[error(transparent)]
87 IoError(std::io::Error),
88 #[error("{0}")]
91 ApplicationError(Box<dyn ApplicationError>),
92 #[error(transparent)]
96 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
97}
98
99impl ConnectError {
100 pub fn application<E: ApplicationError>(err: E) -> Self {
102 Self::ApplicationError(Box::new(err))
103 }
104
105 pub fn other<E: Into<Box<dyn std::error::Error + Send + Sync>>>(err: E) -> Self {
107 Self::Other(err.into())
108 }
109}
110
111impl From<ConnectError> for std::io::Error {
112 fn from(err: ConnectError) -> Self {
113 match err {
114 ConnectError::IoError(err) => err,
115 ConnectError::Other(err) => std::io::Error::other(err),
116 err => std::io::Error::other(err.to_string()),
117 }
118 }
119}
120
121impl From<std::io::Error> for ConnectError {
122 fn from(err: std::io::Error) -> Self {
123 if err.kind() == std::io::ErrorKind::Other {
125 let inner = err.into_inner().unwrap_or_else(|| anyhow!("Unknown error").into());
127 ConnectError::other(inner)
128 } else {
129 ConnectError::IoError(err)
130 }
131 }
132}
133
134#[doc(hidden)]
135pub struct InnerTcp {
136 span: Span,
138 config: Config,
140 listening_addr: OnceCell<SocketAddr>,
142 pub(crate) protocols: Protocols,
144 connecting: Mutex<HashSet<SocketAddr>>,
146 connections: Connections,
148 known_peers: KnownPeers,
150 banned_peers: BannedPeers,
152 stats: Stats,
154 pub(crate) tasks: Mutex<Vec<JoinHandle<()>>>,
156}
157
158impl Tcp {
159 pub fn new(mut config: Config) -> Self {
161 if config.name.is_none() {
163 config.name = Some(SEQUENTIAL_NODE_ID.fetch_add(1, Relaxed).to_string());
164 }
165
166 let span = crate::helpers::create_span(config.name.as_deref().unwrap());
168
169 let tcp = Tcp(Arc::new(InnerTcp {
171 span,
172 config,
173 listening_addr: Default::default(),
174 protocols: Default::default(),
175 connecting: Default::default(),
176 connections: Default::default(),
177 known_peers: Default::default(),
178 banned_peers: Default::default(),
179 stats: Stats::new(Instant::now()),
180 tasks: Default::default(),
181 }));
182
183 debug!(parent: tcp.span(), "The node is ready");
184
185 tcp
186 }
187
188 pub fn uptime(&self) -> Duration {
190 self.stats.timestamp().elapsed()
191 }
192
193 #[inline]
195 pub fn name(&self) -> &str {
196 self.config.name.as_deref().unwrap()
198 }
199
200 #[inline]
202 pub fn config(&self) -> &Config {
203 &self.config
204 }
205
206 pub fn listening_addr(&self) -> io::Result<SocketAddr> {
209 self.listening_addr.get().copied().ok_or_else(|| io::ErrorKind::AddrNotAvailable.into())
210 }
211
212 pub fn is_connected(&self, addr: SocketAddr) -> bool {
214 self.connections.is_connected(addr)
215 }
216
217 pub fn is_connecting(&self, addr: SocketAddr) -> bool {
219 self.connecting.lock().contains(&addr)
220 }
221
222 pub fn num_connected(&self) -> usize {
224 self.connections.num_connected()
225 }
226
227 pub fn num_connecting(&self) -> usize {
229 self.connecting.lock().len()
230 }
231
232 pub fn connected_addrs(&self) -> Vec<SocketAddr> {
234 self.connections.addrs()
235 }
236
237 pub fn connecting_addrs(&self) -> Vec<SocketAddr> {
239 self.connecting.lock().iter().copied().collect()
240 }
241
242 #[inline]
244 pub fn known_peers(&self) -> &KnownPeers {
245 &self.known_peers
246 }
247
248 #[inline]
250 pub fn banned_peers(&self) -> &BannedPeers {
251 &self.banned_peers
252 }
253
254 #[inline]
256 pub fn stats(&self) -> &Stats {
257 &self.stats
258 }
259
260 #[inline]
262 pub fn span(&self) -> &Span {
263 &self.span
264 }
265
266 pub async fn shut_down(&self) {
268 debug!(parent: self.span(), "Shutting down the TCP stack");
269
270 let mut tasks = std::mem::take(&mut *self.tasks.lock()).into_iter();
272
273 if let Some(listening_task) = tasks.next() {
275 listening_task.abort(); }
277
278 let mut disconnect_tasks = JoinSet::new();
280 for addr in self.connected_addrs() {
281 let node = self.clone();
282 disconnect_tasks.spawn(async move {
283 node.disconnect(addr).await;
284 });
285 }
286 while disconnect_tasks.join_next().await.is_some() {}
287
288 for handle in tasks {
290 handle.abort();
291 }
292 }
293}
294
295impl Tcp {
296 pub async fn connect(&self, addr: SocketAddr) -> Result<(), ConnectError> {
298 if let Ok(listening_addr) = self.listening_addr() {
299 if addr == listening_addr || self.is_self_connect(addr) {
301 error!(parent: self.span(), "Attempted to self-connect ({addr})");
302 return Err(ConnectError::SelfConnect { address: addr });
303 }
304 }
305
306 if !self.can_add_connection() {
307 error!(parent: self.span(), "Too many connections; refusing to connect to {addr}");
308 return Err(ConnectError::MaximumConnectionsReached { limit: self.config.max_connections });
309 }
310
311 if self.is_connected(addr) {
312 trace!(parent: self.span(), "Already connected to {addr}");
313 return Err(ConnectError::AlreadyConnected { address: addr });
314 }
315
316 if !self.connecting.lock().insert(addr) {
317 debug!(parent: self.span(), "Already connecting to {addr}");
318 return Err(ConnectError::AlreadyConnecting { address: addr });
319 }
320
321 let timeout_duration = Duration::from_millis(self.config().connection_timeout_ms.into());
322
323 let res = if let Some(listen_ip) = self.config().listener_ip {
326 timeout(timeout_duration, self.connect_with_specific_interface(listen_ip, addr)).await
327 } else {
328 timeout(timeout_duration, TcpStream::connect(addr)).await
329 };
330
331 let stream = match res {
332 Ok(Ok(stream)) => Ok(stream),
333 Ok(err) => {
334 self.connecting.lock().remove(&addr);
335 err
336 }
337 Err(err) => {
338 self.connecting.lock().remove(&addr);
339 error!("connection timeout error: {}", err);
340 Err(io::ErrorKind::TimedOut.into())
341 }
342 }?;
343
344 let ret = self.adapt_stream(stream, addr, ConnectionSide::Initiator).await;
345
346 if let Err(ref e) = ret {
347 self.connecting.lock().remove(&addr);
348 self.known_peers().register_failure(addr.ip());
349 error!(parent: self.span(), "Unable to initiate a connection with {addr}: {e}");
350 }
351
352 ret.map_err(|err| err.into())
353 }
354
355 async fn connect_with_specific_interface(&self, listen_ip: IpAddr, addr: SocketAddr) -> io::Result<TcpStream> {
356 let sock = if listen_ip.is_ipv4() { TcpSocket::new_v4()? } else { TcpSocket::new_v6()? };
357 sock.bind(SocketAddr::new(listen_ip, 0))?;
359 sock.connect(addr).await
360 }
361
362 pub async fn disconnect(&self, addr: SocketAddr) -> bool {
366 if let Some(conn) = self.connections.0.read().get(&addr) {
368 if conn.disconnecting.swap(true, Relaxed) {
369 return false;
371 }
372 } else {
373 return false;
375 };
376
377 if let Some(handler) = self.protocols.disconnect.get() {
378 let (sender, receiver) = oneshot::channel();
379 handler.trigger((addr, sender)).await;
380 if let Ok((handle, waiter)) = receiver.await {
381 if let Some(conn) = self.connections.0.write().get_mut(&addr) {
384 conn.tasks.push(handle);
385 }
386 let _ = waiter.await;
388 }
389 }
390
391 let conn = self.connections.remove(addr);
392 let disconnected = conn.is_some();
393
394 if let Some(conn) = conn {
395 debug!(parent: self.span(), "Disconnecting from {addr}");
396
397 drop(conn);
399
400 debug!(parent: self.span(), "Disconnected from {addr}");
401 } else {
402 warn!(parent: self.span(), "Failed to disconnect, was not connected to {addr}");
403 }
404
405 disconnected
406 }
407}
408
409impl Tcp {
410 pub async fn enable_listener(&self) -> io::Result<SocketAddr> {
412 let listener_ip =
414 self.config().listener_ip.expect("Tcp::enable_listener was called, but Config::listener_ip is not set");
415
416 let listener = self.create_listener(listener_ip).await?;
418
419 let port = listener.local_addr()?.port();
421
422 let listening_addr = (listener_ip, port).into();
424 self.listening_addr.set(listening_addr).expect("The node's listener was started more than once");
425
426 let (tx, rx) = oneshot::channel();
428
429 let tcp = self.clone();
430 let listening_task = tokio::spawn(async move {
431 trace!(parent: tcp.span(), "Spawned the listening task");
432 tx.send(()).unwrap(); loop {
435 match listener.accept().await {
437 Ok((stream, addr)) => tcp.handle_connection(stream, addr),
438 Err(e) => {
439 error!(parent: tcp.span(), "Failed to accept a connection: {e}");
440 tokio::time::sleep(Duration::from_millis(500)).await;
443 }
444 }
445 }
446 });
447 self.tasks.lock().push(listening_task);
448 let _ = rx.await;
449 debug!(parent: self.span(), "Listening on {listening_addr}");
450
451 Ok(listening_addr)
452 }
453
454 async fn create_listener(&self, listener_ip: IpAddr) -> io::Result<TcpListener> {
456 debug!("Creating a TCP listener on {listener_ip}...");
457 let listener = if let Some(port) = self.config().desired_listening_port {
458 let desired_listening_addr = SocketAddr::new(listener_ip, port);
460 match TcpListener::bind(desired_listening_addr).await {
462 Ok(listener) => listener,
463 Err(e) => {
464 if self.config().allow_random_port {
465 warn!(
466 parent: self.span(),
467 "Trying any listening port, as the desired port is unavailable: {e}"
468 );
469 let random_available_addr = SocketAddr::new(listener_ip, 0);
470 TcpListener::bind(random_available_addr).await?
471 } else {
472 error!(parent: self.span(), "The desired listening port is unavailable: {e}");
473 return Err(e);
474 }
475 }
476 }
477 } else if self.config().allow_random_port {
478 let random_available_addr = SocketAddr::new(listener_ip, 0);
479 TcpListener::bind(random_available_addr).await?
480 } else {
481 panic!("As 'listener_ip' is set, either 'desired_listening_port' or 'allow_random_port' must be set");
482 };
483
484 Ok(listener)
485 }
486
487 fn handle_connection(&self, stream: TcpStream, addr: SocketAddr) {
489 debug!(parent: self.span(), "Received a connection from {addr}");
490
491 if !self.can_add_connection() || self.is_self_connect(addr) {
492 debug!(parent: self.span(), "Rejecting the connection from {addr}");
493 return;
494 }
495
496 self.connecting.lock().insert(addr);
497
498 let tcp = self.clone();
499 tokio::spawn(async move {
500 if let Err(e) = tcp.adapt_stream(stream, addr, ConnectionSide::Responder).await {
501 tcp.connecting.lock().remove(&addr);
502 tcp.known_peers().register_failure(addr.ip());
503 error!(parent: tcp.span(), "Failed to connect with {addr}: {e}");
504 }
505 });
506 }
507
508 fn is_self_connect(&self, addr: SocketAddr) -> bool {
510 let listening_addr = self.listening_addr().unwrap();
512
513 match listening_addr.ip().is_loopback() {
514 true => listening_addr.port() == addr.port(),
517 false => listening_addr.ip() == addr.ip(),
519 }
520 }
521
522 fn can_add_connection(&self) -> bool {
524 let num_connected = self.num_connected();
526 let limit = self.config.max_connections as usize;
528
529 if num_connected >= limit {
530 warn!(parent: self.span(), "Maximum number of active connections ({limit}) reached");
531 false
532 } else if num_connected + self.num_connecting() >= limit {
533 warn!(parent: self.span(), "Maximum number of active & pending connections ({limit}) reached");
534 false
535 } else {
536 true
537 }
538 }
539
540 async fn adapt_stream(&self, stream: TcpStream, peer_addr: SocketAddr, own_side: ConnectionSide) -> io::Result<()> {
542 self.known_peers.add(peer_addr.ip());
543
544 if own_side == ConnectionSide::Initiator {
546 if let Ok(addr) = stream.local_addr() {
547 debug!(
548 parent: self.span(), "establishing connection with {}; the peer is connected on port {}",
549 peer_addr, addr.port()
550 );
551 } else {
552 warn!(parent: self.span(), "couldn't determine the peer's port");
553 }
554 }
555
556 let conn_span = create_connection_span(peer_addr, self.span());
557 let connection = Connection::new(peer_addr, stream, !own_side, conn_span);
558
559 let mut connection = self.enable_protocols(connection).await?;
561
562 let conn_ready_tx = connection.readiness_notifier.take();
564
565 self.connections.add(connection);
566 self.connecting.lock().remove(&peer_addr);
567
568 if let Some(tx) = conn_ready_tx {
570 let _ = tx.send(());
571 }
572
573 if let Some(handler) = self.protocols.on_connect.get() {
575 let (sender, receiver) = oneshot::channel();
576 handler.trigger((peer_addr, sender)).await;
577 if let Ok(handle) = receiver.await {
579 if let Some(conn) = self.connections.0.write().get_mut(&peer_addr) {
581 conn.tasks.push(handle);
582 } else {
583 handle.abort();
585 }
586 }
587 }
588
589 Ok(())
590 }
591
592 async fn enable_protocols(&self, conn: Connection) -> io::Result<Connection> {
594 macro_rules! enable_protocol {
596 ($handler_type: ident, $node:expr, $conn: expr) => {
597 if let Some(handler) = $node.protocols.$handler_type.get() {
598 let (conn_returner, conn_retriever) = oneshot::channel();
599
600 handler.trigger(($conn, conn_returner)).await;
601
602 match conn_retriever.await {
603 Ok(Ok(conn)) => conn,
604 Err(_) => return Err(io::ErrorKind::BrokenPipe.into()),
605 Ok(e) => return e,
606 }
607 } else {
608 $conn
609 }
610 };
611 }
612
613 let mut conn = enable_protocol!(handshake, self, conn);
614
615 if let Some(stream) = conn.stream.take() {
617 let (reader, writer) = split(stream);
618 conn.reader = Some(Box::new(reader));
619 conn.writer = Some(Box::new(writer));
620 }
621
622 let conn = enable_protocol!(reading, self, conn);
623 let conn = enable_protocol!(writing, self, conn);
624
625 Ok(conn)
626 }
627}
628
629impl fmt::Debug for Tcp {
630 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
631 write!(f, "The TCP stack config: {:?}", self.config)
632 }
633}
634
635#[cfg(test)]
636mod tests {
637 use super::*;
638
639 use std::{
640 net::{IpAddr, Ipv4Addr},
641 str::FromStr,
642 };
643
644 #[tokio::test]
645 async fn test_new() {
646 let tcp = Tcp::new(Config {
647 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
648 max_connections: 200,
649 ..Default::default()
650 });
651
652 assert_eq!(tcp.config.max_connections, 200);
653 assert_eq!(tcp.config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
654 assert_eq!(tcp.enable_listener().await.unwrap().ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
655
656 assert_eq!(tcp.num_connected(), 0);
657 assert_eq!(tcp.num_connecting(), 0);
658 }
659
660 #[tokio::test]
661 async fn test_connect() {
662 let tcp = Tcp::new(Config::default());
663 let node_ip = tcp.enable_listener().await.unwrap();
664
665 let result = tcp.connect(node_ip).await;
667 assert!(matches!(result, Err(ConnectError::SelfConnect { .. })));
668
669 assert_eq!(tcp.num_connected(), 0);
670 assert_eq!(tcp.num_connecting(), 0);
671 assert!(!tcp.is_connected(node_ip));
672 assert!(!tcp.is_connecting(node_ip));
673
674 let peer = Tcp::new(Config {
676 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
677 desired_listening_port: Some(0),
678 max_connections: 1,
679 ..Default::default()
680 });
681 let peer_ip = peer.enable_listener().await.unwrap();
682
683 tcp.connect(peer_ip).await.unwrap();
685 assert_eq!(tcp.num_connected(), 1);
686 assert_eq!(tcp.num_connecting(), 0);
687 assert!(tcp.is_connected(peer_ip));
688 assert!(!tcp.is_connecting(peer_ip));
689 }
690
691 #[tokio::test]
692 async fn test_disconnect() {
693 let tcp = Tcp::new(Config::default());
694 let _node_ip = tcp.enable_listener().await.unwrap();
695
696 let peer = Tcp::new(Config {
698 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
699 desired_listening_port: Some(0),
700 max_connections: 1,
701 ..Default::default()
702 });
703 let peer_ip = peer.enable_listener().await.unwrap();
704
705 tcp.connect(peer_ip).await.unwrap();
707 assert_eq!(tcp.num_connected(), 1);
708 assert_eq!(tcp.num_connecting(), 0);
709 assert!(tcp.is_connected(peer_ip));
710 assert!(!tcp.is_connecting(peer_ip));
711
712 let has_disconnected = tcp.disconnect(peer_ip).await;
714 assert!(has_disconnected);
715 assert_eq!(tcp.num_connected(), 0);
716 assert_eq!(tcp.num_connecting(), 0);
717 assert!(!tcp.is_connected(peer_ip));
718 assert!(!tcp.is_connecting(peer_ip));
719
720 let has_disconnected = tcp.disconnect(peer_ip).await;
722 assert!(!has_disconnected);
723 assert_eq!(tcp.num_connected(), 0);
724 assert_eq!(tcp.num_connecting(), 0);
725 assert!(!tcp.is_connected(peer_ip));
726 assert!(!tcp.is_connecting(peer_ip));
727 }
728
729 #[tokio::test]
730 async fn test_can_add_connection() {
731 let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() });
732
733 let peer = Tcp::new(Config {
735 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
736 desired_listening_port: Some(0),
737 max_connections: 1,
738 ..Default::default()
739 });
740 let peer_ip = peer.enable_listener().await.unwrap();
741
742 assert!(tcp.can_add_connection());
743
744 let stream = TcpStream::connect(peer_ip).await.unwrap();
746 tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Initiator, Span::none()));
747 assert!(!tcp.can_add_connection());
748
749 let another_ip = SocketAddr::from_str("1.2.3.4:4242").unwrap();
752 let result = tcp.connect(another_ip).await;
753 assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. })));
754
755 tcp.connections.remove(peer_ip);
757 assert!(tcp.can_add_connection());
758
759 tcp.connecting.lock().insert(peer_ip);
761 assert!(!tcp.can_add_connection());
762
763 let another_ip = SocketAddr::from_str("1.2.3.4:4242").unwrap();
765 let result = tcp.connect(another_ip).await;
766 assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. })));
767
768 tcp.connecting.lock().remove(&peer_ip);
770 assert!(tcp.can_add_connection());
771
772 let stream = TcpStream::connect(peer_ip).await.unwrap();
774 tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Responder, Span::none()));
775 tcp.connecting.lock().insert(peer_ip);
776 assert!(!tcp.can_add_connection());
777
778 tcp.connections.remove(peer_ip);
780 tcp.connecting.lock().remove(&peer_ip);
781 assert!(tcp.can_add_connection());
782 }
783
784 #[tokio::test]
785 async fn test_handle_connection() {
786 let tcp = Tcp::new(Config {
787 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
788 max_connections: 1,
789 ..Default::default()
790 });
791
792 let peer1 = Tcp::new(Config {
794 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
795 desired_listening_port: Some(0),
796 max_connections: 1,
797 ..Default::default()
798 });
799 let peer1_ip = peer1.enable_listener().await.unwrap();
800
801 let stream = TcpStream::connect(peer1_ip).await.unwrap();
803 tcp.connections.add(Connection::new(peer1_ip, stream, ConnectionSide::Responder, Span::none()));
804 assert!(!tcp.can_add_connection());
805 assert_eq!(tcp.num_connected(), 1);
806 assert_eq!(tcp.num_connecting(), 0);
807 assert!(tcp.is_connected(peer1_ip));
808 assert!(!tcp.is_connecting(peer1_ip));
809
810 let peer2 = Tcp::new(Config {
812 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
813 desired_listening_port: Some(0),
814 max_connections: 1,
815 ..Default::default()
816 });
817 let peer2_ip = peer2.enable_listener().await.unwrap();
818
819 let stream = TcpStream::connect(peer2_ip).await.unwrap();
821 tcp.handle_connection(stream, peer2_ip);
822 assert!(!tcp.can_add_connection());
823 assert_eq!(tcp.num_connected(), 1);
824 assert_eq!(tcp.num_connecting(), 0);
825 assert!(tcp.is_connected(peer1_ip));
826 assert!(!tcp.is_connected(peer2_ip));
827 assert!(!tcp.is_connecting(peer1_ip));
828 assert!(!tcp.is_connecting(peer2_ip));
829 }
830
831 #[tokio::test]
832 async fn test_adapt_stream() {
833 let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() });
834
835 let peer = Tcp::new(Config {
837 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
838 desired_listening_port: Some(0),
839 max_connections: 1,
840 ..Default::default()
841 });
842 let peer_ip = peer.enable_listener().await.unwrap();
843
844 tcp.connecting.lock().insert(peer_ip);
846 assert_eq!(tcp.num_connected(), 0);
847 assert_eq!(tcp.num_connecting(), 1);
848 assert!(!tcp.is_connected(peer_ip));
849 assert!(tcp.is_connecting(peer_ip));
850
851 let stream = TcpStream::connect(peer_ip).await.unwrap();
853 tcp.adapt_stream(stream, peer_ip, ConnectionSide::Responder).await.unwrap();
854 assert_eq!(tcp.num_connected(), 1);
855 assert_eq!(tcp.num_connecting(), 0);
856 assert!(tcp.is_connected(peer_ip));
857 assert!(!tcp.is_connecting(peer_ip));
858 }
859}