Skip to main content

actr_hyper/wire/webrtc/
signaling.rs

//! Signaling client implementation.
//!
//! Speaks the signaling protocol defined in the protobuf schema, using the `SignalingEnvelope` structure for messages.
4
5#[cfg(feature = "opentelemetry")]
6use super::trace;
7use crate::lifecycle::CredentialState;
8use crate::transport::{NetworkError, NetworkResult};
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::extract_trace_context;
11use actr_protocol::prost::Message as ProstMessage;
12use actr_protocol::{
13    AIdCredential, ActrId, ActrToSignaling, CredentialUpdateRequest, GetSigningKeyRequest,
14    PeerToSignaling, Ping, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
15    RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterRequest,
16    UnregisterResponse, actr_to_signaling, peer_to_signaling, signaling_envelope,
17    signaling_to_actr,
18};
19use async_trait::async_trait;
20use base64::Engine as _;
21use futures_util::{SinkExt, StreamExt};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::{
27    Arc, OnceLock,
28    atomic::{AtomicBool, AtomicU64, Ordering},
29};
30use std::time::Duration;
31use tokio::net::TcpStream;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
34use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async_with_config};
35#[cfg(feature = "opentelemetry")]
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use url::Url;
38
39/// WebSocket sink type alias for the split write half of a signaling connection
40type WsSink = Arc<
41    tokio::sync::Mutex<
42        Option<
43            futures_util::stream::SplitSink<
44                WebSocketStream<MaybeTlsStream<TcpStream>>,
45                tokio_tungstenite::tungstenite::Message,
46            >,
47        >,
48    >,
49>;
50
51// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
52// Constants
53// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
54
/// Default number of seconds to wait for a response from the signaling server.
const RESPONSE_TIMEOUT_SECS: u64 = 15;

// WebSocket-level keepalive timing (seconds), used to detect silent half-open
// connections. `PONG_TIMEOUT_SECS` also bounds the liveness probe that runs
// before an auto-reconnect cycle.
const PING_INTERVAL_SECS: u64 = 5;
const PONG_TIMEOUT_SECS: u64 = 10;

// Per-operation timeouts (seconds). Each is named after the signaling step it
// bounds; the call sites live outside this section.
const SIGNALING_SEND_TIMEOUT_SECS: u64 = 5;
const CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS: u64 = 5;
const DISCONNECT_LOCK_TIMEOUT_SECS: u64 = 5;
const DISCONNECT_CLOSE_TIMEOUT_SECS: u64 = 1;
64
65// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Configuration types
67// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
68
69/// signalingconfiguration
70#[derive(Debug, Clone)]
71pub struct SignalingConfig {
72    /// signaling server URL
73    pub server_url: Url,
74
75    /// Connecttimeout temporal duration (seconds)
76    pub connection_timeout: u64,
77
78    /// center skipinterval(seconds)
79    pub heartbeat_interval: u64,
80
81    /// reconnection configuration
82    pub reconnect_config: ReconnectConfig,
83
84    /// acknowledge verify configuration
85    pub auth_config: Option<AuthConfig>,
86
87    /// WebRTC role preference: "answer" if this node has advanced config
88    pub webrtc_role: Option<String>,
89}
90
/// Automatic reconnection policy.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether automatic reconnection is enabled.
    pub enabled: bool,

    /// Maximum number of reconnection attempts.
    pub max_attempts: u32,

    /// Initial reconnection delay, in seconds.
    pub initial_delay: u64,

    /// Upper bound on the reconnection delay, in seconds.
    pub max_delay: u64,

    /// Growth factor applied to the delay between attempts.
    ///
    /// NOTE(review): the reconnect cycle in this file does not pass this value
    /// to its `ExponentialBackoff` builder. Confirm whether it is honoured.
    pub backoff_multiplier: f64,
}

impl Default for ReconnectConfig {
    /// Reconnection enabled, with up to 10 attempts and delays starting at 1 s,
    /// capped at 60 s, using a 2.0 multiplier.
    fn default() -> Self {
        ReconnectConfig {
            backoff_multiplier: 2.0,
            max_delay: 60,
            initial_delay: 1,
            max_attempts: 10,
            enabled: true,
        }
    }
}
121
/// Authentication settings for the signaling connection.
#[derive(Debug, Clone)]
pub struct AuthConfig {
    /// Authentication scheme to use.
    pub auth_type: AuthType,

    /// Credential data for the scheme, as string key/value pairs.
    pub credentials: HashMap<String, String>,
}

/// Supported authentication schemes.
#[derive(Debug, Clone)]
pub enum AuthType {
    /// No authentication.
    None,
    /// Bearer token.
    BearerToken,
    /// API key.
    ApiKey,
    /// JSON Web Token.
    Jwt,
}
144
145// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
146// Client interface and implementation
147// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
148
149/// signaling client connect port
150///
151/// # interior mutability
152/// allMethodusing `&self` and non `&mut self`, with for conveniencein Arc in shared.
153/// Implementation class needs interior mutability ( like Mutex)to manage WebSocket connection status.
154#[async_trait]
155pub trait SignalingClient: Send + Sync {
156    /// Connecttosignaling server
157    async fn connect(&self) -> NetworkResult<()>;
158
159    /// Perform a single explicit connection attempt.
160    ///
161    /// Network recovery events use this path so a failed restore attempt can
162    /// return quickly instead of sleeping inside the normal reconnect backoff.
163    async fn connect_once(&self) -> NetworkResult<()> {
164        self.connect().await
165    }
166
167    /// DisconnectConnect
168    async fn disconnect(&self) -> NetworkResult<()>;
169
170    /// Probe whether the existing signaling WebSocket is truly alive.
171    ///
172    /// The default implementation only checks local state. WebSocket-backed
173    /// clients override this with an active Ping/Pong probe to catch half-open
174    /// sockets before network recovery decides whether to reconnect.
175    async fn probe_alive(&self, _timeout: Duration) -> NetworkResult<()> {
176        if self.is_connected() {
177            Ok(())
178        } else {
179            Err(NetworkError::ConnectionError(
180                "Signaling client is not connected".to_string(),
181            ))
182        }
183    }
184
185    /// Deprecated: Registration now happens via AIS HTTP; this WS path is no longer used.
186    /// Kept for backward compatibility; will be removed in a future release.
187    async fn send_register_request(
188        &self,
189        request: RegisterRequest,
190    ) -> NetworkResult<RegisterResponse>;
191
192    /// Send UnregisterRequest to signaling server (Actr → Signaling flow)
193    ///
194    /// This is used when an Actor is shutting down gracefully and wants to
195    /// proactively notify the signaling server that it is no longer available.
196    async fn send_unregister_request(
197        &self,
198        actor_id: ActrId,
199        credential: AIdCredential,
200        reason: Option<String>,
201    ) -> NetworkResult<UnregisterResponse>;
202
203    /// Send center skip(Registerafter stream process, using ActrToSignaling)
204    /// Returns Pong response if received, error if timeout or no response
205    async fn send_heartbeat(
206        &self,
207        actor_id: ActrId,
208        credential: AIdCredential,
209        availability: ServiceAvailabilityState,
210        power_reserve: f32,
211        mailbox_backlog: f32,
212    ) -> NetworkResult<Pong>;
213
214    /// Send RouteCandidatesRequest (requires authenticated Actor session)
215    async fn send_route_candidates_request(
216        &self,
217        actor_id: ActrId,
218        credential: AIdCredential,
219        request: RouteCandidatesRequest,
220    ) -> NetworkResult<RouteCandidatesResponse>;
221
222    /// Query AIS Ed25519 signing public key via signaling
223    ///
224    /// Returns `(key_id, pubkey_bytes)` where pubkey_bytes is the 32-byte raw public key.
225    /// Typically called by AisKeyCache on cache miss; should not be used directly in hot paths.
226    async fn get_signing_key(
227        &self,
228        actor_id: ActrId,
229        credential: AIdCredential,
230        key_id: u32,
231    ) -> NetworkResult<(u32, Vec<u8>)>;
232
233    /// Send CredentialUpdateRequest to refresh the Actor's credential
234    ///
235    /// This is used to refresh the credential before it expires. The server responds
236    /// with a RegisterResponse containing the new credential and expiration time.
237    async fn send_credential_update_request(
238        &self,
239        actor_id: ActrId,
240        credential: AIdCredential,
241    ) -> NetworkResult<RegisterResponse>;
242
243    /// Sendsignalingsignal seal ( pass usage Method)
244    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;
245
246    /// Receivesignalingsignal seal
247    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;
248
249    /// Check connection status
250    fn is_connected(&self) -> bool;
251
252    /// GetConnect statistics info
253    fn get_stats(&self) -> SignalingStats;
254    /// Subscribe to signaling events (state transitions).
255    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent>;
256
257    /// Set actor ID and credential state for reconnect URL parameters.
258    async fn set_actor_id(&self, actor_id: ActrId);
259    async fn set_credential_state(&self, credential_state: CredentialState);
260
261    /// Clear stored actor ID and credential state.
262    ///
263    /// After calling this, `connect()` will produce a clean WebSocket URL
264    /// without identity query parameters, so the signaling server treats
265    /// the connection as brand-new rather than a reconnect of the old actor.
266    /// This is required before re-registration when the credential has expired.
267    async fn clear_identity(&self);
268
269    /// Set a lifecycle hook callback that will be invoked (and awaited)
270    /// whenever signaling state changes (connect/disconnect).
271    /// Default implementation is a no-op for clients that don't support hooks.
272    fn set_hook_callback(&self, _cb: HookCallback) {}
273}
274
/// Coarse snapshot of the signaling connection, for quick boolean checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No live signaling connection.
    Disconnected,
    /// A signaling connection is established.
    Connected,
}
281
/// Discrete signaling state transitions.
///
/// `ConnectionState` is a snapshot; each value here is a single transition,
/// delivered over a `broadcast` channel. Every subscriber therefore sees every
/// event, even when the same state occurs twice in a row.
#[derive(Debug, Clone)]
pub enum SignalingEvent {
    /// A connection attempt is about to start; carries the attempt (retry) count.
    ConnectStart { attempt: u32 },
    /// The connection was established successfully.
    Connected,
    /// The connection was lost, for the given `reason`.
    Disconnected { reason: DisconnectReason },
}

