1#[cfg(feature = "opentelemetry")]
6use super::trace;
7use crate::lifecycle::CredentialState;
8use crate::transport::{NetworkError, NetworkResult};
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::extract_trace_context;
11use actr_protocol::prost::Message as ProstMessage;
12use actr_protocol::{
13 AIdCredential, ActrId, ActrToSignaling, CredentialUpdateRequest, GetSigningKeyRequest,
14 PeerToSignaling, Ping, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
15 RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterRequest,
16 UnregisterResponse, actr_to_signaling, peer_to_signaling, signaling_envelope,
17 signaling_to_actr,
18};
19use async_trait::async_trait;
20use base64::Engine as _;
21use futures_util::{SinkExt, StreamExt};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::{
27 Arc, OnceLock,
28 atomic::{AtomicBool, AtomicU64, Ordering},
29};
30use std::time::Duration;
31use tokio::net::TcpStream;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
34use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async_with_config};
35#[cfg(feature = "opentelemetry")]
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use url::Url;
38
/// Shared, lockable write half of the signaling WebSocket.
///
/// Holds `None` while no connection is installed (before the first connect, or after a
/// disconnect has taken and closed the sink). It is wrapped in an `Arc` so the background
/// ping task can send through the same sink as the client.
type WsSink = Arc<
    tokio::sync::Mutex<
        Option<
            futures_util::stream::SplitSink<
                WebSocketStream<MaybeTlsStream<TcpStream>>,
                tokio_tungstenite::tungstenite::Message,
            >,
        >,
    >,
>;
50
/// Seconds to wait for a reply envelope in `send_envelope_and_wait_response`.
const RESPONSE_TIMEOUT_SECS: u64 = 15;
/// Seconds between WebSocket ping frames sent by the background ping task.
const PING_INTERVAL_SECS: u64 = 5;
/// Seconds without inbound activity after which the ping task declares the link dead.
/// Inbound activity means binary, ping or pong frames. The same value bounds the liveness
/// probe run before a reconnect cycle.
const PONG_TIMEOUT_SECS: u64 = 10;
/// Seconds allowed for a single signaling send; the ping task applies it to each ping frame.
const SIGNALING_SEND_TIMEOUT_SECS: u64 = 5;
/// NOTE(review): not referenced in the code in view. Presumably it bounds how long a caller
/// waits for an in-flight concurrent connect — confirm.
const CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS: u64 = 5;
/// Seconds to wait for each internal lock while tearing a connection down.
const DISCONNECT_LOCK_TIMEOUT_SECS: u64 = 5;
/// Seconds to wait for the WebSocket sink to close during disconnect.
const DISCONNECT_CLOSE_TIMEOUT_SECS: u64 = 1;
64
/// Connection settings for a signaling client.
#[derive(Debug, Clone)]
pub struct SignalingConfig {
    /// Base URL of the signaling server. Identity query parameters are appended to a copy of
    /// it on every connection attempt.
    pub server_url: Url,

    /// Timeout in seconds for a single connection attempt; `0` disables the timeout.
    pub connection_timeout: u64,

    /// Heartbeat interval (presumably seconds).
    /// NOTE(review): not read by the WebSocket client code in view, whose ping cadence comes
    /// from `PING_INTERVAL_SECS` — confirm where this is consumed.
    pub heartbeat_interval: u64,

    /// Connect-retry and automatic-reconnect policy.
    pub reconnect_config: ReconnectConfig,

    /// Optional authentication settings.
    /// NOTE(review): not read by the WebSocket client code in view — confirm usage.
    pub auth_config: Option<AuthConfig>,

    /// Optional WebRTC role, sent to the server as the `webrtc_role` query parameter.
    pub webrtc_role: Option<String>,
}
90
/// Policy for connect retries and automatic reconnection.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Master switch. When `false`, connecting is a single attempt and no reconnect manager
    /// is started.
    pub enabled: bool,

    /// Maximum number of retries in one backoff sequence (passed to `max_retries`).
    pub max_attempts: u32,

    /// First backoff delay in seconds; values below 1 are treated as 1.
    pub initial_delay: u64,

    /// Upper bound on a single backoff delay in seconds; values below 1 are treated as 1.
    /// The cooldown after an exhausted reconnect cycle is twice this value.
    pub max_delay: u64,

    /// Backoff growth factor.
    /// NOTE(review): not passed to `ExponentialBackoff` in the code in view — confirm whether
    /// it is honoured anywhere.
    pub backoff_multiplier: f64,
}
109
110impl Default for ReconnectConfig {
111 fn default() -> Self {
112 Self {
113 enabled: true,
114 max_attempts: 10,
115 initial_delay: 1,
116 max_delay: 60,
117 backoff_multiplier: 2.0,
118 }
119 }
120}
121
/// Authentication settings for the signaling connection.
///
/// NOTE(review): not consumed by the WebSocket client code in view. The meaning of the
/// `credentials` keys presumably depends on `auth_type` — confirm with the consumer.
#[derive(Debug, Clone)]
pub struct AuthConfig {
    /// Authentication scheme that `credentials` belongs to.
    pub auth_type: AuthType,

    /// Key/value credential material for the chosen scheme.
    pub credentials: HashMap<String, String>,
}
131
/// Authentication scheme selector used by `AuthConfig`.
#[derive(Debug, Clone)]
pub enum AuthType {
    /// No authentication.
    None,
    /// Bearer-token authentication.
    BearerToken,
    /// API-key authentication.
    ApiKey,
    /// JWT authentication.
    Jwt,
}
144
/// Abstraction over a connection to the signaling server.
///
/// It covers connection lifecycle, request/response RPCs, raw envelope I/O, identity
/// management and observability (stats, events, hooks).
#[async_trait]
pub trait SignalingClient: Send + Sync {
    /// Connects to the signaling server.
    async fn connect(&self) -> NetworkResult<()>;

    /// Makes a single connection attempt.
    ///
    /// The default implementation simply delegates to `connect`; implementations with retry
    /// logic may override it.
    async fn connect_once(&self) -> NetworkResult<()> {
        self.connect().await
    }

    /// Requests a background reconnect. The default implementation does nothing.
    fn schedule_auto_reconnect(&self) {}

    /// Disconnects from the signaling server.
    async fn disconnect(&self) -> NetworkResult<()>;

    /// Checks that the connection is alive.
    ///
    /// The default implementation ignores the timeout and only consults `is_connected`. It
    /// returns `NetworkError::ConnectionError` when the client is not connected.
    async fn probe_alive(&self, _timeout: Duration) -> NetworkResult<()> {
        if self.is_connected() {
            Ok(())
        } else {
            Err(NetworkError::ConnectionError(
                "Signaling client is not connected".to_string(),
            ))
        }
    }

    /// Sends a registration request and returns the server's response.
    async fn send_register_request(
        &self,
        request: RegisterRequest,
    ) -> NetworkResult<RegisterResponse>;

