1use std::sync::{Condvar, Mutex};
15
16use zenoh_collections::StackBuffer;
17use zenoh_core::zlock;
18
/// A bounded queue with blocking and non-blocking insertion and removal,
/// synchronised through a mutex and two condition variables.
///
/// Elements are kept in a [`StackBuffer`]; retrieval order is whatever that
/// buffer's `pop` yields (presumably last-in, first-out, as the type name
/// suggests — the buffer itself is defined outside this file).
pub struct LifoQueue<T> {
    /// Notified after an element has been stored; the blocking `pull` waits on it.
    not_empty: Condvar,
    /// Notified after an element has been removed; the blocking `push` waits on it.
    not_full: Condvar,
    /// Element storage, created with the capacity passed to `new` and guarded by a mutex.
    buffer: Mutex<StackBuffer<T>>,
}
24
25impl<T> LifoQueue<T> {
26 pub fn new(capacity: usize) -> LifoQueue<T> {
27 LifoQueue {
28 not_empty: Condvar::new(),
29 not_full: Condvar::new(),
30 buffer: Mutex::new(StackBuffer::new(capacity)),
31 }
32 }
33
34 pub fn try_push(&self, x: T) -> Option<T> {
35 if let Ok(mut guard) = self.buffer.try_lock() {
36 let res = guard.push(x);
37 if res.is_none() {
38 drop(guard);
39 self.not_empty.notify_one();
40 }
41 return res;
42 }
43 Some(x)
44 }
45
46 pub fn push(&self, x: T) {
47 let mut guard = zlock!(self.buffer);
48 loop {
49 if !guard.is_full() {
50 guard.push(x);
51 drop(guard);
52 self.not_empty.notify_one();
53 return;
54 }
55 guard = self.not_full.wait(guard).unwrap();
56 }
57 }
58
59 pub fn try_pull(&self) -> Option<T> {
60 if let Ok(mut guard) = self.buffer.try_lock() {
61 if let Some(e) = guard.pop() {
62 drop(guard);
63 self.not_full.notify_one();
64 return Some(e);
65 }
66 }
67 None
68 }
69
70 pub fn pull(&self) -> T {
71 let mut guard = zlock!(self.buffer);
72 loop {
73 if let Some(e) = guard.pop() {
74 drop(guard);
75 self.not_full.notify_one();
76 return e;
77 }
78 guard = self.not_empty.wait(guard).unwrap();
79 }
80 }
81}