matrixcode_core/matrixrpc/lifecycle/
manager.rs1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::{broadcast, mpsc, RwLock};
11use tokio::time::{interval, sleep};
12
13use crate::matrixrpc::registry::RegistryService;
14use crate::matrixrpc::service::{ExtensionService, ServiceId, ServiceStatus};
15
16#[derive(Debug, Clone)]
18pub enum LifecycleEvent {
19 Started(ServiceId),
21
22 Stopped(ServiceId),
24
25 StatusChanged {
27 id: ServiceId,
28 old_status: ServiceStatus,
29 new_status: ServiceStatus,
30 },
31
32 Heartbeat(ServiceId),
34
35 HeartbeatTimeout(ServiceId),
37
38 Reconnecting {
40 id: ServiceId,
41 attempt: u32,
42 max_attempts: u32,
43 },
44
45 Reconnected(ServiceId),
47
48 ReconnectFailed(ServiceId),
50
51 Error {
53 id: ServiceId,
54 error: String,
55 },
56}
57
/// Tunables for heartbeat monitoring and reconnect backoff in [`LifecycleManager`].
#[derive(Debug, Clone)]
pub struct LifecycleConfig {
    /// Expected heartbeat interval, in seconds.
    ///
    /// NOTE(review): nothing in this module reads this field; confirm where it
    /// is consumed.
    pub heartbeat_interval_secs: u64,

    /// Heartbeat timeout, in seconds. The heartbeat monitor passes it to the
    /// registry's `is_healthy` check and polls at a fraction of it.
    pub heartbeat_timeout_secs: u64,

    /// Maximum number of reconnect attempts before giving up.
    pub max_reconnect_attempts: u32,

    /// Base reconnect delay in milliseconds (the delay before the first retry).
    pub reconnect_delay_ms: u64,

    /// Upper bound on any reconnect delay, in milliseconds.
    pub max_reconnect_delay_ms: u64,

    /// Factor by which the reconnect delay grows with each attempt.
    pub reconnect_backoff_multiplier: f64,

    /// Manager-wide auto-reconnect flag. Each service's transport also carries
    /// its own `auto_reconnect` setting.
    pub auto_reconnect: bool,
}

