Skip to main content

libpetri_runtime/
precompiled_executor.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::token::ErasedToken;
8
9use libpetri_event::event_store::EventStore;
10use libpetri_event::net_event::NetEvent;
11
12use crate::bitmap;
13use crate::marking::Marking;
14use crate::precompiled_net::{
15    CONSUME_ALL, CONSUME_ATLEAST, CONSUME_N, CONSUME_ONE, PrecompiledNet, RESET,
16};
17
/// Tolerance, in milliseconds, for deadline enforcement to account for timer
/// jitter: a transition is only timed out once it has been enabled for longer
/// than its latest firing time plus this value.
const DEADLINE_TOLERANCE_MS: f64 = 5.0;

/// Initial capacity (number of token slots) of the ring buffer reserved for
/// each place. A place's ring is grown when a token is added while it is full.
const INITIAL_RING_CAPACITY: usize = 4;
23
/// High-performance precompiled flat-array Petri net executor.
///
/// Uses ring-buffer token storage, opcode-based consume operations,
/// priority-partitioned ready queues, and two-level summary bitmaps
/// for sparse iteration. Generic over `E: EventStore` for zero-cost
/// noop event recording.
///
/// Places and transitions are addressed by dense integer ids (`pid` / `tid`)
/// that index the per-place and per-transition vectors below.
pub struct PrecompiledNetExecutor<'a, E: EventStore> {
    /// The precompiled net being executed.
    program: &'a PrecompiledNet<'a>,
    /// Receives `NetEvent` records; appends are guarded by `E::ENABLED`.
    event_store: E,
    /// Environment places supplied through the builder.
    // NOTE(review): no read of this field is visible in this part of the file,
    // which is presumably why `dead_code` is allowed — confirm.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// When `true`, `should_terminate` never reports termination.
    // NOTE(review): `should_terminate` reads this field, so the `dead_code`
    // allowance may no longer be needed — confirm.
    #[allow(dead_code)]
    long_running: bool,
    /// Flag supplied through the builder's `skip_output_validation` setter.
    // NOTE(review): no read of this field is visible in this part of the file.
    #[allow(dead_code)]
    skip_output_validation: bool,

    // ==================== Flat Token Pool ====================
    /// Backing storage shared by all places' ring buffers; a slot is `None`
    /// while it holds no token.
    token_pool: Vec<Option<ErasedToken>>,
    /// Start index of each place's ring region inside `token_pool`.
    place_offset: Vec<usize>,
    /// Number of tokens currently stored for each place.
    token_counts: Vec<usize>,
    /// Per place: ring-relative index of the oldest token (next to be removed).
    ring_head: Vec<usize>,
    /// Per place: ring-relative index where the next token is written.
    ring_tail: Vec<usize>,
    /// Per place: current number of slots in its ring.
    ring_capacity: Vec<usize>,

    // ==================== Presence Bitmap ====================
    /// One bit per place; `initialize_marking_bitmap` sets the bit of every
    /// place that holds at least one token.
    marking_bitmap: Vec<u64>,

    // ==================== Transition State ====================
    /// One bit per transition, set while the transition is enabled.
    enabled_bitmap: Vec<u64>,
    /// One bit per transition whose enablement must be re-evaluated.
    dirty_bitmap: Vec<u64>,
    /// Scratch copy of `dirty_bitmap` filled and consumed by
    /// `update_dirty_transitions`.
    dirty_scan_buffer: Vec<u64>,
    /// Per transition: time (ms, as returned by `elapsed_ms`) at which its
    /// enablement clock started, or `f64::NEG_INFINITY` while not enabled.
    enabled_at_ms: Vec<f64>,
    /// Number of transitions currently enabled; updated by hand alongside
    /// every change to `enabled_bitmap`.
    enabled_transition_count: usize,

    // ==================== Summary Bitmaps (two-level) ====================
    /// One bit per word of `dirty_bitmap`, set when that word may be non-zero.
    dirty_word_summary: Vec<u64>,
    /// One bit per word of `enabled_bitmap`, set while that word is non-zero.
    enabled_word_summary: Vec<u64>,
    /// Number of `u64` words in the per-transition bitmaps.
    transition_words: usize,
    /// Number of `u64` words in the summary bitmaps.
    summary_words: usize,

    // ==================== Priority-Partitioned Ready Queues ====================
    /// One circular buffer of transition ids per distinct priority.
    ready_queues: Vec<Vec<usize>>,
    /// Per queue: index of the next transition id to pop.
    ready_queue_head: Vec<usize>,
    /// Per queue: index where the next transition id is written.
    ready_queue_tail: Vec<usize>,
    /// Per queue: number of transition ids currently queued.
    ready_queue_size: Vec<usize>,

    // ==================== Reset-Clock Detection ====================
    /// One bit per place, set when a reset arc has cleared that place;
    /// zeroed again by `clear_pending_resets`.
    pending_reset_words: Vec<u64>,
    /// `true` while any bit in `pending_reset_words` may be set.
    has_pending_resets: bool,

    // ==================== Reusable Buffers (avoid per-firing allocation) ====================
    /// Consumed input tokens keyed by place name; cleared at the start of
    /// each firing.
    reusable_inputs: HashMap<Arc<str>, Vec<ErasedToken>>,
    /// Companion buffer to `reusable_inputs`, also cleared at the start of
    /// each firing.
    // NOTE(review): presumably holds tokens from read arcs — its population is
    // not visible in this part of the file.
    reusable_reads: HashMap<Arc<str>, Vec<ErasedToken>>,

    // ==================== Lifecycle ====================
    /// Instant captured when the executor was constructed.
    start_time: Instant,
}
81
/// Builder for PrecompiledNetExecutor.
///
/// Obtained from `PrecompiledNetExecutor::builder`; each setter consumes and
/// returns the builder, and `build` produces the executor.
pub struct PrecompiledExecutorBuilder<'a, E: EventStore> {
    /// The precompiled net the executor will run.
    program: &'a PrecompiledNet<'a>,
    /// Marking used to seed the executor's token storage.
    initial_marking: Marking,
    /// Explicit event store; when `None`, `build` falls back to `E::default()`.
    event_store: Option<E>,
    /// Environment places handed to the executor.
    environment_places: HashSet<Arc<str>>,
    /// Whether the executor is built in long-running mode.
    long_running: bool,
    /// Whether the executor is built with output validation skipped.
    skip_output_validation: bool,
}
91
92impl<'a, E: EventStore> PrecompiledExecutorBuilder<'a, E> {
93    /// Sets the event store.
94    pub fn event_store(mut self, store: E) -> Self {
95        self.event_store = Some(store);
96        self
97    }
98
99    /// Sets the environment places.
100    pub fn environment_places(mut self, places: HashSet<Arc<str>>) -> Self {
101        self.environment_places = places;
102        self
103    }
104
105    /// Enables long-running mode.
106    pub fn long_running(mut self, enabled: bool) -> Self {
107        self.long_running = enabled;
108        self
109    }
110
111    /// Skips output validation for trusted transition actions.
112    pub fn skip_output_validation(mut self, skip: bool) -> Self {
113        self.skip_output_validation = skip;
114        self
115    }
116
117    /// Builds the executor.
118    pub fn build(self) -> PrecompiledNetExecutor<'a, E> {
119        PrecompiledNetExecutor::new_inner(
120            self.program,
121            self.initial_marking,
122            self.event_store.unwrap_or_default(),
123            self.environment_places,
124            self.long_running,
125            self.skip_output_validation,
126        )
127    }
128}
129
130impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
    /// Creates a builder for a PrecompiledNetExecutor.
    ///
    /// The returned builder has no event store selected, an empty set of
    /// environment places, and both `long_running` and
    /// `skip_output_validation` set to `false`.
    pub fn builder(
        program: &'a PrecompiledNet<'a>,
        initial_marking: Marking,
    ) -> PrecompiledExecutorBuilder<'a, E> {
        PrecompiledExecutorBuilder {
            program,
            initial_marking,
            event_store: None,
            environment_places: HashSet::new(),
            long_running: false,
            skip_output_validation: false,
        }
    }
145
146    /// Creates a new executor with default options.
147    pub fn new(program: &'a PrecompiledNet<'a>, initial_marking: Marking) -> Self {
148        Self::new_inner(
149            program,
150            initial_marking,
151            E::default(),
152            HashSet::new(),
153            false,
154            false,
155        )
156    }
157
158    fn new_inner(
159        program: &'a PrecompiledNet<'a>,
160        initial_marking: Marking,
161        event_store: E,
162        environment_places: HashSet<Arc<str>>,
163        long_running: bool,
164        skip_output_validation: bool,
165    ) -> Self {
166        let pc = program.place_count();
167        let tc = program.transition_count();
168        let wc = program.word_count();
169
170        // Initialize flat token pool
171        let total_slots = pc * INITIAL_RING_CAPACITY;
172        let mut token_pool = vec![None; total_slots];
173        let mut place_offset = vec![0usize; pc];
174        let mut token_counts = vec![0usize; pc];
175        let mut ring_head = vec![0usize; pc];
176        let mut ring_tail = vec![0usize; pc];
177        let mut ring_capacity = vec![INITIAL_RING_CAPACITY; pc];
178
179        for (pid, offset) in place_offset.iter_mut().enumerate() {
180            *offset = pid * INITIAL_RING_CAPACITY;
181        }
182
183        // Load initial tokens into ring buffers
184        for pid in 0..pc {
185            let place = program.place(pid);
186            if let Some(queue) = initial_marking.queue(place.name()) {
187                for token in queue {
188                    // Ring add last
189                    if token_counts[pid] == ring_capacity[pid] {
190                        grow_ring_static(
191                            &mut token_pool,
192                            &mut place_offset,
193                            &mut ring_head,
194                            &mut ring_tail,
195                            &mut ring_capacity,
196                            &token_counts,
197                            pid,
198                        );
199                    }
200                    let tail = ring_tail[pid];
201                    let offset = place_offset[pid];
202                    token_pool[offset + tail] = Some(token.clone());
203                    ring_tail[pid] = (tail + 1) % ring_capacity[pid];
204                    token_counts[pid] += 1;
205                }
206            }
207        }
208
209        // Transition bitmaps
210        let transition_words = bitmap::word_count(tc);
211        let summary_words = bitmap::word_count(transition_words);
212
213        // Priority-partitioned ready queues
214        let prio_count = program.distinct_priority_count;
215        let queue_cap = tc.max(4);
216        let ready_queues = vec![vec![0usize; queue_cap]; prio_count];
217        let ready_queue_head = vec![0usize; prio_count];
218        let ready_queue_tail = vec![0usize; prio_count];
219        let ready_queue_size = vec![0usize; prio_count];
220
221        Self {
222            program,
223            event_store,
224            environment_places,
225            long_running,
226            skip_output_validation,
227            token_pool,
228            place_offset,
229            token_counts,
230            ring_head,
231            ring_tail,
232            ring_capacity,
233            marking_bitmap: vec![0u64; wc],
234            enabled_bitmap: vec![0u64; transition_words],
235            dirty_bitmap: vec![0u64; transition_words],
236            dirty_scan_buffer: vec![0u64; transition_words],
237            enabled_at_ms: vec![f64::NEG_INFINITY; tc],
238            enabled_transition_count: 0,
239            dirty_word_summary: vec![0u64; summary_words],
240            enabled_word_summary: vec![0u64; summary_words],
241            transition_words,
242            summary_words,
243            ready_queues,
244            ready_queue_head,
245            ready_queue_tail,
246            ready_queue_size,
247            pending_reset_words: vec![0u64; wc],
248            has_pending_resets: false,
249            reusable_inputs: HashMap::new(),
250            reusable_reads: HashMap::new(),
251            start_time: Instant::now(),
252        }
253    }
254
255    // ==================== Public API ====================
256
    /// Runs the executor synchronously until completion.
    /// Returns the final marking (materialized from ring buffers).
    ///
    /// Thin public entry point: all work is done by `run_to_completion`.
    pub fn run_sync(&mut self) -> Marking {
        self.run_to_completion()
    }
