1use std::{
2 cell::UnsafeCell,
3 sync::atomic::{Ordering, fence},
4};
5
6use bytemuck::{Pod, Zeroable};
7
/// T-shirt style size labels, declared from the smallest (`XS`) to the
/// largest (`XXL`).
///
/// NOTE(review): presumably each label corresponds to a `TSHIRT_SIZE` payload
/// capacity; that mapping is not defined in this part of the file — confirm
/// against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventSize {
    /// Extra small.
    XS,
    /// Small.
    S,
    /// Medium.
    M,
    /// Large.
    L,
    /// Extra large.
    XL,
    /// Double extra large.
    XXL,
}
18
/// Fixed-size, plain-data event record with an inline payload of
/// `TSHIRT_SIZE` bytes.
///
/// `repr(C)` keeps the fields in declaration order and `align(64)` forces
/// every value onto a 64-byte boundary, which also rounds the size of the
/// struct up to a multiple of 64 bytes.
#[derive(Debug, Clone, Copy)]
#[repr(C, align(64))]
pub struct PooledEvent<const TSHIRT_SIZE: usize> {
    /// NOTE(review): presumably the number of meaningful bytes in `data`;
    /// nothing in this type enforces `len <= TSHIRT_SIZE` — confirm.
    pub len: u32,
    /// Numeric event discriminator; its meaning is defined by the callers.
    pub event_type: u32,
    /// Inline payload storage, always exactly `TSHIRT_SIZE` bytes long.
    pub data: [u8; TSHIRT_SIZE],
}
26
27unsafe impl<const TSHIRT_SIZE: usize> Pod for PooledEvent<TSHIRT_SIZE> {}
28unsafe impl<const TSHIRT_SIZE: usize> Zeroable for PooledEvent<TSHIRT_SIZE> {}
29
30const MAX_STACK_BYTES: usize = 1_048_576; #[repr(C, align(64))]
34#[derive(Debug)]
35pub struct RingBuffer<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> {
36 pub published_sequence: UnsafeCell<usize>,
38 pub data: UnsafeCell<[PooledEvent<TSHIRT_SIZE>; RING_CAPACITY]>,
39}
40
41impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> RingBuffer<TSHIRT_SIZE, RING_CAPACITY> {
43 const _STACK_GUARD: () = {
44 let total_size = TSHIRT_SIZE * RING_CAPACITY;
45 assert!(
46 total_size <= MAX_STACK_BYTES,
47 "Ring buffer too large for stack! Reduce RING_CAPACITY or TSHIRT_SIZE"
48 );
49 };
50}
51
52unsafe impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Sync for RingBuffer<TSHIRT_SIZE, RING_CAPACITY> where
53 PooledEvent<TSHIRT_SIZE>: Send + Sync
54{
55}
56
57unsafe impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Send for RingBuffer<TSHIRT_SIZE, RING_CAPACITY> where
58 PooledEvent<TSHIRT_SIZE>: Send + Sync
59{
60}
61
62impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Default for RingBuffer<TSHIRT_SIZE, RING_CAPACITY> {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> RingBuffer<TSHIRT_SIZE, RING_CAPACITY> {
69 pub fn new() -> Self {
70 #[allow(path_statements)]
72 Self::_STACK_GUARD;
73
74 unsafe {
75 Self {
76 published_sequence: UnsafeCell::new(0usize),
77 data: UnsafeCell::new(std::mem::zeroed()),
78 }
79 }
80 }
81}
82
83#[repr(C, align(64))]
84#[derive(Debug, Copy, Clone)]
85pub struct Reader<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> {
86 pub cursor: usize,
87 pub ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>,
88}
89
90impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Reader<TSHIRT_SIZE, RING_CAPACITY> {
91 pub fn new(ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>) -> Self {
92 Self { cursor: 0, ringbuffer }
93 }
94
95 pub fn backpressure_ratio(&self) -> f32 {
96 let ps = self.ringbuffer.published_sequence.get();
97 let write_cursor = unsafe { *ps };
98 let reader_pos = self.cursor;
99 let lag = if write_cursor >= reader_pos {
101 write_cursor - reader_pos
102 } else {
103 return 1.0; };
106 (lag as f32 / RING_CAPACITY as f32).min(1.0)
108 }
109
110 pub fn is_under_pressure(&self) -> bool {
112 self.backpressure_ratio() > 0.8 }
114
115 pub fn should_throttle(&self) -> bool {
117 self.backpressure_ratio() >= 0.9 }
119}
120
121impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Iterator for Reader<TSHIRT_SIZE, RING_CAPACITY> {
122 type Item = PooledEvent<TSHIRT_SIZE>;
123
124 #[allow(clippy::comparison_chain)]
125 fn next(&mut self) -> Option<Self::Item> {
126 let published_sequence_ptr = self.ringbuffer.published_sequence.get();
128 let published_sequence = unsafe { *published_sequence_ptr };
129 if self.cursor > published_sequence {
131 panic!("read cursor ahead of writer!")
132 } else if self.cursor == published_sequence {
133 return None;
134 }
135 fence(Ordering::Acquire);
136 let slot = self.cursor % RING_CAPACITY;
138 let current_buffer = self.ringbuffer.data.get();
139 let event_read = unsafe { (*current_buffer)[slot] };
140 self.cursor += 1;
141 Some(event_read)
142 }
143}
144
145#[repr(C, align(64))]
146#[derive(Debug)] pub struct Writer<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> {
148 pub ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>,
149}
150
151impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Writer<TSHIRT_SIZE, RING_CAPACITY> {
152 pub fn new(ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>) -> Self {
153 Self { ringbuffer }
154 }
155
156 pub fn add(&self, e: PooledEvent<TSHIRT_SIZE>) -> bool {
157 let published_sequence_ptr = self.ringbuffer.published_sequence.get();
158 let current_sequence = unsafe { *published_sequence_ptr };
159 let slot = current_sequence % RING_CAPACITY;
160 let ptr = self.ringbuffer.data.get();
161 unsafe { (*ptr)[slot] = e };
162 fence(Ordering::Release);
163 unsafe {
164 *published_sequence_ptr = current_sequence + 1;
165 }
166 true
167 }
168}