dynomite/core/ring_queue.rs
1//! Cross-thread ring queues used between the worker, gossip, and stats
2//! threads.
3//!
4//! The C engine uses two SPSC ring buffers, `C2G_InQ` and `C2G_OutQ`,
5//! sized at 256 slots each. The Rust port replaces them with bounded
6//! [`crossbeam_channel`] pairs of the same capacity. The `RingChannels`
7//! struct owns both directions (core -> gossip and gossip -> core), so
8//! callers receive a single value to wire into their thread spawning
9//! code.
10//!
11//! # Examples
12//!
13//! ```
14//! use dynomite::core::ring_queue::RingChannels;
15//!
16//! let chans: RingChannels<u32, u32> = RingChannels::new();
17//! chans.in_tx.send(7).unwrap();
18//! assert_eq!(chans.in_rx.recv().unwrap(), 7);
19//! chans.out_tx.send(11).unwrap();
20//! assert_eq!(chans.out_rx.recv().unwrap(), 11);
21//! ```
22
23use crossbeam_channel::{bounded, Receiver, Sender};
24
/// Slot count of the core -> gossip queue: at most this many messages
/// can be waiting in that direction at once.
///
/// The value is 256, written below as a power of two. It is the inbound
/// capacity that [`RingChannels::new`] passes to
/// [`RingChannels::with_capacities`].
///
/// # Examples
///
/// ```
/// use dynomite::core::ring_queue::C2G_IN_CAPACITY;
/// assert_eq!(C2G_IN_CAPACITY, 256);
/// ```
pub const C2G_IN_CAPACITY: usize = 1 << 8;
34
/// Slot count of the gossip -> core queue: at most this many messages
/// can be waiting in that direction at once.
///
/// The value is 256, written below as a power of two. It is the outbound
/// capacity that [`RingChannels::new`] passes to
/// [`RingChannels::with_capacities`].
///
/// # Examples
///
/// ```
/// use dynomite::core::ring_queue::C2G_OUT_CAPACITY;
/// assert_eq!(C2G_OUT_CAPACITY, 256);
/// ```
pub const C2G_OUT_CAPACITY: usize = 1 << 8;
44
45/// A pair of bounded channels that carry the core <-> gossip ring
46/// traffic.
47///
48/// `I` is the message type the core thread sends *to* the secondary
49/// thread (typically gossip); `O` is the message type the secondary
50/// thread sends *back*.
51#[derive(Debug, Clone)]
52pub struct RingChannels<I, O> {
53 /// Sender owned by the producer of inbound messages.
54 pub in_tx: Sender<I>,
55 /// Receiver owned by the consumer of inbound messages.
56 pub in_rx: Receiver<I>,
57 /// Sender owned by the producer of outbound replies.
58 pub out_tx: Sender<O>,
59 /// Receiver owned by the consumer of outbound replies.
60 pub out_rx: Receiver<O>,
61}
62
63impl<I, O> RingChannels<I, O> {
64 /// Create a new pair sized at the C2G defaults.
65 ///
66 /// # Examples
67 ///
68 /// ```
69 /// use dynomite::core::ring_queue::RingChannels;
70 /// let _: RingChannels<(), ()> = RingChannels::new();
71 /// ```
72 pub fn new() -> Self {
73 Self::with_capacities(C2G_IN_CAPACITY, C2G_OUT_CAPACITY)
74 }
75
76 /// Create a new pair with explicit capacities. Useful for tests
77 /// and downstream stages that wire smaller queues.
78 ///
79 /// # Examples
80 ///
81 /// ```
82 /// use dynomite::core::ring_queue::RingChannels;
83 /// let chans: RingChannels<u32, ()> = RingChannels::with_capacities(2, 1);
84 /// chans.in_tx.send(1).unwrap();
85 /// chans.in_tx.send(2).unwrap();
86 /// assert!(chans.in_tx.try_send(3).is_err());
87 /// ```
88 pub fn with_capacities(in_cap: usize, out_cap: usize) -> Self {
89 let (in_tx, in_rx) = bounded(in_cap);
90 let (out_tx, out_rx) = bounded(out_cap);
91 Self {
92 in_tx,
93 in_rx,
94 out_tx,
95 out_rx,
96 }
97 }
98}
99
100impl<I, O> Default for RingChannels<I, O> {
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    /// Both default capacities are pinned to 256.
    #[test]
    fn capacities_match_c_constants() {
        assert_eq!((C2G_IN_CAPACITY, C2G_OUT_CAPACITY), (256, 256));
    }

    /// Messages leave the inbound channel in the order they were sent.
    #[test]
    fn fifo_ordering_is_preserved() {
        let ring: RingChannels<u32, ()> = RingChannels::new();
        let sent: Vec<u32> = (0..16).collect();
        for &value in &sent {
            ring.in_tx.send(value).unwrap();
        }
        let received: Vec<u32> = sent.iter().map(|_| ring.in_rx.recv().unwrap()).collect();
        assert_eq!(received, sent);
    }

    /// Once the inbound queue holds `in_cap` messages, one more is refused.
    #[test]
    fn bounded_capacity_blocks_when_full() {
        let ring: RingChannels<u32, ()> = RingChannels::with_capacities(2, 1);
        for value in 1..=2u32 {
            ring.in_tx.send(value).unwrap();
        }
        // `try_send` is the non-blocking probe: with two messages queued and
        // nothing received, the third attempt must report an error.
        let overflow = ring.in_tx.try_send(3);
        assert!(overflow.is_err());
    }
}
135}