262
    /// Returns the materialized marking (synced from ring buffers).
    ///
    /// A new `Marking` value is produced by `materialize_marking` on every
    /// call; the executor itself is not modified.
    pub fn marking(&self) -> Marking {
        self.materialize_marking()
    }
267
    /// Returns a shared reference to the event store owned by this executor.
    pub fn event_store(&self) -> &E {
        &self.event_store
    }
272
    /// Returns true if the executor is quiescent, i.e. no transition is
    /// currently counted as enabled (`enabled_transition_count` is zero).
    pub fn is_quiescent(&self) -> bool {
        self.enabled_transition_count == 0
    }
277
278    // ==================== Ring Buffer Operations ====================
279
280    #[inline]
281    fn ring_remove_first(&mut self, pid: usize) -> ErasedToken {
282        let head = self.ring_head[pid];
283        let offset = self.place_offset[pid];
284        let token = self.token_pool[offset + head].take().unwrap();
285        self.ring_head[pid] = (head + 1) % self.ring_capacity[pid];
286        self.token_counts[pid] -= 1;
287        token
288    }
289
290    #[inline]
291    fn ring_add_last(&mut self, pid: usize, token: ErasedToken) {
292        if self.token_counts[pid] == self.ring_capacity[pid] {
293            self.grow_ring(pid);
294        }
295        let tail = self.ring_tail[pid];
296        let offset = self.place_offset[pid];
297        self.token_pool[offset + tail] = Some(token);
298        self.ring_tail[pid] = (tail + 1) % self.ring_capacity[pid];
299        self.token_counts[pid] += 1;
300    }
301
302    #[inline]
303    fn ring_peek_first(&self, pid: usize) -> Option<&ErasedToken> {
304        if self.token_counts[pid] == 0 {
305            return None;
306        }
307        self.token_pool[self.place_offset[pid] + self.ring_head[pid]].as_ref()
308    }
309
310    fn ring_remove_all(&mut self, pid: usize) -> Vec<ErasedToken> {
311        let count = self.token_counts[pid];
312        if count == 0 {
313            return Vec::new();
314        }
315        let mut result = Vec::with_capacity(count);
316        for _ in 0..count {
317            result.push(self.ring_remove_first(pid));
318        }
319        result
320    }
321
    /// Grows the ring buffer of place `pid`.
    ///
    /// Delegates to `grow_ring_static`, handing it the token pool and the
    /// per-place offset/head/tail/capacity vectors mutably and `token_counts`
    /// read-only.
    fn grow_ring(&mut self, pid: usize) {
        grow_ring_static(
            &mut self.token_pool,
            &mut self.place_offset,
            &mut self.ring_head,
            &mut self.ring_tail,
            &mut self.ring_capacity,
            &self.token_counts,
            pid,
        );
    }
333
334    // ==================== Bitmap Helpers ====================
335
336    #[inline]
337    fn set_enabled_bit(&mut self, tid: usize) {
338        let w = tid >> bitmap::WORD_SHIFT;
339        self.enabled_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
340        self.enabled_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
341    }
342
343    #[inline]
344    fn clear_enabled_bit(&mut self, tid: usize) {
345        let w = tid >> bitmap::WORD_SHIFT;
346        self.enabled_bitmap[w] &= !(1u64 << (tid & bitmap::WORD_MASK));
347        if self.enabled_bitmap[w] == 0 {
348            self.enabled_word_summary[w >> bitmap::WORD_SHIFT] &=
349                !(1u64 << (w & bitmap::WORD_MASK));
350        }
351    }
352
353    #[inline]
354    fn is_enabled(&self, tid: usize) -> bool {
355        (self.enabled_bitmap[tid >> bitmap::WORD_SHIFT] & (1u64 << (tid & bitmap::WORD_MASK))) != 0
356    }
357
    /// Sets the bit for place `pid` in the marking (token presence) bitmap.
    #[inline]
    fn set_marking_bit(&mut self, pid: usize) {
        bitmap::set_bit(&mut self.marking_bitmap, pid);
    }
362
    /// Clears the bit for place `pid` in the marking (token presence) bitmap.
    #[inline]
    fn clear_marking_bit(&mut self, pid: usize) {
        bitmap::clear_bit(&mut self.marking_bitmap, pid);
    }
