disrustor 0.3.0

This project is a port of the LMAX Disruptor to Rust
Documentation
use crate::{consumer::*, executor::*, prelude::*, producer::*, ringbuffer::*, wait::*};
use std::{marker::PhantomData, sync::Arc};

#[derive(Debug)]
pub struct DisrustorBuilder {}

pub struct WithDataProvider<D: DataProvider<T>, T>
where
    T: Send + Sync,
{
    data_provider: Arc<D>,
    _element: PhantomData<T>,
}

pub struct WithWaitStrategy<W: WaitStrategy, D: DataProvider<T>, T>
where
    T: Send + Sync,
{
    with_data_provider: WithDataProvider<D, T>,
    _wait_strategy: PhantomData<W>,
}

pub struct WithSequencer<S: Sequencer, W: WaitStrategy, D: DataProvider<T>, T>
where
    T: Send + Sync,
{
    with_wait_strategy: WithWaitStrategy<W, D, T>,
    sequencer: S,
}

pub struct BarrierScope<'a, S: Sequencer, D: DataProvider<T>, T> {
    sequencer: S,
    data_provider: Arc<D>,
    gating_sequences: Vec<Arc<AtomicSequence>>,
    cursors: Vec<Arc<AtomicSequence>>,
    event_handlers: Vec<Box<dyn Runnable + 'a>>,
    _element: PhantomData<T>,
}

pub struct WithEventHandlers<'a, S: Sequencer, W: WaitStrategy, D: DataProvider<T>, T>
where
    T: Send + Sync,
{
    with_sequencer: WithSequencer<S, W, D, T>,
    event_handlers: Vec<Box<dyn Runnable + 'a>>,
    gating_sequences: Vec<Arc<AtomicSequence>>,
}

impl DisrustorBuilder {
    #[allow(clippy::new_ret_no_self)]
    pub fn new<D: DataProvider<T>, T>(data_provider: Arc<D>) -> WithDataProvider<D, T>
    where
        T: Send + Sync,
    {
        WithDataProvider {
            data_provider,
            _element: Default::default(),
        }
    }

    pub fn with_ring_buffer<T: Default>(capacity: usize) -> WithDataProvider<RingBuffer<T>, T>
    where
        T: Send + Sync,
    {
        Self::new(Arc::new(RingBuffer::new(capacity)))
    }
}

impl<D: DataProvider<T>, T> WithDataProvider<D, T>
where
    T: Send + Sync,
{
    pub fn with_wait_strategy<W: WaitStrategy>(self) -> WithWaitStrategy<W, D, T> {
        WithWaitStrategy {
            with_data_provider: self,
            _wait_strategy: Default::default(),
        }
    }

    pub fn with_blocking_wait(self) -> WithWaitStrategy<BlockingWaitStrategy, D, T> {
        self.with_wait_strategy()
    }

    pub fn with_spin_wait(self) -> WithWaitStrategy<SpinLoopWaitStrategy, D, T> {
        self.with_wait_strategy()
    }
}

impl<W: WaitStrategy, D: DataProvider<T>, T> WithWaitStrategy<W, D, T>
where
    T: Send + Sync,
{
    pub fn with_sequencer<S: Sequencer>(self, sequencer: S) -> WithSequencer<S, W, D, T> {
        WithSequencer {
            with_wait_strategy: self,
            sequencer,
        }
    }

    pub fn with_single_producer(self) -> WithSequencer<SingleProducerSequencer<W>, W, D, T> {
        let buffer_size = self.with_data_provider.data_provider.buffer_size();
        self.with_sequencer(SingleProducerSequencer::new(buffer_size, W::new()))
    }

    pub fn with_multi_producer(self) -> WithSequencer<MultiProducerSequencer<W>, W, D, T> {
        let buffer_size = self.with_data_provider.data_provider.buffer_size();
        self.with_sequencer(MultiProducerSequencer::new(buffer_size, W::new()))
    }
}