    /// Unregisters `actor_id`, authenticated by `credential`, with an optional reason.
    async fn send_unregister_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        reason: Option<String>,
    ) -> NetworkResult<UnregisterResponse>;

    /// Sends a heartbeat for `actor_id` and returns the server's `Pong`.
    ///
    /// The heartbeat reports the actor's availability, power reserve and mailbox backlog.
    async fn send_heartbeat(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        availability: ServiceAvailabilityState,
        power_reserve: f32,
        mailbox_backlog: f32,
    ) -> NetworkResult<Pong>;

    /// Asks the server for route candidates on behalf of `actor_id`.
    async fn send_route_candidates_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        request: RouteCandidatesRequest,
    ) -> NetworkResult<RouteCandidatesResponse>;

    /// Fetches the signing key identified by `key_id`.
    ///
    /// NOTE(review): the result is presumably `(key_id, key bytes)` — confirm with the
    /// implementation.
    async fn get_signing_key(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        key_id: u32,
    ) -> NetworkResult<(u32, Vec<u8>)>;

    /// Requests a credential update for `actor_id`; the reply is a `RegisterResponse`.
    async fn send_credential_update_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
    ) -> NetworkResult<RegisterResponse>;

    /// Sends a raw envelope; no reply is returned.
    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;

    /// Receives the next inbound envelope.
    ///
    /// NOTE(review): `Ok(None)` presumably means the inbound stream is closed — confirm.
    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;

    /// Whether the client currently considers itself connected.
    fn is_connected(&self) -> bool;

    /// Snapshot of the client's statistics.
    fn get_stats(&self) -> SignalingStats;
    /// Returns a new receiver for connection lifecycle events.
    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent>;

    /// Stores the actor id this client identifies as.
    async fn set_actor_id(&self, actor_id: ActrId);
    /// Stores the credential state used to authenticate this client.
    async fn set_credential_state(&self, credential_state: CredentialState);

    /// Clears the stored identity (presumably both actor id and credential — confirm).
    async fn clear_identity(&self);

    /// Installs a lifecycle hook callback. The default implementation ignores it.
    fn set_hook_callback(&self, _cb: HookCallback) {}
}
278
/// Coarse connection state of a signaling client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection is established.
    Disconnected,
    /// A connection is established.
    Connected,
}
285
/// Connection lifecycle notifications broadcast to event subscribers.
#[derive(Debug, Clone)]
pub enum SignalingEvent {
    /// A (re)connect attempt is starting; `attempt` counts from 1.
    ConnectStart { attempt: u32 },
    /// A connection was established.
    Connected,
    /// An established connection was lost or closed, for the given reason.
    Disconnected { reason: DisconnectReason },
}
300
/// Why a signaling connection went from connected to disconnected.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// The WebSocket stream ended or yielded a receive error.
    StreamEnded,
    /// No inbound activity was observed for longer than the pong timeout.
    PongTimeout,
    /// A keep-alive ping could not be sent (send error, send timeout, or no sink).
    PingSendFailed,
    /// The credential expired (not produced by the code in view).
    CredentialExpired,
    /// The connection was torn down deliberately by the client.
    Manual,
    /// The connection failed, with a description (not produced by the code in view).
    ConnectionFailed(String),
}
317
/// Lifecycle events delivered to a registered `HookCallback`.
///
/// Only the `Signaling*` variants are emitted by the WebSocket signaling client in view. The
/// descriptions of the other variants follow their names; confirm against their emitters.
#[derive(Clone, Debug)]
pub enum HookEvent {
    /// A signaling connection attempt is starting.
    SignalingConnectStart {
        /// Attempt number, counting from 1.
        attempt: u32,
    },
    /// A signaling connection was established.
    SignalingConnected,
    /// An established signaling connection was lost or closed.
    SignalingDisconnected,
    /// A WebRTC connection to `peer_id` is starting.
    WebRtcConnectStart {
        /// Remote peer.
        peer_id: ActrId,
    },
    /// A WebRTC connection to `peer_id` was established.
    WebRtcConnected {
        peer_id: ActrId,
        /// NOTE(review): presumably whether traffic goes through a relay (TURN) — confirm.
        relayed: bool,
    },
    /// A WebRTC connection to `peer_id` was closed or lost.
    WebRtcDisconnected {
        peer_id: ActrId,
    },
    /// Delivery of data on a stream could not be confirmed.
    DataStreamDeliveryUncertain {
        stream_id: String,
        session_id: u64,
        reason: String,
    },
    /// A WebSocket connection to `peer_id` is starting.
    WebSocketConnectStart {
        /// Remote peer.
        peer_id: ActrId,
    },
    /// A WebSocket connection to `peer_id` was established.
    WebSocketConnected {
        peer_id: ActrId,
    },
    /// A WebSocket connection to `peer_id` was closed or lost.
    WebSocketDisconnected {
        peer_id: ActrId,
    },
    /// The credential was renewed.
    CredentialRenewed {
        /// Expiry of the renewed credential.
        new_expiry: std::time::SystemTime,
    },
    /// The credential is close to expiring.
    CredentialExpiring {
        new_expiry: std::time::SystemTime,
    },
    /// A mailbox queue reached its backpressure threshold.
    MailboxBackpressure {
        /// Current queue length.
        queue_len: usize,
        threshold: usize,
    },
}
373
/// Asynchronous lifecycle hook.
///
/// The callback is invoked with each `HookEvent` and returns a boxed `Send` future. The
/// signaling client awaits that future before continuing, so a slow hook delays the client.
pub type HookCallback =
    Arc<dyn Fn(HookEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
380
/// Who initiated a connection attempt; this decides whether a late success is kept.
#[derive(Debug, Clone, Copy)]
enum ConnectIntent {
    /// Requested directly by a caller; a successful connect is always installed.
    Explicit,
    /// Issued by the auto-reconnect cycle started at `generation`.
    ///
    /// A successful connect is discarded (socket closed) if that cycle was cancelled while
    /// connecting.
    AutoReconnect { generation: u64 },
}
386
/// WebSocket-based signaling client.
///
/// All connection state lives behind atomics or mutexes, so every operation takes `&self`.
/// Fields that the background receiver and ping tasks also need are wrapped in `Arc`.
pub struct WebSocketSignalingClient {
    /// Static connection settings.
    config: SignalingConfig,
    /// Actor id sent as the `actor_id` query parameter when connecting, if set.
    actor_id: tokio::sync::Mutex<Option<ActrId>>,
    /// Credential source, if set. Its current credential is sent as the `key_id`, `claims`
    /// and `signature` query parameters when connecting.
    credential_state: tokio::sync::Mutex<Option<CredentialState>>,
    /// Write half of the current socket, shared with the ping task.
    ws_sink: WsSink,
    /// Read half of a freshly established socket, parked here until `start_receiver`
    /// takes it.
    ws_stream: tokio::sync::Mutex<
        Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
    >,
    /// Whether a connection is currently considered established.
    connected: Arc<AtomicBool>,
    /// NOTE(review): not touched by the code in view. It presumably guards against concurrent
    /// connect attempts — confirm.
    connecting: Arc<AtomicBool>,
    /// Connection, disconnection, received-message and error counters.
    stats: Arc<AtomicSignalingStats>,
    /// Source of sequential `env-<n>` envelope ids.
    envelope_counter: tokio::sync::Mutex<u64>,
    /// Reply waiters, keyed by the envelope id their reply carries in `reply_for`.
    pending_replies: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<SignalingEnvelope>>>>,
    /// Pong waiters, keyed by the ping payload they expect to see echoed back.
    pending_pongs: Arc<tokio::sync::Mutex<HashMap<Vec<u8>, oneshot::Sender<()>>>>,
    /// NOTE(review): not used in the code in view. It presumably generates unique probe ping
    /// payloads — confirm.
    probe_counter: AtomicU64,
    /// Receiving side of the channel for push and unmatched envelopes.
    inbound_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<SignalingEnvelope>>>,
    /// Sending side of that channel; each receiver task gets a clone.
    inbound_tx: tokio::sync::Mutex<mpsc::UnboundedSender<SignalingEnvelope>>,
    /// Handle of the current stream-reading task.
    receiver_task: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
    /// Handle of the current keep-alive ping task.
    ping_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Broadcast sender for `SignalingEvent`s (channel capacity 64).
    event_tx: broadcast::Sender<SignalingEvent>,
    /// Unix time in seconds of the last inbound binary, ping or pong frame. Despite the
    /// name, it is refreshed by more than pongs.
    last_pong: Arc<AtomicU64>,
    /// Set once the reconnect manager task has been spawned.
    reconnector_started: Arc<AtomicBool>,
    /// Wakes the reconnect manager and cuts backoff/cooldown sleeps short.
    reconnect_notify: Arc<tokio::sync::Notify>,
    /// Set by an explicit disconnect to stop auto-reconnect; cleared on the next successful
    /// connect.
    auto_reconnect_suppressed: AtomicBool,
    /// Bumped by every explicit disconnect, so cycles started under an older value cancel
    /// themselves.
    reconnect_generation: AtomicU64,
    /// Optional lifecycle hook; can be set only once.
    hook_callback: OnceLock<HookCallback>,
}
434
435impl WebSocketSignalingClient {
436 pub fn new(config: SignalingConfig) -> Self {
438 let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
439 let (event_tx, _event_rx) = broadcast::channel(64);
440 Self {
441 config,
442 actor_id: tokio::sync::Mutex::new(None),
443 credential_state: tokio::sync::Mutex::new(None),
444 ws_sink: Arc::new(tokio::sync::Mutex::new(None)),
445 ws_stream: tokio::sync::Mutex::new(None),
446 connected: Arc::new(AtomicBool::new(false)),
447 connecting: Arc::new(AtomicBool::new(false)),
448 stats: Arc::new(AtomicSignalingStats::default()),
449 envelope_counter: tokio::sync::Mutex::new(0),
450 pending_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
451 pending_pongs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
452 probe_counter: AtomicU64::new(0),
453 inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
454 inbound_tx: tokio::sync::Mutex::new(inbound_tx),
455 receiver_task: Arc::new(tokio::sync::Mutex::new(None)),
456 ping_task: tokio::sync::Mutex::new(None),
457 event_tx,
458 last_pong: Arc::new(AtomicU64::new(0)),
459 reconnector_started: Arc::new(AtomicBool::new(false)),
460 reconnect_notify: Arc::new(tokio::sync::Notify::new()),
461 auto_reconnect_suppressed: AtomicBool::new(false),
462 reconnect_generation: AtomicU64::new(0),
463 hook_callback: OnceLock::new(),
464 }
465 }
466
467 async fn invoke_hook(&self, event: HookEvent) {
474 if let Some(cb) = self.hook_callback.get() {
475 cb(event).await;
476 }
477 }
478
479 async fn publish_disconnected_transition(
480 was_connected: bool,
481 stats: &Arc<AtomicSignalingStats>,
482 event_tx: &broadcast::Sender<SignalingEvent>,
483 hook_callback: Option<HookCallback>,
484 reason: DisconnectReason,
485 reconnect_notify: Option<&Arc<tokio::sync::Notify>>,
486 ) -> bool {
487 if !was_connected {
488 return false;
489 }
490
491 stats.disconnections.fetch_add(1, Ordering::Relaxed);
492
493 if let Some(cb) = hook_callback {
494 cb(HookEvent::SignalingDisconnected).await;
495 }
496
497 let _ = event_tx.send(SignalingEvent::Disconnected { reason });
498
499 if let Some(notify) = reconnect_notify {
500 notify.notify_one();
501 }
502
503 true
504 }
505
506 pub fn start_reconnect_manager(self: &Arc<Self>) {
507 if !self.config.reconnect_config.enabled {
508 return;
509 }
510 if self
511 .reconnector_started
512 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
513 .is_err()
514 {
515 return; }
517
518 tracing::info!("🔄 Starting reconnect manager for signaling client");
519
520 let client = Arc::downgrade(self);
521 let notify = self.reconnect_notify.clone();
522
523 tokio::spawn(async move {
524 loop {
525 let reconnect_requested = tokio::select! {
526 _ = notify.notified() => true,
527 _ = tokio::time::sleep(Duration::from_secs(30)) => false,
528 };
529
530 if !reconnect_requested && client.upgrade().is_none() {
531 break;
532 }
533 if !reconnect_requested {
534 continue;
535 }
536
537 let Some(client) = client.upgrade() else {
538 break;
539 };
540
541 if !client.config.reconnect_config.enabled {
542 break;
543 }
544
545 if Arc::strong_count(&client) <= 1 {
546 break;
547 }
548
549 client.run_reconnect_cycle().await;
551 }
552 });
553 }
554
555 async fn run_reconnect_cycle(self: &Arc<Self>) {
557 use actr_framework::ExponentialBackoff;
558
559 let cfg = &self.config.reconnect_config;
560 let generation = self.reconnect_generation.load(Ordering::Acquire);
561
562 if Arc::strong_count(self) <= 1 {
563 tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
564 return;
565 }
566
567 if self.auto_reconnect_cancelled(generation) {
568 tracing::debug!("Skipping signaling auto-reconnect cycle after explicit disconnect");
569 return;
570 }
571
572 if self.connected.load(Ordering::Acquire) {
573 tracing::debug!("🔎 Probing connected signaling before reconnect cycle");
574 match self
575 .probe_alive(Duration::from_secs(PONG_TIMEOUT_SECS))
576 .await
577 {
578 Ok(()) => {
579 tracing::debug!("Signaling probe succeeded, skipping reconnect cycle");
580 return;
581 }
582 Err(e) => {
583 tracing::warn!("Signaling probe failed before reconnect: {e}");
584 if let Err(disconnect_err) = self.disconnect_internal(false).await {
585 tracing::warn!(
586 "⚠️ Disconnect cleanup failed after failed probe (non-fatal): {disconnect_err}"
587 );
588 }
589 }
590 }
591 }
592
593 let backoff = ExponentialBackoff::builder()
594 .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
595 .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
596 .max_retries(cfg.max_attempts)
597 .with_jitter()
598 .build();
599
600 let mut attempt: u32 = 0;
601
602 for delay in backoff {
603 if Arc::strong_count(self) <= 1 {
604 tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
605 return;
606 }
607
608 if self.auto_reconnect_cancelled(generation) {
609 tracing::debug!(
610 "Stopping signaling auto-reconnect cycle after explicit disconnect"
611 );
612 return;
613 }
614
615 if self.connected.load(Ordering::Acquire) {
616 tracing::debug!("Already connected, aborting reconnect cycle");
617 return;
618 }
619
620 attempt += 1;
621 let _ = self.event_tx.send(SignalingEvent::ConnectStart { attempt });
622
623 match self.connect_once_for_auto_reconnect(generation).await {
624 Ok(()) => {
625 tracing::info!("✅ Signaling reconnect succeeded on attempt {attempt}");
626 return;
627 }
628 Err(e) => {
629 if self.auto_reconnect_cancelled(generation) {
630 tracing::debug!(
631 "Stopping signaling auto-reconnect cycle after explicit disconnect"
632 );
633 return;
634 }
635
636 tracing::warn!(
637 "❌ Reconnect attempt {attempt} failed: {e}, retrying in {delay:?}"
638 );
639 tokio::select! {
640 _ = tokio::time::sleep(delay) => {}
641 _ = self.reconnect_notify.notified() => {
642 tracing::debug!("Explicit reconnect request interrupted reconnect backoff");
643 }
644 }
645 if Arc::strong_count(self) <= 1 {
646 tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
647 return;
648 }
649 if self.auto_reconnect_cancelled(generation) {
650 tracing::debug!(
651 "Stopping signaling auto-reconnect cycle after explicit disconnect"
652 );
653 return;
654 }
655 }
656 }
657 }
658
659 tracing::error!("Reconnect failed after {attempt} attempts, entering cooldown");
661 let cooldown = std::time::Duration::from_secs(cfg.max_delay.max(1) * 2);
662 tokio::select! {
663 _ = tokio::time::sleep(cooldown) => {}
664 _ = self.reconnect_notify.notified() => {
665 tracing::debug!("Explicit reconnect request interrupted reconnect cooldown");
666 }
667 }
668 if self.auto_reconnect_cancelled(generation) {
669 tracing::debug!(
670 "Signaling auto-reconnect cooldown ended after explicit disconnect suppression"
671 );
672 }
673 }
675
676 #[cfg(feature = "test-utils")]
684 pub async fn connect_to(url: &str) -> NetworkResult<Arc<Self>> {
685 let config = SignalingConfig {
686 server_url: url.parse()?,
687 connection_timeout: 5,
688 heartbeat_interval: 30,
689 reconnect_config: ReconnectConfig::default(),
690 auth_config: None,
691 webrtc_role: None,
692 };
693
694 let client = Arc::new(Self::new(config));
695 client.start_reconnect_manager();
696 client.connect().await?;
697 Ok(client)
698 }
699
700 #[cfg(feature = "test-utils")]
706 pub async fn connect_to_with_identity(
707 url: &str,
708 actor_id: ActrId,
709 credential_state: CredentialState,
710 ) -> NetworkResult<Arc<Self>> {
711 let config = SignalingConfig {
712 server_url: url.parse()?,
713 connection_timeout: 5,
714 heartbeat_interval: 30,
715 reconnect_config: ReconnectConfig::default(),
716 auth_config: None,
717 webrtc_role: None,
718 };
719
720 let client = Arc::new(Self::new(config));
721 client.set_actor_id(actor_id).await;
722 client.set_credential_state(credential_state).await;
723 client.start_reconnect_manager();
724 client.connect().await?;
725 Ok(client)
726 }
727
728 async fn next_envelope_id(&self) -> String {
730 let mut counter = self.envelope_counter.lock().await;
731 *counter += 1;
732 format!("env-{}", *counter)
733 }
734
735 async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
737 SignalingEnvelope {
738 envelope_version: 1,
739 envelope_id: self.next_envelope_id().await,
740 reply_for: None,
741 timestamp: prost_types::Timestamp {
742 seconds: chrono::Utc::now().timestamp(),
743 nanos: 0,
744 },
745 traceparent: None,
746 tracestate: None,
747 flow: Some(flow),
748 }
749 }
750
751 async fn reset_inbound_channel(&self) {
753 self.drop_pending_replies("inbound channel reset").await;
754 self.drop_pending_pongs("inbound channel reset").await;
755
756 let (tx, rx) = mpsc::unbounded_channel();
757 *self.inbound_tx.lock().await = tx;
758 *self.inbound_rx.lock().await = rx;
759 }
760
761 async fn drop_pending_replies(&self, reason: &'static str) {
762 let dropped = {
763 let mut pending = self.pending_replies.lock().await;
764 let dropped = pending.len();
765 pending.clear();
766 dropped
767 };
768
769 if dropped > 0 {
770 tracing::debug!(reason, dropped, "Dropping pending signaling reply waiters");
771 }
772 }
773
774 async fn drop_pending_pongs(&self, reason: &'static str) {
775 let dropped = {
776 let mut pending = self.pending_pongs.lock().await;
777 let dropped = pending.len();
778 pending.clear();
779 dropped
780 };
781
782 if dropped > 0 {
783 tracing::debug!(reason, dropped, "Dropping pending signaling pong waiters");
784 }
785 }
786
787 async fn build_url_with_identity(&self) -> Url {
792 let mut url = self.config.server_url.clone();
793 let actor_id_opt = self.actor_id.lock().await.clone();
794 if let Some(actor_id) = actor_id_opt {
795 let actor_str = actr_protocol::ActrId::to_string_repr(&actor_id);
796 url.query_pairs_mut().append_pair("actor_id", &actor_str);
797 }
798
799 let cred_state_opt = self.credential_state.lock().await.clone();
801 if let Some(cred_state) = cred_state_opt {
802 let cred = cred_state.credential().await;
803 let claims_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.claims);
804 let sig_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.signature);
805 url.query_pairs_mut()
806 .append_pair("key_id", &cred.key_id.to_string())
807 .append_pair("claims", &claims_b64)
808 .append_pair("signature", &sig_b64);
809 }
810
811 if let Some(role) = &self.config.webrtc_role {
813 url.query_pairs_mut().append_pair("webrtc_role", role);
814 }
815
816 url
817 }
818
819 fn redact_signaling_url_for_log(url: &Url) -> String {
820 let mut redacted = url.clone();
821 let pairs: Vec<(String, String)> = redacted
822 .query_pairs()
823 .map(|(key, value)| {
824 let redacted_value = match key.to_ascii_lowercase().as_str() {
825 "claims" | "signature" | "token" | "authorization" | "bearer"
826 | "access_token" | "api_key" => "REDACTED".to_string(),
827 _ => value.into_owned(),
828 };
829 (key.into_owned(), redacted_value)
830 })
831 .collect();
832
833 redacted.set_query(None);
834 if !pairs.is_empty() {
835 let mut query = redacted.query_pairs_mut();
836 for (key, value) in pairs {
837 query.append_pair(&key, &value);
838 }
839 }
840
841 redacted.to_string()
842 }
843
844 fn auto_reconnect_cancelled(&self, generation: u64) -> bool {
845 self.auto_reconnect_suppressed.load(Ordering::Acquire)
846 || self.reconnect_generation.load(Ordering::Acquire) != generation
847 }
848
849 async fn establish_connection_once(&self) -> NetworkResult<()> {
853 self.establish_connection_once_with_intent(ConnectIntent::Explicit)
854 .await
855 }
856
857 async fn establish_connection_once_for_auto_reconnect(
858 &self,
859 generation: u64,
860 ) -> NetworkResult<()> {
861 self.establish_connection_once_with_intent(ConnectIntent::AutoReconnect { generation })
862 .await
863 }
864
    /// Makes one WebSocket connection attempt and, on success, installs it as the current
    /// connection.
    ///
    /// Returns `Ok(())` immediately if the client is already connected. Otherwise it:
    /// 1. builds the URL with the current identity;
    /// 2. connects, bounded by `config.connection_timeout` seconds unless that is `0`;
    /// 3. splits the socket and stores both halves.
    ///
    /// For an `AutoReconnect` attempt whose cycle was cancelled while connecting, the fresh
    /// socket is closed and an error is returned instead.
    ///
    /// On success it also:
    /// - sets `connected` and clears auto-reconnect suppression;
    /// - resets `last_pong` to now;
    /// - fires `HookEvent::SignalingConnected`, broadcasts `SignalingEvent::Connected`, and
    ///   bumps the connection counter.
    ///
    /// NOTE(review): this does not start the receiver or ping tasks; presumably the caller
    /// does — confirm.
    ///
    /// # Errors
    /// Connect failures, the connect timeout, or cancellation of an auto-reconnect attempt.
    async fn establish_connection_once_with_intent(
        &self,
        intent: ConnectIntent,
    ) -> NetworkResult<()> {
        // An existing connection is reused as-is.
        if self.connected.load(Ordering::Acquire) {
            tracing::debug!("Connection already established, skipping establish_connection_once()");
            return Ok(());
        }

        // The URL carries identity/credential parameters, so only a redacted form is logged.
        let url = self.build_url_with_identity().await;
        let timeout_secs = self.config.connection_timeout;
        tracing::debug!(
            "Establishing connection to URL: {}",
            Self::redact_signaling_url_for_log(&url)
        );
        // NOTE(review): presumably a zero write buffer makes tungstenite write each message
        // eagerly instead of batching — confirm against `WebSocketConfig::write_buffer_size`.
        let config = WebSocketConfig::default().write_buffer_size(0);
        // `connection_timeout == 0` means no timeout. Otherwise an elapsed timeout becomes a
        // `ConnectionError`, and the trailing `?` propagates the inner connect error.
        let connect_result = if timeout_secs == 0 {
            connect_async_with_config(url.as_str(), Some(config), false).await
        } else {
            let timeout_duration = std::time::Duration::from_secs(timeout_secs);
            tokio::time::timeout(
                timeout_duration,
                connect_async_with_config(url.as_str(), Some(config), false),
            )
            .await
            .map_err(|_| {
                NetworkError::ConnectionError(format!(
                    "Signaling connect timeout after {}s",
                    timeout_secs
                ))
            })?
        }?;

        // The handshake response is not needed.
        let (ws_stream, _) = connect_result;

        // The write half is shared with the ping task; the read half goes to the receiver.
        let (sink, stream) = ws_stream.split();

        // An auto-reconnect can finish after an explicit disconnect has cancelled its cycle.
        // In that case close the fresh socket rather than resurrecting the connection.
        if let ConnectIntent::AutoReconnect { generation } = intent
            && self.auto_reconnect_cancelled(generation)
        {
            tracing::debug!(
                generation,
                "Discarding completed signaling auto-reconnect after explicit disconnect"
            );
            let mut sink = sink;
            if let Err(e) = sink.close().await {
                tracing::warn!(
                    "Signaling auto-reconnect socket close failed after cancellation: {}",
                    e
                );
            }
            return Err(NetworkError::ConnectionError(
                "Signaling auto-reconnect was cancelled by explicit disconnect".to_string(),
            ));
        }

        // Install the halves before flagging the client as connected.
        *self.ws_sink.lock().await = Some(sink);
        *self.ws_stream.lock().await = Some(stream);
        self.connected.store(true, Ordering::Release);
        // A successful connect re-arms auto-reconnect.
        self.auto_reconnect_suppressed
            .store(false, Ordering::Release);
        // Count the new connection as fresh activity, so the pong timeout starts from now.
        self.last_pong.store(current_unix_secs(), Ordering::Release);
        self.invoke_hook(HookEvent::SignalingConnected).await;
        let _ = self.event_tx.send(SignalingEvent::Connected);

        self.stats.connections.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }
