Skip to main content

memscope_rs/capture/backends/
task_profile.rs

//! Task-level memory profiling for the unified tracker.
//!
//! This module provides task-aware memory tracking, allowing users to
//! track memory usage patterns at the granularity of individual tasks
//! or workloads.
6
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13/// Task type classification for categorizing different workloads
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
15pub enum TaskType {
16    /// CPU-intensive tasks (e.g., matrix multiplication, data processing)
17    CpuIntensive,
18    /// IO-intensive tasks (e.g., file operations, database queries)
19    IoIntensive,
20    /// Network-intensive tasks (e.g., HTTP requests, RPC calls)
21    NetworkIntensive,
22    /// Memory-intensive tasks (e.g., large data structures, caching)
23    MemoryIntensive,
24    /// GPU compute tasks (e.g., CUDA, OpenCL operations)
25    GpuCompute,
26    /// Mixed workload with balanced resource usage
27    #[default]
28    Mixed,
29    /// Streaming data processing tasks
30    Streaming,
31    /// Background maintenance tasks
32    Background,
33}
34
35impl TaskType {
36    /// Get human-readable description of task type
37    pub fn description(&self) -> &'static str {
38        match self {
39            Self::CpuIntensive => "CPU-intensive workload",
40            Self::IoIntensive => "IO-intensive workload",
41            Self::NetworkIntensive => "Network-intensive workload",
42            Self::MemoryIntensive => "Memory-intensive workload",
43            Self::GpuCompute => "GPU compute workload",
44            Self::Mixed => "Mixed workload",
45            Self::Streaming => "Streaming workload",
46            Self::Background => "Background task",
47        }
48    }
49
50    /// Get resource priority for this task type
51    pub fn resource_priority(&self) -> (f64, f64, f64, f64) {
52        match self {
53            Self::CpuIntensive => (1.0, 0.3, 0.2, 0.1),
54            Self::IoIntensive => (0.3, 1.0, 0.2, 0.1),
55            Self::NetworkIntensive => (0.3, 0.2, 1.0, 0.1),
56            Self::MemoryIntensive => (0.2, 0.3, 0.2, 1.0),
57            Self::GpuCompute => (0.5, 0.2, 0.1, 0.8),
58            Self::Mixed => (0.5, 0.5, 0.5, 0.5),
59            Self::Streaming => (0.4, 0.4, 0.6, 0.4),
60            Self::Background => (0.2, 0.2, 0.2, 0.2),
61        }
62    }
63}
64
65/// Memory usage profile for a single task
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct TaskMemoryProfile {
68    /// Unique task identifier
69    pub task_id: u64,
70    /// Task name for identification
71    pub task_name: String,
72    /// Task type classification
73    pub task_type: TaskType,
74    /// Task creation timestamp (milliseconds since Unix epoch)
75    pub created_at_ms: u64,
76    /// Task completion timestamp (milliseconds since Unix epoch, if completed)
77    pub completed_at_ms: Option<u64>,
78    /// Total bytes allocated by this task
79    pub total_bytes: u64,
80    /// Current memory usage (allocated - deallocated)
81    pub current_memory: u64,
82    /// Peak memory usage observed
83    pub peak_memory: u64,
84    /// Number of allocation operations
85    pub total_allocations: u64,
86    /// Number of deallocation operations
87    pub total_deallocations: u64,
88    /// Task duration in nanoseconds
89    pub duration_ns: u64,
90    /// Memory allocation rate (bytes/second)
91    pub allocation_rate: f64,
92    /// Memory efficiency score (0.0 to 1.0)
93    pub efficiency_score: f64,
94    /// Average allocation size
95    pub average_allocation_size: f64,
96}
97
98impl TaskMemoryProfile {
99    /// Create new task profile
100    pub fn new(task_id: u64, task_name: String, task_type: TaskType) -> Self {
101        Self {
102            task_id,
103            task_name,
104            task_type,
105            created_at_ms: Self::now_ms(),
106            completed_at_ms: None,
107            total_bytes: 0,
108            current_memory: 0,
109            peak_memory: 0,
110            total_allocations: 0,
111            total_deallocations: 0,
112            duration_ns: 0,
113            allocation_rate: 0.0,
114            efficiency_score: 1.0,
115            average_allocation_size: 0.0,
116        }
117    }
118
119    /// Get current timestamp in milliseconds
120    fn now_ms() -> u64 {
121        std::time::SystemTime::now()
122            .duration_since(std::time::UNIX_EPOCH)
123            .unwrap_or_default()
124            .as_millis() as u64
125    }
126
127    /// Mark task as completed
128    pub fn mark_completed(&mut self) {
129        self.completed_at_ms = Some(Self::now_ms());
130        self.duration_ns = self.lifetime().as_nanos() as u64;
131        self.update_metrics();
132    }
133
134    /// Record allocation event
135    pub fn record_allocation(&mut self, size: u64) {
136        self.total_bytes += size;
137        self.current_memory += size;
138        self.total_allocations += 1;
139
140        if self.current_memory > self.peak_memory {
141            self.peak_memory = self.current_memory;
142        }
143
144        self.update_metrics();
145    }
146
147    /// Record deallocation event
148    pub fn record_deallocation(&mut self, size: u64) {
149        self.current_memory = self.current_memory.saturating_sub(size);
150        self.total_deallocations += 1;
151        self.update_metrics();
152    }
153
154    /// Check if task is completed
155    pub fn is_completed(&self) -> bool {
156        self.completed_at_ms.is_some()
157    }
158
159    /// Get task lifetime duration
160    pub fn lifetime(&self) -> Duration {
161        let end_ms = self.completed_at_ms.unwrap_or_else(Self::now_ms);
162        let start_ms = self.created_at_ms;
163        Duration::from_millis(end_ms.saturating_sub(start_ms))
164    }
165
166    /// Calculate memory efficiency (deallocated / allocated)
167    pub fn memory_efficiency(&self) -> f64 {
168        if self.total_bytes == 0 {
169            1.0
170        } else {
171            let deallocated = self.total_bytes - self.current_memory;
172            deallocated as f64 / self.total_bytes as f64
173        }
174    }
175
176    /// Check if task has potential memory leak
177    pub fn has_potential_leak(&self) -> bool {
178        self.is_completed() && self.current_memory > 0
179    }
180
181    /// Update derived metrics
182    fn update_metrics(&mut self) {
183        let lifetime_secs = self.lifetime().as_secs_f64();
184
185        self.allocation_rate = if lifetime_secs > 0.0 {
186            self.total_bytes as f64 / lifetime_secs
187        } else {
188            0.0
189        };
190
191        self.efficiency_score = self.memory_efficiency();
192
193        self.average_allocation_size = if self.total_allocations > 0 {
194            self.total_bytes as f64 / self.total_allocations as f64
195        } else {
196            0.0
197        };
198    }
199
200    /// Get memory usage summary
201    pub fn summary(&self) -> String {
202        format!(
203            "Task '{}' (ID: {}, Type: {:?}): {} allocations, {:.2} MB total, {:.2} MB peak, {:.1}% efficiency",
204            self.task_name,
205            self.task_id,
206            self.task_type,
207            self.total_allocations,
208            self.total_bytes as f64 / 1_048_576.0,
209            self.peak_memory as f64 / 1_048_576.0,
210            self.efficiency_score * 100.0
211        )
212    }
213}
214
215/// Manager for tracking multiple task profiles
216#[derive(Debug, Clone)]
217pub struct TaskProfileManager {
218    profiles: Arc<Mutex<HashMap<u64, TaskMemoryProfile>>>,
219    next_task_id: Arc<AtomicU64>,
220}
221
222impl TaskProfileManager {
223    /// Create new task profile manager
224    pub fn new() -> Self {
225        Self {
226            profiles: Arc::new(Mutex::new(HashMap::new())),
227            next_task_id: Arc::new(AtomicU64::new(1)),
228        }
229    }
230
231    /// Create a new task profile
232    pub fn create_task(&self, task_name: String, task_type: TaskType) -> u64 {
233        let task_id = self.next_task_id.fetch_add(1, Ordering::Relaxed);
234
235        let profile = TaskMemoryProfile::new(task_id, task_name, task_type);
236
237        if let Ok(mut profiles) = self.profiles.lock() {
238            profiles.insert(task_id, profile);
239        }
240
241        task_id
242    }
243
244    /// Record allocation for a task
245    pub fn record_allocation(&self, task_id: u64, size: u64) {
246        if let Ok(mut profiles) = self.profiles.lock() {
247            if let Some(profile) = profiles.get_mut(&task_id) {
248                profile.record_allocation(size);
249            }
250        }
251    }
252
253    /// Record deallocation for a task
254    pub fn record_deallocation(&self, task_id: u64, size: u64) {
255        if let Ok(mut profiles) = self.profiles.lock() {
256            if let Some(profile) = profiles.get_mut(&task_id) {
257                profile.record_deallocation(size);
258            }
259        }
260    }
261
262    /// Mark task as completed
263    pub fn complete_task(&self, task_id: u64) {
264        if let Ok(mut profiles) = self.profiles.lock() {
265            if let Some(profile) = profiles.get_mut(&task_id) {
266                profile.mark_completed();
267            }
268        }
269    }
270
271    /// Get profile for a specific task
272    pub fn get_profile(&self, task_id: u64) -> Option<TaskMemoryProfile> {
273        if let Ok(profiles) = self.profiles.lock() {
274            profiles.get(&task_id).cloned()
275        } else {
276            None
277        }
278    }
279
280    /// Get all task profiles
281    pub fn get_all_profiles(&self) -> Vec<TaskMemoryProfile> {
282        if let Ok(profiles) = self.profiles.lock() {
283            profiles.values().cloned().collect()
284        } else {
285            Vec::new()
286        }
287    }
288
289    /// Get profiles by task type
290    pub fn get_profiles_by_type(&self, task_type: TaskType) -> Vec<TaskMemoryProfile> {
291        if let Ok(profiles) = self.profiles.lock() {
292            profiles
293                .values()
294                .filter(|p| p.task_type == task_type)
295                .cloned()
296                .collect()
297        } else {
298            Vec::new()
299        }
300    }
301
302    /// Get aggregated statistics
303    pub fn get_aggregated_stats(&self) -> AggregatedTaskStats {
304        let profiles = self.get_all_profiles();
305
306        let total_tasks = profiles.len();
307        let completed_tasks = profiles.iter().filter(|p| p.is_completed()).count();
308
309        let total_memory_allocated: u64 = profiles.iter().map(|p| p.total_bytes).sum();
310        let current_memory_usage: u64 = profiles.iter().map(|p| p.current_memory).sum();
311        let peak_memory_usage: u64 = profiles.iter().map(|p| p.peak_memory).max().unwrap_or(0);
312
313        let total_duration: Duration = profiles.iter().map(|p| p.lifetime()).sum::<Duration>();
314        let average_lifetime = if total_tasks > 0 {
315            let total_secs = total_duration.as_secs_f64();
316            let avg_secs = total_secs / total_tasks as f64;
317            Duration::from_secs_f64(avg_secs)
318        } else {
319            Duration::ZERO
320        };
321
322        let overall_efficiency = if total_memory_allocated > 0 {
323            let total_deallocated = total_memory_allocated - current_memory_usage;
324            total_deallocated as f64 / total_memory_allocated as f64
325        } else {
326            1.0
327        };
328
329        let potential_leaks = profiles.iter().filter(|p| p.has_potential_leak()).count();
330
331        AggregatedTaskStats {
332            total_tasks,
333            completed_tasks,
334            total_memory_allocated,
335            current_memory_usage,
336            peak_memory_usage,
337            average_lifetime,
338            overall_efficiency,
339            potential_leaks,
340        }
341    }
342
343    /// Clear all profiles
344    pub fn clear(&self) {
345        if let Ok(mut profiles) = self.profiles.lock() {
346            profiles.clear();
347        }
348    }
349
350    /// Get number of active tasks
351    pub fn active_task_count(&self) -> usize {
352        if let Ok(profiles) = self.profiles.lock() {
353            profiles.iter().filter(|(_, p)| !p.is_completed()).count()
354        } else {
355            0
356        }
357    }
358}
359
360impl Default for TaskProfileManager {
361    fn default() -> Self {
362        Self::new()
363    }
364}
365
366/// Aggregated statistics across multiple tasks
367#[derive(Debug, Clone, Serialize, Deserialize)]
368pub struct AggregatedTaskStats {
369    /// Total number of tasks tracked
370    pub total_tasks: usize,
371    /// Number of completed tasks
372    pub completed_tasks: usize,
373    /// Total memory allocated across all tasks
374    pub total_memory_allocated: u64,
375    /// Current memory usage across all active tasks
376    pub current_memory_usage: u64,
377    /// Peak memory usage observed
378    pub peak_memory_usage: u64,
379    /// Average task lifetime
380    pub average_lifetime: Duration,
381    /// Memory efficiency across all tasks
382    pub overall_efficiency: f64,
383    /// Tasks with potential memory leaks
384    pub potential_leaks: usize,
385}
386
387impl AggregatedTaskStats {
388    /// Create empty statistics
389    pub fn new() -> Self {
390        Self {
391            total_tasks: 0,
392            completed_tasks: 0,
393            total_memory_allocated: 0,
394            current_memory_usage: 0,
395            peak_memory_usage: 0,
396            average_lifetime: Duration::ZERO,
397            overall_efficiency: 1.0,
398            potential_leaks: 0,
399        }
400    }
401
402    /// Get memory usage summary
403    pub fn memory_summary(&self) -> String {
404        format!(
405            "Tasks: {} ({}% complete), Memory: {:.2}MB allocated, {:.2}MB current, {:.1}% efficiency, {} potential leaks",
406            self.total_tasks,
407            if self.total_tasks > 0 {
408                self.completed_tasks * 100 / self.total_tasks
409            } else {
410                0
411            },
412            self.total_memory_allocated as f64 / 1_048_576.0,
413            self.current_memory_usage as f64 / 1_048_576.0,
414            self.overall_efficiency * 100.0,
415            self.potential_leaks
416        )
417    }
418}
419
420impl Default for AggregatedTaskStats {
421    fn default() -> Self {
422        Self::new()
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh `Mixed` profile with ID 1, shared by the counter tests.
    fn mixed_profile() -> TaskMemoryProfile {
        TaskMemoryProfile::new(1, "test".to_string(), TaskType::Mixed)
    }

    #[test]
    fn test_task_type_description() {
        let cpu = TaskType::CpuIntensive.description();
        let io = TaskType::IoIntensive.description();
        let network = TaskType::NetworkIntensive.description();

        assert_eq!(cpu, "CPU-intensive workload");
        assert_eq!(io, "IO-intensive workload");
        assert_eq!(network, "Network-intensive workload");
    }

    #[test]
    fn test_task_type_resource_priority() {
        // The dominant resource must outweigh every other one.
        let (cpu, io, net, mem) = TaskType::CpuIntensive.resource_priority();
        assert!(cpu > io);
        assert!(cpu > net);
        assert!(cpu > mem);

        let (cpu, io, net, mem) = TaskType::IoIntensive.resource_priority();
        assert!(io > cpu);
        assert!(io > net);
        assert!(io > mem);
    }

    #[test]
    fn test_task_memory_profile_basic() {
        let fresh = TaskMemoryProfile::new(1, "test_task".to_string(), TaskType::CpuIntensive);

        assert_eq!(fresh.task_id, 1);
        assert_eq!(fresh.task_name, "test_task");
        assert_eq!(fresh.task_type, TaskType::CpuIntensive);
        assert!(!fresh.is_completed());
        assert_eq!(fresh.current_memory, 0);
        assert_eq!(fresh.total_bytes, 0);
    }

    #[test]
    fn test_record_allocation() {
        let mut tracked = mixed_profile();

        // First allocation sets every counter.
        tracked.record_allocation(1024);
        assert_eq!(tracked.current_memory, 1024);
        assert_eq!(tracked.total_bytes, 1024);
        assert_eq!(tracked.peak_memory, 1024);
        assert_eq!(tracked.total_allocations, 1);

        // Second allocation accumulates on top of the first.
        tracked.record_allocation(2048);
        assert_eq!(tracked.current_memory, 3072);
        assert_eq!(tracked.total_bytes, 3072);
        assert_eq!(tracked.peak_memory, 3072);
        assert_eq!(tracked.total_allocations, 2);
    }

    #[test]
    fn test_record_deallocation() {
        let mut tracked = mixed_profile();

        tracked.record_allocation(3072);
        tracked.record_deallocation(1024);

        // Only the outstanding amount drops; totals and peak are untouched.
        assert_eq!(tracked.current_memory, 2048);
        assert_eq!(tracked.total_bytes, 3072);
        assert_eq!(tracked.peak_memory, 3072);
        assert_eq!(tracked.total_deallocations, 1);
    }

    #[test]
    fn test_memory_efficiency() {
        let mut tracked = mixed_profile();

        // Everything allocated has been released.
        tracked.record_allocation(1000);
        tracked.record_deallocation(1000);
        assert_eq!(tracked.memory_efficiency(), 1.0);

        // 1000 of 2000 total bytes are still outstanding.
        tracked.record_allocation(1000);
        assert_eq!(tracked.memory_efficiency(), 0.5);
    }

    #[test]
    fn test_potential_leak_detection() {
        let mut balanced = mixed_profile();

        // An unfinished task is never reported as a leak.
        balanced.record_allocation(1000);
        assert!(!balanced.has_potential_leak());

        // Completed with nothing outstanding: still not a leak.
        balanced.record_deallocation(1000);
        balanced.mark_completed();
        assert!(!balanced.has_potential_leak());

        // Completed with outstanding memory: flagged.
        let mut leaky = TaskMemoryProfile::new(2, "test2".to_string(), TaskType::Mixed);
        leaky.record_allocation(1000);
        leaky.mark_completed();
        assert!(leaky.has_potential_leak());
    }

    #[test]
    fn test_task_profile_manager() {
        let manager = TaskProfileManager::new();

        let id = manager.create_task("test_task".to_string(), TaskType::CpuIntensive);
        assert!(id > 0);

        manager.record_allocation(id, 1024);
        manager.record_allocation(id, 2048);

        let snapshot = manager.get_profile(id);
        assert!(snapshot.is_some());
        assert_eq!(snapshot.unwrap().total_bytes, 3072);
    }

    #[test]
    fn test_aggregated_stats() {
        let manager = TaskProfileManager::new();

        // Completed task with 500 bytes still outstanding.
        let first = manager.create_task("task1".to_string(), TaskType::Mixed);
        manager.record_allocation(first, 1000);
        manager.record_deallocation(first, 500);
        manager.complete_task(first);

        // Active task holding 2000 bytes.
        let second = manager.create_task("task2".to_string(), TaskType::Mixed);
        manager.record_allocation(second, 2000);

        let stats = manager.get_aggregated_stats();
        assert_eq!(stats.total_tasks, 2);
        assert_eq!(stats.completed_tasks, 1);
        assert_eq!(stats.total_memory_allocated, 3000);
        assert_eq!(stats.current_memory_usage, 2500);
    }

    #[test]
    fn test_active_task_count() {
        let manager = TaskProfileManager::new();

        let first = manager.create_task("task1".to_string(), TaskType::Mixed);
        let second = manager.create_task("task2".to_string(), TaskType::Mixed);
        assert_eq!(manager.active_task_count(), 2);

        manager.complete_task(first);
        assert_eq!(manager.active_task_count(), 1);

        manager.complete_task(second);
        assert_eq!(manager.active_task_count(), 0);
    }

    #[test]
    fn test_profiles_by_type() {
        let manager = TaskProfileManager::new();

        let _ = manager.create_task("cpu_task".to_string(), TaskType::CpuIntensive);
        let _ = manager.create_task("io_task".to_string(), TaskType::IoIntensive);
        let _ = manager.create_task("cpu_task2".to_string(), TaskType::CpuIntensive);

        let cpu_count = manager.get_profiles_by_type(TaskType::CpuIntensive).len();
        assert_eq!(cpu_count, 2);

        let io_count = manager.get_profiles_by_type(TaskType::IoIntensive).len();
        assert_eq!(io_count, 1);
    }
}