essential_relayer/handle.rs
1use crate::Result;
2use tokio::sync::watch;
3
4#[cfg(test)]
5mod tests;
6
7/// Handle for closing or joining the relayer.
8pub struct Handle {
9 join_blocks: tokio::task::JoinHandle<Result<()>>,
10 close: Close,
11}
12
13/// Struct which when dropped will close the relayer.
14struct Close {
15 close_blocks: watch::Sender<()>,
16}
17
18impl Handle {
19 /// Create a new handle.
20 pub fn new(
21 join_blocks: tokio::task::JoinHandle<Result<()>>,
22 close_blocks: watch::Sender<()>,
23 ) -> Self {
24 Self {
25 join_blocks,
26 close: Close { close_blocks },
27 }
28 }
29
30 /// Close the Relayer streams and join them.
31 ///
32 /// If this future isn't polled the streams will continue to run.
33 /// However, if the future is dropped the streams will be closed.
34 pub async fn close(self) -> Result<()> {
35 let Self { join_blocks, close } = self;
36 // Close the streams.
37 close.close();
38
39 // Join both the streams.
40 let br = join_blocks.await;
41
42 // Flatten the results together.
43 flatten_result(br)
44 }
45
46 /// Join the Relayer streams.
47 ///
48 /// This does not close the streams.
49 /// Instead it waits for the streams to finish.
50 ///
51 /// However, if either stream finishes or errors then
52 /// both streams are closed.
53 ///
54 /// If this future is dropped then both streams will close.
55 pub async fn join(self) -> Result<()> {
56 let Self { join_blocks, close } = self;
57 let r = join_blocks.await;
58 close.close();
59 flatten_result(r)
60 }
61}
62
63impl Close {
64 fn close(&self) {
65 let _ = self.close_blocks.send(());
66 }
67}
68
69/// Flatten the result of a join handle into the relayer result.
70fn flatten_result(result: std::result::Result<Result<()>, tokio::task::JoinError>) -> Result<()> {
71 match result {
72 // Joined successfully.
73 // Return the result from the task.
74 Ok(r) => r,
75 Err(e) => {
76 // If the task panicked then resume the panic.
77 if e.is_panic() {
78 std::panic::resume_unwind(e.into_panic())
79 } else {
80 // If the task was cancelled then we consider the stream
81 // to successfully finished.
82 Ok(())
83 }
84 }
85 }
86}
87
88impl Drop for Close {
89 fn drop(&mut self) {
90 self.close();
91 }
92}