1use crate::core::safe_operations::SafeLock;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::{Arc, Mutex, OnceLock};
12use std::time::{SystemTime, UNIX_EPOCH};
13
/// Lazily initialised, process-wide [`AsyncAnalyzer`] shared through an `Arc`.
///
/// The cell starts empty; `get_global_async_analyzer` fills it on first use.
static GLOBAL_ASYNC_ANALYZER: OnceLock<Arc<AsyncAnalyzer>> = OnceLock::new();
16
17pub fn get_global_async_analyzer() -> Arc<AsyncAnalyzer> {
19 GLOBAL_ASYNC_ANALYZER
20 .get_or_init(|| Arc::new(AsyncAnalyzer::new()))
21 .clone()
22}
23
/// Records diagnostics about futures identified by a caller-supplied `usize`
/// key (`ptr`).
///
/// Each collection sits behind its own `Mutex`, so the recording methods can
/// be called through a shared reference.
pub struct AsyncAnalyzer {
    /// Every future registered through `track_future`, keyed by `ptr`.
    /// No method in this file removes entries, so completed futures stay in
    /// the map (with `completion_time` set).
    active_futures: Mutex<HashMap<usize, FutureInfo>>,
    /// Log of state changes appended by `record_state_transition`.
    state_transitions: Mutex<Vec<StateTransition>>,
    /// Await points appended by `record_await_point`; `complete_await_point`
    /// later fills in their `duration`.
    await_points: Mutex<Vec<AwaitPoint>>,
    /// Lifecycle events appended by `track_future` and `complete_future`.
    task_events: Mutex<Vec<TaskEvent>>,
}
35
36impl Default for AsyncAnalyzer {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl AsyncAnalyzer {
43 pub fn new() -> Self {
45 Self {
46 active_futures: Mutex::new(HashMap::new()),
47 state_transitions: Mutex::new(Vec::new()),
48 await_points: Mutex::new(Vec::new()),
49 task_events: Mutex::new(Vec::new()),
50 }
51 }
52
53 pub fn track_future(&self, ptr: usize, future_type: &str, initial_state: FutureState) {
55 let future_info = FutureInfo {
56 ptr,
57 future_type: future_type.to_string(),
58 current_state: initial_state.clone(),
59 creation_time: current_timestamp(),
60 completion_time: None,
61 state_history: vec![initial_state.clone()],
62 await_count: 0,
63 poll_count: 0,
64 thread_id: format!("{:?}", std::thread::current().id()),
65 };
66
67 if let Ok(mut futures) = self.active_futures.lock() {
68 futures.insert(ptr, future_info);
69 }
70
71 let event = TaskEvent {
73 ptr,
74 event_type: TaskEventType::Created,
75 timestamp: current_timestamp(),
76 thread_id: format!("{:?}", std::thread::current().id()),
77 details: format!("Future {future_type} created"),
78 };
79
80 if let Ok(mut events) = self.task_events.lock() {
81 events.push(event);
82 }
83 }
84
85 pub fn record_state_transition(
87 &self,
88 ptr: usize,
89 from_state: FutureState,
90 to_state: FutureState,
91 ) {
92 let transition = StateTransition {
93 ptr,
94 from_state: from_state.clone(),
95 to_state: to_state.clone(),
96 timestamp: current_timestamp(),
97 thread_id: format!("{:?}", std::thread::current().id()),
98 };
99
100 if let Ok(mut transitions) = self.state_transitions.lock() {
101 transitions.push(transition);
102 }
103
104 if let Ok(mut futures) = self.active_futures.lock() {
106 if let Some(future_info) = futures.get_mut(&ptr) {
107 future_info.current_state = to_state.clone();
108 future_info.state_history.push(to_state.clone());
109
110 if matches!(to_state, FutureState::Pending) {
111 future_info.poll_count += 1;
112 }
113 }
114 }
115 }
116
117 pub fn record_await_point(&self, ptr: usize, location: &str, await_type: AwaitType) {
119 let await_point = AwaitPoint {
120 ptr,
121 location: location.to_string(),
122 await_type,
123 timestamp: current_timestamp(),
124 thread_id: format!("{:?}", std::thread::current().id()),
125 duration: None, };
127
128 if let Ok(mut awaits) = self.await_points.lock() {
129 awaits.push(await_point);
130 }
131
132 if let Ok(mut futures) = self.active_futures.lock() {
134 if let Some(future_info) = futures.get_mut(&ptr) {
135 future_info.await_count += 1;
136 }
137 }
138 }
139
140 pub fn complete_await_point(&self, ptr: usize, location: &str) {
142 let completion_time = current_timestamp();
143
144 if let Ok(mut awaits) = self.await_points.lock() {
145 for await_point in awaits.iter_mut().rev() {
147 if await_point.ptr == ptr
148 && await_point.location == location
149 && await_point.duration.is_none()
150 {
151 await_point.duration = Some(completion_time - await_point.timestamp);
152 break;
153 }
154 }
155 }
156 }
157
158 pub fn complete_future(&self, ptr: usize, result: FutureResult) {
160 let completion_time = current_timestamp();
161
162 if let Ok(mut futures) = self.active_futures.lock() {
163 if let Some(future_info) = futures.get_mut(&ptr) {
164 future_info.completion_time = Some(completion_time);
165 future_info.current_state = match result {
166 FutureResult::Ready => FutureState::Ready,
167 FutureResult::Cancelled => FutureState::Cancelled,
168 FutureResult::Panicked => FutureState::Panicked,
169 };
170 }
171 }
172
173 let event = TaskEvent {
175 ptr,
176 event_type: TaskEventType::Completed,
177 timestamp: completion_time,
178 thread_id: format!("{:?}", std::thread::current().id()),
179 details: format!("Future completed with result: {result:?}"),
180 };
181
182 if let Ok(mut events) = self.task_events.lock() {
183 events.push(event);
184 }
185 }
186
187 pub fn get_async_statistics(&self) -> AsyncStatistics {
189 let futures = self
190 .active_futures
191 .safe_lock()
192 .expect("Failed to acquire lock on active_futures");
193 let transitions = self
194 .state_transitions
195 .safe_lock()
196 .expect("Failed to acquire lock on state_transitions");
197 let awaits = self
198 .await_points
199 .safe_lock()
200 .expect("Failed to acquire lock on await_points");
201 let _events = self
202 .task_events
203 .safe_lock()
204 .expect("Failed to acquire lock on task_events");
205
206 let total_futures = futures.len();
207 let completed_futures = futures
208 .values()
209 .filter(|f| f.completion_time.is_some())
210 .count();
211 let active_futures = total_futures - completed_futures;
212
213 let completion_times: Vec<u64> = futures
215 .values()
216 .filter_map(|f| {
217 if let (Some(completion), creation) = (f.completion_time, f.creation_time) {
218 Some(completion - creation)
219 } else {
220 None
221 }
222 })
223 .collect();
224
225 let avg_completion_time = if !completion_times.is_empty() {
226 completion_times.iter().sum::<u64>() / completion_times.len() as u64
227 } else {
228 0
229 };
230
231 let total_awaits = awaits.len();
233 let completed_awaits = awaits.iter().filter(|a| a.duration.is_some()).count();
234
235 let await_durations: Vec<u64> = awaits.iter().filter_map(|a| a.duration).collect();
236
237 let avg_await_duration = if !await_durations.is_empty() {
238 await_durations.iter().sum::<u64>() / await_durations.len() as u64
239 } else {
240 0
241 };
242
243 let mut by_type = HashMap::new();
245 for future in futures.values() {
246 *by_type.entry(future.future_type.clone()).or_insert(0) += 1;
247 }
248
249 AsyncStatistics {
250 total_futures,
251 active_futures,
252 completed_futures,
253 total_state_transitions: transitions.len(),
254 total_awaits,
255 completed_awaits,
256 avg_completion_time,
257 avg_await_duration,
258 by_type,
259 }
260 }
261
262 pub fn analyze_async_patterns(&self) -> AsyncPatternAnalysis {
264 let futures = self
265 .active_futures
266 .safe_lock()
267 .expect("Failed to acquire lock on active_futures");
268 let awaits = self
269 .await_points
270 .safe_lock()
271 .expect("Failed to acquire lock on await_points");
272
273 let mut patterns = Vec::new();
274
275 let long_running_threshold = 1_000_000_000; let long_running_count = futures
278 .values()
279 .filter(|f| {
280 if let Some(completion) = f.completion_time {
281 completion - f.creation_time > long_running_threshold
282 } else {
283 current_timestamp() - f.creation_time > long_running_threshold
284 }
285 })
286 .count();
287
288 if long_running_count > 0 {
289 patterns.push(AsyncPattern {
290 pattern_type: AsyncPatternType::LongRunningFutures,
291 description: format!("{long_running_count} futures running longer than 1 second",),
292 severity: AsyncPatternSeverity::Warning,
293 suggestion: "Consider breaking down long-running operations or adding timeouts"
294 .to_string(),
295 });
296 }
297
298 let high_poll_threshold = 100;
300 let high_poll_count = futures
301 .values()
302 .filter(|f| f.poll_count > high_poll_threshold)
303 .count();
304
305 if high_poll_count > 0 {
306 patterns.push(AsyncPattern {
307 pattern_type: AsyncPatternType::ExcessivePolling,
308 description: format!(
309 "{high_poll_count} futures polled more than {high_poll_threshold} times",
310 ),
311 severity: AsyncPatternSeverity::Warning,
312 suggestion: "High poll count may indicate inefficient async design".to_string(),
313 });
314 }
315
316 let high_concurrency_threshold = 50;
318 if futures.len() > high_concurrency_threshold {
319 patterns.push(AsyncPattern {
320 pattern_type: AsyncPatternType::HighConcurrency,
321 description: format!("{} concurrent futures detected", futures.len()),
322 severity: AsyncPatternSeverity::Info,
323 suggestion:
324 "High concurrency - ensure this is intentional and resources are managed"
325 .to_string(),
326 });
327 }
328
329 let slow_await_threshold = 100_000_000; let slow_awaits = awaits
332 .iter()
333 .filter(|a| a.duration.is_some_and(|d| d > slow_await_threshold))
334 .count();
335
336 if slow_awaits > 0 {
337 patterns.push(AsyncPattern {
338 pattern_type: AsyncPatternType::SlowAwaitPoints,
339 description: format!("{slow_awaits} await points took longer than 100ms"),
340 severity: AsyncPatternSeverity::Warning,
341 suggestion: "Slow await points may indicate blocking operations in async code"
342 .to_string(),
343 });
344 }
345
346 AsyncPatternAnalysis {
347 patterns,
348 total_futures_analyzed: futures.len(),
349 analysis_timestamp: current_timestamp(),
350 }
351 }
352
353 pub fn get_future_info(&self, ptr: usize) -> Option<FutureInfo> {
355 self.active_futures
356 .safe_lock()
357 .expect("Failed to acquire lock on active_futures")
358 .get(&ptr)
359 .cloned()
360 }
361}
362
/// Everything the analyzer knows about one tracked future.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FutureInfo {
    /// Caller-supplied key the future is stored under.
    pub ptr: usize,
    /// Label passed to `track_future`.
    pub future_type: String,
    /// Most recently recorded state.
    pub current_state: FutureState,
    /// When the future was tracked, in nanoseconds since the Unix epoch.
    pub creation_time: u64,
    /// When `complete_future` was called for it (nanoseconds since the Unix
    /// epoch); `None` until then.
    pub completion_time: Option<u64>,
    /// Initial state followed by every `to_state` recorded through
    /// `record_state_transition`.
    pub state_history: Vec<FutureState>,
    /// Number of await points recorded for this future.
    pub await_count: usize,
    /// Number of recorded transitions into `FutureState::Pending`.
    pub poll_count: usize,
    /// `Debug` rendering of the id of the thread that called `track_future`.
    pub thread_id: String,
}
385
/// State a tracked future can be in.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FutureState {
    /// Each recorded transition into this state increments the future's
    /// `poll_count`.
    Pending,
    /// Set by `complete_future` for `FutureResult::Ready`.
    Ready,
    /// Set by `complete_future` for `FutureResult::Cancelled`.
    Cancelled,
    /// Set by `complete_future` for `FutureResult::Panicked`.
    Panicked,
    /// Used as the initial state by the tests in this file.
    Created,
}
400
/// One entry of the state-transition log written by `record_state_transition`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
    /// Key of the future the transition belongs to.
    pub ptr: usize,
    /// State reported as the origin of the transition.
    pub from_state: FutureState,
    /// State reported as the target of the transition.
    pub to_state: FutureState,
    /// When the transition was recorded, in nanoseconds since the Unix epoch.
    pub timestamp: u64,
    /// `Debug` rendering of the id of the thread that recorded it.
    pub thread_id: String,
}
415
/// One recorded await point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwaitPoint {
    /// Key of the future that is awaiting.
    pub ptr: usize,
    /// Caller-supplied location label; `complete_await_point` matches on it.
    pub location: String,
    /// Kind of await, as supplied by the caller.
    pub await_type: AwaitType,
    /// When the await point was recorded, in nanoseconds since the Unix epoch.
    pub timestamp: u64,
    /// `Debug` rendering of the id of the thread that recorded it.
    pub thread_id: String,
    /// Nanoseconds between recording and `complete_await_point`; `None` while
    /// the await point is still open.
    pub duration: Option<u64>,
}
432
/// Kind of await attached to an `AwaitPoint`.
///
/// The analyzer stores the value as given and does not branch on it anywhere
/// in this file.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AwaitType {
    Regular,
    Timeout,
    Select,
    Join,
}
445
/// One entry of the task event log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskEvent {
    /// Key of the future the event refers to.
    pub ptr: usize,
    /// What happened.
    pub event_type: TaskEventType,
    /// When the event was recorded, in nanoseconds since the Unix epoch.
    pub timestamp: u64,
    /// `Debug` rendering of the id of the thread that recorded it.
    pub thread_id: String,
    /// Human-readable description of the event.
    pub details: String,
}
460
/// Kind of a `TaskEvent`.
///
/// Within this file only `Created` and `Completed` are ever emitted.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TaskEventType {
    /// Emitted by `track_future`.
    Created,
    Started,
    Suspended,
    Resumed,
    /// Emitted by `complete_future`, whatever the `FutureResult` was.
    Completed,
    Cancelled,
}
477
/// Outcome passed to `complete_future`; each variant maps to the
/// `FutureState` variant of the same name.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FutureResult {
    Ready,
    Cancelled,
    Panicked,
}
488
/// Aggregate numbers produced by `AsyncAnalyzer::get_async_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncStatistics {
    /// Number of tracked futures, completed or not.
    pub total_futures: usize,
    /// Tracked futures without a completion time.
    pub active_futures: usize,
    /// Tracked futures with a completion time.
    pub completed_futures: usize,
    /// Length of the state-transition log.
    pub total_state_transitions: usize,
    /// Number of recorded await points.
    pub total_awaits: usize,
    /// Await points that have a duration.
    pub completed_awaits: usize,
    /// Integer mean of completion minus creation time over completed futures,
    /// in nanoseconds; `0` when no future has completed.
    pub avg_completion_time: u64,
    /// Integer mean duration over completed await points, in nanoseconds;
    /// `0` when no await point has completed.
    pub avg_await_duration: u64,
    /// Number of tracked futures per `future_type` label.
    pub by_type: HashMap<String, usize>,
}
511
/// Result of `AsyncAnalyzer::analyze_async_patterns`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncPatternAnalysis {
    /// Patterns that were detected; empty when nothing was flagged.
    pub patterns: Vec<AsyncPattern>,
    /// Number of tracked futures at the time of the analysis.
    pub total_futures_analyzed: usize,
    /// When the analysis was produced, in nanoseconds since the Unix epoch.
    pub analysis_timestamp: u64,
}
522
/// A single finding reported by the pattern analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncPattern {
    /// Which pattern was detected.
    pub pattern_type: AsyncPatternType,
    /// Human-readable summary, including the number of offenders.
    pub description: String,
    /// How serious the finding is.
    pub severity: AsyncPatternSeverity,
    /// Human-readable advice on how to address the finding.
    pub suggestion: String,
}
535
/// Category of an `AsyncPattern`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AsyncPatternType {
    /// Futures whose lifetime exceeds one second.
    LongRunningFutures,
    /// Futures with a poll count above 100.
    ExcessivePolling,
    /// More than 50 futures tracked at once.
    HighConcurrency,
    /// Await points that took longer than 100 ms.
    SlowAwaitPoints,
    /// Not produced by `analyze_async_patterns` in this file.
    FutureMemoryLeaks,
}
550
/// Severity attached to an `AsyncPattern`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AsyncPatternSeverity {
    /// Used for the high-concurrency finding.
    Info,
    /// Used for the long-running, excessive-polling and slow-await findings.
    Warning,
    /// Not assigned by `analyze_async_patterns` in this file.
    Error,
}
561
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch. The
/// nanosecond count is a `u128`; it is clamped to `u64::MAX` rather than
/// silently truncated should it ever exceed the `u64` range.
fn current_timestamp() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
569
570#[cfg(test)]
571mod tests {
572 use super::*;
573
574 #[test]
575 fn test_future_tracking() {
576 let analyzer = AsyncAnalyzer::new();
577
578 analyzer.track_future(0x1000, "async_fn", FutureState::Created);
580
581 let info = analyzer.get_future_info(0x1000);
583 assert!(info.is_some());
584 assert_eq!(
585 info.expect("Failed to get async info").future_type,
586 "async_fn"
587 );
588
589 analyzer.record_state_transition(0x1000, FutureState::Created, FutureState::Pending);
591
592 let info = analyzer.get_future_info(0x1000);
594 assert_eq!(
595 info.expect("Failed to get async info").current_state,
596 FutureState::Pending
597 );
598 }
599
600 #[test]
601 fn test_await_tracking() {
602 let analyzer = AsyncAnalyzer::new();
603
604 analyzer.track_future(0x1000, "async_fn", FutureState::Created);
606 analyzer.record_await_point(0x1000, "line_42", AwaitType::Regular);
607
608 analyzer.complete_await_point(0x1000, "line_42");
610
611 let stats = analyzer.get_async_statistics();
613 assert_eq!(stats.total_awaits, 1);
614 assert_eq!(stats.completed_awaits, 1);
615 }
616}