1#[cfg(feature = "opentelemetry")]
6use super::trace;
7use crate::lifecycle::CredentialState;
8use crate::transport::{NetworkError, NetworkResult};
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::extract_trace_context;
11use actr_protocol::prost::Message as ProstMessage;
12use actr_protocol::{
13 AIdCredential, ActrId, ActrToSignaling, CredentialUpdateRequest, GetSigningKeyRequest,
14 PeerToSignaling, Ping, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
15 RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterRequest,
16 UnregisterResponse, actr_to_signaling, peer_to_signaling, signaling_envelope,
17 signaling_to_actr,
18};
19use async_trait::async_trait;
20use base64::Engine as _;
21use futures_util::{SinkExt, StreamExt};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::{
27 Arc, OnceLock,
28 atomic::{AtomicBool, AtomicU64, Ordering},
29};
30use std::time::Duration;
31use tokio::net::TcpStream;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
34use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async_with_config};
35#[cfg(feature = "opentelemetry")]
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use url::Url;
38
/// Shared, lockable slot holding the write half of the signaling WebSocket.
///
/// The inner `Option` lets the slot exist without a socket installed.
type WsSink = Arc<
    tokio::sync::Mutex<
        Option<
            futures_util::stream::SplitSink<
                WebSocketStream<MaybeTlsStream<TcpStream>>,
                tokio_tungstenite::tungstenite::Message,
            >,
        >,
    >,
>;

/// Seconds to wait for a reply envelope after a request has been sent.
const RESPONSE_TIMEOUT_SECS: u64 = 5;
/// Seconds the ping task sleeps between WebSocket pings.
const PING_INTERVAL_SECS: u64 = 5;
/// Maximum age, in seconds, of the last recorded inbound activity before the
/// ping task declares the connection dead; also used as the probe timeout
/// before a reconnect cycle.
const PONG_TIMEOUT_SECS: u64 = 10;
/// Upper bound, in seconds, on a single ping send.
const SIGNALING_SEND_TIMEOUT_SECS: u64 = 5;
// NOTE(review): not referenced in this part of the file; presumably bounds how
// long a caller waits for a concurrent connect attempt — confirm at use site.
const CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS: u64 = 5;
/// Seconds to wait for each internal lock while disconnecting.
const DISCONNECT_LOCK_TIMEOUT_SECS: u64 = 5;
// NOTE(review): presumably bounds the socket close during disconnect — the use
// site is outside this part of the file; confirm.
const DISCONNECT_CLOSE_TIMEOUT_SECS: u64 = 1;
64
/// Configuration for a signaling client.
#[derive(Debug, Clone)]
pub struct SignalingConfig {
    /// Base URL of the signaling server; identity query parameters are
    /// appended to a copy of it when connecting.
    pub server_url: Url,

    /// Connect timeout in seconds. A value of `0` disables the timeout.
    pub connection_timeout: u64,

    /// Heartbeat interval.
    // NOTE(review): presumably seconds; not read in this part of the file — confirm.
    pub heartbeat_interval: u64,

    /// Retry/backoff policy used for connecting and auto-reconnecting.
    pub reconnect_config: ReconnectConfig,

    /// Optional authentication settings.
    pub auth_config: Option<AuthConfig>,

    /// When set, sent to the server as the `webrtc_role` query parameter.
    pub webrtc_role: Option<String>,
}
90
/// Retry/backoff policy for signaling connections.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Master switch: when `false`, no retries or automatic reconnects happen.
    pub enabled: bool,

    /// Number of retries handed to the backoff schedule.
    pub max_attempts: u32,

    /// First backoff delay, in seconds.
    pub initial_delay: u64,

    /// Largest backoff delay, in seconds.
    pub max_delay: u64,

    /// Growth factor between consecutive delays.
    pub backoff_multiplier: f64,
}

impl Default for ReconnectConfig {
    /// Reconnect enabled, 10 attempts, delays from 1s up to 60s, doubling.
    fn default() -> Self {
        const DEFAULT_MAX_ATTEMPTS: u32 = 10;
        const DEFAULT_INITIAL_DELAY_SECS: u64 = 1;
        const DEFAULT_MAX_DELAY_SECS: u64 = 60;
        const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0;

        ReconnectConfig {
            enabled: true,
            max_attempts: DEFAULT_MAX_ATTEMPTS,
            initial_delay: DEFAULT_INITIAL_DELAY_SECS,
            max_delay: DEFAULT_MAX_DELAY_SECS,
            backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER,
        }
    }
}
121
/// Authentication settings attached to a [`SignalingConfig`].
#[derive(Debug, Clone)]
pub struct AuthConfig {
    /// Which authentication scheme to use.
    pub auth_type: AuthType,

    /// Free-form credential key/value pairs.
    // NOTE(review): the keys expected per `AuthType` are not defined in this
    // part of the file — confirm against the consumer.
    pub credentials: HashMap<String, String>,
}
131
/// Authentication scheme selector for [`AuthConfig`].
// NOTE(review): how each variant is applied to a connection is not visible in
// this part of the file.
#[derive(Debug, Clone)]
pub enum AuthType {
    /// No authentication.
    None,
    /// Bearer-token authentication.
    BearerToken,
    /// API-key authentication.
    ApiKey,
    /// JWT authentication.
    Jwt,
}
144
/// Abstraction over a connection to the signaling server.
///
/// Implementors must be `Send + Sync`. Several methods have default bodies
/// that implementations may override.
#[async_trait]
pub trait SignalingClient: Send + Sync {
    /// Connects to the signaling server.
    async fn connect(&self) -> NetworkResult<()>;

    /// Performs a single connection attempt.
    ///
    /// The default implementation simply delegates to [`Self::connect`].
    async fn connect_once(&self) -> NetworkResult<()> {
        self.connect().await
    }

    /// Requests an automatic reconnect. The default implementation does nothing.
    fn schedule_auto_reconnect(&self) {}

    /// Requests an automatic reconnect with the backoff restarted.
    ///
    /// The default implementation delegates to
    /// [`Self::schedule_auto_reconnect`] and does not reset anything itself.
    fn schedule_auto_reconnect_reset_backoff(&self) {
        self.schedule_auto_reconnect();
    }

    /// Disconnects from the signaling server.
    async fn disconnect(&self) -> NetworkResult<()>;

    /// Checks whether the connection is alive.
    ///
    /// The default implementation ignores `_timeout` and only consults
    /// [`Self::is_connected`].
    ///
    /// # Errors
    /// The default implementation returns `NetworkError::ConnectionError`
    /// when the client reports it is not connected.
    async fn probe_alive(&self, _timeout: Duration) -> NetworkResult<()> {
        if self.is_connected() {
            Ok(())
        } else {
            Err(NetworkError::ConnectionError(
                "Signaling client is not connected".to_string(),
            ))
        }
    }

    /// Sends a registration request and returns the server's response.
    async fn send_register_request(
        &self,
        request: RegisterRequest,
    ) -> NetworkResult<RegisterResponse>;

