1use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex, OnceLock};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13static GLOBAL_ASYNC_ANALYZER: OnceLock<Arc<AsyncAnalyzer>> = OnceLock::new();
15
16pub fn get_global_async_analyzer() -> Arc<AsyncAnalyzer> {
18 GLOBAL_ASYNC_ANALYZER
19 .get_or_init(|| Arc::new(AsyncAnalyzer::new()))
20 .clone()
21}
22
23pub struct AsyncAnalyzer {
25 active_futures: Mutex<HashMap<usize, FutureInfo>>,
27 state_transitions: Mutex<Vec<StateTransition>>,
29 await_points: Mutex<Vec<AwaitPoint>>,
31 task_events: Mutex<Vec<TaskEvent>>,
33}
34
35impl AsyncAnalyzer {
36 pub fn new() -> Self {
38 Self {
39 active_futures: Mutex::new(HashMap::new()),
40 state_transitions: Mutex::new(Vec::new()),
41 await_points: Mutex::new(Vec::new()),
42 task_events: Mutex::new(Vec::new()),
43 }
44 }
45
46 pub fn track_future(&self, ptr: usize, future_type: &str, initial_state: FutureState) {
48 let future_info = FutureInfo {
49 ptr,
50 future_type: future_type.to_string(),
51 current_state: initial_state.clone(),
52 creation_time: current_timestamp(),
53 completion_time: None,
54 state_history: vec![initial_state.clone()],
55 await_count: 0,
56 poll_count: 0,
57 thread_id: format!("{:?}", std::thread::current().id()),
58 };
59
60 if let Ok(mut futures) = self.active_futures.lock() {
61 futures.insert(ptr, future_info);
62 }
63
64 let event = TaskEvent {
66 ptr,
67 event_type: TaskEventType::Created,
68 timestamp: current_timestamp(),
69 thread_id: format!("{:?}", std::thread::current().id()),
70 details: format!("Future {} created", future_type),
71 };
72
73 if let Ok(mut events) = self.task_events.lock() {
74 events.push(event);
75 }
76 }
77
78 pub fn record_state_transition(
80 &self,
81 ptr: usize,
82 from_state: FutureState,
83 to_state: FutureState,
84 ) {
85 let transition = StateTransition {
86 ptr,
87 from_state: from_state.clone(),
88 to_state: to_state.clone(),
89 timestamp: current_timestamp(),
90 thread_id: format!("{:?}", std::thread::current().id()),
91 };
92
93 if let Ok(mut transitions) = self.state_transitions.lock() {
94 transitions.push(transition);
95 }
96
97 if let Ok(mut futures) = self.active_futures.lock() {
99 if let Some(future_info) = futures.get_mut(&ptr) {
100 future_info.current_state = to_state.clone();
101 future_info.state_history.push(to_state.clone());
102
103 if matches!(to_state, FutureState::Pending) {
104 future_info.poll_count += 1;
105 }
106 }
107 }
108 }
109
110 pub fn record_await_point(&self, ptr: usize, location: &str, await_type: AwaitType) {
112 let await_point = AwaitPoint {
113 ptr,
114 location: location.to_string(),
115 await_type,
116 timestamp: current_timestamp(),
117 thread_id: format!("{:?}", std::thread::current().id()),
118 duration: None, };
120
121 if let Ok(mut awaits) = self.await_points.lock() {
122 awaits.push(await_point);
123 }
124
125 if let Ok(mut futures) = self.active_futures.lock() {
127 if let Some(future_info) = futures.get_mut(&ptr) {
128 future_info.await_count += 1;
129 }
130 }
131 }
132
133 pub fn complete_await_point(&self, ptr: usize, location: &str) {
135 let completion_time = current_timestamp();
136
137 if let Ok(mut awaits) = self.await_points.lock() {
138 for await_point in awaits.iter_mut().rev() {
140 if await_point.ptr == ptr
141 && await_point.location == location
142 && await_point.duration.is_none()
143 {
144 await_point.duration = Some(completion_time - await_point.timestamp);
145 break;
146 }
147 }
148 }
149 }
150
151 pub fn complete_future(&self, ptr: usize, result: FutureResult) {
153 let completion_time = current_timestamp();
154
155 if let Ok(mut futures) = self.active_futures.lock() {
156 if let Some(future_info) = futures.get_mut(&ptr) {
157 future_info.completion_time = Some(completion_time);
158 future_info.current_state = match result {
159 FutureResult::Ready => FutureState::Ready,
160 FutureResult::Cancelled => FutureState::Cancelled,
161 FutureResult::Panicked => FutureState::Panicked,
162 };
163 }
164 }
165
166 let event = TaskEvent {
168 ptr,
169 event_type: TaskEventType::Completed,
170 timestamp: completion_time,
171 thread_id: format!("{:?}", std::thread::current().id()),
172 details: format!("Future completed with result: {:?}", result),
173 };
174
175 if let Ok(mut events) = self.task_events.lock() {
176 events.push(event);
177 }
178 }
179
180 pub fn get_async_statistics(&self) -> AsyncStatistics {
182 let futures = self.active_futures.lock().unwrap();
183 let transitions = self.state_transitions.lock().unwrap();
184 let awaits = self.await_points.lock().unwrap();
185 let _events = self.task_events.lock().unwrap();
186
187 let total_futures = futures.len();
188 let completed_futures = futures
189 .values()
190 .filter(|f| f.completion_time.is_some())
191 .count();
192 let active_futures = total_futures - completed_futures;
193
194 let completion_times: Vec<u64> = futures
196 .values()
197 .filter_map(|f| {
198 if let (Some(completion), creation) = (f.completion_time, f.creation_time) {
199 Some(completion - creation)
200 } else {
201 None
202 }
203 })
204 .collect();
205
206 let avg_completion_time = if !completion_times.is_empty() {
207 completion_times.iter().sum::<u64>() / completion_times.len() as u64
208 } else {
209 0
210 };
211
212 let total_awaits = awaits.len();
214 let completed_awaits = awaits.iter().filter(|a| a.duration.is_some()).count();
215
216 let await_durations: Vec<u64> = awaits.iter().filter_map(|a| a.duration).collect();
217
218 let avg_await_duration = if !await_durations.is_empty() {
219 await_durations.iter().sum::<u64>() / await_durations.len() as u64
220 } else {
221 0
222 };
223
224 let mut by_type = HashMap::new();
226 for future in futures.values() {
227 *by_type.entry(future.future_type.clone()).or_insert(0) += 1;
228 }
229
230 AsyncStatistics {
231 total_futures,
232 active_futures,
233 completed_futures,
234 total_state_transitions: transitions.len(),
235 total_awaits,
236 completed_awaits,
237 avg_completion_time,
238 avg_await_duration,
239 by_type,
240 }
241 }
242
243 pub fn analyze_async_patterns(&self) -> AsyncPatternAnalysis {
245 let futures = self.active_futures.lock().unwrap();
246 let awaits = self.await_points.lock().unwrap();
247
248 let mut patterns = Vec::new();
249
250 let long_running_threshold = 1_000_000_000; let long_running_count = futures
253 .values()
254 .filter(|f| {
255 if let Some(completion) = f.completion_time {
256 completion - f.creation_time > long_running_threshold
257 } else {
258 current_timestamp() - f.creation_time > long_running_threshold
259 }
260 })
261 .count();
262
263 if long_running_count > 0 {
264 patterns.push(AsyncPattern {
265 pattern_type: AsyncPatternType::LongRunningFutures,
266 description: format!(
267 "{} futures running longer than 1 second",
268 long_running_count
269 ),
270 severity: AsyncPatternSeverity::Warning,
271 suggestion: "Consider breaking down long-running operations or adding timeouts"
272 .to_string(),
273 });
274 }
275
276 let high_poll_threshold = 100;
278 let high_poll_count = futures
279 .values()
280 .filter(|f| f.poll_count > high_poll_threshold)
281 .count();
282
283 if high_poll_count > 0 {
284 patterns.push(AsyncPattern {
285 pattern_type: AsyncPatternType::ExcessivePolling,
286 description: format!(
287 "{} futures polled more than {} times",
288 high_poll_count, high_poll_threshold
289 ),
290 severity: AsyncPatternSeverity::Warning,
291 suggestion: "High poll count may indicate inefficient async design".to_string(),
292 });
293 }
294
295 let high_concurrency_threshold = 50;
297 if futures.len() > high_concurrency_threshold {
298 patterns.push(AsyncPattern {
299 pattern_type: AsyncPatternType::HighConcurrency,
300 description: format!("{} concurrent futures detected", futures.len()),
301 severity: AsyncPatternSeverity::Info,
302 suggestion:
303 "High concurrency - ensure this is intentional and resources are managed"
304 .to_string(),
305 });
306 }
307
308 let slow_await_threshold = 100_000_000; let slow_awaits = awaits
311 .iter()
312 .filter(|a| a.duration.map_or(false, |d| d > slow_await_threshold))
313 .count();
314
315 if slow_awaits > 0 {
316 patterns.push(AsyncPattern {
317 pattern_type: AsyncPatternType::SlowAwaitPoints,
318 description: format!("{} await points took longer than 100ms", slow_awaits),
319 severity: AsyncPatternSeverity::Warning,
320 suggestion: "Slow await points may indicate blocking operations in async code"
321 .to_string(),
322 });
323 }
324
325 AsyncPatternAnalysis {
326 patterns,
327 total_futures_analyzed: futures.len(),
328 analysis_timestamp: current_timestamp(),
329 }
330 }
331
332 pub fn get_future_info(&self, ptr: usize) -> Option<FutureInfo> {
334 self.active_futures.lock().unwrap().get(&ptr).cloned()
335 }
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize)]
340pub struct FutureInfo {
341 pub ptr: usize,
343 pub future_type: String,
345 pub current_state: FutureState,
347 pub creation_time: u64,
349 pub completion_time: Option<u64>,
351 pub state_history: Vec<FutureState>,
353 pub await_count: usize,
355 pub poll_count: usize,
357 pub thread_id: String,
359}
360
361#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
363pub enum FutureState {
364 Pending,
366 Ready,
368 Cancelled,
370 Panicked,
372 Created,
374}
375
376#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct StateTransition {
379 pub ptr: usize,
381 pub from_state: FutureState,
383 pub to_state: FutureState,
385 pub timestamp: u64,
387 pub thread_id: String,
389}
390
391#[derive(Debug, Clone, Serialize, Deserialize)]
393pub struct AwaitPoint {
394 pub ptr: usize,
396 pub location: String,
398 pub await_type: AwaitType,
400 pub timestamp: u64,
402 pub thread_id: String,
404 pub duration: Option<u64>,
406}
407
408#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
410pub enum AwaitType {
411 Regular,
413 Timeout,
415 Select,
417 Join,
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct TaskEvent {
424 pub ptr: usize,
426 pub event_type: TaskEventType,
428 pub timestamp: u64,
430 pub thread_id: String,
432 pub details: String,
434}
435
436#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
438pub enum TaskEventType {
439 Created,
441 Started,
443 Suspended,
445 Resumed,
447 Completed,
449 Cancelled,
451}
452
453#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
455pub enum FutureResult {
456 Ready,
458 Cancelled,
460 Panicked,
462}
463
464#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct AsyncStatistics {
467 pub total_futures: usize,
469 pub active_futures: usize,
471 pub completed_futures: usize,
473 pub total_state_transitions: usize,
475 pub total_awaits: usize,
477 pub completed_awaits: usize,
479 pub avg_completion_time: u64,
481 pub avg_await_duration: u64,
483 pub by_type: HashMap<String, usize>,
485}
486
487#[derive(Debug, Clone, Serialize, Deserialize)]
489pub struct AsyncPatternAnalysis {
490 pub patterns: Vec<AsyncPattern>,
492 pub total_futures_analyzed: usize,
494 pub analysis_timestamp: u64,
496}
497
498#[derive(Debug, Clone, Serialize, Deserialize)]
500pub struct AsyncPattern {
501 pub pattern_type: AsyncPatternType,
503 pub description: String,
505 pub severity: AsyncPatternSeverity,
507 pub suggestion: String,
509}
510
511#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
513pub enum AsyncPatternType {
514 LongRunningFutures,
516 ExcessivePolling,
518 HighConcurrency,
520 SlowAwaitPoints,
522 FutureMemoryLeaks,
524}
525
526#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
528pub enum AsyncPatternSeverity {
529 Info,
531 Warning,
533 Error,
535}
536
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` rather than wrapping if the value does not fit.
/// The wall clock is not monotonic, so later calls may return smaller values.
fn current_timestamp() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    // `as_nanos` yields u128; clamp before narrowing so an out-of-range value
    // saturates instead of being silently truncated by the cast.
    since_epoch.as_nanos().min(u128::from(u64::MAX)) as u64
}
544
#[cfg(test)]
mod tests {
    use super::*;

    /// Key used for the fake future in every test below.
    const FUTURE_PTR: usize = 0x1000;

    /// A tracked future can be looked up, and a recorded transition updates
    /// its current state.
    #[test]
    fn test_future_tracking() {
        let analyzer = AsyncAnalyzer::new();
        analyzer.track_future(FUTURE_PTR, "async_fn", FutureState::Created);

        let tracked = analyzer.get_future_info(FUTURE_PTR);
        assert!(tracked.is_some());
        assert_eq!(tracked.unwrap().future_type, "async_fn");

        analyzer.record_state_transition(FUTURE_PTR, FutureState::Created, FutureState::Pending);

        let after_transition = analyzer.get_future_info(FUTURE_PTR);
        assert_eq!(after_transition.unwrap().current_state, FutureState::Pending);
    }

    /// An await point that is recorded and then completed shows up in both
    /// the total and the completed await counters.
    #[test]
    fn test_await_tracking() {
        let analyzer = AsyncAnalyzer::new();
        analyzer.track_future(FUTURE_PTR, "async_fn", FutureState::Created);

        let location = "line_42";
        analyzer.record_await_point(FUTURE_PTR, location, AwaitType::Regular);
        analyzer.complete_await_point(FUTURE_PTR, location);

        let AsyncStatistics {
            total_awaits,
            completed_awaits,
            ..
        } = analyzer.get_async_statistics();
        assert_eq!(total_awaits, 1);
        assert_eq!(completed_awaits, 1);
    }
}