1use actr_protocol::{ActrId, PayloadType};
7use tokio::sync::broadcast;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum ConnectionState {
13 New,
15 Connecting,
17 Connected,
19 Disconnected,
21 Failed,
23 Closed,
25}
26
27impl std::fmt::Display for ConnectionState {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 match self {
30 ConnectionState::New => write!(f, "New"),
31 ConnectionState::Connecting => write!(f, "Connecting"),
32 ConnectionState::Connected => write!(f, "Connected"),
33 ConnectionState::Disconnected => write!(f, "Disconnected"),
34 ConnectionState::Failed => write!(f, "Failed"),
35 ConnectionState::Closed => write!(f, "Closed"),
36 }
37 }
38}
39
40#[derive(Debug, Clone)]
42pub enum ConnectionEvent {
43 StateChanged {
45 peer_id: ActrId,
46 session_id: u64,
47 state: ConnectionState,
48 },
49
50 DataChannelClosed {
52 peer_id: ActrId,
53 session_id: u64,
54 payload_type: PayloadType,
55 },
56
57 DataChannelOpened {
61 peer_id: ActrId,
62 session_id: u64,
63 payload_type: PayloadType,
64 },
65
66 ConnectionClosed { peer_id: ActrId, session_id: u64 },
68
69 IceRestartStarted { peer_id: ActrId, session_id: u64 },
71
72 IceRestartCompleted {
74 peer_id: ActrId,
75 session_id: u64,
76 success: bool,
77 },
78
79 NewOfferReceived { peer_id: ActrId, sdp: String },
81
82 NewRoleAssignment { peer_id: ActrId, is_offerer: bool },
84}
85
86impl ConnectionEvent {
87 pub fn peer_id(&self) -> &ActrId {
89 match self {
90 ConnectionEvent::StateChanged { peer_id, .. } => peer_id,
91 ConnectionEvent::DataChannelClosed { peer_id, .. } => peer_id,
92 ConnectionEvent::DataChannelOpened { peer_id, .. } => peer_id,
93 ConnectionEvent::ConnectionClosed { peer_id, .. } => peer_id,
94 ConnectionEvent::IceRestartStarted { peer_id, .. } => peer_id,
95 ConnectionEvent::IceRestartCompleted { peer_id, .. } => peer_id,
96 ConnectionEvent::NewOfferReceived { peer_id, .. } => peer_id,
97 ConnectionEvent::NewRoleAssignment { peer_id, .. } => peer_id,
98 }
99 }
100
101 pub fn session_id(&self) -> Option<u64> {
103 match self {
104 Self::StateChanged { session_id, .. }
105 | Self::DataChannelClosed { session_id, .. }
106 | Self::DataChannelOpened { session_id, .. }
107 | Self::ConnectionClosed { session_id, .. }
108 | Self::IceRestartStarted { session_id, .. }
109 | Self::IceRestartCompleted { session_id, .. } => Some(*session_id),
110 _ => None,
111 }
112 }
113
114 pub fn should_trigger_cleanup(&self) -> bool {
116 matches!(
117 self,
118 ConnectionEvent::ConnectionClosed { .. }
119 | ConnectionEvent::StateChanged {
120 state: ConnectionState::Closed,
121 ..
122 }
123 | ConnectionEvent::IceRestartCompleted { success: false, .. }
124 )
125 }
126
127 pub fn is_recoverable_state(&self) -> bool {
129 matches!(
130 self,
131 ConnectionEvent::StateChanged {
132 state: ConnectionState::Disconnected | ConnectionState::Failed,
133 ..
134 }
135 )
136 }
137}
138
139const DEFAULT_CHANNEL_CAPACITY: usize = 256;
141
142#[derive(Debug)]
147pub(crate) struct ConnectionEventBroadcaster {
148 tx: broadcast::Sender<ConnectionEvent>,
149}
150
151impl ConnectionEventBroadcaster {
152 pub(crate) fn new() -> Self {
154 Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
155 }
156
157 pub(crate) fn with_capacity(capacity: usize) -> Self {
159 let (tx, _) = broadcast::channel(capacity);
160 Self { tx }
161 }
162
163 pub(crate) fn send(&self, event: ConnectionEvent) -> usize {
168 self.tx.send(event).unwrap_or_default()
169 }
170
171 pub(crate) fn subscribe(&self) -> broadcast::Receiver<ConnectionEvent> {
173 self.tx.subscribe()
174 }
175
176 pub(crate) fn sender(&self) -> broadcast::Sender<ConnectionEvent> {
178 self.tx.clone()
179 }
180}
181
182impl Default for ConnectionEventBroadcaster {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188impl Clone for ConnectionEventBroadcaster {
189 fn clone(&self) -> Self {
190 Self {
191 tx: self.tx.clone(),
192 }
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use actr_protocol::{ActrId, ActrType, Realm};
200
201 fn test_peer_id() -> ActrId {
202 ActrId {
203 realm: Realm { realm_id: 1 },
204 serial_number: 1,
205 r#type: ActrType {
206 manufacturer: "test".to_string(),
207 name: "device".to_string(),
208 version: "1.0.0".to_string(),
209 },
210 }
211 }
212
213 #[tokio::test]
214 async fn test_broadcaster_send_receive() {
215 let broadcaster = ConnectionEventBroadcaster::new();
216 let mut rx = broadcaster.subscribe();
217
218 let peer_id = test_peer_id();
219 broadcaster.send(ConnectionEvent::ConnectionClosed {
220 peer_id: peer_id.clone(),
221 session_id: 0,
222 });
223
224 let event = rx.recv().await.unwrap();
225 assert!(matches!(event, ConnectionEvent::ConnectionClosed { .. }));
226 }
227
228 #[tokio::test]
229 async fn test_multiple_subscribers() {
230 let broadcaster = ConnectionEventBroadcaster::new();
231 let mut rx1 = broadcaster.subscribe();
232 let mut rx2 = broadcaster.subscribe();
233
234 let peer_id = test_peer_id();
235 let count = broadcaster.send(ConnectionEvent::StateChanged {
236 peer_id: peer_id.clone(),
237 session_id: 0,
238 state: ConnectionState::Connected,
239 });
240
241 assert_eq!(count, 2);
242
243 let event1 = rx1.recv().await.unwrap();
244 let event2 = rx2.recv().await.unwrap();
245
246 assert!(matches!(
247 event1,
248 ConnectionEvent::StateChanged {
249 state: ConnectionState::Connected,
250 ..
251 }
252 ));
253 assert!(matches!(
254 event2,
255 ConnectionEvent::StateChanged {
256 state: ConnectionState::Connected,
257 ..
258 }
259 ));
260 }
261
262 #[test]
263 fn test_should_trigger_cleanup() {
264 let peer_id = test_peer_id();
265
266 assert!(
268 ConnectionEvent::ConnectionClosed {
269 peer_id: peer_id.clone(),
270 session_id: 0,
271 }
272 .should_trigger_cleanup()
273 );
274
275 assert!(
276 ConnectionEvent::StateChanged {
277 peer_id: peer_id.clone(),
278 session_id: 0,
279 state: ConnectionState::Closed,
280 }
281 .should_trigger_cleanup()
282 );
283
284 assert!(
285 ConnectionEvent::IceRestartCompleted {
286 peer_id: peer_id.clone(),
287 session_id: 0,
288 success: false,
289 }
290 .should_trigger_cleanup()
291 );
292
293 assert!(
295 !ConnectionEvent::StateChanged {
296 peer_id: peer_id.clone(),
297 session_id: 0,
298 state: ConnectionState::Disconnected,
299 }
300 .should_trigger_cleanup()
301 );
302
303 assert!(
304 !ConnectionEvent::IceRestartCompleted {
305 peer_id: peer_id.clone(),
306 session_id: 0,
307 success: true,
308 }
309 .should_trigger_cleanup()
310 );
311 }
312
313 #[test]
314 fn test_is_recoverable_state() {
315 let peer_id = test_peer_id();
316
317 assert!(
319 ConnectionEvent::StateChanged {
320 peer_id: peer_id.clone(),
321 session_id: 0,
322 state: ConnectionState::Disconnected,
323 }
324 .is_recoverable_state()
325 );
326
327 assert!(
328 ConnectionEvent::StateChanged {
329 peer_id: peer_id.clone(),
330 session_id: 0,
331 state: ConnectionState::Failed,
332 }
333 .is_recoverable_state()
334 );
335
336 assert!(
338 !ConnectionEvent::StateChanged {
339 peer_id: peer_id.clone(),
340 session_id: 0,
341 state: ConnectionState::Closed,
342 }
343 .is_recoverable_state()
344 );
345
346 assert!(
347 !ConnectionEvent::ConnectionClosed {
348 peer_id: peer_id.clone(),
349 session_id: 0,
350 }
351 .is_recoverable_state()
352 );
353 }
354}