runnel-rs 0.2.2

A Rust proxy and tunnel toolbox with WireGuard-style, TUN, SOCKS, and TLS-based transports.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
use super::{
    http::TunnelTransport,
    remote::establish_remote_tunnel,
    route::{RouteDecision, Router},
    socks5::{self, TargetAddr},
    udp,
};
use crate::runtime::ClientRuntime;
use anyhow::{Context, Result, bail};
use std::{
    collections::HashMap,
    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    sync::Arc,
};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpStream, UdpSocket},
    sync::{Mutex, mpsc},
    task::JoinHandle,
};
use tokio_rustls::TlsConnector;
use tracing::{debug, info, warn};

/// Shared map from a session key to the channel that feeds that session's task.
///
/// Keys are strings built by the callers in this file (a target's string form,
/// or a `dns-tcp:`-prefixed upstream); values are senders of raw payloads.
type SessionMap = Arc<Mutex<HashMap<String, mpsc::Sender<Vec<u8>>>>>;
/// Shared list of spawned task handles, kept so they can all be aborted later.
type BackgroundTasks = Arc<Mutex<Vec<JoinHandle<()>>>>;

pub(crate) async fn handle_native_http_udp_associate(
    mut inbound: TcpStream,
    peer: SocketAddr,
    args: ClientRuntime,
    router: Arc<Router>,
    connector: TlsConnector,
    host_header: String,
    server_name: String,
) -> Result<()> {
    let bind_ip = match inbound.local_addr()?.ip() {
        IpAddr::V4(ip) if !ip.is_unspecified() => IpAddr::V4(ip),
        IpAddr::V6(ip) if !ip.is_unspecified() => IpAddr::V6(ip),
        _ => IpAddr::V4(Ipv4Addr::LOCALHOST),
    };
    let relay = Arc::new(
        UdpSocket::bind(SocketAddr::new(bind_ip, 0))
            .await
            .context("failed to bind SOCKS UDP relay socket")?,
    );
    let relay_addr = relay.local_addr()?;
    socks5::send_success_bound(&mut inbound, relay_addr)
        .await
        .context("failed to send SOCKS UDP associate reply")?;

    info!(peer = %peer, bind = %relay_addr, "UDP associate established");

    let client_addr = Arc::new(Mutex::new(None::<SocketAddr>));
    let direct_sessions = Arc::new(Mutex::new(HashMap::new()));
    let remote_sessions = Arc::new(Mutex::new(HashMap::new()));
    let tasks = Arc::new(Mutex::new(Vec::new()));

    let result = tokio::select! {
        result = run_udp_association(
            relay,
            client_addr,
            direct_sessions,
            remote_sessions,
            tasks.clone(),
            args,
            router,
            connector,
            host_header,
            server_name,
        ) => result,
        result = wait_for_control_close(&mut inbound) => result,
    };

    abort_background_tasks(tasks).await;
    result
}

