use super::service_base::{Service, ServiceMetrics};
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{interval, Duration};
use tracing::{error, info, warn};
/// Lifecycle state of a service managed by [`LifecycleWrapper`].
///
/// `LifecycleWrapper::start` moves through `Starting` to `Running`, and
/// `LifecycleWrapper::stop` moves through `Stopping` to `Stopped`. The wrapper's
/// background health check moves between `Running`, `Degraded` and `Failed`
/// based on the recorded request success rate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ServiceState {
    /// Constructed; `start` has not been called yet.
    Uninitialized,
    /// `start` is in progress.
    Starting,
    /// Serving normally.
    Running,
    /// The health check saw a success rate below 0.8 after more than 10 requests.
    Degraded,
    /// `stop` is in progress.
    Stopping,
    /// `stop` has completed.
    Stopped,
    /// The health check saw a success rate below 0.5 after more than 10 requests.
    Failed,
}
/// Point-in-time health report for a service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    /// Lifecycle state at the time of the report.
    pub state: ServiceState,
    /// Human-readable description of the status.
    pub message: String,
    /// When this report was produced.
    pub last_check: std::time::SystemTime,
    /// Seconds the service has been up. `LifecycleWrapper::get_health` measures
    /// this from wrapper construction; other `ManagedService` implementations
    /// may define it differently.
    pub uptime_seconds: u64,
    /// Snapshot of the request metrics at report time.
    pub metrics: ServiceMetrics,
}
/// Events that can be delivered to [`ManagedService::handle_event`].
///
/// NOTE(review): nothing visible in this file constructs these events, and the
/// default `handle_event` only acts on `Error`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LifecycleEvent {
    /// The service started.
    Started,
    /// The service stopped.
    Stopped,
    /// A health check succeeded.
    HealthCheckPassed,
    /// A health check failed; the payload is presumably a reason text (confirm with producers).
    HealthCheckFailed(String),
    /// The service moved to the contained state.
    StateChanged(ServiceState),
    /// An error occurred; the default `handle_event` logs the payload.
    Error(String),
}
#[async_trait]
pub trait ManagedService: Service {
async fn initialize(&self) -> Result<()> {
Ok(())
}
async fn health_check(&self) -> Result<HealthStatus>;
async fn shutdown(&self) -> Result<()> {
Ok(())
}
fn handle_event(&mut self, event: LifecycleEvent) {
if let LifecycleEvent::Error(msg) = event {
error!("Service error: {}", msg);
}
}
}
/// Wraps a [`Service`] with lifecycle state, request metrics and a periodic
/// background health evaluation.
pub struct LifecycleWrapper<S: Service> {
    /// The wrapped service; `process` takes a read lock on it for each call.
    service: Arc<RwLock<S>>,
    /// Current lifecycle state, shared with the background health-check task.
    state: Arc<RwLock<ServiceState>>,
    /// Whether the background health-check task should keep running.
    running: Arc<AtomicBool>,
    /// Captured in `new`; `get_health` reports uptime relative to this instant.
    start_time: std::time::SystemTime,
    /// Period of the background health-check task.
    health_check_interval: Duration,
    /// Per-request latency and success data recorded by `process`.
    metrics: Arc<RwLock<ServiceMetrics>>,
}
impl<S: Service> LifecycleWrapper<S> {
    /// Creates a wrapper around `service` in the [`ServiceState::Uninitialized`] state.
    ///
    /// `health_check_interval` is the period of the background health evaluation
    /// started by [`start`](Self::start). The uptime reported by
    /// [`get_health`](Self::get_health) is measured from this call, not from `start`.
    pub fn new(service: S, health_check_interval: Duration) -> Self {
        Self {
            service: Arc::new(RwLock::new(service)),
            state: Arc::new(RwLock::new(ServiceState::Uninitialized)),
            running: Arc::new(AtomicBool::new(false)),
            start_time: std::time::SystemTime::now(),
            health_check_interval,
            metrics: Arc::new(RwLock::new(ServiceMetrics::default())),
        }
    }

