use std::sync::Mutex;
use crossbeam_channel::Sender;
use libpetri_event::net_event::NetEvent;
/// Retention limit (in events) applied by [`DebugEventStore::new`] when the
/// caller does not choose one via [`DebugEventStore::with_capacity`].
pub const DEFAULT_MAX_EVENTS: usize = 10_000;
/// Bounded, in-memory log of [`NetEvent`]s for a single debug session.
///
/// Events are appended through `&self`; all mutable state sits behind a
/// [`Mutex`]. Once more than `max_events` events are retained, the oldest ones
/// are evicted. Appended events are also forwarded to every channel obtained
/// from `subscribe`.
pub struct DebugEventStore {
/// Session identifier supplied at construction time.
session_id: String,
/// Maximum number of events retained before the oldest are evicted.
/// Constructors reject zero.
max_events: usize,
/// Event buffer, counters and subscriber list, guarded by a mutex.
inner: Mutex<Inner>,
}
/// One registered subscriber: the sending half of its event channel plus the
/// receiving half of the cancellation channel owned by its [`Subscription`].
struct Subscriber {
    /// Channel on which appended events are broadcast to this subscriber.
    event_tx: Sender<NetEvent>,
    /// Receives a single `()` token when the matching [`Subscription`] is
    /// cancelled.
    cancel_rx: crossbeam_channel::Receiver<()>,
}

impl Subscriber {
    /// Returns `true` once the owning [`Subscription`] has been cancelled.
    ///
    /// The token is only peeked at (never received), so every later check
    /// keeps reporting the cancellation.
    fn is_cancelled(&self) -> bool {
        !self.cancel_rx.is_empty()
    }
}

/// Mutable state of a [`DebugEventStore`], always accessed under its mutex.
struct Inner {
    /// Retained events, oldest first. A deque so that evicting the oldest
    /// event is O(1) instead of shifting the whole buffer.
    events: std::collections::VecDeque<NetEvent>,
    /// Total number of events ever appended (including evicted ones).
    event_count: usize,
    /// Number of events dropped from the front because of the retention cap.
    evicted_count: usize,
    /// Live subscribers; pruned lazily on `append`.
    subscribers: Vec<Subscriber>,
}

/// Handle returned by [`DebugEventStore::subscribe`] that allows the
/// subscriber to stop receiving events.
///
/// Dropping the handle without calling [`Subscription::cancel`] does *not*
/// unsubscribe; the subscription then ends when the event receiver is dropped
/// or the store is closed.
pub struct Subscription {
    /// `Some` while the subscription is active; taken on cancellation.
    cancel_tx: Option<Sender<()>>,
}

impl Subscription {
    /// Cancels the subscription.
    ///
    /// Events appended after this call returns are no longer delivered to the
    /// subscriber. Events already queued in the receiver stay readable.
    /// Calling `cancel` more than once is a no-op.
    pub fn cancel(&mut self) {
        if let Some(tx) = self.cancel_tx.take() {
            // The channel has capacity 1 and this is the only send, so this
            // can only fail when the store already dropped the subscriber —
            // in which case there is nothing left to cancel.
            let _ = tx.try_send(());
        }
    }

    /// Returns `true` until [`Subscription::cancel`] has been called.
    pub fn is_active(&self) -> bool {
        self.cancel_tx.is_some()
    }
}

impl DebugEventStore {
    /// Creates a store that retains up to [`DEFAULT_MAX_EVENTS`] events.
    pub fn new(session_id: String) -> Self {
        Self::with_capacity(session_id, DEFAULT_MAX_EVENTS)
    }

    /// Creates a store that retains up to `max_events` events.
    ///
    /// # Panics
    ///
    /// Panics if `max_events` is zero.
    pub fn with_capacity(session_id: String, max_events: usize) -> Self {
        assert!(
            max_events > 0,
            "max_events must be positive, got: {max_events}"
        );
        Self {
            session_id,
            max_events,
            inner: Mutex::new(Inner {
                events: std::collections::VecDeque::new(),
                event_count: 0,
                evicted_count: 0,
                subscribers: Vec::new(),
            }),
        }
    }

