memscope_rs/analysis/
async_analysis.rs

1//! Async type tracking and analysis
2//!
3//! This module implements async type analysis features from ComplexTypeForRust.md:
4//! - Future and Stream state machine analysis
5//! - Async task lifecycle tracking
6//! - Await point analysis
7
8use crate::core::safe_operations::SafeLock;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::{Arc, Mutex, OnceLock};
12use std::time::{SystemTime, UNIX_EPOCH};
13
14/// Global async analyzer instance
15static GLOBAL_ASYNC_ANALYZER: OnceLock<Arc<AsyncAnalyzer>> = OnceLock::new();
16
17/// Get the global async analyzer instance
18pub fn get_global_async_analyzer() -> Arc<AsyncAnalyzer> {
19    GLOBAL_ASYNC_ANALYZER
20        .get_or_init(|| Arc::new(AsyncAnalyzer::new()))
21        .clone()
22}
23
24/// Async type analysis system
25pub struct AsyncAnalyzer {
26    /// Active futures tracking
27    active_futures: Mutex<HashMap<usize, FutureInfo>>,
28    /// Future state transitions
29    state_transitions: Mutex<Vec<StateTransition>>,
30    /// Await point analysis
31    await_points: Mutex<Vec<AwaitPoint>>,
32    /// Task lifecycle events
33    task_events: Mutex<Vec<TaskEvent>>,
34}
35
36impl Default for AsyncAnalyzer {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl AsyncAnalyzer {
43    /// Create a new async analyzer
44    pub fn new() -> Self {
45        Self {
46            active_futures: Mutex::new(HashMap::new()),
47            state_transitions: Mutex::new(Vec::new()),
48            await_points: Mutex::new(Vec::new()),
49            task_events: Mutex::new(Vec::new()),
50        }
51    }
52
53    /// Track a new future
54    pub fn track_future(&self, ptr: usize, future_type: &str, initial_state: FutureState) {
55        let future_info = FutureInfo {
56            ptr,
57            future_type: future_type.to_string(),
58            current_state: initial_state.clone(),
59            creation_time: current_timestamp(),
60            completion_time: None,
61            state_history: vec![initial_state.clone()],
62            await_count: 0,
63            poll_count: 0,
64            thread_id: format!("{:?}", std::thread::current().id()),
65        };
66
67        if let Ok(mut futures) = self.active_futures.lock() {
68            futures.insert(ptr, future_info);
69        }
70
71        // Record task creation event
72        let event = TaskEvent {
73            ptr,
74            event_type: TaskEventType::Created,
75            timestamp: current_timestamp(),
76            thread_id: format!("{:?}", std::thread::current().id()),
77            details: format!("Future {future_type} created"),
78        };
79
80        if let Ok(mut events) = self.task_events.lock() {
81            events.push(event);
82        }
83    }
84
85    /// Record a state transition
86    pub fn record_state_transition(
87        &self,
88        ptr: usize,
89        from_state: FutureState,
90        to_state: FutureState,
91    ) {
92        let transition = StateTransition {
93            ptr,
94            from_state: from_state.clone(),
95            to_state: to_state.clone(),
96            timestamp: current_timestamp(),
97            thread_id: format!("{:?}", std::thread::current().id()),
98        };
99
100        if let Ok(mut transitions) = self.state_transitions.lock() {
101            transitions.push(transition);
102        }
103
104        // Update future info
105        if let Ok(mut futures) = self.active_futures.lock() {
106            if let Some(future_info) = futures.get_mut(&ptr) {
107                future_info.current_state = to_state.clone();
108                future_info.state_history.push(to_state.clone());
109
110                if matches!(to_state, FutureState::Pending) {
111                    future_info.poll_count += 1;
112                }
113            }
114        }
115    }
116
117    /// Record an await point
118    pub fn record_await_point(&self, ptr: usize, location: &str, await_type: AwaitType) {
119        let await_point = AwaitPoint {
120            ptr,
121            location: location.to_string(),
122            await_type,
123            timestamp: current_timestamp(),
124            thread_id: format!("{:?}", std::thread::current().id()),
125            duration: None, // Will be filled when await completes
126        };
127
128        if let Ok(mut awaits) = self.await_points.lock() {
129            awaits.push(await_point);
130        }
131
132        // Update await count
133        if let Ok(mut futures) = self.active_futures.lock() {
134            if let Some(future_info) = futures.get_mut(&ptr) {
135                future_info.await_count += 1;
136            }
137        }
138    }
139
140    /// Complete an await point
141    pub fn complete_await_point(&self, ptr: usize, location: &str) {
142        let completion_time = current_timestamp();
143
144        if let Ok(mut awaits) = self.await_points.lock() {
145            // Find the most recent await point for this location
146            for await_point in awaits.iter_mut().rev() {
147                if await_point.ptr == ptr
148                    && await_point.location == location
149                    && await_point.duration.is_none()
150                {
151                    await_point.duration = Some(completion_time - await_point.timestamp);
152                    break;
153                }
154            }
155        }
156    }
157
158    /// Mark a future as completed
159    pub fn complete_future(&self, ptr: usize, result: FutureResult) {
160        let completion_time = current_timestamp();
161
162        if let Ok(mut futures) = self.active_futures.lock() {
163            if let Some(future_info) = futures.get_mut(&ptr) {
164                future_info.completion_time = Some(completion_time);
165                future_info.current_state = match result {
166                    FutureResult::Ready => FutureState::Ready,
167                    FutureResult::Cancelled => FutureState::Cancelled,
168                    FutureResult::Panicked => FutureState::Panicked,
169                };
170            }
171        }
172
173        // Record completion event
174        let event = TaskEvent {
175            ptr,
176            event_type: TaskEventType::Completed,
177            timestamp: completion_time,
178            thread_id: format!("{:?}", std::thread::current().id()),
179            details: format!("Future completed with result: {result:?}"),
180        };
181
182        if let Ok(mut events) = self.task_events.lock() {
183            events.push(event);
184        }
185    }
186
187    /// Get async statistics
188    pub fn get_async_statistics(&self) -> AsyncStatistics {
189        let futures = self
190            .active_futures
191            .safe_lock()
192            .expect("Failed to acquire lock on active_futures");
193        let transitions = self
194            .state_transitions
195            .safe_lock()
196            .expect("Failed to acquire lock on state_transitions");
197        let awaits = self
198            .await_points
199            .safe_lock()
200            .expect("Failed to acquire lock on await_points");
201        let _events = self
202            .task_events
203            .safe_lock()
204            .expect("Failed to acquire lock on task_events");
205
206        let total_futures = futures.len();
207        let completed_futures = futures
208            .values()
209            .filter(|f| f.completion_time.is_some())
210            .count();
211        let active_futures = total_futures - completed_futures;
212
213        // Calculate average completion time
214        let completion_times: Vec<u64> = futures
215            .values()
216            .filter_map(|f| {
217                if let (Some(completion), creation) = (f.completion_time, f.creation_time) {
218                    Some(completion - creation)
219                } else {
220                    None
221                }
222            })
223            .collect();
224
225        let avg_completion_time = if !completion_times.is_empty() {
226            completion_times.iter().sum::<u64>() / completion_times.len() as u64
227        } else {
228            0
229        };
230
231        // Calculate await statistics
232        let total_awaits = awaits.len();
233        let completed_awaits = awaits.iter().filter(|a| a.duration.is_some()).count();
234
235        let await_durations: Vec<u64> = awaits.iter().filter_map(|a| a.duration).collect();
236
237        let avg_await_duration = if !await_durations.is_empty() {
238            await_durations.iter().sum::<u64>() / await_durations.len() as u64
239        } else {
240            0
241        };
242
243        // Count by future type
244        let mut by_type = HashMap::new();
245        for future in futures.values() {
246            *by_type.entry(future.future_type.clone()).or_insert(0) += 1;
247        }
248
249        AsyncStatistics {
250            total_futures,
251            active_futures,
252            completed_futures,
253            total_state_transitions: transitions.len(),
254            total_awaits,
255            completed_awaits,
256            avg_completion_time,
257            avg_await_duration,
258            by_type,
259        }
260    }
261
262    /// Analyze async patterns
263    pub fn analyze_async_patterns(&self) -> AsyncPatternAnalysis {
264        let futures = self
265            .active_futures
266            .safe_lock()
267            .expect("Failed to acquire lock on active_futures");
268        let awaits = self
269            .await_points
270            .safe_lock()
271            .expect("Failed to acquire lock on await_points");
272
273        let mut patterns = Vec::new();
274
275        // Pattern: Long-running futures
276        let long_running_threshold = 1_000_000_000; // 1 second in nanoseconds
277        let long_running_count = futures
278            .values()
279            .filter(|f| {
280                if let Some(completion) = f.completion_time {
281                    completion - f.creation_time > long_running_threshold
282                } else {
283                    current_timestamp() - f.creation_time > long_running_threshold
284                }
285            })
286            .count();
287
288        if long_running_count > 0 {
289            patterns.push(AsyncPattern {
290                pattern_type: AsyncPatternType::LongRunningFutures,
291                description: format!("{long_running_count} futures running longer than 1 second",),
292                severity: AsyncPatternSeverity::Warning,
293                suggestion: "Consider breaking down long-running operations or adding timeouts"
294                    .to_string(),
295            });
296        }
297
298        // Pattern: Excessive polling
299        let high_poll_threshold = 100;
300        let high_poll_count = futures
301            .values()
302            .filter(|f| f.poll_count > high_poll_threshold)
303            .count();
304
305        if high_poll_count > 0 {
306            patterns.push(AsyncPattern {
307                pattern_type: AsyncPatternType::ExcessivePolling,
308                description: format!(
309                    "{high_poll_count} futures polled more than {high_poll_threshold} times",
310                ),
311                severity: AsyncPatternSeverity::Warning,
312                suggestion: "High poll count may indicate inefficient async design".to_string(),
313            });
314        }
315
316        // Pattern: Many concurrent futures
317        let high_concurrency_threshold = 50;
318        if futures.len() > high_concurrency_threshold {
319            patterns.push(AsyncPattern {
320                pattern_type: AsyncPatternType::HighConcurrency,
321                description: format!("{} concurrent futures detected", futures.len()),
322                severity: AsyncPatternSeverity::Info,
323                suggestion:
324                    "High concurrency - ensure this is intentional and resources are managed"
325                        .to_string(),
326            });
327        }
328
329        // Pattern: Slow await points
330        let slow_await_threshold = 100_000_000; // 100ms in nanoseconds
331        let slow_awaits = awaits
332            .iter()
333            .filter(|a| a.duration.is_some_and(|d| d > slow_await_threshold))
334            .count();
335
336        if slow_awaits > 0 {
337            patterns.push(AsyncPattern {
338                pattern_type: AsyncPatternType::SlowAwaitPoints,
339                description: format!("{slow_awaits} await points took longer than 100ms"),
340                severity: AsyncPatternSeverity::Warning,
341                suggestion: "Slow await points may indicate blocking operations in async code"
342                    .to_string(),
343            });
344        }
345
346        AsyncPatternAnalysis {
347            patterns,
348            total_futures_analyzed: futures.len(),
349            analysis_timestamp: current_timestamp(),
350        }
351    }
352
353    /// Get future information
354    pub fn get_future_info(&self, ptr: usize) -> Option<FutureInfo> {
355        self.active_futures
356            .safe_lock()
357            .expect("Failed to acquire lock on active_futures")
358            .get(&ptr)
359            .cloned()
360    }
361}
362
363/// Information about a tracked future
364#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct FutureInfo {
366    /// Memory pointer
367    pub ptr: usize,
368    /// Type of future
369    pub future_type: String,
370    /// Current state
371    pub current_state: FutureState,
372    /// Creation timestamp
373    pub creation_time: u64,
374    /// Completion timestamp (if completed)
375    pub completion_time: Option<u64>,
376    /// History of state changes
377    pub state_history: Vec<FutureState>,
378    /// Number of await points
379    pub await_count: usize,
380    /// Number of times polled
381    pub poll_count: usize,
382    /// Thread where created
383    pub thread_id: String,
384}
385
386/// Future states
387#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
388pub enum FutureState {
389    /// Future is pending
390    Pending,
391    /// Future is ready with a value
392    Ready,
393    /// Future was cancelled
394    Cancelled,
395    /// Future panicked
396    Panicked,
397    /// Initial state
398    Created,
399}
400
401/// State transition information
402#[derive(Debug, Clone, Serialize, Deserialize)]
403pub struct StateTransition {
404    /// Future pointer
405    pub ptr: usize,
406    /// Previous state
407    pub from_state: FutureState,
408    /// New state
409    pub to_state: FutureState,
410    /// Transition timestamp
411    pub timestamp: u64,
412    /// Thread where transition occurred
413    pub thread_id: String,
414}
415
416/// Await point information
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct AwaitPoint {
419    /// Future pointer
420    pub ptr: usize,
421    /// Location in code
422    pub location: String,
423    /// Type of await
424    pub await_type: AwaitType,
425    /// Await start timestamp
426    pub timestamp: u64,
427    /// Thread where await occurred
428    pub thread_id: String,
429    /// Duration of await (if completed)
430    pub duration: Option<u64>,
431}
432
433/// Types of await operations
434#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
435pub enum AwaitType {
436    /// Regular await
437    Regular,
438    /// Timeout await
439    Timeout,
440    /// Select await
441    Select,
442    /// Join await
443    Join,
444}
445
446/// Task lifecycle events
447#[derive(Debug, Clone, Serialize, Deserialize)]
448pub struct TaskEvent {
449    /// Future pointer
450    pub ptr: usize,
451    /// Event type
452    pub event_type: TaskEventType,
453    /// Event timestamp
454    pub timestamp: u64,
455    /// Thread where event occurred
456    pub thread_id: String,
457    /// Additional details
458    pub details: String,
459}
460
461/// Types of task events
462#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
463pub enum TaskEventType {
464    /// Task created
465    Created,
466    /// Task started
467    Started,
468    /// Task suspended
469    Suspended,
470    /// Task resumed
471    Resumed,
472    /// Task completed
473    Completed,
474    /// Task cancelled
475    Cancelled,
476}
477
478/// Future completion results
479#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
480pub enum FutureResult {
481    /// Future completed successfully
482    Ready,
483    /// Future was cancelled
484    Cancelled,
485    /// Future panicked
486    Panicked,
487}
488
489/// Async statistics
490#[derive(Debug, Clone, Serialize, Deserialize)]
491pub struct AsyncStatistics {
492    /// Total futures tracked
493    pub total_futures: usize,
494    /// Currently active futures
495    pub active_futures: usize,
496    /// Completed futures
497    pub completed_futures: usize,
498    /// Total state transitions
499    pub total_state_transitions: usize,
500    /// Total await points
501    pub total_awaits: usize,
502    /// Completed await points
503    pub completed_awaits: usize,
504    /// Average completion time in nanoseconds
505    pub avg_completion_time: u64,
506    /// Average await duration in nanoseconds
507    pub avg_await_duration: u64,
508    /// Count by future type
509    pub by_type: HashMap<String, usize>,
510}
511
512/// Async pattern analysis
513#[derive(Debug, Clone, Serialize, Deserialize)]
514pub struct AsyncPatternAnalysis {
515    /// Detected patterns
516    pub patterns: Vec<AsyncPattern>,
517    /// Total futures analyzed
518    pub total_futures_analyzed: usize,
519    /// Analysis timestamp
520    pub analysis_timestamp: u64,
521}
522
523/// Detected async pattern
524#[derive(Debug, Clone, Serialize, Deserialize)]
525pub struct AsyncPattern {
526    /// Type of pattern
527    pub pattern_type: AsyncPatternType,
528    /// Description of the pattern
529    pub description: String,
530    /// Severity level
531    pub severity: AsyncPatternSeverity,
532    /// Suggested action
533    pub suggestion: String,
534}
535
536/// Types of async patterns
537#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
538pub enum AsyncPatternType {
539    /// Long-running futures
540    LongRunningFutures,
541    /// Excessive polling
542    ExcessivePolling,
543    /// High concurrency
544    HighConcurrency,
545    /// Slow await points
546    SlowAwaitPoints,
547    /// Memory leaks in futures
548    FutureMemoryLeaks,
549}
550
551/// Pattern severity levels
552#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
553pub enum AsyncPatternSeverity {
554    /// Informational
555    Info,
556    /// Warning
557    Warning,
558    /// Error
559    Error,
560}
561
562/// Get current timestamp
563fn current_timestamp() -> u64 {
564    SystemTime::now()
565        .duration_since(UNIX_EPOCH)
566        .unwrap_or_default()
567        .as_nanos() as u64
568}
569
570#[cfg(test)]
571mod tests {
572    use super::*;
573
574    #[test]
575    fn test_future_tracking() {
576        let analyzer = AsyncAnalyzer::new();
577
578        // Track a future
579        analyzer.track_future(0x1000, "async_fn", FutureState::Created);
580
581        // Check it's tracked
582        let info = analyzer.get_future_info(0x1000);
583        assert!(info.is_some());
584        assert_eq!(
585            info.expect("Failed to get async info").future_type,
586            "async_fn"
587        );
588
589        // Record state transition
590        analyzer.record_state_transition(0x1000, FutureState::Created, FutureState::Pending);
591
592        // Check state updated
593        let info = analyzer.get_future_info(0x1000);
594        assert_eq!(
595            info.expect("Failed to get async info").current_state,
596            FutureState::Pending
597        );
598    }
599
600    #[test]
601    fn test_await_tracking() {
602        let analyzer = AsyncAnalyzer::new();
603
604        // Track future and await
605        analyzer.track_future(0x1000, "async_fn", FutureState::Created);
606        analyzer.record_await_point(0x1000, "line_42", AwaitType::Regular);
607
608        // Complete await
609        analyzer.complete_await_point(0x1000, "line_42");
610
611        // Check statistics
612        let stats = analyzer.get_async_statistics();
613        assert_eq!(stats.total_awaits, 1);
614        assert_eq!(stats.completed_awaits, 1);
615    }
616}