essential_node/handles/
node.rs

1use crate::error::{CriticalError, NodeHandleJoinError};
2
3/// Handle for closing or joining the relayer and validation streams.
4pub struct Handle {
5    relayer: Option<essential_relayer::Handle>,
6    validation: Option<crate::handles::validation::Handle<CriticalError>>,
7}
8
9impl Handle {
10    /// Create a new handle.
11    pub(crate) fn new(
12        relayer: Option<essential_relayer::Handle>,
13        validation: Option<crate::handles::validation::Handle<CriticalError>>,
14    ) -> Self {
15        Self {
16            relayer,
17            validation,
18        }
19    }
20
21    /// Close the relayer and validation streams.
22    ///
23    /// If this future is dropped then all three streams will be closed.
24    pub async fn close(self) -> Result<(), CriticalError> {
25        let Self {
26            relayer,
27            validation,
28        } = self;
29        if let Some(relayer) = relayer {
30            relayer.close().await?;
31        }
32        if let Some(validation) = validation {
33            validation.close().await?;
34        }
35        Ok(())
36    }
37
38    /// Join the relayer and validation streams.
39    ///
40    /// Does not close but waits for all three streams to finish.
41    /// If any of the streams finish or error then all streams will be closed.
42    ///
43    /// If this future is dropped then both streams will be closed.
44    pub async fn join(self) -> Result<(), NodeHandleJoinError> {
45        let Self {
46            relayer,
47            validation,
48        } = self;
49
50        let relayer_future = async move {
51            if let Some(relayer) = relayer {
52                relayer.join().await.map_err(NodeHandleJoinError::Relayer)
53            } else {
54                Ok(())
55            }
56        };
57        tokio::pin!(relayer_future);
58
59        let validation_future = async move {
60            if let Some(validation) = validation {
61                validation
62                    .join()
63                    .await
64                    .map_err(NodeHandleJoinError::Validation)
65            } else {
66                Ok(())
67            }
68        };
69        tokio::pin!(validation_future);
70
71        // Wait for all to successfully complete or return early if one errors.
72        tokio::try_join!(relayer_future, validation_future)?;
73        Ok(())
74    }
75}