// palladium_runtime/ring_buffer.rs

use palladium_transport::MailboxMessage;
use std::sync::atomic::{AtomicBool, Ordering};

/// A boolean flag aligned (and padded) to a 64-byte cache line.
///
/// The `#[repr(align(64))]` keeps the flag on its own cache line so a core
/// hammering this flag does not false-share with neighbouring data.
/// NOTE(review): 64 bytes matches common x86_64 cache lines; confirm for
/// other targets if portability matters.
#[repr(align(64))]
pub struct CacheLineAtomicBool {
    pub value: AtomicBool,
}

impl CacheLineAtomicBool {
    /// Creates a new flag initialised to `v`.
    ///
    /// `const` so instances can be placed in `static` items.
    pub const fn new(v: bool) -> Self {
        Self {
            value: AtomicBool::new(v),
        }
    }

    /// Raises the flag. Semantically identical to [`Self::set_true`]; the
    /// two names exist so call sites can express intent ("signal a waiter"
    /// vs. "set a state bit").
    #[inline]
    pub fn signal(&self) {
        // Delegate rather than duplicate the store: previously this method
        // repeated `set_true`'s body verbatim, inviting the two to drift.
        self.set_true();
    }

    /// Sets the flag to `true` with `Release` ordering, so writes made
    /// before this call are visible to a thread that later observes the
    /// flag via [`Self::is_set`] or [`Self::take`].
    #[inline]
    pub fn set_true(&self) {
        self.value.store(true, Ordering::Release);
    }

    /// Clears the flag to `false` with `Release` ordering.
    #[inline]
    pub fn set_false(&self) {
        self.value.store(false, Ordering::Release);
    }

    /// Reads the flag with `Acquire` ordering (pairs with the `Release`
    /// stores above).
    #[inline]
    pub fn is_set(&self) -> bool {
        self.value.load(Ordering::Acquire)
    }

    /// Atomically reads the flag and clears it, returning the prior value.
    /// `AcqRel`: acquires the setter's writes and releases the clear.
    #[inline]
    pub fn take(&self) -> bool {
        self.value.swap(false, Ordering::AcqRel)
    }
}

impl Default for CacheLineAtomicBool {
    /// Defaults to an unset (`false`) flag.
    fn default() -> Self {
        Self::new(false)
    }
}
54
55#[repr(align(64))]
68pub struct InterCoreQueue {
69 inner: crossbeam_queue::ArrayQueue<MailboxMessage>,
70}
71
72impl InterCoreQueue {
73 pub fn new(capacity: usize) -> Self {
74 Self {
75 inner: crossbeam_queue::ArrayQueue::new(capacity),
76 }
77 }
78
79 pub fn push(&self, msg: MailboxMessage) -> bool {
81 self.inner.push(msg).is_ok()
82 }
83
84 pub fn pop(&self) -> Option<MailboxMessage> {
86 self.inner.pop()
87 }
88
89 pub fn is_empty(&self) -> bool {
90 self.inner.is_empty()
91 }
92
93 pub fn len(&self) -> usize {
94 self.inner.len()
95 }
96
97 pub fn capacity(&self) -> usize {
98 self.inner.capacity()
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use palladium_actor::{AddrHash, Envelope, MessagePayload};

    /// Builds a message whose `type_tag` equals `tag`, so tests can track
    /// ordering through the queue by tag alone.
    fn make_msg(tag: u32) -> MailboxMessage {
        let src = AddrHash::synthetic(b"src");
        let dst = AddrHash::synthetic(b"dst");
        MailboxMessage {
            envelope: Envelope::new(src, dst, tag as u64, 0),
            payload: MessagePayload::local(tag),
        }
    }

    /// Messages come out in the order they went in, and an emptied queue
    /// yields `None`.
    #[test]
    fn ring_buffer_push_pop_fifo() {
        let q = InterCoreQueue::new(8);
        assert!(q.push(make_msg(1)));
        assert!(q.push(make_msg(2)));
        assert!(q.push(make_msg(3)));

        let m1 = q.pop().unwrap();
        let m2 = q.pop().unwrap();
        let m3 = q.pop().unwrap();

        assert_eq!(m1.envelope.type_tag, 1);
        assert_eq!(m2.envelope.type_tag, 2);
        assert_eq!(m3.envelope.type_tag, 3);
        assert!(q.pop().is_none());
    }

    /// A full queue rejects further pushes instead of overwriting.
    #[test]
    fn ring_buffer_bounded_capacity_returns_false_when_full() {
        let q = InterCoreQueue::new(4);
        assert!(q.push(make_msg(0)));
        assert!(q.push(make_msg(1)));
        assert!(q.push(make_msg(2)));
        assert!(q.push(make_msg(3)));

        assert!(!q.push(make_msg(4)), "push should fail when queue is full");
        assert_eq!(q.len(), 4);
    }

    /// `is_empty`/`len` track single-threaded push/pop exactly.
    #[test]
    fn ring_buffer_is_empty_and_len() {
        let q = InterCoreQueue::new(4);
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);

        // Assert the push/pop results rather than discarding them (the
        // original ignored both, which would mask an unexpectedly full or
        // empty queue).
        assert!(q.push(make_msg(7)));
        assert!(!q.is_empty());
        assert_eq!(q.len(), 1);

        assert!(q.pop().is_some());
        assert!(q.is_empty());
    }

    /// The queue's advertised 64-byte alignment actually holds.
    #[test]
    fn ring_buffer_cache_line_aligned() {
        assert!(std::mem::align_of::<InterCoreQueue>() >= 64);
    }

    /// One producer thread, one consumer (this thread): all N messages
    /// arrive, in FIFO order, through a queue much smaller than N.
    #[test]
    fn ring_buffer_cross_thread_send_recv() {
        use std::sync::Arc;

        const N: u32 = 1000;
        const CAP: usize = 128;
        let q = Arc::new(InterCoreQueue::new(CAP));

        let q_prod = Arc::clone(&q);
        let producer = std::thread::spawn(move || {
            let mut sent = 0u32;
            while sent < N {
                if q_prod.push(make_msg(sent)) {
                    sent += 1;
                } else {
                    // Queue full: spin until the consumer drains a slot.
                    std::hint::spin_loop();
                }
            }
        });

        let mut received = Vec::with_capacity(N as usize);
        while received.len() < N as usize {
            if let Some(msg) = q.pop() {
                received.push(msg.envelope.type_tag);
            } else {
                std::hint::spin_loop();
            }
        }

        producer.join().unwrap();
        assert_eq!(received.len(), N as usize);
        for (i, &tag) in received.iter().enumerate() {
            assert_eq!(tag, i as u64, "FIFO violated at index {i}");
        }
    }
}