essential_node/handles/
node.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use crate::error::{CriticalError, NodeHandleJoinError};

/// Handle for closing or joining the relayer and validation streams.
pub struct Handle {
    relayer: Option<essential_relayer::Handle>,
    validation: Option<crate::handles::validation::Handle<CriticalError>>,
}

impl Handle {
    /// Create a new handle.
    pub(crate) fn new(
        relayer: Option<essential_relayer::Handle>,
        validation: Option<crate::handles::validation::Handle<CriticalError>>,
    ) -> Self {
        Self {
            relayer,
            validation,
        }
    }

    /// Close the relayer and validation streams.
    ///
    /// If this future is dropped then all three streams will be closed.
    pub async fn close(self) -> Result<(), CriticalError> {
        let Self {
            relayer,
            validation,
        } = self;
        if let Some(relayer) = relayer {
            relayer.close().await?;
        }
        if let Some(validation) = validation {
            validation.close().await?;
        }
        Ok(())
    }

    /// Join the relayer and validation streams.
    ///
    /// Does not close but waits for all three streams to finish.
    /// If any of the streams finish or error then all streams will be closed.
    ///
    /// If this future is dropped then both streams will be closed.
    pub async fn join(self) -> Result<(), NodeHandleJoinError> {
        let Self {
            relayer,
            validation,
        } = self;

        let relayer_future = async move {
            if let Some(relayer) = relayer {
                relayer.join().await.map_err(NodeHandleJoinError::Relayer)
            } else {
                Ok(())
            }
        };
        tokio::pin!(relayer_future);

        let validation_future = async move {
            if let Some(validation) = validation {
                validation
                    .join()
                    .await
                    .map_err(NodeHandleJoinError::Validation)
            } else {
                Ok(())
            }
        };
        tokio::pin!(validation_future);

        // Wait for all to successfully complete or return early if one errors.
        tokio::try_join!(relayer_future, validation_future)?;
        Ok(())
    }
}