Skip to main content

actr_hyper/wire/webrtc/
signaling.rs

//! Signaling client implementation.
//!
//! Based on the signaling protocol defined in protobuf; messages are built using the `SignalingEnvelope` structure.
4
5#[cfg(feature = "opentelemetry")]
6use super::trace;
7use crate::lifecycle::CredentialState;
8use crate::transport::{NetworkError, NetworkResult};
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::extract_trace_context;
11use actr_protocol::prost::Message as ProstMessage;
12use actr_protocol::{
13    AIdCredential, ActrId, ActrToSignaling, CredentialUpdateRequest, GetSigningKeyRequest,
14    PeerToSignaling, Ping, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
15    RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterRequest,
16    UnregisterResponse, actr_to_signaling, peer_to_signaling, signaling_envelope,
17    signaling_to_actr,
18};
19use async_trait::async_trait;
20use base64::Engine as _;
21use futures_util::{SinkExt, StreamExt};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::{
27    Arc, OnceLock,
28    atomic::{AtomicBool, AtomicU64, Ordering},
29};
30use std::time::Duration;
31use tokio::net::TcpStream;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
34use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async_with_config};
35#[cfg(feature = "opentelemetry")]
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use url::Url;
38
/// WebSocket sink type alias for the split write half of a signaling connection.
///
/// The sink is wrapped in `Arc<tokio::sync::Mutex<Option<…>>>` so it can be
/// shared between tasks and swapped out in place. `WebSocketSignalingClient::new`
/// initialises it to `None`; presumably it is populated once a connection is
/// established (that code is outside this view).
type WsSink = Arc<
    tokio::sync::Mutex<
        Option<
            futures_util::stream::SplitSink<
                WebSocketStream<MaybeTlsStream<TcpStream>>,
                tokio_tungstenite::tungstenite::Message,
            >,
        >,
    >,
>;
50
51// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
52// Constants
53// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
54
/// Default timeout, in seconds, for waiting for a signaling response.
const RESPONSE_TIMEOUT_SECS: u64 = 15;
// WebSocket-level keepalive to detect silent half-open connections.
// PING_INTERVAL_SECS: presumably the interval between keepalive pings, in seconds
// (the ping task is outside this view — TODO confirm).
const PING_INTERVAL_SECS: u64 = 5;
// PONG_TIMEOUT_SECS: time allowed for a Pong, in seconds; also used as the probe
// timeout at the start of a reconnect cycle.
const PONG_TIMEOUT_SECS: u64 = 10;
// Presumably bounds a single outbound send, in seconds — TODO confirm at the use site.
const SIGNALING_SEND_TIMEOUT_SECS: u64 = 5;
// Presumably how long a caller waits for a concurrent connect attempt to finish,
// in seconds — TODO confirm at the use site.
const CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS: u64 = 5;
// Presumably bounds lock acquisition during disconnect, in seconds — TODO confirm.
const DISCONNECT_LOCK_TIMEOUT_SECS: u64 = 5;
// Presumably bounds the WebSocket close during disconnect, in seconds — TODO confirm.
const DISCONNECT_CLOSE_TIMEOUT_SECS: u64 = 1;
64
65// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Configuration types
67// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
68
/// Signaling client configuration.
#[derive(Debug, Clone)]
pub struct SignalingConfig {
    /// Signaling server URL (used as the base when the connect URL is built).
    pub server_url: Url,

    /// Connection timeout (seconds).
    pub connection_timeout: u64,

    /// Heartbeat interval (seconds).
    pub heartbeat_interval: u64,

    /// Reconnection configuration.
    pub reconnect_config: ReconnectConfig,

    /// Authentication configuration (optional).
    pub auth_config: Option<AuthConfig>,

    /// WebRTC role preference: "answer" if this node has advanced config.
    ///
    /// When set, the value is appended to the signaling URL as the
    /// `webrtc_role` query parameter.
    pub webrtc_role: Option<String>,
}
90
/// Settings that control automatic reconnection to the signaling server.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether automatic reconnection is enabled.
    pub enabled: bool,

    /// Maximum number of reconnection attempts per reconnect cycle.
    pub max_attempts: u32,

    /// Initial reconnection delay (seconds).
    pub initial_delay: u64,

    /// Maximum reconnection delay (seconds).
    pub max_delay: u64,

    /// Backoff multiplier factor.
    ///
    /// NOTE(review): the reconnect cycle visible in this file does not read
    /// this field — confirm whether it is consumed elsewhere.
    pub backoff_multiplier: f64,
}

impl Default for ReconnectConfig {
    /// Defaults: reconnection enabled, 10 attempts, delays from 1 s up to 60 s,
    /// multiplier 2.0.
    fn default() -> Self {
        let enabled = true;
        let max_attempts = 10;
        let initial_delay = 1;
        let max_delay = 60;
        let backoff_multiplier = 2.0;

        ReconnectConfig {
            enabled,
            max_attempts,
            initial_delay,
            max_delay,
            backoff_multiplier,
        }
    }
}
121
/// Authentication configuration.
#[derive(Debug, Clone)]
pub struct AuthConfig {
    /// Authentication type.
    pub auth_type: AuthType,

    /// Authentication credential data, as string key/value pairs.
    /// NOTE(review): the expected keys per `AuthType` are not visible here — confirm with the consumer.
    pub credentials: HashMap<String, String>,
}
131
/// Authentication type.
#[derive(Debug, Clone)]
pub enum AuthType {
    /// No authentication.
    None,
    /// Bearer token.
    BearerToken,
    /// API key.
    ApiKey,
    /// JWT.
    Jwt,
}
144
145// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
146// Client interface and implementation
147// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
148
/// Signaling client interface.
///
/// # Interior mutability
/// All methods take `&self` rather than `&mut self` so that a client can be
/// shared conveniently through an `Arc`. Implementations need interior
/// mutability (e.g. a `Mutex`) to manage the WebSocket connection state.
#[async_trait]
pub trait SignalingClient: Send + Sync {
    /// Connect to the signaling server.
    async fn connect(&self) -> NetworkResult<()>;

    /// Perform a single explicit connection attempt.
    ///
    /// Network recovery events use this path so a failed restore attempt can
    /// return quickly instead of sleeping inside the normal reconnect backoff.
    ///
    /// The default implementation simply delegates to [`Self::connect`].
    async fn connect_once(&self) -> NetworkResult<()> {
        self.connect().await
    }

    /// Re-enable and wake the automatic reconnect manager after an explicit
    /// lifecycle recovery attempt failed.
    ///
    /// The default implementation is a no-op.
    fn schedule_auto_reconnect(&self) {}

    /// Disconnect from the signaling server.
    async fn disconnect(&self) -> NetworkResult<()>;

    /// Probe whether the existing signaling WebSocket is truly alive.
    ///
    /// The default implementation only checks local state. WebSocket-backed
    /// clients override this with an active Ping/Pong probe to catch half-open
    /// sockets before network recovery decides whether to reconnect.
    ///
    /// # Errors
    /// The default implementation returns `NetworkError::ConnectionError` when
    /// `is_connected()` is `false`; the `_timeout` argument is ignored by it.
    async fn probe_alive(&self, _timeout: Duration) -> NetworkResult<()> {
        if self.is_connected() {
            Ok(())
        } else {
            Err(NetworkError::ConnectionError(
                "Signaling client is not connected".to_string(),
            ))
        }
    }

    /// Deprecated: Registration now happens via AIS HTTP; this WS path is no longer used.
    /// Kept for backward compatibility; will be removed in a future release.
    async fn send_register_request(
        &self,
        request: RegisterRequest,
    ) -> NetworkResult<RegisterResponse>;

