1use std::collections::HashMap;
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21#[derive(Debug, Clone)]
25pub struct Event {
26 pub topic: String,
28 pub payload: serde_json::Value,
30 pub source: String,
32 pub timestamp: Duration,
34}
35
36impl fmt::Display for Event {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 write!(f, "[{}] {} → {}", self.source, self.topic, self.payload)
39 }
40}
41
/// Topic pattern that decides which bus events a subscriber receives.
///
/// Supported forms:
/// - `"*"` matches every topic;
/// - `"prefix.*"` matches `prefix` itself and any topic of the form `prefix.<rest>`;
/// - any other pattern must equal the topic exactly.
#[derive(Debug, Clone)]
pub struct TopicFilter {
    /// The raw pattern string given to [`TopicFilter::new`].
    pub pattern: String,
}

impl TopicFilter {
    /// Builds a filter from `pattern` (see the type-level docs for the syntax).
    pub fn new(pattern: &str) -> Self {
        Self {
            pattern: String::from(pattern),
        }
    }

    /// Returns `true` when `topic` is selected by this filter.
    pub fn matches(&self, topic: &str) -> bool {
        let pattern = self.pattern.as_str();
        if pattern == "*" {
            return true;
        }
        match pattern.strip_suffix(".*") {
            // Require a `.` boundary after the prefix so "supervisor.*" does not
            // accept unrelated topics such as "supervisorx".
            Some(prefix) => match topic.strip_prefix(prefix) {
                Some(rest) => rest.is_empty() || rest.starts_with('.'),
                None => false,
            },
            None => pattern == topic,
        }
    }
}
68
/// Slot count of the broadcast channel created by [`EventBus::new`].
///
/// A subscription that falls more than this many events behind loses the oldest ones; the
/// loss surfaces as a `Lagged` error and is added to [`BusStats::events_dropped`].
const BUS_CAPACITY: usize = 1_024;
73
/// Publish/subscribe hub backed by a tokio broadcast channel.
///
/// Cloning an `EventBus` clones the channel's sender and the `Arc` around the stats, so all
/// clones publish to the same subscribers and update the same [`BusStats`].
#[derive(Clone)]
pub struct EventBus {
    /// Sending half of the broadcast channel (created with [`BUS_CAPACITY`] slots).
    sender: tokio::sync::broadcast::Sender<Event>,
    /// Zero point for [`Event::timestamp`].
    created_at: Instant,
    /// Counters and history shared with every [`Subscription`] created from this bus.
    stats: Arc<Mutex<BusStats>>,
}
85
/// Counters and recent history shared by an [`EventBus`] and all of its subscriptions.
///
/// [`EventBus::stats`] returns a cloned snapshot of this struct.
#[derive(Debug, Clone, Default)]
pub struct BusStats {
    /// Number of calls to [`EventBus::publish`].
    pub events_published: u64,
    /// Events handed out by `Subscription::recv` / `Subscription::try_recv`, summed over all
    /// subscriptions (an event delivered to two subscribers counts twice).
    pub events_delivered: u64,
    /// Events a subscription missed because it lagged behind the channel, summed over all
    /// subscriptions and counted regardless of the subscription's topic filter.
    pub events_dropped: u64,
    /// Incremented by [`EventBus::subscribe`], decremented when a `Subscription` is dropped.
    pub active_subscribers: u32,
    /// Distinct topics, in the order they were first published.
    pub topics_seen: Vec<String>,
    /// Number of publishes per topic.
    pub topic_publish_counts: HashMap<String, u64>,
    /// Most recent published events, oldest first; `EventBus::publish` keeps at most 200.
    pub event_history: Vec<EventRecord>,
}
99
/// Copy of a published [`Event`] kept in [`BusStats::event_history`].
#[derive(Debug, Clone)]
pub struct EventRecord {
    /// Topic the event was published on.
    pub topic: String,
    /// JSON payload of the event.
    pub payload: serde_json::Value,
    /// Publisher identifier passed to [`EventBus::publish`].
    pub source: String,
    /// Wall-clock publish time in whole seconds since the Unix epoch (0 if the system clock
    /// reads earlier than the epoch).
    pub timestamp_secs: u64,
}
112
113impl EventBus {
114 pub fn new() -> Self {
116 let (sender, _) = tokio::sync::broadcast::channel(BUS_CAPACITY);
117 EventBus {
118 sender,
119 created_at: Instant::now(),
120 stats: Arc::new(Mutex::new(BusStats::default())),
121 }
122 }
123
124 pub fn publish(&self, topic: &str, payload: serde_json::Value, source: &str) -> Event {
126 let event = Event {
127 topic: topic.to_string(),
128 payload,
129 source: source.to_string(),
130 timestamp: self.created_at.elapsed(),
131 };
132
133 {
134 let mut stats = self.stats.lock().unwrap();
135 stats.events_published += 1;
136 if !stats.topics_seen.contains(&event.topic) {
137 stats.topics_seen.push(event.topic.clone());
138 }
139 *stats.topic_publish_counts.entry(event.topic.clone()).or_insert(0) += 1;
140 stats.event_history.push(EventRecord {
141 topic: event.topic.clone(),
142 payload: event.payload.clone(),
143 source: event.source.clone(),
144 timestamp_secs: std::time::SystemTime::now()
145 .duration_since(std::time::UNIX_EPOCH)
146 .unwrap_or_default()
147 .as_secs(),
148 });
149 if stats.event_history.len() > 200 {
150 stats.event_history.remove(0);
151 }
152 }
153
154 let _ = self.sender.send(event.clone());
156 event
157 }
158
159 pub fn recent_events(&self, limit: usize, topic_filter: Option<&str>) -> Vec<EventRecord> {
161 let stats = self.stats.lock().unwrap();
162 stats.event_history.iter().rev()
163 .filter(|e| match topic_filter {
164 Some(t) => e.topic == t || t == "*" || (t.ends_with(".*") && e.topic.starts_with(&t[..t.len()-2])),
165 None => true,
166 })
167 .take(limit)
168 .cloned()
169 .collect()
170 }
171
172 pub fn subscribe(&self, filter: TopicFilter) -> Subscription {
174 let receiver = self.sender.subscribe();
175
176 {
177 let mut stats = self.stats.lock().unwrap();
178 stats.active_subscribers += 1;
179 }
180
181 Subscription {
182 receiver,
183 filter,
184 bus_stats: Arc::clone(&self.stats),
185 }
186 }
187
188 pub fn stats(&self) -> BusStats {
190 self.stats.lock().unwrap().clone()
191 }
192
193 pub fn subscriber_count(&self) -> usize {
195 self.sender.receiver_count()
196 }
197}
198
/// A filtered receiver on an [`EventBus`], created by [`EventBus::subscribe`].
///
/// Dropping it decrements [`BusStats::active_subscribers`].
pub struct Subscription {
    /// Receiving half of the bus's broadcast channel.
    receiver: tokio::sync::broadcast::Receiver<Event>,
    /// Only events whose topic matches this filter are returned to the caller.
    filter: TopicFilter,
    /// Stats shared with the bus; updated on delivery, on lag, and on drop.
    bus_stats: Arc<Mutex<BusStats>>,
}
205
206impl Subscription {
207 pub async fn recv(&mut self) -> Result<Event, SubscriptionError> {
209 loop {
210 match self.receiver.recv().await {
211 Ok(event) => {
212 if self.filter.matches(&event.topic) {
213 let mut stats = self.bus_stats.lock().unwrap();
214 stats.events_delivered += 1;
215 return Ok(event);
216 }
217 }
219 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
220 let mut stats = self.bus_stats.lock().unwrap();
221 stats.events_dropped += n;
222 }
224 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
225 return Err(SubscriptionError::BusClosed);
226 }
227 }
228 }
229 }
230
231 pub fn try_recv(&mut self) -> Result<Option<Event>, SubscriptionError> {
233 loop {
234 match self.receiver.try_recv() {
235 Ok(event) => {
236 if self.filter.matches(&event.topic) {
237 let mut stats = self.bus_stats.lock().unwrap();
238 stats.events_delivered += 1;
239 return Ok(Some(event));
240 }
241 }
243 Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {
244 return Ok(None);
245 }
246 Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => {
247 let mut stats = self.bus_stats.lock().unwrap();
248 stats.events_dropped += n;
249 }
251 Err(tokio::sync::broadcast::error::TryRecvError::Closed) => {
252 return Err(SubscriptionError::BusClosed);
253 }
254 }
255 }
256 }
257}
258
259impl Drop for Subscription {
260 fn drop(&mut self) {
261 let mut stats = self.bus_stats.lock().unwrap();
262 stats.active_subscribers = stats.active_subscribers.saturating_sub(1);
263 }
264}
265
/// Error returned by [`Subscription::recv`] and [`Subscription::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionError {
    /// The underlying broadcast channel reported `Closed`; no further events will arrive.
    BusClosed,
}

