Skip to main content

matrixcode_core/matrixrpc/lifecycle/
manager.rs

//! Lifecycle Manager
//!
//! Manages the lifecycle of extension services, including connection,
//! reconnection, heartbeat monitoring, and graceful shutdown.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::{broadcast, mpsc, RwLock};
11use tokio::time::{interval, sleep};
12
13use crate::matrixrpc::registry::RegistryService;
14use crate::matrixrpc::service::{ExtensionService, ServiceId, ServiceStatus};
15
16/// Lifecycle event
17#[derive(Debug, Clone)]
18pub enum LifecycleEvent {
19    /// Service started
20    Started(ServiceId),
21
22    /// Service stopped
23    Stopped(ServiceId),
24
25    /// Service status changed
26    StatusChanged {
27        id: ServiceId,
28        old_status: ServiceStatus,
29        new_status: ServiceStatus,
30    },
31
32    /// Heartbeat received
33    Heartbeat(ServiceId),
34
35    /// Heartbeat timeout
36    HeartbeatTimeout(ServiceId),
37
38    /// Reconnection attempt
39    Reconnecting {
40        id: ServiceId,
41        attempt: u32,
42        max_attempts: u32,
43    },
44
45    /// Reconnection succeeded
46    Reconnected(ServiceId),
47
48    /// Reconnection failed
49    ReconnectFailed(ServiceId),
50
51    /// Service error
52    Error {
53        id: ServiceId,
54        error: String,
55    },
56}
57
/// Tunable settings for the lifecycle manager.
#[derive(Debug, Clone)]
pub struct LifecycleConfig {
    /// Heartbeat interval, in seconds.
    ///
    /// NOTE(review): outside the tests, no code in this file reads this
    /// field — confirm whether another module consumes it.
    pub heartbeat_interval_secs: u64,

    /// Heartbeat timeout, in seconds. The heartbeat monitor passes this to the
    /// service health check and derives its polling period from it.
    pub heartbeat_timeout_secs: u64,

    /// Upper bound on reconnection attempts before giving up.
    pub max_reconnect_attempts: u32,

    /// Delay before the first reconnection attempt, in milliseconds.
    pub reconnect_delay_ms: u64,

    /// Ceiling applied to the computed reconnection delay, in milliseconds.
    pub max_reconnect_delay_ms: u64,

    /// Factor the delay is multiplied by for each additional attempt.
    pub reconnect_backoff_multiplier: f64,

    /// Whether automatic reconnection is enabled.
    pub auto_reconnect: bool,
}

