#![allow(missing_docs, clippy::missing_docs_in_private_items)]
use crate::{
BootstrapBehavior, InertTorClient, Result, TorClient, TorClientConfig, err::ErrorDetail,
};
use std::{
result::Result as StdResult,
sync::Arc,
time::{Duration, Instant},
};
use tor_dirmgr::{DirMgrConfig, DirMgrStore};
use tor_error::{ErrorKind, HasKind as _};
use tor_rtcompat::Runtime;
use tracing::instrument;
#[allow(unreachable_pub)]
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-api")))]
pub trait DirProviderBuilder<R: Runtime>: Send + Sync {
fn build(
&self,
runtime: R,
store: DirMgrStore<R>,
circmgr: Arc<tor_circmgr::CircMgr<R>>,
config: DirMgrConfig,
) -> Result<Arc<dyn tor_dirmgr::DirProvider + 'static>>;
}
#[derive(Clone, Debug)]
struct DirMgrBuilder {}
impl<R: Runtime> DirProviderBuilder<R> for DirMgrBuilder {
fn build(
&self,
runtime: R,
store: DirMgrStore<R>,
circmgr: Arc<tor_circmgr::CircMgr<R>>,
config: DirMgrConfig,
) -> Result<Arc<dyn tor_dirmgr::DirProvider + 'static>> {
let dirmgr = tor_dirmgr::DirMgr::create_unbootstrapped(config, runtime, store, circmgr)
.map_err(ErrorDetail::DirMgrSetup)?;
Ok(Arc::new(dirmgr))
}
}
#[derive(Clone)]
#[must_use]
pub struct TorClientBuilder<R: Runtime> {
runtime: R,
config: TorClientConfig,
bootstrap_behavior: BootstrapBehavior,
dirmgr_builder: Arc<dyn DirProviderBuilder<R>>,
local_resource_timeout: Option<Duration>,
#[cfg(feature = "dirfilter")]
dirfilter: tor_dirmgr::filter::FilterConfig,
}
pub const MAX_LOCAL_RESOURCE_TIMEOUT: Duration = Duration::new(5, 0);
impl<R: Runtime> TorClientBuilder<R> {
pub(crate) fn new(runtime: R) -> Self {
Self {
runtime,
config: TorClientConfig::default(),
bootstrap_behavior: BootstrapBehavior::default(),
dirmgr_builder: Arc::new(DirMgrBuilder {}),
local_resource_timeout: None,
#[cfg(feature = "dirfilter")]
dirfilter: None,
}
}
pub fn config(mut self, config: TorClientConfig) -> Self {
self.config = config;
self
}
pub fn bootstrap_behavior(mut self, bootstrap_behavior: BootstrapBehavior) -> Self {
self.bootstrap_behavior = bootstrap_behavior;
self
}
pub fn local_resource_timeout(mut self, timeout: Duration) -> Self {
self.local_resource_timeout = Some(timeout);
self
}
#[cfg(all(feature = "experimental-api", feature = "error_detail"))]
pub fn dirmgr_builder<B>(mut self, builder: Arc<dyn DirProviderBuilder<R>>) -> Self
where
B: DirProviderBuilder<R> + 'static,
{
self.dirmgr_builder = builder;
self
}
#[cfg(feature = "dirfilter")]
pub fn dirfilter<F>(mut self, filter: F) -> Self
where
F: Into<Arc<dyn tor_dirmgr::filter::DirFilter + 'static>>,
{
self.dirfilter = Some(filter.into());
self
}
#[instrument(skip_all, level = "trace")]
pub fn create_unbootstrapped(&self) -> Result<TorClient<R>> {
let timeout = self.local_resource_timeout_or(Duration::from_millis(0))?;
let give_up_at = Instant::now() + timeout;
let mut first_attempt = true;
loop {
match self.create_unbootstrapped_inner(Instant::now, give_up_at, first_attempt) {
Err(delay) => {
first_attempt = false;
std::thread::sleep(delay);
}
Ok(other) => return other,
}
}
}
#[instrument(skip_all, level = "trace")]
pub async fn create_unbootstrapped_async(&self) -> Result<TorClient<R>> {
let timeout = self.local_resource_timeout_or(Duration::from_millis(500))?;
let give_up_at = self.runtime.now() + timeout;
let mut first_attempt = true;
loop {
match self.create_unbootstrapped_inner(|| self.runtime.now(), give_up_at, first_attempt)
{
Err(delay) => {
first_attempt = false;
self.runtime.sleep(delay).await;
}
Ok(other) => return other,
}
}
}
#[instrument(skip_all, level = "trace")]
fn create_unbootstrapped_inner<F>(
&self,
now: F,
give_up_at: Instant,
first_attempt: bool,
) -> StdResult<Result<TorClient<R>>, Duration>
where
F: FnOnce() -> Instant,
{
#[allow(unused_mut)]
let mut dirmgr_extensions = tor_dirmgr::config::DirMgrExtensions::default();
#[cfg(feature = "dirfilter")]
{
dirmgr_extensions.filter.clone_from(&self.dirfilter);
}
let result: Result<TorClient<R>> = TorClient::create_inner(
self.runtime.clone(),
&self.config,
self.bootstrap_behavior,
self.dirmgr_builder.as_ref(),
dirmgr_extensions,
)
.map_err(ErrorDetail::into);
match result {
Err(e) if e.kind() == ErrorKind::LocalResourceAlreadyInUse => {
let now = now();
if now >= give_up_at {
Ok(Err(e))
} else {
let remaining = give_up_at.saturating_duration_since(now);
if first_attempt {
tracing::info!(
"Looks like another TorClient may be running; retrying for up to {}",
humantime::Duration::from(remaining),
);
}
Err(Duration::from_millis(50).min(remaining))
}
}
other => Ok(other),
}
}
pub async fn create_bootstrapped(&self) -> Result<TorClient<R>> {
let r = self.create_unbootstrapped_async().await?;
r.bootstrap().await?;
Ok(r)
}
fn local_resource_timeout_or(&self, dflt: Duration) -> Result<Duration> {
let timeout = self.local_resource_timeout.unwrap_or(dflt);
if timeout > MAX_LOCAL_RESOURCE_TIMEOUT {
return Err(
ErrorDetail::Configuration(tor_config::ConfigBuildError::Invalid {
field: "local_resource_timeout".into(),
problem: "local resource timeout too large".into(),
})
.into(),
);
}
Ok(timeout)
}
#[allow(clippy::unnecessary_wraps)]
pub fn create_inert(&self) -> Result<InertTorClient> {
Ok(InertTorClient::new(&self.config)?)
}
}
#[cfg(test)]
mod test {
use tor_rtcompat::PreferredRuntime;
use super::*;
fn must_be_send_and_sync<S: Send + Sync>() {}
#[test]
fn builder_is_send() {
must_be_send_and_sync::<TorClientBuilder<PreferredRuntime>>();
}
}