impl<'a, S: Sequencer + 'a, W: WaitStrategy, D: DataProvider<T> + 'a, T: Send + Sync + 'a>
    WithSequencer<S, W, D, T>
{
    pub fn with_barrier(
        mut self,
        f: impl FnOnce(&mut BarrierScope<'a, S, D, T>),
    ) -> WithEventHandlers<'a, S, W, D, T> {
        let cursor = self.sequencer.get_cursor();
        let mut scope = BarrierScope {
            sequencer: self.sequencer,
            data_provider: self
                .with_wait_strategy
                .with_data_provider
                .data_provider
                .clone(),
            gating_sequences: vec![cursor],
            event_handlers: Vec::new(),
            cursors: Vec::new(),
            _element: Default::default(),
        };

        f(&mut scope);
        self.sequencer = scope.sequencer;

        WithEventHandlers {
            with_sequencer: self,
            event_handlers: scope.event_handlers,
            gating_sequences: scope.cursors,
        }
    }
}

impl<'a, S: Sequencer + 'a, D: DataProvider<T> + 'a, T: Send + 'a> BarrierScope<'a, S, D, T> {
    pub fn handle_events<F>(&mut self, handler: F)
    where
        F: Fn(&T, Sequence, bool) + Send + 'static,
    {
        self.handle_events_with(BatchEventProcessor::create(handler))
    }

    pub fn handle_events_mut<F>(&mut self, handler: F)
    where
        F: Fn(&mut T, Sequence, bool) + Send + 'static,
    {
        self.handle_events_with(BatchEventProcessor::create_mut(handler))
    }

    pub fn handle_events_with<E: EventProcessorMut<'a, T>>(&mut self, processor: E) {
        self.cursors.push(processor.get_cursor());
        let barrier = self.sequencer.create_barrier(&self.gating_sequences);

        let runnable = processor.prepare(barrier, self.data_provider.clone());
        self.event_handlers.push(runnable);
    }

    pub fn with_barrier(mut self, f: impl FnOnce(&mut BarrierScope<'a, S, D, T>)) {
        let mut scope = BarrierScope {
            sequencer: self.sequencer,
            data_provider: self.data_provider.clone(),
            gating_sequences: self.cursors,
            event_handlers: Vec::new(),
            cursors: Vec::new(),
            _element: Default::default(),
        };

        f(&mut scope);
        self.event_handlers.append(&mut scope.event_handlers);
    }
}

impl<'a, S: Sequencer + 'a, W: WaitStrategy, D: DataProvider<T> + 'a, T: Send + Sync + 'a>
    WithEventHandlers<'a, S, W, D, T>
{
    pub fn with_barrier(mut self, f: impl FnOnce(&mut BarrierScope<'a, S, D, T>)) -> Self {
        let mut scope = BarrierScope {
            gating_sequences: self.gating_sequences.clone(),
            cursors: Vec::new(),
            sequencer: self.with_sequencer.sequencer,
            data_provider: self
                .with_sequencer
                .with_wait_strategy
                .with_data_provider
                .data_provider
                .clone(),
            event_handlers: Vec::new(),
            _element: Default::default(),
        };

        f(&mut scope);
        self.with_sequencer.sequencer = scope.sequencer;
        self.event_handlers.append(&mut scope.event_handlers);
        self.gating_sequences = scope.cursors;

        self
    }

    pub fn build(
        self,
    ) -> (
        impl EventProcessorExecutor<'a>,
        impl EventProducer<'a, Item = T>,
    ) {
        self.build_with_executor::<ThreadedExecutor<'a>>()
    }

    pub fn build_with_executor<E: EventProcessorExecutor<'a>>(
        mut self,
    ) -> (E, impl EventProducer<'a, Item = T>) {
        for gs in &self.gating_sequences {
            self.with_sequencer.sequencer.add_gating_sequence(gs);
        }
        let executor = E::with_runnables(self.event_handlers);
        let producer = Producer::new(
            self.with_sequencer
                .with_wait_strategy
                .with_data_provider
                .data_provider
                .clone(),
            self.with_sequencer.sequencer,
        );
        (executor, producer)
    }
}