impl fmt::Display for SubscriptionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SubscriptionError::BusClosed => write!(f, "event bus closed"),
        }
    }
}

// Implementing `Error` lets callers propagate this with `?` into `Box<dyn Error>` and other
// generic error types.
impl std::error::Error for SubscriptionError {}
280
281#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
285#[serde(rename_all = "snake_case")]
286pub enum RestartPolicy {
287 Never,
289 OnCrash { max_restarts: u32 },
291 Always,
293}
294
295impl Default for RestartPolicy {
296 fn default() -> Self {
297 RestartPolicy::OnCrash { max_restarts: 3 }
298 }
299}
300
/// Supervisor bookkeeping for one daemon, as returned by `DaemonSupervisor::get` / `list`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SupervisedDaemon {
    /// Name the daemon was registered under.
    pub name: String,
    /// Current lifecycle state.
    pub state: SupervisorState,
    /// Policy consulted by `DaemonSupervisor::report_crash`.
    pub restart_policy: RestartPolicy,
    /// Crashes reported since the daemon was (last) registered; it is not reset on restart.
    pub restart_count: u32,
    /// Time since the supervisor was created, taken at the last `mark_started`/`heartbeat`
    /// call; `None` until the daemon is first started or sends a heartbeat.
    pub last_heartbeat: Option<Duration>,
    /// Reason passed to the most recent `report_crash`, if any.
    pub crash_reason: Option<String>,
    /// NOTE(review): set to zero by `register` and `mark_started` and not updated anywhere
    /// else in this file — confirm whether callers maintain it or it should be computed.
    pub uptime: Duration,
}
312
/// Lifecycle state of a [`SupervisedDaemon`]; serialized in lowercase (e.g. `"running"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum SupervisorState {
    /// Set by `DaemonSupervisor::register`; not started yet.
    Registered,
    /// Set by `DaemonSupervisor::mark_started`; the only state checked for heartbeat timeouts.
    Running,
    /// Set by `DaemonSupervisor::mark_waiting`.
    Waiting,
    /// Set by `DaemonSupervisor::report_crash` when the restart policy allows another restart.
    Restarting,
    /// Set by `DaemonSupervisor::stop`.
    Stopped,
    /// Set by `DaemonSupervisor::report_crash` when the restart policy allows no more restarts.
    Dead,
}
330
/// Tracks the lifecycle of named daemons and publishes `supervisor.*` events on an [`EventBus`].
pub struct DaemonSupervisor {
    /// Supervised daemons keyed by name.
    daemons: HashMap<String, SupervisedDaemon>,
    /// Bus that lifecycle events are published on.
    bus: EventBus,
    /// Zero point for `SupervisedDaemon::last_heartbeat` timestamps.
    created_at: Instant,
    /// How long a running daemon may go without a heartbeat before `check_heartbeats`
    /// reports it as crashed; 30 seconds by default.
    heartbeat_timeout: Duration,
}
339
340impl DaemonSupervisor {
341 pub fn new(bus: EventBus) -> Self {
343 DaemonSupervisor {
344 daemons: HashMap::new(),
345 bus,
346 created_at: Instant::now(),
347 heartbeat_timeout: Duration::from_secs(30),
348 }
349 }
350
351 pub fn register(&mut self, name: &str, policy: RestartPolicy) {
353 let daemon = SupervisedDaemon {
354 name: name.to_string(),
355 state: SupervisorState::Registered,
356 restart_policy: policy,
357 restart_count: 0,
358 last_heartbeat: None,
359 crash_reason: None,
360 uptime: Duration::ZERO,
361 };
362 self.daemons.insert(name.to_string(), daemon);
363
364 self.bus.publish(
365 "supervisor.registered",
366 serde_json::json!({ "daemon": name }),
367 "supervisor",
368 );
369 }
370
371 pub fn mark_started(&mut self, name: &str) -> bool {
373 if let Some(d) = self.daemons.get_mut(name) {
374 d.state = SupervisorState::Running;
375 d.last_heartbeat = Some(self.created_at.elapsed());
376 d.uptime = Duration::ZERO;
377
378 self.bus.publish(
379 "supervisor.started",
380 serde_json::json!({ "daemon": name }),
381 "supervisor",
382 );
383 true
384 } else {
385 false
386 }
387 }
388
389 pub fn heartbeat(&mut self, name: &str) -> bool {
391 if let Some(d) = self.daemons.get_mut(name) {
392 d.last_heartbeat = Some(self.created_at.elapsed());
393 true
394 } else {
395 false
396 }
397 }
398
399 pub fn mark_waiting(&mut self, name: &str) -> bool {
401 if let Some(d) = self.daemons.get_mut(name) {
402 d.state = SupervisorState::Waiting;
403
404 self.bus.publish(
405 "supervisor.waiting",
406 serde_json::json!({ "daemon": name }),
407 "supervisor",
408 );
409 true
410 } else {
411 false
412 }
413 }
414
415 pub fn report_crash(&mut self, name: &str, reason: &str) -> bool {
417 if let Some(d) = self.daemons.get_mut(name) {
418 d.crash_reason = Some(reason.to_string());
419 d.restart_count += 1;
420
421 let will_restart = match d.restart_policy {
422 RestartPolicy::Never => false,
423 RestartPolicy::OnCrash { max_restarts } => d.restart_count <= max_restarts,
424 RestartPolicy::Always => true,
425 };
426
427 if will_restart {
428 d.state = SupervisorState::Restarting;
429 self.bus.publish(
430 "supervisor.restarting",
431 serde_json::json!({
432 "daemon": name,
433 "reason": reason,
434 "restart_count": d.restart_count,
435 }),
436 "supervisor",
437 );
438 } else {
439 d.state = SupervisorState::Dead;
440 self.bus.publish(
441 "supervisor.dead",
442 serde_json::json!({
443 "daemon": name,
444 "reason": reason,
445 "restart_count": d.restart_count,
446 }),
447 "supervisor",
448 );
449 }
450
451 will_restart
452 } else {
453 false
454 }
455 }
456
457 pub fn stop(&mut self, name: &str) -> bool {
459 if let Some(d) = self.daemons.get_mut(name) {
460 d.state = SupervisorState::Stopped;
461
462 self.bus.publish(
463 "supervisor.stopped",
464 serde_json::json!({ "daemon": name }),
465 "supervisor",
466 );
467 true
468 } else {
469 false
470 }
471 }
472
473 pub fn unregister(&mut self, name: &str) -> bool {
475 if self.daemons.remove(name).is_some() {
476 self.bus.publish(
477 "supervisor.unregistered",
478 serde_json::json!({ "daemon": name }),
479 "supervisor",
480 );
481 true
482 } else {
483 false
484 }
485 }
486
487 pub fn get(&self, name: &str) -> Option<&SupervisedDaemon> {
489 self.daemons.get(name)
490 }
491
492 pub fn list(&self) -> Vec<&SupervisedDaemon> {
494 self.daemons.values().collect()
495 }
496
497 pub fn state_counts(&self) -> HashMap<&'static str, usize> {
499 let mut counts = HashMap::new();
500 for d in self.daemons.values() {
501 let key = match d.state {
502 SupervisorState::Registered => "registered",
503 SupervisorState::Running => "running",
504 SupervisorState::Waiting => "waiting",
505 SupervisorState::Restarting => "restarting",
506 SupervisorState::Stopped => "stopped",
507 SupervisorState::Dead => "dead",
508 };
509 *counts.entry(key).or_insert(0) += 1;
510 }
511 counts
512 }
513
514 pub fn check_heartbeats(&mut self) -> Vec<String> {
516 let now = self.created_at.elapsed();
517 let timeout = self.heartbeat_timeout;
518 let mut timed_out = Vec::new();
519
520 let names: Vec<String> = self
521 .daemons
522 .iter()
523 .filter(|(_, d)| d.state == SupervisorState::Running)
524 .filter(|(_, d)| {
525 d.last_heartbeat
526 .map(|hb| now.saturating_sub(hb) > timeout)
527 .unwrap_or(false)
528 })
529 .map(|(name, _)| name.clone())
530 .collect();
531
532 for name in &names {
533 self.report_crash(&name, "heartbeat timeout");
534 timed_out.push(name.clone());
535 }
536
537 timed_out
538 }
539
540 pub fn summary(&self) -> String {
542 let counts = self.state_counts();
543 let total = self.daemons.len();
544 let running = counts.get("running").copied().unwrap_or(0);
545 let waiting = counts.get("waiting").copied().unwrap_or(0);
546 let dead = counts.get("dead").copied().unwrap_or(0);
547 format!(
548 "{total} daemons ({running} running, {waiting} waiting, {dead} dead)"
549 )
550 }
551}
552
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn topic_filter_exact() {
        let f = TopicFilter::new("deploy");
        assert!(f.matches("deploy"));
        assert!(!f.matches("deploy.done"));
        assert!(!f.matches("undeploy"));
    }

    #[test]
    fn topic_filter_wildcard() {
        let f = TopicFilter::new("*");
        assert!(f.matches("deploy"));
        assert!(f.matches("supervisor.started"));
        assert!(f.matches("anything"));
    }

    #[test]
    fn topic_filter_prefix() {
        let f = TopicFilter::new("supervisor.*");
        assert!(f.matches("supervisor"));
        assert!(f.matches("supervisor.started"));
        assert!(f.matches("supervisor.stopped"));
        assert!(f.matches("supervisor.dead"));
        assert!(!f.matches("deploy"));
        assert!(!f.matches("supervisorx"));
    }

    #[test]
    fn bus_publish_and_stats() {
        let bus = EventBus::new();
        bus.publish("test.event", serde_json::json!({"x": 1}), "test");
        bus.publish("test.event", serde_json::json!({"x": 2}), "test");
        bus.publish("other", serde_json::json!(null), "sys");

        let stats = bus.stats();
        assert_eq!(stats.events_published, 3);
        assert_eq!(stats.topics_seen.len(), 2);
        assert!(stats.topics_seen.contains(&"test.event".to_string()));
        assert!(stats.topics_seen.contains(&"other".to_string()));
        assert_eq!(stats.topic_publish_counts.get("test.event"), Some(&2));
        assert_eq!(stats.topic_publish_counts.get("other"), Some(&1));
    }

    // Only `try_recv` is used, so no async runtime is needed.
    #[test]
    fn bus_subscribe_and_recv() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("hello"));

        bus.publish("hello", serde_json::json!({"msg": "world"}), "test");
        bus.publish("ignore", serde_json::json!(null), "test");
        bus.publish("hello", serde_json::json!({"msg": "again"}), "test");

        let e1 = sub.try_recv().unwrap().unwrap();
        assert_eq!(e1.topic, "hello");
        assert_eq!(e1.payload["msg"], "world");

        let e2 = sub.try_recv().unwrap().unwrap();
        assert_eq!(e2.payload["msg"], "again");

        assert!(sub.try_recv().unwrap().is_none());
        assert_eq!(bus.stats().events_delivered, 2);
    }

    #[tokio::test]
    async fn bus_recv_skips_non_matching() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("deploy.*"));

        bus.publish("other", serde_json::json!(null), "test");
        bus.publish("deploy.done", serde_json::json!({"ok": true}), "test");

        let event = sub.recv().await.unwrap();
        assert_eq!(event.topic, "deploy.done");
        assert_eq!(event.payload["ok"], true);
    }

    #[test]
    fn bus_subscriber_count() {
        let bus = EventBus::new();
        assert_eq!(bus.subscriber_count(), 0);

        let sub1 = bus.subscribe(TopicFilter::new("*"));
        assert_eq!(bus.subscriber_count(), 1);

        let _sub2 = bus.subscribe(TopicFilter::new("deploy"));
        assert_eq!(bus.subscriber_count(), 2);
        assert_eq!(bus.stats().active_subscribers, 2);

        drop(sub1);
        assert_eq!(bus.subscriber_count(), 1);
        assert_eq!(bus.stats().active_subscribers, 1);
    }

    #[test]
    fn bus_try_recv_reports_closed_bus() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("*"));
        drop(bus);
        assert!(matches!(sub.try_recv(), Err(SubscriptionError::BusClosed)));
    }

    #[test]
    fn bus_counts_lagged_events_as_dropped() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("*"));
        let overflow: usize = 6;
        for i in 0..(BUS_CAPACITY + overflow) {
            bus.publish("flood", serde_json::json!({ "i": i }), "test");
        }

        let first = sub.try_recv().unwrap().unwrap();
        assert_eq!(first.payload["i"], overflow);
        assert_eq!(bus.stats().events_dropped, overflow as u64);
    }

    #[test]
    fn bus_recent_events_newest_first_and_filtered() {
        let bus = EventBus::new();
        bus.publish("supervisor.started", serde_json::json!(null), "test");
        bus.publish("deploy", serde_json::json!(null), "test");
        bus.publish("supervisor.dead", serde_json::json!(null), "test");

        let topics = |records: Vec<EventRecord>| -> Vec<String> {
            records.into_iter().map(|r| r.topic).collect()
        };
        assert_eq!(
            topics(bus.recent_events(2, None)),
            vec!["supervisor.dead", "deploy"]
        );
        assert_eq!(
            topics(bus.recent_events(10, Some("supervisor.*"))),
            vec!["supervisor.dead", "supervisor.started"]
        );
        assert_eq!(topics(bus.recent_events(10, Some("deploy"))), vec!["deploy"]);
        assert_eq!(bus.recent_events(10, Some("*")).len(), 3);
        assert!(bus.recent_events(0, None).is_empty());
    }

    #[test]
    fn bus_event_history_is_bounded() {
        let bus = EventBus::new();
        for i in 0..250 {
            bus.publish("tick", serde_json::json!({ "i": i }), "test");
        }

        let history = bus.stats().event_history;
        assert_eq!(history.len(), 200);
        assert_eq!(history[0].payload["i"], 50);
        assert_eq!(history[199].payload["i"], 249);
        assert_eq!(bus.stats().events_published, 250);
    }

    #[test]
    fn subscription_error_display() {
        assert_eq!(SubscriptionError::BusClosed.to_string(), "event bus closed");
    }

    #[test]
    fn restart_policy_default() {
        assert_eq!(
            RestartPolicy::default(),
            RestartPolicy::OnCrash { max_restarts: 3 }
        );
    }

    #[test]
    fn supervisor_register_and_lifecycle() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("flow_a", RestartPolicy::default());
        assert_eq!(sup.get("flow_a").unwrap().state, SupervisorState::Registered);

        sup.mark_started("flow_a");
        assert_eq!(sup.get("flow_a").unwrap().state, SupervisorState::Running);

        sup.mark_waiting("flow_a");
        assert_eq!(sup.get("flow_a").unwrap().state, SupervisorState::Waiting);

        sup.stop("flow_a");
        assert_eq!(sup.get("flow_a").unwrap().state, SupervisorState::Stopped);
    }

    #[test]
    fn supervisor_unknown_daemon_operations_return_false() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        assert!(!sup.mark_started("ghost"));
        assert!(!sup.heartbeat("ghost"));
        assert!(!sup.mark_waiting("ghost"));
        assert!(!sup.report_crash("ghost", "boom"));
        assert!(!sup.stop("ghost"));
        assert!(sup.get("ghost").is_none());
    }

    #[test]
    fn supervisor_crash_restart_policy() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("flow_b", RestartPolicy::OnCrash { max_restarts: 2 });
        sup.mark_started("flow_b");

        assert!(sup.report_crash("flow_b", "panic"));
        assert_eq!(sup.get("flow_b").unwrap().state, SupervisorState::Restarting);

        sup.mark_started("flow_b");
        assert!(sup.report_crash("flow_b", "panic again"));
        assert_eq!(sup.get("flow_b").unwrap().state, SupervisorState::Restarting);

        sup.mark_started("flow_b");
        assert!(!sup.report_crash("flow_b", "fatal"));
        assert_eq!(sup.get("flow_b").unwrap().state, SupervisorState::Dead);
        assert_eq!(sup.get("flow_b").unwrap().restart_count, 3);
    }

    #[test]
    fn supervisor_never_restart() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("ephemeral", RestartPolicy::Never);
        sup.mark_started("ephemeral");

        assert!(!sup.report_crash("ephemeral", "one shot"));
        assert_eq!(sup.get("ephemeral").unwrap().state, SupervisorState::Dead);
    }

    #[test]
    fn supervisor_always_restart() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("immortal", RestartPolicy::Always);
        sup.mark_started("immortal");

        for i in 0..10 {
            assert!(sup.report_crash("immortal", &format!("crash {i}")));
            assert_eq!(sup.get("immortal").unwrap().state, SupervisorState::Restarting);
            sup.mark_started("immortal");
        }
        assert_eq!(sup.get("immortal").unwrap().restart_count, 10);
    }

    #[test]
    fn supervisor_crash_event_payload() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("supervisor.dead"));
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("d2", RestartPolicy::Never);
        sup.mark_started("d2");
        sup.report_crash("d2", "boom");

        let event = sub.try_recv().unwrap().unwrap();
        assert_eq!(event.source, "supervisor");
        assert_eq!(event.payload["daemon"], "d2");
        assert_eq!(event.payload["reason"], "boom");
        assert_eq!(event.payload["restart_count"], 1);
        assert_eq!(sup.get("d2").unwrap().crash_reason.as_deref(), Some("boom"));
    }

    #[test]
    fn supervisor_unregister() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("temp", RestartPolicy::Never);
        assert!(sup.unregister("temp"));
        assert!(sup.get("temp").is_none());
        assert!(!sup.unregister("temp"));
    }

    #[test]
    fn supervisor_state_counts() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("a", RestartPolicy::Never);
        sup.register("b", RestartPolicy::Never);
        sup.register("c", RestartPolicy::Never);

        sup.mark_started("a");
        sup.mark_started("b");
        sup.mark_waiting("c");

        let counts = sup.state_counts();
        assert_eq!(counts.get("running"), Some(&2));
        assert_eq!(counts.get("waiting"), Some(&1));
        assert_eq!(counts.get("dead"), None);
    }

    #[test]
    fn supervisor_summary() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("x", RestartPolicy::Never);
        sup.register("y", RestartPolicy::Never);
        sup.mark_started("x");

        let s = sup.summary();
        assert!(s.contains("2 daemons"));
        assert!(s.contains("1 running"));
    }

    #[test]
    fn supervisor_heartbeat_timeout() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);
        sup.heartbeat_timeout = Duration::from_millis(1);

        sup.register("slow", RestartPolicy::OnCrash { max_restarts: 1 });
        sup.mark_started("slow");

        std::thread::sleep(Duration::from_millis(5));

        let timed_out = sup.check_heartbeats();
        assert_eq!(timed_out, vec!["slow"]);
        assert_eq!(sup.get("slow").unwrap().state, SupervisorState::Restarting);
    }

    #[test]
    fn supervisor_heartbeat_check_ignores_non_running() {
        let bus = EventBus::new();
        let mut sup = DaemonSupervisor::new(bus);
        sup.heartbeat_timeout = Duration::from_millis(1);

        sup.register("idle", RestartPolicy::Never);
        sup.mark_started("idle");
        sup.mark_waiting("idle");

        std::thread::sleep(Duration::from_millis(5));

        assert!(sup.check_heartbeats().is_empty());
        assert_eq!(sup.get("idle").unwrap().state, SupervisorState::Waiting);
    }

    #[test]
    fn event_display() {
        let event = Event {
            topic: "deploy".to_string(),
            payload: serde_json::json!({"flow": "TestFlow"}),
            source: "client".to_string(),
            timestamp: Duration::from_secs(5),
        };
        let s = format!("{event}");
        assert!(s.contains("client"));
        assert!(s.contains("deploy"));
    }

    #[test]
    fn supervisor_emits_lifecycle_events() {
        let bus = EventBus::new();
        let mut sub = bus.subscribe(TopicFilter::new("supervisor.*"));
        let mut sup = DaemonSupervisor::new(bus);

        sup.register("d1", RestartPolicy::Never);
        sup.mark_started("d1");
        sup.stop("d1");

        let e1 = sub.try_recv().unwrap().unwrap();
        assert_eq!(e1.topic, "supervisor.registered");

        let e2 = sub.try_recv().unwrap().unwrap();
        assert_eq!(e2.topic, "supervisor.started");

        let e3 = sub.try_recv().unwrap().unwrap();
        assert_eq!(e3.topic, "supervisor.stopped");

        assert!(sub.try_recv().unwrap().is_none());
    }
}