Skip to main content

memscope_rs/capture/backends/
task_profile.rs

1//! Task-level memory profiling for the unified tracker
2//!
3//! This module provides task-aware memory tracking capabilities,
4//! allowing users to track memory usage patterns at the granularity
5//! of individual tasks or workloads.
6
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13/// Task type classification for categorizing different workloads
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
15pub enum TaskType {
16    /// CPU-intensive tasks (e.g., matrix multiplication, data processing)
17    CpuIntensive,
18    /// IO-intensive tasks (e.g., file operations, database queries)
19    IoIntensive,
20    /// Network-intensive tasks (e.g., HTTP requests, RPC calls)
21    NetworkIntensive,
22    /// Memory-intensive tasks (e.g., large data structures, caching)
23    MemoryIntensive,
24    /// GPU compute tasks (e.g., CUDA, OpenCL operations)
25    GpuCompute,
26    /// Mixed workload with balanced resource usage
27    #[default]
28    Mixed,
29    /// Streaming data processing tasks
30    Streaming,
31    /// Background maintenance tasks
32    Background,
33}
34
35impl TaskType {
36    /// Get human-readable description of task type
37    pub fn description(&self) -> &'static str {
38        match self {
39            Self::CpuIntensive => "CPU-intensive workload",
40            Self::IoIntensive => "IO-intensive workload",
41            Self::NetworkIntensive => "Network-intensive workload",
42            Self::MemoryIntensive => "Memory-intensive workload",
43            Self::GpuCompute => "GPU compute workload",
44            Self::Mixed => "Mixed workload",
45            Self::Streaming => "Streaming workload",
46            Self::Background => "Background task",
47        }
48    }
49
50    /// Get resource priority for this task type
51    pub fn resource_priority(&self) -> (f64, f64, f64, f64) {
52        match self {
53            Self::CpuIntensive => (1.0, 0.3, 0.2, 0.1),
54            Self::IoIntensive => (0.3, 1.0, 0.2, 0.1),
55            Self::NetworkIntensive => (0.3, 0.2, 1.0, 0.1),
56            Self::MemoryIntensive => (0.2, 0.3, 0.2, 1.0),
57            Self::GpuCompute => (0.5, 0.2, 0.1, 0.8),
58            Self::Mixed => (0.5, 0.5, 0.5, 0.5),
59            Self::Streaming => (0.4, 0.4, 0.6, 0.4),
60            Self::Background => (0.2, 0.2, 0.2, 0.2),
61        }
62    }
63}
64
65/// Memory usage profile for a single task
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct TaskMemoryProfile {
68    /// Unique task identifier (auto-generated, never reused)
69    pub task_id: u64,
70    /// Tokio task ID (if running in tokio context)
71    pub tokio_task_id: Option<u64>,
72    /// Task name for identification
73    pub task_name: String,
74    /// Task type classification
75    pub task_type: TaskType,
76    /// Task creation timestamp (milliseconds since Unix epoch)
77    pub created_at_ms: u64,
78    /// Task completion timestamp (milliseconds since Unix epoch, if completed)
79    pub completed_at_ms: Option<u64>,
80    /// Total bytes allocated by this task
81    pub total_bytes: u64,
82    /// Current memory usage (allocated - deallocated)
83    pub current_memory: u64,
84    /// Peak memory usage observed
85    pub peak_memory: u64,
86    /// Number of allocation operations
87    pub total_allocations: u64,
88    /// Number of deallocation operations
89    pub total_deallocations: u64,
90    /// Task duration in nanoseconds
91    pub duration_ns: u64,
92    /// Memory allocation rate (bytes/second)
93    pub allocation_rate: f64,
94    /// Memory efficiency score (0.0 to 1.0)
95    pub efficiency_score: f64,
96    /// Average allocation size
97    pub average_allocation_size: f64,
98}
99
100impl TaskMemoryProfile {
101    /// Create new task profile
102    pub fn new(task_id: u64, task_name: String, task_type: TaskType) -> Self {
103        Self {
104            task_id,
105            tokio_task_id: None,
106            task_name,
107            task_type,
108            created_at_ms: Self::now_ms(),
109            completed_at_ms: None,
110            total_bytes: 0,
111            current_memory: 0,
112            peak_memory: 0,
113            total_allocations: 0,
114            total_deallocations: 0,
115            duration_ns: 0,
116            allocation_rate: 0.0,
117            efficiency_score: 1.0,
118            average_allocation_size: 0.0,
119        }
120    }
121
122    /// Create new task profile with tokio task ID
123    pub fn with_tokio_id(
124        task_id: u64,
125        tokio_task_id: u64,
126        task_name: String,
127        task_type: TaskType,
128    ) -> Self {
129        let mut profile = Self::new(task_id, task_name, task_type);
130        profile.tokio_task_id = Some(tokio_task_id);
131        profile
132    }
133
134    /// Get current timestamp in milliseconds
135    fn now_ms() -> u64 {
136        std::time::SystemTime::now()
137            .duration_since(std::time::UNIX_EPOCH)
138            .unwrap_or_default()
139            .as_millis() as u64
140    }
141
142    /// Mark task as completed
143    pub fn mark_completed(&mut self) {
144        self.completed_at_ms = Some(Self::now_ms());
145        self.duration_ns = self.lifetime().as_nanos() as u64;
146        self.update_metrics();
147    }
148
149    /// Record allocation event
150    pub fn record_allocation(&mut self, size: u64) {
151        self.total_bytes += size;
152        self.current_memory += size;
153        self.total_allocations += 1;
154
155        if self.current_memory > self.peak_memory {
156            self.peak_memory = self.current_memory;
157        }
158
159        self.update_metrics();
160    }
161
162    /// Record deallocation event
163    pub fn record_deallocation(&mut self, size: u64) {
164        self.current_memory = self.current_memory.saturating_sub(size);
165        self.total_deallocations += 1;
166        self.update_metrics();
167    }
168
169    /// Check if task is completed
170    pub fn is_completed(&self) -> bool {
171        self.completed_at_ms.is_some()
172    }
173
174    /// Get task lifetime duration
175    pub fn lifetime(&self) -> Duration {
176        let end_ms = self.completed_at_ms.unwrap_or_else(Self::now_ms);
177        let start_ms = self.created_at_ms;
178        Duration::from_millis(end_ms.saturating_sub(start_ms))
179    }
180
181    /// Calculate memory efficiency (deallocated / allocated)
182    pub fn memory_efficiency(&self) -> f64 {
183        if self.total_bytes == 0 {
184            1.0
185        } else {
186            let deallocated = self.total_bytes - self.current_memory;
187            deallocated as f64 / self.total_bytes as f64
188        }
189    }
190
191    /// Check if task has potential memory leak
192    pub fn has_potential_leak(&self) -> bool {
193        self.is_completed() && self.current_memory > 0
194    }
195
196    /// Update derived metrics
197    fn update_metrics(&mut self) {
198        let lifetime_secs = self.lifetime().as_secs_f64();
199
200        self.allocation_rate = if lifetime_secs > 0.0 {
201            self.total_bytes as f64 / lifetime_secs
202        } else {
203            0.0
204        };
205
206        self.efficiency_score = self.memory_efficiency();
207
208        self.average_allocation_size = if self.total_allocations > 0 {
209            self.total_bytes as f64 / self.total_allocations as f64
210        } else {
211            0.0
212        };
213    }
214
215    /// Get memory usage summary
216    pub fn summary(&self) -> String {
217        format!(
218            "Task '{}' (ID: {}, Type: {:?}): {} allocations, {:.2} MB total, {:.2} MB peak, {:.1}% efficiency",
219            self.task_name,
220            self.task_id,
221            self.task_type,
222            self.total_allocations,
223            self.total_bytes as f64 / 1_048_576.0,
224            self.peak_memory as f64 / 1_048_576.0,
225            self.efficiency_score * 100.0
226        )
227    }
228}
229
230/// Manager for tracking multiple task profiles
231#[derive(Debug, Clone)]
232pub struct TaskProfileManager {
233    profiles: Arc<Mutex<HashMap<u64, TaskMemoryProfile>>>,
234    next_task_id: Arc<AtomicU64>,
235}
236
237impl TaskProfileManager {
238    /// Create new task profile manager
239    pub fn new() -> Self {
240        Self {
241            profiles: Arc::new(Mutex::new(HashMap::new())),
242            next_task_id: Arc::new(AtomicU64::new(1)),
243        }
244    }
245
246    /// Create a new task profile
247    pub fn create_task(&self, task_name: String, task_type: TaskType) -> u64 {
248        let task_id = self.next_task_id.fetch_add(1, Ordering::Relaxed);
249
250        let profile = TaskMemoryProfile::new(task_id, task_name, task_type);
251
252        if let Ok(mut profiles) = self.profiles.lock() {
253            profiles.insert(task_id, profile);
254        }
255
256        task_id
257    }
258
259    /// Record allocation for a task
260    pub fn record_allocation(&self, task_id: u64, size: u64) {
261        if let Ok(mut profiles) = self.profiles.lock() {
262            if let Some(profile) = profiles.get_mut(&task_id) {
263                profile.record_allocation(size);
264            }
265        }
266    }
267
268    /// Record deallocation for a task
269    pub fn record_deallocation(&self, task_id: u64, size: u64) {
270        if let Ok(mut profiles) = self.profiles.lock() {
271            if let Some(profile) = profiles.get_mut(&task_id) {
272                profile.record_deallocation(size);
273            }
274        }
275    }
276
277    /// Mark task as completed
278    pub fn complete_task(&self, task_id: u64) {
279        if let Ok(mut profiles) = self.profiles.lock() {
280            if let Some(profile) = profiles.get_mut(&task_id) {
281                profile.mark_completed();
282            }
283        }
284    }
285
286    /// Get profile for a specific task
287    pub fn get_profile(&self, task_id: u64) -> Option<TaskMemoryProfile> {
288        if let Ok(profiles) = self.profiles.lock() {
289            profiles.get(&task_id).cloned()
290        } else {
291            None
292        }
293    }
294
295    /// Get all task profiles
296    pub fn get_all_profiles(&self) -> Vec<TaskMemoryProfile> {
297        if let Ok(profiles) = self.profiles.lock() {
298            profiles.values().cloned().collect()
299        } else {
300            Vec::new()
301        }
302    }
303
304    /// Get profiles by task type
305    pub fn get_profiles_by_type(&self, task_type: TaskType) -> Vec<TaskMemoryProfile> {
306        if let Ok(profiles) = self.profiles.lock() {
307            profiles
308                .values()
309                .filter(|p| p.task_type == task_type)
310                .cloned()
311                .collect()
312        } else {
313            Vec::new()
314        }
315    }
316
317    /// Get aggregated statistics
318    pub fn get_aggregated_stats(&self) -> AggregatedTaskStats {
319        let profiles = self.get_all_profiles();
320
321        let total_tasks = profiles.len();
322        let completed_tasks = profiles.iter().filter(|p| p.is_completed()).count();
323
324        let total_memory_allocated: u64 = profiles.iter().map(|p| p.total_bytes).sum();
325        let current_memory_usage: u64 = profiles.iter().map(|p| p.current_memory).sum();
326        let peak_memory_usage: u64 = profiles.iter().map(|p| p.peak_memory).max().unwrap_or(0);
327
328        let total_duration: Duration = profiles.iter().map(|p| p.lifetime()).sum::<Duration>();
329        let average_lifetime = if total_tasks > 0 {
330            let total_secs = total_duration.as_secs_f64();
331            let avg_secs = total_secs / total_tasks as f64;
332            Duration::from_secs_f64(avg_secs)
333        } else {
334            Duration::ZERO
335        };
336
337        let overall_efficiency = if total_memory_allocated > 0 {
338            let total_deallocated = total_memory_allocated - current_memory_usage;
339            total_deallocated as f64 / total_memory_allocated as f64
340        } else {
341            1.0
342        };
343
344        let potential_leaks = profiles.iter().filter(|p| p.has_potential_leak()).count();
345
346        AggregatedTaskStats {
347            total_tasks,
348            completed_tasks,
349            total_memory_allocated,
350            current_memory_usage,
351            peak_memory_usage,
352            average_lifetime,
353            overall_efficiency,
354            potential_leaks,
355        }
356    }
357
358    /// Clear all profiles
359    pub fn clear(&self) {
360        if let Ok(mut profiles) = self.profiles.lock() {
361            profiles.clear();
362        }
363    }
364
365    /// Get number of active tasks
366    pub fn active_task_count(&self) -> usize {
367        if let Ok(profiles) = self.profiles.lock() {
368            profiles.iter().filter(|(_, p)| !p.is_completed()).count()
369        } else {
370            0
371        }
372    }
373}
374
375impl Default for TaskProfileManager {
376    fn default() -> Self {
377        Self::new()
378    }
379}
380
381/// Aggregated statistics across multiple tasks
382#[derive(Debug, Clone, Serialize, Deserialize)]
383pub struct AggregatedTaskStats {
384    /// Total number of tasks tracked
385    pub total_tasks: usize,
386    /// Number of completed tasks
387    pub completed_tasks: usize,
388    /// Total memory allocated across all tasks
389    pub total_memory_allocated: u64,
390    /// Current memory usage across all active tasks
391    pub current_memory_usage: u64,
392    /// Peak memory usage observed
393    pub peak_memory_usage: u64,
394    /// Average task lifetime
395    pub average_lifetime: Duration,
396    /// Memory efficiency across all tasks
397    pub overall_efficiency: f64,
398    /// Tasks with potential memory leaks
399    pub potential_leaks: usize,
400}
401
402impl AggregatedTaskStats {
403    /// Create empty statistics
404    pub fn new() -> Self {
405        Self {
406            total_tasks: 0,
407            completed_tasks: 0,
408            total_memory_allocated: 0,
409            current_memory_usage: 0,
410            peak_memory_usage: 0,
411            average_lifetime: Duration::ZERO,
412            overall_efficiency: 1.0,
413            potential_leaks: 0,
414        }
415    }
416
417    /// Get memory usage summary
418    pub fn memory_summary(&self) -> String {
419        format!(
420            "Tasks: {} ({}% complete), Memory: {:.2}MB allocated, {:.2}MB current, {:.1}% efficiency, {} potential leaks",
421            self.total_tasks,
422            self.completed_tasks
423                .checked_mul(100)
424                .and_then(|v| v.checked_div(self.total_tasks))
425                .unwrap_or(0),
426            self.total_memory_allocated as f64 / 1_048_576.0,
427            self.current_memory_usage as f64 / 1_048_576.0,
428            self.overall_efficiency * 100.0,
429            self.potential_leaks
430        )
431    }
432}
433
434impl Default for AggregatedTaskStats {
435    fn default() -> Self {
436        Self::new()
437    }
438}
439
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh `Mixed` profile with ID 1 for the single-profile tests.
    fn mixed_profile() -> TaskMemoryProfile {
        TaskMemoryProfile::new(1, String::from("test"), TaskType::Mixed)
    }

    /// Spot-checks the human-readable labels of three task types.
    #[test]
    fn test_task_type_description() {
        let expected = [
            (TaskType::CpuIntensive, "CPU-intensive workload"),
            (TaskType::IoIntensive, "IO-intensive workload"),
            (TaskType::NetworkIntensive, "Network-intensive workload"),
        ];

        for &(task_type, label) in &expected {
            assert_eq!(task_type.description(), label);
        }
    }

    /// The dominant resource of a task type must carry the largest weight.
    #[test]
    fn test_task_type_resource_priority() {
        let (cpu, io, net, mem) = TaskType::CpuIntensive.resource_priority();
        assert!(cpu > io);
        assert!(cpu > net);
        assert!(cpu > mem);

        let (cpu, io, net, mem) = TaskType::IoIntensive.resource_priority();
        assert!(io > cpu);
        assert!(io > net);
        assert!(io > mem);
    }

    /// A new profile carries its identity and starts with empty counters.
    #[test]
    fn test_task_memory_profile_basic() {
        let fresh = TaskMemoryProfile::new(1, String::from("test_task"), TaskType::CpuIntensive);

        assert_eq!(fresh.task_id, 1);
        assert_eq!(fresh.task_name, "test_task");
        assert_eq!(fresh.task_type, TaskType::CpuIntensive);
        assert!(!fresh.is_completed());
        assert_eq!(fresh.current_memory, 0);
        assert_eq!(fresh.total_bytes, 0);
    }

    /// Each allocation raises the current, total and peak counters together.
    #[test]
    fn test_record_allocation() {
        let mut tracked = mixed_profile();

        // (allocation size, expected running total, expected allocation count)
        let steps = [(1024u64, 1024u64, 1u64), (2048, 3072, 2)];
        for &(size, running_total, count) in &steps {
            tracked.record_allocation(size);
            assert_eq!(tracked.current_memory, running_total);
            assert_eq!(tracked.total_bytes, running_total);
            assert_eq!(tracked.peak_memory, running_total);
            assert_eq!(tracked.total_allocations, count);
        }
    }

    /// A deallocation lowers only the current usage; total and peak stay put.
    #[test]
    fn test_record_deallocation() {
        let mut tracked = mixed_profile();

        tracked.record_allocation(3072);
        tracked.record_deallocation(1024);

        assert_eq!(tracked.current_memory, 2048);
        assert_eq!(tracked.total_bytes, 3072);
        assert_eq!(tracked.peak_memory, 3072);
        assert_eq!(tracked.total_deallocations, 1);
    }

    /// Efficiency is the released share of everything ever allocated.
    #[test]
    fn test_memory_efficiency() {
        let mut tracked = mixed_profile();

        // Everything allocated has been released.
        tracked.record_allocation(1000);
        tracked.record_deallocation(1000);
        assert_eq!(tracked.memory_efficiency(), 1.0);

        // Half of the 2000 bytes allocated so far are still outstanding.
        tracked.record_allocation(1000);
        assert_eq!(tracked.memory_efficiency(), 0.5);
    }

    /// Only a completed task with outstanding memory counts as a potential leak.
    #[test]
    fn test_potential_leak_detection() {
        let mut clean = mixed_profile();

        // Still running: outstanding memory is not yet a leak.
        clean.record_allocation(1000);
        assert!(!clean.has_potential_leak());

        // Completed with everything released: no leak.
        clean.record_deallocation(1000);
        clean.mark_completed();
        assert!(!clean.has_potential_leak());

        // Completed while memory is still outstanding: potential leak.
        let mut leaky = TaskMemoryProfile::new(2, String::from("test2"), TaskType::Mixed);
        leaky.record_allocation(1000);
        leaky.mark_completed();
        assert!(leaky.has_potential_leak());
    }

    /// Allocations recorded through the manager land on the right profile.
    #[test]
    fn test_task_profile_manager() {
        let registry = TaskProfileManager::new();

        let id = registry.create_task(String::from("test_task"), TaskType::CpuIntensive);
        assert!(id > 0);

        for size in [1024u64, 2048].iter().copied() {
            registry.record_allocation(id, size);
        }

        let snapshot = registry.get_profile(id);
        assert!(snapshot.is_some());
        assert_eq!(snapshot.unwrap().total_bytes, 3072);
    }

    /// Aggregates combine completed and running tasks.
    #[test]
    fn test_aggregated_stats() {
        let registry = TaskProfileManager::new();

        let finished = registry.create_task(String::from("task1"), TaskType::Mixed);
        registry.record_allocation(finished, 1000);
        registry.record_deallocation(finished, 500);
        registry.complete_task(finished);

        let running = registry.create_task(String::from("task2"), TaskType::Mixed);
        registry.record_allocation(running, 2000);

        let totals = registry.get_aggregated_stats();
        assert_eq!(totals.total_tasks, 2);
        assert_eq!(totals.completed_tasks, 1);
        assert_eq!(totals.total_memory_allocated, 3000);
        assert_eq!(totals.current_memory_usage, 2500);
    }

    /// The active count drops by one with each completed task.
    #[test]
    fn test_active_task_count() {
        let registry = TaskProfileManager::new();

        let first = registry.create_task(String::from("task1"), TaskType::Mixed);
        let second = registry.create_task(String::from("task2"), TaskType::Mixed);
        assert_eq!(registry.active_task_count(), 2);

        registry.complete_task(first);
        assert_eq!(registry.active_task_count(), 1);

        registry.complete_task(second);
        assert_eq!(registry.active_task_count(), 0);
    }

    /// Filtering by type returns only the matching profiles.
    #[test]
    fn test_profiles_by_type() {
        let registry = TaskProfileManager::new();

        let specs = [
            ("cpu_task", TaskType::CpuIntensive),
            ("io_task", TaskType::IoIntensive),
            ("cpu_task2", TaskType::CpuIntensive),
        ];
        for &(name, task_type) in &specs {
            let _ = registry.create_task(name.to_string(), task_type);
        }

        let cpu_bound = registry.get_profiles_by_type(TaskType::CpuIntensive);
        assert_eq!(cpu_bound.len(), 2);

        let io_bound = registry.get_profiles_by_type(TaskType::IoIntensive);
        assert_eq!(io_bound.len(), 1);
    }
}