impl Default for LifecycleConfig {
    /// Defaults: 30 s heartbeat interval, 90 s heartbeat timeout, 5 reconnect
    /// attempts, 1 s initial delay doubling up to a 30 s ceiling, and
    /// auto-reconnect switched on.
    fn default() -> Self {
        LifecycleConfig {
            heartbeat_interval_secs: 30,
            heartbeat_timeout_secs: 90,
            max_reconnect_attempts: 5,
            reconnect_delay_ms: 1000,
            max_reconnect_delay_ms: 30000,
            reconnect_backoff_multiplier: 2.0,
            auto_reconnect: true,
        }
    }
}
96
97/// Lifecycle manager error
98#[derive(Debug, thiserror::Error)]
99pub enum LifecycleError {
100    /// Service not found
101    #[error("Service '{0}' not found")]
102    NotFound(String),
103
104    /// Connection failed
105    #[error("Connection failed: {0}")]
106    ConnectionFailed(String),
107
108    /// Reconnection failed
109    #[error("Reconnection failed after {0} attempts")]
110    ReconnectFailed(u32),
111
112    /// Invalid state
113    #[error("Invalid state: {0}")]
114    InvalidState(String),
115
116    /// Internal error
117    #[error("Internal error: {0}")]
118    Internal(String),
119}
120
121/// Service lifecycle state
122#[derive(Debug, Clone)]
123struct ServiceLifecycle {
124#[allow(dead_code)]
125    /// Service ID
126    id: ServiceId,
127#[allow(dead_code)]
128
129    /// Current status
130    status: ServiceStatus,
131
132    /// Reconnection attempts
133    reconnect_attempts: u32,
134
135    /// Is auto-reconnect enabled for this service
136    auto_reconnect: bool,
137
138    /// Stop signal sender
139    stop_tx: Option<mpsc::Sender<()>>,
140}
141
142/// Lifecycle Manager
143///
144/// Manages service lifecycle events including:
145/// - Connection initialization
146/// - Heartbeat monitoring
147/// - Auto-reconnection with backoff
148/// - Graceful shutdown
149pub struct LifecycleManager {
150    /// Registry service reference
151    registry: Arc<RegistryService>,
152
153    /// Lifecycle configuration
154    config: LifecycleConfig,
155
156    /// Service lifecycle states
157    lifecycles: Arc<RwLock<HashMap<ServiceId, ServiceLifecycle>>>,
158
159    /// Event broadcaster
160    event_tx: broadcast::Sender<LifecycleEvent>,
161}
162
163impl LifecycleManager {
164    /// Create a new lifecycle manager
165    pub fn new(registry: Arc<RegistryService>) -> Self {
166        Self::with_config(registry, LifecycleConfig::default())
167    }
168
169    /// Create a new lifecycle manager with configuration
170    pub fn with_config(registry: Arc<RegistryService>, config: LifecycleConfig) -> Self {
171        let (event_tx, _) = broadcast::channel(256);
172
173        Self {
174            registry,
175            config,
176            lifecycles: Arc::new(RwLock::new(HashMap::new())),
177            event_tx,
178        }
179    }
180
181    /// Subscribe to lifecycle events
182    pub fn subscribe(&self) -> broadcast::Receiver<LifecycleEvent> {
183        self.event_tx.subscribe()
184    }
185
186    /// Start managing a service
187    pub async fn start_service(&self, service: ExtensionService) -> Result<ServiceId, LifecycleError> {
188        let auto_reconnect = service.transport.auto_reconnect;
189
190        // Register the service
191        let id = self
192            .registry
193            .register(service)
194            .await
195            .map_err(|e| LifecycleError::Internal(e.to_string()))?;
196
197        // Set initial status
198        self.registry
199            .update_status(&id, ServiceStatus::Starting)
200            .await
201            .map_err(|e| LifecycleError::Internal(e.to_string()))?;
202
203        // Create lifecycle state
204        let (stop_tx, stop_rx) = mpsc::channel(1);
205        let lifecycle = ServiceLifecycle {
206            id: id.clone(),
207            status: ServiceStatus::Starting,
208            reconnect_attempts: 0,
209            auto_reconnect,
210            stop_tx: Some(stop_tx),
211        };
212
213        {
214            let mut lifecycles = self.lifecycles.write().await;
215            lifecycles.insert(id.clone(), lifecycle);
216        }
217
218        // Start heartbeat monitor task
219        self.spawn_heartbeat_monitor(id.clone(), stop_rx);
220
221        // Emit started event
222        let _ = self.event_tx.send(LifecycleEvent::Started(id.clone()));
223
224        // Transition to running
225        self.transition_status(&id, ServiceStatus::Running).await?;
226
227        Ok(id)
228    }
229
230    /// Stop managing a service
231    pub async fn stop_service(&self, id: &ServiceId) -> Result<(), LifecycleError> {
232        // Update status first before removing from lifecycles
233        self.transition_status(id, ServiceStatus::Stopping).await?;
234
235        // Get lifecycle and remove from tracking
236        let lifecycle = {
237            let mut lifecycles = self.lifecycles.write().await;
238            lifecycles.remove(id).ok_or_else(|| LifecycleError::NotFound(id.to_string()))?
239        };
240
241        // Send stop signal
242        if let Some(stop_tx) = lifecycle.stop_tx {
243            let _ = stop_tx.send(()).await;
244        }
245
246        // Unregister from registry
247        self.registry
248            .unregister(id)
249            .await
250            .map_err(|e| LifecycleError::Internal(e.to_string()))?;
251
252        // Emit stopped event
253        let _ = self.event_tx.send(LifecycleEvent::Stopped(id.clone()));
254
255        Ok(())
256    }
257
258    /// Handle heartbeat from a service
259    pub async fn handle_heartbeat(&self, id: &ServiceId) -> Result<(), LifecycleError> {
260        self.registry
261            .heartbeat(id)
262            .await
263            .map_err(|e| LifecycleError::Internal(e.to_string()))?;
264
265        // Reset reconnection attempts
266        {
267            let mut lifecycles = self.lifecycles.write().await;
268            if let Some(lifecycle) = lifecycles.get_mut(id) {
269                lifecycle.reconnect_attempts = 0;
270            }
271        }
272
273        // Emit heartbeat event
274        let _ = self.event_tx.send(LifecycleEvent::Heartbeat(id.clone()));
275
276        Ok(())
277    }
278
279    /// Handle service error
280    pub async fn handle_error(&self, id: &ServiceId, error: String) -> Result<(), LifecycleError> {
281        // Emit error event
282        let _ = self.event_tx.send(LifecycleEvent::Error {
283            id: id.clone(),
284            error,
285        });
286
287        // Transition to error status
288        self.transition_status(id, ServiceStatus::Error).await?;
289
290        // Attempt reconnection if enabled
291        let should_reconnect = {
292            let lifecycles = self.lifecycles.read().await;
293            lifecycles
294                .get(id)
295                .map(|l| l.auto_reconnect)
296                .unwrap_or(false)
297        };
298
299        if should_reconnect {
300            self.attempt_reconnect(id).await?;
301        }
302
303        Ok(())
304    }
305
306    /// Attempt to reconnect a service
307    async fn attempt_reconnect(&self, id: &ServiceId) -> Result<(), LifecycleError> {
308        let (max_attempts, _delay_ms, backoff) = {
309            let lifecycles = self.lifecycles.read().await;
310            let lifecycle = lifecycles.get(id).ok_or_else(|| LifecycleError::NotFound(id.to_string()))?;
311            (self.config.max_reconnect_attempts, self.config.reconnect_delay_ms, lifecycle.reconnect_attempts)
312        };
313
314        // Check max attempts
315        if backoff >= max_attempts {
316            let _ = self.event_tx.send(LifecycleEvent::ReconnectFailed(id.clone()));
317            return Err(LifecycleError::ReconnectFailed(max_attempts));
318        }
319
320        // Update status and increment attempts
321        {
322            let mut lifecycles = self.lifecycles.write().await;
323            if let Some(lifecycle) = lifecycles.get_mut(id) {
324                lifecycle.reconnect_attempts += 1;
325                lifecycle.status = ServiceStatus::Reconnecting;
326            }
327        }
328
329        // Emit reconnecting event
330        let _ = self.event_tx.send(LifecycleEvent::Reconnecting {
331            id: id.clone(),
332            attempt: backoff + 1,
333            max_attempts: max_attempts,
334        });
335
336        self.registry
337            .update_status(id, ServiceStatus::Reconnecting)
338            .await
339            .map_err(|e| LifecycleError::Internal(e.to_string()))?;
340
341        // Calculate backoff delay
342        let delay = self.calculate_reconnect_delay(backoff);
343
344        // Wait for backoff
345        sleep(Duration::from_millis(delay)).await;
346
347        // Here you would attempt actual reconnection
348        // This is a placeholder - actual implementation depends on transport
349
350        Ok(())
351    }
352
353    /// Calculate reconnection delay with exponential backoff
354    fn calculate_reconnect_delay(&self, attempt: u32) -> u64 {
355        let base = self.config.reconnect_delay_ms as f64;
356        let multiplier = self.config.reconnect_backoff_multiplier.powi(attempt as i32);
357        let delay = base * multiplier;
358
359        // Add jitter (10%)
360        let jitter = delay * 0.1 * (rand_jitter() - 0.5) * 2.0;
361
362        let final_delay = (delay + jitter) as u64;
363        final_delay.min(self.config.max_reconnect_delay_ms)
364    }
365
366    /// Transition service status
367    async fn transition_status(
368        &self,
369        id: &ServiceId,
370        new_status: ServiceStatus,
371    ) -> Result<(), LifecycleError> {
372        let old_status = {
373            let lifecycles = self.lifecycles.read().await;
374            lifecycles
375                .get(id)
376                .map(|l| l.status)
377                .ok_or_else(|| LifecycleError::NotFound(id.to_string()))?
378        };
379
380        if old_status == new_status {
381            return Ok(());
382        }
383
384        // Update lifecycle state
385        {
386            let mut lifecycles = self.lifecycles.write().await;
387            if let Some(lifecycle) = lifecycles.get_mut(id) {
388                lifecycle.status = new_status;
389            }
390        }
391
392        // Update registry
393        self.registry
394            .update_status(id, new_status)
395            .await
396            .map_err(|e| LifecycleError::Internal(e.to_string()))?;
397
398        // Emit status changed event
399        let _ = self.event_tx.send(LifecycleEvent::StatusChanged {
400            id: id.clone(),
401            old_status,
402            new_status,
403        });
404
405        Ok(())
406    }
407
408    /// Spawn heartbeat monitor task
409    fn spawn_heartbeat_monitor(&self, id: ServiceId, mut stop_rx: mpsc::Receiver<()>) {
410        let registry = self.registry.clone();
411        let event_tx = self.event_tx.clone();
412        let timeout_secs = self.config.heartbeat_timeout_secs;
413
414        tokio::spawn(async move {
415            let mut check_interval = interval(Duration::from_secs(timeout_secs / 3));
416
417            loop {
418                tokio::select! {
419                    _ = stop_rx.recv() => {
420                        // Stop signal received
421                        break;
422                    }
423                    _ = check_interval.tick() => {
424                        // Check if service is healthy
425                        if let Some(service) = registry.get(&id).await {
426                            if !service.is_healthy(timeout_secs) {
427                                if service.status == ServiceStatus::Running {
428                                    // Emit heartbeat timeout event
429                                    let _ = event_tx.send(LifecycleEvent::HeartbeatTimeout(id.clone()));
430
431                                    // Update status to reconnecting
432                                    let _ = registry.update_status(&id, ServiceStatus::Reconnecting).await;
433                                }
434                            }
435                        } else {
436                            // Service no longer exists
437                            break;
438                        }
439                    }
440                }
441            }
442        });
443    }
444
445    /// Stop all services
446    pub async fn stop_all(&self) {
447        let ids: Vec<ServiceId> = {
448            let lifecycles = self.lifecycles.read().await;
449            lifecycles.keys().cloned().collect()
450        };
451
452        for id in ids {
453            let _ = self.stop_service(&id).await;
454        }
455    }
456
457    /// Get service status
458    pub async fn get_status(&self, id: &ServiceId) -> Option<ServiceStatus> {
459        let lifecycles = self.lifecycles.read().await;
460        lifecycles.get(id).map(|l| l.status)
461    }
462
463    /// Check if service is healthy
464    pub async fn is_healthy(&self, id: &ServiceId) -> bool {
465        let lifecycles = self.lifecycles.read().await;
466        lifecycles
467            .get(id)
468            .map(|l| l.status == ServiceStatus::Running)
469            .unwrap_or(false)
470    }
471
472    /// Get managed service count
473    pub async fn count(&self) -> usize {
474        self.lifecycles.read().await.len()
475    }
476
477    /// Trigger health check for all services
478    pub async fn health_check(&self) -> Vec<ServiceId> {
479        self.registry.health_check().await
480    }
481}
482
/// Simple time-based jitter source returning a value in `[0, 1)`.
///
/// The value is the sub-second part of the current wall-clock time expressed
/// as a fraction of a second. It is cheap and dependency-free but not random:
/// callers that need real randomness should use a proper random number
/// generator instead.
///
/// If the system clock is set before the Unix epoch the elapsed time is
/// treated as zero, so the result is `0.0`.
fn rand_jitter() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();
    jitter_from_nanos(nanos)
}