939
940 async fn connect_with_retries(&self) -> NetworkResult<()> {
942 use actr_framework::ExponentialBackoff;
943
944 let cfg = &self.config.reconnect_config;
945
946 if !cfg.enabled {
948 return self.establish_connection_once().await;
949 }
950
951 let backoff = ExponentialBackoff::builder()
952 .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
953 .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
954 .max_retries(cfg.max_attempts)
955 .with_jitter()
956 .build();
957
958 let mut last_err = None;
959
960 for (attempt, delay) in std::iter::once(std::time::Duration::ZERO)
962 .chain(backoff)
963 .enumerate()
964 {
965 let attempt = attempt as u32 + 1;
966 self.invoke_hook(HookEvent::SignalingConnectStart { attempt })
967 .await;
968 if delay > std::time::Duration::ZERO {
969 tracing::info!("Retry signaling connect after {delay:?} (attempt {attempt})");
970 tokio::select! {
971 _ = tokio::time::sleep(delay) => {}
972 _ = self.reconnect_notify.notified() => {
973 tracing::debug!("Explicit reconnect request interrupted signaling connect backoff");
974 }
975 }
976 }
977
978 match self.establish_connection_once().await {
979 Ok(()) => return Ok(()),
980 Err(e) => {
981 tracing::warn!("Signaling connect attempt {attempt} failed: {e:?}");
982 last_err = Some(e);
983 }
984 }
985 }
986
987 let total = cfg.max_attempts + 1; tracing::error!("Signaling connect failed after {total} attempts, giving up");
989 Err(last_err.unwrap_or_else(|| {
990 NetworkError::ConnectionError("All connection attempts failed".to_string())
991 }))
992 }
993
994 #[cfg_attr(
996 feature = "opentelemetry",
997 tracing::instrument(skip_all, fields(envelope_id = %envelope.envelope_id))
998 )]
999 async fn send_envelope_and_wait_response(
1000 &self,
1001 envelope: SignalingEnvelope,
1002 ) -> NetworkResult<SignalingEnvelope> {
1003 let reply_for = envelope.envelope_id.clone();
1004
1005 let (tx, rx) = oneshot::channel();
1007 self.pending_replies
1008 .lock()
1009 .await
1010 .insert(reply_for.clone(), tx);
1011
1012 if let Err(e) = self.send_envelope(envelope).await {
1013 self.pending_replies.lock().await.remove(&reply_for);
1015 return Err(e);
1016 }
1017
1018 let result =
1019 tokio::time::timeout(std::time::Duration::from_secs(RESPONSE_TIMEOUT_SECS), rx).await;
1020 if result.is_err() {
1022 self.pending_replies.lock().await.remove(&reply_for);
1023 }
1024
1025 let response_envelope = result
1026 .map_err(|_| {
1027 NetworkError::ConnectionError(
1028 "Timed out waiting for signaling response".to_string(),
1029 )
1030 })?
1031 .map_err(|_| {
1032 NetworkError::ConnectionError(
1033 "Receiver dropped while waiting for signaling response".to_string(),
1034 )
1035 })?;
1036
1037 Ok(response_envelope)
1038 }
1039
1040 async fn start_receiver(&self) {
1042 let mut stream_guard = self.ws_stream.lock().await;
1043 if stream_guard.is_none() {
1044 return;
1045 }
1046
1047 let mut stream = stream_guard.take().expect("stream exists");
1048 let pending = self.pending_replies.clone();
1049 let inbound_tx = { self.inbound_tx.lock().await.clone() };
1050 let stats = self.stats.clone();
1051 let connected = self.connected.clone();
1052 let event_tx = self.event_tx.clone();
1053 let last_pong = self.last_pong.clone();
1054 let pending_pongs = self.pending_pongs.clone();
1055 let reconnect_notify = self.reconnect_notify.clone();
1056 let reconnect_enabled = self.config.reconnect_config.enabled;
1057 let hook_callback = self.hook_callback.get().cloned();
1058 let handle = tokio::spawn(async move {
1059 while let Some(msg) = stream.next().await {
1060 match msg {
1061 Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
1062 last_pong.store(current_unix_secs(), Ordering::Release);
1064 match SignalingEnvelope::decode(&data[..]) {
1065 Ok(envelope) => {
1066 #[cfg(feature = "opentelemetry")]
1067 let span = {
1068 let span = tracing::info_span!("signaling.receive_envelope", envelope_id = %envelope.envelope_id);
1069 span.set_parent(extract_trace_context(&envelope));
1070 span
1071 };
1072
1073 stats.messages_received.fetch_add(1, Ordering::Relaxed);
1074 tracing::debug!("Received message: {:?}", envelope);
1075 if let Some(reply_for) = envelope.reply_for.clone() {
1076 if let Some(sender) = pending.lock().await.remove(&reply_for) {
1077 #[cfg(feature = "opentelemetry")]
1078 let _ = span.enter();
1079 if let Err(e) = sender.send(envelope) {
1080 stats.errors.fetch_add(1, Ordering::Relaxed);
1081 tracing::warn!(
1082 "Failed to send reply envelope to waiter: {e:?}",
1083 );
1084 }
1085 continue;
1086 }
1087 }
1088 tracing::debug!(
1089 "Unmatched or push message -> forward to inbound channel"
1090 );
1091 if let Err(e) = inbound_tx.send(envelope) {
1093 stats.errors.fetch_add(1, Ordering::Relaxed);
1094 tracing::warn!(
1095 "Failed to send envelope to inbound channel: {e:?}"
1096 );
1097 }
1098 }
1099 Err(e) => {
1100 stats.errors.fetch_add(1, Ordering::Relaxed);
1101 tracing::warn!("Failed to decode SignalingEnvelope: {e}");
1102 }
1103 }
1104 }
1105 Ok(tokio_tungstenite::tungstenite::Message::Pong(payload)) => {
1106 tracing::debug!("Received pong");
1107 last_pong.store(current_unix_secs(), Ordering::Release);
1108 if let Some(sender) = pending_pongs.lock().await.remove(&payload.to_vec()) {
1109 let _ = sender.send(());
1110 }
1111 }
1112 Ok(tokio_tungstenite::tungstenite::Message::Ping(_)) => {
1113 tracing::debug!("Received ping");
1114 last_pong.store(current_unix_secs(), Ordering::Release);
1115 }
1116 Ok(other) => {
1117 tracing::warn!("Received non-binary frame, ignoring: {other:?}");
1118 }
1119 Err(e) => {
1120 stats.errors.fetch_add(1, Ordering::Relaxed);
1121 tracing::error!("Signaling receive error: {e}");
1122 break;
1123 }
1124 }
1125 }
1126
1127 tracing::warn!("Stream terminated");
1128 let was_connected = connected.swap(false, Ordering::AcqRel);
1132 Self::publish_disconnected_transition(
1133 was_connected,
1134 &stats,
1135 &event_tx,
1136 hook_callback,
1137 DisconnectReason::StreamEnded,
1138 reconnect_enabled.then_some(&reconnect_notify),
1139 )
1140 .await;
1141 pending_pongs.lock().await.clear();
1142 });
1143
1144 *self.receiver_task.lock().await = Some(handle);
1145 }
1146
    /// Starts the keep-alive ping task unless one is already running.
    ///
    /// A previous task that has finished is replaced; a live one is left alone. Every
    /// `PING_INTERVAL_SECS` the task sends an empty WebSocket ping through the shared sink,
    /// bounded by `SIGNALING_SEND_TIMEOUT_SECS`. It exits quietly once `connected` is
    /// already `false`.
    ///
    /// It marks the connection dead and exits in two cases:
    /// - the ping cannot be sent (send error, send timeout, or no sink):
    ///   `DisconnectReason::PingSendFailed`;
    /// - `last_pong` shows no inbound activity for more than `PONG_TIMEOUT_SECS`: the
    ///   receiver task is aborted and `DisconnectReason::PongTimeout` is used.
    ///
    /// In both cases a disconnect transition is published only if this task was the one to
    /// clear `connected`. When auto-reconnect is enabled, that transition wakes the reconnect
    /// manager. The hook callback is snapshotted when the task starts.
    async fn start_ping_task(&self) {
        // The ping_task lock is held until the new handle is stored, so concurrent callers
        // cannot both spawn a task.
        let mut existing = self.ping_task.lock().await;
        if let Some(handle) = existing.as_ref() {
            if handle.is_finished() {
                existing.take();
            } else {
                return;
            }
        }

        let sink = self.ws_sink.clone();
        let connected = self.connected.clone();
        let stats = self.stats.clone();
        let event_tx = self.event_tx.clone();
        let last_pong = self.last_pong.clone();
        let receiver_task_clone = Arc::clone(&self.receiver_task);
        let reconnect_notify = self.reconnect_notify.clone();
        let reconnect_enabled = self.config.reconnect_config.enabled;
        let hook_callback = self.hook_callback.get().cloned();

        let handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await;

                // Someone else already tore the connection down; nothing left to keep alive.
                if !connected.load(Ordering::Acquire) {
                    break;
                }

                // Send the ping inside a scope so the sink lock is released before any
                // disconnect publishing (which awaits the hook callback).
                let mut disconnect_reason = None;
                {
                    let mut sink_guard = sink.lock().await;
                    if let Some(sink) = sink_guard.as_mut() {
                        match tokio::time::timeout(
                            std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
                            sink.send(tokio_tungstenite::tungstenite::Message::Ping(
                                Vec::new().into(),
                            )),
                        )
                        .await
                        {
                            Ok(Ok(())) => {}
                            Ok(Err(e)) => {
                                tracing::warn!("Signaling ping send failed: {e}");
                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
                            }
                            Err(_) => {
                                tracing::warn!("Signaling ping send timed out");
                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
                            }
                        }
                    } else {
                        tracing::warn!("Signaling not connected");
                        disconnect_reason = Some(DisconnectReason::PingSendFailed);
                    }
                }

                // NOTE(review): unlike the pong-timeout path below, the receiver task is not
                // aborted here.
                if let Some(reason) = disconnect_reason {
                    let was_connected = connected.swap(false, Ordering::AcqRel);
                    Self::publish_disconnected_transition(
                        was_connected,
                        &stats,
                        &event_tx,
                        hook_callback.clone(),
                        reason,
                        reconnect_enabled.then_some(&reconnect_notify),
                    )
                    .await;
                    break;
                }

                // `last_pong` is refreshed on any inbound binary/ping/pong frame, so this
                // measures general inbound silence.
                let now = current_unix_secs();
                let last = last_pong.load(Ordering::Acquire);
                if now.saturating_sub(last) > PONG_TIMEOUT_SECS {
                    tracing::warn!(
                        "Signaling pong timeout (last seen {}s ago), marking disconnected",
                        now.saturating_sub(last)
                    );
                    // Stop the reader first; an aborted receiver does not run its own
                    // stream-ended publishing.
                    if let Some(handle) = receiver_task_clone.lock().await.take() {
                        handle.abort();
                    }
                    let was_connected = connected.swap(false, Ordering::AcqRel);
                    Self::publish_disconnected_transition(
                        was_connected,
                        &stats,
                        &event_tx,
                        hook_callback.clone(),
                        DisconnectReason::PongTimeout,
                        reconnect_enabled.then_some(&reconnect_notify),
                    )
                    .await;
                    break;
                }
            }
        });

        *existing = Some(handle);
    }
