use crate::utils::CacheAligned;
use kovan::Atom;
use std::cell::UnsafeCell;
use std::marker::PhantomData as marker;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::thread;
#[derive(Debug)]
pub struct Sequence {
value: CacheAligned<AtomicI64>,
}
impl Sequence {
pub fn new(initial: i64) -> Self {
Sequence {
value: CacheAligned::new(AtomicI64::new(initial)),
}
}
pub fn get(&self) -> i64 {
self.value.load(Ordering::Acquire)
}
pub fn set(&self, value: i64) {
self.value.store(value, Ordering::Release);
}
pub fn compare_and_set(&self, current: i64, new: i64) -> bool {
self.value
.compare_exchange(current, new, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
}
}
pub trait Sequencer: Send + Sync {
fn next(&self) -> i64;
fn publish(&self, sequence: i64);
fn get_cursor(&self) -> i64;
fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>);
fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64;
}
pub struct SingleProducerSequencer {
cursor: Arc<Sequence>,
next_sequence: Sequence,
gating_sequences: Atom<Vec<Arc<Sequence>>>,
buffer_size: usize,
wait_strategy: Arc<dyn WaitStrategy>,
}
impl SingleProducerSequencer {
pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
Self {
cursor: Arc::new(Sequence::new(-1)),
next_sequence: Sequence::new(-1),
gating_sequences: Atom::new(Vec::new()),
buffer_size,
wait_strategy,
}
}
}
impl Sequencer for SingleProducerSequencer {
fn next(&self) -> i64 {
let next = self.next_sequence.get() + 1;
self.next_sequence.set(next);
let wrap_point = next - self.buffer_size as i64;
let gating_guard = self.gating_sequences.load();
let gating_sequences: &[Arc<Sequence>] = gating_guard.as_slice();
let min_seq =
|seqs: &[Arc<Sequence>]| seqs.iter().map(|s| s.get()).min().unwrap_or(i64::MAX);
let mut min_gating_sequence = min_seq(gating_sequences);
while wrap_point > min_gating_sequence {
thread::yield_now();
min_gating_sequence = min_seq(gating_sequences);
}
next
}
fn publish(&self, sequence: i64) {
self.cursor.set(sequence);
self.wait_strategy.signal_all_when_blocking();
}
fn get_cursor(&self) -> i64 {
self.cursor.get()
}
fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
self.gating_sequences.rcu(|current| {
let mut new_list = current.clone();
new_list.extend(sequences.iter().cloned());
new_list
});
}
fn get_highest_published_sequence(&self, _next_sequence: i64, available_sequence: i64) -> i64 {
available_sequence
}
}
pub struct MultiProducerSequencer {
gating_sequences: Atom<Vec<Arc<Sequence>>>,
buffer_size: usize,
wait_strategy: Arc<dyn WaitStrategy>,
claim_sequence: AtomicI64,
available_buffer: Box<[AtomicI64]>,
mask: usize,
}
impl MultiProducerSequencer {
pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
let mut available_buffer = Vec::with_capacity(buffer_size);
for _ in 0..buffer_size {
available_buffer.push(AtomicI64::new(-1));
}
Self {
gating_sequences: Atom::new(Vec::new()),
buffer_size,
wait_strategy,
claim_sequence: AtomicI64::new(-1),
available_buffer: available_buffer.into_boxed_slice(),
mask: buffer_size - 1,
}
}
}
impl Sequencer for MultiProducerSequencer {
fn next(&self) -> i64 {
let current = self.claim_sequence.fetch_add(1, Ordering::SeqCst);
let next = current + 1;
let wrap_point = next - self.buffer_size as i64;
let gating_guard = self.gating_sequences.load();
let gating_sequences: &[Arc<Sequence>] = gating_guard.as_slice();
let min_seq =
|seqs: &[Arc<Sequence>]| seqs.iter().map(|s| s.get()).min().unwrap_or(i64::MAX);
let mut min_gating_sequence = min_seq(gating_sequences);
while wrap_point > min_gating_sequence {
thread::yield_now();
min_gating_sequence = min_seq(gating_sequences);
}
next
}
fn publish(&self, sequence: i64) {
let index = (sequence as usize) & self.mask;
self.available_buffer[index].store(sequence, Ordering::Release);
self.wait_strategy.signal_all_when_blocking();
}
fn get_cursor(&self) -> i64 {
self.claim_sequence.load(Ordering::Relaxed)
}
fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
self.gating_sequences.rcu(|current| {
let mut new_list = current.clone();
new_list.extend(sequences.iter().cloned());
new_list
});
}
fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64 {
let mut sequence = next_sequence;
while sequence <= available_sequence {
if !self.is_published(sequence) {
return sequence - 1;
}
sequence += 1;
}
available_sequence
}
}
impl MultiProducerSequencer {
fn is_published(&self, sequence: i64) -> bool {
let index = (sequence as usize) & self.mask;
self.available_buffer[index].load(Ordering::Acquire) == sequence
}
}
pub trait WaitStrategy: Send + Sync {
fn wait_for(
&self,
sequence: i64,
cursor: &Arc<dyn Sequencer>,
dependent: &Arc<dyn Sequencer>,
barrier: &ProcessingSequenceBarrier,
) -> Result<i64, AlertException>;
fn signal_all_when_blocking(&self);
}
#[derive(Debug, Clone, Copy)]
pub struct AlertException;
pub struct BusySpinWaitStrategy;
impl WaitStrategy for BusySpinWaitStrategy {
fn wait_for(
&self,
sequence: i64,
_cursor: &Arc<dyn Sequencer>,
dependent: &Arc<dyn Sequencer>,
barrier: &ProcessingSequenceBarrier,
) -> Result<i64, AlertException> {
let mut available_sequence;
loop {
if barrier.is_alerted() {
return Err(AlertException);
}
available_sequence = dependent.get_cursor();
if available_sequence >= sequence {
return Ok(available_sequence);
}
std::hint::spin_loop();
}
}
fn signal_all_when_blocking(&self) {}
}
pub struct YieldingWaitStrategy;
impl WaitStrategy for YieldingWaitStrategy {
fn wait_for(
&self,
sequence: i64,
_cursor: &Arc<dyn Sequencer>,
dependent: &Arc<dyn Sequencer>,
barrier: &ProcessingSequenceBarrier,
) -> Result<i64, AlertException> {
let mut counter = 100;
let mut available_sequence;
loop {
if barrier.is_alerted() {
return Err(AlertException);
}
available_sequence = dependent.get_cursor();
if available_sequence >= sequence {
return Ok(available_sequence);
}
counter -= 1;
if counter == 0 {
thread::yield_now();
counter = 100;
} else {
std::hint::spin_loop();
}
}
}
fn signal_all_when_blocking(&self) {}
}
pub struct BlockingWaitStrategy {
mutex: std::sync::Mutex<()>,
condvar: std::sync::Condvar,
}
impl Default for BlockingWaitStrategy {
fn default() -> Self {
Self::new()
}
}
impl BlockingWaitStrategy {
pub fn new() -> Self {
Self {
mutex: std::sync::Mutex::new(()),
condvar: std::sync::Condvar::new(),
}
}
}
impl WaitStrategy for BlockingWaitStrategy {
fn wait_for(
&self,
sequence: i64,
_cursor: &Arc<dyn Sequencer>,
dependent: &Arc<dyn Sequencer>,
barrier: &ProcessingSequenceBarrier,
) -> Result<i64, AlertException> {
let mut available_sequence = dependent.get_cursor();
if available_sequence < sequence {
let mut guard = self.mutex.lock().unwrap();
while dependent.get_cursor() < sequence {
if barrier.is_alerted() {
return Err(AlertException);
}
guard = self.condvar.wait(guard).unwrap();
}
available_sequence = dependent.get_cursor();
}
while available_sequence < sequence {
if barrier.is_alerted() {
return Err(AlertException);
}
available_sequence = dependent.get_cursor();
thread::yield_now();
}
Ok(available_sequence)
}
fn signal_all_when_blocking(&self) {
let _guard = self.mutex.lock().unwrap();
self.condvar.notify_all();
}
}
pub struct ProcessingSequenceBarrier {
wait_strategy: Arc<dyn WaitStrategy>,
dependent_sequencer: Arc<dyn Sequencer>,
cursor_sequencer: Arc<dyn Sequencer>,
alerted: AtomicBool,
}
impl ProcessingSequenceBarrier {
pub fn new(
wait_strategy: Arc<dyn WaitStrategy>,
dependent_sequencer: Arc<dyn Sequencer>,
cursor_sequencer: Arc<dyn Sequencer>,
) -> Self {
Self {
wait_strategy,
dependent_sequencer,
cursor_sequencer,
alerted: AtomicBool::new(false),
}
}
pub fn wait_for(&self, sequence: i64) -> Result<i64, AlertException> {
let available = self.wait_strategy.wait_for(
sequence,
&self.cursor_sequencer,
&self.dependent_sequencer,
self,
)?;
Ok(self
.cursor_sequencer
.get_highest_published_sequence(sequence, available))
}
pub fn is_alerted(&self) -> bool {
self.alerted.load(Ordering::Acquire)
}
pub fn alert(&self) {
self.alerted.store(true, Ordering::Release);
self.wait_strategy.signal_all_when_blocking();
}
pub fn clear_alert(&self) {
self.alerted.store(false, Ordering::Release);
}
}
pub trait EventHandler<T>: Send + Sync {
fn on_event(&self, event: &T, sequence: u64, end_of_batch: bool);
}
pub struct RingBuffer<T> {
buffer: Box<[UnsafeCell<T>]>,
mask: usize,
sequencer: Arc<dyn Sequencer>,
}
unsafe impl<T: Send> Send for RingBuffer<T> {}
unsafe impl<T: Send + Sync> Sync for RingBuffer<T> {}
impl<T> RingBuffer<T> {
pub fn new<F>(factory: F, size: usize, sequencer: Arc<dyn Sequencer>) -> Self
where
F: Fn() -> T,
{
let capacity = size.next_power_of_two();
let mut buffer = Vec::with_capacity(capacity);
for _ in 0..capacity {
buffer.push(UnsafeCell::new(factory()));
}
Self {
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
sequencer,
}
}
pub fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
self.sequencer.add_gating_sequences(sequences);
}
pub unsafe fn get(&self, sequence: i64) -> &T {
unsafe { &*self.buffer[(sequence as usize) & self.mask].get() }
}
pub fn get_mut(&mut self, sequence: i64) -> &mut T {
unsafe { &mut *self.buffer[(sequence as usize) & self.mask].get() }
}
pub unsafe fn get_unchecked(&self, sequence: i64) -> &T {
unsafe {
&*self
.buffer
.get_unchecked((sequence as usize) & self.mask)
.get()
}
}
#[allow(clippy::mut_from_ref)]
pub unsafe fn get_unchecked_mut(&self, sequence: i64) -> &mut T {
unsafe {
&mut *self
.buffer
.get_unchecked((sequence as usize) & self.mask)
.get()
}
}
pub fn next(&self) -> i64 {
self.sequencer.next()
}
pub fn publish(&self, sequence: i64) {
self.sequencer.publish(sequence);
}
}
pub struct Producer<T> {
ring_buffer: Arc<RingBuffer<T>>,
barriers: Vec<Arc<ProcessingSequenceBarrier>>,
join_handles: Vec<Option<thread::JoinHandle<()>>>,
}
impl<T> Producer<T> {
pub fn publish<F>(&mut self, update: F)
where
F: FnOnce(&mut T),
{
let sequence = self.ring_buffer.next();
let event = unsafe { self.ring_buffer.get_unchecked_mut(sequence) };
update(event);
self.ring_buffer.publish(sequence);
}
}
impl<T> Drop for Producer<T> {
fn drop(&mut self) {
for barrier in &self.barriers {
barrier.alert();
}
for handle in &mut self.join_handles {
if let Some(h) = handle.take() {
let _ = h.join();
}
}
}
}
pub struct BatchEventProcessor<T> {
ring_buffer: Arc<RingBuffer<T>>,
sequence: Arc<Sequence>,
barrier: Arc<ProcessingSequenceBarrier>,
handler: Arc<dyn EventHandler<T>>,
}
impl<T> BatchEventProcessor<T> {
pub fn new(
ring_buffer: Arc<RingBuffer<T>>,
barrier: Arc<ProcessingSequenceBarrier>,
handler: Arc<dyn EventHandler<T>>,
) -> Self {
Self {
ring_buffer,
sequence: Arc::new(Sequence::new(-1)),
barrier,
handler,
}
}
pub fn get_sequence(&self) -> Arc<Sequence> {
self.sequence.clone()
}
pub fn run(&self) {
let mut next_sequence = self.sequence.get() + 1;
loop {
match self.barrier.wait_for(next_sequence) {
Ok(available_sequence) => {
while next_sequence <= available_sequence {
let event = unsafe { self.ring_buffer.get_unchecked(next_sequence) };
self.handler.on_event(
event,
next_sequence as u64,
next_sequence == available_sequence,
);
next_sequence += 1;
}
self.sequence.set(available_sequence);
}
Err(_) => {
if self.barrier.is_alerted() {
break;
}
}
}
}
}
}
pub struct Disruptor<T> {
ring_buffer: Arc<RingBuffer<T>>,
processors: Vec<Arc<BatchEventProcessor<T>>>,
started: bool,
wait_strategy: Arc<dyn WaitStrategy>,
}
pub enum ProducerType {
Single,
Multi,
}
pub struct DisruptorBuilder<T, F> {
factory: F,
buffer_size: usize,
wait_strategy: Arc<dyn WaitStrategy>,
producer_type: ProducerType,
marker: marker<T>,
}
impl<T, F> DisruptorBuilder<T, F>
where
F: Fn() -> T,
{
pub fn new(factory: F) -> Self {
Self {
factory,
buffer_size: 1024,
wait_strategy: Arc::new(BusySpinWaitStrategy),
producer_type: ProducerType::Single,
marker: marker::<T>,
}
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
pub fn wait_strategy<W: WaitStrategy + 'static>(mut self, strategy: W) -> Self {
self.wait_strategy = Arc::new(strategy);
self
}
pub fn single_producer(mut self) -> Self {
self.producer_type = ProducerType::Single;
self
}
pub fn multi_producer(mut self) -> Self {
self.producer_type = ProducerType::Multi;
self
}
pub fn build(self) -> Disruptor<T> {
let sequencer: Arc<dyn Sequencer> = match self.producer_type {
ProducerType::Single => Arc::new(SingleProducerSequencer::new(
self.buffer_size,
self.wait_strategy.clone(),
)),
ProducerType::Multi => Arc::new(MultiProducerSequencer::new(
self.buffer_size,
self.wait_strategy.clone(),
)),
};
let ring_buffer = Arc::new(RingBuffer::new(self.factory, self.buffer_size, sequencer));
Disruptor {
ring_buffer,
processors: Vec::new(),
started: false,
wait_strategy: self.wait_strategy,
}
}
}
impl<T: Send + Sync + 'static> Disruptor<T> {
pub fn builder<F>(factory: F) -> DisruptorBuilder<T, F>
where
F: Fn() -> T,
{
DisruptorBuilder::new(factory)
}
pub fn handle_events_with<H: EventHandler<T> + 'static>(&mut self, handler: H) -> &mut Self {
let barrier = Arc::new(ProcessingSequenceBarrier::new(
self.wait_strategy.clone(),
self.ring_buffer.sequencer.clone(), self.ring_buffer.sequencer.clone(),
));
let processor = Arc::new(BatchEventProcessor::new(
self.ring_buffer.clone(),
barrier,
Arc::new(handler),
));
self.processors.push(processor);
self
}
pub fn start(mut self) -> Producer<T> {
let mut gating_sequences = Vec::new();
let mut barriers = Vec::new();
let mut join_handles = Vec::new();
for processor in &self.processors {
gating_sequences.push(processor.get_sequence());
barriers.push(processor.barrier.clone());
let p = processor.clone();
join_handles.push(Some(thread::spawn(move || {
p.run();
})));
}
self.ring_buffer.add_gating_sequences(gating_sequences);
self.started = true;
Producer {
ring_buffer: self.ring_buffer,
barriers,
join_handles,
}
}
}