/// Why the signaling connection was lost.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// The WebSocket stream ended (the receiver task exited normally).
    StreamEnded,
    /// No Pong arrived within the timeout window.
    PongTimeout,
    /// Sending a WebSocket Ping frame failed.
    PingSendFailed,
    /// The credential expired (heartbeat returned 401).
    CredentialExpired,
    /// An explicit `disconnect()` call or another external trigger.
    Manual,
    /// A connection attempt failed; carries the error text.
    ConnectionFailed(String),
}
313
314// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
315// Hook callback for synchronous lifecycle notification
316// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
317
318/// Events that trigger workload lifecycle hooks.
319///
320/// Used by `HookCallback` to invoke workload hooks synchronously (awaited)
321/// at the point where the state change occurs.
322#[derive(Clone, Debug)]
323pub enum HookEvent {
324    // ── Signaling ──
325    SignalingConnectStart {
326        attempt: u32,
327    },
328    SignalingConnected,
329    SignalingDisconnected,
330    // ── WebRTC ──
331    WebRtcConnectStart {
332        peer_id: ActrId,
333    },
334    WebRtcConnected {
335        peer_id: ActrId,
336        relayed: bool,
337    },
338    WebRtcDisconnected {
339        peer_id: ActrId,
340    },
341    DataStreamDeliveryUncertain {
342        stream_id: String,
343        session_id: u64,
344        reason: String,
345    },
346    // ── WebSocket ──
347    WebSocketConnectStart {
348        peer_id: ActrId,
349    },
350    WebSocketConnected {
351        peer_id: ActrId,
352    },
353    WebSocketDisconnected {
354        peer_id: ActrId,
355    },
356    // ── Credential ──
357    CredentialRenewed {
358        new_expiry: std::time::SystemTime,
359    },
360    CredentialExpiring {
361        new_expiry: std::time::SystemTime,
362    },
363    // ── Mailbox ──
364    MailboxBackpressure {
365        queue_len: usize,
366        threshold: usize,
367    },
368}
369
370/// Callback closure that is awaited when a hook event occurs.
371///
372/// Set once via `set_hook_callback()`. All state-change paths invoke this
373/// closure and `.await` its result before proceeding.
374pub type HookCallback =
375    Arc<dyn Fn(HookEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
376
/// Who asked for a connection attempt.
#[derive(Debug, Clone, Copy)]
enum ConnectIntent {
    /// A direct request (used by `establish_connection_once`).
    Explicit,
    /// An attempt made by the auto-reconnect cycle for reconnect `generation`.
    AutoReconnect { generation: u64 },
}
382
383/// WebSocket signaling clientImplementation
384pub struct WebSocketSignalingClient {
385    config: SignalingConfig,
386    actor_id: tokio::sync::Mutex<Option<ActrId>>,
387    credential_state: tokio::sync::Mutex<Option<CredentialState>>,
388    /// WebSocket write end (using Mutex Implementation interior mutability )
389    ws_sink: WsSink,
390    /// WebSocket read end (using Mutex Implementation interior mutability )
391    ws_stream: tokio::sync::Mutex<
392        Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
393    >,
394    /// connection status
395    connected: Arc<AtomicBool>,
396    /// Connection in progress flag (prevents concurrent connect attempts)
397    connecting: Arc<AtomicBool>,
398    /// statistics info
399    stats: Arc<AtomicSignalingStats>,
400    /// Envelope count number device
401    envelope_counter: tokio::sync::Mutex<u64>,
402    /// Pending reply waiters (reply_for -> oneshot)
403    pending_replies: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<SignalingEnvelope>>>>,
404    /// Pending WebSocket Pong waiters (ping payload -> oneshot)
405    pending_pongs: Arc<tokio::sync::Mutex<HashMap<Vec<u8>, oneshot::Sender<()>>>>,
406    /// Monotonic probe payload counter.
407    probe_counter: AtomicU64,
408    /// Inbound envelope channel for unmatched messages (ActrRelay / push)
409    inbound_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<SignalingEnvelope>>>,
410    inbound_tx: tokio::sync::Mutex<mpsc::UnboundedSender<SignalingEnvelope>>,
411    /// Background receive task handle to allow graceful shutdown
412    receiver_task: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
413    /// Background ping task to detect half-open connections
414    ping_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
415    /// Connection state broadcast channel (event-driven)
416    event_tx: broadcast::Sender<SignalingEvent>,
417    /// Last time we saw inbound traffic (pong/any message), unix epoch seconds
418    last_pong: Arc<AtomicU64>,
419    /// Flag to track if reconnect manager has been started
420    reconnector_started: Arc<AtomicBool>,
421    /// Notify channel to wake up the reconnect manager
422    reconnect_notify: Arc<tokio::sync::Notify>,
423    /// Explicit disconnects from lifecycle/cleanup suppress stale auto-reconnect cycles.
424    auto_reconnect_suppressed: AtomicBool,
425    /// Incremented by explicit disconnects to invalidate in-flight auto-reconnect attempts.
426    reconnect_generation: AtomicU64,
427    /// Hook callback for synchronous lifecycle notification (set once, lock-free read)
428    hook_callback: OnceLock<HookCallback>,
429}
430
431impl WebSocketSignalingClient {
432    /// Create new WebSocket signaling client
433    pub fn new(config: SignalingConfig) -> Self {
434        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
435        let (event_tx, _event_rx) = broadcast::channel(64);
436        Self {
437            config,
438            actor_id: tokio::sync::Mutex::new(None),
439            credential_state: tokio::sync::Mutex::new(None),
440            ws_sink: Arc::new(tokio::sync::Mutex::new(None)),
441            ws_stream: tokio::sync::Mutex::new(None),
442            connected: Arc::new(AtomicBool::new(false)),
443            connecting: Arc::new(AtomicBool::new(false)),
444            stats: Arc::new(AtomicSignalingStats::default()),
445            envelope_counter: tokio::sync::Mutex::new(0),
446            pending_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
447            pending_pongs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
448            probe_counter: AtomicU64::new(0),
449            inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
450            inbound_tx: tokio::sync::Mutex::new(inbound_tx),
451            receiver_task: Arc::new(tokio::sync::Mutex::new(None)),
452            ping_task: tokio::sync::Mutex::new(None),
453            event_tx,
454            last_pong: Arc::new(AtomicU64::new(0)),
455            reconnector_started: Arc::new(AtomicBool::new(false)),
456            reconnect_notify: Arc::new(tokio::sync::Notify::new()),
457            auto_reconnect_suppressed: AtomicBool::new(false),
458            reconnect_generation: AtomicU64::new(0),
459            hook_callback: OnceLock::new(),
460        }
461    }
462
463    /// Start the reconnect manager if enabled in config and not already started.
464    ///
465    /// The manager waits on a `Notify` and runs an exponential-backoff retry loop
466    /// each time it is woken up.
467    /// Invoke the hook callback and await its completion.
468    /// No-op if no callback has been set yet.
469    async fn invoke_hook(&self, event: HookEvent) {
470        if let Some(cb) = self.hook_callback.get() {
471            cb(event).await;
472        }
473    }
474
475    async fn publish_disconnected_transition(
476        was_connected: bool,
477        stats: &Arc<AtomicSignalingStats>,
478        event_tx: &broadcast::Sender<SignalingEvent>,
479        hook_callback: Option<HookCallback>,
480        reason: DisconnectReason,
481        reconnect_notify: Option<&Arc<tokio::sync::Notify>>,
482    ) -> bool {
483        if !was_connected {
484            return false;
485        }
486
487        stats.disconnections.fetch_add(1, Ordering::Relaxed);
488
489        if let Some(cb) = hook_callback {
490            cb(HookEvent::SignalingDisconnected).await;
491        }
492
493        let _ = event_tx.send(SignalingEvent::Disconnected { reason });
494
495        if let Some(notify) = reconnect_notify {
496            notify.notify_one();
497        }
498
499        true
500    }
501
502    pub fn start_reconnect_manager(self: &Arc<Self>) {
503        if !self.config.reconnect_config.enabled {
504            return;
505        }
506        if self
507            .reconnector_started
508            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
509            .is_err()
510        {
511            return; // already started
512        }
513
514        tracing::info!("🔄 Starting reconnect manager for signaling client");
515
516        let client = Arc::downgrade(self);
517        let notify = self.reconnect_notify.clone();
518
519        tokio::spawn(async move {
520            loop {
521                let reconnect_requested = tokio::select! {
522                    _ = notify.notified() => true,
523                    _ = tokio::time::sleep(Duration::from_secs(30)) => false,
524                };
525
526                if !reconnect_requested && client.upgrade().is_none() {
527                    break;
528                }
529                if !reconnect_requested {
530                    continue;
531                }
532
533                let Some(client) = client.upgrade() else {
534                    break;
535                };
536
537                if !client.config.reconnect_config.enabled {
538                    break;
539                }
540
541                if Arc::strong_count(&client) <= 1 {
542                    break;
543                }
544
545                // Run reconnect cycle with exponential backoff
546                client.run_reconnect_cycle().await;
547            }
548        });
549    }
550
551    /// Execute one full reconnect cycle with exponential backoff + jitter.
552    async fn run_reconnect_cycle(self: &Arc<Self>) {
553        use actr_framework::ExponentialBackoff;
554
555        let cfg = &self.config.reconnect_config;
556        let generation = self.reconnect_generation.load(Ordering::Acquire);
557
558        if Arc::strong_count(self) <= 1 {
559            tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
560            return;
561        }
562
563        if self.auto_reconnect_cancelled(generation) {
564            tracing::debug!("Skipping signaling auto-reconnect cycle after explicit disconnect");
565            return;
566        }
567
568        if self.connected.load(Ordering::Acquire) {
569            tracing::debug!("🔎 Probing connected signaling before reconnect cycle");
570            match self
571                .probe_alive(Duration::from_secs(PONG_TIMEOUT_SECS))
572                .await
573            {
574                Ok(()) => {
575                    tracing::debug!("Signaling probe succeeded, skipping reconnect cycle");
576                    return;
577                }
578                Err(e) => {
579                    tracing::warn!("Signaling probe failed before reconnect: {e}");
580                    if let Err(disconnect_err) = self.disconnect_internal(false).await {
581                        tracing::warn!(
582                            "⚠️ Disconnect cleanup failed after failed probe (non-fatal): {disconnect_err}"
583                        );
584                    }
585                }
586            }
587        }
588
589        let backoff = ExponentialBackoff::builder()
590            .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
591            .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
592            .max_retries(cfg.max_attempts)
593            .with_jitter()
594            .build();
595
596        let mut attempt: u32 = 0;
597
598        for delay in backoff {
599            if Arc::strong_count(self) <= 1 {
600                tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
601                return;
602            }
603
604            if self.auto_reconnect_cancelled(generation) {
605                tracing::debug!(
606                    "Stopping signaling auto-reconnect cycle after explicit disconnect"
607                );
608                return;
609            }
610
611            if self.connected.load(Ordering::Acquire) {
612                tracing::debug!("Already connected, aborting reconnect cycle");
613                return;
614            }
615
616            attempt += 1;
617            let _ = self.event_tx.send(SignalingEvent::ConnectStart { attempt });
618
619            match self.connect_once_for_auto_reconnect(generation).await {
620                Ok(()) => {
621                    tracing::info!("✅ Signaling reconnect succeeded on attempt {attempt}");
622                    return;
623                }
624                Err(e) => {
625                    if self.auto_reconnect_cancelled(generation) {
626                        tracing::debug!(
627                            "Stopping signaling auto-reconnect cycle after explicit disconnect"
628                        );
629                        return;
630                    }
631
632                    tracing::warn!(
633                        "❌ Reconnect attempt {attempt} failed: {e}, retrying in {delay:?}"
634                    );
635                    tokio::select! {
636                        _ = tokio::time::sleep(delay) => {}
637                        _ = self.reconnect_notify.notified() => {
638                            tracing::debug!("Explicit reconnect request interrupted reconnect backoff");
639                        }
640                    }
641                    if Arc::strong_count(self) <= 1 {
642                        tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
643                        return;
644                    }
645                    if self.auto_reconnect_cancelled(generation) {
646                        tracing::debug!(
647                            "Stopping signaling auto-reconnect cycle after explicit disconnect"
648                        );
649                        return;
650                    }
651                }
652            }
653        }
654
655        // All retries exhausted — enter cooldown, then allow future wakeups
656        tracing::error!("Reconnect failed after {attempt} attempts, entering cooldown");
657        let cooldown = std::time::Duration::from_secs(cfg.max_delay.max(1) * 2);
658        tokio::select! {
659            _ = tokio::time::sleep(cooldown) => {}
660            _ = self.reconnect_notify.notified() => {
661                tracing::debug!("Explicit reconnect request interrupted reconnect cooldown");
662            }
663        }
664        if self.auto_reconnect_cancelled(generation) {
665            tracing::debug!(
666                "Signaling auto-reconnect cooldown ended after explicit disconnect suppression"
667            );
668        }
669        // After cooldown, the loop returns to notify.notified() and can be woken again
670    }
671
672    /// Test-only convenience constructor: create, connect, and return a client.
673    ///
674    /// The returned client has no `actor_id` / `credential_state` bound, so the
675    /// signaling URL carries no identity query parameters — mock-actrix will
676    /// not bind the WebSocket to any registry entry. Use this only for tests
677    /// that explicitly exercise the unbound path; integration tests that need
678    /// peer-to-peer relay should use [`Self::connect_to_with_identity`].
679    #[cfg(feature = "test-utils")]
680    pub async fn connect_to(url: &str) -> NetworkResult<Arc<Self>> {
681        let config = SignalingConfig {
682            server_url: url.parse()?,
683            connection_timeout: 5,
684            heartbeat_interval: 30,
685            reconnect_config: ReconnectConfig::default(),
686            auth_config: None,
687            webrtc_role: None,
688        };
689
690        let client = Arc::new(Self::new(config));
691        client.start_reconnect_manager();
692        client.connect().await?;
693        Ok(client)
694    }
695
696    /// Test-only constructor that pins identity *before* the WebSocket
697    /// handshake so mock-actrix can bind the connection to the actor on
698    /// register (`?actor_id=…` query parameter). Required by integration
699    /// tests that rely on peer-to-peer signaling relay — without this binding
700    /// mock-actrix drops outbound relays for "unbound target".
701    #[cfg(feature = "test-utils")]
702    pub async fn connect_to_with_identity(
703        url: &str,
704        actor_id: ActrId,
705        credential_state: CredentialState,
706    ) -> NetworkResult<Arc<Self>> {
707        let config = SignalingConfig {
708            server_url: url.parse()?,
709            connection_timeout: 5,
710            heartbeat_interval: 30,
711            reconnect_config: ReconnectConfig::default(),
712            auth_config: None,
713            webrtc_role: None,
714        };
715
716        let client = Arc::new(Self::new(config));
717        client.set_actor_id(actor_id).await;
718        client.set_credential_state(credential_state).await;
719        client.start_reconnect_manager();
720        client.connect().await?;
721        Ok(client)
722    }
723
724    /// alive integrate down a envelope ID
725    async fn next_envelope_id(&self) -> String {
726        let mut counter = self.envelope_counter.lock().await;
727        *counter += 1;
728        format!("env-{}", *counter)
729    }
730
731    /// Create SignalingEnvelope
732    async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
733        SignalingEnvelope {
734            envelope_version: 1,
735            envelope_id: self.next_envelope_id().await,
736            reply_for: None,
737            timestamp: prost_types::Timestamp {
738                seconds: chrono::Utc::now().timestamp(),
739                nanos: 0,
740            },
741            traceparent: None,
742            tracestate: None,
743            flow: Some(flow),
744        }
745    }
746
747    /// Reset inbound channel for a fresh session (useful after disconnects).
748    async fn reset_inbound_channel(&self) {
749        self.drop_pending_replies("inbound channel reset").await;
750        self.drop_pending_pongs("inbound channel reset").await;
751
752        let (tx, rx) = mpsc::unbounded_channel();
753        *self.inbound_tx.lock().await = tx;
754        *self.inbound_rx.lock().await = rx;
755    }
756
757    async fn drop_pending_replies(&self, reason: &'static str) {
758        let dropped = {
759            let mut pending = self.pending_replies.lock().await;
760            let dropped = pending.len();
761            pending.clear();
762            dropped
763        };
764
765        if dropped > 0 {
766            tracing::debug!(reason, dropped, "Dropping pending signaling reply waiters");
767        }
768    }
769
770    async fn drop_pending_pongs(&self, reason: &'static str) {
771        let dropped = {
772            let mut pending = self.pending_pongs.lock().await;
773            let dropped = pending.len();
774            pending.clear();
775            dropped
776        };
777
778        if dropped > 0 {
779            tracing::debug!(reason, dropped, "Dropping pending signaling pong waiters");
780        }
781    }
782
783    /// Build signaling URL with actor identity and Ed25519 credential params for authentication.
784    ///
785    /// Passes `actor_id`, `key_id`, `claims` (base64), `signature` (base64) as URL query params
786    /// so the signaling server can validate the credential before upgrading the WebSocket.
787    async fn build_url_with_identity(&self) -> Url {
788        let mut url = self.config.server_url.clone();
789        let actor_id_opt = self.actor_id.lock().await.clone();
790        if let Some(actor_id) = actor_id_opt {
791            let actor_str = actr_protocol::ActrId::to_string_repr(&actor_id);
792            url.query_pairs_mut().append_pair("actor_id", &actor_str);
793        }
794
795        // Pass Ed25519 credential in URL for initial WS auth
796        let cred_state_opt = self.credential_state.lock().await.clone();
797        if let Some(cred_state) = cred_state_opt {
798            let cred = cred_state.credential().await;
799            let claims_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.claims);
800            let sig_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.signature);
801            url.query_pairs_mut()
802                .append_pair("key_id", &cred.key_id.to_string())
803                .append_pair("claims", &claims_b64)
804                .append_pair("signature", &sig_b64);
805        }
806
807        // Add WebRTC role preference if configured
808        if let Some(role) = &self.config.webrtc_role {
809            url.query_pairs_mut().append_pair("webrtc_role", role);
810        }
811
812        url
813    }
814
815    fn redact_signaling_url_for_log(url: &Url) -> String {
816        let mut redacted = url.clone();
817        let pairs: Vec<(String, String)> = redacted
818            .query_pairs()
819            .map(|(key, value)| {
820                let redacted_value = match key.to_ascii_lowercase().as_str() {
821                    "claims" | "signature" | "token" | "authorization" | "bearer"
822                    | "access_token" | "api_key" => "REDACTED".to_string(),
823                    _ => value.into_owned(),
824                };
825                (key.into_owned(), redacted_value)
826            })
827            .collect();
828
829        redacted.set_query(None);
830        if !pairs.is_empty() {
831            let mut query = redacted.query_pairs_mut();
832            for (key, value) in pairs {
833                query.append_pair(&key, &value);
834            }
835        }
836
837        redacted.to_string()
838    }
839
840    fn auto_reconnect_cancelled(&self, generation: u64) -> bool {
841        self.auto_reconnect_suppressed.load(Ordering::Acquire)
842            || self.reconnect_generation.load(Ordering::Acquire) != generation
843    }
844
845    /// Establish a single signaling WebSocket connection attempt, honoring connection_timeout.
846    ///
847    /// This does not perform any retry logic; callers that want retries should wrap this.
848    async fn establish_connection_once(&self) -> NetworkResult<()> {
849        self.establish_connection_once_with_intent(ConnectIntent::Explicit)
850            .await
851    }
852
853    async fn establish_connection_once_for_auto_reconnect(
854        &self,
855        generation: u64,
856    ) -> NetworkResult<()> {
857        self.establish_connection_once_with_intent(ConnectIntent::AutoReconnect { generation })
858            .await
859    }
860
    /// Make exactly one signaling WebSocket connection attempt on behalf of `intent`.
    ///
    /// Returns `Ok(())` immediately if `connected` is already set. Otherwise it builds the
    /// identity-bearing URL and connects, bounded by `config.connection_timeout` seconds
    /// (`0` disables the timeout). On success it, in order:
    /// - stores the split sink/stream halves;
    /// - marks the client connected and clears `auto_reconnect_suppressed`;
    /// - refreshes `last_pong`;
    /// - invokes the `SignalingConnected` hook and broadcasts `SignalingEvent::Connected`;
    /// - increments `stats.connections`.
    ///
    /// The receiver and ping tasks are not started here.
    ///
    /// For `ConnectIntent::AutoReconnect { generation }`, a connection that completes after
    /// the attempt was cancelled (per `auto_reconnect_cancelled`) is closed and discarded
    /// instead of being installed.
    ///
    /// # Errors
    /// `NetworkError::ConnectionError` on connect timeout or cancellation; other connect
    /// failures are converted into `NetworkError` by `?`.
    async fn establish_connection_once_with_intent(
        &self,
        intent: ConnectIntent,
    ) -> NetworkResult<()> {
        // Guard: Check if already connected (handles rare TOCTOU scenarios)
        if self.connected.load(Ordering::Acquire) {
            tracing::debug!("Connection already established, skipping establish_connection_once()");
            return Ok(());
        }

        let url = self.build_url_with_identity().await;
        let timeout_secs = self.config.connection_timeout;
        tracing::debug!(
            "Establishing connection to URL: {}",
            Self::redact_signaling_url_for_log(&url)
        );
        // `write_buffer_size(0)` makes tungstenite write every message to the socket eagerly
        // instead of accumulating it in its internal write buffer. Presumably this keeps
        // signaling data from sitting in that buffer across a disconnection and being sent
        // late once the network recovers.
        let config = WebSocketConfig::default().write_buffer_size(0);
        // Connect with an optional timeout. A timeout of 0 means "no timeout".
        let connect_result = if timeout_secs == 0 {
            connect_async_with_config(url.as_str(), Some(config), false).await
        } else {
            let timeout_duration = std::time::Duration::from_secs(timeout_secs);
            tokio::time::timeout(
                timeout_duration,
                connect_async_with_config(url.as_str(), Some(config), false),
            )
            .await
            .map_err(|_| {
                NetworkError::ConnectionError(format!(
                    "Signaling connect timeout after {}s",
                    timeout_secs
                ))
            })?
        }?;

        let (ws_stream, _) = connect_result;

        // Split read/write halves and initialize client state
        let (sink, stream) = ws_stream.split();

        // An explicit disconnect may have cancelled this auto-reconnect cycle while the
        // connect was in flight; close the fresh socket instead of installing it.
        if let ConnectIntent::AutoReconnect { generation } = intent
            && self.auto_reconnect_cancelled(generation)
        {
            tracing::debug!(
                generation,
                "Discarding completed signaling auto-reconnect after explicit disconnect"
            );
            let mut sink = sink;
            if let Err(e) = sink.close().await {
                tracing::warn!(
                    "Signaling auto-reconnect socket close failed after cancellation: {}",
                    e
                );
            }
            return Err(NetworkError::ConnectionError(
                "Signaling auto-reconnect was cancelled by explicit disconnect".to_string(),
            ));
        }

        *self.ws_sink.lock().await = Some(sink);
        *self.ws_stream.lock().await = Some(stream);
        // `connected` is stored before `SignalingEvent::Connected` is broadcast below.
        self.connected.store(true, Ordering::Release);
        self.auto_reconnect_suppressed
            .store(false, Ordering::Release);
        // Restart the liveness clock so the ping task's stale-pong check starts fresh.
        self.last_pong.store(current_unix_secs(), Ordering::Release);
        // Invoke hook synchronously, then broadcast for other subscribers
        self.invoke_hook(HookEvent::SignalingConnected).await;
        let _ = self.event_tx.send(SignalingEvent::Connected);

        self.stats.connections.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }
