// embedded_mqttc/queue_vec/mod.rs
2use core::cell::RefCell;
3
4use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex};
5use split::{QueuedVecInner, WithQueuedVecInner};
6
7pub mod split;
8
/// Upper bound on wakers, exported for use elsewhere in the crate.
///
/// NOTE(review): this constant is not referenced anywhere in this file; it is
/// presumably the number of waiting tasks the `split` module can register at
/// once — confirm against `split.rs`.
pub const MAX_WAKERS: usize = 4;
10
11pub struct QueuedVec<R: RawMutex, T: 'static, const N: usize> {
12 inner: Mutex<R, RefCell<QueuedVecInner<(), T, N>>>
13}
14
15impl <R: RawMutex, T: 'static, const N: usize> WithQueuedVecInner<(), T, N> for QueuedVec<R, T, N> {
16 fn with_queued_vec_inner<F, O>(&self, operation: F) -> O where F: FnOnce(&mut QueuedVecInner<(), T, N>) -> O {
17 self.inner.lock(|inner| {
18 let mut inner = inner.borrow_mut();
19 operation(&mut inner)
20 })
21 }
22}
23
24impl <R: RawMutex, T: 'static, const N: usize> QueuedVec<R, T, N> {
25
26 pub fn new() -> Self {
27 Self {
28 inner: Mutex::new(RefCell::new(QueuedVecInner::new(())))
29 }
30 }
31
32 pub fn remove<'a, F: FnMut(&T) -> bool>(&'a self, remove_where: F) -> RemoveIterator<'a, R, T, F, N> {
35 RemoveIterator{
36 q: self,
37 remove_where
38 }
39 }
40
41}
42
43pub struct RemoveIterator <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize>{
44 q: &'a QueuedVec<R, T, N>,
45 remove_where: F
46}
47
48impl <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize> Iterator for RemoveIterator<'a, R, T, F, N> {
49 type Item = T;
50
51 fn next(&mut self) -> Option<Self::Item> {
52
53 self.q.with_queued_vec_inner(|inner|{
54 let (inner, _) = inner.working_copy();
55 for i in 0..inner.data.len() {
56 if (self.remove_where)(&inner.data[i]) {
57 return Some(inner.data.remove(i))
58 }
59 }
60 None
61 })
62 }
63}
64
#[cfg(test)]
mod tests {
    // `std` is linked explicitly; the tests need it for `Arc`.
    extern crate std;

    use core::time::Duration;
    use std::sync::Arc;

    use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
    use tokio::time::sleep;

    use super::{split::WithQueuedVecInner, QueuedVec};

    /// Queue flavour used by every test: `usize` elements, `N = 4`.
    type TestQueue = QueuedVec<CriticalSectionRawMutex, usize, 4>;

    /// Pause that lets spawned tasks make progress before the queue is inspected.
    const SETTLE: Duration = Duration::from_millis(15);

    /// Pushes the values 1, 2, 3, 4 in order, awaiting each push.
    async fn fill(queue: &TestQueue) {
        for value in 1..=4 {
            queue.push(value).await;
        }
    }

    /// Four pushes into an empty queue end up stored in insertion order.
    #[tokio::test]
    async fn test_add() {
        let queue = TestQueue::new();

        fill(&queue).await;

        queue.operate(|items| assert_eq!(&items[..], &[1, 2, 3, 4]));
    }

    /// A fifth push is expected to stay pending until an element is removed.
    #[tokio::test]
    async fn test_wait_add() {
        let queue = Arc::new(TestQueue::new());
        let producer = Arc::clone(&queue);

        fill(&queue).await;

        // The queue already holds four items, so this push should not land yet.
        tokio::spawn(async move {
            producer.push(5).await;
        });

        sleep(SETTLE).await;

        // Still the original four; free one slot by dropping the front element.
        queue.operate(|items| {
            assert_eq!(&items[..], &[1, 2, 3, 4]);
            items.remove(0);
        });

        sleep(SETTLE).await;

        // The pending push has now been appended at the back.
        queue.operate(|items| assert_eq!(&items[..], &[2, 3, 4, 5]));
    }

    /// Two producers push 0..20 between them while the test drains the queue;
    /// the sum of everything popped must be 0 + 1 + ... + 19.
    #[tokio::test]
    async fn test_parallelism() {
        const EXPECTED: usize = 190;

        let queue = Arc::new(TestQueue::new());

        // Producer of the even numbers 0, 2, ..., 18.
        let evens = {
            let q = Arc::clone(&queue);
            tokio::spawn(async move {
                for i in 0..10 {
                    q.push(i * 2).await;
                }
            })
        };

        // Producer of the odd numbers 1, 3, ..., 19.
        let odds = {
            let q = Arc::clone(&queue);
            tokio::spawn(async move {
                for i in 0..10 {
                    q.push(i * 2 + 1).await;
                }
            })
        };

        let consumer = async {
            sleep(SETTLE).await;

            let mut total = 0;

            // Pop one element per round and pause so the producers can refill;
            // stop as soon as a pop finds the queue empty.
            while let Some(value) = queue.operate(|items| items.pop()) {
                total += value;
                sleep(Duration::from_millis(5)).await;
            }

            assert_eq!(total, EXPECTED);
        };

        let (_, evens_result, odds_result) = tokio::join!(consumer, evens, odds);
        evens_result.unwrap();
        odds_result.unwrap();
    }
}