Skip to main content

disruptor_mp/observability/
mod.rs

1//! Aeron-style counters file for low-overhead observability.
2//!
3//! See RFC 0040. Each shared-memory segment reserves a small header zone
4//! holding an array of cache-line-padded `CounterSlot`s. Writers (producer
5//! and consumer hot paths) increment a slot via a single relaxed atomic
6//! `fetch_add`; out-of-process readers attach to the segment and walk the
7//! same array.
8//!
9//! Cost model:
10//! - hot path increment ≈ 1 ns on x86, 1.5 ns on aarch64 (relaxed atomic).
11//! - reader path is non-blocking; counters are eventually-consistent
12//!   monotonic values — no synchronisation needed with the writer.
13//!
14//! Submodules:
15//! - [`aggregator`] (feature `metrics`) — worker thread that pumps the
16//!   counters file into the `metrics`-rs facade on a periodic interval,
17//!   bridging hot-path counters to Prometheus / OTLP / any other
18//!   recorder the embedding application installs.
19
20#[cfg(feature = "metrics")]
21pub mod aggregator;
22#[cfg(feature = "metrics")]
23pub use aggregator::{AggregatorConfig, AggregatorHandle};
24
25use std::ptr::NonNull;
26use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
27
28/// Magic sentinel marking a valid counters file header. Spells "MYC0" in
29/// little-endian — Myelon Counters v0.
30pub const COUNTERS_MAGIC: u32 = u32::from_le_bytes(*b"MYC0");
31
32/// Bytes reserved at the top of a shared-memory segment for the counters
33/// file. Holds the header plus up to ~62 fixed-size slots, which is well
34/// above the per-segment counter set defined in RFC 0040.
35pub const COUNTERS_FILE_RESERVED_BYTES: usize = 4096;
36
37/// Cache line size assumed for slot alignment / inter-counter padding.
38pub const CACHE_LINE_BYTES: usize = 64;
39
40/// Maximum number of counter slots a single segment can host. Picked so
41/// `header + MAX_SLOTS * CounterSlot` fits inside `COUNTERS_FILE_RESERVED_BYTES`.
42pub const MAX_COUNTER_SLOTS: usize = 62;
43
44/// Maximum bytes available for a counter label, null-padded UTF-8.
45pub const COUNTER_LABEL_BYTES: usize = 48;
46
47/// Slot is in use (writer claimed it).
48pub const COUNTER_FLAG_IN_USE: u32 = 1 << 0;
49/// Slot belongs to a producer hot path.
50pub const COUNTER_FLAG_PRODUCER: u32 = 1 << 1;
51/// Slot belongs to a consumer hot path.
52pub const COUNTER_FLAG_CONSUMER: u32 = 1 << 2;
53
54/// Header for the counters file region. Lives at the start of the
55/// reserved zone described in RFC 0040.
56#[repr(C, align(64))]
57pub struct CountersHeader {
58    /// `COUNTERS_MAGIC` when valid.
59    pub magic: u32,
60    /// Layout version; bumped when slot stride or header shape changes.
61    pub version: u32,
62    /// Number of slots actually in use; advances monotonically.
63    pub slot_count: AtomicU32,
64    /// Maximum slots the array can hold (always `MAX_COUNTER_SLOTS`).
65    pub slot_capacity: u32,
66    /// Bytes per slot (`size_of::<CounterSlot>()`); included so external
67    /// readers don't need to depend on this crate's structs.
68    pub slot_stride: u32,
69    /// Bytes from the start of this header to the first slot.
70    pub slots_offset: u32,
71    /// Reserved for future use; pad up to one cache line.
72    _reserved: [u8; 64 - 24],
73}
74
75const _: () = assert!(std::mem::size_of::<CountersHeader>() == CACHE_LINE_BYTES);
76
77/// One Aeron-style counter slot. `#[repr(C, align(64))]` keeps each slot
78/// on its own cache line so writers don't false-share with readers or
79/// neighbouring slots.
80#[repr(C, align(64))]
81pub struct CounterSlot {
82    /// Stable counter identifier (see RFC 0040 §Counters).
83    pub id: u32,
84    /// `COUNTER_FLAG_*` bits.
85    pub flags: AtomicU32,
86    /// Monotonic counter value.
87    pub value: AtomicU64,
88    /// Null-padded UTF-8 label, e.g. `"events_published"`.
89    pub label: [u8; COUNTER_LABEL_BYTES],
90}
91
92const _: () = assert!(std::mem::size_of::<CounterSlot>() == CACHE_LINE_BYTES);
93
94/// Mutable handle to a `CounterSlot`.
95///
96/// Created by writers (producer / consumer construction) and held by
97/// reference for the lifetime of the segment. Increments are relaxed
98/// atomic; reads from a separate thread see eventually-consistent
99/// values.
100#[derive(Clone, Copy, Debug)]
101pub struct CounterHandle {
102    inner: NonNull<CounterSlot>,
103}
104
105unsafe impl Send for CounterHandle {}
106unsafe impl Sync for CounterHandle {}
107
108impl CounterHandle {
109    /// Build a handle from a slot pointer. Caller must ensure the slot
110    /// lives at least as long as the handle (typically tied to a shared
111    /// memory mapping).
112    ///
113    /// # Safety
114    /// `ptr` must point to a valid `CounterSlot` in a mapping that
115    /// outlives the returned handle.
116    pub unsafe fn from_ptr(ptr: NonNull<CounterSlot>) -> Self {
117        Self { inner: ptr }
118    }
119
120    /// Increment by 1.
121    #[inline(always)]
122    pub fn inc(&self) {
123        // SAFETY: handle was constructed from a valid slot pointer with
124        // a lifetime tied to the underlying mapping.
125        let slot = unsafe { self.inner.as_ref() };
126        slot.value.fetch_add(1, Ordering::Relaxed);
127    }
128
129    /// Add `n`.
130    #[inline(always)]
131    pub fn add(&self, n: u64) {
132        let slot = unsafe { self.inner.as_ref() };
133        slot.value.fetch_add(n, Ordering::Relaxed);
134    }
135
136    /// Update the stored maximum if `value` exceeds the current
137    /// reading. Useful for high-water-mark counters such as
138    /// `consumer_lag_max`.
139    #[inline(always)]
140    pub fn record_max(&self, value: u64) {
141        let slot = unsafe { self.inner.as_ref() };
142        let mut current = slot.value.load(Ordering::Relaxed);
143        while value > current {
144            match slot.value.compare_exchange_weak(
145                current,
146                value,
147                Ordering::Relaxed,
148                Ordering::Relaxed,
149            ) {
150                Ok(_) => break,
151                Err(actual) => current = actual,
152            }
153        }
154    }
155
156    /// Read the current value (relaxed).
157    #[inline]
158    pub fn get(&self) -> u64 {
159        let slot = unsafe { self.inner.as_ref() };
160        slot.value.load(Ordering::Relaxed)
161    }
162}
163
164/// View over a counters file region. Owns no memory; constructed from a
165/// pointer into a shared-memory mapping or a stack buffer (test only).
166#[derive(Debug)]
167pub struct CountersFile {
168    base: NonNull<u8>,
169    len: usize,
170}
171
172unsafe impl Send for CountersFile {}
173unsafe impl Sync for CountersFile {}
174
175impl CountersFile {
176    /// Initialise a freshly-zeroed reserved region as a counters file.
177    /// Must be called exactly once per segment, by whichever process
178    /// creates the segment.
179    ///
180    /// # Safety
181    /// `ptr` must point to at least `COUNTERS_FILE_RESERVED_BYTES` of
182    /// writable memory that outlives the returned view, and that memory
183    /// must be zero-initialised before this call.
184    pub unsafe fn init(ptr: NonNull<u8>) -> Self {
185        let header_ptr = ptr.as_ptr() as *mut CountersHeader;
186        let slots_offset = std::mem::size_of::<CountersHeader>() as u32;
187        // SAFETY: caller's contract guarantees `ptr` is writable for at
188        // least `COUNTERS_FILE_RESERVED_BYTES` and that the memory is
189        // zero-initialised, which is what `write` requires here.
190        unsafe {
191            std::ptr::write(
192                header_ptr,
193                CountersHeader {
194                    magic: COUNTERS_MAGIC,
195                    version: 0,
196                    slot_count: AtomicU32::new(0),
197                    slot_capacity: MAX_COUNTER_SLOTS as u32,
198                    slot_stride: std::mem::size_of::<CounterSlot>() as u32,
199                    slots_offset,
200                    _reserved: [0u8; 64 - 24],
201                },
202            );
203        }
204        // Slots are already zeroed by the caller's mmap/SHM allocation.
205        Self {
206            base: ptr,
207            len: COUNTERS_FILE_RESERVED_BYTES,
208        }
209    }
210
211    /// Attach to an existing counters file region. Validates the magic
212    /// and version.
213    ///
214    /// # Safety
215    /// `ptr` must point to at least `COUNTERS_FILE_RESERVED_BYTES` of
216    /// readable memory whose lifetime contains the returned view. The
217    /// memory must already have been initialised by a writer.
218    pub unsafe fn attach(ptr: NonNull<u8>) -> Result<Self, AttachError> {
219        // SAFETY: caller's contract guarantees `ptr` points at an
220        // initialised `CountersHeader` whose backing memory remains
221        // valid for at least the lifetime of the returned view.
222        let header = unsafe { &*(ptr.as_ptr() as *const CountersHeader) };
223        if header.magic != COUNTERS_MAGIC {
224            return Err(AttachError::BadMagic(header.magic));
225        }
226        if header.version != 0 {
227            return Err(AttachError::UnsupportedVersion(header.version));
228        }
229        Ok(Self {
230            base: ptr,
231            len: COUNTERS_FILE_RESERVED_BYTES,
232        })
233    }
234
235    /// Header reference.
236    #[inline]
237    pub fn header(&self) -> &CountersHeader {
238        unsafe { &*(self.base.as_ptr() as *const CountersHeader) }
239    }
240
241    fn slot_ptr(&self, idx: usize) -> NonNull<CounterSlot> {
242        let header = self.header();
243        let off = header.slots_offset as usize + idx * std::mem::size_of::<CounterSlot>();
244        debug_assert!(off + std::mem::size_of::<CounterSlot>() <= self.len);
245        // SAFETY: bounds checked above; offset comes from the in-band header.
246        unsafe { NonNull::new_unchecked(self.base.as_ptr().add(off) as *mut CounterSlot) }
247    }
248
249    /// Reserve a new slot, populate `id`, `flags`, and `label`, and
250    /// return a writer handle. Returns `None` when the slot capacity is
251    /// exhausted.
252    pub fn register(&self, id: u32, flags: u32, label: &str) -> Option<CounterHandle> {
253        let header = self.header();
254        let idx = header.slot_count.fetch_add(1, Ordering::AcqRel) as usize;
255        if idx >= MAX_COUNTER_SLOTS {
256            // Roll back so subsequent register() calls don't keep
257            // incrementing past capacity.
258            header.slot_count.fetch_sub(1, Ordering::Relaxed);
259            return None;
260        }
261        let ptr = self.slot_ptr(idx);
262        // SAFETY: pointer is in-bounds and not yet observed by readers
263        // — slot_count was incremented above before any writer or reader
264        // could see this index, and the Release on `flags` below
265        // publishes the populated slot.
266        unsafe {
267            let raw = ptr.as_ptr();
268            std::ptr::addr_of_mut!((*raw).id).write(id);
269            let label_ptr = std::ptr::addr_of_mut!((*raw).label);
270            (*label_ptr).fill(0);
271            let bytes = label.as_bytes();
272            let n = bytes.len().min(COUNTER_LABEL_BYTES);
273            std::ptr::copy_nonoverlapping(bytes.as_ptr(), label_ptr.cast::<u8>(), n);
274            // value left at its zero initialisation.
275            (*raw)
276                .flags
277                .store(flags | COUNTER_FLAG_IN_USE, Ordering::Release);
278        }
279        Some(unsafe { CounterHandle::from_ptr(ptr) })
280    }
281
282    /// Allocate a fresh, zero-initialised counters file on the heap and
283    /// initialise it. Returns an [`OwnedCountersFile`] that owns the
284    /// backing buffer alongside the `CountersFile` view.
285    ///
286    /// Safe alternative to [`CountersFile::init`] for single-process use
287    /// cases (tests, in-process metrics) where the counters file does
288    /// not need to be shared across processes via SHM or mmap.
289    ///
290    /// The returned `OwnedCountersFile` derefs to `&CountersFile`, so
291    /// all `register` / `snapshot` / etc. operations work directly:
292    ///
293    /// ```
294    /// use disruptor_mp::observability::CountersFile;
295    /// let file = CountersFile::boxed();
296    /// let handle = file.register(1, 0, "events_published").unwrap();
297    /// handle.inc();
298    /// assert_eq!(file.snapshot()[0].value, 1);
299    /// ```
300    #[must_use]
301    pub fn boxed() -> OwnedCountersFile {
302        // `CountersHeader` and `CounterSlot` are `#[repr(C, align(64))]`,
303        // so the backing buffer must also be 64-byte aligned. A plain
304        // `Box<[u8; N]>` only guarantees `align_of::<u8>() = 1`, which
305        // tripped the doctest with a `misaligned pointer dereference:
306        // address must be a multiple of 0x40` on aarch64. The
307        // `OwnedCountersBuf` newtype below carries `#[repr(C, align(64))]`
308        // so the heap allocation comes out cache-line aligned. The
309        // matching `Drop` reconstitutes `Box<OwnedCountersBuf>` (same
310        // type, same alignment) to release the allocation safely.
311        let buf: Box<OwnedCountersBuf> =
312            Box::new(OwnedCountersBuf([0u8; COUNTERS_FILE_RESERVED_BYTES]));
313        let ptr = Box::into_raw(buf);
314        // SAFETY: `ptr` is a valid, non-null, exclusively-owned pointer to
315        // a freshly-allocated, zero-initialised, 64-byte-aligned buffer of
316        // exactly `COUNTERS_FILE_RESERVED_BYTES`. The view stored in
317        // `file.base` aliases the same heap region; `OwnedCountersFile`'s
318        // `Drop` reconstitutes the `Box<OwnedCountersBuf>` to free it.
319        let view = unsafe { CountersFile::init(NonNull::new_unchecked(ptr.cast::<u8>())) };
320        OwnedCountersFile {
321            file: view,
322            buf: ptr,
323        }
324    }
325
326    /// Snapshot all in-use slots — `(id, flags, value, label)` tuples.
327    /// Intended for external readers and tests.
328    pub fn snapshot(&self) -> Vec<CounterSnapshot> {
329        let header = self.header();
330        let count = header.slot_count.load(Ordering::Acquire) as usize;
331        let count = count.min(MAX_COUNTER_SLOTS);
332        let mut out = Vec::with_capacity(count);
333        for idx in 0..count {
334            let ptr = self.slot_ptr(idx);
335            let slot = unsafe { ptr.as_ref() };
336            let flags = slot.flags.load(Ordering::Acquire);
337            if flags & COUNTER_FLAG_IN_USE == 0 {
338                continue;
339            }
340            let label_len = slot
341                .label
342                .iter()
343                .position(|b| *b == 0)
344                .unwrap_or(COUNTER_LABEL_BYTES);
345            let label = String::from_utf8_lossy(&slot.label[..label_len]).into_owned();
346            out.push(CounterSnapshot {
347                id: slot.id,
348                flags,
349                value: slot.value.load(Ordering::Relaxed),
350                label,
351            });
352        }
353        out
354    }
355}
356
357/// 64-byte-aligned backing buffer for [`OwnedCountersFile`]. Kept
358/// `repr(C, align(64))` because [`CountersHeader`] and [`CounterSlot`]
359/// (the structs the buffer is reinterpreted as) are themselves 64-byte
360/// aligned — a plain `Box<[u8; N]>` only promises 1-byte alignment,
361/// which trips `misaligned pointer dereference` panics on platforms
362/// that enforce alignment (e.g. aarch64 in debug builds).
363#[repr(C, align(64))]
364struct OwnedCountersBuf([u8; COUNTERS_FILE_RESERVED_BYTES]);
365
366/// Heap-allocated [`CountersFile`] that owns its backing buffer.
367/// Produced by [`CountersFile::boxed`] for use cases that don't need
368/// cross-process sharing (tests, in-process metrics).
369///
370/// Derefs to `&CountersFile` so all the usual `register` /
371/// `snapshot` / etc. methods work directly.
372pub struct OwnedCountersFile {
373    file: CountersFile,
374    // Raw pointer to the boxed buffer. Reconstituted into a `Box` in
375    // `Drop` so the allocation is released. We keep it as a raw pointer
376    // rather than a `Box` field because `Box<[u8; N]>` and the `NonNull`
377    // inside `file` are both aliasing the same heap region, and Rust's
378    // aliasing rules are friendlier when we don't materialise both as
379    // separate owners during the struct's lifetime.
380    buf: *mut OwnedCountersBuf,
381}
382
383// SAFETY: the buffer is heap-owned, isolated to this `OwnedCountersFile`
384// instance; the file view is `Send`+`Sync` (it's an opaque pointer that
385// the inner type already asserts safe for cross-thread access through
386// its own Send/Sync impls). Wrapping it adds no new aliasing.
387unsafe impl Send for OwnedCountersFile {}
388unsafe impl Sync for OwnedCountersFile {}
389
390impl std::ops::Deref for OwnedCountersFile {
391    type Target = CountersFile;
392    fn deref(&self) -> &CountersFile {
393        &self.file
394    }
395}
396
397impl std::fmt::Debug for OwnedCountersFile {
398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399        f.debug_struct("OwnedCountersFile")
400            .field("file", &self.file)
401            .finish()
402    }
403}
404
405impl Drop for OwnedCountersFile {
406    fn drop(&mut self) {
407        // SAFETY: `buf` was produced by `Box::into_raw` in
408        // `CountersFile::boxed`; reconstituting the Box releases the
409        // allocation. `self.file` is dropped before this Drop body
410        // returns and the file's NonNull pointer is not accessed
411        // afterwards.
412        unsafe {
413            drop(Box::from_raw(self.buf));
414        }
415    }
416}
417
418/// Error returned by [`CountersFile::attach`].
419#[derive(Debug)]
420pub enum AttachError {
421    /// Magic word didn't match `COUNTERS_MAGIC`.
422    BadMagic(u32),
423    /// Header version is from a newer layout this build doesn't know.
424    UnsupportedVersion(u32),
425}
426
427impl std::fmt::Display for AttachError {
428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429        match self {
430            AttachError::BadMagic(m) => write!(f, "counters file: bad magic 0x{m:08x}"),
431            AttachError::UnsupportedVersion(v) => {
432                write!(f, "counters file: unsupported version {v}")
433            }
434        }
435    }
436}
437
438impl std::error::Error for AttachError {}
439
440/// Plain-data view of one counter slot, useful for tests and external
441/// readers (e.g. a future `myelon-stat` binary).
442#[derive(Debug, Clone)]
443pub struct CounterSnapshot {
444    /// Stable counter identifier.
445    pub id: u32,
446    /// `COUNTER_FLAG_*` bits.
447    pub flags: u32,
448    /// Current value at snapshot time.
449    pub value: u64,
450    /// Decoded label.
451    pub label: String,
452}
453
454// ---------- counter ID registry --------------------------------------------
455
456/// Stable counter IDs used by `disruptor-mp`. See RFC 0040 §Counters for
457/// the canonical table. Values above 0x100 are reserved for application
458/// extension via [`CountersFile::register`].
459pub mod ids {
460    // Producer hot path
461    /// `events_published` — successful `publish()` calls.
462    pub const EVENTS_PUBLISHED: u32 = 0x10;
463    /// `events_published_bytes` — cumulative payload bytes published.
464    pub const EVENTS_PUBLISHED_BYTES: u32 = 0x11;
465    /// `producer_full_events` — `publish()` saw ring full.
466    pub const PRODUCER_FULL_EVENTS: u32 = 0x12;
467    /// `producer_park_count` — publisher parked on full.
468    pub const PRODUCER_PARK_COUNT: u32 = 0x13;
469    /// `producer_unpark_count` — publisher unparked by consumer.
470    pub const PRODUCER_UNPARK_COUNT: u32 = 0x14;
471
472    // Consumer hot path
473    /// `events_consumed` — successful `try_consume_next()` calls.
474    pub const EVENTS_CONSUMED: u32 = 0x20;
475    /// `events_consumed_bytes` — cumulative payload bytes consumed.
476    pub const EVENTS_CONSUMED_BYTES: u32 = 0x21;
477    /// `consumer_empty_spins` — `try_consume_next` saw ring empty.
478    pub const CONSUMER_EMPTY_SPINS: u32 = 0x22;
479    /// `consumer_park_count` — consumer parked on empty.
480    pub const CONSUMER_PARK_COUNT: u32 = 0x23;
481    /// `consumer_unpark_count` — consumer unparked by producer.
482    pub const CONSUMER_UNPARK_COUNT: u32 = 0x24;
483    /// `consumer_lag_max` — high-water mark of `producer_seq − consumer_seq`.
484    pub const CONSUMER_LAG_MAX: u32 = 0x25;
485
486    // Coordination (myelon typed layer)
487    /// `frame_publish_total` — `FramedTransport::publish()` calls.
488    pub const FRAME_PUBLISH_TOTAL: u32 = 0x30;
489    /// `frame_fragment_count` — frames split across slots.
490    pub const FRAME_FRAGMENT_COUNT: u32 = 0x31;
491    /// `frame_reassemble_count` — frames reassembled on consume.
492    pub const FRAME_REASSEMBLE_COUNT: u32 = 0x32;
493    /// `codec_encode_total` — `Codec::encode()` calls.
494    pub const CODEC_ENCODE_TOTAL: u32 = 0x33;
495    /// `codec_decode_total` — `Codec::decode()` calls.
496    pub const CODEC_DECODE_TOTAL: u32 = 0x34;
497
498    /// First ID reserved for application extension.
499    pub const APP_RESERVED_BASE: u32 = 0x100;
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505
506    /// 64-byte aligned region matching the layout expectations of
507    /// `CountersHeader` / `CounterSlot`. Real shared-memory mappings
508    /// satisfy this alignment naturally; tests allocate it explicitly.
509    #[repr(C, align(64))]
510    struct AlignedRegion([u8; COUNTERS_FILE_RESERVED_BYTES]);
511
512    /// Allocate a zeroed, cache-line-aligned region. Heap-resident so
513    /// the pointer stays stable for the lifetime of the test.
514    fn fresh_region() -> Box<AlignedRegion> {
515        Box::new(AlignedRegion([0u8; COUNTERS_FILE_RESERVED_BYTES]))
516    }
517
518    fn ptr_from(buf: &mut AlignedRegion) -> NonNull<u8> {
519        NonNull::new(buf.0.as_mut_ptr()).unwrap()
520    }
521
522    #[test]
523    fn header_is_one_cache_line() {
524        assert_eq!(std::mem::size_of::<CountersHeader>(), CACHE_LINE_BYTES);
525    }
526
527    #[test]
528    fn slot_is_one_cache_line() {
529        assert_eq!(std::mem::size_of::<CounterSlot>(), CACHE_LINE_BYTES);
530    }
531
532    #[test]
533    fn header_plus_slots_fit_in_reserved_region() {
534        let bytes = std::mem::size_of::<CountersHeader>()
535            + MAX_COUNTER_SLOTS * std::mem::size_of::<CounterSlot>();
536        assert!(bytes <= COUNTERS_FILE_RESERVED_BYTES);
537    }
538
539    #[test]
540    fn init_then_attach_roundtrip() {
541        let mut buf = fresh_region();
542        let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
543        let header = file.header();
544        assert_eq!(header.magic, COUNTERS_MAGIC);
545        assert_eq!(header.slot_capacity, MAX_COUNTER_SLOTS as u32);
546        assert_eq!(
547            header.slot_stride as usize,
548            std::mem::size_of::<CounterSlot>()
549        );
550        // Re-attach to the same memory; the writer view above shares
551        // the buffer non-mutably, so simply let it fall out of scope.
552        let _ = file;
553        let attached = unsafe { CountersFile::attach(ptr_from(&mut buf)) }.unwrap();
554        assert_eq!(attached.header().magic, COUNTERS_MAGIC);
555    }
556
557    #[test]
558    fn attach_rejects_bad_magic() {
559        let mut buf = fresh_region();
560        // Don't init — magic stays zero.
561        let err = unsafe { CountersFile::attach(ptr_from(&mut buf)) }.unwrap_err();
562        match err {
563            AttachError::BadMagic(0) => {}
564            other => panic!("unexpected error: {other:?}"),
565        }
566    }
567
568    #[test]
569    fn register_inc_get_snapshot_roundtrip() {
570        let mut buf = fresh_region();
571        let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
572
573        let pub_h = file
574            .register(
575                ids::EVENTS_PUBLISHED,
576                COUNTER_FLAG_PRODUCER,
577                "events_published",
578            )
579            .unwrap();
580        let con_h = file
581            .register(
582                ids::EVENTS_CONSUMED,
583                COUNTER_FLAG_CONSUMER,
584                "events_consumed",
585            )
586            .unwrap();
587        let lag_h = file
588            .register(
589                ids::CONSUMER_LAG_MAX,
590                COUNTER_FLAG_CONSUMER,
591                "consumer_lag_max",
592            )
593            .unwrap();
594
595        for _ in 0..123 {
596            pub_h.inc();
597        }
598        con_h.add(456);
599        lag_h.record_max(7);
600        lag_h.record_max(3); // lower than current; ignored.
601        lag_h.record_max(11);
602
603        assert_eq!(pub_h.get(), 123);
604        assert_eq!(con_h.get(), 456);
605        assert_eq!(lag_h.get(), 11);
606
607        let snap = file.snapshot();
608        let by_id = |id: u32| snap.iter().find(|c| c.id == id).cloned();
609        let p = by_id(ids::EVENTS_PUBLISHED).unwrap();
610        assert_eq!(p.value, 123);
611        assert_eq!(p.label, "events_published");
612        assert_eq!(p.flags & COUNTER_FLAG_IN_USE, COUNTER_FLAG_IN_USE);
613        assert_eq!(p.flags & COUNTER_FLAG_PRODUCER, COUNTER_FLAG_PRODUCER);
614        let c = by_id(ids::EVENTS_CONSUMED).unwrap();
615        assert_eq!(c.value, 456);
616        assert_eq!(c.flags & COUNTER_FLAG_CONSUMER, COUNTER_FLAG_CONSUMER);
617        let lag = by_id(ids::CONSUMER_LAG_MAX).unwrap();
618        assert_eq!(lag.value, 11);
619    }
620
621    #[test]
622    fn register_returns_none_at_capacity() {
623        let mut buf = fresh_region();
624        let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
625        for i in 0..MAX_COUNTER_SLOTS {
626            let h = file.register(i as u32, COUNTER_FLAG_PRODUCER, "x");
627            assert!(h.is_some(), "slot {i} should fit");
628        }
629        assert!(file
630            .register(0xDEAD, COUNTER_FLAG_PRODUCER, "overflow")
631            .is_none());
632        // slot_count must be clamped (rolled back when over-capacity).
633        assert_eq!(
634            file.header().slot_count.load(Ordering::Acquire) as usize,
635            MAX_COUNTER_SLOTS
636        );
637    }
638
639    #[test]
640    fn external_attach_sees_writer_increments() {
641        // Simulates a separate "process": one CountersFile initialises &
642        // writes, a second view attaches to the same memory and reads.
643        let mut buf = fresh_region();
644        // Take the raw pointer once so both views share it without
645        // creating overlapping mutable borrows of `buf`.
646        let raw = ptr_from(&mut buf);
647        let writer = unsafe { CountersFile::init(raw) };
648        let h = writer
649            .register(
650                ids::EVENTS_PUBLISHED,
651                COUNTER_FLAG_PRODUCER,
652                "events_published",
653            )
654            .unwrap();
655        for _ in 0..100 {
656            h.inc();
657        }
658        let reader = unsafe { CountersFile::attach(raw) }.unwrap();
659        let snap = reader.snapshot();
660        assert_eq!(snap.len(), 1);
661        assert_eq!(snap[0].id, ids::EVENTS_PUBLISHED);
662        assert_eq!(snap[0].value, 100);
663        assert_eq!(snap[0].label, "events_published");
664    }
665
666    #[test]
667    fn boxed_creates_usable_counters_file() {
668        // The safe `CountersFile::boxed()` wrapper allocates and
669        // initialises the counters region in one call; the returned
670        // `OwnedCountersFile` derefs to `&CountersFile`.
671        let file = CountersFile::boxed();
672        assert_eq!(file.header().magic, COUNTERS_MAGIC);
673        let h = file
674            .register(
675                ids::EVENTS_PUBLISHED,
676                COUNTER_FLAG_PRODUCER,
677                "events_published",
678            )
679            .expect("first slot fits");
680        for _ in 0..7 {
681            h.inc();
682        }
683        let snap = file.snapshot();
684        assert_eq!(snap.len(), 1);
685        assert_eq!(snap[0].id, ids::EVENTS_PUBLISHED);
686        assert_eq!(snap[0].value, 7);
687    }
688
689    #[test]
690    fn owned_counters_file_drop_releases_buffer() {
691        // Smoke-test for the Drop impl: allocating, registering, then
692        // dropping a sequence of `OwnedCountersFile`s should not leak
693        // or fault. Miri / asan would catch UAF / leak; this test just
694        // exercises the path so a regression in Drop is at minimum
695        // noisy in normal runs.
696        for _ in 0..1024 {
697            let f = CountersFile::boxed();
698            let _ = f.register(1, COUNTER_FLAG_PRODUCER, "x");
699            // Drop at end of scope.
700        }
701    }
702
703    #[test]
704    fn owned_counters_file_is_send_and_sync() {
705        // Compile-time check that we can move it across threads.
706        fn assert_send<T: Send>() {}
707        fn assert_sync<T: Sync>() {}
708        assert_send::<OwnedCountersFile>();
709        assert_sync::<OwnedCountersFile>();
710    }
711}