1use crate::utils::CacheAligned;
2use std::cell::UnsafeCell;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
5use std::thread;
6
/// A cache-line-aligned atomic sequence counter, shared between producers
/// and consumers to track progress through the ring buffer.
#[derive(Debug)]
pub struct Sequence {
    // Cache alignment avoids false sharing between hot per-thread counters
    // — assumes CacheAligned pads to a cache line; TODO confirm in utils.
    value: CacheAligned<AtomicI64>,
}
14
15impl Sequence {
16 pub fn new(initial: i64) -> Self {
18 Sequence {
19 value: CacheAligned::new(AtomicI64::new(initial)),
20 }
21 }
22
23 pub fn get(&self) -> i64 {
25 self.value.load(Ordering::Acquire)
26 }
27
28 pub fn set(&self, value: i64) {
30 self.value.store(value, Ordering::Release);
31 }
32
33 pub fn compare_and_set(&self, current: i64, new: i64) -> bool {
37 self.value
38 .compare_exchange(current, new, Ordering::SeqCst, Ordering::Relaxed)
39 .is_ok()
40 }
41}
42
/// Coordinates claiming and publishing of sequence slots between producers
/// and consumers.
pub trait Sequencer: Send + Sync {
    /// Claims the next sequence slot, waiting until writing it cannot
    /// overtake the slowest gating (consumer) sequence.
    fn next(&self) -> i64;

    /// Marks `sequence` as published (visible to consumers).
    fn publish(&self, sequence: i64);

    /// Returns the cursor. NOTE(review): the single-producer impl returns the
    /// highest *published* sequence, while the multi-producer impl returns the
    /// highest *claimed* sequence — callers must follow up with
    /// get_highest_published_sequence before reading slots.
    fn get_cursor(&self) -> i64;

    /// Registers consumer sequences that gate producer progress.
    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>);

    /// Returns the highest sequence in [next_sequence, available_sequence]
    /// up to which every slot has actually been published.
    fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64;
}
60
/// Sequencer for exactly one producer thread: the claim path needs no CAS.
pub struct SingleProducerSequencer {
    // Highest published sequence, read by consumers.
    cursor: Arc<Sequence>,
    // Next sequence to hand out; only the single producer thread touches it.
    next_sequence: Sequence,
    // Consumer sequences gating the producer. Mutated through UnsafeCell with
    // no synchronization — assumes all gating sequences are registered during
    // setup, before the producer starts claiming; TODO confirm callers.
    gating_sequences: UnsafeCell<Vec<Arc<Sequence>>>,
    // Total number of slots in the ring buffer.
    buffer_size: usize,
    // Used to wake blocked consumers on publish.
    wait_strategy: Arc<dyn WaitStrategy>,
}

// SAFETY: soundness relies on gating_sequences only being mutated
// single-threaded during setup (see add_gating_sequences) — TODO confirm
// callers uphold this; UnsafeCell itself provides no synchronization.
unsafe impl Send for SingleProducerSequencer {}
unsafe impl Sync for SingleProducerSequencer {}
79
80impl SingleProducerSequencer {
81 pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
82 Self {
83 cursor: Arc::new(Sequence::new(-1)),
84 next_sequence: Sequence::new(-1),
85 gating_sequences: UnsafeCell::new(Vec::new()),
86 buffer_size,
87 wait_strategy,
88 }
89 }
90}
91
92impl Sequencer for SingleProducerSequencer {
93 fn next(&self) -> i64 {
94 let next = self.next_sequence.get() + 1;
95 self.next_sequence.set(next);
96
97 let wrap_point = next - self.buffer_size as i64;
98 let gating_sequences = unsafe { &*self.gating_sequences.get() };
99
100 let mut min_gating_sequence = i64::MAX;
103 for seq in gating_sequences {
104 let s = seq.get();
105 if s < min_gating_sequence {
106 min_gating_sequence = s;
107 }
108 }
109
110 if wrap_point > min_gating_sequence {
111 while wrap_point > min_gating_sequence {
112 thread::yield_now();
113 min_gating_sequence = i64::MAX;
114 for seq in gating_sequences {
115 let s = seq.get();
116 if s < min_gating_sequence {
117 min_gating_sequence = s;
118 }
119 }
120 }
121 }
122
123 next
124 }
125
126 fn publish(&self, sequence: i64) {
127 self.cursor.set(sequence);
128 self.wait_strategy.signal_all_when_blocking();
129 }
130
131 fn get_cursor(&self) -> i64 {
132 self.cursor.get()
133 }
134
135 fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
136 unsafe {
137 (*self.gating_sequences.get()).extend(sequences);
138 }
139 }
140
141 fn get_highest_published_sequence(&self, _next_sequence: i64, available_sequence: i64) -> i64 {
142 available_sequence
143 }
144}
145
/// Sequencer for multiple concurrent producers: slots are claimed with an
/// atomic fetch_add and published individually via an availability buffer.
pub struct MultiProducerSequencer {
    // Consumer sequences gating producers; mutated through UnsafeCell with no
    // synchronization — assumes registration happens during setup only; TODO confirm.
    gating_sequences: UnsafeCell<Vec<Arc<Sequence>>>,
    // Total number of slots in the ring buffer.
    buffer_size: usize,
    // Used to wake blocked consumers on publish.
    wait_strategy: Arc<dyn WaitStrategy>,
    // Highest claimed (not necessarily published) sequence.
    claim_sequence: AtomicI64,
    // Per-slot record of the last sequence published into it; -1 = never.
    available_buffer: Box<[AtomicI64]>,
    // buffer_size - 1; a valid index mask only if buffer_size is a power of two.
    mask: usize,
}

