disruptor_mp/observability/mod.rs
1//! Aeron-style counters file for low-overhead observability.
2//!
3//! See RFC 0040. Each shared-memory segment reserves a small header zone
4//! holding an array of cache-line-padded `CounterSlot`s. Writers (producer
5//! and consumer hot paths) increment a slot via a single relaxed atomic
6//! `fetch_add`; out-of-process readers attach to the segment and walk the
7//! same array.
8//!
9//! Cost model:
10//! - hot path increment ≈ 1 ns on x86, 1.5 ns on aarch64 (relaxed atomic).
11//! - reader path is non-blocking; counters are eventually-consistent
12//! monotonic values — no synchronisation needed with the writer.
13//!
14//! Submodules:
15//! - [`aggregator`] (feature `metrics`) — worker thread that pumps the
16//! counters file into the `metrics`-rs facade on a periodic interval,
17//! bridging hot-path counters to Prometheus / OTLP / any other
18//! recorder the embedding application installs.
19
20#[cfg(feature = "metrics")]
21pub mod aggregator;
22#[cfg(feature = "metrics")]
23pub use aggregator::{AggregatorConfig, AggregatorHandle};
24
25use std::ptr::NonNull;
26use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
27
/// Magic word identifying an initialised counters file header: the ASCII
/// bytes `MYC0` ("Myelon Counters v0") interpreted as a little-endian
/// `u32`.
pub const COUNTERS_MAGIC: u32 = u32::from_le_bytes([b'M', b'Y', b'C', b'0']);

/// Size, in bytes, of the zone reserved at the top of a shared-memory
/// segment for the counters file. One 64-byte header plus
/// `MAX_COUNTER_SLOTS` 64-byte slots occupy 4032 bytes, which is well
/// above the per-segment counter set defined in RFC 0040.
pub const COUNTERS_FILE_RESERVED_BYTES: usize = 4 * 1024;

/// Cache line size assumed when aligning the header and padding slots
/// apart from each other.
pub const CACHE_LINE_BYTES: usize = 64;

/// Upper bound on the number of counter slots one segment can host.
/// Chosen so that `header + MAX_COUNTER_SLOTS * CounterSlot` fits inside
/// `COUNTERS_FILE_RESERVED_BYTES`.
pub const MAX_COUNTER_SLOTS: usize = 62;

/// Capacity of a counter label in bytes; labels are stored as
/// null-padded UTF-8.
pub const COUNTER_LABEL_BYTES: usize = 48;

