use crate::core::safe_operations::SafeLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
static GLOBAL_ASYNC_ANALYZER: OnceLock<Arc<AsyncAnalyzer>> = OnceLock::new();
pub fn get_global_async_analyzer() -> Arc<AsyncAnalyzer> {
GLOBAL_ASYNC_ANALYZER
.get_or_init(|| Arc::new(AsyncAnalyzer::new()))
.clone()
}
pub struct AsyncAnalyzer {
active_futures: Mutex<HashMap<usize, FutureInfo>>,
state_transitions: Mutex<Vec<StateTransition>>,
await_points: Mutex<Vec<AwaitPoint>>,
task_events: Mutex<Vec<TaskEvent>>,
}
impl Default for AsyncAnalyzer {
fn default() -> Self {
Self::new()
}
}
impl AsyncAnalyzer {
pub fn new() -> Self {
Self {
active_futures: Mutex::new(HashMap::new()),
state_transitions: Mutex::new(Vec::new()),
await_points: Mutex::new(Vec::new()),
task_events: Mutex::new(Vec::new()),
}
}
pub fn track_future(&self, ptr: usize, future_type: &str, initial_state: FutureState) {
let future_info = FutureInfo {
ptr,
future_type: future_type.to_string(),
current_state: initial_state.clone(),
creation_time: current_timestamp(),
completion_time: None,
state_history: vec![initial_state.clone()],
await_count: 0,
poll_count: 0,
thread_id: format!("{:?}", std::thread::current().id()),
};
if let Ok(mut futures) = self.active_futures.lock() {
futures.insert(ptr, future_info);
}
let event = TaskEvent {
ptr,
event_type: TaskEventType::Created,
timestamp: current_timestamp(),
thread_id: format!("{:?}", std::thread::current().id()),
details: format!("Future {future_type} created"),
};
if let Ok(mut events) = self.task_events.lock() {
events.push(event);
}
}
pub fn record_state_transition(
&self,
ptr: usize,
from_state: FutureState,
to_state: FutureState,
) {
let transition = StateTransition {
ptr,
from_state: from_state.clone(),
to_state: to_state.clone(),
timestamp: current_timestamp(),
thread_id: format!("{:?}", std::thread::current().id()),
};
if let Ok(mut transitions) = self.state_transitions.lock() {
transitions.push(transition);
}
if let Ok(mut futures) = self.active_futures.lock() {
if let Some(future_info) = futures.get_mut(&ptr) {
future_info.current_state = to_state.clone();
future_info.state_history.push(to_state.clone());
if matches!(to_state, FutureState::Pending) {
future_info.poll_count += 1;
}
}
}
}
pub fn record_await_point(&self, ptr: usize, location: &str, await_type: AwaitType) {
let await_point = AwaitPoint {
ptr,
location: location.to_string(),
await_type,
timestamp: current_timestamp(),
thread_id: format!("{:?}", std::thread::current().id()),
duration: None, };
if let Ok(mut awaits) = self.await_points.lock() {
awaits.push(await_point);
}
if let Ok(mut futures) = self.active_futures.lock() {
if let Some(future_info) = futures.get_mut(&ptr) {
future_info.await_count += 1;
}
}
}
pub fn complete_await_point(&self, ptr: usize, location: &str) {
let completion_time = current_timestamp();
if let Ok(mut awaits) = self.await_points.lock() {
for await_point in awaits.iter_mut().rev() {
if await_point.ptr == ptr
&& await_point.location == location
&& await_point.duration.is_none()
{
await_point.duration = Some(completion_time - await_point.timestamp);
break;
}
}
}
}
pub fn complete_future(&self, ptr: usize, result: FutureResult) {
let completion_time = current_timestamp();
if let Ok(mut futures) = self.active_futures.lock() {
if let Some(future_info) = futures.get_mut(&ptr) {
future_info.completion_time = Some(completion_time);
future_info.current_state = match result {
FutureResult::Ready => FutureState::Ready,
FutureResult::Cancelled => FutureState::Cancelled,
FutureResult::Panicked => FutureState::Panicked,
};
}
}
let event = TaskEvent {
ptr,
event_type: TaskEventType::Completed,
timestamp: completion_time,
thread_id: format!("{:?}", std::thread::current().id()),
details: format!("Future completed with result: {result:?}"),
};
if let Ok(mut events) = self.task_events.lock() {
events.push(event);
}
}
pub fn get_async_statistics(&self) -> AsyncStatistics {
let futures = self
.active_futures
.safe_lock()
.expect("Failed to acquire lock on active_futures");
let transitions = self
.state_transitions
.safe_lock()
.expect("Failed to acquire lock on state_transitions");
let awaits = self
.await_points
.safe_lock()
.expect("Failed to acquire lock on await_points");
let _events = self
.task_events
.safe_lock()
.expect("Failed to acquire lock on task_events");
let total_futures = futures.len();
let completed_futures = futures
.values()
.filter(|f| f.completion_time.is_some())
.count();
let active_futures = total_futures - completed_futures;
let completion_times: Vec<u64> = futures
.values()
.filter_map(|f| {
if let (Some(completion), creation) = (f.completion_time, f.creation_time) {
Some(completion - creation)
} else {
None
}
})
.collect();
let avg_completion_time = if !completion_times.is_empty() {
completion_times.iter().sum::<u64>() / completion_times.len() as u64
} else {
0
};
let total_awaits = awaits.len();
let completed_awaits = awaits.iter().filter(|a| a.duration.is_some()).count();
let await_durations: Vec<u64> = awaits.iter().filter_map(|a| a.duration).collect();
let avg_await_duration = if !await_durations.is_empty() {
await_durations.iter().sum::<u64>() / await_durations.len() as u64
} else {
0
};
let mut by_type = HashMap::new();
for future in futures.values() {
*by_type.entry(future.future_type.clone()).or_insert(0) += 1;
}
AsyncStatistics {
total_futures,
active_futures,
completed_futures,
total_state_transitions: transitions.len(),
total_awaits,
completed_awaits,
avg_completion_time,
avg_await_duration,
by_type,
}
}
pub fn analyze_async_patterns(&self) -> AsyncPatternAnalysis {
let futures = self
.active_futures
.safe_lock()
.expect("Failed to acquire lock on active_futures");
let awaits = self
.await_points
.safe_lock()
.expect("Failed to acquire lock on await_points");
let mut patterns = Vec::new();
let long_running_threshold = 1_000_000_000; let long_running_count = futures
.values()
.filter(|f| {
if let Some(completion) = f.completion_time {
completion - f.creation_time > long_running_threshold
} else {
current_timestamp() - f.creation_time > long_running_threshold
}
})
.count();
if long_running_count > 0 {
patterns.push(AsyncPattern {
pattern_type: AsyncPatternType::LongRunningFutures,
description: format!("{long_running_count} futures running longer than 1 second",),
severity: AsyncPatternSeverity::Warning,
suggestion: "Consider breaking down long-running operations or adding timeouts"
.to_string(),
});
}
let high_poll_threshold = 100;
let high_poll_count = futures
.values()
.filter(|f| f.poll_count > high_poll_threshold)
.count();
if high_poll_count > 0 {
patterns.push(AsyncPattern {
pattern_type: AsyncPatternType::ExcessivePolling,
description: format!(
"{high_poll_count} futures polled more than {high_poll_threshold} times",
),
severity: AsyncPatternSeverity::Warning,
suggestion: "High poll count may indicate inefficient async design".to_string(),
});
}
let high_concurrency_threshold = 50;
if futures.len() > high_concurrency_threshold {
patterns.push(AsyncPattern {
pattern_type: AsyncPatternType::HighConcurrency,
description: format!("{} concurrent futures detected", futures.len()),
severity: AsyncPatternSeverity::Info,
suggestion:
"High concurrency - ensure this is intentional and resources are managed"
.to_string(),
});
}
let slow_await_threshold = 100_000_000; let slow_awaits = awaits
.iter()
.filter(|a| a.duration.is_some_and(|d| d > slow_await_threshold))
.count();
if slow_awaits > 0 {
patterns.push(AsyncPattern {
pattern_type: AsyncPatternType::SlowAwaitPoints,
description: format!("{slow_awaits} await points took longer than 100ms"),
severity: AsyncPatternSeverity::Warning,
suggestion: "Slow await points may indicate blocking operations in async code"
.to_string(),
});
}
AsyncPatternAnalysis {
patterns,
total_futures_analyzed: futures.len(),
analysis_timestamp: current_timestamp(),
}
}
pub fn get_future_info(&self, ptr: usize) -> Option<FutureInfo> {
self.active_futures
.safe_lock()
.expect("Failed to acquire lock on active_futures")
.get(&ptr)
.cloned()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FutureInfo {
pub ptr: usize,
pub future_type: String,
pub current_state: FutureState,
pub creation_time: u64,
pub completion_time: Option<u64>,
pub state_history: Vec<FutureState>,
pub await_count: usize,
pub poll_count: usize,
pub thread_id: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FutureState {
Pending,
Ready,
Cancelled,
Panicked,
Created,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
pub ptr: usize,
pub from_state: FutureState,
pub to_state: FutureState,
pub timestamp: u64,
pub thread_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwaitPoint {
pub ptr: usize,
pub location: String,
pub await_type: AwaitType,
pub timestamp: u64,
pub thread_id: String,
pub duration: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AwaitType {
Regular,
Timeout,
Select,
Join,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskEvent {
pub ptr: usize,
pub event_type: TaskEventType,
pub timestamp: u64,
pub thread_id: String,
pub details: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TaskEventType {
Created,
Started,
Suspended,
Resumed,
Completed,
Cancelled,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FutureResult {
Ready,
Cancelled,
Panicked,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncStatistics {
pub total_futures: usize,
pub active_futures: usize,
pub completed_futures: usize,
pub total_state_transitions: usize,
pub total_awaits: usize,
pub completed_awaits: usize,
pub avg_completion_time: u64,
pub avg_await_duration: u64,
pub by_type: HashMap<String, usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncPatternAnalysis {
pub patterns: Vec<AsyncPattern>,
pub total_futures_analyzed: usize,
pub analysis_timestamp: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncPattern {
pub pattern_type: AsyncPatternType,
pub description: String,
pub severity: AsyncPatternSeverity,
pub suggestion: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AsyncPatternType {
LongRunningFutures,
ExcessivePolling,
HighConcurrency,
SlowAwaitPoints,
FutureMemoryLeaks,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AsyncPatternSeverity {
Info,
Warning,
Error,
}
fn current_timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_future_tracking() {
let analyzer = AsyncAnalyzer::new();
analyzer.track_future(0x1000, "async_fn", FutureState::Created);
let info = analyzer.get_future_info(0x1000);
assert!(info.is_some());
assert_eq!(
info.expect("Failed to get async info").future_type,
"async_fn"
);
analyzer.record_state_transition(0x1000, FutureState::Created, FutureState::Pending);
let info = analyzer.get_future_info(0x1000);
assert_eq!(
info.expect("Failed to get async info").current_state,
FutureState::Pending
);
}
#[test]
fn test_await_tracking() {
let analyzer = AsyncAnalyzer::new();
analyzer.track_future(0x1000, "async_fn", FutureState::Created);
analyzer.record_await_point(0x1000, "line_42", AwaitType::Regular);
analyzer.complete_await_point(0x1000, "line_42");
let stats = analyzer.get_async_statistics();
assert_eq!(stats.total_awaits, 1);
assert_eq!(stats.completed_awaits, 1);
}
}