use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufReader};
use safelog::sensitive;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use tracing::{debug, instrument, warn};
#[allow(unused)]
use arti_client::HasKind;
use arti_client::{ErrorKind, IntoTorAddr as _, StreamPrefs};
#[cfg(feature = "rpc")]
use tor_rpcbase::{self as rpc};
use tor_rtcompat::Runtime;
use tor_socksproto::{Handshake as _, SocksAddr, SocksAuth, SocksCmd, SocksRequest};
use anyhow::{Context, Result, anyhow};
use super::{
ListenerIsolation, ProvidedIsolation, ProxyContext, StreamIsolationKey, write_all_and_close,
write_all_and_flush,
};
// Select what `ConnTarget` means for this build.
//
// Without the `rpc` feature it is simply an alias for `TorClient`; with the
// feature enabled, the dedicated type from the RPC module is used instead.
cfg_if::cfg_if! {
    if #[cfg(not(feature = "rpc"))] {
        use arti_client::TorClient;
        /// The object on which outgoing streams are opened.
        type ConnTarget<R> = TorClient<R>;
    } else {
        use crate::rpc::conntarget::ConnTarget;
    }
}
/// A canned HTTP `501` response telling the user that this port speaks SOCKS,
/// not HTTP.
///
/// Only defined when the `http-connect` feature is disabled.
/// (Presumably written back by the listener when it sees an HTTP request on a
/// SOCKS port; the call site is not in this part of the file.)
///
/// The empty line after the `Content-Type` header is significant: HTTP
/// requires an empty line between the header section and the message body.
/// Without it the HTML would be parsed as further (malformed) header lines.
#[cfg(not(feature = "http-connect"))]
pub(super) const WRONG_PROTOCOL_PAYLOAD: &[u8] = br#"HTTP/1.0 501 Not running as an HTTP Proxy
Content-Type: text/html; charset=utf-8

