Skip to main content

axon/
event_bus.rs

//! Event Bus + Daemon Supervisor — reactive infrastructure for AxonServer.
//!
//! Provides an in-process publish/subscribe event bus that daemons use to
//! communicate, plus a supervisor that monitors daemon health and manages
//! lifecycle transitions between the `SupervisorState`s (Registered, Running,
//! Waiting, Restarting, Stopped, Dead).
//!
//! Architecture:
//!   - `EventBus` — channel-based pub/sub with topic filtering.
//!   - `Event` — typed envelope: topic + payload + metadata.
//!   - `Subscription` — filtered receiver bound to a daemon.
//!   - `DaemonSupervisor` — health monitor with restart policy.
//!
//! The bus uses `tokio::sync::broadcast` for fan-out delivery:
//! every subscriber gets every event, filtering by topic on receive.
15
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21// ── Event types ──────────────────────────────────────────────────────────
22
23/// A typed event envelope flowing through the bus.
24#[derive(Debug, Clone)]
25pub struct Event {
26    /// Topic name for routing (e.g., "deploy", "daemon.started", "flow.complete").
27    pub topic: String,
28    /// JSON payload — arbitrary structured data.
29    pub payload: serde_json::Value,
30    /// Source daemon or system component that emitted the event.
31    pub source: String,
32    /// Monotonic timestamp (elapsed since bus creation).
33    pub timestamp: Duration,
34}
35
36impl fmt::Display for Event {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        write!(f, "[{}] {} → {}", self.source, self.topic, self.payload)
39    }
40}
41
/// Topic pattern a [`Subscription`] uses to decide which events it keeps.
///
/// Supported forms:
/// - `"*"` — matches every topic;
/// - `"prefix.*"` — matches `prefix` itself and any topic starting with `prefix.`;
/// - anything else — exact, case-sensitive string equality.
#[derive(Debug, Clone)]
pub struct TopicFilter {
    /// Raw pattern text, interpreted by [`TopicFilter::matches`].
    pub pattern: String,
}

impl TopicFilter {
    /// Build a filter from its textual pattern.
    pub fn new(pattern: &str) -> Self {
        Self {
            pattern: pattern.to_owned(),
        }
    }

    /// Return `true` when `topic` is accepted by this filter.
    pub fn matches(&self, topic: &str) -> bool {
        match self.pattern.as_str() {
            "*" => true,
            pat => match pat.strip_suffix(".*") {
                // Plain pattern: exact comparison.
                None => pat == topic,
                // Prefix pattern: the prefix must end on a segment boundary so
                // that "supervisor.*" does not accept "supervisorx".
                Some(prefix) => match topic.strip_prefix(prefix) {
                    Some(rest) => rest.is_empty() || rest.starts_with('.'),
                    None => false,
                },
            },
        }
    }
}
68
// ── Event Bus ────────────────────────────────────────────────────────────

