1use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
15pub enum TaskType {
16 CpuIntensive,
18 IoIntensive,
20 NetworkIntensive,
22 MemoryIntensive,
24 GpuCompute,
26 #[default]
28 Mixed,
29 Streaming,
31 Background,
33}
34
35impl TaskType {
36 pub fn description(&self) -> &'static str {
38 match self {
39 Self::CpuIntensive => "CPU-intensive workload",
40 Self::IoIntensive => "IO-intensive workload",
41 Self::NetworkIntensive => "Network-intensive workload",
42 Self::MemoryIntensive => "Memory-intensive workload",
43 Self::GpuCompute => "GPU compute workload",
44 Self::Mixed => "Mixed workload",
45 Self::Streaming => "Streaming workload",
46 Self::Background => "Background task",
47 }
48 }
49
50 pub fn resource_priority(&self) -> (f64, f64, f64, f64) {
52 match self {
53 Self::CpuIntensive => (1.0, 0.3, 0.2, 0.1),
54 Self::IoIntensive => (0.3, 1.0, 0.2, 0.1),
55 Self::NetworkIntensive => (0.3, 0.2, 1.0, 0.1),
56 Self::MemoryIntensive => (0.2, 0.3, 0.2, 1.0),
57 Self::GpuCompute => (0.5, 0.2, 0.1, 0.8),
58 Self::Mixed => (0.5, 0.5, 0.5, 0.5),
59 Self::Streaming => (0.4, 0.4, 0.6, 0.4),
60 Self::Background => (0.2, 0.2, 0.2, 0.2),
61 }
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct TaskMemoryProfile {
68 pub task_id: u64,
70 pub tokio_task_id: Option<u64>,
72 pub task_name: String,
74 pub task_type: TaskType,
76 pub created_at_ms: u64,
78 pub completed_at_ms: Option<u64>,
80 pub total_bytes: u64,
82 pub current_memory: u64,
84 pub peak_memory: u64,
86 pub total_allocations: u64,
88 pub total_deallocations: u64,
90 pub duration_ns: u64,
92 pub allocation_rate: f64,
94 pub efficiency_score: f64,
96 pub average_allocation_size: f64,
98}
99
100impl TaskMemoryProfile {
101 pub fn new(task_id: u64, task_name: String, task_type: TaskType) -> Self {
103 Self {
104 task_id,
105 tokio_task_id: None,
106 task_name,
107 task_type,
108 created_at_ms: Self::now_ms(),
109 completed_at_ms: None,
110 total_bytes: 0,
111 current_memory: 0,
112 peak_memory: 0,
113 total_allocations: 0,
114 total_deallocations: 0,
115 duration_ns: 0,
116 allocation_rate: 0.0,
117 efficiency_score: 1.0,
118 average_allocation_size: 0.0,
119 }
120 }
121
122 pub fn with_tokio_id(
124 task_id: u64,
125 tokio_task_id: u64,
126 task_name: String,
127 task_type: TaskType,
128 ) -> Self {
129 let mut profile = Self::new(task_id, task_name, task_type);
130 profile.tokio_task_id = Some(tokio_task_id);
131 profile
132 }
133
134 fn now_ms() -> u64 {
136 std::time::SystemTime::now()
137 .duration_since(std::time::UNIX_EPOCH)
138 .unwrap_or_default()
139 .as_millis() as u64
140 }
141
142 pub fn mark_completed(&mut self) {
144 self.completed_at_ms = Some(Self::now_ms());
145 self.duration_ns = self.lifetime().as_nanos() as u64;
146 self.update_metrics();
147 }
148
149 pub fn record_allocation(&mut self, size: u64) {
151 self.total_bytes += size;
152 self.current_memory += size;
153 self.total_allocations += 1;
154
155 if self.current_memory > self.peak_memory {
156 self.peak_memory = self.current_memory;
157 }
158
159 self.update_metrics();
160 }
161
162 pub fn record_deallocation(&mut self, size: u64) {
164 self.current_memory = self.current_memory.saturating_sub(size);
165 self.total_deallocations += 1;
166 self.update_metrics();
167 }
168
169 pub fn is_completed(&self) -> bool {
171 self.completed_at_ms.is_some()
172 }
173
174 pub fn lifetime(&self) -> Duration {
176 let end_ms = self.completed_at_ms.unwrap_or_else(Self::now_ms);
177 let start_ms = self.created_at_ms;
178 Duration::from_millis(end_ms.saturating_sub(start_ms))
179 }
180
181 pub fn memory_efficiency(&self) -> f64 {
183 if self.total_bytes == 0 {
184 1.0
185 } else {
186 let deallocated = self.total_bytes - self.current_memory;
187 deallocated as f64 / self.total_bytes as f64
188 }
189 }
190
191 pub fn has_potential_leak(&self) -> bool {
193 self.is_completed() && self.current_memory > 0
194 }
195
196 fn update_metrics(&mut self) {
198 let lifetime_secs = self.lifetime().as_secs_f64();
199
200 self.allocation_rate = if lifetime_secs > 0.0 {
201 self.total_bytes as f64 / lifetime_secs
202 } else {
203 0.0
204 };
205
206 self.efficiency_score = self.memory_efficiency();
207
208 self.average_allocation_size = if self.total_allocations > 0 {
209 self.total_bytes as f64 / self.total_allocations as f64
210 } else {
211 0.0
212 };
213 }
214
215 pub fn summary(&self) -> String {
217 format!(
218 "Task '{}' (ID: {}, Type: {:?}): {} allocations, {:.2} MB total, {:.2} MB peak, {:.1}% efficiency",
219 self.task_name,
220 self.task_id,
221 self.task_type,
222 self.total_allocations,
223 self.total_bytes as f64 / 1_048_576.0,
224 self.peak_memory as f64 / 1_048_576.0,
225 self.efficiency_score * 100.0
226 )
227 }
228}
229
230#[derive(Debug, Clone)]
232pub struct TaskProfileManager {
233 profiles: Arc<Mutex<HashMap<u64, TaskMemoryProfile>>>,
234 next_task_id: Arc<AtomicU64>,
235}
236
237impl TaskProfileManager {
238 pub fn new() -> Self {
240 Self {
241 profiles: Arc::new(Mutex::new(HashMap::new())),
242 next_task_id: Arc::new(AtomicU64::new(1)),
243 }
244 }
245
246 pub fn create_task(&self, task_name: String, task_type: TaskType) -> u64 {
248 let task_id = self.next_task_id.fetch_add(1, Ordering::Relaxed);
249
250 let profile = TaskMemoryProfile::new(task_id, task_name, task_type);
251
252 if let Ok(mut profiles) = self.profiles.lock() {
253 profiles.insert(task_id, profile);
254 }
255
256 task_id
257 }
258
259 pub fn record_allocation(&self, task_id: u64, size: u64) {
261 if let Ok(mut profiles) = self.profiles.lock() {
262 if let Some(profile) = profiles.get_mut(&task_id) {
263 profile.record_allocation(size);
264 }
265 }
266 }
267
268 pub fn record_deallocation(&self, task_id: u64, size: u64) {
270 if let Ok(mut profiles) = self.profiles.lock() {
271 if let Some(profile) = profiles.get_mut(&task_id) {
272 profile.record_deallocation(size);
273 }
274 }
275 }
276
277 pub fn complete_task(&self, task_id: u64) {
279 if let Ok(mut profiles) = self.profiles.lock() {
280 if let Some(profile) = profiles.get_mut(&task_id) {
281 profile.mark_completed();
282 }
283 }
284 }
285
286 pub fn get_profile(&self, task_id: u64) -> Option<TaskMemoryProfile> {
288 if let Ok(profiles) = self.profiles.lock() {
289 profiles.get(&task_id).cloned()
290 } else {
291 None
292 }
293 }
294
295 pub fn get_all_profiles(&self) -> Vec<TaskMemoryProfile> {
297 if let Ok(profiles) = self.profiles.lock() {
298 profiles.values().cloned().collect()
299 } else {
300 Vec::new()
301 }
302 }
303
304 pub fn get_profiles_by_type(&self, task_type: TaskType) -> Vec<TaskMemoryProfile> {
306 if let Ok(profiles) = self.profiles.lock() {
307 profiles
308 .values()
309 .filter(|p| p.task_type == task_type)
310 .cloned()
311 .collect()
312 } else {
313 Vec::new()
314 }
315 }
316
317 pub fn get_aggregated_stats(&self) -> AggregatedTaskStats {
319 let profiles = self.get_all_profiles();
320
321 let total_tasks = profiles.len();
322 let completed_tasks = profiles.iter().filter(|p| p.is_completed()).count();
323
324 let total_memory_allocated: u64 = profiles.iter().map(|p| p.total_bytes).sum();
325 let current_memory_usage: u64 = profiles.iter().map(|p| p.current_memory).sum();
326 let peak_memory_usage: u64 = profiles.iter().map(|p| p.peak_memory).max().unwrap_or(0);
327
328 let total_duration: Duration = profiles.iter().map(|p| p.lifetime()).sum::<Duration>();
329 let average_lifetime = if total_tasks > 0 {
330 let total_secs = total_duration.as_secs_f64();
331 let avg_secs = total_secs / total_tasks as f64;
332 Duration::from_secs_f64(avg_secs)
333 } else {
334 Duration::ZERO
335 };
336
337 let overall_efficiency = if total_memory_allocated > 0 {
338 let total_deallocated = total_memory_allocated - current_memory_usage;
339 total_deallocated as f64 / total_memory_allocated as f64
340 } else {
341 1.0
342 };
343
344 let potential_leaks = profiles.iter().filter(|p| p.has_potential_leak()).count();
345
346 AggregatedTaskStats {
347 total_tasks,
348 completed_tasks,
349 total_memory_allocated,
350 current_memory_usage,
351 peak_memory_usage,
352 average_lifetime,
353 overall_efficiency,
354 potential_leaks,
355 }
356 }
357
358 pub fn clear(&self) {
360 if let Ok(mut profiles) = self.profiles.lock() {
361 profiles.clear();
362 }
363 }
364
365 pub fn active_task_count(&self) -> usize {
367 if let Ok(profiles) = self.profiles.lock() {
368 profiles.iter().filter(|(_, p)| !p.is_completed()).count()
369 } else {
370 0
371 }
372 }
373}
374
375impl Default for TaskProfileManager {
376 fn default() -> Self {
377 Self::new()
378 }
379}
380
381#[derive(Debug, Clone, Serialize, Deserialize)]
383pub struct AggregatedTaskStats {
384 pub total_tasks: usize,
386 pub completed_tasks: usize,
388 pub total_memory_allocated: u64,
390 pub current_memory_usage: u64,
392 pub peak_memory_usage: u64,
394 pub average_lifetime: Duration,
396 pub overall_efficiency: f64,
398 pub potential_leaks: usize,
400}
401
402impl AggregatedTaskStats {
403 pub fn new() -> Self {
405 Self {
406 total_tasks: 0,
407 completed_tasks: 0,
408 total_memory_allocated: 0,
409 current_memory_usage: 0,
410 peak_memory_usage: 0,
411 average_lifetime: Duration::ZERO,
412 overall_efficiency: 1.0,
413 potential_leaks: 0,
414 }
415 }
416
417 pub fn memory_summary(&self) -> String {
419 format!(
420 "Tasks: {} ({}% complete), Memory: {:.2}MB allocated, {:.2}MB current, {:.1}% efficiency, {} potential leaks",
421 self.total_tasks,
422 self.completed_tasks
423 .checked_mul(100)
424 .and_then(|v| v.checked_div(self.total_tasks))
425 .unwrap_or(0),
426 self.total_memory_allocated as f64 / 1_048_576.0,
427 self.current_memory_usage as f64 / 1_048_576.0,
428 self.overall_efficiency * 100.0,
429 self.potential_leaks
430 )
431 }
432}
433
434impl Default for AggregatedTaskStats {
435 fn default() -> Self {
436 Self::new()
437 }
438}
439
440#[cfg(test)]
441mod tests {
442 use super::*;
443
444 #[test]
445 fn test_task_type_description() {
446 assert_eq!(
447 TaskType::CpuIntensive.description(),
448 "CPU-intensive workload"
449 );
450 assert_eq!(TaskType::IoIntensive.description(), "IO-intensive workload");
451 assert_eq!(
452 TaskType::NetworkIntensive.description(),
453 "Network-intensive workload"
454 );
455 }
456
457 #[test]
458 fn test_task_type_resource_priority() {
459 let (cpu, io, net, mem) = TaskType::CpuIntensive.resource_priority();
460 assert!(cpu > io && cpu > net && cpu > mem);
461
462 let (cpu, io, net, mem) = TaskType::IoIntensive.resource_priority();
463 assert!(io > cpu && io > net && io > mem);
464 }
465
466 #[test]
467 fn test_task_memory_profile_basic() {
468 let profile = TaskMemoryProfile::new(1, "test_task".to_string(), TaskType::CpuIntensive);
469
470 assert_eq!(profile.task_id, 1);
471 assert_eq!(profile.task_name, "test_task");
472 assert_eq!(profile.task_type, TaskType::CpuIntensive);
473 assert!(!profile.is_completed());
474 assert_eq!(profile.current_memory, 0);
475 assert_eq!(profile.total_bytes, 0);
476 }
477
478 #[test]
479 fn test_record_allocation() {
480 let mut profile = TaskMemoryProfile::new(1, "test".to_string(), TaskType::Mixed);
481
482 profile.record_allocation(1024);
483 assert_eq!(profile.current_memory, 1024);
484 assert_eq!(profile.total_bytes, 1024);
485 assert_eq!(profile.peak_memory, 1024);
486 assert_eq!(profile.total_allocations, 1);
487
488 profile.record_allocation(2048);
489 assert_eq!(profile.current_memory, 3072);
490 assert_eq!(profile.total_bytes, 3072);
491 assert_eq!(profile.peak_memory, 3072);
492 assert_eq!(profile.total_allocations, 2);
493 }
494
495 #[test]
496 fn test_record_deallocation() {
497 let mut profile = TaskMemoryProfile::new(1, "test".to_string(), TaskType::Mixed);
498
499 profile.record_allocation(3072);
500 profile.record_deallocation(1024);
501 assert_eq!(profile.current_memory, 2048);
502 assert_eq!(profile.total_bytes, 3072);
503 assert_eq!(profile.peak_memory, 3072);
504 assert_eq!(profile.total_deallocations, 1);
505 }
506
507 #[test]
508 fn test_memory_efficiency() {
509 let mut profile = TaskMemoryProfile::new(1, "test".to_string(), TaskType::Mixed);
510
511 profile.record_allocation(1000);
512 profile.record_deallocation(1000);
513 assert_eq!(profile.memory_efficiency(), 1.0);
514
515 profile.record_allocation(1000);
516 assert_eq!(profile.memory_efficiency(), 0.5);
517 }
518
519 #[test]
520 fn test_potential_leak_detection() {
521 let mut profile = TaskMemoryProfile::new(1, "test".to_string(), TaskType::Mixed);
522
523 profile.record_allocation(1000);
524 assert!(!profile.has_potential_leak());
525
526 profile.record_deallocation(1000);
527 profile.mark_completed();
528 assert!(!profile.has_potential_leak());
529
530 let mut profile2 = TaskMemoryProfile::new(2, "test2".to_string(), TaskType::Mixed);
531 profile2.record_allocation(1000);
532 profile2.mark_completed();
533 assert!(profile2.has_potential_leak());
534 }
535
536 #[test]
537 fn test_task_profile_manager() {
538 let manager = TaskProfileManager::new();
539
540 let task_id = manager.create_task("test_task".to_string(), TaskType::CpuIntensive);
541 assert!(task_id > 0);
542
543 manager.record_allocation(task_id, 1024);
544 manager.record_allocation(task_id, 2048);
545
546 let profile = manager.get_profile(task_id);
547 assert!(profile.is_some());
548 assert_eq!(profile.unwrap().total_bytes, 3072);
549 }
550
551 #[test]
552 fn test_aggregated_stats() {
553 let manager = TaskProfileManager::new();
554
555 let task1 = manager.create_task("task1".to_string(), TaskType::Mixed);
556 manager.record_allocation(task1, 1000);
557 manager.record_deallocation(task1, 500);
558 manager.complete_task(task1);
559
560 let task2 = manager.create_task("task2".to_string(), TaskType::Mixed);
561 manager.record_allocation(task2, 2000);
562
563 let stats = manager.get_aggregated_stats();
564 assert_eq!(stats.total_tasks, 2);
565 assert_eq!(stats.completed_tasks, 1);
566 assert_eq!(stats.total_memory_allocated, 3000);
567 assert_eq!(stats.current_memory_usage, 2500);
568 }
569
570 #[test]
571 fn test_active_task_count() {
572 let manager = TaskProfileManager::new();
573
574 let task1 = manager.create_task("task1".to_string(), TaskType::Mixed);
575 let task2 = manager.create_task("task2".to_string(), TaskType::Mixed);
576
577 assert_eq!(manager.active_task_count(), 2);
578
579 manager.complete_task(task1);
580 assert_eq!(manager.active_task_count(), 1);
581
582 manager.complete_task(task2);
583 assert_eq!(manager.active_task_count(), 0);
584 }
585
586 #[test]
587 fn test_profiles_by_type() {
588 let manager = TaskProfileManager::new();
589
590 let _ = manager.create_task("cpu_task".to_string(), TaskType::CpuIntensive);
591 let _ = manager.create_task("io_task".to_string(), TaskType::IoIntensive);
592 let _ = manager.create_task("cpu_task2".to_string(), TaskType::CpuIntensive);
593
594 let cpu_profiles = manager.get_profiles_by_type(TaskType::CpuIntensive);
595 assert_eq!(cpu_profiles.len(), 2);
596
597 let io_profiles = manager.get_profiles_by_type(TaskType::IoIntensive);
598 assert_eq!(io_profiles.len(), 1);
599 }
600}