// actr_runtime/wire/webrtc/coordinator.rs

// WebRTC Signaling Coordinator - Coordinates WebRTC P2P connection establishment
#[allow(dead_code)]
/// Returns `true` when an ICE candidate line describes an IPv4 address.
///
/// All IPv4 candidates are accepted (loopback 127.x, private 10.x/172.x/192.168.x,
/// and public addresses). IPv6 candidates are rejected.
///
/// An ICE candidate line has the shape
/// `candidate:<foundation> <component> <proto> <priority> <addr> <port> typ <type> ...`,
/// so the connection address is the 5th whitespace-separated field.
fn is_ipv4_candidate_allowed(cand: &str) -> bool {
    // Primary check: inspect the address field directly. Any colon there means
    // IPv6 — this also catches fully-expanded addresses like
    // "2001:db8:0:1:2:3:4:5" that the old "::" substring test missed.
    if let Some(addr) = cand.split_whitespace().nth(4) {
        if addr.contains(':') {
            return false;
        }
    }

    // Fallback heuristics for malformed or non-standard candidate strings.
    // "::" covers compressed IPv6 forms (including link-local "fe80::", which
    // the previous explicit check duplicated); " udp6 " flags IPv6 transports.
    if cand.contains(" udp6 ") || cand.contains("::") {
        return false;
    }

    // Accept all IPv4 candidates by default
    true
}
15
// Responsibilities:
// - Listen to WebRTC signaling messages from SignalingClient
// - Handle Offer/Answer/ICE candidate exchanges
// - Establish and manage RTCPeerConnection instances
// - Create and cache WebRtcConnection instances
// - Aggregate messages from all peers
22
23use super::connection::WebRtcConnection;
24use super::negotiator::WebRtcNegotiator;
25#[cfg(feature = "opentelemetry")]
26use super::trace;
27use super::{SignalingClient, WebRtcConfig};
28use crate::INITIAL_CONNECTION_TIMEOUT;
29use crate::error::{RuntimeError, RuntimeResult};
30use crate::inbound::MediaFrameRegistry;
31use crate::lifecycle::CredentialState;
32use crate::transport::connection_event::{ConnectionEvent, ConnectionEventBroadcaster};
33use actr_framework::Bytes;
34use actr_protocol::ActrIdExt;
35use actr_protocol::prost::Message as ProstMessage;
36use actr_protocol::{
37    ActrId, ActrRelay, PayloadType, RoleAssignment, RoleNegotiation, SignalingEnvelope, actr_relay,
38    session_description::Type as SdpType, signaling_envelope,
39};
40use std::collections::HashMap;
41use std::{
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
46use tokio::task::JoinHandle;
47use tokio_util::sync::CancellationToken;
48use tracing::Instrument;
49#[cfg(feature = "opentelemetry")]
50use tracing_opentelemetry::OpenTelemetrySpanExt;
51use webrtc::data_channel::RTCDataChannel;
52use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
53use webrtc::ice_transport::ice_gathering_state::RTCIceGatheringState;
54use webrtc::peer_connection::{RTCPeerConnection, peer_connection_state::RTCPeerConnectionState};
55use webrtc::track::track_local::TrackLocalWriter;
56
/// Maximum ICE restart attempts before a connection is given up on.
const ICE_RESTART_MAX_RETRIES: u32 = 5;
/// Timeout applied to a single ICE restart attempt.
const ICE_RESTART_TIMEOUT: Duration = Duration::from_secs(5);
/// Initial delay between ICE restart attempts (doubled each retry).
const ICE_RESTART_INITIAL_BACKOFF_MS: u64 = 5000;
/// Cap on the per-attempt restart backoff delay.
const ICE_RESTART_MAX_BACKOFF_MS: u64 = 60000;
/// Total wall-clock budget across all ICE restart attempts.
const ICE_RESTART_MAX_TOTAL_DURATION: Duration = Duration::from_secs(60);
/// How long to wait for ICE candidate gathering to complete.
const ICE_GATHERING_TIMEOUT: Duration = Duration::from_secs(10);
/// How long to wait for role negotiation with the remote peer.
const ROLE_WAIT_TIMEOUT: Duration = Duration::from_secs(10);

// Health check constants
/// Interval between health-check sweeps over peer connections.
const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(10);
/// How long a peer may remain Failed/Closed before being cleaned up.
const MAX_FAILED_DURATION: Duration = Duration::from_secs(60); // 1 minute
68
/// Per-peer negotiation state (role, ready signals)
/// Consolidates multiple related fields into a single lock to reduce contention.
///
/// Stored in `WebRtcCoordinator::peer_negotiation`, keyed by peer `ActrId`.
#[derive(Default)]
struct PeerNegotiationState {
    /// Role negotiation responder
    /// (bool payload presumably signals whether we act as offerer —
    /// TODO confirm against `negotiate_role`)
    role_tx: Option<oneshot::Sender<bool>>,
    /// Ready notifier for answerer path
    ready_tx: Option<oneshot::Sender<()>>,
    /// Ready receiver for proactive offerer path
    ready_rx: Option<oneshot::Receiver<()>>,
}
80
/// Simple exponential backoff iterator for retries
///
/// Each `next()` yields the current delay and doubles it (capped at
/// `max_delay`); iteration ends once `max_retries` and/or
/// `max_total_duration` is exhausted.
#[derive(Debug)]
struct ExponentialBackoff {
    /// Number of delays handed out so far (compared against `max_retries`)
    current_retries: u32,
    /// Maximum number of delays to yield; `None` means unlimited
    max_retries: Option<u32>,
    /// Delay yielded next; doubled after each `next()` call
    initial_delay: Duration,
    /// Cap applied when doubling `initial_delay`
    max_delay: Duration,
    /// Optional total duration limit (across all retries)
    max_total_duration: Option<Duration>,
    /// Start time for tracking total duration
    start_time: Option<Instant>,
}
93
94impl ExponentialBackoff {
95    pub fn new(initial_delay: Duration, max_delay: Duration, max_retries: Option<u32>) -> Self {
96        Self {
97            current_retries: 0,
98            max_retries,
99            initial_delay,
100            max_delay,
101            max_total_duration: None,
102            start_time: None,
103        }
104    }
105
106    /// Create a new ExponentialBackoff with total duration limit
107    pub fn with_total_duration(
108        initial_delay: Duration,
109        max_delay: Duration,
110        max_retries: Option<u32>,
111        max_total_duration: Duration,
112    ) -> Self {
113        Self {
114            current_retries: 0,
115            max_retries,
116            initial_delay,
117            max_delay,
118            max_total_duration: Some(max_total_duration),
119            start_time: Some(Instant::now()),
120        }
121    }
122
123    /// Check if total duration has been exceeded
124    fn is_duration_exceeded(&self) -> bool {
125        if let (Some(max_duration), Some(start)) = (self.max_total_duration, self.start_time) {
126            start.elapsed() > max_duration
127        } else {
128            false
129        }
130    }
131}
132
133impl Iterator for ExponentialBackoff {
134    type Item = Duration;
135
136    fn next(&mut self) -> Option<Duration> {
137        // Initialize start_time on first call if max_total_duration is set
138        if self.max_total_duration.is_some() && self.start_time.is_none() {
139            self.start_time = Some(Instant::now());
140        }
141
142        // Check total duration limit first
143        if self.is_duration_exceeded() {
144            return None;
145        }
146
147        let delay = self.initial_delay;
148
149        // Check max retries
150        if let Some(max_retries) = self.max_retries {
151            self.current_retries += 1;
152            if self.current_retries > max_retries {
153                return None;
154            }
155        }
156
157        self.initial_delay = (self.initial_delay * 2).min(self.max_delay);
158        Some(delay)
159    }
160}
161
/// Type alias for message receiver (from all peers)
/// Item layout: (sender ActrId bytes, message data, payload type).
type MessageRx = Arc<Mutex<mpsc::UnboundedReceiver<(Vec<u8>, Bytes, PayloadType)>>>;
164
/// Peer connection state
///
/// One entry per remote peer; bundles the raw transport objects with the
/// bookkeeping used by ICE restart and the periodic health check.
struct PeerState {
    /// RTCPeerConnection (for receiving ICE candidates)
    peer_connection: Arc<RTCPeerConnection>,

    /// WebRtcConnection (for business message transmission)
    webrtc_conn: WebRtcConnection,

    /// Connection ready notification (for initiate_connection to wait)
    ready_tx: Option<oneshot::Sender<()>>,

    /// Whether we are the offerer for the current session (affects ICE restart handling)
    is_offerer: bool,

    /// Whether ICE restart is in progress (controls buffering and retries)
    ice_restart_inflight: bool,

    /// Restart attempts counter (resets on success)
    ice_restart_attempts: u32,

    /// In-flight ICE restart task handle (for de-duplication and lifecycle management)
    restart_task_handle: Option<JoinHandle<()>>,

    /// Last state change timestamp (for health check)
    last_state_change: std::time::Instant,

    /// Current connection state (for health check)
    current_state: RTCPeerConnectionState,
}
194
/// WebRTC signaling coordinator
///
/// Handles Offer/Answer/ICE exchange via the injected `SignalingClient` and
/// owns all per-peer connection and negotiation state.
pub struct WebRtcCoordinator {
    /// Local Actor ID
    local_id: ActrId,

    /// Local credentials
    credential_state: CredentialState,

    /// SignalingClient (for sending ICE/SDP)
    signaling_client: Arc<dyn SignalingClient>,

    /// WebRTC negotiator
    negotiator: WebRtcNegotiator,

    /// Peer state mapping (ActrId → PeerState)
    peers: Arc<RwLock<HashMap<ActrId, PeerState>>>,

    /// Pending ICE candidates (received before remote description is set)
    /// ActrId → Vec<candidate_string>
    pending_candidates: Arc<RwLock<HashMap<ActrId, Vec<String>>>>,

    /// Message receive channel (aggregated from all peers)
    /// (from: ActrId bytes, data: Bytes)
    /// Format: (sender_id_bytes, message_data, payload_type)
    message_rx: MessageRx,
    message_tx: mpsc::UnboundedSender<(Vec<u8>, Bytes, PayloadType)>,

    /// MediaTrack callback registry (for WebRTC native media streams)
    media_frame_registry: Arc<MediaFrameRegistry>,

    /// Per-peer negotiation state (role, ready signals, restart tasks)
    /// Single lock consolidating pending_role, pending_ready, pending_ready_wait, and in_flight_restarts
    peer_negotiation: Arc<Mutex<HashMap<ActrId, PeerNegotiationState>>>,

    /// Connection event broadcaster for notifying all layers
    event_broadcaster: ConnectionEventBroadcaster,

    /// Root tracing contexts for connection initiation (ActrId → Context)
    #[cfg(feature = "opentelemetry")]
    root_context_map: Arc<RwLock<HashMap<ActrId, opentelemetry::Context>>>,
}
236
237impl WebRtcCoordinator {
238    /// Create new coordinator
239    pub fn new(
240        local_id: ActrId,
241        credential_state: CredentialState,
242        signaling_client: Arc<dyn SignalingClient>,
243        webrtc_config: WebRtcConfig,
244        realm_id: u32,
245        media_frame_registry: Arc<MediaFrameRegistry>,
246    ) -> Self {
247        let (message_tx, message_rx) = mpsc::unbounded_channel();
248        let negotiator = WebRtcNegotiator::new(webrtc_config, realm_id, credential_state.clone());
249
250        Self {
251            local_id,
252            credential_state,
253            signaling_client,
254            negotiator,
255            peers: Arc::new(RwLock::new(HashMap::new())),
256            pending_candidates: Arc::new(RwLock::new(HashMap::new())),
257            message_rx: Arc::new(Mutex::new(message_rx)),
258            message_tx,
259            media_frame_registry,
260            peer_negotiation: Arc::new(Mutex::new(HashMap::new())),
261            event_broadcaster: ConnectionEventBroadcaster::new(),
262            #[cfg(feature = "opentelemetry")]
263            root_context_map: Arc::new(RwLock::new(HashMap::new())),
264        }
265    }
266
    /// Get a subscriber for connection events
    ///
    /// Each call returns an independent broadcast receiver for the
    /// coordinator's shared event channel.
    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<ConnectionEvent> {
        self.event_broadcaster.subscribe()
    }
271
    /// Get the event sender for sharing with WebRtcConnection instances
    ///
    /// Connections publish into the same broadcast channel that
    /// `subscribe_events` consumers listen on.
    pub fn event_sender(&self) -> tokio::sync::broadcast::Sender<ConnectionEvent> {
        self.event_broadcaster.sender()
    }
276
277    /// Trigger ICE restart for all connections in Failed/Disconnected state
278    pub async fn retry_failed_connections(self: &Arc<Self>) {
279        let peers = self.peers.read().await;
280        // Collect peers that need restart to avoid holding lock during async operations
281        let mut targets = Vec::new();
282
283        for (peer_id, state) in peers.iter() {
284            match state.current_state {
285                RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected => {
286                    if !state.ice_restart_inflight {
287                        targets.push(peer_id.clone());
288                    }
289                }
290                _ => {
291                    #[cfg(test)]
292                    tracing::debug!(
293                        "Actor {:?} is in state {:?}, test restart",
294                        peer_id,
295                        state.current_state
296                    );
297                    targets.push(peer_id.clone());
298                }
299            }
300        }
301        drop(peers); // Release lock
302
303        for peer_id in targets {
304            tracing::info!("♻️ Auto-retrying failed connection to actor {:?}", peer_id);
305            if let Err(e) = self.restart_ice(&peer_id).await {
306                tracing::error!("❌ Failed to restart ICE for {:?}: {}", peer_id, e);
307            }
308        }
309    }
310
311    /// Clear pending ICE restart attempts (called on network loss)
312    pub async fn clear_pending_restarts(&self) {
313        let mut peers = self.peers.write().await;
314        for (peer_id, state) in peers.iter_mut() {
315            if state.ice_restart_inflight {
316                tracing::info!("🛑 Aborting in-flight ICE restart for {:?}", peer_id);
317                // Cancel restart task if it exists
318                if let Some(handle) = state.restart_task_handle.take() {
319                    handle.abort();
320                }
321                state.ice_restart_inflight = false;
322                // Note: We don't reset attempts here, as we might want to resume counting if network comes back quickly?
323                // Actually, resetting makes sense as it's a fresh network start.
324                state.ice_restart_attempts = 0;
325            }
326        }
327    }
328
    /// Start internal event listener for handling connection close events
    ///
    /// This listens for ConnectionClosed and DataChannelClosed events and triggers
    /// cleanup of WebRtcCoordinator's internal resources (peers map, pending candidates, etc.)
    ///
    /// The task holds only a `Weak` reference to the coordinator, so it cannot
    /// keep the coordinator alive; the loop exits once the coordinator is
    /// dropped or the broadcast channel closes.
    fn spawn_internal_event_listener(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
        let mut event_rx = self.event_broadcaster.subscribe();
        // Downgrade so the spawned task does not extend the coordinator's lifetime.
        let coordinator = Arc::downgrade(self);

        tokio::spawn(async move {
            loop {
                match event_rx.recv().await {
                    Ok(event) => {
                        if let Some(coord) = coordinator.upgrade() {
                            // Extract peer_id and check if cleanup is needed
                            let peer_id_to_cleanup = match &event {
                                ConnectionEvent::DataChannelClosed {
                                    peer_id,
                                    payload_type,
                                } => {
                                    // Only cleanup if peer still exists (avoid duplicate cleanup)
                                    if coord.peers.read().await.contains_key(peer_id) {
                                        tracing::warn!(
                                            "⚠️ DataChannel closed for peer {}, payload_type={:?}; triggering coordinator cleanup",
                                            peer_id.serial_number,
                                            payload_type
                                        );
                                        Some(peer_id.clone())
                                    } else {
                                        tracing::debug!(
                                            "ℹ️ DataChannel closed for peer {} but already cleaned up",
                                            peer_id.serial_number
                                        );
                                        None
                                    }
                                }
                                ConnectionEvent::ConnectionClosed { peer_id } => {
                                    if coord.peers.read().await.contains_key(peer_id) {
                                        tracing::warn!(
                                            "⚠️ Connection closed for peer {}; triggering coordinator cleanup",
                                            peer_id.serial_number
                                        );
                                        Some(peer_id.clone())
                                    } else {
                                        tracing::debug!(
                                            "ℹ️ Connection closed for peer {} but already cleaned up",
                                            peer_id.serial_number
                                        );
                                        None
                                    }
                                }
                                ConnectionEvent::StateChanged { peer_id, state } => {
                                    use crate::transport::connection_event::ConnectionState;
                                    // Only the terminal Closed state triggers cleanup here.
                                    if matches!(state, ConnectionState::Closed) {
                                        if coord.peers.read().await.contains_key(peer_id) {
                                            tracing::warn!(
                                                "⚠️ PeerConnection state changed to Closed for peer {}; triggering coordinator cleanup",
                                                peer_id.serial_number
                                            );
                                            Some(peer_id.clone())
                                        } else {
                                            tracing::debug!(
                                                "ℹ️ PeerConnection Closed for peer {} but already cleaned up",
                                                peer_id.serial_number
                                            );
                                            None
                                        }
                                    } else {
                                        None
                                    }
                                }
                                _ => None,
                            };

                            // Cleanup outside the match to avoid holding read lock
                            if let Some(peer_id) = peer_id_to_cleanup {
                                coord.cleanup_cancelled_connection(&peer_id).await;
                            }
                        } else {
                            // Coordinator dropped, exit
                            tracing::debug!(
                                "🔌 WebRtcCoordinator internal event listener stopping (coordinator dropped)"
                            );
                            break;
                        }
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                        // Lagging only drops events; keep listening.
                        tracing::warn!(
                            "⚠️ WebRtcCoordinator internal event listener lagged by {} events",
                            n
                        );
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                        tracing::debug!(
                            "🔌 WebRtcCoordinator internal event listener stopped (channel closed)"
                        );
                        break;
                    }
                }
            }
        })
    }
430
431    /// Start health check task to clean up stale connections
432    ///
433    /// Periodically checks peer connection states and cleans up:
434    /// - Connections in Failed/Closed state for too long (> 1 minutes)
435    ///
436    /// Note: Disconnected states and ICE restart failures are handled automatically
437    /// by the existing ICE restart mechanism, so we only check terminal states here.
438    fn spawn_health_check_task(self: &Arc<Self>) -> JoinHandle<()> {
439        let coordinator = Arc::downgrade(self);
440
441        tokio::spawn(async move {
442            let mut interval = tokio::time::interval(HEALTH_CHECK_INTERVAL);
443            interval.tick().await; // Skip first immediate tick
444
445            loop {
446                interval.tick().await;
447
448                if let Some(coord) = coordinator.upgrade() {
449                    coord.check_and_cleanup_stale_connections().await;
450                } else {
451                    tracing::debug!("🔌 Health check task stopping (coordinator dropped)");
452                    break;
453                }
454            }
455
456            tracing::info!("🛑 Health check task exited");
457        })
458    }
459
460    /// Check and cleanup stale peer connections
461    ///
462    /// This method identifies peers that should be cleaned up based on:
463    /// - Failed/Closed state duration exceeding threshold
464    ///
465    /// Note: ICE restart failures and Disconnected states are handled automatically
466    /// by the ICE restart mechanism, so we don't need to check them here.
467    async fn check_and_cleanup_stale_connections(&self) {
468        let peers_to_cleanup: Vec<(ActrId, String)> = {
469            let peers = self.peers.read().await;
470            let now = std::time::Instant::now();
471
472            peers
473                .iter()
474                .filter_map(|(peer_id, state)| {
475                    // Get current real-time state from RTCPeerConnection
476                    let current_state = state.peer_connection.connection_state();
477                    let duration_since_change = now.duration_since(state.last_state_change);
478
479                    // Cleanup condition: Failed/Closed state for too long
480                    // These are terminal states that won't recover automatically
481                    if matches!(
482                        current_state,
483                        RTCPeerConnectionState::Failed | RTCPeerConnectionState::Closed
484                    ) && duration_since_change > MAX_FAILED_DURATION
485                    {
486                        let reason = format!(
487                            "{:?} for {}s",
488                            current_state,
489                            duration_since_change.as_secs()
490                        );
491
492                        tracing::warn!(
493                            "🧹 Marking peer {} for cleanup: {}",
494                            peer_id.serial_number,
495                            reason
496                        );
497
498                        Some((peer_id.clone(), reason))
499                    } else {
500                        None
501                    }
502                })
503                .collect()
504        };
505
506        // Cleanup marked peers
507        if !peers_to_cleanup.is_empty() {
508            tracing::info!(
509                "🧹 Health check: cleaning up {} stale connection(s)",
510                peers_to_cleanup.len()
511            );
512
513            for (peer_id, reason) in peers_to_cleanup {
514                tracing::info!(
515                    "🧹 Cleaning up stale connection for peer {}: {}",
516                    peer_id.serial_number,
517                    reason
518                );
519                self.cleanup_cancelled_connection(&peer_id).await;
520            }
521        }
522    }
523
    /// Start signaling coordinator (listen for ActrRelay messages)
    ///
    /// This method starts a background task that continuously listens for messages from SignalingClient
    /// and handles WebRTC-related signaling (Offer/Answer/ICE)
    ///
    /// Also spawns the internal event listener and the periodic health-check
    /// task before entering the signaling loop.
    pub async fn start(self: Arc<Self>) -> RuntimeResult<()> {
        tracing::info!("🚀 WebRtcCoordinator starting signaling loop");

        // Start internal event listener for connection close handling
        self.spawn_internal_event_listener();

        // Start health check task for cleaning up stale connections
        self.spawn_health_check_task();

        let coordinator = self.clone();
        tokio::spawn(async move {
            loop {
                // 1. Receive message from SignalingClient
                match coordinator.signaling_client.receive_envelope().await {
                    Ok(Some(envelope)) => {
                        // With tracing enabled, parent this envelope's span on
                        // the remote trace context carried in the envelope.
                        #[cfg(feature = "opentelemetry")]
                        let (span, remote_ctx) = {
                            let remote_ctx = trace::extract_trace_context(&envelope);
                            let span = tracing::info_span!(
                                "signaling.handle_envelope",
                                envelope_id = envelope.envelope_id,
                                reply_for = ?envelope.reply_for
                            );
                            span.set_parent(remote_ctx.clone());
                            (span, remote_ctx)
                        };

                        let handle_envelope_fut = coordinator.handle_envelope(
                            envelope,
                            #[cfg(feature = "opentelemetry")]
                            remote_ctx,
                        );
                        #[cfg(feature = "opentelemetry")]
                        let handle_envelope_fut = handle_envelope_fut.instrument(span);
                        handle_envelope_fut.await;
                    }
                    Ok(None) => {
                        // Clean shutdown signalled by the signaling client.
                        tracing::info!(
                            "🔌 SignalingClient connection closed, exiting signaling loop"
                        );
                        break;
                    }
                    Err(e) => {
                        tracing::error!("❌ Signaling receive error: {}", e);
                        // Continue loop, don't exit (may be temporary error)
                    }
                }
            }

            tracing::info!("🛑 WebRtcCoordinator signaling loop exited");
        });

        Ok(())
    }
582
    /// Handle received signaling envelope
    ///
    /// Dispatches ActrRelay payloads (SDP offer/answer, ICE restart offers,
    /// ICE candidates, role assignments) to the matching handler; all other
    /// flows and empty payloads are logged and ignored.
    async fn handle_envelope(
        self: &Arc<Self>,
        envelope: SignalingEnvelope,
        #[cfg(feature = "opentelemetry")] remote_ctx: opentelemetry::Context,
    ) {
        // Decode SignalingEnvelope
        match envelope.flow {
            Some(signaling_envelope::Flow::ActrRelay(relay)) => {
                let source = relay.source;
                let target = relay.target;
                // Remember the remote trace context per source peer so later
                // connection spans can be parented on it.
                #[cfg(feature = "opentelemetry")]
                self.root_context_map
                    .write()
                    .await
                    .insert(source.clone(), remote_ctx);
                match relay.payload {
                    Some(actr_relay::Payload::SessionDescription(sd)) => match sd.r#type() {
                        SdpType::Offer => {
                            tracing::info!("📥 Received Offer from {:?}", source.serial_number);
                            if let Err(e) = self.handle_offer(&source, sd.sdp).await {
                                tracing::error!("❌ Failed to handle Offer: {}", e);
                            }
                        }
                        SdpType::Answer => {
                            tracing::info!("📥 Received Answer from {:?}", source.serial_number);
                            if let Err(e) = self.handle_answer(&source, sd.sdp).await {
                                tracing::error!("❌ Failed to handle Answer: {}", e);
                            }
                        }
                        SdpType::RenegotiationOffer => {
                            tracing::warn!("⚠️ Received RenegotiationOffer, not supported yet");
                        }
                        SdpType::IceRestartOffer => {
                            tracing::info!(
                                "♻️ Received ICE Restart Offer from {:?}",
                                source.serial_number
                            );
                            if let Err(e) = self.handle_ice_restart_offer(&source, sd.sdp).await {
                                tracing::error!("❌ Failed to handle ICE Restart Offer: {}", e);
                            }
                        }
                    },
                    Some(actr_relay::Payload::RoleAssignment(assign)) => {
                        tracing::info!(
                            "🎭 Received RoleAssignment from {:?}, is_offerer={} (source peer)",
                            source.serial_number,
                            assign.is_offerer,
                        );
                        // If we are the relay source (assignment echoed back to
                        // its sender), the relevant peer is the target, not us.
                        let peer = if source == self.local_id {
                            target.clone()
                        } else {
                            source.clone()
                        };
                        self.handle_role_assignment(assign.clone(), peer).await;
                    }
                    Some(actr_relay::Payload::IceCandidate(ice)) => {
                        tracing::debug!(
                            "📥 Received ICE Candidate from {:?}",
                            source.serial_number
                        );
                        if let Err(e) = self.handle_ice_candidate(&source, ice.candidate).await {
                            tracing::error!("❌ Failed to handle ICE Candidate: {}", e);
                        }
                    }
                    Some(actr_relay::Payload::RoleNegotiation(_)) => {
                        tracing::trace!(
                            "📥 Received RoleNegotiation payload; ignored by WebRtcCoordinator"
                        );
                    }
                    None => {
                        tracing::warn!("⚠️ ActrRelay missing payload");
                    }
                }
            }
            Some(other_flow) => {
                tracing::warn!("⚠️ Ignoring non-ActrRelay flow: {:?}", other_flow);
            }
            None => {
                tracing::warn!("⚠️ SignalingEnvelope missing flow");
            }
        }
    }
666
667    /// Close all peer connections and clear internal peer state.
668    ///
669    /// This is typically called during shutdown to ensure that all
670    /// RTCPeerConnection instances are closed and associated state
671    /// (pending ICE candidates, WebRtcConnection state) is dropped.
672    pub async fn close_all_peers(&self) -> RuntimeResult<()> {
673        tracing::info!("🔻 Closing all WebRTC peer connections");
674
675        // Take snapshot of peers and clear map
676        let peers_snapshot: Vec<Arc<RTCPeerConnection>> = {
677            let mut peers = self.peers.write().await;
678            let conns: Vec<Arc<RTCPeerConnection>> =
679                peers.values().map(|p| p.peer_connection.clone()).collect();
680            peers.clear();
681            conns
682        };
683
684        // Clear pending ICE candidates
685        {
686            let mut pending = self.pending_candidates.write().await;
687            pending.clear();
688        }
689
690        // Clear root tracing contexts (if enabled)
691        #[cfg(feature = "opentelemetry")]
692        self.root_context_map.write().await.clear();
693
694        // Close each RTCPeerConnection
695        for pc in peers_snapshot {
696            tracing::info!("🔻 Closing PeerConnection");
697
698            if let Err(e) = pc.close().await {
699                tracing::warn!("⚠️ Failed to close PeerConnection: {}", e);
700            } else {
701                tracing::info!("✅ PeerConnection closed");
702            }
703        }
704
705        Ok(())
706    }
707
708    /// Send ActrRelay message (internal helper method)
709    #[cfg_attr(
710        feature = "opentelemetry",
711        tracing::instrument(level = "info", skip_all, fields(target = %target.to_string_repr()))
712    )]
713    async fn send_actr_relay(
714        &self,
715        target: &ActrId,
716        payload: actr_relay::Payload,
717    ) -> RuntimeResult<()> {
718        let credential = self.credential_state.credential().await;
719        let relay = ActrRelay {
720            source: self.local_id.clone(),
721            credential,
722            target: target.clone(),
723            payload: Some(payload),
724        };
725
726        let flow = signaling_envelope::Flow::ActrRelay(relay);
727
728        let envelope = SignalingEnvelope {
729            envelope_version: 1,
730            envelope_id: uuid::Uuid::new_v4().to_string(),
731            reply_for: None,
732            timestamp: prost_types::Timestamp {
733                seconds: chrono::Utc::now().timestamp(),
734                nanos: 0,
735            },
736            traceparent: None,
737            tracestate: None,
738            flow: Some(flow),
739        };
740
741        self.signaling_client
742            .send_envelope(envelope)
743            .await
744            .map_err(|e| RuntimeError::Unavailable {
745                message: format!("Signaling server unavailable: {e}"),
746                target: None,
747            })?;
748
749        Ok(())
750    }
751
752    /// Initiate connection (create Offer)
753    ///
754    /// Acts as the initiator, sending a WebRTC connection request to the target peer
755    #[cfg_attr(
756        feature = "opentelemetry",
757        tracing::instrument(level = "info", skip_all, fields(target_id = %target.to_string_repr()))
758    )]
759    pub async fn initiate_connection(
760        self: &Arc<Self>,
761        target: &ActrId,
762    ) -> RuntimeResult<oneshot::Receiver<()>> {
763        tracing::info!(
764            "🚀 Initiating P2P connection to {}",
765            target.to_string_repr()
766        );
767
768        // Role negotiation: determine if we should be offerer or answerer
769        let role_result =
770            tokio::time::timeout(Duration::from_secs(15), self.negotiate_role(target)).await;
771
772        let is_offerer = match role_result {
773            Ok(Ok(v)) => v,
774            Ok(Err(e)) => {
775                self.peer_negotiation.lock().await.remove(target);
776                return Err(e);
777            }
778            Err(_) => {
779                self.peer_negotiation.lock().await.remove(target);
780                return Err(RuntimeError::DeadlineExceeded {
781                    message: "Role negotiation timeout".to_string(),
782                    timeout_ms: 5000,
783                });
784            }
785        };
786        tracing::debug!(
787            "Role negotiation decided we are {:?} for {}",
788            if is_offerer { "offerer" } else { "answerer" },
789            target.serial_number
790        );
791        if !is_offerer {
792            let (tx, rx) = oneshot::channel();
793            self.peer_negotiation
794                .lock()
795                .await
796                .entry(target.clone())
797                .or_default()
798                .ready_tx = Some(tx);
799            return Ok(rx);
800        }
801
802        self.start_offer_connection(target, true).await
803    }
804
805    /// Create and send an offer (offerer path). If `skip_negotiation` is true, assumes角色已确定。
806    /// This method includes retry logic for initial connection failures.
807    #[cfg_attr(
808        feature = "opentelemetry",
809        tracing::instrument(skip_all, fields(target_id = ?target.to_string_repr()))
810    )]
811    async fn start_offer_connection(
812        self: &Arc<Self>,
813        target: &ActrId,
814        skip_negotiation: bool,
815    ) -> RuntimeResult<oneshot::Receiver<()>> {
816        if !skip_negotiation {
817            let role_result =
818                tokio::time::timeout(Duration::from_secs(15), self.negotiate_role(target)).await;
819
820            let role_result = match role_result {
821                Ok(Ok(v)) => v,
822                Ok(Err(e)) => {
823                    self.peer_negotiation.lock().await.remove(target);
824                    return Err(e);
825                }
826                Err(_) => {
827                    self.peer_negotiation.lock().await.remove(target);
828                    return Err(RuntimeError::DeadlineExceeded {
829                        message: "Role negotiation timeout".to_string(),
830                        timeout_ms: 5000,
831                    });
832                }
833            };
834
835            if !role_result {
836                tracing::info!(
837                    "🎭 Role negotiation decided we are answerer for {}, waiting for offer",
838                    target.serial_number
839                );
840                let (tx, rx) = oneshot::channel();
841                self.peer_negotiation
842                    .lock()
843                    .await
844                    .entry(target.clone())
845                    .or_default()
846                    .ready_tx = Some(tx);
847                return Ok(rx);
848            }
849        }
850
851        // Single connection attempt (no retry)
852        tracing::info!("🔄 Starting connection to serial={}", target.serial_number);
853
854        match self.do_single_offer_connection(target).await {
855            Ok((ready_rx, webrtc_conn)) => {
856                // Wait for connection to be ready with timeout
857                match tokio::time::timeout(INITIAL_CONNECTION_TIMEOUT, ready_rx).await {
858                    Ok(Ok(())) => {
859                        tracing::info!(
860                            "✅ Connection established to serial={}",
861                            target.serial_number
862                        );
863                        // Return a new channel that's already signaled
864                        let (tx, rx) = oneshot::channel();
865                        let _ = tx.send(());
866                        return Ok(rx);
867                    }
868                    Ok(Err(_)) => {
869                        tracing::warn!(
870                            "⚠️ Connection failed (channel closed) for serial={}",
871                            target.serial_number
872                        );
873                        // Cleanup failed connection attempt
874                        self.cleanup_failed_connection(target, webrtc_conn).await;
875                        return Err(RuntimeError::Other(anyhow::anyhow!(
876                            "Connection ready channel closed"
877                        )));
878                    }
879                    Err(_) => {
880                        tracing::warn!(
881                            "⚠️ Connection timed out for serial={}",
882                            target.serial_number
883                        );
884                        // Cleanup failed connection attempt
885                        self.cleanup_failed_connection(target, webrtc_conn).await;
886                        return Err(RuntimeError::DeadlineExceeded {
887                            message: "Initial connection timeout".to_string(),
888                            timeout_ms: INITIAL_CONNECTION_TIMEOUT.as_millis() as u64,
889                        });
890                    }
891                }
892            }
893            Err(e) => {
894                tracing::warn!(
895                    "⚠️ Connection failed for serial={}: {}",
896                    target.serial_number,
897                    e
898                );
899                return Err(e);
900            }
901        }
902    }
903
904    /// Cleanup a failed connection attempt
905    ///
906    /// NOTE: Releases the write lock BEFORE calling close() to avoid blocking
907    /// other operations on `peers` during potentially slow close operations.
908    async fn cleanup_failed_connection(&self, target: &ActrId, webrtc_conn: WebRtcConnection) {
909        // Remove from peers map first, then close outside the lock
910        let state_to_close = {
911            let mut peers = self.peers.write().await;
912            peers.remove(target)
913        }; // Lock released here
914
915        if let Some(state) = state_to_close {
916            if let Err(e) = state.peer_connection.close().await {
917                tracing::warn!(
918                    "⚠️ Failed to close peer_connection during cleanup for {}: {}",
919                    target.serial_number,
920                    e
921                );
922            }
923        }
924
925        // Close WebRtcConnection
926        if let Err(e) = webrtc_conn.close().await {
927            tracing::warn!(
928                "⚠️ Failed to close WebRtcConnection during cleanup for {}: {}",
929                target.serial_number,
930                e
931            );
932        }
933
934        // Clear pending candidates
935        {
936            let mut pending = self.pending_candidates.write().await;
937            pending.remove(target);
938        }
939
940        tracing::debug!(
941            "🧹 Cleaned up failed connection attempt for serial={}",
942            target.serial_number
943        );
944    }
945
946    /// Cleanup a cancelled connection attempt (simpler version without WebRtcConnection)
947    ///
948    /// Used when connection creation is cancelled before completion.
949    ///
950    /// IMPORTANT: This method must release all locks before calling close() methods
951    /// to avoid deadlock, since close() may trigger events that call this method again.
952    async fn cleanup_cancelled_connection(&self, target: &ActrId) {
953        tracing::debug!(
954            "🧹 Starting cleanup for cancelled connection serial={}",
955            target.serial_number
956        );
957
958        // 1. Remove from peers map FIRST, release lock, THEN close
959        //    This avoids deadlock: close() sends events that may trigger this method again
960        let state_to_close = {
961            let mut peers = self.peers.write().await;
962            peers.remove(target)
963        }; // Lock released here
964
965        // Now close outside the lock (close() may send ConnectionClosed event)
966        if let Some(state) = state_to_close {
967            if let Err(e) = state.peer_connection.close().await {
968                tracing::warn!(
969                    "⚠️ Failed to close peer_connection during cancel cleanup for {}: {}",
970                    target.serial_number,
971                    e
972                );
973            }
974            if let Err(e) = state.webrtc_conn.close().await {
975                tracing::warn!(
976                    "⚠️ Failed to close webrtc_conn during cancel cleanup for {}: {}",
977                    target.serial_number,
978                    e
979                );
980            }
981        }
982
983        // 2. Clear pending candidates
984        self.pending_candidates.write().await.remove(target);
985
986        // 3. Clear negotiation state (role negotiation only, no restart_handle)
987        if self.peer_negotiation.lock().await.remove(target).is_some() {
988            tracing::debug!(
989                "🧹 Clearing negotiation state for serial={}",
990                target.serial_number
991            );
992        }
993
994        // 4. Cancel in-flight restart task if any (from peers lock)
995        // Note: We need to abort the task before removing the peer state
996        if let Some(peer_state) = self.peers.read().await.get(target) {
997            if let Some(ref handle) = peer_state.restart_task_handle {
998                handle.abort();
999                tracing::debug!(
1000                    "🧹 Aborted restart task for serial={}",
1001                    target.serial_number
1002                );
1003            }
1004        }
1005
1006        tracing::debug!(
1007            "🧹 Cleaned up cancelled connection for serial={}",
1008            target.serial_number
1009        );
1010    }
1011
    /// Perform a single offer connection attempt (without retry logic)
    ///
    /// Offerer-side setup: creates the RTCPeerConnection, pre-creates the
    /// reliable DataChannel and a video track (both before the Offer SDP is
    /// generated), wires media/ICE callbacks, sends the Offer through the
    /// signaling server, and starts the per-peer receive loop.
    ///
    /// Returns the ready receiver (fired once the connection is usable)
    /// together with the new `WebRtcConnection`, so the caller can await
    /// readiness and clean both up on failure.
    async fn do_single_offer_connection(
        self: &Arc<Self>,
        target: &ActrId,
    ) -> RuntimeResult<(oneshot::Receiver<()>, WebRtcConnection)> {
        // 1. Create the underlying RTCPeerConnection via the negotiator.
        let peer_connection = self.negotiator.create_peer_connection().await?;
        let peer_connection_arc = Arc::new(peer_connection);

        // 2. Create WebRtcConnection (shares Arc<RTCPeerConnection>) and
        //    install state-change handler with ICE-restart wiring.
        let webrtc_conn = WebRtcConnection::new(
            target.clone(),
            Arc::clone(&peer_connection_arc),
            self.event_broadcaster.sender(),
        );
        self.install_restart_handler(
            webrtc_conn.clone(),
            Arc::clone(&peer_connection_arc),
            target.clone(),
        );

        // 2.5. CRITICAL: Insert peer state early as placeholder to prevent race conditions
        //      (a concurrent caller would otherwise not see this in-flight attempt).
        //      Create the ready channel now; ready_tx is stored in the peer state and is
        //      fired elsewhere once the connection becomes usable.
        let (ready_tx, ready_rx) = oneshot::channel();
        {
            let mut peers = self.peers.write().await;
            peers.insert(
                target.clone(),
                PeerState {
                    peer_connection: peer_connection_arc.clone(),
                    webrtc_conn: webrtc_conn.clone(),
                    ready_tx: Some(ready_tx),
                    is_offerer: true,
                    ice_restart_inflight: false,
                    ice_restart_attempts: 0,
                    restart_task_handle: None,
                    last_state_change: std::time::Instant::now(),
                    current_state: RTCPeerConnectionState::New,
                },
            );
            tracing::debug!(
                "🔒 Inserted placeholder peer state for {} (offerer)",
                target.to_string_repr()
            );
        } // Release lock immediately

        // 3. Pre-create negotiated DataChannel for Reliable to trigger ICE gathering
        let _reliable_lane = webrtc_conn
            .get_lane(actr_protocol::PayloadType::RpcReliable)
            .await?;
        tracing::debug!("Pre-created Reliable DataChannel for ICE gathering");

        // 3.5. Pre-create media tracks for sending (MUST be done before creating Offer)
        let _video_track = webrtc_conn
            .add_media_track("video-track-1".to_string(), "VP8", "video")
            .await?;
        tracing::debug!("Pre-created video MediaTrack: video-track-1");

        // 4. Register on_track callback for receiving MediaTrack (WebRTC native media).
        //    Each incoming track gets a reader task that forwards RTP payloads to the
        //    media frame registry until the track errors or closes.
        let media_registry = Arc::clone(&self.media_frame_registry);
        let sender_id = target.clone();
        peer_connection_arc.on_track(Box::new(move |track, _receiver, _transceiver| {
            let media_registry = Arc::clone(&media_registry);
            let sender_id = sender_id.clone();
            Box::pin(async move {
                let track_id = track.id();
                tracing::info!(
                    "📹 Received MediaTrack: track_id={}, sender={}",
                    track_id,
                    sender_id.to_string_repr()
                );

                // Reader loop: pull RTP packets and dispatch them as MediaSamples.
                tokio::spawn(async move {
                    loop {
                        match track.read_rtp().await {
                            Ok((rtp_packet, _attributes)) => {
                                let payload_data = rtp_packet.payload.clone();
                                let timestamp = rtp_packet.header.timestamp;
                                // TODO: derive the real codec from the track; placeholder for now.
                                let codec = "unknown".to_string();
                                let sample = actr_framework::MediaSample {
                                    data: payload_data,
                                    timestamp,
                                    codec,
                                    media_type: actr_framework::MediaType::Video, // TODO: detect from track
                                };
                                media_registry
                                    .dispatch(&track_id, sample, sender_id.clone())
                                    .await;
                            }
                            Err(e) => {
                                // Read failure ends the reader task for this track.
                                tracing::error!(
                                    "❌ Failed to read RTP from track {}: {}",
                                    track_id,
                                    e
                                );
                                break;
                            }
                        }
                    }
                    tracing::info!("🛑 MediaTrack reader task exited for track_id={}", track_id);
                });
            })
        }));

        // 5. Set ICE candidate callback (local ICE candidate collection).
        //    Each locally gathered candidate is relayed to the peer via signaling.
        let coordinator = Arc::downgrade(self);
        let target_id = target.clone();
        #[cfg(feature = "opentelemetry")]
        let root_context_map = self.root_context_map.clone();
        peer_connection_arc.on_ice_candidate(Box::new(
            move |candidate: Option<RTCIceCandidate>| {
                let coordinator = coordinator.clone();
                let target_id = target_id.clone();
                #[cfg(feature = "opentelemetry")]
                let root_context_map = root_context_map.clone();
                Box::pin(async move {
                    if let Some(cand) = candidate {
                        // Weak upgrade: silently skip if the coordinator is gone.
                        if let Some(coord) = coordinator.upgrade() {
                            let candidate_json = match cand.to_json() {
                                Ok(json) => json.candidate,
                                Err(e) => {
                                    tracing::error!("❌ ICE Candidate serialization failed: {}", e);
                                    return;
                                }
                            };

                            let ice_candidate = actr_protocol::IceCandidate {
                                candidate: candidate_json,
                                sdp_mid: None,
                                sdp_mline_index: None,
                                username_fragment: None,
                            };

                            let payload = actr_relay::Payload::IceCandidate(ice_candidate);

                            // Get root context at callback execution time (not at setup time)
                            #[cfg(feature = "opentelemetry")]
                            let span = {
                                let span = tracing::info_span!(
                                    "send_ice_candidate",
                                    target_id = %target_id.to_string_repr()
                                );
                                if let Some(ctx) =
                                    root_context_map.read().await.get(&target_id).cloned()
                                {
                                    span.set_parent(ctx);
                                } else {
                                    tracing::warn!(
                                        "⚠️ No root context found for target_id={}",
                                        target_id.to_string_repr()
                                    );
                                }
                                span
                            };
                            let send_actr_relay_fut = coord.send_actr_relay(&target_id, payload);
                            #[cfg(feature = "opentelemetry")]
                            let send_actr_relay_fut = send_actr_relay_fut.instrument(span);
                            if let Err(e) = send_actr_relay_fut.await {
                                tracing::error!("❌ Failed to send ICE Candidate: {}", e);
                            } else {
                                tracing::debug!("✅ Sent ICE Candidate");
                            }
                        }
                    } else {
                        // A None candidate marks the end of gathering; nothing to relay.
                        tracing::debug!("❌ ICE Candidate is None");
                    }
                })
            },
        ));

        // 6. Create Offer SDP
        let offer_sdp = self.negotiator.create_offer(&peer_connection_arc).await?;

        // 7. Send Offer via signaling server
        let session_desc = actr_protocol::SessionDescription {
            r#type: SdpType::Offer as i32,
            sdp: offer_sdp,
        };
        let payload = actr_relay::Payload::SessionDescription(session_desc);
        self.send_actr_relay(target, payload).await?;

        tracing::info!("✅ Sent Offer to {}", target.to_string_repr());

        // 8. Start receive loop (receive and aggregate messages from this peer)
        self.start_peer_receive_loop(target.clone(), webrtc_conn.clone())
            .await;

        Ok((ready_rx, webrtc_conn))
    }
