essential_relayer/
handle.rs

1use crate::Result;
2use tokio::sync::watch;
3
4#[cfg(test)]
5mod tests;
6
7/// Handle for closing or joining the relayer.
8pub struct Handle {
9    join_blocks: tokio::task::JoinHandle<Result<()>>,
10    close: Close,
11}
12
/// Guard which closes the relayer when dropped.
///
/// Its `Drop` implementation sends on `close_blocks`, so the close signal is
/// emitted even when `close` is never called explicitly.
struct Close {
    /// Sender half of the watch channel used to signal the close.
    close_blocks: watch::Sender<()>,
}
17
18impl Handle {
19    /// Create a new handle.
20    pub fn new(
21        join_blocks: tokio::task::JoinHandle<Result<()>>,
22        close_blocks: watch::Sender<()>,
23    ) -> Self {
24        Self {
25            join_blocks,
26            close: Close { close_blocks },
27        }
28    }
29
30    /// Close the Relayer streams and join them.
31    ///
32    /// If this future isn't polled the streams will continue to run.
33    /// However, if the future is dropped the streams will be closed.
34    pub async fn close(self) -> Result<()> {
35        let Self { join_blocks, close } = self;
36        // Close the streams.
37        close.close();
38
39        // Join both the streams.
40        let br = join_blocks.await;
41
42        // Flatten the results together.
43        flatten_result(br)
44    }
45
46    /// Join the Relayer streams.
47    ///
48    /// This does not close the streams.
49    /// Instead it waits for the streams to finish.
50    ///
51    /// However, if either stream finishes or errors then
52    /// both streams are closed.
53    ///
54    /// If this future is dropped then both streams will close.
55    pub async fn join(self) -> Result<()> {
56        let Self { join_blocks, close } = self;
57        let r = join_blocks.await;
58        close.close();
59        flatten_result(r)
60    }
61}
62
63impl Close {
64    fn close(&self) {
65        let _ = self.close_blocks.send(());
66    }
67}
68
69/// Flatten the result of a join handle into the relayer result.
70fn flatten_result(result: std::result::Result<Result<()>, tokio::task::JoinError>) -> Result<()> {
71    match result {
72        // Joined successfully.
73        // Return the result from the task.
74        Ok(r) => r,
75        Err(e) => {
76            // If the task panicked then resume the panic.
77            if e.is_panic() {
78                std::panic::resume_unwind(e.into_panic())
79            } else {
80                // If the task was cancelled then we consider the stream
81                // to successfully finished.
82                Ok(())
83            }
84        }
85    }
86}
87
88impl Drop for Close {
89    fn drop(&mut self) {
90        self.close();
91    }
92}