1#[cfg(test)]
2#[path = "../../../tests/flow/decoy.rs"]
3mod tests;
4
5use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicU32, Ordering};
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11
12use async_trait::async_trait;
13use log::{debug, info, warn};
14use rand::Rng;
15use rand::seq::SliceRandom;
16use rand_distr::{Distribution, Exp, Normal};
17
18use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
19use crate::cache::DerivedValue;
20use crate::flow::config::{FakeHeaderConfig, FieldType, FieldTypeHolder};
21use crate::flow::error::FlowControllerError;
22use crate::settings::Settings;
23use crate::settings::keys::*;
24use crate::tailer::{IdentityType, Tailer};
25use crate::utils::random::get_rng;
26use crate::utils::sync::{AsyncExecutor, RwLock, sleep};
27use crate::utils::unix_timestamp_ms;
28use crate::weighted_random;
29
/// Strategy used for the periodic "maintenance" decoy packets of a flow.
///
/// Values carried by a variant are reused unchanged every time a maintenance packet is scheduled
/// (see `maintenance_delay_for` / `maintenance_length_for`); any parameter a variant does not carry
/// is drawn from the `DECOY_MAINTENANCE_*` settings range on each scheduling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum MaintenanceMode {
    /// Maintenance packets are disabled: `maintenance_timer_task` returns immediately.
    None,
    /// Both the delay and the length are drawn at random for every maintenance packet.
    Random,
    /// Fixed delay between maintenance packets, random length.
    Timed {
        /// Delay applied before each maintenance packet, in milliseconds.
        delay_ms: u64,
    },
    /// Fixed maintenance packet length, random delay.
    Sized {
        /// Body length passed to `create_decoy_packet` for each maintenance packet.
        length: usize,
    },
    /// Fixed delay and fixed length.
    Both {
        /// Delay applied before each maintenance packet, in milliseconds.
        delay_ms: u64,
        /// Body length passed to `create_decoy_packet` for each maintenance packet.
        length: usize,
    },
}
48
/// Selects which decoy packets are eligible for replication (see `DecoyState::should_replicate`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum ReplicationMode {
    /// No packet is ever replicated.
    None,
    /// Only packets flagged as maintenance packets are replicated.
    Maintenance,
    /// Every packet is eligible for replication, maintenance or not.
    All,
}
56
/// Selects which decoy packets get a fake subheader prepended (see `DecoyState::subheader_length`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum SubheaderMode {
    /// No subheader is ever prepended.
    None,
    /// Only packets flagged as maintenance packets get a subheader.
    Maintenance,
    /// Every decoy packet gets a subheader.
    All,
}
64
/// Per-flow selection of decoy features, drawn once by [`DecoyFeatureConfig::random`].
pub(super) struct DecoyFeatureConfig {
    /// How maintenance packets are timed and sized.
    pub(super) maintenance_mode: MaintenanceMode,
    /// Which packets may be replicated.
    pub(super) replication_mode: ReplicationMode,
    /// Initial probability used by the replication loop in `try_replicate`.
    pub(super) replication_probability: f64,
    /// Which packets get a fake subheader.
    pub(super) subheader_mode: SubheaderMode,
    /// Layout of the fake subheader; `None` exactly when `subheader_mode` is `SubheaderMode::None`
    /// (as constructed by `random`).
    pub(super) subheader_config: Option<FakeHeaderConfig>,
}
76
77impl DecoyFeatureConfig {
78 pub(super) fn random<AE: AsyncExecutor>(settings: &Settings<AE>) -> Self {
79 let mut rng = get_rng();
80
81 let delay_min = settings.get(&DECOY_MAINTENANCE_DELAY_MIN);
82 let delay_max = settings.get(&DECOY_MAINTENANCE_DELAY_MAX);
83 let length_min = settings.get(&DECOY_MAINTENANCE_LENGTH_MIN) as usize;
84 let length_max = settings.get(&DECOY_MAINTENANCE_LENGTH_MAX) as usize;
85 let fixed_delay = rng.gen_range(delay_min..=delay_max);
86 let fixed_length = rng.gen_range(length_min..=length_max);
87
88 let maintenance_mode = weighted_random! {
90 settings.get(&DECOY_MAINTENANCE_WEIGHT_NONE) => MaintenanceMode::None,
91 settings.get(&DECOY_MAINTENANCE_WEIGHT_RANDOM) => MaintenanceMode::Random,
92 settings.get(&DECOY_MAINTENANCE_WEIGHT_TIMED) => MaintenanceMode::Timed {
93 delay_ms: fixed_delay,
94 },
95 settings.get(&DECOY_MAINTENANCE_WEIGHT_SIZED) => MaintenanceMode::Sized {
96 length: fixed_length,
97 },
98 settings.get(&DECOY_MAINTENANCE_WEIGHT_BOTH) => MaintenanceMode::Both {
99 delay_ms: fixed_delay,
100 length: fixed_length,
101 },
102 };
103
104 let replication_mode = weighted_random! {
106 settings.get(&DECOY_REPLICATION_WEIGHT_NONE) => ReplicationMode::None,
107 settings.get(&DECOY_REPLICATION_WEIGHT_MAINTENANCE) => ReplicationMode::Maintenance,
108 settings.get(&DECOY_REPLICATION_WEIGHT_ALL) => ReplicationMode::All,
109 };
110
111 let prob_min = settings.get(&DECOY_REPLICATION_PROBABILITY_MIN);
112 let prob_max = settings.get(&DECOY_REPLICATION_PROBABILITY_MAX);
113 let replication_probability = rng.gen_range(prob_min..=prob_max);
114
115 let subheader_mode = weighted_random! {
117 settings.get(&DECOY_SUBHEADER_WEIGHT_NONE) => SubheaderMode::None,
118 settings.get(&DECOY_SUBHEADER_WEIGHT_MAINTENANCE) => SubheaderMode::Maintenance,
119 settings.get(&DECOY_SUBHEADER_WEIGHT_ALL) => SubheaderMode::All,
120 };
121
122 let subheader_config = if subheader_mode == SubheaderMode::None {
123 None
124 } else {
125 let min_len = settings.get(&DECOY_SUBHEADER_LENGTH_MIN) as usize;
126 let max_len = settings.get(&DECOY_SUBHEADER_LENGTH_MAX) as usize;
127 Some(generate_random_fake_header(settings, min_len, max_len))
128 };
129
130 info!("decoy feature config: maintenance={maintenance_mode:?}, replication={replication_mode:?}, replication_prob={replication_probability:.4}, subheader={subheader_mode:?}");
131
132 Self {
133 maintenance_mode,
134 replication_mode,
135 replication_probability,
136 subheader_mode,
137 subheader_config,
138 }
139 }
140}
141
142fn generate_random_fake_header<AE: AsyncExecutor>(settings: &Settings<AE>, min_len: usize, max_len: usize) -> FakeHeaderConfig {
144 let mut rng = get_rng();
145 let target_len = rng.gen_range(min_len..=max_len);
146 let mut fields = Vec::new();
147 let mut current_len = 0usize;
148
149 while current_len < target_len {
150 let remaining = target_len - current_len;
151 let size = if remaining >= 8 {
153 *[1usize, 2, 4, 8].choose(&mut rng).unwrap()
154 } else if remaining >= 4 {
155 *[1usize, 2, 4].choose(&mut rng).unwrap()
156 } else if remaining >= 2 {
157 *[1usize, 2].choose(&mut rng).unwrap()
158 } else {
159 1
160 };
161
162 let field = match size {
163 1 => FieldTypeHolder::U8(random_field_type(settings, &mut rng)),
164 2 => FieldTypeHolder::U16(random_field_type(settings, &mut rng)),
165 4 => FieldTypeHolder::U32(random_field_type(settings, &mut rng)),
166 8 => FieldTypeHolder::U64(random_field_type(settings, &mut rng)),
167 _ => unreachable!(),
168 };
169 fields.push(field);
170 current_len += size;
171 }
172
173 FakeHeaderConfig::new(fields)
174}
175
176fn random_field_type<AE: AsyncExecutor, L: Copy + From<u8>>(settings: &Settings<AE>, rng: &mut impl Rng) -> FieldType<L>
178where
179 rand::distributions::Standard: Distribution<L>,
180{
181 let volatile_prob_min = settings.get(&FAKE_HEADER_VOLATILE_CHANGE_PROB_MIN);
182 let volatile_prob_max = settings.get(&FAKE_HEADER_VOLATILE_CHANGE_PROB_MAX);
183 let switching_timeout_min = settings.get(&FAKE_HEADER_SWITCHING_TIMEOUT_MIN_MS);
184 let switching_timeout_max = settings.get(&FAKE_HEADER_SWITCHING_TIMEOUT_MAX_MS);
185 weighted_random! {
186 settings.get(&FAKE_HEADER_FIELD_WEIGHT_RANDOM) => FieldType::Random,
187 settings.get(&FAKE_HEADER_FIELD_WEIGHT_CONSTANT) => FieldType::Constant {
188 value: rng.r#gen::<L>(),
189 },
190 settings.get(&FAKE_HEADER_FIELD_WEIGHT_VOLATILE) => FieldType::Volatile {
191 value: rng.r#gen::<L>(),
192 change_probability: rng.gen_range(volatile_prob_min..=volatile_prob_max),
193 },
194 settings.get(&FAKE_HEADER_FIELD_WEIGHT_SWITCHING) => {
195 let switch_timeout = rng.gen_range(switching_timeout_min..=switching_timeout_max);
196 FieldType::Switching {
197 value: rng.r#gen::<L>(),
198 next_switch: unix_timestamp_ms() + switch_timeout as u128,
199 switch_timeout,
200 }
201 },
202 settings.get(&FAKE_HEADER_FIELD_WEIGHT_INCREMENTAL) => FieldType::Incremental {
203 value: rng.r#gen::<L>(),
204 }
205 }
206}
207
/// Sink through which generated decoy packets leave this module.
///
/// The decoy tasks hold it as `Weak<dyn DecoyFlowSender>` and stop once it can no longer be
/// upgraded.
pub trait DecoyFlowSender: Send + Sync {
    /// Sends one decoy `packet`, returning a boxed future that borrows `self`.
    ///
    /// * `fallthrough` - per-packet flag computed by `DecoyState::should_fallthrough`; its
    ///   meaning is defined by the implementor (NOTE(review): not visible in this file).
    /// * `is_maintenance` - `true` when the packet originates from the maintenance path.
    fn send_decoy_packet<'a>(&'a self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Pin<Box<dyn Future<Output = Result<(), FlowControllerError>> + Send + 'a>>;
}
216
/// Object-safe interface of a decoy traffic generator.
///
/// NOTE(review): the exact contract of the methods is defined by the implementors, which are not
/// part of this file; the descriptions below only restate the signatures.
#[async_trait]
pub trait DecoyProvider: Send + Sync {
    /// Static name of the provider.
    fn name(&self) -> &'static str;