1201
1202    /// Handle received Offer (passive side)
1203    ///
1204    /// Called when receiving a connection request from another peer.
1205    /// Supports both initial negotiation and renegotiation.
1206    #[cfg_attr(
1207        feature = "opentelemetry",
1208        tracing::instrument(level = "info", skip_all, fields(remote_id = %from.to_string_repr()))
1209    )]
1210    async fn handle_offer(self: &Arc<Self>, from: &ActrId, offer_sdp: String) -> RuntimeResult<()> {
1211        // ========== PrepareForIncomingOffer: Clean up existing connection if any ==========
1212        let existing_peer = {
1213            let peers = self.peers.read().await;
1214            peers.contains_key(from)
1215        };
1216
1217        if existing_peer {
1218            tracing::info!(
1219                "🔄 Existing connection found for serial={}, preparing for new Offer",
1220                from.serial_number
1221            );
1222
1223            // Clean up old connection using unified cleanup method
1224            self.cleanup_cancelled_connection(from).await;
1225        }
1226        // ========== PrepareForIncomingOffer END ==========
1227
1228        tracing::info!("📥 Handling Offer from serial={}", from.serial_number);
1229
1230        // 1. Create RTCPeerConnection
1231        let peer_connection = self.negotiator.create_peer_connection().await?;
1232        let peer_connection_arc = Arc::new(peer_connection);
1233
1234        // 2. Create WebRtcConnection (shares Arc<RTCPeerConnection>)
1235        let webrtc_conn = WebRtcConnection::new(
1236            from.clone(),
1237            Arc::clone(&peer_connection_arc),
1238            self.event_broadcaster.sender(),
1239        );
1240
1241        // CRITICAL: Insert peer state immediately as a placeholder to prevent race conditions.
1242        // This prevents ensure_connection from creating a duplicate connection while we're
1243        // still setting up callbacks and negotiating the connection.
1244        // The state will be updated later after Answer is sent (step 6).
1245        {
1246            let mut peers = self.peers.write().await;
1247            peers.insert(
1248                from.clone(),
1249                PeerState {
1250                    peer_connection: peer_connection_arc.clone(),
1251                    webrtc_conn: webrtc_conn.clone(),
1252                    ready_tx: None,
1253                    is_offerer: false,
1254                    ice_restart_inflight: false,
1255                    ice_restart_attempts: 0,
1256                    restart_task_handle: None,
1257                    last_state_change: std::time::Instant::now(),
1258                    current_state: RTCPeerConnectionState::New,
1259                },
1260            );
1261            tracing::debug!(
1262                "🔒 Inserted placeholder peer state for {} (answerer)",
1263                from.to_string_repr()
1264            );
1265        } // Release lock immediately
1266
1267        // 3. Register state change handler (combines cleanup + ready notification)
1268        // NOTE: on_peer_connection_state_change can only have ONE callback, so we combine:
1269        //   - WebRtcConnection.handle_state_change() for cleanup on terminal states
1270        //   - Ready notification when Connected (answerer side)
1271        let webrtc_conn_for_state = webrtc_conn.clone();
1272        let coord_weak_for_state = Arc::downgrade(self);
1273        let from_id_for_state = from.clone();
1274        peer_connection_arc.on_peer_connection_state_change(Box::new(
1275            move |state: RTCPeerConnectionState| {
1276                let webrtc_conn = webrtc_conn_for_state.clone();
1277                let coord_weak = coord_weak_for_state.clone();
1278                let peer_id = from_id_for_state.clone();
1279                Box::pin(async move {
1280                    // First: run WebRtcConnection's state change handler (cleanup logic)
1281                    webrtc_conn.handle_state_change(state).await;
1282
1283                    // Update state tracking for health check
1284                    if let Some(coord) = coord_weak.upgrade() {
1285                        let mut peers = coord.peers.write().await;
1286                        if let Some(peer_state) = peers.get_mut(&peer_id) {
1287                            peer_state.current_state = state;
1288                            peer_state.last_state_change = std::time::Instant::now();
1289                        }
1290                        drop(peers); // Release lock
1291                    }
1292                })
1293            },
1294        ));
1295
1296        // 4. Register on_data_channel handler to reuse negotiated channels created by the offerer
1297        let conn_for_data_channel = webrtc_conn.clone();
1298
1299        let from_id_for_data_channel = from.clone();
1300        let coord_weak_for_state = Arc::downgrade(self);
1301        let message_tx = self.message_tx.clone();
1302        peer_connection_arc.on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
1303            let conn = conn_for_data_channel.clone();
1304            let coord_weak = coord_weak_for_state.clone();
1305            let peer_id = from_id_for_data_channel.clone();
1306            let message_tx = message_tx.clone();
1307            Box::pin(async move {
1308                let channel_id = dc.id();
1309                let label = dc.label();
1310                let dc_for_registration = Arc::clone(&dc);
1311
1312                let payload_type = PayloadType::from_str_name(&label);
1313
1314                if let Some(coord) = coord_weak.upgrade() {
1315                    let ready_tx = {
1316                        let mut neg = coord.peer_negotiation.lock().await;
1317                        neg.get_mut(&peer_id).and_then(|s| s.ready_tx.take())
1318                    };
1319                    if let Some(tx) = ready_tx {
1320                        tracing::info!(
1321                            "✅ [Answerer] Connection ready, sending notification for {}",
1322                            peer_id.serial_number
1323                        );
1324                        let _ = tx.send(());
1325                    }
1326                }
1327
1328                match payload_type {
1329                    Some(pt) => {
1330                        if let Err(e) = conn
1331                            .register_received_data_channel(dc_for_registration, pt, message_tx)
1332                            .await
1333                        {
1334                            tracing::warn!(
1335                                "❌ Failed to register received DataChannel label={} id={}: {}",
1336                                label,
1337                                channel_id,
1338                                e
1339                            );
1340                        } else {
1341                            tracing::debug!(
1342                                "📨 Registered DataChannel from offerer label={} id={}",
1343                                label,
1344                                channel_id
1345                            );
1346                        }
1347                    }
1348                    None => {
1349                        tracing::warn!(
1350                            "❓ Ignoring DataChannel with unmapped id={} label={}",
1351                            channel_id,
1352                            label
1353                        );
1354                    }
1355                }
1356            })
1357        }));
1358
1359        // 3.5. Pre-create media tracks for sending (MUST be done before creating Answer)
1360        // Create a default video track for demo purposes
1361        let _video_track = webrtc_conn
1362            .add_media_track("video-track-1".to_string(), "VP8", "video")
1363            .await?;
1364        tracing::debug!("Pre-created video MediaTrack: video-track-1 (answerer)");
1365
1366        // 4. Register on_track callback for receiving MediaTrack (WebRTC native media)
1367        let media_registry = Arc::clone(&self.media_frame_registry);
1368        let sender_id = from.clone();
1369        peer_connection_arc.on_track(Box::new(move |track, _receiver, _transceiver| {
1370            let media_registry = Arc::clone(&media_registry);
1371            let sender_id = sender_id.clone();
1372            Box::pin(async move {
1373                let track_id = track.id();
1374                tracing::info!(
1375                    "📹 Received MediaTrack: track_id={}, sender={}",
1376                    track_id,
1377                    sender_id.to_string_repr()
1378                );
1379
1380                // Spawn task to read RTP packets from track
1381                tokio::spawn(async move {
1382                    loop {
1383                        // Read RTP packet from track
1384                        match track.read_rtp().await {
1385                            Ok((rtp_packet, _attributes)) => {
1386                                // Extract payload and timestamp
1387                                let payload_data = rtp_packet.payload.clone();
1388                                let timestamp = rtp_packet.header.timestamp;
1389
1390                                // TODO: Extract codec from track (for now use placeholder)
1391                                let codec = "unknown".to_string();
1392
1393                                // Create MediaSample
1394                                let sample = actr_framework::MediaSample {
1395                                    data: payload_data,
1396                                    timestamp,
1397                                    codec,
1398                                    media_type: actr_framework::MediaType::Video, // TODO: detect from track
1399                                };
1400
1401                                // Dispatch to registered callback
1402                                media_registry
1403                                    .dispatch(&track_id, sample, sender_id.clone())
1404                                    .await;
1405                            }
1406                            Err(e) => {
1407                                tracing::error!(
1408                                    "❌ Failed to read RTP from track {}: {}",
1409                                    track_id,
1410                                    e
1411                                );
1412                                break;
1413                            }
1414                        }
1415                    }
1416                    tracing::info!("🛑 MediaTrack reader task exited for track_id={}", track_id);
1417                });
1418            })
1419        }));
1420
1421        // 5. Set ICE candidate callback (local ICE candidate collection)
1422        let coordinator = Arc::downgrade(self);
1423        let target_id = from.clone();
1424        #[cfg(feature = "opentelemetry")]
1425        let root_context_map = self.root_context_map.clone();
1426        peer_connection_arc.on_ice_candidate(Box::new(
1427            move |candidate: Option<RTCIceCandidate>| {
1428                let coordinator = coordinator.clone();
1429                let target_id = target_id.clone();
1430                #[cfg(feature = "opentelemetry")]
1431                let root_context_map = root_context_map.clone();
1432                Box::pin(async move {
1433                    if let Some(cand) = candidate {
1434                        if let Some(coord) = coordinator.upgrade() {
1435                            // Convert RTCIceCandidate to JSON string (webrtc crate's standard method)
1436                            let candidate_json = match cand.to_json() {
1437                                Ok(json) => json.candidate,
1438                                Err(e) => {
1439                                    tracing::error!("❌ ICE Candidate serialization failed: {}", e);
1440                                    return;
1441                                }
1442                            };
1443
1444                            let ice_candidate = actr_protocol::IceCandidate {
1445                                candidate: candidate_json,
1446                                sdp_mid: None,
1447                                sdp_mline_index: None,
1448                                username_fragment: None,
1449                            };
1450
1451                            let payload = actr_relay::Payload::IceCandidate(ice_candidate);
1452
1453                            // Get root context at callback execution time (not at setup time)
1454                            #[cfg(feature = "opentelemetry")]
1455                            let span = {
1456                                let span = tracing::info_span!(
1457                                    "send_ice_candidate",
1458                                    target_id = %target_id.to_string_repr()
1459                                );
1460                                if let Some(ctx) =
1461                                    root_context_map.read().await.get(&target_id).cloned()
1462                                {
1463                                    span.set_parent(ctx);
1464                                } else {
1465                                    tracing::warn!(
1466                                        "⚠️ No root context found for target_id={}",
1467                                        target_id.to_string_repr()
1468                                    );
1469                                }
1470                                span
1471                            };
1472                            let send_actr_relay_fut = coord.send_actr_relay(&target_id, payload);
1473                            #[cfg(feature = "opentelemetry")]
1474                            let send_actr_relay_fut = send_actr_relay_fut.instrument(span);
1475                            if let Err(e) = send_actr_relay_fut.await {
1476                                tracing::error!("❌ Failed to send ICE Candidate: {}", e);
1477                            }
1478                            tracing::debug!(
1479                                "🔄 Handle offer Sent ICE Candidate to serial={}",
1480                                target_id.serial_number
1481                            );
1482                        }
1483                    }
1484                })
1485            },
1486        ));
1487
1488        // 5. Create Answer
1489        let answer_sdp = self
1490            .negotiator
1491            .create_answer(&peer_connection_arc, offer_sdp)
1492            .await?;
1493
1494        // 7. Send Answer via signaling server
1495        let session_desc = actr_protocol::SessionDescription {
1496            r#type: SdpType::Answer as i32,
1497            sdp: answer_sdp,
1498        };
1499        let payload = actr_relay::Payload::SessionDescription(session_desc);
1500        self.send_actr_relay(from, payload).await?;
1501
1502        tracing::info!("✅ Sent Answer to {}", from.to_string_repr());
1503
1504        // 8. Flush any buffered ICE candidates (remote description is now set)
1505        self.flush_pending_candidates(from, &peer_connection_arc)
1506            .await?;
1507
1508        // Note: ready notification is sent in on_data_channel callback
1509        // when DataChannel is actually registered (see above)
1510
1511        Ok(())
1512    }
1513
    /// Handle received Answer (initiator side)
    ///
    /// Supports both initial negotiation and renegotiation answers:
    /// 1. Looks up the peer's `RTCPeerConnection` and takes its `ready_tx`.
    /// 2. Applies the remote SDP via the negotiator.
    /// 3. Flushes ICE candidates that were buffered before the remote
    ///    description was set.
    /// 4. Spawns a watcher task that polls the connection state and, once
    ///    settled (connected or timed out), signals readiness on `ready_tx`.
    ///
    /// Renegotiation is detected by `ready_tx` already being `None`: the
    /// initial answer consumes the sender, so any later answer finds it taken.
    #[cfg_attr(
        feature = "opentelemetry",
        tracing::instrument(
            level = "info",
            skip_all,
            fields(
                remote.id = %from.to_string_repr(),
                answer_len = answer_sdp.len()
            )
        )
    )]
    async fn handle_answer(
        self: &Arc<Self>,
        from: &ActrId,
        answer_sdp: String,
    ) -> RuntimeResult<()> {
        // Get corresponding PeerConnection and ready_tx.
        // Scoped block so the write lock on `peers` is released before any
        // further awaits below.
        let (peer_connection, ready_tx, is_renegotiation) = {
            let mut peers = self.peers.write().await;
            // NOTE(review): this dumps every stored peer id at info level on
            // each answer — useful while debugging lookups, but noisy.
            tracing::info!(
                "🔍 [LOOKUP] Searching for: id={}, total peers={}",
                from.to_string_repr(),
                peers.len()
            );
            for (k, _) in peers.iter() {
                tracing::info!("   📌 [LOOKUP] Stored: id={}", k.to_string_repr());
            }
            let state = peers.get_mut(from).ok_or_else(|| {
                RuntimeError::Other(anyhow::anyhow!("Peer not found: {}", from.to_string_repr()))
            })?;

            let pc = state.peer_connection.clone();
            let tx = state.ready_tx.take();
            let is_reneg = tx.is_none(); // If ready_tx already taken, this is renegotiation
            (pc, tx, is_reneg)
        };

        if is_renegotiation {
            tracing::info!(
                "🔄 Handling renegotiation Answer from {}",
                from.to_string_repr()
            );
        } else {
            tracing::info!("📥 Handling initial Answer from {}", from.to_string_repr());
        }

        // Handle Answer (set remote SDP)
        self.negotiator
            .handle_answer(&peer_connection, answer_sdp)
            .await?;

        // Flush any buffered ICE candidates (remote description is now set)
        self.flush_pending_candidates(from, &peer_connection)
            .await?;

        tracing::info!(
            "✅ WebRTC connection negotiation completed: {}",
            from.to_string_repr()
        );

        // Wait for PeerConnection to actually connect (max 5 seconds).
        // Busy-polls the connection state every 10ms in a detached task so
        // this handler returns immediately.
        let pc_clone = peer_connection.clone();
        let peers = Arc::clone(&self.peers);
        let from_id = from.clone();
        tokio::spawn(async move {
            let start = tokio::time::Instant::now();
            loop {
                let state = pc_clone.connection_state();
                if state == webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState::Connected {
                    tracing::info!("✅ PeerConnection fully connected");
                    // Mark ICE restart attempt complete
                    let mut peers_guard = peers.write().await;
                    if let Some(s) = peers_guard.get_mut(&from_id) {
                        s.ice_restart_inflight = false;
                        s.ice_restart_attempts = 0;
                    }
                    break;
                }
                if state == webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState::Failed {
                    // On failure we return WITHOUT sending on ready_tx: the
                    // sender is dropped, the waiting receiver observes a
                    // closed channel, and the caller maps that to a
                    // connection-establishment error.
                    tracing::error!("❌ PeerConnection failed");
                    return;
                }
                if start.elapsed() > std::time::Duration::from_secs(5) {
                    // Timeout falls through (break) to the ready notification
                    // below, so the waiter is unblocked and will surface any
                    // problem on its first actual send.
                    tracing::warn!("⚠️ PeerConnection connection timeout (5s)");
                    break;
                }
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }

            // Notify initiate_connection that connection is ready
            if let Some(tx) = ready_tx {
                let _ = tx.send(());
            }
        });

        Ok(())
    }
