//! palladium_runtime/ring_buffer.rs — cache-line-padded primitives for
//! inter-core message passing (`has_work` flags and SPSC ring queues).

use palladium_transport::MailboxMessage;
use std::sync::atomic::{AtomicBool, Ordering};

// ── CacheLineAtomicBool ───────────────────────────────────────────────────────

/// An `AtomicBool` padded out to a full 64-byte cache line so neighbouring
/// flags in an array never share a line, preventing false sharing when
/// different cores touch different flags concurrently.
///
/// The runtime keeps one per core as a `has_work` hint: a sender raises the
/// destination core's flag after pushing onto a ring, and the drain task
/// consults it to skip ring scanning when no inter-core traffic is pending.
#[repr(align(64))]
pub struct CacheLineAtomicBool {
    pub value: AtomicBool,
}

impl CacheLineAtomicBool {
    /// Create a flag with the given initial state.
    pub const fn new(v: bool) -> Self {
        Self {
            value: AtomicBool::new(v),
        }
    }

    /// Raise the flag to announce pending work. The Release store pairs with
    /// the Acquire in `is_set`/`take`, making the preceding ring push visible
    /// to the reader.
    #[inline]
    pub fn signal(&self) {
        self.set_true();
    }

    /// Store `true` with Release ordering.
    #[inline]
    pub fn set_true(&self) {
        self.value.store(true, Ordering::Release);
    }

    /// Store `false` with Release ordering.
    #[inline]
    pub fn set_false(&self) {
        self.value.store(false, Ordering::Release);
    }

    /// Read the flag with Acquire ordering (pairs with `signal`).
    #[inline]
    pub fn is_set(&self) -> bool {
        self.value.load(Ordering::Acquire)
    }

    /// Read and clear the flag in a single atomic step (AcqRel swap).
    #[inline]
    pub fn take(&self) -> bool {
        self.value.swap(false, Ordering::AcqRel)
    }
}

55// ── InterCoreQueue ────────────────────────────────────────────────────────────
56
57/// Cache-line-padded SPSC inter-core message queue.
58///
59/// One instance per (source_core, dest_core) pair.  The `#[repr(align(64))]`
60/// ensures that adjacent queue entries in a `Vec<InterCoreQueue>` do not share
61/// a cache line, preventing false sharing between cores that access different
62/// pairs.
63///
64/// Uses [`crossbeam_queue::ArrayQueue`] for wait-free push/pop.  `push`
65/// returns `false` when the queue is at capacity — callers should treat this
66/// as a backpressure signal.
67#[repr(align(64))]
68pub struct InterCoreQueue {
69    inner: crossbeam_queue::ArrayQueue<MailboxMessage>,
70}
71
72impl InterCoreQueue {
73    pub fn new(capacity: usize) -> Self {
74        Self {
75            inner: crossbeam_queue::ArrayQueue::new(capacity),
76        }
77    }
78
79    /// Push a message. Returns `false` if the queue is full.
80    pub fn push(&self, msg: MailboxMessage) -> bool {
81        self.inner.push(msg).is_ok()
82    }
83
84    /// Pop the next message, or `None` if empty.
85    pub fn pop(&self) -> Option<MailboxMessage> {
86        self.inner.pop()
87    }
88
89    pub fn is_empty(&self) -> bool {
90        self.inner.is_empty()
91    }
92
93    pub fn len(&self) -> usize {
94        self.inner.len()
95    }
96
97    pub fn capacity(&self) -> usize {
98        self.inner.capacity()
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use palladium_actor::{AddrHash, Envelope, MessagePayload};

    /// Build a throwaway message whose `type_tag` carries `tag`.
    fn make_msg(tag: u32) -> MailboxMessage {
        MailboxMessage {
            envelope: Envelope::new(
                AddrHash::synthetic(b"src"),
                AddrHash::synthetic(b"dst"),
                tag as u64,
                0,
            ),
            payload: MessagePayload::local(tag),
        }
    }

    #[test]
    fn ring_buffer_push_pop_fifo() {
        let q = InterCoreQueue::new(8);
        for tag in 1..=3u32 {
            assert!(q.push(make_msg(tag)));
        }

        // FIFO: messages come back in the order they went in.
        for expected in 1..=3u64 {
            assert_eq!(q.pop().unwrap().envelope.type_tag, expected);
        }
        assert!(q.pop().is_none());
    }

    #[test]
    fn ring_buffer_bounded_capacity_returns_false_when_full() {
        let q = InterCoreQueue::new(4);
        for tag in 0..4u32 {
            assert!(q.push(make_msg(tag)));
        }

        // At capacity: the next push is rejected and the length is unchanged.
        assert!(!q.push(make_msg(4)), "push should fail when queue is full");
        assert_eq!(q.len(), 4);
    }

    #[test]
    fn ring_buffer_is_empty_and_len() {
        let q = InterCoreQueue::new(4);
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);

        q.push(make_msg(7));
        assert_eq!(q.len(), 1);
        assert!(!q.is_empty());

        q.pop();
        assert!(q.is_empty());
    }

    #[test]
    fn ring_buffer_cache_line_aligned() {
        // Must be at least one cache line (64 bytes). crossbeam's ArrayQueue
        // is internally 128-byte aligned, so a larger value is possible and
        // still prevents false sharing.
        assert!(std::mem::align_of::<InterCoreQueue>() >= 64);
    }

    /// A producer thread pushes 1000 tagged messages while the main thread
    /// drains them, spinning through transient full/empty states. Verifies
    /// cross-thread use (MailboxMessage: Send) and end-to-end FIFO order.
    #[test]
    fn ring_buffer_cross_thread_send_recv() {
        use std::sync::Arc;

        const N: u32 = 1000;
        const CAP: usize = 128;
        let q = Arc::new(InterCoreQueue::new(CAP));

        let tx = Arc::clone(&q);
        let producer = std::thread::spawn(move || {
            let mut next = 0u32;
            while next < N {
                if tx.push(make_msg(next)) {
                    next += 1;
                } else {
                    std::hint::spin_loop(); // queue full — wait for consumer
                }
            }
        });

        let mut tags = Vec::with_capacity(N as usize);
        while tags.len() < N as usize {
            match q.pop() {
                Some(msg) => tags.push(msg.envelope.type_tag),
                None => std::hint::spin_loop(), // queue empty — wait for producer
            }
        }

        producer.join().unwrap();
        assert_eq!(tags.len(), N as usize);
        // FIFO: values arrive in order 0..N.
        for (i, &tag) in tags.iter().enumerate() {
            assert_eq!(tag, i as u64, "FIFO violated at index {i}");
        }
    }
}
207}