Skip to main content

memscope_rs/timeline/
engine.rs

1//! Timeline Engine - Time-based memory analysis
2//!
3//! This module provides the TimelineEngine which is responsible for
4//! time-based analysis and replay of memory events.
5
6use crate::core::{MemScopeError, MemScopeResult};
7use crate::event_store::{MemoryEvent, SharedEventStore};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::RwLock;
10
11pub struct TimelineEngine {
12    event_store: SharedEventStore,
13    cached_events: RwLock<Vec<MemoryEvent>>,
14    cache_version: AtomicU64,
15}
16
17impl TimelineEngine {
18    pub fn new(event_store: SharedEventStore) -> Self {
19        Self {
20            event_store,
21            cached_events: RwLock::new(Vec::new()),
22            cache_version: AtomicU64::new(0),
23        }
24    }
25
26    fn ensure_sorted_cache(&self) -> MemScopeResult<()> {
27        // Double-check locking pattern to avoid TOCTOU race condition
28        let current_count = self.event_store.len();
29
30        // First check: read-only
31        {
32            let cache = self.cached_events.read().map_err(|e| {
33                MemScopeError::system(
34                    crate::core::error::SystemErrorType::Locking,
35                    format!("Failed to acquire cached_events read lock: {}", e),
36                )
37            })?;
38            if cache.len() == current_count {
39                return Ok(());
40            }
41        }
42
43        // Second check: acquire write lock
44        {
45            let mut cache = self.cached_events.write().map_err(|e| {
46                MemScopeError::system(
47                    crate::core::error::SystemErrorType::Locking,
48                    format!("Failed to acquire cached_events write lock: {}", e),
49                )
50            })?;
51
52            // Check again in case another thread already updated the cache
53            if cache.len() == current_count {
54                return Ok(());
55            }
56
57            // Rebuild cache
58            *cache = self.event_store.snapshot();
59            cache.sort_by_key(|e| e.timestamp);
60            self.cache_version.fetch_add(1, Ordering::Relaxed);
61        }
62        Ok(())
63    }
64
65    pub fn get_events_in_range(&self, start: u64, end: u64) -> MemScopeResult<Vec<MemoryEvent>> {
66        self.ensure_sorted_cache()?;
67
68        let cache = self.cached_events.read().map_err(|e| {
69            MemScopeError::system(
70                crate::core::error::SystemErrorType::Locking,
71                format!("Failed to acquire cached_events read lock: {}", e),
72            )
73        })?;
74        if cache.is_empty() {
75            return Ok(Vec::new());
76        }
77
78        let start_idx = cache.partition_point(|e| e.timestamp < start);
79        let end_idx = cache.partition_point(|e| e.timestamp < end);
80
81        if start_idx >= end_idx {
82            return Ok(Vec::new());
83        }
84
85        Ok(cache[start_idx..end_idx].to_vec())
86    }
87
88    pub fn get_events_for_pointer(&self, ptr: usize) -> Vec<MemoryEvent> {
89        self.event_store
90            .snapshot()
91            .into_iter()
92            .filter(|e| e.ptr == ptr)
93            .collect()
94    }
95
96    pub fn get_events_for_thread(&self, thread_id: u64) -> Vec<MemoryEvent> {
97        self.event_store
98            .snapshot()
99            .into_iter()
100            .filter(|e| e.thread_id == thread_id)
101            .collect()
102    }
103
104    pub fn get_events_for_scope(&self, scope_name: &str) -> Vec<MemoryEvent> {
105        self.event_store
106            .snapshot()
107            .into_iter()
108            .filter(|e| {
109                e.var_name
110                    .as_ref()
111                    .map(|name| name == scope_name)
112                    .unwrap_or(false)
113            })
114            .collect()
115    }
116
117    pub fn replay_up_to(&self, timestamp: u64) -> MemScopeResult<Vec<MemoryEvent>> {
118        self.get_events_in_range(0, timestamp)
119    }
120
121    pub fn invalidate_cache(&self) {
122        self.cache_version.store(0, Ordering::Relaxed);
123    }
124
125    pub fn get_event_count(&self) -> usize {
126        self.event_store.len()
127    }
128
129    pub fn get_time_range(&self) -> MemScopeResult<Option<(u64, u64)>> {
130        self.ensure_sorted_cache()?;
131
132        let cache = self.cached_events.read().map_err(|e| {
133            MemScopeError::system(
134                crate::core::error::SystemErrorType::Locking,
135                format!("Failed to acquire cached_events read lock: {}", e),
136            )
137        })?;
138        if cache.is_empty() {
139            return Ok(None);
140        }
141
142        Ok(Some((
143            cache.first().unwrap().timestamp,
144            cache.last().unwrap().timestamp,
145        )))
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_store::EventStore;
    use std::sync::Arc;

    /// Records `events` into a fresh store, in order, and wraps it in an engine.
    fn engine_over(events: Vec<MemoryEvent>) -> TimelineEngine {
        let store = Arc::new(EventStore::new());
        for event in events {
            store.record(event);
        }
        TimelineEngine::new(store)
    }

    // An engine over an empty store yields no events for the widest range.
    #[test]
    fn test_timeline_engine_creation() {
        let engine = engine_over(Vec::new());
        let all_events = engine.get_events_in_range(0, u64::MAX).unwrap();
        assert!(all_events.is_empty());
    }

    // Both the allocation and the deallocation of one pointer are returned.
    #[test]
    fn test_get_events_for_pointer() {
        let engine = engine_over(vec![
            MemoryEvent::allocate(0x1000, 1024, 1),
            MemoryEvent::deallocate(0x1000, 1024, 1),
        ]);

        let matched = engine.get_events_for_pointer(0x1000);

        assert_eq!(matched.len(), 2);
    }

    // Only the event recorded for thread 1 is returned.
    #[test]
    fn test_get_events_for_thread() {
        let engine = engine_over(vec![
            MemoryEvent::allocate(0x1000, 1024, 1),
            MemoryEvent::allocate(0x2000, 2048, 2),
        ]);

        let matched = engine.get_events_for_thread(1);

        assert_eq!(matched.len(), 1);
    }

    // Replaying up to the maximum timestamp returns both recorded events.
    #[test]
    fn test_replay_up_to() {
        let engine = engine_over(vec![
            MemoryEvent::allocate(0x1000, 1024, 1),
            MemoryEvent::allocate(0x2000, 2048, 1),
        ]);

        let replayed = engine.replay_up_to(u64::MAX).unwrap();

        assert_eq!(replayed.len(), 2);
    }

    // 1000 events spaced 1000 apart: [100000, 200000) must contain exactly 100.
    #[test]
    fn test_time_range_query_efficiency() {
        let mut recorded = Vec::with_capacity(1000);
        for i in 0..1000 {
            let mut event = MemoryEvent::allocate(0x1000 + i, 1024, 1);
            event.timestamp = i as u64 * 1000;
            recorded.push(event);
        }
        let engine = engine_over(recorded);

        let in_range = engine.get_events_in_range(100000, 200000).unwrap();
        assert_eq!(in_range.len(), 100);

        let earliest = in_range.first().unwrap();
        let latest = in_range.last().unwrap();
        assert!(earliest.timestamp >= 100000);
        assert!(latest.timestamp < 200000);
    }

    // The time range spans the smallest and largest recorded timestamps.
    #[test]
    fn test_get_time_range() {
        let mut early = MemoryEvent::allocate(0x1000, 1024, 1);
        early.timestamp = 100;
        let mut late = MemoryEvent::allocate(0x2000, 1024, 1);
        late.timestamp = 500;

        let engine = engine_over(vec![early, late]);

        assert_eq!(engine.get_time_range().unwrap(), Some((100, 500)));
    }
}