// SAFETY: soundness relies on gating_sequences only being mutated
// single-threaded during setup — TODO confirm callers uphold this.
unsafe impl Send for MultiProducerSequencer {}
unsafe impl Sync for MultiProducerSequencer {}
166
167impl MultiProducerSequencer {
168 pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
169 let mut available_buffer = Vec::with_capacity(buffer_size);
170 for _ in 0..buffer_size {
171 available_buffer.push(AtomicI64::new(-1));
172 }
173
174 Self {
175 gating_sequences: UnsafeCell::new(Vec::new()),
176 buffer_size,
177 wait_strategy,
178 claim_sequence: AtomicI64::new(-1),
179 available_buffer: available_buffer.into_boxed_slice(),
180 mask: buffer_size - 1,
181 }
182 }
183}
184
185impl Sequencer for MultiProducerSequencer {
186 fn next(&self) -> i64 {
187 let current = self.claim_sequence.fetch_add(1, Ordering::SeqCst);
188 let next = current + 1;
189
190 let wrap_point = next - self.buffer_size as i64;
191 let gating_sequences = unsafe { &*self.gating_sequences.get() };
192
193 let mut min_gating_sequence = i64::MAX;
194 for seq in gating_sequences {
195 let s = seq.get();
196 if s < min_gating_sequence {
197 min_gating_sequence = s;
198 }
199 }
200
201 if wrap_point > min_gating_sequence {
202 while wrap_point > min_gating_sequence {
203 thread::yield_now();
204 min_gating_sequence = i64::MAX;
205 for seq in gating_sequences {
206 let s = seq.get();
207 if s < min_gating_sequence {
208 min_gating_sequence = s;
209 }
210 }
211 }
212 }
213
214 next
215 }
216
217 fn publish(&self, sequence: i64) {
218 let index = (sequence as usize) & self.mask;
219 self.available_buffer[index].store(sequence, Ordering::Release);
220 self.wait_strategy.signal_all_when_blocking();
221 }
222
223 fn get_cursor(&self) -> i64 {
224 self.claim_sequence.load(Ordering::Relaxed)
225 }
226
227 fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
228 unsafe {
229 (*self.gating_sequences.get()).extend(sequences);
230 }
231 }
232
233 fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64 {
234 let mut sequence = next_sequence;
238 while sequence <= available_sequence {
239 if !self.is_published(sequence) {
240 return sequence - 1;
241 }
242 sequence += 1;
243 }
244 available_sequence
245 }
246}
247
248impl MultiProducerSequencer {
249 fn is_published(&self, sequence: i64) -> bool {
250 let index = (sequence as usize) & self.mask;
251 self.available_buffer[index].load(Ordering::Acquire) == sequence
252 }
253}
254
/// Strategy controlling how consumers wait for a sequence to become available.
pub trait WaitStrategy: Send + Sync {
    /// Waits until `dependent`'s cursor reaches `sequence`, returning the
    /// cursor value observed (which may exceed `sequence`), or
    /// `Err(AlertException)` once the barrier is alerted.
    fn wait_for(
        &self,
        sequence: i64,
        cursor: &Arc<dyn Sequencer>,
        dependent: &Arc<dyn Sequencer>,
        barrier: &ProcessingSequenceBarrier,
    ) -> Result<i64, AlertException>;

