disruptor_rs/
barrier.rs

1//! Sequence barriers that control and coordinate consumer access to the ring buffer.
2//!
3//! # Processing Sequence Barrier
4//!
5//! A ProcessingSequenceBarrier acts as a coordination point between producers and consumers
6//! in the Disruptor pattern. It ensures that consumers only process events that are safe
7//! to consume based on dependencies and available sequences.
8//!
9//! ## Core Responsibilities
10//!
11//! 1. **Dependency Tracking**:
12//!    - Maintains a list of "gating sequences" that represent dependencies
13//!    - Ensures consumers don't read beyond the minimum available sequence across all dependencies
14//!
15//! 2. **Progress Control**:
16//!    - Blocks consumers until required sequences are available
17//!    - Implements the configured waiting strategy for efficient thread coordination
18//!
19//! 3. **Alert Handling**:
20//!    - Supports graceful shutdown through an alert mechanism
21//!    - Allows consumers to abort waiting when the system needs to stop
22//!
23//! ## Usage Example
24//! ```
25//! # use disruptor_rs::{
26//! #     sequence::AtomicSequence,
27//! #     ProcessingSequenceBarrier,
28//! #     waiting::BusySpinWaitStrategy,
29//! #     traits::SequenceBarrier,
30//! #     sequence::Sequence,
31//! # };
32//! # use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
33//!
34//! // Create a wrapper type for testing
35//! struct TestBarrier(ProcessingSequenceBarrier<BusySpinWaitStrategy>);
36//!
37//! // Implement SequenceBarrier for the wrapper type
38//! impl SequenceBarrier for TestBarrier {
39//!     fn wait_for(&self, sequence: Sequence) -> Option<Sequence> {
40//!         Some(sequence)
41//!     }
42//!     fn signal(&self) {}
43//! }
44//!
45//! let alert = Arc::new(AtomicBool::new(false));
46//! let seq = Arc::new(AtomicSequence::default());
47//! seq.set(5);
48//! let waiting_strategy = Arc::new(BusySpinWaitStrategy::default());
49//! let barrier = TestBarrier(ProcessingSequenceBarrier::new(
50//!     alert,
51//!     vec![seq],
52//!     waiting_strategy
53//! ));
54//! let sequence = barrier.wait_for(5);
55//! assert_eq!(sequence, Some(5));
56//! ```
57
58use std::sync::{
59    atomic::{AtomicBool, Ordering},
60    Arc,
61};
62
63use crate::{
64    sequence::{AtomicSequence, Sequence},
65    traits::{SequenceBarrier, WaitingStrategy},
66};
67
68/// A barrier that controls consumer access to the ring buffer based on available sequences
69/// and dependencies.
70pub struct ProcessingSequenceBarrier<W: WaitingStrategy> {
71    /// Alert flag for shutdown signaling
72    alert: Arc<AtomicBool>,
73    /// Sequences that must advance before consumption can proceed
74    gating_sequences: Vec<Arc<AtomicSequence>>,
75    /// Strategy determining how threads wait for sequences
76    waiting_strategy: Arc<W>,
77}
78
79impl<W: WaitingStrategy> ProcessingSequenceBarrier<W> {
80    /// Creates a new processing sequence barrier.
81    ///
82    /// # Parameters
83    /// - `alert`: Shutdown signal flag
84    /// - `gating_sequences`: Dependencies that must advance before consumption
85    /// - `waiting_strategy`: How threads should wait for sequences
86    pub fn new(
87        alert: Arc<AtomicBool>,
88        gating_sequences: Vec<Arc<AtomicSequence>>,
89        waiting_strategy: Arc<W>,
90    ) -> Self {
91        Self {
92            alert,
93            gating_sequences,
94            waiting_strategy,
95        }
96    }
97}
98
99impl<W: WaitingStrategy> SequenceBarrier for ProcessingSequenceBarrier<W> {
100    /// Waits for a specific sequence to become available.
101    fn wait_for(&self, sequence: Sequence) -> Option<Sequence> {
102        self.waiting_strategy
103            .wait_for(sequence, &self.gating_sequences, || {
104                self.alert.load(Ordering::Relaxed)
105            })
106    }
107
108    /// Signals waiting threads that new sequences may be available.
109    fn signal(&self) {
110        self.waiting_strategy.signal_all_when_blocking()
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use crate::waiting::BusySpinWaitStrategy;

    /// Barrier type exercised by every test in this module.
    type BusySpinBarrier = ProcessingSequenceBarrier<BusySpinWaitStrategy>;

    /// Returns a sequence left at its default value.
    fn fresh_sequence() -> Arc<AtomicSequence> {
        Arc::new(AtomicSequence::default())
    }

    /// Returns a default-constructed sequence that has then been set to `value`.
    fn sequence_at(value: Sequence) -> Arc<AtomicSequence> {
        let seq = fresh_sequence();
        seq.set(value);
        seq
    }

    /// Assembles a busy-spin barrier with the given alert state and gating sequences.
    fn barrier_with(alerted: bool, gating: Vec<Arc<AtomicSequence>>) -> BusySpinBarrier {
        ProcessingSequenceBarrier::new(
            Arc::new(AtomicBool::new(alerted)),
            gating,
            Arc::new(BusySpinWaitStrategy::new()),
        )
    }

    #[test]
    fn test_wait_for_returns_sequence_when_available() {
        let barrier = barrier_with(false, vec![sequence_at(5)]);

        assert_eq!(barrier.wait_for(5), Some(5));
    }

    #[test]
    fn test_wait_for_returns_none_when_alerted() {
        // The gating sequence is left at its default, so only the alert can end the wait.
        let barrier = barrier_with(true, vec![fresh_sequence()]);

        assert_eq!(barrier.wait_for(5), None);
    }

    #[test]
    fn test_signal_calls_waiting_strategy() {
        let barrier = barrier_with(false, vec![fresh_sequence()]);

        // Smoke test: the call only has to complete without panicking.
        barrier.signal();
    }

    #[test]
    fn test_multiple_gating_sequences() {
        let barrier = barrier_with(false, vec![sequence_at(5), sequence_at(3)]);

        // The slower dependency sits at 3, so a request for 3 is satisfied with 3.
        assert_eq!(barrier.wait_for(3), Some(3));
    }
}