<!DOCTYPE html>
<html>
<head>
<title>This is a SOCKS Proxy, Not An HTTP Proxy</title>
</head>
<body>
<h1>This is a SOCKS proxy, not an HTTP proxy.</h1>
<p>
It appears you have configured your web browser to use this Tor port as
an HTTP proxy.
</p>
<p>
This is not correct: This port is configured as a SOCKS proxy, not
an HTTP proxy. If you need an HTTP proxy tunnel,
build Arti with the <code>http-connect</code> feature enabled.
</p>
<p>
See <a href="https://gitlab.torproject.org/tpo/core/arti/#todo-need-to-change-when-arti-get-a-user-documentation">https://gitlab.torproject.org/tpo/core/arti</a> for more information.
</p>
</body>
</html>"#;
/// Choose the IP-version preferences for a stream to `addr`, as asked for by
/// the SOCKS request `req`.
///
/// * A literal IPv4 address yields IPv4-only; a literal IPv6 address yields
///   IPv6-only.
/// * Otherwise (e.g. a hostname), a SOCKS4 request yields IPv4-only.
/// * In every remaining case IPv4 is preferred, but not required.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
fn stream_preference(req: &SocksRequest, addr: &str) -> StreamPrefs {
    let mut prefs = StreamPrefs::new();
    // A string can parse as at most one of the two address families, so a
    // single `IpAddr` parse distinguishes all the literal-address cases.
    match addr.parse::<IpAddr>() {
        Ok(IpAddr::V4(_)) => {
            prefs.ipv4_only();
        }
        Ok(IpAddr::V6(_)) => {
            prefs.ipv6_only();
        }
        Err(_) if req.version() == tor_socksproto::SocksVersion::V4 => {
            prefs.ipv4_only();
        }
        Err(_) => {
            prefs.ipv4_preferred();
        }
    }
    prefs
}
/// The result of interpreting the authentication data of a SOCKS request.
///
/// Produced by `interpret_socks_auth`.
struct AuthInterpretation {
/// The RPC object ID carried in an extended SOCKS username, if there was one.
///
/// Only present when built with the `rpc` feature.
#[cfg(feature = "rpc")]
rpc_object: Option<rpc::ObjectId>,
/// The isolation information derived from the authentication data.
isolation: ProvidedIsolation,
}
/// Work out what the authentication data in a SOCKS request is asking for.
///
/// A username/password pair whose username begins with `<torS0X>` uses the
/// extended format: the byte after that prefix is a format code.
///
/// * Format code `0`: nothing may follow the code; the password supplies the
///   isolation.
/// * Format code `1`: the rest of the username is a (non-empty, UTF-8) RPC
///   object ID, and the password supplies the isolation. This is rejected
///   when built without the `rpc` feature.
///
/// Every other kind of authentication, and every username without the
/// prefix, is treated as legacy SOCKS isolation and kept verbatim.
///
/// # Errors
///
/// Returns an error if the extended prefix is present but the format code is
/// missing or unrecognized, or if the data following it is invalid for that
/// format code.
fn interpret_socks_auth(auth: &SocksAuth) -> Result<AuthInterpretation> {
    /// A SOCKS5 username, classified by whether it has the extension prefix.
    enum ParsedUsername<'a> {
        /// No extension prefix was found.
        Legacy,
        /// The prefix was found: the format-code byte and whatever follows it.
        Extended(u8, &'a [u8]),
    }

    /// Classify `raw`, failing if the prefix is present with nothing after it.
    fn parse_username(raw: &[u8]) -> Result<ParsedUsername<'_>> {
        /// Prefix that marks a username as carrying extended information.
        const EXTENDED_PREFIX: &[u8] = b"<torS0X>";

        match raw.strip_prefix(EXTENDED_PREFIX) {
            None => Ok(ParsedUsername::Legacy),
            Some(tail) => {
                let (code, payload) = tail
                    .split_first()
                    .ok_or_else(|| anyhow!("Extended SOCKS information without format code."))?;
                Ok(ParsedUsername::Extended(*code, payload))
            }
        }
    }

    let isolation = if let SocksAuth::Username(user, pass) = auth {
        match parse_username(user)? {
            ParsedUsername::Legacy => ProvidedIsolation::LegacySocks(auth.clone()),
            ParsedUsername::Extended(b'1', payload) if payload.is_empty() => {
                return Err(anyhow!("Received empty RPC object ID"));
            }
            ParsedUsername::Extended(format_code @ b'1', payload) => {
                // Exactly one of the two `return`s below is compiled in.
                #[cfg(not(feature = "rpc"))]
                return Err(anyhow!(
                    "Received RPC object ID, but not built with support for RPC"
                ));
                #[cfg(feature = "rpc")]
                return Ok(AuthInterpretation {
                    rpc_object: Some(rpc::ObjectId::from(
                        std::str::from_utf8(payload).context("Rpc object ID was not utf-8")?,
                    )),
                    isolation: ProvidedIsolation::ExtendedSocks {
                        format_code,
                        isolation: pass.clone().into(),
                    },
                });
            }
            ParsedUsername::Extended(format_code @ b'0', payload) if payload.is_empty() => {
                ProvidedIsolation::ExtendedSocks {
                    format_code,
                    isolation: pass.clone().into(),
                }
            }
            ParsedUsername::Extended(b'0', _) => {
                return Err(anyhow!("Extraneous information in SOCKS username field."));
            }
            ParsedUsername::Extended(..) => {
                return Err(anyhow!("Unrecognized SOCKS format code"));
            }
        }
    } else {
        ProvidedIsolation::LegacySocks(auth.clone())
    };

    Ok(AuthInterpretation {
        #[cfg(feature = "rpc")]
        rpc_object: None,
        isolation,
    })
}
impl<R: Runtime> super::ProxyContext<R> {
    /// Compute the stream preferences and the connection target to use for
    /// `request`.
    ///
    /// The preferences combine the IP-version choice for `target_addr` with an
    /// isolation key built from `conn_isolation` and the request's
    /// authentication data.
    ///
    /// When built with the `rpc` feature and the authentication data names an
    /// RPC object, that object is looked up in the RPC manager and becomes the
    /// target. Otherwise the target is a clone of this context's Tor client.
    ///
    /// # Errors
    ///
    /// Fails if the authentication data cannot be interpreted, or (with `rpc`)
    /// if there is no RPC manager or the named object cannot be found.
    fn get_prefs_and_session(
        &self,
        request: &SocksRequest,
        target_addr: &str,
        conn_isolation: ListenerIsolation,
    ) -> Result<(StreamPrefs, ConnTarget<R>)> {
        let mut prefs = stream_preference(request, target_addr);
        let interp = interpret_socks_auth(request.auth())?;
        prefs.set_isolation(StreamIsolationKey(conn_isolation, interp.isolation));

        #[cfg(feature = "rpc")]
        if let Some(object_id) = interp.rpc_object {
            let Some(mgr) = &self.rpc_mgr else {
                return Err(anyhow!("no rpc manager found!?"));
            };
            let (context, object) = mgr
                .lookup_object(&object_id)
                .context("no such session found")?;
            return Ok((prefs, ConnTarget::Rpc { context, object }));
        }

        // No RPC object was requested: fall back to the ordinary client.
        #[cfg(not(feature = "rpc"))]
        let target = self.tor_client.clone();
        #[cfg(feature = "rpc")]
        let target = ConnTarget::Client(Box::new(self.tor_client.clone()));

        Ok((prefs, target))
    }
}
#[instrument(skip_all, level = "trace")]
pub(super) async fn handle_socks_conn<R, S>(
context: ProxyContext<R>,
mut socks_stream: BufReader<S>,
isolation_info: ListenerIsolation,
) -> Result<()>
where
R: Runtime,
S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let mut handshake = tor_socksproto::SocksProxyHandshake::new();
let mut inbuf = tor_socksproto::Buffer::new();
let request = loop {
use tor_socksproto::NextStep as NS;
let step = handshake.step(&mut inbuf)?;
match step {
NS::Recv(mut recv) => {
let n = socks_stream
.read(recv.buf())
.await
.context("Error while reading SOCKS handshake")?;
recv.note_received(n)?;
}
NS::Send(data) => write_all_and_flush(&mut socks_stream, &data).await?,
NS::Finished(fin) => break fin.into_output_forbid_pipelining()?,
}
};
if !socks_stream.buffer().is_empty() {
let error = tor_socksproto::Error::ForbiddenPipelining;
return reply_error(&mut socks_stream, &request, error.kind()).await;
}
let addr = request.addr().to_string();
let port = request.port();
debug!(
"Got a socks request: {} {}:{}",
request.command(),
sensitive(&addr),
port
);
let (prefs, tor_client) = context.get_prefs_and_session(&request, &addr, isolation_info)?;
match request.command() {
SocksCmd::CONNECT => {
let tor_addr = (addr.clone(), port).into_tor_addr()?;
let tor_stream = tor_client.connect_with_prefs(&tor_addr, &prefs).await;
let tor_stream = match tor_stream {
Ok(s) => s,
Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
};
debug!("Got a stream for {}:{}", sensitive(&addr), port);
let reply = request
.reply(tor_socksproto::SocksStatus::SUCCEEDED, None)
.context("Encoding socks reply")?;
write_all_and_flush(&mut socks_stream, &reply[..]).await?;
let tor_stream = BufReader::with_capacity(super::APP_STREAM_BUF_LEN, tor_stream);
futures_copy::copy_buf_bidirectional(
socks_stream,
tor_stream,
futures_copy::eof::Close,
futures_copy::eof::Close,
)
.await?;
}
SocksCmd::RESOLVE => {
let addr = if let Ok(addr) = addr.parse() {
Ok(addr)
} else {
tor_client
.resolve_with_prefs(&addr, &prefs)
.await
.map_err(|e| e.kind())
.and_then(|addrs| addrs.first().copied().ok_or(ErrorKind::Other))
};
match addr {
Ok(addr) => {
let reply = request
.reply(
tor_socksproto::SocksStatus::SUCCEEDED,
Some(&SocksAddr::Ip(addr)),
)
.context("Encoding socks reply")?;
write_all_and_close(&mut socks_stream, &reply[..]).await?;
}
Err(e) => return reply_error(&mut socks_stream, &request, e).await,
}
}
SocksCmd::RESOLVE_PTR => {
let addr: IpAddr = match addr.parse() {
Ok(ip) => ip,
Err(e) => {
let reply = request
.reply(tor_socksproto::SocksStatus::ADDRTYPE_NOT_SUPPORTED, None)
.context("Encoding socks reply")?;
write_all_and_close(&mut socks_stream, &reply[..]).await?;
return Err(anyhow!(e));
}
};
let hosts = match tor_client.resolve_ptr_with_prefs(addr, &prefs).await {
Ok(hosts) => hosts,
Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
};
if let Some(host) = hosts.into_iter().next() {
let hostname = SocksAddr::Hostname(host.try_into()?);
let reply = request
.reply(tor_socksproto::SocksStatus::SUCCEEDED, Some(&hostname))
.context("Encoding socks reply")?;
write_all_and_close(&mut socks_stream, &reply[..]).await?;
}
}
_ => {
warn!("Dropping request; {:?} is unsupported", request.command());
let reply = request
.reply(tor_socksproto::SocksStatus::COMMAND_NOT_SUPPORTED, None)
.context("Encoding socks reply")?;
write_all_and_close(&mut socks_stream, &reply[..]).await?;
}
};
Ok(())
}
async fn reply_error<W>(
writer: &mut W,
request: &SocksRequest,
error: arti_client::ErrorKind,
) -> Result<()>
where
W: AsyncWrite + Unpin,
{
use {ErrorKind as EK, tor_socksproto::SocksStatus as S};
let status = match error {
EK::RemoteNetworkFailed => S::TTL_EXPIRED,
#[cfg(feature = "onion-service-client")]
EK::OnionServiceNotFound => S::HS_DESC_NOT_FOUND,
#[cfg(feature = "onion-service-client")]
EK::OnionServiceAddressInvalid => S::HS_BAD_ADDRESS,
#[cfg(feature = "onion-service-client")]
EK::OnionServiceMissingClientAuth => S::HS_MISSING_CLIENT_AUTH,
#[cfg(feature = "onion-service-client")]
EK::OnionServiceWrongClientAuth => S::HS_WRONG_CLIENT_AUTH,
#[cfg(feature = "onion-service-client")]
EK::OnionServiceNotRunning
| EK::OnionServiceConnectionFailed
| EK::OnionServiceProtocolViolation => S::HS_INTRO_FAILED,
_ => S::GENERAL_FAILURE,
};
let reply = request
.reply(status, None)
.context("Encoding socks reply")?;
let _ = write_all_and_close(writer, &reply[..]).await;
Err(anyhow!(error))
}