cog_task/comm/
queue.rs

1use eyre::{eyre, Result};
2use std::collections::VecDeque;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{mpsc, Arc, Mutex};
5
/// Number of slots pre-allocated for the backing `VecDeque` when a `QReader` is created.
6pub const QUEUE_SIZE: usize = 64;
/// Threshold that `QReader::poll` compares its collected batch length against;
/// a batch longer than this (with more messages still pending) is reported as an error.
7pub const MAX_QUEUE_SIZE: usize = 256;
8
/// Shared FIFO storage: a `VecDeque<T>` guarded by a `Mutex` and reference-counted with `Arc`.
9pub type Queue<T> = Arc<Mutex<VecDeque<T>>>;
/// Consuming end of the queue.
///
/// Fields, in order: the shared queue, a `Sender<()>` (used by the reader's own `push`
/// and cloned into every `QWriter`), and the `Receiver<()>` on which one unit token
/// arrives per pushed message.
10pub struct QReader<T>(Queue<T>, Sender<()>, Receiver<()>);
/// Producing end of the queue: a handle to the shared queue plus a `Sender<()>`
/// used to announce each pushed message.
11pub struct QWriter<T>(Queue<T>, Sender<()>);
12
13impl<T> QReader<T> {
14    pub fn new() -> Self {
15        let queue = Arc::new(Mutex::new(VecDeque::with_capacity(QUEUE_SIZE)));
16        let (tx, rx) = mpsc::channel();
17        Self(queue, tx, rx)
18    }
19
20    #[inline(always)]
21    pub fn push(&mut self, msg: impl Into<T>) {
22        let mut queue = self.0.lock().unwrap();
23        if self.1.send(()).is_ok() {
24            queue.push_back(msg.into());
25        }
26    }
27
28    #[inline(always)]
29    pub fn pop(&mut self) -> Option<T> {
30        if self.2.recv().is_ok() {
31            Some(self.0.lock().unwrap().pop_front().unwrap())
32        } else {
33            None
34        }
35    }
36
37    #[inline(always)]
38    pub fn try_pop(&mut self) -> Option<T> {
39        if let Ok(()) = self.2.try_recv() {
40            self.0.lock().unwrap().pop_front()
41        } else {
42            None
43        }
44    }
45
46    #[inline(always)]
47    pub fn poll(&mut self) -> Result<Vec<T>>
48    where
49        T: Eq,
50    {
51        let mut signals = Vec::with_capacity(16);
52        if self.2.recv().is_ok() {
53            let mut queue = self.0.lock().unwrap();
54            loop {
55                let signal = queue.pop_front().unwrap();
56                if !signals.contains(&signal) {
57                    signals.push(signal);
58                }
59                if self.2.try_recv().is_err() {
60                    break;
61                }
62                if signals.len() > MAX_QUEUE_SIZE {
63                    return Err(eyre!("Signal queue exceeded MAX_QUEUE_SIZE."));
64                }
65            }
66
67            Ok(signals)
68        } else {
69            Err(eyre!("Failed to poll. Ending sync queue."))
70        }
71    }
72
73    #[inline(always)]
74    pub fn clear(&mut self) {
75        self.0.lock().unwrap().clear();
76    }
77
78    #[inline(always)]
79    pub fn writer(&self) -> QWriter<T> {
80        QWriter(self.0.clone(), self.1.clone())
81    }
82}
83
84impl<T> Default for QReader<T> {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90impl<T> QWriter<T> {
91    #[inline(always)]
92    pub fn push(&mut self, msg: impl Into<T>) {
93        let mut queue = self.0.lock().unwrap();
94        if self.1.send(()).is_ok() {
95            queue.push_back(msg.into());
96        }
97    }
98}
99
100impl<T> Clone for QWriter<T> {
101    fn clone(&self) -> Self {
102        Self(self.0.clone(), self.1.clone())
103    }
104}