    /// Sends an unregister request for `actor_id`, authenticated by
    /// `credential`, with an optional human-readable `reason`.
    async fn send_unregister_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        reason: Option<String>,
    ) -> NetworkResult<UnregisterResponse>;

    /// Sends a heartbeat carrying availability and load figures and returns
    /// the server's `Pong`.
    async fn send_heartbeat(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        availability: ServiceAvailabilityState,
        power_reserve: f32,
        mailbox_backlog: f32,
    ) -> NetworkResult<Pong>;

    /// Sends a route-candidates request and returns the server's response.
    async fn send_route_candidates_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        request: RouteCandidatesRequest,
    ) -> NetworkResult<RouteCandidatesResponse>;

    /// Fetches the signing key identified by `key_id`.
    // NOTE(review): the returned tuple is presumably (key id, key bytes) — confirm
    // against the implementation.
    async fn get_signing_key(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        key_id: u32,
    ) -> NetworkResult<(u32, Vec<u8>)>;

    /// Sends a credential update request; the reply has the same type as a
    /// registration response.
    async fn send_credential_update_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
    ) -> NetworkResult<RegisterResponse>;

    /// Sends a raw envelope without waiting for a reply.
    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;

    /// Receives the next inbound envelope, if any.
    // NOTE(review): the meaning of `Ok(None)` is implementation-defined here.
    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;

    /// Returns whether the client currently considers itself connected.
    fn is_connected(&self) -> bool;

    /// Returns a snapshot of the client's statistics.
    fn get_stats(&self) -> SignalingStats;
    /// Returns a new receiver for connection lifecycle events.
    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent>;

    /// Sets the actor identity used by the client.
    async fn set_actor_id(&self, actor_id: ActrId);
    /// Sets the credential state used by the client.
    async fn set_credential_state(&self, credential_state: CredentialState);

    /// Clears the stored identity.
    async fn clear_identity(&self);

    /// Installs a hook callback. The default implementation discards it.
    fn set_hook_callback(&self, _cb: HookCallback) {}
}
286
/// Coarse connection status of a signaling client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No active connection.
    Disconnected,
    /// A connection is established.
    Connected,
}
293
/// Connection lifecycle events broadcast to subscribers.
#[derive(Debug, Clone)]
pub enum SignalingEvent {
    /// A connection attempt is starting; `attempt` is the attempt number
    /// within the current retry cycle.
    ConnectStart { attempt: u32 },
    /// A connection was established.
    Connected,
    /// The connection transitioned from connected to disconnected.
    Disconnected { reason: DisconnectReason },
}
308
/// Why a signaling connection was lost.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// The receive loop ended (stream closed or a receive error occurred).
    StreamEnded,
    /// No inbound activity was observed within the pong timeout.
    PongTimeout,
    /// A ping could not be sent (send error, send timeout, or no socket).
    PingSendFailed,
    /// The credential expired.
    // NOTE(review): emitter is outside this part of the file.
    CredentialExpired,
    /// The disconnect was requested explicitly.
    // NOTE(review): emitter is outside this part of the file.
    Manual,
    /// Connecting failed; carries a description.
    // NOTE(review): emitter is outside this part of the file.
    ConnectionFailed(String),
}
325
/// Events delivered to the user-installed [`HookCallback`].
///
/// Only the `Signaling*` variants are raised in this part of the file; the
/// remaining variants are raised elsewhere.
#[derive(Clone, Debug)]
pub enum HookEvent {
    /// A signaling connection attempt is starting.
    SignalingConnectStart {
        /// Attempt number within the current connect cycle.
        attempt: u32,
    },
    /// The signaling connection was established.
    SignalingConnected,
    /// The signaling connection was lost.
    SignalingDisconnected,
    /// A WebRTC connection attempt to `peer_id` is starting.
    WebRtcConnectStart {
        peer_id: ActrId,
    },
    /// A WebRTC connection to `peer_id` was established.
    WebRtcConnected {
        peer_id: ActrId,
        // NOTE(review): presumably true when the path goes through a relay — confirm.
        relayed: bool,
    },
    /// The WebRTC connection to `peer_id` was lost.
    WebRtcDisconnected {
        peer_id: ActrId,
    },
    /// Delivery on a data stream could not be confirmed.
    DataStreamDeliveryUncertain {
        stream_id: String,
        session_id: u64,
        reason: String,
    },
    /// A WebSocket connection attempt to `peer_id` is starting.
    WebSocketConnectStart {
        peer_id: ActrId,
    },
    /// A WebSocket connection to `peer_id` was established.
    WebSocketConnected {
        peer_id: ActrId,
    },
    /// The WebSocket connection to `peer_id` was lost.
    WebSocketDisconnected {
        peer_id: ActrId,
    },
    /// The credential was renewed.
    CredentialRenewed {
        new_expiry: std::time::SystemTime,
    },
    /// The credential is about to expire.
    // NOTE(review): the field is named `new_expiry` here as well; confirm whether
    // it carries the upcoming expiry time of the current credential.
    CredentialExpiring {
        new_expiry: std::time::SystemTime,
    },
    /// The mailbox queue length is being reported against a threshold.
    MailboxBackpressure {
        queue_len: usize,
        threshold: usize,
    },
}
381
/// Shareable async callback invoked with each [`HookEvent`].
///
/// The callback returns a boxed `Send` future.
pub type HookCallback =
    Arc<dyn Fn(HookEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
388
/// Who initiated a connection attempt.
#[derive(Debug, Clone, Copy)]
enum ConnectIntent {
    /// Requested directly by a caller; never discarded for cancellation.
    Explicit,
    /// Started by the auto-reconnect cycle. `generation` is the reconnect
    /// generation observed when the cycle began; the attempt is discarded if
    /// auto-reconnect has been cancelled for that generation in the meantime.
    AutoReconnect { generation: u64 },
}
394
/// WebSocket-backed signaling client.
pub struct WebSocketSignalingClient {
    /// Client configuration.
    config: SignalingConfig,
    /// Actor identity; when set it is sent as the `actor_id` query parameter.
    actor_id: tokio::sync::Mutex<Option<ActrId>>,
    /// Credential source; when set its credential is sent as the `key_id`,
    /// `claims` and `signature` query parameters.
    credential_state: tokio::sync::Mutex<Option<CredentialState>>,
    /// Write half of the current socket, if any.
    ws_sink: WsSink,
    /// Read half of the current socket, parked here until the receiver task
    /// takes ownership of it.
    ws_stream: tokio::sync::Mutex<
        Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
    >,
    /// Whether the client currently considers itself connected.
    connected: Arc<AtomicBool>,
    // NOTE(review): presumably marks an in-flight connect attempt; not read in
    // this part of the file — confirm.
    connecting: Arc<AtomicBool>,
    /// Counters for connections, disconnections, messages and errors.
    stats: Arc<AtomicSignalingStats>,
    /// Incrementing counter used to build envelope ids (`env-<n>`).
    envelope_counter: tokio::sync::Mutex<u64>,
    /// Reply waiters keyed by the envelope id of the outstanding request.
    pending_replies: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<SignalingEnvelope>>>>,
    /// Pong waiters keyed by the pong payload.
    pending_pongs: Arc<tokio::sync::Mutex<HashMap<Vec<u8>, oneshot::Sender<()>>>>,
    // NOTE(review): presumably generates unique probe payloads; not used in this
    // part of the file — confirm.
    probe_counter: AtomicU64,
    /// Receiving end for inbound envelopes that did not match a reply waiter.
    inbound_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<SignalingEnvelope>>>,
    /// Sending end of the inbound channel; cloned into the receiver task.
    inbound_tx: tokio::sync::Mutex<mpsc::UnboundedSender<SignalingEnvelope>>,
    /// Handle of the spawned receive loop, if running.
    receiver_task: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
    /// Handle of the spawned ping loop, if running.
    ping_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Broadcast sender for [`SignalingEvent`]s.
    event_tx: broadcast::Sender<SignalingEvent>,
    /// Timestamp (from `current_unix_secs()`) of the last inbound binary,
    /// ping or pong frame, or of the last successful connect.
    last_pong: Arc<AtomicU64>,
    /// Ensures the reconnect manager task is spawned at most once.
    reconnector_started: Arc<AtomicBool>,
    /// Wakes the reconnect manager and interrupts backoff/cooldown sleeps.
    reconnect_notify: Arc<tokio::sync::Notify>,
    /// Set by a suppressing disconnect; cleared when a connection is installed.
    auto_reconnect_suppressed: AtomicBool,
    /// Bumped by a suppressing disconnect so in-flight auto-reconnect cycles
    /// can detect that they were cancelled.
    reconnect_generation: AtomicU64,
    /// When this value changes, retry loops restart their backoff schedule.
    reconnect_backoff_reset_generation: AtomicU64,
    /// User hook callback; can be set at most once.
    hook_callback: OnceLock<HookCallback>,
}
444
445impl WebSocketSignalingClient {
446 pub fn new(config: SignalingConfig) -> Self {
448 let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
449 let (event_tx, _event_rx) = broadcast::channel(64);
450 Self {
451 config,
452 actor_id: tokio::sync::Mutex::new(None),
453 credential_state: tokio::sync::Mutex::new(None),
454 ws_sink: Arc::new(tokio::sync::Mutex::new(None)),
455 ws_stream: tokio::sync::Mutex::new(None),
456 connected: Arc::new(AtomicBool::new(false)),
457 connecting: Arc::new(AtomicBool::new(false)),
458 stats: Arc::new(AtomicSignalingStats::default()),
459 envelope_counter: tokio::sync::Mutex::new(0),
460 pending_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
461 pending_pongs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
462 probe_counter: AtomicU64::new(0),
463 inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
464 inbound_tx: tokio::sync::Mutex::new(inbound_tx),
465 receiver_task: Arc::new(tokio::sync::Mutex::new(None)),
466 ping_task: tokio::sync::Mutex::new(None),
467 event_tx,
468 last_pong: Arc::new(AtomicU64::new(0)),
469 reconnector_started: Arc::new(AtomicBool::new(false)),
470 reconnect_notify: Arc::new(tokio::sync::Notify::new()),
471 auto_reconnect_suppressed: AtomicBool::new(false),
472 reconnect_generation: AtomicU64::new(0),
473 reconnect_backoff_reset_generation: AtomicU64::new(0),
474 hook_callback: OnceLock::new(),
475 }
476 }
477
478 async fn invoke_hook(&self, event: HookEvent) {
485 if let Some(cb) = self.hook_callback.get() {
486 cb(event).await;
487 }
488 }
489
490 async fn publish_disconnected_transition(
491 was_connected: bool,
492 stats: &Arc<AtomicSignalingStats>,
493 event_tx: &broadcast::Sender<SignalingEvent>,
494 hook_callback: Option<HookCallback>,
495 reason: DisconnectReason,
496 reconnect_notify: Option<&Arc<tokio::sync::Notify>>,
497 ) -> bool {
498 if !was_connected {
499 return false;
500 }
501
502 stats.disconnections.fetch_add(1, Ordering::Relaxed);
503
504 if let Some(cb) = hook_callback {
505 cb(HookEvent::SignalingDisconnected).await;
506 }
507
508 let _ = event_tx.send(SignalingEvent::Disconnected { reason });
509
510 if let Some(notify) = reconnect_notify {
511 notify.notify_one();
512 }
513
514 true
515 }
516
517 pub fn start_reconnect_manager(self: &Arc<Self>) {
518 if !self.config.reconnect_config.enabled {
519 return;
520 }
521 if self
522 .reconnector_started
523 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
524 .is_err()
525 {
526 return; }
528
529 tracing::info!("🔄 Starting reconnect manager for signaling client");
530
531 let client = Arc::downgrade(self);
532 let notify = self.reconnect_notify.clone();
533
534 tokio::spawn(async move {
535 loop {
536 let reconnect_requested = tokio::select! {
537 _ = notify.notified() => true,
538 _ = tokio::time::sleep(Duration::from_secs(30)) => false,
539 };
540
541 if !reconnect_requested && client.upgrade().is_none() {
542 break;
543 }
544 if !reconnect_requested {
545 continue;
546 }
547
548 let Some(client) = client.upgrade() else {
549 break;
550 };
551
552 if !client.config.reconnect_config.enabled {
553 break;
554 }
555
556 if Arc::strong_count(&client) <= 1 {
557 break;
558 }
559
560 client.run_reconnect_cycle().await;
562 }
563 });
564 }
565
/// Runs one auto-reconnect cycle: probe, then retry with backoff.
///
/// The reconnect generation is captured on entry; the cycle stops as soon
/// as auto-reconnect is cancelled for that generation, the client becomes
/// connected, or this call holds the only strong reference to the client.
///
/// If the client still believes it is connected, the connection is probed
/// first. A successful probe ends the cycle; a failed probe triggers a
/// non-suppressing internal disconnect before retries begin.
///
/// Each backoff item triggers one connect attempt, followed on failure by a
/// sleep for that item's delay. The sleep can be interrupted through
/// `reconnect_notify`. When the backoff-reset generation changes, the whole
/// schedule restarts. Once the schedule is exhausted the cycle cools down
/// for twice the configured max delay and then returns.
async fn run_reconnect_cycle(self: &Arc<Self>) {
    use actr_framework::ExponentialBackoff;

    let cfg = &self.config.reconnect_config;
    // Snapshot used by every cancellation check below.
    let generation = self.reconnect_generation.load(Ordering::Acquire);

    // `self` is the only strong reference left: nobody owns the client anymore.
    if Arc::strong_count(self) <= 1 {
        tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
        return;
    }

    if self.auto_reconnect_cancelled(generation) {
        tracing::debug!("Skipping signaling auto-reconnect cycle after explicit disconnect");
        return;
    }

    // The connected flag may be stale, so verify with a probe before deciding
    // whether a reconnect is needed at all.
    if self.connected.load(Ordering::Acquire) {
        tracing::debug!("🔎 Probing connected signaling before reconnect cycle");
        match self
            .probe_alive(Duration::from_secs(PONG_TIMEOUT_SECS))
            .await
        {
            Ok(()) => {
                tracing::debug!("Signaling probe succeeded, skipping reconnect cycle");
                return;
            }
            Err(e) => {
                tracing::warn!("Signaling probe failed before reconnect: {e}");
                // `false`: tear down without suppressing auto-reconnect.
                if let Err(disconnect_err) = self.disconnect_internal(false).await {
                    tracing::warn!(
                        "⚠️ Disconnect cleanup failed after failed probe (non-fatal): {disconnect_err}"
                    );
                }
            }
        }
    }

    'cycle: loop {
        // Snapshot so a later change can be detected and the backoff restarted.
        let backoff_reset_generation = self
            .reconnect_backoff_reset_generation
            .load(Ordering::Acquire);
        // Delays are clamped to at least one second.
        let backoff = ExponentialBackoff::builder()
            .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
            .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
            .max_retries(cfg.max_attempts)
            .with_jitter()
            .build();

        let mut attempt: u32 = 0;

        for delay in backoff {
            if Arc::strong_count(self) <= 1 {
                tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
                return;
            }

            if self.auto_reconnect_cancelled(generation) {
                tracing::debug!(
                    "Stopping signaling auto-reconnect cycle after explicit disconnect"
                );
                return;
            }

            // Someone else (e.g. an explicit connect) may have reconnected meanwhile.
            if self.connected.load(Ordering::Acquire) {
                tracing::debug!("Already connected, aborting reconnect cycle");
                return;
            }

            attempt += 1;
            // Send errors (no subscribers) are ignored.
            let _ = self.event_tx.send(SignalingEvent::ConnectStart { attempt });

            match self.connect_once_for_auto_reconnect(generation).await {
                Ok(()) => {
                    tracing::info!("✅ Signaling reconnect succeeded on attempt {attempt}");
                    return;
                }
                Err(e) => {
                    // A failure caused by cancellation is not worth a warning.
                    if self.auto_reconnect_cancelled(generation) {
                        tracing::debug!(
                            "Stopping signaling auto-reconnect cycle after explicit disconnect"
                        );
                        return;
                    }

                    tracing::warn!(
                        "❌ Reconnect attempt {attempt} failed: {e}, retrying in {delay:?}"
                    );
                    // Sleep for the backoff delay unless a notification cuts it short.
                    tokio::select! {
                        _ = tokio::time::sleep(delay) => {}
                        _ = self.reconnect_notify.notified() => {
                            tracing::debug!("Explicit reconnect request interrupted reconnect backoff");
                        }
                    }
                    // A reset request restarts the schedule from the initial delay.
                    if self
                        .reconnect_backoff_reset_generation
                        .load(Ordering::Acquire)
                        != backoff_reset_generation
                    {
                        tracing::debug!(
                            "Restarting signaling reconnect backoff after external recovery event"
                        );
                        continue 'cycle;
                    }
                    if Arc::strong_count(self) <= 1 {
                        tracing::debug!(
                            "Stopping signaling auto-reconnect cycle after owner drop"
                        );
                        return;
                    }
                    if self.auto_reconnect_cancelled(generation) {
                        tracing::debug!(
                            "Stopping signaling auto-reconnect cycle after explicit disconnect"
                        );
                        return;
                    }
                }
            }
        }

        // The backoff schedule is exhausted without a successful connect.
        tracing::error!("Reconnect failed after {attempt} attempts, entering cooldown");
        // Cool down for twice the (clamped) max delay; a notification ends it early.
        let cooldown = std::time::Duration::from_secs(cfg.max_delay.max(1) * 2);
        tokio::select! {
            _ = tokio::time::sleep(cooldown) => {}
            _ = self.reconnect_notify.notified() => {
                tracing::debug!("Explicit reconnect request interrupted reconnect cooldown");
            }
        }
        if self
            .reconnect_backoff_reset_generation
            .load(Ordering::Acquire)
            != backoff_reset_generation
        {
            tracing::debug!(
                "Restarting signaling reconnect backoff after external recovery event"
            );
            continue 'cycle;
        }
        if self.auto_reconnect_cancelled(generation) {
            tracing::debug!(
                "Signaling auto-reconnect cooldown ended after explicit disconnect suppression"
            );
        }
        // The cycle ends here either way; the cancellation check above only logs.
        return;
    }
}
714
/// Test helper: builds a client for `url`, starts its reconnect manager and
/// connects.
///
/// Uses a 5s connection timeout, a 30 heartbeat interval, the default
/// reconnect policy, and no auth or WebRTC role.
///
/// # Errors
/// Fails when `url` cannot be parsed or when connecting fails.
#[cfg(feature = "test-utils")]
pub async fn connect_to(url: &str) -> NetworkResult<Arc<Self>> {
    let server_url: Url = url.parse()?;

    let client = Arc::new(Self::new(SignalingConfig {
        server_url,
        connection_timeout: 5,
        heartbeat_interval: 30,
        reconnect_config: ReconnectConfig::default(),
        auth_config: None,
        webrtc_role: None,
    }));

    client.start_reconnect_manager();
    client.connect().await?;

    Ok(client)
}
738
/// Test helper: like a plain connect to `url`, but installs `actor_id` and
/// `credential_state` on the client before connecting.
///
/// Uses a 5s connection timeout, a 30 heartbeat interval, the default
/// reconnect policy, and no auth or WebRTC role.
///
/// # Errors
/// Fails when `url` cannot be parsed or when connecting fails.
#[cfg(feature = "test-utils")]
pub async fn connect_to_with_identity(
    url: &str,
    actor_id: ActrId,
    credential_state: CredentialState,
) -> NetworkResult<Arc<Self>> {
    let server_url: Url = url.parse()?;

    let client = Arc::new(Self::new(SignalingConfig {
        server_url,
        connection_timeout: 5,
        heartbeat_interval: 30,
        reconnect_config: ReconnectConfig::default(),
        auth_config: None,
        webrtc_role: None,
    }));

    // Identity must be in place before the connect URL is built.
    client.set_actor_id(actor_id).await;
    client.set_credential_state(credential_state).await;

    client.start_reconnect_manager();
    client.connect().await?;

    Ok(client)
}
766
767 async fn next_envelope_id(&self) -> String {
769 let mut counter = self.envelope_counter.lock().await;
770 *counter += 1;
771 format!("env-{}", *counter)
772 }
773
774 async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
776 SignalingEnvelope {
777 envelope_version: 1,
778 envelope_id: self.next_envelope_id().await,
779 reply_for: None,
780 timestamp: prost_types::Timestamp {
781 seconds: chrono::Utc::now().timestamp(),
782 nanos: 0,
783 },
784 traceparent: None,
785 tracestate: None,
786 flow: Some(flow),
787 }
788 }
789
790 async fn reset_inbound_channel(&self) {
792 self.drop_pending_replies("inbound channel reset").await;
793 self.drop_pending_pongs("inbound channel reset").await;
794
795 let (tx, rx) = mpsc::unbounded_channel();
796 *self.inbound_tx.lock().await = tx;
797 *self.inbound_rx.lock().await = rx;
798 }
799
800 async fn drop_pending_replies(&self, reason: &'static str) {
801 let dropped = {
802 let mut pending = self.pending_replies.lock().await;
803 let dropped = pending.len();
804 pending.clear();
805 dropped
806 };
807
808 if dropped > 0 {
809 tracing::debug!(reason, dropped, "Dropping pending signaling reply waiters");
810 }
811 }
812
813 async fn drop_pending_pongs(&self, reason: &'static str) {
814 let dropped = {
815 let mut pending = self.pending_pongs.lock().await;
816 let dropped = pending.len();
817 pending.clear();
818 dropped
819 };
820
821 if dropped > 0 {
822 tracing::debug!(reason, dropped, "Dropping pending signaling pong waiters");
823 }
824 }
825
826 async fn build_url_with_identity(&self) -> Url {
831 let mut url = self.config.server_url.clone();
832 let actor_id_opt = self.actor_id.lock().await.clone();
833 if let Some(actor_id) = actor_id_opt {
834 let actor_str = actr_protocol::ActrId::to_string_repr(&actor_id);
835 url.query_pairs_mut().append_pair("actor_id", &actor_str);
836 }
837
838 let cred_state_opt = self.credential_state.lock().await.clone();
840 if let Some(cred_state) = cred_state_opt {
841 let cred = cred_state.credential().await;
842 let claims_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.claims);
843 let sig_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.signature);
844 url.query_pairs_mut()
845 .append_pair("key_id", &cred.key_id.to_string())
846 .append_pair("claims", &claims_b64)
847 .append_pair("signature", &sig_b64);
848 }
849
850 if let Some(role) = &self.config.webrtc_role {
852 url.query_pairs_mut().append_pair("webrtc_role", role);
853 }
854
855 url
856 }
857
858 fn redact_signaling_url_for_log(url: &Url) -> String {
859 let mut redacted = url.clone();
860 let pairs: Vec<(String, String)> = redacted
861 .query_pairs()
862 .map(|(key, value)| {
863 let redacted_value = match key.to_ascii_lowercase().as_str() {
864 "claims" | "signature" | "token" | "authorization" | "bearer"
865 | "access_token" | "api_key" => "REDACTED".to_string(),
866 _ => value.into_owned(),
867 };
868 (key.into_owned(), redacted_value)
869 })
870 .collect();
871
872 redacted.set_query(None);
873 if !pairs.is_empty() {
874 let mut query = redacted.query_pairs_mut();
875 for (key, value) in pairs {
876 query.append_pair(&key, &value);
877 }
878 }
879
880 redacted.to_string()
881 }
882
883 fn auto_reconnect_cancelled(&self, generation: u64) -> bool {
884 self.auto_reconnect_suppressed.load(Ordering::Acquire)
885 || self.reconnect_generation.load(Ordering::Acquire) != generation
886 }
887
888 async fn establish_connection_once(&self) -> NetworkResult<()> {
892 self.establish_connection_once_with_intent(ConnectIntent::Explicit)
893 .await
894 }
895
896 async fn establish_connection_once_for_auto_reconnect(
897 &self,
898 generation: u64,
899 ) -> NetworkResult<()> {
900 self.establish_connection_once_with_intent(ConnectIntent::AutoReconnect { generation })
901 .await
902 }
903
/// Opens one WebSocket connection and installs it as the current socket.
///
/// Returns `Ok(())` immediately if the client is already connected.
/// Otherwise it connects to the identity-bearing URL, bounded by the
/// configured connection timeout (`0` means no timeout), and splits the
/// socket into sink and stream halves.
///
/// For an auto-reconnect attempt whose generation was cancelled while the
/// connect was in flight, the new socket is closed and an error is
/// returned instead of installing it.
///
/// On success the halves are stored, the connected flag is set,
/// auto-reconnect suppression is cleared, `last_pong` is refreshed, the
/// `SignalingConnected` hook and `Connected` event are published, and the
/// connection counter is bumped.
///
/// # Errors
/// `NetworkError::ConnectionError` on connect timeout or cancelled
/// auto-reconnect; any connect error is propagated through `?`.
async fn establish_connection_once_with_intent(
    &self,
    intent: ConnectIntent,
) -> NetworkResult<()> {
    if self.connected.load(Ordering::Acquire) {
        tracing::debug!("Connection already established, skipping establish_connection_once()");
        return Ok(());
    }

    let url = self.build_url_with_identity().await;
    let timeout_secs = self.config.connection_timeout;
    // The URL carries credentials, so only a redacted form is logged.
    tracing::debug!(
        "Establishing connection to URL: {}",
        Self::redact_signaling_url_for_log(&url)
    );
    // NOTE(review): per tungstenite's docs a write buffer size of 0 makes each
    // message get written to the stream eagerly — confirm for the pinned version.
    let config = WebSocketConfig::default().write_buffer_size(0);
    // NOTE(review): the trailing `false` is presumably the `disable_nagle`
    // argument — confirm against the tokio-tungstenite signature.
    let connect_result = if timeout_secs == 0 {
        // No timeout configured: wait for the connect to finish on its own.
        connect_async_with_config(url.as_str(), Some(config), false).await
    } else {
        let timeout_duration = std::time::Duration::from_secs(timeout_secs);
        tokio::time::timeout(
            timeout_duration,
            connect_async_with_config(url.as_str(), Some(config), false),
        )
        .await
        .map_err(|_| {
            NetworkError::ConnectionError(format!(
                "Signaling connect timeout after {}s",
                timeout_secs
            ))
        })?
    }?;

    // The handshake response is not needed.
    let (ws_stream, _) = connect_result;

    let (sink, stream) = ws_stream.split();

    // An explicit disconnect may have happened while the connect was in flight;
    // in that case the fresh socket must not be installed.
    if let ConnectIntent::AutoReconnect { generation } = intent
        && self.auto_reconnect_cancelled(generation)
    {
        tracing::debug!(
            generation,
            "Discarding completed signaling auto-reconnect after explicit disconnect"
        );
        let mut sink = sink;
        if let Err(e) = sink.close().await {
            tracing::warn!(
                "Signaling auto-reconnect socket close failed after cancellation: {}",
                e
            );
        }
        return Err(NetworkError::ConnectionError(
            "Signaling auto-reconnect was cancelled by explicit disconnect".to_string(),
        ));
    }

    // Install both halves before flipping the connected flag.
    *self.ws_sink.lock().await = Some(sink);
    *self.ws_stream.lock().await = Some(stream);
    self.connected.store(true, Ordering::Release);
    self.auto_reconnect_suppressed
        .store(false, Ordering::Release);
    // Start the pong-timeout clock from now.
    self.last_pong.store(current_unix_secs(), Ordering::Release);
    self.invoke_hook(HookEvent::SignalingConnected).await;
    // Send errors (no subscribers) are ignored.
    let _ = self.event_tx.send(SignalingEvent::Connected);

    self.stats.connections.fetch_add(1, Ordering::Relaxed);

    Ok(())
}
978
/// Connects, retrying with exponential backoff when reconnect is enabled.
///
/// With reconnect disabled this is a single `connect_once()` call.
/// Otherwise the first attempt runs immediately and later attempts wait
/// for the corresponding backoff delay, which can be interrupted through
/// `reconnect_notify`. The `SignalingConnectStart` hook fires for every
/// attempt, before its delay. When the backoff-reset generation changes,
/// the schedule restarts from the beginning.
///
/// # Errors
/// Returns the last connect error once the schedule is exhausted.
async fn connect_with_retries(&self) -> NetworkResult<()> {
    use actr_framework::ExponentialBackoff;

    let cfg = &self.config.reconnect_config;

    if !cfg.enabled {
        return self.connect_once().await;
    }

    let mut last_err = None;

    'cycle: loop {
        // Snapshot so a later change can be detected and the backoff restarted.
        let backoff_reset_generation = self
            .reconnect_backoff_reset_generation
            .load(Ordering::Acquire);
        // Delays are clamped to at least one second.
        let backoff = ExponentialBackoff::builder()
            .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
            .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
            .max_retries(cfg.max_attempts)
            .with_jitter()
            .build();

        // A leading zero delay makes the first attempt run immediately.
        for (attempt, delay) in std::iter::once(std::time::Duration::ZERO)
            .chain(backoff)
            .enumerate()
        {
            // Attempts are reported 1-based.
            let attempt = attempt as u32 + 1;
            self.invoke_hook(HookEvent::SignalingConnectStart { attempt })
                .await;
            if delay > std::time::Duration::ZERO {
                tracing::info!("Retry signaling connect after {delay:?} (attempt {attempt})");
                tokio::select! {
                    _ = tokio::time::sleep(delay) => {}
                    _ = self.reconnect_notify.notified() => {
                        tracing::debug!("Explicit reconnect request interrupted signaling connect backoff");
                    }
                }
                if self
                    .reconnect_backoff_reset_generation
                    .load(Ordering::Acquire)
                    != backoff_reset_generation
                {
                    tracing::debug!(
                        "Restarting explicit signaling connect backoff after external recovery event"
                    );
                    continue 'cycle;
                }
            }

            match self.connect_once().await {
                Ok(()) => return Ok(()),
                Err(e) => {
                    tracing::warn!("Signaling connect attempt {attempt} failed: {e:?}");
                    last_err = Some(e);
                    if self
                        .reconnect_backoff_reset_generation
                        .load(Ordering::Acquire)
                        != backoff_reset_generation
                    {
                        tracing::debug!(
                            "Restarting explicit signaling connect backoff after external recovery event"
                        );
                        continue 'cycle;
                    }
                }
            }
        }

        // Initial immediate attempt plus `max_attempts` retries.
        let total = cfg.max_attempts + 1;
        tracing::error!("Signaling connect failed after {total} attempts, giving up");
        return Err(last_err.unwrap_or_else(|| {
            NetworkError::ConnectionError("All connection attempts failed".to_string())
        }));
    }
}
1057
1058 #[cfg_attr(
1060 feature = "opentelemetry",
1061 tracing::instrument(skip_all, fields(envelope_id = %envelope.envelope_id))
1062 )]
1063 async fn send_envelope_and_wait_response(
1064 &self,
1065 envelope: SignalingEnvelope,
1066 ) -> NetworkResult<SignalingEnvelope> {
1067 let reply_for = envelope.envelope_id.clone();
1068
1069 let (tx, rx) = oneshot::channel();
1071 self.pending_replies
1072 .lock()
1073 .await
1074 .insert(reply_for.clone(), tx);
1075
1076 if let Err(e) = self.send_envelope(envelope).await {
1077 self.pending_replies.lock().await.remove(&reply_for);
1079 return Err(e);
1080 }
1081
1082 let result =
1083 tokio::time::timeout(std::time::Duration::from_secs(RESPONSE_TIMEOUT_SECS), rx).await;
1084 if result.is_err() {
1086 self.pending_replies.lock().await.remove(&reply_for);
1087 }
1088
1089 let response_envelope = result
1090 .map_err(|_| {
1091 NetworkError::ConnectionError(
1092 "Timed out waiting for signaling response".to_string(),
1093 )
1094 })?
1095 .map_err(|_| {
1096 NetworkError::ConnectionError(
1097 "Receiver dropped while waiting for signaling response".to_string(),
1098 )
1099 })?;
1100
1101 Ok(response_envelope)
1102 }
1103
1104 async fn start_receiver(&self) {
1106 let mut stream_guard = self.ws_stream.lock().await;
1107 if stream_guard.is_none() {
1108 return;
1109 }
1110
1111 let mut stream = stream_guard.take().expect("stream exists");
1112 let pending = self.pending_replies.clone();
1113 let inbound_tx = { self.inbound_tx.lock().await.clone() };
1114 let stats = self.stats.clone();
1115 let connected = self.connected.clone();
1116 let event_tx = self.event_tx.clone();
1117 let last_pong = self.last_pong.clone();
1118 let pending_pongs = self.pending_pongs.clone();
1119 let reconnect_notify = self.reconnect_notify.clone();
1120 let reconnect_enabled = self.config.reconnect_config.enabled;
1121 let hook_callback = self.hook_callback.get().cloned();
1122 let handle = tokio::spawn(async move {
1123 while let Some(msg) = stream.next().await {
1124 match msg {
1125 Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
1126 last_pong.store(current_unix_secs(), Ordering::Release);
1128 match SignalingEnvelope::decode(&data[..]) {
1129 Ok(envelope) => {
1130 #[cfg(feature = "opentelemetry")]
1131 let span = {
1132 let span = tracing::info_span!("signaling.receive_envelope", envelope_id = %envelope.envelope_id);
1133 span.set_parent(extract_trace_context(&envelope));
1134 span
1135 };
1136
1137 stats.messages_received.fetch_add(1, Ordering::Relaxed);
1138 tracing::debug!("Received message: {:?}", envelope);
1139 if let Some(reply_for) = envelope.reply_for.clone() {
1140 if let Some(sender) = pending.lock().await.remove(&reply_for) {
1141 #[cfg(feature = "opentelemetry")]
1142 let _ = span.enter();
1143 if let Err(e) = sender.send(envelope) {
1144 stats.errors.fetch_add(1, Ordering::Relaxed);
1145 tracing::warn!(
1146 "Failed to send reply envelope to waiter: {e:?}",
1147 );
1148 }
1149 continue;
1150 }
1151 }
1152 tracing::debug!(
1153 "Unmatched or push message -> forward to inbound channel"
1154 );
1155 if let Err(e) = inbound_tx.send(envelope) {
1157 stats.errors.fetch_add(1, Ordering::Relaxed);
1158 tracing::warn!(
1159 "Failed to send envelope to inbound channel: {e:?}"
1160 );
1161 }
1162 }
1163 Err(e) => {
1164 stats.errors.fetch_add(1, Ordering::Relaxed);
1165 tracing::warn!("Failed to decode SignalingEnvelope: {e}");
1166 }
1167 }
1168 }
1169 Ok(tokio_tungstenite::tungstenite::Message::Pong(payload)) => {
1170 tracing::debug!("Received pong");
1171 last_pong.store(current_unix_secs(), Ordering::Release);
1172 if let Some(sender) = pending_pongs.lock().await.remove(&payload.to_vec()) {
1173 let _ = sender.send(());
1174 }
1175 }
1176 Ok(tokio_tungstenite::tungstenite::Message::Ping(_)) => {
1177 tracing::debug!("Received ping");
1178 last_pong.store(current_unix_secs(), Ordering::Release);
1179 }
1180 Ok(other) => {
1181 tracing::warn!("Received non-binary frame, ignoring: {other:?}");
1182 }
1183 Err(e) => {
1184 stats.errors.fetch_add(1, Ordering::Relaxed);
1185 tracing::error!("Signaling receive error: {e}");
1186 break;
1187 }
1188 }
1189 }
1190
1191 tracing::warn!("Stream terminated");
1192 let was_connected = connected.swap(false, Ordering::AcqRel);
1196 Self::publish_disconnected_transition(
1197 was_connected,
1198 &stats,
1199 &event_tx,
1200 hook_callback,
1201 DisconnectReason::StreamEnded,
1202 reconnect_enabled.then_some(&reconnect_notify),
1203 )
1204 .await;
1205 pending_pongs.lock().await.clear();
1206 });
1207
1208 *self.receiver_task.lock().await = Some(handle);
1209 }
1210
/// Spawns the periodic ping / liveness task unless one is already running.
///
/// A finished task handle left in `ping_task` is discarded and replaced.
/// Every `PING_INTERVAL_SECS` the spawned task:
///
/// 1. exits if the client is no longer marked connected;
/// 2. sends an empty WebSocket ping, bounded by
///    `SIGNALING_SEND_TIMEOUT_SECS`. A send error, a send timeout or a
///    missing sink publishes a `PingSendFailed` disconnect and ends the
///    task;
/// 3. checks `last_pong`. If it is older than `PONG_TIMEOUT_SECS`, the
///    receiver task is aborted, a `PongTimeout` disconnect is published
///    and the task ends.
async fn start_ping_task(&self) {
    // The guard is held until the new handle is stored at the end.
    let mut existing = self.ping_task.lock().await;
    if let Some(handle) = existing.as_ref() {
        if handle.is_finished() {
            // Stale handle from a previous connection: replace it below.
            existing.take();
        } else {
            return;
        }
    }

    let sink = self.ws_sink.clone();
    let connected = self.connected.clone();
    let stats = self.stats.clone();
    let event_tx = self.event_tx.clone();
    let last_pong = self.last_pong.clone();
    let receiver_task_clone = Arc::clone(&self.receiver_task);
    let reconnect_notify = self.reconnect_notify.clone();
    let reconnect_enabled = self.config.reconnect_config.enabled;
    let hook_callback = self.hook_callback.get().cloned();

    let handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await;

            if !connected.load(Ordering::Acquire) {
                break;
            }

            let mut disconnect_reason = None;
            // Scope the sink lock so it is released before any disconnect handling.
            {
                let mut sink_guard = sink.lock().await;
                if let Some(sink) = sink_guard.as_mut() {
                    match tokio::time::timeout(
                        std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
                        sink.send(tokio_tungstenite::tungstenite::Message::Ping(
                            Vec::new().into(),
                        )),
                    )
                    .await
                    {
                        Ok(Ok(())) => {}
                        Ok(Err(e)) => {
                            tracing::warn!("Signaling ping send failed: {e}");
                            disconnect_reason = Some(DisconnectReason::PingSendFailed);
                        }
                        Err(_) => {
                            tracing::warn!("Signaling ping send timed out");
                            disconnect_reason = Some(DisconnectReason::PingSendFailed);
                        }
                    }
                } else {
                    // Marked connected but no sink is installed.
                    tracing::warn!("Signaling not connected");
                    disconnect_reason = Some(DisconnectReason::PingSendFailed);
                }
            }

            if let Some(reason) = disconnect_reason {
                // Swap so the transition is published at most once.
                let was_connected = connected.swap(false, Ordering::AcqRel);
                Self::publish_disconnected_transition(
                    was_connected,
                    &stats,
                    &event_tx,
                    hook_callback.clone(),
                    reason,
                    reconnect_enabled.then_some(&reconnect_notify),
                )
                .await;
                break;
            }

            let now = current_unix_secs();
            let last = last_pong.load(Ordering::Acquire);
            // `saturating_sub` guards against `last` being ahead of `now`.
            if now.saturating_sub(last) > PONG_TIMEOUT_SECS {
                tracing::warn!(
                    "Signaling pong timeout (last seen {}s ago), marking disconnected",
                    now.saturating_sub(last)
                );
                // Stop the receive loop for the connection considered dead.
                if let Some(handle) = receiver_task_clone.lock().await.take() {
                    handle.abort();
                }
                let was_connected = connected.swap(false, Ordering::AcqRel);
                Self::publish_disconnected_transition(
                    was_connected,
                    &stats,
                    &event_tx,
                    hook_callback.clone(),
                    DisconnectReason::PongTimeout,
                    reconnect_enabled.then_some(&reconnect_notify),
                )
                .await;
                break;
            }
        }
    });

    *existing = Some(handle);
}
1312
1313 async fn disconnect_internal(&self, suppress_auto_reconnect: bool) -> NetworkResult<()> {
1314 if suppress_auto_reconnect {
1315 self.reconnect_generation.fetch_add(1, Ordering::AcqRel);
1316 self.auto_reconnect_suppressed
1317 .store(true, Ordering::Release);
1318 self.reconnect_notify.notify_waiters();
1319 }
1320
1321 self.drop_pending_replies("signaling disconnect").await;
1322 self.drop_pending_pongs("signaling disconnect").await;
1323 let was_connected = self.connected.swap(false, Ordering::AcqRel);
1324
1325 let ping_handle = match tokio::time::timeout(
1330 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1331 self.ping_task.lock(),
1332 )
1333 .await
1334 {
1335 Ok(mut task_guard) => task_guard.take(),
1336 Err(_) => {
1337 tracing::warn!("Timed out waiting for signaling ping task lock during disconnect");
1338 None
1339 }
1340 };
1341 if let Some(handle) = ping_handle {
1342 handle.abort();
1343 }
1344
1345 let receiver_handle = match tokio::time::timeout(
1346 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1347 self.receiver_task.lock(),
1348 )
1349 .await
1350 {
1351 Ok(mut task_guard) => task_guard.take(),
1352 Err(_) => {
1353 tracing::warn!(
1354 "Timed out waiting for signaling receiver task lock during disconnect"
1355 );
1356 None
1357 }
1358 };
1359 if let Some(handle) = receiver_handle {
1360 handle.abort();
1361 }
1362
1363 let sink = match tokio::time::timeout(
1367 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1368 self.ws_sink.lock(),
1369 )
1370 .await
1371 {
1372 Ok(mut sink_guard) => sink_guard.take(),
1373 Err(_) => {
1374 tracing::warn!(
1375 "Timed out waiting for signaling WebSocket sink lock during disconnect"
1376 );
1377 None
1378 }
1379 };
1380
1381 if let Some(mut sink) = sink {
1382 match tokio::time::timeout(
1383 std::time::Duration::from_secs(DISCONNECT_CLOSE_TIMEOUT_SECS),
1384 sink.close(),
1385 )
1386 .await
1387 {
1388 Ok(Ok(())) => {}
1389 Ok(Err(e)) => {
1390 tracing::warn!("Signaling WebSocket close failed during disconnect: {}", e);
1391 }
1392 Err(_) => {
1393 tracing::warn!(
1394 "Signaling WebSocket close timed out during disconnect; continuing cleanup"
1395 );
1396 }
1397 }
1398 }
1399
1400 match tokio::time::timeout(
1401 std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1402 self.ws_stream.lock(),
1403 )
1404 .await
1405 {
1406 Ok(mut stream_guard) => {
1407 stream_guard.take();
1408 }
1409 Err(_) => {
1410 tracing::warn!(
1411 "Timed out waiting for signaling WebSocket stream lock during disconnect"
1412 );
1413 }
1414 }
1415
1416 self.reset_inbound_channel().await;
1417
1418 Self::publish_disconnected_transition(
1420 was_connected,
1421 &self.stats,
1422 &self.event_tx,
1423 self.hook_callback.get().cloned(),
1424 DisconnectReason::Manual,
1425 None,
1426 )
1427 .await;
1428
1429 Ok(())
1430 }
1431
1432 async fn connect_once_for_auto_reconnect(&self, generation: u64) -> NetworkResult<()> {
1433 if self.auto_reconnect_cancelled(generation) {
1434 return Err(NetworkError::ConnectionError(
1435 "Signaling auto-reconnect was cancelled".to_string(),
1436 ));
1437 }
1438
1439 if self.connected.load(Ordering::Acquire) {
1440 tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1441 return Ok(());
1442 }
1443
1444 match self
1445 .connecting
1446 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1447 {
1448 Ok(_) => {}
1449 Err(_) => {
1450 if self.connected.load(Ordering::Acquire) {
1451 tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1452 return Ok(());
1453 }
1454
1455 tracing::debug!(
1456 "Another connection attempt in progress, waiting for state change..."
1457 );
1458 let result = self.wait_for_connection_result().await;
1459 if self.auto_reconnect_cancelled(generation) {
1460 return Err(NetworkError::ConnectionError(
1461 "Signaling auto-reconnect was cancelled".to_string(),
1462 ));
1463 }
1464 return result;
1465 }
1466 }
1467
1468 if self.auto_reconnect_cancelled(generation) {
1469 self.connecting.store(false, Ordering::Release);
1470 return Err(NetworkError::ConnectionError(
1471 "Signaling auto-reconnect was cancelled".to_string(),
1472 ));
1473 }
1474
1475 if self.connected.load(Ordering::Acquire) {
1476 tracing::debug!("Connection completed by another task while acquiring lock");
1477 self.connecting.store(false, Ordering::Release);
1478 return Ok(());
1479 }
1480
1481 tracing::debug!(
1482 generation,
1483 "Acquired connection lock, establishing one auto-reconnect signaling attempt..."
1484 );
1485
1486 let result = self
1487 .establish_connection_once_for_auto_reconnect(generation)
1488 .await;
1489 self.connecting.store(false, Ordering::Release);
1490
1491 match result {
1492 Ok(()) => {
1493 if self.auto_reconnect_cancelled(generation) {
1494 self.disconnect_internal(false).await?;
1495 return Err(NetworkError::ConnectionError(
1496 "Signaling auto-reconnect was cancelled".to_string(),
1497 ));
1498 }
1499 self.start_receiver().await;
1500 self.start_ping_task().await;
1501 Ok(())
1502 }
1503 Err(e) => {
1504 if !self.auto_reconnect_cancelled(generation) {
1505 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1506 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1507 });
1508 tracing::error!("Connection attempt failed: {e}");
1509 }
1510 Err(e)
1511 }
1512 }
1513 }
1514
1515 async fn wait_for_connection_result(&self) -> NetworkResult<()> {
1519 let mut event_rx = self.event_tx.subscribe();
1520 let deadline = tokio::time::Instant::now()
1521 + std::time::Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS);
1522
1523 loop {
1524 tokio::select! {
1525 _ = tokio::time::sleep_until(deadline) => {
1526 if self.connected.load(Ordering::Acquire) {
1528 tracing::debug!("Connection succeeded just before timeout");
1529 return Ok(());
1530 }
1531 return Err(NetworkError::ConnectionError(
1532 "Timeout waiting for concurrent connection attempt".to_string(),
1533 ));
1534 }
1535 result = event_rx.recv() => {
1536 match result {
1537 Ok(SignalingEvent::Connected) => {
1538 tracing::debug!("Connection established by another task");
1539 return Ok(());
1540 }
1541 Ok(SignalingEvent::Disconnected { reason }) => {
1542 return Err(NetworkError::ConnectionError(format!(
1543 "Concurrent signaling connection failed: {reason:?}"
1544 )));
1545 }
1546 Ok(_) => continue, Err(broadcast::error::RecvError::Lagged(n)) => {
1548 tracing::warn!("Event receiver lagged by {n} events");
1549 if self.connected.load(Ordering::Acquire) {
1551 return Ok(());
1552 }
1553 continue;
1554 }
1555 Err(broadcast::error::RecvError::Closed) => {
1556 return Err(NetworkError::ConnectionError(
1557 "Event channel closed while waiting for connection".to_string(),
1558 ));
1559 }
1560 }
1561 }
1562 }
1563 }
1564 }
1565}
1566
1567#[async_trait]
1568impl SignalingClient for WebSocketSignalingClient {
1569 async fn connect(&self) -> NetworkResult<()> {
1570 if self.connected.load(Ordering::Acquire) {
1571 tracing::debug!("Already connected, skipping connect()");
1572 return Ok(());
1573 }
1574
1575 self.connect_with_retries().await
1576 }
1577
1578 async fn connect_once(&self) -> NetworkResult<()> {
1579 loop {
1580 if self.connected.load(Ordering::Acquire) {
1581 tracing::debug!("Already connected, skipping connect_once()");
1582 return Ok(());
1583 }
1584
1585 match self
1586 .connecting
1587 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1588 {
1589 Ok(_) => break,
1590 Err(_) => {
1591 if self.connected.load(Ordering::Acquire) {
1592 tracing::debug!("Already connected, skipping connect_once()");
1593 return Ok(());
1594 }
1595
1596 tracing::debug!(
1597 "Another connection attempt in progress, waiting for state change..."
1598 );
1599 match self.wait_for_connection_result().await {
1600 Ok(()) => return Ok(()),
1601 Err(e)
1602 if !self.connected.load(Ordering::Acquire)
1603 && !self.connecting.load(Ordering::Acquire) =>
1604 {
1605 tracing::debug!(
1606 "Concurrent signaling connection failed; explicit connect_once will retry immediately: {e}"
1607 );
1608 continue;
1609 }
1610 Err(e) => return Err(e),
1611 }
1612 }
1613 }
1614 }
1615
1616 if self.connected.load(Ordering::Acquire) {
1617 tracing::debug!("Connection completed by another task while acquiring lock");
1618 self.connecting.store(false, Ordering::Release);
1619 return Ok(());
1620 }
1621
1622 tracing::debug!(
1623 "Acquired connection lock, establishing one signaling connection attempt..."
1624 );
1625
1626 let result = self.establish_connection_once().await;
1627 self.connecting.store(false, Ordering::Release);
1628
1629 match result {
1630 Ok(()) => {
1631 self.start_receiver().await;
1632 self.start_ping_task().await;
1633 Ok(())
1634 }
1635 Err(e) => {
1636 let _ = self.event_tx.send(SignalingEvent::Disconnected {
1637 reason: DisconnectReason::ConnectionFailed(e.to_string()),
1638 });
1639 tracing::error!("Connection attempt failed: {e}");
1640 Err(e)
1641 }
1642 }
1643 }
1644
1645 fn schedule_auto_reconnect(&self) {
1646 if !self.config.reconnect_config.enabled {
1647 tracing::debug!("Skipping signaling auto-reconnect schedule; config disabled");
1648 return;
1649 }
1650
1651 self.auto_reconnect_suppressed
1652 .store(false, Ordering::Release);
1653 self.reconnect_notify.notify_one();
1654 }
1655
1656 fn schedule_auto_reconnect_reset_backoff(&self) {
1657 if !self.config.reconnect_config.enabled {
1658 tracing::debug!("Skipping signaling auto-reconnect schedule; config disabled");
1659 return;
1660 }
1661
1662 self.auto_reconnect_suppressed
1663 .store(false, Ordering::Release);
1664 self.reconnect_backoff_reset_generation
1665 .fetch_add(1, Ordering::AcqRel);
1666 self.reconnect_notify.notify_one();
1667 }
1668
/// Disconnects from the signaling server on explicit request.
///
/// Delegates to `disconnect_internal(true)`, i.e. with auto-reconnect
/// suppression requested.
async fn disconnect(&self) -> NetworkResult<()> {
    self.disconnect_internal(true).await
}
1672
1673 async fn probe_alive(&self, timeout: Duration) -> NetworkResult<()> {
1674 if !self.connected.load(Ordering::Acquire) {
1675 return Err(NetworkError::ConnectionError(
1676 "Signaling client is not connected".to_string(),
1677 ));
1678 }
1679
1680 let probe_id = self.probe_counter.fetch_add(1, Ordering::Relaxed) + 1;
1681 let payload =
1682 format!("actr-signaling-probe-{probe_id}-{}", current_unix_secs()).into_bytes();
1683 let (tx, rx) = oneshot::channel();
1684 self.pending_pongs.lock().await.insert(payload.clone(), tx);
1685
1686 let send_timeout = std::cmp::min(
1687 timeout,
1688 std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1689 );
1690 let ping_payload = payload.clone();
1691 let send_result = tokio::time::timeout(send_timeout, async {
1692 let mut sink_guard = self.ws_sink.lock().await;
1693 match sink_guard.as_mut() {
1694 Some(sink) => sink
1695 .send(tokio_tungstenite::tungstenite::Message::Ping(
1696 ping_payload.into(),
1697 ))
1698 .await
1699 .map_err(|e| {
1700 NetworkError::ConnectionError(format!("Signaling probe ping failed: {e}"))
1701 }),
1702 None => Err(NetworkError::ConnectionError(
1703 "Signaling probe failed: WebSocket sink is not available".to_string(),
1704 )),
1705 }
1706 })
1707 .await
1708 .unwrap_or_else(|_| {
1709 Err(NetworkError::TimeoutError(format!(
1710 "Timed out sending signaling probe ping after {}ms",
1711 send_timeout.as_millis()
1712 )))
1713 });
1714
1715 if let Err(e) = send_result {
1716 self.pending_pongs.lock().await.remove(&payload);
1717 let was_connected = self.connected.swap(false, Ordering::AcqRel);
1718 Self::publish_disconnected_transition(
1719 was_connected,
1720 &self.stats,
1721 &self.event_tx,
1722 self.hook_callback.get().cloned(),
1723 DisconnectReason::PingSendFailed,
1724 None,
1725 )
1726 .await;
1727 return Err(e);
1728 }
1729
1730 match tokio::time::timeout(timeout, rx).await {
1731 Ok(Ok(())) => {
1732 self.last_pong.store(current_unix_secs(), Ordering::Release);
1733 Ok(())
1734 }
1735 Ok(Err(_)) => {
1736 self.pending_pongs.lock().await.remove(&payload);
1737 Err(NetworkError::ConnectionError(
1738 "Signaling probe pong waiter dropped".to_string(),
1739 ))
1740 }
1741 Err(_) => {
1742 self.pending_pongs.lock().await.remove(&payload);
1743 Err(NetworkError::TimeoutError(format!(
1744 "Timed out waiting for signaling probe pong after {}ms",
1745 timeout.as_millis()
1746 )))
1747 }
1748 }
1749 }
1750
1751 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1752 async fn send_register_request(
1753 &self,
1754 request: RegisterRequest,
1755 ) -> NetworkResult<RegisterResponse> {
1756 let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
1758 payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
1759 });
1760
1761 let envelope = self.create_envelope(flow).await;
1762 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1763
1764 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1765 {
1766 if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
1767 server_to_actr.payload
1768 {
1769 return Ok(response);
1770 }
1771 }
1772
1773 Err(NetworkError::ConnectionError(
1774 "Invalid registration response".to_string(),
1775 ))
1776 }
1777
1778 #[cfg_attr(
1779 feature = "opentelemetry",
1780 tracing::instrument(skip_all, fields(actor_id = %actor_id))
1781 )]
1782 async fn send_unregister_request(
1783 &self,
1784 actor_id: ActrId,
1785 credential: AIdCredential,
1786 reason: Option<String>,
1787 ) -> NetworkResult<UnregisterResponse> {
1788 let request = UnregisterRequest {
1790 actr_id: actor_id.clone(),
1791 reason,
1792 };
1793
1794 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1796 source: actor_id,
1797 credential,
1798 payload: Some(actr_to_signaling::Payload::UnregisterRequest(request)),
1799 });
1800
1801 let envelope = self.create_envelope(flow).await;
1803 self.send_envelope(envelope).await?;
1804
1805 Ok(UnregisterResponse {
1810 result: Some(actr_protocol::unregister_response::Result::Success(
1811 actr_protocol::unregister_response::UnregisterOk {},
1812 )),
1813 })
1814 }
1815
1816 #[cfg_attr(
1817 feature = "opentelemetry",
1818 tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1819 )]
1820 async fn send_heartbeat(
1821 &self,
1822 actor_id: ActrId,
1823 credential: AIdCredential,
1824 availability: ServiceAvailabilityState,
1825 power_reserve: f32,
1826 mailbox_backlog: f32,
1827 ) -> NetworkResult<Pong> {
1828 let ping = Ping {
1829 availability: availability as i32,
1830 power_reserve,
1831 mailbox_backlog,
1832 sticky_client_ids: vec![], };
1834
1835 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1836 source: actor_id,
1837 credential,
1838 payload: Some(actr_to_signaling::Payload::Ping(ping)),
1839 });
1840
1841 let envelope = self.create_envelope(flow).await;
1842 let reply_for = envelope.envelope_id.clone();
1843
1844 let (tx, rx) = oneshot::channel();
1846 self.pending_replies
1847 .lock()
1848 .await
1849 .insert(reply_for.clone(), tx);
1850
1851 if let Err(e) = self.send_envelope(envelope).await {
1852 self.pending_replies.lock().await.remove(&reply_for);
1854 return Err(e);
1855 }
1856
1857 let response_envelope = rx.await.map_err(|_| {
1859 NetworkError::ConnectionError(
1860 "Receiver dropped while waiting for heartbeat response".to_string(),
1861 )
1862 })?;
1863
1864 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1866 {
1867 match server_to_actr.payload {
1868 Some(signaling_to_actr::Payload::Pong(pong)) => {
1869 return Ok(pong);
1870 }
1871 Some(signaling_to_actr::Payload::Error(err)) => {
1872 if err.code == 401 {
1874 return Err(NetworkError::CredentialExpired(err.message));
1875 }
1876 return Err(NetworkError::AuthenticationError(format!(
1877 "{} ({})",
1878 err.message, err.code
1879 )));
1880 }
1881 _ => {}
1882 }
1883 }
1884
1885 Err(NetworkError::ConnectionError(
1886 "Received response but not a Pong message".to_string(),
1887 ))
1888 }
1889
1890 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1891 async fn send_route_candidates_request(
1892 &self,
1893 actor_id: ActrId,
1894 credential: AIdCredential,
1895 request: RouteCandidatesRequest,
1896 ) -> NetworkResult<RouteCandidatesResponse> {
1897 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1898 source: actor_id,
1899 credential,
1900 payload: Some(actr_to_signaling::Payload::RouteCandidatesRequest(request)),
1901 });
1902
1903 let envelope = self.create_envelope(flow).await;
1904 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1905
1906 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1907 {
1908 match server_to_actr.payload {
1909 Some(signaling_to_actr::Payload::RouteCandidatesResponse(response)) => {
1910 return Ok(response);
1911 }
1912 Some(signaling_to_actr::Payload::Error(err)) => {
1913 return Err(NetworkError::ServiceDiscoveryError(format!(
1914 "{} ({})",
1915 err.message, err.code
1916 )));
1917 }
1918 _ => {}
1919 }
1920 }
1921
1922 Err(NetworkError::ConnectionError(
1923 "Invalid route candidates response".to_string(),
1924 ))
1925 }
1926
1927 async fn get_signing_key(
1928 &self,
1929 actor_id: ActrId,
1930 credential: AIdCredential,
1931 key_id: u32,
1932 ) -> NetworkResult<(u32, Vec<u8>)> {
1933 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1934 source: actor_id,
1935 credential,
1936 payload: Some(actr_to_signaling::Payload::GetSigningKeyRequest(
1937 GetSigningKeyRequest { key_id },
1938 )),
1939 });
1940
1941 let envelope = self.create_envelope(flow).await;
1942 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1943
1944 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1945 {
1946 match server_to_actr.payload {
1947 Some(signaling_to_actr::Payload::GetSigningKeyResponse(resp)) => {
1948 return Ok((resp.key_id, resp.pubkey.to_vec()));
1949 }
1950 Some(signaling_to_actr::Payload::Error(err)) => {
1951 return Err(NetworkError::ConnectionError(format!(
1952 "get_signing_key failed: {} ({})",
1953 err.message, err.code
1954 )));
1955 }
1956 _ => {}
1957 }
1958 }
1959
1960 Err(NetworkError::ConnectionError(
1961 "get_signing_key: invalid response".to_string(),
1962 ))
1963 }
1964
1965 #[cfg_attr(
1966 feature = "opentelemetry",
1967 tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1968 )]
1969 async fn send_credential_update_request(
1970 &self,
1971 actor_id: ActrId,
1972 credential: AIdCredential,
1973 ) -> NetworkResult<RegisterResponse> {
1974 let request = CredentialUpdateRequest {
1975 actr_id: actor_id.clone(),
1976 };
1977
1978 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1979 source: actor_id,
1980 credential,
1981 payload: Some(actr_to_signaling::Payload::CredentialUpdateRequest(request)),
1982 });
1983
1984 let envelope = self.create_envelope(flow).await;
1985 let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1986
1987 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1988 {
1989 match server_to_actr.payload {
1990 Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
1991 return Ok(response);
1992 }
1993 Some(signaling_to_actr::Payload::Error(err)) => {
1994 return Err(NetworkError::ConnectionError(format!(
1995 "Credential update failed: {} ({})",
1996 err.message, err.code
1997 )));
1998 }
1999 _ => {}
2000 }
2001 }
2002
2003 Err(NetworkError::ConnectionError(
2004 "Invalid credential update response".to_string(),
2005 ))
2006 }
2007
2008 #[cfg_attr(
2009 feature = "opentelemetry",
2010 tracing::instrument(level = "debug", skip_all, fields(envelope_id = %envelope.envelope_id))
2011 )]
2012 async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
2013 #[cfg(feature = "opentelemetry")]
2014 let envelope = {
2015 let mut envelope = envelope;
2016 trace::inject_span_context(&tracing::Span::current(), &mut envelope);
2017 envelope
2018 };
2019
2020 if !self.is_connected() {
2023 return Err(NetworkError::ConnectionError(
2024 "Cannot send: WebSocket not connected".to_string(),
2025 ));
2026 }
2027
2028 let mut sink_guard = self.ws_sink.lock().await;
2029
2030 if let Some(sink) = sink_guard.as_mut() {
2031 let mut buf = Vec::new();
2033 envelope.encode(&mut buf)?;
2034 let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
2035 match tokio::time::timeout(
2036 std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
2037 sink.send(msg),
2038 )
2039 .await
2040 {
2041 Ok(Ok(())) => {}
2042 Ok(Err(e)) => return Err(e.into()),
2043 Err(_) => {
2044 self.connected.store(false, Ordering::Release);
2045 return Err(NetworkError::ConnectionError(
2046 "Signaling WebSocket send timed out".to_string(),
2047 ));
2048 }
2049 }
2050
2051 self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
2052 tracing::debug!("Stats: {:?}", self.stats.snapshot());
2053 Ok(())
2054 } else {
2055 Err(NetworkError::ConnectionError("Not connected".to_string()))
2056 }
2057 }
2058
2059 async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
2060 let mut rx = self.inbound_rx.lock().await;
2061 match rx.recv().await {
2062 Some(envelope) => Ok(Some(envelope)),
2063 None => {
2064 tracing::error!("Inbound channel closed");
2065 Err(NetworkError::ConnectionError(
2066 "Inbound channel closed".to_string(),
2067 ))
2068 }
2069 }
2070 }
2071
/// Returns the current value of the `connected` flag (acquire load).
fn is_connected(&self) -> bool {
    self.connected.load(Ordering::Acquire)
}
2075
/// Returns a point-in-time copy of the signaling counters.
fn get_stats(&self) -> SignalingStats {
    self.stats.snapshot()
}
2079
/// Creates a new receiver on the signaling event broadcast channel.
fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
    self.event_tx.subscribe()
}
2083
2084 async fn set_actor_id(&self, actor_id: ActrId) {
2085 *self.actor_id.lock().await = Some(actor_id);
2086 }
2087
2088 async fn set_credential_state(&self, credential_state: CredentialState) {
2089 *self.credential_state.lock().await = Some(credential_state);
2090 }
2091
2092 async fn clear_identity(&self) {
2093 *self.actor_id.lock().await = None;
2094 *self.credential_state.lock().await = None;
2095 }
2096
/// Installs the hook callback.
///
/// The result of `set` is deliberately discarded.
/// NOTE(review): `hook_callback` looks like a set-once cell, in which case a
/// second call would be silently ignored — confirm against the field type.
fn set_hook_callback(&self, cb: HookCallback) {
    let _ = self.hook_callback.set(cb);
}
2100}
2101
/// Lock-free signaling counters shared between tasks.
///
/// Every field is an `AtomicU64` starting at zero (via the derived `Default`).
/// Use `snapshot` to obtain a plain-value copy.
#[derive(Debug, Default)]
pub(crate) struct AtomicSignalingStats {
    /// Connection counter (presumably successful connects; updated outside
    /// this block — confirm).
    pub connections: AtomicU64,

