1#[cfg(feature = "opentelemetry")]
6use super::trace;
7use crate::lifecycle::CredentialState;
8use crate::transport::{NetworkError, NetworkResult};
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::extract_trace_context;
11use actr_protocol::prost::Message as ProstMessage;
12use actr_protocol::{
13 AIdCredential, ActrId, ActrToSignaling, CredentialUpdateRequest, GetSigningKeyRequest,
14 PeerToSignaling, Ping, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
15 RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterRequest,
16 UnregisterResponse, actr_to_signaling, peer_to_signaling, signaling_envelope,
17 signaling_to_actr,
18};
19use async_trait::async_trait;
20use base64::Engine as _;
21use futures_util::{SinkExt, StreamExt};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::{
27 Arc, OnceLock,
28 atomic::{AtomicBool, AtomicU64, Ordering},
29};
30use std::time::Duration;
31use tokio::net::TcpStream;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
34use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async_with_config};
35#[cfg(feature = "opentelemetry")]
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use url::Url;
38
39type WsSink = Arc<
41 tokio::sync::Mutex<
42 Option<
43 futures_util::stream::SplitSink<
44 WebSocketStream<MaybeTlsStream<TcpStream>>,
45 tokio_tungstenite::tungstenite::Message,
46 >,
47 >,
48 >,
49>;
50
/// Seconds to wait for a reply envelope after a request envelope was sent.
const RESPONSE_TIMEOUT_SECS: u64 = 15;
/// Seconds the ping task sleeps between two keep-alive pings.
const PING_INTERVAL_SECS: u64 = 5;
/// Seconds without any inbound traffic after which the ping task reports a
/// pong timeout. Also used as the probe timeout before a reconnect cycle.
const PONG_TIMEOUT_SECS: u64 = 10;
/// Upper bound, in seconds, for writing one ping frame to the socket.
const SIGNALING_SEND_TIMEOUT_SECS: u64 = 5;
/// NOTE(review): not referenced in the visible part of this file; presumably
/// bounds how long a caller waits for a concurrent connect to finish — confirm.
const CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS: u64 = 5;
/// Seconds `disconnect_internal` waits for each internal lock before giving up
/// on that cleanup step.
const DISCONNECT_LOCK_TIMEOUT_SECS: u64 = 5;
/// Seconds `disconnect_internal` waits for the WebSocket close to complete.
const DISCONNECT_CLOSE_TIMEOUT_SECS: u64 = 1;
64
65#[derive(Debug, Clone)]
71pub struct SignalingConfig {
72 pub server_url: Url,
74
75 pub connection_timeout: u64,
77
78 pub heartbeat_interval: u64,
80
81 pub reconnect_config: ReconnectConfig,
83
84 pub auth_config: Option<AuthConfig>,
86
87 pub webrtc_role: Option<String>,
89}
90
/// Retry and auto-reconnect policy for the signaling connection.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Master switch. When `false`, connects are attempted once and no
    /// background reconnect manager is started.
    pub enabled: bool,

    /// Maximum number of retries handed to the backoff schedule.
    pub max_attempts: u32,

    /// First backoff delay in seconds (values below 1 are raised to 1 by the
    /// callers in this file).
    pub initial_delay: u64,

    /// Largest backoff delay in seconds (values below 1 are raised to 1 by the
    /// callers in this file).
    pub max_delay: u64,

    /// Backoff growth factor.
    /// NOTE(review): the visible backoff builders do not pass this value on — confirm
    /// whether it is still honoured anywhere.
    pub backoff_multiplier: f64,
}

impl Default for ReconnectConfig {
    /// Reconnect enabled, 10 attempts, delays growing from 1s up to 60s, factor 2.
    fn default() -> Self {
        Self {
            enabled: true,
            max_attempts: 10,
            initial_delay: 1,
            max_delay: 60,
            backoff_multiplier: 2.0,
        }
    }
}
121
/// Authentication settings for the signaling connection.
///
/// `Debug` is implemented by hand so that credential *values* never end up in
/// logs; only the credential keys are shown.
#[derive(Clone)]
pub struct AuthConfig {
    /// Which authentication scheme the credentials belong to.
    pub auth_type: AuthType,

    /// Scheme-specific credential entries (key -> secret value).
    pub credentials: HashMap<String, String>,
}

impl std::fmt::Debug for AuthConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A sorted map keeps the output deterministic; every value is masked.
        let redacted: std::collections::BTreeMap<&str, &str> = self
            .credentials
            .keys()
            .map(|key| (key.as_str(), "REDACTED"))
            .collect();

        f.debug_struct("AuthConfig")
            .field("auth_type", &self.auth_type)
            .field("credentials", &redacted)
            .finish()
    }
}

