1use std::{
17 collections::HashSet,
18 fmt,
19 io,
20 net::{IpAddr, SocketAddr},
21 ops::Deref,
22 sync::{
23 Arc,
24 atomic::{AtomicUsize, Ordering::*},
25 },
26 time::{Duration, Instant},
27};
28
29use anyhow::anyhow;
30#[cfg(feature = "locktick")]
31use locktick::parking_lot::Mutex;
32use once_cell::sync::OnceCell;
33#[cfg(not(feature = "locktick"))]
34use parking_lot::Mutex;
35use tokio::{
36 io::split,
37 net::{TcpListener, TcpSocket, TcpStream},
38 sync::oneshot,
39 task::JoinHandle,
40 time::timeout,
41};
42use tracing::*;
43
44use crate::{
45 BannedPeers,
46 Config,
47 KnownPeers,
48 Stats,
49 connections::{Connection, ConnectionSide, Connections},
50 protocols::{Protocol, Protocols},
51};
52
/// The counter from which `Tcp::new` draws default node names: a node created without a
/// configured name takes the current value (as a string) and increments the counter.
static SEQUENTIAL_NODE_ID: AtomicUsize = AtomicUsize::new(0);
55
/// A handle to the TCP stack.
///
/// The handle wraps an `Arc<InnerTcp>`, so cloning it only clones the `Arc`, and every clone
/// refers to the same underlying state.
#[derive(Clone)]
pub struct Tcp(Arc<InnerTcp>);
59
60impl Deref for Tcp {
61 type Target = Arc<InnerTcp>;
62
63 fn deref(&self) -> &Self::Target {
64 &self.0
65 }
66}
67
/// A marker trait for errors that can be boxed into `ConnectError::ApplicationError`.
///
/// It declares no methods; it only bundles the `Send + Sync + Debug + Display + 'static` bounds.
pub trait ApplicationError: Send + Sync + std::fmt::Debug + std::fmt::Display + 'static {}
70
71#[allow(missing_docs)]
73#[derive(thiserror::Error, Debug)]
74pub enum ConnectError {
75 #[error("already reached the maximum number of {limit} connections")]
76 MaximumConnectionsReached { limit: u16 },
77 #[error("already connecting to node at {address:?}")]
78 AlreadyConnecting { address: SocketAddr },
79 #[error("already connected to node at {address:?}")]
80 AlreadyConnected { address: SocketAddr },
81 #[error("attempt to self-connect (at address {address:?}")]
82 SelfConnect { address: SocketAddr },
83 #[error("rejected a connection attempt from a banned IP '{ip}'")]
84 BannedIp { ip: IpAddr },
85 #[error(transparent)]
87 IoError(std::io::Error),
88 #[error("{0}")]
91 ApplicationError(Box<dyn ApplicationError>),
92 #[error(transparent)]
96 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
97}
98
99impl ConnectError {
100 pub fn application<E: ApplicationError>(err: E) -> Self {
102 Self::ApplicationError(Box::new(err))
103 }
104
105 pub fn other<E: Into<Box<dyn std::error::Error + Send + Sync>>>(err: E) -> Self {
107 Self::Other(err.into())
108 }
109}
110
111impl From<ConnectError> for std::io::Error {
112 fn from(err: ConnectError) -> Self {
113 match err {
114 ConnectError::IoError(err) => err,
115 ConnectError::Other(err) => std::io::Error::other(err),
116 err => std::io::Error::other(err.to_string()),
117 }
118 }
119}
120
121impl From<std::io::Error> for ConnectError {
122 fn from(err: std::io::Error) -> Self {
123 if err.kind() == std::io::ErrorKind::Other {
125 let inner = err.into_inner().unwrap_or_else(|| anyhow!("Unknown error").into());
127 ConnectError::other(inner)
128 } else {
129 ConnectError::IoError(err)
130 }
131 }
132}
133
/// The state shared by all the clones of a `Tcp` handle.
#[doc(hidden)]
pub struct InnerTcp {
    /// The tracing span used as the parent of the node's log events.
    span: Span,
    /// The node's configuration.
    config: Config,
    /// The node's listening address; it is set once, by `enable_listener`.
    listening_addr: OnceCell<SocketAddr>,
    /// The protocol handlers consulted when connections are set up and torn down.
    pub(crate) protocols: Protocols,
    /// The addresses of connections that are being established, but are not active yet.
    connecting: Mutex<HashSet<SocketAddr>>,
    /// The active connections.
    connections: Connections,
    /// The known peers; updated when a stream is adapted and when a connection attempt fails.
    known_peers: KnownPeers,
    /// The banned peers.
    // NOTE(review): nothing in this file consults this collection; presumably it is enforced by
    // the users of `Tcp::banned_peers` - confirm.
    banned_peers: BannedPeers,
    /// The node's statistics; created with the construction `Instant`, which `uptime` reads back.
    stats: Stats,
    /// The handles of spawned tasks; `shut_down` treats the first one as the listening task.
    pub(crate) tasks: Mutex<Vec<JoinHandle<()>>>,
}
157
158impl Tcp {
159 pub fn new(mut config: Config) -> Self {
161 if config.name.is_none() {
163 config.name = Some(SEQUENTIAL_NODE_ID.fetch_add(1, Relaxed).to_string());
164 }
165
166 let span = crate::helpers::create_span(config.name.as_deref().unwrap());
168
169 let tcp = Tcp(Arc::new(InnerTcp {
171 span,
172 config,
173 listening_addr: Default::default(),
174 protocols: Default::default(),
175 connecting: Default::default(),
176 connections: Default::default(),
177 known_peers: Default::default(),
178 banned_peers: Default::default(),
179 stats: Stats::new(Instant::now()),
180 tasks: Default::default(),
181 }));
182
183 debug!(parent: tcp.span(), "The node is ready");
184
185 tcp
186 }
187
188 pub fn uptime(&self) -> Duration {
190 self.stats.timestamp().elapsed()
191 }
192
/// Returns the name assigned to the node.
#[inline]
pub fn name(&self) -> &str {
    // `Tcp::new` fills in a name whenever the configuration lacks one, so this is expected to
    // always be `Some` for nodes created through it.
    self.config.name.as_deref().unwrap()
}
199
/// Returns a reference to the node's configuration.
#[inline]
pub fn config(&self) -> &Config {
    &self.config
}
205
206 pub fn listening_addr(&self) -> io::Result<SocketAddr> {
209 self.listening_addr.get().copied().ok_or_else(|| io::ErrorKind::AddrNotAvailable.into())
210 }
211
/// Checks whether there is an active connection registered under the given address.
pub fn is_connected(&self, addr: SocketAddr) -> bool {
    self.connections.is_connected(addr)
}
216
217 pub fn is_connecting(&self, addr: SocketAddr) -> bool {
219 self.connecting.lock().contains(&addr)
220 }
221
/// Returns the number of active connections.
pub fn num_connected(&self) -> usize {
    self.connections.num_connected()
}
226
227 pub fn num_connecting(&self) -> usize {
229 self.connecting.lock().len()
230 }
231
/// Returns the addresses of the active connections.
pub fn connected_addrs(&self) -> Vec<SocketAddr> {
    self.connections.addrs()
}
236
237 pub fn connecting_addrs(&self) -> Vec<SocketAddr> {
239 self.connecting.lock().iter().copied().collect()
240 }
241
/// Returns a reference to the collection of known peers.
#[inline]
pub fn known_peers(&self) -> &KnownPeers {
    &self.known_peers
}
247
/// Returns a reference to the collection of banned peers.
#[inline]
pub fn banned_peers(&self) -> &BannedPeers {
    &self.banned_peers
}
253
/// Returns a reference to the node's statistics.
#[inline]
pub fn stats(&self) -> &Stats {
    &self.stats
}
259
/// Returns the tracing span used as the parent of the node's log events.
#[inline]
pub fn span(&self) -> &Span {
    &self.span
}
265
266 pub async fn shut_down(&self) {
268 debug!(parent: self.span(), "Shutting down the TCP stack");
269
270 let mut tasks = std::mem::take(&mut *self.tasks.lock()).into_iter();
272
273 if let Some(listening_task) = tasks.next() {
275 listening_task.abort(); }
277 for addr in self.connected_addrs() {
279 self.disconnect(addr).await;
280 }
281 for handle in tasks {
283 handle.abort();
284 }
285 }
286}
287
288impl Tcp {
289 pub async fn connect(&self, addr: SocketAddr) -> Result<(), ConnectError> {
291 if let Ok(listening_addr) = self.listening_addr() {
292 if addr == listening_addr || self.is_self_connect(addr) {
294 error!(parent: self.span(), "Attempted to self-connect ({addr})");
295 return Err(ConnectError::SelfConnect { address: addr });
296 }
297 }
298
299 if !self.can_add_connection() {
300 error!(parent: self.span(), "Too many connections; refusing to connect to {addr}");
301 return Err(ConnectError::MaximumConnectionsReached { limit: self.config.max_connections });
302 }
303
304 if self.is_connected(addr) {
305 trace!(parent: self.span(), "Already connected to {addr}");
306 return Err(ConnectError::AlreadyConnected { address: addr });
307 }
308
309 if !self.connecting.lock().insert(addr) {
310 debug!(parent: self.span(), "Already connecting to {addr}");
311 return Err(ConnectError::AlreadyConnecting { address: addr });
312 }
313
314 let timeout_duration = Duration::from_millis(self.config().connection_timeout_ms.into());
315
316 let res = if let Some(listen_ip) = self.config().listener_ip {
319 timeout(timeout_duration, self.connect_with_specific_interface(listen_ip, addr)).await
320 } else {
321 timeout(timeout_duration, TcpStream::connect(addr)).await
322 };
323
324 let stream = match res {
325 Ok(Ok(stream)) => Ok(stream),
326 Ok(err) => {
327 self.connecting.lock().remove(&addr);
328 err
329 }
330 Err(err) => {
331 self.connecting.lock().remove(&addr);
332 error!("connection timeout error: {}", err);
333 Err(io::ErrorKind::TimedOut.into())
334 }
335 }?;
336
337 let ret = self.adapt_stream(stream, addr, ConnectionSide::Initiator).await;
338
339 if let Err(ref e) = ret {
340 self.connecting.lock().remove(&addr);
341 self.known_peers().register_failure(addr.ip());
342 error!(parent: self.span(), "Unable to initiate a connection with {addr}: {e}");
343 }
344
345 ret.map_err(|err| err.into())
346 }
347
348 async fn connect_with_specific_interface(&self, listen_ip: IpAddr, addr: SocketAddr) -> io::Result<TcpStream> {
349 let sock = if listen_ip.is_ipv4() { TcpSocket::new_v4()? } else { TcpSocket::new_v6()? };
350 sock.bind(SocketAddr::new(listen_ip, 0))?;
352 sock.connect(addr).await
353 }
354
355 pub async fn disconnect(&self, addr: SocketAddr) -> bool {
359 if let Some(conn) = self.connections.0.read().get(&addr) {
361 if conn.disconnecting.swap(true, Relaxed) {
362 return false;
364 }
365 } else {
366 return false;
368 };
369
370 if let Some(handler) = self.protocols.disconnect.get() {
371 let (sender, receiver) = oneshot::channel();
372 handler.trigger((addr, sender));
373 let _ = receiver.await; }
375
376 let conn = self.connections.remove(addr);
377
378 if let Some(ref conn) = conn {
379 debug!(parent: self.span(), "Disconnecting from {}", conn.addr());
380
381 for task in conn.tasks.iter().rev() {
383 task.abort();
384 }
385
386 debug!(parent: self.span(), "Disconnected from {}", conn.addr());
387 } else {
388 warn!(parent: self.span(), "Failed to disconnect, was not connected to {addr}");
389 }
390
391 conn.is_some()
392 }
393}
394
395impl Tcp {
396 pub async fn enable_listener(&self) -> io::Result<SocketAddr> {
398 let listener_ip =
400 self.config().listener_ip.expect("Tcp::enable_listener was called, but Config::listener_ip is not set");
401
402 let listener = self.create_listener(listener_ip).await?;
404
405 let port = listener.local_addr()?.port();
407
408 let listening_addr = (listener_ip, port).into();
410 self.listening_addr.set(listening_addr).expect("The node's listener was started more than once");
411
412 let (tx, rx) = oneshot::channel();
414
415 let tcp = self.clone();
416 let listening_task = tokio::spawn(async move {
417 trace!(parent: tcp.span(), "Spawned the listening task");
418 tx.send(()).unwrap(); loop {
421 match listener.accept().await {
423 Ok((stream, addr)) => tcp.handle_connection(stream, addr),
424 Err(e) => error!(parent: tcp.span(), "Failed to accept a connection: {e}"),
425 }
426 }
427 });
428 self.tasks.lock().push(listening_task);
429 let _ = rx.await;
430 debug!(parent: self.span(), "Listening on {listening_addr}");
431
432 Ok(listening_addr)
433 }
434
435 async fn create_listener(&self, listener_ip: IpAddr) -> io::Result<TcpListener> {
437 debug!("Creating a TCP listener on {listener_ip}...");
438 let listener = if let Some(port) = self.config().desired_listening_port {
439 let desired_listening_addr = SocketAddr::new(listener_ip, port);
441 match TcpListener::bind(desired_listening_addr).await {
443 Ok(listener) => listener,
444 Err(e) => {
445 if self.config().allow_random_port {
446 warn!(
447 parent: self.span(),
448 "Trying any listening port, as the desired port is unavailable: {e}"
449 );
450 let random_available_addr = SocketAddr::new(listener_ip, 0);
451 TcpListener::bind(random_available_addr).await?
452 } else {
453 error!(parent: self.span(), "The desired listening port is unavailable: {e}");
454 return Err(e);
455 }
456 }
457 }
458 } else if self.config().allow_random_port {
459 let random_available_addr = SocketAddr::new(listener_ip, 0);
460 TcpListener::bind(random_available_addr).await?
461 } else {
462 panic!("As 'listener_ip' is set, either 'desired_listening_port' or 'allow_random_port' must be set");
463 };
464
465 Ok(listener)
466 }
467
468 fn handle_connection(&self, stream: TcpStream, addr: SocketAddr) {
470 debug!(parent: self.span(), "Received a connection from {addr}");
471
472 if !self.can_add_connection() || self.is_self_connect(addr) {
473 debug!(parent: self.span(), "Rejecting the connection from {addr}");
474 return;
475 }
476
477 self.connecting.lock().insert(addr);
478
479 let tcp = self.clone();
480 tokio::spawn(async move {
481 if let Err(e) = tcp.adapt_stream(stream, addr, ConnectionSide::Responder).await {
482 tcp.connecting.lock().remove(&addr);
483 tcp.known_peers().register_failure(addr.ip());
484 error!(parent: tcp.span(), "Failed to connect with {addr}: {e}");
485 }
486 });
487 }
488
489 fn is_self_connect(&self, addr: SocketAddr) -> bool {
491 let listening_addr = self.listening_addr().unwrap();
493
494 match listening_addr.ip().is_loopback() {
495 true => listening_addr.port() == addr.port(),
498 false => listening_addr.ip() == addr.ip(),
500 }
501 }
502
503 fn can_add_connection(&self) -> bool {
505 let num_connected = self.num_connected();
507 let limit = self.config.max_connections as usize;
509
510 if num_connected >= limit {
511 warn!(parent: self.span(), "Maximum number of active connections ({limit}) reached");
512 false
513 } else if num_connected + self.num_connecting() >= limit {
514 warn!(parent: self.span(), "Maximum number of active & pending connections ({limit}) reached");
515 false
516 } else {
517 true
518 }
519 }
520
521 async fn adapt_stream(&self, stream: TcpStream, peer_addr: SocketAddr, own_side: ConnectionSide) -> io::Result<()> {
523 self.known_peers.add(peer_addr.ip());
524
525 if own_side == ConnectionSide::Initiator {
527 if let Ok(addr) = stream.local_addr() {
528 debug!(
529 parent: self.span(), "establishing connection with {}; the peer is connected on port {}",
530 peer_addr, addr.port()
531 );
532 } else {
533 warn!(parent: self.span(), "couldn't determine the peer's port");
534 }
535 }
536
537 let connection = Connection::new(peer_addr, stream, !own_side);
538
539 let mut connection = self.enable_protocols(connection).await?;
541
542 let conn_ready_tx = connection.readiness_notifier.take();
544
545 self.connections.add(connection);
546 self.connecting.lock().remove(&peer_addr);
547
548 if let Some(tx) = conn_ready_tx {
550 let _ = tx.send(());
551 }
552
553 if let Some(handler) = self.protocols.on_connect.get() {
555 let (sender, receiver) = oneshot::channel();
556 handler.trigger((peer_addr, sender));
557 let _ = receiver.await; }
559
560 Ok(())
561 }
562
/// Runs the connection through the registered `handshake`, `reading` and `writing` protocol
/// handlers, in that order, and returns the connection they hand back.
///
/// Returns a `BrokenPipe` error if a handler drops its channel without replying, or the error
/// reported by the handler itself.
async fn enable_protocols(&self, conn: Connection) -> io::Result<Connection> {
    // Passes the connection to the given handler (if it is registered) and evaluates to the
    // connection it returns; any failure returns early from `enable_protocols`. Without a
    // registered handler, the connection is passed through untouched.
    macro_rules! enable_protocol {
        ($handler_type: ident, $node:expr, $conn: expr) => {
            if let Some(handler) = $node.protocols.$handler_type.get() {
                // The channel through which the handler gives the connection back.
                let (conn_returner, conn_retriever) = oneshot::channel();

                handler.trigger(($conn, conn_returner));

                match conn_retriever.await {
                    Ok(Ok(conn)) => conn,
                    // The handler dropped the sender without replying.
                    Err(_) => return Err(io::ErrorKind::BrokenPipe.into()),
                    // The only remaining case is `Ok(Err(_))`: the handler's own error.
                    Ok(e) => return e,
                }
            } else {
                $conn
            }
        };
    }

    let mut conn = enable_protocol!(handshake, self, conn);

    // If the stream is still in place after the handshake, split it into the reader and writer
    // halves before the reading and writing protocols are enabled.
    if let Some(stream) = conn.stream.take() {
        let (reader, writer) = split(stream);
        conn.reader = Some(Box::new(reader));
        conn.writer = Some(Box::new(writer));
    }

    let conn = enable_protocol!(reading, self, conn);
    let conn = enable_protocol!(writing, self, conn);

    Ok(conn)
}
598}
599
600impl fmt::Debug for Tcp {
601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602 write!(f, "The TCP stack config: {:?}", self.config)
603 }
604}
605
606#[cfg(test)]
607mod tests {
608 use super::*;
609
610 use std::{
611 net::{IpAddr, Ipv4Addr},
612 str::FromStr,
613 };
614
615 #[tokio::test]
616 async fn test_new() {
617 let tcp = Tcp::new(Config {
618 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
619 max_connections: 200,
620 ..Default::default()
621 });
622
623 assert_eq!(tcp.config.max_connections, 200);
624 assert_eq!(tcp.config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
625 assert_eq!(tcp.enable_listener().await.unwrap().ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
626
627 assert_eq!(tcp.num_connected(), 0);
628 assert_eq!(tcp.num_connecting(), 0);
629 }
630
631 #[tokio::test]
632 async fn test_connect() {
633 let tcp = Tcp::new(Config::default());
634 let node_ip = tcp.enable_listener().await.unwrap();
635
636 let result = tcp.connect(node_ip).await;
638 assert!(matches!(result, Err(ConnectError::SelfConnect { .. })));
639
640 assert_eq!(tcp.num_connected(), 0);
641 assert_eq!(tcp.num_connecting(), 0);
642 assert!(!tcp.is_connected(node_ip));
643 assert!(!tcp.is_connecting(node_ip));
644
645 let peer = Tcp::new(Config {
647 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
648 desired_listening_port: Some(0),
649 max_connections: 1,
650 ..Default::default()
651 });
652 let peer_ip = peer.enable_listener().await.unwrap();
653
654 tcp.connect(peer_ip).await.unwrap();
656 assert_eq!(tcp.num_connected(), 1);
657 assert_eq!(tcp.num_connecting(), 0);
658 assert!(tcp.is_connected(peer_ip));
659 assert!(!tcp.is_connecting(peer_ip));
660 }
661
662 #[tokio::test]
663 async fn test_disconnect() {
664 let tcp = Tcp::new(Config::default());
665 let _node_ip = tcp.enable_listener().await.unwrap();
666
667 let peer = Tcp::new(Config {
669 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
670 desired_listening_port: Some(0),
671 max_connections: 1,
672 ..Default::default()
673 });
674 let peer_ip = peer.enable_listener().await.unwrap();
675
676 tcp.connect(peer_ip).await.unwrap();
678 assert_eq!(tcp.num_connected(), 1);
679 assert_eq!(tcp.num_connecting(), 0);
680 assert!(tcp.is_connected(peer_ip));
681 assert!(!tcp.is_connecting(peer_ip));
682
683 let has_disconnected = tcp.disconnect(peer_ip).await;
685 assert!(has_disconnected);
686 assert_eq!(tcp.num_connected(), 0);
687 assert_eq!(tcp.num_connecting(), 0);
688 assert!(!tcp.is_connected(peer_ip));
689 assert!(!tcp.is_connecting(peer_ip));
690
691 let has_disconnected = tcp.disconnect(peer_ip).await;
693 assert!(!has_disconnected);
694 assert_eq!(tcp.num_connected(), 0);
695 assert_eq!(tcp.num_connecting(), 0);
696 assert!(!tcp.is_connected(peer_ip));
697 assert!(!tcp.is_connecting(peer_ip));
698 }
699
700 #[tokio::test]
701 async fn test_can_add_connection() {
702 let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() });
703
704 let peer = Tcp::new(Config {
706 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
707 desired_listening_port: Some(0),
708 max_connections: 1,
709 ..Default::default()
710 });
711 let peer_ip = peer.enable_listener().await.unwrap();
712
713 assert!(tcp.can_add_connection());
714
715 let stream = TcpStream::connect(peer_ip).await.unwrap();
717 tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Initiator));
718 assert!(!tcp.can_add_connection());
719
720 let another_ip = SocketAddr::from_str("1.2.3.4:4242").unwrap();
723 let result = tcp.connect(another_ip).await;
724 assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. })));
725
726 tcp.connections.remove(peer_ip);
728 assert!(tcp.can_add_connection());
729
730 tcp.connecting.lock().insert(peer_ip);
732 assert!(!tcp.can_add_connection());
733
734 let another_ip = SocketAddr::from_str("1.2.3.4:4242").unwrap();
736 let result = tcp.connect(another_ip).await;
737 assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. })));
738
739 tcp.connecting.lock().remove(&peer_ip);
741 assert!(tcp.can_add_connection());
742
743 let stream = TcpStream::connect(peer_ip).await.unwrap();
745 tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Responder));
746 tcp.connecting.lock().insert(peer_ip);
747 assert!(!tcp.can_add_connection());
748
749 tcp.connections.remove(peer_ip);
751 tcp.connecting.lock().remove(&peer_ip);
752 assert!(tcp.can_add_connection());
753 }
754
755 #[tokio::test]
756 async fn test_handle_connection() {
757 let tcp = Tcp::new(Config {
758 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
759 max_connections: 1,
760 ..Default::default()
761 });
762
763 let peer1 = Tcp::new(Config {
765 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
766 desired_listening_port: Some(0),
767 max_connections: 1,
768 ..Default::default()
769 });
770 let peer1_ip = peer1.enable_listener().await.unwrap();
771
772 let stream = TcpStream::connect(peer1_ip).await.unwrap();
774 tcp.connections.add(Connection::new(peer1_ip, stream, ConnectionSide::Responder));
775 assert!(!tcp.can_add_connection());
776 assert_eq!(tcp.num_connected(), 1);
777 assert_eq!(tcp.num_connecting(), 0);
778 assert!(tcp.is_connected(peer1_ip));
779 assert!(!tcp.is_connecting(peer1_ip));
780
781 let peer2 = Tcp::new(Config {
783 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
784 desired_listening_port: Some(0),
785 max_connections: 1,
786 ..Default::default()
787 });
788 let peer2_ip = peer2.enable_listener().await.unwrap();
789
790 let stream = TcpStream::connect(peer2_ip).await.unwrap();
792 tcp.handle_connection(stream, peer2_ip);
793 assert!(!tcp.can_add_connection());
794 assert_eq!(tcp.num_connected(), 1);
795 assert_eq!(tcp.num_connecting(), 0);
796 assert!(tcp.is_connected(peer1_ip));
797 assert!(!tcp.is_connected(peer2_ip));
798 assert!(!tcp.is_connecting(peer1_ip));
799 assert!(!tcp.is_connecting(peer2_ip));
800 }
801
802 #[tokio::test]
803 async fn test_adapt_stream() {
804 let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() });
805
806 let peer = Tcp::new(Config {
808 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
809 desired_listening_port: Some(0),
810 max_connections: 1,
811 ..Default::default()
812 });
813 let peer_ip = peer.enable_listener().await.unwrap();
814
815 tcp.connecting.lock().insert(peer_ip);
817 assert_eq!(tcp.num_connected(), 0);
818 assert_eq!(tcp.num_connecting(), 1);
819 assert!(!tcp.is_connected(peer_ip));
820 assert!(tcp.is_connecting(peer_ip));
821
822 let stream = TcpStream::connect(peer_ip).await.unwrap();
824 tcp.adapt_stream(stream, peer_ip, ConnectionSide::Responder).await.unwrap();
825 assert_eq!(tcp.num_connected(), 1);
826 assert_eq!(tcp.num_connecting(), 0);
827 assert!(tcp.is_connected(peer_ip));
828 assert!(!tcp.is_connecting(peer_ip));
829 }
830}