/// Reports whether an ICE candidate line is free of IPv6 markers.
///
/// The candidate is rejected (`false`) as soon as its text contains a
/// link-local prefix (`fe80::`), the ` udp6 ` transport token, or any `::`
/// address compression; every other string, including the empty one, is
/// accepted (`true`).
#[allow(dead_code)]
fn is_ipv4_candidate_allowed(cand: &str) -> bool {
    // Substrings whose presence marks the candidate as IPv6.
    const IPV6_MARKERS: [&str; 3] = ["fe80::", " udp6 ", "::"];

    !IPV6_MARKERS.iter().any(|marker| cand.contains(*marker))
}
15
16use super::connection::WebRtcConnection;
24use super::negotiator::WebRtcNegotiator;
25#[cfg(feature = "opentelemetry")]
26use super::trace;
27use super::{SignalingClient, WebRtcConfig};
28use crate::INITIAL_CONNECTION_TIMEOUT;
29use crate::error::{RuntimeError, RuntimeResult};
30use crate::inbound::MediaFrameRegistry;
31use crate::lifecycle::CredentialState;
32use crate::transport::connection_event::{ConnectionEvent, ConnectionEventBroadcaster};
33use actr_framework::Bytes;
34use actr_protocol::ActrIdExt;
35use actr_protocol::prost::Message as ProstMessage;
36use actr_protocol::{
37 ActrId, ActrRelay, PayloadType, RoleAssignment, RoleNegotiation, SignalingEnvelope, actr_relay,
38 session_description::Type as SdpType, signaling_envelope,
39};
40use std::collections::HashMap;
41use std::{
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
46use tokio::task::JoinHandle;
47use tokio_util::sync::CancellationToken;
48use tracing::Instrument;
49#[cfg(feature = "opentelemetry")]
50use tracing_opentelemetry::OpenTelemetrySpanExt;
51use webrtc::data_channel::RTCDataChannel;
52use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
53use webrtc::ice_transport::ice_gathering_state::RTCIceGatheringState;
54use webrtc::peer_connection::{RTCPeerConnection, peer_connection_state::RTCPeerConnectionState};
55use webrtc::track::track_local::TrackLocalWriter;
56
// NOTE(review): the ICE-restart, ICE-gathering and role-wait constants below
// are consumed outside the code visible here; the descriptions are derived
// from their names and values only — confirm against the use sites.

/// Maximum number of ICE restart attempts.
const ICE_RESTART_MAX_RETRIES: u32 = 5;
/// Time limit of 5 s related to an ICE restart (presumably per attempt).
const ICE_RESTART_TIMEOUT: Duration = Duration::from_secs(5);
/// First ICE restart backoff delay, in milliseconds (5 s).
const ICE_RESTART_INITIAL_BACKOFF_MS: u64 = 5000;
/// Largest ICE restart backoff delay, in milliseconds (60 s).
const ICE_RESTART_MAX_BACKOFF_MS: u64 = 60000;
/// Total time budget of 60 s for ICE restart retries.
const ICE_RESTART_MAX_TOTAL_DURATION: Duration = Duration::from_secs(60);
/// Time limit of 10 s for ICE candidate gathering.
const ICE_GATHERING_TIMEOUT: Duration = Duration::from_secs(10);
/// Time limit of 10 s for waiting on a role decision.
const ROLE_WAIT_TIMEOUT: Duration = Duration::from_secs(10);

/// Period of the background health-check task's interval timer.
const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(10);
/// How long a peer connection may stay `Failed` or `Closed` (measured from its
/// last recorded state change) before the health check removes it.
const MAX_FAILED_DURATION: Duration = Duration::from_secs(60);

/// Per-peer channels used while a connection is being negotiated.
///
/// All fields default to `None`.
#[derive(Default)]
struct PeerNegotiationState {
    /// Sender carrying a `bool`; presumably the "is offerer" decision of the
    /// role negotiation — the producer/consumer are outside this view.
    role_tx: Option<oneshot::Sender<bool>>,
    /// Fired to tell a waiting caller that the connection is ready. It is
    /// stored when this side ends up as the answerer.
    ready_tx: Option<oneshot::Sender<()>>,
    /// Receiving half for a readiness notification (usage is outside this view).
    ready_rx: Option<oneshot::Receiver<()>>,
}
80
/// Iterator that yields retry delays which double on every step.
///
/// Iteration ends (`None`) once the optional retry limit has been used up or
/// the optional total time budget has elapsed.
#[derive(Debug)]
struct ExponentialBackoff {
    /// Number of delays requested so far (only counted when `max_retries` is set).
    current_retries: u32,
    /// Maximum number of delays to hand out; `None` means unlimited.
    max_retries: Option<u32>,
    /// The delay that the next call to `next` will return. It starts at the
    /// configured initial delay and is doubled after every yielded value.
    initial_delay: Duration,
    /// Upper bound applied to the delay after each doubling.
    max_delay: Duration,
    /// Optional wall-clock budget for the whole retry sequence.
    max_total_duration: Option<Duration>,
    /// Moment from which `max_total_duration` is measured.
    start_time: Option<Instant>,
}

impl ExponentialBackoff {
    /// Creates a backoff without a total time budget.
    ///
    /// * `initial_delay` - first delay returned by the iterator.
    /// * `max_delay` - cap applied to every doubled delay.
    /// * `max_retries` - number of delays to yield, or `None` for no limit.
    pub fn new(initial_delay: Duration, max_delay: Duration, max_retries: Option<u32>) -> Self {
        Self {
            current_retries: 0,
            max_retries,
            initial_delay,
            max_delay,
            max_total_duration: None,
            start_time: None,
        }
    }

    /// Creates a backoff that additionally stops once `max_total_duration`
    /// has elapsed. The clock starts when this constructor is called.
    pub fn with_total_duration(
        initial_delay: Duration,
        max_delay: Duration,
        max_retries: Option<u32>,
        max_total_duration: Duration,
    ) -> Self {
        Self {
            current_retries: 0,
            max_retries,
            initial_delay,
            max_delay,
            max_total_duration: Some(max_total_duration),
            start_time: Some(Instant::now()),
        }
    }

    /// Returns `true` when a time budget is configured and more time than
    /// that budget has passed since `start_time`.
    fn is_duration_exceeded(&self) -> bool {
        match (self.max_total_duration, self.start_time) {
            (Some(max_duration), Some(start)) => start.elapsed() > max_duration,
            _ => false,
        }
    }
}

impl Iterator for ExponentialBackoff {
    type Item = Duration;

    /// Returns the next delay, or `None` when the time budget or the retry
    /// limit is exhausted.
    fn next(&mut self) -> Option<Duration> {
        // Start the clock lazily if a budget exists but no start was recorded.
        if self.max_total_duration.is_some() && self.start_time.is_none() {
            self.start_time = Some(Instant::now());
        }

        if self.is_duration_exceeded() {
            return None;
        }

        let delay = self.initial_delay;

        if let Some(max_retries) = self.max_retries {
            // Saturate so that polling an exhausted iterator over and over
            // can never overflow the counter.
            self.current_retries = self.current_retries.saturating_add(1);
            if self.current_retries > max_retries {
                return None;
            }
        }

        // `Duration * 2` panics on overflow before the `min` below could clamp
        // it; saturate first so very large delays are simply capped.
        self.initial_delay = self.initial_delay.saturating_mul(2).min(self.max_delay);
        Some(delay)
    }
}
161
/// Shared, mutex-guarded receiving end of the coordinator's unbounded inbound
/// message channel. Each item is a `(Vec<u8>, Bytes, PayloadType)` tuple;
/// NOTE(review): the first two elements are presumably sender identity and
/// message body — confirm against the producer.
type MessageRx = Arc<Mutex<mpsc::UnboundedReceiver<(Vec<u8>, Bytes, PayloadType)>>>;
164
/// Everything the coordinator tracks for one remote peer.
struct PeerState {
    /// Underlying WebRTC peer connection; closed when the peer is cleaned up.
    peer_connection: Arc<RTCPeerConnection>,

    /// Connection wrapper built around `peer_connection`.
    webrtc_conn: WebRtcConnection,

    /// Readiness notifier. It is `Some` when the entry is created on the
    /// offerer side and `None` when created while answering an offer.
    ready_tx: Option<oneshot::Sender<()>>,

    /// `true` when the local side created the offer for this peer.
    is_offerer: bool,

    /// Set while an ICE restart is running for this peer.
    ice_restart_inflight: bool,

    /// Count of ICE restart attempts; reset to 0 when pending restarts are cleared.
    ice_restart_attempts: u32,

    /// Handle of the task performing the ICE restart, kept so it can be aborted.
    restart_task_handle: Option<JoinHandle<()>>,

    /// When `current_state` was last updated; the health check measures
    /// staleness from this instant.
    last_state_change: std::time::Instant,

    /// Last peer-connection state recorded by the state-change callback.
    current_state: RTCPeerConnectionState,
}
194
/// Coordinates WebRTC peer connections: it consumes signaling envelopes,
/// drives offer/answer negotiation and keeps per-peer state.
pub struct WebRtcCoordinator {
    /// Identity of the local actor; used as `source` of outgoing relays.
    local_id: ActrId,

    /// Provides the credential attached to every outgoing relay.
    credential_state: CredentialState,

    /// Client used to send and receive signaling envelopes.
    signaling_client: Arc<dyn SignalingClient>,

    /// Creates peer connections and SDP offers.
    negotiator: WebRtcNegotiator,

    /// State of every known peer, keyed by peer id.
    peers: Arc<RwLock<HashMap<ActrId, PeerState>>>,

    /// ICE candidate strings kept per peer; presumably buffered until they
    /// can be applied (the consumer is outside this view).
    pending_candidates: Arc<RwLock<HashMap<ActrId, Vec<String>>>>,

    /// Receiving end of the inbound message channel.
    message_rx: MessageRx,
    /// Sending end of the inbound message channel; clones are handed to
    /// received data channels.
    message_tx: mpsc::UnboundedSender<(Vec<u8>, Bytes, PayloadType)>,

    /// Registry that RTP payloads read from remote tracks are dispatched to.
    media_frame_registry: Arc<MediaFrameRegistry>,

    /// Negotiation channels per peer (role decision / readiness).
    peer_negotiation: Arc<Mutex<HashMap<ActrId, PeerNegotiationState>>>,

    /// Broadcasts connection events to subscribers, including the
    /// coordinator's own internal listener.
    event_broadcaster: ConnectionEventBroadcaster,

    /// Trace context extracted from the latest envelope of each peer; used as
    /// parent for spans created when sending ICE candidates.
    #[cfg(feature = "opentelemetry")]
    root_context_map: Arc<RwLock<HashMap<ActrId, opentelemetry::Context>>>,
}
236
237impl WebRtcCoordinator {
238 pub fn new(
240 local_id: ActrId,
241 credential_state: CredentialState,
242 signaling_client: Arc<dyn SignalingClient>,
243 webrtc_config: WebRtcConfig,
244 realm_id: u32,
245 media_frame_registry: Arc<MediaFrameRegistry>,
246 ) -> Self {
247 let (message_tx, message_rx) = mpsc::unbounded_channel();
248 let negotiator = WebRtcNegotiator::new(webrtc_config, realm_id, credential_state.clone());
249
250 Self {
251 local_id,
252 credential_state,
253 signaling_client,
254 negotiator,
255 peers: Arc::new(RwLock::new(HashMap::new())),
256 pending_candidates: Arc::new(RwLock::new(HashMap::new())),
257 message_rx: Arc::new(Mutex::new(message_rx)),
258 message_tx,
259 media_frame_registry,
260 peer_negotiation: Arc::new(Mutex::new(HashMap::new())),
261 event_broadcaster: ConnectionEventBroadcaster::new(),
262 #[cfg(feature = "opentelemetry")]
263 root_context_map: Arc::new(RwLock::new(HashMap::new())),
264 }
265 }
266
/// Returns a new broadcast receiver for connection events emitted through
/// this coordinator's event broadcaster.
pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<ConnectionEvent> {
    self.event_broadcaster.subscribe()
}
271
/// Returns the broadcast sender of this coordinator's event broadcaster, so
/// other components can publish connection events on the same channel.
pub fn event_sender(&self) -> tokio::sync::broadcast::Sender<ConnectionEvent> {
    self.event_broadcaster.sender()
}
276
277 pub async fn retry_failed_connections(self: &Arc<Self>) {
279 let peers = self.peers.read().await;
280 let mut targets = Vec::new();
282
283 for (peer_id, state) in peers.iter() {
284 match state.current_state {
285 RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected => {
286 if !state.ice_restart_inflight {
287 targets.push(peer_id.clone());
288 }
289 }
290 _ => {
291 #[cfg(test)]
292 tracing::debug!(
293 "Actor {:?} is in state {:?}, test restart",
294 peer_id,
295 state.current_state
296 );
297 targets.push(peer_id.clone());
298 }
299 }
300 }
301 drop(peers); for peer_id in targets {
304 tracing::info!("♻️ Auto-retrying failed connection to actor {:?}", peer_id);
305 if let Err(e) = self.restart_ice(&peer_id).await {
306 tracing::error!("❌ Failed to restart ICE for {:?}: {}", peer_id, e);
307 }
308 }
309 }
310
311 pub async fn clear_pending_restarts(&self) {
313 let mut peers = self.peers.write().await;
314 for (peer_id, state) in peers.iter_mut() {
315 if state.ice_restart_inflight {
316 tracing::info!("🛑 Aborting in-flight ICE restart for {:?}", peer_id);
317 if let Some(handle) = state.restart_task_handle.take() {
319 handle.abort();
320 }
321 state.ice_restart_inflight = false;
322 state.ice_restart_attempts = 0;
325 }
326 }
327 }
328
/// Spawns a task that listens to the coordinator's own connection events and
/// cleans up peer state when a connection goes away.
///
/// Cleanup is triggered for a peer that is still present in `peers` when one
/// of these events arrives: `DataChannelClosed`, `ConnectionClosed`, or
/// `StateChanged` with state `Closed`. All other events are ignored.
///
/// The task stops when the coordinator has been dropped (detected on the next
/// received event) or when the broadcast channel is closed. Lagging behind
/// the channel is only logged.
///
/// Returns the handle of the spawned task.
fn spawn_internal_event_listener(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
    let mut event_rx = self.event_broadcaster.subscribe();
    // Only a weak reference is moved into the task, so the task by itself
    // does not keep the coordinator alive.
    let coordinator = Arc::downgrade(self);

    tokio::spawn(async move {
        loop {
            match event_rx.recv().await {
                Ok(event) => {
                    if let Some(coord) = coordinator.upgrade() {
                        // Decide whether this event requires cleaning up a peer.
                        let peer_id_to_cleanup = match &event {
                            ConnectionEvent::DataChannelClosed {
                                peer_id,
                                payload_type,
                            } => {
                                if coord.peers.read().await.contains_key(peer_id) {
                                    tracing::warn!(
                                        "⚠️ DataChannel closed for peer {}, payload_type={:?}; triggering coordinator cleanup",
                                        peer_id.serial_number,
                                        payload_type
                                    );
                                    Some(peer_id.clone())
                                } else {
                                    tracing::debug!(
                                        "ℹ️ DataChannel closed for peer {} but already cleaned up",
                                        peer_id.serial_number
                                    );
                                    None
                                }
                            }
                            ConnectionEvent::ConnectionClosed { peer_id } => {
                                if coord.peers.read().await.contains_key(peer_id) {
                                    tracing::warn!(
                                        "⚠️ Connection closed for peer {}; triggering coordinator cleanup",
                                        peer_id.serial_number
                                    );
                                    Some(peer_id.clone())
                                } else {
                                    tracing::debug!(
                                        "ℹ️ Connection closed for peer {} but already cleaned up",
                                        peer_id.serial_number
                                    );
                                    None
                                }
                            }
                            ConnectionEvent::StateChanged { peer_id, state } => {
                                use crate::transport::connection_event::ConnectionState;
                                // Only the terminal `Closed` state leads to cleanup.
                                if matches!(state, ConnectionState::Closed) {
                                    if coord.peers.read().await.contains_key(peer_id) {
                                        tracing::warn!(
                                            "⚠️ PeerConnection state changed to Closed for peer {}; triggering coordinator cleanup",
                                            peer_id.serial_number
                                        );
                                        Some(peer_id.clone())
                                    } else {
                                        tracing::debug!(
                                            "ℹ️ PeerConnection Closed for peer {} but already cleaned up",
                                            peer_id.serial_number
                                        );
                                        None
                                    }
                                } else {
                                    None
                                }
                            }
                            _ => None,
                        };

                        // The read guards taken above are temporaries and are
                        // already released here, so cleanup can take the
                        // write lock.
                        if let Some(peer_id) = peer_id_to_cleanup {
                            coord.cleanup_cancelled_connection(&peer_id).await;
                        }
                    } else {
                        tracing::debug!(
                            "🔌 WebRtcCoordinator internal event listener stopping (coordinator dropped)"
                        );
                        break;
                    }
                }
                // Missed events are not replayed; the listener keeps going.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    tracing::warn!(
                        "⚠️ WebRtcCoordinator internal event listener lagged by {} events",
                        n
                    );
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    tracing::debug!(
                        "🔌 WebRtcCoordinator internal event listener stopped (channel closed)"
                    );
                    break;
                }
            }
        }
    })
}
430
431 fn spawn_health_check_task(self: &Arc<Self>) -> JoinHandle<()> {
439 let coordinator = Arc::downgrade(self);
440
441 tokio::spawn(async move {
442 let mut interval = tokio::time::interval(HEALTH_CHECK_INTERVAL);
443 interval.tick().await; loop {
446 interval.tick().await;
447
448 if let Some(coord) = coordinator.upgrade() {
449 coord.check_and_cleanup_stale_connections().await;
450 } else {
451 tracing::debug!("🔌 Health check task stopping (coordinator dropped)");
452 break;
453 }
454 }
455
456 tracing::info!("🛑 Health check task exited");
457 })
458 }
459
460 async fn check_and_cleanup_stale_connections(&self) {
468 let peers_to_cleanup: Vec<(ActrId, String)> = {
469 let peers = self.peers.read().await;
470 let now = std::time::Instant::now();
471
472 peers
473 .iter()
474 .filter_map(|(peer_id, state)| {
475 let current_state = state.peer_connection.connection_state();
477 let duration_since_change = now.duration_since(state.last_state_change);
478
479 if matches!(
482 current_state,
483 RTCPeerConnectionState::Failed | RTCPeerConnectionState::Closed
484 ) && duration_since_change > MAX_FAILED_DURATION
485 {
486 let reason = format!(
487 "{:?} for {}s",
488 current_state,
489 duration_since_change.as_secs()
490 );
491
492 tracing::warn!(
493 "🧹 Marking peer {} for cleanup: {}",
494 peer_id.serial_number,
495 reason
496 );
497
498 Some((peer_id.clone(), reason))
499 } else {
500 None
501 }
502 })
503 .collect()
504 };
505
506 if !peers_to_cleanup.is_empty() {
508 tracing::info!(
509 "🧹 Health check: cleaning up {} stale connection(s)",
510 peers_to_cleanup.len()
511 );
512
513 for (peer_id, reason) in peers_to_cleanup {
514 tracing::info!(
515 "🧹 Cleaning up stale connection for peer {}: {}",
516 peer_id.serial_number,
517 reason
518 );
519 self.cleanup_cancelled_connection(&peer_id).await;
520 }
521 }
522 }
523
/// Starts the coordinator's background work and returns immediately.
///
/// Three tasks are spawned: the internal event listener, the health check,
/// and the signaling loop. Their join handles are dropped here, so the tasks
/// run detached. The signaling loop keeps a strong reference to the
/// coordinator and repeatedly awaits the next envelope from the signaling
/// client:
///
/// * `Ok(Some(envelope))` - the envelope is handled (inside a tracing span
///   parented to the remote trace context when the `opentelemetry` feature
///   is enabled);
/// * `Ok(None)` - the signaling connection is closed and the loop exits;
/// * `Err(e)` - the error is logged and the loop continues.
///
/// This function itself always returns `Ok(())`.
pub async fn start(self: Arc<Self>) -> RuntimeResult<()> {
    tracing::info!("🚀 WebRtcCoordinator starting signaling loop");

    self.spawn_internal_event_listener();

    self.spawn_health_check_task();

    let coordinator = self.clone();
    tokio::spawn(async move {
        loop {
            match coordinator.signaling_client.receive_envelope().await {
                Ok(Some(envelope)) => {
                    // Continue the sender's trace: the handling span gets the
                    // context extracted from the envelope as its parent.
                    #[cfg(feature = "opentelemetry")]
                    let (span, remote_ctx) = {
                        let remote_ctx = trace::extract_trace_context(&envelope);
                        let span = tracing::info_span!(
                            "signaling.handle_envelope",
                            envelope_id = envelope.envelope_id,
                            reply_for = ?envelope.reply_for
                        );
                        span.set_parent(remote_ctx.clone());
                        (span, remote_ctx)
                    };

                    let handle_envelope_fut = coordinator.handle_envelope(
                        envelope,
                        #[cfg(feature = "opentelemetry")]
                        remote_ctx,
                    );
                    #[cfg(feature = "opentelemetry")]
                    let handle_envelope_fut = handle_envelope_fut.instrument(span);
                    handle_envelope_fut.await;
                }
                Ok(None) => {
                    tracing::info!(
                        "🔌 SignalingClient connection closed, exiting signaling loop"
                    );
                    break;
                }
                // NOTE(review): the loop retries immediately after an error;
                // if `receive_envelope` can fail without waiting, this may
                // spin — confirm against the client implementation.
                Err(e) => {
                    tracing::error!("❌ Signaling receive error: {}", e);
                }
            }
        }

        tracing::info!("🛑 WebRtcCoordinator signaling loop exited");
    });

    Ok(())
}
582
/// Dispatches one incoming signaling envelope.
///
/// Only `ActrRelay` flows are processed; other flows and envelopes without a
/// flow are logged and ignored. With the `opentelemetry` feature enabled the
/// remote trace context is stored per source peer before the payload is
/// dispatched.
///
/// Payload handling:
/// * session description `Offer` / `Answer` / `IceRestartOffer` - forwarded to
///   the matching handler; `RenegotiationOffer` is logged as unsupported;
/// * `RoleAssignment` - forwarded together with the id of the other peer;
/// * `IceCandidate` - forwarded to the ICE candidate handler;
/// * `RoleNegotiation` - ignored here;
/// * missing payload - logged.
///
/// Handler errors are logged and never propagated.
async fn handle_envelope(
    self: &Arc<Self>,
    envelope: SignalingEnvelope,
    #[cfg(feature = "opentelemetry")] remote_ctx: opentelemetry::Context,
) {
    match envelope.flow {
        Some(signaling_envelope::Flow::ActrRelay(relay)) => {
            let source = relay.source;
            let target = relay.target;
            // Remember the sender's trace context so that later spans for
            // this peer can be parented to it.
            #[cfg(feature = "opentelemetry")]
            self.root_context_map
                .write()
                .await
                .insert(source.clone(), remote_ctx);
            match relay.payload {
                Some(actr_relay::Payload::SessionDescription(sd)) => match sd.r#type() {
                    SdpType::Offer => {
                        tracing::info!("📥 Received Offer from {:?}", source.serial_number);
                        if let Err(e) = self.handle_offer(&source, sd.sdp).await {
                            tracing::error!("❌ Failed to handle Offer: {}", e);
                        }
                    }
                    SdpType::Answer => {
                        tracing::info!("📥 Received Answer from {:?}", source.serial_number);
                        if let Err(e) = self.handle_answer(&source, sd.sdp).await {
                            tracing::error!("❌ Failed to handle Answer: {}", e);
                        }
                    }
                    SdpType::RenegotiationOffer => {
                        tracing::warn!("⚠️ Received RenegotiationOffer, not supported yet");
                    }
                    SdpType::IceRestartOffer => {
                        tracing::info!(
                            "♻️ Received ICE Restart Offer from {:?}",
                            source.serial_number
                        );
                        if let Err(e) = self.handle_ice_restart_offer(&source, sd.sdp).await {
                            tracing::error!("❌ Failed to handle ICE Restart Offer: {}", e);
                        }
                    }
                },
                Some(actr_relay::Payload::RoleAssignment(assign)) => {
                    tracing::info!(
                        "🎭 Received RoleAssignment from {:?}, is_offerer={} (source peer)",
                        source.serial_number,
                        assign.is_offerer,
                    );
                    // The assignment concerns the *other* side: when the relay
                    // names us as source, the peer is the target, otherwise
                    // the peer is the source.
                    let peer = if source == self.local_id {
                        target.clone()
                    } else {
                        source.clone()
                    };
                    self.handle_role_assignment(assign.clone(), peer).await;
                }
                Some(actr_relay::Payload::IceCandidate(ice)) => {
                    tracing::debug!(
                        "📥 Received ICE Candidate from {:?}",
                        source.serial_number
                    );
                    if let Err(e) = self.handle_ice_candidate(&source, ice.candidate).await {
                        tracing::error!("❌ Failed to handle ICE Candidate: {}", e);
                    }
                }
                Some(actr_relay::Payload::RoleNegotiation(_)) => {
                    tracing::trace!(
                        "📥 Received RoleNegotiation payload; ignored by WebRtcCoordinator"
                    );
                }
                None => {
                    tracing::warn!("⚠️ ActrRelay missing payload");
                }
            }
        }
        Some(other_flow) => {
            tracing::warn!("⚠️ Ignoring non-ActrRelay flow: {:?}", other_flow);
        }
        None => {
            tracing::warn!("⚠️ SignalingEnvelope missing flow");
        }
    }
}
666
667 pub async fn close_all_peers(&self) -> RuntimeResult<()> {
673 tracing::info!("🔻 Closing all WebRTC peer connections");
674
675 let peers_snapshot: Vec<Arc<RTCPeerConnection>> = {
677 let mut peers = self.peers.write().await;
678 let conns: Vec<Arc<RTCPeerConnection>> =
679 peers.values().map(|p| p.peer_connection.clone()).collect();
680 peers.clear();
681 conns
682 };
683
684 {
686 let mut pending = self.pending_candidates.write().await;
687 pending.clear();
688 }
689
690 #[cfg(feature = "opentelemetry")]
692 self.root_context_map.write().await.clear();
693
694 for pc in peers_snapshot {
696 tracing::info!("🔻 Closing PeerConnection");
697
698 if let Err(e) = pc.close().await {
699 tracing::warn!("⚠️ Failed to close PeerConnection: {}", e);
700 } else {
701 tracing::info!("✅ PeerConnection closed");
702 }
703 }
704
705 Ok(())
706 }
707
708 #[cfg_attr(
710 feature = "opentelemetry",
711 tracing::instrument(level = "info", skip_all, fields(target = %target.to_string_repr()))
712 )]
713 async fn send_actr_relay(
714 &self,
715 target: &ActrId,
716 payload: actr_relay::Payload,
717 ) -> RuntimeResult<()> {
718 let credential = self.credential_state.credential().await;
719 let relay = ActrRelay {
720 source: self.local_id.clone(),
721 credential,
722 target: target.clone(),
723 payload: Some(payload),
724 };
725
726 let flow = signaling_envelope::Flow::ActrRelay(relay);
727
728 let envelope = SignalingEnvelope {
729 envelope_version: 1,
730 envelope_id: uuid::Uuid::new_v4().to_string(),
731 reply_for: None,
732 timestamp: prost_types::Timestamp {
733 seconds: chrono::Utc::now().timestamp(),
734 nanos: 0,
735 },
736 traceparent: None,
737 tracestate: None,
738 flow: Some(flow),
739 };
740
741 self.signaling_client
742 .send_envelope(envelope)
743 .await
744 .map_err(|e| RuntimeError::Unavailable {
745 message: format!("Signaling server unavailable: {e}"),
746 target: None,
747 })?;
748
749 Ok(())
750 }
751
752 #[cfg_attr(
756 feature = "opentelemetry",
757 tracing::instrument(level = "info", skip_all, fields(target_id = %target.to_string_repr()))
758 )]
759 pub async fn initiate_connection(
760 self: &Arc<Self>,
761 target: &ActrId,
762 ) -> RuntimeResult<oneshot::Receiver<()>> {
763 tracing::info!(
764 "🚀 Initiating P2P connection to {}",
765 target.to_string_repr()
766 );
767
768 let role_result =
770 tokio::time::timeout(Duration::from_secs(15), self.negotiate_role(target)).await;
771
772 let is_offerer = match role_result {
773 Ok(Ok(v)) => v,
774 Ok(Err(e)) => {
775 self.peer_negotiation.lock().await.remove(target);
776 return Err(e);
777 }
778 Err(_) => {
779 self.peer_negotiation.lock().await.remove(target);
780 return Err(RuntimeError::DeadlineExceeded {
781 message: "Role negotiation timeout".to_string(),
782 timeout_ms: 5000,
783 });
784 }
785 };
786 tracing::debug!(
787 "Role negotiation decided we are {:?} for {}",
788 if is_offerer { "offerer" } else { "answerer" },
789 target.serial_number
790 );
791 if !is_offerer {
792 let (tx, rx) = oneshot::channel();
793 self.peer_negotiation
794 .lock()
795 .await
796 .entry(target.clone())
797 .or_default()
798 .ready_tx = Some(tx);
799 return Ok(rx);
800 }
801
802 self.start_offer_connection(target, true).await
803 }
804
805 #[cfg_attr(
808 feature = "opentelemetry",
809 tracing::instrument(skip_all, fields(target_id = ?target.to_string_repr()))
810 )]
811 async fn start_offer_connection(
812 self: &Arc<Self>,
813 target: &ActrId,
814 skip_negotiation: bool,
815 ) -> RuntimeResult<oneshot::Receiver<()>> {
816 if !skip_negotiation {
817 let role_result =
818 tokio::time::timeout(Duration::from_secs(15), self.negotiate_role(target)).await;
819
820 let role_result = match role_result {
821 Ok(Ok(v)) => v,
822 Ok(Err(e)) => {
823 self.peer_negotiation.lock().await.remove(target);
824 return Err(e);
825 }
826 Err(_) => {
827 self.peer_negotiation.lock().await.remove(target);
828 return Err(RuntimeError::DeadlineExceeded {
829 message: "Role negotiation timeout".to_string(),
830 timeout_ms: 5000,
831 });
832 }
833 };
834
835 if !role_result {
836 tracing::info!(
837 "🎭 Role negotiation decided we are answerer for {}, waiting for offer",
838 target.serial_number
839 );
840 let (tx, rx) = oneshot::channel();
841 self.peer_negotiation
842 .lock()
843 .await
844 .entry(target.clone())
845 .or_default()
846 .ready_tx = Some(tx);
847 return Ok(rx);
848 }
849 }
850
851 tracing::info!("🔄 Starting connection to serial={}", target.serial_number);
853
854 match self.do_single_offer_connection(target).await {
855 Ok((ready_rx, webrtc_conn)) => {
856 match tokio::time::timeout(INITIAL_CONNECTION_TIMEOUT, ready_rx).await {
858 Ok(Ok(())) => {
859 tracing::info!(
860 "✅ Connection established to serial={}",
861 target.serial_number
862 );
863 let (tx, rx) = oneshot::channel();
865 let _ = tx.send(());
866 return Ok(rx);
867 }
868 Ok(Err(_)) => {
869 tracing::warn!(
870 "⚠️ Connection failed (channel closed) for serial={}",
871 target.serial_number
872 );
873 self.cleanup_failed_connection(target, webrtc_conn).await;
875 return Err(RuntimeError::Other(anyhow::anyhow!(
876 "Connection ready channel closed"
877 )));
878 }
879 Err(_) => {
880 tracing::warn!(
881 "⚠️ Connection timed out for serial={}",
882 target.serial_number
883 );
884 self.cleanup_failed_connection(target, webrtc_conn).await;
886 return Err(RuntimeError::DeadlineExceeded {
887 message: "Initial connection timeout".to_string(),
888 timeout_ms: INITIAL_CONNECTION_TIMEOUT.as_millis() as u64,
889 });
890 }
891 }
892 }
893 Err(e) => {
894 tracing::warn!(
895 "⚠️ Connection failed for serial={}: {}",
896 target.serial_number,
897 e
898 );
899 return Err(e);
900 }
901 }
902 }
903
904 async fn cleanup_failed_connection(&self, target: &ActrId, webrtc_conn: WebRtcConnection) {
909 let state_to_close = {
911 let mut peers = self.peers.write().await;
912 peers.remove(target)
913 }; if let Some(state) = state_to_close {
916 if let Err(e) = state.peer_connection.close().await {
917 tracing::warn!(
918 "⚠️ Failed to close peer_connection during cleanup for {}: {}",
919 target.serial_number,
920 e
921 );
922 }
923 }
924
925 if let Err(e) = webrtc_conn.close().await {
927 tracing::warn!(
928 "⚠️ Failed to close WebRtcConnection during cleanup for {}: {}",
929 target.serial_number,
930 e
931 );
932 }
933
934 {
936 let mut pending = self.pending_candidates.write().await;
937 pending.remove(target);
938 }
939
940 tracing::debug!(
941 "🧹 Cleaned up failed connection attempt for serial={}",
942 target.serial_number
943 );
944 }
945
946 async fn cleanup_cancelled_connection(&self, target: &ActrId) {
953 tracing::debug!(
954 "🧹 Starting cleanup for cancelled connection serial={}",
955 target.serial_number
956 );
957
958 let state_to_close = {
961 let mut peers = self.peers.write().await;
962 peers.remove(target)
963 }; if let Some(state) = state_to_close {
967 if let Err(e) = state.peer_connection.close().await {
968 tracing::warn!(
969 "⚠️ Failed to close peer_connection during cancel cleanup for {}: {}",
970 target.serial_number,
971 e
972 );
973 }
974 if let Err(e) = state.webrtc_conn.close().await {
975 tracing::warn!(
976 "⚠️ Failed to close webrtc_conn during cancel cleanup for {}: {}",
977 target.serial_number,
978 e
979 );
980 }
981 }
982
983 self.pending_candidates.write().await.remove(target);
985
986 if self.peer_negotiation.lock().await.remove(target).is_some() {
988 tracing::debug!(
989 "🧹 Clearing negotiation state for serial={}",
990 target.serial_number
991 );
992 }
993
994 if let Some(peer_state) = self.peers.read().await.get(target) {
997 if let Some(ref handle) = peer_state.restart_task_handle {
998 handle.abort();
999 tracing::debug!(
1000 "🧹 Aborted restart task for serial={}",
1001 target.serial_number
1002 );
1003 }
1004 }
1005
1006 tracing::debug!(
1007 "🧹 Cleaned up cancelled connection for serial={}",
1008 target.serial_number
1009 );
1010 }
1011
/// Performs the offerer side of one connection attempt to `target`.
///
/// Steps:
/// 1. create the peer connection and its `WebRtcConnection` wrapper and
///    install the restart handler;
/// 2. insert a peer entry (marked as offerer, state `New`) holding the
///    readiness sender;
/// 3. pre-create the reliable RPC lane and a VP8 video track named
///    `video-track-1`;
/// 4. register the `on_track` and `on_ice_candidate` callbacks;
/// 5. create the SDP offer and send it to `target` through signaling;
/// 6. start the receive loop for the peer.
///
/// Returns the readiness receiver together with the connection wrapper.
///
/// # Errors
/// Propagates failures from creating the peer connection, the lane, the
/// media track or the offer, and from sending the offer.
///
/// NOTE(review): every `?` after step 2 returns while the peer entry from
/// step 2 is still in the table; nothing in this function removes it.
async fn do_single_offer_connection(
    self: &Arc<Self>,
    target: &ActrId,
) -> RuntimeResult<(oneshot::Receiver<()>, WebRtcConnection)> {
    let peer_connection = self.negotiator.create_peer_connection().await?;
    let peer_connection_arc = Arc::new(peer_connection);

    let webrtc_conn = WebRtcConnection::new(
        target.clone(),
        Arc::clone(&peer_connection_arc),
        self.event_broadcaster.sender(),
    );
    self.install_restart_handler(
        webrtc_conn.clone(),
        Arc::clone(&peer_connection_arc),
        target.clone(),
    );

    let (ready_tx, ready_rx) = oneshot::channel();
    // Register the peer before the offer goes out; the write lock is held
    // only for this block.
    {
        let mut peers = self.peers.write().await;
        peers.insert(
            target.clone(),
            PeerState {
                peer_connection: peer_connection_arc.clone(),
                webrtc_conn: webrtc_conn.clone(),
                ready_tx: Some(ready_tx),
                is_offerer: true,
                ice_restart_inflight: false,
                ice_restart_attempts: 0,
                restart_task_handle: None,
                last_state_change: std::time::Instant::now(),
                current_state: RTCPeerConnectionState::New,
            },
        );
        tracing::debug!(
            "🔒 Inserted placeholder peer state for {} (offerer)",
            target.to_string_repr()
        );
    }

    // Create the reliable lane before the offer is generated.
    let _reliable_lane = webrtc_conn
        .get_lane(actr_protocol::PayloadType::RpcReliable)
        .await?;
    tracing::debug!("Pre-created Reliable DataChannel for ICE gathering");

    let _video_track = webrtc_conn
        .add_media_track("video-track-1".to_string(), "VP8", "video")
        .await?;
    tracing::debug!("Pre-created video MediaTrack: video-track-1");

    // Remote tracks: each one gets its own reader task that forwards every
    // RTP payload to the media frame registry.
    let media_registry = Arc::clone(&self.media_frame_registry);
    let sender_id = target.clone();
    peer_connection_arc.on_track(Box::new(move |track, _receiver, _transceiver| {
        let media_registry = Arc::clone(&media_registry);
        let sender_id = sender_id.clone();
        Box::pin(async move {
            let track_id = track.id();
            tracing::info!(
                "📹 Received MediaTrack: track_id={}, sender={}",
                track_id,
                sender_id.to_string_repr()
            );

            tokio::spawn(async move {
                // Runs until reading from the track fails.
                loop {
                    match track.read_rtp().await {
                        Ok((rtp_packet, _attributes)) => {
                            let payload_data = rtp_packet.payload.clone();
                            let timestamp = rtp_packet.header.timestamp;
                            // The codec is not determined here; samples are
                            // always tagged "unknown" and typed as video.
                            let codec = "unknown".to_string();
                            let sample = actr_framework::MediaSample {
                                data: payload_data,
                                timestamp,
                                codec,
                                media_type: actr_framework::MediaType::Video,
                            };
                            media_registry
                                .dispatch(&track_id, sample, sender_id.clone())
                                .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                "❌ Failed to read RTP from track {}: {}",
                                track_id,
                                e
                            );
                            break;
                        }
                    }
                }
                tracing::info!("🛑 MediaTrack reader task exited for track_id={}", track_id);
            });
        })
    }));

    // Local ICE candidates are relayed to the peer as they are produced. The
    // callback holds only a weak reference to the coordinator.
    let coordinator = Arc::downgrade(self);
    let target_id = target.clone();
    #[cfg(feature = "opentelemetry")]
    let root_context_map = self.root_context_map.clone();
    peer_connection_arc.on_ice_candidate(Box::new(
        move |candidate: Option<RTCIceCandidate>| {
            let coordinator = coordinator.clone();
            let target_id = target_id.clone();
            #[cfg(feature = "opentelemetry")]
            let root_context_map = root_context_map.clone();
            Box::pin(async move {
                if let Some(cand) = candidate {
                    if let Some(coord) = coordinator.upgrade() {
                        let candidate_json = match cand.to_json() {
                            Ok(json) => json.candidate,
                            Err(e) => {
                                tracing::error!("❌ ICE Candidate serialization failed: {}", e);
                                return;
                            }
                        };

                        // Only the candidate string is transmitted; the
                        // remaining fields are left unset.
                        let ice_candidate = actr_protocol::IceCandidate {
                            candidate: candidate_json,
                            sdp_mid: None,
                            sdp_mline_index: None,
                            username_fragment: None,
                        };

                        let payload = actr_relay::Payload::IceCandidate(ice_candidate);

                        // Parent the send span to the trace context stored
                        // for this peer, when there is one.
                        #[cfg(feature = "opentelemetry")]
                        let span = {
                            let span = tracing::info_span!(
                                "send_ice_candidate",
                                target_id = %target_id.to_string_repr()
                            );
                            if let Some(ctx) =
                                root_context_map.read().await.get(&target_id).cloned()
                            {
                                span.set_parent(ctx);
                            } else {
                                tracing::warn!(
                                    "⚠️ No root context found for target_id={}",
                                    target_id.to_string_repr()
                                );
                            }
                            span
                        };
                        let send_actr_relay_fut = coord.send_actr_relay(&target_id, payload);
                        #[cfg(feature = "opentelemetry")]
                        let send_actr_relay_fut = send_actr_relay_fut.instrument(span);
                        if let Err(e) = send_actr_relay_fut.await {
                            tracing::error!("❌ Failed to send ICE Candidate: {}", e);
                        } else {
                            tracing::debug!("✅ Sent ICE Candidate");
                        }
                    }
                } else {
                    // The callback was invoked without a candidate.
                    tracing::debug!("❌ ICE Candidate is None");
                }
            })
        },
    ));

    let offer_sdp = self.negotiator.create_offer(&peer_connection_arc).await?;

    let session_desc = actr_protocol::SessionDescription {
        r#type: SdpType::Offer as i32,
        sdp: offer_sdp,
    };
    let payload = actr_relay::Payload::SessionDescription(session_desc);
    self.send_actr_relay(target, payload).await?;

    tracing::info!("✅ Sent Offer to {}", target.to_string_repr());

    self.start_peer_receive_loop(target.clone(), webrtc_conn.clone())
        .await;

    Ok((ready_rx, webrtc_conn))
}
1201
1202 #[cfg_attr(
1207 feature = "opentelemetry",
1208 tracing::instrument(level = "info", skip_all, fields(remote_id = %from.to_string_repr()))
1209 )]
1210 async fn handle_offer(self: &Arc<Self>, from: &ActrId, offer_sdp: String) -> RuntimeResult<()> {
1211 let existing_peer = {
1213 let peers = self.peers.read().await;
1214 peers.contains_key(from)
1215 };
1216
1217 if existing_peer {
1218 tracing::info!(
1219 "🔄 Existing connection found for serial={}, preparing for new Offer",
1220 from.serial_number
1221 );
1222
1223 self.cleanup_cancelled_connection(from).await;
1225 }
1226 tracing::info!("📥 Handling Offer from serial={}", from.serial_number);
1229
1230 let peer_connection = self.negotiator.create_peer_connection().await?;
1232 let peer_connection_arc = Arc::new(peer_connection);
1233
1234 let webrtc_conn = WebRtcConnection::new(
1236 from.clone(),
1237 Arc::clone(&peer_connection_arc),
1238 self.event_broadcaster.sender(),
1239 );
1240
1241 {
1246 let mut peers = self.peers.write().await;
1247 peers.insert(
1248 from.clone(),
1249 PeerState {
1250 peer_connection: peer_connection_arc.clone(),
1251 webrtc_conn: webrtc_conn.clone(),
1252 ready_tx: None,
1253 is_offerer: false,
1254 ice_restart_inflight: false,
1255 ice_restart_attempts: 0,
1256 restart_task_handle: None,
1257 last_state_change: std::time::Instant::now(),
1258 current_state: RTCPeerConnectionState::New,
1259 },
1260 );
1261 tracing::debug!(
1262 "🔒 Inserted placeholder peer state for {} (answerer)",
1263 from.to_string_repr()
1264 );
1265 } let webrtc_conn_for_state = webrtc_conn.clone();
1272 let coord_weak_for_state = Arc::downgrade(self);
1273 let from_id_for_state = from.clone();
1274 peer_connection_arc.on_peer_connection_state_change(Box::new(
1275 move |state: RTCPeerConnectionState| {
1276 let webrtc_conn = webrtc_conn_for_state.clone();
1277 let coord_weak = coord_weak_for_state.clone();
1278 let peer_id = from_id_for_state.clone();
1279 Box::pin(async move {
1280 webrtc_conn.handle_state_change(state).await;
1282
1283 if let Some(coord) = coord_weak.upgrade() {
1285 let mut peers = coord.peers.write().await;
1286 if let Some(peer_state) = peers.get_mut(&peer_id) {
1287 peer_state.current_state = state;
1288 peer_state.last_state_change = std::time::Instant::now();
1289 }
1290 drop(peers); }
1292 })
1293 },
1294 ));
1295
1296 let conn_for_data_channel = webrtc_conn.clone();
1298
1299 let from_id_for_data_channel = from.clone();
1300 let coord_weak_for_state = Arc::downgrade(self);
1301 let message_tx = self.message_tx.clone();
1302 peer_connection_arc.on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
1303 let conn = conn_for_data_channel.clone();
1304 let coord_weak = coord_weak_for_state.clone();
1305 let peer_id = from_id_for_data_channel.clone();
1306 let message_tx = message_tx.clone();
1307 Box::pin(async move {
1308 let channel_id = dc.id();
1309 let label = dc.label();
1310 let dc_for_registration = Arc::clone(&dc);
1311
1312 let payload_type = PayloadType::from_str_name(&label);
1313
1314 if let Some(coord) = coord_weak.upgrade() {
1315 let ready_tx = {
1316 let mut neg = coord.peer_negotiation.lock().await;
1317 neg.get_mut(&peer_id).and_then(|s| s.ready_tx.take())
1318 };
1319 if let Some(tx) = ready_tx {
1320 tracing::info!(
1321 "✅ [Answerer] Connection ready, sending notification for {}",
1322 peer_id.serial_number
1323 );
1324 let _ = tx.send(());
1325 }
1326 }
1327
1328 match payload_type {
1329 Some(pt) => {
1330 if let Err(e) = conn
1331 .register_received_data_channel(dc_for_registration, pt, message_tx)
1332 .await
1333 {
1334 tracing::warn!(
1335 "❌ Failed to register received DataChannel label={} id={}: {}",
1336 label,
1337 channel_id,
1338 e
1339 );
1340 } else {
1341 tracing::debug!(
1342 "📨 Registered DataChannel from offerer label={} id={}",
1343 label,
1344 channel_id
1345 );
1346 }
1347 }
1348 None => {
1349 tracing::warn!(
1350 "❓ Ignoring DataChannel with unmapped id={} label={}",
1351 channel_id,
1352 label
1353 );
1354 }
1355 }
1356 })
1357 }));
1358
1359 let _video_track = webrtc_conn
1362 .add_media_track("video-track-1".to_string(), "VP8", "video")
1363 .await?;
1364 tracing::debug!("Pre-created video MediaTrack: video-track-1 (answerer)");
1365
1366 let media_registry = Arc::clone(&self.media_frame_registry);
1368 let sender_id = from.clone();
1369 peer_connection_arc.on_track(Box::new(move |track, _receiver, _transceiver| {
1370 let media_registry = Arc::clone(&media_registry);
1371 let sender_id = sender_id.clone();
1372 Box::pin(async move {
1373 let track_id = track.id();
1374 tracing::info!(
1375 "📹 Received MediaTrack: track_id={}, sender={}",
1376 track_id,
1377 sender_id.to_string_repr()
1378 );
1379
1380 tokio::spawn(async move {
1382 loop {
1383 match track.read_rtp().await {
1385 Ok((rtp_packet, _attributes)) => {
1386 let payload_data = rtp_packet.payload.clone();
1388 let timestamp = rtp_packet.header.timestamp;
1389
1390 let codec = "unknown".to_string();
1392
1393 let sample = actr_framework::MediaSample {
1395 data: payload_data,
1396 timestamp,
1397 codec,
1398 media_type: actr_framework::MediaType::Video, };
1400
1401 media_registry
1403 .dispatch(&track_id, sample, sender_id.clone())
1404 .await;
1405 }
1406 Err(e) => {
1407 tracing::error!(
1408 "❌ Failed to read RTP from track {}: {}",
1409 track_id,
1410 e
1411 );
1412 break;
1413 }
1414 }
1415 }
1416 tracing::info!("🛑 MediaTrack reader task exited for track_id={}", track_id);
1417 });
1418 })
1419 }));
1420
1421 let coordinator = Arc::downgrade(self);
1423 let target_id = from.clone();
1424 #[cfg(feature = "opentelemetry")]
1425 let root_context_map = self.root_context_map.clone();
1426 peer_connection_arc.on_ice_candidate(Box::new(
1427 move |candidate: Option<RTCIceCandidate>| {
1428 let coordinator = coordinator.clone();
1429 let target_id = target_id.clone();
1430 #[cfg(feature = "opentelemetry")]
1431 let root_context_map = root_context_map.clone();
1432 Box::pin(async move {
1433 if let Some(cand) = candidate {
1434 if let Some(coord) = coordinator.upgrade() {
1435 let candidate_json = match cand.to_json() {
1437 Ok(json) => json.candidate,
1438 Err(e) => {
1439 tracing::error!("❌ ICE Candidate serialization failed: {}", e);
1440 return;
1441 }
1442 };
1443
1444 let ice_candidate = actr_protocol::IceCandidate {
1445 candidate: candidate_json,
1446 sdp_mid: None,
1447 sdp_mline_index: None,
1448 username_fragment: None,
1449 };
1450
1451 let payload = actr_relay::Payload::IceCandidate(ice_candidate);
1452
1453 #[cfg(feature = "opentelemetry")]
1455 let span = {
1456 let span = tracing::info_span!(
1457 "send_ice_candidate",
1458 target_id = %target_id.to_string_repr()
1459 );
1460 if let Some(ctx) =
1461 root_context_map.read().await.get(&target_id).cloned()
1462 {
1463 span.set_parent(ctx);
1464 } else {
1465 tracing::warn!(
1466 "⚠️ No root context found for target_id={}",
1467 target_id.to_string_repr()
1468 );
1469 }
1470 span
1471 };
1472 let send_actr_relay_fut = coord.send_actr_relay(&target_id, payload);
1473 #[cfg(feature = "opentelemetry")]
1474 let send_actr_relay_fut = send_actr_relay_fut.instrument(span);
1475 if let Err(e) = send_actr_relay_fut.await {
1476 tracing::error!("❌ Failed to send ICE Candidate: {}", e);
1477 }
1478 tracing::debug!(
1479 "🔄 Handle offer Sent ICE Candidate to serial={}",
1480 target_id.serial_number
1481 );
1482 }
1483 }
1484 })
1485 },
1486 ));
1487
1488 let answer_sdp = self
1490 .negotiator
1491 .create_answer(&peer_connection_arc, offer_sdp)
1492 .await?;
1493
1494 let session_desc = actr_protocol::SessionDescription {
1496 r#type: SdpType::Answer as i32,
1497 sdp: answer_sdp,
1498 };
1499 let payload = actr_relay::Payload::SessionDescription(session_desc);
1500 self.send_actr_relay(from, payload).await?;
1501
1502 tracing::info!("✅ Sent Answer to {}", from.to_string_repr());
1503
1504 self.flush_pending_candidates(from, &peer_connection_arc)
1506 .await?;
1507
1508 Ok(())
1512 }
1513
1514 #[cfg_attr(
1518 feature = "opentelemetry",
1519 tracing::instrument(
1520 level = "info",
1521 skip_all,
1522 fields(
1523 remote.id = %from.to_string_repr(),
1524 answer_len = answer_sdp.len()
1525 )
1526 )
1527 )]
1528 async fn handle_answer(
1529 self: &Arc<Self>,
1530 from: &ActrId,
1531 answer_sdp: String,
1532 ) -> RuntimeResult<()> {
1533 let (peer_connection, ready_tx, is_renegotiation) = {
1535 let mut peers = self.peers.write().await;
1536 tracing::info!(
1537 "🔍 [LOOKUP] Searching for: id={}, total peers={}",
1538 from.to_string_repr(),
1539 peers.len()
1540 );
1541 for (k, _) in peers.iter() {
1542 tracing::info!(" 📌 [LOOKUP] Stored: id={}", k.to_string_repr());
1543 }
1544 let state = peers.get_mut(from).ok_or_else(|| {
1545 RuntimeError::Other(anyhow::anyhow!("Peer not found: {}", from.to_string_repr()))
1546 })?;
1547
1548 let pc = state.peer_connection.clone();
1549 let tx = state.ready_tx.take();
1550 let is_reneg = tx.is_none(); (pc, tx, is_reneg)
1552 };
1553
1554 if is_renegotiation {
1555 tracing::info!(
1556 "🔄 Handling renegotiation Answer from {}",
1557 from.to_string_repr()
1558 );
1559 } else {
1560 tracing::info!("📥 Handling initial Answer from {}", from.to_string_repr());
1561 }
1562
1563 self.negotiator
1565 .handle_answer(&peer_connection, answer_sdp)
1566 .await?;
1567
1568 self.flush_pending_candidates(from, &peer_connection)
1570 .await?;
1571
1572 tracing::info!(
1573 "✅ WebRTC connection negotiation completed: {}",
1574 from.to_string_repr()
1575 );
1576
1577 let pc_clone = peer_connection.clone();
1579 let peers = Arc::clone(&self.peers);
1580 let from_id = from.clone();
1581 tokio::spawn(async move {
1582 let start = tokio::time::Instant::now();
1583 loop {
1584 let state = pc_clone.connection_state();
1585 if state == webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState::Connected {
1586 tracing::info!("✅ PeerConnection fully connected");
1587 let mut peers_guard = peers.write().await;
1589 if let Some(s) = peers_guard.get_mut(&from_id) {
1590 s.ice_restart_inflight = false;
1591 s.ice_restart_attempts = 0;
1592 }
1593 break;
1594 }
1595 if state == webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState::Failed {
1596 tracing::error!("❌ PeerConnection failed");
1597 return;
1598 }
1599 if start.elapsed() > std::time::Duration::from_secs(5) {
1600 tracing::warn!("⚠️ PeerConnection connection timeout (5s)");
1601 break;
1602 }
1603 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1604 }
1605
1606 if let Some(tx) = ready_tx {
1608 let _ = tx.send(());
1609 }
1610 });
1611
1612 Ok(())
1613 }
1614
1615 async fn flush_pending_candidates(
1619 &self,
1620 peer_id: &ActrId,
1621 peer_connection: &RTCPeerConnection,
1622 ) -> RuntimeResult<()> {
1623 let candidates = {
1625 let mut pending = self.pending_candidates.write().await;
1626 pending.remove(peer_id)
1627 };
1628
1629 if let Some(candidates) = candidates {
1630 tracing::debug!(
1631 "🔄 Flushing {} buffered ICE candidates for {:?}",
1632 candidates.len(),
1633 peer_id
1634 );
1635
1636 for candidate in candidates {
1637 if let Err(e) = self
1638 .negotiator
1639 .add_ice_candidate(peer_connection, candidate)
1640 .await
1641 {
1642 tracing::warn!("⚠️ Failed to add buffered ICE candidate: {}", e);
1643 }
1644 }
1645 }
1646
1647 Ok(())
1648 }
1649
1650 #[cfg_attr(
1652 feature = "opentelemetry",
1653 tracing::instrument(
1654 level = "trace",
1655 skip_all,
1656 fields(
1657 remote.id = %from.to_string_repr(),
1658 candidate_len = candidate.len()
1659 )
1660 )
1661 )]
1662 async fn handle_ice_candidate(
1663 self: &Arc<Self>,
1664 from: &ActrId,
1665 candidate: String,
1666 ) -> RuntimeResult<()> {
1667 tracing::trace!("📥 Received ICE Candidate from {}", from.to_string_repr());
1668
1669 let peer_opt = {
1678 let peers = self.peers.read().await;
1679 peers.get(from).map(|state| state.peer_connection.clone())
1680 };
1681
1682 match peer_opt {
1683 Some(peer_connection) => {
1684 if peer_connection.remote_description().await.is_some() {
1686 self.negotiator
1688 .add_ice_candidate(&peer_connection, candidate)
1689 .await?;
1690 tracing::trace!("✅ Added ICE Candidate from {}", from.to_string_repr());
1691 } else {
1692 self.pending_candidates
1694 .write()
1695 .await
1696 .entry(from.clone())
1697 .or_insert_with(Vec::new)
1698 .push(candidate);
1699 tracing::debug!(
1700 "🔖 Buffered ICE candidate from {:?} (remote description not yet set)",
1701 from
1702 );
1703 }
1704 }
1705 None => {
1706 self.pending_candidates
1708 .write()
1709 .await
1710 .entry(from.clone())
1711 .or_insert_with(Vec::new)
1712 .push(candidate);
1713 tracing::debug!(
1714 "🔖 Buffered ICE candidate from {:?} (peer not yet created)",
1715 from
1716 );
1717 }
1718 }
1719
1720 Ok(())
1721 }
1722
1723 async fn start_peer_receive_loop(&self, peer_id: ActrId, webrtc_conn: WebRtcConnection) {
1731 let message_tx = self.message_tx.clone();
1732
1733 let payload_types = vec![
1735 PayloadType::RpcReliable,
1736 PayloadType::RpcSignal,
1737 PayloadType::StreamReliable,
1738 PayloadType::StreamLatencyFirst,
1739 ];
1740
1741 for payload_type in payload_types {
1742 let message_tx_clone = message_tx.clone();
1743 let peer_id_clone = peer_id.clone();
1744 let webrtc_conn_clone = webrtc_conn.clone();
1745
1746 tokio::spawn(async move {
1747 tracing::debug!(
1748 "📡 Starting receive loop for peer {:?}, PayloadType: {:?}",
1749 peer_id_clone,
1750 payload_type
1751 );
1752
1753 let lane = match webrtc_conn_clone.get_lane(payload_type).await {
1755 Ok(l) => l,
1756 Err(e) => {
1757 tracing::error!(
1758 "❌ Failed to get Lane for {:?}, PayloadType {:?}: {}",
1759 peer_id_clone,
1760 payload_type,
1761 e
1762 );
1763 return;
1764 }
1765 };
1766
1767 loop {
1769 match lane.recv().await {
1770 Ok(data) => {
1771 tracing::debug!(
1772 "📨 Received message from {:?} (PayloadType: {:?}): {} bytes",
1773 peer_id_clone,
1774 payload_type,
1775 data.len()
1776 );
1777
1778 let peer_id_bytes = peer_id_clone.encode_to_vec();
1780
1781 if let Err(e) =
1783 message_tx_clone.send((peer_id_bytes, data, payload_type))
1784 {
1785 tracing::error!("❌ Message aggregation failed: {:?}", e);
1786 break;
1787 }
1788 }
1789 Err(e) => {
1790 tracing::warn!(
1791 "❌ Peer {:?} message receive failed (PayloadType: {:?}): {}",
1792 peer_id_clone,
1793 payload_type,
1794 e
1795 );
1796 break;
1797 }
1798 }
1799 }
1800
1801 tracing::debug!(
1802 "📡 Receive loop exited for peer {:?}, PayloadType: {:?}",
1803 peer_id_clone,
1804 payload_type
1805 );
1806 });
1807 }
1808 }
1809
1810 #[cfg_attr(
1815 feature = "opentelemetry",
1816 tracing::instrument(skip_all, fields(target_id = ?target.to_string_repr()))
1817 )]
1818 pub(crate) async fn send_message(
1819 self: &Arc<Self>,
1820 target: &ActrId,
1821 data: &[u8],
1822 ) -> RuntimeResult<()> {
1823 const MAX_RETRIES: u32 = 3;
1824 const OVERALL_TIMEOUT: Duration = Duration::from_secs(30);
1825
1826 tracing::debug!("📤 Sending message to {:?}: {} bytes", target, data.len());
1827
1828 let result = tokio::time::timeout(
1830 OVERALL_TIMEOUT,
1831 self.send_message_with_retry(target, data, MAX_RETRIES),
1832 )
1833 .await;
1834
1835 match result {
1836 Ok(inner_result) => inner_result,
1837 Err(_) => {
1838 tracing::error!(
1839 "⏰ Overall timeout ({}s) exceeded for send_message to {}",
1840 OVERALL_TIMEOUT.as_secs(),
1841 target.to_string_repr()
1842 );
1843 self.cleanup_cancelled_connection(target).await;
1844 Err(RuntimeError::DeadlineExceeded {
1845 message: format!(
1846 "send_message overall timeout ({}s)",
1847 OVERALL_TIMEOUT.as_secs()
1848 ),
1849 timeout_ms: OVERALL_TIMEOUT.as_millis() as u64,
1850 })
1851 }
1852 }
1853 }
1854
1855 async fn send_message_with_retry(
1857 self: &Arc<Self>,
1858 target: &ActrId,
1859 data: &[u8],
1860 max_retries: u32,
1861 ) -> RuntimeResult<()> {
1862 let mut backoff = ExponentialBackoff::new(
1863 Duration::from_millis(1), Duration::from_secs(10), None, );
1867
1868 let mut last_error = None;
1869
1870 for attempt in 0..=max_retries {
1871 if attempt > 0 {
1873 let delay = backoff.next().unwrap_or(Duration::from_secs(5));
1874 tracing::info!(
1875 "🔄 Retrying send_message to {} (attempt {}/{}, delay {:?})",
1876 target.to_string_repr(),
1877 attempt + 1,
1878 max_retries + 1,
1879 delay
1880 );
1881 tokio::time::sleep(delay).await;
1882 }
1883
1884 match self.try_send_message_once(target, data).await {
1885 Ok(()) => return Ok(()),
1886 Err(e) => {
1887 let should_retry = matches!(
1889 &e,
1890 RuntimeError::DeadlineExceeded { .. } | RuntimeError::Other(_)
1891 );
1892
1893 if !should_retry {
1894 return Err(e);
1895 }
1896
1897 tracing::warn!(
1898 "⚠️ send_message attempt {}/{} failed: {}",
1899 attempt + 1,
1900 max_retries + 1,
1901 e
1902 );
1903 last_error = Some(e);
1904
1905 self.cleanup_cancelled_connection(target).await;
1907 }
1908 }
1909 }
1910
1911 Err(last_error.unwrap_or_else(|| {
1913 RuntimeError::Other(anyhow::anyhow!("send_message failed after all retries"))
1914 }))
1915 }
1916
1917 async fn try_send_message_once(
1919 self: &Arc<Self>,
1920 target: &ActrId,
1921 data: &[u8],
1922 ) -> RuntimeResult<()> {
1923 let has_connection = loop {
1925 let state = {
1926 let peers = self.peers.read().await;
1927 peers
1928 .get(target)
1929 .map(|s| (s.current_state, s.last_state_change))
1930 };
1931
1932 match state {
1933 Some((
1934 RTCPeerConnectionState::New | RTCPeerConnectionState::Connecting,
1935 started,
1936 )) => {
1937 if started.elapsed() < INITIAL_CONNECTION_TIMEOUT {
1939 tracing::debug!(
1941 "⏳ Connection to {} is being established, waiting...",
1942 target.to_string_repr()
1943 );
1944 tokio::time::sleep(Duration::from_millis(100)).await;
1945 continue;
1946 } else {
1947 tracing::warn!(
1949 "⏰ Connection to {} timed out while connecting",
1950 target.to_string_repr()
1951 );
1952 break false;
1953 }
1954 }
1955 Some((RTCPeerConnectionState::Connected, _)) => {
1956 break true;
1958 }
1959 Some(_) => {
1960 break false;
1963 }
1964 None => {
1965 break false;
1967 }
1968 }
1969 };
1970
1971 #[cfg(feature = "opentelemetry")]
1972 let _ = self
1973 .root_context_map
1974 .write()
1975 .await
1976 .insert(target.clone(), tracing::Span::current().context());
1977
1978 if !has_connection {
1980 tracing::info!(
1981 "🔗 First send to {:?}, initiating role negotiation + WebRTC connection",
1982 target.serial_number
1983 );
1984
1985 let ready_rx = self.initiate_connection(target).await?;
1986 tracing::debug!(?ready_rx, "ready_rx");
1987
1988 match tokio::time::timeout(Duration::from_secs(10), ready_rx).await {
1990 Ok(Ok(())) => {
1991 tracing::info!("✅ WebRTC connection ready: {}", target.to_string_repr());
1992 }
1993 Ok(Err(_)) => {
1994 return Err(RuntimeError::Other(anyhow::anyhow!(
1995 "Connection establishment failed (channel closed)"
1996 )));
1997 }
1998 Err(_) => {
1999 return Err(RuntimeError::DeadlineExceeded {
2000 message: "Connection establishment timeout".to_string(),
2001 timeout_ms: 10000,
2002 });
2003 }
2004 }
2005 }
2006
2007 let webrtc_conn = {
2009 let peers = self.peers.read().await;
2010 peers
2011 .get(target)
2012 .map(|state| state.webrtc_conn.clone())
2013 .ok_or_else(|| {
2014 RuntimeError::Other(anyhow::anyhow!("Peer connection not found: {target:?}"))
2015 })?
2016 };
2017
2018 let lane = webrtc_conn
2020 .get_lane(PayloadType::RpcReliable)
2021 .await
2022 .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to get Lane: {e}")))?;
2023
2024 lane.send(Bytes::copy_from_slice(data))
2026 .await
2027 .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to send message: {e}")))?;
2028
2029 Ok(())
2030 }
2031
2032 pub async fn receive_message(&self) -> RuntimeResult<Option<(Vec<u8>, Bytes, PayloadType)>> {
2037 let mut rx = self.message_rx.lock().await;
2038 Ok(rx.recv().await)
2039 }
2040
2041 #[cfg_attr(
2056 feature = "opentelemetry",
2057 tracing::instrument(skip_all, fields(target_id = ?dest.as_actor_id().map(|id| id.to_string_repr())))
2058 )]
2059 pub async fn create_connection(
2060 self: &Arc<Self>,
2061 dest: &crate::transport::Dest,
2062 cancel_token: Option<CancellationToken>,
2063 ) -> RuntimeResult<WebRtcConnection> {
2064 const OVERALL_TIMEOUT: Duration = Duration::from_secs(30);
2066
2067 let target_id = dest.as_actor_id().ok_or_else(|| {
2069 RuntimeError::ConfigurationError(
2070 "WebRTC only supports Actor targets, not Shell".to_string(),
2071 )
2072 })?;
2073
2074 let result = tokio::time::timeout(
2076 OVERALL_TIMEOUT,
2077 self.create_connection_inner(dest, cancel_token.clone()),
2078 )
2079 .await;
2080
2081 match result {
2082 Ok(inner_result) => inner_result,
2083 Err(_) => {
2084 tracing::error!(
2086 "⏰ [Factory] Overall timeout ({}s) exceeded for connection to {}",
2087 OVERALL_TIMEOUT.as_secs(),
2088 target_id.to_string_repr()
2089 );
2090 self.cleanup_cancelled_connection(target_id).await;
2091 Err(RuntimeError::DeadlineExceeded {
2092 message: format!(
2093 "WebRTC connection creation overall timeout ({}s)",
2094 OVERALL_TIMEOUT.as_secs()
2095 ),
2096 timeout_ms: OVERALL_TIMEOUT.as_millis() as u64,
2097 })
2098 }
2099 }
2100 }
2101
2102 async fn create_connection_inner(
2104 self: &Arc<Self>,
2105 dest: &crate::transport::Dest,
2106 cancel_token: Option<CancellationToken>,
2107 ) -> RuntimeResult<WebRtcConnection> {
2108 if let Some(ref token) = cancel_token {
2110 if token.is_cancelled() {
2111 return Err(RuntimeError::Other(anyhow::anyhow!(
2112 "Connection creation cancelled before starting"
2113 )));
2114 }
2115 }
2116
2117 let target_id = dest.as_actor_id().ok_or_else(|| {
2119 RuntimeError::ConfigurationError(
2120 "WebRTC only supports Actor targets, not Shell".to_string(),
2121 )
2122 })?;
2123
2124 tracing::debug!(
2125 "🏭 [Factory] Creating WebRTC connection to {:?}",
2126 target_id.to_string_repr()
2127 );
2128
2129 {
2131 let peers = self.peers.read().await;
2132 if let Some(state) = peers.get(target_id) {
2133 tracing::debug!(
2134 "♻️ [Factory] Reusing existing WebRTC connection: {:?}",
2135 target_id.to_string_repr()
2136 );
2137 return Ok(state.webrtc_conn.clone());
2138 }
2139 }
2140
2141 const MAX_RETRIES: u32 = 3;
2143 let mut backoff = ExponentialBackoff::new(
2144 Duration::from_secs(5), Duration::from_secs(15), None, );
2148
2149 let mut last_error = None;
2150
2151 for attempt in 0..=MAX_RETRIES {
2152 if let Some(ref token) = cancel_token {
2154 if token.is_cancelled() {
2155 return Err(RuntimeError::Other(anyhow::anyhow!(
2156 "Connection creation cancelled"
2157 )));
2158 }
2159 }
2160
2161 if attempt > 0 {
2163 let delay = backoff.next().unwrap_or(Duration::from_secs(10));
2164 tracing::info!(
2165 "🔄 [Factory] Retrying connection to {} (attempt {}/{}, delay {:?})",
2166 target_id.to_string_repr(),
2167 attempt + 1,
2168 MAX_RETRIES + 1,
2169 delay
2170 );
2171
2172 if let Some(ref token) = cancel_token {
2174 tokio::select! {
2175 biased;
2176 _ = token.cancelled() => {
2177 self.cleanup_cancelled_connection(target_id).await;
2178 return Err(RuntimeError::Other(anyhow::anyhow!(
2179 "Connection creation cancelled during retry wait"
2180 )));
2181 }
2182 _ = tokio::time::sleep(delay) => {}
2183 }
2184 } else {
2185 tokio::time::sleep(delay).await;
2186 }
2187 } else {
2188 tracing::info!(
2189 "🔨 [Factory] Initiating new WebRTC connection: {:?}",
2190 target_id.to_string_repr()
2191 );
2192 }
2193
2194 match self
2196 .try_create_connection_once(target_id, cancel_token.as_ref())
2197 .await
2198 {
2199 Ok(conn) => return Ok(conn),
2200 Err(e) => {
2201 if let Some(ref token) = cancel_token {
2203 if token.is_cancelled() {
2204 return Err(e);
2205 }
2206 }
2207
2208 let should_retry = matches!(
2210 &e,
2211 RuntimeError::DeadlineExceeded { .. } | RuntimeError::Other(_)
2212 );
2213
2214 if !should_retry {
2215 return Err(e);
2216 }
2217
2218 tracing::warn!(
2219 "⚠️ [Factory] Connection attempt {}/{} failed: {}",
2220 attempt + 1,
2221 MAX_RETRIES + 1,
2222 e
2223 );
2224 last_error = Some(e);
2225
2226 self.cleanup_cancelled_connection(target_id).await;
2228 }
2229 }
2230 }
2231
2232 Err(last_error.unwrap_or_else(|| {
2234 RuntimeError::Other(anyhow::anyhow!("Connection failed after all retries"))
2235 }))
2236 }
2237
2238 async fn try_create_connection_once(
2240 self: &Arc<Self>,
2241 target_id: &ActrId,
2242 cancel_token: Option<&CancellationToken>,
2243 ) -> RuntimeResult<WebRtcConnection> {
2244 #[cfg(feature = "opentelemetry")]
2245 self.root_context_map
2246 .write()
2247 .await
2248 .insert(target_id.clone(), tracing::Span::current().context());
2249
2250 let ready_rx = self.initiate_connection(target_id).await?;
2251
2252 if let Some(token) = cancel_token {
2254 if token.is_cancelled() {
2255 self.cleanup_cancelled_connection(target_id).await;
2256 return Err(RuntimeError::Other(anyhow::anyhow!(
2257 "Connection creation cancelled after initiation"
2258 )));
2259 }
2260 }
2261
2262 let timeout_duration = std::time::Duration::from_secs(10);
2264
2265 let wait_result = if let Some(token) = cancel_token {
2266 tokio::select! {
2267 biased;
2268 _ = token.cancelled() => {
2269 self.cleanup_cancelled_connection(target_id).await;
2270 return Err(RuntimeError::Other(anyhow::anyhow!(
2271 "Connection creation cancelled while waiting"
2272 )));
2273 }
2274 _ = tokio::time::sleep(timeout_duration) => {
2275 Err(RuntimeError::DeadlineExceeded {
2276 message: "WebRTC connection establishment timeout".to_string(),
2277 timeout_ms: 10000,
2278 })
2279 }
2280 result = ready_rx => {
2281 result.map_err(|_| RuntimeError::Other(anyhow::anyhow!(
2282 "Connection establishment failed (channel closed)"
2283 )))
2284 }
2285 }
2286 } else {
2287 tokio::time::timeout(timeout_duration, ready_rx)
2288 .await
2289 .map_err(|_| RuntimeError::DeadlineExceeded {
2290 message: "WebRTC connection establishment timeout".to_string(),
2291 timeout_ms: 10000,
2292 })?
2293 .map_err(|_| {
2294 RuntimeError::Other(anyhow::anyhow!(
2295 "Connection establishment failed (channel closed)"
2296 ))
2297 })
2298 };
2299
2300 wait_result?;
2301
2302 tracing::info!(
2303 "✅ [Factory] WebRTC connection ready: {:?}",
2304 target_id.to_string_repr()
2305 );
2306
2307 if let Some(token) = cancel_token {
2309 if token.is_cancelled() {
2310 self.cleanup_cancelled_connection(target_id).await;
2311 return Err(RuntimeError::Other(anyhow::anyhow!(
2312 "Connection creation cancelled after ready"
2313 )));
2314 }
2315 }
2316
2317 let peers = self.peers.read().await;
2319 peers
2320 .get(target_id)
2321 .map(|state| state.webrtc_conn.clone())
2322 .ok_or_else(|| {
2323 RuntimeError::Other(anyhow::anyhow!(
2324 "Peer not found after connection establishment"
2325 ))
2326 })
2327 }
2328
2329 pub async fn send_media_sample(
2339 &self,
2340 target: &actr_protocol::ActrId,
2341 track_id: &str,
2342 sample: actr_framework::MediaSample,
2343 ) -> RuntimeResult<()> {
2344 use webrtc::rtp::header::Header as RtpHeader;
2345 use webrtc::rtp::packet::Packet as RtpPacket;
2346
2347 let peers = self.peers.read().await;
2349 let peer_state = peers.get(target).ok_or_else(|| {
2350 RuntimeError::Other(anyhow::anyhow!(
2351 "No connection to target: {}",
2352 target.to_string_repr()
2353 ))
2354 })?;
2355
2356 let track = peer_state
2358 .webrtc_conn
2359 .get_media_track(track_id)
2360 .await
2361 .ok_or_else(|| RuntimeError::Other(anyhow::anyhow!("Track not found: {track_id}")))?;
2362
2363 let sequence_number = peer_state
2365 .webrtc_conn
2366 .next_sequence_number(track_id)
2367 .await
2368 .ok_or_else(|| {
2369 RuntimeError::Other(anyhow::anyhow!(
2370 "Sequence number not found for track: {track_id}"
2371 ))
2372 })?;
2373
2374 let ssrc = peer_state
2376 .webrtc_conn
2377 .get_ssrc(track_id)
2378 .await
2379 .ok_or_else(|| {
2380 RuntimeError::Other(anyhow::anyhow!("SSRC not found for track: {track_id}"))
2381 })?;
2382
2383 let rtp_packet = RtpPacket {
2385 header: RtpHeader {
2386 version: 2,
2387 padding: false,
2388 extension: false,
2389 marker: true, payload_type: 96, sequence_number, timestamp: sample.timestamp,
2393 ssrc, ..Default::default()
2395 },
2396 payload: sample.data,
2397 };
2398
2399 track
2401 .write_rtp(&rtp_packet)
2402 .await
2403 .map_err(|e| RuntimeError::Other(anyhow::anyhow!("Failed to write RTP: {e}")))?;
2404
2405 tracing::debug!(
2406 "📤 Sent MediaSample: track_id={}, seq={}, ssrc=0x{:08x}, timestamp={}, size={}",
2407 track_id,
2408 sequence_number,
2409 ssrc,
2410 sample.timestamp,
2411 rtp_packet.payload.len()
2412 );
2413
2414 Ok(())
2415 }
2416
2417 pub async fn add_dynamic_track(
2432 &self,
2433 target: &actr_protocol::ActrId,
2434 track_id: String,
2435 codec: &str,
2436 media_type: &str,
2437 ) -> RuntimeResult<()> {
2438 tracing::info!(
2439 "🎬 Adding dynamic track: track_id={}, codec={}, type={}, target={}",
2440 track_id,
2441 codec,
2442 media_type,
2443 target.to_string_repr()
2444 );
2445
2446 let (webrtc_conn, peer_connection) = {
2448 let peers = self.peers.read().await;
2449 let state = peers.get(target).ok_or_else(|| {
2450 RuntimeError::Other(anyhow::anyhow!(
2451 "No connection to target: {}",
2452 target.to_string_repr()
2453 ))
2454 })?;
2455 (state.webrtc_conn.clone(), state.peer_connection.clone())
2456 };
2457
2458 webrtc_conn
2460 .add_media_track(track_id.clone(), codec, media_type)
2461 .await?;
2462
2463 tracing::info!("✅ Added track to PeerConnection: {}", track_id);
2464
2465 let root_span = tracing::info_span!("add_track", target_id = %target.to_string_repr());
2467 #[cfg(feature = "opentelemetry")]
2468 self.root_context_map
2469 .write()
2470 .await
2471 .insert(target.clone(), root_span.context());
2472
2473 self.renegotiate_connection(target, &peer_connection)
2474 .instrument(root_span)
2475 .await?;
2476
2477 tracing::info!("✅ Dynamic track added successfully: {}", track_id);
2478
2479 Ok(())
2480 }
2481
2482 async fn renegotiate_connection(
2487 &self,
2488 target: &actr_protocol::ActrId,
2489 peer_connection: &Arc<RTCPeerConnection>,
2490 ) -> RuntimeResult<()> {
2491 tracing::info!(
2492 "🔄 Starting SDP renegotiation with {}",
2493 target.to_string_repr()
2494 );
2495
2496 let offer = peer_connection.create_offer(None).await.map_err(|e| {
2498 RuntimeError::Other(anyhow::anyhow!("Failed to create renegotiation offer: {e}"))
2499 })?;
2500 let offer_sdp = offer.sdp.clone();
2501
2502 peer_connection
2504 .set_local_description(offer)
2505 .await
2506 .map_err(|e| {
2507 RuntimeError::Other(anyhow::anyhow!("Failed to set local description: {e}"))
2508 })?;
2509
2510 tracing::debug!(
2511 "📝 Created renegotiation Offer (SDP length: {})",
2512 offer_sdp.len()
2513 );
2514
2515 let session_desc = actr_protocol::SessionDescription {
2517 r#type: SdpType::Offer as i32,
2518 sdp: offer_sdp,
2519 };
2520 let payload = actr_relay::Payload::SessionDescription(session_desc);
2521 self.send_actr_relay(target, payload).await?;
2522
2523 tracing::info!("✅ Sent renegotiation Offer to {}", target.to_string_repr());
2524
2525 Ok(())
2530 }
2531
2532 pub async fn restart_ice(
2536 self: &Arc<Self>,
2537 target: &actr_protocol::ActrId,
2538 ) -> RuntimeResult<()> {
2539 let (peer_connection, should_start_restart) = {
2542 let mut peers = self.peers.write().await;
2543 if let Some(state) = peers.get_mut(target) {
2544 if let Some(ref handle) = state.restart_task_handle {
2546 if !handle.is_finished() {
2547 tracing::debug!(
2548 "🚫 ICE restart already in-flight for serial={}, skipping (task not finished)",
2549 target.serial_number
2550 );
2551 return Ok(());
2552 }
2553 }
2554
2555 if state.ice_restart_inflight {
2557 tracing::debug!(
2558 "🚫 ICE restart already in-flight for serial={}, skipping (ice_restart_inflight=true)",
2559 target.serial_number
2560 );
2561 return Ok(());
2562 }
2563
2564 if !state.is_offerer {
2566 tracing::warn!(
2567 "🚫 Skip ICE restart to serial={}: we are not the offerer",
2568 target.serial_number
2569 );
2570 return Ok(());
2571 }
2572
2573 state.ice_restart_inflight = true;
2575 (state.peer_connection.clone(), true)
2579 } else {
2580 tracing::warn!(
2581 "🚫 Skip ICE restart to serial={}: peer not found",
2582 target.serial_number
2583 );
2584 return Ok(());
2585 }
2586 };
2587
2588 if !should_start_restart {
2589 return Ok(());
2590 }
2591
2592 tracing::info!(
2593 "♻️ Initiating ICE restart to serial={}",
2594 target.serial_number
2595 );
2596
2597 let target_clone = target.clone();
2599 let peers = Arc::clone(&self.peers);
2600 let negotiator = self.negotiator.clone();
2601 let local_id = self.local_id.clone();
2602 let credential_state = self.credential_state.clone();
2603 let signaling_client = Arc::clone(&self.signaling_client);
2604 let coordinator_weak = Arc::downgrade(self);
2605
2606 let handle = tokio::spawn(async move {
2607 let restart_result = Self::do_ice_restart_inner(
2608 &target_clone,
2609 &peers,
2610 peer_connection,
2611 &negotiator,
2612 &local_id,
2613 credential_state,
2614 &signaling_client,
2615 )
2616 .await;
2617
2618 match restart_result {
2619 Ok(true) => {
2620 tracing::info!(
2621 "✅ ICE restart succeeded for serial={}",
2622 target_clone.serial_number
2623 );
2624 }
2625 Ok(false) => {
2626 tracing::warn!(
2628 "⚠️ ICE restart exhausted for serial={}, cleaning up and attempting fresh connection",
2629 target_clone.serial_number
2630 );
2631
2632 if let Some(coord) = coordinator_weak.upgrade() {
2633 tracing::info!(
2635 "🧹 Cleaning up old connection after ICE restart failure for serial={}",
2636 target_clone.serial_number
2637 );
2638 coord.cleanup_cancelled_connection(&target_clone).await;
2639 }
2640 }
2641 Err(e) => {
2642 tracing::error!(
2643 "❌ ICE restart failed for serial={}: {}",
2644 target_clone.serial_number,
2645 e
2646 );
2647
2648 if let Some(coord) = coordinator_weak.upgrade() {
2650 tracing::info!(
2651 "🧹 Cleaning up connection after ICE restart error for serial={}",
2652 target_clone.serial_number
2653 );
2654 coord.cleanup_cancelled_connection(&target_clone).await;
2655 }
2656 }
2657 }
2658
2659 {
2661 let mut peers_guard = peers.write().await;
2662 if let Some(state) = peers_guard.get_mut(&target_clone) {
2663 state.restart_task_handle = None;
2664 }
2665 }
2666 });
2667
2668 {
2671 let mut peers = self.peers.write().await;
2672 if let Some(state) = peers.get_mut(target) {
2673 state.restart_task_handle = Some(handle);
2674 }
2675 }
2676
2677 Ok(())
2678 }
2679
2680 async fn do_ice_restart_inner(
2683 target: &ActrId,
2684 peers: &Arc<RwLock<HashMap<ActrId, PeerState>>>,
2685 peer_connection: Arc<RTCPeerConnection>,
2686 negotiator: &WebRtcNegotiator,
2687 local_id: &ActrId,
2688 credential_state: CredentialState,
2689 signaling_client: &Arc<dyn SignalingClient>,
2690 ) -> RuntimeResult<bool> {
2691 let backoff = ExponentialBackoff::with_total_duration(
2693 Duration::from_millis(ICE_RESTART_INITIAL_BACKOFF_MS),
2694 Duration::from_millis(ICE_RESTART_MAX_BACKOFF_MS),
2695 Some(ICE_RESTART_MAX_RETRIES),
2696 ICE_RESTART_MAX_TOTAL_DURATION,
2697 );
2698
2699 let mut restart_ok = false;
2700 let mut gathering_started_at: Option<Instant> = None;
2701
2702 for delay in backoff {
2703 if !signaling_client.is_connected() {
2705 tracing::debug!(
2706 "🔄 Signaling not ready for ICE restart to serial={}, will retry after {:?}",
2707 target.serial_number,
2708 delay
2709 );
2710 tokio::time::sleep(delay).await;
2711 continue; }
2713
2714 let gathering_state = peer_connection.ice_gathering_state();
2716 if gathering_state == RTCIceGatheringState::Gathering {
2717 let started = gathering_started_at.get_or_insert_with(Instant::now);
2718 let gathering_duration = started.elapsed();
2719
2720 if gathering_duration > ICE_GATHERING_TIMEOUT {
2721 tracing::error!(
2722 "❌ ICE gathering stuck for {:?}, aborting ICE restart for serial={}",
2723 gathering_duration,
2724 target.serial_number
2725 );
2726 let _ = peer_connection.close().await;
2728 return Ok(false);
2729 }
2730
2731 tracing::debug!(
2732 "⏳ ICE gathering in progress ({:?} elapsed), will retry after {:?}",
2733 gathering_duration,
2734 delay
2735 );
2736 tokio::time::sleep(delay).await;
2737 continue; } else {
2739 gathering_started_at = None;
2741 }
2742
2743 let (offer_sdp, attempt) = {
2745 let mut peers_guard = peers.write().await;
2746 let state = match peers_guard.get_mut(target) {
2747 Some(s) => s,
2748 None => {
2749 tracing::warn!(
2750 "🚫 Peer state not found during ICE restart for serial={}",
2751 target.serial_number
2752 );
2753 return Ok(false);
2754 }
2755 };
2756
2757 if !state.is_offerer {
2758 tracing::warn!(
2759 "🚫 Skip ICE restart to serial={}: we are not the offerer",
2760 target.serial_number
2761 );
2762 state.ice_restart_inflight = false;
2763 state.ice_restart_attempts = 0;
2764 return Ok(false);
2765 }
2766
2767 state.ice_restart_attempts += 1;
2770 let attempt = state.ice_restart_attempts;
2771
2772 let offer_sdp = negotiator
2773 .create_ice_restart_offer(&peer_connection)
2774 .await?;
2775
2776 (offer_sdp, attempt)
2777 };
2778
2779 let relay = ActrRelay {
2781 source: local_id.clone(),
2782 credential: credential_state.credential().await,
2783 target: target.clone(),
2784 payload: Some(actr_relay::Payload::SessionDescription(
2785 actr_protocol::SessionDescription {
2786 r#type: SdpType::IceRestartOffer as i32,
2787 sdp: offer_sdp,
2788 },
2789 )),
2790 };
2791
2792 let envelope = SignalingEnvelope {
2793 envelope_version: 1,
2794 envelope_id: uuid::Uuid::new_v4().to_string(),
2795 reply_for: None,
2796 timestamp: prost_types::Timestamp {
2797 seconds: chrono::Utc::now().timestamp(),
2798 nanos: 0,
2799 },
2800 flow: Some(signaling_envelope::Flow::ActrRelay(relay)),
2801 traceparent: None,
2802 tracestate: None,
2803 };
2804
2805 if let Err(e) = signaling_client.send_envelope(envelope).await {
2806 tracing::error!(
2807 "❌ Failed to send ICE restart offer to serial={}: {}",
2808 target.serial_number,
2809 e
2810 );
2811 let mut peers_guard = peers.write().await;
2813 if let Some(state) = peers_guard.get_mut(target) {
2814 state.ice_restart_inflight = false;
2815 }
2816 tokio::time::sleep(delay).await;
2817 continue;
2818 }
2819
2820 tracing::info!(
2821 "♻️ ICE restart attempt {} sent to serial={}",
2822 attempt,
2823 target.serial_number
2824 );
2825
2826 let success =
2828 Self::wait_for_restart_completion_static(peers, target, ICE_RESTART_TIMEOUT).await;
2829
2830 if success {
2831 restart_ok = true;
2832 break;
2833 }
2834
2835 tracing::warn!(
2836 "⚠️ ICE restart attempt {} timed out for serial={}",
2837 attempt,
2838 target.serial_number
2839 );
2840
2841 {
2843 let mut peers_guard = peers.write().await;
2844 if let Some(state) = peers_guard.get_mut(target) {
2845 state.ice_restart_inflight = false;
2846 }
2847 }
2848
2849 tracing::info!(
2851 "⏳ Waiting {:?} before next ICE restart attempt to serial={}",
2852 delay,
2853 target.serial_number
2854 );
2855 tokio::time::sleep(delay).await;
2856 }
2857
2858 if !restart_ok {
2859 tracing::warn!(
2860 "⚠️ Backoff iterator exhausted for serial={}, stopping retries and dropping peer",
2861 target.serial_number
2862 );
2863 Self::drop_peer_connection_static(peers, target).await;
2864 return Ok(false);
2865 }
2866
2867 Ok(true)
2868 }
2869
2870 async fn wait_for_restart_completion_static(
2873 peers: &Arc<RwLock<HashMap<ActrId, PeerState>>>,
2874 target: &ActrId,
2875 timeout: Duration,
2876 ) -> bool {
2877 let mut interval = tokio::time::interval(Duration::from_millis(100));
2878 let timeout_sleep = tokio::time::sleep(timeout);
2879 tokio::pin!(timeout_sleep);
2880
2881 loop {
2882 tokio::select! {
2883 _ = &mut timeout_sleep => {
2884 return false;
2885 }
2886 _ = interval.tick() => {
2887 let is_done = {
2889 let peers_guard = peers.read().await;
2890 match peers_guard.get(target) {
2891 Some(state) => !state.ice_restart_inflight,
2892 None => return false,
2893 }
2894 };
2895
2896 if is_done {
2897 let mut peers_guard = peers.write().await;
2899 if let Some(state) = peers_guard.get_mut(target) {
2900 state.ice_restart_attempts = 0;
2901 }
2902 return true;
2903 }
2904 }
2905 }
2906 }
2907 }
2908
2909 async fn drop_peer_connection_static(
2914 peers: &Arc<RwLock<HashMap<ActrId, PeerState>>>,
2915 target: &ActrId,
2916 ) {
2917 let state_to_close = {
2919 let mut peers_guard = peers.write().await;
2920 peers_guard.remove(target)
2921 }; if let Some(state) = state_to_close {
2924 if let Err(e) = state.peer_connection.close().await {
2925 tracing::warn!(
2926 "⚠️ Failed to close peer_connection for {}: {}",
2927 target.serial_number,
2928 e
2929 );
2930 }
2931 if let Err(e) = state.webrtc_conn.close().await {
2932 tracing::warn!(
2933 "⚠️ Failed to close WebRtcConnection for {}: {}",
2934 target.serial_number,
2935 e
2936 );
2937 }
2938 tracing::info!("🧹 Dropped peer connection for {}", target.serial_number);
2939 } else {
2940 tracing::warn!(
2941 "⚠️ drop_peer_connection: peer not found {}",
2942 target.serial_number
2943 );
2944 }
2945 }
2946
2947 #[allow(dead_code)]
2952 async fn handle_renegotiation_offer(
2953 &self,
2954 from: &ActrId,
2955 offer_sdp: String,
2956 ) -> RuntimeResult<()> {
2957 tracing::info!(
2958 "🔄 Processing renegotiation Offer from {}",
2959 from.to_string_repr()
2960 );
2961
2962 let peer_connection = {
2964 let peers = self.peers.read().await;
2965 let state = peers.get(from).ok_or_else(|| {
2966 RuntimeError::Other(anyhow::anyhow!("Peer state not found for renegotiation"))
2967 })?;
2968 state.peer_connection.clone()
2969 };
2970
2971 let offer =
2973 webrtc::peer_connection::sdp::session_description::RTCSessionDescription::offer(
2974 offer_sdp,
2975 )
2976 .map_err(|e| {
2977 RuntimeError::Other(anyhow::anyhow!("Failed to parse renegotiation offer: {e}"))
2978 })?;
2979 peer_connection
2980 .set_remote_description(offer)
2981 .await
2982 .map_err(|e| {
2983 RuntimeError::Other(anyhow::anyhow!("Failed to set remote description: {e}"))
2984 })?;
2985
2986 tracing::debug!("✅ Set remote description (renegotiation Offer)");
2987
2988 let answer = peer_connection.create_answer(None).await.map_err(|e| {
2990 RuntimeError::Other(anyhow::anyhow!(
2991 "Failed to create renegotiation answer: {e}"
2992 ))
2993 })?;
2994 let answer_sdp = answer.sdp.clone();
2995
2996 peer_connection
2998 .set_local_description(answer)
2999 .await
3000 .map_err(|e| {
3001 RuntimeError::Other(anyhow::anyhow!("Failed to set local description: {e}"))
3002 })?;
3003
3004 tracing::debug!(
3005 "✅ Created renegotiation Answer (SDP length: {})",
3006 answer_sdp.len()
3007 );
3008
3009 let session_desc = actr_protocol::SessionDescription {
3011 r#type: SdpType::Answer as i32,
3012 sdp: answer_sdp,
3013 };
3014 let payload = actr_relay::Payload::SessionDescription(session_desc);
3015 self.send_actr_relay(from, payload).await?;
3016
3017 tracing::info!("✅ Sent renegotiation Answer to {}", from.to_string_repr());
3018
3019 Ok(())
3023 }
3024
3025 async fn handle_ice_restart_offer(
3028 self: &Arc<Self>,
3029 from: &ActrId,
3030 offer_sdp: String,
3031 ) -> RuntimeResult<()> {
3032 let (peer_connection, is_offerer) = {
3034 let peers = self.peers.read().await;
3035 let state = peers.get(from).ok_or_else(|| {
3036 RuntimeError::Other(anyhow::anyhow!(
3037 "ICE restart offer received for unknown peer"
3038 ))
3039 })?;
3040 (state.peer_connection.clone(), state.is_offerer)
3041 };
3042
3043 if is_offerer {
3044 tracing::warn!(
3045 "🚫 Ignoring ICE restart offer from {:?}: we are current offerer",
3046 from.serial_number
3047 );
3048 return Ok(());
3049 }
3050
3051 let answer_sdp = self
3053 .negotiator
3054 .create_answer(&peer_connection, offer_sdp)
3055 .await?;
3056
3057 let session_desc = actr_protocol::SessionDescription {
3059 r#type: SdpType::Answer as i32,
3060 sdp: answer_sdp,
3061 };
3062 let payload = actr_relay::Payload::SessionDescription(session_desc);
3063 self.send_actr_relay(from, payload).await?;
3064
3065 self.flush_pending_candidates(from, &peer_connection)
3067 .await?;
3068
3069 tracing::info!(
3070 "✅ Completed ICE restart answer to serial={}",
3071 from.serial_number
3072 );
3073
3074 Ok(())
3075 }
3076
3077 #[cfg_attr(
3081 feature = "opentelemetry",
3082 tracing::instrument(skip_all, fields(peer_id = ?peer.to_string_repr()))
3083 )]
3084 async fn handle_role_assignment(self: &Arc<Self>, assign: RoleAssignment, peer: ActrId) {
3085 tracing::debug!(?assign, ?peer, "handle_role_assignment");
3086
3087 if assign.is_offerer {
3091 let has_connection = self.peers.read().await.contains_key(&peer);
3092
3093 if has_connection {
3095 tracing::info!(
3096 "🔄 Assigned as offerer for {} (has_connection={}), cleaning up for new connection",
3097 peer.serial_number,
3098 has_connection
3099 );
3100
3101 let this = Arc::clone(self);
3105 let peer_clone = peer.clone();
3106 tokio::spawn(async move {
3107 this.cleanup_cancelled_connection(&peer_clone).await;
3108 });
3109 }
3110 }
3111 let role_sender = {
3115 let mut neg = self.peer_negotiation.lock().await;
3116 neg.get_mut(&peer).and_then(|s| s.role_tx.take())
3117 };
3118 if let Some(sender) = role_sender {
3119 if sender.send(assign.is_offerer).is_ok() {
3120 return;
3121 }
3122 }
3123
3124 tracing::debug!(
3125 ?assign,
3126 ?peer,
3127 "handle_role_assignment: no pending negotiation"
3128 );
3129 let has_connection = self.peers.read().await.contains_key(&peer);
3131 if has_connection {
3132 return;
3133 }
3134 if assign.is_offerer {
3135 tracing::info!(
3136 "🎭 Acting as offerer to {} per assignment (no pending negotiation)",
3137 peer.serial_number
3138 );
3139 let this = Arc::clone(self);
3141 let peer_clone = peer.clone();
3142 #[cfg(feature = "opentelemetry")]
3143 let current_span = tracing::Span::current();
3144 tokio::spawn(async move {
3145 let start_offer_fut = this.start_offer_connection(&peer_clone, true);
3146 #[cfg(feature = "opentelemetry")]
3147 let start_offer_fut = start_offer_fut.instrument(current_span);
3148 match start_offer_fut.await {
3149 Ok(ready_rx) => {
3150 this.peer_negotiation
3151 .lock()
3152 .await
3153 .entry(peer_clone.clone())
3154 .or_default()
3155 .ready_rx = Some(ready_rx);
3156 }
3157 Err(e) => {
3158 tracing::warn!(
3159 "⚠️ Failed to start proactive offer connection to {}: {}",
3160 peer_clone.serial_number,
3161 e
3162 );
3163 }
3164 }
3165 });
3166 } else {
3167 tracing::debug!(
3168 "🎭 Assignment marks us as answerer for {}, waiting for offer (no pending negotiation)",
3169 peer.serial_number
3170 );
3171 let (tx, _rx) = oneshot::channel();
3172 self.peer_negotiation
3173 .lock()
3174 .await
3175 .entry(peer.clone())
3176 .or_default()
3177 .ready_tx = Some(tx);
3178
3179 let weak = Arc::downgrade(self);
3181 let peer_clone = peer.clone();
3182 #[cfg(feature = "opentelemetry")]
3183 let current_span = tracing::Span::current();
3184 tokio::spawn(async move {
3185 tokio::time::sleep(ROLE_WAIT_TIMEOUT).await;
3186 if let Some(coord) = weak.upgrade() {
3187 if coord.peers.read().await.contains_key(&peer_clone) {
3189 return;
3190 }
3191 let pending = {
3192 let mut neg = coord.peer_negotiation.lock().await;
3193 neg.get_mut(&peer_clone).and_then(|s| s.ready_tx.take())
3194 };
3195 if pending.is_none() {
3196 return;
3197 }
3198 tracing::warn!(
3199 "⏳ Waiting for offer from {} timed out, force acting as offerer",
3200 peer_clone.serial_number
3201 );
3202 let start_offer_fut = coord.start_offer_connection(&peer_clone, true);
3203 #[cfg(feature = "opentelemetry")]
3204 let start_offer_fut = start_offer_fut.instrument(current_span);
3205 match start_offer_fut.await {
3206 Ok(ready_rx) => {
3207 coord
3208 .peer_negotiation
3209 .lock()
3210 .await
3211 .entry(peer_clone.clone())
3212 .or_default()
3213 .ready_rx = Some(ready_rx);
3214 }
3215 Err(e) => {
3216 tracing::warn!(
3217 "⚠️ Failed to start offer connection after timeout to {}: {}",
3218 peer_clone.serial_number,
3219 e
3220 );
3221 }
3222 }
3223 }
3224 });
3225 }
3226 }
3227
3228 #[cfg_attr(
3230 feature = "opentelemetry",
3231 tracing::instrument(skip_all, fields(target_id = ?target.to_string_repr()))
3232 )]
3233 async fn negotiate_role(&self, target: &ActrId) -> RuntimeResult<bool> {
3234 let (tx, rx) = oneshot::channel();
3235 self.peer_negotiation
3237 .lock()
3238 .await
3239 .entry(target.clone())
3240 .or_default()
3241 .role_tx = Some(tx);
3242
3243 let payload = actr_relay::Payload::RoleNegotiation(RoleNegotiation {
3244 from: self.local_id.clone(),
3245 to: target.clone(),
3246 realm_id: self.local_id.realm.realm_id,
3247 });
3248
3249 tracing::debug!(
3250 "🔄 Sending role negotiation to serial={}",
3251 target.serial_number
3252 );
3253 self.send_actr_relay(target, payload).await?;
3254
3255 rx.await.map_err(|_| {
3256 RuntimeError::Other(anyhow::anyhow!(
3257 "Role negotiation channel closed before assignment"
3258 ))
3259 })
3260 }
3261
3262 fn install_restart_handler(
3264 self: &Arc<Self>,
3265 webrtc_conn: WebRtcConnection,
3266 peer_connection: Arc<RTCPeerConnection>,
3267 target: ActrId,
3268 ) {
3269 let coord = Arc::downgrade(self);
3270 peer_connection.on_peer_connection_state_change(Box::new(
3271 move |state: RTCPeerConnectionState| {
3272 let coord = coord.clone();
3273 let target = target.clone();
3274 let webrtc_conn = webrtc_conn.clone();
3275 Box::pin(async move {
3276 webrtc_conn.handle_state_change(state).await;
3278
3279 tracing::info!(
3280 "📡 PeerConnection state for {} -> {:?}",
3281 target.serial_number,
3282 state
3283 );
3284
3285 if let Some(c) = coord.upgrade() {
3287 let mut peers = c.peers.write().await;
3288 if let Some(peer_state) = peers.get_mut(&target) {
3289 peer_state.current_state = state;
3290 peer_state.last_state_change = std::time::Instant::now();
3291 }
3292 drop(peers); }
3294
3295 if matches!(
3296 state,
3297 RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Failed
3298 ) {
3299 if let Some(c) = coord.upgrade() {
3300 if let Err(e) = c.restart_ice(&target).await {
3301 tracing::warn!(
3302 "⚠️ Failed to auto restart ICE to {}: {}",
3303 target.serial_number,
3304 e
3305 );
3306 }
3307 }
3308 }
3309 })
3310 },
3311 ));
3312 }
3313}