Skip to main content

phantom_protocol/observability/
mod.rs

1//! Phantom Protocol observability subsystem.
2//!
3//! Replaces the Phase 4.5 hand-rolled metrics module (`transport::metrics`).
4//! Lock-free hot-path atomics for per-packet recording plus opt-in
5//! OpenTelemetry instruments (metrics + traces) gated behind the
6//! `telemetry-otel` Cargo feature.
7//!
8//! See `docs/observability/refactor-plan.md` for the full design and
9//! `docs/observability/metrics-catalog.md` for the instrument inventory.
10//! This file is the public module surface; concrete types live in the
11//! submodules below.
12//!
13//! ## Layout
14//!
15//! - `atomics` — lock-free hot-path counters (`HotPathAtomics`).
16//! - [`config`] — [`ObservabilityConfig`] (namespace, histogram buckets).
17//! - `instruments` — OTel instrument holder; ZST no-op when the feature
18//!   is off.
19//! - `bridge` — registers `ObservableCounter` callbacks over the atomics.
20//! - [`attrs`] — typed attribute-value enums (the cardinality contract).
21//! - [`snapshot`] — [`MetricsSnapshot`], a cold-path read for FFI / debug.
22
23pub(crate) mod atomics;
24pub mod attrs;
25pub(crate) mod bridge;
26pub mod config;
27pub(crate) mod instruments;
28pub mod snapshot;
29
30pub use attrs::{
31    leg_str, AeadAlgorithm, CookieOutcome, Direction, EarlyDataOutcome, FallbackReason,
32    HandshakeOutcome, PathValidationOutcome, PowOutcome, ProtocolVersion, ReplayReason,
33    ResumptionMode,
34};
35pub use config::{HistogramConfig, ObservabilityConfig, ObservabilityConfigBuilder};
36pub use snapshot::MetricsSnapshot;
37
38use crate::transport::types::LegType;
39use atomics::HotPathAtomics;
40use instruments::PhantomInstruments;
41use std::sync::Arc;
42
43/// Public observability facade.
44///
45/// Wraps the lock-free atomic counters (always present) and — in later
46/// rollout steps — the feature-gated OpenTelemetry instrument holder.
47/// Recording sites in `transport`, `api`, and `crypto` call methods on this
48/// struct via an `Arc<Observability>` borrowed from `PhantomListener` /
49/// `PhantomSession`.
50#[derive(Debug)]
51pub struct Observability {
52    config: ObservabilityConfig,
53    atomics: Arc<HotPathAtomics>,
54    instruments: PhantomInstruments,
55}
56
57impl Observability {
58    /// Construct a new observability handle.
59    ///
60    /// Returns an `Arc` because the handle is shared between recording sites
61    /// (in `Session`, `Listener`, handshake code paths) and the OTel
62    /// observable callbacks.
63    ///
64    /// **Call once per process.** When `telemetry-otel` is on, this
65    /// registers observable-instrument callbacks against the *global* OTel
66    /// meter; those callbacks are never unregistered (the meter owns them
67    /// for process life). Constructing many `Observability` instances would
68    /// therefore accumulate callbacks. `PhantomListener` / `PhantomSession`
69    /// each hold one shared `Arc<Observability>` — that is the intended
70    /// usage.
71    pub fn new(config: ObservabilityConfig) -> Arc<Self> {
72        let instruments = PhantomInstruments::new(&config);
73        let atomics = Arc::new(HotPathAtomics::new());
74        // Register OTel observable callbacks that read the atomic counters
75        // on each export tick. The atomics live behind `Arc` so the
76        // callbacks own a strong ref independent of `Observability`'s
77        // lifetime.
78        bridge::register_observables(&atomics, &config);
79        Arc::new(Self {
80            config,
81            atomics,
82            instruments,
83        })
84    }
85
86    /// Borrow the captured configuration.
87    pub fn config(&self) -> &ObservabilityConfig {
88        &self.config
89    }
90
91    /// Capture a cold-path snapshot of all counters and gauges.
92    pub fn snapshot(&self) -> MetricsSnapshot {
93        MetricsSnapshot::capture(&self.atomics)
94    }
95
96    // --- Hot path recording ---
97
98    #[inline]
99    pub fn record_send(&self, bytes: usize, leg: LegType) {
100        self.atomics.record_send(bytes, leg);
101    }
102
103    #[inline]
104    pub fn record_recv(&self, bytes: usize, leg: LegType) {
105        self.atomics.record_recv(bytes, leg);
106    }
107
108    #[inline]
109    pub fn record_encrypt_ns(&self, duration_ns: u64) {
110        self.atomics.record_encrypt_ns(duration_ns);
111    }
112
113    #[inline]
114    pub fn record_decrypt_ns(&self, duration_ns: u64) {
115        self.atomics.record_decrypt_ns(duration_ns);
116    }
117
118    #[inline]
119    pub fn record_rtt_us(&self, rtt_us: u64, path_id: u8) {
120        self.atomics.record_rtt_us(rtt_us, path_id);
121    }
122
123    // --- Gauges ---
124
125    /// Mark a new session as opened. Updates both the lock-free gauge and
126    /// the OTel `UpDownCounter` (when the `telemetry-otel` feature is on).
127    #[inline]
128    pub fn session_opened(&self, leg: LegType) {
129        self.atomics.session_opened();
130        self.instruments.session_opened(leg);
131    }
132
133    #[inline]
134    pub fn session_closed(&self, leg: LegType) {
135        self.atomics.session_closed();
136        self.instruments.session_closed(leg);
137    }
138
139    #[inline]
140    pub fn stream_opened(&self) {
141        self.atomics.stream_opened();
142        self.instruments.stream_opened();
143    }
144
145    #[inline]
146    pub fn stream_closed(&self) {
147        self.atomics.stream_closed();
148        self.instruments.stream_closed();
149    }
150
151    /// Record a successful handshake completion with its duration (ns).
152    ///
153    /// Transitional API — step 7 introduces a labeled `record_handshake`
154    /// that takes `(outcome, leg, cipher_suite, version)` and writes to an
155    /// OTel `Histogram`. Until then, the duration accumulates in atomic
156    /// sum+count fields surfaced via [`Self::snapshot`].
157    pub fn record_handshake_success(&self, duration_ns: u64) {
158        self.atomics.record_handshake_success(duration_ns);
159    }
160
161    /// Record a handshake failure. Cause attribution (cookie / signature /
162    /// transcript / KEM) lands with the labeled API in step 7.
163    #[inline]
164    pub fn record_handshake_failure(&self) {
165        self.atomics.record_handshake_failure();
166    }
167
168    // --- Labeled OTel event recorders (step 7) ---
169
170    /// Record a handshake outcome and its latency with full OTel
171    /// attribution. The lock-free atomic counters and the OTel
172    /// `Histogram` (`{ns}.handshake.duration`) are updated together; the
173    /// histogram's `_count` series — sliced by the `outcome` attribute —
174    /// is the canonical handshake count, so there is no separate counter.
175    pub fn record_handshake(
176        &self,
177        duration: std::time::Duration,
178        outcome: HandshakeOutcome,
179        leg: LegType,
180        cipher: AeadAlgorithm,
181        version: ProtocolVersion,
182    ) {
183        let duration_ns = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
184        match outcome {
185            HandshakeOutcome::Success => self.atomics.record_handshake_success(duration_ns),
186            HandshakeOutcome::Failure => self.atomics.record_handshake_failure(),
187        }
188        self.instruments.record_handshake_duration(
189            duration.as_secs_f64(),
190            outcome,
191            leg,
192            cipher,
193            version,
194        );
195    }
196
197    /// Record a `PATH_VALIDATION` exchange latency.
198    pub fn record_path_validation(
199        &self,
200        duration: std::time::Duration,
201        path_id: u8,
202        outcome: PathValidationOutcome,
203    ) {
204        self.instruments
205            .record_path_validation_duration(duration.as_secs_f64(), path_id, outcome);
206    }
207
208    pub fn record_resumption(&self, mode: ResumptionMode, accepted: bool) {
209        self.instruments.record_resumption(mode, accepted);
210    }
211
212    #[inline]
213    pub fn record_replay_rejected(&self, reason: ReplayReason) {
214        self.instruments.record_replay_rejected(reason);
215    }
216
217    #[inline]
218    pub fn record_aead_failure(&self, leg: LegType, algorithm: AeadAlgorithm) {
219        self.instruments.record_aead_failure(leg, algorithm);
220    }
221
222    #[inline]
223    pub fn record_unencrypted_dropped(&self, leg: LegType) {
224        self.instruments.record_unencrypted_dropped(leg);
225    }
226
227    pub fn record_path_migration(&self, from: u8, to: u8) {
228        self.instruments.record_path_migration(from, to);
229    }
230
231    pub fn record_cookie(&self, outcome: CookieOutcome) {
232        self.instruments.record_cookie(outcome);
233    }
234
235    pub fn record_pow(&self, outcome: PowOutcome, difficulty: u8) {
236        self.instruments.record_pow(outcome, difficulty);
237    }
238
239    pub fn record_early_data(&self, outcome: EarlyDataOutcome) {
240        self.instruments.record_early_data(outcome);
241    }
242
243    pub fn record_rekey(&self, direction: Direction) {
244        self.instruments.record_rekey(direction);
245    }
246
247    pub fn record_fallback(&self, from_leg: LegType, to_leg: LegType, reason: FallbackReason) {
248        self.instruments.record_fallback(from_leg, to_leg, reason);
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the `"phantom"` namespace.
    #[test]
    fn default_observability_has_default_config() {
        let obs = Observability::new(ObservabilityConfig::default());
        assert_eq!(obs.config().namespace.as_ref(), "phantom");
    }

    /// `new` stores the supplied configuration, and cloning the returned
    /// `Arc` yields a second handle to the same instance.
    #[test]
    fn new_returns_arc_with_provided_config() {
        let cfg = ObservabilityConfig::builder().namespace("myapp").build();
        let obs = Observability::new(cfg);
        assert_eq!(obs.config().namespace.as_ref(), "myapp");

        // Cloning the Arc preserves identity: both handles must point at the
        // same allocation, not merely at equal configurations.
        let obs2 = Arc::clone(&obs);
        assert!(Arc::ptr_eq(&obs, &obs2));
        assert_eq!(obs2.config().namespace.as_ref(), "myapp");
    }

    /// Values written through the recording methods are visible in a
    /// subsequent snapshot.
    #[test]
    fn record_send_round_trips_through_snapshot() {
        let obs = Observability::new(ObservabilityConfig::default());
        obs.record_send(1024, LegType::Tcp);
        obs.record_send(2048, LegType::Tcp);
        obs.record_recv(512, LegType::Kcp);
        obs.record_encrypt_ns(100);
        obs.record_encrypt_ns(300);
        obs.session_opened(LegType::Tcp);
        obs.stream_opened();
        obs.stream_opened();
        obs.record_rtt_us(5_000, 0);

        let s = obs.snapshot();
        // Two sends, one receive.
        assert_eq!(s.packets_sent, 2);
        assert_eq!(s.packets_recv, 1);
        assert_eq!(s.bytes_sent, 3072);
        assert_eq!(s.bytes_recv, 512);
        // (100 + 300) / 2 samples.
        assert_eq!(s.avg_encrypt_ns, 200);
        assert_eq!(s.encrypt_count, 2);
        assert_eq!(s.active_sessions, 1);
        assert_eq!(s.active_streams, 2);
        assert_eq!(s.rtt_us_path_0, 5_000);
    }
}