braid_http/client/
subscription.rs1use crate::error::{BraidError, Result};
4use crate::types::Update;
5use futures::Stream;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11pub struct ReceiverStream<T> {
12 receiver: async_channel::Receiver<T>,
13}
14
15impl<T> ReceiverStream<T> {
16 pub fn new(receiver: async_channel::Receiver<T>) -> Self {
17 Self { receiver }
18 }
19}
20
21impl<T: Unpin> Stream for ReceiverStream<T> {
22 type Item = T;
23 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
24 let mut fut = self.receiver.recv();
25 unsafe { Pin::new_unchecked(&mut fut) }
26 .poll(cx)
27 .map(|res| res.ok())
28 }
29}
30
31#[cfg(not(target_arch = "wasm32"))]
32use std::time::Instant;
33
/// Replacement for `std::time::Instant` on `wasm32` targets.
///
/// The timestamp is stored as a number of milliseconds.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Instant {
    /// Millisecond timestamp captured when the value was created.
    start_ms: f64,
}
39
#[cfg(target_arch = "wasm32")]
impl Instant {
    /// Reads the current `performance.now()` value of the global window.
    ///
    /// Falls back to `0.0` when no window or no performance object is
    /// available.
    fn performance_now_ms() -> f64 {
        match web_sys::window().and_then(|window| window.performance()) {
            Some(performance) => performance.now(),
            None => 0.0,
        }
    }

    /// Captures the current timestamp.
    pub fn now() -> Self {
        Instant {
            start_ms: Self::performance_now_ms(),
        }
    }

    /// Returns the time that has passed since this instant was captured.
    ///
    /// A negative difference is clamped to zero before it is converted, so
    /// the result is never negative.
    pub fn elapsed(&self) -> Duration {
        let delta_ms = (Self::performance_now_ms() - self.start_ms).max(0.0);
        Duration::from_secs_f64(delta_ms / 1000.0)
    }
}
57
#[cfg(target_arch = "wasm32")]
impl std::ops::Add<Duration> for Instant {
    type Output = Instant;

    /// Returns the instant that lies `other` after `self`.
    fn add(self, other: Duration) -> Instant {
        // The timestamp is kept in milliseconds, so convert before adding.
        let offset_ms = other.as_secs_f64() * 1000.0;
        Instant {
            start_ms: self.start_ms + offset_ms,
        }
    }
}
67
/// Heartbeat settings for a subscription.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    /// Heartbeat interval in seconds.
    pub interval_secs: f64,
    /// How long to wait without activity before the subscription is
    /// considered timed out (`1.2 * interval_secs + 3` seconds).
    pub timeout: Duration,
}

impl HeartbeatConfig {
    /// Seconds of silence tolerated for a given heartbeat interval.
    fn timeout_secs(interval_secs: f64) -> f64 {
        1.2 * interval_secs + 3.0
    }

    /// Builds a configuration for the given interval in seconds.
    ///
    /// # Panics
    ///
    /// Panics if the derived timeout (`1.2 * interval_secs + 3.0`) is
    /// negative, not finite, or too large for a [`Duration`]. Use
    /// [`HeartbeatConfig::from_header`] for untrusted input.
    pub fn new(interval_secs: f64) -> Self {
        Self {
            interval_secs,
            timeout: Duration::from_secs_f64(Self::timeout_secs(interval_secs)),
        }
    }

