use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "async-std")]
use async_io::Timer;
use env_logger::init;
use futures::channel::oneshot::channel;
#[cfg(not(feature = "tokio"))]
use futures::executor::block_on;
use rand::Rng;
#[cfg(feature = "tokio")]
use tokio::runtime::Runtime;
#[cfg(feature = "tokio")]
use tokio::time::sleep;
use typhoon::bytes::StaticByteBuffer;
use typhoon::certificate::ServerKeyPair;
use typhoon::defaults::{AsyncExecutor, DefaultClientConnectionHandler, DefaultExecutor, DefaultServerConnectionHandler};
use typhoon::flow::decoy::{DecoyFactory, HeavyDecoyProvider, SimpleDecoyProvider, SmoothDecoyProvider, SparseDecoyProvider, decoy_factory, random_decoy_factory};
use typhoon::flow::{FakeBodyMode, FakeHeaderConfig, FieldType, FieldTypeHolder, FlowConfig};
use typhoon::settings::SettingsBuilder;
use typhoon::socket::{ClientSocketBuilder, ListenerBuilder, ServerFlowConfiguration};
// Address the listener binds to and the client connects to (parsed in `run`).
const SERVER_ADDR: &str = "127.0.0.1:19991";
// Number of request/echo round trips in one run; expected to be a multiple of
// `CLUSTER_SIZE` because the client sends whole clusters only.
const MSG_COUNT: usize = 100;
// Number of messages the client sends back-to-back before a longer pause.
const CLUSTER_SIZE: usize = 10;
// Fixed pause (ms) between messages of one cluster when random waits are off.
const INTRA_CLUSTER_MS: u64 = 5;
// Inclusive lower bound (ms) of the random per-message pause.
const INTRA_CLUSTER_MIN_MS: u64 = 5;
// Inclusive upper bound (ms) of the random per-message pause.
const INTRA_CLUSTER_MAX_MS: u64 = 50;
// Fixed pause (ms) between clusters when random waits are off.
const INTER_CLUSTER_MS: u64 = 200;
// Payload size in bytes when random payloads are off (capped by the socket's maximum).
const PAYLOAD_FIXED: usize = 1024;
// Inclusive lower bound in bytes for random payload sizes.
const PAYLOAD_MIN: usize = 64;
// Upper bound in bytes for random payload sizes (further capped by the socket's maximum).
const PAYLOAD_MAX: usize = 2048;
// Identity type parameter used for every typhoon generic in this file.
type Ident = StaticByteBuffer;
// Executor type parameter used for every typhoon generic in this file.
type Exec = DefaultExecutor;
/// Entry point for the `tokio` build: creates a tokio runtime and drives
/// [`run`] to completion on it. Panics if the runtime cannot be created.
#[cfg(feature = "tokio")]
fn main() {
    let runtime = Runtime::new().expect("tokio runtime");
    let session = run();
    runtime.block_on(session);
}
/// Entry point for builds without the `tokio` feature: drives [`run`] to
/// completion with the `futures` blocking executor.
#[cfg(not(feature = "tokio"))]
fn main() {
    let session = run();
    block_on(session);
}
/// Suspends the current task for `ms` milliseconds using the tokio timer.
#[cfg(feature = "tokio")]
async fn sleep_ms(ms: u64) {
    let pause = Duration::from_millis(ms);
    sleep(pause).await;
}
/// Suspends the current task for `ms` milliseconds using an `async_io` timer.
// NOTE(review): this is gated on `async-std`, while the matching `main` is
// gated on `not(tokio)`; confirm the two features are mutually exclusive and
// that exactly one of them is always enabled.
#[cfg(feature = "async-std")]
async fn sleep_ms(ms: u64) {
    let pause = Duration::from_millis(ms);
    Timer::after(pause).await;
}
async fn run() {
init();
let settings = Arc::new(SettingsBuilder::<Exec>::new().build().expect("settings"));
let server_addr = SERVER_ADDR.parse().expect("valid address");
let use_case = std::env::var("TYPHOON_USE_CASE").unwrap_or_else(|_| "default".to_string());
let random_payload = std::env::var("TYPHOON_RANDOM_PAYLOAD").is_ok();
let random_wait = std::env::var("TYPHOON_RANDOM_WAIT").is_ok();
let (flow_config, decoy): (FlowConfig, DecoyFactory<Ident, Exec>) = match use_case.as_str() {
"throughput" => (FlowConfig::new(FakeBodyMode::Empty, FakeHeaderConfig::new(vec![])), decoy_factory::<Ident, Exec, SimpleDecoyProvider>()),
"interactive" => (
FlowConfig::new(
FakeBodyMode::Random {
min_length: 8,
max_length: 64,
service: true,
},
FakeHeaderConfig::new(vec![]),
),
decoy_factory::<Ident, Exec, SparseDecoyProvider<Ident, Exec>>(),
),
"transparent" => (
FlowConfig::new(
FakeBodyMode::Constant {
packet_length: 1024,
},
FakeHeaderConfig::new(vec![
FieldTypeHolder::U32(FieldType::Constant {
value: 0x48545450u32,
}),
FieldTypeHolder::U32(FieldType::Incremental {
value: 0u32,
}),
]),
),
decoy_factory::<Ident, Exec, SmoothDecoyProvider<Ident, Exec>>(),
),
"security" => (
FlowConfig::new(
FakeBodyMode::Random {
min_length: 256,
max_length: 1024,
service: false,
},
FakeHeaderConfig::new(vec![FieldTypeHolder::U64(FieldType::Random), FieldTypeHolder::U64(FieldType::Random), FieldTypeHolder::U64(FieldType::Random), FieldTypeHolder::U64(FieldType::Random)]),
),
decoy_factory::<Ident, Exec, HeavyDecoyProvider<Ident, Exec>>(),
),
_ => (FlowConfig::random(&settings), random_decoy_factory::<Ident, Exec>()),
};
let key_pair = ServerKeyPair::generate();
let certificate = key_pair.to_client_certificate(vec![server_addr]);
let server_flow = ServerFlowConfiguration::<Ident, Exec>::with_address(flow_config.clone(), server_addr).with_decoy_factory(decoy.clone());
let listener: Arc<_> = Arc::new(ListenerBuilder::<Ident, Exec, DefaultServerConnectionHandler>::new(key_pair, DefaultServerConnectionHandler).add_flow(server_flow).with_settings(settings.clone()).build().await.expect("listener"));
listener.start().await;
let (done_tx, done_rx) = channel::<usize>();
let listener_handle = listener.clone();
let server_random_wait = random_wait;
settings.executor().spawn(async move {
let conn = listener_handle.accept().await.expect("accept");
let mut count = 0;
while count < MSG_COUNT {
let data = conn.receive_bytes().await.expect("recv");
if server_random_wait {
let delay = rand::thread_rng().gen_range(INTRA_CLUSTER_MIN_MS..=INTRA_CLUSTER_MAX_MS);
sleep_ms(delay).await;
}
conn.send_bytes(&data).await.expect("echo");
count += 1;
}
let _ = done_tx.send(count);
});
let socket = ClientSocketBuilder::<Ident, Exec, DefaultClientConnectionHandler>::new(certificate, DefaultClientConnectionHandler).with_settings(settings.clone()).with_decoy_factory(decoy).with_flow_config(server_addr, flow_config).build().await.expect("client socket");
let max_payload = socket.max_data_payload();
let cap = if random_payload {
max_payload.min(PAYLOAD_MAX)
} else {
PAYLOAD_FIXED.min(max_payload)
};
let mut rng = rand::thread_rng();
let clusters = MSG_COUNT / CLUSTER_SIZE;
for cluster_idx in 0..clusters {
for msg_idx in 0..CLUSTER_SIZE {
let size = if random_payload {
rng.gen_range(PAYLOAD_MIN..=cap)
} else {
cap
};
let payload: Vec<u8> = (0..size).map(|j| (j % 256) as u8).collect();
socket.send_bytes(&payload).await.expect("send");
socket.receive_bytes().await.expect("receive echo");
if random_wait {
sleep_ms(rng.gen_range(INTRA_CLUSTER_MIN_MS..=INTRA_CLUSTER_MAX_MS)).await;
} else if msg_idx + 1 < CLUSTER_SIZE {
sleep_ms(INTRA_CLUSTER_MS).await;
}
}
if !random_wait && cluster_idx + 1 < clusters {
sleep_ms(INTER_CLUSTER_MS).await;
}
}
done_rx.await.expect("server task");
println!("use_case={use_case} random_payload={random_payload} random_wait={random_wait} done.");
}