367
368    // ==================== Ready Queue Operations ====================
369
370    fn ready_queue_push(&mut self, tid: usize) {
371        let pi = self.program.transition_to_priority_index[tid];
372        if self.ready_queue_size[pi] == self.ready_queues[pi].len() {
373            let old_cap = self.ready_queues[pi].len();
374            let new_cap = old_cap * 2;
375            let mut new_queue = vec![0usize; new_cap];
376            let head = self.ready_queue_head[pi];
377            for (i, slot) in new_queue.iter_mut().enumerate().take(old_cap) {
378                *slot = self.ready_queues[pi][(head + i) % old_cap];
379            }
380            self.ready_queues[pi] = new_queue;
381            self.ready_queue_head[pi] = 0;
382            self.ready_queue_tail[pi] = old_cap;
383        }
384        let tail = self.ready_queue_tail[pi];
385        self.ready_queues[pi][tail] = tid;
386        self.ready_queue_tail[pi] = (tail + 1) % self.ready_queues[pi].len();
387        self.ready_queue_size[pi] += 1;
388    }
389
390    fn ready_queue_pop(&mut self, pi: usize) -> usize {
391        let head = self.ready_queue_head[pi];
392        let tid = self.ready_queues[pi][head];
393        self.ready_queue_head[pi] = (head + 1) % self.ready_queues[pi].len();
394        self.ready_queue_size[pi] -= 1;
395        tid
396    }
397
398    fn clear_all_ready_queues(&mut self) {
399        for pi in 0..self.program.distinct_priority_count {
400            self.ready_queue_head[pi] = 0;
401            self.ready_queue_tail[pi] = 0;
402            self.ready_queue_size[pi] = 0;
403        }
404    }
405
406    // ==================== Initialize ====================
407
408    fn initialize_marking_bitmap(&mut self) {
409        for pid in 0..self.program.place_count() {
410            if self.token_counts[pid] > 0 {
411                self.set_marking_bit(pid);
412            }
413        }
414    }
415
416    fn mark_all_dirty(&mut self) {
417        let tc = self.program.transition_count();
418        let last_word_bits = tc & bitmap::WORD_MASK;
419        for w in 0..self.transition_words.saturating_sub(1) {
420            self.dirty_bitmap[w] = u64::MAX;
421        }
422        if self.transition_words > 0 {
423            self.dirty_bitmap[self.transition_words - 1] = if last_word_bits == 0 {
424                u64::MAX
425            } else {
426                (1u64 << last_word_bits) - 1
427            };
428        }
429        // Set all summary bits
430        for s in 0..self.summary_words {
431            let first_w = s << bitmap::WORD_SHIFT;
432            let last_w = (first_w + bitmap::WORD_MASK).min(self.transition_words.saturating_sub(1));
433            let count = last_w - first_w + 1;
434            let last_bits = count & bitmap::WORD_MASK;
435            self.dirty_word_summary[s] = if last_bits == 0 {
436                u64::MAX
437            } else {
438                (1u64 << last_bits) - 1
439            };
440        }
441    }
442
443    fn should_terminate(&self) -> bool {
444        if self.long_running {
445            return false;
446        }
447        self.enabled_transition_count == 0
448    }
449
450    // ==================== Dirty Set Processing ====================
451
452    fn update_dirty_transitions(&mut self) {
453        let now_ms = self.elapsed_ms();
454
455        // Snapshot and clear dirty bitmap using summary
456        for s in 0..self.summary_words {
457            let mut summary = self.dirty_word_summary[s];
458            self.dirty_word_summary[s] = 0;
459            while summary != 0 {
460                let local_w = summary.trailing_zeros() as usize;
461                summary &= summary - 1;
462                let w = (s << bitmap::WORD_SHIFT) | local_w;
463                if w < self.transition_words {
464                    self.dirty_scan_buffer[w] = self.dirty_bitmap[w];
465                    self.dirty_bitmap[w] = 0;
466                }
467            }
468        }
469
470        // Process dirty transitions
471        let tc = self.program.transition_count();
472        for w in 0..self.transition_words {
473            let mut word = self.dirty_scan_buffer[w];
474            if word == 0 {
475                continue;
476            }
477            self.dirty_scan_buffer[w] = 0;
478            while word != 0 {
479                let bit = word.trailing_zeros() as usize;
480                let tid = (w << bitmap::WORD_SHIFT) | bit;
481                word &= word - 1;
482
483                if tid >= tc {
484                    break;
485                }
486
487                let was_enabled = self.is_enabled(tid);
488                let can_now = self.can_enable(tid);
489
490                if can_now && !was_enabled {
491                    self.set_enabled_bit(tid);
492                    self.enabled_transition_count += 1;
493                    self.enabled_at_ms[tid] = now_ms;
494
495                    if E::ENABLED {
496                        self.event_store.append(NetEvent::TransitionEnabled {
497                            transition_name: Arc::clone(self.program.transition(tid).name_arc()),
498                            timestamp: now_millis(),
499                        });
500                    }
501                } else if !can_now && was_enabled {
502                    self.clear_enabled_bit(tid);
503                    self.enabled_transition_count -= 1;
504                    self.enabled_at_ms[tid] = f64::NEG_INFINITY;
505                } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
506                    self.enabled_at_ms[tid] = now_ms;
507                    if E::ENABLED {
508                        self.event_store.append(NetEvent::TransitionClockRestarted {
509                            transition_name: Arc::clone(self.program.transition(tid).name_arc()),
510                            timestamp: now_millis(),
511                        });
512                    }
513                }
514            }
515        }
516
517        self.clear_pending_resets();
518    }
519
520    fn can_enable(&self, tid: usize) -> bool {
521        if !self.program.can_enable_bitmap(tid, &self.marking_bitmap) {
522            return false;
523        }
524
525        // Cardinality check using token_counts directly (no HashMap lookup)
526        if let Some(card_check) = self.program.compiled().cardinality_check(tid) {
527            for i in 0..card_check.place_ids.len() {
528                let pid = card_check.place_ids[i];
529                let required = card_check.required_counts[i];
530                if self.token_counts[pid] < required {
531                    return false;
532                }
533            }
534        }
535
536        // Guard check (needs token access)
537        if self.program.compiled().has_guards(tid) {
538            let t = self.program.transition(tid);
539            for spec in t.input_specs() {
540                if let Some(guard) = spec.guard() {
541                    let required = match spec {
542                        In::One { .. } => 1,
543                        In::Exactly { count, .. } => *count,
544                        In::AtLeast { minimum, .. } => *minimum,
545                        In::All { .. } => 1,
546                    };
547                    let pid = self.program.place_id(spec.place_name()).unwrap();
548                    let count = self.count_matching_in_ring(pid, &**guard);
549                    if count < required {
550                        return false;
551                    }
552                }
553            }
554        }
555
556        true
557    }
558
559    fn count_matching_in_ring(
560        &self,
561        pid: usize,
562        guard: &dyn Fn(&dyn std::any::Any) -> bool,
563    ) -> usize {
564        let count = self.token_counts[pid];
565        if count == 0 {
566            return 0;
567        }
568        let offset = self.place_offset[pid];
569        let head = self.ring_head[pid];
570        let cap = self.ring_capacity[pid];
571        let mut matched = 0;
572        for i in 0..count {
573            let idx = offset + (head + i) % cap;
574            if let Some(token) = &self.token_pool[idx]
575                && guard(token.value.as_ref())
576            {
577                matched += 1;
578            }
579        }
580        matched
581    }
582
583    fn has_input_from_reset_place(&self, tid: usize) -> bool {
584        if !self.has_pending_resets {
585            return false;
586        }
587        let input_mask = &self.program.input_place_mask_words[tid];
588        for (im, pr) in input_mask.iter().zip(self.pending_reset_words.iter()) {
589            if (im & pr) != 0 {
590                return true;
591            }
592        }
593        false
594    }
595
596    fn clear_pending_resets(&mut self) {
597        if self.has_pending_resets {
598            for w in &mut self.pending_reset_words {
599                *w = 0;
600            }
601            self.has_pending_resets = false;
602        }
603    }
604
605    // ==================== Deadline Enforcement ====================
606
607    fn enforce_deadlines(&mut self, now_ms: f64) {
608        for s in 0..self.summary_words {
609            let mut summary = self.enabled_word_summary[s];
610            while summary != 0 {
611                let local_w = summary.trailing_zeros() as usize;
612                summary &= summary - 1;
613                let w = (s << bitmap::WORD_SHIFT) | local_w;
614                if w >= self.transition_words {
615                    continue;
616                }
617                let mut word = self.enabled_bitmap[w];
618                while word != 0 {
619                    let bit = word.trailing_zeros() as usize;
620                    let tid = (w << bitmap::WORD_SHIFT) | bit;
621                    word &= word - 1;
622
623                    if !self.program.has_deadline[tid] {
624                        continue;
625                    }
626
627                    let elapsed = now_ms - self.enabled_at_ms[tid];
628                    let latest_ms = self.program.latest_ms[tid];
629
630                    if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
631                        self.clear_enabled_bit(tid);
632                        self.enabled_transition_count -= 1;
633                        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
634                        self.mark_transition_dirty(tid);
635
636                        if E::ENABLED {
637                            self.event_store.append(NetEvent::TransitionTimedOut {
638                                transition_name: Arc::clone(
639                                    self.program.transition(tid).name_arc(),
640                                ),
641                                timestamp: now_millis(),
642                            });
643                        }
644                    }
645                }
646            }
647        }
648    }
649
650    // ==================== Firing (Sync) ====================
651
652    fn fire_ready_immediate_sync(&mut self) {
653        for s in 0..self.summary_words {
654            let mut summary = self.enabled_word_summary[s];
655            while summary != 0 {
656                let local_w = summary.trailing_zeros() as usize;
657                summary &= summary - 1;
658                let w = (s << bitmap::WORD_SHIFT) | local_w;
659                if w >= self.transition_words {
660                    continue;
661                }
662                let word = self.enabled_bitmap[w];
663                let mut remaining = word;
664                while remaining != 0 {
665                    let bit = remaining.trailing_zeros() as usize;
666                    let tid = (w << bitmap::WORD_SHIFT) | bit;
667                    remaining &= remaining - 1;
668
669                    if self.can_enable(tid) {
670                        self.fire_transition_sync(tid);
671                    } else {
672                        self.clear_enabled_bit(tid);
673                        self.enabled_transition_count -= 1;
674                        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
675                    }
676                }
677            }
678        }
679    }
680
681    fn fire_ready_general_sync(&mut self, now_ms: f64) {
682        // Populate ready queues from enabled bitmap using summary
683        self.clear_all_ready_queues();
684
685        for s in 0..self.summary_words {
686            let mut summary = self.enabled_word_summary[s];
687            while summary != 0 {
688                let local_w = summary.trailing_zeros() as usize;
689                summary &= summary - 1;
690                let w = (s << bitmap::WORD_SHIFT) | local_w;
691                if w >= self.transition_words {
692                    continue;
693                }
694                let mut word = self.enabled_bitmap[w];
695                while word != 0 {
696                    let bit = word.trailing_zeros() as usize;
697                    let tid = (w << bitmap::WORD_SHIFT) | bit;
698                    word &= word - 1;
699
700                    let enabled_ms = self.enabled_at_ms[tid];
701                    let elapsed = now_ms - enabled_ms;
702
703                    if self.program.earliest_ms[tid] <= elapsed {
704                        self.ready_queue_push(tid);
705                    }
706                }
707            }
708        }
709
710        // Fire from highest priority queue first
711        for pi in 0..self.program.distinct_priority_count {
712            while self.ready_queue_size[pi] > 0 {
713                let tid = self.ready_queue_pop(pi);
714                if !self.is_enabled(tid) {
715                    continue;
716                }
717
718                if self.can_enable(tid) {
719                    self.fire_transition_sync(tid);
720                } else {
721                    self.clear_enabled_bit(tid);
722                    self.enabled_transition_count -= 1;
723                    self.enabled_at_ms[tid] = f64::NEG_INFINITY;
724                }
725            }
726        }
727    }
728
729    // ==================== Opcode-Based Consume ====================
730
731    fn fire_transition_sync(&mut self, tid: usize) {
732        let has_guards = self.program.compiled().has_guards(tid);
733        let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
734        let action = Arc::clone(self.program.transition(tid).action());
735
736        // Reuse pre-allocated input/read buffers (clear entries, keep HashMap capacity)
737        self.reusable_inputs.clear();
738        self.reusable_reads.clear();
739
740        if has_guards {
741            // Fall back to spec-based consumption for guarded transitions
742            let input_specs: Vec<In> = self.program.transition(tid).input_specs().to_vec();
743            let reset_arcs: Vec<_> = self.program.transition(tid).resets().to_vec();
744
745            for in_spec in &input_specs {
746                let pid = self.program.place_id(in_spec.place_name()).unwrap();
747                let place_name_arc = Arc::clone(&self.program.place_name_arcs[pid]);
748                let to_consume = match in_spec {
749                    In::One { .. } => 1,
750                    In::Exactly { count, .. } => *count,
751                    In::All { guard, .. } | In::AtLeast { guard, .. } => {
752                        if guard.is_some() {
753                            self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
754                        } else {
755                            self.token_counts[pid]
756                        }
757                    }
758                };
759
760                for _ in 0..to_consume {
761                    let token = if let Some(guard) = in_spec.guard() {
762                        self.ring_remove_matching(pid, &**guard)
763                    } else {
764                        Some(self.ring_remove_first(pid))
765                    };
766                    if let Some(token) = token {
767                        if E::ENABLED {
768                            self.event_store.append(NetEvent::TokenRemoved {
769                                place_name: Arc::clone(&place_name_arc),
770                                timestamp: now_millis(),
771                            });
772                        }
773                        self.reusable_inputs
774                            .entry(Arc::clone(&place_name_arc))
775                            .or_default()
776                            .push(token);
777                    }
778                }
779            }
780
781            // Reset arcs
782            for arc in &reset_arcs {
783                let pid = self.program.place_id(arc.place.name()).unwrap();
784                let removed = self.ring_remove_all(pid);
785                if E::ENABLED {
786                    for _ in &removed {
787                        self.event_store.append(NetEvent::TokenRemoved {
788                            place_name: Arc::clone(arc.place.name_arc()),
789                            timestamp: now_millis(),
790                        });
791                    }
792                }
793                self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
794                    1u64 << (pid & bitmap::WORD_MASK);
795                self.has_pending_resets = true;
796            }
797        } else {
798            // Fast path: opcode-based consumption (no guards, no clone)
799            let ops_len = self.program.consume_ops[tid].len();
800            let mut pc = 0;
801            while pc < ops_len {
802                let opcode = self.program.consume_ops[tid][pc];
803                pc += 1;
804                match opcode {
805                    CONSUME_ONE => {
806                        let pid = self.program.consume_ops[tid][pc] as usize;
807                        pc += 1;
808                        let token = self.ring_remove_first(pid);
809                        if E::ENABLED {
810                            self.event_store.append(NetEvent::TokenRemoved {
811                                place_name: Arc::clone(&self.program.place_name_arcs[pid]),
812                                timestamp: now_millis(),
813                            });
814                        }
815                        self.reusable_inputs
816                            .entry(Arc::clone(&self.program.place_name_arcs[pid]))
817                            .or_default()
818                            .push(token);
819                    }
820                    CONSUME_N => {
821                        let pid = self.program.consume_ops[tid][pc] as usize;
822                        pc += 1;
823                        let count = self.program.consume_ops[tid][pc] as usize;
824                        pc += 1;
825                        for _ in 0..count {
826                            let token = self.ring_remove_first(pid);
827                            if E::ENABLED {
828                                self.event_store.append(NetEvent::TokenRemoved {
829                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
830                                    timestamp: now_millis(),
831                                });
832                            }
833                            self.reusable_inputs
834                                .entry(Arc::clone(&self.program.place_name_arcs[pid]))
835                                .or_default()
836                                .push(token);
837                        }
838                    }
839                    CONSUME_ALL | CONSUME_ATLEAST => {
840                        let pid = self.program.consume_ops[tid][pc] as usize;
841                        pc += 1;
842                        if opcode == CONSUME_ATLEAST {
843                            pc += 1;
844                        }
845                        let count = self.token_counts[pid];
846                        for _ in 0..count {
847                            let token = self.ring_remove_first(pid);
848                            if E::ENABLED {
849                                self.event_store.append(NetEvent::TokenRemoved {
850                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
851                                    timestamp: now_millis(),
852                                });
853                            }
854                            self.reusable_inputs
855                                .entry(Arc::clone(&self.program.place_name_arcs[pid]))
856                                .or_default()
857                                .push(token);
858                        }
859                    }
860                    RESET => {
861                        let pid = self.program.consume_ops[tid][pc] as usize;
862                        pc += 1;
863                        let count = self.token_counts[pid];
864                        for _ in 0..count {
865                            let _token = self.ring_remove_first(pid);
866                            if E::ENABLED {
867                                self.event_store.append(NetEvent::TokenRemoved {
868                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
869                                    timestamp: now_millis(),
870                                });
871                            }
872                        }
873                        self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
874                            1u64 << (pid & bitmap::WORD_MASK);
875                        self.has_pending_resets = true;
876                    }
877                    _ => unreachable!("Unknown opcode: {opcode}"),
878                }
879            }
880        }
881
882        // Execute read program — iterate by index, no clone of ops Vec
883        let read_ops_len = self.program.read_ops[tid].len();
884        for i in 0..read_ops_len {
885            let rpid = self.program.read_ops[tid][i];
886            let token_clone = self.ring_peek_first(rpid).cloned();
887            if let Some(token) = token_clone {
888                let place_name = Arc::clone(&self.program.place_name_arcs[rpid]);
889                self.reusable_reads
890                    .entry(place_name)
891                    .or_default()
892                    .push(token);
893            }
894        }
895
896        // Update bitmap for consumed/reset places
897        self.update_bitmap_after_consumption(tid);
898
899        if E::ENABLED {
900            self.event_store.append(NetEvent::TransitionStarted {
901                transition_name: Arc::clone(&transition_name),
902                timestamp: now_millis(),
903            });
904        }
905
906        // Create context using precomputed output place names and reusable buffers
907        let inputs = std::mem::take(&mut self.reusable_inputs);
908        let reads = std::mem::take(&mut self.reusable_reads);
909        let mut ctx = TransitionContext::new(
910            Arc::clone(&transition_name),
911            inputs,
912            reads,
913            self.program.output_place_name_sets[tid].clone(),
914            None,
915        );
916
917        let result = action.run_sync(&mut ctx);
918
919        // Reclaim buffers for reuse — keeps HashMap bucket allocations alive
920        let returned_inputs = ctx.take_inputs();
921        let returned_reads = ctx.take_reads();
922
923        match result {
924            Ok(()) => {
925                let outputs = ctx.take_outputs();
926                self.process_outputs(tid, &transition_name, outputs);
927
928                if E::ENABLED {
929                    self.event_store.append(NetEvent::TransitionCompleted {
930                        transition_name: Arc::clone(&transition_name),
931                        timestamp: now_millis(),
932                    });
933                }
934            }
935            Err(err) => {
936                if E::ENABLED {
937                    self.event_store.append(NetEvent::TransitionFailed {
938                        transition_name: Arc::clone(&transition_name),
939                        error: err.message,
940                        timestamp: now_millis(),
941                    });
942                }
943            }
944        }
945
946        // Return reclaimed buffers for next firing
947        self.reusable_inputs = returned_inputs;
948        self.reusable_reads = returned_reads;
949
950        // Clear enabled status
951        self.clear_enabled_bit(tid);
952        self.enabled_transition_count -= 1;
953        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
954
955        // Mark this transition dirty for re-evaluation
956        self.mark_transition_dirty(tid);
957    }
958
959    /// Removes the first token matching `guard` from the ring buffer at `pid`.
960    /// Returns at most one token per call (In::One guard semantics).
961    fn ring_remove_matching(
962        &mut self,
963        pid: usize,
964        guard: &dyn Fn(&dyn std::any::Any) -> bool,
965    ) -> Option<ErasedToken> {
966        let count = self.token_counts[pid];
967        if count == 0 {
968            return None;
969        }
970        let offset = self.place_offset[pid];
971        let head = self.ring_head[pid];
972        let cap = self.ring_capacity[pid];
973
974        // Find first matching token
975        for i in 0..count {
976            let idx = offset + (head + i) % cap;
977            if let Some(token) = &self.token_pool[idx]
978                && guard(token.value.as_ref())
979            {
980                let token = self.token_pool[idx].take().unwrap();
981                // Compact the ring by shifting remaining elements
982                for j in i..count - 1 {
983                    let from = offset + (head + j + 1) % cap;
984                    let to = offset + (head + j) % cap;
985                    self.token_pool[to] = self.token_pool[from].take();
986                }
987                self.token_counts[pid] -= 1;
988                self.ring_tail[pid] = if self.ring_tail[pid] == 0 {
989                    cap - 1
990                } else {
991                    self.ring_tail[pid] - 1
992                };
993                return Some(token);
994            }
995        }
996        None
997    }
998
999    fn process_outputs(
1000        &mut self,
1001        _tid: usize,
1002        _transition_name: &Arc<str>,
1003        outputs: Vec<OutputEntry>,
1004    ) {
1005        for entry in outputs {
1006            if let Some(pid) = self.program.place_id(&entry.place_name) {
1007                self.ring_add_last(pid, entry.token);
1008                self.set_marking_bit(pid);
1009                self.mark_dirty(pid);
1010            }
1011
1012            if E::ENABLED {
1013                self.event_store.append(NetEvent::TokenAdded {
1014                    place_name: Arc::clone(&entry.place_name),
1015                    timestamp: now_millis(),
1016                });
1017            }
1018        }
1019    }
1020
1021    fn update_bitmap_after_consumption(&mut self, tid: usize) {
1022        let n = self.program.compiled().consumption_place_ids(tid).len();
1023        for i in 0..n {
1024            let pid = self.program.compiled().consumption_place_ids(tid)[i];
1025            if self.token_counts[pid] == 0 {
1026                self.clear_marking_bit(pid);
1027            }
1028            self.mark_dirty(pid);
1029        }
1030    }
1031
1032    // ==================== Dirty Set Helpers ====================
1033
1034    fn has_dirty_bits(&self) -> bool {
1035        for &s in &self.dirty_word_summary {
1036            if s != 0 {
1037                return true;
1038            }
1039        }
1040        false
1041    }
1042
1043    fn mark_dirty(&mut self, pid: usize) {
1044        let n = self.program.compiled().affected_transitions(pid).len();
1045        for i in 0..n {
1046            let tid = self.program.compiled().affected_transitions(pid)[i];
1047            self.mark_transition_dirty(tid);
1048        }
1049    }
1050
1051    fn mark_transition_dirty(&mut self, tid: usize) {
1052        let w = tid >> bitmap::WORD_SHIFT;
1053        self.dirty_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
1054        self.dirty_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
1055    }
1056
1057    fn elapsed_ms(&self) -> f64 {
1058        self.start_time.elapsed().as_secs_f64() * 1000.0
1059    }
1060
1061    // ==================== Marking Sync ====================
1062
1063    fn materialize_marking(&self) -> Marking {
1064        let mut marking = Marking::new();
1065        for pid in 0..self.program.place_count() {
1066            let count = self.token_counts[pid];
1067            if count == 0 {
1068                continue;
1069            }
1070            let place_name = self.program.place(pid).name_arc();
1071            let offset = self.place_offset[pid];
1072            let head = self.ring_head[pid];
1073            let cap = self.ring_capacity[pid];
1074            for i in 0..count {
1075                let idx = offset + (head + i) % cap;
1076                if let Some(token) = &self.token_pool[idx] {
1077                    marking.add_erased(place_name, token.clone());
1078                }
1079            }
1080        }
1081        marking
1082    }
1083
1084    /// Runs the executor synchronously and returns the final marking.
1085    fn run_to_completion(&mut self) -> Marking {
1086        self.initialize_marking_bitmap();
1087        self.mark_all_dirty();
1088
1089        if E::ENABLED {
1090            let now = now_millis();
1091            self.event_store.append(NetEvent::ExecutionStarted {
1092                net_name: Arc::from(self.program.net().name()),
1093                timestamp: now,
1094            });
1095        }
1096
1097        loop {
1098            self.update_dirty_transitions();
1099
1100            let cycle_now = self.elapsed_ms();
1101
1102            if self.program.any_deadlines {
1103                self.enforce_deadlines(cycle_now);
1104            }
1105
1106            if self.should_terminate() {
1107                break;
1108            }
1109
1110            if self.program.all_immediate && self.program.all_same_priority {
1111                self.fire_ready_immediate_sync();
1112            } else {
1113                self.fire_ready_general_sync(cycle_now);
1114            }
1115
1116            if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
1117                break;
1118            }
1119        }
1120
1121        if E::ENABLED {
1122            let now = now_millis();
1123            self.event_store.append(NetEvent::ExecutionCompleted {
1124                net_name: Arc::from(self.program.net().name()),
1125                timestamp: now,
1126            });
1127        }
1128
1129        self.materialize_marking()
1130    }
1131}
1132
1133// Async path
1134#[cfg(feature = "tokio")]
1135use crate::environment::ExternalEvent;
1136
/// Message sent from a spawned action task back to the executor loop once the
/// asynchronous action has finished.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished.
    transition_name: Arc<str>,
    /// Output entries taken from the completed context on success, or the
    /// action's error message on failure.
    result: Result<Vec<OutputEntry>, String>,
}
1142
1143#[cfg(feature = "tokio")]
1144impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
1145    /// Runs the executor asynchronously with tokio.
1146    pub async fn run_async(
1147        &mut self,
1148        mut event_rx: tokio::sync::mpsc::UnboundedReceiver<ExternalEvent>,
1149    ) -> Marking {
1150        let (completion_tx, mut completion_rx) =
1151            tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
1152
1153        self.initialize_marking_bitmap();
1154        self.mark_all_dirty();
1155
1156        let mut in_flight_count: usize = 0;
1157        let mut event_channel_open = true;
1158
1159        if E::ENABLED {
1160            let now = now_millis();
1161            self.event_store.append(NetEvent::ExecutionStarted {
1162                net_name: Arc::from(self.program.net().name()),
1163                timestamp: now,
1164            });
1165        }
1166
1167        loop {
1168            // Phase 1: Process completed async actions
1169            while let Ok(completion) = completion_rx.try_recv() {
1170                in_flight_count -= 1;
1171                match completion.result {
1172                    Ok(outputs) => {
1173                        self.process_outputs(0, &completion.transition_name, outputs);
1174                        if E::ENABLED {
1175                            self.event_store.append(NetEvent::TransitionCompleted {
1176                                transition_name: Arc::clone(&completion.transition_name),
1177                                timestamp: now_millis(),
1178                            });
1179                        }
1180                    }
1181                    Err(err) => {
1182                        if E::ENABLED {
1183                            self.event_store.append(NetEvent::TransitionFailed {
1184                                transition_name: Arc::clone(&completion.transition_name),
1185                                error: err,
1186                                timestamp: now_millis(),
1187                            });
1188                        }
1189                    }
1190                }
1191            }
1192
1193            // Phase 2: Process external events
1194            while let Ok(event) = event_rx.try_recv() {
1195                if let Some(pid) = self.program.place_id(&event.place_name) {
1196                    self.ring_add_last(pid, event.token);
1197                    self.set_marking_bit(pid);
1198                    self.mark_dirty(pid);
1199                }
1200                if E::ENABLED {
1201                    self.event_store.append(NetEvent::TokenAdded {
1202                        place_name: Arc::clone(&event.place_name),
1203                        timestamp: now_millis(),
1204                    });
1205                }
1206            }
1207
1208            // Phase 3: Update dirty transitions
1209            self.update_dirty_transitions();
1210
1211            // Phase 4: Enforce deadlines
1212            let cycle_now = self.elapsed_ms();
1213            if self.program.any_deadlines {
1214                self.enforce_deadlines(cycle_now);
1215            }
1216
1217            // Termination check
1218            if self.enabled_transition_count == 0
1219                && in_flight_count == 0
1220                && (!self.long_running || !event_channel_open)
1221            {
1222                break;
1223            }
1224
1225            // Phase 5: Fire ready transitions
1226            let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
1227
1228            if fired || self.has_dirty_bits() {
1229                tokio::task::yield_now().await;
1230                continue;
1231            }
1232
1233            // Phase 6: Await work
1234            if in_flight_count == 0 && !self.long_running {
1235                break;
1236            }
1237            if in_flight_count == 0 && !event_channel_open {
1238                break;
1239            }
1240
1241            let timer_ms = self.millis_until_next_timed_transition();
1242
1243            tokio::select! {
1244                Some(completion) = completion_rx.recv() => {
1245                    in_flight_count -= 1;
1246                    match completion.result {
1247                        Ok(outputs) => {
1248                            self.process_outputs(0, &completion.transition_name, outputs);
1249                            if E::ENABLED {
1250                                self.event_store.append(NetEvent::TransitionCompleted {
1251                                    transition_name: Arc::clone(&completion.transition_name),
1252                                    timestamp: now_millis(),
1253                                });
1254                            }
1255                        }
1256                        Err(err) => {
1257                            if E::ENABLED {
1258                                self.event_store.append(NetEvent::TransitionFailed {
1259                                    transition_name: Arc::clone(&completion.transition_name),
1260                                    error: err,
1261                                    timestamp: now_millis(),
1262                                });
1263                            }
1264                        }
1265                    }
1266                }
1267                result = event_rx.recv(), if event_channel_open => {
1268                    match result {
1269                        Some(event) => {
1270                            if let Some(pid) = self.program.place_id(&event.place_name) {
1271                                self.ring_add_last(pid, event.token);
1272                                self.set_marking_bit(pid);
1273                                self.mark_dirty(pid);
1274                            }
1275                            if E::ENABLED {
1276                                self.event_store.append(NetEvent::TokenAdded {
1277                                    place_name: Arc::clone(&event.place_name),
1278                                    timestamp: now_millis(),
1279                                });
1280                            }
1281                        }
1282                        None => {
1283                            event_channel_open = false;
1284                        }
1285                    }
1286                }
1287                _ = tokio::time::sleep(std::time::Duration::from_millis(
1288                    if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
1289                )) => {}
1290            }
1291        }
1292
1293        if E::ENABLED {
1294            let now = now_millis();
1295            self.event_store.append(NetEvent::ExecutionCompleted {
1296                net_name: Arc::from(self.program.net().name()),
1297                timestamp: now,
1298            });
1299        }
1300
1301        self.materialize_marking()
1302    }
1303
1304    fn fire_ready_async(
1305        &mut self,
1306        now_ms: f64,
1307        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1308        in_flight_count: &mut usize,
1309    ) -> bool {
1310        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
1311
1312        for s in 0..self.summary_words {
1313            let mut summary = self.enabled_word_summary[s];
1314            while summary != 0 {
1315                let local_w = summary.trailing_zeros() as usize;
1316                summary &= summary - 1;
1317                let w = (s << bitmap::WORD_SHIFT) | local_w;
1318                if w >= self.transition_words {
1319                    continue;
1320                }
1321                let mut word = self.enabled_bitmap[w];
1322                while word != 0 {
1323                    let bit = word.trailing_zeros() as usize;
1324                    let tid = (w << bitmap::WORD_SHIFT) | bit;
1325                    word &= word - 1;
1326
1327                    let enabled_ms = self.enabled_at_ms[tid];
1328                    let elapsed = now_ms - enabled_ms;
1329                    if self.program.earliest_ms[tid] <= elapsed {
1330                        ready.push((tid, self.program.priorities[tid], enabled_ms));
1331                    }
1332                }
1333            }
1334        }
1335
1336        if ready.is_empty() {
1337            return false;
1338        }
1339
1340        ready.sort_by(|a, b| {
1341            b.1.cmp(&a.1)
1342                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
1343        });
1344
1345        let mut fired_any = false;
1346        for (tid, _, _) in ready {
1347            if self.is_enabled(tid) && self.can_enable(tid) {
1348                self.fire_transition_async(tid, completion_tx, in_flight_count);
1349                fired_any = true;
1350            } else if self.is_enabled(tid) {
1351                self.clear_enabled_bit(tid);
1352                self.enabled_transition_count -= 1;
1353                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1354            }
1355        }
1356        fired_any
1357    }
1358
1359    fn fire_transition_async(
1360        &mut self,
1361        tid: usize,
1362        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1363        in_flight_count: &mut usize,
1364    ) {
1365        let t = self.program.transition(tid);
1366        let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
1367        let input_specs: Vec<In> = t.input_specs().to_vec();
1368        let read_arcs: Vec<_> = t.reads().to_vec();
1369        let reset_arcs: Vec<_> = t.resets().to_vec();
1370        let output_place_names = self.program.output_place_name_sets[tid].clone();
1371        let action = Arc::clone(t.action());
1372        let is_sync = action.is_sync();
1373
1374        // Consume tokens (using spec-based for simplicity in async path)
1375        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1376        for in_spec in &input_specs {
1377            let pid = self.program.place_id(in_spec.place_name()).unwrap();
1378            let to_consume = match in_spec {
1379                In::One { .. } => 1,
1380                In::Exactly { count, .. } => *count,
1381                In::All { guard, .. } | In::AtLeast { guard, .. } => {
1382                    if guard.is_some() {
1383                        self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
1384                    } else {
1385                        self.token_counts[pid]
1386                    }
1387                }
1388            };
1389
1390            let place_name_arc = Arc::clone(in_spec.place().name_arc());
1391            for _ in 0..to_consume {
1392                let token = if let Some(guard) = in_spec.guard() {
1393                    self.ring_remove_matching(pid, &**guard)
1394                } else {
1395                    Some(self.ring_remove_first(pid))
1396                };
1397                if let Some(token) = token {
1398                    if E::ENABLED {
1399                        self.event_store.append(NetEvent::TokenRemoved {
1400                            place_name: Arc::clone(&place_name_arc),
1401                            timestamp: now_millis(),
1402                        });
1403                    }
1404                    inputs
1405                        .entry(Arc::clone(&place_name_arc))
1406                        .or_default()
1407                        .push(token);
1408                }
1409            }
1410        }
1411
1412        // Read arcs
1413        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1414        for arc in &read_arcs {
1415            let rpid = self.program.place_id(arc.place.name()).unwrap();
1416            if let Some(token) = self.ring_peek_first(rpid) {
1417                read_tokens
1418                    .entry(Arc::clone(arc.place.name_arc()))
1419                    .or_default()
1420                    .push(token.clone());
1421            }
1422        }
1423
1424        // Reset arcs
1425        for arc in &reset_arcs {
1426            let pid = self.program.place_id(arc.place.name()).unwrap();
1427            let removed = self.ring_remove_all(pid);
1428            if E::ENABLED {
1429                for _ in &removed {
1430                    self.event_store.append(NetEvent::TokenRemoved {
1431                        place_name: Arc::clone(arc.place.name_arc()),
1432                        timestamp: now_millis(),
1433                    });
1434                }
1435            }
1436            self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
1437                1u64 << (pid & bitmap::WORD_MASK);
1438            self.has_pending_resets = true;
1439        }
1440
1441        self.update_bitmap_after_consumption(tid);
1442
1443        if E::ENABLED {
1444            self.event_store.append(NetEvent::TransitionStarted {
1445                transition_name: Arc::clone(&transition_name),
1446                timestamp: now_millis(),
1447            });
1448        }
1449
1450        // Clear enabled status
1451        self.clear_enabled_bit(tid);
1452        self.enabled_transition_count -= 1;
1453        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1454        self.mark_transition_dirty(tid);
1455
1456        if is_sync {
1457            let mut ctx = TransitionContext::new(
1458                Arc::clone(&transition_name),
1459                inputs,
1460                read_tokens,
1461                output_place_names,
1462                None,
1463            );
1464            let result = action.run_sync(&mut ctx);
1465            match result {
1466                Ok(()) => {
1467                    let outputs = ctx.take_outputs();
1468                    self.process_outputs(tid, &transition_name, outputs);
1469                    if E::ENABLED {
1470                        self.event_store.append(NetEvent::TransitionCompleted {
1471                            transition_name: Arc::clone(&transition_name),
1472                            timestamp: now_millis(),
1473                        });
1474                    }
1475                }
1476                Err(err) => {
1477                    if E::ENABLED {
1478                        self.event_store.append(NetEvent::TransitionFailed {
1479                            transition_name: Arc::clone(&transition_name),
1480                            error: err.message,
1481                            timestamp: now_millis(),
1482                        });
1483                    }
1484                }
1485            }
1486        } else {
1487            *in_flight_count += 1;
1488            let tx = completion_tx.clone();
1489            let name = Arc::clone(&transition_name);
1490            let ctx = TransitionContext::new(
1491                Arc::clone(&transition_name),
1492                inputs,
1493                read_tokens,
1494                output_place_names,
1495                None,
1496            );
1497            tokio::spawn(async move {
1498                let result = action.run_async(ctx).await;
1499                let completion = match result {
1500                    Ok(mut completed_ctx) => ActionCompletion {
1501                        transition_name: Arc::clone(&name),
1502                        result: Ok(completed_ctx.take_outputs()),
1503                    },
1504                    Err(err) => ActionCompletion {
1505                        transition_name: Arc::clone(&name),
1506                        result: Err(err.message),
1507                    },
1508                };
1509                let _ = tx.send(completion);
1510            });
1511        }
1512    }
1513
1514    fn millis_until_next_timed_transition(&self) -> f64 {
1515        let mut min_wait = f64::INFINITY;
1516        let now_ms = self.elapsed_ms();
1517
1518        for s in 0..self.summary_words {
1519            let mut summary = self.enabled_word_summary[s];
1520            while summary != 0 {
1521                let local_w = summary.trailing_zeros() as usize;
1522                summary &= summary - 1;
1523                let w = (s << bitmap::WORD_SHIFT) | local_w;
1524                if w >= self.transition_words {
1525                    continue;
1526                }
1527                let mut word = self.enabled_bitmap[w];
1528                while word != 0 {
1529                    let bit = word.trailing_zeros() as usize;
1530                    let tid = (w << bitmap::WORD_SHIFT) | bit;
1531                    word &= word - 1;
1532
1533                    let elapsed = now_ms - self.enabled_at_ms[tid];
1534                    let remaining_earliest = self.program.earliest_ms[tid] - elapsed;
1535                    if remaining_earliest <= 0.0 {
1536                        return 0.0;
1537                    }
1538                    min_wait = min_wait.min(remaining_earliest);
1539
1540                    if self.program.has_deadline[tid] {
1541                        let remaining_deadline = self.program.latest_ms[tid] - elapsed;
1542                        if remaining_deadline <= 0.0 {
1543                            return 0.0;
1544                        }
1545                        min_wait = min_wait.min(remaining_deadline);
1546                    }
1547                }
1548            }
1549        }
1550
1551        min_wait
1552    }
1553}
1554
1555// ==================== Static Helpers ====================
1556
1557fn grow_ring_static(
1558    token_pool: &mut Vec<Option<ErasedToken>>,
1559    place_offset: &mut [usize],
1560    ring_head: &mut [usize],
1561    ring_tail: &mut [usize],
1562    ring_capacity: &mut [usize],
1563    token_counts: &[usize],
1564    pid: usize,
1565) {
1566    let old_cap = ring_capacity[pid];
1567    let new_cap = old_cap * 2;
1568    let old_offset = place_offset[pid];
1569    let head = ring_head[pid];
1570    let count = token_counts[pid];
1571
1572    // Relocate to end of pool
1573    let new_offset = token_pool.len();
1574    token_pool.resize_with(new_offset + new_cap, || None);
1575
1576    // Copy ring contents linearized
1577    for i in 0..count {
1578        let old_idx = old_offset + (head + i) % old_cap;
1579        token_pool[new_offset + i] = token_pool[old_idx].take();
1580    }
1581
1582    place_offset[pid] = new_offset;
1583    ring_head[pid] = 0;
1584    ring_tail[pid] = count;
1585    ring_capacity[pid] = new_cap;
1586}
1587
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1594
1595#[cfg(test)]
1596mod tests {
1597    use super::*;
1598    use crate::compiled_net::CompiledNet;
1599    use libpetri_core::action::{fork, passthrough, sync_action};
1600    use libpetri_core::input::one;
1601    use libpetri_core::output::out_place;
1602    use libpetri_core::petri_net::PetriNet;
1603    use libpetri_core::place::Place;
1604    use libpetri_core::token::Token;
1605    use libpetri_core::transition::Transition;
1606    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1607
1608    fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1609        let p1 = Place::<i32>::new("p1");
1610        let p2 = Place::<i32>::new("p2");
1611        let p3 = Place::<i32>::new("p3");
1612
1613        let t1 = Transition::builder("t1")
1614            .input(one(&p1))
1615            .output(out_place(&p2))
1616            .action(passthrough())
1617            .build();
1618        let t2 = Transition::builder("t2")
1619            .input(one(&p2))
1620            .output(out_place(&p3))
1621            .action(passthrough())
1622            .build();
1623
1624        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1625        (net, p1, p2, p3)
1626    }
1627
1628    #[test]
1629    fn sync_passthrough_chain() {
1630        let (net, p1, _p2, _p3) = simple_chain();
1631        let compiled = CompiledNet::compile(&net);
1632        let prog = PrecompiledNet::from_compiled(&compiled);
1633
1634        let mut marking = Marking::new();
1635        marking.add(&p1, Token::at(42, 0));
1636
1637        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1638        let result = executor.run_to_completion();
1639
1640        assert_eq!(result.count("p1"), 0);
1641    }
1642
1643    #[test]
1644    fn sync_fork_chain() {
1645        let p1 = Place::<i32>::new("p1");
1646        let p2 = Place::<i32>::new("p2");
1647        let p3 = Place::<i32>::new("p3");
1648
1649        let t1 = Transition::builder("t1")
1650            .input(one(&p1))
1651            .output(libpetri_core::output::and(vec![
1652                out_place(&p2),
1653                out_place(&p3),
1654            ]))
1655            .action(fork())
1656            .build();
1657
1658        let net = PetriNet::builder("fork").transition(t1).build();
1659        let compiled = CompiledNet::compile(&net);
1660        let prog = PrecompiledNet::from_compiled(&compiled);
1661
1662        let mut marking = Marking::new();
1663        marking.add(&p1, Token::at(42, 0));
1664
1665        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1666        let result = executor.run_to_completion();
1667
1668        assert_eq!(result.count("p1"), 0);
1669        assert_eq!(result.count("p2"), 1);
1670        assert_eq!(result.count("p3"), 1);
1671    }
1672
1673    #[test]
1674    fn sync_linear_chain_5() {
1675        let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1676        let transitions: Vec<Transition> = (0..5)
1677            .map(|i| {
1678                Transition::builder(format!("t{i}"))
1679                    .input(one(&places[i]))
1680                    .output(out_place(&places[i + 1]))
1681                    .action(fork())
1682                    .build()
1683            })
1684            .collect();
1685
1686        let net = PetriNet::builder("chain5").transitions(transitions).build();
1687        let compiled = CompiledNet::compile(&net);
1688        let prog = PrecompiledNet::from_compiled(&compiled);
1689
1690        let mut marking = Marking::new();
1691        marking.add(&places[0], Token::at(1, 0));
1692
1693        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1694        let result = executor.run_to_completion();
1695
1696        assert_eq!(result.count("p0"), 0);
1697        assert_eq!(result.count("p5"), 1);
1698    }
1699
1700    #[test]
1701    fn sync_no_initial_tokens() {
1702        let (net, _, _, _) = simple_chain();
1703        let compiled = CompiledNet::compile(&net);
1704        let prog = PrecompiledNet::from_compiled(&compiled);
1705        let marking = Marking::new();
1706        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1707        let result = executor.run_to_completion();
1708        assert_eq!(result.count("p1"), 0);
1709        assert_eq!(result.count("p2"), 0);
1710        assert_eq!(result.count("p3"), 0);
1711    }
1712
1713    #[test]
1714    fn sync_priority_ordering() {
1715        let p = Place::<()>::new("p");
1716        let out_a = Place::<()>::new("a");
1717        let out_b = Place::<()>::new("b");
1718
1719        let t_high = Transition::builder("t_high")
1720            .input(one(&p))
1721            .output(out_place(&out_a))
1722            .action(passthrough())
1723            .priority(10)
1724            .build();
1725        let t_low = Transition::builder("t_low")
1726            .input(one(&p))
1727            .output(out_place(&out_b))
1728            .action(passthrough())
1729            .priority(1)
1730            .build();
1731
1732        let net = PetriNet::builder("priority")
1733            .transitions([t_high, t_low])
1734            .build();
1735        let compiled = CompiledNet::compile(&net);
1736        let prog = PrecompiledNet::from_compiled(&compiled);
1737
1738        let mut marking = Marking::new();
1739        marking.add(&p, Token::at((), 0));
1740
1741        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1742        let result = executor.run_to_completion();
1743
1744        assert_eq!(result.count("p"), 0);
1745    }
1746
1747    #[test]
1748    fn sync_inhibitor_blocks() {
1749        let p1 = Place::<()>::new("p1");
1750        let p2 = Place::<()>::new("p2");
1751        let p_inh = Place::<()>::new("inh");
1752
1753        let t = Transition::builder("t1")
1754            .input(one(&p1))
1755            .output(out_place(&p2))
1756            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1757            .action(passthrough())
1758            .build();
1759
1760        let net = PetriNet::builder("inhibitor").transition(t).build();
1761        let compiled = CompiledNet::compile(&net);
1762        let prog = PrecompiledNet::from_compiled(&compiled);
1763
1764        let mut marking = Marking::new();
1765        marking.add(&p1, Token::at((), 0));
1766        marking.add(&p_inh, Token::at((), 0));
1767
1768        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1769        let result = executor.run_to_completion();
1770
1771        assert_eq!(result.count("p1"), 1);
1772    }
1773
1774    #[test]
1775    fn read_arc_does_not_consume() {
1776        let p_in = Place::<i32>::new("in");
1777        let p_ctx = Place::<i32>::new("ctx");
1778        let p_out = Place::<i32>::new("out");
1779
1780        let t = Transition::builder("t1")
1781            .input(one(&p_in))
1782            .read(libpetri_core::arc::read(&p_ctx))
1783            .output(out_place(&p_out))
1784            .action(sync_action(|ctx| {
1785                let v = ctx.input::<i32>("in")?;
1786                let r = ctx.read::<i32>("ctx")?;
1787                ctx.output("out", *v + *r)?;
1788                Ok(())
1789            }))
1790            .build();
1791        let net = PetriNet::builder("test").transition(t).build();
1792        let compiled = CompiledNet::compile(&net);
1793        let prog = PrecompiledNet::from_compiled(&compiled);
1794
1795        let mut marking = Marking::new();
1796        marking.add(&p_in, Token::at(10, 0));
1797        marking.add(&p_ctx, Token::at(5, 0));
1798
1799        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1800        let result = executor.run_to_completion();
1801
1802        assert_eq!(result.count("in"), 0);
1803        assert_eq!(result.count("ctx"), 1);
1804        assert_eq!(result.count("out"), 1);
1805    }
1806
1807    #[test]
1808    fn reset_arc_removes_all_tokens() {
1809        let p_in = Place::<()>::new("in");
1810        let p_reset = Place::<i32>::new("reset");
1811        let p_out = Place::<()>::new("out");
1812
1813        let t = Transition::builder("t1")
1814            .input(one(&p_in))
1815            .reset(libpetri_core::arc::reset(&p_reset))
1816            .output(out_place(&p_out))
1817            .action(fork())
1818            .build();
1819        let net = PetriNet::builder("test").transition(t).build();
1820        let compiled = CompiledNet::compile(&net);
1821        let prog = PrecompiledNet::from_compiled(&compiled);
1822
1823        let mut marking = Marking::new();
1824        marking.add(&p_in, Token::at((), 0));
1825        marking.add(&p_reset, Token::at(1, 0));
1826        marking.add(&p_reset, Token::at(2, 0));
1827        marking.add(&p_reset, Token::at(3, 0));
1828
1829        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1830        let result = executor.run_to_completion();
1831
1832        assert_eq!(result.count("reset"), 0);
1833        assert_eq!(result.count("out"), 1);
1834    }
1835
1836    #[test]
1837    fn exactly_cardinality_consumes_n() {
1838        let p = Place::<i32>::new("p");
1839        let p_out = Place::<i32>::new("out");
1840
1841        let t = Transition::builder("t1")
1842            .input(libpetri_core::input::exactly(3, &p))
1843            .output(out_place(&p_out))
1844            .action(sync_action(|ctx| {
1845                let vals = ctx.inputs::<i32>("p")?;
1846                for v in vals {
1847                    ctx.output("out", *v)?;
1848                }
1849                Ok(())
1850            }))
1851            .build();
1852        let net = PetriNet::builder("test").transition(t).build();
1853        let compiled = CompiledNet::compile(&net);
1854        let prog = PrecompiledNet::from_compiled(&compiled);
1855
1856        let mut marking = Marking::new();
1857        for i in 0..5 {
1858            marking.add(&p, Token::at(i, 0));
1859        }
1860
1861        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1862        let result = executor.run_to_completion();
1863
1864        assert_eq!(result.count("p"), 2);
1865        assert_eq!(result.count("out"), 3);
1866    }
1867
1868    #[test]
1869    fn all_cardinality_consumes_everything() {
1870        let p = Place::<i32>::new("p");
1871        let p_out = Place::<()>::new("out");
1872
1873        let t = Transition::builder("t1")
1874            .input(libpetri_core::input::all(&p))
1875            .output(out_place(&p_out))
1876            .action(sync_action(|ctx| {
1877                let vals = ctx.inputs::<i32>("p")?;
1878                ctx.output("out", vals.len() as i32)?;
1879                Ok(())
1880            }))
1881            .build();
1882        let net = PetriNet::builder("test").transition(t).build();
1883        let compiled = CompiledNet::compile(&net);
1884        let prog = PrecompiledNet::from_compiled(&compiled);
1885
1886        let mut marking = Marking::new();
1887        for i in 0..5 {
1888            marking.add(&p, Token::at(i, 0));
1889        }
1890
1891        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1892        let result = executor.run_to_completion();
1893
1894        assert_eq!(result.count("p"), 0);
1895    }
1896
1897    #[test]
1898    fn at_least_blocks_insufficient() {
1899        let p = Place::<i32>::new("p");
1900        let p_out = Place::<()>::new("out");
1901
1902        let t = Transition::builder("t1")
1903            .input(libpetri_core::input::at_least(3, &p))
1904            .output(out_place(&p_out))
1905            .action(passthrough())
1906            .build();
1907        let net = PetriNet::builder("test").transition(t).build();
1908        let compiled = CompiledNet::compile(&net);
1909        let prog = PrecompiledNet::from_compiled(&compiled);
1910
1911        let mut marking = Marking::new();
1912        marking.add(&p, Token::at(1, 0));
1913        marking.add(&p, Token::at(2, 0));
1914
1915        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1916        let result = executor.run_to_completion();
1917
1918        assert_eq!(result.count("p"), 2);
1919    }
1920
1921    #[test]
1922    fn at_least_fires_with_enough() {
1923        let p = Place::<i32>::new("p");
1924        let p_out = Place::<()>::new("out");
1925
1926        let t = Transition::builder("t1")
1927            .input(libpetri_core::input::at_least(3, &p))
1928            .output(out_place(&p_out))
1929            .action(passthrough())
1930            .build();
1931        let net = PetriNet::builder("test").transition(t).build();
1932        let compiled = CompiledNet::compile(&net);
1933        let prog = PrecompiledNet::from_compiled(&compiled);
1934
1935        let mut marking = Marking::new();
1936        for i in 0..5 {
1937            marking.add(&p, Token::at(i, 0));
1938        }
1939
1940        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1941        let result = executor.run_to_completion();
1942
1943        assert_eq!(result.count("p"), 0);
1944    }
1945
1946    #[test]
1947    fn guarded_input_only_consumes_matching() {
1948        let p = Place::<i32>::new("p");
1949        let p_out = Place::<i32>::new("out");
1950
1951        let t = Transition::builder("t1")
1952            .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1953            .output(out_place(&p_out))
1954            .action(fork())
1955            .build();
1956        let net = PetriNet::builder("test").transition(t).build();
1957        let compiled = CompiledNet::compile(&net);
1958        let prog = PrecompiledNet::from_compiled(&compiled);
1959
1960        let mut marking = Marking::new();
1961        marking.add(&p, Token::at(3, 0));
1962        marking.add(&p, Token::at(10, 0));
1963
1964        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1965        let result = executor.run_to_completion();
1966
1967        assert_eq!(result.count("p"), 1);
1968        assert_eq!(result.count("out"), 1);
1969    }
1970
1971    #[test]
1972    fn guarded_input_blocks_when_no_match() {
1973        let p = Place::<i32>::new("p");
1974        let p_out = Place::<i32>::new("out");
1975
1976        let t = Transition::builder("t1")
1977            .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
1978            .output(out_place(&p_out))
1979            .action(fork())
1980            .build();
1981        let net = PetriNet::builder("test").transition(t).build();
1982        let compiled = CompiledNet::compile(&net);
1983        let prog = PrecompiledNet::from_compiled(&compiled);
1984
1985        let mut marking = Marking::new();
1986        marking.add(&p, Token::at(3, 0));
1987        marking.add(&p, Token::at(10, 0));
1988
1989        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1990        let result = executor.run_to_completion();
1991
1992        assert_eq!(result.count("p"), 2);
1993        assert_eq!(result.count("out"), 0);
1994    }
1995
1996    #[test]
1997    fn event_store_records_lifecycle() {
1998        let p1 = Place::<i32>::new("p1");
1999        let p2 = Place::<i32>::new("p2");
2000        let t = Transition::builder("t1")
2001            .input(one(&p1))
2002            .output(out_place(&p2))
2003            .action(fork())
2004            .build();
2005        let net = PetriNet::builder("test").transition(t).build();
2006        let compiled = CompiledNet::compile(&net);
2007        let prog = PrecompiledNet::from_compiled(&compiled);
2008
2009        let mut marking = Marking::new();
2010        marking.add(&p1, Token::at(1, 0));
2011
2012        let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2013        let _result = executor.run_to_completion();
2014
2015        let events = executor.event_store().events();
2016        assert!(
2017            events
2018                .iter()
2019                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2020        );
2021        assert!(
2022            events
2023                .iter()
2024                .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
2025        );
2026        assert!(
2027            events
2028                .iter()
2029                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2030        );
2031        assert!(
2032            events
2033                .iter()
2034                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2035        );
2036        assert!(
2037            events
2038                .iter()
2039                .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
2040        );
2041        assert!(
2042            events
2043                .iter()
2044                .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
2045        );
2046        assert!(
2047            events
2048                .iter()
2049                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2050        );
2051    }
2052
2053    #[test]
2054    fn action_error_does_not_crash() {
2055        let p_in = Place::<i32>::new("in");
2056        let p_out = Place::<i32>::new("out");
2057
2058        let t = Transition::builder("t1")
2059            .input(one(&p_in))
2060            .output(out_place(&p_out))
2061            .action(sync_action(|_ctx| {
2062                Err(libpetri_core::action::ActionError::new(
2063                    "intentional failure",
2064                ))
2065            }))
2066            .build();
2067        let net = PetriNet::builder("test").transition(t).build();
2068        let compiled = CompiledNet::compile(&net);
2069        let prog = PrecompiledNet::from_compiled(&compiled);
2070
2071        let mut marking = Marking::new();
2072        marking.add(&p_in, Token::at(42, 0));
2073
2074        let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2075        let result = executor.run_to_completion();
2076
2077        assert_eq!(result.count("in"), 0);
2078        assert_eq!(result.count("out"), 0);
2079
2080        let events = executor.event_store().events();
2081        assert!(
2082            events
2083                .iter()
2084                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2085        );
2086    }
2087
2088    #[test]
2089    fn multiple_input_arcs_require_all() {
2090        let p1 = Place::<i32>::new("p1");
2091        let p2 = Place::<i32>::new("p2");
2092        let p3 = Place::<i32>::new("p3");
2093
2094        let t = Transition::builder("t1")
2095            .input(one(&p1))
2096            .input(one(&p2))
2097            .output(out_place(&p3))
2098            .action(sync_action(|ctx| {
2099                ctx.output("p3", 99i32)?;
2100                Ok(())
2101            }))
2102            .build();
2103        let net = PetriNet::builder("test").transition(t).build();
2104        let compiled = CompiledNet::compile(&net);
2105        let prog = PrecompiledNet::from_compiled(&compiled);
2106
2107        // Only p1 has token — should not fire
2108        let mut marking = Marking::new();
2109        marking.add(&p1, Token::at(1, 0));
2110        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2111        let result = executor.run_to_completion();
2112        assert_eq!(result.count("p1"), 1);
2113        assert_eq!(result.count("p3"), 0);
2114
2115        // Both p1 and p2 — should fire
2116        let compiled2 = CompiledNet::compile(&net);
2117        let prog2 = PrecompiledNet::from_compiled(&compiled2);
2118        let mut marking2 = Marking::new();
2119        marking2.add(&p1, Token::at(1, 0));
2120        marking2.add(&p2, Token::at(2, 0));
2121        let mut executor2 = PrecompiledNetExecutor::<NoopEventStore>::new(&prog2, marking2);
2122        let result2 = executor2.run_to_completion();
2123        assert_eq!(result2.count("p1"), 0);
2124        assert_eq!(result2.count("p2"), 0);
2125        assert_eq!(result2.count("p3"), 1);
2126    }
2127
2128    #[test]
2129    fn sync_action_custom_logic() {
2130        let p_in = Place::<i32>::new("in");
2131        let p_out = Place::<String>::new("out");
2132
2133        let t = Transition::builder("t1")
2134            .input(one(&p_in))
2135            .output(out_place(&p_out))
2136            .action(sync_action(|ctx| {
2137                let v = ctx.input::<i32>("in")?;
2138                ctx.output("out", format!("value={v}"))?;
2139                Ok(())
2140            }))
2141            .build();
2142        let net = PetriNet::builder("test").transition(t).build();
2143        let compiled = CompiledNet::compile(&net);
2144        let prog = PrecompiledNet::from_compiled(&compiled);
2145
2146        let mut marking = Marking::new();
2147        marking.add(&p_in, Token::at(42, 0));
2148
2149        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2150        let result = executor.run_to_completion();
2151
2152        assert_eq!(result.count("out"), 1);
2153    }
2154
2155    #[test]
2156    fn transform_action_outputs_to_all_places() {
2157        let p_in = Place::<i32>::new("in");
2158        let p_a = Place::<i32>::new("a");
2159        let p_b = Place::<i32>::new("b");
2160
2161        let t = Transition::builder("t1")
2162            .input(one(&p_in))
2163            .output(libpetri_core::output::and(vec![
2164                out_place(&p_a),
2165                out_place(&p_b),
2166            ]))
2167            .action(libpetri_core::action::transform(|ctx| {
2168                let v = ctx.input::<i32>("in").unwrap();
2169                Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
2170            }))
2171            .build();
2172        let net = PetriNet::builder("test").transition(t).build();
2173        let compiled = CompiledNet::compile(&net);
2174        let prog = PrecompiledNet::from_compiled(&compiled);
2175
2176        let mut marking = Marking::new();
2177        marking.add(&p_in, Token::at(5, 0));
2178
2179        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2180        let result = executor.run_to_completion();
2181
2182        assert_eq!(result.count("a"), 1);
2183        assert_eq!(result.count("b"), 1);
2184    }
2185
2186    #[test]
2187    fn complex_workflow() {
2188        use libpetri_core::output::{and, xor};
2189
2190        let input = Place::<i32>::new("v_input");
2191        let guard_in = Place::<i32>::new("v_guardIn");
2192        let intent_in = Place::<i32>::new("v_intentIn");
2193        let search_in = Place::<i32>::new("v_searchIn");
2194        let output_guard_in = Place::<i32>::new("v_outputGuardIn");
2195        let guard_safe = Place::<i32>::new("v_guardSafe");
2196        let guard_violation = Place::<i32>::new("v_guardViolation");
2197        let _violated = Place::<i32>::new("v_violated");
2198        let intent_ready = Place::<i32>::new("v_intentReady");
2199        let topic_ready = Place::<i32>::new("v_topicReady");
2200        let search_ready = Place::<i32>::new("v_searchReady");
2201        let _output_guard_done = Place::<i32>::new("v_outputGuardDone");
2202        let response = Place::<i32>::new("v_response");
2203
2204        let fork_trans = Transition::builder("Fork")
2205            .input(one(&input))
2206            .output(and(vec![
2207                out_place(&guard_in),
2208                out_place(&intent_in),
2209                out_place(&search_in),
2210                out_place(&output_guard_in),
2211            ]))
2212            .action(fork())
2213            .build();
2214
2215        let guard_trans = Transition::builder("Guard")
2216            .input(one(&guard_in))
2217            .output(xor(vec![
2218                out_place(&guard_safe),
2219                out_place(&guard_violation),
2220            ]))
2221            .action(fork())
2222            .build();
2223
2224        let handle_violation = Transition::builder("HandleViolation")
2225            .input(one(&guard_violation))
2226            .output(out_place(&_violated))
2227            .inhibitor(libpetri_core::arc::inhibitor(&guard_safe))
2228            .action(fork())
2229            .build();
2230
2231        let intent_trans = Transition::builder("Intent")
2232            .input(one(&intent_in))
2233            .output(out_place(&intent_ready))
2234            .action(fork())
2235            .build();
2236
2237        let topic_trans = Transition::builder("TopicKnowledge")
2238            .input(one(&intent_ready))
2239            .output(out_place(&topic_ready))
2240            .action(fork())
2241            .build();
2242
2243        let search_trans = Transition::builder("Search")
2244            .input(one(&search_in))
2245            .output(out_place(&search_ready))
2246            .read(libpetri_core::arc::read(&intent_ready))
2247            .inhibitor(libpetri_core::arc::inhibitor(&guard_violation))
2248            .priority(-5)
2249            .action(fork())
2250            .build();
2251
2252        let output_guard_trans = Transition::builder("OutputGuard")
2253            .input(one(&output_guard_in))
2254            .output(out_place(&_output_guard_done))
2255            .read(libpetri_core::arc::read(&guard_safe))
2256            .action(fork())
2257            .build();
2258
2259        let compose_trans = Transition::builder("Compose")
2260            .input(one(&guard_safe))
2261            .input(one(&search_ready))
2262            .input(one(&topic_ready))
2263            .output(out_place(&response))
2264            .priority(10)
2265            .action(fork())
2266            .build();
2267
2268        let net = PetriNet::builder("ComplexWorkflow")
2269            .transition(fork_trans)
2270            .transition(guard_trans)
2271            .transition(handle_violation)
2272            .transition(intent_trans)
2273            .transition(topic_trans)
2274            .transition(search_trans)
2275            .transition(output_guard_trans)
2276            .transition(compose_trans)
2277            .build();
2278
2279        let compiled = CompiledNet::compile(&net);
2280        let prog = PrecompiledNet::from_compiled(&compiled);
2281
2282        let mut marking = Marking::new();
2283        marking.add(&input, Token::at(1, 0));
2284
2285        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2286        let result = executor.run_to_completion();
2287
2288        // fork() produces to ALL output places, including both XOR branches.
2289        // This means guard_safe AND guard_violation both get tokens.
2290        // Search is inhibited by guard_violation, so it deadlocks.
2291        // The important thing is the executor doesn't crash and terminates.
2292        assert_eq!(result.count("v_input"), 0); // consumed by Fork
2293    }
2294
2295    #[cfg(feature = "tokio")]
2296    mod async_tests {
2297        use super::*;
2298        use libpetri_core::action::async_action;
2299        use libpetri_core::petri_net::PetriNet;
2300
2301        #[tokio::test]
2302        async fn async_linear_chain() {
2303            let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
2304            let transitions: Vec<Transition> = (0..5)
2305                .map(|i| {
2306                    Transition::builder(format!("t{i}"))
2307                        .input(one(&places[i]))
2308                        .output(out_place(&places[i + 1]))
2309                        .action(fork())
2310                        .build()
2311                })
2312                .collect();
2313
2314            let net = PetriNet::builder("chain5").transitions(transitions).build();
2315            let compiled = CompiledNet::compile(&net);
2316            let prog = PrecompiledNet::from_compiled(&compiled);
2317
2318            let mut marking = Marking::new();
2319            marking.add(&places[0], Token::at(1, 0));
2320
2321            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2322            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2323            let result = executor.run_async(rx).await;
2324
2325            assert_eq!(result.count("p0"), 0);
2326            assert_eq!(result.count("p5"), 1);
2327        }
2328
2329        #[tokio::test]
2330        async fn async_action_execution() {
2331            let p1 = Place::<i32>::new("p1");
2332            let p2 = Place::<i32>::new("p2");
2333
2334            let t = Transition::builder("t1")
2335                .input(one(&p1))
2336                .output(out_place(&p2))
2337                .action(async_action(|ctx| async { Ok(ctx) }))
2338                .build();
2339
2340            let net = PetriNet::builder("async_test").transition(t).build();
2341            let compiled = CompiledNet::compile(&net);
2342            let prog = PrecompiledNet::from_compiled(&compiled);
2343
2344            let mut marking = Marking::new();
2345            marking.add(&p1, Token::at(42, 0));
2346
2347            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2348            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2349            let result = executor.run_async(rx).await;
2350
2351            assert_eq!(result.count("p1"), 0);
2352        }
2353    }
2354}