mod config;
mod pool;
use std::{
ops::Deref,
sync::{Arc, Mutex, Weak},
time::Duration,
};
use crate::{
AbstractTunnel, CircMgr, CircMgrInner, ClientOnionServiceDataTunnel,
ClientOnionServiceDirTunnel, ClientOnionServiceIntroTunnel, Error, Result,
ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel, ServiceOnionServiceIntroTunnel,
build::{TunnelBuilder, onion_circparams_from_netparams},
mgr::AbstractTunnelBuilder,
path::hspath::hs_stem_terminal_hop_usage,
timeouts,
};
use futures::{StreamExt, TryFutureExt};
use once_cell::sync::OnceCell;
use tor_error::{Bug, debug_report};
use tor_error::{bad_api_usage, internal};
use tor_guardmgr::VanguardMode;
use tor_linkspec::{
CircTarget, HasRelayIds as _, IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget,
};
use tor_netdir::{NetDir, NetDirProvider, Relay};
use tor_proto::client::circuit::{self, CircParameters};
use tor_relay_selection::{LowLevelRelayPredicate, RelayExclusion};
use tor_rtcompat::{
Runtime, SleepProviderExt, SpawnExt,
scheduler::{TaskHandle, TaskSchedule},
};
use tracing::{debug, instrument, trace, warn};
use std::result::Result as StdResult;
pub use config::HsCircPoolConfig;
use self::pool::HsCircPrefs;
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
use crate::path::hspath::select_middle_for_vanguard_circ;
#[cfg(feature = "hs-common")]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum HsCircKind {
SvcHsDir,
SvcIntro,
SvcRend,
ClientHsDir,
ClientIntro,
ClientRend,
}
impl HsCircKind {
fn stem_kind(&self) -> HsCircStemKind {
match self {
HsCircKind::SvcIntro => HsCircStemKind::Naive,
HsCircKind::SvcHsDir => {
HsCircStemKind::Naive
}
HsCircKind::ClientRend => {
HsCircStemKind::Guarded
}
HsCircKind::SvcRend | HsCircKind::ClientHsDir | HsCircKind::ClientIntro => {
HsCircStemKind::Guarded
}
}
}
}
pub(crate) struct HsCircStem<C: AbstractTunnel> {
pub(crate) circ: C,
pub(crate) kind: HsCircStemKind,
}
impl<C: AbstractTunnel> HsCircStem<C> {
fn satisfies_prefs(&self, prefs: &HsCircPrefs) -> bool {
let HsCircPrefs { kind_prefs } = prefs;
match kind_prefs {
Some(kind) => *kind == self.kind,
None => true,
}
}
}
impl<C: AbstractTunnel> Deref for HsCircStem<C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self.circ
}
}
impl<C: AbstractTunnel> HsCircStem<C> {
pub(crate) fn can_become(&self, other: HsCircStemKind) -> bool {
use HsCircStemKind::*;
match (self.kind, other) {
(Naive, Naive) | (Guarded, Guarded) | (Naive, Guarded) => true,
(Guarded, Naive) => false,
}
}
}
#[allow(rustdoc::private_intra_doc_links)]
#[derive(Copy, Clone, Debug, PartialEq, derive_more::Display)]
#[non_exhaustive]
pub(crate) enum HsCircStemKind {
#[display("NAIVE")]
Naive,
#[display("GUARDED")]
Guarded,
}
impl HsCircStemKind {
pub(crate) fn num_hops(&self, mode: VanguardMode) -> StdResult<usize, Bug> {
use HsCircStemKind::*;
use VanguardMode::*;
let len = match (mode, self) {
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
(Lite, _) => 3,
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
(Full, Naive) => 3,
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
(Full, Guarded) => 4,
(Disabled, _) => 3,
(_, _) => {
return Err(internal!("Unsupported vanguard mode {mode}"));
}
};
Ok(len)
}
}
pub struct HsCircPool<R: Runtime>(Arc<HsCircPoolInner<TunnelBuilder<R>, R>>);
impl<R: Runtime> HsCircPool<R> {
pub fn new(circmgr: &Arc<CircMgr<R>>) -> Self {
Self(Arc::new(HsCircPoolInner::new(circmgr)))
}
#[instrument(level = "trace", skip_all)]
pub async fn get_or_launch_client_dir<T>(
&self,
netdir: &NetDir,
target: T,
) -> Result<ClientOnionServiceDirTunnel>
where
T: CircTarget + Sync,
{
let tunnel = self
.0
.get_or_launch_specific(netdir, HsCircKind::ClientHsDir, target)
.await?;
Ok(tunnel.into())
}
#[instrument(level = "trace", skip_all)]
pub async fn get_or_launch_client_intro<T>(
&self,
netdir: &NetDir,
target: T,
) -> Result<ClientOnionServiceIntroTunnel>
where
T: CircTarget + Sync,
{
let tunnel = self
.0
.get_or_launch_specific(netdir, HsCircKind::ClientIntro, target)
.await?;
Ok(tunnel.into())
}
#[instrument(level = "trace", skip_all)]
pub async fn get_or_launch_svc_dir<T>(
&self,
netdir: &NetDir,
target: T,
) -> Result<ServiceOnionServiceDirTunnel>
where
T: CircTarget + Sync,
{
let tunnel = self
.0
.get_or_launch_specific(netdir, HsCircKind::SvcHsDir, target)
.await?;
Ok(tunnel.into())
}
#[instrument(level = "trace", skip_all)]
pub async fn get_or_launch_svc_intro<T>(
&self,
netdir: &NetDir,
target: T,
) -> Result<ServiceOnionServiceIntroTunnel>
where
T: CircTarget + Sync,
{
let tunnel = self
.0
.get_or_launch_specific(netdir, HsCircKind::SvcIntro, target)
.await?;
Ok(tunnel.into())
}
#[instrument(level = "trace", skip_all)]
pub async fn get_or_launch_svc_rend<T>(
&self,
netdir: &NetDir,
target: T,
) -> Result<ServiceOnionServiceDataTunnel>
where
T: CircTarget + Sync,
{
let tunnel = self
.0
.get_or_launch_specific(netdir, HsCircKind::SvcRend, target)
.await?;
Ok(tunnel.into())
}
#[instrument(level = "trace", skip_all)]
pub async fn get_or_launch_client_rend<'a>(
&self,
netdir: &'a NetDir,
) -> Result<(ClientOnionServiceDataTunnel, Relay<'a>)> {
let (tunnel, relay) = self.0.get_or_launch_client_rend(netdir).await?;
Ok((tunnel.into(), relay))
}
pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
self.0.estimate_timeout(timeout_action)
}
pub fn launch_background_tasks(
self: &Arc<Self>,
runtime: &R,
netdir_provider: &Arc<dyn NetDirProvider + 'static>,
) -> Result<Vec<TaskHandle>> {
HsCircPoolInner::launch_background_tasks(&self.0.clone(), runtime, netdir_provider)
}
pub fn retire_all_circuits(&self) -> StdResult<(), tor_config::ReconfigureError> {
self.0.retire_all_circuits()
}
}
pub(crate) struct HsCircPoolInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
circmgr: Arc<CircMgrInner<B, R>>,
launcher_handle: OnceCell<TaskHandle>,
inner: Mutex<Inner<B::Tunnel>>,
}
struct Inner<C: AbstractTunnel> {
pool: pool::Pool<C>,
}
impl<R: Runtime> HsCircPoolInner<TunnelBuilder<R>, R> {
pub(crate) fn new(circmgr: &CircMgr<R>) -> Self {
Self::new_internal(&circmgr.0)
}
}
impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> HsCircPoolInner<B, R> {
pub(crate) fn new_internal(circmgr: &Arc<CircMgrInner<B, R>>) -> Self {
let circmgr = Arc::clone(circmgr);
let pool = pool::Pool::default();
Self {
circmgr,
launcher_handle: OnceCell::new(),
inner: Mutex::new(Inner { pool }),
}
}
#[instrument(level = "trace", skip_all)]
pub(crate) fn launch_background_tasks(
self: &Arc<Self>,
runtime: &R,
netdir_provider: &Arc<dyn NetDirProvider + 'static>,
) -> Result<Vec<TaskHandle>> {
let handle = self.launcher_handle.get_or_try_init(|| {
runtime
.spawn(remove_unusable_circuits(
Arc::downgrade(self),
Arc::downgrade(netdir_provider),
))
.map_err(|e| Error::from_spawn("preemptive onion circuit expiration task", e))?;
let (schedule, handle) = TaskSchedule::new(runtime.clone());
runtime
.spawn(launch_hs_circuits_as_needed(
Arc::downgrade(self),
Arc::downgrade(netdir_provider),
schedule,
))
.map_err(|e| Error::from_spawn("preemptive onion circuit builder task", e))?;
Result::<TaskHandle>::Ok(handle)
})?;
Ok(vec![handle.clone()])
}
#[instrument(level = "trace", skip_all)]
pub(crate) async fn get_or_launch_client_rend<'a>(
&self,
netdir: &'a NetDir,
) -> Result<(B::Tunnel, Relay<'a>)> {
let circ = self
.take_or_launch_stem_circuit::<OwnedCircTarget>(netdir, None, HsCircKind::ClientRend)
.await?;
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
if matches!(
self.vanguard_mode(),
VanguardMode::Full | VanguardMode::Lite
) && circ.kind != HsCircStemKind::Guarded
{
return Err(internal!("wanted a GUARDED circuit, but got NAIVE?!").into());
}
let path = circ.single_path().map_err(|error| Error::Protocol {
action: "launching a client rend circuit",
peer: None, unique_id: Some(circ.unique_id()),
error,
})?;
match path.hops().last() {
Some(ent) => {
let Some(ct) = ent.as_chan_target() else {
return Err(
internal!("HsPool gave us a circuit with a virtual last hop!?").into(),
);
};
match netdir.by_ids(ct) {
Some(relay) => Ok((circ.circ, relay)),
None => Err(internal!("Got circuit with unknown last hop!?").into()),
}
}
None => Err(internal!("Circuit with an empty path!?").into()),
}
}
#[instrument(level = "trace", skip_all)]
pub(crate) async fn get_or_launch_specific<T>(
&self,
netdir: &NetDir,
kind: HsCircKind,
target: T,
) -> Result<B::Tunnel>
where
T: CircTarget + Sync,
{
if kind == HsCircKind::ClientRend {
return Err(bad_api_usage!("get_or_launch_specific with ClientRend circuit!?").into());
}
let wanted_kind = kind.stem_kind();
let circ = self
.take_or_launch_stem_circuit(netdir, Some(&target), kind)
.await?;
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
if matches!(
self.vanguard_mode(),
VanguardMode::Full | VanguardMode::Lite
) && circ.kind != wanted_kind
{
return Err(internal!(
"take_or_launch_stem_circuit() returned {:?}, but we need {wanted_kind:?}",
circ.kind
)
.into());
}
let mut params = onion_circparams_from_netparams(netdir.params())?;
params.n_incoming_cells_permitted = match kind {
HsCircKind::ClientHsDir => Some(netdir.params().hsdir_dl_max_reply_cells.into()),
HsCircKind::SvcHsDir => Some(netdir.params().hsdir_ul_max_reply_cells.into()),
HsCircKind::SvcIntro
| HsCircKind::SvcRend
| HsCircKind::ClientIntro
| HsCircKind::ClientRend => None,
};
self.extend_circ(circ, params, target).await
}
async fn extend_circ<T>(
&self,
circ: HsCircStem<B::Tunnel>,
params: CircParameters,
target: T,
) -> Result<B::Tunnel>
where
T: CircTarget + Sync,
{
let protocol_err = |error| Error::Protocol {
action: "extending to chosen HS hop",
peer: None, unique_id: Some(circ.unique_id()),
error,
};
let n_hops = circ.n_hops().map_err(protocol_err)?;
let (extend_timeout, _) = self.circmgr.mgr.peek_builder().estimator().timeouts(
&crate::timeouts::Action::ExtendCircuit {
initial_length: n_hops,
final_length: n_hops + 1,
},
);
let extend_future = circ.extend(&target, params).map_err(protocol_err);
self.circmgr
.mgr
.peek_runtime()
.timeout(extend_timeout, extend_future)
.await
.map_err(|_| Error::CircTimeout(Some(circ.unique_id())))??;
Ok(circ.circ)
}
pub(crate) fn retire_all_circuits(&self) -> StdResult<(), tor_config::ReconfigureError> {
self.inner
.lock()
.expect("poisoned lock")
.pool
.retire_all_circuits()?;
Ok(())
}
#[instrument(level = "trace", skip_all)]
async fn take_or_launch_stem_circuit<T>(
&self,
netdir: &NetDir,
avoid_target: Option<&T>,
kind: HsCircKind,
) -> Result<HsCircStem<B::Tunnel>>
where
T: CircTarget + Sync,
{
let stem_kind = kind.stem_kind();
let vanguard_mode = self.vanguard_mode();
trace!(
vanguards=%vanguard_mode,
kind=%stem_kind,
"selecting HS circuit stem"
);
let target_exclusion = {
let path_cfg = self.circmgr.builder().path_config();
let cfg = path_cfg.relay_selection_config();
match avoid_target {
Some(ct) => RelayExclusion::exclude_channel_target_family(&cfg, ct, netdir),
None => RelayExclusion::no_relays_excluded(),
}
};
let found_usable_circ = {
let mut inner = self.inner.lock().expect("lock poisoned");
let restrictions = |circ: &HsCircStem<B::Tunnel>| {
match vanguard_mode {
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
VanguardMode::Lite | VanguardMode::Full => {
vanguards_circuit_compatible_with_target(
netdir,
circ,
stem_kind,
kind,
avoid_target,
)
}
VanguardMode::Disabled => {
circuit_compatible_with_target(netdir, circ, kind, &target_exclusion)
}
_ => {
warn!("unknown vanguard mode {vanguard_mode}");
false
}
}
};
let mut prefs = HsCircPrefs::default();
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
if matches!(vanguard_mode, VanguardMode::Full | VanguardMode::Lite) {
prefs.preferred_stem_kind(stem_kind);
}
let found_usable_circ =
inner
.pool
.take_one_where(&mut rand::rng(), restrictions, &prefs);
if inner.pool.very_low() || found_usable_circ.is_none() {
let handle = self.launcher_handle.get().ok_or_else(|| {
Error::from(bad_api_usage!("The circuit launcher wasn't initialized"))
})?;
handle.fire();
}
found_usable_circ
};
if let Some(circuit) = found_usable_circ {
let circuit = self
.maybe_extend_stem_circuit(netdir, circuit, avoid_target, stem_kind, kind)
.await?;
self.ensure_suitable_circuit(&circuit, avoid_target, stem_kind)?;
return Ok(circuit);
}
let circ = self
.circmgr
.launch_hs_unmanaged(avoid_target, netdir, stem_kind, Some(kind))
.await?;
self.ensure_suitable_circuit(&circ, avoid_target, stem_kind)?;
Ok(HsCircStem {
circ,
kind: stem_kind,
})
}
async fn maybe_extend_stem_circuit<T>(
&self,
netdir: &NetDir,
circuit: HsCircStem<B::Tunnel>,
avoid_target: Option<&T>,
stem_kind: HsCircStemKind,
circ_kind: HsCircKind,
) -> Result<HsCircStem<B::Tunnel>>
where
T: CircTarget + Sync,
{
match self.vanguard_mode() {
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
VanguardMode::Full => {
self.extend_full_vanguards_circuit(
netdir,
circuit,
avoid_target,
stem_kind,
circ_kind,
)
.await
}
_ => {
let HsCircStem { circ, kind: _ } = circuit;
Ok(HsCircStem {
circ,
kind: stem_kind,
})
}
}
}
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
async fn extend_full_vanguards_circuit<T>(
&self,
netdir: &NetDir,
circuit: HsCircStem<B::Tunnel>,
avoid_target: Option<&T>,
stem_kind: HsCircStemKind,
circ_kind: HsCircKind,
) -> Result<HsCircStem<B::Tunnel>>
where
T: CircTarget + Sync,
{
use crate::path::hspath::hs_stem_terminal_hop_usage;
use tor_relay_selection::RelaySelector;
match (circuit.kind, stem_kind) {
(HsCircStemKind::Naive, HsCircStemKind::Guarded) => {
debug!("Wanted GUARDED circuit, but got NAIVE; extending by 1 hop...");
let params = crate::build::onion_circparams_from_netparams(netdir.params())?;
let circ_path = circuit
.circ
.single_path()
.map_err(|error| Error::Protocol {
action: "extending full vanguards circuit",
peer: None, unique_id: Some(circuit.unique_id()),
error,
})?;
debug_assert_eq!(circ_path.hops().len(), 3);
let target_exclusion = if let Some(target) = &avoid_target {
RelayExclusion::exclude_identities(
target.identities().map(|id| id.to_owned()).collect(),
)
} else {
RelayExclusion::no_relays_excluded()
};
let selector = RelaySelector::new(
hs_stem_terminal_hop_usage(Some(circ_kind)),
target_exclusion,
);
let hops = circ_path
.iter()
.flat_map(|hop| hop.as_chan_target())
.map(IntoOwnedChanTarget::to_owned)
.collect::<Vec<OwnedChanTarget>>();
let extra_hop =
select_middle_for_vanguard_circ(&hops, netdir, &selector, &mut rand::rng())?;
let circ = self.extend_circ(circuit, params, extra_hop).await?;
Ok(HsCircStem {
circ,
kind: stem_kind,
})
}
(HsCircStemKind::Guarded, HsCircStemKind::Naive) => {
Err(internal!("wanted a NAIVE circuit, but got GUARDED?!").into())
}
_ => {
trace!("Wanted {stem_kind} circuit, got {}", circuit.kind);
Ok(circuit)
}
}
}
fn ensure_suitable_circuit<T>(
&self,
circ: &B::Tunnel,
target: Option<&T>,
kind: HsCircStemKind,
) -> Result<()>
where
T: CircTarget + Sync,
{
Self::ensure_circuit_can_extend_to_target(circ, target)?;
self.ensure_circuit_length_valid(circ, kind)?;
Ok(())
}
fn ensure_circuit_length_valid(&self, tunnel: &B::Tunnel, kind: HsCircStemKind) -> Result<()> {
let circ_path_len = tunnel.n_hops().map_err(|error| Error::Protocol {
action: "validating circuit length",
peer: None, unique_id: Some(tunnel.unique_id()),
error,
})?;
let mode = self.vanguard_mode();
let expected_len = kind.num_hops(mode)?;
if circ_path_len != expected_len {
return Err(internal!(
"invalid path length for {} {mode}-vanguard circuit (expected {} hops, got {})",
kind,
expected_len,
circ_path_len
)
.into());
}
Ok(())
}
fn ensure_circuit_can_extend_to_target<T>(tunnel: &B::Tunnel, target: Option<&T>) -> Result<()>
where
T: CircTarget + Sync,
{
if let Some(target) = target {
let take_n = 2;
if let Some(hop) = tunnel
.single_path()
.map_err(|error| Error::Protocol {
action: "validating circuit compatibility with target",
peer: None, unique_id: Some(tunnel.unique_id()),
error,
})?
.hops()
.iter()
.rev()
.take(take_n)
.flat_map(|hop| hop.as_chan_target())
.find(|hop| hop.has_any_relay_id_from(target))
{
return Err(internal!(
"invalid path: circuit target {} appears as one of the last 2 hops (matches hop {})",
target.display_relay_ids(),
hop.display_relay_ids()
).into());
}
}
Ok(())
}
fn remove_closed(&self) {
let mut inner = self.inner.lock().expect("lock poisoned");
inner.pool.retain(|circ| !circ.is_closing());
}
fn remove_unlisted(&self, netdir: &NetDir) {
let mut inner = self.inner.lock().expect("lock poisoned");
inner
.pool
.retain(|circ| circuit_still_useable(netdir, circ, |_relay| true, |_last_hop| true));
}
fn vanguard_mode(&self) -> VanguardMode {
cfg_if::cfg_if! {
if #[cfg(all(feature = "vanguards", feature = "hs-common"))] {
self
.circmgr
.mgr
.peek_builder()
.vanguardmgr()
.mode()
} else {
VanguardMode::Disabled
}
}
}
pub(crate) fn estimate_timeout(
&self,
timeout_action: &timeouts::Action,
) -> std::time::Duration {
self.circmgr.estimate_timeout(timeout_action)
}
}
fn circuit_compatible_with_target<C: AbstractTunnel>(
netdir: &NetDir,
circ: &HsCircStem<C>,
circ_kind: HsCircKind,
exclude_target: &RelayExclusion,
) -> bool {
let last_hop_usage = hs_stem_terminal_hop_usage(Some(circ_kind));
circuit_still_useable(
netdir,
circ,
|relay| exclude_target.low_level_predicate_permits_relay(relay),
|last_hop| last_hop_usage.low_level_predicate_permits_relay(last_hop),
)
}
fn vanguards_circuit_compatible_with_target<C: AbstractTunnel, T>(
netdir: &NetDir,
circ: &HsCircStem<C>,
kind: HsCircStemKind,
circ_kind: HsCircKind,
avoid_target: Option<&T>,
) -> bool
where
T: CircTarget + Sync,
{
if let Some(target) = avoid_target {
let Ok(circ_path) = circ.circ.single_path() else {
return false;
};
let take_n = 2;
if circ_path
.hops()
.iter()
.rev()
.take(take_n)
.flat_map(|hop| hop.as_chan_target())
.any(|hop| hop.has_any_relay_id_from(target))
{
return false;
}
}
let last_hop_usage = hs_stem_terminal_hop_usage(Some(circ_kind));
circ.can_become(kind)
&& circuit_still_useable(
netdir,
circ,
|_relay| true,
|last_hop| last_hop_usage.low_level_predicate_permits_relay(last_hop),
)
}
fn circuit_still_useable<C, F1, F2>(
netdir: &NetDir,
circ: &HsCircStem<C>,
relay_okay: F1,
last_hop_ok: F2,
) -> bool
where
C: AbstractTunnel,
F1: Fn(&Relay<'_>) -> bool,
F2: Fn(&Relay<'_>) -> bool,
{
let circ = &circ.circ;
if circ.is_closing() {
return false;
}
let Ok(path) = circ.single_path() else {
return false;
};
let last_hop = path.hops().last().expect("No hops in circuit?!");
match relay_for_path_ent(netdir, last_hop) {
Err(NoRelayForPathEnt::HopWasVirtual) => {}
Err(NoRelayForPathEnt::NoSuchRelay) => {
return false;
}
Ok(r) => {
if !last_hop_ok(&r) {
return false;
}
}
};
path.iter().all(|ent: &circuit::PathEntry| {
match relay_for_path_ent(netdir, ent) {
Err(NoRelayForPathEnt::HopWasVirtual) => {
true
}
Err(NoRelayForPathEnt::NoSuchRelay) => {
false
}
Ok(r) => {
relay_okay(&r)
}
}
})
}
#[derive(Clone, Debug)]
enum NoRelayForPathEnt {
HopWasVirtual,
NoSuchRelay,
}
fn relay_for_path_ent<'a>(
netdir: &'a NetDir,
ent: &circuit::PathEntry,
) -> StdResult<Relay<'a>, NoRelayForPathEnt> {
let Some(c) = ent.as_chan_target() else {
return Err(NoRelayForPathEnt::HopWasVirtual);
};
let Some(relay) = netdir.by_ids(c) else {
return Err(NoRelayForPathEnt::NoSuchRelay);
};
Ok(relay)
}
#[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
async fn launch_hs_circuits_as_needed<B: AbstractTunnelBuilder<R> + 'static, R: Runtime>(
pool: Weak<HsCircPoolInner<B, R>>,
netdir_provider: Weak<dyn NetDirProvider + 'static>,
mut schedule: TaskSchedule<R>,
) {
const DELAY: Duration = Duration::from_secs(30);
while schedule.next().await.is_some() {
let (pool, provider) = match (pool.upgrade(), netdir_provider.upgrade()) {
(Some(x), Some(y)) => (x, y),
_ => {
break;
}
};
let now = pool.circmgr.mgr.peek_runtime().now();
pool.remove_closed();
let mut circs_to_launch = {
let mut inner = pool.inner.lock().expect("poisioned_lock");
inner.pool.update_target_size(now);
inner.pool.circs_to_launch()
};
let n_to_launch = circs_to_launch.n_to_launch();
let mut max_attempts = n_to_launch * 2;
if n_to_launch > 0 {
debug!(
"launching {} NAIVE and {} GUARDED circuits",
circs_to_launch.stem(),
circs_to_launch.guarded_stem()
);
}
'inner: while circs_to_launch.n_to_launch() > 0 {
max_attempts -= 1;
if max_attempts == 0 {
warn!("Too many preemptive onion service circuits failed; waiting a while.");
break 'inner;
}
if let Ok(netdir) = provider.netdir(tor_netdir::Timeliness::Timely) {
let no_target: Option<&OwnedCircTarget> = None;
let for_launch = circs_to_launch.for_launch();
match pool
.circmgr
.launch_hs_unmanaged(no_target, &netdir, for_launch.kind(), None)
.await
{
Ok(circ) => {
let kind = for_launch.kind();
let circ = HsCircStem { circ, kind };
pool.inner.lock().expect("poisoned lock").pool.insert(circ);
trace!("successfully launched {kind} circuit");
for_launch.note_circ_launched();
}
Err(err) => {
debug_report!(err, "Unable to build preemptive circuit for onion services");
}
}
} else {
break 'inner;
}
}
schedule.fire_in(DELAY);
}
}
async fn remove_unusable_circuits<B: AbstractTunnelBuilder<R> + 'static, R: Runtime>(
pool: Weak<HsCircPoolInner<B, R>>,
netdir_provider: Weak<dyn NetDirProvider + 'static>,
) {
let mut event_stream = match netdir_provider.upgrade() {
Some(nd) => nd.events(),
None => return,
};
while event_stream.next().await.is_some() {
let (pool, provider) = match (pool.upgrade(), netdir_provider.upgrade()) {
(Some(x), Some(y)) => (x, y),
_ => {
break;
}
};
pool.remove_closed();
if let Ok(netdir) = provider.netdir(tor_netdir::Timeliness::Timely) {
pool.remove_unlisted(&netdir);
}
}
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
#![allow(clippy::cognitive_complexity)]
use tor_config::ExplicitOrAuto;
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
use tor_guardmgr::VanguardConfigBuilder;
use tor_guardmgr::VanguardMode;
use tor_memquota::ArcMemoryQuotaTrackerExt as _;
use tor_proto::memquota::ToplevelAccount;
use tor_rtmock::MockRuntime;
use super::*;
use crate::{CircMgrInner, TestConfig};
fn circmgr_with_vanguards<R: Runtime>(
runtime: R,
mode: VanguardMode,
) -> Arc<CircMgrInner<crate::build::TunnelBuilder<R>, R>> {
let chanmgr = tor_chanmgr::ChanMgr::new(
runtime.clone(),
&Default::default(),
tor_chanmgr::Dormancy::Dormant,
&Default::default(),
ToplevelAccount::new_noop(),
None,
);
let guardmgr = tor_guardmgr::GuardMgr::new(
runtime.clone(),
tor_persist::TestingStateMgr::new(),
&tor_guardmgr::TestConfig::default(),
)
.unwrap();
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
let vanguard_config = VanguardConfigBuilder::default()
.mode(ExplicitOrAuto::Explicit(mode))
.build()
.unwrap();
let config = TestConfig {
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
vanguard_config,
..Default::default()
};
CircMgrInner::new(
&config,
tor_persist::TestingStateMgr::new(),
&runtime,
Arc::new(chanmgr),
&guardmgr,
)
.unwrap()
.into()
}
#[test]
fn pool_with_vanguards_disabled() {
MockRuntime::test_with_various(|runtime| async move {
let circmgr = circmgr_with_vanguards(runtime, VanguardMode::Disabled);
let circpool = HsCircPoolInner::new_internal(&circmgr);
assert!(circpool.vanguard_mode() == VanguardMode::Disabled);
});
}
#[test]
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
fn pool_with_vanguards_enabled() {
MockRuntime::test_with_various(|runtime| async move {
for mode in [VanguardMode::Lite, VanguardMode::Full] {
let circmgr = circmgr_with_vanguards(runtime.clone(), mode);
let circpool = HsCircPoolInner::new_internal(&circmgr);
assert!(circpool.vanguard_mode() == mode);
}
});
}
}