impl Default for LifecycleConfig {
    /// 30 s heartbeat interval, 90 s timeout, and up to 5 reconnect attempts.
    /// Retries start at 1 s and double up to a 30 s cap; auto-reconnect is on.
    fn default() -> Self {
        LifecycleConfig {
            auto_reconnect: true,
            heartbeat_interval_secs: 30,
            heartbeat_timeout_secs: 90,
            max_reconnect_attempts: 5,
            reconnect_backoff_multiplier: 2.0,
            reconnect_delay_ms: 1_000,
            max_reconnect_delay_ms: 30_000,
        }
    }
}
96
97#[derive(Debug, thiserror::Error)]
99pub enum LifecycleError {
100 #[error("Service '{0}' not found")]
102 NotFound(String),
103
104 #[error("Connection failed: {0}")]
106 ConnectionFailed(String),
107
108 #[error("Reconnection failed after {0} attempts")]
110 ReconnectFailed(u32),
111
112 #[error("Invalid state: {0}")]
114 InvalidState(String),
115
116 #[error("Internal error: {0}")]
118 Internal(String),
119}
120
121#[derive(Debug, Clone)]
123struct ServiceLifecycle {
124#[allow(dead_code)]
125 id: ServiceId,
127#[allow(dead_code)]
128
129 status: ServiceStatus,
131
132 reconnect_attempts: u32,
134
135 auto_reconnect: bool,
137
138 stop_tx: Option<mpsc::Sender<()>>,
140}
141
142pub struct LifecycleManager {
150 registry: Arc<RegistryService>,
152
153 config: LifecycleConfig,
155
156 lifecycles: Arc<RwLock<HashMap<ServiceId, ServiceLifecycle>>>,
158
159 event_tx: broadcast::Sender<LifecycleEvent>,
161}
162
163impl LifecycleManager {
164 pub fn new(registry: Arc<RegistryService>) -> Self {
166 Self::with_config(registry, LifecycleConfig::default())
167 }
168
169 pub fn with_config(registry: Arc<RegistryService>, config: LifecycleConfig) -> Self {
171 let (event_tx, _) = broadcast::channel(256);
172
173 Self {
174 registry,
175 config,
176 lifecycles: Arc::new(RwLock::new(HashMap::new())),
177 event_tx,
178 }
179 }
180
181 pub fn subscribe(&self) -> broadcast::Receiver<LifecycleEvent> {
183 self.event_tx.subscribe()
184 }
185
186 pub async fn start_service(&self, service: ExtensionService) -> Result<ServiceId, LifecycleError> {
188 let auto_reconnect = service.transport.auto_reconnect;
189
190 let id = self
192 .registry
193 .register(service)
194 .await
195 .map_err(|e| LifecycleError::Internal(e.to_string()))?;
196
197 self.registry
199 .update_status(&id, ServiceStatus::Starting)
200 .await
201 .map_err(|e| LifecycleError::Internal(e.to_string()))?;
202
203 let (stop_tx, stop_rx) = mpsc::channel(1);
205 let lifecycle = ServiceLifecycle {
206 id: id.clone(),
207 status: ServiceStatus::Starting,
208 reconnect_attempts: 0,
209 auto_reconnect,
210 stop_tx: Some(stop_tx),
211 };
212
213 {
214 let mut lifecycles = self.lifecycles.write().await;
215 lifecycles.insert(id.clone(), lifecycle);
216 }
217
218 self.spawn_heartbeat_monitor(id.clone(), stop_rx);
220
221 let _ = self.event_tx.send(LifecycleEvent::Started(id.clone()));
223
224 self.transition_status(&id, ServiceStatus::Running).await?;
226
227 Ok(id)
228 }
229
230 pub async fn stop_service(&self, id: &ServiceId) -> Result<(), LifecycleError> {
232 self.transition_status(id, ServiceStatus::Stopping).await?;
234
235 let lifecycle = {
237 let mut lifecycles = self.lifecycles.write().await;
238 lifecycles.remove(id).ok_or_else(|| LifecycleError::NotFound(id.to_string()))?
239 };
240
241 if let Some(stop_tx) = lifecycle.stop_tx {
243 let _ = stop_tx.send(()).await;
244 }
245
246 self.registry
248 .unregister(id)
249 .await
250 .map_err(|e| LifecycleError::Internal(e.to_string()))?;
251
252 let _ = self.event_tx.send(LifecycleEvent::Stopped(id.clone()));
254
255 Ok(())
256 }
257
258 pub async fn handle_heartbeat(&self, id: &ServiceId) -> Result<(), LifecycleError> {
260 self.registry
261 .heartbeat(id)
262 .await
263 .map_err(|e| LifecycleError::Internal(e.to_string()))?;
264
265 {
267 let mut lifecycles = self.lifecycles.write().await;
268 if let Some(lifecycle) = lifecycles.get_mut(id) {
269 lifecycle.reconnect_attempts = 0;
270 }
271 }
272
273 let _ = self.event_tx.send(LifecycleEvent::Heartbeat(id.clone()));
275
276 Ok(())
277 }
278
279 pub async fn handle_error(&self, id: &ServiceId, error: String) -> Result<(), LifecycleError> {
281 let _ = self.event_tx.send(LifecycleEvent::Error {
283 id: id.clone(),
284 error,
285 });
286
287 self.transition_status(id, ServiceStatus::Error).await?;
289
290 let should_reconnect = {
292 let lifecycles = self.lifecycles.read().await;
293 lifecycles
294 .get(id)
295 .map(|l| l.auto_reconnect)
296 .unwrap_or(false)
297 };
298
299 if should_reconnect {
300 self.attempt_reconnect(id).await?;
301 }
302
303 Ok(())
304 }
305
306 async fn attempt_reconnect(&self, id: &ServiceId) -> Result<(), LifecycleError> {
308 let (max_attempts, _delay_ms, backoff) = {
309 let lifecycles = self.lifecycles.read().await;
310 let lifecycle = lifecycles.get(id).ok_or_else(|| LifecycleError::NotFound(id.to_string()))?;
311 (self.config.max_reconnect_attempts, self.config.reconnect_delay_ms, lifecycle.reconnect_attempts)
312 };
313
314 if backoff >= max_attempts {
316 let _ = self.event_tx.send(LifecycleEvent::ReconnectFailed(id.clone()));
317 return Err(LifecycleError::ReconnectFailed(max_attempts));
318 }
319
320 {
322 let mut lifecycles = self.lifecycles.write().await;
323 if let Some(lifecycle) = lifecycles.get_mut(id) {
324 lifecycle.reconnect_attempts += 1;
325 lifecycle.status = ServiceStatus::Reconnecting;
326 }
327 }
328
329 let _ = self.event_tx.send(LifecycleEvent::Reconnecting {
331 id: id.clone(),
332 attempt: backoff + 1,
333 max_attempts: max_attempts,
334 });
335
336 self.registry
337 .update_status(id, ServiceStatus::Reconnecting)
338 .await
339 .map_err(|e| LifecycleError::Internal(e.to_string()))?;
340
341 let delay = self.calculate_reconnect_delay(backoff);
343
344 sleep(Duration::from_millis(delay)).await;
346
347 Ok(())
351 }
352
353 fn calculate_reconnect_delay(&self, attempt: u32) -> u64 {
355 let base = self.config.reconnect_delay_ms as f64;
356 let multiplier = self.config.reconnect_backoff_multiplier.powi(attempt as i32);
357 let delay = base * multiplier;
358
359 let jitter = delay * 0.1 * (rand_jitter() - 0.5) * 2.0;
361
362 let final_delay = (delay + jitter) as u64;
363 final_delay.min(self.config.max_reconnect_delay_ms)
364 }
365
366 async fn transition_status(
368 &self,
369 id: &ServiceId,
370 new_status: ServiceStatus,
371 ) -> Result<(), LifecycleError> {
372 let old_status = {
373 let lifecycles = self.lifecycles.read().await;
374 lifecycles
375 .get(id)
376 .map(|l| l.status)
377 .ok_or_else(|| LifecycleError::NotFound(id.to_string()))?
378 };
379
380 if old_status == new_status {
381 return Ok(());
382 }
383
384 {
386 let mut lifecycles = self.lifecycles.write().await;
387 if let Some(lifecycle) = lifecycles.get_mut(id) {
388 lifecycle.status = new_status;
389 }
390 }
391
392 self.registry
394 .update_status(id, new_status)
395 .await
396 .map_err(|e| LifecycleError::Internal(e.to_string()))?;
397
398 let _ = self.event_tx.send(LifecycleEvent::StatusChanged {
400 id: id.clone(),
401 old_status,
402 new_status,
403 });
404
405 Ok(())
406 }
407
408 fn spawn_heartbeat_monitor(&self, id: ServiceId, mut stop_rx: mpsc::Receiver<()>) {
410 let registry = self.registry.clone();
411 let event_tx = self.event_tx.clone();
412 let timeout_secs = self.config.heartbeat_timeout_secs;
413
414 tokio::spawn(async move {
415 let mut check_interval = interval(Duration::from_secs(timeout_secs / 3));
416
417 loop {
418 tokio::select! {
419 _ = stop_rx.recv() => {
420 break;
422 }
423 _ = check_interval.tick() => {
424 if let Some(service) = registry.get(&id).await {
426 if !service.is_healthy(timeout_secs) {
427 if service.status == ServiceStatus::Running {
428 let _ = event_tx.send(LifecycleEvent::HeartbeatTimeout(id.clone()));
430
431 let _ = registry.update_status(&id, ServiceStatus::Reconnecting).await;
433 }
434 }
435 } else {
436 break;
438 }
439 }
440 }
441 }
442 });
443 }
444
445 pub async fn stop_all(&self) {
447 let ids: Vec<ServiceId> = {
448 let lifecycles = self.lifecycles.read().await;
449 lifecycles.keys().cloned().collect()
450 };
451
452 for id in ids {
453 let _ = self.stop_service(&id).await;
454 }
455 }
456
457 pub async fn get_status(&self, id: &ServiceId) -> Option<ServiceStatus> {
459 let lifecycles = self.lifecycles.read().await;
460 lifecycles.get(id).map(|l| l.status)
461 }
462
463 pub async fn is_healthy(&self, id: &ServiceId) -> bool {
465 let lifecycles = self.lifecycles.read().await;
466 lifecycles
467 .get(id)
468 .map(|l| l.status == ServiceStatus::Running)
469 .unwrap_or(false)
470 }
471
472 pub async fn count(&self) -> usize {
474 self.lifecycles.read().await.len()
475 }
476
477 pub async fn health_check(&self) -> Vec<ServiceId> {
479 self.registry.health_check().await
480 }
481}
482
/// Returns a pseudo-random value in `[0.0, 1.0)` derived from the wall clock's
/// sub-second nanoseconds.
///
/// This is not a real RNG: calls made close together return correlated values.
/// It is only used to spread out reconnect delays.
fn rand_jitter() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    const NANOS_PER_SEC: f64 = 1_000_000_000.0;

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();
    // `subsec_nanos` is always below 1e9, so dividing by 1e9 spans the whole
    // unit interval. Dividing by `u32::MAX` would cap the value near 0.23 and
    // make the jitter always negative.
    f64::from(nanos) / NANOS_PER_SEC
}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager over a fresh, empty registry with the default config.
    fn new_manager() -> LifecycleManager {
        LifecycleManager::new(Arc::new(RegistryService::new()))
    }

    #[tokio::test]
    async fn test_lifecycle_manager_creation() {
        let manager = new_manager();
        assert_eq!(manager.count().await, 0);
    }

    #[tokio::test]
    async fn test_start_stop_service() {
        let manager = new_manager();

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        assert_eq!(manager.count().await, 1);
        assert_eq!(manager.get_status(&id).await, Some(ServiceStatus::Running));
        assert!(manager.is_healthy(&id).await);

        manager.stop_service(&id).await.unwrap();
        assert_eq!(manager.count().await, 0);
        assert_eq!(manager.get_status(&id).await, None);

        // A second stop must report the service as unknown.
        assert!(matches!(
            manager.stop_service(&id).await,
            Err(LifecycleError::NotFound(_))
        ));
    }

    #[tokio::test]
    async fn test_handle_heartbeat() {
        let manager = new_manager();

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        let result = manager.handle_heartbeat(&id).await;
        assert!(result.is_ok());
        assert_eq!(manager.get_status(&id).await, Some(ServiceStatus::Running));
    }

    #[tokio::test]
    async fn test_handle_error() {
        let manager = new_manager();

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        // Disable reconnection so the service stays in `Error`.
        {
            let mut lifecycles = manager.lifecycles.write().await;
            if let Some(l) = lifecycles.get_mut(&id) {
                l.auto_reconnect = false;
            }
        }

        manager
            .handle_error(&id, "Test error".to_string())
            .await
            .unwrap();

        assert_eq!(manager.get_status(&id).await, Some(ServiceStatus::Error));
        assert!(!manager.is_healthy(&id).await);
    }

    #[tokio::test]
    async fn test_lifecycle_events() {
        let manager = new_manager();
        let mut event_rx = manager.subscribe();

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        let mut events = Vec::new();
        while let Ok(event) = event_rx.try_recv() {
            events.push(event);
        }

        assert!(
            matches!(events.first(), Some(LifecycleEvent::Started(started)) if *started == id),
            "first event should be Started, got {:?}",
            events
        );
        assert!(
            events.iter().any(|e| matches!(
                e,
                LifecycleEvent::StatusChanged { id: changed, new_status, .. }
                    if *changed == id && *new_status == ServiceStatus::Running
            )),
            "expected a StatusChanged to Running, got {:?}",
            events
        );
    }

    #[tokio::test]
    async fn test_lifecycle_config() {
        let registry = Arc::new(RegistryService::new());
        let config = LifecycleConfig {
            heartbeat_interval_secs: 10,
            heartbeat_timeout_secs: 30,
            max_reconnect_attempts: 3,
            ..Default::default()
        };
        let manager = LifecycleManager::with_config(registry, config);

        assert_eq!(manager.config.heartbeat_interval_secs, 10);
        assert_eq!(manager.config.heartbeat_timeout_secs, 30);
        assert_eq!(manager.config.max_reconnect_attempts, 3);
    }

    #[test]
    fn test_calculate_reconnect_delay() {
        let manager = new_manager();

        let delay0 = manager.calculate_reconnect_delay(0);
        let delay1 = manager.calculate_reconnect_delay(1);
        let delay2 = manager.calculate_reconnect_delay(2);

        // Default base is 1000 ms with at most ±10% jitter and a 2x multiplier.
        assert!((900..=1100).contains(&delay0), "delay0 = {}", delay0);
        assert!(delay1 > delay0);
        assert!(delay2 > delay1);

        // Large attempt numbers never exceed the configured cap.
        let capped = manager.calculate_reconnect_delay(30);
        assert!(capped <= manager.config.max_reconnect_delay_ms, "capped = {}", capped);
    }
}