1mod cond;
2pub use crate::cond::*;
3pub mod constants;
4pub mod consumer;
5mod signal;
6pub use self::signal::*;
7mod spinner;
8pub use self::spinner::*;
9
10use futures::Future;
11use std::{fmt, pin::Pin, sync::Arc, thread};
12use tokio::{
13 sync::Notify,
14 time::{self, Duration},
15};
16
17pub use emixcore::{Error, Result};
18
/// Outcome recorded for a task.
///
/// [`TaskResult::None`] is the default value. The derived ordering follows the
/// declaration order of the variants.
#[must_use]
#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TaskResult {
    /// No outcome recorded; renders as an empty string.
    #[default]
    None,
    /// The task was cancelled.
    Cancelled,
    /// The task timed out.
    TimedOut,
    /// The task failed; the payload is the error message.
    Error(String),
    /// The task succeeded.
    Success,
}

impl fmt::Display for TaskResult {
    /// Writes a short label for the outcome. `None` writes nothing at all.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::None => return Ok(()),
            Self::Error(message) => return write!(f, "Error: {}", message),
            Self::Cancelled => "Cancelled",
            Self::TimedOut => "Timedout",
            Self::Success => "Success",
        };
        f.write_str(label)
    }
}
41
/// Queue ordering mode; [`QueueBehavior::FIFO`] is the default.
#[derive(Default, Clone, Debug, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum QueueBehavior {
    /// First in, first out.
    #[default]
    FIFO,
    /// Last in, first out.
    LIFO,
}

impl fmt::Display for QueueBehavior {
    /// Writes the variant name (`"FIFO"` or `"LIFO"`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::FIFO => "FIFO",
            Self::LIFO => "LIFO",
        })
    }
}
57
/// Marker trait bundling the bounds required of task items:
/// `Clone + Send + Sync + Debug`.
pub trait TaskItem: Clone + Send + Sync + fmt::Debug {}

