Skip to main content

emixthreading/
lib.rs

1mod cond;
2pub use crate::cond::*;
3pub mod constants;
4pub mod consumer;
5mod signal;
6pub use self::signal::*;
7mod spinner;
8pub use self::spinner::*;
9
10use futures::Future;
11use std::{fmt, pin::Pin, sync::Arc, thread};
12use tokio::{
13    sync::Notify,
14    time::{self, Duration},
15};
16
17pub use emixcore::{Error, Result};
18
/// Outcome value attached to a task.
///
/// Variants are ordered as declared (`None` lowest, `Success` highest) for the derived
/// `PartialOrd`/`Ord` implementations.
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[must_use]
pub enum TaskResult {
    /// No outcome; this is the `Default` value and is displayed as an empty string.
    #[default]
    None,
    /// Displayed as `Cancelled`.
    Cancelled,
    /// Displayed as `Timedout`.
    TimedOut,
    /// Carries an error message; displayed as `Error: <message>`.
    Error(String),
    /// Displayed as `Success`.
    Success,
}

impl fmt::Display for TaskResult {
    /// Writes the variant's label; `None` writes nothing at all.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            // The only variant without a textual representation.
            Self::None => return Ok(()),
            // The only variant that needs formatting rather than a fixed label.
            Self::Error(message) => return write!(f, "Error: {}", message),
            Self::Cancelled => "Cancelled",
            Self::TimedOut => "Timedout",
            Self::Success => "Success",
        };

        f.write_str(label)
    }
}
41
/// Selects between the two queue orderings named `FIFO` and `LIFO`.
///
/// `FIFO` is the `Default` value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum QueueBehavior {
    /// Displayed as `FIFO`.
    #[default]
    FIFO,
    /// Displayed as `LIFO`.
    LIFO,
}

impl fmt::Display for QueueBehavior {
    /// Writes the variant name verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            Self::FIFO => "FIFO",
            Self::LIFO => "LIFO",
        })
    }
}
57
/// Marker trait (no methods) bundling the bounds `Clone + Send + Sync + fmt::Debug`.
pub trait TaskItem: Clone + Send + Sync + fmt::Debug {}

