Skip to main content

typhoon/flow/decoy/
common.rs

1#[cfg(test)]
2#[path = "../../../tests/flow/decoy.rs"]
3mod tests;
4
5/// Shared state and utilities for decoy traffic communication modes.
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicU32, Ordering};
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11
12use async_trait::async_trait;
13use log::{debug, info, warn};
14use rand::Rng;
15use rand::seq::SliceRandom;
16use rand_distr::{Distribution, Exp, Normal};
17
18use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
19use crate::cache::DerivedValue;
20use crate::flow::config::{FakeHeaderConfig, FieldType, FieldTypeHolder};
21use crate::flow::error::FlowControllerError;
22use crate::settings::Settings;
23use crate::settings::keys::*;
24use crate::tailer::{IdentityType, Tailer};
25use crate::utils::random::get_rng;
26use crate::utils::sync::{AsyncExecutor, RwLock, sleep};
27use crate::utils::unix_timestamp_ms;
28use crate::weighted_random;
29
30// ── Mode enums ──────────────────────────────────────────────────────────────
31
/// Maintenance mode for decoy packets.
///
/// Determines how the delay before, and the body length of, each maintenance
/// packet are chosen. Fixed values are drawn once when the feature config is
/// created; everything else is re-sampled from the settings ranges each time
/// a maintenance packet is scheduled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum MaintenanceMode {
    /// Maintenance is disabled; the maintenance timer task returns immediately.
    None,
    /// Both delay and length are re-sampled on every scheduling.
    Random,
    /// Fixed delay, re-sampled length.
    Timed {
        /// Fixed delay between maintenance packets, in milliseconds.
        delay_ms: u64,
    },
    /// Fixed length, re-sampled delay.
    Sized {
        /// Fixed maintenance packet body length, in bytes.
        length: usize,
    },
    /// Fixed delay and fixed length.
    Both {
        /// Fixed delay between maintenance packets, in milliseconds.
        delay_ms: u64,
        /// Fixed maintenance packet body length, in bytes.
        length: usize,
    },
}
48
/// Replication mode for decoy packets.
///
/// Selects which decoy packets are eligible to be re-sent as replicas
/// (see `DecoyState::should_replicate`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum ReplicationMode {
    /// No packet is ever replicated.
    None,
    /// Only maintenance packets are replicated.
    Maintenance,
    /// Every decoy packet is eligible, maintenance or not.
    All,
}
56
/// Subheader mode for decoy packets.
///
/// Selects which decoy packets get a fake subheader prepended
/// (see `DecoyState::subheader_length`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum SubheaderMode {
    /// No packet carries a subheader.
    None,
    /// Only maintenance packets carry a subheader.
    Maintenance,
    /// Every decoy packet carries a subheader, maintenance or not.
    All,
}
64
65// ── DecoyFeatureConfig ──────────────────────────────────────────────────────
66
/// Per-provider configuration for maintenance, replication, and subheader features.
/// Randomly selected at init.
pub(super) struct DecoyFeatureConfig {
    /// How maintenance packets are timed and sized.
    pub(super) maintenance_mode: MaintenanceMode,
    /// Which decoy packets are eligible for replication.
    pub(super) replication_mode: ReplicationMode,
    /// Starting probability that an eligible packet is replicated; the replication
    /// task divides it by `DECOY_REPLICATION_PROBABILITY_REDUCE` after each replica.
    pub(super) replication_probability: f64,
    /// Which decoy packets get a fake subheader prepended.
    pub(super) subheader_mode: SubheaderMode,
    /// Layout of the fake subheader; `random` leaves this `None` exactly when
    /// `subheader_mode` is `SubheaderMode::None`.
    pub(super) subheader_config: Option<FakeHeaderConfig>,
}
76
77impl DecoyFeatureConfig {
78    pub(super) fn random<AE: AsyncExecutor>(settings: &Settings<AE>) -> Self {
79        let mut rng = get_rng();
80
81        let delay_min = settings.get(&DECOY_MAINTENANCE_DELAY_MIN);
82        let delay_max = settings.get(&DECOY_MAINTENANCE_DELAY_MAX);
83        let length_min = settings.get(&DECOY_MAINTENANCE_LENGTH_MIN) as usize;
84        let length_max = settings.get(&DECOY_MAINTENANCE_LENGTH_MAX) as usize;
85        let fixed_delay = rng.gen_range(delay_min..=delay_max);
86        let fixed_length = rng.gen_range(length_min..=length_max);
87
88        // Maintenance mode: weights from settings (None heavier by default).
89        let maintenance_mode = weighted_random! {
90            settings.get(&DECOY_MAINTENANCE_WEIGHT_NONE) => MaintenanceMode::None,
91            settings.get(&DECOY_MAINTENANCE_WEIGHT_RANDOM) => MaintenanceMode::Random,
92            settings.get(&DECOY_MAINTENANCE_WEIGHT_TIMED) => MaintenanceMode::Timed {
93                delay_ms: fixed_delay,
94            },
95            settings.get(&DECOY_MAINTENANCE_WEIGHT_SIZED) => MaintenanceMode::Sized {
96                length: fixed_length,
97            },
98            settings.get(&DECOY_MAINTENANCE_WEIGHT_BOTH) => MaintenanceMode::Both {
99                delay_ms: fixed_delay,
100                length: fixed_length,
101            },
102        };
103
104        // Replication mode: weights from settings (None heavier by default).
105        let replication_mode = weighted_random! {
106            settings.get(&DECOY_REPLICATION_WEIGHT_NONE) => ReplicationMode::None,
107            settings.get(&DECOY_REPLICATION_WEIGHT_MAINTENANCE) => ReplicationMode::Maintenance,
108            settings.get(&DECOY_REPLICATION_WEIGHT_ALL) => ReplicationMode::All,
109        };
110
111        let prob_min = settings.get(&DECOY_REPLICATION_PROBABILITY_MIN);
112        let prob_max = settings.get(&DECOY_REPLICATION_PROBABILITY_MAX);
113        let replication_probability = rng.gen_range(prob_min..=prob_max);
114
115        // Subheader mode: weights from settings.
116        let subheader_mode = weighted_random! {
117            settings.get(&DECOY_SUBHEADER_WEIGHT_NONE) => SubheaderMode::None,
118            settings.get(&DECOY_SUBHEADER_WEIGHT_MAINTENANCE) => SubheaderMode::Maintenance,
119            settings.get(&DECOY_SUBHEADER_WEIGHT_ALL) => SubheaderMode::All,
120        };
121
122        let subheader_config = if subheader_mode == SubheaderMode::None {
123            None
124        } else {
125            let min_len = settings.get(&DECOY_SUBHEADER_LENGTH_MIN) as usize;
126            let max_len = settings.get(&DECOY_SUBHEADER_LENGTH_MAX) as usize;
127            Some(generate_random_fake_header(settings, min_len, max_len))
128        };
129
130        info!("decoy feature config: maintenance={maintenance_mode:?}, replication={replication_mode:?}, replication_prob={replication_probability:.4}, subheader={subheader_mode:?}");
131
132        Self {
133            maintenance_mode,
134            replication_mode,
135            replication_probability,
136            subheader_mode,
137            subheader_config,
138        }
139    }
140}
141
142/// Generate a random `FakeHeaderConfig` with total byte length in [min_len, max_len].
143fn generate_random_fake_header<AE: AsyncExecutor>(settings: &Settings<AE>, min_len: usize, max_len: usize) -> FakeHeaderConfig {
144    let mut rng = get_rng();
145    let target_len = rng.gen_range(min_len..=max_len);
146    let mut fields = Vec::new();
147    let mut current_len = 0usize;
148
149    while current_len < target_len {
150        let remaining = target_len - current_len;
151        // Pick the largest field size that still fits.
152        let size = if remaining >= 8 {
153            *[1usize, 2, 4, 8].choose(&mut rng).unwrap()
154        } else if remaining >= 4 {
155            *[1usize, 2, 4].choose(&mut rng).unwrap()
156        } else if remaining >= 2 {
157            *[1usize, 2].choose(&mut rng).unwrap()
158        } else {
159            1
160        };
161
162        let field = match size {
163            1 => FieldTypeHolder::U8(random_field_type(settings, &mut rng)),
164            2 => FieldTypeHolder::U16(random_field_type(settings, &mut rng)),
165            4 => FieldTypeHolder::U32(random_field_type(settings, &mut rng)),
166            8 => FieldTypeHolder::U64(random_field_type(settings, &mut rng)),
167            _ => unreachable!(),
168        };
169        fields.push(field);
170        current_len += size;
171    }
172
173    FakeHeaderConfig::new(fields)
174}
175
/// Generate a random FieldType variant weighted by the `FAKE_HEADER_FIELD_WEIGHT_*` settings.
///
/// `L` is the integer type stored in the field. Initial values are drawn from `rng`;
/// the volatile change probability and the switching timeout are drawn inclusively from
/// their respective `FAKE_HEADER_*_MIN`/`*_MAX` settings.
///
/// NOTE(review): `gen_range` panics if one of those ranges is inverted (min > max) —
/// confirm the settings layer validates them.
fn random_field_type<AE: AsyncExecutor, L: Copy + From<u8>>(settings: &Settings<AE>, rng: &mut impl Rng) -> FieldType<L>
where
    rand::distributions::Standard: Distribution<L>,
{
    // Range bounds are read once up front; each pair is used by a single arm below.
    let volatile_prob_min = settings.get(&FAKE_HEADER_VOLATILE_CHANGE_PROB_MIN);
    let volatile_prob_max = settings.get(&FAKE_HEADER_VOLATILE_CHANGE_PROB_MAX);
    let switching_timeout_min = settings.get(&FAKE_HEADER_SWITCHING_TIMEOUT_MIN_MS);
    let switching_timeout_max = settings.get(&FAKE_HEADER_SWITCHING_TIMEOUT_MAX_MS);
    // NOTE(review): presumably `weighted_random!` selects one arm with probability
    // proportional to its weight and evaluates only that arm — confirm against the macro.
    weighted_random! {
        settings.get(&FAKE_HEADER_FIELD_WEIGHT_RANDOM) => FieldType::Random,
        settings.get(&FAKE_HEADER_FIELD_WEIGHT_CONSTANT) => FieldType::Constant {
            value: rng.r#gen::<L>(),
        },
        settings.get(&FAKE_HEADER_FIELD_WEIGHT_VOLATILE) => FieldType::Volatile {
            value: rng.r#gen::<L>(),
            change_probability: rng.gen_range(volatile_prob_min..=volatile_prob_max),
        },
        settings.get(&FAKE_HEADER_FIELD_WEIGHT_SWITCHING) => {
            let switch_timeout = rng.gen_range(switching_timeout_min..=switching_timeout_max);
            FieldType::Switching {
                value: rng.r#gen::<L>(),
                // First switch is due one full timeout (in milliseconds) from now.
                next_switch: unix_timestamp_ms() + switch_timeout as u128,
                switch_timeout,
            }
        },
        settings.get(&FAKE_HEADER_FIELD_WEIGHT_INCREMENTAL) => FieldType::Incremental {
            value: rng.r#gen::<L>(),
        }
    }
}
207
208// ── Trait ────────────────────────────────────────────────────────────────────
209
/// Object-safe interface used by decoy providers to dispatch generated packets.
/// Implemented explicitly by each flow manager — `ClientFlowManager` forwards to its `FlowManager::send_packet`, `ServerFlowManager` forwards to its inherent `send_packet`.
pub trait DecoyFlowSender: Send + Sync {
    /// Send a generated decoy packet through the flow manager. `fallthrough` skips the tailer step (see `FlowManager::send_packet`).
    ///
    /// `is_maintenance` is `true` for packets produced by the maintenance timer and for
    /// replicas of such packets. The returned future is boxed so the trait stays
    /// object-safe; it resolves to `Err` when the flow manager fails to send the packet.
    fn send_decoy_packet<'a>(&'a self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Pin<Box<dyn Future<Output = Result<(), FlowControllerError>> + Send + 'a>>;
}
216
/// Object-safe runtime interface for decoy traffic. Used as `Arc<dyn DecoyProvider>` in
/// flow managers — no external lock wraps it, so implementations must manage their own
/// mutable state via interior mutability (e.g. `Arc<RwLock<_>>`, as every built-in provider
/// does). All async methods are boxed automatically by `async_trait`.
#[async_trait]
pub trait DecoyProvider: Send + Sync {
    /// Short display name of this provider (e.g. "SparseDecoyProvider").
    fn name(&self) -> &'static str;

