1use crate::utils::CacheAligned;
2use kovan::Atom;
3use std::cell::UnsafeCell;
4use std::marker::PhantomData as marker;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
7use std::thread;
8
/// A cache-line-padded atomic sequence counter.
///
/// Used both as a producer cursor and as per-consumer progress markers.
/// The `CacheAligned` wrapper pads the counter to avoid false sharing
/// between sequences updated by different threads.
#[derive(Debug)]
pub struct Sequence {
    value: CacheAligned<AtomicI64>,
}
16
17impl Sequence {
18 pub fn new(initial: i64) -> Self {
20 Sequence {
21 value: CacheAligned::new(AtomicI64::new(initial)),
22 }
23 }
24
25 pub fn get(&self) -> i64 {
27 self.value.load(Ordering::Acquire)
28 }
29
30 pub fn set(&self, value: i64) {
32 self.value.store(value, Ordering::Release);
33 }
34
35 pub fn compare_and_set(&self, current: i64, new: i64) -> bool {
39 self.value
40 .compare_exchange(current, new, Ordering::SeqCst, Ordering::Relaxed)
41 .is_ok()
42 }
43}
44
/// Claims and publishes ring-buffer slots on behalf of producers.
pub trait Sequencer: Send + Sync {
    /// Claims the next sequence, spinning until the slot it maps to has been
    /// passed by every gating (consumer) sequence.
    fn next(&self) -> i64;

    /// Marks `sequence` as published so consumers may read its slot.
    fn publish(&self, sequence: i64);

    /// Current cursor value. NOTE(review): the multi-producer implementation
    /// returns the highest *claimed* sequence, which may not be published
    /// yet — callers must pair this with `get_highest_published_sequence`.
    fn get_cursor(&self) -> i64;