    /// Acquires the state lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, i.e. another thread panicked while
    /// holding it.
    fn lock(&self) -> std::sync::MutexGuard<'_, Inner> {
        self.inner.lock().expect("DebugEventStore mutex poisoned")
    }

    /// Returns the session identifier this store was created with.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Returns the maximum number of events retained at once.
    pub fn max_events(&self) -> usize {
        self.max_events
    }

    /// Appends `event`, evicting the oldest retained events if the cap is
    /// exceeded, and broadcasts a copy to every active subscriber.
    ///
    /// Subscribers that were cancelled, or whose receiver has been dropped,
    /// are removed here.
    pub fn append(&self, event: NetEvent) {
        let mut inner = self.lock();

        // Broadcast before storing so the event itself can be moved into the
        // buffer without an extra clone. The lock is held throughout, so no
        // other caller of the store can observe the intermediate state.
        inner
            .subscribers
            .retain(|sub| !sub.is_cancelled() && sub.event_tx.send(event.clone()).is_ok());

        inner.events.push_back(event);
        inner.event_count += 1;
        while inner.events.len() > self.max_events {
            inner.events.pop_front();
            inner.evicted_count += 1;
        }
    }

    /// Returns a copy of all currently retained events, oldest first.
    pub fn events(&self) -> Vec<NetEvent> {
        self.lock().events.iter().cloned().collect()
    }

    /// Returns the total number of events ever appended, including evicted
    /// ones.
    pub fn event_count(&self) -> usize {
        self.lock().event_count
    }

    /// Returns the number of events currently retained.
    pub fn size(&self) -> usize {
        self.lock().events.len()
    }

    /// Returns `true` if no events are currently retained.
    pub fn is_empty(&self) -> bool {
        self.lock().events.is_empty()
    }

    /// Returns how many events have been evicted because of the retention cap.
    pub fn evicted_count(&self) -> usize {
        self.lock().evicted_count
    }

    /// Registers a new subscriber.
    ///
    /// Returns the receiver on which every subsequently appended event is
    /// delivered, together with a [`Subscription`] handle that can cancel the
    /// delivery. The event channel is unbounded, so `append` never blocks on a
    /// slow subscriber.
    pub fn subscribe(&self) -> (crossbeam_channel::Receiver<NetEvent>, Subscription) {
        let (event_tx, event_rx) = crossbeam_channel::unbounded();
        // Capacity 1: the only message ever sent is the single cancel token.
        let (cancel_tx, cancel_rx) = crossbeam_channel::bounded(1);
        self.lock().subscribers.push(Subscriber {
            event_tx,
            cancel_rx,
        });
        (
            event_rx,
            Subscription {
                cancel_tx: Some(cancel_tx),
            },
        )
    }

    /// Returns the number of registered subscribers that have not been
    /// cancelled.
    ///
    /// Subscribers whose receiver was dropped are still counted until the
    /// next `append` prunes them.
    pub fn subscriber_count(&self) -> usize {
        self.lock()
            .subscribers
            .iter()
            .filter(|sub| !sub.is_cancelled())
            .count()
    }

    /// Returns the retained events starting at global index `from_index`.
    ///
    /// `from_index` counts from the first event ever appended. If that event
    /// has already been evicted, the result starts at the oldest retained
    /// event. An index past the newest event yields an empty vector.
    pub fn events_from(&self, from_index: usize) -> Vec<NetEvent> {
        let inner = self.lock();
        // Translate the global index into an offset into the retained window.
        let skip = from_index.saturating_sub(inner.evicted_count);
        inner.events.iter().skip(skip).cloned().collect()
    }

    /// Returns the retained events whose timestamp is `>= from`.
    pub fn events_since(&self, from: u64) -> Vec<NetEvent> {
        let inner = self.lock();
        inner
            .events
            .iter()
            .filter(|e| e.timestamp() >= from)
            .cloned()
            .collect()
    }

    /// Returns the retained events whose timestamp lies in the half-open
    /// range `from..to`.
    pub fn events_between(&self, from: u64, to: u64) -> Vec<NetEvent> {
        let inner = self.lock();
        inner
            .events
            .iter()
            .filter(|e| e.timestamp() >= from && e.timestamp() < to)
            .cloned()
            .collect()
    }

    /// Drops every subscriber, which disconnects their event channels.
    /// Retained events are left untouched.
    pub fn close(&self) {
        self.lock().subscribers.clear();
    }
}
impl std::fmt::Debug for DebugEventStore {
    /// Formats a summary of the store: its configuration plus the current
    /// counters. The events themselves are not printed.
    ///
    /// Takes the state lock for the duration of the formatting and panics if
    /// the mutex is poisoned.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let state = self.inner.lock().unwrap();
        let retained = state.events.len();
        let subscribers = state.subscribers.len();

