// Multi-threaded round-trip check for `mutringbuf::ConcurrentHeapRB`: one thread
// pushes values into the ring buffer, another pops them, and the two sequences
// are compared afterwards.
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::thread;
use std::time::Duration;
use mutringbuf::ConcurrentHeapRB;

/// Number of elements in the vector that backs the ring buffer built in `f`.
const RB_SIZE: usize = 10;

/// Runs one producer/consumer round over a shared ring buffer and checks that
/// the values read back are exactly the values written, in the same order.
///
/// A producer thread pushes a cyclic `0..=u8::MAX` counter while a consumer
/// thread pops concurrently. Both spin until a shared stop flag is raised,
/// about one millisecond after start-up. Whatever is still stored in the
/// buffer once both threads have been joined is appended to the consumed
/// sequence before the comparison.
///
/// # Panics
///
/// Panics if either worker thread panicked, or if the consumed sequence
/// differs from the produced one.
fn f() {
    let ring = ConcurrentHeapRB::from(vec![0; RB_SIZE]);
    let (mut prod, mut cons) = ring.split();

    // Shared shutdown flag: one handle per worker, the original stays here.
    let stop = Arc::new(AtomicBool::new(false));
    let producer_stop = Arc::clone(&stop);
    let consumer_stop = Arc::clone(&stop);

    // Writer side: an endless stream of values, recorded as they are accepted.
    let producer = thread::spawn(move || {
        let mut sent = Vec::new();
        let mut next = 0;

        loop {
            if producer_stop.load(Acquire) {
                break;
            }

            // A rejected push (presumably: buffer full) is simply retried
            // with the same value on the next iteration.
            if prod.push(next).is_err() {
                continue;
            }

            // Remember what actually went in, for the final comparison.
            sent.push(next);

            // Wrap around explicitly instead of overflowing past `u8::MAX`.
            next = if next < u8::MAX { next + 1 } else { 0 };
        }

        // The producer was moved into this closure, so it is handed back to
        // the caller together with the log of pushed values.
        (prod, sent)
    });

    // Reader side: pops whatever is available and records it.
    let consumer = thread::spawn(move || {
        let mut received = Vec::new();

        while !consumer_stop.load(Acquire) {
            // Extending with an `Option` appends the value only when one was popped.
            received.extend(cons.pop());
        }

        // Same as above: the consumer was moved in, so it is returned as well.
        (cons, received)
    });

    // Give both workers a short window to run, then ask them to stop.
    thread::sleep(Duration::from_millis(1));
    stop.store(true, Release);

    // `_prod` stays bound so the producer half lives until the end of the function.
    let (_prod, produced) = producer.join().unwrap();
    let (mut cons, mut consumed) = consumer.join().unwrap();

    // Collect whatever was pushed but not yet popped when the flag was raised.
    if let Some((head, tail)) = cons.peek_available() {
        consumed.extend_from_slice(head);
        consumed.extend_from_slice(tail);
    }

    assert_eq!(produced, consumed);
}

/// Entry point: runs the producer/consumer round in `f` 1000 times in a row.
///
/// Each round is independent; the process panics on the first round whose
/// assertion fails.
fn main() {
    (0..1000).for_each(|_| f());
}