net/adapter/net/cortex/rpc_observer.rs
//! `RpcObserver` hook — an observability seam on the typed-nRPC
//! dispatch path. It fires on every `call_typed` completion (caller
//! side) with the metadata an operator-facing consumer (the deck's
//! NRPC view, a metrics exporter, a tracing bridge) wants: caller,
//! callee, method, latency, status, and byte counts.
//!
//! The observer is installed at the `MeshNode` level via
//! [`super::super::MeshNode::set_rpc_observer`] and fires from the
//! substrate's call path, so every caller's traffic flows through it
//! without per-call wiring at the SDK surface.
//!
//! See `DECK_DEMO_HARNESS_PLAN.md` Missing Item D for the design
//! rationale. v1 ships caller-side firing only; server-side (inbound)
//! firing is a follow-up — the dispatch path's mpsc-driven handler
//! invocation needs additional plumbing before we can record the
//! dispatch-to-respond span cleanly.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20
/// Which side of an RPC boundary the local node was on for an observed
/// call.
///
/// v1 only ever emits [`RpcDirection::Outbound`]; [`RpcDirection::Inbound`]
/// is reserved for a future server-side hook.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcDirection {
    /// This node originated the call (caller side).
    Outbound,
    /// This node received the call and ran the handler (server side).
    Inbound,
}
33
/// Outcome of an observed RPC call, one variant per exit branch of the
/// dispatch path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RpcCallStatus {
    /// The callee sent back a successful response.
    Ok,
    /// The call failed. Either the server returned a typed error, or a
    /// transport-level failure surfaced before the response could be
    /// parsed. The payload is an operator-readable diagnostic.
    Error(String),
    /// `opts.deadline` passed before a response arrived.
    Timeout,
    /// The call future was dropped before completing (for example the
    /// losing arm of a `select!`, a hedge loser, or an explicit cancel
    /// token). Reserved — v1 never emits it.
    Canceled,
}
54
55/// Single observed RPC boundary. All fields are populated from
56/// the substrate's call path at fire time; the observer must
57/// not mutate them (the type is owned for cheap per-call
58/// construction).
59#[derive(Clone, Debug)]
60pub struct RpcCallEvent {
61 /// The 64-bit node id of the calling node. Equal to
62 /// `local_node_id` on `Outbound` events.
63 pub caller: u64,
64 /// The 64-bit node id of the responding node.
65 pub callee: u64,
66 /// Service / method name as passed into `call_typed` or
67 /// registered via `serve_rpc_typed`.
68 pub method: String,
69 /// Wall-clock-equivalent elapsed time between request send
70 /// (caller side) or dispatch (server side) and the
71 /// observation point. Truncated to ms — observers in this
72 /// codebase don't need ns resolution and `u32` keeps the
73 /// struct compact.
74 pub latency_ms: u32,
75 /// Outcome of the call at the observation point.
76 pub status: RpcCallStatus,
77 /// Wire payload size of the request body (excluding the
78 /// 24-byte `EventMeta` prefix). 0 when not available
79 /// (transport-error branches before the body was framed).
80 pub request_bytes: u32,
81 /// Wire payload size of the response body. 0 when not
82 /// available (timeout, transport error, or cancellation
83 /// before the response arrived).
84 pub response_bytes: u32,
85 /// Whether the observation came from the caller side
86 /// (`Outbound`) or the server side (`Inbound`).
87 pub direction: RpcDirection,
88 /// Unix-ms timestamp captured at fire time. Best-effort
89 /// (pre-1970 clocks read 0).
90 pub ts_unix_ms: u64,
91}
92
93/// Observer trait. The substrate calls `on_call` synchronously
94/// from the dispatch task on each completed RPC boundary;
95/// implementations must be cheap (the firing thread is the
96/// hot path). A push into a bounded mpsc or a lock-free ring
97/// is the expected shape.
98pub trait RpcObserver: Send + Sync + 'static {
99 /// Fired once per observed RPC boundary. Must be cheap —
100 /// the dispatch thread blocks until the call returns.
101 fn on_call(&self, evt: RpcCallEvent);
102}
103
104/// Convenience type alias for the swappable observer cell on
105/// `MeshNode`. `Arc<dyn RpcObserver>` lets multiple ArcSwap
106/// loads share the same underlying observer without cloning
107/// the trait object.
108pub type RpcObserverHandle = Arc<dyn RpcObserver>;
109
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Reads `std::time::SystemTime::now()` — the wall clock, not the
/// monotonic `Instant` — so successive calls can go backwards if the
/// system clock is adjusted. Used by the call-path firing sites to
/// timestamp events.
///
/// Best-effort and infallible:
/// - a clock set before 1970 reads `0` instead of underflowing;
/// - a value too large for `u64` milliseconds saturates at `u64::MAX`
///   instead of silently wrapping (which an `as u64` cast would do).
pub fn unix_now_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        })
}
120
// ============================================================================
// ObserverChannel — bounded-mpsc trampoline shared across bindings (N4).
//
// Every binding (napi, pyo3, rpc-ffi) used to hand-roll the same
// bounded-mpsc + drain-worker + drop-counter shape. Consolidating it here
// means each binding writes only its language-specific dispatch closure
// (TSFN call / GIL-acquired Python invocation / C function-pointer call)
// instead of ~55 lines of channel and worker plumbing.
// ============================================================================
130
/// Capacity, in events, of each binding's bounded observer buffer.
///
/// The intent is to be large enough that a momentarily slow observer
/// loses nothing under normal load. It is also small enough that a
/// genuinely stuck observer starts showing drops within seconds rather
/// than buffering silently for minutes.
pub const OBSERVER_BUFFER_CAPACITY: usize = 1 << 10;
136
/// Process-wide count of observer events lost on the hot path.
///
/// An event is counted when `ObserverChannel::on_call` cannot enqueue it.
/// That happens either because the bounded buffer is full, or because the
/// channel has closed since its drain worker is gone. Both mean the event
/// never reaches the binding.
///
/// A single counter is shared by every binding's `ObserverChannel`
/// instance. Bindings surface it as
/// `metrics_snapshot.observer_dropped_total`.
///
/// The counter is per-process rather than per-mesh or
/// per-binding-instance because observer dispatch is per-process, and
/// snapshot readers expect a monotonic process-lifetime count.
pub static OBSERVER_DROPPED_TOTAL: AtomicU64 = AtomicU64::new(0);
146
147/// Bounded-mpsc observer trampoline. Constructed by each language
148/// binding's `set_observer` implementation; installed on the mesh
149/// via [`super::super::MeshNode::set_rpc_observer`].
150///
151/// The substrate dispatch path's [`RpcObserver::on_call`] pays only
152/// `Arc::clone` + `try_send` + atomic counter on overflow — every
153/// allocation / GIL-acquisition / TSFN call defers to the worker
154/// drained off the dispatch thread.
155pub struct ObserverChannel {
156 sender: tokio::sync::mpsc::Sender<Arc<RpcCallEvent>>,
157}
158
159impl ObserverChannel {
160 /// Build a bounded channel + spawn a drain worker on the given
161 /// runtime handle. `dispatch` runs once per drained event on
162 /// the worker task — bindings put their language-specific
163 /// invocation here (TSFN, GIL acquisition + Python call, C
164 /// function-pointer call).
165 ///
166 /// The worker exits cleanly when the channel closes (every
167 /// `ObserverChannel` is dropped + no more senders exist).
168 pub fn install<F>(runtime: &tokio::runtime::Handle, dispatch: F) -> Self
169 where
170 F: Fn(Arc<RpcCallEvent>) + Send + 'static,
171 {
172 let (sender, mut receiver) =
173 tokio::sync::mpsc::channel::<Arc<RpcCallEvent>>(OBSERVER_BUFFER_CAPACITY);
174 runtime.spawn(async move {
175 while let Some(evt) = receiver.recv().await {
176 dispatch(evt);
177 }
178 // Sender dropped → channel closed → worker exits.
179 });
180 Self { sender }
181 }
182}
183
184impl RpcObserver for ObserverChannel {
185 fn on_call(&self, evt: RpcCallEvent) {
186 if self.sender.try_send(Arc::new(evt)).is_err() {
187 OBSERVER_DROPPED_TOTAL.fetch_add(1, Ordering::Relaxed);
188 }
189 }
190}
191
192/// Current value of the process-global observer drop counter.
193/// Bindings surface this via their snapshot's
194/// `observer_dropped_total` field.
195pub fn observer_dropped_total() -> u64 {
196 OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed)
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202
203 #[test]
204 fn unix_now_ms_returns_recent_timestamp() {
205 // The only meaningful contract for this helper: monotonic on
206 // a sane clock (`SystemTime::now()` doesn't read pre-epoch)
207 // and within a reasonable window of "now". We pin both —
208 // the `unwrap_or(0)` fallback only fires on a pre-1970 clock.
209 let t = unix_now_ms();
210 // 2025-01-01 in unix ms — any sane test environment is past
211 // this, so a zero return would mean the SystemTime call
212 // failed in a way we want to surface.
213 assert!(t > 1_735_689_600_000, "unix_now_ms returned suspicious {t}");
214 }
215
216 /// `ObserverChannel::on_call` drops events when the bounded
217 /// channel fills and increments `OBSERVER_DROPPED_TOTAL` by one
218 /// per drop. The whole point of the v3 mpsc design — overflow
219 /// MUST surface via the snapshot's `observer_dropped_total` so
220 /// a slow consumer is observable from production telemetry.
221 ///
222 /// The worker gate is a `parking_lot::Mutex` held by the test
223 /// for the duration of the burst; the worker tries to lock it
224 /// once per event and blocks until the burst is done. Avoids
225 /// `std::thread::sleep` which would also block tokio shutdown.
226 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
227 async fn observer_channel_drops_overflow_events_and_counts_them() {
228 let handle = tokio::runtime::Handle::current();
229 let gate = Arc::new(parking_lot::Mutex::new(()));
230 let baseline = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed);
231 let burst_guard = gate.lock();
232 let worker_gate = gate.clone();
233 let channel = ObserverChannel::install(&handle, move |_evt| {
234 let _wait = worker_gate.lock();
235 });
236 let make_event = || RpcCallEvent {
237 caller: 1,
238 callee: 2,
239 method: "test.svc.echo".into(),
240 latency_ms: 0,
241 status: RpcCallStatus::Ok,
242 request_bytes: 0,
243 response_bytes: 0,
244 direction: RpcDirection::Outbound,
245 ts_unix_ms: 0,
246 };
247 const FIRED: u64 = 2000;
248 for _ in 0..FIRED {
249 channel.on_call(make_event());
250 }
251 let dropped = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed) - baseline;
252 // First event reaches the worker (which then blocks on the
253 // gate); OBSERVER_BUFFER_CAPACITY-1 fit in the buffer; the
254 // rest drop. Allow ±1 slack for the worker's recv-then-lock
255 // race.
256 let expected_min = FIRED - OBSERVER_BUFFER_CAPACITY as u64 - 1;
257 assert!(
258 dropped >= expected_min,
259 "expected ≥ {expected_min} drops, got {dropped}",
260 );
261 drop(burst_guard);
262 }
263
264 #[test]
265 fn observer_dropped_total_helper_matches_static() {
266 let direct = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed);
267 let via_helper = observer_dropped_total();
268 assert_eq!(direct, via_helper);
269 }
270}