/// Flag bit 0: the slot has been claimed by a writer.
pub const COUNTER_FLAG_IN_USE: u32 = 0b001;
/// Flag bit 1: the slot belongs to a producer hot path.
pub const COUNTER_FLAG_PRODUCER: u32 = 0b010;
/// Flag bit 2: the slot belongs to a consumer hot path.
pub const COUNTER_FLAG_CONSUMER: u32 = 0b100;
53
54/// Header for the counters file region. Lives at the start of the
55/// reserved zone described in RFC 0040.
56#[repr(C, align(64))]
57pub struct CountersHeader {
58 /// `COUNTERS_MAGIC` when valid.
59 pub magic: u32,
60 /// Layout version; bumped when slot stride or header shape changes.
61 pub version: u32,
62 /// Number of slots actually in use; advances monotonically.
63 pub slot_count: AtomicU32,
64 /// Maximum slots the array can hold (always `MAX_COUNTER_SLOTS`).
65 pub slot_capacity: u32,
66 /// Bytes per slot (`size_of::<CounterSlot>()`); included so external
67 /// readers don't need to depend on this crate's structs.
68 pub slot_stride: u32,
69 /// Bytes from the start of this header to the first slot.
70 pub slots_offset: u32,
71 /// Reserved for future use; pad up to one cache line.
72 _reserved: [u8; 64 - 24],
73}
74
75const _: () = assert!(std::mem::size_of::<CountersHeader>() == CACHE_LINE_BYTES);
76
77/// One Aeron-style counter slot. `#[repr(C, align(64))]` keeps each slot
78/// on its own cache line so writers don't false-share with readers or
79/// neighbouring slots.
80#[repr(C, align(64))]
81pub struct CounterSlot {
82 /// Stable counter identifier (see RFC 0040 §Counters).
83 pub id: u32,
84 /// `COUNTER_FLAG_*` bits.
85 pub flags: AtomicU32,
86 /// Monotonic counter value.
87 pub value: AtomicU64,
88 /// Null-padded UTF-8 label, e.g. `"events_published"`.
89 pub label: [u8; COUNTER_LABEL_BYTES],
90}
91
92const _: () = assert!(std::mem::size_of::<CounterSlot>() == CACHE_LINE_BYTES);
93
94/// Mutable handle to a `CounterSlot`.
95///
96/// Created by writers (producer / consumer construction) and held by
97/// reference for the lifetime of the segment. Increments are relaxed
98/// atomic; reads from a separate thread see eventually-consistent
99/// values.
100#[derive(Clone, Copy, Debug)]
101pub struct CounterHandle {
102 inner: NonNull<CounterSlot>,
103}
104
105unsafe impl Send for CounterHandle {}
106unsafe impl Sync for CounterHandle {}
107
108impl CounterHandle {
109 /// Build a handle from a slot pointer. Caller must ensure the slot
110 /// lives at least as long as the handle (typically tied to a shared
111 /// memory mapping).
112 ///
113 /// # Safety
114 /// `ptr` must point to a valid `CounterSlot` in a mapping that
115 /// outlives the returned handle.
116 pub unsafe fn from_ptr(ptr: NonNull<CounterSlot>) -> Self {
117 Self { inner: ptr }
118 }
119
120 /// Increment by 1.
121 #[inline(always)]
122 pub fn inc(&self) {
123 // SAFETY: handle was constructed from a valid slot pointer with
124 // a lifetime tied to the underlying mapping.
125 let slot = unsafe { self.inner.as_ref() };
126 slot.value.fetch_add(1, Ordering::Relaxed);
127 }
128
129 /// Add `n`.
130 #[inline(always)]
131 pub fn add(&self, n: u64) {
132 let slot = unsafe { self.inner.as_ref() };
133 slot.value.fetch_add(n, Ordering::Relaxed);
134 }
135
136 /// Update the stored maximum if `value` exceeds the current
137 /// reading. Useful for high-water-mark counters such as
138 /// `consumer_lag_max`.
139 #[inline(always)]
140 pub fn record_max(&self, value: u64) {
141 let slot = unsafe { self.inner.as_ref() };
142 let mut current = slot.value.load(Ordering::Relaxed);
143 while value > current {
144 match slot.value.compare_exchange_weak(
145 current,
146 value,
147 Ordering::Relaxed,
148 Ordering::Relaxed,
149 ) {
150 Ok(_) => break,
151 Err(actual) => current = actual,
152 }
153 }
154 }
155
156 /// Read the current value (relaxed).
157 #[inline]
158 pub fn get(&self) -> u64 {
159 let slot = unsafe { self.inner.as_ref() };
160 slot.value.load(Ordering::Relaxed)
161 }
162}
163
/// Non-owning view over a counters file region. The view is built from a
/// pointer into a shared-memory mapping (or, in tests, a local buffer)
/// and never frees the memory it points at.
#[derive(Debug)]
pub struct CountersFile {
    /// First byte of the region, where the header lives.
    base: NonNull<u8>,
    /// Length of the region in bytes.
    len: usize,
}