        let mut out = f.debug_struct("DebugEventStore");
        out.field("session_id", &self.session_id);
        out.field("max_events", &self.max_events);
        out.field("event_count", &state.event_count);
        out.field("retained", &retained);
        out.field("subscribers", &subscribers);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builds a `TransitionStarted` event with the given transition name and
    /// timestamp.
    fn started_event(name: &str, ts: u64) -> NetEvent {
        NetEvent::TransitionStarted {
            timestamp: ts,
            transition_name: Arc::from(name),
        }
    }

    /// Appends one `"t"` event per timestamp, in order.
    fn fill(store: &DebugEventStore, timestamps: impl IntoIterator<Item = u64>) {
        for ts in timestamps {
            store.append(started_event("t", ts));
        }
    }

    #[test]
    fn basic_append_and_retrieve() {
        let store = DebugEventStore::new("s1".into());
        store.append(started_event("t1", 100));
        store.append(started_event("t2", 200));

        assert_eq!(store.event_count(), 2);
        assert_eq!(store.size(), 2);
        assert!(!store.is_empty());
        assert_eq!(store.events().len(), 2);
    }

    #[test]
    fn eviction_at_capacity() {
        let store = DebugEventStore::with_capacity("s1".into(), 3);
        fill(&store, 0..5);

        assert_eq!(store.event_count(), 5);
        assert_eq!(store.size(), 3);
        assert_eq!(store.evicted_count(), 2);

        // Timestamps 0 and 1 were evicted, so the oldest survivor is 2.
        let retained = store.events();
        assert_eq!(retained[0].timestamp(), 2);
    }

    #[test]
    fn events_from_with_eviction() {
        let store = DebugEventStore::with_capacity("s1".into(), 3);
        fill(&store, 0..5);

        // Index 0 was evicted: the result is clamped to the retained window.
        assert_eq!(store.events_from(0).len(), 3);
        // Global index 3 is the second retained event, leaving two events.
        assert_eq!(store.events_from(3).len(), 2);
    }

    #[test]
    fn events_since_and_between() {
        let store = DebugEventStore::new("s1".into());
        fill(&store, (0..5).map(|i| i * 100));

        // Timestamps 200, 300, 400.
        assert_eq!(store.events_since(200).len(), 3);
        // Half-open range: timestamps 100 and 200.
        assert_eq!(store.events_between(100, 300).len(), 2);
    }

    #[test]
    fn subscription_broadcast() {
        let store = DebugEventStore::new("s1".into());
        let (rx, _sub) = store.subscribe();

        store.append(started_event("t1", 100));

        let received = rx.try_recv().unwrap();
        assert_eq!(received.timestamp(), 100);
    }

    #[test]
    fn subscription_cancel() {
        let store = DebugEventStore::new("s1".into());
        let (rx, mut sub) = store.subscribe();
        assert!(sub.is_active());

        sub.cancel();
        assert!(!sub.is_active());

        // Appending after a cancel must not panic; whether anything is queued
        // for the receiver is deliberately not asserted here.
        store.append(started_event("t1", 100));
        let _ = rx.try_recv();
    }

    #[test]
    fn close_clears_subscribers() {
        let store = DebugEventStore::new("s1".into());
        let (_rx, _sub) = store.subscribe();
        assert_eq!(store.subscriber_count(), 1);

        store.close();
        assert_eq!(store.subscriber_count(), 0);
    }
}