#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![doc = include_str!("../README.md")]
#![cfg_attr(not(ci_arti_stable), allow(renamed_and_removed_lints))]
#![cfg_attr(not(ci_arti_nightly), allow(unknown_lints))]
#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![allow(clippy::let_unit_value)] #![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)]
pub mod builder;
mod config;
mod err;
mod event;
pub mod factory;
mod mgr;
#[cfg(test)]
mod testing;
pub mod transport;
use educe::Educe;
use futures::select_biased;
use futures::task::SpawnExt;
use futures::StreamExt;
use std::result::Result as StdResult;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tor_config::ReconfigureError;
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_netdir::{params::NetParameters, NetDirProvider};
use tor_proto::channel::Channel;
use tracing::{debug, error};
use void::{ResultVoidErrExt, Void};
pub use err::Error;
pub use config::{ChannelConfig, ChannelConfigBuilder};
use tor_rtcompat::Runtime;
/// A `Result` whose error type is fixed to this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
use crate::factory::BootstrapReporter;
pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
/// The crate's public channel manager.
///
/// Callers obtain a channel to a target with [`ChanMgr::get_or_launch`], which
/// also reports (via [`ChanProvenance`]) whether the channel was newly created
/// or already existed.  All of the real work is delegated to the internal
/// `mgr` object; this type mostly wires it up and exposes a public API.
pub struct ChanMgr<R: Runtime> {
/// Internal manager object; every public method on `ChanMgr` delegates to it.
mgr: mgr::AbstractChanMgr<factory::CompoundFactory>,
/// Receiving side of the connection-status event channel created in `new`.
///
/// `bootstrap_events` hands out clones of this value.
bootstrap_status: event::ConnStatusEvents,
/// Zero-sized marker that keeps the `R` type parameter in use.
///
/// No runtime value is stored in this struct: the runtime given to `new` is
/// moved into the channel builder instead.
runtime: std::marker::PhantomData<fn(R) -> R>,
}
/// Description of how a channel returned by [`ChanMgr::get_or_launch`] came
/// to exist.
///
/// NOTE(review): the exact rule for choosing a variant (for example, for a
/// launch that was already in progress) lives in the `mgr` module and is not
/// visible from this file — confirm there.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ChanProvenance {
/// The channel was newly created rather than found already open.
NewlyCreated,
/// The channel already existed when it was asked for.
Preexisting,
}
/// Dormancy state, as far as the channel manager is concerned.
///
/// A value of this type is given to [`ChanMgr::new`] and can later be changed
/// with [`ChanMgr::set_dormancy`].  The `Default` implementation (derived via
/// `educe`) yields [`Dormancy::Active`].
///
/// NOTE(review): what each state actually changes about channel behaviour is
/// implemented in the `mgr` module, not here — confirm there before relying
/// on specifics.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Educe)]
#[educe(Default)]
pub enum Dormancy {
/// Not dormant.  This is the default variant.
#[educe(Default)]
Active,
/// Dormant.
Dormant,
}
/// How a channel is going to be used.
///
/// One `ChannelUsage` is passed with every call to
/// [`ChanMgr::get_or_launch`] and forwarded to the internal manager.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; the code that interprets them is outside this file.
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum ChannelUsage {
/// Presumably: the channel is wanted for directory requests — confirm.
Dir,
/// Presumably: the channel is wanted for carrying user traffic — confirm.
UserTraffic,
/// Presumably: the channel is wanted for a circuit that serves no real
/// purpose (at least for now) — confirm.
UselessCircuit,
}
impl<R: Runtime> ChanMgr<R> {
pub fn new(
runtime: R,
config: &ChannelConfig,
dormancy: Dormancy,
netparams: &NetParameters,
) -> Self
where
R: 'static,
{
let (sender, receiver) = event::channel();
let sender = Arc::new(std::sync::Mutex::new(sender));
let reporter = BootstrapReporter(sender);
let transport = transport::DefaultTransport::new(runtime.clone());
let builder = builder::ChanBuilder::new(runtime, transport);
let factory = factory::CompoundFactory::new(
Arc::new(builder),
#[cfg(feature = "pt-client")]
None,
);
let mgr = mgr::AbstractChanMgr::new(factory, config, dormancy, netparams, reporter);
ChanMgr {
mgr,
bootstrap_status: receiver,
runtime: std::marker::PhantomData,
}
}
/// Launch the background tasks that this manager needs.
///
/// Two tasks are spawned on `runtime`, each holding only a `Weak` reference
/// to the manager:
///
/// 1. one that watches `netdir` for events and updates the channel
///    configuration accordingly, and
/// 2. one that periodically expires channels, driven by a [`TaskSchedule`].
///
/// # Returns
///
/// A vector containing the [`TaskHandle`] for the expiration task's schedule.
///
/// # Errors
///
/// Returns an [`Error`] built with `Error::from_spawn` if either task cannot
/// be spawned.
pub fn launch_background_tasks(
    self: &Arc<Self>,
    runtime: &R,
    netdir: Arc<dyn NetDirProvider>,
) -> Result<Vec<TaskHandle>> {
    let config_task = Self::continually_update_channels_config(Arc::downgrade(self), netdir);
    runtime
        .spawn(config_task)
        .map_err(|e| Error::from_spawn("channels config task", e))?;

    let (sched, handle) = TaskSchedule::new(runtime.clone());
    let expiry_task = Self::continually_expire_channels(sched, Arc::downgrade(self));
    runtime
        .spawn(expiry_task)
        .map_err(|e| Error::from_spawn("channel expiration task", e))?;

    Ok(vec![handle])
}
pub async fn get_or_launch<T: ChanTarget + ?Sized>(
&self,
target: &T,
usage: ChannelUsage,
) -> Result<(Channel, ChanProvenance)> {
let targetinfo = OwnedChanTarget::from_chan_target(target);
let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
chan.check_match(target)
.map_err(|e| Error::from_proto_no_skew(e, target))?;
Ok((chan, provenance))
}
/// Return a handle to this manager's connection-status events.
///
/// The returned value is a clone of the receiver that was created alongside
/// the manager in `new`.
pub fn bootstrap_events(&self) -> ConnStatusEvents {
self.bootstrap_status.clone()
}
/// Ask the internal manager to expire channels, and return the `Duration`
/// it reports.
///
/// The background expiration task uses the returned duration as the delay
/// before it calls this method again.
///
/// NOTE(review): the criteria for expiring a channel, and the exact meaning
/// of the returned duration, are defined in the `mgr` module — confirm there.
pub fn expire_channels(&self) -> Duration {
self.mgr.expire_channels()
}
/// Change the manager's dormancy state.
///
/// `dormancy` and `netparams` are forwarded unchanged to the internal
/// manager.
///
/// # Errors
///
/// The only error type is [`tor_error::Bug`], i.e. a failure here indicates
/// an internal error rather than bad input.
pub fn set_dormancy(
&self,
dormancy: Dormancy,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), tor_error::Bug> {
self.mgr.set_dormancy(dormancy, netparams)
}
/// Reconfigure the channel manager with a new `config`.
///
/// * `config` and `netparams` are forwarded to the internal manager.
/// * `how` selects the reconfiguration mode.  When it is
///   `Reconfigure::CheckAllOrNothing`, the caller only wants to know whether
///   the change *could* be applied, so nothing is modified and `Ok(())` is
///   returned.
///
/// # Errors
///
/// The internal manager can only fail with a [`tor_error::Bug`]; such a
/// failure is converted into a [`ReconfigureError`].
pub fn reconfigure(
    &self,
    config: &ChannelConfig,
    how: tor_config::Reconfigure,
    netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), ReconfigureError> {
    if matches!(how, tor_config::Reconfigure::CheckAllOrNothing) {
        // A check-only request must not change any state.  Because the
        // internal manager can only fail with a `Bug` (see the type assertion
        // below), no value of `config` can be rejected, so the check always
        // succeeds.
        return Ok(());
    }

    let r = self.mgr.reconfigure(config, netparams);

    // Compile-time assertion that the only possible error is a `Bug`: if the
    // internal manager ever gains a "real" failure mode, this line stops
    // compiling and the check-only shortcut above must be revisited.
    let _: Option<&tor_error::Bug> = r.as_ref().err();

    Ok(r?)
}
/// Replace the pluggable-transport manager used by this channel manager's
/// factory with `ptmgr`.
///
/// Only available when the `pt-client` feature is enabled.
#[cfg(feature = "pt-client")]
pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
}
/// Background task: keep the channel configuration in step with the network
/// parameters of the latest consensus.
///
/// For every `NewConsensus` event from `netdir`, the current parameters are
/// fetched from the provider and passed to the internal manager's
/// `update_netparams`.  All other events are ignored.
///
/// Only weak references to the manager and to the provider are held while
/// waiting, so this task keeps neither alive.  It exits (logging the reason
/// at debug level) when the event stream ends, when either object has gone
/// away, or when `update_netparams` fails.
async fn continually_update_channels_config(
    self_: Weak<Self>,
    netdir: Arc<dyn NetDirProvider>,
) {
    use tor_netdir::DirEvent as DE;

    let mut events = netdir.events().fuse();

    // Keep only a weak reference from here on; the strong one is released
    // before we start waiting.
    let netdir_weak = Arc::downgrade(&netdir);
    drop(netdir);

    // The async block gives `?` something to return from.  Its success type
    // is `Void`, so it can only ever finish with an `Err` carrying the reason.
    let outcome: std::result::Result<Void, &str> = async move {
        loop {
            select_biased! {
                maybe_event = events.next() => {
                    let event = maybe_event.ok_or("EOF on netdir provider event stream")?;
                    match event {
                        DE::NewConsensus => {}
                        _ => continue,
                    }
                    let chanmgr = self_.upgrade().ok_or("channel manager gone away")?;
                    let provider = netdir_weak.upgrade().ok_or("netdir gone away")?;
                    chanmgr.mgr.update_netparams(provider.params()).map_err(|err| {
                        error!("continually_update_channels_config: failed to process! {} {:?}",
                               &err, &err);
                        "error processing netdir"
                    })?;
                }
            }
        }
    }
    .await;

    let reason = outcome.void_unwrap_err();
    debug!(
        "continually_update_channels_config: shutting down: {}",
        reason
    );
}
async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
while sched.next().await.is_some() {
let delay = if let Some(cm) = Weak::upgrade(&chanmgr) {
cm.expire_channels()
} else {
return;
};
sched.fire_in(Duration::from_secs(delay.as_secs()));
}
}
}