    /// Wakes any consumers parked by a blocking implementation; a no-op for
    /// spinning strategies.
    fn signal_all_when_blocking(&self);
}
269
/// Returned as `Err` when a sequence barrier has been alerted, asking
/// waiting consumers to stop.
#[derive(Debug, Clone, Copy)]
pub struct AlertException;
272
/// Wait strategy that spins in a tight loop: lowest latency, burns a core.
pub struct BusySpinWaitStrategy;
277
278impl WaitStrategy for BusySpinWaitStrategy {
279 fn wait_for(
280 &self,
281 sequence: i64,
282 _cursor: &Arc<dyn Sequencer>,
283 dependent: &Arc<dyn Sequencer>,
284 barrier: &ProcessingSequenceBarrier,
285 ) -> Result<i64, AlertException> {
286 let mut available_sequence;
287 loop {
288 if barrier.is_alerted() {
289 return Err(AlertException);
290 }
291 available_sequence = dependent.get_cursor();
292 if available_sequence >= sequence {
293 return Ok(available_sequence);
294 }
295 std::hint::spin_loop();
296 }
297 }
298
299 fn signal_all_when_blocking(&self) {}
300}
301
/// Wait strategy that spins briefly, then periodically yields the thread.
pub struct YieldingWaitStrategy;
306
307impl WaitStrategy for YieldingWaitStrategy {
308 fn wait_for(
309 &self,
310 sequence: i64,
311 _cursor: &Arc<dyn Sequencer>,
312 dependent: &Arc<dyn Sequencer>,
313 barrier: &ProcessingSequenceBarrier,
314 ) -> Result<i64, AlertException> {
315 let mut counter = 100;
316 let mut available_sequence;
317 loop {
318 if barrier.is_alerted() {
319 return Err(AlertException);
320 }
321 available_sequence = dependent.get_cursor();
322 if available_sequence >= sequence {
323 return Ok(available_sequence);
324 }
325
326 counter -= 1;
327 if counter == 0 {
328 thread::yield_now();
329 counter = 100;
330 } else {
331 std::hint::spin_loop();
332 }
333 }
334 }
335
336 fn signal_all_when_blocking(&self) {}
337}
338
/// Wait strategy that parks consumers on a condvar: lowest CPU use,
/// highest wakeup latency.
pub struct BlockingWaitStrategy {
    // Guards the condvar wait/notify handshake; holds no data of its own.
    mutex: std::sync::Mutex<()>,
    condvar: std::sync::Condvar,
}
346
impl Default for BlockingWaitStrategy {
    /// Equivalent to `BlockingWaitStrategy::new()`.
    fn default() -> Self {
        Self::new()
    }
}
352
353impl BlockingWaitStrategy {
354 pub fn new() -> Self {
356 Self {
357 mutex: std::sync::Mutex::new(()),
358 condvar: std::sync::Condvar::new(),
359 }
360 }
361}
362
impl WaitStrategy for BlockingWaitStrategy {
    /// Parks on the condvar until `dependent` has advanced to `sequence`, or
    /// the barrier is alerted.
    ///
    /// No lost wakeups: publishers advance the cursor before calling
    /// signal_all_when_blocking (which takes this same mutex), and this
    /// method re-checks the cursor while holding the lock before waiting.
    fn wait_for(
        &self,
        sequence: i64,
        _cursor: &Arc<dyn Sequencer>,
        dependent: &Arc<dyn Sequencer>,
        barrier: &ProcessingSequenceBarrier,
    ) -> Result<i64, AlertException> {
        // Fast path: already available without touching the lock.
        let mut available_sequence = dependent.get_cursor();
        if available_sequence < sequence {
            let mut guard = self.mutex.lock().unwrap();
            while dependent.get_cursor() < sequence {
                if barrier.is_alerted() {
                    return Err(AlertException);
                }
                // wait() releases the lock while parked; spurious wakeups are
                // handled by the enclosing while re-check.
                guard = self.condvar.wait(guard).unwrap();
            }
            available_sequence = dependent.get_cursor();
        }

        // NOTE(review): on every path above, available_sequence >= sequence
        // by the time we get here, so this loop looks unreachable; left as a
        // defensive fallback rather than removed.
        while available_sequence < sequence {
            if barrier.is_alerted() {
                return Err(AlertException);
            }
            available_sequence = dependent.get_cursor();
            thread::yield_now();
        }

        Ok(available_sequence)
    }

