Skip to main content

libpetri_runtime/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::output::Out;
8use libpetri_core::petri_net::PetriNet;
9use libpetri_core::token::ErasedToken;
10
11use libpetri_event::event_store::EventStore;
12use libpetri_event::net_event::NetEvent;
13
14use crate::bitmap;
15use crate::compiled_net::CompiledNet;
16use crate::marking::Marking;
17
/// Tolerance for deadline enforcement to account for timer jitter.
///
/// Expressed in milliseconds. `enforce_deadlines` only times a transition out
/// once its enabled time exceeds `latest + DEADLINE_TOLERANCE_MS`.
const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
/// Bitmap-based executor for Coloured Time Petri Nets.
///
/// Generic over `E: EventStore` for zero-cost noop event recording.
/// The sync path (`run_sync`) executes inline without any async runtime.
pub struct BitmapNetExecutor<E: EventStore> {
    /// Compiled form of the net, produced by `CompiledNet::compile` in `new`.
    compiled: CompiledNet,
    /// Current token marking; starts as the initial tokens passed to `new`.
    marking: Marking,
    /// Sink for `NetEvent`s; only appended to when `E::ENABLED` is true.
    event_store: E,
    /// Copied from `ExecutorOptions::environment_places`.
    /// NOTE(review): no code visible next to this struct reads it — confirm
    /// whether the rest of the file does.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// Copied from `ExecutorOptions::long_running`. When true,
    /// `should_terminate` never reports termination.
    /// NOTE(review): the field is read by `should_terminate`, so the
    /// `dead_code` allowance looks unnecessary — confirm before removing.
    #[allow(dead_code)]
    long_running: bool,

    // Bitmaps
    /// One bit per place id; set while the place holds at least one token.
    marked_places: Vec<u64>,
    /// One bit per transition id; set when the transition must be re-evaluated.
    dirty_set: Vec<u64>,
    /// Scratch copy of `marked_places` taken by `update_dirty_transitions`.
    marking_snap_buffer: Vec<u64>,
    /// Scratch copy of `dirty_set` taken by `update_dirty_transitions`.
    dirty_snap_buffer: Vec<u64>,
    /// Scratch copy of `marked_places` refreshed while firing transitions.
    firing_snap_buffer: Vec<u64>,

    // Per-transition state
    /// Executor-relative time (ms since `start_time`) at which each transition
    /// became enabled; `f64::NEG_INFINITY` while it is not enabled.
    enabled_at_ms: Vec<f64>,
    /// Whether each transition is currently considered enabled.
    enabled_flags: Vec<bool>,
    /// Whether each transition's timing reports `has_deadline()`.
    has_deadline_flags: Vec<bool>,
    /// Number of `true` entries in `enabled_flags`, maintained incrementally.
    enabled_transition_count: usize,

    // Precomputed flags
    /// True when every transition's timing is `Timing::Immediate`.
    all_immediate: bool,
    /// True when every transition has the same priority as transition 0.
    all_same_priority: bool,
    /// True when at least one transition has a deadline.
    has_any_deadlines: bool,

    // Pending reset places for clock-restart detection
    /// Places emptied by reset arcs since the last `update_dirty_transitions`.
    pending_reset_places: HashSet<Arc<str>>,
    /// For each transition id, the names of the places its input specs refer to.
    transition_input_place_names: Vec<HashSet<Arc<str>>>,

    /// Reference instant for `elapsed_ms`; captured when the executor is built.
    start_time: Instant,
}
58
/// Options for constructing a BitmapNetExecutor.
///
/// The derived `Default` yields an empty place set and `long_running = false`.
#[derive(Default)]
pub struct ExecutorOptions {
    /// Names of the places treated as environment places.
    /// NOTE(review): the executor stores this set but no code visible here
    /// reads it — confirm how it is consumed.
    pub environment_places: HashSet<Arc<str>>,
    /// When true, the executor does not stop merely because no transition is
    /// enabled: `should_terminate` always returns false, and `run_async`
    /// additionally requires the external event channel to be closed.
    pub long_running: bool,
}
65
66impl<E: EventStore> BitmapNetExecutor<E> {
67    /// Creates a new executor for the given net with initial tokens.
68    pub fn new(net: &PetriNet, initial_tokens: Marking, options: ExecutorOptions) -> Self {
69        let compiled = CompiledNet::compile(net);
70        let word_count = compiled.word_count;
71        let tc = compiled.transition_count;
72        let dirty_word_count = bitmap::word_count(tc);
73
74        let mut has_any_deadlines = false;
75        let mut all_immediate = true;
76        let mut all_same_priority = true;
77        let first_priority = if tc > 0 {
78            compiled.transition(0).priority()
79        } else {
80            0
81        };
82        let mut has_deadline_flags = vec![false; tc];
83
84        for (tid, flag) in has_deadline_flags.iter_mut().enumerate() {
85            let t = compiled.transition(tid);
86            if t.timing().has_deadline() {
87                *flag = true;
88                has_any_deadlines = true;
89            }
90            if *t.timing() != libpetri_core::timing::Timing::Immediate {
91                all_immediate = false;
92            }
93            if t.priority() != first_priority {
94                all_same_priority = false;
95            }
96        }
97
98        // Precompute input place names per transition
99        let mut transition_input_place_names = Vec::with_capacity(tc);
100        for tid in 0..tc {
101            let t = compiled.transition(tid);
102            let names: HashSet<Arc<str>> = t
103                .input_specs()
104                .iter()
105                .map(|s| Arc::clone(s.place().name_arc()))
106                .collect();
107            transition_input_place_names.push(names);
108        }
109
110        Self {
111            compiled,
112            marking: initial_tokens,
113            event_store: E::default(),
114            environment_places: options.environment_places,
115            long_running: options.long_running,
116            marked_places: vec![0u64; word_count],
117            dirty_set: vec![0u64; dirty_word_count],
118            marking_snap_buffer: vec![0u64; word_count],
119            dirty_snap_buffer: vec![0u64; dirty_word_count],
120            firing_snap_buffer: vec![0u64; word_count],
121            enabled_at_ms: vec![f64::NEG_INFINITY; tc],
122            enabled_flags: vec![false; tc],
123            has_deadline_flags,
124            enabled_transition_count: 0,
125            all_immediate,
126            all_same_priority,
127            has_any_deadlines,
128            pending_reset_places: HashSet::new(),
129            transition_input_place_names,
130            start_time: Instant::now(),
131        }
132    }
133
134    /// Runs the executor synchronously until completion.
135    ///
136    /// All transition actions must be sync (is_sync() returns true).
137    /// Returns the final marking.
138    pub fn run_sync(&mut self) -> &Marking {
139        self.initialize_marked_bitmap();
140        self.mark_all_dirty();
141
142        if E::ENABLED {
143            let now = now_millis();
144            self.event_store.append(NetEvent::ExecutionStarted {
145                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
146                timestamp: now,
147            });
148        }
149
150        loop {
151            self.update_dirty_transitions();
152
153            let cycle_now = self.elapsed_ms();
154
155            if self.has_any_deadlines {
156                self.enforce_deadlines(cycle_now);
157            }
158
159            if self.should_terminate() {
160                break;
161            }
162
163            if self.all_immediate && self.all_same_priority {
164                self.fire_ready_immediate_sync();
165            } else {
166                self.fire_ready_general_sync(cycle_now);
167            }
168
169            // If nothing is dirty anymore and nothing is enabled, we're done
170            if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
171                break;
172            }
173        }
174
175        if E::ENABLED {
176            let now = now_millis();
177            self.event_store.append(NetEvent::ExecutionCompleted {
178                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
179                timestamp: now,
180            });
181        }
182
183        &self.marking
184    }
185
    /// Returns a reference to the current marking.
    ///
    /// This is the same marking that `run_sync` returns; it reflects every
    /// token consumed or produced so far.
    pub fn marking(&self) -> &Marking {
        &self.marking
    }
190
    /// Returns a reference to the event store.
    ///
    /// Events are only appended when `E::ENABLED` is true.
    pub fn event_store(&self) -> &E {
        &self.event_store
    }
195
    /// Returns true if no transition is currently enabled.
    ///
    /// NOTE(review): only `enabled_transition_count` is consulted. In-flight
    /// async actions are counted in a local variable of `run_async` and are
    /// not visible to this method.
    pub fn is_quiescent(&self) -> bool {
        self.enabled_transition_count == 0
    }
