actr_runtime/transport/
connection_event.rs

1//! Connection Event System
2//!
3//! Unified event broadcasting mechanism for connection state changes.
4//! Enables proactive resource cleanup across all transport layers.
5
6use actr_protocol::{ActrId, PayloadType};
7use tokio::sync::broadcast;
8
/// Connection state enumeration
///
/// Maps to WebRTC `RTCPeerConnectionState`.
///
/// The enum is a fieldless tag, so besides the comparison traits it is also
/// `Copy` (states can be passed and stored by value without `.clone()`) and
/// `Hash` (states can be used as map/set keys).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// Initial state
    New,
    /// ICE/DTLS negotiation in progress
    Connecting,
    /// Connection established
    Connected,
    /// ICE connectivity lost (may recover)
    Disconnected,
    /// ICE connectivity failed (offerer should try ICE restart)
    Failed,
    /// Connection closed
    Closed,
}

/// Renders the state as its variant name (`"New"`, `"Connecting"`, ...).
///
/// Width, alignment and fill flags of the format spec are honoured, so
/// `format!("{:<12}", state)` can be used to align states in log output.
impl std::fmt::Display for ConnectionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a decision here.
        let name = match self {
            ConnectionState::New => "New",
            ConnectionState::Connecting => "Connecting",
            ConnectionState::Connected => "Connected",
            ConnectionState::Disconnected => "Disconnected",
            ConnectionState::Failed => "Failed",
            ConnectionState::Closed => "Closed",
        };
        // `pad` applies the caller's width/alignment/fill; `write!(f, "...")`
        // with a bare literal would silently ignore them.
        f.pad(name)
    }
}
39
40/// Connection events broadcast to all subscribers
41#[derive(Debug, Clone)]
42pub enum ConnectionEvent {
43    /// Connection state changed
44    StateChanged {
45        peer_id: ActrId,
46        state: ConnectionState,
47    },
48
49    /// DataChannel closed for specific payload type
50    DataChannelClosed {
51        peer_id: ActrId,
52        payload_type: PayloadType,
53    },
54
55    /// Connection fully closed (triggers full cleanup)
56    ConnectionClosed { peer_id: ActrId },
57
58    /// ICE restart started
59    IceRestartStarted { peer_id: ActrId },
60
61    /// ICE restart completed
62    IceRestartCompleted { peer_id: ActrId, success: bool },
63
64    /// New offer received (triggers cleanup of existing connection)
65    NewOfferReceived { peer_id: ActrId, sdp: String },
66
67    /// New role assignment (triggers cleanup if role changed)
68    NewRoleAssignment { peer_id: ActrId, is_offerer: bool },
69}
70
71impl ConnectionEvent {
72    /// Get the peer_id from the event
73    pub fn peer_id(&self) -> &ActrId {
74        match self {
75            ConnectionEvent::StateChanged { peer_id, .. } => peer_id,
76            ConnectionEvent::DataChannelClosed { peer_id, .. } => peer_id,
77            ConnectionEvent::ConnectionClosed { peer_id } => peer_id,
78            ConnectionEvent::IceRestartStarted { peer_id } => peer_id,
79            ConnectionEvent::IceRestartCompleted { peer_id, .. } => peer_id,
80            ConnectionEvent::NewOfferReceived { peer_id, .. } => peer_id,
81            ConnectionEvent::NewRoleAssignment { peer_id, .. } => peer_id,
82        }
83    }
84
85    /// Check if this event should trigger full resource cleanup
86    pub fn should_trigger_cleanup(&self) -> bool {
87        matches!(
88            self,
89            ConnectionEvent::ConnectionClosed { .. }
90                | ConnectionEvent::StateChanged {
91                    state: ConnectionState::Closed,
92                    ..
93                }
94                | ConnectionEvent::IceRestartCompleted { success: false, .. }
95        )
96    }
97
98    /// Check if this event indicates a recoverable state (ICE restart candidate)
99    pub fn is_recoverable_state(&self) -> bool {
100        matches!(
101            self,
102            ConnectionEvent::StateChanged {
103                state: ConnectionState::Disconnected | ConnectionState::Failed,
104                ..
105            }
106        )
107    }
108}
109
110/// Default broadcast channel capacity
111const DEFAULT_CHANNEL_CAPACITY: usize = 64;
112
113/// Connection event broadcaster
114///
115/// Manages a broadcast channel for distributing connection events
116/// to all subscribed layers.
117#[derive(Debug)]
118pub struct ConnectionEventBroadcaster {
119    tx: broadcast::Sender<ConnectionEvent>,
120}
121
122impl ConnectionEventBroadcaster {
123    /// Create a new broadcaster with default capacity
124    pub fn new() -> Self {
125        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
126    }
127
128    /// Create a new broadcaster with specified capacity
129    pub fn with_capacity(capacity: usize) -> Self {
130        let (tx, _) = broadcast::channel(capacity);
131        Self { tx }
132    }
133
134    /// Send an event to all subscribers
135    ///
136    /// Returns the number of receivers that received the event.
137    /// Returns 0 if there are no active subscribers (not an error).
138    pub fn send(&self, event: ConnectionEvent) -> usize {
139        match self.tx.send(event) {
140            Ok(count) => count,
141            Err(_) => 0, // No active receivers
142        }
143    }
144
145    /// Subscribe to connection events
146    pub fn subscribe(&self) -> broadcast::Receiver<ConnectionEvent> {
147        self.tx.subscribe()
148    }
149
150    /// Get the number of active subscribers
151    pub fn receiver_count(&self) -> usize {
152        self.tx.receiver_count()
153    }
154
155    /// Get a clone of the sender for sharing
156    pub fn sender(&self) -> broadcast::Sender<ConnectionEvent> {
157        self.tx.clone()
158    }
159}
160
161impl Default for ConnectionEventBroadcaster {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167impl Clone for ConnectionEventBroadcaster {
168    fn clone(&self) -> Self {
169        Self {
170            tx: self.tx.clone(),
171        }
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use actr_protocol::{ActrId, ActrType, Realm};

    /// Fixed peer identity shared by all test cases.
    fn test_peer_id() -> ActrId {
        let r#type = ActrType {
            manufacturer: "test".to_string(),
            name: "device".to_string(),
        };
        ActrId {
            realm: Realm { realm_id: 1 },
            serial_number: 1,
            r#type,
        }
    }

    /// A single subscriber receives the event that was sent.
    #[tokio::test]
    async fn test_broadcaster_send_receive() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let mut subscriber = broadcaster.subscribe();

        let closed = ConnectionEvent::ConnectionClosed {
            peer_id: test_peer_id(),
        };
        broadcaster.send(closed);

        let received = subscriber.recv().await.unwrap();
        assert!(matches!(received, ConnectionEvent::ConnectionClosed { .. }));
    }

    /// Every subscriber gets its own copy of a broadcast event.
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let mut subscribers = [broadcaster.subscribe(), broadcaster.subscribe()];

        let delivered = broadcaster.send(ConnectionEvent::StateChanged {
            peer_id: test_peer_id(),
            state: ConnectionState::Connected,
        });
        assert_eq!(delivered, 2);

        for subscriber in subscribers.iter_mut() {
            let received = subscriber.recv().await.unwrap();
            assert!(matches!(
                received,
                ConnectionEvent::StateChanged {
                    state: ConnectionState::Connected,
                    ..
                }
            ));
        }
    }

    /// Only terminal events ask for a full cleanup.
    #[test]
    fn test_should_trigger_cleanup() {
        let peer_id = test_peer_id();
        // Small builders keep the assertions below down to one line each.
        let state_changed = |state: ConnectionState| ConnectionEvent::StateChanged {
            peer_id: peer_id.clone(),
            state,
        };
        let restart_completed = |success: bool| ConnectionEvent::IceRestartCompleted {
            peer_id: peer_id.clone(),
            success,
        };
        let closed = ConnectionEvent::ConnectionClosed {
            peer_id: peer_id.clone(),
        };

        // Should trigger cleanup
        assert!(closed.should_trigger_cleanup());
        assert!(state_changed(ConnectionState::Closed).should_trigger_cleanup());
        assert!(restart_completed(false).should_trigger_cleanup());

        // Should NOT trigger cleanup
        assert!(!state_changed(ConnectionState::Disconnected).should_trigger_cleanup());
        assert!(!restart_completed(true).should_trigger_cleanup());
    }

    /// Only `Disconnected` and `Failed` state changes count as recoverable.
    #[test]
    fn test_is_recoverable_state() {
        let peer_id = test_peer_id();
        let state_changed = |state: ConnectionState| ConnectionEvent::StateChanged {
            peer_id: peer_id.clone(),
            state,
        };
        let closed = ConnectionEvent::ConnectionClosed {
            peer_id: peer_id.clone(),
        };

        // Recoverable states
        assert!(state_changed(ConnectionState::Disconnected).is_recoverable_state());
        assert!(state_changed(ConnectionState::Failed).is_recoverable_state());

        // Not recoverable
        assert!(!state_changed(ConnectionState::Closed).is_recoverable_state());
        assert!(!closed.is_recoverable_state());
    }
}
321}