Skip to main content

dynomite/core/
ring_queue.rs

1//! Cross-thread ring queues used between the worker, gossip, and stats
2//! threads.
3//!
4//! The C engine uses two SPSC ring buffers, `C2G_InQ` and `C2G_OutQ`,
5//! sized at 256 slots each. The Rust port replaces them with bounded
6//! [`crossbeam_channel`] pairs of the same capacity. The `RingChannels`
7//! struct owns both directions (core -> gossip and gossip -> core), so
8//! callers receive a single value to wire into their thread spawning
9//! code.
10//!
11//! # Examples
12//!
13//! ```
14//! use dynomite::core::ring_queue::RingChannels;
15//!
16//! let chans: RingChannels<u32, u32> = RingChannels::new();
17//! chans.in_tx.send(7).unwrap();
18//! assert_eq!(chans.in_rx.recv().unwrap(), 7);
19//! chans.out_tx.send(11).unwrap();
20//! assert_eq!(chans.out_rx.recv().unwrap(), 11);
21//! ```
22
23use crossbeam_channel::{bounded, Receiver, Sender};
24
25/// Maximum in-flight ring messages for the core -> gossip direction.
26///
27/// # Examples
28///
29/// ```
30/// use dynomite::core::ring_queue::C2G_IN_CAPACITY;
31/// assert_eq!(C2G_IN_CAPACITY, 256);
32/// ```
33pub const C2G_IN_CAPACITY: usize = 256;
34
35/// Maximum in-flight ring messages for the gossip -> core direction.
36///
37/// # Examples
38///
39/// ```
40/// use dynomite::core::ring_queue::C2G_OUT_CAPACITY;
41/// assert_eq!(C2G_OUT_CAPACITY, 256);
42/// ```
43pub const C2G_OUT_CAPACITY: usize = 256;
44
45/// A pair of bounded channels that carry the core <-> gossip ring
46/// traffic.
47///
48/// `I` is the message type the core thread sends *to* the secondary
49/// thread (typically gossip); `O` is the message type the secondary
50/// thread sends *back*.
51#[derive(Debug, Clone)]
52pub struct RingChannels<I, O> {
53    /// Sender owned by the producer of inbound messages.
54    pub in_tx: Sender<I>,
55    /// Receiver owned by the consumer of inbound messages.
56    pub in_rx: Receiver<I>,
57    /// Sender owned by the producer of outbound replies.
58    pub out_tx: Sender<O>,
59    /// Receiver owned by the consumer of outbound replies.
60    pub out_rx: Receiver<O>,
61}
62
63impl<I, O> RingChannels<I, O> {
64    /// Create a new pair sized at the C2G defaults.
65    ///
66    /// # Examples
67    ///
68    /// ```
69    /// use dynomite::core::ring_queue::RingChannels;
70    /// let _: RingChannels<(), ()> = RingChannels::new();
71    /// ```
72    pub fn new() -> Self {
73        Self::with_capacities(C2G_IN_CAPACITY, C2G_OUT_CAPACITY)
74    }
75
76    /// Create a new pair with explicit capacities. Useful for tests
77    /// and downstream stages that wire smaller queues.
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// use dynomite::core::ring_queue::RingChannels;
83    /// let chans: RingChannels<u32, ()> = RingChannels::with_capacities(2, 1);
84    /// chans.in_tx.send(1).unwrap();
85    /// chans.in_tx.send(2).unwrap();
86    /// assert!(chans.in_tx.try_send(3).is_err());
87    /// ```
88    pub fn with_capacities(in_cap: usize, out_cap: usize) -> Self {
89        let (in_tx, in_rx) = bounded(in_cap);
90        let (out_tx, out_rx) = bounded(out_cap);
91        Self {
92            in_tx,
93            in_rx,
94            out_tx,
95            out_rx,
96        }
97    }
98}
99
100impl<I, O> Default for RingChannels<I, O> {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109
110    #[test]
111    fn capacities_match_c_constants() {
112        assert_eq!(C2G_IN_CAPACITY, 256);
113        assert_eq!(C2G_OUT_CAPACITY, 256);
114    }
115
116    #[test]
117    fn fifo_ordering_is_preserved() {
118        let chans: RingChannels<u32, ()> = RingChannels::new();
119        for i in 0..16u32 {
120            chans.in_tx.send(i).unwrap();
121        }
122        for i in 0..16u32 {
123            assert_eq!(chans.in_rx.recv().unwrap(), i);
124        }
125    }
126
127    #[test]
128    fn bounded_capacity_blocks_when_full() {
129        let chans: RingChannels<u32, ()> = RingChannels::with_capacities(2, 1);
130        chans.in_tx.send(1).unwrap();
131        chans.in_tx.send(2).unwrap();
132        // The third send must hit the bound and fail under a non-blocking try.
133        assert!(chans.in_tx.try_send(3).is_err());
134    }
135}