Skip to main content

actr_hyper/transport/
connection_event.rs

1//! Connection Event System
2//!
3//! Unified event broadcasting mechanism for connection state changes.
4//! Enables proactive resource cleanup across all transport layers.
5
6use actr_protocol::{ActrId, PayloadType};
7use tokio::sync::broadcast;
8
/// Lifecycle state of a peer connection.
///
/// The variants mirror the values of WebRTC's `RTCPeerConnectionState`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// Initial state of a connection.
    New,
    /// ICE/DTLS negotiation is under way.
    Connecting,
    /// The connection has been established.
    Connected,
    /// ICE connectivity was lost; the connection may still recover.
    Disconnected,
    /// ICE connectivity failed; the offerer should attempt an ICE restart.
    Failed,
    /// The connection has been closed.
    Closed,
}
26
27impl std::fmt::Display for ConnectionState {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        match self {
30            ConnectionState::New => write!(f, "New"),
31            ConnectionState::Connecting => write!(f, "Connecting"),
32            ConnectionState::Connected => write!(f, "Connected"),
33            ConnectionState::Disconnected => write!(f, "Disconnected"),
34            ConnectionState::Failed => write!(f, "Failed"),
35            ConnectionState::Closed => write!(f, "Closed"),
36        }
37    }
38}
39
40/// Connection events broadcast to all subscribers
41#[derive(Debug, Clone)]
42pub enum ConnectionEvent {
43    /// Connection state changed
44    StateChanged {
45        peer_id: ActrId,
46        session_id: u64,
47        state: ConnectionState,
48    },
49
50    /// DataChannel closed for specific payload type
51    DataChannelClosed {
52        peer_id: ActrId,
53        session_id: u64,
54        payload_type: PayloadType,
55    },
56
57    /// DataChannel opened for specific payload type
58    /// This event is fired when a DataChannel transitions to Open state,
59    /// indicating SCTP layer is ready for data transmission
60    DataChannelOpened {
61        peer_id: ActrId,
62        session_id: u64,
63        payload_type: PayloadType,
64    },
65
66    /// Connection fully closed (triggers full cleanup)
67    ConnectionClosed { peer_id: ActrId, session_id: u64 },
68
69    /// ICE restart started
70    IceRestartStarted { peer_id: ActrId, session_id: u64 },
71
72    /// ICE restart completed
73    IceRestartCompleted {
74        peer_id: ActrId,
75        session_id: u64,
76        success: bool,
77    },
78
79    /// New offer received (triggers cleanup of existing connection)
80    NewOfferReceived { peer_id: ActrId, sdp: String },
81
82    /// New role assignment (triggers cleanup if role changed)
83    NewRoleAssignment { peer_id: ActrId, is_offerer: bool },
84}
85
86impl ConnectionEvent {
87    /// Get the peer_id from the event
88    pub fn peer_id(&self) -> &ActrId {
89        match self {
90            ConnectionEvent::StateChanged { peer_id, .. } => peer_id,
91            ConnectionEvent::DataChannelClosed { peer_id, .. } => peer_id,
92            ConnectionEvent::DataChannelOpened { peer_id, .. } => peer_id,
93            ConnectionEvent::ConnectionClosed { peer_id, .. } => peer_id,
94            ConnectionEvent::IceRestartStarted { peer_id, .. } => peer_id,
95            ConnectionEvent::IceRestartCompleted { peer_id, .. } => peer_id,
96            ConnectionEvent::NewOfferReceived { peer_id, .. } => peer_id,
97            ConnectionEvent::NewRoleAssignment { peer_id, .. } => peer_id,
98        }
99    }
100
101    /// Get the session_id from the event (None for events without session)
102    pub fn session_id(&self) -> Option<u64> {
103        match self {
104            Self::StateChanged { session_id, .. }
105            | Self::DataChannelClosed { session_id, .. }
106            | Self::DataChannelOpened { session_id, .. }
107            | Self::ConnectionClosed { session_id, .. }
108            | Self::IceRestartStarted { session_id, .. }
109            | Self::IceRestartCompleted { session_id, .. } => Some(*session_id),
110            _ => None,
111        }
112    }
113
114    /// Check if this event should trigger full resource cleanup
115    pub fn should_trigger_cleanup(&self) -> bool {
116        matches!(
117            self,
118            ConnectionEvent::ConnectionClosed { .. }
119                | ConnectionEvent::StateChanged {
120                    state: ConnectionState::Closed,
121                    ..
122                }
123                | ConnectionEvent::IceRestartCompleted { success: false, .. }
124        )
125    }
126
127    /// Check if this event indicates a recoverable state (ICE restart candidate)
128    pub fn is_recoverable_state(&self) -> bool {
129        matches!(
130            self,
131            ConnectionEvent::StateChanged {
132                state: ConnectionState::Disconnected | ConnectionState::Failed,
133                ..
134            }
135        )
136    }
137}
138
/// Broadcast channel capacity used when the caller does not choose one
/// (see `ConnectionEventBroadcaster::new`).
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
141
142/// Connection event broadcaster
143///
144/// Manages a broadcast channel for distributing connection events
145/// to all subscribed layers.
146#[derive(Debug)]
147pub(crate) struct ConnectionEventBroadcaster {
148    tx: broadcast::Sender<ConnectionEvent>,
149}
150
151impl ConnectionEventBroadcaster {
152    /// Create a new broadcaster with default capacity
153    pub(crate) fn new() -> Self {
154        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
155    }
156
157    /// Create a new broadcaster with specified capacity
158    pub(crate) fn with_capacity(capacity: usize) -> Self {
159        let (tx, _) = broadcast::channel(capacity);
160        Self { tx }
161    }
162
163    /// Send an event to all subscribers
164    ///
165    /// Returns the number of receivers that received the event.
166    /// Returns 0 if there are no active subscribers (not an error).
167    pub(crate) fn send(&self, event: ConnectionEvent) -> usize {
168        self.tx.send(event).unwrap_or_default()
169    }
170
171    /// Subscribe to connection events
172    pub(crate) fn subscribe(&self) -> broadcast::Receiver<ConnectionEvent> {
173        self.tx.subscribe()
174    }
175
176    /// Get a clone of the sender for sharing
177    pub(crate) fn sender(&self) -> broadcast::Sender<ConnectionEvent> {
178        self.tx.clone()
179    }
180}
181
182impl Default for ConnectionEventBroadcaster {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188impl Clone for ConnectionEventBroadcaster {
189    fn clone(&self) -> Self {
190        Self {
191            tx: self.tx.clone(),
192        }
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use actr_protocol::{ActrId, ActrType, Realm};

    /// Fixed peer id shared by all tests.
    fn test_peer_id() -> ActrId {
        ActrId {
            realm: Realm { realm_id: 1 },
            serial_number: 1,
            r#type: ActrType {
                manufacturer: "test".to_string(),
                name: "device".to_string(),
                version: "1.0.0".to_string(),
            },
        }
    }

    /// Shorthand for a `StateChanged` event on session 0 for the test peer.
    fn state_changed(state: ConnectionState) -> ConnectionEvent {
        ConnectionEvent::StateChanged {
            peer_id: test_peer_id(),
            session_id: 0,
            state,
        }
    }

    #[tokio::test]
    async fn test_broadcaster_send_receive() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let mut rx = broadcaster.subscribe();

        let peer_id = test_peer_id();
        broadcaster.send(ConnectionEvent::ConnectionClosed {
            peer_id: peer_id.clone(),
            session_id: 0,
        });

        let event = rx.recv().await.unwrap();
        assert!(matches!(event, ConnectionEvent::ConnectionClosed { .. }));
    }

    #[tokio::test]
    async fn test_multiple_subscribers() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let mut rx1 = broadcaster.subscribe();
        let mut rx2 = broadcaster.subscribe();

        let count = broadcaster.send(state_changed(ConnectionState::Connected));
        assert_eq!(count, 2);

        // Both subscribers must observe the very same event.
        for rx in [&mut rx1, &mut rx2] {
            let event = rx.recv().await.unwrap();
            assert!(matches!(
                event,
                ConnectionEvent::StateChanged {
                    state: ConnectionState::Connected,
                    ..
                }
            ));
        }
    }

    /// `send` documents "0 if there are no active subscribers (not an error)".
    #[test]
    fn test_send_without_subscribers_returns_zero() {
        let broadcaster = ConnectionEventBroadcaster::new();

        let count = broadcaster.send(ConnectionEvent::ConnectionClosed {
            peer_id: test_peer_id(),
            session_id: 0,
        });

        assert_eq!(count, 0);
    }

    /// A cloned broadcaster must publish into the same channel.
    #[tokio::test]
    async fn test_clone_shares_channel() {
        let broadcaster = ConnectionEventBroadcaster::new();
        let mut rx = broadcaster.subscribe();
        let cloned = broadcaster.clone();

        let count = cloned.send(ConnectionEvent::IceRestartStarted {
            peer_id: test_peer_id(),
            session_id: 7,
        });
        assert_eq!(count, 1);

        let event = rx.recv().await.unwrap();
        assert!(matches!(
            event,
            ConnectionEvent::IceRestartStarted { session_id: 7, .. }
        ));
    }

    #[test]
    fn test_peer_id_accessor() {
        let expected = test_peer_id().serial_number;

        let events = vec![
            state_changed(ConnectionState::New),
            ConnectionEvent::ConnectionClosed {
                peer_id: test_peer_id(),
                session_id: 0,
            },
            ConnectionEvent::IceRestartStarted {
                peer_id: test_peer_id(),
                session_id: 0,
            },
            ConnectionEvent::IceRestartCompleted {
                peer_id: test_peer_id(),
                session_id: 0,
                success: true,
            },
            ConnectionEvent::NewOfferReceived {
                peer_id: test_peer_id(),
                sdp: String::new(),
            },
            ConnectionEvent::NewRoleAssignment {
                peer_id: test_peer_id(),
                is_offerer: true,
            },
        ];

        for event in &events {
            assert_eq!(event.peer_id().serial_number, expected);
        }
    }

    #[test]
    fn test_session_id_accessor() {
        let peer_id = test_peer_id();

        // Variants that carry a session id report it.
        assert_eq!(
            ConnectionEvent::StateChanged {
                peer_id: peer_id.clone(),
                session_id: 42,
                state: ConnectionState::Connecting,
            }
            .session_id(),
            Some(42)
        );
        assert_eq!(
            ConnectionEvent::ConnectionClosed {
                peer_id: peer_id.clone(),
                session_id: 43,
            }
            .session_id(),
            Some(43)
        );
        assert_eq!(
            ConnectionEvent::IceRestartCompleted {
                peer_id: peer_id.clone(),
                session_id: 44,
                success: false,
            }
            .session_id(),
            Some(44)
        );

        // Variants without a session id report `None`.
        assert_eq!(
            ConnectionEvent::NewOfferReceived {
                peer_id: peer_id.clone(),
                sdp: String::new(),
            }
            .session_id(),
            None
        );
        assert_eq!(
            ConnectionEvent::NewRoleAssignment {
                peer_id: peer_id.clone(),
                is_offerer: false,
            }
            .session_id(),
            None
        );
    }

    #[test]
    fn test_should_trigger_cleanup() {
        let peer_id = test_peer_id();

        // Should trigger cleanup
        assert!(
            ConnectionEvent::ConnectionClosed {
                peer_id: peer_id.clone(),
                session_id: 0,
            }
            .should_trigger_cleanup()
        );
        assert!(state_changed(ConnectionState::Closed).should_trigger_cleanup());
        assert!(
            ConnectionEvent::IceRestartCompleted {
                peer_id: peer_id.clone(),
                session_id: 0,
                success: false,
            }
            .should_trigger_cleanup()
        );

        // Should NOT trigger cleanup
        assert!(!state_changed(ConnectionState::Disconnected).should_trigger_cleanup());
        assert!(
            !ConnectionEvent::IceRestartCompleted {
                peer_id: peer_id.clone(),
                session_id: 0,
                success: true,
            }
            .should_trigger_cleanup()
        );
    }

    #[test]
    fn test_is_recoverable_state() {
        // Recoverable states
        assert!(state_changed(ConnectionState::Disconnected).is_recoverable_state());
        assert!(state_changed(ConnectionState::Failed).is_recoverable_state());

        // Not recoverable
        assert!(!state_changed(ConnectionState::Closed).is_recoverable_state());
        assert!(
            !ConnectionEvent::ConnectionClosed {
                peer_id: test_peer_id(),
                session_id: 0,
            }
            .is_recoverable_state()
        );
    }
}