    /// Takes the mutex before notifying so a consumer between its cursor
    /// check and condvar.wait cannot miss the signal.
    fn signal_all_when_blocking(&self) {
        let _guard = self.mutex.lock().unwrap();
        self.condvar.notify_all();
    }
}
402
/// Coordination point consumers wait on; also carries the alert (stop) flag.
pub struct ProcessingSequenceBarrier {
    wait_strategy: Arc<dyn WaitStrategy>,
    // Sequencer whose cursor the wait strategy polls.
    dependent_sequencer: Arc<dyn Sequencer>,
    // Sequencer used to resolve the highest published sequence.
    cursor_sequencer: Arc<dyn Sequencer>,
    // Set by alert() to ask waiting consumers to stop.
    alerted: AtomicBool,
}
414
415impl ProcessingSequenceBarrier {
416 pub fn new(
418 wait_strategy: Arc<dyn WaitStrategy>,
419 dependent_sequencer: Arc<dyn Sequencer>,
420 cursor_sequencer: Arc<dyn Sequencer>,
421 ) -> Self {
422 Self {
423 wait_strategy,
424 dependent_sequencer,
425 cursor_sequencer,
426 alerted: AtomicBool::new(false),
427 }
428 }
429
430 pub fn wait_for(&self, sequence: i64) -> Result<i64, AlertException> {
432 let available = self.wait_strategy.wait_for(
433 sequence,
434 &self.cursor_sequencer,
435 &self.dependent_sequencer,
436 self,
437 )?;
438
439 Ok(self
441 .cursor_sequencer
442 .get_highest_published_sequence(sequence, available))
443 }
444
445 pub fn is_alerted(&self) -> bool {
447 self.alerted.load(Ordering::Relaxed)
448 }
449
450 pub fn alert(&self) {
452 self.alerted.store(true, Ordering::Relaxed);
453 self.wait_strategy.signal_all_when_blocking();
454 }
455
456 pub fn clear_alert(&self) {
458 self.alerted.store(false, Ordering::Relaxed);
459 }
460}
461
/// Consumer callback invoked once per event, in sequence order.
pub trait EventHandler<T>: Send + Sync {
    /// Handles `event` published at `sequence`; `end_of_batch` is true for
    /// the last event of the currently available batch.
    fn on_event(&self, event: &T, sequence: u64, end_of_batch: bool);
}
467
/// Pre-allocated ring of events, indexed by sequence masked to capacity
/// (capacity is always rounded to a power of two in new()).
pub struct RingBuffer<T> {
    // Slots wrapped in UnsafeCell: producers write through shared references
    // via get_unchecked_mut while holding an exclusive claim on the sequence.
    buffer: Box<[UnsafeCell<T>]>,
    // capacity - 1; valid as an index mask because capacity is a power of two.
    mask: usize,
    sequencer: Arc<dyn Sequencer>,
}

// SAFETY: slots are handed out per-sequence by the sequencer, so no two
// threads should hold &mut to the same slot at once — assumes callers only
// touch a slot between next() and publish(); TODO confirm. T: Send (rather
// than T: Sync) suffices for Sync, as with Mutex<T>, because slot access is
// exclusive, not shared — presumably intentional; verify.
unsafe impl<T: Send> Send for RingBuffer<T> {}
unsafe impl<T: Send> Sync for RingBuffer<T> {}
480
impl<T> RingBuffer<T> {
    /// Builds a ring of `size` slots (rounded up to the next power of two),
    /// eagerly filling every slot from `factory`.
    ///
    /// NOTE(review): the `sequencer` passed in is typically constructed by
    /// the caller with the *unrounded* size; if `size` is not already a power
    /// of two, the sequencer's wrap/mask math disagrees with the actual
    /// capacity here — verify callers pass a power of two.
    pub fn new<F>(factory: F, size: usize, sequencer: Arc<dyn Sequencer>) -> Self
    where
        F: Fn() -> T,
    {
        let capacity = size.next_power_of_two();
        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(UnsafeCell::new(factory()));
        }