200
201    // ======================== Initialize ========================
202
203    fn initialize_marked_bitmap(&mut self) {
204        for pid in 0..self.compiled.place_count {
205            let place = self.compiled.place(pid);
206            if self.marking.has_tokens(place.name()) {
207                bitmap::set_bit(&mut self.marked_places, pid);
208            }
209        }
210    }
211
212    fn mark_all_dirty(&mut self) {
213        let tc = self.compiled.transition_count;
214        let dirty_words = self.dirty_set.len();
215        for w in 0..dirty_words.saturating_sub(1) {
216            self.dirty_set[w] = u64::MAX;
217        }
218        if dirty_words > 0 {
219            let last_word_bits = tc & bitmap::WORD_MASK;
220            self.dirty_set[dirty_words - 1] = if last_word_bits == 0 {
221                u64::MAX
222            } else {
223                (1u64 << last_word_bits) - 1
224            };
225        }
226    }
227
228    fn should_terminate(&self) -> bool {
229        if self.long_running {
230            return false;
231        }
232        self.enabled_transition_count == 0
233    }
234
235    // ======================== Dirty Set Transitions ========================
236
237    fn update_dirty_transitions(&mut self) {
238        let now_ms = self.elapsed_ms();
239
240        // Snapshot marking bitmap
241        self.marking_snap_buffer
242            .copy_from_slice(&self.marked_places);
243
244        // Snapshot and clear dirty set
245        let dirty_words = self.dirty_set.len();
246        for w in 0..dirty_words {
247            self.dirty_snap_buffer[w] = self.dirty_set[w];
248            self.dirty_set[w] = 0;
249        }
250
251        // Collect dirty transition IDs first to avoid borrow conflict
252        let tc = self.compiled.transition_count;
253        let mut dirty_tids = Vec::new();
254        bitmap::for_each_set_bit(&self.dirty_snap_buffer, |tid| {
255            if tid < tc {
256                dirty_tids.push(tid);
257            }
258        });
259
260        let marking_snap = self.marking_snap_buffer.clone();
261        for tid in dirty_tids {
262            let was_enabled = self.enabled_flags[tid];
263            let can_now = self.can_enable(tid, &marking_snap);
264
265            if can_now && !was_enabled {
266                self.enabled_flags[tid] = true;
267                self.enabled_transition_count += 1;
268                self.enabled_at_ms[tid] = now_ms;
269
270                if E::ENABLED {
271                    self.event_store.append(NetEvent::TransitionEnabled {
272                        transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
273                        timestamp: now_millis(),
274                    });
275                }
276            } else if !can_now && was_enabled {
277                self.enabled_flags[tid] = false;
278                self.enabled_transition_count -= 1;
279                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
280            } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
281                self.enabled_at_ms[tid] = now_ms;
282                if E::ENABLED {
283                    self.event_store.append(NetEvent::TransitionClockRestarted {
284                        transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
285                        timestamp: now_millis(),
286                    });
287                }
288            }
289        }
290
291        self.pending_reset_places.clear();
292    }
293
294    fn enforce_deadlines(&mut self, now_ms: f64) {
295        for tid in 0..self.compiled.transition_count {
296            if !self.has_deadline_flags[tid] || !self.enabled_flags[tid] {
297                continue;
298            }
299            let t = self.compiled.transition(tid);
300            let elapsed = now_ms - self.enabled_at_ms[tid];
301            let latest_ms = t.timing().latest() as f64;
302            if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
303                self.enabled_flags[tid] = false;
304                self.enabled_transition_count -= 1;
305                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
306
307                if E::ENABLED {
308                    self.event_store.append(NetEvent::TransitionTimedOut {
309                        transition_name: Arc::clone(t.name_arc()),
310                        timestamp: now_millis(),
311                    });
312                }
313            }
314        }
315    }
316
317    fn can_enable(&self, tid: usize, marking_snap: &[u64]) -> bool {
318        if !self.compiled.can_enable_bitmap(tid, marking_snap) {
319            return false;
320        }
321
322        // Cardinality check
323        if let Some(card_check) = self.compiled.cardinality_check(tid) {
324            for i in 0..card_check.place_ids.len() {
325                let pid = card_check.place_ids[i];
326                let required = card_check.required_counts[i];
327                let place = self.compiled.place(pid);
328                if self.marking.count(place.name()) < required {
329                    return false;
330                }
331            }
332        }
333
334        // Guard check
335        if self.compiled.has_guards(tid) {
336            let t = self.compiled.transition(tid);
337            for spec in t.input_specs() {
338                if let Some(guard) = spec.guard() {
339                    let required = match spec {
340                        In::One { .. } => 1,
341                        In::Exactly { count, .. } => *count,
342                        In::AtLeast { minimum, .. } => *minimum,
343                        In::All { .. } => 1,
344                    };
345                    if self.marking.count_matching(spec.place_name(), &**guard) < required {
346                        return false;
347                    }
348                }
349            }
350        }
351
352        true
353    }
354
355    fn has_input_from_reset_place(&self, tid: usize) -> bool {
356        if self.pending_reset_places.is_empty() {
357            return false;
358        }
359        let input_names = &self.transition_input_place_names[tid];
360        for name in &self.pending_reset_places {
361            if input_names.contains(name) {
362                return true;
363            }
364        }
365        false
366    }
367
368    // ======================== Firing (Sync) ========================
369
370    fn fire_ready_immediate_sync(&mut self) {
371        for tid in 0..self.compiled.transition_count {
372            if !self.enabled_flags[tid] {
373                continue;
374            }
375            if self.can_enable(tid, &self.marked_places.clone()) {
376                self.fire_transition_sync(tid);
377            } else {
378                self.enabled_flags[tid] = false;
379                self.enabled_transition_count -= 1;
380                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
381            }
382        }
383    }
384
385    fn fire_ready_general_sync(&mut self, now_ms: f64) {
386        // Collect ready transitions
387        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
388        for tid in 0..self.compiled.transition_count {
389            if !self.enabled_flags[tid] {
390                continue;
391            }
392            let t = self.compiled.transition(tid);
393            let enabled_ms = self.enabled_at_ms[tid];
394            let elapsed = now_ms - enabled_ms;
395            let earliest_ms = t.timing().earliest() as f64;
396            if earliest_ms <= elapsed {
397                ready.push((tid, t.priority(), enabled_ms));
398            }
399        }
400        if ready.is_empty() {
401            return;
402        }
403
404        // Sort: higher priority first, then earlier enablement (FIFO)
405        ready.sort_by(|a, b| {
406            b.1.cmp(&a.1)
407                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
408        });
409
410        // Take fresh snapshot
411        self.firing_snap_buffer.copy_from_slice(&self.marked_places);
412
413        for (tid, _, _) in ready {
414            if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
415                self.fire_transition_sync(tid);
416                self.firing_snap_buffer.copy_from_slice(&self.marked_places);
417            } else {
418                self.enabled_flags[tid] = false;
419                self.enabled_transition_count -= 1;
420                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
421            }
422        }
423    }
424
425    fn fire_transition_sync(&mut self, tid: usize) {
426        // Clone all needed data from the transition to release the borrow on self.compiled
427        let t = self.compiled.transition(tid);
428        let transition_name = Arc::clone(t.name_arc());
429        let input_specs: Vec<In> = t.input_specs().to_vec();
430        let read_arcs: Vec<_> = t.reads().to_vec();
431        let reset_arcs: Vec<_> = t.resets().to_vec();
432        let output_place_names: HashSet<Arc<str>> = t
433            .output_places()
434            .iter()
435            .map(|p| Arc::clone(p.name_arc()))
436            .collect();
437        let action = Arc::clone(t.action());
438        // t (reference) is no longer used after this point
439
440        // Consume tokens based on input specs
441        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
442        for in_spec in &input_specs {
443            let place_name = in_spec.place_name();
444            let to_consume = match in_spec {
445                In::One { .. } => 1,
446                In::Exactly { count, .. } => *count,
447                In::All { guard, .. } | In::AtLeast { guard, .. } => {
448                    if guard.is_some() {
449                        self.marking
450                            .count_matching(place_name, &**guard.as_ref().unwrap())
451                    } else {
452                        self.marking.count(place_name)
453                    }
454                }
455            };
456
457            let place_name_arc = Arc::clone(in_spec.place().name_arc());
458            for _ in 0..to_consume {
459                let token = if let Some(guard) = in_spec.guard() {
460                    self.marking.remove_matching(place_name, &**guard)
461                } else {
462                    self.marking.remove_first(place_name)
463                };
464                if let Some(token) = token {
465                    if E::ENABLED {
466                        self.event_store.append(NetEvent::TokenRemoved {
467                            place_name: Arc::clone(&place_name_arc),
468                            timestamp: now_millis(),
469                        });
470                    }
471                    inputs
472                        .entry(Arc::clone(&place_name_arc))
473                        .or_default()
474                        .push(token);
475                }
476            }
477        }
478
479        // Read arcs (peek, don't consume)
480        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
481        for arc in &read_arcs {
482            if let Some(queue) = self.marking.queue(arc.place.name())
483                && let Some(token) = queue.front()
484            {
485                read_tokens
486                    .entry(Arc::clone(arc.place.name_arc()))
487                    .or_default()
488                    .push(token.clone());
489            }
490        }
491
492        // Reset arcs
493        for arc in &reset_arcs {
494            let removed = self.marking.remove_all(arc.place.name());
495            self.pending_reset_places
496                .insert(Arc::clone(arc.place.name_arc()));
497            if E::ENABLED {
498                for _ in &removed {
499                    self.event_store.append(NetEvent::TokenRemoved {
500                        place_name: Arc::clone(arc.place.name_arc()),
501                        timestamp: now_millis(),
502                    });
503                }
504            }
505        }
506
507        // Update bitmap after consumption
508        self.update_bitmap_after_consumption(tid);
509
510        if E::ENABLED {
511            self.event_store.append(NetEvent::TransitionStarted {
512                transition_name: Arc::clone(&transition_name),
513                timestamp: now_millis(),
514            });
515        }
516
517        // Create context and run action
518        let mut ctx = TransitionContext::new(
519            Arc::clone(&transition_name),
520            inputs,
521            read_tokens,
522            output_place_names,
523            None,
524        );
525
526        let result = action.run_sync(&mut ctx);
527
528        match result {
529            Ok(()) => {
530                // Process outputs
531                let outputs = ctx.take_outputs();
532                self.process_outputs(tid, &transition_name, outputs);
533
534                if E::ENABLED {
535                    self.event_store.append(NetEvent::TransitionCompleted {
536                        transition_name: Arc::clone(&transition_name),
537                        timestamp: now_millis(),
538                    });
539                }
540            }
541            Err(err) => {
542                if E::ENABLED {
543                    self.event_store.append(NetEvent::TransitionFailed {
544                        transition_name: Arc::clone(&transition_name),
545                        error: err.message,
546                        timestamp: now_millis(),
547                    });
548                }
549            }
550        }
551
552        // Mark transition as no longer enabled (it just fired)
553        self.enabled_flags[tid] = false;
554        self.enabled_transition_count -= 1;
555        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
556
557        // Mark this transition dirty for re-evaluation
558        self.mark_transition_dirty(tid);
559    }
560
561    fn process_outputs(
562        &mut self,
563        _tid: usize,
564        _transition_name: &Arc<str>,
565        outputs: Vec<OutputEntry>,
566    ) {
567        for entry in outputs {
568            self.marking.add_erased(&entry.place_name, entry.token);
569
570            if let Some(pid) = self.compiled.place_id(&entry.place_name) {
571                bitmap::set_bit(&mut self.marked_places, pid);
572                self.mark_dirty(pid);
573            }
574
575            if E::ENABLED {
576                self.event_store.append(NetEvent::TokenAdded {
577                    place_name: Arc::clone(&entry.place_name),
578                    timestamp: now_millis(),
579                });
580            }
581        }
582    }
583
584    fn update_bitmap_after_consumption(&mut self, tid: usize) {
585        let consumption_pids: Vec<usize> = self.compiled.consumption_place_ids(tid).to_vec();
586        for pid in consumption_pids {
587            let place = self.compiled.place(pid);
588            if !self.marking.has_tokens(place.name()) {
589                bitmap::clear_bit(&mut self.marked_places, pid);
590            }
591            self.mark_dirty(pid);
592        }
593    }
594
595    // ======================== Dirty Set Helpers ========================
596
    /// Returns true when at least one transition is flagged in the dirty set,
    /// i.e. is still waiting to be re-evaluated.
    fn has_dirty_bits(&self) -> bool {
        !bitmap::is_empty(&self.dirty_set)
    }
600
601    fn mark_dirty(&mut self, pid: usize) {
602        let tids: Vec<usize> = self.compiled.affected_transitions(pid).to_vec();
603        for tid in tids {
604            self.mark_transition_dirty(tid);
605        }
606    }
607
    /// Sets the dirty bit of transition `tid`, scheduling it for re-evaluation
    /// by the next `update_dirty_transitions` call.
    fn mark_transition_dirty(&mut self, tid: usize) {
        bitmap::set_bit(&mut self.dirty_set, tid);
    }
611
    /// Returns the time elapsed since the executor was constructed, in
    /// (fractional) milliseconds. All enablement timestamps use this clock.
    fn elapsed_ms(&self) -> f64 {
        self.start_time.elapsed().as_secs_f64() * 1000.0
    }
