Skip to main content

typhoon/flow/decoy/
smooth.rs

//! Smooth mode: sends a few average decoy packets during quiet periods, filling the gaps between data packets.
2use std::sync::atomic::AtomicU32;
3use std::sync::{Arc, Weak};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use log::warn;
8
9use crate::bytes::{ByteBuffer, DynamicByteBuffer};
10use crate::cache::DerivedValue;
11use crate::flow::decoy::common::{DecoyCommunicationMode, DecoyFlowSender, DecoyProvider, DecoyState, maintenance_timer_task, random_uniform, try_replicate};
12use crate::settings::Settings;
13use crate::settings::consts::FG_OFFSET;
14use crate::settings::keys::*;
15use crate::tailer::{IdentityType, PacketFlags};
16use crate::utils::sync::{AsyncExecutor, RwLock, sleep};
17use crate::utils::unix_timestamp_ms;
18
19/// Smooth mode implements sending few average decoy packets during quiet periods.
20pub struct SmoothDecoyProvider<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> {
21    manager: Weak<dyn DecoyFlowSender>,
22    state: Arc<RwLock<DecoyState<T, AE>>>,
23}
24
25impl<T: IdentityType + Clone, AE: AsyncExecutor> SmoothDecoyProvider<T, AE> {
26    fn calculate_delay(state: &DecoyState<T, AE>) -> u64 {
27        let base_rate_rnd = state.settings.get(&DECOY_BASE_RATE_RND);
28        let smooth_base_rate = state.settings.get(&DECOY_SMOOTH_BASE_RATE);
29        let quietness_factor = state.settings.get(&DECOY_SMOOTH_QUIETNESS_FACTOR);
30        let rate_factor = state.settings.get(&DECOY_SMOOTH_RATE_FACTOR);
31        let jitter = state.settings.get(&DECOY_SMOOTH_JITTER);
32        let delay_factor = state.settings.get(&DECOY_SMOOTH_DELAY_FACTOR);
33        let delay_min = state.settings.get(&DECOY_SMOOTH_DELAY_MIN);
34        let delay_max = state.settings.get(&DECOY_SMOOTH_DELAY_MAX);
35        let delay_default = state.settings.get(&DECOY_SMOOTH_DELAY_DEFAULT);
36
37        let base_rate = smooth_base_rate * random_uniform(1.0 - base_rate_rnd, 1.0 + base_rate_rnd);
38        let quietness = state.quietness_index();
39        let rate = base_rate * quietness.powf(quietness_factor) * (-rate_factor * state.packet_rate / state.reference_rate).exp();
40
41        let delay = if rate > 0.0 {
42            random_uniform(1.0 - jitter, 1.0 + jitter) * (1.0 + delay_factor * (state.packet_rate / state.reference_rate)) / rate * 1000.0
43        } else {
44            delay_default as f64
45        };
46
47        (delay as u64).clamp(delay_min, delay_max)
48    }
49
50    pub(crate) fn calculate_length(state: &DecoyState<T, AE>) -> usize {
51        let length_min = state.settings.get(&DECOY_SMOOTH_LENGTH_MIN) as usize;
52        let length_max = state.settings.get(&DECOY_SMOOTH_LENGTH_MAX) as usize;
53
54        let quietness = state.quietness_index();
55        let mean_length = (length_min as f64) + quietness * (-state.packet_rate / state.reference_rate).exp() * ((length_max - length_min) as f64);
56        let decoy_length = random_uniform(length_min as f64, mean_length);
57
58        (decoy_length as usize).clamp(length_min, length_max)
59    }
60
61    async fn timer_task(manager: Weak<dyn DecoyFlowSender>, state: Arc<RwLock<DecoyState<T, AE>>>) {
62        loop {
63            let delay = {
64                let state_guard = state.read().await;
65                let remaining = state_guard.next_decoy_time.saturating_sub(unix_timestamp_ms());
66                Duration::from_millis(remaining as u64)
67            };
68
69            sleep(delay).await;
70
71            let Some(manager_arc) = manager.upgrade() else {
72                warn!("SmoothDecoyProvider: manager dropped, stopping timer");
73                break;
74            };
75
76            {
77                let mut state_guard = state.write().await;
78                let decoy_length = state_guard.pending_length;
79
80                if state_guard.try_spend_budget(decoy_length) {
81                    let decoy_packet = state_guard.create_decoy_packet(decoy_length, false);
82                    let should_rep = state_guard.should_replicate(false);
83                    let fallthrough = state_guard.should_fallthrough();
84                    let settings = Arc::clone(&state_guard.settings);
85                    drop(state_guard);
86
87                    let body_buf = should_rep.then(|| settings.pool().allocate_precise_from_slice_with_capacity(decoy_packet.slice_end(decoy_length), 0, 0));
88                    if let Err(err) = manager_arc.send_decoy_packet(decoy_packet, fallthrough, false).await {
89                        warn!("SmoothDecoyProvider: failed to send decoy packet: {err:?}");
90                    } else if let Some(body) = body_buf {
91                        try_replicate(&state, &manager, false, body).await;
92                    }
93                }
94            }
95
96            {
97                let mut state_guard = state.write().await;
98                let delay = Self::calculate_delay(&state_guard);
99                let length = Self::calculate_length(&state_guard);
100                state_guard.schedule_next(delay, length);
101            }
102        }
103    }
104}
105
106#[async_trait]
107impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> DecoyProvider for SmoothDecoyProvider<T, AE> {
108    #[inline]
109    fn name(&self) -> &'static str {
110        "SmoothDecoyProvider"
111    }
112
113    async fn start(&self) {
114        let executor = {
115            let lock = self.state.read().await;
116            lock.settings.executor().clone()
117        };
118
119        let manager = self.manager.clone();
120        let state = self.state.clone();
121        executor.spawn(Self::timer_task(manager.clone(), state.clone()));
122        executor.spawn(maintenance_timer_task(manager, state));
123    }
124
125    async fn feed_input(&self, packet: DynamicByteBuffer, _tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
126        let mut state = self.state.write().await;
127        state.update(packet.len(), false);
128        Some(packet)
129    }
130
131    async fn feed_output(&self, body: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
132        let flags = PacketFlags::from_bits_truncate(*tailer_buf.get(FG_OFFSET));
133        if !flags.is_discardable() {
134            let mut state = self.state.write().await;
135            state.update(body.len() + tailer_buf.len(), true);
136        }
137        Some(body)
138    }
139}
140
141impl<T: IdentityType + Clone, AE: AsyncExecutor> DecoyCommunicationMode<T, AE> for SmoothDecoyProvider<T, AE> {
142    fn new(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self {
143        let state = DecoyState::new(settings.clone(), identity, counter, fallthrough_probability);
144        let delay = Self::calculate_delay(&state);
145        let length = Self::calculate_length(&state);
146        let mut state = state;
147        state.schedule_next(delay, length);
148
149        Self {
150            manager,
151            state: Arc::new(RwLock::new(state)),
152        }
153    }
154}