1614
1615    /// Flush buffered ICE candidates for a peer
1616    ///
1617    /// Called after remote description is set, to add any candidates that arrived early
1618    async fn flush_pending_candidates(
1619        &self,
1620        peer_id: &ActrId,
1621        peer_connection: &RTCPeerConnection,
1622    ) -> RuntimeResult<()> {
1623        // Extract buffered candidates for this peer
1624        let candidates = {
1625            let mut pending = self.pending_candidates.write().await;
1626            pending.remove(peer_id)
1627        };
1628
1629        if let Some(candidates) = candidates {
1630            tracing::debug!(
1631                "🔄 Flushing {} buffered ICE candidates for {:?}",
1632                candidates.len(),
1633                peer_id
1634            );
1635
1636            for candidate in candidates {
1637                if let Err(e) = self
1638                    .negotiator
1639                    .add_ice_candidate(peer_connection, candidate)
1640                    .await
1641                {
1642                    tracing::warn!("⚠️ Failed to add buffered ICE candidate: {}", e);
1643                }
1644            }
1645        }
1646
1647        Ok(())
1648    }
1649
1650    /// Handle received ICE Candidate
1651    #[cfg_attr(
1652        feature = "opentelemetry",
1653        tracing::instrument(
1654            level = "trace",
1655            skip_all,
1656            fields(
1657                remote.id = %from.to_string_repr(),
1658                candidate_len = candidate.len()
1659            )
1660        )
1661    )]
1662    async fn handle_ice_candidate(
1663        self: &Arc<Self>,
1664        from: &ActrId,
1665        candidate: String,
1666    ) -> RuntimeResult<()> {
1667        tracing::trace!("📥 Received ICE Candidate from {}", from.to_string_repr());
1668
1669        // DEBUG: Temporarily disable candidate filtering for local testing
1670        // TODO: Re-enable proper filtering for production
1671        // if !is_ipv4_candidate_allowed(&candidate) {
1672        //     tracing::debug!("🚫 Ignoring ICE candidate from {:?}: {}", from, candidate);
1673        //     return Ok(());
1674        // }
1675
1676        // Try to get peer and check if remote description is set
1677        let peer_opt = {
1678            let peers = self.peers.read().await;
1679            peers.get(from).map(|state| state.peer_connection.clone())
1680        };
1681
1682        match peer_opt {
1683            Some(peer_connection) => {
1684                // Check if remote description is set
1685                if peer_connection.remote_description().await.is_some() {
1686                    // Can add candidate immediately
1687                    self.negotiator
1688                        .add_ice_candidate(&peer_connection, candidate)
1689                        .await?;
1690                    tracing::trace!("✅ Added ICE Candidate from {}", from.to_string_repr());
1691                } else {
1692                    // Buffer for later (remote description not yet set)
1693                    self.pending_candidates
1694                        .write()
1695                        .await
1696                        .entry(from.clone())
1697                        .or_insert_with(Vec::new)
1698                        .push(candidate);
1699                    tracing::debug!(
1700                        "🔖 Buffered ICE candidate from {:?} (remote description not yet set)",
1701                        from
1702                    );
1703                }
1704            }
1705            None => {
1706                // Buffer for when peer is created
1707                self.pending_candidates
1708                    .write()
1709                    .await
1710                    .entry(from.clone())
1711                    .or_insert_with(Vec::new)
1712                    .push(candidate);
1713                tracing::debug!(
1714                    "🔖 Buffered ICE candidate from {:?} (peer not yet created)",
1715                    from
1716                );
1717            }
1718        }
1719
1720        Ok(())
1721    }
1722
1723    /// Start peer receive loop
1724    ///
1725    /// Starts a background task for each peer to receive messages from WebRtcConnection and aggregate to a unified message_tx
1726    ///
1727    /// IMPORTANT: We need to listen to ALL PayloadTypes, not just RpcReliable:
1728    /// - RpcReliable, RpcSignal: for RPC messages
1729    /// - StreamReliable, StreamLatencyFirst: for DataStream messages
1730    async fn start_peer_receive_loop(&self, peer_id: ActrId, webrtc_conn: WebRtcConnection) {
1731        let message_tx = self.message_tx.clone();
1732
1733        // Listen to all relevant PayloadTypes
1734        let payload_types = vec![
1735            PayloadType::RpcReliable,
1736            PayloadType::RpcSignal,
1737            PayloadType::StreamReliable,
1738            PayloadType::StreamLatencyFirst,
1739        ];
1740
1741        for payload_type in payload_types {
1742            let message_tx_clone = message_tx.clone();
1743            let peer_id_clone = peer_id.clone();
1744            let webrtc_conn_clone = webrtc_conn.clone();
1745
1746            tokio::spawn(async move {
1747                tracing::debug!(
1748                    "📡 Starting receive loop for peer {:?}, PayloadType: {:?}",
1749                    peer_id_clone,
1750                    payload_type
1751                );
1752
1753                // Get Lane for this PayloadType
1754                let lane = match webrtc_conn_clone.get_lane(payload_type).await {
1755                    Ok(l) => l,
1756                    Err(e) => {
1757                        tracing::error!(
1758                            "❌ Failed to get Lane for {:?}, PayloadType {:?}: {}",
1759                            peer_id_clone,
1760                            payload_type,
1761                            e
1762                        );
1763                        return;
1764                    }
1765                };
1766
1767                // Continuously receive messages
1768                loop {
1769                    match lane.recv().await {
1770                        Ok(data) => {
1771                            tracing::debug!(
1772                                "📨 Received message from {:?} (PayloadType: {:?}): {} bytes",
1773                                peer_id_clone,
1774                                payload_type,
1775                                data.len()
1776                            );
1777
1778                            // Serialize peer_id as bytes
1779                            let peer_id_bytes = peer_id_clone.encode_to_vec();
1780
1781                            // Send to aggregation channel (include PayloadType)
1782                            if let Err(e) =
1783                                message_tx_clone.send((peer_id_bytes, data, payload_type))
1784                            {
1785                                tracing::error!("❌ Message aggregation failed: {:?}", e);
1786                                break;
1787                            }
1788                        }
1789                        Err(e) => {
1790                            tracing::warn!(
1791                                "❌ Peer {:?} message receive failed (PayloadType: {:?}): {}",
1792                                peer_id_clone,
1793                                payload_type,
1794                                e
1795                            );
1796                            break;
1797                        }
1798                    }
1799                }
1800
1801                tracing::debug!(
1802                    "📡 Receive loop exited for peer {:?}, PayloadType: {:?}",
1803                    peer_id_clone,
1804                    payload_type
1805                );
1806            });
1807        }
1808    }
1809
1810    /// Send message to specified peer
1811    ///
1812    /// If connection doesn't exist, automatically initiates WebRTC connection and waits for it to be ready.
1813    /// Supports retry with exponential backoff on transient errors.
1814    #[cfg_attr(
1815        feature = "opentelemetry",
1816        tracing::instrument(skip_all, fields(target_id = ?target.to_string_repr()))
1817    )]
1818    pub(crate) async fn send_message(
1819        self: &Arc<Self>,
1820        target: &ActrId,
1821        data: &[u8],
1822    ) -> RuntimeResult<()> {
1823        const MAX_RETRIES: u32 = 3;
1824        const OVERALL_TIMEOUT: Duration = Duration::from_secs(30);
1825
1826        tracing::debug!("📤 Sending message to {:?}: {} bytes", target, data.len());
1827
1828        // Wrap entire operation with overall timeout
1829        let result = tokio::time::timeout(
1830            OVERALL_TIMEOUT,
1831            self.send_message_with_retry(target, data, MAX_RETRIES),
1832        )
1833        .await;
1834
1835        match result {
1836            Ok(inner_result) => inner_result,
1837            Err(_) => {
1838                tracing::error!(
1839                    "⏰ Overall timeout ({}s) exceeded for send_message to {}",
1840                    OVERALL_TIMEOUT.as_secs(),
1841                    target.to_string_repr()
1842                );
1843                self.cleanup_cancelled_connection(target).await;
1844                Err(RuntimeError::DeadlineExceeded {
1845                    message: format!(
1846                        "send_message overall timeout ({}s)",
1847                        OVERALL_TIMEOUT.as_secs()
1848                    ),
1849                    timeout_ms: OVERALL_TIMEOUT.as_millis() as u64,
1850                })
1851            }
1852        }
1853    }
1854
1855    /// Inner implementation of send_message with retry logic
1856    async fn send_message_with_retry(
1857        self: &Arc<Self>,
1858        target: &ActrId,
1859        data: &[u8],
1860        max_retries: u32,
1861    ) -> RuntimeResult<()> {
1862        let mut backoff = ExponentialBackoff::new(
1863            Duration::from_millis(1), // initial delay
1864            Duration::from_secs(10),  // max delay
1865            None,                     // no limit (we control manually)
1866        );
1867
1868        let mut last_error = None;
1869
1870        for attempt in 0..=max_retries {
1871            // Wait before retry (skip first attempt)
1872            if attempt > 0 {
1873                let delay = backoff.next().unwrap_or(Duration::from_secs(5));
1874                tracing::info!(
1875                    "🔄 Retrying send_message to {} (attempt {}/{}, delay {:?})",
1876                    target.to_string_repr(),
1877                    attempt + 1,
1878                    max_retries + 1,
1879                    delay
1880                );
1881                tokio::time::sleep(delay).await;
1882            }
1883
1884            match self.try_send_message_once(target, data).await {
1885                Ok(()) => return Ok(()),
1886                Err(e) => {
1887                    // Only retry on transient errors
1888                    let should_retry = matches!(
1889                        &e,
1890                        RuntimeError::DeadlineExceeded { .. } | RuntimeError::Other(_)
1891                    );
1892
1893                    if !should_retry {
1894                        return Err(e);
1895                    }
1896
1897                    tracing::warn!(
1898                        "⚠️ send_message attempt {}/{} failed: {}",
1899                        attempt + 1,
1900                        max_retries + 1,
1901                        e
1902                    );
1903                    last_error = Some(e);
1904
1905                    // Cleanup connection before retry (might be stale)
1906                    self.cleanup_cancelled_connection(target).await;
1907                }
1908            }
1909        }
1910
1911        // All retries exhausted
1912        Err(last_error.unwrap_or_else(|| {
1913            RuntimeError::Other(anyhow::anyhow!("send_message failed after all retries"))
1914        }))
1915    }
1916
    /// Single attempt to send a message
    ///
    /// Flow:
    /// 1. Poll the peer's recorded connection state (100ms interval) while it
    ///    is New/Connecting and still within `INITIAL_CONNECTION_TIMEOUT`.
    /// 2. If no usable connection results, initiate one and wait up to 10s
    ///    for the readiness signal.
    /// 3. Fetch the peer's `WebRtcConnection`, grab the RpcReliable lane,
    ///    and send the payload.
    ///
    /// Errors are mapped to `RuntimeError::Other` / `DeadlineExceeded`, which
    /// the caller (`send_message_with_retry`) treats as retryable.
    async fn try_send_message_once(
        self: &Arc<Self>,
        target: &ActrId,
        data: &[u8],
    ) -> RuntimeResult<()> {
        // Check if connection exists or is being established.
        // The read lock is taken per iteration and released before sleeping.
        let has_connection = loop {
            let state = {
                let peers = self.peers.read().await;
                peers
                    .get(target)
                    .map(|s| (s.current_state, s.last_state_change))
            };

            match state {
                Some((
                    RTCPeerConnectionState::New | RTCPeerConnectionState::Connecting,
                    started,
                )) => {
                    // Connection is being established, check if it's still fresh
                    // (`started` is the last recorded state-change instant).
                    if started.elapsed() < INITIAL_CONNECTION_TIMEOUT {
                        // Wait a bit and check again
                        tracing::debug!(
                            "⏳ Connection to {} is being established, waiting...",
                            target.to_string_repr()
                        );
                        tokio::time::sleep(Duration::from_millis(100)).await;
                        continue;
                    } else {
                        // Connecting timeout, treat as not connected
                        tracing::warn!(
                            "⏰ Connection to {} timed out while connecting",
                            target.to_string_repr()
                        );
                        break false;
                    }
                }
                Some((RTCPeerConnectionState::Connected, _)) => {
                    // Connection exists and is ready
                    break true;
                }
                Some(_) => {
                    // Connection exists but in other state (Disconnected/Failed/Closed)
                    // Let initiate_connection handle it
                    break false;
                }
                None => {
                    // No connection exists
                    break false;
                }
            }
        };

        // Record the current span's context so async callbacks (e.g. ICE
        // candidate sends) can parent their spans to this operation.
        #[cfg(feature = "opentelemetry")]
        let _ = self
            .root_context_map
            .write()
            .await
            .insert(target.clone(), tracing::Span::current().context());

        // If connection doesn't exist, initiate connection
        if !has_connection {
            tracing::info!(
                "🔗 First send to {:?}, initiating role negotiation + WebRTC connection",
                target.serial_number
            );

            let ready_rx = self.initiate_connection(target).await?;
            tracing::debug!(?ready_rx, "ready_rx");

            // Wait for connection to be ready (10s timeout for single attempt)
            match tokio::time::timeout(Duration::from_secs(10), ready_rx).await {
                Ok(Ok(())) => {
                    tracing::info!("✅ WebRTC connection ready: {}", target.to_string_repr());
                }
                Ok(Err(_)) => {
                    // Sender was dropped without signalling — the connection
                    // attempt failed on the remote/negotiation side.
                    return Err(RuntimeError::Other(anyhow::anyhow!(
                        "Connection establishment failed (channel closed)"
                    )));
                }
                Err(_) => {
                    return Err(RuntimeError::DeadlineExceeded {
                        message: "Connection establishment timeout".to_string(),
                        timeout_ms: 10000,
                    });
                }
            }
        }

        // Get corresponding WebRtcConnection
        let webrtc_conn = {
            let peers = self.peers.read().await;
            peers
                .get(target)
                .map(|state| state.webrtc_conn.clone())
                .ok_or_else(|| {
                    RuntimeError::Other(anyhow::anyhow!("Peer connection not found: {target:?}"))
                })?
        };

        // Get Reliable Lane
        let lane = webrtc_conn
            .get_lane(PayloadType::RpcReliable)
            .await
            .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to get Lane: {e}")))?;

        // Send message (convert to Bytes)
        lane.send(Bytes::copy_from_slice(data))
            .await
            .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to send message: {e}")))?;

        Ok(())
    }