615}
616
617#[cfg(feature = "tokio")]
618use crate::environment::ExternalEvent;
619
/// Completion message sent by async actions back to the executor.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished; used in event records.
    transition_name: Arc<str>,
    /// `Ok` carries the outputs to deposit into the marking; `Err` carries the
    /// error text recorded in a `TransitionFailed` event.
    result: Result<Vec<OutputEntry>, String>,
}
626
627#[cfg(feature = "tokio")]
628impl<E: EventStore> BitmapNetExecutor<E> {
629    /// Runs the executor asynchronously with tokio.
630    ///
631    /// Supports both sync and async transition actions. Sync actions execute
632    /// inline; async actions are spawned as tokio tasks and their completions
633    /// are collected via an mpsc channel.
634    ///
635    /// The executor also accepts external events via `inject()` for
636    /// environment place token injection.
637    ///
638    /// Returns the final marking when the executor quiesces or is closed.
639    pub async fn run_async(
640        &mut self,
641        mut event_rx: tokio::sync::mpsc::UnboundedReceiver<ExternalEvent>,
642    ) -> &Marking {
643        let (completion_tx, mut completion_rx) =
644            tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
645
646        self.initialize_marked_bitmap();
647        self.mark_all_dirty();
648
649        let mut in_flight_count: usize = 0;
650        let mut event_channel_open = true;
651
652        if E::ENABLED {
653            let now = now_millis();
654            self.event_store.append(NetEvent::ExecutionStarted {
655                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
656                timestamp: now,
657            });
658        }
659
660        loop {
661            // Phase 1: Process completed async actions
662            while let Ok(completion) = completion_rx.try_recv() {
663                in_flight_count -= 1;
664                match completion.result {
665                    Ok(outputs) => {
666                        self.process_outputs(0, &completion.transition_name, outputs);
667                        if E::ENABLED {
668                            self.event_store.append(NetEvent::TransitionCompleted {
669                                transition_name: Arc::clone(&completion.transition_name),
670                                timestamp: now_millis(),
671                            });
672                        }
673                    }
674                    Err(err) => {
675                        if E::ENABLED {
676                            self.event_store.append(NetEvent::TransitionFailed {
677                                transition_name: Arc::clone(&completion.transition_name),
678                                error: err,
679                                timestamp: now_millis(),
680                            });
681                        }
682                    }
683                }
684            }
685
686            // Phase 2: Process external events
687            while let Ok(event) = event_rx.try_recv() {
688                self.marking.add_erased(&event.place_name, event.token);
689                if let Some(pid) = self.compiled.place_id(&event.place_name) {
690                    bitmap::set_bit(&mut self.marked_places, pid);
691                    self.mark_dirty(pid);
692                }
693                if E::ENABLED {
694                    self.event_store.append(NetEvent::TokenAdded {
695                        place_name: Arc::clone(&event.place_name),
696                        timestamp: now_millis(),
697                    });
698                }
699            }
700
701            // Phase 3: Update dirty transitions
702            self.update_dirty_transitions();
703
704            // Phase 4: Enforce deadlines
705            let cycle_now = self.elapsed_ms();
706            if self.has_any_deadlines {
707                self.enforce_deadlines(cycle_now);
708            }
709
710            // Termination check
711            if self.enabled_transition_count == 0
712                && in_flight_count == 0
713                && (!self.long_running || !event_channel_open)
714            {
715                break;
716            }
717
718            // Phase 5: Fire ready transitions
719            let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
720
721            // If we fired something or have dirty bits, loop immediately
722            if fired || self.has_dirty_bits() {
723                // Yield to let spawned tasks run
724                tokio::task::yield_now().await;
725                continue;
726            }
727
728            // Phase 6: Await work (completion, external event, or timer)
729            if in_flight_count == 0 && !self.long_running {
730                break;
731            }
732            if in_flight_count == 0 && !event_channel_open {
733                break;
734            }
735
736            let timer_ms = self.millis_until_next_timed_transition();
737
738            tokio::select! {
739                Some(completion) = completion_rx.recv() => {
740                    in_flight_count -= 1;
741                    match completion.result {
742                        Ok(outputs) => {
743                            self.process_outputs(0, &completion.transition_name, outputs);
744                            if E::ENABLED {
745                                self.event_store.append(NetEvent::TransitionCompleted {
746                                    transition_name: Arc::clone(&completion.transition_name),
747                                    timestamp: now_millis(),
748                                });
749                            }
750                        }
751                        Err(err) => {
752                            if E::ENABLED {
753                                self.event_store.append(NetEvent::TransitionFailed {
754                                    transition_name: Arc::clone(&completion.transition_name),
755                                    error: err,
756                                    timestamp: now_millis(),
757                                });
758                            }
759                        }
760                    }
761                }
762                result = event_rx.recv(), if event_channel_open => {
763                    match result {
764                        Some(event) => {
765                            self.marking.add_erased(&event.place_name, event.token);
766                            if let Some(pid) = self.compiled.place_id(&event.place_name) {
767                                bitmap::set_bit(&mut self.marked_places, pid);
768                                self.mark_dirty(pid);
769                            }
770                            if E::ENABLED {
771                                self.event_store.append(NetEvent::TokenAdded {
772                                    place_name: Arc::clone(&event.place_name),
773                                    timestamp: now_millis(),
774                                });
775                            }
776                        }
777                        None => {
778                            event_channel_open = false;
779                        }
780                    }
781                }
782                _ = tokio::time::sleep(std::time::Duration::from_millis(
783                    if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
784                )) => {
785                    // Timer fired — re-evaluate transitions
786                }
787            }
788        }
789
790        if E::ENABLED {
791            let now = now_millis();
792            self.event_store.append(NetEvent::ExecutionCompleted {
793                net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
794                timestamp: now,
795            });
796        }
797
798        &self.marking
799    }
800
801    /// Fires ready transitions, dispatching async actions via tokio::spawn.
802    /// Returns true if any transitions were fired.
803    fn fire_ready_async(
804        &mut self,
805        now_ms: f64,
806        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
807        in_flight_count: &mut usize,
808    ) -> bool {
809        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
810        for tid in 0..self.compiled.transition_count {
811            if !self.enabled_flags[tid] {
812                continue;
813            }
814            let t = self.compiled.transition(tid);
815            let enabled_ms = self.enabled_at_ms[tid];
816            let elapsed = now_ms - enabled_ms;
817            let earliest_ms = t.timing().earliest() as f64;
818            if earliest_ms <= elapsed {
819                ready.push((tid, t.priority(), enabled_ms));
820            }
821        }
822        if ready.is_empty() {
823            return false;
824        }
825
826        ready.sort_by(|a, b| {
827            b.1.cmp(&a.1)
828                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
829        });
830
831        self.firing_snap_buffer.copy_from_slice(&self.marked_places);
832
833        let mut fired_any = false;
834        for (tid, _, _) in ready {
835            if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
836                self.fire_transition_async(tid, completion_tx, in_flight_count);
837                self.firing_snap_buffer.copy_from_slice(&self.marked_places);
838                fired_any = true;
839            } else {
840                self.enabled_flags[tid] = false;
841                self.enabled_transition_count -= 1;
842                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
843            }
844        }
845        fired_any
846    }
847
848    /// Fires a single transition, either sync inline or async via tokio::spawn.
849    fn fire_transition_async(
850        &mut self,
851        tid: usize,
852        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
853        in_flight_count: &mut usize,
854    ) {
855        let t = self.compiled.transition(tid);
856        let transition_name = Arc::clone(t.name_arc());
857        let input_specs: Vec<In> = t.input_specs().to_vec();
858        let read_arcs: Vec<_> = t.reads().to_vec();
859        let reset_arcs: Vec<_> = t.resets().to_vec();
860        let output_place_names: HashSet<Arc<str>> = t
861            .output_places()
862            .iter()
863            .map(|p| Arc::clone(p.name_arc()))
864            .collect();
865        let action = Arc::clone(t.action());
866        let is_sync = action.is_sync();
867
868        // Consume tokens
869        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
870        for in_spec in &input_specs {
871            let place_name = in_spec.place_name();
872            let to_consume = match in_spec {
873                In::One { .. } => 1,
874                In::Exactly { count, .. } => *count,
875                In::All { guard, .. } | In::AtLeast { guard, .. } => {
876                    if guard.is_some() {
877                        self.marking
878                            .count_matching(place_name, &**guard.as_ref().unwrap())
879                    } else {
880                        self.marking.count(place_name)
881                    }
882                }
883            };
884
885            let place_name_arc = Arc::clone(in_spec.place().name_arc());
886            for _ in 0..to_consume {
887                let token = if let Some(guard) = in_spec.guard() {
888                    self.marking.remove_matching(place_name, &**guard)
889                } else {
890                    self.marking.remove_first(place_name)
891                };
892                if let Some(token) = token {
893                    if E::ENABLED {
894                        self.event_store.append(NetEvent::TokenRemoved {
895                            place_name: Arc::clone(&place_name_arc),
896                            timestamp: now_millis(),
897                        });
898                    }
899                    inputs
900                        .entry(Arc::clone(&place_name_arc))
901                        .or_default()
902                        .push(token);
903                }
904            }
905        }
906
907        // Read arcs
908        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
909        for arc in &read_arcs {
910            if let Some(queue) = self.marking.queue(arc.place.name())
911                && let Some(token) = queue.front()
912            {
913                read_tokens
914                    .entry(Arc::clone(arc.place.name_arc()))
915                    .or_default()
916                    .push(token.clone());
917            }
918        }
919
920        // Reset arcs
921        for arc in &reset_arcs {
922            let removed = self.marking.remove_all(arc.place.name());
923            self.pending_reset_places
924                .insert(Arc::clone(arc.place.name_arc()));
925            if E::ENABLED {
926                for _ in &removed {
927                    self.event_store.append(NetEvent::TokenRemoved {
928                        place_name: Arc::clone(arc.place.name_arc()),
929                        timestamp: now_millis(),
930                    });
931                }
932            }
933        }
934
935        self.update_bitmap_after_consumption(tid);
936
937        if E::ENABLED {
938            self.event_store.append(NetEvent::TransitionStarted {
939                transition_name: Arc::clone(&transition_name),
940                timestamp: now_millis(),
941            });
942        }
943
944        // Mark transition as no longer enabled
945        self.enabled_flags[tid] = false;
946        self.enabled_transition_count -= 1;
947        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
948        self.mark_transition_dirty(tid);
949
950        if is_sync {
951            // Inline sync execution
952            let mut ctx = TransitionContext::new(
953                Arc::clone(&transition_name),
954                inputs,
955                read_tokens,
956                output_place_names,
957                None,
958            );
959            let result = action.run_sync(&mut ctx);
960            match result {
961                Ok(()) => {
962                    let outputs = ctx.take_outputs();
963                    self.process_outputs(tid, &transition_name, outputs);
964                    if E::ENABLED {
965                        self.event_store.append(NetEvent::TransitionCompleted {
966                            transition_name: Arc::clone(&transition_name),
967                            timestamp: now_millis(),
968                        });
969                    }
970                }
971                Err(err) => {
972                    if E::ENABLED {
973                        self.event_store.append(NetEvent::TransitionFailed {
974                            transition_name: Arc::clone(&transition_name),
975                            error: err.message,
976                            timestamp: now_millis(),
977                        });
978                    }
979                }
980            }
981        } else {
982            // Async: spawn tokio task
983            *in_flight_count += 1;
984            let tx = completion_tx.clone();
985            let name = Arc::clone(&transition_name);
986            let ctx = TransitionContext::new(
987                Arc::clone(&transition_name),
988                inputs,
989                read_tokens,
990                output_place_names,
991                None,
992            );
993            tokio::spawn(async move {
994                let result = action.run_async(ctx).await;
995                let completion = match result {
996                    Ok(mut completed_ctx) => ActionCompletion {
997                        transition_name: Arc::clone(&name),
998                        result: Ok(completed_ctx.take_outputs()),
999                    },
1000                    Err(err) => ActionCompletion {
1001                        transition_name: Arc::clone(&name),
1002                        result: Err(err.message),
1003                    },
1004                };
1005                let _ = tx.send(completion);
1006            });
1007        }
1008    }
1009
1010    /// Computes milliseconds until the next timed transition needs attention.
1011    fn millis_until_next_timed_transition(&self) -> f64 {
1012        let mut min_wait = f64::INFINITY;
1013        let now_ms = self.elapsed_ms();
1014
1015        for tid in 0..self.compiled.transition_count {
1016            if !self.enabled_flags[tid] {
1017                continue;
1018            }
1019            let t = self.compiled.transition(tid);
1020            let elapsed = now_ms - self.enabled_at_ms[tid];
1021
1022            let earliest_ms = t.timing().earliest() as f64;
1023            let remaining_earliest = earliest_ms - elapsed;
1024            if remaining_earliest <= 0.0 {
1025                return 0.0;
1026            }
1027            min_wait = min_wait.min(remaining_earliest);
1028
1029            if self.has_deadline_flags[tid] {
1030                let latest_ms = t.timing().latest() as f64;
1031                let remaining_deadline = latest_ms - elapsed;
1032                if remaining_deadline <= 0.0 {
1033                    return 0.0;
1034                }
1035                min_wait = min_wait.min(remaining_deadline);
1036            }
1037        }
1038
1039        min_wait
1040    }
1041}
1042
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` rather than a panic.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1049
1050/// Validates that the produced outputs satisfy the output spec.
1051#[allow(dead_code)]
1052fn validate_out_spec(out: &Out, produced_places: &HashSet<Arc<str>>) -> bool {
1053    match out {
1054        Out::Place(p) => produced_places.contains(p.name()),
1055        Out::And(children) => children
1056            .iter()
1057            .all(|c| validate_out_spec(c, produced_places)),
1058        Out::Xor(children) => children
1059            .iter()
1060            .any(|c| validate_out_spec(c, produced_places)),
1061        Out::Timeout { child, .. } => validate_out_spec(child, produced_places),
1062        Out::ForwardInput { to, .. } => produced_places.contains(to.name()),
1063    }
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068    use super::*;
1069    use libpetri_core::action::passthrough;
1070    use libpetri_core::input::one;
1071    use libpetri_core::output::out_place;
1072    use libpetri_core::place::Place;
1073    use libpetri_core::token::Token;
1074    use libpetri_core::transition::Transition;
1075    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1076
1077    fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1078        let p1 = Place::<i32>::new("p1");
1079        let p2 = Place::<i32>::new("p2");
1080        let p3 = Place::<i32>::new("p3");
1081
1082        let t1 = Transition::builder("t1")
1083            .input(one(&p1))
1084            .output(out_place(&p2))
1085            .action(passthrough())
1086            .build();
1087        let t2 = Transition::builder("t2")
1088            .input(one(&p2))
1089            .output(out_place(&p3))
1090            .action(passthrough())
1091            .build();
1092
1093        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1094        (net, p1, p2, p3)
1095    }
1096
1097    #[test]
1098    fn sync_passthrough_chain() {
1099        let (net, p1, _p2, _p3) = simple_chain();
1100
1101        let mut marking = Marking::new();
1102        marking.add(&p1, Token::at(42, 0));
1103
1104        let mut executor =
1105            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1106        let result = executor.run_sync();
1107
1108        // Token should flow p1 -> p2 -> p3
1109        // passthrough produces no outputs, so token stays consumed
1110        // Actually passthrough produces nothing, so p2 and p3 will be empty
1111        assert_eq!(result.count("p1"), 0);
1112    }
1113
1114    #[test]
1115    fn sync_with_event_store() {
1116        let (net, p1, _, _) = simple_chain();
1117
1118        let mut marking = Marking::new();
1119        marking.add(&p1, Token::at(42, 0));
1120
1121        let mut executor =
1122            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1123        executor.run_sync();
1124
1125        let store = executor.event_store();
1126        assert!(!store.is_empty());
1127    }
1128
1129    #[test]
1130    fn sync_fork_chain() {
1131        let p1 = Place::<i32>::new("p1");
1132        let p2 = Place::<i32>::new("p2");
1133        let p3 = Place::<i32>::new("p3");
1134
1135        let t1 = Transition::builder("t1")
1136            .input(one(&p1))
1137            .output(libpetri_core::output::and(vec![
1138                out_place(&p2),
1139                out_place(&p3),
1140            ]))
1141            .action(libpetri_core::action::fork())
1142            .build();
1143
1144        let net = PetriNet::builder("fork").transition(t1).build();
1145
1146        let mut marking = Marking::new();
1147        marking.add(&p1, Token::at(42, 0));
1148
1149        let mut executor =
1150            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1151        let result = executor.run_sync();
1152
1153        // Fork copies input to all outputs
1154        assert_eq!(result.count("p1"), 0);
1155        assert_eq!(result.count("p2"), 1);
1156        assert_eq!(result.count("p3"), 1);
1157    }
1158
1159    #[test]
1160    fn sync_no_initial_tokens() {
1161        let (net, _, _, _) = simple_chain();
1162        let marking = Marking::new();
1163        let mut executor =
1164            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1165        let result = executor.run_sync();
1166        assert_eq!(result.count("p1"), 0);
1167        assert_eq!(result.count("p2"), 0);
1168        assert_eq!(result.count("p3"), 0);
1169    }
1170
1171    #[test]
1172    fn sync_priority_ordering() {
1173        let p = Place::<()>::new("p");
1174        let out_a = Place::<()>::new("a");
1175        let out_b = Place::<()>::new("b");
1176
1177        // t_high has priority 10, t_low has priority 1
1178        // Both consume from p, but t_high should fire first
1179        let t_high = Transition::builder("t_high")
1180            .input(one(&p))
1181            .output(out_place(&out_a))
1182            .action(libpetri_core::action::passthrough())
1183            .priority(10)
1184            .build();
1185        let t_low = Transition::builder("t_low")
1186            .input(one(&p))
1187            .output(out_place(&out_b))
1188            .action(libpetri_core::action::passthrough())
1189            .priority(1)
1190            .build();
1191
1192        let net = PetriNet::builder("priority")
1193            .transitions([t_high, t_low])
1194            .build();
1195
1196        let mut marking = Marking::new();
1197        marking.add(&p, Token::at((), 0));
1198
1199        let mut executor =
1200            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1201        executor.run_sync();
1202
1203        // Only one token, high priority should have consumed it
1204        // Since passthrough doesn't produce output, both outputs empty
1205        // but p should be empty (consumed by the higher priority transition)
1206        assert_eq!(executor.marking().count("p"), 0);
1207    }
1208
1209    #[test]
1210    fn sync_inhibitor_blocks() {
1211        let p1 = Place::<()>::new("p1");
1212        let p2 = Place::<()>::new("p2");
1213        let p_inh = Place::<()>::new("inh");
1214
1215        let t = Transition::builder("t1")
1216            .input(one(&p1))
1217            .output(out_place(&p2))
1218            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1219            .action(libpetri_core::action::passthrough())
1220            .build();
1221
1222        let net = PetriNet::builder("inhibitor").transition(t).build();
1223
1224        let mut marking = Marking::new();
1225        marking.add(&p1, Token::at((), 0));
1226        marking.add(&p_inh, Token::at((), 0));
1227
1228        let mut executor =
1229            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1230        executor.run_sync();
1231
1232        // Inhibitor should block — token remains in p1
1233        assert_eq!(executor.marking().count("p1"), 1);
1234    }
1235
1236    #[test]
1237    fn sync_linear_chain_5() {
1238        // Build a chain: p0 -> t0 -> p1 -> t1 -> ... -> p5
1239        let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1240        let transitions: Vec<Transition> = (0..5)
1241            .map(|i| {
1242                Transition::builder(format!("t{i}"))
1243                    .input(one(&places[i]))
1244                    .output(out_place(&places[i + 1]))
1245                    .action(libpetri_core::action::fork())
1246                    .build()
1247            })
1248            .collect();
1249
1250        let net = PetriNet::builder("chain5").transitions(transitions).build();
1251
1252        let mut marking = Marking::new();
1253        marking.add(&places[0], Token::at(1, 0));
1254
1255        let mut executor =
1256            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1257        let result = executor.run_sync();
1258
1259        // Token should flow through all 5 transitions to p5
1260        assert_eq!(result.count("p0"), 0);
1261        assert_eq!(result.count("p5"), 1);
1262    }
1263
1264    #[test]
1265    fn input_arc_requires_token_to_enable() {
1266        let p1 = Place::<i32>::new("p1");
1267        let p2 = Place::<i32>::new("p2");
1268        let t = Transition::builder("t1")
1269            .input(one(&p1))
1270            .output(out_place(&p2))
1271            .action(libpetri_core::action::fork())
1272            .build();
1273        let net = PetriNet::builder("test").transition(t).build();
1274
1275        // No tokens — transition should not fire
1276        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1277            &net,
1278            Marking::new(),
1279            ExecutorOptions::default(),
1280        );
1281        executor.run_sync();
1282        assert_eq!(executor.marking().count("p1"), 0);
1283        assert_eq!(executor.marking().count("p2"), 0);
1284    }
1285
1286    #[test]
1287    fn multiple_input_arcs_require_all_tokens() {
1288        let p1 = Place::<i32>::new("p1");
1289        let p2 = Place::<i32>::new("p2");
1290        let p3 = Place::<i32>::new("p3");
1291
1292        let t = Transition::builder("t1")
1293            .input(one(&p1))
1294            .input(one(&p2))
1295            .output(out_place(&p3))
1296            .action(libpetri_core::action::sync_action(|ctx| {
1297                ctx.output("p3", 99i32)?;
1298                Ok(())
1299            }))
1300            .build();
1301        let net = PetriNet::builder("test").transition(t).build();
1302
1303        // Only p1 has a token — should not fire
1304        let mut marking = Marking::new();
1305        marking.add(&p1, Token::at(1, 0));
1306        let mut executor =
1307            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1308        executor.run_sync();
1309        assert_eq!(executor.marking().count("p1"), 1);
1310        assert_eq!(executor.marking().count("p3"), 0);
1311
1312        // Both p1 and p2 have tokens — should fire
1313        let mut marking = Marking::new();
1314        marking.add(&p1, Token::at(1, 0));
1315        marking.add(&p2, Token::at(2, 0));
1316        let mut executor =
1317            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1318        executor.run_sync();
1319        assert_eq!(executor.marking().count("p1"), 0);
1320        assert_eq!(executor.marking().count("p2"), 0);
1321        assert_eq!(executor.marking().count("p3"), 1);
1322    }
1323
1324    #[test]
1325    fn read_arc_does_not_consume() {
1326        let p_in = Place::<i32>::new("in");
1327        let p_ctx = Place::<i32>::new("ctx");
1328        let p_out = Place::<i32>::new("out");
1329
1330        let t = Transition::builder("t1")
1331            .input(one(&p_in))
1332            .read(libpetri_core::arc::read(&p_ctx))
1333            .output(out_place(&p_out))
1334            .action(libpetri_core::action::sync_action(|ctx| {
1335                let v = ctx.input::<i32>("in")?;
1336                let r = ctx.read::<i32>("ctx")?;
1337                ctx.output("out", *v + *r)?;
1338                Ok(())
1339            }))
1340            .build();
1341        let net = PetriNet::builder("test").transition(t).build();
1342
1343        let mut marking = Marking::new();
1344        marking.add(&p_in, Token::at(10, 0));
1345        marking.add(&p_ctx, Token::at(5, 0));
1346
1347        let mut executor =
1348            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1349        executor.run_sync();
1350
1351        assert_eq!(executor.marking().count("in"), 0); // consumed
1352        assert_eq!(executor.marking().count("ctx"), 1); // NOT consumed (read arc)
1353        assert_eq!(executor.marking().count("out"), 1);
1354    }
1355
1356    #[test]
1357    fn reset_arc_removes_all_tokens() {
1358        let p_in = Place::<()>::new("in");
1359        let p_reset = Place::<i32>::new("reset");
1360        let p_out = Place::<()>::new("out");
1361
1362        let t = Transition::builder("t1")
1363            .input(one(&p_in))
1364            .reset(libpetri_core::arc::reset(&p_reset))
1365            .output(out_place(&p_out))
1366            .action(libpetri_core::action::fork())
1367            .build();
1368        let net = PetriNet::builder("test").transition(t).build();
1369
1370        let mut marking = Marking::new();
1371        marking.add(&p_in, Token::at((), 0));
1372        marking.add(&p_reset, Token::at(1, 0));
1373        marking.add(&p_reset, Token::at(2, 0));
1374        marking.add(&p_reset, Token::at(3, 0));
1375
1376        let mut executor =
1377            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1378        executor.run_sync();
1379
1380        assert_eq!(executor.marking().count("reset"), 0); // all cleared
1381        assert_eq!(executor.marking().count("out"), 1);
1382    }
1383
1384    #[test]
1385    fn exactly_cardinality_consumes_n() {
1386        let p = Place::<i32>::new("p");
1387        let p_out = Place::<i32>::new("out");
1388
1389        let t = Transition::builder("t1")
1390            .input(libpetri_core::input::exactly(3, &p))
1391            .output(out_place(&p_out))
1392            .action(libpetri_core::action::sync_action(|ctx| {
1393                let vals = ctx.inputs::<i32>("p")?;
1394                for v in vals {
1395                    ctx.output("out", *v)?;
1396                }
1397                Ok(())
1398            }))
1399            .build();
1400        let net = PetriNet::builder("test").transition(t).build();
1401
1402        let mut marking = Marking::new();
1403        for i in 0..5 {
1404            marking.add(&p, Token::at(i, 0));
1405        }
1406
1407        let mut executor =
1408            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1409        executor.run_sync();
1410
1411        // Consumed 3 of 5, produced 3
1412        assert_eq!(executor.marking().count("p"), 2);
1413        assert_eq!(executor.marking().count("out"), 3);
1414    }
1415
1416    #[test]
1417    fn all_cardinality_consumes_everything() {
1418        let p = Place::<i32>::new("p");
1419        let p_out = Place::<()>::new("out");
1420
1421        let t = Transition::builder("t1")
1422            .input(libpetri_core::input::all(&p))
1423            .output(out_place(&p_out))
1424            .action(libpetri_core::action::sync_action(|ctx| {
1425                let vals = ctx.inputs::<i32>("p")?;
1426                ctx.output("out", vals.len() as i32)?;
1427                Ok(())
1428            }))
1429            .build();
1430        let net = PetriNet::builder("test").transition(t).build();
1431
1432        let mut marking = Marking::new();
1433        for i in 0..5 {
1434            marking.add(&p, Token::at(i, 0));
1435        }
1436
1437        let mut executor =
1438            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1439        executor.run_sync();
1440
1441        assert_eq!(executor.marking().count("p"), 0);
1442    }
1443
1444    #[test]
1445    fn at_least_blocks_insufficient() {
1446        let p = Place::<i32>::new("p");
1447        let p_out = Place::<()>::new("out");
1448
1449        let t = Transition::builder("t1")
1450            .input(libpetri_core::input::at_least(3, &p))
1451            .output(out_place(&p_out))
1452            .action(libpetri_core::action::passthrough())
1453            .build();
1454        let net = PetriNet::builder("test").transition(t).build();
1455
1456        // Only 2 tokens, need 3+
1457        let mut marking = Marking::new();
1458        marking.add(&p, Token::at(1, 0));
1459        marking.add(&p, Token::at(2, 0));
1460
1461        let mut executor =
1462            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1463        executor.run_sync();
1464
1465        assert_eq!(executor.marking().count("p"), 2); // not consumed
1466    }
1467
1468    #[test]
1469    fn at_least_fires_with_enough_and_consumes_all() {
1470        let p = Place::<i32>::new("p");
1471        let p_out = Place::<()>::new("out");
1472
1473        let t = Transition::builder("t1")
1474            .input(libpetri_core::input::at_least(3, &p))
1475            .output(out_place(&p_out))
1476            .action(libpetri_core::action::passthrough())
1477            .build();
1478        let net = PetriNet::builder("test").transition(t).build();
1479
1480        let mut marking = Marking::new();
1481        for i in 0..5 {
1482            marking.add(&p, Token::at(i, 0));
1483        }
1484
1485        let mut executor =
1486            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1487        executor.run_sync();
1488
1489        assert_eq!(executor.marking().count("p"), 0); // all consumed
1490    }
1491
1492    #[test]
1493    fn guarded_input_only_consumes_matching() {
1494        let p = Place::<i32>::new("p");
1495        let p_out = Place::<i32>::new("out");
1496
1497        let t = Transition::builder("t1")
1498            .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1499            .output(out_place(&p_out))
1500            .action(libpetri_core::action::fork())
1501            .build();
1502        let net = PetriNet::builder("test").transition(t).build();
1503
1504        let mut marking = Marking::new();
1505        marking.add(&p, Token::at(3, 0)); // doesn't match guard
1506        marking.add(&p, Token::at(10, 0)); // matches
1507
1508        let mut executor =
1509            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1510        executor.run_sync();
1511
1512        assert_eq!(executor.marking().count("p"), 1); // 3 remains
1513        assert_eq!(executor.marking().count("out"), 1); // 10 forwarded
1514        let peeked = executor.marking().peek(&p_out).unwrap();
1515        assert_eq!(*peeked, 10);
1516    }
1517
1518    #[test]
1519    fn guarded_input_blocks_when_no_match() {
1520        let p = Place::<i32>::new("p");
1521        let p_out = Place::<i32>::new("out");
1522
1523        let t = Transition::builder("t1")
1524            .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
1525            .output(out_place(&p_out))
1526            .action(libpetri_core::action::fork())
1527            .build();
1528        let net = PetriNet::builder("test").transition(t).build();
1529
1530        let mut marking = Marking::new();
1531        marking.add(&p, Token::at(3, 0));
1532        marking.add(&p, Token::at(10, 0));
1533
1534        let mut executor =
1535            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1536        executor.run_sync();
1537
1538        // Nothing matches guard, transition does not fire
1539        assert_eq!(executor.marking().count("p"), 2);
1540        assert_eq!(executor.marking().count("out"), 0);
1541    }
1542
1543    #[test]
1544    fn transform_action_outputs_to_all_places() {
1545        let p_in = Place::<i32>::new("in");
1546        let p_a = Place::<i32>::new("a");
1547        let p_b = Place::<i32>::new("b");
1548
1549        let t = Transition::builder("t1")
1550            .input(one(&p_in))
1551            .output(libpetri_core::output::and(vec![
1552                out_place(&p_a),
1553                out_place(&p_b),
1554            ]))
1555            .action(libpetri_core::action::transform(|ctx| {
1556                let v = ctx.input::<i32>("in").unwrap();
1557                Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
1558            }))
1559            .build();
1560        let net = PetriNet::builder("test").transition(t).build();
1561
1562        let mut marking = Marking::new();
1563        marking.add(&p_in, Token::at(5, 0));
1564
1565        let mut executor =
1566            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1567        executor.run_sync();
1568
1569        assert_eq!(executor.marking().count("a"), 1);
1570        assert_eq!(executor.marking().count("b"), 1);
1571        assert_eq!(*executor.marking().peek(&p_a).unwrap(), 10);
1572        assert_eq!(*executor.marking().peek(&p_b).unwrap(), 10);
1573    }
1574
1575    #[test]
1576    fn sync_action_custom_logic() {
1577        let p_in = Place::<i32>::new("in");
1578        let p_out = Place::<String>::new("out");
1579
1580        let t = Transition::builder("t1")
1581            .input(one(&p_in))
1582            .output(out_place(&p_out))
1583            .action(libpetri_core::action::sync_action(|ctx| {
1584                let v = ctx.input::<i32>("in")?;
1585                ctx.output("out", format!("value={v}"))?;
1586                Ok(())
1587            }))
1588            .build();
1589        let net = PetriNet::builder("test").transition(t).build();
1590
1591        let mut marking = Marking::new();
1592        marking.add(&p_in, Token::at(42, 0));
1593
1594        let mut executor =
1595            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1596        executor.run_sync();
1597
1598        assert_eq!(executor.marking().count("out"), 1);
1599        let peeked = executor.marking().peek(&p_out).unwrap();
1600        assert_eq!(*peeked, "value=42");
1601    }
1602
1603    #[test]
1604    fn action_error_does_not_crash() {
1605        let p_in = Place::<i32>::new("in");
1606        let p_out = Place::<i32>::new("out");
1607
1608        let t = Transition::builder("t1")
1609            .input(one(&p_in))
1610            .output(out_place(&p_out))
1611            .action(libpetri_core::action::sync_action(|_ctx| {
1612                Err(libpetri_core::action::ActionError::new(
1613                    "intentional failure",
1614                ))
1615            }))
1616            .build();
1617        let net = PetriNet::builder("test").transition(t).build();
1618
1619        let mut marking = Marking::new();
1620        marking.add(&p_in, Token::at(42, 0));
1621
1622        let mut executor =
1623            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1624        executor.run_sync();
1625
1626        // Token consumed even though action failed
1627        assert_eq!(executor.marking().count("in"), 0);
1628        assert_eq!(executor.marking().count("out"), 0);
1629
1630        // Failure event should be recorded
1631        let events = executor.event_store().events();
1632        assert!(
1633            events
1634                .iter()
1635                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
1636        );
1637    }
1638
1639    #[test]
1640    fn event_store_records_lifecycle() {
1641        let p1 = Place::<i32>::new("p1");
1642        let p2 = Place::<i32>::new("p2");
1643        let t = Transition::builder("t1")
1644            .input(one(&p1))
1645            .output(out_place(&p2))
1646            .action(libpetri_core::action::fork())
1647            .build();
1648        let net = PetriNet::builder("test").transition(t).build();
1649
1650        let mut marking = Marking::new();
1651        marking.add(&p1, Token::at(1, 0));
1652
1653        let mut executor =
1654            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1655        executor.run_sync();
1656
1657        let events = executor.event_store().events();
1658
1659        // Should have: ExecutionStarted, TransitionEnabled, TokenRemoved, TransitionStarted,
1660        // TokenAdded, TransitionCompleted, ExecutionCompleted
1661        assert!(
1662            events
1663                .iter()
1664                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
1665        );
1666        assert!(
1667            events
1668                .iter()
1669                .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
1670        );
1671        assert!(
1672            events
1673                .iter()
1674                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
1675        );
1676        assert!(
1677            events
1678                .iter()
1679                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
1680        );
1681        assert!(
1682            events
1683                .iter()
1684                .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
1685        );
1686        assert!(
1687            events
1688                .iter()
1689                .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
1690        );
1691        assert!(
1692            events
1693                .iter()
1694                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
1695        );
1696    }
1697
1698    #[test]
1699    fn noop_event_store_has_no_events() {
1700        let p1 = Place::<i32>::new("p1");
1701        let p2 = Place::<i32>::new("p2");
1702        let t = Transition::builder("t1")
1703            .input(one(&p1))
1704            .output(out_place(&p2))
1705            .action(libpetri_core::action::fork())
1706            .build();
1707        let net = PetriNet::builder("test").transition(t).build();
1708
1709        let mut marking = Marking::new();
1710        marking.add(&p1, Token::at(1, 0));
1711
1712        let mut executor =
1713            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1714        executor.run_sync();
1715
1716        assert!(executor.event_store().is_empty());
1717    }
1718
1719    #[test]
1720    fn empty_net_completes() {
1721        let net = PetriNet::builder("empty").build();
1722        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1723            &net,
1724            Marking::new(),
1725            ExecutorOptions::default(),
1726        );
1727        executor.run_sync();
1728        assert!(executor.is_quiescent());
1729    }
1730
1731    #[test]
1732    fn single_transition_fires_once() {
1733        let p = Place::<i32>::new("p");
1734        let out = Place::<i32>::new("out");
1735        let t = Transition::builder("t1")
1736            .input(one(&p))
1737            .output(out_place(&out))
1738            .action(libpetri_core::action::fork())
1739            .build();
1740        let net = PetriNet::builder("test").transition(t).build();
1741
1742        let mut marking = Marking::new();
1743        marking.add(&p, Token::at(42, 0));
1744
1745        let mut executor =
1746            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1747        executor.run_sync();
1748
1749        assert_eq!(executor.marking().count("p"), 0);
1750        assert_eq!(executor.marking().count("out"), 1);
1751    }
1752
1753    #[test]
1754    fn many_tokens_all_processed() {
1755        let p = Place::<i32>::new("p");
1756        let out = Place::<i32>::new("out");
1757        let t = Transition::builder("t1")
1758            .input(one(&p))
1759            .output(out_place(&out))
1760            .action(libpetri_core::action::fork())
1761            .build();
1762        let net = PetriNet::builder("test").transition(t).build();
1763
1764        let mut marking = Marking::new();
1765        for i in 0..100 {
1766            marking.add(&p, Token::at(i, 0));
1767        }
1768
1769        let mut executor =
1770            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1771        executor.run_sync();
1772
1773        assert_eq!(executor.marking().count("p"), 0);
1774        assert_eq!(executor.marking().count("out"), 100);
1775    }
1776
1777    #[test]
1778    fn input_fifo_ordering() {
1779        let p = Place::<i32>::new("p");
1780        let out = Place::<i32>::new("out");
1781        let t = Transition::builder("t1")
1782            .input(one(&p))
1783            .output(out_place(&out))
1784            .action(libpetri_core::action::fork())
1785            .build();
1786        let net = PetriNet::builder("test").transition(t).build();
1787
1788        let mut marking = Marking::new();
1789        marking.add(&p, Token::at(1, 0));
1790        marking.add(&p, Token::at(2, 0));
1791        marking.add(&p, Token::at(3, 0));
1792
1793        let mut executor =
1794            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1795        executor.run_sync();
1796
1797        // All processed, first one should have been consumed first
1798        assert_eq!(executor.marking().count("out"), 3);
1799    }
1800
1801    #[test]
1802    fn inhibitor_unblocked_when_token_removed() {
1803        // p1 has token, p_inh has token (blocks t1)
1804        // t_clear removes from p_inh, then t1 can fire
1805        let p1 = Place::<()>::new("p1");
1806        let p_inh = Place::<()>::new("inh");
1807        let p_out = Place::<()>::new("out");
1808        let p_trigger = Place::<()>::new("trigger");
1809
1810        // t_clear: consumes from inh, outputs to trigger
1811        let t_clear = Transition::builder("t_clear")
1812            .input(one(&p_inh))
1813            .output(out_place(&p_trigger))
1814            .action(libpetri_core::action::fork())
1815            .priority(10) // higher priority fires first
1816            .build();
1817
1818        // t1: consumes from p1, inhibited by inh
1819        let t1 = Transition::builder("t1")
1820            .input(one(&p1))
1821            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1822            .output(out_place(&p_out))
1823            .action(libpetri_core::action::fork())
1824            .priority(1)
1825            .build();
1826
1827        let net = PetriNet::builder("test").transitions([t_clear, t1]).build();
1828
1829        let mut marking = Marking::new();
1830        marking.add(&p1, Token::at((), 0));
1831        marking.add(&p_inh, Token::at((), 0));
1832
1833        let mut executor =
1834            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1835        executor.run_sync();
1836
1837        // t_clear fires first (priority 10), removes inh token
1838        // then t1 fires (no longer inhibited)
1839        assert_eq!(executor.marking().count("inh"), 0);
1840        assert_eq!(executor.marking().count("p1"), 0);
1841        assert_eq!(executor.marking().count("out"), 1);
1842    }
1843
1844    #[test]
1845    fn combined_input_read_reset() {
1846        let p_in = Place::<i32>::new("in");
1847        let p_ctx = Place::<String>::new("ctx");
1848        let p_clear = Place::<i32>::new("clear");
1849        let p_out = Place::<String>::new("out");
1850
1851        let t = Transition::builder("t1")
1852            .input(one(&p_in))
1853            .read(libpetri_core::arc::read(&p_ctx))
1854            .reset(libpetri_core::arc::reset(&p_clear))
1855            .output(out_place(&p_out))
1856            .action(libpetri_core::action::sync_action(|ctx| {
1857                let v = ctx.input::<i32>("in")?;
1858                let r = ctx.read::<String>("ctx")?;
1859                ctx.output("out", format!("{v}-{r}"))?;
1860                Ok(())
1861            }))
1862            .build();
1863        let net = PetriNet::builder("test").transition(t).build();
1864
1865        let mut marking = Marking::new();
1866        marking.add(&p_in, Token::at(42, 0));
1867        marking.add(&p_ctx, Token::at("hello".to_string(), 0));
1868        marking.add(&p_clear, Token::at(1, 0));
1869        marking.add(&p_clear, Token::at(2, 0));
1870
1871        let mut executor =
1872            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1873        executor.run_sync();
1874
1875        assert_eq!(executor.marking().count("in"), 0); // consumed
1876        assert_eq!(executor.marking().count("ctx"), 1); // read, not consumed
1877        assert_eq!(executor.marking().count("clear"), 0); // reset
1878        assert_eq!(executor.marking().count("out"), 1);
1879        let peeked = executor.marking().peek(&p_out).unwrap();
1880        assert_eq!(*peeked, "42-hello");
1881    }
1882
1883    #[test]
1884    fn workflow_sequential_chain() {
1885        // p1 -> t1 -> p2 -> t2 -> p3 -> t3 -> p4
1886        // Each transition doubles the value
1887        let p1 = Place::<i32>::new("p1");
1888        let p2 = Place::<i32>::new("p2");
1889        let p3 = Place::<i32>::new("p3");
1890        let p4 = Place::<i32>::new("p4");
1891
1892        let make_doubler = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
1893            let out_name: Arc<str> = Arc::from(outp.name());
1894            Transition::builder(name)
1895                .input(one(inp))
1896                .output(out_place(outp))
1897                .action(libpetri_core::action::sync_action(move |ctx| {
1898                    let v = ctx
1899                        .input::<i32>("p1")
1900                        .or_else(|_| ctx.input::<i32>("p2"))
1901                        .or_else(|_| ctx.input::<i32>("p3"))
1902                        .unwrap();
1903                    ctx.output(&out_name, *v * 2)?;
1904                    Ok(())
1905                }))
1906                .build()
1907        };
1908
1909        let t1 = make_doubler("t1", &p1, &p2);
1910        let t2 = make_doubler("t2", &p2, &p3);
1911        let t3 = make_doubler("t3", &p3, &p4);
1912
1913        let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
1914
1915        let mut marking = Marking::new();
1916        marking.add(&p1, Token::at(1, 0));
1917
1918        let mut executor =
1919            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1920        executor.run_sync();
1921
1922        assert_eq!(executor.marking().count("p4"), 1);
1923        assert_eq!(*executor.marking().peek(&p4).unwrap(), 8); // 1 * 2 * 2 * 2
1924    }
1925
1926    #[test]
1927    fn workflow_fork_join() {
1928        // p_start -> t_fork -> p_a, p_b -> t_join -> p_end
1929        let p_start = Place::<i32>::new("start");
1930        let p_a = Place::<i32>::new("a");
1931        let p_b = Place::<i32>::new("b");
1932        let p_end = Place::<i32>::new("end");
1933
1934        let t_fork = Transition::builder("fork")
1935            .input(one(&p_start))
1936            .output(libpetri_core::output::and(vec![
1937                out_place(&p_a),
1938                out_place(&p_b),
1939            ]))
1940            .action(libpetri_core::action::fork())
1941            .build();
1942
1943        let t_join = Transition::builder("join")
1944            .input(one(&p_a))
1945            .input(one(&p_b))
1946            .output(out_place(&p_end))
1947            .action(libpetri_core::action::sync_action(|ctx| {
1948                let a = ctx.input::<i32>("a")?;
1949                let b = ctx.input::<i32>("b")?;
1950                ctx.output("end", *a + *b)?;
1951                Ok(())
1952            }))
1953            .build();
1954
1955        let net = PetriNet::builder("fork-join")
1956            .transitions([t_fork, t_join])
1957            .build();
1958
1959        let mut marking = Marking::new();
1960        marking.add(&p_start, Token::at(5, 0));
1961
1962        let mut executor =
1963            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1964        executor.run_sync();
1965
1966        assert_eq!(executor.marking().count("start"), 0);
1967        assert_eq!(executor.marking().count("a"), 0);
1968        assert_eq!(executor.marking().count("b"), 0);
1969        assert_eq!(executor.marking().count("end"), 1);
1970        assert_eq!(*executor.marking().peek(&p_end).unwrap(), 10); // 5 + 5
1971    }
1972
1973    #[test]
1974    fn workflow_mutual_exclusion() {
1975        // Two workers compete for a mutex token
1976        let p_mutex = Place::<()>::new("mutex");
1977        let p_w1 = Place::<()>::new("w1");
1978        let p_w2 = Place::<()>::new("w2");
1979        let p_done1 = Place::<()>::new("done1");
1980        let p_done2 = Place::<()>::new("done2");
1981
1982        let t_w1 = Transition::builder("work1")
1983            .input(one(&p_w1))
1984            .input(one(&p_mutex))
1985            .output(libpetri_core::output::and(vec![
1986                out_place(&p_done1),
1987                out_place(&p_mutex), // return mutex
1988            ]))
1989            .action(libpetri_core::action::sync_action(|ctx| {
1990                ctx.output("done1", ())?;
1991                ctx.output("mutex", ())?;
1992                Ok(())
1993            }))
1994            .build();
1995
1996        let t_w2 = Transition::builder("work2")
1997            .input(one(&p_w2))
1998            .input(one(&p_mutex))
1999            .output(libpetri_core::output::and(vec![
2000                out_place(&p_done2),
2001                out_place(&p_mutex), // return mutex
2002            ]))
2003            .action(libpetri_core::action::sync_action(|ctx| {
2004                ctx.output("done2", ())?;
2005                ctx.output("mutex", ())?;
2006                Ok(())
2007            }))
2008            .build();
2009
2010        let net = PetriNet::builder("mutex").transitions([t_w1, t_w2]).build();
2011
2012        let mut marking = Marking::new();
2013        marking.add(&p_mutex, Token::at((), 0));
2014        marking.add(&p_w1, Token::at((), 0));
2015        marking.add(&p_w2, Token::at((), 0));
2016
2017        let mut executor =
2018            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2019        executor.run_sync();
2020
2021        // Both should complete, mutex should be returned
2022        assert_eq!(executor.marking().count("done1"), 1);
2023        assert_eq!(executor.marking().count("done2"), 1);
2024        assert_eq!(executor.marking().count("mutex"), 1); // returned
2025    }
2026
2027    #[test]
2028    fn produce_action() {
2029        let p_in = Place::<()>::new("in");
2030        let p_out = Place::<String>::new("out");
2031
2032        let t = Transition::builder("t1")
2033            .input(one(&p_in))
2034            .output(out_place(&p_out))
2035            .action(libpetri_core::action::produce(
2036                Arc::from("out"),
2037                "produced_value".to_string(),
2038            ))
2039            .build();
2040        let net = PetriNet::builder("test").transition(t).build();
2041
2042        let mut marking = Marking::new();
2043        marking.add(&p_in, Token::at((), 0));
2044
2045        let mut executor =
2046            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2047        executor.run_sync();
2048
2049        assert_eq!(executor.marking().count("out"), 1);
2050    }
2051
2052    #[test]
2053    fn xor_output_fires_one_branch() {
2054        let p = Place::<i32>::new("p");
2055        let a = Place::<i32>::new("a");
2056        let b = Place::<i32>::new("b");
2057
2058        let t = Transition::builder("t1")
2059            .input(one(&p))
2060            .output(libpetri_core::output::xor(vec![
2061                out_place(&a),
2062                out_place(&b),
2063            ]))
2064            .action(libpetri_core::action::sync_action(|ctx| {
2065                let v = ctx.input::<i32>("p")?;
2066                // Output to first branch place
2067                ctx.output("a", *v)?;
2068                Ok(())
2069            }))
2070            .build();
2071        let net = PetriNet::builder("test").transition(t).build();
2072
2073        let mut marking = Marking::new();
2074        marking.add(&p, Token::at(1, 0));
2075
2076        let mut executor =
2077            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2078        executor.run_sync();
2079
2080        // XOR: token goes to one branch
2081        let total = executor.marking().count("a") + executor.marking().count("b");
2082        assert!(total >= 1);
2083    }
2084
2085    #[test]
2086    fn and_output_fires_to_all() {
2087        let p = Place::<i32>::new("p");
2088        let a = Place::<i32>::new("a");
2089        let b = Place::<i32>::new("b");
2090        let c = Place::<i32>::new("c");
2091
2092        let t = Transition::builder("t1")
2093            .input(one(&p))
2094            .output(libpetri_core::output::and(vec![
2095                out_place(&a),
2096                out_place(&b),
2097                out_place(&c),
2098            ]))
2099            .action(libpetri_core::action::sync_action(|ctx| {
2100                let v = ctx.input::<i32>("p")?;
2101                ctx.output("a", *v)?;
2102                ctx.output("b", *v * 10)?;
2103                ctx.output("c", *v * 100)?;
2104                Ok(())
2105            }))
2106            .build();
2107        let net = PetriNet::builder("test").transition(t).build();
2108
2109        let mut marking = Marking::new();
2110        marking.add(&p, Token::at(1, 0));
2111
2112        let mut executor =
2113            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2114        executor.run_sync();
2115
2116        assert_eq!(executor.marking().count("a"), 1);
2117        assert_eq!(executor.marking().count("b"), 1);
2118        assert_eq!(executor.marking().count("c"), 1);
2119        assert_eq!(*executor.marking().peek(&a).unwrap(), 1);
2120        assert_eq!(*executor.marking().peek(&b).unwrap(), 10);
2121        assert_eq!(*executor.marking().peek(&c).unwrap(), 100);
2122    }
2123
2124    #[test]
2125    fn transition_with_no_output_consumes_only() {
2126        let p = Place::<i32>::new("p");
2127
2128        let t = Transition::builder("t1")
2129            .input(one(&p))
2130            .action(libpetri_core::action::sync_action(|ctx| {
2131                let _ = ctx.input::<i32>("p")?;
2132                Ok(())
2133            }))
2134            .build();
2135        let net = PetriNet::builder("test").transition(t).build();
2136
2137        let mut marking = Marking::new();
2138        marking.add(&p, Token::at(42, 0));
2139
2140        let mut executor =
2141            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2142        executor.run_sync();
2143
2144        assert_eq!(executor.marking().count("p"), 0);
2145    }
2146
2147    #[test]
2148    fn multiple_transitions_same_input_compete() {
2149        // Two transitions both need p1 — only one should fire per token
2150        let p1 = Place::<()>::new("p1");
2151        let out_a = Place::<()>::new("a");
2152        let out_b = Place::<()>::new("b");
2153
2154        let t_a = Transition::builder("ta")
2155            .input(one(&p1))
2156            .output(out_place(&out_a))
2157            .action(libpetri_core::action::fork())
2158            .build();
2159        let t_b = Transition::builder("tb")
2160            .input(one(&p1))
2161            .output(out_place(&out_b))
2162            .action(libpetri_core::action::fork())
2163            .build();
2164
2165        let net = PetriNet::builder("test").transitions([t_a, t_b]).build();
2166
2167        let mut marking = Marking::new();
2168        marking.add(&p1, Token::at((), 0));
2169
2170        let mut executor =
2171            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2172        executor.run_sync();
2173
2174        // One token, two competing transitions — exactly one fires
2175        let total = executor.marking().count("a") + executor.marking().count("b");
2176        assert_eq!(total, 1);
2177        assert_eq!(executor.marking().count("p1"), 0);
2178    }
2179
2180    #[test]
2181    fn priority_higher_fires_first() {
2182        let p = Place::<()>::new("p");
2183        let out_hi = Place::<()>::new("hi");
2184        let out_lo = Place::<()>::new("lo");
2185
2186        let t_hi = Transition::builder("hi")
2187            .input(one(&p))
2188            .output(out_place(&out_hi))
2189            .action(libpetri_core::action::fork())
2190            .priority(10)
2191            .build();
2192        let t_lo = Transition::builder("lo")
2193            .input(one(&p))
2194            .output(out_place(&out_lo))
2195            .action(libpetri_core::action::fork())
2196            .priority(1)
2197            .build();
2198
2199        let net = PetriNet::builder("test").transitions([t_hi, t_lo]).build();
2200
2201        let mut marking = Marking::new();
2202        marking.add(&p, Token::at((), 0));
2203
2204        let mut executor =
2205            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2206        executor.run_sync();
2207
2208        // Higher priority should win
2209        assert_eq!(executor.marking().count("hi"), 1);
2210        assert_eq!(executor.marking().count("lo"), 0);
2211    }
2212
2213    #[test]
2214    fn quiescent_when_no_enabled_transitions() {
2215        let p = Place::<i32>::new("p");
2216        let out = Place::<i32>::new("out");
2217
2218        let t = Transition::builder("t1")
2219            .input(one(&p))
2220            .output(out_place(&out))
2221            .action(libpetri_core::action::fork())
2222            .build();
2223        let net = PetriNet::builder("test").transition(t).build();
2224
2225        let mut marking = Marking::new();
2226        marking.add(&p, Token::at(1, 0));
2227
2228        let mut executor =
2229            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2230        executor.run_sync();
2231
2232        assert!(executor.is_quiescent());
2233    }
2234
2235    #[test]
2236    fn event_store_transition_enabled_disabled() {
2237        let p1 = Place::<()>::new("p1");
2238        let p2 = Place::<()>::new("p2");
2239
2240        let t = Transition::builder("t1")
2241            .input(one(&p1))
2242            .output(out_place(&p2))
2243            .action(libpetri_core::action::fork())
2244            .build();
2245        let net = PetriNet::builder("test").transition(t).build();
2246
2247        let mut marking = Marking::new();
2248        marking.add(&p1, Token::at((), 0));
2249
2250        let mut executor =
2251            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2252        executor.run_sync();
2253
2254        let events = executor.event_store().events();
2255        // Should have at least: ExecutionStarted, TransitionEnabled, TransitionStarted,
2256        // TokenAdded, TransitionCompleted, ExecutionCompleted
2257        assert!(events.len() >= 4);
2258
2259        // Check for TransitionEnabled event
2260        let has_enabled = events.iter().any(|e| {
2261            matches!(e, NetEvent::TransitionEnabled { transition_name, .. } if transition_name.as_ref() == "t1")
2262        });
2263        assert!(has_enabled);
2264    }
2265
2266    #[test]
2267    fn diamond_pattern() {
2268        // p1 -> t1 -> p2, p3 -> t2 -> p4 (from p2), t3 -> p4 (from p3)
2269        let p1 = Place::<()>::new("p1");
2270        let p2 = Place::<()>::new("p2");
2271        let p3 = Place::<()>::new("p3");
2272        let p4 = Place::<()>::new("p4");
2273
2274        let t1 = Transition::builder("t1")
2275            .input(one(&p1))
2276            .output(libpetri_core::output::and(vec![
2277                out_place(&p2),
2278                out_place(&p3),
2279            ]))
2280            .action(libpetri_core::action::fork())
2281            .build();
2282        let t2 = Transition::builder("t2")
2283            .input(one(&p2))
2284            .output(out_place(&p4))
2285            .action(libpetri_core::action::fork())
2286            .build();
2287        let t3 = Transition::builder("t3")
2288            .input(one(&p3))
2289            .output(out_place(&p4))
2290            .action(libpetri_core::action::fork())
2291            .build();
2292
2293        let net = PetriNet::builder("diamond")
2294            .transitions([t1, t2, t3])
2295            .build();
2296
2297        let mut marking = Marking::new();
2298        marking.add(&p1, Token::at((), 0));
2299
2300        let mut executor =
2301            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2302        executor.run_sync();
2303
2304        assert_eq!(executor.marking().count("p4"), 2); // both branches produce to p4
2305    }
2306
2307    #[test]
2308    fn self_loop_with_guard_terminates() {
2309        // Transition that loops back, but only fires when value > 0
2310        // Decrements each time, terminates when 0
2311        let p = Place::<i32>::new("p");
2312        let done = Place::<i32>::new("done");
2313
2314        let t = Transition::builder("dec")
2315            .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v > 0))
2316            .output(out_place(&p))
2317            .action(libpetri_core::action::sync_action(|ctx| {
2318                let v = ctx.input::<i32>("p")?;
2319                ctx.output("p", *v - 1)?;
2320                Ok(())
2321            }))
2322            .build();
2323
2324        // When value hits 0, this transition moves it to done
2325        let t_done = Transition::builder("finish")
2326            .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v == 0))
2327            .output(out_place(&done))
2328            .action(libpetri_core::action::fork())
2329            .build();
2330
2331        let net = PetriNet::builder("countdown")
2332            .transitions([t, t_done])
2333            .build();
2334
2335        let mut marking = Marking::new();
2336        marking.add(&p, Token::at(3, 0));
2337
2338        let mut executor =
2339            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2340        executor.run_sync();
2341
2342        assert_eq!(executor.marking().count("done"), 1);
2343        assert_eq!(*executor.marking().peek(&done).unwrap(), 0);
2344    }
2345
2346    #[test]
2347    fn multiple_tokens_different_types() {
2348        let p_int = Place::<i32>::new("ints");
2349        let p_str = Place::<String>::new("strs");
2350        let p_out = Place::<String>::new("out");
2351
2352        let t = Transition::builder("combine")
2353            .input(one(&p_int))
2354            .input(one(&p_str))
2355            .output(out_place(&p_out))
2356            .action(libpetri_core::action::sync_action(|ctx| {
2357                let i = ctx.input::<i32>("ints")?;
2358                let s = ctx.input::<String>("strs")?;
2359                ctx.output("out", format!("{s}-{i}"))?;
2360                Ok(())
2361            }))
2362            .build();
2363
2364        let net = PetriNet::builder("test").transition(t).build();
2365
2366        let mut marking = Marking::new();
2367        marking.add(&p_int, Token::at(42, 0));
2368        marking.add(&p_str, Token::at("hello".to_string(), 0));
2369
2370        let mut executor =
2371            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2372        executor.run_sync();
2373
2374        assert_eq!(*executor.marking().peek(&p_out).unwrap(), "hello-42");
2375    }
2376}
2377
2378#[cfg(all(test, feature = "tokio"))]
2379mod async_tests {
2380    use super::*;
2381    use crate::environment::ExternalEvent;
2382    use libpetri_core::action::{ActionError, async_action, fork};
2383    use libpetri_core::input::one;
2384    use libpetri_core::output::out_place;
2385    use libpetri_core::place::Place;
2386    use libpetri_core::token::{ErasedToken, Token};
2387    use libpetri_core::transition::Transition;
2388    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
2389
2390    #[tokio::test]
2391    async fn async_fork_single_transition() {
2392        let p1 = Place::<i32>::new("p1");
2393        let p2 = Place::<i32>::new("p2");
2394
2395        let t1 = Transition::builder("t1")
2396            .input(one(&p1))
2397            .output(out_place(&p2))
2398            .action(fork())
2399            .build();
2400
2401        let net = PetriNet::builder("test").transition(t1).build();
2402        let mut marking = Marking::new();
2403        marking.add(&p1, Token::at(42, 0));
2404
2405        let mut executor =
2406            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2407
2408        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2409        executor.run_async(rx).await;
2410
2411        assert_eq!(executor.marking().count("p2"), 1);
2412        assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
2413    }
2414
2415    #[tokio::test]
2416    async fn async_action_produces_output() {
2417        let p1 = Place::<i32>::new("p1");
2418        let p2 = Place::<i32>::new("p2");
2419
2420        let action = async_action(|mut ctx| async move {
2421            let val: i32 = *ctx.input::<i32>("p1")?;
2422            ctx.output("p2", val * 10)?;
2423            Ok(ctx)
2424        });
2425
2426        let t1 = Transition::builder("t1")
2427            .input(one(&p1))
2428            .output(out_place(&p2))
2429            .action(action)
2430            .build();
2431
2432        let net = PetriNet::builder("test").transition(t1).build();
2433        let mut marking = Marking::new();
2434        marking.add(&p1, Token::at(5, 0));
2435
2436        let mut executor =
2437            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2438
2439        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2440        executor.run_async(rx).await;
2441
2442        assert_eq!(executor.marking().count("p2"), 1);
2443        assert_eq!(*executor.marking().peek(&p2).unwrap(), 50);
2444    }
2445
2446    #[tokio::test]
2447    async fn async_chain_two_transitions() {
2448        let p1 = Place::<i32>::new("p1");
2449        let p2 = Place::<i32>::new("p2");
2450        let p3 = Place::<i32>::new("p3");
2451
2452        let t1 = Transition::builder("t1")
2453            .input(one(&p1))
2454            .output(out_place(&p2))
2455            .action(fork())
2456            .build();
2457
2458        let t2 = Transition::builder("t2")
2459            .input(one(&p2))
2460            .output(out_place(&p3))
2461            .action(fork())
2462            .build();
2463
2464        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
2465        let mut marking = Marking::new();
2466        marking.add(&p1, Token::at(99, 0));
2467
2468        let mut executor =
2469            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2470
2471        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2472        executor.run_async(rx).await;
2473
2474        assert_eq!(executor.marking().count("p3"), 1);
2475        assert_eq!(*executor.marking().peek(&p3).unwrap(), 99);
2476    }
2477
2478    #[tokio::test]
2479    async fn async_event_injection() {
2480        let p1 = Place::<i32>::new("p1");
2481        let p2 = Place::<i32>::new("p2");
2482
2483        let t1 = Transition::builder("t1")
2484            .input(one(&p1))
2485            .output(out_place(&p2))
2486            .action(fork())
2487            .build();
2488
2489        let net = PetriNet::builder("test").transition(t1).build();
2490        let marking = Marking::new(); // empty — no initial tokens
2491
2492        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2493            &net,
2494            marking,
2495            ExecutorOptions {
2496                long_running: true,
2497                ..Default::default()
2498            },
2499        );
2500
2501        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2502
2503        // Inject a token after a short delay, then close the channel
2504        tokio::spawn(async move {
2505            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2506            let token = Token::at(77, 0);
2507            tx.send(ExternalEvent {
2508                place_name: Arc::from("p1"),
2509                token: ErasedToken::from_typed(&token),
2510            })
2511            .unwrap();
2512            // Drop tx to close the channel and let executor terminate
2513        });
2514
2515        executor.run_async(rx).await;
2516
2517        assert_eq!(executor.marking().count("p2"), 1);
2518        assert_eq!(*executor.marking().peek(&p2).unwrap(), 77);
2519    }
2520
2521    #[tokio::test]
2522    async fn async_with_event_store_records_events() {
2523        let p1 = Place::<i32>::new("p1");
2524        let p2 = Place::<i32>::new("p2");
2525
2526        let t1 = Transition::builder("t1")
2527            .input(one(&p1))
2528            .output(out_place(&p2))
2529            .action(fork())
2530            .build();
2531
2532        let net = PetriNet::builder("test").transition(t1).build();
2533        let mut marking = Marking::new();
2534        marking.add(&p1, Token::at(1, 0));
2535
2536        let mut executor =
2537            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2538
2539        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2540        executor.run_async(rx).await;
2541
2542        let events = executor.event_store().events();
2543        assert!(!events.is_empty());
2544        // Should have ExecutionStarted, TransitionStarted, TransitionCompleted, ExecutionCompleted
2545        assert!(
2546            events
2547                .iter()
2548                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2549        );
2550        assert!(
2551            events
2552                .iter()
2553                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2554        );
2555        assert!(
2556            events
2557                .iter()
2558                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2559        );
2560        assert!(
2561            events
2562                .iter()
2563                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2564        );
2565    }
2566
2567    #[tokio::test]
2568    async fn async_action_error_does_not_crash() {
2569        let p1 = Place::<i32>::new("p1");
2570        let p2 = Place::<i32>::new("p2");
2571
2572        let action =
2573            async_action(|_ctx| async move { Err(ActionError::new("intentional failure")) });
2574
2575        let t1 = Transition::builder("t1")
2576            .input(one(&p1))
2577            .output(out_place(&p2))
2578            .action(action)
2579            .build();
2580
2581        let net = PetriNet::builder("test").transition(t1).build();
2582        let mut marking = Marking::new();
2583        marking.add(&p1, Token::at(1, 0));
2584
2585        let mut executor =
2586            BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2587
2588        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2589        executor.run_async(rx).await;
2590
2591        // Token consumed but no output produced
2592        assert_eq!(executor.marking().count("p1"), 0);
2593        assert_eq!(executor.marking().count("p2"), 0);
2594
2595        // Failure event recorded
2596        let events = executor.event_store().events();
2597        assert!(
2598            events
2599                .iter()
2600                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2601        );
2602    }
2603
2604    #[tokio::test]
2605    async fn async_delayed_timing() {
2606        use libpetri_core::timing::delayed;
2607
2608        let p1 = Place::<i32>::new("p1");
2609        let p2 = Place::<i32>::new("p2");
2610
2611        let t1 = Transition::builder("t1")
2612            .input(one(&p1))
2613            .output(out_place(&p2))
2614            .timing(delayed(100))
2615            .action(fork())
2616            .build();
2617
2618        let net = PetriNet::builder("test").transition(t1).build();
2619        let mut marking = Marking::new();
2620        marking.add(&p1, Token::at(10, 0));
2621
2622        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2623            &net,
2624            marking,
2625            ExecutorOptions {
2626                long_running: true,
2627                ..Default::default()
2628            },
2629        );
2630
2631        let start = std::time::Instant::now();
2632        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2633        // Keep channel open long enough for the delayed transition to fire
2634        tokio::spawn(async move {
2635            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2636            drop(tx);
2637        });
2638        executor.run_async(rx).await;
2639
2640        assert!(
2641            start.elapsed().as_millis() >= 80,
2642            "delayed(100) should wait ~100ms"
2643        );
2644        assert_eq!(executor.marking().count("p2"), 1);
2645        assert_eq!(*executor.marking().peek(&p2).unwrap(), 10);
2646    }
2647
2648    #[tokio::test]
2649    async fn async_exact_timing() {
2650        use libpetri_core::timing::exact;
2651
2652        let p1 = Place::<i32>::new("p1");
2653        let p2 = Place::<i32>::new("p2");
2654
2655        let t1 = Transition::builder("t1")
2656            .input(one(&p1))
2657            .output(out_place(&p2))
2658            .timing(exact(100))
2659            .action(fork())
2660            .build();
2661
2662        let net = PetriNet::builder("test").transition(t1).build();
2663        let mut marking = Marking::new();
2664        marking.add(&p1, Token::at(20, 0));
2665
2666        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2667            &net,
2668            marking,
2669            ExecutorOptions {
2670                long_running: true,
2671                ..Default::default()
2672            },
2673        );
2674
2675        let start = std::time::Instant::now();
2676        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2677        tokio::spawn(async move {
2678            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2679            drop(tx);
2680        });
2681        executor.run_async(rx).await;
2682
2683        assert!(
2684            start.elapsed().as_millis() >= 80,
2685            "exact(100) should wait ~100ms"
2686        );
2687        assert_eq!(executor.marking().count("p2"), 1);
2688        assert_eq!(*executor.marking().peek(&p2).unwrap(), 20);
2689    }
2690
2691    #[tokio::test]
2692    async fn async_window_timing() {
2693        use libpetri_core::timing::window;
2694
2695        let p1 = Place::<i32>::new("p1");
2696        let p2 = Place::<i32>::new("p2");
2697
2698        let t1 = Transition::builder("t1")
2699            .input(one(&p1))
2700            .output(out_place(&p2))
2701            .timing(window(50, 200))
2702            .action(fork())
2703            .build();
2704
2705        let net = PetriNet::builder("test").transition(t1).build();
2706        let mut marking = Marking::new();
2707        marking.add(&p1, Token::at(30, 0));
2708
2709        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2710            &net,
2711            marking,
2712            ExecutorOptions {
2713                long_running: true,
2714                ..Default::default()
2715            },
2716        );
2717
2718        let start = std::time::Instant::now();
2719        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2720        tokio::spawn(async move {
2721            tokio::time::sleep(std::time::Duration::from_millis(400)).await;
2722            drop(tx);
2723        });
2724        executor.run_async(rx).await;
2725
2726        assert!(
2727            start.elapsed().as_millis() >= 40,
2728            "window(50,200) should wait >= ~50ms"
2729        );
2730        assert_eq!(executor.marking().count("p2"), 1);
2731        assert_eq!(*executor.marking().peek(&p2).unwrap(), 30);
2732    }
2733
2734    #[tokio::test]
2735    async fn async_deadline_enforcement() {
2736        use libpetri_core::action::sync_action;
2737        use libpetri_core::timing::window;
2738
2739        let p_slow = Place::<i32>::new("p_slow");
2740        let p_windowed = Place::<i32>::new("p_windowed");
2741        let slow_out = Place::<i32>::new("slow_out");
2742        let windowed_out = Place::<i32>::new("windowed_out");
2743
2744        // Sync action that busy-waits for 200ms, blocking the executor thread.
2745        // This prevents the executor from reaching the fire phase for windowed
2746        // until after its deadline has passed.
2747        let t_slow = Transition::builder("slow")
2748            .input(one(&p_slow))
2749            .output(out_place(&slow_out))
2750            .priority(10)
2751            .action(sync_action(|ctx| {
2752                let v = ctx.input::<i32>("p_slow")?;
2753                let start = std::time::Instant::now();
2754                while start.elapsed().as_millis() < 200 {
2755                    std::hint::spin_loop();
2756                }
2757                ctx.output("slow_out", *v)?;
2758                Ok(())
2759            }))
2760            .build();
2761
2762        // Windowed transition: enabled at start, earliest=50ms, deadline=100ms.
2763        // Because the slow sync action blocks the executor for 200ms, by the time
2764        // enforce_deadlines runs again, elapsed (~200ms) > latest (100ms) + tolerance.
2765        let t_windowed = Transition::builder("windowed")
2766            .input(one(&p_windowed))
2767            .output(out_place(&windowed_out))
2768            .timing(window(50, 100))
2769            .action(fork())
2770            .build();
2771
2772        let net = PetriNet::builder("test")
2773            .transitions([t_slow, t_windowed])
2774            .build();
2775
2776        let mut marking = Marking::new();
2777        marking.add(&p_slow, Token::at(1, 0));
2778        marking.add(&p_windowed, Token::at(2, 0));
2779
2780        let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
2781            &net,
2782            marking,
2783            ExecutorOptions {
2784                long_running: true,
2785                ..Default::default()
2786            },
2787        );
2788
2789        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2790        tokio::spawn(async move {
2791            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2792            drop(tx);
2793        });
2794        executor.run_async(rx).await;
2795
2796        // Slow fires and completes after 200ms busy-wait
2797        assert_eq!(executor.marking().count("slow_out"), 1);
2798        // Windowed should have been disabled by deadline enforcement
2799        assert_eq!(
2800            executor.marking().count("windowed_out"),
2801            0,
2802            "windowed transition should have been disabled by deadline"
2803        );
2804
2805        let events = executor.event_store().events();
2806        assert!(
2807            events.iter().any(|e| matches!(e, NetEvent::TransitionTimedOut { transition_name, .. } if &**transition_name == "windowed")),
2808            "expected TransitionTimedOut event for 'windowed'"
2809        );
2810    }
2811
2812    #[tokio::test]
2813    async fn async_multiple_injections() {
2814        let p1 = Place::<i32>::new("p1");
2815        let p2 = Place::<i32>::new("p2");
2816
2817        let t1 = Transition::builder("t1")
2818            .input(one(&p1))
2819            .output(out_place(&p2))
2820            .action(fork())
2821            .build();
2822
2823        let net = PetriNet::builder("test").transition(t1).build();
2824        let marking = Marking::new();
2825
2826        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2827            &net,
2828            marking,
2829            ExecutorOptions {
2830                long_running: true,
2831                ..Default::default()
2832            },
2833        );
2834
2835        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2836
2837        tokio::spawn(async move {
2838            for i in 0..5 {
2839                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2840                let token = Token::at(i, 0);
2841                tx.send(ExternalEvent {
2842                    place_name: Arc::from("p1"),
2843                    token: ErasedToken::from_typed(&token),
2844                })
2845                .unwrap();
2846            }
2847            // Drop tx to close channel
2848        });
2849
2850        executor.run_async(rx).await;
2851
2852        assert_eq!(
2853            executor.marking().count("p2"),
2854            5,
2855            "all 5 injected tokens should arrive"
2856        );
2857    }
2858
2859    #[tokio::test]
2860    async fn async_parallel_execution() {
2861        let p1 = Place::<i32>::new("p1");
2862        let p2 = Place::<i32>::new("p2");
2863        let p3 = Place::<i32>::new("p3");
2864        let out1 = Place::<i32>::new("out1");
2865        let out2 = Place::<i32>::new("out2");
2866        let out3 = Place::<i32>::new("out3");
2867
2868        let make_transition = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
2869            Transition::builder(name)
2870                .input(one(inp))
2871                .output(out_place(outp))
2872                .action(async_action(|mut ctx| async move {
2873                    let v: i32 =
2874                        *ctx.input::<i32>(ctx.transition_name().replace("t", "p").as_str())?;
2875                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2876                    ctx.output(&ctx.transition_name().replace("t", "out"), v)?;
2877                    Ok(ctx)
2878                }))
2879                .build()
2880        };
2881
2882        let t1 = make_transition("t1", &p1, &out1);
2883        let t2 = make_transition("t2", &p2, &out2);
2884        let t3 = make_transition("t3", &p3, &out3);
2885
2886        let net = PetriNet::builder("parallel")
2887            .transitions([t1, t2, t3])
2888            .build();
2889        let mut marking = Marking::new();
2890        marking.add(&p1, Token::at(1, 0));
2891        marking.add(&p2, Token::at(2, 0));
2892        marking.add(&p3, Token::at(3, 0));
2893
2894        let mut executor =
2895            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2896
2897        let start = std::time::Instant::now();
2898        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2899        executor.run_async(rx).await;
2900        let elapsed = start.elapsed().as_millis();
2901
2902        assert_eq!(executor.marking().count("out1"), 1);
2903        assert_eq!(executor.marking().count("out2"), 1);
2904        assert_eq!(executor.marking().count("out3"), 1);
2905        assert!(
2906            elapsed < 250,
2907            "parallel execution should take < 250ms, took {elapsed}ms"
2908        );
2909    }
2910
2911    #[tokio::test]
2912    async fn async_sequential_chain_order() {
2913        use std::sync::Mutex;
2914
2915        let p1 = Place::<i32>::new("p1");
2916        let p2 = Place::<i32>::new("p2");
2917        let p3 = Place::<i32>::new("p3");
2918        let p4 = Place::<i32>::new("p4");
2919
2920        let order: Arc<Mutex<Vec<i32>>> = Arc::new(Mutex::new(Vec::new()));
2921
2922        let make_chain = |name: &str,
2923                          inp: &Place<i32>,
2924                          outp: &Place<i32>,
2925                          id: i32,
2926                          order: Arc<Mutex<Vec<i32>>>| {
2927            let inp_name: Arc<str> = Arc::from(inp.name());
2928            let outp_name: Arc<str> = Arc::from(outp.name());
2929            Transition::builder(name)
2930                .input(one(inp))
2931                .output(out_place(outp))
2932                .action(async_action(move |mut ctx| {
2933                    let order = Arc::clone(&order);
2934                    let inp_name = Arc::clone(&inp_name);
2935                    let outp_name = Arc::clone(&outp_name);
2936                    async move {
2937                        let v: i32 = *ctx.input::<i32>(&inp_name)?;
2938                        order.lock().unwrap().push(id);
2939                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2940                        ctx.output(&outp_name, v)?;
2941                        Ok(ctx)
2942                    }
2943                }))
2944                .build()
2945        };
2946
2947        let t1 = make_chain("t1", &p1, &p2, 1, Arc::clone(&order));
2948        let t2 = make_chain("t2", &p2, &p3, 2, Arc::clone(&order));
2949        let t3 = make_chain("t3", &p3, &p4, 3, Arc::clone(&order));
2950
2951        let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
2952        let mut marking = Marking::new();
2953        marking.add(&p1, Token::at(1, 0));
2954
2955        let mut executor =
2956            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2957
2958        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2959        executor.run_async(rx).await;
2960
2961        assert_eq!(executor.marking().count("p4"), 1);
2962        let recorded = order.lock().unwrap().clone();
2963        assert_eq!(recorded, vec![1, 2, 3], "chain should execute in order");
2964    }
2965
2966    #[tokio::test]
2967    async fn async_fork_join() {
2968        use libpetri_core::output::and;
2969
2970        let p1 = Place::<i32>::new("p1");
2971        let p2 = Place::<i32>::new("p2");
2972        let p3 = Place::<i32>::new("p3");
2973        let p4 = Place::<i32>::new("p4");
2974
2975        // Fork: p1 -> (p2, p3) via AND output
2976        let t_fork = Transition::builder("fork")
2977            .input(one(&p1))
2978            .output(and(vec![out_place(&p2), out_place(&p3)]))
2979            .action(libpetri_core::action::sync_action(|ctx| {
2980                let v = ctx.input::<i32>("p1")?;
2981                ctx.output("p2", *v)?;
2982                ctx.output("p3", *v)?;
2983                Ok(())
2984            }))
2985            .build();
2986
2987        // Join: (p2, p3) -> p4
2988        let t_join = Transition::builder("join")
2989            .input(one(&p2))
2990            .input(one(&p3))
2991            .output(out_place(&p4))
2992            .action(libpetri_core::action::sync_action(|ctx| {
2993                let a = ctx.input::<i32>("p2")?;
2994                let b = ctx.input::<i32>("p3")?;
2995                ctx.output("p4", *a + *b)?;
2996                Ok(())
2997            }))
2998            .build();
2999
3000        let net = PetriNet::builder("fork_join")
3001            .transitions([t_fork, t_join])
3002            .build();
3003
3004        let mut marking = Marking::new();
3005        marking.add(&p1, Token::at(5, 0));
3006
3007        let mut executor =
3008            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3009
3010        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3011        executor.run_async(rx).await;
3012
3013        assert_eq!(executor.marking().count("p2"), 0);
3014        assert_eq!(executor.marking().count("p3"), 0);
3015        assert_eq!(executor.marking().count("p4"), 1);
3016        assert_eq!(*executor.marking().peek(&p4).unwrap(), 10);
3017    }
3018
3019    #[tokio::test]
3020    async fn async_xor_output_branching() {
3021        use libpetri_core::output::xor;
3022
3023        let p = Place::<i32>::new("p");
3024        let left = Place::<i32>::new("left");
3025        let right = Place::<i32>::new("right");
3026
3027        let t = Transition::builder("xor_t")
3028            .input(one(&p))
3029            .output(xor(vec![out_place(&left), out_place(&right)]))
3030            .action(libpetri_core::action::sync_action(|ctx| {
3031                let v = ctx.input::<i32>("p")?;
3032                if *v > 0 {
3033                    ctx.output("left", *v)?;
3034                } else {
3035                    ctx.output("right", *v)?;
3036                }
3037                Ok(())
3038            }))
3039            .build();
3040
3041        let net = PetriNet::builder("xor_test").transition(t).build();
3042
3043        // Test positive value goes left
3044        let mut marking = Marking::new();
3045        marking.add(&p, Token::at(42, 0));
3046
3047        let mut executor =
3048            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3049        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3050        executor.run_async(rx).await;
3051
3052        assert_eq!(executor.marking().count("left"), 1);
3053        assert_eq!(executor.marking().count("right"), 0);
3054        assert_eq!(*executor.marking().peek(&left).unwrap(), 42);
3055    }
3056
3057    #[tokio::test]
3058    async fn async_loop_with_guard() {
3059        use libpetri_core::input::one_guarded;
3060
3061        let counter = Place::<i32>::new("counter");
3062        let done = Place::<i32>::new("done");
3063
3064        // Loop transition: counter -> counter (increment), guarded to fire when < 3
3065        let t_loop = Transition::builder("loop")
3066            .input(one_guarded(&counter, |v: &i32| *v < 3))
3067            .output(out_place(&counter))
3068            .action(libpetri_core::action::sync_action(|ctx| {
3069                let v = ctx.input::<i32>("counter")?;
3070                ctx.output("counter", *v + 1)?;
3071                Ok(())
3072            }))
3073            .build();
3074
3075        // Exit transition: counter -> done, guarded to fire when >= 3
3076        let t_exit = Transition::builder("exit")
3077            .input(one_guarded(&counter, |v: &i32| *v >= 3))
3078            .output(out_place(&done))
3079            .action(fork())
3080            .build();
3081
3082        let net = PetriNet::builder("loop_net")
3083            .transitions([t_loop, t_exit])
3084            .build();
3085
3086        let mut marking = Marking::new();
3087        marking.add(&counter, Token::at(0, 0));
3088
3089        let mut executor =
3090            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3091
3092        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3093        executor.run_async(rx).await;
3094
3095        assert_eq!(executor.marking().count("done"), 1);
3096        assert_eq!(*executor.marking().peek(&done).unwrap(), 3);
3097    }
3098
3099    #[tokio::test]
3100    async fn async_delayed_fires_without_injection() {
3101        use libpetri_core::timing::delayed;
3102
3103        let p1 = Place::<i32>::new("p1");
3104        let p2 = Place::<i32>::new("p2");
3105
3106        let t1 = Transition::builder("t1")
3107            .input(one(&p1))
3108            .output(out_place(&p2))
3109            .timing(delayed(100))
3110            .action(fork())
3111            .build();
3112
3113        let net = PetriNet::builder("test").transition(t1).build();
3114        let mut marking = Marking::new();
3115        marking.add(&p1, Token::at(7, 0));
3116
3117        let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3118            &net,
3119            marking,
3120            ExecutorOptions {
3121                long_running: true,
3122                ..Default::default()
3123            },
3124        );
3125
3126        // No injections; keep channel open long enough for the delayed transition to fire
3127        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3128        tokio::spawn(async move {
3129            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3130            drop(tx);
3131        });
3132        executor.run_async(rx).await;
3133
3134        assert_eq!(executor.marking().count("p2"), 1);
3135        assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3136    }
3137
3138    #[tokio::test]
3139    #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3140    async fn async_timeout_produces_timeout_token() {
3141        use libpetri_core::output::{timeout_place, xor};
3142
3143        let p1 = Place::<i32>::new("p1");
3144        let success = Place::<i32>::new("success");
3145        let timeout_out = Place::<i32>::new("timeout_out");
3146
3147        let t1 = Transition::builder("t1")
3148            .input(one(&p1))
3149            .output(xor(vec![
3150                out_place(&success),
3151                timeout_place(50, &timeout_out),
3152            ]))
3153            .action(async_action(|mut ctx| async move {
3154                let v: i32 = *ctx.input::<i32>("p1")?;
3155                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3156                ctx.output("success", v)?;
3157                Ok(ctx)
3158            }))
3159            .build();
3160
3161        let net = PetriNet::builder("test").transition(t1).build();
3162        let mut marking = Marking::new();
3163        marking.add(&p1, Token::at(1, 0));
3164
3165        let mut executor =
3166            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3167
3168        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3169        executor.run_async(rx).await;
3170
3171        assert_eq!(executor.marking().count("timeout_out"), 1);
3172        assert_eq!(executor.marking().count("success"), 0);
3173    }
3174
3175    #[tokio::test]
3176    #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3177    async fn async_timeout_normal_when_fast() {
3178        use libpetri_core::output::{timeout_place, xor};
3179
3180        let p1 = Place::<i32>::new("p1");
3181        let success = Place::<i32>::new("success");
3182        let timeout_out = Place::<i32>::new("timeout_out");
3183
3184        let t1 = Transition::builder("t1")
3185            .input(one(&p1))
3186            .output(xor(vec![
3187                out_place(&success),
3188                timeout_place(500, &timeout_out),
3189            ]))
3190            .action(async_action(|mut ctx| async move {
3191                let v: i32 = *ctx.input::<i32>("p1")?;
3192                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3193                ctx.output("success", v)?;
3194                Ok(ctx)
3195            }))
3196            .build();
3197
3198        let net = PetriNet::builder("test").transition(t1).build();
3199        let mut marking = Marking::new();
3200        marking.add(&p1, Token::at(1, 0));
3201
3202        let mut executor =
3203            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3204
3205        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3206        executor.run_async(rx).await;
3207
3208        assert_eq!(executor.marking().count("success"), 1);
3209        assert_eq!(executor.marking().count("timeout_out"), 0);
3210    }
3211
3212    #[tokio::test]
3213    async fn async_event_store_records_token_added_for_injection() {
3214        let p1 = Place::<i32>::new("p1");
3215        let p2 = Place::<i32>::new("p2");
3216
3217        let t1 = Transition::builder("t1")
3218            .input(one(&p1))
3219            .output(out_place(&p2))
3220            .action(fork())
3221            .build();
3222
3223        let net = PetriNet::builder("test").transition(t1).build();
3224        let marking = Marking::new();
3225
3226        let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
3227            &net,
3228            marking,
3229            ExecutorOptions {
3230                long_running: true,
3231                ..Default::default()
3232            },
3233        );
3234
3235        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3236
3237        tokio::spawn(async move {
3238            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3239            let token = Token::at(99, 0);
3240            tx.send(ExternalEvent {
3241                place_name: Arc::from("p1"),
3242                token: ErasedToken::from_typed(&token),
3243            })
3244            .unwrap();
3245        });
3246
3247        executor.run_async(rx).await;
3248
3249        let events = executor.event_store().events();
3250        // Should have TokenAdded for the injected token into p1
3251        assert!(
3252            events.iter().any(
3253                |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p1")
3254            ),
3255            "expected TokenAdded event for injected token into p1"
3256        );
3257        // And also TokenAdded for the output into p2
3258        assert!(
3259            events.iter().any(
3260                |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p2")
3261            ),
3262            "expected TokenAdded event for output token into p2"
3263        );
3264    }
3265
3266    #[tokio::test]
3267    async fn async_inhibitor_blocks_in_async() {
3268        let p1 = Place::<()>::new("p1");
3269        let p2 = Place::<()>::new("p2");
3270        let p_inh = Place::<()>::new("inh");
3271
3272        let t = Transition::builder("t1")
3273            .input(one(&p1))
3274            .output(out_place(&p2))
3275            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
3276            .action(fork())
3277            .build();
3278
3279        let net = PetriNet::builder("inhibitor").transition(t).build();
3280
3281        let mut marking = Marking::new();
3282        marking.add(&p1, Token::at((), 0));
3283        marking.add(&p_inh, Token::at((), 0));
3284
3285        let mut executor =
3286            BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3287
3288        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3289        executor.run_async(rx).await;
3290
3291        // Inhibitor should block — token remains in p1, nothing in p2
3292        assert_eq!(executor.marking().count("p1"), 1);
3293        assert_eq!(executor.marking().count("p2"), 0);
3294    }
3295}