nodedb-bridge 0.0.0

Lock-free SPSC ring buffer bridging Tokio (Send) and Thread-per-Core (! Send) runtimes
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
//! Lock-free SPSC ring buffer.
//!
//! The core data structure: a fixed-capacity circular buffer with cache-line-padded
//! head (consumer) and tail (producer) counters. Only two atomic operations per
//! enqueue/dequeue — one `Relaxed` load of the remote counter, one `Release`/`Acquire`
//! store of the local counter.
//!
//! ## Memory layout
//!
//! ```text
//! ┌─────────────────────────────────────────────────────┐
//! │  [CacheLine] tail (written by producer only)        │
//! │  [CacheLine] head (written by consumer only)        │
//! │  [capacity]  slot array (T values)                  │
//! └─────────────────────────────────────────────────────┘
//! ```
//!
//! The `tail` and `head` live on separate cache lines to prevent false sharing.
//! Capacity must be a power of two so we can use bitwise AND instead of modulo.

use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

use crate::error::{BridgeError, Result};
use crate::metrics::BridgeMetrics;

/// Cache line size on x86_64 and most aarch64 implementations.
const CACHE_LINE: usize = 64;

/// Shared state between producer and consumer.
///
/// The producer writes `tail`, reads `head`.
/// The consumer writes `head`, reads `tail`.
///
/// Both counters are monotonically increasing u64 values. The actual slot index
/// is `counter & (capacity - 1)` (bitwise AND because capacity is power-of-two).
struct Shared<T> {
    /// Producer's write cursor. Padded to its own cache line.
    tail: CacheAligned<AtomicU64>,

    /// Consumer's read cursor. Padded to its own cache line.
    head: CacheAligned<AtomicU64>,

    /// Fixed-size slot array.
    slots: Box<[UnsafeCell<Option<T>>]>,

    /// Capacity (always power of two).
    capacity: usize,

    /// Bitmask: `capacity - 1` for fast modulo.
    mask: usize,

    /// Set to true when either side is dropped.
    disconnected: AtomicBool,

    /// Shared metrics counters.
    metrics: BridgeMetrics,
}

// SAFETY: The SPSC protocol ensures that producer and consumer never access
// the same slot concurrently. The producer writes slots[tail & mask] and the
// consumer reads slots[head & mask]. As long as the ring buffer is not full,
// these indices never collide. The atomic head/tail counters enforce this.
unsafe impl<T: Send> Send for Shared<T> {}
unsafe impl<T: Send> Sync for Shared<T> {}

/// Cache-line-aligned wrapper to prevent false sharing between head and tail.
#[repr(align(64))]
struct CacheAligned<T> {
    value: T,
    _pad: [u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
}

impl<T: Default> CacheAligned<T> {
    fn new(value: T) -> Self {
        Self {
            value,
            _pad: [0u8; CACHE_LINE - std::mem::size_of::<AtomicU64>()],
        }
    }
}

/// The producer half of the ring buffer.
///
/// Lives on the **Control Plane** (Tokio). It is `Send` so it can be moved
/// between Tokio worker threads.
pub struct Producer<T> {
    shared: Arc<Shared<T>>,

    /// Cached copy of `head` to avoid atomic loads on every enqueue.
    /// Only refreshed when the ring appears full.
    cached_head: u64,
}

/// The consumer half of the ring buffer.
///
/// Lives on the **Data Plane** (TPC). It is `Send` so it can be sent to
/// the TPC core during setup, but once pinned to a core it stays there.
pub struct Consumer<T> {
    shared: Arc<Shared<T>>,

    /// Cached copy of `tail` to avoid atomic loads on every dequeue.
    /// Only refreshed when the ring appears empty.
    cached_tail: u64,
}

/// Create a new SPSC ring buffer with the given capacity.
///
/// Capacity is rounded up to the next power of two.
///
/// Returns `(Producer, Consumer)` — send the producer to the Control Plane
/// and the consumer to the Data Plane.
pub struct RingBuffer;

impl RingBuffer {
    pub fn channel<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
        assert!(capacity > 0, "ring buffer capacity must be > 0");

        let capacity = capacity.next_power_of_two();
        let mask = capacity - 1;

        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(UnsafeCell::new(None));
        }

        let shared = Arc::new(Shared {
            tail: CacheAligned::new(AtomicU64::new(0)),
            head: CacheAligned::new(AtomicU64::new(0)),
            slots: slots.into_boxed_slice(),
            capacity,
            mask,
            disconnected: AtomicBool::new(false),
            metrics: BridgeMetrics::new(),
        });

        let producer = Producer {
            shared: Arc::clone(&shared),
            cached_head: 0,
        };