2031
2032    /// Receive message (aggregated from all peers)
2033    /// Receive message with PayloadType information
2034    ///
2035    /// Returns: Option<(sender_id_bytes, message_data, payload_type)>
2036    pub async fn receive_message(&self) -> RuntimeResult<Option<(Vec<u8>, Bytes, PayloadType)>> {
2037        let mut rx = self.message_rx.lock().await;
2038        Ok(rx.recv().await)
2039    }
2040
2041    /// Create WebRTC connection (factory method)
2042    ///
2043    /// For ConnectionFactory, creates a WebRTC connection to the specified Dest.
2044    /// If connection already exists, returns it directly; otherwise initiates new connection and waits for it to be ready.
2045    /// Supports retry with exponential backoff on timeout or channel errors.
2046    /// The entire method has a 30-second overall timeout.
2047    ///
2048    /// # Arguments
2049    /// - `dest`: destination (must be Actor type)
2050    /// - `cancel_token`: optional cancellation token to terminate the operation
2051    ///
2052    /// # Returns
2053    /// - `Ok(WebRtcConnection)`: ready WebRTC connection
2054    /// - `Err`: WebRTC only supports Actor targets, connection cancelled, or connection establishment failed
2055    #[cfg_attr(
2056        feature = "opentelemetry",
2057        tracing::instrument(skip_all, fields(target_id = ?dest.as_actor_id().map(|id| id.to_string_repr())))
2058    )]
2059    pub async fn create_connection(
2060        self: &Arc<Self>,
2061        dest: &crate::transport::Dest,
2062        cancel_token: Option<CancellationToken>,
2063    ) -> RuntimeResult<WebRtcConnection> {
2064        // Overall timeout for the entire create_connection operation
2065        const OVERALL_TIMEOUT: Duration = Duration::from_secs(30);
2066
2067        // Extract target_id first (before timeout wrapper) for cleanup
2068        let target_id = dest.as_actor_id().ok_or_else(|| {
2069            RuntimeError::ConfigurationError(
2070                "WebRTC only supports Actor targets, not Shell".to_string(),
2071            )
2072        })?;
2073
2074        // Wrap the entire operation with overall timeout
2075        let result = tokio::time::timeout(
2076            OVERALL_TIMEOUT,
2077            self.create_connection_inner(dest, cancel_token.clone()),
2078        )
2079        .await;
2080
2081        match result {
2082            Ok(inner_result) => inner_result,
2083            Err(_) => {
2084                // Overall timeout exceeded
2085                tracing::error!(
2086                    "⏰ [Factory] Overall timeout ({}s) exceeded for connection to {}",
2087                    OVERALL_TIMEOUT.as_secs(),
2088                    target_id.to_string_repr()
2089                );
2090                self.cleanup_cancelled_connection(target_id).await;
2091                Err(RuntimeError::DeadlineExceeded {
2092                    message: format!(
2093                        "WebRTC connection creation overall timeout ({}s)",
2094                        OVERALL_TIMEOUT.as_secs()
2095                    ),
2096                    timeout_ms: OVERALL_TIMEOUT.as_millis() as u64,
2097                })
2098            }
2099        }
2100    }
2101
2102    /// Inner implementation of create_connection without overall timeout
2103    async fn create_connection_inner(
2104        self: &Arc<Self>,
2105        dest: &crate::transport::Dest,
2106        cancel_token: Option<CancellationToken>,
2107    ) -> RuntimeResult<WebRtcConnection> {
2108        // Check cancellation at entry
2109        if let Some(ref token) = cancel_token {
2110            if token.is_cancelled() {
2111                return Err(RuntimeError::Other(anyhow::anyhow!(
2112                    "Connection creation cancelled before starting"
2113                )));
2114            }
2115        }
2116
2117        // 1. Check if dest is Actor
2118        let target_id = dest.as_actor_id().ok_or_else(|| {
2119            RuntimeError::ConfigurationError(
2120                "WebRTC only supports Actor targets, not Shell".to_string(),
2121            )
2122        })?;
2123
2124        tracing::debug!(
2125            "🏭 [Factory] Creating WebRTC connection to {:?}",
2126            target_id.to_string_repr()
2127        );
2128
2129        // 2. Check if connection already exists
2130        {
2131            let peers = self.peers.read().await;
2132            if let Some(state) = peers.get(target_id) {
2133                tracing::debug!(
2134                    "♻️ [Factory] Reusing existing WebRTC connection: {:?}",
2135                    target_id.to_string_repr()
2136                );
2137                return Ok(state.webrtc_conn.clone());
2138            }
2139        }
2140
2141        // 3. Retry loop with exponential backoff (max 3 retries)
2142        const MAX_RETRIES: u32 = 3;
2143        let mut backoff = ExponentialBackoff::new(
2144            Duration::from_secs(5),  // initial delay
2145            Duration::from_secs(15), // max delay
2146            None,                    // no limit (we control manually)
2147        );
2148
2149        let mut last_error = None;
2150
2151        for attempt in 0..=MAX_RETRIES {
2152            // Check cancellation before each attempt
2153            if let Some(ref token) = cancel_token {
2154                if token.is_cancelled() {
2155                    return Err(RuntimeError::Other(anyhow::anyhow!(
2156                        "Connection creation cancelled"
2157                    )));
2158                }
2159            }
2160
2161            // Wait before retry (skip first attempt)
2162            if attempt > 0 {
2163                let delay = backoff.next().unwrap_or(Duration::from_secs(10));
2164                tracing::info!(
2165                    "🔄 [Factory] Retrying connection to {} (attempt {}/{}, delay {:?})",
2166                    target_id.to_string_repr(),
2167                    attempt + 1,
2168                    MAX_RETRIES + 1,
2169                    delay
2170                );
2171
2172                // Interruptible sleep with cancellation
2173                if let Some(ref token) = cancel_token {
2174                    tokio::select! {
2175                        biased;
2176                        _ = token.cancelled() => {
2177                            self.cleanup_cancelled_connection(target_id).await;
2178                            return Err(RuntimeError::Other(anyhow::anyhow!(
2179                                "Connection creation cancelled during retry wait"
2180                            )));
2181                        }
2182                        _ = tokio::time::sleep(delay) => {}
2183                    }
2184                } else {
2185                    tokio::time::sleep(delay).await;
2186                }
2187            } else {
2188                tracing::info!(
2189                    "🔨 [Factory] Initiating new WebRTC connection: {:?}",
2190                    target_id.to_string_repr()
2191                );
2192            }
2193
2194            // Attempt connection
2195            match self
2196                .try_create_connection_once(target_id, cancel_token.as_ref())
2197                .await
2198            {
2199                Ok(conn) => return Ok(conn),
2200                Err(e) => {
2201                    // Check if this is a cancellation error - don't retry
2202                    if let Some(ref token) = cancel_token {
2203                        if token.is_cancelled() {
2204                            return Err(e);
2205                        }
2206                    }
2207
2208                    // Only retry on timeout or transient errors
2209                    let should_retry = matches!(
2210                        &e,
2211                        RuntimeError::DeadlineExceeded { .. } | RuntimeError::Other(_)
2212                    );
2213
2214                    if !should_retry {
2215                        return Err(e);
2216                    }
2217
2218                    tracing::warn!(
2219                        "⚠️ [Factory] Connection attempt {}/{} failed: {}",
2220                        attempt + 1,
2221                        MAX_RETRIES + 1,
2222                        e
2223                    );
2224                    last_error = Some(e);
2225
2226                    // Cleanup failed connection before retry
2227                    self.cleanup_cancelled_connection(target_id).await;
2228                }
2229            }
2230        }
2231
2232        // All retries exhausted
2233        Err(last_error.unwrap_or_else(|| {
2234            RuntimeError::Other(anyhow::anyhow!("Connection failed after all retries"))
2235        }))
2236    }
2237
    /// Single attempt to create a WebRTC connection to `target_id`.
    ///
    /// Flow: record the current tracing span context (for cross-task span
    /// linkage), initiate offer/answer negotiation, then wait up to 10s for
    /// the readiness signal, honoring `cancel_token` at each await point.
    /// On cancellation the partially-built connection is torn down via
    /// `cleanup_cancelled_connection` before returning an error.
    ///
    /// Returns the cached `WebRtcConnection` from `self.peers` once ready,
    /// or a `DeadlineExceeded` / `Other` error on timeout, channel closure,
    /// or cancellation.
    async fn try_create_connection_once(
        self: &Arc<Self>,
        target_id: &ActrId,
        cancel_token: Option<&CancellationToken>,
    ) -> RuntimeResult<WebRtcConnection> {
        // Remember the caller's span context so spawned signaling tasks can
        // link their spans back to this connection attempt.
        #[cfg(feature = "opentelemetry")]
        self.root_context_map
            .write()
            .await
            .insert(target_id.clone(), tracing::Span::current().context());

        // Kick off negotiation; `ready_rx` resolves once the connection is
        // ready, or errors (channel closed) if the sender side gave up.
        let ready_rx = self.initiate_connection(target_id).await?;

        // Check cancellation after initiation
        if let Some(token) = cancel_token {
            if token.is_cancelled() {
                self.cleanup_cancelled_connection(target_id).await;
                return Err(RuntimeError::Other(anyhow::anyhow!(
                    "Connection creation cancelled after initiation"
                )));
            }
        }

        // Wait for connection to be ready (10s timeout) with cancellation support
        let timeout_duration = std::time::Duration::from_secs(10);

        let wait_result = if let Some(token) = cancel_token {
            // `biased` makes the branches poll in order, so cancellation is
            // observed before the timeout or the readiness channel.
            tokio::select! {
                biased;
                _ = token.cancelled() => {
                    self.cleanup_cancelled_connection(target_id).await;
                    return Err(RuntimeError::Other(anyhow::anyhow!(
                        "Connection creation cancelled while waiting"
                    )));
                }
                _ = tokio::time::sleep(timeout_duration) => {
                    Err(RuntimeError::DeadlineExceeded {
                        message: "WebRTC connection establishment timeout".to_string(),
                        timeout_ms: 10000,
                    })
                }
                result = ready_rx => {
                    // A closed oneshot means establishment failed upstream.
                    result.map_err(|_| RuntimeError::Other(anyhow::anyhow!(
                        "Connection establishment failed (channel closed)"
                    )))
                }
            }
        } else {
            // No cancellation token: plain timeout around the readiness channel.
            tokio::time::timeout(timeout_duration, ready_rx)
                .await
                .map_err(|_| RuntimeError::DeadlineExceeded {
                    message: "WebRTC connection establishment timeout".to_string(),
                    timeout_ms: 10000,
                })?
                .map_err(|_| {
                    RuntimeError::Other(anyhow::anyhow!(
                        "Connection establishment failed (channel closed)"
                    ))
                })
        };

        wait_result?;

        tracing::info!(
            "✅ [Factory] WebRTC connection ready: {:?}",
            target_id.to_string_repr()
        );

        // Final cancellation check
        if let Some(token) = cancel_token {
            if token.is_cancelled() {
                self.cleanup_cancelled_connection(target_id).await;
                return Err(RuntimeError::Other(anyhow::anyhow!(
                    "Connection creation cancelled after ready"
                )));
            }
        }

        // Get and return WebRtcConnection.
        // The peer entry may have been removed by a concurrent cleanup between
        // the ready signal and this read, so report failure instead of panicking.
        let peers = self.peers.read().await;
        peers
            .get(target_id)
            .map(|state| state.webrtc_conn.clone())
            .ok_or_else(|| {
                RuntimeError::Other(anyhow::anyhow!(
                    "Peer not found after connection establishment"
                ))
            })
    }
