disruptor_rs/
sequencer.rs

1//! The Sequencer is the heart of the Disruptor pattern, coordinating access to the ring buffer.
2//!
3//! # Overview
4//! The Sequencer manages the production and tracking of sequence numbers, which represent
5//! positions in the ring buffer. It serves several critical functions:
6//!
7//! 1. **Sequence Generation**: Provides unique, monotonically increasing sequence numbers
8//!    to producers, ensuring ordered data flow.
9//!
10//! 2. **Capacity Management**: Prevents buffer overflow by tracking consumer progress
11//!    through gating sequences and ensuring producers don't overwrite unprocessed data.
12//!
13//! 3. **Publisher Coordination**: Manages the publication of new entries and notifies
14//!    consumers when new data is available.
15//!
16//! # Single Producer Design
17//! This implementation (`SingleProducerSequencer`) is optimized for single-producer scenarios,
18//! avoiding the need for CAS operations when claiming sequences. Key features:
19//!
20//! - Uses an atomic cursor to track the last published sequence
21//! - Maintains gating sequences to track consumer progress
22//! - Supports configurable waiting strategies for different throughput/CPU trade-offs
23//!
24//! # Usage Example
25//! ```rust
26//! use disruptor_rs::{
27//!     sequencer::SingleProducerSequencer,
28//!     waiting::BusySpinWaitStrategy,
29//! };
30//!
31//! // Create a sequencer with a buffer of 1024 slots
32//! let sequencer = SingleProducerSequencer::new(1024, BusySpinWaitStrategy);
33//! ```
34//!
35//! # Producer Workflow
36//! 1. Producer requests next sequence(s) via `next()`
37//! 2. Writes data to the ring buffer at the claimed sequence(s)
38//! 3. Publishes sequences via `publish()` to make data visible to consumers
39//!
40//! # Consumer Coordination
41//! - Consumers track their progress using gating sequences
42//! - Sequencer ensures producers don't overwrite data still being processed
43//! - Waiting strategy determines how threads wait for available sequences
44
45use std::sync::atomic::{AtomicBool, Ordering};
46use std::sync::Arc;
47
48use crate::barrier::ProcessingSequenceBarrier;
49use crate::sequence::{AtomicSequence, Sequence};
50use crate::traits::Sequencer;
51use crate::traits::WaitingStrategy;
52use crate::utils::Utils;
53
54pub struct SingleProducerSequencer<W: WaitingStrategy> {
55    buffer_size: i64,
56    cursor: Arc<AtomicSequence>,
57    next_value: Sequence,
58    cached_value: Sequence,
59    gating_sequences: Vec<Arc<AtomicSequence>>,
60    waiting_strategy: Arc<W>,
61    is_done: Arc<AtomicBool>,
62}
63
64impl<W: WaitingStrategy> SingleProducerSequencer<W> {
65    pub fn new(buffer_size: usize, waiting_strategy: W) -> Self {
66        Self {
67            buffer_size: buffer_size as i64,
68            cursor: Arc::new(AtomicSequence::default()),
69            next_value: Sequence::from(0),
70            cached_value: Sequence::default(),
71            gating_sequences: Vec::new(),
72            waiting_strategy: Arc::new(waiting_strategy),
73            is_done: Default::default(),
74        }
75    }
76}
77
78impl<W: WaitingStrategy> Sequencer for SingleProducerSequencer<W> {
79    type Barrier = ProcessingSequenceBarrier<W>;
80
81    fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequence>) {
82        self.gating_sequences.push(gating_sequence.clone());
83    }
84
85    fn remove_gating_sequence(&mut self, sequence: &Arc<AtomicSequence>) -> bool {
86        let index = self
87            .gating_sequences
88            .iter()
89            .position(|s| Arc::ptr_eq(s, sequence));
90        if let Some(index) = index {
91            self.gating_sequences.remove(index);
92            true
93        } else {
94            false
95        }
96    }
97
98    fn create_sequence_barrier(&self, gating_sequences: &[Arc<AtomicSequence>]) -> Self::Barrier {
99        ProcessingSequenceBarrier::new(
100            self.is_done.clone(),
101            Vec::from(gating_sequences),
102            self.waiting_strategy.clone(),
103        )
104    }
105
106    fn get_cursor(&self) -> Arc<AtomicSequence> {
107        self.cursor.clone()
108    }
109
110    fn next(&mut self, n: Sequence) -> (Sequence, Sequence) {
111        let mut min_sequence = self.cached_value;
112        let next = self.next_value;
113        let (start, end) = (next, next + (n - 1));
114
115        while min_sequence + self.buffer_size < end {
116            if let Some(new_min_sequence) =
117                self.waiting_strategy
118                    .wait_for(min_sequence, &self.gating_sequences, || false)
119            {
120                min_sequence = new_min_sequence;
121            } else {
122                break;
123            }
124        }
125
126        self.cached_value = min_sequence;
127        self.next_value = end + 1;
128
129        (start, end)
130    }
131
132    fn publish(&self, _: Sequence, high: Sequence) {
133        self.cursor.set(high);
134        self.waiting_strategy.signal_all_when_blocking();
135    }
136
137    fn drain(self) {
138        let current = self.next_value - 1;
139        while Utils::get_minimum_sequence(&self.gating_sequences) < current {
140            self.waiting_strategy.signal_all_when_blocking();
141        }
142        self.is_done.store(true, Ordering::SeqCst);
143        self.waiting_strategy.signal_all_when_blocking();
144    }
145}
146
147impl<W: WaitingStrategy> Drop for SingleProducerSequencer<W> {
148    fn drop(&mut self) {
149        self.is_done.store(true, Ordering::SeqCst);
150        self.waiting_strategy.signal_all_when_blocking();
151    }
152}
153
#[cfg(test)]
mod tests {
    use crate::sequence::AtomicSequence;
    use crate::sequencer::{Sequencer, SingleProducerSequencer};
    use crate::waiting::BusySpinWaitStrategy;
    use std::sync::Arc;

