disruptor_rs/barrier.rs
1//! Sequence barriers that control and coordinate consumer access to the ring buffer.
2//!
3//! # Processing Sequence Barrier
4//!
5//! A ProcessingSequenceBarrier acts as a coordination point between producers and consumers
6//! in the Disruptor pattern. It ensures that consumers only process events that are safe
7//! to consume based on dependencies and available sequences.
8//!
9//! ## Core Responsibilities
10//!
11//! 1. **Dependency Tracking**:
12//! - Maintains a list of "gating sequences" that represent dependencies
13//! - Ensures consumers don't read beyond the minimum available sequence across all dependencies
14//!
15//! 2. **Progress Control**:
16//! - Blocks consumers until required sequences are available
17//! - Implements the configured waiting strategy for efficient thread coordination
18//!
19//! 3. **Alert Handling**:
20//! - Supports graceful shutdown through an alert mechanism
21//! - Allows consumers to abort waiting when the system needs to stop
22//!
23//! ## Usage Example
24//! ```
25//! # use disruptor_rs::{
26//! # sequence::AtomicSequence,
27//! # ProcessingSequenceBarrier,
28//! # waiting::BusySpinWaitStrategy,
29//! # traits::SequenceBarrier,
30//! # sequence::Sequence,
31//! # };
32//! # use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
33//!
34//! // Create a wrapper type for testing
35//! struct TestBarrier(ProcessingSequenceBarrier<BusySpinWaitStrategy>);
36//!
37//! // Implement SequenceBarrier for the wrapper type
38//! impl SequenceBarrier for TestBarrier {
39//! fn wait_for(&self, sequence: Sequence) -> Option<Sequence> {
40//! Some(sequence)
41//! }
42//! fn signal(&self) {}
43//! }
44//!
45//! let alert = Arc::new(AtomicBool::new(false));
46//! let seq = Arc::new(AtomicSequence::default());
47//! seq.set(5);
48//! let waiting_strategy = Arc::new(BusySpinWaitStrategy::default());
49//! let barrier = TestBarrier(ProcessingSequenceBarrier::new(
50//! alert,
51//! vec![seq],
52//! waiting_strategy
53//! ));
54//! let sequence = barrier.wait_for(5);
55//! assert_eq!(sequence, Some(5));
56//! ```
57
58use std::sync::{
59 atomic::{AtomicBool, Ordering},
60 Arc,
61};
62
63use crate::{
64 sequence::{AtomicSequence, Sequence},
65 traits::{SequenceBarrier, WaitingStrategy},
66};
67
68/// A barrier that controls consumer access to the ring buffer based on available sequences
69/// and dependencies.
70pub struct ProcessingSequenceBarrier<W: WaitingStrategy> {
71 /// Alert flag for shutdown signaling
72 alert: Arc<AtomicBool>,
73 /// Sequences that must advance before consumption can proceed
74 gating_sequences: Vec<Arc<AtomicSequence>>,
75 /// Strategy determining how threads wait for sequences
76 waiting_strategy: Arc<W>,
77}
78
79impl<W: WaitingStrategy> ProcessingSequenceBarrier<W> {
80 /// Creates a new processing sequence barrier.
81 ///
82 /// # Parameters
83 /// - `alert`: Shutdown signal flag
84 /// - `gating_sequences`: Dependencies that must advance before consumption
85 /// - `waiting_strategy`: How threads should wait for sequences
86 pub fn new(
87 alert: Arc<AtomicBool>,
88 gating_sequences: Vec<Arc<AtomicSequence>>,
89 waiting_strategy: Arc<W>,
90 ) -> Self {
91 Self {
92 alert,
93 gating_sequences,
94 waiting_strategy,
95 }
96 }
97}
98
99impl<W: WaitingStrategy> SequenceBarrier for ProcessingSequenceBarrier<W> {
100 /// Waits for a specific sequence to become available.
101 fn wait_for(&self, sequence: Sequence) -> Option<Sequence> {
102 self.waiting_strategy
103 .wait_for(sequence, &self.gating_sequences, || {
104 self.alert.load(Ordering::Relaxed)
105 })
106 }
107
108 /// Signals waiting threads that new sequences may be available.
109 fn signal(&self) {
110 self.waiting_strategy.signal_all_when_blocking()
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117 use crate::waiting::BusySpinWaitStrategy;
118
119 #[test]
120 fn test_wait_for_returns_sequence_when_available() {
121 let alert = Arc::new(AtomicBool::new(false));
122 let seq = Arc::new(AtomicSequence::default());
123 seq.set(5);
124 let gating_sequences = vec![seq];
125 let waiting_strategy = Arc::new(BusySpinWaitStrategy::new());
126
127 let barrier = ProcessingSequenceBarrier::new(alert, gating_sequences, waiting_strategy);
128
129 assert_eq!(barrier.wait_for(5), Some(5));
130 }
131
132 #[test]
133 fn test_wait_for_returns_none_when_alerted() {
134 let alert = Arc::new(AtomicBool::new(true));
135 let seq = Arc::new(AtomicSequence::default());
136 let gating_sequences = vec![seq];
137 let waiting_strategy = Arc::new(BusySpinWaitStrategy::new());
138
139 let barrier = ProcessingSequenceBarrier::new(alert, gating_sequences, waiting_strategy);
140
141 assert_eq!(barrier.wait_for(5), None);
142 }
143
144 #[test]
145 fn test_signal_calls_waiting_strategy() {
146 let alert = Arc::new(AtomicBool::new(false));
147 let seq = Arc::new(AtomicSequence::default());
148 let gating_sequences = vec![seq];
149 let waiting_strategy = Arc::new(BusySpinWaitStrategy::new());
150
151 let barrier = ProcessingSequenceBarrier::new(alert, gating_sequences, waiting_strategy);
152
153 barrier.signal(); // Just verify it doesn't panic
154 }
155
156 #[test]
157 fn test_multiple_gating_sequences() {
158 let alert = Arc::new(AtomicBool::new(false));
159 let seq1 = Arc::new(AtomicSequence::default());
160 let seq2 = Arc::new(AtomicSequence::default());
161 seq1.set(5);
162 seq2.set(3);
163 let gating_sequences = vec![seq1, seq2];
164 let waiting_strategy = Arc::new(BusySpinWaitStrategy::new());
165
166 let barrier = ProcessingSequenceBarrier::new(alert, gating_sequences, waiting_strategy);
167
168 // Should wait for minimum sequence (3)
169 assert_eq!(barrier.wait_for(3), Some(3));
170 }
171}