embedded_mqttc/queue_vec/
mod.rs1
2#![cfg_attr(not(feature = "std"), no_std)]
3
4use core::cell::RefCell;
5
6use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex};
7use split::{QueuedVecInner, WithQueuedVecInner};
8
9pub mod split;
10
11pub const MAX_WAKERS: usize = 4;
12
13pub struct QueuedVec<R: RawMutex, T: 'static, const N: usize> {
14 inner: Mutex<R, RefCell<QueuedVecInner<(), T, N>>>
15}
16
17impl <R: RawMutex, T: 'static, const N: usize> WithQueuedVecInner<(), T, N> for QueuedVec<R, T, N> {
18 fn with_queued_vec_inner<F, O>(&self, operation: F) -> O where F: FnOnce(&mut QueuedVecInner<(), T, N>) -> O {
19 self.inner.lock(|inner| {
20 let mut inner = inner.borrow_mut();
21 operation(&mut inner)
22 })
23 }
24}
25
26impl <R: RawMutex, T: 'static, const N: usize> QueuedVec<R, T, N> {
27
28 pub fn new() -> Self {
29 Self {
30 inner: Mutex::new(RefCell::new(QueuedVecInner::new(())))
31 }
32 }
33
34 pub fn remove<'a, F: FnMut(&T) -> bool>(&'a self, remove_where: F) -> RemoveIterator<'a, R, T, F, N> {
37 RemoveIterator{
38 q: self,
39 remove_where
40 }
41 }
42
43}
44
45pub struct RemoveIterator <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize>{
46 q: &'a QueuedVec<R, T, N>,
47 remove_where: F
48}
49
50impl <'a, R: RawMutex, T: 'static, F: FnMut(&T) -> bool, const N: usize> Iterator for RemoveIterator<'a, R, T, F, N> {
51 type Item = T;
52
53 fn next(&mut self) -> Option<Self::Item> {
54
55 self.q.with_queued_vec_inner(|inner|{
56 let (inner, _) = inner.working_copy();
57 for i in 0..inner.data.len() {
58 if (self.remove_where)(&inner.data[i]) {
59 return Some(inner.data.remove(i))
60 }
61 }
62 None
63 })
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 extern crate std;
70
71 use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
72 use tokio::time::sleep;
73 use core::time::Duration;
74 use std::sync::Arc;
75
76 use super::{split::WithQueuedVecInner, QueuedVec};
77
78 #[tokio::test]
79 async fn test_add() {
80 let q = QueuedVec::<CriticalSectionRawMutex, usize, 4>::new();
83
84 q.push(1).await;
85 q.push(2).await;
86 q.push(3).await;
87 q.push(4).await;
88
89 q.operate(|v| {
90 assert_eq!(&v[..], &[1, 2, 3, 4]);
91 });
92 }
93
94 #[tokio::test]
95 async fn test_wait_add() {
96
97 let q = Arc::new(QueuedVec::<CriticalSectionRawMutex, usize, 4>::new());
98 let q2 = q.clone();
99
100 q.push(1).await;
101 q.push(2).await;
102 q.push(3).await;
103 q.push(4).await;
104
105 tokio::spawn(async move {
106 q2.push(5).await;
107 });
108
109 sleep(Duration::from_millis(15)).await;
110
111 q.operate(|v|{
112 assert_eq!(&v[..], &[1, 2, 3, 4]);
113 v.remove(0);
114 });
115
116 sleep(Duration::from_millis(15)).await;
117
118 q.operate(|v| {
119 assert_eq!(&v[..], &[2, 3, 4, 5]);
120 });
121 }
122
123 #[tokio::test]
124 async fn test_parallelism() {
125
126 const EXPECTED: usize = 190;
127
128 let q = Arc::new(QueuedVec::<CriticalSectionRawMutex, usize, 4>::new());
129
130 let q1 = q.clone();
131 let jh1 = tokio::spawn(async move {
132 for i in 0..10 {
133 q1.push(i * 2).await;
134 }
135 });
136
137 let q2 = q.clone();
138 let jh2 = tokio::spawn(async move {
139 for i in 0..10 {
140 q2.push(i * 2 + 1).await;
141 }
142 });
143
144 let test_future = async {
145 sleep(Duration::from_millis(15)).await;
146
147 let mut n = 0;
148
149 while q.operate(|v| {
150 match v.pop() {
151 Some(value) => {
152 n += value;
153 true
154 },
155 None => false,
156 }
157 }) {
158 sleep(Duration::from_millis(5)).await;
159 }
160
161 assert_eq!(n, EXPECTED);
162 };
163
164 let (_, r2, r3) = tokio::join!(test_future, jh1, jh2);
165 r2.unwrap();
166 r3.unwrap();
167
168 }
169
170}