disruptor_rs/
sequencer.rs1use std::sync::atomic::{AtomicBool, Ordering};
46use std::sync::Arc;
47
48use crate::barrier::ProcessingSequenceBarrier;
49use crate::sequence::{AtomicSequence, Sequence};
50use crate::traits::Sequencer;
51use crate::traits::WaitingStrategy;
52use crate::utils::Utils;
53
54pub struct SingleProducerSequencer<W: WaitingStrategy> {
55 buffer_size: i64,
56 cursor: Arc<AtomicSequence>,
57 next_value: Sequence,
58 cached_value: Sequence,
59 gating_sequences: Vec<Arc<AtomicSequence>>,
60 waiting_strategy: Arc<W>,
61 is_done: Arc<AtomicBool>,
62}
63
64impl<W: WaitingStrategy> SingleProducerSequencer<W> {
65 pub fn new(buffer_size: usize, waiting_strategy: W) -> Self {
66 Self {
67 buffer_size: buffer_size as i64,
68 cursor: Arc::new(AtomicSequence::default()),
69 next_value: Sequence::from(0),
70 cached_value: Sequence::default(),
71 gating_sequences: Vec::new(),
72 waiting_strategy: Arc::new(waiting_strategy),
73 is_done: Default::default(),
74 }
75 }
76}
77
78impl<W: WaitingStrategy> Sequencer for SingleProducerSequencer<W> {
79 type Barrier = ProcessingSequenceBarrier<W>;
80
81 fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequence>) {
82 self.gating_sequences.push(gating_sequence.clone());
83 }
84
85 fn remove_gating_sequence(&mut self, sequence: &Arc<AtomicSequence>) -> bool {
86 let index = self
87 .gating_sequences
88 .iter()
89 .position(|s| Arc::ptr_eq(s, sequence));
90 if let Some(index) = index {
91 self.gating_sequences.remove(index);
92 true
93 } else {
94 false
95 }
96 }
97
98 fn create_sequence_barrier(&self, gating_sequences: &[Arc<AtomicSequence>]) -> Self::Barrier {
99 ProcessingSequenceBarrier::new(
100 self.is_done.clone(),
101 Vec::from(gating_sequences),
102 self.waiting_strategy.clone(),
103 )
104 }
105
106 fn get_cursor(&self) -> Arc<AtomicSequence> {
107 self.cursor.clone()
108 }
109
110 fn next(&mut self, n: Sequence) -> (Sequence, Sequence) {
111 let mut min_sequence = self.cached_value;
112 let next = self.next_value;
113 let (start, end) = (next, next + (n - 1));
114
115 while min_sequence + self.buffer_size < end {
116 if let Some(new_min_sequence) =
117 self.waiting_strategy
118 .wait_for(min_sequence, &self.gating_sequences, || false)
119 {
120 min_sequence = new_min_sequence;
121 } else {
122 break;
123 }
124 }
125
126 self.cached_value = min_sequence;
127 self.next_value = end + 1;
128
129 (start, end)
130 }
131
132 fn publish(&self, _: Sequence, high: Sequence) {
133 self.cursor.set(high);
134 self.waiting_strategy.signal_all_when_blocking();
135 }
136
137 fn drain(self) {
138 let current = self.next_value - 1;
139 while Utils::get_minimum_sequence(&self.gating_sequences) < current {
140 self.waiting_strategy.signal_all_when_blocking();
141 }
142 self.is_done.store(true, Ordering::SeqCst);
143 self.waiting_strategy.signal_all_when_blocking();
144 }
145}
146
147impl<W: WaitingStrategy> Drop for SingleProducerSequencer<W> {
148 fn drop(&mut self) {
149 self.is_done.store(true, Ordering::SeqCst);
150 self.waiting_strategy.signal_all_when_blocking();
151 }
152}
153
#[cfg(test)]
mod tests {
    use crate::sequence::AtomicSequence;
    use crate::sequencer::{Sequencer, SingleProducerSequencer};
    use crate::waiting::BusySpinWaitStrategy;
    use std::sync::Arc;

    const BUFFER_SIZE: usize = 16;
    const BUFFER_SIZE_I64: i64 = BUFFER_SIZE as i64;

    /// Builds the busy-spin sequencer every test starts from.
    fn busy_spin_sequencer() -> SingleProducerSequencer<BusySpinWaitStrategy> {
        SingleProducerSequencer::new(BUFFER_SIZE, BusySpinWaitStrategy)
    }

    /// A new sequencer's cursor reads -1.
    #[test]
    fn test_get_cursor() {
        let sequencer = busy_spin_sequencer();
        let cursor = sequencer.get_cursor();
        assert_eq!(cursor.get(), -1);
    }

    /// Claims one slot, then a whole buffer's worth once the gate has moved to 0.
    #[test]
    fn test_next() {
        let gate = Arc::new(AtomicSequence::default());
        let mut sequencer = busy_spin_sequencer();
        sequencer.add_gating_sequence(&gate);

        let first_claim = sequencer.next(1);
        assert_eq!(first_claim, (0, 0));

        sequencer.publish(0, 0);
        gate.set(0);

        let second_claim = sequencer.next(BUFFER_SIZE_I64);
        assert_eq!(second_claim, (1, BUFFER_SIZE_I64));
    }

    /// Publishing moves the cursor to the high end of the range.
    #[test]
    fn test_publish() {
        let sequencer = busy_spin_sequencer();
        sequencer.publish(0, 10);
        assert_eq!(sequencer.cursor.get(), 10);
    }

    /// An added gating sequence is stored by the sequencer.
    #[test]
    fn test_add_gating_sequences() {
        let mut sequencer = busy_spin_sequencer();
        let gate = Arc::new(AtomicSequence::default());

        sequencer.add_gating_sequence(&gate);

        assert_eq!(sequencer.gating_sequences.len(), 1);
        assert_eq!(sequencer.gating_sequences[0], gate);
    }

    /// Removing succeeds once and reports `false` when the gate is already gone.
    #[test]
    fn test_remove_gating_sequence() {
        let mut sequencer = busy_spin_sequencer();
        let gate = Arc::new(AtomicSequence::default());

        sequencer.add_gating_sequence(&gate);
        assert_eq!(sequencer.gating_sequences.len(), 1);

        let removed_first_time = sequencer.remove_gating_sequence(&gate);
        assert!(removed_first_time);
        assert_eq!(sequencer.gating_sequences.len(), 0);

        let removed_second_time = sequencer.remove_gating_sequence(&gate);
        assert!(!removed_second_time);
    }
}