    /// Registers consumer sequences that gate producer wrap-around.
    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>);

    /// Returns the highest sequence in `[next_sequence, available_sequence]`
    /// that is contiguously published.
    fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64;
}
62
/// Sequencer for exactly one producer thread: claiming is a plain
/// read-modify-write with no CAS.
pub struct SingleProducerSequencer {
    // Highest published sequence, read by consumers.
    cursor: Arc<Sequence>,
    // Highest claimed (possibly not yet published) sequence. Only the single
    // producer thread is expected to touch this.
    next_sequence: Sequence,
    // Consumer sequences that gate wrap-around (copy-on-write list).
    gating_sequences: Atom<Vec<Arc<Sequence>>>,
    // Number of ring-buffer slots.
    buffer_size: usize,
    // Used to wake blocked consumers on publish.
    wait_strategy: Arc<dyn WaitStrategy>,
}
78
79impl SingleProducerSequencer {
85 pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
86 Self {
87 cursor: Arc::new(Sequence::new(-1)),
88 next_sequence: Sequence::new(-1),
89 gating_sequences: Atom::new(Vec::new()),
90 buffer_size,
91 wait_strategy,
92 }
93 }
94}
95
96impl Sequencer for SingleProducerSequencer {
97 fn next(&self) -> i64 {
98 let next = self.next_sequence.get() + 1;
99 self.next_sequence.set(next);
100
101 let wrap_point = next - self.buffer_size as i64;
102 let gating_guard = self.gating_sequences.load();
105 let gating_sequences: &[Arc<Sequence>] = gating_guard.as_slice();
106
107 let min_seq =
108 |seqs: &[Arc<Sequence>]| seqs.iter().map(|s| s.get()).min().unwrap_or(i64::MAX);
109
110 let mut min_gating_sequence = min_seq(gating_sequences);
111
112 while wrap_point > min_gating_sequence {
113 thread::yield_now();
114 min_gating_sequence = min_seq(gating_sequences);
115 }
116
117 next
118 }
119
120 fn publish(&self, sequence: i64) {
121 self.cursor.set(sequence);
122 self.wait_strategy.signal_all_when_blocking();
123 }
124
125 fn get_cursor(&self) -> i64 {
126 self.cursor.get()
127 }
128
129 fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
130 self.gating_sequences.rcu(|current| {
133 let mut new_list = current.clone();
134 new_list.extend(sequences.iter().cloned());
135 new_list
136 });
137 }
138
139 fn get_highest_published_sequence(&self, _next_sequence: i64, available_sequence: i64) -> i64 {
140 available_sequence
141 }
142}
143
/// Sequencer supporting concurrent producers: slots are claimed with an
/// atomic fetch-add and published individually via `available_buffer`.
pub struct MultiProducerSequencer {
    // Consumer sequences that gate wrap-around (copy-on-write list).
    gating_sequences: Atom<Vec<Arc<Sequence>>>,
    // Number of ring-buffer slots.
    buffer_size: usize,
    // Used to wake blocked consumers on publish.
    wait_strategy: Arc<dyn WaitStrategy>,
    // Highest claimed sequence; may run ahead of what is actually published.
    claim_sequence: AtomicI64,
    // Per-slot record of the last sequence published into that slot; claims
    // can complete (publish) out of order.
    available_buffer: Box<[AtomicI64]>,
    // buffer_size - 1. NOTE(review): only a valid index mask when
    // buffer_size is a power of two — confirm constructors guarantee this.
    mask: usize,
}
161
162impl MultiProducerSequencer {
167 pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
168 let mut available_buffer = Vec::with_capacity(buffer_size);
169 for _ in 0..buffer_size {
170 available_buffer.push(AtomicI64::new(-1));
171 }
172
173 Self {
174 gating_sequences: Atom::new(Vec::new()),
175 buffer_size,
176 wait_strategy,
177 claim_sequence: AtomicI64::new(-1),
178 available_buffer: available_buffer.into_boxed_slice(),
179 mask: buffer_size - 1,
180 }
181 }
182}
183
impl Sequencer for MultiProducerSequencer {
    /// Claims the next sequence with an atomic fetch-add, then spins until
    /// the claimed slot no longer overlaps un-consumed data.
    fn next(&self) -> i64 {
        // fetch_add returns the previous value; our claim is previous + 1.
        let current = self.claim_sequence.fetch_add(1, Ordering::SeqCst);
        let next = current + 1;

        // The slot `next` maps to was last used by `next - buffer_size`;
        // we may not reuse it until every gating consumer has passed it.
        let wrap_point = next - self.buffer_size as i64;
        let gating_guard = self.gating_sequences.load();
        let gating_sequences: &[Arc<Sequence>] = gating_guard.as_slice();

        // i64::MAX when no gating sequences are registered: nothing gates us.
        let min_seq =
            |seqs: &[Arc<Sequence>]| seqs.iter().map(|s| s.get()).min().unwrap_or(i64::MAX);

        let mut min_gating_sequence = min_seq(gating_sequences);

        while wrap_point > min_gating_sequence {
            thread::yield_now();
            min_gating_sequence = min_seq(gating_sequences);
        }

        next
    }

    /// Marks `sequence` published by recording it in its availability slot.
    /// The `Release` store pairs with the `Acquire` load in `is_published`,
    /// making the event data written before `publish` visible to consumers.
    fn publish(&self, sequence: i64) {
        let index = (sequence as usize) & self.mask;
        self.available_buffer[index].store(sequence, Ordering::Release);
        self.wait_strategy.signal_all_when_blocking();
    }

    /// Highest *claimed* sequence — it may not be published yet; consumers
    /// must confirm publication via `get_highest_published_sequence`.
    /// Relaxed is sufficient here because the data-visibility edge comes
    /// from the Release/Acquire pair on `available_buffer`.
    fn get_cursor(&self) -> i64 {
        self.claim_sequence.load(Ordering::Relaxed)
    }

