use spsc_ringbuf_core::shared_pool::*;
const POOL_DEPTH: usize = 16;
pub struct Message {
id: u32,
payload: PoolIndex<POOL_DEPTH>,
}
impl HasPoolIdx<POOL_DEPTH> for Message {
fn get_pool_idx(&self) -> PoolIndex<POOL_DEPTH> {
self.payload
}
fn set_pool_idx(&mut self, pindex: PoolIndex<POOL_DEPTH>) {
self.payload = pindex
}
}
pub struct Payload {
value: u32,
}
#[test]
fn test_errors() {
let shared_pool: SharedPool<Payload, Message, POOL_DEPTH, 32> = SharedPool::new();
let (mut producer, mut consumer) = shared_pool.split().unwrap();
let message = producer.stage().unwrap();
message.id = 41;
assert!(producer.commit().is_ok());
let (recvd, payload) = consumer.peek_with_payload();
let recvd = recvd.unwrap();
assert!(!recvd.get_pool_idx().is_valid());
assert!(payload.is_none());
consumer.pop().unwrap();
for i in 0..16 {
let (_, payload) = producer.stage_with_payload().unwrap();
let inner = payload.try_write().unwrap();
inner.value = i;
payload.write_done().unwrap();
producer.commit().unwrap();
}
assert!(producer.stage_with_payload().is_err());
let (recvd, payload) = consumer.peek_with_payload();
let recvd = recvd.unwrap();
let payload = payload.unwrap();
let pool_idx = recvd.get_pool_idx();
payload.read_done().unwrap();
consumer.pop().unwrap();
assert!(producer.stage_with_payload().is_err());
assert!(producer.stage().is_some());
assert!(consumer.return_payload(pool_idx).is_ok());
let new_stage = producer.stage_with_payload();
match new_stage {
Ok((msg, _)) => assert!(msg.get_pool_idx().is_valid()),
_ => panic!("new stage should have valid payload!")
}
}
static SHARED_POOL: SharedPool<Payload, Message, POOL_DEPTH, 16> = SharedPool::new();
#[test]
fn test_threads() {
use std::thread;
use std::time::Duration;
let (mut producer, mut consumer) = SHARED_POOL.split().unwrap();
let total_transfer = 277;
let c_handle = thread::spawn(move || {
let mut exit = false;
let mut expected_value = 0;
while !exit {
if consumer.peek().is_some() {
let (recvd, payload) = consumer.peek_with_payload();
let recvd = recvd.unwrap();
let payload = payload.unwrap();
let pool_idx = recvd.get_pool_idx();
assert!(payload.try_read().unwrap().value == expected_value);
println!("consume {}", expected_value);
expected_value += 1;
if payload.try_read().unwrap().value == total_transfer - 1 {
exit = true;
}
payload.read_done().unwrap();
consumer.pop().unwrap();
assert!(consumer.return_payload(pool_idx).is_ok());
}
thread::sleep(Duration::from_millis(10));
}
});
let p_handle = thread::spawn(move || {
let mut exit = false;
let mut count = 0;
while !exit {
thread::sleep(Duration::from_millis(1));
if let Ok((_, payload)) = producer.stage_with_payload(){
let inner = payload.try_write().unwrap();
inner.value = count;
println!("produce {}", count);
count += 1;
payload.write_done().unwrap();
producer.commit().unwrap();
if count == total_transfer {
exit = true;
}
}
}
});
assert!(SHARED_POOL.num_free() == POOL_DEPTH as u32);
p_handle.join().unwrap();
c_handle.join().unwrap();
}