1use tokio::sync::Mutex;
15use zenoh_collections::RingBuffer;
16use zenoh_core::zasynclock;
17
18use crate::Condition;
19
/// A FIFO queue made of a [`RingBuffer`] stored behind an async [`Mutex`],
/// together with two [`Condition`]s that the push/pull methods use to signal
/// each other.
pub struct FifoQueue<T> {
    // Notified after a successful push; awaited by `pull` when nothing could be pulled.
    not_empty: Condition,
    // Notified after a successful pull; awaited by `push` when the buffer reports full.
    not_full: Condition,
    // Element storage; every access goes through the mutex.
    buffer: Mutex<RingBuffer<T>>,
}
25
26impl<T> FifoQueue<T> {
27 pub fn new(capacity: usize) -> FifoQueue<T> {
28 FifoQueue {
29 not_empty: Condition::new(),
30 not_full: Condition::new(),
31 buffer: Mutex::new(RingBuffer::new(capacity)),
32 }
33 }
34
35 pub fn try_push(&self, x: T) -> Option<T> {
36 if let Ok(mut guard) = self.buffer.try_lock() {
37 let res = guard.push(x);
38 if res.is_none() {
39 drop(guard);
40 self.not_empty.notify_one();
41 }
42 return res;
43 }
44 Some(x)
45 }
46
47 pub async fn push(&self, x: T) {
48 loop {
49 let mut guard = zasynclock!(self.buffer);
50 if !guard.is_full() {
51 guard.push(x);
52 drop(guard);
53 self.not_empty.notify_one();
54 return;
55 }
56 self.not_full.wait(guard).await;
57 }
58 }
59
60 pub fn try_pull(&self) -> Option<T> {
61 if let Ok(mut guard) = self.buffer.try_lock() {
62 if let Some(e) = guard.pull() {
63 drop(guard);
64 self.not_full.notify_one();
65 return Some(e);
66 }
67 }
68 None
69 }
70
71 pub async fn pull(&self) -> T {
72 loop {
73 let mut guard = zasynclock!(self.buffer);
74 if let Some(e) = guard.pull() {
75 drop(guard);
76 self.not_full.notify_one();
77 return e;
78 }
79 self.not_empty.wait(guard).await;
80 }
81 }
82}