2328
2329    /// Send media sample to target Actor via WebRTC Track
2330    ///
2331    /// # Arguments
2332    /// - `target`: Target Actor ID
2333    /// - `track_id`: Media track identifier
2334    /// - `sample`: Media sample to send
2335    ///
2336    /// # Returns
2337    /// Ok(()) if sent successfully
2338    pub async fn send_media_sample(
2339        &self,
2340        target: &actr_protocol::ActrId,
2341        track_id: &str,
2342        sample: actr_framework::MediaSample,
2343    ) -> RuntimeResult<()> {
2344        use webrtc::rtp::header::Header as RtpHeader;
2345        use webrtc::rtp::packet::Packet as RtpPacket;
2346
2347        // 1. Get PeerState for target
2348        let peers = self.peers.read().await;
2349        let peer_state = peers.get(target).ok_or_else(|| {
2350            RuntimeError::Other(anyhow::anyhow!(
2351                "No connection to target: {}",
2352                target.to_string_repr()
2353            ))
2354        })?;
2355
2356        // 2. Get Track from WebRtcConnection
2357        let track = peer_state
2358            .webrtc_conn
2359            .get_media_track(track_id)
2360            .await
2361            .ok_or_else(|| RuntimeError::Other(anyhow::anyhow!("Track not found: {track_id}")))?;
2362
2363        // 3. Get next sequence number for this track
2364        let sequence_number = peer_state
2365            .webrtc_conn
2366            .next_sequence_number(track_id)
2367            .await
2368            .ok_or_else(|| {
2369                RuntimeError::Other(anyhow::anyhow!(
2370                    "Sequence number not found for track: {track_id}"
2371                ))
2372            })?;
2373
2374        // 4. Get SSRC for this track
2375        let ssrc = peer_state
2376            .webrtc_conn
2377            .get_ssrc(track_id)
2378            .await
2379            .ok_or_else(|| {
2380                RuntimeError::Other(anyhow::anyhow!("SSRC not found for track: {track_id}"))
2381            })?;
2382
2383        // 5. Construct RTP packet from MediaSample
2384        let rtp_packet = RtpPacket {
2385            header: RtpHeader {
2386                version: 2,
2387                padding: false,
2388                extension: false,
2389                marker: true,     // Mark each sample (simplified)
2390                payload_type: 96, // Dynamic payload type (simplified - TODO: codec-specific)
2391                sequence_number,  // Per-track sequence number (wraps at 65535)
2392                timestamp: sample.timestamp,
2393                ssrc, // Unique SSRC per track (randomly generated)
2394                ..Default::default()
2395            },
2396            payload: sample.data,
2397        };
2398
2399        // 6. Send RTP packet via track
2400        track
2401            .write_rtp(&rtp_packet)
2402            .await
2403            .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to write RTP: {e}")))?;
2404
2405        tracing::debug!(
2406            "📤 Sent MediaSample: track_id={}, seq={}, ssrc=0x{:08x}, timestamp={}, size={}",
2407            track_id,
2408            sequence_number,
2409            ssrc,
2410            sample.timestamp,
2411            rtp_packet.payload.len()
2412        );
2413
2414        Ok(())
2415    }
2416
2417    /// Add dynamic media track and trigger SDP renegotiation
2418    ///
2419    /// # Arguments
2420    /// - `target`: Target Actor ID
2421    /// - `track_id`: Media track identifier
2422    /// - `codec`: Codec name (e.g., "VP8", "H264", "OPUS")
2423    /// - `media_type`: Media type ("video" or "audio")
2424    ///
2425    /// # Returns
2426    /// Ok(()) if track added and renegotiation completed successfully
2427    ///
2428    /// # Note
2429    /// This triggers SDP renegotiation on the existing PeerConnection.
2430    /// The connection remains active and existing tracks continue transmitting.
2431    pub async fn add_dynamic_track(
2432        &self,
2433        target: &actr_protocol::ActrId,
2434        track_id: String,
2435        codec: &str,
2436        media_type: &str,
2437    ) -> RuntimeResult<()> {
2438        tracing::info!(
2439            "🎬 Adding dynamic track: track_id={}, codec={}, type={}, target={}",
2440            track_id,
2441            codec,
2442            media_type,
2443            target.to_string_repr()
2444        );
2445
2446        // 1. Get existing peer state and extract needed parts
2447        let (webrtc_conn, peer_connection) = {
2448            let peers = self.peers.read().await;
2449            let state = peers.get(target).ok_or_else(|| {
2450                RuntimeError::Other(anyhow::anyhow!(
2451                    "No connection to target: {}",
2452                    target.to_string_repr()
2453                ))
2454            })?;
2455            (state.webrtc_conn.clone(), state.peer_connection.clone())
2456        };
2457
2458        // 2. Add track to existing PeerConnection
2459        webrtc_conn
2460            .add_media_track(track_id.clone(), codec, media_type)
2461            .await?;
2462
2463        tracing::info!("✅ Added track to PeerConnection: {}", track_id);
2464
2465        // 3. Trigger SDP renegotiation
2466        let root_span = tracing::info_span!("add_track", target_id = %target.to_string_repr());
2467        #[cfg(feature = "opentelemetry")]
2468        self.root_context_map
2469            .write()
2470            .await
2471            .insert(target.clone(), root_span.context());
2472
2473        self.renegotiate_connection(target, &peer_connection)
2474            .instrument(root_span)
2475            .await?;
2476
2477        tracing::info!("✅ Dynamic track added successfully: {}", track_id);
2478
2479        Ok(())
2480    }
2481
2482    /// Renegotiate SDP with existing peer
2483    ///
2484    /// Creates new Offer with updated track list and exchanges SDP.
2485    /// ICE connection remains active (no restart).
2486    async fn renegotiate_connection(
2487        &self,
2488        target: &actr_protocol::ActrId,
2489        peer_connection: &Arc<RTCPeerConnection>,
2490    ) -> RuntimeResult<()> {
2491        tracing::info!(
2492            "🔄 Starting SDP renegotiation with {}",
2493            target.to_string_repr()
2494        );
2495
2496        // 1. Create new Offer (includes all tracks: old + new)
2497        let offer = peer_connection.create_offer(None).await.map_err(|e| {
2498            RuntimeError::Other(anyhow::anyhow!("Failed to create renegotiation offer: {e}"))
2499        })?;
2500        let offer_sdp = offer.sdp.clone();
2501
2502        // 2. Set local description
2503        peer_connection
2504            .set_local_description(offer)
2505            .await
2506            .map_err(|e| {
2507                RuntimeError::Other(anyhow::anyhow!("Failed to set local description: {e}"))
2508            })?;
2509
2510        tracing::debug!(
2511            "📝 Created renegotiation Offer (SDP length: {})",
2512            offer_sdp.len()
2513        );
2514
2515        // 3. Send Offer via signaling server
2516        let session_desc = actr_protocol::SessionDescription {
2517            r#type: SdpType::Offer as i32,
2518            sdp: offer_sdp,
2519        };
2520        let payload = actr_relay::Payload::SessionDescription(session_desc);
2521        self.send_actr_relay(target, payload).await?;
2522
2523        tracing::info!("✅ Sent renegotiation Offer to {}", target.to_string_repr());
2524
2525        // 4. Answer will be handled by existing handle_answer() method
2526        // Note: We don't wait for Answer here to avoid blocking.
2527        // The renegotiation completes asynchronously when Answer arrives.
2528
2529        Ok(())
2530    }
2531
2532    /// Initiate ICE restart on an existing connection (offerer side).
2533    /// Uses atomic state management within peers lock for complete de-duplication.
2534    /// If ICE restart fails after all retries, attempts to establish a new connection.
2535    pub async fn restart_ice(
2536        self: &Arc<Self>,
2537        target: &actr_protocol::ActrId,
2538    ) -> RuntimeResult<()> {
2539        // All ICE restart state management is now unified within the peers lock
2540        // This provides atomic de-duplication and eliminates race conditions
2541        let (peer_connection, should_start_restart) = {
2542            let mut peers = self.peers.write().await;
2543            if let Some(state) = peers.get_mut(target) {
2544                // 1. Check if restart is already in-flight using restart_task_handle
2545                if let Some(ref handle) = state.restart_task_handle {
2546                    if !handle.is_finished() {
2547                        tracing::debug!(
2548                            "🚫 ICE restart already in-flight for serial={}, skipping (task not finished)",
2549                            target.serial_number
2550                        );
2551                        return Ok(());
2552                    }
2553                }
2554
2555                // 2. Also check ice_restart_inflight flag as a backup
2556                if state.ice_restart_inflight {
2557                    tracing::debug!(
2558                        "🚫 ICE restart already in-flight for serial={}, skipping (ice_restart_inflight=true)",
2559                        target.serial_number
2560                    );
2561                    return Ok(());
2562                }
2563
2564                // 3. Check if we are the offerer
2565                if !state.is_offerer {
2566                    tracing::warn!(
2567                        "🚫 Skip ICE restart to serial={}: we are not the offerer",
2568                        target.serial_number
2569                    );
2570                    return Ok(());
2571                }
2572
2573                // 4. Set flag to prevent concurrent restarts
2574                state.ice_restart_inflight = true;
2575                // Note: ice_restart_attempts will be managed by do_ice_restart_inner
2576                // Note: restart_task_handle will be set after spawning the task
2577
2578                (state.peer_connection.clone(), true)
2579            } else {
2580                tracing::warn!(
2581                    "🚫 Skip ICE restart to serial={}: peer not found",
2582                    target.serial_number
2583                );
2584                return Ok(());
2585            }
2586        };
2587
2588        if !should_start_restart {
2589            return Ok(());
2590        }
2591
2592        tracing::info!(
2593            "♻️ Initiating ICE restart to serial={}",
2594            target.serial_number
2595        );
2596
2597        // Spawn restart task
2598        let target_clone = target.clone();
2599        let peers = Arc::clone(&self.peers);
2600        let negotiator = self.negotiator.clone();
2601        let local_id = self.local_id.clone();
2602        let credential_state = self.credential_state.clone();
2603        let signaling_client = Arc::clone(&self.signaling_client);
2604        let coordinator_weak = Arc::downgrade(self);
2605
2606        let handle = tokio::spawn(async move {
2607            let restart_result = Self::do_ice_restart_inner(
2608                &target_clone,
2609                &peers,
2610                peer_connection,
2611                &negotiator,
2612                &local_id,
2613                credential_state,
2614                &signaling_client,
2615            )
2616            .await;
2617
2618            match restart_result {
2619                Ok(true) => {
2620                    tracing::info!(
2621                        "✅ ICE restart succeeded for serial={}",
2622                        target_clone.serial_number
2623                    );
2624                }
2625                Ok(false) => {
2626                    // ICE restart failed after all retries, clean up and try to establish new connection
2627                    tracing::warn!(
2628                        "⚠️ ICE restart exhausted for serial={}, cleaning up and attempting fresh connection",
2629                        target_clone.serial_number
2630                    );
2631
2632                    if let Some(coord) = coordinator_weak.upgrade() {
2633                        // First, clean up the old connection resources
2634                        tracing::info!(
2635                            "🧹 Cleaning up old connection after ICE restart failure for serial={}",
2636                            target_clone.serial_number
2637                        );
2638                        coord.cleanup_cancelled_connection(&target_clone).await;
2639                    }
2640                }
2641                Err(e) => {
2642                    tracing::error!(
2643                        "❌ ICE restart failed for serial={}: {}",
2644                        target_clone.serial_number,
2645                        e
2646                    );
2647
2648                    // Clean up resources on error
2649                    if let Some(coord) = coordinator_weak.upgrade() {
2650                        tracing::info!(
2651                            "🧹 Cleaning up connection after ICE restart error for serial={}",
2652                            target_clone.serial_number
2653                        );
2654                        coord.cleanup_cancelled_connection(&target_clone).await;
2655                    }
2656                }
2657            }
2658
2659            // Cleanup restart_task_handle registration - now in peers lock
2660            {
2661                let mut peers_guard = peers.write().await;
2662                if let Some(state) = peers_guard.get_mut(&target_clone) {
2663                    state.restart_task_handle = None;
2664                }
2665            }
2666        });
2667
2668        // Store the restart handle immediately in the same peers lock
2669        // This completes the atomic state transition
2670        {
2671            let mut peers = self.peers.write().await;
2672            if let Some(state) = peers.get_mut(target) {
2673                state.restart_task_handle = Some(handle);
2674            }
2675        }
2676
2677        Ok(())
2678    }
2679
    /// Internal ICE restart implementation with retries.
    ///
    /// Drives the offerer-side restart loop: for each backoff step it
    /// (1) waits for signaling to be connected, (2) waits for any in-progress
    /// ICE gathering to settle (aborting if gathering is stuck past
    /// `ICE_GATHERING_TIMEOUT`), (3) creates and sends an `IceRestartOffer`
    /// envelope, and (4) polls until the answer path clears
    /// `ice_restart_inflight`, or the per-attempt timeout fires.
    ///
    /// Returns Ok(true) if restart succeeded, Ok(false) if all retries
    /// exhausted (the peer is dropped in that case). An `Err` from offer
    /// creation propagates to the caller, whose error path cleans up the peer.
    async fn do_ice_restart_inner(
        target: &ActrId,
        peers: &Arc<RwLock<HashMap<ActrId, PeerState>>>,
        peer_connection: Arc<RTCPeerConnection>,
        negotiator: &WebRtcNegotiator,
        local_id: &ActrId,
        credential_state: CredentialState,
        signaling_client: &Arc<dyn SignalingClient>,
    ) -> RuntimeResult<bool> {
        // Use enhanced backoff with total duration limit
        let backoff = ExponentialBackoff::with_total_duration(
            Duration::from_millis(ICE_RESTART_INITIAL_BACKOFF_MS),
            Duration::from_millis(ICE_RESTART_MAX_BACKOFF_MS),
            Some(ICE_RESTART_MAX_RETRIES),
            ICE_RESTART_MAX_TOTAL_DURATION,
        );

        let mut restart_ok = false;
        // Tracks when the current ICE-gathering phase was first observed, to
        // detect a stuck gatherer across loop iterations.
        let mut gathering_started_at: Option<Instant> = None;

        // NOTE(review): guard failures below also consume a backoff iteration,
        // so waiting for signaling/gathering eats into the retry budget —
        // confirm that is the intended accounting.
        for delay in backoff {
            // ========== Guard 1: Check signaling state ==========
            if !signaling_client.is_connected() {
                tracing::debug!(
                    "🔄 Signaling not ready for ICE restart to serial={}, will retry after {:?}",
                    target.serial_number,
                    delay
                );
                tokio::time::sleep(delay).await;
                continue; // Skip this iteration, don't create offer
            }

            // ========== Guard 2: Check ICE gathering state (with timeout detection) ==========
            let gathering_state = peer_connection.ice_gathering_state();
            if gathering_state == RTCIceGatheringState::Gathering {
                let started = gathering_started_at.get_or_insert_with(Instant::now);
                let gathering_duration = started.elapsed();

                if gathering_duration > ICE_GATHERING_TIMEOUT {
                    tracing::error!(
                        "❌ ICE gathering stuck for {:?}, aborting ICE restart for serial={}",
                        gathering_duration,
                        target.serial_number
                    );
                    // Close peer connection to stop gathering
                    let _ = peer_connection.close().await;
                    return Ok(false);
                }

                tracing::debug!(
                    "⏳ ICE gathering in progress ({:?} elapsed), will retry after {:?}",
                    gathering_duration,
                    delay
                );
                tokio::time::sleep(delay).await;
                continue; // Skip this iteration, wait for gathering to complete
            } else {
                // Not gathering, reset timer
                gathering_started_at = None;
            }

            // ========== Both guards passed, safe to create offer ==========
            // NOTE(review): the peers write lock is held across the
            // create_ice_restart_offer await below; tokio's RwLock permits
            // this, but it blocks all other peer operations for the duration —
            // confirm that is acceptable. Also, an Err from the `?` leaves
            // ice_restart_inflight=true; the caller's Err branch drops the
            // whole peer, so the stale flag is presumably never reused —
            // verify against restart_ice's error handling.
            let (offer_sdp, attempt) = {
                let mut peers_guard = peers.write().await;
                let state = match peers_guard.get_mut(target) {
                    Some(s) => s,
                    None => {
                        tracing::warn!(
                            "🚫 Peer state not found during ICE restart for serial={}",
                            target.serial_number
                        );
                        return Ok(false);
                    }
                };

                if !state.is_offerer {
                    tracing::warn!(
                        "🚫 Skip ICE restart to serial={}: we are not the offerer",
                        target.serial_number
                    );
                    state.ice_restart_inflight = false;
                    state.ice_restart_attempts = 0;
                    return Ok(false);
                }

                // ice_restart_inflight is already set to true in restart_ice()
                // Just increment the attempt counter here
                state.ice_restart_attempts += 1;
                let attempt = state.ice_restart_attempts;

                let offer_sdp = negotiator
                    .create_ice_restart_offer(&peer_connection)
                    .await?;

                (offer_sdp, attempt)
            };

            // Send ICE restart offer (typed IceRestartOffer so the remote
            // side distinguishes it from a plain renegotiation Offer).
            let relay = ActrRelay {
                source: local_id.clone(),
                credential: credential_state.credential().await,
                target: target.clone(),
                payload: Some(actr_relay::Payload::SessionDescription(
                    actr_protocol::SessionDescription {
                        r#type: SdpType::IceRestartOffer as i32,
                        sdp: offer_sdp,
                    },
                )),
            };

            let envelope = SignalingEnvelope {
                envelope_version: 1,
                envelope_id: uuid::Uuid::new_v4().to_string(),
                reply_for: None,
                timestamp: prost_types::Timestamp {
                    seconds: chrono::Utc::now().timestamp(),
                    nanos: 0,
                },
                flow: Some(signaling_envelope::Flow::ActrRelay(relay)),
                traceparent: None,
                tracestate: None,
            };

            if let Err(e) = signaling_client.send_envelope(envelope).await {
                tracing::error!(
                    "❌ Failed to send ICE restart offer to serial={}: {}",
                    target.serial_number,
                    e
                );
                // Mark inflight as false and continue to next retry
                let mut peers_guard = peers.write().await;
                if let Some(state) = peers_guard.get_mut(target) {
                    state.ice_restart_inflight = false;
                }
                tokio::time::sleep(delay).await;
                continue;
            }

            tracing::info!(
                "♻️ ICE restart attempt {} sent to serial={}",
                attempt,
                target.serial_number
            );

            // Wait for restart completion: the answer-handling path clears
            // ice_restart_inflight, which the helper polls for.
            let success =
                Self::wait_for_restart_completion_static(peers, target, ICE_RESTART_TIMEOUT).await;

            if success {
                restart_ok = true;
                break;
            }

            tracing::warn!(
                "⚠️ ICE restart attempt {} timed out for serial={}",
                attempt,
                target.serial_number
            );

            // Mark current attempt ended so the next loop iteration may retry
            {
                let mut peers_guard = peers.write().await;
                if let Some(state) = peers_guard.get_mut(target) {
                    state.ice_restart_inflight = false;
                }
            }

            // Exponential backoff before retrying
            tracing::info!(
                "⏳ Waiting {:?} before next ICE restart attempt to serial={}",
                delay,
                target.serial_number
            );
            tokio::time::sleep(delay).await;
        }

        if !restart_ok {
            // Retry budget spent without success: the connection is considered
            // unrecoverable and is removed so a fresh one can be built.
            tracing::warn!(
                "⚠️ Backoff iterator exhausted for serial={}, stopping retries and dropping peer",
                target.serial_number
            );
            Self::drop_peer_connection_static(peers, target).await;
            return Ok(false);
        }

        Ok(true)
    }
