use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use buswatch_types::{Microseconds, ModuleMetrics, ReadMetrics, Snapshot, WriteMetrics};
use parking_lot::RwLock;
#[derive(Debug, Default)]
pub struct ReadState {
pub count: AtomicU64,
pub pending_since: RwLock<Option<Instant>>,
}
#[derive(Debug, Default)]
pub struct WriteState {
pub count: AtomicU64,
pub pending_since: RwLock<Option<Instant>>,
}
#[derive(Debug, Default)]
pub struct ModuleState {
pub reads: RwLock<BTreeMap<String, Arc<ReadState>>>,
pub writes: RwLock<BTreeMap<String, Arc<WriteState>>>,
}
impl ModuleState {
pub fn get_or_create_read(&self, topic: &str) -> Arc<ReadState> {
{
let reads = self.reads.read();
if let Some(state) = reads.get(topic) {
return state.clone();
}
}
let mut reads = self.reads.write();
reads
.entry(topic.to_string())
.or_insert_with(|| Arc::new(ReadState::default()))
.clone()
}
pub fn get_or_create_write(&self, topic: &str) -> Arc<WriteState> {
{
let writes = self.writes.read();
if let Some(state) = writes.get(topic) {
return state.clone();
}
}
let mut writes = self.writes.write();
writes
.entry(topic.to_string())
.or_insert_with(|| Arc::new(WriteState::default()))
.clone()
}
pub fn collect(&self) -> ModuleMetrics {
let now = Instant::now();
let reads = self
.reads
.read()
.iter()
.map(|(topic, state)| {
let count = state.count.load(Ordering::Relaxed);
let pending = state.pending_since.read().map(|since| {
let duration = now.duration_since(since);
Microseconds::from(duration)
});
(
topic.clone(),
ReadMetrics {
count,
backlog: None, pending,
rate: None, },
)
})
.collect();
let writes = self
.writes
.read()
.iter()
.map(|(topic, state)| {
let count = state.count.load(Ordering::Relaxed);
let pending = state.pending_since.read().map(|since| {
let duration = now.duration_since(since);
Microseconds::from(duration)
});
(
topic.clone(),
WriteMetrics {
count,
pending,
rate: None,
},
)
})
.collect();
ModuleMetrics { reads, writes }
}
}
#[derive(Debug, Default)]
pub struct GlobalState {
pub modules: RwLock<BTreeMap<String, Arc<ModuleState>>>,
pub topic_write_counts: RwLock<BTreeMap<String, Arc<AtomicU64>>>,
}
impl GlobalState {
pub fn register_module(&self, name: &str) -> Arc<ModuleState> {
{
let modules = self.modules.read();
if let Some(state) = modules.get(name) {
return state.clone();
}
}
let mut modules = self.modules.write();
modules
.entry(name.to_string())
.or_insert_with(|| Arc::new(ModuleState::default()))
.clone()
}
pub fn get_topic_write_counter(&self, topic: &str) -> Arc<AtomicU64> {
{
let counts = self.topic_write_counts.read();
if let Some(counter) = counts.get(topic) {
return counter.clone();
}
}
let mut counts = self.topic_write_counts.write();
counts
.entry(topic.to_string())
.or_insert_with(|| Arc::new(AtomicU64::new(0)))
.clone()
}
pub fn collect(&self) -> Snapshot {
let modules = self.modules.read();
let topic_writes = self.topic_write_counts.read();
let mut snapshot = Snapshot::builder();
for (name, state) in modules.iter() {
let mut metrics = state.collect();
for (topic, read_metrics) in metrics.reads.iter_mut() {
if let Some(total_writes) = topic_writes.get(topic) {
let total = total_writes.load(Ordering::Relaxed);
if total > read_metrics.count {
read_metrics.backlog = Some(total - read_metrics.count);
}
}
}
snapshot = snapshot.module_metrics(name.clone(), metrics);
}
snapshot.build()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_module_state_read_write() {
let state = ModuleState::default();
let read = state.get_or_create_read("topic1");
read.count.fetch_add(10, Ordering::Relaxed);
let write = state.get_or_create_write("topic2");
write.count.fetch_add(5, Ordering::Relaxed);
let metrics = state.collect();
assert_eq!(metrics.reads.get("topic1").unwrap().count, 10);
assert_eq!(metrics.writes.get("topic2").unwrap().count, 5);
}
#[test]
fn test_global_state_collect() {
let global = GlobalState::default();
let module1 = global.register_module("service-a");
let module2 = global.register_module("service-b");
module1
.get_or_create_read("events")
.count
.fetch_add(100, Ordering::Relaxed);
module2
.get_or_create_write("events")
.count
.fetch_add(100, Ordering::Relaxed);
global
.get_topic_write_counter("events")
.fetch_add(100, Ordering::Relaxed);
let snapshot = global.collect();
assert_eq!(snapshot.modules.len(), 2);
}
}