1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
use std::sync::{
    atomic::{AtomicI64, Ordering},
    Arc,
};

/// Signed 64-bit position value used by every trait in this module.
pub type Sequence = i64;

/// A [`Sequence`] held in an [`AtomicI64`], readable and writable through `&self`.
///
/// The type is 64-byte aligned, and its two fields add up to exactly 64 bytes
/// (56 bytes of padding plus the 8-byte atomic).
// NOTE(review): presumably sized so that each sequence occupies its own cache
// line and neighbouring sequences do not share one — confirm for the target CPU.
#[repr(align(64))]
pub struct AtomicSequence {
    _pad: [u8; 56],
    offset: AtomicI64,
}

impl AtomicSequence {
    /// Returns the stored value, loaded with [`Ordering::Acquire`].
    pub fn get(&self) -> Sequence {
        self.offset.load(Ordering::Acquire)
    }

    /// Replaces the stored value with `value`, stored with [`Ordering::Release`].
    pub fn set(&self, value: Sequence) {
        self.offset.store(value, Ordering::Release);
    }
}

impl Default for AtomicSequence {
    /// Builds a sequence whose initial value is `-1`.
    // NOTE(review): `-1` presumably means "nothing published yet" — confirm with callers.
    fn default() -> Self {
        Self::from(-1)
    }
}

impl From<Sequence> for AtomicSequence {
    /// Builds a sequence whose initial value is `offset`; the padding is zero-filled.
    fn from(offset: Sequence) -> Self {
        Self {
            _pad: [0; 56],
            offset: AtomicI64::new(offset),
        }
    }
}

/// A shareable (`Send + Sync`) object that can be waited on for a sequence and signalled.
pub trait SequenceBarrier: Send + Sync {
    /// Waits for `sequence` and returns an optional sequence.
    // NOTE(review): the meaning of `None` (alerted / shut down?) is not defined
    // in this file — confirm against the implementations.
    fn wait_for(&self, sequence: Sequence) -> Option<Sequence>;

    /// Signals the barrier.
    // NOTE(review): presumably wakes threads blocked in `wait_for` — confirm.
    fn signal(&self);
}

/// Hands out sequences, publishes them, and creates barriers of type [`Sequencer::Barrier`].
pub trait Sequencer {
    /// Barrier type produced by [`Sequencer::create_barrier`].
    type Barrier: SequenceBarrier;

    /// Requests `count` sequences and returns a pair of sequences.
    // NOTE(review): the pair is presumably the (first, last) claimed range — confirm.
    fn next(&self, count: usize) -> (Sequence, Sequence);

    /// Publishes sequences up to `highest`.
    // NOTE(review): inclusive/exclusive semantics are not visible here — confirm.
    fn publish(&self, highest: Sequence);

    /// Creates a barrier from the given gating sequences.
    fn create_barrier(&mut self, gating_sequences: Vec<Arc<AtomicSequence>>) -> Self::Barrier;

    /// Registers one more gating sequence with this sequencer.
    fn add_gating_sequence(&mut self, gating_sequence: Arc<AtomicSequence>);

    /// Returns a shared handle to this sequencer's cursor.
    fn get_cursor(&self) -> Arc<AtomicSequence>;

    /// Consumes the sequencer.
    // NOTE(review): presumably waits for outstanding work to finish — confirm.
    fn drain(self);
}

/// Pluggable waiting policy; implementations must be `Send + Sync`.
pub trait WaitStrategy: Send + Sync {
    /// Constructs the strategy.
    fn new() -> Self;

    /// Waits for `sequence` given the `dependencies` sequences and a `check_alert`
    /// predicate supplied by the caller, returning an optional sequence.
    // NOTE(review): presumably returns `None` once `check_alert()` reports true —
    // confirm against the implementations.
    fn wait_for<F>(
        &self,
        sequence: Sequence,
        dependencies: &[Arc<AtomicSequence>],
        check_alert: F,
    ) -> Option<Sequence>
    where
        F: Fn() -> bool;

    /// Signals the strategy.
    // NOTE(review): presumably wakes any blocked waiters — confirm.
    fn signal(&self);
}

/// Sequence-addressed storage of `T` values that can be shared across threads.
pub trait DataProvider<T>: Sync + Send {
    /// Returns the size of the underlying buffer.
    fn buffer_size(&self) -> usize;

    /// Returns a mutable reference to the slot addressed by `sequence`.
    ///
    /// # Safety
    ///
    /// The contract is not spelled out in this file.
    // NOTE(review): since `&mut T` is produced from `&self`, the caller presumably
    // must guarantee exclusive access to the slot for the lifetime of the
    // reference — confirm and document.
    // The lint is silenced because handing out `&mut T` from `&self` is the
    // intended shape of this API.
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_mut(&self, sequence: Sequence) -> &mut T;

    /// Returns a shared reference to the slot addressed by `sequence`.
    ///
    /// # Safety
    ///
    /// The contract is not spelled out in this file.
    // NOTE(review): presumably no `get_mut` reference to the same slot may be
    // live at the same time — confirm and document.
    unsafe fn get(&self, sequence: Sequence) -> &T;
}

/// An event processor that can be turned into a [`Runnable`].
pub trait EventProcessorMut<'a, T> {
    /// Consumes the processor together with a barrier and a data provider and
    /// returns a boxed [`Runnable`] bounded by `'a`.
    fn prepare<B, D>(self, barrier: B, data_provider: Arc<D>) -> Box<dyn Runnable + 'a>
    where
        B: SequenceBarrier + 'a,
        D: DataProvider<T> + 'a;

    /// Returns a shared handle to this processor's cursor.
    fn get_cursor(&self) -> Arc<AtomicSequence>;
}

/// Marker refinement of [`EventProcessorMut`]; it adds no methods of its own.
pub trait EventProcessor<'a, T>: EventProcessorMut<'a, T> {}

/// A sendable unit of work that is consumed when it runs.
pub trait Runnable: Send {
    /// Runs the work, taking ownership of the box.
    fn run(self: Box<Self>);
}

/// Builds an executor from a set of runnables and starts it.
pub trait EventProcessorExecutor<'a> {
    /// Handle type returned by [`EventProcessorExecutor::spawn`].
    type Handle: ExecutorHandle;

    /// Creates an executor owning `items`.
    fn with_runnables(items: Vec<Box<dyn Runnable + 'a>>) -> Self;

    /// Consumes the executor and returns its handle.
    // NOTE(review): presumably starts every runnable on its own thread — confirm.
    fn spawn(self) -> Self::Handle;
}

/// Handle to a spawned executor.
pub trait ExecutorHandle {
    /// Consumes the handle.
    // NOTE(review): presumably blocks until all runnables have finished — confirm.
    fn join(self);
}

/// Writes batches of items into slots of type [`EventProducer::Item`].
pub trait EventProducer<'a> {
    /// Slot type handed to the write callback.
    type Item;

    /// Takes an exact-size collection of `items` and a callback `f` that receives
    /// a mutable slot, a sequence, and a reference to one input item.
    // NOTE(review): presumably `f` is invoked once per input item with the
    // sequence claimed for it — confirm against the implementations.
    fn write<F, U, I, E>(&self, items: I, f: F)
    where
        I: IntoIterator<Item = U, IntoIter = E>,
        E: ExactSizeIterator<Item = U>,
        F: Fn(&mut Self::Item, Sequence, &U);

    /// Consumes the producer.
    // NOTE(review): presumably waits for consumers to catch up — confirm.
    fn drain(self);
}