Skip to main content

actr_hyper/wire/webrtc/
signaling.rs

//! Signaling client implementation.
//!
//! Based on the protobuf-defined signaling protocol; every message is carried in a
//! `SignalingEnvelope`.
4
5#[cfg(feature = "opentelemetry")]
6use super::trace;
7use crate::lifecycle::CredentialState;
8use crate::transport::{NetworkError, NetworkResult};
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::extract_trace_context;
11use actr_protocol::prost::Message as ProstMessage;
12use actr_protocol::{
13    AIdCredential, ActrId, ActrToSignaling, CredentialUpdateRequest, GetSigningKeyRequest,
14    PeerToSignaling, Ping, Pong, RegisterRequest, RegisterResponse, RouteCandidatesRequest,
15    RouteCandidatesResponse, ServiceAvailabilityState, SignalingEnvelope, UnregisterRequest,
16    UnregisterResponse, actr_to_signaling, peer_to_signaling, signaling_envelope,
17    signaling_to_actr,
18};
19use async_trait::async_trait;
20use base64::Engine as _;
21use futures_util::{SinkExt, StreamExt};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::{
27    Arc, OnceLock,
28    atomic::{AtomicBool, AtomicU64, Ordering},
29};
30use std::time::Duration;
31use tokio::net::TcpStream;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
34use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async_with_config};
35#[cfg(feature = "opentelemetry")]
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use url::Url;
38
39/// WebSocket sink type alias for the split write half of a signaling connection
40type WsSink = Arc<
41    tokio::sync::Mutex<
42        Option<
43            futures_util::stream::SplitSink<
44                WebSocketStream<MaybeTlsStream<TcpStream>>,
45                tokio_tungstenite::tungstenite::Message,
46            >,
47        >,
48    >,
49>;
50
51// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
52// Constants
53// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
54
/// Default timeout in seconds for waiting for signaling request/response RPCs.
const RESPONSE_TIMEOUT_SECS: u64 = 5;
// WebSocket-level keepalive to detect silent half-open connections.
/// Interval in seconds between keepalive pings.
const PING_INTERVAL_SECS: u64 = 5;
/// Seconds to wait for a Pong; also used as the liveness-probe timeout before
/// a reconnect cycle.
const PONG_TIMEOUT_SECS: u64 = 10;
/// Timeout in seconds for sending on the signaling connection.
const SIGNALING_SEND_TIMEOUT_SECS: u64 = 5;
/// Seconds to wait for a concurrent connect attempt
/// (NOTE(review): usage is outside this view — confirm exact semantics).
const CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS: u64 = 5;
/// Seconds allowed for acquiring locks during disconnect
/// (NOTE(review): usage is outside this view — confirm exact semantics).
const DISCONNECT_LOCK_TIMEOUT_SECS: u64 = 5;
/// Seconds allowed for the close step during disconnect
/// (NOTE(review): usage is outside this view — confirm exact semantics).
const DISCONNECT_CLOSE_TIMEOUT_SECS: u64 = 1;
64
65// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Configuration types
67// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
68
69/// signalingconfiguration
70#[derive(Debug, Clone)]
71pub struct SignalingConfig {
72    /// signaling server URL
73    pub server_url: Url,
74
75    /// Connecttimeout temporal duration (seconds)
76    pub connection_timeout: u64,
77
78    /// center skipinterval(seconds)
79    pub heartbeat_interval: u64,
80
81    /// reconnection configuration
82    pub reconnect_config: ReconnectConfig,
83
84    /// acknowledge verify configuration
85    pub auth_config: Option<AuthConfig>,
86
87    /// WebRTC role preference: "answer" if this node has advanced config
88    pub webrtc_role: Option<String>,
89}
90
/// Reconnection configuration.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Whether automatic reconnection is enabled.
    pub enabled: bool,

    /// Maximum number of reconnection attempts per cycle.
    pub max_attempts: u32,

    /// Initial reconnection delay, in seconds.
    pub initial_delay: u64,

    /// Maximum reconnection delay, in seconds.
    pub max_delay: u64,

    /// Backoff multiplier factor.
    ///
    /// NOTE(review): the reconnect cycle visible in this file does not pass this
    /// value to the backoff builder — confirm whether it is honored elsewhere.
    pub backoff_multiplier: f64,
}

