#[cfg(feature = "profiling")]
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;
/// Sending half of a bounded `tokio` mpsc channel, tagged with a static name.
///
/// With the `profiling` feature enabled, each handle additionally keeps its
/// own counter of sends.
pub struct InstrumentedSender<T> {
/// The wrapped `tokio` sender that actually delivers values.
inner: mpsc::Sender<T>,
/// Label reported as the `channel` field of emitted trace events.
name: &'static str,
/// Count of sends recorded through this handle; each clone starts its own
/// count at zero.
#[cfg(feature = "profiling")]
sent: AtomicU64,
}
impl<T> InstrumentedSender<T> {
    /// Wraps an existing bounded sender so its traffic is reported under `name`.
    ///
    /// Counterpart of [`InstrumentedReceiver::from_receiver`] and
    /// [`InstrumentedUnboundedSender::from_sender`]. The new handle's counter
    /// starts at zero.
    #[must_use]
    pub fn from_sender(inner: mpsc::Sender<T>, name: &'static str) -> Self {
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }

    /// Sends `value` through the wrapped sender, awaiting until it completes.
    ///
    /// With the `profiling` feature enabled, a *successful* send is counted and
    /// the time spent awaiting the send is passed along as the backpressure
    /// latency. Failed sends are not counted, so `sent` only reflects values
    /// that were actually accepted by the channel.
    ///
    /// # Errors
    ///
    /// Returns the wrapped sender's `SendError` unchanged.
    pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
        #[cfg(feature = "profiling")]
        let start = std::time::Instant::now();
        let result = self.inner.send(value).await;
        #[cfg(feature = "profiling")]
        {
            if result.is_ok() {
                // Saturate instead of truncating if the wait somehow exceeds u64 microseconds.
                let latency_us = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
                self.record_sent(latency_us);
            }
        }
        result
    }

    /// Attempts to send `value` without waiting.
    ///
    /// With the `profiling` feature enabled, a successful attempt is counted
    /// exactly like a successful [`send`](Self::send), so the `sent` metric
    /// covers every value this handle put on the channel.
    ///
    /// # Errors
    ///
    /// Returns the wrapped sender's `TrySendError` unchanged.
    pub fn try_send(&self, value: T) -> Result<(), mpsc::error::TrySendError<T>> {
        let result = self.inner.try_send(value);
        #[cfg(feature = "profiling")]
        {
            if result.is_ok() {
                // `try_send` never waits, so there is no backpressure delay to report.
                self.record_sent(0);
            }
        }
        result
    }

    /// Counts one accepted value and emits a sampled `channel.metrics` event.
    ///
    /// Only counts that are multiples of 16 are traced, which keeps the
    /// instrumentation cheap on busy channels.
    #[cfg(feature = "profiling")]
    fn record_sent(&self, latency_us: u64) {
        // `fetch_add` wraps on overflow; wrap the `+ 1` as well so the two agree
        // and a debug build cannot panic here.
        let count = self.sent.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
        if count.trailing_zeros() >= 4 {
            tracing::trace!(
                channel = self.name,
                sent = count,
                queue_depth = self.inner.max_capacity() - self.inner.capacity(),
                backpressure_latency_us = latency_us,
                "channel.metrics"
            );
        }
    }

    /// Borrows the wrapped sender; anything sent through it bypasses the counter.
    #[must_use]
    pub fn inner(&self) -> &mpsc::Sender<T> {
        &self.inner
    }

    /// Consumes the wrapper and returns the wrapped sender.
    #[must_use]
    pub fn into_inner(self) -> mpsc::Sender<T> {
        self.inner
    }
}
/// Cloning duplicates the wrapped sender and keeps the channel name, but the
/// clone's profiling counter starts over at zero rather than copying the
/// current count.
impl<T> Clone for InstrumentedSender<T> {
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let name = self.name;
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }
}
/// Receiving half of a bounded `tokio` mpsc channel, tagged with a static name.
///
/// With the `profiling` feature enabled, it additionally keeps a counter of
/// received values.
pub struct InstrumentedReceiver<T> {
/// The wrapped `tokio` receiver that values are read from.
inner: mpsc::Receiver<T>,
/// Label for this channel. It is stored but not necessarily read by the
/// receiver's own methods, hence the `dead_code` allowance.
#[allow(dead_code)]
name: &'static str,
/// Count of values yielded by `recv`.
#[cfg(feature = "profiling")]
received: AtomicU64,
}
impl<T> InstrumentedReceiver<T> {
    /// Wraps an existing bounded receiver under the given channel `name`.
    ///
    /// The profiling counter, when compiled in, starts at zero.
    #[must_use]
    pub fn from_receiver(inner: mpsc::Receiver<T>, name: &'static str) -> Self {
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            received: AtomicU64::new(0),
        }
    }

    /// Awaits the next value from the wrapped receiver.
    ///
    /// Returns whatever the wrapped receiver's `recv` yields. With the
    /// `profiling` feature enabled, every `Some` value bumps the `received`
    /// counter; a `None` result leaves it untouched.
    ///
    /// NOTE(review): within this file the counter is only ever incremented and
    /// never read or reported.
    pub async fn recv(&mut self) -> Option<T> {
        match self.inner.recv().await {
            Some(value) => {
                #[cfg(feature = "profiling")]
                self.received.fetch_add(1, Ordering::Relaxed);
                Some(value)
            }
            None => None,
        }
    }

    /// Mutably borrows the wrapped receiver; values taken through it bypass
    /// the counter.
    pub fn inner_mut(&mut self) -> &mut mpsc::Receiver<T> {
        &mut self.inner
    }

    /// Consumes the wrapper and returns the wrapped receiver.
    #[must_use]
    pub fn into_inner(self) -> mpsc::Receiver<T> {
        self.inner
    }
}
/// Sending half of an unbounded `tokio` mpsc channel, tagged with a static name.
///
/// With the `profiling` feature enabled, each handle additionally keeps its
/// own counter of sends.
pub struct InstrumentedUnboundedSender<T> {
/// The wrapped `tokio` unbounded sender that actually delivers values.
inner: mpsc::UnboundedSender<T>,
/// Label reported as the `channel` field of emitted trace events.
name: &'static str,
/// Count of sends recorded through this handle; each clone starts its own
/// count at zero.
#[cfg(feature = "profiling")]
sent: AtomicU64,
}
impl<T> InstrumentedUnboundedSender<T> {
    /// Wraps an existing unbounded sender under the given channel `name`.
    ///
    /// The profiling counter, when compiled in, starts at zero.
    #[must_use]
    pub fn from_sender(inner: mpsc::UnboundedSender<T>, name: &'static str) -> Self {
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }

