phantom_protocol/observability/
mod.rs1pub(crate) mod atomics;
24pub mod attrs;
25pub(crate) mod bridge;
26pub mod config;
27pub(crate) mod instruments;
28pub mod snapshot;
29
30pub use attrs::{
31 leg_str, AeadAlgorithm, CookieOutcome, Direction, EarlyDataOutcome, FallbackReason,
32 HandshakeOutcome, PathValidationOutcome, PowOutcome, ProtocolVersion, ReplayReason,
33 ResumptionMode,
34};
35pub use config::{HistogramConfig, ObservabilityConfig, ObservabilityConfigBuilder};
36pub use snapshot::MetricsSnapshot;
37
38use crate::transport::types::LegType;
39use atomics::HotPathAtomics;
40use instruments::PhantomInstruments;
41use std::sync::Arc;
42
43#[derive(Debug)]
51pub struct Observability {
52 config: ObservabilityConfig,
53 atomics: Arc<HotPathAtomics>,
54 instruments: PhantomInstruments,
55}
56
57impl Observability {
58 pub fn new(config: ObservabilityConfig) -> Arc<Self> {
72 let instruments = PhantomInstruments::new(&config);
73 let atomics = Arc::new(HotPathAtomics::new());
74 bridge::register_observables(&atomics, &config);
79 Arc::new(Self {
80 config,
81 atomics,
82 instruments,
83 })
84 }
85
86 pub fn config(&self) -> &ObservabilityConfig {
88 &self.config
89 }
90
91 pub fn snapshot(&self) -> MetricsSnapshot {
93 MetricsSnapshot::capture(&self.atomics)
94 }
95
96 #[inline]
99 pub fn record_send(&self, bytes: usize, leg: LegType) {
100 self.atomics.record_send(bytes, leg);
101 }
102
103 #[inline]
104 pub fn record_recv(&self, bytes: usize, leg: LegType) {
105 self.atomics.record_recv(bytes, leg);
106 }
107
108 #[inline]
109 pub fn record_encrypt_ns(&self, duration_ns: u64) {
110 self.atomics.record_encrypt_ns(duration_ns);
111 }
112
113 #[inline]
114 pub fn record_decrypt_ns(&self, duration_ns: u64) {
115 self.atomics.record_decrypt_ns(duration_ns);
116 }
117
118 #[inline]
119 pub fn record_rtt_us(&self, rtt_us: u64, path_id: u8) {
120 self.atomics.record_rtt_us(rtt_us, path_id);
121 }
122
123 #[inline]
128 pub fn session_opened(&self, leg: LegType) {
129 self.atomics.session_opened();
130 self.instruments.session_opened(leg);
131 }
132
133 #[inline]
134 pub fn session_closed(&self, leg: LegType) {
135 self.atomics.session_closed();
136 self.instruments.session_closed(leg);
137 }
138
139 #[inline]
140 pub fn stream_opened(&self) {
141 self.atomics.stream_opened();
142 self.instruments.stream_opened();
143 }
144
145 #[inline]
146 pub fn stream_closed(&self) {
147 self.atomics.stream_closed();
148 self.instruments.stream_closed();
149 }
150
151 pub fn record_handshake_success(&self, duration_ns: u64) {
158 self.atomics.record_handshake_success(duration_ns);
159 }
160
161 #[inline]
164 pub fn record_handshake_failure(&self) {
165 self.atomics.record_handshake_failure();
166 }
167
168 pub fn record_handshake(
176 &self,
177 duration: std::time::Duration,
178 outcome: HandshakeOutcome,
179 leg: LegType,
180 cipher: AeadAlgorithm,
181 version: ProtocolVersion,
182 ) {
183 let duration_ns = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
184 match outcome {
185 HandshakeOutcome::Success => self.atomics.record_handshake_success(duration_ns),
186 HandshakeOutcome::Failure => self.atomics.record_handshake_failure(),
187 }
188 self.instruments.record_handshake_duration(
189 duration.as_secs_f64(),
190 outcome,
191 leg,
192 cipher,
193 version,
194 );
195 }
196
197 pub fn record_path_validation(
199 &self,
200 duration: std::time::Duration,
201 path_id: u8,
202 outcome: PathValidationOutcome,
203 ) {
204 self.instruments
205 .record_path_validation_duration(duration.as_secs_f64(), path_id, outcome);
206 }
207
208 pub fn record_resumption(&self, mode: ResumptionMode, accepted: bool) {
209 self.instruments.record_resumption(mode, accepted);
210 }
211
212 #[inline]
213 pub fn record_replay_rejected(&self, reason: ReplayReason) {
214 self.instruments.record_replay_rejected(reason);
215 }
216
217 #[inline]
218 pub fn record_aead_failure(&self, leg: LegType, algorithm: AeadAlgorithm) {
219 self.instruments.record_aead_failure(leg, algorithm);
220 }
221
222 #[inline]
223 pub fn record_unencrypted_dropped(&self, leg: LegType) {
224 self.instruments.record_unencrypted_dropped(leg);
225 }
226
227 pub fn record_path_migration(&self, from: u8, to: u8) {
228 self.instruments.record_path_migration(from, to);
229 }
230
231 pub fn record_cookie(&self, outcome: CookieOutcome) {
232 self.instruments.record_cookie(outcome);
233 }
234
235 pub fn record_pow(&self, outcome: PowOutcome, difficulty: u8) {
236 self.instruments.record_pow(outcome, difficulty);
237 }
238
239 pub fn record_early_data(&self, outcome: EarlyDataOutcome) {
240 self.instruments.record_early_data(outcome);
241 }
242
243 pub fn record_rekey(&self, direction: Direction) {
244 self.instruments.record_rekey(direction);
245 }
246
247 pub fn record_fallback(&self, from_leg: LegType, to_leg: LegType, reason: FallbackReason) {
248 self.instruments.record_fallback(from_leg, to_leg, reason);
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// A default-configured instance reports the `"phantom"` namespace.
    #[test]
    fn default_observability_has_default_config() {
        let default_cfg = ObservabilityConfig::default();
        let observability = Observability::new(default_cfg);
        assert_eq!(observability.config().namespace.as_ref(), "phantom");
    }

    /// The namespace set through the builder is visible through the returned
    /// `Arc` and through a second handle cloned from it.
    #[test]
    fn new_returns_arc_with_provided_config() {
        let custom_cfg = ObservabilityConfig::builder().namespace("myapp").build();
        let first_handle = Observability::new(custom_cfg);
        assert_eq!(first_handle.config().namespace.as_ref(), "myapp");

        let second_handle = Arc::clone(&first_handle);
        assert_eq!(second_handle.config().namespace.as_ref(), "myapp");
    }

    /// Values recorded through the facade show up in a subsequent snapshot.
    #[test]
    fn record_send_round_trips_through_snapshot() {
        let observability = Observability::new(ObservabilityConfig::default());

        // Two sends on TCP, one receive on KCP.
        for sent_bytes in [1024, 2048] {
            observability.record_send(sent_bytes, LegType::Tcp);
        }
        observability.record_recv(512, LegType::Kcp);

        // Two encryption samples; the snapshot is expected to average them.
        for encrypt_ns in [100, 300] {
            observability.record_encrypt_ns(encrypt_ns);
        }

        // One session, two streams, one RTT sample on path 0.
        observability.session_opened(LegType::Tcp);
        for _ in 0..2 {
            observability.stream_opened();
        }
        observability.record_rtt_us(5_000, 0);

        let snap = observability.snapshot();
        assert_eq!(snap.packets_sent, 2);
        assert_eq!(snap.packets_recv, 1);
        assert_eq!(snap.bytes_sent, 3072);
        assert_eq!(snap.bytes_recv, 512);
        assert_eq!(snap.avg_encrypt_ns, 200);
        assert_eq!(snap.encrypt_count, 2);
        assert_eq!(snap.active_sessions, 1);
        assert_eq!(snap.active_streams, 2);
        assert_eq!(snap.rtt_us_path_0, 5_000);
    }
}