memscope_rs/async_memory/
api.rs

//! High-level API for async memory tracking
//!
//! Provides user-friendly functions for initializing tracking, creating
//! tracked futures (`create_tracked` / `spawn_tracked`) whose allocations
//! are attributed to a task, and retrieving memory statistics.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10// Note: tokio dependency will be added conditionally for async features
11
12use crate::async_memory::buffer::get_buffer_stats;
13use crate::async_memory::error::AsyncResult;
14use crate::async_memory::task_id::{generate_task_id, set_current_task, TaskInfo};
15
16/// Initialize async memory tracking system
17///
18/// Must be called before spawning any tracked tasks.
19/// Sets up background aggregation and monitoring.
20pub fn initialize() -> AsyncResult<()> {
21    // For now, just verify the system is ready
22    // Future implementation would start background aggregator thread
23    tracing::info!("Async memory tracking system initialized");
24    Ok(())
25}
26
27/// Create a tracked future wrapper
28///
29/// Wraps the provided future in a TrackedFuture that automatically
30/// attributes memory allocations to the task.
31///
32/// Note: Use with your preferred async runtime (tokio, async-std, etc.)
33pub fn create_tracked<F>(future: F) -> TrackedFuture<F>
34where
35    F: Future,
36{
37    TrackedFuture::new(future)
38}
39
40/// Future wrapper that provides task-level memory tracking
41///
42/// Automatically sets task context during poll operations,
43/// enabling allocation attribution to the specific task.
44pub struct TrackedFuture<F> {
45    inner: Pin<Box<F>>,
46    task_id: Option<crate::async_memory::TaskId>,
47}
48
49impl<F> TrackedFuture<F>
50where
51    F: Future,
52{
53    /// Create a new tracked future
54    pub fn new(future: F) -> Self {
55        Self {
56            inner: Box::pin(future),
57            task_id: None,
58        }
59    }
60}
61
62impl<F> Future for TrackedFuture<F>
63where
64    F: Future,
65{
66    type Output = F::Output;
67
68    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69        // Generate task ID on first poll
70        if self.task_id.is_none() {
71            match generate_task_id(cx) {
72                Ok(id) => self.task_id = Some(id),
73                Err(e) => {
74                    tracing::warn!("Failed to generate task ID: {}", e);
75                    // Continue without tracking rather than failing
76                }
77            }
78        }
79
80        // Set task context for allocation attribution
81        if let Some(task_id) = self.task_id {
82            let task_info = TaskInfo::new(task_id, None);
83            set_current_task(task_info);
84
85            // Poll the inner future
86            let result = self.inner.as_mut().poll(cx);
87
88            // Clear task context when leaving
89            if result.is_ready() {
90                crate::async_memory::task_id::clear_current_task();
91            }
92
93            result
94        } else {
95            // Poll without tracking if ID generation failed
96            self.inner.as_mut().poll(cx)
97        }
98    }
99}
100
/// Point-in-time summary of memory activity for tracked async tasks.
#[derive(Debug, Clone)]
pub struct AsyncMemorySnapshot {
    /// How many tracked tasks are currently active
    pub active_task_count: usize,
    /// Total bytes allocated by tracked tasks
    pub total_allocated_bytes: u64,
    /// How many allocation events were recorded
    pub allocation_events: u64,
    /// How many events were dropped because the buffer overflowed
    pub events_dropped: u64,
    /// Fraction of the buffer in use (0.0 to 1.0)
    pub buffer_utilization: f64,
}

impl AsyncMemorySnapshot {
    /// Drop ratios at or above this value count as poor data quality.
    const POOR_QUALITY_DROP_RATIO: f64 = 0.05;

    /// Ratio of dropped events to recorded events.
    ///
    /// Returns `None` when no allocation events were recorded, in which case
    /// the ratio is undefined and quality is treated as good.
    fn drop_ratio(&self) -> Option<f64> {
        if self.allocation_events == 0 {
            None
        } else {
            Some(self.events_dropped as f64 / self.allocation_events as f64)
        }
    }

    /// Number of currently active tracked tasks.
    pub fn active_task_count(&self) -> usize {
        self.active_task_count
    }

