use futures::future::FutureExt;
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error as IoError};
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use safelog::sensitive;
use std::io::Result as IoResult;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use tracing::{debug, error, info, warn};
use arti_client::{ErrorKind, HasKind, StreamPrefs, TorClient};
use tor_rtcompat::{Runtime, TcpListener};
use tor_socksproto::{SocksAddr, SocksAuth, SocksCmd, SocksRequest};
use anyhow::{anyhow, Context, Result};
/// Canned HTTP response sent to clients that appear to be speaking HTTP to
/// the SOCKS port.
///
/// The empty line after the `Content-Type` header is required: it separates
/// the header section from the HTML body. Without it an HTTP client would try
/// to interpret the first body line as another header.
const WRONG_PROTOCOL_PAYLOAD: &[u8] = br#"HTTP/1.0 501 Tor is not an HTTP Proxy
Content-Type: text/html; charset=utf-8

<!DOCTYPE html>
<html>
<head>
<title>This is a SOCKS Proxy, Not An HTTP Proxy</title>
</head>
<body>
<h1>This is a SOCKs proxy, not an HTTP proxy.</h1>
<p>
It appears you have configured your web browser to use this Tor port as
an HTTP proxy.
</p><p>
This is not correct: This port is configured as a SOCKS proxy, not
an HTTP proxy. If you need an HTTP proxy tunnel, wait for Arti to
add support for it in place of, or in addition to, socks_port.
Please configure your client accordingly.
</p>
<p>
See <a href="https://gitlab.torproject.org/tpo/core/arti/#todo-need-to-change-when-arti-get-a-user-documentation">https://gitlab.torproject.org/tpo/core/arti</a> for more information.
</p>
</body>
</html>"#;
/// Choose the IP-version preferences for a stream to `addr`, as requested
/// through `req`.
///
/// * A literal IPv4 address means IPv4 only.
/// * A literal IPv6 address means IPv6 only.
/// * Any other address on a SOCKS version 4 request means IPv4 only.
/// * Everything else prefers IPv4.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
fn stream_preference(req: &SocksRequest, addr: &str) -> StreamPrefs {
    let literal_v4 = addr.parse::<Ipv4Addr>().is_ok();
    let literal_v6 = addr.parse::<Ipv6Addr>().is_ok();
    let socks4 = req.version() == tor_socksproto::SocksVersion::V4;

    let mut prefs = StreamPrefs::new();
    if literal_v4 || (!literal_v6 && socks4) {
        prefs.ipv4_only();
    } else if literal_v6 {
        prefs.ipv6_only();
    } else {
        prefs.ipv4_preferred();
    }
    prefs
}
/// Isolation key attached to every stream opened on behalf of a SOCKS client.
///
/// It bundles the `(usize, IpAddr)` isolation info handed to
/// `handle_socks_conn` with the `SocksAuth` taken from the request.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SocksIsolationKey(usize, IpAddr, SocksAuth);

impl arti_client::isolation::IsolationHelper for SocksIsolationKey {
    /// Two keys are compatible exactly when they are equal.
    fn compatible_same_type(&self, other: &Self) -> bool {
        *self == *other
    }

