use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::time::{interval, sleep};
use crate::matrixrpc::registry::RegistryService;
use crate::matrixrpc::service::{ExtensionService, ServiceId, ServiceStatus};
/// Notifications broadcast by [`LifecycleManager`] to every subscriber.
#[derive(Clone, Debug)]
pub enum LifecycleEvent {
    /// Sent by `start_service` after the service was registered and its
    /// heartbeat monitor was spawned.
    Started(ServiceId),
    /// Sent by `stop_service` after the service was unregistered.
    Stopped(ServiceId),
    /// The manager moved a service from `old_status` to `new_status`.
    StatusChanged {
        id: ServiceId,
        old_status: ServiceStatus,
        new_status: ServiceStatus,
    },
    /// A heartbeat was recorded for the service.
    Heartbeat(ServiceId),
    /// The heartbeat monitor found a running service that failed its
    /// health check.
    HeartbeatTimeout(ServiceId),
    /// A reconnect attempt is starting; `attempt` is 1-based.
    Reconnecting {
        id: ServiceId,
        attempt: u32,
        max_attempts: u32,
    },
    /// A service reconnected.
    /// NOTE(review): no emitter for this variant is visible in this file.
    Reconnected(ServiceId),
    /// The reconnect attempt limit was reached.
    ReconnectFailed(ServiceId),
    /// An error was reported for the service through `handle_error`.
    Error {
        id: ServiceId,
        error: String,
    },
}
/// Tunables for heartbeat monitoring and reconnect backoff.
#[derive(Clone, Debug)]
pub struct LifecycleConfig {
    /// Heartbeat interval in seconds.
    /// NOTE(review): not read by the code visible in this file.
    pub heartbeat_interval_secs: u64,
    /// Seconds handed to the service health check by the heartbeat monitor;
    /// the monitor's polling period is derived from a third of this value.
    pub heartbeat_timeout_secs: u64,
    /// Number of reconnect attempts allowed before giving up.
    pub max_reconnect_attempts: u32,
    /// Base reconnect delay in milliseconds (delay of the first attempt,
    /// before jitter).
    pub reconnect_delay_ms: u64,
    /// Upper bound for any computed reconnect delay, in milliseconds.
    pub max_reconnect_delay_ms: u64,
    /// Factor the delay is multiplied by for each additional attempt.
    pub reconnect_backoff_multiplier: f64,
    /// Auto-reconnect flag.
    pub auto_reconnect: bool,
}

impl Default for LifecycleConfig {
    /// 30 s heartbeats, 90 s timeout, five reconnect attempts starting at
    /// 1 s and doubling up to a 30 s cap, auto-reconnect enabled.
    fn default() -> Self {
        LifecycleConfig {
            heartbeat_interval_secs: 30,
            heartbeat_timeout_secs: 90,
            max_reconnect_attempts: 5,
            reconnect_delay_ms: 1_000,
            max_reconnect_delay_ms: 30_000,
            reconnect_backoff_multiplier: 2.0,
            auto_reconnect: true,
        }
    }
}
/// Errors returned by [`LifecycleManager`] operations.
#[derive(Debug, thiserror::Error)]
pub enum LifecycleError {
    /// The manager has no lifecycle entry for the given service id.
    #[error("Service '{0}' not found")]
    NotFound(String),

