pub mod control;
pub mod stats;
#[cfg(test)]
mod mock_control;
#[cfg(test)]
mod mock_socks5;
use super::{
ConnectionState, DiscoveredPeer, PacketTx, ReceivedPacket, Transport, TransportAddr,
TransportError, TransportId, TransportState, TransportType,
};
use crate::config::TorConfig;
use crate::transport::tcp::stream::read_fmp_packet;
use control::{ControlAuth, TorControlClient, TorMonitoringInfo};
use stats::TorStats;
use futures::FutureExt;
use socket2::TcpKeepalive;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_socks::tcp::Socks5Stream;
use tracing::{debug, info, trace, warn};
#[derive(Clone, Debug)]
pub enum TorAddr {
Onion(String, u16),
Clearnet(SocketAddr),
ClearnetHostname(String, u16),
}
fn parse_tor_addr(addr: &TransportAddr) -> Result<TorAddr, TransportError> {
let s = addr.as_str().ok_or_else(|| {
TransportError::InvalidAddress("Tor address must be a valid UTF-8 string".into())
})?;
if s.contains(".onion:") {
let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
TransportError::InvalidAddress(format!("invalid onion address: {}", s))
})?;
let port: u16 = port_str.parse().map_err(|_| {
TransportError::InvalidAddress(format!("invalid port in onion address: {}", s))
})?;
Ok(TorAddr::Onion(host.to_string(), port))
} else if let Ok(socket_addr) = s.parse::<SocketAddr>() {
Ok(TorAddr::Clearnet(socket_addr))
} else {
let (host, port_str) = s.rsplit_once(':').ok_or_else(|| {
TransportError::InvalidAddress(format!("invalid address (expected host:port): {}", s))
})?;
let port: u16 = port_str
.parse()
.map_err(|_| TransportError::InvalidAddress(format!("invalid port: {}", s)))?;
if !host.contains('.') {
return Err(TransportError::InvalidAddress(format!(
"hostname must be fully qualified (contain a dot): {}",
host
)));
}
Ok(TorAddr::ClearnetHostname(host.to_string(), port))
}
}
struct TorConnection {
writer: Arc<Mutex<OwnedWriteHalf>>,
recv_task: JoinHandle<()>,
#[allow(dead_code)]
mtu: u16,
#[allow(dead_code)]
established_at: Instant,
}
type ConnectionPool = Arc<Mutex<HashMap<TransportAddr, TorConnection>>>;
struct ConnectingEntry {
task: JoinHandle<Result<(TcpStream, u16), TransportError>>,
}
type ConnectingPool = Arc<Mutex<HashMap<TransportAddr, ConnectingEntry>>>;
pub struct TorTransport {
transport_id: TransportId,
name: Option<String>,
config: TorConfig,
state: TransportState,
pool: ConnectionPool,
connecting: ConnectingPool,
packet_tx: PacketTx,
stats: Arc<TorStats>,
accept_task: Option<JoinHandle<()>>,
onion_address: Option<String>,
control_client: Option<Arc<Mutex<TorControlClient>>>,
cached_monitoring: Arc<std::sync::RwLock<Option<TorMonitoringInfo>>>,
monitoring_task: Option<JoinHandle<()>>,
}
impl TorTransport {
pub fn new(
transport_id: TransportId,
name: Option<String>,
config: TorConfig,
packet_tx: PacketTx,
) -> Self {
Self {
transport_id,
name,
config,
state: TransportState::Configured,
pool: Arc::new(Mutex::new(HashMap::new())),
connecting: Arc::new(Mutex::new(HashMap::new())),
packet_tx,
stats: Arc::new(TorStats::new()),
accept_task: None,
onion_address: None,
control_client: None,
cached_monitoring: Arc::new(std::sync::RwLock::new(None)),
monitoring_task: None,
}
}
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
pub fn onion_address(&self) -> Option<&str> {
self.onion_address.as_deref()
}
pub fn stats(&self) -> &Arc<TorStats> {
&self.stats
}
pub fn cached_monitoring(&self) -> Option<TorMonitoringInfo> {
self.cached_monitoring.read().ok()?.clone()
}
pub fn mode(&self) -> &str {
self.config.mode()
}
pub async fn start_async(&mut self) -> Result<(), TransportError> {
if !self.state.can_start() {
return Err(TransportError::AlreadyStarted);
}
self.state = TransportState::Starting;
let socks5_addr = self.config.socks5_addr().to_string();
validate_host_port(&socks5_addr, "socks5_addr")?;
let mode = self.config.mode().to_string();
match mode.as_str() {
"socks5" => {
if self.config.directory_service.is_some() {
return Err(TransportError::StartFailed(
"directory_service config requires mode 'directory', not 'socks5'".into(),
));
}
self.state = TransportState::Up;
}
"control_port" => {
self.start_control_port_mode().await?;
}
"directory" => {
self.start_directory_mode().await?;
}
other => {
return Err(TransportError::StartFailed(format!(
"unsupported Tor mode '{}' (expected 'socks5', 'control_port', or 'directory')",
other
)));
}
}
if let Some(ref name) = self.name {
info!(
name = %name,
mode = %mode,
socks5_addr = %socks5_addr,
onion_address = ?self.onion_address,
mtu = self.config.mtu(),
"Tor transport started"
);
} else {
info!(
mode = %mode,
socks5_addr = %socks5_addr,
onion_address = ?self.onion_address,
mtu = self.config.mtu(),
"Tor transport started"
);
}
Ok(())
}
async fn start_control_port_mode(&mut self) -> Result<(), TransportError> {
let control_addr = self.config.control_addr().to_string();
if !control_addr.starts_with('/') && !control_addr.starts_with("./") {
validate_host_port(&control_addr, "control_addr")?;
}
let mut client = TorControlClient::connect(&control_addr)
.await
.map_err(|e| {
self.stats.record_control_error();
TransportError::StartFailed(format!("Tor control port: {}", e))
})?;
let auth = ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path())
.map_err(|e| TransportError::StartFailed(format!("Tor auth config: {}", e)))?;
client.authenticate(&auth).await.map_err(|e| {
self.stats.record_control_error();
TransportError::StartFailed(format!("Tor authentication: {}", e))
})?;
self.control_client = Some(Arc::new(Mutex::new(client)));
self.state = TransportState::Up;
self.spawn_monitoring_task();
Ok(())
}
async fn start_directory_mode(&mut self) -> Result<(), TransportError> {
let dir_config = self.config.directory_service.clone().unwrap_or_default();
let hostname_file = dir_config.hostname_file();
let onion_addr = std::fs::read_to_string(hostname_file)
.map_err(|e| {
TransportError::StartFailed(format!(
"failed to read onion hostname from '{}': {} \
(ensure HiddenServiceDir is configured in torrc and Tor has started)",
hostname_file, e
))
})?
.trim()
.to_string();
if onion_addr.is_empty() || !onion_addr.ends_with(".onion") {
return Err(TransportError::StartFailed(format!(
"invalid onion address in '{}': '{}'",
hostname_file, onion_addr
)));
}
self.onion_address = Some(onion_addr.clone());
let bind_addr = dir_config.bind_addr();
let listener = TcpListener::bind(bind_addr).await.map_err(|e| {
TransportError::StartFailed(format!(
"failed to bind directory-mode listener on {}: {}",
bind_addr, e
))
})?;
let local_addr = listener
.local_addr()
.map_err(|e| TransportError::StartFailed(format!("failed to get local addr: {}", e)))?;
info!(
onion_address = %onion_addr,
local_addr = %local_addr,
hostname_file = %hostname_file,
"Directory-mode onion service active"
);
let transport_id = self.transport_id;
let packet_tx = self.packet_tx.clone();
let pool = self.pool.clone();
let mtu = self.config.mtu();
let max_inbound = self.config.max_inbound_connections();
let stats = self.stats.clone();
let accept_handle = tokio::spawn(async move {
tor_accept_loop(
listener,
transport_id,
packet_tx,
pool,
mtu,
max_inbound,
stats,
)
.await;
});
self.accept_task = Some(accept_handle);
self.state = TransportState::Up;
if self.config.control_addr.is_some() {
self.try_connect_control_port().await;
}
Ok(())
}
async fn try_connect_control_port(&mut self) {
let control_addr = self.config.control_addr().to_string();
if !control_addr.starts_with('/')
&& !control_addr.starts_with("./")
&& let Err(e) = validate_host_port(&control_addr, "control_addr")
{
warn!(
transport_id = %self.transport_id,
error = %e,
"Tor control port address invalid, monitoring disabled"
);
return;
}
let client = match TorControlClient::connect(&control_addr).await {
Ok(c) => c,
Err(e) => {
warn!(
transport_id = %self.transport_id,
addr = %control_addr,
error = %e,
"Tor control port connect failed, monitoring disabled"
);
return;
}
};
let auth =
match ControlAuth::from_config(self.config.control_auth(), self.config.cookie_path()) {
Ok(a) => a,
Err(e) => {
warn!(
transport_id = %self.transport_id,
error = %e,
"Tor control auth config error, monitoring disabled"
);
return;
}
};
let mut client = client;
if let Err(e) = client.authenticate(&auth).await {
warn!(
transport_id = %self.transport_id,
error = %e,
"Tor control port auth failed, monitoring disabled"
);
return;
}
info!(
transport_id = %self.transport_id,
addr = %control_addr,
"Tor control port connected (monitoring enabled)"
);
self.control_client = Some(Arc::new(Mutex::new(client)));
self.spawn_monitoring_task();
}
pub async fn stop_async(&mut self) -> Result<(), TransportError> {
if !self.state.is_operational() {
return Err(TransportError::NotStarted);
}
if let Some(task) = self.accept_task.take() {
task.abort();
let _ = task.await;
debug!(
transport_id = %self.transport_id,
"Onion service accept loop stopped"
);
}
if let Some(task) = self.monitoring_task.take() {
task.abort();
let _ = task.await;
}
if let Ok(mut w) = self.cached_monitoring.write() {
*w = None;
}
self.control_client = None;
self.onion_address = None;
let mut connecting = self.connecting.lock().await;
for (addr, entry) in connecting.drain() {
entry.task.abort();
debug!(
transport_id = %self.transport_id,
remote_addr = %addr,
"Tor connect aborted (transport stopping)"
);
}
drop(connecting);
let mut pool = self.pool.lock().await;
for (addr, conn) in pool.drain() {
conn.recv_task.abort();
let _ = conn.recv_task.await;
debug!(
transport_id = %self.transport_id,
remote_addr = %addr,
"Tor connection closed (transport stopping)"
);
}
drop(pool);
self.state = TransportState::Down;
info!(
transport_id = %self.transport_id,
"Tor transport stopped"
);
Ok(())
}
fn spawn_monitoring_task(&mut self) {
let Some(client) = self.control_client.clone() else {
return;
};
let cache = self.cached_monitoring.clone();
let stats = self.stats.clone();
let transport_id = self.transport_id;
let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
let mut last_bootstrap: u8 = 0;
let mut last_liveness = String::new();
let mut was_dormant = false;
let mut stall_warned = false;
let started_at = Instant::now();
loop {
interval.tick().await;
let mut guard = client.lock().await;
match guard.monitoring_snapshot().await {
Ok(info) => {
for &milestone in &[25u8, 50, 75, 100] {
if info.bootstrap >= milestone && last_bootstrap < milestone {
info!(
transport_id = %transport_id,
bootstrap = info.bootstrap,
"Tor bootstrap {}%",
milestone
);
}
}
if info.bootstrap < 100
&& started_at.elapsed() > Duration::from_secs(60)
&& !stall_warned
{
warn!(
transport_id = %transport_id,
bootstrap = info.bootstrap,
"Tor bootstrap stalled — not at 100% after 60s"
);
stall_warned = true;
}
if info.bootstrap == 100 {
stall_warned = false;
}
last_bootstrap = info.bootstrap;
if !last_liveness.is_empty() && info.network_liveness != last_liveness {
warn!(
transport_id = %transport_id,
from = %last_liveness,
to = %info.network_liveness,
"Tor network liveness changed"
);
}
last_liveness = info.network_liveness.clone();
if info.dormant && !was_dormant {
warn!(
transport_id = %transport_id,
"Tor daemon entered dormant mode"
);
}
was_dormant = info.dormant;
if let Ok(mut w) = cache.write() {
*w = Some(info);
}
}
Err(e) => {
stats.record_control_error();
warn!(
transport_id = %transport_id,
error = %e,
"Tor monitoring query failed"
);
}
}
}
});
self.monitoring_task = Some(handle);
}
pub async fn send_async(
&self,
addr: &TransportAddr,
data: &[u8],
) -> Result<usize, TransportError> {
if !self.state.is_operational() {
return Err(TransportError::NotStarted);
}
let mtu = self.config.mtu() as usize;
if data.len() > mtu {
self.stats.record_mtu_exceeded();
return Err(TransportError::MtuExceeded {
packet_size: data.len(),
mtu: self.config.mtu(),
});
}
let writer = {
let pool = self.pool.lock().await;
pool.get(addr).map(|c| c.writer.clone())
};
let writer = match writer {
Some(w) => w,
None => {
self.connect(addr).await?
}
};
let mut w = writer.lock().await;
match w.write_all(data).await {
Ok(()) => {
self.stats.record_send(data.len());
trace!(
transport_id = %self.transport_id,
remote_addr = %addr,
bytes = data.len(),
"Tor packet sent"
);
Ok(data.len())
}
Err(e) => {
self.stats.record_send_error();
drop(w);
let mut pool = self.pool.lock().await;
if let Some(conn) = pool.remove(addr) {
conn.recv_task.abort();
}
Err(TransportError::SendFailed(format!("{}", e)))
}
}
}
async fn connect(
&self,
addr: &TransportAddr,
) -> Result<Arc<Mutex<OwnedWriteHalf>>, TransportError> {
let tor_addr = parse_tor_addr(addr)?;
let proxy_addr = self.config.socks5_addr();
let timeout_ms = self.config.connect_timeout_ms();
debug!(
transport_id = %self.transport_id,
remote_addr = %addr,
proxy = %proxy_addr,
timeout_secs = timeout_ms / 1000,
"Connecting via Tor SOCKS5"
);
let isolation_key = addr.to_string();
let connect_start = Instant::now();
let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
match &tor_addr {
TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
Socks5Stream::connect_with_password(
proxy_addr,
(host.as_str(), *port),
"fips",
&isolation_key,
)
.await
}
TorAddr::Clearnet(socket_addr) => {
Socks5Stream::connect_with_password(
proxy_addr,
*socket_addr,
"fips",
&isolation_key,
)
.await
}
}
})
.await;
let stream = match socks_result {
Ok(Ok(socks_stream)) => socks_stream.into_inner(),
Ok(Err(e)) => {
self.stats.record_socks5_error();
warn!(
transport_id = %self.transport_id,
remote_addr = %addr,
error = %e,
elapsed_secs = connect_start.elapsed().as_secs(),
"Tor SOCKS5 connection failed"
);
return Err(TransportError::ConnectionRefused);
}
Err(_) => {
self.stats.record_connect_timeout();
warn!(
transport_id = %self.transport_id,
remote_addr = %addr,
timeout_secs = timeout_ms / 1000,
"Tor SOCKS5 connection timed out"
);
return Err(TransportError::Timeout);
}
};
let std_stream = stream
.into_std()
.map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
configure_socket(&std_stream, &self.config)?;
let stream = TcpStream::from_std(std_stream)
.map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
let (read_half, write_half) = stream.into_split();
let writer = Arc::new(Mutex::new(write_half));
let transport_id = self.transport_id;
let packet_tx = self.packet_tx.clone();
let pool = self.pool.clone();
let recv_stats = self.stats.clone();
let remote_addr = addr.clone();
let mtu = self.config.mtu();
let recv_task = tokio::spawn(async move {
tor_receive_loop(
read_half,
transport_id,
remote_addr.clone(),
packet_tx,
pool,
mtu,
recv_stats,
)
.await;
});
let conn = TorConnection {
writer: writer.clone(),
recv_task,
mtu,
established_at: Instant::now(),
};
let mut pool = self.pool.lock().await;
pool.insert(addr.clone(), conn);
self.stats.record_connection_established();
info!(
transport_id = %self.transport_id,
remote_addr = %addr,
elapsed_secs = connect_start.elapsed().as_secs(),
"Tor circuit established via SOCKS5"
);
Ok(writer)
}
pub async fn connect_async(&self, addr: &TransportAddr) -> Result<(), TransportError> {
if !self.state.is_operational() {
return Err(TransportError::NotStarted);
}
{
let pool = self.pool.lock().await;
if pool.contains_key(addr) {
return Ok(());
}
}
{
let connecting = self.connecting.lock().await;
if connecting.contains_key(addr) {
return Ok(());
}
}
let tor_addr = parse_tor_addr(addr)?;
let proxy_addr = self.config.socks5_addr().to_string();
let timeout_ms = self.config.connect_timeout_ms();
let transport_id = self.transport_id;
let remote_addr = addr.clone();
let config = self.config.clone();
debug!(
transport_id = %transport_id,
remote_addr = %remote_addr,
timeout_ms,
"Initiating background Tor SOCKS5 connect"
);
let isolation_key = addr.to_string();
let task = tokio::spawn(async move {
let socks_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
match &tor_addr {
TorAddr::Onion(host, port) | TorAddr::ClearnetHostname(host, port) => {
Socks5Stream::connect_with_password(
proxy_addr.as_str(),
(host.as_str(), *port),
"fips",
&isolation_key,
)
.await
}
TorAddr::Clearnet(socket_addr) => {
Socks5Stream::connect_with_password(
proxy_addr.as_str(),
*socket_addr,
"fips",
&isolation_key,
)
.await
}
}
})
.await;
let stream = match socks_result {
Ok(Ok(socks_stream)) => socks_stream.into_inner(),
Ok(Err(e)) => {
debug!(
transport_id = %transport_id,
remote_addr = %remote_addr,
error = %e,
"Background Tor SOCKS5 connect failed"
);
return Err(TransportError::ConnectionRefused);
}
Err(_) => {
debug!(
transport_id = %transport_id,
remote_addr = %remote_addr,
"Background Tor SOCKS5 connect timed out"
);
return Err(TransportError::Timeout);
}
};
let std_stream = stream
.into_std()
.map_err(|e| TransportError::StartFailed(format!("into_std: {}", e)))?;
configure_socket(&std_stream, &config)?;
let mtu = config.mtu();
let stream = TcpStream::from_std(std_stream)
.map_err(|e| TransportError::StartFailed(format!("from_std: {}", e)))?;
Ok((stream, mtu))
});
let mut connecting = self.connecting.lock().await;
connecting.insert(addr.clone(), ConnectingEntry { task });
Ok(())
}
pub fn connection_state_sync(&self, addr: &TransportAddr) -> ConnectionState {
if let Ok(pool) = self.pool.try_lock() {
if pool.contains_key(addr) {
return ConnectionState::Connected;
}
} else {
return ConnectionState::Connecting; }
let mut connecting = match self.connecting.try_lock() {
Ok(c) => c,
Err(_) => return ConnectionState::Connecting,
};
let entry = match connecting.get_mut(addr) {
Some(e) => e,
None => return ConnectionState::None,
};
if !entry.task.is_finished() {
return ConnectionState::Connecting;
}
let addr_clone = addr.clone();
let task = connecting.remove(&addr_clone).unwrap().task;
match task.now_or_never() {
Some(Ok(Ok((stream, mtu)))) => {
self.promote_connection(addr, stream, mtu);
ConnectionState::Connected
}
Some(Ok(Err(e))) => ConnectionState::Failed(format!("{}", e)),
Some(Err(e)) => {
ConnectionState::Failed(format!("task failed: {}", e))
}
None => {
ConnectionState::Connecting
}
}
}
fn promote_connection(&self, addr: &TransportAddr, stream: TcpStream, mtu: u16) {
let (read_half, write_half) = stream.into_split();
let writer = Arc::new(Mutex::new(write_half));
let transport_id = self.transport_id;
let packet_tx = self.packet_tx.clone();
let pool = self.pool.clone();
let recv_stats = self.stats.clone();
let remote_addr = addr.clone();
let recv_task = tokio::spawn(async move {
tor_receive_loop(
read_half,
transport_id,
remote_addr.clone(),
packet_tx,
pool,
mtu,
recv_stats,
)
.await;
});
let conn = TorConnection {
writer,
recv_task,
mtu,
established_at: Instant::now(),
};
if let Ok(mut pool) = self.pool.try_lock() {
pool.insert(addr.clone(), conn);
self.stats.record_connection_established();
debug!(
transport_id = %self.transport_id,
remote_addr = %addr,
"Tor connection established (background connect)"
);
} else {
conn.recv_task.abort();
warn!(
transport_id = %self.transport_id,
remote_addr = %addr,
"Failed to promote Tor connection (pool locked)"
);
}
}
pub async fn close_connection_async(&self, addr: &TransportAddr) {
let mut pool = self.pool.lock().await;
if let Some(conn) = pool.remove(addr) {
conn.recv_task.abort();
debug!(
transport_id = %self.transport_id,
remote_addr = %addr,
"Tor connection closed"
);
}
}
}
impl Transport for TorTransport {
fn transport_id(&self) -> TransportId {
self.transport_id
}
fn transport_type(&self) -> &TransportType {
&TransportType::TOR
}
fn state(&self) -> TransportState {
self.state
}
fn mtu(&self) -> u16 {
self.config.mtu()
}
fn link_mtu(&self, _addr: &TransportAddr) -> u16 {
self.config.mtu()
}
fn start(&mut self) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use start_async() for Tor transport".into(),
))
}
fn stop(&mut self) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use stop_async() for Tor transport".into(),
))
}
fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
Err(TransportError::NotSupported(
"use send_async() for Tor transport".into(),
))
}
fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
Ok(Vec::new())
}
fn accept_connections(&self) -> bool {
self.onion_address.is_some()
}
}
async fn tor_receive_loop(
mut reader: tokio::net::tcp::OwnedReadHalf,
transport_id: TransportId,
remote_addr: TransportAddr,
packet_tx: PacketTx,
pool: ConnectionPool,
mtu: u16,
stats: Arc<TorStats>,
) {
debug!(
transport_id = %transport_id,
remote_addr = %remote_addr,
"Tor receive loop starting"
);
loop {
match read_fmp_packet(&mut reader, mtu).await {
Ok(data) => {
stats.record_recv(data.len());
trace!(
transport_id = %transport_id,
remote_addr = %remote_addr,
bytes = data.len(),
"Tor packet received"
);
let packet = ReceivedPacket::new(transport_id, remote_addr.clone(), data);
if packet_tx.send(packet).is_err() {
debug!(
transport_id = %transport_id,
"Packet channel closed, stopping Tor receive loop"
);
break;
}
}
Err(e) => {
stats.record_recv_error();
debug!(
transport_id = %transport_id,
remote_addr = %remote_addr,
error = %e,
"Tor receive error, removing connection"
);
break;
}
}
}
let mut pool_guard = pool.lock().await;
pool_guard.remove(&remote_addr);
debug!(
transport_id = %transport_id,
remote_addr = %remote_addr,
"Tor receive loop stopped"
);
}
fn configure_socket(
stream: &std::net::TcpStream,
_config: &TorConfig,
) -> Result<(), TransportError> {
let socket = socket2::SockRef::from(stream);
socket
.set_tcp_nodelay(true)
.map_err(|e| TransportError::StartFailed(format!("set nodelay: {}", e)))?;
let keepalive_secs = 30u64;
if keepalive_secs > 0 {
let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(keepalive_secs));
socket
.set_tcp_keepalive(&keepalive)
.map_err(|e| TransportError::StartFailed(format!("set keepalive: {}", e)))?;
}
Ok(())
}
async fn tor_accept_loop(
listener: TcpListener,
transport_id: TransportId,
packet_tx: PacketTx,
pool: ConnectionPool,
mtu: u16,
max_inbound: usize,
stats: Arc<TorStats>,
) {
debug!(
transport_id = %transport_id,
"Onion service accept loop starting"
);
loop {
let (stream, peer_addr) = match listener.accept().await {
Ok(result) => result,
Err(e) => {
warn!(
transport_id = %transport_id,
error = %e,
"Onion service accept error"
);
continue;
}
};
let current_count = {
let pool_guard = pool.lock().await;
pool_guard.len()
};
if current_count >= max_inbound {
stats.record_connection_rejected();
debug!(
transport_id = %transport_id,
peer_addr = %peer_addr,
max_inbound,
"Rejecting inbound onion connection (limit reached)"
);
drop(stream);
continue;
}
let std_stream = match stream.into_std() {
Ok(s) => s,
Err(e) => {
warn!(
transport_id = %transport_id,
error = %e,
"Failed to convert accepted stream to std"
);
continue;
}
};
let socket = socket2::SockRef::from(&std_stream);
let _ = socket.set_tcp_nodelay(true);
let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(30));
let _ = socket.set_tcp_keepalive(&keepalive);
let stream = match TcpStream::from_std(std_stream) {
Ok(s) => s,
Err(e) => {
warn!(
transport_id = %transport_id,
error = %e,
"Failed to convert accepted stream back to tokio"
);
continue;
}
};
let remote_addr = TransportAddr::from_string(&peer_addr.to_string());
let (read_half, write_half) = stream.into_split();
let writer = Arc::new(Mutex::new(write_half));
let recv_pool = pool.clone();
let recv_stats = stats.clone();
let recv_addr = remote_addr.clone();
let recv_tx = packet_tx.clone();
let recv_task = tokio::spawn(async move {
tor_receive_loop(
read_half,
transport_id,
recv_addr,
recv_tx,
recv_pool,
mtu,
recv_stats,
)
.await;
});
let conn = TorConnection {
writer,
recv_task,
mtu,
established_at: Instant::now(),
};
{
let mut pool_guard = pool.lock().await;
pool_guard.insert(remote_addr.clone(), conn);
}
stats.record_connection_accepted();
debug!(
transport_id = %transport_id,
peer_addr = %peer_addr,
"Accepted inbound onion connection"
);
}
}
fn validate_host_port(addr: &str, field_name: &str) -> Result<(), TransportError> {
if addr.parse::<SocketAddr>().is_ok() {
return Ok(());
}
let parts: Vec<&str> = addr.rsplitn(2, ':').collect();
if parts.len() != 2 || parts[0].parse::<u16>().is_err() || parts[1].is_empty() {
return Err(TransportError::StartFailed(format!(
"invalid {} '{}': expected host:port or IP:port",
field_name, addr
)));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::packet_channel;
fn make_config() -> TorConfig {
TorConfig {
socks5_addr: Some("127.0.0.1:19050".to_string()),
..Default::default()
}
}
#[test]
fn test_parse_tor_addr_onion() {
let addr = TransportAddr::from_string("abcdef1234567890.onion:2121");
let tor_addr = parse_tor_addr(&addr).unwrap();
match tor_addr {
TorAddr::Onion(host, port) => {
assert_eq!(host, "abcdef1234567890.onion");
assert_eq!(port, 2121);
}
_ => panic!("expected Onion variant"),
}
}
#[test]
fn test_parse_tor_addr_clearnet() {
let addr = TransportAddr::from_string("192.168.1.1:8080");
let tor_addr = parse_tor_addr(&addr).unwrap();
match tor_addr {
TorAddr::Clearnet(socket_addr) => {
assert_eq!(
socket_addr,
"192.168.1.1:8080".parse::<SocketAddr>().unwrap()
);
}
_ => panic!("expected Clearnet variant"),
}
}
#[test]
fn test_parse_tor_addr_clearnet_hostname() {
let addr = TransportAddr::from_string("peer1.example.com:2121");
let tor_addr = parse_tor_addr(&addr).unwrap();
match tor_addr {
TorAddr::ClearnetHostname(host, port) => {
assert_eq!(host, "peer1.example.com");
assert_eq!(port, 2121);
}
_ => panic!("expected ClearnetHostname variant"),
}
}
#[test]
fn test_parse_tor_addr_invalid() {
let addr = TransportAddr::from_string("localhost:2121");
assert!(parse_tor_addr(&addr).is_err());
let addr = TransportAddr::from_string("not-a-valid-address");
assert!(parse_tor_addr(&addr).is_err());
let addr = TransportAddr::from_string("example.com:notaport");
assert!(parse_tor_addr(&addr).is_err());
}
#[test]
fn test_config_defaults() {
let config = TorConfig::default();
assert_eq!(config.mode(), "socks5");
assert_eq!(config.socks5_addr(), "127.0.0.1:9050");
assert_eq!(config.connect_timeout_ms(), 120000);
assert_eq!(config.mtu(), 1400);
assert_eq!(config.advertised_port(), 443);
}
#[test]
fn test_advertised_port_override() {
let config = TorConfig {
advertised_port: Some(9001),
..Default::default()
};
assert_eq!(config.advertised_port(), 9001);
}
#[test]
fn test_advert_address_round_trips_through_parser() {
let onion = "mwvj6q3pnsiaky7i6wg5s42xlfurt5uqr3qzckrlw2graa2ugcgwhiqd.onion";
let cfg = TorConfig::default();
let advertised = format!("{}:{}", onion, cfg.advertised_port());
let parsed = parse_tor_addr(&TransportAddr::from_string(&advertised)).unwrap();
match parsed {
TorAddr::Onion(host, port) => {
assert_eq!(host, onion);
assert_eq!(port, 443);
}
other => panic!("expected Onion variant, got {:?}", other),
}
assert!(parse_tor_addr(&TransportAddr::from_string(onion)).is_err());
}
#[tokio::test]
async fn test_start_stop() {
let (tx, _rx) = packet_channel(32);
let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
transport.start_async().await.unwrap();
assert_eq!(transport.state(), TransportState::Up);
transport.stop_async().await.unwrap();
assert_eq!(transport.state(), TransportState::Down);
}
#[tokio::test]
async fn test_double_start_fails() {
let (tx, _rx) = packet_channel(32);
let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
transport.start_async().await.unwrap();
assert!(transport.start_async().await.is_err());
}
#[tokio::test]
async fn test_stop_not_started_fails() {
let (tx, _rx) = packet_channel(32);
let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
assert!(transport.stop_async().await.is_err());
}
#[tokio::test]
async fn test_send_not_started() {
let (tx, _rx) = packet_channel(32);
let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
let addr = TransportAddr::from_string("127.0.0.1:2121");
let result = transport.send_async(&addr, &[0u8; 10]).await;
assert!(result.is_err());
}
#[test]
fn test_transport_type() {
let (tx, _rx) = packet_channel(32);
let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
let tt = transport.transport_type();
assert_eq!(tt.name, "tor");
assert!(tt.connection_oriented);
assert!(tt.reliable);
}
#[test]
fn test_sync_methods_return_not_supported() {
let (tx, _rx) = packet_channel(32);
let mut transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
assert!(transport.start().is_err());
assert!(transport.stop().is_err());
let addr = TransportAddr::from_string("127.0.0.1:2121");
assert!(transport.send(&addr, &[0u8; 10]).is_err());
}
#[test]
fn test_accept_connections_false() {
let (tx, _rx) = packet_channel(32);
let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
assert!(!transport.accept_connections());
}
#[test]
fn test_discover_returns_empty() {
let (tx, _rx) = packet_channel(32);
let transport = TorTransport::new(TransportId::new(1), None, make_config(), tx);
assert!(transport.discover().unwrap().is_empty());
}
#[tokio::test]
async fn test_invalid_socks5_addr_start_fails() {
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
socks5_addr: Some("not-a-socket-addr".to_string()),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
assert!(transport.start_async().await.is_err());
}
#[tokio::test]
async fn test_unsupported_mode_start_fails() {
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
mode: Some("embedded".to_string()),
socks5_addr: Some("127.0.0.1:9050".to_string()),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
assert!(transport.start_async().await.is_err());
}
use crate::config::TcpConfig;
use crate::transport::tcp::TcpTransport;
use mock_socks5::MockSocks5Server;
const MSG1_WIRE_SIZE: usize = 114;
const MSG1_PAYLOAD_LEN: u16 = (MSG1_WIRE_SIZE - 4) as u16;
fn build_msg1_frame() -> Vec<u8> {
let mut frame = vec![0xAA; MSG1_WIRE_SIZE];
frame[0] = 0x01; frame[1] = 0x00; frame[2..4].copy_from_slice(&MSG1_PAYLOAD_LEN.to_le_bytes());
frame
}
#[tokio::test]
async fn test_send_recv_via_socks5() {
let (dest_tx, mut dest_rx) = packet_channel(32);
let dest_config = TcpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
};
let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
dest.start_async().await.unwrap();
let dest_addr = dest.local_addr().unwrap();
let mock = MockSocks5Server::new(dest_addr).await.unwrap();
let proxy_addr = mock.addr();
let _proxy_handle = mock.spawn();
let (tor_tx, _tor_rx) = packet_channel(32);
let tor_config = TorConfig {
socks5_addr: Some(proxy_addr.to_string()),
..Default::default()
};
let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
tor.start_async().await.unwrap();
let frame = build_msg1_frame();
let target = TransportAddr::from_string(&dest_addr.to_string());
tor.send_async(&target, &frame).await.unwrap();
let received = tokio::time::timeout(Duration::from_secs(5), dest_rx.recv())
.await
.expect("timeout waiting for packet")
.expect("channel closed");
assert_eq!(received.data, frame);
tor.stop_async().await.unwrap();
dest.stop_async().await.unwrap();
}
#[tokio::test]
async fn test_socks5_proxy_down() {
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
socks5_addr: Some("127.0.0.1:19999".to_string()),
connect_timeout_ms: Some(2000),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
transport.start_async().await.unwrap();
let addr = TransportAddr::from_string("192.168.1.1:2121");
let result = transport.send_async(&addr, &build_msg1_frame()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_connect_timeout() {
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
socks5_addr: Some("192.0.2.1:9050".to_string()),
connect_timeout_ms: Some(500),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
transport.start_async().await.unwrap();
let addr = TransportAddr::from_string("10.0.0.1:2121");
let result = transport.send_async(&addr, &build_msg1_frame()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_close_connection() {
let (dest_tx, _dest_rx) = packet_channel(32);
let dest_config = TcpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
};
let mut dest = TcpTransport::new(TransportId::new(100), None, dest_config, dest_tx);
dest.start_async().await.unwrap();
let dest_addr = dest.local_addr().unwrap();
let mock = MockSocks5Server::new(dest_addr).await.unwrap();
let proxy_addr = mock.addr();
let _proxy_handle = mock.spawn();
let (tor_tx, _tor_rx) = packet_channel(32);
let tor_config = TorConfig {
socks5_addr: Some(proxy_addr.to_string()),
..Default::default()
};
let mut tor = TorTransport::new(TransportId::new(200), None, tor_config, tor_tx);
tor.start_async().await.unwrap();
let target = TransportAddr::from_string(&dest_addr.to_string());
tor.send_async(&target, &build_msg1_frame()).await.unwrap();
{
let pool = tor.pool.lock().await;
assert_eq!(pool.len(), 1);
}
tor.close_connection_async(&target).await;
{
let pool = tor.pool.lock().await;
assert_eq!(pool.len(), 0);
}
tor.stop_async().await.unwrap();
dest.stop_async().await.unwrap();
}
use mock_control::MockTorControlServer;
#[tokio::test]
async fn test_control_port_start_stop() {
let mock = MockTorControlServer::start().await;
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
mode: Some("control_port".to_string()),
socks5_addr: Some("127.0.0.1:19050".to_string()),
control_addr: Some(mock.addr().to_string()),
control_auth: Some("password:testpass".to_string()),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
transport.start_async().await.unwrap();
assert_eq!(transport.state(), TransportState::Up);
assert!(transport.onion_address().is_none());
assert!(!transport.accept_connections());
transport.stop_async().await.unwrap();
}
#[tokio::test]
async fn test_config_defaults_phase2() {
let config = TorConfig::default();
assert_eq!(config.control_addr(), "/run/tor/control");
assert_eq!(config.control_auth(), "cookie");
assert_eq!(config.cookie_path(), "/var/run/tor/control.authcookie");
assert_eq!(config.max_inbound_connections(), 64);
}
use crate::config::DirectoryServiceConfig;
use tempfile::TempDir;
#[test]
fn test_directory_service_config_defaults() {
let config = DirectoryServiceConfig::default();
assert_eq!(
config.hostname_file(),
"/var/lib/tor/fips_onion_service/hostname"
);
assert_eq!(config.bind_addr(), "127.0.0.1:8443");
}
#[tokio::test]
async fn test_directory_mode_start_stop() {
let dir = TempDir::new().unwrap();
let hostname_path = dir.path().join("hostname");
std::fs::write(
&hostname_path,
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
)
.unwrap();
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
mode: Some("directory".to_string()),
socks5_addr: Some("127.0.0.1:19050".to_string()),
directory_service: Some(DirectoryServiceConfig {
hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
bind_addr: Some("127.0.0.1:0".to_string()),
}),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
transport.start_async().await.unwrap();
assert_eq!(transport.state(), TransportState::Up);
assert_eq!(
transport.onion_address(),
Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion"),
);
assert!(transport.accept_connections());
transport.stop_async().await.unwrap();
assert_eq!(transport.state(), TransportState::Down);
}
#[tokio::test]
async fn test_directory_mode_missing_hostname_file() {
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
mode: Some("directory".to_string()),
socks5_addr: Some("127.0.0.1:19050".to_string()),
directory_service: Some(DirectoryServiceConfig {
hostname_file: Some("/nonexistent/hostname".to_string()),
bind_addr: Some("127.0.0.1:0".to_string()),
}),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
let result = transport.start_async().await;
assert!(result.is_err());
let err = format!("{}", result.unwrap_err());
assert!(err.contains("hostname"));
}
#[tokio::test]
async fn test_directory_mode_invalid_hostname() {
let dir = TempDir::new().unwrap();
let hostname_path = dir.path().join("hostname");
std::fs::write(&hostname_path, "not-an-onion-address\n").unwrap();
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
mode: Some("directory".to_string()),
socks5_addr: Some("127.0.0.1:19050".to_string()),
directory_service: Some(DirectoryServiceConfig {
hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
bind_addr: Some("127.0.0.1:0".to_string()),
}),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
let result = transport.start_async().await;
assert!(result.is_err());
let err = format!("{}", result.unwrap_err());
assert!(err.contains("invalid onion address"));
}
#[tokio::test]
async fn test_directory_mode_accept_inbound() {
let dir = TempDir::new().unwrap();
let hostname_path = dir.path().join("hostname");
std::fs::write(
&hostname_path,
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2.onion\n",
)
.unwrap();
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
mode: Some("directory".to_string()),
socks5_addr: Some("127.0.0.1:19050".to_string()),
directory_service: Some(DirectoryServiceConfig {
hostname_file: Some(hostname_path.to_str().unwrap().to_string()),
bind_addr: Some("127.0.0.1:0".to_string()),
}),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
transport.start_async().await.unwrap();
assert!(transport.accept_connections());
transport.stop_async().await.unwrap();
}
#[tokio::test]
async fn test_socks5_mode_rejects_directory_service_config() {
let (tx, _rx) = packet_channel(32);
let config = TorConfig {
mode: Some("socks5".to_string()),
socks5_addr: Some("127.0.0.1:9050".to_string()),
directory_service: Some(DirectoryServiceConfig::default()),
..Default::default()
};
let mut transport = TorTransport::new(TransportId::new(1), None, config, tx);
let result = transport.start_async().await;
assert!(result.is_err());
let err = format!("{}", result.unwrap_err());
assert!(err.contains("directory"));
}
}