1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
use std::sync::{
    atomic::{AtomicI64, Ordering},
    Arc,
};

pub type Sequence = i64;

#[repr(align(64))]
pub struct AtomicSequence {
    _pad: [u8; 56],
    offset: AtomicI64,
}

impl AtomicSequence {
    pub fn get(&self) -> Sequence {
        self.offset.load(Ordering::Acquire)
    }

    pub fn set(&self, value: Sequence) {
        self.offset.store(value, Ordering::Release);
    }
}

impl Default for AtomicSequence {
    fn default() -> Self {
        AtomicSequence::from(-1)
    }
}

impl From<Sequence> for AtomicSequence {
    fn from(offset: Sequence) -> Self {
        AtomicSequence {
            offset: AtomicI64::new(offset),
            _pad: [0u8; 56],
        }
    }
}

pub trait SequenceBarrier: Send + Sync {
    fn wait_for(&self, sequence: Sequence) -> Option<Sequence>;
    fn signal(&self);
}

pub trait Sequencer {
    type Barrier: SequenceBarrier;

    fn next(&self, count: usize) -> (Sequence, Sequence);
    fn publish(&self, highest: Sequence);
    fn create_barrier(&mut self, gating_sequences: Vec<Arc<AtomicSequence>>) -> Self::Barrier;
    fn add_gating_sequence(&mut self, gating_sequence: Arc<AtomicSequence>);
    fn get_cursor(&self) -> Arc<AtomicSequence>;
    fn drain(self);
}

pub trait WaitStrategy: Send + Sync {
    fn new() -> Self;
    fn wait_for<F: Fn() -> bool>(
        &self,
        sequence: Sequence,
        dependencies: &[Arc<AtomicSequence>],
        check_alert: F,
    ) -> Option<Sequence>;
    fn signal(&self);
}

pub trait DataProvider<T>: Sync + Send {
    fn buffer_size(&self) -> usize;
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_mut(&self, sequence: Sequence) -> &mut T;
    unsafe fn get(&self, sequence: Sequence) -> &T;
}

pub trait EventProcessorMut<'a, T> {
    fn prepare<B: SequenceBarrier + 'a, D: DataProvider<T> + 'a>(
        self,
        barrier: B,
        data_provider: Arc<D>,
    ) -> Box<dyn Runnable + 'a>;
    fn get_cursor(&self) -> Arc<AtomicSequence>;
}

pub trait EventProcessor<'a, T>: EventProcessorMut<'a, T> {}

pub trait Runnable: Send {
    fn run(self: Box<Self>);
}

pub trait EventProcessorExecutor<'a> {
    type Handle: ExecutorHandle;
    fn with_runnables(items: Vec<Box<dyn Runnable + 'a>>) -> Self;
    fn spawn(self) -> Self::Handle;
}

pub trait ExecutorHandle {
    fn join(self);
}

pub trait EventProducer<'a> {
    type Item;

    fn write<F, U, I, E>(&self, items: I, f: F)
    where
        I: IntoIterator<Item = U, IntoIter = E>,
        E: ExactSizeIterator<Item = U>,
        F: Fn(&mut Self::Item, Sequence, &U);

    fn drain(self);
}