1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
//! Crate root: declares the implementation modules and re-exports the public API.
//!
//! Everything exported by the `dsl` and `prelude` modules is available at the
//! top level of the crate; the remaining building blocks are grouped under
//! [`internal`].

mod barrier;
mod consumer;
mod dsl;
mod executor;
mod prelude;
mod producer;
mod ringbuffer;
mod utils;
mod wait;

pub use dsl::*;
pub use prelude::*;

/// Re-exports everything from the `barrier`, `consumer`, `executor`,
/// `producer`, `ringbuffer` and `wait` modules under a single namespace.
pub mod internal {
    pub use super::barrier::*;
    pub use super::consumer::*;
    pub use super::executor::*;
    pub use super::producer::*;
    pub use super::ringbuffer::*;
    pub use super::wait::*;
}

#[cfg(test)]
mod test {
    use super::internal::*;
    use super::*;
    use std::sync::Arc;

    /// Wires a ring buffer, single-producer sequencer, barrier, batch consumer
    /// and threaded executor together by hand, using the blocking wait strategy.
    ///
    /// The producer stores each slot's sequence number into the slot; the
    /// consumer asserts that every slot it sees holds its own sequence number.
    #[test]
    fn test_blocking_wait_strategy() {
        let ring_buffer: Arc<RingBuffer<i64>> = Arc::new(RingBuffer::new(4096));
        let wait_strategy = BlockingWaitStrategy::new();
        let mut sequencer =
            SingleProducerSequencer::new(ring_buffer.buffer_size(), wait_strategy);

        // The consumer's barrier is built from the sequencer's own cursor.
        let barrier = sequencer.create_barrier(vec![sequencer.get_cursor()]);

        let consumer = BatchEventProcessor::create(|data, sequence, _| {
            assert_eq!(
                *data, sequence,
                "slot value does not match its sequence number"
            );
        });

        // Register the consumer's cursor as a gating sequence on the sequencer.
        sequencer.add_gating_sequence(consumer.get_cursor());

        // One clone for the producer; the original handle moves into the consumer.
        let producer = Producer::new(ring_buffer.clone(), sequencer);
        let executor =
            ThreadedExecutor::with_runnables(vec![consumer.prepare(barrier, ring_buffer)]);

        let handle = executor.spawn();

        // 10_000 batches of 1_000 items; the item values are ignored and each
        // slot is overwritten with its sequence number.
        for _ in 0..10_000 {
            let buffer = vec![1; 1000];
            producer.write(buffer, |slot, seq, _| {
                *slot = seq;
            });
        }

        producer.drain();
        handle.join();
    }

    /// Same scenario as `test_blocking_wait_strategy`, but the pipeline is
    /// assembled through `dsl::DisrustorBuilder` instead of by hand.
    #[test]
    fn test_dsl() {
        let ring_buffer: Arc<RingBuffer<i64>> = Arc::new(RingBuffer::new(4096));

        // The buffer handle is not needed after building, so it is moved in.
        let (executor, producer) = dsl::DisrustorBuilder::new(ring_buffer)
            .with_blocking_wait()
            .with_single_producer()
            .with_barrier(|b| {
                b.handle_events_mut(|data, sequence, _| {
                    assert_eq!(
                        *data, sequence,
                        "slot value does not match its sequence number"
                    );
                });
            })
            .build();

        let handle = executor.spawn();

        // 10_000 batches of 1_000 items; the item values are ignored and each
        // slot is overwritten with its sequence number.
        for _ in 0..10_000 {
            let buffer = vec![1; 1000];
            producer.write(buffer, |slot, seq, _| {
                *slot = seq;
            });
        }

        producer.drain();
        handle.join();
    }
}