Skip to main content

libpetri_runtime/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::output::Out;
8use libpetri_core::petri_net::PetriNet;
9use libpetri_core::token::ErasedToken;
10
11use libpetri_event::event_store::EventStore;
12use libpetri_event::net_event::NetEvent;
13
14use crate::bitmap;
15use crate::compiled_net::CompiledNet;
16use crate::marking::Marking;
17
/// Tolerance, in milliseconds, for deadline enforcement to account for timer jitter.
///
/// An enabled transition is only timed out once the time since it became
/// enabled exceeds its latest firing time by more than this amount.
const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
/// Bitmap-based executor for Coloured Time Petri Nets.
///
/// Generic over `E: EventStore` for zero-cost noop event recording.
/// The sync path (`run_sync`) executes inline without any async runtime.
pub struct BitmapNetExecutor<E: EventStore> {
    /// Compiled form of the net, produced once in `new`.
    compiled: CompiledNet,
    /// Current token marking; starts as the initial tokens passed to `new`.
    marking: Marking,
    /// Sink for execution events; only written to when `E::ENABLED` is true.
    event_store: E,
    // NOTE(review): not read anywhere in the visible part of this file (only
    // its emptiness is cached in `has_environment_places`) — confirm whether
    // the lint suppression is still needed.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// Cached `!environment_places.is_empty()`, computed in `new`.
    has_environment_places: bool,

    // Bitmaps
    /// One bit per place id; set while the place holds tokens.
    marked_places: Vec<u64>,
    /// One bit per transition id; set when the transition must be re-evaluated.
    dirty_set: Vec<u64>,
    /// Scratch copy of `marked_places` taken while re-evaluating dirty transitions.
    marking_snap_buffer: Vec<u64>,
    /// Scratch copy of `dirty_set` taken (and then cleared) at each re-evaluation.
    dirty_snap_buffer: Vec<u64>,
    /// Scratch copy of `marked_places` used while firing in the general sync path.
    firing_snap_buffer: Vec<u64>,

    // Per-transition state
    /// Executor-clock time (ms since `start_time`) at which each transition
    /// became enabled; `f64::NEG_INFINITY` while it is not enabled.
    enabled_at_ms: Vec<f64>,
    /// Whether each transition is currently considered enabled.
    enabled_flags: Vec<bool>,
    /// Whether each transition's timing reports a deadline.
    has_deadline_flags: Vec<bool>,
    /// Number of `true` entries in `enabled_flags`, maintained incrementally.
    enabled_transition_count: usize,

    // Precomputed flags
    /// True when every transition's timing equals `Timing::Immediate`.
    all_immediate: bool,
    /// True when every transition has the same priority as the first one.
    all_same_priority: bool,
    /// True when at least one transition has a deadline.
    has_any_deadlines: bool,

    // Pending reset places for clock-restart detection
    /// Places emptied by reset arcs since the last dirty-transition update.
    pending_reset_places: HashSet<Arc<str>>,
    /// Names of the input places of each transition, indexed by transition id.
    transition_input_place_names: Vec<HashSet<Arc<str>>>,

    /// Origin of the executor clock; set when the executor is constructed.
    start_time: Instant,
}
57
/// Options for constructing a BitmapNetExecutor.
///
/// The default value has no environment places.
#[derive(Debug, Clone, Default)]
pub struct ExecutorOptions {
    /// Names of the places treated as environment places. When this set is
    /// non-empty the executor does not stop merely because no transition is
    /// enabled.
    pub environment_places: HashSet<Arc<str>>,
}
63
64impl<E: EventStore> BitmapNetExecutor<E> {
65    /// Creates a new executor for the given net with initial tokens.
66    pub fn new(net: &PetriNet, initial_tokens: Marking, options: ExecutorOptions) -> Self {
67        let compiled = CompiledNet::compile(net);
68        let word_count = compiled.word_count;
69        let tc = compiled.transition_count;
70        let dirty_word_count = bitmap::word_count(tc);
71
72        let mut has_any_deadlines = false;
73        let mut all_immediate = true;
74        let mut all_same_priority = true;
75        let first_priority = if tc > 0 {
76            compiled.transition(0).priority()
77        } else {
78            0
79        };
80        let mut has_deadline_flags = vec![false; tc];
81
82        for (tid, flag) in has_deadline_flags.iter_mut().enumerate() {
83            let t = compiled.transition(tid);
84            if t.timing().has_deadline() {
85                *flag = true;
86                has_any_deadlines = true;
87            }
88            if *t.timing() != libpetri_core::timing::Timing::Immediate {
89                all_immediate = false;
90            }
91            if t.priority() != first_priority {
92                all_same_priority = false;
93            }
94        }
95
96        // Precompute input place names per transition
97        let mut transition_input_place_names = Vec::with_capacity(tc);
98        for tid in 0..tc {
99            let t = compiled.transition(tid);
100            let names: HashSet<Arc<str>> = t
101                .input_specs()
102                .iter()
103                .map(|s| Arc::clone(s.place().name_arc()))
104                .collect();
105            transition_input_place_names.push(names);
106        }
107
108        Self {
109            compiled,
110            marking: initial_tokens,
111            event_store: E::default(),
112            has_environment_places: !options.environment_places.is_empty(),
113            environment_places: options.environment_places,
114            marked_places: vec![0u64; word_count],
115            dirty_set: vec![0u64; dirty_word_count],
116            marking_snap_buffer: vec![0u64; word_count],
117            dirty_snap_buffer: vec![0u64; dirty_word_count],
118            firing_snap_buffer: vec![0u64; word_count],
119            enabled_at_ms: vec![f64::NEG_INFINITY; tc],
120            enabled_flags: vec![false; tc],
121            has_deadline_flags,
122            enabled_transition_count: 0,
123            all_immediate,
124            all_same_priority,
125            has_any_deadlines,
126            pending_reset_places: HashSet::new(),
127            transition_input_place_names,
128            start_time: Instant::now(),
129        }
130    }
131
132    /// Runs the executor synchronously until completion.
133    ///
134    /// All transition actions must be sync (is_sync() returns true).
135    /// Returns the final marking.
136    pub fn run_sync(&mut self) -> &Marking {
137        self.initialize_marked_bitmap();
138        self.mark_all_dirty();
139
140        if E::ENABLED {
141            let now = now_millis();
142            self.event_store.append(NetEvent::ExecutionStarted {
143                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
144                timestamp: now,
145            });
146        }
147
148        loop {
149            self.update_dirty_transitions();
150
151            let cycle_now = self.elapsed_ms();
152
153            if self.has_any_deadlines {
154                self.enforce_deadlines(cycle_now);
155            }
156
157            if self.should_terminate() {
158                break;
159            }
160
161            if self.all_immediate && self.all_same_priority {
162                self.fire_ready_immediate_sync();
163            } else {
164                self.fire_ready_general_sync(cycle_now);
165            }
166
167            // If nothing is dirty anymore and nothing is enabled, we're done
168            if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
169                break;
170            }
171        }
172
173        if E::ENABLED {
174            let now = now_millis();
175            self.event_store.append(NetEvent::ExecutionCompleted {
176                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
177                timestamp: now,
178            });
179        }
180
181        &self.marking
182    }
183
    /// Returns a reference to the current marking.
    ///
    /// This is the live marking the executor mutates while running, not a copy.
    pub fn marking(&self) -> &Marking {
        &self.marking
    }
188
    /// Returns a reference to the event store.
    ///
    /// Events are only appended to it when `E::ENABLED` is true.
    pub fn event_store(&self) -> &E {
        &self.event_store
    }
193
    /// Returns true if no transition is currently enabled.
    ///
    /// NOTE(review): only the enabled-transition count is consulted. In-flight
    /// async actions are counted in a local variable of `run_async` and are
    /// not visible here — confirm callers do not rely on that.
    pub fn is_quiescent(&self) -> bool {
        self.enabled_transition_count == 0
    }
198
199    // ======================== Initialize ========================
200
201    fn initialize_marked_bitmap(&mut self) {
202        for pid in 0..self.compiled.place_count {
203            let place = self.compiled.place(pid);
204            if self.marking.has_tokens(place.name()) {
205                bitmap::set_bit(&mut self.marked_places, pid);
206            }
207        }
208    }
209
210    fn mark_all_dirty(&mut self) {
211        let tc = self.compiled.transition_count;
212        let dirty_words = self.dirty_set.len();
213        for w in 0..dirty_words.saturating_sub(1) {
214            self.dirty_set[w] = u64::MAX;
215        }
216        if dirty_words > 0 {
217            let last_word_bits = tc & bitmap::WORD_MASK;
218            self.dirty_set[dirty_words - 1] = if last_word_bits == 0 {
219                u64::MAX
220            } else {
221                (1u64 << last_word_bits) - 1
222            };
223        }
224    }
225
226    fn should_terminate(&self) -> bool {
227        if self.has_environment_places {
228            return false;
229        }
230        self.enabled_transition_count == 0
231    }
232
233    // ======================== Dirty Set Transitions ========================
234
235    fn update_dirty_transitions(&mut self) {
236        let now_ms = self.elapsed_ms();
237
238        // Snapshot marking bitmap
239        self.marking_snap_buffer
240            .copy_from_slice(&self.marked_places);
241
242        // Snapshot and clear dirty set
243        let dirty_words = self.dirty_set.len();
244        for w in 0..dirty_words {
245            self.dirty_snap_buffer[w] = self.dirty_set[w];
246            self.dirty_set[w] = 0;
247        }
248
249        // Collect dirty transition IDs first to avoid borrow conflict
250        let tc = self.compiled.transition_count;
251        let mut dirty_tids = Vec::new();
252        bitmap::for_each_set_bit(&self.dirty_snap_buffer, |tid| {
253            if tid < tc {
254                dirty_tids.push(tid);
255            }
256        });
257
258        let marking_snap = self.marking_snap_buffer.clone();
259        for tid in dirty_tids {
260            let was_enabled = self.enabled_flags[tid];
261            let can_now = self.can_enable(tid, &marking_snap);
262
263            if can_now && !was_enabled {
264                self.enabled_flags[tid] = true;
265                self.enabled_transition_count += 1;
266                self.enabled_at_ms[tid] = now_ms;
267
268                if E::ENABLED {
269                    self.event_store.append(NetEvent::TransitionEnabled {
270                        transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
271                        timestamp: now_millis(),
272                    });
273                }
274            } else if !can_now && was_enabled {
275                self.enabled_flags[tid] = false;
276                self.enabled_transition_count -= 1;
277                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
278            } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
279                self.enabled_at_ms[tid] = now_ms;
280                if E::ENABLED {
281                    self.event_store.append(NetEvent::TransitionClockRestarted {
282                        transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
283                        timestamp: now_millis(),
284                    });
285                }
286            }
287        }
288
289        self.pending_reset_places.clear();
290    }
291
292    fn enforce_deadlines(&mut self, now_ms: f64) {
293        for tid in 0..self.compiled.transition_count {
294            if !self.has_deadline_flags[tid] || !self.enabled_flags[tid] {
295                continue;
296            }
297            let t = self.compiled.transition(tid);
298            let elapsed = now_ms - self.enabled_at_ms[tid];
299            let latest_ms = t.timing().latest() as f64;
300            if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
301                self.enabled_flags[tid] = false;
302                self.enabled_transition_count -= 1;
303                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
304
305                if E::ENABLED {
306                    self.event_store.append(NetEvent::TransitionTimedOut {
307                        transition_name: Arc::clone(t.name_arc()),
308                        timestamp: now_millis(),
309                    });
310                }
311            }
312        }
313    }
314
315    fn can_enable(&self, tid: usize, marking_snap: &[u64]) -> bool {
316        if !self.compiled.can_enable_bitmap(tid, marking_snap) {
317            return false;
318        }
319
320        // Cardinality check
321        if let Some(card_check) = self.compiled.cardinality_check(tid) {
322            for i in 0..card_check.place_ids.len() {
323                let pid = card_check.place_ids[i];
324                let required = card_check.required_counts[i];
325                let place = self.compiled.place(pid);
326                if self.marking.count(place.name()) < required {
327                    return false;
328                }
329            }
330        }
331
332        // Guard check
333        if self.compiled.has_guards(tid) {
334            let t = self.compiled.transition(tid);
335            for spec in t.input_specs() {
336                if let Some(guard) = spec.guard() {
337                    let required = match spec {
338                        In::One { .. } => 1,
339                        In::Exactly { count, .. } => *count,
340                        In::AtLeast { minimum, .. } => *minimum,
341                        In::All { .. } => 1,
342                    };
343                    if self.marking.count_matching(spec.place_name(), &**guard) < required {
344                        return false;
345                    }
346                }
347            }
348        }
349
350        true
351    }
352
353    fn has_input_from_reset_place(&self, tid: usize) -> bool {
354        if self.pending_reset_places.is_empty() {
355            return false;
356        }
357        let input_names = &self.transition_input_place_names[tid];
358        for name in &self.pending_reset_places {
359            if input_names.contains(name) {
360                return true;
361            }
362        }
363        false
364    }
365
366    // ======================== Firing (Sync) ========================
367
368    fn fire_ready_immediate_sync(&mut self) {
369        for tid in 0..self.compiled.transition_count {
370            if !self.enabled_flags[tid] {
371                continue;
372            }
373            if self.can_enable(tid, &self.marked_places.clone()) {
374                self.fire_transition_sync(tid);
375            } else {
376                self.enabled_flags[tid] = false;
377                self.enabled_transition_count -= 1;
378                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
379            }
380        }
381    }
382
383    fn fire_ready_general_sync(&mut self, now_ms: f64) {
384        // Collect ready transitions
385        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
386        for tid in 0..self.compiled.transition_count {
387            if !self.enabled_flags[tid] {
388                continue;
389            }
390            let t = self.compiled.transition(tid);
391            let enabled_ms = self.enabled_at_ms[tid];
392            let elapsed = now_ms - enabled_ms;
393            let earliest_ms = t.timing().earliest() as f64;
394            if earliest_ms <= elapsed {
395                ready.push((tid, t.priority(), enabled_ms));
396            }
397        }
398        if ready.is_empty() {
399            return;
400        }
401
402        // Sort: higher priority first, then earlier enablement (FIFO)
403        ready.sort_by(|a, b| {
404            b.1.cmp(&a.1)
405                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
406        });
407
408        // Take fresh snapshot
409        self.firing_snap_buffer.copy_from_slice(&self.marked_places);
410
411        for (tid, _, _) in ready {
412            if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
413                self.fire_transition_sync(tid);
414                self.firing_snap_buffer.copy_from_slice(&self.marked_places);
415            } else {
416                self.enabled_flags[tid] = false;
417                self.enabled_transition_count -= 1;
418                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
419            }
420        }
421    }
422
423    fn fire_transition_sync(&mut self, tid: usize) {
424        // Clone all needed data from the transition to release the borrow on self.compiled
425        let t = self.compiled.transition(tid);
426        let transition_name = Arc::clone(t.name_arc());
427        let input_specs: Vec<In> = t.input_specs().to_vec();
428        let read_arcs: Vec<_> = t.reads().to_vec();
429        let reset_arcs: Vec<_> = t.resets().to_vec();
430        let output_place_names: HashSet<Arc<str>> = t
431            .output_places()
432            .iter()
433            .map(|p| Arc::clone(p.name_arc()))
434            .collect();
435        let action = Arc::clone(t.action());
436        // t (reference) is no longer used after this point
437
438        // Consume tokens based on input specs
439        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
440        for in_spec in &input_specs {
441            let place_name = in_spec.place_name();
442            let to_consume = match in_spec {
443                In::One { .. } => 1,
444                In::Exactly { count, .. } => *count,
445                In::All { guard, .. } | In::AtLeast { guard, .. } => {
446                    if guard.is_some() {
447                        self.marking
448                            .count_matching(place_name, &**guard.as_ref().unwrap())
449                    } else {
450                        self.marking.count(place_name)
451                    }
452                }
453            };
454
455            let place_name_arc = Arc::clone(in_spec.place().name_arc());
456            for _ in 0..to_consume {
457                let token = if let Some(guard) = in_spec.guard() {
458                    self.marking.remove_matching(place_name, &**guard)
459                } else {
460                    self.marking.remove_first(place_name)
461                };
462                if let Some(token) = token {
463                    if E::ENABLED {
464                        self.event_store.append(NetEvent::TokenRemoved {
465                            place_name: Arc::clone(&place_name_arc),
466                            timestamp: now_millis(),
467                        });
468                    }
469                    inputs
470                        .entry(Arc::clone(&place_name_arc))
471                        .or_default()
472                        .push(token);
473                }
474            }
475        }
476
477        // Read arcs (peek, don't consume)
478        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
479        for arc in &read_arcs {
480            if let Some(queue) = self.marking.queue(arc.place.name())
481                && let Some(token) = queue.front()
482            {
483                read_tokens
484                    .entry(Arc::clone(arc.place.name_arc()))
485                    .or_default()
486                    .push(token.clone());
487            }
488        }
489
490        // Reset arcs
491        for arc in &reset_arcs {
492            let removed = self.marking.remove_all(arc.place.name());
493            self.pending_reset_places
494                .insert(Arc::clone(arc.place.name_arc()));
495            if E::ENABLED {
496                for _ in &removed {
497                    self.event_store.append(NetEvent::TokenRemoved {
498                        place_name: Arc::clone(arc.place.name_arc()),
499                        timestamp: now_millis(),
500                    });
501                }
502            }
503        }
504
505        // Update bitmap after consumption
506        self.update_bitmap_after_consumption(tid);
507
508        if E::ENABLED {
509            self.event_store.append(NetEvent::TransitionStarted {
510                transition_name: Arc::clone(&transition_name),
511                timestamp: now_millis(),
512            });
513        }
514
515        // Create context and run action
516        let mut ctx = TransitionContext::new(
517            Arc::clone(&transition_name),
518            inputs,
519            read_tokens,
520            output_place_names,
521            None,
522        );
523
524        let result = action.run_sync(&mut ctx);
525
526        match result {
527            Ok(()) => {
528                // Process outputs
529                let outputs = ctx.take_outputs();
530                self.process_outputs(tid, &transition_name, outputs);
531
532                if E::ENABLED {
533                    self.event_store.append(NetEvent::TransitionCompleted {
534                        transition_name: Arc::clone(&transition_name),
535                        timestamp: now_millis(),
536                    });
537                }
538            }
539            Err(err) => {
540                if E::ENABLED {
541                    self.event_store.append(NetEvent::TransitionFailed {
542                        transition_name: Arc::clone(&transition_name),
543                        error: err.message,
544                        timestamp: now_millis(),
545                    });
546                }
547            }
548        }
549
550        // Mark transition as no longer enabled (it just fired)
551        self.enabled_flags[tid] = false;
552        self.enabled_transition_count -= 1;
553        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
554
555        // Mark this transition dirty for re-evaluation
556        self.mark_transition_dirty(tid);
557    }
558
559    fn process_outputs(
560        &mut self,
561        _tid: usize,
562        _transition_name: &Arc<str>,
563        outputs: Vec<OutputEntry>,
564    ) {
565        for entry in outputs {
566            self.marking.add_erased(&entry.place_name, entry.token);
567
568            if let Some(pid) = self.compiled.place_id(&entry.place_name) {
569                bitmap::set_bit(&mut self.marked_places, pid);
570                self.mark_dirty(pid);
571            }
572
573            if E::ENABLED {
574                self.event_store.append(NetEvent::TokenAdded {
575                    place_name: Arc::clone(&entry.place_name),
576                    timestamp: now_millis(),
577                });
578            }
579        }
580    }
581
582    fn update_bitmap_after_consumption(&mut self, tid: usize) {
583        let consumption_pids: Vec<usize> = self.compiled.consumption_place_ids(tid).to_vec();
584        for pid in consumption_pids {
585            let place = self.compiled.place(pid);
586            if !self.marking.has_tokens(place.name()) {
587                bitmap::clear_bit(&mut self.marked_places, pid);
588            }
589            self.mark_dirty(pid);
590        }
591    }
592
593    // ======================== Dirty Set Helpers ========================
594
    /// Returns true when at least one transition is flagged for re-evaluation.
    fn has_dirty_bits(&self) -> bool {
        !bitmap::is_empty(&self.dirty_set)
    }