// SAFETY: the view holds only a pointer and a length. Whoever built it
// through an `unsafe` constructor vouches that the region outlives the
// view, so handing the view to, or sharing it with, another thread does
// not by itself create a dangling access.
unsafe impl Send for CountersFile {}
unsafe impl Sync for CountersFile {}
174
175impl CountersFile {
176 /// Initialise a freshly-zeroed reserved region as a counters file.
177 /// Must be called exactly once per segment, by whichever process
178 /// creates the segment.
179 ///
180 /// # Safety
181 /// `ptr` must point to at least `COUNTERS_FILE_RESERVED_BYTES` of
182 /// writable memory that outlives the returned view, and that memory
183 /// must be zero-initialised before this call.
184 pub unsafe fn init(ptr: NonNull<u8>) -> Self {
185 let header_ptr = ptr.as_ptr() as *mut CountersHeader;
186 let slots_offset = std::mem::size_of::<CountersHeader>() as u32;
187 // SAFETY: caller's contract guarantees `ptr` is writable for at
188 // least `COUNTERS_FILE_RESERVED_BYTES` and that the memory is
189 // zero-initialised, which is what `write` requires here.
190 unsafe {
191 std::ptr::write(
192 header_ptr,
193 CountersHeader {
194 magic: COUNTERS_MAGIC,
195 version: 0,
196 slot_count: AtomicU32::new(0),
197 slot_capacity: MAX_COUNTER_SLOTS as u32,
198 slot_stride: std::mem::size_of::<CounterSlot>() as u32,
199 slots_offset,
200 _reserved: [0u8; 64 - 24],
201 },
202 );
203 }
204 // Slots are already zeroed by the caller's mmap/SHM allocation.
205 Self {
206 base: ptr,
207 len: COUNTERS_FILE_RESERVED_BYTES,
208 }
209 }
210
211 /// Attach to an existing counters file region. Validates the magic
212 /// and version.
213 ///
214 /// # Safety
215 /// `ptr` must point to at least `COUNTERS_FILE_RESERVED_BYTES` of
216 /// readable memory whose lifetime contains the returned view. The
217 /// memory must already have been initialised by a writer.
218 pub unsafe fn attach(ptr: NonNull<u8>) -> Result<Self, AttachError> {
219 // SAFETY: caller's contract guarantees `ptr` points at an
220 // initialised `CountersHeader` whose backing memory remains
221 // valid for at least the lifetime of the returned view.
222 let header = unsafe { &*(ptr.as_ptr() as *const CountersHeader) };
223 if header.magic != COUNTERS_MAGIC {
224 return Err(AttachError::BadMagic(header.magic));
225 }
226 if header.version != 0 {
227 return Err(AttachError::UnsupportedVersion(header.version));
228 }
229 Ok(Self {
230 base: ptr,
231 len: COUNTERS_FILE_RESERVED_BYTES,
232 })
233 }
234
235 /// Header reference.
236 #[inline]
237 pub fn header(&self) -> &CountersHeader {
238 unsafe { &*(self.base.as_ptr() as *const CountersHeader) }
239 }
240
241 fn slot_ptr(&self, idx: usize) -> NonNull<CounterSlot> {
242 let header = self.header();
243 let off = header.slots_offset as usize + idx * std::mem::size_of::<CounterSlot>();
244 debug_assert!(off + std::mem::size_of::<CounterSlot>() <= self.len);
245 // SAFETY: bounds checked above; offset comes from the in-band header.
246 unsafe { NonNull::new_unchecked(self.base.as_ptr().add(off) as *mut CounterSlot) }
247 }
248
249 /// Reserve a new slot, populate `id`, `flags`, and `label`, and
250 /// return a writer handle. Returns `None` when the slot capacity is
251 /// exhausted.
252 pub fn register(&self, id: u32, flags: u32, label: &str) -> Option<CounterHandle> {
253 let header = self.header();
254 let idx = header.slot_count.fetch_add(1, Ordering::AcqRel) as usize;
255 if idx >= MAX_COUNTER_SLOTS {
256 // Roll back so subsequent register() calls don't keep
257 // incrementing past capacity.
258 header.slot_count.fetch_sub(1, Ordering::Relaxed);
259 return None;
260 }
261 let ptr = self.slot_ptr(idx);
262 // SAFETY: pointer is in-bounds and not yet observed by readers
263 // — slot_count was incremented above before any writer or reader
264 // could see this index, and the Release on `flags` below
265 // publishes the populated slot.
266 unsafe {
267 let raw = ptr.as_ptr();
268 std::ptr::addr_of_mut!((*raw).id).write(id);
269 let label_ptr = std::ptr::addr_of_mut!((*raw).label);
270 (*label_ptr).fill(0);
271 let bytes = label.as_bytes();
272 let n = bytes.len().min(COUNTER_LABEL_BYTES);
273 std::ptr::copy_nonoverlapping(bytes.as_ptr(), label_ptr.cast::<u8>(), n);
274 // value left at its zero initialisation.
275 (*raw)
276 .flags
277 .store(flags | COUNTER_FLAG_IN_USE, Ordering::Release);
278 }
279 Some(unsafe { CounterHandle::from_ptr(ptr) })
280 }
281
282 /// Allocate a fresh, zero-initialised counters file on the heap and
283 /// initialise it. Returns an [`OwnedCountersFile`] that owns the
284 /// backing buffer alongside the `CountersFile` view.
285 ///
286 /// Safe alternative to [`CountersFile::init`] for single-process use
287 /// cases (tests, in-process metrics) where the counters file does
288 /// not need to be shared across processes via SHM or mmap.
289 ///
290 /// The returned `OwnedCountersFile` derefs to `&CountersFile`, so
291 /// all `register` / `snapshot` / etc. operations work directly:
292 ///
293 /// ```
294 /// use disruptor_mp::observability::CountersFile;
295 /// let file = CountersFile::boxed();
296 /// let handle = file.register(1, 0, "events_published").unwrap();
297 /// handle.inc();
298 /// assert_eq!(file.snapshot()[0].value, 1);
299 /// ```
300 #[must_use]
301 pub fn boxed() -> OwnedCountersFile {
302 // `CountersHeader` and `CounterSlot` are `#[repr(C, align(64))]`,
303 // so the backing buffer must also be 64-byte aligned. A plain
304 // `Box<[u8; N]>` only guarantees `align_of::<u8>() = 1`, which
305 // tripped the doctest with a `misaligned pointer dereference:
306 // address must be a multiple of 0x40` on aarch64. The
307 // `OwnedCountersBuf` newtype below carries `#[repr(C, align(64))]`
308 // so the heap allocation comes out cache-line aligned. The
309 // matching `Drop` reconstitutes `Box<OwnedCountersBuf>` (same
310 // type, same alignment) to release the allocation safely.
311 let buf: Box<OwnedCountersBuf> =
312 Box::new(OwnedCountersBuf([0u8; COUNTERS_FILE_RESERVED_BYTES]));
313 let ptr = Box::into_raw(buf);
314 // SAFETY: `ptr` is a valid, non-null, exclusively-owned pointer to
315 // a freshly-allocated, zero-initialised, 64-byte-aligned buffer of
316 // exactly `COUNTERS_FILE_RESERVED_BYTES`. The view stored in
317 // `file.base` aliases the same heap region; `OwnedCountersFile`'s
318 // `Drop` reconstitutes the `Box<OwnedCountersBuf>` to free it.
319 let view = unsafe { CountersFile::init(NonNull::new_unchecked(ptr.cast::<u8>())) };
320 OwnedCountersFile {
321 file: view,
322 buf: ptr,
323 }
324 }
325
326 /// Snapshot all in-use slots — `(id, flags, value, label)` tuples.
327 /// Intended for external readers and tests.
328 pub fn snapshot(&self) -> Vec<CounterSnapshot> {
329 let header = self.header();
330 let count = header.slot_count.load(Ordering::Acquire) as usize;
331 let count = count.min(MAX_COUNTER_SLOTS);
332 let mut out = Vec::with_capacity(count);
333 for idx in 0..count {
334 let ptr = self.slot_ptr(idx);
335 let slot = unsafe { ptr.as_ref() };
336 let flags = slot.flags.load(Ordering::Acquire);
337 if flags & COUNTER_FLAG_IN_USE == 0 {
338 continue;
339 }
340 let label_len = slot
341 .label
342 .iter()
343 .position(|b| *b == 0)
344 .unwrap_or(COUNTER_LABEL_BYTES);
345 let label = String::from_utf8_lossy(&slot.label[..label_len]).into_owned();
346 out.push(CounterSnapshot {
347 id: slot.id,
348 flags,
349 value: slot.value.load(Ordering::Relaxed),
350 label,
351 });
352 }
353 out
354 }
355}
356
357/// 64-byte-aligned backing buffer for [`OwnedCountersFile`]. Kept
358/// `repr(C, align(64))` because [`CountersHeader`] and [`CounterSlot`]
359/// (the structs the buffer is reinterpreted as) are themselves 64-byte
360/// aligned — a plain `Box<[u8; N]>` only promises 1-byte alignment,
361/// which trips `misaligned pointer dereference` panics on platforms
362/// that enforce alignment (e.g. aarch64 in debug builds).
363#[repr(C, align(64))]
364struct OwnedCountersBuf([u8; COUNTERS_FILE_RESERVED_BYTES]);
365
366/// Heap-allocated [`CountersFile`] that owns its backing buffer.
367/// Produced by [`CountersFile::boxed`] for use cases that don't need
368/// cross-process sharing (tests, in-process metrics).
369///
370/// Derefs to `&CountersFile` so all the usual `register` /
371/// `snapshot` / etc. methods work directly.
372pub struct OwnedCountersFile {
373 file: CountersFile,
374 // Raw pointer to the boxed buffer. Reconstituted into a `Box` in
375 // `Drop` so the allocation is released. We keep it as a raw pointer
376 // rather than a `Box` field because `Box<[u8; N]>` and the `NonNull`
377 // inside `file` are both aliasing the same heap region, and Rust's
378 // aliasing rules are friendlier when we don't materialise both as
379 // separate owners during the struct's lifetime.
380 buf: *mut OwnedCountersBuf,
381}
382
383// SAFETY: the buffer is heap-owned, isolated to this `OwnedCountersFile`
384// instance; the file view is `Send`+`Sync` (it's an opaque pointer that
385// the inner type already asserts safe for cross-thread access through
386// its own Send/Sync impls). Wrapping it adds no new aliasing.
387unsafe impl Send for OwnedCountersFile {}
388unsafe impl Sync for OwnedCountersFile {}
389
390impl std::ops::Deref for OwnedCountersFile {
391 type Target = CountersFile;
392 fn deref(&self) -> &CountersFile {
393 &self.file
394 }
395}
396
397impl std::fmt::Debug for OwnedCountersFile {
398 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399 f.debug_struct("OwnedCountersFile")
400 .field("file", &self.file)
401 .finish()
402 }
403}
404
405impl Drop for OwnedCountersFile {
406 fn drop(&mut self) {
407 // SAFETY: `buf` was produced by `Box::into_raw` in
408 // `CountersFile::boxed`; reconstituting the Box releases the
409 // allocation. `self.file` is dropped before this Drop body
410 // returns and the file's NonNull pointer is not accessed
411 // afterwards.
412 unsafe {
413 drop(Box::from_raw(self.buf));
414 }
415 }
416}
417
/// Reasons [`CountersFile::attach`] can refuse a region.
#[derive(Debug)]
pub enum AttachError {
    /// The header's magic word was not `COUNTERS_MAGIC`; carries the
    /// value that was found instead.
    BadMagic(u32),
    /// The header's layout version is one this build does not handle;
    /// carries that version.
    UnsupportedVersion(u32),
}

