use std::sync::Arc;
use std::{fmt::Display, time::Duration};
use arti_client::config::TorClientConfigBuilder;
use arti_client::{TorClient, TorClientConfig};
use futures::lock::Mutex;
use futures::{Stream, StreamExt};
use safelog::DisplayRedacted;
use tor_cell::relaycell::msg::Connected;
use tor_hsservice::config::OnionServiceConfigBuilder;
use tor_hsservice::{RunningOnionService, StreamRequest};
use tor_proto::client::stream::IncomingStreamRequest;
use tor_rtcompat::PreferredRuntime;
use crate::actors::remote::netlayer::{AsyncMsgStream, NetLayer};
use crate::utils;
#[allow(missing_debug_implementations)]
pub struct TorLayer {
client: TorClient<PreferredRuntime>,
nickname: String,
port: Option<u16>,
address: Option<String>,
service: Option<Arc<RunningOnionService>>,
stream: Option<Arc<Mutex<Box<dyn Stream<Item = StreamRequest> + Send + Unpin>>>>,
}
#[derive(Debug)]
pub struct TorLayerConfig {
port: u16,
directories: Option<TorLayerDirectories>,
}
impl TorLayerConfig {
pub fn new(port: u16, directories: TorLayerDirectories) -> Self {
Self {
port,
directories: Some(directories),
}
}
pub fn new_from_port(port: u16) -> Self {
Self {
port,
directories: None,
}
}
}
#[derive(Debug, Default)]
pub struct TorLayerDirectories {
data_dir: String,
cache_dir: String,
}
impl TorLayerDirectories {
pub fn new(data_dir: String, cache_dir: String) -> Self {
Self {
data_dir,
cache_dir,
}
}
}
impl TorLayer {
pub async fn new(nickname: String, layer_config: TorLayerConfig) -> Result<Self, Error> {
let conf = if let Some(TorLayerDirectories {
data_dir,
cache_dir,
}) = layer_config.directories
{
TorClientConfigBuilder::from_directories(data_dir, cache_dir)
.build()
.map_err(|e| Error::Bootstrap(e.to_string()))?
} else {
TorClientConfig::default()
};
let client = TorClient::create_bootstrapped(conf)
.await
.map_err(|e| Error::Bootstrap(e.to_string()))?;
Ok(Self {
client,
nickname,
port: Some(layer_config.port),
address: None,
service: None,
stream: None,
})
}
pub async fn new_for_client(nickname: String) -> Result<Self, Error> {
let client = TorClient::create_bootstrapped(TorClientConfig::default())
.await
.map_err(|e| Error::Bootstrap(e.to_string()))?;
Ok(Self {
client,
nickname,
port: None,
address: None,
service: None,
stream: None,
})
}
}
impl NetLayer for TorLayer {
type Error = Error;
fn name() -> &'static str {
"tor"
}
async fn connect(&self, addr: &str) -> Result<impl AsyncMsgStream, Self::Error> {
self.client
.connect(addr)
.await
.map_err(|e| Error::Connect(e.to_string()))
}
async fn init(&mut self) -> Result<(), Self::Error> {
let service_config = OnionServiceConfigBuilder::default()
.nickname(
self.nickname
.parse()
.map_err(|_| Error::Init("invalid nickname".to_string()))?,
)
.build()
.map_err(|e| Error::Init(e.to_string()))?;
let (service, requests_stream) = self
.client
.launch_onion_service(service_config)
.map_err(|e| Error::Init(e.to_string()))?
.ok_or(Error::Init("could not launch onion service".to_string()))?;
let status_stream = service.status_events();
let mut binding = status_stream
.filter(|status| futures::future::ready(status.state().is_fully_reachable()));
match tokio::time::timeout(Duration::from_secs(60), binding.next()).await {
Ok(Some(_)) => tracing::info!("onion service is fully reachable."),
Ok(None) => tracing::warn!("status stream ended unexpectedly."),
Err(_) => tracing::warn!(
"timeout waiting for service to become reachable. actor may or may not receive messages."
),
};
if self.port.is_none() {
let port = self.port.unwrap_or(
utils::random_unused_port()
.await
.map_err(|e| Error::Hostname(e.to_string()))?,
);
self.port.replace(port);
}
let redacted = service.onion_address().ok_or(Error::Init(
"failed to query our own onion address".to_string(),
))?;
let address = format!(
"{}:{}",
redacted.display_unredacted(),
self.port.expect("valid port should be set")
);
let requests_stream = tor_hsservice::handle_rend_requests(requests_stream);
self.service.replace(service);
self.stream
.replace(Arc::new(Mutex::new(Box::new(requests_stream))));
self.address.replace(address);
Ok(())
}
async fn accept(&self) -> Result<impl AsyncMsgStream, Self::Error> {
loop {
if let Some(stream) = &self.stream {
if let Some(request) = stream.lock().await.next().await {
match request.request() {
IncomingStreamRequest::Begin(begin)
if begin.port() == self.port.expect("valid port should be set") =>
{
return request
.accept(Connected::new_empty())
.await
.map_err(|e| Error::Accept(e.to_string()));
}
_ => {
let _ = request.shutdown_circuit();
continue;
}
}
} else {
return Err(Error::NotReady);
}
} else {
return Err(Error::NotReady);
}
}
}
async fn address(&self) -> Result<String, Self::Error> {
self.address.to_owned().ok_or(Error::NotReady)
}
}
#[allow(missing_docs)]
#[derive(Debug)]
pub enum Error {
Bootstrap(String),
Init(String),
Accept(String),
Connect(String),
Hostname(String),
NotReady,
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Init(ctx) => write!(f, "failed to init layer: {ctx}"),
Error::Accept(ctx) => write!(f, "failed to receive data: {ctx}"),
Error::Connect(ctx) => write!(f, "failed to connect to endpoint: {ctx}"),
Error::Hostname(ctx) => write!(f, "failed to recover our hostname: {ctx}"),
Error::Bootstrap(ctx) => write!(f, "failed to connect to Tor network: {ctx}"),
Error::NotReady => write!(f, "layer not ready"),
}
}
}
impl std::error::Error for Error {}