memscope_rs/async_memory/
tracker.rs1use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11
12use crate::async_memory::buffer::{collect_all_events, AllocationEvent};
13use crate::async_memory::error::{AsyncError, AsyncResult};
14use crate::async_memory::profile::{AggregatedTaskStats, TaskMemoryProfile};
15use crate::async_memory::TaskId;
16
17pub use crate::async_memory::api::TrackedFuture;
19
20pub struct TaskMemoryTracker {
25 profiles: HashMap<TaskId, TaskMemoryProfile>,
27 aggregated_stats: AggregatedTaskStats,
29 is_active: Arc<AtomicBool>,
31 aggregator_handle: Option<thread::JoinHandle<()>>,
33}
34
35impl TaskMemoryTracker {
36 pub fn new() -> Self {
38 Self {
39 profiles: HashMap::new(),
40 aggregated_stats: AggregatedTaskStats::new(),
41 is_active: Arc::new(AtomicBool::new(false)),
42 aggregator_handle: None,
43 }
44 }
45
46 pub fn start(&mut self) -> AsyncResult<()> {
48 if self.is_active.load(Ordering::Relaxed) {
49 return Err(AsyncError::initialization(
50 "tracker",
51 "Task memory tracker already started",
52 false,
53 ));
54 }
55
56 self.is_active.store(true, Ordering::Relaxed);
57 let is_active = Arc::clone(&self.is_active);
58
59 let handle = thread::Builder::new()
60 .name("async-memory-aggregator".to_string())
61 .spawn(move || {
62 Self::aggregator_thread_main(is_active);
63 })
64 .map_err(|e| {
65 AsyncError::system(
66 "thread_spawn",
67 "Failed to start aggregator thread",
68 Some(&e.to_string()),
69 )
70 })?;
71
72 self.aggregator_handle = Some(handle);
73 tracing::info!("Task memory tracker started");
74 Ok(())
75 }
76
77 pub fn stop(&mut self) -> AsyncResult<()> {
79 if !self.is_active.load(Ordering::Relaxed) {
80 return Ok(()); }
82
83 self.is_active.store(false, Ordering::Relaxed);
84
85 if let Some(handle) = self.aggregator_handle.take() {
86 handle.join().map_err(|_| {
87 AsyncError::system("thread_join", "Failed to join aggregator thread", None)
88 })?;
89 }
90
91 tracing::info!("Task memory tracker stopped");
92 Ok(())
93 }
94
95 pub fn process_events(&mut self, events: Vec<AllocationEvent>) {
97 for event in events {
98 self.process_single_event(event);
99 }
100 }
101
102 fn process_single_event(&mut self, event: AllocationEvent) {
104 let profile = self
105 .profiles
106 .entry(event.task_id)
107 .or_insert_with(|| TaskMemoryProfile::new(event.task_id));
108
109 if event.is_allocation() {
110 profile.record_allocation(event.size as u64);
111 } else if event.is_deallocation() {
112 profile.record_deallocation(event.size as u64);
113 }
114
115 self.update_aggregated_stats();
117 }
118
119 fn update_aggregated_stats(&mut self) {
121 self.aggregated_stats = AggregatedTaskStats::new();
122 for profile in self.profiles.values() {
123 self.aggregated_stats.add_task(profile);
124 }
125 }
126
127 pub fn get_task_profile(&self, task_id: TaskId) -> Option<&TaskMemoryProfile> {
129 self.profiles.get(&task_id)
130 }
131
132 pub fn get_all_profiles(&self) -> &HashMap<TaskId, TaskMemoryProfile> {
134 &self.profiles
135 }
136
137 pub fn get_aggregated_stats(&self) -> &AggregatedTaskStats {
139 &self.aggregated_stats
140 }
141
142 pub fn mark_task_completed(&mut self, task_id: TaskId) {
144 if let Some(profile) = self.profiles.get_mut(&task_id) {
145 profile.mark_completed();
146 self.update_aggregated_stats();
147 }
148 }
149
150 pub fn cleanup_completed_tasks(&mut self, max_completed: usize) {
152 let mut completed_tasks: Vec<_> = self
153 .profiles
154 .iter()
155 .filter(|(_, profile)| profile.is_completed())
156 .map(|(&id, profile)| (id, profile.completed_at.unwrap_or(profile.created_at)))
157 .collect();
158
159 if completed_tasks.len() <= max_completed {
160 return; }
162
163 completed_tasks.sort_by_key(|(_, completion_time)| *completion_time);
165
166 let to_remove = completed_tasks.len() - max_completed;
168 for &(task_id, _) in completed_tasks.iter().take(to_remove) {
169 self.profiles.remove(&task_id);
170 }
171
172 self.update_aggregated_stats();
173 tracing::debug!("Cleaned up {} completed tasks", to_remove);
174 }
175
176 fn aggregator_thread_main(is_active: Arc<AtomicBool>) {
178 tracing::info!("Task memory aggregator thread started");
179
180 while is_active.load(Ordering::Relaxed) {
181 let events = collect_all_events();
183
184 if !events.is_empty() {
185 tracing::debug!("Collected {} allocation events", events.len());
186 }
189
190 thread::sleep(Duration::from_millis(10));
192 }
193
194 tracing::info!("Task memory aggregator thread stopped");
195 }
196}
197
198impl Default for TaskMemoryTracker {
199 fn default() -> Self {
200 Self::new()
201 }
202}
203
204impl Drop for TaskMemoryTracker {
205 fn drop(&mut self) {
206 let _ = self.stop();
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214 use crate::async_memory::buffer::AllocationEvent;
215
216 #[test]
217 fn test_tracker_creation() {
218 let tracker = TaskMemoryTracker::new();
219 assert_eq!(tracker.profiles.len(), 0);
220 assert_eq!(tracker.aggregated_stats.total_tasks, 0);
221 }
222
223 #[test]
224 fn test_event_processing() {
225 let mut tracker = TaskMemoryTracker::new();
226
227 let events = vec![
228 AllocationEvent::allocation(1, 0x1000, 1024, 100),
229 AllocationEvent::allocation(1, 0x2000, 2048, 200),
230 AllocationEvent::deallocation(1, 0x1000, 1024, 300),
231 AllocationEvent::allocation(2, 0x3000, 512, 400),
232 ];
233
234 tracker.process_events(events);
235
236 let profile1 = tracker
238 .get_task_profile(1)
239 .expect("Task 1 profile not found");
240 assert_eq!(profile1.total_allocated, 3072); assert_eq!(profile1.current_usage, 2048); assert_eq!(profile1.allocation_count, 2);
243 assert_eq!(profile1.deallocation_count, 1);
244
245 let profile2 = tracker
247 .get_task_profile(2)
248 .expect("Task 2 profile not found");
249 assert_eq!(profile2.total_allocated, 512);
250 assert_eq!(profile2.current_usage, 512);
251 assert_eq!(profile2.allocation_count, 1);
252 assert_eq!(profile2.deallocation_count, 0);
253
254 let stats = tracker.get_aggregated_stats();
256 assert_eq!(stats.total_tasks, 2);
257 assert_eq!(stats.total_memory_allocated, 3584); assert_eq!(stats.current_memory_usage, 2560); }
260
261 #[test]
262 fn test_task_completion() {
263 let mut tracker = TaskMemoryTracker::new();
264
265 let events = vec![AllocationEvent::allocation(1, 0x1000, 1024, 100)];
266 tracker.process_events(events);
267
268 let profile = tracker.get_task_profile(1).unwrap();
270 assert!(!profile.is_completed());
271 assert_eq!(tracker.get_aggregated_stats().completed_tasks, 0);
272
273 tracker.mark_task_completed(1);
275 let profile = tracker.get_task_profile(1).unwrap();
276 assert!(profile.is_completed());
277 assert_eq!(tracker.get_aggregated_stats().completed_tasks, 1);
278 }
279
280 #[test]
281 fn test_cleanup_completed_tasks() {
282 let mut tracker = TaskMemoryTracker::new();
283
284 for i in 1..=10 {
286 let events = vec![AllocationEvent::allocation(
287 i as TaskId,
288 0x1000 + i as usize,
289 1024,
290 (100 + i) as u64,
291 )];
292 tracker.process_events(events);
293 tracker.mark_task_completed(i);
294 }
295
296 assert_eq!(tracker.profiles.len(), 10);
297 assert_eq!(tracker.get_aggregated_stats().completed_tasks, 10);
298
299 tracker.cleanup_completed_tasks(5);
301 assert_eq!(tracker.profiles.len(), 5);
302 assert_eq!(tracker.get_aggregated_stats().completed_tasks, 5);
303
304 for i in 6..=10 {
306 assert!(tracker.get_task_profile(i).is_some());
307 }
308 for i in 1..=5 {
309 assert!(tracker.get_task_profile(i).is_none());
310 }
311 }
312
313 #[test]
314 fn test_tracker_lifecycle() {
315 let mut tracker = TaskMemoryTracker::new();
316
317 tracker.start().expect("Failed to start tracker");
319 assert!(tracker.is_active.load(Ordering::Relaxed));
320 assert!(tracker.aggregator_handle.is_some());
321
322 let result = tracker.start();
324 assert!(result.is_err());
325
326 tracker.stop().expect("Failed to stop tracker");
328 assert!(!tracker.is_active.load(Ordering::Relaxed));
329 assert!(tracker.aggregator_handle.is_none());
330
331 tracker.stop().expect("Failed to stop tracker second time");
333 }
334
335 #[test]
336 fn test_multiple_tasks_aggregation() {
337 let mut tracker = TaskMemoryTracker::new();
338
339 let mut events = Vec::new();
341
342 events.push(AllocationEvent::allocation(1, 0x1000, 10_000, 100));
344 events.push(AllocationEvent::allocation(1, 0x2000, 20_000, 200));
345
346 for i in 0..10 {
348 events.push(AllocationEvent::allocation(
349 2,
350 0x3000 + i,
351 100,
352 (300 + i) as u64,
353 ));
354 }
355
356 events.push(AllocationEvent::allocation(3, 0x4000, 5000, 400));
358 events.push(AllocationEvent::deallocation(3, 0x4000, 5000, 500));
359
360 tracker.process_events(events);
361
362 let profile1 = tracker.get_task_profile(1).unwrap();
364 assert_eq!(profile1.total_allocated, 30_000);
365 assert_eq!(profile1.allocation_count, 2);
366
367 let profile2 = tracker.get_task_profile(2).unwrap();
368 assert_eq!(profile2.total_allocated, 1000); assert_eq!(profile2.allocation_count, 10);
370
371 let profile3 = tracker.get_task_profile(3).unwrap();
372 assert_eq!(profile3.total_allocated, 5000);
373 assert_eq!(profile3.current_usage, 0); assert_eq!(profile3.memory_efficiency(), 1.0);
375
376 let stats = tracker.get_aggregated_stats();
378 assert_eq!(stats.total_tasks, 3);
379 assert_eq!(stats.total_memory_allocated, 36_000); }
381}