        Self {
            buffer: buffer.into_boxed_slice(),
            mask: capacity - 1,
            sequencer,
        }
    }

    /// Registers consumer sequences that gate the producer (delegates to the
    /// sequencer).
    pub fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
        self.sequencer.add_gating_sequences(sequences);
    }

    /// Bounds-checked shared access to the slot for `sequence`.
    // The deref of UnsafeCell::get is always to a valid slot; the aliasing
    // discipline (no concurrent &mut to the same slot) is the sequencing
    // protocol's responsibility.
    pub fn get(&self, sequence: i64) -> &T {
        unsafe { &*self.buffer[(sequence as usize) & self.mask].get() }
    }

    /// Bounds-checked exclusive access; `&mut self` makes this statically safe.
    pub fn get_mut(&mut self, sequence: i64) -> &mut T {
        unsafe { &mut *self.buffer[(sequence as usize) & self.mask].get() }
    }

    /// Unchecked shared access to the slot for `sequence`.
    ///
    /// # Safety
    /// Caller must ensure no thread holds `&mut` to this slot, i.e. the
    /// sequence has been published and its slot not yet reclaimed by a
    /// producer. (Masking with `self.mask` keeps the index in bounds, so
    /// `get_unchecked` itself cannot go out of range.)
    pub unsafe fn get_unchecked(&self, sequence: i64) -> &T {
        unsafe {
            &*self
                .buffer
                .get_unchecked((sequence as usize) & self.mask)
                .get()
        }
    }

    /// Unchecked exclusive access through a shared reference.
    ///
    /// # Safety
    /// Caller must hold the exclusive claim on `sequence` (obtained from
    /// next() and not yet published); otherwise this creates aliasing `&mut`.
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn get_unchecked_mut(&self, sequence: i64) -> &mut T {
        unsafe {
            &mut *self
                .buffer
                .get_unchecked((sequence as usize) & self.mask)
                .get()
        }
    }

    /// Claims the next sequence from the sequencer.
    pub fn next(&self) -> i64 {
        self.sequencer.next()
    }

    /// Publishes `sequence`, making its slot visible to consumers.
    pub fn publish(&self, sequence: i64) {
        self.sequencer.publish(sequence);
    }
}
556
/// Handle for publishing events into the ring buffer.
pub struct Producer<T> {
    ring_buffer: Arc<RingBuffer<T>>,
}
561
impl<T> Producer<T> {
    /// Claims the next slot, lets `update` fill it in place, then publishes it.
    pub fn publish<F>(&mut self, update: F)
    where
        F: FnOnce(&mut T),
    {
        let sequence = self.ring_buffer.next();
        // SAFETY: `sequence` was just claimed from the sequencer and is not
        // yet published, so this thread has exclusive access to the slot.
        let event = unsafe { self.ring_buffer.get_unchecked_mut(sequence) };
        update(event);
        self.ring_buffer.publish(sequence);
    }
}
575
/// Runs an EventHandler on its own thread, consuming events in batches.
pub struct BatchEventProcessor<T> {
    ring_buffer: Arc<RingBuffer<T>>,
    // This processor's published progress; registered as a gating sequence.
    sequence: Arc<Sequence>,
    // Coordinates waiting for available sequences and shutdown alerts.
    barrier: Arc<ProcessingSequenceBarrier>,
    handler: Arc<dyn EventHandler<T>>,
}
587
impl<T> BatchEventProcessor<T> {
    /// Creates a processor whose progress starts before the first slot (-1).
    pub fn new(
        ring_buffer: Arc<RingBuffer<T>>,
        barrier: Arc<ProcessingSequenceBarrier>,
        handler: Arc<dyn EventHandler<T>>,
    ) -> Self {
        Self {
            ring_buffer,
            sequence: Arc::new(Sequence::new(-1)),
            barrier,
            handler,
        }
    }

    /// The sequence tracking this processor's progress; register it as a
    /// gating sequence so producers cannot lap this consumer.
    pub fn get_sequence(&self) -> Arc<Sequence> {
        self.sequence.clone()
    }

