medea_jason/rpc/
heartbeat.rs

1//! Connection loss detection via ping/pong mechanism.
2
3use std::{cell::RefCell, rc::Rc, time::Duration};
4
5use derive_more::with_trait::{Debug, Mul};
6use futures::{StreamExt as _, channel::mpsc, future, stream::LocalBoxStream};
7use medea_client_api_proto::{ClientMsg, ServerMsg};
8
9use crate::{platform, utils::TaskHandle};
10
/// Idle timeout of [`WebSocketRpcClient`].
///
/// The idle watchdog of a [`Heartbeat`] notifies its [`Heartbeat::on_idle`]
/// subscribers once no [`ServerMsg`] has been received for this long.
///
/// [`WebSocketRpcClient`]: super::WebSocketRpcClient
#[derive(Clone, Copy, Debug)]
pub struct IdleTimeout(pub Duration);
16
/// Ping interval of [`WebSocketRpcClient`].
///
/// The idle watchdog of a [`Heartbeat`] waits for twice this interval before
/// repeating a [`ClientMsg::Pong`], which is why this type derives [`Mul`].
///
/// [`WebSocketRpcClient`]: super::WebSocketRpcClient
#[derive(Clone, Copy, Debug, Mul)]
pub struct PingInterval(pub Duration);
22
/// Inner data of [`Heartbeat`].
///
/// Shared (via `Rc<RefCell<_>>`) between the [`Heartbeat`] itself and the
/// ping handling and idle watchdog tasks it spawns.
#[derive(Debug)]
struct Inner {
    /// [`platform::RpcTransport`] which heartbeats.
    ///
    /// Formatted as a pointer in the [`Debug`] output.
    #[debug("{transport:p}")]
    transport: Rc<dyn platform::RpcTransport>,

    /// Idle timeout of the [`platform::RpcTransport`].
    ///
    /// Read by the idle watchdog each time it runs, so updates apply to
    /// watchdogs that have not yet reached their second delay.
    idle_timeout: IdleTimeout,

    /// Ping interval of the [`platform::RpcTransport`].
    ///
    /// The idle watchdog waits for twice this interval before repeating a
    /// [`ClientMsg::Pong`].
    ping_interval: PingInterval,

    /// [`TaskHandle`] for [`Future`] which sends [`ClientMsg::Pong`] on
    /// [`ServerMsg::Ping`].
    ///
    /// [`None`] only while the [`Heartbeat`] is being constructed or dropped.
    handle_ping_task: Option<TaskHandle>,

    /// [`TaskHandle`] for idle watchdog.
    ///
    /// Replaced with a handle to a freshly spawned watchdog on every received
    /// [`ServerMsg`].
    idle_watchdog_task: Option<TaskHandle>,

    /// Number of last received [`ServerMsg::Ping`].
    ///
    /// Starts at `0` until the first [`ServerMsg::Ping`] arrives.
    last_ping_num: u32,

