// Submodules of the proxy implementation, declared through `semipublic_mod!`.
// NOTE(review): the macro is defined elsewhere; presumably it adjusts the
// visibility of these modules depending on crate features — confirm there.
semipublic_mod! {
// HTTP CONNECT handling; only compiled with the `http-connect` feature.
#[cfg(feature="http-connect")]
mod http_connect;
// SOCKS handling.
mod socks;
// Types describing the ports the proxy listens on (used by `StreamProxy::port_info`).
pub(crate) mod port_info;
}
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, Error as IoError};
use futures::stream::StreamExt;
use std::net::IpAddr;
use std::sync::Arc;
use tor_basic_utils::error_sources::ErrorSources;
use tor_rtcompat::{NetStreamProvider, SpawnExt};
use tracing::{debug, error, info, instrument, warn};
#[allow(unused)]
use arti_client::HasKind;
use arti_client::TorClient;
#[cfg(feature = "rpc")]
use arti_rpcserver::RpcMgr;
use tor_config::Listen;
use tor_error::warn_report;
use tor_rtcompat::{NetStreamListener, Runtime};
use tor_socksproto::SocksAuth;
use anyhow::{Context, Result, anyhow};
/// Uninhabited stand-in for `arti_rpcserver::RpcMgr`, defined only when the `rpc`
/// feature is disabled.
///
/// It lets signatures in this file mention `Option<Arc<RpcMgr>>` regardless of the
/// feature; because the enum has no variants, no value of it can be constructed.
#[cfg(not(feature = "rpc"))]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) enum RpcMgr {}
/// Isolation key used by the proxy code.
///
/// Pairs the listener-derived isolation (`ListenerIsolation`) with the isolation
/// supplied by the client (`ProvidedIsolation`). According to the `IsolationHelper`
/// implementation in this file, two keys are compatible only when they are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StreamIsolationKey(ListenerIsolation, ProvidedIsolation);
/// Isolation information supplied by the client of the proxy, one variant per
/// way of supplying it.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProvidedIsolation {
/// Isolation expressed as a plain SOCKS authentication value.
LegacySocks(SocksAuth),
/// Isolation expressed as a format code plus raw isolation bytes.
/// NOTE(review): the precise wire format is decided by the SOCKS handling
/// code, which is not visible here — confirm in the `socks` module.
ExtendedSocks {
/// Code accompanying `isolation` (presumably identifies its format; confirm).
format_code: u8,
/// Raw isolation bytes; when empty, long-lived circuits are not enabled.
isolation: Box<[u8]>,
},
/// Isolation supplied over HTTP CONNECT (only with the `http-connect` feature).
#[cfg(feature = "http-connect")]
Http(http_connect::Isolation),
}
impl arti_client::isolation::IsolationHelper for StreamIsolationKey {
fn compatible_same_type(&self, other: &Self) -> bool {
self == other
}
fn join_same_type(&self, other: &Self) -> Option<Self> {
if self == other {
Some(self.clone())
} else {
None
}
}
fn enables_long_lived_circuits(&self) -> bool {
use ProvidedIsolation as PI;
use SocksAuth as SA;
match &self.1 {
PI::LegacySocks(SA::Socks4(auth)) => !auth.is_empty(),
PI::LegacySocks(SA::Username(uname, pass)) => !(uname.is_empty() && pass.is_empty()),
PI::LegacySocks(_) => false,
PI::ExtendedSocks { isolation, .. } => !isolation.is_empty(),
#[cfg(feature = "http-connect")]
PI::Http(isolation) => !isolation.is_empty(),
}
}
}
/// Capacity, in bytes, of the buffered reader that `handle_proxy_conn` wraps
/// around each incoming application stream.
const APP_STREAM_BUF_LEN: usize = 4096;
// Compile-time check: the application buffer must be at least as large as
// `tor_socksproto::SOCKS_BUF_LEN`.
const _: () = {
assert!(APP_STREAM_BUF_LEN >= tor_socksproto::SOCKS_BUF_LEN);
};
// Intentionally empty module, compiled only with the `rpc` feature.
// NOTE(review): presumably kept as an anchor for documentation about how
// SOCKS and RPC interact — confirm before removing.
#[cfg(feature = "rpc")]
#[allow(dead_code)]
mod socks_and_rpc {}
/// Per-connection state handed to the protocol handlers.
///
/// One instance is built for every accepted connection in
/// `run_proxy_with_listeners` by cloning the fields below.
struct ProxyContext<R: Runtime> {
/// Clone of the Tor client given to the proxy.
tor_client: TorClient<R>,
/// Optional RPC manager; the field exists only with the `rpc` feature.
#[cfg(feature = "rpc")]
rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
}
/// Isolation derived from where a connection arrived: the index of the listener
/// that accepted it and the IP address of the connecting peer
/// (built as `(sock_id, addr.ip())` in `run_proxy_with_listeners`).
type ListenerIsolation = (usize, IpAddr);
/// Write all of `buf` to `writer`, then flush it.
///
/// # Errors
///
/// Returns an error (with context attached) if either the write or the flush fails.
/// If the write fails, no flush is attempted.
async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    let write_outcome = writer.write_all(buf).await;
    write_outcome.context("Error while writing proxy reply")?;

    let flush_outcome = writer.flush().await;
    flush_outcome.context("Error while flushing proxy stream")
}
/// Write all of `buf` to `writer`, then close it.
///
/// # Errors
///
/// Returns an error (with context attached) if either the write or the close fails.
/// If the write fails, no close is attempted.
async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    let write_outcome = writer.write_all(buf).await;
    write_outcome.context("Error while writing proxy reply")?;

    let close_outcome = writer.close().await;
    close_outcome.context("Error while closing proxy stream")
}
/// Decide whether an error from accepting a connection should stop the proxy.
///
/// Returns `false` only when the raw OS error code is `EMFILE` or `ENFILE` (Unix)
/// or `WSAEMFILE` (Windows). Every other error, including one that carries no OS
/// error code, is reported as fatal.
fn accept_err_is_fatal(err: &IoError) -> bool {
    // The arms below are selected with `cfg`, so this stays a plain `match`
    // rather than the `matches!` form clippy would suggest.
    #![allow(clippy::match_like_matches_macro)]

    // `WSAEMFILE` re-declared as `i32` so it can be compared with `raw_os_error()`.
    #[cfg(windows)]
    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;

    let transient = match err.raw_os_error() {
        #[cfg(unix)]
        Some(libc::EMFILE | libc::ENFILE) => true,
        #[cfg(windows)]
        Some(WSAEMFILE) => true,
        _ => false,
    };
    !transient
}
/// A proxy whose listeners are already bound but which is not yet serving.
///
/// Produced by `bind_proxy`; call `run_proxy` to start accepting connections.
/// Marked `#[must_use]` because dropping it without running it serves nothing.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[must_use]
pub(crate) struct StreamProxy<R: Runtime> {
/// Tor client handed to each accepted connection.
tor_client: TorClient<R>,
/// The bound listeners, in the order they were opened.
listeners: Vec<<R as NetStreamProvider>::Listener>,
/// Optional RPC manager forwarded to `run_proxy_with_listeners`.
rpc_mgr: Option<Arc<RpcMgr>>,
}
/// Bind the listener sockets described by `listen` and bundle them into a
/// [`StreamProxy`].
///
/// A warning is logged when `listen` is not loopback-only. Every address
/// produced by `listen.ip_addrs()` is bound through `runtime`. On Unix, an
/// `EAFNOSUPPORT` failure is logged and that address is skipped.
///
/// # Errors
///
/// Fails if an address cannot be bound (other than the skipped case above), if
/// a bound listener cannot report its local address, or if no listener at all
/// could be opened (which is also what happens after an invalid listen spec).
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[instrument(skip_all, level = "trace")]
pub(crate) async fn bind_proxy<R: Runtime>(
    runtime: R,
    tor_client: TorClient<R>,
    listen: Listen,
    rpc_mgr: Option<Arc<RpcMgr>>,
) -> Result<StreamProxy<R>> {
    if !listen.is_loopback_only() {
        warn!(
            "Configured to listen for proxy connections on non-local addresses. \
            This is usually insecure! We recommend listening on localhost only."
        );
    }

    let mut listeners = Vec::new();

    match listen.ip_addrs() {
        // An unusable spec is only logged here; the emptiness check below
        // turns it into an error.
        Err(e) => warn_report!(e, "Invalid listen spec"),
        Ok(addrgroups) => {
            // Visit every address of every group, in order.
            for addr in addrgroups.into_iter().flatten() {
                match runtime.listen(&addr).await {
                    Ok(listener) => {
                        let bound_addr = listener.local_addr()?;
                        info!("Listening on {:?}", bound_addr);
                        listeners.push(listener);
                    }
                    // Unsupported address family: skip this address and keep going.
                    #[cfg(unix)]
                    Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
                        warn_report!(e, "Address family not supported {}", addr);
                    }
                    Err(ref e) => {
                        return Err(anyhow!("Can't listen on {}: {e}", addr));
                    }
                }
            }
        }
    }

    // With nothing bound there is nothing to serve.
    if listeners.is_empty() {
        error!("Couldn't open any listeners.");
        return Err(anyhow!("Couldn't open listeners"));
    }

    Ok(StreamProxy {
        tor_client,
        listeners,
        rpc_mgr,
    })
}
impl<R: Runtime> StreamProxy<R> {
    /// Consume the proxy and serve connections on its listeners.
    ///
    /// This simply forwards the stored client, listeners and RPC manager to
    /// `run_proxy_with_listeners` and returns whatever that returns.
    pub(crate) async fn run_proxy(self) -> Result<()> {
        run_proxy_with_listeners(self.tor_client, self.listeners, self.rpc_mgr).await
    }

