1use crate::auth::{
2 build_websocket_url, hosted_auth_required_error, parse_jwt_expiry, token_is_expiring,
3 token_refresh_delay, AuthConfig, AuthToken, ResolvedAuthStrategy, TokenEndpointRequest,
4 TokenEndpointResponse, TokenTransport, MIN_REFRESH_DELAY_SECONDS,
5};
6use crate::config::ConnectionConfig;
7use crate::error::{HyperStackError, SocketIssue, SocketIssuePayload};
8use crate::frame::{parse_frame, Frame};
9use crate::subscription::{ClientMessage, Subscription, SubscriptionRegistry, Unsubscription};
10use futures_util::{SinkExt, StreamExt};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::{Duration, SystemTime, UNIX_EPOCH};
14use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
15use tokio::time::{sleep, Sleep};
16use tokio_tungstenite::{
17 connect_async,
18 tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
19};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum ConnectionState {
23 Disconnected,
24 Connecting,
25 Connected,
26 Reconnecting { attempt: u32 },
27 Error,
28}
29
30pub enum ConnectionCommand {
31 Subscribe(Subscription),
32 Unsubscribe(Unsubscription),
33 Disconnect,
34}
35
36#[derive(Debug, serde::Deserialize)]
37struct RefreshAuthResponseMessage {
38 success: bool,
39 error: Option<String>,
40 expires_at: Option<u64>,
41}
42
43#[derive(Debug, Clone, Default)]
44pub struct SubscriptionOptions {
45 pub take: Option<u32>,
46 pub skip: Option<u32>,
47 pub with_snapshot: Option<bool>,
48 pub after: Option<String>,
49 pub snapshot_limit: Option<usize>,
50}
51
52struct ConnectionManagerInner {
53 #[allow(dead_code)]
54 url: String,
55 state: Arc<RwLock<ConnectionState>>,
56 subscriptions: Arc<RwLock<SubscriptionRegistry>>,
57 #[allow(dead_code)]
58 config: ConnectionConfig,
59 command_tx: mpsc::Sender<ConnectionCommand>,
60 last_error: Arc<RwLock<Option<Arc<HyperStackError>>>>,
61 last_socket_issue: Arc<RwLock<Option<SocketIssue>>>,
62 socket_issue_tx: broadcast::Sender<SocketIssue>,
63}
64
65#[derive(Clone)]
66pub struct ConnectionManager {
67 inner: Arc<ConnectionManagerInner>,
68}
69
70impl ConnectionManager {
71 pub async fn new(
72 url: String,
73 config: ConnectionConfig,
74 frame_tx: mpsc::Sender<Frame>,
75 ) -> Result<Self, HyperStackError> {
76 let (command_tx, command_rx) = mpsc::channel(100);
77 let (initial_connect_tx, initial_connect_rx) = oneshot::channel();
78 let state = Arc::new(RwLock::new(ConnectionState::Disconnected));
79 let subscriptions = Arc::new(RwLock::new(SubscriptionRegistry::new()));
80 let last_error = Arc::new(RwLock::new(None));
81 let last_socket_issue = Arc::new(RwLock::new(None));
82 let (socket_issue_tx, _) = broadcast::channel(100);
83
84 let inner = ConnectionManagerInner {
85 url: url.clone(),
86 state: state.clone(),
87 subscriptions: subscriptions.clone(),
88 config: config.clone(),
89 command_tx,
90 last_error: last_error.clone(),
91 last_socket_issue: last_socket_issue.clone(),
92 socket_issue_tx: socket_issue_tx.clone(),
93 };
94
95 spawn_connection_loop(
96 url,
97 state,
98 subscriptions,
99 config,
100 frame_tx,
101 command_rx,
102 last_error,
103 last_socket_issue,
104 socket_issue_tx,
105 initial_connect_tx,
106 );
107
108 let manager = Self {
109 inner: Arc::new(inner),
110 };
111
112 match initial_connect_rx.await {
113 Ok(Ok(())) => Ok(manager),
114 Ok(Err(error)) => Err(error),
115 Err(_) => Err(HyperStackError::ConnectionFailed(
116 "Connection task ended before initial connect completed".to_string(),
117 )),
118 }
119 }
120
121 pub async fn state(&self) -> ConnectionState {
122 *self.inner.state.read().await
123 }
124
125 pub async fn last_error(&self) -> Option<Arc<HyperStackError>> {
126 self.inner.last_error.read().await.clone()
127 }
128
129 pub async fn last_socket_issue(&self) -> Option<SocketIssue> {
130 self.inner.last_socket_issue.read().await.clone()
131 }
132
133 pub fn subscribe_socket_issues(&self) -> broadcast::Receiver<SocketIssue> {
134 self.inner.socket_issue_tx.subscribe()
135 }
136
137 pub async fn ensure_subscription(&self, view: &str, key: Option<&str>) {
138 self.ensure_subscription_with_opts(view, key, SubscriptionOptions::default())
139 .await
140 }
141
142 pub async fn ensure_subscription_with_opts(
143 &self,
144 view: &str,
145 key: Option<&str>,
146 opts: SubscriptionOptions,
147 ) {
148 let sub = Subscription {
149 view: view.to_string(),
150 key: key.map(|s| s.to_string()),
151 partition: None,
152 filters: None,
153 take: opts.take,
154 skip: opts.skip,
155 with_snapshot: opts.with_snapshot,
156 after: opts.after,
157 snapshot_limit: opts.snapshot_limit,
158 };
159
160 if !self.inner.subscriptions.read().await.contains(&sub) {
161 let _ = self
162 .inner
163 .command_tx
164 .send(ConnectionCommand::Subscribe(sub))
165 .await;
166 }
167 }
168
169 pub async fn subscribe(&self, sub: Subscription) {
170 let _ = self
171 .inner
172 .command_tx
173 .send(ConnectionCommand::Subscribe(sub))
174 .await;
175 }
176
177 pub async fn unsubscribe(&self, unsub: Unsubscription) {
178 let _ = self
179 .inner
180 .command_tx
181 .send(ConnectionCommand::Unsubscribe(unsub))
182 .await;
183 }
184
185 pub async fn disconnect(&self) {
186 let _ = self
187 .inner
188 .command_tx
189 .send(ConnectionCommand::Disconnect)
190 .await;
191 }
192}
193
194struct RuntimeAuthState {
195 websocket_url: String,
196 config: Option<AuthConfig>,
197 current_token: Option<String>,
198 token_expiry: Option<u64>,
199 http_client: reqwest::Client,
200}
201
202impl RuntimeAuthState {
203 fn new(websocket_url: String, config: Option<AuthConfig>) -> Self {
204 Self {
205 websocket_url,
206 config,
207 current_token: None,
208 token_expiry: None,
209 http_client: reqwest::Client::new(),
210 }
211 }
212
213 fn token_transport(&self) -> TokenTransport {
214 self.config
215 .as_ref()
216 .map(|config| config.token_transport)
217 .unwrap_or_default()
218 }
219
220 fn has_refreshable_auth(&self) -> bool {
221 self.config
222 .as_ref()
223 .is_some_and(|config| config.has_refreshable_auth(&self.websocket_url))
224 }
225
226 fn clear_cached_token(&mut self) {
227 self.current_token = None;
228 self.token_expiry = None;
229 }
230
231 fn refresh_timer(&self) -> Option<Pin<Box<Sleep>>> {
232 let delay = token_refresh_delay(self.token_expiry, current_unix_timestamp())?;
233 Some(Box::pin(sleep(Duration::from_secs(delay))))
234 }
235
236 async fn resolve_token(
237 &mut self,
238 force_refresh: bool,
239 ) -> Result<Option<String>, HyperStackError> {
240 if !force_refresh {
241 if let Some(token) = self.current_token.clone() {
242 if !token_is_expiring(self.token_expiry, current_unix_timestamp()) {
243 return Ok(Some(token));
244 }
245 }
246 }
247
248 let Some(config) = self.config.as_ref() else {
249 if crate::auth::is_hosted_hyperstack_websocket_url(&self.websocket_url) {
250 return Err(hosted_auth_required_error());
251 }
252 return Ok(None);
253 };
254
255 let strategy = config.resolve_strategy(&self.websocket_url);
256 match strategy {
257 ResolvedAuthStrategy::None => {
258 if crate::auth::is_hosted_hyperstack_websocket_url(&self.websocket_url) {
259 Err(hosted_auth_required_error())
260 } else {
261 Ok(None)
262 }
263 }
264 ResolvedAuthStrategy::StaticToken(token) => {
265 self.set_token(AuthToken::new(token)).map(Some)
266 }
267 ResolvedAuthStrategy::TokenProvider(provider) => {
268 let token = provider().await?;
269 self.set_token(token).map(Some)
270 }
271 ResolvedAuthStrategy::TokenEndpoint(endpoint) => {
272 let token = self.fetch_token_from_endpoint(&endpoint).await?;
273 self.set_token(token).map(Some)
274 }
275 }
276 }
277
278 fn set_token(&mut self, token: AuthToken) -> Result<String, HyperStackError> {
279 let token_value = token.token.trim().to_string();
280 if token_value.is_empty() {
281 return Err(HyperStackError::WebSocket {
282 message: "Authentication provider returned an empty token".to_string(),
283 code: None,
284 });
285 }
286
287 let expires_at = token.expires_at.or_else(|| parse_jwt_expiry(&token_value));
288 if expires_at.is_some() && token_is_expiring(expires_at, current_unix_timestamp()) {
289 return Err(HyperStackError::WebSocket {
290 message: "Authentication token is expired".to_string(),
291 code: Some(crate::error::AuthErrorCode::TokenExpired),
292 });
293 }
294
295 self.current_token = Some(token_value.clone());
296 self.token_expiry = expires_at;
297 Ok(token_value)
298 }
299
300 async fn fetch_token_from_endpoint(
301 &self,
302 token_endpoint: &str,
303 ) -> Result<AuthToken, HyperStackError> {
304 let mut request = self
305 .http_client
306 .post(token_endpoint)
307 .json(&TokenEndpointRequest {
308 websocket_url: &self.websocket_url,
309 });
310
311 if let Some(config) = self.config.as_ref() {
312 if let Some(publishable_key) = config.publishable_key.as_ref() {
313 request = request.header("Authorization", format!("Bearer {}", publishable_key));
314 }
315
316 for (key, value) in &config.token_endpoint_headers {
317 request = request.header(key, value);
318 }
319 }
320
321 let response = request.send().await.map_err(|error| {
322 HyperStackError::ConnectionFailed(format!("Token endpoint request failed: {error}"))
323 })?;
324 let status = response.status();
325 let header_code = response
326 .headers()
327 .get("X-Error-Code")
328 .and_then(|value| value.to_str().ok())
329 .map(str::to_string);
330 let fallback_message = status.canonical_reason().map(str::to_string);
331 let body = response.bytes().await.map_err(|error| {
332 HyperStackError::ConnectionFailed(format!(
333 "Failed to read token endpoint response: {error}"
334 ))
335 })?;
336
337 if !status.is_success() {
338 return Err(HyperStackError::from_auth_response(
339 status.as_u16(),
340 header_code.as_deref(),
341 Some(body.as_ref()),
342 fallback_message.as_deref(),
343 ));
344 }
345
346 let response: TokenEndpointResponse = serde_json::from_slice(body.as_ref())?;
347 let token = response.into_auth_token();
348 if token.token.trim().is_empty() {
349 return Err(HyperStackError::WebSocket {
350 message: "Token endpoint did not return a token".to_string(),
351 code: None,
352 });
353 }
354
355 Ok(token)
356 }
357
358 fn build_request(
359 &self,
360 token: Option<&str>,
361 ) -> Result<tokio_tungstenite::tungstenite::http::Request<()>, HyperStackError> {
362 let url = build_websocket_url(&self.websocket_url, token, self.token_transport())?;
363 let mut request = url
364 .into_client_request()
365 .map_err(|error| HyperStackError::ConnectionFailed(error.to_string()))?;
366
367 if self.token_transport() == TokenTransport::Bearer {
368 if let Some(token) = token {
369 let header_value = HeaderValue::from_str(&format!("Bearer {token}"))
370 .map_err(|error| HyperStackError::ConnectionFailed(error.to_string()))?;
371 request.headers_mut().insert("Authorization", header_value);
372 }
373 }
374
375 Ok(request)
376 }
377}
378
379#[allow(clippy::too_many_arguments)]
380fn spawn_connection_loop(
381 url: String,
382 state: Arc<RwLock<ConnectionState>>,
383 subscriptions: Arc<RwLock<SubscriptionRegistry>>,
384 config: ConnectionConfig,
385 frame_tx: mpsc::Sender<Frame>,
386 mut command_rx: mpsc::Receiver<ConnectionCommand>,
387 last_error: Arc<RwLock<Option<Arc<HyperStackError>>>>,
388 last_socket_issue: Arc<RwLock<Option<SocketIssue>>>,
389 socket_issue_tx: broadcast::Sender<SocketIssue>,
390 initial_connect_tx: oneshot::Sender<Result<(), HyperStackError>>,
391) {
392 tokio::spawn(async move {
393 let mut auth_state = RuntimeAuthState::new(url.clone(), config.auth.clone());
394 let mut reconnect_attempt: u32 = 0;
395 let mut should_run = true;
396 let mut initial_connect_tx = Some(initial_connect_tx);
397 let mut force_token_refresh = false;
398 let mut immediate_reconnect = false;
399
400 while should_run {
401 *state.write().await = ConnectionState::Connecting;
402
403 let token = match auth_state.resolve_token(force_token_refresh).await {
404 Ok(token) => {
405 force_token_refresh = false;
406 token
407 }
408 Err(error) => {
409 set_last_error(&last_error, error.clone()).await;
410 *state.write().await = ConnectionState::Error;
411 report_initial_failure(&mut initial_connect_tx, error);
412 break;
413 }
414 };
415
416 let request = match auth_state.build_request(token.as_deref()) {
417 Ok(request) => request,
418 Err(error) => {
419 set_last_error(&last_error, error.clone()).await;
420 *state.write().await = ConnectionState::Error;
421 report_initial_failure(&mut initial_connect_tx, error);
422 break;
423 }
424 };
425
426 match connect_async(request).await {
427 Ok((ws, _)) => {
428 clear_last_error(&last_error).await;
429 *last_socket_issue.write().await = None;
430 *state.write().await = ConnectionState::Connected;
431 reconnect_attempt = 0;
432 immediate_reconnect = false;
433 report_initial_success(&mut initial_connect_tx);
434
435 let (mut ws_tx, mut ws_rx) = ws.split();
436 let subs = subscriptions.read().await.all();
437 for sub in subs {
438 let client_msg = ClientMessage::Subscribe(sub);
439 if let Ok(msg) = serde_json::to_string(&client_msg) {
440 let _ = ws_tx.send(Message::Text(msg)).await;
441 }
442 }
443
444 let ping_interval = config.ping_interval;
445 let mut ping_timer = tokio::time::interval(ping_interval);
446 let mut refresh_timer = auth_state.refresh_timer();
447
448 loop {
449 tokio::select! {
450 msg = ws_rx.next() => {
451 match msg {
452 Some(Ok(Message::Binary(bytes))) => {
453 if let Ok(frame) = parse_frame(&bytes) {
454 let _ = frame_tx.send(frame).await;
455 }
456 }
457 Some(Ok(Message::Text(text))) => {
458 if let Some(issue) = parse_socket_issue_message(&text) {
459 record_socket_issue(&last_socket_issue, &socket_issue_tx, issue.clone()).await;
460
461 let error = HyperStackError::from_socket_issue(issue);
462 if error.should_refresh_token() && auth_state.has_refreshable_auth() {
463 auth_state.clear_cached_token();
464 force_token_refresh = true;
465 immediate_reconnect = true;
466 }
467
468 let is_fatal = error
469 .socket_issue()
470 .map(|issue| issue.fatal)
471 .unwrap_or(false);
472 set_last_error(&last_error, error).await;
473
474 if is_fatal {
475 break;
476 }
477 } else if let Some(refresh_response) = parse_refresh_auth_response(&text) {
478 if refresh_response.success {
479 if let Some(expires_at) = refresh_response.expires_at {
480 auth_state.token_expiry = Some(expires_at);
481 }
482 refresh_timer = auth_state.refresh_timer();
483 } else {
484 let error = refresh_response_error(refresh_response);
485 if error.should_refresh_token() && auth_state.has_refreshable_auth() {
486 auth_state.clear_cached_token();
487 force_token_refresh = true;
488 }
489 immediate_reconnect = true;
490 set_last_error(&last_error, error).await;
491 break;
492 }
493 } else if let Ok(frame) = serde_json::from_str::<Frame>(&text) {
494 let _ = frame_tx.send(frame).await;
495 }
496 }
497 Some(Ok(Message::Ping(payload))) => {
498 let _ = ws_tx.send(Message::Pong(payload)).await;
499 }
500 Some(Ok(Message::Close(frame))) => {
501 if let Some(frame) = frame.as_ref() {
502 let reason = frame.reason.to_string();
503 if let Some(error) = HyperStackError::from_close_reason(&reason) {
504 if error.should_refresh_token() && auth_state.has_refreshable_auth() {
505 auth_state.clear_cached_token();
506 force_token_refresh = true;
507 immediate_reconnect = true;
508 }
509 set_last_error(&last_error, error).await;
510 }
511 }
512 break;
513 }
514 Some(Err(error)) => {
515 let parsed_error = HyperStackError::from_tungstenite(error);
516 if parsed_error.should_refresh_token() && auth_state.has_refreshable_auth() {
517 auth_state.clear_cached_token();
518 force_token_refresh = true;
519 immediate_reconnect = true;
520 }
521 set_last_error(&last_error, parsed_error).await;
522 break;
523 }
524 None => {
525 break;
526 }
527 _ => {}
528 }
529 }
530 cmd = command_rx.recv() => {
531 match cmd {
532 Some(ConnectionCommand::Subscribe(sub)) => {
533 subscriptions.write().await.add(sub.clone());
534 let client_msg = ClientMessage::Subscribe(sub);
535 if let Ok(msg) = serde_json::to_string(&client_msg) {
536 let _ = ws_tx.send(Message::Text(msg)).await;
537 }
538 }
539 Some(ConnectionCommand::Unsubscribe(unsub)) => {
540 let sub = Subscription {
541 view: unsub.view.clone(),
542 key: unsub.key.clone(),
543 partition: None,
544 filters: None,
545 take: None,
546 skip: None,
547 with_snapshot: None,
548 after: None,
549 snapshot_limit: None,
550 };
551 subscriptions.write().await.remove(&sub);
552 let client_msg = ClientMessage::Unsubscribe(unsub);
553 if let Ok(msg) = serde_json::to_string(&client_msg) {
554 let _ = ws_tx.send(Message::Text(msg)).await;
555 }
556 }
557 Some(ConnectionCommand::Disconnect) => {
558 let _ = ws_tx.close().await;
559 *state.write().await = ConnectionState::Disconnected;
560 should_run = false;
561 break;
562 }
563 None => {
564 should_run = false;
565 break;
566 }
567 }
568 }
569 _ = ping_timer.tick() => {
570 if let Ok(msg) = serde_json::to_string(&ClientMessage::Ping) {
571 let _ = ws_tx.send(Message::Text(msg)).await;
572 }
573 }
574 _ = wait_for_refresh_timer(&mut refresh_timer) => {
575 let previous_token = auth_state.current_token.clone();
576 match auth_state.resolve_token(true).await {
577 Ok(Some(token)) => {
578 refresh_timer = auth_state.refresh_timer();
579 if previous_token.as_deref() != Some(token.as_str()) {
580 match serde_json::to_string(&ClientMessage::RefreshAuth { token }) {
581 Ok(message) => {
582 if ws_tx.send(Message::Text(message)).await.is_err() {
583 immediate_reconnect = true;
584 break;
585 }
586 }
587 Err(error) => {
588 tracing::warn!("Failed to serialize auth refresh message: {}", error);
589 refresh_timer = Some(Box::pin(sleep(Duration::from_secs(MIN_REFRESH_DELAY_SECONDS))));
590 }
591 }
592 }
593 }
594 Ok(None) => {
595 refresh_timer = None;
596 }
597 Err(error) => {
598 tracing::warn!("Failed to refresh auth token in background: {}", error);
599 refresh_timer = Some(Box::pin(sleep(Duration::from_secs(MIN_REFRESH_DELAY_SECONDS))));
600 }
601 }
602 }
603 }
604 }
605 }
606 Err(error) => {
607 let parsed_error = HyperStackError::from_tungstenite(error);
608 if parsed_error.should_refresh_token() && auth_state.has_refreshable_auth() {
609 auth_state.clear_cached_token();
610 force_token_refresh = true;
611 immediate_reconnect = true;
612 }
613 tracing::error!("Connection failed: {}", parsed_error);
614 set_last_error(&last_error, parsed_error).await;
615 }
616 }
617
618 if !should_run {
619 break;
620 }
621
622 let latest_error = last_error.read().await.clone();
623 if let Some(error) = latest_error.as_deref() {
624 if error.should_refresh_token() && auth_state.has_refreshable_auth() {
625 auth_state.clear_cached_token();
626 force_token_refresh = true;
627 immediate_reconnect = true;
628 } else if !error.should_retry() {
629 *state.write().await = ConnectionState::Error;
630 report_initial_failure(&mut initial_connect_tx, error.clone());
631 break;
632 }
633 }
634
635 if !config.auto_reconnect {
636 *state.write().await = ConnectionState::Error;
637 let error = latest_error
638 .as_deref()
639 .cloned()
640 .unwrap_or(HyperStackError::ConnectionClosed);
641 report_initial_failure(&mut initial_connect_tx, error);
642 break;
643 }
644
645 if reconnect_attempt >= config.max_reconnect_attempts {
646 *state.write().await = ConnectionState::Error;
647 let error = latest_error.as_deref().cloned().unwrap_or(
648 HyperStackError::MaxReconnectAttempts(config.max_reconnect_attempts),
649 );
650 set_last_error(&last_error, error.clone()).await;
651 report_initial_failure(&mut initial_connect_tx, error);
652 break;
653 }
654
655 let delay = if immediate_reconnect {
656 Duration::from_millis(0)
657 } else {
658 config
659 .reconnect_intervals
660 .get(reconnect_attempt as usize)
661 .copied()
662 .unwrap_or_else(|| {
663 config
664 .reconnect_intervals
665 .last()
666 .copied()
667 .unwrap_or(Duration::from_secs(16))
668 })
669 };
670
671 *state.write().await = ConnectionState::Reconnecting {
672 attempt: reconnect_attempt,
673 };
674 reconnect_attempt += 1;
675
676 if !delay.is_zero() {
677 tracing::info!(
678 "Reconnecting in {:?} (attempt {})",
679 delay,
680 reconnect_attempt
681 );
682 sleep(delay).await;
683 }
684 }
685
686 if let Some(tx) = initial_connect_tx.take() {
687 let error = last_error
688 .read()
689 .await
690 .as_deref()
691 .cloned()
692 .unwrap_or(HyperStackError::ConnectionClosed);
693 let _ = tx.send(Err(error));
694 }
695 });
696}
697
698async fn set_last_error(
699 last_error: &Arc<RwLock<Option<Arc<HyperStackError>>>>,
700 error: HyperStackError,
701) {
702 *last_error.write().await = Some(Arc::new(error));
703}
704
705async fn clear_last_error(last_error: &Arc<RwLock<Option<Arc<HyperStackError>>>>) {
706 *last_error.write().await = None;
707}
708
709async fn record_socket_issue(
710 last_socket_issue: &Arc<RwLock<Option<SocketIssue>>>,
711 socket_issue_tx: &broadcast::Sender<SocketIssue>,
712 issue: SocketIssue,
713) {
714 *last_socket_issue.write().await = Some(issue.clone());
715 let _ = socket_issue_tx.send(issue);
716}
717
718async fn wait_for_refresh_timer(timer: &mut Option<Pin<Box<Sleep>>>) {
719 if let Some(timer) = timer.as_mut() {
720 timer.as_mut().await;
721 } else {
722 futures_util::future::pending::<()>().await;
723 }
724}
725
726fn report_initial_success(
727 initial_connect_tx: &mut Option<oneshot::Sender<Result<(), HyperStackError>>>,
728) {
729 if let Some(tx) = initial_connect_tx.take() {
730 let _ = tx.send(Ok(()));
731 }
732}
733
734fn report_initial_failure(
735 initial_connect_tx: &mut Option<oneshot::Sender<Result<(), HyperStackError>>>,
736 error: HyperStackError,
737) {
738 if let Some(tx) = initial_connect_tx.take() {
739 let _ = tx.send(Err(error));
740 }
741}
742
743fn current_unix_timestamp() -> u64 {
744 SystemTime::now()
745 .duration_since(UNIX_EPOCH)
746 .unwrap_or_default()
747 .as_secs()
748}
749
750fn parse_socket_issue_message(text: &str) -> Option<SocketIssue> {
751 let payload = serde_json::from_str::<SocketIssuePayload>(text).ok()?;
752 if payload.is_socket_issue() {
753 Some(payload.into_socket_issue())
754 } else {
755 None
756 }
757}
758
759fn parse_refresh_auth_response(text: &str) -> Option<RefreshAuthResponseMessage> {
760 let payload = serde_json::from_str::<RefreshAuthResponseMessage>(text).ok()?;
761 Some(payload)
762}
763
764fn refresh_response_error(response: RefreshAuthResponseMessage) -> HyperStackError {
765 let code = response
766 .error
767 .as_deref()
768 .and_then(crate::error::AuthErrorCode::from_wire);
769 let message = response
770 .error
771 .unwrap_or_else(|| "Authentication refresh failed".to_string());
772
773 HyperStackError::WebSocket { message, code }
774}