    /// Copy-on-write append of consumer sequences (concurrent readers see
    /// either the old or the new list, never a partial one).
    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
        self.gating_sequences.rcu(|current| {
            let mut new_list = current.clone();
            new_list.extend(sequences.iter().cloned());
            new_list
        });
    }

    /// Scans forward from `next_sequence` and returns the last sequence of
    /// the contiguous published run — claims may publish out of order, so a
    /// gap means everything beyond it is not yet safe to read.
    fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64 {
        let mut sequence = next_sequence;
        while sequence <= available_sequence {
            if !self.is_published(sequence) {
                return sequence - 1;
            }
            sequence += 1;
        }
        available_sequence
    }
}
238
239impl MultiProducerSequencer {
240 fn is_published(&self, sequence: i64) -> bool {
241 let index = (sequence as usize) & self.mask;
242 self.available_buffer[index].load(Ordering::Acquire) == sequence
243 }
244}
245
/// Policy for how a consumer waits until a sequence becomes available.
pub trait WaitStrategy: Send + Sync {
    /// Blocks until `dependent`'s cursor reaches `sequence` or `barrier` is
    /// alerted; returns the observed cursor value on success.
    fn wait_for(
        &self,
        sequence: i64,
        cursor: &Arc<dyn Sequencer>,
        dependent: &Arc<dyn Sequencer>,
        barrier: &ProcessingSequenceBarrier,
    ) -> Result<i64, AlertException>;

    /// Wakes any consumers parked by this strategy (a no-op for the
    /// spinning/yielding strategies).
    fn signal_all_when_blocking(&self);
}
260
/// Returned from a wait when the barrier was alerted (e.g. during shutdown)
/// instead of the sequence becoming available.
#[derive(Debug, Clone, Copy)]
pub struct AlertException;
263
/// Lowest-latency strategy: burns a CPU core spinning until available.
pub struct BusySpinWaitStrategy;
268
269impl WaitStrategy for BusySpinWaitStrategy {
270 fn wait_for(
271 &self,
272 sequence: i64,
273 _cursor: &Arc<dyn Sequencer>,
274 dependent: &Arc<dyn Sequencer>,
275 barrier: &ProcessingSequenceBarrier,
276 ) -> Result<i64, AlertException> {
277 let mut available_sequence;
278 loop {
279 if barrier.is_alerted() {
280 return Err(AlertException);
281 }
282 available_sequence = dependent.get_cursor();
283 if available_sequence >= sequence {
284 return Ok(available_sequence);
285 }
286 std::hint::spin_loop();
287 }
288 }
289
290 fn signal_all_when_blocking(&self) {}
291}
292
/// Spins briefly, then periodically yields the thread — a latency/CPU
/// compromise between busy-spin and blocking.
pub struct YieldingWaitStrategy;
297
298impl WaitStrategy for YieldingWaitStrategy {
299 fn wait_for(
300 &self,
301 sequence: i64,
302 _cursor: &Arc<dyn Sequencer>,
303 dependent: &Arc<dyn Sequencer>,
304 barrier: &ProcessingSequenceBarrier,
305 ) -> Result<i64, AlertException> {
306 let mut counter = 100;
307 let mut available_sequence;
308 loop {
309 if barrier.is_alerted() {
310 return Err(AlertException);
311 }
312 available_sequence = dependent.get_cursor();
313 if available_sequence >= sequence {
314 return Ok(available_sequence);
315 }
316
317 counter -= 1;
318 if counter == 0 {
319 thread::yield_now();
320 counter = 100;
321 } else {
322 std::hint::spin_loop();
323 }
324 }
325 }
326
327 fn signal_all_when_blocking(&self) {}
328}
329
/// CPU-friendly strategy: consumers park on a condvar and are woken by
/// `signal_all_when_blocking` when the producer publishes.
pub struct BlockingWaitStrategy {
    // The mutex guards nothing but the condvar protocol itself; taking it in
    // both wait and signal paths prevents lost wake-ups.
    mutex: std::sync::Mutex<()>,
    condvar: std::sync::Condvar,
}
337
impl Default for BlockingWaitStrategy {
    /// Equivalent to [`BlockingWaitStrategy::new`].
    fn default() -> Self {
        Self::new()
    }
}
343
344impl BlockingWaitStrategy {
345 pub fn new() -> Self {
347 Self {
348 mutex: std::sync::Mutex::new(()),
349 condvar: std::sync::Condvar::new(),
350 }
351 }
352}
353
impl WaitStrategy for BlockingWaitStrategy {
    /// Parks the calling thread on the condvar until `dependent`'s cursor
    /// reaches `sequence` or the barrier is alerted.
    fn wait_for(
        &self,
        sequence: i64,
        _cursor: &Arc<dyn Sequencer>,
        dependent: &Arc<dyn Sequencer>,
        barrier: &ProcessingSequenceBarrier,
    ) -> Result<i64, AlertException> {
        let mut available_sequence = dependent.get_cursor();
        if available_sequence < sequence {
            // Re-check the cursor while holding the lock:
            // `signal_all_when_blocking` takes the same lock before
            // notifying, so a publish cannot slip between our check and the
            // `wait` below (no lost wake-up).
            let mut guard = self.mutex.lock().unwrap();
            while dependent.get_cursor() < sequence {
                if barrier.is_alerted() {
                    return Err(AlertException);
                }
                // Releases the lock while parked; reacquires before returning.
                guard = self.condvar.wait(guard).unwrap();
            }
            available_sequence = dependent.get_cursor();
        }

        // NOTE(review): after the branch above, `available_sequence >=
        // sequence` always holds, so this spin appears unreachable; it
        // mirrors the second phase of LMAX's BlockingWaitStrategy where the
        // dependent sequence differs from the cursor — confirm before
        // removing.
        while available_sequence < sequence {
            if barrier.is_alerted() {
                return Err(AlertException);
            }
            available_sequence = dependent.get_cursor();
            thread::yield_now();
        }

        Ok(available_sequence)
    }