/// Blanket implementation: every type that meets the bounds is a `TaskItem`.
impl<T> TaskItem for T where T: Clone + Send + Sync + fmt::Debug {}
60
61pub trait StaticTaskItem: TaskItem + 'static {}
62impl<T: TaskItem + 'static> StaticTaskItem for T {}
63
64pub trait TaskDelegation<TPC: AwaitableConsumer<T>, T: StaticTaskItem>: StaticTaskItem {
65    fn on_started(&self, pc: &TPC);
66    fn process(&self, pc: &TPC, item: &T) -> Result<TaskResult>;
67    fn on_completed(&self, pc: &TPC, item: &T, result: &TaskResult) -> bool;
68    fn on_cancelled(&self, pc: &TPC);
69    fn on_finished(&self, pc: &TPC);
70}
71
72pub trait AwaitableConsumer<T: TaskItem>: StaticTaskItem {
73    fn is_cancelled(&self) -> bool;
74    fn is_finished(&self) -> bool;
75}
76
77pub fn wait<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
78    this: &TPC,
79    finished: &Arc<ManualResetCond>,
80) -> Result<()> {
81    match finished.wait_while(|| !this.is_cancelled() && !this.is_finished()) {
82        Ok(_) => {
83            if this.is_cancelled() {
84                Err(Error::Canceled)
85            } else {
86                Ok(())
87            }
88        }
89        Err(e) => Err(e),
90    }
91}
92
93pub async fn wait_async<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
94    this: &TPC,
95    finished: &Arc<Notify>,
96) -> Result<()> {
97    let mut notified = false;
98
99    while !notified && !this.is_finished() && !this.is_cancelled() {
100        thread::sleep(Duration::ZERO);
101        finished.notified().await;
102        notified = true;
103    }
104
105    if this.is_cancelled() {
106        return Err(Error::Canceled);
107    }
108
109    Ok(())
110}
111
112pub fn wait_until<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
113    this: &TPC,
114    finished: &Arc<ManualResetCond>,
115    cond: impl Fn(&TPC) -> bool,
116) -> Result<()> {
117    match finished.wait_while(|| !this.is_cancelled() && !this.is_finished() && !cond(this)) {
118        Ok(_) => {
119            if this.is_cancelled() {
120                Err(Error::Canceled)
121            } else {
122                Ok(())
123            }
124        }
125        Err(e) => Err(e),
126    }
127}
128
129pub async fn wait_until_async<
130    TPC: AwaitableConsumer<T>,
131    T: StaticTaskItem,
132    F: Fn(&TPC) -> Pin<Box<dyn Future<Output = bool> + Send>>,
133>(
134    this: &TPC,
135    finished: &Arc<Notify>,
136    cond: F,
137) -> Result<()> {
138    let mut notified = false;
139
140    while !cond(this).await && !notified && !this.is_cancelled() && !this.is_finished() {
141        finished.notified().await;
142        notified = true;
143    }
144
145    if this.is_cancelled() {
146        return Err(Error::Canceled);
147    }
148
149    Ok(())
150}
151
152pub fn wait_for<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
153    this: &TPC,
154    timeout: Duration,
155    finished: &Arc<ManualResetCond>,
156) -> Result<()> {
157    if timeout.is_zero() {
158        return Err(Error::Timeout);
159    }
160
161    match finished.wait_timeout_while(|| !this.is_cancelled() && !this.is_finished(), timeout) {
162        Ok(true) => {
163            if this.is_cancelled() {
164                Err(Error::Canceled)
165            } else {
166                Ok(())
167            }
168        }
169        Ok(false) => Err(Error::Timeout),
170        Err(e) => {
171            // Preserve Poisoned errors, but treat other errors as timeout
172            match e {
173                Error::Poisoned(_) => Err(e),
174                _ => Err(Error::Timeout),
175            }
176        }
177    }
178}
179
180pub async fn wait_for_async<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
181    this: &TPC,
182    timeout: Duration,
183    finished: &Arc<Notify>,
184) -> Result<()> {
185    if timeout.is_zero() {
186        return Err(Error::Timeout);
187    }
188
189    let result = time::timeout(timeout, finished.notified()).await;
190    match result {
191        Ok(_) => {
192            if this.is_cancelled() {
193                Err(Error::Canceled)
194            } else {
195                Ok(())
196            }
197        }
198        Err(_) => Err(Error::Timeout),
199    }
200}
201
202pub fn wait_for_until<TPC: AwaitableConsumer<T>, T: StaticTaskItem>(
203    this: &TPC,
204    timeout: Duration,
205    finished: &Arc<ManualResetCond>,
206    cond: impl Fn(&TPC) -> bool,
207) -> Result<()> {
208    if timeout.is_zero() {
209        return Err(Error::Timeout);
210    }
211    match finished.wait_timeout_while(
212        || !this.is_cancelled() && !this.is_finished() && !cond(this),
213        timeout,
214    ) {
215        Ok(true) => {
216            if this.is_cancelled() {
217                Err(Error::Canceled)
218            } else {
219                Ok(())
220            }
221        }
222        Ok(false) => Err(Error::Timeout),
223        Err(e) => {
224            // Preserve Poisoned errors, but treat other errors as timeout
225            match e {
226                Error::Poisoned(_) => Err(e),
227                _ => Err(Error::Timeout),
228            }
229        }
230    }
231}
232
233pub async fn wait_for_until_async<
234    TPC: AwaitableConsumer<T>,
235    T: StaticTaskItem,
236    F: Fn(&TPC) -> Pin<Box<dyn Future<Output = bool> + Send>>,
237>(
238    this: &TPC,
239    timeout: Duration,
240    finished: &Arc<Notify>,
241    cond: F,
242) -> Result<()> {
243    if timeout.is_zero() {
244        return Err(Error::Timeout);
245    }
246
247    let start = time::Instant::now();
248
249    while !cond(this).await {
250        if this.is_cancelled() {
251            return Err(Error::Canceled);
252        }
253
254        if time::Instant::now().duration_since(start) > timeout {
255            return Err(Error::Timeout);
256        }
257
258        match time::timeout(constants::PEEK_TIMEOUT_DEF, finished.notified()).await {
259            Ok(_) => {
260                if this.is_cancelled() {
261                    return Err(Error::Canceled);
262                }
263
264                return Ok(());
265            }
266            Err(_) => {
267                if time::Instant::now().duration_since(start) > timeout {
268                    return Err(Error::Timeout);
269                }
270            }
271        }
272    }
273
274    Ok(())
275}