1use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
/// Broad category of workload performed by a tracked task.
///
/// The category selects the label returned by `description` and the
/// resource weights returned by `resource_priority`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum TaskType {
    /// CPU-intensive workload.
    CpuIntensive,
    /// IO-intensive workload.
    IoIntensive,
    /// Network-intensive workload.
    NetworkIntensive,
    /// Memory-intensive workload.
    MemoryIntensive,
    /// GPU compute workload.
    GpuCompute,
    /// Mixed workload; this is the variant produced by `Default`.
    #[default]
    Mixed,
    /// Streaming workload.
    Streaming,
    /// Background task.
    Background,
}
34
35impl TaskType {
36 pub fn description(&self) -> &'static str {
38 match self {
39 Self::CpuIntensive => "CPU-intensive workload",
40 Self::IoIntensive => "IO-intensive workload",
41 Self::NetworkIntensive => "Network-intensive workload",
42 Self::MemoryIntensive => "Memory-intensive workload",
43 Self::GpuCompute => "GPU compute workload",
44 Self::Mixed => "Mixed workload",
45 Self::Streaming => "Streaming workload",
46 Self::Background => "Background task",
47 }
48 }
49
50 pub fn resource_priority(&self) -> (f64, f64, f64, f64) {
52 match self {
53 Self::CpuIntensive => (1.0, 0.3, 0.2, 0.1),
54 Self::IoIntensive => (0.3, 1.0, 0.2, 0.1),
55 Self::NetworkIntensive => (0.3, 0.2, 1.0, 0.1),
56 Self::MemoryIntensive => (0.2, 0.3, 0.2, 1.0),
57 Self::GpuCompute => (0.5, 0.2, 0.1, 0.8),
58 Self::Mixed => (0.5, 0.5, 0.5, 0.5),
59 Self::Streaming => (0.4, 0.4, 0.6, 0.4),
60 Self::Background => (0.2, 0.2, 0.2, 0.2),
61 }
62 }
63}
64
/// Memory-accounting record for a single tracked task.
///
/// Counters are advanced by `record_allocation` / `record_deallocation`;
/// the derived metrics (`allocation_rate`, `efficiency_score`,
/// `average_allocation_size`) are recomputed after every such update and
/// again when the task is marked completed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMemoryProfile {
    /// Identifier of the task this profile belongs to.
    pub task_id: u64,
    /// Optional Tokio task id; `None` unless built via `with_tokio_id`.
    pub tokio_task_id: Option<u64>,
    /// Caller-supplied task name.
    pub task_name: String,
    /// Workload category of the task.
    pub task_type: TaskType,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at_ms: u64,
    /// Completion time in milliseconds since the Unix epoch; `None` while running.
    pub completed_at_ms: Option<u64>,
    /// Cumulative number of bytes allocated over the task's life.
    pub total_bytes: u64,
    /// Bytes currently outstanding (allocated and not yet deallocated).
    pub current_memory: u64,
    /// Highest value `current_memory` has reached.
    pub peak_memory: u64,
    /// Number of recorded allocations.
    pub total_allocations: u64,
    /// Number of recorded deallocations.
    pub total_deallocations: u64,
    /// Task lifetime in nanoseconds; stays 0 until `mark_completed` sets it.
    pub duration_ns: u64,
    /// Bytes allocated per second of lifetime; 0.0 while the lifetime is zero.
    pub allocation_rate: f64,
    /// Fraction of allocated bytes that have been released (1.0 when nothing was allocated).
    pub efficiency_score: f64,
    /// Mean allocation size in bytes; 0.0 before the first allocation.
    pub average_allocation_size: f64,
}
99
100impl TaskMemoryProfile {
101 pub fn new(task_id: u64, task_name: String, task_type: TaskType) -> Self {
103 Self {
104 task_id,
105 tokio_task_id: None,
106 task_name,
107 task_type,
108 created_at_ms: Self::now_ms(),
109 completed_at_ms: None,
110 total_bytes: 0,
111 current_memory: 0,
112 peak_memory: 0,
113 total_allocations: 0,
114 total_deallocations: 0,
115 duration_ns: 0,
116 allocation_rate: 0.0,
117 efficiency_score: 1.0,
118 average_allocation_size: 0.0,
119 }
120 }
121
122 pub fn with_tokio_id(
124 task_id: u64,
125 tokio_task_id: u64,
126 task_name: String,
127 task_type: TaskType,
128 ) -> Self {
129 let mut profile = Self::new(task_id, task_name, task_type);
130 profile.tokio_task_id = Some(tokio_task_id);
131 profile
132 }
133
134 fn now_ms() -> u64 {
136 std::time::SystemTime::now()
137 .duration_since(std::time::UNIX_EPOCH)
138 .unwrap_or_default()
139 .as_millis() as u64
140 }
141
142 pub fn mark_completed(&mut self) {
144 self.completed_at_ms = Some(Self::now_ms());
145 self.duration_ns = self.lifetime().as_nanos() as u64;
146 self.update_metrics();
147 }
148
149 pub fn record_allocation(&mut self, size: u64) {
151 self.total_bytes += size;
152 self.current_memory += size;
153 self.total_allocations += 1;
154
155 if self.current_memory > self.peak_memory {
156 self.peak_memory = self.current_memory;
157 }
158
159 self.update_metrics();
160 }
161
162 pub fn record_deallocation(&mut self, size: u64) {
164 self.current_memory = self.current_memory.saturating_sub(size);
165 self.total_deallocations += 1;
166 self.update_metrics();
167 }
168
169 pub fn is_completed(&self) -> bool {
171 self.completed_at_ms.is_some()
172 }
173
174 pub fn lifetime(&self) -> Duration {
176 let end_ms = self.completed_at_ms.unwrap_or_else(Self::now_ms);
177 let start_ms = self.created_at_ms;
178 Duration::from_millis(end_ms.saturating_sub(start_ms))
179 }
180
181 pub fn memory_efficiency(&self) -> f64 {
183 if self.total_bytes == 0 {
184 1.0
185 } else {
186 let deallocated = self.total_bytes - self.current_memory;
187 deallocated as f64 / self.total_bytes as f64
188 }
189 }
190
191 pub fn has_potential_leak(&self) -> bool {
193 self.is_completed() && self.current_memory > 0
194 }
195
196 fn update_metrics(&mut self) {
198 let lifetime_secs = self.lifetime().as_secs_f64();
199
200 self.allocation_rate = if lifetime_secs > 0.0 {
201 self.total_bytes as f64 / lifetime_secs
202 } else {
203 0.0
204 };
205
206 self.efficiency_score = self.memory_efficiency();
207
208 self.average_allocation_size = if self.total_allocations > 0 {
209 self.total_bytes as f64 / self.total_allocations as f64
210 } else {
211 0.0
212 };
213 }
214
215 pub fn summary(&self) -> String {
217 format!(
218 "Task '{}' (ID: {}, Type: {:?}): {} allocations, {:.2} MB total, {:.2} MB peak, {:.1}% efficiency",
219 self.task_name,
220 self.task_id,
221 self.task_type,
222 self.total_allocations,
223 self.total_bytes as f64 / 1_048_576.0,
224 self.peak_memory as f64 / 1_048_576.0,
225 self.efficiency_score * 100.0
226 )
227 }
228}
229
/// Registry of per-task memory profiles, keyed by task id.
///
/// Both fields live behind `Arc`, so the derived `Clone` produces another
/// handle to the same profile map and id counter rather than a copy.
#[derive(Debug, Clone)]
pub struct TaskProfileManager {
    /// Profiles keyed by the id returned from `create_task`, guarded by a mutex.
    profiles: Arc<Mutex<HashMap<u64, TaskMemoryProfile>>>,
    /// Next id to hand out; initialised to 1 in `new` and advanced with `fetch_add`.
    next_task_id: Arc<AtomicU64>,
}
236
237impl TaskProfileManager {
238 pub fn new() -> Self {
240 Self {
241 profiles: Arc::new(Mutex::new(HashMap::new())),
242 next_task_id: Arc::new(AtomicU64::new(1)),
243 }
244 }
245
246 pub fn create_task(&self, task_name: String, task_type: TaskType) -> u64 {
248 let task_id = self.next_task_id.fetch_add(1, Ordering::Relaxed);
249
250 let profile = TaskMemoryProfile::new(task_id, task_name, task_type);
251
252 if let Ok(mut profiles) = self.profiles.lock() {
253 profiles.insert(task_id, profile);
254 }
255
256 task_id
257 }
258
259 pub fn record_allocation(&self, task_id: u64, size: u64) {
261 if let Ok(mut profiles) = self.profiles.lock() {
262 if let Some(profile) = profiles.get_mut(&task_id) {
263 profile.record_allocation(size);
264 }
265 }
266 }
267
268 pub fn record_deallocation(&self, task_id: u64, size: u64) {
270 if let Ok(mut profiles) = self.profiles.lock() {
271 if let Some(profile) = profiles.get_mut(&task_id) {
272 profile.record_deallocation(size);
273 }
274 }
275 }
276
277 pub fn complete_task(&self, task_id: u64) {
279 if let Ok(mut profiles) = self.profiles.lock() {
280 if let Some(profile) = profiles.get_mut(&task_id) {
281 profile.mark_completed();
282 }
283 }
284 }
285
286 pub fn get_profile(&self, task_id: u64) -> Option<TaskMemoryProfile> {
288 if let Ok(profiles) = self.profiles.lock() {
289 profiles.get(&task_id).cloned()
290 } else {
291 None
292 }
293 }
294
295 pub fn get_all_profiles(&self) -> Vec<TaskMemoryProfile> {
297 if let Ok(profiles) = self.profiles.lock() {
298 profiles.values().cloned().collect()
299 } else {
300 Vec::new()
301 }
302 }
303
304 pub fn get_profiles_by_type(&self, task_type: TaskType) -> Vec<TaskMemoryProfile> {
306 if let Ok(profiles) = self.profiles.lock() {
307 profiles
308 .values()
309 .filter(|p| p.task_type == task_type)
310 .cloned()
311 .collect()
312 } else {
313 Vec::new()
314 }
315 }
316
317 pub fn get_aggregated_stats(&self) -> AggregatedTaskStats {
319 let profiles = self.get_all_profiles();
320
321 let total_tasks = profiles.len();
322 let completed_tasks = profiles.iter().filter(|p| p.is_completed()).count();
323
324 let total_memory_allocated: u64 = profiles.iter().map(|p| p.total_bytes).sum();
325 let current_memory_usage: u64 = profiles.iter().map(|p| p.current_memory).sum();
326 let peak_memory_usage: u64 = profiles.iter().map(|p| p.peak_memory).max().unwrap_or(0);
327
328 let total_duration: Duration = profiles.iter().map(|p| p.lifetime()).sum::<Duration>();
329 let average_lifetime = if total_tasks > 0 {
330 let total_secs = total_duration.as_secs_f64();
331 let avg_secs = total_secs / total_tasks as f64;
332 Duration::from_secs_f64(avg_secs)
333 } else {
334 Duration::ZERO
335 };
336
337 let overall_efficiency = if total_memory_allocated > 0 {
338 let total_deallocated = total_memory_allocated - current_memory_usage;
339 total_deallocated as f64 / total_memory_allocated as f64
340 } else {
341 1.0
342 };
343
344 let potential_leaks = profiles.iter().filter(|p| p.has_potential_leak()).count();
345
346 AggregatedTaskStats {
347 total_tasks,
348 completed_tasks,
349 total_memory_allocated,
350 current_memory_usage,
351 peak_memory_usage,
352 average_lifetime,
353 overall_efficiency,
354 potential_leaks,
355 }
356 }
357
358 pub fn clear(&self) {
360 if let Ok(mut profiles) = self.profiles.lock() {
361 profiles.clear();
362 }
363 }
364
365 pub fn active_task_count(&self) -> usize {
367 if let Ok(profiles) = self.profiles.lock() {
368 profiles.iter().filter(|(_, p)| !p.is_completed()).count()
369 } else {
370 0
371 }
372 }
373}
374
375impl Default for TaskProfileManager {
376 fn default() -> Self {
377 Self::new()
378 }
379}
380
/// Statistics aggregated over every profile held by a `TaskProfileManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregatedTaskStats {
    /// Number of profiles included in the aggregate.
    pub total_tasks: usize,
    /// Number of those profiles that have been marked completed.
    pub completed_tasks: usize,
    /// Sum of the cumulative bytes allocated by each task.
    pub total_memory_allocated: u64,
    /// Sum of the bytes currently outstanding in each task.
    pub current_memory_usage: u64,
    /// Largest single-task peak (a maximum across tasks, not a sum).
    pub peak_memory_usage: u64,
    /// Mean task lifetime; zero when there are no tasks.
    pub average_lifetime: Duration,
    /// Fraction of all allocated bytes that have been released; 1.0 when nothing was allocated.
    pub overall_efficiency: f64,
    /// Number of completed tasks that still have outstanding bytes.
    pub potential_leaks: usize,
}
401
402impl AggregatedTaskStats {
403 pub fn new() -> Self {
405 Self {
406 total_tasks: 0,
407 completed_tasks: 0,
408 total_memory_allocated: 0,
409 current_memory_usage: 0,
410 peak_memory_usage: 0,
411 average_lifetime: Duration::ZERO,
412 overall_efficiency: 1.0,
413 potential_leaks: 0,
414 }
415 }
416
417 pub fn memory_summary(&self) -> String {
419 format!(
420 "Tasks: {} ({}% complete), Memory: {:.2}MB allocated, {:.2}MB current, {:.1}% efficiency, {} potential leaks",
421 self.total_tasks,
422 if self.total_tasks > 0 {
423 self.completed_tasks * 100 / self.total_tasks
424 } else {
425 0
426 },
427 self.total_memory_allocated as f64 / 1_048_576.0,
428 self.current_memory_usage as f64 / 1_048_576.0,
429 self.overall_efficiency * 100.0,
430 self.potential_leaks
431 )
432 }
433}
434
435impl Default for AggregatedTaskStats {
436 fn default() -> Self {
437 Self::new()
438 }
439}
440
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh `Mixed` profile with the given id and name.
    fn mixed_profile(id: u64, name: &str) -> TaskMemoryProfile {
        TaskMemoryProfile::new(id, name.to_string(), TaskType::Mixed)
    }

    /// Registers a task on `manager`, taking the name as a string slice.
    fn spawn_task(manager: &TaskProfileManager, name: &str, task_type: TaskType) -> u64 {
        manager.create_task(name.to_string(), task_type)
    }

    /// Labels of the first three task types are spelled as expected.
    #[test]
    fn test_task_type_description() {
        let expected = [
            (TaskType::CpuIntensive, "CPU-intensive workload"),
            (TaskType::IoIntensive, "IO-intensive workload"),
            (TaskType::NetworkIntensive, "Network-intensive workload"),
        ];
        for (task_type, label) in expected.iter() {
            assert_eq!(task_type.description(), *label);
        }
    }

    /// The dominant resource of a category carries the strictly largest weight.
    #[test]
    fn test_task_type_resource_priority() {
        let (cpu, io, net, mem) = TaskType::CpuIntensive.resource_priority();
        assert!([io, net, mem].iter().all(|&other| cpu > other));

        let (cpu, io, net, mem) = TaskType::IoIntensive.resource_priority();
        assert!([cpu, net, mem].iter().all(|&other| io > other));
    }

    /// A new profile carries its identity and starts with empty counters.
    #[test]
    fn test_task_memory_profile_basic() {
        let fresh = TaskMemoryProfile::new(1, "test_task".to_string(), TaskType::CpuIntensive);

        assert_eq!(fresh.task_id, 1);
        assert_eq!(fresh.task_name, "test_task");
        assert_eq!(fresh.task_type, TaskType::CpuIntensive);
        assert!(!fresh.is_completed());
        assert_eq!(fresh.current_memory, 0);
        assert_eq!(fresh.total_bytes, 0);
    }

    /// Allocations accumulate in the current, total and peak counters.
    #[test]
    fn test_record_allocation() {
        let mut tracked = mixed_profile(1, "test");

        tracked.record_allocation(1024);
        assert_eq!(tracked.current_memory, 1024);
        assert_eq!(tracked.total_bytes, 1024);
        assert_eq!(tracked.peak_memory, 1024);
        assert_eq!(tracked.total_allocations, 1);

        tracked.record_allocation(2048);
        assert_eq!(tracked.current_memory, 3072);
        assert_eq!(tracked.total_bytes, 3072);
        assert_eq!(tracked.peak_memory, 3072);
        assert_eq!(tracked.total_allocations, 2);
    }

    /// A deallocation lowers only the current usage; total and peak stay put.
    #[test]
    fn test_record_deallocation() {
        let mut tracked = mixed_profile(1, "test");

        tracked.record_allocation(3072);
        tracked.record_deallocation(1024);

        assert_eq!(tracked.current_memory, 2048);
        assert_eq!(tracked.total_bytes, 3072);
        assert_eq!(tracked.peak_memory, 3072);
        assert_eq!(tracked.total_deallocations, 1);
    }

    /// Efficiency is the released share of everything ever allocated.
    #[test]
    fn test_memory_efficiency() {
        let mut tracked = mixed_profile(1, "test");

        // Everything released: fully efficient.
        tracked.record_allocation(1000);
        tracked.record_deallocation(1000);
        assert_eq!(tracked.memory_efficiency(), 1.0);

        // Half of the 2000 bytes allocated so far is still outstanding.
        tracked.record_allocation(1000);
        assert_eq!(tracked.memory_efficiency(), 0.5);
    }

    /// Only completed tasks with outstanding bytes count as potential leaks.
    #[test]
    fn test_potential_leak_detection() {
        let mut balanced = mixed_profile(1, "test");

        balanced.record_allocation(1000);
        assert!(!balanced.has_potential_leak());

        balanced.record_deallocation(1000);
        balanced.mark_completed();
        assert!(!balanced.has_potential_leak());

        let mut leaky = mixed_profile(2, "test2");
        leaky.record_allocation(1000);
        leaky.mark_completed();
        assert!(leaky.has_potential_leak());
    }

    /// The manager hands out positive ids and forwards allocations to the profile.
    #[test]
    fn test_task_profile_manager() {
        let registry = TaskProfileManager::new();

        let id = spawn_task(&registry, "test_task", TaskType::CpuIntensive);
        assert!(id > 0);

        for size in [1024u64, 2048].iter() {
            registry.record_allocation(id, *size);
        }

        let snapshot = registry.get_profile(id);
        assert!(snapshot.is_some());
        assert_eq!(snapshot.unwrap().total_bytes, 3072);
    }

    /// Aggregated totals cover both completed and still-running tasks.
    #[test]
    fn test_aggregated_stats() {
        let registry = TaskProfileManager::new();

        let finished = spawn_task(&registry, "task1", TaskType::Mixed);
        registry.record_allocation(finished, 1000);
        registry.record_deallocation(finished, 500);
        registry.complete_task(finished);

        let running = spawn_task(&registry, "task2", TaskType::Mixed);
        registry.record_allocation(running, 2000);

        let totals = registry.get_aggregated_stats();
        assert_eq!(totals.total_tasks, 2);
        assert_eq!(totals.completed_tasks, 1);
        assert_eq!(totals.total_memory_allocated, 3000);
        assert_eq!(totals.current_memory_usage, 2500);
    }

    /// The active count drops by one each time a task is completed.
    #[test]
    fn test_active_task_count() {
        let registry = TaskProfileManager::new();

        let first = spawn_task(&registry, "task1", TaskType::Mixed);
        let second = spawn_task(&registry, "task2", TaskType::Mixed);
        assert_eq!(registry.active_task_count(), 2);

        registry.complete_task(first);
        assert_eq!(registry.active_task_count(), 1);

        registry.complete_task(second);
        assert_eq!(registry.active_task_count(), 0);
    }

    /// Filtering by type returns only the profiles of that type.
    #[test]
    fn test_profiles_by_type() {
        let registry = TaskProfileManager::new();

        let _ = spawn_task(&registry, "cpu_task", TaskType::CpuIntensive);
        let _ = spawn_task(&registry, "io_task", TaskType::IoIntensive);
        let _ = spawn_task(&registry, "cpu_task2", TaskType::CpuIntensive);

        let cpu_bound = registry.get_profiles_by_type(TaskType::CpuIntensive);
        assert_eq!(cpu_bound.len(), 2);

        let io_bound = registry.get_profiles_by_type(TaskType::IoIntensive);
        assert_eq!(io_bound.len(), 1);
    }
}