1mod barrier;
2mod builder;
3mod executor;
4pub mod processor;
5pub mod producer;
6pub mod ringbuffer;
7pub mod sequence;
8pub mod sequencer;
9pub mod traits;
10pub mod utils;
11pub mod waiting;
12
13pub use barrier::*;
14pub use builder::*;
15pub use traits::*;
16
17#[cfg(test)]
18mod tests {
19 use ringbuffer::RingBuffer;
20 use sequence::Sequence;
21 use std::sync::Arc;
22 use traits::{EventHandler, EventProcessorExecutor, EventProducer, ExecutorHandle};
23
24 use super::*;
25
26 struct Checker;
27 impl EventHandler<i64> for Checker {
28 fn on_event(&self, data: &i64, sequence: Sequence, _: bool) {
29 if *data != sequence {
30 dbg!(*data);
31 dbg!(sequence);
32 panic!();
33 }
34 }
35
36 fn on_start(&self) {}
37
38 fn on_shutdown(&self) {}
39 }
40
41 impl EventHandlerMut<i64> for Checker {
42 fn on_event(&mut self, data: &i64, sequence: Sequence, _: bool) {
43 if *data != sequence {
44 dbg!(*data);
45 dbg!(sequence);
46 panic!();
47 }
48 }
49
50 fn on_start(&mut self) {}
51
52 fn on_shutdown(&mut self) {}
53 }
54
55 #[test]
56 fn test_dsl() {
57 let data_provider = Arc::new(RingBuffer::new(4096));
58 let (executor, mut producer) = builder::DisruptorBuilder::new(data_provider)
59 .with_busy_spin_waiting_strategy()
60 .with_single_producer_sequencer()
61 .with_barrier(|b| {
62 b.handle_events(Checker {});
63 })
64 .build();
65
66 let handle = executor.spawn();
67 for _ in 0..10_000 {
68 let buffer: Vec<_> = std::iter::repeat(1).take(1000).collect();
69 producer.write(buffer, |slot, seq, _| {
70 *slot = seq;
71 });
72 }
73 println!("Draining");
74 producer.drain();
75 handle.join();
76 }
77
78 #[test]
79 fn test_dsl_mut() {
80 let data_provider = Arc::new(RingBuffer::new(4096));
81 let (executor, mut producer) = builder::DisruptorBuilder::new(data_provider)
82 .with_busy_spin_waiting_strategy()
83 .with_single_producer_sequencer()
84 .with_barrier(|b| {
85 b.handle_events_mut(Checker {});
86 })
87 .build();
88
89 let handle = executor.spawn();
90 for _ in 0..10_000 {
91 let buffer: Vec<_> = std::iter::repeat(1).take(1000).collect();
92 producer.write(buffer, |slot, seq, _| {
93 *slot = seq;
94 });
95 }
96 println!("Draining");
97 producer.drain();
98 handle.join();
99 }
100}