tokio_task_queue/
tasks_with_regular_pauses.rs

//! Tasks that are separated by timed pauses.
//! A task can also be forced to start at any time, but only during a pause.
//! If a task is forced to start, the schedule of pauses is adjusted to accommodate this task.
//!
//! This code is perhaps more of a demo than a serious module.
6
7use std::future::Future;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::spawn;
11use tokio::sync::{mpsc, Mutex};
12use tokio::sync::mpsc::Sender;
13use tokio::task::JoinHandle;
14use tokio::time::timeout;
15use async_trait::async_trait;
16use tokio_interruptible_future::{InterruptError, interruptible_sendable};
17
18#[allow(dead_code)]
19#[derive(Clone)]
20pub struct TasksWithRegularPausesData {
21    sudden_tx: Arc<Mutex<Option<Sender<()>>>>,
22}
23
24impl TasksWithRegularPausesData {
25    #[allow(dead_code)]
26    pub fn new() -> Self {
27        Self {
28            sudden_tx: Arc::new(Mutex::new(None)),
29        }
30    }
31}
32
33/// See module documentation.
34#[async_trait]
35pub trait TasksWithRegularPauses<Task: Future<Output = ()> + Send>: Send + Sync + 'static {
36    fn data(&self) -> &TasksWithRegularPausesData;
37    fn data_mut(&mut self) -> &mut TasksWithRegularPausesData;
38    async fn next_task(&mut self) -> Option<Task>;
39    fn sleep_duration(&self) -> Duration;
40    async fn _task(&mut self) -> Result<(), InterruptError> { // `InterruptError` here is a hack.
41        loop {
42            // It is time to run a task.
43            // let this1 = this.lock().await;
44            let fut = self.next_task().await;
45            if let Some(fut) = fut {
46                fut.await;
47            } else {
48                break;
49            }
50
51            // Signal that may interrupt the task.
52            let (sudden_tx, mut sudden_rx) = mpsc::channel(1);
53            self.data_mut().sudden_tx = Arc::new(Mutex::new(Some(sudden_tx)));
54
55            // Re-execute by a signal, or timeout (whichever comes first)
56            let sleep_duration = self.sleep_duration(); // lock for one line
57            let _ = timeout(sleep_duration, sudden_rx.recv()).await;
58        }
59        Ok(())
60    }
61    fn spawn(&'static mut self, interrupt_notifier: async_channel::Receiver<()>) -> JoinHandle<Result<(), InterruptError>> {
62        spawn( interruptible_sendable(interrupt_notifier, Box::pin(Self::_task(self))))
63    }
64    async fn suddenly(&self) -> Result<(), tokio::sync::mpsc::error::TrySendError<()>>{
65        let sudden_tx = self.data().sudden_tx.lock().await.take(); // atomic operation
66        if let Some(sudden_tx) = sudden_tx {
67            sudden_tx.try_send(())?;
68        }
69        Ok(())
70    }
71}
72
// Object-safe variation of TaskQueue (disabled draft; kept as a plain comment so it does not become documentation of the next item):
74// pub struct ObjectSafeTasksWithRegularPauses<Task: Future<Output = ()> + Send>: Send + Sync + 'static {
75//     base: Arc<Mutex<TasksWithRegularPauses<Task>>>,
76// }
77//
78// impl<Task: Future<Output = ()> + Send> ObjectSafeTasksWithRegularPauses<Task> {
79//     pub fn new() -> Self {
80//         Self {
81//             base: Arc::new(Mutex::new(TasksWithRegularPauses::new())),
82//         }
83//     }
84//     pub async fn get_arc(&self) -> &Arc<Mutex<TasksWithRegularPauses>> {
85//         &self.base
86//     }
87//     pub async fn get_arc_mut(&mut self) -> &Arc<Mutex<TasksWithRegularPauses>> {
88//         &mut self.base
89//     }
90// }
91
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use async_channel::bounded;
    use async_trait::async_trait;
    use tokio::runtime::Runtime;
    use tokio_interruptible_future::InterruptError;
    use crate::TaskItem;
    use crate::tasks_with_regular_pauses::{TasksWithRegularPauses, TasksWithRegularPausesData};

    /// Minimal implementor: an endless supply of no-op tasks with 1 ms pauses.
    #[derive(Clone)]
    struct OurTaskQueue {
        data: TasksWithRegularPausesData,
    }

    impl OurTaskQueue {
        pub fn new() -> Self {
            Self {
                data: TasksWithRegularPausesData::new(),
            }
        }
    }

    #[async_trait]
    impl TasksWithRegularPauses<TaskItem> for OurTaskQueue {
        fn data(&self) -> &TasksWithRegularPausesData {
            &self.data
        }
        fn data_mut(&mut self) -> &mut TasksWithRegularPausesData {
            &mut self.data
        }
        // Must take `&mut self` to match the trait declaration.
        async fn next_task(&mut self) -> Option<TaskItem> {
            Some(Box::pin(async {}))
        }
        fn sleep_duration(&self) -> Duration {
            Duration::from_millis(1)
        }
    }

    /// Spawns the loop, forces a wake-up, then interrupts the loop.
    #[test]
    fn empty() -> Result<(), InterruptError> {
        let queue = OurTaskQueue::new();
        let (interrupt_notifier_tx, interrupt_notifier_rx) = bounded(1);
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            // `spawn` needs a `&'static mut` receiver; leaking a boxed clone is
            // the simplest way to obtain one in a test.
            let worker: &'static mut OurTaskQueue = Box::leak(Box::new(queue.clone()));
            OurTaskQueue::spawn(worker, interrupt_notifier_rx);
            // Wake up before interrupting, while the loop is still running.
            queue.suddenly().await.unwrap();
            let _ = interrupt_notifier_tx.send(()).await;
        });
        Ok(())
    }
}