/// Supported authentication schemes.
#[derive(Debug, Clone)]
pub enum AuthType {
    /// No authentication.
    None,
    /// Bearer token.
    BearerToken,
    /// API key.
    ApiKey,
    /// JSON Web Token.
    Jwt,
}
144
145#[async_trait]
155pub trait SignalingClient: Send + Sync {
156 async fn connect(&self) -> NetworkResult<()>;
158
159 async fn connect_once(&self) -> NetworkResult<()> {
164 self.connect().await
165 }
166
167 async fn disconnect(&self) -> NetworkResult<()>;
169
170 async fn probe_alive(&self, _timeout: Duration) -> NetworkResult<()> {
176 if self.is_connected() {
177 Ok(())
178 } else {
179 Err(NetworkError::ConnectionError(
180 "Signaling client is not connected".to_string(),
181 ))
182 }
183 }
184
185 async fn send_register_request(
188 &self,
189 request: RegisterRequest,
190 ) -> NetworkResult<RegisterResponse>;
191
192 async fn send_unregister_request(
197 &self,
198 actor_id: ActrId,
199 credential: AIdCredential,
200 reason: Option<String>,
201 ) -> NetworkResult<UnregisterResponse>;
202
203 async fn send_heartbeat(
206 &self,
207 actor_id: ActrId,
208 credential: AIdCredential,
209 availability: ServiceAvailabilityState,
210 power_reserve: f32,
211 mailbox_backlog: f32,
212 ) -> NetworkResult<Pong>;
213
214 async fn send_route_candidates_request(
216 &self,
217 actor_id: ActrId,
218 credential: AIdCredential,
219 request: RouteCandidatesRequest,
220 ) -> NetworkResult<RouteCandidatesResponse>;
221
222 async fn get_signing_key(
227 &self,
228 actor_id: ActrId,
229 credential: AIdCredential,
230 key_id: u32,
231 ) -> NetworkResult<(u32, Vec<u8>)>;
232
233 async fn send_credential_update_request(
238 &self,
239 actor_id: ActrId,
240 credential: AIdCredential,
241 ) -> NetworkResult<RegisterResponse>;
242
243 async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;
245
246 async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;
248
249 fn is_connected(&self) -> bool;
251
252 fn get_stats(&self) -> SignalingStats;
254 fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent>;
256
257 async fn set_actor_id(&self, actor_id: ActrId);
259 async fn set_credential_state(&self, credential_state: CredentialState);
260
261 async fn clear_identity(&self);
268
269 fn set_hook_callback(&self, _cb: HookCallback) {}
273}
274
/// Coarse connection state of the signaling client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No usable connection.
    Disconnected,
    /// A connection is established.
    Connected,
}
281
282#[derive(Debug, Clone)]
288pub enum SignalingEvent {
289 ConnectStart { attempt: u32 },
291 Connected,
293 Disconnected { reason: DisconnectReason },
295}
296
/// Why the signaling connection went down.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// The receive stream ended or returned an error.
    StreamEnded,
    /// No inbound traffic was seen within the pong timeout.
    PongTimeout,
    /// Sending a keep-alive ping failed, timed out, or no socket was available.
    PingSendFailed,
    /// The credential expired.
    CredentialExpired,
    /// Disconnect was requested through the client itself.
    Manual,
    /// Connecting failed; the payload carries the error text.
    ConnectionFailed(String),
}
313
314#[derive(Clone, Debug)]
323pub enum HookEvent {
324 SignalingConnectStart {
326 attempt: u32,
327 },
328 SignalingConnected,
329 SignalingDisconnected,
330 WebRtcConnectStart {
332 peer_id: ActrId,
333 },
334 WebRtcConnected {
335 peer_id: ActrId,
336 relayed: bool,
337 },
338 WebRtcDisconnected {
339 peer_id: ActrId,
340 },
341 DataStreamDeliveryUncertain {
342 stream_id: String,
343 session_id: u64,
344 reason: String,
345 },
346 WebSocketConnectStart {
348 peer_id: ActrId,
349 },
350 WebSocketConnected {
351 peer_id: ActrId,
352 },
353 WebSocketDisconnected {
354 peer_id: ActrId,
355 },
356 CredentialRenewed {
358 new_expiry: std::time::SystemTime,
359 },
360 CredentialExpiring {
361 new_expiry: std::time::SystemTime,
362 },
363 MailboxBackpressure {
365 queue_len: usize,
366 threshold: usize,
367 },
368}
369
370pub type HookCallback =
375 Arc<dyn Fn(HookEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
376
/// Who asked for a connection attempt.
#[derive(Debug, Clone, Copy)]
enum ConnectIntent {
    /// Requested directly by a caller.
    Explicit,
    /// Requested by the background reconnect cycle. `generation` is the
    /// reconnect generation observed when the cycle started, so an attempt that
    /// completes after an explicit disconnect can be recognised and discarded.
    AutoReconnect { generation: u64 },
}
382
383pub struct WebSocketSignalingClient {
385 config: SignalingConfig,
386 actor_id: tokio::sync::Mutex<Option<ActrId>>,
387 credential_state: tokio::sync::Mutex<Option<CredentialState>>,
388 ws_sink: WsSink,
390 ws_stream: tokio::sync::Mutex<
392 Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
393 >,
394 connected: Arc<AtomicBool>,
396 connecting: Arc<AtomicBool>,
398 stats: Arc<AtomicSignalingStats>,
400 envelope_counter: tokio::sync::Mutex<u64>,
402 pending_replies: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<SignalingEnvelope>>>>,
404 pending_pongs: Arc<tokio::sync::Mutex<HashMap<Vec<u8>, oneshot::Sender<()>>>>,
406 probe_counter: AtomicU64,
408 inbound_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<SignalingEnvelope>>>,
410 inbound_tx: tokio::sync::Mutex<mpsc::UnboundedSender<SignalingEnvelope>>,
411 receiver_task: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
413 ping_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
415 event_tx: broadcast::Sender<SignalingEvent>,
417 last_pong: Arc<AtomicU64>,
419 reconnector_started: Arc<AtomicBool>,
421 reconnect_notify: Arc<tokio::sync::Notify>,
423 auto_reconnect_suppressed: AtomicBool,
425 reconnect_generation: AtomicU64,
427 hook_callback: OnceLock<HookCallback>,
429}
430
431impl WebSocketSignalingClient {
432 pub fn new(config: SignalingConfig) -> Self {
434 let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
435 let (event_tx, _event_rx) = broadcast::channel(64);
436 Self {
437 config,
438 actor_id: tokio::sync::Mutex::new(None),
439 credential_state: tokio::sync::Mutex::new(None),
440 ws_sink: Arc::new(tokio::sync::Mutex::new(None)),
441 ws_stream: tokio::sync::Mutex::new(None),
442 connected: Arc::new(AtomicBool::new(false)),
443 connecting: Arc::new(AtomicBool::new(false)),
444 stats: Arc::new(AtomicSignalingStats::default()),
445 envelope_counter: tokio::sync::Mutex::new(0),
446 pending_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
447 pending_pongs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
448 probe_counter: AtomicU64::new(0),
449 inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
450 inbound_tx: tokio::sync::Mutex::new(inbound_tx),
451 receiver_task: Arc::new(tokio::sync::Mutex::new(None)),
452 ping_task: tokio::sync::Mutex::new(None),
453 event_tx,
454 last_pong: Arc::new(AtomicU64::new(0)),
455 reconnector_started: Arc::new(AtomicBool::new(false)),
456 reconnect_notify: Arc::new(tokio::sync::Notify::new()),
457 auto_reconnect_suppressed: AtomicBool::new(false),
458 reconnect_generation: AtomicU64::new(0),
459 hook_callback: OnceLock::new(),
460 }
461 }
462
463 async fn invoke_hook(&self, event: HookEvent) {
470 if let Some(cb) = self.hook_callback.get() {
471 cb(event).await;
472 }
473 }
474
475 async fn publish_disconnected_transition(
476 was_connected: bool,
477 stats: &Arc<AtomicSignalingStats>,
478 event_tx: &broadcast::Sender<SignalingEvent>,
479 hook_callback: Option<HookCallback>,
480 reason: DisconnectReason,
481 reconnect_notify: Option<&Arc<tokio::sync::Notify>>,
482 ) -> bool {
483 if !was_connected {
484 return false;
485 }
486
487 stats.disconnections.fetch_add(1, Ordering::Relaxed);
488
489 if let Some(cb) = hook_callback {
490 cb(HookEvent::SignalingDisconnected).await;
491 }
492
493 let _ = event_tx.send(SignalingEvent::Disconnected { reason });
494
495 if let Some(notify) = reconnect_notify {
496 notify.notify_one();
497 }
498
499 true
500 }
501
502 pub fn start_reconnect_manager(self: &Arc<Self>) {
503 if !self.config.reconnect_config.enabled {
504 return;
505 }
506 if self
507 .reconnector_started
508 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
509 .is_err()
510 {
511 return; }
513
514 tracing::info!("🔄 Starting reconnect manager for signaling client");
515
516 let client = Arc::downgrade(self);
517 let notify = self.reconnect_notify.clone();
518
519 tokio::spawn(async move {
520 loop {
521 let reconnect_requested = tokio::select! {
522 _ = notify.notified() => true,
523 _ = tokio::time::sleep(Duration::from_secs(30)) => false,
524 };
525
526 if !reconnect_requested && client.upgrade().is_none() {
527 break;
528 }
529 if !reconnect_requested {
530 continue;
531 }
532
533 let Some(client) = client.upgrade() else {
534 break;
535 };
536
537 if !client.config.reconnect_config.enabled {
538 break;
539 }
540
541 if Arc::strong_count(&client) <= 1 {
542 break;
543 }
544
545 client.run_reconnect_cycle().await;
547 }
548 });
549 }
550
551 async fn run_reconnect_cycle(self: &Arc<Self>) {
553 use actr_framework::ExponentialBackoff;
554
555 let cfg = &self.config.reconnect_config;
556 let generation = self.reconnect_generation.load(Ordering::Acquire);
557
558 if Arc::strong_count(self) <= 1 {
559 tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
560 return;
561 }
562
563 if self.auto_reconnect_cancelled(generation) {
564 tracing::debug!("Skipping signaling auto-reconnect cycle after explicit disconnect");
565 return;
566 }
567
568 if self.connected.load(Ordering::Acquire) {
569 tracing::debug!("🔎 Probing connected signaling before reconnect cycle");
570 match self
571 .probe_alive(Duration::from_secs(PONG_TIMEOUT_SECS))
572 .await
573 {
574 Ok(()) => {
575 tracing::debug!("Signaling probe succeeded, skipping reconnect cycle");
576 return;
577 }
578 Err(e) => {
579 tracing::warn!("Signaling probe failed before reconnect: {e}");
580 if let Err(disconnect_err) = self.disconnect_internal(false).await {
581 tracing::warn!(
582 "⚠️ Disconnect cleanup failed after failed probe (non-fatal): {disconnect_err}"
583 );
584 }
585 }
586 }
587 }
588
589 let backoff = ExponentialBackoff::builder()
590 .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
591 .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
592 .max_retries(cfg.max_attempts)
593 .with_jitter()
594 .build();
595
596 let mut attempt: u32 = 0;
597
598 for delay in backoff {
599 if Arc::strong_count(self) <= 1 {
600 tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
601 return;
602 }
603
604 if self.auto_reconnect_cancelled(generation) {
605 tracing::debug!(
606 "Stopping signaling auto-reconnect cycle after explicit disconnect"
607 );
608 return;
609 }
610
611 if self.connected.load(Ordering::Acquire) {
612 tracing::debug!("Already connected, aborting reconnect cycle");
613 return;
614 }
615
616 attempt += 1;
617 let _ = self.event_tx.send(SignalingEvent::ConnectStart { attempt });
618
619 match self.connect_once_for_auto_reconnect(generation).await {
620 Ok(()) => {
621 tracing::info!("✅ Signaling reconnect succeeded on attempt {attempt}");
622 return;
623 }
624 Err(e) => {
625 if self.auto_reconnect_cancelled(generation) {
626 tracing::debug!(
627 "Stopping signaling auto-reconnect cycle after explicit disconnect"
628 );
629 return;
630 }
631
632 tracing::warn!(
633 "❌ Reconnect attempt {attempt} failed: {e}, retrying in {delay:?}"
634 );
635 tokio::select! {
636 _ = tokio::time::sleep(delay) => {}
637 _ = self.reconnect_notify.notified() => {
638 tracing::debug!("Explicit reconnect request interrupted reconnect backoff");
639 }
640 }
641 if Arc::strong_count(self) <= 1 {
642 tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
643 return;
644 }
645 if self.auto_reconnect_cancelled(generation) {
646 tracing::debug!(
647 "Stopping signaling auto-reconnect cycle after explicit disconnect"
648 );
649 return;
650 }
651 }
652 }
653 }
654
655 tracing::error!("Reconnect failed after {attempt} attempts, entering cooldown");
657 let cooldown = std::time::Duration::from_secs(cfg.max_delay.max(1) * 2);
658 tokio::select! {
659 _ = tokio::time::sleep(cooldown) => {}
660 _ = self.reconnect_notify.notified() => {
661 tracing::debug!("Explicit reconnect request interrupted reconnect cooldown");
662 }
663 }
664 if self.auto_reconnect_cancelled(generation) {
665 tracing::debug!(
666 "Signaling auto-reconnect cooldown ended after explicit disconnect suppression"
667 );
668 }
669 }
671
/// Test helper: build a client for `url` and connect it.
///
/// Uses a 5 second connect timeout, a heartbeat interval of 30, the default
/// reconnect policy and no authentication or WebRTC role. The reconnect
/// manager is started before connecting.
///
/// # Errors
/// Fails when `url` cannot be parsed or the connection cannot be established.
#[cfg(feature = "test-utils")]
pub async fn connect_to(url: &str) -> NetworkResult<Arc<Self>> {
    let config = SignalingConfig {
        server_url: url.parse()?,
        connection_timeout: 5,
        heartbeat_interval: 30,
        reconnect_config: ReconnectConfig::default(),
        auth_config: None,
        webrtc_role: None,
    };

    let client = Arc::new(Self::new(config));
    client.start_reconnect_manager();
    client.connect().await?;
    Ok(client)
}
695
/// Test helper: like `connect_to`, but installs an identity before connecting.
///
/// `actor_id` and `credential_state` are set on the client first, so they are
/// already part of the connect URL of the very first attempt. Uses a 5 second
/// connect timeout, a heartbeat interval of 30 and the default reconnect policy.
///
/// # Errors
/// Fails when `url` cannot be parsed or the connection cannot be established.
#[cfg(feature = "test-utils")]
pub async fn connect_to_with_identity(
    url: &str,
    actor_id: ActrId,
    credential_state: CredentialState,
) -> NetworkResult<Arc<Self>> {
    let config = SignalingConfig {
        server_url: url.parse()?,
        connection_timeout: 5,
        heartbeat_interval: 30,
        reconnect_config: ReconnectConfig::default(),
        auth_config: None,
        webrtc_role: None,
    };

    let client = Arc::new(Self::new(config));
    client.set_actor_id(actor_id).await;
    client.set_credential_state(credential_state).await;
    client.start_reconnect_manager();
    client.connect().await?;
    Ok(client)
}
723
724 async fn next_envelope_id(&self) -> String {
726 let mut counter = self.envelope_counter.lock().await;
727 *counter += 1;
728 format!("env-{}", *counter)
729 }
730
731 async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
733 SignalingEnvelope {
734 envelope_version: 1,
735 envelope_id: self.next_envelope_id().await,
736 reply_for: None,
737 timestamp: prost_types::Timestamp {
738 seconds: chrono::Utc::now().timestamp(),
739 nanos: 0,
740 },
741 traceparent: None,
742 tracestate: None,
743 flow: Some(flow),
744 }
745 }
746
747 async fn reset_inbound_channel(&self) {
749 self.drop_pending_replies("inbound channel reset").await;
750 self.drop_pending_pongs("inbound channel reset").await;
751
752 let (tx, rx) = mpsc::unbounded_channel();
753 *self.inbound_tx.lock().await = tx;
754 *self.inbound_rx.lock().await = rx;
755 }
756
757 async fn drop_pending_replies(&self, reason: &'static str) {
758 let dropped = {
759 let mut pending = self.pending_replies.lock().await;
760 let dropped = pending.len();
761 pending.clear();
762 dropped
763 };
764
765 if dropped > 0 {
766 tracing::debug!(reason, dropped, "Dropping pending signaling reply waiters");
767 }
768 }
769
770 async fn drop_pending_pongs(&self, reason: &'static str) {
771 let dropped = {
772 let mut pending = self.pending_pongs.lock().await;
773 let dropped = pending.len();
774 pending.clear();
775 dropped
776 };
777
778 if dropped > 0 {
779 tracing::debug!(reason, dropped, "Dropping pending signaling pong waiters");
780 }
781 }
782
783 async fn build_url_with_identity(&self) -> Url {
788 let mut url = self.config.server_url.clone();
789 let actor_id_opt = self.actor_id.lock().await.clone();
790 if let Some(actor_id) = actor_id_opt {
791 let actor_str = actr_protocol::ActrId::to_string_repr(&actor_id);
792 url.query_pairs_mut().append_pair("actor_id", &actor_str);
793 }
794
795 let cred_state_opt = self.credential_state.lock().await.clone();
797 if let Some(cred_state) = cred_state_opt {
798 let cred = cred_state.credential().await;
799 let claims_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.claims);
800 let sig_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.signature);
801 url.query_pairs_mut()
802 .append_pair("key_id", &cred.key_id.to_string())
803 .append_pair("claims", &claims_b64)
804 .append_pair("signature", &sig_b64);
805 }
806
807 if let Some(role) = &self.config.webrtc_role {
809 url.query_pairs_mut().append_pair("webrtc_role", role);
810 }
811
812 url
813 }
814
815 fn redact_signaling_url_for_log(url: &Url) -> String {
816 let mut redacted = url.clone();
817 let pairs: Vec<(String, String)> = redacted
818 .query_pairs()
819 .map(|(key, value)| {
820 let redacted_value = match key.to_ascii_lowercase().as_str() {
821 "claims" | "signature" | "token" | "authorization" | "bearer"
822 | "access_token" | "api_key" => "REDACTED".to_string(),
823 _ => value.into_owned(),
824 };
825 (key.into_owned(), redacted_value)
826 })
827 .collect();
828
829 redacted.set_query(None);
830 if !pairs.is_empty() {
831 let mut query = redacted.query_pairs_mut();
832 for (key, value) in pairs {
833 query.append_pair(&key, &value);
834 }
835 }
836
837 redacted.to_string()
838 }
839
840 fn auto_reconnect_cancelled(&self, generation: u64) -> bool {
841 self.auto_reconnect_suppressed.load(Ordering::Acquire)
842 || self.reconnect_generation.load(Ordering::Acquire) != generation
843 }
844
845 async fn establish_connection_once(&self) -> NetworkResult<()> {
849 self.establish_connection_once_with_intent(ConnectIntent::Explicit)
850 .await
851 }
852
853 async fn establish_connection_once_for_auto_reconnect(
854 &self,
855 generation: u64,
856 ) -> NetworkResult<()> {
857 self.establish_connection_once_with_intent(ConnectIntent::AutoReconnect { generation })
858 .await
859 }
860
861 async fn establish_connection_once_with_intent(
862 &self,
863 intent: ConnectIntent,
864 ) -> NetworkResult<()> {
865 if self.connected.load(Ordering::Acquire) {
867 tracing::debug!("Connection already established, skipping establish_connection_once()");
868 return Ok(());
869 }
870
871 let url = self.build_url_with_identity().await;
872 let timeout_secs = self.config.connection_timeout;
873 tracing::debug!(
874 "Establishing connection to URL: {}",
875 Self::redact_signaling_url_for_log(&url)
876 );
877 let config = WebSocketConfig::default().write_buffer_size(0);
879 let connect_result = if timeout_secs == 0 {
881 connect_async_with_config(url.as_str(), Some(config), false).await
882 } else {
883 let timeout_duration = std::time::Duration::from_secs(timeout_secs);
884 tokio::time::timeout(
885 timeout_duration,
886 connect_async_with_config(url.as_str(), Some(config), false),
887 )
888 .await
889 .map_err(|_| {
890 NetworkError::ConnectionError(format!(
891 "Signaling connect timeout after {}s",
892 timeout_secs
893 ))
894 })?
895 }?;
896
897 let (ws_stream, _) = connect_result;
898
899 let (sink, stream) = ws_stream.split();
901
902 if let ConnectIntent::AutoReconnect { generation } = intent
903 && self.auto_reconnect_cancelled(generation)
904 {
905 tracing::debug!(
906 generation,
907 "Discarding completed signaling auto-reconnect after explicit disconnect"
908 );
909 let mut sink = sink;
910 if let Err(e) = sink.close().await {
911 tracing::warn!(
912 "Signaling auto-reconnect socket close failed after cancellation: {}",
913 e
914 );
915 }
916 return Err(NetworkError::ConnectionError(
917 "Signaling auto-reconnect was cancelled by explicit disconnect".to_string(),
918 ));
919 }
920
921 *self.ws_sink.lock().await = Some(sink);
922 *self.ws_stream.lock().await = Some(stream);
923 self.connected.store(true, Ordering::Release);
924 self.auto_reconnect_suppressed
925 .store(false, Ordering::Release);
926 self.last_pong.store(current_unix_secs(), Ordering::Release);
927 self.invoke_hook(HookEvent::SignalingConnected).await;
929 let _ = self.event_tx.send(SignalingEvent::Connected);
930
931 self.stats.connections.fetch_add(1, Ordering::Relaxed);
932
933 Ok(())
934 }
935
936 async fn connect_with_retries(&self) -> NetworkResult<()> {
938 use actr_framework::ExponentialBackoff;
939
940 let cfg = &self.config.reconnect_config;
941
942 if !cfg.enabled {
944 return self.establish_connection_once().await;
945 }
946
947 let backoff = ExponentialBackoff::builder()
948 .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
949 .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
950 .max_retries(cfg.max_attempts)
951 .with_jitter()
952 .build();
953
954 let mut last_err = None;
955
956 for (attempt, delay) in std::iter::once(std::time::Duration::ZERO)
958 .chain(backoff)
959 .enumerate()
960 {
961 let attempt = attempt as u32 + 1;
962 self.invoke_hook(HookEvent::SignalingConnectStart { attempt })
963 .await;
964 if delay > std::time::Duration::ZERO {
965 tracing::info!("Retry signaling connect after {delay:?} (attempt {attempt})");
966 tokio::select! {
967 _ = tokio::time::sleep(delay) => {}
968 _ = self.reconnect_notify.notified() => {
969 tracing::debug!("Explicit reconnect request interrupted signaling connect backoff");
970 }
971 }
972 }
973
974 match self.establish_connection_once().await {
975 Ok(()) => return Ok(()),
976 Err(e) => {
977 tracing::warn!("Signaling connect attempt {attempt} failed: {e:?}");
978 last_err = Some(e);
979 }
980 }
981 }
982
983 let total = cfg.max_attempts + 1; tracing::error!("Signaling connect failed after {total} attempts, giving up");
985 Err(last_err.unwrap_or_else(|| {
986 NetworkError::ConnectionError("All connection attempts failed".to_string())
987 }))
988 }
989
990 #[cfg_attr(
992 feature = "opentelemetry",
993 tracing::instrument(skip_all, fields(envelope_id = %envelope.envelope_id))
994 )]
995 async fn send_envelope_and_wait_response(
996 &self,
997 envelope: SignalingEnvelope,
998 ) -> NetworkResult<SignalingEnvelope> {
999 let reply_for = envelope.envelope_id.clone();
1000
1001 let (tx, rx) = oneshot::channel();
1003 self.pending_replies
1004 .lock()
1005 .await
1006 .insert(reply_for.clone(), tx);
1007
1008 if let Err(e) = self.send_envelope(envelope).await {
1009 self.pending_replies.lock().await.remove(&reply_for);
1011 return Err(e);
1012 }
1013
1014 let result =
1015 tokio::time::timeout(std::time::Duration::from_secs(RESPONSE_TIMEOUT_SECS), rx).await;
1016 if result.is_err() {
1018 self.pending_replies.lock().await.remove(&reply_for);
1019 }
1020
1021 let response_envelope = result
1022 .map_err(|_| {
1023 NetworkError::ConnectionError(
1024 "Timed out waiting for signaling response".to_string(),
1025 )
1026 })?
1027 .map_err(|_| {
1028 NetworkError::ConnectionError(
1029 "Receiver dropped while waiting for signaling response".to_string(),
1030 )
1031 })?;
1032
1033 Ok(response_envelope)
1034 }
1035
1036 async fn start_receiver(&self) {
1038 let mut stream_guard = self.ws_stream.lock().await;
1039 if stream_guard.is_none() {
1040 return;
1041 }
1042
1043 let mut stream = stream_guard.take().expect("stream exists");
1044 let pending = self.pending_replies.clone();
1045 let inbound_tx = { self.inbound_tx.lock().await.clone() };
1046 let stats = self.stats.clone();
1047 let connected = self.connected.clone();
1048 let event_tx = self.event_tx.clone();
1049 let last_pong = self.last_pong.clone();
1050 let pending_pongs = self.pending_pongs.clone();
1051 let reconnect_notify = self.reconnect_notify.clone();
1052 let reconnect_enabled = self.config.reconnect_config.enabled;
1053 let hook_callback = self.hook_callback.get().cloned();
1054 let handle = tokio::spawn(async move {
1055 while let Some(msg) = stream.next().await {
1056 match msg {
1057 Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
1058 last_pong.store(current_unix_secs(), Ordering::Release);
1060 match SignalingEnvelope::decode(&data[..]) {
1061 Ok(envelope) => {
1062 #[cfg(feature = "opentelemetry")]
1063 let span = {
1064 let span = tracing::info_span!("signaling.receive_envelope", envelope_id = %envelope.envelope_id);
1065 span.set_parent(extract_trace_context(&envelope));
1066 span
1067 };
1068
1069 stats.messages_received.fetch_add(1, Ordering::Relaxed);
1070 tracing::debug!("Received message: {:?}", envelope);
1071 if let Some(reply_for) = envelope.reply_for.clone() {
1072 if let Some(sender) = pending.lock().await.remove(&reply_for) {
1073 #[cfg(feature = "opentelemetry")]
1074 let _ = span.enter();
1075 if let Err(e) = sender.send(envelope) {
1076 stats.errors.fetch_add(1, Ordering::Relaxed);
1077 tracing::warn!(
1078 "Failed to send reply envelope to waiter: {e:?}",
1079 );
1080 }
1081 continue;
1082 }
1083 }
1084 tracing::debug!(
1085 "Unmatched or push message -> forward to inbound channel"
1086 );
1087 if let Err(e) = inbound_tx.send(envelope) {
1089 stats.errors.fetch_add(1, Ordering::Relaxed);
1090 tracing::warn!(
1091 "Failed to send envelope to inbound channel: {e:?}"
1092 );
1093 }
1094 }
1095 Err(e) => {
1096 stats.errors.fetch_add(1, Ordering::Relaxed);
1097 tracing::warn!("Failed to decode SignalingEnvelope: {e}");
1098 }
1099 }
1100 }
1101 Ok(tokio_tungstenite::tungstenite::Message::Pong(payload)) => {
1102 tracing::debug!("Received pong");
1103 last_pong.store(current_unix_secs(), Ordering::Release);
1104 if let Some(sender) = pending_pongs.lock().await.remove(&payload.to_vec()) {
1105 let _ = sender.send(());
1106 }
1107 }
1108 Ok(tokio_tungstenite::tungstenite::Message::Ping(_)) => {
1109 tracing::debug!("Received ping");
1110 last_pong.store(current_unix_secs(), Ordering::Release);
1111 }
1112 Ok(other) => {
1113 tracing::warn!("Received non-binary frame, ignoring: {other:?}");
1114 }
1115 Err(e) => {
1116 stats.errors.fetch_add(1, Ordering::Relaxed);
1117 tracing::error!("Signaling receive error: {e}");
1118 break;
1119 }
1120 }
1121 }
1122
1123 tracing::warn!("Stream terminated");
1124 let was_connected = connected.swap(false, Ordering::AcqRel);
1128 Self::publish_disconnected_transition(
1129 was_connected,
1130 &stats,
1131 &event_tx,
1132 hook_callback,
1133 DisconnectReason::StreamEnded,
1134 reconnect_enabled.then_some(&reconnect_notify),
1135 )
1136 .await;
1137 pending_pongs.lock().await.clear();
1138 });
1139
1140 *self.receiver_task.lock().await = Some(handle);
1141 }
1142
1143 async fn start_ping_task(&self) {
1146 let mut existing = self.ping_task.lock().await;
1147 if let Some(handle) = existing.as_ref() {
1148 if handle.is_finished() {
1149 existing.take();
1150 } else {
1151 return;
1152 }
1153 }
1154
1155 let sink = self.ws_sink.clone();
1156 let connected = self.connected.clone();
1157 let stats = self.stats.clone();
1158 let event_tx = self.event_tx.clone();
1159 let last_pong = self.last_pong.clone();
1160 let receiver_task_clone = Arc::clone(&self.receiver_task);
1161 let reconnect_notify = self.reconnect_notify.clone();
1162 let reconnect_enabled = self.config.reconnect_config.enabled;
1163 let hook_callback = self.hook_callback.get().cloned();
1164
1165 let handle = tokio::spawn(async move {
1166 loop {
1167 tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await;
1168
1169 if !connected.load(Ordering::Acquire) {
1170 break;
1171 }
1172
1173 let mut disconnect_reason = None;
1175 {
1176 let mut sink_guard = sink.lock().await;
1177 if let Some(sink) = sink_guard.as_mut() {
1178 match tokio::time::timeout(
1179 std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1180 sink.send(tokio_tungstenite::tungstenite::Message::Ping(
1181 Vec::new().into(),
1182 )),
1183 )
1184 .await
1185 {
1186 Ok(Ok(())) => {}
1187 Ok(Err(e)) => {
1188 tracing::warn!("Signaling ping send failed: {e}");
1189 disconnect_reason = Some(DisconnectReason::PingSendFailed);
1190 }
1191 Err(_) => {
1192 tracing::warn!("Signaling ping send timed out");
1193 disconnect_reason = Some(DisconnectReason::PingSendFailed);
1194 }
1195 }
1196 } else {
1197 tracing::warn!("Signaling not connected");
1198 disconnect_reason = Some(DisconnectReason::PingSendFailed);
1199 }
1200 }
1201
1202 if let Some(reason) = disconnect_reason {
1203 let was_connected = connected.swap(false, Ordering::AcqRel);
1204 Self::publish_disconnected_transition(
1205 was_connected,
1206 &stats,
1207 &event_tx,
1208 hook_callback.clone(),
1209 reason,
1210 reconnect_enabled.then_some(&reconnect_notify),
1211 )
1212 .await;
1213 break;
1214 }
1215
1216 let now = current_unix_secs();
1218 let last = last_pong.load(Ordering::Acquire);
1219 if now.saturating_sub(last) > PONG_TIMEOUT_SECS {
1220 tracing::warn!(
1221 "Signaling pong timeout (last seen {}s ago), marking disconnected",
1222 now.saturating_sub(last)
1223 );
1224 if let Some(handle) = receiver_task_clone.lock().await.take() {
1225 handle.abort();
1226 }
1227 let was_connected = connected.swap(false, Ordering::AcqRel);
1228 Self::publish_disconnected_transition(
1229 was_connected,
1230 &stats,
1231 &event_tx,
1232 hook_callback.clone(),
1233 DisconnectReason::PongTimeout,
1234 reconnect_enabled.then_some(&reconnect_notify),
1235 )
1236 .await;
1237 break;
1238 }
1239 }
1240 });
1241
1242 *existing = Some(handle);
1243 }
1244
1245 async fn disconnect_internal(&self, suppress_auto_reconnect: bool) -> NetworkResult<()> {
1246 if suppress_auto_reconnect {
1247 self.reconnect_generation.fetch_add(1, Ordering::AcqRel);
1248 self.auto_reconnect_suppressed
1249 .store(true, Ordering::Release);
1250 self.reconnect_notify.notify_waiters();
1251 }
1252
1253 self.drop_pending_replies("signaling disconnect").await;
1254 self.drop_pending_pongs("signaling disconnect").await;
1255 let was_connected = self.connected.swap(false, Ordering::AcqRel);
1256
1257 let ping_handle = match tokio::time::timeout(
1262 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1263 self.ping_task.lock(),
1264 )
1265 .await
1266 {
1267 Ok(mut task_guard) => task_guard.take(),
1268 Err(_) => {
1269 tracing::warn!("Timed out waiting for signaling ping task lock during disconnect");
1270 None
1271 }
1272 };
1273 if let Some(handle) = ping_handle {
1274 handle.abort();
1275 }
1276
1277 let receiver_handle = match tokio::time::timeout(
1278 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1279 self.receiver_task.lock(),
1280 )
1281 .await
1282 {
1283 Ok(mut task_guard) => task_guard.take(),
1284 Err(_) => {
1285 tracing::warn!(
1286 "Timed out waiting for signaling receiver task lock during disconnect"
1287 );
1288 None
1289 }
1290 };
1291 if let Some(handle) = receiver_handle {
1292 handle.abort();
1293 }
1294
1295 let sink = match tokio::time::timeout(
1299 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1300 self.ws_sink.lock(),
1301 )
1302 .await
1303 {
1304 Ok(mut sink_guard) => sink_guard.take(),
1305 Err(_) => {
1306 tracing::warn!(
1307 "Timed out waiting for signaling WebSocket sink lock during disconnect"
1308 );
1309 None
1310 }
1311 };
1312
1313 if let Some(mut sink) = sink {
1314 match tokio::time::timeout(
1315 std::time::Duration::from_secs(DISCONNECT_CLOSE_TIMEOUT_SECS),
1316 sink.close(),
1317 )
1318 .await
1319 {
1320 Ok(Ok(())) => {}
1321 Ok(Err(e)) => {
1322 tracing::warn!("Signaling WebSocket close failed during disconnect: {}", e);
1323 }
1324 Err(_) => {
1325 tracing::warn!(
1326 "Signaling WebSocket close timed out during disconnect; continuing cleanup"
1327 );
1328 }
1329 }
1330 }
1331
1332 match tokio::time::timeout(
1333 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1334 self.ws_stream.lock(),
1335 )
1336 .await
1337 {
1338 Ok(mut stream_guard) => {
1339 stream_guard.take();
1340 }
1341 Err(_) => {
1342 tracing::warn!(
1343 "Timed out waiting for signaling WebSocket stream lock during disconnect"
1344 );
1345 }
1346 }
1347
1348 self.reset_inbound_channel().await;
1349
1350 Self::publish_disconnected_transition(
1352 was_connected,
1353 &self.stats,
1354 &self.event_tx,
1355 self.hook_callback.get().cloned(),
1356 DisconnectReason::Manual,
1357 None,
1358 )
1359 .await;
1360
1361 Ok(())
1362 }
1363
1364 async fn connect_once_for_auto_reconnect(&self, generation: u64) -> NetworkResult<()> {
1365 if self.auto_reconnect_cancelled(generation) {
1366 return Err(NetworkError::ConnectionError(
1367 "Signaling auto-reconnect was cancelled".to_string(),
1368 ));
1369 }
1370
1371 if self.connected.load(Ordering::Acquire) {
1372 tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1373 return Ok(());
1374 }
1375
1376 match self
1377 .connecting
1378 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1379 {
1380 Ok(_) => {}
1381 Err(_) => {
1382 if self.connected.load(Ordering::Acquire) {
1383 tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1384 return Ok(());
1385 }
1386
1387 tracing::debug!(
1388 "Another connection attempt in progress, waiting for state change..."
1389 );
1390 let result = self.wait_for_connection_result().await;
1391 if self.auto_reconnect_cancelled(generation) {
1392 return Err(NetworkError::ConnectionError(
1393 "Signaling auto-reconnect was cancelled".to_string(),
1394 ));
1395 }
1396 return result;
1397 }
1398 }
1399
1400 if self.auto_reconnect_cancelled(generation) {
1401 self.connecting.store(false, Ordering::Release);
1402 return Err(NetworkError::ConnectionError(
1403 "Signaling auto-reconnect was cancelled".to_string(),
1404 ));
1405 }
1406
1407 if self.connected.load(Ordering::Acquire) {
1408 tracing::debug!("Connection completed by another task while acquiring lock");
1409 self.connecting.store(false, Ordering::Release);
1410 return Ok(());
1411 }
1412
1413 tracing::debug!(
1414 generation,
1415 "Acquired connection lock, establishing one auto-reconnect signaling attempt..."
1416 );
1417
1418 let result = self
1419 .establish_connection_once_for_auto_reconnect(generation)
1420 .await;
1421 self.connecting.store(false, Ordering::Release);
1422
1423 match result {
1424 Ok(()) => {
1425 if self.auto_reconnect_cancelled(generation) {
1426 self.disconnect_internal(false).await?;
1427 return Err(NetworkError::ConnectionError(
1428 "Signaling auto-reconnect was cancelled".to_string(),
1429 ));
1430 }
1431 self.start_receiver().await;
1432 self.start_ping_task().await;
1433 Ok(())
1434 }
1435 Err(e) => {
1436 if !self.auto_reconnect_cancelled(generation) {
1437 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1438 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1439 });
1440 tracing::error!("Connection attempt failed: {e}");
1441 }
1442 Err(e)
1443 }
1444 }
1445 }
1446
1447 async fn wait_for_connection_result(&self) -> NetworkResult<()> {
1451 let mut event_rx = self.event_tx.subscribe();
1452 let deadline = tokio::time::Instant::now()
1453 + std::time::Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS);
1454
1455 loop {
1456 tokio::select! {
1457 _ = tokio::time::sleep_until(deadline) => {
1458 if self.connected.load(Ordering::Acquire) {
1460 tracing::debug!("Connection succeeded just before timeout");
1461 return Ok(());
1462 }
1463 return Err(NetworkError::ConnectionError(
1464 "Timeout waiting for concurrent connection attempt".to_string(),
1465 ));
1466 }
1467 result = event_rx.recv() => {
1468 match result {
1469 Ok(SignalingEvent::Connected) => {
1470 tracing::debug!("Connection established by another task");
1471 return Ok(());
1472 }
1473 Ok(SignalingEvent::Disconnected { reason }) => {
1474 return Err(NetworkError::ConnectionError(format!(
1475 "Concurrent signaling connection failed: {reason:?}"
1476 )));
1477 }
1478 Ok(_) => continue, Err(broadcast::error::RecvError::Lagged(n)) => {
1480 tracing::warn!("Event receiver lagged by {n} events");
1481 if self.connected.load(Ordering::Acquire) {
1483 return Ok(());
1484 }
1485 continue;
1486 }
1487 Err(broadcast::error::RecvError::Closed) => {
1488 return Err(NetworkError::ConnectionError(
1489 "Event channel closed while waiting for connection".to_string(),
1490 ));
1491 }
1492 }
1493 }
1494 }
1495 }
1496 }
1497}
1498
1499#[async_trait]
1500impl SignalingClient for WebSocketSignalingClient {
1501 async fn connect(&self) -> NetworkResult<()> {
1502 match self
1507 .connecting
1508 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1509 {
1510 Ok(_) => {
1511 }
1514 Err(_) => {
1515 if self.connected.load(Ordering::Acquire) {
1518 tracing::debug!("Already connected, skipping connect()");
1519 return Ok(());
1520 }
1521
1522 tracing::debug!(
1524 "Another connection attempt in progress, waiting for state change..."
1525 );
1526 return self.wait_for_connection_result().await;
1527 }
1528 }
1529
1530 if self.connected.load(Ordering::Acquire) {
1535 tracing::debug!("Connection completed by another task while acquiring lock");
1536 self.connecting.store(false, Ordering::Release);
1537 return Ok(());
1538 }
1539
1540 tracing::debug!("Acquired connection lock, establishing connection...");
1541
1542 let result = self.connect_with_retries().await;
1544
1545 self.connecting.store(false, Ordering::Release);
1547
1548 match result {
1550 Ok(()) => {
1551 self.start_receiver().await;
1552 self.start_ping_task().await;
1553 Ok(())
1554 }
1555 Err(e) => {
1556 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1558 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1559 });
1560 tracing::error!("Connection failed: {e}");
1561 Err(e)
1562 }
1563 }
1564 }
1565
1566 async fn connect_once(&self) -> NetworkResult<()> {
1567 if self.connected.load(Ordering::Acquire) {
1568 tracing::debug!("Already connected, skipping connect_once()");
1569 return Ok(());
1570 }
1571
1572 match self
1573 .connecting
1574 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1575 {
1576 Ok(_) => {}
1577 Err(_) => {
1578 if self.connected.load(Ordering::Acquire) {
1579 tracing::debug!("Already connected, skipping connect_once()");
1580 return Ok(());
1581 }
1582
1583 tracing::debug!(
1584 "Another connection attempt in progress, waiting for state change..."
1585 );
1586 return self.wait_for_connection_result().await;
1587 }
1588 }
1589
1590 if self.connected.load(Ordering::Acquire) {
1591 tracing::debug!("Connection completed by another task while acquiring lock");
1592 self.connecting.store(false, Ordering::Release);
1593 return Ok(());
1594 }
1595
1596 tracing::debug!(
1597 "Acquired connection lock, establishing one signaling connection attempt..."
1598 );
1599
1600 let result = self.establish_connection_once().await;
1601 self.connecting.store(false, Ordering::Release);
1602
1603 match result {
1604 Ok(()) => {
1605 self.start_receiver().await;
1606 self.start_ping_task().await;
1607 Ok(())
1608 }
1609 Err(e) => {
1610 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1611 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1612 });
1613 tracing::error!("Connection attempt failed: {e}");
1614 Err(e)
1615 }
1616 }
1617 }
1618
/// Disconnects from the signaling server.
///
/// Delegates to `disconnect_internal` with `suppress_auto_reconnect = true`.
async fn disconnect(&self) -> NetworkResult<()> {
    self.disconnect_internal(true).await
}
1622
1623 async fn probe_alive(&self, timeout: Duration) -> NetworkResult<()> {
1624 if !self.connected.load(Ordering::Acquire) {
1625 return Err(NetworkError::ConnectionError(
1626 "Signaling client is not connected".to_string(),
1627 ));
1628 }
1629
1630 let probe_id = self.probe_counter.fetch_add(1, Ordering::Relaxed) + 1;
1631 let payload =
1632 format!("actr-signaling-probe-{probe_id}-{}", current_unix_secs()).into_bytes();
1633 let (tx, rx) = oneshot::channel();
1634 self.pending_pongs.lock().await.insert(payload.clone(), tx);
1635
1636 let send_result = {
1637 let mut sink_guard = self.ws_sink.lock().await;
1638 match sink_guard.as_mut() {
1639 Some(sink) => sink
1640 .send(tokio_tungstenite::tungstenite::Message::Ping(
1641 payload.clone().into(),
1642 ))
1643 .await
1644 .map_err(|e| {
1645 NetworkError::ConnectionError(format!("Signaling probe ping failed: {e}"))
1646 }),
1647 None => Err(NetworkError::ConnectionError(
1648 "Signaling probe failed: WebSocket sink is not available".to_string(),
1649 )),
1650 }
1651 };
1652
1653 if let Err(e) = send_result {
1654 self.pending_pongs.lock().await.remove(&payload);
1655 let was_connected = self.connected.swap(false, Ordering::AcqRel);
1656 Self::publish_disconnected_transition(
1657 was_connected,
1658 &self.stats,
1659 &self.event_tx,
1660 self.hook_callback.get().cloned(),
1661 DisconnectReason::PingSendFailed,
1662 None,
1663 )
1664 .await;
1665 return Err(e);
1666 }
1667
1668 match tokio::time::timeout(timeout, rx).await {
1669 Ok(Ok(())) => {
1670 self.last_pong.store(current_unix_secs(), Ordering::Release);
1671 Ok(())
1672 }
1673 Ok(Err(_)) => {
1674 self.pending_pongs.lock().await.remove(&payload);
1675 Err(NetworkError::ConnectionError(
1676 "Signaling probe pong waiter dropped".to_string(),
1677 ))
1678 }
1679 Err(_) => {
1680 self.pending_pongs.lock().await.remove(&payload);
1681 Err(NetworkError::TimeoutError(format!(
1682 "Timed out waiting for signaling probe pong after {}ms",
1683 timeout.as_millis()
1684 )))
1685 }
1686 }
1687 }
1688
1689 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1690 async fn send_register_request(
1691 &self,
1692 request: RegisterRequest,
1693 ) -> NetworkResult<RegisterResponse> {
1694 let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
1696 payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
1697 });
1698
1699 let envelope = self.create_envelope(flow).await;
1700 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1701
1702 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1703 {
1704 if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
1705 server_to_actr.payload
1706 {
1707 return Ok(response);
1708 }
1709 }
1710
1711 Err(NetworkError::ConnectionError(
1712 "Invalid registration response".to_string(),
1713 ))
1714 }
1715
1716 #[cfg_attr(
1717 feature = "opentelemetry",
1718 tracing::instrument(skip_all, fields(actor_id = %actor_id))
1719 )]
1720 async fn send_unregister_request(
1721 &self,
1722 actor_id: ActrId,
1723 credential: AIdCredential,
1724 reason: Option<String>,
1725 ) -> NetworkResult<UnregisterResponse> {
1726 let request = UnregisterRequest {
1728 actr_id: actor_id.clone(),
1729 reason,
1730 };
1731
1732 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1734 source: actor_id,
1735 credential,
1736 payload: Some(actr_to_signaling::Payload::UnregisterRequest(request)),
1737 });
1738
1739 let envelope = self.create_envelope(flow).await;
1741 self.send_envelope(envelope).await?;
1742
1743 Ok(UnregisterResponse {
1748 result: Some(actr_protocol::unregister_response::Result::Success(
1749 actr_protocol::unregister_response::UnregisterOk {},
1750 )),
1751 })
1752 }
1753
1754 #[cfg_attr(
1755 feature = "opentelemetry",
1756 tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1757 )]
1758 async fn send_heartbeat(
1759 &self,
1760 actor_id: ActrId,
1761 credential: AIdCredential,
1762 availability: ServiceAvailabilityState,
1763 power_reserve: f32,
1764 mailbox_backlog: f32,
1765 ) -> NetworkResult<Pong> {
1766 let ping = Ping {
1767 availability: availability as i32,
1768 power_reserve,
1769 mailbox_backlog,
1770 sticky_client_ids: vec![], };
1772
1773 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1774 source: actor_id,
1775 credential,
1776 payload: Some(actr_to_signaling::Payload::Ping(ping)),
1777 });
1778
1779 let envelope = self.create_envelope(flow).await;
1780 let reply_for = envelope.envelope_id.clone();
1781
1782 let (tx, rx) = oneshot::channel();
1784 self.pending_replies
1785 .lock()
1786 .await
1787 .insert(reply_for.clone(), tx);
1788
1789 if let Err(e) = self.send_envelope(envelope).await {
1790 self.pending_replies.lock().await.remove(&reply_for);
1792 return Err(e);
1793 }
1794
1795 let response_envelope = rx.await.map_err(|_| {
1797 NetworkError::ConnectionError(
1798 "Receiver dropped while waiting for heartbeat response".to_string(),
1799 )
1800 })?;
1801
1802 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1804 {
1805 match server_to_actr.payload {
1806 Some(signaling_to_actr::Payload::Pong(pong)) => {
1807 return Ok(pong);
1808 }
1809 Some(signaling_to_actr::Payload::Error(err)) => {
1810 if err.code == 401 {
1812 return Err(NetworkError::CredentialExpired(err.message));
1813 }
1814 return Err(NetworkError::AuthenticationError(format!(
1815 "{} ({})",
1816 err.message, err.code
1817 )));
1818 }
1819 _ => {}
1820 }
1821 }
1822
1823 Err(NetworkError::ConnectionError(
1824 "Received response but not a Pong message".to_string(),
1825 ))
1826 }
1827
1828 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1829 async fn send_route_candidates_request(
1830 &self,
1831 actor_id: ActrId,
1832 credential: AIdCredential,
1833 request: RouteCandidatesRequest,
1834 ) -> NetworkResult<RouteCandidatesResponse> {
1835 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1836 source: actor_id,
1837 credential,
1838 payload: Some(actr_to_signaling::Payload::RouteCandidatesRequest(request)),
1839 });
1840
1841 let envelope = self.create_envelope(flow).await;
1842 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1843
1844 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1845 {
1846 match server_to_actr.payload {
1847 Some(signaling_to_actr::Payload::RouteCandidatesResponse(response)) => {
1848 return Ok(response);
1849 }
1850 Some(signaling_to_actr::Payload::Error(err)) => {
1851 return Err(NetworkError::ServiceDiscoveryError(format!(
1852 "{} ({})",
1853 err.message, err.code
1854 )));
1855 }
1856 _ => {}
1857 }
1858 }
1859
1860 Err(NetworkError::ConnectionError(
1861 "Invalid route candidates response".to_string(),
1862 ))
1863 }
1864
1865 async fn get_signing_key(
1866 &self,
1867 actor_id: ActrId,
1868 credential: AIdCredential,
1869 key_id: u32,
1870 ) -> NetworkResult<(u32, Vec<u8>)> {
1871 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1872 source: actor_id,
1873 credential,
1874 payload: Some(actr_to_signaling::Payload::GetSigningKeyRequest(
1875 GetSigningKeyRequest { key_id },
1876 )),
1877 });
1878
1879 let envelope = self.create_envelope(flow).await;
1880 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1881
1882 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1883 {
1884 match server_to_actr.payload {
1885 Some(signaling_to_actr::Payload::GetSigningKeyResponse(resp)) => {
1886 return Ok((resp.key_id, resp.pubkey.to_vec()));
1887 }
1888 Some(signaling_to_actr::Payload::Error(err)) => {
1889 return Err(NetworkError::ConnectionError(format!(
1890 "get_signing_key failed: {} ({})",
1891 err.message, err.code
1892 )));
1893 }
1894 _ => {}
1895 }
1896 }
1897
1898 Err(NetworkError::ConnectionError(
1899 "get_signing_key: invalid response".to_string(),
1900 ))
1901 }
1902
1903 #[cfg_attr(
1904 feature = "opentelemetry",
1905 tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1906 )]
1907 async fn send_credential_update_request(
1908 &self,
1909 actor_id: ActrId,
1910 credential: AIdCredential,
1911 ) -> NetworkResult<RegisterResponse> {
1912 let request = CredentialUpdateRequest {
1913 actr_id: actor_id.clone(),
1914 };
1915
1916 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1917 source: actor_id,
1918 credential,
1919 payload: Some(actr_to_signaling::Payload::CredentialUpdateRequest(request)),
1920 });
1921
1922 let envelope = self.create_envelope(flow).await;
1923 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1924
1925 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1926 {
1927 match server_to_actr.payload {
1928 Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
1929 return Ok(response);
1930 }
1931 Some(signaling_to_actr::Payload::Error(err)) => {
1932 return Err(NetworkError::ConnectionError(format!(
1933 "Credential update failed: {} ({})",
1934 err.message, err.code
1935 )));
1936 }
1937 _ => {}
1938 }
1939 }
1940
1941 Err(NetworkError::ConnectionError(
1942 "Invalid credential update response".to_string(),
1943 ))
1944 }
1945
1946 #[cfg_attr(
1947 feature = "opentelemetry",
1948 tracing::instrument(level = "debug", skip_all, fields(envelope_id = %envelope.envelope_id))
1949 )]
1950 async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
1951 #[cfg(feature = "opentelemetry")]
1952 let envelope = {
1953 let mut envelope = envelope;
1954 trace::inject_span_context(&tracing::Span::current(), &mut envelope);
1955 envelope
1956 };
1957
1958 if !self.is_connected() {
1961 return Err(NetworkError::ConnectionError(
1962 "Cannot send: WebSocket not connected".to_string(),
1963 ));
1964 }
1965
1966 let mut sink_guard = self.ws_sink.lock().await;
1967
1968 if let Some(sink) = sink_guard.as_mut() {
1969 let mut buf = Vec::new();
1971 envelope.encode(&mut buf)?;
1972 let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
1973 match tokio::time::timeout(
1974 std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1975 sink.send(msg),
1976 )
1977 .await
1978 {
1979 Ok(Ok(())) => {}
1980 Ok(Err(e)) => return Err(e.into()),
1981 Err(_) => {
1982 self.connected.store(false, Ordering::Release);
1983 return Err(NetworkError::ConnectionError(
1984 "Signaling WebSocket send timed out".to_string(),
1985 ));
1986 }
1987 }
1988
1989 self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1990 tracing::debug!("Stats: {:?}", self.stats.snapshot());
1991 Ok(())
1992 } else {
1993 Err(NetworkError::ConnectionError("Not connected".to_string()))
1994 }
1995 }
1996
1997 async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
1998 let mut rx = self.inbound_rx.lock().await;
1999 match rx.recv().await {
2000 Some(envelope) => Ok(Some(envelope)),
2001 None => {
2002 tracing::error!("Inbound channel closed");
2003 Err(NetworkError::ConnectionError(
2004 "Inbound channel closed".to_string(),
2005 ))
2006 }
2007 }
2008 }
2009
/// Returns the current value of the `connected` flag (acquire load).
fn is_connected(&self) -> bool {
    self.connected.load(Ordering::Acquire)
}
2013
/// Returns a point-in-time copy of the client's counters.
fn get_stats(&self) -> SignalingStats {
    self.stats.snapshot()
}
2017
/// Creates a new receiver on the client's `SignalingEvent` broadcast channel.
fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
    self.event_tx.subscribe()
}
2021
/// Stores `actor_id` as the client's identity, replacing any previous value.
async fn set_actor_id(&self, actor_id: ActrId) {
    *self.actor_id.lock().await = Some(actor_id);
}
2025
/// Stores `credential_state`, replacing any previous value.
async fn set_credential_state(&self, credential_state: CredentialState) {
    *self.credential_state.lock().await = Some(credential_state);
}
2029
/// Clears both the stored actor id and the stored credential state.
///
/// The two locks are taken one after the other, so the fields are not cleared
/// atomically as a pair.
async fn clear_identity(&self) {
    *self.actor_id.lock().await = None;
    *self.credential_state.lock().await = None;
}
2034
/// Installs the hook callback.
///
/// The result of `set` is discarded, so a failed install is silently ignored.
/// NOTE(review): `hook_callback` looks like a set-once cell (`OnceLock` is imported),
/// which would mean only the first installed callback is kept — confirm against the
/// field declaration.
fn set_hook_callback(&self, cb: HookCallback) {
    let _ = self.hook_callback.set(cb);
}
2038}
2039
/// Lock-free counters describing the signaling client's activity.
///
/// All counters start at zero (`Default`) and are updated with atomic operations,
/// so the struct can be shared between tasks without a lock.
#[derive(Debug, Default)]
pub(crate) struct AtomicSignalingStats {
    /// Connection counter.
    pub connections: AtomicU64,