    /// Send UnregisterRequest to signaling server (Actr → Signaling flow)
    ///
    /// This is used when an Actor is shutting down gracefully and wants to
    /// proactively notify the signaling server that it is no longer available.
    async fn send_unregister_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        reason: Option<String>,
    ) -> NetworkResult<UnregisterResponse>;

    /// Send a heartbeat (post-registration flow, using `ActrToSignaling`).
    /// Returns Pong response if received, error if timeout or no response
    async fn send_heartbeat(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        availability: ServiceAvailabilityState,
        power_reserve: f32,
        mailbox_backlog: f32,
    ) -> NetworkResult<Pong>;

    /// Send RouteCandidatesRequest (requires authenticated Actor session)
    async fn send_route_candidates_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        request: RouteCandidatesRequest,
    ) -> NetworkResult<RouteCandidatesResponse>;

    /// Query AIS Ed25519 signing public key via signaling
    ///
    /// Returns `(key_id, pubkey_bytes)` where pubkey_bytes is the 32-byte raw public key.
    /// Typically called by AisKeyCache on cache miss; should not be used directly in hot paths.
    async fn get_signing_key(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
        key_id: u32,
    ) -> NetworkResult<(u32, Vec<u8>)>;

    /// Send CredentialUpdateRequest to refresh the Actor's credential
    ///
    /// This is used to refresh the credential before it expires. The server responds
    /// with a RegisterResponse containing the new credential and expiration time.
    async fn send_credential_update_request(
        &self,
        actor_id: ActrId,
        credential: AIdCredential,
    ) -> NetworkResult<RegisterResponse>;

    /// Send a signaling envelope (general-purpose method).
    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;

    /// Receive a signaling envelope.
    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;

    /// Check connection status
    fn is_connected(&self) -> bool;

    /// Get connection statistics.
    fn get_stats(&self) -> SignalingStats;
    /// Subscribe to signaling events (state transitions).
    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent>;

    /// Set actor ID and credential state for reconnect URL parameters.
    async fn set_actor_id(&self, actor_id: ActrId);
    /// Set the credential state used for reconnect URL parameters.
    async fn set_credential_state(&self, credential_state: CredentialState);

    /// Clear stored actor ID and credential state.
    ///
    /// After calling this, `connect()` will produce a clean WebSocket URL
    /// without identity query parameters, so the signaling server treats
    /// the connection as brand-new rather than a reconnect of the old actor.
    /// This is required before re-registration when the credential has expired.
    async fn clear_identity(&self);

    /// Set a lifecycle hook callback that will be invoked (and awaited)
    /// whenever signaling state changes (connect/disconnect).
    /// Default implementation is a no-op for clients that don't support hooks.
    fn set_hook_callback(&self, _cb: HookCallback) {}
}
278
/// High-level signaling connection state (kept for quick boolean checks).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No signaling connection is currently established.
    Disconnected,
    /// A signaling connection is currently established.
    Connected,
}
285
/// Signaling state transition events.
///
/// Unlike `ConnectionState` (which is a snapshot), these represent discrete
/// transitions and are delivered via `broadcast` so every subscriber sees
/// every event, even if the same state occurs twice in a row.
#[derive(Debug, Clone)]
pub enum SignalingEvent {
    /// About to start a connection attempt (includes retry count).
    ///
    /// In the auto-reconnect cycle `attempt` starts at 1 for the first try.
    ConnectStart { attempt: u32 },
    /// Connection successfully established.
    Connected,
    /// Connection lost; `reason` says why.
    Disconnected { reason: DisconnectReason },
}
300
/// Reason why the signaling connection was lost.
///
/// Carried by [`SignalingEvent::Disconnected`].
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// WebSocket stream ended (receiver task exited normally).
    StreamEnded,
    /// No Pong received within the timeout window.
    PongTimeout,
    /// Failed to send a WebSocket Ping frame.
    PingSendFailed,
    /// Credential expired (heartbeat 401).
    CredentialExpired,
    /// Explicit disconnect() call or external trigger.
    Manual,
    /// Connection attempt failed with an error.
    /// The payload is presumably the error's textual description — confirm at the construction site.
    ConnectionFailed(String),
}
317
318// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
319// Hook callback for synchronous lifecycle notification
320// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
321
/// Events that trigger workload lifecycle hooks.
///
/// Used by `HookCallback` to invoke workload hooks synchronously (awaited)
/// at the point where the state change occurs.
#[derive(Clone, Debug)]
pub enum HookEvent {
    // ── Signaling ──
    /// A signaling connection attempt is starting; `attempt` is the attempt number.
    SignalingConnectStart {
        attempt: u32,
    },
    /// The signaling connection was established.
    SignalingConnected,
    /// The signaling connection was lost (raised when a connected → disconnected
    /// transition is published).
    SignalingDisconnected,
    // ── WebRTC ──
    /// A WebRTC connection attempt to `peer_id` is starting.
    WebRtcConnectStart {
        peer_id: ActrId,
    },
    /// A WebRTC connection to `peer_id` was established.
    /// `relayed` presumably indicates the path goes through a relay — TODO confirm.
    WebRtcConnected {
        peer_id: ActrId,
        relayed: bool,
    },
    /// The WebRTC connection to `peer_id` was lost.
    WebRtcDisconnected {
        peer_id: ActrId,
    },
    /// Delivery of a data stream could not be confirmed.
    /// NOTE(review): exact semantics of `session_id` / `reason` are defined by the emitter, which is outside this view.
    DataStreamDeliveryUncertain {
        stream_id: String,
        session_id: u64,
        reason: String,
    },
    // ── WebSocket ──
    /// A WebSocket connection attempt to `peer_id` is starting.
    WebSocketConnectStart {
        peer_id: ActrId,
    },
    /// A WebSocket connection to `peer_id` was established.
    WebSocketConnected {
        peer_id: ActrId,
    },
    /// The WebSocket connection to `peer_id` was lost.
    WebSocketDisconnected {
        peer_id: ActrId,
    },
    // ── Credential ──
    /// The credential was renewed; `new_expiry` is the new expiration time.
    CredentialRenewed {
        new_expiry: std::time::SystemTime,
    },
    /// The credential is about to expire.
    /// NOTE(review): the field is named `new_expiry` here as well — confirm whether
    /// it holds the current (approaching) expiry rather than a new one.
    CredentialExpiring {
        new_expiry: std::time::SystemTime,
    },
    // ── Mailbox ──
    /// The mailbox queue is under backpressure; presumably raised when
    /// `queue_len` reaches `threshold` — TODO confirm the exact comparison.
    MailboxBackpressure {
        queue_len: usize,
        threshold: usize,
    },
}
373
/// Callback closure that is awaited when a hook event occurs.
///
/// Set once via `set_hook_callback()`. All state-change paths invoke this
/// closure and `.await` its result before proceeding.
///
/// The closure receives the [`HookEvent`] by value and returns a boxed,
/// pinned, `Send` future; the closure itself must be `Send + Sync` because it
/// is shared behind an `Arc`.
pub type HookCallback =
    Arc<dyn Fn(HookEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
380
/// Why a connection attempt is being made.
#[derive(Debug, Clone, Copy)]
enum ConnectIntent {
    /// Attempt requested explicitly (used by `establish_connection_once`).
    Explicit,
    /// Attempt made by the auto-reconnect path; `generation` is the reconnect
    /// generation that was passed in by the reconnect cycle.
    AutoReconnect { generation: u64 },
}
386
/// WebSocket signaling client implementation.
pub struct WebSocketSignalingClient {
    /// Static client configuration.
    config: SignalingConfig,
    /// Actor identity; when set it is appended to the signaling URL as `actor_id`.
    actor_id: tokio::sync::Mutex<Option<ActrId>>,
    /// Credential state; when set its credential is appended to the signaling URL.
    credential_state: tokio::sync::Mutex<Option<CredentialState>>,
    /// WebSocket write half (the Mutex provides interior mutability).
    ws_sink: WsSink,
    /// WebSocket read half (the Mutex provides interior mutability).
    ws_stream: tokio::sync::Mutex<
        Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
    >,
    /// Connection status flag.
    connected: Arc<AtomicBool>,
    /// Connection in progress flag (prevents concurrent connect attempts)
    connecting: Arc<AtomicBool>,
    /// Connection statistics.
    stats: Arc<AtomicSignalingStats>,
    /// Envelope counter used to generate envelope IDs.
    envelope_counter: tokio::sync::Mutex<u64>,
    /// Pending reply waiters (reply_for -> oneshot)
    pending_replies: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<SignalingEnvelope>>>>,
    /// Pending WebSocket Pong waiters (ping payload -> oneshot)
    pending_pongs: Arc<tokio::sync::Mutex<HashMap<Vec<u8>, oneshot::Sender<()>>>>,
    /// Monotonic probe payload counter.
    probe_counter: AtomicU64,
    /// Inbound envelope channel for unmatched messages (ActrRelay / push)
    inbound_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<SignalingEnvelope>>>,
    /// Sender side of the inbound envelope channel; replaced together with
    /// `inbound_rx` when the inbound channel is reset.
    inbound_tx: tokio::sync::Mutex<mpsc::UnboundedSender<SignalingEnvelope>>,
    /// Background receive task handle to allow graceful shutdown
    receiver_task: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
    /// Background ping task to detect half-open connections
    ping_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Connection state broadcast channel (event-driven)
    event_tx: broadcast::Sender<SignalingEvent>,
    /// Last time we saw inbound traffic (pong/any message), unix epoch seconds
    last_pong: Arc<AtomicU64>,
    /// Flag to track if reconnect manager has been started
    reconnector_started: Arc<AtomicBool>,
    /// Notify channel to wake up the reconnect manager
    reconnect_notify: Arc<tokio::sync::Notify>,
    /// Explicit disconnects from lifecycle/cleanup suppress stale auto-reconnect cycles.
    auto_reconnect_suppressed: AtomicBool,
    /// Incremented by explicit disconnects to invalidate in-flight auto-reconnect attempts.
    reconnect_generation: AtomicU64,
    /// Hook callback for synchronous lifecycle notification (set once, lock-free read)
    hook_callback: OnceLock<HookCallback>,
}
434
435impl WebSocketSignalingClient {
436    /// Create new WebSocket signaling client
437    pub fn new(config: SignalingConfig) -> Self {
438        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
439        let (event_tx, _event_rx) = broadcast::channel(64);
440        Self {
441            config,
442            actor_id: tokio::sync::Mutex::new(None),
443            credential_state: tokio::sync::Mutex::new(None),
444            ws_sink: Arc::new(tokio::sync::Mutex::new(None)),
445            ws_stream: tokio::sync::Mutex::new(None),
446            connected: Arc::new(AtomicBool::new(false)),
447            connecting: Arc::new(AtomicBool::new(false)),
448            stats: Arc::new(AtomicSignalingStats::default()),
449            envelope_counter: tokio::sync::Mutex::new(0),
450            pending_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
451            pending_pongs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
452            probe_counter: AtomicU64::new(0),
453            inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
454            inbound_tx: tokio::sync::Mutex::new(inbound_tx),
455            receiver_task: Arc::new(tokio::sync::Mutex::new(None)),
456            ping_task: tokio::sync::Mutex::new(None),
457            event_tx,
458            last_pong: Arc::new(AtomicU64::new(0)),
459            reconnector_started: Arc::new(AtomicBool::new(false)),
460            reconnect_notify: Arc::new(tokio::sync::Notify::new()),
461            auto_reconnect_suppressed: AtomicBool::new(false),
462            reconnect_generation: AtomicU64::new(0),
463            hook_callback: OnceLock::new(),
464        }
465    }
466
467    /// Start the reconnect manager if enabled in config and not already started.
468    ///
469    /// The manager waits on a `Notify` and runs an exponential-backoff retry loop
470    /// each time it is woken up.
471    /// Invoke the hook callback and await its completion.
472    /// No-op if no callback has been set yet.
473    async fn invoke_hook(&self, event: HookEvent) {
474        if let Some(cb) = self.hook_callback.get() {
475            cb(event).await;
476        }
477    }
478
479    async fn publish_disconnected_transition(
480        was_connected: bool,
481        stats: &Arc<AtomicSignalingStats>,
482        event_tx: &broadcast::Sender<SignalingEvent>,
483        hook_callback: Option<HookCallback>,
484        reason: DisconnectReason,
485        reconnect_notify: Option<&Arc<tokio::sync::Notify>>,
486    ) -> bool {
487        if !was_connected {
488            return false;
489        }
490
491        stats.disconnections.fetch_add(1, Ordering::Relaxed);
492
493        if let Some(cb) = hook_callback {
494            cb(HookEvent::SignalingDisconnected).await;
495        }
496
497        let _ = event_tx.send(SignalingEvent::Disconnected { reason });
498
499        if let Some(notify) = reconnect_notify {
500            notify.notify_one();
501        }
502
503        true
504    }
505
506    pub fn start_reconnect_manager(self: &Arc<Self>) {
507        if !self.config.reconnect_config.enabled {
508            return;
509        }
510        if self
511            .reconnector_started
512            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
513            .is_err()
514        {
515            return; // already started
516        }
517
518        tracing::info!("🔄 Starting reconnect manager for signaling client");
519
520        let client = Arc::downgrade(self);
521        let notify = self.reconnect_notify.clone();
522
523        tokio::spawn(async move {
524            loop {
525                let reconnect_requested = tokio::select! {
526                    _ = notify.notified() => true,
527                    _ = tokio::time::sleep(Duration::from_secs(30)) => false,
528                };
529
530                if !reconnect_requested && client.upgrade().is_none() {
531                    break;
532                }
533                if !reconnect_requested {
534                    continue;
535                }
536
537                let Some(client) = client.upgrade() else {
538                    break;
539                };
540
541                if !client.config.reconnect_config.enabled {
542                    break;
543                }
544
545                if Arc::strong_count(&client) <= 1 {
546                    break;
547                }
548
549                // Run reconnect cycle with exponential backoff
550                client.run_reconnect_cycle().await;
551            }
552        });
553    }
554
555    /// Execute one full reconnect cycle with exponential backoff + jitter.
556    async fn run_reconnect_cycle(self: &Arc<Self>) {
557        use actr_framework::ExponentialBackoff;
558
559        let cfg = &self.config.reconnect_config;
560        let generation = self.reconnect_generation.load(Ordering::Acquire);
561
562        if Arc::strong_count(self) <= 1 {
563            tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
564            return;
565        }
566
567        if self.auto_reconnect_cancelled(generation) {
568            tracing::debug!("Skipping signaling auto-reconnect cycle after explicit disconnect");
569            return;
570        }
571
572        if self.connected.load(Ordering::Acquire) {
573            tracing::debug!("🔎 Probing connected signaling before reconnect cycle");
574            match self
575                .probe_alive(Duration::from_secs(PONG_TIMEOUT_SECS))
576                .await
577            {
578                Ok(()) => {
579                    tracing::debug!("Signaling probe succeeded, skipping reconnect cycle");
580                    return;
581                }
582                Err(e) => {
583                    tracing::warn!("Signaling probe failed before reconnect: {e}");
584                    if let Err(disconnect_err) = self.disconnect_internal(false).await {
585                        tracing::warn!(
586                            "⚠️ Disconnect cleanup failed after failed probe (non-fatal): {disconnect_err}"
587                        );
588                    }
589                }
590            }
591        }
592
593        let backoff = ExponentialBackoff::builder()
594            .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
595            .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
596            .max_retries(cfg.max_attempts)
597            .with_jitter()
598            .build();
599
600        let mut attempt: u32 = 0;
601
602        for delay in backoff {
603            if Arc::strong_count(self) <= 1 {
604                tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
605                return;
606            }
607
608            if self.auto_reconnect_cancelled(generation) {
609                tracing::debug!(
610                    "Stopping signaling auto-reconnect cycle after explicit disconnect"
611                );
612                return;
613            }
614
615            if self.connected.load(Ordering::Acquire) {
616                tracing::debug!("Already connected, aborting reconnect cycle");
617                return;
618            }
619
620            attempt += 1;
621            let _ = self.event_tx.send(SignalingEvent::ConnectStart { attempt });
622
623            match self.connect_once_for_auto_reconnect(generation).await {
624                Ok(()) => {
625                    tracing::info!("✅ Signaling reconnect succeeded on attempt {attempt}");
626                    return;
627                }
628                Err(e) => {
629                    if self.auto_reconnect_cancelled(generation) {
630                        tracing::debug!(
631                            "Stopping signaling auto-reconnect cycle after explicit disconnect"
632                        );
633                        return;
634                    }
635
636                    tracing::warn!(
637                        "❌ Reconnect attempt {attempt} failed: {e}, retrying in {delay:?}"
638                    );
639                    tokio::select! {
640                        _ = tokio::time::sleep(delay) => {}
641                        _ = self.reconnect_notify.notified() => {
642                            tracing::debug!("Explicit reconnect request interrupted reconnect backoff");
643                        }
644                    }
645                    if Arc::strong_count(self) <= 1 {
646                        tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
647                        return;
648                    }
649                    if self.auto_reconnect_cancelled(generation) {
650                        tracing::debug!(
651                            "Stopping signaling auto-reconnect cycle after explicit disconnect"
652                        );
653                        return;
654                    }
655                }
656            }
657        }
658
659        // All retries exhausted — enter cooldown, then allow future wakeups
660        tracing::error!("Reconnect failed after {attempt} attempts, entering cooldown");
661        let cooldown = std::time::Duration::from_secs(cfg.max_delay.max(1) * 2);
662        tokio::select! {
663            _ = tokio::time::sleep(cooldown) => {}
664            _ = self.reconnect_notify.notified() => {
665                tracing::debug!("Explicit reconnect request interrupted reconnect cooldown");
666            }
667        }
668        if self.auto_reconnect_cancelled(generation) {
669            tracing::debug!(
670                "Signaling auto-reconnect cooldown ended after explicit disconnect suppression"
671            );
672        }
673        // After cooldown, the loop returns to notify.notified() and can be woken again
674    }
675
676    /// Test-only convenience constructor: create, connect, and return a client.
677    ///
678    /// The returned client has no `actor_id` / `credential_state` bound, so the
679    /// signaling URL carries no identity query parameters — mock-actrix will
680    /// not bind the WebSocket to any registry entry. Use this only for tests
681    /// that explicitly exercise the unbound path; integration tests that need
682    /// peer-to-peer relay should use [`Self::connect_to_with_identity`].
683    #[cfg(feature = "test-utils")]
684    pub async fn connect_to(url: &str) -> NetworkResult<Arc<Self>> {
685        let config = SignalingConfig {
686            server_url: url.parse()?,
687            connection_timeout: 5,
688            heartbeat_interval: 30,
689            reconnect_config: ReconnectConfig::default(),
690            auth_config: None,
691            webrtc_role: None,
692        };
693
694        let client = Arc::new(Self::new(config));
695        client.start_reconnect_manager();
696        client.connect().await?;
697        Ok(client)
698    }
699
700    /// Test-only constructor that pins identity *before* the WebSocket
701    /// handshake so mock-actrix can bind the connection to the actor on
702    /// register (`?actor_id=…` query parameter). Required by integration
703    /// tests that rely on peer-to-peer signaling relay — without this binding
704    /// mock-actrix drops outbound relays for "unbound target".
705    #[cfg(feature = "test-utils")]
706    pub async fn connect_to_with_identity(
707        url: &str,
708        actor_id: ActrId,
709        credential_state: CredentialState,
710    ) -> NetworkResult<Arc<Self>> {
711        let config = SignalingConfig {
712            server_url: url.parse()?,
713            connection_timeout: 5,
714            heartbeat_interval: 30,
715            reconnect_config: ReconnectConfig::default(),
716            auth_config: None,
717            webrtc_role: None,
718        };
719
720        let client = Arc::new(Self::new(config));
721        client.set_actor_id(actor_id).await;
722        client.set_credential_state(credential_state).await;
723        client.start_reconnect_manager();
724        client.connect().await?;
725        Ok(client)
726    }
727
728    /// alive integrate down a envelope ID
729    async fn next_envelope_id(&self) -> String {
730        let mut counter = self.envelope_counter.lock().await;
731        *counter += 1;
732        format!("env-{}", *counter)
733    }
734
735    /// Create SignalingEnvelope
736    async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
737        SignalingEnvelope {
738            envelope_version: 1,
739            envelope_id: self.next_envelope_id().await,
740            reply_for: None,
741            timestamp: prost_types::Timestamp {
742                seconds: chrono::Utc::now().timestamp(),
743                nanos: 0,
744            },
745            traceparent: None,
746            tracestate: None,
747            flow: Some(flow),
748        }
749    }
750
751    /// Reset inbound channel for a fresh session (useful after disconnects).
752    async fn reset_inbound_channel(&self) {
753        self.drop_pending_replies("inbound channel reset").await;
754        self.drop_pending_pongs("inbound channel reset").await;
755
756        let (tx, rx) = mpsc::unbounded_channel();
757        *self.inbound_tx.lock().await = tx;
758        *self.inbound_rx.lock().await = rx;
759    }
760
761    async fn drop_pending_replies(&self, reason: &'static str) {
762        let dropped = {
763            let mut pending = self.pending_replies.lock().await;
764            let dropped = pending.len();
765            pending.clear();
766            dropped
767        };
768
769        if dropped > 0 {
770            tracing::debug!(reason, dropped, "Dropping pending signaling reply waiters");
771        }
772    }
773
774    async fn drop_pending_pongs(&self, reason: &'static str) {
775        let dropped = {
776            let mut pending = self.pending_pongs.lock().await;
777            let dropped = pending.len();
778            pending.clear();
779            dropped
780        };
781
782        if dropped > 0 {
783            tracing::debug!(reason, dropped, "Dropping pending signaling pong waiters");
784        }
785    }
786
787    /// Build signaling URL with actor identity and Ed25519 credential params for authentication.
788    ///
789    /// Passes `actor_id`, `key_id`, `claims` (base64), `signature` (base64) as URL query params
790    /// so the signaling server can validate the credential before upgrading the WebSocket.
791    async fn build_url_with_identity(&self) -> Url {
792        let mut url = self.config.server_url.clone();
793        let actor_id_opt = self.actor_id.lock().await.clone();
794        if let Some(actor_id) = actor_id_opt {
795            let actor_str = actr_protocol::ActrId::to_string_repr(&actor_id);
796            url.query_pairs_mut().append_pair("actor_id", &actor_str);
797        }
798
799        // Pass Ed25519 credential in URL for initial WS auth
800        let cred_state_opt = self.credential_state.lock().await.clone();
801        if let Some(cred_state) = cred_state_opt {
802            let cred = cred_state.credential().await;
803            let claims_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.claims);
804            let sig_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.signature);
805            url.query_pairs_mut()
806                .append_pair("key_id", &cred.key_id.to_string())
807                .append_pair("claims", &claims_b64)
808                .append_pair("signature", &sig_b64);
809        }
810
811        // Add WebRTC role preference if configured
812        if let Some(role) = &self.config.webrtc_role {
813            url.query_pairs_mut().append_pair("webrtc_role", role);
814        }
815
816        url
817    }
818
819    fn redact_signaling_url_for_log(url: &Url) -> String {
820        let mut redacted = url.clone();
821        let pairs: Vec<(String, String)> = redacted
822            .query_pairs()
823            .map(|(key, value)| {
824                let redacted_value = match key.to_ascii_lowercase().as_str() {
825                    "claims" | "signature" | "token" | "authorization" | "bearer"
826                    | "access_token" | "api_key" => "REDACTED".to_string(),
827                    _ => value.into_owned(),
828                };
829                (key.into_owned(), redacted_value)
830            })
831            .collect();
832
833        redacted.set_query(None);
834        if !pairs.is_empty() {
835            let mut query = redacted.query_pairs_mut();
836            for (key, value) in pairs {
837                query.append_pair(&key, &value);
838            }
839        }
840
841        redacted.to_string()
842    }
843
844    fn auto_reconnect_cancelled(&self, generation: u64) -> bool {
845        self.auto_reconnect_suppressed.load(Ordering::Acquire)
846            || self.reconnect_generation.load(Ordering::Acquire) != generation
847    }
848
849    /// Establish a single signaling WebSocket connection attempt, honoring connection_timeout.
850    ///
851    /// This does not perform any retry logic; callers that want retries should wrap this.
852    async fn establish_connection_once(&self) -> NetworkResult<()> {
853        self.establish_connection_once_with_intent(ConnectIntent::Explicit)
854            .await
855    }
856
857    async fn establish_connection_once_for_auto_reconnect(
858        &self,
859        generation: u64,
860    ) -> NetworkResult<()> {
861        self.establish_connection_once_with_intent(ConnectIntent::AutoReconnect { generation })
862            .await
863    }
864
865    async fn establish_connection_once_with_intent(
866        &self,
867        intent: ConnectIntent,
868    ) -> NetworkResult<()> {
869        // Guard: Check if already connected (handles rare TOCTOU scenarios)
870        if self.connected.load(Ordering::Acquire) {
871            tracing::debug!("Connection already established, skipping establish_connection_once()");
872            return Ok(());
873        }
874
875        let url = self.build_url_with_identity().await;
876        let timeout_secs = self.config.connection_timeout;
877        tracing::debug!(
878            "Establishing connection to URL: {}",
879            Self::redact_signaling_url_for_log(&url)
880        );
881        // After disconnection, data written to the buffer will continue to be sent once the network recovers
882        let config = WebSocketConfig::default().write_buffer_size(0);
883        // Connect with an optional timeout. A timeout of 0 means "no timeout".
884        let connect_result = if timeout_secs == 0 {
885            connect_async_with_config(url.as_str(), Some(config), false).await
886        } else {
887            let timeout_duration = std::time::Duration::from_secs(timeout_secs);
888            tokio::time::timeout(
889                timeout_duration,
890                connect_async_with_config(url.as_str(), Some(config), false),
891            )
892            .await
893            .map_err(|_| {
894                NetworkError::ConnectionError(format!(
895                    "Signaling connect timeout after {}s",
896                    timeout_secs
897                ))
898            })?
899        }?;
900
901        let (ws_stream, _) = connect_result;
902
903        // Split read/write halves and initialize client state
904        let (sink, stream) = ws_stream.split();
905
906        if let ConnectIntent::AutoReconnect { generation } = intent
907            && self.auto_reconnect_cancelled(generation)
908        {
909            tracing::debug!(
910                generation,
911                "Discarding completed signaling auto-reconnect after explicit disconnect"
912            );
913            let mut sink = sink;
914            if let Err(e) = sink.close().await {
915                tracing::warn!(
916                    "Signaling auto-reconnect socket close failed after cancellation: {}",
917                    e
918                );
919            }
920            return Err(NetworkError::ConnectionError(
921                "Signaling auto-reconnect was cancelled by explicit disconnect".to_string(),
922            ));
923        }
924
925        *self.ws_sink.lock().await = Some(sink);
926        *self.ws_stream.lock().await = Some(stream);
927        self.connected.store(true, Ordering::Release);
928        self.auto_reconnect_suppressed
929            .store(false, Ordering::Release);
930        self.last_pong.store(current_unix_secs(), Ordering::Release);
931        // Invoke hook synchronously, then broadcast for other subscribers
932        self.invoke_hook(HookEvent::SignalingConnected).await;
933        let _ = self.event_tx.send(SignalingEvent::Connected);
934
935        self.stats.connections.fetch_add(1, Ordering::Relaxed);
936
937        Ok(())
938    }
939
940    /// Connect to signaling server with retry and exponential backoff based on reconnect_config.
941    async fn connect_with_retries(&self) -> NetworkResult<()> {
942        use actr_framework::ExponentialBackoff;
943
944        let cfg = &self.config.reconnect_config;
945
946        // If reconnect is disabled, just attempt once.
947        if !cfg.enabled {
948            return self.establish_connection_once().await;
949        }
950
951        let backoff = ExponentialBackoff::builder()
952            .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
953            .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
954            .max_retries(cfg.max_attempts)
955            .with_jitter()
956            .build();
957
958        let mut last_err = None;
959
960        // First attempt immediately (delay = 0), subsequent delays from backoff
961        for (attempt, delay) in std::iter::once(std::time::Duration::ZERO)
962            .chain(backoff)
963            .enumerate()
964        {
965            let attempt = attempt as u32 + 1;
966            self.invoke_hook(HookEvent::SignalingConnectStart { attempt })
967                .await;
968            if delay > std::time::Duration::ZERO {
969                tracing::info!("Retry signaling connect after {delay:?} (attempt {attempt})");
970                tokio::select! {
971                    _ = tokio::time::sleep(delay) => {}
972                    _ = self.reconnect_notify.notified() => {
973                        tracing::debug!("Explicit reconnect request interrupted signaling connect backoff");
974                    }
975                }
976            }
977
978            match self.establish_connection_once().await {
979                Ok(()) => return Ok(()),
980                Err(e) => {
981                    tracing::warn!("Signaling connect attempt {attempt} failed: {e:?}");
982                    last_err = Some(e);
983                }
984            }
985        }
986
987        let total = cfg.max_attempts + 1; // backoff max_retries + first attempt
988        tracing::error!("Signaling connect failed after {total} attempts, giving up");
989        Err(last_err.unwrap_or_else(|| {
990            NetworkError::ConnectionError("All connection attempts failed".to_string())
991        }))
992    }
993
994    /// Send envelope and wait for response with timeout and error handling.
995    #[cfg_attr(
996        feature = "opentelemetry",
997        tracing::instrument(skip_all, fields(envelope_id = %envelope.envelope_id))
998    )]
999    async fn send_envelope_and_wait_response(
1000        &self,
1001        envelope: SignalingEnvelope,
1002    ) -> NetworkResult<SignalingEnvelope> {
1003        let reply_for = envelope.envelope_id.clone();
1004
1005        // Register waiter before sending
1006        let (tx, rx) = oneshot::channel();
1007        self.pending_replies
1008            .lock()
1009            .await
1010            .insert(reply_for.clone(), tx);
1011
1012        if let Err(e) = self.send_envelope(envelope).await {
1013            // Cleanup waiter on immediate send failure to avoid leaks.
1014            self.pending_replies.lock().await.remove(&reply_for);
1015            return Err(e);
1016        }
1017
1018        let result =
1019            tokio::time::timeout(std::time::Duration::from_secs(RESPONSE_TIMEOUT_SECS), rx).await;
1020        // Clean up waiter on timeout
1021        if result.is_err() {
1022            self.pending_replies.lock().await.remove(&reply_for);
1023        }
1024
1025        let response_envelope = result
1026            .map_err(|_| {
1027                NetworkError::ConnectionError(
1028                    "Timed out waiting for signaling response".to_string(),
1029                )
1030            })?
1031            .map_err(|_| {
1032                NetworkError::ConnectionError(
1033                    "Receiver dropped while waiting for signaling response".to_string(),
1034                )
1035            })?;
1036
1037        Ok(response_envelope)
1038    }
1039
1040    /// Spawn background receiver to demux envelopes by reply_for.
1041    async fn start_receiver(&self) {
1042        let mut stream_guard = self.ws_stream.lock().await;
1043        if stream_guard.is_none() {
1044            return;
1045        }
1046
1047        let mut stream = stream_guard.take().expect("stream exists");
1048        let pending = self.pending_replies.clone();
1049        let inbound_tx = { self.inbound_tx.lock().await.clone() };
1050        let stats = self.stats.clone();
1051        let connected = self.connected.clone();
1052        let event_tx = self.event_tx.clone();
1053        let last_pong = self.last_pong.clone();
1054        let pending_pongs = self.pending_pongs.clone();
1055        let reconnect_notify = self.reconnect_notify.clone();
1056        let reconnect_enabled = self.config.reconnect_config.enabled;
1057        let hook_callback = self.hook_callback.get().cloned();
1058        let handle = tokio::spawn(async move {
1059            while let Some(msg) = stream.next().await {
1060                match msg {
1061                    Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
1062                        // Any inbound traffic counts as liveness
1063                        last_pong.store(current_unix_secs(), Ordering::Release);
1064                        match SignalingEnvelope::decode(&data[..]) {
1065                            Ok(envelope) => {
1066                                #[cfg(feature = "opentelemetry")]
1067                                let span = {
1068                                    let span = tracing::info_span!("signaling.receive_envelope", envelope_id = %envelope.envelope_id);
1069                                    span.set_parent(extract_trace_context(&envelope));
1070                                    span
1071                                };
1072
1073                                stats.messages_received.fetch_add(1, Ordering::Relaxed);
1074                                tracing::debug!("Received message: {:?}", envelope);
1075                                if let Some(reply_for) = envelope.reply_for.clone() {
1076                                    if let Some(sender) = pending.lock().await.remove(&reply_for) {
1077                                        #[cfg(feature = "opentelemetry")]
1078                                        let _ = span.enter();
1079                                        if let Err(e) = sender.send(envelope) {
1080                                            stats.errors.fetch_add(1, Ordering::Relaxed);
1081                                            tracing::warn!(
1082                                                "Failed to send reply envelope to waiter: {e:?}",
1083                                            );
1084                                        }
1085                                        continue;
1086                                    }
1087                                }
1088                                tracing::debug!(
1089                                    "Unmatched or push message -> forward to inbound channel"
1090                                );
1091                                // Unmatched or push message -> forward to inbound channel
1092                                if let Err(e) = inbound_tx.send(envelope) {
1093                                    stats.errors.fetch_add(1, Ordering::Relaxed);
1094                                    tracing::warn!(
1095                                        "Failed to send envelope to inbound channel: {e:?}"
1096                                    );
1097                                }
1098                            }
1099                            Err(e) => {
1100                                stats.errors.fetch_add(1, Ordering::Relaxed);
1101                                tracing::warn!("Failed to decode SignalingEnvelope: {e}");
1102                            }
1103                        }
1104                    }
1105                    Ok(tokio_tungstenite::tungstenite::Message::Pong(payload)) => {
1106                        tracing::debug!("Received pong");
1107                        last_pong.store(current_unix_secs(), Ordering::Release);
1108                        if let Some(sender) = pending_pongs.lock().await.remove(&payload.to_vec()) {
1109                            let _ = sender.send(());
1110                        }
1111                    }
1112                    Ok(tokio_tungstenite::tungstenite::Message::Ping(_)) => {
1113                        tracing::debug!("Received ping");
1114                        last_pong.store(current_unix_secs(), Ordering::Release);
1115                    }
1116                    Ok(other) => {
1117                        tracing::warn!("Received non-binary frame, ignoring: {other:?}");
1118                    }
1119                    Err(e) => {
1120                        stats.errors.fetch_add(1, Ordering::Relaxed);
1121                        tracing::error!("Signaling receive error: {e}");
1122                        break;
1123                    }
1124                }
1125            }
1126
1127            tracing::warn!("Stream terminated");
1128            // If explicit disconnect already marked the client disconnected,
1129            // do not start an automatic reconnect cycle for the intentional
1130            // close. The disconnect path publishes its own Manual event.
1131            let was_connected = connected.swap(false, Ordering::AcqRel);
1132            Self::publish_disconnected_transition(
1133                was_connected,
1134                &stats,
1135                &event_tx,
1136                hook_callback,
1137                DisconnectReason::StreamEnded,
1138                reconnect_enabled.then_some(&reconnect_notify),
1139            )
1140            .await;
1141            pending_pongs.lock().await.clear();
1142        });
1143
1144        *self.receiver_task.lock().await = Some(handle);
1145    }
1146
1147    /// Spawn background ping task to detect half-open connections where writes do not fail but peer is gone.
1148    /// fixme: merge to heartbeat task
1149    async fn start_ping_task(&self) {
1150        let mut existing = self.ping_task.lock().await;
1151        if let Some(handle) = existing.as_ref() {
1152            if handle.is_finished() {
1153                existing.take();
1154            } else {
1155                return;
1156            }
1157        }
1158
1159        let sink = self.ws_sink.clone();
1160        let connected = self.connected.clone();
1161        let stats = self.stats.clone();
1162        let event_tx = self.event_tx.clone();
1163        let last_pong = self.last_pong.clone();
1164        let receiver_task_clone = Arc::clone(&self.receiver_task);
1165        let reconnect_notify = self.reconnect_notify.clone();
1166        let reconnect_enabled = self.config.reconnect_config.enabled;
1167        let hook_callback = self.hook_callback.get().cloned();
1168
1169        let handle = tokio::spawn(async move {
1170            loop {
1171                tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await;
1172
1173                if !connected.load(Ordering::Acquire) {
1174                    break;
1175                }
1176
1177                // Send ping; mark disconnect on failure.
1178                let mut disconnect_reason = None;
1179                {
1180                    let mut sink_guard = sink.lock().await;
1181                    if let Some(sink) = sink_guard.as_mut() {
1182                        match tokio::time::timeout(
1183                            std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1184                            sink.send(tokio_tungstenite::tungstenite::Message::Ping(
1185                                Vec::new().into(),
1186                            )),
1187                        )
1188                        .await
1189                        {
1190                            Ok(Ok(())) => {}
1191                            Ok(Err(e)) => {
1192                                tracing::warn!("Signaling ping send failed: {e}");
1193                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
1194                            }
1195                            Err(_) => {
1196                                tracing::warn!("Signaling ping send timed out");
1197                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
1198                            }
1199                        }
1200                    } else {
1201                        tracing::warn!("Signaling not connected");
1202                        disconnect_reason = Some(DisconnectReason::PingSendFailed);
1203                    }
1204                }
1205
1206                if let Some(reason) = disconnect_reason {
1207                    let was_connected = connected.swap(false, Ordering::AcqRel);
1208                    Self::publish_disconnected_transition(
1209                        was_connected,
1210                        &stats,
1211                        &event_tx,
1212                        hook_callback.clone(),
1213                        reason,
1214                        reconnect_enabled.then_some(&reconnect_notify),
1215                    )
1216                    .await;
1217                    break;
1218                }
1219
1220                // Check for stale pong
1221                let now = current_unix_secs();
1222                let last = last_pong.load(Ordering::Acquire);
1223                if now.saturating_sub(last) > PONG_TIMEOUT_SECS {
1224                    tracing::warn!(
1225                        "Signaling pong timeout (last seen {}s ago), marking disconnected",
1226                        now.saturating_sub(last)
1227                    );
1228                    if let Some(handle) = receiver_task_clone.lock().await.take() {
1229                        handle.abort();
1230                    }
1231                    let was_connected = connected.swap(false, Ordering::AcqRel);
1232                    Self::publish_disconnected_transition(
1233                        was_connected,
1234                        &stats,
1235                        &event_tx,
1236                        hook_callback.clone(),
1237                        DisconnectReason::PongTimeout,
1238                        reconnect_enabled.then_some(&reconnect_notify),
1239                    )
1240                    .await;
1241                    break;
1242                }
1243            }
1244        });
1245
1246        *existing = Some(handle);
1247    }
1248
1249    async fn disconnect_internal(&self, suppress_auto_reconnect: bool) -> NetworkResult<()> {
1250        if suppress_auto_reconnect {
1251            self.reconnect_generation.fetch_add(1, Ordering::AcqRel);
1252            self.auto_reconnect_suppressed
1253                .store(true, Ordering::Release);
1254            self.reconnect_notify.notify_waiters();
1255        }
1256
1257        self.drop_pending_replies("signaling disconnect").await;
1258        self.drop_pending_pongs("signaling disconnect").await;
1259        let was_connected = self.connected.swap(false, Ordering::AcqRel);
1260
1261        // Stop background tasks before taking the WebSocket sink/stream locks.
1262        // A ping or receiver task can be inside a socket operation while holding
1263        // one of those locks; aborting first keeps disconnect from waiting on
1264        // the task it is about to shut down.
1265        let ping_handle = match tokio::time::timeout(
1266            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1267            self.ping_task.lock(),
1268        )
1269        .await
1270        {
1271            Ok(mut task_guard) => task_guard.take(),
1272            Err(_) => {
1273                tracing::warn!("Timed out waiting for signaling ping task lock during disconnect");
1274                None
1275            }
1276        };
1277        if let Some(handle) = ping_handle {
1278            handle.abort();
1279        }
1280
1281        let receiver_handle = match tokio::time::timeout(
1282            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1283            self.receiver_task.lock(),
1284        )
1285        .await
1286        {
1287            Ok(mut task_guard) => task_guard.take(),
1288            Err(_) => {
1289                tracing::warn!(
1290                    "Timed out waiting for signaling receiver task lock during disconnect"
1291                );
1292                None
1293            }
1294        };
1295        if let Some(handle) = receiver_handle {
1296            handle.abort();
1297        }
1298
1299        // Fetch and close the sink without holding the mutex during the close
1300        // await. The lock itself is bounded because a stalled send can hold it
1301        // on broken mobile network transitions.
1302        let sink = match tokio::time::timeout(
1303            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1304            self.ws_sink.lock(),
1305        )
1306        .await
1307        {
1308            Ok(mut sink_guard) => sink_guard.take(),
1309            Err(_) => {
1310                tracing::warn!(
1311                    "Timed out waiting for signaling WebSocket sink lock during disconnect"
1312                );
1313                None
1314            }
1315        };
1316
1317        if let Some(mut sink) = sink {
1318            match tokio::time::timeout(
1319                std::time::Duration::from_secs(DISCONNECT_CLOSE_TIMEOUT_SECS),
1320                sink.close(),
1321            )
1322            .await
1323            {
1324                Ok(Ok(())) => {}
1325                Ok(Err(e)) => {
1326                    tracing::warn!("Signaling WebSocket close failed during disconnect: {}", e);
1327                }
1328                Err(_) => {
1329                    tracing::warn!(
1330                        "Signaling WebSocket close timed out during disconnect; continuing cleanup"
1331                    );
1332                }
1333            }
1334        }
1335
1336        match tokio::time::timeout(
1337            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
1338            self.ws_stream.lock(),
1339        )
1340        .await
1341        {
1342            Ok(mut stream_guard) => {
1343                stream_guard.take();
1344            }
1345            Err(_) => {
1346                tracing::warn!(
1347                    "Timed out waiting for signaling WebSocket stream lock during disconnect"
1348                );
1349            }
1350        }
1351
1352        self.reset_inbound_channel().await;
1353
1354        // Invoke hook synchronously, then broadcast for other subscribers
1355        Self::publish_disconnected_transition(
1356            was_connected,
1357            &self.stats,
1358            &self.event_tx,
1359            self.hook_callback.get().cloned(),
1360            DisconnectReason::Manual,
1361            None,
1362        )
1363        .await;
1364
1365        Ok(())
1366    }
1367
1368    async fn connect_once_for_auto_reconnect(&self, generation: u64) -> NetworkResult<()> {
1369        if self.auto_reconnect_cancelled(generation) {
1370            return Err(NetworkError::ConnectionError(
1371                "Signaling auto-reconnect was cancelled".to_string(),
1372            ));
1373        }
1374
1375        if self.connected.load(Ordering::Acquire) {
1376            tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1377            return Ok(());
1378        }
1379
1380        match self
1381            .connecting
1382            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1383        {
1384            Ok(_) => {}
1385            Err(_) => {
1386                if self.connected.load(Ordering::Acquire) {
1387                    tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
1388                    return Ok(());
1389                }
1390
1391                tracing::debug!(
1392                    "Another connection attempt in progress, waiting for state change..."
1393                );
1394                let result = self.wait_for_connection_result().await;
1395                if self.auto_reconnect_cancelled(generation) {
1396                    return Err(NetworkError::ConnectionError(
1397                        "Signaling auto-reconnect was cancelled".to_string(),
1398                    ));
1399                }
1400                return result;
1401            }
1402        }
1403
1404        if self.auto_reconnect_cancelled(generation) {
1405            self.connecting.store(false, Ordering::Release);
1406            return Err(NetworkError::ConnectionError(
1407                "Signaling auto-reconnect was cancelled".to_string(),
1408            ));
1409        }
1410
1411        if self.connected.load(Ordering::Acquire) {
1412            tracing::debug!("Connection completed by another task while acquiring lock");
1413            self.connecting.store(false, Ordering::Release);
1414            return Ok(());
1415        }
1416
1417        tracing::debug!(
1418            generation,
1419            "Acquired connection lock, establishing one auto-reconnect signaling attempt..."
1420        );
1421
1422        let result = self
1423            .establish_connection_once_for_auto_reconnect(generation)
1424            .await;
1425        self.connecting.store(false, Ordering::Release);
1426
1427        match result {
1428            Ok(()) => {
1429                if self.auto_reconnect_cancelled(generation) {
1430                    self.disconnect_internal(false).await?;
1431                    return Err(NetworkError::ConnectionError(
1432                        "Signaling auto-reconnect was cancelled".to_string(),
1433                    ));
1434                }
1435                self.start_receiver().await;
1436                self.start_ping_task().await;
1437                Ok(())
1438            }
1439            Err(e) => {
1440                if !self.auto_reconnect_cancelled(generation) {
1441                    let _ = self.event_tx.send(SignalingEvent::Disconnected {
1442                        reason: DisconnectReason::ConnectionFailed(e.to_string()),
1443                    });
1444                    tracing::error!("Connection attempt failed: {e}");
1445                }
1446                Err(e)
1447            }
1448        }
1449    }
1450
1451    /// Wait for ongoing connection attempt to complete (used when another task is connecting).
1452    ///
1453    /// Uses the broadcast channel to wait for a Connected event without recursion.
1454    async fn wait_for_connection_result(&self) -> NetworkResult<()> {
1455        let mut event_rx = self.event_tx.subscribe();
1456        let deadline = tokio::time::Instant::now()
1457            + std::time::Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS);
1458
1459        loop {
1460            tokio::select! {
1461                _ = tokio::time::sleep_until(deadline) => {
1462                    // Final check before giving up
1463                    if self.connected.load(Ordering::Acquire) {
1464                        tracing::debug!("Connection succeeded just before timeout");
1465                        return Ok(());
1466                    }
1467                    return Err(NetworkError::ConnectionError(
1468                        "Timeout waiting for concurrent connection attempt".to_string(),
1469                    ));
1470                }
1471                result = event_rx.recv() => {
1472                    match result {
1473                        Ok(SignalingEvent::Connected) => {
1474                            tracing::debug!("Connection established by another task");
1475                            return Ok(());
1476                        }
1477                        Ok(SignalingEvent::Disconnected { reason }) => {
1478                            return Err(NetworkError::ConnectionError(format!(
1479                                "Concurrent signaling connection failed: {reason:?}"
1480                            )));
1481                        }
1482                        Ok(_) => continue, // ConnectStart — keep waiting
1483                        Err(broadcast::error::RecvError::Lagged(n)) => {
1484                            tracing::warn!("Event receiver lagged by {n} events");
1485                            // Check current state after lag
1486                            if self.connected.load(Ordering::Acquire) {
1487                                return Ok(());
1488                            }
1489                            continue;
1490                        }
1491                        Err(broadcast::error::RecvError::Closed) => {
1492                            return Err(NetworkError::ConnectionError(
1493                                "Event channel closed while waiting for connection".to_string(),
1494                            ));
1495                        }
1496                    }
1497                }
1498            }
1499        }
1500    }
1501}
1502
1503#[async_trait]
1504impl SignalingClient for WebSocketSignalingClient {
1505    async fn connect(&self) -> NetworkResult<()> {
1506        // 🔐 Try to acquire "connecting" lock using compare-and-swap
1507        // This is the first checkpoint - moving the fast-path check after lock acquisition
1508        // eliminates the TOCTOU window where another task could have established a connection
1509        // between our fast-path check and lock acquisition.
1510        match self
1511            .connecting
1512            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1513        {
1514            Ok(_) => {
1515                // We successfully acquired the lock (CAS changed false → true)
1516                // Now we hold exclusive access to the connection establishment
1517            }
1518            Err(_) => {
1519                // CAS failed - either another task holds the lock, or a connection exists
1520                // Check if already connected before waiting
1521                if self.connected.load(Ordering::Acquire) {
1522                    tracing::debug!("Already connected, skipping connect()");
1523                    return Ok(());
1524                }
1525
1526                // Another task is connecting, wait for state change using broadcast channel
1527                tracing::debug!(
1528                    "Another connection attempt in progress, waiting for state change..."
1529                );
1530                return self.wait_for_connection_result().await;
1531            }
1532        }
1533
1534        // 🔐 We now hold the "connecting" lock exclusively
1535        // Re-check connected: another task may have finished connecting between our
1536        // CAS and this load (the AcqRel fence on CAS guarantees we see the latest
1537        // `connected` store from whoever last released `connecting`).
1538        if self.connected.load(Ordering::Acquire) {
1539            tracing::debug!("Connection completed by another task while acquiring lock");
1540            self.connecting.store(false, Ordering::Release);
1541            return Ok(());
1542        }
1543
1544        tracing::debug!("Acquired connection lock, establishing connection...");
1545
1546        // Perform actual connection
1547        let result = self.connect_with_retries().await;
1548
1549        // Clear "connecting" flag regardless of result
1550        self.connecting.store(false, Ordering::Release);
1551
1552        // Handle connection result
1553        match result {
1554            Ok(()) => {
1555                self.start_receiver().await;
1556                self.start_ping_task().await;
1557                Ok(())
1558            }
1559            Err(e) => {
1560                // Explicitly notify waiting tasks that connection failed
1561                let _ = self.event_tx.send(SignalingEvent::Disconnected {
1562                    reason: DisconnectReason::ConnectionFailed(e.to_string()),
1563                });
1564                tracing::error!("Connection failed: {e}");
1565                Err(e)
1566            }
1567        }
1568    }
1569
1570    async fn connect_once(&self) -> NetworkResult<()> {
1571        loop {
1572            if self.connected.load(Ordering::Acquire) {
1573                tracing::debug!("Already connected, skipping connect_once()");
1574                return Ok(());
1575            }
1576
1577            match self
1578                .connecting
1579                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1580            {
1581                Ok(_) => break,
1582                Err(_) => {
1583                    if self.connected.load(Ordering::Acquire) {
1584                        tracing::debug!("Already connected, skipping connect_once()");
1585                        return Ok(());
1586                    }
1587
1588                    tracing::debug!(
1589                        "Another connection attempt in progress, waiting for state change..."
1590                    );
1591                    match self.wait_for_connection_result().await {
1592                        Ok(()) => return Ok(()),
1593                        Err(e)
1594                            if !self.connected.load(Ordering::Acquire)
1595                                && !self.connecting.load(Ordering::Acquire) =>
1596                        {
1597                            tracing::debug!(
1598                                "Concurrent signaling connection failed; explicit connect_once will retry immediately: {e}"
1599                            );
1600                            continue;
1601                        }
1602                        Err(e) => return Err(e),
1603                    }
1604                }
1605            }
1606        }
1607
1608        if self.connected.load(Ordering::Acquire) {
1609            tracing::debug!("Connection completed by another task while acquiring lock");
1610            self.connecting.store(false, Ordering::Release);
1611            return Ok(());
1612        }
1613
1614        tracing::debug!(
1615            "Acquired connection lock, establishing one signaling connection attempt..."
1616        );
1617
1618        let result = self.establish_connection_once().await;
1619        self.connecting.store(false, Ordering::Release);
1620
1621        match result {
1622            Ok(()) => {
1623                self.start_receiver().await;
1624                self.start_ping_task().await;
1625                Ok(())
1626            }
1627            Err(e) => {
1628                let _ = self.event_tx.send(SignalingEvent::Disconnected {
1629                    reason: DisconnectReason::ConnectionFailed(e.to_string()),
1630                });
1631                tracing::error!("Connection attempt failed: {e}");
1632                Err(e)
1633            }
1634        }
1635    }
1636
1637    fn schedule_auto_reconnect(&self) {
1638        if !self.config.reconnect_config.enabled {
1639            tracing::debug!("Skipping signaling auto-reconnect schedule; config disabled");
1640            return;
1641        }
1642
1643        self.auto_reconnect_suppressed
1644            .store(false, Ordering::Release);
1645        self.reconnect_notify.notify_one();
1646    }
1647
    /// Disconnects by delegating to `disconnect_internal(true)` and returning its result.
    ///
    /// NOTE(review): the meaning of the `true` argument is defined by
    /// `disconnect_internal`, which is outside this view — confirm before relying on it.
    async fn disconnect(&self) -> NetworkResult<()> {
        self.disconnect_internal(true).await
    }
1651
1652    async fn probe_alive(&self, timeout: Duration) -> NetworkResult<()> {
1653        if !self.connected.load(Ordering::Acquire) {
1654            return Err(NetworkError::ConnectionError(
1655                "Signaling client is not connected".to_string(),
1656            ));
1657        }
1658
1659        let probe_id = self.probe_counter.fetch_add(1, Ordering::Relaxed) + 1;
1660        let payload =
1661            format!("actr-signaling-probe-{probe_id}-{}", current_unix_secs()).into_bytes();
1662        let (tx, rx) = oneshot::channel();
1663        self.pending_pongs.lock().await.insert(payload.clone(), tx);
1664
1665        let send_timeout = std::cmp::min(
1666            timeout,
1667            std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1668        );
1669        let ping_payload = payload.clone();
1670        let send_result = tokio::time::timeout(send_timeout, async {
1671            let mut sink_guard = self.ws_sink.lock().await;
1672            match sink_guard.as_mut() {
1673                Some(sink) => sink
1674                    .send(tokio_tungstenite::tungstenite::Message::Ping(
1675                        ping_payload.into(),
1676                    ))
1677                    .await
1678                    .map_err(|e| {
1679                        NetworkError::ConnectionError(format!("Signaling probe ping failed: {e}"))
1680                    }),
1681                None => Err(NetworkError::ConnectionError(
1682                    "Signaling probe failed: WebSocket sink is not available".to_string(),
1683                )),
1684            }
1685        })
1686        .await
1687        .unwrap_or_else(|_| {
1688            Err(NetworkError::TimeoutError(format!(
1689                "Timed out sending signaling probe ping after {}ms",
1690                send_timeout.as_millis()
1691            )))
1692        });
1693
1694        if let Err(e) = send_result {
1695            self.pending_pongs.lock().await.remove(&payload);
1696            let was_connected = self.connected.swap(false, Ordering::AcqRel);
1697            Self::publish_disconnected_transition(
1698                was_connected,
1699                &self.stats,
1700                &self.event_tx,
1701                self.hook_callback.get().cloned(),
1702                DisconnectReason::PingSendFailed,
1703                None,
1704            )
1705            .await;
1706            return Err(e);
1707        }
1708
1709        match tokio::time::timeout(timeout, rx).await {
1710            Ok(Ok(())) => {
1711                self.last_pong.store(current_unix_secs(), Ordering::Release);
1712                Ok(())
1713            }
1714            Ok(Err(_)) => {
1715                self.pending_pongs.lock().await.remove(&payload);
1716                Err(NetworkError::ConnectionError(
1717                    "Signaling probe pong waiter dropped".to_string(),
1718                ))
1719            }
1720            Err(_) => {
1721                self.pending_pongs.lock().await.remove(&payload);
1722                Err(NetworkError::TimeoutError(format!(
1723                    "Timed out waiting for signaling probe pong after {}ms",
1724                    timeout.as_millis()
1725                )))
1726            }
1727        }
1728    }
1729
1730    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1731    async fn send_register_request(
1732        &self,
1733        request: RegisterRequest,
1734    ) -> NetworkResult<RegisterResponse> {
1735        // Create PeerToSignaling stream process (Register front )
1736        let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
1737            payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
1738        });
1739
1740        let envelope = self.create_envelope(flow).await;
1741        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1742
1743        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1744        {
1745            if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
1746                server_to_actr.payload
1747            {
1748                return Ok(response);
1749            }
1750        }
1751
1752        Err(NetworkError::ConnectionError(
1753            "Invalid registration response".to_string(),
1754        ))
1755    }
1756
1757    #[cfg_attr(
1758        feature = "opentelemetry",
1759        tracing::instrument(skip_all, fields(actor_id = %actor_id))
1760    )]
1761    async fn send_unregister_request(
1762        &self,
1763        actor_id: ActrId,
1764        credential: AIdCredential,
1765        reason: Option<String>,
1766    ) -> NetworkResult<UnregisterResponse> {
1767        // Build UnregisterRequest payload
1768        let request = UnregisterRequest {
1769            actr_id: actor_id.clone(),
1770            reason,
1771        };
1772
1773        // Wrap into ActrToSignaling flow
1774        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1775            source: actor_id,
1776            credential,
1777            payload: Some(actr_to_signaling::Payload::UnregisterRequest(request)),
1778        });
1779
1780        // Send envelope (fire-and-forget)
1781        let envelope = self.create_envelope(flow).await;
1782        self.send_envelope(envelope).await?;
1783
1784        // Do not wait for UnregisterResponse here because the signaling stream
1785        // is also consumed by WebRtcCoordinator. Waiting could race with that loop
1786        // and lead to spurious timeouts. Treat Unregister as best-effort.
1787        // not wait for the response , because the signaling stream have multi customers use it, fixme: should wait for the response
1788        Ok(UnregisterResponse {
1789            result: Some(actr_protocol::unregister_response::Result::Success(
1790                actr_protocol::unregister_response::UnregisterOk {},
1791            )),
1792        })
1793    }
1794
1795    #[cfg_attr(
1796        feature = "opentelemetry",
1797        tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1798    )]
1799    async fn send_heartbeat(
1800        &self,
1801        actor_id: ActrId,
1802        credential: AIdCredential,
1803        availability: ServiceAvailabilityState,
1804        power_reserve: f32,
1805        mailbox_backlog: f32,
1806    ) -> NetworkResult<Pong> {
1807        let ping = Ping {
1808            availability: availability as i32,
1809            power_reserve,
1810            mailbox_backlog,
1811            sticky_client_ids: vec![], // TODO: Implement sticky session tracking
1812        };
1813
1814        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1815            source: actor_id,
1816            credential,
1817            payload: Some(actr_to_signaling::Payload::Ping(ping)),
1818        });
1819
1820        let envelope = self.create_envelope(flow).await;
1821        let reply_for = envelope.envelope_id.clone();
1822
1823        // Register waiter before sending
1824        let (tx, rx) = oneshot::channel();
1825        self.pending_replies
1826            .lock()
1827            .await
1828            .insert(reply_for.clone(), tx);
1829
1830        if let Err(e) = self.send_envelope(envelope).await {
1831            // Cleanup waiter on immediate send failure to avoid leaks.
1832            self.pending_replies.lock().await.remove(&reply_for);
1833            return Err(e);
1834        }
1835
1836        // Wait for response
1837        let response_envelope = rx.await.map_err(|_| {
1838            NetworkError::ConnectionError(
1839                "Receiver dropped while waiting for heartbeat response".to_string(),
1840            )
1841        })?;
1842
1843        // Extract Pong from response, or handle Error response
1844        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1845        {
1846            match server_to_actr.payload {
1847                Some(signaling_to_actr::Payload::Pong(pong)) => {
1848                    return Ok(pong);
1849                }
1850                Some(signaling_to_actr::Payload::Error(err)) => {
1851                    // Check if it's a credential expired error (401)
1852                    if err.code == 401 {
1853                        return Err(NetworkError::CredentialExpired(err.message));
1854                    }
1855                    return Err(NetworkError::AuthenticationError(format!(
1856                        "{} ({})",
1857                        err.message, err.code
1858                    )));
1859                }
1860                _ => {}
1861            }
1862        }
1863
1864        Err(NetworkError::ConnectionError(
1865            "Received response but not a Pong message".to_string(),
1866        ))
1867    }
1868
1869    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1870    async fn send_route_candidates_request(
1871        &self,
1872        actor_id: ActrId,
1873        credential: AIdCredential,
1874        request: RouteCandidatesRequest,
1875    ) -> NetworkResult<RouteCandidatesResponse> {
1876        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1877            source: actor_id,
1878            credential,
1879            payload: Some(actr_to_signaling::Payload::RouteCandidatesRequest(request)),
1880        });
1881
1882        let envelope = self.create_envelope(flow).await;
1883        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1884
1885        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1886        {
1887            match server_to_actr.payload {
1888                Some(signaling_to_actr::Payload::RouteCandidatesResponse(response)) => {
1889                    return Ok(response);
1890                }
1891                Some(signaling_to_actr::Payload::Error(err)) => {
1892                    return Err(NetworkError::ServiceDiscoveryError(format!(
1893                        "{} ({})",
1894                        err.message, err.code
1895                    )));
1896                }
1897                _ => {}
1898            }
1899        }
1900
1901        Err(NetworkError::ConnectionError(
1902            "Invalid route candidates response".to_string(),
1903        ))
1904    }
1905
1906    async fn get_signing_key(
1907        &self,
1908        actor_id: ActrId,
1909        credential: AIdCredential,
1910        key_id: u32,
1911    ) -> NetworkResult<(u32, Vec<u8>)> {
1912        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1913            source: actor_id,
1914            credential,
1915            payload: Some(actr_to_signaling::Payload::GetSigningKeyRequest(
1916                GetSigningKeyRequest { key_id },
1917            )),
1918        });
1919
1920        let envelope = self.create_envelope(flow).await;
1921        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1922
1923        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1924        {
1925            match server_to_actr.payload {
1926                Some(signaling_to_actr::Payload::GetSigningKeyResponse(resp)) => {
1927                    return Ok((resp.key_id, resp.pubkey.to_vec()));
1928                }
1929                Some(signaling_to_actr::Payload::Error(err)) => {
1930                    return Err(NetworkError::ConnectionError(format!(
1931                        "get_signing_key failed: {} ({})",
1932                        err.message, err.code
1933                    )));
1934                }
1935                _ => {}
1936            }
1937        }
1938
1939        Err(NetworkError::ConnectionError(
1940            "get_signing_key: invalid response".to_string(),
1941        ))
1942    }
1943
1944    #[cfg_attr(
1945        feature = "opentelemetry",
1946        tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1947    )]
1948    async fn send_credential_update_request(
1949        &self,
1950        actor_id: ActrId,
1951        credential: AIdCredential,
1952    ) -> NetworkResult<RegisterResponse> {
1953        let request = CredentialUpdateRequest {
1954            actr_id: actor_id.clone(),
1955        };
1956
1957        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1958            source: actor_id,
1959            credential,
1960            payload: Some(actr_to_signaling::Payload::CredentialUpdateRequest(request)),
1961        });
1962
1963        let envelope = self.create_envelope(flow).await;
1964        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1965
1966        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1967        {
1968            match server_to_actr.payload {
1969                Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
1970                    return Ok(response);
1971                }
1972                Some(signaling_to_actr::Payload::Error(err)) => {
1973                    return Err(NetworkError::ConnectionError(format!(
1974                        "Credential update failed: {} ({})",
1975                        err.message, err.code
1976                    )));
1977                }
1978                _ => {}
1979            }
1980        }
1981
1982        Err(NetworkError::ConnectionError(
1983            "Invalid credential update response".to_string(),
1984        ))
1985    }
1986
1987    #[cfg_attr(
1988        feature = "opentelemetry",
1989        tracing::instrument(level = "debug", skip_all, fields(envelope_id = %envelope.envelope_id))
1990    )]
1991    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
1992        #[cfg(feature = "opentelemetry")]
1993        let envelope = {
1994            let mut envelope = envelope;
1995            trace::inject_span_context(&tracing::Span::current(), &mut envelope);
1996            envelope
1997        };
1998
1999        // Check connection state first to avoid sending on stale/closed connections
2000        // This prevents "Broken pipe" errors when ws_sink exists but connection is dead
2001        if !self.is_connected() {
2002            return Err(NetworkError::ConnectionError(
2003                "Cannot send: WebSocket not connected".to_string(),
2004            ));
2005        }
2006
2007        let mut sink_guard = self.ws_sink.lock().await;
2008
2009        if let Some(sink) = sink_guard.as_mut() {
2010            // using protobuf binary serialization
2011            let mut buf = Vec::new();
2012            envelope.encode(&mut buf)?;
2013            let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
2014            match tokio::time::timeout(
2015                std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
2016                sink.send(msg),
2017            )
2018            .await
2019            {
2020                Ok(Ok(())) => {}
2021                Ok(Err(e)) => return Err(e.into()),
2022                Err(_) => {
2023                    self.connected.store(false, Ordering::Release);
2024                    return Err(NetworkError::ConnectionError(
2025                        "Signaling WebSocket send timed out".to_string(),
2026                    ));
2027                }
2028            }
2029
2030            self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
2031            tracing::debug!("Stats: {:?}", self.stats.snapshot());
2032            Ok(())
2033        } else {
2034            Err(NetworkError::ConnectionError("Not connected".to_string()))
2035        }
2036    }
2037
2038    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
2039        let mut rx = self.inbound_rx.lock().await;
2040        match rx.recv().await {
2041            Some(envelope) => Ok(Some(envelope)),
2042            None => {
2043                tracing::error!("Inbound channel closed");
2044                Err(NetworkError::ConnectionError(
2045                    "Inbound channel closed".to_string(),
2046                ))
2047            }
2048        }
2049    }
2050
    /// Returns the current value of the `connected` flag (read with `Acquire` ordering).
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::Acquire)
    }