935
936    /// Connect to signaling server with retry and exponential backoff based on reconnect_config.
937    async fn connect_with_retries(&self) -> NetworkResult<()> {
938        use actr_framework::ExponentialBackoff;
939
940        let cfg = &self.config.reconnect_config;
941
942        // If reconnect is disabled, just attempt once.
943        if !cfg.enabled {
944            return self.establish_connection_once().await;
945        }
946
947        let backoff = ExponentialBackoff::builder()
948            .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
949            .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
950            .max_retries(cfg.max_attempts)
951            .with_jitter()
952            .build();
953
954        let mut last_err = None;
955
956        // First attempt immediately (delay = 0), subsequent delays from backoff
957        for (attempt, delay) in std::iter::once(std::time::Duration::ZERO)
958            .chain(backoff)
959            .enumerate()
960        {
961            let attempt = attempt as u32 + 1;
962            self.invoke_hook(HookEvent::SignalingConnectStart { attempt })
963                .await;
964            if delay > std::time::Duration::ZERO {
965                tracing::info!("Retry signaling connect after {delay:?} (attempt {attempt})");
966                tokio::select! {
967                    _ = tokio::time::sleep(delay) => {}
968                    _ = self.reconnect_notify.notified() => {
969                        tracing::debug!("Explicit reconnect request interrupted signaling connect backoff");
970                    }
971                }
972            }
973
974            match self.establish_connection_once().await {
975                Ok(()) => return Ok(()),
976                Err(e) => {
977                    tracing::warn!("Signaling connect attempt {attempt} failed: {e:?}");
978                    last_err = Some(e);
979                }
980            }
981        }
982
983        let total = cfg.max_attempts + 1; // backoff max_retries + first attempt
984        tracing::error!("Signaling connect failed after {total} attempts, giving up");
985        Err(last_err.unwrap_or_else(|| {
986            NetworkError::ConnectionError("All connection attempts failed".to_string())
987        }))
988    }
989
990    /// Send envelope and wait for response with timeout and error handling.
991    #[cfg_attr(
992        feature = "opentelemetry",
993        tracing::instrument(skip_all, fields(envelope_id = %envelope.envelope_id))
994    )]
995    async fn send_envelope_and_wait_response(
996        &self,
997        envelope: SignalingEnvelope,
998    ) -> NetworkResult<SignalingEnvelope> {
999        let reply_for = envelope.envelope_id.clone();
1000
1001        // Register waiter before sending
1002        let (tx, rx) = oneshot::channel();
1003        self.pending_replies
1004            .lock()
1005            .await
1006            .insert(reply_for.clone(), tx);
1007
1008        if let Err(e) = self.send_envelope(envelope).await {
1009            // Cleanup waiter on immediate send failure to avoid leaks.
1010            self.pending_replies.lock().await.remove(&reply_for);
1011            return Err(e);
1012        }
1013
1014        let result =
1015            tokio::time::timeout(std::time::Duration::from_secs(RESPONSE_TIMEOUT_SECS), rx).await;
1016        // Clean up waiter on timeout
1017        if result.is_err() {
1018            self.pending_replies.lock().await.remove(&reply_for);
1019        }
1020
1021        let response_envelope = result
1022            .map_err(|_| {
1023                NetworkError::ConnectionError(
1024                    "Timed out waiting for signaling response".to_string(),
1025                )
1026            })?
1027            .map_err(|_| {
1028                NetworkError::ConnectionError(
1029                    "Receiver dropped while waiting for signaling response".to_string(),
1030                )
1031            })?;
1032
1033        Ok(response_envelope)
1034    }
1035
1036    /// Spawn background receiver to demux envelopes by reply_for.
1037    async fn start_receiver(&self) {
1038        let mut stream_guard = self.ws_stream.lock().await;
1039        if stream_guard.is_none() {
1040            return;
1041        }
1042
1043        let mut stream = stream_guard.take().expect("stream exists");
1044        let pending = self.pending_replies.clone();
1045        let inbound_tx = { self.inbound_tx.lock().await.clone() };
1046        let stats = self.stats.clone();
1047        let connected = self.connected.clone();
1048        let event_tx = self.event_tx.clone();
1049        let last_pong = self.last_pong.clone();
1050        let pending_pongs = self.pending_pongs.clone();
1051        let reconnect_notify = self.reconnect_notify.clone();
1052        let reconnect_enabled = self.config.reconnect_config.enabled;
1053        let hook_callback = self.hook_callback.get().cloned();
1054        let handle = tokio::spawn(async move {
1055            while let Some(msg) = stream.next().await {
1056                match msg {
1057                    Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
1058                        // Any inbound traffic counts as liveness
1059                        last_pong.store(current_unix_secs(), Ordering::Release);
1060                        match SignalingEnvelope::decode(&data[..]) {
1061                            Ok(envelope) => {
1062                                #[cfg(feature = "opentelemetry")]
1063                                let span = {
1064                                    let span = tracing::info_span!("signaling.receive_envelope", envelope_id = %envelope.envelope_id);
1065                                    span.set_parent(extract_trace_context(&envelope));
1066                                    span
1067                                };
1068
1069                                stats.messages_received.fetch_add(1, Ordering::Relaxed);
1070                                tracing::debug!("Received message: {:?}", envelope);
1071                                if let Some(reply_for) = envelope.reply_for.clone() {
1072                                    if let Some(sender) = pending.lock().await.remove(&reply_for) {
1073                                        #[cfg(feature = "opentelemetry")]
1074                                        let _ = span.enter();
1075                                        if let Err(e) = sender.send(envelope) {
1076                                            stats.errors.fetch_add(1, Ordering::Relaxed);
1077                                            tracing::warn!(
1078                                                "Failed to send reply envelope to waiter: {e:?}",
1079                                            );
1080                                        }
1081                                        continue;
1082                                    }
1083                                }
1084                                tracing::debug!(
1085                                    "Unmatched or push message -> forward to inbound channel"
1086                                );
1087                                // Unmatched or push message -> forward to inbound channel
1088                                if let Err(e) = inbound_tx.send(envelope) {
1089                                    stats.errors.fetch_add(1, Ordering::Relaxed);
1090                                    tracing::warn!(
1091                                        "Failed to send envelope to inbound channel: {e:?}"
1092                                    );
1093                                }
1094                            }
1095                            Err(e) => {
1096                                stats.errors.fetch_add(1, Ordering::Relaxed);
1097                                tracing::warn!("Failed to decode SignalingEnvelope: {e}");
1098                            }
1099                        }
1100                    }
1101                    Ok(tokio_tungstenite::tungstenite::Message::Pong(payload)) => {
1102                        tracing::debug!("Received pong");
1103                        last_pong.store(current_unix_secs(), Ordering::Release);
1104                        if let Some(sender) = pending_pongs.lock().await.remove(&payload.to_vec()) {
1105                            let _ = sender.send(());
1106                        }
1107                    }
1108                    Ok(tokio_tungstenite::tungstenite::Message::Ping(_)) => {
1109                        tracing::debug!("Received ping");
1110                        last_pong.store(current_unix_secs(), Ordering::Release);
1111                    }
1112                    Ok(other) => {
1113                        tracing::warn!("Received non-binary frame, ignoring: {other:?}");
1114                    }
1115                    Err(e) => {
1116                        stats.errors.fetch_add(1, Ordering::Relaxed);
1117                        tracing::error!("Signaling receive error: {e}");
1118                        break;
1119                    }
1120                }
1121            }
1122
1123            tracing::warn!("Stream terminated");
1124            // If explicit disconnect already marked the client disconnected,
1125            // do not start an automatic reconnect cycle for the intentional
1126            // close. The disconnect path publishes its own Manual event.
1127            let was_connected = connected.swap(false, Ordering::AcqRel);
1128            Self::publish_disconnected_transition(
1129                was_connected,
1130                &stats,
1131                &event_tx,
1132                hook_callback,
1133                DisconnectReason::StreamEnded,
1134                reconnect_enabled.then_some(&reconnect_notify),
1135            )
1136            .await;
1137            pending_pongs.lock().await.clear();
1138        });
1139
1140        *self.receiver_task.lock().await = Some(handle);
1141    }
1142
    /// Spawn the background ping task that detects half-open signaling connections.
    ///
    /// On a half-open connection, writes may keep succeeding even though the peer is gone.
    /// Every `PING_INTERVAL_SECS` the task:
    /// 1. exits if the client is no longer marked connected;
    /// 2. sends a WebSocket ping through the shared sink, bounded by
    ///    `SIGNALING_SEND_TIMEOUT_SECS`. A send error, a send timeout or a missing sink marks
    ///    the client disconnected with `DisconnectReason::PingSendFailed`;
    /// 3. checks `last_pong`, which the receiver task refreshes on binary, ping and pong
    ///    frames. If it is more than `PONG_TIMEOUT_SECS` old, the task aborts the receiver
    ///    task and marks the client disconnected with `DisconnectReason::PongTimeout`.
    ///
    /// After publishing a disconnect the task exits. It passes the reconnect notifier when
    /// reconnects are enabled.
    ///
    /// At most one ping task is kept. If a previous one is still alive it is left in place and
    /// no new task is spawned.
    ///
    /// fixme: merge to heartbeat task
    async fn start_ping_task(&self) {
        let mut existing = self.ping_task.lock().await;
        if let Some(handle) = existing.as_ref() {
            if handle.is_finished() {
                existing.take();
            } else {
                // Still running. It reads the shared `ws_sink` / `connected` handles, so it
                // keeps serving whichever connection is currently installed.
                return;
            }
        }

        let sink = self.ws_sink.clone();
        let connected = self.connected.clone();
        let stats = self.stats.clone();
        let event_tx = self.event_tx.clone();
        let last_pong = self.last_pong.clone();
        let receiver_task_clone = Arc::clone(&self.receiver_task);
        let reconnect_notify = self.reconnect_notify.clone();
        let reconnect_enabled = self.config.reconnect_config.enabled;
        let hook_callback = self.hook_callback.get().cloned();

        let handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await;

                if !connected.load(Ordering::Acquire) {
                    break;
                }

                // Send ping; mark disconnect on failure.
                // The sink lock is released (end of the inner block) before publishing.
                let mut disconnect_reason = None;
                {
                    let mut sink_guard = sink.lock().await;
                    if let Some(sink) = sink_guard.as_mut() {
                        match tokio::time::timeout(
                            std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
                            sink.send(tokio_tungstenite::tungstenite::Message::Ping(
                                Vec::new().into(),
                            )),
                        )
                        .await
                        {
                            Ok(Ok(())) => {}
                            Ok(Err(e)) => {
                                tracing::warn!("Signaling ping send failed: {e}");
                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
                            }
                            Err(_) => {
                                tracing::warn!("Signaling ping send timed out");
                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
                            }
                        }
                    } else {
                        tracing::warn!("Signaling not connected");
                        disconnect_reason = Some(DisconnectReason::PingSendFailed);
                    }
                }

                // NOTE(review): unlike the pong-timeout path below, the receiver task is not
                // aborted here; it is left running on the old stream.
                if let Some(reason) = disconnect_reason {
                    let was_connected = connected.swap(false, Ordering::AcqRel);
                    Self::publish_disconnected_transition(
                        was_connected,
                        &stats,
                        &event_tx,
                        hook_callback.clone(),
                        reason,
                        reconnect_enabled.then_some(&reconnect_notify),
                    )
                    .await;
                    break;
                }

                // Check for stale pong
                let now = current_unix_secs();
                let last = last_pong.load(Ordering::Acquire);
                if now.saturating_sub(last) > PONG_TIMEOUT_SECS {
                    tracing::warn!(
                        "Signaling pong timeout (last seen {}s ago), marking disconnected",
                        now.saturating_sub(last)
                    );
                    if let Some(handle) = receiver_task_clone.lock().await.take() {
                        handle.abort();
                    }
                    let was_connected = connected.swap(false, Ordering::AcqRel);
                    Self::publish_disconnected_transition(
                        was_connected,
                        &stats,
                        &event_tx,
                        hook_callback.clone(),
                        DisconnectReason::PongTimeout,
                        reconnect_enabled.then_some(&reconnect_notify),
                    )
                    .await;
                    break;
                }
            }
        });

        *existing = Some(handle);
    }
