rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
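//! Per-event wake counters for the structural-refactor decision (pattern 1
//! vs current). Activated by setting `RUSTZMQ2_WAKE_COUNT` to any value
//! (e.g. `RUSTZMQ2_WAKE_COUNT=1`; only the variable's presence is checked,
//! so even `0` enables it). The variable is read once, on first use. When
//! disabled, every counter call is an already-completed `Once` check plus a
//! relaxed atomic load and a branch — well under 1 ns.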
//!
//! Counts are printed to stderr and reset to zero by each call to
//! [`dump_and_reset`]; the bench binary calls it after its iteration loop,
//! so each dump covers one measurement window.
//!
//! Counters of interest:
//!   * `peer_loop_iters` — every top-of-loop entry in `peer_loop_inner`
//!   * `peer_loop_read_wakes` — read-arm fires (a decoded msg arrived)
//!   * `peer_loop_outbound_wakes` — outbound-arm fires (msg dequeued for
//!     write)
//!   * `peer_loop_writable_wakes` — writable-arm fires (socket drained)
//!   * `recv_next_wakes` — `recv_next` returns a message to caller
//!   * `inline_writes` — caller-thread inline writes that landed on the
//!     wire (skipping the channel + `peer_loop` entirely)
//!
//! Ratios that matter:
//!   * `peer_loop_iters / recv_next_wakes`: how many `peer_loop` wakes per
//!     delivered message. ~2 means per-RTT we cross the `peer_loop` task
//!     once for read + once for write.
//!   * `inline_writes / recv_next_wakes`: fraction of sends that took
//!     the inline fast path. Only nonzero on engines whose socket type
//!     opted in via `SocketOptions::inline_write_max` (REQ/REP/PAIR by
//!     default).

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Cached result of the `RUSTZMQ2_WAKE_COUNT` lookup; written only inside
/// `INIT`'s closure.
static ENABLED: AtomicBool = AtomicBool::new(false);
/// Guards the one-time environment lookup performed by `enabled`.
static INIT: std::sync::Once = std::sync::Once::new();

/// Returns `true` when wake counting is switched on for this process.
///
/// The environment is consulted only on the first call. Counting is on if
/// `RUSTZMQ2_WAKE_COUNT` is present at all, whatever its value. Later calls
/// just read the cached flag.
#[inline]
fn enabled() -> bool {
    INIT.call_once(|| {
        // Presence alone turns counting on; the value is never parsed.
        let requested = std::env::var_os("RUSTZMQ2_WAKE_COUNT").is_some();
        // `Once` establishes happens-before between this closure and every
        // later `call_once` return, so a `Relaxed` store/load pair suffices.
        ENABLED.store(requested, Ordering::Relaxed);
    });
    ENABLED.load(Ordering::Relaxed)
}

/// Declares one or more crate-visible `AtomicU64` wake counters, each
/// starting at zero. Attributes written before a name, including `///` doc
/// comments, are forwarded to the generated `static`. A single name
/// (`counter!(X)`) is accepted too.
macro_rules! counter {
    ($($(#[$meta:meta])* $name:ident),+ $(,)?) => {
        $(
            $(#[$meta])*
            pub(crate) static $name: AtomicU64 = AtomicU64::new(0);
        )+
    };
}

counter!(
    /// Every top-of-loop entry in `peer_loop_inner`.
    PEER_LOOP_ITERS,
    /// Read-arm fires in the peer loop (a decoded message arrived).
    PEER_LOOP_READ_WAKES,
    /// Outbound-arm fires in the peer loop (a message was dequeued for write).
    PEER_LOOP_OUTBOUND_WAKES,
    /// Writable-arm fires in the peer loop (the socket drained).
    PEER_LOOP_WRITABLE_WAKES,
    /// Times `recv_next` returned a message to its caller.
    RECV_NEXT_WAKES,
    /// Caller-thread inline writes that reached the wire without going
    /// through the channel and `peer_loop`.
    INLINE_WRITES,
);

/// Adds one to `c` when wake counting is enabled; otherwise does nothing.
///
/// The increment is `Relaxed`, so it imposes no ordering on surrounding
/// memory operations. The counters are independent statistics.
#[inline]
pub(crate) fn bump(c: &AtomicU64) {
    if !enabled() {
        return;
    }
    c.fetch_add(1, Ordering::Relaxed);
}

/// Print the current counts to stderr, then reset every counter to 0.
///
/// Emits one summary line tagged with `label`. When `recv_next` is nonzero,
/// it also emits a second line giving each count divided by the
/// `recv_next` count. Does nothing unless wake counting is enabled.
///
/// Safe to call from any thread. The counters are swapped out one at a
/// time rather than as a single snapshot, so a `bump` racing with this call
/// may be attributed to either this window or the next. That imprecision
/// is acceptable for quick measurement.
///
/// Output is best-effort. Stderr is locked once, so the two lines cannot be
/// split by other threads' stderr output. Write errors (e.g. a closed
/// stderr) are ignored rather than panicking as `eprintln!` would.
pub fn dump_and_reset(label: &str) {
    use std::io::Write as _;

    if !enabled() {
        return;
    }
    let pl = PEER_LOOP_ITERS.swap(0, Ordering::Relaxed);
    let rd = PEER_LOOP_READ_WAKES.swap(0, Ordering::Relaxed);
    let ob = PEER_LOOP_OUTBOUND_WAKES.swap(0, Ordering::Relaxed);
    let wr = PEER_LOOP_WRITABLE_WAKES.swap(0, Ordering::Relaxed);
    let rn = RECV_NEXT_WAKES.swap(0, Ordering::Relaxed);
    let il = INLINE_WRITES.swap(0, Ordering::Relaxed);

    // Bind the handle first so the lock's borrow outlives the statement on
    // every toolchain, then hold the lock across both lines.
    let stderr = std::io::stderr();
    let mut out = stderr.lock();

    // Diagnostics only: a failed write must not take the process down.
    let _ = writeln!(
        out,
        "[wake-count {label}] peer_loop_iters={pl} read={rd} outbound={ob} writable={wr} recv_next={rn} inline_writes={il}"
    );
    if rn > 0 {
        let per = |count: u64| count as f64 / rn as f64;
        let _ = writeln!(
            out,
            "[wake-count {label}] per recv_next: peer_loop_iters={:.2} read={:.2} outbound={:.2} writable={:.2} inline={:.2}",
            per(pl),
            per(rd),
            per(ob),
            per(wr),
            per(il)
        );
    }
}