libpetri_debug/
debug_event_store.rs1use std::sync::Mutex;
4
5use crossbeam_channel::Sender;
6use libpetri_event::net_event::NetEvent;
7
/// Number of events a [`DebugEventStore`] retains when it is built with
/// [`DebugEventStore::new`].
pub const DEFAULT_MAX_EVENTS: usize = 10_000;
10
11pub struct DebugEventStore {
17 session_id: String,
18 max_events: usize,
19 inner: Mutex<Inner>,
20}
21
22struct Inner {
23 events: Vec<NetEvent>,
24 event_count: usize,
25 evicted_count: usize,
26 subscribers: Vec<Sender<NetEvent>>,
27}
28
29pub struct Subscription {
31 cancel_tx: Option<Sender<()>>,
32}
33
34impl Subscription {
35 pub fn cancel(&mut self) {
37 self.cancel_tx.take();
38 }
39
40 pub fn is_active(&self) -> bool {
42 self.cancel_tx.is_some()
43 }
44}
45
46impl DebugEventStore {
47 pub fn new(session_id: String) -> Self {
49 Self::with_capacity(session_id, DEFAULT_MAX_EVENTS)
50 }
51
52 pub fn with_capacity(session_id: String, max_events: usize) -> Self {
54 assert!(
55 max_events > 0,
56 "max_events must be positive, got: {max_events}"
57 );
58 Self {
59 session_id,
60 max_events,
61 inner: Mutex::new(Inner {
62 events: Vec::new(),
63 event_count: 0,
64 evicted_count: 0,
65 subscribers: Vec::new(),
66 }),
67 }
68 }
69
70 pub fn session_id(&self) -> &str {
72 &self.session_id
73 }
74
75 pub fn max_events(&self) -> usize {
77 self.max_events
78 }
79
80 pub fn append(&self, event: NetEvent) {
82 let mut inner = self.inner.lock().unwrap();
83 inner.events.push(event.clone());
84 inner.event_count += 1;
85
86 while inner.events.len() > self.max_events {
88 inner.events.remove(0);
89 inner.evicted_count += 1;
90 }
91
92 inner
94 .subscribers
95 .retain(|tx| tx.send(event.clone()).is_ok());
96 }
97
98 pub fn events(&self) -> Vec<NetEvent> {
100 self.inner.lock().unwrap().events.clone()
101 }
102
103 pub fn event_count(&self) -> usize {
105 self.inner.lock().unwrap().event_count
106 }
107
108 pub fn size(&self) -> usize {
110 self.inner.lock().unwrap().events.len()
111 }
112
113 pub fn is_empty(&self) -> bool {
115 self.inner.lock().unwrap().events.is_empty()
116 }
117
118 pub fn evicted_count(&self) -> usize {
120 self.inner.lock().unwrap().evicted_count
121 }
122
123 pub fn subscribe(&self) -> (crossbeam_channel::Receiver<NetEvent>, Subscription) {
128 let (event_tx, event_rx) = crossbeam_channel::unbounded();
129 let (cancel_tx, _cancel_rx) = crossbeam_channel::bounded(1);
130
131 self.inner.lock().unwrap().subscribers.push(event_tx);
132
133 (
134 event_rx,
135 Subscription {
136 cancel_tx: Some(cancel_tx),
137 },
138 )
139 }
140
141 pub fn subscriber_count(&self) -> usize {
143 self.inner.lock().unwrap().subscribers.len()
144 }
145
146 pub fn events_from(&self, from_index: usize) -> Vec<NetEvent> {
148 let inner = self.inner.lock().unwrap();
149 let adjusted_skip = from_index.saturating_sub(inner.evicted_count);
150 if adjusted_skip >= inner.events.len() {
151 return Vec::new();
152 }
153 inner.events[adjusted_skip..].to_vec()
154 }
155
156 pub fn events_since(&self, from: u64) -> Vec<NetEvent> {
158 let inner = self.inner.lock().unwrap();
159 inner
160 .events
161 .iter()
162 .filter(|e| e.timestamp() >= from)
163 .cloned()
164 .collect()
165 }
166
167 pub fn events_between(&self, from: u64, to: u64) -> Vec<NetEvent> {
169 let inner = self.inner.lock().unwrap();
170 inner
171 .events
172 .iter()
173 .filter(|e| e.timestamp() >= from && e.timestamp() < to)
174 .cloned()
175 .collect()
176 }
177
178 pub fn close(&self) {
180 self.inner.lock().unwrap().subscribers.clear();
181 }
182}
183
184impl std::fmt::Debug for DebugEventStore {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 let inner = self.inner.lock().unwrap();
187 f.debug_struct("DebugEventStore")
188 .field("session_id", &self.session_id)
189 .field("max_events", &self.max_events)
190 .field("event_count", &inner.event_count)
191 .field("retained", &inner.events.len())
192 .field("subscribers", &inner.subscribers.len())
193 .finish()
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builds a `TransitionStarted` event with the given name and timestamp.
    fn started_event(name: &str, ts: u64) -> NetEvent {
        NetEvent::TransitionStarted {
            transition_name: Arc::from(name),
            timestamp: ts,
        }
    }

    /// Extracts the timestamps so tests can compare contents, not just lengths.
    fn timestamps(events: &[NetEvent]) -> Vec<u64> {
        events.iter().map(|event| event.timestamp()).collect()
    }

    #[test]
    fn new_store_is_empty() {
        let store = DebugEventStore::new("s1".into());
        assert_eq!(store.session_id(), "s1");
        assert_eq!(store.max_events(), DEFAULT_MAX_EVENTS);
        assert!(store.is_empty());
        assert_eq!(store.size(), 0);
        assert_eq!(store.event_count(), 0);
        assert_eq!(store.evicted_count(), 0);
        assert_eq!(store.subscriber_count(), 0);
    }

    #[test]
    #[should_panic(expected = "max_events must be positive")]
    fn zero_capacity_panics() {
        let _ = DebugEventStore::with_capacity("s1".into(), 0);
    }

    #[test]
    fn basic_append_and_retrieve() {
        let store = DebugEventStore::new("s1".into());
        store.append(started_event("t1", 100));
        store.append(started_event("t2", 200));
        assert_eq!(store.event_count(), 2);
        assert_eq!(store.size(), 2);
        assert!(!store.is_empty());
        assert_eq!(timestamps(&store.events()), vec![100, 200]);
    }

    #[test]
    fn eviction_at_capacity() {
        let store = DebugEventStore::with_capacity("s1".into(), 3);
        for i in 0..5 {
            store.append(started_event("t", i));
        }
        assert_eq!(store.event_count(), 5);
        assert_eq!(store.size(), 3);
        assert_eq!(store.evicted_count(), 2);
        // The two oldest events (timestamps 0 and 1) were evicted.
        assert_eq!(timestamps(&store.events()), vec![2, 3, 4]);
    }

    #[test]
    fn events_from_with_eviction() {
        let store = DebugEventStore::with_capacity("s1".into(), 3);
        for i in 0..5 {
            store.append(started_event("t", i));
        }

        // Index 0 was evicted, so the result starts at the oldest retained event.
        assert_eq!(timestamps(&store.events_from(0)), vec![2, 3, 4]);

        // Global index 3 maps to the second retained event.
        assert_eq!(timestamps(&store.events_from(3)), vec![3, 4]);

        // Indices at or past the end yield nothing.
        assert!(store.events_from(5).is_empty());
        assert!(store.events_from(usize::MAX).is_empty());
    }

    #[test]
    fn events_since_and_between() {
        let store = DebugEventStore::new("s1".into());
        for i in 0..5 {
            store.append(started_event("t", i * 100));
        }
        assert_eq!(timestamps(&store.events_since(200)), vec![200, 300, 400]);
        // The upper bound is exclusive.
        assert_eq!(timestamps(&store.events_between(100, 300)), vec![100, 200]);
        // An inverted window matches nothing.
        assert!(store.events_between(300, 100).is_empty());
    }

    #[test]
    fn subscription_broadcast() {
        let store = DebugEventStore::new("s1".into());
        let (rx, _sub) = store.subscribe();
        store.append(started_event("t1", 100));
        let event = rx.try_recv().unwrap();
        assert_eq!(event.timestamp(), 100);
    }

    #[test]
    fn subscription_cancel() {
        let store = DebugEventStore::new("s1".into());
        let (rx, mut sub) = store.subscribe();
        assert!(sub.is_active());
        sub.cancel();
        assert!(!sub.is_active());
        // Cancelling twice must stay a no-op.
        sub.cancel();
        assert!(!sub.is_active());

        store.append(started_event("t1", 100));
        // Whether a cancelled subscriber still receives this event is left
        // unasserted here; the test only requires that appending succeeds.
        let _ = rx.try_recv();
    }

    #[test]
    fn dropped_receiver_is_pruned_on_append() {
        let store = DebugEventStore::new("s1".into());
        let (rx, _sub) = store.subscribe();
        drop(rx);
        // Pruning happens lazily, on the next failed send.
        assert_eq!(store.subscriber_count(), 1);
        store.append(started_event("t1", 100));
        assert_eq!(store.subscriber_count(), 0);
    }

    #[test]
    fn close_clears_subscribers() {
        let store = DebugEventStore::new("s1".into());
        let (_rx, _sub) = store.subscribe();
        assert_eq!(store.subscriber_count(), 1);
        store.close();
        assert_eq!(store.subscriber_count(), 0);
    }
}