Skip to main content

nodedb_bridge/
buffer.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Lock-free SPSC ring buffer.
4//!
5//! The core data structure: a fixed-capacity circular buffer with cache-line-padded
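//! head (consumer) and tail (producer) counters. On the fast path each
//! enqueue/dequeue performs one `Relaxed` load and one `Release` store of its
//! *own* counter (enqueue also does a `Relaxed` check of the disconnect flag);
//! the other side's counter is cached locally and re-read with an `Acquire`
//! load only when the ring looks full (producer) or empty (consumer).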
9//!
10//! ## Memory layout
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────┐
14//! │  [CacheLine] tail (written by producer only)        │
15//! │  [CacheLine] head (written by consumer only)        │
16//! │  [capacity]  slot array (T values)                  │
17//! └─────────────────────────────────────────────────────┘
18//! ```
19//!
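//! The `tail` and `head` live on separate cache lines to prevent false sharing.
//! The slot array is a separate heap allocation (`Box<[_]>`); the shared state
//! holds only a pointer to it. Capacity is rounded up to a power of two so the
//! slot index can be computed with a bitwise AND instead of a modulo.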
22
23use std::cell::UnsafeCell;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26
27use crate::error::{BridgeError, Result};
28use crate::metrics::BridgeMetrics;
29
/// Assumed width, in bytes, of one CPU cache line (64 B on x86_64 and on most
/// aarch64 cores). Used to keep the producer and consumer cursors apart.
const CACHE_LINE: usize = 1 << 6;
32
33/// Shared state between producer and consumer.
34///
35/// The producer writes `tail`, reads `head`.
36/// The consumer writes `head`, reads `tail`.
37///
38/// Both counters are monotonically increasing u64 values. The actual slot index
39/// is `counter & (capacity - 1)` (bitwise AND because capacity is power-of-two).
40struct Shared<T> {
41    /// Producer's write cursor. Padded to its own cache line.
42    tail: CacheAligned<AtomicU64>,
43
44    /// Consumer's read cursor. Padded to its own cache line.
45    head: CacheAligned<AtomicU64>,
46
47    /// Fixed-size slot array.
48    slots: Box<[UnsafeCell<Option<T>>]>,
49
50    /// Capacity (always power of two).
51    capacity: usize,
52
53    /// Bitmask: `capacity - 1` for fast modulo.
54    mask: usize,
55
56    /// Set to true when either side is dropped.
57    disconnected: AtomicBool,
58
59    /// Shared metrics counters.
60    metrics: BridgeMetrics,
61}
62
63// SAFETY: The SPSC protocol ensures that producer and consumer never access
64// the same slot concurrently. The producer writes slots[tail & mask] and the
65// consumer reads slots[head & mask]. As long as the ring buffer is not full,
66// these indices never collide. The atomic head/tail counters enforce this.
67unsafe impl<T: Send> Send for Shared<T> {}
68unsafe impl<T: Send> Sync for Shared<T> {}
69
70/// Cache-line-aligned wrapper to prevent false sharing between head and tail.
71#[repr(align(64))]
72struct CacheAligned<T> {
73    value: T,
74    _pad: [u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
75}
76
77impl<T: Default> CacheAligned<T> {
78    fn new(value: T) -> Self {
79        Self {
80            value,
81            _pad: [0u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
82        }
83    }
84}
85
86/// The producer half of the ring buffer.
87///
88/// Lives on the **Control Plane** (Tokio). It is `Send` so it can be moved
89/// between Tokio worker threads.
90pub struct Producer<T> {
91    shared: Arc<Shared<T>>,
92
93    /// Cached copy of `head` to avoid atomic loads on every enqueue.
94    /// Only refreshed when the ring appears full.
95    cached_head: u64,
96}
97
98/// The consumer half of the ring buffer.
99///
100/// Lives on the **Data Plane** (TPC). It is `Send` so it can be sent to
101/// the TPC core during setup, but once pinned to a core it stays there.
102pub struct Consumer<T> {
103    shared: Arc<Shared<T>>,
104
105    /// Cached copy of `tail` to avoid atomic loads on every dequeue.
106    /// Only refreshed when the ring appears empty.
107    cached_tail: u64,
108}
109
110/// Create a new SPSC ring buffer with the given capacity.
111///
112/// Capacity is rounded up to the next power of two.
113///
114/// Returns `(Producer, Consumer)` — send the producer to the Control Plane
115/// and the consumer to the Data Plane.
116pub struct RingBuffer;
117
118impl RingBuffer {
119    pub fn channel<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
120        assert!(capacity > 0, "ring buffer capacity must be > 0");
121
122        let capacity = capacity.next_power_of_two();
123        let mask = capacity - 1;
124
125        let mut slots = Vec::with_capacity(capacity);
126        for _ in 0..capacity {
127            slots.push(UnsafeCell::new(None));
128        }
129
130        let shared = Arc::new(Shared {
131            tail: CacheAligned::new(AtomicU64::new(0)),
132            head: CacheAligned::new(AtomicU64::new(0)),
133            slots: slots.into_boxed_slice(),
134            capacity,
135            mask,
136            disconnected: AtomicBool::new(false),
137            metrics: BridgeMetrics::new(),
138        });
139
140        let producer = Producer {
141            shared: Arc::clone(&shared),
142            cached_head: 0,
143        };
144
145        let consumer = Consumer {
146            shared,
147            cached_tail: 0,
148        };
149
150        (producer, consumer)
151    }
152}
153
154impl<T> Producer<T> {
155    /// Try to enqueue a value. Returns `Err(BridgeError::Full)` if the ring is full,
156    /// or `Err(BridgeError::Disconnected)` if the consumer was dropped.
157    pub fn try_push(&mut self, value: T) -> Result<()> {
158        if self.shared.disconnected.load(Ordering::Relaxed) {
159            return Err(BridgeError::Disconnected { side: "consumer" });
160        }
161
162        let tail = self.shared.tail.value.load(Ordering::Relaxed);
163
164        // Fast path: check cached head first (avoids atomic load).
165        if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
166            // Slow path: refresh cached head from the atomic.
167            self.cached_head = self.shared.head.value.load(Ordering::Acquire);
168
169            if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
170                self.shared.metrics.record_full();
171                return Err(BridgeError::Full {
172                    capacity: self.shared.capacity,
173                    pending: (tail.wrapping_sub(self.cached_head)) as usize,
174                });
175            }
176        }
177
178        let idx = (tail as usize) & self.shared.mask;
179
180        // SAFETY: We have exclusive write access to this slot because:
181        // 1. We are the only producer (SPSC).
182        // 2. The consumer's head hasn't reached this slot yet (checked above).
183        unsafe {
184            (*self.shared.slots[idx].get()) = Some(value);
185        }
186
187        // Make the value visible to the consumer.
188        self.shared
189            .tail
190            .value
191            .store(tail.wrapping_add(1), Ordering::Release);
192
193        self.shared.metrics.record_push();
194        Ok(())
195    }
196
197    /// Returns the current queue utilization as a percentage (0-100).
198    pub fn utilization(&self) -> u8 {
199        let tail = self.shared.tail.value.load(Ordering::Relaxed);
200        let head = self.shared.head.value.load(Ordering::Relaxed);
201        let pending = tail.wrapping_sub(head) as usize;
202        ((pending * 100) / self.shared.capacity) as u8
203    }
204
205    /// Returns the number of items currently in the queue.
206    pub fn len(&self) -> usize {
207        let tail = self.shared.tail.value.load(Ordering::Relaxed);
208        let head = self.shared.head.value.load(Ordering::Relaxed);
209        tail.wrapping_sub(head) as usize
210    }
211
212    /// Returns `true` if the queue is empty.
213    pub fn is_empty(&self) -> bool {
214        self.len() == 0
215    }
216
217    /// Returns the fixed capacity of the ring buffer.
218    pub fn capacity(&self) -> usize {
219        self.shared.capacity
220    }
221
222    /// Returns a reference to the shared metrics.
223    pub fn metrics(&self) -> &BridgeMetrics {
224        &self.shared.metrics
225    }
226}
227
228impl<T> Consumer<T> {
229    /// Try to dequeue a value. Returns `Err(BridgeError::Empty)` if the ring is empty,
230    /// or `Err(BridgeError::Disconnected)` if the producer was dropped.
231    pub fn try_pop(&mut self) -> Result<T> {
232        let head = self.shared.head.value.load(Ordering::Relaxed);
233
234        // Fast path: check cached tail first.
235        if head == self.cached_tail {
236            // Slow path: refresh cached tail from the atomic.
237            self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
238
239            if head == self.cached_tail {
240                if self.shared.disconnected.load(Ordering::Relaxed) {
241                    return Err(BridgeError::Disconnected { side: "producer" });
242                }
243                return Err(BridgeError::Empty);
244            }
245        }
246
247        let idx = (head as usize) & self.shared.mask;
248
249        // SAFETY: We have exclusive read access to this slot because:
250        // 1. We are the only consumer (SPSC).
251        // 2. The producer has already written to this slot (tail > head, checked above).
252        let value = unsafe { (*self.shared.slots[idx].get()).take() };
253
254        // Advance head to free the slot for the producer.
255        self.shared
256            .head
257            .value
258            .store(head.wrapping_add(1), Ordering::Release);
259
260        self.shared.metrics.record_pop();
261
262        // SAFETY: The producer wrote `Some(value)` before advancing tail.
263        // We only reach here when tail > head, so the slot is guaranteed occupied.
264        Ok(value.expect("BUG: slot was None despite tail > head"))
265    }
266
267    /// Drain up to `max` items into the provided vector. Returns the count drained.
268    ///
269    /// More efficient than calling `try_pop` in a loop because it batches the
270    /// atomic tail load.
271    pub fn drain_into(&mut self, buf: &mut Vec<T>, max: usize) -> usize {
272        let head = self.shared.head.value.load(Ordering::Relaxed);
273        self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
274
275        let available = self.cached_tail.wrapping_sub(head) as usize;
276        let count = available.min(max);
277
278        for i in 0..count {
279            let idx = ((head.wrapping_add(i as u64)) as usize) & self.shared.mask;
280            // SAFETY: same as try_pop — we've verified these slots are occupied.
281            let value = unsafe { (*self.shared.slots[idx].get()).take() };
282            buf.push(value.expect("BUG: slot was None during drain"));
283        }
284
285        if count > 0 {
286            self.shared
287                .head
288                .value
289                .store(head.wrapping_add(count as u64), Ordering::Release);
290            self.shared.metrics.record_pops(count as u64);
291        }
292
293        count
294    }
295
296    /// Returns the number of items currently in the queue.
297    pub fn len(&self) -> usize {
298        let tail = self.shared.tail.value.load(Ordering::Relaxed);
299        let head = self.shared.head.value.load(Ordering::Relaxed);
300        tail.wrapping_sub(head) as usize
301    }
302
303    /// Returns `true` if the queue is empty.
304    pub fn is_empty(&self) -> bool {
305        self.len() == 0
306    }
307
308    /// Returns a reference to the shared metrics.
309    pub fn metrics(&self) -> &BridgeMetrics {
310        &self.shared.metrics
311    }
312}
313
314impl<T> Drop for Producer<T> {
315    fn drop(&mut self) {
316        self.shared.disconnected.store(true, Ordering::Release);
317    }
318}
319
320impl<T> Drop for Consumer<T> {
321    fn drop(&mut self) {
322        self.shared.disconnected.store(true, Ordering::Release);
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn basic_push_pop() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);

        tx.try_push(1).unwrap();
        tx.try_push(2).unwrap();
        tx.try_push(3).unwrap();
        tx.try_push(4).unwrap();

        // Buffer should be full (capacity rounds to 4).
        assert!(matches!(tx.try_push(5), Err(BridgeError::Full { .. })));

        assert_eq!(rx.try_pop().unwrap(), 1);
        assert_eq!(rx.try_pop().unwrap(), 2);

        // Now there's room for more.
        tx.try_push(5).unwrap();
        tx.try_push(6).unwrap();

        assert_eq!(rx.try_pop().unwrap(), 3);
        assert_eq!(rx.try_pop().unwrap(), 4);
        assert_eq!(rx.try_pop().unwrap(), 5);
        assert_eq!(rx.try_pop().unwrap(), 6);

        assert!(matches!(rx.try_pop(), Err(BridgeError::Empty)));
    }

    #[test]
    fn full_error_reports_capacity_and_pending() {
        let (mut tx, _rx) = RingBuffer::channel::<u64>(3);
        for i in 0..4 {
            tx.try_push(i).unwrap();
        }
        assert!(matches!(
            tx.try_push(99),
            Err(BridgeError::Full {
                capacity: 4,
                pending: 4
            })
        ));
    }

    #[test]
    #[should_panic(expected = "ring buffer capacity must be > 0")]
    fn zero_capacity_panics() {
        let _ = RingBuffer::channel::<u64>(0);
    }

    #[test]
    fn power_of_two_rounding() {
        let (tx, _rx) = RingBuffer::channel::<u64>(1);
        assert_eq!(tx.capacity(), 1);

        let (tx, _rx) = RingBuffer::channel::<u64>(3);
        assert_eq!(tx.capacity(), 4);

        let (tx, _rx) = RingBuffer::channel::<u64>(5);
        assert_eq!(tx.capacity(), 8);

        let (tx, _rx) = RingBuffer::channel::<u64>(8);
        assert_eq!(tx.capacity(), 8);
    }

    #[test]
    fn utilization_tracking() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);

        assert_eq!(tx.utilization(), 0);

        for i in 0..6 {
            tx.try_push(i).unwrap();
        }
        assert_eq!(tx.utilization(), 75);

        rx.try_pop().unwrap();
        rx.try_pop().unwrap();
        assert_eq!(tx.utilization(), 50);
    }

    #[test]
    fn drain_into_batch() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(16);

        for i in 0..10 {
            tx.try_push(i).unwrap();
        }

        let mut buf = Vec::new();
        let drained = rx.drain_into(&mut buf, 5);
        assert_eq!(drained, 5);
        assert_eq!(buf, vec![0, 1, 2, 3, 4]);

        let drained = rx.drain_into(&mut buf, 100);
        assert_eq!(drained, 5);
        assert_eq!(buf, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn drain_into_zero_max_or_empty_queue_moves_nothing() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);
        let mut buf = Vec::new();

        assert_eq!(rx.drain_into(&mut buf, 10), 0);
        assert!(buf.is_empty());

        tx.try_push(1).unwrap();
        assert_eq!(rx.drain_into(&mut buf, 0), 0);
        assert!(buf.is_empty());
        assert_eq!(rx.len(), 1);
    }

    #[test]
    fn disconnect_detection_producer_drops() {
        let (tx, mut rx) = RingBuffer::channel::<u64>(4);
        drop(tx);

        assert!(matches!(
            rx.try_pop(),
            Err(BridgeError::Disconnected { side: "producer" })
        ));
    }

    #[test]
    fn pending_items_delivered_after_producer_drop() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);
        tx.try_push(7).unwrap();
        tx.try_push(8).unwrap();
        drop(tx);

        assert_eq!(rx.try_pop().unwrap(), 7);
        assert_eq!(rx.try_pop().unwrap(), 8);
        assert!(matches!(
            rx.try_pop(),
            Err(BridgeError::Disconnected { side: "producer" })
        ));
    }

    #[test]
    fn disconnect_detection_consumer_drops() {
        let (mut tx, rx) = RingBuffer::channel::<u64>(4);
        drop(rx);

        assert!(matches!(
            tx.try_push(1),
            Err(BridgeError::Disconnected { side: "consumer" })
        ));
    }

    #[test]
    fn unconsumed_values_are_dropped_with_the_channel() {
        let marker = Arc::new(());
        let (mut tx, rx) = RingBuffer::channel::<Arc<()>>(4);
        tx.try_push(Arc::clone(&marker)).unwrap();
        tx.try_push(Arc::clone(&marker)).unwrap();
        assert_eq!(Arc::strong_count(&marker), 3);

        drop(tx);
        drop(rx);
        assert_eq!(Arc::strong_count(&marker), 1);
    }

    #[test]
    fn wrapping_behavior() {
        // Verify slot indices wrap around the ring correctly over many rounds.
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);

        for round in 0..1000u64 {
            for i in 0..4 {
                tx.try_push(round * 4 + i).unwrap();
            }
            for i in 0..4 {
                assert_eq!(rx.try_pop().unwrap(), round * 4 + i);
            }
        }
    }

    #[test]
    fn concurrent_transfer_delivers_everything_before_disconnect() {
        const N: u64 = 10_000;
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);

        let producer = std::thread::spawn(move || {
            for i in 0..N {
                while let Err(err) = tx.try_push(i) {
                    assert!(
                        matches!(err, BridgeError::Full { .. }),
                        "unexpected push error"
                    );
                    std::thread::yield_now();
                }
            }
            // `tx` is dropped here, disconnecting right after the last push —
            // the window in which a consumer could misreport a disconnect
            // while items are still queued.
        });

        let mut next = 0u64;
        loop {
            match rx.try_pop() {
                Ok(v) => {
                    assert_eq!(v, next);
                    next += 1;
                }
                Err(BridgeError::Empty) => std::thread::yield_now(),
                Err(BridgeError::Disconnected { .. }) => break,
                Err(_) => panic!("unexpected pop error"),
            }
        }

        producer.join().unwrap();
        assert_eq!(next, N);
    }

    #[test]
    fn metrics_counting() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);

        for i in 0..5 {
            tx.try_push(i).unwrap();
        }
        assert_eq!(tx.metrics().pushes(), 5);

        for _ in 0..3 {
            rx.try_pop().unwrap();
        }
        assert_eq!(rx.metrics().pops(), 3);
    }
}