impl std::fmt::Display for AttachError {
    /// Render a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::BadMagic(m) => write!(f, "counters file: bad magic 0x{m:08x}"),
            Self::UnsupportedVersion(v) => write!(f, "counters file: unsupported version {v}"),
        }
    }
}

// No underlying source error: the default `Error` methods suffice.
impl std::error::Error for AttachError {}
439
/// Owned, plain-data copy of one counter slot, convenient for tests and
/// for external readers (e.g. a future `myelon-stat` binary).
#[derive(Clone, Debug)]
pub struct CounterSnapshot {
    /// Stable identifier of the counter.
    pub id: u32,
    /// The slot's `COUNTER_FLAG_*` bits.
    pub flags: u32,
    /// Counter value at the moment the snapshot was taken.
    pub value: u64,
    /// Label decoded into an owned string.
    pub label: String,
}
453
// ---------- counter ID registry --------------------------------------------

/// Stable counter identifiers used by `disruptor-mp`; RFC 0040 §Counters
/// holds the canonical table. IDs from [`ids::APP_RESERVED_BASE`]
/// (`0x100`) upward are reserved for application extension via
/// [`CountersFile::register`].
pub mod ids {
    // ---- Producer hot path (0x10..) ----

    /// `events_published`: successful `publish()` calls.
    pub const EVENTS_PUBLISHED: u32 = 0x10;
    /// `events_published_bytes`: cumulative payload bytes published.
    pub const EVENTS_PUBLISHED_BYTES: u32 = 0x11;
    /// `producer_full_events`: `publish()` found the ring full.
    pub const PRODUCER_FULL_EVENTS: u32 = 0x12;
    /// `producer_park_count`: the publisher parked on a full ring.
    pub const PRODUCER_PARK_COUNT: u32 = 0x13;
    /// `producer_unpark_count`: the publisher was unparked by a consumer.
    pub const PRODUCER_UNPARK_COUNT: u32 = 0x14;

