Skip to main content

libpetri_debug/
debug_event_store.rs

1//! Event store with live tailing and historical replay for debug sessions.
2
3use std::sync::Mutex;
4
5use crossbeam_channel::Sender;
6use libpetri_event::net_event::NetEvent;
7
8/// Default maximum events to retain before evicting oldest.
9pub const DEFAULT_MAX_EVENTS: usize = 10_000;
10
11/// Event store for debug sessions with subscription support.
12///
13/// Uses interior mutability (`Mutex`) so it can be shared via `Arc`
14/// between the executor (which owns the `DebugAwareEventStore`) and
15/// the protocol handler.
16pub struct DebugEventStore {
17    session_id: String,
18    max_events: usize,
19    inner: Mutex<Inner>,
20}
21
22struct Inner {
23    events: Vec<NetEvent>,
24    event_count: usize,
25    evicted_count: usize,
26    subscribers: Vec<Sender<NetEvent>>,
27}
28
29/// Handle for managing a live event subscription.
30pub struct Subscription {
31    cancel_tx: Option<Sender<()>>,
32}
33
34impl Subscription {
35    /// Cancels the subscription.
36    pub fn cancel(&mut self) {
37        self.cancel_tx.take();
38    }
39
40    /// Returns `true` if the subscription is still active.
41    pub fn is_active(&self) -> bool {
42        self.cancel_tx.is_some()
43    }
44}
45
46impl DebugEventStore {
47    /// Creates a new `DebugEventStore` with the given session ID and default capacity.
48    pub fn new(session_id: String) -> Self {
49        Self::with_capacity(session_id, DEFAULT_MAX_EVENTS)
50    }
51
52    /// Creates a new `DebugEventStore` with the given session ID and max event capacity.
53    pub fn with_capacity(session_id: String, max_events: usize) -> Self {
54        assert!(
55            max_events > 0,
56            "max_events must be positive, got: {max_events}"
57        );
58        Self {
59            session_id,
60            max_events,
61            inner: Mutex::new(Inner {
62                events: Vec::new(),
63                event_count: 0,
64                evicted_count: 0,
65                subscribers: Vec::new(),
66            }),
67        }
68    }
69
70    /// Returns the session ID.
71    pub fn session_id(&self) -> &str {
72        &self.session_id
73    }
74
75    /// Returns the max event capacity.
76    pub fn max_events(&self) -> usize {
77        self.max_events
78    }
79
80    /// Appends an event, evicting oldest if at capacity, and broadcasts to subscribers.
81    pub fn append(&self, event: NetEvent) {
82        let mut inner = self.inner.lock().unwrap();
83        inner.events.push(event.clone());
84        inner.event_count += 1;
85
86        // Evict oldest when capacity exceeded
87        while inner.events.len() > self.max_events {
88            inner.events.remove(0);
89            inner.evicted_count += 1;
90        }
91
92        // Broadcast to subscribers, removing disconnected ones
93        inner
94            .subscribers
95            .retain(|tx| tx.send(event.clone()).is_ok());
96    }
97
98    /// Returns a snapshot of all retained events.
99    pub fn events(&self) -> Vec<NetEvent> {
100        self.inner.lock().unwrap().events.clone()
101    }
102
103    /// Total number of events ever appended (including evicted).
104    pub fn event_count(&self) -> usize {
105        self.inner.lock().unwrap().event_count
106    }
107
108    /// Number of retained events.
109    pub fn size(&self) -> usize {
110        self.inner.lock().unwrap().events.len()
111    }
112
113    /// Returns `true` if no events are retained.
114    pub fn is_empty(&self) -> bool {
115        self.inner.lock().unwrap().events.is_empty()
116    }
117
118    /// Number of events evicted from the store.
119    pub fn evicted_count(&self) -> usize {
120        self.inner.lock().unwrap().evicted_count
121    }
122
123    /// Subscribe to receive events as they occur.
124    ///
125    /// Returns a `crossbeam_channel::Receiver` for receiving events
126    /// and a `Subscription` handle for cancellation.
127    pub fn subscribe(&self) -> (crossbeam_channel::Receiver<NetEvent>, Subscription) {
128        let (event_tx, event_rx) = crossbeam_channel::unbounded();
129        let (cancel_tx, _cancel_rx) = crossbeam_channel::bounded(1);
130
131        self.inner.lock().unwrap().subscribers.push(event_tx);
132
133        (
134            event_rx,
135            Subscription {
136                cancel_tx: Some(cancel_tx),
137            },
138        )
139    }
140
141    /// Number of active subscribers.
142    pub fn subscriber_count(&self) -> usize {
143        self.inner.lock().unwrap().subscribers.len()
144    }
145
146    /// Returns events starting from a specific global index.
147    pub fn events_from(&self, from_index: usize) -> Vec<NetEvent> {
148        let inner = self.inner.lock().unwrap();
149        let adjusted_skip = from_index.saturating_sub(inner.evicted_count);
150        if adjusted_skip >= inner.events.len() {
151            return Vec::new();
152        }
153        inner.events[adjusted_skip..].to_vec()
154    }
155
156    /// Returns all events since the specified timestamp.
157    pub fn events_since(&self, from: u64) -> Vec<NetEvent> {
158        let inner = self.inner.lock().unwrap();
159        inner
160            .events
161            .iter()
162            .filter(|e| e.timestamp() >= from)
163            .cloned()
164            .collect()
165    }
166
167    /// Returns events within a time range `[from, to)`.
168    pub fn events_between(&self, from: u64, to: u64) -> Vec<NetEvent> {
169        let inner = self.inner.lock().unwrap();
170        inner
171            .events
172            .iter()
173            .filter(|e| e.timestamp() >= from && e.timestamp() < to)
174            .cloned()
175            .collect()
176    }
177
178    /// Closes the store, removing all subscribers.
179    pub fn close(&self) {
180        self.inner.lock().unwrap().subscribers.clear();
181    }
182}
183
184impl std::fmt::Debug for DebugEventStore {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        let inner = self.inner.lock().unwrap();
187        f.debug_struct("DebugEventStore")
188            .field("session_id", &self.session_id)
189            .field("max_events", &self.max_events)
190            .field("event_count", &inner.event_count)
191            .field("retained", &inner.events.len())
192            .field("subscribers", &inner.subscribers.len())
193            .finish()
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builds a `TransitionStarted` event with the given name and timestamp.
    fn started_event(name: &str, ts: u64) -> NetEvent {
        NetEvent::TransitionStarted {
            transition_name: Arc::from(name),
            timestamp: ts,
        }
    }

    /// Extracts the timestamps of `events`, preserving order.
    fn timestamps(events: &[NetEvent]) -> Vec<u64> {
        events.iter().map(|e| e.timestamp()).collect()
    }

    #[test]
    fn new_store_is_empty() {
        let store = DebugEventStore::new("s1".into());
        assert_eq!(store.session_id(), "s1");
        assert_eq!(store.max_events(), DEFAULT_MAX_EVENTS);
        assert!(store.is_empty());
        assert_eq!(store.size(), 0);
        assert_eq!(store.event_count(), 0);
        assert_eq!(store.evicted_count(), 0);
        assert!(store.events().is_empty());
        assert!(store.events_from(0).is_empty());
    }

    #[test]
    #[should_panic(expected = "max_events must be positive")]
    fn zero_capacity_is_rejected() {
        let _ = DebugEventStore::with_capacity("s1".into(), 0);
    }

    #[test]
    fn basic_append_and_retrieve() {
        let store = DebugEventStore::new("s1".into());
        store.append(started_event("t1", 100));
        store.append(started_event("t2", 200));
        assert_eq!(store.event_count(), 2);
        assert_eq!(store.size(), 2);
        assert!(!store.is_empty());
        // Events come back in insertion order.
        assert_eq!(timestamps(&store.events()), vec![100, 200]);
    }

    #[test]
    fn eviction_at_capacity() {
        let store = DebugEventStore::with_capacity("s1".into(), 3);
        for i in 0..5 {
            store.append(started_event("t", i));
        }
        assert_eq!(store.event_count(), 5);
        assert_eq!(store.size(), 3);
        assert_eq!(store.evicted_count(), 2);
        // The two oldest events are gone; the rest keep their order.
        assert_eq!(timestamps(&store.events()), vec![2, 3, 4]);
    }

    #[test]
    fn events_from_with_eviction() {
        let store = DebugEventStore::with_capacity("s1".into(), 3);
        for i in 0..5 {
            store.append(started_event("t", i));
        }
        // Global index 0..4, evicted 0,1 → retained 2,3,4
        // An index inside the evicted range clamps to the oldest retained event.
        assert_eq!(timestamps(&store.events_from(0)), vec![2, 3, 4]);
        assert_eq!(timestamps(&store.events_from(2)), vec![2, 3, 4]);
        assert_eq!(timestamps(&store.events_from(3)), vec![3, 4]);
        // An index at or past the end yields nothing.
        assert!(store.events_from(5).is_empty());
        assert!(store.events_from(100).is_empty());
    }

    #[test]
    fn events_since_and_between() {
        let store = DebugEventStore::new("s1".into());
        for i in 0..5 {
            store.append(started_event("t", i * 100));
        }
        // `since` is inclusive of its bound.
        assert_eq!(timestamps(&store.events_since(200)), vec![200, 300, 400]);
        // `between` is inclusive of `from` and exclusive of `to`.
        assert_eq!(timestamps(&store.events_between(100, 300)), vec![100, 200]);
        assert!(store.events_between(300, 300).is_empty());
    }

    #[test]
    fn subscription_broadcast() {
        let store = DebugEventStore::new("s1".into());
        let (rx, _sub) = store.subscribe();
        store.append(started_event("t1", 100));
        let event = rx.try_recv().unwrap();
        assert_eq!(event.timestamp(), 100);
        // Exactly one event was delivered.
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn subscription_cancel() {
        let store = DebugEventStore::new("s1".into());
        let (rx, mut sub) = store.subscribe();
        assert!(sub.is_active());
        sub.cancel();
        assert!(!sub.is_active());
        // Cancelling twice must be harmless.
        sub.cancel();
        assert!(!sub.is_active());
        // This test only checks that appending after a cancel does not panic
        // and that the event is still stored; it deliberately makes no
        // assertion about whether the receiver sees the event.
        store.append(started_event("t1", 100));
        let _ = rx.try_recv();
        assert_eq!(store.event_count(), 1);
    }

    #[test]
    fn dropped_receiver_is_pruned_on_broadcast() {
        let store = DebugEventStore::new("s1".into());
        let (rx, _sub) = store.subscribe();
        assert_eq!(store.subscriber_count(), 1);
        drop(rx);
        // The failed send during broadcast removes the dead subscriber.
        store.append(started_event("t1", 100));
        assert_eq!(store.subscriber_count(), 0);
    }

    #[test]
    fn close_clears_subscribers() {
        let store = DebugEventStore::new("s1".into());
        let (rx, _sub) = store.subscribe();
        assert_eq!(store.subscriber_count(), 1);
        store.append(started_event("t1", 100));
        store.close();
        assert_eq!(store.subscriber_count(), 0);
        // Events delivered before the close are still readable; events
        // appended afterwards are not delivered.
        store.append(started_event("t2", 200));
        assert_eq!(rx.try_recv().unwrap().timestamp(), 100);
        assert!(rx.try_recv().is_err());
        // Closing does not discard retained events.
        assert_eq!(store.size(), 2);
    }
}