impl Default for ReconnectConfig {
    /// Defaults: enabled, 10 attempts, 1 s initial delay, 60 s maximum delay,
    /// multiplier 2.0.
    fn default() -> Self {
        Self {
            enabled: true,
            max_attempts: 10,
            initial_delay: 1,
            max_delay: 60,
            backoff_multiplier: 2.0,
        }
    }
}
121
122/// acknowledge verify configuration
123#[derive(Debug, Clone)]
124pub struct AuthConfig {
125    /// acknowledge verify Type
126    pub auth_type: AuthType,
127
128    /// acknowledge verify credential data
129    pub credentials: HashMap<String, String>,
130}
131
/// Authentication type.
#[derive(Debug, Clone)]
pub enum AuthType {
    /// No authentication.
    None,
    /// Bearer token.
    BearerToken,
    /// API key.
    ApiKey,
    /// JWT.
    Jwt,
}
144
145// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
146// Client interface and implementation
147// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
148
149/// signaling client connect port
150///
151/// # interior mutability
152/// allMethodusing `&self` and non `&mut self`, with for conveniencein Arc in shared.
153/// Implementation class needs interior mutability ( like Mutex)to manage WebSocket connection status.
154#[async_trait]
155pub trait SignalingClient: Send + Sync {
156    /// Connecttosignaling server
157    async fn connect(&self) -> NetworkResult<()>;
158
159    /// Perform a single explicit connection attempt.
160    ///
161    /// Network recovery events use this path so a failed restore attempt can
162    /// return quickly instead of sleeping inside the normal reconnect backoff.
163    async fn connect_once(&self) -> NetworkResult<()> {
164        self.connect().await
165    }
166
167    /// Re-enable and wake the automatic reconnect manager after an explicit
168    /// lifecycle recovery attempt failed.
169    fn schedule_auto_reconnect(&self) {}
170
171    /// Re-enable automatic reconnect and restart its backoff sequence.
172    ///
173    /// Network restoration signals should use this path because they represent
174    /// a fresh external condition, not just another passive connection failure.
175    fn schedule_auto_reconnect_reset_backoff(&self) {
176        self.schedule_auto_reconnect();
177    }
178
179    /// DisconnectConnect
180    async fn disconnect(&self) -> NetworkResult<()>;
181
182    /// Probe whether the existing signaling WebSocket is truly alive.
183    ///
184    /// The default implementation only checks local state. WebSocket-backed
185    /// clients override this with an active Ping/Pong probe to catch half-open
186    /// sockets before network recovery decides whether to reconnect.
187    async fn probe_alive(&self, _timeout: Duration) -> NetworkResult<()> {
188        if self.is_connected() {
189            Ok(())
190        } else {
191            Err(NetworkError::ConnectionError(
192                "Signaling client is not connected".to_string(),
193            ))
194        }
195    }
196
197    /// Deprecated: Registration now happens via AIS HTTP; this WS path is no longer used.
198    /// Kept for backward compatibility; will be removed in a future release.
199    async fn send_register_request(
200        &self,
201        request: RegisterRequest,
202    ) -> NetworkResult<RegisterResponse>;
203
204    /// Send UnregisterRequest to signaling server (Actr → Signaling flow)
205    ///
206    /// This is used when an Actor is shutting down gracefully and wants to
207    /// proactively notify the signaling server that it is no longer available.
208    async fn send_unregister_request(
209        &self,
210        actor_id: ActrId,
211        credential: AIdCredential,
212        reason: Option<String>,
213    ) -> NetworkResult<UnregisterResponse>;
214
215    /// Send center skip(Registerafter stream process, using ActrToSignaling)
216    /// Returns Pong response if received, error if timeout or no response
217    async fn send_heartbeat(
218        &self,
219        actor_id: ActrId,
220        credential: AIdCredential,
221        availability: ServiceAvailabilityState,
222        power_reserve: f32,
223        mailbox_backlog: f32,
224    ) -> NetworkResult<Pong>;
225
226    /// Send RouteCandidatesRequest (requires authenticated Actor session)
227    async fn send_route_candidates_request(
228        &self,
229        actor_id: ActrId,
230        credential: AIdCredential,
231        request: RouteCandidatesRequest,
232    ) -> NetworkResult<RouteCandidatesResponse>;
233
234    /// Query AIS Ed25519 signing public key via signaling
235    ///
236    /// Returns `(key_id, pubkey_bytes)` where pubkey_bytes is the 32-byte raw public key.
237    /// Typically called by AisKeyCache on cache miss; should not be used directly in hot paths.
238    async fn get_signing_key(
239        &self,
240        actor_id: ActrId,
241        credential: AIdCredential,
242        key_id: u32,
243    ) -> NetworkResult<(u32, Vec<u8>)>;
244
245    /// Send CredentialUpdateRequest to refresh the Actor's credential
246    ///
247    /// This is used to refresh the credential before it expires. The server responds
248    /// with a RegisterResponse containing the new credential and expiration time.
249    async fn send_credential_update_request(
250        &self,
251        actor_id: ActrId,
252        credential: AIdCredential,
253    ) -> NetworkResult<RegisterResponse>;
254
255    /// Sendsignalingsignal seal ( pass usage Method)
256    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;
257
258    /// Receivesignalingsignal seal
259    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;
260
261    /// Check connection status
262    fn is_connected(&self) -> bool;
263
264    /// GetConnect statistics info
265    fn get_stats(&self) -> SignalingStats;
266    /// Subscribe to signaling events (state transitions).
267    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent>;
268
269    /// Set actor ID and credential state for reconnect URL parameters.
270    async fn set_actor_id(&self, actor_id: ActrId);
271    async fn set_credential_state(&self, credential_state: CredentialState);
272
273    /// Clear stored actor ID and credential state.
274    ///
275    /// After calling this, `connect()` will produce a clean WebSocket URL
276    /// without identity query parameters, so the signaling server treats
277    /// the connection as brand-new rather than a reconnect of the old actor.
278    /// This is required before re-registration when the credential has expired.
279    async fn clear_identity(&self);
280
281    /// Set a lifecycle hook callback that will be invoked (and awaited)
282    /// whenever signaling state changes (connect/disconnect).
283    /// Default implementation is a no-op for clients that don't support hooks.
284    fn set_hook_callback(&self, _cb: HookCallback) {}
285}
286
/// High-level signaling connection state (kept for quick boolean checks).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No signaling connection is established.
    Disconnected,
    /// The signaling connection is established.
    Connected,
}
293
294/// Signaling state transition events.
295///
296/// Unlike `ConnectionState` (which is a snapshot), these represent discrete
297/// transitions and are delivered via `broadcast` so every subscriber sees
298/// every event, even if the same state occurs twice in a row.
299#[derive(Debug, Clone)]
300pub enum SignalingEvent {
301    /// About to start a connection attempt (includes retry count).
302    ConnectStart { attempt: u32 },
303    /// Connection successfully established.
304    Connected,
305    /// Connection lost.
306    Disconnected { reason: DisconnectReason },
307}
308
/// Reason why the signaling connection was lost.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// WebSocket stream ended (receiver task exited normally).
    StreamEnded,
    /// No Pong received within the timeout window.
    PongTimeout,
    /// Failed to send a WebSocket Ping frame.
    PingSendFailed,
    /// Credential expired (heartbeat 401).
    CredentialExpired,
    /// Explicit disconnect() call or external trigger.
    Manual,
    /// Connection attempt failed with an error.
    ConnectionFailed(String),
}
325
326// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
327// Hook callback for synchronous lifecycle notification
328// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
329
330/// Events that trigger workload lifecycle hooks.
331///
332/// Used by `HookCallback` to invoke workload hooks synchronously (awaited)
333/// at the point where the state change occurs.
334#[derive(Clone, Debug)]
335pub enum HookEvent {
336    // ── Signaling ──
337    SignalingConnectStart {
338        attempt: u32,
339    },
340    SignalingConnected,
341    SignalingDisconnected,
342    // ── WebRTC ──
343    WebRtcConnectStart {
344        peer_id: ActrId,
345    },
346    WebRtcConnected {
347        peer_id: ActrId,
348        relayed: bool,
349    },
350    WebRtcDisconnected {
351        peer_id: ActrId,
352    },
353    DataStreamDeliveryUncertain {
354        stream_id: String,
355        session_id: u64,
356        reason: String,
357    },
358    // ── WebSocket ──
359    WebSocketConnectStart {
360        peer_id: ActrId,
361    },
362    WebSocketConnected {
363        peer_id: ActrId,
364    },
365    WebSocketDisconnected {
366        peer_id: ActrId,
367    },
368    // ── Credential ──
369    CredentialRenewed {
370        new_expiry: std::time::SystemTime,
371    },
372    CredentialExpiring {
373        new_expiry: std::time::SystemTime,
374    },
375    // ── Mailbox ──
376    MailboxBackpressure {
377        queue_len: usize,
378        threshold: usize,
379    },
380}
381
382/// Callback closure that is awaited when a hook event occurs.
383///
384/// Set once via `set_hook_callback()`. All state-change paths invoke this
385/// closure and `.await` its result before proceeding.
386pub type HookCallback =
387    Arc<dyn Fn(HookEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
388
/// Origin of a connection attempt.
#[derive(Debug, Clone, Copy)]
enum ConnectIntent {
    /// The attempt was requested explicitly.
    Explicit,
    /// The attempt comes from the auto-reconnect path and carries the
    /// generation it was started under
    /// (NOTE(review): presumably checked against `reconnect_generation` — confirm).
    AutoReconnect { generation: u64 },
}
394
395/// WebSocket signaling clientImplementation
396pub struct WebSocketSignalingClient {
397    config: SignalingConfig,
398    actor_id: tokio::sync::Mutex<Option<ActrId>>,
399    credential_state: tokio::sync::Mutex<Option<CredentialState>>,
400    /// WebSocket write end (using Mutex Implementation interior mutability )
401    ws_sink: WsSink,
402    /// WebSocket read end (using Mutex Implementation interior mutability )
403    ws_stream: tokio::sync::Mutex<
404        Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
405    >,
406    /// connection status
407    connected: Arc<AtomicBool>,
408    /// Connection in progress flag (prevents concurrent connect attempts)
409    connecting: Arc<AtomicBool>,
410    /// statistics info
411    stats: Arc<AtomicSignalingStats>,
412    /// Envelope count number device
413    envelope_counter: tokio::sync::Mutex<u64>,
414    /// Pending reply waiters (reply_for -> oneshot)
415    pending_replies: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<SignalingEnvelope>>>>,
416    /// Pending WebSocket Pong waiters (ping payload -> oneshot)
417    pending_pongs: Arc<tokio::sync::Mutex<HashMap<Vec<u8>, oneshot::Sender<()>>>>,
418    /// Monotonic probe payload counter.
419    probe_counter: AtomicU64,
420    /// Inbound envelope channel for unmatched messages (ActrRelay / push)
421    inbound_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<SignalingEnvelope>>>,
422    inbound_tx: tokio::sync::Mutex<mpsc::UnboundedSender<SignalingEnvelope>>,
423    /// Background receive task handle to allow graceful shutdown
424    receiver_task: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
425    /// Background ping task to detect half-open connections
426    ping_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
427    /// Connection state broadcast channel (event-driven)
428    event_tx: broadcast::Sender<SignalingEvent>,
429    /// Last time we saw inbound traffic (pong/any message), unix epoch seconds
430    last_pong: Arc<AtomicU64>,
431    /// Flag to track if reconnect manager has been started
432    reconnector_started: Arc<AtomicBool>,
433    /// Notify channel to wake up the reconnect manager
434    reconnect_notify: Arc<tokio::sync::Notify>,
435    /// Explicit disconnects from lifecycle/cleanup suppress stale auto-reconnect cycles.
436    auto_reconnect_suppressed: AtomicBool,
437    /// Incremented by explicit disconnects to invalidate in-flight auto-reconnect attempts.
438    reconnect_generation: AtomicU64,
439    /// Incremented by external recovery events that should restart reconnect backoff.
440    reconnect_backoff_reset_generation: AtomicU64,
441    /// Hook callback for synchronous lifecycle notification (set once, lock-free read)
442    hook_callback: OnceLock<HookCallback>,
443}
444
445impl WebSocketSignalingClient {
446    /// Create new WebSocket signaling client
447    pub fn new(config: SignalingConfig) -> Self {
448        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
449        let (event_tx, _event_rx) = broadcast::channel(64);
450        Self {
451            config,
452            actor_id: tokio::sync::Mutex::new(None),
453            credential_state: tokio::sync::Mutex::new(None),
454            ws_sink: Arc::new(tokio::sync::Mutex::new(None)),
455            ws_stream: tokio::sync::Mutex::new(None),
456            connected: Arc::new(AtomicBool::new(false)),
457            connecting: Arc::new(AtomicBool::new(false)),
458            stats: Arc::new(AtomicSignalingStats::default()),
459            envelope_counter: tokio::sync::Mutex::new(0),
460            pending_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
461            pending_pongs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
462            probe_counter: AtomicU64::new(0),
463            inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
464            inbound_tx: tokio::sync::Mutex::new(inbound_tx),
465            receiver_task: Arc::new(tokio::sync::Mutex::new(None)),
466            ping_task: tokio::sync::Mutex::new(None),
467            event_tx,
468            last_pong: Arc::new(AtomicU64::new(0)),
469            reconnector_started: Arc::new(AtomicBool::new(false)),
470            reconnect_notify: Arc::new(tokio::sync::Notify::new()),
471            auto_reconnect_suppressed: AtomicBool::new(false),
472            reconnect_generation: AtomicU64::new(0),
473            reconnect_backoff_reset_generation: AtomicU64::new(0),
474            hook_callback: OnceLock::new(),
475        }
476    }
477
478    /// Start the reconnect manager if enabled in config and not already started.
479    ///
480    /// The manager waits on a `Notify` and runs an exponential-backoff retry loop
481    /// each time it is woken up.
482    /// Invoke the hook callback and await its completion.
483    /// No-op if no callback has been set yet.
484    async fn invoke_hook(&self, event: HookEvent) {
485        if let Some(cb) = self.hook_callback.get() {
486            cb(event).await;
487        }
488    }
489
490    async fn publish_disconnected_transition(
491        was_connected: bool,
492        stats: &Arc<AtomicSignalingStats>,
493        event_tx: &broadcast::Sender<SignalingEvent>,
494        hook_callback: Option<HookCallback>,
495        reason: DisconnectReason,
496        reconnect_notify: Option<&Arc<tokio::sync::Notify>>,
497    ) -> bool {
498        if !was_connected {
499            return false;
500        }
501
502        stats.disconnections.fetch_add(1, Ordering::Relaxed);
503
504        if let Some(cb) = hook_callback {
505            cb(HookEvent::SignalingDisconnected).await;
506        }
507
508        let _ = event_tx.send(SignalingEvent::Disconnected { reason });
509
510        if let Some(notify) = reconnect_notify {
511            notify.notify_one();
512        }
513
514        true
515    }
516
517    pub fn start_reconnect_manager(self: &Arc<Self>) {
518        if !self.config.reconnect_config.enabled {
519            return;
520        }
521        if self
522            .reconnector_started
523            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
524            .is_err()
525        {
526            return; // already started
527        }
528
529        tracing::info!("🔄 Starting reconnect manager for signaling client");
530
531        let client = Arc::downgrade(self);
532        let notify = self.reconnect_notify.clone();
533
534        tokio::spawn(async move {
535            loop {
536                let reconnect_requested = tokio::select! {
537                    _ = notify.notified() => true,
538                    _ = tokio::time::sleep(Duration::from_secs(30)) => false,
539                };
540
541                if !reconnect_requested && client.upgrade().is_none() {
542                    break;
543                }
544                if !reconnect_requested {
545                    continue;
546                }
547
548                let Some(client) = client.upgrade() else {
549                    break;
550                };
551
552                if !client.config.reconnect_config.enabled {
553                    break;
554                }
555
556                if Arc::strong_count(&client) <= 1 {
557                    break;
558                }
559
560                // Run reconnect cycle with exponential backoff
561                client.run_reconnect_cycle().await;
562            }
563        });
564    }
565
566    /// Execute one full reconnect cycle with exponential backoff + jitter.
567    async fn run_reconnect_cycle(self: &Arc<Self>) {
568        use actr_framework::ExponentialBackoff;
569
570        let cfg = &self.config.reconnect_config;
571        let generation = self.reconnect_generation.load(Ordering::Acquire);
572
573        if Arc::strong_count(self) <= 1 {
574            tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
575            return;
576        }
577
578        if self.auto_reconnect_cancelled(generation) {
579            tracing::debug!("Skipping signaling auto-reconnect cycle after explicit disconnect");
580            return;
581        }
582
583        if self.connected.load(Ordering::Acquire) {
584            tracing::debug!("🔎 Probing connected signaling before reconnect cycle");
585            match self
586                .probe_alive(Duration::from_secs(PONG_TIMEOUT_SECS))
587                .await
588            {
589                Ok(()) => {
590                    tracing::debug!("Signaling probe succeeded, skipping reconnect cycle");
591                    return;
592                }
593                Err(e) => {
594                    tracing::warn!("Signaling probe failed before reconnect: {e}");
595                    if let Err(disconnect_err) = self.disconnect_internal(false).await {
596                        tracing::warn!(
597                            "⚠️ Disconnect cleanup failed after failed probe (non-fatal): {disconnect_err}"
598                        );
599                    }
600                }
601            }
602        }
603
604        'cycle: loop {
605            let backoff_reset_generation = self
606                .reconnect_backoff_reset_generation
607                .load(Ordering::Acquire);
608            let backoff = ExponentialBackoff::builder()
609                .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
610                .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
611                .max_retries(cfg.max_attempts)
612                .with_jitter()
613                .build();
614
615            let mut attempt: u32 = 0;
616
617            for delay in backoff {
618                if Arc::strong_count(self) <= 1 {
619                    tracing::debug!("Stopping signaling auto-reconnect cycle after owner drop");
620                    return;
621                }
622
623                if self.auto_reconnect_cancelled(generation) {
624                    tracing::debug!(
625                        "Stopping signaling auto-reconnect cycle after explicit disconnect"
626                    );
627                    return;
628                }
629
630                if self.connected.load(Ordering::Acquire) {
631                    tracing::debug!("Already connected, aborting reconnect cycle");
632                    return;
633                }
634
635                attempt += 1;
636                let _ = self.event_tx.send(SignalingEvent::ConnectStart { attempt });
637
638                match self.connect_once_for_auto_reconnect(generation).await {
639                    Ok(()) => {
640                        tracing::info!("✅ Signaling reconnect succeeded on attempt {attempt}");
641                        return;
642                    }
643                    Err(e) => {
644                        if self.auto_reconnect_cancelled(generation) {
645                            tracing::debug!(
646                                "Stopping signaling auto-reconnect cycle after explicit disconnect"
647                            );
648                            return;
649                        }
650
651                        tracing::warn!(
652                            "❌ Reconnect attempt {attempt} failed: {e}, retrying in {delay:?}"
653                        );
654                        tokio::select! {
655                            _ = tokio::time::sleep(delay) => {}
656                            _ = self.reconnect_notify.notified() => {
657                                tracing::debug!("Explicit reconnect request interrupted reconnect backoff");
658                            }
659                        }
660                        if self
661                            .reconnect_backoff_reset_generation
662                            .load(Ordering::Acquire)
663                            != backoff_reset_generation
664                        {
665                            tracing::debug!(
666                                "Restarting signaling reconnect backoff after external recovery event"
667                            );
668                            continue 'cycle;
669                        }
670                        if Arc::strong_count(self) <= 1 {
671                            tracing::debug!(
672                                "Stopping signaling auto-reconnect cycle after owner drop"
673                            );
674                            return;
675                        }
676                        if self.auto_reconnect_cancelled(generation) {
677                            tracing::debug!(
678                                "Stopping signaling auto-reconnect cycle after explicit disconnect"
679                            );
680                            return;
681                        }
682                    }
683                }
684            }
685
686            // All retries exhausted — enter cooldown, then allow future wakeups
687            tracing::error!("Reconnect failed after {attempt} attempts, entering cooldown");
688            let cooldown = std::time::Duration::from_secs(cfg.max_delay.max(1) * 2);
689            tokio::select! {
690                _ = tokio::time::sleep(cooldown) => {}
691                _ = self.reconnect_notify.notified() => {
692                    tracing::debug!("Explicit reconnect request interrupted reconnect cooldown");
693                }
694            }
695            if self
696                .reconnect_backoff_reset_generation
697                .load(Ordering::Acquire)
698                != backoff_reset_generation
699            {
700                tracing::debug!(
701                    "Restarting signaling reconnect backoff after external recovery event"
702                );
703                continue 'cycle;
704            }
705            if self.auto_reconnect_cancelled(generation) {
706                tracing::debug!(
707                    "Signaling auto-reconnect cooldown ended after explicit disconnect suppression"
708                );
709            }
710            // After cooldown, the loop returns to notify.notified() and can be woken again
711            return;
712        }
713    }
714
715    /// Test-only convenience constructor: create, connect, and return a client.
716    ///
717    /// The returned client has no `actor_id` / `credential_state` bound, so the
718    /// signaling URL carries no identity query parameters — mock-actrix will
719    /// not bind the WebSocket to any registry entry. Use this only for tests
720    /// that explicitly exercise the unbound path; integration tests that need
721    /// peer-to-peer relay should use [`Self::connect_to_with_identity`].
722    #[cfg(feature = "test-utils")]
723    pub async fn connect_to(url: &str) -> NetworkResult<Arc<Self>> {
724        let config = SignalingConfig {
725            server_url: url.parse()?,
726            connection_timeout: 5,
727            heartbeat_interval: 30,
728            reconnect_config: ReconnectConfig::default(),
729            auth_config: None,
730            webrtc_role: None,
731        };
732
733        let client = Arc::new(Self::new(config));
734        client.start_reconnect_manager();
735        client.connect().await?;
736        Ok(client)
737    }
738
739    /// Test-only constructor that pins identity *before* the WebSocket
740    /// handshake so mock-actrix can bind the connection to the actor on
741    /// register (`?actor_id=…` query parameter). Required by integration
742    /// tests that rely on peer-to-peer signaling relay — without this binding
743    /// mock-actrix drops outbound relays for "unbound target".
744    #[cfg(feature = "test-utils")]
745    pub async fn connect_to_with_identity(
746        url: &str,
747        actor_id: ActrId,
748        credential_state: CredentialState,
749    ) -> NetworkResult<Arc<Self>> {
750        let config = SignalingConfig {
751            server_url: url.parse()?,
752            connection_timeout: 5,
753            heartbeat_interval: 30,
754            reconnect_config: ReconnectConfig::default(),
755            auth_config: None,
756            webrtc_role: None,
757        };
758
759        let client = Arc::new(Self::new(config));
760        client.set_actor_id(actor_id).await;
761        client.set_credential_state(credential_state).await;
762        client.start_reconnect_manager();
763        client.connect().await?;
764        Ok(client)
765    }
766
767    /// alive integrate down a envelope ID
768    async fn next_envelope_id(&self) -> String {
769        let mut counter = self.envelope_counter.lock().await;
770        *counter += 1;
771        format!("env-{}", *counter)
772    }
773
774    /// Create SignalingEnvelope
775    async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
776        SignalingEnvelope {
777            envelope_version: 1,
778            envelope_id: self.next_envelope_id().await,
779            reply_for: None,
780            timestamp: prost_types::Timestamp {
781                seconds: chrono::Utc::now().timestamp(),
782                nanos: 0,
783            },
784            traceparent: None,
785            tracestate: None,
786            flow: Some(flow),
787        }
788    }
789
790    /// Reset inbound channel for a fresh session (useful after disconnects).
791    async fn reset_inbound_channel(&self) {
792        self.drop_pending_replies("inbound channel reset").await;
793        self.drop_pending_pongs("inbound channel reset").await;
794
795        let (tx, rx) = mpsc::unbounded_channel();
796        *self.inbound_tx.lock().await = tx;
797        *self.inbound_rx.lock().await = rx;
798    }
799
800    async fn drop_pending_replies(&self, reason: &'static str) {
801        let dropped = {
802            let mut pending = self.pending_replies.lock().await;
803            let dropped = pending.len();
804            pending.clear();
805            dropped
806        };
807
808        if dropped > 0 {
809            tracing::debug!(reason, dropped, "Dropping pending signaling reply waiters");
810        }
811    }
812
813    async fn drop_pending_pongs(&self, reason: &'static str) {
814        let dropped = {
815            let mut pending = self.pending_pongs.lock().await;
816            let dropped = pending.len();
817            pending.clear();
818            dropped
819        };
820
821        if dropped > 0 {
822            tracing::debug!(reason, dropped, "Dropping pending signaling pong waiters");
823        }
824    }
825
826    /// Build signaling URL with actor identity and Ed25519 credential params for authentication.
827    ///
828    /// Passes `actor_id`, `key_id`, `claims` (base64), `signature` (base64) as URL query params
829    /// so the signaling server can validate the credential before upgrading the WebSocket.
830    async fn build_url_with_identity(&self) -> Url {
831        let mut url = self.config.server_url.clone();
832        let actor_id_opt = self.actor_id.lock().await.clone();
833        if let Some(actor_id) = actor_id_opt {
834            let actor_str = actr_protocol::ActrId::to_string_repr(&actor_id);
835            url.query_pairs_mut().append_pair("actor_id", &actor_str);
836        }
837
838        // Pass Ed25519 credential in URL for initial WS auth
839        let cred_state_opt = self.credential_state.lock().await.clone();
840        if let Some(cred_state) = cred_state_opt {
841            let cred = cred_state.credential().await;
842            let claims_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.claims);
843            let sig_b64 = base64::engine::general_purpose::STANDARD.encode(&cred.signature);
844            url.query_pairs_mut()
845                .append_pair("key_id", &cred.key_id.to_string())
846                .append_pair("claims", &claims_b64)
847                .append_pair("signature", &sig_b64);
848        }
849
850        // Add WebRTC role preference if configured
851        if let Some(role) = &self.config.webrtc_role {
852            url.query_pairs_mut().append_pair("webrtc_role", role);
853        }
854
855        url
856    }
857
858    fn redact_signaling_url_for_log(url: &Url) -> String {
859        let mut redacted = url.clone();
860        let pairs: Vec<(String, String)> = redacted
861            .query_pairs()
862            .map(|(key, value)| {
863                let redacted_value = match key.to_ascii_lowercase().as_str() {
864                    "claims" | "signature" | "token" | "authorization" | "bearer"
865                    | "access_token" | "api_key" => "REDACTED".to_string(),
866                    _ => value.into_owned(),
867                };
868                (key.into_owned(), redacted_value)
869            })
870            .collect();
871
872        redacted.set_query(None);
873        if !pairs.is_empty() {
874            let mut query = redacted.query_pairs_mut();
875            for (key, value) in pairs {
876                query.append_pair(&key, &value);
877            }
878        }
879
880        redacted.to_string()
881    }
882
883    fn auto_reconnect_cancelled(&self, generation: u64) -> bool {
884        self.auto_reconnect_suppressed.load(Ordering::Acquire)
885            || self.reconnect_generation.load(Ordering::Acquire) != generation
886    }
887
888    /// Establish a single signaling WebSocket connection attempt, honoring connection_timeout.
889    ///
890    /// This does not perform any retry logic; callers that want retries should wrap this.
891    async fn establish_connection_once(&self) -> NetworkResult<()> {
892        self.establish_connection_once_with_intent(ConnectIntent::Explicit)
893            .await
894    }
895
896    async fn establish_connection_once_for_auto_reconnect(
897        &self,
898        generation: u64,
899    ) -> NetworkResult<()> {
900        self.establish_connection_once_with_intent(ConnectIntent::AutoReconnect { generation })
901            .await
902    }
903
    /// Performs one WebSocket connection attempt for the given `intent` and, on success,
    /// installs the socket halves and publishes the connected state.
    ///
    /// Steps, in order:
    /// 1. Return `Ok(())` immediately if `connected` is already set.
    /// 2. Build the URL (identity and credential params) and connect, bounded by
    ///    `connection_timeout` seconds unless that value is 0.
    /// 3. For `ConnectIntent::AutoReconnect`, re-check cancellation after the connect
    ///    completes; a cancelled attempt closes the new socket and returns an error.
    /// 4. Store the sink/stream, mark the client connected, clear auto-reconnect
    ///    suppression, refresh `last_pong`, run the `SignalingConnected` hook, broadcast
    ///    `SignalingEvent::Connected`, and bump the connection counter.
    ///
    /// # Errors
    /// Returns `NetworkError::ConnectionError` on connect timeout or when an auto-reconnect
    /// attempt was cancelled; other connect failures are propagated through `?`.
    async fn establish_connection_once_with_intent(
        &self,
        intent: ConnectIntent,
    ) -> NetworkResult<()> {
        // Guard: Check if already connected (handles rare TOCTOU scenarios)
        if self.connected.load(Ordering::Acquire) {
            tracing::debug!("Connection already established, skipping establish_connection_once()");
            return Ok(());
        }

        let url = self.build_url_with_identity().await;
        let timeout_secs = self.config.connection_timeout;
        // The URL carries credential material, so only the redacted form is logged.
        tracing::debug!(
            "Establishing connection to URL: {}",
            Self::redact_signaling_url_for_log(&url)
        );
        // Without this, data left in the write buffer after a disconnection would still be
        // sent once the network recovers.
        // NOTE(review): presumably a write buffer size of 0 makes every message get written
        // to the socket eagerly instead of being buffered - confirm against tungstenite docs.
        let config = WebSocketConfig::default().write_buffer_size(0);
        // Connect with an optional timeout. A timeout of 0 means "no timeout".
        let connect_result = if timeout_secs == 0 {
            connect_async_with_config(url.as_str(), Some(config), false).await
        } else {
            let timeout_duration = std::time::Duration::from_secs(timeout_secs);
            tokio::time::timeout(
                timeout_duration,
                connect_async_with_config(url.as_str(), Some(config), false),
            )
            .await
            .map_err(|_| {
                NetworkError::ConnectionError(format!(
                    "Signaling connect timeout after {}s",
                    timeout_secs
                ))
            })?
        }?;

        // The second tuple element (the handshake response) is not needed.
        let (ws_stream, _) = connect_result;

        // Split read/write halves and initialize client state
        let (sink, stream) = ws_stream.split();

        // The connect may have completed after the auto-reconnect cycle was cancelled.
        // In that case the fresh socket is closed and never installed.
        if let ConnectIntent::AutoReconnect { generation } = intent
            && self.auto_reconnect_cancelled(generation)
        {
            tracing::debug!(
                generation,
                "Discarding completed signaling auto-reconnect after explicit disconnect"
            );
            let mut sink = sink;
            if let Err(e) = sink.close().await {
                tracing::warn!(
                    "Signaling auto-reconnect socket close failed after cancellation: {}",
                    e
                );
            }
            return Err(NetworkError::ConnectionError(
                "Signaling auto-reconnect was cancelled by explicit disconnect".to_string(),
            ));
        }

        // Install the socket halves before flipping `connected`.
        *self.ws_sink.lock().await = Some(sink);
        *self.ws_stream.lock().await = Some(stream);
        self.connected.store(true, Ordering::Release);
        // A successful connect re-enables automatic reconnects.
        self.auto_reconnect_suppressed
            .store(false, Ordering::Release);
        // Start the liveness clock at connect time.
        self.last_pong.store(current_unix_secs(), Ordering::Release);
        // Invoke hook synchronously, then broadcast for other subscribers
        self.invoke_hook(HookEvent::SignalingConnected).await;
        // A send error is ignored here on purpose.
        let _ = self.event_tx.send(SignalingEvent::Connected);

        self.stats.connections.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }
