// nodedb_bridge — buffer.rs

//! Lock-free SPSC ring buffer.
//!
//! The core data structure: a fixed-capacity circular buffer with cache-line-padded
//! head (consumer) and tail (producer) counters. On the common path an
//! enqueue/dequeue touches only the local counter — one `Relaxed` load and one
//! `Release` store. The remote counter is read with an `Acquire` load only when the
//! locally cached copy says the ring is full (producer) or empty (consumer). The
//! producer additionally checks the disconnect flag on every push.
//!
//! ## Memory layout
//!
//! ```text
//! ┌─────────────────────────────────────────────────────┐
//! │  [CacheLine] tail (written by producer only)        │
//! │  [CacheLine] head (written by consumer only)        │
//! │  [capacity]  slot array (T values)                  │
//! └─────────────────────────────────────────────────────┘
//! ```
//!
//! The `tail` and `head` live on separate cache lines to prevent false sharing.
//! The slot array is a separately boxed slice of `Option<T>` cells.
//! Capacity is always a power of two (requests are rounded up) so we can use
//! bitwise AND instead of modulo.

21use std::cell::UnsafeCell;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24
25use crate::error::{BridgeError, Result};
26use crate::metrics::BridgeMetrics;
27
/// Assumed cache-line size in bytes; 64 matches x86_64 and most aarch64
/// implementations.
///
/// Used to size the padding inside `CacheAligned`.
const CACHE_LINE: usize = 64;

