tokio_task_queue/
lib.rs

1pub mod tasks_with_regular_pauses;
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use tokio::spawn;
7use tokio::sync::Mutex;
8use tokio::sync::mpsc::{channel, Receiver, Sender};
9use tokio::task::JoinHandle;
10use tokio_interruptible_future::{InterruptError, interruptible_straight};
11
12pub type TaskItem = Pin<Box<dyn Future<Output = ()> + Send>>;
13
14/// Execute futures from a stream of futures in order in a Tokio task. Not tested code.
15pub struct TaskQueue
16{
17    tx: Sender<TaskItem>,
18    pub(crate) rx: Arc<Mutex<Receiver<TaskItem>>>,
19}
20
21impl TaskQueue {
22    pub fn new() -> Self {
23        let (tx, rx) = channel(1);
24        Self {
25            tx,
26            rx: Arc::new(Mutex::new(rx)),
27        }
28    }
29    async fn _task(this: Arc<Mutex<Self>>) {
30        // let mut rx = ReceiverStream::new(rx);
31        loop {
32            let this2 = this.clone();
33            let fut = { // block to shorten locks lifetime
34                let obj = this2.lock().await;
35                let rx = obj.rx.clone();
36                let mut rx = rx.lock().await;
37                rx.recv().await
38            };
39            if let Some(fut) = fut {
40                fut.await;
41            } else {
42                break;
43            }
44        }
45    }
46    pub fn spawn(
47        this: Arc<Mutex<Self>>,
48        notify_interrupt: async_channel::Receiver<()>,
49    ) -> JoinHandle<Result<(), InterruptError>> {
50        spawn( interruptible_straight(notify_interrupt, async move {
51            Self::_task(this).await;
52            Ok(())
53        }))
54    }
55    pub async fn push_task(&self, fut: TaskItem) {
56        let _ = self.tx.send(fut).await;
57    }
58}
59
60/// Object-safe variation of TaskQueue
61pub struct ObjectSafeTaskQueue {
62    base: Arc<Mutex<TaskQueue>>,
63}
64
65impl ObjectSafeTaskQueue {
66    pub fn new() -> Self {
67        Self {
68            base: Arc::new(Mutex::new(TaskQueue::new())),
69        }
70    }
71    pub async fn get_arc(&self) -> &Arc<Mutex<TaskQueue>> {
72        &self.base
73    }
74    pub async fn get_arc_mut(&mut self) -> &Arc<Mutex<TaskQueue>> {
75        &mut self.base
76    }
77    pub async fn spawn(
78        &self,
79        notify_interrupt: async_channel::Receiver<()>,
80    ) -> JoinHandle<Result<(), InterruptError>> {
81        TaskQueue::spawn(self.base.clone(), notify_interrupt)
82    }
83    pub async fn push_task(&self, fut: TaskItem) {
84        self.base.lock().await.push_task(fut).await
85    }
86}