embedded_mqttc/queue_vec/
mod.rs

1
2#![cfg_attr(not(feature = "std"), no_std)]
3
4use core::cell::RefCell;
5
6use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex};
7use split::{QueuedVecInner, WithQueuedVecInner};
8
9pub mod split;
10
11pub const MAX_WAKERS: usize = 4;
12
13pub struct QueuedVec<R: RawMutex, T: 'static, const N: usize> {
14    inner: Mutex<R, RefCell<QueuedVecInner<(), T, N>>>
15}
16
17impl <R: RawMutex, T: 'static, const N: usize> WithQueuedVecInner<(), T, N> for QueuedVec<R, T, N> {
18    fn with_queued_vec_inner<F, O>(&self, operation: F) -> O where F: FnOnce(&mut QueuedVecInner<(), T, N>) -> O {
19        self.inner.lock(|inner| {
20            let mut inner = inner.borrow_mut();
21            operation(&mut inner)
22        })
23    }
24}
25
26impl <R: RawMutex, T: 'static, const N: usize> QueuedVec<R, T, N> {
27
28    pub fn new() -> Self {
29        Self {
30            inner: Mutex::new(RefCell::new(QueuedVecInner::new(())))
31        }
32    }
33
34    /// Remove all elements from the queue which satisfy the remove_where function.
35    /// Every call to next on the returned iterator removes one element and returns it if present
36    pub fn remove<'a, F: FnMut(&T) -> bool>(&'a self, remove_where: F) -> RemoveIterator<'a, R, T, F, N> {
37        RemoveIterator{
38            q: self,
39            remove_where
40        }
41    }
42
43}
44
45pub struct RemoveIterator <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize>{
46    q: &'a QueuedVec<R, T, N>,
47    remove_where: F
48}
49
50impl <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize> Iterator for RemoveIterator<'a, R, T, F, N> {
51    type Item = T;
52
53    fn next(&mut self) -> Option<Self::Item> {
54
55       self.q.with_queued_vec_inner(|inner|{
56            let (inner, _) = inner.working_copy();
57            for i in 0..inner.data.len() {
58                if (self.remove_where)(&inner.data[i]) {
59                    return Some(inner.data.remove(i))
60                }
61            }
62            None
63       })
64    }
65}
66
#[cfg(test)]
mod tests {
    extern crate std;

    use core::time::Duration;
    use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
    use std::sync::Arc;
    use tokio::time::sleep;

    use super::{split::WithQueuedVecInner, QueuedVec};

    /// Queue flavour shared by all tests: four `usize` slots.
    type TestQueue = QueuedVec<CriticalSectionRawMutex, usize, 4>;

    /// Pause that lets spawned producer tasks make progress.
    const SETTLE: Duration = Duration::from_millis(15);
    /// Pause between two consecutive pops in the parallelism test.
    const POP_INTERVAL: Duration = Duration::from_millis(5);

    /// Four pushes into a four-slot queue complete and keep insertion order.
    #[tokio::test]
    async fn test_add() {
        let queue = TestQueue::new();

        for value in 1..=4 {
            queue.push(value).await;
        }

        queue.operate(|contents| assert_eq!(&contents[..], &[1, 2, 3, 4]));
    }

    /// A push into a full queue waits until an element has been removed.
    #[tokio::test]
    async fn test_wait_add() {
        let queue = Arc::new(TestQueue::new());
        let producer_queue = Arc::clone(&queue);

        for value in 1..=4 {
            queue.push(value).await;
        }

        // The queue is full, so this push has to wait for a free slot.
        tokio::spawn(async move {
            producer_queue.push(5).await;
        });

        sleep(SETTLE).await;

        // The fifth element must not have been added yet; free one slot.
        queue.operate(|contents| {
            assert_eq!(&contents[..], &[1, 2, 3, 4]);
            contents.remove(0);
        });

        sleep(SETTLE).await;

        queue.operate(|contents| assert_eq!(&contents[..], &[2, 3, 4, 5]));
    }

    /// Two producers push 0..20 between them; the consumer must see every value.
    #[tokio::test]
    async fn test_parallelism() {
        // Sum of 0..=19.
        const EXPECTED: usize = 190;

        let queue = Arc::new(TestQueue::new());

        let even_queue = Arc::clone(&queue);
        let even_producer = tokio::spawn(async move {
            for i in 0..10 {
                even_queue.push(i * 2).await;
            }
        });

        let odd_queue = Arc::clone(&queue);
        let odd_producer = tokio::spawn(async move {
            for i in 0..10 {
                odd_queue.push(i * 2 + 1).await;
            }
        });

        let consumer = async {
            sleep(SETTLE).await;

            let mut total = 0;

            // Pop one value at a time, giving the producers a moment to refill
            // the queue after each pop; stop once the queue is seen empty.
            while let Some(value) = queue.operate(|contents| contents.pop()) {
                total += value;
                sleep(POP_INTERVAL).await;
            }

            assert_eq!(total, EXPECTED);
        };

        let (_, even_result, odd_result) = tokio::join!(consumer, even_producer, odd_producer);
        even_result.unwrap();
        odd_result.unwrap();
    }
}