    /// Event loop: waits for available batches and dispatches each event to
    /// the handler in order. Returns when the barrier is alerted.
    pub fn run(&self) {
        let mut next_sequence = self.sequence.get() + 1;
        loop {
            match self.barrier.wait_for(next_sequence) {
                Ok(available_sequence) => {
                    // Process the whole available batch, then publish our
                    // progress once at the end.
                    while next_sequence <= available_sequence {
                        // SAFETY: sequences up to available_sequence are
                        // published, so shared access to the slot is valid.
                        let event = unsafe { self.ring_buffer.get_unchecked(next_sequence) };
                        self.handler.on_event(
                            event,
                            // Lossless cast: published sequences are >= 0.
                            next_sequence as u64,
                            next_sequence == available_sequence,
                        );
                        next_sequence += 1;
                    }
                    self.sequence.set(available_sequence);
                }
                Err(_) => {
                    // AlertException: stop only if we are genuinely alerted
                    // (the flag may have been cleared in the meantime).
                    if self.barrier.is_alerted() {
                        break;
                    }
                }
            }
        }
    }
}
635
/// Top-level facade wiring a ring buffer, its sequencer, and event processors.
pub struct Disruptor<T> {
    ring_buffer: Arc<RingBuffer<T>>,
    // One background processor per registered handler.
    processors: Vec<Arc<BatchEventProcessor<T>>>,
    // Set by start(). NOTE(review): never read anywhere visible in this file.
    started: bool,
    // Shared by all barriers created in handle_events_with.
    wait_strategy: Arc<dyn WaitStrategy>,
}
647
/// Selects the sequencer implementation: one writer thread or many.
pub enum ProducerType {
    Single,
    Multi,
}
652
/// Fluent builder for a Disruptor.
/// Defaults: 1024 slots, busy-spin waiting, single producer.
pub struct DisruptorBuilder<T, F> {
    // Produces the initial value for every ring-buffer slot.
    factory: F,
    buffer_size: usize,
    wait_strategy: Arc<dyn WaitStrategy>,
    producer_type: ProducerType,
    // Ties T to the builder without storing a T.
    _marker: std::marker::PhantomData<T>,
}
664
665impl<T, F> DisruptorBuilder<T, F>
666where
667 F: Fn() -> T,
668{
669 pub fn new(factory: F) -> Self {
671 Self {
672 factory,
673 buffer_size: 1024,
674 wait_strategy: Arc::new(BusySpinWaitStrategy),
675 producer_type: ProducerType::Single,
676 _marker: std::marker::PhantomData,
677 }
678 }
679
680 pub fn buffer_size(mut self, size: usize) -> Self {
682 self.buffer_size = size;
683 self
684 }
685
686 pub fn wait_strategy<W: WaitStrategy + 'static>(mut self, strategy: W) -> Self {
688 self.wait_strategy = Arc::new(strategy);
689 self
690 }
691
692 pub fn single_producer(mut self) -> Self {
694 self.producer_type = ProducerType::Single;
695 self
696 }
697
698 pub fn multi_producer(mut self) -> Self {
700 self.producer_type = ProducerType::Multi;
701 self
702 }
703
704 pub fn build(self) -> Disruptor<T> {
706 let sequencer: Arc<dyn Sequencer> = match self.producer_type {
707 ProducerType::Single => Arc::new(SingleProducerSequencer::new(
708 self.buffer_size,
709 self.wait_strategy.clone(),
710 )),
711 ProducerType::Multi => Arc::new(MultiProducerSequencer::new(
712 self.buffer_size,
713 self.wait_strategy.clone(),
714 )),
715 };
716
717 let ring_buffer = Arc::new(RingBuffer::new(self.factory, self.buffer_size, sequencer));
718 Disruptor {
719 ring_buffer,
720 processors: Vec::new(),
721 started: false,
722 wait_strategy: self.wait_strategy,
723 }
724 }
725}
726
727impl<T: Send + Sync + 'static> Disruptor<T> {
728 pub fn builder<F>(factory: F) -> DisruptorBuilder<T, F>
730 where
731 F: Fn() -> T,
732 {
733 DisruptorBuilder::new(factory)
734 }
735
736 pub fn handle_events_with<H: EventHandler<T> + 'static>(&mut self, handler: H) -> &mut Self {
738 let barrier = Arc::new(ProcessingSequenceBarrier::new(
742 self.wait_strategy.clone(),
743 self.ring_buffer.sequencer.clone(), self.ring_buffer.sequencer.clone(),
745 ));
746
747 let processor = Arc::new(BatchEventProcessor::new(
748 self.ring_buffer.clone(),
749 barrier,
750 Arc::new(handler),
751 ));
752
753 self.processors.push(processor);
754 self
755 }
756
757 pub fn start(mut self) -> Producer<T> {
759 let mut gating_sequences = Vec::new();
760 for processor in &self.processors {
761 gating_sequences.push(processor.get_sequence());
762 let p = processor.clone();
763 thread::spawn(move || {
764 p.run();
765 });
766 }
767
768 self.ring_buffer.add_gating_sequences(gating_sequences);
772
773 self.started = true;
774
775 Producer {
776 ring_buffer: self.ring_buffer,
777 }
778 }
779}