    /// Disconnection counter (updated outside this block).
    pub disconnections: AtomicU64,

    /// Incremented by `send_envelope` after each successful WebSocket send.
    pub messages_sent: AtomicU64,

    /// Received-message counter (updated outside this block).
    pub messages_received: AtomicU64,

    /// Sent-heartbeat counter (updated outside this block).
    pub heartbeats_sent: AtomicU64,

    /// Received-heartbeat counter (updated outside this block).
    pub heartbeats_received: AtomicU64,

    /// Error counter (updated outside this block).
    pub errors: AtomicU64,
}
2142
/// Plain-value copy of the signaling counters, as produced by
/// `AtomicSignalingStats::snapshot`. Serializable and cheap to copy.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct SignalingStats {
    /// Value of the `connections` counter at snapshot time.
    pub connections: u64,

    /// Value of the `disconnections` counter at snapshot time.
    pub disconnections: u64,

    /// Value of the `messages_sent` counter at snapshot time.
    pub messages_sent: u64,

    /// Value of the `messages_received` counter at snapshot time.
    pub messages_received: u64,

    /// Value of the `heartbeats_sent` counter at snapshot time.
    pub heartbeats_sent: u64,

    /// Value of the `heartbeats_received` counter at snapshot time.
    pub heartbeats_received: u64,

    /// Value of the `errors` counter at snapshot time.
    pub errors: u64,
}
2167
2168impl AtomicSignalingStats {
2169 pub fn snapshot(&self) -> SignalingStats {
2171 SignalingStats {
2172 connections: self.connections.load(Ordering::Relaxed),
2173 disconnections: self.disconnections.load(Ordering::Relaxed),
2174 messages_sent: self.messages_sent.load(Ordering::Relaxed),
2175 messages_received: self.messages_received.load(Ordering::Relaxed),
2176 heartbeats_sent: self.heartbeats_sent.load(Ordering::Relaxed),
2177 heartbeats_received: self.heartbeats_received.load(Ordering::Relaxed),
2178 errors: self.errors.load(Ordering::Relaxed),
2179 }
2180 }
2181}
2182
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2190
2191#[cfg(test)]
2192mod tests {
2193 use super::*;
2194 use std::future::Future;
2195 use std::pin::Pin;
2196 use std::sync::atomic::{AtomicUsize, Ordering as UsizeOrdering};
2197
/// Minimal in-memory `SignalingClient` used by tests in this module.
struct FakeSignalingClient {
    /// Sender side of the event channel handed out by `subscribe_events`.
    event_tx: broadcast::Sender<SignalingEvent>,
    /// Value reported by `is_connected`.
    connected: AtomicBool,
    /// Number of times `connect` has been called.
    connect_calls: Arc<AtomicUsize>,
    /// Actor id stored by `set_actor_id` and cleared by `clear_identity`.
    actor_id: tokio::sync::Mutex<Option<ActrId>>,
    /// Credential state stored by `set_credential_state` and cleared by `clear_identity`.
    credential_state: tokio::sync::Mutex<Option<CredentialState>>,
}
2206
// Test double: only connection bookkeeping, event subscription and identity
// storage are implemented. Every request/response method panics via
// `unimplemented!` if a test reaches it.
#[async_trait]
impl SignalingClient for FakeSignalingClient {
    /// Counts the call and reports success without touching `connected`.
    async fn connect(&self) -> NetworkResult<()> {
        self.connect_calls.fetch_add(1, UsizeOrdering::SeqCst);
        Ok(())
    }

    /// No-op that always succeeds.
    async fn disconnect(&self) -> NetworkResult<()> {
        Ok(())
    }

    /// Not implemented for the fake; panics if called.
    async fn send_register_request(
        &self,
        _request: RegisterRequest,
    ) -> NetworkResult<RegisterResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not implemented for the fake; panics if called.
    async fn send_unregister_request(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _reason: Option<String>,
    ) -> NetworkResult<UnregisterResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not implemented for the fake; panics if called.
    async fn send_heartbeat(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _availability: ServiceAvailabilityState,
        _power_reserve: f32,
        _mailbox_backlog: f32,
    ) -> NetworkResult<Pong> {
        unimplemented!("not needed in tests");
    }

    /// Not implemented for the fake; panics if called.
    async fn send_route_candidates_request(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _request: RouteCandidatesRequest,
    ) -> NetworkResult<RouteCandidatesResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not implemented for the fake; panics if called.
    async fn get_signing_key(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
        _key_id: u32,
    ) -> NetworkResult<(u32, Vec<u8>)> {
        unimplemented!("not needed in tests");
    }

    /// Not implemented for the fake; panics if called.
    async fn send_credential_update_request(
        &self,
        _actor_id: ActrId,
        _credential: AIdCredential,
    ) -> NetworkResult<RegisterResponse> {
        unimplemented!("not needed in tests");
    }

    /// Not implemented for the fake; panics if called.
    async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
        unimplemented!("not needed in tests");
    }

    /// Not implemented for the fake; panics if called.
    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
        unimplemented!("not needed in tests");
    }

    /// Reports the `connected` flag as set on the struct.
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }

    /// Always returns all-zero statistics.
    fn get_stats(&self) -> SignalingStats {
        SignalingStats::default()
    }

    /// Creates a new receiver on the fake's event channel.
    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
        self.event_tx.subscribe()
    }

    /// Stores the actor id.
    async fn set_actor_id(&self, actor_id: ActrId) {
        *self.actor_id.lock().await = Some(actor_id);
    }

    /// Stores the credential state.
    async fn set_credential_state(&self, credential_state: CredentialState) {
        *self.credential_state.lock().await = Some(credential_state);
    }

    /// Clears the stored actor id and credential state.
    async fn clear_identity(&self) {
        *self.actor_id.lock().await = None;
        *self.credential_state.lock().await = None;
    }
}
2304
2305 fn make_fake_client() -> Arc<FakeSignalingClient> {
2306 let (event_tx, _erx) = broadcast::channel(64);
2307 Arc::new(FakeSignalingClient {
2308 event_tx,
2309 connected: AtomicBool::new(false),
2310 connect_calls: Arc::new(AtomicUsize::new(0)),
2311 actor_id: tokio::sync::Mutex::new(None),
2312 credential_state: tokio::sync::Mutex::new(None),
2313 })
2314 }
2315
2316 fn make_config() -> SignalingConfig {
2318 SignalingConfig {
2319 server_url: Url::parse("ws://127.0.0.1:1/signaling/ws").unwrap(),
2320 connection_timeout: 2,
2321 heartbeat_interval: 30,
2322 reconnect_config: ReconnectConfig::default(),
2323 auth_config: None,
2324 webrtc_role: None,
2325 }
2326 }
2327
/// Wraps a freshly constructed `WebSocketSignalingClient` in an `Arc` so tests
/// can share it with spawned tasks.
fn make_ws_client(config: SignalingConfig) -> Arc<WebSocketSignalingClient> {
    Arc::new(WebSocketSignalingClient::new(config))
}
2332
/// `probe_alive` must give up on its own when the sink lock cannot be
/// acquired, and must leave the client in a clean disconnected state.
#[tokio::test]
async fn probe_alive_times_out_when_sink_lock_is_stalled() {
    let client = make_ws_client(make_config());
    // Pretend a connection exists so the probe gets past its connected check.
    client.connected.store(true, Ordering::Release);

    // Hold the sink lock for the whole test so the probe's send cannot acquire it.
    let _sink_guard = client.ws_sink.lock().await;

    // The outer 250ms timeout only guards the test; the probe's own 20ms bound should fire.
    let result = tokio::time::timeout(
        Duration::from_millis(250),
        client.probe_alive(Duration::from_millis(20)),
    )
    .await
    .expect("probe should be bounded by its own timeout");

    let err = result.expect_err("stalled sink lock should fail the probe");
    assert!(
        err.to_string()
            .contains("Timed out sending signaling probe ping"),
        "unexpected error: {err}"
    );
    assert!(
        !client.is_connected(),
        "stalled probe send should mark signaling disconnected"
    );
    // Exactly one disconnect transition should have been recorded.
    assert_eq!(client.get_stats().disconnections, 1);
    assert!(
        client.pending_pongs.lock().await.is_empty(),
        "failed probe send should remove its pending pong waiter"
    );
}
2363
/// An explicit `connect_once` that finds another attempt in progress must
/// retry by itself as soon as that attempt is reported as failed.
#[tokio::test]
async fn explicit_connect_once_retries_after_concurrent_attempt_fails() {
    // Local WebSocket server that accepts a single connection.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await
        .expect("test listener should bind");
    let server_url = format!(
        "ws://{}/signaling/ws",
        listener
            .local_addr()
            .expect("test listener should have local addr")
    );
    let server_task = tokio::spawn(async move {
        let (stream, _) = listener
            .accept()
            .await
            .expect("test server should accept tcp connection");
        let ws_stream = tokio_tungstenite::accept_async(stream)
            .await
            .expect("test server should complete websocket handshake");
        // Keep the connection open briefly so the client can observe it as connected.
        tokio::time::sleep(Duration::from_millis(100)).await;
        drop(ws_stream);
    });

    let mut config = make_config();
    config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
    config.connection_timeout = 2;
    config.reconnect_config = ReconnectConfig {
        enabled: false,
        ..ReconnectConfig::default()
    };
    let client = make_ws_client(config);

    // Simulate another task owning an in-flight connection attempt.
    client.connecting.store(true, Ordering::Release);
    let connect_task = {
        let client = client.clone();
        tokio::spawn(async move { client.connect_once().await })
    };

    // Give `connect_once` time to start waiting, then report the simulated attempt as failed.
    tokio::time::sleep(Duration::from_millis(50)).await;
    client.connecting.store(false, Ordering::Release);
    let _ = client.event_tx.send(SignalingEvent::Disconnected {
        reason: DisconnectReason::ConnectionFailed("simulated auto attempt failed".into()),
    });

    // Unwrap order: outer timeout, task join result, then the connect result itself.
    tokio::time::timeout(Duration::from_secs(2), connect_task)
        .await
        .expect("explicit connect_once should not wait for auto backoff")
        .expect("connect_once task should not panic")
        .expect("explicit connect_once should retry after concurrent failure");

    assert!(
        client.is_connected(),
        "explicit recovery connect should establish signaling"
    );

    // Best-effort cleanup; failures here are not part of the assertion.
    client.disconnect().await.ok();
    let _ = tokio::time::timeout(Duration::from_secs(1), server_task).await;
}
2422
2423 #[tokio::test]
2424 async fn network_restore_connect_once_preempts_connect_backoff() {
2425 let reserved_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2426 .await
2427 .expect("test listener should reserve a local port");
2428 let addr = reserved_listener
2429 .local_addr()
2430 .expect("reserved listener should have local addr");
2431 drop(reserved_listener);
2432
2433 let mut config = make_config();
2434 config.server_url =
2435 Url::parse(&format!("ws://{addr}/signaling/ws")).expect("test URL should parse");
2436 config.connection_timeout = 1;
2437 config.reconnect_config = ReconnectConfig {
2438 enabled: true,
2439 max_attempts: 10,
2440 initial_delay: 30,
2441 max_delay: 30,
2442 backoff_multiplier: 1.0,
2443 };
2444 let client = make_ws_client(config);
2445 let mut rx = client.subscribe_events();
2446
2447 let long_connect_task = {
2448 let client = client.clone();
2449 tokio::spawn(async move { client.connect().await })
2450 };
2451
2452 tokio::time::timeout(Duration::from_secs(2), async {
2453 loop {
2454 match rx.recv().await {
2455 Ok(SignalingEvent::Disconnected {
2456 reason: DisconnectReason::ConnectionFailed(_),
2457 }) => break,
2458 Ok(_) => continue,
2459 Err(e) => panic!("unexpected signaling event receive error: {e}"),
2460 }
2461 }
2462 })
2463 .await
2464 .expect("long connect should fail first attempt and enter backoff");
2465 assert!(
2466 !client.connecting.load(Ordering::Acquire),
2467 "connect() must release connecting while sleeping in backoff"
2468 );
2469
2470 let listener = tokio::net::TcpListener::bind(addr)
2471 .await
2472 .expect("network restore should make the signaling endpoint reachable");
2473 let server_task = tokio::spawn(async move {
2474 let (stream, _) = listener
2475 .accept()
2476 .await
2477 .expect("restored test server should accept tcp connection");
2478 let ws_stream = tokio_tungstenite::accept_async(stream)
2479 .await
2480 .expect("restored test server should complete websocket handshake");
2481 tokio::time::sleep(Duration::from_millis(250)).await;
2482 drop(ws_stream);
2483 });
2484
2485 let restore_result = tokio::time::timeout(
2486 Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS + 2),
2487 {
2488 let client = client.clone();
2489 async move { client.connect_once().await }
2490 },
2491 )
2492 .await
2493 .expect("restore connect_once should complete within the concurrent wait window");
2494
2495 long_connect_task.abort();
2496 server_task.abort();
2497 client.disconnect().await.ok();
2498
2499 assert!(
2500 restore_result.is_ok(),
2501 "network restore should not be blocked by an older connect() backoff; got {restore_result:?}"
2502 );
2503 }
2504
2505 #[tokio::test]
2506 async fn explicit_connect_backoff_reset_restarts_attempt_sequence() {
2507 let reserved_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2508 .await
2509 .expect("test listener should reserve a local port");
2510 let addr = reserved_listener
2511 .local_addr()
2512 .expect("reserved listener should have local addr");
2513 drop(reserved_listener);
2514
2515 let mut config = make_config();
2516 config.server_url =
2517 Url::parse(&format!("ws://{addr}/signaling/ws")).expect("test URL should parse");
2518 config.connection_timeout = 1;
2519 config.reconnect_config = ReconnectConfig {
2520 enabled: true,
2521 max_attempts: 10,
2522 initial_delay: 30,
2523 max_delay: 30,
2524 backoff_multiplier: 1.0,
2525 };
2526 let client = make_ws_client(config);
2527
2528 let (attempt_tx, mut attempt_rx) = tokio::sync::mpsc::unbounded_channel();
2529 let hook_callback: HookCallback = Arc::new(move |event| {
2530 let attempt_tx = attempt_tx.clone();
2531 Box::pin(async move {
2532 if let HookEvent::SignalingConnectStart { attempt } = event {
2533 let _ = attempt_tx.send(attempt);
2534 }
2535 })
2536 });
2537 client.set_hook_callback(hook_callback);
2538
2539 let connect_task = {
2540 let client = client.clone();
2541 tokio::spawn(async move { client.connect().await })
2542 };
2543
2544 assert_eq!(
2545 tokio::time::timeout(Duration::from_secs(1), attempt_rx.recv())
2546 .await
2547 .expect("connect should publish attempt 1"),
2548 Some(1)
2549 );
2550 assert_eq!(
2551 tokio::time::timeout(Duration::from_secs(2), attempt_rx.recv())
2552 .await
2553 .expect("connect should enter first backoff as attempt 2"),
2554 Some(2)
2555 );
2556
2557 client.schedule_auto_reconnect_reset_backoff();
2558
2559 assert_eq!(
2560 tokio::time::timeout(Duration::from_secs(2), attempt_rx.recv())
2561 .await
2562 .expect("reset should restart explicit connect attempts"),
2563 Some(1),
2564 "network recovery reset should restart explicit connect() backoff from attempt 1"
2565 );
2566
2567 connect_task.abort();
2568 client.disconnect().await.ok();
2569 }
2570
2571 #[tokio::test]
2572 async fn test_publish_disconnected_transition_fires_hook_once() {
2573 let stats = Arc::new(AtomicSignalingStats::default());
2574 let (event_tx, mut event_rx) = broadcast::channel(4);
2575 let hook_count = Arc::new(AtomicUsize::new(0));
2576 let hook_count_for_cb = hook_count.clone();
2577 let hook_callback: HookCallback = Arc::new(move |event| {
2578 let hook_count = hook_count_for_cb.clone();
2579 Box::pin(async move {
2580 if matches!(event, HookEvent::SignalingDisconnected) {
2581 hook_count.fetch_add(1, UsizeOrdering::SeqCst);
2582 }
2583 }) as Pin<Box<dyn Future<Output = ()> + Send>>
2584 });
2585
2586 let first = WebSocketSignalingClient::publish_disconnected_transition(
2587 true,
2588 &stats,
2589 &event_tx,
2590 Some(hook_callback.clone()),
2591 DisconnectReason::StreamEnded,
2592 None,
2593 )
2594 .await;
2595 assert!(
2596 first,
2597 "first connected->disconnected transition should publish"
2598 );
2599 assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2600 assert_eq!(stats.snapshot().disconnections, 1);
2601 assert!(matches!(
2602 event_rx.recv().await,
2603 Ok(SignalingEvent::Disconnected {
2604 reason: DisconnectReason::StreamEnded
2605 })
2606 ));
2607
2608 let second = WebSocketSignalingClient::publish_disconnected_transition(
2609 false,
2610 &stats,
2611 &event_tx,
2612 Some(hook_callback),
2613 DisconnectReason::PongTimeout,
2614 None,
2615 )
2616 .await;
2617 assert!(
2618 !second,
2619 "stale duplicate disconnected transition should be ignored"
2620 );
2621 assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2622 assert_eq!(stats.snapshot().disconnections, 1);
2623 assert!(event_rx.try_recv().is_err());
2624 }
2625
/// Pins the values produced by `ReconnectConfig::default()`.
#[test]
fn test_reconnect_config_defaults() {
    let defaults = ReconnectConfig::default();

    assert!(defaults.enabled);
    assert_eq!(defaults.max_attempts, 10);
    assert_eq!(defaults.initial_delay, 1);
    assert_eq!(defaults.max_delay, 60);

    // Floating-point field: compare against 2.0 within machine epsilon.
    let multiplier_error = (defaults.backoff_multiplier - 2.0).abs();
    assert!(multiplier_error < f64::EPSILON);
}
2639
/// A freshly constructed client is not connected, not mid-connect, and has not
/// started its reconnect manager.
#[test]
fn test_websocket_signaling_client_initial_state_disconnected() {
    let client = WebSocketSignalingClient::new(make_config());

    let connected = client.is_connected();
    assert!(!connected, "newly created client should be Disconnected");

    let connecting = client.connecting.load(Ordering::Acquire);
    assert!(
        !connecting,
        "newly created client should not be in connecting state"
    );

    let reconnector_started = client.reconnector_started.load(Ordering::Acquire);
    assert!(
        !reconnector_started,
        "reconnect manager should not be started automatically"
    );
}
2660
/// Every counter returned by `get_stats()` starts at zero on a new client.
#[test]
fn test_initial_stats_are_zero() {
    let fresh_client = WebSocketSignalingClient::new(make_config());
    let snapshot = fresh_client.get_stats();

    // Connection lifecycle counters.
    assert_eq!(snapshot.connections, 0);
    assert_eq!(snapshot.disconnections, 0);

    // Traffic counters.
    assert_eq!(snapshot.messages_sent, 0);
    assert_eq!(snapshot.messages_received, 0);

    // Error counter.
    assert_eq!(snapshot.errors, 0);
}
2671
/// `redact_signaling_url_for_log` keeps `actor_id` / `key_id` readable while
/// replacing the `claims`, `signature` and `token` values with `REDACTED`.
#[test]
fn test_signaling_url_log_redacts_credential_query_params() {
    let raw = "wss://example.com/signaling?actor_id=abc&key_id=7&claims=claims-value&signature=signature-value&token=token-value";
    let url = Url::parse(raw).unwrap();

    let redacted = WebSocketSignalingClient::redact_signaling_url_for_log(&url);

    // Fragments that must be present in the log-safe rendering.
    for expected in [
        "actor_id=abc",
        "key_id=7",
        "claims=REDACTED",
        "signature=REDACTED",
        "token=REDACTED",
    ] {
        assert!(redacted.contains(expected));
    }

    // Secret values that must never appear.
    for secret in ["claims-value", "signature-value", "token-value"] {
        assert!(!redacted.contains(secret));
    }
}
2690
2691 #[tokio::test]
2696 async fn test_reconnect_manager_idempotent() {
2697 let client = make_ws_client(make_config());
2698
2699 client.start_reconnect_manager();
2701 assert!(
2702 client.reconnector_started.load(Ordering::Acquire),
2703 "reconnector_started should be true after first call"
2704 );
2705
2706 client.start_reconnect_manager();
2708 assert!(client.reconnector_started.load(Ordering::Acquire));
2710 }
2711
2712 #[tokio::test]
2713 async fn test_reconnect_manager_disabled_when_config_disabled() {
2714 let mut config = make_config();
2715 config.reconnect_config.enabled = false;
2716 let client = make_ws_client(config);
2717
2718 client.start_reconnect_manager();
2719 assert!(
2720 !client.reconnector_started.load(Ordering::Acquire),
2721 "reconnect manager should not start when reconnect config is disabled"
2722 );
2723 }
2724
2725 #[tokio::test]
2726 async fn test_reconnect_manager_does_not_keep_client_alive() {
2727 let client = make_ws_client(make_config());
2728 let weak = Arc::downgrade(&client);
2729
2730 client.start_reconnect_manager();
2731 drop(client);
2732
2733 assert!(
2734 weak.upgrade().is_none(),
2735 "reconnect manager must not keep signaling client alive after owner drop"
2736 );
2737 }
2738
2739 #[tokio::test]
2744 async fn test_connect_fast_path_when_already_connected() {
2745 let client = make_ws_client(make_config());
2746 client.connected.store(true, Ordering::Release);
2748
2749 let result = client.connect().await;
2751 assert!(
2752 result.is_ok(),
2753 "connect() should return Ok when already connected"
2754 );
2755 assert!(!client.connecting.load(Ordering::Acquire));
2757 }
2758
2759 #[tokio::test]
2760 async fn test_connect_sets_connecting_flag() {
2761 let mut config = make_config();
2762 config.reconnect_config.enabled = false; config.connection_timeout = 1;
2764 let client = make_ws_client(config);
2765
2766 let result = client.connect().await;
2768 assert!(
2769 result.is_err(),
2770 "connecting to unreachable address should fail"
2771 );
2772 assert!(
2773 !client.connecting.load(Ordering::Acquire),
2774 "connecting flag should be cleared after connection failure"
2775 );
2776 }
2777
2778 #[tokio::test]
2783 async fn test_event_subscribe_receives_events() {
2784 let client = make_ws_client(make_config());
2785 let mut rx = client.subscribe_events();
2786
2787 let _ = client.event_tx.send(SignalingEvent::Connected);
2789
2790 match tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()).await {
2791 Ok(Ok(SignalingEvent::Connected)) => {} other => panic!("expected Connected event, but got {:?}", other),
2793 }
2794 }
2795
2796 #[tokio::test]
2797 async fn test_disconnect_event_on_connect_failure() {
2798 let mut config = make_config();
2799 config.reconnect_config.enabled = false;
2800 config.connection_timeout = 1;
2801 let client = make_ws_client(config);
2802 let mut rx = client.subscribe_events();
2803
2804 let _ = client.connect().await;
2806
2807 match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()).await {
2809 Ok(Ok(SignalingEvent::Disconnected {
2810 reason: DisconnectReason::ConnectionFailed(_),
2811 })) => {} other => panic!(
2813 "expected Disconnected(ConnectionFailed) event, but got {:?}",
2814 other
2815 ),
2816 }
2817 }
2818
2819 #[tokio::test]
2824 async fn test_disconnect_clears_connected_flag() {
2825 let client = make_ws_client(make_config());
2826 client.connected.store(true, Ordering::Release);
2828 assert!(client.is_connected());
2829
2830 let result = client.disconnect().await;
2831 assert!(result.is_ok());
2832 assert!(
2833 !client.is_connected(),
2834 "should be Disconnected after disconnect()"
2835 );
2836 }
2837
2838 #[tokio::test]
2839 async fn test_disconnect_increments_disconnection_stat() {
2840 let client = make_ws_client(make_config());
2841 client.connected.store(true, Ordering::Release);
2842
2843 let stats_before = client.get_stats().disconnections;
2844 let _ = client.disconnect().await;
2845 let stats_after = client.get_stats().disconnections;
2846 assert_eq!(
2847 stats_after,
2848 stats_before + 1,
2849 "disconnect() should increment disconnection count"
2850 );
2851 }
2852
2853 #[tokio::test]
2854 async fn test_disconnect_idempotent() {
2855 let client = make_ws_client(make_config());
2856
2857 let r1 = client.disconnect().await;
2859 let r2 = client.disconnect().await;
2860 assert!(r1.is_ok());
2861 assert!(r2.is_ok());
2862 assert!(!client.is_connected());
2863 }
2864
2865 #[tokio::test]
2870 async fn test_reconnect_notify_wakes_waiter() {
2871 let notify = Arc::new(tokio::sync::Notify::new());
2872 let notify_clone = notify.clone();
2873 let woken = Arc::new(AtomicBool::new(false));
2874 let woken_clone = woken.clone();
2875
2876 let handle = tokio::spawn(async move {
2877 notify_clone.notified().await;
2878 woken_clone.store(true, Ordering::Release);
2879 });
2880
2881 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2883 assert!(
2884 !woken.load(Ordering::Acquire),
2885 "should not be woken before notification"
2886 );
2887
2888 notify.notify_one();
2890 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2891 assert!(
2892 woken.load(Ordering::Acquire),
2893 "should be woken after notification"
2894 );
2895
2896 handle.abort();
2897 }
2898
2899 #[tokio::test]
2900 async fn test_schedule_auto_reconnect_reenables_after_explicit_disconnect() {
2901 let client = make_ws_client(make_config());
2902
2903 client
2904 .disconnect()
2905 .await
2906 .expect("explicit disconnect should be idempotent");
2907 assert!(
2908 client.auto_reconnect_suppressed.load(Ordering::Acquire),
2909 "explicit disconnect should suppress stale auto-reconnect cycles"
2910 );
2911
2912 client.schedule_auto_reconnect();
2913
2914 assert!(
2915 !client.auto_reconnect_suppressed.load(Ordering::Acquire),
2916 "scheduling a fresh auto-reconnect should clear explicit disconnect suppression"
2917 );
2918 }
2919
2920 #[tokio::test]
2921 async fn test_schedule_auto_reconnect_reset_backoff_restarts_attempt_sequence() {
2922 let mut config = make_config();
2923 config.connection_timeout = 1;
2924 config.reconnect_config = ReconnectConfig {
2925 enabled: true,
2926 max_attempts: 5,
2927 initial_delay: 30,
2928 max_delay: 30,
2929 backoff_multiplier: 1.0,
2930 };
2931 let client = make_ws_client(config);
2932 let mut rx = client.subscribe_events();
2933
2934 let reconnect_client = client.clone();
2935 let reconnect_task = tokio::spawn(async move {
2936 reconnect_client.run_reconnect_cycle().await;
2937 });
2938
2939 match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2940 Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2941 other => panic!("expected first reconnect attempt, got {other:?}"),
2942 }
2943
2944 match tokio::time::timeout(Duration::from_secs(2), rx.recv()).await {
2945 Ok(Ok(SignalingEvent::Disconnected {
2946 reason: DisconnectReason::ConnectionFailed(_),
2947 })) => {}
2948 other => panic!("expected first reconnect failure, got {other:?}"),
2949 }
2950
2951 client.schedule_auto_reconnect_reset_backoff();
2952
2953 match tokio::time::timeout(Duration::from_secs(2), rx.recv()).await {
2954 Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2955 other => panic!("expected reset reconnect attempt to restart at 1, got {other:?}"),
2956 }
2957
2958 client
2959 .disconnect()
2960 .await
2961 .expect("explicit disconnect should stop reconnect cycle");
2962 tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2963 .await
2964 .expect("reconnect cycle should stop after explicit disconnect")
2965 .expect("reconnect task should not panic");
2966 }
2967
2968 #[tokio::test]
2969 async fn test_explicit_disconnect_suppresses_reconnect_cycle_in_backoff() {
2970 let mut config = make_config();
2971 config.connection_timeout = 1;
2972 config.reconnect_config = ReconnectConfig {
2973 enabled: true,
2974 max_attempts: 5,
2975 initial_delay: 1,
2976 max_delay: 1,
2977 backoff_multiplier: 1.0,
2978 };
2979 let client = make_ws_client(config);
2980 let mut rx = client.subscribe_events();
2981
2982 let reconnect_client = client.clone();
2983 let reconnect_task = tokio::spawn(async move {
2984 reconnect_client.run_reconnect_cycle().await;
2985 });
2986
2987 match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2988 Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2989 other => panic!("expected first reconnect attempt, got {other:?}"),
2990 }
2991
2992 client
2993 .disconnect()
2994 .await
2995 .expect("explicit disconnect should be idempotent");
2996
2997 tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2998 .await
2999 .expect("suppressed reconnect cycle should exit promptly")
3000 .expect("reconnect task should not panic");
3001
3002 while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
3003 {
3004 if let SignalingEvent::ConnectStart { attempt } = event {
3005 panic!("suppressed reconnect cycle sent unexpected attempt {attempt}");
3006 }
3007 }
3008
3009 assert!(
3010 client.auto_reconnect_suppressed.load(Ordering::Acquire),
3011 "explicit disconnect should suppress stale auto-reconnect cycles"
3012 );
3013 }
3014
3015 #[tokio::test]
3016 async fn test_explicit_disconnect_suppresses_in_flight_auto_reconnect_connected_event() {
3017 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
3018 .await
3019 .expect("test listener should bind");
3020 let server_url = format!(
3021 "ws://{}/signaling/ws",
3022 listener
3023 .local_addr()
3024 .expect("test listener should have local addr")
3025 );
3026 let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
3027
3028 let server_task = tokio::spawn(async move {
3029 let (stream, _) = listener
3030 .accept()
3031 .await
3032 .expect("test server should accept tcp connection");
3033 let _ = release_rx.await;
3034 let ws_stream = tokio_tungstenite::accept_async(stream)
3035 .await
3036 .expect("test server should complete websocket handshake");
3037 tokio::time::sleep(Duration::from_millis(100)).await;
3038 drop(ws_stream);
3039 });
3040
3041 let mut config = make_config();
3042 config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
3043 config.connection_timeout = 5;
3044 config.reconnect_config = ReconnectConfig {
3045 enabled: true,
3046 max_attempts: 3,
3047 initial_delay: 1,
3048 max_delay: 1,
3049 backoff_multiplier: 1.0,
3050 };
3051 let client = make_ws_client(config);
3052 let mut rx = client.subscribe_events();
3053
3054 let reconnect_client = client.clone();
3055 let reconnect_task = tokio::spawn(async move {
3056 reconnect_client.run_reconnect_cycle().await;
3057 });
3058
3059 match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
3060 Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
3061 other => panic!("expected first reconnect attempt, got {other:?}"),
3062 }
3063
3064 client
3065 .disconnect()
3066 .await
3067 .expect("explicit disconnect should cancel the in-flight auto-reconnect");
3068 release_tx
3069 .send(())
3070 .expect("test server handshake should still be waiting");
3071
3072 tokio::time::timeout(Duration::from_secs(2), reconnect_task)
3073 .await
3074 .expect("cancelled in-flight reconnect should exit promptly")
3075 .expect("reconnect task should not panic");
3076
3077 while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(150), rx.recv()).await
3078 {
3079 assert!(
3080 !matches!(event, SignalingEvent::Connected),
3081 "cancelled auto-reconnect must not publish Connected"
3082 );
3083 }
3084
3085 assert!(
3086 !client.is_connected(),
3087 "cancelled auto-reconnect must not leave signaling connected"
3088 );
3089
3090 tokio::time::timeout(Duration::from_secs(1), server_task)
3091 .await
3092 .expect("test server task should finish")
3093 .expect("test server task should not panic");
3094 }
3095
3096 #[tokio::test]
3101 async fn test_build_url_without_identity() {
3102 let config = make_config();
3103 let expected_base = config.server_url.to_string();
3104 let client = WebSocketSignalingClient::new(config);
3105
3106 let url = client.build_url_with_identity().await;
3107 assert_eq!(
3108 url.to_string(),
3109 expected_base,
3110 "URL should not contain identity parameters when actor_id is not set"
3111 );
3112 }
3113
3114 #[tokio::test]
3115 async fn test_build_url_with_webrtc_role() {
3116 let mut config = make_config();
3117 config.webrtc_role = Some("answer".to_string());
3118 let client = WebSocketSignalingClient::new(config);
3119
3120 let url = client.build_url_with_identity().await;
3121 assert!(
3122 url.query().unwrap_or("").contains("webrtc_role=answer"),
3123 "URL should contain webrtc_role parameter, actual URL: {}",
3124 url
3125 );
3126 }
3127
3128 #[tokio::test]
3133 async fn test_reset_inbound_channel_creates_fresh_channel() {
3134 let client = WebSocketSignalingClient::new(make_config());
3135
3136 {
3138 let tx = client.inbound_tx.lock().await;
3139 let _ = tx.send(SignalingEnvelope::default());
3140 }
3141
3142 client.reset_inbound_channel().await;
3144
3145 let mut rx = client.inbound_rx.lock().await;
3147 let result = rx.try_recv();
3148 assert!(
3149 result.is_err(),
3150 "old messages should not be visible in the new channel after reset"
3151 );
3152 }
3153
3154 #[tokio::test]
3159 async fn test_envelope_id_monotonically_increasing() {
3160 let client = WebSocketSignalingClient::new(make_config());
3161
3162 let id1 = client.next_envelope_id().await;
3163 let id2 = client.next_envelope_id().await;
3164 let id3 = client.next_envelope_id().await;
3165
3166 assert_eq!(id1, "env-1");
3167 assert_eq!(id2, "env-2");
3168 assert_eq!(id3, "env-3");
3169 }
3170
3171 #[tokio::test]
3176 async fn test_send_envelope_fails_when_not_connected() {
3177 let client = WebSocketSignalingClient::new(make_config());
3178 let envelope = SignalingEnvelope::default();
3179
3180 let result = client.send_envelope(envelope).await;
3181 assert!(
3182 result.is_err(),
3183 "send_envelope should return error when not connected"
3184 );
3185 match result {
3186 Err(NetworkError::ConnectionError(msg)) => {
3187 assert!(
3188 msg.contains("not connected") || msg.contains("Not connected"),
3189 "error message should contain 'not connected', actual: {}",
3190 msg
3191 );
3192 }
3193 other => panic!("expected ConnectionError, got {:?}", other),
3194 }
3195 }
3196
3197 #[tokio::test]
3202 async fn test_fake_client_tracks_connect_calls() {
3203 let client = make_fake_client();
3204 assert_eq!(client.connect_calls.load(UsizeOrdering::SeqCst), 0);
3205
3206 client.connect().await.unwrap();
3207 client.connect().await.unwrap();
3208 client.connect().await.unwrap();
3209
3210 assert_eq!(
3211 client.connect_calls.load(UsizeOrdering::SeqCst),
3212 3,
3213 "FakeSignalingClient should accurately track connect call count"
3214 );
3215 }
3216}