        let consumer = Consumer {
            shared,
            cached_tail: 0,
        };

        (producer, consumer)
    }
}

impl<T> Producer<T> {
    /// Try to enqueue a value. Returns `Err(BridgeError::Full)` if the ring is full,
    /// or `Err(BridgeError::Disconnected)` if the consumer was dropped.
    pub fn try_push(&mut self, value: T) -> Result<()> {
        if self.shared.disconnected.load(Ordering::Relaxed) {
            return Err(BridgeError::Disconnected { side: "consumer" });
        }

        let tail = self.shared.tail.value.load(Ordering::Relaxed);

        // Fast path: check cached head first (avoids atomic load).
        if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
            // Slow path: refresh cached head from the atomic.
            self.cached_head = self.shared.head.value.load(Ordering::Acquire);

            if tail.wrapping_sub(self.cached_head) >= self.shared.capacity as u64 {
                self.shared.metrics.record_full();
                return Err(BridgeError::Full {
                    capacity: self.shared.capacity,
                    pending: (tail.wrapping_sub(self.cached_head)) as usize,
                });
            }
        }

        let idx = (tail as usize) & self.shared.mask;

        // SAFETY: We have exclusive write access to this slot because:
        // 1. We are the only producer (SPSC).
        // 2. The consumer's head hasn't reached this slot yet (checked above).
        unsafe {
            (*self.shared.slots[idx].get()) = Some(value);
        }

        // Make the value visible to the consumer.
        self.shared
            .tail
            .value
            .store(tail.wrapping_add(1), Ordering::Release);

        self.shared.metrics.record_push();
        Ok(())
    }

    /// Returns the current queue utilization as a percentage (0-100).
    pub fn utilization(&self) -> u8 {
        let tail = self.shared.tail.value.load(Ordering::Relaxed);
        let head = self.shared.head.value.load(Ordering::Relaxed);
        let pending = tail.wrapping_sub(head) as usize;
        ((pending * 100) / self.shared.capacity) as u8
    }

    /// Returns the number of items currently in the queue.
    pub fn len(&self) -> usize {
        let tail = self.shared.tail.value.load(Ordering::Relaxed);
        let head = self.shared.head.value.load(Ordering::Relaxed);
        tail.wrapping_sub(head) as usize
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the fixed capacity of the ring buffer.
    pub fn capacity(&self) -> usize {
        self.shared.capacity
    }

    /// Returns a reference to the shared metrics.
    pub fn metrics(&self) -> &BridgeMetrics {
        &self.shared.metrics
    }
}

impl<T> Consumer<T> {
    /// Try to dequeue a value. Returns `Err(BridgeError::Empty)` if the ring is empty,
    /// or `Err(BridgeError::Disconnected)` if the producer was dropped.
    pub fn try_pop(&mut self) -> Result<T> {
        let head = self.shared.head.value.load(Ordering::Relaxed);

        // Fast path: check cached tail first.
        if head == self.cached_tail {
            // Slow path: refresh cached tail from the atomic.
            self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);

            if head == self.cached_tail {
                if self.shared.disconnected.load(Ordering::Relaxed) {
                    return Err(BridgeError::Disconnected { side: "producer" });
                }
                return Err(BridgeError::Empty);
            }
        }

        let idx = (head as usize) & self.shared.mask;

        // SAFETY: We have exclusive read access to this slot because:
        // 1. We are the only consumer (SPSC).
        // 2. The producer has already written to this slot (tail > head, checked above).
        let value = unsafe { (*self.shared.slots[idx].get()).take() };

        // Advance head to free the slot for the producer.
        self.shared
            .head
            .value
            .store(head.wrapping_add(1), Ordering::Release);

        self.shared.metrics.record_pop();

        // SAFETY: The producer wrote `Some(value)` before advancing tail.
        // We only reach here when tail > head, so the slot is guaranteed occupied.
        Ok(value.expect("BUG: slot was None despite tail > head"))
    }

    /// Drain up to `max` items into the provided vector. Returns the count drained.
    ///
    /// More efficient than calling `try_pop` in a loop because it batches the
    /// atomic tail load.
    pub fn drain_into(&mut self, buf: &mut Vec<T>, max: usize) -> usize {
        let head = self.shared.head.value.load(Ordering::Relaxed);
        self.cached_tail = self.shared.tail.value.load(Ordering::Acquire);

        let available = self.cached_tail.wrapping_sub(head) as usize;
        let count = available.min(max);

        for i in 0..count {
            let idx = ((head.wrapping_add(i as u64)) as usize) & self.shared.mask;
            // SAFETY: same as try_pop — we've verified these slots are occupied.
            let value = unsafe { (*self.shared.slots[idx].get()).take() };
            buf.push(value.expect("BUG: slot was None during drain"));
        }

        if count > 0 {
            self.shared
                .head
                .value
                .store(head.wrapping_add(count as u64), Ordering::Release);
            self.shared.metrics.record_pops(count as u64);
        }

        count
    }

    /// Returns the number of items currently in the queue.
    pub fn len(&self) -> usize {
        let tail = self.shared.tail.value.load(Ordering::Relaxed);
        let head = self.shared.head.value.load(Ordering::Relaxed);
        tail.wrapping_sub(head) as usize
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns a reference to the shared metrics.
    pub fn metrics(&self) -> &BridgeMetrics {
        &self.shared.metrics
    }
}