    /// Moves the wrapper to [`ServiceState::Running`] and launches the
    /// background health-check task.
    ///
    /// Calling `start` while already running resets the state to `Running`. It
    /// does not spawn a second health-check task.
    ///
    /// # Errors
    ///
    /// Never returns an error at present; the `Result` is part of the public API.
    pub async fn start(&self) -> Result<()> {
        *self.state.write().await = ServiceState::Starting;
        info!("Starting service lifecycle");
        // `swap` reports whether a health-check task is already active, so
        // repeated `start` calls don't stack duplicate background loops.
        let already_running = self.running.swap(true, Ordering::SeqCst);
        *self.state.write().await = ServiceState::Running;
        if !already_running {
            self.start_health_check_loop().await;
        }
        info!("Service lifecycle started");
        Ok(())
    }

    /// Signals the health-check task to exit and moves the wrapper through
    /// `Stopping` to [`ServiceState::Stopped`].
    ///
    /// # Errors
    ///
    /// Never returns an error at present; the `Result` is part of the public API.
    pub async fn stop(&self) -> Result<()> {
        *self.state.write().await = ServiceState::Stopping;
        info!("Stopping service lifecycle");
        self.running.store(false, Ordering::SeqCst);
        // Grace period for the health-check task to observe `running == false`.
        // That task never rewrites `Stopping`/`Stopped`, so the final state set
        // below survives even if the task wakes up after this delay.
        tokio::time::sleep(Duration::from_secs(1)).await;
        *self.state.write().await = ServiceState::Stopped;
        info!("Service lifecycle stopped");
        Ok(())
    }

    /// Returns the current lifecycle state.
    pub async fn get_state(&self) -> ServiceState {
        *self.state.read().await
    }

    /// Builds a [`HealthStatus`] from the current state and a metrics snapshot.
    ///
    /// `uptime_seconds` is measured from construction ([`new`](Self::new)). It
    /// falls back to 0 if the system clock has moved backwards since then.
    pub async fn get_health(&self) -> HealthStatus {
        let state = *self.state.read().await;
        let metrics = self.metrics.read().await.clone();
        let uptime = self.start_time.elapsed().unwrap_or_default().as_secs();
        HealthStatus {
            state,
            message: format!("Service in {state:?} state"),
            last_check: std::time::SystemTime::now(),
            uptime_seconds: uptime,
            metrics,
        }
    }

