Skip to main content

hyperstack_sdk/
connection.rs

1use crate::auth::{
2    build_websocket_url, hosted_auth_required_error, parse_jwt_expiry, token_is_expiring,
3    token_refresh_delay, AuthConfig, AuthToken, ResolvedAuthStrategy, TokenEndpointRequest,
4    TokenEndpointResponse, TokenTransport, MIN_REFRESH_DELAY_SECONDS,
5};
6use crate::config::ConnectionConfig;
7use crate::error::{HyperStackError, SocketIssue, SocketIssuePayload};
8use crate::frame::{parse_frame, Frame};
9use crate::subscription::{ClientMessage, Subscription, SubscriptionRegistry, Unsubscription};
10use futures_util::{SinkExt, StreamExt};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::{Duration, SystemTime, UNIX_EPOCH};
14use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
15use tokio::time::{sleep, Sleep};
16use tokio_tungstenite::{
17    connect_async,
18    tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
19};
20
/// Lifecycle state of the background connection task.
///
/// The task publishes the current value through a shared lock; callers read it
/// with `ConnectionManager::state`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// No socket is open: the initial value, and the value set after an explicit disconnect.
    Disconnected,
    /// A connection attempt (token resolution plus handshake) is in progress.
    Connecting,
    /// The WebSocket handshake succeeded and the session is active.
    Connected,
    /// The previous session or attempt ended and another attempt is scheduled.
    ///
    /// `attempt` is the reconnect counter captured before it is incremented for
    /// the upcoming attempt, so the first reconnect reports `0`.
    Reconnecting { attempt: u32 },
    /// The task stopped because of a failure it will not retry.
    Error,
}
29
30pub enum ConnectionCommand {
31    Subscribe(Subscription),
32    Unsubscribe(Unsubscription),
33    Disconnect,
34}
35
36#[derive(Debug, serde::Deserialize)]
37struct RefreshAuthResponseMessage {
38    success: bool,
39    error: Option<String>,
40    expires_at: Option<u64>,
41}
42
/// Optional settings applied when a subscription is created through
/// `ConnectionManager::ensure_subscription_with_opts`.
///
/// Every field is copied unchanged into the field of the same name on the
/// `Subscription` that is sent to the connection task. The default leaves all
/// of them unset.
#[derive(Clone, Debug, Default)]
pub struct SubscriptionOptions {
    /// Copied to `Subscription::take`.
    pub take: Option<u32>,
    /// Copied to `Subscription::skip`.
    pub skip: Option<u32>,
    /// Copied to `Subscription::with_snapshot`.
    pub with_snapshot: Option<bool>,
    /// Copied to `Subscription::after`.
    pub after: Option<String>,
    /// Copied to `Subscription::snapshot_limit`.
    pub snapshot_limit: Option<usize>,
}
51
52struct ConnectionManagerInner {
53    #[allow(dead_code)]
54    url: String,
55    state: Arc<RwLock<ConnectionState>>,
56    subscriptions: Arc<RwLock<SubscriptionRegistry>>,
57    #[allow(dead_code)]
58    config: ConnectionConfig,
59    command_tx: mpsc::Sender<ConnectionCommand>,
60    last_error: Arc<RwLock<Option<Arc<HyperStackError>>>>,
61    last_socket_issue: Arc<RwLock<Option<SocketIssue>>>,
62    socket_issue_tx: broadcast::Sender<SocketIssue>,
63}
64
65#[derive(Clone)]
66pub struct ConnectionManager {
67    inner: Arc<ConnectionManagerInner>,
68}
69
70impl ConnectionManager {
71    pub async fn new(
72        url: String,
73        config: ConnectionConfig,
74        frame_tx: mpsc::Sender<Frame>,
75    ) -> Result<Self, HyperStackError> {
76        let (command_tx, command_rx) = mpsc::channel(100);
77        let (initial_connect_tx, initial_connect_rx) = oneshot::channel();
78        let state = Arc::new(RwLock::new(ConnectionState::Disconnected));
79        let subscriptions = Arc::new(RwLock::new(SubscriptionRegistry::new()));
80        let last_error = Arc::new(RwLock::new(None));
81        let last_socket_issue = Arc::new(RwLock::new(None));
82        let (socket_issue_tx, _) = broadcast::channel(100);
83
84        let inner = ConnectionManagerInner {
85            url: url.clone(),
86            state: state.clone(),
87            subscriptions: subscriptions.clone(),
88            config: config.clone(),
89            command_tx,
90            last_error: last_error.clone(),
91            last_socket_issue: last_socket_issue.clone(),
92            socket_issue_tx: socket_issue_tx.clone(),
93        };
94
95        spawn_connection_loop(
96            url,
97            state,
98            subscriptions,
99            config,
100            frame_tx,
101            command_rx,
102            last_error,
103            last_socket_issue,
104            socket_issue_tx,
105            initial_connect_tx,
106        );
107
108        let manager = Self {
109            inner: Arc::new(inner),
110        };
111
112        match initial_connect_rx.await {
113            Ok(Ok(())) => Ok(manager),
114            Ok(Err(error)) => Err(error),
115            Err(_) => Err(HyperStackError::ConnectionFailed(
116                "Connection task ended before initial connect completed".to_string(),
117            )),
118        }
119    }
120
121    pub async fn state(&self) -> ConnectionState {
122        *self.inner.state.read().await
123    }
124
125    pub async fn last_error(&self) -> Option<Arc<HyperStackError>> {
126        self.inner.last_error.read().await.clone()
127    }
128
129    pub async fn last_socket_issue(&self) -> Option<SocketIssue> {
130        self.inner.last_socket_issue.read().await.clone()
131    }
132
133    pub fn subscribe_socket_issues(&self) -> broadcast::Receiver<SocketIssue> {
134        self.inner.socket_issue_tx.subscribe()
135    }
136
137    pub async fn ensure_subscription(&self, view: &str, key: Option<&str>) {
138        self.ensure_subscription_with_opts(view, key, SubscriptionOptions::default())
139            .await
140    }
141
142    pub async fn ensure_subscription_with_opts(
143        &self,
144        view: &str,
145        key: Option<&str>,
146        opts: SubscriptionOptions,
147    ) {
148        let sub = Subscription {
149            view: view.to_string(),
150            key: key.map(|s| s.to_string()),
151            partition: None,
152            filters: None,
153            take: opts.take,
154            skip: opts.skip,
155            with_snapshot: opts.with_snapshot,
156            after: opts.after,
157            snapshot_limit: opts.snapshot_limit,
158        };
159
160        if !self.inner.subscriptions.read().await.contains(&sub) {
161            let _ = self
162                .inner
163                .command_tx
164                .send(ConnectionCommand::Subscribe(sub))
165                .await;
166        }
167    }
168
169    pub async fn subscribe(&self, sub: Subscription) {
170        let _ = self
171            .inner
172            .command_tx
173            .send(ConnectionCommand::Subscribe(sub))
174            .await;
175    }
176
177    pub async fn unsubscribe(&self, unsub: Unsubscription) {
178        let _ = self
179            .inner
180            .command_tx
181            .send(ConnectionCommand::Unsubscribe(unsub))
182            .await;
183    }
184
185    pub async fn disconnect(&self) {
186        let _ = self
187            .inner
188            .command_tx
189            .send(ConnectionCommand::Disconnect)
190            .await;
191    }
192}
193
194struct RuntimeAuthState {
195    websocket_url: String,
196    config: Option<AuthConfig>,
197    current_token: Option<String>,
198    token_expiry: Option<u64>,
199    http_client: reqwest::Client,
200}
201
202impl RuntimeAuthState {
203    fn new(websocket_url: String, config: Option<AuthConfig>) -> Self {
204        Self {
205            websocket_url,
206            config,
207            current_token: None,
208            token_expiry: None,
209            http_client: reqwest::Client::new(),
210        }
211    }
212
213    fn token_transport(&self) -> TokenTransport {
214        self.config
215            .as_ref()
216            .map(|config| config.token_transport)
217            .unwrap_or_default()
218    }
219
220    fn has_refreshable_auth(&self) -> bool {
221        self.config
222            .as_ref()
223            .is_some_and(|config| config.has_refreshable_auth(&self.websocket_url))
224    }
225
226    fn clear_cached_token(&mut self) {
227        self.current_token = None;
228        self.token_expiry = None;
229    }
230
231    fn refresh_timer(&self) -> Option<Pin<Box<Sleep>>> {
232        let delay = token_refresh_delay(self.token_expiry, current_unix_timestamp())?;
233        Some(Box::pin(sleep(Duration::from_secs(delay))))
234    }
235
236    async fn resolve_token(
237        &mut self,
238        force_refresh: bool,
239    ) -> Result<Option<String>, HyperStackError> {
240        if !force_refresh {
241            if let Some(token) = self.current_token.clone() {
242                if !token_is_expiring(self.token_expiry, current_unix_timestamp()) {
243                    return Ok(Some(token));
244                }
245            }
246        }
247
248        let Some(config) = self.config.as_ref() else {
249            if crate::auth::is_hosted_hyperstack_websocket_url(&self.websocket_url) {
250                return Err(hosted_auth_required_error());
251            }
252            return Ok(None);
253        };
254
255        let strategy = config.resolve_strategy(&self.websocket_url);
256        match strategy {
257            ResolvedAuthStrategy::None => {
258                if crate::auth::is_hosted_hyperstack_websocket_url(&self.websocket_url) {
259                    Err(hosted_auth_required_error())
260                } else {
261                    Ok(None)
262                }
263            }
264            ResolvedAuthStrategy::StaticToken(token) => {
265                self.set_token(AuthToken::new(token)).map(Some)
266            }
267            ResolvedAuthStrategy::TokenProvider(provider) => {
268                let token = provider().await?;
269                self.set_token(token).map(Some)
270            }
271            ResolvedAuthStrategy::TokenEndpoint(endpoint) => {
272                let token = self.fetch_token_from_endpoint(&endpoint).await?;
273                self.set_token(token).map(Some)
274            }
275        }
276    }
277
278    fn set_token(&mut self, token: AuthToken) -> Result<String, HyperStackError> {
279        let token_value = token.token.trim().to_string();
280        if token_value.is_empty() {
281            return Err(HyperStackError::WebSocket {
282                message: "Authentication provider returned an empty token".to_string(),
283                code: None,
284            });
285        }
286
287        let expires_at = token.expires_at.or_else(|| parse_jwt_expiry(&token_value));
288        if expires_at.is_some() && token_is_expiring(expires_at, current_unix_timestamp()) {
289            return Err(HyperStackError::WebSocket {
290                message: "Authentication token is expired".to_string(),
291                code: Some(crate::error::AuthErrorCode::TokenExpired),
292            });
293        }
294
295        self.current_token = Some(token_value.clone());
296        self.token_expiry = expires_at;
297        Ok(token_value)
298    }
299
300    async fn fetch_token_from_endpoint(
301        &self,
302        token_endpoint: &str,
303    ) -> Result<AuthToken, HyperStackError> {
304        let mut request = self
305            .http_client
306            .post(token_endpoint)
307            .json(&TokenEndpointRequest {
308                websocket_url: &self.websocket_url,
309            });
310
311        if let Some(config) = self.config.as_ref() {
312            if let Some(publishable_key) = config.publishable_key.as_ref() {
313                request = request.header("Authorization", format!("Bearer {}", publishable_key));
314            }
315
316            for (key, value) in &config.token_endpoint_headers {
317                request = request.header(key, value);
318            }
319        }
320
321        let response = request.send().await.map_err(|error| {
322            HyperStackError::ConnectionFailed(format!("Token endpoint request failed: {error}"))
323        })?;
324        let status = response.status();
325        let header_code = response
326            .headers()
327            .get("X-Error-Code")
328            .and_then(|value| value.to_str().ok())
329            .map(str::to_string);
330        let fallback_message = status.canonical_reason().map(str::to_string);
331        let body = response.bytes().await.map_err(|error| {
332            HyperStackError::ConnectionFailed(format!(
333                "Failed to read token endpoint response: {error}"
334            ))
335        })?;
336
337        if !status.is_success() {
338            return Err(HyperStackError::from_auth_response(
339                status.as_u16(),
340                header_code.as_deref(),
341                Some(body.as_ref()),
342                fallback_message.as_deref(),
343            ));
344        }
345
346        let response: TokenEndpointResponse = serde_json::from_slice(body.as_ref())?;
347        let token = response.into_auth_token();
348        if token.token.trim().is_empty() {
349            return Err(HyperStackError::WebSocket {
350                message: "Token endpoint did not return a token".to_string(),
351                code: None,
352            });
353        }
354
355        Ok(token)
356    }
357
358    fn build_request(
359        &self,
360        token: Option<&str>,
361    ) -> Result<tokio_tungstenite::tungstenite::http::Request<()>, HyperStackError> {
362        let url = build_websocket_url(&self.websocket_url, token, self.token_transport())?;
363        let mut request = url
364            .into_client_request()
365            .map_err(|error| HyperStackError::ConnectionFailed(error.to_string()))?;
366
367        if self.token_transport() == TokenTransport::Bearer {
368            if let Some(token) = token {
369                let header_value = HeaderValue::from_str(&format!("Bearer {token}"))
370                    .map_err(|error| HyperStackError::ConnectionFailed(error.to_string()))?;
371                request.headers_mut().insert("Authorization", header_value);
372            }
373        }
374
375        Ok(request)
376    }
377}
378
379#[allow(clippy::too_many_arguments)]
380fn spawn_connection_loop(
381    url: String,
382    state: Arc<RwLock<ConnectionState>>,
383    subscriptions: Arc<RwLock<SubscriptionRegistry>>,
384    config: ConnectionConfig,
385    frame_tx: mpsc::Sender<Frame>,
386    mut command_rx: mpsc::Receiver<ConnectionCommand>,
387    last_error: Arc<RwLock<Option<Arc<HyperStackError>>>>,
388    last_socket_issue: Arc<RwLock<Option<SocketIssue>>>,
389    socket_issue_tx: broadcast::Sender<SocketIssue>,
390    initial_connect_tx: oneshot::Sender<Result<(), HyperStackError>>,
391) {
392    tokio::spawn(async move {
393        let mut auth_state = RuntimeAuthState::new(url.clone(), config.auth.clone());
394        let mut reconnect_attempt: u32 = 0;
395        let mut should_run = true;
396        let mut initial_connect_tx = Some(initial_connect_tx);
397        let mut force_token_refresh = false;
398        let mut immediate_reconnect = false;
399
400        while should_run {
401            *state.write().await = ConnectionState::Connecting;
402
403            let token = match auth_state.resolve_token(force_token_refresh).await {
404                Ok(token) => {
405                    force_token_refresh = false;
406                    token
407                }
408                Err(error) => {
409                    set_last_error(&last_error, error.clone()).await;
410                    *state.write().await = ConnectionState::Error;
411                    report_initial_failure(&mut initial_connect_tx, error);
412                    break;
413                }
414            };
415
416            let request = match auth_state.build_request(token.as_deref()) {
417                Ok(request) => request,
418                Err(error) => {
419                    set_last_error(&last_error, error.clone()).await;
420                    *state.write().await = ConnectionState::Error;
421                    report_initial_failure(&mut initial_connect_tx, error);
422                    break;
423                }
424            };
425
426            match connect_async(request).await {
427                Ok((ws, _)) => {
428                    clear_last_error(&last_error).await;
429                    *last_socket_issue.write().await = None;
430                    *state.write().await = ConnectionState::Connected;
431                    reconnect_attempt = 0;
432                    immediate_reconnect = false;
433                    report_initial_success(&mut initial_connect_tx);
434
435                    let (mut ws_tx, mut ws_rx) = ws.split();
436                    let subs = subscriptions.read().await.all();
437                    for sub in subs {
438                        let client_msg = ClientMessage::Subscribe(sub);
439                        if let Ok(msg) = serde_json::to_string(&client_msg) {
440                            let _ = ws_tx.send(Message::Text(msg)).await;
441                        }
442                    }
443
444                    let ping_interval = config.ping_interval;
445                    let mut ping_timer = tokio::time::interval(ping_interval);
446                    let mut refresh_timer = auth_state.refresh_timer();
447
448                    loop {
449                        tokio::select! {
450                            msg = ws_rx.next() => {
451                                match msg {
452                                    Some(Ok(Message::Binary(bytes))) => {
453                                        if let Ok(frame) = parse_frame(&bytes) {
454                                            let _ = frame_tx.send(frame).await;
455                                        }
456                                    }
457                                    Some(Ok(Message::Text(text))) => {
458                                        if let Some(issue) = parse_socket_issue_message(&text) {
459                                            record_socket_issue(&last_socket_issue, &socket_issue_tx, issue.clone()).await;
460
461                                            let error = HyperStackError::from_socket_issue(issue);
462                                            if error.should_refresh_token() && auth_state.has_refreshable_auth() {
463                                                auth_state.clear_cached_token();
464                                                force_token_refresh = true;
465                                                immediate_reconnect = true;
466                                            }
467
468                                            let is_fatal = error
469                                                .socket_issue()
470                                                .map(|issue| issue.fatal)
471                                                .unwrap_or(false);
472                                            set_last_error(&last_error, error).await;
473
474                                            if is_fatal {
475                                                break;
476                                            }
477                                        } else if let Some(refresh_response) = parse_refresh_auth_response(&text) {
478                                            if refresh_response.success {
479                                                if let Some(expires_at) = refresh_response.expires_at {
480                                                    auth_state.token_expiry = Some(expires_at);
481                                                }
482                                                refresh_timer = auth_state.refresh_timer();
483                                            } else {
484                                                let error = refresh_response_error(refresh_response);
485                                                if error.should_refresh_token() && auth_state.has_refreshable_auth() {
486                                                    auth_state.clear_cached_token();
487                                                    force_token_refresh = true;
488                                                }
489                                                immediate_reconnect = true;
490                                                set_last_error(&last_error, error).await;
491                                                break;
492                                            }
493                                        } else if let Ok(frame) = serde_json::from_str::<Frame>(&text) {
494                                            let _ = frame_tx.send(frame).await;
495                                        }
496                                    }
497                                    Some(Ok(Message::Ping(payload))) => {
498                                        let _ = ws_tx.send(Message::Pong(payload)).await;
499                                    }
500                                    Some(Ok(Message::Close(frame))) => {
501                                        if let Some(frame) = frame.as_ref() {
502                                            let reason = frame.reason.to_string();
503                                            if let Some(error) = HyperStackError::from_close_reason(&reason) {
504                                                if error.should_refresh_token() && auth_state.has_refreshable_auth() {
505                                                    auth_state.clear_cached_token();
506                                                    force_token_refresh = true;
507                                                    immediate_reconnect = true;
508                                                }
509                                                set_last_error(&last_error, error).await;
510                                            }
511                                        }
512                                        break;
513                                    }
514                                    Some(Err(error)) => {
515                                        let parsed_error = HyperStackError::from_tungstenite(error);
516                                        if parsed_error.should_refresh_token() && auth_state.has_refreshable_auth() {
517                                            auth_state.clear_cached_token();
518                                            force_token_refresh = true;
519                                            immediate_reconnect = true;
520                                        }
521                                        set_last_error(&last_error, parsed_error).await;
522                                        break;
523                                    }
524                                    None => {
525                                        break;
526                                    }
527                                    _ => {}
528                                }
529                            }
530                            cmd = command_rx.recv() => {
531                                match cmd {
532                                    Some(ConnectionCommand::Subscribe(sub)) => {
533                                        subscriptions.write().await.add(sub.clone());
534                                        let client_msg = ClientMessage::Subscribe(sub);
535                                        if let Ok(msg) = serde_json::to_string(&client_msg) {
536                                            let _ = ws_tx.send(Message::Text(msg)).await;
537                                        }
538                                    }
539                                    Some(ConnectionCommand::Unsubscribe(unsub)) => {
540                                        let sub = Subscription {
541                                            view: unsub.view.clone(),
542                                            key: unsub.key.clone(),
543                                            partition: None,
544                                            filters: None,
545                                            take: None,
546                                            skip: None,
547                                            with_snapshot: None,
548                                            after: None,
549                                            snapshot_limit: None,
550                                        };
551                                        subscriptions.write().await.remove(&sub);
552                                        let client_msg = ClientMessage::Unsubscribe(unsub);
553                                        if let Ok(msg) = serde_json::to_string(&client_msg) {
554                                            let _ = ws_tx.send(Message::Text(msg)).await;
555                                        }
556                                    }
557                                    Some(ConnectionCommand::Disconnect) => {
558                                        let _ = ws_tx.close().await;
559                                        *state.write().await = ConnectionState::Disconnected;
560                                        should_run = false;
561                                        break;
562                                    }
563                                    None => {
564                                        should_run = false;
565                                        break;
566                                    }
567                                }
568                            }
569                            _ = ping_timer.tick() => {
570                                if let Ok(msg) = serde_json::to_string(&ClientMessage::Ping) {
571                                    let _ = ws_tx.send(Message::Text(msg)).await;
572                                }
573                            }
574                            _ = wait_for_refresh_timer(&mut refresh_timer) => {
575                                let previous_token = auth_state.current_token.clone();
576                                match auth_state.resolve_token(true).await {
577                                    Ok(Some(token)) => {
578                                        refresh_timer = auth_state.refresh_timer();
579                                        if previous_token.as_deref() != Some(token.as_str()) {
580                                            match serde_json::to_string(&ClientMessage::RefreshAuth { token }) {
581                                                Ok(message) => {
582                                                    if ws_tx.send(Message::Text(message)).await.is_err() {
583                                                        immediate_reconnect = true;
584                                                        break;
585                                                    }
586                                                }
587                                                Err(error) => {
588                                                    tracing::warn!("Failed to serialize auth refresh message: {}", error);
589                                                    refresh_timer = Some(Box::pin(sleep(Duration::from_secs(MIN_REFRESH_DELAY_SECONDS))));
590                                                }
591                                            }
592                                        }
593                                    }
594                                    Ok(None) => {
595                                        refresh_timer = None;
596                                    }
597                                    Err(error) => {
598                                        tracing::warn!("Failed to refresh auth token in background: {}", error);
599                                        refresh_timer = Some(Box::pin(sleep(Duration::from_secs(MIN_REFRESH_DELAY_SECONDS))));
600                                    }
601                                }
602                            }
603                        }
604                    }
605                }
606                Err(error) => {
607                    let parsed_error = HyperStackError::from_tungstenite(error);
608                    if parsed_error.should_refresh_token() && auth_state.has_refreshable_auth() {
609                        auth_state.clear_cached_token();
610                        force_token_refresh = true;
611                        immediate_reconnect = true;
612                    }
613                    tracing::error!("Connection failed: {}", parsed_error);
614                    set_last_error(&last_error, parsed_error).await;
615                }
616            }
617
618            if !should_run {
619                break;
620            }
621
622            let latest_error = last_error.read().await.clone();
623            if let Some(error) = latest_error.as_deref() {
624                if error.should_refresh_token() && auth_state.has_refreshable_auth() {
625                    auth_state.clear_cached_token();
626                    force_token_refresh = true;
627                    immediate_reconnect = true;
628                } else if !error.should_retry() {
629                    *state.write().await = ConnectionState::Error;
630                    report_initial_failure(&mut initial_connect_tx, error.clone());
631                    break;
632                }
633            }
634
635            if !config.auto_reconnect {
636                *state.write().await = ConnectionState::Error;
637                let error = latest_error
638                    .as_deref()
639                    .cloned()
640                    .unwrap_or(HyperStackError::ConnectionClosed);
641                report_initial_failure(&mut initial_connect_tx, error);
642                break;
643            }
644
645            if reconnect_attempt >= config.max_reconnect_attempts {
646                *state.write().await = ConnectionState::Error;
647                let error = latest_error.as_deref().cloned().unwrap_or(
648                    HyperStackError::MaxReconnectAttempts(config.max_reconnect_attempts),
649                );
650                set_last_error(&last_error, error.clone()).await;
651                report_initial_failure(&mut initial_connect_tx, error);
652                break;
653            }
654
655            let delay = if immediate_reconnect {
656                Duration::from_millis(0)
657            } else {
658                config
659                    .reconnect_intervals
660                    .get(reconnect_attempt as usize)
661                    .copied()
662                    .unwrap_or_else(|| {
663                        config
664                            .reconnect_intervals
665                            .last()
666                            .copied()
667                            .unwrap_or(Duration::from_secs(16))
668                    })
669            };
670
671            *state.write().await = ConnectionState::Reconnecting {
672                attempt: reconnect_attempt,
673            };
674            reconnect_attempt += 1;
675
676            if !delay.is_zero() {
677                tracing::info!(
678                    "Reconnecting in {:?} (attempt {})",
679                    delay,
680                    reconnect_attempt
681                );
682                sleep(delay).await;
683            }
684        }
685
686        if let Some(tx) = initial_connect_tx.take() {
687            let error = last_error
688                .read()
689                .await
690                .as_deref()
691                .cloned()
692                .unwrap_or(HyperStackError::ConnectionClosed);
693            let _ = tx.send(Err(error));
694        }
695    });
696}
697
698async fn set_last_error(
699    last_error: &Arc<RwLock<Option<Arc<HyperStackError>>>>,
700    error: HyperStackError,
701) {
702    *last_error.write().await = Some(Arc::new(error));
703}
704
705async fn clear_last_error(last_error: &Arc<RwLock<Option<Arc<HyperStackError>>>>) {
706    *last_error.write().await = None;
707}
708
709async fn record_socket_issue(
710    last_socket_issue: &Arc<RwLock<Option<SocketIssue>>>,
711    socket_issue_tx: &broadcast::Sender<SocketIssue>,
712    issue: SocketIssue,
713) {
714    *last_socket_issue.write().await = Some(issue.clone());
715    let _ = socket_issue_tx.send(issue);
716}
717
718async fn wait_for_refresh_timer(timer: &mut Option<Pin<Box<Sleep>>>) {
719    if let Some(timer) = timer.as_mut() {
720        timer.as_mut().await;
721    } else {
722        futures_util::future::pending::<()>().await;
723    }
724}
725
726fn report_initial_success(
727    initial_connect_tx: &mut Option<oneshot::Sender<Result<(), HyperStackError>>>,
728) {
729    if let Some(tx) = initial_connect_tx.take() {
730        let _ = tx.send(Ok(()));
731    }
732}
733
734fn report_initial_failure(
735    initial_connect_tx: &mut Option<oneshot::Sender<Result<(), HyperStackError>>>,
736    error: HyperStackError,
737) {
738    if let Some(tx) = initial_connect_tx.take() {
739        let _ = tx.send(Err(error));
740    }
741}
742
/// Returns the current time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch.
fn current_unix_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
749
750fn parse_socket_issue_message(text: &str) -> Option<SocketIssue> {
751    let payload = serde_json::from_str::<SocketIssuePayload>(text).ok()?;
752    if payload.is_socket_issue() {
753        Some(payload.into_socket_issue())
754    } else {
755        None
756    }
757}
758
759fn parse_refresh_auth_response(text: &str) -> Option<RefreshAuthResponseMessage> {
760    let payload = serde_json::from_str::<RefreshAuthResponseMessage>(text).ok()?;
761    Some(payload)
762}
763
764fn refresh_response_error(response: RefreshAuthResponseMessage) -> HyperStackError {
765    let code = response
766        .error
767        .as_deref()
768        .and_then(crate::error::AuthErrorCode::from_wire);
769    let message = response
770        .error
771        .unwrap_or_else(|| "Authentication refresh failed".to_string());
772
773    HyperStackError::WebSocket { message, code }
774}