    // ---- Consumer hot path (0x20..) ----

    /// `events_consumed`: successful `try_consume_next()` calls.
    pub const EVENTS_CONSUMED: u32 = 0x20;
    /// `events_consumed_bytes`: cumulative payload bytes consumed.
    pub const EVENTS_CONSUMED_BYTES: u32 = 0x21;
    /// `consumer_empty_spins`: `try_consume_next` found the ring empty.
    pub const CONSUMER_EMPTY_SPINS: u32 = 0x22;
    /// `consumer_park_count`: the consumer parked on an empty ring.
    pub const CONSUMER_PARK_COUNT: u32 = 0x23;
    /// `consumer_unpark_count`: the consumer was unparked by a producer.
    pub const CONSUMER_UNPARK_COUNT: u32 = 0x24;
    /// `consumer_lag_max`: high-water mark of `producer_seq − consumer_seq`.
    pub const CONSUMER_LAG_MAX: u32 = 0x25;

    // ---- Coordination, myelon typed layer (0x30..) ----

    /// `frame_publish_total`: `FramedTransport::publish()` calls.
    pub const FRAME_PUBLISH_TOTAL: u32 = 0x30;
    /// `frame_fragment_count`: frames split across slots.
    pub const FRAME_FRAGMENT_COUNT: u32 = 0x31;
    /// `frame_reassemble_count`: frames reassembled on consume.
    pub const FRAME_REASSEMBLE_COUNT: u32 = 0x32;
    /// `codec_encode_total`: `Codec::encode()` calls.
    pub const CODEC_ENCODE_TOTAL: u32 = 0x33;
    /// `codec_decode_total`: `Codec::decode()` calls.
    pub const CODEC_DECODE_TOTAL: u32 = 0x34;

