use std::{
sync::{Arc, RwLock},
time::{Duration, Instant},
};
use super::{
client::{Sv2ClientInfo, Sv2ClientsMonitoring, Sv2ClientsSummary},
server::{ServerInfo, ServerMonitoring, ServerSummary},
sv1::{Sv1ClientInfo, Sv1ClientsMonitoring, Sv1ClientsSummary},
};
#[derive(Debug, Clone, Default)]
pub struct MonitoringSnapshot {
pub timestamp: Option<Instant>,
pub server_info: Option<ServerInfo>,
pub server_summary: Option<ServerSummary>,
pub sv2_clients: Option<Vec<Sv2ClientInfo>>,
pub sv2_clients_summary: Option<Sv2ClientsSummary>,
pub sv1_clients: Option<Vec<Sv1ClientInfo>>,
pub sv1_clients_summary: Option<Sv1ClientsSummary>,
}
pub struct SnapshotCache {
snapshot: RwLock<MonitoringSnapshot>,
refresh_interval: Duration,
server_source: Option<Arc<dyn ServerMonitoring + Send + Sync>>,
sv2_clients_source: Option<Arc<dyn Sv2ClientsMonitoring + Send + Sync>>,
sv1_clients_source: Option<Arc<dyn Sv1ClientsMonitoring + Send + Sync>>,
}
impl Clone for SnapshotCache {
fn clone(&self) -> Self {
let current_snapshot = self.snapshot.read().unwrap().clone();
Self {
snapshot: RwLock::new(current_snapshot),
refresh_interval: self.refresh_interval,
server_source: self.server_source.clone(),
sv2_clients_source: self.sv2_clients_source.clone(),
sv1_clients_source: self.sv1_clients_source.clone(),
}
}
}
impl SnapshotCache {
pub fn new(
refresh_interval: Duration,
server_source: Option<Arc<dyn ServerMonitoring + Send + Sync>>,
sv2_clients_source: Option<Arc<dyn Sv2ClientsMonitoring + Send + Sync>>,
) -> Self {
Self {
snapshot: RwLock::new(MonitoringSnapshot::default()),
refresh_interval,
server_source,
sv2_clients_source,
sv1_clients_source: None,
}
}
pub fn with_sv1_clients_source(
mut self,
sv1_source: Arc<dyn Sv1ClientsMonitoring + Send + Sync>,
) -> Self {
self.sv1_clients_source = Some(sv1_source);
self
}
pub fn get_snapshot(&self) -> MonitoringSnapshot {
self.snapshot.read().unwrap().clone()
}
pub fn refresh(&self) {
let mut new_snapshot = MonitoringSnapshot {
timestamp: Some(Instant::now()),
..Default::default()
};
if let Some(ref source) = self.server_source {
new_snapshot.server_info = Some(source.get_server());
new_snapshot.server_summary = Some(source.get_server_summary());
}
if let Some(ref source) = self.sv2_clients_source {
new_snapshot.sv2_clients = Some(source.get_sv2_clients());
new_snapshot.sv2_clients_summary = Some(source.get_sv2_clients_summary());
}
if let Some(ref source) = self.sv1_clients_source {
new_snapshot.sv1_clients = Some(source.get_sv1_clients());
new_snapshot.sv1_clients_summary = Some(source.get_sv1_clients_summary());
}
*self.snapshot.write().unwrap() = new_snapshot;
}
pub fn refresh_interval(&self) -> Duration {
self.refresh_interval
}
}
#[cfg(test)]
mod tests {
use super::*;
struct MockServerMonitoring;
impl ServerMonitoring for MockServerMonitoring {
fn get_server(&self) -> ServerInfo {
ServerInfo {
extended_channels: vec![],
standard_channels: vec![],
}
}
}
struct MockSv2ClientsMonitoring;
impl Sv2ClientsMonitoring for MockSv2ClientsMonitoring {
fn get_sv2_clients(&self) -> Vec<Sv2ClientInfo> {
vec![]
}
}
#[test]
fn test_snapshot_cache_creation() {
let cache = SnapshotCache::new(
Duration::from_secs(5),
Some(Arc::new(MockServerMonitoring)),
Some(Arc::new(MockSv2ClientsMonitoring)),
);
let snapshot = cache.get_snapshot();
assert!(snapshot.timestamp.is_none());
assert_eq!(cache.refresh_interval(), Duration::from_secs(5));
}
#[test]
fn test_snapshot_refresh() {
let cache = SnapshotCache::new(
Duration::from_secs(5),
Some(Arc::new(MockServerMonitoring)),
Some(Arc::new(MockSv2ClientsMonitoring)),
);
let snapshot = cache.get_snapshot();
assert!(snapshot.timestamp.is_none());
assert!(snapshot.server_info.is_none());
cache.refresh();
let snapshot = cache.get_snapshot();
assert!(snapshot.timestamp.is_some());
assert!(snapshot.server_info.is_some());
assert!(snapshot.sv2_clients.is_some());
assert!(snapshot.sv2_clients_summary.is_some());
}
struct ContendedMonitoring {
lock_hold_duration: Duration,
monitoring_lock_acquisitions: std::sync::atomic::AtomicU64,
business_lock: std::sync::Mutex<()>,
}
impl ContendedMonitoring {
fn new(lock_hold_duration: Duration) -> Self {
Self {
lock_hold_duration,
monitoring_lock_acquisitions: std::sync::atomic::AtomicU64::new(0),
business_lock: std::sync::Mutex::new(()),
}
}
fn simulate_business_logic(&self) {
let _guard = self.business_lock.lock().unwrap();
std::thread::sleep(self.lock_hold_duration);
}
fn get_monitoring_acquisitions(&self) -> u64 {
self.monitoring_lock_acquisitions
.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl Sv2ClientsMonitoring for ContendedMonitoring {
fn get_sv2_clients(&self) -> Vec<Sv2ClientInfo> {
let _guard = self.business_lock.lock().unwrap();
self.monitoring_lock_acquisitions
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_micros(10));
vec![]
}
}
impl ServerMonitoring for ContendedMonitoring {
fn get_server(&self) -> ServerInfo {
let _guard = self.business_lock.lock().unwrap();
self.monitoring_lock_acquisitions
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_micros(10));
ServerInfo {
extended_channels: vec![],
standard_channels: vec![],
}
}
}
#[test]
fn test_snapshot_cache_eliminates_lock_contention() {
let real_monitoring = Arc::new(ContendedMonitoring::new(Duration::from_millis(1)));
let cache = Arc::new(SnapshotCache::new(
Duration::from_secs(5),
None,
Some(real_monitoring.clone() as Arc<dyn Sv2ClientsMonitoring + Send + Sync>),
));
cache.refresh();
let business_mon = Arc::clone(&real_monitoring);
let business_handle = std::thread::spawn(move || {
let start = std::time::Instant::now();
let mut ops = 0u64;
while start.elapsed() < Duration::from_millis(100) {
business_mon.simulate_business_logic();
ops += 1;
}
ops
});
let mut monitoring_handles = vec![];
for _ in 0..16 {
let cache_ref = Arc::clone(&cache);
monitoring_handles.push(std::thread::spawn(move || {
let start = std::time::Instant::now();
let mut requests = 0u64;
while start.elapsed() < Duration::from_millis(100) {
let _ = cache_ref.get_snapshot();
requests += 1;
}
requests
}));
}
let _business_ops = business_handle.join().unwrap();
let total_cache_requests: u64 = monitoring_handles
.into_iter()
.map(|h| h.join().unwrap())
.sum();
let real_lock_acquisitions = real_monitoring.get_monitoring_acquisitions();
assert!(
real_lock_acquisitions <= 2,
"Cache acquired lock {} times, expected ≤2 (refresh only)",
real_lock_acquisitions
);
assert!(
total_cache_requests > 2,
"Cache should have processed requests",
);
}
}