978
    /// Connects to the signaling server, retrying with exponential backoff as configured
    /// by `reconnect_config`.
    ///
    /// - When `reconnect_config.enabled` is false, exactly one `connect_once()` is made.
    /// - Otherwise the first attempt runs immediately and each later attempt first waits
    ///   for the next (jittered) backoff delay; the `SignalingConnectStart` hook is invoked
    ///   before that wait.
    /// - A backoff wait ends early when `reconnect_notify` is notified.
    /// - If `reconnect_backoff_reset_generation` changes during a wait or around a failed
    ///   attempt, the whole backoff sequence is rebuilt and restarted.
    ///
    /// # Errors
    /// Once the backoff sequence is exhausted, returns the error of the last failed
    /// attempt, or a generic `NetworkError::ConnectionError` if none was recorded.
    async fn connect_with_retries(&self) -> NetworkResult<()> {
        use actr_framework::ExponentialBackoff;

        let cfg = &self.config.reconnect_config;

        // If reconnect is disabled, just attempt once.
        if !cfg.enabled {
            return self.connect_once().await;
        }

        let mut last_err = None;

        // Each iteration of 'cycle is one full backoff sequence; it is only re-entered
        // through `continue 'cycle` when the reset generation changed.
        'cycle: loop {
            // Snapshot used to detect external reset requests during this sequence.
            let backoff_reset_generation = self
                .reconnect_backoff_reset_generation
                .load(Ordering::Acquire);
            // Delays are clamped to at least one second.
            let backoff = ExponentialBackoff::builder()
                .initial_delay(std::time::Duration::from_secs(cfg.initial_delay.max(1)))
                .max_delay(std::time::Duration::from_secs(cfg.max_delay.max(1)))
                .max_retries(cfg.max_attempts)
                .with_jitter()
                .build();

            // First attempt immediately (delay = 0), subsequent delays from backoff
            for (attempt, delay) in std::iter::once(std::time::Duration::ZERO)
                .chain(backoff)
                .enumerate()
            {
                // Attempts are reported 1-based.
                let attempt = attempt as u32 + 1;
                self.invoke_hook(HookEvent::SignalingConnectStart { attempt })
                    .await;
                if delay > std::time::Duration::ZERO {
                    tracing::info!("Retry signaling connect after {delay:?} (attempt {attempt})");
                    // Wait out the backoff, unless a reconnect notification cuts it short.
                    tokio::select! {
                        _ = tokio::time::sleep(delay) => {}
                        _ = self.reconnect_notify.notified() => {
                            tracing::debug!("Explicit reconnect request interrupted signaling connect backoff");
                        }
                    }
                    // A changed generation means the backoff should start over.
                    if self
                        .reconnect_backoff_reset_generation
                        .load(Ordering::Acquire)
                        != backoff_reset_generation
                    {
                        tracing::debug!(
                            "Restarting explicit signaling connect backoff after external recovery event"
                        );
                        continue 'cycle;
                    }
                }

                match self.connect_once().await {
                    Ok(()) => return Ok(()),
                    Err(e) => {
                        tracing::warn!("Signaling connect attempt {attempt} failed: {e:?}");
                        last_err = Some(e);
                        // The generation may also have changed while the attempt was running.
                        if self
                            .reconnect_backoff_reset_generation
                            .load(Ordering::Acquire)
                            != backoff_reset_generation
                        {
                            tracing::debug!(
                                "Restarting explicit signaling connect backoff after external recovery event"
                            );
                            continue 'cycle;
                        }
                    }
                }
            }

            // The backoff sequence is exhausted: report the last error.
            let total = cfg.max_attempts + 1; // backoff max_retries + first attempt
            tracing::error!("Signaling connect failed after {total} attempts, giving up");
            return Err(last_err.unwrap_or_else(|| {
                NetworkError::ConnectionError("All connection attempts failed".to_string())
            }));
        }
    }