    /// Wakes every parked consumer. The lock is taken first so the wake-up
    /// cannot race with a consumer between its cursor check and `wait`.
    fn signal_all_when_blocking(&self) {
        let _guard = self.mutex.lock().unwrap();
        self.condvar.notify_all();
    }
}
393
/// Coordination point handed to consumers: waits for sequences via the wait
/// strategy and supports alerting (shutdown) of blocked waiters.
pub struct ProcessingSequenceBarrier {
    wait_strategy: Arc<dyn WaitStrategy>,
    // Sequencer whose cursor the wait strategy polls.
    dependent_sequencer: Arc<dyn Sequencer>,
    // Sequencer asked to resolve the highest contiguously published sequence.
    cursor_sequencer: Arc<dyn Sequencer>,
    // Set by `alert()`; checked by waiters to abort waiting.
    alerted: AtomicBool,
}
405
406impl ProcessingSequenceBarrier {
407 pub fn new(
409 wait_strategy: Arc<dyn WaitStrategy>,
410 dependent_sequencer: Arc<dyn Sequencer>,
411 cursor_sequencer: Arc<dyn Sequencer>,
412 ) -> Self {
413 Self {
414 wait_strategy,
415 dependent_sequencer,
416 cursor_sequencer,
417 alerted: AtomicBool::new(false),
418 }
419 }
420
421 pub fn wait_for(&self, sequence: i64) -> Result<i64, AlertException> {
423 let available = self.wait_strategy.wait_for(
424 sequence,
425 &self.cursor_sequencer,
426 &self.dependent_sequencer,
427 self,
428 )?;
429
430 Ok(self
432 .cursor_sequencer
433 .get_highest_published_sequence(sequence, available))
434 }
435
436 pub fn is_alerted(&self) -> bool {
438 self.alerted.load(Ordering::Acquire)
439 }
440
441 pub fn alert(&self) {
443 self.alerted.store(true, Ordering::Release);
444 self.wait_strategy.signal_all_when_blocking();
445 }
446
447 pub fn clear_alert(&self) {
449 self.alerted.store(false, Ordering::Release);
450 }
451}
452
/// Consumer callback invoked once per event.
pub trait EventHandler<T>: Send + Sync {
    /// `end_of_batch` is true for the last event of the currently available
    /// batch, letting handlers flush batched work.
    fn on_event(&self, event: &T, sequence: u64, end_of_batch: bool);
}
458
/// Pre-allocated ring of events; slots are reused once all consumers pass
/// them.
pub struct RingBuffer<T> {
    // UnsafeCell: producers write a slot in place while consumers read
    // others; per-slot exclusivity is supposed to be enforced by the
    // sequencer protocol, not by the borrow checker.
    buffer: Box<[UnsafeCell<T>]>,
    // capacity - 1; capacity is a power of two (see `new`), so
    // `seq & mask` == `seq % capacity`.
    mask: usize,
    sequencer: Arc<dyn Sequencer>,
}

