memscope_rs/async_memory/tracker.rs

//! Task memory tracker for aggregating and analyzing allocation data
//!
//! Provides the core tracking infrastructure that collects allocation events
//! from buffers and maintains task-level memory profiles.
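//!
//! # Example
//!
//! A minimal lifecycle sketch (marked `ignore`: it is not a doctest and assumes
//! `TaskMemoryTracker` and `AllocationEvent` are already in scope; the event
//! arguments are `(task_id, ptr, size, timestamp)` as used by this module's tests):
//!
//! ```ignore
//! let mut tracker = TaskMemoryTracker::new();
//! tracker.start()?; // spawn the background aggregator thread
//!
//! // Events normally originate in thread-local buffers (see `collect_all_events`);
//! // here a small batch is built by hand.
//! tracker.process_events(vec![
//!     AllocationEvent::allocation(1, 0x1000, 1024, 100),
//!     AllocationEvent::deallocation(1, 0x1000, 1024, 200),
//! ]);
//!
//! let stats = tracker.get_aggregated_stats();
//! println!("tasks tracked: {}", stats.total_tasks);
//!
//! tracker.stop()?; // join the aggregator thread
//! ```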

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use crate::async_memory::buffer::{collect_all_events, AllocationEvent};
use crate::async_memory::error::{AsyncError, AsyncResult};
use crate::async_memory::profile::{AggregatedTaskStats, TaskMemoryProfile};
use crate::async_memory::TaskId;

/// Re-export TrackedFuture from api module
pub use crate::async_memory::api::TrackedFuture;

/// Central task memory tracker
///
/// Aggregates allocation events from thread-local buffers and maintains
/// per-task memory profiles for analysis and reporting.
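///
/// # Example
///
/// A per-task bookkeeping sketch (marked `ignore`; it assumes the types are in
/// scope, and the profile fields shown match those used by this module's tests):
///
/// ```ignore
/// let mut tracker = TaskMemoryTracker::new();
/// tracker.process_events(vec![
///     AllocationEvent::allocation(7, 0x2000, 4096, 10),
///     AllocationEvent::deallocation(7, 0x2000, 4096, 20),
/// ]);
///
/// let profile = tracker.get_task_profile(7).expect("profile for task 7");
/// assert_eq!(profile.total_allocated, 4096);
/// assert_eq!(profile.current_usage, 0); // allocation was fully freed
/// ```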
pub struct TaskMemoryTracker {
    /// Task profiles indexed by task ID
    profiles: HashMap<TaskId, TaskMemoryProfile>,
    /// Aggregated statistics across all tasks
    aggregated_stats: AggregatedTaskStats,
    /// Whether the tracker is currently active
    is_active: Arc<AtomicBool>,
    /// Background aggregation thread handle
    aggregator_handle: Option<thread::JoinHandle<()>>,
}

impl TaskMemoryTracker {
    /// Create new task memory tracker
    pub fn new() -> Self {
        Self {
            profiles: HashMap::new(),
            aggregated_stats: AggregatedTaskStats::new(),
            is_active: Arc::new(AtomicBool::new(false)),
            aggregator_handle: None,
        }
    }

    /// Start background aggregation thread
    pub fn start(&mut self) -> AsyncResult<()> {
        if self.is_active.load(Ordering::Relaxed) {
            return Err(AsyncError::initialization(
                "tracker",
                "Task memory tracker already started",
                false,
            ));
        }

        self.is_active.store(true, Ordering::Relaxed);
        let is_active = Arc::clone(&self.is_active);

        let handle = thread::Builder::new()
            .name("async-memory-aggregator".to_string())
            .spawn(move || {
                Self::aggregator_thread_main(is_active);
            })
            .map_err(|e| {
                // Roll back the active flag so a failed spawn does not leave the
                // tracker marked as started with no aggregator thread running.
                self.is_active.store(false, Ordering::Relaxed);
                AsyncError::system(
                    "thread_spawn",
                    "Failed to start aggregator thread",
                    Some(&e.to_string()),
                )
            })?;

        self.aggregator_handle = Some(handle);
        tracing::info!("Task memory tracker started");
        Ok(())
    }

    /// Stop background aggregation thread
    pub fn stop(&mut self) -> AsyncResult<()> {
        if !self.is_active.load(Ordering::Relaxed) {
            return Ok(()); // Already stopped
        }

        self.is_active.store(false, Ordering::Relaxed);

        if let Some(handle) = self.aggregator_handle.take() {
            handle.join().map_err(|_| {
                AsyncError::system("thread_join", "Failed to join aggregator thread", None)
            })?;
        }

        tracing::info!("Task memory tracker stopped");
        Ok(())
    }

    /// Process allocation events and update profiles
    pub fn process_events(&mut self, events: Vec<AllocationEvent>) {
        for event in events {
            self.process_single_event(event);
        }
        // Rebuild aggregated stats once per batch rather than once per event.
        self.update_aggregated_stats();
    }

    /// Process a single allocation event
    fn process_single_event(&mut self, event: AllocationEvent) {
        let profile = self
            .profiles
            .entry(event.task_id)
            .or_insert_with(|| TaskMemoryProfile::new(event.task_id));

        if event.is_allocation() {
            profile.record_allocation(event.size as u64);
        } else if event.is_deallocation() {
            profile.record_deallocation(event.size as u64);
        }
    }

    /// Rebuild aggregated statistics from all task profiles (simplified full recompute)
    fn update_aggregated_stats(&mut self) {
        self.aggregated_stats = AggregatedTaskStats::new();
        for profile in self.profiles.values() {
            self.aggregated_stats.add_task(profile);
        }
    }

    /// Get task profile by ID
    pub fn get_task_profile(&self, task_id: TaskId) -> Option<&TaskMemoryProfile> {
        self.profiles.get(&task_id)
    }

    /// Get all task profiles
    pub fn get_all_profiles(&self) -> &HashMap<TaskId, TaskMemoryProfile> {
        &self.profiles
    }

    /// Get aggregated statistics
    pub fn get_aggregated_stats(&self) -> &AggregatedTaskStats {
        &self.aggregated_stats
    }

    /// Mark task as completed
    pub fn mark_task_completed(&mut self, task_id: TaskId) {
        if let Some(profile) = self.profiles.get_mut(&task_id) {
            profile.mark_completed();
            self.update_aggregated_stats();
        }
    }

    /// Clean up completed tasks, keeping only the `max_completed` most recent ones
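    ///
    /// A minimal sketch; the retention limit of `100` is an arbitrary value
    /// chosen for illustration:
    ///
    /// ```ignore
    /// tracker.mark_task_completed(7);
    /// // Keep at most the 100 most recently completed profiles; older completed
    /// // profiles are removed and the aggregated stats are rebuilt.
    /// tracker.cleanup_completed_tasks(100);
    /// ```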
    pub fn cleanup_completed_tasks(&mut self, max_completed: usize) {
        let mut completed_tasks: Vec<_> = self
            .profiles
            .iter()
            .filter(|(_, profile)| profile.is_completed())
            .map(|(&id, profile)| (id, profile.completed_at.unwrap_or(profile.created_at)))
            .collect();

        if completed_tasks.len() <= max_completed {
            return; // No cleanup needed
        }

        // Sort by completion time (oldest first)
        completed_tasks.sort_by_key(|(_, completion_time)| *completion_time);

        // Remove oldest completed tasks
        let to_remove = completed_tasks.len() - max_completed;
        for &(task_id, _) in completed_tasks.iter().take(to_remove) {
            self.profiles.remove(&task_id);
        }

        self.update_aggregated_stats();
        tracing::debug!("Cleaned up {} completed tasks", to_remove);
    }

    /// Background aggregator thread main loop
    fn aggregator_thread_main(is_active: Arc<AtomicBool>) {
        tracing::info!("Task memory aggregator thread started");

        while is_active.load(Ordering::Relaxed) {
            // Collect events from all thread buffers
            let events = collect_all_events();

            if !events.is_empty() {
                tracing::debug!("Collected {} allocation events", events.len());
                // TODO: Process events with global tracker instance
                // For now, just log the collection
            }

            // Sleep briefly to avoid busy waiting
            thread::sleep(Duration::from_millis(10));
        }

        tracing::info!("Task memory aggregator thread stopped");
    }
}

impl Default for TaskMemoryTracker {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for TaskMemoryTracker {
    fn drop(&mut self) {
        // Ensure background thread is stopped
        let _ = self.stop();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_memory::buffer::AllocationEvent;

    #[test]
    fn test_tracker_creation() {
        let tracker = TaskMemoryTracker::new();
        assert_eq!(tracker.profiles.len(), 0);
        assert_eq!(tracker.aggregated_stats.total_tasks, 0);
    }

    #[test]
    fn test_event_processing() {
        let mut tracker = TaskMemoryTracker::new();

        let events = vec![
            AllocationEvent::allocation(1, 0x1000, 1024, 100),
            AllocationEvent::allocation(1, 0x2000, 2048, 200),
            AllocationEvent::deallocation(1, 0x1000, 1024, 300),
            AllocationEvent::allocation(2, 0x3000, 512, 400),
        ];

        tracker.process_events(events);

        // Check task 1 profile
        let profile1 = tracker
            .get_task_profile(1)
            .expect("Task 1 profile not found");
        assert_eq!(profile1.total_allocated, 3072); // 1024 + 2048
        assert_eq!(profile1.current_usage, 2048); // 3072 - 1024
        assert_eq!(profile1.allocation_count, 2);
        assert_eq!(profile1.deallocation_count, 1);

        // Check task 2 profile
        let profile2 = tracker
            .get_task_profile(2)
            .expect("Task 2 profile not found");
        assert_eq!(profile2.total_allocated, 512);
        assert_eq!(profile2.current_usage, 512);
        assert_eq!(profile2.allocation_count, 1);
        assert_eq!(profile2.deallocation_count, 0);

        // Check aggregated stats
        let stats = tracker.get_aggregated_stats();
        assert_eq!(stats.total_tasks, 2);
        assert_eq!(stats.total_memory_allocated, 3584); // 3072 + 512
        assert_eq!(stats.current_memory_usage, 2560); // 2048 + 512
    }

    #[test]
    fn test_task_completion() {
        let mut tracker = TaskMemoryTracker::new();

        let events = vec![AllocationEvent::allocation(1, 0x1000, 1024, 100)];
        tracker.process_events(events);

        // Initially not completed
        let profile = tracker.get_task_profile(1).unwrap();
        assert!(!profile.is_completed());
        assert_eq!(tracker.get_aggregated_stats().completed_tasks, 0);

        // Mark as completed
        tracker.mark_task_completed(1);
        let profile = tracker.get_task_profile(1).unwrap();
        assert!(profile.is_completed());
        assert_eq!(tracker.get_aggregated_stats().completed_tasks, 1);
    }

    #[test]
    fn test_cleanup_completed_tasks() {
        let mut tracker = TaskMemoryTracker::new();

        // Create multiple completed tasks
        for i in 1..=10 {
            let events = vec![AllocationEvent::allocation(
                i as TaskId,
                0x1000 + i as usize,
                1024,
                (100 + i) as u64,
            )];
            tracker.process_events(events);
            tracker.mark_task_completed(i);
        }

        assert_eq!(tracker.profiles.len(), 10);
        assert_eq!(tracker.get_aggregated_stats().completed_tasks, 10);

        // Cleanup keeping only 5 most recent
        tracker.cleanup_completed_tasks(5);
        assert_eq!(tracker.profiles.len(), 5);
        assert_eq!(tracker.get_aggregated_stats().completed_tasks, 5);

        // Verify we kept the most recent ones (6-10)
        for i in 6..=10 {
            assert!(tracker.get_task_profile(i).is_some());
        }
        for i in 1..=5 {
            assert!(tracker.get_task_profile(i).is_none());
        }
    }

    #[test]
    fn test_tracker_lifecycle() {
        let mut tracker = TaskMemoryTracker::new();

        // Start tracker
        tracker.start().expect("Failed to start tracker");
        assert!(tracker.is_active.load(Ordering::Relaxed));
        assert!(tracker.aggregator_handle.is_some());

        // Should not be able to start again
        let result = tracker.start();
        assert!(result.is_err());

        // Stop tracker
        tracker.stop().expect("Failed to stop tracker");
        assert!(!tracker.is_active.load(Ordering::Relaxed));
        assert!(tracker.aggregator_handle.is_none());

        // Should be able to stop again without error
        tracker.stop().expect("Failed to stop tracker second time");
    }

    #[test]
    fn test_multiple_tasks_aggregation() {
        let mut tracker = TaskMemoryTracker::new();

        // Create events for multiple tasks with different patterns
        let mut events = Vec::new();

        // Task 1: Large allocations
        events.push(AllocationEvent::allocation(1, 0x1000, 10_000, 100));
        events.push(AllocationEvent::allocation(1, 0x2000, 20_000, 200));

        // Task 2: Small frequent allocations
        for i in 0..10 {
            events.push(AllocationEvent::allocation(
                2,
                0x3000 + i,
                100,
                (300 + i) as u64,
            ));
        }

        // Task 3: Allocation and immediate deallocation
        events.push(AllocationEvent::allocation(3, 0x4000, 5000, 400));
        events.push(AllocationEvent::deallocation(3, 0x4000, 5000, 500));

        tracker.process_events(events);

        // Verify individual profiles
        let profile1 = tracker.get_task_profile(1).unwrap();
        assert_eq!(profile1.total_allocated, 30_000);
        assert_eq!(profile1.allocation_count, 2);

        let profile2 = tracker.get_task_profile(2).unwrap();
        assert_eq!(profile2.total_allocated, 1000); // 10 * 100
        assert_eq!(profile2.allocation_count, 10);

        let profile3 = tracker.get_task_profile(3).unwrap();
        assert_eq!(profile3.total_allocated, 5000);
        assert_eq!(profile3.current_usage, 0); // Fully deallocated
        assert_eq!(profile3.memory_efficiency(), 1.0);

        // Verify aggregated stats
        let stats = tracker.get_aggregated_stats();
        assert_eq!(stats.total_tasks, 3);
        assert_eq!(stats.total_memory_allocated, 36_000); // 30k + 1k + 5k
    }
}