1057
1058    /// Send envelope and wait for response with timeout and error handling.
1059    #[cfg_attr(
1060        feature = "opentelemetry",
1061        tracing::instrument(skip_all, fields(envelope_id = %envelope.envelope_id))
1062    )]
1063    async fn send_envelope_and_wait_response(
1064        &self,
1065        envelope: SignalingEnvelope,
1066    ) -> NetworkResult<SignalingEnvelope> {
1067        let reply_for = envelope.envelope_id.clone();
1068
1069        // Register waiter before sending
1070        let (tx, rx) = oneshot::channel();
1071        self.pending_replies
1072            .lock()
1073            .await
1074            .insert(reply_for.clone(), tx);
1075
1076        if let Err(e) = self.send_envelope(envelope).await {
1077            // Cleanup waiter on immediate send failure to avoid leaks.
1078            self.pending_replies.lock().await.remove(&reply_for);
1079            return Err(e);
1080        }
1081
1082        let result =
1083            tokio::time::timeout(std::time::Duration::from_secs(RESPONSE_TIMEOUT_SECS), rx).await;
1084        // Clean up waiter on timeout
1085        if result.is_err() {
1086            self.pending_replies.lock().await.remove(&reply_for);
1087        }
1088
1089        let response_envelope = result
1090            .map_err(|_| {
1091                NetworkError::ConnectionError(
1092                    "Timed out waiting for signaling response".to_string(),
1093                )
1094            })?
1095            .map_err(|_| {
1096                NetworkError::ConnectionError(
1097                    "Receiver dropped while waiting for signaling response".to_string(),
1098                )
1099            })?;
1100
1101        Ok(response_envelope)
1102    }
1103
1104    /// Spawn background receiver to demux envelopes by reply_for.
1105    async fn start_receiver(&self) {
1106        let mut stream_guard = self.ws_stream.lock().await;
1107        if stream_guard.is_none() {
1108            return;
1109        }
1110
1111        let mut stream = stream_guard.take().expect("stream exists");
1112        let pending = self.pending_replies.clone();
1113        let inbound_tx = { self.inbound_tx.lock().await.clone() };
1114        let stats = self.stats.clone();
1115        let connected = self.connected.clone();
1116        let event_tx = self.event_tx.clone();
1117        let last_pong = self.last_pong.clone();
1118        let pending_pongs = self.pending_pongs.clone();
1119        let reconnect_notify = self.reconnect_notify.clone();
1120        let reconnect_enabled = self.config.reconnect_config.enabled;
1121        let hook_callback = self.hook_callback.get().cloned();
1122        let handle = tokio::spawn(async move {
1123            while let Some(msg) = stream.next().await {
1124                match msg {
1125                    Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
1126                        // Any inbound traffic counts as liveness
1127                        last_pong.store(current_unix_secs(), Ordering::Release);
1128                        match SignalingEnvelope::decode(&data[..]) {
1129                            Ok(envelope) => {
1130                                #[cfg(feature = "opentelemetry")]
1131                                let span = {
1132                                    let span = tracing::info_span!("signaling.receive_envelope", envelope_id = %envelope.envelope_id);
1133                                    span.set_parent(extract_trace_context(&envelope));
1134                                    span
1135                                };
1136
1137                                stats.messages_received.fetch_add(1, Ordering::Relaxed);
1138                                tracing::debug!("Received message: {:?}", envelope);
1139                                if let Some(reply_for) = envelope.reply_for.clone() {
1140                                    if let Some(sender) = pending.lock().await.remove(&reply_for) {
1141                                        #[cfg(feature = "opentelemetry")]
1142                                        let _ = span.enter();
1143                                        if let Err(e) = sender.send(envelope) {
1144                                            stats.errors.fetch_add(1, Ordering::Relaxed);
1145                                            tracing::warn!(
1146                                                "Failed to send reply envelope to waiter: {e:?}",
1147                                            );
1148                                        }
1149                                        continue;
1150                                    }
1151                                }
1152                                tracing::debug!(
1153                                    "Unmatched or push message -> forward to inbound channel"
1154                                );
1155                                // Unmatched or push message -> forward to inbound channel
1156                                if let Err(e) = inbound_tx.send(envelope) {
1157                                    stats.errors.fetch_add(1, Ordering::Relaxed);
1158                                    tracing::warn!(
1159                                        "Failed to send envelope to inbound channel: {e:?}"
1160                                    );
1161                                }
1162                            }
1163                            Err(e) => {
1164                                stats.errors.fetch_add(1, Ordering::Relaxed);
1165                                tracing::warn!("Failed to decode SignalingEnvelope: {e}");
1166                            }
1167                        }
1168                    }
1169                    Ok(tokio_tungstenite::tungstenite::Message::Pong(payload)) => {
1170                        tracing::debug!("Received pong");
1171                        last_pong.store(current_unix_secs(), Ordering::Release);
1172                        if let Some(sender) = pending_pongs.lock().await.remove(&payload.to_vec()) {
1173                            let _ = sender.send(());
1174                        }
1175                    }
1176                    Ok(tokio_tungstenite::tungstenite::Message::Ping(_)) => {
1177                        tracing::debug!("Received ping");
1178                        last_pong.store(current_unix_secs(), Ordering::Release);
1179                    }
1180                    Ok(other) => {
1181                        tracing::warn!("Received non-binary frame, ignoring: {other:?}");
1182                    }
1183                    Err(e) => {
1184                        stats.errors.fetch_add(1, Ordering::Relaxed);
1185                        tracing::error!("Signaling receive error: {e}");
1186                        break;
1187                    }
1188                }
1189            }
1190
1191            tracing::warn!("Stream terminated");
1192            // If explicit disconnect already marked the client disconnected,
1193            // do not start an automatic reconnect cycle for the intentional
1194            // close. The disconnect path publishes its own Manual event.
1195            let was_connected = connected.swap(false, Ordering::AcqRel);
1196            Self::publish_disconnected_transition(
1197                was_connected,
1198                &stats,
1199                &event_tx,
1200                hook_callback,
1201                DisconnectReason::StreamEnded,
1202                reconnect_enabled.then_some(&reconnect_notify),
1203            )
1204            .await;
1205            pending_pongs.lock().await.clear();
1206        });
1207
1208        *self.receiver_task.lock().await = Some(handle);
1209    }
1210
    /// Spawns the background ping task that detects half-open connections, where writes
    /// do not fail but the peer is gone.
    ///
    /// If a previously spawned ping task is still running, this is a no-op; a finished
    /// handle is discarded and replaced. The task loops every `PING_INTERVAL_SECS`:
    /// 1. Stop if the client is no longer marked connected.
    /// 2. Send a WebSocket ping with an empty payload, bounded by
    ///    `SIGNALING_SEND_TIMEOUT_SECS`. A send error, a send timeout, or a missing sink
    ///    marks the client disconnected with `DisconnectReason::PingSendFailed` and stops.
    /// 3. If `last_pong` is older than `PONG_TIMEOUT_SECS`, abort the receiver task, mark
    ///    the client disconnected with `DisconnectReason::PongTimeout`, and stop.
    ///
    /// FIXME: merge into the heartbeat task.
    async fn start_ping_task(&self) {
        // The `ping_task` lock is held for the whole function, until the new handle is stored.
        let mut existing = self.ping_task.lock().await;
        if let Some(handle) = existing.as_ref() {
            if handle.is_finished() {
                // Discard the stale handle; a new task is spawned below.
                existing.take();
            } else {
                // A ping task is already running.
                return;
            }
        }

        let sink = self.ws_sink.clone();
        let connected = self.connected.clone();
        let stats = self.stats.clone();
        let event_tx = self.event_tx.clone();
        let last_pong = self.last_pong.clone();
        let receiver_task_clone = Arc::clone(&self.receiver_task);
        let reconnect_notify = self.reconnect_notify.clone();
        let reconnect_enabled = self.config.reconnect_config.enabled;
        let hook_callback = self.hook_callback.get().cloned();

        let handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await;

                // Another path already marked the client disconnected: nothing left to probe.
                if !connected.load(Ordering::Acquire) {
                    break;
                }

                // Send ping; mark disconnect on failure.
                let mut disconnect_reason = None;
                {
                    // The sink lock is scoped to this block and released before publishing.
                    let mut sink_guard = sink.lock().await;
                    if let Some(sink) = sink_guard.as_mut() {
                        match tokio::time::timeout(
                            std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
                            sink.send(tokio_tungstenite::tungstenite::Message::Ping(
                                Vec::new().into(),
                            )),
                        )
                        .await
                        {
                            Ok(Ok(())) => {}
                            Ok(Err(e)) => {
                                tracing::warn!("Signaling ping send failed: {e}");
                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
                            }
                            Err(_) => {
                                tracing::warn!("Signaling ping send timed out");
                                disconnect_reason = Some(DisconnectReason::PingSendFailed);
                            }
                        }
                    } else {
                        // No sink stored: treated the same as a failed ping.
                        tracing::warn!("Signaling not connected");
                        disconnect_reason = Some(DisconnectReason::PingSendFailed);
                    }
                }

                if let Some(reason) = disconnect_reason {
                    // `was_connected` is the flag's previous value, passed on to the publisher.
                    let was_connected = connected.swap(false, Ordering::AcqRel);
                    // The notify handle is passed only when reconnect is enabled.
                    // NOTE(review): presumably it is used to wake the reconnect manager - confirm
                    // in `publish_disconnected_transition`.
                    Self::publish_disconnected_transition(
                        was_connected,
                        &stats,
                        &event_tx,
                        hook_callback.clone(),
                        reason,
                        reconnect_enabled.then_some(&reconnect_notify),
                    )
                    .await;
                    break;
                }

                // Check for stale pong
                let now = current_unix_secs();
                let last = last_pong.load(Ordering::Acquire);
                // `saturating_sub` keeps the age at 0 if `last` is ahead of `now`.
                if now.saturating_sub(last) > PONG_TIMEOUT_SECS {
                    tracing::warn!(
                        "Signaling pong timeout (last seen {}s ago), marking disconnected",
                        now.saturating_sub(last)
                    );
                    // Abort the receiver before publishing the PongTimeout transition below.
                    if let Some(handle) = receiver_task_clone.lock().await.take() {
                        handle.abort();
                    }
                    let was_connected = connected.swap(false, Ordering::AcqRel);
                    Self::publish_disconnected_transition(
                        was_connected,
                        &stats,
                        &event_tx,
                        hook_callback.clone(),
                        DisconnectReason::PongTimeout,
                        reconnect_enabled.then_some(&reconnect_notify),
                    )
                    .await;
                    break;
                }
            }
        });

        *existing = Some(handle);
    }
1312
    /// Tears down the current signaling connection and publishes a manual disconnect.
    ///
    /// When `suppress_auto_reconnect` is true, the reconnect generation is bumped,
    /// auto-reconnect is marked suppressed, and `reconnect_notify` waiters are woken
    /// before teardown starts.
    ///
    /// Teardown order: drop pending reply/pong waiters, clear `connected`, abort the ping
    /// and receiver tasks, close the WebSocket sink, drop the stored stream, reset the
    /// inbound channel, then publish a `DisconnectReason::Manual` transition. Every lock
    /// acquisition is bounded by `DISCONNECT_LOCK_TIMEOUT_SECS` and the close by
    /// `DISCONNECT_CLOSE_TIMEOUT_SECS`; a timeout is logged and that step is skipped.
    ///
    /// Always returns `Ok(())`; teardown failures are only logged.
    async fn disconnect_internal(&self, suppress_auto_reconnect: bool) -> NetworkResult<()> {
        if suppress_auto_reconnect {
            // Bumping the generation makes in-flight auto-reconnect attempts tagged with the
            // old value see themselves as cancelled.
            self.reconnect_generation.fetch_add(1, Ordering::AcqRel);
            self.auto_reconnect_suppressed
                .store(true, Ordering::Release);
            self.reconnect_notify.notify_waiters();
        }

        self.drop_pending_replies("signaling disconnect").await;
        self.drop_pending_pongs("signaling disconnect").await;
        // Previous value of the flag; forwarded to the transition publisher at the end.
        let was_connected = self.connected.swap(false, Ordering::AcqRel);

        // Stop background tasks before taking the WebSocket sink/stream locks.
        // A ping or receiver task can be inside a socket operation while holding
        // one of those locks; aborting first keeps disconnect from waiting on
        // the task it is about to shut down.
        let ping_handle = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ping_task.lock(),
        )
        .await
        {
            Ok(mut task_guard) => task_guard.take(),
            Err(_) => {
                tracing::warn!("Timed out waiting for signaling ping task lock during disconnect");
                None
            }
        };
        if let Some(handle) = ping_handle {
            handle.abort();
        }

        let receiver_handle = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.receiver_task.lock(),
        )
        .await
        {
            Ok(mut task_guard) => task_guard.take(),
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling receiver task lock during disconnect"
                );
                None
            }
        };
        if let Some(handle) = receiver_handle {
            handle.abort();
        }

        // Fetch and close the sink without holding the mutex during the close
        // await. The lock itself is bounded because a stalled send can hold it
        // on broken mobile network transitions.
        let sink = match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ws_sink.lock(),
        )
        .await
        {
            Ok(mut sink_guard) => sink_guard.take(),
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling WebSocket sink lock during disconnect"
                );
                None
            }
        };

        if let Some(mut sink) = sink {
            // Closing is best-effort: failures and timeouts are logged and cleanup goes on.
            match tokio::time::timeout(
                std::time::Duration::from_secs(DISCONNECT_CLOSE_TIMEOUT_SECS),
                sink.close(),
            )
            .await
            {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    tracing::warn!("Signaling WebSocket close failed during disconnect: {}", e);
                }
                Err(_) => {
                    tracing::warn!(
                        "Signaling WebSocket close timed out during disconnect; continuing cleanup"
                    );
                }
            }
        }

        // Drop any read half that was stored but never handed to a receiver task.
        match tokio::time::timeout(
            std::time::Duration::from_secs(DISCONNECT_LOCK_TIMEOUT_SECS),
            self.ws_stream.lock(),
        )
        .await
        {
            Ok(mut stream_guard) => {
                stream_guard.take();
            }
            Err(_) => {
                tracing::warn!(
                    "Timed out waiting for signaling WebSocket stream lock during disconnect"
                );
            }
        }

        self.reset_inbound_channel().await;

        // Invoke hook synchronously, then broadcast for other subscribers
        // No reconnect notify handle is passed for a manual disconnect.
        Self::publish_disconnected_transition(
            was_connected,
            &self.stats,
            &self.event_tx,
            self.hook_callback.get().cloned(),
            DisconnectReason::Manual,
            None,
        )
        .await;

        Ok(())
    }
1431
    /// Makes one connection attempt on behalf of the auto-reconnect cycle `generation`,
    /// coordinating with any concurrent connection attempt.
    ///
    /// The `connecting` flag ensures that only one task establishes the connection at a
    /// time: the task that flips it from `false` to `true` connects, any other task waits
    /// for that attempt's outcome instead. Cancellation (see `auto_reconnect_cancelled`)
    /// is re-checked at every stage, including after a successful connect, in which case
    /// the fresh connection is torn down again.
    ///
    /// On success the receiver and ping tasks are started.
    ///
    /// # Errors
    /// Returns `NetworkError::ConnectionError` when the cycle was cancelled, or the error
    /// of the failed connection attempt. A non-cancelled failure is also broadcast as
    /// `SignalingEvent::Disconnected` with `DisconnectReason::ConnectionFailed`.
    async fn connect_once_for_auto_reconnect(&self, generation: u64) -> NetworkResult<()> {
        if self.auto_reconnect_cancelled(generation) {
            return Err(NetworkError::ConnectionError(
                "Signaling auto-reconnect was cancelled".to_string(),
            ));
        }

        if self.connected.load(Ordering::Acquire) {
            tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
            return Ok(());
        }

        // Try to become the single connecting task.
        match self
            .connecting
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => {}
            Err(_) => {
                // Another task holds the `connecting` flag.
                if self.connected.load(Ordering::Acquire) {
                    tracing::debug!("Already connected, skipping auto-reconnect connect_once()");
                    return Ok(());
                }

                tracing::debug!(
                    "Another connection attempt in progress, waiting for state change..."
                );
                let result = self.wait_for_connection_result().await;
                // A cancellation that happened while waiting overrides the other task's outcome.
                if self.auto_reconnect_cancelled(generation) {
                    return Err(NetworkError::ConnectionError(
                        "Signaling auto-reconnect was cancelled".to_string(),
                    ));
                }
                return result;
            }
        }

        // From here on this task owns the `connecting` flag and must clear it on every exit.
        if self.auto_reconnect_cancelled(generation) {
            self.connecting.store(false, Ordering::Release);
            return Err(NetworkError::ConnectionError(
                "Signaling auto-reconnect was cancelled".to_string(),
            ));
        }

        if self.connected.load(Ordering::Acquire) {
            tracing::debug!("Connection completed by another task while acquiring lock");
            self.connecting.store(false, Ordering::Release);
            return Ok(());
        }

        tracing::debug!(
            generation,
            "Acquired connection lock, establishing one auto-reconnect signaling attempt..."
        );

        let result = self
            .establish_connection_once_for_auto_reconnect(generation)
            .await;
        // Release the flag before handling the outcome.
        self.connecting.store(false, Ordering::Release);

        match result {
            Ok(()) => {
                // Cancelled right after connecting: undo the connection. `false` leaves the
                // auto-reconnect suppression state untouched.
                if self.auto_reconnect_cancelled(generation) {
                    self.disconnect_internal(false).await?;
                    return Err(NetworkError::ConnectionError(
                        "Signaling auto-reconnect was cancelled".to_string(),
                    ));
                }
                self.start_receiver().await;
                self.start_ping_task().await;
                Ok(())
            }
            Err(e) => {
                // A cancelled attempt fails silently: no event and no error log.
                if !self.auto_reconnect_cancelled(generation) {
                    let _ = self.event_tx.send(SignalingEvent::Disconnected {
                        reason: DisconnectReason::ConnectionFailed(e.to_string()),
                    });
                    tracing::error!("Connection attempt failed: {e}");
                }
                Err(e)
            }
        }
    }
