Skip to main content

veilid_tools/
must_join_handle.rs

1use super::*;
2
3use core::task::{Context, Poll};
4
5#[derive(Debug)]
6pub struct MustJoinHandle<T> {
7    join_handle: Option<LowLevelJoinHandle<T>>,
8    completed: bool,
9}
10
11impl<T> MustJoinHandle<T> {
12    #[must_use]
13    pub fn new(join_handle: LowLevelJoinHandle<T>) -> Self {
14        Self {
15            join_handle: Some(join_handle),
16            completed: false,
17        }
18    }
19
20    pub fn detach(mut self) {
21        cfg_if! {
22            if #[cfg(feature="rt-async-std")] {
23                self.join_handle = None;
24            } else if #[cfg(feature="rt-tokio")] {
25                self.join_handle = None;
26            } else if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
27                if let Some(jh) = self.join_handle.take() {
28                    jh.detach();
29                }
30            } else {
31                compile_error!("needs executor implementation");
32            }
33        }
34        self.completed = true;
35    }
36
37    #[allow(unused_mut)]
38    #[cfg_attr(
39        all(target_arch = "wasm32", target_os = "unknown"),
40        expect(clippy::unused_async)
41    )]
42    pub async fn abort(mut self) {
43        if !self.completed {
44            cfg_if! {
45                if #[cfg(feature="rt-async-std")] {
46                    if let Some(jh) = self.join_handle.take() {
47                        jh.cancel().await;
48                        self.completed = true;
49                    }
50                } else if #[cfg(feature="rt-tokio")] {
51                    if let Some(jh) = self.join_handle.take() {
52                        jh.abort();
53                        let _ = jh.await;
54                        self.completed = true;
55                    }
56                } else if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
57                    drop(self.join_handle.take());
58                    self.completed = true;
59                } else {
60                    compile_error!("needs executor implementation");
61                }
62
63            }
64        }
65    }
66}
67
68impl<T> Drop for MustJoinHandle<T> {
69    fn drop(&mut self) {
70        // panic if we haven't completed
71        if !self.completed {
72            panic!("MustJoinHandle was not completed upon drop. Add cooperative cancellation where appropriate to ensure this is completed before drop.")
73        }
74    }
75}
76
77impl<T: 'static> Future for MustJoinHandle<T> {
78    type Output = T;
79
80    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81        match Pin::new(self.join_handle.as_mut().unwrap_or_log()).poll(cx) {
82            Poll::Ready(t) => {
83                if self.completed {
84                    panic!("should not poll completed join handle");
85                }
86                self.completed = true;
87                cfg_if! {
88                    if #[cfg(feature="rt-async-std")] {
89                        Poll::Ready(t)
90                    } else if #[cfg(feature="rt-tokio")] {
91                        match t {
92                            Ok(t) => Poll::Ready(t),
93                            Err(e) => {
94                                if e.is_panic() {
95                                    // Resume the panic on the main task
96                                    std::panic::resume_unwind(e.into_panic());
97                                } else {
98                                    panic!("join error was not a panic, should not poll after abort");
99                                }
100                            }
101                        }
102                    } else if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
103                        Poll::Ready(t)
104                    } else {
105                        compile_error!("needs executor implementation");
106                    }
107                }
108            }
109            Poll::Pending => Poll::Pending,
110        }
111    }
112}