1248
    /// Tears down the current connection. Best-effort: it always returns `Ok(())`.
    ///
    /// When `suppress_auto_reconnect` is set (an explicit disconnect), it first:
    /// - bumps the reconnect generation;
    /// - marks auto-reconnect as suppressed;
    /// - wakes every task currently waiting on `reconnect_notify`.
    ///
    /// In-flight auto-reconnect cycles therefore observe the cancellation. When it is unset
    /// (e.g. after a failed liveness probe), auto-reconnect stays armed.
    ///
    /// It then, in order:
    /// 1. fails all pending reply and pong waiters and clears `connected`;
    /// 2. aborts the ping and receiver tasks;
    /// 3. takes and closes the sink, with the close bounded by
    ///    `DISCONNECT_CLOSE_TIMEOUT_SECS`;
    /// 4. drops any unclaimed read half and replaces the inbound channel;
    /// 5. publishes `DisconnectReason::Manual` if the client had been connected.
    ///
    /// That final transition is published without a notify handle, so it never wakes the
    /// reconnect manager. Each lock acquisition is bounded by `DISCONNECT_LOCK_TIMEOUT_SECS`;
    /// on timeout the step is logged and skipped instead of failing the disconnect.
    async fn disconnect_internal(&self, suppress_auto_reconnect: bool) -> NetworkResult<()> {
        if suppress_auto_reconnect {
            // Invalidate every running reconnect cycle, then wake the ones that are
            // currently sleeping so they notice the cancellation promptly.
            self.reconnect_generation.fetch_add(1, Ordering::AcqRel);
            self.auto_reconnect_suppressed
                .store(true, Ordering::Release);
            self.reconnect_notify.notify_waiters();
        }

        self.drop_pending_replies("signaling disconnect").await;
        self.drop_pending_pongs("signaling disconnect").await;
        // Remember whether we were connected, so the Manual transition is published at most once.
        let was_connected = self.connected.swap(false, Ordering::AcqRel);

        let ping_handle = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ping_task.lock(),
        )
        .await
        {
            Ok(mut task_guard) => task_guard.take(),
            Err(_) => {
                tracing::warn!("Timed out waiting for signaling ping task lock during disconnect");
                None
            }
        };
        if let Some(handle) = ping_handle {
            handle.abort();
        }

        let receiver_handle = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.receiver_task.lock(),
        )
        .await
        {
            Ok(mut task_guard) => task_guard.take(),
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling receiver task lock during disconnect"
                );
                None
            }
        };
        if let Some(handle) = receiver_handle {
            handle.abort();
        }

        // Take the sink out under the lock, then close it after the lock guard is gone.
        let sink = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ws_sink.lock(),
        )
        .await
        {
            Ok(mut sink_guard) => sink_guard.take(),
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling WebSocket sink lock during disconnect"
                );
                None
            }
        };

        if let Some(mut sink) = sink {
            match tokio::time::timeout(
                std::time::Duration::from_secs(DISCONNECT_CLOSE_TIMEOUT_SECS),
                sink.close(),
            )
            .await
            {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    tracing::warn!("Signaling WebSocket close failed during disconnect: {}", e);
                }
                Err(_) => {
                    tracing::warn!(
                        "Signaling WebSocket close timed out during disconnect; continuing cleanup"
                    );
                }
            }
        }

        // Drop a read half that was never handed to a receiver task.
        match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ws_stream.lock(),
        )
        .await
        {
            Ok(mut stream_guard) => {
                stream_guard.take();
            }
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling WebSocket stream lock during disconnect"
                );
            }
        }

        // Discard queued inbound envelopes from the old connection.
        self.reset_inbound_channel().await;

        // `None` for the notify handle: a manual disconnect must not trigger a reconnect.
        Self::publish_disconnected_transition(
            was_connected,
            &self.stats,
            &self.event_tx,
            self.hook_callback.get().cloned(),
            DisconnectReason::Manual,
            None,
        )
        .await;

        Ok(())
    }