2869
2870    /// Static version of wait_for_restart_completion for use in spawned task
2871    /// Uses read lock for checking status to avoid blocking other peers
2872    async fn wait_for_restart_completion_static(
2873        peers: &Arc<RwLock<HashMap<ActrId, PeerState>>>,
2874        target: &ActrId,
2875        timeout: Duration,
2876    ) -> bool {
2877        let mut interval = tokio::time::interval(Duration::from_millis(100));
2878        let timeout_sleep = tokio::time::sleep(timeout);
2879        tokio::pin!(timeout_sleep);
2880
2881        loop {
2882            tokio::select! {
2883                _ = &mut timeout_sleep => {
2884                    return false;
2885                }
2886                _ = interval.tick() => {
2887                    // Use read lock to check status (allows concurrent access)
2888                    let is_done = {
2889                        let peers_guard = peers.read().await;
2890                        match peers_guard.get(target) {
2891                            Some(state) => !state.ice_restart_inflight,
2892                            None => return false,
2893                        }
2894                    };
2895
2896                    if is_done {
2897                        // Only acquire write lock when actually need to reset counter
2898                        let mut peers_guard = peers.write().await;
2899                        if let Some(state) = peers_guard.get_mut(target) {
2900                            state.ice_restart_attempts = 0;
2901                        }
2902                        return true;
2903                    }
2904                }
2905            }
2906        }
2907    }
2908
2909    /// Static version of drop_peer_connection for use in spawned task
2910    ///
2911    /// NOTE: Releases the write lock BEFORE calling close() to avoid blocking
2912    /// other operations on `peers` during potentially slow close operations.
2913    async fn drop_peer_connection_static(
2914        peers: &Arc<RwLock<HashMap<ActrId, PeerState>>>,
2915        target: &ActrId,
2916    ) {
2917        // Remove peer from map first, then close outside the lock
2918        let state_to_close = {
2919            let mut peers_guard = peers.write().await;
2920            peers_guard.remove(target)
2921        }; // Lock released here
2922
2923        if let Some(state) = state_to_close {
2924            if let Err(e) = state.peer_connection.close().await {
2925                tracing::warn!(
2926                    "⚠️ Failed to close peer_connection for {}: {}",
2927                    target.serial_number,
2928                    e
2929                );
2930            }
2931            if let Err(e) = state.webrtc_conn.close().await {
2932                tracing::warn!(
2933                    "⚠️ Failed to close WebRtcConnection for {}: {}",
2934                    target.serial_number,
2935                    e
2936                );
2937            }
2938            tracing::info!("🧹 Dropped peer connection for {}", target.serial_number);
2939        } else {
2940            tracing::warn!(
2941                "⚠️ drop_peer_connection: peer not found {}",
2942                target.serial_number
2943            );
2944        }
2945    }
2946
2947    /// Handle renegotiation Offer (existing connection)
2948    ///
2949    /// Called when receiving an Offer on an already-established connection.
2950    /// This happens when the remote peer adds/removes tracks dynamically.
2951    #[allow(dead_code)]
2952    async fn handle_renegotiation_offer(
2953        &self,
2954        from: &ActrId,
2955        offer_sdp: String,
2956    ) -> RuntimeResult<()> {
2957        tracing::info!(
2958            "🔄 Processing renegotiation Offer from {}",
2959            from.to_string_repr()
2960        );
2961
2962        // 1. Get existing peer connection
2963        let peer_connection = {
2964            let peers = self.peers.read().await;
2965            let state = peers.get(from).ok_or_else(|| {
2966                RuntimeError::Other(anyhow::anyhow!("Peer state not found for renegotiation"))
2967            })?;
2968            state.peer_connection.clone()
2969        };
2970
2971        // 2. Set remote description (new Offer)
2972        let offer =
2973            webrtc::peer_connection::sdp::session_description::RTCSessionDescription::offer(
2974                offer_sdp,
2975            )
2976            .map_err(|e| {
2977                RuntimeError::Other(anyhow::anyhow!("Failed to parse renegotiation offer: {e}"))
2978            })?;
2979        peer_connection
2980            .set_remote_description(offer)
2981            .await
2982            .map_err(|e| {
2983                RuntimeError::Other(anyhow::anyhow!("Failed to set remote description: {e}"))
2984            })?;
2985
2986        tracing::debug!("✅ Set remote description (renegotiation Offer)");
2987
2988        // 3. Create Answer
2989        let answer = peer_connection.create_answer(None).await.map_err(|e| {
2990            RuntimeError::Other(anyhow::anyhow!(
2991                "Failed to create renegotiation answer: {e}"
2992            ))
2993        })?;
2994        let answer_sdp = answer.sdp.clone();
2995
2996        // 4. Set local description
2997        peer_connection
2998            .set_local_description(answer)
2999            .await
3000            .map_err(|e| {
3001                RuntimeError::Other(anyhow::anyhow!("Failed to set local description: {e}"))
3002            })?;
3003
3004        tracing::debug!(
3005            "✅ Created renegotiation Answer (SDP length: {})",
3006            answer_sdp.len()
3007        );
3008
3009        // 5. Send Answer via signaling server
3010        let session_desc = actr_protocol::SessionDescription {
3011            r#type: SdpType::Answer as i32,
3012            sdp: answer_sdp,
3013        };
3014        let payload = actr_relay::Payload::SessionDescription(session_desc);
3015        self.send_actr_relay(from, payload).await?;
3016
3017        tracing::info!("✅ Sent renegotiation Answer to {}", from.to_string_repr());
3018
3019        // Note: on_track callback will automatically trigger for new remote tracks
3020        // No need to manually handle track additions here
3021
3022        Ok(())
3023    }
3024
3025    /// Handle ICE restart Offer on an existing connection.
3026    /// Only the answerer should accept restart; offerer-side restarts are initiated locally.
3027    async fn handle_ice_restart_offer(
3028        self: &Arc<Self>,
3029        from: &ActrId,
3030        offer_sdp: String,
3031    ) -> RuntimeResult<()> {
3032        // Locate peer state and ensure we are not the offerer
3033        let (peer_connection, is_offerer) = {
3034            let peers = self.peers.read().await;
3035            let state = peers.get(from).ok_or_else(|| {
3036                RuntimeError::Other(anyhow::anyhow!(
3037                    "ICE restart offer received for unknown peer"
3038                ))
3039            })?;
3040            (state.peer_connection.clone(), state.is_offerer)
3041        };
3042
3043        if is_offerer {
3044            tracing::warn!(
3045                "🚫 Ignoring ICE restart offer from {:?}: we are current offerer",
3046                from.serial_number
3047            );
3048            return Ok(());
3049        }
3050
3051        // Apply remote restart offer and generate answer
3052        let answer_sdp = self
3053            .negotiator
3054            .create_answer(&peer_connection, offer_sdp)
3055            .await?;
3056
3057        // Send restart answer back
3058        let session_desc = actr_protocol::SessionDescription {
3059            r#type: SdpType::Answer as i32,
3060            sdp: answer_sdp,
3061        };
3062        let payload = actr_relay::Payload::SessionDescription(session_desc);
3063        self.send_actr_relay(from, payload).await?;
3064
3065        // Flush any buffered ICE candidates collected before remote description was set
3066        self.flush_pending_candidates(from, &peer_connection)
3067            .await?;
3068
3069        tracing::info!(
3070            "✅ Completed ICE restart answer to serial={}",
3071            from.serial_number
3072        );
3073
3074        Ok(())
3075    }
3076
    // NOTE: peer removal and cached-state cleanup are handled by
    // drop_peer_connection_static above. (Converted from a stray `///` doc
    // comment that would otherwise attach to handle_role_assignment's rustdoc.)
