use crate::event_store::event::MemoryEvent;
use crossbeam::queue::SegQueue;
use parking_lot::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// In-memory store of [`MemoryEvent`]s.
///
/// Every method takes `&self`: incoming events are pushed onto a `SegQueue`,
/// and are moved into a lock-protected `Vec` when a snapshot is requested.
#[derive(Debug)]
pub struct EventStore {
/// Events accepted by `record` that have not yet been moved into `cache`.
queue: SegQueue<MemoryEvent>,
/// Events moved out of `queue` when a snapshot is taken; emptied by `clear`.
cache: RwLock<Vec<MemoryEvent>>,
/// Incremented once per event accepted by `record`, reset to zero by `clear`.
/// This is the value reported by `len`.
count: AtomicUsize,
/// Non-zero while a `clear` call is in progress; `record` drops incoming
/// events while it is non-zero.
clearing: AtomicUsize,
}
impl EventStore {
    /// Creates an empty store: no queued events, no cached events, counter at zero.
    pub fn new() -> Self {
        Self {
            queue: SegQueue::new(),
            cache: RwLock::new(Vec::new()),
            count: AtomicUsize::new(0),
            clearing: AtomicUsize::new(0),
        }
    }

    /// Records `event` by pushing it onto the queue and incrementing the counter.
    ///
    /// While any [`clear`](Self::clear) call is in progress the event is dropped
    /// instead (a trace-level log line is emitted) and the counter is untouched.
    ///
    /// NOTE: the in-progress check and the push are separate steps, so an event
    /// that races with a concurrent `clear` may either be discarded by it or
    /// survive it, and [`len`](Self::len) may then disagree with the number of
    /// events a snapshot returns.
    pub fn record(&self, event: MemoryEvent) {
        if self.clearing.load(Ordering::Acquire) != 0 {
            tracing::trace!("Skipping event recording due to clear operation in progress");
            return;
        }
        self.queue.push(event);
        self.count.fetch_add(1, Ordering::Release);
    }

    /// Returns a copy of every event currently held by the store.
    ///
    /// Pending events are first moved from the queue to the end of the cache,
    /// then the whole cache is cloned. Both steps happen under a single write
    /// lock, so a concurrent `clear` cannot slip in between the flush and the
    /// copy.
    pub fn snapshot(&self) -> Vec<MemoryEvent> {
        let mut cache = self.cache.write();
        // Drain until the queue reports empty, preserving pop order.
        cache.extend(std::iter::from_fn(|| self.queue.pop()));
        cache.clone()
    }

    /// Returns the current value of the event counter.
    ///
    /// The counter is incremented by each accepted `record` call and reset to
    /// zero by `clear`; taking a snapshot does not change it.
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Relaxed)
    }

    /// Returns `true` when [`len`](Self::len) is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Discards all queued and cached events and resets the counter to zero.
    ///
    /// `record` calls that observe the clear in progress drop their event.
    pub fn clear(&self) {
        // Decrements the in-progress counter when dropped, so the store does not
        // stay stuck in "clearing" mode (silently discarding every later event)
        // if something panics while the events are being dropped.
        struct ClearingGuard<'a>(&'a AtomicUsize);

        impl Drop for ClearingGuard<'_> {
            fn drop(&mut self) {
                self.0.fetch_sub(1, Ordering::Release);
            }
        }

        // `clearing` is a counter rather than a 0/1 flag: when several `clear`
        // calls overlap, the first one to finish must not re-enable recording
        // while another one is still draining.
        self.clearing.fetch_add(1, Ordering::AcqRel);
        let _clearing = ClearingGuard(&self.clearing);

        let mut cache = self.cache.write();
        while self.queue.pop().is_some() {}
        cache.clear();
        self.count.store(0, Ordering::Release);
    }
}
impl Default for EventStore {
fn default() -> Self {
Self::new()
}
}
/// An [`EventStore`] behind an [`Arc`], so one store can be handed to several owners.
pub type SharedEventStore = Arc<EventStore>;
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created store reports no events.
    #[test]
    fn test_event_store_creation() {
        let store = EventStore::new();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
    }

    /// Recording a single event bumps the counter to one.
    #[test]
    fn test_record_event() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        assert_eq!(store.len(), 1);
    }

    /// A snapshot returns every recorded event and leaves the counter untouched.
    #[test]
    fn test_snapshot() {
        let store = EventStore::new();
        // The events are moved into the store; no clones are needed.
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::deallocate(0x1000, 1024, 1));
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);
        assert_eq!(store.len(), 2);
    }

    /// Clearing resets the counter and removes the stored events.
    #[test]
    fn test_clear() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        assert_eq!(store.len(), 1);
        store.clear();
        assert!(store.is_empty());
        // The counter alone does not prove the events are gone.
        assert!(store.snapshot().is_empty());
    }

    /// Ten threads recording one hundred events each yield one thousand events.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let store = Arc::new(EventStore::new());
        let mut handles = vec![];
        for i in 0..10 {
            let store_clone = Arc::clone(&store);
            let handle = thread::spawn(move || {
                for j in 0..100 {
                    let event = MemoryEvent::allocate(i * 1000 + j, 1024, i as u64);
                    store_clone.record(event);
                }
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(store.len(), 1000);
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 1000);
    }

    /// `Default` produces an empty store.
    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert!(store.is_empty());
    }

    /// The derived `Debug` output names the type.
    #[test]
    fn test_event_store_debug() {
        let store = EventStore::new();
        let debug_str = format!("{:?}", store);
        assert!(debug_str.contains("EventStore"));
    }

    /// Many sequential records all show up in one snapshot.
    #[test]
    fn test_multiple_record_snapshot() {
        let store = EventStore::new();
        for i in 0..100 {
            store.record(MemoryEvent::allocate(0x1000 + i, 1024, 1));
        }
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 100);
    }

    /// The store accepts new events again once a clear has finished.
    #[test]
    fn test_clear_and_record() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.clear();
        assert!(store.is_empty());
        store.record(MemoryEvent::allocate(0x2000, 2048, 1));
        assert_eq!(store.len(), 1);
        // Only the event recorded after the clear may be present.
        assert_eq!(store.snapshot().len(), 1);
    }

    /// Allocation, deallocation and reallocation events are all stored.
    #[test]
    fn test_event_types() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::deallocate(0x1000, 1024, 1));
        store.record(MemoryEvent::reallocate(0x1000, 1024, 2048, 1));
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 3);
    }

    /// Two back-to-back snapshots contain the same number of events.
    #[test]
    fn test_snapshot_consistency() {
        let store = EventStore::new();
        store.record(MemoryEvent::allocate(0x1000, 1024, 1));
        store.record(MemoryEvent::allocate(0x2000, 2048, 1));
        let snapshot1 = store.snapshot();
        let snapshot2 = store.snapshot();
        assert_eq!(snapshot1.len(), 2);
        assert_eq!(snapshot1.len(), snapshot2.len());
    }

    /// A snapshot of an empty store is empty.
    #[test]
    fn test_empty_snapshot() {
        let store = EventStore::new();
        let snapshot = store.snapshot();
        assert!(snapshot.is_empty());
    }

    /// The counter keeps up with ten thousand sequential records.
    #[test]
    fn test_large_number_of_events() {
        let store = EventStore::new();
        for i in 0..10000 {
            store.record(MemoryEvent::allocate(i, 1024, 1));
        }
        assert_eq!(store.len(), 10000);
    }
}