dk_engine/workspace/
event_bus.rs1use dashmap::DashMap;
8use dk_core::RepoId;
9use tokio::sync::broadcast;
10use uuid::Uuid;
11
12#[derive(Debug, Clone)]
16pub enum SessionEvent {
17 SessionCreated {
19 session_id: Uuid,
20 agent_id: String,
21 intent: String,
22 },
23
24 FileModified {
26 session_id: Uuid,
27 file_path: String,
28 },
29
30 ChangesetSubmitted {
32 session_id: Uuid,
33 files_modified: usize,
34 },
35
36 ChangesetMerged {
38 session_id: Uuid,
39 commit_hash: String,
40 },
41
42 SessionDisconnected {
44 session_id: Uuid,
45 },
46}
47
/// Buffer capacity handed to `broadcast::channel` when a repository's
/// channel is created.
///
/// Per tokio's broadcast semantics, a receiver that falls further behind
/// than this many events skips the oldest ones and observes a `Lagged` error.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
52
53pub struct RepoEventBus {
59 channels: DashMap<RepoId, broadcast::Sender<SessionEvent>>,
60}
61
62impl RepoEventBus {
63 pub fn new() -> Self {
65 Self {
66 channels: DashMap::new(),
67 }
68 }
69
70 pub fn publish(&self, repo_id: RepoId, event: SessionEvent) {
75 let sender = self.get_or_create_sender(repo_id);
76 let _ = sender.send(event);
78 }
79
80 pub fn subscribe(&self, repo_id: RepoId) -> broadcast::Receiver<SessionEvent> {
84 let sender = self.get_or_create_sender(repo_id);
85 sender.subscribe()
86 }
87
88 pub fn active_repos(&self) -> usize {
90 self.channels.len()
91 }
92
93 pub fn subscriber_count(&self, repo_id: RepoId) -> usize {
97 self.channels
98 .get(&repo_id)
99 .map(|s| s.receiver_count())
100 .unwrap_or(0)
101 }
102
103 pub fn prune_dead_channels(&self) {
105 self.channels.retain(|_repo_id, sender| sender.receiver_count() > 0);
106 }
107
108 fn get_or_create_sender(&self, repo_id: RepoId) -> broadcast::Sender<SessionEvent> {
110 self.channels
111 .entry(repo_id)
112 .or_insert_with(|| broadcast::channel(DEFAULT_CHANNEL_CAPACITY).0)
113 .value()
114 .clone()
115 }
116}
117
118impl Default for RepoEventBus {
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// A single subscriber receives the event published for its repository.
    #[tokio::test]
    async fn publish_and_receive() {
        let repo = Uuid::new_v4();
        let bus = RepoEventBus::new();
        let mut receiver = bus.subscribe(repo);

        let created = SessionEvent::SessionCreated {
            session_id: Uuid::new_v4(),
            agent_id: "agent-1".into(),
            intent: "fix bug".into(),
        };
        bus.publish(repo, created);

        match receiver.recv().await.expect("should receive event") {
            SessionEvent::SessionCreated { agent_id, .. } => assert_eq!(agent_id, "agent-1"),
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Publishing without any subscriber must simply drop the event.
    #[tokio::test]
    async fn no_subscriber_does_not_panic() {
        let repo = Uuid::new_v4();
        let disconnected = SessionEvent::SessionDisconnected {
            session_id: Uuid::new_v4(),
        };

        RepoEventBus::new().publish(repo, disconnected);
    }

    /// `subscriber_count` tracks the number of live receivers per repository.
    #[test]
    fn subscriber_count() {
        let repo = Uuid::new_v4();
        let bus = RepoEventBus::new();
        assert_eq!(bus.subscriber_count(repo), 0);

        // Receivers are kept alive so they keep counting as subscribers.
        let _first = bus.subscribe(repo);
        assert_eq!(bus.subscriber_count(repo), 1);

        let _second = bus.subscribe(repo);
        assert_eq!(bus.subscriber_count(repo), 2);
    }

    /// Each distinct repository subscribed to adds one active channel.
    #[test]
    fn active_repos_count() {
        let bus = RepoEventBus::new();
        assert_eq!(bus.active_repos(), 0);

        let _first = bus.subscribe(Uuid::new_v4());
        assert_eq!(bus.active_repos(), 1);

        let _second = bus.subscribe(Uuid::new_v4());
        assert_eq!(bus.active_repos(), 2);
    }

    /// Every subscriber of a repository gets its own copy of an event.
    #[tokio::test]
    async fn multiple_subscribers_receive_same_event() {
        let repo = Uuid::new_v4();
        let bus = RepoEventBus::new();
        let mut rx1 = bus.subscribe(repo);
        let mut rx2 = bus.subscribe(repo);

        let modified = SessionEvent::FileModified {
            session_id: Uuid::new_v4(),
            file_path: "src/main.rs".into(),
        };
        bus.publish(repo, modified);

        let received = (
            rx1.recv().await.expect("rx1 should receive"),
            rx2.recv().await.expect("rx2 should receive"),
        );

        if let (
            SessionEvent::FileModified { file_path: p1, .. },
            SessionEvent::FileModified { file_path: p2, .. },
        ) = received
        {
            assert_eq!(p1, "src/main.rs");
            assert_eq!(p2, "src/main.rs");
        } else {
            panic!("both should receive FileModified");
        }
    }
}