Skip to main content

braid_http/client/
subscription.rs

1//! Subscription handling for Braid protocol.
2
3use crate::error::{BraidError, Result};
4use crate::types::Update;
5use futures::Stream;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11pub struct ReceiverStream<T> {
12    receiver: async_channel::Receiver<T>,
13}
14
15impl<T> ReceiverStream<T> {
16    pub fn new(receiver: async_channel::Receiver<T>) -> Self {
17        Self { receiver }
18    }
19}
20
21impl<T: Unpin> Stream for ReceiverStream<T> {
22    type Item = T;
23    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
24        let mut fut = self.receiver.recv();
25        unsafe { Pin::new_unchecked(&mut fut) }
26            .poll(cx)
27            .map(|res| res.ok())
28    }
29}
30
31#[cfg(not(target_arch = "wasm32"))]
32use std::time::Instant;
33
/// Minimal stand-in for `std::time::Instant` on `wasm32`, backed by the
/// browser's `performance.now()` clock.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Instant {
    /// Timestamp in milliseconds as reported by `performance.now()`.
    start_ms: f64,
}

#[cfg(target_arch = "wasm32")]
impl Instant {
    /// Reads the current `performance.now()` value in milliseconds, or `0.0`
    /// when no window or performance object is available.
    fn current_ms() -> f64 {
        match web_sys::window().and_then(|window| window.performance()) {
            Some(performance) => performance.now(),
            None => 0.0,
        }
    }

    /// Captures the current time.
    pub fn now() -> Self {
        Instant {
            start_ms: Self::current_ms(),
        }
    }

    /// Returns the time elapsed since this instant was captured.
    ///
    /// A negative difference is clamped to zero.
    pub fn elapsed(&self) -> Duration {
        let delta_ms = (Self::current_ms() - self.start_ms).max(0.0);
        Duration::from_secs_f64(delta_ms / 1000.0)
    }
}
57
/// Allows `instant + duration` on the `wasm32` `Instant`.
#[cfg(target_arch = "wasm32")]
impl std::ops::Add<Duration> for Instant {
    type Output = Instant;

    /// Returns an instant shifted forward by `other`.
    fn add(self, other: Duration) -> Instant {
        // The timestamp is kept in milliseconds, so convert before adding.
        let shift_ms = other.as_secs_f64() * 1000.0;
        Instant {
            start_ms: self.start_ms + shift_ms,
        }
    }
}
67
/// Configuration for heartbeat timeout detection.
///
/// The timeout is derived from the heartbeat interval as
/// `1.2 * interval_secs + 3.0` seconds.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    /// Heartbeat interval in seconds.
    pub interval_secs: f64,
    /// How long to wait without activity before reporting a timeout.
    pub timeout: Duration,
}

impl HeartbeatConfig {
    /// Seconds of silence tolerated for a given heartbeat interval.
    fn timeout_secs(interval_secs: f64) -> f64 {
        1.2 * interval_secs + 3.0
    }

    /// Builds a configuration from a heartbeat interval in seconds.
    ///
    /// # Panics
    ///
    /// Panics if the derived timeout is negative, not finite, or too large
    /// for a [`Duration`]. Use [`HeartbeatConfig::from_header`] for values
    /// that come from the network.
    pub fn new(interval_secs: f64) -> Self {
        Self {
            interval_secs,
            timeout: Duration::from_secs_f64(Self::timeout_secs(interval_secs)),
        }
    }

