// memscope_rs/event_store/store.rs
//! Event Store Engine - Centralized event storage
//!
//! This module provides the EventStore which is responsible for storing
//! all memory events across all tracking backends. It uses a lock-free
//! SegQueue for high-concurrency recording with parking_lot RwLock for snapshots.

use crate::event_store::event::MemoryEvent;
use crossbeam::queue::SegQueue;
use parking_lot::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Event Store - Centralized storage for memory events
///
/// The EventStore is the single source of truth for all memory events
/// in the system. It uses a lock-free SegQueue for recording operations
/// and a RwLock-protected Vec for efficient snapshots.
///
/// Key properties:
/// - Lock-free recording: Uses SegQueue for O(1) append without blocking
/// - Thread-safe: All operations are safe for concurrent use
/// - Efficient snapshots: Uses RwLock for fast read access
/// - Clear-safe: Uses atomic flag to prevent event loss during clear operations
#[derive(Debug)]
pub struct EventStore {
    /// Lock-free queue for high-concurrency recording
    queue: SegQueue<MemoryEvent>,
    /// Events drained from `queue`, cached for fast snapshot access
    cache: RwLock<Vec<MemoryEvent>>,
    /// Approximate number of recorded events (may lag behind the queue
    /// under concurrent recording)
    count: AtomicUsize,
    /// Non-zero while `clear()` is in progress; `record()` checks this and
    /// skips new events for the duration (an AtomicUsize used as a flag)
    clearing: AtomicUsize,
}
35
36impl EventStore {
37 /// Create a new EventStore
38 pub fn new() -> Self {
39 Self {
40 queue: SegQueue::new(),
41 cache: RwLock::new(Vec::new()),
42 count: AtomicUsize::new(0),
43 clearing: AtomicUsize::new(0),
44 }
45 }
46
47 /// Record a memory event
48 ///
49 /// This method is lock-free and can be called from any thread
50 /// without blocking other recording operations.
51 ///
52 /// # Arguments
53 /// * `event` - The memory event to record
54 ///
55 /// # Note
56 /// If a clear operation is in progress, this method will skip recording
57 /// the event to prevent data loss. This ensures consistency between
58 /// the event queue and the count.
59 pub fn record(&self, event: MemoryEvent) {
60 // Check if clear operation is in progress
61 // If clearing flag is set (non-zero), skip recording to prevent event loss
62 if self.clearing.load(Ordering::Acquire) != 0 {
63 tracing::trace!("Skipping event recording due to clear operation in progress");
64 return;
65 }
66
67 self.queue.push(event);
68 // Use Release ordering to ensure the push is visible before the count increment
69 self.count.fetch_add(1, Ordering::Release);
70 }
71
72 /// Flush pending events from queue to cache
73 fn flush_to_cache(&self) {
74 let mut cache = self.cache.write();
75 while let Some(event) = self.queue.pop() {
76 cache.push(event);
77 }
78 }
79
80 /// Get all events as a snapshot
81 ///
82 /// Returns a snapshot of all events currently in the store.
83 /// This method flushes any pending events from the lock-free queue
84 /// to the cache before returning.
85 ///
86 /// # Returns
87 /// A vector containing all events in the store
88 pub fn snapshot(&self) -> Vec<MemoryEvent> {
89 self.flush_to_cache();
90 self.cache.read().clone()
91 }
92
93 /// Get the number of events in the store
94 ///
95 /// Note: This returns an approximate count that may be slightly
96 /// higher than the actual count due to concurrent operations.
97 pub fn len(&self) -> usize {
98 self.count.load(Ordering::Relaxed)
99 }
100
101 /// Check if the store is empty
102 pub fn is_empty(&self) -> bool {
103 self.len() == 0
104 }
105
106 /// Clear all events from the store
107 ///
108 /// This method removes all events from both the queue and cache.
109 /// Uses write lock to ensure atomicity with concurrent record operations.
110 /// Sets a clearing flag to prevent record() operations during clear.
111 pub fn clear(&self) {
112 // Set clearing flag to prevent concurrent record operations
113 self.clearing.store(1, Ordering::Release);
114
115 // Acquire write lock first to prevent concurrent modifications
116 let mut cache = self.cache.write();
117
118 // Clear the queue
119 while self.queue.pop().is_some() {}
120
121 // Clear the cache
122 cache.clear();
123
124 // Reset count last, while still holding the write lock
125 self.count.store(0, Ordering::Release);
126
127 // Clear the clearing flag to allow record operations again
128 self.clearing.store(0, Ordering::Release);
129 }
130}
131
132impl Default for EventStore {
133 fn default() -> Self {
134 Self::new()
135 }
136}

/// Shared, reference-counted handle to an [`EventStore`], for cloning
/// across threads.
pub type SharedEventStore = Arc<EventStore>;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_event_store_creation() {
        // A fresh store holds nothing.
        let store = EventStore::new();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
    }

    #[test]
    fn test_record_event() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_snapshot() {
        let store = EventStore::new();
        let alloc = MemoryEvent::allocate(0x1000, 1024, 1);
        let dealloc = MemoryEvent::deallocate(0x1000, 1024, 1);
        store.record(alloc.clone());
        store.record(dealloc.clone());

        let events = store.snapshot();
        assert_eq!(events.len(), 2);
        // Taking a snapshot must not drain the store.
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn test_clear() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        assert_eq!(store.len(), 1);

        store.clear();
        assert!(store.is_empty());
    }

    #[test]
    fn test_concurrent_access() {
        use std::thread;
        let store = Arc::new(EventStore::new());

        // 10 threads x 100 events each, recorded concurrently.
        let workers: Vec<_> = (0..10)
            .map(|i| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    for j in 0..100 {
                        store.record(MemoryEvent::allocate(i * 1000 + j, 1024, i as u64));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(store.len(), 1000);
        assert_eq!(store.snapshot().len(), 1000);
    }
}