apt_swarm/
net.rs

1use crate::errors::*;
2use crate::p2p::proto::PeerAddr;
3use std::net::SocketAddr;
4use std::time::Duration;
5use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
6use tokio::net::TcpStream;
7use tokio::time;
8use tokio_socks::tcp::Socks5Stream;
9
/// Upper bound for establishing the raw tcp connection in [`connect`]
/// (to the peer itself, or to the socks5 proxy when one is configured).
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
/// Upper bound applied separately to each step of [`handshake`]:
/// once for sending the probe and once for reading the response.
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20);
/// Upper bound for the socks5 request issued through the proxy in [`connect`],
/// counted after the tcp connection to the proxy is already open.
pub const PROXY_TIMEOUT: Duration = Duration::from_secs(30);
13
14pub async fn connect(addr: &PeerAddr, proxy: Option<SocketAddr>) -> Result<TcpStream> {
15    let PeerAddr::Inet(addr) = addr else {
16        bail!("Connecting to onions is not yet implemented")
17    };
18
19    // TODO: only do this for PeerAddr::Inet
20    let target = proxy.unwrap_or(*addr);
21
22    info!("Creating tcp connection to {target:?}");
23    let sock = TcpStream::connect(target);
24    let mut sock = time::timeout(CONNECT_TIMEOUT, sock)
25        .await
26        .with_context(|| anyhow!("Connecting to {target:?} timed out"))?
27        .with_context(|| anyhow!("Failed to connect to {target:?}"))?;
28
29    if let Some(proxy) = proxy {
30        debug!("Requesting socks5 connection to {addr:?}");
31        let connect = Socks5Stream::connect_with_socket(sock, addr);
32
33        sock = time::timeout(PROXY_TIMEOUT, connect)
34            .await
35            .with_context(|| anyhow!("Connecting to {addr:?} (with socks5 {proxy:?}) timed out"))?
36            .with_context(|| anyhow!("Failed to connect to {addr:?} (with socks5 {proxy:?})"))?
37            .into_inner()
38    }
39
40    debug!("Connection has been established");
41
42    Ok(sock)
43}
44
45pub async fn handshake<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
46    mut rx: R,
47    mut tx: W,
48) -> Result<()> {
49    debug!("Sending protocol probe to remote peer");
50    time::timeout(HANDSHAKE_TIMEOUT, tx.write_all(b"//\n"))
51        .await
52        .context("Sending handshake protocol probe timed out")?
53        .context("Failed to send protocol probe")?;
54
55    let mut buf = [0u8; 3];
56    time::timeout(HANDSHAKE_TIMEOUT, rx.read_exact(&mut buf))
57        .await
58        .context("Sending handshake protocol probe timed out")?
59        .context("Failed to receive handshake response")?;
60
61    if buf == *b":0\n" {
62        debug!("Remote peer has sent expected response");
63        Ok(())
64    } else {
65        bail!("Invalid handshake response: {buf:?}")
66    }
67}