essential_node/handles/node.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
use crate::error::{CriticalError, NodeHandleJoinError};
/// Handle for closing or joining the relayer and validation streams.
pub struct Handle {
relayer: Option<essential_relayer::Handle>,
validation: Option<crate::handles::validation::Handle<CriticalError>>,
}
impl Handle {
/// Create a new handle.
pub(crate) fn new(
relayer: Option<essential_relayer::Handle>,
validation: Option<crate::handles::validation::Handle<CriticalError>>,
) -> Self {
Self {
relayer,
validation,
}
}
/// Close the relayer and validation streams.
///
/// If this future is dropped then all three streams will be closed.
pub async fn close(self) -> Result<(), CriticalError> {
let Self {
relayer,
validation,
} = self;
if let Some(relayer) = relayer {
relayer.close().await?;
}
if let Some(validation) = validation {
validation.close().await?;
}
Ok(())
}
/// Join the relayer and validation streams.
///
/// Does not close but waits for all three streams to finish.
/// If any of the streams finish or error then all streams will be closed.
///
/// If this future is dropped then both streams will be closed.
pub async fn join(self) -> Result<(), NodeHandleJoinError> {
let Self {
relayer,
validation,
} = self;
let relayer_future = async move {
if let Some(relayer) = relayer {
relayer.join().await.map_err(NodeHandleJoinError::Relayer)
} else {
Ok(())
}
};
tokio::pin!(relayer_future);
let validation_future = async move {
if let Some(validation) = validation {
validation
.join()
.await
.map_err(NodeHandleJoinError::Validation)
} else {
Ok(())
}
};
tokio::pin!(validation_future);
// Wait for all to successfully complete or return early if one errors.
tokio::try_join!(relayer_future, validation_future)?;
Ok(())
}
}