use mtop_client::{ServerID, SlabItems, Slabs, Stats};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
#[repr(transparent)]
pub struct Host(String);
impl Host {
pub fn from(id: &ServerID) -> Self {
Self(id.to_string())
}
}
impl fmt::Display for Host {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone)]
pub struct StatsDelta {
pub previous: ServerStats,
pub current: ServerStats,
pub seconds: u64,
}
#[derive(Debug, Clone)]
pub struct ServerStats {
pub stats: Stats,
pub slabs: Slabs,
pub items: SlabItems,
}
#[derive(Debug)]
pub struct StatsQueue {
queues: Mutex<HashMap<Host, VecDeque<ServerStats>>>,
max_size: usize,
}
impl StatsQueue {
pub fn new(max_size: usize) -> Self {
Self {
max_size,
queues: Mutex::new(HashMap::new()),
}
}
pub async fn insert(&self, host: Host, stats: Stats, slabs: Slabs, items: SlabItems) {
let mut map = self.queues.lock().await;
let q = map.entry(host).or_default();
if let Some(prev) = q.back() {
if stats.uptime == prev.stats.uptime {
tracing::debug!(
message = "server uptime did not advance, dropping measurement",
old_uptime = prev.stats.uptime,
current_uptime = stats.uptime,
);
return;
} else if stats.uptime < prev.stats.uptime {
tracing::debug!(
message = "server uptime reset detected, clearing measurement queue",
old_uptime = prev.stats.uptime,
current_uptime = stats.uptime
);
q.clear();
} else if stats.server_time <= prev.stats.server_time {
tracing::debug!(
message = "server timestamp did not advance, dropping measurement",
old_timestamp = prev.stats.server_time,
current_timestamp = stats.server_time
);
return;
}
}
q.push_back(ServerStats { stats, slabs, items });
if q.len() > self.max_size {
q.pop_front();
}
}
pub async fn read_delta(&self, host: &Host) -> Option<StatsDelta> {
let map = self.queues.lock().await;
map.get(host).and_then(|q| match (q.front(), q.back()) {
(Some(previous), Some(current)) if q.len() >= 2 => {
let seconds = (current.stats.server_time - previous.stats.server_time) as u64;
Some(StatsDelta {
previous: previous.clone(),
current: current.clone(),
seconds,
})
}
_ => None,
})
}
pub async fn read_last(&self, host: &Host) -> Option<ServerStats> {
let map = self.queues.lock().await;
map.get(host).and_then(|q| q.back().cloned())
}
}
#[derive(Debug)]
pub struct BlockingStatsQueue {
queue: Arc<StatsQueue>,
handle: Handle,
}
impl BlockingStatsQueue {
pub fn new(queue: Arc<StatsQueue>, handle: Handle) -> Self {
Self { queue, handle }
}
pub fn insert(&self, host: Host, stats: Stats, slabs: Slabs, items: SlabItems) {
self.handle.block_on(self.queue.insert(host, stats, slabs, items))
}
pub fn read_delta(&self, host: &Host) -> Option<StatsDelta> {
self.handle.block_on(self.queue.read_delta(host))
}
pub fn read_last(&self, host: &Host) -> Option<ServerStats> {
self.handle.block_on(self.queue.read_last(host))
}
}
#[cfg(test)]
mod test {
use super::{Host, StatsQueue};
use mtop_client::{ServerID, SlabItems, Slabs, Stats};
fn fixture(uptime: u64, server_time: i64) -> (Host, Stats, Slabs, SlabItems) {
let host = Host::from(&ServerID::from(("localhost", 11211)));
let stats = Stats {
uptime,
server_time,
..Default::default()
};
let slabs = Slabs::default();
let items = SlabItems::default();
(host, stats, slabs, items)
}
#[tokio::test]
async fn test_stats_queue_insert_same_uptime() {
let queue = StatsQueue::new(10);
let (host, stats, slabs, items) = fixture(1, 1);
queue.insert(host, stats, slabs, items).await;
let (host, stats, slabs, items) = fixture(1, 2);
queue.insert(host.clone(), stats, slabs, items).await;
let last = queue.read_last(&host).await.unwrap();
assert_eq!(1, last.stats.uptime);
assert_eq!(1, last.stats.server_time);
}
#[tokio::test]
async fn test_stats_queue_insert_lower_uptime() {
let queue = StatsQueue::new(10);
let (host, stats, slabs, items) = fixture(10, 1);
queue.insert(host, stats, slabs, items).await;
let (host, stats, slabs, items) = fixture(1, 2);
queue.insert(host.clone(), stats, slabs, items).await;
let last = queue.read_last(&host).await.unwrap();
assert_eq!(1, last.stats.uptime);
assert_eq!(2, last.stats.server_time);
}
#[tokio::test]
async fn test_stats_queue_insert_same_server_time() {
let queue = StatsQueue::new(10);
let (host, stats, slabs, items) = fixture(1, 1);
queue.insert(host, stats, slabs, items).await;
let (host, stats, slabs, items) = fixture(2, 1);
queue.insert(host.clone(), stats, slabs, items).await;
let last = queue.read_last(&host).await.unwrap();
assert_eq!(1, last.stats.uptime);
assert_eq!(1, last.stats.server_time);
}
#[tokio::test]
async fn test_stats_queue_insert_lower_server_time() {
let queue = StatsQueue::new(10);
let (host, stats, slabs, items) = fixture(1, 10);
queue.insert(host, stats, slabs, items).await;
let (host, stats, slabs, items) = fixture(2, 1);
queue.insert(host.clone(), stats, slabs, items).await;
let last = queue.read_last(&host).await.unwrap();
assert_eq!(1, last.stats.uptime);
assert_eq!(10, last.stats.server_time);
}
#[tokio::test]
async fn test_stats_queue_insert_success() {
let queue = StatsQueue::new(10);
let (host, stats, slabs, items) = fixture(1, 1);
queue.insert(host, stats, slabs, items).await;
let (host, stats, slabs, items) = fixture(2, 2);
queue.insert(host.clone(), stats, slabs, items).await;
let last = queue.read_last(&host).await.unwrap();
assert_eq!(2, last.stats.uptime);
assert_eq!(2, last.stats.server_time);
}
}