#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![doc = include_str!("../README.md")]
#![cfg_attr(not(ci_arti_stable), allow(renamed_and_removed_lints))]
#![cfg_attr(not(ci_arti_nightly), allow(unknown_lints))]
#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![allow(clippy::let_unit_value)] #![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)]
pub mod config;
pub mod err;
pub mod ipc;
use crate::config::ManagedTransportConfig;
use crate::err::PtError;
use crate::ipc::{PluggableTransport, PtClientMethod, PtParameters};
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::stream::FuturesUnordered;
use futures::task::SpawnExt;
use futures::{select, FutureExt, StreamExt};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use tor_linkspec::PtTransportName;
use tor_rtcompat::Runtime;
use tracing::{debug, error, warn};
#[cfg(feature = "tor-channel-factory")]
use {
async_trait::async_trait,
tor_chanmgr::{
builder::ChanBuilder,
factory::{AbstractPtError, ChannelFactory},
transport::ExternalProxyPlugin,
},
};
#[derive(Default, Debug)]
struct PtSharedState {
cmethods: HashMap<PtTransportName, PtClientMethod>,
configured: HashMap<PtTransportName, ManagedTransportConfig>,
}
enum PtReactorMessage {
Reconfigured,
Spawn {
pt: PtTransportName,
result: oneshot::Sender<err::Result<PtClientMethod>>,
},
}
type SpawnResult = (Vec<PtTransportName>, err::Result<PluggableTransport>);
struct PtReactor<R> {
rt: R,
running: Vec<PluggableTransport>,
requests: HashMap<PtTransportName, Vec<oneshot::Sender<err::Result<PtClientMethod>>>>,
spawning: FuturesUnordered<Pin<Box<dyn Future<Output = SpawnResult> + Send>>>,
state: Arc<RwLock<PtSharedState>>,
rx: UnboundedReceiver<PtReactorMessage>,
state_dir: PathBuf,
}
impl<R: Runtime> PtReactor<R> {
fn new(
rt: R,
state: Arc<RwLock<PtSharedState>>,
rx: UnboundedReceiver<PtReactorMessage>,
state_dir: PathBuf,
) -> Self {
let spawning = FuturesUnordered::new();
spawning.push(Box::pin(futures::future::pending::<SpawnResult>())
as Pin<Box<dyn Future<Output = _> + Send>>);
Self {
rt,
running: vec![],
requests: Default::default(),
spawning,
state,
rx,
state_dir,
}
}
#[allow(clippy::needless_pass_by_value)]
fn handle_spawned(
&mut self,
covers: Vec<PtTransportName>,
result: err::Result<PluggableTransport>,
) {
match result {
Err(e) => {
warn!("Spawning PT for {:?} failed: {}", covers, e);
let senders = covers
.iter()
.flat_map(|x| self.requests.remove(x))
.flatten();
for sender in senders {
let _ = sender.send(Err(e.clone()));
}
}
Ok(pt) => {
let mut state = self.state.write().expect("ptmgr state poisoned");
for (transport, method) in pt.transport_methods() {
state.cmethods.insert(transport.clone(), method.clone());
for sender in self.requests.remove(transport).into_iter().flatten() {
let _ = sender.send(Ok(method.clone()));
}
}
let requested: HashSet<_> = covers.iter().collect();
let found: HashSet<_> = pt.transport_methods().iter().map(|(t, _)| t).collect();
if requested != found {
warn!("Bug: PT {} succeeded, but did not give the same transports we asked for. ({:?} vs {:?})",
pt.identifier(), found, requested);
}
self.running.push(pt);
}
}
}
fn remove_pt(&self, pt: PluggableTransport) {
let mut state = self.state.write().expect("ptmgr state poisoned");
for transport in pt.transport_methods().keys() {
state.cmethods.remove(transport);
}
drop(pt);
}
async fn run_one_step(&mut self) -> err::Result<bool> {
use futures::future::Either;
let mut all_next_messages = self
.running
.iter_mut()
.map(|pt| Box::pin(pt.next_message()))
.collect::<Vec<_>>();
let mut next_message = if all_next_messages.is_empty() {
Either::Left(futures::future::pending())
} else {
Either::Right(futures::future::select_all(all_next_messages.iter_mut()).fuse())
};
select! {
(result, idx, _) = next_message => {
drop(all_next_messages);
match result {
Ok(m) => {
debug!("PT {} message: {:?}", self.running[idx].identifier(), m);
},
Err(e) => {
warn!("PT {} quit: {:?}", self.running[idx].identifier(), e);
let pt = self.running.remove(idx);
self.remove_pt(pt);
}
}
},
spawn_result = self.spawning.next() => {
drop(all_next_messages);
let (covers, result) = spawn_result.expect("self.spawning should never dry up");
self.handle_spawned(covers, result);
}
internal = self.rx.next() => {
drop(all_next_messages);
match internal {
Some(PtReactorMessage::Reconfigured) => {},
Some(PtReactorMessage::Spawn { pt, result }) => {
if let Some(requests) = self.requests.get_mut(&pt) {
requests.push(result);
return Ok(false);
}
for rpt in self.running.iter() {
if let Some(cmethod) = rpt.transport_methods().get(&pt) {
let _ = result.send(Ok(cmethod.clone()));
return Ok(false);
}
}
let config = {
let state = self.state.read().expect("ptmgr state poisoned");
state.configured.get(&pt).cloned()
};
let config = match config {
Some(v) => v,
None => {
let _ = result.send(Err(PtError::UnconfiguredTransportDueToConcurrentReconfiguration));
return Ok(false);
}
};
self.requests.entry(pt).or_default().push(result);
for proto in config.protocols.iter() {
self.requests.entry(proto.clone()).or_default();
}
let spawn_fut = Box::pin(
spawn_from_config(self.rt.clone(), self.state_dir.clone(), config.clone())
.map(|result| (config.protocols, result))
);
self.spawning.push(spawn_fut);
},
None => return Ok(true)
}
}
}
Ok(false)
}
}
pub struct PtMgr<R> {
#[allow(dead_code)]
runtime: R,
state: Arc<RwLock<PtSharedState>>,
tx: UnboundedSender<PtReactorMessage>,
}
impl<R: Runtime> PtMgr<R> {
fn transform_config(
binaries: Vec<ManagedTransportConfig>,
) -> HashMap<PtTransportName, ManagedTransportConfig> {
let mut ret = HashMap::new();
for thing in binaries {
for tn in thing.protocols.iter() {
ret.insert(tn.clone(), thing.clone());
}
}
ret
}
pub fn new(
transports: Vec<ManagedTransportConfig>,
state_dir: PathBuf,
rt: R,
) -> Result<Self, PtError> {
let state = PtSharedState {
cmethods: Default::default(),
configured: Self::transform_config(transports),
};
let state = Arc::new(RwLock::new(state));
let (tx, rx) = mpsc::unbounded();
let mut reactor = PtReactor::new(rt.clone(), state.clone(), rx, state_dir);
rt.spawn(async move {
loop {
match reactor.run_one_step().await {
Ok(true) => return,
Ok(false) => {}
Err(e) => {
error!("PtReactor failed: {}", e);
return;
}
}
}
})
.map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
Ok(Self {
runtime: rt,
state,
tx,
})
}
pub fn reconfigure(
&self,
how: tor_config::Reconfigure,
transports: Vec<ManagedTransportConfig>,
) -> Result<(), tor_config::ReconfigureError> {
let configured = Self::transform_config(transports);
if how == tor_config::Reconfigure::CheckAllOrNothing {
return Ok(());
}
{
let mut inner = self.state.write().expect("ptmgr poisoned");
inner.configured = configured;
}
let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
Ok(())
}
}
async fn spawn_from_config<R: Runtime>(
rt: R,
state_dir: PathBuf,
cfg: ManagedTransportConfig,
) -> Result<PluggableTransport, PtError> {
let binary_path = cfg.path.path().map_err(|e| PtError::PathExpansionFailed {
path: cfg.path,
error: e,
})?;
let filename = pt_identifier_as_path(&binary_path)?;
let new_state_dir = state_dir.join(filename);
std::fs::create_dir_all(&new_state_dir).map_err(|e| PtError::StatedirCreateFailed {
path: new_state_dir.clone(),
error: Arc::new(e),
})?;
let pt_params = PtParameters::builder()
.state_location(new_state_dir)
.transports(cfg.protocols)
.build()
.expect("PtParameters constructed incorrectly");
let mut pt = PluggableTransport::new(binary_path, cfg.arguments, pt_params);
pt.launch(rt).await?;
Ok(pt)
}
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
async fn factory_for_transport(
&self,
transport: &PtTransportName,
) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
let (mut cmethod, configured) = {
let inner = self.state.read().expect("ptmgr poisoned");
let cmethod = inner.cmethods.get(transport).cloned();
let configured = cmethod.is_some() || inner.configured.get(transport).is_some();
(cmethod, configured)
};
if cmethod.is_none() {
if configured {
let (tx, rx) = oneshot::channel();
self.tx
.unbounded_send(PtReactorMessage::Spawn {
pt: transport.clone(),
result: tx,
})
.map_err(|_| {
Arc::new(PtError::Internal(tor_error::internal!(
"PT reactor closed unexpectedly"
))) as Arc<dyn AbstractPtError>
})?;
cmethod = Some(
rx.await
.map_err(|_| {
Arc::new(PtError::Internal(tor_error::internal!(
"PT reactor closed unexpectedly"
))) as Arc<dyn AbstractPtError>
})?
.map_err(|x| Arc::new(x) as Arc<dyn AbstractPtError>)?,
);
} else {
return Ok(None);
}
}
let cmethod = cmethod.expect("impossible");
let proxy = ExternalProxyPlugin::new(self.runtime.clone(), cmethod.endpoint, cmethod.kind);
let factory = ChanBuilder::new(self.runtime.clone(), proxy);
Ok(Some(Arc::new(factory)))
}
}
fn pt_identifier_as_path(binary_path: impl AsRef<Path>) -> Result<PathBuf, PtError> {
let mut filename =
PathBuf::from(
binary_path
.as_ref()
.file_name()
.ok_or_else(|| PtError::NotAFile {
path: binary_path.as_ref().to_path_buf(),
})?,
);
if let Some(ext) = filename.extension() {
if ext.eq_ignore_ascii_case(std::env::consts::EXE_EXTENSION) {
filename.set_extension("");
}
}
Ok(filename)
}
fn pt_identifier(binary_path: impl AsRef<Path>) -> Result<String, PtError> {
Ok(pt_identifier_as_path(binary_path)?
.to_string_lossy()
.to_string())
}