1244
    /// Tear down the current signaling connection and release its resources.
    ///
    /// When `suppress_auto_reconnect` is true (an explicit disconnect), three things happen
    /// first: the reconnect generation is bumped, auto-reconnect is flagged as suppressed, and
    /// tasks waiting on `reconnect_notify` are woken. Any in-flight auto-reconnect attempt
    /// then sees the cancellation through `auto_reconnect_cancelled`.
    ///
    /// The teardown then runs in this order:
    /// 1. pending reply and pong waiters are dropped and `connected` is cleared;
    /// 2. the ping and receiver tasks are aborted;
    /// 3. the sink is closed, bounded by `DISCONNECT_CLOSE_TIMEOUT_SECS`;
    /// 4. the read half is dropped and the inbound channel is reset;
    /// 5. a `DisconnectReason::Manual` transition is published, passing `None` as the
    ///    reconnect notifier.
    ///
    /// Each lock acquisition is bounded by `DISCONNECT_LOCK_TIMEOUT_SECS`; on timeout that step
    /// is skipped with a warning rather than failing the disconnect.
    ///
    /// Returns `Ok(())` on every path shown here.
    async fn disconnect_internal(&self, suppress_auto_reconnect: bool) -> NetworkResult<()> {
        if suppress_auto_reconnect {
            // Invalidate the current auto-reconnect generation and wake any backoff waits.
            self.reconnect_generation.fetch_add(1, Ordering::AcqRel);
            self.auto_reconnect_suppressed
                .store(true, Ordering::Release);
            self.reconnect_notify.notify_waiters();
        }

        self.drop_pending_replies("signaling disconnect").await;
        self.drop_pending_pongs("signaling disconnect").await;
        let was_connected = self.connected.swap(false, Ordering::AcqRel);

        // Stop background tasks before taking the WebSocket sink/stream locks.
        // A ping or receiver task can be inside a socket operation while holding
        // one of those locks; aborting first keeps disconnect from waiting on
        // the task it is about to shut down.
        let ping_handle = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ping_task.lock(),
        )
        .await
        {
            Ok(mut task_guard) => task_guard.take(),
            Err(_) => {
                tracing::warn!("Timed out waiting for signaling ping task lock during disconnect");
                None
            }
        };
        if let Some(handle) = ping_handle {
            handle.abort();
        }

        let receiver_handle = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.receiver_task.lock(),
        )
        .await
        {
            Ok(mut task_guard) => task_guard.take(),
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling receiver task lock during disconnect"
                );
                None
            }
        };
        if let Some(handle) = receiver_handle {
            handle.abort();
        }

        // Fetch and close the sink without holding the mutex during the close
        // await. The lock itself is bounded because a stalled send can hold it
        // on broken mobile network transitions.
        let sink = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ws_sink.lock(),
        )
        .await
        {
            Ok(mut sink_guard) => sink_guard.take(),
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling WebSocket sink lock during disconnect"
                );
                None
            }
        };

        if let Some(mut sink) = sink {
            match tokio::time::timeout(
                std::time::Duration::from_secs(DISCONNECT_CLOSE_TIMEOUT_SECS),
                sink.close(),
            )
            .await
            {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    tracing::warn!("Signaling WebSocket close failed during disconnect: {}", e);
                }
                Err(_) => {
                    tracing::warn!(
                        "Signaling WebSocket close timed out during disconnect; continuing cleanup"
                    );
                }
            }
        }

        // Drop any read half that was never handed to a receiver task.
        match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ws_stream.lock(),
        )
        .await
        {
            Ok(mut stream_guard) => {
                stream_guard.take();
            }
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling WebSocket stream lock during disconnect"
                );
            }
        }

        self.reset_inbound_channel().await;

        // Invoke hook synchronously, then broadcast for other subscribers
        Self::publish_disconnected_transition(
            was_connected,
            &self.stats,
            &self.event_tx,
            self.hook_callback.get().cloned(),
            DisconnectReason::Manual,
            None,
        )
        .await;

        Ok(())
    }
