Skip to main content

libpetri_debug/
debug_protocol_handler.rs

1//! Framework-agnostic handler for the Petri net debug protocol.
2//!
3//! Manages debug subscriptions, event filtering, breakpoints, and replay
4//! for connected clients. Decoupled from any specific WebSocket framework
5//! via the [`ResponseSink`] trait.
6
7use std::collections::HashMap;
8
9use libpetri_event::net_event::NetEvent;
10
11use crate::debug_command::{BreakpointConfig, BreakpointType, DebugCommand, EventFilter};
12use crate::debug_response::{DebugResponse, NetEventInfo, SessionSummary};
13use crate::debug_session_registry::{DebugSession, DebugSessionRegistry, build_net_structure};
14use crate::marking_cache::{MarkingCache, compute_state};
15use crate::net_event_converter::{
16    event_type_name, extract_place_name, extract_transition_name, to_event_info,
17};
18
19/// Callback for sending responses to a connected client.
20pub trait ResponseSink: Send + Sync {
21    fn send(&self, response: DebugResponse);
22}
23
24/// Blanket impl for closures.
25impl<F: Fn(DebugResponse) + Send + Sync> ResponseSink for F {
26    fn send(&self, response: DebugResponse) {
27        self(response);
28    }
29}
30
31/// Maximum events per batch when sending historical events.
32const BATCH_SIZE: usize = 500;
33
34/// Debug protocol handler managing client connections and command dispatch.
35pub struct DebugProtocolHandler {
36    session_registry: DebugSessionRegistry,
37    clients: HashMap<String, ClientState>,
38}
39
40struct ClientState {
41    sink: Box<dyn ResponseSink>,
42    subscriptions: SubscriptionState,
43}
44
45impl DebugProtocolHandler {
46    /// Creates a new protocol handler.
47    pub fn new(session_registry: DebugSessionRegistry) -> Self {
48        Self {
49            session_registry,
50            clients: HashMap::new(),
51        }
52    }
53
54    /// Returns a reference to the session registry.
55    pub fn session_registry(&self) -> &DebugSessionRegistry {
56        &self.session_registry
57    }
58
59    /// Returns a mutable reference to the session registry.
60    pub fn session_registry_mut(&mut self) -> &mut DebugSessionRegistry {
61        &mut self.session_registry
62    }
63
64    /// Registers a new client connection.
65    pub fn client_connected(&mut self, client_id: String, sink: Box<dyn ResponseSink>) {
66        self.clients.insert(
67            client_id,
68            ClientState {
69                sink,
70                subscriptions: SubscriptionState::new(),
71            },
72        );
73    }
74
75    /// Cleans up when a client disconnects.
76    pub fn client_disconnected(&mut self, client_id: &str) {
77        self.clients.remove(client_id);
78    }
79
80    /// Handles a command from a connected client.
81    pub fn handle_command(&mut self, client_id: &str, command: DebugCommand) {
82        if !self.clients.contains_key(client_id) {
83            return;
84        }
85
86        let result = match command {
87            DebugCommand::ListSessions { limit, active_only } => {
88                self.handle_list_sessions(client_id, limit, active_only)
89            }
90            DebugCommand::Subscribe {
91                session_id,
92                mode,
93                from_index,
94            } => self.handle_subscribe(client_id, session_id, mode, from_index),
95            DebugCommand::Unsubscribe { session_id } => {
96                self.handle_unsubscribe(client_id, session_id)
97            }
98            DebugCommand::Seek {
99                session_id,
100                timestamp,
101            } => self.handle_seek(client_id, session_id, timestamp),
102            DebugCommand::PlaybackSpeed { session_id, speed } => {
103                self.handle_playback_speed(client_id, session_id, speed)
104            }
105            DebugCommand::Filter { session_id, filter } => {
106                self.handle_set_filter(client_id, session_id, filter)
107            }
108            DebugCommand::Pause { session_id } => self.handle_pause(client_id, session_id),
109            DebugCommand::Resume { session_id } => self.handle_resume(client_id, session_id),
110            DebugCommand::StepForward { session_id } => {
111                self.handle_step_forward(client_id, session_id)
112            }
113            DebugCommand::StepBackward { session_id } => {
114                self.handle_step_backward(client_id, session_id)
115            }
116            DebugCommand::SetBreakpoint {
117                session_id,
118                breakpoint,
119            } => self.handle_set_breakpoint(client_id, session_id, breakpoint),
120            DebugCommand::ClearBreakpoint {
121                session_id,
122                breakpoint_id,
123            } => self.handle_clear_breakpoint(client_id, session_id, breakpoint_id),
124            DebugCommand::ListBreakpoints { session_id } => {
125                self.handle_list_breakpoints(client_id, session_id)
126            }
127            DebugCommand::ListArchives { .. }
128            | DebugCommand::ImportArchive { .. }
129            | DebugCommand::UploadArchive { .. } => {
130                // Archive commands not yet implemented
131                Ok(())
132            }
133        };
134
135        if let Err(e) = result {
136            if let Some(client) = self.clients.get(client_id) {
137                send(
138                    &*client.sink,
139                    DebugResponse::Error {
140                        code: "COMMAND_ERROR".into(),
141                        message: e,
142                        session_id: None,
143                    },
144                );
145            }
146        }
147    }
148
149    /// Delivers a live event to all subscribed clients for the given session.
150    pub fn broadcast_event(&mut self, session_id: &str, event: &NetEvent) {
151        let event_info = to_event_info(event);
152
153        // Collect client IDs to avoid borrow issues
154        let client_ids: Vec<String> = self.clients.keys().cloned().collect();
155
156        for client_id in client_ids {
157            let client = self.clients.get_mut(&client_id).unwrap();
158            let Some(sub) = client.subscriptions.sessions.get_mut(session_id) else {
159                continue;
160            };
161
162            if sub.paused {
163                continue;
164            }
165
166            if !matches_filter(&sub.filter, event) {
167                sub.event_index += 1;
168                continue;
169            }
170
171            // Check breakpoints
172            let hit_bp = check_breakpoints(&sub.breakpoints, event);
173            let idx = sub.event_index;
174            sub.event_index += 1;
175
176            if let Some(bp) = hit_bp {
177                sub.paused = true;
178                send(
179                    &*client.sink,
180                    DebugResponse::BreakpointHit {
181                        session_id: session_id.to_string(),
182                        breakpoint_id: bp.id.clone(),
183                        event: event_info.clone(),
184                        event_index: idx,
185                    },
186                );
187            }
188
189            send(
190                &*client.sink,
191                DebugResponse::Event {
192                    session_id: session_id.to_string(),
193                    index: idx,
194                    event: event_info.clone(),
195                },
196            );
197        }
198    }
199
200    // ======================== Command Handlers ========================
201
202    fn handle_list_sessions(
203        &self,
204        client_id: &str,
205        limit: Option<usize>,
206        active_only: Option<bool>,
207    ) -> Result<(), String> {
208        let limit = limit.unwrap_or(50);
209        let sessions = if active_only.unwrap_or(false) {
210            self.session_registry.list_active_sessions(limit)
211        } else {
212            self.session_registry.list_sessions(limit)
213        };
214
215        let summaries: Vec<SessionSummary> = sessions.iter().map(|s| session_summary(s)).collect();
216
217        send_to(
218            &self.clients,
219            client_id,
220            DebugResponse::SessionList {
221                sessions: summaries,
222            },
223        );
224        Ok(())
225    }
226
227    fn handle_subscribe(
228        &mut self,
229        client_id: &str,
230        session_id: String,
231        mode: crate::debug_command::SubscriptionMode,
232        from_index: Option<usize>,
233    ) -> Result<(), String> {
234        let session = self
235            .session_registry
236            .get_session(&session_id)
237            .ok_or_else(|| format!("Session not found: {session_id}"))?;
238
239        let events = session.event_store.events();
240        let computed = compute_state(&events);
241        let structure = build_net_structure(session);
242        let from_index = from_index.unwrap_or(0);
243
244        let mode_str = match mode {
245            crate::debug_command::SubscriptionMode::Live => "live",
246            crate::debug_command::SubscriptionMode::Replay => "replay",
247        };
248
249        let current_marking = computed
250            .marking
251            .iter()
252            .map(|(k, v)| (k.clone(), v.clone()))
253            .collect();
254
255        let client = self.clients.get(client_id).unwrap();
256        send(
257            &*client.sink,
258            DebugResponse::Subscribed {
259                session_id: session_id.clone(),
260                net_name: session.net_name.clone(),
261                dot_diagram: session.dot_diagram.clone(),
262                structure,
263                current_marking,
264                enabled_transitions: computed.enabled_transitions.clone(),
265                in_flight_transitions: computed.in_flight_transitions.clone(),
266                event_count: session.event_store.event_count(),
267                mode: mode_str.into(),
268            },
269        );
270
271        // Send historical events
272        let historical = session.event_store.events_from(from_index);
273        let converted: Vec<NetEventInfo> = historical.iter().map(|e| to_event_info(e)).collect();
274        send_in_batches(
275            &self.clients,
276            client_id,
277            &session_id,
278            from_index,
279            &converted,
280        );
281
282        let event_index = from_index + historical.len();
283        let paused = matches!(mode, crate::debug_command::SubscriptionMode::Replay);
284
285        let client = self.clients.get_mut(client_id).unwrap();
286        client
287            .subscriptions
288            .add_subscription(session_id, event_index, paused);
289
290        Ok(())
291    }
292
293    fn handle_unsubscribe(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
294        if let Some(client) = self.clients.get_mut(client_id) {
295            client.subscriptions.cancel(&session_id);
296        }
297        send_to(
298            &self.clients,
299            client_id,
300            DebugResponse::Unsubscribed { session_id },
301        );
302        Ok(())
303    }
304
305    fn handle_seek(
306        &mut self,
307        client_id: &str,
308        session_id: String,
309        timestamp: String,
310    ) -> Result<(), String> {
311        let session = self
312            .session_registry
313            .get_session(&session_id)
314            .ok_or("Session not found")?;
315
316        let events = session.event_store.events();
317        let target_ts: u64 = timestamp.parse().unwrap_or(0);
318
319        let mut target_index = events.len();
320        for (i, e) in events.iter().enumerate() {
321            if e.timestamp() >= target_ts {
322                target_index = i;
323                break;
324            }
325        }
326
327        let client = self.clients.get_mut(client_id).unwrap();
328        client
329            .subscriptions
330            .set_event_index(&session_id, target_index);
331        let computed = client
332            .subscriptions
333            .compute_state_at(&events, &session_id, target_index);
334
335        send(
336            &*client.sink,
337            DebugResponse::MarkingSnapshot {
338                session_id,
339                marking: computed.marking,
340                enabled_transitions: computed.enabled_transitions,
341                in_flight_transitions: computed.in_flight_transitions,
342            },
343        );
344        Ok(())
345    }
346
347    fn handle_playback_speed(
348        &mut self,
349        client_id: &str,
350        session_id: String,
351        speed: f64,
352    ) -> Result<(), String> {
353        let client = self.clients.get_mut(client_id).unwrap();
354        client.subscriptions.set_speed(&session_id, speed);
355        let paused = client.subscriptions.is_paused(&session_id);
356        let current_index = client.subscriptions.get_event_index(&session_id);
357        send(
358            &*client.sink,
359            DebugResponse::PlaybackStateChanged {
360                session_id,
361                paused,
362                speed,
363                current_index,
364            },
365        );
366        Ok(())
367    }
368
369    fn handle_set_filter(
370        &mut self,
371        client_id: &str,
372        session_id: String,
373        filter: EventFilter,
374    ) -> Result<(), String> {
375        let client = self.clients.get_mut(client_id).unwrap();
376        client.subscriptions.set_filter(&session_id, filter.clone());
377        send(
378            &*client.sink,
379            DebugResponse::FilterApplied { session_id, filter },
380        );
381        Ok(())
382    }
383
384    fn handle_pause(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
385        let client = self.clients.get_mut(client_id).unwrap();
386        client.subscriptions.set_paused(&session_id, true);
387        let speed = client.subscriptions.get_speed(&session_id);
388        let current_index = client.subscriptions.get_event_index(&session_id);
389        send(
390            &*client.sink,
391            DebugResponse::PlaybackStateChanged {
392                session_id,
393                paused: true,
394                speed,
395                current_index,
396            },
397        );
398        Ok(())
399    }
400
401    fn handle_resume(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
402        let client = self.clients.get_mut(client_id).unwrap();
403        client.subscriptions.set_paused(&session_id, false);
404        let speed = client.subscriptions.get_speed(&session_id);
405        let current_index = client.subscriptions.get_event_index(&session_id);
406        send(
407            &*client.sink,
408            DebugResponse::PlaybackStateChanged {
409                session_id,
410                paused: false,
411                speed,
412                current_index,
413            },
414        );
415        Ok(())
416    }
417
418    fn handle_step_forward(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
419        let session = self
420            .session_registry
421            .get_session(&session_id)
422            .ok_or("Session not found")?;
423
424        let events = session.event_store.events();
425        let client = self.clients.get_mut(client_id).unwrap();
426        let current_index = client.subscriptions.get_event_index(&session_id);
427
428        if current_index < events.len() {
429            let event_info = to_event_info(&events[current_index]);
430            send(
431                &*client.sink,
432                DebugResponse::Event {
433                    session_id: session_id.clone(),
434                    index: current_index,
435                    event: event_info,
436                },
437            );
438            client
439                .subscriptions
440                .set_event_index(&session_id, current_index + 1);
441        }
442        Ok(())
443    }
444
445    fn handle_step_backward(&mut self, client_id: &str, session_id: String) -> Result<(), String> {
446        let session = self
447            .session_registry
448            .get_session(&session_id)
449            .ok_or("Session not found")?;
450
451        let events = session.event_store.events();
452        let client = self.clients.get_mut(client_id).unwrap();
453        let current_index = client.subscriptions.get_event_index(&session_id);
454
455        if current_index > 0 {
456            let new_index = current_index - 1;
457            client.subscriptions.set_event_index(&session_id, new_index);
458            let computed = client
459                .subscriptions
460                .compute_state_at(&events, &session_id, new_index);
461
462            send(
463                &*client.sink,
464                DebugResponse::MarkingSnapshot {
465                    session_id,
466                    marking: computed.marking,
467                    enabled_transitions: computed.enabled_transitions,
468                    in_flight_transitions: computed.in_flight_transitions,
469                },
470            );
471        }
472        Ok(())
473    }
474
475    fn handle_set_breakpoint(
476        &mut self,
477        client_id: &str,
478        session_id: String,
479        breakpoint: BreakpointConfig,
480    ) -> Result<(), String> {
481        let client = self.clients.get_mut(client_id).unwrap();
482        client
483            .subscriptions
484            .add_breakpoint(&session_id, breakpoint.clone());
485        send(
486            &*client.sink,
487            DebugResponse::BreakpointSet {
488                session_id,
489                breakpoint,
490            },
491        );
492        Ok(())
493    }
494
495    fn handle_clear_breakpoint(
496        &mut self,
497        client_id: &str,
498        session_id: String,
499        breakpoint_id: String,
500    ) -> Result<(), String> {
501        let client = self.clients.get_mut(client_id).unwrap();
502        client
503            .subscriptions
504            .remove_breakpoint(&session_id, &breakpoint_id);
505        send(
506            &*client.sink,
507            DebugResponse::BreakpointCleared {
508                session_id,
509                breakpoint_id,
510            },
511        );
512        Ok(())
513    }
514
515    fn handle_list_breakpoints(&self, client_id: &str, session_id: String) -> Result<(), String> {
516        let client = self.clients.get(client_id).unwrap();
517        let breakpoints = client.subscriptions.get_breakpoints(&session_id);
518        send(
519            &*client.sink,
520            DebugResponse::BreakpointList {
521                session_id,
522                breakpoints,
523            },
524        );
525        Ok(())
526    }
527}
528
529// ======================== Helper Functions ========================
530
531fn send(sink: &dyn ResponseSink, response: DebugResponse) {
532    sink.send(response);
533}
534
535fn send_to(clients: &HashMap<String, ClientState>, client_id: &str, response: DebugResponse) {
536    if let Some(client) = clients.get(client_id) {
537        send(&*client.sink, response);
538    }
539}
540
541fn send_in_batches(
542    clients: &HashMap<String, ClientState>,
543    client_id: &str,
544    session_id: &str,
545    start_index: usize,
546    events: &[NetEventInfo],
547) {
548    let Some(client) = clients.get(client_id) else {
549        return;
550    };
551
552    if events.is_empty() {
553        send(
554            &*client.sink,
555            DebugResponse::EventBatch {
556                session_id: session_id.to_string(),
557                start_index,
558                events: vec![],
559                has_more: false,
560            },
561        );
562        return;
563    }
564
565    for (i, chunk) in events.chunks(BATCH_SIZE).enumerate() {
566        let chunk_start = start_index + i * BATCH_SIZE;
567        let has_more = chunk_start + chunk.len() < start_index + events.len();
568        send(
569            &*client.sink,
570            DebugResponse::EventBatch {
571                session_id: session_id.to_string(),
572                start_index: chunk_start,
573                events: chunk.to_vec(),
574                has_more,
575            },
576        );
577    }
578}
579
580fn session_summary(session: &DebugSession) -> SessionSummary {
581    SessionSummary {
582        session_id: session.session_id.clone(),
583        net_name: session.net_name.clone(),
584        start_time: session.start_time.to_string(),
585        active: session.active,
586        event_count: session.event_store.event_count(),
587    }
588}
589
590fn matches_filter(filter: &Option<EventFilter>, event: &NetEvent) -> bool {
591    let Some(filter) = filter else { return true };
592
593    if let Some(ref types) = filter.event_types {
594        if !types.is_empty() {
595            let name = event_type_name(event);
596            if !types.iter().any(|t| t == name) {
597                return false;
598            }
599        }
600    }
601
602    if let Some(ref names) = filter.transition_names {
603        if !names.is_empty() {
604            let t_name = extract_transition_name(event);
605            match t_name {
606                Some(n) => {
607                    if !names.iter().any(|t| t == n) {
608                        return false;
609                    }
610                }
611                None => return false,
612            }
613        }
614    }
615
616    if let Some(ref names) = filter.place_names {
617        if !names.is_empty() {
618            let p_name = extract_place_name(event);
619            match p_name {
620                Some(n) => {
621                    if !names.iter().any(|t| t == n) {
622                        return false;
623                    }
624                }
625                None => return false,
626            }
627        }
628    }
629
630    true
631}
632
633fn matches_breakpoint(bp: &BreakpointConfig, event: &NetEvent) -> bool {
634    if !bp.enabled {
635        return false;
636    }
637    match bp.bp_type {
638        BreakpointType::TransitionEnabled => {
639            matches!(event, NetEvent::TransitionEnabled { transition_name, .. }
640                if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
641        }
642        BreakpointType::TransitionStart => {
643            matches!(event, NetEvent::TransitionStarted { transition_name, .. }
644                if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
645        }
646        BreakpointType::TransitionComplete => {
647            matches!(event, NetEvent::TransitionCompleted { transition_name, .. }
648                if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
649        }
650        BreakpointType::TransitionFail => {
651            matches!(event, NetEvent::TransitionFailed { transition_name, .. }
652                if bp.target.as_ref().is_none_or(|t| t == transition_name.as_ref()))
653        }
654        BreakpointType::TokenAdded => {
655            matches!(event, NetEvent::TokenAdded { place_name, .. }
656                if bp.target.as_ref().is_none_or(|t| t == place_name.as_ref()))
657        }
658        BreakpointType::TokenRemoved => {
659            matches!(event, NetEvent::TokenRemoved { place_name, .. }
660                if bp.target.as_ref().is_none_or(|t| t == place_name.as_ref()))
661        }
662    }
663}
664
665fn check_breakpoints(
666    breakpoints: &HashMap<String, BreakpointConfig>,
667    event: &NetEvent,
668) -> Option<BreakpointConfig> {
669    for bp in breakpoints.values() {
670        if matches_breakpoint(bp, event) {
671            return Some(bp.clone());
672        }
673    }
674    None
675}
676
677// ======================== Subscription State ========================
678
679struct SessionSubscription {
680    event_index: usize,
681    marking_cache: MarkingCache,
682    breakpoints: HashMap<String, BreakpointConfig>,
683    paused: bool,
684    speed: f64,
685    filter: Option<EventFilter>,
686}
687
688struct SubscriptionState {
689    sessions: HashMap<String, SessionSubscription>,
690}
691
692impl SubscriptionState {
693    fn new() -> Self {
694        Self {
695            sessions: HashMap::new(),
696        }
697    }
698
699    fn add_subscription(&mut self, session_id: String, event_index: usize, paused: bool) {
700        self.sessions.insert(
701            session_id,
702            SessionSubscription {
703                event_index,
704                marking_cache: MarkingCache::new(),
705                breakpoints: HashMap::new(),
706                paused,
707                speed: 1.0,
708                filter: None,
709            },
710        );
711    }
712
713    fn cancel(&mut self, session_id: &str) {
714        self.sessions.remove(session_id);
715    }
716
717    fn is_paused(&self, session_id: &str) -> bool {
718        self.sessions.get(session_id).is_some_and(|s| s.paused)
719    }
720
721    fn set_paused(&mut self, session_id: &str, paused: bool) {
722        if let Some(sub) = self.sessions.get_mut(session_id) {
723            sub.paused = paused;
724        }
725    }
726
727    fn get_speed(&self, session_id: &str) -> f64 {
728        self.sessions.get(session_id).map_or(1.0, |s| s.speed)
729    }
730
731    fn set_speed(&mut self, session_id: &str, speed: f64) {
732        if let Some(sub) = self.sessions.get_mut(session_id) {
733            sub.speed = speed;
734        }
735    }
736
737    fn get_event_index(&self, session_id: &str) -> usize {
738        self.sessions.get(session_id).map_or(0, |s| s.event_index)
739    }
740
741    fn set_event_index(&mut self, session_id: &str, index: usize) {
742        if let Some(sub) = self.sessions.get_mut(session_id) {
743            sub.event_index = index;
744        }
745    }
746
747    fn compute_state_at(
748        &mut self,
749        events: &[NetEvent],
750        session_id: &str,
751        target_index: usize,
752    ) -> crate::marking_cache::ComputedState {
753        if let Some(sub) = self.sessions.get_mut(session_id) {
754            sub.marking_cache.compute_at(events, target_index)
755        } else {
756            compute_state(&events[..target_index.min(events.len())])
757        }
758    }
759
760    fn set_filter(&mut self, session_id: &str, filter: EventFilter) {
761        if let Some(sub) = self.sessions.get_mut(session_id) {
762            sub.filter = Some(filter);
763        }
764    }
765
766    fn add_breakpoint(&mut self, session_id: &str, breakpoint: BreakpointConfig) {
767        if let Some(sub) = self.sessions.get_mut(session_id) {
768            sub.breakpoints.insert(breakpoint.id.clone(), breakpoint);
769        }
770    }
771
772    fn remove_breakpoint(&mut self, session_id: &str, breakpoint_id: &str) {
773        if let Some(sub) = self.sessions.get_mut(session_id) {
774            sub.breakpoints.remove(breakpoint_id);
775        }
776    }
777
778    fn get_breakpoints(&self, session_id: &str) -> Vec<BreakpointConfig> {
779        self.sessions
780            .get(session_id)
781            .map_or_else(Vec::new, |s| s.breakpoints.values().cloned().collect())
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use super::*;
788    use crate::debug_event_store::DebugEventStore;
789    use std::sync::{Arc, Mutex};
790
791    fn make_handler_with_net() -> (DebugProtocolHandler, Arc<DebugEventStore>) {
792        use libpetri_core::input::one;
793        use libpetri_core::output::out_place;
794        use libpetri_core::place::Place;
795        use libpetri_core::transition::Transition;
796
797        let p1 = Place::<i32>::new("p1");
798        let p2 = Place::<i32>::new("p2");
799        let t = Transition::builder("t1")
800            .input(one(&p1))
801            .output(out_place(&p2))
802            .build();
803        let net = libpetri_core::petri_net::PetriNet::builder("test")
804            .transition(t)
805            .build();
806
807        let mut registry = DebugSessionRegistry::new();
808        let store = registry.register("s1".into(), &net);
809        let handler = DebugProtocolHandler::new(registry);
810        (handler, store)
811    }
812
813    fn collector_sink() -> (Box<dyn ResponseSink>, Arc<Mutex<Vec<DebugResponse>>>) {
814        let collected = Arc::new(Mutex::new(Vec::new()));
815        let collected_clone = Arc::clone(&collected);
816        let sink: Box<dyn ResponseSink> = Box::new(move |resp: DebugResponse| {
817            collected_clone.lock().unwrap().push(resp);
818        });
819        (sink, collected)
820    }
821
822    #[test]
823    fn list_sessions() {
824        let (mut handler, _store) = make_handler_with_net();
825        let (sink, collected) = collector_sink();
826        handler.client_connected("c1".into(), sink);
827
828        handler.handle_command(
829            "c1",
830            DebugCommand::ListSessions {
831                limit: None,
832                active_only: None,
833            },
834        );
835
836        let responses = collected.lock().unwrap();
837        assert_eq!(responses.len(), 1);
838        match &responses[0] {
839            DebugResponse::SessionList { sessions } => {
840                assert_eq!(sessions.len(), 1);
841                assert_eq!(sessions[0].net_name, "test");
842            }
843            _ => panic!("expected SessionList"),
844        }
845    }
846
847    #[test]
848    fn subscribe_and_unsubscribe() {
849        let (mut handler, _store) = make_handler_with_net();
850        let (sink, collected) = collector_sink();
851        handler.client_connected("c1".into(), sink);
852
853        handler.handle_command(
854            "c1",
855            DebugCommand::Subscribe {
856                session_id: "s1".into(),
857                mode: crate::debug_command::SubscriptionMode::Live,
858                from_index: None,
859            },
860        );
861
862        {
863            let responses = collected.lock().unwrap();
864            assert!(responses.len() >= 1);
865            match &responses[0] {
866                DebugResponse::Subscribed {
867                    session_id,
868                    net_name,
869                    ..
870                } => {
871                    assert_eq!(session_id, "s1");
872                    assert_eq!(net_name, "test");
873                }
874                _ => panic!("expected Subscribed"),
875            }
876        }
877
878        handler.handle_command(
879            "c1",
880            DebugCommand::Unsubscribe {
881                session_id: "s1".into(),
882            },
883        );
884
885        let responses = collected.lock().unwrap();
886        let last = responses.last().unwrap();
887        match last {
888            DebugResponse::Unsubscribed { session_id } => {
889                assert_eq!(session_id, "s1");
890            }
891            _ => panic!("expected Unsubscribed"),
892        }
893    }
894
895    #[test]
896    fn subscribe_to_nonexistent_session() {
897        let (mut handler, _store) = make_handler_with_net();
898        let (sink, collected) = collector_sink();
899        handler.client_connected("c1".into(), sink);
900
901        handler.handle_command(
902            "c1",
903            DebugCommand::Subscribe {
904                session_id: "nonexistent".into(),
905                mode: crate::debug_command::SubscriptionMode::Live,
906                from_index: None,
907            },
908        );
909
910        let responses = collected.lock().unwrap();
911        match &responses[0] {
912            DebugResponse::Error { code, .. } => assert_eq!(code, "COMMAND_ERROR"),
913            _ => panic!("expected Error"),
914        }
915    }
916
917    #[test]
918    fn pause_and_resume() {
919        let (mut handler, _store) = make_handler_with_net();
920        let (sink, collected) = collector_sink();
921        handler.client_connected("c1".into(), sink);
922
923        handler.handle_command(
924            "c1",
925            DebugCommand::Subscribe {
926                session_id: "s1".into(),
927                mode: crate::debug_command::SubscriptionMode::Live,
928                from_index: None,
929            },
930        );
931
932        handler.handle_command(
933            "c1",
934            DebugCommand::Pause {
935                session_id: "s1".into(),
936            },
937        );
938
939        let responses = collected.lock().unwrap();
940        let pause_resp = responses
941            .iter()
942            .find(|r| matches!(r, DebugResponse::PlaybackStateChanged { paused: true, .. }));
943        assert!(pause_resp.is_some());
944    }
945
946    #[test]
947    fn set_and_list_breakpoints() {
948        let (mut handler, _store) = make_handler_with_net();
949        let (sink, collected) = collector_sink();
950        handler.client_connected("c1".into(), sink);
951
952        handler.handle_command(
953            "c1",
954            DebugCommand::Subscribe {
955                session_id: "s1".into(),
956                mode: crate::debug_command::SubscriptionMode::Live,
957                from_index: None,
958            },
959        );
960
961        handler.handle_command(
962            "c1",
963            DebugCommand::SetBreakpoint {
964                session_id: "s1".into(),
965                breakpoint: BreakpointConfig {
966                    id: "bp1".into(),
967                    bp_type: BreakpointType::TransitionStart,
968                    target: Some("t1".into()),
969                    enabled: true,
970                },
971            },
972        );
973
974        handler.handle_command(
975            "c1",
976            DebugCommand::ListBreakpoints {
977                session_id: "s1".into(),
978            },
979        );
980
981        let responses = collected.lock().unwrap();
982        let bp_list = responses
983            .iter()
984            .find(|r| matches!(r, DebugResponse::BreakpointList { .. }));
985        match bp_list.unwrap() {
986            DebugResponse::BreakpointList { breakpoints, .. } => {
987                assert_eq!(breakpoints.len(), 1);
988                assert_eq!(breakpoints[0].id, "bp1");
989            }
990            _ => unreachable!(),
991        }
992    }
993
994    #[test]
995    fn broadcast_event_to_subscribers() {
996        let (mut handler, store) = make_handler_with_net();
997        let (sink, collected) = collector_sink();
998        handler.client_connected("c1".into(), sink);
999
1000        handler.handle_command(
1001            "c1",
1002            DebugCommand::Subscribe {
1003                session_id: "s1".into(),
1004                mode: crate::debug_command::SubscriptionMode::Live,
1005                from_index: None,
1006            },
1007        );
1008
1009        let event = NetEvent::TransitionStarted {
1010            transition_name: Arc::from("t1"),
1011            timestamp: 1000,
1012        };
1013        store.append(event.clone());
1014        handler.broadcast_event("s1", &event);
1015
1016        let responses = collected.lock().unwrap();
1017        let event_resp = responses
1018            .iter()
1019            .find(|r| matches!(r, DebugResponse::Event { .. }));
1020        assert!(event_resp.is_some());
1021    }
1022
1023    #[test]
1024    fn filter_matching() {
1025        let event = NetEvent::TransitionStarted {
1026            transition_name: Arc::from("t1"),
1027            timestamp: 0,
1028        };
1029
1030        // No filter — matches all
1031        assert!(matches_filter(&None, &event));
1032
1033        // Type filter matching
1034        let filter = EventFilter {
1035            event_types: Some(vec!["TransitionStarted".into()]),
1036            transition_names: None,
1037            place_names: None,
1038        };
1039        assert!(matches_filter(&Some(filter), &event));
1040
1041        // Type filter not matching
1042        let filter = EventFilter {
1043            event_types: Some(vec!["TokenAdded".into()]),
1044            transition_names: None,
1045            place_names: None,
1046        };
1047        assert!(!matches_filter(&Some(filter), &event));
1048
1049        // Transition name filter
1050        let filter = EventFilter {
1051            event_types: None,
1052            transition_names: Some(vec!["t1".into()]),
1053            place_names: None,
1054        };
1055        assert!(matches_filter(&Some(filter), &event));
1056
1057        let filter = EventFilter {
1058            event_types: None,
1059            transition_names: Some(vec!["t2".into()]),
1060            place_names: None,
1061        };
1062        assert!(!matches_filter(&Some(filter), &event));
1063    }
1064
1065    #[test]
1066    fn breakpoint_matching() {
1067        let event = NetEvent::TransitionStarted {
1068            transition_name: Arc::from("t1"),
1069            timestamp: 0,
1070        };
1071
1072        let bp = BreakpointConfig {
1073            id: "bp1".into(),
1074            bp_type: BreakpointType::TransitionStart,
1075            target: Some("t1".into()),
1076            enabled: true,
1077        };
1078        assert!(matches_breakpoint(&bp, &event));
1079
1080        // Disabled breakpoint
1081        let bp_disabled = BreakpointConfig {
1082            id: "bp2".into(),
1083            bp_type: BreakpointType::TransitionStart,
1084            target: Some("t1".into()),
1085            enabled: false,
1086        };
1087        assert!(!matches_breakpoint(&bp_disabled, &event));
1088
1089        // Wrong target
1090        let bp_wrong = BreakpointConfig {
1091            id: "bp3".into(),
1092            bp_type: BreakpointType::TransitionStart,
1093            target: Some("t2".into()),
1094            enabled: true,
1095        };
1096        assert!(!matches_breakpoint(&bp_wrong, &event));
1097
1098        // Wildcard (no target)
1099        let bp_wild = BreakpointConfig {
1100            id: "bp4".into(),
1101            bp_type: BreakpointType::TransitionStart,
1102            target: None,
1103            enabled: true,
1104        };
1105        assert!(matches_breakpoint(&bp_wild, &event));
1106    }
1107
1108    #[test]
1109    fn client_disconnect_cleanup() {
1110        let (mut handler, _store) = make_handler_with_net();
1111        let (sink, _collected) = collector_sink();
1112        handler.client_connected("c1".into(), sink);
1113        handler.client_disconnected("c1");
1114        assert!(handler.clients.is_empty());
1115    }
1116
1117    #[test]
1118    fn step_forward_and_backward() {
1119        let (mut handler, store) = make_handler_with_net();
1120        let (sink, collected) = collector_sink();
1121        handler.client_connected("c1".into(), sink);
1122
1123        // Add some events
1124        for i in 0..5 {
1125            store.append(NetEvent::TokenAdded {
1126                place_name: Arc::from("p1"),
1127                timestamp: i,
1128            });
1129        }
1130
1131        handler.handle_command(
1132            "c1",
1133            DebugCommand::Subscribe {
1134                session_id: "s1".into(),
1135                mode: crate::debug_command::SubscriptionMode::Replay,
1136                from_index: Some(0),
1137            },
1138        );
1139
1140        // Step forward
1141        handler.handle_command(
1142            "c1",
1143            DebugCommand::StepForward {
1144                session_id: "s1".into(),
1145            },
1146        );
1147
1148        // Step backward
1149        handler.handle_command(
1150            "c1",
1151            DebugCommand::StepBackward {
1152                session_id: "s1".into(),
1153            },
1154        );
1155
1156        let responses = collected.lock().unwrap();
1157        assert!(responses.len() >= 3); // subscribed + batch + step responses
1158    }
1159}