    /// Number of published connected-to-disconnected transitions.
    pub disconnections: AtomicU64,

    /// Number of envelopes successfully written to the WebSocket.
    pub messages_sent: AtomicU64,

    /// Received-message counter.
    pub messages_received: AtomicU64,

    /// Sent-heartbeat counter.
    pub heartbeats_sent: AtomicU64,

    /// Received-heartbeat counter.
    pub heartbeats_received: AtomicU64,

    /// Error counter.
    pub errors: AtomicU64,
}
2080
/// Plain, copyable snapshot of the signaling counters.
///
/// Field names mirror `AtomicSignalingStats`; values are produced by
/// `AtomicSignalingStats::snapshot`. The struct is serializable via serde.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct SignalingStats {
    /// Connection counter at snapshot time.
    pub connections: u64,

    /// Disconnection counter at snapshot time.
    pub disconnections: u64,

    /// Sent-message counter at snapshot time.
    pub messages_sent: u64,

    /// Received-message counter at snapshot time.
    pub messages_received: u64,

    /// Sent-heartbeat counter at snapshot time.
    pub heartbeats_sent: u64,

    /// Received-heartbeat counter at snapshot time.
    pub heartbeats_received: u64,

    /// Error counter at snapshot time.
    pub errors: u64,
}
2105
2106impl AtomicSignalingStats {
2107 pub fn snapshot(&self) -> SignalingStats {
2109 SignalingStats {
2110 connections: self.connections.load(Ordering::Relaxed),
2111 disconnections: self.disconnections.load(Ordering::Relaxed),
2112 messages_sent: self.messages_sent.load(Ordering::Relaxed),
2113 messages_received: self.messages_received.load(Ordering::Relaxed),
2114 heartbeats_sent: self.heartbeats_sent.load(Ordering::Relaxed),
2115 heartbeats_received: self.heartbeats_received.load(Ordering::Relaxed),
2116 errors: self.errors.load(Ordering::Relaxed),
2117 }
2118 }
2119}
2120
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2128
2129#[cfg(test)]
2130mod tests {
2131 use super::*;
2132 use std::future::Future;
2133 use std::pin::Pin;
2134 use std::sync::atomic::{AtomicUsize, Ordering as UsizeOrdering};
2135
/// Minimal in-memory `SignalingClient` used by the tests in this module.
struct FakeSignalingClient {
    /// Sender side of the event channel handed out by `subscribe_events`.
    event_tx: broadcast::Sender<SignalingEvent>,
    /// Value reported by `is_connected`.
    connected: AtomicBool,
    /// Number of times `connect` has been called.
    connect_calls: Arc<AtomicUsize>,
    /// Identity stored by `set_actor_id` and cleared by `clear_identity`.
    actor_id: tokio::sync::Mutex<Option<ActrId>>,
    /// Credential state stored by `set_credential_state` and cleared by `clear_identity`.
    credential_state: tokio::sync::Mutex<Option<CredentialState>>,
}
2144
// Test double: only connection bookkeeping, event subscription and identity storage
// are functional. Every request/response method panics via `unimplemented!`.
// NOTE(review): `connect_once`, `probe_alive` and `set_hook_callback` are not
// overridden here, so the trait presumably provides defaults — confirm on the trait.
#[async_trait]
impl SignalingClient for FakeSignalingClient {
    /// Counts the call and reports success without touching `connected`.
    async fn connect(&self) -> NetworkResult<()> {
        self.connect_calls.fetch_add(1, UsizeOrdering::SeqCst);
        Ok(())
    }

    /// No-op; always succeeds.
    async fn disconnect(&self) -> NetworkResult<()> {
        Ok(())
    }

    /// Not exercised by the tests; panics if called.
    async fn send_register_request(
        &self,
        _request: RegisterRequest,
    ) -> NetworkResult<RegisterResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not exercised by the tests; panics if called.
    async fn send_unregister_request(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _reason: Option<String>,
    ) -> NetworkResult<UnregisterResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not exercised by the tests; panics if called.
    async fn send_heartbeat(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _availability: ServiceAvailabilityState,
        _power_reserve: f32,
        _mailbox_backlog: f32,
    ) -> NetworkResult<Pong> {
        unimplemented!("not needed in tests");
    }

    /// Not exercised by the tests; panics if called.
    async fn send_route_candidates_request(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _request: RouteCandidatesRequest,
    ) -> NetworkResult<RouteCandidatesResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not exercised by the tests; panics if called.
    async fn get_signing_key(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _key_id: u32,
    ) -> NetworkResult<(u32, Vec<u8>)> {
        unimplemented!("not needed in tests");
    }

    /// Not exercised by the tests; panics if called.
    async fn send_credential_update_request(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
    ) -> NetworkResult<RegisterResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not exercised by the tests; panics if called.
    async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
        unimplemented!("not needed in tests");
    }

    /// Not exercised by the tests; panics if called.
    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
        unimplemented!("not needed in tests");
    }

    /// Reports the `connected` flag as set by the test.
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }

    /// Always returns all-zero stats.
    fn get_stats(&self) -> SignalingStats {
        SignalingStats::default()
    }

    /// Hands out a new receiver on the fake's event channel.
    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
        self.event_tx.subscribe()
    }

    /// Stores the actor id.
    async fn set_actor_id(&self, actor_id: ActrId) {
        *self.actor_id.lock().await = Some(actor_id);
    }

    /// Stores the credential state.
    async fn set_credential_state(&self, credential_state: CredentialState) {
        *self.credential_state.lock().await = Some(credential_state);
    }

    /// Clears the stored actor id and credential state.
    async fn clear_identity(&self) {
        *self.actor_id.lock().await = None;
        *self.credential_state.lock().await = None;
    }
}
2242
2243 fn make_fake_client() -> Arc<FakeSignalingClient> {
2244 let (event_tx, _erx) = broadcast::channel(64);
2245 Arc::new(FakeSignalingClient {
2246 event_tx,
2247 connected: AtomicBool::new(false),
2248 connect_calls: Arc::new(AtomicUsize::new(0)),
2249 actor_id: tokio::sync::Mutex::new(None),
2250 credential_state: tokio::sync::Mutex::new(None),
2251 })
2252 }
2253
2254 fn make_config() -> SignalingConfig {
2256 SignalingConfig {
2257 server_url: Url::parse("ws://127.0.0.1:1/signaling/ws").unwrap(),
2258 connection_timeout: 2,
2259 heartbeat_interval: 30,
2260 reconnect_config: ReconnectConfig::default(),
2261 auth_config: None,
2262 webrtc_role: None,
2263 }
2264 }
2265
/// Wraps a freshly constructed `WebSocketSignalingClient` in an `Arc`.
fn make_ws_client(config: SignalingConfig) -> Arc<WebSocketSignalingClient> {
    Arc::new(WebSocketSignalingClient::new(config))
}
2270
/// A real connected-to-disconnected transition publishes exactly once (hook, stat and
/// event); a second call with `was_connected == false` is ignored entirely.
#[tokio::test]
async fn test_publish_disconnected_transition_fires_hook_once() {
    let stats = Arc::new(AtomicSignalingStats::default());
    let (event_tx, mut event_rx) = broadcast::channel(4);
    let hook_count = Arc::new(AtomicUsize::new(0));
    let hook_count_for_cb = hook_count.clone();
    // Hook that counts only `SignalingDisconnected` notifications.
    let hook_callback: HookCallback = Arc::new(move |event| {
        let hook_count = hook_count_for_cb.clone();
        Box::pin(async move {
            if matches!(event, HookEvent::SignalingDisconnected) {
                hook_count.fetch_add(1, UsizeOrdering::SeqCst);
            }
        }) as Pin<Box<dyn Future<Output = ()> + Send>>
    });

    // First call: the client really was connected.
    let first = WebSocketSignalingClient::publish_disconnected_transition(
        true,
        &stats,
        &event_tx,
        Some(hook_callback.clone()),
        DisconnectReason::StreamEnded,
        None,
    )
    .await;
    assert!(
        first,
        "first connected->disconnected transition should publish"
    );
    assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
    assert_eq!(stats.snapshot().disconnections, 1);
    assert!(matches!(
        event_rx.recv().await,
        Ok(SignalingEvent::Disconnected {
            reason: DisconnectReason::StreamEnded
        })
    ));

    // Second call: already disconnected, so nothing may change.
    let second = WebSocketSignalingClient::publish_disconnected_transition(
        false,
        &stats,
        &event_tx,
        Some(hook_callback),
        DisconnectReason::PongTimeout,
        None,
    )
    .await;
    assert!(
        !second,
        "stale duplicate disconnected transition should be ignored"
    );
    assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
    assert_eq!(stats.snapshot().disconnections, 1);
    assert!(event_rx.try_recv().is_err());
}
2325
/// Pins the default reconnect policy: enabled, 10 attempts, delay from 1 to 60 with
/// a 2.0 backoff multiplier.
#[test]
fn test_reconnect_config_defaults() {
    let cfg = ReconnectConfig::default();
    assert!(cfg.enabled);
    assert_eq!(cfg.max_attempts, 10);
    assert_eq!(cfg.initial_delay, 1);
    assert_eq!(cfg.max_delay, 60);
    // Float comparison with a tolerance instead of `==`.
    assert!((cfg.backoff_multiplier - 2.0).abs() < f64::EPSILON);
}
2339
/// A freshly constructed client is disconnected, not connecting, and has not started
/// its reconnect manager.
#[test]
fn test_websocket_signaling_client_initial_state_disconnected() {
    let client = WebSocketSignalingClient::new(make_config());
    assert!(
        !client.is_connected(),
        "newly created client should be Disconnected"
    );
    assert!(
        !client.connecting.load(Ordering::Acquire),
        "newly created client should not be in connecting state"
    );
    assert!(
        !client.reconnector_started.load(Ordering::Acquire),
        "reconnect manager should not be started automatically"
    );
}
2360
/// A freshly constructed client reports zero for the connection, message and error
/// counters (the heartbeat counters are not checked here).
#[test]
fn test_initial_stats_are_zero() {
    let client = WebSocketSignalingClient::new(make_config());
    let stats = client.get_stats();
    assert_eq!(stats.connections, 0);
    assert_eq!(stats.disconnections, 0);
    assert_eq!(stats.messages_sent, 0);
    assert_eq!(stats.messages_received, 0);
    assert_eq!(stats.errors, 0);
}
2371
/// The log-safe URL keeps `actor_id` and `key_id` but replaces the values of
/// `claims`, `signature` and `token` with `REDACTED`.
#[test]
fn test_signaling_url_log_redacts_credential_query_params() {
    let url = Url::parse(
        "wss://example.com/signaling?actor_id=abc&key_id=7&claims=claims-value&signature=signature-value&token=token-value",
    )
    .unwrap();

    let redacted = WebSocketSignalingClient::redact_signaling_url_for_log(&url);

    // Identifying parameters stay readable.
    assert!(redacted.contains("actor_id=abc"));
    assert!(redacted.contains("key_id=7"));
    // Credential parameters are masked...
    assert!(redacted.contains("claims=REDACTED"));
    assert!(redacted.contains("signature=REDACTED"));
    assert!(redacted.contains("token=REDACTED"));
    // ...and their original values appear nowhere in the output.
    assert!(!redacted.contains("claims-value"));
    assert!(!redacted.contains("signature-value"));
    assert!(!redacted.contains("token-value"));
}
2390
/// Calling `start_reconnect_manager` twice leaves `reconnector_started` set and does
/// not panic.
#[tokio::test]
async fn test_reconnect_manager_idempotent() {
    let client = make_ws_client(make_config());

    // First call flips the flag.
    client.start_reconnect_manager();
    assert!(
        client.reconnector_started.load(Ordering::Acquire),
        "reconnector_started should be true after first call"
    );

    // Second call must be harmless.
    client.start_reconnect_manager();
    assert!(client.reconnector_started.load(Ordering::Acquire));
}
2411
/// With `reconnect_config.enabled == false`, `start_reconnect_manager` does not set
/// `reconnector_started`.
#[tokio::test]
async fn test_reconnect_manager_disabled_when_config_disabled() {
    let mut config = make_config();
    config.reconnect_config.enabled = false;
    let client = make_ws_client(config);

    client.start_reconnect_manager();
    assert!(
        !client.reconnector_started.load(Ordering::Acquire),
        "reconnect manager should not start when reconnect config is disabled"
    );
}
2424
/// The reconnect manager must not hold a strong reference to the client: once the
/// only owner drops its `Arc`, the weak handle can no longer be upgraded.
#[tokio::test]
async fn test_reconnect_manager_does_not_keep_client_alive() {
    let client = make_ws_client(make_config());
    let weak = Arc::downgrade(&client);

    client.start_reconnect_manager();
    drop(client);

    assert!(
        weak.upgrade().is_none(),
        "reconnect manager must not keep signaling client alive after owner drop"
    );
}
2438
/// `connect()` on an already-connected client returns `Ok` without a network attempt
/// and leaves the `connecting` flag cleared.
#[tokio::test]
async fn test_connect_fast_path_when_already_connected() {
    let client = make_ws_client(make_config());
    // Simulate an established connection by setting the flag directly.
    client.connected.store(true, Ordering::Release);

    let result = client.connect().await;
    assert!(
        result.is_ok(),
        "connect() should return Ok when already connected"
    );
    assert!(!client.connecting.load(Ordering::Acquire));
}
2458
/// A failed `connect()` returns an error and releases the `connecting` flag.
#[tokio::test]
async fn test_connect_sets_connecting_flag() {
    let mut config = make_config();
    // Reconnect disabled and a 1-unit connection timeout keep the failing attempt short.
    config.reconnect_config.enabled = false;
    config.connection_timeout = 1;
    let client = make_ws_client(config);

    let result = client.connect().await;
    assert!(
        result.is_err(),
        "connecting to unreachable address should fail"
    );
    assert!(
        !client.connecting.load(Ordering::Acquire),
        "connecting flag should be cleared after connection failure"
    );
}
2477
2478 #[tokio::test]
2483 async fn test_event_subscribe_receives_events() {
2484 let client = make_ws_client(make_config());
2485 let mut rx = client.subscribe_events();
2486
2487 let _ = client.event_tx.send(SignalingEvent::Connected);
2489
2490 match tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()).await {
2491 Ok(Ok(SignalingEvent::Connected)) => {} other => panic!("expected Connected event, but got {:?}", other),
2493 }
2494 }
2495
2496 #[tokio::test]
2497 async fn test_disconnect_event_on_connect_failure() {
2498 let mut config = make_config();
2499 config.reconnect_config.enabled = false;
2500 config.connection_timeout = 1;
2501 let client = make_ws_client(config);
2502 let mut rx = client.subscribe_events();
2503
2504 let _ = client.connect().await;
2506
2507 match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()).await {
2509 Ok(Ok(SignalingEvent::Disconnected {
2510 reason: DisconnectReason::ConnectionFailed(_),
2511 })) => {} other => panic!(
2513 "expected Disconnected(ConnectionFailed) event, but got {:?}",
2514 other
2515 ),
2516 }
2517 }
2518
2519 #[tokio::test]
2524 async fn test_disconnect_clears_connected_flag() {
2525 let client = make_ws_client(make_config());
2526 client.connected.store(true, Ordering::Release);
2528 assert!(client.is_connected());
2529
2530 let result = client.disconnect().await;
2531 assert!(result.is_ok());
2532 assert!(
2533 !client.is_connected(),
2534 "should be Disconnected after disconnect()"
2535 );
2536 }
2537
2538 #[tokio::test]
2539 async fn test_disconnect_increments_disconnection_stat() {
2540 let client = make_ws_client(make_config());
2541 client.connected.store(true, Ordering::Release);
2542
2543 let stats_before = client.get_stats().disconnections;
2544 let _ = client.disconnect().await;
2545 let stats_after = client.get_stats().disconnections;
2546 assert_eq!(
2547 stats_after,
2548 stats_before + 1,
2549 "disconnect() should increment disconnection count"
2550 );
2551 }
2552
2553 #[tokio::test]
2554 async fn test_disconnect_idempotent() {
2555 let client = make_ws_client(make_config());
2556
2557 let r1 = client.disconnect().await;
2559 let r2 = client.disconnect().await;
2560 assert!(r1.is_ok());
2561 assert!(r2.is_ok());
2562 assert!(!client.is_connected());
2563 }
2564
2565 #[tokio::test]
2570 async fn test_reconnect_notify_wakes_waiter() {
2571 let notify = Arc::new(tokio::sync::Notify::new());
2572 let notify_clone = notify.clone();
2573 let woken = Arc::new(AtomicBool::new(false));
2574 let woken_clone = woken.clone();
2575
2576 let handle = tokio::spawn(async move {
2577 notify_clone.notified().await;
2578 woken_clone.store(true, Ordering::Release);
2579 });
2580
2581 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2583 assert!(
2584 !woken.load(Ordering::Acquire),
2585 "should not be woken before notification"
2586 );
2587
2588 notify.notify_one();
2590 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2591 assert!(
2592 woken.load(Ordering::Acquire),
2593 "should be woken after notification"
2594 );
2595
2596 handle.abort();
2597 }
2598
2599 #[tokio::test]
2600 async fn test_explicit_disconnect_suppresses_reconnect_cycle_in_backoff() {
2601 let mut config = make_config();
2602 config.connection_timeout = 1;
2603 config.reconnect_config = ReconnectConfig {
2604 enabled: true,
2605 max_attempts: 5,
2606 initial_delay: 1,
2607 max_delay: 1,
2608 backoff_multiplier: 1.0,
2609 };
2610 let client = make_ws_client(config);
2611 let mut rx = client.subscribe_events();
2612
2613 let reconnect_client = client.clone();
2614 let reconnect_task = tokio::spawn(async move {
2615 reconnect_client.run_reconnect_cycle().await;
2616 });
2617
2618 match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2619 Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2620 other => panic!("expected first reconnect attempt, got {other:?}"),
2621 }
2622
2623 client
2624 .disconnect()
2625 .await
2626 .expect("explicit disconnect should be idempotent");
2627
2628 tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2629 .await
2630 .expect("suppressed reconnect cycle should exit promptly")
2631 .expect("reconnect task should not panic");
2632
2633 while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
2634 {
2635 if let SignalingEvent::ConnectStart { attempt } = event {
2636 panic!("suppressed reconnect cycle sent unexpected attempt {attempt}");
2637 }
2638 }
2639
2640 assert!(
2641 client.auto_reconnect_suppressed.load(Ordering::Acquire),
2642 "explicit disconnect should suppress stale auto-reconnect cycles"
2643 );
2644 }
2645
2646 #[tokio::test]
2647 async fn test_explicit_disconnect_suppresses_in_flight_auto_reconnect_connected_event() {
2648 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2649 .await
2650 .expect("test listener should bind");
2651 let server_url = format!(
2652 "ws://{}/signaling/ws",
2653 listener
2654 .local_addr()
2655 .expect("test listener should have local addr")
2656 );
2657 let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
2658
2659 let server_task = tokio::spawn(async move {
2660 let (stream, _) = listener
2661 .accept()
2662 .await
2663 .expect("test server should accept tcp connection");
2664 let _ = release_rx.await;
2665 let ws_stream = tokio_tungstenite::accept_async(stream)
2666 .await
2667 .expect("test server should complete websocket handshake");
2668 tokio::time::sleep(Duration::from_millis(100)).await;
2669 drop(ws_stream);
2670 });
2671
2672 let mut config = make_config();
2673 config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
2674 config.connection_timeout = 5;
2675 config.reconnect_config = ReconnectConfig {
2676 enabled: true,
2677 max_attempts: 3,
2678 initial_delay: 1,
2679 max_delay: 1,
2680 backoff_multiplier: 1.0,
2681 };
2682 let client = make_ws_client(config);
2683 let mut rx = client.subscribe_events();
2684
2685 let reconnect_client = client.clone();
2686 let reconnect_task = tokio::spawn(async move {
2687 reconnect_client.run_reconnect_cycle().await;
2688 });
2689
2690 match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2691 Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2692 other => panic!("expected first reconnect attempt, got {other:?}"),
2693 }
2694
2695 client
2696 .disconnect()
2697 .await
2698 .expect("explicit disconnect should cancel the in-flight auto-reconnect");
2699 release_tx
2700 .send(())
2701 .expect("test server handshake should still be waiting");
2702
2703 tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2704 .await
2705 .expect("cancelled in-flight reconnect should exit promptly")
2706 .expect("reconnect task should not panic");
2707
2708 while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(150), rx.recv()).await
2709 {
2710 assert!(
2711 !matches!(event, SignalingEvent::Connected),
2712 "cancelled auto-reconnect must not publish Connected"
2713 );
2714 }
2715
2716 assert!(
2717 !client.is_connected(),
2718 "cancelled auto-reconnect must not leave signaling connected"
2719 );
2720
2721 tokio::time::timeout(Duration::from_secs(1), server_task)
2722 .await
2723 .expect("test server task should finish")
2724 .expect("test server task should not panic");
2725 }
2726
2727 #[tokio::test]
2732 async fn test_build_url_without_identity() {
2733 let config = make_config();
2734 let expected_base = config.server_url.to_string();
2735 let client = WebSocketSignalingClient::new(config);
2736
2737 let url = client.build_url_with_identity().await;
2738 assert_eq!(
2739 url.to_string(),
2740 expected_base,
2741 "URL should not contain identity parameters when actor_id is not set"
2742 );
2743 }
2744
2745 #[tokio::test]
2746 async fn test_build_url_with_webrtc_role() {
2747 let mut config = make_config();
2748 config.webrtc_role = Some("answer".to_string());
2749 let client = WebSocketSignalingClient::new(config);
2750
2751 let url = client.build_url_with_identity().await;
2752 assert!(
2753 url.query().unwrap_or("").contains("webrtc_role=answer"),
2754 "URL should contain webrtc_role parameter, actual URL: {}",
2755 url
2756 );
2757 }
2758
2759 #[tokio::test]
2764 async fn test_reset_inbound_channel_creates_fresh_channel() {
2765 let client = WebSocketSignalingClient::new(make_config());
2766
2767 {
2769 let tx = client.inbound_tx.lock().await;
2770 let _ = tx.send(SignalingEnvelope::default());
2771 }
2772
2773 client.reset_inbound_channel().await;
2775
2776 let mut rx = client.inbound_rx.lock().await;
2778 let result = rx.try_recv();
2779 assert!(
2780 result.is_err(),
2781 "old messages should not be visible in the new channel after reset"
2782 );
2783 }
2784
2785 #[tokio::test]
2790 async fn test_envelope_id_monotonically_increasing() {
2791 let client = WebSocketSignalingClient::new(make_config());
2792
2793 let id1 = client.next_envelope_id().await;
2794 let id2 = client.next_envelope_id().await;
2795 let id3 = client.next_envelope_id().await;
2796
2797 assert_eq!(id1, "env-1");
2798 assert_eq!(id2, "env-2");
2799 assert_eq!(id3, "env-3");
2800 }
2801
2802 #[tokio::test]
2807 async fn test_send_envelope_fails_when_not_connected() {
2808 let client = WebSocketSignalingClient::new(make_config());
2809 let envelope = SignalingEnvelope::default();
2810
2811 let result = client.send_envelope(envelope).await;
2812 assert!(
2813 result.is_err(),
2814 "send_envelope should return error when not connected"
2815 );
2816 match result {
2817 Err(NetworkError::ConnectionError(msg)) => {
2818 assert!(
2819 msg.contains("not connected") || msg.contains("Not connected"),
2820 "error message should contain 'not connected', actual: {}",
2821 msg
2822 );
2823 }
2824 other => panic!("expected ConnectionError, got {:?}", other),
2825 }
2826 }
2827
2828 #[tokio::test]
2833 async fn test_fake_client_tracks_connect_calls() {
2834 let client = make_fake_client();
2835 assert_eq!(client.connect_calls.load(UsizeOrdering::SeqCst), 0);
2836
2837 client.connect().await.unwrap();
2838 client.connect().await.unwrap();
2839 client.connect().await.unwrap();
2840
2841 assert_eq!(
2842 client.connect_calls.load(UsizeOrdering::SeqCst),
2843 3,
2844 "FakeSignalingClient should accurately track connect call count"
2845 );
2846 }
2847}