/// Number of events the broadcast channel buffers; once it is full the oldest
/// events are overwritten and slow subscribers observe a lag.
const BUS_CAPACITY: usize = 1024;
73
74/// In-process publish/subscribe event bus.
75///
76/// Uses `tokio::sync::broadcast` for fan-out. Each subscriber receives all
77/// events and filters locally by topic. This is efficient for the expected
78/// daemon count (tens, not thousands).
79#[derive(Clone)]
80pub struct EventBus {
81    sender: tokio::sync::broadcast::Sender<Event>,
82    created_at: Instant,
83    stats: Arc<Mutex<BusStats>>,
84}
85
86/// Aggregate bus statistics.
87#[derive(Debug, Clone, Default)]
88pub struct BusStats {
89    pub events_published: u64,
90    pub events_delivered: u64,
91    pub events_dropped: u64,
92    pub active_subscribers: u32,
93    pub topics_seen: Vec<String>,
94    /// Per-topic publish counts.
95    pub topic_publish_counts: HashMap<String, u64>,
96    /// Recent event history (ring buffer, max 200).
97    pub event_history: Vec<EventRecord>,
98}
99
100/// A recorded event for history/replay purposes.
101#[derive(Debug, Clone)]
102pub struct EventRecord {
103    /// Topic name.
104    pub topic: String,
105    /// JSON payload.
106    pub payload: serde_json::Value,
107    /// Source.
108    pub source: String,
109    /// Wall-clock timestamp (Unix seconds).
110    pub timestamp_secs: u64,
111}
112
113impl EventBus {
114    /// Create a new event bus.
115    pub fn new() -> Self {
116        let (sender, _) = tokio::sync::broadcast::channel(BUS_CAPACITY);
117        EventBus {
118            sender,
119            created_at: Instant::now(),
120            stats: Arc::new(Mutex::new(BusStats::default())),
121        }
122    }
123
124    /// Publish an event to all subscribers.
125    pub fn publish(&self, topic: &str, payload: serde_json::Value, source: &str) -> Event {
126        let event = Event {
127            topic: topic.to_string(),
128            payload,
129            source: source.to_string(),
130            timestamp: self.created_at.elapsed(),
131        };
132
133        {
134            let mut stats = self.stats.lock().unwrap();
135            stats.events_published += 1;
136            if !stats.topics_seen.contains(&event.topic) {
137                stats.topics_seen.push(event.topic.clone());
138            }
139            *stats.topic_publish_counts.entry(event.topic.clone()).or_insert(0) += 1;
140            stats.event_history.push(EventRecord {
141                topic: event.topic.clone(),
142                payload: event.payload.clone(),
143                source: event.source.clone(),
144                timestamp_secs: std::time::SystemTime::now()
145                    .duration_since(std::time::UNIX_EPOCH)
146                    .unwrap_or_default()
147                    .as_secs(),
148            });
149            if stats.event_history.len() > 200 {
150                stats.event_history.remove(0);
151            }
152        }
153
154        // Send to all receivers; if none exist, event is silently dropped.
155        let _ = self.sender.send(event.clone());
156        event
157    }
158
159    /// Get recent event history, optionally filtered by topic.
160    pub fn recent_events(&self, limit: usize, topic_filter: Option<&str>) -> Vec<EventRecord> {
161        let stats = self.stats.lock().unwrap();
162        stats.event_history.iter().rev()
163            .filter(|e| match topic_filter {
164                Some(t) => e.topic == t || t == "*" || (t.ends_with(".*") && e.topic.starts_with(&t[..t.len()-2])),
165                None => true,
166            })
167            .take(limit)
168            .cloned()
169            .collect()
170    }
171
172    /// Create a subscription filtered by topic pattern.
173    pub fn subscribe(&self, filter: TopicFilter) -> Subscription {
174        let receiver = self.sender.subscribe();
175
176        {
177            let mut stats = self.stats.lock().unwrap();
178            stats.active_subscribers += 1;
179        }
180
181        Subscription {
182            receiver,
183            filter,
184            bus_stats: Arc::clone(&self.stats),
185        }
186    }
187
188    /// Get current bus statistics.
189    pub fn stats(&self) -> BusStats {
190        self.stats.lock().unwrap().clone()
191    }
192
193    /// Number of active subscribers.
194    pub fn subscriber_count(&self) -> usize {
195        self.sender.receiver_count()
196    }
197}
198
199/// A filtered subscription that receives events matching a topic pattern.
200pub struct Subscription {
201    receiver: tokio::sync::broadcast::Receiver<Event>,
202    filter: TopicFilter,
203    bus_stats: Arc<Mutex<BusStats>>,
204}
205
206impl Subscription {
207    /// Receive the next matching event (async, blocks until available).
208    pub async fn recv(&mut self) -> Result<Event, SubscriptionError> {
209        loop {
210            match self.receiver.recv().await {
211                Ok(event) => {
212                    if self.filter.matches(&event.topic) {
213                        let mut stats = self.bus_stats.lock().unwrap();
214                        stats.events_delivered += 1;
215                        return Ok(event);
216                    }
217                    // Non-matching event — continue waiting.
218                }
219                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
220                    let mut stats = self.bus_stats.lock().unwrap();
221                    stats.events_dropped += n;
222                    // Continue receiving — some events were missed.
223                }
224                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
225                    return Err(SubscriptionError::BusClosed);
226                }
227            }
228        }
229    }
230
231    /// Try to receive a matching event without blocking.
232    pub fn try_recv(&mut self) -> Result<Option<Event>, SubscriptionError> {
233        loop {
234            match self.receiver.try_recv() {
235                Ok(event) => {
236                    if self.filter.matches(&event.topic) {
237                        let mut stats = self.bus_stats.lock().unwrap();
238                        stats.events_delivered += 1;
239                        return Ok(Some(event));
240                    }
241                    // Non-matching — try next.
242                }
243                Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {
244                    return Ok(None);
245                }
246                Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => {
247                    let mut stats = self.bus_stats.lock().unwrap();
248                    stats.events_dropped += n;
249                    // Continue — try next available.
250                }
251                Err(tokio::sync::broadcast::error::TryRecvError::Closed) => {
252                    return Err(SubscriptionError::BusClosed);
253                }
254            }
255        }
256    }
257}
258
259impl Drop for Subscription {
260    fn drop(&mut self) {
261        let mut stats = self.bus_stats.lock().unwrap();
262        stats.active_subscribers = stats.active_subscribers.saturating_sub(1);
263    }
264}
265
/// Errors returned by [`Subscription::recv`] and [`Subscription::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionError {
    /// The broadcast channel is closed because every sender (every `EventBus`
    /// handle) has been dropped, e.g. during server shutdown.
    BusClosed,
}