1363
    /// Make a single connection attempt on behalf of auto-reconnect cycle `generation`.
    ///
    /// Only one attempt is made here; retries presumably come from the caller's reconnect
    /// loop. The attempt bails out with `NetworkError::ConnectionError` whenever
    /// `auto_reconnect_cancelled(generation)` reports that an explicit disconnect superseded
    /// this generation. That is checked:
    /// - before taking the `connecting` flag;
    /// - after waiting on a concurrent attempt;
    /// - right after taking the flag;
    /// - after a successful connect, in which case the fresh connection is torn down with
    ///   `disconnect_internal(false)`.
    ///
    /// The `connecting` flag coordinates this with other connect paths. If another attempt
    /// holds it, this waits for that attempt's outcome instead of starting a second one.
    /// On success the receiver and ping tasks are started. On a failure that was not
    /// cancelled, `SignalingEvent::Disconnected { reason: ConnectionFailed(..) }` is broadcast.
    async fn connect_once_for_auto_reconnect(&self, generation: u64) -> NetworkResult<()> {
        if self.auto_reconnect_cancelled(generation) {
            return Err(NetworkError::ConnectionError(
                "Signaling auto-reconnect was cancelled".to_string(),
            ));
        }

        if self.connected.load(Ordering::Acquire) {
            tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
            return Ok(());
        }

        // Try to take the `connecting` flag (false -> true).
        match self
            .connecting
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => {}
            Err(_) => {
                if self.connected.load(Ordering::Acquire) {
                    tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
                    return Ok(());
                }

                tracing::debug!(
                    "Another connection attempt in progress, waiting for state change..."
                );
                let result = self.wait_for_connection_result().await;
                if self.auto_reconnect_cancelled(generation) {
                    return Err(NetworkError::ConnectionError(
                        "Signaling auto-reconnect was cancelled".to_string(),
                    ));
                }
                return result;
            }
        }

        // From here on the `connecting` flag is ours and must be released on every path.
        if self.auto_reconnect_cancelled(generation) {
            self.connecting.store(false, Ordering::Release);
            return Err(NetworkError::ConnectionError(
                "Signaling auto-reconnect was cancelled".to_string(),
            ));
        }

        if self.connected.load(Ordering::Acquire) {
            tracing::debug!("Connection completed by another task while acquiring lock");
            self.connecting.store(false, Ordering::Release);
            return Ok(());
        }

        tracing::debug!(
            generation,
            "Acquired connection lock, establishing one auto-reconnect signaling attempt..."
        );

        let result = self
            .establish_connection_once_for_auto_reconnect(generation)
            .await;
        self.connecting.store(false, Ordering::Release);

        match result {
            Ok(()) => {
                // Cancelled after the socket was installed: tear it down again.
                if self.auto_reconnect_cancelled(generation) {
                    self.disconnect_internal(false).await?;
                    return Err(NetworkError::ConnectionError(
                        "Signaling auto-reconnect was cancelled".to_string(),
                    ));
                }
                self.start_receiver().await;
                self.start_ping_task().await;
                Ok(())
            }
            Err(e) => {
                // Cancelled attempts fail quietly; only genuine failures are broadcast.
                if !self.auto_reconnect_cancelled(generation) {
                    let _ = self.event_tx.send(SignalingEvent::Disconnected {
                        reason: DisconnectReason::ConnectionFailed(e.to_string()),
                    });
                    tracing::error!("Connection attempt failed: {e}");
                }
                Err(e)
            }
        }
    }