2054
    /// Returns a point-in-time copy of the counters, as produced by `stats.snapshot()`.
    fn get_stats(&self) -> SignalingStats {
        self.stats.snapshot()
    }
2058
    /// Creates a new broadcast receiver subscribed to this client's `event_tx` channel.
    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
        self.event_tx.subscribe()
    }
2062
2063    async fn set_actor_id(&self, actor_id: ActrId) {
2064        *self.actor_id.lock().await = Some(actor_id);
2065    }
2066
2067    async fn set_credential_state(&self, credential_state: CredentialState) {
2068        *self.credential_state.lock().await = Some(credential_state);
2069    }
2070
2071    async fn clear_identity(&self) {
2072        *self.actor_id.lock().await = None;
2073        *self.credential_state.lock().await = None;
2074    }
2075
    /// Installs the hook callback via `hook_callback.set(cb)`.
    ///
    /// The result of `set` is deliberately ignored, so a failed `set` (presumably
    /// because a callback was already installed — confirm against the field's type)
    /// is silently dropped.
    fn set_hook_callback(&self, cb: HookCallback) {
        let _ = self.hook_callback.set(cb);
    }
2079}
2080
/// Live signaling statistics, kept as atomic counters so they can be updated
/// through a shared reference.
#[derive(Debug)]
pub(crate) struct AtomicSignalingStats {
    /// Number of connections.
    pub connections: AtomicU64,