/// Receive loop of one UDP association.
///
/// Reads SOCKS5 UDP datagrams from `relay`, remembers the sender of the most
/// recent *well-formed* datagram in `client_addr` (the address replies are
/// forwarded to), and hands each payload to a per-target session:
///
/// * targets for which `args.tun_dns_udp_upstream` yields an upstream go to a
///   DNS-over-TCP session keyed by `dns-tcp:<upstream>` in `remote_sessions`;
/// * otherwise the router decides between a direct session (`direct_sessions`),
///   a remote tunnel session (`remote_sessions`), or dropping the datagram.
///
/// Datagrams that fail to parse are logged and dropped.
///
/// # Errors
///
/// The loop has no normal exit: it only returns when receiving from the relay
/// fails, the router returns an error, or a session cannot be created / fed.
///
/// NOTE(review): a failure for a single target therefore ends the whole
/// association — confirm that this is intended.
/// NOTE(review): the datagram source is not compared with the TCP peer of the
/// control connection (RFC 1928 section 7 asks relays to drop such datagrams);
/// this function does not receive the peer address, so that check would have
/// to be added together with the caller.
// The parameters mirror what the session factories need, hence the long list.
#[allow(clippy::too_many_arguments)]
async fn run_udp_association(
    relay: Arc<UdpSocket>,
    client_addr: Arc<Mutex<Option<SocketAddr>>>,
    direct_sessions: SessionMap,
    remote_sessions: SessionMap,
    tasks: BackgroundTasks,
    args: ClientRuntime,
    router: Arc<Router>,
    connector: TlsConnector,
    host_header: String,
    server_name: String,
) -> Result<()> {
    let mut buf = vec![0_u8; udp::MAX_UDP_FRAME_SIZE];

    loop {
        let (len, sender) = relay
            .recv_from(&mut buf)
            .await
            .context("failed to receive UDP datagram from local SOCKS client")?;

        let packet = match socks5::parse_udp_packet(&buf[..len]) {
            Ok(packet) => packet,
            Err(err) => {
                warn!(peer = %sender, error = %err, "dropping invalid SOCKS UDP packet");
                continue;
            }
        };

        // Record the reply address only after validation, so a malformed or
        // stray datagram cannot redirect the responses of live sessions.
        *client_addr.lock().await = Some(sender);

        let target = packet.target;
        let key = target.to_string();

        if let Some(dns_upstream) = args.tun_dns_udp_upstream(&target) {
            // All queries for one upstream share a single session.
            let dns_key = format!("dns-tcp:{dns_upstream}");
            send_via_session(&remote_sessions, &dns_key, packet.payload, || {
                create_remote_tcp_dns_session(
                    target.clone(),
                    dns_upstream.clone(),
                    relay.clone(),
                    client_addr.clone(),
                    tasks.clone(),
                    args.clone(),
                    connector.clone(),
                    host_header.clone(),
                    server_name.clone(),
                )
            })
            .await?;
            continue;
        }

        match router.decide(&target).await? {
            RouteDecision::Direct => {
                send_via_session(&direct_sessions, &key, packet.payload, || {
                    create_direct_udp_session(
                        target.clone(),
                        relay.clone(),
                        client_addr.clone(),
                        tasks.clone(),
                    )
                })
                .await?;
            }
            RouteDecision::Remote => {
                send_via_session(&remote_sessions, &key, packet.payload, || {
                    create_remote_udp_session(
                        target.clone(),
                        relay.clone(),
                        client_addr.clone(),
                        tasks.clone(),
                        args.clone(),
                        connector.clone(),
                        host_header.clone(),
                        server_name.clone(),
                    )
                })
                .await?;
            }
            RouteDecision::Block => {
                info!(target = %key, route = "block", mode = "native-http", "route decision");
                debug!(target = %key, "dropping blocked UDP target");
            }
        }
    }
}

async fn send_via_session<F, Fut>(
    sessions: &SessionMap,
    key: &str,
    payload: Vec<u8>,
    create: F,
) -> Result<()>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<mpsc::Sender<Vec<u8>>>>,
{
    if let Some(tx) = sessions.lock().await.get(key).cloned() {
        if tx.send(payload.clone()).await.is_ok() {
            return Ok(());
        }
        sessions.lock().await.remove(key);
    }

    let tx = create().await?;
    tx.send(payload)
        .await
        .with_context(|| format!("UDP session for {key} closed before sending payload"))?;
    sessions.lock().await.insert(key.to_owned(), tx);
    Ok(())
}

/// Opens a UDP socket connected straight to `target` and spawns the task that
/// shuttles datagrams between it and the SOCKS client.
///
/// The returned sender feeds outbound payloads to the task; datagrams received
/// from the target are wrapped and forwarded through `relay`. The task stops
/// on the first send, receive or forwarding error, or once every sender has
/// been dropped. Its handle is registered in `tasks`.
///
/// # Errors
///
/// Fails if the socket cannot be bound or connected to `target`.
async fn create_direct_udp_session(
    target: TargetAddr,
    relay: Arc<UdpSocket>,
    client_addr: Arc<Mutex<Option<SocketAddr>>>,
    tasks: BackgroundTasks,
) -> Result<mpsc::Sender<Vec<u8>>> {
    let outbound = UdpSocket::bind(target_bind_addr(&target))
        .await
        .with_context(|| format!("failed to bind direct UDP socket for {}", target))?;
    outbound
        .connect(target.to_string())
        .await
        .with_context(|| format!("failed to connect direct UDP socket for {}", target))?;

    let (tx, mut outgoing) = mpsc::channel::<Vec<u8>>(256);
    let session_target = target;

    let worker = tokio::spawn(async move {
        let mut scratch = vec![0_u8; udp::MAX_UDP_FRAME_SIZE];
        loop {
            tokio::select! {
                queued = outgoing.recv() => {
                    // `None` means every sender is gone: the session is over.
                    let Some(payload) = queued else { break };
                    if let Err(err) = outbound.send(&payload).await {
                        warn!(target = %session_target, error = %err, "direct UDP send failed");
                        break;
                    }
                }
                received = outbound.recv(&mut scratch) => {
                    let n = match received {
                        Ok(n) => n,
                        Err(err) => {
                            warn!(target = %session_target, error = %err, "direct UDP receive failed");
                            break;
                        }
                    };
                    if let Err(err) = forward_udp_response(&relay, &client_addr, &session_target, &scratch[..n]).await {
                        warn!(target = %session_target, error = %err, "direct UDP response forwarding failed");
                        break;
                    }
                }
            }
        }
    });

    tasks.lock().await.push(worker);
    Ok(tx)
}