    /// First ID reserved for application extension.
    pub const APP_RESERVED_BASE: u32 = 0x100;
}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// 64-byte aligned region matching the layout expectations of
    /// `CountersHeader` / `CounterSlot`. Real shared-memory mappings
    /// satisfy this alignment naturally; tests allocate it explicitly.
    #[repr(C, align(64))]
    struct AlignedRegion([u8; COUNTERS_FILE_RESERVED_BYTES]);

    /// Allocate a zeroed, cache-line-aligned region. Heap-resident so
    /// the pointer stays stable for the lifetime of the test.
    fn fresh_region() -> Box<AlignedRegion> {
        Box::new(AlignedRegion([0u8; COUNTERS_FILE_RESERVED_BYTES]))
    }

    /// Pointer to the first byte of `buf`, in the form `init` / `attach`
    /// expect.
    fn ptr_from(buf: &mut AlignedRegion) -> NonNull<u8> {
        NonNull::new(buf.0.as_mut_ptr()).unwrap()
    }

    #[test]
    fn header_is_one_cache_line() {
        assert_eq!(std::mem::size_of::<CountersHeader>(), CACHE_LINE_BYTES);
    }

    #[test]
    fn slot_is_one_cache_line() {
        assert_eq!(std::mem::size_of::<CounterSlot>(), CACHE_LINE_BYTES);
    }

    #[test]
    fn header_plus_slots_fit_in_reserved_region() {
        let bytes = std::mem::size_of::<CountersHeader>()
            + MAX_COUNTER_SLOTS * std::mem::size_of::<CounterSlot>();
        assert!(bytes <= COUNTERS_FILE_RESERVED_BYTES);
    }

    #[test]
    fn init_then_attach_roundtrip() {
        let mut buf = fresh_region();
        let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
        let header = file.header();
        assert_eq!(header.magic, COUNTERS_MAGIC);
        assert_eq!(header.slot_capacity, MAX_COUNTER_SLOTS as u32);
        assert_eq!(
            header.slot_stride as usize,
            std::mem::size_of::<CounterSlot>()
        );
        // Release the writer view before re-borrowing the buffer for the
        // reader. `let _ = file;` would not do this: the wildcard pattern
        // neither moves nor drops the value.
        drop(file);
        let attached = unsafe { CountersFile::attach(ptr_from(&mut buf)) }.unwrap();
        assert_eq!(attached.header().magic, COUNTERS_MAGIC);
    }

    #[test]
    fn attach_rejects_bad_magic() {
        let mut buf = fresh_region();
        // Don't init — magic stays zero.
        let err = unsafe { CountersFile::attach(ptr_from(&mut buf)) }.unwrap_err();
        match err {
            AttachError::BadMagic(0) => {}
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn attach_rejects_unsupported_version() {
        let mut buf = fresh_region();
        let raw = ptr_from(&mut buf);
        let _writer = unsafe { CountersFile::init(raw) };
        // Overwrite the version field in place to simulate a segment
        // written with a different layout.
        unsafe {
            let header = raw.as_ptr().cast::<CountersHeader>();
            std::ptr::addr_of_mut!((*header).version).write(1);
        }
        let err = unsafe { CountersFile::attach(raw) }.unwrap_err();
        match err {
            AttachError::UnsupportedVersion(1) => {}
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn register_inc_get_snapshot_roundtrip() {
        let mut buf = fresh_region();
        let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };

        let pub_h = file
            .register(
                ids::EVENTS_PUBLISHED,
                COUNTER_FLAG_PRODUCER,
                "events_published",
            )
            .unwrap();
        let con_h = file
            .register(
                ids::EVENTS_CONSUMED,
                COUNTER_FLAG_CONSUMER,
                "events_consumed",
            )
            .unwrap();
        let lag_h = file
            .register(
                ids::CONSUMER_LAG_MAX,
                COUNTER_FLAG_CONSUMER,
                "consumer_lag_max",
            )
            .unwrap();

        for _ in 0..123 {
            pub_h.inc();
        }
        con_h.add(456);
        lag_h.record_max(7);
        lag_h.record_max(3); // lower than current; ignored.
        lag_h.record_max(11);

        assert_eq!(pub_h.get(), 123);
        assert_eq!(con_h.get(), 456);
        assert_eq!(lag_h.get(), 11);

        let snap = file.snapshot();
        let by_id = |id: u32| snap.iter().find(|c| c.id == id).cloned();
        let p = by_id(ids::EVENTS_PUBLISHED).unwrap();
        assert_eq!(p.value, 123);
        assert_eq!(p.label, "events_published");
        assert_eq!(p.flags & COUNTER_FLAG_IN_USE, COUNTER_FLAG_IN_USE);
        assert_eq!(p.flags & COUNTER_FLAG_PRODUCER, COUNTER_FLAG_PRODUCER);
        let c = by_id(ids::EVENTS_CONSUMED).unwrap();
        assert_eq!(c.value, 456);
        assert_eq!(c.flags & COUNTER_FLAG_CONSUMER, COUNTER_FLAG_CONSUMER);
        let lag = by_id(ids::CONSUMER_LAG_MAX).unwrap();
        assert_eq!(lag.value, 11);
    }

    #[test]
    fn register_returns_none_at_capacity() {
        let mut buf = fresh_region();
        let file = unsafe { CountersFile::init(ptr_from(&mut buf)) };
        for i in 0..MAX_COUNTER_SLOTS {
            let h = file.register(i as u32, COUNTER_FLAG_PRODUCER, "x");
            assert!(h.is_some(), "slot {i} should fit");
        }
        assert!(file
            .register(0xDEAD, COUNTER_FLAG_PRODUCER, "overflow")
            .is_none());
        // A rejected registration must leave slot_count at capacity.
        assert_eq!(
            file.header().slot_count.load(Ordering::Acquire) as usize,
            MAX_COUNTER_SLOTS
        );
    }

    #[test]
    fn external_attach_sees_writer_increments() {
        // Simulates a separate "process": one CountersFile initialises &
        // writes, a second view attaches to the same memory and reads.
        let mut buf = fresh_region();
        // Take the raw pointer once so both views share it without
        // creating overlapping mutable borrows of `buf`.
        let raw = ptr_from(&mut buf);
        let writer = unsafe { CountersFile::init(raw) };
        let h = writer
            .register(
                ids::EVENTS_PUBLISHED,
                COUNTER_FLAG_PRODUCER,
                "events_published",
            )
            .unwrap();
        for _ in 0..100 {
            h.inc();
        }
        let reader = unsafe { CountersFile::attach(raw) }.unwrap();
        let snap = reader.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].id, ids::EVENTS_PUBLISHED);
        assert_eq!(snap[0].value, 100);
        assert_eq!(snap[0].label, "events_published");
    }

    #[test]
    fn boxed_creates_usable_counters_file() {
        // The safe `CountersFile::boxed()` wrapper allocates and
        // initialises the counters region in one call; the returned
        // `OwnedCountersFile` derefs to `&CountersFile`.
        let file = CountersFile::boxed();
        assert_eq!(file.header().magic, COUNTERS_MAGIC);
        let h = file
            .register(
                ids::EVENTS_PUBLISHED,
                COUNTER_FLAG_PRODUCER,
                "events_published",
            )
            .expect("first slot fits");
        for _ in 0..7 {
            h.inc();
        }
        let snap = file.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].id, ids::EVENTS_PUBLISHED);
        assert_eq!(snap[0].value, 7);
    }

    #[test]
    fn owned_counters_file_drop_releases_buffer() {
        // Smoke-test for the Drop impl: allocating, registering, then
        // dropping a sequence of `OwnedCountersFile`s should not leak
        // or fault. Miri / asan would catch UAF / leak; this test just
        // exercises the path so a regression in Drop is at minimum
        // noisy in normal runs.
        for _ in 0..1024 {
            let f = CountersFile::boxed();
            let _ = f.register(1, COUNTER_FLAG_PRODUCER, "x");
            // Drop at end of scope.
        }
    }

    #[test]
    fn owned_counters_file_is_send_and_sync() {
        // Compile-time check that we can move it across threads.
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}
        assert_send::<OwnedCountersFile>();
        assert_sync::<OwnedCountersFile>();
    }
}