// SAFETY(review): sharing the buffer across threads relies on the sequencer
// protocol guaranteeing that a slot is written by at most one thread before
// publication and only read after — this is a protocol invariant, not one
// the compiler checks; confirm all access goes through claimed sequences.
unsafe impl<T: Send> Send for RingBuffer<T> {}
unsafe impl<T: Send + Sync> Sync for RingBuffer<T> {}
473
impl<T> RingBuffer<T> {
    /// Allocates `size` slots (rounded up to the next power of two), each
    /// initialized by `factory`.
    pub fn new<F>(factory: F, size: usize, sequencer: Arc<dyn Sequencer>) -> Self
    where
        F: Fn() -> T,
    {
        // Power-of-two capacity lets `seq & mask` replace `seq % capacity`.
        let capacity = size.next_power_of_two();
        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(UnsafeCell::new(factory()));
        }

        Self {
            buffer: buffer.into_boxed_slice(),
            mask: capacity - 1,
            sequencer,
        }
    }

    /// Registers consumer sequences that gate producer wrap-around.
    pub fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
        self.sequencer.add_gating_sequences(sequences);
    }

    /// Shared reference to the slot for `sequence`.
    ///
    /// # Safety
    /// `sequence` must be published, and no producer may be concurrently
    /// writing this slot (i.e. the producer has not wrapped past it).
    pub unsafe fn get(&self, sequence: i64) -> &T {
        unsafe { &*self.buffer[(sequence as usize) & self.mask].get() }
    }

    /// Mutable slot access; `&mut self` makes exclusivity compiler-enforced.
    pub fn get_mut(&mut self, sequence: i64) -> &mut T {
        // SAFETY: `&mut self` guarantees exclusive access to every slot.
        unsafe { &mut *self.buffer[(sequence as usize) & self.mask].get() }
    }

    /// Like [`get`](Self::get) but skips the slice bounds check.
    ///
    /// # Safety
    /// Same contract as `get`. The masked index is always in bounds because
    /// `mask == buffer.len() - 1`.
    pub unsafe fn get_unchecked(&self, sequence: i64) -> &T {
        unsafe {
            &*self
                .buffer
                .get_unchecked((sequence as usize) & self.mask)
                .get()
        }
    }

    /// Mutable slot access through a shared reference.
    ///
    /// # Safety
    /// The caller must hold the exclusive claim to `sequence` (returned by
    /// `next()` and not yet published); any other access to this slot while
    /// the `&mut T` lives is undefined behavior.
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn get_unchecked_mut(&self, sequence: i64) -> &mut T {
        unsafe {
            &mut *self
                .buffer
                .get_unchecked((sequence as usize) & self.mask)
                .get()
        }
    }

    /// Claims the next sequence from the sequencer (may spin).
    pub fn next(&self) -> i64 {
        self.sequencer.next()
    }

    /// Publishes `sequence`, making its slot visible to consumers.
    pub fn publish(&self, sequence: i64) {
        self.sequencer.publish(sequence);
    }
}
559
/// Handle for publishing events; dropping it alerts and joins all consumer
/// threads.
pub struct Producer<T> {
    ring_buffer: Arc<RingBuffer<T>>,
    // Barriers to alert on shutdown so consumer loops exit.
    barriers: Vec<Arc<ProcessingSequenceBarrier>>,
    // Consumer threads joined on drop (`Option` so `Drop` can take them).
    join_handles: Vec<Option<thread::JoinHandle<()>>>,
}
571
impl<T> Producer<T> {
    /// Claims the next slot, lets `update` fill it in place, then publishes.
    pub fn publish<F>(&mut self, update: F)
    where
        F: FnOnce(&mut T),
    {
        let sequence = self.ring_buffer.next();
        // SAFETY: `next()` claimed `sequence` for us — consumers will not
        // read this slot until `publish(sequence)` below. NOTE(review): with
        // the multi-producer sequencer this also relies on each claim
        // mapping to a distinct slot; confirm producers never share a claim.
        let event = unsafe { self.ring_buffer.get_unchecked_mut(sequence) };
        update(event);
        self.ring_buffer.publish(sequence);
    }
}
585
586impl<T> Drop for Producer<T> {
587 fn drop(&mut self) {
588 for barrier in &self.barriers {
590 barrier.alert();
591 }
592 for handle in &mut self.join_handles {
594 if let Some(h) = handle.take() {
595 let _ = h.join();
596 }
597 }
598 }
599}
600
/// Runs one event handler, consuming events from the ring buffer in batches.
pub struct BatchEventProcessor<T> {
    ring_buffer: Arc<RingBuffer<T>>,
    // This processor's consumption progress; registered as a producer gate.
    sequence: Arc<Sequence>,
    barrier: Arc<ProcessingSequenceBarrier>,
    handler: Arc<dyn EventHandler<T>>,
}
612
impl<T> BatchEventProcessor<T> {
    /// Creates a processor whose own sequence starts at -1 (nothing
    /// consumed yet).
    pub fn new(
        ring_buffer: Arc<RingBuffer<T>>,
        barrier: Arc<ProcessingSequenceBarrier>,
        handler: Arc<dyn EventHandler<T>>,
    ) -> Self {
        Self {
            ring_buffer,
            sequence: Arc::new(Sequence::new(-1)),
            barrier,
            handler,
        }
    }