    /// Joining equal keys yields a copy of the key; unequal keys cannot be
    /// joined.
    fn join_same_type(&self, other: &Self) -> Option<Self> {
        (self == other).then(|| self.clone())
    }
}
/// Serve one SOCKS client connection arriving on `socks_stream`.
///
/// The function first drives the SOCKS handshake to completion, then acts on
/// the requested command:
///
/// * `CONNECT` opens a stream through `tor_client` and spawns two tasks on
///   `runtime` that relay bytes in each direction.
/// * `RESOLVE` and `RESOLVE_PTR` perform a lookup through `tor_client` and
///   send the first result back.
/// * Any other command is answered with `COMMAND_NOT_SUPPORTED`.
///
/// `isolation_info` is combined with the request's authentication data into
/// the isolation key set on the stream preferences.
///
/// # Errors
///
/// Returns an error if reading or writing the SOCKS stream fails, if the
/// handshake is rejected, if it does not fit in the 1 KiB handshake buffer,
/// if the peer closes the connection before the handshake completes, or if
/// the Tor client reports a failure.
async fn handle_socks_conn<R, S>(
    runtime: R,
    tor_client: TorClient<R>,
    socks_stream: S,
    isolation_info: (usize, IpAddr),
) -> Result<()>
where
    R: Runtime,
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
    let mut handshake = tor_socksproto::SocksProxyHandshake::new();
    let (mut socks_r, mut socks_w) = socks_stream.split();
    let mut inbuf = [0_u8; 1024];
    let mut n_read = 0;

    let request = loop {
        // Read some more of the handshake into the unused tail of the buffer.
        let n_new = socks_r
            .read(&mut inbuf[n_read..])
            .await
            .context("Error while reading SOCKS handshake")?;
        n_read += n_new;

        // Try to advance the handshake with everything buffered so far.
        let action = match handshake.handshake(&inbuf[..n_read]) {
            Err(_) => {
                // The handshake wants more input. Make sure more input can
                // actually arrive; otherwise we would spin here forever.
                if n_read == inbuf.len() {
                    return Err(anyhow!("SOCKS handshake did not fit in 1KiB buffer"));
                }
                if n_new == 0 {
                    return Err(anyhow!(
                        "SOCKS connection closed before the handshake completed"
                    ));
                }
                continue;
            }
            Ok(Err(e)) => {
                if let tor_socksproto::Error::BadProtocol(version) = e {
                    // These bytes are the first letters of common HTTP
                    // method names; tell such a client it has the wrong
                    // kind of proxy.
                    if [b'C', b'D', b'G', b'H', b'O', b'P', b'T'].contains(&version) {
                        write_all_and_close(&mut socks_w, WRONG_PROTOCOL_PAYLOAD).await?;
                    }
                }
                // No SOCKS error reply here: the handshake itself failed.
                return Err(e.into());
            }
            Ok(Ok(action)) => action,
        };

        if action.drain > 0 {
            // Drop the consumed prefix and move the unconsumed bytes to the
            // front. Only `inbuf[..n_read]` holds data, so the source range
            // must end at `n_read`; a longer range can run past the end of
            // the buffer and panic.
            inbuf.copy_within(action.drain..n_read, 0);
            n_read -= action.drain;
        }
        if !action.reply.is_empty() {
            write_all_and_flush(&mut socks_w, &action.reply).await?;
        }
        if action.finished {
            break handshake.into_request();
        }
    };

    let request = match request {
        Some(r) => r,
        None => {
            warn!("SOCKS handshake succeeded, but couldn't convert into a request.");
            return Ok(());
        }
    };

    let addr = request.addr().to_string();
    let port = request.port();
    debug!(
        "Got a socks request: {} {}:{}",
        request.command(),
        sensitive(&addr),
        port
    );

    // Streams are isolated by (isolation_info, SOCKS authentication).
    let auth = request.auth().clone();
    let (source_address, ip) = isolation_info;
    let mut prefs = stream_preference(&request, &addr);
    prefs.set_isolation(SocksIsolationKey(source_address, ip, auth));

    match request.command() {
        SocksCmd::CONNECT => {
            let tor_stream = tor_client
                .connect_with_prefs((addr.clone(), port), &prefs)
                .await;
            let tor_stream = match tor_stream {
                Ok(s) => s,
                Err(e) => return reply_error(&mut socks_w, &request, e).await,
            };
            debug!("Got a stream for {}:{}", sensitive(&addr), port);

            let reply = request
                .reply(tor_socksproto::SocksStatus::SUCCEEDED, None)
                .context("Encoding socks reply")?;
            write_all_and_flush(&mut socks_w, &reply[..]).await?;

            // Relay each direction in its own task; the tasks outlive this
            // function.
            let (tor_r, tor_w) = tor_stream.split();
            runtime.spawn(copy_interactive(socks_r, tor_w).map(|_| ()))?;
            runtime.spawn(copy_interactive(tor_r, socks_w).map(|_| ()))?;
        }
        SocksCmd::RESOLVE => {
            let addrs = match tor_client.resolve_with_prefs(&addr, &prefs).await {
                Ok(addrs) => addrs,
                Err(e) => return reply_error(&mut socks_w, &request, e).await,
            };
            // NOTE(review): when the lookup yields no addresses, no reply is
            // written and the connection is simply dropped.
            if let Some(addr) = addrs.first() {
                let reply = request
                    .reply(
                        tor_socksproto::SocksStatus::SUCCEEDED,
                        Some(&SocksAddr::Ip(*addr)),
                    )
                    .context("Encoding socks reply")?;
                write_all_and_close(&mut socks_w, &reply[..]).await?;
            }
        }
        SocksCmd::RESOLVE_PTR => {
            // A reverse lookup only makes sense for a literal IP address.
            let addr: IpAddr = match addr.parse() {
                Ok(ip) => ip,
                Err(e) => {
                    let reply = request
                        .reply(tor_socksproto::SocksStatus::ADDRTYPE_NOT_SUPPORTED, None)
                        .context("Encoding socks reply")?;
                    write_all_and_close(&mut socks_w, &reply[..]).await?;
                    return Err(anyhow!(e));
                }
            };
            let hosts = match tor_client.resolve_ptr_with_prefs(addr, &prefs).await {
                Ok(hosts) => hosts,
                Err(e) => return reply_error(&mut socks_w, &request, e).await,
            };
            if let Some(host) = hosts.into_iter().next() {
                let hostname = SocksAddr::Hostname(host.try_into()?);
                let reply = request
                    .reply(tor_socksproto::SocksStatus::SUCCEEDED, Some(&hostname))
                    .context("Encoding socks reply")?;
                write_all_and_close(&mut socks_w, &reply[..]).await?;
            }
        }
        _ => {
            warn!("Dropping request; {:?} is unsupported", request.command());
            let reply = request
                .reply(tor_socksproto::SocksStatus::COMMAND_NOT_SUPPORTED, None)
                .context("Encoding socks reply")?;
            write_all_and_close(&mut socks_w, &reply[..]).await?;
        }
    };

    Ok(())
}
/// Write all of `buf` to `writer`, then flush it.
///
/// # Errors
///
/// Returns the write or flush failure, annotated with which step failed.
async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    let write_outcome = writer.write_all(buf).await;
    write_outcome.context("Error while writing SOCKS reply")?;

    let flush_outcome = writer.flush().await;
    flush_outcome.context("Error while flushing SOCKS stream")
}
/// Write all of `buf` to `writer`, then close it.
///
/// # Errors
///
/// Returns the write or close failure, annotated with which step failed.
async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    let write_outcome = writer.write_all(buf).await;
    write_outcome.context("Error while writing SOCKS reply")?;

    let close_outcome = writer.close().await;
    close_outcome.context("Error while closing SOCKS stream")
}
/// Report `error` to the SOCKS client behind `writer`, then return it.
///
/// A `RemoteNetworkTimeout` error kind is reported as `TTL_EXPIRED`; every
/// other kind is reported as `GENERAL_FAILURE`. Sending the reply is best
/// effort: a failure to write it is ignored.
///
/// # Errors
///
/// Always returns `Err`: either the reply could not be encoded, or (the
/// normal case) `error` itself wrapped as an `anyhow` error.
async fn reply_error<W>(
    writer: &mut W,
    request: &SocksRequest,
    error: arti_client::Error,
) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    let status = match error.kind() {
        ErrorKind::RemoteNetworkTimeout => tor_socksproto::SocksStatus::TTL_EXPIRED,
        _ => tor_socksproto::SocksStatus::GENERAL_FAILURE,
    };
    let reply = request
        .reply(status, None)
        .context("Encoding socks reply")?;

    // The caller gets the original error whether or not the client could be
    // told about it.
    let _ = write_all_and_close(writer, &reply[..]).await;
    Err(anyhow!(error))
}
/// Copy everything from `reader` to `writer` until `reader` reports EOF or
/// an error occurs.
///
/// The writer is flushed only when a read would have to wait. Data that is
/// immediately available is written without flushing in between.
///
/// After the loop the writer is closed if the reader ended cleanly, and only
/// flushed if the reader failed.
///
/// # Errors
///
/// Returns the first error encountered: a read or write failure from the
/// copy loop, or otherwise the failure of the final close.
async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    use futures::{poll, task::Poll};

    let mut buf = [0_u8; 1024];

    let loop_result: IoResult<()> = loop {
        let mut read_future = reader.read(&mut buf[..]);
        // Poll once without waiting to find out whether data is ready.
        match poll!(&mut read_future) {
            Poll::Ready(Err(e)) => break Err(e),
            Poll::Ready(Ok(0)) => break Ok(()), // EOF
            Poll::Ready(Ok(n)) => {
                writer.write_all(&buf[..n]).await?;
                continue;
            }
            // Nothing to read right now: push out what we have written.
            Poll::Pending => writer.flush().await?,
        }

        // The read was pending, so now actually wait for it.
        match read_future.await {
            Err(e) => break Err(e),
            Ok(0) => break Ok(()), // EOF
            Ok(n) => writer.write_all(&buf[..n]).await?,
        }
    };

    // Only do a proper close when the reader ended cleanly; otherwise just
    // flush whatever is still buffered.
    let flush_result = if loop_result.is_ok() {
        writer.close().await
    } else {
        writer.flush().await
    };

    // Report the first failure. `and` keeps a loop error even if the final
    // flush succeeded, and surfaces a failed close after a clean EOF.
    loop_result.and(flush_result)
}
/// Return true if `err`, received while accepting a connection, should be
/// treated as fatal.
///
/// The only non-fatal errors are those whose raw OS error code is `EMFILE`
/// or `ENFILE` on Unix, or `WSAEMFILE` on Windows. Errors without a raw OS
/// code are fatal.
fn accept_err_is_fatal(err: &IoError) -> bool {
    /// Whether `code` is one of the OS error codes we tolerate.
    #[cfg(unix)]
    fn is_tolerated(code: i32) -> bool {
        code == libc::EMFILE || code == libc::ENFILE
    }

    /// Whether `code` is one of the OS error codes we tolerate.
    #[cfg(windows)]
    fn is_tolerated(code: i32) -> bool {
        // Cast so the constant has the type returned by `raw_os_error()`.
        code == winapi::shared::winerror::WSAEMFILE as i32
    }

    /// No tolerated OS error codes are known on other platforms.
    #[cfg(not(any(unix, windows)))]
    fn is_tolerated(_code: i32) -> bool {
        false
    }

    err.raw_os_error().map_or(true, |code| !is_tolerated(code))
}
/// Run a SOCKS proxy on `socks_port` of the IPv4 and IPv6 loopback addresses,
/// serving requests through `tor_client`.
///
/// Every accepted connection is handled by `handle_socks_conn` in its own
/// task spawned on `runtime`. The index of the accepting listener and the
/// peer's IP address are passed along as isolation info.
///
/// # Errors
///
/// Returns an error if neither loopback address could be bound, if accepting
/// a connection fails with an error `accept_err_is_fatal` considers fatal, or
/// if a task cannot be spawned.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) async fn run_socks_proxy<R: Runtime>(
    runtime: R,
    tor_client: TorClient<R>,
    socks_port: u16,
) -> Result<()> {
    // Try to listen on both loopback addresses; one of them is enough.
    let loopbacks: [IpAddr; 2] = [Ipv4Addr::LOCALHOST.into(), Ipv6Addr::LOCALHOST.into()];
    let mut bound = Vec::new();
    for ip in loopbacks.iter().copied() {
        let bind_addr = SocketAddr::new(ip, socks_port);
        match runtime.listen(&bind_addr).await {
            Err(e) => warn!("Can't listen on {}: {}", bind_addr, e),
            Ok(listener) => {
                info!("Listening on {:?}.", bind_addr);
                bound.push(listener);
            }
        }
    }
    if bound.is_empty() {
        error!("Couldn't open any SOCKS listeners.");
        return Err(anyhow!("Couldn't open SOCKS listeners"));
    }

    // Merge all listeners into one stream of (accept result, listener index).
    let tagged = bound
        .into_iter()
        .enumerate()
        .map(|(listener_id, listener)| {
            TcpListener::incoming(listener).map(move |socket| (socket, listener_id))
        });
    let mut incoming = futures::stream::select_all(tagged);

    while let Some((accepted, sock_id)) = incoming.next().await {
        let (stream, peer) = match accepted {
            Ok(pair) => pair,
            Err(err) if accept_err_is_fatal(&err) => {
                return Err(err).context("Failed to receive incoming stream on SOCKS port");
            }
            Err(err) => {
                warn!("Incoming stream failed: {}", err);
                continue;
            }
        };

        // Each connection gets its own handles and its own task.
        let conn_client = tor_client.clone();
        let conn_runtime = runtime.clone();
        let isolation = (sock_id, peer.ip());
        runtime.spawn(async move {
            let outcome = handle_socks_conn(conn_runtime, conn_client, stream, isolation).await;
            if let Err(e) = outcome {
                warn!("connection exited with error: {}", e);
            }
        })?;
    }

    Ok(())
}