tokio_task_queue/
tasks_with_regular_pauses.rs1use std::future::Future;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::spawn;
11use tokio::sync::{mpsc, Mutex};
12use tokio::sync::mpsc::Sender;
13use tokio::task::JoinHandle;
14use tokio::time::timeout;
15use async_trait::async_trait;
16use tokio_interruptible_future::{InterruptError, interruptible_sendable};
17
18#[allow(dead_code)]
19#[derive(Clone)]
20pub struct TasksWithRegularPausesData {
21 sudden_tx: Arc<Mutex<Option<Sender<()>>>>,
22}
23
24impl TasksWithRegularPausesData {
25 #[allow(dead_code)]
26 pub fn new() -> Self {
27 Self {
28 sudden_tx: Arc::new(Mutex::new(None)),
29 }
30 }
31}
32
33#[async_trait]
35pub trait TasksWithRegularPauses<Task: Future<Output = ()> + Send>: Send + Sync + 'static {
36 fn data(&self) -> &TasksWithRegularPausesData;
37 fn data_mut(&mut self) -> &mut TasksWithRegularPausesData;
38 async fn next_task(&mut self) -> Option<Task>;
39 fn sleep_duration(&self) -> Duration;
40 async fn _task(&mut self) -> Result<(), InterruptError> { loop {
42 let fut = self.next_task().await;
45 if let Some(fut) = fut {
46 fut.await;
47 } else {
48 break;
49 }
50
51 let (sudden_tx, mut sudden_rx) = mpsc::channel(1);
53 self.data_mut().sudden_tx = Arc::new(Mutex::new(Some(sudden_tx)));
54
55 let sleep_duration = self.sleep_duration(); let _ = timeout(sleep_duration, sudden_rx.recv()).await;
58 }
59 Ok(())
60 }
61 fn spawn(&'static mut self, interrupt_notifier: async_channel::Receiver<()>) -> JoinHandle<Result<(), InterruptError>> {
62 spawn( interruptible_sendable(interrupt_notifier, Box::pin(Self::_task(self))))
63 }
64 async fn suddenly(&self) -> Result<(), tokio::sync::mpsc::error::TrySendError<()>>{
65 let sudden_tx = self.data().sudden_tx.lock().await.take(); if let Some(sudden_tx) = sudden_tx {
67 sudden_tx.try_send(())?;
68 }
69 Ok(())
70 }
71}
72
73#[cfg(test)]
93mod tests {
94 use std::sync::Arc;
95 use std::time::Duration;
96 use async_channel::bounded;
97 use tokio::sync::Mutex;
98 use async_trait::async_trait;
99 use tokio::runtime::Runtime;
100 use tokio_interruptible_future::InterruptError;
101 use crate::TaskItem;
102 use crate::tasks_with_regular_pauses::{TasksWithRegularPauses, TasksWithRegularPausesData};
103
104 #[derive(Clone)]
105 struct OurTaskQueue {
106 data: TasksWithRegularPausesData,
107 }
108
109 impl OurTaskQueue {
110 pub fn new() -> Self {
111 Self {
112 data: TasksWithRegularPausesData::new(),
113 }
114 }
115 }
116
117 #[async_trait]
118 impl<'a> TasksWithRegularPauses<TaskItem> for OurTaskQueue where Self: 'static {
119 fn data(&self) -> &TasksWithRegularPausesData {
120 &self.data
121 }
122 fn data_mut(&mut self) -> &mut TasksWithRegularPausesData {
123 &mut self.data
124 }
125 async fn next_task(&self) -> Option<TaskItem> {
126 Some(Box::pin(async { () }))
127 }
128 fn sleep_duration(&self) -> Duration {
129 Duration::from_millis(1)
130 }
131 }
132
133 #[test]
134 fn empty() -> Result<(), InterruptError> {
135 let queue = OurTaskQueue::new();
136 let (interrupt_notifier_tx, interrupt_notifier_rx) = bounded(1);
137 let rt = Runtime::new().unwrap();
138 rt.block_on(async {
139 OurTaskQueue::spawn(queue.clone(), interrupt_notifier_rx);
140 let _ = interrupt_notifier_tx.send(()).await;
141 queue.clone().suddenly().await.unwrap();
142 });
143 Ok(())
144 }
145}