task_system/sync_queue/system/
mod.rs1use std::fmt::Display;
2
3use super::*;
4
5impl<T: Debug> Debug for TaskSystem<T> {
6 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
7 let s = &mut f.debug_struct("TaskSystem");
8 match self.queue.try_lock() {
9 Ok(o) => s.field("queue", &o).finish(),
10 Err(_) => s.field("queue", &"<LOCKED>").finish(),
11 }
12 }
13}
14
15impl<T: Debug> Display for TaskSystem<T> {
16 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
17 match self.queue.try_lock() {
18 Ok(o) => f.debug_list().entries(o.iter()).finish(),
19 Err(_) => f.debug_struct("TaskSystem").field("queue", &"<LOCKED>").finish(),
20 }
21 }
22}
23
24impl<T> Default for TaskSystem<T> {
25 fn default() -> Self {
26 Self { interrupt: Arc::new(Mutex::new(false)), queue: Arc::new(Mutex::new(VecDeque::new())) }
27 }
28}
29
30impl<T> TaskSystem<T> {
31 #[cfg(feature = "tokio")]
32 pub fn start<F>(&self, callback: F) -> tokio::task::JoinHandle<()>
33 where
34 F: Fn(T) -> bool + Send + 'static,
35 T: Send + 'static,
36 {
37 self.resume();
38 let queue = self.queue.clone();
39 let interrupt = self.interrupt.clone();
40 tokio::task::spawn_blocking(move || {
41 loop {
42 let task = match queue.try_lock() {
43 Ok(mut o) => match o.pop_front() {
44 Some(s) => s,
45 None => continue,
46 },
47 Err(_) => continue,
48 };
49 if can_interrupt(&interrupt) || !callback(task) {
50 break;
51 }
52 }
53 })
54 }
55
56 pub fn cancel_if<F>(&self, condition: F) -> Vec<T>
75 where
76 F: Fn(&T) -> bool + Send + 'static,
77 T: Send + 'static,
78 {
79 let mut result = Vec::new();
80 match self.queue.try_lock() {
81 Ok(mut o) => {
82 let mut i = 0;
83 while i < o.len() {
84 if condition(&o[i]) {
85 match o.remove(i) {
86 Some(s) => {
87 result.push(s);
88 }
89 None => continue,
90 }
91 }
92 else {
93 i += 1;
94 }
95 }
96 }
97 Err(_) => {}
98 }
99 result
100 }
101 pub fn cancel_first<F>(&self, condition: F) -> Option<T>
120 where
121 F: Fn(&T) -> bool + Send + 'static,
122 T: Send + 'static,
123 {
124 match self.queue.try_lock() {
125 Ok(mut o) => {
126 let mut i = 0;
127 while i < o.len() {
128 if condition(&o[i]) {
129 return o.remove(i);
130 }
131 else {
132 i += 1;
133 }
134 }
135 None
136 }
137 Err(_) => None,
138 }
139 }
140 pub fn send(&self, task: T) -> bool {
142 send_task(&self.queue, task).is_ok()
143 }
144 pub fn sender(&self) -> TaskSender<T> {
146 TaskSender { refer: TaskSystem { interrupt: self.interrupt.clone(), queue: self.queue.clone() } }
147 }
148 pub fn receive(&self) -> Option<T> {
150 match self.queue.try_lock() {
151 Ok(mut o) => o.pop_front(),
152 Err(_) => None,
153 }
154 }
155 pub fn consume<F>(&self, callback: F) -> bool
157 where
158 F: Fn(T) -> bool + Send + 'static,
159 T: Send + 'static,
160 {
161 match self.receive() {
162 Some(s) => {
163 callback(s);
164 true
165 }
166 None => false,
167 }
168 }
169 pub fn interrupt(&self) {
171 match self.interrupt.try_lock() {
172 Ok(mut o) => *o = true,
173 Err(_) => (),
174 }
175 }
176 pub fn resume(&self) {
178 match self.interrupt.try_lock() {
179 Ok(mut o) => *o = false,
180 Err(_) => (),
181 }
182 }
183}
184
/// Reads the interrupt flag without blocking.
///
/// Returns the flag's current value, or `false` when the lock cannot be taken
/// right away.
// Only called from `TaskSystem::start`, which is compiled solely with the
// `tokio` feature; without that feature this function is unused.
#[allow(dead_code)]
fn can_interrupt(interrupt: &Arc<Mutex<bool>>) -> bool {
    interrupt.try_lock().map(|flag| *flag).unwrap_or(false)
}