tokio_warden_restartable/
lib.rs

1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3use std::{
4    future::Future,
5    pin::Pin,
6    sync::{Arc, RwLock},
7    task::{Context, Poll},
8};
9
10use tokio::task::{AbortHandle, JoinError, JoinHandle};
11
12/// A handle that can be used to restart the inner task of a [`Restartable`].
13#[derive(Clone)]
14pub struct RestartHandle {
15    current_abort: Arc<RwLock<AbortHandle>>,
16}
17
18impl RestartHandle {
19    /// Aborts the current running inner task and restarts a new one.
20    pub fn restart(&self) {
21        self.current_abort
22            .read()
23            .expect("failed to obtain read lock on abort handle")
24            .abort();
25    }
26}
27
28pub struct Restartable<F, T> {
29    spawn: F,
30    current: JoinHandle<T>,
31    current_abort: Arc<RwLock<AbortHandle>>,
32}
33
34impl<F, T> Restartable<F, T> {
35    /// Returns a handle that can be used to restart the inner task.
36    pub fn restart_handle(&self) -> RestartHandle {
37        RestartHandle {
38            current_abort: Arc::clone(&self.current_abort),
39        }
40    }
41}
42
43impl<F, T> Future for Restartable<F, T>
44where
45    F: FnMut(Option<Result<T, JoinError>>) -> JoinHandle<T> + Unpin,
46{
47    type Output = T;
48
49    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50        let this = self.get_mut();
51        if let Poll::Ready(output) = Pin::new(&mut this.current).poll(cx) {
52            this.current = (this.spawn)(Some(output));
53            // Update the abort handle for the new task
54            *this
55                .current_abort
56                .write()
57                .expect("failed to obtain write lock on abort handle") =
58                this.current.abort_handle();
59            // Yield on restart
60            cx.waker().wake_by_ref();
61        }
62        Poll::Pending
63    }
64}
65
66/// Creates an auto restarting task.
67///
68/// The inner task is first created by `spawn` with `None`. Once the inner task
69/// aborts or exits, a new inner task is created by calling `spawn` with the
70/// previous result.
71pub fn restartable<F, T>(mut spawn: F) -> Restartable<F, T>
72where
73    F: FnMut(Option<Result<T, JoinError>>) -> JoinHandle<T>,
74{
75    let current = spawn(None);
76    let current_abort = Arc::new(RwLock::new(current.abort_handle()));
77    Restartable {
78        spawn,
79        current,
80        current_abort,
81    }
82}
83
84#[cfg(test)]
85mod tests;