pub(crate) mod atomics;
pub mod attrs;
pub(crate) mod bridge;
pub mod config;
pub(crate) mod instruments;
pub mod snapshot;
pub use attrs::{
leg_str, AeadAlgorithm, CookieOutcome, Direction, EarlyDataOutcome, FallbackReason,
HandshakeOutcome, PathValidationOutcome, PowOutcome, ProtocolVersion, ReplayReason,
ResumptionMode,
};
pub use config::{HistogramConfig, ObservabilityConfig, ObservabilityConfigBuilder};
pub use snapshot::MetricsSnapshot;
use crate::transport::types::LegType;
use atomics::HotPathAtomics;
use instruments::PhantomInstruments;
use std::sync::Arc;
/// Facade that bundles the crate's observability state: the configuration it
/// was built with, the hot-path atomics, and the instrument set.
///
/// Instances are produced by [`Observability::new`], which returns them
/// wrapped in an [`Arc`].
#[derive(Debug)]
pub struct Observability {
/// Configuration passed to [`Observability::new`]; exposed read-only through
/// [`Observability::config`].
config: ObservabilityConfig,
/// Hot-path atomics updated by the recording methods and read by
/// [`Observability::snapshot`]. The same `Arc` is handed to
/// `bridge::register_observables` during construction.
atomics: Arc<HotPathAtomics>,
/// Instrument set created from the configuration in [`Observability::new`].
instruments: PhantomInstruments,
}
impl Observability {
    /// Builds the observability facade for `config` and returns it behind an
    /// [`Arc`].
    ///
    /// Construction order: the instruments are created from `config`, a fresh
    /// set of hot-path atomics is allocated, and both the atomics and `config`
    /// are passed to `bridge::register_observables` before the facade itself is
    /// assembled.
    pub fn new(config: ObservabilityConfig) -> Arc<Self> {
        let meters = PhantomInstruments::new(&config);
        let counters = Arc::new(HotPathAtomics::new());
        bridge::register_observables(&counters, &config);

        Arc::new(Self {
            config,
            atomics: counters,
            instruments: meters,
        })
    }

    /// Returns the configuration this facade was created with.
    pub fn config(&self) -> &ObservabilityConfig {
        let Self { config, .. } = self;
        config
    }

