1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use crate::Condition;
use tokio::sync::Mutex;
use zenoh_collections::RingBuffer;
use zenoh_core::zasynclock;

pub struct FifoQueue<T> {
    not_empty: Condition,
    not_full: Condition,
    buffer: Mutex<RingBuffer<T>>,
}

impl<T> FifoQueue<T> {
    pub fn new(capacity: usize) -> FifoQueue<T> {
        FifoQueue {
            not_empty: Condition::new(),
            not_full: Condition::new(),
            buffer: Mutex::new(RingBuffer::new(capacity)),
        }
    }

    pub fn try_push(&self, x: T) -> Option<T> {
        if let Ok(mut guard) = self.buffer.try_lock() {
            let res = guard.push(x);
            if res.is_none() {
                drop(guard);
                self.not_empty.notify_one();
            }
            return res;
        }
        Some(x)
    }

    pub async fn push(&self, x: T) {
        loop {
            let mut guard = zasynclock!(self.buffer);
            if !guard.is_full() {
                guard.push(x);
                drop(guard);
                self.not_empty.notify_one();
                return;
            }
            self.not_full.wait(guard).await;
        }
    }

    pub fn try_pull(&self) -> Option<T> {
        if let Ok(mut guard) = self.buffer.try_lock() {
            if let Some(e) = guard.pull() {
                drop(guard);
                self.not_full.notify_one();
                return Some(e);
            }
        }
        None
    }

    pub async fn pull(&self) -> T {
        loop {
            let mut guard = zasynclock!(self.buffer);
            if let Some(e) = guard.pull() {
                drop(guard);
                self.not_full.notify_one();
                return e;
            }
            self.not_empty.wait(guard).await;
        }
    }
}