/// Map a sub-second nanosecond count (`0..1_000_000_000`) onto `[0, 1)`.
///
/// The divisor is the number of nanoseconds in a second. Dividing by
/// `u32::MAX` instead would squeeze the result into roughly `[0, 0.233)`.
fn jitter_from_nanos(nanos: u32) -> f64 {
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;
    f64::from(nanos) / NANOS_PER_SEC
}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created manager tracks no services.
    #[tokio::test]
    async fn test_lifecycle_manager_creation() {
        let registry = Arc::new(RegistryService::new());
        let manager = LifecycleManager::new(registry);
        assert_eq!(manager.count().await, 0);
    }

    /// Starting a service tracks it as Running; stopping it removes it.
    #[tokio::test]
    async fn test_start_stop_service() {
        let registry = Arc::new(RegistryService::new());
        let manager = LifecycleManager::new(registry);

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        assert_eq!(manager.count().await, 1);
        assert_eq!(manager.get_status(&id).await, Some(ServiceStatus::Running));

        manager.stop_service(&id).await.unwrap();
        assert_eq!(manager.count().await, 0);
    }

    /// A heartbeat for a managed service is accepted.
    #[tokio::test]
    async fn test_handle_heartbeat() {
        let registry = Arc::new(RegistryService::new());
        let manager = LifecycleManager::new(registry);

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        let result = manager.handle_heartbeat(&id).await;
        assert!(result.is_ok());
    }

    /// Reporting an error moves the service to the Error status.
    #[tokio::test]
    async fn test_handle_error() {
        let registry = Arc::new(RegistryService::new());
        let manager = LifecycleManager::new(registry);

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        // Disable auto-reconnect so the test does not sleep through a backoff
        // delay and the status stays at Error.
        {
            let mut lifecycles = manager.lifecycles.write().await;
            if let Some(l) = lifecycles.get_mut(&id) {
                l.auto_reconnect = false;
            }
        }

        manager
            .handle_error(&id, "Test error".to_string())
            .await
            .unwrap();

        assert_eq!(manager.get_status(&id).await, Some(ServiceStatus::Error));
    }

    /// Starting a service emits Started followed by the Starting -> Running
    /// status change, both for the new service's ID.
    #[tokio::test]
    async fn test_lifecycle_events() {
        let registry = Arc::new(RegistryService::new());
        let manager = LifecycleManager::new(registry);

        let mut event_rx = manager.subscribe();

        let service = ExtensionService::new("test", "1.0.0");
        let id = manager.start_service(service).await.unwrap();

        match event_rx.try_recv() {
            Ok(LifecycleEvent::Started(started)) => assert_eq!(started, id),
            other => panic!("expected Started event, got {:?}", other),
        }

        match event_rx.try_recv() {
            Ok(LifecycleEvent::StatusChanged {
                id: changed,
                old_status,
                new_status,
            }) => {
                assert_eq!(changed, id);
                assert_eq!(old_status, ServiceStatus::Starting);
                assert_eq!(new_status, ServiceStatus::Running);
            }
            other => panic!("expected StatusChanged event, got {:?}", other),
        }
    }

    /// Explicit configuration values are stored on the manager.
    #[tokio::test]
    async fn test_lifecycle_config() {
        let registry = Arc::new(RegistryService::new());
        let config = LifecycleConfig {
            heartbeat_interval_secs: 10,
            heartbeat_timeout_secs: 30,
            max_reconnect_attempts: 3,
            ..Default::default()
        };
        let manager = LifecycleManager::with_config(registry, config);

        assert_eq!(manager.config.heartbeat_interval_secs, 10);
        assert_eq!(manager.config.heartbeat_timeout_secs, 30);
        assert_eq!(manager.config.max_reconnect_attempts, 3);
    }

    /// With the default config (1 s base, x2 multiplier, at most 10% jitter)
    /// consecutive delays cannot overlap, so they must strictly increase.
    #[test]
    fn test_calculate_reconnect_delay() {
        let registry = Arc::new(RegistryService::new());
        let manager = LifecycleManager::new(registry);

        let delay0 = manager.calculate_reconnect_delay(0);
        let delay1 = manager.calculate_reconnect_delay(1);
        let delay2 = manager.calculate_reconnect_delay(2);

        // Delay should increase with backoff
        assert!(delay1 > delay0);
        assert!(delay2 > delay1);
    }
}