    /// Number of disconnections.
    pub disconnections: AtomicU64,

    /// Number of messages sent.
    pub messages_sent: AtomicU64,

    /// Number of messages received.
    pub messages_received: AtomicU64,

    /// Number of heartbeats sent.
    /// TODO: Wire heartbeat counters when heartbeat send/receive paths are instrumented; currently never incremented.
    pub heartbeats_sent: AtomicU64,

    /// Number of heartbeats received.
    /// TODO: Wire heartbeat counters when heartbeat send/receive paths are instrumented; currently never incremented.
    pub heartbeats_received: AtomicU64,

    /// Number of errors.
    pub errors: AtomicU64,
}
2107
2108impl Default for AtomicSignalingStats {
2109    fn default() -> Self {
2110        Self {
2111            connections: AtomicU64::new(0),
2112            disconnections: AtomicU64::new(0),
2113            messages_sent: AtomicU64::new(0),
2114            messages_received: AtomicU64::new(0),
2115            heartbeats_sent: AtomicU64::new(0),
2116            heartbeats_received: AtomicU64::new(0),
2117            errors: AtomicU64::new(0),
2118        }
2119    }
2120}
2121
/// Plain-value snapshot of the signaling statistics, suitable for serialization
/// and for handing to callers.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct SignalingStats {
    /// Number of connections.
    pub connections: u64,

    /// Number of disconnections.
    pub disconnections: u64,

    /// Number of messages sent.
    pub messages_sent: u64,

    /// Number of messages received.
    pub messages_received: u64,

    /// Number of heartbeats sent.
    pub heartbeats_sent: u64,

    /// Number of heartbeats received.
    pub heartbeats_received: u64,

    /// Number of errors.
    pub errors: u64,
}
2146
2147impl AtomicSignalingStats {
2148    /// Create a snapshot of current statistics
2149    pub fn snapshot(&self) -> SignalingStats {
2150        SignalingStats {
2151            connections: self.connections.load(Ordering::Relaxed),
2152            disconnections: self.disconnections.load(Ordering::Relaxed),
2153            messages_sent: self.messages_sent.load(Ordering::Relaxed),
2154            messages_received: self.messages_received.load(Ordering::Relaxed),
2155            heartbeats_sent: self.heartbeats_sent.load(Ordering::Relaxed),
2156            heartbeats_received: self.heartbeats_received.load(Ordering::Relaxed),
2157            errors: self.errors.load(Ordering::Relaxed),
2158        }
2159    }
2160}
2161
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2169
2170#[cfg(test)]
2171mod tests {
2172    use super::*;
2173    use std::future::Future;
2174    use std::pin::Pin;
2175    use std::sync::atomic::{AtomicUsize, Ordering as UsizeOrdering};
2176
    /// Simple fake SignalingClient implementation for testing the reconnect helper.
    struct FakeSignalingClient {
        // Event channel handed out by `subscribe_events`.
        event_tx: broadcast::Sender<SignalingEvent>,
        // Value reported by `is_connected`.
        connected: AtomicBool,
        // Incremented on every `connect()` call so tests can count attempts.
        connect_calls: Arc<AtomicUsize>,
        // Identity slots written by `set_actor_id` / `set_credential_state`
        // and reset by `clear_identity`.
        actor_id: tokio::sync::Mutex<Option<ActrId>>,
        credential_state: tokio::sync::Mutex<Option<CredentialState>>,
    }
2185
    // Test double: only connection bookkeeping, event subscription, and identity
    // storage are functional. Every request/response method panics via
    // `unimplemented!` because the tests using this fake never call them.
    #[async_trait]
    impl SignalingClient for FakeSignalingClient {
        // Records the call and reports success; does not touch `connected`.
        async fn connect(&self) -> NetworkResult<()> {
            self.connect_calls.fetch_add(1, UsizeOrdering::SeqCst);
            Ok(())
        }

        // No-op disconnect that always succeeds.
        async fn disconnect(&self) -> NetworkResult<()> {
            Ok(())
        }

        async fn send_register_request(
            &self,
            _request: RegisterRequest,
        ) -> NetworkResult<RegisterResponse> {
            unimplemented!("not needed in tests");
        }

        async fn send_unregister_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _reason: Option<String>,
        ) -> NetworkResult<UnregisterResponse> {
            unimplemented!("not needed in tests");
        }

        async fn send_heartbeat(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _availability: ServiceAvailabilityState,
            _power_reserve: f32,
            _mailbox_backlog: f32,
        ) -> NetworkResult<Pong> {
            unimplemented!("not needed in tests");
        }

        async fn send_route_candidates_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _request: RouteCandidatesRequest,
        ) -> NetworkResult<RouteCandidatesResponse> {
            unimplemented!("not needed in tests");
        }

        async fn get_signing_key(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _key_id: u32,
        ) -> NetworkResult<(u32, Vec<u8>)> {
            unimplemented!("not needed in tests");
        }

        async fn send_credential_update_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
        ) -> NetworkResult<RegisterResponse> {
            unimplemented!("not needed in tests");
        }

        async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
            unimplemented!("not needed in tests");
        }

        async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
            unimplemented!("not needed in tests");
        }

        // Reports the `connected` flag as set by the test.
        fn is_connected(&self) -> bool {
            self.connected.load(Ordering::SeqCst)
        }

        // The fake tracks no statistics; always returns zeroed stats.
        fn get_stats(&self) -> SignalingStats {
            SignalingStats::default()
        }

        // Hands out a new receiver on the fake's event channel.
        fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
            self.event_tx.subscribe()
        }

        async fn set_actor_id(&self, actor_id: ActrId) {
            *self.actor_id.lock().await = Some(actor_id);
        }

        async fn set_credential_state(&self, credential_state: CredentialState) {
            *self.credential_state.lock().await = Some(credential_state);
        }

        // Resets both identity slots to `None`.
        async fn clear_identity(&self) {
            *self.actor_id.lock().await = None;
            *self.credential_state.lock().await = None;
        }
    }
