use super::{super::json, session::SessionSendError};
use async_tungstenite::tungstenite::Message as TungsteniteMessage;
use futures_channel::mpsc::UnboundedSender;
use std::{
collections::VecDeque,
convert::TryInto,
sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};
use twilight_model::gateway::payload::Heartbeat;
/// Point-in-time snapshot of heartbeat round-trip statistics.
#[derive(Clone, Debug)]
pub struct Latency {
    /// Mean round-trip time, or `None` while `heartbeats` is zero.
    average: Option<Duration>,
    /// Number of heartbeats that the average is computed over.
    heartbeats: u32,
    /// Most recent round-trip times, oldest first (at most five are kept).
    recent: VecDeque<Duration>,
    /// When the latest acknowledgement arrived; `None` while the last sent
    /// heartbeat is still unacknowledged.
    received: Option<Instant>,
    /// When the latest heartbeat was sent; `None` if none has been sent.
    sent: Option<Instant>,
}

impl Latency {
    /// Returns the mean round-trip time, if one is available.
    pub fn average(&self) -> Option<Duration> {
        self.average
    }

    /// Returns the number of heartbeats counted in this snapshot.
    pub fn heartbeats(&self) -> u32 {
        self.heartbeats
    }

    /// Returns the most recent round-trip times, oldest first.
    pub fn recent(&self) -> &VecDeque<Duration> {
        &self.recent
    }

    /// Returns when the latest acknowledgement was received, if any.
    pub fn received(&self) -> Option<Instant> {
        self.received
    }

    /// Returns when the latest heartbeat was sent, if any.
    pub fn sent(&self) -> Option<Instant> {
        self.sent
    }
}
/// Shared, interior-mutable bookkeeping for heartbeat timing.
#[derive(Debug)]
pub struct Heartbeats {
    /// When the latest acknowledgement arrived; reset to `None` whenever a
    /// new heartbeat is sent.
    received: Mutex<Option<Instant>>,
    /// Most recent round-trip times in milliseconds, oldest first.
    recent: Mutex<VecDeque<u64>>,
    /// When the latest heartbeat was sent.
    sent: Mutex<Option<Instant>>,
    /// Divisor used when computing the average round-trip time.
    total_iterations: AtomicU32,
    /// Sum of all recorded round-trip times, in milliseconds.
    total_time: AtomicU64,
}
impl Heartbeats {
    /// Builds a [`Latency`] snapshot from the current counters.
    ///
    /// `average` is the accumulated round-trip time divided by the number of
    /// recorded round trips, and is `None` while that number is zero.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal mutexes is poisoned.
    pub fn latency(&self) -> Latency {
        let iterations = self.total_iterations();
        let recent = self
            .recent
            .lock()
            .expect("recent poisoned")
            .iter()
            .copied()
            .map(Duration::from_millis)
            .collect();

        Latency {
            // `checked_div` yields `None` for a zero divisor.
            average: self.total_time().checked_div(iterations),
            heartbeats: iterations,
            recent,
            received: self.received(),
            sent: self.sent(),
        }
    }

    /// Returns whether an acknowledgement has been received since the last
    /// call to [`send`](Self::send).
    pub fn last_acked(&self) -> bool {
        self.received().is_some()
    }

    /// Records that a heartbeat acknowledgement has just arrived.
    ///
    /// The acknowledgement time is always stored. A round-trip sample is
    /// added to the totals and to the recent list only when a send time is
    /// known and the elapsed milliseconds fit in a `u64`. The iteration
    /// counter is incremented together with the sample, so the average is
    /// never diluted by acknowledgements that contributed no time.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal mutexes is poisoned.
    pub fn receive(&self) {
        self.set_received(Instant::now());

        // Without a recorded send time there is nothing to measure.
        let sent = match self.sent() {
            Some(sent) => sent,
            None => return,
        };
        let dur = sent.elapsed();

        let millis: u64 = match dur.as_millis().try_into() {
            Ok(millis) => millis,
            Err(_) => {
                tracing::error!("duration millis is more than u64: {:?}", dur);

                return;
            }
        };

        // Keep the sum and its divisor in step: one sample, one iteration.
        self.total_time.fetch_add(millis, Ordering::SeqCst);
        self.total_iterations.fetch_add(1, Ordering::SeqCst);

        let mut recent = self.recent.lock().expect("recent poisoned");

        // Keep only the five most recent samples.
        if recent.len() == 5 {
            recent.pop_front();
        }

        recent.push_back(millis);
    }

