use std::time::Duration;
use super::AbstractChannelFactory;
use super::{AbstractChannel, Pending};
use crate::{ChannelConfig, Dormancy, Result};
use std::result::Result as StdResult;
use std::sync::Arc;
use tor_cell::chancell::msg::PaddingNegotiate;
use tor_config::PaddingLevel;
use tor_error::{internal, into_internal};
use tor_linkspec::ByRelayIds;
use tor_linkspec::HasRelayIds;
use tor_linkspec::RelayIds;
use tor_netdir::{params::NetParameters, params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND};
use tor_proto::channel::padding::Parameters as PaddingParameters;
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
use tor_proto::channel::ChannelPaddingInstructionsUpdates;
use tor_proto::ChannelPaddingInstructions;
use tor_units::{BoundedInt32, IntegerMilliseconds};
use tracing::info;
use void::{ResultVoidExt as _, Void};
#[cfg(test)]
mod padding_test;
pub(crate) struct MgrState<C: AbstractChannelFactory> {
inner: std::sync::Mutex<Inner<C>>,
}
struct Inner<C: AbstractChannelFactory> {
builder: C,
channels: ByRelayIds<ChannelState<C::Channel>>,
channels_params: ChannelPaddingInstructions,
config: ChannelConfig,
dormancy: Dormancy,
}
pub(crate) enum ChannelState<C> {
Open(OpenEntry<C>),
Building(PendingEntry),
}
#[derive(Clone)]
pub(crate) struct OpenEntry<C> {
pub(crate) channel: C,
pub(crate) max_unused_duration: Duration,
}
#[derive(Clone)]
pub(crate) struct PendingEntry {
pub(crate) ids: RelayIds,
pub(crate) pending: Pending,
}
impl<C> HasRelayIds for ChannelState<C>
where
C: HasRelayIds,
{
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match self {
ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
}
}
}
impl<C: Clone> ChannelState<C> {
#[cfg(test)]
fn unwrap_open(&self) -> &C {
match self {
ChannelState::Open(ent) => &ent.channel,
_ => panic!("Not an open channel"),
}
}
}
type NfIto = IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>>;
#[derive(Debug, Clone)]
struct NetParamsExtract {
nf_ito: [[NfIto; 2]; 2],
}
impl From<&NetParameters> for NetParamsExtract {
fn from(p: &NetParameters) -> Self {
NetParamsExtract {
nf_ito: [
[p.nf_ito_low, p.nf_ito_high],
[p.nf_ito_low_reduced, p.nf_ito_high_reduced],
],
}
}
}
impl NetParamsExtract {
fn pad_low(&self, reduced: bool) -> IntegerMilliseconds<u32> {
self.pad_get(reduced, 0)
}
fn pad_high(&self, reduced: bool) -> IntegerMilliseconds<u32> {
self.pad_get(reduced, 1)
}
fn pad_get(&self, reduced: bool, low_or_high: usize) -> IntegerMilliseconds<u32> {
self.nf_ito[usize::from(reduced)][low_or_high]
.try_map(|v| Ok::<_, Void>(v.into()))
.void_unwrap()
}
}
impl<C: AbstractChannel> ChannelState<C> {
fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
if let ChannelState::Open(ent) = self {
let unused_duration = ent.channel.duration_unused();
if let Some(unused_duration) = unused_duration {
let max_unused_duration = ent.max_unused_duration;
if let Some(remaining) = max_unused_duration.checked_sub(unused_duration) {
*expire_after = std::cmp::min(*expire_after, remaining);
false
} else {
true
}
} else {
false
}
} else {
false
}
}
}
impl<C: AbstractChannelFactory> MgrState<C> {
pub(crate) fn new(
builder: C,
config: ChannelConfig,
dormancy: Dormancy,
netparams: &NetParameters,
) -> Self {
let mut channels_params = ChannelPaddingInstructions::default();
let netparams = NetParamsExtract::from(netparams);
let update = parameterize(&mut channels_params, &config, dormancy, &netparams)
.unwrap_or_else(|e: tor_error::Bug| panic!("bug detected on startup: {:?}", e));
let _: Option<_> = update;
MgrState {
inner: std::sync::Mutex::new(Inner {
builder,
channels: ByRelayIds::new(),
config,
channels_params,
dormancy,
}),
}
}
pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(&mut ByRelayIds<ChannelState<C::Channel>>) -> T,
{
let mut inner = self.inner.lock()?;
Ok(func(&mut inner.channels))
}
pub(crate) fn builder(&self) -> C
where
C: Clone,
{
let inner = self.inner.lock().expect("lock poisoned");
inner.builder.clone()
}
#[allow(dead_code)]
pub(crate) fn with_mut_builder<F>(&self, func: F)
where
F: FnOnce(&mut C),
{
let mut inner = self.inner.lock().expect("lock poisoned");
func(&mut inner.builder);
}
pub(crate) fn with_channels_and_params<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(&mut ByRelayIds<ChannelState<C::Channel>>, &ChannelPaddingInstructions) -> T,
{
let mut inner = self.inner.lock()?;
let Inner {
ref mut channels,
ref channels_params,
..
} = &mut *inner;
Ok(func(channels, channels_params))
}
#[cfg(test)]
pub(crate) fn remove_unusable(&self) -> Result<()> {
let mut inner = self.inner.lock()?;
inner.channels.retain(|state| match state {
ChannelState::Open(ent) => ent.channel.is_usable(),
ChannelState::Building(_) => true,
});
Ok(())
}
pub(super) fn reconfigure_general(
&self,
new_config: Option<&ChannelConfig>,
new_dormancy: Option<Dormancy>,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), tor_error::Bug> {
use ChannelState as CS;
let netdir = {
let extract = NetParamsExtract::from((*netparams).as_ref());
drop(netparams);
extract
};
let mut inner = self
.inner
.lock()
.map_err(|_| internal!("poisoned channel manager"))?;
let mut inner = &mut *inner;
if let Some(new_config) = new_config {
inner.config = new_config.clone();
}
if let Some(new_dormancy) = new_dormancy {
inner.dormancy = new_dormancy;
}
let update = parameterize(
&mut inner.channels_params,
&inner.config,
inner.dormancy,
&netdir,
)?;
let update = if let Some(u) = update {
u
} else {
return Ok(());
};
let update = Arc::new(update);
for channel in inner.channels.values() {
let channel = match channel {
CS::Open(OpenEntry { channel, .. }) => channel,
CS::Building(_) => continue,
};
let _ = channel.reparameterize(update.clone());
}
Ok(())
}
pub(crate) fn expire_channels(&self) -> Duration {
let mut ret = Duration::from_secs(180);
self.inner
.lock()
.expect("Poisoned lock")
.channels
.retain(|chan| !chan.ready_to_expire(&mut ret));
ret
}
}
fn parameterize(
channels_params: &mut ChannelPaddingInstructions,
config: &ChannelConfig,
dormancy: Dormancy,
netdir: &NetParamsExtract,
) -> StdResult<Option<ChannelPaddingInstructionsUpdates>, tor_error::Bug> {
let padding_of_level = |level| padding_parameters(level, netdir);
let send_padding = padding_of_level(config.padding)?;
let padding_default = padding_of_level(PaddingLevel::default())?;
let send_padding = match dormancy {
Dormancy::Active => send_padding,
Dormancy::Dormant => None,
};
let recv_padding = match config.padding {
PaddingLevel::Reduced => None,
PaddingLevel::Normal => send_padding,
PaddingLevel::None => None,
};
let recv_equals_default = recv_padding == padding_default;
let padding_negotiate = if recv_equals_default {
PaddingNegotiate::start_default()
} else {
match recv_padding {
None => PaddingNegotiate::stop(),
Some(params) => params.padding_negotiate_cell()?,
}
};
let mut update = channels_params
.start_update()
.padding_enable(send_padding.is_some())
.padding_negotiate(padding_negotiate);
if let Some(params) = send_padding {
update = update.padding_parameters(params);
}
let update = update.finish();
Ok(update)
}
fn padding_parameters(
config: PaddingLevel,
netdir: &NetParamsExtract,
) -> StdResult<Option<PaddingParameters>, tor_error::Bug> {
let reduced = match config {
PaddingLevel::Reduced => true,
PaddingLevel::Normal => false,
PaddingLevel::None => return Ok(None),
};
padding_parameters_builder(reduced, netdir)
.unwrap_or_else(|e| {
info!(
"consensus channel padding parameters wrong, using defaults: {}",
&e
);
Some(PaddingParametersBuilder::default())
})
.map(|p| {
p.build()
.map_err(into_internal!("failed to build padding parameters"))
})
.transpose()
}
fn padding_parameters_builder(
reduced: bool,
netdir: &NetParamsExtract,
) -> StdResult<Option<PaddingParametersBuilder>, &'static str> {
let mut p = PaddingParametersBuilder::default();
let low = netdir.pad_low(reduced);
let high = netdir.pad_high(reduced);
if low > high {
return Err("low > high");
}
if low.as_millis() == 0 && high.as_millis() == 0 {
return Ok(None);
}
p.low(low);
p.high(high);
Ok::<_, &'static str>(Some(p))
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
use super::*;
use crate::factory::BootstrapReporter;
use async_trait::async_trait;
use std::sync::{Arc, Mutex};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
fn new_test_state() -> MgrState<FakeChannelFactory> {
MgrState::new(
FakeChannelFactory::default(),
ChannelConfig::default(),
Default::default(),
&Default::default(),
)
}
#[derive(Clone, Debug, Default)]
struct FakeChannelFactory;
#[async_trait]
impl AbstractChannelFactory for FakeChannelFactory {
type Channel = FakeChannel;
type BuildSpec = tor_linkspec::OwnedChanTarget;
async fn build_channel(
&self,
_target: &Self::BuildSpec,
_reporter: BootstrapReporter,
) -> Result<FakeChannel> {
unimplemented!()
}
}
#[derive(Clone, Debug)]
struct FakeChannel {
ed_ident: Ed25519Identity,
usable: bool,
unused_duration: Option<u64>,
params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
}
impl AbstractChannel for FakeChannel {
fn is_usable(&self) -> bool {
self.usable
}
fn duration_unused(&self) -> Option<Duration> {
self.unused_duration.map(Duration::from_secs)
}
fn reparameterize(
&self,
update: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()> {
*self.params_update.lock().unwrap() = Some(update);
Ok(())
}
fn engage_padding_activities(&self) {}
}
impl tor_linkspec::HasRelayIds for FakeChannel {
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match key_type {
tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
_ => None,
}
}
}
fn str_to_ed(s: &str) -> Ed25519Identity {
let byte = s.as_bytes()[0];
[byte; 32].into()
}
fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ed_ident: str_to_ed(ident),
usable: true,
unused_duration: None,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel,
max_unused_duration: Duration::from_secs(180),
})
}
fn ch_with_details(
ident: &'static str,
max_unused_duration: Duration,
unused_duration: Option<u64>,
) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ed_ident: str_to_ed(ident),
usable: true,
unused_duration,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel,
max_unused_duration,
})
}
fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ed_ident: str_to_ed(ident),
usable: false,
unused_duration: None,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel,
max_unused_duration: Duration::from_secs(180),
})
}
#[test]
fn rmv_unusable() -> Result<()> {
let map = new_test_state();
map.with_channels(|map| {
map.insert(closed("machen"));
map.insert(ch("feinen"));
map.insert(closed("wir"));
map.insert(ch("Fug"));
})?;
map.remove_unusable().unwrap();
map.with_channels(|map| {
assert!(map.by_id(&str_to_ed("m")).is_none());
assert!(map.by_id(&str_to_ed("w")).is_none());
assert!(map.by_id(&str_to_ed("f")).is_some());
assert!(map.by_id(&str_to_ed("F")).is_some());
})?;
Ok(())
}
#[test]
fn reparameterize_via_netdir() -> Result<()> {
let map = new_test_state();
let _ = map
.inner
.lock()
.unwrap()
.channels_params
.start_update()
.padding_parameters(
PaddingParametersBuilder::default()
.low(1234.into())
.build()
.unwrap(),
)
.finish();
map.with_channels(|map| {
map.insert(ch("track"));
})?;
let netdir = tor_netdir::testnet::construct_netdir()
.unwrap_if_sufficient()
.unwrap();
let netdir = Arc::new(netdir);
let with_ch = |f: &dyn Fn(&FakeChannel)| {
let inner = map.inner.lock().unwrap();
let ch = inner.channels.by_ed25519(&str_to_ed("t"));
let ch = ch.unwrap().unwrap_open();
f(ch);
};
eprintln!("-- process a default netdir, which should send an update --");
map.reconfigure_general(None, None, netdir.clone()).unwrap();
with_ch(&|ch| {
assert_eq!(
format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
"ChannelPaddingInstructionsUpdates { padding_enable: None, \
padding_parameters: Some(Parameters { \
low: IntegerMilliseconds { value: 1500 }, \
high: IntegerMilliseconds { value: 9500 } }), \
padding_negotiate: None }"
);
});
eprintln!();
eprintln!("-- process a default netdir again, which should *not* send an update --");
map.reconfigure_general(None, None, netdir).unwrap();
with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));
Ok(())
}
#[test]
fn expire_channels() -> Result<()> {
let map = new_test_state();
map.with_channels(|map| {
map.insert(ch_with_details(
"wello",
Duration::from_secs(180),
Some(181),
))
})?;
assert_eq!(180, map.expire_channels().as_secs());
map.with_channels(|map| {
assert!(map.by_ed25519(&str_to_ed("w")).is_none());
})?;
let map = new_test_state();
map.with_channels(|map| {
map.insert(ch_with_details(
"wello",
Duration::from_secs(180),
Some(120),
));
map.insert(ch_with_details(
"yello",
Duration::from_secs(180),
Some(170),
));
map.insert(ch_with_details(
"gello",
Duration::from_secs(180),
Some(181),
));
map.insert(closed("hello"));
})?;
assert_eq!(10, map.expire_channels().as_secs());
map.with_channels(|map| {
assert!(map.by_ed25519(&str_to_ed("w")).is_some());
assert!(map.by_ed25519(&str_to_ed("y")).is_some());
assert!(map.by_ed25519(&str_to_ed("h")).is_some());
assert!(map.by_ed25519(&str_to_ed("g")).is_none());
})?;
Ok(())
}
}