// dk_protocol/events.rs

use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::broadcast;

use crate::WatchEvent;
5
/// Special channel key that receives a copy of every event regardless of repo.
///
/// This key is reserved: `remove_repo` refuses to delete it and
/// `cleanup_idle` always retains it, so the global channel outlives
/// any per-repo channel.
const ALL_CHANNEL: &str = "__all__";
8
9/// Shared event bus for broadcasting repo events to watching agents.
10///
11/// Uses per-repo [`tokio::sync::broadcast`] channels so subscribers
12/// only receive events for repos they care about.  A special "__all__"
13/// channel receives a copy of every published event (used by the
14/// platform bridge).
15///
16/// Events that are not consumed before the channel capacity (256) is
17/// exhausted are silently dropped for lagged receivers.
18#[derive(Clone)]
19pub struct EventBus {
20    channels: DashMap<String, broadcast::Sender<WatchEvent>>,
21}
22
23impl EventBus {
24    /// Create a new event bus.
25    pub fn new() -> Self {
26        let channels = DashMap::new();
27        // Pre-create the global "__all__" channel.
28        let (tx, _) = broadcast::channel(256);
29        channels.insert(ALL_CHANNEL.to_string(), tx);
30        Self { channels }
31    }
32
33    /// Get or create the broadcast sender for the given key.
34    fn get_or_create_sender(&self, key: &str) -> broadcast::Sender<WatchEvent> {
35        self.channels
36            .entry(key.to_string())
37            .or_insert_with(|| {
38                let (tx, _) = broadcast::channel(256);
39                tx
40            })
41            .clone()
42    }
43
44    /// Publish an event to a specific repo channel AND the global "__all__" channel.
45    ///
46    /// If there are no subscribers the event is silently discarded.
47    pub fn publish(&self, event: WatchEvent) {
48        let repo_id = &event.repo_id;
49
50        // Publish to repo-specific channel if repo_id is set.
51        if !repo_id.is_empty() {
52            let tx = self.get_or_create_sender(repo_id);
53            let _ = tx.send(event.clone());
54        }
55
56        // Always publish to the global "__all__" channel.
57        if let Some(tx) = self.channels.get(ALL_CHANNEL) {
58            let _ = tx.send(event);
59        }
60    }
61
62    /// Subscribe to events for a specific repo.
63    ///
64    /// The receiver is created while the DashMap shard lock is held so that
65    /// `cleanup_idle` cannot race and remove the channel between creation
66    /// and subscription.
67    pub fn subscribe(&self, repo_id: &str) -> broadcast::Receiver<WatchEvent> {
68        self.channels
69            .entry(repo_id.to_string())
70            .or_insert_with(|| {
71                let (tx, _) = broadcast::channel(256);
72                tx
73            })
74            .subscribe()
75    }
76
77    /// Subscribe to ALL events across all repos (for the platform bridge).
78    pub fn subscribe_all(&self) -> broadcast::Receiver<WatchEvent> {
79        self.channels
80            .entry(ALL_CHANNEL.to_string())
81            .or_insert_with(|| {
82                let (tx, _) = broadcast::channel(256);
83                tx
84            })
85            .subscribe()
86    }
87
88    /// Remove the channel for a specific repo (e.g. when decommissioned).
89    pub fn remove_repo(&self, repo_id: &str) {
90        if repo_id != ALL_CHANNEL {
91            self.channels.remove(repo_id);
92        }
93    }
94
95    /// Remove channels that have no active receivers, excluding the global channel.
96    /// Call periodically (e.g. every few minutes) to prevent unbounded growth.
97    pub fn cleanup_idle(&self) {
98        self.channels.retain(|key, sender| {
99            key == ALL_CHANNEL || sender.receiver_count() > 0
100        });
101    }
102}