    /// Parses a heartbeat header value such as `"5"`, `"5s"` or `" 2.5s "`.
    ///
    /// Returns `None` when the value is not a number, is negative or not
    /// finite, or yields a timeout that does not fit in a [`Duration`].
    /// This function never panics, whatever the input.
    pub fn from_header(value: &str) -> Option<Self> {
        let trimmed = value.trim();
        // Fall back to the *trimmed* text when there is no `s` suffix, so
        // surrounding whitespace never reaches the float parser.
        let number = trimmed.strip_suffix('s').unwrap_or(trimmed).trim_end();
        let interval_secs = number.parse::<f64>().ok()?;

        // `parse` accepts "nan", "inf" and negative numbers, none of which
        // describe a usable interval.
        if !interval_secs.is_finite() || interval_secs < 0.0 {
            return None;
        }

        let timeout = Duration::try_from_secs_f64(Self::timeout_secs(interval_secs)).ok()?;
        Some(Self {
            interval_secs,
            timeout,
        })
    }
}
94
95pub struct Subscription {
96    receiver: async_channel::Receiver<Result<Update>>,
97    heartbeat_config: Option<HeartbeatConfig>,
98    last_activity: Instant,
99}
100
101impl Subscription {
102    pub fn new(receiver: async_channel::Receiver<Result<Update>>) -> Self {
103        Subscription {
104            receiver,
105            heartbeat_config: None,
106            last_activity: Instant::now(),
107        }
108    }
109
110    pub fn with_heartbeat(
111        receiver: async_channel::Receiver<Result<Update>>,
112        heartbeat_config: HeartbeatConfig,
113    ) -> Self {
114        Subscription {
115            receiver,
116            heartbeat_config: Some(heartbeat_config),
117            last_activity: Instant::now(),
118        }
119    }
120
121    pub async fn next(&mut self) -> Option<Result<Update>> {
122        if let Some(ref config) = self.heartbeat_config {
123            let timeout = config.timeout;
124            #[cfg(not(target_arch = "wasm32"))]
125            {
126                let deadline = self.last_activity + timeout;
127                tokio::select! {
128                    result = self.receiver.recv() => {
129                        self.last_activity = Instant::now();
130                        result.ok()
131                    }
132                    _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
133                        Some(Err(BraidError::Timeout))
134                    }
135                }
136            }
137            #[cfg(target_arch = "wasm32")]
138            {
139                use futures::{future::FutureExt, pin_mut, select};
140                let recv_fut = self.receiver.recv().fuse();
141                let timer_fut =
142                    gloo_timers::future::TimeoutFuture::new(timeout.as_millis() as u32).fuse();
143                pin_mut!(recv_fut, timer_fut);
144                select! {
145                    result = recv_fut => {
146                        self.last_activity = Instant::now();
147                        result.ok()
148                    }
149                    _ = timer_fut => Some(Err(BraidError::Timeout))
150                }
151            }
152        } else {
153            let result = self.receiver.recv().await.ok();
154            self.last_activity = Instant::now();
155            result
156        }
157    }
158
159    pub fn is_heartbeat_timeout(&self) -> bool {
160        if let Some(ref config) = self.heartbeat_config {
161            self.last_activity.elapsed() > config.timeout
162        } else {
163            false
164        }
165    }
166}
167
168impl Stream for Subscription {
169    type Item = Result<Update>;
170    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
171        let this = unsafe { self.get_unchecked_mut() };
172        let mut fut = this.receiver.recv();
173        match unsafe { Pin::new_unchecked(&mut fut) }.poll(cx) {
174            Poll::Ready(result) => {
175                this.last_activity = Instant::now();
176                Poll::Ready(result.ok())
177            }
178            Poll::Pending => {
179                if this.is_heartbeat_timeout() {
180                    Poll::Ready(Some(Err(BraidError::Timeout)))
181                } else {
182                    Poll::Pending
183                }
184            }
185        }
186    }
187}
188
189pub struct SubscriptionStream {
190    receiver: ReceiverStream<Result<Update>>,
191}
192
193impl SubscriptionStream {
194    pub fn new(receiver: async_channel::Receiver<Result<Update>>) -> Self {
195        SubscriptionStream {
196            receiver: ReceiverStream::new(receiver),
197        }
198    }
199}
200
201impl Stream for SubscriptionStream {
202    type Item = Result<Update>;
203    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204        let this = unsafe { self.get_unchecked_mut() };
205        unsafe { Pin::new_unchecked(&mut this.receiver) }.poll_next(cx)
206    }
207}