#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc = include_str!("../README.md")]
#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
#![warn(noop_method_call)]
#![warn(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![deny(clippy::print_stderr)]
#![deny(clippy::print_stdout)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unchecked_time_subtraction)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::mod_module_files)]
#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
pub mod builder;
mod config;
mod err;
mod event;
pub mod factory;
mod mgr;
#[cfg(test)]
mod testing;
pub mod transport;
use futures::StreamExt;
use futures::select_biased;
use std::result::Result as StdResult;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tor_config::ReconfigureError;
use tor_error::error_report;
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_netdir::{NetDirProvider, params::NetParameters};
use tor_proto::channel::Channel;
#[cfg(feature = "experimental-api")]
use tor_proto::memquota::ChannelAccount;
use tor_proto::memquota::ToplevelAccount;
use tor_rtcompat::SpawnExt;
use tracing::debug;
use tracing::instrument;
use void::{ResultVoidErrExt, Void};
#[cfg(feature = "relay")]
use {
async_trait::async_trait, safelog::Sensitive, tor_proto::relay::CreateRequestHandler,
tor_proto::relay::channel_provider::ChannelProvider,
};
pub use err::Error;
pub use config::{ChannelConfig, ChannelConfigBuilder, ProxyProtocol};
pub use mgr::ChanMgrConfig;
use tor_rtcompat::Runtime;
pub type Result<T> = std::result::Result<T, Error>;
use crate::factory::BootstrapReporter;
pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
pub struct ChanMgr<R: Runtime> {
mgr: mgr::AbstractChanMgr<
factory::CompoundFactory<builder::ChanBuilder<R, transport::DefaultTransport<R>>>,
>,
bootstrap_status: event::ConnStatusEvents,
#[allow(unused)] runtime: R,
}
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ChanProvenance {
NewlyCreated,
Preexisting,
}
#[non_exhaustive]
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub enum Dormancy {
#[default]
Active,
Dormant,
}
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum ChannelUsage {
Dir,
UserTraffic,
UselessCircuit,
}
impl<R: Runtime> ChanMgr<R> {
pub fn new(
runtime: R,
config: ChanMgrConfig,
dormancy: Dormancy,
netparams: &NetParameters,
memquota: ToplevelAccount,
) -> Result<Self>
where
R: 'static,
{
let (sender, receiver) = event::channel();
let sender = Arc::new(std::sync::Mutex::new(sender));
let reporter = BootstrapReporter(sender);
let transport =
transport::DefaultTransport::new(runtime.clone(), config.cfg.outbound_proxy.clone());
cfg_if::cfg_if! {
if #[cfg(feature = "relay")] {
let builder = if let Some(auth_material) = &config.auth_material {
builder::ChanBuilder::new_relay(runtime.clone(), transport, auth_material.clone(), config.my_addrs, None)?
} else {
builder::ChanBuilder::new_client(runtime.clone(), transport)
};
} else {
let builder = builder::ChanBuilder::new_client(runtime.clone(), transport);
}
};
let factory = factory::CompoundFactory::new(
Arc::new(builder),
#[cfg(feature = "pt-client")]
None,
);
if let Some(ref proxy) = config.cfg.outbound_proxy {
if !proxy.is_loopback() {
tracing::warn!(
proxy_addr = %proxy,
"outbound_proxy is configured to a non-loopback address; \
this may expose Tor traffic to an untrusted intermediate"
);
}
}
let mgr =
mgr::AbstractChanMgr::new(factory, config.cfg, dormancy, netparams, reporter, memquota);
Ok(ChanMgr {
mgr,
bootstrap_status: receiver,
runtime,
})
}
#[instrument(level = "trace", skip_all)]
pub fn launch_background_tasks(
self: &Arc<Self>,
runtime: &R,
netdir: Arc<dyn NetDirProvider>,
) -> Result<Vec<TaskHandle>> {
runtime
.spawn(Self::continually_update_channels_config(
Arc::downgrade(self),
netdir,
))
.map_err(|e| Error::from_spawn("channels config task", e))?;
let (sched, handle) = TaskSchedule::new(runtime.clone());
runtime
.spawn(Self::continually_expire_channels(
sched,
Arc::downgrade(self),
))
.map_err(|e| Error::from_spawn("channel expiration task", e))?;
Ok(vec![handle])
}
#[cfg(feature = "relay")]
pub async fn handle_incoming(
&self,
src: Sensitive<std::net::SocketAddr>,
stream: <R as tor_rtcompat::NetStreamProvider>::Stream,
) -> Result<Arc<Channel>> {
self.mgr.handle_incoming(src, stream).await
}
#[instrument(level = "trace", skip_all)]
pub async fn get_or_launch<T: ChanTarget + ?Sized>(
&self,
target: &T,
usage: ChannelUsage,
) -> Result<(Arc<Channel>, ChanProvenance)> {
let targetinfo = OwnedChanTarget::from_chan_target(target);
let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
chan.check_match(target)
.map_err(|e| Error::from_proto_no_skew(e, target))?;
Ok((chan, provenance))
}
pub fn bootstrap_events(&self) -> ConnStatusEvents {
self.bootstrap_status.clone()
}
pub fn expire_channels(&self) -> Duration {
self.mgr.expire_channels()
}
pub fn set_dormancy(
&self,
dormancy: Dormancy,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), tor_error::Bug> {
self.mgr.set_dormancy(dormancy, netparams)
}
pub fn reconfigure(
&self,
config: &ChannelConfig,
how: tor_config::Reconfigure,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), ReconfigureError> {
if how == tor_config::Reconfigure::CheckAllOrNothing {
return Ok(());
}
let r = self.mgr.reconfigure(config, netparams);
let _: Option<&tor_error::Bug> = r.as_ref().err();
Ok(r?)
}
#[cfg(feature = "pt-client")]
pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
}
#[cfg(feature = "relay")]
pub fn set_relay_auth_material(
&self,
auth_material: Arc<tor_proto::RelayChannelAuthMaterial>,
) -> Result<()> {
let mut result = Ok(());
self.mgr.with_mut_builder(|f| {
match f
.default_factory()
.rebuild_with_auth_material(auth_material)
{
Ok(b) => f.replace_default_factory(Arc::new(b)),
Err(e) => result = Err(e),
}
});
result
}
#[cfg(feature = "relay")]
pub fn set_create_request_handler(&self, handler: Arc<CreateRequestHandler>) -> Result<()> {
let mut result = Ok(());
self.mgr.with_mut_builder(|f| {
match f
.default_factory()
.rebuild_with_create_request_handler(handler)
{
Ok(b) => f.replace_default_factory(Arc::new(b)),
Err(e) => result = Err(e),
}
});
result
}
#[cfg(feature = "experimental-api")]
#[instrument(level = "trace", skip_all)]
pub async fn build_unmanaged_channel(
&self,
target: impl tor_linkspec::IntoOwnedChanTarget,
memquota: ChannelAccount,
) -> Result<Arc<Channel>> {
use factory::ChannelFactory as _;
let target = target.to_owned();
self.mgr
.channels
.builder()
.connect_via_transport(&target, self.mgr.reporter.clone(), memquota)
.await
}
#[instrument(level = "trace", skip_all)]
async fn continually_update_channels_config(
self_: Weak<Self>,
netdir: Arc<dyn NetDirProvider>,
) {
use tor_netdir::DirEvent as DE;
let mut netdir_stream = netdir.events().fuse();
let netdir = {
let weak = Arc::downgrade(&netdir);
drop(netdir);
weak
};
let termination_reason: std::result::Result<Void, &str> = async move {
loop {
select_biased! {
direvent = netdir_stream.next() => {
let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
if ! matches!(direvent, DE::NewConsensus) { continue };
let self_ = self_.upgrade().ok_or("channel manager gone away")?;
let netdir = netdir.upgrade().ok_or("netdir gone away")?;
let netparams = netdir.params();
self_.mgr.update_netparams(netparams).map_err(|e| {
error_report!(e, "continually_update_channels_config: failed to process!");
"error processing netdir"
})?;
}
}
}
}
.await;
debug!(
"continually_update_channels_config: shutting down: {}",
termination_reason.void_unwrap_err()
);
}
#[instrument(level = "trace", skip_all)]
async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
while sched.next().await.is_some() {
let Some(cm) = Weak::upgrade(&chanmgr) else {
return;
};
let delay = cm.expire_channels();
sched.fire_in(delay);
}
}
}
#[cfg(feature = "relay")]
#[async_trait]
impl<R: Runtime> ChannelProvider for ChanMgr<R> {
type BuildSpec = OwnedChanTarget;
fn get_or_launch(
self: Arc<Self>,
reactor_id: tor_proto::circuit::UniqId,
target: Self::BuildSpec,
tx: tor_proto::relay::channel_provider::OutboundChanSender,
) -> tor_proto::Result<()> {
use tor_error::into_internal;
debug!("Get or launch channel to {target} for circuit reactor {reactor_id}");
let chanmgr = self.clone();
self.runtime
.spawn(async move {
let r = chanmgr
.mgr
.get_or_launch(target, ChannelUsage::UserTraffic)
.await
.map_err(|e| tor_proto::Error::ChanProto(e.to_string())); tx.send(r.map(|(chan, _)| chan));
})
.map_err(into_internal!("Failed to launch channel provider task"))?;
Ok(())
}
}