embedded_mqttc/queue_vec/
mod.rs

1
2use core::cell::RefCell;
3
4use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex};
5use split::{QueuedVecInner, WithQueuedVecInner};
6
7pub mod split;
8
/// Publicly exported limit with the value 4.
///
/// NOTE(review): nothing in this file reads the constant. Judging by its name it
/// presumably caps how many wakers the queue implementation in `split` keeps
/// around — confirm against that module.
pub const MAX_WAKERS: usize = 4;
10
11pub struct QueuedVec<R: RawMutex, T: 'static, const N: usize> {
12    inner: Mutex<R, RefCell<QueuedVecInner<(), T, N>>>
13}
14
15impl <R: RawMutex, T: 'static, const N: usize> WithQueuedVecInner<(), T, N> for QueuedVec<R, T, N> {
16    fn with_queued_vec_inner<F, O>(&self, operation: F) -> O where F: FnOnce(&mut QueuedVecInner<(), T, N>) -> O {
17        self.inner.lock(|inner| {
18            let mut inner = inner.borrow_mut();
19            operation(&mut inner)
20        })
21    }
22}
23
24impl <R: RawMutex, T: 'static, const N: usize> QueuedVec<R, T, N> {
25
26    pub fn new() -> Self {
27        Self {
28            inner: Mutex::new(RefCell::new(QueuedVecInner::new(())))
29        }
30    }
31
32    /// Remove all elements from the queue which satisfy the remove_where function.
33    /// Every call to next on the returned iterator removes one element and returns it if present
34    pub fn remove<'a, F: FnMut(&T) -> bool>(&'a self, remove_where: F) -> RemoveIterator<'a, R, T, F, N> {
35        RemoveIterator{
36            q: self,
37            remove_where
38        }
39    }
40
41}
42
43pub struct RemoveIterator <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize>{
44    q: &'a QueuedVec<R, T, N>,
45    remove_where: F
46}
47
48impl <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize> Iterator for RemoveIterator<'a, R, T, F, N> {
49    type Item = T;
50
51    fn next(&mut self) -> Option<Self::Item> {
52
53       self.q.with_queued_vec_inner(|inner|{
54            let (inner, _) = inner.working_copy();
55            for i in 0..inner.data.len() {
56                if (self.remove_where)(&inner.data[i]) {
57                    return Some(inner.data.remove(i))
58                }
59            }
60            None
61       })
62    }
63}
64
#[cfg(test)]
mod tests {
    extern crate std;

    use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
    use tokio::time::sleep;
    use core::time::Duration;
    use std::sync::Arc;

    use super::{split::WithQueuedVecInner, QueuedVec};

    /// Four pushes into a queue with `N = 4` complete and keep insertion order.
    #[tokio::test]
    async fn test_add() {
        let q = QueuedVec::<CriticalSectionRawMutex, usize, 4>::new();

        for value in 1..=4 {
            q.push(value).await;
        }

        q.operate(|v| assert_eq!(&v[..], &[1, 2, 3, 4]));
    }

    /// A push into a full queue only lands after an element has been removed.
    #[tokio::test]
    async fn test_wait_add() {
        let q = Arc::new(QueuedVec::<CriticalSectionRawMutex, usize, 4>::new());
        let q2 = Arc::clone(&q);

        for value in 1..=4 {
            q.push(value).await;
        }

        // The queue is full, so this push has to wait in the background.
        tokio::spawn(async move {
            q2.push(5).await;
        });

        sleep(Duration::from_millis(15)).await;

        // Still unchanged; removing the head makes room for the pending push.
        q.operate(|v| {
            assert_eq!(&v[..], &[1, 2, 3, 4]);
            v.remove(0);
        });

        sleep(Duration::from_millis(15)).await;

        q.operate(|v| assert_eq!(&v[..], &[2, 3, 4, 5]));
    }

    /// Two producers and one consumer share the queue; every pushed value must
    /// be seen by the consumer exactly once.
    #[tokio::test]
    async fn test_parallelism() {
        // The producers together push every integer in 0..20, which sums to 190.
        const EXPECTED: usize = 190;

        let q = Arc::new(QueuedVec::<CriticalSectionRawMutex, usize, 4>::new());

        let evens = Arc::clone(&q);
        let even_producer = tokio::spawn(async move {
            for i in 0..10 {
                evens.push(i * 2).await;
            }
        });

        let odds = Arc::clone(&q);
        let odd_producer = tokio::spawn(async move {
            for i in 0..10 {
                odds.push(i * 2 + 1).await;
            }
        });

        let consumer = async {
            sleep(Duration::from_millis(15)).await;

            let mut n = 0;

            // Pop one value per round; stop at the first round that finds the
            // queue empty.
            while let Some(value) = q.operate(|v| v.pop()) {
                n += value;
                sleep(Duration::from_millis(5)).await;
            }

            assert_eq!(n, EXPECTED);
        };

        let (_, evens_done, odds_done) = tokio::join!(consumer, even_producer, odd_producer);
        evens_done.unwrap();
        odds_done.unwrap();
    }
}