use std::sync::Arc;
use tokio::sync::{Mutex, broadcast};
use crate::masking::cover::{CoverTrafficGenerator, TrafficProfile};
use crate::pipeline::SrxPipeline;
use crate::seed::SeedRng;
/// Handle controlling a background cover-traffic task started by
/// [`spawn_cover_traffic`].
///
/// The handle owns the only sender of the task's shutdown channel. Calling
/// [`CoverTrafficHandle::stop`] signals the task to exit; dropping the handle
/// closes the channel, which the task also treats as a shutdown request.
#[derive(Debug)]
#[must_use = "dropping the handle closes the shutdown channel and stops the cover traffic task"]
pub struct CoverTrafficHandle {
    /// Sending side of the shutdown channel watched by the spawned task.
    shutdown_tx: broadcast::Sender<()>,
}
impl CoverTrafficHandle {
    /// Asks the cover-traffic task to shut down.
    ///
    /// This only sends the signal; it does not wait for the task to finish.
    /// Calling it when the task has already exited is harmless.
    pub fn stop(&self) {
        // A send error only means no receiver is left (the task is already
        // gone), so the result is discarded on purpose.
        self.shutdown_tx.send(()).ok();
    }
}
/// Spawns a background Tokio task that pushes cover traffic through `pipeline`.
///
/// The task repeatedly asks a [`CoverTrafficGenerator`] (seeded from `seed`
/// and shaped by `profile`) for the next event, sleeps for `event.delay`,
/// then sends `event.size` pseudo-random bytes via the pipeline. Send
/// failures are logged at `debug` level and do not stop the loop.
///
/// The task ends when [`CoverTrafficHandle::stop`] is called or when the
/// returned handle is dropped (the shutdown channel closes). Shutdown is
/// checked before the send loop on every poll, so a pending sleep, lock
/// acquisition or send is abandoned once the signal is observed.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime, because it uses
/// `tokio::spawn`.
pub fn spawn_cover_traffic(
    pipeline: Arc<Mutex<SrxPipeline>>,
    seed: [u8; 32],
    profile: TrafficProfile,
) -> CoverTrafficHandle {
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);
    let mut cover_gen = CoverTrafficGenerator::new(SeedRng::new(seed), profile);
    // Created once for the lifetime of the task. Re-creating it from `seed`
    // inside the loop would restart the stream on every event and make each
    // payload a prefix of the very same byte sequence.
    let mut payload_rng = SeedRng::new(seed);

    tokio::spawn(async move {
        let send_loop = async {
            loop {
                let event = cover_gen.next_event();
                tokio::time::sleep(event.delay).await;

                // Only the low byte of each draw is kept; the truncating cast
                // is intentional.
                let payload: Vec<u8> = (0..event.size)
                    .map(|_| payload_rng.next_u64() as u8)
                    .collect();

                // The guard is released at the end of the iteration, i.e.
                // before the next sleep, so other users are not blocked while
                // this task is idle.
                let mut pipe = pipeline.lock().await;
                match pipe.send(&payload).await {
                    Ok(kind) => {
                        tracing::trace!(?kind, size = event.size, "cover traffic sent");
                    }
                    Err(e) => {
                        tracing::debug!(error = %e, "cover traffic send failed (no transports?)");
                    }
                }
            }
        };

        // `biased` makes the shutdown branch win whenever both are ready.
        // `recv` also completes (with an error) once the handle is dropped.
        tokio::select! {
            biased;
            _ = shutdown_rx.recv() => {}
            () = send_loop => {}
        }
        tracing::debug!("cover traffic task stopped");
    });

    CoverTrafficHandle { shutdown_tx }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{AeadCipher as Variant, MimicryMode};
    use crate::crypto::AeadPipeline;
    use crate::masking::padding::PaddingStrategy;
    use crate::session::Session;
    use crate::transport::TransportManager;
    use std::time::Duration;

    /// Smoke test: start cover traffic on a freshly built pipeline, let it
    /// run briefly, then signal shutdown. There are no assertions; the test
    /// passes as long as nothing panics.
    #[tokio::test]
    async fn cover_traffic_handle_stops_task() {
        let seed_bytes = [0xCCu8; 32];
        let key_bytes = [0xDDu8; 32];

        // Assemble the pipeline from its parts, in the same order as before.
        let cipher = AeadPipeline::new(Variant::ChaCha20Poly1305, &key_bytes, 2).unwrap();
        let session = Session::new(1, seed_bytes, key_bytes);
        let padding = PaddingStrategy::new(SeedRng::new(seed_bytes), 32);
        let pipeline = SrxPipeline::new(
            session,
            Arc::new(cipher),
            padding,
            MimicryMode::None,
            None,
            // NOTE(review): presumably a manager with no transports, so any
            // send attempted here would fail and only be logged — confirm.
            TransportManager::new(),
        );
        let shared_pipeline = Arc::new(Mutex::new(pipeline));

        let cover = spawn_cover_traffic(shared_pipeline, seed_bytes, TrafficProfile::ApiPolling);

        // Give the task a moment to run before asking it to stop...
        tokio::time::sleep(Duration::from_millis(50)).await;
        cover.stop();
        // ...and a moment to observe the shutdown signal.
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
}