memscope_rs/event_store/
store.rs1use crate::event_store::event::MemoryEvent;
8use crossbeam::queue::SegQueue;
9use parking_lot::RwLock;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12
13#[derive(Debug)]
25pub struct EventStore {
26 queue: SegQueue<MemoryEvent>,
28 cache: RwLock<Vec<MemoryEvent>>,
30 count: AtomicUsize,
32 clearing: AtomicUsize,
34}
35
36impl EventStore {
37 pub fn new() -> Self {
39 Self {
40 queue: SegQueue::new(),
41 cache: RwLock::new(Vec::new()),
42 count: AtomicUsize::new(0),
43 clearing: AtomicUsize::new(0),
44 }
45 }
46
47 pub fn record(&self, event: MemoryEvent) {
60 if self.clearing.load(Ordering::Acquire) != 0 {
63 tracing::trace!("Skipping event recording due to clear operation in progress");
64 return;
65 }
66
67 self.queue.push(event);
68 self.count.fetch_add(1, Ordering::Release);
70 }
71
72 fn flush_to_cache(&self) {
74 let mut cache = self.cache.write();
75 while let Some(event) = self.queue.pop() {
76 cache.push(event);
77 }
78 }
79
80 pub fn snapshot(&self) -> Vec<MemoryEvent> {
89 self.flush_to_cache();
90 self.cache.read().clone()
91 }
92
93 pub fn len(&self) -> usize {
98 self.count.load(Ordering::Relaxed)
99 }
100
101 pub fn is_empty(&self) -> bool {
103 self.len() == 0
104 }
105
106 pub fn clear(&self) {
112 self.clearing.store(1, Ordering::Release);
114
115 let mut cache = self.cache.write();
117
118 while self.queue.pop().is_some() {}
120
121 cache.clear();
123
124 self.count.store(0, Ordering::Release);
126
127 self.clearing.store(0, Ordering::Release);
129 }
130}
131
132impl Default for EventStore {
133 fn default() -> Self {
134 Self::new()
135 }
136}
137
138pub type SharedEventStore = Arc<EventStore>;
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144
145 #[test]
146 fn test_event_store_creation() {
147 let store = EventStore::new();
148 assert!(store.is_empty());
149 assert_eq!(store.len(), 0);
150 }
151
152 #[test]
153 fn test_record_event() {
154 let store = EventStore::new();
155 let event = MemoryEvent::allocate(0x1000, 1024, 1);
156 store.record(event);
157 assert_eq!(store.len(), 1);
158 }
159
160 #[test]
161 fn test_snapshot() {
162 let store = EventStore::new();
163 let event1 = MemoryEvent::allocate(0x1000, 1024, 1);
164 let event2 = MemoryEvent::deallocate(0x1000, 1024, 1);
165 store.record(event1.clone());
166 store.record(event2.clone());
167
168 let snapshot = store.snapshot();
169 assert_eq!(snapshot.len(), 2);
170 assert_eq!(store.len(), 2);
172 }
173
174 #[test]
175 fn test_clear() {
176 let store = EventStore::new();
177 let event = MemoryEvent::allocate(0x1000, 1024, 1);
178 store.record(event);
179 assert_eq!(store.len(), 1);
180
181 store.clear();
182 assert!(store.is_empty());
183 }
184
185 #[test]
186 fn test_concurrent_access() {
187 use std::thread;
188 let store = Arc::new(EventStore::new());
189 let mut handles = vec![];
190
191 for i in 0..10 {
192 let store_clone = Arc::clone(&store);
193 let handle = thread::spawn(move || {
194 for j in 0..100 {
195 let event = MemoryEvent::allocate(i * 1000 + j, 1024, i as u64);
196 store_clone.record(event);
197 }
198 });
199 handles.push(handle);
200 }
201
202 for handle in handles {
203 handle.join().unwrap();
204 }
205
206 assert_eq!(store.len(), 1000);
207 let snapshot = store.snapshot();
208 assert_eq!(snapshot.len(), 1000);
209 }
210
211 #[test]
212 fn test_event_store_default() {
213 let store = EventStore::default();
214 assert!(store.is_empty());
215 }
216
217 #[test]
218 fn test_event_store_debug() {
219 let store = EventStore::new();
220 let debug_str = format!("{:?}", store);
221 assert!(debug_str.contains("EventStore"));
222 }
223
224 #[test]
225 fn test_multiple_record_snapshot() {
226 let store = EventStore::new();
227 for i in 0..100 {
228 let event = MemoryEvent::allocate(0x1000 + i, 1024, 1);
229 store.record(event);
230 }
231
232 let snapshot = store.snapshot();
233 assert_eq!(snapshot.len(), 100);
234 }
235
236 #[test]
237 fn test_clear_and_record() {
238 let store = EventStore::new();
239 store.record(MemoryEvent::allocate(0x1000, 1024, 1));
240 store.clear();
241 assert!(store.is_empty());
242
243 store.record(MemoryEvent::allocate(0x2000, 2048, 1));
244 assert_eq!(store.len(), 1);
245 }
246
247 #[test]
248 fn test_event_types() {
249 let store = EventStore::new();
250 store.record(MemoryEvent::allocate(0x1000, 1024, 1));
251 store.record(MemoryEvent::deallocate(0x1000, 1024, 1));
252 store.record(MemoryEvent::reallocate(0x1000, 1024, 2048, 1));
253
254 let snapshot = store.snapshot();
255 assert_eq!(snapshot.len(), 3);
256 }
257
258 #[test]
259 fn test_snapshot_consistency() {
260 let store = EventStore::new();
261 store.record(MemoryEvent::allocate(0x1000, 1024, 1));
262 store.record(MemoryEvent::allocate(0x2000, 2048, 1));
263
264 let snapshot1 = store.snapshot();
265 let snapshot2 = store.snapshot();
266
267 assert_eq!(snapshot1.len(), snapshot2.len());
268 }
269
270 #[test]
271 fn test_empty_snapshot() {
272 let store = EventStore::new();
273 let snapshot = store.snapshot();
274 assert!(snapshot.is_empty());
275 }
276
277 #[test]
278 fn test_large_number_of_events() {
279 let store = EventStore::new();
280 for i in 0..10000 {
281 store.record(MemoryEvent::allocate(i, 1024, 1));
282 }
283 assert_eq!(store.len(), 10000);
284 }
285}