impl<T> Drop for Producer<T> {
    fn drop(&mut self) {
        self.shared.disconnected.store(true, Ordering::Release);
    }
}

impl<T> Drop for Consumer<T> {
    fn drop(&mut self) {
        self.shared.disconnected.store(true, Ordering::Release);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn basic_push_pop() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);

        tx.try_push(1).unwrap();
        tx.try_push(2).unwrap();
        tx.try_push(3).unwrap();
        tx.try_push(4).unwrap();

        // Buffer should be full (capacity rounds to 4).
        assert!(matches!(tx.try_push(5), Err(BridgeError::Full { .. })));

        assert_eq!(rx.try_pop().unwrap(), 1);
        assert_eq!(rx.try_pop().unwrap(), 2);

        // Now there's room for more.
        tx.try_push(5).unwrap();
        tx.try_push(6).unwrap();

        assert_eq!(rx.try_pop().unwrap(), 3);
        assert_eq!(rx.try_pop().unwrap(), 4);
        assert_eq!(rx.try_pop().unwrap(), 5);
        assert_eq!(rx.try_pop().unwrap(), 6);

        assert!(matches!(rx.try_pop(), Err(BridgeError::Empty)));
    }

    #[test]
    fn power_of_two_rounding() {
        let (tx, _rx) = RingBuffer::channel::<u64>(3);
        assert_eq!(tx.capacity(), 4);

        let (tx, _rx) = RingBuffer::channel::<u64>(5);
        assert_eq!(tx.capacity(), 8);

        let (tx, _rx) = RingBuffer::channel::<u64>(8);
        assert_eq!(tx.capacity(), 8);
    }

    #[test]
    fn utilization_tracking() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);

        assert_eq!(tx.utilization(), 0);

        for i in 0..6 {
            tx.try_push(i).unwrap();
        }
        assert_eq!(tx.utilization(), 75);

        rx.try_pop().unwrap();
        rx.try_pop().unwrap();
        assert_eq!(tx.utilization(), 50);
    }

    #[test]
    fn drain_into_batch() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(16);

        for i in 0..10 {
            tx.try_push(i).unwrap();
        }

        let mut buf = Vec::new();
        let drained = rx.drain_into(&mut buf, 5);
        assert_eq!(drained, 5);
        assert_eq!(buf, vec![0, 1, 2, 3, 4]);

        let drained = rx.drain_into(&mut buf, 100);
        assert_eq!(drained, 5);
        assert_eq!(buf, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn disconnect_detection_producer_drops() {
        let (tx, mut rx) = RingBuffer::channel::<u64>(4);
        drop(tx);

        assert!(matches!(
            rx.try_pop(),
            Err(BridgeError::Disconnected { side: "producer" })
        ));
    }

    #[test]
    fn disconnect_detection_consumer_drops() {
        let (mut tx, rx) = RingBuffer::channel::<u64>(4);
        drop(rx);

        assert!(matches!(
            tx.try_push(1),
            Err(BridgeError::Disconnected { side: "consumer" })
        ));
    }

    #[test]
    fn wrapping_behavior() {
        // Verify the ring buffer works correctly when counters wrap past capacity.
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(4);

        for round in 0..1000u64 {
            for i in 0..4 {
                tx.try_push(round * 4 + i).unwrap();
            }
            for i in 0..4 {
                assert_eq!(rx.try_pop().unwrap(), round * 4 + i);
            }
        }
    }

    #[test]
    fn metrics_counting() {
        let (mut tx, mut rx) = RingBuffer::channel::<u64>(8);

        for i in 0..5 {
            tx.try_push(i).unwrap();
        }
        assert_eq!(tx.metrics().pushes(), 5);

        for _ in 0..3 {
            rx.try_pop().unwrap();
        }
        assert_eq!(rx.metrics().pops(), 3);
    }
}