use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RpcDirection {
Outbound,
Inbound,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RpcCallStatus {
Ok,
Error(String),
Timeout,
Canceled,
}
#[derive(Clone, Debug)]
pub struct RpcCallEvent {
pub caller: u64,
pub callee: u64,
pub method: String,
pub latency_ms: u32,
pub status: RpcCallStatus,
pub request_bytes: u32,
pub response_bytes: u32,
pub direction: RpcDirection,
pub ts_unix_ms: u64,
}
pub trait RpcObserver: Send + Sync + 'static {
fn on_call(&self, evt: RpcCallEvent);
}
pub type RpcObserverHandle = Arc<dyn RpcObserver>;
pub fn unix_now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
pub const OBSERVER_BUFFER_CAPACITY: usize = 1024;
pub static OBSERVER_DROPPED_TOTAL: AtomicU64 = AtomicU64::new(0);
pub struct ObserverChannel {
sender: tokio::sync::mpsc::Sender<Arc<RpcCallEvent>>,
}
impl ObserverChannel {
pub fn install<F>(runtime: &tokio::runtime::Handle, dispatch: F) -> Self
where
F: Fn(Arc<RpcCallEvent>) + Send + 'static,
{
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<Arc<RpcCallEvent>>(OBSERVER_BUFFER_CAPACITY);
runtime.spawn(async move {
while let Some(evt) = receiver.recv().await {
dispatch(evt);
}
});
Self { sender }
}
}
impl RpcObserver for ObserverChannel {
fn on_call(&self, evt: RpcCallEvent) {
if self.sender.try_send(Arc::new(evt)).is_err() {
OBSERVER_DROPPED_TOTAL.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn observer_dropped_total() -> u64 {
OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn unix_now_ms_returns_recent_timestamp() {
let t = unix_now_ms();
assert!(t > 1_735_689_600_000, "unix_now_ms returned suspicious {t}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn observer_channel_drops_overflow_events_and_counts_them() {
let handle = tokio::runtime::Handle::current();
let gate = Arc::new(parking_lot::Mutex::new(()));
let baseline = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed);
let burst_guard = gate.lock();
let worker_gate = gate.clone();
let channel = ObserverChannel::install(&handle, move |_evt| {
let _wait = worker_gate.lock();
});
let make_event = || RpcCallEvent {
caller: 1,
callee: 2,
method: "test.svc.echo".into(),
latency_ms: 0,
status: RpcCallStatus::Ok,
request_bytes: 0,
response_bytes: 0,
direction: RpcDirection::Outbound,
ts_unix_ms: 0,
};
const FIRED: u64 = 2000;
for _ in 0..FIRED {
channel.on_call(make_event());
}
let dropped = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed) - baseline;
let expected_min = FIRED - OBSERVER_BUFFER_CAPACITY as u64 - 1;
assert!(
dropped >= expected_min,
"expected ≥ {expected_min} drops, got {dropped}",
);
drop(burst_guard);
}
#[test]
fn observer_dropped_total_helper_matches_static() {
let direct = OBSERVER_DROPPED_TOTAL.load(Ordering::Relaxed);
let via_helper = observer_dropped_total();
assert_eq!(direct, via_helper);
}
}