1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
mod barrier;
mod consumer;
mod dsl;
mod executor;
mod prelude;
mod producer;
mod ringbuffer;
mod utils;
mod wait;

pub use dsl::*;
pub use prelude::*;
pub mod internal {
    pub use super::barrier::*;
    pub use super::consumer::*;
    pub use super::executor::*;
    pub use super::producer::*;
    pub use super::ringbuffer::*;
    pub use super::wait::*;
}

#[cfg(test)]
mod test {
    use super::internal::*;
    use super::*;
    use std::sync::Arc;

    #[test]
    fn test_blocking_wait_strategy() {
        let ring_buffer: Arc<RingBuffer<i64>> = Arc::new(RingBuffer::new(4096));
        let wait_strategy = BlockingWaitStrategy::new();
        let mut sequencer = SingleProducerSequencer::new(ring_buffer.buffer_size(), wait_strategy);

        let barrier = sequencer.create_barrier(vec![sequencer.get_cursor()]);
        let consumer = BatchEventProcessor::create(|data, sequence, _| {
            if *data != sequence {
                dbg!(*data);
                dbg!(sequence);
                panic!();
            }
        });

        sequencer.add_gating_sequence(consumer.get_cursor());
        let data_provider = ring_buffer.clone();
        let producer = Producer::new(data_provider.clone(), sequencer);

        let executor =
            ThreadedExecutor::with_runnables(vec![consumer.prepare(barrier, data_provider)]);
        let handle = executor.spawn();

        for _ in 0..10_000 {
            let buffer: Vec<_> = std::iter::repeat(1).take(1000).collect();
            producer.write(buffer, |slot, seq, _| {
                *slot = seq;
            });
        }

        producer.drain();
        handle.join();
    }

    #[test]
    fn test_dsl() {
        let ring_buffer: Arc<RingBuffer<i64>> = Arc::new(RingBuffer::new(4096));
        let (executor, producer) = dsl::DisrustorBuilder::new(ring_buffer.clone())
            .with_blocking_wait()
            .with_single_producer()
            .with_barrier(|b| {
                b.handle_events_mut(|data, sequence, _| {
                    if *data != sequence {
                        dbg!(*data);
                        dbg!(sequence);
                        panic!();
                    }
                });
            })
            .build();

        let handle = executor.spawn();
        for _ in 0..10_000 {
            let buffer: Vec<_> = std::iter::repeat(1).take(1000).collect();
            producer.write(buffer, |slot, seq, _| {
                *slot = seq;
            });
        }
        producer.drain();
        handle.join();
    }
}