use futures_util::SinkExt;
use log::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep_until;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep_until;
use std::{sync::Arc, time::Duration};
use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex,
};
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
use crate::{
gateway::{heartbeat::HEARTBEAT_ACK_TIMEOUT, Sink},
types::{VoiceGatewaySendPayload, VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK},
voice::gateway::VoiceGatewayMessage,
};
#[allow(dead_code)] #[derive(Debug)]
pub(super) struct VoiceHeartbeatHandler {
pub heartbeat_interval: Duration,
pub send: Sender<VoiceHeartbeatThreadCommunication>,
}
impl VoiceHeartbeatHandler {
pub fn new(
heartbeat_interval: Duration,
starting_nonce: u64,
websocket_tx: Arc<Mutex<Sink>>,
kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> Self {
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
#[cfg(not(target_arch = "wasm32"))]
task::spawn(async move {
Self::heartbeat_task(
websocket_tx,
heartbeat_interval,
starting_nonce,
receive,
kill_receive,
)
.await;
});
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
Self::heartbeat_task(
websocket_tx,
heartbeat_interval,
starting_nonce,
receive,
kill_receive,
)
.await;
});
Self {
heartbeat_interval,
send,
}
}
pub async fn heartbeat_task(
websocket_tx: Arc<Mutex<Sink>>,
heartbeat_interval: Duration,
starting_nonce: u64,
mut receive: Receiver<VoiceHeartbeatThreadCommunication>,
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
) {
let mut last_heartbeat_timestamp: Instant = Instant::now();
let mut last_heartbeat_acknowledged = true;
let mut nonce: u64 = starting_nonce;
loop {
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
};
let mut should_send = false;
tokio::select! {
() = sleep_until(last_heartbeat_timestamp + timeout) => {
should_send = true;
}
Some(communication) = receive.recv() => {
if communication.updated_nonce.is_some() {
nonce = communication.updated_nonce.unwrap();
}
if let Some(op_code) = communication.op_code {
match op_code {
VOICE_HEARTBEAT => {
should_send = true;
}
VOICE_HEARTBEAT_ACK => {
last_heartbeat_acknowledged = true;
}
_ => {}
}
}
}
Ok(_) = kill_receive.recv() => {
log::trace!("VGW: Closing heartbeat task");
break;
}
}
if should_send {
trace!("VGW: Sending Heartbeat..");
let heartbeat = VoiceGatewaySendPayload {
op_code: VOICE_HEARTBEAT,
data: nonce.into(),
};
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
let msg = VoiceGatewayMessage(heartbeat_json);
let send_result = websocket_tx.lock().await.send(msg.into()).await;
if send_result.is_err() {
warn!("VGW: Couldnt send heartbeat, websocket seems broken");
break;
}
last_heartbeat_timestamp = Instant::now();
last_heartbeat_acknowledged = false;
}
}
}
}
#[derive(Clone, Copy, Debug)]
pub(super) struct VoiceHeartbeatThreadCommunication {
pub(super) op_code: Option<u8>,
pub(super) updated_nonce: Option<u64>,
}