use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
/// Registry of throttle and suspend event counters, with one `DbCounters` entry per
/// database id.
///
/// The map sits behind an `RwLock`, and every entry is wrapped in an `Arc` so that a
/// caller can clone the handle out of the map and update the atomic counters after the
/// map lock has been released.
pub struct VirtualQueueMetrics {
// Keyed by the `database_id` handed to the `record_*` methods; an entry is inserted the
// first time an event is recorded for that id.
inner: RwLock<HashMap<u64, Arc<DbCounters>>>,
}
/// Event tallies for a single database.
///
/// Both fields are atomics, so they can be incremented through a shared reference
/// without taking any lock.
struct DbCounters {
/// Incremented by one on every `record_throttle` call for this database.
throttle_events: AtomicU64,
/// Incremented by one on every `record_suspend` call for this database.
suspend_events: AtomicU64,
}
impl DbCounters {
    /// Builds a counter pair with both tallies starting at zero.
    fn new() -> Self {
        let throttle_events = AtomicU64::new(0);
        let suspend_events = AtomicU64::new(0);
        Self {
            throttle_events,
            suspend_events,
        }
    }
}
impl VirtualQueueMetrics {
    /// Creates a registry that is not tracking any database yet.
    pub fn new() -> Self {
        let inner = RwLock::new(HashMap::new());
        Self { inner }
    }

    /// Adds one throttle event to the tally for `database_id`.
    ///
    /// The entry for `database_id` is created if this is the first event seen for it.
    pub fn record_throttle(&self, database_id: u64) {
        let tallies = self.counters(database_id);
        tallies.throttle_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one suspend event to the tally for `database_id`.
    ///
    /// The entry for `database_id` is created if this is the first event seen for it.
    pub fn record_suspend(&self, database_id: u64) {
        let tallies = self.counters(database_id);
        tallies.suspend_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns how many throttle events have been recorded for `database_id`.
    ///
    /// Yields `0` for an id that has never been recorded; reading does not create an entry.
    pub fn throttle_events(&self, database_id: u64) -> u64 {
        self.read_tally(database_id, |tallies| {
            tallies.throttle_events.load(Ordering::Relaxed)
        })
    }

    /// Returns how many suspend events have been recorded for `database_id`.
    ///
    /// Yields `0` for an id that has never been recorded; reading does not create an entry.
    pub fn suspend_events(&self, database_id: u64) -> u64 {
        self.read_tally(database_id, |tallies| {
            tallies.suspend_events.load(Ordering::Relaxed)
        })
    }

    /// Looks up `database_id` under the shared lock and applies `pick` to its counters,
    /// falling back to `0` when the id has no entry.
    fn read_tally(&self, database_id: u64, pick: impl FnOnce(&DbCounters) -> u64) -> u64 {
        // A poisoned lock is recovered instead of propagated: the guard is taken out of
        // the error and used as usual.
        let map = match self.inner.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        match map.get(&database_id) {
            Some(tallies) => pick(tallies),
            None => 0,
        }
    }

    /// Returns a shared handle to the counters for `database_id`, inserting a zeroed
    /// entry when none exists.
    fn counters(&self, database_id: u64) -> Arc<DbCounters> {
        // First attempt: the entry usually exists already, so a shared lock is enough.
        // The read guard is confined to this block so it is released before the
        // exclusive lock is requested below.
        let existing = {
            let map = self
                .inner
                .read()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            map.get(&database_id).map(Arc::clone)
        };
        if let Some(found) = existing {
            return found;
        }

        // Second attempt: take the exclusive lock. The entry API looks the key up again,
        // so an entry inserted by another thread between the two locks is reused rather
        // than overwritten.
        let mut map = self
            .inner
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let slot = map
            .entry(database_id)
            .or_insert_with(|| Arc::new(DbCounters::new()));
        Arc::clone(slot)
    }
}
impl Default for VirtualQueueMetrics {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Sequential recording is reflected by the getters, and an unknown id reads as zero.
    #[test]
    fn record_then_read() {
        let metrics = VirtualQueueMetrics::new();
        for _ in 0..2 {
            metrics.record_throttle(7);
        }
        metrics.record_suspend(7);

        assert_eq!(metrics.throttle_events(7), 2);
        assert_eq!(metrics.suspend_events(7), 1);
        assert_eq!(metrics.throttle_events(99), 0);
    }

    /// Many threads incrementing the same database's counter must not lose updates.
    #[test]
    fn concurrent_writers_share_atomic_counter() {
        const WORKERS: u64 = 8;
        const PER_WORKER: u64 = 1000;

        let shared = Arc::new(VirtualQueueMetrics::new());
        // Collect first so every thread is spawned before any of them is joined.
        let workers: Vec<_> = (0..WORKERS)
            .map(|_| {
                let metrics = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..PER_WORKER {
                        metrics.record_throttle(42);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(shared.throttle_events(42), WORKERS * PER_WORKER);
    }

    /// Threads racing to create the entry for a new id must all land on the same counters.
    #[test]
    fn concurrent_first_observation_does_not_lose_counts() {
        let shared = Arc::new(VirtualQueueMetrics::new());
        // Collect first so every thread is spawned before any of them is joined.
        let workers: Vec<_> = (0..8)
            .map(|_| {
                let metrics = Arc::clone(&shared);
                thread::spawn(move || metrics.record_suspend(123))
            })
            .collect();
        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());

        assert_eq!(shared.suspend_events(123), 8);
    }
}