    /// Captures a [`MetricsSnapshot`] from the current state of the hot-path
    /// atomics.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let counters = &self.atomics;
        MetricsSnapshot::capture(counters)
    }

    /// Records a send of `bytes` on `leg`.
    ///
    /// Only the hot-path atomics are updated; no instrument is touched.
    #[inline]
    pub fn record_send(&self, bytes: usize, leg: LegType) {
        let counters = &self.atomics;
        counters.record_send(bytes, leg);
    }

    /// Records a receive of `bytes` on `leg`.
    ///
    /// Only the hot-path atomics are updated; no instrument is touched.
    #[inline]
    pub fn record_recv(&self, bytes: usize, leg: LegType) {
        let counters = &self.atomics;
        counters.record_recv(bytes, leg);
    }

    /// Records one encryption timing sample (`duration_ns`) in the hot-path
    /// atomics.
    #[inline]
    pub fn record_encrypt_ns(&self, duration_ns: u64) {
        let counters = &self.atomics;
        counters.record_encrypt_ns(duration_ns);
    }

    /// Records one decryption timing sample (`duration_ns`) in the hot-path
    /// atomics.
    #[inline]
    pub fn record_decrypt_ns(&self, duration_ns: u64) {
        let counters = &self.atomics;
        counters.record_decrypt_ns(duration_ns);
    }

    /// Records a round-trip-time sample (`rtt_us`) for the path identified by
    /// `path_id` in the hot-path atomics.
    #[inline]
    pub fn record_rtt_us(&self, rtt_us: u64, path_id: u8) {
        let counters = &self.atomics;
        counters.record_rtt_us(rtt_us, path_id);
    }

    /// Signals that a session was opened on `leg`.
    ///
    /// The atomics are notified first (without the leg), then the instruments
    /// (with the leg).
    #[inline]
    pub fn session_opened(&self, leg: LegType) {
        let Self {
            atomics,
            instruments,
            ..
        } = self;
        atomics.session_opened();
        instruments.session_opened(leg);
    }

    /// Signals that a session on `leg` was closed.
    ///
    /// The atomics are notified first (without the leg), then the instruments
    /// (with the leg).
    #[inline]
    pub fn session_closed(&self, leg: LegType) {
        let Self {
            atomics,
            instruments,
            ..
        } = self;
        atomics.session_closed();
        instruments.session_closed(leg);
    }

    /// Signals that a stream was opened, notifying the atomics and then the
    /// instruments.
    #[inline]
    pub fn stream_opened(&self) {
        let Self {
            atomics,
            instruments,
            ..
        } = self;
        atomics.stream_opened();
        instruments.stream_opened();
    }

    /// Signals that a stream was closed, notifying the atomics and then the
    /// instruments.
    #[inline]
    pub fn stream_closed(&self) {
        let Self {
            atomics,
            instruments,
            ..
        } = self;
        atomics.stream_closed();
        instruments.stream_closed();
    }

    /// Records a successful handshake that took `duration_ns`.
    ///
    /// Only the hot-path atomics are updated. Use [`Self::record_handshake`]
    /// when the instruments should receive the attributed duration as well.
    pub fn record_handshake_success(&self, duration_ns: u64) {
        let counters = &self.atomics;
        counters.record_handshake_success(duration_ns);
    }

    /// Records a failed handshake in the hot-path atomics only.
    #[inline]
    pub fn record_handshake_failure(&self) {
        let counters = &self.atomics;
        counters.record_handshake_failure();
    }

    /// Records a completed handshake attempt in both the atomics and the
    /// instruments.
    ///
    /// * `duration` - how long the handshake took. For the atomics it is
    ///   converted to whole nanoseconds, saturating at `u64::MAX`; the
    ///   instruments receive it as fractional seconds.
    /// * `outcome` - selects whether the atomics record a success (with the
    ///   duration) or a failure (without it).
    /// * `leg`, `cipher`, `version` - forwarded unchanged to the instruments
    ///   alongside the duration and outcome.
    pub fn record_handshake(
        &self,
        duration: std::time::Duration,
        outcome: HandshakeOutcome,
        leg: LegType,
        cipher: AeadAlgorithm,
        version: ProtocolVersion,
    ) {
        // `as_nanos` yields a `u128`; clamp instead of failing on overflow.
        let nanos = duration.as_nanos();
        let duration_ns = u64::try_from(nanos).unwrap_or(u64::MAX);
        let seconds = duration.as_secs_f64();

        match outcome {
            HandshakeOutcome::Success => {
                self.atomics.record_handshake_success(duration_ns);
            }
            HandshakeOutcome::Failure => {
                self.atomics.record_handshake_failure();
            }
        }

        self.instruments
            .record_handshake_duration(seconds, outcome, leg, cipher, version);
    }

    /// Records how long validating path `path_id` took and how it ended.
    ///
    /// The duration is passed to the instruments as fractional seconds; the
    /// atomics are not involved.
    pub fn record_path_validation(
        &self,
        duration: std::time::Duration,
        path_id: u8,
        outcome: PathValidationOutcome,
    ) {
        let seconds = duration.as_secs_f64();
        self.instruments
            .record_path_validation_duration(seconds, path_id, outcome);
    }

    /// Records a resumption attempt of the given `mode` and whether it was
    /// `accepted` (instruments only).
    pub fn record_resumption(&self, mode: ResumptionMode, accepted: bool) {
        let meters = &self.instruments;
        meters.record_resumption(mode, accepted);
    }

    /// Records a replay rejection with its `reason` (instruments only).
    #[inline]
    pub fn record_replay_rejected(&self, reason: ReplayReason) {
        let meters = &self.instruments;
        meters.record_replay_rejected(reason);
    }

    /// Records an AEAD failure for `algorithm` on `leg` (instruments only).
    #[inline]
    pub fn record_aead_failure(&self, leg: LegType, algorithm: AeadAlgorithm) {
        let meters = &self.instruments;
        meters.record_aead_failure(leg, algorithm);
    }

    /// Records that an unencrypted item was dropped on `leg` (instruments
    /// only).
    #[inline]
    pub fn record_unencrypted_dropped(&self, leg: LegType) {
        let meters = &self.instruments;
        meters.record_unencrypted_dropped(leg);
    }

    /// Records a path migration from path `from` to path `to` (instruments
    /// only).
    pub fn record_path_migration(&self, from: u8, to: u8) {
        let meters = &self.instruments;
        meters.record_path_migration(from, to);
    }

    /// Records a cookie event with the given `outcome` (instruments only).
    pub fn record_cookie(&self, outcome: CookieOutcome) {
        let meters = &self.instruments;
        meters.record_cookie(outcome);
    }

    /// Records a proof-of-work event with its `outcome` and `difficulty`
    /// (instruments only).
    pub fn record_pow(&self, outcome: PowOutcome, difficulty: u8) {
        let meters = &self.instruments;
        meters.record_pow(outcome, difficulty);
    }

    /// Records an early-data event with the given `outcome` (instruments
    /// only).
    pub fn record_early_data(&self, outcome: EarlyDataOutcome) {
        let meters = &self.instruments;
        meters.record_early_data(outcome);
    }

    /// Records a rekey in the given `direction` (instruments only).
    pub fn record_rekey(&self, direction: Direction) {
        let meters = &self.instruments;
        meters.record_rekey(direction);
    }

    /// Records a fallback from `from_leg` to `to_leg` together with its
    /// `reason` (instruments only).
    pub fn record_fallback(&self, from_leg: LegType, to_leg: LegType, reason: FallbackReason) {
        let meters = &self.instruments;
        meters.record_fallback(from_leg, to_leg, reason);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A facade built from the default configuration reports the `"phantom"`
    /// namespace.
    #[test]
    fn default_observability_has_default_config() {
        let default_config = ObservabilityConfig::default();
        let observability = Observability::new(default_config);
        assert_eq!(observability.config().namespace.as_ref(), "phantom");
    }

    /// The configuration handed to `new` is visible through the returned `Arc`
    /// and through any clone of it.
    #[test]
    fn new_returns_arc_with_provided_config() {
        let custom = ObservabilityConfig::builder().namespace("myapp").build();
        let original = Observability::new(custom);
        assert_eq!(original.config().namespace.as_ref(), "myapp");

        let shared = Arc::clone(&original);
        assert_eq!(shared.config().namespace.as_ref(), "myapp");
    }

    /// Values recorded through the facade show up in a subsequent snapshot.
    #[test]
    fn record_send_round_trips_through_snapshot() {
        let observability = Observability::new(ObservabilityConfig::default());

        // Traffic: two sends on TCP, one receive on KCP.
        for payload in [1024, 2048] {
            observability.record_send(payload, LegType::Tcp);
        }
        observability.record_recv(512, LegType::Kcp);

        // Two encryption timing samples.
        for sample_ns in [100, 300] {
            observability.record_encrypt_ns(sample_ns);
        }

        // One session, two streams, one RTT sample on path 0.
        observability.session_opened(LegType::Tcp);
        for _ in 0..2 {
            observability.stream_opened();
        }
        observability.record_rtt_us(5_000, 0);

        let snap = observability.snapshot();
        assert_eq!(snap.packets_sent, 2);
        assert_eq!(snap.packets_recv, 1);
        assert_eq!(snap.bytes_sent, 3072);
        assert_eq!(snap.bytes_recv, 512);
        assert_eq!(snap.avg_encrypt_ns, 200);
        assert_eq!(snap.encrypt_count, 2);
        assert_eq!(snap.active_sessions, 1);
        assert_eq!(snap.active_streams, 2);
        assert_eq!(snap.rtt_us_path_0, 5_000);
    }
}