impl fmt::Display for SubscriptionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SubscriptionError::BusClosed => write!(f, "event bus closed"),
        }
    }
}

/// Lets `SubscriptionError` be boxed as `dyn Error` and propagated with `?`
/// into generic error types.
impl std::error::Error for SubscriptionError {}
280
281// ── Daemon Supervisor ────────────────────────────────────────────────────
282
283/// Restart policy for supervised daemons.
284#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
285#[serde(rename_all = "snake_case")]
286pub enum RestartPolicy {
287    /// Never restart on crash.
288    Never,
289    /// Restart up to N times, then give up.
290    OnCrash { max_restarts: u32 },
291    /// Always restart (with exponential backoff).
292    Always,
293}
294
295impl Default for RestartPolicy {
296    fn default() -> Self {
297        RestartPolicy::OnCrash { max_restarts: 3 }
298    }
299}
300
301/// A supervised daemon entry tracked by the supervisor.
302#[derive(Debug, Clone, serde::Serialize)]
303pub struct SupervisedDaemon {
304    pub name: String,
305    pub state: SupervisorState,
306    pub restart_policy: RestartPolicy,
307    pub restart_count: u32,
308    pub last_heartbeat: Option<Duration>,
309    pub crash_reason: Option<String>,
310    pub uptime: Duration,
311}
312
313/// Supervisor-level daemon state (more granular than DaemonState).
314#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
315#[serde(rename_all = "lowercase")]
316pub enum SupervisorState {
317    /// Registered but not started.
318    Registered,
319    /// Actively running.
320    Running,
321    /// Waiting for an event (hibernating).
322    Waiting,
323    /// Restarting after a crash.
324    Restarting,
325    /// Stopped normally.
326    Stopped,
327    /// Crashed and won't be restarted (policy exhausted).
328    Dead,
329}
330
331/// The daemon supervisor: monitors health, enforces restart policies,
332/// and emits lifecycle events to the bus.
333pub struct DaemonSupervisor {
334    daemons: HashMap<String, SupervisedDaemon>,
335    bus: EventBus,
336    created_at: Instant,
337    heartbeat_timeout: Duration,
338}
339
340impl DaemonSupervisor {
341    /// Create a new supervisor attached to an event bus.
342    pub fn new(bus: EventBus) -> Self {
343        DaemonSupervisor {
344            daemons: HashMap::new(),
345            bus,
346            created_at: Instant::now(),
347            heartbeat_timeout: Duration::from_secs(30),
348        }
349    }
350
351    /// Register a daemon for supervision with a restart policy.
352    pub fn register(&mut self, name: &str, policy: RestartPolicy) {
353        let daemon = SupervisedDaemon {
354            name: name.to_string(),
355            state: SupervisorState::Registered,
356            restart_policy: policy,
357            restart_count: 0,
358            last_heartbeat: None,
359            crash_reason: None,
360            uptime: Duration::ZERO,
361        };
362        self.daemons.insert(name.to_string(), daemon);
363
364        self.bus.publish(
365            "supervisor.registered",
366            serde_json::json!({ "daemon": name }),
367            "supervisor",
368        );
369    }
370
371    /// Mark a daemon as started.
372    pub fn mark_started(&mut self, name: &str) -> bool {
373        if let Some(d) = self.daemons.get_mut(name) {
374            d.state = SupervisorState::Running;
375            d.last_heartbeat = Some(self.created_at.elapsed());
376            d.uptime = Duration::ZERO;
377
378            self.bus.publish(
379                "supervisor.started",
380                serde_json::json!({ "daemon": name }),
381                "supervisor",
382            );
383            true
384        } else {
385            false
386        }
387    }
388
389    /// Record a heartbeat from a daemon.
390    pub fn heartbeat(&mut self, name: &str) -> bool {
391        if let Some(d) = self.daemons.get_mut(name) {
392            d.last_heartbeat = Some(self.created_at.elapsed());
393            true
394        } else {
395            false
396        }
397    }
398
399    /// Mark a daemon as waiting for events (hibernating).
400    pub fn mark_waiting(&mut self, name: &str) -> bool {
401        if let Some(d) = self.daemons.get_mut(name) {
402            d.state = SupervisorState::Waiting;
403
404            self.bus.publish(
405                "supervisor.waiting",
406                serde_json::json!({ "daemon": name }),
407                "supervisor",
408            );
409            true
410        } else {
411            false
412        }
413    }
414
415    /// Report that a daemon has crashed. Returns whether it will be restarted.
416    pub fn report_crash(&mut self, name: &str, reason: &str) -> bool {
417        if let Some(d) = self.daemons.get_mut(name) {
418            d.crash_reason = Some(reason.to_string());
419            d.restart_count += 1;
420
421            let will_restart = match d.restart_policy {
422                RestartPolicy::Never => false,
423                RestartPolicy::OnCrash { max_restarts } => d.restart_count <= max_restarts,
424                RestartPolicy::Always => true,
425            };
426
427            if will_restart {
428                d.state = SupervisorState::Restarting;
429                self.bus.publish(
430                    "supervisor.restarting",
431                    serde_json::json!({
432                        "daemon": name,
433                        "reason": reason,
434                        "restart_count": d.restart_count,
435                    }),
436                    "supervisor",
437                );
438            } else {
439                d.state = SupervisorState::Dead;
440                self.bus.publish(
441                    "supervisor.dead",
442                    serde_json::json!({
443                        "daemon": name,
444                        "reason": reason,
445                        "restart_count": d.restart_count,
446                    }),
447                    "supervisor",
448                );
449            }
450
451            will_restart
452        } else {
453            false
454        }
455    }
456
457    /// Stop a daemon normally.
458    pub fn stop(&mut self, name: &str) -> bool {
459        if let Some(d) = self.daemons.get_mut(name) {
460            d.state = SupervisorState::Stopped;
461
462            self.bus.publish(
463                "supervisor.stopped",
464                serde_json::json!({ "daemon": name }),
465                "supervisor",
466            );
467            true
468        } else {
469            false
470        }
471    }
472
473    /// Remove a daemon from supervision.
474    pub fn unregister(&mut self, name: &str) -> bool {
475        if self.daemons.remove(name).is_some() {
476            self.bus.publish(
477                "supervisor.unregistered",
478                serde_json::json!({ "daemon": name }),
479                "supervisor",
480            );
481            true
482        } else {
483            false
484        }
485    }
486
487    /// Get the state of a specific daemon.
488    pub fn get(&self, name: &str) -> Option<&SupervisedDaemon> {
489        self.daemons.get(name)
490    }
491
492    /// List all supervised daemons.
493    pub fn list(&self) -> Vec<&SupervisedDaemon> {
494        self.daemons.values().collect()
495    }
496
497    /// Count daemons in each state.
498    pub fn state_counts(&self) -> HashMap<&'static str, usize> {
499        let mut counts = HashMap::new();
500        for d in self.daemons.values() {
501            let key = match d.state {
502                SupervisorState::Registered => "registered",
503                SupervisorState::Running => "running",
504                SupervisorState::Waiting => "waiting",
505                SupervisorState::Restarting => "restarting",
506                SupervisorState::Stopped => "stopped",
507                SupervisorState::Dead => "dead",
508            };
509            *counts.entry(key).or_insert(0) += 1;
510        }
511        counts
512    }
513
514    /// Check all daemons for stale heartbeats. Returns names of timed-out daemons.
515    pub fn check_heartbeats(&mut self) -> Vec<String> {
516        let now = self.created_at.elapsed();
517        let timeout = self.heartbeat_timeout;
518        let mut timed_out = Vec::new();
519
520        let names: Vec<String> = self
521            .daemons
522            .iter()
523            .filter(|(_, d)| d.state == SupervisorState::Running)
524            .filter(|(_, d)| {
525                d.last_heartbeat
526                    .map(|hb| now.saturating_sub(hb) > timeout)
527                    .unwrap_or(false)
528            })
529            .map(|(name, _)| name.clone())
530            .collect();
531
532        for name in &names {
533            self.report_crash(&name, "heartbeat timeout");
534            timed_out.push(name.clone());
535        }
536
537        timed_out
538    }
539
540    /// Summary string for logging.
541    pub fn summary(&self) -> String {
542        let counts = self.state_counts();
543        let total = self.daemons.len();
544        let running = counts.get("running").copied().unwrap_or(0);
545        let waiting = counts.get("waiting").copied().unwrap_or(0);
546        let dead = counts.get("dead").copied().unwrap_or(0);
547        format!(
548            "{total} daemons ({running} running, {waiting} waiting, {dead} dead)"
549        )
550    }
551}
552
// ── Tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Supervisor attached to a fresh bus that nothing subscribes to.
    fn supervisor() -> DaemonSupervisor {
        DaemonSupervisor::new(EventBus::new())
    }

    /// State of a registered daemon; panics if `name` is unknown.
    fn state_of(sup: &DaemonSupervisor, name: &str) -> SupervisorState {
        sup.get(name).expect("daemon must be registered").state
    }

    /// Next queued matching event; panics if the bus is closed or nothing is queued.
    fn next_event(sub: &mut Subscription) -> Event {
        sub.try_recv()
            .expect("bus must still be open")
            .expect("an event must be queued")
    }

    #[test]
    fn topic_filter_exact() {
        let filter = TopicFilter::new("deploy");
        assert!(filter.matches("deploy"));
        for topic in ["deploy.done", "undeploy"] {
            assert!(!filter.matches(topic), "{topic:?} must not match");
        }
    }

    #[test]
    fn topic_filter_wildcard() {
        let filter = TopicFilter::new("*");
        for topic in ["deploy", "supervisor.started", "anything"] {
            assert!(filter.matches(topic), "{topic:?} must match");
        }
    }

    #[test]
    fn topic_filter_prefix() {
        let filter = TopicFilter::new("supervisor.*");
        for topic in ["supervisor.started", "supervisor.stopped", "supervisor.dead"] {
            assert!(filter.matches(topic), "{topic:?} must match");
        }
        for topic in ["deploy", "supervisorx"] {
            assert!(!filter.matches(topic), "{topic:?} must not match");
        }
    }

    #[test]
    fn bus_publish_and_stats() {
        let bus = EventBus::new();
        for x in 1..=2 {
            bus.publish("test.event", serde_json::json!({ "x": x }), "test");
        }
        bus.publish("other", serde_json::json!(null), "sys");

        let stats = bus.stats();
        assert_eq!(stats.events_published, 3);
        assert_eq!(stats.topics_seen.len(), 2);
        for topic in ["test.event", "other"] {
            assert!(stats.topics_seen.iter().any(|seen| seen == topic));
        }
    }

    #[tokio::test]
    async fn bus_subscribe_and_recv() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("hello"));

        bus.publish("hello", serde_json::json!({ "msg": "world" }), "test");
        bus.publish("ignore", serde_json::json!(null), "test");
        bus.publish("hello", serde_json::json!({ "msg": "again" }), "test");

        let first = next_event(&mut sub);
        assert_eq!(first.topic, "hello");
        assert_eq!(first.payload["msg"], "world");

        let second = next_event(&mut sub);
        assert_eq!(second.payload["msg"], "again");

        // The "ignore" event was filtered out, so nothing else is pending.
        assert!(sub.try_recv().unwrap().is_none());
    }

    #[test]
    fn bus_subscriber_count() {
        let bus = EventBus::new();
        assert_eq!(bus.subscriber_count(), 0);

        let everything = bus.subscribe(TopicFilter::new("*"));
        assert_eq!(bus.subscriber_count(), 1);

        let _deploys = bus.subscribe(TopicFilter::new("deploy"));
        assert_eq!(bus.subscriber_count(), 2);

        drop(everything);
        assert_eq!(bus.subscriber_count(), 1);
    }

    #[test]
    fn supervisor_register_and_lifecycle() {
        let mut sup = supervisor();

        sup.register("flow_a", RestartPolicy::default());
        assert_eq!(state_of(&sup, "flow_a"), SupervisorState::Registered);

        sup.mark_started("flow_a");
        assert_eq!(state_of(&sup, "flow_a"), SupervisorState::Running);

        sup.mark_waiting("flow_a");
        assert_eq!(state_of(&sup, "flow_a"), SupervisorState::Waiting);

        sup.stop("flow_a");
        assert_eq!(state_of(&sup, "flow_a"), SupervisorState::Stopped);
    }

    #[test]
    fn supervisor_crash_restart_policy() {
        let mut sup = supervisor();
        sup.register("flow_b", RestartPolicy::OnCrash { max_restarts: 2 });
        sup.mark_started("flow_b");

        // The first two crashes stay within the budget and schedule a restart.
        for reason in ["panic", "panic again"] {
            assert!(sup.report_crash("flow_b", reason));
            assert_eq!(state_of(&sup, "flow_b"), SupervisorState::Restarting);
            sup.mark_started("flow_b");
        }

        // The third crash exceeds max_restarts = 2.
        assert!(!sup.report_crash("flow_b", "fatal"));
        let daemon = sup.get("flow_b").unwrap();
        assert_eq!(daemon.state, SupervisorState::Dead);
        assert_eq!(daemon.restart_count, 3);
    }

    #[test]
    fn supervisor_never_restart() {
        let mut sup = supervisor();
        sup.register("ephemeral", RestartPolicy::Never);
        sup.mark_started("ephemeral");

        assert!(!sup.report_crash("ephemeral", "one shot"));
        assert_eq!(state_of(&sup, "ephemeral"), SupervisorState::Dead);
    }

    #[test]
    fn supervisor_always_restart() {
        let mut sup = supervisor();
        sup.register("immortal", RestartPolicy::Always);
        sup.mark_started("immortal");

        for attempt in 0..10 {
            let reason = format!("crash {attempt}");
            assert!(sup.report_crash("immortal", &reason));
            assert_eq!(state_of(&sup, "immortal"), SupervisorState::Restarting);
            sup.mark_started("immortal");
        }
        assert_eq!(sup.get("immortal").unwrap().restart_count, 10);
    }

    #[test]
    fn supervisor_unregister() {
        let mut sup = supervisor();
        sup.register("temp", RestartPolicy::Never);

        assert!(sup.unregister("temp"));
        assert!(sup.get("temp").is_none());
        // A second removal finds nothing.
        assert!(!sup.unregister("temp"));
    }

    #[test]
    fn supervisor_state_counts() {
        let mut sup = supervisor();
        for name in ["a", "b", "c"] {
            sup.register(name, RestartPolicy::Never);
        }

        sup.mark_started("a");
        sup.mark_started("b");
        sup.mark_waiting("c");

        let counts = sup.state_counts();
        assert_eq!(counts.get("running"), Some(&2));
        assert_eq!(counts.get("waiting"), Some(&1));
    }

    #[test]
    fn supervisor_summary() {
        let mut sup = supervisor();
        sup.register("x", RestartPolicy::Never);
        sup.register("y", RestartPolicy::Never);
        sup.mark_started("x");

        let summary = sup.summary();
        assert!(summary.contains("2 daemons"));
        assert!(summary.contains("1 running"));
    }

    #[test]
    fn supervisor_heartbeat_timeout() {
        let mut sup = supervisor();
        // A tiny timeout means a short sleep is enough to make the daemon stale.
        sup.heartbeat_timeout = Duration::from_millis(1);

        sup.register("slow", RestartPolicy::OnCrash { max_restarts: 1 });
        // Starting records a heartbeat "now".
        sup.mark_started("slow");

        std::thread::sleep(Duration::from_millis(5));

        assert_eq!(sup.check_heartbeats(), vec!["slow"]);
        assert_eq!(state_of(&sup, "slow"), SupervisorState::Restarting);
    }

    #[test]
    fn event_display() {
        let rendered = Event {
            topic: "deploy".to_string(),
            payload: serde_json::json!({ "flow": "TestFlow" }),
            source: "client".to_string(),
            timestamp: Duration::from_secs(5),
        }
        .to_string();

        for fragment in ["client", "deploy"] {
            assert!(rendered.contains(fragment), "missing {fragment:?} in {rendered:?}");
        }
    }

    #[test]
    fn supervisor_emits_lifecycle_events() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("supervisor.*"));
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("d1", RestartPolicy::Never);
        sup.mark_started("d1");
        sup.stop("d1");

        for expected in [
            "supervisor.registered",
            "supervisor.started",
            "supervisor.stopped",
        ] {
            assert_eq!(next_event(&mut sub).topic, expected);
        }
        assert!(sub.try_recv().unwrap().is_none());
    }
}