#![cfg_attr(
all(feature = "circ-padding", not(feature = "circ-padding-manual")),
expect(dead_code)
)]
mod backend;
use std::collections::VecDeque;
use std::num::NonZeroU16;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use bitvec::BitArr;
use maybenot::MachineId;
use smallvec::SmallVec;
use tor_memquota::memory_cost_structural_copy;
use tor_rtcompat::{DynTimeProvider, SleepProvider};
use crate::HopNum;
use crate::circuit::HOPS;
use crate::util::err::ExcessPadding;
use backend::PaddingBackend;
/// The instant type used for every timestamp in this module.
type Instant = web_time_compat::Instant;
/// The duration type used in this module (the standard library's).
type Duration = std::time::Duration;
/// A FIFO queue of circuit-level [`PaddingEvent`]s.
type PaddingEventQueue = VecDeque<PaddingEvent>;
/// A growable list of [`PerHopPaddingEvent`]s.
type PerHopPaddingEventVec = Vec<PerHopPaddingEvent>;
/// The validated set of rules used to pad one hop of a circuit.
///
/// Instances are produced by the derived builder, which is named
/// `CircuitPadderConfig`.  The builder's `build` function is private and runs
/// `CircuitPadderConfig::validate` first; the builder itself is public only
/// when the `circ-padding-manual` feature is enabled.
#[derive(Clone, Debug, derive_builder::Builder)]
#[builder(build_fn(
validate = "Self::validate",
private,
error = "CircuitPadderConfigError"
))]
#[builder(name = "CircuitPadderConfig")]
#[cfg_attr(not(feature = "circ-padding-manual"), builder(private))]
#[cfg_attr(feature = "circ-padding-manual", builder(public))]
pub(crate) struct PaddingRules {
/// The maybenot machines to run.  This field has no default, so the builder
/// reports it as uninitialized if it is never set.
machines: Arc<[maybenot::Machine]>,
/// Limit on outbound blocking, as a fraction.  Defaults to 1.0.
///
/// NOTE(review): consumed by the `backend` module, which is not visible
/// here; presumably forwarded to maybenot as its blocking-fraction limit.
#[builder(default = "1.0")]
max_outbound_blocking_frac: f64,
/// Limit on outbound padding, as a fraction.  Defaults to 1.0.
///
/// NOTE(review): consumed by the `backend` module, which is not visible
/// here; presumably forwarded to maybenot as its padding-fraction limit.
#[builder(default = "1.0")]
max_outbound_padding_frac: f64,
/// Largest fraction of received cells that may be padding before the
/// inbound check in `PaddingStats::validate` fails.  Defaults to 1.0.
#[builder(default = "1.0")]
max_inbound_padding_frac: f64,
/// Number of received cells (padding plus normal) after which the inbound
/// limit starts being enforced.  Defaults to 1; a value of 0 is also
/// treated as 1 when the statistics are initialized.
#[builder(default = "1")]
enforce_inbound_padding_after_cells: u16,
}
/// An error returned while validating a `CircuitPadderConfig` or while
/// turning it into a [`CircuitPadder`].
#[derive(Clone, Debug, thiserror::Error)]
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum CircuitPadderConfigError {
/// A field without a default was never set; the payload is the field name.
#[error("No value was given for {0}")]
UninitializedField(&'static str),
/// One of the fraction fields failed the range check; the payload is the
/// field name.
#[error("Value was out of range for {0}. (Must be between 0 and 1)")]
FractionOutOfRange(&'static str),
/// Creating the padding backend failed with a maybenot error.
#[error("Maybenot could not initialize framework for rules")]
MaybenotError(#[from] maybenot::Error),
}
impl From<derive_builder::UninitializedFieldError> for CircuitPadderConfigError {
fn from(value: derive_builder::UninitializedFieldError) -> Self {
Self::UninitializedField(value.field_name())
}
}
impl CircuitPadderConfig {
    /// Builder validation hook: check that every fraction that was explicitly
    /// set lies in the closed interval `[0.0, 1.0]`.
    ///
    /// Fields that were never set are skipped here; they take their defaults
    /// when the rules are built.
    ///
    /// # Errors
    ///
    /// Returns [`CircuitPadderConfigError::FractionOutOfRange`], naming the
    /// first offending field, if a fraction is outside `[0.0, 1.0]` or is NaN.
    fn validate(&self) -> Result<(), CircuitPadderConfigError> {
        macro_rules! enforce_frac {
            { $field:ident } => {
                // The range must be inclusive: 1.0 is the default for every
                // one of these fields, so explicitly asking for 1.0 has to be
                // accepted as well.  NaN is contained in no range and is
                // therefore rejected.
                if self.$field.is_some_and(|v| !(0.0..=1.0).contains(&v)) {
                    return Err(CircuitPadderConfigError::FractionOutOfRange(stringify!($field)));
                }
            }
        }
        enforce_frac!(max_outbound_blocking_frac);
        enforce_frac!(max_outbound_padding_frac);
        enforce_frac!(max_inbound_padding_frac);
        Ok(())
    }

    /// Build a [`CircuitPadder`] from this configuration.
    ///
    /// The configuration is validated and built into a set of rules; the
    /// rules are then used to create a padding backend and the initial
    /// inbound-padding statistics.
    ///
    /// # Errors
    ///
    /// Returns an error if a required field is missing, if a fraction is out
    /// of range, or if the padding backend cannot be created.
    #[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
    pub(crate) fn create_padder(&self) -> Result<CircuitPadder, CircuitPadderConfigError> {
        let rules = self.build()?;
        let backend = rules.create_padding_backend()?;
        let initial_stats = rules.initialize_stats();
        Ok(CircuitPadder {
            initial_stats,
            backend,
        })
    }
}
impl PaddingRules {
fn create_padding_backend(&self) -> Result<Box<dyn PaddingBackend>, maybenot::Error> {
const OPTIMIZE_FOR_N_MACHINES: usize = 4;
let backend =
backend::MaybenotPadder::<OPTIMIZE_FOR_N_MACHINES>::from_framework_rules(self)?;
Ok(Box::new(backend))
}
fn initialize_stats(&self) -> PaddingStats {
PaddingStats {
n_padding: 0,
n_normal: 0,
max_padding_frac: self.max_inbound_padding_frac as f32,
enforce_max_after: self
.enforce_inbound_padding_after_cells
.try_into()
.unwrap_or(1.try_into().expect("1 was not nonzero!?")),
}
}
}
/// A ready-to-install padder for a single hop.
///
/// It bundles a padding backend with the initial inbound statistics for that
/// hop.  Create one with `CircuitPadderConfig::create_padder` and hand it to
/// `PaddingController::install_padder_padding_at_hop`.
#[derive(derive_more::Debug)]
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
pub(crate) struct CircuitPadder {
/// Statistics (counters at zero, limits from the rules) to install with the backend.
initial_stats: PaddingStats,
/// The backend that makes the padding decisions; omitted from `Debug` output.
#[debug(skip)]
backend: Box<dyn PaddingBackend>,
}
/// A circuit-level instruction yielded by [`PaddingEventStream`].
#[derive(Clone, Copy, Debug)]
pub(crate) enum PaddingEvent {
/// Send a padding cell as described by the payload.
SendPadding(SendPadding),
/// At least one hop is blocking; the payload says whether the combined
/// block may be bypassed.
StartBlocking(StartBlocking),
/// No hop is blocking any more.
StopBlocking,
}
/// An event reported by a single hop's padding backend.
///
/// It does not carry the hop number: `PaddingShared::process_per_hop_event`
/// adds that (and merges blocking state across hops) when converting it into
/// a [`PaddingEvent`].
#[derive(Clone, Copy, Debug)]
enum PerHopPaddingEvent {
/// The hop's backend wants a padding cell sent.
SendPadding {
/// The maybenot machine that requested the padding.
machine: MachineId,
/// Whether the padding may be replaced (see [`Replace`]).
replace: Replace,
/// Whether the padding may bypass blocking (see [`Bypass`]).
bypass: Bypass,
},
/// The hop's backend wants blocking to start.
StartBlocking {
/// Whether this hop's block may be bypassed.
is_bypassable: bool,
},
/// The hop's backend wants its blocking to stop.
StopBlocking,
}
/// Whether a requested padding cell may be replaced.
///
/// `PaddingController::queued_data_as_padding` requires `Replaceable`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Replace {
    /// The padding may be replaced.
    Replaceable,
    /// The padding must not be replaced.
    NotReplaceable,
}

impl Replace {
    /// Convert a boolean "replace" flag into a [`Replace`]:
    /// `true` maps to `Replaceable`, `false` to `NotReplaceable`.
    fn from_bool(replace: bool) -> Self {
        if replace {
            Self::Replaceable
        } else {
            Self::NotReplaceable
        }
    }
}
/// Whether a requested padding cell may bypass blocking.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Bypass {
/// The padding may bypass blocking.
BypassBlocking,
/// The padding must not bypass blocking.
DoNotBypass,
}
/// Padding bookkeeping attached to a cell when it is queued.
///
/// The `queued_*` methods of [`PaddingController`] return one of these; it is
/// later passed to `PaddingController::flushed_relay_cell`, which uses
/// `target_hop` to decide which hops are told that the cell was sent.
#[derive(Clone, Copy, Debug)]
pub(crate) struct QueuedCellPaddingInfo {
/// The hop that the queued cell is addressed to.
pub(crate) target_hop: HopNum,
}
// Register the type with the memory-quota accounting macro.
memory_cost_structural_copy!(QueuedCellPaddingInfo);
impl Bypass {
fn from_bool(replace: bool) -> Self {
match replace {
true => Bypass::BypassBlocking,
false => Bypass::DoNotBypass,
}
}
}
/// A request to send one padding cell to a particular hop.
#[derive(Clone, Debug, Copy)]
pub(crate) struct SendPadding {
    /// The maybenot machine that asked for this padding; reported back to the
    /// backend once the padding has been sent.
    machine: maybenot::MachineId,
    /// The hop the padding is for.
    pub(crate) hop: HopNum,
    /// Whether the padding may be replaced.
    pub(crate) replace: Replace,
    /// Whether the padding may bypass blocking.
    pub(crate) bypass: Bypass,
}

impl SendPadding {
    /// Consume this request and produce the `PaddingSent` trigger event that
    /// tells the backend the padding for `machine` went out.
    fn into_sent_event(self) -> maybenot::TriggerEvent {
        let Self { machine, .. } = self;
        maybenot::TriggerEvent::PaddingSent { machine }
    }

    /// Return the replaceability recorded in this request.
    pub(crate) fn may_replace_with_data(&self) -> Replace {
        let Self { replace, .. } = *self;
        replace
    }

    /// Return the bypass setting recorded in this request.
    pub(crate) fn may_bypass_block(&self) -> Bypass {
        let Self { bypass, .. } = *self;
        bypass
    }
}
/// Payload of [`PaddingEvent::StartBlocking`].
#[derive(Clone, Copy, Debug)]
pub(crate) struct StartBlocking {
/// `false` if any hop currently has a non-bypassable block in place;
/// `true` if every blocking hop allows bypassing.
pub(crate) is_bypassable: bool,
}
/// Exclusive upper bound on hop indices handled by this module.
///
/// It sizes the per-hop bit arrays in `BlockingState`, and
/// `PaddingShared::set_hop_backend` asserts that every hop index is below it.
const MAX_HOPS: usize = 64;
/// Handle used to report cell activity to the per-hop padding backends.
///
/// Cloning yields another handle to the same shared state; that state is
/// also shared with the [`PaddingEventStream`] created alongside it by
/// [`new_padding`].
#[derive(Clone, derive_more::Debug)]
pub(crate) struct PaddingController<S = DynTimeProvider>
where
S: SleepProvider,
{
/// State shared with the event stream, behind a mutex; omitted from `Debug` output.
#[debug(skip)]
shared: Arc<Mutex<PaddingShared<S>>>,
}
/// State shared between a [`PaddingController`] and its [`PaddingEventStream`].
struct PaddingShared<S: SleepProvider> {
/// Time source: used to read the current time and to create sleep futures.
runtime: S,
/// Padding backend for each hop, indexed by hop number; `None` where no
/// padder is installed.
hops: SmallVec<[Option<Box<dyn PaddingBackend>>; HOPS]>,
/// Inbound padding statistics for each hop, indexed like `hops`.
stats: SmallVec<[Option<PaddingStats>; HOPS]>,
/// Which hops currently request blocking, and which of those blocks are
/// non-bypassable.
blocking: BlockingState,
/// The wakeup time computed by the last call to `schedule_next_wakeup`;
/// passed to the backends when reporting and collecting events.
next_scheduled_wakeup: Option<Instant>,
/// Events waiting to be yielded by the stream.
pending_events: PaddingEventQueue,
/// Waker recorded by `schedule_next_wakeup`; `set_hop_backend` wakes it so
/// the stream gets polled again.
waker: Waker,
}
/// Counters and limits used to detect excessive inbound padding from one hop.
#[derive(Clone, Debug)]
struct PaddingStats {
    /// Number of padding cells counted for this hop.
    n_padding: u64,
    /// Number of non-padding cells counted for this hop.
    n_normal: u64,
    /// Largest tolerated ratio of padding cells to all counted cells.
    max_padding_frac: f32,
    /// Total number of counted cells at which the limit starts to apply.
    enforce_max_after: NonZeroU16,
}

impl PaddingStats {
    /// Check the counters against the configured limit.
    ///
    /// The limit is not enforced until at least `enforce_max_after` cells
    /// have been counted in total.
    ///
    /// # Errors
    ///
    /// Returns [`ExcessPadding::PaddingExceedsLimit`] when enforcement is
    /// active and the padding count exceeds `max_padding_frac` of the total.
    fn validate(&self) -> Result<(), ExcessPadding> {
        let total = self.n_padding + self.n_normal;
        let enforce_from = u64::from(self.enforce_max_after.get());

        // Too few cells so far for the ratio to be meaningful.
        if total < enforce_from {
            return Ok(());
        }

        let allowed_padding = total as f32 * self.max_padding_frac;
        if self.n_padding as f32 > allowed_padding {
            Err(ExcessPadding::PaddingExceedsLimit)
        } else {
            Ok(())
        }
    }
}
/// Tracks, per hop, whether that hop's padder has asked for blocking.
#[derive(Default)]
struct BlockingState {
    /// Bit `i` is set while hop `i` requests blocking of any kind.
    hop_blocked: BitArr![for MAX_HOPS],
    /// Bit `i` is set while hop `i` requests blocking that may not be bypassed.
    blocking_non_bypassable: BitArr![for MAX_HOPS],
}

impl BlockingState {
    /// Record that hop `idx` has requested blocking.
    fn set_blocked(&mut self, idx: usize, is_bypassable: bool) {
        let non_bypassable = !is_bypassable;
        self.hop_blocked.set(idx, true);
        self.blocking_non_bypassable.set(idx, non_bypassable);
    }

    /// Record that hop `idx` no longer requests blocking.
    fn set_unblocked(&mut self, idx: usize) {
        self.hop_blocked.set(idx, false);
        self.blocking_non_bypassable.set(idx, false);
    }

    /// Summarize the blocking state of all hops as a single event.
    ///
    /// A non-bypassable block on any hop wins; otherwise any block at all
    /// yields a bypassable block; otherwise blocking is over.
    fn blocking_update_paddingevent(&self) -> PaddingEvent {
        let is_bypassable = if self.blocking_non_bypassable.any() {
            false
        } else if self.hop_blocked.any() {
            true
        } else {
            return PaddingEvent::StopBlocking;
        };
        PaddingEvent::StartBlocking(StartBlocking { is_bypassable })
    }
}
// The `queued_*` methods always return `Some` at present, hence the allow.
#[allow(clippy::unnecessary_wraps)]
impl<S: SleepProvider> PaddingController<S> {
    /// Report that a normal (non-padding) cell for `hop` has been queued.
    ///
    /// Every installed backend up to and including `hop` sees `NormalSent`.
    /// Returns the bookkeeping to attach to the queued cell.
    pub(crate) fn queued_data(&self, hop: HopNum) -> Option<QueuedCellPaddingInfo> {
        let mut state = self.shared.lock().expect("Lock poisoned");
        let events = [maybenot::TriggerEvent::NormalSent];
        state.trigger_events(hop, &events);
        state.info_for_hop(hop)
    }

    /// Install `padder` at `hop`, or remove the hop's padder when `padder`
    /// is `None`.
    pub(crate) fn install_padder_padding_at_hop(&self, hop: HopNum, padder: Option<CircuitPadder>) {
        let mut state = self.shared.lock().expect("lock poisoned");
        state.set_hop_backend(hop, padder);
    }

    /// Report that a data cell for `hop` has been queued in place of the
    /// replaceable padding requested by `sendpadding`.
    ///
    /// Earlier hops see `NormalSent`; `hop` itself sees `NormalSent` followed
    /// by `PaddingSent`.
    ///
    /// # Panics
    ///
    /// Panics if `sendpadding` is for a different hop, or is not replaceable.
    pub(crate) fn queued_data_as_padding(
        &self,
        hop: HopNum,
        sendpadding: SendPadding,
    ) -> Option<QueuedCellPaddingInfo> {
        assert_eq!(hop, sendpadding.hop);
        assert_eq!(Replace::Replaceable, sendpadding.replace);
        let mut state = self.shared.lock().expect("Lock poisoned");
        let for_earlier_hops = [maybenot::TriggerEvent::NormalSent];
        let for_target_hop = [
            maybenot::TriggerEvent::NormalSent,
            sendpadding.into_sent_event(),
        ];
        state.trigger_events_mixed(hop, &for_earlier_hops, &for_target_hop);
        state.info_for_hop(hop)
    }

    /// Report that a padding cell for `hop` has been queued in response to
    /// `sendpadding`.
    ///
    /// Earlier hops see `NormalSent`; `hop` itself sees `PaddingSent`.
    ///
    /// # Panics
    ///
    /// Panics if `sendpadding` is for a different hop.
    pub(crate) fn queued_padding(
        &self,
        hop: HopNum,
        sendpadding: SendPadding,
    ) -> Option<QueuedCellPaddingInfo> {
        assert_eq!(hop, sendpadding.hop);
        let mut state = self.shared.lock().expect("Lock poisoned");
        let for_earlier_hops = [maybenot::TriggerEvent::NormalSent];
        let for_target_hop = [sendpadding.into_sent_event()];
        state.trigger_events_mixed(hop, &for_earlier_hops, &for_target_hop);
        state.info_for_hop(hop)
    }

    /// Report that the padding requested by `sendpadding` needs no new cell.
    ///
    /// Only `hop` is told (`PaddingSent`); earlier hops get an empty event list.
    ///
    /// # Panics
    ///
    /// Panics if `sendpadding` is for a different hop.
    pub(crate) fn replaceable_padding_already_queued(&self, hop: HopNum, sendpadding: SendPadding) {
        assert_eq!(hop, sendpadding.hop);
        let mut state = self.shared.lock().expect("Lock poisoned");
        let for_target_hop = [sendpadding.into_sent_event()];
        state.trigger_events_mixed(hop, &[], &for_target_hop);
    }

    /// Report that a previously queued relay cell has been flushed.
    ///
    /// Every installed backend up to and including `info.target_hop` sees
    /// `TunnelSent`.
    pub(crate) fn flushed_relay_cell(&self, info: QueuedCellPaddingInfo) {
        let mut state = self.shared.lock().expect("Lock poisoned");
        let events = [maybenot::TriggerEvent::TunnelSent];
        state.trigger_events(info.target_hop, &events);
    }

    /// Report that a cell not tied to a relay hop has been flushed.
    ///
    /// Only the first hop's backend sees `TunnelSent`.
    pub(crate) fn flushed_channel_cell(&self) {
        let mut state = self.shared.lock().expect("Lock poisoned");
        let first_hop = HopNum::from(0);
        let events = [maybenot::TriggerEvent::TunnelSent];
        state.trigger_events(first_hop, &events);
    }

    /// Report that a non-padding cell from `hop` has been decrypted.
    ///
    /// Counts a normal cell for every hop up to and including `hop`, then
    /// reports `TunnelRecv` and `NormalRecv` to those hops' backends.
    pub(crate) fn decrypted_data(&self, hop: HopNum) {
        let mut state = self.shared.lock().expect("Lock poisoned");
        state.inc_normal_received(hop);
        let events = [
            maybenot::TriggerEvent::TunnelRecv,
            maybenot::TriggerEvent::NormalRecv,
        ];
        state.trigger_events(hop, &events);
    }

    /// Report that a padding cell from `hop` has been decrypted.
    ///
    /// Earlier hops see `TunnelRecv` and `NormalRecv`; `hop` itself sees
    /// `TunnelRecv` and `PaddingRecv`.
    ///
    /// # Errors
    ///
    /// Returns `Error::ExcessPadding` if the padding was not acceptable from
    /// `hop`; in that case no events are reported to the backends.
    pub(crate) fn decrypted_padding(&self, hop: HopNum) -> Result<(), crate::Error> {
        let mut state = self.shared.lock().expect("Lock poisoned");
        state
            .inc_padding_received(hop)
            .map_err(|e| crate::Error::ExcessPadding(e, hop))?;
        let for_earlier_hops = [
            maybenot::TriggerEvent::TunnelRecv,
            maybenot::TriggerEvent::NormalRecv,
        ];
        let for_target_hop = [
            maybenot::TriggerEvent::TunnelRecv,
            maybenot::TriggerEvent::PaddingRecv,
        ];
        state.trigger_events_mixed(hop, &for_earlier_hops, &for_target_hop);
        Ok(())
    }
}
impl<S: SleepProvider> PaddingShared<S> {
    /// Report `events` to the backend of every hop up to and including `hop`.
    ///
    /// Hops without an installed backend are skipped.
    fn trigger_events(&mut self, hop: HopNum, events: &[maybenot::TriggerEvent]) {
        let final_idx = usize::from(hop);
        let now = self.runtime.now();
        let next_scheduled_wakeup = self.next_scheduled_wakeup;
        for hop_controller in self.hops.iter_mut().take(final_idx + 1).flatten() {
            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
        }
    }

    /// Report `final_hop_events` to the backend of `hop`, and
    /// `intermediate_hop_events` to the backend of every hop before it.
    ///
    /// Hops without an installed backend are skipped.
    fn trigger_events_mixed(
        &mut self,
        hop: HopNum,
        intermediate_hop_events: &[maybenot::TriggerEvent],
        final_hop_events: &[maybenot::TriggerEvent],
    ) {
        let final_idx = usize::from(hop);
        let now = self.runtime.now();
        let next_scheduled_wakeup = self.next_scheduled_wakeup;
        for (idx, hop_controller) in self.hops.iter_mut().enumerate().take(final_idx + 1) {
            let Some(hop_controller) = hop_controller else {
                continue;
            };
            // Pick the target hop by index rather than by "last element
            // seen": if `hops` is shorter than `final_idx + 1`, the last
            // existing hop is still only an intermediate hop.
            let events = if idx == final_idx {
                final_hop_events
            } else {
                intermediate_hop_events
            };
            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
        }
    }

    /// Count one normal cell for every hop up to and including `hop`.
    fn inc_normal_received(&mut self, hop: HopNum) {
        let final_idx = usize::from(hop);
        for stats in self.stats.iter_mut().take(final_idx + 1).flatten() {
            stats.n_normal += 1;
        }
    }

    /// Count one padding cell for `hop`, and one normal cell for every hop
    /// before it.
    ///
    /// # Errors
    ///
    /// Returns [`ExcessPadding::NoPaddingNegotiated`] if `hop` has no
    /// statistics installed, including when `hop` lies beyond the hops seen
    /// so far.  Returns the error from `PaddingStats::validate` if `hop` has
    /// now exceeded its padding limit.  The earlier hops' normal counters are
    /// updated in either case.
    fn inc_padding_received(&mut self, hop: HopNum) -> Result<(), ExcessPadding> {
        let final_idx = usize::from(hop);

        // From the point of view of every earlier hop, this was ordinary traffic.
        for stats in self.stats.iter_mut().take(final_idx).flatten() {
            stats.n_normal += 1;
        }

        // Look the target hop up by index.  Treating the last *existing*
        // entry as the target would charge the padding to the wrong hop
        // whenever `stats` is shorter than `final_idx + 1`.
        match self.stats.get_mut(final_idx) {
            Some(Some(stats)) => {
                stats.n_padding += 1;
                stats.validate()
            }
            Some(None) | None => Err(ExcessPadding::NoPaddingNegotiated),
        }
    }

    /// Return the bookkeeping to attach to a cell queued for `target_hop`.
    ///
    /// Currently this always returns `Some`.
    #[allow(clippy::unnecessary_wraps)]
    fn info_for_hop(&self, target_hop: HopNum) -> Option<QueuedCellPaddingInfo> {
        Some(QueuedCellPaddingInfo { target_hop })
    }
}
impl<S: SleepProvider> PaddingShared<S> {
fn set_hop_backend(&mut self, hop: HopNum, backend: Option<CircuitPadder>) {
let hop_idx: usize = hop.into();
assert!(hop_idx < MAX_HOPS);
let n_needed = hop_idx + 1;
while self.hops.len() < n_needed {
self.hops.push(None);
}
while self.stats.len() < n_needed {
self.stats.push(None);
}
let (hop_backend, stats) = if let Some(padder) = backend {
(Some(padder.backend), Some(padder.initial_stats))
} else {
(None, None)
};
self.hops[hop_idx] = hop_backend;
self.stats[hop_idx] = stats;
let was_blocked = self.blocking.hop_blocked[hop_idx];
self.blocking.set_unblocked(hop_idx);
if was_blocked {
self.pending_events
.push_back(self.blocking.blocking_update_paddingevent());
}
self.waker.wake_by_ref();
}
fn process_per_hop_event(
blocking: &mut BlockingState,
hop_idx: usize,
event: PerHopPaddingEvent,
) -> PaddingEvent {
use PaddingEvent as PE;
use PerHopPaddingEvent as PHPE;
match event {
PHPE::SendPadding {
machine,
replace,
bypass,
} => PE::SendPadding(SendPadding {
machine,
hop: hopnum_from_hop_idx(hop_idx),
replace,
bypass,
}),
PHPE::StartBlocking { is_bypassable } => {
blocking.set_blocked(hop_idx, is_bypassable);
blocking.blocking_update_paddingevent()
}
PHPE::StopBlocking => {
blocking.set_unblocked(hop_idx);
blocking.blocking_update_paddingevent()
}
}
}
fn take_padding_events_at(&mut self, now: Instant) -> PaddingEventQueue {
let mut output = PaddingEventQueue::default();
for (hop_idx, backend) in self.hops.iter_mut().enumerate() {
let Some(backend) = backend else {
continue;
};
let hop_events = backend.take_padding_events_at(now, self.next_scheduled_wakeup);
output.extend(
hop_events
.into_iter()
.map(|ev| Self::process_per_hop_event(&mut self.blocking, hop_idx, ev)),
);
}
output
}
fn schedule_next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
let next_expiration = self
.hops
.iter_mut()
.flatten()
.filter_map(|hop| hop.next_wakeup(waker))
.min();
self.next_scheduled_wakeup = next_expiration;
self.waker = waker.clone();
next_expiration
}
}
/// The event-producing half of a padding controller/stream pair.
///
/// Created together with a [`PaddingController`] by [`new_padding`]; both
/// halves refer to the same shared state.
pub(crate) struct PaddingEventStream<S = DynTimeProvider>
where
S: SleepProvider,
{
/// State shared with the controller, behind a mutex.
shared: Arc<Mutex<PaddingShared<S>>>,
/// Sleep future for the next scheduled wakeup; replaced each time the
/// stream computes a new wakeup time.
sleep_future: S::SleepFuture,
}
// This impl names no type parameter, so it applies only to the default
// `PaddingEventStream<DynTimeProvider>`.
impl futures::Stream for PaddingEventStream {
type Item = PaddingEvent;
/// Yield the next padding event, or arrange to be woken when one may be ready.
///
/// This never returns `Poll::Ready(None)`.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// Everything in this block runs with the shared state locked; the
// lock is released at the end of the block, before any sleeping.
let (now, next_wakeup, runtime) = {
let Self { shared, .. } = &mut *self;
let mut shared = shared.lock().expect("Poisoned lock");
// First drain events that were queued earlier.
if let Some(val) = shared.pending_events.pop_front() {
return Poll::Ready(Some(val));
}
// Nothing queued: ask the backends what is due right now.
let now = shared.runtime.now();
shared.pending_events = shared.take_padding_events_at(now);
if let Some(val) = shared.pending_events.pop_front() {
return Poll::Ready(Some(val));
}
// Still nothing: find the earliest wakeup.  This also records our
// waker in the shared state.
(
now,
shared.schedule_next_wakeup(cx.waker()),
shared.runtime.clone(),
)
};
match next_wakeup {
None => {
// No backend has a wakeup scheduled; rely on the waker recorded above.
return Poll::Pending;
}
Some(t) => {
// Replace the sleep future with one that ends at `t`, then poll it
// so that it registers our waker.
self.sleep_future = runtime.sleep(t.saturating_duration_since(now));
match self.sleep_future.as_mut().poll(cx) {
Poll::Ready(()) => {
// The wakeup time has already arrived: go around again.
continue;
}
Poll::Pending => return Poll::Pending,
}
}
}
}
}
}
impl futures::stream::FusedStream for PaddingEventStream {
/// Always `false`: `poll_next` never reports the end of the stream.
fn is_terminated(&self) -> bool {
false
}
}
/// Convert an index into the per-hop tables into a [`HopNum`].
///
/// # Panics
///
/// Panics if `hop_idx` does not fit in a `u8`.
fn hopnum_from_hop_idx(hop_idx: usize) -> HopNum {
    // Compile-time check that every index below MAX_HOPS fits in a u8.
    const _: () = assert!(MAX_HOPS < u8::MAX as usize);
    let narrow: u8 = hop_idx.try_into().expect("hop_idx out of range!");
    HopNum::from(narrow)
}
/// Create a connected [`PaddingController`] / [`PaddingEventStream`] pair
/// that uses `runtime` as its time source.
///
/// The pair starts with no hops, no blocking, no pending events and no
/// scheduled wakeup.
pub(crate) fn new_padding<S>(runtime: S) -> (PaddingController<S>, PaddingEventStream<S>)
where
    S: SleepProvider,
{
    // Placeholder sleep (one day); the stream replaces it whenever it
    // computes a real wakeup time.
    let sleep_future = runtime.sleep(Duration::new(86400, 0));

    let shared = Arc::new(Mutex::new(PaddingShared {
        runtime,
        hops: Default::default(),
        stats: Default::default(),
        blocking: Default::default(),
        next_scheduled_wakeup: None,
        pending_events: PaddingEventQueue::default(),
        // Until the stream is first polled there is nobody to wake.
        waker: Waker::noop().clone(),
    }));

    let controller = PaddingController {
        shared: Arc::clone(&shared),
    };
    let stream = PaddingEventStream {
        shared,
        sleep_future,
    };
    (controller, stream)
}