#[allow(clippy::too_many_arguments)]
async fn create_remote_udp_session(
    target: TargetAddr,
    relay: Arc<UdpSocket>,
    client_addr: Arc<Mutex<Option<SocketAddr>>>,
    tasks: BackgroundTasks,
    args: ClientRuntime,
    connector: TlsConnector,
    host_header: String,
    server_name: String,
) -> Result<mpsc::Sender<Vec<u8>>> {
    let target_string = target.to_string();
    let tunnel = establish_remote_tunnel(
        &args,
        &connector,
        &host_header,
        &server_name,
        &target_string,
        TunnelTransport::Udp,
    )
    .await
    .with_context(|| format!("failed to establish remote UDP tunnel for {target_string}"))?;

    let (mut reader, writer) = tokio::io::split(tunnel);
    let mut writer = writer;
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(256);
    let response_target = target.clone();

    let read_handle = tokio::spawn({
        let relay = relay.clone();
        let client_addr = client_addr.clone();
        let response_target = response_target.clone();
        async move {
            loop {
                match udp::read_frame(&mut reader, udp::MAX_UDP_FRAME_SIZE).await {
                    Ok(payload) => {
                        if let Err(err) =
                            forward_udp_response(&relay, &client_addr, &response_target, &payload)
                                .await
                        {
                            warn!(target = %response_target, error = %err, "remote UDP response forwarding failed");
                            break;
                        }
                    }
                    Err(err) => {
                        if udp::is_eof(&err) {
                            debug!(target = %response_target, "remote UDP tunnel closed");
                        } else {
                            warn!(target = %response_target, error = %err, "remote UDP receive failed");
                        }
                        break;
                    }
                }
            }
        }
    });

    let write_handle = tokio::spawn(async move {
        while let Some(payload) = rx.recv().await {
            if let Err(err) = udp::write_frame(&mut writer, &payload).await {
                warn!(target = %response_target, error = %err, "remote UDP send failed");
                return;
            }
        }
        let _ = writer.shutdown().await;
    });

    let mut handles = tasks.lock().await;
    handles.push(read_handle);
    handles.push(write_handle);
    Ok(tx)
}

/// Spawns a task that answers UDP DNS queries by relaying each one over a
/// remote TCP tunnel to `upstream_target`.
///
/// Every payload received on the returned sender is exchanged, one at a time,
/// via `exchange_remote_dns_over_tcp`; the answer is sent back to the SOCKS
/// client wrapped as a datagram from `response_target`. A failed exchange is
/// logged and the task keeps serving; a failed forward ends the task. The
/// task also ends once every sender has been dropped. Its handle is
/// registered in `tasks`.
///
/// NOTE(review): replies are always labelled with `response_target`, the
/// target passed when the session was created — confirm that one upstream is
/// never shared by queries addressed to different targets.
///
/// # Errors
///
/// Nothing in this function fails today; it always returns `Ok`.
// The parameters are forwarded as-is to the DNS exchange, hence the long list.
#[allow(clippy::too_many_arguments)]
async fn create_remote_tcp_dns_session(
    response_target: TargetAddr,
    upstream_target: TargetAddr,
    relay: Arc<UdpSocket>,
    client_addr: Arc<Mutex<Option<SocketAddr>>>,
    tasks: BackgroundTasks,
    args: ClientRuntime,
    connector: TlsConnector,
    host_header: String,
    server_name: String,
) -> Result<mpsc::Sender<Vec<u8>>> {
    let (tx, mut queries) = mpsc::channel::<Vec<u8>>(256);

    let worker = tokio::spawn(async move {
        while let Some(query) = queries.recv().await {
            let exchanged = exchange_remote_dns_over_tcp(
                &args,
                &connector,
                &host_header,
                &server_name,
                &upstream_target,
                &query,
            )
            .await;

            let answer = match exchanged {
                Ok(answer) => answer,
                Err(err) => {
                    // One failed lookup must not take the session down.
                    warn!(
                        target = %response_target,
                        upstream = %upstream_target,
                        error = %err,
                        "remote TCP DNS exchange failed"
                    );
                    continue;
                }
            };

            if let Err(err) =
                forward_udp_response(&relay, &client_addr, &response_target, &answer).await
            {
                warn!(
                    target = %response_target,
                    upstream = %upstream_target,
                    error = %err,
                    "remote TCP DNS response forwarding failed"
                );
                break;
            }
        }
    });

    tasks.lock().await.push(worker);
    Ok(tx)
}

