memscope_rs/unified/strategies/
async_strategy.rs

1// Async Memory Tracking Strategy
2// Optimized implementation for async/await applications
3// Uses task-local storage and async context awareness
4
5use crate::unified::tracking_dispatcher::{
6    MemoryTracker, TrackerConfig, TrackerError, TrackerStatistics, TrackerType,
7};
8use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
9use std::sync::Arc;
10use tracing::{debug, info, warn};
11
12/// Async memory tracking strategy
13/// Specialized for async/await applications with task-local tracking
14/// Provides context-aware tracking across async task boundaries
15pub struct AsyncStrategy {
16    /// Configuration for this strategy instance
17    config: Option<TrackerConfig>,
18    /// Global async tracking state
19    global_state: Arc<AsyncGlobalState>,
20    /// Task registry for managing async tasks
21    task_registry: Arc<AsyncTaskRegistry>,
22    /// Aggregated metrics across all tasks
23    global_metrics: Arc<AsyncGlobalMetrics>,
24}
25
26/// Global state for async tracking coordination
27/// Maintains task-level coordination without blocking async execution
28#[derive(Debug)]
29struct AsyncGlobalState {
30    /// Whether async tracking is currently active
31    is_active: AtomicU64, // 0 = inactive, 1 = active
32    /// Total number of active tracking tasks
33    active_tasks: AtomicUsize,
34    /// Session start timestamp
35    session_start_ns: AtomicU64,
36    /// Next allocation ID (globally unique across tasks)
37    next_allocation_id: AtomicU64,
38    /// Next task ID for task identification
39    next_task_id: AtomicU64,
40}
41
42/// Registry for async tasks participating in tracking
43/// Manages task lifecycle and coordination
44#[derive(Debug)]
45struct AsyncTaskRegistry {
46    /// Total tasks that have registered for tracking
47    total_registered_tasks: AtomicUsize,
48    /// Currently active tracking tasks
49    active_tracking_tasks: AtomicUsize,
50    /// Peak concurrent tasks
51    peak_concurrent_tasks: AtomicUsize,
52}
53
54/// Global metrics aggregated from all async tasks
55/// Provides system-wide view of async memory tracking
56#[derive(Debug)]
57struct AsyncGlobalMetrics {
58    /// Total allocations across all tasks
59    total_allocations: AtomicU64,
60    /// Total bytes allocated across all tasks
61    total_bytes_allocated: AtomicU64,
62    /// Total task spawns tracked
63    total_task_spawns: AtomicU64,
64    /// Average task lifetime (nanoseconds)
65    _avg_task_lifetime_ns: AtomicU64,
66    /// Total tracking overhead in bytes
67    total_overhead_bytes: AtomicUsize,
68}
69
70impl Default for AsyncGlobalState {
71    /// Initialize async global state with inactive values
72    fn default() -> Self {
73        Self {
74            is_active: AtomicU64::new(0),
75            active_tasks: AtomicUsize::new(0),
76            session_start_ns: AtomicU64::new(0),
77            next_allocation_id: AtomicU64::new(1),
78            next_task_id: AtomicU64::new(1),
79        }
80    }
81}
82
83impl Default for AsyncTaskRegistry {
84    /// Initialize task registry with zero counts
85    fn default() -> Self {
86        Self {
87            total_registered_tasks: AtomicUsize::new(0),
88            active_tracking_tasks: AtomicUsize::new(0),
89            peak_concurrent_tasks: AtomicUsize::new(0),
90        }
91    }
92}
93
94impl Default for AsyncGlobalMetrics {
95    /// Initialize async metrics with zero values
96    fn default() -> Self {
97        Self {
98            total_allocations: AtomicU64::new(0),
99            total_bytes_allocated: AtomicU64::new(0),
100            total_task_spawns: AtomicU64::new(0),
101            _avg_task_lifetime_ns: AtomicU64::new(0),
102            total_overhead_bytes: AtomicUsize::new(0),
103        }
104    }
105}
106
107impl AsyncStrategy {
108    /// Create new async strategy instance
109    /// Initializes async coordination structures
110    pub fn new() -> Self {
111        debug!("Creating new async strategy");
112
113        Self {
114            config: None,
115            global_state: Arc::new(AsyncGlobalState::default()),
116            task_registry: Arc::new(AsyncTaskRegistry::default()),
117            global_metrics: Arc::new(AsyncGlobalMetrics::default()),
118        }
119    }
120
121    /// Register current async task for tracking
122    /// Should be called when entering async context
123    pub fn register_current_task(&self) -> Result<u64, TrackerError> {
124        let task_id = self
125            .global_state
126            .next_task_id
127            .fetch_add(1, Ordering::Relaxed);
128
129        debug!("Registering async task for tracking: id={}", task_id);
130
131        // Update task registry
132        self.task_registry
133            .total_registered_tasks
134            .fetch_add(1, Ordering::Relaxed);
135        let current_active = self
136            .task_registry
137            .active_tracking_tasks
138            .fetch_add(1, Ordering::Relaxed)
139            + 1;
140
141        // Update peak concurrent tasks
142        let current_peak = self
143            .task_registry
144            .peak_concurrent_tasks
145            .load(Ordering::Relaxed);
146        if current_active > current_peak {
147            self.task_registry
148                .peak_concurrent_tasks
149                .store(current_active, Ordering::Relaxed);
150        }
151
152        // Update global metrics
153        self.global_metrics
154            .total_task_spawns
155            .fetch_add(1, Ordering::Relaxed);
156
157        info!(
158            "Async task registered: id={}, active_tasks={}",
159            task_id, current_active
160        );
161        Ok(task_id)
162    }
163
164    /// Get high-precision timestamp in nanoseconds
165    fn get_timestamp_ns() -> u64 {
166        std::time::SystemTime::now()
167            .duration_since(std::time::UNIX_EPOCH)
168            .map(|d| d.as_nanos() as u64)
169            .unwrap_or(0)
170    }
171
172    /// Export collected data as JSON compatible with existing format
173    fn export_as_json(&self) -> Result<String, TrackerError> {
174        // Note: In a complete implementation, this would collect data from
175        // task-local storage across all async tasks. For this demo, we return
176        // basic structure with metadata.
177
178        let mut output = serde_json::Map::new();
179        output.insert("allocations".to_string(), serde_json::Value::Array(vec![]));
180        output.insert("strategy_metadata".to_string(), serde_json::json!({
181            "strategy_type": "async",
182            "total_allocations": self.global_metrics.total_allocations.load(Ordering::Relaxed),
183            "total_bytes": self.global_metrics.total_bytes_allocated.load(Ordering::Relaxed),
184            "total_tasks": self.task_registry.total_registered_tasks.load(Ordering::Relaxed),
185            "peak_concurrent_tasks": self.task_registry.peak_concurrent_tasks.load(Ordering::Relaxed),
186            "overhead_bytes": self.global_metrics.total_overhead_bytes.load(Ordering::Relaxed)
187        }));
188
189        serde_json::to_string_pretty(&output).map_err(|e| TrackerError::DataCollectionFailed {
190            reason: format!("JSON serialization failed: {}", e),
191        })
192    }
193}
194
195impl MemoryTracker for AsyncStrategy {
196    /// Initialize strategy with provided configuration
197    fn initialize(&mut self, config: TrackerConfig) -> Result<(), TrackerError> {
198        debug!("Initializing async strategy with config: {:?}", config);
199
200        // Validate configuration
201        if config.sample_rate < 0.0 || config.sample_rate > 1.0 {
202            return Err(TrackerError::InvalidConfiguration {
203                reason: "Sample rate must be between 0.0 and 1.0".to_string(),
204            });
205        }
206
207        // Store configuration
208        self.config = Some(config);
209
210        // Reset global state
211        self.global_state.is_active.store(0, Ordering::Relaxed);
212        self.global_state.active_tasks.store(0, Ordering::Relaxed);
213        self.global_state
214            .next_allocation_id
215            .store(1, Ordering::Relaxed);
216        self.global_state.next_task_id.store(1, Ordering::Relaxed);
217
218        // Reset metrics
219        self.global_metrics
220            .total_allocations
221            .store(0, Ordering::Relaxed);
222        self.global_metrics
223            .total_bytes_allocated
224            .store(0, Ordering::Relaxed);
225        self.global_metrics
226            .total_task_spawns
227            .store(0, Ordering::Relaxed);
228
229        info!("Async strategy initialized successfully");
230        Ok(())
231    }
232
233    /// Start active memory tracking for async contexts
234    fn start_tracking(&mut self) -> Result<(), TrackerError> {
235        debug!("Starting async tracking");
236
237        let was_active = self.global_state.is_active.swap(1, Ordering::Relaxed);
238        if was_active == 1 {
239            warn!("Async tracking was already active");
240            return Ok(());
241        }
242
243        // Record session start time
244        self.global_state
245            .session_start_ns
246            .store(Self::get_timestamp_ns(), Ordering::Relaxed);
247
248        info!("Async tracking started successfully");
249        Ok(())
250    }
251
252    /// Stop tracking and collect data from all async tasks
253    fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
254        debug!("Stopping async tracking");
255
256        let was_active = self.global_state.is_active.swap(0, Ordering::Relaxed);
257        if was_active == 0 {
258            warn!("Async tracking was not active");
259        }
260
261        // Export data
262        let json_data = self.export_as_json()?;
263
264        let total_allocations = self
265            .global_metrics
266            .total_allocations
267            .load(Ordering::Relaxed);
268        let total_tasks = self
269            .task_registry
270            .total_registered_tasks
271            .load(Ordering::Relaxed);
272
273        info!(
274            "Async tracking stopped: {} allocations from {} tasks",
275            total_allocations, total_tasks
276        );
277
278        Ok(json_data.into_bytes())
279    }
280
281    /// Get current tracking statistics
282    fn get_statistics(&self) -> TrackerStatistics {
283        TrackerStatistics {
284            allocations_tracked: self
285                .global_metrics
286                .total_allocations
287                .load(Ordering::Relaxed),
288            memory_tracked_bytes: self
289                .global_metrics
290                .total_bytes_allocated
291                .load(Ordering::Relaxed),
292            overhead_bytes: self
293                .global_metrics
294                .total_overhead_bytes
295                .load(Ordering::Relaxed) as u64,
296            tracking_duration_ms: {
297                let start_ns = self.global_state.session_start_ns.load(Ordering::Relaxed);
298                if start_ns > 0 {
299                    (Self::get_timestamp_ns() - start_ns) / 1_000_000
300                } else {
301                    0
302                }
303            },
304        }
305    }
306
307    /// Check if strategy is currently active
308    fn is_active(&self) -> bool {
309        self.global_state.is_active.load(Ordering::Relaxed) == 1
310    }
311
312    /// Get strategy type identifier
313    fn tracker_type(&self) -> TrackerType {
314        TrackerType::AsyncTracker
315    }
316}
317
318impl Default for AsyncStrategy {
319    /// Create strategy with default configuration
320    fn default() -> Self {
321        Self::new()
322    }
323}