Skip to main content

libpetri_runtime/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::output::Out;
8use libpetri_core::petri_net::PetriNet;
9use libpetri_core::token::ErasedToken;
10
11use libpetri_event::event_store::EventStore;
12use libpetri_event::net_event::NetEvent;
13
14use crate::bitmap;
15use crate::compiled_net::CompiledNet;
16use crate::marking::Marking;
17
/// Tolerance, in milliseconds, for deadline enforcement to account for timer jitter.
///
/// `enforce_deadlines` only times a transition out once the time since its
/// enablement exceeds its latest firing time plus this slack.
const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
/// Bitmap-based executor for Coloured Time Petri Nets.
///
/// Generic over `E: EventStore` for zero-cost noop event recording.
/// The sync path (`run_sync`) executes inline without any async runtime.
///
/// Place occupancy and "needs re-evaluation" state are tracked as `u64`
/// bit words so that enablement checks can work on compact snapshots.
pub struct BitmapNetExecutor<E: EventStore> {
    /// Compiled form of the net, produced by `CompiledNet::compile` in `new`.
    compiled: CompiledNet,
    /// Current token marking; starts as the initial tokens passed to `new`.
    marking: Marking,
    /// Sink for `NetEvent`s; only written to when `E::ENABLED` is true.
    event_store: E,
    /// Environment place names taken from `ExecutorOptions`.
    // NOTE(review): in the code visible here only the emptiness of this set is
    // consulted (cached in `has_environment_places`), hence the dead_code allow.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// True when `environment_places` was non-empty at construction time.
    has_environment_places: bool,

    // Bitmaps
    /// One bit per place id; set while the place holds at least one token.
    marked_places: Vec<u64>,
    /// One bit per transition id; set when the transition must be re-evaluated.
    dirty_set: Vec<u64>,
    /// Scratch copy of `marked_places` taken by `update_dirty_transitions`.
    marking_snap_buffer: Vec<u64>,
    /// Scratch copy of `dirty_set` taken by `update_dirty_transitions`.
    dirty_snap_buffer: Vec<u64>,
    /// Scratch copy of `marked_places` used while firing in priority order.
    firing_snap_buffer: Vec<u64>,

    // Per-transition state
    /// Milliseconds since `start_time` at which each transition became enabled;
    /// `f64::NEG_INFINITY` while the transition is not enabled.
    enabled_at_ms: Vec<f64>,
    /// Whether each transition is currently considered enabled.
    enabled_flags: Vec<bool>,
    /// Whether each transition's timing reports a deadline.
    has_deadline_flags: Vec<bool>,
    /// Number of `true` entries in `enabled_flags`, maintained incrementally.
    enabled_transition_count: usize,

    // Precomputed flags
    /// True when every transition's timing is `Timing::Immediate`.
    all_immediate: bool,
    /// True when every transition has the same priority as transition 0.
    all_same_priority: bool,
    /// True when at least one transition has a deadline.
    has_any_deadlines: bool,

    // Pending reset places for clock-restart detection
    /// Places emptied by reset arcs since the last `update_dirty_transitions`.
    pending_reset_places: HashSet<Arc<str>>,
    /// For each transition id, the names of its input places.
    transition_input_place_names: Vec<HashSet<Arc<str>>>,

    /// Instant the executor was constructed; origin for `elapsed_ms`.
    start_time: Instant,
}
57
/// Options for constructing a BitmapNetExecutor.
///
/// `Default` yields an empty set of environment places.
#[derive(Default)]
pub struct ExecutorOptions {
    /// Names of the net's environment places.
    ///
    /// When this set is non-empty the executor does not treat "nothing is
    /// enabled" alone as a reason to stop: the async loop keeps waiting for
    /// external signals while its signal channel is open.
    pub environment_places: HashSet<Arc<str>>,
}
63
64impl<E: EventStore> BitmapNetExecutor<E> {
65    /// Creates a new executor for the given net with initial tokens.
66    pub fn new(net: &PetriNet, initial_tokens: Marking, options: ExecutorOptions) -> Self {
67        let compiled = CompiledNet::compile(net);
68        let word_count = compiled.word_count;
69        let tc = compiled.transition_count;
70        let dirty_word_count = bitmap::word_count(tc);
71
72        let mut has_any_deadlines = false;
73        let mut all_immediate = true;
74        let mut all_same_priority = true;
75        let first_priority = if tc > 0 {
76            compiled.transition(0).priority()
77        } else {
78            0
79        };
80        let mut has_deadline_flags = vec![false; tc];
81
82        for (tid, flag) in has_deadline_flags.iter_mut().enumerate() {
83            let t = compiled.transition(tid);
84            if t.timing().has_deadline() {
85                *flag = true;
86                has_any_deadlines = true;
87            }
88            if *t.timing() != libpetri_core::timing::Timing::Immediate {
89                all_immediate = false;
90            }
91            if t.priority() != first_priority {
92                all_same_priority = false;
93            }
94        }
95
96        // Precompute input place names per transition
97        let mut transition_input_place_names = Vec::with_capacity(tc);
98        for tid in 0..tc {
99            let t = compiled.transition(tid);
100            let names: HashSet<Arc<str>> = t
101                .input_specs()
102                .iter()
103                .map(|s| Arc::clone(s.place().name_arc()))
104                .collect();
105            transition_input_place_names.push(names);
106        }
107
108        Self {
109            compiled,
110            marking: initial_tokens,
111            event_store: E::default(),
112            has_environment_places: !options.environment_places.is_empty(),
113            environment_places: options.environment_places,
114            marked_places: vec![0u64; word_count],
115            dirty_set: vec![0u64; dirty_word_count],
116            marking_snap_buffer: vec![0u64; word_count],
117            dirty_snap_buffer: vec![0u64; dirty_word_count],
118            firing_snap_buffer: vec![0u64; word_count],
119            enabled_at_ms: vec![f64::NEG_INFINITY; tc],
120            enabled_flags: vec![false; tc],
121            has_deadline_flags,
122            enabled_transition_count: 0,
123            all_immediate,
124            all_same_priority,
125            has_any_deadlines,
126            pending_reset_places: HashSet::new(),
127            transition_input_place_names,
128            start_time: Instant::now(),
129        }
130    }
131
132    /// Runs the executor synchronously until completion.
133    ///
134    /// All transition actions must be sync (is_sync() returns true).
135    /// Returns the final marking.
136    pub fn run_sync(&mut self) -> &Marking {
137        self.initialize_marked_bitmap();
138        self.mark_all_dirty();
139
140        if E::ENABLED {
141            let now = now_millis();
142            self.event_store.append(NetEvent::ExecutionStarted {
143                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
144                timestamp: now,
145            });
146        }
147
148        loop {
149            self.update_dirty_transitions();
150
151            let cycle_now = self.elapsed_ms();
152
153            if self.has_any_deadlines {
154                self.enforce_deadlines(cycle_now);
155            }
156
157            if self.should_terminate() {
158                break;
159            }
160
161            if self.all_immediate && self.all_same_priority {
162                self.fire_ready_immediate_sync();
163            } else {
164                self.fire_ready_general_sync(cycle_now);
165            }
166
167            // If nothing is dirty anymore and nothing is enabled, we're done
168            if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
169                break;
170            }
171        }
172
173        if E::ENABLED {
174            let now = now_millis();
175            self.event_store.append(NetEvent::ExecutionCompleted {
176                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
177                timestamp: now,
178            });
179        }
180
181        &self.marking
182    }
183
    /// Returns a reference to the current marking.
    ///
    /// This is the live marking the executor mutates while running, not a
    /// snapshot.
    pub fn marking(&self) -> &Marking {
        &self.marking
    }
188
    /// Returns a reference to the event store.
    ///
    /// Events are appended to it only when `E::ENABLED` is true.
    pub fn event_store(&self) -> &E {
        &self.event_store
    }
193
    /// Returns true if no transition is currently enabled.
    ///
    /// Only the enabled-transition counter is consulted. In-flight async
    /// actions are counted in a local variable of `run_async` and are not
    /// visible to this method.
    pub fn is_quiescent(&self) -> bool {
        self.enabled_transition_count == 0
    }
198
199    // ======================== Initialize ========================
200
201    fn initialize_marked_bitmap(&mut self) {
202        for pid in 0..self.compiled.place_count {
203            let place = self.compiled.place(pid);
204            if self.marking.has_tokens(place.name()) {
205                bitmap::set_bit(&mut self.marked_places, pid);
206            }
207        }
208    }
209
210    fn mark_all_dirty(&mut self) {
211        let tc = self.compiled.transition_count;
212        let dirty_words = self.dirty_set.len();
213        for w in 0..dirty_words.saturating_sub(1) {
214            self.dirty_set[w] = u64::MAX;
215        }
216        if dirty_words > 0 {
217            let last_word_bits = tc & bitmap::WORD_MASK;
218            self.dirty_set[dirty_words - 1] = if last_word_bits == 0 {
219                u64::MAX
220            } else {
221                (1u64 << last_word_bits) - 1
222            };
223        }
224    }
225
226    fn should_terminate(&self) -> bool {
227        if self.has_environment_places {
228            return false;
229        }
230        self.enabled_transition_count == 0
231    }
232
233    // ======================== Dirty Set Transitions ========================
234
235    fn update_dirty_transitions(&mut self) {
236        let now_ms = self.elapsed_ms();
237
238        // Snapshot marking bitmap
239        self.marking_snap_buffer
240            .copy_from_slice(&self.marked_places);
241
242        // Snapshot and clear dirty set
243        let dirty_words = self.dirty_set.len();
244        for w in 0..dirty_words {
245            self.dirty_snap_buffer[w] = self.dirty_set[w];
246            self.dirty_set[w] = 0;
247        }
248
249        // Collect dirty transition IDs first to avoid borrow conflict
250        let tc = self.compiled.transition_count;
251        let mut dirty_tids = Vec::new();
252        bitmap::for_each_set_bit(&self.dirty_snap_buffer, |tid| {
253            if tid < tc {
254                dirty_tids.push(tid);
255            }
256        });
257
258        let marking_snap = self.marking_snap_buffer.clone();
259        for tid in dirty_tids {
260            let was_enabled = self.enabled_flags[tid];
261            let can_now = self.can_enable(tid, &marking_snap);
262
263            if can_now && !was_enabled {
264                self.enabled_flags[tid] = true;
265                self.enabled_transition_count += 1;
266                self.enabled_at_ms[tid] = now_ms;
267
268                if E::ENABLED {
269                    self.event_store.append(NetEvent::TransitionEnabled {
270                        transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
271                        timestamp: now_millis(),
272                    });
273                }
274            } else if !can_now && was_enabled {
275                self.enabled_flags[tid] = false;
276                self.enabled_transition_count -= 1;
277                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
278            } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
279                self.enabled_at_ms[tid] = now_ms;
280                if E::ENABLED {
281                    self.event_store.append(NetEvent::TransitionClockRestarted {
282                        transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
283                        timestamp: now_millis(),
284                    });
285                }
286            }
287        }
288
289        self.pending_reset_places.clear();
290    }
291
292    fn enforce_deadlines(&mut self, now_ms: f64) {
293        for tid in 0..self.compiled.transition_count {
294            if !self.has_deadline_flags[tid] || !self.enabled_flags[tid] {
295                continue;
296            }
297            let t = self.compiled.transition(tid);
298            let elapsed = now_ms - self.enabled_at_ms[tid];
299            let latest_ms = t.timing().latest() as f64;
300            if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
301                self.enabled_flags[tid] = false;
302                self.enabled_transition_count -= 1;
303                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
304
305                if E::ENABLED {
306                    self.event_store.append(NetEvent::TransitionTimedOut {
307                        transition_name: Arc::clone(t.name_arc()),
308                        timestamp: now_millis(),
309                    });
310                }
311            }
312        }
313    }
314
315    fn can_enable(&self, tid: usize, marking_snap: &[u64]) -> bool {
316        if !self.compiled.can_enable_bitmap(tid, marking_snap) {
317            return false;
318        }
319
320        // Cardinality check
321        if let Some(card_check) = self.compiled.cardinality_check(tid) {
322            for i in 0..card_check.place_ids.len() {
323                let pid = card_check.place_ids[i];
324                let required = card_check.required_counts[i];
325                let place = self.compiled.place(pid);
326                if self.marking.count(place.name()) < required {
327                    return false;
328                }
329            }
330        }
331
332        // Guard check
333        if self.compiled.has_guards(tid) {
334            let t = self.compiled.transition(tid);
335            for spec in t.input_specs() {
336                if let Some(guard) = spec.guard() {
337                    let required = match spec {
338                        In::One { .. } => 1,
339                        In::Exactly { count, .. } => *count,
340                        In::AtLeast { minimum, .. } => *minimum,
341                        In::All { .. } => 1,
342                    };
343                    if self.marking.count_matching(spec.place_name(), &**guard) < required {
344                        return false;
345                    }
346                }
347            }
348        }
349
350        true
351    }
352
353    fn has_input_from_reset_place(&self, tid: usize) -> bool {
354        if self.pending_reset_places.is_empty() {
355            return false;
356        }
357        let input_names = &self.transition_input_place_names[tid];
358        for name in &self.pending_reset_places {
359            if input_names.contains(name) {
360                return true;
361            }
362        }
363        false
364    }
365
366    // ======================== Firing (Sync) ========================
367
368    fn fire_ready_immediate_sync(&mut self) {
369        for tid in 0..self.compiled.transition_count {
370            if !self.enabled_flags[tid] {
371                continue;
372            }
373            if self.can_enable(tid, &self.marked_places.clone()) {
374                self.fire_transition_sync(tid);
375            } else {
376                self.enabled_flags[tid] = false;
377                self.enabled_transition_count -= 1;
378                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
379            }
380        }
381    }
382
383    fn fire_ready_general_sync(&mut self, now_ms: f64) {
384        // Collect ready transitions
385        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
386        for tid in 0..self.compiled.transition_count {
387            if !self.enabled_flags[tid] {
388                continue;
389            }
390            let t = self.compiled.transition(tid);
391            let enabled_ms = self.enabled_at_ms[tid];
392            let elapsed = now_ms - enabled_ms;
393            let earliest_ms = t.timing().earliest() as f64;
394            if earliest_ms <= elapsed {
395                ready.push((tid, t.priority(), enabled_ms));
396            }
397        }
398        if ready.is_empty() {
399            return;
400        }
401
402        // Sort: higher priority first, then earlier enablement (FIFO)
403        ready.sort_by(|a, b| {
404            b.1.cmp(&a.1)
405                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
406        });
407
408        // Take fresh snapshot
409        self.firing_snap_buffer.copy_from_slice(&self.marked_places);
410
411        for (tid, _, _) in ready {
412            if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
413                self.fire_transition_sync(tid);
414                self.firing_snap_buffer.copy_from_slice(&self.marked_places);
415            } else {
416                self.enabled_flags[tid] = false;
417                self.enabled_transition_count -= 1;
418                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
419            }
420        }
421    }
422
423    fn fire_transition_sync(&mut self, tid: usize) {
424        // Clone all needed data from the transition to release the borrow on self.compiled
425        let t = self.compiled.transition(tid);
426        let transition_name = Arc::clone(t.name_arc());
427        let input_specs: Vec<In> = t.input_specs().to_vec();
428        let read_arcs: Vec<_> = t.reads().to_vec();
429        let reset_arcs: Vec<_> = t.resets().to_vec();
430        let output_place_names: HashSet<Arc<str>> = t
431            .output_places()
432            .iter()
433            .map(|p| Arc::clone(p.name_arc()))
434            .collect();
435        let action = Arc::clone(t.action());
436        // t (reference) is no longer used after this point
437
438        // Consume tokens based on input specs
439        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
440        for in_spec in &input_specs {
441            let place_name = in_spec.place_name();
442            let to_consume = match in_spec {
443                In::One { .. } => 1,
444                In::Exactly { count, .. } => *count,
445                In::All { guard, .. } | In::AtLeast { guard, .. } => {
446                    if guard.is_some() {
447                        self.marking
448                            .count_matching(place_name, &**guard.as_ref().unwrap())
449                    } else {
450                        self.marking.count(place_name)
451                    }
452                }
453            };
454
455            let place_name_arc = Arc::clone(in_spec.place().name_arc());
456            for _ in 0..to_consume {
457                let token = if let Some(guard) = in_spec.guard() {
458                    self.marking.remove_matching(place_name, &**guard)
459                } else {
460                    self.marking.remove_first(place_name)
461                };
462                if let Some(token) = token {
463                    if E::ENABLED {
464                        self.event_store.append(NetEvent::TokenRemoved {
465                            place_name: Arc::clone(&place_name_arc),
466                            timestamp: now_millis(),
467                        });
468                    }
469                    inputs
470                        .entry(Arc::clone(&place_name_arc))
471                        .or_default()
472                        .push(token);
473                }
474            }
475        }
476
477        // Read arcs (peek, don't consume)
478        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
479        for arc in &read_arcs {
480            if let Some(queue) = self.marking.queue(arc.place.name())
481                && let Some(token) = queue.front()
482            {
483                read_tokens
484                    .entry(Arc::clone(arc.place.name_arc()))
485                    .or_default()
486                    .push(token.clone());
487            }
488        }
489
490        // Reset arcs
491        for arc in &reset_arcs {
492            let removed = self.marking.remove_all(arc.place.name());
493            self.pending_reset_places
494                .insert(Arc::clone(arc.place.name_arc()));
495            if E::ENABLED {
496                for _ in &removed {
497                    self.event_store.append(NetEvent::TokenRemoved {
498                        place_name: Arc::clone(arc.place.name_arc()),
499                        timestamp: now_millis(),
500                    });
501                }
502            }
503        }
504
505        // Update bitmap after consumption
506        self.update_bitmap_after_consumption(tid);
507
508        if E::ENABLED {
509            self.event_store.append(NetEvent::TransitionStarted {
510                transition_name: Arc::clone(&transition_name),
511                timestamp: now_millis(),
512            });
513        }
514
515        // Create context and run action
516        let mut ctx = TransitionContext::new(
517            Arc::clone(&transition_name),
518            inputs,
519            read_tokens,
520            output_place_names,
521            None,
522        );
523
524        let result = action.run_sync(&mut ctx);
525
526        match result {
527            Ok(()) => {
528                // Process outputs
529                let outputs = ctx.take_outputs();
530                self.process_outputs(tid, &transition_name, outputs);
531
532                if E::ENABLED {
533                    self.event_store.append(NetEvent::TransitionCompleted {
534                        transition_name: Arc::clone(&transition_name),
535                        timestamp: now_millis(),
536                    });
537                }
538            }
539            Err(err) => {
540                if E::ENABLED {
541                    self.event_store.append(NetEvent::TransitionFailed {
542                        transition_name: Arc::clone(&transition_name),
543                        error: err.message,
544                        timestamp: now_millis(),
545                    });
546                }
547            }
548        }
549
550        // Mark transition as no longer enabled (it just fired)
551        self.enabled_flags[tid] = false;
552        self.enabled_transition_count -= 1;
553        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
554
555        // Mark this transition dirty for re-evaluation
556        self.mark_transition_dirty(tid);
557    }
558
559    fn process_outputs(
560        &mut self,
561        _tid: usize,
562        _transition_name: &Arc<str>,
563        outputs: Vec<OutputEntry>,
564    ) {
565        for entry in outputs {
566            self.marking.add_erased(&entry.place_name, entry.token);
567
568            if let Some(pid) = self.compiled.place_id(&entry.place_name) {
569                bitmap::set_bit(&mut self.marked_places, pid);
570                self.mark_dirty(pid);
571            }
572
573            if E::ENABLED {
574                self.event_store.append(NetEvent::TokenAdded {
575                    place_name: Arc::clone(&entry.place_name),
576                    timestamp: now_millis(),
577                });
578            }
579        }
580    }
581
582    fn update_bitmap_after_consumption(&mut self, tid: usize) {
583        let consumption_pids: Vec<usize> = self.compiled.consumption_place_ids(tid).to_vec();
584        for pid in consumption_pids {
585            let place = self.compiled.place(pid);
586            if !self.marking.has_tokens(place.name()) {
587                bitmap::clear_bit(&mut self.marked_places, pid);
588            }
589            self.mark_dirty(pid);
590        }
591    }
592
593    // ======================== Dirty Set Helpers ========================
594
    /// Returns true when at least one transition is flagged for re-evaluation,
    /// i.e. the dirty set is not empty.
    fn has_dirty_bits(&self) -> bool {
        !bitmap::is_empty(&self.dirty_set)
    }