async fn exchange_remote_dns_over_tcp(
    args: &ClientRuntime,
    connector: &TlsConnector,
    host_header: &str,
    server_name: &str,
    upstream_target: &TargetAddr,
    payload: &[u8],
) -> Result<Vec<u8>> {
    let target_string = upstream_target.to_string();
    let mut tunnel = establish_remote_tunnel(
        args,
        connector,
        host_header,
        server_name,
        &target_string,
        TunnelTransport::Tcp,
    )
    .await
    .with_context(|| format!("failed to establish remote TCP DNS tunnel for {target_string}"))?;

    if payload.len() > u16::MAX as usize {
        bail!("DNS payload exceeded {} bytes", u16::MAX);
    }
    tunnel
        .write_all(&(payload.len() as u16).to_be_bytes())
        .await
        .context("failed to write TCP DNS request length")?;
    tunnel
        .write_all(payload)
        .await
        .context("failed to write TCP DNS request body")?;
    tunnel
        .flush()
        .await
        .context("failed to flush TCP DNS request")?;

    let mut length = [0_u8; 2];
    tunnel
        .read_exact(&mut length)
        .await
        .context("failed to read TCP DNS response length")?;
    let response_len = u16::from_be_bytes(length) as usize;
    let mut response = vec![0_u8; response_len];
    tunnel
        .read_exact(&mut response)
        .await
        .context("failed to read TCP DNS response body")?;
    Ok(response)
}

/// Wraps `payload` as a SOCKS5 UDP datagram originating from `target` and
/// sends it through `relay` to the client address currently stored in
/// `client_addr`.
///
/// When no client address has been recorded yet the response is silently
/// dropped and `Ok(())` is returned.
///
/// # Errors
///
/// Fails if sending on the relay socket fails.
async fn forward_udp_response(
    relay: &UdpSocket,
    client_addr: &Mutex<Option<SocketAddr>>,
    target: &TargetAddr,
    payload: &[u8],
) -> Result<()> {
    let packet = socks5::build_udp_packet(target, payload);

    // Copy the address out so the lock is not held while sending.
    let recipient = *client_addr.lock().await;
    let Some(client) = recipient else {
        return Ok(());
    };

    relay
        .send_to(&packet, client)
        .await
        .map(drop)
        .with_context(|| format!("failed to forward UDP response for {}", target))
}

/// Resolves once the control connection is closed by the peer.
///
/// Any bytes that arrive on `stream` in the meantime are read and discarded;
/// a zero-length read (EOF) ends the wait.
///
/// # Errors
///
/// Propagates the first read error from `stream`.
async fn wait_for_control_close(stream: &mut TcpStream) -> Result<()> {
    let mut scratch = [0_u8; 1];
    // Keep draining until `read` reports EOF by returning 0.
    while stream.read(&mut scratch).await? != 0 {}
    Ok(())
}

/// Aborts every task handle registered in `tasks` and leaves the list empty.
async fn abort_background_tasks(tasks: BackgroundTasks) {
    tasks
        .lock()
        .await
        .drain(..)
        .for_each(|handle| handle.abort());
}

/// Picks the wildcard address (port 0) a direct UDP socket should bind to
/// for `target`.
///
/// An IPv6 literal target gets the IPv6 unspecified address; every other
/// target gets the IPv4 unspecified address.
fn target_bind_addr(target: &TargetAddr) -> SocketAddr {
    let wildcard = if matches!(target, TargetAddr::Ip(IpAddr::V6(_), _)) {
        IpAddr::V6(Ipv6Addr::UNSPECIFIED)
    } else {
        IpAddr::V4(Ipv4Addr::UNSPECIFIED)
    };
    SocketAddr::new(wildcard, 0)
}