    /// Records that a heartbeat has just been sent.
    ///
    /// Clears the stored acknowledgement time so that
    /// [`last_acked`](Self::last_acked) reports `false` until the next
    /// [`receive`](Self::receive), then stores the current time as the send
    /// time.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal mutexes is poisoned.
    pub fn send(&self) {
        self.received.lock().expect("received poisoned").take();
        self.sent
            .lock()
            .expect("sent poisoned")
            .replace(Instant::now());
    }

    /// Returns a copy of the stored acknowledgement time.
    fn received(&self) -> Option<Instant> {
        *self.received.lock().expect("received poisoned")
    }

    /// Stores `received` as the latest acknowledgement time.
    fn set_received(&self, received: Instant) {
        self.received
            .lock()
            .expect("received poisoned")
            .replace(received);
    }

    /// Returns a copy of the stored send time.
    fn sent(&self) -> Option<Instant> {
        *self.sent.lock().expect("sent poisoned")
    }

    /// Returns the number of recorded round trips.
    fn total_iterations(&self) -> u32 {
        self.total_iterations.load(Ordering::Relaxed)
    }

    /// Returns the accumulated round-trip time.
    fn total_time(&self) -> Duration {
        Duration::from_millis(self.total_time.load(Ordering::Relaxed))
    }
}
impl Default for Heartbeats {
fn default() -> Self {
Self {
received: Mutex::new(None),
recent: Mutex::new(VecDeque::with_capacity(5)),
sent: Mutex::new(None),
total_iterations: AtomicU32::new(0),
total_time: AtomicU64::new(0),
}
}
}
/// Task state for periodically sending heartbeats over a message channel.
pub struct Heartbeater {
    /// Shared timing bookkeeping, updated each time a heartbeat is sent.
    heartbeats: Arc<Heartbeats>,
    /// Delay between heartbeats, in milliseconds.
    interval: u64,
    /// Shared sequence number; its current value is placed in each heartbeat.
    seq: Arc<AtomicU64>,
    /// Channel onto which serialized heartbeat messages are pushed.
    tx: UnboundedSender<TungsteniteMessage>,
}
impl Heartbeater {
    /// Creates a heartbeater from its parts.
    ///
    /// `interval` is the delay between heartbeats in milliseconds.
    pub fn new(
        heartbeats: Arc<Heartbeats>,
        interval: u64,
        seq: Arc<AtomicU64>,
        tx: UnboundedSender<TungsteniteMessage>,
    ) -> Self {
        Self {
            heartbeats,
            interval,
            seq,
            tx,
        }
    }

    /// Runs the heartbeat loop to completion, logging a warning if it ends
    /// with an error.
    pub async fn run(self) {
        match self.try_run().await {
            Ok(()) => {}
            Err(why) => {
                tracing::warn!("Error sending heartbeat: {:?}", why);
            }
        }
    }

    /// Sends a heartbeat every `interval` milliseconds.
    ///
    /// Returns `Ok(())` once two consecutive checks find that no
    /// acknowledgement has been recorded.
    ///
    /// # Errors
    ///
    /// Returns [`SessionSendError::Serializing`] if the heartbeat payload
    /// cannot be serialized, and [`SessionSendError::Sending`] if it cannot
    /// be pushed onto the channel.
    async fn try_run(self) -> Result<(), SessionSendError> {
        let period = Duration::from_millis(self.interval);
        // Whether the previous check saw an acknowledgement. Starts `true`
        // so the first unacknowledged check is tolerated.
        let mut previously_acked = true;

        loop {
            tokio::time::delay_for(period).await;

            let acked = self.heartbeats.last_acked();

            // Two misses in a row: stop heartbeating.
            if !(acked || previously_acked) {
                return Ok(());
            }

            previously_acked = acked;

            let seq = self.seq.load(Ordering::Acquire);
            let payload = json::to_vec(&Heartbeat::new(seq))
                .map_err(|source| SessionSendError::Serializing { source })?;

            tracing::debug!(seq, "sending heartbeat");

            let message = TungsteniteMessage::Binary(payload);
            self.tx
                .unbounded_send(message)
                .map_err(|source| SessionSendError::Sending { source })?;

            tracing::debug!(seq, "sent heartbeat");

            // Stamp the send time and clear the previous acknowledgement.
            self.heartbeats.send();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::Latency;
    use static_assertions::assert_impl_all;
    use std::fmt::Debug;

    // Compile-time check that `Latency` keeps these trait implementations.
    assert_impl_all!(Latency: Clone, Debug, Send, Sync);
}