//! memscope_rs/timeline/engine.rs — timeline query engine over the shared event store.

use crate::core::{MemScopeError, MemScopeResult};
use crate::event_store::{MemoryEvent, SharedEventStore};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

/// Query engine over a shared memory-event store.
///
/// Maintains a lazily rebuilt, timestamp-sorted copy of the store's events
/// so time-range queries can use binary search (`partition_point`).
pub struct TimelineEngine {
    /// Shared source of truth for recorded memory events.
    event_store: SharedEventStore,
    /// Timestamp-sorted snapshot of the store; rebuilt when the store's
    /// event count no longer matches the cache length (see
    /// `ensure_sorted_cache`).
    cached_events: RwLock<Vec<MemoryEvent>>,
    /// Bumped on cache rebuild; not consulted by the query paths visible
    /// in this file.
    cache_version: AtomicU64,
}
16
17impl TimelineEngine {
18 pub fn new(event_store: SharedEventStore) -> Self {
19 Self {
20 event_store,
21 cached_events: RwLock::new(Vec::new()),
22 cache_version: AtomicU64::new(0),
23 }
24 }
25
26 fn ensure_sorted_cache(&self) -> MemScopeResult<()> {
27 let current_count = self.event_store.len();
29
30 {
32 let cache = self.cached_events.read().map_err(|e| {
33 MemScopeError::system(
34 crate::core::error::SystemErrorType::Locking,
35 format!("Failed to acquire cached_events read lock: {}", e),
36 )
37 })?;
38 if cache.len() == current_count {
39 return Ok(());
40 }
41 }
42
43 {
45 let mut cache = self.cached_events.write().map_err(|e| {
46 MemScopeError::system(
47 crate::core::error::SystemErrorType::Locking,
48 format!("Failed to acquire cached_events write lock: {}", e),
49 )
50 })?;
51
52 if cache.len() == current_count {
54 return Ok(());
55 }
56
57 *cache = self.event_store.snapshot();
59 cache.sort_by_key(|e| e.timestamp);
60 self.cache_version.fetch_add(1, Ordering::Relaxed);
61 }
62 Ok(())
63 }
64
65 pub fn get_events_in_range(&self, start: u64, end: u64) -> MemScopeResult<Vec<MemoryEvent>> {
66 self.ensure_sorted_cache()?;
67
68 let cache = self.cached_events.read().map_err(|e| {
69 MemScopeError::system(
70 crate::core::error::SystemErrorType::Locking,
71 format!("Failed to acquire cached_events read lock: {}", e),
72 )
73 })?;
74 if cache.is_empty() {
75 return Ok(Vec::new());
76 }
77
78 let start_idx = cache.partition_point(|e| e.timestamp < start);
79 let end_idx = cache.partition_point(|e| e.timestamp < end);
80
81 if start_idx >= end_idx {
82 return Ok(Vec::new());
83 }
84
85 Ok(cache[start_idx..end_idx].to_vec())
86 }
87
88 pub fn get_events_for_pointer(&self, ptr: usize) -> Vec<MemoryEvent> {
89 self.event_store
90 .snapshot()
91 .into_iter()
92 .filter(|e| e.ptr == ptr)
93 .collect()
94 }
95
96 pub fn get_events_for_thread(&self, thread_id: u64) -> Vec<MemoryEvent> {
97 self.event_store
98 .snapshot()
99 .into_iter()
100 .filter(|e| e.thread_id == thread_id)
101 .collect()
102 }
103
104 pub fn get_events_for_scope(&self, scope_name: &str) -> Vec<MemoryEvent> {
105 self.event_store
106 .snapshot()
107 .into_iter()
108 .filter(|e| {
109 e.var_name
110 .as_ref()
111 .map(|name| name == scope_name)
112 .unwrap_or(false)
113 })
114 .collect()
115 }
116
117 pub fn replay_up_to(&self, timestamp: u64) -> MemScopeResult<Vec<MemoryEvent>> {
118 self.get_events_in_range(0, timestamp)
119 }
120
121 pub fn invalidate_cache(&self) {
122 self.cache_version.store(0, Ordering::Relaxed);
123 }
124
125 pub fn get_event_count(&self) -> usize {
126 self.event_store.len()
127 }
128
129 pub fn get_time_range(&self) -> MemScopeResult<Option<(u64, u64)>> {
130 self.ensure_sorted_cache()?;
131
132 let cache = self.cached_events.read().map_err(|e| {
133 MemScopeError::system(
134 crate::core::error::SystemErrorType::Locking,
135 format!("Failed to acquire cached_events read lock: {}", e),
136 )
137 })?;
138 if cache.is_empty() {
139 return Ok(None);
140 }
141
142 Ok(Some((
143 cache.first().unwrap().timestamp,
144 cache.last().unwrap().timestamp,
145 )))
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_store::EventStore;
    use std::sync::Arc;

    #[test]
    fn test_timeline_engine_creation() {
        // A fresh engine over an empty store yields no events at all.
        let engine = TimelineEngine::new(Arc::new(EventStore::new()));
        let result = engine.get_events_in_range(0, u64::MAX);
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn test_get_events_for_pointer() {
        let store = Arc::new(EventStore::new());
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::deallocate(0x1000, 1024, 1));

        // Both the allocation and the deallocation touch 0x1000.
        let matched = TimelineEngine::new(store).get_events_for_pointer(0x1000);
        assert_eq!(matched.len(), 2);
    }

    #[test]
    fn test_get_events_for_thread() {
        let store = Arc::new(EventStore::new());
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::allocate(0x2000, 2048, 2));

        // Only the first allocation came from thread 1.
        let on_thread_one = TimelineEngine::new(store).get_events_for_thread(1);
        assert_eq!(on_thread_one.len(), 1);
    }

    #[test]
    fn test_replay_up_to() {
        let store = Arc::new(EventStore::new());
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::allocate(0x2000, 2048, 1));

        // Replaying to the far future returns every recorded event.
        let replayed = TimelineEngine::new(store)
            .replay_up_to(u64::MAX)
            .unwrap();
        assert_eq!(replayed.len(), 2);
    }

    #[test]
    fn test_time_range_query_efficiency() {
        let store = Arc::new(EventStore::new());
        // 1000 events at timestamps 0, 1000, 2000, ..., 999000.
        for i in 0..1000 {
            let mut ev = MemoryEvent::allocate(0x1000 + i, 1024, 1);
            ev.timestamp = i as u64 * 1000;
            store.record(ev);
        }

        let engine = TimelineEngine::new(store);
        let in_window = engine.get_events_in_range(100000, 200000).unwrap();

        // [100000, 200000) covers exactly 100 of the 1000 events.
        assert_eq!(in_window.len(), 100);
        assert!(in_window.first().unwrap().timestamp >= 100000);
        assert!(in_window.last().unwrap().timestamp < 200000);
    }

    #[test]
    fn test_get_time_range() {
        let store = Arc::new(EventStore::new());

        let mut early = MemoryEvent::allocate(0x1000, 1024, 1);
        early.timestamp = 100;
        let mut late = MemoryEvent::allocate(0x2000, 1024, 1);
        late.timestamp = 500;
        store.record(early);
        store.record(late);

        let engine = TimelineEngine::new(store);
        assert_eq!(engine.get_time_range().unwrap(), Some((100, 500)));
    }
}
235}