    const BUFFER_SIZE: usize = 16;
    const BUFFER_SIZE_I64: i64 = BUFFER_SIZE as i64;

    /// Builds the sequencer configuration shared by every test in this module.
    fn busy_spin_sequencer() -> SingleProducerSequencer<BusySpinWaitStrategy> {
        SingleProducerSequencer::new(BUFFER_SIZE, BusySpinWaitStrategy)
    }

    /// A new sequencer has published nothing, so its cursor reads -1.
    #[test]
    fn test_get_cursor() {
        let cursor = busy_spin_sequencer().get_cursor();
        assert_eq!(cursor.get(), -1);
    }

    /// Claims one slot, lets the consumer catch up, then claims a full
    /// buffer's worth of slots.
    #[test]
    fn test_next() {
        let consumer = Arc::new(AtomicSequence::default());
        let mut sequencer = busy_spin_sequencer();
        sequencer.add_gating_sequence(&consumer);

        let first_claim = sequencer.next(1);
        assert_eq!(first_claim, (0, 0));

        sequencer.publish(0, 0);
        consumer.set(0);

        let second_claim = sequencer.next(BUFFER_SIZE_I64);
        assert_eq!(second_claim, (1, BUFFER_SIZE_I64));
    }

    /// Publishing moves the cursor to the upper bound of the published range.
    #[test]
    fn test_publish() {
        let sequencer = busy_spin_sequencer();
        sequencer.publish(0, 10);
        assert_eq!(sequencer.cursor.get(), 10);
    }

    /// An added gating sequence is stored in the sequencer's gating list.
    #[test]
    fn test_add_gating_sequences() {
        let mut sequencer = busy_spin_sequencer();
        let consumer = Arc::new(AtomicSequence::default());

        sequencer.add_gating_sequence(&consumer);

        assert_eq!(sequencer.gating_sequences.len(), 1);
        assert_eq!(sequencer.gating_sequences[0], consumer);
    }

    /// Removal succeeds once for a registered sequence and fails afterwards.
    #[test]
    fn test_remove_gating_sequence() {
        let mut sequencer = busy_spin_sequencer();
        let consumer = Arc::new(AtomicSequence::default());

        sequencer.add_gating_sequence(&consumer);
        assert_eq!(sequencer.gating_sequences.len(), 1);

        assert!(sequencer.remove_gating_sequence(&consumer));
        assert_eq!(sequencer.gating_sequences.len(), 0);

        assert!(!sequencer.remove_gating_sequence(&consumer));
    }
}