/// Blanket implementation: every type that satisfies the bounds is a `TaskItem`.
impl<T> TaskItem for T where T: Clone + Send + Sync + fmt::Debug {}
60
61pub trait StaticTaskItem: TaskItem + 'static {}
62impl<T: TaskItem + 'static> StaticTaskItem for T {}
63
64pub trait TaskDelegation<TPC: AwaitableConsumer<T>, T: StaticTaskItem>: StaticTaskItem {
65 fn on_started(&self, pc: &TPC);
66 fn process(&self, pc: &TPC, item: &T) -> Result<TaskResult>;
67 fn on_completed(&self, pc: &TPC, item: &T, result: &TaskResult) -> bool;
68 fn on_cancelled(&self, pc: &TPC);
69 fn on_finished(&self, pc: &TPC);
70}
71
72pub trait AwaitableConsumer<T: TaskItem>: StaticTaskItem {
73 fn is_cancelled(&self) -> bool;
74 fn is_finished(&self) -> bool;
75}
76
77pub fn wait<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
78 this: &TPC,
79 finished: &Arc<ManualResetCond>,
80) -> Result<()> {
81 match finished.wait_while(|| !this.is_cancelled() && !this.is_finished()) {
82 Ok(_) => {
83 if this.is_cancelled() {
84 Err(Error::Canceled)
85 } else {
86 Ok(())
87 }
88 }
89 Err(e) => Err(e),
90 }
91}
92
93pub async fn wait_async<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
94 this: &TPC,
95 finished: &Arc<Notify>,
96) -> Result<()> {
97 let mut notified = false;
98
99 while !notified && !this.is_finished() && !this.is_cancelled() {
100 thread::sleep(Duration::ZERO);
101 finished.notified().await;
102 notified = true;
103 }
104
105 if this.is_cancelled() {
106 return Err(Error::Canceled);
107 }
108
109 Ok(())
110}
111
112pub fn wait_until<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
113 this: &TPC,
114 finished: &Arc<ManualResetCond>,
115 cond: impl Fn(&TPC) -> bool,
116) -> Result<()> {
117 match finished.wait_while(|| !this.is_cancelled() && !this.is_finished() && !cond(this)) {
118 Ok(_) => {
119 if this.is_cancelled() {
120 Err(Error::Canceled)
121 } else {
122 Ok(())
123 }
124 }
125 Err(e) => Err(e),
126 }
127}
128
129pub async fn wait_until_async<
130 TPC: AwaitableConsumer<T>,
131 T: StaticTaskItem,
132 F: Fn(&TPC) -> Pin<Box<dyn Future<Output = bool> + Send>>,
133>(
134 this: &TPC,
135 finished: &Arc<Notify>,
136 cond: F,
137) -> Result<()> {
138 let mut notified = false;
139
140 while !cond(this).await && !notified && !this.is_cancelled() && !this.is_finished() {
141 finished.notified().await;
142 notified = true;
143 }
144
145 if this.is_cancelled() {
146 return Err(Error::Canceled);
147 }
148
149 Ok(())
150}
151
152pub fn wait_for<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
153 this: &TPC,
154 timeout: Duration,
155 finished: &Arc<ManualResetCond>,
156) -> Result<()> {
157 if timeout.is_zero() {
158 return Err(Error::Timeout);
159 }
160
161 match finished.wait_timeout_while(|| !this.is_cancelled() && !this.is_finished(), timeout) {
162 Ok(true) => {
163 if this.is_cancelled() {
164 Err(Error::Canceled)
165 } else {
166 Ok(())
167 }
168 }
169 Ok(false) => Err(Error::Timeout),
170 Err(e) => {
171 match e {
173 Error::Poisoned(_) => Err(e),
174 _ => Err(Error::Timeout),
175 }
176 }
177 }
178}
179
180pub async fn wait_for_async<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
181 this: &TPC,
182 timeout: Duration,
183 finished: &Arc<Notify>,
184) -> Result<()> {
185 if timeout.is_zero() {
186 return Err(Error::Timeout);
187 }
188
189 let result = time::timeout(timeout, finished.notified()).await;
190 match result {
191 Ok(_) => {
192 if this.is_cancelled() {
193 Err(Error::Canceled)
194 } else {
195 Ok(())
196 }
197 }
198 Err(_) => Err(Error::Timeout),
199 }
200}
201
202pub fn wait_for_until<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
203 this: &TPC,
204 timeout: Duration,
205 finished: &Arc<ManualResetCond>,
206 cond: impl Fn(&TPC) -> bool,
207) -> Result<()> {
208 if timeout.is_zero() {
209 return Err(Error::Timeout);
210 }
211 match finished.wait_timeout_while(
212 || !this.is_cancelled() && !this.is_finished() && !cond(this),
213 timeout,
214 ) {
215 Ok(true) => {
216 if this.is_cancelled() {
217 Err(Error::Canceled)
218 } else {
219 Ok(())
220 }
221 }
222 Ok(false) => Err(Error::Timeout),
223 Err(e) => {
224 match e {
226 Error::Poisoned(_) => Err(e),
227 _ => Err(Error::Timeout),
228 }
229 }
230 }
231}
232
233pub async fn wait_for_until_async<
234 TPC: AwaitableConsumer<T>,
235 T: StaticTaskItem,
236 F: Fn(&TPC) -> Pin<Box<dyn Future<Output = bool> + Send>>,
237>(
238 this: &TPC,
239 timeout: Duration,
240 finished: &Arc<Notify>,
241 cond: F,
242) -> Result<()> {
243 if timeout.is_zero() {
244 return Err(Error::Timeout);
245 }
246
247 let start = time::Instant::now();
248
249 while !cond(this).await {
250 if this.is_cancelled() {
251 return Err(Error::Canceled);
252 }
253
254 if time::Instant::now().duration_since(start) > timeout {
255 return Err(Error::Timeout);
256 }
257
258 match time::timeout(constants::PEEK_TIMEOUT_DEF, finished.notified()).await {
259 Ok(_) => {
260 if this.is_cancelled() {
261 return Err(Error::Canceled);
262 }
263
264 return Ok(());
265 }
266 Err(_) => {
267 if time::Instant::now().duration_since(start) > timeout {
268 return Err(Error::Timeout);
269 }
270 }
271 }
272 }
273
274 Ok(())
275}