Skip to main content

polymarket_kernel/
ring_buffer.rs

1use core::ffi::c_int;
2use core::marker::PhantomData;
3use core::mem::MaybeUninit;
4use core::ptr::NonNull;
5
6const CACHELINE: usize = 64;
7const INDEX_PAD: usize = CACHELINE - core::mem::size_of::<u64>();
8
9#[repr(C)]
10#[derive(Clone, Copy, Debug, Default, PartialEq)]
11pub struct L2Update {
12    pub market_id: u64,
13    pub mid_price: f64,
14    pub implied_vol: f64,
15}
16
17#[repr(C, align(64))]
18pub struct SpscRingBuffer {
19    _head: u64,
20    _head_pad: [u8; INDEX_PAD],
21    _tail: u64,
22    _tail_pad: [u8; INDEX_PAD],
23    _slots: *mut L2Update,
24    _capacity: u64,
25    _mask: u64,
26}
27
28impl Default for SpscRingBuffer {
29    fn default() -> Self {
30        Self {
31            _head: 0,
32            _head_pad: [0; INDEX_PAD],
33            _tail: 0,
34            _tail_pad: [0; INDEX_PAD],
35            _slots: core::ptr::null_mut(),
36            _capacity: 0,
37            _mask: 0,
38        }
39    }
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum RingInitError {
44    InvalidCapacity,
45    InitFailed,
46}
47
48unsafe extern "C" {
49    fn spsc_ring_buffer_init(rb: *mut SpscRingBuffer, slots: *mut L2Update, capacity: u64)
50    -> c_int;
51    fn spsc_ring_buffer_push(rb: *mut SpscRingBuffer, msg: *const L2Update) -> c_int;
52    fn spsc_ring_buffer_pop(rb: *mut SpscRingBuffer, out: *mut L2Update) -> c_int;
53    fn spsc_ring_buffer_capacity(rb: *const SpscRingBuffer) -> u64;
54    fn spsc_ring_buffer_len(rb: *const SpscRingBuffer) -> u64;
55    fn spsc_ring_buffer_is_empty(rb: *const SpscRingBuffer) -> c_int;
56    fn spsc_ring_buffer_is_full(rb: *const SpscRingBuffer) -> c_int;
57}
58
59pub struct Producer<'a> {
60    ring: NonNull<SpscRingBuffer>,
61    _marker: PhantomData<(&'a SpscRingBuffer, &'a [L2Update])>,
62}
63
64pub struct Consumer<'a> {
65    ring: NonNull<SpscRingBuffer>,
66    _marker: PhantomData<(&'a SpscRingBuffer, &'a [L2Update])>,
67}
68
69pub fn split<'a>(
70    ring: &'a mut SpscRingBuffer,
71    slots: &'a mut [L2Update],
72) -> Result<(Producer<'a>, Consumer<'a>), RingInitError> {
73    if slots.len() < 2 || !slots.len().is_power_of_two() {
74        return Err(RingInitError::InvalidCapacity);
75    }
76
77    let rc =
78        unsafe { spsc_ring_buffer_init(ring as *mut _, slots.as_mut_ptr(), slots.len() as u64) };
79    if rc != 1 {
80        return Err(RingInitError::InitFailed);
81    }
82
83    let ptr = NonNull::from(ring);
84    Ok((
85        Producer {
86            ring: ptr,
87            _marker: PhantomData,
88        },
89        Consumer {
90            ring: ptr,
91            _marker: PhantomData,
92        },
93    ))
94}
95
96impl Producer<'_> {
97    #[inline]
98    pub fn try_push(&mut self, update: L2Update) -> Result<(), L2Update> {
99        let rc = unsafe { spsc_ring_buffer_push(self.ring.as_ptr(), &update as *const _) };
100        if rc == 1 { Ok(()) } else { Err(update) }
101    }
102
103    #[inline]
104    pub fn capacity(&self) -> usize {
105        unsafe { spsc_ring_buffer_capacity(self.ring.as_ptr()) as usize }
106    }
107
108    #[inline]
109    pub fn len(&self) -> usize {
110        unsafe { spsc_ring_buffer_len(self.ring.as_ptr()) as usize }
111    }
112
113    #[inline]
114    pub fn is_full(&self) -> bool {
115        unsafe { spsc_ring_buffer_is_full(self.ring.as_ptr()) == 1 }
116    }
117}
118
119impl Consumer<'_> {
120    #[inline]
121    pub fn try_pop(&mut self) -> Option<L2Update> {
122        let mut out = MaybeUninit::<L2Update>::uninit();
123        let rc = unsafe { spsc_ring_buffer_pop(self.ring.as_ptr(), out.as_mut_ptr()) };
124        if rc == 1 {
125            Some(unsafe { out.assume_init() })
126        } else {
127            None
128        }
129    }
130
131    #[inline]
132    pub fn capacity(&self) -> usize {
133        unsafe { spsc_ring_buffer_capacity(self.ring.as_ptr()) as usize }
134    }
135
136    #[inline]
137    pub fn len(&self) -> usize {
138        unsafe { spsc_ring_buffer_len(self.ring.as_ptr()) as usize }
139    }
140
141    #[inline]
142    pub fn is_empty(&self) -> bool {
143        unsafe { spsc_ring_buffer_is_empty(self.ring.as_ptr()) == 1 }
144    }
145}
146
147// SAFETY: each endpoint is intended for single-thread ownership and uses only
148// lock-free atomic coordination in C. Moving endpoints across threads is safe.
149unsafe impl Send for Producer<'_> {}
150unsafe impl Send for Consumer<'_> {}
151
152// SAFETY: sharing references is safe because mutation APIs require `&mut self`
153// and all cross-thread synchronization is handled by the underlying atomics.
154unsafe impl Sync for Producer<'_> {}
155unsafe impl Sync for Consumer<'_> {}