1use actr_protocol::{ActrId, PayloadType};
7use tokio::sync::broadcast;
8
/// Lifecycle state of a peer connection.
///
/// The [`std::fmt::Display`] implementation in this file prints each variant
/// as its bare name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial state, before any connection attempt.
    New,
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection was lost; `ConnectionEvent::is_recoverable_state`
    /// reports this state as recoverable.
    Disconnected,
    /// The connection failed; `ConnectionEvent::is_recoverable_state`
    /// reports this state as recoverable.
    Failed,
    /// The connection is closed; `ConnectionEvent::should_trigger_cleanup`
    /// reports a change to this state as requiring cleanup.
    Closed,
}
26
27impl std::fmt::Display for ConnectionState {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 match self {
30 ConnectionState::New => write!(f, "New"),
31 ConnectionState::Connecting => write!(f, "Connecting"),
32 ConnectionState::Connected => write!(f, "Connected"),
33 ConnectionState::Disconnected => write!(f, "Disconnected"),
34 ConnectionState::Failed => write!(f, "Failed"),
35 ConnectionState::Closed => write!(f, "Closed"),
36 }
37 }
38}
39
40#[derive(Debug, Clone)]
42pub enum ConnectionEvent {
43 StateChanged {
45 peer_id: ActrId,
46 state: ConnectionState,
47 },
48
49 DataChannelClosed {
51 peer_id: ActrId,
52 payload_type: PayloadType,
53 },
54
55 ConnectionClosed { peer_id: ActrId },
57
58 IceRestartStarted { peer_id: ActrId },
60
61 IceRestartCompleted { peer_id: ActrId, success: bool },
63
64 NewOfferReceived { peer_id: ActrId, sdp: String },
66
67 NewRoleAssignment { peer_id: ActrId, is_offerer: bool },
69}
70
71impl ConnectionEvent {
72 pub fn peer_id(&self) -> &ActrId {
74 match self {
75 ConnectionEvent::StateChanged { peer_id, .. } => peer_id,
76 ConnectionEvent::DataChannelClosed { peer_id, .. } => peer_id,
77 ConnectionEvent::ConnectionClosed { peer_id } => peer_id,
78 ConnectionEvent::IceRestartStarted { peer_id } => peer_id,
79 ConnectionEvent::IceRestartCompleted { peer_id, .. } => peer_id,
80 ConnectionEvent::NewOfferReceived { peer_id, .. } => peer_id,
81 ConnectionEvent::NewRoleAssignment { peer_id, .. } => peer_id,
82 }
83 }
84
85 pub fn should_trigger_cleanup(&self) -> bool {
87 matches!(
88 self,
89 ConnectionEvent::ConnectionClosed { .. }
90 | ConnectionEvent::StateChanged {
91 state: ConnectionState::Closed,
92 ..
93 }
94 | ConnectionEvent::IceRestartCompleted { success: false, .. }
95 )
96 }
97
98 pub fn is_recoverable_state(&self) -> bool {
100 matches!(
101 self,
102 ConnectionEvent::StateChanged {
103 state: ConnectionState::Disconnected | ConnectionState::Failed,
104 ..
105 }
106 )
107 }
108}
109
/// Channel capacity used by `ConnectionEventBroadcaster::new`.
const DEFAULT_CHANNEL_CAPACITY: usize = 64;
112
113#[derive(Debug)]
118pub struct ConnectionEventBroadcaster {
119 tx: broadcast::Sender<ConnectionEvent>,
120}
121
122impl ConnectionEventBroadcaster {
123 pub fn new() -> Self {
125 Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
126 }
127
128 pub fn with_capacity(capacity: usize) -> Self {
130 let (tx, _) = broadcast::channel(capacity);
131 Self { tx }
132 }
133
134 pub fn send(&self, event: ConnectionEvent) -> usize {
139 match self.tx.send(event) {
140 Ok(count) => count,
141 Err(_) => 0, }
143 }
144
145 pub fn subscribe(&self) -> broadcast::Receiver<ConnectionEvent> {
147 self.tx.subscribe()
148 }
149
150 pub fn receiver_count(&self) -> usize {
152 self.tx.receiver_count()
153 }
154
155 pub fn sender(&self) -> broadcast::Sender<ConnectionEvent> {
157 self.tx.clone()
158 }
159}
160
161impl Default for ConnectionEventBroadcaster {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl Clone for ConnectionEventBroadcaster {
168 fn clone(&self) -> Self {
169 Self {
170 tx: self.tx.clone(),
171 }
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use actr_protocol::{ActrId, ActrType, Realm};

    /// Builds the fixed peer identity shared by every test
    /// (realm 1, serial number 1, type `test/device`).
    fn test_peer_id() -> ActrId {
        ActrId {
            realm: Realm { realm_id: 1 },
            serial_number: 1,
            r#type: ActrType {
                manufacturer: "test".to_string(),
                name: "device".to_string(),
            },
        }
    }

    #[tokio::test]
    async fn test_broadcaster_send_receive() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let mut rx = broadcaster.subscribe();

        let delivered = broadcaster.send(ConnectionEvent::ConnectionClosed {
            peer_id: test_peer_id(),
        });
        assert_eq!(delivered, 1);

        let event = rx.recv().await.unwrap();
        assert!(matches!(event, ConnectionEvent::ConnectionClosed { .. }));
        // The payload must survive the trip, not just the variant.
        assert_eq!(event.peer_id().serial_number, 1);
    }

    #[tokio::test]
    async fn test_multiple_subscribers() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let mut rx1 = broadcaster.subscribe();
        let mut rx2 = broadcaster.subscribe();

        let count = broadcaster.send(ConnectionEvent::StateChanged {
            peer_id: test_peer_id(),
            state: ConnectionState::Connected,
        });

        assert_eq!(count, 2);

        let event1 = rx1.recv().await.unwrap();
        let event2 = rx2.recv().await.unwrap();

        assert!(matches!(
            event1,
            ConnectionEvent::StateChanged {
                state: ConnectionState::Connected,
                ..
            }
        ));
        assert!(matches!(
            event2,
            ConnectionEvent::StateChanged {
                state: ConnectionState::Connected,
                ..
            }
        ));
    }

    #[test]
    fn test_send_without_subscribers_returns_zero() {
        // Exercises the `Err(_) => 0` branch of `send`: a fresh broadcaster
        // has no receivers, so the underlying send fails.
        let broadcaster = ConnectionEventBroadcaster::new();
        assert_eq!(broadcaster.receiver_count(), 0);

        let delivered = broadcaster.send(ConnectionEvent::IceRestartStarted {
            peer_id: test_peer_id(),
        });
        assert_eq!(delivered, 0);
    }

    #[test]
    fn test_receiver_count_tracks_subscriptions() {
        let broadcaster = ConnectionEventBroadcaster::default();

        let rx1 = broadcaster.subscribe();
        let rx2 = broadcaster.subscribe();
        assert_eq!(broadcaster.receiver_count(), 2);

        drop(rx1);
        assert_eq!(broadcaster.receiver_count(), 1);

        drop(rx2);
        assert_eq!(broadcaster.receiver_count(), 0);
    }

    #[tokio::test]
    async fn test_clone_shares_channel() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let cloned = broadcaster.clone();
        let mut rx = broadcaster.subscribe();

        // An event sent through the clone must reach a subscriber of the
        // original.
        let delivered = cloned.send(ConnectionEvent::IceRestartCompleted {
            peer_id: test_peer_id(),
            success: true,
        });
        assert_eq!(delivered, 1);

        let event = rx.recv().await.unwrap();
        assert!(matches!(
            event,
            ConnectionEvent::IceRestartCompleted { success: true, .. }
        ));
    }

    #[test]
    fn test_peer_id_accessor() {
        // `DataChannelClosed` is left out: it needs a `PayloadType` value,
        // which this module does not construct.
        let events = [
            ConnectionEvent::StateChanged {
                peer_id: test_peer_id(),
                state: ConnectionState::Connecting,
            },
            ConnectionEvent::ConnectionClosed {
                peer_id: test_peer_id(),
            },
            ConnectionEvent::IceRestartStarted {
                peer_id: test_peer_id(),
            },
            ConnectionEvent::IceRestartCompleted {
                peer_id: test_peer_id(),
                success: true,
            },
            ConnectionEvent::NewOfferReceived {
                peer_id: test_peer_id(),
                sdp: String::new(),
            },
            ConnectionEvent::NewRoleAssignment {
                peer_id: test_peer_id(),
                is_offerer: true,
            },
        ];

        for event in &events {
            assert_eq!(event.peer_id().serial_number, 1);
            assert_eq!(event.peer_id().realm.realm_id, 1);
        }
    }

    #[test]
    fn test_connection_state_display() {
        assert_eq!(ConnectionState::New.to_string(), "New");
        assert_eq!(ConnectionState::Connecting.to_string(), "Connecting");
        assert_eq!(ConnectionState::Connected.to_string(), "Connected");
        assert_eq!(ConnectionState::Disconnected.to_string(), "Disconnected");
        assert_eq!(ConnectionState::Failed.to_string(), "Failed");
        assert_eq!(ConnectionState::Closed.to_string(), "Closed");
    }

    #[test]
    fn test_should_trigger_cleanup() {
        // Events that must trigger cleanup.
        assert!(
            ConnectionEvent::ConnectionClosed {
                peer_id: test_peer_id(),
            }
            .should_trigger_cleanup()
        );

        assert!(
            ConnectionEvent::StateChanged {
                peer_id: test_peer_id(),
                state: ConnectionState::Closed,
            }
            .should_trigger_cleanup()
        );

        assert!(
            ConnectionEvent::IceRestartCompleted {
                peer_id: test_peer_id(),
                success: false,
            }
            .should_trigger_cleanup()
        );

        // Events that must not trigger cleanup.
        assert!(
            !ConnectionEvent::StateChanged {
                peer_id: test_peer_id(),
                state: ConnectionState::Disconnected,
            }
            .should_trigger_cleanup()
        );

        assert!(
            !ConnectionEvent::IceRestartCompleted {
                peer_id: test_peer_id(),
                success: true,
            }
            .should_trigger_cleanup()
        );
    }

    #[test]
    fn test_is_recoverable_state() {
        // Recoverable states.
        assert!(
            ConnectionEvent::StateChanged {
                peer_id: test_peer_id(),
                state: ConnectionState::Disconnected,
            }
            .is_recoverable_state()
        );

        assert!(
            ConnectionEvent::StateChanged {
                peer_id: test_peer_id(),
                state: ConnectionState::Failed,
            }
            .is_recoverable_state()
        );

        // Everything else is not recoverable.
        assert!(
            !ConnectionEvent::StateChanged {
                peer_id: test_peer_id(),
                state: ConnectionState::Closed,
            }
            .is_recoverable_state()
        );

        assert!(
            !ConnectionEvent::ConnectionClosed {
                peer_id: test_peer_id(),
            }
            .is_recoverable_state()
        );
    }
}