2283
2284    fn make_fake_client() -> Arc<FakeSignalingClient> {
2285        let (event_tx, _erx) = broadcast::channel(64);
2286        Arc::new(FakeSignalingClient {
2287            event_tx,
2288            connected: AtomicBool::new(false),
2289            connect_calls: Arc::new(AtomicUsize::new(0)),
2290            actor_id: tokio::sync::Mutex::new(None),
2291            credential_state: tokio::sync::Mutex::new(None),
2292        })
2293    }
2294
2295    /// Helper: create a minimal SignalingConfig with an unreachable URL.
2296    fn make_config() -> SignalingConfig {
2297        SignalingConfig {
2298            server_url: Url::parse("ws://127.0.0.1:1/signaling/ws").unwrap(),
2299            connection_timeout: 2,
2300            heartbeat_interval: 30,
2301            reconnect_config: ReconnectConfig::default(),
2302            auth_config: None,
2303            webrtc_role: None,
2304        }
2305    }
2306
2307    /// Helper: create a WebSocketSignalingClient wrapped in Arc
2308    fn make_ws_client(config: SignalingConfig) -> Arc<WebSocketSignalingClient> {
2309        Arc::new(WebSocketSignalingClient::new(config))
2310    }
2311
    /// Verifies that `probe_alive` gives up within its own timeout when the sink
    /// lock cannot be acquired, and that the failure is treated as a disconnect.
    #[tokio::test]
    async fn probe_alive_times_out_when_sink_lock_is_stalled() {
        let client = make_ws_client(make_config());
        // Pretend a connection exists so the probe gets past its connected check.
        client.connected.store(true, Ordering::Release);

        // Hold the sink lock for the whole test so the probe's send can never start.
        let _sink_guard = client.ws_sink.lock().await;

        // The outer 250ms timeout is a safety net; the probe's own 20ms timeout
        // should fire first.
        let result = tokio::time::timeout(
            Duration::from_millis(250),
            client.probe_alive(Duration::from_millis(20)),
        )
        .await
        .expect("probe should be bounded by its own timeout");

        let err = result.expect_err("stalled sink lock should fail the probe");
        assert!(
            err.to_string()
                .contains("Timed out sending signaling probe ping"),
            "unexpected error: {err}"
        );
        // A failed send flips `connected` and records exactly one disconnection.
        assert!(
            !client.is_connected(),
            "stalled probe send should mark signaling disconnected"
        );
        assert_eq!(client.get_stats().disconnections, 1);
        // The waiter registered before the send must not be left behind.
        assert!(
            client.pending_pongs.lock().await.is_empty(),
            "failed probe send should remove its pending pong waiter"
        );
    }
