1use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Mutex};
9use std::thread::ThreadId;
10
11use super::async_types::{
12 AsyncAllocation, AsyncError, AsyncMemorySnapshot, AsyncResult, AsyncSnapshot, AsyncStats,
13 TrackedFuture,
14};
15use super::task_profile::{TaskMemoryProfile, TaskType};
16
17#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
19pub struct TaskReport {
20 pub task_name: String,
21 pub task_type: TaskType,
22 pub efficiency_score: f64,
23 pub cpu_efficiency: f64,
24 pub memory_efficiency: f64,
25 pub io_efficiency: f64,
26 pub bottleneck: String,
27 pub recommendations: Vec<String>,
28}
29
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub struct ResourceRanking {
33 pub task_name: String,
34 pub task_type: TaskType,
35 pub cpu_usage: f64,
36 pub memory_usage_mb: f64,
37 pub io_usage_mb: f64,
38 pub network_usage_mb: f64,
39 pub gpu_usage: f64,
40 pub overall_score: f64,
41}
42
43static GLOBAL_TRACKER: Mutex<Option<Arc<AsyncTracker>>> = Mutex::new(None);
45
46thread_local! {
47 static CURRENT_TASK_ID: std::cell::Cell<Option<u64>> = const { std::cell::Cell::new(None) };
48}
49
50pub struct TaskGuard {
53 task_id: u64,
54 cleaned_up: bool,
55}
56
57unsafe impl Send for TaskGuard {}
69
70unsafe impl Sync for TaskGuard {}
78
79impl TaskGuard {
80 fn new(task_id: u64) -> Self {
81 Self {
82 task_id,
83 cleaned_up: false,
84 }
85 }
86
87 pub fn task_id(&self) -> u64 {
89 self.task_id
90 }
91
92 pub fn release(mut self) {
94 self.cleaned_up = true;
95 TaskGuard::clear_current_task_internal();
96 }
97}
98
99impl Drop for TaskGuard {
100 fn drop(&mut self) {
101 if !self.cleaned_up {
102 TaskGuard::clear_current_task_internal();
103 }
104 }
105}
106
107impl TaskGuard {
108 fn clear_current_task_internal() {
109 CURRENT_TASK_ID.with(|cell| cell.set(None));
110 }
111}
112
113pub struct AsyncTracker {
115 allocations: Arc<Mutex<HashMap<usize, AsyncAllocation>>>,
116 stats: Arc<Mutex<AsyncStats>>,
117 profiles: Arc<Mutex<HashMap<u64, TaskMemoryProfile>>>,
118 initialized: Arc<Mutex<bool>>,
119}
120
121impl AsyncTracker {
122 pub fn new() -> Self {
123 Self {
124 allocations: Arc::new(Mutex::new(HashMap::new())),
125 stats: Arc::new(Mutex::new(AsyncStats::default())),
126 profiles: Arc::new(Mutex::new(HashMap::new())),
127 initialized: Arc::new(Mutex::new(false)),
128 }
129 }
130
131 pub fn set_current_task(task_id: u64) {
132 CURRENT_TASK_ID.with(|cell| cell.set(Some(task_id)));
133 }
134
135 pub fn clear_current_task() {
136 CURRENT_TASK_ID.with(|cell| cell.set(None));
137 }
138
139 pub fn get_current_task() -> Option<u64> {
140 CURRENT_TASK_ID.with(|cell| cell.get())
141 }
142
143 pub fn enter_task(task_id: u64) -> TaskGuard {
146 Self::set_current_task(task_id);
147 TaskGuard::new(task_id)
148 }
149
150 pub fn with_task<F, T>(task_id: u64, f: F) -> T
154 where
155 F: FnOnce() -> T,
156 {
157 let _guard = Self::enter_task(task_id);
158 f()
159 }
160
161 pub fn track_task_start(
162 &self,
163 task_id: u64,
164 name: String,
165 _thread_id: ThreadId,
166 ) -> Result<(), AsyncError> {
167 {
168 let mut profiles = self
169 .profiles
170 .lock()
171 .map_err(|e| AsyncError::mutex_lock_failed("profiles", &e.to_string()))?;
172
173 if profiles.contains_key(&task_id) {
174 return Err(AsyncError::duplicate_task(task_id));
175 }
176
177 profiles.insert(
178 task_id,
179 TaskMemoryProfile::new(task_id, name, TaskType::default()),
180 );
181 }
182
183 let mut stats = self
184 .stats
185 .lock()
186 .map_err(|e| AsyncError::mutex_lock_failed("stats", &e.to_string()))?;
187 stats.total_tasks += 1;
188 stats.active_tasks += 1;
189
190 Self::set_current_task(task_id);
191
192 Ok(())
193 }
194
195 pub fn track_task_end(&self, task_id: u64) -> Result<(), AsyncError> {
197 {
198 let mut profiles = self
199 .profiles
200 .lock()
201 .map_err(|e| AsyncError::mutex_lock_failed("profiles", &e.to_string()))?;
202
203 let profile = profiles
204 .get_mut(&task_id)
205 .ok_or_else(|| AsyncError::task_not_found(task_id))?;
206
207 if profile.is_completed() {
208 return Ok(());
209 }
210
211 profile.mark_completed();
212 }
213
214 let mut stats = self
215 .stats
216 .lock()
217 .map_err(|e| AsyncError::mutex_lock_failed("stats", &e.to_string()))?;
218 stats.active_tasks = stats.active_tasks.saturating_sub(1);
219
220 Self::clear_current_task();
221
222 Ok(())
223 }
224
225 pub fn track_allocation_auto(
226 &self,
227 ptr: usize,
228 size: usize,
229 var_name: Option<String>,
230 type_name: Option<String>,
231 ) {
232 if let Some(task_id) = Self::get_current_task() {
233 self.track_allocation_with_location(ptr, size, task_id, var_name, type_name, None);
234 }
235 }
236
237 pub fn track_allocation(&self, ptr: usize, size: usize, task_id: u64) {
239 self.track_allocation_with_location(ptr, size, task_id, None, None, None);
240 }
241
242 pub fn track_allocation_with_location(
244 &self,
245 ptr: usize,
246 size: usize,
247 task_id: u64,
248 var_name: Option<String>,
249 type_name: Option<String>,
250 source_location: Option<super::async_types::SourceLocation>,
251 ) {
252 let allocation = AsyncAllocation {
253 ptr,
254 size,
255 timestamp: Self::now(),
256 task_id,
257 var_name,
258 type_name,
259 source_location,
260 };
261
262 {
263 if let Ok(mut allocations) = self.allocations.lock() {
264 allocations.insert(ptr, allocation);
265 } else {
266 tracing::error!("Failed to acquire allocations lock during track_allocation");
267 }
268 }
269
270 {
271 if let Ok(mut profiles) = self.profiles.lock() {
272 if let Some(profile) = profiles.get_mut(&task_id) {
273 profile.record_allocation(size as u64);
274 }
275 } else {
276 tracing::error!("Failed to acquire profiles lock during track_allocation");
277 }
278 }
279
280 {
281 if let Ok(mut stats) = self.stats.lock() {
282 stats.total_allocations += 1;
283 stats.total_memory += size;
284 stats.active_memory += size;
285 if stats.active_memory > stats.peak_memory {
286 stats.peak_memory = stats.active_memory;
287 }
288 } else {
289 tracing::error!("Failed to acquire stats lock during track_allocation");
290 }
291 }
292 }
293
294 pub fn track_deallocation(&self, ptr: usize) {
296 let (task_id, size) = {
297 if let Ok(mut allocations) = self.allocations.lock() {
298 allocations
299 .remove(&ptr)
300 .map(|alloc| (alloc.task_id, alloc.size))
301 .unwrap_or((0, 0))
302 } else {
303 tracing::error!("Failed to acquire allocations lock during track_deallocation");
304 (0, 0)
305 }
306 };
307
308 if task_id != 0 {
309 if let Ok(mut profiles) = self.profiles.lock() {
310 if let Some(profile) = profiles.get_mut(&task_id) {
311 profile.record_deallocation(size as u64);
312 }
313 } else {
314 tracing::error!("Failed to acquire profiles lock during track_deallocation");
315 }
316 }
317
318 if size > 0 {
319 if let Ok(mut stats) = self.stats.lock() {
320 stats.active_memory = stats.active_memory.saturating_sub(size);
321 stats.total_deallocations += 1;
322 stats.total_deallocated += size as u64;
323 } else {
324 tracing::error!("Failed to acquire stats lock during track_deallocation");
325 }
326 }
327 }
328
329 pub fn get_stats(&self) -> AsyncStats {
331 if let Ok(stats) = self.stats.lock() {
332 stats.clone()
333 } else {
334 tracing::error!("Failed to acquire stats lock in get_stats");
335 AsyncStats::default()
336 }
337 }
338
339 pub fn snapshot(&self) -> AsyncSnapshot {
341 let profiles = if let Ok(p) = self.profiles.lock() {
342 p
343 } else {
344 tracing::error!("Failed to acquire profiles lock in snapshot");
345 return AsyncSnapshot::default();
346 };
347
348 let tasks: Vec<super::async_types::TaskInfo> = profiles
349 .values()
350 .filter(|p| p.completed_at_ms.is_none())
351 .map(|p| super::async_types::TaskInfo {
352 task_id: p.task_id,
353 name: p.task_name.clone(),
354 thread_id: std::thread::current().id(),
355 created_at: p.created_at_ms * 1_000_000,
356 active_allocations: p.total_allocations as usize,
357 total_memory: p.current_memory as usize,
358 })
359 .collect();
360 drop(profiles);
361
362 let allocations = {
363 if let Ok(allocs) = self.allocations.lock() {
364 allocs.values().cloned().collect()
365 } else {
366 tracing::error!("Failed to acquire allocations lock in snapshot");
367 Vec::new()
368 }
369 };
370
371 let stats = self.get_stats();
372
373 AsyncSnapshot {
374 timestamp: Self::now(),
375 tasks,
376 allocations,
377 stats,
378 }
379 }
380
381 pub fn get_task_profile(&self, task_id: u64) -> Option<TaskMemoryProfile> {
383 if let Ok(profiles) = self.profiles.lock() {
384 profiles.get(&task_id).cloned()
385 } else {
386 tracing::error!("Failed to acquire profiles lock in get_task_profile");
387 None
388 }
389 }
390
391 pub fn get_all_profiles(&self) -> Vec<TaskMemoryProfile> {
393 if let Ok(profiles) = self.profiles.lock() {
394 profiles.values().cloned().collect()
395 } else {
396 tracing::error!("Failed to acquire profiles lock in get_all_profiles");
397 Vec::new()
398 }
399 }
400
401 pub fn is_initialized(&self) -> bool {
403 if let Ok(initialized) = self.initialized.lock() {
404 *initialized
405 } else {
406 tracing::error!("Failed to acquire initialized lock in is_initialized");
407 false
408 }
409 }
410
411 pub fn set_initialized(&self) {
413 if let Ok(mut initialized) = self.initialized.lock() {
414 *initialized = true;
415 } else {
416 tracing::error!("Failed to acquire initialized lock in set_initialized");
417 }
418 }
419
420 pub fn analyze_task(&self, task_id: u64, task_type: TaskType) -> Option<TaskReport> {
422 let profile = self.get_task_profile(task_id)?;
423
424 let total_bytes = profile.total_bytes as f64;
425 let total_allocations = profile.total_allocations as f64;
426 let peak_memory = profile.peak_memory as f64;
427 let duration_ms = profile.duration_ns as f64 / 1_000_000.0;
428
429 let compute_efficiency = if duration_ms > 0.0 {
430 (total_allocations / duration_ms * 1000.0).min(1.0)
431 } else {
432 0.0
433 };
434
435 let cpu_efficiency = match task_type {
436 TaskType::CpuIntensive | TaskType::IoIntensive | TaskType::GpuCompute => {
437 compute_efficiency
438 }
439 TaskType::MemoryIntensive => {
440 if total_bytes > 0.0 {
441 (peak_memory / total_bytes).min(1.0)
442 } else {
443 0.0
444 }
445 }
446 TaskType::NetworkIntensive => {
447 if total_bytes > 0.0 {
448 (total_allocations / total_bytes * 1000.0).min(1.0)
449 } else {
450 0.0
451 }
452 }
453 _ => compute_efficiency,
454 };
455
456 let memory_efficiency = if total_bytes > 0.0 {
457 (total_allocations / total_bytes * 1000.0).min(1.0)
458 } else {
459 0.0
460 };
461
462 let io_efficiency = if duration_ms > 0.0 {
463 (total_bytes / duration_ms / 1_048_576.0).min(1.0)
464 } else {
465 0.0
466 };
467
468 let efficiency_score = (cpu_efficiency + memory_efficiency + io_efficiency) / 3.0;
469
470 let bottleneck = if duration_ms > 5000.0 {
471 "Execution Time".to_string()
472 } else if peak_memory > 100.0 * 1024.0 * 1024.0 {
473 "Memory".to_string()
474 } else if total_allocations > 10000.0 {
475 "Allocations".to_string()
476 } else {
477 "None".to_string()
478 };
479
480 let mut recommendations = Vec::new();
481 if duration_ms > 5000.0 {
482 recommendations.push("Consider optimizing task execution time".to_string());
483 }
484 if peak_memory > 100.0 * 1024.0 * 1024.0 {
485 recommendations.push("Reduce peak memory usage".to_string());
486 }
487 if total_allocations > 10000.0 {
488 recommendations.push("Reduce number of allocations".to_string());
489 }
490 if recommendations.is_empty() {
491 recommendations.push("Performance is good".to_string());
492 }
493
494 Some(TaskReport {
495 task_name: profile.task_name.clone(),
496 task_type,
497 efficiency_score,
498 cpu_efficiency,
499 memory_efficiency,
500 io_efficiency,
501 bottleneck,
502 recommendations,
503 })
504 }
505
506 pub fn get_resource_rankings(&self) -> Vec<ResourceRanking> {
508 let profiles = self.get_all_profiles();
509
510 let mut rankings: Vec<ResourceRanking> = profiles
511 .into_iter()
512 .map(|profile| {
513 let memory_mb = profile.total_bytes as f64 / 1_048_576.0;
514 let peak_memory_mb = profile.peak_memory as f64 / 1_048_576.0;
515 let duration_ms = profile.duration_ns as f64 / 1_000_000.0;
516 let allocation_rate = profile.allocation_rate;
517
518 let overall_score = memory_mb * 0.3
519 + peak_memory_mb * 0.2
520 + allocation_rate * 0.0001
521 + duration_ms * 0.0001;
522
523 ResourceRanking {
524 task_name: profile.task_name.clone(),
525 task_type: profile.task_type,
526 cpu_usage: allocation_rate,
527 memory_usage_mb: memory_mb,
528 io_usage_mb: 0.0,
529 network_usage_mb: 0.0,
530 gpu_usage: 0.0,
531 overall_score,
532 }
533 })
534 .collect();
535
536 rankings.sort_by(|a, b| {
537 b.overall_score
538 .partial_cmp(&a.overall_score)
539 .unwrap_or(std::cmp::Ordering::Equal)
540 });
541
542 rankings
543 }
544
545 fn now() -> u64 {
547 std::time::SystemTime::now()
548 .duration_since(std::time::UNIX_EPOCH)
549 .unwrap_or_default()
550 .as_nanos() as u64
551 }
552}
553
554impl Default for AsyncTracker {
555 fn default() -> Self {
556 Self::new()
557 }
558}
559
560impl Drop for AsyncTracker {
561 fn drop(&mut self) {
562 Self::clear_current_task();
563 }
564}
565
566pub fn initialize() -> AsyncResult<()> {
568 let mut global = GLOBAL_TRACKER.lock().map_err(|_| AsyncError::System {
569 operation: Arc::from("initialize"),
570 message: Arc::from("Failed to acquire global tracker lock"),
571 })?;
572
573 if global.is_none() {
574 let tracker = AsyncTracker::new();
575 tracker.set_initialized();
576 *global = Some(Arc::new(tracker));
577 tracing::info!("Async memory tracking system initialized");
578 Ok(())
579 } else {
580 Err(AsyncError::initialization(
581 "tracker",
582 "Already initialized",
583 true,
584 ))
585 }
586}
587
588pub fn shutdown() -> AsyncResult<()> {
590 let mut global = GLOBAL_TRACKER.lock().map_err(|_| AsyncError::System {
591 operation: Arc::from("shutdown"),
592 message: Arc::from("Failed to acquire global tracker lock"),
593 })?;
594
595 if global.is_some() {
596 *global = None;
597 tracing::info!("Async memory tracking system shutdown");
598 Ok(())
599 } else {
600 Err(AsyncError::initialization(
601 "tracker",
602 "Not initialized",
603 true,
604 ))
605 }
606}
607
608#[cfg(test)]
610pub fn reset_global_tracker() {
611 if let Ok(mut global) = GLOBAL_TRACKER.lock() {
612 *global = None;
613 } else {
614 tracing::error!("Failed to acquire global tracker lock in reset_global_tracker");
615 }
616}
617
618fn get_global_tracker() -> AsyncResult<Arc<AsyncTracker>> {
620 GLOBAL_TRACKER
621 .lock()
622 .map_err(|_| AsyncError::System {
623 operation: Arc::from("get_global_tracker"),
624 message: Arc::from("Failed to acquire global tracker lock"),
625 })?
626 .clone()
627 .ok_or_else(|| {
628 AsyncError::initialization("tracker", "Tracking system not initialized", true)
629 })
630}
631
632pub fn create_tracked<F>(future: F) -> TrackedFuture<F>
634where
635 F: Future,
636{
637 TrackedFuture::new(future)
638}
639
640pub fn spawn_tracked<F>(future: F) -> TrackedFuture<F>
642where
643 F: Future,
644{
645 create_tracked(future)
646}
647
648pub fn get_memory_snapshot() -> AsyncMemorySnapshot {
650 if let Ok(tracker) = get_global_tracker() {
651 let stats = tracker.get_stats();
652
653 AsyncMemorySnapshot {
654 active_task_count: stats.active_tasks,
655 total_allocated_bytes: stats.total_memory as u64,
656 allocation_events: stats.total_allocations as u64,
657 events_dropped: 0,
658 buffer_utilization: 0.0,
659 }
660 } else {
661 AsyncMemorySnapshot {
662 active_task_count: 0,
663 total_allocated_bytes: 0,
664 allocation_events: 0,
665 events_dropped: 0,
666 buffer_utilization: 0.0,
667 }
668 }
669}
670
671pub fn is_tracking_active() -> bool {
673 GLOBAL_TRACKER.lock().is_ok_and(|global| global.is_some())
674}
675
676pub fn track_current_allocation(ptr: usize, size: usize) -> AsyncResult<()> {
678 let tracker = get_global_tracker()?;
679 let task_info = super::async_types::get_current_task();
680
681 if task_info.has_tracking_id() {
682 tracker.track_allocation(ptr, size, (task_info.primary_id() & 0xFFFFFFFF) as u64);
683 }
684
685 Ok(())
686}
687
688pub fn track_current_deallocation(ptr: usize) -> AsyncResult<()> {
690 let tracker = get_global_tracker()?;
691 tracker.track_deallocation(ptr);
692 Ok(())
693}
694
695#[cfg(test)]
696mod tests {
697 use super::*;
698 use crate::capture::backends::async_types::TaskOperation;
699
700 #[test]
701 fn test_async_tracker_creation() {
702 let tracker = AsyncTracker::new();
703 let stats = tracker.get_stats();
704 assert_eq!(stats.total_tasks, 0);
705 }
706
707 #[test]
708 fn test_task_tracking() {
709 let tracker = AsyncTracker::new();
710 let thread_id = std::thread::current().id();
711 tracker
712 .track_task_start(1, "test_task".to_string(), thread_id)
713 .unwrap();
714
715 let stats = tracker.get_stats();
716 assert_eq!(stats.total_tasks, 1);
717 assert_eq!(stats.active_tasks, 1);
718
719 tracker.track_task_end(1).unwrap();
720 let stats = tracker.get_stats();
721 assert_eq!(stats.active_tasks, 0);
722 }
723
724 #[test]
725 fn test_allocation_tracking() {
726 let tracker = AsyncTracker::new();
727 let thread_id = std::thread::current().id();
728 tracker
729 .track_task_start(1, "test_task".to_string(), thread_id)
730 .unwrap();
731 tracker.track_allocation(0x1000, 1024, 1);
732
733 let profile = tracker.get_task_profile(1);
734 assert!(profile.is_some());
735 let profile = profile.unwrap();
736 assert_eq!(profile.total_allocations, 1);
737 assert_eq!(profile.total_bytes, 1024);
738 }
739
740 #[test]
741 fn test_initialization() {
742 reset_global_tracker();
743
744 let result = initialize();
745 assert!(result.is_ok());
746
747 let result2 = initialize();
748 if let Err(e) = result2 {
749 assert!(e.message().contains("Already initialized"));
750 }
751
752 let _ = shutdown();
753 }
754
755 #[test]
756 fn test_shutdown() {
757 reset_global_tracker();
758
759 initialize().unwrap();
760 let result = shutdown();
761 assert!(result.is_ok());
762
763 let result2 = shutdown();
764 if let Err(e) = result2 {
765 assert!(e.message().contains("Not initialized"));
766 }
767 }
768
769 #[test]
770 fn test_memory_snapshot() {
771 reset_global_tracker();
772
773 initialize().unwrap();
774 let snapshot = get_memory_snapshot();
775 assert_eq!(snapshot.active_task_count, 0);
776 let _ = shutdown();
777 }
778
779 #[test]
780 fn test_is_tracking_active() {
781 reset_global_tracker();
782
783 assert!(!is_tracking_active());
784 initialize().unwrap();
785 assert!(is_tracking_active());
786 let _ = shutdown();
787 assert!(!is_tracking_active());
788 }
789
790 #[test]
791 fn test_task_memory_profile() {
792 let tracker = AsyncTracker::new();
793 let thread_id = std::thread::current().id();
794 tracker
795 .track_task_start(1, "test_task".to_string(), thread_id)
796 .unwrap();
797 tracker.track_allocation(0x1000, 1024, 1);
798 tracker.track_allocation(0x2000, 2048, 1);
799 tracker.track_task_end(1).unwrap();
800
801 let profile = tracker.get_task_profile(1);
802 assert!(profile.is_some());
803 let profile = profile.unwrap();
804 assert_eq!(profile.task_id, 1);
805 assert_eq!(profile.total_allocations, 2);
806 assert_eq!(profile.total_bytes, 3072);
807 }
808
809 #[test]
810 fn test_duplicate_task_tracking() {
811 let tracker = AsyncTracker::new();
812 let thread_id = std::thread::current().id();
813
814 let result = tracker.track_task_start(1, "test_task".to_string(), thread_id);
816 assert!(result.is_ok());
817
818 let result = tracker.track_task_start(1, "duplicate_task".to_string(), thread_id);
820 assert!(result.is_err());
821 let error = result.unwrap_err();
822 assert!(
823 matches!(error, AsyncError::TaskTracking { operation, .. } if matches!(operation, TaskOperation::Duplicate))
824 );
825 }
826
827 #[test]
828 fn test_task_not_found() {
829 let tracker = AsyncTracker::new();
830
831 let result = tracker.track_task_end(999);
833 assert!(result.is_err());
834 let error = result.unwrap_err();
835 assert!(
836 matches!(error, AsyncError::TaskTracking { operation, .. } if matches!(operation, TaskOperation::TaskNotFound))
837 );
838 }
839
840 #[test]
841 fn test_task_guard_cleanup() {
842 assert!(AsyncTracker::get_current_task().is_none());
843
844 {
845 let _guard = AsyncTracker::enter_task(42);
846 assert_eq!(AsyncTracker::get_current_task(), Some(42));
847 }
848
849 assert!(AsyncTracker::get_current_task().is_none());
850 }
851
852 #[test]
853 fn test_with_task_closure() {
854 assert!(AsyncTracker::get_current_task().is_none());
855
856 let result = AsyncTracker::with_task(123, || {
857 assert_eq!(AsyncTracker::get_current_task(), Some(123));
858 "test_result"
859 });
860
861 assert_eq!(result, "test_result");
862 assert!(AsyncTracker::get_current_task().is_none());
863 }
864
865 #[test]
866 fn test_with_task_panic_cleanup() {
867 assert!(AsyncTracker::get_current_task().is_none());
868
869 let result = std::panic::catch_unwind(|| {
870 AsyncTracker::with_task(999, || {
871 assert_eq!(AsyncTracker::get_current_task(), Some(999));
872 panic!("intentional panic");
873 });
874 });
875
876 assert!(result.is_err());
877 assert!(AsyncTracker::get_current_task().is_none());
878 }
879}