598
599    fn mark_dirty(&mut self, pid: usize) {
600        let tids: Vec<usize> = self.compiled.affected_transitions(pid).to_vec();
601        for tid in tids {
602            self.mark_transition_dirty(tid);
603        }
604    }
605
    /// Flags transition `tid` for re-evaluation by setting its dirty bit.
    fn mark_transition_dirty(&mut self, tid: usize) {
        bitmap::set_bit(&mut self.dirty_set, tid);
    }
609
    /// Returns the time elapsed since `start_time`, in (fractional) milliseconds.
    fn elapsed_ms(&self) -> f64 {
        self.start_time.elapsed().as_secs_f64() * 1000.0
    }
613}
614
615#[cfg(feature = "tokio")]
616use crate::environment::ExecutorSignal;
617
/// Completion message sent by async actions back to the executor.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished; used in the
    /// completed/failed events.
    transition_name: Arc<str>,
    /// `Ok` carries the produced output entries, which the executor deposits
    /// into the marking; `Err` carries the error text recorded in the
    /// `TransitionFailed` event.
    result: Result<Vec<OutputEntry>, String>,
}
624
625#[cfg(feature = "tokio")]
626impl<E: EventStore> BitmapNetExecutor<E> {
627    /// Runs the executor asynchronously with tokio.
628    ///
629    /// Supports both sync and async transition actions. Sync actions execute
630    /// inline; async actions are spawned as tokio tasks and their completions
631    /// are collected via an mpsc channel.
632    ///
633    /// External events are injected via [`ExecutorSignal::Event`]. Lifecycle
634    /// signals [`ExecutorSignal::Drain`] and [`ExecutorSignal::Close`] control
635    /// graceful and immediate shutdown respectively. Use
636    /// [`ExecutorHandle`](crate::executor_handle::ExecutorHandle) for RAII-managed
637    /// lifecycle with automatic drain on drop.
638    ///
639    /// Returns the final marking when the executor quiesces or is closed.
640    pub async fn run_async(
641        &mut self,
642        mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
643    ) -> &Marking {
644        let (completion_tx, mut completion_rx) =
645            tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
646
647        self.initialize_marked_bitmap();
648        self.mark_all_dirty();
649
650        let mut in_flight_count: usize = 0;
651        let mut signal_channel_open = true;
652        let mut draining = false;
653        let mut closed = false;
654
655        if E::ENABLED {
656            let now = now_millis();
657            self.event_store.append(NetEvent::ExecutionStarted {
658                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
659                timestamp: now,
660            });
661        }
662
663        loop {
664            // Phase 1: Process completed async actions
665            while let Ok(completion) = completion_rx.try_recv() {
666                in_flight_count -= 1;
667                match completion.result {
668                    Ok(outputs) => {
669                        self.process_outputs(0, &completion.transition_name, outputs);
670                        if E::ENABLED {
671                            self.event_store.append(NetEvent::TransitionCompleted {
672                                transition_name: Arc::clone(&completion.transition_name),
673                                timestamp: now_millis(),
674                            });
675                        }
676                    }
677                    Err(err) => {
678                        if E::ENABLED {
679                            self.event_store.append(NetEvent::TransitionFailed {
680                                transition_name: Arc::clone(&completion.transition_name),
681                                error: err,
682                                timestamp: now_millis(),
683                            });
684                        }
685                    }
686                }
687            }
688
689            // Phase 2: Process signals (external events + lifecycle)
690            while let Ok(signal) = signal_rx.try_recv() {
691                match signal {
692                    ExecutorSignal::Event(event) if !draining => {
693                        self.marking.add_erased(&event.place_name, event.token);
694                        if let Some(pid) = self.compiled.place_id(&event.place_name) {
695                            bitmap::set_bit(&mut self.marked_places, pid);
696                            self.mark_dirty(pid);
697                        }
698                        if E::ENABLED {
699                            self.event_store.append(NetEvent::TokenAdded {
700                                place_name: Arc::clone(&event.place_name),
701                                timestamp: now_millis(),
702                            });
703                        }
704                    }
705                    ExecutorSignal::Event(_) => {
706                        // Draining: discard events arriving after drain signal
707                    }
708                    ExecutorSignal::Drain => {
709                        draining = true;
710                    }
711                    ExecutorSignal::Close => {
712                        closed = true;
713                        draining = true;
714                        // Discard remaining queued signals per ENV-013.
715                        // Note: events already processed earlier in this try_recv batch
716                        // are kept (single-channel design); Java/TS discard all queued
717                        // events via an atomic flag checked at processExternalEvents() entry.
718                        while signal_rx.try_recv().is_ok() {}
719                    }
720                }
721            }
722
723            // Phase 3: Update dirty transitions
724            self.update_dirty_transitions();
725
726            // Phase 4: Enforce deadlines
727            let cycle_now = self.elapsed_ms();
728            if self.has_any_deadlines {
729                self.enforce_deadlines(cycle_now);
730            }
731
732            // Termination check — O(1) flag checks
733            if closed && in_flight_count == 0 {
734                break; // ENV-013: immediate close, in-flight completed
735            }
736            if draining
737                && self.enabled_transition_count == 0
738                && in_flight_count == 0
739            {
740                break; // ENV-011: graceful drain, quiescent
741            }
742            if self.enabled_transition_count == 0
743                && in_flight_count == 0
744                && (!self.has_environment_places || !signal_channel_open)
745            {
746                break; // Standard termination
747            }
748
749            // Phase 5: Fire ready transitions
750            let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
751
752            // If we fired something or have dirty bits, loop immediately
753            if fired || self.has_dirty_bits() {
754                // Yield to let spawned tasks run
755                tokio::task::yield_now().await;
756                continue;
757            }
758
759            // Phase 6: Await work (completion, external event, or timer)
760            if in_flight_count == 0 && !self.has_environment_places {
761                break;
762            }
763            if in_flight_count == 0 && (draining || !signal_channel_open) {
764                break;
765            }
766
767            let timer_ms = self.millis_until_next_timed_transition();
768
769            tokio::select! {
770                Some(completion) = completion_rx.recv() => {
771                    in_flight_count -= 1;
772                    match completion.result {
773                        Ok(outputs) => {
774                            self.process_outputs(0, &completion.transition_name, outputs);
775                            if E::ENABLED {
776                                self.event_store.append(NetEvent::TransitionCompleted {
777                                    transition_name: Arc::clone(&completion.transition_name),
778                                    timestamp: now_millis(),
779                                });
780                            }
781                        }
782                        Err(err) => {
783                            if E::ENABLED {
784                                self.event_store.append(NetEvent::TransitionFailed {
785                                    transition_name: Arc::clone(&completion.transition_name),
786                                    error: err,
787                                    timestamp: now_millis(),
788                                });
789                            }
790                        }
791                    }
792                }
793                result = signal_rx.recv(), if signal_channel_open && !closed => {
794                    match result {
795                        Some(ExecutorSignal::Event(event)) if !draining => {
796                            self.marking.add_erased(&event.place_name, event.token);
797                            if let Some(pid) = self.compiled.place_id(&event.place_name) {
798                                bitmap::set_bit(&mut self.marked_places, pid);
799                                self.mark_dirty(pid);
800                            }
801                            if E::ENABLED {
802                                self.event_store.append(NetEvent::TokenAdded {
803                                    place_name: Arc::clone(&event.place_name),
804                                    timestamp: now_millis(),
805                                });
806                            }
807                        }
808                        Some(ExecutorSignal::Event(_)) => {
809                            // Draining: discard events
810                        }
811                        Some(ExecutorSignal::Drain) => {
812                            draining = true;
813                        }
814                        Some(ExecutorSignal::Close) => {
815                            closed = true;
816                            draining = true;
817                            while signal_rx.try_recv().is_ok() {}
818                        }
819                        None => {
820                            signal_channel_open = false;
821                        }
822                    }
823                }
824                _ = tokio::time::sleep(std::time::Duration::from_millis(
825                    if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
826                )) => {
827                    // Timer fired — re-evaluate transitions
828                }
829            }
830        }
831
832        if E::ENABLED {
833            let now = now_millis();
834            self.event_store.append(NetEvent::ExecutionCompleted {
835                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
836                timestamp: now,
837            });
838        }
839
840        &self.marking
841    }
842
843    /// Fires ready transitions, dispatching async actions via tokio::spawn.
844    /// Returns true if any transitions were fired.
845    fn fire_ready_async(
846        &mut self,
847        now_ms: f64,
848        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
849        in_flight_count: &mut usize,
850    ) -> bool {
851        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
852        for tid in 0..self.compiled.transition_count {
853            if !self.enabled_flags[tid] {
854                continue;
855            }
856            let t = self.compiled.transition(tid);
857            let enabled_ms = self.enabled_at_ms[tid];
858            let elapsed = now_ms - enabled_ms;
859            let earliest_ms = t.timing().earliest() as f64;
860            if earliest_ms <= elapsed {
861                ready.push((tid, t.priority(), enabled_ms));
862            }
863        }
864        if ready.is_empty() {
865            return false;
866        }
867
868        ready.sort_by(|a, b| {
869            b.1.cmp(&a.1)
870                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
871        });
872
873        self.firing_snap_buffer.copy_from_slice(&self.marked_places);
874
875        let mut fired_any = false;
876        for (tid, _, _) in ready {
877            if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
878                self.fire_transition_async(tid, completion_tx, in_flight_count);
879                self.firing_snap_buffer.copy_from_slice(&self.marked_places);
880                fired_any = true;
881            } else {
882                self.enabled_flags[tid] = false;
883                self.enabled_transition_count -= 1;
884                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
885            }
886        }
887        fired_any
888    }
889
890    /// Fires a single transition, either sync inline or async via tokio::spawn.
891    fn fire_transition_async(
892        &mut self,
893        tid: usize,
894        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
895        in_flight_count: &mut usize,
896    ) {
897        let t = self.compiled.transition(tid);
898        let transition_name = Arc::clone(t.name_arc());
899        let input_specs: Vec<In> = t.input_specs().to_vec();
900        let read_arcs: Vec<_> = t.reads().to_vec();
901        let reset_arcs: Vec<_> = t.resets().to_vec();
902        let output_place_names: HashSet<Arc<str>> = t
903            .output_places()
904            .iter()
905            .map(|p| Arc::clone(p.name_arc()))
906            .collect();
907        let action = Arc::clone(t.action());
908        let is_sync = action.is_sync();
909
910        // Consume tokens
911        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
912        for in_spec in &input_specs {
913            let place_name = in_spec.place_name();
914            let to_consume = match in_spec {
915                In::One { .. } => 1,
916                In::Exactly { count, .. } => *count,
917                In::All { guard, .. } | In::AtLeast { guard, .. } => {
918                    if guard.is_some() {
919                        self.marking
920                            .count_matching(place_name, &**guard.as_ref().unwrap())
921                    } else {
922                        self.marking.count(place_name)
923                    }
924                }
925            };
926
927            let place_name_arc = Arc::clone(in_spec.place().name_arc());
928            for _ in 0..to_consume {
929                let token = if let Some(guard) = in_spec.guard() {
930                    self.marking.remove_matching(place_name, &**guard)
931                } else {
932                    self.marking.remove_first(place_name)
933                };
934                if let Some(token) = token {
935                    if E::ENABLED {
936                        self.event_store.append(NetEvent::TokenRemoved {
937                            place_name: Arc::clone(&place_name_arc),
938                            timestamp: now_millis(),
939                        });
940                    }
941                    inputs
942                        .entry(Arc::clone(&place_name_arc))
943                        .or_default()
944                        .push(token);
945                }
946            }
947        }
948
949        // Read arcs
950        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
951        for arc in &read_arcs {
952            if let Some(queue) = self.marking.queue(arc.place.name())
953                && let Some(token) = queue.front()
954            {
955                read_tokens
956                    .entry(Arc::clone(arc.place.name_arc()))
957                    .or_default()
958                    .push(token.clone());
959            }
960        }
961
962        // Reset arcs
963        for arc in &reset_arcs {
964            let removed = self.marking.remove_all(arc.place.name());
965            self.pending_reset_places
966                .insert(Arc::clone(arc.place.name_arc()));
967            if E::ENABLED {
968                for _ in &removed {
969                    self.event_store.append(NetEvent::TokenRemoved {
970                        place_name: Arc::clone(arc.place.name_arc()),
971                        timestamp: now_millis(),
972                    });
973                }
974            }
975        }
976
977        self.update_bitmap_after_consumption(tid);
978
979        if E::ENABLED {
980            self.event_store.append(NetEvent::TransitionStarted {
981                transition_name: Arc::clone(&transition_name),
982                timestamp: now_millis(),
983            });
984        }
985
986        // Mark transition as no longer enabled
987        self.enabled_flags[tid] = false;
988        self.enabled_transition_count -= 1;
989        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
990        self.mark_transition_dirty(tid);
991
992        if is_sync {
993            // Inline sync execution
994            let mut ctx = TransitionContext::new(
995                Arc::clone(&transition_name),
996                inputs,
997                read_tokens,
998                output_place_names,
999                None,
1000            );
1001            let result = action.run_sync(&mut ctx);
1002            match result {
1003                Ok(()) => {
1004                    let outputs = ctx.take_outputs();
1005                    self.process_outputs(tid, &transition_name, outputs);
1006                    if E::ENABLED {
1007                        self.event_store.append(NetEvent::TransitionCompleted {
1008                            transition_name: Arc::clone(&transition_name),
1009                            timestamp: now_millis(),
1010                        });
1011                    }
1012                }
1013                Err(err) => {
1014                    if E::ENABLED {
1015                        self.event_store.append(NetEvent::TransitionFailed {
1016                            transition_name: Arc::clone(&transition_name),
1017                            error: err.message,
1018                            timestamp: now_millis(),
1019                        });
1020                    }
1021                }
1022            }
1023        } else {
1024            // Async: spawn tokio task
1025            *in_flight_count += 1;
1026            let tx = completion_tx.clone();
1027            let name = Arc::clone(&transition_name);
1028            let ctx = TransitionContext::new(
1029                Arc::clone(&transition_name),
1030                inputs,
1031                read_tokens,
1032                output_place_names,
1033                None,
1034            );
1035            tokio::spawn(async move {
1036                let result = action.run_async(ctx).await;
1037                let completion = match result {
1038                    Ok(mut completed_ctx) => ActionCompletion {
1039                        transition_name: Arc::clone(&name),
1040                        result: Ok(completed_ctx.take_outputs()),
1041                    },
1042                    Err(err) => ActionCompletion {
1043                        transition_name: Arc::clone(&name),
1044                        result: Err(err.message),
1045                    },
1046                };
1047                let _ = tx.send(completion);
1048            });
1049        }
1050    }
1051
1052    /// Computes milliseconds until the next timed transition needs attention.
1053    fn millis_until_next_timed_transition(&self) -> f64 {
1054        let mut min_wait = f64::INFINITY;
1055        let now_ms = self.elapsed_ms();
1056
1057        for tid in 0..self.compiled.transition_count {
1058            if !self.enabled_flags[tid] {
1059                continue;
1060            }
1061            let t = self.compiled.transition(tid);
1062            let elapsed = now_ms - self.enabled_at_ms[tid];
1063
1064            let earliest_ms = t.timing().earliest() as f64;
1065            let remaining_earliest = earliest_ms - elapsed;
1066            if remaining_earliest <= 0.0 {
1067                return 0.0;
1068            }
1069            min_wait = min_wait.min(remaining_earliest);
1070
1071            if self.has_deadline_flags[tid] {
1072                let latest_ms = t.timing().latest() as f64;
1073                let remaining_deadline = latest_ms - elapsed;
1074                if remaining_deadline <= 0.0 {
1075                    return 0.0;
1076                }
1077                min_wait = min_wait.min(remaining_deadline);
1078            }
1079        }
1080
1081        min_wait
1082    }
1083}
1084
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1091
1092/// Validates that the produced outputs satisfy the output spec.
1093#[allow(dead_code)]
1094fn validate_out_spec(out: &Out, produced_places: &HashSet<Arc<str>>) -> bool {
1095    match out {
1096        Out::Place(p) => produced_places.contains(p.name()),
1097        Out::And(children) => children
1098            .iter()
1099            .all(|c| validate_out_spec(c, produced_places)),
1100        Out::Xor(children) => children
1101            .iter()
1102            .any(|c| validate_out_spec(c, produced_places)),
1103        Out::Timeout { child, .. } => validate_out_spec(child, produced_places),
1104        Out::ForwardInput { to, .. } => produced_places.contains(to.name()),
1105    }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110    use super::*;
1111    use libpetri_core::action::passthrough;
1112    use libpetri_core::input::one;
1113    use libpetri_core::output::out_place;
1114    use libpetri_core::place::Place;
1115    use libpetri_core::token::Token;
1116    use libpetri_core::transition::Transition;
1117    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1118
1119    fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1120        let p1 = Place::<i32>::new("p1");
1121        let p2 = Place::<i32>::new("p2");
1122        let p3 = Place::<i32>::new("p3");
1123
1124        let t1 = Transition::builder("t1")
1125            .input(one(&p1))
1126            .output(out_place(&p2))
1127            .action(passthrough())
1128            .build();
1129        let t2 = Transition::builder("t2")
1130            .input(one(&p2))
1131            .output(out_place(&p3))
1132            .action(passthrough())
1133            .build();
1134
1135        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1136        (net, p1, p2, p3)
1137    }
1138
1139    #[test]
1140    fn sync_passthrough_chain() {
1141        let (net, p1, _p2, _p3) = simple_chain();
1142
1143        let mut marking = Marking::new();
1144        marking.add(&p1, Token::at(42, 0));
1145
1146        let mut executor =
1147            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1148        let result = executor.run_sync();
1149
1150        // Token should flow p1 -> p2 -> p3
1151        // passthrough produces no outputs, so token stays consumed
1152        // Actually passthrough produces nothing, so p2 and p3 will be empty
1153        assert_eq!(result.count("p1"), 0);
1154    }
1155
1156    #[test]
1157    fn sync_with_event_store() {
1158        let (net, p1, _, _) = simple_chain();
1159
1160        let mut marking = Marking::new();
1161        marking.add(&p1, Token::at(42, 0));
1162
1163        let mut executor =
1164            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1165        executor.run_sync();
1166
1167        let store = executor.event_store();
1168        assert!(!store.is_empty());
1169    }
1170
1171    #[test]
1172    fn sync_fork_chain() {
1173        let p1 = Place::<i32>::new("p1");
1174        let p2 = Place::<i32>::new("p2");
1175        let p3 = Place::<i32>::new("p3");
1176
1177        let t1 = Transition::builder("t1")
1178            .input(one(&p1))
1179            .output(libpetri_core::output::and(vec![
1180                out_place(&p2),
1181                out_place(&p3),
1182            ]))
1183            .action(libpetri_core::action::fork())
1184            .build();
1185
1186        let net = PetriNet::builder("fork").transition(t1).build();
1187
1188        let mut marking = Marking::new();
1189        marking.add(&p1, Token::at(42, 0));
1190
1191        let mut executor =
1192            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1193        let result = executor.run_sync();
1194
1195        // Fork copies input to all outputs
1196        assert_eq!(result.count("p1"), 0);
1197        assert_eq!(result.count("p2"), 1);
1198        assert_eq!(result.count("p3"), 1);
1199    }
1200
1201    #[test]
1202    fn sync_no_initial_tokens() {
1203        let (net, _, _, _) = simple_chain();
1204        let marking = Marking::new();
1205        let mut executor =
1206            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1207        let result = executor.run_sync();
1208        assert_eq!(result.count("p1"), 0);
1209        assert_eq!(result.count("p2"), 0);
1210        assert_eq!(result.count("p3"), 0);
1211    }
1212
1213    #[test]
1214    fn sync_priority_ordering() {
1215        let p = Place::<()>::new("p");
1216        let out_a = Place::<()>::new("a");
1217        let out_b = Place::<()>::new("b");
1218
1219        // t_high has priority 10, t_low has priority 1
1220        // Both consume from p, but t_high should fire first
1221        let t_high = Transition::builder("t_high")
1222            .input(one(&p))
1223            .output(out_place(&out_a))
1224            .action(libpetri_core::action::passthrough())
1225            .priority(10)
1226            .build();
1227        let t_low = Transition::builder("t_low")
1228            .input(one(&p))
1229            .output(out_place(&out_b))
1230            .action(libpetri_core::action::passthrough())
1231            .priority(1)
1232            .build();
1233
1234        let net = PetriNet::builder("priority")
1235            .transitions([t_high, t_low])
1236            .build();
1237
1238        let mut marking = Marking::new();
1239        marking.add(&p, Token::at((), 0));
1240
1241        let mut executor =
1242            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1243        executor.run_sync();
1244
1245        // Only one token, high priority should have consumed it
1246        // Since passthrough doesn't produce output, both outputs empty
1247        // but p should be empty (consumed by the higher priority transition)
1248        assert_eq!(executor.marking().count("p"), 0);
1249    }
1250
1251    #[test]
1252    fn sync_inhibitor_blocks() {
1253        let p1 = Place::<()>::new("p1");
1254        let p2 = Place::<()>::new("p2");
1255        let p_inh = Place::<()>::new("inh");
1256
1257        let t = Transition::builder("t1")
1258            .input(one(&p1))
1259            .output(out_place(&p2))
1260            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1261            .action(libpetri_core::action::passthrough())
1262            .build();
1263
1264        let net = PetriNet::builder("inhibitor").transition(t).build();
1265
1266        let mut marking = Marking::new();
1267        marking.add(&p1, Token::at((), 0));
1268        marking.add(&p_inh, Token::at((), 0));
1269
1270        let mut executor =
1271            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1272        executor.run_sync();
1273
1274        // Inhibitor should block — token remains in p1
1275        assert_eq!(executor.marking().count("p1"), 1);
1276    }
1277
1278    #[test]
1279    fn sync_linear_chain_5() {
1280        // Build a chain: p0 -> t0 -> p1 -> t1 -> ... -> p5
1281        let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1282        let transitions: Vec<Transition> = (0..5)
1283            .map(|i| {
1284                Transition::builder(format!("t{i}"))
1285                    .input(one(&places[i]))
1286                    .output(out_place(&places[i + 1]))
1287                    .action(libpetri_core::action::fork())
1288                    .build()
1289            })
1290            .collect();
1291
1292        let net = PetriNet::builder("chain5").transitions(transitions).build();
1293
1294        let mut marking = Marking::new();
1295        marking.add(&places[0], Token::at(1, 0));
1296
1297        let mut executor =
1298            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1299        let result = executor.run_sync();
1300
1301        // Token should flow through all 5 transitions to p5
1302        assert_eq!(result.count("p0"), 0);
1303        assert_eq!(result.count("p5"), 1);
1304    }
1305
1306    #[test]
1307    fn input_arc_requires_token_to_enable() {
1308        let p1 = Place::<i32>::new("p1");
1309        let p2 = Place::<i32>::new("p2");
1310        let t = Transition::builder("t1")
1311            .input(one(&p1))
1312            .output(out_place(&p2))
1313            .action(libpetri_core::action::fork())
1314            .build();
1315        let net = PetriNet::builder("test").transition(t).build();
1316
1317        // No tokens — transition should not fire
1318        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1319            &net,
1320            Marking::new(),
1321            ExecutorOptions::default(),
1322        );
1323        executor.run_sync();
1324        assert_eq!(executor.marking().count("p1"), 0);
1325        assert_eq!(executor.marking().count("p2"), 0);
1326    }
1327
1328    #[test]
1329    fn multiple_input_arcs_require_all_tokens() {
1330        let p1 = Place::<i32>::new("p1");
1331        let p2 = Place::<i32>::new("p2");
1332        let p3 = Place::<i32>::new("p3");
1333
1334        let t = Transition::builder("t1")
1335            .input(one(&p1))
1336            .input(one(&p2))
1337            .output(out_place(&p3))
1338            .action(libpetri_core::action::sync_action(|ctx| {
1339                ctx.output("p3", 99i32)?;
1340                Ok(())
1341            }))
1342            .build();
1343        let net = PetriNet::builder("test").transition(t).build();
1344
1345        // Only p1 has a token — should not fire
1346        let mut marking = Marking::new();
1347        marking.add(&p1, Token::at(1, 0));
1348        let mut executor =
1349            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1350        executor.run_sync();
1351        assert_eq!(executor.marking().count("p1"), 1);
1352        assert_eq!(executor.marking().count("p3"), 0);
1353
1354        // Both p1 and p2 have tokens — should fire
1355        let mut marking = Marking::new();
1356        marking.add(&p1, Token::at(1, 0));
1357        marking.add(&p2, Token::at(2, 0));
1358        let mut executor =
1359            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1360        executor.run_sync();
1361        assert_eq!(executor.marking().count("p1"), 0);
1362        assert_eq!(executor.marking().count("p2"), 0);
1363        assert_eq!(executor.marking().count("p3"), 1);
1364    }
1365
1366    #[test]
1367    fn read_arc_does_not_consume() {
1368        let p_in = Place::<i32>::new("in");
1369        let p_ctx = Place::<i32>::new("ctx");
1370        let p_out = Place::<i32>::new("out");
1371
1372        let t = Transition::builder("t1")
1373            .input(one(&p_in))
1374            .read(libpetri_core::arc::read(&p_ctx))
1375            .output(out_place(&p_out))
1376            .action(libpetri_core::action::sync_action(|ctx| {
1377                let v = ctx.input::<i32>("in")?;
1378                let r = ctx.read::<i32>("ctx")?;
1379                ctx.output("out", *v + *r)?;
1380                Ok(())
1381            }))
1382            .build();
1383        let net = PetriNet::builder("test").transition(t).build();
1384
1385        let mut marking = Marking::new();
1386        marking.add(&p_in, Token::at(10, 0));
1387        marking.add(&p_ctx, Token::at(5, 0));
1388
1389        let mut executor =
1390            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1391        executor.run_sync();
1392
1393        assert_eq!(executor.marking().count("in"), 0); // consumed
1394        assert_eq!(executor.marking().count("ctx"), 1); // NOT consumed (read arc)
1395        assert_eq!(executor.marking().count("out"), 1);
1396    }
1397
1398    #[test]
1399    fn reset_arc_removes_all_tokens() {
1400        let p_in = Place::<()>::new("in");
1401        let p_reset = Place::<i32>::new("reset");
1402        let p_out = Place::<()>::new("out");
1403
1404        let t = Transition::builder("t1")
1405            .input(one(&p_in))
1406            .reset(libpetri_core::arc::reset(&p_reset))
1407            .output(out_place(&p_out))
1408            .action(libpetri_core::action::fork())
1409            .build();
1410        let net = PetriNet::builder("test").transition(t).build();
1411
1412        let mut marking = Marking::new();
1413        marking.add(&p_in, Token::at((), 0));
1414        marking.add(&p_reset, Token::at(1, 0));
1415        marking.add(&p_reset, Token::at(2, 0));
1416        marking.add(&p_reset, Token::at(3, 0));
1417
1418        let mut executor =
1419            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1420        executor.run_sync();
1421
1422        assert_eq!(executor.marking().count("reset"), 0); // all cleared
1423        assert_eq!(executor.marking().count("out"), 1);
1424    }
1425
1426    #[test]
1427    fn exactly_cardinality_consumes_n() {
1428        let p = Place::<i32>::new("p");
1429        let p_out = Place::<i32>::new("out");
1430
1431        let t = Transition::builder("t1")
1432            .input(libpetri_core::input::exactly(3, &p))
1433            .output(out_place(&p_out))
1434            .action(libpetri_core::action::sync_action(|ctx| {
1435                let vals = ctx.inputs::<i32>("p")?;
1436                for v in vals {
1437                    ctx.output("out", *v)?;
1438                }
1439                Ok(())
1440            }))
1441            .build();
1442        let net = PetriNet::builder("test").transition(t).build();
1443
1444        let mut marking = Marking::new();
1445        for i in 0..5 {
1446            marking.add(&p, Token::at(i, 0));
1447        }
1448
1449        let mut executor =
1450            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1451        executor.run_sync();
1452
1453        // Consumed 3 of 5, produced 3
1454        assert_eq!(executor.marking().count("p"), 2);
1455        assert_eq!(executor.marking().count("out"), 3);
1456    }
1457
1458    #[test]
1459    fn all_cardinality_consumes_everything() {
1460        let p = Place::<i32>::new("p");
1461        let p_out = Place::<()>::new("out");
1462
1463        let t = Transition::builder("t1")
1464            .input(libpetri_core::input::all(&p))
1465            .output(out_place(&p_out))
1466            .action(libpetri_core::action::sync_action(|ctx| {
1467                let vals = ctx.inputs::<i32>("p")?;
1468                ctx.output("out", vals.len() as i32)?;
1469                Ok(())
1470            }))
1471            .build();
1472        let net = PetriNet::builder("test").transition(t).build();
1473
1474        let mut marking = Marking::new();
1475        for i in 0..5 {
1476            marking.add(&p, Token::at(i, 0));
1477        }
1478
1479        let mut executor =
1480            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1481        executor.run_sync();
1482
1483        assert_eq!(executor.marking().count("p"), 0);
1484    }
1485
1486    #[test]
1487    fn at_least_blocks_insufficient() {
1488        let p = Place::<i32>::new("p");
1489        let p_out = Place::<()>::new("out");
1490
1491        let t = Transition::builder("t1")
1492            .input(libpetri_core::input::at_least(3, &p))
1493            .output(out_place(&p_out))
1494            .action(libpetri_core::action::passthrough())
1495            .build();
1496        let net = PetriNet::builder("test").transition(t).build();
1497
1498        // Only 2 tokens, need 3+
1499        let mut marking = Marking::new();
1500        marking.add(&p, Token::at(1, 0));
1501        marking.add(&p, Token::at(2, 0));
1502
1503        let mut executor =
1504            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1505        executor.run_sync();
1506
1507        assert_eq!(executor.marking().count("p"), 2); // not consumed
1508    }
1509
1510    #[test]
1511    fn at_least_fires_with_enough_and_consumes_all() {
1512        let p = Place::<i32>::new("p");
1513        let p_out = Place::<()>::new("out");
1514
1515        let t = Transition::builder("t1")
1516            .input(libpetri_core::input::at_least(3, &p))
1517            .output(out_place(&p_out))
1518            .action(libpetri_core::action::passthrough())
1519            .build();
1520        let net = PetriNet::builder("test").transition(t).build();
1521
1522        let mut marking = Marking::new();
1523        for i in 0..5 {
1524            marking.add(&p, Token::at(i, 0));
1525        }
1526
1527        let mut executor =
1528            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1529        executor.run_sync();
1530
1531        assert_eq!(executor.marking().count("p"), 0); // all consumed
1532    }
1533
1534    #[test]
1535    fn guarded_input_only_consumes_matching() {
1536        let p = Place::<i32>::new("p");
1537        let p_out = Place::<i32>::new("out");
1538
1539        let t = Transition::builder("t1")
1540            .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1541            .output(out_place(&p_out))
1542            .action(libpetri_core::action::fork())
1543            .build();
1544        let net = PetriNet::builder("test").transition(t).build();
1545
1546        let mut marking = Marking::new();
1547        marking.add(&p, Token::at(3, 0)); // doesn't match guard
1548        marking.add(&p, Token::at(10, 0)); // matches
1549
1550        let mut executor =
1551            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1552        executor.run_sync();
1553
1554        assert_eq!(executor.marking().count("p"), 1); // 3 remains
1555        assert_eq!(executor.marking().count("out"), 1); // 10 forwarded
1556        let peeked = executor.marking().peek(&p_out).unwrap();
1557        assert_eq!(*peeked, 10);
1558    }
1559
1560    #[test]
1561    fn guarded_input_blocks_when_no_match() {
1562        let p = Place::<i32>::new("p");
1563        let p_out = Place::<i32>::new("out");
1564
1565        let t = Transition::builder("t1")
1566            .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
1567            .output(out_place(&p_out))
1568            .action(libpetri_core::action::fork())
1569            .build();
1570        let net = PetriNet::builder("test").transition(t).build();
1571
1572        let mut marking = Marking::new();
1573        marking.add(&p, Token::at(3, 0));
1574        marking.add(&p, Token::at(10, 0));
1575
1576        let mut executor =
1577            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1578        executor.run_sync();
1579
1580        // Nothing matches guard, transition does not fire
1581        assert_eq!(executor.marking().count("p"), 2);
1582        assert_eq!(executor.marking().count("out"), 0);
1583    }
1584
1585    #[test]
1586    fn transform_action_outputs_to_all_places() {
1587        let p_in = Place::<i32>::new("in");
1588        let p_a = Place::<i32>::new("a");
1589        let p_b = Place::<i32>::new("b");
1590
1591        let t = Transition::builder("t1")
1592            .input(one(&p_in))
1593            .output(libpetri_core::output::and(vec![
1594                out_place(&p_a),
1595                out_place(&p_b),
1596            ]))
1597            .action(libpetri_core::action::transform(|ctx| {
1598                let v = ctx.input::<i32>("in").unwrap();
1599                Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
1600            }))
1601            .build();
1602        let net = PetriNet::builder("test").transition(t).build();
1603
1604        let mut marking = Marking::new();
1605        marking.add(&p_in, Token::at(5, 0));
1606
1607        let mut executor =
1608            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1609        executor.run_sync();
1610
1611        assert_eq!(executor.marking().count("a"), 1);
1612        assert_eq!(executor.marking().count("b"), 1);
1613        assert_eq!(*executor.marking().peek(&p_a).unwrap(), 10);
1614        assert_eq!(*executor.marking().peek(&p_b).unwrap(), 10);
1615    }
1616
1617    #[test]
1618    fn sync_action_custom_logic() {
1619        let p_in = Place::<i32>::new("in");
1620        let p_out = Place::<String>::new("out");
1621
1622        let t = Transition::builder("t1")
1623            .input(one(&p_in))
1624            .output(out_place(&p_out))
1625            .action(libpetri_core::action::sync_action(|ctx| {
1626                let v = ctx.input::<i32>("in")?;
1627                ctx.output("out", format!("value={v}"))?;
1628                Ok(())
1629            }))
1630            .build();
1631        let net = PetriNet::builder("test").transition(t).build();
1632
1633        let mut marking = Marking::new();
1634        marking.add(&p_in, Token::at(42, 0));
1635
1636        let mut executor =
1637            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1638        executor.run_sync();
1639
1640        assert_eq!(executor.marking().count("out"), 1);
1641        let peeked = executor.marking().peek(&p_out).unwrap();
1642        assert_eq!(*peeked, "value=42");
1643    }
1644
1645    #[test]
1646    fn action_error_does_not_crash() {
1647        let p_in = Place::<i32>::new("in");
1648        let p_out = Place::<i32>::new("out");
1649
1650        let t = Transition::builder("t1")
1651            .input(one(&p_in))
1652            .output(out_place(&p_out))
1653            .action(libpetri_core::action::sync_action(|_ctx| {
1654                Err(libpetri_core::action::ActionError::new(
1655                    "intentional failure",
1656                ))
1657            }))
1658            .build();
1659        let net = PetriNet::builder("test").transition(t).build();
1660
1661        let mut marking = Marking::new();
1662        marking.add(&p_in, Token::at(42, 0));
1663
1664        let mut executor =
1665            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1666        executor.run_sync();
1667
1668        // Token consumed even though action failed
1669        assert_eq!(executor.marking().count("in"), 0);
1670        assert_eq!(executor.marking().count("out"), 0);
1671
1672        // Failure event should be recorded
1673        let events = executor.event_store().events();
1674        assert!(
1675            events
1676                .iter()
1677                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
1678        );
1679    }
1680
1681    #[test]
1682    fn event_store_records_lifecycle() {
1683        let p1 = Place::<i32>::new("p1");
1684        let p2 = Place::<i32>::new("p2");
1685        let t = Transition::builder("t1")
1686            .input(one(&p1))
1687            .output(out_place(&p2))
1688            .action(libpetri_core::action::fork())
1689            .build();
1690        let net = PetriNet::builder("test").transition(t).build();
1691
1692        let mut marking = Marking::new();
1693        marking.add(&p1, Token::at(1, 0));
1694
1695        let mut executor =
1696            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1697        executor.run_sync();
1698
1699        let events = executor.event_store().events();
1700
1701        // Should have: ExecutionStarted, TransitionEnabled, TokenRemoved, TransitionStarted,
1702        // TokenAdded, TransitionCompleted, ExecutionCompleted
1703        assert!(
1704            events
1705                .iter()
1706                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
1707        );
1708        assert!(
1709            events
1710                .iter()
1711                .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
1712        );
1713        assert!(
1714            events
1715                .iter()
1716                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
1717        );
1718        assert!(
1719            events
1720                .iter()
1721                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
1722        );
1723        assert!(
1724            events
1725                .iter()
1726                .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
1727        );
1728        assert!(
1729            events
1730                .iter()
1731                .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
1732        );
1733        assert!(
1734            events
1735                .iter()
1736                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
1737        );
1738    }
1739
1740    #[test]
1741    fn noop_event_store_has_no_events() {
1742        let p1 = Place::<i32>::new("p1");
1743        let p2 = Place::<i32>::new("p2");
1744        let t = Transition::builder("t1")
1745            .input(one(&p1))
1746            .output(out_place(&p2))
1747            .action(libpetri_core::action::fork())
1748            .build();
1749        let net = PetriNet::builder("test").transition(t).build();
1750
1751        let mut marking = Marking::new();
1752        marking.add(&p1, Token::at(1, 0));
1753
1754        let mut executor =
1755            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1756        executor.run_sync();
1757
1758        assert!(executor.event_store().is_empty());
1759    }
1760
1761    #[test]
1762    fn empty_net_completes() {
1763        let net = PetriNet::builder("empty").build();
1764        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1765            &net,
1766            Marking::new(),
1767            ExecutorOptions::default(),
1768        );
1769        executor.run_sync();
1770        assert!(executor.is_quiescent());
1771    }
1772
1773    #[test]
1774    fn single_transition_fires_once() {
1775        let p = Place::<i32>::new("p");
1776        let out = Place::<i32>::new("out");
1777        let t = Transition::builder("t1")
1778            .input(one(&p))
1779            .output(out_place(&out))
1780            .action(libpetri_core::action::fork())
1781            .build();
1782        let net = PetriNet::builder("test").transition(t).build();
1783
1784        let mut marking = Marking::new();
1785        marking.add(&p, Token::at(42, 0));
1786
1787        let mut executor =
1788            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1789        executor.run_sync();
1790
1791        assert_eq!(executor.marking().count("p"), 0);
1792        assert_eq!(executor.marking().count("out"), 1);
1793    }
1794
1795    #[test]
1796    fn many_tokens_all_processed() {
1797        let p = Place::<i32>::new("p");
1798        let out = Place::<i32>::new("out");
1799        let t = Transition::builder("t1")
1800            .input(one(&p))
1801            .output(out_place(&out))
1802            .action(libpetri_core::action::fork())
1803            .build();
1804        let net = PetriNet::builder("test").transition(t).build();
1805
1806        let mut marking = Marking::new();
1807        for i in 0..100 {
1808            marking.add(&p, Token::at(i, 0));
1809        }
1810
1811        let mut executor =
1812            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1813        executor.run_sync();
1814
1815        assert_eq!(executor.marking().count("p"), 0);
1816        assert_eq!(executor.marking().count("out"), 100);
1817    }
1818
1819    #[test]
1820    fn input_fifo_ordering() {
1821        let p = Place::<i32>::new("p");
1822        let out = Place::<i32>::new("out");
1823        let t = Transition::builder("t1")
1824            .input(one(&p))
1825            .output(out_place(&out))
1826            .action(libpetri_core::action::fork())
1827            .build();
1828        let net = PetriNet::builder("test").transition(t).build();
1829
1830        let mut marking = Marking::new();
1831        marking.add(&p, Token::at(1, 0));
1832        marking.add(&p, Token::at(2, 0));
1833        marking.add(&p, Token::at(3, 0));
1834
1835        let mut executor =
1836            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1837        executor.run_sync();
1838
1839        // All processed, first one should have been consumed first
1840        assert_eq!(executor.marking().count("out"), 3);
1841    }
1842
1843    #[test]
1844    fn inhibitor_unblocked_when_token_removed() {
1845        // p1 has token, p_inh has token (blocks t1)
1846        // t_clear removes from p_inh, then t1 can fire
1847        let p1 = Place::<()>::new("p1");
1848        let p_inh = Place::<()>::new("inh");
1849        let p_out = Place::<()>::new("out");
1850        let p_trigger = Place::<()>::new("trigger");
1851
1852        // t_clear: consumes from inh, outputs to trigger
1853        let t_clear = Transition::builder("t_clear")
1854            .input(one(&p_inh))
1855            .output(out_place(&p_trigger))
1856            .action(libpetri_core::action::fork())
1857            .priority(10) // higher priority fires first
1858            .build();
1859
1860        // t1: consumes from p1, inhibited by inh
1861        let t1 = Transition::builder("t1")
1862            .input(one(&p1))
1863            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1864            .output(out_place(&p_out))
1865            .action(libpetri_core::action::fork())
1866            .priority(1)
1867            .build();
1868
1869        let net = PetriNet::builder("test").transitions([t_clear, t1]).build();
1870
1871        let mut marking = Marking::new();
1872        marking.add(&p1, Token::at((), 0));
1873        marking.add(&p_inh, Token::at((), 0));
1874
1875        let mut executor =
1876            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1877        executor.run_sync();
1878
1879        // t_clear fires first (priority 10), removes inh token
1880        // then t1 fires (no longer inhibited)
1881        assert_eq!(executor.marking().count("inh"), 0);
1882        assert_eq!(executor.marking().count("p1"), 0);
1883        assert_eq!(executor.marking().count("out"), 1);
1884    }
1885
1886    #[test]
1887    fn combined_input_read_reset() {
1888        let p_in = Place::<i32>::new("in");
1889        let p_ctx = Place::<String>::new("ctx");
1890        let p_clear = Place::<i32>::new("clear");
1891        let p_out = Place::<String>::new("out");
1892
1893        let t = Transition::builder("t1")
1894            .input(one(&p_in))
1895            .read(libpetri_core::arc::read(&p_ctx))
1896            .reset(libpetri_core::arc::reset(&p_clear))
1897            .output(out_place(&p_out))
1898            .action(libpetri_core::action::sync_action(|ctx| {
1899                let v = ctx.input::<i32>("in")?;
1900                let r = ctx.read::<String>("ctx")?;
1901                ctx.output("out", format!("{v}-{r}"))?;
1902                Ok(())
1903            }))
1904            .build();
1905        let net = PetriNet::builder("test").transition(t).build();
1906
1907        let mut marking = Marking::new();
1908        marking.add(&p_in, Token::at(42, 0));
1909        marking.add(&p_ctx, Token::at("hello".to_string(), 0));
1910        marking.add(&p_clear, Token::at(1, 0));
1911        marking.add(&p_clear, Token::at(2, 0));
1912
1913        let mut executor =
1914            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1915        executor.run_sync();
1916
1917        assert_eq!(executor.marking().count("in"), 0); // consumed
1918        assert_eq!(executor.marking().count("ctx"), 1); // read, not consumed
1919        assert_eq!(executor.marking().count("clear"), 0); // reset
1920        assert_eq!(executor.marking().count("out"), 1);
1921        let peeked = executor.marking().peek(&p_out).unwrap();
1922        assert_eq!(*peeked, "42-hello");
1923    }
1924
1925    #[test]
1926    fn workflow_sequential_chain() {
1927        // p1 -> t1 -> p2 -> t2 -> p3 -> t3 -> p4
1928        // Each transition doubles the value
1929        let p1 = Place::<i32>::new("p1");
1930        let p2 = Place::<i32>::new("p2");
1931        let p3 = Place::<i32>::new("p3");
1932        let p4 = Place::<i32>::new("p4");
1933
1934        let make_doubler = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
1935            let out_name: Arc<str> = Arc::from(outp.name());
1936            Transition::builder(name)
1937                .input(one(inp))
1938                .output(out_place(outp))
1939                .action(libpetri_core::action::sync_action(move |ctx| {
1940                    let v = ctx
1941                        .input::<i32>("p1")
1942                        .or_else(|_| ctx.input::<i32>("p2"))
1943                        .or_else(|_| ctx.input::<i32>("p3"))
1944                        .unwrap();
1945                    ctx.output(&out_name, *v * 2)?;
1946                    Ok(())
1947                }))
1948                .build()
1949        };
1950
1951        let t1 = make_doubler("t1", &p1, &p2);
1952        let t2 = make_doubler("t2", &p2, &p3);
1953        let t3 = make_doubler("t3", &p3, &p4);
1954
1955        let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
1956
1957        let mut marking = Marking::new();
1958        marking.add(&p1, Token::at(1, 0));
1959
1960        let mut executor =
1961            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1962        executor.run_sync();
1963
1964        assert_eq!(executor.marking().count("p4"), 1);
1965        assert_eq!(*executor.marking().peek(&p4).unwrap(), 8); // 1 * 2 * 2 * 2
1966    }
1967
1968    #[test]
1969    fn workflow_fork_join() {
1970        // p_start -> t_fork -> p_a, p_b -> t_join -> p_end
1971        let p_start = Place::<i32>::new("start");
1972        let p_a = Place::<i32>::new("a");
1973        let p_b = Place::<i32>::new("b");
1974        let p_end = Place::<i32>::new("end");
1975
1976        let t_fork = Transition::builder("fork")
1977            .input(one(&p_start))
1978            .output(libpetri_core::output::and(vec![
1979                out_place(&p_a),
1980                out_place(&p_b),
1981            ]))
1982            .action(libpetri_core::action::fork())
1983            .build();
1984
1985        let t_join = Transition::builder("join")
1986            .input(one(&p_a))
1987            .input(one(&p_b))
1988            .output(out_place(&p_end))
1989            .action(libpetri_core::action::sync_action(|ctx| {
1990                let a = ctx.input::<i32>("a")?;
1991                let b = ctx.input::<i32>("b")?;
1992                ctx.output("end", *a + *b)?;
1993                Ok(())
1994            }))
1995            .build();
1996
1997        let net = PetriNet::builder("fork-join")
1998            .transitions([t_fork, t_join])
1999            .build();
2000
2001        let mut marking = Marking::new();
2002        marking.add(&p_start, Token::at(5, 0));
2003
2004        let mut executor =
2005            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2006        executor.run_sync();
2007
2008        assert_eq!(executor.marking().count("start"), 0);
2009        assert_eq!(executor.marking().count("a"), 0);
2010        assert_eq!(executor.marking().count("b"), 0);
2011        assert_eq!(executor.marking().count("end"), 1);
2012        assert_eq!(*executor.marking().peek(&p_end).unwrap(), 10); // 5 + 5
2013    }
2014
2015    #[test]
2016    fn workflow_mutual_exclusion() {
2017        // Two workers compete for a mutex token
2018        let p_mutex = Place::<()>::new("mutex");
2019        let p_w1 = Place::<()>::new("w1");
2020        let p_w2 = Place::<()>::new("w2");
2021        let p_done1 = Place::<()>::new("done1");
2022        let p_done2 = Place::<()>::new("done2");
2023
2024        let t_w1 = Transition::builder("work1")
2025            .input(one(&p_w1))
2026            .input(one(&p_mutex))
2027            .output(libpetri_core::output::and(vec![
2028                out_place(&p_done1),
2029                out_place(&p_mutex), // return mutex
2030            ]))
2031            .action(libpetri_core::action::sync_action(|ctx| {
2032                ctx.output("done1", ())?;
2033                ctx.output("mutex", ())?;
2034                Ok(())
2035            }))
2036            .build();
2037
2038        let t_w2 = Transition::builder("work2")
2039            .input(one(&p_w2))
2040            .input(one(&p_mutex))
2041            .output(libpetri_core::output::and(vec![
2042                out_place(&p_done2),
2043                out_place(&p_mutex), // return mutex
2044            ]))
2045            .action(libpetri_core::action::sync_action(|ctx| {
2046                ctx.output("done2", ())?;
2047                ctx.output("mutex", ())?;
2048                Ok(())
2049            }))
2050            .build();
2051
2052        let net = PetriNet::builder("mutex").transitions([t_w1, t_w2]).build();
2053
2054        let mut marking = Marking::new();
2055        marking.add(&p_mutex, Token::at((), 0));
2056        marking.add(&p_w1, Token::at((), 0));
2057        marking.add(&p_w2, Token::at((), 0));
2058
2059        let mut executor =
2060            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2061        executor.run_sync();
2062
2063        // Both should complete, mutex should be returned
2064        assert_eq!(executor.marking().count("done1"), 1);
2065        assert_eq!(executor.marking().count("done2"), 1);
2066        assert_eq!(executor.marking().count("mutex"), 1); // returned
2067    }
2068
2069    #[test]
2070    fn produce_action() {
2071        let p_in = Place::<()>::new("in");
2072        let p_out = Place::<String>::new("out");
2073
2074        let t = Transition::builder("t1")
2075            .input(one(&p_in))
2076            .output(out_place(&p_out))
2077            .action(libpetri_core::action::produce(
2078                Arc::from("out"),
2079                "produced_value".to_string(),
2080            ))
2081            .build();
2082        let net = PetriNet::builder("test").transition(t).build();
2083
2084        let mut marking = Marking::new();
2085        marking.add(&p_in, Token::at((), 0));
2086
2087        let mut executor =
2088            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2089        executor.run_sync();
2090
2091        assert_eq!(executor.marking().count("out"), 1);
2092    }
2093
2094    #[test]
2095    fn xor_output_fires_one_branch() {
2096        let p = Place::<i32>::new("p");
2097        let a = Place::<i32>::new("a");
2098        let b = Place::<i32>::new("b");
2099
2100        let t = Transition::builder("t1")
2101            .input(one(&p))
2102            .output(libpetri_core::output::xor(vec![
2103                out_place(&a),
2104                out_place(&b),
2105            ]))
2106            .action(libpetri_core::action::sync_action(|ctx| {
2107                let v = ctx.input::<i32>("p")?;
2108                // Output to first branch place
2109                ctx.output("a", *v)?;
2110                Ok(())
2111            }))
2112            .build();
2113        let net = PetriNet::builder("test").transition(t).build();
2114
2115        let mut marking = Marking::new();
2116        marking.add(&p, Token::at(1, 0));
2117
2118        let mut executor =
2119            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2120        executor.run_sync();
2121
2122        // XOR: token goes to one branch
2123        let total = executor.marking().count("a") + executor.marking().count("b");
2124        assert!(total >= 1);
2125    }
2126
2127    #[test]
2128    fn and_output_fires_to_all() {
2129        let p = Place::<i32>::new("p");
2130        let a = Place::<i32>::new("a");
2131        let b = Place::<i32>::new("b");
2132        let c = Place::<i32>::new("c");
2133
2134        let t = Transition::builder("t1")
2135            .input(one(&p))
2136            .output(libpetri_core::output::and(vec![
2137                out_place(&a),
2138                out_place(&b),
2139                out_place(&c),
2140            ]))
2141            .action(libpetri_core::action::sync_action(|ctx| {
2142                let v = ctx.input::<i32>("p")?;
2143                ctx.output("a", *v)?;
2144                ctx.output("b", *v * 10)?;
2145                ctx.output("c", *v * 100)?;
2146                Ok(())
2147            }))
2148            .build();
2149        let net = PetriNet::builder("test").transition(t).build();
2150
2151        let mut marking = Marking::new();
2152        marking.add(&p, Token::at(1, 0));
2153
2154        let mut executor =
2155            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2156        executor.run_sync();
2157
2158        assert_eq!(executor.marking().count("a"), 1);
2159        assert_eq!(executor.marking().count("b"), 1);
2160        assert_eq!(executor.marking().count("c"), 1);
2161        assert_eq!(*executor.marking().peek(&a).unwrap(), 1);
2162        assert_eq!(*executor.marking().peek(&b).unwrap(), 10);
2163        assert_eq!(*executor.marking().peek(&c).unwrap(), 100);
2164    }
2165
2166    #[test]
2167    fn transition_with_no_output_consumes_only() {
2168        let p = Place::<i32>::new("p");
2169
2170        let t = Transition::builder("t1")
2171            .input(one(&p))
2172            .action(libpetri_core::action::sync_action(|ctx| {
2173                let _ = ctx.input::<i32>("p")?;
2174                Ok(())
2175            }))
2176            .build();
2177        let net = PetriNet::builder("test").transition(t).build();
2178
2179        let mut marking = Marking::new();
2180        marking.add(&p, Token::at(42, 0));
2181
2182        let mut executor =
2183            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2184        executor.run_sync();
2185
2186        assert_eq!(executor.marking().count("p"), 0);
2187    }
2188
2189    #[test]
2190    fn multiple_transitions_same_input_compete() {
2191        // Two transitions both need p1 — only one should fire per token
2192        let p1 = Place::<()>::new("p1");
2193        let out_a = Place::<()>::new("a");
2194        let out_b = Place::<()>::new("b");
2195
2196        let t_a = Transition::builder("ta")
2197            .input(one(&p1))
2198            .output(out_place(&out_a))
2199            .action(libpetri_core::action::fork())
2200            .build();
2201        let t_b = Transition::builder("tb")
2202            .input(one(&p1))
2203            .output(out_place(&out_b))
2204            .action(libpetri_core::action::fork())
2205            .build();
2206
2207        let net = PetriNet::builder("test").transitions([t_a, t_b]).build();
2208
2209        let mut marking = Marking::new();
2210        marking.add(&p1, Token::at((), 0));
2211
2212        let mut executor =
2213            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2214        executor.run_sync();
2215
2216        // One token, two competing transitions — exactly one fires
2217        let total = executor.marking().count("a") + executor.marking().count("b");
2218        assert_eq!(total, 1);
2219        assert_eq!(executor.marking().count("p1"), 0);
2220    }
2221
2222    #[test]
2223    fn priority_higher_fires_first() {
2224        let p = Place::<()>::new("p");
2225        let out_hi = Place::<()>::new("hi");
2226        let out_lo = Place::<()>::new("lo");
2227
2228        let t_hi = Transition::builder("hi")
2229            .input(one(&p))
2230            .output(out_place(&out_hi))
2231            .action(libpetri_core::action::fork())
2232            .priority(10)
2233            .build();
2234        let t_lo = Transition::builder("lo")
2235            .input(one(&p))
2236            .output(out_place(&out_lo))
2237            .action(libpetri_core::action::fork())
2238            .priority(1)
2239            .build();
2240
2241        let net = PetriNet::builder("test").transitions([t_hi, t_lo]).build();
2242
2243        let mut marking = Marking::new();
2244        marking.add(&p, Token::at((), 0));
2245
2246        let mut executor =
2247            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2248        executor.run_sync();
2249
2250        // Higher priority should win
2251        assert_eq!(executor.marking().count("hi"), 1);
2252        assert_eq!(executor.marking().count("lo"), 0);
2253    }
2254
2255    #[test]
2256    fn quiescent_when_no_enabled_transitions() {
2257        let p = Place::<i32>::new("p");
2258        let out = Place::<i32>::new("out");
2259
2260        let t = Transition::builder("t1")
2261            .input(one(&p))
2262            .output(out_place(&out))
2263            .action(libpetri_core::action::fork())
2264            .build();
2265        let net = PetriNet::builder("test").transition(t).build();
2266
2267        let mut marking = Marking::new();
2268        marking.add(&p, Token::at(1, 0));
2269
2270        let mut executor =
2271            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2272        executor.run_sync();
2273
2274        assert!(executor.is_quiescent());
2275    }
2276
2277    #[test]
2278    fn event_store_transition_enabled_disabled() {
2279        let p1 = Place::<()>::new("p1");
2280        let p2 = Place::<()>::new("p2");
2281
2282        let t = Transition::builder("t1")
2283            .input(one(&p1))
2284            .output(out_place(&p2))
2285            .action(libpetri_core::action::fork())
2286            .build();
2287        let net = PetriNet::builder("test").transition(t).build();
2288
2289        let mut marking = Marking::new();
2290        marking.add(&p1, Token::at((), 0));
2291
2292        let mut executor =
2293            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2294        executor.run_sync();
2295
2296        let events = executor.event_store().events();
2297        // Should have at least: ExecutionStarted, TransitionEnabled, TransitionStarted,
2298        // TokenAdded, TransitionCompleted, ExecutionCompleted
2299        assert!(events.len() >= 4);
2300
2301        // Check for TransitionEnabled event
2302        let has_enabled = events.iter().any(|e| {
2303            matches!(e, NetEvent::TransitionEnabled { transition_name, .. } if transition_name.as_ref() == "t1")
2304        });
2305        assert!(has_enabled);
2306    }
2307
2308    #[test]
2309    fn diamond_pattern() {
2310        // p1 -> t1 -> p2, p3 -> t2 -> p4 (from p2), t3 -> p4 (from p3)
2311        let p1 = Place::<()>::new("p1");
2312        let p2 = Place::<()>::new("p2");
2313        let p3 = Place::<()>::new("p3");
2314        let p4 = Place::<()>::new("p4");
2315
2316        let t1 = Transition::builder("t1")
2317            .input(one(&p1))
2318            .output(libpetri_core::output::and(vec![
2319                out_place(&p2),
2320                out_place(&p3),
2321            ]))
2322            .action(libpetri_core::action::fork())
2323            .build();
2324        let t2 = Transition::builder("t2")
2325            .input(one(&p2))
2326            .output(out_place(&p4))
2327            .action(libpetri_core::action::fork())
2328            .build();
2329        let t3 = Transition::builder("t3")
2330            .input(one(&p3))
2331            .output(out_place(&p4))
2332            .action(libpetri_core::action::fork())
2333            .build();
2334
2335        let net = PetriNet::builder("diamond")
2336            .transitions([t1, t2, t3])
2337            .build();
2338
2339        let mut marking = Marking::new();
2340        marking.add(&p1, Token::at((), 0));
2341
2342        let mut executor =
2343            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2344        executor.run_sync();
2345
2346        assert_eq!(executor.marking().count("p4"), 2); // both branches produce to p4
2347    }
2348
2349    #[test]
2350    fn self_loop_with_guard_terminates() {
2351        // Transition that loops back, but only fires when value > 0
2352        // Decrements each time, terminates when 0
2353        let p = Place::<i32>::new("p");
2354        let done = Place::<i32>::new("done");
2355
2356        let t = Transition::builder("dec")
2357            .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v > 0))
2358            .output(out_place(&p))
2359            .action(libpetri_core::action::sync_action(|ctx| {
2360                let v = ctx.input::<i32>("p")?;
2361                ctx.output("p", *v - 1)?;
2362                Ok(())
2363            }))
2364            .build();
2365
2366        // When value hits 0, this transition moves it to done
2367        let t_done = Transition::builder("finish")
2368            .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v == 0))
2369            .output(out_place(&done))
2370            .action(libpetri_core::action::fork())
2371            .build();
2372
2373        let net = PetriNet::builder("countdown")
2374            .transitions([t, t_done])
2375            .build();
2376
2377        let mut marking = Marking::new();
2378        marking.add(&p, Token::at(3, 0));
2379
2380        let mut executor =
2381            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2382        executor.run_sync();
2383
2384        assert_eq!(executor.marking().count("done"), 1);
2385        assert_eq!(*executor.marking().peek(&done).unwrap(), 0);
2386    }
2387
2388    #[test]
2389    fn multiple_tokens_different_types() {
2390        let p_int = Place::<i32>::new("ints");
2391        let p_str = Place::<String>::new("strs");
2392        let p_out = Place::<String>::new("out");
2393
2394        let t = Transition::builder("combine")
2395            .input(one(&p_int))
2396            .input(one(&p_str))
2397            .output(out_place(&p_out))
2398            .action(libpetri_core::action::sync_action(|ctx| {
2399                let i = ctx.input::<i32>("ints")?;
2400                let s = ctx.input::<String>("strs")?;
2401                ctx.output("out", format!("{s}-{i}"))?;
2402                Ok(())
2403            }))
2404            .build();
2405
2406        let net = PetriNet::builder("test").transition(t).build();
2407
2408        let mut marking = Marking::new();
2409        marking.add(&p_int, Token::at(42, 0));
2410        marking.add(&p_str, Token::at("hello".to_string(), 0));
2411
2412        let mut executor =
2413            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2414        executor.run_sync();
2415
2416        assert_eq!(*executor.marking().peek(&p_out).unwrap(), "hello-42");
2417    }
2418}
2419
2420#[cfg(all(test, feature = "tokio"))]
2421mod async_tests {
2422    use super::*;
2423    use crate::environment::{ExecutorSignal, ExternalEvent};
2424    use libpetri_core::action::{ActionError, async_action, fork};
2425    use libpetri_core::input::one;
2426    use libpetri_core::output::out_place;
2427    use libpetri_core::place::Place;
2428    use libpetri_core::token::{ErasedToken, Token};
2429    use libpetri_core::transition::Transition;
2430    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
2431
2432    #[tokio::test]
2433    async fn async_fork_single_transition() {
2434        let p1 = Place::<i32>::new("p1");
2435        let p2 = Place::<i32>::new("p2");
2436
2437        let t1 = Transition::builder("t1")
2438            .input(one(&p1))
2439            .output(out_place(&p2))
2440            .action(fork())
2441            .build();
2442
2443        let net = PetriNet::builder("test").transition(t1).build();
2444        let mut marking = Marking::new();
2445        marking.add(&p1, Token::at(42, 0));
2446
2447        let mut executor =
2448            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2449
2450        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2451        executor.run_async(rx).await;
2452
2453        assert_eq!(executor.marking().count("p2"), 1);
2454        assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
2455    }
2456
2457    #[tokio::test]
2458    async fn async_action_produces_output() {
2459        let p1 = Place::<i32>::new("p1");
2460        let p2 = Place::<i32>::new("p2");
2461
2462        let action = async_action(|mut ctx| async move {
2463            let val: i32 = *ctx.input::<i32>("p1")?;
2464            ctx.output("p2", val * 10)?;
2465            Ok(ctx)
2466        });
2467
2468        let t1 = Transition::builder("t1")
2469            .input(one(&p1))
2470            .output(out_place(&p2))
2471            .action(action)
2472            .build();
2473
2474        let net = PetriNet::builder("test").transition(t1).build();
2475        let mut marking = Marking::new();
2476        marking.add(&p1, Token::at(5, 0));
2477
2478        let mut executor =
2479            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2480
2481        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2482        executor.run_async(rx).await;
2483
2484        assert_eq!(executor.marking().count("p2"), 1);
2485        assert_eq!(*executor.marking().peek(&p2).unwrap(), 50);
2486    }
2487
2488    #[tokio::test]
2489    async fn async_chain_two_transitions() {
2490        let p1 = Place::<i32>::new("p1");
2491        let p2 = Place::<i32>::new("p2");
2492        let p3 = Place::<i32>::new("p3");
2493
2494        let t1 = Transition::builder("t1")
2495            .input(one(&p1))
2496            .output(out_place(&p2))
2497            .action(fork())
2498            .build();
2499
2500        let t2 = Transition::builder("t2")
2501            .input(one(&p2))
2502            .output(out_place(&p3))
2503            .action(fork())
2504            .build();
2505
2506        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
2507        let mut marking = Marking::new();
2508        marking.add(&p1, Token::at(99, 0));
2509
2510        let mut executor =
2511            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2512
2513        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2514        executor.run_async(rx).await;
2515
2516        assert_eq!(executor.marking().count("p3"), 1);
2517        assert_eq!(*executor.marking().peek(&p3).unwrap(), 99);
2518    }
2519
2520    #[tokio::test]
2521    async fn async_event_injection() {
2522        let p1 = Place::<i32>::new("p1");
2523        let p2 = Place::<i32>::new("p2");
2524
2525        let t1 = Transition::builder("t1")
2526            .input(one(&p1))
2527            .output(out_place(&p2))
2528            .action(fork())
2529            .build();
2530
2531        let net = PetriNet::builder("test").transition(t1).build();
2532        let marking = Marking::new(); // empty — no initial tokens
2533
2534        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2535            &net,
2536            marking,
2537            ExecutorOptions {
2538                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2539            },
2540        );
2541
2542        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2543
2544        // Inject a token after a short delay, then close the channel
2545        tokio::spawn(async move {
2546            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2547            let token = Token::at(77, 0);
2548            tx.send(ExecutorSignal::Event(ExternalEvent {
2549                place_name: Arc::from("p1"),
2550                token: ErasedToken::from_typed(&token),
2551            }))
2552            .unwrap();
2553            // Drop tx to close the channel and let executor terminate
2554        });
2555
2556        executor.run_async(rx).await;
2557
2558        assert_eq!(executor.marking().count("p2"), 1);
2559        assert_eq!(*executor.marking().peek(&p2).unwrap(), 77);
2560    }
2561
2562    #[tokio::test]
2563    async fn async_with_event_store_records_events() {
2564        let p1 = Place::<i32>::new("p1");
2565        let p2 = Place::<i32>::new("p2");
2566
2567        let t1 = Transition::builder("t1")
2568            .input(one(&p1))
2569            .output(out_place(&p2))
2570            .action(fork())
2571            .build();
2572
2573        let net = PetriNet::builder("test").transition(t1).build();
2574        let mut marking = Marking::new();
2575        marking.add(&p1, Token::at(1, 0));
2576
2577        let mut executor =
2578            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2579
2580        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2581        executor.run_async(rx).await;
2582
2583        let events = executor.event_store().events();
2584        assert!(!events.is_empty());
2585        // Should have ExecutionStarted, TransitionStarted, TransitionCompleted, ExecutionCompleted
2586        assert!(
2587            events
2588                .iter()
2589                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2590        );
2591        assert!(
2592            events
2593                .iter()
2594                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2595        );
2596        assert!(
2597            events
2598                .iter()
2599                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2600        );
2601        assert!(
2602            events
2603                .iter()
2604                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2605        );
2606    }
2607
2608    #[tokio::test]
2609    async fn async_action_error_does_not_crash() {
2610        let p1 = Place::<i32>::new("p1");
2611        let p2 = Place::<i32>::new("p2");
2612
2613        let action =
2614            async_action(|_ctx| async move { Err(ActionError::new("intentional failure")) });
2615
2616        let t1 = Transition::builder("t1")
2617            .input(one(&p1))
2618            .output(out_place(&p2))
2619            .action(action)
2620            .build();
2621
2622        let net = PetriNet::builder("test").transition(t1).build();
2623        let mut marking = Marking::new();
2624        marking.add(&p1, Token::at(1, 0));
2625
2626        let mut executor =
2627            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2628
2629        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2630        executor.run_async(rx).await;
2631
2632        // Token consumed but no output produced
2633        assert_eq!(executor.marking().count("p1"), 0);
2634        assert_eq!(executor.marking().count("p2"), 0);
2635
2636        // Failure event recorded
2637        let events = executor.event_store().events();
2638        assert!(
2639            events
2640                .iter()
2641                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2642        );
2643    }
2644
2645    #[tokio::test]
2646    async fn async_delayed_timing() {
2647        use libpetri_core::timing::delayed;
2648
2649        let p1 = Place::<i32>::new("p1");
2650        let p2 = Place::<i32>::new("p2");
2651
2652        let t1 = Transition::builder("t1")
2653            .input(one(&p1))
2654            .output(out_place(&p2))
2655            .timing(delayed(100))
2656            .action(fork())
2657            .build();
2658
2659        let net = PetriNet::builder("test").transition(t1).build();
2660        let mut marking = Marking::new();
2661        marking.add(&p1, Token::at(10, 0));
2662
2663        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2664            &net,
2665            marking,
2666            ExecutorOptions {
2667                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2668            },
2669        );
2670
2671        let start = std::time::Instant::now();
2672        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2673        // Keep channel open long enough for the delayed transition to fire
2674        tokio::spawn(async move {
2675            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2676            drop(tx);
2677        });
2678        executor.run_async(rx).await;
2679
2680        assert!(
2681            start.elapsed().as_millis() >= 80,
2682            "delayed(100) should wait ~100ms"
2683        );
2684        assert_eq!(executor.marking().count("p2"), 1);
2685        assert_eq!(*executor.marking().peek(&p2).unwrap(), 10);
2686    }
2687
2688    #[tokio::test]
2689    async fn async_exact_timing() {
2690        use libpetri_core::timing::exact;
2691
2692        let p1 = Place::<i32>::new("p1");
2693        let p2 = Place::<i32>::new("p2");
2694
2695        let t1 = Transition::builder("t1")
2696            .input(one(&p1))
2697            .output(out_place(&p2))
2698            .timing(exact(100))
2699            .action(fork())
2700            .build();
2701
2702        let net = PetriNet::builder("test").transition(t1).build();
2703        let mut marking = Marking::new();
2704        marking.add(&p1, Token::at(20, 0));
2705
2706        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2707            &net,
2708            marking,
2709            ExecutorOptions {
2710                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2711            },
2712        );
2713
2714        let start = std::time::Instant::now();
2715        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2716        tokio::spawn(async move {
2717            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2718            drop(tx);
2719        });
2720        executor.run_async(rx).await;
2721
2722        assert!(
2723            start.elapsed().as_millis() >= 80,
2724            "exact(100) should wait ~100ms"
2725        );
2726        assert_eq!(executor.marking().count("p2"), 1);
2727        assert_eq!(*executor.marking().peek(&p2).unwrap(), 20);
2728    }
2729
2730    #[tokio::test]
2731    async fn async_window_timing() {
2732        use libpetri_core::timing::window;
2733
2734        let p1 = Place::<i32>::new("p1");
2735        let p2 = Place::<i32>::new("p2");
2736
2737        let t1 = Transition::builder("t1")
2738            .input(one(&p1))
2739            .output(out_place(&p2))
2740            .timing(window(50, 200))
2741            .action(fork())
2742            .build();
2743
2744        let net = PetriNet::builder("test").transition(t1).build();
2745        let mut marking = Marking::new();
2746        marking.add(&p1, Token::at(30, 0));
2747
2748        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2749            &net,
2750            marking,
2751            ExecutorOptions {
2752                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2753            },
2754        );
2755
2756        let start = std::time::Instant::now();
2757        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2758        tokio::spawn(async move {
2759            tokio::time::sleep(std::time::Duration::from_millis(400)).await;
2760            drop(tx);
2761        });
2762        executor.run_async(rx).await;
2763
2764        assert!(
2765            start.elapsed().as_millis() >= 40,
2766            "window(50,200) should wait >= ~50ms"
2767        );
2768        assert_eq!(executor.marking().count("p2"), 1);
2769        assert_eq!(*executor.marking().peek(&p2).unwrap(), 30);
2770    }
2771
    /// Checks that a windowed transition whose deadline passes while the executor is
    /// blocked never fires and is reported as timed out.
    ///
    /// Setup: a priority-10 transition `slow` whose sync action spins for 200ms, and a
    /// `window(50, 100)` transition `windowed`; both have their input place marked at
    /// start. Expected outcome: `slow_out` holds one token, `windowed_out` holds none,
    /// and the event store contains a `TransitionTimedOut` event naming `windowed`.
    #[tokio::test]
    async fn async_deadline_enforcement() {
        use libpetri_core::action::sync_action;
        use libpetri_core::timing::window;

        let p_slow = Place::<i32>::new("p_slow");
        let p_windowed = Place::<i32>::new("p_windowed");
        let slow_out = Place::<i32>::new("slow_out");
        let windowed_out = Place::<i32>::new("windowed_out");

        // Sync action that busy-waits for 200ms, blocking the executor thread.
        // This prevents the executor from reaching the fire phase for windowed
        // until after its deadline has passed.
        let t_slow = Transition::builder("slow")
            .input(one(&p_slow))
            .output(out_place(&slow_out))
            .priority(10)
            .action(sync_action(|ctx| {
                let v = ctx.input::<i32>("p_slow")?;
                // Deliberate spin (not a sleep): the point is to occupy the thread.
                let start = std::time::Instant::now();
                while start.elapsed().as_millis() < 200 {
                    std::hint::spin_loop();
                }
                ctx.output("slow_out", *v)?;
                Ok(())
            }))
            .build();

        // Windowed transition: enabled at start, earliest=50ms, deadline=100ms.
        // Because the slow sync action blocks the executor for 200ms, by the time
        // enforce_deadlines runs again, elapsed (~200ms) > latest (100ms) + tolerance.
        let t_windowed = Transition::builder("windowed")
            .input(one(&p_windowed))
            .output(out_place(&windowed_out))
            .timing(window(50, 100))
            .action(fork())
            .build();

        let net = PetriNet::builder("test")
            .transitions([t_slow, t_windowed])
            .build();

        let mut marking = Marking::new();
        marking.add(&p_slow, Token::at(1, 0));
        marking.add(&p_windowed, Token::at(2, 0));

        // NOTE(review): "p1" is not a place in this net; it looks copied from the
        // sibling tests. Presumably any non-empty environment set is enough to keep
        // the executor waiting on the channel — confirm before renaming or removing.
        let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
            &net,
            marking,
            ExecutorOptions {
                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
            },
        );

        // Hold the channel open for 500ms, well beyond the 200ms busy-wait.
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
            drop(tx);
        });
        executor.run_async(rx).await;

        // Slow fires and completes after 200ms busy-wait
        assert_eq!(executor.marking().count("slow_out"), 1);
        // Windowed should have been disabled by deadline enforcement
        assert_eq!(
            executor.marking().count("windowed_out"),
            0,
            "windowed transition should have been disabled by deadline"
        );

        // The timeout must also be observable as an event naming the transition.
        let events = executor.event_store().events();
        assert!(
            events.iter().any(|e| matches!(e, NetEvent::TransitionTimedOut { transition_name, .. } if &**transition_name == "windowed")),
            "expected TransitionTimedOut event for 'windowed'"
        );
    }
