#![deny(unsafe_code)]
#![warn(missing_docs)]
mod error;
mod handle;
mod shutdown;
mod tasks;
mod traits;
#[cfg(feature = "testing")]
pub mod testing;
pub use tokio::task::JoinHandle;
pub use tokio_util::sync::CancellationToken;
pub use error::{Result, ServiceError};
pub use handle::{ServiceHandle, TaskSummary};
pub use shutdown::{ExitReason, ExitStatus, ShutdownReason, ShutdownToken};
pub use tasks::{TaskKind, TaskRegistry};
pub use traits::{
DisconnectReason, InboundMessage, NodeLifecycle, PeerApi, PeerId, PeerInfo, RpcApi, RunContext,
StartContext, StopContext,
};
use std::sync::Arc;
use parking_lot::RwLock;
pub struct Service<N, A, R>
where
N: NodeLifecycle,
A: PeerApi,
R: RpcApi,
{
node: Arc<N>,
peer_api: Arc<A>,
rpc_api: Arc<R>,
shutdown: ShutdownToken,
tasks: TaskRegistry,
handle_state: Arc<RwLock<handle::HandleState>>,
started: Arc<std::sync::atomic::AtomicBool>,
}
impl<N, A, R> Service<N, A, R>
where
N: NodeLifecycle,
A: PeerApi,
R: RpcApi,
{
pub fn new(node: N, peer_api: A, rpc_api: R) -> Self {
let shutdown = ShutdownToken::new();
let tasks = TaskRegistry::new(shutdown.clone());
let handle_state = Arc::new(RwLock::new(handle::HandleState::new(N::NAME.unwrap_or(""))));
Self {
node: Arc::new(node),
peer_api: Arc::new(peer_api),
rpc_api: Arc::new(rpc_api),
shutdown,
tasks,
handle_state,
started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
}
}
pub fn handle(&self) -> ServiceHandle {
ServiceHandle::new(
self.shutdown.clone(),
self.tasks.clone(),
self.handle_state.clone(),
)
}
pub fn node(&self) -> &Arc<N> {
&self.node
}
pub fn peer_api(&self) -> &Arc<A> {
&self.peer_api
}
pub fn rpc_api(&self) -> &Arc<R> {
&self.rpc_api
}
pub fn shutdown_token(&self) -> &ShutdownToken {
&self.shutdown
}
pub fn tasks(&self) -> &TaskRegistry {
&self.tasks
}
pub fn request_shutdown(&self, reason: ShutdownReason) {
self.shutdown.cancel(reason);
}
pub async fn start(self) -> Result<ExitStatus> {
use std::sync::atomic::Ordering;
if self.started.swap(true, Ordering::SeqCst) {
return Err(ServiceError::AlreadyRunning);
}
let node = self.node.clone();
let start_ctx = StartContext {
name: N::NAME.unwrap_or(""),
shutdown: self.shutdown.clone(),
tasks: &self.tasks,
};
if let Err(e) = node.pre_start(&start_ctx).await {
return Err(ServiceError::PreStartFailed(Arc::new(e)));
}
if let Err(e) = node.on_start(&start_ctx).await {
let stop_ctx = StopContext {
shutdown: self.shutdown.clone(),
tasks: &self.tasks,
exit_reason: ExitReason::RunError(Arc::new(anyhow::anyhow!("on_start failed"))),
};
let _ = node.post_stop(&stop_ctx).await;
return Err(ServiceError::OnStartFailed(Arc::new(e)));
}
let run_ctx = RunContext {
shutdown: self.shutdown.clone(),
tasks: self.tasks.clone(),
};
let run_result = node.run(run_ctx).await;
let exit_reason = match &run_result {
Ok(()) => self
.shutdown
.reason()
.map(ExitReason::RequestedShutdown)
.unwrap_or(ExitReason::RunCompleted),
Err(e) => ExitReason::RunError(Arc::new(anyhow::anyhow!("{e}"))),
};
if !self.shutdown.is_cancelled() {
self.shutdown.cancel(ShutdownReason::RequestedByRun);
}
let join_result = self
.tasks
.join_all(std::time::Duration::from_secs(30))
.await;
let stop_ctx = StopContext {
shutdown: self.shutdown.clone(),
tasks: &self.tasks,
exit_reason: exit_reason.clone(),
};
let on_stop_result = node.on_stop(&stop_ctx).await;
let post_stop_result = node.post_stop(&stop_ctx).await;
if let Err(e) = run_result {
return Err(ServiceError::RunFailed(Arc::new(e)));
}
if let Err(e) = on_stop_result {
return Err(ServiceError::OnStopFailed(Arc::new(e)));
}
if let Err(e) = post_stop_result {
return Err(ServiceError::OnStopFailed(Arc::new(e)));
}
join_result?;
Ok(ExitStatus {
reason: exit_reason,
})
}
}