memscope_rs/async_memory/
api.rs1use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::async_memory::buffer::get_buffer_stats;
13use crate::async_memory::error::AsyncResult;
14use crate::async_memory::task_id::{generate_task_id, set_current_task, TaskInfo};
15
16pub fn initialize() -> AsyncResult<()> {
21 tracing::info!("Async memory tracking system initialized");
24 Ok(())
25}
26
27pub fn create_tracked<F>(future: F) -> TrackedFuture<F>
34where
35 F: Future,
36{
37 TrackedFuture::new(future)
38}
39
40pub struct TrackedFuture<F> {
45 inner: Pin<Box<F>>,
46 task_id: Option<crate::async_memory::TaskId>,
47}
48
49impl<F> TrackedFuture<F>
50where
51 F: Future,
52{
53 pub fn new(future: F) -> Self {
55 Self {
56 inner: Box::pin(future),
57 task_id: None,
58 }
59 }
60}
61
62impl<F> Future for TrackedFuture<F>
63where
64 F: Future,
65{
66 type Output = F::Output;
67
68 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69 if self.task_id.is_none() {
71 match generate_task_id(cx) {
72 Ok(id) => self.task_id = Some(id),
73 Err(e) => {
74 tracing::warn!("Failed to generate task ID: {}", e);
75 }
77 }
78 }
79
80 if let Some(task_id) = self.task_id {
82 let task_info = TaskInfo::new(task_id, None);
83 set_current_task(task_info);
84
85 let result = self.inner.as_mut().poll(cx);
87
88 if result.is_ready() {
90 crate::async_memory::task_id::clear_current_task();
91 }
92
93 result
94 } else {
95 self.inner.as_mut().poll(cx)
97 }
98 }
99}
100
101#[derive(Debug, Clone)]
103pub struct AsyncMemorySnapshot {
104 pub active_task_count: usize,
106 pub total_allocated_bytes: u64,
108 pub allocation_events: u64,
110 pub events_dropped: u64,
112 pub buffer_utilization: f64,
114}
115
116impl AsyncMemorySnapshot {
117 pub fn active_task_count(&self) -> usize {
119 self.active_task_count
120 }
121
122 pub fn total_allocated(&self) -> u64 {
124 self.total_allocated_bytes
125 }
126
127 pub fn has_good_data_quality(&self) -> bool {
129 if self.allocation_events == 0 {
130 return true;
131 }
132 let drop_rate = self.events_dropped as f64 / self.allocation_events as f64;
133 drop_rate < 0.05
134 }
135
136 pub fn data_quality_warning(&self) -> Option<String> {
138 if !self.has_good_data_quality() && self.allocation_events > 0 {
139 let drop_rate = (self.events_dropped as f64 / self.allocation_events as f64) * 100.0;
140 Some(format!(
141 "Poor data quality: {:.1}% of events dropped. Consider increasing buffer size.",
142 drop_rate
143 ))
144 } else {
145 None
146 }
147 }
148}
149
150pub fn get_memory_snapshot() -> AsyncMemorySnapshot {
156 let buffer_stats = get_buffer_stats();
157
158 AsyncMemorySnapshot {
160 active_task_count: if buffer_stats.current_events > 0 {
161 1
162 } else {
163 0
164 },
165 total_allocated_bytes: buffer_stats.current_events as u64 * 1024, allocation_events: buffer_stats.current_events as u64,
167 events_dropped: buffer_stats.events_dropped as u64,
168 buffer_utilization: buffer_stats.utilization,
169 }
170}
171
172pub fn is_tracking_active() -> bool {
174 crate::async_memory::allocator::is_tracking_enabled()
175}
176
177pub fn spawn_tracked<F>(future: F) -> TrackedFuture<F>
198where
199 F: Future,
200{
201 create_tracked(future)
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207 use std::task::{RawWaker, RawWakerVTable, Waker};
208
209 fn create_test_waker() -> Waker {
211 fn noop(_: *const ()) {}
212 fn clone_waker(data: *const ()) -> RawWaker {
213 RawWaker::new(data, &VTABLE)
214 }
215
216 const VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, noop, noop, noop);
217
218 unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
219 }
220
221 #[test]
222 fn test_initialization() {
223 let result = initialize();
224 assert!(result.is_ok());
225 }
226
227 #[test]
228 fn test_multiple_initialization() {
229 assert!(initialize().is_ok());
231 assert!(initialize().is_ok());
232 assert!(initialize().is_ok());
233 }
234
235 #[test]
236 fn test_memory_snapshot_good_quality() {
237 let snapshot = AsyncMemorySnapshot {
238 active_task_count: 1,
239 total_allocated_bytes: 1024,
240 allocation_events: 100,
241 events_dropped: 0,
242 buffer_utilization: 0.5,
243 };
244
245 assert!(snapshot.buffer_utilization >= 0.0);
246 assert!(snapshot.buffer_utilization <= 1.0);
247 assert_eq!(snapshot.active_task_count(), 1);
248 assert_eq!(snapshot.total_allocated(), 1024);
249 assert!(snapshot.has_good_data_quality());
250 assert!(snapshot.data_quality_warning().is_none());
251 }
252
253 #[test]
254 fn test_memory_snapshot_poor_quality() {
255 let snapshot = AsyncMemorySnapshot {
256 active_task_count: 2,
257 total_allocated_bytes: 2048,
258 allocation_events: 100,
259 events_dropped: 10, buffer_utilization: 0.9,
261 };
262
263 assert!(!snapshot.has_good_data_quality());
264 let warning = snapshot.data_quality_warning();
265 assert!(warning.is_some());
266 let warning_msg = warning.unwrap();
267 assert!(warning_msg.contains("10.0%"));
268 assert!(warning_msg.contains("Poor data quality"));
269 }
270
271 #[test]
272 fn test_memory_snapshot_edge_cases() {
273 let snapshot = AsyncMemorySnapshot {
275 active_task_count: 0,
276 total_allocated_bytes: 0,
277 allocation_events: 0,
278 events_dropped: 0,
279 buffer_utilization: 0.0,
280 };
281 assert!(snapshot.has_good_data_quality());
282 assert!(snapshot.data_quality_warning().is_none());
283
284 let snapshot = AsyncMemorySnapshot {
286 active_task_count: 0,
287 total_allocated_bytes: 0,
288 allocation_events: 0,
289 events_dropped: 100, buffer_utilization: 0.0,
291 };
292 assert!(snapshot.has_good_data_quality());
293 }
294
295 #[test]
296 fn test_memory_snapshot_boundary_conditions() {
297 let snapshot = AsyncMemorySnapshot {
299 active_task_count: 1,
300 total_allocated_bytes: 1000,
301 allocation_events: 100,
302 events_dropped: 5, buffer_utilization: 0.5,
304 };
305 assert!(!snapshot.has_good_data_quality()); assert!(snapshot.data_quality_warning().is_some());
307
308 let snapshot = AsyncMemorySnapshot {
310 active_task_count: 1,
311 total_allocated_bytes: 1000,
312 allocation_events: 1000,
313 events_dropped: 49, buffer_utilization: 0.5,
315 };
316 assert!(snapshot.has_good_data_quality());
317 assert!(snapshot.data_quality_warning().is_none());
318 }
319
320 #[test]
321 fn test_tracked_future_creation() {
322 let future = async { 42 };
323 let tracked = create_tracked(future);
324
325 assert!(tracked.task_id.is_none());
327 }
328
329 #[test]
330 fn test_spawn_tracked_alias() {
331 let future = async { "hello" };
332 let tracked = spawn_tracked(future);
333
334 assert!(tracked.task_id.is_none());
336 }
337
338 #[test]
339 fn test_tracked_future_poll_ready() {
340 let future = async { 123 };
341 let mut tracked = create_tracked(future);
342 let waker = create_test_waker();
343 let mut cx = Context::from_waker(&waker);
344
345 let result = Pin::new(&mut tracked).poll(&mut cx);
347 match result {
348 Poll::Ready(value) => assert_eq!(value, 123),
349 Poll::Pending => {
350 }
352 }
353 }
354
355 #[test]
356 fn test_tracked_future_multiple_polls() {
357 use std::sync::{Arc, Mutex};
358
359 let poll_count = Arc::new(Mutex::new(0));
360 let poll_count_clone = poll_count.clone();
361
362 let future = async move {
364 let should_wait = {
365 let mut count = poll_count_clone.lock().unwrap();
366 *count += 1;
367 *count == 1
368 };
369 if should_wait {
370 std::future::pending::<()>().await;
372 }
373 "completed"
374 };
375
376 let mut tracked = create_tracked(future);
377 let waker = create_test_waker();
378 let mut cx = Context::from_waker(&waker);
379
380 let _result1 = Pin::new(&mut tracked).poll(&mut cx);
382
383 let _result2 = Pin::new(&mut tracked).poll(&mut cx);
385
386 }
389
390 #[test]
391 fn test_tracked_future_task_context() {
392 use crate::async_memory::task_id::{clear_current_task, get_current_task};
393
394 clear_current_task();
396
397 let future = async {
398 let _task_info = get_current_task();
400 true };
402
403 let mut tracked = create_tracked(future);
404 let waker = create_test_waker();
405 let mut cx = Context::from_waker(&waker);
406
407 let _result = Pin::new(&mut tracked).poll(&mut cx);
409
410 let _current_task = get_current_task();
412 }
414
415 #[test]
416 fn test_is_tracking_active() {
417 let _is_active = is_tracking_active();
419 }
421
422 #[test]
423 fn test_get_memory_snapshot_integration() {
424 let snapshot = get_memory_snapshot();
426
427 assert!(snapshot.buffer_utilization >= 0.0);
429 assert!(snapshot.buffer_utilization <= 1.0);
430 }
432
433 #[test]
434 fn test_async_memory_snapshot_debug() {
435 let snapshot = AsyncMemorySnapshot {
436 active_task_count: 5,
437 total_allocated_bytes: 4096,
438 allocation_events: 200,
439 events_dropped: 1,
440 buffer_utilization: 0.75,
441 };
442
443 let debug_str = format!("{:?}", snapshot);
444 assert!(debug_str.contains("active_task_count: 5"));
445 assert!(debug_str.contains("total_allocated_bytes: 4096"));
446 }
447
448 #[test]
449 fn test_async_memory_snapshot_clone() {
450 let original = AsyncMemorySnapshot {
451 active_task_count: 3,
452 total_allocated_bytes: 1024,
453 allocation_events: 50,
454 events_dropped: 0,
455 buffer_utilization: 0.25,
456 };
457
458 let cloned = original.clone();
459 assert_eq!(original.active_task_count, cloned.active_task_count);
460 assert_eq!(original.total_allocated_bytes, cloned.total_allocated_bytes);
461 assert_eq!(original.allocation_events, cloned.allocation_events);
462 assert_eq!(original.events_dropped, cloned.events_dropped);
463 assert_eq!(original.buffer_utilization, cloned.buffer_utilization);
464 }
465
466 #[test]
467 fn test_data_quality_warning_formatting() {
468 let snapshot = AsyncMemorySnapshot {
469 active_task_count: 1,
470 total_allocated_bytes: 1000,
471 allocation_events: 100,
472 events_dropped: 25, buffer_utilization: 0.8,
474 };
475
476 let warning = snapshot.data_quality_warning().unwrap();
477 assert!(warning.contains("25.0%"));
478 assert!(warning.contains("buffer size"));
479 }
480
481 #[test]
482 fn test_tracked_future_error_handling() {
483 let future = async { "test" };
485 let mut tracked = TrackedFuture::new(future);
486
487 assert!(tracked.task_id.is_none());
489
490 let waker = create_test_waker();
491 let mut cx = Context::from_waker(&waker);
492
493 let _result = Pin::new(&mut tracked).poll(&mut cx);
495
496 }
498
499 #[test]
500 fn test_tracked_future_new_constructor() {
501 let future = async { vec![1, 2, 3] };
502 let tracked = TrackedFuture::new(future);
503
504 assert!(tracked.task_id.is_none());
505 }
507
508 #[test]
509 fn test_memory_snapshot_large_numbers() {
510 let snapshot = AsyncMemorySnapshot {
511 active_task_count: usize::MAX,
512 total_allocated_bytes: u64::MAX,
513 allocation_events: u64::MAX / 2,
514 events_dropped: u64::MAX / 4,
515 buffer_utilization: 1.0,
516 };
517
518 assert_eq!(snapshot.active_task_count(), usize::MAX);
520 assert_eq!(snapshot.total_allocated(), u64::MAX);
521 assert!(!snapshot.has_good_data_quality()); assert!(snapshot.data_quality_warning().is_some());
523 }
524
525 #[test]
526 fn test_tracked_future_with_different_output_types() {
527 let string_future = create_tracked(async { String::from("test") });
529 let number_future = create_tracked(async { 42u64 });
530 let unit_future = create_tracked(async {});
531 let option_future = create_tracked(async { Some(100) });
532 let result_future = create_tracked(async { Ok::<_, &str>(200) });
533
534 drop(string_future);
536 drop(number_future);
537 drop(unit_future);
538 drop(option_future);
539 drop(result_future);
540 }
541}