Skip to main content

net/adapter/net/cortex/
rpc_observer.rs

1//! `RpcObserver` hook — observability seam on the typed-nRPC
2//! dispatch path. Fires on every `call_typed` (caller side)
3//! completion with the metadata an operator-facing tail (the
4//! deck's NRPC view, a metrics exporter, a tracing bridge)
5//! wants: caller, callee, method, latency, status, byte counts.
6//!
7//! The observer is installed at the `MeshNode` level via
8//! [`super::super::MeshNode::set_rpc_observer`] and fires from
9//! the substrate's call path so every caller's traffic flows
10//! through it without per-call wiring at the SDK surface.
11//!
12//! See `DECK_DEMO_HARNESS_PLAN.md` Missing Item D for the design
13//! rationale. v1 ships caller-side firing only; server-side
14//! (inbound) firing is a follow-up — the dispatch path's
15//! mpsc-driven handler invocation needs additional plumbing
16//! before we can record the dispatch-to-respond span cleanly.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20
/// Which side of an RPC boundary the local node sat on for an
/// observed call.
///
/// `Outbound` marks calls this node started; `Inbound` marks handler
/// invocations served by this node. Only `Outbound` is produced in
/// v1 — `Inbound` is held back for a future server-side hook.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum RpcDirection {
    /// The call originated on this node.
    Outbound,
    /// The call arrived at this node and its handler ran here.
    Inbound,
}
33
/// Outcome recorded for an observed RPC call, one variant per exit
/// branch of the dispatch path.
///
/// * `Ok` — a successful response came back.
/// * `Error(msg)` — the server returned a typed error, or the
///   transport failed; `msg` is a diagnostic meant for operators.
/// * `Timeout` — the deadline passed first.
/// * `Canceled` — the call future was dropped or its cancel token
///   tripped (not emitted by v1).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RpcCallStatus {
    /// The callee answered successfully.
    Ok,
    /// A server-side typed error, or a transport-level failure that
    /// surfaced before the response could be parsed. Carries an
    /// operator-readable diagnostic.
    Error(String),
    /// `opts.deadline` elapsed before any response arrived.
    Timeout,
    /// The call future went away before completing (for instance a
    /// cancelled `select!` branch, a losing hedge, or an explicit
    /// cancel token). Reserved; v1 never emits it.
    Canceled,
}
54
55/// Single observed RPC boundary. All fields are populated from
56/// the substrate's call path at fire time; the observer must
57/// not mutate them (the type is owned for cheap per-call
58/// construction).
59#[derive(Clone, Debug)]
60pub struct RpcCallEvent {
61    /// The 64-bit node id of the calling node. Equal to
62    /// `local_node_id` on `Outbound` events.
63    pub caller: u64,
64    /// The 64-bit node id of the responding node.
65    pub callee: u64,
66    /// Service / method name as passed into `call_typed` or
67    /// registered via `serve_rpc_typed`.
68    pub method: String,
69    /// Wall-clock-equivalent elapsed time between request send
70    /// (caller side) or dispatch (server side) and the
71    /// observation point. Truncated to ms — observers in this
72    /// codebase don't need ns resolution and `u32` keeps the
73    /// struct compact.
74    pub latency_ms: u32,
75    /// Outcome of the call at the observation point.
76    pub status: RpcCallStatus,
77    /// Wire payload size of the request body (excluding the
78    /// 24-byte `EventMeta` prefix). 0 when not available
79    /// (transport-error branches before the body was framed).
80    pub request_bytes: u32,
81    /// Wire payload size of the response body. 0 when not
82    /// available (timeout, transport error, or cancellation
83    /// before the response arrived).
84    pub response_bytes: u32,
85    /// Whether the observation came from the caller side
86    /// (`Outbound`) or the server side (`Inbound`).
87    pub direction: RpcDirection,
88    /// Unix-ms timestamp captured at fire time. Best-effort
89    /// (pre-1970 clocks read 0).
90    pub ts_unix_ms: u64,
91}
92
93/// Observer trait. The substrate calls `on_call` synchronously
94/// from the dispatch task on each completed RPC boundary;
95/// implementations must be cheap (the firing thread is the
96/// hot path). A push into a bounded mpsc or a lock-free ring
97/// is the expected shape.
98pub trait RpcObserver: Send + Sync + 'static {
99    /// Fired once per observed RPC boundary. Must be cheap —
100    /// the dispatch thread blocks until the call returns.
101    fn on_call(&self, evt: RpcCallEvent);
102}
103
104/// Convenience type alias for the swappable observer cell on
105/// `MeshNode`. `Arc<dyn RpcObserver>` lets multiple ArcSwap
106/// loads share the same underlying observer without cloning
107/// the trait object.
108pub type RpcObserverHandle = Arc<dyn RpcObserver>;
109
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Reads `std::time::SystemTime::now()`. (`Instant` has no epoch and
/// cannot produce a unix timestamp.) The call-path firing sites use it to
/// stamp `RpcCallEvent::ts_unix_ms`.
///
/// Best-effort:
/// * a clock set before 1970 reads 0 instead of underflowing;
/// * a value too large for `u64` saturates at `u64::MAX` instead of
///   silently wrapping.
pub fn unix_now_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| {
            // `as_millis` yields u128; clamp before narrowing so the
            // `as` cast can never truncate.
            since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
        })
        .unwrap_or(0)
}
120
121// ============================================================================
122// ObserverChannel — bounded-mpsc trampoline shared across bindings (N4).
123//
124// Every binding (napi, pyo3, rpc-ffi) was hand-rolling the same
125// bounded-mpsc + drain-worker + drop-counter shape. Consolidating
126// here lets each binding write only its language-specific dispatch
127// closure (TSFN call / GIL-acquired Python invocation / C function
128// pointer) instead of ~55 lines of channel + worker plumbing.
129// ============================================================================
130
/// Per-binding capacity (1024) of the bounded observer event queue.
///
/// It is large enough to ride out a momentarily slow observer under
/// normal load without losing events. It is small enough that a genuinely
/// stuck observer starts showing drops within seconds rather than minutes.
pub const OBSERVER_BUFFER_CAPACITY: usize = 1 << 10;
136
/// Process-global count of observer events that were discarded
/// instead of queued.
///
/// [`ObserverChannel`] increments it whenever its non-blocking
/// `try_send` is rejected. That is normally because the bounded buffer
/// is full, but it also happens when the drain worker is gone (e.g. its
/// runtime shut down) and the channel is closed. Every binding's
/// `ObserverChannel` instance shares this one counter. Surface it via the
/// binding's `metrics_snapshot.observer_dropped_total` field.
///
/// Per-process (not per-mesh / per-binding-instance) because the
/// observer dispatch path is fundamentally per-process; consumers
/// reading the snapshot expect a monotonic process-lifetime count.
pub static OBSERVER_DROPPED_TOTAL: AtomicU64 = AtomicU64::new(0);
146
147/// Bounded-mpsc observer trampoline. Constructed by each language
148/// binding's `set_observer` implementation; installed on the mesh
149/// via [`super::super::MeshNode::set_rpc_observer`].
150///
151/// The substrate dispatch path's [`RpcObserver::on_call`] pays only
152/// `Arc::clone` + `try_send` + atomic counter on overflow — every
153/// allocation / GIL-acquisition / TSFN call defers to the worker
154/// drained off the dispatch thread.
155pub struct ObserverChannel {
156    sender: tokio::sync::mpsc::Sender<Arc<RpcCallEvent>>,
157}
158
159impl ObserverChannel {
160    /// Build a bounded channel + spawn a drain worker on the given
161    /// runtime handle. `dispatch` runs once per drained event on
162    /// the worker task — bindings put their language-specific
163    /// invocation here (TSFN, GIL acquisition + Python call, C
164    /// function-pointer call).
165    ///
166    /// The worker exits cleanly when the channel closes (every
167    /// `ObserverChannel` is dropped + no more senders exist).
168    pub fn install<F>(runtime: &tokio::runtime::Handle, dispatch: F) -> Self
169    where
170        F: Fn(Arc<RpcCallEvent>) + Send + 'static,
171    {
172        let (sender, mut receiver) =
173            tokio::sync::mpsc::channel::<Arc<RpcCallEvent>>(OBSERVER_BUFFER_CAPACITY);
174        runtime.spawn(async move {
175            while let Some(evt) = receiver.recv().await {
176                dispatch(evt);
177            }
178            // Sender dropped → channel closed → worker exits.
179        });
180        Self { sender }
181    }
182}
183
184impl RpcObserver for ObserverChannel {
185    fn on_call(&self, evt: RpcCallEvent) {
186        if self.sender.try_send(Arc::new(evt)).is_err() {
187            OBSERVER_DROPPED_TOTAL.fetch_add(1, Ordering::Relaxed);
188        }
189    }
190}
191
192/// Current value of the process-global observer drop counter.
193/// Bindings surface this via their snapshot's
194/// `observer_dropped_total` field.
195pub fn observer_dropped_total() -> u64 {
196    OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed)
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn unix_now_ms_returns_recent_timestamp() {
        // `SystemTime` is not monotonic, so the only stable contract for
        // this helper is "plausibly now". Two bounds pin that:
        // - Lower bound, 2025-01-01 in unix ms. Any sane test environment
        //   is past it, and a zero return (the pre-1970 `unwrap_or(0)`
        //   fallback) must fail loudly.
        // - Upper bound, 3000-01-01. Catches a garbage or overflowed value.
        let t = unix_now_ms();
        assert!(t > 1_735_689_600_000, "unix_now_ms returned suspicious {t}");
        assert!(t < 32_503_680_000_000, "unix_now_ms returned suspicious {t}");
    }

    /// `ObserverChannel::on_call` drops events when the bounded
    /// channel fills, and increments `OBSERVER_DROPPED_TOTAL` by one
    /// per drop. This is the whole point of the v3 mpsc design: overflow
    /// MUST surface via the snapshot's `observer_dropped_total`, so that
    /// a slow consumer is observable from production telemetry.
    ///
    /// The worker gate is a `parking_lot::Mutex` that the test holds
    /// for the duration of the burst. The worker tries to lock it once per
    /// event and blocks until the burst is done. A gate, rather than a
    /// timed sleep, keeps the outcome independent of sleep timing.
    /// The blocked worker occupies a runtime thread, hence the
    /// multi-thread flavor.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn observer_channel_drops_overflow_events_and_counts_them() {
        let handle = tokio::runtime::Handle::current();
        let gate = Arc::new(parking_lot::Mutex::new(()));
        let baseline = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed);
        let burst_guard = gate.lock();
        let worker_gate = Arc::clone(&gate);
        let channel = ObserverChannel::install(&handle, move |_evt| {
            let _wait = worker_gate.lock();
        });
        let make_event = || RpcCallEvent {
            caller: 1,
            callee: 2,
            method: "test.svc.echo".into(),
            latency_ms: 0,
            status: RpcCallStatus::Ok,
            request_bytes: 0,
            response_bytes: 0,
            direction: RpcDirection::Outbound,
            ts_unix_ms: 0,
        };
        const FIRED: u64 = 2000;
        for _ in 0..FIRED {
            channel.on_call(make_event());
        }
        let dropped = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed) - baseline;
        // Release the worker before asserting, so a failing assertion
        // doesn't leave a runtime thread parked on the gate during
        // teardown.
        drop(burst_guard);
        // The first event may reach the worker (which then blocks on the
        // gate), and OBSERVER_BUFFER_CAPACITY more fit in the buffer; the
        // rest drop. Allow 1 slack for the worker's recv-then-lock race.
        // Other tests may bump the shared counter concurrently, which can
        // only raise `dropped`, so a lower bound is the right check.
        let expected_min = FIRED - OBSERVER_BUFFER_CAPACITY as u64 - 1;
        assert!(
            dropped >= expected_min,
            "expected ≥ {expected_min} drops, got {dropped}",
        );
    }

    #[test]
    fn observer_dropped_total_helper_matches_static() {
        // The counter is process-global, and other tests (e.g. the
        // overflow test above) may bump it concurrently, so two separate
        // reads are not guaranteed to be equal. The counter only ever
        // increases, so the helper's read must land between two direct
        // reads taken around it.
        let before = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed);
        let via_helper = observer_dropped_total();
        let after = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed);
        assert!(
            before <= via_helper && via_helper <= after,
            "helper read {via_helper} outside [{before}, {after}]",
        );
    }
}