    /// Spawns the detached task that periodically re-evaluates health from the
    /// recorded request metrics.
    ///
    /// Thresholds apply once more than 10 requests have been recorded:
    /// - a success rate below 0.5 yields `Failed`;
    /// - a success rate below 0.8 yields `Degraded`;
    /// - otherwise `Degraded` recovers to `Running`, while `Failed` stays `Failed`.
    ///
    /// Only `Running`, `Degraded` and `Failed` are changed here. The remaining
    /// states belong to `start`/`stop`.
    async fn start_health_check_loop(&self) {
        let running = Arc::clone(&self.running);
        let state = Arc::clone(&self.state);
        let metrics = Arc::clone(&self.metrics);
        let interval_duration = self.health_check_interval;
        tokio::spawn(async move {
            let mut ticker = interval(interval_duration);
            loop {
                ticker.tick().await;
                // Check *after* waiting: `stop` may have run during the tick, and a
                // stopped service must not have its state rewritten.
                if !running.load(Ordering::SeqCst) {
                    break;
                }
                let (success_rate, enough_samples) = {
                    let snapshot = metrics.read().await;
                    (snapshot.success_rate(), snapshot.request_count > 10)
                };
                // Hold the write lock across read-evaluate-write so a concurrent
                // `stop` cannot slip in between observing and updating the state.
                let mut state_guard = state.write().await;
                let current_state = *state_guard;
                let new_state = match current_state {
                    ServiceState::Running | ServiceState::Degraded | ServiceState::Failed => {
                        if enough_samples && success_rate < 0.5 {
                            ServiceState::Failed
                        } else if enough_samples && success_rate < 0.8 {
                            ServiceState::Degraded
                        } else if current_state == ServiceState::Failed {
                            ServiceState::Failed
                        } else {
                            ServiceState::Running
                        }
                    }
                    ServiceState::Uninitialized
                    | ServiceState::Starting
                    | ServiceState::Stopping
                    | ServiceState::Stopped => current_state,
                };
                if new_state != current_state {
                    *state_guard = new_state;
                    info!(
                        "Service state changed: {:?} -> {:?}",
                        current_state, new_state
                    );
                }
            }
        });
    }
}
#[async_trait]
impl<S> Service for LifecycleWrapper<S>
where
    S: Service + Send + Sync,
    S::Input: Send + Sync + 'static,
    S::Output: Send + Sync + 'static,
    S::Error: Send + Sync + 'static,
{
    type Input = S::Input;
    type Output = S::Output;
    type Error = S::Error;

    /// Delegates to the wrapped service and records the call's latency and
    /// success in the wrapper's metrics.
    ///
    /// Calls arriving outside `Running`/`Degraded` are still forwarded and only
    /// produce a warning. The wrapped service's result is returned unchanged.
    async fn process(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
        let began = std::time::Instant::now();
        let observed = *self.state.read().await;
        let accepting = matches!(observed, ServiceState::Running | ServiceState::Degraded);
        if !accepting {
            warn!("Service called while in {:?} state", observed);
        }
        let inner = self.service.read().await;
        let outcome = inner.process(input).await;
        // The receiver (the metrics lock) is acquired before `elapsed()` runs, so
        // recorded latency includes any wait for that lock.
        self.metrics
            .write()
            .await
            .record_request(began.elapsed(), outcome.is_ok());
        outcome
    }
}
/// Type-erased [`ManagedService`] stored by [`ServiceSupervisor`]: JSON values
/// in and out, with `anyhow::Error` for failures.
type ManagedServiceObject = dyn ManagedService<Input = serde_json::Value, Output = serde_json::Value, Error = anyhow::Error>
    + Send
    + Sync;