1446
1447    /// Wait for ongoing connection attempt to complete (used when another task is connecting).
1448    ///
1449    /// Uses the broadcast channel to wait for a Connected event without recursion.
1450    async fn wait_for_connection_result(&self) -> NetworkResult<()> {
1451        let mut event_rx = self.event_tx.subscribe();
1452        let deadline = tokio::time::Instant::now()
1453            + std::time::Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS);
1454
1455        loop {
1456            tokio::select! {
1457                _ = tokio::time::sleep_until(deadline) => {
1458                    // Final check before giving up
1459                    if self.connected.load(Ordering::Acquire) {
1460                        tracing::debug!("Connection succeeded just before timeout");
1461                        return Ok(());
1462                    }
1463                    return Err(NetworkError::ConnectionError(
1464                        "Timeout waiting for concurrent connection attempt".to_string(),
1465                    ));
1466                }
1467                result = event_rx.recv() => {
1468                    match result {
1469                        Ok(SignalingEvent::Connected) => {
1470                            tracing::debug!("Connection established by another task");
1471                            return Ok(());
1472                        }
1473                        Ok(SignalingEvent::Disconnected { reason }) => {
1474                            return Err(NetworkError::ConnectionError(format!(
1475                                "Concurrent signaling connection failed: {reason:?}"
1476                            )));
1477                        }
1478                        Ok(_) => continue, // ConnectStart — keep waiting
1479                        Err(broadcast::error::RecvError::Lagged(n)) => {
1480                            tracing::warn!("Event receiver lagged by {n} events");
1481                            // Check current state after lag
1482                            if self.connected.load(Ordering::Acquire) {
1483                                return Ok(());
1484                            }
1485                            continue;
1486                        }
1487                        Err(broadcast::error::RecvError::Closed) => {
1488                            return Err(NetworkError::ConnectionError(
1489                                "Event channel closed while waiting for connection".to_string(),
1490                            ));
1491                        }
1492                    }
1493                }
1494            }
1495        }
1496    }
1497}
1498
1499#[async_trait]
1500impl SignalingClient for WebSocketSignalingClient {
1501    async fn connect(&self) -> NetworkResult<()> {
1502        // 🔐 Try to acquire "connecting" lock using compare-and-swap
1503        // This is the first checkpoint - moving the fast-path check after lock acquisition
1504        // eliminates the TOCTOU window where another task could have established a connection
1505        // between our fast-path check and lock acquisition.
1506        match self
1507            .connecting
1508            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1509        {
1510            Ok(_) => {
1511                // We successfully acquired the lock (CAS changed false → true)
1512                // Now we hold exclusive access to the connection establishment
1513            }
1514            Err(_) => {
1515                // CAS failed - either another task holds the lock, or a connection exists
1516                // Check if already connected before waiting
1517                if self.connected.load(Ordering::Acquire) {
1518                    tracing::debug!("Already connected, skipping connect()");
1519                    return Ok(());
1520                }
1521
1522                // Another task is connecting, wait for state change using broadcast channel
1523                tracing::debug!(
1524                    "Another connection attempt in progress, waiting for state change..."
1525                );
1526                return self.wait_for_connection_result().await;
1527            }
1528        }
1529
1530        // 🔐 We now hold the "connecting" lock exclusively
1531        // Re-check connected: another task may have finished connecting between our
1532        // CAS and this load (the AcqRel fence on CAS guarantees we see the latest
1533        // `connected` store from whoever last released `connecting`).
1534        if self.connected.load(Ordering::Acquire) {
1535            tracing::debug!("Connection completed by another task while acquiring lock");
1536            self.connecting.store(false, Ordering::Release);
1537            return Ok(());
1538        }
1539
1540        tracing::debug!("Acquired connection lock, establishing connection...");
1541
1542        // Perform actual connection
1543        let result = self.connect_with_retries().await;
1544
1545        // Clear "connecting" flag regardless of result
1546        self.connecting.store(false, Ordering::Release);
1547
1548        // Handle connection result
1549        match result {
1550            Ok(()) => {
1551                self.start_receiver().await;
1552                self.start_ping_task().await;
1553                Ok(())
1554            }
1555            Err(e) => {
1556                // Explicitly notify waiting tasks that connection failed
1557                let _ = self.event_tx.send(SignalingEvent::Disconnected {
1558                    reason: DisconnectReason::ConnectionFailed(e.to_string()),
1559                });
1560                tracing::error!("Connection failed: {e}");
1561                Err(e)
1562            }
1563        }
1564    }
1565
1566    async fn connect_once(&self) -> NetworkResult<()> {
1567        if self.connected.load(Ordering::Acquire) {
1568            tracing::debug!("Already connected, skipping connect_once()");
1569            return Ok(());
1570        }
1571
1572        match self
1573            .connecting
1574            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1575        {
1576            Ok(_) => {}
1577            Err(_) => {
1578                if self.connected.load(Ordering::Acquire) {
1579                    tracing::debug!("Already connected, skipping connect_once()");
1580                    return Ok(());
1581                }
1582
1583                tracing::debug!(
1584                    "Another connection attempt in progress, waiting for state change..."
1585                );
1586                return self.wait_for_connection_result().await;
1587            }
1588        }
1589
1590        if self.connected.load(Ordering::Acquire) {
1591            tracing::debug!("Connection completed by another task while acquiring lock");
1592            self.connecting.store(false, Ordering::Release);
1593            return Ok(());
1594        }
1595
1596        tracing::debug!(
1597            "Acquired connection lock, establishing one signaling connection attempt..."
1598        );
1599
1600        let result = self.establish_connection_once().await;
1601        self.connecting.store(false, Ordering::Release);
1602
1603        match result {
1604            Ok(()) => {
1605                self.start_receiver().await;
1606                self.start_ping_task().await;
1607                Ok(())
1608            }
1609            Err(e) => {
1610                let _ = self.event_tx.send(SignalingEvent::Disconnected {
1611                    reason: DisconnectReason::ConnectionFailed(e.to_string()),
1612                });
1613                tracing::error!("Connection attempt failed: {e}");
1614                Err(e)
1615            }
1616        }
1617    }
1618
1619    async fn disconnect(&self) -> NetworkResult<()> {
1620        self.disconnect_internal(true).await
1621    }
1622
1623    async fn probe_alive(&self, timeout: Duration) -> NetworkResult<()> {
1624        if !self.connected.load(Ordering::Acquire) {
1625            return Err(NetworkError::ConnectionError(
1626                "Signaling client is not connected".to_string(),
1627            ));
1628        }
1629
1630        let probe_id = self.probe_counter.fetch_add(1, Ordering::Relaxed) + 1;
1631        let payload =
1632            format!("actr-signaling-probe-{probe_id}-{}", current_unix_secs()).into_bytes();
1633        let (tx, rx) = oneshot::channel();
1634        self.pending_pongs.lock().await.insert(payload.clone(), tx);
1635
1636        let send_result = {
1637            let mut sink_guard = self.ws_sink.lock().await;
1638            match sink_guard.as_mut() {
1639                Some(sink) => sink
1640                    .send(tokio_tungstenite::tungstenite::Message::Ping(
1641                        payload.clone().into(),
1642                    ))
1643                    .await
1644                    .map_err(|e| {
1645                        NetworkError::ConnectionError(format!("Signaling probe ping failed: {e}"))
1646                    }),
1647                None => Err(NetworkError::ConnectionError(
1648                    "Signaling probe failed: WebSocket sink is not available".to_string(),
1649                )),
1650            }
1651        };
1652
1653        if let Err(e) = send_result {
1654            self.pending_pongs.lock().await.remove(&payload);
1655            let was_connected = self.connected.swap(false, Ordering::AcqRel);
1656            Self::publish_disconnected_transition(
1657                was_connected,
1658                &self.stats,
1659                &self.event_tx,
1660                self.hook_callback.get().cloned(),
1661                DisconnectReason::PingSendFailed,
1662                None,
1663            )
1664            .await;
1665            return Err(e);
1666        }
1667
1668        match tokio::time::timeout(timeout, rx).await {
1669            Ok(Ok(())) => {
1670                self.last_pong.store(current_unix_secs(), Ordering::Release);
1671                Ok(())
1672            }
1673            Ok(Err(_)) => {
1674                self.pending_pongs.lock().await.remove(&payload);
1675                Err(NetworkError::ConnectionError(
1676                    "Signaling probe pong waiter dropped".to_string(),
1677                ))
1678            }
1679            Err(_) => {
1680                self.pending_pongs.lock().await.remove(&payload);
1681                Err(NetworkError::TimeoutError(format!(
1682                    "Timed out waiting for signaling probe pong after {}ms",
1683                    timeout.as_millis()
1684                )))
1685            }
1686        }
1687    }
1688
1689    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1690    async fn send_register_request(
1691        &self,
1692        request: RegisterRequest,
1693    ) -> NetworkResult<RegisterResponse> {
1694        // Create PeerToSignaling stream process (Register front )
1695        let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
1696            payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
1697        });
1698
1699        let envelope = self.create_envelope(flow).await;
1700        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1701
1702        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1703        {
1704            if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
1705                server_to_actr.payload
1706            {
1707                return Ok(response);
1708            }
1709        }
1710
1711        Err(NetworkError::ConnectionError(
1712            "Invalid registration response".to_string(),
1713        ))
1714    }
1715
1716    #[cfg_attr(
1717        feature = "opentelemetry",
1718        tracing::instrument(skip_all, fields(actor_id = %actor_id))
1719    )]
1720    async fn send_unregister_request(
1721        &self,
1722        actor_id: ActrId,
1723        credential: AIdCredential,
1724        reason: Option<String>,
1725    ) -> NetworkResult<UnregisterResponse> {
1726        // Build UnregisterRequest payload
1727        let request = UnregisterRequest {
1728            actr_id: actor_id.clone(),
1729            reason,
1730        };
1731
1732        // Wrap into ActrToSignaling flow
1733        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1734            source: actor_id,
1735            credential,
1736            payload: Some(actr_to_signaling::Payload::UnregisterRequest(request)),
1737        });
1738
1739        // Send envelope (fire-and-forget)
1740        let envelope = self.create_envelope(flow).await;
1741        self.send_envelope(envelope).await?;
1742
1743        // Do not wait for UnregisterResponse here because the signaling stream
1744        // is also consumed by WebRtcCoordinator. Waiting could race with that loop
1745        // and lead to spurious timeouts. Treat Unregister as best-effort.
1746        // not wait for the response , because the signaling stream have multi customers use it, fixme: should wait for the response
1747        Ok(UnregisterResponse {
1748            result: Some(actr_protocol::unregister_response::Result::Success(
1749                actr_protocol::unregister_response::UnregisterOk {},
1750            )),
1751        })
1752    }
1753
1754    #[cfg_attr(
1755        feature = "opentelemetry",
1756        tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1757    )]
1758    async fn send_heartbeat(
1759        &self,
1760        actor_id: ActrId,
1761        credential: AIdCredential,
1762        availability: ServiceAvailabilityState,
1763        power_reserve: f32,
1764        mailbox_backlog: f32,
1765    ) -> NetworkResult<Pong> {
1766        let ping = Ping {
1767            availability: availability as i32,
1768            power_reserve,
1769            mailbox_backlog,
1770            sticky_client_ids: vec![], // TODO: Implement sticky session tracking
1771        };
1772
1773        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1774            source: actor_id,
1775            credential,
1776            payload: Some(actr_to_signaling::Payload::Ping(ping)),
1777        });
1778
1779        let envelope = self.create_envelope(flow).await;
1780        let reply_for = envelope.envelope_id.clone();
1781
1782        // Register waiter before sending
1783        let (tx, rx) = oneshot::channel();
1784        self.pending_replies
1785            .lock()
1786            .await
1787            .insert(reply_for.clone(), tx);
1788
1789        if let Err(e) = self.send_envelope(envelope).await {
1790            // Cleanup waiter on immediate send failure to avoid leaks.
1791            self.pending_replies.lock().await.remove(&reply_for);
1792            return Err(e);
1793        }
1794
1795        // Wait for response
1796        let response_envelope = rx.await.map_err(|_| {
1797            NetworkError::ConnectionError(
1798                "Receiver dropped while waiting for heartbeat response".to_string(),
1799            )
1800        })?;
1801
1802        // Extract Pong from response, or handle Error response
1803        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1804        {
1805            match server_to_actr.payload {
1806                Some(signaling_to_actr::Payload::Pong(pong)) => {
1807                    return Ok(pong);
1808                }
1809                Some(signaling_to_actr::Payload::Error(err)) => {
1810                    // Check if it's a credential expired error (401)
1811                    if err.code == 401 {
1812                        return Err(NetworkError::CredentialExpired(err.message));
1813                    }
1814                    return Err(NetworkError::AuthenticationError(format!(
1815                        "{} ({})",
1816                        err.message, err.code
1817                    )));
1818                }
1819                _ => {}
1820            }
1821        }
1822
1823        Err(NetworkError::ConnectionError(
1824            "Received response but not a Pong message".to_string(),
1825        ))
1826    }
1827
1828    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1829    async fn send_route_candidates_request(
1830        &self,
1831        actor_id: ActrId,
1832        credential: AIdCredential,
1833        request: RouteCandidatesRequest,
1834    ) -> NetworkResult<RouteCandidatesResponse> {
1835        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1836            source: actor_id,
1837            credential,
1838            payload: Some(actr_to_signaling::Payload::RouteCandidatesRequest(request)),
1839        });
1840
1841        let envelope = self.create_envelope(flow).await;
1842        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1843
1844        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1845        {
1846            match server_to_actr.payload {
1847                Some(signaling_to_actr::Payload::RouteCandidatesResponse(response)) => {
1848                    return Ok(response);
1849                }
1850                Some(signaling_to_actr::Payload::Error(err)) => {
1851                    return Err(NetworkError::ServiceDiscoveryError(format!(
1852                        "{} ({})",
1853                        err.message, err.code
1854                    )));
1855                }
1856                _ => {}
1857            }
1858        }
1859
1860        Err(NetworkError::ConnectionError(
1861            "Invalid route candidates response".to_string(),
1862        ))
1863    }
1864
1865    async fn get_signing_key(
1866        &self,
1867        actor_id: ActrId,
1868        credential: AIdCredential,
1869        key_id: u32,
1870    ) -> NetworkResult<(u32, Vec<u8>)> {
1871        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1872            source: actor_id,
1873            credential,
1874            payload: Some(actr_to_signaling::Payload::GetSigningKeyRequest(
1875                GetSigningKeyRequest { key_id },
1876            )),
1877        });
1878
1879        let envelope = self.create_envelope(flow).await;
1880        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1881
1882        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1883        {
1884            match server_to_actr.payload {
1885                Some(signaling_to_actr::Payload::GetSigningKeyResponse(resp)) => {
1886                    return Ok((resp.key_id, resp.pubkey.to_vec()));
1887                }
1888                Some(signaling_to_actr::Payload::Error(err)) => {
1889                    return Err(NetworkError::ConnectionError(format!(
1890                        "get_signing_key failed: {} ({})",
1891                        err.message, err.code
1892                    )));
1893                }
1894                _ => {}
1895            }
1896        }
1897
1898        Err(NetworkError::ConnectionError(
1899            "get_signing_key: invalid response".to_string(),
1900        ))
1901    }
1902
1903    #[cfg_attr(
1904        feature = "opentelemetry",
1905        tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1906    )]
1907    async fn send_credential_update_request(
1908        &self,
1909        actor_id: ActrId,
1910        credential: AIdCredential,
1911    ) -> NetworkResult<RegisterResponse> {
1912        let request = CredentialUpdateRequest {
1913            actr_id: actor_id.clone(),
1914        };
1915
1916        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1917            source: actor_id,
1918            credential,
1919            payload: Some(actr_to_signaling::Payload::CredentialUpdateRequest(request)),
1920        });
1921
1922        let envelope = self.create_envelope(flow).await;
1923        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1924
1925        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1926        {
1927            match server_to_actr.payload {
1928                Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
1929                    return Ok(response);
1930                }
1931                Some(signaling_to_actr::Payload::Error(err)) => {
1932                    return Err(NetworkError::ConnectionError(format!(
1933                        "Credential update failed: {} ({})",
1934                        err.message, err.code
1935                    )));
1936                }
1937                _ => {}
1938            }
1939        }
1940
1941        Err(NetworkError::ConnectionError(
1942            "Invalid credential update response".to_string(),
1943        ))
1944    }
1945
1946    #[cfg_attr(
1947        feature = "opentelemetry",
1948        tracing::instrument(level = "debug", skip_all, fields(envelope_id = %envelope.envelope_id))
1949    )]
1950    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
1951        #[cfg(feature = "opentelemetry")]
1952        let envelope = {
1953            let mut envelope = envelope;
1954            trace::inject_span_context(&tracing::Span::current(), &mut envelope);
1955            envelope
1956        };
1957
1958        // Check connection state first to avoid sending on stale/closed connections
1959        // This prevents "Broken pipe" errors when ws_sink exists but connection is dead
1960        if !self.is_connected() {
1961            return Err(NetworkError::ConnectionError(
1962                "Cannot send: WebSocket not connected".to_string(),
1963            ));
1964        }
1965
1966        let mut sink_guard = self.ws_sink.lock().await;
1967
1968        if let Some(sink) = sink_guard.as_mut() {
1969            // using protobuf binary serialization
1970            let mut buf = Vec::new();
1971            envelope.encode(&mut buf)?;
1972            let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
1973            match tokio::time::timeout(
1974                std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1975                sink.send(msg),
1976            )
1977            .await
1978            {
1979                Ok(Ok(())) => {}
1980                Ok(Err(e)) => return Err(e.into()),
1981                Err(_) => {
1982                    self.connected.store(false, Ordering::Release);
1983                    return Err(NetworkError::ConnectionError(
1984                        "Signaling WebSocket send timed out".to_string(),
1985                    ));
1986                }
1987            }
1988
1989            self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1990            tracing::debug!("Stats: {:?}", self.stats.snapshot());
1991            Ok(())
1992        } else {
1993            Err(NetworkError::ConnectionError("Not connected".to_string()))
1994        }
1995    }
1996
1997    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
1998        let mut rx = self.inbound_rx.lock().await;
1999        match rx.recv().await {
2000            Some(envelope) => Ok(Some(envelope)),
2001            None => {
2002                tracing::error!("Inbound channel closed");
2003                Err(NetworkError::ConnectionError(
2004                    "Inbound channel closed".to_string(),
2005                ))
2006            }
2007        }
2008    }
2009
2010    fn is_connected(&self) -> bool {
2011        self.connected.load(Ordering::Acquire)
2012    }
2013
2014    fn get_stats(&self) -> SignalingStats {
2015        self.stats.snapshot()
2016    }
2017
2018    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
2019        self.event_tx.subscribe()
2020    }
2021
2022    async fn set_actor_id(&self, actor_id: ActrId) {
2023        *self.actor_id.lock().await = Some(actor_id);
2024    }
2025
2026    async fn set_credential_state(&self, credential_state: CredentialState) {
2027        *self.credential_state.lock().await = Some(credential_state);
2028    }
2029
2030    async fn clear_identity(&self) {
2031        *self.actor_id.lock().await = None;
2032        *self.credential_state.lock().await = None;
2033    }
2034
2035    fn set_hook_callback(&self, cb: HookCallback) {
2036        let _ = self.hook_callback.set(cb);
2037    }
2038}
2039
/// Live signaling statistics, stored as atomics so they can be updated through `&self`.
///
/// Read a plain-value copy with [`AtomicSignalingStats::snapshot`].
#[derive(Debug)]
pub(crate) struct AtomicSignalingStats {
    /// Connection counter.
    // NOTE(review): the increment site is outside this view; presumably counts
    // established connections — confirm whether attempts or successes are counted.
    pub connections: AtomicU64,

    /// Number of published connected -> disconnected transitions
    /// (incremented by `publish_disconnected_transition`).
    pub disconnections: AtomicU64,

    /// Messages sent; `send_envelope` increments it after each successful WebSocket write.
    pub messages_sent: AtomicU64,

    /// Messages received (the increment site is outside this view).
    pub messages_received: AtomicU64,

    /// Heartbeats sent.
    /// TODO: Wire heartbeat counters when heartbeat send/receive paths are instrumented; currently never incremented.
    pub heartbeats_sent: AtomicU64,

    /// Heartbeats received.
    /// TODO: Wire heartbeat counters when heartbeat send/receive paths are instrumented; currently never incremented.
    pub heartbeats_received: AtomicU64,

