Skip to main content

onionize/
proxy.rs

1// src/proxy.rs
2use anyhow::{Context, Result};
3use futures::task::SpawnExt;
4use futures::{AsyncRead, AsyncReadExt, AsyncWrite, Stream, StreamExt};
5use rust_i18n::t;
6use std::net::SocketAddr;
7use std::str::FromStr;
8use tor_hsservice::RendRequest;
9use tor_rtcompat::Runtime;
10use tracing::{debug, info, warn};
11
12/// Runs the main proxy loop, accepting incoming Tor connections.
13///
14/// This function continuously listens for incoming rendezvous requests from the Tor network,
15/// accepts them, and spawns a new task to handle each connection.
16///
17/// # Arguments
18///
19/// * `runtime` - The runtime used to spawn tasks and connect to local sockets.
20/// * `rendezvous_requests` - The stream of incoming requests from the Onion Service.
21/// * `local_target` - The local address to forward traffic to (e.g., "127.0.0.1:8080").
22pub async fn run_proxy_loop<R>(
23    runtime: R,
24    mut rendezvous_requests: impl Stream<Item = RendRequest> + Unpin,
25    local_target: &str,
26) where
27    R: Runtime,
28{
29    let target_addr = local_target.to_string();
30
31    while let Some(rendezvous_req) = rendezvous_requests.next().await {
32        let mut stream_requests = match rendezvous_req.accept().await {
33            Ok(stream) => stream,
34            Err(e) => {
35                debug!("{}", t!("proxy.errors.stream_req", req_err = e));
36                continue;
37            }
38        };
39
40        let target = target_addr.clone();
41        let rt_clone = runtime.clone();
42
43        let spawn_res = runtime.spawn(async move {
44            while let Some(stream_req) = stream_requests.next().await {
45                warn!("{}", t!("proxy.connect"));
46
47                let tor_stream = match stream_req
48                    .accept(tor_cell::relaycell::msg::Connected::new_empty())
49                    .await
50                {
51                    Ok(s) => s,
52                    Err(e) => {
53                        warn!("{}", t!("proxy.errors.client_error", err = e));
54                        continue;
55                    }
56                };
57
58                let t_addr = target.clone();
59                let rt_inner = rt_clone.clone();
60
61                let inner_spawn_res = rt_clone.spawn(async move {
62                    if let Err(e) = handle_connection(rt_inner, tor_stream, &t_addr).await
63                        && !e.to_string().contains("END cell with reason MISC")
64                    {
65                        warn!("{}", t!("proxy.errors.proxy_error", error = e));
66                    }
67                });
68
69                if let Err(e) = inner_spawn_res {
70                    warn!("{}: {}", t!("proxy.errors.proxy"), e);
71                }
72            }
73        });
74
75        if let Err(e) = spawn_res {
76            warn!("{}: {}", t!("proxy.errors.task"), e);
77        }
78    }
79}
80
81/// Handles a single connection by bridging a Tor stream and a local TCP socket.
82///
83/// This function establishes a connection to the `local_target` and copies data
84/// bidirectionally between the Tor stream and the local socket until one side closes.
85///
86/// # Arguments
87///
88/// * `runtime` - The runtime used to initiate the local TCP connection.
89/// * `tor_stream` - The incoming stream from the Tor network.
90/// * `local_target` - The address of the local service.
91pub async fn handle_connection<R, S>(runtime: R, tor_stream: S, local_target: &str) -> Result<()>
92where
93    R: Runtime,
94    S: AsyncRead + AsyncWrite + Unpin,
95{
96    debug!("Proxing to {}... ", local_target);
97    let addr: SocketAddr = SocketAddr::from_str(local_target)
98        .with_context(|| t!("proxy.errors.local_address", target = local_target))?;
99
100    //
101    let local_stream = runtime
102        .connect(&addr)
103        .await
104        .with_context(|| t!("errors.service_unreachable", target = local_target))?;
105
106    let (mut r_tor, mut w_tor) = tor_stream.split();
107    let (mut r_loc, mut w_loc) = local_stream.split();
108
109    // futures::io::copy work with AsyncRead/AsyncWrite
110    let client_to_server = futures::io::copy(&mut r_tor, &mut w_loc);
111    let server_to_client = futures::io::copy(&mut r_loc, &mut w_tor);
112
113    // Run both directions concurrently
114    let (up, down) = futures::future::try_join(client_to_server, server_to_client).await?;
115
116    info!("Stream closed. Up: {} B, Down: {} B", up, down);
117    Ok(())
118}