1514
1515    /// Wait for ongoing connection attempt to complete (used when another task is connecting).
1516    ///
1517    /// Uses the broadcast channel to wait for a Connected event without recursion.
1518    async fn wait_for_connection_result(&self) -> NetworkResult<()> {
1519        let mut event_rx = self.event_tx.subscribe();
1520        let deadline = tokio::time::Instant::now()
1521            + std::time::Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS);
1522
1523        loop {
1524            tokio::select! {
1525                _ = tokio::time::sleep_until(deadline) => {
1526                    // Final check before giving up
1527                    if self.connected.load(Ordering::Acquire) {
1528                        tracing::debug!("Connection succeeded just before timeout");
1529                        return Ok(());
1530                    }
1531                    return Err(NetworkError::ConnectionError(
1532                        "Timeout waiting for concurrent connection attempt".to_string(),
1533                    ));
1534                }
1535                result = event_rx.recv() => {
1536                    match result {
1537                        Ok(SignalingEvent::Connected) => {
1538                            tracing::debug!("Connection established by another task");
1539                            return Ok(());
1540                        }
1541                        Ok(SignalingEvent::Disconnected { reason }) => {
1542                            return Err(NetworkError::ConnectionError(format!(
1543                                "Concurrent signaling connection failed: {reason:?}"
1544                            )));
1545                        }
1546                        Ok(_) => continue, // ConnectStart — keep waiting
1547                        Err(broadcast::error::RecvError::Lagged(n)) => {
1548                            tracing::warn!("Event receiver lagged by {n} events");
1549                            // Check current state after lag
1550                            if self.connected.load(Ordering::Acquire) {
1551                                return Ok(());
1552                            }
1553                            continue;
1554                        }
1555                        Err(broadcast::error::RecvError::Closed) => {
1556                            return Err(NetworkError::ConnectionError(
1557                                "Event channel closed while waiting for connection".to_string(),
1558                            ));
1559                        }
1560                    }
1561                }
1562            }
1563        }
1564    }
1565}
1566
1567#[async_trait]
1568impl SignalingClient for WebSocketSignalingClient {
1569    async fn connect(&self) -> NetworkResult<()> {
1570        if self.connected.load(Ordering::Acquire) {
1571            tracing::debug!("Already connected, skipping connect()");
1572            return Ok(());
1573        }
1574
1575        self.connect_with_retries().await
1576    }
1577
1578    async fn connect_once(&self) -> NetworkResult<()> {
1579        loop {
1580            if self.connected.load(Ordering::Acquire) {
1581                tracing::debug!("Already connected, skipping connect_once()");
1582                return Ok(());
1583            }
1584
1585            match self
1586                .connecting
1587                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1588            {
1589                Ok(_) => break,
1590                Err(_) => {
1591                    if self.connected.load(Ordering::Acquire) {
1592                        tracing::debug!("Already connected, skipping connect_once()");
1593                        return Ok(());
1594                    }
1595
1596                    tracing::debug!(
1597                        "Another connection attempt in progress, waiting for state change..."
1598                    );
1599                    match self.wait_for_connection_result().await {
1600                        Ok(()) => return Ok(()),
1601                        Err(e)
1602                            if !self.connected.load(Ordering::Acquire)
1603                                && !self.connecting.load(Ordering::Acquire) =>
1604                        {
1605                            tracing::debug!(
1606                                "Concurrent signaling connection failed; explicit connect_once will retry immediately: {e}"
1607                            );
1608                            continue;
1609                        }
1610                        Err(e) => return Err(e),
1611                    }
1612                }
1613            }
1614        }
1615
1616        if self.connected.load(Ordering::Acquire) {
1617            tracing::debug!("Connection completed by another task while acquiring lock");
1618            self.connecting.store(false, Ordering::Release);
1619            return Ok(());
1620        }
1621
1622        tracing::debug!(
1623            "Acquired connection lock, establishing one signaling connection attempt..."
1624        );
1625
1626        let result = self.establish_connection_once().await;
1627        self.connecting.store(false, Ordering::Release);
1628
1629        match result {
1630            Ok(()) => {
1631                self.start_receiver().await;
1632                self.start_ping_task().await;
1633                Ok(())
1634            }
1635            Err(e) => {
1636                let _ = self.event_tx.send(SignalingEvent::Disconnected {
1637                    reason: DisconnectReason::ConnectionFailed(e.to_string()),
1638                });
1639                tracing::error!("Connection attempt failed: {e}");
1640                Err(e)
1641            }
1642        }
1643    }
1644
1645    fn schedule_auto_reconnect(&self) {
1646        if !self.config.reconnect_config.enabled {
1647            tracing::debug!("Skipping signaling auto-reconnect schedule; config disabled");
1648            return;
1649        }
1650
1651        self.auto_reconnect_suppressed
1652            .store(false, Ordering::Release);
1653        self.reconnect_notify.notify_one();
1654    }
1655
1656    fn schedule_auto_reconnect_reset_backoff(&self) {
1657        if !self.config.reconnect_config.enabled {
1658            tracing::debug!("Skipping signaling auto-reconnect schedule; config disabled");
1659            return;
1660        }
1661
1662        self.auto_reconnect_suppressed
1663            .store(false, Ordering::Release);
1664        self.reconnect_backoff_reset_generation
1665            .fetch_add(1, Ordering::AcqRel);
1666        self.reconnect_notify.notify_one();
1667    }
1668
    /// Disconnect from the signaling server.
    ///
    /// Delegates to `disconnect_internal`, passing `true` as its flag argument, and returns
    /// that call's result unchanged.
    async fn disconnect(&self) -> NetworkResult<()> {
        self.disconnect_internal(true).await
    }
1672
1673    async fn probe_alive(&self, timeout: Duration) -> NetworkResult<()> {
1674        if !self.connected.load(Ordering::Acquire) {
1675            return Err(NetworkError::ConnectionError(
1676                "Signaling client is not connected".to_string(),
1677            ));
1678        }
1679
1680        let probe_id = self.probe_counter.fetch_add(1, Ordering::Relaxed) + 1;
1681        let payload =
1682            format!("actr-signaling-probe-{probe_id}-{}", current_unix_secs()).into_bytes();
1683        let (tx, rx) = oneshot::channel();
1684        self.pending_pongs.lock().await.insert(payload.clone(), tx);
1685
1686        let send_timeout = std::cmp::min(
1687            timeout,
1688            std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
1689        );
1690        let ping_payload = payload.clone();
1691        let send_result = tokio::time::timeout(send_timeout, async {
1692            let mut sink_guard = self.ws_sink.lock().await;
1693            match sink_guard.as_mut() {
1694                Some(sink) => sink
1695                    .send(tokio_tungstenite::tungstenite::Message::Ping(
1696                        ping_payload.into(),
1697                    ))
1698                    .await
1699                    .map_err(|e| {
1700                        NetworkError::ConnectionError(format!("Signaling probe ping failed: {e}"))
1701                    }),
1702                None => Err(NetworkError::ConnectionError(
1703                    "Signaling probe failed: WebSocket sink is not available".to_string(),
1704                )),
1705            }
1706        })
1707        .await
1708        .unwrap_or_else(|_| {
1709            Err(NetworkError::TimeoutError(format!(
1710                "Timed out sending signaling probe ping after {}ms",
1711                send_timeout.as_millis()
1712            )))
1713        });
1714
1715        if let Err(e) = send_result {
1716            self.pending_pongs.lock().await.remove(&payload);
1717            let was_connected = self.connected.swap(false, Ordering::AcqRel);
1718            Self::publish_disconnected_transition(
1719                was_connected,
1720                &self.stats,
1721                &self.event_tx,
1722                self.hook_callback.get().cloned(),
1723                DisconnectReason::PingSendFailed,
1724                None,
1725            )
1726            .await;
1727            return Err(e);
1728        }
1729
1730        match tokio::time::timeout(timeout, rx).await {
1731            Ok(Ok(())) => {
1732                self.last_pong.store(current_unix_secs(), Ordering::Release);
1733                Ok(())
1734            }
1735            Ok(Err(_)) => {
1736                self.pending_pongs.lock().await.remove(&payload);
1737                Err(NetworkError::ConnectionError(
1738                    "Signaling probe pong waiter dropped".to_string(),
1739                ))
1740            }
1741            Err(_) => {
1742                self.pending_pongs.lock().await.remove(&payload);
1743                Err(NetworkError::TimeoutError(format!(
1744                    "Timed out waiting for signaling probe pong after {}ms",
1745                    timeout.as_millis()
1746                )))
1747            }
1748        }
1749    }
1750
1751    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1752    async fn send_register_request(
1753        &self,
1754        request: RegisterRequest,
1755    ) -> NetworkResult<RegisterResponse> {
1756        // Create PeerToSignaling stream process (Register front )
1757        let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
1758            payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
1759        });
1760
1761        let envelope = self.create_envelope(flow).await;
1762        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1763
1764        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1765        {
1766            if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
1767                server_to_actr.payload
1768            {
1769                return Ok(response);
1770            }
1771        }
1772
1773        Err(NetworkError::ConnectionError(
1774            "Invalid registration response".to_string(),
1775        ))
1776    }
1777
1778    #[cfg_attr(
1779        feature = "opentelemetry",
1780        tracing::instrument(skip_all, fields(actor_id = %actor_id))
1781    )]
1782    async fn send_unregister_request(
1783        &self,
1784        actor_id: ActrId,
1785        credential: AIdCredential,
1786        reason: Option<String>,
1787    ) -> NetworkResult<UnregisterResponse> {
1788        // Build UnregisterRequest payload
1789        let request = UnregisterRequest {
1790            actr_id: actor_id.clone(),
1791            reason,
1792        };
1793
1794        // Wrap into ActrToSignaling flow
1795        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1796            source: actor_id,
1797            credential,
1798            payload: Some(actr_to_signaling::Payload::UnregisterRequest(request)),
1799        });
1800
1801        // Send envelope (fire-and-forget)
1802        let envelope = self.create_envelope(flow).await;
1803        self.send_envelope(envelope).await?;
1804
1805        // Do not wait for UnregisterResponse here because the signaling stream
1806        // is also consumed by WebRtcCoordinator. Waiting could race with that loop
1807        // and lead to spurious timeouts. Treat Unregister as best-effort.
1808        // not wait for the response , because the signaling stream have multi customers use it, fixme: should wait for the response
1809        Ok(UnregisterResponse {
1810            result: Some(actr_protocol::unregister_response::Result::Success(
1811                actr_protocol::unregister_response::UnregisterOk {},
1812            )),
1813        })
1814    }
1815
1816    #[cfg_attr(
1817        feature = "opentelemetry",
1818        tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1819    )]
1820    async fn send_heartbeat(
1821        &self,
1822        actor_id: ActrId,
1823        credential: AIdCredential,
1824        availability: ServiceAvailabilityState,
1825        power_reserve: f32,
1826        mailbox_backlog: f32,
1827    ) -> NetworkResult<Pong> {
1828        let ping = Ping {
1829            availability: availability as i32,
1830            power_reserve,
1831            mailbox_backlog,
1832            sticky_client_ids: vec![], // TODO: Implement sticky session tracking
1833        };
1834
1835        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1836            source: actor_id,
1837            credential,
1838            payload: Some(actr_to_signaling::Payload::Ping(ping)),
1839        });
1840
1841        let envelope = self.create_envelope(flow).await;
1842        let reply_for = envelope.envelope_id.clone();
1843
1844        // Register waiter before sending
1845        let (tx, rx) = oneshot::channel();
1846        self.pending_replies
1847            .lock()
1848            .await
1849            .insert(reply_for.clone(), tx);
1850
1851        if let Err(e) = self.send_envelope(envelope).await {
1852            // Cleanup waiter on immediate send failure to avoid leaks.
1853            self.pending_replies.lock().await.remove(&reply_for);
1854            return Err(e);
1855        }
1856
1857        // Wait for response
1858        let response_envelope = rx.await.map_err(|_| {
1859            NetworkError::ConnectionError(
1860                "Receiver dropped while waiting for heartbeat response".to_string(),
1861            )
1862        })?;
1863
1864        // Extract Pong from response, or handle Error response
1865        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1866        {
1867            match server_to_actr.payload {
1868                Some(signaling_to_actr::Payload::Pong(pong)) => {
1869                    return Ok(pong);
1870                }
1871                Some(signaling_to_actr::Payload::Error(err)) => {
1872                    // Check if it's a credential expired error (401)
1873                    if err.code == 401 {
1874                        return Err(NetworkError::CredentialExpired(err.message));
1875                    }
1876                    return Err(NetworkError::AuthenticationError(format!(
1877                        "{} ({})",
1878                        err.message, err.code
1879                    )));
1880                }
1881                _ => {}
1882            }
1883        }
1884
1885        Err(NetworkError::ConnectionError(
1886            "Received response but not a Pong message".to_string(),
1887        ))
1888    }
1889
1890    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
1891    async fn send_route_candidates_request(
1892        &self,
1893        actor_id: ActrId,
1894        credential: AIdCredential,
1895        request: RouteCandidatesRequest,
1896    ) -> NetworkResult<RouteCandidatesResponse> {
1897        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1898            source: actor_id,
1899            credential,
1900            payload: Some(actr_to_signaling::Payload::RouteCandidatesRequest(request)),
1901        });
1902
1903        let envelope = self.create_envelope(flow).await;
1904        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1905
1906        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1907        {
1908            match server_to_actr.payload {
1909                Some(signaling_to_actr::Payload::RouteCandidatesResponse(response)) => {
1910                    return Ok(response);
1911                }
1912                Some(signaling_to_actr::Payload::Error(err)) => {
1913                    return Err(NetworkError::ServiceDiscoveryError(format!(
1914                        "{} ({})",
1915                        err.message, err.code
1916                    )));
1917                }
1918                _ => {}
1919            }
1920        }
1921
1922        Err(NetworkError::ConnectionError(
1923            "Invalid route candidates response".to_string(),
1924        ))
1925    }
1926
1927    async fn get_signing_key(
1928        &self,
1929        actor_id: ActrId,
1930        credential: AIdCredential,
1931        key_id: u32,
1932    ) -> NetworkResult<(u32, Vec<u8>)> {
1933        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1934            source: actor_id,
1935            credential,
1936            payload: Some(actr_to_signaling::Payload::GetSigningKeyRequest(
1937                GetSigningKeyRequest { key_id },
1938            )),
1939        });
1940
1941        let envelope = self.create_envelope(flow).await;
1942        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1943
1944        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1945        {
1946            match server_to_actr.payload {
1947                Some(signaling_to_actr::Payload::GetSigningKeyResponse(resp)) => {
1948                    return Ok((resp.key_id, resp.pubkey.to_vec()));
1949                }
1950                Some(signaling_to_actr::Payload::Error(err)) => {
1951                    return Err(NetworkError::ConnectionError(format!(
1952                        "get_signing_key failed: {} ({})",
1953                        err.message, err.code
1954                    )));
1955                }
1956                _ => {}
1957            }
1958        }
1959
1960        Err(NetworkError::ConnectionError(
1961            "get_signing_key: invalid response".to_string(),
1962        ))
1963    }
1964
1965    #[cfg_attr(
1966        feature = "opentelemetry",
1967        tracing::instrument(level = "debug", skip_all, fields(actor_id = %actor_id))
1968    )]
1969    async fn send_credential_update_request(
1970        &self,
1971        actor_id: ActrId,
1972        credential: AIdCredential,
1973    ) -> NetworkResult<RegisterResponse> {
1974        let request = CredentialUpdateRequest {
1975            actr_id: actor_id.clone(),
1976        };
1977
1978        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
1979            source: actor_id,
1980            credential,
1981            payload: Some(actr_to_signaling::Payload::CredentialUpdateRequest(request)),
1982        });
1983
1984        let envelope = self.create_envelope(flow).await;
1985        let response_envelope = self.send_envelope_and_wait_response(envelope).await?;
1986
1987        if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) = response_envelope.flow
1988        {
1989            match server_to_actr.payload {
1990                Some(signaling_to_actr::Payload::RegisterResponse(response)) => {
1991                    return Ok(response);
1992                }
1993                Some(signaling_to_actr::Payload::Error(err)) => {
1994                    return Err(NetworkError::ConnectionError(format!(
1995                        "Credential update failed: {} ({})",
1996                        err.message, err.code
1997                    )));
1998                }
1999                _ => {}
2000            }
2001        }
2002
2003        Err(NetworkError::ConnectionError(
2004            "Invalid credential update response".to_string(),
2005        ))
2006    }
2007
2008    #[cfg_attr(
2009        feature = "opentelemetry",
2010        tracing::instrument(level = "debug", skip_all, fields(envelope_id = %envelope.envelope_id))
2011    )]
2012    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
2013        #[cfg(feature = "opentelemetry")]
2014        let envelope = {
2015            let mut envelope = envelope;
2016            trace::inject_span_context(&tracing::Span::current(), &mut envelope);
2017            envelope
2018        };
2019
2020        // Check connection state first to avoid sending on stale/closed connections
2021        // This prevents "Broken pipe" errors when ws_sink exists but connection is dead
2022        if !self.is_connected() {
2023            return Err(NetworkError::ConnectionError(
2024                "Cannot send: WebSocket not connected".to_string(),
2025            ));
2026        }
2027
2028        let mut sink_guard = self.ws_sink.lock().await;
2029
2030        if let Some(sink) = sink_guard.as_mut() {
2031            // using protobuf binary serialization
2032            let mut buf = Vec::new();
2033            envelope.encode(&mut buf)?;
2034            let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
2035            match tokio::time::timeout(
2036                std::time::Duration::from_secs(SIGNALING_SEND_TIMEOUT_SECS),
2037                sink.send(msg),
2038            )
2039            .await
2040            {
2041                Ok(Ok(())) => {}
2042                Ok(Err(e)) => return Err(e.into()),
2043                Err(_) => {
2044                    self.connected.store(false, Ordering::Release);
2045                    return Err(NetworkError::ConnectionError(
2046                        "Signaling WebSocket send timed out".to_string(),
2047                    ));
2048                }
2049            }
2050
2051            self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
2052            tracing::debug!("Stats: {:?}", self.stats.snapshot());
2053            Ok(())
2054        } else {
2055            Err(NetworkError::ConnectionError("Not connected".to_string()))
2056        }
2057    }
2058
2059    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
2060        let mut rx = self.inbound_rx.lock().await;
2061        match rx.recv().await {
2062            Some(envelope) => Ok(Some(envelope)),
2063            None => {
2064                tracing::error!("Inbound channel closed");
2065                Err(NetworkError::ConnectionError(
2066                    "Inbound channel closed".to_string(),
2067                ))
2068            }
2069        }
2070    }
2071
    /// Report the current value of the `connected` flag (loaded with `Acquire` ordering).
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::Acquire)
    }
