memscope_rs/unified/strategies/
async_strategy.rs1use crate::unified::tracking_dispatcher::{
6 MemoryTracker, TrackerConfig, TrackerError, TrackerStatistics, TrackerType,
7};
8use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
9use std::sync::Arc;
10use tracing::{debug, info, warn};
11
12pub struct AsyncStrategy {
16 config: Option<TrackerConfig>,
18 global_state: Arc<AsyncGlobalState>,
20 task_registry: Arc<AsyncTaskRegistry>,
22 global_metrics: Arc<AsyncGlobalMetrics>,
24}
25
26#[derive(Debug)]
29struct AsyncGlobalState {
30 is_active: AtomicU64, active_tasks: AtomicUsize,
34 session_start_ns: AtomicU64,
36 next_allocation_id: AtomicU64,
38 next_task_id: AtomicU64,
40}
41
42#[derive(Debug)]
45struct AsyncTaskRegistry {
46 total_registered_tasks: AtomicUsize,
48 active_tracking_tasks: AtomicUsize,
50 peak_concurrent_tasks: AtomicUsize,
52}
53
54#[derive(Debug)]
57struct AsyncGlobalMetrics {
58 total_allocations: AtomicU64,
60 total_bytes_allocated: AtomicU64,
62 total_task_spawns: AtomicU64,
64 _avg_task_lifetime_ns: AtomicU64,
66 total_overhead_bytes: AtomicUsize,
68}
69
70impl Default for AsyncGlobalState {
71 fn default() -> Self {
73 Self {
74 is_active: AtomicU64::new(0),
75 active_tasks: AtomicUsize::new(0),
76 session_start_ns: AtomicU64::new(0),
77 next_allocation_id: AtomicU64::new(1),
78 next_task_id: AtomicU64::new(1),
79 }
80 }
81}
82
83impl Default for AsyncTaskRegistry {
84 fn default() -> Self {
86 Self {
87 total_registered_tasks: AtomicUsize::new(0),
88 active_tracking_tasks: AtomicUsize::new(0),
89 peak_concurrent_tasks: AtomicUsize::new(0),
90 }
91 }
92}
93
94impl Default for AsyncGlobalMetrics {
95 fn default() -> Self {
97 Self {
98 total_allocations: AtomicU64::new(0),
99 total_bytes_allocated: AtomicU64::new(0),
100 total_task_spawns: AtomicU64::new(0),
101 _avg_task_lifetime_ns: AtomicU64::new(0),
102 total_overhead_bytes: AtomicUsize::new(0),
103 }
104 }
105}
106
107impl AsyncStrategy {
108 pub fn new() -> Self {
111 debug!("Creating new async strategy");
112
113 Self {
114 config: None,
115 global_state: Arc::new(AsyncGlobalState::default()),
116 task_registry: Arc::new(AsyncTaskRegistry::default()),
117 global_metrics: Arc::new(AsyncGlobalMetrics::default()),
118 }
119 }
120
121 pub fn register_current_task(&self) -> Result<u64, TrackerError> {
124 let task_id = self
125 .global_state
126 .next_task_id
127 .fetch_add(1, Ordering::Relaxed);
128
129 debug!("Registering async task for tracking: id={}", task_id);
130
131 self.task_registry
133 .total_registered_tasks
134 .fetch_add(1, Ordering::Relaxed);
135 let current_active = self
136 .task_registry
137 .active_tracking_tasks
138 .fetch_add(1, Ordering::Relaxed)
139 + 1;
140
141 let current_peak = self
143 .task_registry
144 .peak_concurrent_tasks
145 .load(Ordering::Relaxed);
146 if current_active > current_peak {
147 self.task_registry
148 .peak_concurrent_tasks
149 .store(current_active, Ordering::Relaxed);
150 }
151
152 self.global_metrics
154 .total_task_spawns
155 .fetch_add(1, Ordering::Relaxed);
156
157 info!(
158 "Async task registered: id={}, active_tasks={}",
159 task_id, current_active
160 );
161 Ok(task_id)
162 }
163
164 fn get_timestamp_ns() -> u64 {
166 std::time::SystemTime::now()
167 .duration_since(std::time::UNIX_EPOCH)
168 .map(|d| d.as_nanos() as u64)
169 .unwrap_or(0)
170 }
171
172 fn export_as_json(&self) -> Result<String, TrackerError> {
174 let mut output = serde_json::Map::new();
179 output.insert("allocations".to_string(), serde_json::Value::Array(vec![]));
180 output.insert("strategy_metadata".to_string(), serde_json::json!({
181 "strategy_type": "async",
182 "total_allocations": self.global_metrics.total_allocations.load(Ordering::Relaxed),
183 "total_bytes": self.global_metrics.total_bytes_allocated.load(Ordering::Relaxed),
184 "total_tasks": self.task_registry.total_registered_tasks.load(Ordering::Relaxed),
185 "peak_concurrent_tasks": self.task_registry.peak_concurrent_tasks.load(Ordering::Relaxed),
186 "overhead_bytes": self.global_metrics.total_overhead_bytes.load(Ordering::Relaxed)
187 }));
188
189 serde_json::to_string_pretty(&output).map_err(|e| TrackerError::DataCollectionFailed {
190 reason: format!("JSON serialization failed: {}", e),
191 })
192 }
193}
194
195impl MemoryTracker for AsyncStrategy {
196 fn initialize(&mut self, config: TrackerConfig) -> Result<(), TrackerError> {
198 debug!("Initializing async strategy with config: {:?}", config);
199
200 if config.sample_rate < 0.0 || config.sample_rate > 1.0 {
202 return Err(TrackerError::InvalidConfiguration {
203 reason: "Sample rate must be between 0.0 and 1.0".to_string(),
204 });
205 }
206
207 self.config = Some(config);
209
210 self.global_state.is_active.store(0, Ordering::Relaxed);
212 self.global_state.active_tasks.store(0, Ordering::Relaxed);
213 self.global_state
214 .next_allocation_id
215 .store(1, Ordering::Relaxed);
216 self.global_state.next_task_id.store(1, Ordering::Relaxed);
217
218 self.global_metrics
220 .total_allocations
221 .store(0, Ordering::Relaxed);
222 self.global_metrics
223 .total_bytes_allocated
224 .store(0, Ordering::Relaxed);
225 self.global_metrics
226 .total_task_spawns
227 .store(0, Ordering::Relaxed);
228
229 info!("Async strategy initialized successfully");
230 Ok(())
231 }
232
233 fn start_tracking(&mut self) -> Result<(), TrackerError> {
235 debug!("Starting async tracking");
236
237 let was_active = self.global_state.is_active.swap(1, Ordering::Relaxed);
238 if was_active == 1 {
239 warn!("Async tracking was already active");
240 return Ok(());
241 }
242
243 self.global_state
245 .session_start_ns
246 .store(Self::get_timestamp_ns(), Ordering::Relaxed);
247
248 info!("Async tracking started successfully");
249 Ok(())
250 }
251
252 fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
254 debug!("Stopping async tracking");
255
256 let was_active = self.global_state.is_active.swap(0, Ordering::Relaxed);
257 if was_active == 0 {
258 warn!("Async tracking was not active");
259 }
260
261 let json_data = self.export_as_json()?;
263
264 let total_allocations = self
265 .global_metrics
266 .total_allocations
267 .load(Ordering::Relaxed);
268 let total_tasks = self
269 .task_registry
270 .total_registered_tasks
271 .load(Ordering::Relaxed);
272
273 info!(
274 "Async tracking stopped: {} allocations from {} tasks",
275 total_allocations, total_tasks
276 );
277
278 Ok(json_data.into_bytes())
279 }
280
281 fn get_statistics(&self) -> TrackerStatistics {
283 TrackerStatistics {
284 allocations_tracked: self
285 .global_metrics
286 .total_allocations
287 .load(Ordering::Relaxed),
288 memory_tracked_bytes: self
289 .global_metrics
290 .total_bytes_allocated
291 .load(Ordering::Relaxed),
292 overhead_bytes: self
293 .global_metrics
294 .total_overhead_bytes
295 .load(Ordering::Relaxed) as u64,
296 tracking_duration_ms: {
297 let start_ns = self.global_state.session_start_ns.load(Ordering::Relaxed);
298 if start_ns > 0 {
299 (Self::get_timestamp_ns() - start_ns) / 1_000_000
300 } else {
301 0
302 }
303 },
304 }
305 }
306
307 fn is_active(&self) -> bool {
309 self.global_state.is_active.load(Ordering::Relaxed) == 1
310 }
311
312 fn tracker_type(&self) -> TrackerType {
314 TrackerType::AsyncTracker
315 }
316}
317
318impl Default for AsyncStrategy {
319 fn default() -> Self {
321 Self::new()
322 }
323}