3078
    /// Handle role assignment result
    ///
    /// Reacts to a `RoleAssignment` received over signaling for `peer`:
    /// 1. If assigned offerer over an existing connection, schedules cleanup so a
    ///    fresh connection can be established.
    /// 2. Wakes any `negotiate_role` call currently awaiting this assignment.
    /// 3. Otherwise (no pending negotiation, no connection), acts on the role
    ///    immediately: starts an offer connection as offerer, or records a
    ///    "waiting for offer" marker (with a timeout fallback) as answerer.
    #[cfg_attr(
        feature = "opentelemetry",
        tracing::instrument(skip_all, fields(peer_id = ?peer.to_string_repr()))
    )]
    async fn handle_role_assignment(self: &Arc<Self>, assign: RoleAssignment, peer: ActrId) {
        tracing::debug!(?assign, ?peer, "handle_role_assignment");

        // ========== Check for role change to offerer and clean up if needed ==========
        // Only clean up when becoming offerer (we need to initiate a new connection)
        // If becoming answerer, we just wait for the peer's offer
        if assign.is_offerer {
            let has_connection = self.peers.read().await.contains_key(&peer);

            // Clean up if we have an existing connection (reconnection scenario)
            if has_connection {
                tracing::info!(
                    "🔄 Assigned as offerer for {} (has_connection={}), cleaning up for new connection",
                    peer.serial_number,
                    has_connection
                );

                // Spawn cleanup in background to avoid blocking signaling loop
                // cleanup_cancelled_connection may call close() which sends events,
                // and those events may trigger transport cleanup that could block
                let this = Arc::clone(self);
                let peer_clone = peer.clone();
                tokio::spawn(async move {
                    this.cleanup_cancelled_connection(&peer_clone).await;
                });
            }
        }
        // ========== End role change check ==========

        // First, try to wake a negotiation that is currently waiting for this
        // role assignment (a pending negotiate_role call for this peer).
        let role_sender = {
            let mut neg = self.peer_negotiation.lock().await;
            neg.get_mut(&peer).and_then(|s| s.role_tx.take())
        };
        if let Some(sender) = role_sender {
            // If the waiter is still alive, it takes over from here.
            if sender.send(assign.is_offerer).is_ok() {
                return;
            }
        }

        tracing::debug!(
            ?assign,
            ?peer,
            "handle_role_assignment: no pending negotiation"
        );
        // If no connection exists yet, act on the assigned role right away
        // instead of relying on a later send_message call to trigger setup.
        let has_connection = self.peers.read().await.contains_key(&peer);
        if has_connection {
            return;
        }
        if assign.is_offerer {
            tracing::info!(
                "🎭 Acting as offerer to {} per assignment (no pending negotiation)",
                peer.serial_number
            );
            // Spawn the offer connection in background to avoid blocking signaling loop
            let this = Arc::clone(self);
            let peer_clone = peer.clone();
            #[cfg(feature = "opentelemetry")]
            let current_span = tracing::Span::current();
            tokio::spawn(async move {
                let start_offer_fut = this.start_offer_connection(&peer_clone, true);
                #[cfg(feature = "opentelemetry")]
                let start_offer_fut = start_offer_fut.instrument(current_span);
                match start_offer_fut.await {
                    Ok(ready_rx) => {
                        // Store the readiness receiver so later callers can await
                        // the connection becoming usable.
                        this.peer_negotiation
                            .lock()
                            .await
                            .entry(peer_clone.clone())
                            .or_default()
                            .ready_rx = Some(ready_rx);
                    }
                    Err(e) => {
                        tracing::warn!(
                            "⚠️ Failed to start proactive offer connection to {}: {}",
                            peer_clone.serial_number,
                            e
                        );
                    }
                }
            });
        } else {
            tracing::debug!(
                "🎭 Assignment marks us as answerer for {}, waiting for offer (no pending negotiation)",
                peer.serial_number
            );
            // NOTE(review): the receiver is dropped immediately, so any later
            // send on this ready_tx will fail; the stored sender appears to act
            // only as a "waiting for offer" marker that the timeout task below
            // consumes via take(). Confirm this is intentional.
            let (tx, _rx) = oneshot::channel();
            self.peer_negotiation
                .lock()
                .await
                .entry(peer.clone())
                .or_default()
                .ready_tx = Some(tx);

            // Guard against waiting indefinitely for an offer: after a timeout,
            // proactively take the offerer role and establish the link ourselves.
            let weak = Arc::downgrade(self);
            let peer_clone = peer.clone();
            #[cfg(feature = "opentelemetry")]
            let current_span = tracing::Span::current();
            tokio::spawn(async move {
                tokio::time::sleep(ROLE_WAIT_TIMEOUT).await;
                if let Some(coord) = weak.upgrade() {
                    // Bail out if a connection already exists or the "waiting
                    // for offer" marker was consumed in the meantime.
                    if coord.peers.read().await.contains_key(&peer_clone) {
                        return;
                    }
                    let pending = {
                        let mut neg = coord.peer_negotiation.lock().await;
                        neg.get_mut(&peer_clone).and_then(|s| s.ready_tx.take())
                    };
                    if pending.is_none() {
                        return;
                    }
                    tracing::warn!(
                        "⏳ Waiting for offer from {} timed out, force acting as offerer",
                        peer_clone.serial_number
                    );
                    let start_offer_fut = coord.start_offer_connection(&peer_clone, true);
                    #[cfg(feature = "opentelemetry")]
                    let start_offer_fut = start_offer_fut.instrument(current_span);
                    match start_offer_fut.await {
                        Ok(ready_rx) => {
                            coord
                                .peer_negotiation
                                .lock()
                                .await
                                .entry(peer_clone.clone())
                                .or_default()
                                .ready_rx = Some(ready_rx);
                        }
                        Err(e) => {
                            tracing::warn!(
                                "⚠️ Failed to start offer connection after timeout to {}: {}",
                                peer_clone.serial_number,
                                e
                            );
                        }
                    }
                }
            });
        }
    }
