zenoh_sync/
fifo_queue.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use tokio::sync::Mutex;
15use zenoh_collections::RingBuffer;
16use zenoh_core::zasynclock;
17
18use crate::Condition;
19
20pub struct FifoQueue<T> {
21    not_empty: Condition,
22    not_full: Condition,
23    buffer: Mutex<RingBuffer<T>>,
24}
25
26impl<T> FifoQueue<T> {
27    pub fn new(capacity: usize) -> FifoQueue<T> {
28        FifoQueue {
29            not_empty: Condition::new(),
30            not_full: Condition::new(),
31            buffer: Mutex::new(RingBuffer::new(capacity)),
32        }
33    }
34
35    pub fn try_push(&self, x: T) -> Option<T> {
36        if let Ok(mut guard) = self.buffer.try_lock() {
37            let res = guard.push(x);
38            if res.is_none() {
39                drop(guard);
40                self.not_empty.notify_one();
41            }
42            return res;
43        }
44        Some(x)
45    }
46
47    pub async fn push(&self, x: T) {
48        loop {
49            let mut guard = zasynclock!(self.buffer);
50            if !guard.is_full() {
51                guard.push(x);
52                drop(guard);
53                self.not_empty.notify_one();
54                return;
55            }
56            self.not_full.wait(guard).await;
57        }
58    }
59
60    pub fn try_pull(&self) -> Option<T> {
61        if let Ok(mut guard) = self.buffer.try_lock() {
62            if let Some(e) = guard.pull() {
63                drop(guard);
64                self.not_full.notify_one();
65                return Some(e);
66            }
67        }
68        None
69    }
70
71    pub async fn pull(&self) -> T {
72        loop {
73            let mut guard = zasynclock!(self.buffer);
74            if let Some(e) = guard.pull() {
75                drop(guard);
76                self.not_full.notify_one();
77                return e;
78            }
79            self.not_empty.wait(guard).await;
80        }
81    }
82}