    /// Total allocated memory in bytes.
    pub fn total_allocated(&self) -> u64 {
        self.total_allocated_bytes
    }

    /// Whether data quality is good, i.e. fewer than 5% of events were dropped.
    ///
    /// A snapshot without any recorded allocation events is always
    /// considered good, regardless of `events_dropped`.
    pub fn has_good_data_quality(&self) -> bool {
        self.drop_ratio()
            .map_or(true, |ratio| ratio < Self::POOR_QUALITY_DROP_RATIO)
    }

    /// Human-readable warning when data quality is poor, otherwise `None`.
    ///
    /// The message reports the drop rate as a percentage with one decimal.
    pub fn data_quality_warning(&self) -> Option<String> {
        self.drop_ratio()
            .filter(|ratio| *ratio >= Self::POOR_QUALITY_DROP_RATIO)
            .map(|ratio| {
                format!(
                    "Poor data quality: {:.1}% of events dropped. Consider increasing buffer size.",
                    ratio * 100.0
                )
            })
    }
}
149
150/// Get current memory usage snapshot
151///
152/// Returns statistics about async task memory usage.
153/// This is a simplified implementation - production version would
154/// aggregate data from all threads and the background aggregator.
155pub fn get_memory_snapshot() -> AsyncMemorySnapshot {
156    let buffer_stats = get_buffer_stats();
157
158    // Simplified snapshot - real implementation would aggregate from all sources
159    AsyncMemorySnapshot {
160        active_task_count: if buffer_stats.current_events > 0 {
161            1
162        } else {
163            0
164        },
165        total_allocated_bytes: buffer_stats.current_events as u64 * 1024, // Rough estimate
166        allocation_events: buffer_stats.current_events as u64,
167        events_dropped: buffer_stats.events_dropped as u64,
168        buffer_utilization: buffer_stats.utilization,
169    }
170}
171
172/// Check if async memory tracking is currently active
173pub fn is_tracking_active() -> bool {
174    crate::async_memory::allocator::is_tracking_enabled()
175}
176
177/// Spawn a tracked async task
178///
179/// This is a convenience function that wraps the provided future in a TrackedFuture.
180/// Use with your preferred async runtime (tokio, async-std, etc.)
181///
182/// Example with tokio:
183/// ```rust,no_run
184/// use memscope_rs::async_memory;
185///
186/// #[tokio::main]
187/// async fn main() {
188///     let handle = async_memory::spawn_tracked(async {
189///         let data = vec![0u8; 1024];
190///         data.len()
191///     });
192///     
193///     let result = handle.await;
194///     println!("Result: {}", result);
195/// }
196/// ```
197pub fn spawn_tracked<F>(future: F) -> TrackedFuture<F>
198where
199    F: Future,
200{
201    create_tracked(future)
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::{RawWaker, RawWakerVTable, Waker};

    /// Build a waker whose clone/wake/drop operations do nothing.
    ///
    /// Sufficient for driving futures by hand in tests, where nothing ever
    /// needs to be rescheduled.
    fn create_test_waker() -> Waker {
        fn noop(_: *const ()) {}
        fn clone_waker(data: *const ()) -> RawWaker {
            RawWaker::new(data, &VTABLE)
        }

        const VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, noop, noop, noop);

        // SAFETY: none of the vtable functions dereference the data pointer
        // (so null is fine), they own no resources, and they are safe to call
        // from any thread, which satisfies the `RawWaker` contract.
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }

    /// Test future that is pending on its first poll and ready on the second.
    struct PendingOnce {
        polled: bool,
    }

    impl Future for PendingOnce {
        type Output = &'static str;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.polled {
                Poll::Ready("completed")
            } else {
                self.polled = true;
                // Request another poll, as a well-behaved future must do
                // before returning `Pending`.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    #[test]
    fn test_initialization() {
        assert!(initialize().is_ok());
    }

    #[test]
    fn test_multiple_initialization() {
        // Repeated initialization must be safe.
        for _ in 0..3 {
            assert!(initialize().is_ok());
        }
    }

    #[test]
    fn test_memory_snapshot_good_quality() {
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 1,
            total_allocated_bytes: 1024,
            allocation_events: 100,
            events_dropped: 0,
            buffer_utilization: 0.5,
        };

        assert!(snapshot.buffer_utilization >= 0.0);
        assert!(snapshot.buffer_utilization <= 1.0);
        assert_eq!(snapshot.active_task_count(), 1);
        assert_eq!(snapshot.total_allocated(), 1024);
        assert!(snapshot.has_good_data_quality());
        assert!(snapshot.data_quality_warning().is_none());
    }

    #[test]
    fn test_memory_snapshot_poor_quality() {
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 2,
            total_allocated_bytes: 2048,
            allocation_events: 100,
            events_dropped: 10, // 10% drop rate
            buffer_utilization: 0.9,
        };

        assert!(!snapshot.has_good_data_quality());
        let warning_msg = snapshot
            .data_quality_warning()
            .expect("a 10% drop rate must produce a warning");
        assert!(warning_msg.contains("10.0%"));
        assert!(warning_msg.contains("Poor data quality"));
    }

    #[test]
    fn test_memory_snapshot_edge_cases() {
        // No events at all.
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 0,
            total_allocated_bytes: 0,
            allocation_events: 0,
            events_dropped: 0,
            buffer_utilization: 0.0,
        };
        assert!(snapshot.has_good_data_quality());
        assert!(snapshot.data_quality_warning().is_none());

        // Dropped events are ignored while no events were recorded.
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 0,
            total_allocated_bytes: 0,
            allocation_events: 0,
            events_dropped: 100,
            buffer_utilization: 0.0,
        };
        assert!(snapshot.has_good_data_quality());
        assert!(snapshot.data_quality_warning().is_none());
    }

    #[test]
    fn test_memory_snapshot_boundary_conditions() {
        // Exactly 5% dropped: the threshold itself is NOT good quality.
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 1,
            total_allocated_bytes: 1000,
            allocation_events: 100,
            events_dropped: 5,
            buffer_utilization: 0.5,
        };
        assert!(!snapshot.has_good_data_quality());
        assert!(snapshot.data_quality_warning().is_some());

        // Just under 5% (4.9%).
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 1,
            total_allocated_bytes: 1000,
            allocation_events: 1000,
            events_dropped: 49,
            buffer_utilization: 0.5,
        };
        assert!(snapshot.has_good_data_quality());
        assert!(snapshot.data_quality_warning().is_none());
    }

    #[test]
    fn test_tracked_future_creation() {
        let tracked = create_tracked(async { 42 });

        // No task ID is assigned before the first poll.
        assert!(tracked.task_id.is_none());
    }

    #[test]
    fn test_spawn_tracked_alias() {
        let tracked = spawn_tracked(async { "hello" });

        // spawn_tracked should be equivalent to create_tracked.
        assert!(tracked.task_id.is_none());
    }

    #[test]
    fn test_tracked_future_poll_ready() {
        let mut tracked = create_tracked(async { 123 });
        let waker = create_test_waker();
        let mut cx = Context::from_waker(&waker);

        // An async block without await points completes on its first poll,
        // and the wrapper must pass that result through unchanged.
        assert_eq!(Pin::new(&mut tracked).poll(&mut cx), Poll::Ready(123));
    }

    #[test]
    fn test_tracked_future_multiple_polls() {
        let mut tracked = create_tracked(PendingOnce { polled: false });
        let waker = create_test_waker();
        let mut cx = Context::from_waker(&waker);

        // First poll: the inner future is pending; a task ID may be generated.
        let first = Pin::new(&mut tracked).poll(&mut cx);
        assert_eq!(first, Poll::Pending);
        let had_id_after_first = tracked.task_id.is_some();

        // Second poll: the inner future completes.
        let second = Pin::new(&mut tracked).poll(&mut cx);
        assert_eq!(second, Poll::Ready("completed"));

        // A task ID generated on the first poll must be kept for later polls.
        if had_id_after_first {
            assert!(tracked.task_id.is_some());
        }
    }

    #[test]
    fn test_tracked_future_task_context() {
        use crate::async_memory::task_id::{clear_current_task, get_current_task};

        // Start from a clean context.
        clear_current_task();

        let future = async {
            // Reading the context while being polled must not panic.
            let _task_info = get_current_task();
            true
        };

        let mut tracked = create_tracked(future);
        let waker = create_test_waker();
        let mut cx = Context::from_waker(&waker);

        assert_eq!(Pin::new(&mut tracked).poll(&mut cx), Poll::Ready(true));

        // Reading the context after completion must not panic either; its
        // value is not asserted because it depends on the task_id module.
        let _current_task = get_current_task();
    }

    #[test]
    fn test_is_tracking_active() {
        // Only checks that the query does not panic.
        let _is_active: bool = is_tracking_active();
    }

    #[test]
    fn test_get_memory_snapshot_integration() {
        let snapshot = get_memory_snapshot();

        // Utilization is a ratio; other fields depend on system state.
        assert!(snapshot.buffer_utilization >= 0.0);
        assert!(snapshot.buffer_utilization <= 1.0);
    }

    #[test]
    fn test_async_memory_snapshot_debug() {
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 5,
            total_allocated_bytes: 4096,
            allocation_events: 200,
            events_dropped: 1,
            buffer_utilization: 0.75,
        };

        let debug_str = format!("{:?}", snapshot);
        assert!(debug_str.contains("active_task_count: 5"));
        assert!(debug_str.contains("total_allocated_bytes: 4096"));
    }

    #[test]
    fn test_async_memory_snapshot_clone() {
        let original = AsyncMemorySnapshot {
            active_task_count: 3,
            total_allocated_bytes: 1024,
            allocation_events: 50,
            events_dropped: 0,
            buffer_utilization: 0.25,
        };

        let cloned = original.clone();
        assert_eq!(original.active_task_count, cloned.active_task_count);
        assert_eq!(original.total_allocated_bytes, cloned.total_allocated_bytes);
        assert_eq!(original.allocation_events, cloned.allocation_events);
        assert_eq!(original.events_dropped, cloned.events_dropped);
        assert_eq!(original.buffer_utilization, cloned.buffer_utilization);
    }

    #[test]
    fn test_data_quality_warning_formatting() {
        let snapshot = AsyncMemorySnapshot {
            active_task_count: 1,
            total_allocated_bytes: 1000,
            allocation_events: 100,
            events_dropped: 25, // 25% drop rate
            buffer_utilization: 0.8,
        };

        let warning = snapshot
            .data_quality_warning()
            .expect("a 25% drop rate must produce a warning");
        assert!(warning.contains("25.0%"));
        assert!(warning.contains("buffer size"));
    }

    #[test]
    fn test_tracked_future_error_handling() {
        let mut tracked = TrackedFuture::new(async { "test" });
        assert!(tracked.task_id.is_none());

        let waker = create_test_waker();
        let mut cx = Context::from_waker(&waker);

        // Whether or not an ID can be generated for this waker, the poll
        // must not panic and must still deliver the inner result.
        assert_eq!(Pin::new(&mut tracked).poll(&mut cx), Poll::Ready("test"));
    }

    #[test]
    fn test_tracked_future_new_constructor() {
        let tracked = TrackedFuture::new(async { vec![1, 2, 3] });

        assert!(tracked.task_id.is_none());
    }

    #[test]
    fn test_memory_snapshot_large_numbers() {
        let snapshot = AsyncMemorySnapshot {
            active_task_count: usize::MAX,
            total_allocated_bytes: u64::MAX,
            allocation_events: u64::MAX / 2,
            events_dropped: u64::MAX / 4,
            buffer_utilization: 1.0,
        };

        // Extreme values must be handled without overflow.
        assert_eq!(snapshot.active_task_count(), usize::MAX);
        assert_eq!(snapshot.total_allocated(), u64::MAX);
        assert!(!snapshot.has_good_data_quality()); // roughly 50% dropped
        assert!(snapshot.data_quality_warning().is_some());
    }

    #[test]
    fn test_tracked_future_with_different_output_types() {
        // Wrapping must compile for a variety of output types.
        let string_future = create_tracked(async { String::from("test") });
        let number_future = create_tracked(async { 42u64 });
        let unit_future = create_tracked(async {});
        let option_future = create_tracked(async { Some(100) });
        let result_future = create_tracked(async { Ok::<_, &str>(200) });

        drop(string_future);
        drop(number_future);
        drop(unit_future);
        drop(option_future);
        drop(result_future);
    }
}