    /// Starts the provider (presumably spawning its background work; confirm against
    /// implementors).
    async fn start(&self);

    /// Feeds an incoming `packet` together with its tailer buffer to the provider; may return a
    /// buffer, or `None`.
    async fn feed_input(&self, packet: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer>;

    /// Feeds an outgoing packet `body` together with its tailer buffer to the provider; may
    /// return a buffer, or `None`.
    async fn feed_output(&self, body: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer>;
}
237
238pub trait DecoyCommunicationMode<T: IdentityType + Clone, AE: AsyncExecutor>: DecoyProvider + Sized {
241 fn name() -> &'static str {
243 let full = std::any::type_name::<Self>();
244 let without_generics = full.split('<').next().unwrap_or(full);
245 without_generics.split("::").last().unwrap_or(without_generics)
246 }
247
248 fn new(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self;
253}
254
/// Mutable per-flow state shared by the decoy generators: traffic statistics, the byte budget
/// and the schedule of upcoming decoy and maintenance packets.
pub(crate) struct DecoyState<T: IdentityType + Clone, AE: AsyncExecutor> {
    /// Shared settings handle (also gives access to the buffer pool and the executor).
    pub(super) settings: Arc<Settings<AE>>,
    /// Smoothed inter-packet interval in milliseconds, updated in `update` with
    /// `DECOY_REFERENCE_ALPHA`.
    pub(super) reference_rate: f64,
    /// Smoothed inter-packet interval in milliseconds, updated in `update` with
    /// `DECOY_CURRENT_ALPHA`.
    pub(super) packet_rate: f64,
    /// Smoothed packet length, updated in `update` with `DECOY_CURRENT_ALPHA`.
    pub(super) byte_rate: f64,
    /// Bytes currently available for decoy traffic; refilled in `update`, consumed by
    /// `try_spend_budget`.
    pub(super) byte_budget: f64,
    /// Timestamp (ms) recorded by the previous `update` call, `None` before the first one.
    previous_packet_time: Option<u128>,
    /// Larger of the `DECOY_LENGTH_MAX` and `DECOY_LENGTH_MIN` settings.
    pub(super) packet_length_cap: usize,
    /// Shared counter providing the low 32 bits of generated packet numbers.
    counter: Arc<AtomicU32>,
    /// Identity value written into the tailer of every generated packet.
    identity: DerivedValue<T>,
    /// Timestamp (ms) at which the next regular decoy is due; set by `schedule_next`.
    pub(super) next_decoy_time: u128,
    /// Length of the next regular decoy; set by `schedule_next`.
    pub(super) pending_length: usize,
    /// Feature selection drawn once when the state is created.
    pub(super) features: DecoyFeatureConfig,
    /// Timestamp (ms) at which the next maintenance packet is due; `u128::MAX` when
    /// maintenance is disabled.
    pub(super) next_maintenance_time: u128,
    /// Body length of the next maintenance packet.
    pub(super) pending_maintenance_length: usize,
    /// Probability with which `should_fallthrough` returns `true`.
    fallthrough_probability: f64,
}
292
293impl<T: IdentityType + Clone, AE: AsyncExecutor> DecoyState<T, AE> {
294 pub(super) fn new(settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self {
298 let byte_rate_cap = settings.get(&DECOY_BYTE_RATE_CAP);
299 let byte_rate_factor = settings.get(&DECOY_BYTE_RATE_FACTOR);
300 let length_max = settings.get(&DECOY_LENGTH_MAX) as usize;
301 let length_min = settings.get(&DECOY_LENGTH_MIN) as usize;
302
303 let now = unix_timestamp_ms();
304 let features = DecoyFeatureConfig::random(&settings);
305
306 let (maint_time, maint_len) = if features.maintenance_mode == MaintenanceMode::None {
308 (u128::MAX, 0)
309 } else {
310 let delay = maintenance_delay_for(&features.maintenance_mode, &settings);
311 let length = maintenance_length_for(&features.maintenance_mode, &settings);
312 (now + delay as u128, length)
313 };
314
315 let fallthrough_probability = fallthrough_probability.unwrap_or_else(|| {
316 let lo = settings.get(&DECOY_FALLTHROUGH_PACKETS_MIN);
317 let hi = settings.get(&DECOY_FALLTHROUGH_PACKETS_MAX);
318 if lo >= hi {
319 lo
320 } else {
321 get_rng().gen_range(lo..=hi)
322 }
323 });
324
325 Self {
326 settings: settings.clone(),
327 reference_rate: settings.get(&DECOY_REFERENCE_PACKET_RATE_DEFAULT),
328 packet_rate: settings.get(&DECOY_CURRENT_PACKET_RATE_DEFAULT),
329 byte_rate: settings.get(&DECOY_CURRENT_BYTE_RATE_DEFAULT),
330 byte_budget: byte_rate_cap * byte_rate_factor / 2.0,
331 previous_packet_time: None,
332 packet_length_cap: length_max.max(length_min),
333 counter,
334 identity,
335 next_decoy_time: now,
336 pending_length: length_min,
337 features,
338 next_maintenance_time: maint_time,
339 pending_maintenance_length: maint_len,
340 fallthrough_probability,
341 }
342 }
343
344 #[inline]
346 pub(super) fn should_fallthrough(&self) -> bool {
347 if self.fallthrough_probability <= 0.0 {
348 false
349 } else if self.fallthrough_probability >= 1.0 {
350 true
351 } else {
352 get_rng().r#gen::<f64>() < self.fallthrough_probability
353 }
354 }
355
356 pub(super) fn update(&mut self, packet_length: usize, outgoing_real: bool) {
358 let current_time = unix_timestamp_ms();
359
360 if let Some(prev_time) = self.previous_packet_time {
361 let time_delta = (current_time - prev_time) as f64;
362
363 let reference_alpha = self.settings.get(&DECOY_REFERENCE_ALPHA);
364 let current_alpha = self.settings.get(&DECOY_CURRENT_ALPHA);
365 let byte_rate_cap = self.settings.get(&DECOY_BYTE_RATE_CAP);
366 let byte_rate_factor = self.settings.get(&DECOY_BYTE_RATE_FACTOR);
367
368 self.reference_rate = (1.0 - reference_alpha) * self.reference_rate + reference_alpha * time_delta;
369 self.packet_rate = (1.0 - current_alpha) * self.packet_rate + current_alpha * time_delta;
370 self.byte_rate = (1.0 - current_alpha) * self.byte_rate + current_alpha * (packet_length as f64);
371 let refill = time_delta * byte_rate_cap / 1000.0;
372 let deduct = if outgoing_real {
373 packet_length as f64
374 } else {
375 0.0
376 };
377 self.byte_budget = (self.byte_budget + refill - deduct).clamp(0.0, byte_rate_cap * byte_rate_factor);
378 }
379
380 self.previous_packet_time = Some(current_time);
381 }
382
383 pub(super) fn quietness_index(&self) -> f64 {
385 ((self.reference_rate - self.packet_rate) / self.reference_rate).clamp(0.0, 1.0)
386 }
387
388 fn next_packet_number(&self) -> u64 {
393 let counter = self.counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
394 let timestamp = (unix_timestamp_ms() / 1000) as u32;
395 ((timestamp as u64) << 32) | counter as u64
396 }
397
398 pub(super) fn create_decoy_packet(&mut self, body_length: usize, is_maintenance: bool) -> DynamicByteBuffer {
401 let subheader_len = self.subheader_length(is_maintenance);
402 let total_length = body_length + Tailer::<T>::len();
403 let packet = self.settings.pool().allocate(Some(total_length));
404
405 get_rng().fill(packet.slice_end_mut(body_length));
406
407 let pn = self.next_packet_number();
408 Tailer::decoy(packet.rebuffer_start(body_length), &self.identity.get(), pn);
409
410 if subheader_len > 0 {
411 let expanded = packet.expand_start(subheader_len);
412 if let Some(ref mut config) = self.features.subheader_config {
413 config.fill(expanded.rebuffer_end(expanded.len() - subheader_len));
414 }
415 return expanded;
416 }
417
418 packet
419 }
420
421 pub(super) fn create_replica_packet(&mut self, original_body: &[u8], is_maintenance: bool) -> DynamicByteBuffer {
423 let subheader_len = self.subheader_length(is_maintenance);
424 let body_length = original_body.len();
425 let total_length = body_length + Tailer::<T>::len();
426 let packet = self.settings.pool().allocate(Some(total_length));
427
428 packet.slice_end_mut(body_length).copy_from_slice(original_body);
429
430 let pn = self.next_packet_number();
431 Tailer::decoy(packet.rebuffer_start(body_length), &self.identity.get(), pn);
432
433 if subheader_len > 0 {
434 let expanded = packet.expand_start(subheader_len);
435 if let Some(ref mut config) = self.features.subheader_config {
436 config.fill(expanded.rebuffer_end(expanded.len() - subheader_len));
437 }
438 return expanded;
439 }
440
441 packet
442 }
443
444 pub(super) fn try_spend_budget(&mut self, bytes: usize) -> bool {
447 if self.byte_budget >= bytes as f64 {
448 self.byte_budget -= bytes as f64;
449 true
450 } else {
451 false
452 }
453 }
454
455 pub(super) fn schedule_next(&mut self, delay: u64, length: usize) {
457 self.next_decoy_time = unix_timestamp_ms() + delay as u128;
458 self.pending_length = length;
459 }
460
461 pub(super) fn schedule_next_maintenance(&mut self) {
463 let delay = maintenance_delay_for(&self.features.maintenance_mode, &self.settings);
464 let length = maintenance_length_for(&self.features.maintenance_mode, &self.settings);
465 self.next_maintenance_time = unix_timestamp_ms() + delay as u128;
466 self.pending_maintenance_length = length;
467 }
468
469 fn subheader_length(&self, is_maintenance: bool) -> usize {
471 let should_apply = match self.features.subheader_mode {
472 SubheaderMode::None => false,
473 SubheaderMode::Maintenance => is_maintenance,
474 SubheaderMode::All => true,
475 };
476 if should_apply {
477 self.features.subheader_config.as_ref().map_or(0, super::super::config::FakeHeaderConfig::len)
478 } else {
479 0
480 }
481 }
482
483 pub(super) fn should_replicate(&self, is_maintenance: bool) -> bool {
485 match self.features.replication_mode {
486 ReplicationMode::None => false,
487 ReplicationMode::Maintenance => is_maintenance,
488 ReplicationMode::All => true,
489 }
490 }
491}
492
493fn maintenance_delay_for<AE: AsyncExecutor>(mode: &MaintenanceMode, settings: &Settings<AE>) -> u64 {
497 match *mode {
498 MaintenanceMode::Timed {
499 delay_ms,
500 }
501 | MaintenanceMode::Both {
502 delay_ms,
503 ..
504 } => delay_ms,
505 _ => {
506 let min = settings.get(&DECOY_MAINTENANCE_DELAY_MIN);
507 let max = settings.get(&DECOY_MAINTENANCE_DELAY_MAX);
508 random_uniform(min as f64, max as f64) as u64
509 }
510 }
511}
512
513fn maintenance_length_for<AE: AsyncExecutor>(mode: &MaintenanceMode, settings: &Settings<AE>) -> usize {
515 match *mode {
516 MaintenanceMode::Sized {
517 length,
518 }
519 | MaintenanceMode::Both {
520 length,
521 ..
522 } => length,
523 _ => {
524 let min = settings.get(&DECOY_MAINTENANCE_LENGTH_MIN) as usize;
525 let max = settings.get(&DECOY_MAINTENANCE_LENGTH_MAX) as usize;
526 random_uniform(min as f64, max as f64) as usize
527 }
528 }
529}
530
/// Background loop that emits maintenance decoy packets on the schedule kept in `state`.
///
/// The task returns immediately when the maintenance mode is `MaintenanceMode::None`, and stops
/// once `manager` can no longer be upgraded. Each iteration sleeps until the scheduled time,
/// tries to pay for the packet from the byte budget, sends it through the manager, optionally
/// starts replication of its body and finally schedules the next maintenance packet.
pub(super) async fn maintenance_timer_task<T, AE>(manager: Weak<dyn DecoyFlowSender>, state: Arc<RwLock<DecoyState<T, AE>>>)
where
    T: IdentityType + Clone + 'static,
    AE: AsyncExecutor + 'static,
{
    // Nothing to do when maintenance is disabled; the read guard is released at the end of the
    // inner block.
    {
        let guard = state.read().await;
        if guard.features.maintenance_mode == MaintenanceMode::None {
            return;
        }
    }

    loop {
        // Compute the remaining wait under a short-lived read lock so the lock is not held
        // while sleeping. A due time in the past yields a zero delay.
        let delay = {
            let guard = state.read().await;
            let remaining = guard.next_maintenance_time.saturating_sub(unix_timestamp_ms());
            Duration::from_millis(remaining as u64)
        };

        sleep(delay).await;

        // The strong reference obtained here lives until the end of this iteration.
        let Some(manager_arc) = manager.upgrade() else {
            warn!("Maintenance timer: manager dropped, stopping");
            break;
        };

        // Build the packet and snapshot everything needed afterwards under one write lock.
        let (packet, body_length, should_rep, fallthrough, settings) = {
            let mut guard = state.write().await;
            let length = guard.pending_maintenance_length;

            // Not enough budget: skip this packet, reschedule and wait again.
            if !guard.try_spend_budget(length) {
                guard.schedule_next_maintenance();
                continue;
            }

            let packet = guard.create_decoy_packet(length, true);
            let should_rep = guard.should_replicate(true);
            let fallthrough = guard.should_fallthrough();
            let settings = Arc::clone(&guard.settings);
            (packet, length, should_rep, fallthrough, settings)
        };

        // Copy the body before the packet is moved into the sender, but only if replication may
        // follow.
        // NOTE(review): when a subheader was prepended by `create_decoy_packet`, confirm that
        // `slice_end(body_length)` still addresses the body rather than the subheader.
        let body_buf = should_rep.then(|| settings.pool().allocate_precise_from_slice_with_capacity(packet.slice_end(body_length), 0, 0));

        debug!("Maintenance: generated packet (len={body_length})");

        // Replication is only attempted after a successful send.
        if let Err(err) = manager_arc.send_decoy_packet(packet, fallthrough, true).await {
            warn!("Maintenance: failed to send: {err:?}");
        } else if let Some(body) = body_buf {
            try_replicate(&state, &manager, true, body).await;
        }

        // Schedule the next packet regardless of the send outcome.
        {
            let mut guard = state.write().await;
            guard.schedule_next_maintenance();
        }
    }
}
591
/// Starts probabilistic replication of a decoy packet `body`.
///
/// Returns without doing anything when the replication mode does not cover this kind of packet.
/// Otherwise a task is spawned on the settings' executor that repeatedly:
///
/// 1. stops with probability `1 - current_probability`,
/// 2. sleeps for a random delay between `DECOY_REPLICATION_DELAY_MIN` and
///    `DECOY_REPLICATION_DELAY_MAX` milliseconds,
/// 3. stops if the manager is gone or the byte budget cannot cover the body,
/// 4. sends a replica built from `body`, stopping on a send error,
/// 5. divides the probability by `DECOY_REPLICATION_PROBABILITY_REDUCE`.
///
/// This function itself only awaits the state read lock; the value returned by `spawn` is
/// discarded.
pub(super) async fn try_replicate<T, AE>(state: &Arc<RwLock<DecoyState<T, AE>>>, manager: &Weak<dyn DecoyFlowSender>, is_maintenance: bool, body: DynamicByteBuffer)
where
    T: IdentityType + Clone + 'static,
    AE: AsyncExecutor + 'static,
{
    // Snapshot all parameters under one read lock so the spawned task does not need the lock
    // for them.
    let (probability, delay_min, delay_max, reduce, executor) = {
        let guard = state.read().await;
        if !guard.should_replicate(is_maintenance) {
            return;
        }
        (guard.features.replication_probability, guard.settings.get(&DECOY_REPLICATION_DELAY_MIN), guard.settings.get(&DECOY_REPLICATION_DELAY_MAX), guard.settings.get(&DECOY_REPLICATION_PROBABILITY_REDUCE), guard.settings.executor().clone())
    };

    // Owned handles for the spawned task.
    let state_clone = Arc::clone(state);
    let manager_clone = manager.clone();

    executor.spawn(async move {
        let mut current_probability = probability;
        loop {
            // Continue only with probability `current_probability`.
            if get_rng().r#gen::<f64>() >= current_probability {
                break;
            }

            let delay = random_uniform(delay_min as f64, delay_max as f64) as u64;
            sleep(Duration::from_millis(delay)).await;

            let Some(manager_arc) = manager_clone.upgrade() else {
                break;
            };

            // Pay for and build the replica under the write lock; the lock is released before
            // sending.
            let (packet, fallthrough) = {
                let mut guard = state_clone.write().await;
                if !guard.try_spend_budget(body.slice().len()) {
                    break;
                }
                let replica = guard.create_replica_packet(body.slice(), is_maintenance);
                (replica, guard.should_fallthrough())
            };

            if manager_arc.send_decoy_packet(packet, fallthrough, is_maintenance).await.is_err() {
                break;
            }

            // NOTE(review): this assumes `reduce` is greater than 1 so the probability decays;
            // confirm the setting's valid range.
            current_probability /= reduce;
        }
    });
}
641
642#[inline]
646pub(super) fn random_uniform(min: f64, max: f64) -> f64 {
647 get_rng().gen_range(min..=max)
648}
649
650#[inline]
652pub(super) fn random_gauss(mean: f64, sigma: f64) -> f64 {
653 if sigma <= 0.0 {
654 return mean;
655 }
656 let normal = Normal::new(mean, sigma).unwrap_or_else(|_| Normal::new(mean, 1.0).unwrap());
657 normal.sample(&mut get_rng())
658}
659
660#[inline]
662pub(super) fn exponential_variance(rate: f64) -> f64 {
663 if rate <= 0.0 {
664 return f64::MAX;
665 }
666 let exp = Exp::new(rate).unwrap_or_else(|_| Exp::new(1.0).unwrap());
667 exp.sample(&mut get_rng())
668}