#[cfg(test)]
#[path = "../../../tests/flow/decoy.rs"]
mod tests;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use async_trait::async_trait;
use log::{debug, info, warn};
use rand::Rng;
use rand::seq::SliceRandom;
use rand_distr::{Distribution, Exp, Normal};
use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
use crate::cache::DerivedValue;
use crate::flow::config::{FakeHeaderConfig, FieldType, FieldTypeHolder};
use crate::flow::error::FlowControllerError;
use crate::settings::Settings;
use crate::settings::keys::*;
use crate::tailer::{IdentityType, Tailer};
use crate::utils::random::get_rng;
use crate::utils::sync::{AsyncExecutor, RwLock, sleep};
use crate::utils::unix_timestamp_ms;
use crate::weighted_random;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum MaintenanceMode {
None,
Random,
Timed {
delay_ms: u64,
},
Sized {
length: usize,
},
Both {
delay_ms: u64,
length: usize,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum ReplicationMode {
None,
Maintenance,
All,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum SubheaderMode {
None,
Maintenance,
All,
}
pub(super) struct DecoyFeatureConfig {
pub(super) maintenance_mode: MaintenanceMode,
pub(super) replication_mode: ReplicationMode,
pub(super) replication_probability: f64,
pub(super) subheader_mode: SubheaderMode,
pub(super) subheader_config: Option<FakeHeaderConfig>,
}
impl DecoyFeatureConfig {
pub(super) fn random<AE: AsyncExecutor>(settings: &Settings<AE>) -> Self {
let mut rng = get_rng();
let delay_min = settings.get(&DECOY_MAINTENANCE_DELAY_MIN);
let delay_max = settings.get(&DECOY_MAINTENANCE_DELAY_MAX);
let length_min = settings.get(&DECOY_MAINTENANCE_LENGTH_MIN) as usize;
let length_max = settings.get(&DECOY_MAINTENANCE_LENGTH_MAX) as usize;
let fixed_delay = rng.gen_range(delay_min..=delay_max);
let fixed_length = rng.gen_range(length_min..=length_max);
let maintenance_mode = weighted_random! {
settings.get(&DECOY_MAINTENANCE_WEIGHT_NONE) => MaintenanceMode::None,
settings.get(&DECOY_MAINTENANCE_WEIGHT_RANDOM) => MaintenanceMode::Random,
settings.get(&DECOY_MAINTENANCE_WEIGHT_TIMED) => MaintenanceMode::Timed {
delay_ms: fixed_delay,
},
settings.get(&DECOY_MAINTENANCE_WEIGHT_SIZED) => MaintenanceMode::Sized {
length: fixed_length,
},
settings.get(&DECOY_MAINTENANCE_WEIGHT_BOTH) => MaintenanceMode::Both {
delay_ms: fixed_delay,
length: fixed_length,
},
};
let replication_mode = weighted_random! {
settings.get(&DECOY_REPLICATION_WEIGHT_NONE) => ReplicationMode::None,
settings.get(&DECOY_REPLICATION_WEIGHT_MAINTENANCE) => ReplicationMode::Maintenance,
settings.get(&DECOY_REPLICATION_WEIGHT_ALL) => ReplicationMode::All,
};
let prob_min = settings.get(&DECOY_REPLICATION_PROBABILITY_MIN);
let prob_max = settings.get(&DECOY_REPLICATION_PROBABILITY_MAX);
let replication_probability = rng.gen_range(prob_min..=prob_max);
let subheader_mode = weighted_random! {
settings.get(&DECOY_SUBHEADER_WEIGHT_NONE) => SubheaderMode::None,
settings.get(&DECOY_SUBHEADER_WEIGHT_MAINTENANCE) => SubheaderMode::Maintenance,
settings.get(&DECOY_SUBHEADER_WEIGHT_ALL) => SubheaderMode::All,
};
let subheader_config = if subheader_mode == SubheaderMode::None {
None
} else {
let min_len = settings.get(&DECOY_SUBHEADER_LENGTH_MIN) as usize;
let max_len = settings.get(&DECOY_SUBHEADER_LENGTH_MAX) as usize;
Some(generate_random_fake_header(settings, min_len, max_len))
};
info!("decoy feature config: maintenance={maintenance_mode:?}, replication={replication_mode:?}, replication_prob={replication_probability:.4}, subheader={subheader_mode:?}");
Self {
maintenance_mode,
replication_mode,
replication_probability,
subheader_mode,
subheader_config,
}
}
}
fn generate_random_fake_header<AE: AsyncExecutor>(settings: &Settings<AE>, min_len: usize, max_len: usize) -> FakeHeaderConfig {
let mut rng = get_rng();
let target_len = rng.gen_range(min_len..=max_len);
let mut fields = Vec::new();
let mut current_len = 0usize;
while current_len < target_len {
let remaining = target_len - current_len;
let size = if remaining >= 8 {
*[1usize, 2, 4, 8].choose(&mut rng).unwrap()
} else if remaining >= 4 {
*[1usize, 2, 4].choose(&mut rng).unwrap()
} else if remaining >= 2 {
*[1usize, 2].choose(&mut rng).unwrap()
} else {
1
};
let field = match size {
1 => FieldTypeHolder::U8(random_field_type(settings, &mut rng)),
2 => FieldTypeHolder::U16(random_field_type(settings, &mut rng)),
4 => FieldTypeHolder::U32(random_field_type(settings, &mut rng)),
8 => FieldTypeHolder::U64(random_field_type(settings, &mut rng)),
_ => unreachable!(),
};
fields.push(field);
current_len += size;
}
FakeHeaderConfig::new(fields)
}
fn random_field_type<AE: AsyncExecutor, L: Copy + From<u8>>(settings: &Settings<AE>, rng: &mut impl Rng) -> FieldType<L>
where
rand::distributions::Standard: Distribution<L>,
{
let volatile_prob_min = settings.get(&FAKE_HEADER_VOLATILE_CHANGE_PROB_MIN);
let volatile_prob_max = settings.get(&FAKE_HEADER_VOLATILE_CHANGE_PROB_MAX);
let switching_timeout_min = settings.get(&FAKE_HEADER_SWITCHING_TIMEOUT_MIN_MS);
let switching_timeout_max = settings.get(&FAKE_HEADER_SWITCHING_TIMEOUT_MAX_MS);
weighted_random! {
settings.get(&FAKE_HEADER_FIELD_WEIGHT_RANDOM) => FieldType::Random,
settings.get(&FAKE_HEADER_FIELD_WEIGHT_CONSTANT) => FieldType::Constant {
value: rng.r#gen::<L>(),
},
settings.get(&FAKE_HEADER_FIELD_WEIGHT_VOLATILE) => FieldType::Volatile {
value: rng.r#gen::<L>(),
change_probability: rng.gen_range(volatile_prob_min..=volatile_prob_max),
},
settings.get(&FAKE_HEADER_FIELD_WEIGHT_SWITCHING) => {
let switch_timeout = rng.gen_range(switching_timeout_min..=switching_timeout_max);
FieldType::Switching {
value: rng.r#gen::<L>(),
next_switch: unix_timestamp_ms() + switch_timeout as u128,
switch_timeout,
}
},
settings.get(&FAKE_HEADER_FIELD_WEIGHT_INCREMENTAL) => FieldType::Incremental {
value: rng.r#gen::<L>(),
}
}
}
pub trait DecoyFlowSender: Send + Sync {
fn send_decoy_packet<'a>(&'a self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Pin<Box<dyn Future<Output = Result<(), FlowControllerError>> + Send + 'a>>;
}
#[async_trait]
pub trait DecoyProvider: Send + Sync {
fn name(&self) -> &'static str;
async fn start(&self);
async fn feed_input(&self, packet: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer>;
async fn feed_output(&self, body: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer>;
}
pub trait DecoyCommunicationMode<T: IdentityType + Clone, AE: AsyncExecutor>: DecoyProvider + Sized {
fn name() -> &'static str {
let full = std::any::type_name::<Self>();
let without_generics = full.split('<').next().unwrap_or(full);
without_generics.split("::").last().unwrap_or(without_generics)
}
fn new(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self;
}
pub(crate) struct DecoyState<T: IdentityType + Clone, AE: AsyncExecutor> {
pub(super) settings: Arc<Settings<AE>>,
pub(super) reference_rate: f64,
pub(super) packet_rate: f64,
pub(super) byte_rate: f64,
pub(super) byte_budget: f64,
previous_packet_time: Option<u128>,
pub(super) packet_length_cap: usize,
counter: Arc<AtomicU32>,
identity: DerivedValue<T>,
pub(super) next_decoy_time: u128,
pub(super) pending_length: usize,
pub(super) features: DecoyFeatureConfig,
pub(super) next_maintenance_time: u128,
pub(super) pending_maintenance_length: usize,
fallthrough_probability: f64,
}
impl<T: IdentityType + Clone, AE: AsyncExecutor> DecoyState<T, AE> {
pub(super) fn new(settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self {
let byte_rate_cap = settings.get(&DECOY_BYTE_RATE_CAP);
let byte_rate_factor = settings.get(&DECOY_BYTE_RATE_FACTOR);
let length_max = settings.get(&DECOY_LENGTH_MAX) as usize;
let length_min = settings.get(&DECOY_LENGTH_MIN) as usize;
let now = unix_timestamp_ms();
let features = DecoyFeatureConfig::random(&settings);
let (maint_time, maint_len) = if features.maintenance_mode == MaintenanceMode::None {
(u128::MAX, 0)
} else {
let delay = maintenance_delay_for(&features.maintenance_mode, &settings);
let length = maintenance_length_for(&features.maintenance_mode, &settings);
(now + delay as u128, length)
};
let fallthrough_probability = fallthrough_probability.unwrap_or_else(|| {
let lo = settings.get(&DECOY_FALLTHROUGH_PACKETS_MIN);
let hi = settings.get(&DECOY_FALLTHROUGH_PACKETS_MAX);
if lo >= hi {
lo
} else {
get_rng().gen_range(lo..=hi)
}
});
Self {
settings: settings.clone(),
reference_rate: settings.get(&DECOY_REFERENCE_PACKET_RATE_DEFAULT),
packet_rate: settings.get(&DECOY_CURRENT_PACKET_RATE_DEFAULT),
byte_rate: settings.get(&DECOY_CURRENT_BYTE_RATE_DEFAULT),
byte_budget: byte_rate_cap * byte_rate_factor / 2.0,
previous_packet_time: None,
packet_length_cap: length_max.max(length_min),
counter,
identity,
next_decoy_time: now,
pending_length: length_min,
features,
next_maintenance_time: maint_time,
pending_maintenance_length: maint_len,
fallthrough_probability,
}
}
#[inline]
pub(super) fn should_fallthrough(&self) -> bool {
if self.fallthrough_probability <= 0.0 {
false
} else if self.fallthrough_probability >= 1.0 {
true
} else {
get_rng().r#gen::<f64>() < self.fallthrough_probability
}
}
pub(super) fn update(&mut self, packet_length: usize, outgoing_real: bool) {
let current_time = unix_timestamp_ms();
if let Some(prev_time) = self.previous_packet_time {
let time_delta = (current_time - prev_time) as f64;
let reference_alpha = self.settings.get(&DECOY_REFERENCE_ALPHA);
let current_alpha = self.settings.get(&DECOY_CURRENT_ALPHA);
let byte_rate_cap = self.settings.get(&DECOY_BYTE_RATE_CAP);
let byte_rate_factor = self.settings.get(&DECOY_BYTE_RATE_FACTOR);
self.reference_rate = (1.0 - reference_alpha) * self.reference_rate + reference_alpha * time_delta;
self.packet_rate = (1.0 - current_alpha) * self.packet_rate + current_alpha * time_delta;
self.byte_rate = (1.0 - current_alpha) * self.byte_rate + current_alpha * (packet_length as f64);
let refill = time_delta * byte_rate_cap / 1000.0;
let deduct = if outgoing_real {
packet_length as f64
} else {
0.0
};
self.byte_budget = (self.byte_budget + refill - deduct).clamp(0.0, byte_rate_cap * byte_rate_factor);
}
self.previous_packet_time = Some(current_time);
}
pub(super) fn quietness_index(&self) -> f64 {
((self.reference_rate - self.packet_rate) / self.reference_rate).clamp(0.0, 1.0)
}
fn next_packet_number(&self) -> u64 {
let counter = self.counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
let timestamp = (unix_timestamp_ms() / 1000) as u32;
((timestamp as u64) << 32) | counter as u64
}
pub(super) fn create_decoy_packet(&mut self, body_length: usize, is_maintenance: bool) -> DynamicByteBuffer {
let subheader_len = self.subheader_length(is_maintenance);
let total_length = body_length + Tailer::<T>::len();
let packet = self.settings.pool().allocate(Some(total_length));
get_rng().fill(packet.slice_end_mut(body_length));
let pn = self.next_packet_number();
Tailer::decoy(packet.rebuffer_start(body_length), &self.identity.get(), pn);
if subheader_len > 0 {
let expanded = packet.expand_start(subheader_len);
if let Some(ref mut config) = self.features.subheader_config {
config.fill(expanded.rebuffer_end(expanded.len() - subheader_len));
}
return expanded;
}
packet
}
pub(super) fn create_replica_packet(&mut self, original_body: &[u8], is_maintenance: bool) -> DynamicByteBuffer {
let subheader_len = self.subheader_length(is_maintenance);
let body_length = original_body.len();
let total_length = body_length + Tailer::<T>::len();
let packet = self.settings.pool().allocate(Some(total_length));
packet.slice_end_mut(body_length).copy_from_slice(original_body);
let pn = self.next_packet_number();
Tailer::decoy(packet.rebuffer_start(body_length), &self.identity.get(), pn);
if subheader_len > 0 {
let expanded = packet.expand_start(subheader_len);
if let Some(ref mut config) = self.features.subheader_config {
config.fill(expanded.rebuffer_end(expanded.len() - subheader_len));
}
return expanded;
}
packet
}
pub(super) fn try_spend_budget(&mut self, bytes: usize) -> bool {
if self.byte_budget >= bytes as f64 {
self.byte_budget -= bytes as f64;
true
} else {
false
}
}
pub(super) fn schedule_next(&mut self, delay: u64, length: usize) {
self.next_decoy_time = unix_timestamp_ms() + delay as u128;
self.pending_length = length;
}
pub(super) fn schedule_next_maintenance(&mut self) {
let delay = maintenance_delay_for(&self.features.maintenance_mode, &self.settings);
let length = maintenance_length_for(&self.features.maintenance_mode, &self.settings);
self.next_maintenance_time = unix_timestamp_ms() + delay as u128;
self.pending_maintenance_length = length;
}
fn subheader_length(&self, is_maintenance: bool) -> usize {
let should_apply = match self.features.subheader_mode {
SubheaderMode::None => false,
SubheaderMode::Maintenance => is_maintenance,
SubheaderMode::All => true,
};
if should_apply {
self.features.subheader_config.as_ref().map_or(0, super::super::config::FakeHeaderConfig::len)
} else {
0
}
}
pub(super) fn should_replicate(&self, is_maintenance: bool) -> bool {
match self.features.replication_mode {
ReplicationMode::None => false,
ReplicationMode::Maintenance => is_maintenance,
ReplicationMode::All => true,
}
}
}
fn maintenance_delay_for<AE: AsyncExecutor>(mode: &MaintenanceMode, settings: &Settings<AE>) -> u64 {
match *mode {
MaintenanceMode::Timed {
delay_ms,
}
| MaintenanceMode::Both {
delay_ms,
..
} => delay_ms,
_ => {
let min = settings.get(&DECOY_MAINTENANCE_DELAY_MIN);
let max = settings.get(&DECOY_MAINTENANCE_DELAY_MAX);
random_uniform(min as f64, max as f64) as u64
}
}
}
fn maintenance_length_for<AE: AsyncExecutor>(mode: &MaintenanceMode, settings: &Settings<AE>) -> usize {
match *mode {
MaintenanceMode::Sized {
length,
}
| MaintenanceMode::Both {
length,
..
} => length,
_ => {
let min = settings.get(&DECOY_MAINTENANCE_LENGTH_MIN) as usize;
let max = settings.get(&DECOY_MAINTENANCE_LENGTH_MAX) as usize;
random_uniform(min as f64, max as f64) as usize
}
}
}
pub(super) async fn maintenance_timer_task<T, AE>(manager: Weak<dyn DecoyFlowSender>, state: Arc<RwLock<DecoyState<T, AE>>>)
where
T: IdentityType + Clone + 'static,
AE: AsyncExecutor + 'static,
{
{
let guard = state.read().await;
if guard.features.maintenance_mode == MaintenanceMode::None {
return;
}
}
loop {
let delay = {
let guard = state.read().await;
let remaining = guard.next_maintenance_time.saturating_sub(unix_timestamp_ms());
Duration::from_millis(remaining as u64)
};
sleep(delay).await;
let Some(manager_arc) = manager.upgrade() else {
warn!("Maintenance timer: manager dropped, stopping");
break;
};
let (packet, body_length, should_rep, fallthrough, settings) = {
let mut guard = state.write().await;
let length = guard.pending_maintenance_length;
if !guard.try_spend_budget(length) {
guard.schedule_next_maintenance();
continue;
}
let packet = guard.create_decoy_packet(length, true);
let should_rep = guard.should_replicate(true);
let fallthrough = guard.should_fallthrough();
let settings = Arc::clone(&guard.settings);
(packet, length, should_rep, fallthrough, settings)
};
let body_buf = should_rep.then(|| settings.pool().allocate_precise_from_slice_with_capacity(packet.slice_end(body_length), 0, 0));
debug!("Maintenance: generated packet (len={body_length})");
if let Err(err) = manager_arc.send_decoy_packet(packet, fallthrough, true).await {
warn!("Maintenance: failed to send: {err:?}");
} else if let Some(body) = body_buf {
try_replicate(&state, &manager, true, body).await;
}
{
let mut guard = state.write().await;
guard.schedule_next_maintenance();
}
}
}
pub(super) async fn try_replicate<T, AE>(state: &Arc<RwLock<DecoyState<T, AE>>>, manager: &Weak<dyn DecoyFlowSender>, is_maintenance: bool, body: DynamicByteBuffer)
where
T: IdentityType + Clone + 'static,
AE: AsyncExecutor + 'static,
{
let (probability, delay_min, delay_max, reduce, executor) = {
let guard = state.read().await;
if !guard.should_replicate(is_maintenance) {
return;
}
(guard.features.replication_probability, guard.settings.get(&DECOY_REPLICATION_DELAY_MIN), guard.settings.get(&DECOY_REPLICATION_DELAY_MAX), guard.settings.get(&DECOY_REPLICATION_PROBABILITY_REDUCE), guard.settings.executor().clone())
};
let state_clone = Arc::clone(state);
let manager_clone = manager.clone();
executor.spawn(async move {
let mut current_probability = probability;
loop {
if get_rng().r#gen::<f64>() >= current_probability {
break;
}
let delay = random_uniform(delay_min as f64, delay_max as f64) as u64;
sleep(Duration::from_millis(delay)).await;
let Some(manager_arc) = manager_clone.upgrade() else {
break;
};
let (packet, fallthrough) = {
let mut guard = state_clone.write().await;
if !guard.try_spend_budget(body.slice().len()) {
break;
}
let replica = guard.create_replica_packet(body.slice(), is_maintenance);
(replica, guard.should_fallthrough())
};
if manager_arc.send_decoy_packet(packet, fallthrough, is_maintenance).await.is_err() {
break;
}
current_probability /= reduce;
}
});
}
#[inline]
pub(super) fn random_uniform(min: f64, max: f64) -> f64 {
get_rng().gen_range(min..=max)
}
#[inline]
pub(super) fn random_gauss(mean: f64, sigma: f64) -> f64 {
if sigma <= 0.0 {
return mean;
}
let normal = Normal::new(mean, sigma).unwrap_or_else(|_| Normal::new(mean, 1.0).unwrap());
normal.sample(&mut get_rng())
}
#[inline]
pub(super) fn exponential_variance(rate: f64) -> f64 {
if rate <= 0.0 {
return f64::MAX;
}
let exp = Exp::new(rate).unwrap_or_else(|_| Exp::new(1.0).unwrap());
exp.sample(&mut get_rng())
}