polymarket_kernel/
ring_buffer.rs1use core::ffi::c_int;
2use core::marker::PhantomData;
3use core::mem::MaybeUninit;
4use core::ptr::NonNull;
5
/// Size, in bytes, of the alignment unit used for the ring's index fields.
///
/// Matches the `align(64)` on [`SpscRingBuffer`].
const CACHELINE: usize = 64;

/// Number of padding bytes placed after each `u64` index so that the index
/// plus its padding spans exactly [`CACHELINE`] bytes.
const INDEX_PAD: usize = CACHELINE - core::mem::size_of::<u64>();
8
/// A single update record carried through the ring buffer by value.
///
/// The type is `#[repr(C)]` because raw pointers to it are handed to the
/// `extern "C"` ring-buffer functions declared in this file.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
#[repr(C)]
pub struct L2Update {
    /// Identifier of the market this update refers to.
    pub market_id: u64,
    /// Mid price carried by the update.
    pub mid_price: f64,
    /// Implied volatility carried by the update.
    pub implied_vol: f64,
}
16
17#[repr(C, align(64))]
18pub struct SpscRingBuffer {
19 _head: u64,
20 _head_pad: [u8; INDEX_PAD],
21 _tail: u64,
22 _tail_pad: [u8; INDEX_PAD],
23 _slots: *mut L2Update,
24 _capacity: u64,
25 _mask: u64,
26}
27
28impl Default for SpscRingBuffer {
29 fn default() -> Self {
30 Self {
31 _head: 0,
32 _head_pad: [0; INDEX_PAD],
33 _tail: 0,
34 _tail_pad: [0; INDEX_PAD],
35 _slots: core::ptr::null_mut(),
36 _capacity: 0,
37 _mask: 0,
38 }
39 }
40}
41
/// Reasons why [`split`] can refuse to hand out a producer/consumer pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RingInitError {
    /// The slot slice had fewer than two elements or a length that is not a
    /// power of two.
    InvalidCapacity,
    /// The native initialisation routine returned a status other than 1.
    InitFailed,
}

impl core::fmt::Display for RingInitError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let message = match self {
            Self::InvalidCapacity => "ring capacity must be a power of two and at least 2",
            Self::InitFailed => "native ring buffer initialization failed",
        };
        f.write_str(message)
    }
}

// Lets callers use `?` into `Box<dyn Error>` and similar error-erasing types.
impl core::error::Error for RingInitError {}
47
48unsafe extern "C" {
49 fn spsc_ring_buffer_init(rb: *mut SpscRingBuffer, slots: *mut L2Update, capacity: u64)
50 -> c_int;
51 fn spsc_ring_buffer_push(rb: *mut SpscRingBuffer, msg: *const L2Update) -> c_int;
52 fn spsc_ring_buffer_pop(rb: *mut SpscRingBuffer, out: *mut L2Update) -> c_int;
53 fn spsc_ring_buffer_capacity(rb: *const SpscRingBuffer) -> u64;
54 fn spsc_ring_buffer_len(rb: *const SpscRingBuffer) -> u64;
55 fn spsc_ring_buffer_is_empty(rb: *const SpscRingBuffer) -> c_int;
56 fn spsc_ring_buffer_is_full(rb: *const SpscRingBuffer) -> c_int;
57}
58
59pub struct Producer<'a> {
60 ring: NonNull<SpscRingBuffer>,
61 _marker: PhantomData<(&'a SpscRingBuffer, &'a [L2Update])>,
62}
63
64pub struct Consumer<'a> {
65 ring: NonNull<SpscRingBuffer>,
66 _marker: PhantomData<(&'a SpscRingBuffer, &'a [L2Update])>,
67}
68
69pub fn split<'a>(
70 ring: &'a mut SpscRingBuffer,
71 slots: &'a mut [L2Update],
72) -> Result<(Producer<'a>, Consumer<'a>), RingInitError> {
73 if slots.len() < 2 || !slots.len().is_power_of_two() {
74 return Err(RingInitError::InvalidCapacity);
75 }
76
77 let rc =
78 unsafe { spsc_ring_buffer_init(ring as *mut _, slots.as_mut_ptr(), slots.len() as u64) };
79 if rc != 1 {
80 return Err(RingInitError::InitFailed);
81 }
82
83 let ptr = NonNull::from(ring);
84 Ok((
85 Producer {
86 ring: ptr,
87 _marker: PhantomData,
88 },
89 Consumer {
90 ring: ptr,
91 _marker: PhantomData,
92 },
93 ))
94}
95
96impl Producer<'_> {
97 #[inline]
98 pub fn try_push(&mut self, update: L2Update) -> Result<(), L2Update> {
99 let rc = unsafe { spsc_ring_buffer_push(self.ring.as_ptr(), &update as *const _) };
100 if rc == 1 { Ok(()) } else { Err(update) }
101 }
102
103 #[inline]
104 pub fn capacity(&self) -> usize {
105 unsafe { spsc_ring_buffer_capacity(self.ring.as_ptr()) as usize }
106 }
107
108 #[inline]
109 pub fn len(&self) -> usize {
110 unsafe { spsc_ring_buffer_len(self.ring.as_ptr()) as usize }
111 }
112
113 #[inline]
114 pub fn is_full(&self) -> bool {
115 unsafe { spsc_ring_buffer_is_full(self.ring.as_ptr()) == 1 }
116 }
117}
118
119impl Consumer<'_> {
120 #[inline]
121 pub fn try_pop(&mut self) -> Option<L2Update> {
122 let mut out = MaybeUninit::<L2Update>::uninit();
123 let rc = unsafe { spsc_ring_buffer_pop(self.ring.as_ptr(), out.as_mut_ptr()) };
124 if rc == 1 {
125 Some(unsafe { out.assume_init() })
126 } else {
127 None
128 }
129 }
130
131 #[inline]
132 pub fn capacity(&self) -> usize {
133 unsafe { spsc_ring_buffer_capacity(self.ring.as_ptr()) as usize }
134 }
135
136 #[inline]
137 pub fn len(&self) -> usize {
138 unsafe { spsc_ring_buffer_len(self.ring.as_ptr()) as usize }
139 }
140
141 #[inline]
142 pub fn is_empty(&self) -> bool {
143 unsafe { spsc_ring_buffer_is_empty(self.ring.as_ptr()) == 1 }
144 }
145}
146
147unsafe impl Send for Producer<'_> {}
150unsafe impl Send for Consumer<'_> {}
151
152unsafe impl Sync for Producer<'_> {}
155unsafe impl Sync for Consumer<'_> {}