1367
1368 async fn connect_once_for_auto_reconnect(&self, generation: u64) -> NetworkResult<()> {
1369 if self.auto_reconnect_cancelled(generation) {
1370 return Err(NetworkError::ConnectionError(
1371 "Signaling auto-reconnect was cancelled".to_string(),
1372 ));
1373 }
1374
1375 if self.connected.load(Ordering::Acquire) {
1376 tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1377 return Ok(());
1378 }
1379
1380 match self
1381 .connecting
1382 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1383 {
1384 Ok(_) => {}
1385 Err(_) => {
1386 if self.connected.load(Ordering::Acquire) {
1387 tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1388 return Ok(());
1389 }
1390
1391 tracing::debug!(
1392 "Another connection attempt in progress, waiting for state change..."
1393 );
1394 let result = self.wait_for_connection_result().await;
1395 if self.auto_reconnect_cancelled(generation) {
1396 return Err(NetworkError::ConnectionError(
1397 "Signaling auto-reconnect was cancelled".to_string(),
1398 ));
1399 }
1400 return result;
1401 }
1402 }
1403
1404 if self.auto_reconnect_cancelled(generation) {
1405 self.connecting.store(false, Ordering::Release);
1406 return Err(NetworkError::ConnectionError(
1407 "Signaling auto-reconnect was cancelled".to_string(),
1408 ));
1409 }
1410
1411 if self.connected.load(Ordering::Acquire) {
1412 tracing::debug!("Connection completed by another task while acquiring lock");
1413 self.connecting.store(false, Ordering::Release);
1414 return Ok(());
1415 }
1416
1417 tracing::debug!(
1418 generation,
1419 "Acquired connection lock, establishing one auto-reconnect signaling attempt..."
1420 );
1421
1422 let result = self
1423 .establish_connection_once_for_auto_reconnect(generation)
1424 .await;
1425 self.connecting.store(false, Ordering::Release);
1426
1427 match result {
1428 Ok(()) => {
1429 if self.auto_reconnect_cancelled(generation) {
1430 self.disconnect_internal(false).await?;
1431 return Err(NetworkError::ConnectionError(
1432 "Signaling auto-reconnect was cancelled".to_string(),
1433 ));
1434 }
1435 self.start_receiver().await;
1436 self.start_ping_task().await;
1437 Ok(())
1438 }
1439 Err(e) => {
1440 if !self.auto_reconnect_cancelled(generation) {
1441 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1442 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1443 });
1444 tracing::error!("Connection attempt failed: {e}");
1445 }
1446 Err(e)
1447 }
1448 }
1449 }
1450
1451 async fn wait_for_connection_result(&self) -> NetworkResult<()> {
1455 let mut event_rx = self.event_tx.subscribe();
1456 let deadline = tokio::time::Instant::now()
1457 + std::time::Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS);
1458
1459 loop {
1460 tokio::select! {
1461 _ = tokio::time::sleep_until(deadline) => {
1462 if self.connected.load(Ordering::Acquire) {
1464 tracing::debug!("Connection succeeded just before timeout");
1465 return Ok(());
1466 }
1467 return Err(NetworkError::ConnectionError(
1468 "Timeout waiting for concurrent connection attempt".to_string(),
1469 ));
1470 }
1471 result = event_rx.recv() => {
1472 match result {
1473 Ok(SignalingEvent::Connected) => {
1474 tracing::debug!("Connection established by another task");
1475 return Ok(());
1476 }
1477 Ok(SignalingEvent::Disconnected { reason }) => {
1478 return Err(NetworkError::ConnectionError(format!(
1479 "Concurrent signaling connection failed: {reason:?}"
1480 )));
1481 }
1482 Ok(_) => continue, Err(broadcast::error::RecvError::Lagged(n)) => {
1484 tracing::warn!("Event receiver lagged by {n} events");
1485 if self.connected.load(Ordering::Acquire) {
1487 return Ok(());
1488 }
1489 continue;
1490 }
1491 Err(broadcast::error::RecvError::Closed) => {
1492 return Err(NetworkError::ConnectionError(
1493 "Event channel closed while waiting for connection".to_string(),
1494 ));
1495 }
1496 }
1497 }
1498 }
1499 }
1500 }
1501}
1502
1503#[async_trait]
1504impl SignalingClient for WebSocketSignalingClient {
1505 async fn connect(&self) -> NetworkResult<()> {
1506 match self
1511 .connecting
1512 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1513 {
1514 Ok(_) => {
1515 }
1518 Err(_) => {
1519 if self.connected.load(Ordering::Acquire) {
1522 tracing::debug!("Already connected, skipping connect()");
1523 return Ok(());
1524 }
1525
1526 tracing::debug!(
1528 "Another connection attempt in progress, waiting for state change..."
1529 );
1530 return self.wait_for_connection_result().await;
1531 }
1532 }
1533
1534 if self.connected.load(Ordering::Acquire) {
1539 tracing::debug!("Connection completed by another task while acquiring lock");
1540 self.connecting.store(false, Ordering::Release);
1541 return Ok(());
1542 }
1543
1544 tracing::debug!("Acquired connection lock, establishing connection...");
1545
1546 let result = self.connect_with_retries().await;
1548
1549 self.connecting.store(false, Ordering::Release);
1551
1552 match result {
1554 Ok(()) => {
1555 self.start_receiver().await;
1556 self.start_ping_task().await;
1557 Ok(())
1558 }
1559 Err(e) => {
1560 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1562 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1563 });
1564 tracing::error!("Connection failed: {e}");
1565 Err(e)
1566 }
1567 }
1568 }
1569
1570 async fn connect_once(&self) -> NetworkResult<()> {
1571 loop {
1572 if self.connected.load(Ordering::Acquire) {
1573 tracing::debug!("Already connected, skipping connect_once()");
1574 return Ok(());
1575 }
1576
1577 match self
1578 .connecting
1579 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1580 {
1581 Ok(_) => break,
1582 Err(_) => {
1583 if self.connected.load(Ordering::Acquire) {
1584 tracing::debug!("Already connected, skipping connect_once()");
1585 return Ok(());
1586 }
1587
1588 tracing::debug!(
1589 "Another connection attempt in progress, waiting for state change..."
1590 );
1591 match self.wait_for_connection_result().await {
1592 Ok(()) => return Ok(()),
1593 Err(e)
1594 if !self.connected.load(Ordering::Acquire)
1595 && !self.connecting.load(Ordering::Acquire) =>
1596 {
1597 tracing::debug!(
1598 "Concurrent signaling connection failed; explicit connect_once will retry immediately: {e}"
1599 );
1600 continue;
1601 }
1602 Err(e) => return Err(e),
1603 }
1604 }
1605 }
1606 }
1607
1608 if self.connected.load(Ordering::Acquire) {
1609 tracing::debug!("Connection completed by another task while acquiring lock");
1610 self.connecting.store(false, Ordering::Release);
1611 return Ok(());
1612 }
1613
1614 tracing::debug!(
1615 "Acquired connection lock, establishing one signaling connection attempt..."
1616 );
1617
1618 let result = self.establish_connection_once().await;
1619 self.connecting.store(false, Ordering::Release);
1620
1621 match result {
1622 Ok(()) => {
1623 self.start_receiver().await;
1624 self.start_ping_task().await;
1625 Ok(())
1626 }
1627 Err(e) => {
1628 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1629 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1630 });
1631 tracing::error!("Connection attempt failed: {e}");
1632 Err(e)
1633 }
1634 }
1635 }
1636
1637 fn schedule_auto_reconnect(&self) {
1638 if !self.config.reconnect_config.enabled {
1639 tracing::debug!("Skipping signaling auto-reconnect schedule; config disabled");
1640 return;
1641 }
1642
1643 self.auto_reconnect_suppressed
1644 .store(false, Ordering::Release);
1645 self.reconnect_notify.notify_one();
1646 }
1647
1648 async fn disconnect(&self) -> NetworkResult<()> {
1649 self.disconnect_internal(true).await
1650 }
1651
1652 async fn probe_alive(&self, timeout: Duration) -> NetworkResult<()> {
1653 if !self.connected.load(Ordering::Acquire) {
1654 return Err(NetworkError::ConnectionError(
1655 "Signaling client is not connected".to_string(),
1656 ));
1657 }
1658
1659 let probe_id = self.probe_counter.fetch_add(1, Ordering::Relaxed) + 1;
1660 let payload =
1661 format!("actr-signaling-probe-{probe_id}-{}", current_unix_secs()).into_bytes();
1662 let (tx, rx) = oneshot::channel();
1663 self.pending_pongs.lock().await.insert(payload.clone(), tx);
1664
1665 let send_timeout = std::cmp::min(
1666 timeout,
1667 std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1668 );
1669 let ping_payload = payload.clone();
1670 let send_result = tokio::time::timeout(send_timeout, async {
1671 let mut sink_guard = self.ws_sink.lock().await;
1672 match sink_guard.as_mut() {
1673 Some(sink) => sink
1674 .send(tokio_tungstenite::tungstenite::Message::Ping(
1675 ping_payload.into(),
1676 ))
1677 .await
1678 .map_err(|e| {
1679 NetworkError::ConnectionError(format!("Signaling probe ping failed: {e}"))
1680 }),
1681 None => Err(NetworkError::ConnectionError(
1682 "Signaling probe failed: WebSocket sink is not available".to_string(),
1683 )),
1684 }
1685 })
1686 .await
1687 .unwrap_or_else(|_| {
1688 Err(NetworkError::TimeoutError(format!(
1689 "Timed out sending signaling probe ping after {}ms",
1690 send_timeout.as_millis()
1691 )))
1692 });
1693
1694 if let Err(e) = send_result {
1695 self.pending_pongs.lock().await.remove(&payload);
1696 let was_connected = self.connected.swap(false, Ordering::AcqRel);
1697 Self::publish_disconnected_transition(
1698 was_connected,
1699 &self.stats,
1700 &self.event_tx,
1701 self.hook_callback.get().cloned(),
1702 DisconnectReason::PingSendFailed,
1703 None,
1704 )
1705 .await;
1706 return Err(e);
1707 }
1708
1709 match tokio::time::timeout(timeout, rx).await {
1710 Ok(Ok(())) => {
1711 self.last_pong.store(current_unix_secs(), Ordering::Release);
1712 Ok(())
1713 }
1714 Ok(Err(_)) => {
1715 self.pending_pongs.lock().await.remove(&payload);
1716 Err(NetworkError::ConnectionError(
1717 "Signaling probe pong waiter dropped".to_string(),
1718 ))
1719 }
1720 Err(_) => {
1721 self.pending_pongs.lock().await.remove(&payload);
1722 Err(NetworkError::TimeoutError(format!(
1723 "Timed out waiting for signaling probe pong after {}ms",
1724 timeout.as_millis()
1725 )))
1726 }
1727 }
1728 }
1729
1730 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1731 async fn send_register_request(
1732 &self,
1733 request: RegisterRequest,
1734 ) -> NetworkResult<RegisterResponse> {
1735 let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
1737 payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
1738 });
1739
1740 let envelope = self.create_envelope(flow).await;
1741 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1742
1743 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1744 {
1745 if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
1746 server_to_actr.payload
1747 {
1748 return Ok(response);
1749 }
1750 }
1751
1752 Err(NetworkError::ConnectionError(
1753 "Invalid registration response".to_string(),
1754 ))
1755 }
1756
1757 #[cfg_attr(
1758 feature = "opentelemetry",
1759 tracing::instrument(skip_all, fields(actor_id = %actor_id))
1760 )]
1761 async fn send_unregister_request(
1762 &self,
1763 actor_id: ActrId,
1764 credential: AIdCredential,
1765 reason: Option<String>,
1766 ) -> NetworkResult<UnregisterResponse> {
1767 let request = UnregisterRequest {
1769 actr_id: actor_id.clone(),
1770 reason,
1771 };
1772
1773 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1775 source: actor_id,
1776 credential,
1777 payload: Some(actr_to_signaling::Payload::UnregisterRequest(request)),
1778 });
1779
1780 let envelope = self.create_envelope(flow).await;
1782 self.send_envelope(envelope).await?;
1783
1784 Ok(UnregisterResponse {
1789 result: Some(actr_protocol::unregister_response::Result::Success(
1790 actr_protocol::unregister_response::UnregisterOk {},
1791 )),
1792 })
1793 }
1794
1795 #[cfg_attr(
1796 feature = "opentelemetry",
1797 tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1798 )]
1799 async fn send_heartbeat(
1800 &self,
1801 actor_id: ActrId,
1802 credential: AIdCredential,
1803 availability: ServiceAvailabilityState,
1804 power_reserve: f32,
1805 mailbox_backlog: f32,
1806 ) -> NetworkResult<Pong> {
1807 let ping = Ping {
1808 availability: availability as i32,
1809 power_reserve,
1810 mailbox_backlog,
1811 sticky_client_ids: vec![], };
1813
1814 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1815 source: actor_id,
1816 credential,
1817 payload: Some(actr_to_signaling::Payload::Ping(ping)),
1818 });
1819
1820 let envelope = self.create_envelope(flow).await;
1821 let reply_for = envelope.envelope_id.clone();
1822
1823 let (tx, rx) = oneshot::channel();
1825 self.pending_replies
1826 .lock()
1827 .await
1828 .insert(reply_for.clone(), tx);
1829
1830 if let Err(e) = self.send_envelope(envelope).await {
1831 self.pending_replies.lock().await.remove(&reply_for);
1833 return Err(e);
1834 }
1835
1836 let response_envelope = rx.await.map_err(|_| {
1838 NetworkError::ConnectionError(
1839 "Receiver dropped while waiting for heartbeat response".to_string(),
1840 )
1841 })?;
1842
1843 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1845 {
1846 match server_to_actr.payload {
1847 Some(signaling_to_actr::Payload::Pong(pong)) => {
1848 return Ok(pong);
1849 }
1850 Some(signaling_to_actr::Payload::Error(err)) => {
1851 if err.code == 401 {
1853 return Err(NetworkError::CredentialExpired(err.message));
1854 }
1855 return Err(NetworkError::AuthenticationError(format!(
1856 "{} ({})",
1857 err.message, err.code
1858 )));
1859 }
1860 _ => {}
1861 }
1862 }
1863
1864 Err(NetworkError::ConnectionError(
1865 "Received response but not a Pong message".to_string(),
1866 ))
1867 }
1868
1869 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1870 async fn send_route_candidates_request(
1871 &self,
1872 actor_id: ActrId,
1873 credential: AIdCredential,
1874 request: RouteCandidatesRequest,
1875 ) -> NetworkResult<RouteCandidatesResponse> {
1876 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1877 source: actor_id,
1878 credential,
1879 payload: Some(actr_to_signaling::Payload::RouteCandidatesRequest(request)),
1880 });
1881
1882 let envelope = self.create_envelope(flow).await;
1883 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1884
1885 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1886 {
1887 match server_to_actr.payload {
1888 Some(signaling_to_actr::Payload::RouteCandidatesResponse(response)) => {
1889 return Ok(response);
1890 }
1891 Some(signaling_to_actr::Payload::Error(err)) => {
1892 return Err(NetworkError::ServiceDiscoveryError(format!(
1893 "{} ({})",
1894 err.message, err.code
1895 )));
1896 }
1897 _ => {}
1898 }
1899 }
1900
1901 Err(NetworkError::ConnectionError(
1902 "Invalid route candidates response".to_string(),
1903 ))
1904 }
1905
1906 async fn get_signing_key(
1907 &self,
1908 actor_id: ActrId,
1909 credential: AIdCredential,
1910 key_id: u32,
1911 ) -> NetworkResult<(u32, Vec<u8>)> {
1912 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1913 source: actor_id,
1914 credential,
1915 payload: Some(actr_to_signaling::Payload::GetSigningKeyRequest(
1916 GetSigningKeyRequest { key_id },
1917 )),
1918 });
1919
1920 let envelope = self.create_envelope(flow).await;
1921 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1922
1923 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1924 {
1925 match server_to_actr.payload {
1926 Some(signaling_to_actr::Payload::GetSigningKeyResponse(resp)) => {
1927 return Ok((resp.key_id, resp.pubkey.to_vec()));
1928 }
1929 Some(signaling_to_actr::Payload::Error(err)) => {
1930 return Err(NetworkError::ConnectionError(format!(
1931 "get_signing_key failed: {} ({})",
1932 err.message, err.code
1933 )));
1934 }
1935 _ => {}
1936 }
1937 }
1938
1939 Err(NetworkError::ConnectionError(
1940 "get_signing_key: invalid response".to_string(),
1941 ))
1942 }
1943
1944 #[cfg_attr(
1945 feature = "opentelemetry",
1946 tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1947 )]
1948 async fn send_credential_update_request(
1949 &self,
1950 actor_id: ActrId,
1951 credential: AIdCredential,
1952 ) -> NetworkResult<RegisterResponse> {
1953 let request = CredentialUpdateRequest {
1954 actr_id: actor_id.clone(),
1955 };
1956
1957 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1958 source: actor_id,
1959 credential,
1960 payload: Some(actr_to_signaling::Payload::CredentialUpdateRequest(request)),
1961 });
1962
1963 let envelope = self.create_envelope(flow).await;
1964 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1965
1966 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1967 {
1968 match server_to_actr.payload {
1969 Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
1970 return Ok(response);
1971 }
1972 Some(signaling_to_actr::Payload::Error(err)) => {
1973 return Err(NetworkError::ConnectionError(format!(
1974 "Credential update failed: {} ({})",
1975 err.message, err.code
1976 )));
1977 }
1978 _ => {}
1979 }
1980 }
1981
1982 Err(NetworkError::ConnectionError(
1983 "Invalid credential update response".to_string(),
1984 ))
1985 }
1986
1987 #[cfg_attr(
1988 feature = "opentelemetry",
1989 tracing::instrument(level = "debug", skip_all, fields(envelope_id = %envelope.envelope_id))
1990 )]
1991 async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
1992 #[cfg(feature = "opentelemetry")]
1993 let envelope = {
1994 let mut envelope = envelope;
1995 trace::inject_span_context(&tracing::Span::current(), &mut envelope);
1996 envelope
1997 };
1998
1999 if !self.is_connected() {
2002 return Err(NetworkError::ConnectionError(
2003 "Cannot send: WebSocket not connected".to_string(),
2004 ));
2005 }
2006
2007 let mut sink_guard = self.ws_sink.lock().await;
2008
2009 if let Some(sink) = sink_guard.as_mut() {
2010 let mut buf = Vec::new();
2012 envelope.encode(&mut buf)?;
2013 let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
2014 match tokio::time::timeout(
2015 std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
2016 sink.send(msg),
2017 )
2018 .await
2019 {
2020 Ok(Ok(())) => {}
2021 Ok(Err(e)) => return Err(e.into()),
2022 Err(_) => {
2023 self.connected.store(false, Ordering::Release);
2024 return Err(NetworkError::ConnectionError(
2025 "Signaling WebSocket send timed out".to_string(),
2026 ));
2027 }
2028 }
2029
2030 self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
2031 tracing::debug!("Stats: {:?}", self.stats.snapshot());
2032 Ok(())
2033 } else {
2034 Err(NetworkError::ConnectionError("Not connected".to_string()))
2035 }
2036 }
2037
2038 async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
2039 let mut rx = self.inbound_rx.lock().await;
2040 match rx.recv().await {
2041 Some(envelope) => Ok(Some(envelope)),
2042 None => {
2043 tracing::error!("Inbound channel closed");
2044 Err(NetworkError::ConnectionError(
2045 "Inbound channel closed".to_string(),
2046 ))
2047 }
2048 }
2049 }
2050
2051 fn is_connected(&self) -> bool {
2052 self.connected.load(Ordering::Acquire)
2053 }
2054
2055 fn get_stats(&self) -> SignalingStats {
2056 self.stats.snapshot()
2057 }
2058
2059 fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
2060 self.event_tx.subscribe()
2061 }
2062
2063 async fn set_actor_id(&self, actor_id: ActrId) {
2064 *self.actor_id.lock().await = Some(actor_id);
2065 }
2066
2067 async fn set_credential_state(&self, credential_state: CredentialState) {
2068 *self.credential_state.lock().await = Some(credential_state);
2069 }
2070
2071 async fn clear_identity(&self) {
2072 *self.actor_id.lock().await = None;
2073 *self.credential_state.lock().await = None;
2074 }
2075
2076 fn set_hook_callback(&self, cb: HookCallback) {
2077 let _ = self.hook_callback.set(cb);
2078 }
2079}
2080
/// Lock-free counters backing [`SignalingStats`]; read them via `snapshot()`.
#[derive(Debug)]
pub(crate) struct AtomicSignalingStats {
    /// Connections established (presumably incremented on successful connect — not shown here).
    pub connections: AtomicU64,