2075
    /// Return a point-in-time copy of the client's counters.
    fn get_stats(&self) -> SignalingStats {
        self.stats.snapshot()
    }
2079
    /// Create a new receiver on the client's `SignalingEvent` broadcast channel.
    fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
        self.event_tx.subscribe()
    }
2083
2084    async fn set_actor_id(&self, actor_id: ActrId) {
2085        *self.actor_id.lock().await = Some(actor_id);
2086    }
2087
2088    async fn set_credential_state(&self, credential_state: CredentialState) {
2089        *self.credential_state.lock().await = Some(credential_state);
2090    }
2091
2092    async fn clear_identity(&self) {
2093        *self.actor_id.lock().await = None;
2094        *self.credential_state.lock().await = None;
2095    }
2096
2097    fn set_hook_callback(&self, cb: HookCallback) {
2098        let _ = self.hook_callback.set(cb);
2099    }
2100}
2101
/// Live signaling counters.
///
/// Every field is an atomic so it can be updated through a shared reference.
/// `Default` yields all counters at zero.
#[derive(Debug, Default)]
pub(crate) struct AtomicSignalingStats {
    /// Number of connections.
    pub connections: AtomicU64,

    /// Number of disconnections.
    pub disconnections: AtomicU64,

    /// Number of messages sent.
    pub messages_sent: AtomicU64,

    /// Number of messages received.
    pub messages_received: AtomicU64,

    /// Number of heartbeats sent.
    /// TODO: Wire heartbeat counters when heartbeat send/receive paths are instrumented; currently never incremented.
    pub heartbeats_sent: AtomicU64,

    /// Number of heartbeats received.
    /// TODO: Wire heartbeat counters when heartbeat send/receive paths are instrumented; currently never incremented.
    pub heartbeats_received: AtomicU64,

    /// Number of errors.
    pub errors: AtomicU64,
}
2142
/// Plain-value snapshot of the signaling counters, suitable for serialization and reading.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct SignalingStats {
    /// Number of connections.
    pub connections: u64,

    /// Number of disconnections.
    pub disconnections: u64,

    /// Number of messages sent.
    pub messages_sent: u64,

    /// Number of messages received.
    pub messages_received: u64,

    /// Number of heartbeats sent.
    pub heartbeats_sent: u64,

    /// Number of heartbeats received.
    pub heartbeats_received: u64,

    /// Number of errors.
    pub errors: u64,
}
2167
2168impl AtomicSignalingStats {
2169    /// Create a snapshot of current statistics
2170    pub fn snapshot(&self) -> SignalingStats {
2171        SignalingStats {
2172            connections: self.connections.load(Ordering::Relaxed),
2173            disconnections: self.disconnections.load(Ordering::Relaxed),
2174            messages_sent: self.messages_sent.load(Ordering::Relaxed),
2175            messages_received: self.messages_received.load(Ordering::Relaxed),
2176            heartbeats_sent: self.heartbeats_sent.load(Ordering::Relaxed),
2177            heartbeats_received: self.heartbeats_received.load(Ordering::Relaxed),
2178            errors: self.errors.load(Ordering::Relaxed),
2179        }
2180    }
2181}
2182
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time earlier than the epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2190
2191#[cfg(test)]
2192mod tests {
2193    use super::*;
2194    use std::future::Future;
2195    use std::pin::Pin;
2196    use std::sync::atomic::{AtomicUsize, Ordering as UsizeOrdering};
2197
    /// Simple fake SignalingClient implementation for testing the reconnect helper.
    struct FakeSignalingClient {
        /// Broadcast sender that backs `subscribe_events`.
        event_tx: broadcast::Sender<SignalingEvent>,
        /// Value reported by `is_connected`.
        connected: AtomicBool,
        /// Incremented by one on every `connect()` call.
        connect_calls: Arc<AtomicUsize>,
        /// Identity written by `set_actor_id` and cleared by `clear_identity`.
        actor_id: tokio::sync::Mutex<Option<ActrId>>,
        /// Credential state written by `set_credential_state` and cleared by `clear_identity`.
        credential_state: tokio::sync::Mutex<Option<CredentialState>>,
    }
2206
    // Test double: `connect` only counts calls, `disconnect` always succeeds, the request
    // and envelope methods panic via `unimplemented!`, and the identity setters store
    // into the fake's own mutexes.
    #[async_trait]
    impl SignalingClient for FakeSignalingClient {
        /// Count the call and report success without touching the `connected` flag.
        async fn connect(&self) -> NetworkResult<()> {
            self.connect_calls.fetch_add(1, UsizeOrdering::SeqCst);
            Ok(())
        }

        /// Always succeeds; the fake has nothing to tear down.
        async fn disconnect(&self) -> NetworkResult<()> {
            Ok(())
        }

        async fn send_register_request(
            &self,
            _request: RegisterRequest,
        ) -> NetworkResult<RegisterResponse> {
            unimplemented!("not needed in tests");
        }

        async fn send_unregister_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _reason: Option<String>,
        ) -> NetworkResult<UnregisterResponse> {
            unimplemented!("not needed in tests");
        }

        async fn send_heartbeat(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _availability: ServiceAvailabilityState,
            _power_reserve: f32,
            _mailbox_backlog: f32,
        ) -> NetworkResult<Pong> {
            unimplemented!("not needed in tests");
        }

        async fn send_route_candidates_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _request: RouteCandidatesRequest,
        ) -> NetworkResult<RouteCandidatesResponse> {
            unimplemented!("not needed in tests");
        }

        async fn get_signing_key(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
            _key_id: u32,
        ) -> NetworkResult<(u32, Vec<u8>)> {
            unimplemented!("not needed in tests");
        }

        async fn send_credential_update_request(
            &self,
            _actor_id: ActrId,
            _credential: AIdCredential,
        ) -> NetworkResult<RegisterResponse> {
            unimplemented!("not needed in tests");
        }

        async fn send_envelope(&self, _envelope: SignalingEnvelope) -> NetworkResult<()> {
            unimplemented!("not needed in tests");
        }

        async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
            unimplemented!("not needed in tests");
        }

        /// Report the fake's `connected` flag.
        fn is_connected(&self) -> bool {
            self.connected.load(Ordering::SeqCst)
        }

        /// Always returns all-zero statistics.
        fn get_stats(&self) -> SignalingStats {
            SignalingStats::default()
        }

        /// Hand out a new receiver on the fake's event channel.
        fn subscribe_events(&self) -> broadcast::Receiver<SignalingEvent> {
            self.event_tx.subscribe()
        }

        async fn set_actor_id(&self, actor_id: ActrId) {
            *self.actor_id.lock().await = Some(actor_id);
        }

        async fn set_credential_state(&self, credential_state: CredentialState) {
            *self.credential_state.lock().await = Some(credential_state);
        }

        async fn clear_identity(&self) {
            *self.actor_id.lock().await = None;
            *self.credential_state.lock().await = None;
        }
    }
2304
2305    fn make_fake_client() -> Arc<FakeSignalingClient> {
2306        let (event_tx, _erx) = broadcast::channel(64);
2307        Arc::new(FakeSignalingClient {
2308            event_tx,
2309            connected: AtomicBool::new(false),
2310            connect_calls: Arc::new(AtomicUsize::new(0)),
2311            actor_id: tokio::sync::Mutex::new(None),
2312            credential_state: tokio::sync::Mutex::new(None),
2313        })
2314    }
2315
2316    /// Helper: create a minimal SignalingConfig with an unreachable URL.
2317    fn make_config() -> SignalingConfig {
2318        SignalingConfig {
2319            server_url: Url::parse("ws://127.0.0.1:1/signaling/ws").unwrap(),
2320            connection_timeout: 2,
2321            heartbeat_interval: 30,
2322            reconnect_config: ReconnectConfig::default(),
2323            auth_config: None,
2324            webrtc_role: None,
2325        }
2326    }
2327
    /// Helper: create a `WebSocketSignalingClient` from `config`, wrapped in an `Arc`.
    fn make_ws_client(config: SignalingConfig) -> Arc<WebSocketSignalingClient> {
        Arc::new(WebSocketSignalingClient::new(config))
    }
