#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc = include_str!("../README.md")]
#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
#![warn(noop_method_call)]
#![warn(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![deny(clippy::print_stderr)]
#![deny(clippy::print_stdout)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unchecked_time_subtraction)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::mod_module_files)]
#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
pub mod config;
pub mod err;
#[cfg(feature = "managed-pts")]
pub mod ipc;
#[cfg(feature = "managed-pts")]
mod managed;
use crate::config::{TransportConfig, TransportOptions};
use crate::err::PtError;
use oneshot_fused_workaround as oneshot;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tor_chanmgr::ProxyProtocol;
use tor_config_path::CfgPathResolver;
use tor_linkspec::PtTransportName;
use tor_rtcompat::Runtime;
use tor_socksproto::SocksVersion;
#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
use tracing::info;
use tracing::warn;
#[cfg(feature = "managed-pts")]
use {
crate::managed::{PtReactor, PtReactorMessage},
futures::channel::mpsc::{self, UnboundedSender},
tor_error::error_report,
tor_rtcompat::SpawnExt,
};
#[cfg(feature = "tor-channel-factory")]
use {
async_trait::async_trait,
tor_chanmgr::{
builder::ChanBuilder,
factory::{AbstractPtError, ChannelFactory},
transport::ExternalProxyPlugin,
},
tracing::trace,
};
#[derive(Default, Debug)]
struct PtSharedState {
#[allow(dead_code)]
managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
configured: HashMap<PtTransportName, TransportOptions>,
outbound_proxy: Option<ProxyProtocol>,
}
pub struct PtMgr<R> {
#[allow(dead_code)]
runtime: R,
state: Arc<RwLock<PtSharedState>>,
#[cfg(feature = "managed-pts")]
tx: UnboundedSender<PtReactorMessage>,
}
impl<R: Runtime> PtMgr<R> {
fn transform_config(
binaries: Vec<TransportConfig>,
) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
let mut ret = HashMap::new();
for thing in binaries {
for tn in thing.protocols.iter() {
ret.insert(tn.clone(), thing.clone().try_into()?);
}
}
for opt in ret.values() {
if let TransportOptions::Unmanaged(u) = opt {
if !u.is_localhost() {
warn!(
"Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only."
);
}
}
}
Ok(ret)
}
pub fn new(
transports: Vec<TransportConfig>,
#[allow(unused)] state_dir: PathBuf,
path_resolver: Arc<CfgPathResolver>,
outbound_proxy: Option<ProxyProtocol>,
rt: R,
) -> Result<Self, PtError> {
let state = PtSharedState {
managed_cmethods: Default::default(),
configured: Self::transform_config(transports)?,
outbound_proxy,
};
let state = Arc::new(RwLock::new(state));
#[cfg(feature = "managed-pts")]
let tx = {
let (tx, rx) = mpsc::unbounded();
let mut reactor =
PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
rt.spawn(async move {
loop {
match reactor.run_one_step().await {
Ok(true) => return,
Ok(false) => {}
Err(e) => {
error_report!(e, "PtReactor failed");
return;
}
}
}
})
.map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
tx
};
Ok(Self {
runtime: rt,
state,
#[cfg(feature = "managed-pts")]
tx,
})
}
pub fn reconfigure(
&self,
how: tor_config::Reconfigure,
transports: Vec<TransportConfig>,
outbound_proxy: Option<ProxyProtocol>,
) -> Result<(), tor_config::ReconfigureError> {
let configured = Self::transform_config(transports)?;
if how == tor_config::Reconfigure::CheckAllOrNothing {
return Ok(());
}
{
let mut inner = self.state.write().expect("ptmgr poisoned");
inner.configured = configured;
inner.outbound_proxy = outbound_proxy;
}
#[cfg(feature = "managed-pts")]
let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
Ok(())
}
#[cfg(feature = "tor-channel-factory")]
async fn get_cmethod_for_transport(
&self,
transport: &PtTransportName,
) -> Result<Option<PtClientMethod>, PtError> {
#[allow(unused)]
let (cfg, managed_cmethod) = {
let inner = self.state.read().expect("ptmgr poisoned");
let cfg = inner.configured.get(transport);
let managed_cmethod = inner.managed_cmethods.get(transport);
(cfg.cloned(), managed_cmethod.cloned())
};
match cfg {
Some(TransportOptions::Unmanaged(cfg)) => {
let cmethod = cfg.cmethod();
trace!(
"Found configured unmanaged transport {transport} accessible via {cmethod:?}"
);
Ok(Some(cmethod))
}
#[cfg(feature = "managed-pts")]
Some(TransportOptions::Managed(_cfg)) => {
match managed_cmethod {
Some(cmethod) => {
trace!(
"Found configured managed transport {transport} accessible via {cmethod:?}"
);
Ok(Some(cmethod))
}
None => {
Ok(Some(self.spawn_transport(transport).await?))
}
}
}
None => {
trace!("Got a request for transport {transport}, which is not configured.");
Ok(None)
}
}
}
#[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
async fn spawn_transport(
&self,
transport: &PtTransportName,
) -> Result<PtClientMethod, PtError> {
info!(
"Got a request for transport {transport}, which is not currently running. Launching it."
);
let (tx, rx) = oneshot::channel();
self.tx
.unbounded_send(PtReactorMessage::Spawn {
pt: transport.clone(),
result: tx,
})
.map_err(|_| {
PtError::Internal(tor_error::internal!("PT reactor closed unexpectedly"))
})?;
let method = match rx.await {
Err(_) => {
return Err(PtError::Internal(tor_error::internal!(
"PT reactor closed unexpectedly"
)));
}
Ok(Err(e)) => {
warn!("PT for {transport} failed to launch: {e}");
return Err(e);
}
Ok(Ok(method)) => method,
};
info!("Successfully launched PT for {transport} at {method:?}.");
Ok(method)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PtClientMethod {
pub(crate) kind: SocksVersion,
pub(crate) endpoint: SocketAddr,
}
impl PtClientMethod {
pub fn kind(&self) -> SocksVersion {
self.kind
}
pub fn endpoint(&self) -> SocketAddr {
self.endpoint
}
}
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
async fn factory_for_transport(
&self,
transport: &PtTransportName,
) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
let cmethod = match self.get_cmethod_for_transport(transport).await {
Err(e) => return Err(Arc::new(e)),
Ok(None) => return Ok(None),
Ok(Some(m)) => m,
};
let proxy = ExternalProxyPlugin::new(self.runtime.clone(), cmethod.endpoint, cmethod.kind);
let factory = ChanBuilder::new_client(self.runtime.clone(), proxy);
Ok(Some(Arc::new(factory)))
}
}