disruptor_rs/
lib.rs

1mod barrier;
2mod builder;
3mod executor;
4pub mod processor;
5pub mod producer;
6pub mod ringbuffer;
7pub mod sequence;
8pub mod sequencer;
9pub mod traits;
10pub mod utils;
11pub mod waiting;
12
13pub use barrier::*;
14pub use builder::*;
15pub use traits::*;
16
17#[cfg(test)]
18mod tests {
19    use ringbuffer::RingBuffer;
20    use sequence::Sequence;
21    use std::sync::Arc;
22    use traits::{EventHandler, EventProcessorExecutor, EventProducer, ExecutorHandle};
23
24    use super::*;
25
26    struct Checker;
27    impl EventHandler<i64> for Checker {
28        fn on_event(&self, data: &i64, sequence: Sequence, _: bool) {
29            if *data != sequence {
30                dbg!(*data);
31                dbg!(sequence);
32                panic!();
33            }
34        }
35
36        fn on_start(&self) {}
37
38        fn on_shutdown(&self) {}
39    }
40
41    impl EventHandlerMut<i64> for Checker {
42        fn on_event(&mut self, data: &i64, sequence: Sequence, _: bool) {
43            if *data != sequence {
44                dbg!(*data);
45                dbg!(sequence);
46                panic!();
47            }
48        }
49
50        fn on_start(&mut self) {}
51
52        fn on_shutdown(&mut self) {}
53    }
54
55    #[test]
56    fn test_dsl() {
57        let data_provider = Arc::new(RingBuffer::new(4096));
58        let (executor, mut producer) = builder::DisruptorBuilder::new(data_provider)
59            .with_busy_spin_waiting_strategy()
60            .with_single_producer_sequencer()
61            .with_barrier(|b| {
62                b.handle_events(Checker {});
63            })
64            .build();
65
66        let handle = executor.spawn();
67        for _ in 0..10_000 {
68            let buffer: Vec<_> = std::iter::repeat(1).take(1000).collect();
69            producer.write(buffer, |slot, seq, _| {
70                *slot = seq;
71            });
72        }
73        println!("Draining");
74        producer.drain();
75        handle.join();
76    }
77
78    #[test]
79    fn test_dsl_mut() {
80        let data_provider = Arc::new(RingBuffer::new(4096));
81        let (executor, mut producer) = builder::DisruptorBuilder::new(data_provider)
82            .with_busy_spin_waiting_strategy()
83            .with_single_producer_sequencer()
84            .with_barrier(|b| {
85                b.handle_events_mut(Checker {});
86            })
87            .build();
88
89        let handle = executor.spawn();
90        for _ in 0..10_000 {
91            let buffer: Vec<_> = std::iter::repeat(1).take(1000).collect();
92            producer.write(buffer, |slot, seq, _| {
93                *slot = seq;
94            });
95        }
96        println!("Draining");
97        producer.drain();
98        handle.join();
99    }
100}