1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use libpetri_core::petri_net::PetriNet;
8use libpetri_export::dot_exporter::dot_export;
9use libpetri_export::mapper::sanitize;
10
11use crate::debug_event_store::DebugEventStore;
12use crate::debug_response::{NetStructure, PlaceInfo, TransitionInfo};
13use crate::place_analysis::PlaceAnalysis;
14
15pub struct DebugSession {
17 pub session_id: String,
18 pub net_name: String,
19 pub dot_diagram: String,
20 pub places: Option<PlaceAnalysis>,
21 pub transition_names: Vec<String>,
22 pub event_store: Arc<DebugEventStore>,
23 pub start_time: u64,
24 pub active: bool,
25 pub imported_structure: Option<NetStructure>,
26 pub end_time: Option<u64>,
28 pub tags: HashMap<String, String>,
31}
32
33impl DebugSession {
34 pub fn duration_ms(&self) -> Option<u64> {
37 self.end_time.map(|end| end.saturating_sub(self.start_time))
38 }
39}
40
41pub type SessionCompletionListener = Box<dyn Fn(&DebugSession) + Send + Sync>;
43
44pub fn build_net_structure(session: &DebugSession) -> NetStructure {
46 if let Some(ref imported) = session.imported_structure {
47 return imported.clone();
48 }
49
50 let Some(ref places) = session.places else {
51 return NetStructure {
52 places: Vec::new(),
53 transitions: Vec::new(),
54 };
55 };
56
57 let place_infos: Vec<PlaceInfo> = places
58 .data()
59 .iter()
60 .map(|(name, info)| PlaceInfo {
61 name: name.clone(),
62 graph_id: format!("p_{}", sanitize(name)),
63 token_type: info.token_type.clone(),
64 is_start: !info.has_incoming,
65 is_end: !info.has_outgoing,
66 is_environment: false,
67 })
68 .collect();
69
70 let transition_infos: Vec<TransitionInfo> = session
71 .transition_names
72 .iter()
73 .map(|name| TransitionInfo {
74 name: name.clone(),
75 graph_id: format!("t_{}", sanitize(name)),
76 })
77 .collect();
78
79 NetStructure {
80 places: place_infos,
81 transitions: transition_infos,
82 }
83}
84
85pub type EventStoreFactory = Box<dyn Fn(&str) -> DebugEventStore + Send + Sync>;
87
88pub struct DebugSessionRegistry {
109 sessions: HashMap<String, DebugSession>,
110 max_sessions: usize,
111 event_store_factory: EventStoreFactory,
112 completion_listeners: Vec<SessionCompletionListener>,
113}
114
115impl DebugSessionRegistry {
116 pub fn new() -> Self {
118 Self::with_options(50, None, Vec::new())
119 }
120
121 pub fn with_options(
123 max_sessions: usize,
124 event_store_factory: Option<EventStoreFactory>,
125 completion_listeners: Vec<SessionCompletionListener>,
126 ) -> Self {
127 Self {
128 sessions: HashMap::new(),
129 max_sessions,
130 event_store_factory: event_store_factory
131 .unwrap_or_else(|| Box::new(|id: &str| DebugEventStore::new(id.to_string()))),
132 completion_listeners,
133 }
134 }
135
136 pub fn register(&mut self, session_id: String, net: &PetriNet) -> Arc<DebugEventStore> {
138 self.register_with_tags(session_id, net, HashMap::new())
139 }
140
141 pub fn register_with_tags(
147 &mut self,
148 session_id: String,
149 net: &PetriNet,
150 tags: HashMap<String, String>,
151 ) -> Arc<DebugEventStore> {
152 let dot_diagram = dot_export(net, None);
153 let places = PlaceAnalysis::from_net(net);
154 let event_store = Arc::new((self.event_store_factory)(&session_id));
155
156 let transition_names: Vec<String> = net
157 .transitions()
158 .iter()
159 .map(|t| t.name().to_string())
160 .collect();
161
162 let session = DebugSession {
163 session_id: session_id.clone(),
164 net_name: net.name().to_string(),
165 dot_diagram,
166 places: Some(places),
167 transition_names,
168 event_store: Arc::clone(&event_store),
169 start_time: now_ms(),
170 active: true,
171 imported_structure: None,
172 end_time: None,
173 tags,
174 };
175
176 self.evict_if_necessary();
177 self.sessions.insert(session_id, session);
178 event_store
179 }
180
181 pub fn complete(&mut self, session_id: &str) {
185 if let Some(session) = self.sessions.get_mut(session_id) {
186 session.active = false;
187 if session.end_time.is_none() {
188 session.end_time = Some(now_ms());
189 }
190 for listener in &self.completion_listeners {
191 listener(session);
192 }
193 }
194 }
195
196 pub fn remove(&mut self, session_id: &str) -> Option<DebugSession> {
198 let removed = self.sessions.remove(session_id);
199 if let Some(ref session) = removed {
200 session.event_store.close();
201 }
202 removed
203 }
204
205 pub fn tag(&mut self, session_id: &str, key: String, value: String) {
210 if let Some(session) = self.sessions.get_mut(session_id) {
211 session.tags.insert(key, value);
212 }
213 }
214
215 pub fn tags_for(&self, session_id: &str) -> HashMap<String, String> {
219 self.sessions
220 .get(session_id)
221 .map(|s| s.tags.clone())
222 .unwrap_or_default()
223 }
224
225 pub fn get_session(&self, session_id: &str) -> Option<&DebugSession> {
227 self.sessions.get(session_id)
228 }
229
230 pub fn list_sessions(&self, limit: usize) -> Vec<&DebugSession> {
232 self.list_sessions_tagged(limit, &HashMap::new())
233 }
234
235 pub fn list_sessions_tagged(
239 &self,
240 limit: usize,
241 tag_filter: &HashMap<String, String>,
242 ) -> Vec<&DebugSession> {
243 let mut sessions: Vec<&DebugSession> = self
244 .sessions
245 .values()
246 .filter(|s| Self::matches_tag_filter(s, tag_filter))
247 .collect();
248 sessions.sort_by(|a, b| b.start_time.cmp(&a.start_time));
249 sessions.truncate(limit);
250 sessions
251 }
252
253 pub fn list_active_sessions(&self, limit: usize) -> Vec<&DebugSession> {
255 self.list_active_sessions_tagged(limit, &HashMap::new())
256 }
257
258 pub fn list_active_sessions_tagged(
260 &self,
261 limit: usize,
262 tag_filter: &HashMap<String, String>,
263 ) -> Vec<&DebugSession> {
264 let mut sessions: Vec<&DebugSession> = self
265 .sessions
266 .values()
267 .filter(|s| s.active)
268 .filter(|s| Self::matches_tag_filter(s, tag_filter))
269 .collect();
270 sessions.sort_by(|a, b| b.start_time.cmp(&a.start_time));
271 sessions.truncate(limit);
272 sessions
273 }
274
275 fn matches_tag_filter(session: &DebugSession, filter: &HashMap<String, String>) -> bool {
277 if filter.is_empty() {
278 return true;
279 }
280 filter.iter().all(|(k, v)| session.tags.get(k) == Some(v))
281 }
282
283 pub fn size(&self) -> usize {
285 self.sessions.len()
286 }
287
288 pub fn register_imported(
290 &mut self,
291 session_id: String,
292 net_name: String,
293 dot_diagram: String,
294 structure: NetStructure,
295 event_store: Arc<DebugEventStore>,
296 start_time: u64,
297 ) {
298 self.register_imported_with_metadata(
299 session_id,
300 net_name,
301 dot_diagram,
302 structure,
303 event_store,
304 start_time,
305 None,
306 HashMap::new(),
307 );
308 }
309
310 #[allow(clippy::too_many_arguments)]
312 pub fn register_imported_with_metadata(
313 &mut self,
314 session_id: String,
315 net_name: String,
316 dot_diagram: String,
317 structure: NetStructure,
318 event_store: Arc<DebugEventStore>,
319 start_time: u64,
320 end_time: Option<u64>,
321 tags: HashMap<String, String>,
322 ) {
323 self.evict_if_necessary();
324
325 let session = DebugSession {
326 session_id: session_id.clone(),
327 net_name,
328 dot_diagram,
329 places: None,
330 transition_names: Vec::new(),
331 event_store,
332 start_time,
333 active: false,
334 imported_structure: Some(structure),
335 end_time,
336 tags,
337 };
338
339 self.sessions.insert(session_id, session);
340 }
341
342 fn evict_if_necessary(&mut self) {
343 if self.sessions.len() < self.max_sessions {
344 return;
345 }
346
347 let mut candidates: Vec<(&String, bool, u64)> = self
349 .sessions
350 .iter()
351 .map(|(id, s)| (id, s.active, s.start_time))
352 .collect();
353 candidates.sort_by(|a, b| {
354 if a.1 != b.1 {
355 return if a.1 {
356 std::cmp::Ordering::Greater
357 } else {
358 std::cmp::Ordering::Less
359 };
360 }
361 a.2.cmp(&b.2)
362 });
363
364 let to_remove: Vec<String> = candidates
365 .iter()
366 .take_while(|_| self.sessions.len() >= self.max_sessions)
367 .map(|(id, _, _)| (*id).clone())
368 .collect();
369
370 for id in to_remove {
371 if self.sessions.len() < self.max_sessions {
372 break;
373 }
374 if let Some(session) = self.sessions.remove(&id) {
375 session.event_store.close();
376 }
377 }
378 }
379}
380
381impl Default for DebugSessionRegistry {
382 fn default() -> Self {
383 Self::new()
384 }
385}
386
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating if the millisecond
/// count does not fit into a `u64`.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
393
#[cfg(test)]
mod tests {
    use super::*;
    use libpetri_core::input::one;
    use libpetri_core::output::out_place;
    use libpetri_core::place::Place;
    use libpetri_core::transition::Transition;

