rusted_ring/
ring.rs

1use std::{
2    cell::UnsafeCell,
3    sync::atomic::{Ordering, fence},
4};
5
6use bytemuck::{Pod, Zeroable};
7
/// T-shirt sizing of events.
///
/// Each variant names a fixed payload capacity tier; the byte figures in the
/// per-variant comments are the intended `TSHIRT_SIZE` values used when
/// instantiating [`PooledEvent`] / ring buffers for that tier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventSize {
    XS,  // 64 bytes   - Heartbeats, simple state changes
    S,   // 256 bytes  - Basic CRDT operations, chat messages
    M,   // 1KB        - Document edits, small file attachments
    L,   // 4KB        - Whiteboard data, medium images
    XL,  // 16KB       - Large files, complex diagrams
    XXL, // 64KB+      - Heap fallback for rare huge events
}
18
/// One fixed-size event slot, stored inline in the ring buffer array.
///
/// `repr(C, align(64))` pins the layout and aligns each slot to a cache line.
#[repr(C, align(64))]
#[derive(Debug, Copy, Clone)]
pub struct PooledEvent<const TSHIRT_SIZE: usize> {
    // Presumably the number of valid bytes in `data` — TODO confirm; nothing
    // in this file enforces `len <= TSHIRT_SIZE`.
    pub len: u32,
    // Application-defined discriminant; never interpreted by the ring code
    // in this file.
    pub event_type: u32,
    pub data: [u8; TSHIRT_SIZE],
}
26
// SAFETY / NOTE(review): `Pod` requires the type to contain no padding bytes.
// Because of `align(64)`, `size_of::<PooledEvent<N>>()` is rounded up to a
// multiple of 64; with an 8-byte header (`len` + `event_type`) ahead of the
// payload, any `N` where `(8 + N) % 64 != 0` (including N = 64, 256, 1024)
// leaves trailing padding, making this impl unsound by bytemuck's contract —
// confirm against the actual TSHIRT_SIZE instantiations.
unsafe impl<const TSHIRT_SIZE: usize> Pod for PooledEvent<TSHIRT_SIZE> {}
// SAFETY: every field (u32, u32, [u8; N]) is valid as all-zero bits, so the
// all-zero bit pattern is a valid `PooledEvent`.
unsafe impl<const TSHIRT_SIZE: usize> Zeroable for PooledEvent<TSHIRT_SIZE> {}
29
// Stack safety guards - prevent unreasonable memory usage.
// Upper bound enforced at compile time by `RingBuffer::_STACK_GUARD`.
const MAX_STACK_BYTES: usize = 1_048_576; // 1MB max per ring buffer
32
/// Fixed-capacity ring of `RING_CAPACITY` slots shared between one writer
/// and any number of readers, each reader keeping its own cursor.
///
/// NOTE(review): `published_sequence` is a plain `usize` inside an
/// `UnsafeCell`, mutated by the writer and read by readers on other threads
/// with only fences for ordering — a non-atomic cross-thread access, which
/// is a data race under the Rust memory model. It should likely be an
/// `AtomicUsize` — confirm the intended concurrency model.
#[repr(C, align(64))]
#[derive(Debug)]
pub struct RingBuffer<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> {
    // Single Writer updates this; count of events published so far
    // (monotonically increasing, never wrapped to the ring size).
    pub published_sequence: UnsafeCell<usize>,
    // Event storage; slot index = sequence % RING_CAPACITY.
    pub data: UnsafeCell<[PooledEvent<TSHIRT_SIZE>; RING_CAPACITY]>,
}
40
41// Compile-time guard to prevent stack overflow
42impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> RingBuffer<TSHIRT_SIZE, RING_CAPACITY> {
43    const _STACK_GUARD: () = {
44        let total_size = TSHIRT_SIZE * RING_CAPACITY;
45        assert!(
46            total_size <= MAX_STACK_BYTES,
47            "Ring buffer too large for stack! Reduce RING_CAPACITY or TSHIRT_SIZE"
48        );
49    };
50}
51
// SAFETY: the struct holds no references or thread-affine resources; sharing
// it across threads is claimed sound on the premise that exactly one writer
// mutates `published_sequence`/`data` and readers only copy slots out.
// NOTE(review): that premise is enforced by convention only (see the
// non-atomic access note on `RingBuffer`) — confirm before relying on it.
unsafe impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Sync for RingBuffer<TSHIRT_SIZE, RING_CAPACITY> where
    PooledEvent<TSHIRT_SIZE>: Send + Sync
{
}

