use futures_util::SinkExt;
use log::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep_until;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep_until;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
use super::*;
use crate::types::{self, Opcode};
/// Time, in milliseconds, the heartbeat task waits for a heartbeat to be
/// acknowledged before it sends the heartbeat again.
pub const HEARTBEAT_ACK_TIMEOUT: u64 = 2_000;
/// Owner-side handle for the background heartbeat task.
///
/// Creating one via `new` spawns the task; this value keeps the interval the
/// task was configured with and the sending half of the channel the task
/// listens on.
// NOTE(review): `dead_code` is presumably allowed because not every field is
// read by the surrounding module — confirm before removing.
#[allow(dead_code)]
#[derive(Debug)]
pub(super) struct HeartbeatHandler {
    /// Delay between two heartbeats while the previous one has been acknowledged.
    pub heartbeat_interval: Duration,
    /// Sending half of the channel used to pass sequence numbers and
    /// heartbeat-related opcodes to the heartbeat task.
    pub send: Sender<HeartbeatThreadCommunication>,
}
impl HeartbeatHandler {
pub fn new(
heartbeat_interval: Duration,
websocket_tx: Arc<Mutex<Sink>>,
kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> Self {
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
#[cfg(not(target_arch = "wasm32"))]
task::spawn(async move {
Self::heartbeat_task(websocket_tx, heartbeat_interval, receive, kill_receive).await;
});
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
Self::heartbeat_task(websocket_tx, heartbeat_interval, receive, kill_receive).await;
});
Self {
heartbeat_interval,
send,
}
}
pub async fn heartbeat_task(
websocket_tx: Arc<Mutex<Sink>>,
heartbeat_interval: Duration,
mut receive: Receiver<HeartbeatThreadCommunication>,
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
) {
let mut last_heartbeat_timestamp: Instant = Instant::now();
let mut last_heartbeat_acknowledged = true;
let mut last_seq_number: Option<u64> = None;
loop {
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
};
let mut should_send = false;
tokio::select! {
() = sleep_until(last_heartbeat_timestamp + timeout) => {
should_send = true;
}
Some(communication) = receive.recv() => {
if communication.sequence_number.is_some() {
last_seq_number = communication.sequence_number;
}
if let Some(op_code) = communication.op_code {
match op_code {
Opcode::Heartbeat => {
should_send = true;
}
Opcode::HeartbeatAck => {
last_heartbeat_acknowledged = true;
}
_ => {}
}
}
}
Ok(_) = kill_receive.recv() => {
log::trace!("GW: Closing heartbeat task");
break;
}
}
if should_send {
trace!("GW: Sending Heartbeat..");
let heartbeat = types::GatewayHeartbeat {
op: (Opcode::Heartbeat as u8),
d: last_seq_number,
};
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
let msg = GatewayMessage(heartbeat_json);
let send_result = websocket_tx.lock().await.send(msg.into()).await;
if send_result.is_err() {
warn!("GW: Couldn't send heartbeat, websocket seems broken");
break;
}
last_heartbeat_timestamp = Instant::now();
last_heartbeat_acknowledged = false;
}
}
}
}
/// Message passed over the channel to the heartbeat task.
///
/// Both fields are optional, so a message may carry only a sequence number,
/// only an opcode, or both.
#[derive(Debug, Clone, Copy)]
pub(super) struct HeartbeatThreadCommunication {
    /// Opcode to report to the heartbeat task. The task reacts to
    /// `Opcode::Heartbeat` (sends a heartbeat immediately) and
    /// `Opcode::HeartbeatAck` (marks the last heartbeat as acknowledged);
    /// other opcodes are ignored.
    pub(super) op_code: Option<Opcode>,
    /// New sequence number to include in subsequent heartbeats; `None` leaves
    /// the previously stored one untouched.
    pub(super) sequence_number: Option<u64>,
}