    /// Handle used to register this processor as a producer gating sequence.
    pub fn get_sequence(&self) -> Arc<Sequence> {
        self.sequence.clone()
    }

    /// Consumes events until the barrier is alerted.
    ///
    /// Waits for the next available batch, dispatches each event (flagging
    /// the batch's last event with `end_of_batch`), then advances its own
    /// sequence once per batch rather than once per event.
    pub fn run(&self) {
        let mut next_sequence = self.sequence.get() + 1;
        loop {
            match self.barrier.wait_for(next_sequence) {
                Ok(available_sequence) => {
                    while next_sequence <= available_sequence {
                        // SAFETY: the barrier reported everything up to
                        // `available_sequence` as published, and the
                        // producer cannot wrap past our gating sequence,
                        // so this slot is safe to read.
                        let event = unsafe { self.ring_buffer.get_unchecked(next_sequence) };
                        self.handler.on_event(
                            event,
                            next_sequence as u64,
                            next_sequence == available_sequence,
                        );
                        next_sequence += 1;
                    }
                    self.sequence.set(available_sequence);
                }
                Err(_) => {
                    // AlertException: exit only if the alert is still
                    // raised (it may have been cleared meanwhile).
                    if self.barrier.is_alerted() {
                        break;
                    }
                }
            }
        }
    }
}
660
/// Wires a ring buffer to a set of event processors; configured via
/// [`DisruptorBuilder`] and launched with [`Disruptor::start`].
pub struct Disruptor<T> {
    ring_buffer: Arc<RingBuffer<T>>,
    processors: Vec<Arc<BatchEventProcessor<T>>>,
    // Set by `start()`; not otherwise read in this file.
    started: bool,
    wait_strategy: Arc<dyn WaitStrategy>,
}
672
/// Selects the sequencer implementation: one producer thread or many.
pub enum ProducerType {
    Single,
    Multi,
}
677
/// Fluent builder for [`Disruptor`].
pub struct DisruptorBuilder<T, F> {
    // Called once per slot to pre-fill the ring buffer.
    factory: F,
    // Requested capacity; rounded up to a power of two by `RingBuffer::new`.
    buffer_size: usize,
    wait_strategy: Arc<dyn WaitStrategy>,
    producer_type: ProducerType,
    // `marker` is the file's alias for `PhantomData`: ties `T` to the
    // builder without storing a value of that type.
    marker: marker<T>,
}
689
690impl<T, F> DisruptorBuilder<T, F>
691where
692 F: Fn() -> T,
693{
694 pub fn new(factory: F) -> Self {
696 Self {
697 factory,
698 buffer_size: 1024,
699 wait_strategy: Arc::new(BusySpinWaitStrategy),
700 producer_type: ProducerType::Single,
701 marker: marker::<T>,
702 }
703 }
704
705 pub fn buffer_size(mut self, size: usize) -> Self {
707 self.buffer_size = size;
708 self
709 }
710
711 pub fn wait_strategy<W: WaitStrategy + 'static>(mut self, strategy: W) -> Self {
713 self.wait_strategy = Arc::new(strategy);
714 self
715 }
716
717 pub fn single_producer(mut self) -> Self {
719 self.producer_type = ProducerType::Single;
720 self
721 }
722
723 pub fn multi_producer(mut self) -> Self {
725 self.producer_type = ProducerType::Multi;
726 self
727 }
728
729 pub fn build(self) -> Disruptor<T> {
731 let sequencer: Arc<dyn Sequencer> = match self.producer_type {
732 ProducerType::Single => Arc::new(SingleProducerSequencer::new(
733 self.buffer_size,
734 self.wait_strategy.clone(),
735 )),
736 ProducerType::Multi => Arc::new(MultiProducerSequencer::new(
737 self.buffer_size,
738 self.wait_strategy.clone(),
739 )),
740 };
741
742 let ring_buffer = Arc::new(RingBuffer::new(self.factory, self.buffer_size, sequencer));
743 Disruptor {
744 ring_buffer,
745 processors: Vec::new(),
746 started: false,
747 wait_strategy: self.wait_strategy,
748 }
749 }
750}
751
impl<T: Send + Sync + 'static> Disruptor<T> {
    /// Entry point: `Disruptor::builder(factory)`.
    pub fn builder<F>(factory: F) -> DisruptorBuilder<T, F>
    where
        F: Fn() -> T,
    {
        DisruptorBuilder::new(factory)
    }

    /// Registers an event handler. Each handler gets its own processor and
    /// barrier, and every barrier depends directly on the producer's
    /// sequencer — handlers run independently in parallel; there is no
    /// chaining of one handler after another.
    pub fn handle_events_with<H: EventHandler<T> + 'static>(&mut self, handler: H) -> &mut Self {
        let barrier = Arc::new(ProcessingSequenceBarrier::new(
            self.wait_strategy.clone(),
            // Same sequencer serves as both the dependent and the cursor.
            self.ring_buffer.sequencer.clone(), self.ring_buffer.sequencer.clone(),
        ));

        let processor = Arc::new(BatchEventProcessor::new(
            self.ring_buffer.clone(),
            barrier,
            Arc::new(handler),
        ));

        self.processors.push(processor);
        self
    }

    /// Spawns one thread per registered processor, registers their
    /// sequences as producer gates, and returns the [`Producer`] handle.
    ///
    /// Gating sequences are registered before this returns, and the caller
    /// only obtains the `Producer` from the return value, so no publish can
    /// occur before consumers are gating the ring.
    pub fn start(mut self) -> Producer<T> {
        let mut gating_sequences = Vec::new();
        let mut barriers = Vec::new();
        let mut join_handles = Vec::new();

        for processor in &self.processors {
            gating_sequences.push(processor.get_sequence());
            barriers.push(processor.barrier.clone());
            let p = processor.clone();
            join_handles.push(Some(thread::spawn(move || {
                p.run();
            })));
        }

        self.ring_buffer.add_gating_sequences(gating_sequences);

        self.started = true;

        // The Producer keeps the barriers and join handles so its Drop can
        // alert and join the consumer threads.
        Producer {
            ring_buffer: self.ring_buffer,
            barriers,
            join_handles,
        }
    }
}