    /// Start the background decoy generation timer.
    async fn start(&self);

    /// Process an incoming packet, updating internal rate tracking.
    /// `tailer_buf` is the deobfuscated tailer for the packet (flags, packet number, etc.).
    ///
    /// NOTE(review): the meaning of the returned `Option` is not stated here; presumably it
    /// mirrors `feed_output` (`Some` = packet to pass on, `None` = drop) — confirm against
    /// the implementations.
    async fn feed_input(&self, packet: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer>;

    /// Process an outgoing packet body and its plaintext tailer, updating internal rate tracking.
    /// Returns the (possibly modified) body, or `None` to suppress the packet entirely.
    async fn feed_output(&self, body: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer>;
}
237
238/// Construction contract for decoy providers. Extends `DecoyProvider` so that any
239/// `DecoyCommunicationMode` can be stored as `Box<dyn DecoyProvider>`.
240pub trait DecoyCommunicationMode<T: IdentityType + Clone, AE: AsyncExecutor>: DecoyProvider + Sized {
241    /// Short name of this provider, derived from the type name (no path, no generics).
242    fn name() -> &'static str {
243        let full = std::any::type_name::<Self>();
244        let without_generics = full.split('<').next().unwrap_or(full);
245        without_generics.split("::").last().unwrap_or(without_generics)
246    }
247
248    /// Create a new decoy provider; `counter` is the per-session monotonic packet-number
249    /// counter shared with the session manager and the health-check provider; every emitted
250    /// decoy packet advances it. `fallthrough_probability` pins the per-flow fallthrough rate,
251    /// `None` samples from the settings keys.
252    fn new(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self;
253}
254
255// ── DecoyState ──────────────────────────────────────────────────────────────
256
/// Internal state for tracking packet rates and byte budgets.
/// This state is shared by all communication modes.
pub(crate) struct DecoyState<T: IdentityType + Clone, AE: AsyncExecutor> {
    /// Shared settings; rate parameters are re-read from here on every update.
    pub(super) settings: Arc<Settings<AE>>,
    /// Long-term reference inter-packet interval in milliseconds: an exponential moving
    /// average of the time between packets, smoothed with `DECOY_REFERENCE_ALPHA`.
    pub(super) reference_rate: f64,
    /// Current inter-packet interval in milliseconds: an exponential moving average of
    /// the time between packets, smoothed with `DECOY_CURRENT_ALPHA`.
    pub(super) packet_rate: f64,
    /// Exponential moving average of observed packet lengths in bytes, smoothed with
    /// `DECOY_CURRENT_ALPHA`.
    pub(super) byte_rate: f64,
    /// Number of decoy packet bytes allowed to send now. Refilled over time at
    /// `DECOY_BYTE_RATE_CAP` bytes per second and capped at
    /// `DECOY_BYTE_RATE_CAP * DECOY_BYTE_RATE_FACTOR`.
    pub(super) byte_budget: f64,
    /// Timestamp of the previous packet (milliseconds since epoch); `None` until the
    /// first call to `update`.
    previous_packet_time: Option<u128>,
    /// Maximum allowed length of decoy packets.
    pub(super) packet_length_cap: usize,
    /// Per-session monotonic packet-number counter, shared with the session manager and the
    /// health-check provider. Every emitted decoy advances it.
    counter: Arc<AtomicU32>,
    /// Live source of the current session identity for decoy tailers; re-read on every emitted
    /// decoy so the identity follows session-identity rotation rather than freezing at construction.
    identity: DerivedValue<T>,
    /// Next scheduled decoy time (milliseconds since epoch).
    pub(super) next_decoy_time: u128,
    /// Pre-computed length for next decoy.
    pub(super) pending_length: usize,
    /// Maintenance, replication, and subheader configuration.
    pub(super) features: DecoyFeatureConfig,
    /// Next scheduled maintenance time (milliseconds since epoch); `u128::MAX` when
    /// maintenance is disabled.
    pub(super) next_maintenance_time: u128,
    /// Pre-computed length for next maintenance packet.
    pub(super) pending_maintenance_length: usize,
    /// Per-flow probability that a generated decoy packet bypasses the tailer step.
    fallthrough_probability: f64,
}
292
293impl<T: IdentityType + Clone, AE: AsyncExecutor> DecoyState<T, AE> {
294    /// Build a fresh decoy state. `counter` is the per-session monotonic PN counter shared
295    /// with the session manager and the health-check provider; `fallthrough_probability`
296    /// pins the per-flow probability, `None` samples from `DECOY_FALLTHROUGH_PACKETS_{MIN,MAX}`.
297    pub(super) fn new(settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self {
298        let byte_rate_cap = settings.get(&DECOY_BYTE_RATE_CAP);
299        let byte_rate_factor = settings.get(&DECOY_BYTE_RATE_FACTOR);
300        let length_max = settings.get(&DECOY_LENGTH_MAX) as usize;
301        let length_min = settings.get(&DECOY_LENGTH_MIN) as usize;
302
303        let now = unix_timestamp_ms();
304        let features = DecoyFeatureConfig::random(&settings);
305
306        // Initial maintenance scheduling.
307        let (maint_time, maint_len) = if features.maintenance_mode == MaintenanceMode::None {
308            (u128::MAX, 0)
309        } else {
310            let delay = maintenance_delay_for(&features.maintenance_mode, &settings);
311            let length = maintenance_length_for(&features.maintenance_mode, &settings);
312            (now + delay as u128, length)
313        };
314
315        let fallthrough_probability = fallthrough_probability.unwrap_or_else(|| {
316            let lo = settings.get(&DECOY_FALLTHROUGH_PACKETS_MIN);
317            let hi = settings.get(&DECOY_FALLTHROUGH_PACKETS_MAX);
318            if lo >= hi {
319                lo
320            } else {
321                get_rng().gen_range(lo..=hi)
322            }
323        });
324
325        Self {
326            settings: settings.clone(),
327            reference_rate: settings.get(&DECOY_REFERENCE_PACKET_RATE_DEFAULT),
328            packet_rate: settings.get(&DECOY_CURRENT_PACKET_RATE_DEFAULT),
329            byte_rate: settings.get(&DECOY_CURRENT_BYTE_RATE_DEFAULT),
330            byte_budget: byte_rate_cap * byte_rate_factor / 2.0,
331            previous_packet_time: None,
332            packet_length_cap: length_max.max(length_min),
333            counter,
334            identity,
335            next_decoy_time: now,
336            pending_length: length_min,
337            features,
338            next_maintenance_time: maint_time,
339            pending_maintenance_length: maint_len,
340            fallthrough_probability,
341        }
342    }
343
344    /// Roll a coin against `fallthrough_probability`; `true` ⇒ next decoy bypasses the tailer.
345    #[inline]
346    pub(super) fn should_fallthrough(&self) -> bool {
347        if self.fallthrough_probability <= 0.0 {
348            false
349        } else if self.fallthrough_probability >= 1.0 {
350            true
351        } else {
352            get_rng().r#gen::<f64>() < self.fallthrough_probability
353        }
354    }
355
356    /// Update rate-tracking state when a packet passes through.
357    pub(super) fn update(&mut self, packet_length: usize, outgoing_real: bool) {
358        let current_time = unix_timestamp_ms();
359
360        if let Some(prev_time) = self.previous_packet_time {
361            let time_delta = (current_time - prev_time) as f64;
362
363            let reference_alpha = self.settings.get(&DECOY_REFERENCE_ALPHA);
364            let current_alpha = self.settings.get(&DECOY_CURRENT_ALPHA);
365            let byte_rate_cap = self.settings.get(&DECOY_BYTE_RATE_CAP);
366            let byte_rate_factor = self.settings.get(&DECOY_BYTE_RATE_FACTOR);
367
368            self.reference_rate = (1.0 - reference_alpha) * self.reference_rate + reference_alpha * time_delta;
369            self.packet_rate = (1.0 - current_alpha) * self.packet_rate + current_alpha * time_delta;
370            self.byte_rate = (1.0 - current_alpha) * self.byte_rate + current_alpha * (packet_length as f64);
371            let refill = time_delta * byte_rate_cap / 1000.0;
372            let deduct = if outgoing_real {
373                packet_length as f64
374            } else {
375                0.0
376            };
377            self.byte_budget = (self.byte_budget + refill - deduct).clamp(0.0, byte_rate_cap * byte_rate_factor);
378        }
379
380        self.previous_packet_time = Some(current_time);
381    }
382
383    /// Get quietness index: how quiet the traffic is (0 = busy, 1 = quiet).
384    pub(super) fn quietness_index(&self) -> f64 {
385        ((self.reference_rate - self.packet_rate) / self.reference_rate).clamp(0.0, 1.0)
386    }
387
388    /// Bump the per-session counter and return the next packet number
389    /// (`timestamp_seconds << 32 | counter`). Decoy emissions share this counter with the
390    /// session manager and the health-check provider, so the resulting PN stream is monotonic
391    /// across every packet type the session produces.
392    fn next_packet_number(&self) -> u64 {
393        let counter = self.counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
394        let timestamp = (unix_timestamp_ms() / 1000) as u32;
395        ((timestamp as u64) << 32) | counter as u64
396    }
397
398    /// Create a decoy packet with the given body length.
399    /// If `is_maintenance` is true and the subheader mode applies, a subheader is prepended.
400    pub(super) fn create_decoy_packet(&mut self, body_length: usize, is_maintenance: bool) -> DynamicByteBuffer {
401        let subheader_len = self.subheader_length(is_maintenance);
402        let total_length = body_length + Tailer::<T>::len();
403        let packet = self.settings.pool().allocate(Some(total_length));
404
405        get_rng().fill(packet.slice_end_mut(body_length));
406
407        let pn = self.next_packet_number();
408        Tailer::decoy(packet.rebuffer_start(body_length), &self.identity.get(), pn);
409
410        if subheader_len > 0 {
411            let expanded = packet.expand_start(subheader_len);
412            if let Some(ref mut config) = self.features.subheader_config {
413                config.fill(expanded.rebuffer_end(expanded.len() - subheader_len));
414            }
415            return expanded;
416        }
417
418        packet
419    }
420
421    /// Create a replica of the given decoy body (same body bytes, new tailer).
422    pub(super) fn create_replica_packet(&mut self, original_body: &[u8], is_maintenance: bool) -> DynamicByteBuffer {
423        let subheader_len = self.subheader_length(is_maintenance);
424        let body_length = original_body.len();
425        let total_length = body_length + Tailer::<T>::len();
426        let packet = self.settings.pool().allocate(Some(total_length));
427
428        packet.slice_end_mut(body_length).copy_from_slice(original_body);
429
430        let pn = self.next_packet_number();
431        Tailer::decoy(packet.rebuffer_start(body_length), &self.identity.get(), pn);
432
433        if subheader_len > 0 {
434            let expanded = packet.expand_start(subheader_len);
435            if let Some(ref mut config) = self.features.subheader_config {
436                config.fill(expanded.rebuffer_end(expanded.len() - subheader_len));
437            }
438            return expanded;
439        }
440
441        packet
442    }
443
444    /// Try to spend byte budget for a decoy packet.
445    /// Returns true if budget was sufficient and has been deducted.
446    pub(super) fn try_spend_budget(&mut self, bytes: usize) -> bool {
447        if self.byte_budget >= bytes as f64 {
448            self.byte_budget -= bytes as f64;
449            true
450        } else {
451            false
452        }
453    }
454
455    /// Schedule the next decoy packet.
456    pub(super) fn schedule_next(&mut self, delay: u64, length: usize) {
457        self.next_decoy_time = unix_timestamp_ms() + delay as u128;
458        self.pending_length = length;
459    }
460
461    /// Schedule the next maintenance packet.
462    pub(super) fn schedule_next_maintenance(&mut self) {
463        let delay = maintenance_delay_for(&self.features.maintenance_mode, &self.settings);
464        let length = maintenance_length_for(&self.features.maintenance_mode, &self.settings);
465        self.next_maintenance_time = unix_timestamp_ms() + delay as u128;
466        self.pending_maintenance_length = length;
467    }
468
469    /// Returns the subheader byte length for a packet, or 0 if no subheader applies.
470    fn subheader_length(&self, is_maintenance: bool) -> usize {
471        let should_apply = match self.features.subheader_mode {
472            SubheaderMode::None => false,
473            SubheaderMode::Maintenance => is_maintenance,
474            SubheaderMode::All => true,
475        };
476        if should_apply {
477            self.features.subheader_config.as_ref().map_or(0, super::super::config::FakeHeaderConfig::len)
478        } else {
479            0
480        }
481    }
482
483    /// Check if a packet should be replicated.
484    pub(super) fn should_replicate(&self, is_maintenance: bool) -> bool {
485        match self.features.replication_mode {
486            ReplicationMode::None => false,
487            ReplicationMode::Maintenance => is_maintenance,
488            ReplicationMode::All => true,
489        }
490    }
491}
492
493// ── Maintenance / Replication helpers ───────────────────────────────────────
494
495/// Get maintenance delay for the given mode.
496fn maintenance_delay_for<AE: AsyncExecutor>(mode: &MaintenanceMode, settings: &Settings<AE>) -> u64 {
497    match *mode {
498        MaintenanceMode::Timed {
499            delay_ms,
500        }
501        | MaintenanceMode::Both {
502            delay_ms,
503            ..
504        } => delay_ms,
505        _ => {
506            let min = settings.get(&DECOY_MAINTENANCE_DELAY_MIN);
507            let max = settings.get(&DECOY_MAINTENANCE_DELAY_MAX);
508            random_uniform(min as f64, max as f64) as u64
509        }
510    }
511}
512
513/// Get maintenance packet length for the given mode.
514fn maintenance_length_for<AE: AsyncExecutor>(mode: &MaintenanceMode, settings: &Settings<AE>) -> usize {
515    match *mode {
516        MaintenanceMode::Sized {
517            length,
518        }
519        | MaintenanceMode::Both {
520            length,
521            ..
522        } => length,
523        _ => {
524            let min = settings.get(&DECOY_MAINTENANCE_LENGTH_MIN) as usize;
525            let max = settings.get(&DECOY_MAINTENANCE_LENGTH_MAX) as usize;
526            random_uniform(min as f64, max as f64) as usize
527        }
528    }
529}
530
/// Background maintenance timer task. Runs independently of the communication mode timer.
/// Returns immediately if maintenance mode is `None`.
///
/// Each iteration sleeps until the scheduled maintenance time, spends byte budget for the
/// pending packet, sends it through `manager` and then schedules the next one. The loop
/// ends when `manager` can no longer be upgraded. No lock on `state` is held across the
/// sleep or the send.
pub(super) async fn maintenance_timer_task<T, AE>(manager: Weak<dyn DecoyFlowSender>, state: Arc<RwLock<DecoyState<T, AE>>>)
where
    T: IdentityType + Clone + 'static,
    AE: AsyncExecutor + 'static,
{
    // Scoped so the read guard is released before entering the loop.
    {
        let guard = state.read().await;
        if guard.features.maintenance_mode == MaintenanceMode::None {
            return;
        }
    }

    loop {
        // Time left until the scheduled maintenance packet; zero if it is already overdue.
        let delay = {
            let guard = state.read().await;
            let remaining = guard.next_maintenance_time.saturating_sub(unix_timestamp_ms());
            Duration::from_millis(remaining as u64)
        };

        sleep(delay).await;

        // The task only holds a weak handle; once the manager is gone there is nobody to send through.
        let Some(manager_arc) = manager.upgrade() else {
            warn!("Maintenance timer: manager dropped, stopping");
            break;
        };

        // Build the packet under the write lock, then release it before any await on the send.
        let (packet, body_length, should_rep, fallthrough, settings) = {
            let mut guard = state.write().await;
            let length = guard.pending_maintenance_length;

            // Not enough budget: skip this packet, pick a new deadline and go back to sleeping.
            if !guard.try_spend_budget(length) {
                guard.schedule_next_maintenance();
                continue;
            }

            let packet = guard.create_decoy_packet(length, true);
            let should_rep = guard.should_replicate(true);
            let fallthrough = guard.should_fallthrough();
            let settings = Arc::clone(&guard.settings);
            (packet, length, should_rep, fallthrough, settings)
        };

        // The packet is moved into the send below, so the body needed for replication is
        // captured first. NOTE(review): presumably this copies the body bytes into a new
        // buffer with no extra capacity — confirm against the pool API.
        let body_buf = should_rep.then(|| settings.pool().allocate_precise_from_slice_with_capacity(packet.slice_end(body_length), 0, 0));

        debug!("Maintenance: generated packet (len={body_length})");

        // Replication is attempted only after the original packet was sent successfully.
        if let Err(err) = manager_arc.send_decoy_packet(packet, fallthrough, true).await {
            warn!("Maintenance: failed to send: {err:?}");
        } else if let Some(body) = body_buf {
            try_replicate(&state, &manager, true, body).await;
        }

        // Pick the deadline (and length) of the next maintenance packet.
        {
            let mut guard = state.write().await;
            guard.schedule_next_maintenance();
        }
    }
}
591
/// Attempt replication of a decoy packet. If replication mode applies, spawns a cascading
/// task that re-sends the packet body with diminishing probability.
///
/// Returns without spawning when the replication mode does not cover this packet. The
/// spawned task stops at the first of: a failed probability roll, the manager being
/// dropped, insufficient byte budget, or a send error.
///
/// NOTE(review): the probability is divided by `DECOY_REPLICATION_PROBABILITY_REDUCE` after
/// each replica, so it only diminishes when that setting is greater than 1 — confirm the
/// setting is validated accordingly.
pub(super) async fn try_replicate<T, AE>(state: &Arc<RwLock<DecoyState<T, AE>>>, manager: &Weak<dyn DecoyFlowSender>, is_maintenance: bool, body: DynamicByteBuffer)
where
    T: IdentityType + Clone + 'static,
    AE: AsyncExecutor + 'static,
{
    // Snapshot everything the task needs under a single read lock.
    let (probability, delay_min, delay_max, reduce, executor) = {
        let guard = state.read().await;
        if !guard.should_replicate(is_maintenance) {
            return;
        }
        (guard.features.replication_probability, guard.settings.get(&DECOY_REPLICATION_DELAY_MIN), guard.settings.get(&DECOY_REPLICATION_DELAY_MAX), guard.settings.get(&DECOY_REPLICATION_PROBABILITY_REDUCE), guard.settings.executor().clone())
    };

    // Owned handles for the spawned task, which may outlive this call.
    let state_clone = Arc::clone(state);
    let manager_clone = manager.clone();

    executor.spawn(async move {
        let mut current_probability = probability;
        loop {
            // Roll for one more replica; a failed roll ends the cascade.
            if get_rng().r#gen::<f64>() >= current_probability {
                break;
            }

            // Random pause (milliseconds) before the replica is sent.
            let delay = random_uniform(delay_min as f64, delay_max as f64) as u64;
            sleep(Duration::from_millis(delay)).await;

            let Some(manager_arc) = manager_clone.upgrade() else {
                break;
            };

            // Build the replica under the write lock; the lock is released before sending.
            let (packet, fallthrough) = {
                let mut guard = state_clone.write().await;
                // Replicas are charged against the same byte budget as original decoys.
                if !guard.try_spend_budget(body.slice().len()) {
                    break;
                }
                let replica = guard.create_replica_packet(body.slice(), is_maintenance);
                (replica, guard.should_fallthrough())
            };

            if manager_arc.send_decoy_packet(packet, fallthrough, is_maintenance).await.is_err() {
                break;
            }

            current_probability /= reduce;
        }
    });
}
641
642// ── Random utility functions ────────────────────────────────────────────────
643
644/// Random uniform distribution between min and max.
645#[inline]
646pub(super) fn random_uniform(min: f64, max: f64) -> f64 {
647    get_rng().gen_range(min..=max)
648}
649
650/// Gaussian random with mean and standard deviation.
651#[inline]
652pub(super) fn random_gauss(mean: f64, sigma: f64) -> f64 {
653    if sigma <= 0.0 {
654        return mean;
655    }
656    let normal = Normal::new(mean, sigma).unwrap_or_else(|_| Normal::new(mean, 1.0).unwrap());
657    normal.sample(&mut get_rng())
658}
659
660/// Exponential random with rate (mean = 1/rate).
661#[inline]
662pub(super) fn exponential_variance(rate: f64) -> f64 {
663    if rate <= 0.0 {
664        return f64::MAX;
665    }
666    let exp = Exp::new(rate).unwrap_or_else(|_| Exp::new(1.0).unwrap());
667    exp.sample(&mut get_rng())
668}