2342
2343    #[tokio::test]
2344    async fn explicit_connect_once_retries_after_concurrent_attempt_fails() {
2345        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2346            .await
2347            .expect("test listener should bind");
2348        let server_url = format!(
2349            "ws://{}/signaling/ws",
2350            listener
2351                .local_addr()
2352                .expect("test listener should have local addr")
2353        );
2354        let server_task = tokio::spawn(async move {
2355            let (stream, _) = listener
2356                .accept()
2357                .await
2358                .expect("test server should accept tcp connection");
2359            let ws_stream = tokio_tungstenite::accept_async(stream)
2360                .await
2361                .expect("test server should complete websocket handshake");
2362            tokio::time::sleep(Duration::from_millis(100)).await;
2363            drop(ws_stream);
2364        });
2365
2366        let mut config = make_config();
2367        config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
2368        config.connection_timeout = 2;
2369        config.reconnect_config = ReconnectConfig {
2370            enabled: false,
2371            ..ReconnectConfig::default()
2372        };
2373        let client = make_ws_client(config);
2374
2375        client.connecting.store(true, Ordering::Release);
2376        let connect_task = {
2377            let client = client.clone();
2378            tokio::spawn(async move { client.connect_once().await })
2379        };
2380
2381        tokio::time::sleep(Duration::from_millis(50)).await;
2382        client.connecting.store(false, Ordering::Release);
2383        let _ = client.event_tx.send(SignalingEvent::Disconnected {
2384            reason: DisconnectReason::ConnectionFailed("simulated auto attempt failed".into()),
2385        });
2386
2387        tokio::time::timeout(Duration::from_secs(2), connect_task)
2388            .await
2389            .expect("explicit connect_once should not wait for auto backoff")
2390            .expect("connect_once task should not panic")
2391            .expect("explicit connect_once should retry after concurrent failure");
2392
2393        assert!(
2394            client.is_connected(),
2395            "explicit recovery connect should establish signaling"
2396        );
2397
2398        client.disconnect().await.ok();
2399        let _ = tokio::time::timeout(Duration::from_secs(1), server_task).await;
2400    }
2401
2402    #[tokio::test]
2403    async fn test_publish_disconnected_transition_fires_hook_once() {
2404        let stats = Arc::new(AtomicSignalingStats::default());
2405        let (event_tx, mut event_rx) = broadcast::channel(4);
2406        let hook_count = Arc::new(AtomicUsize::new(0));
2407        let hook_count_for_cb = hook_count.clone();
2408        let hook_callback: HookCallback = Arc::new(move |event| {
2409            let hook_count = hook_count_for_cb.clone();
2410            Box::pin(async move {
2411                if matches!(event, HookEvent::SignalingDisconnected) {
2412                    hook_count.fetch_add(1, UsizeOrdering::SeqCst);
2413                }
2414            }) as Pin<Box<dyn Future<Output = ()> + Send>>
2415        });
2416
2417        let first = WebSocketSignalingClient::publish_disconnected_transition(
2418            true,
2419            &stats,
2420            &event_tx,
2421            Some(hook_callback.clone()),
2422            DisconnectReason::StreamEnded,
2423            None,
2424        )
2425        .await;
2426        assert!(
2427            first,
2428            "first connected->disconnected transition should publish"
2429        );
2430        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2431        assert_eq!(stats.snapshot().disconnections, 1);
2432        assert!(matches!(
2433            event_rx.recv().await,
2434            Ok(SignalingEvent::Disconnected {
2435                reason: DisconnectReason::StreamEnded
2436            })
2437        ));
2438
2439        let second = WebSocketSignalingClient::publish_disconnected_transition(
2440            false,
2441            &stats,
2442            &event_tx,
2443            Some(hook_callback),
2444            DisconnectReason::PongTimeout,
2445            None,
2446        )
2447        .await;
2448        assert!(
2449            !second,
2450            "stale duplicate disconnected transition should be ignored"
2451        );
2452        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2453        assert_eq!(stats.snapshot().disconnections, 1);
2454        assert!(event_rx.try_recv().is_err());
2455    }
2456
2457    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2458    // 1. Configuration defaults
2459    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2460
2461    #[test]
2462    fn test_reconnect_config_defaults() {
2463        let cfg = ReconnectConfig::default();
2464        assert!(cfg.enabled);
2465        assert_eq!(cfg.max_attempts, 10);
2466        assert_eq!(cfg.initial_delay, 1);
2467        assert_eq!(cfg.max_delay, 60);
2468        assert!((cfg.backoff_multiplier - 2.0).abs() < f64::EPSILON);
2469    }
2470
2471    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2472    // 2. Initial state
2473    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2474
2475    #[test]
2476    fn test_websocket_signaling_client_initial_state_disconnected() {
2477        let client = WebSocketSignalingClient::new(make_config());
2478        assert!(
2479            !client.is_connected(),
2480            "newly created client should be Disconnected"
2481        );
2482        assert!(
2483            !client.connecting.load(Ordering::Acquire),
2484            "newly created client should not be in connecting state"
2485        );
2486        assert!(
2487            !client.reconnector_started.load(Ordering::Acquire),
2488            "reconnect manager should not be started automatically"
2489        );
2490    }
2491
2492    #[test]
2493    fn test_initial_stats_are_zero() {
2494        let client = WebSocketSignalingClient::new(make_config());
2495        let stats = client.get_stats();
2496        assert_eq!(stats.connections, 0);
2497        assert_eq!(stats.disconnections, 0);
2498        assert_eq!(stats.messages_sent, 0);
2499        assert_eq!(stats.messages_received, 0);
2500        assert_eq!(stats.errors, 0);
2501    }
2502
2503    #[test]
2504    fn test_signaling_url_log_redacts_credential_query_params() {
2505        let url = Url::parse(
2506            "wss://example.com/signaling?actor_id=abc&key_id=7&claims=claims-value&signature=signature-value&token=token-value",
2507        )
2508        .unwrap();
2509
2510        let redacted = WebSocketSignalingClient::redact_signaling_url_for_log(&url);
2511
2512        assert!(redacted.contains("actor_id=abc"));
2513        assert!(redacted.contains("key_id=7"));
2514        assert!(redacted.contains("claims=REDACTED"));
2515        assert!(redacted.contains("signature=REDACTED"));
2516        assert!(redacted.contains("token=REDACTED"));
2517        assert!(!redacted.contains("claims-value"));
2518        assert!(!redacted.contains("signature-value"));
2519        assert!(!redacted.contains("token-value"));
2520    }
2521
2522    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2523    // 3. Reconnect manager idempotency
2524    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2525
2526    #[tokio::test]
2527    async fn test_reconnect_manager_idempotent() {
2528        let client = make_ws_client(make_config());
2529
2530        // First start should succeed
2531        client.start_reconnect_manager();
2532        assert!(
2533            client.reconnector_started.load(Ordering::Acquire),
2534            "reconnector_started should be true after first call"
2535        );
2536
2537        // Second call should not start a new manager (CAS fails)
2538        client.start_reconnect_manager();
2539        // Multiple managers would cause flaky tests due to duplicate reconnections; mainly verify the flag
2540        assert!(client.reconnector_started.load(Ordering::Acquire));
2541    }
2542
2543    #[tokio::test]
2544    async fn test_reconnect_manager_disabled_when_config_disabled() {
2545        let mut config = make_config();
2546        config.reconnect_config.enabled = false;
2547        let client = make_ws_client(config);
2548
2549        client.start_reconnect_manager();
2550        assert!(
2551            !client.reconnector_started.load(Ordering::Acquire),
2552            "reconnect manager should not start when reconnect config is disabled"
2553        );
2554    }
2555
2556    #[tokio::test]
2557    async fn test_reconnect_manager_does_not_keep_client_alive() {
2558        let client = make_ws_client(make_config());
2559        let weak = Arc::downgrade(&client);
2560
2561        client.start_reconnect_manager();
2562        drop(client);
2563
2564        assert!(
2565            weak.upgrade().is_none(),
2566            "reconnect manager must not keep signaling client alive after owner drop"
2567        );
2568    }
2569
2570    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2571    // 4. connect() concurrency exclusion
2572    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2573
2574    #[tokio::test]
2575    async fn test_connect_fast_path_when_already_connected() {
2576        let client = make_ws_client(make_config());
2577        // Manually set as connected
2578        client.connected.store(true, Ordering::Release);
2579
2580        // connect() should return Ok immediately without establishing a new connection
2581        let result = client.connect().await;
2582        assert!(
2583            result.is_ok(),
2584            "connect() should return Ok when already connected"
2585        );
2586        // Should not change connecting flag
2587        assert!(!client.connecting.load(Ordering::Acquire));
2588    }
2589
2590    #[tokio::test]
2591    async fn test_connect_sets_connecting_flag() {
2592        let mut config = make_config();
2593        config.reconnect_config.enabled = false; // disable retry, fail fast
2594        config.connection_timeout = 1;
2595        let client = make_ws_client(config);
2596
2597        // Connection will fail (unreachable address), but should properly clean up connecting flag
2598        let result = client.connect().await;
2599        assert!(
2600            result.is_err(),
2601            "connecting to unreachable address should fail"
2602        );
2603        assert!(
2604            !client.connecting.load(Ordering::Acquire),
2605            "connecting flag should be cleared after connection failure"
2606        );
2607    }
2608
2609    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2610    // 5. Event broadcast
2611    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2612
2613    #[tokio::test]
2614    async fn test_event_subscribe_receives_events() {
2615        let client = make_ws_client(make_config());
2616        let mut rx = client.subscribe_events();
2617
2618        // Manually send event
2619        let _ = client.event_tx.send(SignalingEvent::Connected);
2620
2621        match tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()).await {
2622            Ok(Ok(SignalingEvent::Connected)) => {} // expect Connected event
2623            other => panic!("expected Connected event, but got {:?}", other),
2624        }
2625    }
2626
2627    #[tokio::test]
2628    async fn test_disconnect_event_on_connect_failure() {
2629        let mut config = make_config();
2630        config.reconnect_config.enabled = false;
2631        config.connection_timeout = 1;
2632        let client = make_ws_client(config);
2633        let mut rx = client.subscribe_events();
2634
2635        // Connection fails
2636        let _ = client.connect().await;
2637
2638        // Should receive Disconnected(ConnectionFailed) event
2639        match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()).await {
2640            Ok(Ok(SignalingEvent::Disconnected {
2641                reason: DisconnectReason::ConnectionFailed(_),
2642            })) => {} // expected
2643            other => panic!(
2644                "expected Disconnected(ConnectionFailed) event, but got {:?}",
2645                other
2646            ),
2647        }
2648    }
2649
2650    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2651    // 6. disconnect() state cleanup
2652    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2653
2654    #[tokio::test]
2655    async fn test_disconnect_clears_connected_flag() {
2656        let client = make_ws_client(make_config());
2657        // Simulate connected state
2658        client.connected.store(true, Ordering::Release);
2659        assert!(client.is_connected());
2660
2661        let result = client.disconnect().await;
2662        assert!(result.is_ok());
2663        assert!(
2664            !client.is_connected(),
2665            "should be Disconnected after disconnect()"
2666        );
2667    }
2668
2669    #[tokio::test]
2670    async fn test_disconnect_increments_disconnection_stat() {
2671        let client = make_ws_client(make_config());
2672        client.connected.store(true, Ordering::Release);
2673
2674        let stats_before = client.get_stats().disconnections;
2675        let _ = client.disconnect().await;
2676        let stats_after = client.get_stats().disconnections;
2677        assert_eq!(
2678            stats_after,
2679            stats_before + 1,
2680            "disconnect() should increment disconnection count"
2681        );
2682    }
2683
2684    #[tokio::test]
2685    async fn test_disconnect_idempotent() {
2686        let client = make_ws_client(make_config());
2687
2688        // Calling disconnect() while not connected should not panic
2689        let r1 = client.disconnect().await;
2690        let r2 = client.disconnect().await;
2691        assert!(r1.is_ok());
2692        assert!(r2.is_ok());
2693        assert!(!client.is_connected());
2694    }
2695
2696    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2697    // 7. Reconnect notify mechanism
2698    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2699
2700    #[tokio::test]
2701    async fn test_reconnect_notify_wakes_waiter() {
2702        let notify = Arc::new(tokio::sync::Notify::new());
2703        let notify_clone = notify.clone();
2704        let woken = Arc::new(AtomicBool::new(false));
2705        let woken_clone = woken.clone();
2706
2707        let handle = tokio::spawn(async move {
2708            notify_clone.notified().await;
2709            woken_clone.store(true, Ordering::Release);
2710        });
2711
2712        // Ensure waiter has registered
2713        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2714        assert!(
2715            !woken.load(Ordering::Acquire),
2716            "should not be woken before notification"
2717        );
2718
2719        // Trigger notification
2720        notify.notify_one();
2721        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2722        assert!(
2723            woken.load(Ordering::Acquire),
2724            "should be woken after notification"
2725        );
2726
2727        handle.abort();
2728    }
2729
2730    #[tokio::test]
2731    async fn test_schedule_auto_reconnect_reenables_after_explicit_disconnect() {
2732        let client = make_ws_client(make_config());
2733
2734        client
2735            .disconnect()
2736            .await
2737            .expect("explicit disconnect should be idempotent");
2738        assert!(
2739            client.auto_reconnect_suppressed.load(Ordering::Acquire),
2740            "explicit disconnect should suppress stale auto-reconnect cycles"
2741        );
2742
2743        client.schedule_auto_reconnect();
2744
2745        assert!(
2746            !client.auto_reconnect_suppressed.load(Ordering::Acquire),
2747            "scheduling a fresh auto-reconnect should clear explicit disconnect suppression"
2748        );
2749    }
2750
2751    #[tokio::test]
2752    async fn test_explicit_disconnect_suppresses_reconnect_cycle_in_backoff() {
2753        let mut config = make_config();
2754        config.connection_timeout = 1;
2755        config.reconnect_config = ReconnectConfig {
2756            enabled: true,
2757            max_attempts: 5,
2758            initial_delay: 1,
2759            max_delay: 1,
2760            backoff_multiplier: 1.0,
2761        };
2762        let client = make_ws_client(config);
2763        let mut rx = client.subscribe_events();
2764
2765        let reconnect_client = client.clone();
2766        let reconnect_task = tokio::spawn(async move {
2767            reconnect_client.run_reconnect_cycle().await;
2768        });
2769
2770        match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2771            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2772            other => panic!("expected first reconnect attempt, got {other:?}"),
2773        }
2774
2775        client
2776            .disconnect()
2777            .await
2778            .expect("explicit disconnect should be idempotent");
2779
2780        tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2781            .await
2782            .expect("suppressed reconnect cycle should exit promptly")
2783            .expect("reconnect task should not panic");
2784
2785        while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
2786        {
2787            if let SignalingEvent::ConnectStart { attempt } = event {
2788                panic!("suppressed reconnect cycle sent unexpected attempt {attempt}");
2789            }
2790        }
2791
2792        assert!(
2793            client.auto_reconnect_suppressed.load(Ordering::Acquire),
2794            "explicit disconnect should suppress stale auto-reconnect cycles"
2795        );
2796    }
2797
2798    #[tokio::test]
2799    async fn test_explicit_disconnect_suppresses_in_flight_auto_reconnect_connected_event() {
2800        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2801            .await
2802            .expect("test listener should bind");
2803        let server_url = format!(
2804            "ws://{}/signaling/ws",
2805            listener
2806                .local_addr()
2807                .expect("test listener should have local addr")
2808        );
2809        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
2810
2811        let server_task = tokio::spawn(async move {
2812            let (stream, _) = listener
2813                .accept()
2814                .await
2815                .expect("test server should accept tcp connection");
2816            let _ = release_rx.await;
2817            let ws_stream = tokio_tungstenite::accept_async(stream)
2818                .await
2819                .expect("test server should complete websocket handshake");
2820            tokio::time::sleep(Duration::from_millis(100)).await;
2821            drop(ws_stream);
2822        });
2823
2824        let mut config = make_config();
2825        config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
2826        config.connection_timeout = 5;
2827        config.reconnect_config = ReconnectConfig {
2828            enabled: true,
2829            max_attempts: 3,
2830            initial_delay: 1,
2831            max_delay: 1,
2832            backoff_multiplier: 1.0,
2833        };
2834        let client = make_ws_client(config);
2835        let mut rx = client.subscribe_events();
2836
2837        let reconnect_client = client.clone();
2838        let reconnect_task = tokio::spawn(async move {
2839            reconnect_client.run_reconnect_cycle().await;
2840        });
2841
2842        match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2843            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2844            other => panic!("expected first reconnect attempt, got {other:?}"),
2845        }
2846
2847        client
2848            .disconnect()
2849            .await
2850            .expect("explicit disconnect should cancel the in-flight auto-reconnect");
2851        release_tx
2852            .send(())
2853            .expect("test server handshake should still be waiting");
2854
2855        tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2856            .await
2857            .expect("cancelled in-flight reconnect should exit promptly")
2858            .expect("reconnect task should not panic");
2859
2860        while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(150), rx.recv()).await
2861        {
2862            assert!(
2863                !matches!(event, SignalingEvent::Connected),
2864                "cancelled auto-reconnect must not publish Connected"
2865            );
2866        }
2867
2868        assert!(
2869            !client.is_connected(),
2870            "cancelled auto-reconnect must not leave signaling connected"
2871        );
2872
2873        tokio::time::timeout(Duration::from_secs(1), server_task)
2874            .await
2875            .expect("test server task should finish")
2876            .expect("test server task should not panic");
2877    }
2878
2879    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2880    // 8. URL construction tests
2881    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2882
2883    #[tokio::test]
2884    async fn test_build_url_without_identity() {
2885        let config = make_config();
2886        let expected_base = config.server_url.to_string();
2887        let client = WebSocketSignalingClient::new(config);
2888
2889        let url = client.build_url_with_identity().await;
2890        assert_eq!(
2891            url.to_string(),
2892            expected_base,
2893            "URL should not contain identity parameters when actor_id is not set"
2894        );
2895    }
2896
2897    #[tokio::test]
2898    async fn test_build_url_with_webrtc_role() {
2899        let mut config = make_config();
2900        config.webrtc_role = Some("answer".to_string());
2901        let client = WebSocketSignalingClient::new(config);
2902
2903        let url = client.build_url_with_identity().await;
2904        assert!(
2905            url.query().unwrap_or("").contains("webrtc_role=answer"),
2906            "URL should contain webrtc_role parameter, actual URL: {}",
2907            url
2908        );
2909    }
2910
2911    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2912    // 9. Inbound channel reset
2913    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2914
2915    #[tokio::test]
2916    async fn test_reset_inbound_channel_creates_fresh_channel() {
2917        let client = WebSocketSignalingClient::new(make_config());
2918
2919        // Get old tx and send a message
2920        {
2921            let tx = client.inbound_tx.lock().await;
2922            let _ = tx.send(SignalingEnvelope::default());
2923        }
2924
2925        // Reset channel
2926        client.reset_inbound_channel().await;
2927
2928        // Old messages should not be visible in the new channel
2929        let mut rx = client.inbound_rx.lock().await;
2930        let result = rx.try_recv();
2931        assert!(
2932            result.is_err(),
2933            "old messages should not be visible in the new channel after reset"
2934        );
2935    }
2936
2937    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2938    // 10. Envelope ID incrementing
2939    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2940
2941    #[tokio::test]
2942    async fn test_envelope_id_monotonically_increasing() {
2943        let client = WebSocketSignalingClient::new(make_config());
2944
2945        let id1 = client.next_envelope_id().await;
2946        let id2 = client.next_envelope_id().await;
2947        let id3 = client.next_envelope_id().await;
2948
2949        assert_eq!(id1, "env-1");
2950        assert_eq!(id2, "env-2");
2951        assert_eq!(id3, "env-3");
2952    }
2953
2954    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2955    // 11. send_envelope should return error when not connected
2956    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2957
2958    #[tokio::test]
2959    async fn test_send_envelope_fails_when_not_connected() {
2960        let client = WebSocketSignalingClient::new(make_config());
2961        let envelope = SignalingEnvelope::default();
2962
2963        let result = client.send_envelope(envelope).await;
2964        assert!(
2965            result.is_err(),
2966            "send_envelope should return error when not connected"
2967        );
2968        match result {
2969            Err(NetworkError::ConnectionError(msg)) => {
2970                assert!(
2971                    msg.contains("not connected") || msg.contains("Not connected"),
2972                    "error message should contain 'not connected', actual: {}",
2973                    msg
2974                );
2975            }
2976            other => panic!("expected ConnectionError, got {:?}", other),
2977        }
2978    }
2979
2980    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2981    // 12. FakeSignalingClient trait implementation verification
2982    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2983
2984    #[tokio::test]
2985    async fn test_fake_client_tracks_connect_calls() {
2986        let client = make_fake_client();
2987        assert_eq!(client.connect_calls.load(UsizeOrdering::SeqCst), 0);
2988
2989        client.connect().await.unwrap();
2990        client.connect().await.unwrap();
2991        client.connect().await.unwrap();
2992
2993        assert_eq!(
2994            client.connect_calls.load(UsizeOrdering::SeqCst),
2995            3,
2996            "FakeSignalingClient should accurately track connect call count"
2997        );
2998    }
2999}