3227
3228    /// Initiate role negotiation and await assignment
3229    #[cfg_attr(
3230        feature = "opentelemetry",
3231        tracing::instrument(skip_all, fields(target_id = ?target.to_string_repr()))
3232    )]
3233    async fn negotiate_role(&self, target: &ActrId) -> RuntimeResult<bool> {
3234        let (tx, rx) = oneshot::channel();
3235        // 按目标 ActorId 记录等待的角色分配
3236        self.peer_negotiation
3237            .lock()
3238            .await
3239            .entry(target.clone())
3240            .or_default()
3241            .role_tx = Some(tx);
3242
3243        let payload = actr_relay::Payload::RoleNegotiation(RoleNegotiation {
3244            from: self.local_id.clone(),
3245            to: target.clone(),
3246            realm_id: self.local_id.realm.realm_id,
3247        });
3248
3249        tracing::debug!(
3250            "🔄 Sending role negotiation to serial={}",
3251            target.serial_number
3252        );
3253        self.send_actr_relay(target, payload).await?;
3254
3255        rx.await.map_err(|_| {
3256            RuntimeError::Other(anyhow::anyhow!(
3257                "Role negotiation channel closed before assignment"
3258            ))
3259        })
3260    }
3261
    /// Install a state change handler to auto-trigger ICE restart on disconnection (offerer only).
    ///
    /// The handler runs the base `WebRtcConnection` state handling, records the
    /// latest state for health checks, and calls `restart_ice` when the
    /// connection reaches `Disconnected` or `Failed`.
    fn install_restart_handler(
        self: &Arc<Self>,
        webrtc_conn: WebRtcConnection,
        peer_connection: Arc<RTCPeerConnection>,
        target: ActrId,
    ) {
        // Weak reference so the installed callback does not keep the
        // coordinator alive; each invocation upgrades it and no-ops if the
        // coordinator is already gone.
        let coord = Arc::downgrade(self);
        peer_connection.on_peer_connection_state_change(Box::new(
            move |state: RTCPeerConnectionState| {
                // Per-invocation clones: the callback can fire many times and
                // each async block needs its own owned copies.
                let coord = coord.clone();
                let target = target.clone();
                let webrtc_conn = webrtc_conn.clone();
                Box::pin(async move {
                    // First run the base WebRtcConnection cleanup.
                    webrtc_conn.handle_state_change(state).await;

                    tracing::info!(
                        "📡 PeerConnection state for {} -> {:?}",
                        target.serial_number,
                        state
                    );

                    // Update state tracking for health check
                    if let Some(c) = coord.upgrade() {
                        let mut peers = c.peers.write().await;
                        if let Some(peer_state) = peers.get_mut(&target) {
                            peer_state.current_state = state;
                            peer_state.last_state_change = std::time::Instant::now();
                        }
                        drop(peers); // Release lock before potentially long-running operations
                    }

                    // Auto-recover: attempt an ICE restart on loss of connectivity.
                    if matches!(
                        state,
                        RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Failed
                    ) {
                        if let Some(c) = coord.upgrade() {
                            if let Err(e) = c.restart_ice(&target).await {
                                tracing::warn!(
                                    "⚠️ Failed to auto restart ICE to {}: {}",
                                    target.serial_number,
                                    e
                                );
                            }
                        }
                    }
                })
            },
        ));
    }
3313}