use std::ops::Deref;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::Arc;
#[cfg(target_os = "linux")]
use std::sync::OnceLock;
use std::sync::RwLock;
use std::time::Duration;
use std::time::SystemTime;
pub trait AsSocketStats {
fn as_socket_stats(&self) -> SocketStats;
fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
None
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct SocketStats {
pub pmtu: u16,
pub rtt_us: i64,
pub min_rtt_us: i64,
pub max_rtt_us: i64,
pub rtt_var_us: i64,
pub cwnd: u64,
pub total_pto_count: u64,
pub packets_sent: u64,
pub packets_recvd: u64,
pub packets_lost: u64,
pub packets_lost_spurious: u64,
pub packets_retrans: u64,
pub bytes_sent: u64,
pub bytes_recvd: u64,
pub bytes_lost: u64,
pub bytes_retrans: u64,
pub bytes_unsent: u64,
pub delivery_rate: u64,
pub max_bandwidth: Option<u64>,
pub startup_exit: Option<StartupExit>,
pub bytes_in_flight_duration_us: u64,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StartupExit {
pub cwnd: usize,
pub bandwidth: Option<u64>,
pub reason: StartupExitReason,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StartupExitReason {
Loss,
BandwidthPlateau,
PersistentQueue,
ConservativeSlowStartRounds,
}
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug)]
pub struct QuicAuditStats {
recvd_conn_close_transport_error_code: AtomicI64,
sent_conn_close_transport_error_code: AtomicI64,
recvd_conn_close_application_error_code: AtomicI64,
sent_conn_close_application_error_code: AtomicI64,
transport_handshake_duration_us: AtomicI64,
transport_handshake_start: Arc<RwLock<Option<SystemTime>>>,
connection_close_reason: RwLock<Option<BoxError>>,
max_bandwidth: AtomicU64,
max_loss_pct: AtomicU8,
#[cfg(target_os = "linux")]
initial_so_mark: OnceLock<[u8; 4]>,
pub quic_connection_id: Vec<u8>,
}
impl QuicAuditStats {
#[inline]
pub fn new(quic_connection_id: Vec<u8>) -> Self {
Self {
recvd_conn_close_transport_error_code: AtomicI64::new(-1),
sent_conn_close_transport_error_code: AtomicI64::new(-1),
recvd_conn_close_application_error_code: AtomicI64::new(-1),
sent_conn_close_application_error_code: AtomicI64::new(-1),
transport_handshake_duration_us: AtomicI64::new(-1),
transport_handshake_start: Arc::new(RwLock::new(None)),
connection_close_reason: RwLock::new(None),
max_bandwidth: AtomicU64::new(0),
max_loss_pct: AtomicU8::new(0),
#[cfg(target_os = "linux")]
initial_so_mark: OnceLock::new(),
quic_connection_id,
}
}
#[inline]
pub fn recvd_conn_close_transport_error_code(&self) -> i64 {
self.recvd_conn_close_transport_error_code
.load(Ordering::SeqCst)
}
#[inline]
pub fn sent_conn_close_transport_error_code(&self) -> i64 {
self.sent_conn_close_transport_error_code
.load(Ordering::SeqCst)
}
#[inline]
pub fn recvd_conn_close_application_error_code(&self) -> i64 {
self.recvd_conn_close_application_error_code
.load(Ordering::SeqCst)
}
#[inline]
pub fn sent_conn_close_application_error_code(&self) -> i64 {
self.sent_conn_close_application_error_code
.load(Ordering::SeqCst)
}
#[inline]
pub fn set_recvd_conn_close_transport_error_code(
&self, recvd_conn_close_transport_error_code: i64,
) {
self.recvd_conn_close_transport_error_code
.store(recvd_conn_close_transport_error_code, Ordering::SeqCst)
}
#[inline]
pub fn set_sent_conn_close_transport_error_code(
&self, sent_conn_close_transport_error_code: i64,
) {
self.sent_conn_close_transport_error_code
.store(sent_conn_close_transport_error_code, Ordering::SeqCst)
}
#[inline]
pub fn set_recvd_conn_close_application_error_code(
&self, recvd_conn_close_application_error_code: i64,
) {
self.recvd_conn_close_application_error_code
.store(recvd_conn_close_application_error_code, Ordering::SeqCst)
}
#[inline]
pub fn set_sent_conn_close_application_error_code(
&self, sent_conn_close_application_error_code: i64,
) {
self.sent_conn_close_application_error_code
.store(sent_conn_close_application_error_code, Ordering::SeqCst)
}
#[inline]
pub fn transport_handshake_duration_us(&self) -> i64 {
self.transport_handshake_duration_us.load(Ordering::SeqCst)
}
#[inline]
pub fn set_transport_handshake_start(&self, start_time: SystemTime) {
*self.transport_handshake_start.write().unwrap() = Some(start_time);
}
#[inline]
pub fn set_transport_handshake_duration(&self, duration: Duration) {
let dur = i64::try_from(duration.as_micros()).unwrap_or(-1);
self.transport_handshake_duration_us
.store(dur, Ordering::SeqCst);
}
#[inline]
pub fn transport_handshake_start(&self) -> Arc<RwLock<Option<SystemTime>>> {
Arc::clone(&self.transport_handshake_start)
}
#[inline]
pub fn connection_close_reason(
&self,
) -> impl Deref<Target = Option<BoxError>> + '_ {
self.connection_close_reason.read().unwrap()
}
#[inline]
pub fn set_connection_close_reason(&self, error: BoxError) {
*self.connection_close_reason.write().unwrap() = Some(error);
}
#[inline]
pub fn set_max_bandwidth(&self, max_bandwidth: u64) {
self.max_bandwidth.store(max_bandwidth, Ordering::Release)
}
#[inline]
pub fn max_bandwidth(&self) -> u64 {
self.max_bandwidth.load(Ordering::Acquire)
}
#[inline]
pub fn set_max_loss_pct(&self, max_loss_pct: u8) {
self.max_loss_pct.store(max_loss_pct, Ordering::Release)
}
#[inline]
pub fn max_loss_pct(&self) -> u8 {
self.max_loss_pct.load(Ordering::Acquire)
}
#[inline]
#[cfg(target_os = "linux")]
pub fn set_initial_so_mark_data(&self, value: Option<[u8; 4]>) {
if let Some(inner) = value {
let _ = self.initial_so_mark.set(inner);
}
}
#[inline]
#[cfg(target_os = "linux")]
pub fn initial_so_mark_data(&self) -> Option<&[u8; 4]> {
self.initial_so_mark.get()
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum StreamClosureKind {
None,
Implicit,
Explicit,
}