use crate::{ConnectionStatus, ErrorKind, internal_rpc::InternalRPCHandle, killswitch::KillSwitch};
use async_rs::{Runtime, traits::*};
use std::{
fmt,
sync::{Arc, Mutex, MutexGuard},
time::{Duration, Instant},
};
use tracing::error;
#[derive(Clone)]
pub struct Heartbeat<RK: RuntimeKit + Clone + Send + 'static> {
connection_status: ConnectionStatus,
killswitch: KillSwitch,
runtime: Runtime<RK>,
inner: Arc<Mutex<Inner>>,
}
impl<RK: RuntimeKit + Clone + Send + 'static> Heartbeat<RK> {
pub(crate) fn new(connection_status: ConnectionStatus, runtime: Runtime<RK>) -> Self {
let killswitch = Default::default();
let inner = Default::default();
Self {
connection_status,
killswitch,
runtime,
inner,
}
}
pub(crate) fn set_timeout(&self, timeout: Duration) {
self.lock_inner().timeout = Some(timeout);
}
pub(crate) fn killswitch(&self) -> KillSwitch {
self.killswitch.clone()
}
pub(crate) fn start(&self, internal_rpc: InternalRPCHandle) {
let heartbeat = self.clone();
let poison = self.lock_inner().poison.clone();
self.runtime.spawn(async move {
while let Some(dur) = heartbeat.poll_timeout(&internal_rpc, &poison) {
heartbeat.runtime.sleep(dur).await;
}
});
}
fn poll_timeout(
&self,
internal_rpc: &InternalRPCHandle,
poison: &KillSwitch,
) -> Option<Duration> {
if poison.killed() {
return None;
}
if !self.connection_status.connected() {
self.cancel();
return None;
}
self.lock_inner()
.poll_timeout(internal_rpc, &self.killswitch)
}
pub(crate) fn update_last_write(&self) {
self.lock_inner().update_last_write();
}
pub(crate) fn update_last_read(&self) {
self.lock_inner().update_last_read();
}
pub(crate) fn cancel(&self) {
self.lock_inner().cancel();
}
pub(crate) fn reset(&self) {
self.killswitch.reset();
self.lock_inner().reset();
}
fn lock_inner(&self) -> MutexGuard<'_, Inner> {
self.inner.lock().unwrap_or_else(|e| e.into_inner())
}
}
impl<RK: RuntimeKit + Clone + Send + 'static> fmt::Debug for Heartbeat<RK> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Heartbeat").finish()
}
}
struct Inner {
last_read: Instant,
last_write: Instant,
timeout: Option<Duration>,
poison: KillSwitch,
}
impl Default for Inner {
fn default() -> Self {
Self {
last_read: Instant::now(),
last_write: Instant::now(),
timeout: None,
poison: KillSwitch::default(),
}
}
}
impl Inner {
fn poll_timeout(
&mut self,
internal_rpc: &InternalRPCHandle,
killswitch: &KillSwitch,
) -> Option<Duration> {
let timeout = self.timeout?;
if Instant::now().duration_since(self.last_read) > 4 * timeout {
error!(
"We haven't received anything from the server for too long, closing connection."
);
self.timeout = None;
killswitch.kill();
internal_rpc.set_connection_error(ErrorKind::MissingHeartbeatError.into());
return None;
}
timeout
.checked_sub(self.last_write.elapsed())
.map(|timeout| timeout.max(Duration::from_millis(1)))
.or_else(|| {
self.update_last_write();
internal_rpc.send_heartbeat();
Some(timeout)
})
}
fn update_last_write(&mut self) {
self.last_write = Instant::now();
}
fn update_last_read(&mut self) {
self.last_read = Instant::now();
}
fn cancel(&mut self) {
self.timeout = None;
self.poison.kill();
}
fn reset(&mut self) {
self.cancel();
self.update_last_read();
self.update_last_write();
self.poison = KillSwitch::default();
}
}