Skip to main content

typhoon/flow/decoy/
noisy.rs

1/// Noisy mode: sends smaller decoy packets in bursts often, resembling web or socket traffic.
2use std::sync::atomic::AtomicU32;
3use std::sync::{Arc, Weak};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use log::warn;
8
9use crate::bytes::{ByteBuffer, DynamicByteBuffer};
10use crate::cache::DerivedValue;
11use crate::flow::decoy::common::{DecoyCommunicationMode, DecoyFlowSender, DecoyProvider, DecoyState, exponential_variance, maintenance_timer_task, random_gauss, random_uniform, try_replicate};
12use crate::settings::Settings;
13use crate::settings::consts::FG_OFFSET;
14use crate::settings::keys::*;
15use crate::tailer::{IdentityType, PacketFlags};
16use crate::utils::sync::{AsyncExecutor, RwLock, sleep};
17use crate::utils::unix_timestamp_ms;
18
19/// Noisy mode implements sending smaller decoy packets in bursts often.
20pub struct NoisyDecoyProvider<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> {
21    manager: Weak<dyn DecoyFlowSender>,
22    state: Arc<RwLock<DecoyState<T, AE>>>,
23}
24
25impl<T: IdentityType + Clone, AE: AsyncExecutor> NoisyDecoyProvider<T, AE> {
26    fn calculate_delay(state: &DecoyState<T, AE>) -> u64 {
27        let base_rate_rnd = state.settings.get(&DECOY_BASE_RATE_RND);
28        let noisy_base_rate = state.settings.get(&DECOY_NOISY_BASE_RATE);
29        let delay_min = state.settings.get(&DECOY_NOISY_DELAY_MIN);
30        let delay_max = state.settings.get(&DECOY_NOISY_DELAY_MAX);
31        let delay_default = state.settings.get(&DECOY_NOISY_DELAY_DEFAULT);
32
33        let base_rate = noisy_base_rate * random_uniform(1.0 - base_rate_rnd, 1.0 + base_rate_rnd);
34        let quietness = state.quietness_index();
35        let rate = base_rate * quietness * (-state.packet_rate / state.reference_rate).exp();
36
37        let delay = if rate > 0.0 {
38            exponential_variance(rate * (1.0 + state.packet_rate / state.reference_rate)) * 1000.0
39        } else {
40            delay_default as f64
41        };
42
43        (delay as u64).clamp(delay_min, delay_max)
44    }
45
46    pub(crate) fn calculate_length(state: &DecoyState<T, AE>) -> usize {
47        let length_min = state.settings.get(&DECOY_NOISY_DECOY_LENGTH_MIN) as usize;
48        let length_max = state.settings.get(&DECOY_NOISY_LENGTH_MAX) as usize;
49        let length_jitter = state.settings.get(&DECOY_NOISY_DECOY_LENGTH_JITTER);
50
51        let quietness = state.quietness_index();
52        let mean_length = (length_min as f64) + quietness * (-state.packet_rate / state.reference_rate).exp() * ((length_max - length_min) as f64);
53        let decoy_length = random_gauss(mean_length, length_jitter * mean_length);
54
55        (decoy_length as usize).clamp(length_min, length_max)
56    }
57
58    async fn timer_task(manager: Weak<dyn DecoyFlowSender>, state: Arc<RwLock<DecoyState<T, AE>>>) {
59        loop {
60            let delay = {
61                let state_guard = state.read().await;
62                let remaining = state_guard.next_decoy_time.saturating_sub(unix_timestamp_ms());
63                Duration::from_millis(remaining as u64)
64            };
65
66            sleep(delay).await;
67
68            let Some(manager_arc) = manager.upgrade() else {
69                warn!("NoisyDecoyProvider: manager dropped, stopping timer");
70                break;
71            };
72
73            {
74                let mut state_guard = state.write().await;
75                let decoy_length = state_guard.pending_length;
76
77                if state_guard.try_spend_budget(decoy_length) {
78                    let decoy_packet = state_guard.create_decoy_packet(decoy_length, false);
79                    let should_rep = state_guard.should_replicate(false);
80                    let fallthrough = state_guard.should_fallthrough();
81                    let settings = Arc::clone(&state_guard.settings);
82                    drop(state_guard);
83
84                    let body_buf = should_rep.then(|| settings.pool().allocate_precise_from_slice_with_capacity(decoy_packet.slice_end(decoy_length), 0, 0));
85                    if let Err(err) = manager_arc.send_decoy_packet(decoy_packet, fallthrough, false).await {
86                        warn!("NoisyDecoyProvider: failed to send decoy packet: {err:?}");
87                    } else if let Some(body) = body_buf {
88                        try_replicate(&state, &manager, false, body).await;
89                    }
90                }
91            }
92
93            {
94                let mut state_guard = state.write().await;
95                let delay = Self::calculate_delay(&state_guard);
96                let length = Self::calculate_length(&state_guard);
97                state_guard.schedule_next(delay, length);
98            }
99        }
100    }
101}
102
103#[async_trait]
104impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> DecoyProvider for NoisyDecoyProvider<T, AE> {
105    #[inline]
106    fn name(&self) -> &'static str {
107        "NoisyDecoyProvider"
108    }
109
110    async fn start(&self) {
111        let executor = {
112            let lock = self.state.read().await;
113            lock.settings.executor().clone()
114        };
115
116        let manager = self.manager.clone();
117        let state = self.state.clone();
118        executor.spawn(Self::timer_task(manager.clone(), state.clone()));
119        executor.spawn(maintenance_timer_task(manager, state));
120    }
121
122    async fn feed_input(&self, packet: DynamicByteBuffer, _tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
123        let mut state = self.state.write().await;
124        state.update(packet.len(), false);
125        Some(packet)
126    }
127
128    async fn feed_output(&self, body: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
129        let flags = PacketFlags::from_bits_truncate(*tailer_buf.get(FG_OFFSET));
130        if !flags.is_discardable() {
131            let mut state = self.state.write().await;
132            state.update(body.len() + tailer_buf.len(), true);
133        }
134        Some(body)
135    }
136}
137
138impl<T: IdentityType + Clone, AE: AsyncExecutor> DecoyCommunicationMode<T, AE> for NoisyDecoyProvider<T, AE> {
139    fn new(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self {
140        let state = DecoyState::new(settings.clone(), identity, counter, fallthrough_probability);
141        let delay = Self::calculate_delay(&state);
142        let length = Self::calculate_length(&state);
143        let mut state = state;
144        state.schedule_next(delay, length);
145
146        Self {
147            manager,
148            state: Arc::new(RwLock::new(state)),
149        }
150    }
151}