    /// Connected -> disconnected transitions, counted by `publish_disconnected_transition`.
    pub disconnections: AtomicU64,

    /// Envelopes successfully written by `send_envelope`.
    pub messages_sent: AtomicU64,

    /// Inbound messages (presumably incremented by the receiver task — verify).
    pub messages_received: AtomicU64,

    /// Heartbeats sent (NOTE(review): incrementing site not visible here — confirm).
    pub heartbeats_sent: AtomicU64,

    /// Heartbeat replies received (NOTE(review): incrementing site not visible here — confirm).
    pub heartbeats_received: AtomicU64,

    /// Errors observed (NOTE(review): incrementing site not visible here — confirm).
    pub errors: AtomicU64,
}
2107
2108impl Default for AtomicSignalingStats {
2109 fn default() -> Self {
2110 Self {
2111 connections: AtomicU64::new(0),
2112 disconnections: AtomicU64::new(0),
2113 messages_sent: AtomicU64::new(0),
2114 messages_received: AtomicU64::new(0),
2115 heartbeats_sent: AtomicU64::new(0),
2116 heartbeats_received: AtomicU64::new(0),
2117 errors: AtomicU64::new(0),
2118 }
2119 }
2120}
2121
/// Plain, serializable copy of the signaling counters (see `AtomicSignalingStats::snapshot`).
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct SignalingStats {
    /// Value of `AtomicSignalingStats::connections` at snapshot time.
    pub connections: u64,

    /// Number of published connected -> disconnected transitions.
    pub disconnections: u64,

    /// Envelopes successfully sent.
    pub messages_sent: u64,

    /// Value of `AtomicSignalingStats::messages_received` at snapshot time.
    pub messages_received: u64,

    /// Value of `AtomicSignalingStats::heartbeats_sent` at snapshot time.
    pub heartbeats_sent: u64,

    /// Value of `AtomicSignalingStats::heartbeats_received` at snapshot time.
    pub heartbeats_received: u64,

    /// Value of `AtomicSignalingStats::errors` at snapshot time.
    pub errors: u64,
}
2146
2147impl AtomicSignalingStats {
2148 pub fn snapshot(&self) -> SignalingStats {
2150 SignalingStats {
2151 connections: self.connections.load(Ordering::Relaxed),
2152 disconnections: self.disconnections.load(Ordering::Relaxed),
2153 messages_sent: self.messages_sent.load(Ordering::Relaxed),
2154 messages_received: self.messages_received.load(Ordering::Relaxed),
2155 heartbeats_sent: self.heartbeats_sent.load(Ordering::Relaxed),
2156 heartbeats_received: self.heartbeats_received.load(Ordering::Relaxed),
2157 errors: self.errors.load(Ordering::Relaxed),
2158 }
2159 }
2160}
2161
/// Returns the current wall-clock time in whole seconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` rather than an error.
fn current_unix_secs() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2169
2170#[cfg(test)]
2171mod tests {
2172 use super::*;
2173 use std::future::Future;
2174 use std::pin::Pin;
2175 use std::sync::atomic::{AtomicUsize, Ordering as UsizeOrdering};
2176
    /// Minimal in-memory `SignalingClient` stand-in for tests.
    ///
    /// `connect()` only counts calls; most request methods are `unimplemented!`.
    struct FakeSignalingClient {
        /// Sender backing `subscribe_events`.
        event_tx: broadcast::Sender<SignalingEvent>,
        /// Value reported by `is_connected`.
        connected: AtomicBool,
        /// Number of `connect()` calls; wrapped in `Arc`, presumably so a test can keep a handle.
        connect_calls: Arc<AtomicUsize>,
        /// Identity stored by `set_actor_id` and cleared by `clear_identity`.
        actor_id: tokio::sync::Mutex<Option<ActrId>>,
        /// Credential state stored by `set_credential_state` and cleared by `clear_identity`.
        credential_state: tokio::sync::Mutex<Option<CredentialState>>,
    }
2185
2186 #[async_trait]
2187 impl SignalingClient for FakeSignalingClient {
2188 async fn connect(&self) -> NetworkResult<()> {
2189 self.connect_calls.fetch_add(1, UsizeOrdering::SeqCst);
2190 Ok(())
2191 }
2192
2193 async fn disconnect(&self) -> NetworkResult<()> {
2194 Ok(())
2195 }
2196
2197 async fn send_register_request(
2198 &self,
2199 _request: RegisterRequest,
2200 ) -> NetworkResult<RegisterResponse> {
2201 unimplemented!("not needed in tests");
2202 }
2203
2204 async fn send_unregister_request(
2205 &self,
2206 _actor_id: ActrId,
2207 _credential: AIdCredential,
2208 _reason: Option<String>,
2209 ) -> NetworkResult<UnregisterResponse> {
2210 unimplemented!("not needed in tests");
2211 }
2212
2213 async fn send_heartbeat(
2214 &self,
2215 _actor_id: ActrId,
2216 _credential: AIdCredential,
2217 _availability: ServiceAvailabilityState,
2218 _power_reserve: f32,
2219 _mailbox_backlog: f32,
2220 ) -> NetworkResult<Pong> {
2221 unimplemented!("not needed in tests");
2222 }
2223
2224 async fn send_route_candidates_request(
2225 &self,
2226 _actor_id: ActrId,
2227 _credential: AIdCredential,
2228 _request: RouteCandidatesRequest,
2229 ) -> NetworkResult<RouteCandidatesResponse> {
2230 unimplemented!("not needed in tests");
2231 }
2232
2233 async fn get_signing_key(
2234 &self,
2235 _actor_id: ActrId,
2236 _credential: AIdCredential,
2237 _key_id: u32,
2238 ) -> NetworkResult<(u32, Vec<u8>)> {
2239 unimplemented!("not needed in tests");
2240 }
2241
2242 async fn send_credential_update_request(
2243 &self,
2244 _actor_id: ActrId,
2245 _credential: AIdCredential,
2246 ) -> NetworkResult<RegisterResponse> {
2247 unimplemented!("not needed in tests");
2248 }
2249
2250 async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
2251 unimplemented!("not needed in tests");
2252 }
2253
2254 async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
2255 unimplemented!("not needed in tests");
2256 }
2257
2258 fn is_connected(&self) -> bool {
2259 self.connected.load(Ordering::SeqCst)
2260 }
2261
2262 fn get_stats(&self) -> SignalingStats {
2263 SignalingStats::default()
2264 }
2265
2266 fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
2267 self.event_tx.subscribe()
2268 }
2269
2270 async fn set_actor_id(&self, actor_id: ActrId) {
2271 *self.actor_id.lock().await = Some(actor_id);
2272 }
2273
2274 async fn set_credential_state(&self, credential_state: CredentialState) {
2275 *self.credential_state.lock().await = Some(credential_state);
2276 }
2277
2278 async fn clear_identity(&self) {
2279 *self.actor_id.lock().await = None;
2280 *self.credential_state.lock().await = None;
2281 }
2282 }
2283
2284 fn make_fake_client() -> Arc<FakeSignalingClient> {
2285 let (event_tx, _erx) = broadcast::channel(64);
2286 Arc::new(FakeSignalingClient {
2287 event_tx,
2288 connected: AtomicBool::new(false),
2289 connect_calls: Arc::new(AtomicUsize::new(0)),
2290 actor_id: tokio::sync::Mutex::new(None),
2291 credential_state: tokio::sync::Mutex::new(None),
2292 })
2293 }
2294
2295 fn make_config() -> SignalingConfig {
2297 SignalingConfig {
2298 server_url: Url::parse("ws://127.0.0.1:1/signaling/ws").unwrap(),
2299 connection_timeout: 2,
2300 heartbeat_interval: 30,
2301 reconnect_config: ReconnectConfig::default(),
2302 auth_config: None,
2303 webrtc_role: None,
2304 }
2305 }
2306
2307 fn make_ws_client(config: SignalingConfig) -> Arc<WebSocketSignalingClient> {
2309 Arc::new(WebSocketSignalingClient::new(config))
2310 }
2311
    /// `probe_alive` must stay bounded while the WebSocket sink lock is held elsewhere.
    /// The stalled send must time out, mark the client disconnected (one recorded
    /// disconnection) and remove its pending pong waiter.
    #[tokio::test]
    async fn probe_alive_times_out_when_sink_lock_is_stalled() {
        let client = make_ws_client(make_config());
        client.connected.store(true, Ordering::Release);

        // Hold the sink lock for the rest of the test so the probe's ping send cannot proceed.
        let _sink_guard = client.ws_sink.lock().await;

        let result = tokio::time::timeout(
            Duration::from_millis(250),
            client.probe_alive(Duration::from_millis(20)),
        )
        .await
        .expect("probe should be bounded by its own timeout");

        let err = result.expect_err("stalled sink lock should fail the probe");
        assert!(
            err.to_string()
                .contains("Timed out sending signaling probe ping"),
            "unexpected error: {err}"
        );
        assert!(
            !client.is_connected(),
            "stalled probe send should mark signaling disconnected"
        );
        assert_eq!(client.get_stats().disconnections, 1);
        assert!(
            client.pending_pongs.lock().await.is_empty(),
            "failed probe send should remove its pending pong waiter"
        );
    }