    /// Parses a heartbeat interval such as `"30"` or `"30s"`.
    ///
    /// Surrounding whitespace is ignored and a single trailing `s` is
    /// optional. Returns `None` when the value is not a number, is negative
    /// or not finite, or would produce a timeout that does not fit in a
    /// [`Duration`]. This function never panics.
    pub fn from_header(value: &str) -> Option<Self> {
        let trimmed = value.trim();
        // Fall back to the *trimmed* text when there is no unit suffix, so
        // that " 30 " parses just like " 30s ".
        let number = trimmed.strip_suffix('s').unwrap_or(trimmed);
        let interval_secs = number.parse::<f64>().ok()?;

        // `f64::from_str` accepts "NaN", "inf" and negative numbers, none of
        // which describe a usable interval.
        if !interval_secs.is_finite() || interval_secs < 0.0 {
            return None;
        }

        // The checked conversion rejects timeouts too large for `Duration`
        // instead of panicking.
        let timeout = Duration::try_from_secs_f64(Self::timeout_secs(interval_secs)).ok()?;
        Some(Self {
            interval_secs,
            timeout,
        })
    }
}
94
95pub struct Subscription {
96 receiver: async_channel::Receiver<Result<Update>>,
97 heartbeat_config: Option<HeartbeatConfig>,
98 last_activity: Instant,
99}
100
101impl Subscription {
102 pub fn new(receiver: async_channel::Receiver<Result<Update>>) -> Self {
103 Subscription {
104 receiver,
105 heartbeat_config: None,
106 last_activity: Instant::now(),
107 }
108 }
109
110 pub fn with_heartbeat(
111 receiver: async_channel::Receiver<Result<Update>>,
112 heartbeat_config: HeartbeatConfig,
113 ) -> Self {
114 Subscription {
115 receiver,
116 heartbeat_config: Some(heartbeat_config),
117 last_activity: Instant::now(),
118 }
119 }
120
121 pub async fn next(&mut self) -> Option<Result<Update>> {
122 if let Some(ref config) = self.heartbeat_config {
123 let timeout = config.timeout;
124 #[cfg(not(target_arch = "wasm32"))]
125 {
126 let deadline = self.last_activity + timeout;
127 tokio::select! {
128 result = self.receiver.recv() => {
129 self.last_activity = Instant::now();
130 result.ok()
131 }
132 _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
133 Some(Err(BraidError::Timeout))
134 }
135 }
136 }
137 #[cfg(target_arch = "wasm32")]
138 {
139 use futures::{future::FutureExt, pin_mut, select};
140 let recv_fut = self.receiver.recv().fuse();
141 let timer_fut =
142 gloo_timers::future::TimeoutFuture::new(timeout.as_millis() as u32).fuse();
143 pin_mut!(recv_fut, timer_fut);
144 select! {
145 result = recv_fut => {
146 self.last_activity = Instant::now();
147 result.ok()
148 }
149 _ = timer_fut => Some(Err(BraidError::Timeout))
150 }
151 }
152 } else {
153 let result = self.receiver.recv().await.ok();
154 self.last_activity = Instant::now();
155 result
156 }
157 }
158
159 pub fn is_heartbeat_timeout(&self) -> bool {
160 if let Some(ref config) = self.heartbeat_config {
161 self.last_activity.elapsed() > config.timeout
162 } else {
163 false
164 }
165 }
166}
167
168impl Stream for Subscription {
169 type Item = Result<Update>;
170 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
171 let this = unsafe { self.get_unchecked_mut() };
172 let mut fut = this.receiver.recv();
173 match unsafe { Pin::new_unchecked(&mut fut) }.poll(cx) {
174 Poll::Ready(result) => {
175 this.last_activity = Instant::now();
176 Poll::Ready(result.ok())
177 }
178 Poll::Pending => {
179 if this.is_heartbeat_timeout() {
180 Poll::Ready(Some(Err(BraidError::Timeout)))
181 } else {
182 Poll::Pending
183 }
184 }
185 }
186 }
187}
188
189pub struct SubscriptionStream {
190 receiver: ReceiverStream<Result<Update>>,
191}
192
193impl SubscriptionStream {
194 pub fn new(receiver: async_channel::Receiver<Result<Update>>) -> Self {
195 SubscriptionStream {
196 receiver: ReceiverStream::new(receiver),
197 }
198 }
199}
200
201impl Stream for SubscriptionStream {
202 type Item = Result<Update>;
203 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204 let this = unsafe { self.get_unchecked_mut() };
205 unsafe { Pin::new_unchecked(&mut this.receiver) }.poll_next(cx)
206 }
207}