use crate::config::CircuitTiming;
use crate::usage::{SupportedTunnelUsage, TargetTunnelUsage};
use crate::{DirInfo, Error, PathConfig, Result, timeouts};
use retry_error::RetryError;
use tor_async_utils::mpsc_channel_no_memquota;
use tor_basic_utils::retry::RetryDelay;
use tor_config::MutCfg;
use tor_error::{AbsRetryTime, HasRetryTime, debug_report, info_report, internal, warn_report};
#[cfg(feature = "vanguards")]
use tor_guardmgr::vanguards::VanguardMgr;
use tor_linkspec::CircTarget;
use tor_proto::circuit::UniqId;
use tor_proto::client::circuit::{CircParameters, Path};
use tor_rtcompat::{Runtime, SleepProviderExt};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::future::{FutureExt, Shared};
use futures::stream::{FuturesUnordered, StreamExt};
use oneshot_fused_workaround as oneshot;
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::panic::AssertUnwindSafe;
use std::sync::{self, Arc, Weak};
use std::time::{Duration, Instant};
use tor_rtcompat::SpawnExt;
use tracing::{debug, instrument, trace, warn};
use weak_table::PtrWeakHashSet;
mod streams;
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum TunnelProvenance {
NewlyCreated,
Preexisting,
}
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RestrictionFailed {
#[error("Specification did not support desired usage")]
NotSupported,
}
#[async_trait]
pub(crate) trait AbstractTunnel: Debug {
type Id: Clone + Debug + Hash + Eq + Send + Sync;
fn id(&self) -> Self::Id;
fn usable(&self) -> bool;
fn single_path(&self) -> tor_proto::Result<Arc<Path>>;
fn n_hops(&self) -> tor_proto::Result<usize>;
fn is_closing(&self) -> bool;
fn unique_id(&self) -> UniqId;
async fn extend<T: CircTarget + Sync>(
&self,
target: &T,
params: CircParameters,
) -> tor_proto::Result<()>;
async fn last_known_to_be_used_at(&self) -> tor_proto::Result<Option<Instant>>;
}
pub(crate) trait MockablePlan {
fn add_blocked_advance_reason(&mut self, _reason: String) {}
}
#[async_trait]
pub(crate) trait AbstractTunnelBuilder<R: Runtime>: Send + Sync {
type Tunnel: AbstractTunnel + Send + Sync;
type Plan: Send + Debug + MockablePlan;
fn plan_tunnel(
&self,
usage: &TargetTunnelUsage,
dir: DirInfo<'_>,
) -> Result<(Self::Plan, SupportedTunnelUsage)>;
async fn build_tunnel(&self, plan: Self::Plan) -> Result<(SupportedTunnelUsage, Self::Tunnel)>;
fn launch_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
let _ = usage; 1
}
fn select_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
let _ = usage; 1
}
fn learning_timeouts(&self) -> bool;
fn save_state(&self) -> Result<bool>;
fn path_config(&self) -> Arc<PathConfig>;
#[allow(dead_code)]
fn set_path_config(&self, new_config: PathConfig);
fn estimator(&self) -> &timeouts::Estimator;
#[cfg(feature = "vanguards")]
fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>>;
fn upgrade_to_owned_state(&self) -> Result<()>;
fn reload_state(&self) -> Result<()>;
fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R>;
fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters);
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum ExpirationInfo {
Unused {
created: Instant,
},
Dirty {
dirty_since: Instant,
},
LongLived {
last_known_to_be_used_at: Instant,
},
}
impl ExpirationInfo {
fn new(now: Instant) -> Self {
ExpirationInfo::Unused { created: now }
}
fn mark_used(&mut self, now: Instant, long_lived: bool) {
if long_lived {
*self = ExpirationInfo::LongLived {
last_known_to_be_used_at: now,
};
} else {
match self {
ExpirationInfo::Unused { .. } => {
*self = ExpirationInfo::Dirty { dirty_since: now };
}
ExpirationInfo::Dirty { .. } => {
}
ExpirationInfo::LongLived { .. } => {
}
}
}
}
fn check_long_lived(&self) -> Result<()> {
match self {
ExpirationInfo::Unused { .. } | ExpirationInfo::Dirty { .. } => Err(internal!(
"Tunnel was not long-lived as expected. (Expiration status: {:?})",
self
)
.into()),
ExpirationInfo::LongLived { .. } => Ok(()),
}
}
}
#[derive(Clone, Debug)]
pub(crate) struct ExpirationParameters {
expire_unused_after: Duration,
expire_dirty_after: Duration,
expire_disused_after: Duration,
}
#[derive(Debug, Clone)]
pub(crate) struct OpenEntry<T> {
spec: SupportedTunnelUsage,
tunnel: Arc<T>,
expiration: ExpirationInfo,
}
impl<T: AbstractTunnel> OpenEntry<T> {
fn new(spec: SupportedTunnelUsage, tunnel: T, expiration: ExpirationInfo) -> Self {
OpenEntry {
spec,
tunnel: tunnel.into(),
expiration,
}
}
pub(crate) fn supports(&self, usage: &TargetTunnelUsage) -> bool {
self.tunnel.usable() && self.spec.supports(usage)
}
fn restrict_mut(&mut self, usage: &TargetTunnelUsage, now: Instant) -> Result<()> {
self.spec.restrict_mut(usage)?;
self.expiration.mark_used(now, self.spec.is_long_lived());
Ok(())
}
fn find_best<'a>(
ents: &'a mut [&'a mut Self],
usage: &TargetTunnelUsage,
parallelism: usize,
) -> &'a mut Self {
let _ = usage; use rand::seq::IndexedMutRandom as _;
let parallelism = parallelism.clamp(1, ents.len());
let slice = &mut ents[0..parallelism];
let mut rng = rand::rng();
slice.choose_mut(&mut rng).expect("Input list was empty")
}
fn should_expire(&self, now: Instant, params: &ExpirationParameters) -> ShouldExpire {
match self.expiration {
ExpirationInfo::Unused { created } => {
ShouldExpire::certain(now, created + params.expire_unused_after)
}
ExpirationInfo::Dirty { dirty_since } => {
ShouldExpire::certain(now, dirty_since + params.expire_dirty_after)
}
ExpirationInfo::LongLived {
last_known_to_be_used_at,
} => {
ShouldExpire::uncertain(now, last_known_to_be_used_at + params.expire_disused_after)
}
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ShouldExpire {
Now,
PossiblyNow,
NotBefore(Instant),
}
impl ShouldExpire {
fn certain(now: Instant, expiration: Instant) -> Self {
if now >= expiration {
ShouldExpire::Now
} else {
ShouldExpire::NotBefore(expiration)
}
}
fn uncertain(now: Instant, expiration: Instant) -> Self {
if now >= expiration {
ShouldExpire::PossiblyNow
} else {
ShouldExpire::NotBefore(expiration)
}
}
}
type PendResult<B, R> = Result<<<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id>;
struct PendingRequest<B: AbstractTunnelBuilder<R>, R: Runtime> {
usage: TargetTunnelUsage,
notify: mpsc::Sender<PendResult<B, R>>,
}
impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingRequest<B, R> {
fn supported_by(&self, spec: &SupportedTunnelUsage) -> bool {
spec.supports(&self.usage)
}
}
#[derive(Debug)]
struct PendingEntry<B: AbstractTunnelBuilder<R>, R: Runtime> {
tentative_assignment: sync::Mutex<SupportedTunnelUsage>,
receiver: Shared<oneshot::Receiver<PendResult<B, R>>>,
}
impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingEntry<B, R> {
fn new(spec: &SupportedTunnelUsage) -> (Self, oneshot::Sender<PendResult<B, R>>) {
let tentative_assignment = sync::Mutex::new(spec.clone());
let (sender, receiver) = oneshot::channel();
let receiver = receiver.shared();
let entry = PendingEntry {
tentative_assignment,
receiver,
};
(entry, sender)
}
fn supports(&self, usage: &TargetTunnelUsage) -> bool {
let assignment = self.tentative_assignment.lock().expect("poisoned lock");
assignment.supports(usage)
}
fn tentative_restrict_mut(&self, usage: &TargetTunnelUsage) -> Result<()> {
if let Ok(mut assignment) = self.tentative_assignment.lock() {
assignment.restrict_mut(usage)?;
}
Ok(())
}
fn find_best(ents: &[Arc<Self>], usage: &TargetTunnelUsage) -> Vec<Arc<Self>> {
let _ = usage; vec![Arc::clone(&ents[0])]
}
}
#[derive(Debug)]
struct TunnelBuildPlan<B: AbstractTunnelBuilder<R>, R: Runtime> {
plan: B::Plan,
sender: oneshot::Sender<PendResult<B, R>>,
pending: Arc<PendingEntry<B, R>>,
}
struct TunnelList<B: AbstractTunnelBuilder<R>, R: Runtime> {
#[allow(clippy::type_complexity)]
open_tunnels: HashMap<<B::Tunnel as AbstractTunnel>::Id, OpenEntry<B::Tunnel>>,
pending_tunnels: PtrWeakHashSet<Weak<PendingEntry<B, R>>>,
pending_requests: PtrWeakHashSet<Weak<PendingRequest<B, R>>>,
}
impl<B: AbstractTunnelBuilder<R>, R: Runtime> TunnelList<B, R> {
fn new() -> Self {
TunnelList {
open_tunnels: HashMap::new(),
pending_tunnels: PtrWeakHashSet::new(),
pending_requests: PtrWeakHashSet::new(),
}
}
fn add_open(&mut self, e: OpenEntry<B::Tunnel>) {
let id = e.tunnel.id();
self.open_tunnels.insert(id, e);
}
fn find_open(&mut self, usage: &TargetTunnelUsage) -> Option<Vec<&mut OpenEntry<B::Tunnel>>> {
let list = self.open_tunnels.values_mut();
let v = SupportedTunnelUsage::find_supported(list, usage);
if v.is_empty() { None } else { Some(v) }
}
fn get_open_mut(
&mut self,
id: &<B::Tunnel as AbstractTunnel>::Id,
) -> Option<&mut OpenEntry<B::Tunnel>> {
self.open_tunnels.get_mut(id)
}
fn take_open(
&mut self,
id: &<B::Tunnel as AbstractTunnel>::Id,
) -> Option<OpenEntry<B::Tunnel>> {
self.open_tunnels.remove(id)
}
#[must_use]
fn expire_tunnels(
&mut self,
now: Instant,
params: &ExpirationParameters,
) -> (Option<Instant>, Vec<Weak<B::Tunnel>>) {
let mut need_check = Vec::new();
let mut earliest_expiration = None;
self.open_tunnels
.retain(|_k, v| match v.should_expire(now, params) {
ShouldExpire::Now => false,
ShouldExpire::NotBefore(when) => {
earliest_expiration = match earliest_expiration {
Some(t) if t < when => Some(t),
_ => Some(when),
};
true
}
ShouldExpire::PossiblyNow => {
need_check.push(Arc::downgrade(&v.tunnel));
true
}
});
(earliest_expiration, need_check)
}
fn tunnel_should_expire(
&mut self,
id: &<B::Tunnel as AbstractTunnel>::Id,
now: Instant,
params: &ExpirationParameters,
) -> Option<ShouldExpire> {
self.open_tunnels
.get(id)
.map(|v| v.should_expire(now, params))
}
fn update_long_lived_tunnel_last_used(
&mut self,
id: &<B::Tunnel as AbstractTunnel>::Id,
now: Instant,
params: &ExpirationParameters,
disused_since: &tor_proto::Result<Option<Instant>>,
) -> crate::Result<Option<Instant>> {
let Ok(disused_since) = disused_since else {
let discard = self.take_open(id);
if let Some(ent) = discard {
ent.expiration.check_long_lived()?;
}
return Ok(None);
};
let Some(tun) = self.open_tunnels.get_mut(id) else {
return Ok(None);
};
tun.expiration.check_long_lived()?;
let last_known_in_use_at = disused_since.unwrap_or(now);
tun.expiration.mark_used(last_known_in_use_at, true);
match tun.should_expire(now, params) {
ShouldExpire::Now | ShouldExpire::PossiblyNow => {
let _discard = self.take_open(id);
Ok(None)
}
ShouldExpire::NotBefore(instant) => Ok(Some(instant)),
}
}
fn add_pending_tunnel(&mut self, pending: Arc<PendingEntry<B, R>>) {
self.pending_tunnels.insert(pending);
}
fn find_pending_tunnels(
&self,
usage: &TargetTunnelUsage,
) -> Option<Vec<Arc<PendingEntry<B, R>>>> {
let result: Vec<_> = self
.pending_tunnels
.iter()
.filter(|p| p.supports(usage))
.filter(|p| !matches!(p.receiver.peek(), Some(Err(_))))
.collect();
if result.is_empty() {
None
} else {
Some(result)
}
}
fn tunnel_is_pending(&self, circ: &Arc<PendingEntry<B, R>>) -> bool {
self.pending_tunnels.contains(circ)
}
fn add_pending_request(&mut self, pending: &Arc<PendingRequest<B, R>>) {
self.pending_requests.insert(Arc::clone(pending));
}
fn find_pending_requests(
&self,
circ_spec: &SupportedTunnelUsage,
) -> Vec<Arc<PendingRequest<B, R>>> {
self.pending_requests
.iter()
.filter(|pend| pend.supported_by(circ_spec))
.collect()
}
fn clear_all_tunnels(&mut self) {
self.pending_tunnels.clear();
self.open_tunnels.clear();
}
}
struct UnusedTimings {
learning: Duration,
not_learning: Duration,
}
#[allow(clippy::fallible_impl_from)]
impl From<&tor_netdir::params::NetParameters> for UnusedTimings {
fn from(v: &tor_netdir::params::NetParameters) -> Self {
#[allow(clippy::unwrap_used)]
UnusedTimings {
learning: v
.unused_client_circ_timeout_while_learning_cbt
.try_into()
.unwrap(),
not_learning: v.unused_client_circ_timeout.try_into().unwrap(),
}
}
}
pub(crate) struct AbstractTunnelMgr<B: AbstractTunnelBuilder<R>, R: Runtime> {
builder: B,
runtime: R,
tunnels: sync::Mutex<TunnelList<B, R>>,
circuit_timing: MutCfg<CircuitTiming>,
unused_timing: sync::Mutex<UnusedTimings>,
}
enum Action<B: AbstractTunnelBuilder<R>, R: Runtime> {
Open(Arc<B::Tunnel>),
Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B, R>>>>),
Build(Vec<TunnelBuildPlan<B, R>>),
}
impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> AbstractTunnelMgr<B, R> {
pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
let circs = sync::Mutex::new(TunnelList::new());
let dflt_params = tor_netdir::params::NetParameters::default();
let unused_timing = (&dflt_params).into();
AbstractTunnelMgr {
builder,
runtime,
tunnels: circs,
circuit_timing: circuit_timing.into(),
unused_timing: sync::Mutex::new(unused_timing),
}
}
pub(crate) fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
let mut u = self
.unused_timing
.lock()
.expect("Poisoned lock for unused_timing");
*u = p.into();
}
pub(crate) fn circuit_timing(&self) -> Arc<CircuitTiming> {
self.circuit_timing.get()
}
pub(crate) fn set_circuit_timing(&self, new_config: CircuitTiming) {
self.circuit_timing.replace(new_config);
}
#[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
pub(crate) async fn get_or_launch(
self: &Arc<Self>,
usage: &TargetTunnelUsage,
dir: DirInfo<'_>,
) -> Result<(Arc<B::Tunnel>, TunnelProvenance)> {
const MAX_RESETS: usize = 8;
let circuit_timing = self.circuit_timing();
let timeout_at = self.runtime.now() + circuit_timing.request_timeout;
let max_tries = circuit_timing.request_max_retries;
let max_failures = usize::div_ceil(
max_tries as usize,
std::cmp::max(1, self.builder.launch_parallelism(usage)),
);
let mut retry_schedule = RetryDelay::from_msec(100);
let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a tunnel");
let mut n_failures = 0;
let mut n_resets = 0;
for attempt_num in 1.. {
let remaining = match timeout_at.checked_duration_since(self.runtime.now()) {
None => {
retry_err.push(Error::RequestTimeout);
break;
}
Some(t) => t,
};
let error = match self.prepare_action(usage, dir, true) {
Ok(action) => {
let outcome = self
.runtime
.timeout(remaining, Arc::clone(self).take_action(action, usage))
.await;
match outcome {
Ok(Ok(circ)) => return Ok(circ),
Ok(Err(e)) => {
debug!("Circuit attempt {} failed.", attempt_num);
Error::RequestFailed(e)
}
Err(_) => {
warn!("All tunnel attempts failed due to timeout");
retry_err.push(Error::RequestTimeout);
break;
}
}
}
Err(e) => {
debug_report!(
&e,
"Couldn't pick action for tunnel attempt {}",
attempt_num,
);
e
}
};
let now = self.runtime.now();
let retry_time =
error.abs_retry_time(now, || retry_schedule.next_delay(&mut rand::rng()));
let (count, count_limit) = if error.is_internal_reset() {
(&mut n_resets, MAX_RESETS)
} else {
(&mut n_failures, max_failures)
};
match error {
Error::RequestFailed(e) => retry_err.extend(e),
e => retry_err.push(e),
}
*count += 1;
if *count >= count_limit {
warn!("Reached circuit build retry limit, exiting...");
break;
}
match retry_time {
AbsRetryTime::Immediate => {}
AbsRetryTime::Never => break,
AbsRetryTime::At(t) => {
let remaining = timeout_at.saturating_duration_since(now);
let delay = t.saturating_duration_since(now);
trace!(?delay, "Waiting to retry...");
self.runtime.sleep(std::cmp::min(delay, remaining)).await;
}
}
}
warn!("Request failed");
Err(Error::RequestFailed(retry_err))
}
#[cfg(test)]
pub(crate) async fn ensure_tunnel(
self: &Arc<Self>,
usage: &TargetTunnelUsage,
dir: DirInfo<'_>,
) -> Result<()> {
let action = self.prepare_action(usage, dir, false)?;
if let Action::Build(plans) = action {
for plan in plans {
let self_clone = Arc::clone(self);
let _ignore_receiver = self_clone.spawn_launch(usage, plan);
}
}
Ok(())
}
#[instrument(level = "trace", skip_all)]
fn prepare_action(
&self,
usage: &TargetTunnelUsage,
dir: DirInfo<'_>,
restrict_circ: bool,
) -> Result<Action<B, R>> {
let mut list = self.tunnels.lock().expect("poisoned lock");
if let Some(mut open) = list.find_open(usage) {
let parallelism = self.builder.select_parallelism(usage);
let best = OpenEntry::find_best(&mut open, usage, parallelism);
if restrict_circ {
let now = self.runtime.now();
best.restrict_mut(usage, now)?;
}
return Ok(Action::Open(best.tunnel.clone()));
}
if let Some(pending) = list.find_pending_tunnels(usage) {
let best = PendingEntry::find_best(&pending, usage);
if restrict_circ {
for item in &best {
item.tentative_restrict_mut(usage)?;
}
}
let stream = best.iter().map(|item| item.receiver.clone()).collect();
return Ok(Action::Wait(stream));
}
let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage));
let mut plans = Vec::new();
let mut last_err = None;
for _ in 0..parallelism {
match self.plan_by_usage(dir, usage) {
Ok((pending, plan)) => {
list.add_pending_tunnel(pending);
plans.push(plan);
}
Err(e) => {
debug!("Unable to make a plan for {:?}: {}", usage, e);
last_err = Some(e);
}
}
}
if !plans.is_empty() {
Ok(Action::Build(plans))
} else if let Some(last_err) = last_err {
Err(last_err)
} else {
Err(internal!("no plans were built, but no errors were found").into())
}
}
#[allow(clippy::cognitive_complexity, clippy::type_complexity)] #[instrument(level = "trace", skip_all)]
async fn take_action(
self: Arc<Self>,
act: Action<B, R>,
usage: &TargetTunnelUsage,
) -> std::result::Result<(Arc<B::Tunnel>, TunnelProvenance), RetryError<Box<Error>>> {
fn record_error(
retry_err: &mut RetryError<Box<Error>>,
source: streams::Source,
building: bool,
mut err: Error,
) {
if source == streams::Source::Right {
return;
}
if !building {
err = Error::PendingFailed(Box::new(err));
}
retry_err.push(err);
}
fn describe_source(building: bool, source: streams::Source) -> &'static str {
match (building, source) {
(_, streams::Source::Right) => "optimistic advice",
(true, streams::Source::Left) => "tunnel we're building",
(false, streams::Source::Left) => "pending tunnel",
}
}
let (building, wait_on_stream) = match act {
Action::Open(c) => {
trace!("Returning existing tunnel.");
return Ok((c, TunnelProvenance::Preexisting));
}
Action::Wait(f) => {
trace!("Waiting for tunnel.");
(false, f)
}
Action::Build(plans) => {
trace!("Building new tunnel.");
let futures = FuturesUnordered::new();
for plan in plans {
let self_clone = Arc::clone(&self);
futures.push(self_clone.spawn_launch(usage, plan));
}
(true, futures)
}
};
let (pending_request, additional_stream) = {
let (send, recv) = mpsc_channel_no_memquota(8);
let pending = Arc::new(PendingRequest {
usage: usage.clone(),
notify: send,
});
let mut list = self.tunnels.lock().expect("poisoned lock");
list.add_pending_request(&pending);
(pending, recv)
};
let mut incoming = streams::select_biased(wait_on_stream, additional_stream.map(Ok));
let mut retry_error = RetryError::in_attempt_to("wait for tunnels");
while let Some((src, id)) = incoming.next().await {
match id {
Ok(Ok(ref id)) => {
let mut list = self.tunnels.lock().expect("poisoned lock");
if let Some(ent) = list.get_open_mut(id) {
let now = self.runtime.now();
match ent.restrict_mut(usage, now) {
Ok(()) => {
drop(pending_request);
if matches!(ent.expiration, ExpirationInfo::Unused { .. }) {
let try_to_expire_after = if ent.spec.is_long_lived() {
self.circuit_timing().disused_circuit_timeout
} else {
self.circuit_timing().max_dirtiness
};
spawn_expiration_task(
&self.runtime,
Arc::downgrade(&self),
ent.tunnel.id(),
now + try_to_expire_after,
);
}
return Ok((ent.tunnel.clone(), TunnelProvenance::NewlyCreated));
}
Err(e) => {
let e = match e {
Error::UsageMismatched(e) => Error::LostUsabilityRace(e),
x => x,
};
if src == streams::Source::Left {
info_report!(
&e,
"{} suggested we use {:?}, but restrictions failed",
describe_source(building, src),
id,
);
} else {
debug_report!(
&e,
"{} suggested we use {:?}, but restrictions failed",
describe_source(building, src),
id,
);
}
record_error(&mut retry_error, src, building, e);
continue;
}
}
}
}
Ok(Err(ref e)) => {
debug!("{} sent error {:?}", describe_source(building, src), e);
record_error(&mut retry_error, src, building, e.clone());
}
Err(oneshot::Canceled) => {
debug!(
"{} went away (Canceled), quitting take_action right away",
describe_source(building, src)
);
record_error(&mut retry_error, src, building, Error::PendingCanceled);
return Err(retry_error);
}
}
debug!(
"While waiting on tunnel: {:?} from {}",
id,
describe_source(building, src)
);
}
drop(pending_request);
Err(retry_error)
}
#[allow(clippy::type_complexity)]
fn plan_by_usage(
&self,
dir: DirInfo<'_>,
usage: &TargetTunnelUsage,
) -> Result<(Arc<PendingEntry<B, R>>, TunnelBuildPlan<B, R>)> {
let (plan, bspec) = self.builder.plan_tunnel(usage, dir)?;
let (pending, sender) = PendingEntry::new(&bspec);
let pending = Arc::new(pending);
let plan = TunnelBuildPlan {
plan,
sender,
pending: Arc::clone(&pending),
};
Ok((pending, plan))
}
#[instrument(level = "trace", skip_all)]
pub(crate) fn launch_by_usage(
self: &Arc<Self>,
usage: &TargetTunnelUsage,
dir: DirInfo<'_>,
) -> Result<Shared<oneshot::Receiver<PendResult<B, R>>>> {
let (pending, plan) = self.plan_by_usage(dir, usage)?;
self.tunnels
.lock()
.expect("Poisoned lock for tunnel list")
.add_pending_tunnel(pending);
Ok(Arc::clone(self).spawn_launch(usage, plan))
}
#[instrument(level = "trace", skip_all)]
fn spawn_launch(
self: Arc<Self>,
usage: &TargetTunnelUsage,
plan: TunnelBuildPlan<B, R>,
) -> Shared<oneshot::Receiver<PendResult<B, R>>> {
let _ = usage; let TunnelBuildPlan {
mut plan,
sender,
pending,
} = plan;
let request_loyalty = self.circuit_timing().request_loyalty;
let wait_on_future = pending.receiver.clone();
let runtime = self.runtime.clone();
let runtime_copy = self.runtime.clone();
let tid = rand::random::<u64>();
let reason = format!("tunnel builder task {}", tid);
runtime.block_advance(reason.clone());
plan.add_blocked_advance_reason(reason);
runtime
.spawn(async move {
let self_clone = Arc::clone(&self);
let future = AssertUnwindSafe(self_clone.do_launch(plan, pending)).catch_unwind();
let (new_spec, reply) = match future.await {
Ok(x) => x, Err(e) => {
let _ = sender.send(Err(internal!("tunnel build task panicked").into()));
std::panic::panic_any(e);
}
};
let _ = sender.send(reply.clone());
if let Some(new_spec) = new_spec {
let sl = runtime_copy.sleep(request_loyalty);
runtime_copy.allow_one_advance(request_loyalty);
sl.await;
let pending = {
let list = self.tunnels.lock().expect("poisoned lock");
list.find_pending_requests(&new_spec)
};
for pending_request in pending {
let _ = pending_request.notify.clone().try_send(reply.clone());
}
}
runtime_copy.release_advance(format!("tunnel builder task {}", tid));
})
.expect("Couldn't spawn tunnel-building task");
wait_on_future
}
#[instrument(level = "trace", skip_all)]
async fn do_launch(
self: Arc<Self>,
plan: <B as AbstractTunnelBuilder<R>>::Plan,
pending: Arc<PendingEntry<B, R>>,
) -> (Option<SupportedTunnelUsage>, PendResult<B, R>) {
let outcome = self.builder.build_tunnel(plan).await;
match outcome {
Err(e) => (None, Err(e)),
Ok((new_spec, tunnel)) => {
let id = tunnel.id();
let use_duration = self.pick_use_duration();
let now = self.runtime.now();
let exp_inst = now + use_duration;
let runtime_copy = self.runtime.clone();
spawn_expiration_task(&runtime_copy, Arc::downgrade(&self), tunnel.id(), exp_inst);
let use_before = ExpirationInfo::new(now);
let open_ent = OpenEntry::new(new_spec.clone(), tunnel, use_before);
{
let mut list = self.tunnels.lock().expect("poisoned lock");
if list.tunnel_is_pending(&pending) {
list.add_open(open_ent);
drop(pending);
(Some(new_spec), Ok(id))
} else {
drop(pending); (None, Err(Error::CircCanceled))
}
}
}
}
}
fn expiration_params(&self) -> ExpirationParameters {
let expire_unused_after = self.pick_use_duration();
let expire_dirty_after = self.circuit_timing().max_dirtiness;
let expire_disused_after = self.circuit_timing().disused_circuit_timeout;
ExpirationParameters {
expire_unused_after,
expire_dirty_after,
expire_disused_after,
}
}
#[cfg(feature = "hs-common")]
#[instrument(level = "trace", skip_all)]
pub(crate) async fn launch_unmanaged(
&self,
usage: &TargetTunnelUsage,
dir: DirInfo<'_>,
) -> Result<(SupportedTunnelUsage, B::Tunnel)> {
let (_, plan) = self.plan_by_usage(dir, usage)?;
self.builder.build_tunnel(plan.plan).await
}
pub(crate) fn take_tunnel(
&self,
id: &<B::Tunnel as AbstractTunnel>::Id,
) -> Option<Arc<B::Tunnel>> {
let mut list = self.tunnels.lock().expect("poisoned lock");
list.take_open(id).map(|e| e.tunnel)
}
pub(crate) fn retire_all_tunnels(&self) {
let mut list = self.tunnels.lock().expect("poisoned lock");
list.clear_all_tunnels();
}
pub(crate) async fn expire_tunnels(&self, now: Instant) -> Option<Instant> {
let expiration_params = self.expiration_params();
let (mut earliest_expiration, need_to_check) = {
let mut list = self.tunnels.lock().expect("poisoned lock");
list.expire_tunnels(now, &expiration_params)
};
let mut last_known_usage = Vec::new();
for tunnel in need_to_check {
let Some(tunnel) = Weak::upgrade(&tunnel) else {
continue; };
last_known_usage.push((tunnel.id(), tunnel.last_known_to_be_used_at().await));
}
{
let mut list = self.tunnels.lock().expect("poisoned lock");
for (id, disused_since) in last_known_usage {
match list.update_long_lived_tunnel_last_used(
&id,
now,
&expiration_params,
&disused_since,
) {
Ok(Some(may_expire)) => {
earliest_expiration = match earliest_expiration {
Some(exp) if exp < may_expire => Some(exp),
_ => Some(may_expire),
};
}
Ok(None) => {}
Err(e) => warn_report!(e, "Error while updating status on tunnel {:?}", id),
}
}
}
earliest_expiration
}
pub(crate) async fn consider_expiring_tunnel(
&self,
tun_id: &<B::Tunnel as AbstractTunnel>::Id,
now: Instant,
) -> Result<Option<Instant>> {
let expiration_params = self.expiration_params();
let tunnel = {
let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
self.tunnels.lock().expect("poisoned lock");
let Some(should_expire) = list.tunnel_should_expire(tun_id, now, &expiration_params)
else {
return Ok(None);
};
match should_expire {
ShouldExpire::Now => {
let _discard = list.take_open(tun_id);
return Ok(None);
}
ShouldExpire::NotBefore(t) => return Ok(Some(t)),
ShouldExpire::PossiblyNow => {
let Some(tunnel_ent) = list.get_open_mut(tun_id) else {
return Ok(None);
};
Arc::clone(&tunnel_ent.tunnel)
}
}
};
let last_known_in_use_at = tunnel.last_known_to_be_used_at().await;
{
let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
self.tunnels.lock().expect("poisoned lock");
list.update_long_lived_tunnel_last_used(
tun_id,
now,
&expiration_params,
&last_known_in_use_at,
)
}
}
pub(crate) fn n_tunnels(&self) -> usize {
let list = self.tunnels.lock().expect("poisoned lock");
list.open_tunnels.len()
}
#[cfg(test)]
pub(crate) fn n_pending_tunnels(&self) -> usize {
let list = self.tunnels.lock().expect("poisoned lock");
list.pending_tunnels.len()
}
pub(crate) fn peek_runtime(&self) -> &R {
&self.runtime
}
pub(crate) fn peek_builder(&self) -> &B {
&self.builder
}
fn pick_use_duration(&self) -> Duration {
let timings = self
.unused_timing
.lock()
.expect("Poisoned lock for unused_timing");
if self.builder.learning_timeouts() {
timings.learning
} else {
use tor_basic_utils::RngExt as _;
let mut rng = rand::rng();
rng.gen_range_checked(timings.not_learning..=timings.not_learning * 2)
.expect("T .. 2x T turned out to be an empty duration range?!")
}
}
}
fn spawn_expiration_task<B, R>(
runtime: &R,
circmgr: Weak<AbstractTunnelMgr<B, R>>,
circ_id: <<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id,
exp_inst: Instant,
) where
R: Runtime,
B: 'static + AbstractTunnelBuilder<R>,
{
let now = runtime.now();
let rt_copy = runtime.clone();
let mut duration = exp_inst.saturating_duration_since(now);
if let Err(e) = runtime.spawn(async move {
loop {
rt_copy.sleep(duration).await;
let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
cm
} else {
return;
};
match cm.consider_expiring_tunnel(&circ_id, exp_inst).await {
Ok(None) => return,
Ok(Some(when)) => {
duration = when.saturating_duration_since(rt_copy.now());
}
Err(e) => {
warn_report!(
e,
"Error while considering expiration for tunnel {:?}",
circ_id
);
return;
}
}
}
}) {
warn_report!(e, "Unable to launch expiration task");
}
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use crate::isolation::test::{IsolationTokenEq, assert_isoleq};
use crate::mocks::{FakeBuilder, FakeCirc, FakeId, FakeOp};
use crate::usage::{ExitPolicy, SupportedTunnelUsage};
use crate::{
Error, IsolationToken, StreamIsolation, TargetPort, TargetPorts, TargetTunnelUsage,
};
use std::sync::LazyLock;
use tor_dircommon::fallback::FallbackList;
use tor_guardmgr::TestConfig;
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_netdir::testnet;
use tor_persist::TestingStateMgr;
use tor_rtcompat::SleepProvider;
use tor_rtmock::MockRuntime;
#[allow(deprecated)] use tor_rtmock::MockSleepRuntime;
static FALLBACKS_EMPTY: LazyLock<FallbackList> = LazyLock::new(|| [].into());
fn di() -> DirInfo<'static> {
(&*FALLBACKS_EMPTY).into()
}
fn target_to_spec(target: &TargetTunnelUsage) -> SupportedTunnelUsage {
match target {
TargetTunnelUsage::Exit {
ports,
isolation,
country_code,
require_stability,
} => SupportedTunnelUsage::Exit {
policy: ExitPolicy::from_target_ports(&TargetPorts::from(&ports[..])),
isolation: Some(isolation.clone()),
country_code: country_code.clone(),
all_relays_stable: *require_stability,
},
_ => unimplemented!(),
}
}
impl<U: PartialEq> IsolationTokenEq for OpenEntry<U> {
fn isol_eq(&self, other: &Self) -> bool {
self.spec.isol_eq(&other.spec)
&& self.tunnel == other.tunnel
&& self.expiration == other.expiration
}
}
impl<U: PartialEq> IsolationTokenEq for &mut OpenEntry<U> {
fn isol_eq(&self, other: &Self) -> bool {
self.spec.isol_eq(&other.spec)
&& self.tunnel == other.tunnel
&& self.expiration == other.expiration
}
}
fn make_builder<R: Runtime>(runtime: &R) -> FakeBuilder<R> {
let state_mgr = TestingStateMgr::new();
let guard_config = TestConfig::default();
FakeBuilder::new(runtime, state_mgr, &guard_config)
}
#[test]
fn basic_tests() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let builder = make_builder(&rt);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let webports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
assert_eq!(mgr.n_tunnels(), 0);
assert!(mgr.peek_builder().script.lock().unwrap().is_empty());
let c1 = rt.wait_for(mgr.get_or_launch(&webports, di())).await;
let c1 = c1.unwrap().0;
assert_eq!(mgr.n_tunnels(), 1);
let port80 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
let c2 = mgr.get_or_launch(&port80, di()).await;
let c2 = c2.unwrap().0;
assert!(FakeCirc::eq(&c1, &c2));
assert_eq!(mgr.n_tunnels(), 1);
let dnsport = TargetTunnelUsage::new_from_ipv4_ports(&[53]);
let dnsport_restrict = TargetTunnelUsage::Exit {
ports: vec![TargetPort::ipv4(53)],
isolation: StreamIsolation::builder().build().unwrap(),
country_code: None,
require_stability: false,
};
let (c3, c4) = rt
.wait_for(futures::future::join(
mgr.get_or_launch(&dnsport, di()),
mgr.get_or_launch(&dnsport_restrict, di()),
))
.await;
let c3 = c3.unwrap().0;
let c4 = c4.unwrap().0;
assert!(!FakeCirc::eq(&c1, &c3));
assert!(FakeCirc::eq(&c3, &c4));
assert_eq!(c3.id(), c4.id());
assert_eq!(mgr.n_tunnels(), 2);
let c3_taken = mgr.take_tunnel(&c3.id()).unwrap();
let now_its_gone = mgr.take_tunnel(&c4.id());
assert!(FakeCirc::eq(&c3_taken, &c3));
assert!(now_its_gone.is_none());
assert_eq!(mgr.n_tunnels(), 1);
let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await;
let c5 = c5.unwrap().0;
assert!(!FakeCirc::eq(&c3, &c5));
assert!(!FakeCirc::eq(&c4, &c5));
assert_eq!(mgr.n_tunnels(), 2);
let prev = mgr.n_pending_tunnels();
assert!(mgr.launch_by_usage(&dnsport, di()).is_ok());
assert_eq!(mgr.n_pending_tunnels(), prev + 1);
});
}
#[test]
fn request_timeout() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let builder = make_builder(&rt);
builder.set(&ports, vec![FakeOp::Fail, FakeOp::Timeout]);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let c1 = mgr
.peek_runtime()
.wait_for(mgr.get_or_launch(&ports, di()))
.await;
assert!(matches!(c1, Err(Error::RequestFailed(_))));
});
}
#[test]
fn request_timeout2() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let builder = make_builder(&rt);
builder.set(
&ports,
vec![
FakeOp::Delay(Duration::from_millis(60_000 - 25)),
FakeOp::NoPlan,
],
);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let c1 = mgr
.peek_runtime()
.wait_for(mgr.get_or_launch(&ports, di()))
.await;
assert!(matches!(c1, Err(Error::RequestFailed(_))));
});
}
#[test]
fn request_unplannable() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let builder = make_builder(&rt);
builder.set(&ports, vec![FakeOp::NoPlan; 2000]);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
assert!(matches!(c1, Err(Error::RequestFailed(_))));
});
}
#[test]
fn request_fails_too_much() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let builder = make_builder(&rt);
builder.set(&ports, vec![FakeOp::Fail; 1000]);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
assert!(matches!(c1, Err(Error::RequestFailed(_))));
});
}
#[test]
fn request_wrong_spec() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let builder = make_builder(&rt);
builder.set(
&ports,
vec![FakeOp::WrongSpec(target_to_spec(
&TargetTunnelUsage::new_from_ipv4_ports(&[22]),
))],
);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
assert!(c1.is_ok());
});
}
#[test]
fn request_retried() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let builder = make_builder(&rt);
builder.set(&ports, vec![FakeOp::Fail, FakeOp::Fail]);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
rt.block_advance("test doesn't require advancing");
let (c1, c2) = rt
.wait_for(futures::future::join(
mgr.get_or_launch(&ports, di()),
mgr.get_or_launch(&ports, di()),
))
.await;
let c1 = c1.unwrap().0;
let c2 = c2.unwrap().0;
assert!(FakeCirc::eq(&c1, &c2));
});
}
#[test]
fn isolated() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let builder = make_builder(&rt);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let iso1 = TargetTunnelUsage::Exit {
ports: vec![TargetPort::ipv4(443)],
isolation: StreamIsolation::builder()
.owner_token(IsolationToken::new())
.build()
.unwrap(),
country_code: None,
require_stability: false,
};
let iso2 = TargetTunnelUsage::Exit {
ports: vec![TargetPort::ipv4(443)],
isolation: StreamIsolation::builder()
.owner_token(IsolationToken::new())
.build()
.unwrap(),
country_code: None,
require_stability: false,
};
let no_iso1 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);
let no_iso2 = no_iso1.clone();
use itertools::Itertools;
let timeouts: Vec<_> = [0_u64, 2, 4, 6]
.iter()
.map(|d| Duration::from_millis(*d))
.collect();
for delays in timeouts.iter().permutations(4) {
let d1 = delays[0];
let d2 = delays[1];
let d3 = delays[2];
let d4 = delays[2];
let (c_iso1, c_iso2, c_no_iso1, c_no_iso2) = rt
.wait_for(futures::future::join4(
async {
rt.sleep(*d1).await;
mgr.get_or_launch(&iso1, di()).await
},
async {
rt.sleep(*d2).await;
mgr.get_or_launch(&iso2, di()).await
},
async {
rt.sleep(*d3).await;
mgr.get_or_launch(&no_iso1, di()).await
},
async {
rt.sleep(*d4).await;
mgr.get_or_launch(&no_iso2, di()).await
},
))
.await;
let c_iso1 = c_iso1.unwrap().0;
let c_iso2 = c_iso2.unwrap().0;
let c_no_iso1 = c_no_iso1.unwrap().0;
let c_no_iso2 = c_no_iso2.unwrap().0;
assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
assert!(!FakeCirc::eq(&c_iso1, &c_no_iso1));
assert!(!FakeCirc::eq(&c_iso1, &c_no_iso2));
assert!(!FakeCirc::eq(&c_iso2, &c_no_iso1));
assert!(!FakeCirc::eq(&c_iso2, &c_no_iso2));
assert!(FakeCirc::eq(&c_no_iso1, &c_no_iso2));
}
});
}
#[test]
fn opportunistic() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let ports1 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
let ports2 = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let builder = make_builder(&rt);
builder.set(&ports1, vec![FakeOp::Timeout]);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let (c1, c2) = rt
.wait_for(futures::future::join(
mgr.get_or_launch(&ports1, di()),
async {
rt.sleep(Duration::from_millis(100)).await;
mgr.get_or_launch(&ports2, di()).await
},
))
.await;
if let (Ok((c1, _)), Ok((c2, _))) = (c1, c2) {
assert!(FakeCirc::eq(&c1, &c2));
} else {
panic!();
};
});
}
#[test]
fn prebuild() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let builder = make_builder(&rt);
let mgr = Arc::new(AbstractTunnelMgr::new(
builder,
rt.clone(),
CircuitTiming::default(),
));
let ports1 = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
let ports2 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
let ports3 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);
let (ok, c1, c2) = rt
.wait_for(futures::future::join3(
mgr.ensure_tunnel(&ports1, di()),
async {
rt.sleep(Duration::from_millis(10)).await;
mgr.get_or_launch(&ports2, di()).await
},
async {
rt.sleep(Duration::from_millis(50)).await;
mgr.get_or_launch(&ports3, di()).await
},
))
.await;
assert!(ok.is_ok());
let c1 = c1.unwrap().0;
let c2 = c2.unwrap().0;
assert!(FakeCirc::eq(&c1, &c2));
});
}
#[test]
fn expiration() {
MockRuntime::test_with_various(|rt| async move {
use crate::config::CircuitTimingBuilder;
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let builder = make_builder(&rt);
let circuit_timing = CircuitTimingBuilder::default()
.max_dirtiness(Duration::from_secs(15))
.build()
.unwrap();
let mgr = Arc::new(AbstractTunnelMgr::new(builder, rt.clone(), circuit_timing));
let imap = TargetTunnelUsage::new_from_ipv4_ports(&[993]);
let pop = TargetTunnelUsage::new_from_ipv4_ports(&[995]);
let (ok, pop1) = rt
.wait_for(futures::future::join(
mgr.ensure_tunnel(&imap, di()),
mgr.get_or_launch(&pop, di()),
))
.await;
assert!(ok.is_ok());
let pop1 = pop1.unwrap().0;
rt.advance(Duration::from_secs(30)).await;
rt.advance(Duration::from_secs(15)).await;
let imap1 = rt.wait_for(mgr.get_or_launch(&imap, di())).await.unwrap().0;
let now = rt.now();
mgr.expire_tunnels(now).await;
let (pop2, imap2) = rt
.wait_for(futures::future::join(
mgr.get_or_launch(&pop, di()),
mgr.get_or_launch(&imap, di()),
))
.await;
let pop2 = pop2.unwrap().0;
let imap2 = imap2.unwrap().0;
assert!(!FakeCirc::eq(&pop2, &pop1));
assert!(FakeCirc::eq(&imap2, &imap1));
});
}
fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
let network = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let id_noexit: Ed25519Identity = [0x05; 32].into();
let id_webexit: Ed25519Identity = [0x11; 32].into();
let id_fullexit: Ed25519Identity = [0x20; 32].into();
let not_exit = network.by_id(&id_noexit).unwrap();
let web_exit = network.by_id(&id_webexit).unwrap();
let full_exit = network.by_id(&id_fullexit).unwrap();
let ep_none = ExitPolicy::from_relay(¬_exit);
let ep_web = ExitPolicy::from_relay(&web_exit);
let ep_full = ExitPolicy::from_relay(&full_exit);
(ep_none, ep_web, ep_full)
}
#[test]
fn test_find_supported() {
let (ep_none, ep_web, ep_full) = get_exit_policies();
let fake_circ = FakeCirc { id: FakeId::next() };
let expiration = ExpirationInfo::Unused {
created: Instant::now(),
};
let mut entry_none = OpenEntry::new(
SupportedTunnelUsage::Exit {
policy: ep_none,
isolation: None,
country_code: None,
all_relays_stable: true,
},
fake_circ.clone(),
expiration.clone(),
);
let mut entry_none_c = entry_none.clone();
let mut entry_web = OpenEntry::new(
SupportedTunnelUsage::Exit {
policy: ep_web,
isolation: None,
country_code: None,
all_relays_stable: true,
},
fake_circ.clone(),
expiration.clone(),
);
let mut entry_web_c = entry_web.clone();
let mut entry_full = OpenEntry::new(
SupportedTunnelUsage::Exit {
policy: ep_full,
isolation: None,
country_code: None,
all_relays_stable: true,
},
fake_circ,
expiration,
);
let mut entry_full_c = entry_full.clone();
let usage_web = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
let empty: Vec<&mut OpenEntry<FakeCirc>> = vec![];
assert_isoleq!(
SupportedTunnelUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
empty
);
assert_isoleq!(
SupportedTunnelUsage::find_supported(
vec![&mut entry_none, &mut entry_web].into_iter(),
&usage_web,
),
vec![&mut entry_web_c]
);
assert_isoleq!(
SupportedTunnelUsage::find_supported(
vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
&usage_web,
),
vec![&mut entry_web_c, &mut entry_full_c]
);
let usage_preemptive_web = TargetTunnelUsage::Preemptive {
port: Some(TargetPort::ipv4(80)),
circs: 2,
require_stability: false,
};
let usage_preemptive_dns = TargetTunnelUsage::Preemptive {
port: None,
circs: 2,
require_stability: false,
};
assert_isoleq!(
SupportedTunnelUsage::find_supported(
vec![&mut entry_none].into_iter(),
&usage_preemptive_web
),
empty
);
assert_isoleq!(
SupportedTunnelUsage::find_supported(
vec![&mut entry_none].into_iter(),
&usage_preemptive_dns
),
empty
);
assert_isoleq!(
SupportedTunnelUsage::find_supported(
vec![&mut entry_none, &mut entry_web].into_iter(),
&usage_preemptive_web
),
empty
);
assert_isoleq!(
SupportedTunnelUsage::find_supported(
vec![&mut entry_none, &mut entry_web].into_iter(),
&usage_preemptive_dns
),
vec![&mut entry_none_c, &mut entry_web_c]
);
assert_isoleq!(
SupportedTunnelUsage::find_supported(
vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
&usage_preemptive_web
),
vec![&mut entry_web_c, &mut entry_full_c]
);
}
#[test]
fn test_circlist_preemptive_target_circs() {
MockRuntime::test_with_various(|rt| async move {
#[allow(deprecated)] let rt = MockSleepRuntime::new(rt);
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let dirinfo = DirInfo::Directory(&netdir);
let builder = make_builder(&rt);
for circs in [2, 8].iter() {
let mut circlist = TunnelList::<FakeBuilder<MockRuntime>, MockRuntime>::new();
let preemptive_target = TargetTunnelUsage::Preemptive {
port: Some(TargetPort::ipv4(80)),
circs: *circs,
require_stability: false,
};
for _ in 0..*circs {
assert!(circlist.find_open(&preemptive_target).is_none());
let usage = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
let (plan, _) = builder.plan_tunnel(&usage, dirinfo).unwrap();
let (spec, circ) = rt.wait_for(builder.build_tunnel(plan)).await.unwrap();
let entry = OpenEntry::new(
spec,
circ,
ExpirationInfo::new(rt.now() + Duration::from_secs(60)),
);
circlist.add_open(entry);
}
assert!(circlist.find_open(&preemptive_target).is_some());
}
});
}
}