2332
2333    #[tokio::test]
2334    async fn probe_alive_times_out_when_sink_lock_is_stalled() {
2335        let client = make_ws_client(make_config());
2336        client.connected.store(true, Ordering::Release);
2337
2338        let _sink_guard = client.ws_sink.lock().await;
2339
2340        let result = tokio::time::timeout(
2341            Duration::from_millis(250),
2342            client.probe_alive(Duration::from_millis(20)),
2343        )
2344        .await
2345        .expect("probe should be bounded by its own timeout");
2346
2347        let err = result.expect_err("stalled sink lock should fail the probe");
2348        assert!(
2349            err.to_string()
2350                .contains("Timed out sending signaling probe ping"),
2351            "unexpected error: {err}"
2352        );
2353        assert!(
2354            !client.is_connected(),
2355            "stalled probe send should mark signaling disconnected"
2356        );
2357        assert_eq!(client.get_stats().disconnections, 1);
2358        assert!(
2359            client.pending_pongs.lock().await.is_empty(),
2360            "failed probe send should remove its pending pong waiter"
2361        );
2362    }
2363
2364    #[tokio::test]
2365    async fn explicit_connect_once_retries_after_concurrent_attempt_fails() {
2366        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2367            .await
2368            .expect("test listener should bind");
2369        let server_url = format!(
2370            "ws://{}/signaling/ws",
2371            listener
2372                .local_addr()
2373                .expect("test listener should have local addr")
2374        );
2375        let server_task = tokio::spawn(async move {
2376            let (stream, _) = listener
2377                .accept()
2378                .await
2379                .expect("test server should accept tcp connection");
2380            let ws_stream = tokio_tungstenite::accept_async(stream)
2381                .await
2382                .expect("test server should complete websocket handshake");
2383            tokio::time::sleep(Duration::from_millis(100)).await;
2384            drop(ws_stream);
2385        });
2386
2387        let mut config = make_config();
2388        config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
2389        config.connection_timeout = 2;
2390        config.reconnect_config = ReconnectConfig {
2391            enabled: false,
2392            ..ReconnectConfig::default()
2393        };
2394        let client = make_ws_client(config);
2395
2396        client.connecting.store(true, Ordering::Release);
2397        let connect_task = {
2398            let client = client.clone();
2399            tokio::spawn(async move { client.connect_once().await })
2400        };
2401
2402        tokio::time::sleep(Duration::from_millis(50)).await;
2403        client.connecting.store(false, Ordering::Release);
2404        let _ = client.event_tx.send(SignalingEvent::Disconnected {
2405            reason: DisconnectReason::ConnectionFailed("simulated auto attempt failed".into()),
2406        });
2407
2408        tokio::time::timeout(Duration::from_secs(2), connect_task)
2409            .await
2410            .expect("explicit connect_once should not wait for auto backoff")
2411            .expect("connect_once task should not panic")
2412            .expect("explicit connect_once should retry after concurrent failure");
2413
2414        assert!(
2415            client.is_connected(),
2416            "explicit recovery connect should establish signaling"
2417        );
2418
2419        client.disconnect().await.ok();
2420        let _ = tokio::time::timeout(Duration::from_secs(1), server_task).await;
2421    }
2422
2423    #[tokio::test]
2424    async fn network_restore_connect_once_preempts_connect_backoff() {
2425        let reserved_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2426            .await
2427            .expect("test listener should reserve a local port");
2428        let addr = reserved_listener
2429            .local_addr()
2430            .expect("reserved listener should have local addr");
2431        drop(reserved_listener);
2432
2433        let mut config = make_config();
2434        config.server_url =
2435            Url::parse(&format!("ws://{addr}/signaling/ws")).expect("test URL should parse");
2436        config.connection_timeout = 1;
2437        config.reconnect_config = ReconnectConfig {
2438            enabled: true,
2439            max_attempts: 10,
2440            initial_delay: 30,
2441            max_delay: 30,
2442            backoff_multiplier: 1.0,
2443        };
2444        let client = make_ws_client(config);
2445        let mut rx = client.subscribe_events();
2446
2447        let long_connect_task = {
2448            let client = client.clone();
2449            tokio::spawn(async move { client.connect().await })
2450        };
2451
2452        tokio::time::timeout(Duration::from_secs(2), async {
2453            loop {
2454                match rx.recv().await {
2455                    Ok(SignalingEvent::Disconnected {
2456                        reason: DisconnectReason::ConnectionFailed(_),
2457                    }) => break,
2458                    Ok(_) => continue,
2459                    Err(e) => panic!("unexpected signaling event receive error: {e}"),
2460                }
2461            }
2462        })
2463        .await
2464        .expect("long connect should fail first attempt and enter backoff");
2465        assert!(
2466            !client.connecting.load(Ordering::Acquire),
2467            "connect() must release connecting while sleeping in backoff"
2468        );
2469
2470        let listener = tokio::net::TcpListener::bind(addr)
2471            .await
2472            .expect("network restore should make the signaling endpoint reachable");
2473        let server_task = tokio::spawn(async move {
2474            let (stream, _) = listener
2475                .accept()
2476                .await
2477                .expect("restored test server should accept tcp connection");
2478            let ws_stream = tokio_tungstenite::accept_async(stream)
2479                .await
2480                .expect("restored test server should complete websocket handshake");
2481            tokio::time::sleep(Duration::from_millis(250)).await;
2482            drop(ws_stream);
2483        });
2484
2485        let restore_result = tokio::time::timeout(
2486            Duration::from_secs(CONCURRENT_CONNECT_WAIT_TIMEOUT_SECS + 2),
2487            {
2488                let client = client.clone();
2489                async move { client.connect_once().await }
2490            },
2491        )
2492        .await
2493        .expect("restore connect_once should complete within the concurrent wait window");
2494
2495        long_connect_task.abort();
2496        server_task.abort();
2497        client.disconnect().await.ok();
2498
2499        assert!(
2500            restore_result.is_ok(),
2501            "network restore should not be blocked by an older connect() backoff; got {restore_result:?}"
2502        );
2503    }
2504
2505    #[tokio::test]
2506    async fn explicit_connect_backoff_reset_restarts_attempt_sequence() {
2507        let reserved_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
2508            .await
2509            .expect("test listener should reserve a local port");
2510        let addr = reserved_listener
2511            .local_addr()
2512            .expect("reserved listener should have local addr");
2513        drop(reserved_listener);
2514
2515        let mut config = make_config();
2516        config.server_url =
2517            Url::parse(&format!("ws://{addr}/signaling/ws")).expect("test URL should parse");
2518        config.connection_timeout = 1;
2519        config.reconnect_config = ReconnectConfig {
2520            enabled: true,
2521            max_attempts: 10,
2522            initial_delay: 30,
2523            max_delay: 30,
2524            backoff_multiplier: 1.0,
2525        };
2526        let client = make_ws_client(config);
2527
2528        let (attempt_tx, mut attempt_rx) = tokio::sync::mpsc::unbounded_channel();
2529        let hook_callback: HookCallback = Arc::new(move |event| {
2530            let attempt_tx = attempt_tx.clone();
2531            Box::pin(async move {
2532                if let HookEvent::SignalingConnectStart { attempt } = event {
2533                    let _ = attempt_tx.send(attempt);
2534                }
2535            })
2536        });
2537        client.set_hook_callback(hook_callback);
2538
2539        let connect_task = {
2540            let client = client.clone();
2541            tokio::spawn(async move { client.connect().await })
2542        };
2543
2544        assert_eq!(
2545            tokio::time::timeout(Duration::from_secs(1), attempt_rx.recv())
2546                .await
2547                .expect("connect should publish attempt 1"),
2548            Some(1)
2549        );
2550        assert_eq!(
2551            tokio::time::timeout(Duration::from_secs(2), attempt_rx.recv())
2552                .await
2553                .expect("connect should enter first backoff as attempt 2"),
2554            Some(2)
2555        );
2556
2557        client.schedule_auto_reconnect_reset_backoff();
2558
2559        assert_eq!(
2560            tokio::time::timeout(Duration::from_secs(2), attempt_rx.recv())
2561                .await
2562                .expect("reset should restart explicit connect attempts"),
2563            Some(1),
2564            "network recovery reset should restart explicit connect() backoff from attempt 1"
2565        );
2566
2567        connect_task.abort();
2568        client.disconnect().await.ok();
2569    }
2570
2571    #[tokio::test]
2572    async fn test_publish_disconnected_transition_fires_hook_once() {
2573        let stats = Arc::new(AtomicSignalingStats::default());
2574        let (event_tx, mut event_rx) = broadcast::channel(4);
2575        let hook_count = Arc::new(AtomicUsize::new(0));
2576        let hook_count_for_cb = hook_count.clone();
2577        let hook_callback: HookCallback = Arc::new(move |event| {
2578            let hook_count = hook_count_for_cb.clone();
2579            Box::pin(async move {
2580                if matches!(event, HookEvent::SignalingDisconnected) {
2581                    hook_count.fetch_add(1, UsizeOrdering::SeqCst);
2582                }
2583            }) as Pin<Box<dyn Future<Output = ()> + Send>>
2584        });
2585
2586        let first = WebSocketSignalingClient::publish_disconnected_transition(
2587            true,
2588            &stats,
2589            &event_tx,
2590            Some(hook_callback.clone()),
2591            DisconnectReason::StreamEnded,
2592            None,
2593        )
2594        .await;
2595        assert!(
2596            first,
2597            "first connected->disconnected transition should publish"
2598        );
2599        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2600        assert_eq!(stats.snapshot().disconnections, 1);
2601        assert!(matches!(
2602            event_rx.recv().await,
2603            Ok(SignalingEvent::Disconnected {
2604                reason: DisconnectReason::StreamEnded
2605            })
2606        ));
2607
2608        let second = WebSocketSignalingClient::publish_disconnected_transition(
2609            false,
2610            &stats,
2611            &event_tx,
2612            Some(hook_callback),
2613            DisconnectReason::PongTimeout,
2614            None,
2615        )
2616        .await;
2617        assert!(
2618            !second,
2619            "stale duplicate disconnected transition should be ignored"
2620        );
2621        assert_eq!(hook_count.load(UsizeOrdering::SeqCst), 1);
2622        assert_eq!(stats.snapshot().disconnections, 1);
2623        assert!(event_rx.try_recv().is_err());
2624    }
2625
2626    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2627    // 1. Configuration defaults
2628    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2629
2630    #[test]
2631    fn test_reconnect_config_defaults() {
2632        let cfg = ReconnectConfig::default();
2633        assert!(cfg.enabled);
2634        assert_eq!(cfg.max_attempts, 10);
2635        assert_eq!(cfg.initial_delay, 1);
2636        assert_eq!(cfg.max_delay, 60);
2637        assert!((cfg.backoff_multiplier - 2.0).abs() < f64::EPSILON);
2638    }
2639
2640    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2641    // 2. Initial state
2642    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2643
2644    #[test]
2645    fn test_websocket_signaling_client_initial_state_disconnected() {
2646        let client = WebSocketSignalingClient::new(make_config());
2647        assert!(
2648            !client.is_connected(),
2649            "newly created client should be Disconnected"
2650        );
2651        assert!(
2652            !client.connecting.load(Ordering::Acquire),
2653            "newly created client should not be in connecting state"
2654        );
2655        assert!(
2656            !client.reconnector_started.load(Ordering::Acquire),
2657            "reconnect manager should not be started automatically"
2658        );
2659    }
2660
2661    #[test]
2662    fn test_initial_stats_are_zero() {
2663        let client = WebSocketSignalingClient::new(make_config());
2664        let stats = client.get_stats();
2665        assert_eq!(stats.connections, 0);
2666        assert_eq!(stats.disconnections, 0);
2667        assert_eq!(stats.messages_sent, 0);
2668        assert_eq!(stats.messages_received, 0);
2669        assert_eq!(stats.errors, 0);
2670    }
2671
2672    #[test]
2673    fn test_signaling_url_log_redacts_credential_query_params() {
2674        let url = Url::parse(
2675            "wss://example.com/signaling?actor_id=abc&key_id=7&claims=claims-value&signature=signature-value&token=token-value",
2676        )
2677        .unwrap();
2678
2679        let redacted = WebSocketSignalingClient::redact_signaling_url_for_log(&url);
2680
2681        assert!(redacted.contains("actor_id=abc"));
2682        assert!(redacted.contains("key_id=7"));
2683        assert!(redacted.contains("claims=REDACTED"));
2684        assert!(redacted.contains("signature=REDACTED"));
2685        assert!(redacted.contains("token=REDACTED"));
2686        assert!(!redacted.contains("claims-value"));
2687        assert!(!redacted.contains("signature-value"));
2688        assert!(!redacted.contains("token-value"));
2689    }
2690
2691    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2692    // 3. Reconnect manager idempotency
2693    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2694
2695    #[tokio::test]
2696    async fn test_reconnect_manager_idempotent() {
2697        let client = make_ws_client(make_config());
2698
2699        // First start should succeed
2700        client.start_reconnect_manager();
2701        assert!(
2702            client.reconnector_started.load(Ordering::Acquire),
2703            "reconnector_started should be true after first call"
2704        );
2705
2706        // Second call should not start a new manager (CAS fails)
2707        client.start_reconnect_manager();
2708        // Multiple managers would cause flaky tests due to duplicate reconnections; mainly verify the flag
2709        assert!(client.reconnector_started.load(Ordering::Acquire));
2710    }
2711
2712    #[tokio::test]
2713    async fn test_reconnect_manager_disabled_when_config_disabled() {
2714        let mut config = make_config();
2715        config.reconnect_config.enabled = false;
2716        let client = make_ws_client(config);
2717
2718        client.start_reconnect_manager();
2719        assert!(
2720            !client.reconnector_started.load(Ordering::Acquire),
2721            "reconnect manager should not start when reconnect config is disabled"
2722        );
2723    }
2724
2725    #[tokio::test]
2726    async fn test_reconnect_manager_does_not_keep_client_alive() {
2727        let client = make_ws_client(make_config());
2728        let weak = Arc::downgrade(&client);
2729
2730        client.start_reconnect_manager();
2731        drop(client);
2732
2733        assert!(
2734            weak.upgrade().is_none(),
2735            "reconnect manager must not keep signaling client alive after owner drop"
2736        );
2737    }
2738
2739    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2740    // 4. connect() concurrency exclusion
2741    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2742
2743    #[tokio::test]
2744    async fn test_connect_fast_path_when_already_connected() {
2745        let client = make_ws_client(make_config());
2746        // Manually set as connected
2747        client.connected.store(true, Ordering::Release);
2748
2749        // connect() should return Ok immediately without establishing a new connection
2750        let result = client.connect().await;
2751        assert!(
2752            result.is_ok(),
2753            "connect() should return Ok when already connected"
2754        );
2755        // Should not change connecting flag
2756        assert!(!client.connecting.load(Ordering::Acquire));
2757    }
2758
2759    #[tokio::test]
2760    async fn test_connect_sets_connecting_flag() {
2761        let mut config = make_config();
2762        config.reconnect_config.enabled = false; // disable retry, fail fast
2763        config.connection_timeout = 1;
2764        let client = make_ws_client(config);
2765
2766        // Connection will fail (unreachable address), but should properly clean up connecting flag
2767        let result = client.connect().await;
2768        assert!(
2769            result.is_err(),
2770            "connecting to unreachable address should fail"
2771        );
2772        assert!(
2773            !client.connecting.load(Ordering::Acquire),
2774            "connecting flag should be cleared after connection failure"
2775        );
2776    }
2777
2778    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2779    // 5. Event broadcast
2780    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2781
2782    #[tokio::test]
2783    async fn test_event_subscribe_receives_events() {
2784        let client = make_ws_client(make_config());
2785        let mut rx = client.subscribe_events();
2786
2787        // Manually send event
2788        let _ = client.event_tx.send(SignalingEvent::Connected);
2789
2790        match tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()).await {
2791            Ok(Ok(SignalingEvent::Connected)) => {} // expect Connected event
2792            other => panic!("expected Connected event, but got {:?}", other),
2793        }
2794    }
2795
2796    #[tokio::test]
2797    async fn test_disconnect_event_on_connect_failure() {
2798        let mut config = make_config();
2799        config.reconnect_config.enabled = false;
2800        config.connection_timeout = 1;
2801        let client = make_ws_client(config);
2802        let mut rx = client.subscribe_events();
2803
2804        // Connection fails
2805        let _ = client.connect().await;
2806
2807        // Should receive Disconnected(ConnectionFailed) event
2808        match tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()).await {
2809            Ok(Ok(SignalingEvent::Disconnected {
2810                reason: DisconnectReason::ConnectionFailed(_),
2811            })) => {} // expected
2812            other => panic!(
2813                "expected Disconnected(ConnectionFailed) event, but got {:?}",
2814                other
2815            ),
2816        }
2817    }
2818
2819    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2820    // 6. disconnect() state cleanup
2821    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2822
2823    #[tokio::test]
2824    async fn test_disconnect_clears_connected_flag() {
2825        let client = make_ws_client(make_config());
2826        // Simulate connected state
2827        client.connected.store(true, Ordering::Release);
2828        assert!(client.is_connected());
2829
2830        let result = client.disconnect().await;
2831        assert!(result.is_ok());
2832        assert!(
2833            !client.is_connected(),
2834            "should be Disconnected after disconnect()"
2835        );
2836    }
2837
2838    #[tokio::test]
2839    async fn test_disconnect_increments_disconnection_stat() {
2840        let client = make_ws_client(make_config());
2841        client.connected.store(true, Ordering::Release);
2842
2843        let stats_before = client.get_stats().disconnections;
2844        let _ = client.disconnect().await;
2845        let stats_after = client.get_stats().disconnections;
2846        assert_eq!(
2847            stats_after,
2848            stats_before + 1,
2849            "disconnect() should increment disconnection count"
2850        );
2851    }
2852
2853    #[tokio::test]
2854    async fn test_disconnect_idempotent() {
2855        let client = make_ws_client(make_config());
2856
2857        // Calling disconnect() while not connected should not panic
2858        let r1 = client.disconnect().await;
2859        let r2 = client.disconnect().await;
2860        assert!(r1.is_ok());
2861        assert!(r2.is_ok());
2862        assert!(!client.is_connected());
2863    }
2864
2865    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2866    // 7. Reconnect notify mechanism
2867    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2868
2869    #[tokio::test]
2870    async fn test_reconnect_notify_wakes_waiter() {
2871        let notify = Arc::new(tokio::sync::Notify::new());
2872        let notify_clone = notify.clone();
2873        let woken = Arc::new(AtomicBool::new(false));
2874        let woken_clone = woken.clone();
2875
2876        let handle = tokio::spawn(async move {
2877            notify_clone.notified().await;
2878            woken_clone.store(true, Ordering::Release);
2879        });
2880
2881        // Ensure waiter has registered
2882        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2883        assert!(
2884            !woken.load(Ordering::Acquire),
2885            "should not be woken before notification"
2886        );
2887
2888        // Trigger notification
2889        notify.notify_one();
2890        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2891        assert!(
2892            woken.load(Ordering::Acquire),
2893            "should be woken after notification"
2894        );
2895
2896        handle.abort();
2897    }
2898
2899    #[tokio::test]
2900    async fn test_schedule_auto_reconnect_reenables_after_explicit_disconnect() {
2901        let client = make_ws_client(make_config());
2902
2903        client
2904            .disconnect()
2905            .await
2906            .expect("explicit disconnect should be idempotent");
2907        assert!(
2908            client.auto_reconnect_suppressed.load(Ordering::Acquire),
2909            "explicit disconnect should suppress stale auto-reconnect cycles"
2910        );
2911
2912        client.schedule_auto_reconnect();
2913
2914        assert!(
2915            !client.auto_reconnect_suppressed.load(Ordering::Acquire),
2916            "scheduling a fresh auto-reconnect should clear explicit disconnect suppression"
2917        );
2918    }
2919
2920    #[tokio::test]
2921    async fn test_schedule_auto_reconnect_reset_backoff_restarts_attempt_sequence() {
2922        let mut config = make_config();
2923        config.connection_timeout = 1;
2924        config.reconnect_config = ReconnectConfig {
2925            enabled: true,
2926            max_attempts: 5,
2927            initial_delay: 30,
2928            max_delay: 30,
2929            backoff_multiplier: 1.0,
2930        };
2931        let client = make_ws_client(config);
2932        let mut rx = client.subscribe_events();
2933
2934        let reconnect_client = client.clone();
2935        let reconnect_task = tokio::spawn(async move {
2936            reconnect_client.run_reconnect_cycle().await;
2937        });
2938
2939        match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2940            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2941            other => panic!("expected first reconnect attempt, got {other:?}"),
2942        }
2943
2944        match tokio::time::timeout(Duration::from_secs(2), rx.recv()).await {
2945            Ok(Ok(SignalingEvent::Disconnected {
2946                reason: DisconnectReason::ConnectionFailed(_),
2947            })) => {}
2948            other => panic!("expected first reconnect failure, got {other:?}"),
2949        }
2950
2951        client.schedule_auto_reconnect_reset_backoff();
2952
2953        match tokio::time::timeout(Duration::from_secs(2), rx.recv()).await {
2954            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2955            other => panic!("expected reset reconnect attempt to restart at 1, got {other:?}"),
2956        }
2957
2958        client
2959            .disconnect()
2960            .await
2961            .expect("explicit disconnect should stop reconnect cycle");
2962        tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2963            .await
2964            .expect("reconnect cycle should stop after explicit disconnect")
2965            .expect("reconnect task should not panic");
2966    }
2967
2968    #[tokio::test]
2969    async fn test_explicit_disconnect_suppresses_reconnect_cycle_in_backoff() {
2970        let mut config = make_config();
2971        config.connection_timeout = 1;
2972        config.reconnect_config = ReconnectConfig {
2973            enabled: true,
2974            max_attempts: 5,
2975            initial_delay: 1,
2976            max_delay: 1,
2977            backoff_multiplier: 1.0,
2978        };
2979        let client = make_ws_client(config);
2980        let mut rx = client.subscribe_events();
2981
2982        let reconnect_client = client.clone();
2983        let reconnect_task = tokio::spawn(async move {
2984            reconnect_client.run_reconnect_cycle().await;
2985        });
2986
2987        match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
2988            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
2989            other => panic!("expected first reconnect attempt, got {other:?}"),
2990        }
2991
2992        client
2993            .disconnect()
2994            .await
2995            .expect("explicit disconnect should be idempotent");
2996
2997        tokio::time::timeout(Duration::from_secs(2), reconnect_task)
2998            .await
2999            .expect("suppressed reconnect cycle should exit promptly")
3000            .expect("reconnect task should not panic");
3001
3002        while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
3003        {
3004            if let SignalingEvent::ConnectStart { attempt } = event {
3005                panic!("suppressed reconnect cycle sent unexpected attempt {attempt}");
3006            }
3007        }
3008
3009        assert!(
3010            client.auto_reconnect_suppressed.load(Ordering::Acquire),
3011            "explicit disconnect should suppress stale auto-reconnect cycles"
3012        );
3013    }
3014
3015    #[tokio::test]
3016    async fn test_explicit_disconnect_suppresses_in_flight_auto_reconnect_connected_event() {
3017        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
3018            .await
3019            .expect("test listener should bind");
3020        let server_url = format!(
3021            "ws://{}/signaling/ws",
3022            listener
3023                .local_addr()
3024                .expect("test listener should have local addr")
3025        );
3026        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
3027
3028        let server_task = tokio::spawn(async move {
3029            let (stream, _) = listener
3030                .accept()
3031                .await
3032                .expect("test server should accept tcp connection");
3033            let _ = release_rx.await;
3034            let ws_stream = tokio_tungstenite::accept_async(stream)
3035                .await
3036                .expect("test server should complete websocket handshake");
3037            tokio::time::sleep(Duration::from_millis(100)).await;
3038            drop(ws_stream);
3039        });
3040
3041        let mut config = make_config();
3042        config.server_url = Url::parse(&server_url).expect("test websocket URL should parse");
3043        config.connection_timeout = 5;
3044        config.reconnect_config = ReconnectConfig {
3045            enabled: true,
3046            max_attempts: 3,
3047            initial_delay: 1,
3048            max_delay: 1,
3049            backoff_multiplier: 1.0,
3050        };
3051        let client = make_ws_client(config);
3052        let mut rx = client.subscribe_events();
3053
3054        let reconnect_client = client.clone();
3055        let reconnect_task = tokio::spawn(async move {
3056            reconnect_client.run_reconnect_cycle().await;
3057        });
3058
3059        match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
3060            Ok(Ok(SignalingEvent::ConnectStart { attempt: 1 })) => {}
3061            other => panic!("expected first reconnect attempt, got {other:?}"),
3062        }
3063
3064        client
3065            .disconnect()
3066            .await
3067            .expect("explicit disconnect should cancel the in-flight auto-reconnect");
3068        release_tx
3069            .send(())
3070            .expect("test server handshake should still be waiting");
3071
3072        tokio::time::timeout(Duration::from_secs(2), reconnect_task)
3073            .await
3074            .expect("cancelled in-flight reconnect should exit promptly")
3075            .expect("reconnect task should not panic");
3076
3077        while let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(150), rx.recv()).await
3078        {
3079            assert!(
3080                !matches!(event, SignalingEvent::Connected),
3081                "cancelled auto-reconnect must not publish Connected"
3082            );
3083        }
3084
3085        assert!(
3086            !client.is_connected(),
3087            "cancelled auto-reconnect must not leave signaling connected"
3088        );
3089
3090        tokio::time::timeout(Duration::from_secs(1), server_task)
3091            .await
3092            .expect("test server task should finish")
3093            .expect("test server task should not panic");
3094    }
3095
3096    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3097    // 8. URL construction tests
3098    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3099
3100    #[tokio::test]
3101    async fn test_build_url_without_identity() {
3102        let config = make_config();
3103        let expected_base = config.server_url.to_string();
3104        let client = WebSocketSignalingClient::new(config);
3105
3106        let url = client.build_url_with_identity().await;
3107        assert_eq!(
3108            url.to_string(),
3109            expected_base,
3110            "URL should not contain identity parameters when actor_id is not set"
3111        );
3112    }
3113
3114    #[tokio::test]
3115    async fn test_build_url_with_webrtc_role() {
3116        let mut config = make_config();
3117        config.webrtc_role = Some("answer".to_string());
3118        let client = WebSocketSignalingClient::new(config);
3119
3120        let url = client.build_url_with_identity().await;
3121        assert!(
3122            url.query().unwrap_or("").contains("webrtc_role=answer"),
3123            "URL should contain webrtc_role parameter, actual URL: {}",
3124            url
3125        );
3126    }
3127
3128    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3129    // 9. Inbound channel reset
3130    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3131
3132    #[tokio::test]
3133    async fn test_reset_inbound_channel_creates_fresh_channel() {
3134        let client = WebSocketSignalingClient::new(make_config());
3135
3136        // Get old tx and send a message
3137        {
3138            let tx = client.inbound_tx.lock().await;
3139            let _ = tx.send(SignalingEnvelope::default());
3140        }
3141
3142        // Reset channel
3143        client.reset_inbound_channel().await;
3144
3145        // Old messages should not be visible in the new channel
3146        let mut rx = client.inbound_rx.lock().await;
3147        let result = rx.try_recv();
3148        assert!(
3149            result.is_err(),
3150            "old messages should not be visible in the new channel after reset"
3151        );
3152    }
3153
3154    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3155    // 10. Envelope ID incrementing
3156    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3157
3158    #[tokio::test]
3159    async fn test_envelope_id_monotonically_increasing() {
3160        let client = WebSocketSignalingClient::new(make_config());
3161
3162        let id1 = client.next_envelope_id().await;
3163        let id2 = client.next_envelope_id().await;
3164        let id3 = client.next_envelope_id().await;
3165
3166        assert_eq!(id1, "env-1");
3167        assert_eq!(id2, "env-2");
3168        assert_eq!(id3, "env-3");
3169    }
3170
3171    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3172    // 11. send_envelope should return error when not connected
3173    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3174
3175    #[tokio::test]
3176    async fn test_send_envelope_fails_when_not_connected() {
3177        let client = WebSocketSignalingClient::new(make_config());
3178        let envelope = SignalingEnvelope::default();
3179
3180        let result = client.send_envelope(envelope).await;
3181        assert!(
3182            result.is_err(),
3183            "send_envelope should return error when not connected"
3184        );
3185        match result {
3186            Err(NetworkError::ConnectionError(msg)) => {
3187                assert!(
3188                    msg.contains("not connected") || msg.contains("Not connected"),
3189                    "error message should contain 'not connected', actual: {}",
3190                    msg
3191                );
3192            }
3193            other => panic!("expected ConnectionError, got {:?}", other),
3194        }
3195    }
3196
3197    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3198    // 12. FakeSignalingClient trait implementation verification
3199    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3200
3201    #[tokio::test]
3202    async fn test_fake_client_tracks_connect_calls() {
3203        let client = make_fake_client();
3204        assert_eq!(client.connect_calls.load(UsizeOrdering::SeqCst), 0);
3205
3206        client.connect().await.unwrap();
3207        client.connect().await.unwrap();
3208        client.connect().await.unwrap();
3209
3210        assert_eq!(
3211            client.connect_calls.load(UsizeOrdering::SeqCst),
3212            3,
3213            "FakeSignalingClient should accurately track connect call count"
3214        );
3215    }
3216}