// SAFETY: same reasoning as the `Sync` impl above; moving the buffer between
// threads transfers only plain-old-data slots and a counter.
unsafe impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Send for RingBuffer<TSHIRT_SIZE, RING_CAPACITY> where
    PooledEvent<TSHIRT_SIZE>: Send + Sync
{
}
61
impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Default for RingBuffer<TSHIRT_SIZE, RING_CAPACITY> {
    /// Same as [`RingBuffer::new`]: zeroed slots, published sequence 0.
    fn default() -> Self {
        Self::new()
    }
}
67
68impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> RingBuffer<TSHIRT_SIZE, RING_CAPACITY> {
69    pub fn new() -> Self {
70        // Trigger compile-time check
71        #[allow(path_statements)]
72        Self::_STACK_GUARD;
73
74        unsafe {
75            Self {
76                published_sequence: UnsafeCell::new(0usize),
77                data: UnsafeCell::new(std::mem::zeroed()),
78            }
79        }
80    }
81}
82
/// A consuming endpoint over a ring buffer with its own private position.
///
/// `Copy`/`Clone` are deliberate: duplicating a `Reader` forks an independent
/// reader that continues from the same sequence number.
#[repr(C, align(64))]
#[derive(Debug, Copy, Clone)]
pub struct Reader<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> {
    // Next sequence number to consume (not a slot index; the slot is
    // cursor % RING_CAPACITY).
    pub cursor: usize,
    pub ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>,
}
89
90impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Reader<TSHIRT_SIZE, RING_CAPACITY> {
91    pub fn new(ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>) -> Self {
92        Self { cursor: 0, ringbuffer }
93    }
94
95    pub fn backpressure_ratio(&self) -> f32 {
96        let ps = self.ringbuffer.published_sequence.get();
97        let write_cursor = unsafe { *ps };
98        let reader_pos = self.cursor;
99        // Handle wraparound case
100        let lag = if write_cursor >= reader_pos {
101            write_cursor - reader_pos
102        } else {
103            // Wraparound case (rare but possible)
104            return 1.0; // Assume maximum pressure
105        };
106        // Cap at 100% pressure
107        (lag as f32 / RING_CAPACITY as f32).min(1.0)
108    }
109
110    /// Check if this reader is under pressure
111    pub fn is_under_pressure(&self) -> bool {
112        self.backpressure_ratio() > 0.8 // 80% threshold
113    }
114
115    /// Check if this reader should signal for throttling
116    pub fn should_throttle(&self) -> bool {
117        self.backpressure_ratio() >= 0.9 // 90% threshold (>= not >)
118    }
119}
120
impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Iterator for Reader<TSHIRT_SIZE, RING_CAPACITY> {
    type Item = PooledEvent<TSHIRT_SIZE>;

    /// Copy out the next published event and advance this reader's cursor,
    /// or return `None` when the reader has caught up with the writer.
    ///
    /// # Panics
    /// Panics if the cursor is ever ahead of the writer's published sequence
    /// (a corrupted reader state).
    #[allow(clippy::comparison_chain)]
    fn next(&mut self) -> Option<Self::Item> {
        // how far has writer written
        // NOTE(review): plain non-atomic read of a value the writer mutates
        // from another thread — a data race (UB under the Rust memory
        // model); this should likely be an `AtomicUsize` load — confirm.
        let published_sequence_ptr = self.ringbuffer.published_sequence.get();
        let published_sequence = unsafe { *published_sequence_ptr };
        // reader cannot be ahead of writer
        if self.cursor > published_sequence {
            panic!("read cursor ahead of writer!")
        } else if self.cursor == published_sequence {
            return None;
        }
        // Pairs with the Release fence in `Writer::add`: slot data written
        // before the sequence was published is visible after this fence.
        fence(Ordering::Acquire);
        // Calculate slot from cursor
        let slot = self.cursor % RING_CAPACITY;
        let current_buffer = self.ringbuffer.data.get();
        // NOTE(review): no overwrite protection — if this reader lags by
        // more than RING_CAPACITY events, the writer has already reused this
        // slot and the copy below may observe newer or torn data.
        let event_read = unsafe { (*current_buffer)[slot] };
        self.cursor += 1;
        Some(event_read)
    }
}
144
/// The single producing endpoint for a ring buffer.
#[repr(C, align(64))]
#[derive(Debug)] // we INTENTIONALLY do not allow clone or copy of a writer - Single writer always!
pub struct Writer<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> {
    pub ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>,
}
150
impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> Writer<TSHIRT_SIZE, RING_CAPACITY> {
    /// Wrap the writing endpoint for `ringbuffer`.
    ///
    /// NOTE(review): nothing prevents constructing two `Writer`s over the
    /// same buffer — the single-writer invariant is by convention only.
    pub fn new(ringbuffer: &'static RingBuffer<TSHIRT_SIZE, RING_CAPACITY>) -> Self {
        Self { ringbuffer }
    }

    /// Store `e` in the next slot and publish it. Always returns `true`:
    /// there is no capacity or reader-lag check, so when the ring wraps the
    /// oldest slot is silently overwritten.
    pub fn add(&self, e: PooledEvent<TSHIRT_SIZE>) -> bool {
        let published_sequence_ptr = self.ringbuffer.published_sequence.get();
        let current_sequence = unsafe { *published_sequence_ptr };
        let slot = current_sequence % RING_CAPACITY;
        let ptr = self.ringbuffer.data.get();
        // Write the payload first...
        unsafe { (*ptr)[slot] = e };
        // ...then fence so the slot write is ordered before the publish
        // below. Pairs with the Acquire fence in `Reader::next`.
        fence(Ordering::Release);
        // NOTE(review): non-atomic store to a location readers load
        // concurrently — a data race (UB); should likely be
        // `AtomicUsize::store(..., Ordering::Release)` — confirm.
        unsafe {
            *published_sequence_ptr = current_sequence + 1;
        }
        true
    }
}
168}