Skip to main content

memscope_rs/event_store/
store.rs

1//! Event Store Engine - Centralized event storage
2//!
3//! This module provides the EventStore which is responsible for storing
4//! all memory events across all tracking backends. It uses a lock-free
5//! SegQueue for high-concurrency recording with parking_lot RwLock for snapshots.
6
7use crate::event_store::event::MemoryEvent;
8use crossbeam::queue::SegQueue;
9use parking_lot::RwLock;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12
13/// Event Store - Centralized storage for memory events
14///
15/// The EventStore is the single source of truth for all memory events
16/// in the system. It uses a lock-free SegQueue for recording operations
17/// and a RwLock-protected Vec for efficient snapshots.
18///
19/// Key properties:
20/// - Lock-free recording: Uses SegQueue for O(1) append without blocking
21/// - Thread-safe: All operations are safe for concurrent use
22/// - Efficient snapshots: Uses RwLock for fast read access
23/// - Clear-safe: Uses atomic flag to prevent event loss during clear operations
24#[derive(Debug)]
25pub struct EventStore {
26    /// Lock-free queue for high-concurrency recording
27    queue: SegQueue<MemoryEvent>,
28    /// Cached events for fast snapshot access
29    cache: RwLock<Vec<MemoryEvent>>,
30    /// Approximate count of events (may be slightly stale)
31    count: AtomicUsize,
32    /// Flag to indicate clear operation is in progress
33    clearing: AtomicUsize,
34}
35
36impl EventStore {
37    /// Create a new EventStore
38    pub fn new() -> Self {
39        Self {
40            queue: SegQueue::new(),
41            cache: RwLock::new(Vec::new()),
42            count: AtomicUsize::new(0),
43            clearing: AtomicUsize::new(0),
44        }
45    }
46
47    /// Record a memory event
48    ///
49    /// This method is lock-free and can be called from any thread
50    /// without blocking other recording operations.
51    ///
52    /// # Arguments
53    /// * `event` - The memory event to record
54    ///
55    /// # Note
56    /// If a clear operation is in progress, this method will skip recording
57    /// the event to prevent data loss. This ensures consistency between
58    /// the event queue and the count.
59    pub fn record(&self, event: MemoryEvent) {
60        // Check if clear operation is in progress
61        // If clearing flag is set (non-zero), skip recording to prevent event loss
62        if self.clearing.load(Ordering::Acquire) != 0 {
63            tracing::trace!("Skipping event recording due to clear operation in progress");
64            return;
65        }
66
67        self.queue.push(event);
68        // Use Release ordering to ensure the push is visible before the count increment
69        self.count.fetch_add(1, Ordering::Release);
70    }
71
72    /// Flush pending events from queue to cache
73    fn flush_to_cache(&self) {
74        let mut cache = self.cache.write();
75        while let Some(event) = self.queue.pop() {
76            cache.push(event);
77        }
78    }
79
80    /// Get all events as a snapshot
81    ///
82    /// Returns a snapshot of all events currently in the store.
83    /// This method flushes any pending events from the lock-free queue
84    /// to the cache before returning.
85    ///
86    /// # Returns
87    /// A vector containing all events in the store
88    pub fn snapshot(&self) -> Vec<MemoryEvent> {
89        self.flush_to_cache();
90        self.cache.read().clone()
91    }
92
93    /// Get the number of events in the store
94    ///
95    /// Note: This returns an approximate count that may be slightly
96    /// higher than the actual count due to concurrent operations.
97    pub fn len(&self) -> usize {
98        self.count.load(Ordering::Relaxed)
99    }
100
101    /// Check if the store is empty
102    pub fn is_empty(&self) -> bool {
103        self.len() == 0
104    }
105
106    /// Clear all events from the store
107    ///
108    /// This method removes all events from both the queue and cache.
109    /// Uses write lock to ensure atomicity with concurrent record operations.
110    /// Sets a clearing flag to prevent record() operations during clear.
111    pub fn clear(&self) {
112        // Set clearing flag to prevent concurrent record operations
113        self.clearing.store(1, Ordering::Release);
114
115        // Acquire write lock first to prevent concurrent modifications
116        let mut cache = self.cache.write();
117
118        // Clear the queue
119        while self.queue.pop().is_some() {}
120
121        // Clear the cache
122        cache.clear();
123
124        // Reset count last, while still holding the write lock
125        self.count.store(0, Ordering::Release);
126
127        // Clear the clearing flag to allow record operations again
128        self.clearing.store(0, Ordering::Release);
129    }
130}
131
132impl Default for EventStore {
133    fn default() -> Self {
134        Self::new()
135    }
136}
137
138/// Shared reference to EventStore
139pub type SharedEventStore = Arc<EventStore>;
140
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh store reports zero events through every accessor.
    #[test]
    fn test_event_store_creation() {
        let store = EventStore::new();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert!(store.snapshot().is_empty());
    }

    /// Recording one event bumps the count to one.
    #[test]
    fn test_record_event() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        assert_eq!(store.len(), 1);
        assert!(!store.is_empty());
    }

    /// A snapshot returns every recorded event and leaves the store intact.
    #[test]
    fn test_snapshot() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::deallocate(0x1000, 1024, 1));

        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);
        // Taking a snapshot must not consume the events.
        assert_eq!(store.len(), 2);
        assert_eq!(store.snapshot().len(), 2);
    }

    /// `clear()` empties both storage stages, not just the counter.
    #[test]
    fn test_clear() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        assert_eq!(store.len(), 1);

        // Force the first event into the cache, then leave a second one in
        // the queue so that clear() has to empty both.
        assert_eq!(store.snapshot().len(), 1);
        store.record(MemoryEvent::allocate(0x2000, 1024, 1));
        assert_eq!(store.len(), 2);

        store.clear();
        assert!(store.is_empty());
        assert!(store.snapshot().is_empty());
    }

    /// Ten threads recording 100 events each end up with 1000 stored events.
    #[test]
    fn test_concurrent_access() {
        use std::thread;
        let store: SharedEventStore = Arc::new(EventStore::new());

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let store_clone = Arc::clone(&store);
                thread::spawn(move || {
                    for j in 0..100 {
                        let event = MemoryEvent::allocate(i * 1000 + j, 1024, i as u64);
                        store_clone.record(event);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(store.len(), 1000);
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 1000);
    }

    /// `Default` produces an empty store.
    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert!(store.is_empty());
        assert!(store.snapshot().is_empty());
    }

    /// The derived `Debug` output names the type.
    #[test]
    fn test_event_store_debug() {
        let store = EventStore::new();
        let debug_str = format!("{:?}", store);
        assert!(debug_str.contains("EventStore"));
    }

    /// Many sequential records are all visible in one snapshot.
    #[test]
    fn test_multiple_record_snapshot() {
        let store = EventStore::new();
        for i in 0..100 {
            let event = MemoryEvent::allocate(0x1000 + i, 1024, 1);
            store.record(event);
        }

        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 100);
        assert_eq!(store.len(), 100);
    }

    /// The store is fully usable again after a clear.
    #[test]
    fn test_clear_and_record() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.clear();
        assert!(store.is_empty());
        assert!(store.snapshot().is_empty());

        store.record(MemoryEvent::allocate(0x2000, 2048, 1));
        assert_eq!(store.len(), 1);
        assert_eq!(store.snapshot().len(), 1);
    }

    /// Allocate, deallocate and reallocate events are all stored.
    #[test]
    fn test_event_types() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::deallocate(0x1000, 1024, 1));
        store.record(MemoryEvent::reallocate(0x1000, 1024, 2048, 1));

        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 3);
    }

    /// Repeated snapshots agree, and an old snapshot is unaffected by later
    /// records.
    #[test]
    fn test_snapshot_consistency() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::allocate(0x2000, 2048, 1));

        let snapshot1 = store.snapshot();
        let snapshot2 = store.snapshot();
        assert_eq!(snapshot1.len(), 2);
        assert_eq!(snapshot1.len(), snapshot2.len());

        // A snapshot is an independent copy: new events show up only in
        // snapshots taken afterwards.
        store.record(MemoryEvent::allocate(0x3000, 4096, 1));
        let snapshot3 = store.snapshot();
        assert_eq!(snapshot3.len(), 3);
        assert_eq!(snapshot1.len(), 2);
    }

    /// Snapshotting an empty store yields an empty vector.
    #[test]
    fn test_empty_snapshot() {
        let store = EventStore::new();
        let snapshot = store.snapshot();
        assert!(snapshot.is_empty());
        assert!(store.is_empty());
    }

    /// Ten thousand events are counted and stored without loss.
    #[test]
    fn test_large_number_of_events() {
        let store = EventStore::new();
        for i in 0..10000 {
            store.record(MemoryEvent::allocate(i, 1024, 1));
        }
        assert_eq!(store.len(), 10000);
        assert_eq!(store.snapshot().len(), 10000);
    }
}