    /// [`mpsc::UnboundedSender`]s for a [`Heartbeat::on_idle`].
    ///
    /// Senders whose receiving side is gone are removed when the idle
    /// notification is sent.
    on_idle_subs: Vec<mpsc::UnboundedSender<()>>,
}
49
50impl Inner {
51    /// Sends [`ClientMsg::Pong`] to a server.
52    ///
53    /// If some error happen then it will be printed with [`log::error`].
54    fn send_pong(&self, n: u32) {
55        _ = self
56            .transport
57            .send(&ClientMsg::Pong(n))
58            .map_err(tracerr::wrap!(=> platform::TransportError))
59            .map_err(|e| log::error!("Failed to send pong: {e}"));
60    }
61}
62
/// Detector of connection loss via ping/pong mechanism.
///
/// Answers every [`ServerMsg::Ping`] with a [`ClientMsg::Pong`] and notifies
/// [`Heartbeat::on_idle`] subscribers when the server stays silent for the
/// configured [`IdleTimeout`].
///
/// Dropping a [`Heartbeat`] releases the [`TaskHandle`]s of both of its
/// background tasks.
#[derive(Debug)]
pub struct Heartbeat(Rc<RefCell<Inner>>);
66
67impl Heartbeat {
68    /// Starts this [`Heartbeat`] for the provided [`platform::RpcTransport`]
69    /// with the provided `idle_timeout` and `ping_interval`.
70    #[must_use]
71    pub fn start(
72        transport: Rc<dyn platform::RpcTransport>,
73        ping_interval: PingInterval,
74        idle_timeout: IdleTimeout,
75    ) -> Self {
76        let inner = Rc::new(RefCell::new(Inner {
77            idle_timeout,
78            ping_interval,
79            transport,
80            handle_ping_task: None,
81            idle_watchdog_task: None,
82            on_idle_subs: Vec::new(),
83            last_ping_num: 0,
84        }));
85
86        let handle_ping_task = spawn_ping_handle_task(Rc::clone(&inner));
87        let idle_watchdog_task = spawn_idle_watchdog_task(Rc::clone(&inner));
88
89        inner.borrow_mut().idle_watchdog_task = Some(idle_watchdog_task);
90        inner.borrow_mut().handle_ping_task = Some(handle_ping_task);
91
92        Self(inner)
93    }
94
95    /// Updates this [`Heartbeat`] settings.
96    pub fn update_settings(
97        &self,
98        idle_timeout: IdleTimeout,
99        ping_interval: PingInterval,
100    ) {
101        self.0.borrow_mut().idle_timeout = idle_timeout;
102        self.0.borrow_mut().ping_interval = ping_interval;
103    }
104
105    /// Returns [`LocalBoxStream`] to which will sent `()` when [`Heartbeat`]
106    /// considers that [`platform::RpcTransport`] is idle.
107    #[must_use]
108    pub fn on_idle(&self) -> LocalBoxStream<'static, ()> {
109        let (on_idle_tx, on_idle_rx) = mpsc::unbounded();
110        self.0.borrow_mut().on_idle_subs.push(on_idle_tx);
111
112        Box::pin(on_idle_rx)
113    }
114}
115
116/// Spawns idle watchdog task returning its handle.
117///
118/// This task is responsible for throwing [`Heartbeat::on_idle`] when
119/// [`ServerMsg`] hasn't been received within `idle_timeout`.
120///
121/// Also this watchdog will repeat [`ClientMsg::Pong`] if
122/// [`ServerMsg::Ping`] wasn't received within `ping_interval * 2`.
123fn spawn_idle_watchdog_task(this: Rc<RefCell<Inner>>) -> TaskHandle {
124    let (idle_watchdog_fut, idle_watchdog_handle) =
125        future::abortable(async move {
126            let wait_for_ping = this.borrow().ping_interval * 2;
127            platform::delay_for(wait_for_ping.0).await;
128
129            let last_ping_num = this.borrow().last_ping_num;
130            this.borrow().send_pong(last_ping_num + 1);
131
132            let idle_timeout = this.borrow().idle_timeout;
133            platform::delay_for(idle_timeout.0 - wait_for_ping.0).await;
134            this.borrow_mut()
135                .on_idle_subs
136                .retain(|sub| sub.unbounded_send(()).is_ok());
137        });
138
139    platform::spawn(async move {
140        _ = idle_watchdog_fut.await.ok();
141    });
142
143    idle_watchdog_handle.into()
144}
145
146/// Spawns ping handle task returning its handle.
147///
148/// This task is responsible for answering [`ServerMsg::Ping`] with
149/// [`ClientMsg::Pong`] and renewing idle watchdog task.
150fn spawn_ping_handle_task(this: Rc<RefCell<Inner>>) -> TaskHandle {
151    let mut on_message_stream = this.borrow().transport.on_message();
152
153    let (handle_ping_fut, handle_ping_task) = future::abortable(async move {
154        while let Some(msg) = on_message_stream.next().await {
155            let idle_task = spawn_idle_watchdog_task(Rc::clone(&this));
156            this.borrow_mut().idle_watchdog_task = Some(idle_task);
157
158            if let ServerMsg::Ping(num) = msg {
159                this.borrow_mut().last_ping_num = num;
160                this.borrow().send_pong(num);
161            }
162        }
163    });
164    platform::spawn(async move {
165        _ = handle_ping_fut.await.ok();
166    });
167    handle_ping_task.into()
168}
169
170impl Drop for Heartbeat {
171    fn drop(&mut self) {
172        let mut inner = self.0.borrow_mut();
173        drop(inner.handle_ping_task.take());
174        drop(inner.idle_watchdog_task.take());
175    }
176}