use std::time::Duration;
use super::AbstractChannelFactory;
use super::{AbstractChannel, Pending, Sending, select};
use crate::{ChannelConfig, Dormancy, Error, Result};
use futures::FutureExt;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tor_async_utils::oneshot;
use tor_basic_utils::RngExt as _;
use tor_cell::chancell::msg::PaddingNegotiate;
use tor_config::PaddingLevel;
use tor_error::{error_report, internal, into_internal};
use tor_linkspec::{HasRelayIds, ListByRelayIds, RelayIds};
use tor_netdir::{params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND, params::NetParameters};
use tor_proto::ChannelPaddingInstructions;
use tor_proto::channel::ChannelPaddingInstructionsUpdates;
use tor_proto::channel::kist::{KistMode, KistParams};
use tor_proto::channel::padding::Parameters as PaddingParameters;
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
use tor_units::{BoundedInt32, IntegerMilliseconds};
use tracing::{info, instrument};
use void::{ResultVoidExt as _, Void};
#[cfg(test)]
mod padding_test;
pub(crate) struct MgrState<C: AbstractChannelFactory> {
inner: std::sync::Mutex<Inner<C>>,
}
struct ChannelParams {
padding: ChannelPaddingInstructions,
kist: KistParams,
}
struct Inner<C: AbstractChannelFactory> {
builder: C,
channels: ListByRelayIds<ChannelState<C::Channel>>,
channels_params: ChannelParams,
config: ChannelConfig,
dormancy: Dormancy,
}
pub(crate) enum ChannelState<C> {
Open(OpenEntry<C>),
Building(PendingEntry),
}
#[derive(Clone)]
pub(crate) struct OpenEntry<C> {
pub(crate) channel: Arc<C>,
pub(crate) max_unused_duration: Duration,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct UniqPendingChanId(u64);
impl UniqPendingChanId {
pub(crate) fn new() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
assert!(id != u64::MAX, "Exhausted the pending channel ID namespace");
Self(id)
}
}
impl std::fmt::Display for UniqPendingChanId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PendingChan {}", self.0)
}
}
#[derive(Clone)]
pub(crate) struct PendingEntry {
pub(crate) ids: RelayIds,
pub(crate) pending: Pending,
pub(crate) unique_id: UniqPendingChanId,
}
impl<C> HasRelayIds for ChannelState<C>
where
C: HasRelayIds,
{
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match self {
ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
}
}
}
impl<C: Clone> ChannelState<C> {
#[cfg(test)]
fn unwrap_open(&self) -> &C {
match self {
ChannelState::Open(ent) => &ent.channel,
_ => panic!("Not an open channel"),
}
}
}
type NfIto = IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>>;
#[derive(Debug, Clone)]
struct NetParamsExtract {
nf_ito: [[NfIto; 2]; 2],
kist: KistParams,
}
impl From<&NetParameters> for NetParamsExtract {
fn from(p: &NetParameters) -> Self {
let kist_enabled = kist_mode_from_net_parameter(p.kist_enabled);
let tcp_notsent_lowat = u32::from(p.kist_tcp_notsent_lowat);
let kist = KistParams::new(kist_enabled, tcp_notsent_lowat);
NetParamsExtract {
nf_ito: [
[p.nf_ito_low, p.nf_ito_high],
[p.nf_ito_low_reduced, p.nf_ito_high_reduced],
],
kist,
}
}
}
fn kist_mode_from_net_parameter(val: BoundedInt32<0, 1>) -> KistMode {
caret::caret_int! {
struct KistType(i32) {
DISABLED = 0,
TCP_NOTSENT_LOWAT = 1,
}
}
match val.get().into() {
KistType::DISABLED => KistMode::Disabled,
KistType::TCP_NOTSENT_LOWAT => KistMode::TcpNotSentLowat,
_ => unreachable!("BoundedInt32 was not bounded?!"),
}
}
impl NetParamsExtract {
fn pad_low(&self, reduced: bool) -> IntegerMilliseconds<u32> {
self.pad_get(reduced, 0)
}
fn pad_high(&self, reduced: bool) -> IntegerMilliseconds<u32> {
self.pad_get(reduced, 1)
}
fn pad_get(&self, reduced: bool, low_or_high: usize) -> IntegerMilliseconds<u32> {
self.nf_ito[usize::from(reduced)][low_or_high]
.try_map(|v| Ok::<_, Void>(v.into()))
.void_unwrap()
}
}
impl<C: AbstractChannel> ChannelState<C> {
fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
let ChannelState::Open(ent) = self else {
return false;
};
let Some(unused_duration) = ent.channel.duration_unused() else {
return false;
};
let max_unused_duration = ent.max_unused_duration;
let Some(remaining) = max_unused_duration.checked_sub(unused_duration) else {
return true;
};
if remaining.is_zero() {
return true;
}
*expire_after = std::cmp::min(*expire_after, remaining);
false
}
}
impl<C: AbstractChannelFactory> MgrState<C> {
pub(crate) fn new(
builder: C,
config: ChannelConfig,
dormancy: Dormancy,
netparams: &NetParameters,
) -> Self {
let mut padding_params = ChannelPaddingInstructions::default();
let netparams = NetParamsExtract::from(netparams);
let kist_params = netparams.kist;
let update = parameterize(&mut padding_params, &config, dormancy, &netparams)
.unwrap_or_else(|e: tor_error::Bug| panic!("bug detected on startup: {:?}", e));
let _: Option<_> = update;
let channels_params = ChannelParams {
padding: padding_params,
kist: kist_params,
};
MgrState {
inner: std::sync::Mutex::new(Inner {
builder,
channels: ListByRelayIds::new(),
config,
channels_params,
dormancy,
}),
}
}
#[cfg(test)]
pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(&mut ListByRelayIds<ChannelState<C::Channel>>) -> T,
{
let mut inner = self.inner.lock()?;
Ok(func(&mut inner.channels))
}
pub(crate) fn builder(&self) -> C
where
C: Clone,
{
let inner = self.inner.lock().expect("lock poisoned");
inner.builder.clone()
}
#[allow(unused)]
pub(crate) fn with_mut_builder<F>(&self, func: F)
where
F: FnOnce(&mut C),
{
let mut inner = self.inner.lock().expect("lock poisoned");
func(&mut inner.builder);
}
#[cfg(feature = "relay")]
pub(crate) fn add_open(&self, channel: Arc<C::Channel>) -> Result<()> {
let mut inner = self.inner.lock()?;
inner.channels.insert(ChannelState::Open(OpenEntry {
channel,
max_unused_duration: Self::random_max_unused_duration(),
}));
Ok(())
}
#[cfg(test)]
pub(crate) fn remove_unusable(&self) -> Result<()> {
let mut inner = self.inner.lock()?;
inner.channels.retain(|state| match state {
ChannelState::Open(ent) => ent.channel.is_usable(),
ChannelState::Building(_) => true,
});
Ok(())
}
pub(crate) fn request_channel(
&self,
target: &C::BuildSpec,
add_new_entry_if_not_found: bool,
) -> Result<Option<ChannelForTarget<C>>> {
use ChannelState::*;
let mut inner = self.inner.lock()?;
let open_channels = inner
.channels
.by_all_ids(target)
.filter(|entry| match entry {
Open(x) => select::open_channel_is_allowed(x, target),
Building(_) => false,
});
let pending_channels = inner
.channels
.all_subset(target)
.into_iter()
.filter(|entry| match entry {
Open(_) => false,
Building(x) => select::pending_channel_maybe_allowed(x, target),
});
match select::choose_best_channel(open_channels.chain(pending_channels), target) {
Some(Open(OpenEntry { channel, .. })) => {
return Ok(Some(ChannelForTarget::Open(Arc::clone(channel))));
}
Some(Building(PendingEntry { pending, .. })) => {
return Ok(Some(ChannelForTarget::Pending(pending.clone())));
}
None => {}
}
if inner
.channels
.all_overlapping(target)
.into_iter()
.filter(|entry| !entry.has_all_relay_ids_from(target))
.any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable()))
{
return Err(Error::IdentityConflict);
}
if !add_new_entry_if_not_found {
return Ok(None);
}
let any_relay_id = target
.identities()
.next()
.ok_or(internal!("relay target had no id"))?
.to_owned();
let (new_state, send, unique_id) = setup_launch(RelayIds::from_relay_ids(target));
inner
.channels
.try_insert(ChannelState::Building(new_state))?;
let handle = PendingChannelHandle::new(any_relay_id, unique_id);
Ok(Some(ChannelForTarget::NewEntry((handle, send))))
}
pub(crate) fn remove_pending_channel(&self, handle: PendingChannelHandle) -> Result<()> {
let mut inner = self.inner.lock()?;
remove_pending(&mut inner.channels, handle);
Ok(())
}
#[instrument(skip_all, level = "trace")]
pub(crate) fn upgrade_pending_channel_to_open(
&self,
handle: PendingChannelHandle,
channel: Arc<C::Channel>,
) -> Result<()> {
let mut inner = self.inner.lock()?;
remove_pending(&mut inner.channels, handle);
let update = inner.channels_params.padding.initial_update();
if let Some(update) = update {
channel
.reparameterize(update.into())
.map_err(|_| internal!("failure on new channel"))?;
}
let new_entry = ChannelState::Open(OpenEntry {
channel,
max_unused_duration: Self::random_max_unused_duration(),
});
inner.channels.insert(new_entry);
Ok(())
}
pub(super) fn reconfigure_general(
&self,
new_config: Option<&ChannelConfig>,
new_dormancy: Option<Dormancy>,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), tor_error::Bug> {
use ChannelState as CS;
let netdir = {
let extract = NetParamsExtract::from((*netparams).as_ref());
drop(netparams);
extract
};
let mut inner = self
.inner
.lock()
.map_err(|_| internal!("poisoned channel manager"))?;
let inner = &mut *inner;
if let Some(new_config) = new_config {
inner.config = new_config.clone();
}
if let Some(new_dormancy) = new_dormancy {
inner.dormancy = new_dormancy;
}
let update = parameterize(
&mut inner.channels_params.padding,
&inner.config,
inner.dormancy,
&netdir,
)?;
let update = update.map(Arc::new);
let new_kist_params = netdir.kist;
let kist_params = if new_kist_params != inner.channels_params.kist {
inner.channels_params.kist = new_kist_params;
Some(new_kist_params)
} else {
None
};
if update.is_none() && kist_params.is_none() {
return Ok(());
}
for channel in inner.channels.values() {
let channel = match channel {
CS::Open(OpenEntry { channel, .. }) => channel,
CS::Building(_) => continue,
};
if let Some(ref update) = update {
let _ = channel.reparameterize(Arc::clone(update));
}
if let Some(kist) = kist_params {
let _ = channel.reparameterize_kist(kist);
}
}
Ok(())
}
pub(crate) fn expire_channels(&self) -> Duration {
let mut ret = Duration::from_secs(180);
self.inner
.lock()
.expect("Poisoned lock")
.channels
.retain(|chan| !chan.ready_to_expire(&mut ret));
ret
}
fn random_max_unused_duration() -> Duration {
Duration::from_secs(
rand::rng()
.gen_range_checked(180..270)
.expect("not 180 < 270 !"),
)
}
}
pub(crate) enum ChannelForTarget<CF: AbstractChannelFactory> {
Open(Arc<CF::Channel>),
Pending(Pending),
NewEntry((PendingChannelHandle, Sending)),
}
pub(crate) struct PendingChannelHandle {
relay_id: tor_linkspec::RelayId,
unique_id: UniqPendingChanId,
chan_has_been_removed: bool,
}
impl PendingChannelHandle {
fn new(relay_id: tor_linkspec::RelayId, unique_id: UniqPendingChanId) -> Self {
Self {
relay_id,
unique_id,
chan_has_been_removed: false,
}
}
fn chan_has_been_removed(mut self) {
self.chan_has_been_removed = true;
}
}
impl std::ops::Drop for PendingChannelHandle {
fn drop(&mut self) {
if !self.chan_has_been_removed {
#[allow(clippy::missing_docs_in_private_items)]
const MSG: &str = "Dropped the 'PendingChannelHandle' without removing the channel";
error_report!(
internal!("{MSG}"),
"'PendingChannelHandle' dropped unexpectedly",
);
}
}
}
fn setup_launch(ids: RelayIds) -> (PendingEntry, Sending, UniqPendingChanId) {
let (snd, rcv) = oneshot::channel();
let pending = rcv.shared();
let unique_id = UniqPendingChanId::new();
let entry = PendingEntry {
ids,
pending,
unique_id,
};
(entry, snd, unique_id)
}
fn remove_pending<C: AbstractChannel>(
channel_map: &mut tor_linkspec::ListByRelayIds<ChannelState<C>>,
handle: PendingChannelHandle,
) {
let removed = channel_map.remove_by_id(&handle.relay_id, |c| {
let ChannelState::Building(c) = c else {
return false;
};
c.unique_id == handle.unique_id
});
debug_assert_eq!(removed.len(), 1, "expected to remove exactly one channel");
handle.chan_has_been_removed();
}
fn parameterize(
channels_params: &mut ChannelPaddingInstructions,
config: &ChannelConfig,
dormancy: Dormancy,
netdir: &NetParamsExtract,
) -> StdResult<Option<ChannelPaddingInstructionsUpdates>, tor_error::Bug> {
let padding_of_level = |level| padding_parameters(level, netdir);
let send_padding = padding_of_level(config.padding)?;
let padding_default = padding_of_level(PaddingLevel::default())?;
let send_padding = match dormancy {
Dormancy::Active => send_padding,
Dormancy::Dormant => None,
};
let recv_padding = match config.padding {
PaddingLevel::Reduced => None,
PaddingLevel::Normal => send_padding,
PaddingLevel::None => None,
};
let recv_equals_default = recv_padding == padding_default;
let padding_negotiate = if recv_equals_default {
PaddingNegotiate::start_default()
} else {
match recv_padding {
None => PaddingNegotiate::stop(),
Some(params) => params.padding_negotiate_cell()?,
}
};
let mut update = channels_params
.start_update()
.padding_enable(send_padding.is_some())
.padding_negotiate(padding_negotiate);
if let Some(params) = send_padding {
update = update.padding_parameters(params);
}
let update = update.finish();
Ok(update)
}
fn padding_parameters(
config: PaddingLevel,
netdir: &NetParamsExtract,
) -> StdResult<Option<PaddingParameters>, tor_error::Bug> {
let reduced = match config {
PaddingLevel::Reduced => true,
PaddingLevel::Normal => false,
PaddingLevel::None => return Ok(None),
};
padding_parameters_builder(reduced, netdir)
.unwrap_or_else(|e: &str| {
info!(
"consensus channel padding parameters wrong, using defaults: {}",
&e,
);
Some(PaddingParametersBuilder::default())
})
.map(|p| {
p.build()
.map_err(into_internal!("failed to build padding parameters"))
})
.transpose()
}
fn padding_parameters_builder(
reduced: bool,
netdir: &NetParamsExtract,
) -> StdResult<Option<PaddingParametersBuilder>, &'static str> {
let mut p = PaddingParametersBuilder::default();
let low = netdir.pad_low(reduced);
let high = netdir.pad_high(reduced);
if low > high {
return Err("low > high");
}
if low.as_millis() == 0 && high.as_millis() == 0 {
return Ok(None);
}
p.low(low);
p.high(high);
Ok::<_, &'static str>(Some(p))
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use crate::factory::BootstrapReporter;
use async_trait::async_trait;
#[cfg(feature = "relay")]
use safelog::Sensitive;
use std::sync::{Arc, Mutex};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
use tor_proto::memquota::ChannelAccount;
fn new_test_state() -> MgrState<FakeChannelFactory> {
MgrState::new(
FakeChannelFactory::default(),
ChannelConfig::default(),
Default::default(),
&Default::default(),
)
}
#[derive(Clone, Debug, Default)]
struct FakeChannelFactory {}
#[allow(clippy::diverging_sub_expression)] #[async_trait]
impl AbstractChannelFactory for FakeChannelFactory {
type Channel = FakeChannel;
type BuildSpec = tor_linkspec::OwnedChanTarget;
type Stream = ();
async fn build_channel(
&self,
_target: &Self::BuildSpec,
_reporter: BootstrapReporter,
_memquota: ChannelAccount,
) -> Result<Arc<FakeChannel>> {
unimplemented!()
}
#[cfg(feature = "relay")]
async fn build_channel_using_incoming(
&self,
_peer: Sensitive<std::net::SocketAddr>,
_stream: Self::Stream,
_memquota: ChannelAccount,
) -> Result<Arc<Self::Channel>> {
unimplemented!()
}
}
#[derive(Clone, Debug)]
struct FakeChannel {
ed_ident: Ed25519Identity,
usable: bool,
unused_duration: Option<u64>,
params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
}
impl AbstractChannel for FakeChannel {
fn is_canonical(&self) -> bool {
unimplemented!()
}
fn is_canonical_to_peer(&self) -> bool {
unimplemented!()
}
fn is_usable(&self) -> bool {
self.usable
}
fn duration_unused(&self) -> Option<Duration> {
self.unused_duration.map(Duration::from_secs)
}
fn reparameterize(
&self,
update: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()> {
*self.params_update.lock().unwrap() = Some(update);
Ok(())
}
fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
Ok(())
}
fn engage_padding_activities(&self) {}
}
impl tor_linkspec::HasRelayIds for FakeChannel {
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match key_type {
tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
_ => None,
}
}
}
fn str_to_ed(s: &str) -> Ed25519Identity {
let byte = s.as_bytes()[0];
[byte; 32].into()
}
fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ed_ident: str_to_ed(ident),
usable: true,
unused_duration: None,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel: Arc::new(channel),
max_unused_duration: Duration::from_secs(180),
})
}
fn ch_with_details(
ident: &'static str,
max_unused_duration: Duration,
unused_duration: Option<u64>,
) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ed_ident: str_to_ed(ident),
usable: true,
unused_duration,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel: Arc::new(channel),
max_unused_duration,
})
}
fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ed_ident: str_to_ed(ident),
usable: false,
unused_duration: None,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel: Arc::new(channel),
max_unused_duration: Duration::from_secs(180),
})
}
#[test]
fn rmv_unusable() -> Result<()> {
let map = new_test_state();
map.with_channels(|map| {
map.insert(closed("machen"));
map.insert(closed("wir"));
map.insert(ch("wir"));
map.insert(ch("feinen"));
map.insert(ch("Fug"));
map.insert(ch("Fug"));
})?;
map.remove_unusable().unwrap();
map.with_channels(|map| {
assert_eq!(map.by_id(&str_to_ed("m")).len(), 0);
assert_eq!(map.by_id(&str_to_ed("w")).len(), 1);
assert_eq!(map.by_id(&str_to_ed("f")).len(), 1);
assert_eq!(map.by_id(&str_to_ed("F")).len(), 2);
})?;
Ok(())
}
#[test]
fn reparameterize_via_netdir() -> Result<()> {
let map = new_test_state();
let _ = map
.inner
.lock()
.unwrap()
.channels_params
.padding
.start_update()
.padding_parameters(
PaddingParametersBuilder::default()
.low(1234.into())
.build()
.unwrap(),
)
.finish();
map.with_channels(|map| {
map.insert(ch("track"));
})?;
let netdir = tor_netdir::testnet::construct_netdir()
.unwrap_if_sufficient()
.unwrap();
let netdir = Arc::new(netdir);
let with_ch = |f: &dyn Fn(&FakeChannel)| {
let inner = map.inner.lock().unwrap();
let mut ch = inner.channels.by_ed25519(&str_to_ed("t"));
let ch = ch.next().unwrap().unwrap_open();
f(ch);
};
eprintln!("-- process a default netdir, which should send an update --");
map.reconfigure_general(None, None, netdir.clone()).unwrap();
with_ch(&|ch| {
assert_eq!(
format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
"ChannelPaddingInstructionsUpdates { padding_enable: None, \
padding_parameters: Some(Parameters { \
low: IntegerMilliseconds { value: 1500 }, \
high: IntegerMilliseconds { value: 9500 } }), \
padding_negotiate: None }"
);
});
eprintln!();
eprintln!("-- process a default netdir again, which should *not* send an update --");
map.reconfigure_general(None, None, netdir).unwrap();
with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));
Ok(())
}
#[test]
fn expire_channels() -> Result<()> {
let map = new_test_state();
map.with_channels(|map| {
map.insert(ch_with_details(
"wello",
Duration::from_secs(180),
Some(181),
));
})?;
assert_eq!(180, map.expire_channels().as_secs());
map.with_channels(|map| {
assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 0);
})?;
let map = new_test_state();
map.with_channels(|map| {
map.insert(ch_with_details(
"wello",
Duration::from_secs(180),
Some(120),
));
map.insert(ch_with_details(
"yello",
Duration::from_secs(180),
Some(170),
));
map.insert(ch_with_details(
"gello",
Duration::from_secs(180),
Some(181),
));
map.insert(closed("hello"));
})?;
assert_eq!(10, map.expire_channels().as_secs());
map.with_channels(|map| {
assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 1);
assert_eq!(map.by_ed25519(&str_to_ed("y")).len(), 1);
assert_eq!(map.by_ed25519(&str_to_ed("h")).len(), 1);
assert_eq!(map.by_ed25519(&str_to_ed("g")).len(), 0);
})?;
Ok(())
}
}