    /// Minimal net: `p1 --t1--> p2`.
    fn test_net() -> PetriNet {
        let p1 = Place::<i32>::new("p1");
        let p2 = Place::<i32>::new("p2");
        let t = Transition::builder("t1")
            .input(one(&p1))
            .output(out_place(&p2))
            .build();
        PetriNet::builder("test").transition(t).build()
    }

    #[test]
    fn register_and_get_session() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);

        let session = registry.get_session("s1").unwrap();
        assert_eq!(session.net_name, "test");
        assert!(session.active);
        assert!(!session.dot_diagram.is_empty());
    }

    #[test]
    fn complete_session() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);

        registry.complete("s1");
        let session = registry.get_session("s1").unwrap();
        assert!(!session.active);
    }

    #[test]
    fn list_sessions() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _s1 = registry.register("s1".into(), &net);
        let _s2 = registry.register("s2".into(), &net);

        assert_eq!(registry.list_sessions(10).len(), 2);
        assert_eq!(registry.size(), 2);
    }

    #[test]
    fn list_active_sessions() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _s1 = registry.register("s1".into(), &net);
        let _s2 = registry.register("s2".into(), &net);
        registry.complete("s1");

        assert_eq!(registry.list_active_sessions(10).len(), 1);
    }

    #[test]
    fn remove_session() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);

        let removed = registry.remove("s1");
        assert!(removed.is_some());
        assert!(registry.get_session("s1").is_none());
        assert_eq!(registry.size(), 0);
    }

    #[test]
    fn build_net_structure_from_live_session() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);

        let session = registry.get_session("s1").unwrap();
        let structure = build_net_structure(session);

        assert_eq!(structure.places.len(), 2);
        assert_eq!(structure.transitions.len(), 1);

        let p1 = structure.places.iter().find(|p| p.name == "p1").unwrap();
        assert_eq!(p1.graph_id, "p_p1");
        assert!(p1.is_start);
        assert!(!p1.is_end);

        let p2 = structure.places.iter().find(|p| p.name == "p2").unwrap();
        assert!(p2.is_end);
        assert!(!p2.is_start);

        assert_eq!(structure.transitions[0].name, "t1");
        assert_eq!(structure.transitions[0].graph_id, "t_t1");
    }

    #[test]
    fn eviction_at_capacity() {
        let mut registry = DebugSessionRegistry::with_options(2, None, Vec::new());
        let net = test_net();

        let _s1 = registry.register("s1".into(), &net);
        let _s2 = registry.register("s2".into(), &net);
        registry.complete("s1");
        // The registry is full, so the inactive session `s1` must make room.
        let _s3 = registry.register("s3".into(), &net);

        assert_eq!(registry.size(), 2);
        assert!(registry.get_session("s1").is_none());
        assert!(registry.get_session("s2").is_some());
        assert!(registry.get_session("s3").is_some());
    }

    /// Builds an owned tag map from string-slice pairs.
    fn tags_map<const N: usize>(pairs: [(&str, &str); N]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn register_with_tags() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register_with_tags(
            "s1".into(),
            &net,
            tags_map([("channel", "voice"), ("env", "staging")]),
        );

        let tags = registry.tags_for("s1");
        assert_eq!(tags.get("channel"), Some(&"voice".to_string()));
        assert_eq!(tags.get("env"), Some(&"staging".to_string()));
    }

    #[test]
    fn default_register_has_empty_tags() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register("s1".into(), &net);

        assert!(registry.tags_for("s1").is_empty());
    }

    #[test]
    fn tags_for_unknown_session_returns_empty() {
        let registry = DebugSessionRegistry::new();
        assert!(registry.tags_for("never-registered").is_empty());
    }

    #[test]
    fn set_tag_after_registration() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register("s1".into(), &net);

        registry.tag("s1", "channel".into(), "text".into());
        registry.tag("s1", "experiment".into(), "abc".into());

        let tags = registry.tags_for("s1");
        assert_eq!(tags.len(), 2);
        assert_eq!(tags.get("channel"), Some(&"text".to_string()));
        assert_eq!(tags.get("experiment"), Some(&"abc".to_string()));
    }

    #[test]
    fn replace_existing_tag_value() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register_with_tags("s1".into(), &net, tags_map([("channel", "voice")]));

        registry.tag("s1", "channel".into(), "text".into());

        assert_eq!(
            registry.tags_for("s1").get("channel"),
            Some(&"text".to_string())
        );
    }

    #[test]
    fn tag_unknown_session_is_no_op() {
        let mut registry = DebugSessionRegistry::new();

        registry.tag("never-registered", "channel".into(), "voice".into());

        assert!(registry.tags_for("never-registered").is_empty());
        assert!(
            registry
                .list_sessions_tagged(10, &tags_map([("channel", "voice")]))
                .is_empty()
        );
    }

    #[test]
    fn tag_removed_session_is_no_op() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register("s1".into(), &net);
        registry.remove("s1");

        registry.tag("s1", "channel".into(), "voice".into());

        assert!(registry.tags_for("s1").is_empty());
    }

    #[test]
    fn filter_sessions_by_tag() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register_with_tags("text-1".into(), &net, tags_map([("channel", "text")]));
        registry.register_with_tags("voice-1".into(), &net, tags_map([("channel", "voice")]));
        registry.register_with_tags("voice-2".into(), &net, tags_map([("channel", "voice")]));

        let voices = registry.list_sessions_tagged(10, &tags_map([("channel", "voice")]));

        assert_eq!(voices.len(), 2);
        assert!(voices.iter().all(|s| s.session_id.starts_with("voice")));
    }

    #[test]
    fn and_match_multiple_tag_keys() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register_with_tags(
            "s1".into(),
            &net,
            tags_map([("channel", "voice"), ("env", "staging")]),
        );
        registry.register_with_tags(
            "s2".into(),
            &net,
            tags_map([("channel", "voice"), ("env", "prod")]),
        );
        registry.register_with_tags(
            "s3".into(),
            &net,
            tags_map([("channel", "text"), ("env", "staging")]),
        );

        let filtered = registry.list_sessions_tagged(
            10,
            &tags_map([("channel", "voice"), ("env", "staging")]),
        );

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].session_id, "s1");
    }

    #[test]
    fn filter_active_sessions_by_tag() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register_with_tags(
            "active-voice".into(),
            &net,
            tags_map([("channel", "voice")]),
        );
        registry.register_with_tags(
            "completed-voice".into(),
            &net,
            tags_map([("channel", "voice")]),
        );
        registry.register_with_tags(
            "active-text".into(),
            &net,
            tags_map([("channel", "text")]),
        );
        registry.complete("completed-voice");

        let active_voices =
            registry.list_active_sessions_tagged(10, &tags_map([("channel", "voice")]));

        assert_eq!(active_voices.len(), 1);
        assert_eq!(active_voices[0].session_id, "active-voice");
    }

    #[test]
    fn stamp_end_time_on_complete() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);
        assert!(registry.get_session("s1").unwrap().end_time.is_none());

        registry.complete("s1");

        let s = registry.get_session("s1").unwrap();
        assert!(s.end_time.is_some());
        assert!(!s.active);
    }

    #[test]
    fn preserve_end_time_on_second_complete() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);

        registry.complete("s1");
        let first_end = registry.get_session("s1").unwrap().end_time;

        // Let the clock advance so a re-stamped end time would differ.
        std::thread::sleep(std::time::Duration::from_millis(5));
        registry.complete("s1");
        let second_end = registry.get_session("s1").unwrap().end_time;

        assert_eq!(first_end, second_end);
    }

    #[test]
    fn duration_ms_for_completed_session() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);

        std::thread::sleep(std::time::Duration::from_millis(2));
        registry.complete("s1");

        let s = registry.get_session("s1").unwrap();
        let duration = s.duration_ms().expect("duration should be Some");
        assert!(duration >= 1, "expected duration >= 1ms, got {}", duration);
    }

    #[test]
    fn duration_ms_is_none_for_active_session() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        let _store = registry.register("s1".into(), &net);

        assert!(registry.get_session("s1").unwrap().duration_ms().is_none());
    }

    #[test]
    fn clear_tags_on_remove() {
        let mut registry = DebugSessionRegistry::new();
        let net = test_net();
        registry.register_with_tags("s1".into(), &net, tags_map([("channel", "voice")]));

        registry.remove("s1");

        assert!(registry.tags_for("s1").is_empty());
    }

    #[test]
    fn register_imported_with_metadata() {
        let mut registry = DebugSessionRegistry::new();
        let structure = NetStructure {
            places: vec![],
            transitions: vec![],
        };
        let start_time = 1000;
        let end_time = 1500;

        registry.register_imported_with_metadata(
            "imported-1".into(),
            "TestNet".into(),
            "digraph{}".into(),
            structure,
            Arc::new(DebugEventStore::new("imported-1".into())),
            start_time,
            Some(end_time),
            tags_map([("channel", "voice"), ("source", "archive")]),
        );

        let s = registry.get_session("imported-1").unwrap();
        assert!(!s.active);
        assert_eq!(s.end_time, Some(end_time));
        assert_eq!(s.duration_ms(), Some(500));

        let tags = registry.tags_for("imported-1");
        assert_eq!(tags.get("channel"), Some(&"voice".to_string()));
        assert_eq!(tags.get("source"), Some(&"archive".to_string()));
    }

    #[test]
    fn backward_compat_register_imported_no_metadata() {
        let mut registry = DebugSessionRegistry::new();
        let structure = NetStructure {
            places: vec![],
            transitions: vec![],
        };

        registry.register_imported(
            "imported-1".into(),
            "TestNet".into(),
            "digraph{}".into(),
            structure,
            Arc::new(DebugEventStore::new("imported-1".into())),
            1000,
        );

        let s = registry.get_session("imported-1").unwrap();
        assert!(s.end_time.is_none());
        assert!(registry.tags_for("imported-1").is_empty());
    }

    #[test]
    fn cleanup_tags_on_eviction() {
        let mut registry = DebugSessionRegistry::with_options(2, None, Vec::new());
        let net = test_net();

        registry.register_with_tags("s1".into(), &net, tags_map([("channel", "voice")]));
        registry.register_with_tags("s2".into(), &net, tags_map([("channel", "text")]));
        registry.complete("s1");

        // The registry is full, so the inactive session `s1` is evicted here.
        registry.register_with_tags("s3".into(), &net, tags_map([("channel", "voice")]));

        assert!(registry.get_session("s1").is_none());
        assert!(registry.tags_for("s1").is_empty());
        assert_eq!(
            registry.tags_for("s2").get("channel"),
            Some(&"text".to_string())
        );
        assert_eq!(
            registry.tags_for("s3").get("channel"),
            Some(&"voice".to_string())
        );
    }

    #[test]
    fn concurrent_tag_and_complete_smoke() {
        use std::sync::{Barrier, Mutex};
        use std::thread;

        let registry = Arc::new(Mutex::new(DebugSessionRegistry::new()));
        let net = test_net();
        registry.lock().unwrap().register("s1".into(), &net);

        // All eight workers start together; even ones tag, odd ones complete.
        let barrier = Arc::new(Barrier::new(8));
        let mut handles = Vec::new();

        for i in 0..8 {
            let registry = Arc::clone(&registry);
            let barrier = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let mut reg = registry.lock().unwrap();
                if i % 2 == 0 {
                    reg.tag("s1", format!("k{i}"), "v".into());
                } else {
                    reg.complete("s1");
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let reg = registry.lock().unwrap();
        let session = reg.get_session("s1").expect("session must exist");
        assert!(!session.active, "session should be marked complete");
        assert!(
            session.end_time.is_some(),
            "end time should be stamped after complete()"
        );
        let tags = reg.tags_for("s1");
        assert_eq!(tags.len(), 4, "four even-indexed threads tagged the session");
        for k in ["k0", "k2", "k4", "k6"] {
            assert_eq!(tags.get(k), Some(&"v".to_string()), "missing tag {k}");
        }
    }
}