typhoon/flow/decoy/
sparse.rs1use std::sync::atomic::AtomicU32;
3use std::sync::{Arc, Weak};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use log::warn;
8
9use crate::bytes::{ByteBuffer, DynamicByteBuffer};
10use crate::cache::DerivedValue;
11use crate::flow::decoy::common::{DecoyCommunicationMode, DecoyFlowSender, DecoyProvider, DecoyState, maintenance_timer_task, random_gauss, random_uniform, try_replicate};
12use crate::settings::Settings;
13use crate::settings::consts::FG_OFFSET;
14use crate::settings::keys::*;
15use crate::tailer::{IdentityType, PacketFlags};
16use crate::utils::sync::{AsyncExecutor, RwLock, sleep};
17use crate::utils::unix_timestamp_ms;
18
19pub struct SparseDecoyProvider<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> {
21 manager: Weak<dyn DecoyFlowSender>,
22 state: Arc<RwLock<DecoyState<T, AE>>>,
23}
24
25impl<T: IdentityType + Clone, AE: AsyncExecutor> SparseDecoyProvider<T, AE> {
26 fn calculate_delay(state: &DecoyState<T, AE>) -> u64 {
27 let base_rate_rnd = state.settings.get(&DECOY_BASE_RATE_RND);
28 let sparse_base_rate = state.settings.get(&DECOY_SPARSE_BASE_RATE);
29 let rate_factor = state.settings.get(&DECOY_SPARSE_RATE_FACTOR);
30 let jitter = state.settings.get(&DECOY_SPARSE_JITTER);
31 let delay_factor = state.settings.get(&DECOY_SPARSE_DELAY_FACTOR);
32 let delay_min = state.settings.get(&DECOY_SPARSE_DELAY_MIN);
33 let delay_max = state.settings.get(&DECOY_SPARSE_DELAY_MAX);
34 let delay_default = state.settings.get(&DECOY_SPARSE_DELAY_DEFAULT);
35
36 let base_rate = sparse_base_rate * random_uniform(1.0 - base_rate_rnd, 1.0 + base_rate_rnd);
37 let quietness = state.quietness_index();
38 let rate = base_rate * quietness * (-rate_factor * state.packet_rate / state.reference_rate).exp();
39
40 let delay = if rate > 0.0 {
41 random_uniform(1.0 - jitter, 1.0 + jitter) * (1.0 + delay_factor * (state.packet_rate / state.reference_rate)) / rate * 1000.0
42 } else {
43 delay_default as f64
44 };
45
46 (delay as u64).clamp(delay_min, delay_max)
47 }
48
49 fn calculate_length(state: &DecoyState<T, AE>) -> usize {
50 let length_factor = state.settings.get(&DECOY_SPARSE_LENGTH_FACTOR);
51 let length_sigma = state.settings.get(&DECOY_SPARSE_LENGTH_SIGMA);
52 let length_min = state.settings.get(&DECOY_SPARSE_LENGTH_MIN) as usize;
53 let length_max = state.settings.get(&DECOY_SPARSE_LENGTH_MAX) as usize;
54
55 let mean = length_factor * (-state.packet_rate / state.reference_rate).exp();
56 let decoy_length = random_gauss(mean, length_sigma);
57
58 (decoy_length as usize).clamp(length_min, length_max)
59 }
60
61 async fn timer_task(manager: Weak<dyn DecoyFlowSender>, state: Arc<RwLock<DecoyState<T, AE>>>) {
62 loop {
63 let delay = {
64 let state_guard = state.read().await;
65 let remaining = state_guard.next_decoy_time.saturating_sub(unix_timestamp_ms());
66 Duration::from_millis(remaining as u64)
67 };
68
69 sleep(delay).await;
70
71 let Some(manager_arc) = manager.upgrade() else {
72 warn!("SparseDecoyProvider: manager dropped, stopping timer");
73 break;
74 };
75
76 {
77 let mut state_guard = state.write().await;
78 let decoy_length = state_guard.pending_length;
79
80 if state_guard.try_spend_budget(decoy_length) {
81 let decoy_packet = state_guard.create_decoy_packet(decoy_length, false);
82 let should_rep = state_guard.should_replicate(false);
83 let fallthrough = state_guard.should_fallthrough();
84 let settings = Arc::clone(&state_guard.settings);
85 drop(state_guard);
86
87 let body_buf = should_rep.then(|| settings.pool().allocate_precise_from_slice_with_capacity(decoy_packet.slice_end(decoy_length), 0, 0));
88 if let Err(err) = manager_arc.send_decoy_packet(decoy_packet, fallthrough, false).await {
89 warn!("SparseDecoyProvider: failed to send decoy packet: {err:?}");
90 } else if let Some(body) = body_buf {
91 try_replicate(&state, &manager, false, body).await;
92 }
93 }
94 }
95
96 {
97 let mut state_guard = state.write().await;
98 let delay = Self::calculate_delay(&state_guard);
99 let length = Self::calculate_length(&state_guard);
100 state_guard.schedule_next(delay, length);
101 }
102 }
103 }
104}
105
106#[async_trait]
107impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> DecoyProvider for SparseDecoyProvider<T, AE> {
108 #[inline]
109 fn name(&self) -> &'static str {
110 "SparseDecoyProvider"
111 }
112
113 async fn start(&self) {
114 let executor = {
115 let lock = self.state.read().await;
116 lock.settings.executor().clone()
117 };
118
119 let manager = self.manager.clone();
120 let state = self.state.clone();
121 executor.spawn(Self::timer_task(manager.clone(), state.clone()));
122 executor.spawn(maintenance_timer_task(manager, state));
123 }
124
125 async fn feed_input(&self, packet: DynamicByteBuffer, _tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
126 let mut state = self.state.write().await;
127 state.update(packet.len(), false);
128 Some(packet)
129 }
130
131 async fn feed_output(&self, body: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
132 let flags = PacketFlags::from_bits_truncate(*tailer_buf.get(FG_OFFSET));
133 if !flags.is_discardable() {
134 let mut state = self.state.write().await;
135 state.update(body.len() + tailer_buf.len(), true);
136 }
137 Some(body)
138 }
139}
140
141impl<T: IdentityType + Clone, AE: AsyncExecutor> DecoyCommunicationMode<T, AE> for SparseDecoyProvider<T, AE> {
142 fn new(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self {
143 let state = DecoyState::new(settings.clone(), identity, counter, fallthrough_probability);
144 let delay = Self::calculate_delay(&state);
145 let length = Self::calculate_length(&state);
146 let mut state = state;
147 state.schedule_next(delay, length);
148
149 Self {
150 manager,
151 state: Arc::new(RwLock::new(state)),
152 }
153 }
154}