task_system/sync_queue/system/
mod.rs

1use std::fmt::Display;
2
3use super::*;
4
5impl<T: Debug> Debug for TaskSystem<T> {
6    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
7        let s = &mut f.debug_struct("TaskSystem");
8        match self.queue.try_lock() {
9            Ok(o) => s.field("queue", &o).finish(),
10            Err(_) => s.field("queue", &"<LOCKED>").finish(),
11        }
12    }
13}
14
15impl<T: Debug> Display for TaskSystem<T> {
16    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
17        match self.queue.try_lock() {
18            Ok(o) => f.debug_list().entries(o.iter()).finish(),
19            Err(_) => f.debug_struct("TaskSystem").field("queue", &"<LOCKED>").finish(),
20        }
21    }
22}
23
24impl<T> Default for TaskSystem<T> {
25    fn default() -> Self {
26        Self { interrupt: Arc::new(Mutex::new(false)), queue: Arc::new(Mutex::new(VecDeque::new())) }
27    }
28}
29
30impl<T> TaskSystem<T> {
31    #[cfg(feature = "tokio")]
32    pub fn start<F>(&self, callback: F) -> tokio::task::JoinHandle<()>
33    where
34        F: Fn(T) -> bool + Send + 'static,
35        T: Send + 'static,
36    {
37        self.resume();
38        let queue = self.queue.clone();
39        let interrupt = self.interrupt.clone();
40        tokio::task::spawn_blocking(move || {
41            loop {
42                let task = match queue.try_lock() {
43                    Ok(mut o) => match o.pop_front() {
44                        Some(s) => s,
45                        None => continue,
46                    },
47                    Err(_) => continue,
48                };
49                if can_interrupt(&interrupt) || !callback(task) {
50                    break;
51                }
52            }
53        })
54    }
55
56    ///  Cancel all tasks that match the condition.
57    ///
58    /// # Arguments
59    ///
60    /// * `condition`:
61    ///
62    /// returns: Vec<T, Global>
63    ///
64    /// # Examples
65    ///
66    /// ```
67    /// use task_system::TaskSystem;
68    /// let ts = TaskSystem::default();
69    /// (1..10).for_each(|i| {
70    ///     ts.send(i);
71    /// });
72    /// assert_eq!(ts.cancel_if(|i| i % 2 == 0), vec![2, 4, 6, 8]);
73    /// ```
74    pub fn cancel_if<F>(&self, condition: F) -> Vec<T>
75    where
76        F: Fn(&T) -> bool + Send + 'static,
77        T: Send + 'static,
78    {
79        let mut result = Vec::new();
80        match self.queue.try_lock() {
81            Ok(mut o) => {
82                let mut i = 0;
83                while i < o.len() {
84                    if condition(&o[i]) {
85                        match o.remove(i) {
86                            Some(s) => {
87                                result.push(s);
88                            }
89                            None => continue,
90                        }
91                    }
92                    else {
93                        i += 1;
94                    }
95                }
96            }
97            Err(_) => {}
98        }
99        result
100    }
101    /// Cancel the first task that matches the condition.
102    ///
103    /// # Arguments
104    ///
105    /// * `condition`:
106    ///
107    /// returns: Option<T>
108    ///
109    /// # Examples
110    ///
111    /// ```
112    /// use task_system::TaskSystem;
113    /// let ts = TaskSystem::default();
114    /// (1..10).for_each(|i| {
115    ///     ts.send(i);
116    /// });
117    /// assert_eq!(ts.cancel_first(|i| *i == 5), Some(5));
118    /// ```
119    pub fn cancel_first<F>(&self, condition: F) -> Option<T>
120    where
121        F: Fn(&T) -> bool + Send + 'static,
122        T: Send + 'static,
123    {
124        match self.queue.try_lock() {
125            Ok(mut o) => {
126                let mut i = 0;
127                while i < o.len() {
128                    if condition(&o[i]) {
129                        return o.remove(i);
130                    }
131                    else {
132                        i += 1;
133                    }
134                }
135                None
136            }
137            Err(_) => None,
138        }
139    }
140    /// Send a new task to the task system.
141    pub fn send(&self, task: T) -> bool {
142        send_task(&self.queue, task).is_ok()
143    }
144    /// Get a sender for the task system.
145    pub fn sender(&self) -> TaskSender<T> {
146        TaskSender { refer: TaskSystem { interrupt: self.interrupt.clone(), queue: self.queue.clone() } }
147    }
148    /// Receive a task from the task system.
149    pub fn receive(&self) -> Option<T> {
150        match self.queue.try_lock() {
151            Ok(mut o) => o.pop_front(),
152            Err(_) => None,
153        }
154    }
155    /// Consume a task from the task system.
156    pub fn consume<F>(&self, callback: F) -> bool
157    where
158        F: Fn(T) -> bool + Send + 'static,
159        T: Send + 'static,
160    {
161        match self.receive() {
162            Some(s) => {
163                callback(s);
164                true
165            }
166            None => false,
167        }
168    }
169    /// Interrupt all task runner.
170    pub fn interrupt(&self) {
171        match self.interrupt.try_lock() {
172            Ok(mut o) => *o = true,
173            Err(_) => (),
174        }
175    }
176    /// Stop interrupting task runner.
177    pub fn resume(&self) {
178        match self.interrupt.try_lock() {
179            Ok(mut o) => *o = false,
180            Err(_) => (),
181        }
182    }
183}
184
/// Read the shared interrupt flag without blocking.
///
/// Returns the flag's current value, or `false` when the lock cannot be acquired — an
/// unreadable flag is treated as "not interrupted".
// In this file the only caller is `start`, which is compiled only with the `tokio` feature.
#[allow(dead_code)]
fn can_interrupt(interrupt: &Arc<Mutex<bool>>) -> bool {
    interrupt.try_lock().map(|flag| *flag).unwrap_or(false)
}