    /// Sends `value` through the wrapped unbounded sender without waiting.
    ///
    /// With the `profiling` feature enabled, a *successful* send is counted and
    /// every 64th counted value emits a `channel.metrics` trace event carrying
    /// the channel name and the running count. Failed sends are not counted,
    /// so `sent` only reflects values the channel actually accepted.
    ///
    /// # Errors
    ///
    /// Returns the wrapped sender's `SendError` unchanged.
    pub fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
        let result = self.inner.send(value);
        #[cfg(feature = "profiling")]
        {
            if result.is_ok() {
                // `fetch_add` wraps on overflow; wrap the `+ 1` as well so a
                // debug build cannot panic here.
                let count = self.sent.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
                // Sample: only multiples of 64 are traced.
                if count.trailing_zeros() >= 6 {
                    tracing::trace!(channel = self.name, sent = count, "channel.metrics");
                }
            }
        }
        result
    }

    /// Borrows the wrapped sender; anything sent through it bypasses the counter.
    #[must_use]
    pub fn inner(&self) -> &mpsc::UnboundedSender<T> {
        &self.inner
    }

    /// Consumes the wrapper and returns the wrapped sender, matching
    /// [`InstrumentedSender::into_inner`].
    #[must_use]
    pub fn into_inner(self) -> mpsc::UnboundedSender<T> {
        self.inner
    }
}
/// Cloning duplicates the wrapped unbounded sender and keeps the channel name,
/// but the clone's profiling counter starts over at zero rather than copying
/// the current count.
impl<T> Clone for InstrumentedUnboundedSender<T> {
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let name = self.name;
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }
}
#[must_use]
pub fn instrumented_channel<T>(
buffer: usize,
name: &'static str,
) -> (InstrumentedSender<T>, InstrumentedReceiver<T>) {
let (tx, rx) = mpsc::channel(buffer);
(
InstrumentedSender {
inner: tx,
name,
#[cfg(feature = "profiling")]
sent: AtomicU64::new(0),
},
InstrumentedReceiver {
inner: rx,
name,
#[cfg(feature = "profiling")]
received: AtomicU64::new(0),
},
)
}
#[must_use]
pub fn instrumented_unbounded_channel<T>(
name: &'static str,
) -> (InstrumentedUnboundedSender<T>, mpsc::UnboundedReceiver<T>) {
let (tx, rx) = mpsc::unbounded_channel();
(
InstrumentedUnboundedSender {
inner: tx,
name,
#[cfg(feature = "profiling")]
sent: AtomicU64::new(0),
},
rx,
)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each successful bounded `send` raises the handle's `sent` counter by one.
    #[cfg(feature = "profiling")]
    #[tokio::test]
    async fn bounded_send_increments_counter() {
        let (sender, mut receiver) = instrumented_channel::<u32>(8, "test-bounded");
        let sent_so_far = || sender.sent.load(Ordering::Relaxed);

        assert_eq!(sent_so_far(), 0);
        sender.send(1).await.expect("send succeeds");
        assert_eq!(sent_so_far(), 1);
        sender.send(2).await.expect("send succeeds");
        assert_eq!(sent_so_far(), 2);

        // Drain both queued values before the channel is dropped.
        let _ = receiver.recv().await;
        let _ = receiver.recv().await;
    }

    /// A cloned bounded sender starts counting from zero and leaves the
    /// original handle's count untouched.
    #[cfg(feature = "profiling")]
    #[tokio::test]
    async fn clone_resets_counter_to_zero() {
        let (original, mut receiver) = instrumented_channel::<u32>(8, "test-clone");
        original.send(1).await.expect("send succeeds");
        original.send(2).await.expect("send succeeds");
        assert_eq!(original.sent.load(Ordering::Relaxed), 2);

        let duplicate = original.clone();
        assert_eq!(
            duplicate.sent.load(Ordering::Relaxed),
            0,
            "clone starts at 0"
        );
        assert_eq!(original.sent.load(Ordering::Relaxed), 2);

        // Drain both queued values before the channel is dropped.
        let _ = receiver.recv().await;
        let _ = receiver.recv().await;
    }

    /// A successful unbounded `send` raises the handle's `sent` counter by one.
    #[cfg(feature = "profiling")]
    #[test]
    fn unbounded_send_increments_counter() {
        let (sender, _receiver) = instrumented_unbounded_channel::<u32>("test-unbounded");
        let sent_so_far = || sender.sent.load(Ordering::Relaxed);

        assert_eq!(sent_so_far(), 0);
        sender.send(1).expect("send succeeds");
        assert_eq!(sent_so_far(), 1);
    }

    /// A cloned unbounded sender starts counting from zero.
    #[cfg(feature = "profiling")]
    #[test]
    fn unbounded_clone_resets_counter() {
        let (original, _receiver) = instrumented_unbounded_channel::<u32>("test-unbounded-clone");
        original.send(1).expect("send succeeds");
        assert_eq!(original.sent.load(Ordering::Relaxed), 1);

        let duplicate = original.clone();
        assert_eq!(duplicate.sent.load(Ordering::Relaxed), 0);
    }

    /// `from_receiver` wraps a plain tokio receiver and still yields its values.
    #[tokio::test]
    async fn from_receiver_wraps_existing() {
        let (plain_tx, plain_rx) = tokio::sync::mpsc::channel::<u32>(4);
        let mut instrumented = InstrumentedReceiver::from_receiver(plain_rx, "wrap-test");

        plain_tx.send(42).await.expect("send succeeds");
        let got = instrumented.recv().await.expect("recv succeeds");
        assert_eq!(got, 42);
    }

    /// `from_sender` wraps a plain tokio unbounded sender and still delivers
    /// through it.
    #[test]
    fn from_sender_wraps_existing() {
        let (plain_tx, mut plain_rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
        let instrumented = InstrumentedUnboundedSender::from_sender(plain_tx, "wrap-unbounded");

        instrumented.send(99).expect("send succeeds");
        let got = plain_rx.try_recv().expect("value available");
        assert_eq!(got, 99);
    }
}