    /// Error counter (the increment site is outside this view).
    pub errors: AtomicU64,
}
2066
2067impl Default for AtomicSignalingStats {
2068    fn default() -> Self {
2069        Self {
2070            connections: AtomicU64::new(0),
2071            disconnections: AtomicU64::new(0),
2072            messages_sent: AtomicU64::new(0),
2073            messages_received: AtomicU64::new(0),
2074            heartbeats_sent: AtomicU64::new(0),
2075            heartbeats_received: AtomicU64::new(0),
2076            errors: AtomicU64::new(0),
2077        }
2078    }
2079}
2080
/// Point-in-time, plain-value copy of the signaling counters, suitable for
/// serialization and reading.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct SignalingStats {
    /// Connection counter at snapshot time.
    pub connections: u64,

    /// Connected -> disconnected transitions at snapshot time.
    pub disconnections: u64,

    /// Messages sent at snapshot time.
    pub messages_sent: u64,

    /// Messages received at snapshot time.
    pub messages_received: u64,

    /// Heartbeats sent at snapshot time.
    pub heartbeats_sent: u64,

    /// Heartbeats received at snapshot time.
    pub heartbeats_received: u64,

    /// Errors at snapshot time.
    pub errors: u64,
}
2105
2106impl AtomicSignalingStats {
2107    /// Create a snapshot of current statistics
2108    pub fn snapshot(&self) -> SignalingStats {
2109        SignalingStats {
2110            connections: self.connections.load(Ordering::Relaxed),
2111            disconnections: self.disconnections.load(Ordering::Relaxed),
2112            messages_sent: self.messages_sent.load(Ordering::Relaxed),
2113            messages_received: self.messages_received.load(Ordering::Relaxed),
2114            heartbeats_sent: self.heartbeats_sent.load(Ordering::Relaxed),
2115            heartbeats_received: self.heartbeats_received.load(Ordering::Relaxed),
2116            errors: self.errors.load(Ordering::Relaxed),
2117        }
2118    }
2119}
2120
/// Whole seconds elapsed since the Unix epoch, or `0` if the system clock reads earlier.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
2128
2129#[cfg(test)]
2130mod tests {
2131    use super::*;
2132    use std::future::Future;
2133    use std::pin::Pin;
2134    use std::sync::atomic::{AtomicUsize, Ordering as UsizeOrdering};
2135
2136    /// Simple fake SignalingClient implementation for testing the reconnect helper.
2137    struct FakeSignalingClient {
2138        event_tx: broadcast::Sender<SignalingEvent>,
2139        connected: AtomicBool,
2140        connect_calls: Arc<AtomicUsize>,
2141        actor_id: tokio::sync::Mutex<Option<ActrId>>,
2142        credential_state: tokio::sync::Mutex<Option<CredentialState>>,
2143    }
2144
2145    #[async_trait]
2146    impl SignalingClient for FakeSignalingClient {
2147        async fn connect(&self) -> NetworkResult<()> {
2148            self.connect_calls.fetch_add(1, UsizeOrdering::SeqCst);
2149            Ok(())
2150        }
2151
2152        async fn disconnect(&self) -> NetworkResult<()> {
2153            Ok(())
2154        }
2155
2156        async fn send_register_request(
2157            &self,
2158            _request: RegisterRequest,
2159        ) -> NetworkResult<RegisterResponse> {
2160            unimplemented!("not needed in tests");
2161        }
2162
2163        async fn send_unregister_request(
2164            &self,
2165            _actor_id: ActrId,
2166            _credential: AIdCredential,
2167            _reason: Option<String>,
2168        ) -> NetworkResult<UnregisterResponse> {
2169            unimplemented!("not needed in tests");
2170        }
2171
2172        async fn send_heartbeat(
2173            &self,
2174            _actor_id: ActrId,
2175            _credential: AIdCredential,
2176            _availability: ServiceAvailabilityState,
2177            _power_reserve: f32,
2178            _mailbox_backlog: f32,
2179        ) -> NetworkResult<Pong> {
2180            unimplemented!("not needed in tests");
2181        }
2182
2183        async fn send_route_candidates_request(
2184            &self,
2185            _actor_id: ActrId,
2186            _credential: AIdCredential,
2187            _request: RouteCandidatesRequest,
2188        ) -> NetworkResult<RouteCandidatesResponse> {
2189            unimplemented!("not needed in tests");
2190        }
2191
2192        async fn get_signing_key(
2193            &self,
2194            _actor_id: ActrId,
2195            _credential: AIdCredential,
2196            _key_id: u32,
2197        ) -> NetworkResult<(u32, Vec<u8>)> {
2198            unimplemented!("not needed in tests");
2199        }
2200
2201        async fn send_credential_update_request(
2202            &self,
2203            _actor_id: ActrId,
2204            _credential: AIdCredential,
2205        ) -> NetworkResult<RegisterResponse> {
2206            unimplemented!("not needed in tests");
2207        }
2208
2209        async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
2210            unimplemented!("not needed in tests");
2211        }
2212
2213        async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
2214            unimplemented!("not needed in tests");
2215        }
2216
2217        fn is_connected(&self) -> bool {
2218            self.connected.load(Ordering::SeqCst)
2219        }
2220
2221        fn get_stats(&self) -> SignalingStats {
2222            SignalingStats::default()
2223        }
2224
2225        fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
2226            self.event_tx.subscribe()
2227        }
2228
2229        async fn set_actor_id(&self, actor_id: ActrId) {
2230            *self.actor_id.lock().await = Some(actor_id);
2231        }
2232
2233        async fn set_credential_state(&self, credential_state: CredentialState) {
2234            *self.credential_state.lock().await = Some(credential_state);
2235        }
2236
2237        async fn clear_identity(&self) {
2238            *self.actor_id.lock().await = None;
2239            *self.credential_state.lock().await = None;
2240        }
2241    }
2242
2243    fn make_fake_client() -> Arc<FakeSignalingClient> {
2244        let (event_tx, _erx) = broadcast::channel(64);
2245        Arc::new(FakeSignalingClient {
2246            event_tx,
2247            connected: AtomicBool::new(false),
2248            connect_calls: Arc::new(AtomicUsize::new(0)),
2249            actor_id: tokio::sync::Mutex::new(None),
2250            credential_state: tokio::sync::Mutex::new(None),
2251        })
2252    }
2253
2254    /// Helper: create a minimal SignalingConfig with an unreachable URL.
2255    fn make_config() -> SignalingConfig {
2256        SignalingConfig {
2257            server_url: Url::parse("ws://127.0.0.1:1/signaling/ws").unwrap(),
2258            connection_timeout: 2,
2259            heartbeat_interval: 30,
2260            reconnect_config: ReconnectConfig::default(),
2261            auth_config: None,
2262            webrtc_role: None,
2263        }
2264    }
2265
2266    /// Helper: create a WebSocketSignalingClient wrapped in Arc
2267    fn make_ws_client(config: SignalingConfig) -> Arc<WebSocketSignalingClient> {
2268        Arc::new(WebSocketSignalingClient::new(config))
2269    }
2270
2271    #[tokio::test]
2272    async fn test_publish_disconnected_transition_fires_hook_once() {
2273        let stats = Arc::new(AtomicSignalingStats::default());
2274        let (event_tx, mut event_rx) = broadcast::channel(4);
2275        let hook_count = Arc::new(AtomicUsize::new(0));
2276        let hook_count_for_cb = hook_count.clone();
2277        let hook_callback: HookCallback = Arc::new(move |event| {
2278            let hook_count = hook_count_for_cb.clone();
2279            Box::pin(async move {
2280                if matches!(event, HookEvent::SignalingDisconnected) {
2281                    hook_count.fetch_add(1, UsizeOrdering::SeqCst);
2282                }
2283            }) as Pin<Box<dyn Future<Output = ()> + Send>>
2284        });
2285
2286        let first = WebSocketSignalingClient::publish_disconnected_transition(
2287            true,
2288            &stats,
2289            &event_tx,
2290            Some(hook_callback.clone()),
2291            DisconnectReason::StreamEnded,
2292            None,
2293        )
2294        .await;
2295        assert!(
2296            first,
2297            "first connected->disconnected transition should publish"
2298        );
2299        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2300        assert_eq!(stats.snapshot().disconnections, 1);
2301        assert!(matches!(
2302            event_rx.recv().await,
2303            Ok(SignalingEvent::Disconnected {
2304                reason: DisconnectReason::StreamEnded
2305            })
2306        ));
2307
2308        let second = WebSocketSignalingClient::publish_disconnected_transition(
2309            false,
2310            &stats,
2311            &event_tx,
2312            Some(hook_callback),
2313            DisconnectReason::PongTimeout,
2314            None,
2315        )
2316        .await;
2317        assert!(
2318            !second,
2319            "stale duplicate disconnected transition should be ignored"
2320        );
2321        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2322        assert_eq!(stats.snapshot().disconnections, 1);
2323        assert!(event_rx.try_recv().is_err());
2324    }
2325
2326    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2327    // 1. Configuration defaults
2328    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2329
2330    #[test]
2331    fn test_reconnect_config_defaults() {
2332        let cfg = ReconnectConfig::default();
2333        assert!(cfg.enabled);
2334        assert_eq!(cfg.max_attempts, 10);
2335        assert_eq!(cfg.initial_delay, 1);
2336        assert_eq!(cfg.max_delay, 60);
2337        assert!((cfg.backoff_multiplier - 2.0).abs() < f64::EPSILON);
2338    }
2339
2340    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2341    // 2. Initial state
2342    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2343
2344    #[test]
2345    fn test_websocket_signaling_client_initial_state_disconnected() {
2346        let client = WebSocketSignalingClient::new(make_config());
2347        assert!(
2348            !client.is_connected(),
2349            "newly created client should be Disconnected"
2350        );
2351        assert!(
2352            !client.connecting.load(Ordering::Acquire),
2353            "newly created client should not be in connecting state"
2354        );
2355        assert!(
2356            !client.reconnector_started.load(Ordering::Acquire),
2357            "reconnect manager should not be started automatically"
2358        );
2359    }
2360
2361    #[test]
2362    fn test_initial_stats_are_zero() {
2363        let client = WebSocketSignalingClient::new(make_config());
2364        let stats = client.get_stats();
2365        assert_eq!(stats.connections, 0);
2366        assert_eq!(stats.disconnections, 0);
2367        assert_eq!(stats.messages_sent, 0);
2368        assert_eq!(stats.messages_received, 0);
2369        assert_eq!(stats.errors, 0);
2370    }
2371
2372    #[test]
2373    fn test_signaling_url_log_redacts_credential_query_params() {
2374        let url = Url::parse(
2375            "wss://example.com/signaling?actor_id=abc&key_id=7&claims=claims-value&signature=signature-value&token=token-value",
2376        )
2377        .unwrap();
2378
2379        let redacted = WebSocketSignalingClient::redact_signaling_url_for_log(&url);
2380
2381        assert!(redacted.contains("actor_id=abc"));
2382        assert!(redacted.contains("key_id=7"));
2383        assert!(redacted.contains("claims=REDACTED"));
2384        assert!(redacted.contains("signature=REDACTED"));
2385        assert!(redacted.contains("token=REDACTED"));
2386        assert!(!redacted.contains("claims-value"));
2387        assert!(!redacted.contains("signature-value"));
2388        assert!(!redacted.contains("token-value"));
2389    }
2390
2391    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2392    // 3. Reconnect manager idempotency
2393    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2394
2395    #[tokio::test]
2396    async fn test_reconnect_manager_idempotent() {
2397        let client = make_ws_client(make_config());
2398
2399        // First start should succeed
2400        client.start_reconnect_manager();
2401        assert!(
2402            client.reconnector_started.load(Ordering::Acquire),
2403            "reconnector_started should be true after first call"
2404        );
2405
2406        // Second call should not start a new manager (CAS fails)
2407        client.start_reconnect_manager();
2408        // Multiple managers would cause flaky tests due to duplicate reconnections; mainly verify the flag
2409        assert!(client.reconnector_started.load(Ordering::Acquire));
2410    }
2411
2412    #[tokio::test]
2413    async fn test_reconnect_manager_disabled_when_config_disabled() {
2414        let mut config = make_config();
2415        config.reconnect_config.enabled = false;
2416        let client = make_ws_client(config);
2417
2418        client.start_reconnect_manager();
2419        assert!(
2420            !client.reconnector_started.load(Ordering::Acquire),
2421            "reconnect manager should not start when reconnect config is disabled"
2422        );
2423    }
2424
2425    #[tokio::test]
2426    async fn test_reconnect_manager_does_not_keep_client_alive() {
2427        let client = make_ws_client(make_config());
2428        let weak = Arc::downgrade(&client);
2429
2430        client.start_reconnect_manager();
2431        drop(client);
2432
2433        assert!(
2434            weak.upgrade().is_none(),
2435            "reconnect manager must not keep signaling client alive after owner drop"
2436        );
2437    }
2438
2439    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2440    // 4. connect() concurrency exclusion
2441    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2442
2443    #[tokio::test]
2444    async fn test_connect_fast_path_when_already_connected() {
2445        let client = make_ws_client(make_config());
2446        // Manually set as connected
2447        client.connected.store(true, Ordering::Release);
2448
2449        // connect() should return Ok immediately without establishing a new connection
2450        let result = client.connect().await;
2451        assert!(
2452            result.is_ok(),
2453            "connect() should return Ok when already connected"
2454        );
2455        // Should not change connecting flag
2456        assert!(!client.connecting.load(Ordering::Acquire));
2457    }
2458
2459    #[tokio::test]
2460    async fn test_connect_sets_connecting_flag() {
2461        let mut config = make_config();
2462        config.reconnect_config.enabled = false; // disable retry, fail fast
2463        config.connection_timeout = 1;
2464        let client = make_ws_client(config);
2465
2466        // Connection will fail (unreachable address), but should properly clean up connecting flag
2467        let result = client.connect().await;
2468        assert!(
2469            result.is_err(),
2470            "connecting to unreachable address should fail"
2471        );
2472        assert!(
2473            !client.connecting.load(Ordering::Acquire),
2474            "connecting flag should be cleared after connection failure"
2475        );
2476    }
2477
2478    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2479    // 5. Event broadcast
2480    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2481
2482    #[tokio::test]
2483    async fn test_event_subscribe_receives_events() {
2484        let client = make_ws_client(make_config());
2485        let mut rx = client.subscribe_events();
2486
2487        // Manually send event
2488        let _ = client.event_tx.send(SignalingEvent::Connected);
2489
2490        match tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()).await {
2491            Ok(Ok(SignalingEvent::Connected)) => {} // expect Connected event
2492            other => panic!("expected Connected event, but got {:?}", other),
2493        }
2494    }
2495
2496    #[tokio::test]
2497    async fn test_disconnect_event_on_connect_failure() {
2498        let mut config = make_config();
2499        config.reconnect_config.enabled = false;
2500        config.connection_timeout = 1;
2501        let client = make_ws_client(config);
2502        let mut rx = client.subscribe_events();
2503
2504        // Connection fails
2505        let _ = client.connect().await;
2506
2507        // Should receive Disconnected(ConnectionFailed) event
2508        match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()).await {
2509            Ok(Ok(SignalingEvent::Disconnected {
2510                reason: DisconnectReason::ConnectionFailed(_),
2511            })) => {} // expected
2512            other => panic!(
2513                "expected Disconnected(ConnectionFailed) event, but got {:?}",
2514                other
2515            ),
2516        }
2517    }
2518
2519    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2520    // 6. disconnect() state cleanup
2521    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2522
2523    #[tokio::test]
2524    async fn test_disconnect_clears_connected_flag() {
2525        let client = make_ws_client(make_config());
2526        // Simulate connected state
2527        client.connected.store(true, Ordering::Release);
2528        assert!(client.is_connected());
2529
2530        let result = client.disconnect().await;
2531        assert!(result.is_ok());
2532        assert!(
2533            !client.is_connected(),
2534            "should be Disconnected after disconnect()"
2535        );
2536    }
2537
2538    #[tokio::test]
2539    async fn test_disconnect_increments_disconnection_stat() {
2540        let client = make_ws_client(make_config());
2541        client.connected.store(true, Ordering::Release);
2542
2543        let stats_before = client.get_stats().disconnections;
2544        let _ = client.disconnect().await;
2545        let stats_after = client.get_stats().disconnections;
2546        assert_eq!(
2547            stats_after,
2548            stats_before + 1,
2549            "disconnect() should increment disconnection count"
2550        );
2551    }
2552
2553    #[tokio::test]
2554    async fn test_disconnect_idempotent() {
2555        let client = make_ws_client(make_config());
2556
2557        // Calling disconnect() while not connected should not panic
2558        let r1 = client.disconnect().await;
2559        let r2 = client.disconnect().await;
2560        assert!(r1.is_ok());
2561        assert!(r2.is_ok());
2562        assert!(!client.is_connected());
2563    }
2564
2565    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2566    // 7. Reconnect notify mechanism
2567    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2568
2569    #[tokio::test]
2570    async fn test_reconnect_notify_wakes_waiter() {
2571        let notify = Arc::new(tokio::sync::Notify::new());
2572        let notify_clone = notify.clone();
2573        let woken = Arc::new(AtomicBool::new(false));
2574        let woken_clone = woken.clone();
2575
2576        let handle = tokio::spawn(async move {
2577            notify_clone.notified().await;
2578            woken_clone.store(true, Ordering::Release);
2579        });
2580
2581        // Ensure waiter has registered
2582        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2583        assert!(
2584            !woken.load(Ordering::Acquire),
2585            "should not be woken before notification"
2586        );
2587
2588        // Trigger notification
2589        notify.notify_one();
2590        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2591        assert!(
2592            woken.load(Ordering::Acquire),
2593            "should be woken after notification"
2594        );
2595
2596        handle.abort();
2597    }
2598
    #[tokio::test]
    async fn test_explicit_disconnect_suppresses_reconnect_cycle_in_backoff() {
        // Scenario: an auto-reconnect cycle is running against the default test config
        // (presumably an unreachable server). An explicit disconnect() issued after the
        // cycle announces its first attempt must stop the cycle: no further attempts are
        // announced, and the suppression flag is left set.
        let mut config = make_config();
        config.connection_timeout = 1;
        // Flat backoff (multiplier 1.0, initial == max delay) with several attempts left.
        // Without suppression the cycle would presumably announce attempt 2 and beyond.
        config.reconnect_config = ReconnectConfig {
            enabled: true,
            max_attempts: 5,
            initial_delay: 1,
            max_delay: 1,
            backoff_multiplier: 1.0,
        };
        let client = make_ws_client(config);
        // Subscribe before spawning the cycle so the first ConnectStart is not missed.
        let mut rx = client.subscribe_events();

        let reconnect_client = client.clone();
        let reconnect_task = tokio::spawn(async move {
            reconnect_client.run_reconnect_cycle().await;
        });

        // Wait until the cycle has announced attempt #1 before interrupting it.
        match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
            other => panic!("expected first reconnect attempt, got {other:?}"),
        }

        // User-initiated disconnect while the first attempt (or its backoff) is pending.
        client
            .disconnect()
            .await
            .expect("explicit disconnect should be idempotent");

        // The cycle must return well before it could exhaust its 5-attempt budget.
        tokio::time::timeout(Duration::from_secs(2), reconnect_task)
            .await
            .expect("suppressed reconnect cycle should exit promptly")
            .expect("reconnect task should not panic");

        // Drain any remaining events until the channel is quiet for 100ms; none of them
        // may announce another reconnect attempt.
        while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
        {
            if let SignalingEvent::ConnectStart { attempt } = event {
                panic!("suppressed reconnect cycle sent unexpected attempt {attempt}");
            }
        }

        assert!(
            client.auto_reconnect_suppressed.load(Ordering::Acquire),
            "explicit disconnect should suppress stale auto-reconnect cycles"
        );
    }