2342
    /// An explicit `connect_once` that waits on a concurrent attempt must, once that attempt
    /// reports failure, retry on its own promptly and establish signaling.
    #[tokio::test]
    async fn explicit_connect_once_retries_after_concurrent_attempt_fails() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("test listener should bind");
        let server_url = format!(
            "ws://{}/signaling/ws",
            listener
                .local_addr()
                .expect("test listener should have local addr")
        );
        // Minimal server: accept one WebSocket connection, keep it briefly, then drop it.
        let server_task = tokio::spawn(async move {
            let (stream, _) = listener
                .accept()
                .await
                .expect("test server should accept tcp connection");
            let ws_stream = tokio_tungstenite::accept_async(stream)
                .await
                .expect("test server should complete websocket handshake");
            tokio::time::sleep(Duration::from_millis(100)).await;
            drop(ws_stream);
        });

        let mut config = make_config();
        config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
        config.connection_timeout = 2;
        config.reconnect_config = ReconnectConfig {
            enabled: false,
            ..ReconnectConfig::default()
        };
        let client = make_ws_client(config);

        // Pretend another attempt owns the `connecting` flag so `connect_once` has to wait.
        client.connecting.store(true, Ordering::Release);
        let connect_task = {
            let client = client.clone();
            tokio::spawn(async move { client.connect_once().await })
        };

        // Release the flag and report the simulated concurrent attempt as failed.
        tokio::time::sleep(Duration::from_millis(50)).await;
        client.connecting.store(false, Ordering::Release);
        let _ = client.event_tx.send(SignalingEvent::Disconnected {
            reason: DisconnectReason::ConnectionFailed("simulated auto attempt failed".into()),
        });

        tokio::time::timeout(Duration::from_secs(2), connect_task)
            .await
            .expect("explicit connect_once should not wait for auto backoff")
            .expect("connect_once task should not panic")
            .expect("explicit connect_once should retry after concurrent failure");

        assert!(
            client.is_connected(),
            "explicit recovery connect should establish signaling"
        );

        client.disconnect().await.ok();
        let _ = tokio::time::timeout(Duration::from_secs(1), server_task).await;
    }
2401
    /// `publish_disconnected_transition` must update stats, broadcast the event and fire the
    /// hook only for a real connected -> disconnected transition. A stale duplicate
    /// (`was_connected == false`) must be ignored and report `false`.
    #[tokio::test]
    async fn test_publish_disconnected_transition_fires_hook_once() {
        let stats = Arc::new(AtomicSignalingStats::default());
        let (event_tx, mut event_rx) = broadcast::channel(4);
        // Counts SignalingDisconnected hook invocations.
        let hook_count = Arc::new(AtomicUsize::new(0));
        let hook_count_for_cb = hook_count.clone();
        let hook_callback: HookCallback = Arc::new(move |event| {
            let hook_count = hook_count_for_cb.clone();
            Box::pin(async move {
                if matches!(event, HookEvent::SignalingDisconnected) {
                    hook_count.fetch_add(1, UsizeOrdering::SeqCst);
                }
            }) as Pin<Box<dyn Future<Output = ()> + Send>>
        });

        let first = WebSocketSignalingClient::publish_disconnected_transition(
            true,
            &stats,
            &event_tx,
            Some(hook_callback.clone()),
            DisconnectReason::StreamEnded,
            None,
        )
        .await;
        assert!(
            first,
            "first connected->disconnected transition should publish"
        );
        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
        assert_eq!(stats.snapshot().disconnections, 1);
        assert!(matches!(
            event_rx.recv().await,
            Ok(SignalingEvent::Disconnected {
                reason: DisconnectReason::StreamEnded
            })
        ));

        // Second call simulates a late duplicate from another detector (e.g. pong timeout).
        let second = WebSocketSignalingClient::publish_disconnected_transition(
            false,
            &stats,
            &event_tx,
            Some(hook_callback),
            DisconnectReason::PongTimeout,
            None,
        )
        .await;
        assert!(
            !second,
            "stale duplicate disconnected transition should be ignored"
        );
        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
        assert_eq!(stats.snapshot().disconnections, 1);
        assert!(event_rx.try_recv().is_err());
    }