31/// Shared state between producer and consumer.
32///
33/// The producer writes `tail`, reads `head`.
34/// The consumer writes `head`, reads `tail`.
35///
36/// Both counters are monotonically increasing u64 values. The actual slot index
37/// is `counter & (capacity - 1)` (bitwise AND because capacity is power-of-two).
38struct Shared<T> {
39    /// Producer's write cursor. Padded to its own cache line.
40    tail: CacheAligned<AtomicU64>,
41
42    /// Consumer's read cursor. Padded to its own cache line.
43    head: CacheAligned<AtomicU64>,
44
45    /// Fixed-size slot array.
46    slots: Box<[UnsafeCell<Option<T>>]>,
47
48    /// Capacity (always power of two).
49    capacity: usize,
50
51    /// Bitmask: `capacity - 1` for fast modulo.
52    mask: usize,
53
54    /// Set to true when either side is dropped.
55    disconnected: AtomicBool,
56
57    /// Shared metrics counters.
58    metrics: BridgeMetrics,
59}
60
61// SAFETY: The SPSC protocol ensures that producer and consumer never access
62// the same slot concurrently. The producer writes slots[tail & mask] and the
63// consumer reads slots[head & mask]. As long as the ring buffer is not full,
64// these indices never collide. The atomic head/tail counters enforce this.
65unsafe impl<T: Send> Send for Shared<T> {}
66unsafe impl<T: Send> Sync for Shared<T> {}
67
68/// Cache-line-aligned wrapper to prevent false sharing between head and tail.
69#[repr(align(64))]
70struct CacheAligned<T> {
71    value: T,
72    _pad: [u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
73}
74
75impl<T: Default> CacheAligned<T> {
76    fn new(value: T) -> Self {
77        Self {
78            value,
79            _pad: [0u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
80        }
81    }
82}
83
84/// The producer half of the ring buffer.
85///
86/// Lives on the **Control Plane** (Tokio). It is `Send` so it can be moved
87/// between Tokio worker threads.
88pub struct Producer<T> {
89    shared: Arc<Shared<T>>,
90
91    /// Cached copy of `head` to avoid atomic loads on every enqueue.
92    /// Only refreshed when the ring appears full.
93    cached_head: u64,
94}
95
96/// The consumer half of the ring buffer.
97///
98/// Lives on the **Data Plane** (TPC). It is `Send` so it can be sent to
99/// the TPC core during setup, but once pinned to a core it stays there.
100pub struct Consumer<T> {
101    shared: Arc<Shared<T>>,
102
103    /// Cached copy of `tail` to avoid atomic loads on every dequeue.
104    /// Only refreshed when the ring appears empty.
105    cached_tail: u64,
106}
107
/// Namespace type for constructing an SPSC ring buffer.
///
/// It carries no data of its own; call `RingBuffer::channel` to obtain the
/// `(Producer, Consumer)` pair, then send the producer to the Control Plane
/// and the consumer to the Data Plane.
pub struct RingBuffer;

116impl RingBuffer {
117    pub fn channel<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
118        assert!(capacity > 0, "ring buffer capacity must be > 0");
119
120        let capacity = capacity.next_power_of_two();
121        let mask = capacity - 1;
122
123        let mut slots = Vec::with_capacity(capacity);
124        for _ in 0..capacity {
125            slots.push(UnsafeCell::new(None));
126        }
127
128        let shared = Arc::new(Shared {
129            tail: CacheAligned::new(AtomicU64::new(0)),
130            head: CacheAligned::new(AtomicU64::new(0)),
131            slots: slots.into_boxed_slice(),
132            capacity,
133            mask,
134            disconnected: AtomicBool::new(false),
135            metrics: BridgeMetrics::new(),
136        });
137
138        let producer = Producer {
139            shared: Arc::clone(&shared),
140            cached_head: 0,
141        };
142
143        let consumer = Consumer {
144            shared,
145            cached_tail: 0,
146        };
147
148        (producer, consumer)
149    }
150}
151
152impl<T> Producer<T> {
153    /// Try to enqueue a value. Returns `Err(BridgeError::Full)` if the ring is full,
154    /// or `Err(BridgeError::Disconnected)` if the consumer was dropped.
155    pub fn try_push(&mut self, value: T) -> Result<()> {
156        if self.shared.disconnected.load(Ordering::Relaxed) {
157            return Err(BridgeError::Disconnected { side: "consumer" });
158        }
159
160        let tail = self.shared.tail.value.load(Ordering::Relaxed);
161
162        // Fast path: check cached head first (avoids atomic load).
163        if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
164            // Slow path: refresh cached head from the atomic.
165            self.cached_head = self.shared.head.value.load(Ordering::Acquire);
166
167            if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
168                self.shared.metrics.record_full();
169                return Err(BridgeError::Full {
170                    capacity: self.shared.capacity,
171                    pending: (tail.wrapping_sub(self.cached_head)) as usize,
172                });
173            }
174        }
175
176        let idx = (tail as usize) & self.shared.mask;
177
178        // SAFETY: We have exclusive write access to this slot because:
179        // 1. We are the only producer (SPSC).
180        // 2. The consumer's head hasn't reached this slot yet (checked above).
181        unsafe {
182            (*self.shared.slots[idx].get()) = Some(value);
183        }
184
185        // Make the value visible to the consumer.
186        self.shared
187            .tail
188            .value
189            .store(tail.wrapping_add(1), Ordering::Release);
190
191        self.shared.metrics.record_push();
192        Ok(())
193    }
194
195    /// Returns the current queue utilization as a percentage (0-100).
196    pub fn utilization(&self) -> u8 {
197        let tail = self.shared.tail.value.load(Ordering::Relaxed);
198        let head = self.shared.head.value.load(Ordering::Relaxed);
199        let pending = tail.wrapping_sub(head) as usize;
200        ((pending * 100) / self.shared.capacity) as u8
201    }
202
203    /// Returns the number of items currently in the queue.
204    pub fn len(&self) -> usize {
205        let tail = self.shared.tail.value.load(Ordering::Relaxed);
206        let head = self.shared.head.value.load(Ordering::Relaxed);
207        tail.wrapping_sub(head) as usize
208    }
209
210    /// Returns `true` if the queue is empty.
211    pub fn is_empty(&self) -> bool {
212        self.len() == 0
213    }
214
215    /// Returns the fixed capacity of the ring buffer.
216    pub fn capacity(&self) -> usize {
217        self.shared.capacity
218    }
219
220    /// Returns a reference to the shared metrics.
221    pub fn metrics(&self) -> &BridgeMetrics {
222        &self.shared.metrics
223    }
224}
225
226impl<T> Consumer<T> {
227    /// Try to dequeue a value. Returns `Err(BridgeError::Empty)` if the ring is empty,
228    /// or `Err(BridgeError::Disconnected)` if the producer was dropped.
229    pub fn try_pop(&mut self) -> Result<T> {
230        let head = self.shared.head.value.load(Ordering::Relaxed);
231
232        // Fast path: check cached tail first.
233        if head == self.cached_tail {
234            // Slow path: refresh cached tail from the atomic.
235            self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
236
237            if head == self.cached_tail {
238                if self.shared.disconnected.load(Ordering::Relaxed) {
239                    return Err(BridgeError::Disconnected { side: "producer" });
240                }
241                return Err(BridgeError::Empty);
242            }
243        }
244
245        let idx = (head as usize) & self.shared.mask;
246
247        // SAFETY: We have exclusive read access to this slot because:
248        // 1. We are the only consumer (SPSC).
249        // 2. The producer has already written to this slot (tail > head, checked above).
250        let value = unsafe { (*self.shared.slots[idx].get()).take() };
251
252        // Advance head to free the slot for the producer.
253        self.shared
254            .head
255            .value
256            .store(head.wrapping_add(1), Ordering::Release);
257
258        self.shared.metrics.record_pop();
259
260        // SAFETY: The producer wrote `Some(value)` before advancing tail.
261        // We only reach here when tail > head, so the slot is guaranteed occupied.
262        Ok(value.expect("BUG: slot was None despite tail > head"))
263    }
264
265    /// Drain up to `max` items into the provided vector. Returns the count drained.
266    ///
267    /// More efficient than calling `try_pop` in a loop because it batches the
268    /// atomic tail load.
269    pub fn drain_into(&mut self, buf: &mut Vec<T>, max: usize) -> usize {
270        let head = self.shared.head.value.load(Ordering::Relaxed);
271        self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);
272
273        let available = self.cached_tail.wrapping_sub(head) as usize;
274        let count = available.min(max);
275
276        for i in 0..count {
277            let idx = ((head.wrapping_add(i as u64)) as usize) & self.shared.mask;
278            // SAFETY: same as try_pop — we've verified these slots are occupied.
279            let value = unsafe { (*self.shared.slots[idx].get()).take() };
280            buf.push(value.expect("BUG: slot was None during drain"));
281        }
282
283        if count > 0 {
284            self.shared
285                .head
286                .value
287                .store(head.wrapping_add(count as u64), Ordering::Release);
288            self.shared.metrics.record_pops(count as u64);
289        }
290
291        count
292    }
293
294    /// Returns the number of items currently in the queue.
295    pub fn len(&self) -> usize {
296        let tail = self.shared.tail.value.load(Ordering::Relaxed);
297        let head = self.shared.head.value.load(Ordering::Relaxed);
298        tail.wrapping_sub(head) as usize
299    }
300
301    /// Returns `true` if the queue is empty.
302    pub fn is_empty(&self) -> bool {
303        self.len() == 0
304    }
305
306    /// Returns a reference to the shared metrics.
307    pub fn metrics(&self) -> &BridgeMetrics {
308        &self.shared.metrics
309    }
310}
311
312impl<T> Drop for Producer<T> {
313    fn drop(&mut self) {
314        self.shared.disconnected.store(true, Ordering::Release);
315    }
316}
317
318impl<T> Drop for Consumer<T> {
319    fn drop(&mut self) {
320        self.shared.disconnected.store(true, Ordering::Release);
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Push every value in `values`, panicking on the first failure.
    fn push_all(tx: &mut Producer<u64>, values: impl IntoIterator<Item = u64>) {
        for value in values {
            tx.try_push(value).unwrap();
        }
    }

    /// Pop one item per entry of `expected` and check that the order matches.
    fn expect_pops(rx: &mut Consumer<u64>, expected: impl IntoIterator<Item = u64>) {
        for want in expected {
            assert_eq!(rx.try_pop().unwrap(), want);
        }
    }

    #[test]
    fn basic_push_pop() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);

        push_all(&mut tx, 1..=4);

        // All four slots are taken, so the next push must be rejected.
        assert!(matches!(tx.try_push(5), Err(BridgeError::Full { .. })));

        expect_pops(&mut rx, 1..=2);

        // Two slots were freed, so two more pushes fit.
        push_all(&mut tx, 5..=6);

        expect_pops(&mut rx, 3..=6);

        assert!(matches!(rx.try_pop(), Err(BridgeError::Empty)));
    }

    #[test]
    fn power_of_two_rounding() {
        for (requested, expected) in [(3, 4), (5, 8), (8, 8)] {
            let (tx, _rx) = RingBuffer::channel::<u64>(requested);
            assert_eq!(tx.capacity(), expected);
        }
    }

    #[test]
    fn utilization_tracking() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);

        assert_eq!(tx.utilization(), 0);

        // 6 of 8 slots in use.
        push_all(&mut tx, 0..6);
        assert_eq!(tx.utilization(), 75);

        // 4 of 8 slots in use.
        rx.try_pop().unwrap();
        rx.try_pop().unwrap();
        assert_eq!(tx.utilization(), 50);
    }

    #[test]
    fn drain_into_batch() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(16);
        push_all(&mut tx, 0..10);

        let mut buf = Vec::new();

        // First batch is capped by `max`.
        assert_eq!(rx.drain_into(&mut buf, 5), 5);
        assert_eq!(buf, vec![0, 1, 2, 3, 4]);

        // Second batch is capped by what is available and appends to `buf`.
        assert_eq!(rx.drain_into(&mut buf, 100), 5);
        assert_eq!(buf, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn drain_into_empty_or_zero_max_is_noop() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);
        let mut buf = Vec::new();

        // Nothing queued yet.
        assert_eq!(rx.drain_into(&mut buf, 10), 0);

        // Items queued, but the caller asked for none.
        push_all(&mut tx, 0..2);
        assert_eq!(rx.drain_into(&mut buf, 0), 0);

        assert!(buf.is_empty());
        assert_eq!(rx.len(), 2);
    }

    #[test]
    fn disconnect_detection_producer_drops() {
        let (tx, mut rx) = RingBuffer::channel::<u64>(4);
        drop(tx);

        assert!(matches!(
            rx.try_pop(),
            Err(BridgeError::Disconnected { side: "producer" })
        ));
    }

    #[test]
    fn disconnect_detection_consumer_drops() {
        let (mut tx, rx) = RingBuffer::channel::<u64>(4);
        drop(rx);

        assert!(matches!(
            tx.try_push(1),
            Err(BridgeError::Disconnected { side: "consumer" })
        ));
    }

    #[test]
    fn items_pushed_before_producer_drop_are_delivered() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);
        push_all(&mut tx, [1, 2]);
        drop(tx);

        // Queued items come out first; the disconnect is reported afterwards.
        expect_pops(&mut rx, [1, 2]);
        assert!(matches!(
            rx.try_pop(),
            Err(BridgeError::Disconnected { side: "producer" })
        ));
    }

    #[test]
    #[should_panic(expected = "ring buffer capacity must be > 0")]
    fn zero_capacity_panics() {
        let _ = RingBuffer::channel::<u64>(0);
    }

    #[test]
    fn wrapping_behavior() {
        // Cycle through the slots many times so the cursors run far past the
        // capacity and the index masking is exercised.
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);

        for round in 0..1000u64 {
            let base = round * 4;
            push_all(&mut tx, base..base + 4);
            expect_pops(&mut rx, base..base + 4);
        }
    }

    #[test]
    fn metrics_counting() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);

        push_all(&mut tx, 0..5);
        assert_eq!(tx.metrics().pushes(), 5);

        for _ in 0..3 {
            rx.try_pop().unwrap();
        }
        assert_eq!(rx.metrics().pops(), 3);
    }
}