1pub mod tasks_with_regular_pauses;
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use tokio::spawn;
7use tokio::sync::Mutex;
8use tokio::sync::mpsc::{channel, Receiver, Sender};
9use tokio::task::JoinHandle;
10use tokio_interruptible_future::{InterruptError, interruptible_straight};
11
/// A unit of work accepted by the queue: a pinned, heap-allocated future that
/// produces no value and can be sent to another thread.
pub type TaskItem = Pin<Box<dyn Future<Output = ()> + Send>>;
13
14pub struct TaskQueue
16{
17 tx: Sender<TaskItem>,
18 pub(crate) rx: Arc<Mutex<Receiver<TaskItem>>>,
19}
20
21impl TaskQueue {
22 pub fn new() -> Self {
23 let (tx, rx) = channel(1);
24 Self {
25 tx,
26 rx: Arc::new(Mutex::new(rx)),
27 }
28 }
29 async fn _task(this: Arc<Mutex<Self>>) {
30 loop {
32 let this2 = this.clone();
33 let fut = { let obj = this2.lock().await;
35 let rx = obj.rx.clone();
36 let mut rx = rx.lock().await;
37 rx.recv().await
38 };
39 if let Some(fut) = fut {
40 fut.await;
41 } else {
42 break;
43 }
44 }
45 }
46 pub fn spawn(
47 this: Arc<Mutex<Self>>,
48 notify_interrupt: async_channel::Receiver<()>,
49 ) -> JoinHandle<Result<(), InterruptError>> {
50 spawn( interruptible_straight(notify_interrupt, async move {
51 Self::_task(this).await;
52 Ok(())
53 }))
54 }
55 pub async fn push_task(&self, fut: TaskItem) {
56 let _ = self.tx.send(fut).await;
57 }
58}
59
60pub struct ObjectSafeTaskQueue {
62 base: Arc<Mutex<TaskQueue>>,
63}
64
65impl ObjectSafeTaskQueue {
66 pub fn new() -> Self {
67 Self {
68 base: Arc::new(Mutex::new(TaskQueue::new())),
69 }
70 }
71 pub async fn get_arc(&self) -> &Arc<Mutex<TaskQueue>> {
72 &self.base
73 }
74 pub async fn get_arc_mut(&mut self) -> &Arc<Mutex<TaskQueue>> {
75 &mut self.base
76 }
77 pub async fn spawn(
78 &self,
79 notify_interrupt: async_channel::Receiver<()>,
80 ) -> JoinHandle<Result<(), InterruptError>> {
81 TaskQueue::spawn(self.base.clone(), notify_interrupt)
82 }
83 pub async fn push_task(&self, fut: TaskItem) {
84 self.base.lock().await.push_task(fut).await
85 }
86}