#[cfg(test)]
#[path = "../tests/debug/mod.rs"]
mod tests;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use cfg_if::cfg_if;
use futures::future::{Either, select};
use log::{debug, info, trace};
use crate::bytes::StaticByteBuffer;
use crate::certificate::ClientCertificate;
use crate::defaults::{DefaultExecutor, DefaultSettings};
use crate::flow::config::{FakeBodyMode, FakeHeaderConfig, FlowConfig};
use crate::settings::keys;
use crate::socket::{ClientSocket, ClientSocketBuilder};
use crate::tailer::ClientConnectionHandler;
use crate::utils::sync::sleep;
use crate::utils::unix_timestamp_ms;
cfg_if! {
if #[cfg(feature = "server")] {
use crate::bytes::ByteBuffer;
use crate::settings::consts::DEFAULT_TYPHOON_ID_LENGTH;
use crate::tailer::ServerConnectionHandler;
use crate::utils::random::{SupportRng, get_rng};
}
}
/// Phase identifier written into the probe header for the reachability check.
pub const PHASE_REACHABILITY: u32 = 0;
/// Phase identifier written into the probe header for the return-time (RTT) measurement.
pub const PHASE_RETURN_TIME: u32 = 1;
/// Phase identifier written into the probe header for the throughput measurement.
pub const PHASE_THROUGHPUT: u32 = 2;
/// Size in bytes of the probe header laid out by `make_probe`:
/// 4-byte sequence number, 4-byte phase identifier, 8-byte send timestamp.
const PROBE_HEADER_SIZE: usize = 16;
/// Selects which probe phases `run_debug` executes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DebugMode {
    /// Run only the reachability phase.
    Reachability,
    /// Run only the return-time (RTT) phase.
    ReturnTime,
    /// Run only the throughput phase.
    Throughput,
    /// Run every phase.
    All,
}

impl DebugMode {
    /// Whether the reachability phase is part of this mode.
    #[inline]
    fn run_reachability(self) -> bool {
        match self {
            Self::Reachability | Self::All => true,
            Self::ReturnTime | Self::Throughput => false,
        }
    }

    /// Whether the return-time phase is part of this mode.
    #[inline]
    fn run_rtt(self) -> bool {
        match self {
            Self::ReturnTime | Self::All => true,
            Self::Reachability | Self::Throughput => false,
        }
    }

