1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
/// Set of atomic runtime counters (timers, tasks, channel depth, processed
/// messages, peak memory) that can be updated through a shared reference.
///
/// Every field is an `Arc<AtomicU64>`, so the derived `Clone` produces another
/// handle to the *same* counters rather than an independent copy.
#[derive(Debug, Clone)]
pub struct MemoryMonitor {
    /// Timer count maintained by `timer_added` / `timer_removed`.
    pub active_timers: Arc<AtomicU64>,
    /// Task count maintained by `task_spawned` / `task_completed`.
    pub active_tasks: Arc<AtomicU64>,
    /// Most recent value stored through `set_channel_depth`.
    pub channel_depth: Arc<AtomicU64>,
    /// Running total incremented by each `message_processed` call.
    pub messages_processed: Arc<AtomicU64>,
    /// High-water mark maintained by `update_peak_memory`, in bytes.
    pub peak_memory_bytes: Arc<AtomicU64>,
}
23
24impl Default for MemoryMonitor {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl MemoryMonitor {
31 pub fn new() -> Self {
33 Self {
34 active_timers: Arc::new(AtomicU64::new(0)),
35 active_tasks: Arc::new(AtomicU64::new(0)),
36 channel_depth: Arc::new(AtomicU64::new(0)),
37 messages_processed: Arc::new(AtomicU64::new(0)),
38 peak_memory_bytes: Arc::new(AtomicU64::new(0)),
39 }
40 }
41
42 pub fn timer_added(&self) {
44 self.active_timers.fetch_add(1, Ordering::Relaxed);
45 }
46
47 pub fn timer_removed(&self) {
49 self.active_timers.fetch_sub(1, Ordering::Relaxed);
50 }
51
52 pub fn get_active_timers(&self) -> u64 {
54 self.active_timers.load(Ordering::Relaxed)
55 }
56
57 pub fn task_spawned(&self) {
59 self.active_tasks.fetch_add(1, Ordering::Relaxed);
60 }
61
62 pub fn task_completed(&self) {
64 self.active_tasks.fetch_sub(1, Ordering::Relaxed);
65 }
66
67 pub fn get_active_tasks(&self) -> u64 {
69 self.active_tasks.load(Ordering::Relaxed)
70 }
71
72 pub fn set_channel_depth(&self, depth: u64) {
74 self.channel_depth.store(depth, Ordering::Relaxed);
75 }
76
77 pub fn get_channel_depth(&self) -> u64 {
79 self.channel_depth.load(Ordering::Relaxed)
80 }
81
82 pub fn message_processed(&self) {
84 self.messages_processed.fetch_add(1, Ordering::Relaxed);
85 }
86
87 pub fn get_messages_processed(&self) -> u64 {
89 self.messages_processed.load(Ordering::Relaxed)
90 }
91
92 pub fn update_peak_memory(&self, bytes: u64) {
94 loop {
95 let current = self.peak_memory_bytes.load(Ordering::Relaxed);
96 if bytes <= current {
97 break;
98 }
99 if self
100 .peak_memory_bytes
101 .compare_exchange_weak(current, bytes, Ordering::Relaxed, Ordering::Relaxed)
102 .is_ok()
103 {
104 break;
105 }
106 }
107 }
108
109 pub fn get_peak_memory_bytes(&self) -> u64 {
111 self.peak_memory_bytes.load(Ordering::Relaxed)
112 }
113
114 pub fn snapshot(&self) -> MemorySnapshot {
116 MemorySnapshot {
117 active_timers: self.get_active_timers(),
118 active_tasks: self.get_active_tasks(),
119 channel_depth: self.get_channel_depth(),
120 messages_processed: self.get_messages_processed(),
121 peak_memory_bytes: self.get_peak_memory_bytes(),
122 }
123 }
124
125 pub fn reset(&self) {
127 self.active_timers.store(0, Ordering::Relaxed);
128 self.active_tasks.store(0, Ordering::Relaxed);
129 self.channel_depth.store(0, Ordering::Relaxed);
130 self.messages_processed.store(0, Ordering::Relaxed);
131 self.peak_memory_bytes.store(0, Ordering::Relaxed);
132 }
133
134 pub fn check_health(&self) -> MemoryHealth {
136 let snapshot = self.snapshot();
137 let mut issues = Vec::new();
138
139 if snapshot.active_timers > 100 {
141 issues.push(format!("High timer count: {}", snapshot.active_timers));
142 }
143
144 if snapshot.active_tasks > 50 {
146 issues.push(format!("High task count: {}", snapshot.active_tasks));
147 }
148
149 if snapshot.channel_depth > 1000 {
151 issues.push(format!("High channel depth: {}", snapshot.channel_depth));
152 }
153
154 MemoryHealth {
155 is_healthy: issues.is_empty(),
156 issues,
157 snapshot,
158 }
159 }
160}
161
/// Plain-value copy of a monitor's counters at one point in time.
///
/// `PartialEq`/`Eq` allow snapshots to be compared directly (for example with
/// `assert_eq!`), and `Default` yields the all-zero snapshot.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MemorySnapshot {
    /// Active-timer count when the snapshot was taken.
    pub active_timers: u64,
    /// Active-task count when the snapshot was taken.
    pub active_tasks: u64,
    /// Recorded channel depth when the snapshot was taken.
    pub channel_depth: u64,
    /// Processed-message total when the snapshot was taken.
    pub messages_processed: u64,
    /// Recorded peak memory, in bytes, when the snapshot was taken.
    pub peak_memory_bytes: u64,
}
176
/// Outcome of a health check: the verdict, the reasons behind it, and the
/// counter values it was computed from.
#[derive(Debug, Clone)]
pub struct MemoryHealth {
    /// Verdict of the check; `MemoryMonitor::check_health` sets it to `true`
    /// exactly when `issues` is empty.
    pub is_healthy: bool,
    /// One human-readable message per problem found.
    pub issues: Vec<String>,
    /// Counter values the check was evaluated against.
    pub snapshot: MemorySnapshot,
}
187
188impl std::fmt::Display for MemorySnapshot {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 write!(
191 f,
192 "Memory Snapshot - Timers: {}, Tasks: {}, Channel: {}, Messages: {}, Peak Memory: {} bytes",
193 self.active_timers,
194 self.active_tasks,
195 self.channel_depth,
196 self.messages_processed,
197 self.peak_memory_bytes
198 )
199 }
200}
201
202impl std::fmt::Display for MemoryHealth {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 if self.is_healthy {
205 write!(f, "Memory Health: HEALTHY\n{}", self.snapshot)
206 } else {
207 write!(
208 f,
209 "Memory Health: ISSUES DETECTED\nIssues: {}\n{}",
210 self.issues.join(", "),
211 self.snapshot
212 )
213 }
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Timer and task counters start at zero and follow add/remove calls.
    #[test]
    fn test_memory_monitor_basic() {
        let m = MemoryMonitor::new();
        assert_eq!((m.get_active_timers(), m.get_active_tasks()), (0, 0));

        m.timer_added();
        m.task_spawned();
        assert_eq!((m.get_active_timers(), m.get_active_tasks()), (1, 1));

        m.timer_removed();
        m.task_completed();
        assert_eq!((m.get_active_timers(), m.get_active_tasks()), (0, 0));
    }

    /// A fresh monitor is healthy; 150 registered timers make it unhealthy.
    #[test]
    fn test_memory_health_check() {
        let m = MemoryMonitor::new();
        assert!(m.check_health().is_healthy);

        (0..150).for_each(|_| m.timer_added());

        let report = m.check_health();
        assert!(!report.is_healthy);
        assert!(!report.issues.is_empty());
    }

    /// The peak only moves upwards: a lower report leaves it untouched.
    #[test]
    fn test_peak_memory_tracking() {
        let m = MemoryMonitor::new();

        // (reported bytes, peak expected afterwards)
        for (reported, expected_peak) in [(1000, 1000), (500, 1000), (2000, 2000)] {
            m.update_peak_memory(reported);
            assert_eq!(m.get_peak_memory_bytes(), expected_peak);
        }
    }
}