use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(feature = "async-std")]
use async_io::Timer;
use async_trait::async_trait;
use env_logger::init;
use futures::channel::oneshot::channel;
#[cfg(not(feature = "tokio"))]
use futures::executor::block_on;
use log::warn;
use rand::Rng;
#[cfg(feature = "tokio")]
use tokio::runtime::Runtime;
#[cfg(feature = "tokio")]
use tokio::time::sleep;
use typhoon::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
use typhoon::certificate::ServerKeyPair;
use typhoon::defaults::{AsyncExecutor, DefaultClientConnectionHandler, DefaultExecutor, DefaultServerConnectionHandler};
use typhoon::flow::decoy::{DecoyCommunicationMode, DecoyFlowSender, DecoyProvider, DerivedValue, IdentityType, PacketFlags, SparseDecoyProvider, Tailer, decoy_factory};
use typhoon::flow::{FakeBodyMode, FakeHeaderConfig, FlowConfig};
use typhoon::settings::consts::{FG_OFFSET, PN_OFFSET};
use typhoon::settings::{Settings, SettingsBuilder};
use typhoon::socket::{ClientSocketBuilder, ListenerBuilder, ServerFlowConfiguration};
/// Loopback address the example listener binds to and the client connects to.
const SERVER_ADDR: &str = "127.0.0.1:19993";
/// Number of echo round-trips performed by the client and answered by the server task.
const MSG_COUNT: usize = 30;
/// Size in bytes of each payload the client sends.
const PAYLOAD_SIZE: usize = 512;
/// Pause in milliseconds the client takes after each completed round-trip.
const INTER_MSG_MS: u64 = 40;
/// Period in milliseconds of the `FlatIatDecoyProvider` timer; one send is attempted per tick.
const FLAT_IAT_MS: u64 = 100;
/// Length in bytes of the random body of a synthesized decoy packet (the tailer length is added on top).
const RANDOM_DECOY_BODY_LEN: usize = 96;
/// Identity type parameter used for the listener, the client socket and the decoy factories in this example.
type Ident = typhoon::bytes::StaticByteBuffer;
/// Async executor type parameter used throughout this example.
type Exec = DefaultExecutor;
/// Decoy provider that releases outgoing traffic on a fixed timer tick.
///
/// Payload-carrying packets handed to `feed_output` are parked in a shared
/// queue. A background task spawned by `start` wakes every `FLAT_IAT_MS`
/// milliseconds and sends either the oldest parked packet or, when nothing is
/// parked, a freshly generated random decoy.
struct FlatIatDecoyProvider<T, AE>
where
    T: IdentityType + Clone,
    AE: AsyncExecutor,
{
    /// Weak handle to the sender used to emit packets; the timer task stops
    /// once this can no longer be upgraded.
    manager: Weak<dyn DecoyFlowSender>,
    /// Shared settings; supplies the executor used to spawn the timer task and
    /// the buffer pool used to allocate decoys.
    settings: Arc<Settings<AE>>,
    /// Identity value passed to `Tailer::decoy` when a decoy is built.
    identity: DerivedValue<T>,
    /// Queue and bookkeeping shared between the timer task and `feed_output`.
    state: Arc<Mutex<FlatIatState>>,
}
/// Mutable state shared (behind a mutex) between the timer task and `feed_output`.
struct FlatIatState {
/// Payload packets waiting for a timer tick, stored with their packet number, oldest first.
queue: VecDeque<(u64, DynamicByteBuffer)>,
/// Packet numbers the timer task has already released; `feed_output` lets a
/// packet with such a number through once and removes the entry.
reinjected: HashSet<u64>,
/// Counter supplying the low 32 bits of packet numbers generated for decoys.
counter: Arc<AtomicU32>,
/// Probability used by `should_fallthrough` to pick the fallthrough flag of a decoy.
fallthrough_probability: f64,
}
impl FlatIatState {
    /// Builds the packet number for a synthesized decoy.
    ///
    /// The high 32 bits hold the current Unix time in whole seconds (truncated
    /// to `u32`); the low 32 bits hold the incremented value of the counter.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    fn next_packet_number(&self) -> u64 {
        // `fetch_add` returns the previous value; add one to get the new one.
        let sequence = self.counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards");
        // Whole seconds, deliberately truncated to the low 32 bits.
        let seconds = since_epoch.as_secs() as u32;
        (u64::from(seconds) << 32) | u64::from(sequence)
    }

    /// Randomly decides the fallthrough flag for a decoy.
    ///
    /// Returns `false` for probabilities at or below zero and `true` for
    /// probabilities at or above one, without sampling; otherwise it samples a
    /// random `f64` and compares it with the configured probability.
    fn should_fallthrough(&self) -> bool {
        let probability = self.fallthrough_probability;
        if probability <= 0.0 {
            return false;
        }
        if probability >= 1.0 {
            return true;
        }
        rand::thread_rng().r#gen::<f64>() < probability
    }
}
impl<T, AE> FlatIatDecoyProvider<T, AE>
where
    T: IdentityType + Clone + 'static,
    AE: AsyncExecutor + 'static,
{
    /// Background loop that attempts one send per `FLAT_IAT_MS` tick.
    ///
    /// On each tick the oldest queued payload packet is sent if there is one;
    /// its packet number is recorded in `reinjected` first so that
    /// `feed_output` lets it through. When the queue is empty, a decoy with a
    /// random body of `RANDOM_DECOY_BODY_LEN` bytes plus a tailer is built and
    /// sent instead. The loop ends once `manager` can no longer be upgraded.
    /// Send errors are logged and otherwise ignored.
    async fn timer_task(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, state: Arc<Mutex<FlatIatState>>) {
        loop {
            sleep_ms(FLAT_IAT_MS).await;

            let sender = match manager.upgrade() {
                Some(sender) => sender,
                None => {
                    warn!("FlatIatDecoyProvider: manager dropped, stopping timer");
                    return;
                }
            };

            // Keep the lock inside this block so the guard is released before
            // the send below is awaited.
            let (packet, fallthrough) = {
                let mut shared = state.lock().expect("FlatIatDecoyProvider mutex poisoned");
                match shared.queue.pop_front() {
                    Some((pn, queued)) => {
                        shared.reinjected.insert(pn);
                        (queued, false)
                    }
                    None => {
                        let total_len = RANDOM_DECOY_BODY_LEN + Tailer::<T>::len();
                        let decoy = settings.pool().allocate(Some(total_len));
                        rand::thread_rng().fill(decoy.slice_end_mut(RANDOM_DECOY_BODY_LEN));
                        Tailer::decoy(decoy.rebuffer_start(RANDOM_DECOY_BODY_LEN), &identity.get(), shared.next_packet_number());
                        let decoy_fallthrough = shared.should_fallthrough();
                        (decoy, decoy_fallthrough)
                    }
                }
            };

            match sender.send_decoy_packet(packet, fallthrough, false).await {
                Ok(_) => {}
                Err(err) => warn!("FlatIatDecoyProvider: send failed: {err:?}"),
            }
        }
    }
}
#[async_trait]
impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> DecoyProvider for FlatIatDecoyProvider<T, AE> {
fn name(&self) -> &'static str {
"FlatIatDecoyProvider"
}
async fn start(&self) {
let executor = self.settings.executor().clone();
let manager = self.manager.clone();
let settings = self.settings.clone();
let identity = self.identity.clone();
let state = self.state.clone();
executor.spawn(Self::timer_task(manager, settings, identity, state));
}
async fn feed_input(&self, packet: DynamicByteBuffer, _tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
Some(packet)
}
async fn feed_output(&self, body: DynamicByteBuffer, tailer_buf: DynamicByteBuffer) -> Option<DynamicByteBuffer> {
let flags = PacketFlags::from_bits_truncate(*tailer_buf.get(FG_OFFSET));
if !flags.has_payload() {
return Some(body);
}
let pn_bytes: [u8; 8] = tailer_buf.slice()[PN_OFFSET..PN_OFFSET + 8].try_into().expect("8-byte PN");
let pn = u64::from_be_bytes(pn_bytes);
let mut state = self.state.lock().expect("FlatIatDecoyProvider mutex poisoned");
if state.reinjected.remove(&pn) {
return Some(body);
}
let combined = body.expand_end(tailer_buf.len());
state.queue.push_back((pn, combined));
None
}
}
impl<T, AE> DecoyCommunicationMode<T, AE> for FlatIatDecoyProvider<T, AE>
where
    T: IdentityType + Clone + 'static,
    AE: AsyncExecutor + 'static,
{
    /// Creates a provider with an empty queue and no released packet numbers.
    ///
    /// `fallthrough_probability` defaults to `0.5` when `None` is given.
    fn new(manager: Weak<dyn DecoyFlowSender>, settings: Arc<Settings<AE>>, identity: DerivedValue<T>, counter: Arc<AtomicU32>, fallthrough_probability: Option<f64>) -> Self {
        let initial_state = FlatIatState {
            queue: VecDeque::new(),
            reinjected: HashSet::new(),
            counter,
            fallthrough_probability: fallthrough_probability.unwrap_or(0.5),
        };
        let state = Arc::new(Mutex::new(initial_state));
        Self { manager, settings, identity, state }
    }
}
/// Entry point when the `tokio` feature is enabled: runs `run` to completion
/// on a newly created tokio runtime.
#[cfg(feature = "tokio")]
fn main() {
    let runtime = Runtime::new().expect("tokio runtime");
    runtime.block_on(run());
}
/// Entry point when the `tokio` feature is disabled: drives `run` to
/// completion with `futures::executor::block_on`.
#[cfg(not(feature = "tokio"))]
fn main() {
    let session = run();
    block_on(session);
}
/// Suspends the current task for `ms` milliseconds using the tokio timer.
#[cfg(feature = "tokio")]
async fn sleep_ms(ms: u64) {
    let pause = Duration::from_millis(ms);
    sleep(pause).await;
}
/// Suspends the current task for `ms` milliseconds using an `async_io` timer.
///
/// Compiled only when `async-std` is enabled and `tokio` is not. This mirrors
/// how `main` is selected (tokio takes precedence), so enabling both features
/// at once no longer yields two conflicting `sleep_ms` definitions.
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
async fn sleep_ms(ms: u64) {
    // The instant yielded by the timer is not needed.
    Timer::after(Duration::from_millis(ms)).await;
}
/// Runs the example end to end.
///
/// Starts a listener on `SERVER_ADDR` whose flow uses `FlatIatDecoyProvider`,
/// spawns a server task that echoes `MSG_COUNT` messages, connects a client
/// that uses `SparseDecoyProvider`, performs `MSG_COUNT` echo round-trips of
/// `PAYLOAD_SIZE` bytes each, and finally waits for the server task to report
/// completion.
///
/// # Panics
///
/// Panics on any setup, send, receive or accept failure.
async fn run() {
    init();

    let settings = Arc::new(SettingsBuilder::<Exec>::new().build().expect("settings"));
    let server_addr = SERVER_ADDR.parse().expect("valid address");

    let body_mode = FakeBodyMode::Random {
        min_length: 64,
        max_length: 256,
        service: true,
    };
    let flow_config = FlowConfig::new(body_mode, FakeHeaderConfig::new(vec![]));

    // The server shapes its output with the flat-IAT provider; the client uses
    // the sparse provider.
    let server_decoy = decoy_factory::<Ident, Exec, FlatIatDecoyProvider<Ident, Exec>>();
    let client_decoy = decoy_factory::<Ident, Exec, SparseDecoyProvider<Ident, Exec>>();

    let key_pair = ServerKeyPair::generate();
    let certificate = key_pair.to_client_certificate(vec![server_addr]);

    let server_flow = ServerFlowConfiguration::<Ident, Exec>::with_address(flow_config.clone(), server_addr)
        .with_decoy_factory(server_decoy);
    let listener: Arc<_> = Arc::new(
        ListenerBuilder::<Ident, Exec, DefaultServerConnectionHandler>::new(key_pair, DefaultServerConnectionHandler)
            .add_flow(server_flow)
            .with_settings(settings.clone())
            .build()
            .await
            .expect("listener"),
    );
    listener.start().await;

    // Server side: accept one connection and echo `MSG_COUNT` messages back.
    let (done_tx, done_rx) = channel::<usize>();
    let acceptor = Arc::clone(&listener);
    settings.executor().spawn(async move {
        let conn = acceptor.accept().await.expect("accept");
        for _ in 0..MSG_COUNT {
            let data = conn.receive_bytes().await.expect("recv");
            conn.send_bytes(&data).await.expect("echo");
        }
        // Wait four timer periods before reporting completion.
        // NOTE(review): presumably this lets queued echoes drain; confirm.
        sleep_ms(FLAT_IAT_MS * 4).await;
        let _ = done_tx.send(MSG_COUNT);
    });

    // Client side: send a fixed payload and wait for each echo.
    let socket = ClientSocketBuilder::<Ident, Exec, DefaultClientConnectionHandler>::new(certificate, DefaultClientConnectionHandler)
        .with_settings(settings.clone())
        .with_decoy_factory(client_decoy)
        .with_flow_config(server_addr, flow_config)
        .build()
        .await
        .expect("client socket");

    let payload = vec![0xABu8; PAYLOAD_SIZE];
    for _ in 0..MSG_COUNT {
        socket.send_bytes(&payload).await.expect("send");
        socket.receive_bytes().await.expect("receive echo");
        sleep_ms(INTER_MSG_MS).await;
    }

    done_rx.await.expect("server task");
    println!("flat_iat_decoy done: {MSG_COUNT} round-trips complete.");
}