1use std::collections::HashMap;
8
9use libpetri_event::net_event::NetEvent;
10
11use crate::debug_command::{BreakpointConfig, BreakpointType, DebugCommand, EventFilter};
12use crate::debug_response::{DebugResponse, NetEventInfo, SessionSummary};
13use crate::debug_session_registry::{DebugSession, DebugSessionRegistry, build_net_structure};
14use crate::marking_cache::{MarkingCache, compute_state};
15use crate::net_event_converter::{
16 event_type_name, extract_place_name, extract_transition_name, to_event_info,
17};
18
19pub trait ResponseSink: Send + Sync {
21 fn send(&self, response: DebugResponse);
22}
23
24impl<F: Fn(DebugResponse) + Send + Sync> ResponseSink for F {
26 fn send(&self, response: DebugResponse) {
27 self(response);
28 }
29}
30
31const BATCH_SIZE: usize = 500;
33
34pub struct DebugProtocolHandler {
36 session_registry: DebugSessionRegistry,
37 clients: HashMap<String, ClientState>,
38}
39
40struct ClientState {
41 sink: Box<dyn ResponseSink>,
42 subscriptions: SubscriptionState,
43}
44
45impl DebugProtocolHandler {
46 pub fn new(session_registry: DebugSessionRegistry) -> Self {
48 Self {
49 session_registry,
50 clients: HashMap::new(),
51 }
52 }
53
54 pub fn session_registry(&self) -> &DebugSessionRegistry {
56 &self.session_registry
57 }
58
59 pub fn session_registry_mut(&mut self) -> &mut DebugSessionRegistry {
61 &mut self.session_registry
62 }
63
64 pub fn client_connected(&mut self, client_id: String, sink: Box<dyn ResponseSink>) {
66 self.clients.insert(
67 client_id,
68 ClientState {
69 sink,
70 subscriptions: SubscriptionState::new(),
71 },
72 );
73 }
74
75 pub fn client_disconnected(&mut self, client_id: &str) {
77 self.clients.remove(client_id);
78 }
79
80 pub fn handle_command(&mut self, client_id: &str, command: DebugCommand) {
82 if !self.clients.contains_key(client_id) {
83 return;
84 }
85
86 let result = match command {
87 DebugCommand::ListSessions { limit, active_only } => {
88 self.handle_list_sessions(client_id, limit, active_only)
89 }
90 DebugCommand::Subscribe {
91 session_id,
92 mode,
93 from_index,
94 } => self.handle_subscribe(client_id, session_id, mode, from_index),
95 DebugCommand::Unsubscribe { session_id } => {
96 self.handle_unsubscribe(client_id, session_id)
97 }
98 DebugCommand::Seek {
99 session_id,
100 timestamp,
101 } => self.handle_seek(client_id, session_id, timestamp),
102 DebugCommand::PlaybackSpeed { session_id, speed } => {
103 self.handle_playback_speed(client_id, session_id, speed)
104 }
105 DebugCommand::Filter { session_id, filter } => {
106 self.handle_set_filter(client_id, session_id, filter)
107 }
108 DebugCommand::Pause { session_id } => self.handle_pause(client_id, session_id),
109 DebugCommand::Resume { session_id } => self.handle_resume(client_id, session_id),
110 DebugCommand::StepForward { session_id } => {
111 self.handle_step_forward(client_id, session_id)
112 }
113 DebugCommand::StepBackward { session_id } => {
114 self.handle_step_backward(client_id, session_id)
115 }
116 DebugCommand::SetBreakpoint {
117 session_id,
118 breakpoint,
119 } => self.handle_set_breakpoint(client_id, session_id, breakpoint),
120 DebugCommand::ClearBreakpoint {
121 session_id,
122 breakpoint_id,
123 } => self.handle_clear_breakpoint(client_id, session_id, breakpoint_id),
124 DebugCommand::ListBreakpoints { session_id } => {
125 self.handle_list_breakpoints(client_id, session_id)
126 }
127 DebugCommand::ListArchives { .. }
128 | DebugCommand::ImportArchive { .. }
129 | DebugCommand::UploadArchive { .. } => {
130 Ok(())
132 }
133 };
134
135 if let Err(e) = result {
136 if let Some(client) = self.clients.get(client_id) {
137 send(
138 &*client.sink,
139 DebugResponse::Error {
140 code: "COMMAND_ERROR".into(),
141 message: e,
142 session_id: None,
143 },
144 );
145 }
146 }
147 }
148
149 pub fn broadcast_event(&mut self, session_id: &str, event: &NetEvent) {
151 let event_info = to_event_info(event);
152
153 let client_ids: Vec<String> = self.clients.keys().cloned().collect();
155
156 for client_id in client_ids {
157 let client = self.clients.get_mut(&client_id).unwrap();
158 let Some(sub) = client.subscriptions.sessions.get_mut(session_id) else {
159 continue;
160 };
161
162 if sub.paused {
163 continue;
164 }
165
166 if !matches_filter(&sub.filter, event) {
167 sub.event_index += 1;
168 continue;
169 }
170
171 let hit_bp = check_breakpoints(&sub.breakpoints, event);
173 let idx = sub.event_index;
174 sub.event_index += 1;
175
176 if let Some(bp) = hit_bp {
177 sub.paused = true;
178 send(
179 &*client.sink,
180 DebugResponse::BreakpointHit {
181 session_id: session_id.to_string(),
182 breakpoint_id: bp.id.clone(),
183 event: event_info.clone(),
184 event_index: idx,
185 },
186 );
187 }
188
189 send(
190 &*client.sink,
191 DebugResponse::Event {
192 session_id: session_id.to_string(),
193 index: idx,
194 event: event_info.clone(),
195 },
196 );
197 }
198 }
199
200 fn handle_list_sessions(
203 &self,
204 client_id: &str,
205 limit: Option<usize>,
206 active_only: Option<bool>,
207 ) -> Result<(), String> {
208 let limit = limit.unwrap_or(50);
209 let sessions = if active_only.unwrap_or(false) {
210 self.session_registry.list_active_sessions(limit)
211 } else {
212 self.session_registry.list_sessions(limit)
213 };
214
215 let summaries: Vec<SessionSummary> = sessions.iter().map(|s| session_summary(s)).collect();
216
217 send_to(
218 &self.clients,
219 client_id,
220 DebugResponse::SessionList {
221 sessions: summaries,
222 },
223 );
224 Ok(())
225 }
226
227 fn handle_subscribe(
228 &mut self,
229 client_id: &str,
230 session_id: String,
231 mode: crate::debug_command::SubscriptionMode,
232 from_index: Option<usize>,
233 ) -> Result<(), String> {
234 let session = self
235 .session_registry
236 .get_session(&session_id)
237 .ok_or_else(|| format!("Session not found: {session_id}"))?;
238
239 let events = session.event_store.events();
240 let computed = compute_state(&events);
241 let structure = build_net_structure(session);
242 let from_index = from_index.unwrap_or(0);
243
244 let mode_str = match mode {
245 crate::debug_command::SubscriptionMode::Live => "live",
246 crate::debug_command::SubscriptionMode::Replay => "replay",
247 };
248
249 let current_marking = computed
250 .marking
251 .iter()
252 .map(|(k, v)| (k.clone(), v.clone()))
253 .collect();
254
255 let client = self.clients.get(client_id).unwrap();
256 send(
257 &*client.sink,
258 DebugResponse::Subscribed {
259 session_id: session_id.clone(),
260 net_name: session.net_name.clone(),
261 dot_diagram: session.dot_diagram.clone(),
262 structure,
263 current_marking,
264 enabled_transitions: computed.enabled_transitions.clone(),
265 in_flight_transitions: computed.in_flight_transitions.clone(),
266 event_count: session.event_store.event_count(),
267 mode: mode_str.into(),
268 },
269 );
270
271 let historical = session.event_store.events_from(from_index);
273 let converted: Vec<NetEventInfo> = historical.iter().map(|e| to_event_info(e)).collect();
274 send_in_batches(
275 &self.clients,
276 client_id,
277 &session_id,
278 from_index,
279 &converted,
280 );
281
282 let event_index = from_index + historical.len();
283 let paused = matches!(mode, crate::debug_command::SubscriptionMode::Replay);
284
285 let client = self.clients.get_mut(client_id).unwrap();
286 client
287 .subscriptions
288 .add_subscription(session_id, event_index, paused);
289
290 Ok(())
291 }
292
293 fn handle_unsubscribe(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
294 if let Some(client) = self.clients.get_mut(client_id) {
295 client.subscriptions.cancel(&session_id);
296 }
297 send_to(
298 &self.clients,
299 client_id,
300 DebugResponse::Unsubscribed { session_id },
301 );
302 Ok(())
303 }
304
305 fn handle_seek(
306 &mut self,
307 client_id: &str,
308 session_id: String,
309 timestamp: String,
310 ) -> Result<(), String> {
311 let session = self
312 .session_registry
313 .get_session(&session_id)
314 .ok_or("Session not found")?;
315
316 let events = session.event_store.events();
317 let target_ts: u64 = timestamp.parse().unwrap_or(0);
318
319 let mut target_index = events.len();
320 for (i, e) in events.iter().enumerate() {
321 if e.timestamp() >= target_ts {
322 target_index = i;
323 break;
324 }
325 }
326
327 let client = self.clients.get_mut(client_id).unwrap();
328 client
329 .subscriptions
330 .set_event_index(&session_id, target_index);
331 let computed = client
332 .subscriptions
333 .compute_state_at(&events, &session_id, target_index);
334
335 send(
336 &*client.sink,
337 DebugResponse::MarkingSnapshot {
338 session_id,
339 marking: computed.marking,
340 enabled_transitions: computed.enabled_transitions,
341 in_flight_transitions: computed.in_flight_transitions,
342 },
343 );
344 Ok(())
345 }
346
347 fn handle_playback_speed(
348 &mut self,
349 client_id: &str,
350 session_id: String,
351 speed: f64,
352 ) -> Result<(), String> {
353 let client = self.clients.get_mut(client_id).unwrap();
354 client.subscriptions.set_speed(&session_id, speed);
355 let paused = client.subscriptions.is_paused(&session_id);
356 let current_index = client.subscriptions.get_event_index(&session_id);
357 send(
358 &*client.sink,
359 DebugResponse::PlaybackStateChanged {
360 session_id,
361 paused,
362 speed,
363 current_index,
364 },
365 );
366 Ok(())
367 }
368
369 fn handle_set_filter(
370 &mut self,
371 client_id: &str,
372 session_id: String,
373 filter: EventFilter,
374 ) -> Result<(), String> {
375 let client = self.clients.get_mut(client_id).unwrap();
376 client.subscriptions.set_filter(&session_id, filter.clone());
377 send(
378 &*client.sink,
379 DebugResponse::FilterApplied { session_id, filter },
380 );
381 Ok(())
382 }
383
384 fn handle_pause(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
385 let client = self.clients.get_mut(client_id).unwrap();
386 client.subscriptions.set_paused(&session_id, true);
387 let speed = client.subscriptions.get_speed(&session_id);
388 let current_index = client.subscriptions.get_event_index(&session_id);
389 send(
390 &*client.sink,
391 DebugResponse::PlaybackStateChanged {
392 session_id,
393 paused: true,
394 speed,
395 current_index,
396 },
397 );
398 Ok(())
399 }
400
401 fn handle_resume(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
402 let client = self.clients.get_mut(client_id).unwrap();
403 client.subscriptions.set_paused(&session_id, false);
404 let speed = client.subscriptions.get_speed(&session_id);
405 let current_index = client.subscriptions.get_event_index(&session_id);
406 send(
407 &*client.sink,
408 DebugResponse::PlaybackStateChanged {
409 session_id,
410 paused: false,
411 speed,
412 current_index,
413 },
414 );
415 Ok(())
416 }
417
418 fn handle_step_forward(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
419 let session = self
420 .session_registry
421 .get_session(&session_id)
422 .ok_or("Session not found")?;
423
424 let events = session.event_store.events();
425 let client = self.clients.get_mut(client_id).unwrap();
426 let current_index = client.subscriptions.get_event_index(&session_id);
427
428 if current_index < events.len() {
429 let event_info = to_event_info(&events[current_index]);
430 send(
431 &*client.sink,
432 DebugResponse::Event {
433 session_id: session_id.clone(),
434 index: current_index,
435 event: event_info,
436 },
437 );
438 client
439 .subscriptions
440 .set_event_index(&session_id, current_index + 1);
441 }
442 Ok(())
443 }
444
445 fn handle_step_backward(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
446 let session = self
447 .session_registry
448 .get_session(&session_id)
449 .ok_or("Session not found")?;
450
451 let events = session.event_store.events();
452 let client = self.clients.get_mut(client_id).unwrap();
453 let current_index = client.subscriptions.get_event_index(&session_id);
454
455 if current_index > 0 {
456 let new_index = current_index - 1;
457 client.subscriptions.set_event_index(&session_id, new_index);
458 let computed = client
459 .subscriptions
460 .compute_state_at(&events, &session_id, new_index);
461
462 send(
463 &*client.sink,
464 DebugResponse::MarkingSnapshot {
465 session_id,
466 marking: computed.marking,
467 enabled_transitions: computed.enabled_transitions,
468 in_flight_transitions: computed.in_flight_transitions,
469 },
470 );
471 }
472 Ok(())
473 }
474
475 fn handle_set_breakpoint(
476 &mut self,
477 client_id: &str,
478 session_id: String,
479 breakpoint: BreakpointConfig,
480 ) -> Result<(), String> {
481 let client = self.clients.get_mut(client_id).unwrap();
482 client
483 .subscriptions
484 .add_breakpoint(&session_id, breakpoint.clone());
485 send(
486 &*client.sink,
487 DebugResponse::BreakpointSet {
488 session_id,
489 breakpoint,
490 },
491 );
492 Ok(())
493 }
494
495 fn handle_clear_breakpoint(
496 &mut self,
497 client_id: &str,
498 session_id: String,
499 breakpoint_id: String,
500 ) -> Result<(), String> {
501 let client = self.clients.get_mut(client_id).unwrap();
502 client
503 .subscriptions
504 .remove_breakpoint(&session_id, &breakpoint_id);
505 send(
506 &*client.sink,
507 DebugResponse::BreakpointCleared {
508 session_id,
509 breakpoint_id,
510 },
511 );
512 Ok(())
513 }
514
515 fn handle_list_breakpoints(&self, client_id: &str, session_id: String) -> Result<(), String> {
516 let client = self.clients.get(client_id).unwrap();
517 let breakpoints = client.subscriptions.get_breakpoints(&session_id);
518 send(
519 &*client.sink,
520 DebugResponse::BreakpointList {
521 session_id,
522 breakpoints,
523 },
524 );
525 Ok(())
526 }
527}
528
529fn send(sink: &dyn ResponseSink, response: DebugResponse) {
532 sink.send(response);
533}
534
535fn send_to(clients: &HashMap<String, ClientState>, client_id: &str, response: DebugResponse) {
536 if let Some(client) = clients.get(client_id) {
537 send(&*client.sink, response);
538 }
539}
540
541fn send_in_batches(
542 clients: &HashMap<String, ClientState>,
543 client_id: &str,
544 session_id: &str,
545 start_index: usize,
546 events: &[NetEventInfo],
547) {
548 let Some(client) = clients.get(client_id) else {
549 return;
550 };
551
552 if events.is_empty() {
553 send(
554 &*client.sink,
555 DebugResponse::EventBatch {
556 session_id: session_id.to_string(),
557 start_index,
558 events: vec![],
559 has_more: false,
560 },
561 );
562 return;
563 }
564
565 for (i, chunk) in events.chunks(BATCH_SIZE).enumerate() {
566 let chunk_start = start_index + i * BATCH_SIZE;
567 let has_more = chunk_start + chunk.len() < start_index + events.len();
568 send(
569 &*client.sink,
570 DebugResponse::EventBatch {
571 session_id: session_id.to_string(),
572 start_index: chunk_start,
573 events: chunk.to_vec(),
574 has_more,
575 },
576 );
577 }
578}
579
580fn session_summary(session: &DebugSession) -> SessionSummary {
581 SessionSummary {
582 session_id: session.session_id.clone(),
583 net_name: session.net_name.clone(),
584 start_time: session.start_time.to_string(),
585 active: session.active,
586 event_count: session.event_store.event_count(),
587 }
588}
589
590fn matches_filter(filter: &Option<EventFilter>, event: &NetEvent) -> bool {
591 let Some(filter) = filter else { return true };
592
593 if let Some(ref types) = filter.event_types {
594 if !types.is_empty() {
595 let name = event_type_name(event);
596 if !types.iter().any(|t| t == name) {
597 return false;
598 }
599 }
600 }
601
602 if let Some(ref names) = filter.transition_names {
603 if !names.is_empty() {
604 let t_name = extract_transition_name(event);
605 match t_name {
606 Some(n) => {
607 if !names.iter().any(|t| t == n) {
608 return false;
609 }
610 }
611 None => return false,
612 }
613 }
614 }
615
616 if let Some(ref names) = filter.place_names {
617 if !names.is_empty() {
618 let p_name = extract_place_name(event);
619 match p_name {
620 Some(n) => {
621 if !names.iter().any(|t| t == n) {
622 return false;
623 }
624 }
625 None => return false,
626 }
627 }
628 }
629
630 true
631}
632
633fn matches_breakpoint(bp: &BreakpointConfig, event: &NetEvent) -> bool {
634 if !bp.enabled {
635 return false;
636 }
637 match bp.bp_type {
638 BreakpointType::TransitionEnabled => {
639 matches!(event, NetEvent::TransitionEnabled { transition_name, .. }
640 if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
641 }
642 BreakpointType::TransitionStart => {
643 matches!(event, NetEvent::TransitionStarted { transition_name, .. }
644 if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
645 }
646 BreakpointType::TransitionComplete => {
647 matches!(event, NetEvent::TransitionCompleted { transition_name, .. }
648 if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
649 }
650 BreakpointType::TransitionFail => {
651 matches!(event, NetEvent::TransitionFailed { transition_name, .. }
652 if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
653 }
654 BreakpointType::TokenAdded => {
655 matches!(event, NetEvent::TokenAdded { place_name, .. }
656 if bp.target.as_ref().is_none_or(|t| t == place_name.as_ref()))
657 }
658 BreakpointType::TokenRemoved => {
659 matches!(event, NetEvent::TokenRemoved { place_name, .. }
660 if bp.target.as_ref().is_none_or(|t| t == place_name.as_ref()))
661 }
662 }
663}
664
665fn check_breakpoints(
666 breakpoints: &HashMap<String, BreakpointConfig>,
667 event: &NetEvent,
668) -> Option<BreakpointConfig> {
669 for bp in breakpoints.values() {
670 if matches_breakpoint(bp, event) {
671 return Some(bp.clone());
672 }
673 }
674 None
675}
676
677struct SessionSubscription {
680 event_index: usize,
681 marking_cache: MarkingCache,
682 breakpoints: HashMap<String, BreakpointConfig>,
683 paused: bool,
684 speed: f64,
685 filter: Option<EventFilter>,
686}
687
688struct SubscriptionState {
689 sessions: HashMap<String, SessionSubscription>,
690}
691
692impl SubscriptionState {
693 fn new() -> Self {
694 Self {
695 sessions: HashMap::new(),
696 }
697 }
698
699 fn add_subscription(&mut self, session_id: String, event_index: usize, paused: bool) {
700 self.sessions.insert(
701 session_id,
702 SessionSubscription {
703 event_index,
704 marking_cache: MarkingCache::new(),
705 breakpoints: HashMap::new(),
706 paused,
707 speed: 1.0,
708 filter: None,
709 },
710 );
711 }
712
713 fn cancel(&mut self, session_id: &str) {
714 self.sessions.remove(session_id);
715 }
716
717 fn is_paused(&self, session_id: &str) -> bool {
718 self.sessions.get(session_id).is_some_and(|s| s.paused)
719 }
720
721 fn set_paused(&mut self, session_id: &str, paused: bool) {
722 if let Some(sub) = self.sessions.get_mut(session_id) {
723 sub.paused = paused;
724 }
725 }
726
727 fn get_speed(&self, session_id: &str) -> f64 {
728 self.sessions.get(session_id).map_or(1.0, |s| s.speed)
729 }
730
731 fn set_speed(&mut self, session_id: &str, speed: f64) {
732 if let Some(sub) = self.sessions.get_mut(session_id) {
733 sub.speed = speed;
734 }
735 }
736
737 fn get_event_index(&self, session_id: &str) -> usize {
738 self.sessions.get(session_id).map_or(0, |s| s.event_index)
739 }
740
741 fn set_event_index(&mut self, session_id: &str, index: usize) {
742 if let Some(sub) = self.sessions.get_mut(session_id) {
743 sub.event_index = index;
744 }
745 }
746
747 fn compute_state_at(
748 &mut self,
749 events: &[NetEvent],
750 session_id: &str,
751 target_index: usize,
752 ) -> crate::marking_cache::ComputedState {
753 if let Some(sub) = self.sessions.get_mut(session_id) {
754 sub.marking_cache.compute_at(events, target_index)
755 } else {
756 compute_state(&events[..target_index.min(events.len())])
757 }
758 }
759
760 fn set_filter(&mut self, session_id: &str, filter: EventFilter) {
761 if let Some(sub) = self.sessions.get_mut(session_id) {
762 sub.filter = Some(filter);
763 }
764 }
765
766 fn add_breakpoint(&mut self, session_id: &str, breakpoint: BreakpointConfig) {
767 if let Some(sub) = self.sessions.get_mut(session_id) {
768 sub.breakpoints.insert(breakpoint.id.clone(), breakpoint);
769 }
770 }
771
772 fn remove_breakpoint(&mut self, session_id: &str, breakpoint_id: &str) {
773 if let Some(sub) = self.sessions.get_mut(session_id) {
774 sub.breakpoints.remove(breakpoint_id);
775 }
776 }
777
778 fn get_breakpoints(&self, session_id: &str) -> Vec<BreakpointConfig> {
779 self.sessions
780 .get(session_id)
781 .map_or_else(Vec::new, |s| s.breakpoints.values().cloned().collect())
782 }
783}
784
785#[cfg(test)]
786mod tests {
787 use super::*;
788 use crate::debug_event_store::DebugEventStore;
789 use std::sync::{Arc, Mutex};
790
791 fn make_handler_with_net() -> (DebugProtocolHandler, Arc<DebugEventStore>) {
792 use libpetri_core::input::one;
793 use libpetri_core::output::out_place;
794 use libpetri_core::place::Place;
795 use libpetri_core::transition::Transition;
796
797 let p1 = Place::<i32>::new("p1");
798 let p2 = Place::<i32>::new("p2");
799 let t = Transition::builder("t1")
800 .input(one(&p1))
801 .output(out_place(&p2))
802 .build();
803 let net = libpetri_core::petri_net::PetriNet::builder("test")
804 .transition(t)
805 .build();
806
807 let mut registry = DebugSessionRegistry::new();
808 let store = registry.register("s1".into(), &net);
809 let handler = DebugProtocolHandler::new(registry);
810 (handler, store)
811 }
812
813 fn collector_sink() -> (Box<dyn ResponseSink>, Arc<Mutex<Vec<DebugResponse>>>) {
814 let collected = Arc::new(Mutex::new(Vec::new()));
815 let collected_clone = Arc::clone(&collected);
816 let sink: Box<dyn ResponseSink> = Box::new(move |resp: DebugResponse| {
817 collected_clone.lock().unwrap().push(resp);
818 });
819 (sink, collected)
820 }
821
822 #[test]
823 fn list_sessions() {
824 let (mut handler, _store) = make_handler_with_net();
825 let (sink, collected) = collector_sink();
826 handler.client_connected("c1".into(), sink);
827
828 handler.handle_command(
829 "c1",
830 DebugCommand::ListSessions {
831 limit: None,
832 active_only: None,
833 },
834 );
835
836 let responses = collected.lock().unwrap();
837 assert_eq!(responses.len(), 1);
838 match &responses[0] {
839 DebugResponse::SessionList { sessions } => {
840 assert_eq!(sessions.len(), 1);
841 assert_eq!(sessions[0].net_name, "test");
842 }
843 _ => panic!("expected SessionList"),
844 }
845 }
846
847 #[test]
848 fn subscribe_and_unsubscribe() {
849 let (mut handler, _store) = make_handler_with_net();
850 let (sink, collected) = collector_sink();
851 handler.client_connected("c1".into(), sink);
852
853 handler.handle_command(
854 "c1",
855 DebugCommand::Subscribe {
856 session_id: "s1".into(),
857 mode: crate::debug_command::SubscriptionMode::Live,
858 from_index: None,
859 },
860 );
861
862 {
863 let responses = collected.lock().unwrap();
864 assert!(responses.len() >= 1);
865 match &responses[0] {
866 DebugResponse::Subscribed {
867 session_id,
868 net_name,
869 ..
870 } => {
871 assert_eq!(session_id, "s1");
872 assert_eq!(net_name, "test");
873 }
874 _ => panic!("expected Subscribed"),
875 }
876 }
877
878 handler.handle_command(
879 "c1",
880 DebugCommand::Unsubscribe {
881 session_id: "s1".into(),
882 },
883 );
884
885 let responses = collected.lock().unwrap();
886 let last = responses.last().unwrap();
887 match last {
888 DebugResponse::Unsubscribed { session_id } => {
889 assert_eq!(session_id, "s1");
890 }
891 _ => panic!("expected Unsubscribed"),
892 }
893 }
894
895 #[test]
896 fn subscribe_to_nonexistent_session() {
897 let (mut handler, _store) = make_handler_with_net();
898 let (sink, collected) = collector_sink();
899 handler.client_connected("c1".into(), sink);
900
901 handler.handle_command(
902 "c1",
903 DebugCommand::Subscribe {
904 session_id: "nonexistent".into(),
905 mode: crate::debug_command::SubscriptionMode::Live,
906 from_index: None,
907 },
908 );
909
910 let responses = collected.lock().unwrap();
911 match &responses[0] {
912 DebugResponse::Error { code, .. } => assert_eq!(code, "COMMAND_ERROR"),
913 _ => panic!("expected Error"),
914 }
915 }
916
917 #[test]
918 fn pause_and_resume() {
919 let (mut handler, _store) = make_handler_with_net();
920 let (sink, collected) = collector_sink();
921 handler.client_connected("c1".into(), sink);
922
923 handler.handle_command(
924 "c1",
925 DebugCommand::Subscribe {
926 session_id: "s1".into(),
927 mode: crate::debug_command::SubscriptionMode::Live,
928 from_index: None,
929 },
930 );
931
932 handler.handle_command(
933 "c1",
934 DebugCommand::Pause {
935 session_id: "s1".into(),
936 },
937 );
938
939 let responses = collected.lock().unwrap();
940 let pause_resp = responses
941 .iter()
942 .find(|r| matches!(r, DebugResponse::PlaybackStateChanged { paused: true, .. }));
943 assert!(pause_resp.is_some());
944 }
945
946 #[test]
947 fn set_and_list_breakpoints() {
948 let (mut handler, _store) = make_handler_with_net();
949 let (sink, collected) = collector_sink();
950 handler.client_connected("c1".into(), sink);
951
952 handler.handle_command(
953 "c1",
954 DebugCommand::Subscribe {
955 session_id: "s1".into(),
956 mode: crate::debug_command::SubscriptionMode::Live,
957 from_index: None,
958 },
959 );
960
961 handler.handle_command(
962 "c1",
963 DebugCommand::SetBreakpoint {
964 session_id: "s1".into(),
965 breakpoint: BreakpointConfig {
966 id: "bp1".into(),
967 bp_type: BreakpointType::TransitionStart,
968 target: Some("t1".into()),
969 enabled: true,
970 },
971 },
972 );
973
974 handler.handle_command(
975 "c1",
976 DebugCommand::ListBreakpoints {
977 session_id: "s1".into(),
978 },
979 );
980
981 let responses = collected.lock().unwrap();
982 let bp_list = responses
983 .iter()
984 .find(|r| matches!(r, DebugResponse::BreakpointList { .. }));
985 match bp_list.unwrap() {
986 DebugResponse::BreakpointList { breakpoints, .. } => {
987 assert_eq!(breakpoints.len(), 1);
988 assert_eq!(breakpoints[0].id, "bp1");
989 }
990 _ => unreachable!(),
991 }
992 }
993
994 #[test]
995 fn broadcast_event_to_subscribers() {
996 let (mut handler, store) = make_handler_with_net();
997 let (sink, collected) = collector_sink();
998 handler.client_connected("c1".into(), sink);
999
1000 handler.handle_command(
1001 "c1",
1002 DebugCommand::Subscribe {
1003 session_id: "s1".into(),
1004 mode: crate::debug_command::SubscriptionMode::Live,
1005 from_index: None,
1006 },
1007 );
1008
1009 let event = NetEvent::TransitionStarted {
1010 transition_name: Arc::from("t1"),
1011 timestamp: 1000,
1012 };
1013 store.append(event.clone());
1014 handler.broadcast_event("s1", &event);
1015
1016 let responses = collected.lock().unwrap();
1017 let event_resp = responses
1018 .iter()
1019 .find(|r| matches!(r, DebugResponse::Event { .. }));
1020 assert!(event_resp.is_some());
1021 }
1022
1023 #[test]
1024 fn filter_matching() {
1025 let event = NetEvent::TransitionStarted {
1026 transition_name: Arc::from("t1"),
1027 timestamp: 0,
1028 };
1029
1030 assert!(matches_filter(&None, &event));
1032
1033 let filter = EventFilter {
1035 event_types: Some(vec!["TransitionStarted".into()]),
1036 transition_names: None,
1037 place_names: None,
1038 };
1039 assert!(matches_filter(&Some(filter), &event));
1040
1041 let filter = EventFilter {
1043 event_types: Some(vec!["TokenAdded".into()]),
1044 transition_names: None,
1045 place_names: None,
1046 };
1047 assert!(!matches_filter(&Some(filter), &event));
1048
1049 let filter = EventFilter {
1051 event_types: None,
1052 transition_names: Some(vec!["t1".into()]),
1053 place_names: None,
1054 };
1055 assert!(matches_filter(&Some(filter), &event));
1056
1057 let filter = EventFilter {
1058 event_types: None,
1059 transition_names: Some(vec!["t2".into()]),
1060 place_names: None,
1061 };
1062 assert!(!matches_filter(&Some(filter), &event));
1063 }
1064
1065 #[test]
1066 fn breakpoint_matching() {
1067 let event = NetEvent::TransitionStarted {
1068 transition_name: Arc::from("t1"),
1069 timestamp: 0,
1070 };
1071
1072 let bp = BreakpointConfig {
1073 id: "bp1".into(),
1074 bp_type: BreakpointType::TransitionStart,
1075 target: Some("t1".into()),
1076 enabled: true,
1077 };
1078 assert!(matches_breakpoint(&bp, &event));
1079
1080 let bp_disabled = BreakpointConfig {
1082 id: "bp2".into(),
1083 bp_type: BreakpointType::TransitionStart,
1084 target: Some("t1".into()),
1085 enabled: false,
1086 };
1087 assert!(!matches_breakpoint(&bp_disabled, &event));
1088
1089 let bp_wrong = BreakpointConfig {
1091 id: "bp3".into(),
1092 bp_type: BreakpointType::TransitionStart,
1093 target: Some("t2".into()),
1094 enabled: true,
1095 };
1096 assert!(!matches_breakpoint(&bp_wrong, &event));
1097
1098 let bp_wild = BreakpointConfig {
1100 id: "bp4".into(),
1101 bp_type: BreakpointType::TransitionStart,
1102 target: None,
1103 enabled: true,
1104 };
1105 assert!(matches_breakpoint(&bp_wild, &event));
1106 }
1107
1108 #[test]
1109 fn client_disconnect_cleanup() {
1110 let (mut handler, _store) = make_handler_with_net();
1111 let (sink, _collected) = collector_sink();
1112 handler.client_connected("c1".into(), sink);
1113 handler.client_disconnected("c1");
1114 assert!(handler.clients.is_empty());
1115 }
1116
1117 #[test]
1118 fn step_forward_and_backward() {
1119 let (mut handler, store) = make_handler_with_net();
1120 let (sink, collected) = collector_sink();
1121 handler.client_connected("c1".into(), sink);
1122
1123 for i in 0..5 {
1125 store.append(NetEvent::TokenAdded {
1126 place_name: Arc::from("p1"),
1127 timestamp: i,
1128 });
1129 }
1130
1131 handler.handle_command(
1132 "c1",
1133 DebugCommand::Subscribe {
1134 session_id: "s1".into(),
1135 mode: crate::debug_command::SubscriptionMode::Replay,
1136 from_index: Some(0),
1137 },
1138 );
1139
1140 handler.handle_command(
1142 "c1",
1143 DebugCommand::StepForward {
1144 session_id: "s1".into(),
1145 },
1146 );
1147
1148 handler.handle_command(
1150 "c1",
1151 DebugCommand::StepBackward {
1152 session_id: "s1".into(),
1153 },
1154 );
1155
1156 let responses = collected.lock().unwrap();
1157 assert!(responses.len() >= 3); }
1159}