memscope_rs/analysis/
async_analysis.rs

1//! Async type tracking and analysis
2//!
3//! This module implements async type analysis features from ComplexTypeForRust.md:
4//! - Future and Stream state machine analysis
5//! - Async task lifecycle tracking
6//! - Await point analysis
7
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex, OnceLock};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13/// Global async analyzer instance
14static GLOBAL_ASYNC_ANALYZER: OnceLock<Arc<AsyncAnalyzer>> = OnceLock::new();
15
16/// Get the global async analyzer instance
17pub fn get_global_async_analyzer() -> Arc<AsyncAnalyzer> {
18    GLOBAL_ASYNC_ANALYZER
19        .get_or_init(|| Arc::new(AsyncAnalyzer::new()))
20        .clone()
21}
22
23/// Async type analysis system
24pub struct AsyncAnalyzer {
25    /// Active futures tracking
26    active_futures: Mutex<HashMap<usize, FutureInfo>>,
27    /// Future state transitions
28    state_transitions: Mutex<Vec<StateTransition>>,
29    /// Await point analysis
30    await_points: Mutex<Vec<AwaitPoint>>,
31    /// Task lifecycle events
32    task_events: Mutex<Vec<TaskEvent>>,
33}
34
35impl AsyncAnalyzer {
36    /// Create a new async analyzer
37    pub fn new() -> Self {
38        Self {
39            active_futures: Mutex::new(HashMap::new()),
40            state_transitions: Mutex::new(Vec::new()),
41            await_points: Mutex::new(Vec::new()),
42            task_events: Mutex::new(Vec::new()),
43        }
44    }
45
46    /// Track a new future
47    pub fn track_future(&self, ptr: usize, future_type: &str, initial_state: FutureState) {
48        let future_info = FutureInfo {
49            ptr,
50            future_type: future_type.to_string(),
51            current_state: initial_state.clone(),
52            creation_time: current_timestamp(),
53            completion_time: None,
54            state_history: vec![initial_state.clone()],
55            await_count: 0,
56            poll_count: 0,
57            thread_id: format!("{:?}", std::thread::current().id()),
58        };
59
60        if let Ok(mut futures) = self.active_futures.lock() {
61            futures.insert(ptr, future_info);
62        }
63
64        // Record task creation event
65        let event = TaskEvent {
66            ptr,
67            event_type: TaskEventType::Created,
68            timestamp: current_timestamp(),
69            thread_id: format!("{:?}", std::thread::current().id()),
70            details: format!("Future {} created", future_type),
71        };
72
73        if let Ok(mut events) = self.task_events.lock() {
74            events.push(event);
75        }
76    }
77
78    /// Record a state transition
79    pub fn record_state_transition(
80        &self,
81        ptr: usize,
82        from_state: FutureState,
83        to_state: FutureState,
84    ) {
85        let transition = StateTransition {
86            ptr,
87            from_state: from_state.clone(),
88            to_state: to_state.clone(),
89            timestamp: current_timestamp(),
90            thread_id: format!("{:?}", std::thread::current().id()),
91        };
92
93        if let Ok(mut transitions) = self.state_transitions.lock() {
94            transitions.push(transition);
95        }
96
97        // Update future info
98        if let Ok(mut futures) = self.active_futures.lock() {
99            if let Some(future_info) = futures.get_mut(&ptr) {
100                future_info.current_state = to_state.clone();
101                future_info.state_history.push(to_state.clone());
102
103                if matches!(to_state, FutureState::Pending) {
104                    future_info.poll_count += 1;
105                }
106            }
107        }
108    }
109
110    /// Record an await point
111    pub fn record_await_point(&self, ptr: usize, location: &str, await_type: AwaitType) {
112        let await_point = AwaitPoint {
113            ptr,
114            location: location.to_string(),
115            await_type,
116            timestamp: current_timestamp(),
117            thread_id: format!("{:?}", std::thread::current().id()),
118            duration: None, // Will be filled when await completes
119        };
120
121        if let Ok(mut awaits) = self.await_points.lock() {
122            awaits.push(await_point);
123        }
124
125        // Update await count
126        if let Ok(mut futures) = self.active_futures.lock() {
127            if let Some(future_info) = futures.get_mut(&ptr) {
128                future_info.await_count += 1;
129            }
130        }
131    }
132
133    /// Complete an await point
134    pub fn complete_await_point(&self, ptr: usize, location: &str) {
135        let completion_time = current_timestamp();
136
137        if let Ok(mut awaits) = self.await_points.lock() {
138            // Find the most recent await point for this location
139            for await_point in awaits.iter_mut().rev() {
140                if await_point.ptr == ptr
141                    && await_point.location == location
142                    && await_point.duration.is_none()
143                {
144                    await_point.duration = Some(completion_time - await_point.timestamp);
145                    break;
146                }
147            }
148        }
149    }
150
151    /// Mark a future as completed
152    pub fn complete_future(&self, ptr: usize, result: FutureResult) {
153        let completion_time = current_timestamp();
154
155        if let Ok(mut futures) = self.active_futures.lock() {
156            if let Some(future_info) = futures.get_mut(&ptr) {
157                future_info.completion_time = Some(completion_time);
158                future_info.current_state = match result {
159                    FutureResult::Ready => FutureState::Ready,
160                    FutureResult::Cancelled => FutureState::Cancelled,
161                    FutureResult::Panicked => FutureState::Panicked,
162                };
163            }
164        }
165
166        // Record completion event
167        let event = TaskEvent {
168            ptr,
169            event_type: TaskEventType::Completed,
170            timestamp: completion_time,
171            thread_id: format!("{:?}", std::thread::current().id()),
172            details: format!("Future completed with result: {:?}", result),
173        };
174
175        if let Ok(mut events) = self.task_events.lock() {
176            events.push(event);
177        }
178    }
179
180    /// Get async statistics
181    pub fn get_async_statistics(&self) -> AsyncStatistics {
182        let futures = self.active_futures.lock().unwrap();
183        let transitions = self.state_transitions.lock().unwrap();
184        let awaits = self.await_points.lock().unwrap();
185        let _events = self.task_events.lock().unwrap();
186
187        let total_futures = futures.len();
188        let completed_futures = futures
189            .values()
190            .filter(|f| f.completion_time.is_some())
191            .count();
192        let active_futures = total_futures - completed_futures;
193
194        // Calculate average completion time
195        let completion_times: Vec<u64> = futures
196            .values()
197            .filter_map(|f| {
198                if let (Some(completion), creation) = (f.completion_time, f.creation_time) {
199                    Some(completion - creation)
200                } else {
201                    None
202                }
203            })
204            .collect();
205
206        let avg_completion_time = if !completion_times.is_empty() {
207            completion_times.iter().sum::<u64>() / completion_times.len() as u64
208        } else {
209            0
210        };
211
212        // Calculate await statistics
213        let total_awaits = awaits.len();
214        let completed_awaits = awaits.iter().filter(|a| a.duration.is_some()).count();
215
216        let await_durations: Vec<u64> = awaits.iter().filter_map(|a| a.duration).collect();
217
218        let avg_await_duration = if !await_durations.is_empty() {
219            await_durations.iter().sum::<u64>() / await_durations.len() as u64
220        } else {
221            0
222        };
223
224        // Count by future type
225        let mut by_type = HashMap::new();
226        for future in futures.values() {
227            *by_type.entry(future.future_type.clone()).or_insert(0) += 1;
228        }
229
230        AsyncStatistics {
231            total_futures,
232            active_futures,
233            completed_futures,
234            total_state_transitions: transitions.len(),
235            total_awaits,
236            completed_awaits,
237            avg_completion_time,
238            avg_await_duration,
239            by_type,
240        }
241    }
242
243    /// Analyze async patterns
244    pub fn analyze_async_patterns(&self) -> AsyncPatternAnalysis {
245        let futures = self.active_futures.lock().unwrap();
246        let awaits = self.await_points.lock().unwrap();
247
248        let mut patterns = Vec::new();
249
250        // Pattern: Long-running futures
251        let long_running_threshold = 1_000_000_000; // 1 second in nanoseconds
252        let long_running_count = futures
253            .values()
254            .filter(|f| {
255                if let Some(completion) = f.completion_time {
256                    completion - f.creation_time > long_running_threshold
257                } else {
258                    current_timestamp() - f.creation_time > long_running_threshold
259                }
260            })
261            .count();
262
263        if long_running_count > 0 {
264            patterns.push(AsyncPattern {
265                pattern_type: AsyncPatternType::LongRunningFutures,
266                description: format!(
267                    "{} futures running longer than 1 second",
268                    long_running_count
269                ),
270                severity: AsyncPatternSeverity::Warning,
271                suggestion: "Consider breaking down long-running operations or adding timeouts"
272                    .to_string(),
273            });
274        }
275
276        // Pattern: Excessive polling
277        let high_poll_threshold = 100;
278        let high_poll_count = futures
279            .values()
280            .filter(|f| f.poll_count > high_poll_threshold)
281            .count();
282
283        if high_poll_count > 0 {
284            patterns.push(AsyncPattern {
285                pattern_type: AsyncPatternType::ExcessivePolling,
286                description: format!(
287                    "{} futures polled more than {} times",
288                    high_poll_count, high_poll_threshold
289                ),
290                severity: AsyncPatternSeverity::Warning,
291                suggestion: "High poll count may indicate inefficient async design".to_string(),
292            });
293        }
294
295        // Pattern: Many concurrent futures
296        let high_concurrency_threshold = 50;
297        if futures.len() > high_concurrency_threshold {
298            patterns.push(AsyncPattern {
299                pattern_type: AsyncPatternType::HighConcurrency,
300                description: format!("{} concurrent futures detected", futures.len()),
301                severity: AsyncPatternSeverity::Info,
302                suggestion:
303                    "High concurrency - ensure this is intentional and resources are managed"
304                        .to_string(),
305            });
306        }
307
308        // Pattern: Slow await points
309        let slow_await_threshold = 100_000_000; // 100ms in nanoseconds
310        let slow_awaits = awaits
311            .iter()
312            .filter(|a| a.duration.map_or(false, |d| d > slow_await_threshold))
313            .count();
314
315        if slow_awaits > 0 {
316            patterns.push(AsyncPattern {
317                pattern_type: AsyncPatternType::SlowAwaitPoints,
318                description: format!("{} await points took longer than 100ms", slow_awaits),
319                severity: AsyncPatternSeverity::Warning,
320                suggestion: "Slow await points may indicate blocking operations in async code"
321                    .to_string(),
322            });
323        }
324
325        AsyncPatternAnalysis {
326            patterns,
327            total_futures_analyzed: futures.len(),
328            analysis_timestamp: current_timestamp(),
329        }
330    }
331
332    /// Get future information
333    pub fn get_future_info(&self, ptr: usize) -> Option<FutureInfo> {
334        self.active_futures.lock().unwrap().get(&ptr).cloned()
335    }
336}
337
338/// Information about a tracked future
339#[derive(Debug, Clone, Serialize, Deserialize)]
340pub struct FutureInfo {
341    /// Memory pointer
342    pub ptr: usize,
343    /// Type of future
344    pub future_type: String,
345    /// Current state
346    pub current_state: FutureState,
347    /// Creation timestamp
348    pub creation_time: u64,
349    /// Completion timestamp (if completed)
350    pub completion_time: Option<u64>,
351    /// History of state changes
352    pub state_history: Vec<FutureState>,
353    /// Number of await points
354    pub await_count: usize,
355    /// Number of times polled
356    pub poll_count: usize,
357    /// Thread where created
358    pub thread_id: String,
359}
360
361/// Future states
362#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
363pub enum FutureState {
364    /// Future is pending
365    Pending,
366    /// Future is ready with a value
367    Ready,
368    /// Future was cancelled
369    Cancelled,
370    /// Future panicked
371    Panicked,
372    /// Initial state
373    Created,
374}
375
376/// State transition information
377#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct StateTransition {
379    /// Future pointer
380    pub ptr: usize,
381    /// Previous state
382    pub from_state: FutureState,
383    /// New state
384    pub to_state: FutureState,
385    /// Transition timestamp
386    pub timestamp: u64,
387    /// Thread where transition occurred
388    pub thread_id: String,
389}
390
391/// Await point information
392#[derive(Debug, Clone, Serialize, Deserialize)]
393pub struct AwaitPoint {
394    /// Future pointer
395    pub ptr: usize,
396    /// Location in code
397    pub location: String,
398    /// Type of await
399    pub await_type: AwaitType,
400    /// Await start timestamp
401    pub timestamp: u64,
402    /// Thread where await occurred
403    pub thread_id: String,
404    /// Duration of await (if completed)
405    pub duration: Option<u64>,
406}
407
408/// Types of await operations
409#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
410pub enum AwaitType {
411    /// Regular await
412    Regular,
413    /// Timeout await
414    Timeout,
415    /// Select await
416    Select,
417    /// Join await
418    Join,
419}
420
421/// Task lifecycle events
422#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct TaskEvent {
424    /// Future pointer
425    pub ptr: usize,
426    /// Event type
427    pub event_type: TaskEventType,
428    /// Event timestamp
429    pub timestamp: u64,
430    /// Thread where event occurred
431    pub thread_id: String,
432    /// Additional details
433    pub details: String,
434}
435
436/// Types of task events
437#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
438pub enum TaskEventType {
439    /// Task created
440    Created,
441    /// Task started
442    Started,
443    /// Task suspended
444    Suspended,
445    /// Task resumed
446    Resumed,
447    /// Task completed
448    Completed,
449    /// Task cancelled
450    Cancelled,
451}
452
453/// Future completion results
454#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
455pub enum FutureResult {
456    /// Future completed successfully
457    Ready,
458    /// Future was cancelled
459    Cancelled,
460    /// Future panicked
461    Panicked,
462}
463
464/// Async statistics
465#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct AsyncStatistics {
467    /// Total futures tracked
468    pub total_futures: usize,
469    /// Currently active futures
470    pub active_futures: usize,
471    /// Completed futures
472    pub completed_futures: usize,
473    /// Total state transitions
474    pub total_state_transitions: usize,
475    /// Total await points
476    pub total_awaits: usize,
477    /// Completed await points
478    pub completed_awaits: usize,
479    /// Average completion time in nanoseconds
480    pub avg_completion_time: u64,
481    /// Average await duration in nanoseconds
482    pub avg_await_duration: u64,
483    /// Count by future type
484    pub by_type: HashMap<String, usize>,
485}
486
487/// Async pattern analysis
488#[derive(Debug, Clone, Serialize, Deserialize)]
489pub struct AsyncPatternAnalysis {
490    /// Detected patterns
491    pub patterns: Vec<AsyncPattern>,
492    /// Total futures analyzed
493    pub total_futures_analyzed: usize,
494    /// Analysis timestamp
495    pub analysis_timestamp: u64,
496}
497
498/// Detected async pattern
499#[derive(Debug, Clone, Serialize, Deserialize)]
500pub struct AsyncPattern {
501    /// Type of pattern
502    pub pattern_type: AsyncPatternType,
503    /// Description of the pattern
504    pub description: String,
505    /// Severity level
506    pub severity: AsyncPatternSeverity,
507    /// Suggested action
508    pub suggestion: String,
509}
510
511/// Types of async patterns
512#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
513pub enum AsyncPatternType {
514    /// Long-running futures
515    LongRunningFutures,
516    /// Excessive polling
517    ExcessivePolling,
518    /// High concurrency
519    HighConcurrency,
520    /// Slow await points
521    SlowAwaitPoints,
522    /// Memory leaks in futures
523    FutureMemoryLeaks,
524}
525
526/// Pattern severity levels
527#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
528pub enum AsyncPatternSeverity {
529    /// Informational
530    Info,
531    /// Warning
532    Warning,
533    /// Error
534    Error,
535}
536
537/// Get current timestamp
538fn current_timestamp() -> u64 {
539    SystemTime::now()
540        .duration_since(UNIX_EPOCH)
541        .unwrap_or_default()
542        .as_nanos() as u64
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548
549    #[test]
550    fn test_future_tracking() {
551        let analyzer = AsyncAnalyzer::new();
552
553        // Track a future
554        analyzer.track_future(0x1000, "async_fn", FutureState::Created);
555
556        // Check it's tracked
557        let info = analyzer.get_future_info(0x1000);
558        assert!(info.is_some());
559        assert_eq!(info.unwrap().future_type, "async_fn");
560
561        // Record state transition
562        analyzer.record_state_transition(0x1000, FutureState::Created, FutureState::Pending);
563
564        // Check state updated
565        let info = analyzer.get_future_info(0x1000);
566        assert_eq!(info.unwrap().current_state, FutureState::Pending);
567    }
568
569    #[test]
570    fn test_await_tracking() {
571        let analyzer = AsyncAnalyzer::new();
572
573        // Track future and await
574        analyzer.track_future(0x1000, "async_fn", FutureState::Created);
575        analyzer.record_await_point(0x1000, "line_42", AwaitType::Regular);
576
577        // Complete await
578        analyzer.complete_await_point(0x1000, "line_42");
579
580        // Check statistics
581        let stats = analyzer.get_async_statistics();
582        assert_eq!(stats.total_awaits, 1);
583        assert_eq!(stats.completed_awaits, 1);
584    }
585}