dk_protocol/events.rs
1use dashmap::DashMap;
2use tokio::sync::broadcast;
3
4use crate::WatchEvent;
5
6/// Special channel key that receives a copy of every event regardless of repo.
7const ALL_CHANNEL: &str = "__all__";
8
9/// Shared event bus for broadcasting repo events to watching agents.
10///
11/// Uses per-repo [`tokio::sync::broadcast`] channels so subscribers
12/// only receive events for repos they care about. A special "__all__"
13/// channel receives a copy of every published event (used by the
14/// platform bridge).
15///
16/// Events that are not consumed before the channel capacity (256) is
17/// exhausted are silently dropped for lagged receivers.
18#[derive(Clone)]
19pub struct EventBus {
20 channels: DashMap<String, broadcast::Sender<WatchEvent>>,
21}
22
23impl EventBus {
24 /// Create a new event bus.
25 pub fn new() -> Self {
26 let channels = DashMap::new();
27 // Pre-create the global "__all__" channel.
28 let (tx, _) = broadcast::channel(256);
29 channels.insert(ALL_CHANNEL.to_string(), tx);
30 Self { channels }
31 }
32
33 /// Get or create the broadcast sender for the given key.
34 fn get_or_create_sender(&self, key: &str) -> broadcast::Sender<WatchEvent> {
35 self.channels
36 .entry(key.to_string())
37 .or_insert_with(|| {
38 let (tx, _) = broadcast::channel(256);
39 tx
40 })
41 .clone()
42 }
43
44 /// Publish an event to a specific repo channel AND the global "__all__" channel.
45 ///
46 /// If there are no subscribers the event is silently discarded.
47 pub fn publish(&self, event: WatchEvent) {
48 let repo_id = &event.repo_id;
49
50 // Publish to repo-specific channel if repo_id is set.
51 if !repo_id.is_empty() {
52 let tx = self.get_or_create_sender(repo_id);
53 let _ = tx.send(event.clone());
54 }
55
56 // Always publish to the global "__all__" channel.
57 if let Some(tx) = self.channels.get(ALL_CHANNEL) {
58 let _ = tx.send(event);
59 }
60 }
61
62 /// Subscribe to events for a specific repo.
63 ///
64 /// The receiver is created while the DashMap shard lock is held so that
65 /// `cleanup_idle` cannot race and remove the channel between creation
66 /// and subscription.
67 pub fn subscribe(&self, repo_id: &str) -> broadcast::Receiver<WatchEvent> {
68 self.channels
69 .entry(repo_id.to_string())
70 .or_insert_with(|| {
71 let (tx, _) = broadcast::channel(256);
72 tx
73 })
74 .subscribe()
75 }
76
77 /// Subscribe to ALL events across all repos (for the platform bridge).
78 pub fn subscribe_all(&self) -> broadcast::Receiver<WatchEvent> {
79 self.channels
80 .entry(ALL_CHANNEL.to_string())
81 .or_insert_with(|| {
82 let (tx, _) = broadcast::channel(256);
83 tx
84 })
85 .subscribe()
86 }
87
88 /// Remove the channel for a specific repo (e.g. when decommissioned).
89 pub fn remove_repo(&self, repo_id: &str) {
90 if repo_id != ALL_CHANNEL {
91 self.channels.remove(repo_id);
92 }
93 }
94
95 /// Remove channels that have no active receivers, excluding the global channel.
96 /// Call periodically (e.g. every few minutes) to prevent unbounded growth.
97 pub fn cleanup_idle(&self) {
98 self.channels.retain(|key, sender| {
99 key == ALL_CHANNEL || sender.receiver_count() > 0
100 });
101 }
102}