use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;
use crate::state::{GlobalState, ModuleState};
/// Handle through which a single module reports its topic read/write activity.
///
/// Cloning the handle clones the two `Arc`s and the name, so every clone
/// refers to the same underlying module and global state.
#[derive(Clone)]
pub struct ModuleHandle {
    /// State belonging to this module; per-topic read/write entries are
    /// obtained from it.
    pub(crate) state: Arc<ModuleState>,
    /// State shared beyond this module; used for the per-topic write counters.
    pub(crate) global: Arc<GlobalState>,
    /// Name of the module, as returned by [`ModuleHandle::name`].
    pub(crate) name: String,
}
impl ModuleHandle {
    /// Adds `count` to this module's read counter for `topic`.
    ///
    /// The counter is bumped with `Ordering::Relaxed`.
    pub fn record_read(&self, topic: &str, count: u64) {
        self.state
            .get_or_create_read(topic)
            .count
            .fetch_add(count, Ordering::Relaxed);
    }

    /// Adds `count` to this module's write counter for `topic`, and then to
    /// the global write counter for the same topic.
    ///
    /// Both counters are bumped with `Ordering::Relaxed`.
    pub fn record_write(&self, topic: &str, count: u64) {
        // Module-local counter first, then the global per-topic counter.
        self.state
            .get_or_create_write(topic)
            .count
            .fetch_add(count, Ordering::Relaxed);
        self.global
            .get_topic_write_counter(topic)
            .fetch_add(count, Ordering::Relaxed);
    }

    /// Marks a read on `topic` as pending since "now" and returns a guard
    /// that resets the marker to `None` when it is dropped.
    ///
    /// Keep the returned guard alive for as long as the read is in flight.
    pub fn start_read(&self, topic: &str) -> PendingGuard {
        let entry = self.state.get_or_create_read(topic);
        let started_at = Instant::now();
        {
            let mut pending = entry.pending_since.write();
            *pending = Some(started_at);
        }
        let state = PendingState::Read(entry);
        PendingGuard { state }
    }

    /// Marks a write on `topic` as pending since "now" and returns a guard
    /// that resets the marker to `None` when it is dropped.
    ///
    /// Keep the returned guard alive for as long as the write is in flight.
    pub fn start_write(&self, topic: &str) -> PendingGuard {
        let entry = self.state.get_or_create_write(topic);
        let started_at = Instant::now();
        {
            let mut pending = entry.pending_since.write();
            *pending = Some(started_at);
        }
        let state = PendingState::Write(entry);
        PendingGuard { state }
    }

    /// Overwrites the read-pending marker of `topic` with `since`.
    ///
    /// Passing `None` clears the marker; no guard is involved.
    pub fn set_read_pending(&self, topic: &str, since: Option<Instant>) {
        let entry = self.state.get_or_create_read(topic);
        let mut pending = entry.pending_since.write();
        *pending = since;
    }

    /// Overwrites the write-pending marker of `topic` with `since`.
    ///
    /// Passing `None` clears the marker; no guard is involved.
    pub fn set_write_pending(&self, topic: &str, since: Option<Instant>) {
        let entry = self.state.get_or_create_write(topic);
        let mut pending = entry.pending_since.write();
        *pending = since;
    }

    /// Returns the module name stored in this handle.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}
/// Debug output shows only the module name; the `state` and `global`
/// fields are left out of the representation.
impl std::fmt::Debug for ModuleHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ModuleHandle");
        repr.field("name", &self.name);
        repr.finish()
    }
}
/// The per-topic state whose `pending_since` marker a [`PendingGuard`]
/// is responsible for: either a read entry or a write entry.
enum PendingState {
    /// Shared read-side state of a topic.
    Read(Arc<crate::state::ReadState>),
    /// Shared write-side state of a topic.
    Write(Arc<crate::state::WriteState>),
}
/// Guard that keeps a topic's `pending_since` marker set for as long as it
/// is alive.
///
/// Dropping the guard resets the marker to `None`, so the value must be
/// bound to a variable (e.g. `let _guard = ...;`) for the duration of the
/// operation. Binding it to `_` or discarding it drops it immediately.
#[must_use = "the pending marker is cleared as soon as the guard is dropped"]
pub struct PendingGuard {
    /// The read or write state whose marker is reset on drop.
    state: PendingState,
}
impl Drop for PendingGuard {
    /// Resets the guarded state's `pending_since` marker to `None`.
    fn drop(&mut self) {
        match self.state {
            PendingState::Read(ref read) => {
                let mut pending = read.pending_since.write();
                *pending = None;
            }
            PendingState::Write(ref write) => {
                let mut pending = write.pending_since.write();
                *pending = None;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::GlobalState;

    /// Builds a handle for a module named "test" registered on a fresh
    /// default `GlobalState`.
    fn create_handle() -> ModuleHandle {
        let global = Arc::new(GlobalState::default());
        let state = global.register_module("test");
        let name = String::from("test");
        ModuleHandle {
            state,
            global,
            name,
        }
    }

    /// Two reads on the same topic accumulate into a single count.
    #[test]
    fn test_record_read() {
        let handle = create_handle();
        for batch in [5, 3] {
            handle.record_read("topic", batch);
        }

        let collected = handle.state.collect();
        let topic_reads = collected.reads.get("topic").unwrap();
        assert_eq!(topic_reads.count, 8);
    }

    /// A single write shows up in the collected write metrics.
    #[test]
    fn test_record_write() {
        let handle = create_handle();
        handle.record_write("topic", 10);

        let collected = handle.state.collect();
        let topic_writes = collected.writes.get("topic").unwrap();
        assert_eq!(topic_writes.count, 10);
    }

    /// The pending marker is set while the guard lives and cleared on drop.
    #[test]
    fn test_pending_guard() {
        let handle = create_handle();

        let guard = handle.start_read("topic");
        let while_held = handle.state.get_or_create_read("topic");
        assert!(while_held.pending_since.read().is_some());
        drop(guard);

        let after_drop = handle.state.get_or_create_read("topic");
        assert!(after_drop.pending_since.read().is_none());
    }
}