#[cfg(feature = "metrics")]
pub mod aggregator;
#[cfg(feature = "metrics")]
pub use aggregator::{AggregatorConfig, AggregatorHandle};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
pub const COUNTERS_MAGIC: u32 = u32::from_le_bytes(*b"MYC0");
pub const COUNTERS_FILE_RESERVED_BYTES: usize = 4096;
pub const CACHE_LINE_BYTES: usize = 64;
pub const MAX_COUNTER_SLOTS: usize = 62;
pub const COUNTER_LABEL_BYTES: usize = 48;
pub const COUNTER_FLAG_IN_USE: u32 = 1 << 0;
pub const COUNTER_FLAG_PRODUCER: u32 = 1 << 1;
pub const COUNTER_FLAG_CONSUMER: u32 = 1 << 2;
#[repr(C, align(64))]
pub struct CountersHeader {
pub magic: u32,
pub version: u32,
pub slot_count: AtomicU32,
pub slot_capacity: u32,
pub slot_stride: u32,
pub slots_offset: u32,
_reserved: [u8; 64 - 24],
}
const _: () = assert!(std::mem::size_of::<CountersHeader>() == CACHE_LINE_BYTES);
#[repr(C, align(64))]
pub struct CounterSlot {
pub id: u32,
pub flags: AtomicU32,
pub value: AtomicU64,
pub label: [u8; COUNTER_LABEL_BYTES],
}
const _: () = assert!(std::mem::size_of::<CounterSlot>() == CACHE_LINE_BYTES);
#[derive(Clone, Copy, Debug)]
pub struct CounterHandle {
inner: NonNull<CounterSlot>,
}
unsafe impl Send for CounterHandle {}
unsafe impl Sync for CounterHandle {}
impl CounterHandle {
pub unsafe fn from_ptr(ptr: NonNull<CounterSlot>) -> Self {
Self { inner: ptr }
}
#[inline(always)]
pub fn inc(&self) {
let slot = unsafe { self.inner.as_ref() };
slot.value.fetch_add(1, Ordering::Relaxed);
}
#[inline(always)]
pub fn add(&self, n: u64) {
let slot = unsafe { self.inner.as_ref() };
slot.value.fetch_add(n, Ordering::Relaxed);
}
#[inline(always)]
pub fn record_max(&self, value: u64) {
let slot = unsafe { self.inner.as_ref() };
let mut current = slot.value.load(Ordering::Relaxed);
while value > current {
match slot.value.compare_exchange_weak(
current,
value,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
#[inline]
pub fn get(&self) -> u64 {
let slot = unsafe { self.inner.as_ref() };
slot.value.load(Ordering::Relaxed)
}
}
#[derive(Debug)]
pub struct CountersFile {
base: NonNull<u8>,
len: usize,
}
unsafe impl Send for CountersFile {}
unsafe impl Sync for CountersFile {}
impl CountersFile {
pub unsafe fn init(ptr: NonNull<u8>) -> Self {
let header_ptr = ptr.as_ptr() as *mut CountersHeader;
let slots_offset = std::mem::size_of::<CountersHeader>() as u32;
unsafe {
std::ptr::write(
header_ptr,
CountersHeader {
magic: COUNTERS_MAGIC,
version: 0,
slot_count: AtomicU32::new(0),
slot_capacity: MAX_COUNTER_SLOTS as u32,
slot_stride: std::mem::size_of::<CounterSlot>() as u32,
slots_offset,
_reserved: [0u8; 64 - 24],
},
);
}
Self {
base: ptr,
len: COUNTERS_FILE_RESERVED_BYTES,
}
}
pub unsafe fn attach(ptr: NonNull<u8>) -> Result<Self, AttachError> {
let header = unsafe { &*(ptr.as_ptr() as *const CountersHeader) };
if header.magic != COUNTERS_MAGIC {
return Err(AttachError::BadMagic(header.magic));
}
if header.version != 0 {
return Err(AttachError::UnsupportedVersion(header.version));
}
Ok(Self {
base: ptr,
len: COUNTERS_FILE_RESERVED_BYTES,
})
}
#[inline]
pub fn header(&self) -> &CountersHeader {
unsafe { &*(self.base.as_ptr() as *const CountersHeader) }
}
fn slot_ptr(&self, idx: usize) -> NonNull<CounterSlot> {
let header = self.header();
let off = header.slots_offset as usize + idx * std::mem::size_of::<CounterSlot>();
debug_assert!(off + std::mem::size_of::<CounterSlot>() <= self.len);
unsafe { NonNull::new_unchecked(self.base.as_ptr().add(off) as *mut CounterSlot) }
}
pub fn register(&self, id: u32, flags: u32, label: &str) -> Option<CounterHandle> {
let header = self.header();
let idx = header.slot_count.fetch_add(1, Ordering::AcqRel) as usize;
if idx >= MAX_COUNTER_SLOTS {
header.slot_count.fetch_sub(1, Ordering::Relaxed);
return None;
}
let ptr = self.slot_ptr(idx);
unsafe {
let raw = ptr.as_ptr();
std::ptr::addr_of_mut!((*raw).id).write(id);
let label_ptr = std::ptr::addr_of_mut!((*raw).label);
(*label_ptr).fill(0);
let bytes = label.as_bytes();
let n = bytes.len().min(COUNTER_LABEL_BYTES);
std::ptr::copy_nonoverlapping(bytes.as_ptr(), label_ptr.cast::<u8>(), n);
(*raw)
.flags
.store(flags | COUNTER_FLAG_IN_USE, Ordering::Release);
}
Some(unsafe { CounterHandle::from_ptr(ptr) })
}
#[must_use]
pub fn boxed() -> OwnedCountersFile {
let buf: Box<OwnedCountersBuf> =
Box::new(OwnedCountersBuf([0u8; COUNTERS_FILE_RESERVED_BYTES]));
let ptr = Box::into_raw(buf);
let view = unsafe { CountersFile::init(NonNull::new_unchecked(ptr.cast::<u8>())) };
OwnedCountersFile {
file: view,
buf: ptr,
}
}
pub fn snapshot(&self) -> Vec<CounterSnapshot> {
let header = self.header();
let count = header.slot_count.load(Ordering::Acquire) as usize;
let count = count.min(MAX_COUNTER_SLOTS);
let mut out = Vec::with_capacity(count);
for idx in 0..count {
let ptr = self.slot_ptr(idx);
let slot = unsafe { ptr.as_ref() };
let flags = slot.flags.load(Ordering::Acquire);
if flags & COUNTER_FLAG_IN_USE == 0 {
continue;
}
let label_len = slot
.label
.iter()
.position(|b| *b == 0)
.unwrap_or(COUNTER_LABEL_BYTES);
let label = String::from_utf8_lossy(&slot.label[..label_len]).into_owned();
out.push(CounterSnapshot {
id: slot.id,
flags,
value: slot.value.load(Ordering::Relaxed),
label,
});
}
out
}
}
#[repr(C, align(64))]
struct OwnedCountersBuf([u8; COUNTERS_FILE_RESERVED_BYTES]);
pub struct OwnedCountersFile {
file: CountersFile,
buf: *mut OwnedCountersBuf,
}
unsafe impl Send for OwnedCountersFile {}
unsafe impl Sync for OwnedCountersFile {}
impl std::ops::Deref for OwnedCountersFile {
type Target = CountersFile;
fn deref(&self) -> &CountersFile {
&self.file
}
}
impl std::fmt::Debug for OwnedCountersFile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OwnedCountersFile")
.field("file", &self.file)
.finish()
}
}
impl Drop for OwnedCountersFile {
fn drop(&mut self) {
unsafe {
drop(Box::from_raw(self.buf));
}
}
}
#[derive(Debug)]
pub enum AttachError {
BadMagic(u32),
UnsupportedVersion(u32),
}
impl std::fmt::Display for AttachError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AttachError::BadMagic(m) => write!(f, "counters file: bad magic 0x{m:08x}"),
AttachError::UnsupportedVersion(v) => {
write!(f, "counters file: unsupported version {v}")
}
}
}
}
impl std::error::Error for AttachError {}
#[derive(Debug, Clone)]
pub struct CounterSnapshot {
pub id: u32,
pub flags: u32,
pub value: u64,
pub label: String,
}
pub mod ids {
pub const EVENTS_PUBLISHED: u32 = 0x10;
pub const EVENTS_PUBLISHED_BYTES: u32 = 0x11;
pub const PRODUCER_FULL_EVENTS: u32 = 0x12;
pub const PRODUCER_PARK_COUNT: u32 = 0x13;
pub const PRODUCER_UNPARK_COUNT: u32 = 0x14;
pub const EVENTS_CONSUMED: u32 = 0x20;
pub const EVENTS_CONSUMED_BYTES: u32 = 0x21;
pub const CONSUMER_EMPTY_SPINS: u32 = 0x22;
pub const CONSUMER_PARK_COUNT: u32 = 0x23;
pub const CONSUMER_UNPARK_COUNT: u32 = 0x24;
pub const CONSUMER_LAG_MAX: u32 = 0x25;
pub const FRAME_PUBLISH_TOTAL: u32 = 0x30;
pub const FRAME_FRAGMENT_COUNT: u32 = 0x31;
pub const FRAME_REASSEMBLE_COUNT: u32 = 0x32;
pub const CODEC_ENCODE_TOTAL: u32 = 0x33;
pub const CODEC_DECODE_TOTAL: u32 = 0x34;
pub const APP_RESERVED_BASE: u32 = 0x100;
}
#[cfg(test)]
mod tests {
use super::*;
#[repr(C, align(64))]
struct AlignedRegion([u8; COUNTERS_FILE_RESERVED_BYTES]);
fn fresh_region() -> Box<AlignedRegion> {
Box::new(AlignedRegion([0u8; COUNTERS_FILE_RESERVED_BYTES]))
}
fn ptr_from(buf: &mut AlignedRegion) -> NonNull<u8> {
NonNull::new(buf.0.as_mut_ptr()).unwrap()
}
#[test]
fn header_is_one_cache_line() {
assert_eq!(std::mem::size_of::<CountersHeader>(), CACHE_LINE_BYTES);
}
#[test]
fn slot_is_one_cache_line() {
assert_eq!(std::mem::size_of::<CounterSlot>(), CACHE_LINE_BYTES);
}
#[test]
fn header_plus_slots_fit_in_reserved_region() {
let bytes = std::mem::size_of::<CountersHeader>()
+ MAX_COUNTER_SLOTS * std::mem::size_of::<CounterSlot>();
assert!(bytes <= COUNTERS_FILE_RESERVED_BYTES);
}
#[test]
fn init_then_attach_roundtrip() {
let mut buf = fresh_region();
let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
let header = file.header();
assert_eq!(header.magic, COUNTERS_MAGIC);
assert_eq!(header.slot_capacity, MAX_COUNTER_SLOTS as u32);
assert_eq!(
header.slot_stride as usize,
std::mem::size_of::<CounterSlot>()
);
let _ = file;
let attached = unsafe { CountersFile::attach(ptr_from(&mut buf)) }.unwrap();
assert_eq!(attached.header().magic, COUNTERS_MAGIC);
}
#[test]
fn attach_rejects_bad_magic() {
let mut buf = fresh_region();
let err = unsafe { CountersFile::attach(ptr_from(&mut buf)) }.unwrap_err();
match err {
AttachError::BadMagic(0) => {}
other => panic!("unexpected error: {other:?}"),
}
}
#[test]
fn register_inc_get_snapshot_roundtrip() {
let mut buf = fresh_region();
let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
let pub_h = file
.register(
ids::EVENTS_PUBLISHED,
COUNTER_FLAG_PRODUCER,
"events_published",
)
.unwrap();
let con_h = file
.register(
ids::EVENTS_CONSUMED,
COUNTER_FLAG_CONSUMER,
"events_consumed",
)
.unwrap();
let lag_h = file
.register(
ids::CONSUMER_LAG_MAX,
COUNTER_FLAG_CONSUMER,
"consumer_lag_max",
)
.unwrap();
for _ in 0..123 {
pub_h.inc();
}
con_h.add(456);
lag_h.record_max(7);
lag_h.record_max(3); lag_h.record_max(11);
assert_eq!(pub_h.get(), 123);
assert_eq!(con_h.get(), 456);
assert_eq!(lag_h.get(), 11);
let snap = file.snapshot();
let by_id = |id: u32| snap.iter().find(|c| c.id == id).cloned();
let p = by_id(ids::EVENTS_PUBLISHED).unwrap();
assert_eq!(p.value, 123);
assert_eq!(p.label, "events_published");
assert_eq!(p.flags & COUNTER_FLAG_IN_USE, COUNTER_FLAG_IN_USE);
assert_eq!(p.flags & COUNTER_FLAG_PRODUCER, COUNTER_FLAG_PRODUCER);
let c = by_id(ids::EVENTS_CONSUMED).unwrap();
assert_eq!(c.value, 456);
assert_eq!(c.flags & COUNTER_FLAG_CONSUMER, COUNTER_FLAG_CONSUMER);
let lag = by_id(ids::CONSUMER_LAG_MAX).unwrap();
assert_eq!(lag.value, 11);
}
#[test]
fn register_returns_none_at_capacity() {
let mut buf = fresh_region();
let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
for i in 0..MAX_COUNTER_SLOTS {
let h = file.register(i as u32, COUNTER_FLAG_PRODUCER, "x");
assert!(h.is_some(), "slot {i} should fit");
}
assert!(file
.register(0xDEAD, COUNTER_FLAG_PRODUCER, "overflow")
.is_none());
assert_eq!(
file.header().slot_count.load(Ordering::Acquire) as usize,
MAX_COUNTER_SLOTS
);
}
#[test]
fn external_attach_sees_writer_increments() {
let mut buf = fresh_region();
let raw = ptr_from(&mut buf);
let writer = unsafe { CountersFile::init(raw) };
let h = writer
.register(
ids::EVENTS_PUBLISHED,
COUNTER_FLAG_PRODUCER,
"events_published",
)
.unwrap();
for _ in 0..100 {
h.inc();
}
let reader = unsafe { CountersFile::attach(raw) }.unwrap();
let snap = reader.snapshot();
assert_eq!(snap.len(), 1);
assert_eq!(snap[0].id, ids::EVENTS_PUBLISHED);
assert_eq!(snap[0].value, 100);
assert_eq!(snap[0].label, "events_published");
}
#[test]
fn boxed_creates_usable_counters_file() {
let file = CountersFile::boxed();
assert_eq!(file.header().magic, COUNTERS_MAGIC);
let h = file
.register(
ids::EVENTS_PUBLISHED,
COUNTER_FLAG_PRODUCER,
"events_published",
)
.expect("first slot fits");
for _ in 0..7 {
h.inc();
}
let snap = file.snapshot();
assert_eq!(snap.len(), 1);
assert_eq!(snap[0].id, ids::EVENTS_PUBLISHED);
assert_eq!(snap[0].value, 7);
}
#[test]
fn owned_counters_file_drop_releases_buffer() {
for _ in 0..1024 {
let f = CountersFile::boxed();
let _ = f.register(1, COUNTER_FLAG_PRODUCER, "x");
}
}
#[test]
fn owned_counters_file_is_send_and_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<OwnedCountersFile>();
assert_sync::<OwnedCountersFile>();
}
}