2848
2849    #[tokio::test]
2850    async fn async_multiple_injections() {
2851        let p1 = Place::<i32>::new("p1");
2852        let p2 = Place::<i32>::new("p2");
2853
2854        let t1 = Transition::builder("t1")
2855            .input(one(&p1))
2856            .output(out_place(&p2))
2857            .action(fork())
2858            .build();
2859
2860        let net = PetriNet::builder("test").transition(t1).build();
2861        let marking = Marking::new();
2862
2863        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2864            &net,
2865            marking,
2866            ExecutorOptions {
2867                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2868            },
2869        );
2870
2871        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2872
2873        tokio::spawn(async move {
2874            for i in 0..5 {
2875                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2876                let token = Token::at(i, 0);
2877                tx.send(ExecutorSignal::Event(ExternalEvent {
2878                    place_name: Arc::from("p1"),
2879                    token: ErasedToken::from_typed(&token),
2880                }))
2881                .unwrap();
2882            }
2883            // Drop tx to close channel
2884        });
2885
2886        executor.run_async(rx).await;
2887
2888        assert_eq!(
2889            executor.marking().count("p2"),
2890            5,
2891            "all 5 injected tokens should arrive"
2892        );
2893    }
2894
2895    #[tokio::test]
2896    async fn async_parallel_execution() {
2897        let p1 = Place::<i32>::new("p1");
2898        let p2 = Place::<i32>::new("p2");
2899        let p3 = Place::<i32>::new("p3");
2900        let out1 = Place::<i32>::new("out1");
2901        let out2 = Place::<i32>::new("out2");
2902        let out3 = Place::<i32>::new("out3");
2903
2904        let make_transition = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
2905            Transition::builder(name)
2906                .input(one(inp))
2907                .output(out_place(outp))
2908                .action(async_action(|mut ctx| async move {
2909                    let v: i32 =
2910                        *ctx.input::<i32>(ctx.transition_name().replace("t", "p").as_str())?;
2911                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2912                    ctx.output(&ctx.transition_name().replace("t", "out"), v)?;
2913                    Ok(ctx)
2914                }))
2915                .build()
2916        };
2917
2918        let t1 = make_transition("t1", &p1, &out1);
2919        let t2 = make_transition("t2", &p2, &out2);
2920        let t3 = make_transition("t3", &p3, &out3);
2921
2922        let net = PetriNet::builder("parallel")
2923            .transitions([t1, t2, t3])
2924            .build();
2925        let mut marking = Marking::new();
2926        marking.add(&p1, Token::at(1, 0));
2927        marking.add(&p2, Token::at(2, 0));
2928        marking.add(&p3, Token::at(3, 0));
2929
2930        let mut executor =
2931            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2932
2933        let start = std::time::Instant::now();
2934        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2935        executor.run_async(rx).await;
2936        let elapsed = start.elapsed().as_millis();
2937
2938        assert_eq!(executor.marking().count("out1"), 1);
2939        assert_eq!(executor.marking().count("out2"), 1);
2940        assert_eq!(executor.marking().count("out3"), 1);
2941        assert!(
2942            elapsed < 250,
2943            "parallel execution should take < 250ms, took {elapsed}ms"
2944        );
2945    }
2946
2947    #[tokio::test]
2948    async fn async_sequential_chain_order() {
2949        use std::sync::Mutex;
2950
2951        let p1 = Place::<i32>::new("p1");
2952        let p2 = Place::<i32>::new("p2");
2953        let p3 = Place::<i32>::new("p3");
2954        let p4 = Place::<i32>::new("p4");
2955
2956        let order: Arc<Mutex<Vec<i32>>> = Arc::new(Mutex::new(Vec::new()));
2957
2958        let make_chain = |name: &str,
2959                          inp: &Place<i32>,
2960                          outp: &Place<i32>,
2961                          id: i32,
2962                          order: Arc<Mutex<Vec<i32>>>| {
2963            let inp_name: Arc<str> = Arc::from(inp.name());
2964            let outp_name: Arc<str> = Arc::from(outp.name());
2965            Transition::builder(name)
2966                .input(one(inp))
2967                .output(out_place(outp))
2968                .action(async_action(move |mut ctx| {
2969                    let order = Arc::clone(&order);
2970                    let inp_name = Arc::clone(&inp_name);
2971                    let outp_name = Arc::clone(&outp_name);
2972                    async move {
2973                        let v: i32 = *ctx.input::<i32>(&inp_name)?;
2974                        order.lock().unwrap().push(id);
2975                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2976                        ctx.output(&outp_name, v)?;
2977                        Ok(ctx)
2978                    }
2979                }))
2980                .build()
2981        };
2982
2983        let t1 = make_chain("t1", &p1, &p2, 1, Arc::clone(&order));
2984        let t2 = make_chain("t2", &p2, &p3, 2, Arc::clone(&order));
2985        let t3 = make_chain("t3", &p3, &p4, 3, Arc::clone(&order));
2986
2987        let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
2988        let mut marking = Marking::new();
2989        marking.add(&p1, Token::at(1, 0));
2990
2991        let mut executor =
2992            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2993
2994        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2995        executor.run_async(rx).await;
2996
2997        assert_eq!(executor.marking().count("p4"), 1);
2998        let recorded = order.lock().unwrap().clone();
2999        assert_eq!(recorded, vec![1, 2, 3], "chain should execute in order");
3000    }
3001
3002    #[tokio::test]
3003    async fn async_fork_join() {
3004        use libpetri_core::output::and;
3005
3006        let p1 = Place::<i32>::new("p1");
3007        let p2 = Place::<i32>::new("p2");
3008        let p3 = Place::<i32>::new("p3");
3009        let p4 = Place::<i32>::new("p4");
3010
3011        // Fork: p1 -> (p2, p3) via AND output
3012        let t_fork = Transition::builder("fork")
3013            .input(one(&p1))
3014            .output(and(vec![out_place(&p2), out_place(&p3)]))
3015            .action(libpetri_core::action::sync_action(|ctx| {
3016                let v = ctx.input::<i32>("p1")?;
3017                ctx.output("p2", *v)?;
3018                ctx.output("p3", *v)?;
3019                Ok(())
3020            }))
3021            .build();
3022
3023        // Join: (p2, p3) -> p4
3024        let t_join = Transition::builder("join")
3025            .input(one(&p2))
3026            .input(one(&p3))
3027            .output(out_place(&p4))
3028            .action(libpetri_core::action::sync_action(|ctx| {
3029                let a = ctx.input::<i32>("p2")?;
3030                let b = ctx.input::<i32>("p3")?;
3031                ctx.output("p4", *a + *b)?;
3032                Ok(())
3033            }))
3034            .build();
3035
3036        let net = PetriNet::builder("fork_join")
3037            .transitions([t_fork, t_join])
3038            .build();
3039
3040        let mut marking = Marking::new();
3041        marking.add(&p1, Token::at(5, 0));
3042
3043        let mut executor =
3044            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3045
3046        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3047        executor.run_async(rx).await;
3048
3049        assert_eq!(executor.marking().count("p2"), 0);
3050        assert_eq!(executor.marking().count("p3"), 0);
3051        assert_eq!(executor.marking().count("p4"), 1);
3052        assert_eq!(*executor.marking().peek(&p4).unwrap(), 10);
3053    }
3054
3055    #[tokio::test]
3056    async fn async_xor_output_branching() {
3057        use libpetri_core::output::xor;
3058
3059        let p = Place::<i32>::new("p");
3060        let left = Place::<i32>::new("left");
3061        let right = Place::<i32>::new("right");
3062
3063        let t = Transition::builder("xor_t")
3064            .input(one(&p))
3065            .output(xor(vec![out_place(&left), out_place(&right)]))
3066            .action(libpetri_core::action::sync_action(|ctx| {
3067                let v = ctx.input::<i32>("p")?;
3068                if *v > 0 {
3069                    ctx.output("left", *v)?;
3070                } else {
3071                    ctx.output("right", *v)?;
3072                }
3073                Ok(())
3074            }))
3075            .build();
3076
3077        let net = PetriNet::builder("xor_test").transition(t).build();
3078
3079        // Test positive value goes left
3080        let mut marking = Marking::new();
3081        marking.add(&p, Token::at(42, 0));
3082
3083        let mut executor =
3084            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3085        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3086        executor.run_async(rx).await;
3087
3088        assert_eq!(executor.marking().count("left"), 1);
3089        assert_eq!(executor.marking().count("right"), 0);
3090        assert_eq!(*executor.marking().peek(&left).unwrap(), 42);
3091    }
3092
3093    #[tokio::test]
3094    async fn async_loop_with_guard() {
3095        use libpetri_core::input::one_guarded;
3096
3097        let counter = Place::<i32>::new("counter");
3098        let done = Place::<i32>::new("done");
3099
3100        // Loop transition: counter -> counter (increment), guarded to fire when < 3
3101        let t_loop = Transition::builder("loop")
3102            .input(one_guarded(&counter, |v: &i32| *v < 3))
3103            .output(out_place(&counter))
3104            .action(libpetri_core::action::sync_action(|ctx| {
3105                let v = ctx.input::<i32>("counter")?;
3106                ctx.output("counter", *v + 1)?;
3107                Ok(())
3108            }))
3109            .build();
3110
3111        // Exit transition: counter -> done, guarded to fire when >= 3
3112        let t_exit = Transition::builder("exit")
3113            .input(one_guarded(&counter, |v: &i32| *v >= 3))
3114            .output(out_place(&done))
3115            .action(fork())
3116            .build();
3117
3118        let net = PetriNet::builder("loop_net")
3119            .transitions([t_loop, t_exit])
3120            .build();
3121
3122        let mut marking = Marking::new();
3123        marking.add(&counter, Token::at(0, 0));
3124
3125        let mut executor =
3126            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3127
3128        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3129        executor.run_async(rx).await;
3130
3131        assert_eq!(executor.marking().count("done"), 1);
3132        assert_eq!(*executor.marking().peek(&done).unwrap(), 3);
3133    }
3134
3135    #[tokio::test]
3136    async fn async_delayed_fires_without_injection() {
3137        use libpetri_core::timing::delayed;
3138
3139        let p1 = Place::<i32>::new("p1");
3140        let p2 = Place::<i32>::new("p2");
3141
3142        let t1 = Transition::builder("t1")
3143            .input(one(&p1))
3144            .output(out_place(&p2))
3145            .timing(delayed(100))
3146            .action(fork())
3147            .build();
3148
3149        let net = PetriNet::builder("test").transition(t1).build();
3150        let mut marking = Marking::new();
3151        marking.add(&p1, Token::at(7, 0));
3152
3153        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3154            &net,
3155            marking,
3156            ExecutorOptions {
3157                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3158            },
3159        );
3160
3161        // No injections; keep channel open long enough for the delayed transition to fire
3162        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3163        tokio::spawn(async move {
3164            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3165            drop(tx);
3166        });
3167        executor.run_async(rx).await;
3168
3169        assert_eq!(executor.marking().count("p2"), 1);
3170        assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3171    }
3172
3173    #[tokio::test]
3174    #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3175    async fn async_timeout_produces_timeout_token() {
3176        use libpetri_core::output::{timeout_place, xor};
3177
3178        let p1 = Place::<i32>::new("p1");
3179        let success = Place::<i32>::new("success");
3180        let timeout_out = Place::<i32>::new("timeout_out");
3181
3182        let t1 = Transition::builder("t1")
3183            .input(one(&p1))
3184            .output(xor(vec![
3185                out_place(&success),
3186                timeout_place(50, &timeout_out),
3187            ]))
3188            .action(async_action(|mut ctx| async move {
3189                let v: i32 = *ctx.input::<i32>("p1")?;
3190                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3191                ctx.output("success", v)?;
3192                Ok(ctx)
3193            }))
3194            .build();
3195
3196        let net = PetriNet::builder("test").transition(t1).build();
3197        let mut marking = Marking::new();
3198        marking.add(&p1, Token::at(1, 0));
3199
3200        let mut executor =
3201            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3202
3203        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3204        executor.run_async(rx).await;
3205
3206        assert_eq!(executor.marking().count("timeout_out"), 1);
3207        assert_eq!(executor.marking().count("success"), 0);
3208    }
3209
3210    #[tokio::test]
3211    #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3212    async fn async_timeout_normal_when_fast() {
3213        use libpetri_core::output::{timeout_place, xor};
3214
3215        let p1 = Place::<i32>::new("p1");
3216        let success = Place::<i32>::new("success");
3217        let timeout_out = Place::<i32>::new("timeout_out");
3218
3219        let t1 = Transition::builder("t1")
3220            .input(one(&p1))
3221            .output(xor(vec![
3222                out_place(&success),
3223                timeout_place(500, &timeout_out),
3224            ]))
3225            .action(async_action(|mut ctx| async move {
3226                let v: i32 = *ctx.input::<i32>("p1")?;
3227                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3228                ctx.output("success", v)?;
3229                Ok(ctx)
3230            }))
3231            .build();
3232
3233        let net = PetriNet::builder("test").transition(t1).build();
3234        let mut marking = Marking::new();
3235        marking.add(&p1, Token::at(1, 0));
3236
3237        let mut executor =
3238            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3239
3240        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3241        executor.run_async(rx).await;
3242
3243        assert_eq!(executor.marking().count("success"), 1);
3244        assert_eq!(executor.marking().count("timeout_out"), 0);
3245    }
3246
3247    #[tokio::test]
3248    async fn async_event_store_records_token_added_for_injection() {
3249        let p1 = Place::<i32>::new("p1");
3250        let p2 = Place::<i32>::new("p2");
3251
3252        let t1 = Transition::builder("t1")
3253            .input(one(&p1))
3254            .output(out_place(&p2))
3255            .action(fork())
3256            .build();
3257
3258        let net = PetriNet::builder("test").transition(t1).build();
3259        let marking = Marking::new();
3260
3261        let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
3262            &net,
3263            marking,
3264            ExecutorOptions {
3265                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3266            },
3267        );
3268
3269        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3270
3271        tokio::spawn(async move {
3272            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3273            let token = Token::at(99, 0);
3274            tx.send(ExecutorSignal::Event(ExternalEvent {
3275                place_name: Arc::from("p1"),
3276                token: ErasedToken::from_typed(&token),
3277            }))
3278            .unwrap();
3279        });
3280
3281        executor.run_async(rx).await;
3282
3283        let events = executor.event_store().events();
3284        // Should have TokenAdded for the injected token into p1
3285        assert!(
3286            events.iter().any(
3287                |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p1")
3288            ),
3289            "expected TokenAdded event for injected token into p1"
3290        );
3291        // And also TokenAdded for the output into p2
3292        assert!(
3293            events.iter().any(
3294                |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p2")
3295            ),
3296            "expected TokenAdded event for output token into p2"
3297        );
3298    }
3299
3300    #[tokio::test]
3301    async fn async_inhibitor_blocks_in_async() {
3302        let p1 = Place::<()>::new("p1");
3303        let p2 = Place::<()>::new("p2");
3304        let p_inh = Place::<()>::new("inh");
3305
3306        let t = Transition::builder("t1")
3307            .input(one(&p1))
3308            .output(out_place(&p2))
3309            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
3310            .action(fork())
3311            .build();
3312
3313        let net = PetriNet::builder("inhibitor").transition(t).build();
3314
3315        let mut marking = Marking::new();
3316        marking.add(&p1, Token::at((), 0));
3317        marking.add(&p_inh, Token::at((), 0));
3318
3319        let mut executor =
3320            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3321
3322        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3323        executor.run_async(rx).await;
3324
3325        // Inhibitor should block — token remains in p1, nothing in p2
3326        assert_eq!(executor.marking().count("p1"), 1);
3327        assert_eq!(executor.marking().count("p2"), 0);
3328    }
3329
3330    // ==================== Drain/Close lifecycle tests ====================
3331
3332    #[tokio::test]
3333    async fn async_drain_terminates_at_quiescence() {
3334        let p1 = Place::<i32>::new("p1");
3335        let p2 = Place::<i32>::new("p2");
3336
3337        let t1 = Transition::builder("t1")
3338            .input(one(&p1))
3339            .output(out_place(&p2))
3340            .action(fork())
3341            .build();
3342
3343        let net = PetriNet::builder("test").transition(t1).build();
3344        let marking = Marking::new();
3345
3346        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3347            &net,
3348            marking,
3349            ExecutorOptions {
3350                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3351            },
3352        );
3353
3354        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3355
3356        tokio::spawn(async move {
3357            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3358            // Inject a token, then drain
3359            tx.send(ExecutorSignal::Event(ExternalEvent {
3360                place_name: Arc::from("p1"),
3361                token: ErasedToken::from_typed(&Token::at(42, 0)),
3362            }))
3363            .unwrap();
3364            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3365            tx.send(ExecutorSignal::Drain).unwrap();
3366        });
3367
3368        executor.run_async(rx).await;
3369
3370        // The injected token should have been processed
3371        assert_eq!(executor.marking().count("p2"), 1);
3372        assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
3373    }
3374
3375    #[tokio::test]
3376    async fn async_drain_rejects_post_drain_events() {
3377        let p1 = Place::<i32>::new("p1");
3378        let p2 = Place::<i32>::new("p2");
3379
3380        let t1 = Transition::builder("t1")
3381            .input(one(&p1))
3382            .output(out_place(&p2))
3383            .action(fork())
3384            .build();
3385
3386        let net = PetriNet::builder("test").transition(t1).build();
3387        let marking = Marking::new();
3388
3389        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3390            &net,
3391            marking,
3392            ExecutorOptions {
3393                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3394            },
3395        );
3396
3397        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3398
3399        tokio::spawn(async move {
3400            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3401            // Send Drain first, then an event — event should be discarded
3402            tx.send(ExecutorSignal::Drain).unwrap();
3403            tx.send(ExecutorSignal::Event(ExternalEvent {
3404                place_name: Arc::from("p1"),
3405                token: ErasedToken::from_typed(&Token::at(99, 0)),
3406            }))
3407            .unwrap();
3408        });
3409
3410        executor.run_async(rx).await;
3411
3412        // Event sent after drain should not have been processed
3413        assert_eq!(executor.marking().count("p2"), 0);
3414    }
3415
3416    #[tokio::test]
3417    async fn async_close_discards_queued_events() {
3418        let p1 = Place::<i32>::new("p1");
3419        let p2 = Place::<i32>::new("p2");
3420
3421        let t1 = Transition::builder("t1")
3422            .input(one(&p1))
3423            .output(out_place(&p2))
3424            .action(fork())
3425            .build();
3426
3427        let net = PetriNet::builder("test").transition(t1).build();
3428        let marking = Marking::new();
3429
3430        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3431            &net,
3432            marking,
3433            ExecutorOptions {
3434                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3435            },
3436        );
3437
3438        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3439
3440        // Queue events then close — close should discard all pending events
3441        tx.send(ExecutorSignal::Event(ExternalEvent {
3442            place_name: Arc::from("p1"),
3443            token: ErasedToken::from_typed(&Token::at(1, 0)),
3444        }))
3445        .unwrap();
3446        tx.send(ExecutorSignal::Close).unwrap();
3447        tx.send(ExecutorSignal::Event(ExternalEvent {
3448            place_name: Arc::from("p1"),
3449            token: ErasedToken::from_typed(&Token::at(2, 0)),
3450        }))
3451        .unwrap();
3452        drop(tx);
3453
3454        executor.run_async(rx).await;
3455
3456        // The first event arrives before Close — it gets processed in the first
3457        // try_recv batch. Close then discards all remaining events.
3458        // So at most 1 token should be in p2 (the one before Close).
3459        assert!(executor.marking().count("p2") <= 1);
3460    }
3461
3462    #[tokio::test]
3463    async fn async_close_after_drain_escalates() {
3464        let p1 = Place::<i32>::new("p1");
3465        let p2 = Place::<i32>::new("p2");
3466
3467        let t1 = Transition::builder("t1")
3468            .input(one(&p1))
3469            .output(out_place(&p2))
3470            .action(fork())
3471            .build();
3472
3473        let net = PetriNet::builder("test").transition(t1).build();
3474        let marking = Marking::new();
3475
3476        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3477            &net,
3478            marking,
3479            ExecutorOptions {
3480                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3481            },
3482        );
3483
3484        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3485
3486        tokio::spawn(async move {
3487            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3488            // Drain first, then escalate to close
3489            tx.send(ExecutorSignal::Drain).unwrap();
3490            tx.send(ExecutorSignal::Close).unwrap();
3491        });
3492
3493        // Executor should terminate — close escalates from drain
3494        executor.run_async(rx).await;
3495        // No assertions needed — the test passes if run_async returns
3496    }
3497
3498    #[tokio::test]
3499    async fn async_handle_raii_drain_on_drop() {
3500        use crate::executor_handle::ExecutorHandle;
3501
3502        let p1 = Place::<i32>::new("p1");
3503        let p2 = Place::<i32>::new("p2");
3504
3505        let t1 = Transition::builder("t1")
3506            .input(one(&p1))
3507            .output(out_place(&p2))
3508            .action(fork())
3509            .build();
3510
3511        let net = PetriNet::builder("test").transition(t1).build();
3512        let marking = Marking::new();
3513
3514        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3515            &net,
3516            marking,
3517            ExecutorOptions {
3518                environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3519            },
3520        );
3521
3522        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3523
3524        tokio::spawn(async move {
3525            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3526            let mut handle = ExecutorHandle::new(tx);
3527            handle.inject(Arc::from("p1"), ErasedToken::from_typed(&Token::at(7, 0)));
3528            // handle dropped here — RAII sends Drain automatically
3529        });
3530
3531        executor.run_async(rx).await;
3532
3533        assert_eq!(executor.marking().count("p2"), 1);
3534        assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3535    }
3536}