zenoh_sync/
lifo_queue.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::sync::{Condvar, Mutex};
15
16use zenoh_collections::StackBuffer;
17use zenoh_core::zlock;
18
19pub struct LifoQueue<T> {
20    not_empty: Condvar,
21    not_full: Condvar,
22    buffer: Mutex<StackBuffer<T>>,
23}
24
25impl<T> LifoQueue<T> {
26    pub fn new(capacity: usize) -> LifoQueue<T> {
27        LifoQueue {
28            not_empty: Condvar::new(),
29            not_full: Condvar::new(),
30            buffer: Mutex::new(StackBuffer::new(capacity)),
31        }
32    }
33
34    pub fn try_push(&self, x: T) -> Option<T> {
35        if let Ok(mut guard) = self.buffer.try_lock() {
36            let res = guard.push(x);
37            if res.is_none() {
38                drop(guard);
39                self.not_empty.notify_one();
40            }
41            return res;
42        }
43        Some(x)
44    }
45
46    pub fn push(&self, x: T) {
47        let mut guard = zlock!(self.buffer);
48        loop {
49            if !guard.is_full() {
50                guard.push(x);
51                drop(guard);
52                self.not_empty.notify_one();
53                return;
54            }
55            guard = self.not_full.wait(guard).unwrap();
56        }
57    }
58
59    pub fn try_pull(&self) -> Option<T> {
60        if let Ok(mut guard) = self.buffer.try_lock() {
61            if let Some(e) = guard.pop() {
62                drop(guard);
63                self.not_full.notify_one();
64                return Some(e);
65            }
66        }
67        None
68    }
69
70    pub fn pull(&self) -> T {
71        let mut guard = zlock!(self.buffer);
72        loop {
73            if let Some(e) = guard.pop() {
74                drop(guard);
75                self.not_full.notify_one();
76                return e;
77            }
78            guard = self.not_empty.wait(guard).unwrap();
79        }
80    }
81}