1use crate::errors::*;
2use crate::p2p::proto::PeerAddr;
3use std::net::SocketAddr;
4use std::time::Duration;
5use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
6use tokio::net::TcpStream;
7use tokio::time;
8use tokio_socks::tcp::Socks5Stream;
9
/// Upper bound on establishing the raw TCP connection in [`connect`]
/// (to the peer itself, or to the proxy when one is configured).
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);

/// Upper bound applied separately to each step of [`handshake`]:
/// once to sending the probe and once to reading the response.
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20);

/// Upper bound on the socks5 connection request that [`connect`] issues
/// over an already established connection to the proxy.
pub const PROXY_TIMEOUT: Duration = Duration::from_secs(30);
13
14pub async fn connect(addr: &PeerAddr, proxy: Option<SocketAddr>) -> Result<TcpStream> {
15 let PeerAddr::Inet(addr) = addr else {
16 bail!("Connecting to onions is not yet implemented")
17 };
18
19 let target = proxy.unwrap_or(*addr);
21
22 info!("Creating tcp connection to {target:?}");
23 let sock = TcpStream::connect(target);
24 let mut sock = time::timeout(CONNECT_TIMEOUT, sock)
25 .await
26 .with_context(|| anyhow!("Connecting to {target:?} timed out"))?
27 .with_context(|| anyhow!("Failed to connect to {target:?}"))?;
28
29 if let Some(proxy) = proxy {
30 debug!("Requesting socks5 connection to {addr:?}");
31 let connect = Socks5Stream::connect_with_socket(sock, addr);
32
33 sock = time::timeout(PROXY_TIMEOUT, connect)
34 .await
35 .with_context(|| anyhow!("Connecting to {addr:?} (with socks5 {proxy:?}) timed out"))?
36 .with_context(|| anyhow!("Failed to connect to {addr:?} (with socks5 {proxy:?})"))?
37 .into_inner()
38 }
39
40 debug!("Connection has been established");
41
42 Ok(sock)
43}
44
45pub async fn handshake<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
46 mut rx: R,
47 mut tx: W,
48) -> Result<()> {
49 debug!("Sending protocol probe to remote peer");
50 time::timeout(HANDSHAKE_TIMEOUT, tx.write_all(b"//\n"))
51 .await
52 .context("Sending handshake protocol probe timed out")?
53 .context("Failed to send protocol probe")?;
54
55 let mut buf = [0u8; 3];
56 time::timeout(HANDSHAKE_TIMEOUT, rx.read_exact(&mut buf))
57 .await
58 .context("Sending handshake protocol probe timed out")?
59 .context("Failed to receive handshake response")?;
60
61 if buf == *b":0\n" {
62 debug!("Remote peer has sent expected response");
63 Ok(())
64 } else {
65 bail!("Invalid handshake response: {buf:?}")
66 }
67}