    /// Describe the ports this proxy is listening on.
    ///
    /// For each listener, in order, the result contains a SOCKS entry for its
    /// local address, followed by an HTTP entry for the same address when the
    /// `http-connect` feature is enabled.
    ///
    /// # Errors
    ///
    /// Fails if any listener cannot report its local address.
    pub(crate) fn port_info(&self) -> Result<Vec<port_info::Port>> {
        let mut ports = Vec::new();
        for listener in &self.listeners {
            let local = listener.local_addr()?;
            ports.push(port_info::Port {
                protocol: port_info::SupportedProtocol::Socks,
                address: local.into(),
            });
            #[cfg(feature = "http-connect")]
            ports.push(port_info::Port {
                protocol: port_info::SupportedProtocol::Http,
                address: local.into(),
            });
        }
        Ok(ports)
    }
}
/// Accept connections on all `listeners` and spawn a handler task for each one.
///
/// The accept streams of all listeners are merged; every accepted connection is
/// tagged with the index of the listener it came from, which (together with the
/// peer's IP address) becomes its listener isolation.
///
/// Returns `Ok(())` once the merged accept stream ends.
///
/// # Errors
///
/// Fails if accepting a connection yields an error that `accept_err_is_fatal`
/// considers fatal, or if spawning the handler task fails. Non-fatal accept
/// errors are logged and skipped; errors from an individual connection are
/// reported from within its task and do not stop the loop.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[instrument(skip_all, level = "trace")]
pub(crate) async fn run_proxy_with_listeners<R: Runtime>(
    tor_client: TorClient<R>,
    listeners: Vec<<R as tor_rtcompat::NetStreamProvider>::Listener>,
    rpc_mgr: Option<Arc<RpcMgr>>,
) -> Result<()> {
    // One accept stream per listener, each item tagged with its listener's index.
    let tagged_streams = listeners
        .into_iter()
        .enumerate()
        .map(|(listener_id, listener)| {
            NetStreamListener::incoming(listener).map(move |accepted| (accepted, listener_id))
        });
    let mut incoming = futures::stream::select_all(tagged_streams);

    while let Some((accepted, listener_id)) = incoming.next().await {
        let (stream, peer_addr) = match accepted {
            Ok(connection) => connection,
            Err(err) if accept_err_is_fatal(&err) => {
                return Err(err).context("Failed to receive incoming stream on proxy port");
            }
            Err(err) => {
                warn_report!(err, "Incoming stream failed");
                continue;
            }
        };

        let proxy_context = ProxyContext {
            tor_client: tor_client.clone(),
            #[cfg(feature = "rpc")]
            rpc_mgr: rpc_mgr.clone(),
        };
        let isolation_info = (listener_id, peer_addr.ip());

        // Each connection gets its own task so a slow client cannot stall accepting.
        tor_client.runtime().spawn(async move {
            if let Err(e) = handle_proxy_conn(proxy_context, stream, isolation_info).await {
                report_proxy_error(e);
            }
        })?;
    }

    Ok(())
}
/// Protocols that can be recognised from the first byte of a proxy connection
/// (see `classify_protocol_from_first_byte`).
enum ProxyProtocols {
/// The first byte was an ASCII letter.
Http1,
/// The first byte was 4 or 5.
Socks,
}
/// Guess the proxy protocol from the first byte a client sent.
///
/// An ASCII letter (`a`-`z` or `A`-`Z`) is classified as HTTP/1, the values 4
/// and 5 as SOCKS. Any other byte yields `None`.
fn classify_protocol_from_first_byte(byte: u8) -> Option<ProxyProtocols> {
    if byte.is_ascii_alphabetic() {
        Some(ProxyProtocols::Http1)
    } else if byte == 4 || byte == 5 {
        Some(ProxyProtocols::Socks)
    } else {
        None
    }
}
/// Handle one accepted proxy connection by detecting its protocol and
/// dispatching to the matching handler.
///
/// The stream is wrapped in a buffered reader of `APP_STREAM_BUF_LEN` bytes and
/// the first available byte is inspected without being consumed:
///
/// * no data at all: return `Ok(())`;
/// * HTTP/1: hand over to `http_connect::handle_http_conn` when the
///   `http-connect` feature is enabled, otherwise send
///   `socks::WRONG_PROTOCOL_PAYLOAD` and close the stream;
/// * SOCKS: hand over to `socks::handle_socks_conn`;
/// * anything else: log a warning and return `Ok(())`.
///
/// # Errors
///
/// Fails if filling the read buffer fails, or with whatever error the selected
/// handler (or the wrong-protocol reply) produces.
async fn handle_proxy_conn<R, S>(
    context: ProxyContext<R>,
    stream: S,
    isolation_info: ListenerIsolation,
) -> Result<()>
where
    R: Runtime,
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
    use futures::AsyncBufReadExt as _;

    let mut stream = BufReader::with_capacity(APP_STREAM_BUF_LEN, stream);

    // Peek at the first byte; it stays in the buffer for the protocol handler.
    let first_byte = match stream.fill_buf().await?.first() {
        Some(&byte) => byte,
        // An empty buffer means there was nothing to read.
        None => return Ok(()),
    };

    match classify_protocol_from_first_byte(first_byte) {
        None => {
            warn!(
                "Unrecognized protocol on proxy listener (first byte {:x})",
                first_byte
            );
            Ok(())
        }
        Some(ProxyProtocols::Socks) => {
            socks::handle_socks_conn(context, stream, isolation_info).await
        }
        Some(ProxyProtocols::Http1) => {
            cfg_if::cfg_if! {
                if #[cfg(feature="http-connect")] {
                    http_connect::handle_http_conn(context, stream, isolation_info).await
                } else {
                    write_all_and_close(&mut stream, socks::WRONG_PROTOCOL_PAYLOAD).await?;
                    Ok(())
                }
            }
        }
    }
}
fn extract_proto_err<'a>(
error: &'a (dyn std::error::Error + 'static),
) -> Option<&'a tor_proto::Error> {
for error in ErrorSources::new(error) {
if let Some(downcast) = error.downcast_ref::<tor_proto::Error>() {
return Some(downcast);
}
}
None
}
fn report_proxy_error(e: anyhow::Error) {
use tor_proto::Error as PE;
match extract_proto_err(e.as_ref()) {
Some(PE::CircuitClosed) => debug!("Connection exited with circuit close"),
_ => warn!("connection exited with error: {}", tor_error::Report(e)),
}
}