598
599    fn mark_dirty(&mut self, pid: usize) {
600        let tids: Vec<usize> = self.compiled.affected_transitions(pid).to_vec();
601        for tid in tids {
602            self.mark_transition_dirty(tid);
603        }
604    }
605
    /// Flags transition `tid` for re-evaluation by setting its dirty-set bit.
    fn mark_transition_dirty(&mut self, tid: usize) {
        bitmap::set_bit(&mut self.dirty_set, tid);
    }
609
    /// Returns the time since the executor was constructed, in milliseconds
    /// (fractional), measured against `start_time`.
    fn elapsed_ms(&self) -> f64 {
        self.start_time.elapsed().as_secs_f64() * 1000.0
    }
613}
614
615#[cfg(feature = "tokio")]
616use crate::environment::ExecutorSignal;
617
/// Completion message sent by async actions back to the executor.
///
/// Received by `run_async` over an unbounded mpsc channel; each message
/// accounts for one in-flight action.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished.
    transition_name: Arc<str>,
    /// Produced outputs on success, or an error message on failure.
    result: Result<Vec<OutputEntry>, String>,
}
624
625#[cfg(feature = "tokio")]
626impl<E: EventStore> BitmapNetExecutor<E> {
627    /// Runs the executor asynchronously with tokio.
628    ///
629    /// Supports both sync and async transition actions. Sync actions execute
630    /// inline; async actions are spawned as tokio tasks and their completions
631    /// are collected via an mpsc channel.
632    ///
633    /// External events are injected via [`ExecutorSignal::Event`]. Lifecycle
634    /// signals [`ExecutorSignal::Drain`] and [`ExecutorSignal::Close`] control
635    /// graceful and immediate shutdown respectively. Use
636    /// [`ExecutorHandle`](crate::executor_handle::ExecutorHandle) for RAII-managed
637    /// lifecycle with automatic drain on drop.
638    ///
639    /// Returns the final marking when the executor quiesces or is closed.
640    pub async fn run_async(
641        &mut self,
642        mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
643    ) -> &Marking {
644        let (completion_tx, mut completion_rx) =
645            tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
646
647        self.initialize_marked_bitmap();
648        self.mark_all_dirty();
649
650        let mut in_flight_count: usize = 0;
651        let mut signal_channel_open = true;
652        let mut draining = false;
653        let mut closed = false;
654
655        if E::ENABLED {
656            let now = now_millis();
657            self.event_store.append(NetEvent::ExecutionStarted {
658                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
659                timestamp: now,
660            });
661        }
662
663        loop {
664            // Phase 1: Process completed async actions
665            while let Ok(completion) = completion_rx.try_recv() {
666                in_flight_count -= 1;
667                match completion.result {
668                    Ok(outputs) => {
669                        self.process_outputs(0, &completion.transition_name, outputs);
670                        if E::ENABLED {
671                            self.event_store.append(NetEvent::TransitionCompleted {
672                                transition_name: Arc::clone(&completion.transition_name),
673                                timestamp: now_millis(),
674                            });
675                        }
676                    }
677                    Err(err) => {
678                        if E::ENABLED {
679                            self.event_store.append(NetEvent::TransitionFailed {
680                                transition_name: Arc::clone(&completion.transition_name),
681                                error: err,
682                                timestamp: now_millis(),
683                            });
684                        }
685                    }
686                }
687            }
688
689            // Phase 2: Process signals (external events + lifecycle)
690            while let Ok(signal) = signal_rx.try_recv() {
691                match signal {
692                    ExecutorSignal::Event(event) if !draining => {
693                        self.marking.add_erased(&event.place_name, event.token);
694                        if let Some(pid) = self.compiled.place_id(&event.place_name) {
695                            bitmap::set_bit(&mut self.marked_places, pid);
696                            self.mark_dirty(pid);
697                        }
698                        if E::ENABLED {
699                            self.event_store.append(NetEvent::TokenAdded {
700                                place_name: Arc::clone(&event.place_name),
701                                timestamp: now_millis(),
702                            });
703                        }
704                    }
705                    ExecutorSignal::Event(_) => {
706                        // Draining: discard events arriving after drain signal
707                    }
708                    ExecutorSignal::Drain => {
709                        draining = true;
710                    }
711                    ExecutorSignal::Close => {
712                        closed = true;
713                        draining = true;
714                        // Discard remaining queued signals per ENV-013.
715                        // Note: events already processed earlier in this try_recv batch
716                        // are kept (single-channel design); Java/TS discard all queued
717                        // events via an atomic flag checked at processExternalEvents() entry.
718                        while signal_rx.try_recv().is_ok() {}
719                    }
720                }
721            }
722
723            // Phase 3: Update dirty transitions
724            self.update_dirty_transitions();
725
726            // Phase 4: Enforce deadlines
727            let cycle_now = self.elapsed_ms();
728            if self.has_any_deadlines {
729                self.enforce_deadlines(cycle_now);
730            }
731
732            // Termination check — O(1) flag checks
733            if closed && in_flight_count == 0 {
734                break; // ENV-013: immediate close, in-flight completed
735            }
736            if draining
737                && self.enabled_transition_count == 0
738                && in_flight_count == 0
739            {
740                break; // ENV-011: graceful drain, quiescent
741            }
742            if self.enabled_transition_count == 0
743                && in_flight_count == 0
744                && (!self.has_environment_places || !signal_channel_open)
745            {
746                break; // Standard termination
747            }
748
749            // Phase 5: Fire ready transitions
750            let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
751
752            // If we fired something or have dirty bits, loop immediately
753            if fired || self.has_dirty_bits() {
754                // Yield to let spawned tasks run
755                tokio::task::yield_now().await;
756                continue;
757            }
758
759            // Phase 6: Await work (completion, external event, or timer)
760            if in_flight_count == 0 && !self.has_environment_places
761                && self.enabled_transition_count == 0
762            {
763                break;
764            }
765            if in_flight_count == 0
766                && self.enabled_transition_count == 0
767                && (draining || !signal_channel_open)
768            {
769                break;
770            }
771
772            let timer_ms = self.millis_until_next_timed_transition();
773
774            tokio::select! {
775                Some(completion) = completion_rx.recv() => {
776                    in_flight_count -= 1;
777                    match completion.result {
778                        Ok(outputs) => {
779                            self.process_outputs(0, &completion.transition_name, outputs);
780                            if E::ENABLED {
781                                self.event_store.append(NetEvent::TransitionCompleted {
782                                    transition_name: Arc::clone(&completion.transition_name),
783                                    timestamp: now_millis(),
784                                });
785                            }
786                        }
787                        Err(err) => {
788                            if E::ENABLED {
789                                self.event_store.append(NetEvent::TransitionFailed {
790                                    transition_name: Arc::clone(&completion.transition_name),
791                                    error: err,
792                                    timestamp: now_millis(),
793                                });
794                            }
795                        }
796                    }
797                }
798                result = signal_rx.recv(), if signal_channel_open && !closed => {
799                    match result {
800                        Some(ExecutorSignal::Event(event)) if !draining => {
801                            self.marking.add_erased(&event.place_name, event.token);
802                            if let Some(pid) = self.compiled.place_id(&event.place_name) {
803                                bitmap::set_bit(&mut self.marked_places, pid);
804                                self.mark_dirty(pid);
805                            }
806                            if E::ENABLED {
807                                self.event_store.append(NetEvent::TokenAdded {
808                                    place_name: Arc::clone(&event.place_name),
809                                    timestamp: now_millis(),
810                                });
811                            }
812                        }
813                        Some(ExecutorSignal::Event(_)) => {
814                            // Draining: discard events
815                        }
816                        Some(ExecutorSignal::Drain) => {
817                            draining = true;
818                        }
819                        Some(ExecutorSignal::Close) => {
820                            closed = true;
821                            draining = true;
822                            while signal_rx.try_recv().is_ok() {}
823                        }
824                        None => {
825                            signal_channel_open = false;
826                        }
827                    }
828                }
829                _ = tokio::time::sleep(std::time::Duration::from_millis(
830                    if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
831                )) => {
832                    // Timer fired — re-evaluate transitions
833                }
834            }
835        }
836
837        if E::ENABLED {
838            let now = now_millis();
839            self.event_store.append(NetEvent::ExecutionCompleted {
840                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
841                timestamp: now,
842            });
843        }
844
845        &self.marking
846    }
847
848    /// Fires ready transitions, dispatching async actions via tokio::spawn.
849    /// Returns true if any transitions were fired.
850    fn fire_ready_async(
851        &mut self,
852        now_ms: f64,
853        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
854        in_flight_count: &mut usize,
855    ) -> bool {
856        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
857        for tid in 0..self.compiled.transition_count {
858            if !self.enabled_flags[tid] {
859                continue;
860            }
861            let t = self.compiled.transition(tid);
862            let enabled_ms = self.enabled_at_ms[tid];
863            let elapsed = now_ms - enabled_ms;
864            let earliest_ms = t.timing().earliest() as f64;
865            if earliest_ms <= elapsed {
866                ready.push((tid, t.priority(), enabled_ms));
867            }
868        }
869        if ready.is_empty() {
870            return false;
871        }
872
873        ready.sort_by(|a, b| {
874            b.1.cmp(&a.1)
875                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
876        });
877
878        self.firing_snap_buffer.copy_from_slice(&self.marked_places);
879
880        let mut fired_any = false;
881        for (tid, _, _) in ready {
882            if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
883                self.fire_transition_async(tid, completion_tx, in_flight_count);
884                self.firing_snap_buffer.copy_from_slice(&self.marked_places);
885                fired_any = true;
886            } else {
887                self.enabled_flags[tid] = false;
888                self.enabled_transition_count -= 1;
889                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
890            }
891        }
892        fired_any
893    }
894
895    /// Fires a single transition, either sync inline or async via tokio::spawn.
896    fn fire_transition_async(
897        &mut self,
898        tid: usize,
899        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
900        in_flight_count: &mut usize,
901    ) {
902        let t = self.compiled.transition(tid);
903        let transition_name = Arc::clone(t.name_arc());
904        let input_specs: Vec<In> = t.input_specs().to_vec();
905        let read_arcs: Vec<_> = t.reads().to_vec();
906        let reset_arcs: Vec<_> = t.resets().to_vec();
907        let output_place_names: HashSet<Arc<str>> = t
908            .output_places()
909            .iter()
910            .map(|p| Arc::clone(p.name_arc()))
911            .collect();
912        let action = Arc::clone(t.action());
913        let is_sync = action.is_sync();
914
915        // Consume tokens
916        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
917        for in_spec in &input_specs {
918            let place_name = in_spec.place_name();
919            let to_consume = match in_spec {
920                In::One { .. } => 1,
921                In::Exactly { count, .. } => *count,
922                In::All { guard, .. } | In::AtLeast { guard, .. } => {
923                    if guard.is_some() {
924                        self.marking
925                            .count_matching(place_name, &**guard.as_ref().unwrap())
926                    } else {
927                        self.marking.count(place_name)
928                    }
929                }
930            };
931
932            let place_name_arc = Arc::clone(in_spec.place().name_arc());
933            for _ in 0..to_consume {
934                let token = if let Some(guard) = in_spec.guard() {
935                    self.marking.remove_matching(place_name, &**guard)
936                } else {
937                    self.marking.remove_first(place_name)
938                };
939                if let Some(token) = token {
940                    if E::ENABLED {
941                        self.event_store.append(NetEvent::TokenRemoved {
942                            place_name: Arc::clone(&place_name_arc),
943                            timestamp: now_millis(),
944                        });
945                    }
946                    inputs
947                        .entry(Arc::clone(&place_name_arc))
948                        .or_default()
949                        .push(token);
950                }
951            }
952        }
953
954        // Read arcs
955        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
956        for arc in &read_arcs {
957            if let Some(queue) = self.marking.queue(arc.place.name())
958                && let Some(token) = queue.front()
959            {
960                read_tokens
961                    .entry(Arc::clone(arc.place.name_arc()))
962                    .or_default()
963                    .push(token.clone());
964            }
965        }
966
967        // Reset arcs
968        for arc in &reset_arcs {
969            let removed = self.marking.remove_all(arc.place.name());
970            self.pending_reset_places
971                .insert(Arc::clone(arc.place.name_arc()));
972            if E::ENABLED {
973                for _ in &removed {
974                    self.event_store.append(NetEvent::TokenRemoved {
975                        place_name: Arc::clone(arc.place.name_arc()),
976                        timestamp: now_millis(),
977                    });
978                }
979            }
980        }
981
982        self.update_bitmap_after_consumption(tid);
983
984        if E::ENABLED {
985            self.event_store.append(NetEvent::TransitionStarted {
986                transition_name: Arc::clone(&transition_name),
987                timestamp: now_millis(),
988            });
989        }
990
991        // Mark transition as no longer enabled
992        self.enabled_flags[tid] = false;
993        self.enabled_transition_count -= 1;
994        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
995        self.mark_transition_dirty(tid);
996
997        if is_sync {
998            // Inline sync execution
999            let mut ctx = TransitionContext::new(
1000                Arc::clone(&transition_name),
1001                inputs,
1002                read_tokens,
1003                output_place_names,
1004                None,
1005            );
1006            let result = action.run_sync(&mut ctx);
1007            match result {
1008                Ok(()) => {
1009                    let outputs = ctx.take_outputs();
1010                    self.process_outputs(tid, &transition_name, outputs);
1011                    if E::ENABLED {
1012                        self.event_store.append(NetEvent::TransitionCompleted {
1013                            transition_name: Arc::clone(&transition_name),
1014                            timestamp: now_millis(),
1015                        });
1016                    }
1017                }
1018                Err(err) => {
1019                    if E::ENABLED {
1020                        self.event_store.append(NetEvent::TransitionFailed {
1021                            transition_name: Arc::clone(&transition_name),
1022                            error: err.message,
1023                            timestamp: now_millis(),
1024                        });
1025                    }
1026                }
1027            }
1028        } else {
1029            // Async: spawn tokio task
1030            *in_flight_count += 1;
1031            let tx = completion_tx.clone();
1032            let name = Arc::clone(&transition_name);
1033            let ctx = TransitionContext::new(
1034                Arc::clone(&transition_name),
1035                inputs,
1036                read_tokens,
1037                output_place_names,
1038                None,
1039            );
1040            tokio::spawn(async move {
1041                let result = action.run_async(ctx).await;
1042                let completion = match result {
1043                    Ok(mut completed_ctx) => ActionCompletion {
1044                        transition_name: Arc::clone(&name),
1045                        result: Ok(completed_ctx.take_outputs()),
1046                    },
1047                    Err(err) => ActionCompletion {
1048                        transition_name: Arc::clone(&name),
1049                        result: Err(err.message),
1050                    },
1051                };
1052                let _ = tx.send(completion);
1053            });
1054        }
1055    }
1056
1057    /// Computes milliseconds until the next timed transition needs attention.
1058    fn millis_until_next_timed_transition(&self) -> f64 {
1059        let mut min_wait = f64::INFINITY;
1060        let now_ms = self.elapsed_ms();
1061
1062        for tid in 0..self.compiled.transition_count {
1063            if !self.enabled_flags[tid] {
1064                continue;
1065            }
1066            let t = self.compiled.transition(tid);
1067            let elapsed = now_ms - self.enabled_at_ms[tid];
1068
1069            let earliest_ms = t.timing().earliest() as f64;
1070            let remaining_earliest = earliest_ms - elapsed;
1071            if remaining_earliest <= 0.0 {
1072                return 0.0;
1073            }
1074            min_wait = min_wait.min(remaining_earliest);
1075
1076            if self.has_deadline_flags[tid] {
1077                let latest_ms = t.timing().latest() as f64;
1078                let remaining_deadline = latest_ms - elapsed;
1079                if remaining_deadline <= 0.0 {
1080                    return 0.0;
1081                }
1082                min_wait = min_wait.min(remaining_deadline);
1083            }
1084        }
1085
1086        min_wait
1087    }
1088}
1089
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1096
1097/// Validates that the produced outputs satisfy the output spec.
1098#[allow(dead_code)]
1099fn validate_out_spec(out: &Out, produced_places: &HashSet<Arc<str>>) -> bool {
1100    match out {
1101        Out::Place(p) => produced_places.contains(p.name()),
1102        Out::And(children) => children
1103            .iter()
1104            .all(|c| validate_out_spec(c, produced_places)),
1105        Out::Xor(children) => children
1106            .iter()
1107            .any(|c| validate_out_spec(c, produced_places)),
1108        Out::Timeout { child, .. } => validate_out_spec(child, produced_places),
1109        Out::ForwardInput { to, .. } => produced_places.contains(to.name()),
1110    }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115    use super::*;
1116    use libpetri_core::action::passthrough;
1117    use libpetri_core::input::one;
1118    use libpetri_core::output::out_place;
1119    use libpetri_core::place::Place;
1120    use libpetri_core::token::Token;
1121    use libpetri_core::transition::Transition;
1122    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1123
1124    fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1125        let p1 = Place::<i32>::new("p1");
1126        let p2 = Place::<i32>::new("p2");
1127        let p3 = Place::<i32>::new("p3");
1128
1129        let t1 = Transition::builder("t1")
1130            .input(one(&p1))
1131            .output(out_place(&p2))
1132            .action(passthrough())
1133            .build();
1134        let t2 = Transition::builder("t2")
1135            .input(one(&p2))
1136            .output(out_place(&p3))
1137            .action(passthrough())
1138            .build();
1139
1140        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1141        (net, p1, p2, p3)
1142    }
1143
1144    #[test]
1145    fn sync_passthrough_chain() {
1146        let (net, p1, _p2, _p3) = simple_chain();
1147
1148        let mut marking = Marking::new();
1149        marking.add(&p1, Token::at(42, 0));
1150
1151        let mut executor =
1152            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1153        let result = executor.run_sync();
1154
1155        // Token should flow p1 -> p2 -> p3
1156        // passthrough produces no outputs, so token stays consumed
1157        // Actually passthrough produces nothing, so p2 and p3 will be empty
1158        assert_eq!(result.count("p1"), 0);
1159    }
1160
1161    #[test]
1162    fn sync_with_event_store() {
1163        let (net, p1, _, _) = simple_chain();
1164
1165        let mut marking = Marking::new();
1166        marking.add(&p1, Token::at(42, 0));
1167
1168        let mut executor =
1169            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1170        executor.run_sync();
1171
1172        let store = executor.event_store();
1173        assert!(!store.is_empty());
1174    }
1175
1176    #[test]
1177    fn sync_fork_chain() {
1178        let p1 = Place::<i32>::new("p1");
1179        let p2 = Place::<i32>::new("p2");
1180        let p3 = Place::<i32>::new("p3");
1181
1182        let t1 = Transition::builder("t1")
1183            .input(one(&p1))
1184            .output(libpetri_core::output::and(vec![
1185                out_place(&p2),
1186                out_place(&p3),
1187            ]))
1188            .action(libpetri_core::action::fork())
1189            .build();
1190
1191        let net = PetriNet::builder("fork").transition(t1).build();
1192
1193        let mut marking = Marking::new();
1194        marking.add(&p1, Token::at(42, 0));
1195
1196        let mut executor =
1197            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1198        let result = executor.run_sync();
1199
1200        // Fork copies input to all outputs
1201        assert_eq!(result.count("p1"), 0);
1202        assert_eq!(result.count("p2"), 1);
1203        assert_eq!(result.count("p3"), 1);
1204    }
1205
1206    #[test]
1207    fn sync_no_initial_tokens() {
1208        let (net, _, _, _) = simple_chain();
1209        let marking = Marking::new();
1210        let mut executor =
1211            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1212        let result = executor.run_sync();
1213        assert_eq!(result.count("p1"), 0);
1214        assert_eq!(result.count("p2"), 0);
1215        assert_eq!(result.count("p3"), 0);
1216    }
1217
1218    #[test]
1219    fn sync_priority_ordering() {
1220        let p = Place::<()>::new("p");
1221        let out_a = Place::<()>::new("a");
1222        let out_b = Place::<()>::new("b");
1223
1224        // t_high has priority 10, t_low has priority 1
1225        // Both consume from p, but t_high should fire first
1226        let t_high = Transition::builder("t_high")
1227            .input(one(&p))
1228            .output(out_place(&out_a))
1229            .action(libpetri_core::action::passthrough())
1230            .priority(10)
1231            .build();
1232        let t_low = Transition::builder("t_low")
1233            .input(one(&p))
1234            .output(out_place(&out_b))
1235            .action(libpetri_core::action::passthrough())
1236            .priority(1)
1237            .build();
1238
1239        let net = PetriNet::builder("priority")
1240            .transitions([t_high, t_low])
1241            .build();
1242
1243        let mut marking = Marking::new();
1244        marking.add(&p, Token::at((), 0));
1245
1246        let mut executor =
1247            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1248        executor.run_sync();
1249
1250        // Only one token, high priority should have consumed it
1251        // Since passthrough doesn't produce output, both outputs empty
1252        // but p should be empty (consumed by the higher priority transition)
1253        assert_eq!(executor.marking().count("p"), 0);
1254    }
1255
1256    #[test]
1257    fn sync_inhibitor_blocks() {
1258        let p1 = Place::<()>::new("p1");
1259        let p2 = Place::<()>::new("p2");
1260        let p_inh = Place::<()>::new("inh");
1261
1262        let t = Transition::builder("t1")
1263            .input(one(&p1))
1264            .output(out_place(&p2))
1265            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1266            .action(libpetri_core::action::passthrough())
1267            .build();
1268
1269        let net = PetriNet::builder("inhibitor").transition(t).build();
1270
1271        let mut marking = Marking::new();
1272        marking.add(&p1, Token::at((), 0));
1273        marking.add(&p_inh, Token::at((), 0));
1274
1275        let mut executor =
1276            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1277        executor.run_sync();
1278
1279        // Inhibitor should block — token remains in p1
1280        assert_eq!(executor.marking().count("p1"), 1);
1281    }
1282
1283    #[test]
1284    fn sync_linear_chain_5() {
1285        // Build a chain: p0 -> t0 -> p1 -> t1 -> ... -> p5
1286        let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1287        let transitions: Vec<Transition> = (0..5)
1288            .map(|i| {
1289                Transition::builder(format!("t{i}"))
1290                    .input(one(&places[i]))
1291                    .output(out_place(&places[i + 1]))
1292                    .action(libpetri_core::action::fork())
1293                    .build()
1294            })
1295            .collect();
1296
1297        let net = PetriNet::builder("chain5").transitions(transitions).build();
1298
1299        let mut marking = Marking::new();
1300        marking.add(&places[0], Token::at(1, 0));
1301
1302        let mut executor =
1303            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1304        let result = executor.run_sync();
1305
1306        // Token should flow through all 5 transitions to p5
1307        assert_eq!(result.count("p0"), 0);
1308        assert_eq!(result.count("p5"), 1);
1309    }
1310
1311    #[test]
1312    fn input_arc_requires_token_to_enable() {
1313        let p1 = Place::<i32>::new("p1");
1314        let p2 = Place::<i32>::new("p2");
1315        let t = Transition::builder("t1")
1316            .input(one(&p1))
1317            .output(out_place(&p2))
1318            .action(libpetri_core::action::fork())
1319            .build();
1320        let net = PetriNet::builder("test").transition(t).build();
1321
1322        // No tokens — transition should not fire
1323        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1324            &net,
1325            Marking::new(),
1326            ExecutorOptions::default(),
1327        );
1328        executor.run_sync();
1329        assert_eq!(executor.marking().count("p1"), 0);
1330        assert_eq!(executor.marking().count("p2"), 0);
1331    }
1332
1333    #[test]
1334    fn multiple_input_arcs_require_all_tokens() {
1335        let p1 = Place::<i32>::new("p1");
1336        let p2 = Place::<i32>::new("p2");
1337        let p3 = Place::<i32>::new("p3");
1338
1339        let t = Transition::builder("t1")
1340            .input(one(&p1))
1341            .input(one(&p2))
1342            .output(out_place(&p3))
1343            .action(libpetri_core::action::sync_action(|ctx| {
1344                ctx.output("p3", 99i32)?;
1345                Ok(())
1346            }))
1347            .build();
1348        let net = PetriNet::builder("test").transition(t).build();
1349
1350        // Only p1 has a token — should not fire
1351        let mut marking = Marking::new();
1352        marking.add(&p1, Token::at(1, 0));
1353        let mut executor =
1354            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1355        executor.run_sync();
1356        assert_eq!(executor.marking().count("p1"), 1);
1357        assert_eq!(executor.marking().count("p3"), 0);
1358
1359        // Both p1 and p2 have tokens — should fire
1360        let mut marking = Marking::new();
1361        marking.add(&p1, Token::at(1, 0));
1362        marking.add(&p2, Token::at(2, 0));
1363        let mut executor =
1364            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1365        executor.run_sync();
1366        assert_eq!(executor.marking().count("p1"), 0);
1367        assert_eq!(executor.marking().count("p2"), 0);
1368        assert_eq!(executor.marking().count("p3"), 1);
1369    }
1370
1371    #[test]
1372    fn read_arc_does_not_consume() {
1373        let p_in = Place::<i32>::new("in");
1374        let p_ctx = Place::<i32>::new("ctx");
1375        let p_out = Place::<i32>::new("out");
1376
1377        let t = Transition::builder("t1")
1378            .input(one(&p_in))
1379            .read(libpetri_core::arc::read(&p_ctx))
1380            .output(out_place(&p_out))
1381            .action(libpetri_core::action::sync_action(|ctx| {
1382                let v = ctx.input::<i32>("in")?;
1383                let r = ctx.read::<i32>("ctx")?;
1384                ctx.output("out", *v + *r)?;
1385                Ok(())
1386            }))
1387            .build();
1388        let net = PetriNet::builder("test").transition(t).build();
1389
1390        let mut marking = Marking::new();
1391        marking.add(&p_in, Token::at(10, 0));
1392        marking.add(&p_ctx, Token::at(5, 0));
1393
1394        let mut executor =
1395            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1396        executor.run_sync();
1397
1398        assert_eq!(executor.marking().count("in"), 0); // consumed
1399        assert_eq!(executor.marking().count("ctx"), 1); // NOT consumed (read arc)
1400        assert_eq!(executor.marking().count("out"), 1);
1401    }
1402
1403    #[test]
1404    fn reset_arc_removes_all_tokens() {
1405        let p_in = Place::<()>::new("in");
1406        let p_reset = Place::<i32>::new("reset");
1407        let p_out = Place::<()>::new("out");
1408
1409        let t = Transition::builder("t1")
1410            .input(one(&p_in))
1411            .reset(libpetri_core::arc::reset(&p_reset))
1412            .output(out_place(&p_out))
1413            .action(libpetri_core::action::fork())
1414            .build();
1415        let net = PetriNet::builder("test").transition(t).build();
1416
1417        let mut marking = Marking::new();
1418        marking.add(&p_in, Token::at((), 0));
1419        marking.add(&p_reset, Token::at(1, 0));
1420        marking.add(&p_reset, Token::at(2, 0));
1421        marking.add(&p_reset, Token::at(3, 0));
1422
1423        let mut executor =
1424            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1425        executor.run_sync();
1426
1427        assert_eq!(executor.marking().count("reset"), 0); // all cleared
1428        assert_eq!(executor.marking().count("out"), 1);
1429    }
1430
1431    #[test]
1432    fn exactly_cardinality_consumes_n() {
1433        let p = Place::<i32>::new("p");
1434        let p_out = Place::<i32>::new("out");
1435
1436        let t = Transition::builder("t1")
1437            .input(libpetri_core::input::exactly(3, &p))
1438            .output(out_place(&p_out))
1439            .action(libpetri_core::action::sync_action(|ctx| {
1440                let vals = ctx.inputs::<i32>("p")?;
1441                for v in vals {
1442                    ctx.output("out", *v)?;
1443                }
1444                Ok(())
1445            }))
1446            .build();
1447        let net = PetriNet::builder("test").transition(t).build();
1448
1449        let mut marking = Marking::new();
1450        for i in 0..5 {
1451            marking.add(&p, Token::at(i, 0));
1452        }
1453
1454        let mut executor =
1455            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1456        executor.run_sync();
1457
1458        // Consumed 3 of 5, produced 3
1459        assert_eq!(executor.marking().count("p"), 2);
1460        assert_eq!(executor.marking().count("out"), 3);
1461    }
1462
1463    #[test]
1464    fn all_cardinality_consumes_everything() {
1465        let p = Place::<i32>::new("p");
1466        let p_out = Place::<()>::new("out");
1467
1468        let t = Transition::builder("t1")
1469            .input(libpetri_core::input::all(&p))
1470            .output(out_place(&p_out))
1471            .action(libpetri_core::action::sync_action(|ctx| {
1472                let vals = ctx.inputs::<i32>("p")?;
1473                ctx.output("out", vals.len() as i32)?;
1474                Ok(())
1475            }))
1476            .build();
1477        let net = PetriNet::builder("test").transition(t).build();
1478
1479        let mut marking = Marking::new();
1480        for i in 0..5 {
1481            marking.add(&p, Token::at(i, 0));
1482        }
1483
1484        let mut executor =
1485            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1486        executor.run_sync();
1487
1488        assert_eq!(executor.marking().count("p"), 0);
1489    }
1490
1491    #[test]
1492    fn at_least_blocks_insufficient() {
1493        let p = Place::<i32>::new("p");
1494        let p_out = Place::<()>::new("out");
1495
1496        let t = Transition::builder("t1")
1497            .input(libpetri_core::input::at_least(3, &p))
1498            .output(out_place(&p_out))
1499            .action(libpetri_core::action::passthrough())
1500            .build();
1501        let net = PetriNet::builder("test").transition(t).build();
1502
1503        // Only 2 tokens, need 3+
1504        let mut marking = Marking::new();
1505        marking.add(&p, Token::at(1, 0));
1506        marking.add(&p, Token::at(2, 0));
1507
1508        let mut executor =
1509            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1510        executor.run_sync();
1511
1512        assert_eq!(executor.marking().count("p"), 2); // not consumed
1513    }
1514
1515    #[test]
1516    fn at_least_fires_with_enough_and_consumes_all() {
1517        let p = Place::<i32>::new("p");
1518        let p_out = Place::<()>::new("out");
1519
1520        let t = Transition::builder("t1")
1521            .input(libpetri_core::input::at_least(3, &p))
1522            .output(out_place(&p_out))
1523            .action(libpetri_core::action::passthrough())
1524            .build();
1525        let net = PetriNet::builder("test").transition(t).build();
1526
1527        let mut marking = Marking::new();
1528        for i in 0..5 {
1529            marking.add(&p, Token::at(i, 0));
1530        }
1531
1532        let mut executor =
1533            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1534        executor.run_sync();
1535
1536        assert_eq!(executor.marking().count("p"), 0); // all consumed
1537    }
1538
1539    #[test]
1540    fn guarded_input_only_consumes_matching() {
1541        let p = Place::<i32>::new("p");
1542        let p_out = Place::<i32>::new("out");
1543
1544        let t = Transition::builder("t1")
1545            .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1546            .output(out_place(&p_out))
1547            .action(libpetri_core::action::fork())
1548            .build();
1549        let net = PetriNet::builder("test").transition(t).build();
1550
1551        let mut marking = Marking::new();
1552        marking.add(&p, Token::at(3, 0)); // doesn't match guard
1553        marking.add(&p, Token::at(10, 0)); // matches
1554
1555        let mut executor =
1556            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1557        executor.run_sync();
1558
1559        assert_eq!(executor.marking().count("p"), 1); // 3 remains
1560        assert_eq!(executor.marking().count("out"), 1); // 10 forwarded
1561        let peeked = executor.marking().peek(&p_out).unwrap();
1562        assert_eq!(*peeked, 10);
1563    }
1564
1565    #[test]
1566    fn guarded_input_blocks_when_no_match() {
1567        let p = Place::<i32>::new("p");
1568        let p_out = Place::<i32>::new("out");
1569
1570        let t = Transition::builder("t1")
1571            .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
1572            .output(out_place(&p_out))
1573            .action(libpetri_core::action::fork())
1574            .build();
1575        let net = PetriNet::builder("test").transition(t).build();
1576
1577        let mut marking = Marking::new();
1578        marking.add(&p, Token::at(3, 0));
1579        marking.add(&p, Token::at(10, 0));
1580
1581        let mut executor =
1582            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1583        executor.run_sync();
1584
1585        // Nothing matches guard, transition does not fire
1586        assert_eq!(executor.marking().count("p"), 2);
1587        assert_eq!(executor.marking().count("out"), 0);
1588    }
1589
1590    #[test]
1591    fn transform_action_outputs_to_all_places() {
1592        let p_in = Place::<i32>::new("in");
1593        let p_a = Place::<i32>::new("a");
1594        let p_b = Place::<i32>::new("b");
1595
1596        let t = Transition::builder("t1")
1597            .input(one(&p_in))
1598            .output(libpetri_core::output::and(vec![
1599                out_place(&p_a),
1600                out_place(&p_b),
1601            ]))
1602            .action(libpetri_core::action::transform(|ctx| {
1603                let v = ctx.input::<i32>("in").unwrap();
1604                Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
1605            }))
1606            .build();
1607        let net = PetriNet::builder("test").transition(t).build();
1608
1609        let mut marking = Marking::new();
1610        marking.add(&p_in, Token::at(5, 0));
1611
1612        let mut executor =
1613            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1614        executor.run_sync();
1615
1616        assert_eq!(executor.marking().count("a"), 1);
1617        assert_eq!(executor.marking().count("b"), 1);
1618        assert_eq!(*executor.marking().peek(&p_a).unwrap(), 10);
1619        assert_eq!(*executor.marking().peek(&p_b).unwrap(), 10);
1620    }
1621
1622    #[test]
1623    fn sync_action_custom_logic() {
1624        let p_in = Place::<i32>::new("in");
1625        let p_out = Place::<String>::new("out");
1626
1627        let t = Transition::builder("t1")
1628            .input(one(&p_in))
1629            .output(out_place(&p_out))
1630            .action(libpetri_core::action::sync_action(|ctx| {
1631                let v = ctx.input::<i32>("in")?;
1632                ctx.output("out", format!("value={v}"))?;
1633                Ok(())
1634            }))
1635            .build();
1636        let net = PetriNet::builder("test").transition(t).build();
1637
1638        let mut marking = Marking::new();
1639        marking.add(&p_in, Token::at(42, 0));
1640
1641        let mut executor =
1642            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1643        executor.run_sync();
1644
1645        assert_eq!(executor.marking().count("out"), 1);
1646        let peeked = executor.marking().peek(&p_out).unwrap();
1647        assert_eq!(*peeked, "value=42");
1648    }
1649
1650    #[test]
1651    fn action_error_does_not_crash() {
1652        let p_in = Place::<i32>::new("in");
1653        let p_out = Place::<i32>::new("out");
1654
1655        let t = Transition::builder("t1")
1656            .input(one(&p_in))
1657            .output(out_place(&p_out))
1658            .action(libpetri_core::action::sync_action(|_ctx| {
1659                Err(libpetri_core::action::ActionError::new(
1660                    "intentional failure",
1661                ))
1662            }))
1663            .build();
1664        let net = PetriNet::builder("test").transition(t).build();
1665
1666        let mut marking = Marking::new();
1667        marking.add(&p_in, Token::at(42, 0));
1668
1669        let mut executor =
1670            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1671        executor.run_sync();
1672
1673        // Token consumed even though action failed
1674        assert_eq!(executor.marking().count("in"), 0);
1675        assert_eq!(executor.marking().count("out"), 0);
1676
1677        // Failure event should be recorded
1678        let events = executor.event_store().events();
1679        assert!(
1680            events
1681                .iter()
1682                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
1683        );
1684    }
1685
1686    #[test]
1687    fn event_store_records_lifecycle() {
1688        let p1 = Place::<i32>::new("p1");
1689        let p2 = Place::<i32>::new("p2");
1690        let t = Transition::builder("t1")
1691            .input(one(&p1))
1692            .output(out_place(&p2))
1693            .action(libpetri_core::action::fork())
1694            .build();
1695        let net = PetriNet::builder("test").transition(t).build();
1696
1697        let mut marking = Marking::new();
1698        marking.add(&p1, Token::at(1, 0));
1699
1700        let mut executor =
1701            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1702        executor.run_sync();
1703
1704        let events = executor.event_store().events();
1705
1706        // Should have: ExecutionStarted, TransitionEnabled, TokenRemoved, TransitionStarted,
1707        // TokenAdded, TransitionCompleted, ExecutionCompleted
1708        assert!(
1709            events
1710                .iter()
1711                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
1712        );
1713        assert!(
1714            events
1715                .iter()
1716                .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
1717        );
1718        assert!(
1719            events
1720                .iter()
1721                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
1722        );
1723        assert!(
1724            events
1725                .iter()
1726                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
1727        );
1728        assert!(
1729            events
1730                .iter()
1731                .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
1732        );
1733        assert!(
1734            events
1735                .iter()
1736                .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
1737        );
1738        assert!(
1739            events
1740                .iter()
1741                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
1742        );
1743    }
1744
1745    #[test]
1746    fn noop_event_store_has_no_events() {
1747        let p1 = Place::<i32>::new("p1");
1748        let p2 = Place::<i32>::new("p2");
1749        let t = Transition::builder("t1")
1750            .input(one(&p1))
1751            .output(out_place(&p2))
1752            .action(libpetri_core::action::fork())
1753            .build();
1754        let net = PetriNet::builder("test").transition(t).build();
1755
1756        let mut marking = Marking::new();
1757        marking.add(&p1, Token::at(1, 0));
1758
1759        let mut executor =
1760            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1761        executor.run_sync();
1762
1763        assert!(executor.event_store().is_empty());
1764    }
1765
1766    #[test]
1767    fn empty_net_completes() {
1768        let net = PetriNet::builder("empty").build();
1769        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1770            &net,
1771            Marking::new(),
1772            ExecutorOptions::default(),
1773        );
1774        executor.run_sync();
1775        assert!(executor.is_quiescent());
1776    }
1777
1778    #[test]
1779    fn single_transition_fires_once() {
1780        let p = Place::<i32>::new("p");
1781        let out = Place::<i32>::new("out");
1782        let t = Transition::builder("t1")
1783            .input(one(&p))
1784            .output(out_place(&out))
1785            .action(libpetri_core::action::fork())
1786            .build();
1787        let net = PetriNet::builder("test").transition(t).build();
1788
1789        let mut marking = Marking::new();
1790        marking.add(&p, Token::at(42, 0));
1791
1792        let mut executor =
1793            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1794        executor.run_sync();
1795
1796        assert_eq!(executor.marking().count("p"), 0);
1797        assert_eq!(executor.marking().count("out"), 1);
1798    }
1799
1800    #[test]
1801    fn many_tokens_all_processed() {
1802        let p = Place::<i32>::new("p");
1803        let out = Place::<i32>::new("out");
1804        let t = Transition::builder("t1")
1805            .input(one(&p))
1806            .output(out_place(&out))
1807            .action(libpetri_core::action::fork())
1808            .build();
1809        let net = PetriNet::builder("test").transition(t).build();
1810
1811        let mut marking = Marking::new();
1812        for i in 0..100 {
1813            marking.add(&p, Token::at(i, 0));
1814        }
1815
1816        let mut executor =
1817            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1818        executor.run_sync();
1819
1820        assert_eq!(executor.marking().count("p"), 0);
1821        assert_eq!(executor.marking().count("out"), 100);
1822    }
1823
1824    #[test]
1825    fn input_fifo_ordering() {
1826        let p = Place::<i32>::new("p");
1827        let out = Place::<i32>::new("out");
1828        let t = Transition::builder("t1")
1829            .input(one(&p))
1830            .output(out_place(&out))
1831            .action(libpetri_core::action::fork())
1832            .build();
1833        let net = PetriNet::builder("test").transition(t).build();
1834
1835        let mut marking = Marking::new();
1836        marking.add(&p, Token::at(1, 0));
1837        marking.add(&p, Token::at(2, 0));
1838        marking.add(&p, Token::at(3, 0));
1839
1840        let mut executor =
1841            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1842        executor.run_sync();
1843
1844        // All processed, first one should have been consumed first
1845        assert_eq!(executor.marking().count("out"), 3);
1846    }
1847
1848    #[test]
1849    fn inhibitor_unblocked_when_token_removed() {
1850        // p1 has token, p_inh has token (blocks t1)
1851        // t_clear removes from p_inh, then t1 can fire
1852        let p1 = Place::<()>::new("p1");
1853        let p_inh = Place::<()>::new("inh");
1854        let p_out = Place::<()>::new("out");
1855        let p_trigger = Place::<()>::new("trigger");
1856
1857        // t_clear: consumes from inh, outputs to trigger
1858        let t_clear = Transition::builder("t_clear")
1859            .input(one(&p_inh))
1860            .output(out_place(&p_trigger))
1861            .action(libpetri_core::action::fork())
1862            .priority(10) // higher priority fires first
1863            .build();
1864
1865        // t1: consumes from p1, inhibited by inh
1866        let t1 = Transition::builder("t1")
1867            .input(one(&p1))
1868            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1869            .output(out_place(&p_out))
1870            .action(libpetri_core::action::fork())
1871            .priority(1)
1872            .build();
1873
1874        let net = PetriNet::builder("test").transitions([t_clear, t1]).build();
1875
1876        let mut marking = Marking::new();
1877        marking.add(&p1, Token::at((), 0));
1878        marking.add(&p_inh, Token::at((), 0));
1879
1880        let mut executor =
1881            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1882        executor.run_sync();
1883
1884        // t_clear fires first (priority 10), removes inh token
1885        // then t1 fires (no longer inhibited)
1886        assert_eq!(executor.marking().count("inh"), 0);
1887        assert_eq!(executor.marking().count("p1"), 0);
1888        assert_eq!(executor.marking().count("out"), 1);
1889    }
1890
1891    #[test]
1892    fn combined_input_read_reset() {
1893        let p_in = Place::<i32>::new("in");
1894        let p_ctx = Place::<String>::new("ctx");
1895        let p_clear = Place::<i32>::new("clear");
1896        let p_out = Place::<String>::new("out");
1897
1898        let t = Transition::builder("t1")
1899            .input(one(&p_in))
1900            .read(libpetri_core::arc::read(&p_ctx))
1901            .reset(libpetri_core::arc::reset(&p_clear))
1902            .output(out_place(&p_out))
1903            .action(libpetri_core::action::sync_action(|ctx| {
1904                let v = ctx.input::<i32>("in")?;
1905                let r = ctx.read::<String>("ctx")?;
1906                ctx.output("out", format!("{v}-{r}"))?;
1907                Ok(())
1908            }))
1909            .build();
1910        let net = PetriNet::builder("test").transition(t).build();
1911
1912        let mut marking = Marking::new();
1913        marking.add(&p_in, Token::at(42, 0));
1914        marking.add(&p_ctx, Token::at("hello".to_string(), 0));
1915        marking.add(&p_clear, Token::at(1, 0));
1916        marking.add(&p_clear, Token::at(2, 0));
1917
1918        let mut executor =
1919            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1920        executor.run_sync();
1921
1922        assert_eq!(executor.marking().count("in"), 0); // consumed
1923        assert_eq!(executor.marking().count("ctx"), 1); // read, not consumed
1924        assert_eq!(executor.marking().count("clear"), 0); // reset
1925        assert_eq!(executor.marking().count("out"), 1);
1926        let peeked = executor.marking().peek(&p_out).unwrap();
1927        assert_eq!(*peeked, "42-hello");
1928    }
1929
1930    #[test]
1931    fn workflow_sequential_chain() {
1932        // p1 -> t1 -> p2 -> t2 -> p3 -> t3 -> p4
1933        // Each transition doubles the value
1934        let p1 = Place::<i32>::new("p1");
1935        let p2 = Place::<i32>::new("p2");
1936        let p3 = Place::<i32>::new("p3");
1937        let p4 = Place::<i32>::new("p4");
1938
1939        let make_doubler = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
1940            let out_name: Arc<str> = Arc::from(outp.name());
1941            Transition::builder(name)
1942                .input(one(inp))
1943                .output(out_place(outp))
1944                .action(libpetri_core::action::sync_action(move |ctx| {
1945                    let v = ctx
1946                        .input::<i32>("p1")
1947                        .or_else(|_| ctx.input::<i32>("p2"))
1948                        .or_else(|_| ctx.input::<i32>("p3"))
1949                        .unwrap();
1950                    ctx.output(&out_name, *v * 2)?;
1951                    Ok(())
1952                }))
1953                .build()
1954        };
1955
1956        let t1 = make_doubler("t1", &p1, &p2);
1957        let t2 = make_doubler("t2", &p2, &p3);
1958        let t3 = make_doubler("t3", &p3, &p4);
1959
1960        let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
1961
1962        let mut marking = Marking::new();
1963        marking.add(&p1, Token::at(1, 0));
1964
1965        let mut executor =
1966            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1967        executor.run_sync();
1968
1969        assert_eq!(executor.marking().count("p4"), 1);
1970        assert_eq!(*executor.marking().peek(&p4).unwrap(), 8); // 1 * 2 * 2 * 2
1971    }
1972
1973    #[test]
1974    fn workflow_fork_join() {
1975        // p_start -> t_fork -> p_a, p_b -> t_join -> p_end
1976        let p_start = Place::<i32>::new("start");
1977        let p_a = Place::<i32>::new("a");
1978        let p_b = Place::<i32>::new("b");
1979        let p_end = Place::<i32>::new("end");
1980
1981        let t_fork = Transition::builder("fork")
1982            .input(one(&p_start))
1983            .output(libpetri_core::output::and(vec![
1984                out_place(&p_a),
1985                out_place(&p_b),
1986            ]))
1987            .action(libpetri_core::action::fork())
1988            .build();
1989
1990        let t_join = Transition::builder("join")
1991            .input(one(&p_a))
1992            .input(one(&p_b))
1993            .output(out_place(&p_end))
1994            .action(libpetri_core::action::sync_action(|ctx| {
1995                let a = ctx.input::<i32>("a")?;
1996                let b = ctx.input::<i32>("b")?;
1997                ctx.output("end", *a + *b)?;
1998                Ok(())
1999            }))
2000            .build();
2001
2002        let net = PetriNet::builder("fork-join")
2003            .transitions([t_fork, t_join])
2004            .build();
2005
2006        let mut marking = Marking::new();
2007        marking.add(&p_start, Token::at(5, 0));
2008
2009        let mut executor =
2010            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2011        executor.run_sync();
2012
2013        assert_eq!(executor.marking().count("start"), 0);
2014        assert_eq!(executor.marking().count("a"), 0);
2015        assert_eq!(executor.marking().count("b"), 0);
2016        assert_eq!(executor.marking().count("end"), 1);
2017        assert_eq!(*executor.marking().peek(&p_end).unwrap(), 10); // 5 + 5
2018    }
2019
2020    #[test]
2021    fn workflow_mutual_exclusion() {
2022        // Two workers compete for a mutex token
2023        let p_mutex = Place::<()>::new("mutex");
2024        let p_w1 = Place::<()>::new("w1");
2025        let p_w2 = Place::<()>::new("w2");
2026        let p_done1 = Place::<()>::new("done1");
2027        let p_done2 = Place::<()>::new("done2");
2028
2029        let t_w1 = Transition::builder("work1")
2030            .input(one(&p_w1))
2031            .input(one(&p_mutex))
2032            .output(libpetri_core::output::and(vec![
2033                out_place(&p_done1),
2034                out_place(&p_mutex), // return mutex
2035            ]))
2036            .action(libpetri_core::action::sync_action(|ctx| {
2037                ctx.output("done1", ())?;
2038                ctx.output("mutex", ())?;
2039                Ok(())
2040            }))
2041            .build();
2042
2043        let t_w2 = Transition::builder("work2")
2044            .input(one(&p_w2))
2045            .input(one(&p_mutex))
2046            .output(libpetri_core::output::and(vec![
2047                out_place(&p_done2),
2048                out_place(&p_mutex), // return mutex
2049            ]))
2050            .action(libpetri_core::action::sync_action(|ctx| {
2051                ctx.output("done2", ())?;
2052                ctx.output("mutex", ())?;
2053                Ok(())
2054            }))
2055            .build();
2056
2057        let net = PetriNet::builder("mutex").transitions([t_w1, t_w2]).build();
2058
2059        let mut marking = Marking::new();
2060        marking.add(&p_mutex, Token::at((), 0));
2061        marking.add(&p_w1, Token::at((), 0));
2062        marking.add(&p_w2, Token::at((), 0));
2063
2064        let mut executor =
2065            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2066        executor.run_sync();
2067
2068        // Both should complete, mutex should be returned
2069        assert_eq!(executor.marking().count("done1"), 1);
2070        assert_eq!(executor.marking().count("done2"), 1);
2071        assert_eq!(executor.marking().count("mutex"), 1); // returned
2072    }
2073
2074    #[test]
2075    fn produce_action() {
2076        let p_in = Place::<()>::new("in");
2077        let p_out = Place::<String>::new("out");
2078
2079        let t = Transition::builder("t1")
2080            .input(one(&p_in))
2081            .output(out_place(&p_out))
2082            .action(libpetri_core::action::produce(
2083                Arc::from("out"),
2084                "produced_value".to_string(),
2085            ))
2086            .build();
2087        let net = PetriNet::builder("test").transition(t).build();
2088
2089        let mut marking = Marking::new();
2090        marking.add(&p_in, Token::at((), 0));
2091
2092        let mut executor =
2093            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2094        executor.run_sync();
2095
2096        assert_eq!(executor.marking().count("out"), 1);
2097    }
2098
2099    #[test]
2100    fn xor_output_fires_one_branch() {
2101        let p = Place::<i32>::new("p");
2102        let a = Place::<i32>::new("a");
2103        let b = Place::<i32>::new("b");
2104
2105        let t = Transition::builder("t1")
2106            .input(one(&p))
2107            .output(libpetri_core::output::xor(vec![
2108                out_place(&a),
2109                out_place(&b),
2110            ]))
2111            .action(libpetri_core::action::sync_action(|ctx| {
2112                let v = ctx.input::<i32>("p")?;
2113                // Output to first branch place
2114                ctx.output("a", *v)?;
2115                Ok(())
2116            }))
2117            .build();
2118        let net = PetriNet::builder("test").transition(t).build();
2119
2120        let mut marking = Marking::new();
2121        marking.add(&p, Token::at(1, 0));
2122
2123        let mut executor =
2124            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2125        executor.run_sync();
2126
2127        // XOR: token goes to one branch
2128        let total = executor.marking().count("a") + executor.marking().count("b");
2129        assert!(total >= 1);
2130    }
2131
2132    #[test]
2133    fn and_output_fires_to_all() {
2134        let p = Place::<i32>::new("p");
2135        let a = Place::<i32>::new("a");
2136        let b = Place::<i32>::new("b");
2137        let c = Place::<i32>::new("c");
2138
2139        let t = Transition::builder("t1")
2140            .input(one(&p))
2141            .output(libpetri_core::output::and(vec![
2142                out_place(&a),
2143                out_place(&b),
2144                out_place(&c),
2145            ]))
2146            .action(libpetri_core::action::sync_action(|ctx| {
2147                let v = ctx.input::<i32>("p")?;
2148                ctx.output("a", *v)?;
2149                ctx.output("b", *v * 10)?;
2150                ctx.output("c", *v * 100)?;
2151                Ok(())
2152            }))
2153            .build();
2154        let net = PetriNet::builder("test").transition(t).build();
2155
2156        let mut marking = Marking::new();
2157        marking.add(&p, Token::at(1, 0));
2158
2159        let mut executor =
2160            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2161        executor.run_sync();
2162
2163        assert_eq!(executor.marking().count("a"), 1);
2164        assert_eq!(executor.marking().count("b"), 1);
2165        assert_eq!(executor.marking().count("c"), 1);
2166        assert_eq!(*executor.marking().peek(&a).unwrap(), 1);
2167        assert_eq!(*executor.marking().peek(&b).unwrap(), 10);
2168        assert_eq!(*executor.marking().peek(&c).unwrap(), 100);
2169    }
2170
2171    #[test]
2172    fn transition_with_no_output_consumes_only() {
2173        let p = Place::<i32>::new("p");
2174
2175        let t = Transition::builder("t1")
2176            .input(one(&p))
2177            .action(libpetri_core::action::sync_action(|ctx| {
2178                let _ = ctx.input::<i32>("p")?;
2179                Ok(())
2180            }))
2181            .build();
2182        let net = PetriNet::builder("test").transition(t).build();
2183
2184        let mut marking = Marking::new();
2185        marking.add(&p, Token::at(42, 0));
2186
2187        let mut executor =
2188            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2189        executor.run_sync();
2190
2191        assert_eq!(executor.marking().count("p"), 0);
2192    }
2193
2194    #[test]
2195    fn multiple_transitions_same_input_compete() {
2196        // Two transitions both need p1 — only one should fire per token
2197        let p1 = Place::<()>::new("p1");
2198        let out_a = Place::<()>::new("a");
2199        let out_b = Place::<()>::new("b");
2200
2201        let t_a = Transition::builder("ta")
2202            .input(one(&p1))
2203            .output(out_place(&out_a))
2204            .action(libpetri_core::action::fork())
2205            .build();
2206        let t_b = Transition::builder("tb")
2207            .input(one(&p1))
2208            .output(out_place(&out_b))
2209            .action(libpetri_core::action::fork())
2210            .build();
2211
2212        let net = PetriNet::builder("test").transitions([t_a, t_b]).build();
2213
2214        let mut marking = Marking::new();
2215        marking.add(&p1, Token::at((), 0));
2216
2217        let mut executor =
2218            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2219        executor.run_sync();
2220
2221        // One token, two competing transitions — exactly one fires
2222        let total = executor.marking().count("a") + executor.marking().count("b");
2223        assert_eq!(total, 1);
2224        assert_eq!(executor.marking().count("p1"), 0);
2225    }
2226
2227    #[test]
2228    fn priority_higher_fires_first() {
2229        let p = Place::<()>::new("p");
2230        let out_hi = Place::<()>::new("hi");
2231        let out_lo = Place::<()>::new("lo");
2232
2233        let t_hi = Transition::builder("hi")
2234            .input(one(&p))
2235            .output(out_place(&out_hi))
2236            .action(libpetri_core::action::fork())
2237            .priority(10)
2238            .build();
2239        let t_lo = Transition::builder("lo")
2240            .input(one(&p))
2241            .output(out_place(&out_lo))
2242            .action(libpetri_core::action::fork())
2243            .priority(1)
2244            .build();
2245
2246        let net = PetriNet::builder("test").transitions([t_hi, t_lo]).build();
2247
2248        let mut marking = Marking::new();
2249        marking.add(&p, Token::at((), 0));
2250
2251        let mut executor =
2252            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2253        executor.run_sync();
2254
2255        // Higher priority should win
2256        assert_eq!(executor.marking().count("hi"), 1);
2257        assert_eq!(executor.marking().count("lo"), 0);
2258    }
2259
2260    #[test]
2261    fn quiescent_when_no_enabled_transitions() {
2262        let p = Place::<i32>::new("p");
2263        let out = Place::<i32>::new("out");
2264
2265        let t = Transition::builder("t1")
2266            .input(one(&p))
2267            .output(out_place(&out))
2268            .action(libpetri_core::action::fork())
2269            .build();
2270        let net = PetriNet::builder("test").transition(t).build();
2271
2272        let mut marking = Marking::new();
2273        marking.add(&p, Token::at(1, 0));
2274
2275        let mut executor =
2276            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2277        executor.run_sync();
2278
2279        assert!(executor.is_quiescent());
2280    }
2281
2282    #[test]
2283    fn event_store_transition_enabled_disabled() {
2284        let p1 = Place::<()>::new("p1");
2285        let p2 = Place::<()>::new("p2");
2286
2287        let t = Transition::builder("t1")
2288            .input(one(&p1))
2289            .output(out_place(&p2))
2290            .action(libpetri_core::action::fork())
2291            .build();
2292        let net = PetriNet::builder("test").transition(t).build();
2293
2294        let mut marking = Marking::new();
2295        marking.add(&p1, Token::at((), 0));
2296
2297        let mut executor =
2298            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2299        executor.run_sync();
2300
2301        let events = executor.event_store().events();
2302        // Should have at least: ExecutionStarted, TransitionEnabled, TransitionStarted,
2303        // TokenAdded, TransitionCompleted, ExecutionCompleted
2304        assert!(events.len() >= 4);
2305
2306        // Check for TransitionEnabled event
2307        let has_enabled = events.iter().any(|e| {
2308            matches!(e, NetEvent::TransitionEnabled { transition_name, .. } if transition_name.as_ref() == "t1")
2309        });
2310        assert!(has_enabled);
2311    }
2312
2313    #[test]
2314    fn diamond_pattern() {
2315        // p1 -> t1 -> p2, p3 -> t2 -> p4 (from p2), t3 -> p4 (from p3)
2316        let p1 = Place::<()>::new("p1");
2317        let p2 = Place::<()>::new("p2");
2318        let p3 = Place::<()>::new("p3");
2319        let p4 = Place::<()>::new("p4");
2320
2321        let t1 = Transition::builder("t1")
2322            .input(one(&p1))
2323            .output(libpetri_core::output::and(vec![
2324                out_place(&p2),
2325                out_place(&p3),
2326            ]))
2327            .action(libpetri_core::action::fork())
2328            .build();
2329        let t2 = Transition::builder("t2")
2330            .input(one(&p2))
2331            .output(out_place(&p4))
2332            .action(libpetri_core::action::fork())
2333            .build();
2334        let t3 = Transition::builder("t3")
2335            .input(one(&p3))
2336            .output(out_place(&p4))
2337            .action(libpetri_core::action::fork())
2338            .build();
2339
2340        let net = PetriNet::builder("diamond")
2341            .transitions([t1, t2, t3])
2342            .build();
2343
2344        let mut marking = Marking::new();
2345        marking.add(&p1, Token::at((), 0));
2346
2347        let mut executor =
2348            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2349        executor.run_sync();
2350
2351        assert_eq!(executor.marking().count("p4"), 2); // both branches produce to p4
2352    }
2353
2354    #[test]
2355    fn self_loop_with_guard_terminates() {
2356        // Transition that loops back, but only fires when value > 0
2357        // Decrements each time, terminates when 0
2358        let p = Place::<i32>::new("p");
2359        let done = Place::<i32>::new("done");
2360
2361        let t = Transition::builder("dec")
2362            .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v > 0))
2363            .output(out_place(&p))
2364            .action(libpetri_core::action::sync_action(|ctx| {
2365                let v = ctx.input::<i32>("p")?;
2366                ctx.output("p", *v - 1)?;
2367                Ok(())
2368            }))
2369            .build();
2370
2371        // When value hits 0, this transition moves it to done
2372        let t_done = Transition::builder("finish")
2373            .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v == 0))
2374            .output(out_place(&done))
2375            .action(libpetri_core::action::fork())
2376            .build();
2377
2378        let net = PetriNet::builder("countdown")
2379            .transitions([t, t_done])
2380            .build();
2381
2382        let mut marking = Marking::new();
2383        marking.add(&p, Token::at(3, 0));
2384
2385        let mut executor =
2386            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2387        executor.run_sync();
2388
2389        assert_eq!(executor.marking().count("done"), 1);
2390        assert_eq!(*executor.marking().peek(&done).unwrap(), 0);
2391    }
2392
2393    #[test]
2394    fn multiple_tokens_different_types() {
2395        let p_int = Place::<i32>::new("ints");
2396        let p_str = Place::<String>::new("strs");
2397        let p_out = Place::<String>::new("out");
2398
2399        let t = Transition::builder("combine")
2400            .input(one(&p_int))
2401            .input(one(&p_str))
2402            .output(out_place(&p_out))
2403            .action(libpetri_core::action::sync_action(|ctx| {
2404                let i = ctx.input::<i32>("ints")?;
2405                let s = ctx.input::<String>("strs")?;
2406                ctx.output("out", format!("{s}-{i}"))?;
2407                Ok(())
2408            }))
2409            .build();
2410
2411        let net = PetriNet::builder("test").transition(t).build();
2412
2413        let mut marking = Marking::new();
2414        marking.add(&p_int, Token::at(42, 0));
2415        marking.add(&p_str, Token::at("hello".to_string(), 0));
2416
2417        let mut executor =
2418            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2419        executor.run_sync();
2420
2421        assert_eq!(*executor.marking().peek(&p_out).unwrap(), "hello-42");
2422    }
2423}
2424
2425#[cfg(all(test, feature = "tokio"))]
2426mod async_tests {
2427    use super::*;
2428    use crate::environment::{ExecutorSignal, ExternalEvent};
2429    use libpetri_core::action::{ActionError, async_action, fork};
2430    use libpetri_core::input::one;
2431    use libpetri_core::output::out_place;
2432    use libpetri_core::place::Place;
2433    use libpetri_core::token::{ErasedToken, Token};
2434    use libpetri_core::transition::Transition;
2435    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
2436
2437    #[tokio::test]
2438    async fn async_fork_single_transition() {
2439        let p1 = Place::<i32>::new("p1");
2440        let p2 = Place::<i32>::new("p2");
2441
2442        let t1 = Transition::builder("t1")
2443            .input(one(&p1))
2444            .output(out_place(&p2))
2445            .action(fork())
2446            .build();
2447
2448        let net = PetriNet::builder("test").transition(t1).build();
2449        let mut marking = Marking::new();
2450        marking.add(&p1, Token::at(42, 0));
2451
2452        let mut executor =
2453            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2454
2455        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2456        executor.run_async(rx).await;
2457
2458        assert_eq!(executor.marking().count("p2"), 1);
2459        assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
2460    }
2461
2462    #[tokio::test]
2463    async fn async_action_produces_output() {
2464        let p1 = Place::<i32>::new("p1");
2465        let p2 = Place::<i32>::new("p2");
2466
2467        let action = async_action(|mut ctx| async move {
2468            let val: i32 = *ctx.input::<i32>("p1")?;
2469            ctx.output("p2", val * 10)?;
2470            Ok(ctx)
2471        });
2472
2473        let t1 = Transition::builder("t1")
2474            .input(one(&p1))
2475            .output(out_place(&p2))
2476            .action(action)
2477            .build();
2478
2479        let net = PetriNet::builder("test").transition(t1).build();
2480        let mut marking = Marking::new();
2481        marking.add(&p1, Token::at(5, 0));
2482
2483        let mut executor =
2484            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2485
2486        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2487        executor.run_async(rx).await;
2488
2489        assert_eq!(executor.marking().count("p2"), 1);
2490        assert_eq!(*executor.marking().peek(&p2).unwrap(), 50);
2491    }
2492
2493    #[tokio::test]
2494    async fn async_chain_two_transitions() {
2495        let p1 = Place::<i32>::new("p1");
2496        let p2 = Place::<i32>::new("p2");
2497        let p3 = Place::<i32>::new("p3");
2498
2499        let t1 = Transition::builder("t1")
2500            .input(one(&p1))
2501            .output(out_place(&p2))
2502            .action(fork())
2503            .build();
2504
2505        let t2 = Transition::builder("t2")
2506            .input(one(&p2))
2507            .output(out_place(&p3))
2508            .action(fork())
2509            .build();
2510
2511        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
2512        let mut marking = Marking::new();
2513        marking.add(&p1, Token::at(99, 0));
2514
2515        let mut executor =
2516            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2517
2518        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2519        executor.run_async(rx).await;
2520
2521        assert_eq!(executor.marking().count("p3"), 1);
2522        assert_eq!(*executor.marking().peek(&p3).unwrap(), 99);
2523    }
2524
2525    #[tokio::test]
2526    async fn async_event_injection() {
2527        let p1 = Place::<i32>::new("p1");
2528        let p2 = Place::<i32>::new("p2");
2529
2530        let t1 = Transition::builder("t1")
2531            .input(one(&p1))
2532            .output(out_place(&p2))
2533            .action(fork())
2534            .build();
2535
2536        let net = PetriNet::builder("test").transition(t1).build();
2537        let marking = Marking::new(); // empty — no initial tokens
2538
2539        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2540            &net,
2541            marking,
2542            ExecutorOptions {
2543                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2544            },
2545        );
2546
2547        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2548
2549        // Inject a token after a short delay, then close the channel
2550        tokio::spawn(async move {
2551            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2552            let token = Token::at(77, 0);
2553            tx.send(ExecutorSignal::Event(ExternalEvent {
2554                place_name: Arc::from("p1"),
2555                token: ErasedToken::from_typed(&token),
2556            }))
2557            .unwrap();
2558            // Drop tx to close the channel and let executor terminate
2559        });
2560
2561        executor.run_async(rx).await;
2562
2563        assert_eq!(executor.marking().count("p2"), 1);
2564        assert_eq!(*executor.marking().peek(&p2).unwrap(), 77);
2565    }
2566
2567    #[tokio::test]
2568    async fn async_with_event_store_records_events() {
2569        let p1 = Place::<i32>::new("p1");
2570        let p2 = Place::<i32>::new("p2");
2571
2572        let t1 = Transition::builder("t1")
2573            .input(one(&p1))
2574            .output(out_place(&p2))
2575            .action(fork())
2576            .build();
2577
2578        let net = PetriNet::builder("test").transition(t1).build();
2579        let mut marking = Marking::new();
2580        marking.add(&p1, Token::at(1, 0));
2581
2582        let mut executor =
2583            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2584
2585        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2586        executor.run_async(rx).await;
2587
2588        let events = executor.event_store().events();
2589        assert!(!events.is_empty());
2590        // Should have ExecutionStarted, TransitionStarted, TransitionCompleted, ExecutionCompleted
2591        assert!(
2592            events
2593                .iter()
2594                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2595        );
2596        assert!(
2597            events
2598                .iter()
2599                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2600        );
2601        assert!(
2602            events
2603                .iter()
2604                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2605        );
2606        assert!(
2607            events
2608                .iter()
2609                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2610        );
2611    }
2612
2613    #[tokio::test]
2614    async fn async_action_error_does_not_crash() {
2615        let p1 = Place::<i32>::new("p1");
2616        let p2 = Place::<i32>::new("p2");
2617
2618        let action =
2619            async_action(|_ctx| async move { Err(ActionError::new("intentional failure")) });
2620
2621        let t1 = Transition::builder("t1")
2622            .input(one(&p1))
2623            .output(out_place(&p2))
2624            .action(action)
2625            .build();
2626
2627        let net = PetriNet::builder("test").transition(t1).build();
2628        let mut marking = Marking::new();
2629        marking.add(&p1, Token::at(1, 0));
2630
2631        let mut executor =
2632            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2633
2634        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2635        executor.run_async(rx).await;
2636
2637        // Token consumed but no output produced
2638        assert_eq!(executor.marking().count("p1"), 0);
2639        assert_eq!(executor.marking().count("p2"), 0);
2640
2641        // Failure event recorded
2642        let events = executor.event_store().events();
2643        assert!(
2644            events
2645                .iter()
2646                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2647        );
2648    }
2649
2650    #[tokio::test]
2651    async fn async_delayed_timing() {
2652        use libpetri_core::timing::delayed;
2653
2654        let p1 = Place::<i32>::new("p1");
2655        let p2 = Place::<i32>::new("p2");
2656
2657        let t1 = Transition::builder("t1")
2658            .input(one(&p1))
2659            .output(out_place(&p2))
2660            .timing(delayed(100))
2661            .action(fork())
2662            .build();
2663
2664        let net = PetriNet::builder("test").transition(t1).build();
2665        let mut marking = Marking::new();
2666        marking.add(&p1, Token::at(10, 0));
2667
2668        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2669            &net,
2670            marking,
2671            ExecutorOptions {
2672                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2673            },
2674        );
2675
2676        let start = std::time::Instant::now();
2677        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2678        // Keep channel open long enough for the delayed transition to fire
2679        tokio::spawn(async move {
2680            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2681            drop(tx);
2682        });
2683        executor.run_async(rx).await;
2684
2685        assert!(
2686            start.elapsed().as_millis() >= 80,
2687            "delayed(100) should wait ~100ms"
2688        );
2689        assert_eq!(executor.marking().count("p2"), 1);
2690        assert_eq!(*executor.marking().peek(&p2).unwrap(), 10);
2691    }
2692
2693    #[tokio::test]
2694    async fn async_exact_timing() {
2695        use libpetri_core::timing::exact;
2696
2697        let p1 = Place::<i32>::new("p1");
2698        let p2 = Place::<i32>::new("p2");
2699
2700        let t1 = Transition::builder("t1")
2701            .input(one(&p1))
2702            .output(out_place(&p2))
2703            .timing(exact(100))
2704            .action(fork())
2705            .build();
2706
2707        let net = PetriNet::builder("test").transition(t1).build();
2708        let mut marking = Marking::new();
2709        marking.add(&p1, Token::at(20, 0));
2710
2711        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2712            &net,
2713            marking,
2714            ExecutorOptions {
2715                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2716            },
2717        );
2718
2719        let start = std::time::Instant::now();
2720        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2721        tokio::spawn(async move {
2722            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2723            drop(tx);
2724        });
2725        executor.run_async(rx).await;
2726
2727        assert!(
2728            start.elapsed().as_millis() >= 80,
2729            "exact(100) should wait ~100ms"
2730        );
2731        assert_eq!(executor.marking().count("p2"), 1);
2732        assert_eq!(*executor.marking().peek(&p2).unwrap(), 20);
2733    }
2734
2735    #[tokio::test]
2736    async fn async_window_timing() {
2737        use libpetri_core::timing::window;
2738
2739        let p1 = Place::<i32>::new("p1");
2740        let p2 = Place::<i32>::new("p2");
2741
2742        let t1 = Transition::builder("t1")
2743            .input(one(&p1))
2744            .output(out_place(&p2))
2745            .timing(window(50, 200))
2746            .action(fork())
2747            .build();
2748
2749        let net = PetriNet::builder("test").transition(t1).build();
2750        let mut marking = Marking::new();
2751        marking.add(&p1, Token::at(30, 0));
2752
2753        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2754            &net,
2755            marking,
2756            ExecutorOptions {
2757                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2758            },
2759        );
2760
2761        let start = std::time::Instant::now();
2762        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2763        tokio::spawn(async move {
2764            tokio::time::sleep(std::time::Duration::from_millis(400)).await;
2765            drop(tx);
2766        });
2767        executor.run_async(rx).await;
2768
2769        assert!(
2770            start.elapsed().as_millis() >= 40,
2771            "window(50,200) should wait >= ~50ms"
2772        );
2773        assert_eq!(executor.marking().count("p2"), 1);
2774        assert_eq!(*executor.marking().peek(&p2).unwrap(), 30);
2775    }
2776
2777    #[tokio::test]
2778    async fn async_deadline_enforcement() {
2779        use libpetri_core::action::sync_action;
2780        use libpetri_core::timing::window;
2781
2782        let p_slow = Place::<i32>::new("p_slow");
2783        let p_windowed = Place::<i32>::new("p_windowed");
2784        let slow_out = Place::<i32>::new("slow_out");
2785        let windowed_out = Place::<i32>::new("windowed_out");
2786
2787        // Sync action that busy-waits for 200ms, blocking the executor thread.
2788        // This prevents the executor from reaching the fire phase for windowed
2789        // until after its deadline has passed.
2790        let t_slow = Transition::builder("slow")
2791            .input(one(&p_slow))
2792            .output(out_place(&slow_out))
2793            .priority(10)
2794            .action(sync_action(|ctx| {
2795                let v = ctx.input::<i32>("p_slow")?;
2796                let start = std::time::Instant::now();
2797                while start.elapsed().as_millis() < 200 {
2798                    std::hint::spin_loop();
2799                }
2800                ctx.output("slow_out", *v)?;
2801                Ok(())
2802            }))
2803            .build();
2804
2805        // Windowed transition: enabled at start, earliest=50ms, deadline=100ms.
2806        // Because the slow sync action blocks the executor for 200ms, by the time
2807        // enforce_deadlines runs again, elapsed (~200ms) > latest (100ms) + tolerance.
2808        let t_windowed = Transition::builder("windowed")
2809            .input(one(&p_windowed))
2810            .output(out_place(&windowed_out))
2811            .timing(window(50, 100))
2812            .action(fork())
2813            .build();
2814
2815        let net = PetriNet::builder("test")
2816            .transitions([t_slow, t_windowed])
2817            .build();
2818
2819        let mut marking = Marking::new();
2820        marking.add(&p_slow, Token::at(1, 0));
2821        marking.add(&p_windowed, Token::at(2, 0));
2822
2823        let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
2824            &net,
2825            marking,
2826            ExecutorOptions {
2827                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2828            },
2829        );
2830
2831        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2832        tokio::spawn(async move {
2833            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2834            drop(tx);
2835        });
2836        executor.run_async(rx).await;
2837
2838        // Slow fires and completes after 200ms busy-wait
2839        assert_eq!(executor.marking().count("slow_out"), 1);
2840        // Windowed should have been disabled by deadline enforcement
2841        assert_eq!(
2842            executor.marking().count("windowed_out"),
2843            0,
2844            "windowed transition should have been disabled by deadline"
2845        );
2846
2847        let events = executor.event_store().events();
2848        assert!(
2849            events.iter().any(|e| matches!(e, NetEvent::TransitionTimedOut { transition_name, .. } if &**transition_name == "windowed")),
2850            "expected TransitionTimedOut event for 'windowed'"
2851        );
2852    }
2853
2854    #[tokio::test]
2855    async fn async_multiple_injections() {
2856        let p1 = Place::<i32>::new("p1");
2857        let p2 = Place::<i32>::new("p2");
2858
2859        let t1 = Transition::builder("t1")
2860            .input(one(&p1))
2861            .output(out_place(&p2))
2862            .action(fork())
2863            .build();
2864
2865        let net = PetriNet::builder("test").transition(t1).build();
2866        let marking = Marking::new();
2867
2868        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2869            &net,
2870            marking,
2871            ExecutorOptions {
2872                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2873            },
2874        );
2875
2876        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2877
2878        tokio::spawn(async move {
2879            for i in 0..5 {
2880                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2881                let token = Token::at(i, 0);
2882                tx.send(ExecutorSignal::Event(ExternalEvent {
2883                    place_name: Arc::from("p1"),
2884                    token: ErasedToken::from_typed(&token),
2885                }))
2886                .unwrap();
2887            }
2888            // Drop tx to close channel
2889        });
2890
2891        executor.run_async(rx).await;
2892
2893        assert_eq!(
2894            executor.marking().count("p2"),
2895            5,
2896            "all 5 injected tokens should arrive"
2897        );
2898    }
2899
2900    #[tokio::test]
2901    async fn async_parallel_execution() {
2902        let p1 = Place::<i32>::new("p1");
2903        let p2 = Place::<i32>::new("p2");
2904        let p3 = Place::<i32>::new("p3");
2905        let out1 = Place::<i32>::new("out1");
2906        let out2 = Place::<i32>::new("out2");
2907        let out3 = Place::<i32>::new("out3");
2908
2909        let make_transition = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
2910            Transition::builder(name)
2911                .input(one(inp))
2912                .output(out_place(outp))
2913                .action(async_action(|mut ctx| async move {
2914                    let v: i32 =
2915                        *ctx.input::<i32>(ctx.transition_name().replace("t", "p").as_str())?;
2916                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2917                    ctx.output(&ctx.transition_name().replace("t", "out"), v)?;
2918                    Ok(ctx)
2919                }))
2920                .build()
2921        };
2922
2923        let t1 = make_transition("t1", &p1, &out1);
2924        let t2 = make_transition("t2", &p2, &out2);
2925        let t3 = make_transition("t3", &p3, &out3);
2926
2927        let net = PetriNet::builder("parallel")
2928            .transitions([t1, t2, t3])
2929            .build();
2930        let mut marking = Marking::new();
2931        marking.add(&p1, Token::at(1, 0));
2932        marking.add(&p2, Token::at(2, 0));
2933        marking.add(&p3, Token::at(3, 0));
2934
2935        let mut executor =
2936            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2937
2938        let start = std::time::Instant::now();
2939        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2940        executor.run_async(rx).await;
2941        let elapsed = start.elapsed().as_millis();
2942
2943        assert_eq!(executor.marking().count("out1"), 1);
2944        assert_eq!(executor.marking().count("out2"), 1);
2945        assert_eq!(executor.marking().count("out3"), 1);
2946        assert!(
2947            elapsed < 250,
2948            "parallel execution should take < 250ms, took {elapsed}ms"
2949        );
2950    }
2951
2952    #[tokio::test]
2953    async fn async_sequential_chain_order() {
2954        use std::sync::Mutex;
2955
2956        let p1 = Place::<i32>::new("p1");
2957        let p2 = Place::<i32>::new("p2");
2958        let p3 = Place::<i32>::new("p3");
2959        let p4 = Place::<i32>::new("p4");
2960
2961        let order: Arc<Mutex<Vec<i32>>> = Arc::new(Mutex::new(Vec::new()));
2962
2963        let make_chain = |name: &str,
2964                          inp: &Place<i32>,
2965                          outp: &Place<i32>,
2966                          id: i32,
2967                          order: Arc<Mutex<Vec<i32>>>| {
2968            let inp_name: Arc<str> = Arc::from(inp.name());
2969            let outp_name: Arc<str> = Arc::from(outp.name());
2970            Transition::builder(name)
2971                .input(one(inp))
2972                .output(out_place(outp))
2973                .action(async_action(move |mut ctx| {
2974                    let order = Arc::clone(&order);
2975                    let inp_name = Arc::clone(&inp_name);
2976                    let outp_name = Arc::clone(&outp_name);
2977                    async move {
2978                        let v: i32 = *ctx.input::<i32>(&inp_name)?;
2979                        order.lock().unwrap().push(id);
2980                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2981                        ctx.output(&outp_name, v)?;
2982                        Ok(ctx)
2983                    }
2984                }))
2985                .build()
2986        };
2987
2988        let t1 = make_chain("t1", &p1, &p2, 1, Arc::clone(&order));
2989        let t2 = make_chain("t2", &p2, &p3, 2, Arc::clone(&order));
2990        let t3 = make_chain("t3", &p3, &p4, 3, Arc::clone(&order));
2991
2992        let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
2993        let mut marking = Marking::new();
2994        marking.add(&p1, Token::at(1, 0));
2995
2996        let mut executor =
2997            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2998
2999        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3000        executor.run_async(rx).await;
3001
3002        assert_eq!(executor.marking().count("p4"), 1);
3003        let recorded = order.lock().unwrap().clone();
3004        assert_eq!(recorded, vec![1, 2, 3], "chain should execute in order");
3005    }
3006
3007    #[tokio::test]
3008    async fn async_fork_join() {
3009        use libpetri_core::output::and;
3010
3011        let p1 = Place::<i32>::new("p1");
3012        let p2 = Place::<i32>::new("p2");
3013        let p3 = Place::<i32>::new("p3");
3014        let p4 = Place::<i32>::new("p4");
3015
3016        // Fork: p1 -> (p2, p3) via AND output
3017        let t_fork = Transition::builder("fork")
3018            .input(one(&p1))
3019            .output(and(vec![out_place(&p2), out_place(&p3)]))
3020            .action(libpetri_core::action::sync_action(|ctx| {
3021                let v = ctx.input::<i32>("p1")?;
3022                ctx.output("p2", *v)?;
3023                ctx.output("p3", *v)?;
3024                Ok(())
3025            }))
3026            .build();
3027
3028        // Join: (p2, p3) -> p4
3029        let t_join = Transition::builder("join")
3030            .input(one(&p2))
3031            .input(one(&p3))
3032            .output(out_place(&p4))
3033            .action(libpetri_core::action::sync_action(|ctx| {
3034                let a = ctx.input::<i32>("p2")?;
3035                let b = ctx.input::<i32>("p3")?;
3036                ctx.output("p4", *a + *b)?;
3037                Ok(())
3038            }))
3039            .build();
3040
3041        let net = PetriNet::builder("fork_join")
3042            .transitions([t_fork, t_join])
3043            .build();
3044
3045        let mut marking = Marking::new();
3046        marking.add(&p1, Token::at(5, 0));
3047
3048        let mut executor =
3049            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3050
3051        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3052        executor.run_async(rx).await;
3053
3054        assert_eq!(executor.marking().count("p2"), 0);
3055        assert_eq!(executor.marking().count("p3"), 0);
3056        assert_eq!(executor.marking().count("p4"), 1);
3057        assert_eq!(*executor.marking().peek(&p4).unwrap(), 10);
3058    }
3059
3060    #[tokio::test]
3061    async fn async_xor_output_branching() {
3062        use libpetri_core::output::xor;
3063
3064        let p = Place::<i32>::new("p");
3065        let left = Place::<i32>::new("left");
3066        let right = Place::<i32>::new("right");
3067
3068        let t = Transition::builder("xor_t")
3069            .input(one(&p))
3070            .output(xor(vec![out_place(&left), out_place(&right)]))
3071            .action(libpetri_core::action::sync_action(|ctx| {
3072                let v = ctx.input::<i32>("p")?;
3073                if *v > 0 {
3074                    ctx.output("left", *v)?;
3075                } else {
3076                    ctx.output("right", *v)?;
3077                }
3078                Ok(())
3079            }))
3080            .build();
3081
3082        let net = PetriNet::builder("xor_test").transition(t).build();
3083
3084        // Test positive value goes left
3085        let mut marking = Marking::new();
3086        marking.add(&p, Token::at(42, 0));
3087
3088        let mut executor =
3089            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3090        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3091        executor.run_async(rx).await;
3092
3093        assert_eq!(executor.marking().count("left"), 1);
3094        assert_eq!(executor.marking().count("right"), 0);
3095        assert_eq!(*executor.marking().peek(&left).unwrap(), 42);
3096    }
3097
3098    #[tokio::test]
3099    async fn async_loop_with_guard() {
3100        use libpetri_core::input::one_guarded;
3101
3102        let counter = Place::<i32>::new("counter");
3103        let done = Place::<i32>::new("done");
3104
3105        // Loop transition: counter -> counter (increment), guarded to fire when < 3
3106        let t_loop = Transition::builder("loop")
3107            .input(one_guarded(&counter, |v: &i32| *v < 3))
3108            .output(out_place(&counter))
3109            .action(libpetri_core::action::sync_action(|ctx| {
3110                let v = ctx.input::<i32>("counter")?;
3111                ctx.output("counter", *v + 1)?;
3112                Ok(())
3113            }))
3114            .build();
3115
3116        // Exit transition: counter -> done, guarded to fire when >= 3
3117        let t_exit = Transition::builder("exit")
3118            .input(one_guarded(&counter, |v: &i32| *v >= 3))
3119            .output(out_place(&done))
3120            .action(fork())
3121            .build();
3122
3123        let net = PetriNet::builder("loop_net")
3124            .transitions([t_loop, t_exit])
3125            .build();
3126
3127        let mut marking = Marking::new();
3128        marking.add(&counter, Token::at(0, 0));
3129
3130        let mut executor =
3131            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3132
3133        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3134        executor.run_async(rx).await;
3135
3136        assert_eq!(executor.marking().count("done"), 1);
3137        assert_eq!(*executor.marking().peek(&done).unwrap(), 3);
3138    }
3139
3140    #[tokio::test]
3141    async fn async_delayed_fires_without_injection() {
3142        use libpetri_core::timing::delayed;
3143
3144        let p1 = Place::<i32>::new("p1");
3145        let p2 = Place::<i32>::new("p2");
3146
3147        let t1 = Transition::builder("t1")
3148            .input(one(&p1))
3149            .output(out_place(&p2))
3150            .timing(delayed(100))
3151            .action(fork())
3152            .build();
3153
3154        let net = PetriNet::builder("test").transition(t1).build();
3155        let mut marking = Marking::new();
3156        marking.add(&p1, Token::at(7, 0));
3157
3158        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3159            &net,
3160            marking,
3161            ExecutorOptions {
3162                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3163            },
3164        );
3165
3166        // No injections; keep channel open long enough for the delayed transition to fire
3167        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3168        tokio::spawn(async move {
3169            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3170            drop(tx);
3171        });
3172        executor.run_async(rx).await;
3173
3174        assert_eq!(executor.marking().count("p2"), 1);
3175        assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3176    }
3177
3178    #[tokio::test]
3179    #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3180    async fn async_timeout_produces_timeout_token() {
3181        use libpetri_core::output::{timeout_place, xor};
3182
3183        let p1 = Place::<i32>::new("p1");
3184        let success = Place::<i32>::new("success");
3185        let timeout_out = Place::<i32>::new("timeout_out");
3186
3187        let t1 = Transition::builder("t1")
3188            .input(one(&p1))
3189            .output(xor(vec![
3190                out_place(&success),
3191                timeout_place(50, &timeout_out),
3192            ]))
3193            .action(async_action(|mut ctx| async move {
3194                let v: i32 = *ctx.input::<i32>("p1")?;
3195                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3196                ctx.output("success", v)?;
3197                Ok(ctx)
3198            }))
3199            .build();
3200
3201        let net = PetriNet::builder("test").transition(t1).build();
3202        let mut marking = Marking::new();
3203        marking.add(&p1, Token::at(1, 0));
3204
3205        let mut executor =
3206            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3207
3208        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3209        executor.run_async(rx).await;
3210
3211        assert_eq!(executor.marking().count("timeout_out"), 1);
3212        assert_eq!(executor.marking().count("success"), 0);
3213    }
3214
3215    #[tokio::test]
3216    #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3217    async fn async_timeout_normal_when_fast() {
3218        use libpetri_core::output::{timeout_place, xor};
3219
3220        let p1 = Place::<i32>::new("p1");
3221        let success = Place::<i32>::new("success");
3222        let timeout_out = Place::<i32>::new("timeout_out");
3223
3224        let t1 = Transition::builder("t1")
3225            .input(one(&p1))
3226            .output(xor(vec![
3227                out_place(&success),
3228                timeout_place(500, &timeout_out),
3229            ]))
3230            .action(async_action(|mut ctx| async move {
3231                let v: i32 = *ctx.input::<i32>("p1")?;
3232                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3233                ctx.output("success", v)?;
3234                Ok(ctx)
3235            }))
3236            .build();
3237
3238        let net = PetriNet::builder("test").transition(t1).build();
3239        let mut marking = Marking::new();
3240        marking.add(&p1, Token::at(1, 0));
3241
3242        let mut executor =
3243            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3244
3245        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3246        executor.run_async(rx).await;
3247
3248        assert_eq!(executor.marking().count("success"), 1);
3249        assert_eq!(executor.marking().count("timeout_out"), 0);
3250    }
3251
3252    #[tokio::test]
3253    async fn async_event_store_records_token_added_for_injection() {
3254        let p1 = Place::<i32>::new("p1");
3255        let p2 = Place::<i32>::new("p2");
3256
3257        let t1 = Transition::builder("t1")
3258            .input(one(&p1))
3259            .output(out_place(&p2))
3260            .action(fork())
3261            .build();
3262
3263        let net = PetriNet::builder("test").transition(t1).build();
3264        let marking = Marking::new();
3265
3266        let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
3267            &net,
3268            marking,
3269            ExecutorOptions {
3270                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3271            },
3272        );
3273
3274        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3275
3276        tokio::spawn(async move {
3277            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3278            let token = Token::at(99, 0);
3279            tx.send(ExecutorSignal::Event(ExternalEvent {
3280                place_name: Arc::from("p1"),
3281                token: ErasedToken::from_typed(&token),
3282            }))
3283            .unwrap();
3284        });
3285
3286        executor.run_async(rx).await;
3287
3288        let events = executor.event_store().events();
3289        // Should have TokenAdded for the injected token into p1
3290        assert!(
3291            events.iter().any(
3292                |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p1")
3293            ),
3294            "expected TokenAdded event for injected token into p1"
3295        );
3296        // And also TokenAdded for the output into p2
3297        assert!(
3298            events.iter().any(
3299                |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p2")
3300            ),
3301            "expected TokenAdded event for output token into p2"
3302        );
3303    }
3304
3305    #[tokio::test]
3306    async fn async_inhibitor_blocks_in_async() {
3307        let p1 = Place::<()>::new("p1");
3308        let p2 = Place::<()>::new("p2");
3309        let p_inh = Place::<()>::new("inh");
3310
3311        let t = Transition::builder("t1")
3312            .input(one(&p1))
3313            .output(out_place(&p2))
3314            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
3315            .action(fork())
3316            .build();
3317
3318        let net = PetriNet::builder("inhibitor").transition(t).build();
3319
3320        let mut marking = Marking::new();
3321        marking.add(&p1, Token::at((), 0));
3322        marking.add(&p_inh, Token::at((), 0));
3323
3324        let mut executor =
3325            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3326
3327        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3328        executor.run_async(rx).await;
3329
3330        // Inhibitor should block — token remains in p1, nothing in p2
3331        assert_eq!(executor.marking().count("p1"), 1);
3332        assert_eq!(executor.marking().count("p2"), 0);
3333    }
3334
3335    // ==================== Drain/Close lifecycle tests ====================
3336
3337    #[tokio::test]
3338    async fn async_drain_terminates_at_quiescence() {
3339        let p1 = Place::<i32>::new("p1");
3340        let p2 = Place::<i32>::new("p2");
3341
3342        let t1 = Transition::builder("t1")
3343            .input(one(&p1))
3344            .output(out_place(&p2))
3345            .action(fork())
3346            .build();
3347
3348        let net = PetriNet::builder("test").transition(t1).build();
3349        let marking = Marking::new();
3350
3351        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3352            &net,
3353            marking,
3354            ExecutorOptions {
3355                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3356            },
3357        );
3358
3359        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3360
3361        tokio::spawn(async move {
3362            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3363            // Inject a token, then drain
3364            tx.send(ExecutorSignal::Event(ExternalEvent {
3365                place_name: Arc::from("p1"),
3366                token: ErasedToken::from_typed(&Token::at(42, 0)),
3367            }))
3368            .unwrap();
3369            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3370            tx.send(ExecutorSignal::Drain).unwrap();
3371        });
3372
3373        executor.run_async(rx).await;
3374
3375        // The injected token should have been processed
3376        assert_eq!(executor.marking().count("p2"), 1);
3377        assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
3378    }
3379
3380    #[tokio::test]
3381    async fn async_drain_rejects_post_drain_events() {
3382        let p1 = Place::<i32>::new("p1");
3383        let p2 = Place::<i32>::new("p2");
3384
3385        let t1 = Transition::builder("t1")
3386            .input(one(&p1))
3387            .output(out_place(&p2))
3388            .action(fork())
3389            .build();
3390
3391        let net = PetriNet::builder("test").transition(t1).build();
3392        let marking = Marking::new();
3393
3394        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3395            &net,
3396            marking,
3397            ExecutorOptions {
3398                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3399            },
3400        );
3401
3402        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3403
3404        tokio::spawn(async move {
3405            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3406            // Send Drain first, then an event — event should be discarded
3407            tx.send(ExecutorSignal::Drain).unwrap();
3408            tx.send(ExecutorSignal::Event(ExternalEvent {
3409                place_name: Arc::from("p1"),
3410                token: ErasedToken::from_typed(&Token::at(99, 0)),
3411            }))
3412            .unwrap();
3413        });
3414
3415        executor.run_async(rx).await;
3416
3417        // Event sent after drain should not have been processed
3418        assert_eq!(executor.marking().count("p2"), 0);
3419    }
3420
3421    #[tokio::test]
3422    async fn async_close_discards_queued_events() {
3423        let p1 = Place::<i32>::new("p1");
3424        let p2 = Place::<i32>::new("p2");
3425
3426        let t1 = Transition::builder("t1")
3427            .input(one(&p1))
3428            .output(out_place(&p2))
3429            .action(fork())
3430            .build();
3431
3432        let net = PetriNet::builder("test").transition(t1).build();
3433        let marking = Marking::new();
3434
3435        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3436            &net,
3437            marking,
3438            ExecutorOptions {
3439                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3440            },
3441        );
3442
3443        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3444
3445        // Queue events then close — close should discard all pending events
3446        tx.send(ExecutorSignal::Event(ExternalEvent {
3447            place_name: Arc::from("p1"),
3448            token: ErasedToken::from_typed(&Token::at(1, 0)),
3449        }))
3450        .unwrap();
3451        tx.send(ExecutorSignal::Close).unwrap();
3452        tx.send(ExecutorSignal::Event(ExternalEvent {
3453            place_name: Arc::from("p1"),
3454            token: ErasedToken::from_typed(&Token::at(2, 0)),
3455        }))
3456        .unwrap();
3457        drop(tx);
3458
3459        executor.run_async(rx).await;
3460
3461        // The first event arrives before Close — it gets processed in the first
3462        // try_recv batch. Close then discards all remaining events.
3463        // So at most 1 token should be in p2 (the one before Close).
3464        assert!(executor.marking().count("p2") <= 1);
3465    }
3466
3467    #[tokio::test]
3468    async fn async_close_after_drain_escalates() {
3469        let p1 = Place::<i32>::new("p1");
3470        let p2 = Place::<i32>::new("p2");
3471
3472        let t1 = Transition::builder("t1")
3473            .input(one(&p1))
3474            .output(out_place(&p2))
3475            .action(fork())
3476            .build();
3477
3478        let net = PetriNet::builder("test").transition(t1).build();
3479        let marking = Marking::new();
3480
3481        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3482            &net,
3483            marking,
3484            ExecutorOptions {
3485                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3486            },
3487        );
3488
3489        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3490
3491        tokio::spawn(async move {
3492            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3493            // Drain first, then escalate to close
3494            tx.send(ExecutorSignal::Drain).unwrap();
3495            tx.send(ExecutorSignal::Close).unwrap();
3496        });
3497
3498        // Executor should terminate — close escalates from drain
3499        executor.run_async(rx).await;
3500        // No assertions needed — the test passes if run_async returns
3501    }
3502
3503    #[tokio::test]
3504    async fn async_handle_raii_drain_on_drop() {
3505        use crate::executor_handle::ExecutorHandle;
3506
3507        let p1 = Place::<i32>::new("p1");
3508        let p2 = Place::<i32>::new("p2");
3509
3510        let t1 = Transition::builder("t1")
3511            .input(one(&p1))
3512            .output(out_place(&p2))
3513            .action(fork())
3514            .build();
3515
3516        let net = PetriNet::builder("test").transition(t1).build();
3517        let marking = Marking::new();
3518
3519        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3520            &net,
3521            marking,
3522            ExecutorOptions {
3523                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3524            },
3525        );
3526
3527        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3528
3529        tokio::spawn(async move {
3530            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3531            let mut handle = ExecutorHandle::new(tx);
3532            handle.inject(Arc::from("p1"), ErasedToken::from_typed(&Token::at(7, 0)));
3533            // handle dropped here — RAII sends Drain automatically
3534        });
3535
3536        executor.run_async(rx).await;
3537
3538        assert_eq!(executor.marking().count("p2"), 1);
3539        assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3540    }
3541}