tokio_warden_restartable/
lib.rs1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3use std::{
4 future::Future,
5 pin::Pin,
6 sync::{Arc, RwLock},
7 task::{Context, Poll},
8};
9
10use tokio::task::{AbortHandle, JoinError, JoinHandle};
11
12#[derive(Clone)]
14pub struct RestartHandle {
15 current_abort: Arc<RwLock<AbortHandle>>,
16}
17
18impl RestartHandle {
19 pub fn restart(&self) {
21 self.current_abort
22 .read()
23 .expect("failed to obtain read lock on abort handle")
24 .abort();
25 }
26}
27
28pub struct Restartable<F, T> {
29 spawn: F,
30 current: JoinHandle<T>,
31 current_abort: Arc<RwLock<AbortHandle>>,
32}
33
34impl<F, T> Restartable<F, T> {
35 pub fn restart_handle(&self) -> RestartHandle {
37 RestartHandle {
38 current_abort: Arc::clone(&self.current_abort),
39 }
40 }
41}
42
43impl<F, T> Future for Restartable<F, T>
44where
45 F: FnMut(Option<Result<T, JoinError>>) -> JoinHandle<T> + Unpin,
46{
47 type Output = T;
48
49 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50 let this = self.get_mut();
51 if let Poll::Ready(output) = Pin::new(&mut this.current).poll(cx) {
52 this.current = (this.spawn)(Some(output));
53 *this
55 .current_abort
56 .write()
57 .expect("failed to obtain write lock on abort handle") =
58 this.current.abort_handle();
59 cx.waker().wake_by_ref();
61 }
62 Poll::Pending
63 }
64}
65
66pub fn restartable<F, T>(mut spawn: F) -> Restartable<F, T>
72where
73 F: FnMut(Option<Result<T, JoinError>>) -> JoinHandle<T>,
74{
75 let current = spawn(None);
76 let current_abort = Arc::new(RwLock::new(current.abort_handle()));
77 Restartable {
78 spawn,
79 current,
80 current_abort,
81 }
82}
83
84#[cfg(test)]
85mod tests;