/// Owns a set of [`ManagedService`]s and drives their initialize, shutdown and
/// health-check hooks.
pub struct ServiceSupervisor {
    /// Registered services; `register` appends to this list.
    services: Arc<RwLock<Vec<Arc<ManagedServiceObject>>>>,
    /// Set by `start_all` and cleared by `stop_all`; the monitoring loop checks it.
    running: Arc<AtomicBool>,
}
impl Default for ServiceSupervisor {
fn default() -> Self {
Self::new()
}
}
impl ServiceSupervisor {
#[must_use]
pub fn new() -> Self {
Self {
services: Arc::new(RwLock::new(Vec::new())),
running: Arc::new(AtomicBool::new(false)),
}
}
pub async fn register<S>(&self, service: S)
where
S: ManagedService<
Input = serde_json::Value,
Output = serde_json::Value,
Error = anyhow::Error,
> + Send
+ Sync
+ 'static,
{
let mut services = self.services.write().await;
services.push(Arc::new(service) as Arc<ManagedServiceObject>);
}
pub async fn start_all(&self) -> Result<()> {
info!("Starting service supervisor");
self.running.store(true, Ordering::Relaxed);
let services = self.services.read().await;
for service in services.iter() {
if let Err(e) = service.initialize().await {
error!("Failed to initialize service: {}", e);
}
}
self.start_monitoring().await;
Ok(())
}
pub async fn stop_all(&self) -> Result<()> {
info!("Stopping service supervisor");
self.running.store(false, Ordering::Relaxed);
let services = self.services.read().await;
for service in services.iter() {
if let Err(e) = service.shutdown().await {
error!("Failed to shutdown service: {}", e);
}
}
Ok(())
}
async fn start_monitoring(&self) {
let services = self.services.clone();
let running = self.running.clone();
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(30));
while running.load(Ordering::Relaxed) {
ticker.tick().await;
let services = services.read().await;
for service in services.iter() {
match service.health_check().await {
Ok(status) => {
if status.state == ServiceState::Failed {
warn!("Service health check failed: {}", status.message);
}
}
Err(e) => {
error!("Health check error: {}", e);
}
}
}
}
});
}
pub async fn get_all_health(&self) -> Vec<HealthStatus> {
let services = self.services.read().await;
let mut statuses = Vec::new();
for service in services.iter() {
if let Ok(status) = service.health_check().await {
statuses.push(status);
}
}
statuses
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Error;
    use std::sync::atomic::AtomicU32;

    /// Test double whose first two calls fail and whose later calls succeed.
    struct TestService {
        /// Number of `process` calls seen so far.
        calls_seen: AtomicU32,
    }

    impl TestService {
        /// A double that has not been called yet.
        fn fresh() -> Self {
            Self {
                calls_seen: AtomicU32::new(0),
            }
        }
    }

    #[async_trait]
    impl Service for TestService {
        type Input = String;
        type Output = String;
        type Error = Error;

        async fn process(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
            let prior_calls = self.calls_seen.fetch_add(1, Ordering::Relaxed);
            match prior_calls {
                0 | 1 => Err(anyhow::anyhow!("Simulated failure")),
                _ => Ok(format!("Processed: {}", input)),
            }
        }
    }

    #[tokio::test]
    async fn test_lifecycle_wrapper() {
        let wrapper = LifecycleWrapper::new(TestService::fresh(), Duration::from_millis(100));
        wrapper.start().await.unwrap();
        assert_eq!(wrapper.get_state().await, ServiceState::Running);

        // The first two calls are scripted to fail; only the third is checked.
        for warmup in ["test1", "test2"] {
            let _ = wrapper.process(warmup.to_string()).await;
        }
        let reply = wrapper.process("test3".to_string()).await.unwrap();
        assert_eq!(reply, "Processed: test3");

        wrapper.stop().await.unwrap();
        assert_eq!(wrapper.get_state().await, ServiceState::Stopped);
    }

    #[tokio::test]
    async fn test_health_status() {
        let wrapper = LifecycleWrapper::new(TestService::fresh(), Duration::from_millis(100));
        wrapper.start().await.unwrap();

        let report = wrapper.get_health().await;
        assert_eq!(report.state, ServiceState::Running);
        assert!(report.uptime_seconds < 3600);

        wrapper.stop().await.unwrap();
    }
}
#[cfg(test)]
mod property_tests {
    use super::{LifecycleEvent, ServiceState};
    use proptest::prelude::*;

    /// Every `ServiceState` variant, so a strategy can pick one by index.
    const ALL_STATES: [ServiceState; 7] = [
        ServiceState::Uninitialized,
        ServiceState::Starting,
        ServiceState::Running,
        ServiceState::Degraded,
        ServiceState::Stopping,
        ServiceState::Stopped,
        ServiceState::Failed,
    ];

    proptest! {
        /// Any error message survives a JSON round-trip inside `LifecycleEvent::Error`.
        #[test]
        fn basic_property_stability(input in ".*") {
            let event = LifecycleEvent::Error(input.clone());
            let json = serde_json::to_string(&event).expect("LifecycleEvent must serialize");
            let decoded: LifecycleEvent =
                serde_json::from_str(&json).expect("serialized LifecycleEvent must deserialize");
            match decoded {
                LifecycleEvent::Error(message) => prop_assert_eq!(message, input),
                other => prop_assert!(false, "round-trip produced another variant: {:?}", other),
            }
        }

        /// Every `ServiceState` survives a JSON round-trip unchanged.
        #[test]
        fn module_consistency_check(idx in 0usize..ALL_STATES.len()) {
            let state = ALL_STATES[idx];
            let json = serde_json::to_string(&state).expect("ServiceState must serialize");
            let decoded: ServiceState =
                serde_json::from_str(&json).expect("serialized ServiceState must deserialize");
            prop_assert_eq!(decoded, state);
        }
    }
}