use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

use crate::state::{GlobalState, ModuleState};
8
9#[derive(Clone)]
34pub struct ModuleHandle {
35 pub(crate) state: Arc<ModuleState>,
36 pub(crate) global: Arc<GlobalState>,
37 pub(crate) name: String,
38}
39
40impl ModuleHandle {
41 pub fn record_read(&self, topic: &str, count: u64) {
48 let read_state = self.state.get_or_create_read(topic);
49 read_state.count.fetch_add(count, Ordering::Relaxed);
50 }
51
52 pub fn record_write(&self, topic: &str, count: u64) {
59 let write_state = self.state.get_or_create_write(topic);
60 write_state.count.fetch_add(count, Ordering::Relaxed);
61
62 let global_counter = self.global.get_topic_write_counter(topic);
64 global_counter.fetch_add(count, Ordering::Relaxed);
65 }
66
67 pub fn start_read(&self, topic: &str) -> PendingGuard {
84 let read_state = self.state.get_or_create_read(topic);
85 *read_state.pending_since.write() = Some(Instant::now());
86
87 PendingGuard {
88 state: PendingState::Read(read_state),
89 }
90 }
91
92 pub fn start_write(&self, topic: &str) -> PendingGuard {
97 let write_state = self.state.get_or_create_write(topic);
98 *write_state.pending_since.write() = Some(Instant::now());
99
100 PendingGuard {
101 state: PendingState::Write(write_state),
102 }
103 }
104
105 pub fn set_read_pending(&self, topic: &str, since: Option<Instant>) {
110 let read_state = self.state.get_or_create_read(topic);
111 *read_state.pending_since.write() = since;
112 }
113
114 pub fn set_write_pending(&self, topic: &str, since: Option<Instant>) {
116 let write_state = self.state.get_or_create_write(topic);
117 *write_state.pending_since.write() = since;
118 }
119
120 pub fn name(&self) -> &str {
122 &self.name
123 }
124}
125
126impl std::fmt::Debug for ModuleHandle {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("ModuleHandle")
129 .field("name", &self.name)
130 .finish()
131 }
132}
133
134enum PendingState {
136 Read(Arc<crate::state::ReadState>),
137 Write(Arc<crate::state::WriteState>),
138}
139
140pub struct PendingGuard {
144 state: PendingState,
145}
146
147impl Drop for PendingGuard {
148 fn drop(&mut self) {
149 match &self.state {
150 PendingState::Read(s) => *s.pending_since.write() = None,
151 PendingState::Write(s) => *s.pending_since.write() = None,
152 }
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::GlobalState;

    /// Builds a handle for a module named "test" backed by a fresh `GlobalState`.
    fn create_handle() -> ModuleHandle {
        let global = Arc::new(GlobalState::default());
        let state = global.register_module("test");
        ModuleHandle {
            state,
            global,
            name: "test".to_string(),
        }
    }

    #[test]
    fn test_record_read() {
        let handle = create_handle();
        handle.record_read("topic", 5);
        handle.record_read("topic", 3);

        let metrics = handle.state.collect();
        assert_eq!(metrics.reads.get("topic").unwrap().count, 8);
    }

    #[test]
    fn test_record_write() {
        let handle = create_handle();
        handle.record_write("topic", 10);

        let metrics = handle.state.collect();
        assert_eq!(metrics.writes.get("topic").unwrap().count, 10);
    }

    #[test]
    fn test_pending_guard() {
        let handle = create_handle();

        {
            let _guard = handle.start_read("topic");
            let state = handle.state.get_or_create_read("topic");
            assert!(state.pending_since.read().is_some());
        }

        // The guard went out of scope above, so the marker must be cleared.
        let state = handle.state.get_or_create_read("topic");
        assert!(state.pending_since.read().is_none());
    }

    #[test]
    fn test_pending_guard_write() {
        let handle = create_handle();

        {
            let _guard = handle.start_write("topic");
            let state = handle.state.get_or_create_write("topic");
            assert!(state.pending_since.read().is_some());
        }

        // The guard went out of scope above, so the marker must be cleared.
        let state = handle.state.get_or_create_write("topic");
        assert!(state.pending_since.read().is_none());
    }

    #[test]
    fn test_set_pending() {
        let handle = create_handle();
        let since = Instant::now();

        handle.set_read_pending("topic", Some(since));
        handle.set_write_pending("topic", Some(since));

        let read_state = handle.state.get_or_create_read("topic");
        let write_state = handle.state.get_or_create_write("topic");
        assert_eq!(*read_state.pending_since.read(), Some(since));
        assert_eq!(*write_state.pending_since.read(), Some(since));

        handle.set_read_pending("topic", None);
        handle.set_write_pending("topic", None);
        assert!(read_state.pending_since.read().is_none());
        assert!(write_state.pending_since.read().is_none());
    }

    #[test]
    fn test_name() {
        let handle = create_handle();
        assert_eq!(handle.name(), "test");
    }
}