1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use tokio::{io::BufReader, net::TcpStream};
use tracing::{error, info, info_span, warn, Instrument};
use uuid::Uuid;
use crate::shared::{proxy, recv_json, send_json, ClientMessage, ServerMessage, CONTROL_PORT};
pub struct Client {
conn: Option<BufReader<TcpStream>>,
to: String,
local_port: u16,
remote_port: u16,
}
impl Client {
pub async fn new(local_port: u16, to: &str, port: u16) -> Result<Self> {
let stream = TcpStream::connect((to, CONTROL_PORT)).await?;
let mut stream = BufReader::new(stream);
send_json(&mut stream, ClientMessage::Hello(port)).await?;
let remote_port = match recv_json(&mut stream, &mut Vec::new()).await? {
Some(ServerMessage::Hello(remote_port)) => remote_port,
Some(ServerMessage::Error(message)) => bail!("server error: {message}"),
Some(_) => bail!("unexpected initial non-hello message"),
None => bail!("unexpected EOF"),
};
info!(remote_port, "connected to server");
info!("listening at {to}:{remote_port}");
Ok(Client {
conn: Some(stream),
to: to.to_string(),
local_port,
remote_port,
})
}
pub fn remote_port(&self) -> u16 {
self.remote_port
}
pub async fn listen(mut self) -> Result<()> {
let mut conn = self.conn.take().unwrap();
let this = Arc::new(self);
let mut buf = Vec::new();
loop {
let msg = recv_json(&mut conn, &mut buf).await?;
match msg {
Some(ServerMessage::Hello(_)) => warn!("unexpected hello"),
Some(ServerMessage::Heartbeat) => (),
Some(ServerMessage::Connection(id)) => {
let this = Arc::clone(&this);
tokio::spawn(
async move {
info!("new connection");
match this.handle_connection(id).await {
Ok(_) => info!("connection exited"),
Err(err) => warn!(%err, "connection exited with error"),
}
}
.instrument(info_span!("proxy", %id)),
);
}
Some(ServerMessage::Error(err)) => error!(%err, "server error"),
None => return Ok(()),
}
}
}
async fn handle_connection(&self, id: Uuid) -> Result<()> {
let local_conn = TcpStream::connect(("localhost", self.local_port))
.await
.context("failed TCP connection to local port")?;
let mut remote_conn = TcpStream::connect((&self.to[..], CONTROL_PORT))
.await
.context("failed TCP connection to remote port")?;
send_json(&mut remote_conn, ClientMessage::Accept(id)).await?;
proxy(local_conn, remote_conn).await?;
Ok(())
}
}