use async_trait::async_trait;
use std::sync::Arc;
use crate::adapter::net::behavior::capability::CapabilityFilter;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaHealth {
pub healthy: bool,
pub diagnostic: Option<String>,
}
impl ReplicaHealth {
pub fn healthy() -> Self {
Self {
healthy: true,
diagnostic: None,
}
}
pub fn unhealthy(reason: impl Into<String>) -> Self {
Self {
healthy: false,
diagnostic: Some(reason.into()),
}
}
}
#[async_trait]
pub trait LifecycleDaemon: Send + Sync + 'static {
fn name(&self) -> &str;
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError>;
async fn on_stop(&self);
async fn health(&self) -> ReplicaHealth {
ReplicaHealth::healthy()
}
}
#[derive(Debug)]
pub enum LifecycleError {
StartFailed(String),
}
impl std::fmt::Display for LifecycleError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::StartFailed(msg) => write!(f, "start failed: {msg}"),
}
}
}
impl std::error::Error for LifecycleError {}
pub struct LifecycleHandle {
daemon: Arc<dyn LifecycleDaemon>,
daemon_for_drop: Option<Arc<dyn LifecycleDaemon>>,
}
impl LifecycleHandle {
pub async fn start(daemon: Arc<dyn LifecycleDaemon>) -> Result<Self, LifecycleError> {
Arc::clone(&daemon).on_start().await?;
Ok(Self {
daemon: daemon.clone(),
daemon_for_drop: Some(daemon),
})
}
pub fn daemon(&self) -> &Arc<dyn LifecycleDaemon> {
&self.daemon
}
pub async fn stop(mut self) {
let daemon = self.daemon_for_drop.take();
if let Some(d) = daemon {
d.on_stop().await;
}
}
}
impl Drop for LifecycleHandle {
fn drop(&mut self) {
if let Some(daemon) = self.daemon_for_drop.take() {
match tokio::runtime::Handle::try_current() {
Ok(handle) => {
handle.spawn(async move {
daemon.on_stop().await;
});
}
Err(_) => {
tracing::warn!(
daemon = daemon.name(),
"LifecycleHandle dropped outside a tokio runtime; \
skipping on_stop. Daemon must self-clean via its \
shutdown flag.",
);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU8, Ordering};
struct CountingDaemon {
starts: AtomicU8,
stops: AtomicU8,
}
#[async_trait]
impl LifecycleDaemon for CountingDaemon {
fn name(&self) -> &str {
"counting"
}
async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
self.starts.fetch_add(1, Ordering::AcqRel);
Ok(())
}
async fn on_stop(&self) {
self.stops.fetch_add(1, Ordering::AcqRel);
}
}
#[tokio::test]
async fn start_fires_on_start_exactly_once() {
let daemon = Arc::new(CountingDaemon {
starts: AtomicU8::new(0),
stops: AtomicU8::new(0),
});
let handle = LifecycleHandle::start(daemon.clone()).await.expect("start");
assert_eq!(daemon.starts.load(Ordering::Acquire), 1);
assert_eq!(daemon.stops.load(Ordering::Acquire), 0);
handle.stop().await;
assert_eq!(daemon.stops.load(Ordering::Acquire), 1);
}
struct FailingStart;
#[async_trait]
impl LifecycleDaemon for FailingStart {
fn name(&self) -> &str {
"failing"
}
async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
Err(LifecycleError::StartFailed("intentional".into()))
}
async fn on_stop(&self) {}
}
#[tokio::test]
async fn start_failure_aborts_handle_creation() {
let result = LifecycleHandle::start(Arc::new(FailingStart)).await;
match result {
Err(LifecycleError::StartFailed(msg)) => assert_eq!(msg, "intentional"),
Ok(_) => panic!("expected StartFailed"),
}
}
#[tokio::test]
async fn drop_schedules_on_stop_under_tokio_runtime() {
let daemon = Arc::new(CountingDaemon {
starts: AtomicU8::new(0),
stops: AtomicU8::new(0),
});
{
let _handle = LifecycleHandle::start(daemon.clone()).await.expect("start");
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(daemon.stops.load(Ordering::Acquire), 1);
}
}