Skip to main content

dk_engine/workspace/
event_bus.rs

1//! Per-repo event bus for cross-session awareness.
2//!
3//! Uses `tokio::sync::broadcast` channels to fan out workspace events
4//! to all subscribers within the same repository. Each repo gets its
5//! own channel, lazily created on first publish or subscribe.
6
7use dashmap::DashMap;
8use dk_core::RepoId;
9use tokio::sync::broadcast;
10use uuid::Uuid;
11
12// ── Event types ──────────────────────────────────────────────────────
13
14/// Events broadcast within a repository for cross-session coordination.
15#[derive(Debug, Clone)]
16pub enum SessionEvent {
17    /// A new session workspace was created.
18    SessionCreated {
19        session_id: Uuid,
20        agent_id: String,
21        intent: String,
22    },
23
24    /// A file was modified in a session's overlay.
25    FileModified {
26        session_id: Uuid,
27        file_path: String,
28    },
29
30    /// A changeset was submitted for review/merge.
31    ChangesetSubmitted {
32        session_id: Uuid,
33        files_modified: usize,
34    },
35
36    /// A changeset was merged into the repository.
37    ChangesetMerged {
38        session_id: Uuid,
39        commit_hash: String,
40    },
41
42    /// A session disconnected (workspace may still be persistent).
43    SessionDisconnected {
44        session_id: Uuid,
45    },
46}
47
48// ── RepoEventBus ─────────────────────────────────────────────────────
49
/// Capacity handed to `broadcast::channel` for every lazily created
/// per-repo channel.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
52
53/// A per-repository event bus backed by `tokio::sync::broadcast`.
54///
55/// Channels are lazily created on first `publish` or `subscribe` for a
56/// given repo. Slow consumers that fall behind will receive
57/// `RecvError::Lagged`, which is non-fatal — they skip missed events.
58pub struct RepoEventBus {
59    channels: DashMap<RepoId, broadcast::Sender<SessionEvent>>,
60}
61
62impl RepoEventBus {
63    /// Create a new, empty event bus.
64    pub fn new() -> Self {
65        Self {
66            channels: DashMap::new(),
67        }
68    }
69
70    /// Publish an event to all subscribers of the given repository.
71    ///
72    /// If no subscribers exist yet, the event is silently dropped (the
73    /// channel is still created for future subscribers).
74    pub fn publish(&self, repo_id: RepoId, event: SessionEvent) {
75        let sender = self.get_or_create_sender(repo_id);
76        // send() returns Err if there are no receivers, which is fine.
77        let _ = sender.send(event);
78    }
79
80    /// Subscribe to events for a repository.
81    ///
82    /// Returns a `broadcast::Receiver` that yields `SessionEvent`s.
83    pub fn subscribe(&self, repo_id: RepoId) -> broadcast::Receiver<SessionEvent> {
84        let sender = self.get_or_create_sender(repo_id);
85        sender.subscribe()
86    }
87
88    /// Number of repositories with active channels.
89    pub fn active_repos(&self) -> usize {
90        self.channels.len()
91    }
92
93    /// Number of active subscribers for a given repo.
94    ///
95    /// Returns 0 if no channel exists for the repo.
96    pub fn subscriber_count(&self, repo_id: RepoId) -> usize {
97        self.channels
98            .get(&repo_id)
99            .map(|s| s.receiver_count())
100            .unwrap_or(0)
101    }
102
103    /// Remove channels with no active subscribers.
104    pub fn prune_dead_channels(&self) {
105        self.channels.retain(|_repo_id, sender| sender.receiver_count() > 0);
106    }
107
108    /// Get or lazily create the broadcast sender for a repo.
109    fn get_or_create_sender(&self, repo_id: RepoId) -> broadcast::Sender<SessionEvent> {
110        self.channels
111            .entry(repo_id)
112            .or_insert_with(|| broadcast::channel(DEFAULT_CHANNEL_CAPACITY).0)
113            .value()
114            .clone()
115    }
116}
117
118impl Default for RepoEventBus {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscriber receives the event published to its repo.
    #[tokio::test]
    async fn publish_and_receive() {
        let bus = RepoEventBus::new();
        let repo = Uuid::new_v4();

        let mut rx = bus.subscribe(repo);

        bus.publish(
            repo,
            SessionEvent::SessionCreated {
                session_id: Uuid::new_v4(),
                agent_id: "agent-1".into(),
                intent: "fix bug".into(),
            },
        );

        match rx.recv().await.expect("should receive event") {
            SessionEvent::SessionCreated {
                agent_id, intent, ..
            } => {
                assert_eq!(agent_id, "agent-1");
                assert_eq!(intent, "fix bug");
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Publishing without subscribers must not panic, and (as documented on
    /// `publish`) still creates the repo's channel.
    #[test]
    fn no_subscriber_does_not_panic() {
        let bus = RepoEventBus::new();
        let repo = Uuid::new_v4();

        bus.publish(
            repo,
            SessionEvent::SessionDisconnected {
                session_id: Uuid::new_v4(),
            },
        );

        assert_eq!(bus.active_repos(), 1);
        assert_eq!(bus.subscriber_count(repo), 0);
    }

    /// `subscriber_count` follows receivers as they are created and dropped.
    #[test]
    fn subscriber_count() {
        let bus = RepoEventBus::new();
        let repo = Uuid::new_v4();

        assert_eq!(bus.subscriber_count(repo), 0);

        let _rx1 = bus.subscribe(repo);
        assert_eq!(bus.subscriber_count(repo), 1);

        let rx2 = bus.subscribe(repo);
        assert_eq!(bus.subscriber_count(repo), 2);

        drop(rx2);
        assert_eq!(bus.subscriber_count(repo), 1);
    }

    /// Every distinct repo gets its own channel.
    #[test]
    fn active_repos_count() {
        let bus = RepoEventBus::new();
        assert_eq!(bus.active_repos(), 0);

        let _rx = bus.subscribe(Uuid::new_v4());
        assert_eq!(bus.active_repos(), 1);

        let _rx2 = bus.subscribe(Uuid::new_v4());
        assert_eq!(bus.active_repos(), 2);
    }

    /// Pruning drops channels whose receivers are gone and keeps the rest.
    #[test]
    fn prune_removes_only_channels_without_subscribers() {
        let bus = RepoEventBus::new();
        let live = Uuid::new_v4();
        let dead = Uuid::new_v4();

        let _rx = bus.subscribe(live);
        drop(bus.subscribe(dead));
        assert_eq!(bus.active_repos(), 2);

        bus.prune_dead_channels();

        assert_eq!(bus.active_repos(), 1);
        assert_eq!(bus.subscriber_count(live), 1);
        assert_eq!(bus.subscriber_count(dead), 0);
    }

    /// An event published to one repo is not delivered to another repo.
    #[test]
    fn events_are_scoped_to_their_repo() {
        let bus = RepoEventBus::new();
        let repo_a = Uuid::new_v4();
        let repo_b = Uuid::new_v4();

        let mut rx_a = bus.subscribe(repo_a);
        let mut rx_b = bus.subscribe(repo_b);

        bus.publish(
            repo_a,
            SessionEvent::SessionDisconnected {
                session_id: Uuid::new_v4(),
            },
        );

        assert!(matches!(
            rx_a.try_recv(),
            Ok(SessionEvent::SessionDisconnected { .. })
        ));
        assert!(matches!(
            rx_b.try_recv(),
            Err(broadcast::error::TryRecvError::Empty)
        ));
    }

    /// All subscribers of a repo observe the same published event.
    #[tokio::test]
    async fn multiple_subscribers_receive_same_event() {
        let bus = RepoEventBus::new();
        let repo = Uuid::new_v4();

        let mut rx1 = bus.subscribe(repo);
        let mut rx2 = bus.subscribe(repo);

        bus.publish(
            repo,
            SessionEvent::FileModified {
                session_id: Uuid::new_v4(),
                file_path: "src/main.rs".into(),
            },
        );

        let e1 = rx1.recv().await.expect("rx1 should receive");
        let e2 = rx2.recv().await.expect("rx2 should receive");

        match (e1, e2) {
            (
                SessionEvent::FileModified { file_path: p1, .. },
                SessionEvent::FileModified { file_path: p2, .. },
            ) => {
                assert_eq!(p1, "src/main.rs");
                assert_eq!(p2, "src/main.rs");
            }
            _ => panic!("both should receive FileModified"),
        }
    }
}