    /// A connection could not be established.
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),

    /// The reconnect attempt limit (carried in the variant) was reached.
    #[error("Reconnection failed after {0} attempts")]
    ReconnectFailed(u32),

    /// An operation was requested in a state that does not allow it.
    #[error("Invalid state: {0}")]
    InvalidState(String),

    /// A registry call failed; the payload is that error rendered as text.
    #[error("Internal error: {0}")]
    Internal(String),
}
/// Per-service bookkeeping kept by the manager next to the registry entry.
#[derive(Clone, Debug)]
struct ServiceLifecycle {
    /// Id under which the service is registered.
    #[allow(dead_code)]
    id: ServiceId,
    /// Status as last recorded by the manager.
    #[allow(dead_code)]
    status: ServiceStatus,
    /// Reconnect attempts made so far; reset to zero on every heartbeat.
    reconnect_attempts: u32,
    /// Per-service opt-in, copied from the service's transport settings
    /// when the service is started.
    auto_reconnect: bool,
    /// Sender used to tell the heartbeat monitor task to stop.
    stop_tx: Option<mpsc::Sender<()>>,
}
/// Starts and stops services, tracks their status, monitors heartbeats and
/// broadcasts [`LifecycleEvent`]s describing what happened.
pub struct LifecycleManager {
    /// Registry in which services are registered and their status mirrored.
    registry: Arc<RegistryService>,
    /// Heartbeat and reconnect settings.
    config: LifecycleConfig,
    /// Manager-side state for every service started and not yet stopped.
    lifecycles: Arc<RwLock<HashMap<ServiceId, ServiceLifecycle>>>,
    /// Sending half of the event broadcast channel.
    event_tx: broadcast::Sender<LifecycleEvent>,
}
impl LifecycleManager {
/// Creates a manager over `registry` using [`LifecycleConfig::default`].
pub fn new(registry: Arc<RegistryService>) -> Self {
    let config = LifecycleConfig::default();
    Self::with_config(registry, config)
}
/// Creates a manager over `registry` with an explicit configuration.
///
/// The manager starts with no tracked services and an event channel that
/// buffers up to 256 events.
pub fn with_config(registry: Arc<RegistryService>, config: LifecycleConfig) -> Self {
    const EVENT_CHANNEL_CAPACITY: usize = 256;

    // The receiver created here is dropped right away; callers obtain their
    // own through `subscribe`.
    let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
    let lifecycles = Arc::new(RwLock::new(HashMap::new()));

    Self {
        registry,
        config,
        lifecycles,
        event_tx,
    }
}
/// Returns a new receiver for the manager's lifecycle event channel.
pub fn subscribe(&self) -> broadcast::Receiver<LifecycleEvent> {
    broadcast::Sender::subscribe(&self.event_tx)
}
/// Registers `service`, starts tracking it and moves it to `Running`.
///
/// Steps, in order: register with the registry, mark it `Starting`, create
/// the lifecycle entry, spawn the heartbeat monitor, emit
/// [`LifecycleEvent::Started`], then transition to `Running`.
///
/// # Errors
///
/// Returns [`LifecycleError::Internal`] when a registry call fails, or the
/// error from the `Running` transition. On any failure after registration
/// the partial start is rolled back (best effort) so no registry entry,
/// lifecycle entry or monitor task is left behind.
pub async fn start_service(&self, service: ExtensionService) -> Result<ServiceId, LifecycleError> {
    let auto_reconnect = service.transport.auto_reconnect;
    let id = self
        .registry
        .register(service)
        .await
        .map_err(|e| LifecycleError::Internal(e.to_string()))?;

    // Convert the error before any further await so only the (Send)
    // `LifecycleError` is held while rolling back.
    let marked_starting = self
        .registry
        .update_status(&id, ServiceStatus::Starting)
        .await
        .map_err(|e| LifecycleError::Internal(e.to_string()));
    if let Err(err) = marked_starting {
        // Do not leave a registered service that the manager is not tracking.
        let _ = self.registry.unregister(&id).await;
        return Err(err);
    }

    let (stop_tx, stop_rx) = mpsc::channel(1);
    let lifecycle = ServiceLifecycle {
        id: id.clone(),
        status: ServiceStatus::Starting,
        reconnect_attempts: 0,
        auto_reconnect,
        stop_tx: Some(stop_tx),
    };
    self.lifecycles.write().await.insert(id.clone(), lifecycle);

    self.spawn_heartbeat_monitor(id.clone(), stop_rx);
    let _ = self.event_tx.send(LifecycleEvent::Started(id.clone()));

    if let Err(err) = self.transition_status(&id, ServiceStatus::Running).await {
        // Undo everything done above: drop the entry, stop the monitor,
        // unregister, and tell subscribers that already saw `Started`.
        let removed = self.lifecycles.write().await.remove(&id);
        if let Some(stop_tx) = removed.and_then(|entry| entry.stop_tx) {
            let _ = stop_tx.send(()).await;
        }
        let _ = self.registry.unregister(&id).await;
        let _ = self.event_tx.send(LifecycleEvent::Stopped(id.clone()));
        return Err(err);
    }

    Ok(id)
}
/// Stops a tracked service and removes it from the manager and registry.
///
/// The service is first transitioned to `Stopping`; then its lifecycle
/// entry is removed, the heartbeat monitor is signalled, the service is
/// unregistered and [`LifecycleEvent::Stopped`] is emitted.
///
/// # Errors
///
/// [`LifecycleError::NotFound`] if the service is not tracked, or
/// [`LifecycleError::Internal`] if a registry call fails.
pub async fn stop_service(&self, id: &ServiceId) -> Result<(), LifecycleError> {
    self.transition_status(id, ServiceStatus::Stopping).await?;

    let entry = {
        let mut tracked = self.lifecycles.write().await;
        match tracked.remove(id) {
            Some(entry) => entry,
            None => return Err(LifecycleError::NotFound(id.to_string())),
        }
    };

    // A failed send only means the monitor task has already exited.
    if let Some(signal) = entry.stop_tx {
        let _ = signal.send(()).await;
    }

    let unregistered = self.registry.unregister(id).await;
    unregistered.map_err(|e| LifecycleError::Internal(e.to_string()))?;

    let _ = self.event_tx.send(LifecycleEvent::Stopped(id.clone()));
    Ok(())
}
/// Records a heartbeat for `id`.
///
/// Forwards the heartbeat to the registry, resets the service's reconnect
/// attempt counter (if the service is tracked) and emits
/// [`LifecycleEvent::Heartbeat`].
///
/// # Errors
///
/// [`LifecycleError::Internal`] if the registry rejects the heartbeat.
pub async fn handle_heartbeat(&self, id: &ServiceId) -> Result<(), LifecycleError> {
    if let Err(e) = self.registry.heartbeat(id).await {
        return Err(LifecycleError::Internal(e.to_string()));
    }

    // A live heartbeat means the next failure starts its backoff from zero.
    if let Some(entry) = self.lifecycles.write().await.get_mut(id) {
        entry.reconnect_attempts = 0;
    }

    let _ = self.event_tx.send(LifecycleEvent::Heartbeat(id.clone()));
    Ok(())
}
/// Reports an error for `id` and, if enabled, starts a reconnect attempt.
///
/// Emits [`LifecycleEvent::Error`], transitions the service to `Error`,
/// and then calls the reconnect path only when both the manager-wide
/// `config.auto_reconnect` switch and the service's own `auto_reconnect`
/// flag are set. Previously the manager-wide switch was never consulted.
///
/// # Errors
///
/// Propagates errors from the status transition and from the reconnect
/// attempt (including [`LifecycleError::ReconnectFailed`] once the attempt
/// limit is reached).
pub async fn handle_error(&self, id: &ServiceId, error: String) -> Result<(), LifecycleError> {
    let _ = self.event_tx.send(LifecycleEvent::Error {
        id: id.clone(),
        error,
    });
    self.transition_status(id, ServiceStatus::Error).await?;

    // Global kill switch: with it off no service is reconnected.
    if !self.config.auto_reconnect {
        return Ok(());
    }

    // The read guard is a temporary of this statement and is released
    // before the reconnect path takes the write lock.
    let service_opted_in = self
        .lifecycles
        .read()
        .await
        .get(id)
        .map_or(false, |l| l.auto_reconnect);

    if service_opted_in {
        self.attempt_reconnect(id).await?;
    }
    Ok(())
}
/// Accounts for one reconnect attempt and waits out its backoff delay.
///
/// The attempt limit check and the counter increment happen under a single
/// write lock, so concurrent callers cannot both pass the limit check on
/// the same counter value. On success the service is marked `Reconnecting`
/// (locally and in the registry), [`LifecycleEvent::Reconnecting`] is
/// emitted with a 1-based attempt number, and the call sleeps for the
/// computed delay. The reconnect itself is not performed here.
///
/// # Errors
///
/// * [`LifecycleError::NotFound`] if the service is not tracked.
/// * [`LifecycleError::ReconnectFailed`] once the attempt limit has been
///   reached (after emitting [`LifecycleEvent::ReconnectFailed`]).
/// * [`LifecycleError::Internal`] if the registry update fails.
async fn attempt_reconnect(&self, id: &ServiceId) -> Result<(), LifecycleError> {
    let max_attempts = self.config.max_reconnect_attempts;

    // `Some(n)` = attempts made before this one; `None` = limit reached.
    let claimed = {
        let mut lifecycles = self.lifecycles.write().await;
        let lifecycle = lifecycles
            .get_mut(id)
            .ok_or_else(|| LifecycleError::NotFound(id.to_string()))?;
        if lifecycle.reconnect_attempts >= max_attempts {
            None
        } else {
            let previous = lifecycle.reconnect_attempts;
            lifecycle.reconnect_attempts += 1;
            lifecycle.status = ServiceStatus::Reconnecting;
            Some(previous)
        }
    };

    let previous_attempts = match claimed {
        Some(previous) => previous,
        None => {
            let _ = self.event_tx.send(LifecycleEvent::ReconnectFailed(id.clone()));
            return Err(LifecycleError::ReconnectFailed(max_attempts));
        }
    };

    let _ = self.event_tx.send(LifecycleEvent::Reconnecting {
        id: id.clone(),
        attempt: previous_attempts + 1,
        max_attempts,
    });

    self.registry
        .update_status(id, ServiceStatus::Reconnecting)
        .await
        .map_err(|e| LifecycleError::Internal(e.to_string()))?;

    // The first attempt (zero previous attempts) waits the base delay.
    let delay = self.calculate_reconnect_delay(previous_attempts);
    sleep(Duration::from_millis(delay)).await;
    Ok(())
}
/// Computes the backoff delay in milliseconds for the given attempt index.
///
/// The delay is `reconnect_delay_ms * reconnect_backoff_multiplier^attempt`
/// plus a jitter term of at most 10% of that delay in magnitude (assuming
/// `rand_jitter` returns a value in `[0, 1)`), capped at
/// `max_reconnect_delay_ms`.
///
/// `attempt` is zero-based: `0` yields roughly the base delay.
fn calculate_reconnect_delay(&self, attempt: u32) -> u64 {
    let max_delay = self.config.max_reconnect_delay_ms;
    let base = self.config.reconnect_delay_ms as f64;

    // Saturate instead of wrapping: a plain `as i32` would turn attempts
    // above `i32::MAX` into a negative exponent and a near-zero delay.
    let exponent = attempt.min(i32::MAX as u32) as i32;
    let delay = base * self.config.reconnect_backoff_multiplier.powi(exponent);

    // An overflowed backoff must hit the cap. Without this guard the jitter
    // term could be -inf, making the sum NaN, which casts to 0 ms.
    if delay == f64::INFINITY {
        return max_delay;
    }

    let jitter = delay * 0.1 * (rand_jitter() - 0.5) * 2.0;
    // Float-to-int `as` saturates, so oversized values clamp to `u64::MAX`
    // before the cap is applied.
    let final_delay = (delay + jitter) as u64;
    final_delay.min(max_delay)
}
/// Moves a tracked service to `new_status` and announces the change.
///
/// The previous status is read and replaced in one step under a single
/// write lock, so the `old_status` reported in the event is exactly the
/// value that was overwritten. If the status is already `new_status`
/// nothing else happens. Otherwise the registry is updated and
/// [`LifecycleEvent::StatusChanged`] is emitted.
///
/// # Errors
///
/// [`LifecycleError::NotFound`] if the service is not tracked, or
/// [`LifecycleError::Internal`] if the registry update fails (the local
/// status has already been changed at that point).
async fn transition_status(
    &self,
    id: &ServiceId,
    new_status: ServiceStatus,
) -> Result<(), LifecycleError> {
    let old_status = {
        let mut lifecycles = self.lifecycles.write().await;
        let lifecycle = lifecycles
            .get_mut(id)
            .ok_or_else(|| LifecycleError::NotFound(id.to_string()))?;
        // Writing the same value back is harmless when nothing changes.
        std::mem::replace(&mut lifecycle.status, new_status)
    };

    if old_status == new_status {
        return Ok(());
    }

    // The lock is released before awaiting the registry.
    self.registry
        .update_status(id, new_status)
        .await
        .map_err(|e| LifecycleError::Internal(e.to_string()))?;

    let _ = self.event_tx.send(LifecycleEvent::StatusChanged {
        id: id.clone(),
        old_status,
        new_status,
    });
    Ok(())
}
/// Spawns a background task that periodically health-checks service `id`.
///
/// The task polls every `heartbeat_timeout_secs / 3` seconds (at least one
/// second). On each poll it looks the service up in the registry:
///
/// * if the service is gone, the task ends;
/// * if the service fails `is_healthy(heartbeat_timeout_secs)` while its
///   registry status is `Running`, the task emits
///   [`LifecycleEvent::HeartbeatTimeout`] and sets the registry status to
///   `Reconnecting`.
///
/// The task also ends when `stop_rx` yields (a stop message or a closed
/// channel).
///
/// NOTE(review): only the registry status is changed on timeout; the
/// manager's own lifecycle entry is not touched here.
fn spawn_heartbeat_monitor(&self, id: ServiceId, mut stop_rx: mpsc::Receiver<()>) {
    let registry = self.registry.clone();
    let event_tx = self.event_tx.clone();
    let timeout_secs = self.config.heartbeat_timeout_secs;

    // `tokio::time::interval` panics on a zero period, which the integer
    // division would produce for timeouts below three seconds.
    let check_period = Duration::from_secs((timeout_secs / 3).max(1));

    tokio::spawn(async move {
        let mut check_interval = interval(check_period);
        loop {
            tokio::select! {
                _ = stop_rx.recv() => {
                    break;
                }
                _ = check_interval.tick() => {
                    let service = match registry.get(&id).await {
                        Some(service) => service,
                        // Unregistered: nothing left to monitor.
                        None => break,
                    };
                    if !service.is_healthy(timeout_secs)
                        && service.status == ServiceStatus::Running
                    {
                        let _ = event_tx.send(LifecycleEvent::HeartbeatTimeout(id.clone()));
                        let _ = registry
                            .update_status(&id, ServiceStatus::Reconnecting)
                            .await;
                    }
                }
            }
        }
    });
}
/// Stops every service currently tracked by the manager.
///
/// Best effort: a failure to stop one service is ignored and the remaining
/// services are still processed.
pub async fn stop_all(&self) {
    // Snapshot the ids first so the read lock is released before
    // `stop_service` needs the write lock.
    let tracked_ids = self
        .lifecycles
        .read()
        .await
        .keys()
        .cloned()
        .collect::<Vec<ServiceId>>();

    for service_id in &tracked_ids {
        let _ = self.stop_service(service_id).await;
    }
}
/// Returns the manager's recorded status for `id`, or `None` when the
/// service is not tracked.
pub async fn get_status(&self, id: &ServiceId) -> Option<ServiceStatus> {
    self.lifecycles
        .read()
        .await
        .get(id)
        .map(|entry| entry.status)
}
/// Returns `true` only when `id` is tracked and its recorded status is
/// `Running`; untracked services are reported as not healthy.
pub async fn is_healthy(&self, id: &ServiceId) -> bool {
    let tracked = self.lifecycles.read().await;
    tracked
        .get(id)
        .map_or(false, |entry| entry.status == ServiceStatus::Running)
}
/// Returns the number of services currently tracked by the manager.
pub async fn count(&self) -> usize {
    let tracked = self.lifecycles.read().await;
    tracked.len()
}
/// Runs the registry's health check and returns the service ids it reports.
///
/// NOTE(review): which services the registry includes in the result is
/// defined by `RegistryService::health_check`, not visible here.
pub async fn health_check(&self) -> Vec<ServiceId> {
    let reported: Vec<ServiceId> = self.registry.health_check().await;
    reported
}
}
/// Returns a cheap pseudo-random value in `[0, 1)`.
///
/// The value is the sub-second part of the current wall-clock time divided
/// by the number of nanoseconds in a second. It is only meant to spread
/// retry delays, not to be statistically or cryptographically random.
///
/// Dividing by `u32::MAX`, as this function used to, capped the result at
/// about 0.233 because `subsec_nanos` is always below 1_000_000_000; the
/// jitter derived from it was therefore always negative.
fn rand_jitter() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    const NANOS_PER_SEC: f64 = 1_000_000_000.0;

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A clock set before the epoch falls back to a zero duration.
        .unwrap_or_default()
        .subsec_nanos();
    f64::from(nanos) / NANOS_PER_SEC
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager with the default configuration over an empty registry.
    fn new_manager() -> LifecycleManager {
        LifecycleManager::new(Arc::new(RegistryService::new()))
    }

    #[tokio::test]
    async fn test_lifecycle_manager_creation() {
        let manager = new_manager();
        assert_eq!(manager.count().await, 0);
    }

    #[tokio::test]
    async fn test_start_stop_service() {
        let manager = new_manager();
        let service = ExtensionService::new("test", "1.0.0");

        let id = manager.start_service(service).await.unwrap();
        assert_eq!(manager.count().await, 1);
        assert_eq!(manager.get_status(&id).await, Some(ServiceStatus::Running));

        manager.stop_service(&id).await.unwrap();
        assert_eq!(manager.count().await, 0);
    }

    #[tokio::test]
    async fn test_handle_heartbeat() {
        let manager = new_manager();
        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        let result = manager.handle_heartbeat(&id).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_handle_error() {
        let manager = new_manager();
        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        // Disable reconnection so the call returns without entering the
        // backoff sleep and the status stays at `Error`.
        {
            let mut lifecycles = manager.lifecycles.write().await;
            if let Some(l) = lifecycles.get_mut(&id) {
                l.auto_reconnect = false;
            }
        }

        manager
            .handle_error(&id, "Test error".to_string())
            .await
            .unwrap();
        assert_eq!(manager.get_status(&id).await, Some(ServiceStatus::Error));
    }

    #[tokio::test]
    async fn test_lifecycle_events() {
        let manager = new_manager();
        let mut event_rx = manager.subscribe();
        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        // `Started` is sent before anything else for a new service.
        let first = event_rx.try_recv();
        assert!(matches!(
            first,
            Ok(LifecycleEvent::Started(ref started)) if *started == id
        ));

        // The transition to `Running` must be announced afterwards.
        let mut saw_running = false;
        while let Ok(event) = event_rx.try_recv() {
            if matches!(
                event,
                LifecycleEvent::StatusChanged { new_status, .. }
                    if new_status == ServiceStatus::Running
            ) {
                saw_running = true;
            }
        }
        assert!(saw_running);
    }

    #[tokio::test]
    async fn test_lifecycle_config() {
        let registry = Arc::new(RegistryService::new());
        let config = LifecycleConfig {
            heartbeat_interval_secs: 10,
            heartbeat_timeout_secs: 30,
            max_reconnect_attempts: 3,
            ..Default::default()
        };
        let manager = LifecycleManager::with_config(registry, config);

        assert_eq!(manager.config.heartbeat_interval_secs, 10);
        assert_eq!(manager.config.heartbeat_timeout_secs, 30);
        assert_eq!(manager.config.max_reconnect_attempts, 3);
    }

    #[test]
    fn test_calculate_reconnect_delay() {
        let manager = new_manager();

        // With at most 10% jitter the doubling steps cannot overlap.
        let delay0 = manager.calculate_reconnect_delay(0);
        let delay1 = manager.calculate_reconnect_delay(1);
        let delay2 = manager.calculate_reconnect_delay(2);
        assert!(delay1 > delay0);
        assert!(delay2 > delay1);
    }
}