    /// Whether the throughput phase is part of this mode.
    #[inline]
    fn run_throughput(self) -> bool {
        match self {
            Self::Throughput | Self::All => true,
            Self::Reachability | Self::ReturnTime => false,
        }
    }
}
/// Outcome of a `run_debug` invocation.
///
/// Measurement fields stay `None` for phases that were not executed.
#[derive(Debug, Clone)]
pub struct DebugResult {
/// `Some(true)` when the reachability probe was answered before the timeout;
/// `Some(false)` when the socket could not be built, the send failed, or no
/// answer arrived in time; `None` when the reachability phase was not requested.
pub reachable: Option<bool>,
/// Milliseconds between the timestamp carried in the RTT response and the
/// moment the response was processed; `None` when no usable response was obtained.
pub rtt_ms: Option<f64>,
/// Bytes received during the throughput phase divided by the elapsed seconds
/// (i.e. bytes per second); `None` when nothing was received or no time elapsed.
pub throughput_bps: Option<f64>,
/// Running count of packets the probe attempted to send, summed over all executed phases.
pub packets_sent: usize,
/// Running count of responses received, summed over all executed phases.
pub packets_received: usize,
}
/// Stateless client-side connection handler used by the debug probe socket.
pub struct DebugClientConnectionHandler;
impl ClientConnectionHandler for DebugClientConnectionHandler {
/// Returns a buffer built from an empty slice, i.e. the probe carries no initial data.
fn initial_data(&self) -> StaticByteBuffer {
StaticByteBuffer::from_slice(&[])
}
/// Returns `StaticByteBuffer::empty(length)` as the version value.
// NOTE(review): presumably a `length`-byte buffer without meaningful content —
// confirm against `StaticByteBuffer::empty`.
fn version(&self, length: usize) -> StaticByteBuffer {
StaticByteBuffer::empty(length)
}
}
/// Stateless server-side connection handler for debug probing.
/// Only compiled when the `server` feature is enabled.
#[cfg(feature = "server")]
pub struct DebugServerConnectionHandler;
#[cfg(feature = "server")]
impl ServerConnectionHandler<StaticByteBuffer> for DebugServerConnectionHandler {
/// Ignores the client's initial data and returns a buffer holding
/// `DEFAULT_TYPHOON_ID_LENGTH` random bytes.
// NOTE(review): presumably this is the per-connection identity — confirm against
// the `ServerConnectionHandler::generate` contract.
fn generate(&self, _initial_data: &[u8]) -> StaticByteBuffer {
StaticByteBuffer::from_slice(get_rng().random_byte_buffer::<DEFAULT_TYPHOON_ID_LENGTH>().slice())
}
/// Returns a buffer built from an empty slice, regardless of `_identity`.
fn initial_data(&self, _identity: &StaticByteBuffer) -> StaticByteBuffer {
StaticByteBuffer::from_slice(&[])
}
/// Accepts every client version: always returns `true` without inspecting the bytes.
fn verify_version(&self, _version_bytes: &[u8]) -> bool {
true
}
}
/// Concrete client socket type used by the probe helpers in this module:
/// `StaticByteBuffer` buffers, the `DefaultExecutor`, and `DebugClientConnectionHandler`.
type DebugSocket = ClientSocket<StaticByteBuffer, DefaultExecutor, DebugClientConnectionHandler>;
/// Builds a new probe packet of `PROBE_HEADER_SIZE + extra` bytes.
///
/// Header layout (all fields big-endian):
/// * bytes `0..4`  — `sequence`
/// * bytes `4..8`  — `phase`
/// * bytes `8..16` — current `unix_timestamp_ms()` truncated to `u64`
///
/// The `extra` trailing bytes are zero-filled padding.
fn make_probe(phase: u32, sequence: u32, extra: usize) -> Vec<u8> {
    let mut packet = vec![0u8; PROBE_HEADER_SIZE + extra];

    // Carve the header into its fields instead of indexing by absolute range.
    let (sequence_field, rest) = packet.split_at_mut(4);
    let (phase_field, rest) = rest.split_at_mut(4);

    sequence_field.copy_from_slice(&sequence.to_be_bytes());
    phase_field.copy_from_slice(&phase.to_be_bytes());

    let sent_at_ms = unix_timestamp_ms() as u64;
    rest[..8].copy_from_slice(&sent_at_ms.to_be_bytes());

    packet
}
/// Re-stamps an existing probe in place so the same buffer can be reused for the next send.
///
/// Overwrites the big-endian sequence number in bytes `0..4` and the big-endian
/// send timestamp (`unix_timestamp_ms()` truncated to `u64`) in bytes `8..16`.
/// The phase field in bytes `4..8` and any payload after the header are left untouched.
///
/// Accepts any mutable byte slice; a `&mut Vec<u8>` argument coerces automatically.
///
/// # Panics
///
/// Panics if `buf` is shorter than 16 bytes.
fn stamp_probe(buf: &mut [u8], sequence: u32) {
    buf[0..4].copy_from_slice(&sequence.to_be_bytes());
    buf[8..16].copy_from_slice(&(unix_timestamp_ms() as u64).to_be_bytes());
}
/// Extracts the big-endian send timestamp stored in bytes `8..16` of a probe.
///
/// Returns `None` when `data` is shorter than 16 bytes.
fn parse_send_time(data: &[u8]) -> Option<u64> {
    let field = data.get(8..16)?;
    let raw: [u8; 8] = field.try_into().ok()?;
    Some(u64::from_be_bytes(raw))
}
/// Waits for the next payload from `socket`, giving up after `timeout_ms` milliseconds.
///
/// Returns `Some(bytes)` only when the receive completes successfully before the
/// timer fires; a receive error and a timeout both yield `None`.
async fn recv_or_timeout(socket: &DebugSocket, timeout_ms: u64) -> Option<Vec<u8>> {
    let deadline = Duration::from_millis(timeout_ms);
    let recv_fut = pin!(socket.receive_bytes());
    let sleep_fut = pin!(sleep(deadline));

    // Race the receive against the timer; whichever completes first decides the outcome.
    let outcome = select(recv_fut, sleep_fut).await;
    match outcome {
        Either::Left((Ok(bytes), _)) => Some(bytes),
        Either::Left((Err(_), _)) | Either::Right(_) => None,
    }
}
pub async fn run_debug(certificate: ClientCertificate, mode: DebugMode, settings: Arc<DefaultSettings>) -> DebugResult {
let timeout_ms = settings.get(&keys::DEBUG_PROBE_TIMEOUT);
let mut result = DebugResult {
reachable: None,
rtt_ms: None,
throughput_bps: None,
packets_sent: 0,
packets_received: 0,
};
let mut builder = ClientSocketBuilder::<StaticByteBuffer, DefaultExecutor, DebugClientConnectionHandler>::new(certificate.clone(), DebugClientConnectionHandler).with_settings(settings.clone());
for &addr in certificate.addresses() {
builder = builder.with_flow_config(addr, FlowConfig::new(FakeBodyMode::Empty, FakeHeaderConfig::new(vec![])));
}
let socket = match builder.build().await {
Ok(s) => s,
Err(_) => {
if mode.run_reachability() {
result.reachable = Some(false);
}
return result;
}
};
if mode.run_reachability() {
info!("debug probe: reachability phase");
let probe = make_probe(PHASE_REACHABILITY, 0, 0);
result.packets_sent += 1;
trace!("debug probe: sent reachability probe ({} bytes)", probe.len());
if socket.send_bytes(&probe).await.is_ok() {
if recv_or_timeout(&socket, timeout_ms).await.is_some() {
result.packets_received += 1;
result.reachable = Some(true);
debug!("debug probe: reachability OK");
} else {
result.reachable = Some(false);
debug!("debug probe: reachability timeout");
}
} else {
result.reachable = Some(false);
}
}
if mode.run_rtt() {
info!("debug probe: return-time phase");
let probe = make_probe(PHASE_RETURN_TIME, 0, 0);
result.packets_sent += 1;
trace!("debug probe: sent rtt probe ({} bytes)", probe.len());
if socket.send_bytes(&probe).await.is_ok() {
if let Some(response) = recv_or_timeout(&socket, timeout_ms).await {
result.packets_received += 1;
if let Some(send_time) = parse_send_time(&response) {
let rtt = unix_timestamp_ms().saturating_sub(send_time as u128);
result.rtt_ms = Some(rtt as f64);
debug!("debug probe: RTT={:.1}ms", rtt);
}
} else {
debug!("debug probe: RTT probe timed out");
}
}
}
if mode.run_throughput() {
let probe_count = settings.get(&keys::DEBUG_PROBE_COUNT) as usize;
let probe_size = settings.get(&keys::DEBUG_PROBE_SIZE) as usize;
let max_data_payload = socket.max_data_payload();
let probe_payload_size = PROBE_HEADER_SIZE + probe_size;
let chunks_per_probe = probe_payload_size.div_ceil(max_data_payload);
let total_echo_packets = probe_count * chunks_per_probe;
info!("debug probe: throughput phase — {} probe(s) × {}B payload, max_data_payload={}B → {} echo packet(s) expected", probe_count, probe_payload_size, max_data_payload, total_echo_packets);
let mut probe_buf = make_probe(PHASE_THROUGHPUT, 0, probe_size);
let start_ms = unix_timestamp_ms();
for seq in 0..probe_count as u32 {
stamp_probe(&mut probe_buf, seq);
result.packets_sent += chunks_per_probe;
trace!("debug probe: sending throughput probe seq={} ({} UDP packet(s))", seq, chunks_per_probe);
if socket.send_bytes(&probe_buf).await.is_err() {
debug!("debug probe: send error on seq={}, aborting", seq);
break;
}
}
let mut received_bytes: usize = 0;
for i in 0..total_echo_packets {
if let Some(response) = recv_or_timeout(&socket, timeout_ms).await {
result.packets_received += 1;
received_bytes += response.len();
trace!("debug probe: echo {}/{} ({} bytes, total {}B)", i + 1, total_echo_packets, response.len(), received_bytes);
} else {
debug!("debug probe: echo {}/{} timed out", i + 1, total_echo_packets);
}
}
let elapsed_ms = unix_timestamp_ms().saturating_sub(start_ms);
info!("debug probe: throughput summary — sent {} UDP packet(s), received {} / {} echo(s) ({:.1}% delivery)", result.packets_sent, result.packets_received, total_echo_packets, 100.0 * result.packets_received as f64 / total_echo_packets as f64);
if elapsed_ms > 0 && received_bytes > 0 {
let bps = received_bytes as f64 / (elapsed_ms as f64 / 1000.0);
result.throughput_bps = Some(bps);
info!("debug probe: throughput={}B/s ({} bytes in {}ms)", bps as u64, received_bytes, elapsed_ms);
}
}
result
}