1use anyhow::{Context, Result};
3use futures::task::SpawnExt;
4use futures::{AsyncRead, AsyncReadExt, AsyncWrite, Stream, StreamExt};
5use rust_i18n::t;
6use std::net::SocketAddr;
7use std::str::FromStr;
8use tor_hsservice::RendRequest;
9use tor_rtcompat::Runtime;
10use tracing::{debug, info, warn};
11
12pub async fn run_proxy_loop<R>(
23 runtime: R,
24 mut rendezvous_requests: impl Stream<Item = RendRequest> + Unpin,
25 local_target: &str,
26) where
27 R: Runtime,
28{
29 let target_addr = local_target.to_string();
30
31 while let Some(rendezvous_req) = rendezvous_requests.next().await {
32 let mut stream_requests = match rendezvous_req.accept().await {
33 Ok(stream) => stream,
34 Err(e) => {
35 debug!("{}", t!("proxy.errors.stream_req", req_err = e));
36 continue;
37 }
38 };
39
40 let target = target_addr.clone();
41 let rt_clone = runtime.clone();
42
43 let spawn_res = runtime.spawn(async move {
44 while let Some(stream_req) = stream_requests.next().await {
45 warn!("{}", t!("proxy.connect"));
46
47 let tor_stream = match stream_req
48 .accept(tor_cell::relaycell::msg::Connected::new_empty())
49 .await
50 {
51 Ok(s) => s,
52 Err(e) => {
53 warn!("{}", t!("proxy.errors.client_error", err = e));
54 continue;
55 }
56 };
57
58 let t_addr = target.clone();
59 let rt_inner = rt_clone.clone();
60
61 let inner_spawn_res = rt_clone.spawn(async move {
62 if let Err(e) = handle_connection(rt_inner, tor_stream, &t_addr).await
63 && !e.to_string().contains("END cell with reason MISC")
64 {
65 warn!("{}", t!("proxy.errors.proxy_error", error = e));
66 }
67 });
68
69 if let Err(e) = inner_spawn_res {
70 warn!("{}: {}", t!("proxy.errors.proxy"), e);
71 }
72 }
73 });
74
75 if let Err(e) = spawn_res {
76 warn!("{}: {}", t!("proxy.errors.task"), e);
77 }
78 }
79}
80
81pub async fn handle_connection<R, S>(runtime: R, tor_stream: S, local_target: &str) -> Result<()>
92where
93 R: Runtime,
94 S: AsyncRead + AsyncWrite + Unpin,
95{
96 debug!("Proxing to {}... ", local_target);
97 let addr: SocketAddr = SocketAddr::from_str(local_target)
98 .with_context(|| t!("proxy.errors.local_address", target = local_target))?;
99
100 let local_stream = runtime
102 .connect(&addr)
103 .await
104 .with_context(|| t!("errors.service_unreachable", target = local_target))?;
105
106 let (mut r_tor, mut w_tor) = tor_stream.split();
107 let (mut r_loc, mut w_loc) = local_stream.split();
108
109 let client_to_server = futures::io::copy(&mut r_tor, &mut w_loc);
111 let server_to_client = futures::io::copy(&mut r_loc, &mut w_tor);
112
113 let (up, down) = futures::future::try_join(client_to_server, server_to_client).await?;
115
116 info!("Stream closed. Up: {} B, Down: {} B", up, down);
117 Ok(())
118}