2456
/// Pins the `ReconnectConfig::default()` values.
///
/// The defaults are: reconnection enabled, at most 10 attempts,
/// `initial_delay` 1, `max_delay` 60 and a backoff multiplier of 2.0.
#[test]
fn test_reconnect_config_defaults() {
    let ReconnectConfig {
        enabled,
        max_attempts,
        initial_delay,
        max_delay,
        backoff_multiplier,
    } = ReconnectConfig::default();

    assert!(enabled);
    assert_eq!(max_attempts, 10);
    assert_eq!(initial_delay, 1);
    assert_eq!(max_delay, 60);
    // Float comparison with a tolerance rather than `==`.
    assert!((backoff_multiplier - 2.0).abs() < f64::EPSILON);
}
2470
/// A freshly constructed client starts idle.
///
/// It is not connected, not mid-connect, and its reconnect manager has not
/// been started.
#[test]
fn test_websocket_signaling_client_initial_state_disconnected() {
    let fresh = WebSocketSignalingClient::new(make_config());

    let connected = fresh.is_connected();
    let connecting = fresh.connecting.load(Ordering::Acquire);
    let reconnector_running = fresh.reconnector_started.load(Ordering::Acquire);

    assert!(!connected, "newly created client should be Disconnected");
    assert!(
        !connecting,
        "newly created client should not be in connecting state"
    );
    assert!(
        !reconnector_running,
        "reconnect manager should not be started automatically"
    );
}
2491
/// Every counter in a brand-new client's stats snapshot starts at zero.
#[test]
fn test_initial_stats_are_zero() {
    let fresh = WebSocketSignalingClient::new(make_config());
    let snapshot = fresh.get_stats();

    // Connection lifecycle counters.
    assert_eq!(snapshot.connections, 0);
    assert_eq!(snapshot.disconnections, 0);
    // Traffic counters.
    assert_eq!(snapshot.messages_sent, 0);
    assert_eq!(snapshot.messages_received, 0);
    // Error counter.
    assert_eq!(snapshot.errors, 0);
}
2502
/// Log-safe rendering of the signaling URL.
///
/// The credential-bearing query parameters (`claims`, `signature`, `token`)
/// have their values replaced with `REDACTED`. Non-secret identifiers
/// (`actor_id`, `key_id`) are left readable.
#[test]
fn test_signaling_url_log_redacts_credential_query_params() {
    let url = Url::parse(
        "wss://example.com/signaling?actor_id=abc&key_id=7&claims=claims-value&signature=signature-value&token=token-value",
    )
    .unwrap();

    let redacted = WebSocketSignalingClient::redact_signaling_url_for_log(&url);

    let expected_fragments = [
        "actor_id=abc",
        "key_id=7",
        "claims=REDACTED",
        "signature=REDACTED",
        "token=REDACTED",
    ];
    for fragment in expected_fragments {
        assert!(redacted.contains(fragment));
    }

    let leaked_secrets = ["claims-value", "signature-value", "token-value"];
    for secret in leaked_secrets {
        assert!(!redacted.contains(secret));
    }
}
2521
2522 #[tokio::test]
2527 async fn test_reconnect_manager_idempotent() {
2528 let client = make_ws_client(make_config());
2529
2530 client.start_reconnect_manager();
2532 assert!(
2533 client.reconnector_started.load(Ordering::Acquire),
2534 "reconnector_started should be true after first call"
2535 );
2536
2537 client.start_reconnect_manager();
2539 assert!(client.reconnector_started.load(Ordering::Acquire));
2541 }
2542
2543 #[tokio::test]
2544 async fn test_reconnect_manager_disabled_when_config_disabled() {
2545 let mut config = make_config();
2546 config.reconnect_config.enabled = false;
2547 let client = make_ws_client(config);
2548
2549 client.start_reconnect_manager();
2550 assert!(
2551 !client.reconnector_started.load(Ordering::Acquire),
2552 "reconnect manager should not start when reconnect config is disabled"
2553 );
2554 }
2555
2556 #[tokio::test]
2557 async fn test_reconnect_manager_does_not_keep_client_alive() {
2558 let client = make_ws_client(make_config());
2559 let weak = Arc::downgrade(&client);
2560
2561 client.start_reconnect_manager();
2562 drop(client);
2563
2564 assert!(
2565 weak.upgrade().is_none(),
2566 "reconnect manager must not keep signaling client alive after owner drop"
2567 );
2568 }
2569
2570 #[tokio::test]
2575 async fn test_connect_fast_path_when_already_connected() {
2576 let client = make_ws_client(make_config());
2577 client.connected.store(true, Ordering::Release);
2579
2580 let result = client.connect().await;
2582 assert!(
2583 result.is_ok(),
2584 "connect() should return Ok when already connected"
2585 );
2586 assert!(!client.connecting.load(Ordering::Acquire));
2588 }
2589
2590 #[tokio::test]
2591 async fn test_connect_sets_connecting_flag() {
2592 let mut config = make_config();
2593 config.reconnect_config.enabled = false; config.connection_timeout = 1;
2595 let client = make_ws_client(config);
2596
2597 let result = client.connect().await;
2599 assert!(
2600 result.is_err(),
2601 "connecting to unreachable address should fail"
2602 );
2603 assert!(
2604 !client.connecting.load(Ordering::Acquire),
2605 "connecting flag should be cleared after connection failure"
2606 );
2607 }
2608
2609 #[tokio::test]
2614 async fn test_event_subscribe_receives_events() {
2615 let client = make_ws_client(make_config());
2616 let mut rx = client.subscribe_events();
2617
2618 let _ = client.event_tx.send(SignalingEvent::Connected);
2620
2621 match tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()).await {
2622 Ok(Ok(SignalingEvent::Connected)) => {} other => panic!("expected Connected event, but got {:?}", other),
2624 }
2625 }
2626
2627 #[tokio::test]
2628 async fn test_disconnect_event_on_connect_failure() {
2629 let mut config = make_config();
2630 config.reconnect_config.enabled = false;
2631 config.connection_timeout = 1;
2632 let client = make_ws_client(config);
2633 let mut rx = client.subscribe_events();
2634
2635 let _ = client.connect().await;
2637
2638 match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()).await {
2640 Ok(Ok(SignalingEvent::Disconnected {
2641 reason: DisconnectReason::ConnectionFailed(_),
2642 })) => {} other => panic!(
2644 "expected Disconnected(ConnectionFailed) event, but got {:?}",
2645 other
2646 ),
2647 }
2648 }
2649
2650 #[tokio::test]
2655 async fn test_disconnect_clears_connected_flag() {
2656 let client = make_ws_client(make_config());
2657 client.connected.store(true, Ordering::Release);
2659 assert!(client.is_connected());
2660
2661 let result = client.disconnect().await;
2662 assert!(result.is_ok());
2663 assert!(
2664 !client.is_connected(),
2665 "should be Disconnected after disconnect()"
2666 );
2667 }
2668
2669 #[tokio::test]
2670 async fn test_disconnect_increments_disconnection_stat() {
2671 let client = make_ws_client(make_config());
2672 client.connected.store(true, Ordering::Release);
2673
2674 let stats_before = client.get_stats().disconnections;
2675 let _ = client.disconnect().await;
2676 let stats_after = client.get_stats().disconnections;
2677 assert_eq!(
2678 stats_after,
2679 stats_before + 1,
2680 "disconnect() should increment disconnection count"
2681 );
2682 }
2683
2684 #[tokio::test]
2685 async fn test_disconnect_idempotent() {
2686 let client = make_ws_client(make_config());
2687
2688 let r1 = client.disconnect().await;
2690 let r2 = client.disconnect().await;
2691 assert!(r1.is_ok());
2692 assert!(r2.is_ok());
2693 assert!(!client.is_connected());
2694 }
2695
2696 #[tokio::test]
2701 async fn test_reconnect_notify_wakes_waiter() {
2702 let notify = Arc::new(tokio::sync::Notify::new());
2703 let notify_clone = notify.clone();
2704 let woken = Arc::new(AtomicBool::new(false));
2705 let woken_clone = woken.clone();
2706
2707 let handle = tokio::spawn(async move {
2708 notify_clone.notified().await;
2709 woken_clone.store(true, Ordering::Release);
2710 });
2711
2712 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2714 assert!(
2715 !woken.load(Ordering::Acquire),
2716 "should not be woken before notification"
2717 );
2718
2719 notify.notify_one();
2721 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2722 assert!(
2723 woken.load(Ordering::Acquire),
2724 "should be woken after notification"
2725 );
2726
2727 handle.abort();
2728 }
2729
2730 #[tokio::test]
2731 async fn test_schedule_auto_reconnect_reenables_after_explicit_disconnect() {
2732 let client = make_ws_client(make_config());
2733
2734 client
2735 .disconnect()
2736 .await
2737 .expect("explicit disconnect should be idempotent");
2738 assert!(
2739 client.auto_reconnect_suppressed.load(Ordering::Acquire),
2740 "explicit disconnect should suppress stale auto-reconnect cycles"
2741 );
2742
2743 client.schedule_auto_reconnect();
2744
2745 assert!(
2746 !client.auto_reconnect_suppressed.load(Ordering::Acquire),
2747 "scheduling a fresh auto-reconnect should clear explicit disconnect suppression"
2748 );
2749 }
2750
2751 #[tokio::test]
2752 async fn test_explicit_disconnect_suppresses_reconnect_cycle_in_backoff() {
2753 let mut config = make_config();
2754 config.connection_timeout = 1;
2755 config.reconnect_config = ReconnectConfig {
2756 enabled: true,
2757 max_attempts: 5,
2758 initial_delay: 1,
2759 max_delay: 1,
2760 backoff_multiplier: 1.0,
2761 };
2762 let client = make_ws_client(config);
2763 let mut rx = client.subscribe_events();
2764
2765 let reconnect_client = client.clone();
2766 let reconnect_task = tokio::spawn(async move {
2767 reconnect_client.run_reconnect_cycle().await;
2768 });
2769
2770 match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2771 Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2772 other => panic!("expected first reconnect attempt, got {other:?}"),
2773 }
2774
2775 client
2776 .disconnect()
2777 .await
2778 .expect("explicit disconnect should be idempotent");
2779
2780 tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2781 .await
2782 .expect("suppressed reconnect cycle should exit promptly")
2783 .expect("reconnect task should not panic");
2784
2785 while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
2786 {
2787 if let SignalingEvent::ConnectStart { attempt } = event {
2788 panic!("suppressed reconnect cycle sent unexpected attempt {attempt}");
2789 }
2790 }
2791
2792 assert!(
2793 client.auto_reconnect_suppressed.load(Ordering::Acquire),
2794 "explicit disconnect should suppress stale auto-reconnect cycles"
2795 );
2796 }
2797
/// Regression test: an explicit `disconnect()` that lands while an
/// auto-reconnect attempt is still mid-handshake must win.
///
/// The local server deliberately completes the WebSocket handshake only after
/// the disconnect. The client must not turn that late handshake into a live
/// session. It must not publish `SignalingEvent::Connected`, must not report
/// `is_connected()`, and the reconnect cycle must exit within 2 s.
#[tokio::test]
async fn test_explicit_disconnect_suppresses_in_flight_auto_reconnect_connected_event() {
    // Ephemeral local listener that stands in for the signaling server.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await
        .expect("test listener should bind");
    let server_url = format!(
        "ws://{}/signaling/ws",
        listener
            .local_addr()
            .expect("test listener should have local addr")
    );
    // Gate that lets the server continue with the handshake.
    let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();

    let server_task = tokio::spawn(async move {
        // Accept the TCP connection right away...
        let (stream, _) = listener
            .accept()
            .await
            .expect("test server should accept tcp connection");
        // ...but hold the WebSocket handshake until the test has called
        // `disconnect()`, so the client's first attempt stays in flight.
        let _ = release_rx.await;
        let ws_stream = tokio_tungstenite::accept_async(stream)
            .await
            .expect("test server should complete websocket handshake");
        // Keep the socket open briefly after the handshake so a buggy client
        // would have time to treat it as connected, then close it.
        tokio::time::sleep(Duration::from_millis(100)).await;
        drop(ws_stream);
    });

    let mut config = make_config();
    config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
    // NOTE(review): presumably chosen to exceed the time the handshake is
    // parked, so the attempt does not time out on its own — confirm units.
    config.connection_timeout = 5;
    config.reconnect_config = ReconnectConfig {
        enabled: true,
        max_attempts: 3,
        initial_delay: 1,
        max_delay: 1,
        backoff_multiplier: 1.0,
    };
    let client = make_ws_client(config);
    let mut rx = client.subscribe_events();

    let reconnect_client = client.clone();
    let reconnect_task = tokio::spawn(async move {
        reconnect_client.run_reconnect_cycle().await;
    });

    // The cycle must have started attempt 1 before we interfere.
    match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
        Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
        other => panic!("expected first reconnect attempt, got {other:?}"),
    }

    // Order matters: disconnect first, then let the server finish the
    // handshake for the now-stale attempt.
    client
        .disconnect()
        .await
        .expect("explicit disconnect should cancel the in-flight auto-reconnect");
    release_tx
        .send(())
        .expect("test server handshake should still be waiting");

    tokio::time::timeout(Duration::from_secs(2), reconnect_task)
        .await
        .expect("cancelled in-flight reconnect should exit promptly")
        .expect("reconnect task should not panic");

    // Drain buffered events until 150 ms of silence or a channel error. None
    // may be `Connected`.
    while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(150), rx.recv()).await
    {
        assert!(
            !matches!(event, SignalingEvent::Connected),
            "cancelled auto-reconnect must not publish Connected"
        );
    }

    assert!(
        !client.is_connected(),
        "cancelled auto-reconnect must not leave signaling connected"
    );

    tokio::time::timeout(Duration::from_secs(1), server_task)
        .await
        .expect("test server task should finish")
        .expect("test server task should not panic");
}
2878
2879 #[tokio::test]
2884 async fn test_build_url_without_identity() {
2885 let config = make_config();
2886 let expected_base = config.server_url.to_string();
2887 let client = WebSocketSignalingClient::new(config);
2888
2889 let url = client.build_url_with_identity().await;
2890 assert_eq!(
2891 url.to_string(),
2892 expected_base,
2893 "URL should not contain identity parameters when actor_id is not set"
2894 );
2895 }
2896
2897 #[tokio::test]
2898 async fn test_build_url_with_webrtc_role() {
2899 let mut config = make_config();
2900 config.webrtc_role = Some("answer".to_string());
2901 let client = WebSocketSignalingClient::new(config);
2902
2903 let url = client.build_url_with_identity().await;
2904 assert!(
2905 url.query().unwrap_or("").contains("webrtc_role=answer"),
2906 "URL should contain webrtc_role parameter, actual URL: {}",
2907 url
2908 );
2909 }
2910
2911 #[tokio::test]
2916 async fn test_reset_inbound_channel_creates_fresh_channel() {
2917 let client = WebSocketSignalingClient::new(make_config());
2918
2919 {
2921 let tx = client.inbound_tx.lock().await;
2922 let _ = tx.send(SignalingEnvelope::default());
2923 }
2924
2925 client.reset_inbound_channel().await;
2927
2928 let mut rx = client.inbound_rx.lock().await;
2930 let result = rx.try_recv();
2931 assert!(
2932 result.is_err(),
2933 "old messages should not be visible in the new channel after reset"
2934 );
2935 }
2936
2937 #[tokio::test]
2942 async fn test_envelope_id_monotonically_increasing() {
2943 let client = WebSocketSignalingClient::new(make_config());
2944
2945 let id1 = client.next_envelope_id().await;
2946 let id2 = client.next_envelope_id().await;
2947 let id3 = client.next_envelope_id().await;
2948
2949 assert_eq!(id1, "env-1");
2950 assert_eq!(id2, "env-2");
2951 assert_eq!(id3, "env-3");
2952 }
2953
2954 #[tokio::test]
2959 async fn test_send_envelope_fails_when_not_connected() {
2960 let client = WebSocketSignalingClient::new(make_config());
2961 let envelope = SignalingEnvelope::default();
2962
2963 let result = client.send_envelope(envelope).await;
2964 assert!(
2965 result.is_err(),
2966 "send_envelope should return error when not connected"
2967 );
2968 match result {
2969 Err(NetworkError::ConnectionError(msg)) => {
2970 assert!(
2971 msg.contains("not connected") || msg.contains("Not connected"),
2972 "error message should contain 'not connected', actual: {}",
2973 msg
2974 );
2975 }
2976 other => panic!("expected ConnectionError, got {:?}", other),
2977 }
2978 }
2979
2980 #[tokio::test]
2985 async fn test_fake_client_tracks_connect_calls() {
2986 let client = make_fake_client();
2987 assert_eq!(client.connect_calls.load(UsizeOrdering::SeqCst), 0);
2988
2989 client.connect().await.unwrap();
2990 client.connect().await.unwrap();
2991 client.connect().await.unwrap();
2992
2993 assert_eq!(
2994 client.connect_calls.load(UsizeOrdering::SeqCst),
2995 3,
2996 "FakeSignalingClient should accurately track connect call count"
2997 );
2998 }
2999}