medea_jason/rpc/
heartbeat.rs1use std::{cell::RefCell, rc::Rc, time::Duration};
4
5use derive_more::with_trait::{Debug, Mul};
6use futures::{StreamExt as _, channel::mpsc, future, stream::LocalBoxStream};
7use medea_client_api_proto::{ClientMsg, ServerMsg};
8
9use crate::{platform, utils::TaskHandle};
10
/// Total period of silence after which the connection is reported as idle.
///
/// The idle watchdog notifies the `on_idle` subscribers once this much time
/// has elapsed without the watchdog being restarted.
#[derive(Debug, Clone, Copy)]
pub struct IdleTimeout(pub Duration);
16
17#[derive(Clone, Copy, Debug, Mul)]
21pub struct PingInterval(pub Duration);
22
/// Shared mutable state of a [`Heartbeat`], accessed by the [`Heartbeat`]
/// handle and by the spawned ping-handling and idle watchdog tasks.
#[derive(Debug)]
struct Inner {
    /// Transport that server messages are read from and `Pong`s are sent to.
    ///
    /// Rendered as a pointer address in the `Debug` output.
    #[debug("{transport:p}")]
    transport: Rc<dyn platform::RpcTransport>,

    /// Period after which the `on_idle` subscribers are notified.
    idle_timeout: IdleTimeout,

    /// Interval which, doubled, is how long the idle watchdog waits before
    /// sending an unsolicited `Pong`.
    ping_interval: PingInterval,

    /// Handle of the task that processes incoming transport messages.
    ///
    /// [`None`] only before the task is stored and after the [`Heartbeat`] is
    /// dropped.
    handle_ping_task: Option<TaskHandle>,

    /// Handle of the most recently spawned idle watchdog task.
    ///
    /// Replaced with a fresh handle on every message received from the
    /// transport.
    idle_watchdog_task: Option<TaskHandle>,

    /// Number carried by the last received [`ServerMsg::Ping`] (`0` until the
    /// first one arrives).
    last_ping_num: u32,

    /// Senders of the streams handed out by [`Heartbeat::on_idle()`].
    ///
    /// Senders whose receiving side is gone are removed when an idle
    /// notification is dispatched.
    on_idle_subs: Vec<mpsc::UnboundedSender<()>>,
}
49
50impl Inner {
51 fn send_pong(&self, n: u32) {
55 _ = self
56 .transport
57 .send(&ClientMsg::Pong(n))
58 .map_err(tracerr::wrap!(=> platform::TransportError))
59 .map_err(|e| log::error!("Failed to send pong: {e}"));
60 }
61}
62
/// Handle that owns the heartbeat state of an RPC transport.
///
/// Holds the [`Inner`] state behind an [`Rc`]`<`[`RefCell`]`>`, which is also
/// shared with the background tasks spawned by [`Heartbeat::start()`].
#[derive(Debug)]
pub struct Heartbeat(Rc<RefCell<Inner>>);
66
67impl Heartbeat {
68 #[must_use]
71 pub fn start(
72 transport: Rc<dyn platform::RpcTransport>,
73 ping_interval: PingInterval,
74 idle_timeout: IdleTimeout,
75 ) -> Self {
76 let inner = Rc::new(RefCell::new(Inner {
77 idle_timeout,
78 ping_interval,
79 transport,
80 handle_ping_task: None,
81 idle_watchdog_task: None,
82 on_idle_subs: Vec::new(),
83 last_ping_num: 0,
84 }));
85
86 let handle_ping_task = spawn_ping_handle_task(Rc::clone(&inner));
87 let idle_watchdog_task = spawn_idle_watchdog_task(Rc::clone(&inner));
88
89 inner.borrow_mut().idle_watchdog_task = Some(idle_watchdog_task);
90 inner.borrow_mut().handle_ping_task = Some(handle_ping_task);
91
92 Self(inner)
93 }
94
95 pub fn update_settings(
97 &self,
98 idle_timeout: IdleTimeout,
99 ping_interval: PingInterval,
100 ) {
101 self.0.borrow_mut().idle_timeout = idle_timeout;
102 self.0.borrow_mut().ping_interval = ping_interval;
103 }
104
105 #[must_use]
108 pub fn on_idle(&self) -> LocalBoxStream<'static, ()> {
109 let (on_idle_tx, on_idle_rx) = mpsc::unbounded();
110 self.0.borrow_mut().on_idle_subs.push(on_idle_tx);
111
112 Box::pin(on_idle_rx)
113 }
114}
115
116fn spawn_idle_watchdog_task(this: Rc<RefCell<Inner>>) -> TaskHandle {
124 let (idle_watchdog_fut, idle_watchdog_handle) =
125 future::abortable(async move {
126 let wait_for_ping = this.borrow().ping_interval * 2;
127 platform::delay_for(wait_for_ping.0).await;
128
129 let last_ping_num = this.borrow().last_ping_num;
130 this.borrow().send_pong(last_ping_num + 1);
131
132 let idle_timeout = this.borrow().idle_timeout;
133 platform::delay_for(idle_timeout.0 - wait_for_ping.0).await;
134 this.borrow_mut()
135 .on_idle_subs
136 .retain(|sub| sub.unbounded_send(()).is_ok());
137 });
138
139 platform::spawn(async move {
140 _ = idle_watchdog_fut.await.ok();
141 });
142
143 idle_watchdog_handle.into()
144}
145
146fn spawn_ping_handle_task(this: Rc<RefCell<Inner>>) -> TaskHandle {
151 let mut on_message_stream = this.borrow().transport.on_message();
152
153 let (handle_ping_fut, handle_ping_task) = future::abortable(async move {
154 while let Some(msg) = on_message_stream.next().await {
155 let idle_task = spawn_idle_watchdog_task(Rc::clone(&this));
156 this.borrow_mut().idle_watchdog_task = Some(idle_task);
157
158 if let ServerMsg::Ping(num) = msg {
159 this.borrow_mut().last_ping_num = num;
160 this.borrow().send_pong(num);
161 }
162 }
163 });
164 platform::spawn(async move {
165 _ = handle_ping_fut.await.ok();
166 });
167 handle_ping_task.into()
168}
169
170impl Drop for Heartbeat {
171 fn drop(&mut self) {
172 let mut inner = self.0.borrow_mut();
173 drop(inner.handle_ping_task.take());
174 drop(inner.idle_watchdog_task.take());
175 }
176}