2645
    #[tokio::test]
    async fn test_explicit_disconnect_suppresses_in_flight_auto_reconnect_connected_event() {
        // Scenario: an auto-reconnect attempt is stuck mid-handshake when the user calls
        // disconnect(). The server is then allowed to finish the handshake. The stale
        // attempt must neither publish Connected nor leave the client connected.

        // Local server on an ephemeral port; the client targets ws://<addr>/signaling/ws.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("test listener should bind");
        let server_url = format!(
            "ws://{}/signaling/ws",
            listener
                .local_addr()
                .expect("test listener should have local addr")
        );
        // Gate that holds the server between TCP accept and the WebSocket handshake.
        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();

        let server_task = tokio::spawn(async move {
            let (stream, _) = listener
                .accept()
                .await
                .expect("test server should accept tcp connection");
            // Stall here so the client's handshake stays in flight until released.
            let _ = release_rx.await;
            let ws_stream = tokio_tungstenite::accept_async(stream)
                .await
                .expect("test server should complete websocket handshake");
            // Keep the socket open briefly, then close it.
            tokio::time::sleep(Duration::from_millis(100)).await;
            drop(ws_stream);
        });

        let mut config = make_config();
        config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
        // Long enough that the gated handshake is still pending (not timed out) when
        // disconnect() runs.
        config.connection_timeout = 5;
        config.reconnect_config = ReconnectConfig {
            enabled: true,
            max_attempts: 3,
            initial_delay: 1,
            max_delay: 1,
            backoff_multiplier: 1.0,
        };
        let client = make_ws_client(config);
        // Subscribe before spawning the cycle so no event is missed.
        let mut rx = client.subscribe_events();

        let reconnect_client = client.clone();
        let reconnect_task = tokio::spawn(async move {
            reconnect_client.run_reconnect_cycle().await;
        });

        // Attempt #1 has started; its handshake is now blocked on the server gate.
        match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
            other => panic!("expected first reconnect attempt, got {other:?}"),
        }

        // Disconnect first, then let the server complete the handshake. A completion
        // that arrives after the explicit disconnect must be discarded.
        client
            .disconnect()
            .await
            .expect("explicit disconnect should cancel the in-flight auto-reconnect");
        // Fails only if the server task already dropped `release_rx`.
        release_tx
            .send(())
            .expect("test server handshake should still be waiting");

        tokio::time::timeout(Duration::from_secs(2), reconnect_task)
            .await
            .expect("cancelled in-flight reconnect should exit promptly")
            .expect("reconnect task should not panic");

        // Drain events until the channel is quiet for 150ms; Connected must never appear.
        while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(150), rx.recv()).await
        {
            assert!(
                !matches!(event, SignalingEvent::Connected),
                "cancelled auto-reconnect must not publish Connected"
            );
        }

        assert!(
            !client.is_connected(),
            "cancelled auto-reconnect must not leave signaling connected"
        );

        // Make sure the server side finished cleanly, e.g. its handshake did not panic.
        tokio::time::timeout(Duration::from_secs(1), server_task)
            .await
            .expect("test server task should finish")
            .expect("test server task should not panic");
    }
2726
2727    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2728    // 8. URL construction tests
2729    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2730
2731    #[tokio::test]
2732    async fn test_build_url_without_identity() {
2733        let config = make_config();
2734        let expected_base = config.server_url.to_string();
2735        let client = WebSocketSignalingClient::new(config);
2736
2737        let url = client.build_url_with_identity().await;
2738        assert_eq!(
2739            url.to_string(),
2740            expected_base,
2741            "URL should not contain identity parameters when actor_id is not set"
2742        );
2743    }
2744
2745    #[tokio::test]
2746    async fn test_build_url_with_webrtc_role() {
2747        let mut config = make_config();
2748        config.webrtc_role = Some("answer".to_string());
2749        let client = WebSocketSignalingClient::new(config);
2750
2751        let url = client.build_url_with_identity().await;
2752        assert!(
2753            url.query().unwrap_or("").contains("webrtc_role=answer"),
2754            "URL should contain webrtc_role parameter, actual URL: {}",
2755            url
2756        );
2757    }
2758
2759    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2760    // 9. Inbound channel reset
2761    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2762
2763    #[tokio::test]
2764    async fn test_reset_inbound_channel_creates_fresh_channel() {
2765        let client = WebSocketSignalingClient::new(make_config());
2766
2767        // Get old tx and send a message
2768        {
2769            let tx = client.inbound_tx.lock().await;
2770            let _ = tx.send(SignalingEnvelope::default());
2771        }
2772
2773        // Reset channel
2774        client.reset_inbound_channel().await;
2775
2776        // Old messages should not be visible in the new channel
2777        let mut rx = client.inbound_rx.lock().await;
2778        let result = rx.try_recv();
2779        assert!(
2780            result.is_err(),
2781            "old messages should not be visible in the new channel after reset"
2782        );
2783    }
2784
2785    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2786    // 10. Envelope ID incrementing
2787    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2788
2789    #[tokio::test]
2790    async fn test_envelope_id_monotonically_increasing() {
2791        let client = WebSocketSignalingClient::new(make_config());
2792
2793        let id1 = client.next_envelope_id().await;
2794        let id2 = client.next_envelope_id().await;
2795        let id3 = client.next_envelope_id().await;
2796
2797        assert_eq!(id1, "env-1");
2798        assert_eq!(id2, "env-2");
2799        assert_eq!(id3, "env-3");
2800    }
2801
2802    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2803    // 11. send_envelope should return error when not connected
2804    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2805
2806    #[tokio::test]
2807    async fn test_send_envelope_fails_when_not_connected() {
2808        let client = WebSocketSignalingClient::new(make_config());
2809        let envelope = SignalingEnvelope::default();
2810
2811        let result = client.send_envelope(envelope).await;
2812        assert!(
2813            result.is_err(),
2814            "send_envelope should return error when not connected"
2815        );
2816        match result {
2817            Err(NetworkError::ConnectionError(msg)) => {
2818                assert!(
2819                    msg.contains("not connected") || msg.contains("Not connected"),
2820                    "error message should contain 'not connected', actual: {}",
2821                    msg
2822                );
2823            }
2824            other => panic!("expected ConnectionError, got {:?}", other),
2825        }
2826    }
2827
2828    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2829    // 12. FakeSignalingClient trait implementation verification
2830    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2831
2832    #[tokio::test]
2833    async fn test_fake_client_tracks_connect_calls() {
2834        let client = make_fake_client();
2835        assert_eq!(client.connect_calls.load(UsizeOrdering::SeqCst), 0);
2836
2837        client.connect().await.unwrap();
2838        client.connect().await.unwrap();
2839        client.connect().await.unwrap();
2840
2841        assert_eq!(
2842            client.connect_calls.load(UsizeOrdering::SeqCst),
2843            3,
2844            "FakeSignalingClient should accurately track connect call count"
2845        );
2846    }
2847}