Skip to main content

libpetri_runtime/
precompiled_executor.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::token::ErasedToken;
8
9use libpetri_event::event_store::EventStore;
10use libpetri_event::net_event::NetEvent;
11
12use crate::bitmap;
13use crate::marking::Marking;
14use crate::precompiled_net::{
15    CONSUME_ALL, CONSUME_ATLEAST, CONSUME_N, CONSUME_ONE, PrecompiledNet, RESET,
16};
17
/// Tolerance for deadline enforcement to account for timer jitter.
///
/// Expressed in milliseconds. Deadline enforcement only times a transition
/// out once the time since it was enabled exceeds its latest firing time
/// plus this slack.
const DEADLINE_TOLERANCE_MS: f64 = 5.0;

/// Initial capacity for ring buffer per place.
///
/// Every place starts with this many token slots in the flat token pool;
/// a ring is grown when a token is added to a place whose ring is full.
const INITIAL_RING_CAPACITY: usize = 4;
23
/// High-performance precompiled flat-array Petri net executor.
///
/// Uses ring-buffer token storage, opcode-based consume operations,
/// priority-partitioned ready queues, and two-level summary bitmaps
/// for sparse iteration. Generic over `E: EventStore` for zero-cost
/// noop event recording.
///
/// All per-place vectors are indexed by place id (`pid`) and all
/// per-transition vectors by transition id (`tid`).
pub struct PrecompiledNetExecutor<'a, E: EventStore> {
    /// The precompiled net being executed (borrowed, never mutated here).
    program: &'a PrecompiledNet<'a>,
    /// Sink for `NetEvent`s; appends are guarded by `E::ENABLED`.
    event_store: E,
    /// Names of the environment places supplied at construction.
    // Stored but not read in the part of the file reviewed here, hence the allow.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// Cached `!environment_places.is_empty()`; while true, `should_terminate`
    /// never reports termination.
    has_environment_places: bool,
    /// Flag supplied at construction; per the builder docs it skips output
    /// validation for trusted transition actions.
    // Stored but not read in the part of the file reviewed here, hence the allow.
    #[allow(dead_code)]
    skip_output_validation: bool,

    // ==================== Flat Token Pool ====================
    /// Backing slots for every place's ring buffer; `None` marks an empty slot.
    token_pool: Vec<Option<ErasedToken>>,
    /// Index in `token_pool` where each place's ring begins.
    place_offset: Vec<usize>,
    /// Number of tokens currently held by each place.
    token_counts: Vec<usize>,
    /// Ring-relative index of the oldest token of each place.
    ring_head: Vec<usize>,
    /// Ring-relative index where the next token of each place is written.
    ring_tail: Vec<usize>,
    /// Current number of slots in each place's ring.
    ring_capacity: Vec<usize>,

    // ==================== Presence Bitmap ====================
    /// One bit per place; `initialize_marking_bitmap` sets the bit of every
    /// place that holds at least one token.
    marking_bitmap: Vec<u64>,

    // ==================== Transition State ====================
    /// One bit per transition, set while the transition is enabled.
    enabled_bitmap: Vec<u64>,
    /// One bit per transition whose enablement must be re-evaluated.
    dirty_bitmap: Vec<u64>,
    /// Scratch copy of `dirty_bitmap` taken while dirty transitions are processed.
    dirty_scan_buffer: Vec<u64>,
    /// Value of `elapsed_ms()` when each transition became enabled (or had its
    /// clock restarted); `f64::NEG_INFINITY` while it is not enabled.
    enabled_at_ms: Vec<f64>,
    /// Number of bits currently set in `enabled_bitmap`.
    enabled_transition_count: usize,

    // ==================== Summary Bitmaps (two-level) ====================
    /// One bit per word of `dirty_bitmap`; used to skip clean words.
    dirty_word_summary: Vec<u64>,
    /// One bit per word of `enabled_bitmap`; cleared when that word becomes zero.
    enabled_word_summary: Vec<u64>,
    /// Number of `u64` words in the per-transition bitmaps.
    transition_words: usize,
    /// Number of `u64` words in the summary bitmaps.
    summary_words: usize,

    // ==================== Priority-Partitioned Ready Queues ====================
    /// One circular queue of transition ids per distinct priority level.
    ready_queues: Vec<Vec<usize>>,
    /// Read position of each ready queue.
    ready_queue_head: Vec<usize>,
    /// Write position of each ready queue.
    ready_queue_tail: Vec<usize>,
    /// Number of queued transition ids in each ready queue.
    ready_queue_size: Vec<usize>,

    // ==================== Reset-Clock Detection ====================
    /// One bit per place, set when a reset arc on that place fires; cleared
    /// at the end of each dirty-set update.
    pending_reset_words: Vec<u64>,
    /// True while at least one bit may be set in `pending_reset_words`.
    has_pending_resets: bool,

    // ==================== Reusable Buffers (avoid per-firing allocation) ====================
    /// Consumed tokens of the transition being fired, keyed by place name;
    /// cleared at the start of every firing.
    reusable_inputs: HashMap<Arc<str>, Vec<ErasedToken>>,
    /// Companion buffer to `reusable_inputs`, cleared at the same time.
    /// NOTE(review): presumably holds read-arc tokens; confirm in the firing code.
    reusable_reads: HashMap<Arc<str>, Vec<ErasedToken>>,

    // ==================== Lifecycle ====================
    /// `Instant` captured when the executor is constructed.
    start_time: Instant,
}
80
/// Builder for PrecompiledNetExecutor.
///
/// Obtained from `PrecompiledNetExecutor::builder`; each setter consumes and
/// returns the builder, and `build` produces the executor.
pub struct PrecompiledExecutorBuilder<'a, E: EventStore> {
    /// The precompiled net the executor will run.
    program: &'a PrecompiledNet<'a>,
    /// Tokens to load into the executor's ring buffers at construction.
    initial_marking: Marking,
    /// Event store to use; `None` means `build` falls back to `E::default()`.
    event_store: Option<E>,
    /// Names of the environment places (empty unless set).
    environment_places: HashSet<Arc<str>>,
    /// Whether output validation is skipped (false unless set).
    skip_output_validation: bool,
}
89
90impl<'a, E: EventStore> PrecompiledExecutorBuilder<'a, E> {
91    /// Sets the event store.
92    pub fn event_store(mut self, store: E) -> Self {
93        self.event_store = Some(store);
94        self
95    }
96
97    /// Sets the environment places.
98    pub fn environment_places(mut self, places: HashSet<Arc<str>>) -> Self {
99        self.environment_places = places;
100        self
101    }
102
103    /// Skips output validation for trusted transition actions.
104    pub fn skip_output_validation(mut self, skip: bool) -> Self {
105        self.skip_output_validation = skip;
106        self
107    }
108
109    /// Builds the executor.
110    pub fn build(self) -> PrecompiledNetExecutor<'a, E> {
111        PrecompiledNetExecutor::new_inner(
112            self.program,
113            self.initial_marking,
114            self.event_store.unwrap_or_default(),
115            self.environment_places,
116            self.skip_output_validation,
117        )
118    }
119}
120
121impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
122    /// Creates a builder for a PrecompiledNetExecutor.
123    pub fn builder(
124        program: &'a PrecompiledNet<'a>,
125        initial_marking: Marking,
126    ) -> PrecompiledExecutorBuilder<'a, E> {
127        PrecompiledExecutorBuilder {
128            program,
129            initial_marking,
130            event_store: None,
131            environment_places: HashSet::new(),
132            skip_output_validation: false,
133        }
134    }
135
136    /// Creates a new executor with default options.
137    pub fn new(program: &'a PrecompiledNet<'a>, initial_marking: Marking) -> Self {
138        Self::new_inner(
139            program,
140            initial_marking,
141            E::default(),
142            HashSet::new(),
143            false,
144        )
145    }
146
147    fn new_inner(
148        program: &'a PrecompiledNet<'a>,
149        initial_marking: Marking,
150        event_store: E,
151        environment_places: HashSet<Arc<str>>,
152        skip_output_validation: bool,
153    ) -> Self {
154        let pc = program.place_count();
155        let tc = program.transition_count();
156        let wc = program.word_count();
157
158        // Initialize flat token pool
159        let total_slots = pc * INITIAL_RING_CAPACITY;
160        let mut token_pool = vec![None; total_slots];
161        let mut place_offset = vec![0usize; pc];
162        let mut token_counts = vec![0usize; pc];
163        let mut ring_head = vec![0usize; pc];
164        let mut ring_tail = vec![0usize; pc];
165        let mut ring_capacity = vec![INITIAL_RING_CAPACITY; pc];
166
167        for (pid, offset) in place_offset.iter_mut().enumerate() {
168            *offset = pid * INITIAL_RING_CAPACITY;
169        }
170
171        // Load initial tokens into ring buffers
172        for pid in 0..pc {
173            let place = program.place(pid);
174            if let Some(queue) = initial_marking.queue(place.name()) {
175                for token in queue {
176                    // Ring add last
177                    if token_counts[pid] == ring_capacity[pid] {
178                        grow_ring_static(
179                            &mut token_pool,
180                            &mut place_offset,
181                            &mut ring_head,
182                            &mut ring_tail,
183                            &mut ring_capacity,
184                            &token_counts,
185                            pid,
186                        );
187                    }
188                    let tail = ring_tail[pid];
189                    let offset = place_offset[pid];
190                    token_pool[offset + tail] = Some(token.clone());
191                    ring_tail[pid] = (tail + 1) % ring_capacity[pid];
192                    token_counts[pid] += 1;
193                }
194            }
195        }
196
197        // Transition bitmaps
198        let transition_words = bitmap::word_count(tc);
199        let summary_words = bitmap::word_count(transition_words);
200
201        // Priority-partitioned ready queues
202        let prio_count = program.distinct_priority_count;
203        let queue_cap = tc.max(4);
204        let ready_queues = vec![vec![0usize; queue_cap]; prio_count];
205        let ready_queue_head = vec![0usize; prio_count];
206        let ready_queue_tail = vec![0usize; prio_count];
207        let ready_queue_size = vec![0usize; prio_count];
208
209        Self {
210            program,
211            event_store,
212            has_environment_places: !environment_places.is_empty(),
213            environment_places,
214            skip_output_validation,
215            token_pool,
216            place_offset,
217            token_counts,
218            ring_head,
219            ring_tail,
220            ring_capacity,
221            marking_bitmap: vec![0u64; wc],
222            enabled_bitmap: vec![0u64; transition_words],
223            dirty_bitmap: vec![0u64; transition_words],
224            dirty_scan_buffer: vec![0u64; transition_words],
225            enabled_at_ms: vec![f64::NEG_INFINITY; tc],
226            enabled_transition_count: 0,
227            dirty_word_summary: vec![0u64; summary_words],
228            enabled_word_summary: vec![0u64; summary_words],
229            transition_words,
230            summary_words,
231            ready_queues,
232            ready_queue_head,
233            ready_queue_tail,
234            ready_queue_size,
235            pending_reset_words: vec![0u64; wc],
236            has_pending_resets: false,
237            reusable_inputs: HashMap::new(),
238            reusable_reads: HashMap::new(),
239            start_time: Instant::now(),
240        }
241    }
242
243    // ==================== Public API ====================
244
245    /// Runs the executor synchronously until completion.
246    /// Returns the final marking (materialized from ring buffers).
247    pub fn run_sync(&mut self) -> Marking {
248        self.run_to_completion()
249    }
250
251    /// Returns the materialized marking (synced from ring buffers).
252    pub fn marking(&self) -> Marking {
253        self.materialize_marking()
254    }
255
256    /// Returns a reference to the event store.
257    pub fn event_store(&self) -> &E {
258        &self.event_store
259    }
260
261    /// Returns true if the executor is quiescent.
262    pub fn is_quiescent(&self) -> bool {
263        self.enabled_transition_count == 0
264    }
265
266    // ==================== Ring Buffer Operations ====================
267
268    #[inline]
269    fn ring_remove_first(&mut self, pid: usize) -> ErasedToken {
270        let head = self.ring_head[pid];
271        let offset = self.place_offset[pid];
272        let token = self.token_pool[offset + head].take().unwrap();
273        self.ring_head[pid] = (head + 1) % self.ring_capacity[pid];
274        self.token_counts[pid] -= 1;
275        token
276    }
277
278    #[inline]
279    fn ring_add_last(&mut self, pid: usize, token: ErasedToken) {
280        if self.token_counts[pid] == self.ring_capacity[pid] {
281            self.grow_ring(pid);
282        }
283        let tail = self.ring_tail[pid];
284        let offset = self.place_offset[pid];
285        self.token_pool[offset + tail] = Some(token);
286        self.ring_tail[pid] = (tail + 1) % self.ring_capacity[pid];
287        self.token_counts[pid] += 1;
288    }
289
290    #[inline]
291    fn ring_peek_first(&self, pid: usize) -> Option<&ErasedToken> {
292        if self.token_counts[pid] == 0 {
293            return None;
294        }
295        self.token_pool[self.place_offset[pid] + self.ring_head[pid]].as_ref()
296    }
297
298    fn ring_remove_all(&mut self, pid: usize) -> Vec<ErasedToken> {
299        let count = self.token_counts[pid];
300        if count == 0 {
301            return Vec::new();
302        }
303        let mut result = Vec::with_capacity(count);
304        for _ in 0..count {
305            result.push(self.ring_remove_first(pid));
306        }
307        result
308    }
309
310    fn grow_ring(&mut self, pid: usize) {
311        grow_ring_static(
312            &mut self.token_pool,
313            &mut self.place_offset,
314            &mut self.ring_head,
315            &mut self.ring_tail,
316            &mut self.ring_capacity,
317            &self.token_counts,
318            pid,
319        );
320    }
321
322    // ==================== Bitmap Helpers ====================
323
324    #[inline]
325    fn set_enabled_bit(&mut self, tid: usize) {
326        let w = tid >> bitmap::WORD_SHIFT;
327        self.enabled_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
328        self.enabled_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
329    }
330
331    #[inline]
332    fn clear_enabled_bit(&mut self, tid: usize) {
333        let w = tid >> bitmap::WORD_SHIFT;
334        self.enabled_bitmap[w] &= !(1u64 << (tid & bitmap::WORD_MASK));
335        if self.enabled_bitmap[w] == 0 {
336            self.enabled_word_summary[w >> bitmap::WORD_SHIFT] &=
337                !(1u64 << (w & bitmap::WORD_MASK));
338        }
339    }
340
341    #[inline]
342    fn is_enabled(&self, tid: usize) -> bool {
343        (self.enabled_bitmap[tid >> bitmap::WORD_SHIFT] & (1u64 << (tid & bitmap::WORD_MASK))) != 0
344    }
345
346    #[inline]
347    fn set_marking_bit(&mut self, pid: usize) {
348        bitmap::set_bit(&mut self.marking_bitmap, pid);
349    }
350
351    #[inline]
352    fn clear_marking_bit(&mut self, pid: usize) {
353        bitmap::clear_bit(&mut self.marking_bitmap, pid);
354    }
355
356    // ==================== Ready Queue Operations ====================
357
358    fn ready_queue_push(&mut self, tid: usize) {
359        let pi = self.program.transition_to_priority_index[tid];
360        if self.ready_queue_size[pi] == self.ready_queues[pi].len() {
361            let old_cap = self.ready_queues[pi].len();
362            let new_cap = old_cap * 2;
363            let mut new_queue = vec![0usize; new_cap];
364            let head = self.ready_queue_head[pi];
365            for (i, slot) in new_queue.iter_mut().enumerate().take(old_cap) {
366                *slot = self.ready_queues[pi][(head + i) % old_cap];
367            }
368            self.ready_queues[pi] = new_queue;
369            self.ready_queue_head[pi] = 0;
370            self.ready_queue_tail[pi] = old_cap;
371        }
372        let tail = self.ready_queue_tail[pi];
373        self.ready_queues[pi][tail] = tid;
374        self.ready_queue_tail[pi] = (tail + 1) % self.ready_queues[pi].len();
375        self.ready_queue_size[pi] += 1;
376    }
377
378    fn ready_queue_pop(&mut self, pi: usize) -> usize {
379        let head = self.ready_queue_head[pi];
380        let tid = self.ready_queues[pi][head];
381        self.ready_queue_head[pi] = (head + 1) % self.ready_queues[pi].len();
382        self.ready_queue_size[pi] -= 1;
383        tid
384    }
385
386    fn clear_all_ready_queues(&mut self) {
387        for pi in 0..self.program.distinct_priority_count {
388            self.ready_queue_head[pi] = 0;
389            self.ready_queue_tail[pi] = 0;
390            self.ready_queue_size[pi] = 0;
391        }
392    }
393
394    // ==================== Initialize ====================
395
396    fn initialize_marking_bitmap(&mut self) {
397        for pid in 0..self.program.place_count() {
398            if self.token_counts[pid] > 0 {
399                self.set_marking_bit(pid);
400            }
401        }
402    }
403
404    fn mark_all_dirty(&mut self) {
405        let tc = self.program.transition_count();
406        let last_word_bits = tc & bitmap::WORD_MASK;
407        for w in 0..self.transition_words.saturating_sub(1) {
408            self.dirty_bitmap[w] = u64::MAX;
409        }
410        if self.transition_words > 0 {
411            self.dirty_bitmap[self.transition_words - 1] = if last_word_bits == 0 {
412                u64::MAX
413            } else {
414                (1u64 << last_word_bits) - 1
415            };
416        }
417        // Set all summary bits
418        for s in 0..self.summary_words {
419            let first_w = s << bitmap::WORD_SHIFT;
420            let last_w = (first_w + bitmap::WORD_MASK).min(self.transition_words.saturating_sub(1));
421            let count = last_w - first_w + 1;
422            let last_bits = count & bitmap::WORD_MASK;
423            self.dirty_word_summary[s] = if last_bits == 0 {
424                u64::MAX
425            } else {
426                (1u64 << last_bits) - 1
427            };
428        }
429    }
430
431    fn should_terminate(&self) -> bool {
432        if self.has_environment_places {
433            return false;
434        }
435        self.enabled_transition_count == 0
436    }
437
438    // ==================== Dirty Set Processing ====================
439
440    fn update_dirty_transitions(&mut self) {
441        let now_ms = self.elapsed_ms();
442
443        // Snapshot and clear dirty bitmap using summary
444        for s in 0..self.summary_words {
445            let mut summary = self.dirty_word_summary[s];
446            self.dirty_word_summary[s] = 0;
447            while summary != 0 {
448                let local_w = summary.trailing_zeros() as usize;
449                summary &= summary - 1;
450                let w = (s << bitmap::WORD_SHIFT) | local_w;
451                if w < self.transition_words {
452                    self.dirty_scan_buffer[w] = self.dirty_bitmap[w];
453                    self.dirty_bitmap[w] = 0;
454                }
455            }
456        }
457
458        // Process dirty transitions
459        let tc = self.program.transition_count();
460        for w in 0..self.transition_words {
461            let mut word = self.dirty_scan_buffer[w];
462            if word == 0 {
463                continue;
464            }
465            self.dirty_scan_buffer[w] = 0;
466            while word != 0 {
467                let bit = word.trailing_zeros() as usize;
468                let tid = (w << bitmap::WORD_SHIFT) | bit;
469                word &= word - 1;
470
471                if tid >= tc {
472                    break;
473                }
474
475                let was_enabled = self.is_enabled(tid);
476                let can_now = self.can_enable(tid);
477
478                if can_now && !was_enabled {
479                    self.set_enabled_bit(tid);
480                    self.enabled_transition_count += 1;
481                    self.enabled_at_ms[tid] = now_ms;
482
483                    if E::ENABLED {
484                        self.event_store.append(NetEvent::TransitionEnabled {
485                            transition_name: Arc::clone(self.program.transition(tid).name_arc()),
486                            timestamp: now_millis(),
487                        });
488                    }
489                } else if !can_now && was_enabled {
490                    self.clear_enabled_bit(tid);
491                    self.enabled_transition_count -= 1;
492                    self.enabled_at_ms[tid] = f64::NEG_INFINITY;
493                } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
494                    self.enabled_at_ms[tid] = now_ms;
495                    if E::ENABLED {
496                        self.event_store.append(NetEvent::TransitionClockRestarted {
497                            transition_name: Arc::clone(self.program.transition(tid).name_arc()),
498                            timestamp: now_millis(),
499                        });
500                    }
501                }
502            }
503        }
504
505        self.clear_pending_resets();
506    }
507
508    fn can_enable(&self, tid: usize) -> bool {
509        if !self.program.can_enable_bitmap(tid, &self.marking_bitmap) {
510            return false;
511        }
512
513        // Cardinality check using token_counts directly (no HashMap lookup)
514        if let Some(card_check) = self.program.compiled().cardinality_check(tid) {
515            for i in 0..card_check.place_ids.len() {
516                let pid = card_check.place_ids[i];
517                let required = card_check.required_counts[i];
518                if self.token_counts[pid] < required {
519                    return false;
520                }
521            }
522        }
523
524        // Guard check (needs token access)
525        if self.program.compiled().has_guards(tid) {
526            let t = self.program.transition(tid);
527            for spec in t.input_specs() {
528                if let Some(guard) = spec.guard() {
529                    let required = match spec {
530                        In::One { .. } => 1,
531                        In::Exactly { count, .. } => *count,
532                        In::AtLeast { minimum, .. } => *minimum,
533                        In::All { .. } => 1,
534                    };
535                    let pid = self.program.place_id(spec.place_name()).unwrap();
536                    let count = self.count_matching_in_ring(pid, &**guard);
537                    if count < required {
538                        return false;
539                    }
540                }
541            }
542        }
543
544        true
545    }
546
547    fn count_matching_in_ring(
548        &self,
549        pid: usize,
550        guard: &dyn Fn(&dyn std::any::Any) -> bool,
551    ) -> usize {
552        let count = self.token_counts[pid];
553        if count == 0 {
554            return 0;
555        }
556        let offset = self.place_offset[pid];
557        let head = self.ring_head[pid];
558        let cap = self.ring_capacity[pid];
559        let mut matched = 0;
560        for i in 0..count {
561            let idx = offset + (head + i) % cap;
562            if let Some(token) = &self.token_pool[idx]
563                && guard(token.value.as_ref())
564            {
565                matched += 1;
566            }
567        }
568        matched
569    }
570
571    fn has_input_from_reset_place(&self, tid: usize) -> bool {
572        if !self.has_pending_resets {
573            return false;
574        }
575        let input_mask = &self.program.input_place_mask_words[tid];
576        for (im, pr) in input_mask.iter().zip(self.pending_reset_words.iter()) {
577            if (im & pr) != 0 {
578                return true;
579            }
580        }
581        false
582    }
583
584    fn clear_pending_resets(&mut self) {
585        if self.has_pending_resets {
586            for w in &mut self.pending_reset_words {
587                *w = 0;
588            }
589            self.has_pending_resets = false;
590        }
591    }
592
593    // ==================== Deadline Enforcement ====================
594
595    fn enforce_deadlines(&mut self, now_ms: f64) {
596        for s in 0..self.summary_words {
597            let mut summary = self.enabled_word_summary[s];
598            while summary != 0 {
599                let local_w = summary.trailing_zeros() as usize;
600                summary &= summary - 1;
601                let w = (s << bitmap::WORD_SHIFT) | local_w;
602                if w >= self.transition_words {
603                    continue;
604                }
605                let mut word = self.enabled_bitmap[w];
606                while word != 0 {
607                    let bit = word.trailing_zeros() as usize;
608                    let tid = (w << bitmap::WORD_SHIFT) | bit;
609                    word &= word - 1;
610
611                    if !self.program.has_deadline[tid] {
612                        continue;
613                    }
614
615                    let elapsed = now_ms - self.enabled_at_ms[tid];
616                    let latest_ms = self.program.latest_ms[tid];
617
618                    if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
619                        self.clear_enabled_bit(tid);
620                        self.enabled_transition_count -= 1;
621                        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
622                        self.mark_transition_dirty(tid);
623
624                        if E::ENABLED {
625                            self.event_store.append(NetEvent::TransitionTimedOut {
626                                transition_name: Arc::clone(
627                                    self.program.transition(tid).name_arc(),
628                                ),
629                                timestamp: now_millis(),
630                            });
631                        }
632                    }
633                }
634            }
635        }
636    }
637
638    // ==================== Firing (Sync) ====================
639
640    fn fire_ready_immediate_sync(&mut self) {
641        for s in 0..self.summary_words {
642            let mut summary = self.enabled_word_summary[s];
643            while summary != 0 {
644                let local_w = summary.trailing_zeros() as usize;
645                summary &= summary - 1;
646                let w = (s << bitmap::WORD_SHIFT) | local_w;
647                if w >= self.transition_words {
648                    continue;
649                }
650                let word = self.enabled_bitmap[w];
651                let mut remaining = word;
652                while remaining != 0 {
653                    let bit = remaining.trailing_zeros() as usize;
654                    let tid = (w << bitmap::WORD_SHIFT) | bit;
655                    remaining &= remaining - 1;
656
657                    if self.can_enable(tid) {
658                        self.fire_transition_sync(tid);
659                    } else {
660                        self.clear_enabled_bit(tid);
661                        self.enabled_transition_count -= 1;
662                        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
663                    }
664                }
665            }
666        }
667    }
668
669    fn fire_ready_general_sync(&mut self, now_ms: f64) {
670        // Populate ready queues from enabled bitmap using summary
671        self.clear_all_ready_queues();
672
673        for s in 0..self.summary_words {
674            let mut summary = self.enabled_word_summary[s];
675            while summary != 0 {
676                let local_w = summary.trailing_zeros() as usize;
677                summary &= summary - 1;
678                let w = (s << bitmap::WORD_SHIFT) | local_w;
679                if w >= self.transition_words {
680                    continue;
681                }
682                let mut word = self.enabled_bitmap[w];
683                while word != 0 {
684                    let bit = word.trailing_zeros() as usize;
685                    let tid = (w << bitmap::WORD_SHIFT) | bit;
686                    word &= word - 1;
687
688                    let enabled_ms = self.enabled_at_ms[tid];
689                    let elapsed = now_ms - enabled_ms;
690
691                    if self.program.earliest_ms[tid] <= elapsed {
692                        self.ready_queue_push(tid);
693                    }
694                }
695            }
696        }
697
698        // Fire from highest priority queue first
699        for pi in 0..self.program.distinct_priority_count {
700            while self.ready_queue_size[pi] > 0 {
701                let tid = self.ready_queue_pop(pi);
702                if !self.is_enabled(tid) {
703                    continue;
704                }
705
706                if self.can_enable(tid) {
707                    self.fire_transition_sync(tid);
708                } else {
709                    self.clear_enabled_bit(tid);
710                    self.enabled_transition_count -= 1;
711                    self.enabled_at_ms[tid] = f64::NEG_INFINITY;
712                }
713            }
714        }
715    }
716
717    // ==================== Opcode-Based Consume ====================
718
719    fn fire_transition_sync(&mut self, tid: usize) {
720        let has_guards = self.program.compiled().has_guards(tid);
721        let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
722        let action = Arc::clone(self.program.transition(tid).action());
723
724        // Reuse pre-allocated input/read buffers (clear entries, keep HashMap capacity)
725        self.reusable_inputs.clear();
726        self.reusable_reads.clear();
727
728        if has_guards {
729            // Fall back to spec-based consumption for guarded transitions
730            let input_specs: Vec<In> = self.program.transition(tid).input_specs().to_vec();
731            let reset_arcs: Vec<_> = self.program.transition(tid).resets().to_vec();
732
733            for in_spec in &input_specs {
734                let pid = self.program.place_id(in_spec.place_name()).unwrap();
735                let place_name_arc = Arc::clone(&self.program.place_name_arcs[pid]);
736                let to_consume = match in_spec {
737                    In::One { .. } => 1,
738                    In::Exactly { count, .. } => *count,
739                    In::All { guard, .. } | In::AtLeast { guard, .. } => {
740                        if guard.is_some() {
741                            self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
742                        } else {
743                            self.token_counts[pid]
744                        }
745                    }
746                };
747
748                for _ in 0..to_consume {
749                    let token = if let Some(guard) = in_spec.guard() {
750                        self.ring_remove_matching(pid, &**guard)
751                    } else {
752                        Some(self.ring_remove_first(pid))
753                    };
754                    if let Some(token) = token {
755                        if E::ENABLED {
756                            self.event_store.append(NetEvent::TokenRemoved {
757                                place_name: Arc::clone(&place_name_arc),
758                                timestamp: now_millis(),
759                            });
760                        }
761                        self.reusable_inputs
762                            .entry(Arc::clone(&place_name_arc))
763                            .or_default()
764                            .push(token);
765                    }
766                }
767            }
768
769            // Reset arcs
770            for arc in &reset_arcs {
771                let pid = self.program.place_id(arc.place.name()).unwrap();
772                let removed = self.ring_remove_all(pid);
773                if E::ENABLED {
774                    for _ in &removed {
775                        self.event_store.append(NetEvent::TokenRemoved {
776                            place_name: Arc::clone(arc.place.name_arc()),
777                            timestamp: now_millis(),
778                        });
779                    }
780                }
781                self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
782                    1u64 << (pid & bitmap::WORD_MASK);
783                self.has_pending_resets = true;
784            }
785        } else {
786            // Fast path: opcode-based consumption (no guards, no clone)
787            let ops_len = self.program.consume_ops[tid].len();
788            let mut pc = 0;
789            while pc < ops_len {
790                let opcode = self.program.consume_ops[tid][pc];
791                pc += 1;
792                match opcode {
793                    CONSUME_ONE => {
794                        let pid = self.program.consume_ops[tid][pc] as usize;
795                        pc += 1;
796                        let token = self.ring_remove_first(pid);
797                        if E::ENABLED {
798                            self.event_store.append(NetEvent::TokenRemoved {
799                                place_name: Arc::clone(&self.program.place_name_arcs[pid]),
800                                timestamp: now_millis(),
801                            });
802                        }
803                        self.reusable_inputs
804                            .entry(Arc::clone(&self.program.place_name_arcs[pid]))
805                            .or_default()
806                            .push(token);
807                    }
808                    CONSUME_N => {
809                        let pid = self.program.consume_ops[tid][pc] as usize;
810                        pc += 1;
811                        let count = self.program.consume_ops[tid][pc] as usize;
812                        pc += 1;
813                        for _ in 0..count {
814                            let token = self.ring_remove_first(pid);
815                            if E::ENABLED {
816                                self.event_store.append(NetEvent::TokenRemoved {
817                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
818                                    timestamp: now_millis(),
819                                });
820                            }
821                            self.reusable_inputs
822                                .entry(Arc::clone(&self.program.place_name_arcs[pid]))
823                                .or_default()
824                                .push(token);
825                        }
826                    }
827                    CONSUME_ALL | CONSUME_ATLEAST => {
828                        let pid = self.program.consume_ops[tid][pc] as usize;
829                        pc += 1;
830                        if opcode == CONSUME_ATLEAST {
831                            pc += 1;
832                        }
833                        let count = self.token_counts[pid];
834                        for _ in 0..count {
835                            let token = self.ring_remove_first(pid);
836                            if E::ENABLED {
837                                self.event_store.append(NetEvent::TokenRemoved {
838                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
839                                    timestamp: now_millis(),
840                                });
841                            }
842                            self.reusable_inputs
843                                .entry(Arc::clone(&self.program.place_name_arcs[pid]))
844                                .or_default()
845                                .push(token);
846                        }
847                    }
848                    RESET => {
849                        let pid = self.program.consume_ops[tid][pc] as usize;
850                        pc += 1;
851                        let count = self.token_counts[pid];
852                        for _ in 0..count {
853                            let _token = self.ring_remove_first(pid);
854                            if E::ENABLED {
855                                self.event_store.append(NetEvent::TokenRemoved {
856                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
857                                    timestamp: now_millis(),
858                                });
859                            }
860                        }
861                        self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
862                            1u64 << (pid & bitmap::WORD_MASK);
863                        self.has_pending_resets = true;
864                    }
865                    _ => unreachable!("Unknown opcode: {opcode}"),
866                }
867            }
868        }
869
870        // Execute read program — iterate by index, no clone of ops Vec
871        let read_ops_len = self.program.read_ops[tid].len();
872        for i in 0..read_ops_len {
873            let rpid = self.program.read_ops[tid][i];
874            let token_clone = self.ring_peek_first(rpid).cloned();
875            if let Some(token) = token_clone {
876                let place_name = Arc::clone(&self.program.place_name_arcs[rpid]);
877                self.reusable_reads
878                    .entry(place_name)
879                    .or_default()
880                    .push(token);
881            }
882        }
883
884        // Update bitmap for consumed/reset places
885        self.update_bitmap_after_consumption(tid);
886
887        if E::ENABLED {
888            self.event_store.append(NetEvent::TransitionStarted {
889                transition_name: Arc::clone(&transition_name),
890                timestamp: now_millis(),
891            });
892        }
893
894        // Create context using precomputed output place names and reusable buffers
895        let inputs = std::mem::take(&mut self.reusable_inputs);
896        let reads = std::mem::take(&mut self.reusable_reads);
897        let mut ctx = TransitionContext::new(
898            Arc::clone(&transition_name),
899            inputs,
900            reads,
901            self.program.output_place_name_sets[tid].clone(),
902            None,
903        );
904
905        let result = action.run_sync(&mut ctx);
906
907        // Reclaim buffers for reuse — keeps HashMap bucket allocations alive
908        let returned_inputs = ctx.take_inputs();
909        let returned_reads = ctx.take_reads();
910
911        match result {
912            Ok(()) => {
913                let outputs = ctx.take_outputs();
914                self.process_outputs(tid, &transition_name, outputs);
915
916                if E::ENABLED {
917                    self.event_store.append(NetEvent::TransitionCompleted {
918                        transition_name: Arc::clone(&transition_name),
919                        timestamp: now_millis(),
920                    });
921                }
922            }
923            Err(err) => {
924                if E::ENABLED {
925                    self.event_store.append(NetEvent::TransitionFailed {
926                        transition_name: Arc::clone(&transition_name),
927                        error: err.message,
928                        timestamp: now_millis(),
929                    });
930                }
931            }
932        }
933
934        // Return reclaimed buffers for next firing
935        self.reusable_inputs = returned_inputs;
936        self.reusable_reads = returned_reads;
937
938        // Clear enabled status
939        self.clear_enabled_bit(tid);
940        self.enabled_transition_count -= 1;
941        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
942
943        // Mark this transition dirty for re-evaluation
944        self.mark_transition_dirty(tid);
945    }
946
947    /// Removes the first token matching `guard` from the ring buffer at `pid`.
948    /// Returns at most one token per call (In::One guard semantics).
949    fn ring_remove_matching(
950        &mut self,
951        pid: usize,
952        guard: &dyn Fn(&dyn std::any::Any) -> bool,
953    ) -> Option<ErasedToken> {
954        let count = self.token_counts[pid];
955        if count == 0 {
956            return None;
957        }
958        let offset = self.place_offset[pid];
959        let head = self.ring_head[pid];
960        let cap = self.ring_capacity[pid];
961
962        // Find first matching token
963        for i in 0..count {
964            let idx = offset + (head + i) % cap;
965            if let Some(token) = &self.token_pool[idx]
966                && guard(token.value.as_ref())
967            {
968                let token = self.token_pool[idx].take().unwrap();
969                // Compact the ring by shifting remaining elements
970                for j in i..count - 1 {
971                    let from = offset + (head + j + 1) % cap;
972                    let to = offset + (head + j) % cap;
973                    self.token_pool[to] = self.token_pool[from].take();
974                }
975                self.token_counts[pid] -= 1;
976                self.ring_tail[pid] = if self.ring_tail[pid] == 0 {
977                    cap - 1
978                } else {
979                    self.ring_tail[pid] - 1
980                };
981                return Some(token);
982            }
983        }
984        None
985    }
986
987    fn process_outputs(
988        &mut self,
989        _tid: usize,
990        _transition_name: &Arc<str>,
991        outputs: Vec<OutputEntry>,
992    ) {
993        for entry in outputs {
994            if let Some(pid) = self.program.place_id(&entry.place_name) {
995                self.ring_add_last(pid, entry.token);
996                self.set_marking_bit(pid);
997                self.mark_dirty(pid);
998            }
999
1000            if E::ENABLED {
1001                self.event_store.append(NetEvent::TokenAdded {
1002                    place_name: Arc::clone(&entry.place_name),
1003                    timestamp: now_millis(),
1004                });
1005            }
1006        }
1007    }
1008
1009    fn update_bitmap_after_consumption(&mut self, tid: usize) {
1010        let n = self.program.compiled().consumption_place_ids(tid).len();
1011        for i in 0..n {
1012            let pid = self.program.compiled().consumption_place_ids(tid)[i];
1013            if self.token_counts[pid] == 0 {
1014                self.clear_marking_bit(pid);
1015            }
1016            self.mark_dirty(pid);
1017        }
1018    }
1019
1020    // ==================== Dirty Set Helpers ====================
1021
1022    fn has_dirty_bits(&self) -> bool {
1023        for &s in &self.dirty_word_summary {
1024            if s != 0 {
1025                return true;
1026            }
1027        }
1028        false
1029    }
1030
1031    fn mark_dirty(&mut self, pid: usize) {
1032        let n = self.program.compiled().affected_transitions(pid).len();
1033        for i in 0..n {
1034            let tid = self.program.compiled().affected_transitions(pid)[i];
1035            self.mark_transition_dirty(tid);
1036        }
1037    }
1038
1039    fn mark_transition_dirty(&mut self, tid: usize) {
1040        let w = tid >> bitmap::WORD_SHIFT;
1041        self.dirty_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
1042        self.dirty_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
1043    }
1044
1045    fn elapsed_ms(&self) -> f64 {
1046        self.start_time.elapsed().as_secs_f64() * 1000.0
1047    }
1048
1049    // ==================== Marking Sync ====================
1050
1051    fn materialize_marking(&self) -> Marking {
1052        let mut marking = Marking::new();
1053        for pid in 0..self.program.place_count() {
1054            let count = self.token_counts[pid];
1055            if count == 0 {
1056                continue;
1057            }
1058            let place_name = self.program.place(pid).name_arc();
1059            let offset = self.place_offset[pid];
1060            let head = self.ring_head[pid];
1061            let cap = self.ring_capacity[pid];
1062            for i in 0..count {
1063                let idx = offset + (head + i) % cap;
1064                if let Some(token) = &self.token_pool[idx] {
1065                    marking.add_erased(place_name, token.clone());
1066                }
1067            }
1068        }
1069        marking
1070    }
1071
1072    /// Runs the executor synchronously and returns the final marking.
1073    fn run_to_completion(&mut self) -> Marking {
1074        self.initialize_marking_bitmap();
1075        self.mark_all_dirty();
1076
1077        if E::ENABLED {
1078            let now = now_millis();
1079            self.event_store.append(NetEvent::ExecutionStarted {
1080                net_name: Arc::from(self.program.net().name()),
1081                timestamp: now,
1082            });
1083        }
1084
1085        loop {
1086            self.update_dirty_transitions();
1087
1088            let cycle_now = self.elapsed_ms();
1089
1090            if self.program.any_deadlines {
1091                self.enforce_deadlines(cycle_now);
1092            }
1093
1094            if self.should_terminate() {
1095                break;
1096            }
1097
1098            if self.program.all_immediate && self.program.all_same_priority {
1099                self.fire_ready_immediate_sync();
1100            } else {
1101                self.fire_ready_general_sync(cycle_now);
1102            }
1103
1104            if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
1105                break;
1106            }
1107        }
1108
1109        if E::ENABLED {
1110            let now = now_millis();
1111            self.event_store.append(NetEvent::ExecutionCompleted {
1112                net_name: Arc::from(self.program.net().name()),
1113                timestamp: now,
1114            });
1115        }
1116
1117        self.materialize_marking()
1118    }
1119}
1120
1121// Async path
1122#[cfg(feature = "tokio")]
1123use crate::environment::ExecutorSignal;
1124
/// Outcome of one spawned asynchronous action, sent back to the executor loop
/// over the completion channel.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished.
    transition_name: Arc<str>,
    /// Outputs produced by the action on success, or the error message on failure.
    result: Result<Vec<OutputEntry>, String>,
}
1130
1131#[cfg(feature = "tokio")]
1132impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
1133    /// Runs the executor asynchronously with tokio.
1134    ///
1135    /// External events are injected via [`ExecutorSignal::Event`]. Lifecycle
1136    /// signals [`ExecutorSignal::Drain`] and [`ExecutorSignal::Close`] control
1137    /// graceful and immediate shutdown respectively. Use
1138    /// [`ExecutorHandle`](crate::executor_handle::ExecutorHandle) for RAII-managed
1139    /// lifecycle with automatic drain on drop.
1140    pub async fn run_async(
1141        &mut self,
1142        mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
1143    ) -> Marking {
1144        let (completion_tx, mut completion_rx) =
1145            tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
1146
1147        self.initialize_marking_bitmap();
1148        self.mark_all_dirty();
1149
1150        let mut in_flight_count: usize = 0;
1151        let mut signal_channel_open = true;
1152        let mut draining = false;
1153        let mut closed = false;
1154
1155        if E::ENABLED {
1156            let now = now_millis();
1157            self.event_store.append(NetEvent::ExecutionStarted {
1158                net_name: Arc::from(self.program.net().name()),
1159                timestamp: now,
1160            });
1161        }
1162
1163        loop {
1164            // Phase 1: Process completed async actions
1165            while let Ok(completion) = completion_rx.try_recv() {
1166                in_flight_count -= 1;
1167                match completion.result {
1168                    Ok(outputs) => {
1169                        self.process_outputs(0, &completion.transition_name, outputs);
1170                        if E::ENABLED {
1171                            self.event_store.append(NetEvent::TransitionCompleted {
1172                                transition_name: Arc::clone(&completion.transition_name),
1173                                timestamp: now_millis(),
1174                            });
1175                        }
1176                    }
1177                    Err(err) => {
1178                        if E::ENABLED {
1179                            self.event_store.append(NetEvent::TransitionFailed {
1180                                transition_name: Arc::clone(&completion.transition_name),
1181                                error: err,
1182                                timestamp: now_millis(),
1183                            });
1184                        }
1185                    }
1186                }
1187            }
1188
1189            // Phase 2: Process signals (external events + lifecycle)
1190            while let Ok(signal) = signal_rx.try_recv() {
1191                match signal {
1192                    ExecutorSignal::Event(event) if !draining => {
1193                        if let Some(pid) = self.program.place_id(&event.place_name) {
1194                            self.ring_add_last(pid, event.token);
1195                            self.set_marking_bit(pid);
1196                            self.mark_dirty(pid);
1197                        }
1198                        if E::ENABLED {
1199                            self.event_store.append(NetEvent::TokenAdded {
1200                                place_name: Arc::clone(&event.place_name),
1201                                timestamp: now_millis(),
1202                            });
1203                        }
1204                    }
1205                    ExecutorSignal::Event(_) => {
1206                        // Draining: discard events arriving after drain signal
1207                    }
1208                    ExecutorSignal::Drain => {
1209                        draining = true;
1210                    }
1211                    ExecutorSignal::Close => {
1212                        closed = true;
1213                        draining = true;
1214                        // Discard remaining queued signals per ENV-013.
1215                        // Note: events already processed earlier in this try_recv batch
1216                        // are kept (single-channel design); Java/TS discard all queued
1217                        // events via an atomic flag checked at processExternalEvents() entry.
1218                        while signal_rx.try_recv().is_ok() {}
1219                    }
1220                }
1221            }
1222
1223            // Phase 3: Update dirty transitions
1224            self.update_dirty_transitions();
1225
1226            // Phase 4: Enforce deadlines
1227            let cycle_now = self.elapsed_ms();
1228            if self.program.any_deadlines {
1229                self.enforce_deadlines(cycle_now);
1230            }
1231
1232            // Termination check — O(1) flag checks
1233            if closed && in_flight_count == 0 {
1234                break; // ENV-013: immediate close, in-flight completed
1235            }
1236            if draining
1237                && self.enabled_transition_count == 0
1238                && in_flight_count == 0
1239            {
1240                break; // ENV-011: graceful drain, quiescent
1241            }
1242            if self.enabled_transition_count == 0
1243                && in_flight_count == 0
1244                && (!self.has_environment_places || !signal_channel_open)
1245            {
1246                break; // Standard termination
1247            }
1248
1249            // Phase 5: Fire ready transitions
1250            let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
1251
1252            if fired || self.has_dirty_bits() {
1253                tokio::task::yield_now().await;
1254                continue;
1255            }
1256
1257            // Phase 6: Await work (completion, external event, or timer)
1258            if in_flight_count == 0 && !self.has_environment_places
1259                && self.enabled_transition_count == 0
1260            {
1261                break;
1262            }
1263            if in_flight_count == 0
1264                && self.enabled_transition_count == 0
1265                && (draining || !signal_channel_open)
1266            {
1267                break;
1268            }
1269
1270            let timer_ms = self.millis_until_next_timed_transition();
1271
1272            tokio::select! {
1273                Some(completion) = completion_rx.recv() => {
1274                    in_flight_count -= 1;
1275                    match completion.result {
1276                        Ok(outputs) => {
1277                            self.process_outputs(0, &completion.transition_name, outputs);
1278                            if E::ENABLED {
1279                                self.event_store.append(NetEvent::TransitionCompleted {
1280                                    transition_name: Arc::clone(&completion.transition_name),
1281                                    timestamp: now_millis(),
1282                                });
1283                            }
1284                        }
1285                        Err(err) => {
1286                            if E::ENABLED {
1287                                self.event_store.append(NetEvent::TransitionFailed {
1288                                    transition_name: Arc::clone(&completion.transition_name),
1289                                    error: err,
1290                                    timestamp: now_millis(),
1291                                });
1292                            }
1293                        }
1294                    }
1295                }
1296                result = signal_rx.recv(), if signal_channel_open && !closed => {
1297                    match result {
1298                        Some(ExecutorSignal::Event(event)) if !draining => {
1299                            if let Some(pid) = self.program.place_id(&event.place_name) {
1300                                self.ring_add_last(pid, event.token);
1301                                self.set_marking_bit(pid);
1302                                self.mark_dirty(pid);
1303                            }
1304                            if E::ENABLED {
1305                                self.event_store.append(NetEvent::TokenAdded {
1306                                    place_name: Arc::clone(&event.place_name),
1307                                    timestamp: now_millis(),
1308                                });
1309                            }
1310                        }
1311                        Some(ExecutorSignal::Event(_)) => {
1312                            // Draining: discard events
1313                        }
1314                        Some(ExecutorSignal::Drain) => {
1315                            draining = true;
1316                        }
1317                        Some(ExecutorSignal::Close) => {
1318                            closed = true;
1319                            draining = true;
1320                            while signal_rx.try_recv().is_ok() {}
1321                        }
1322                        None => {
1323                            signal_channel_open = false;
1324                        }
1325                    }
1326                }
1327                _ = tokio::time::sleep(std::time::Duration::from_millis(
1328                    if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
1329                )) => {}
1330            }
1331        }
1332
1333        if E::ENABLED {
1334            let now = now_millis();
1335            self.event_store.append(NetEvent::ExecutionCompleted {
1336                net_name: Arc::from(self.program.net().name()),
1337                timestamp: now,
1338            });
1339        }
1340
1341        self.materialize_marking()
1342    }
1343
1344    fn fire_ready_async(
1345        &mut self,
1346        now_ms: f64,
1347        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1348        in_flight_count: &mut usize,
1349    ) -> bool {
1350        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
1351
1352        for s in 0..self.summary_words {
1353            let mut summary = self.enabled_word_summary[s];
1354            while summary != 0 {
1355                let local_w = summary.trailing_zeros() as usize;
1356                summary &= summary - 1;
1357                let w = (s << bitmap::WORD_SHIFT) | local_w;
1358                if w >= self.transition_words {
1359                    continue;
1360                }
1361                let mut word = self.enabled_bitmap[w];
1362                while word != 0 {
1363                    let bit = word.trailing_zeros() as usize;
1364                    let tid = (w << bitmap::WORD_SHIFT) | bit;
1365                    word &= word - 1;
1366
1367                    let enabled_ms = self.enabled_at_ms[tid];
1368                    let elapsed = now_ms - enabled_ms;
1369                    if self.program.earliest_ms[tid] <= elapsed {
1370                        ready.push((tid, self.program.priorities[tid], enabled_ms));
1371                    }
1372                }
1373            }
1374        }
1375
1376        if ready.is_empty() {
1377            return false;
1378        }
1379
1380        ready.sort_by(|a, b| {
1381            b.1.cmp(&a.1)
1382                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
1383        });
1384
1385        let mut fired_any = false;
1386        for (tid, _, _) in ready {
1387            if self.is_enabled(tid) && self.can_enable(tid) {
1388                self.fire_transition_async(tid, completion_tx, in_flight_count);
1389                fired_any = true;
1390            } else if self.is_enabled(tid) {
1391                self.clear_enabled_bit(tid);
1392                self.enabled_transition_count -= 1;
1393                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1394            }
1395        }
1396        fired_any
1397    }
1398
1399    fn fire_transition_async(
1400        &mut self,
1401        tid: usize,
1402        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1403        in_flight_count: &mut usize,
1404    ) {
1405        let t = self.program.transition(tid);
1406        let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
1407        let input_specs: Vec<In> = t.input_specs().to_vec();
1408        let read_arcs: Vec<_> = t.reads().to_vec();
1409        let reset_arcs: Vec<_> = t.resets().to_vec();
1410        let output_place_names = self.program.output_place_name_sets[tid].clone();
1411        let action = Arc::clone(t.action());
1412        let is_sync = action.is_sync();
1413
1414        // Consume tokens (using spec-based for simplicity in async path)
1415        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1416        for in_spec in &input_specs {
1417            let pid = self.program.place_id(in_spec.place_name()).unwrap();
1418            let to_consume = match in_spec {
1419                In::One { .. } => 1,
1420                In::Exactly { count, .. } => *count,
1421                In::All { guard, .. } | In::AtLeast { guard, .. } => {
1422                    if guard.is_some() {
1423                        self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
1424                    } else {
1425                        self.token_counts[pid]
1426                    }
1427                }
1428            };
1429
1430            let place_name_arc = Arc::clone(in_spec.place().name_arc());
1431            for _ in 0..to_consume {
1432                let token = if let Some(guard) = in_spec.guard() {
1433                    self.ring_remove_matching(pid, &**guard)
1434                } else {
1435                    Some(self.ring_remove_first(pid))
1436                };
1437                if let Some(token) = token {
1438                    if E::ENABLED {
1439                        self.event_store.append(NetEvent::TokenRemoved {
1440                            place_name: Arc::clone(&place_name_arc),
1441                            timestamp: now_millis(),
1442                        });
1443                    }
1444                    inputs
1445                        .entry(Arc::clone(&place_name_arc))
1446                        .or_default()
1447                        .push(token);
1448                }
1449            }
1450        }
1451
1452        // Read arcs
1453        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1454        for arc in &read_arcs {
1455            let rpid = self.program.place_id(arc.place.name()).unwrap();
1456            if let Some(token) = self.ring_peek_first(rpid) {
1457                read_tokens
1458                    .entry(Arc::clone(arc.place.name_arc()))
1459                    .or_default()
1460                    .push(token.clone());
1461            }
1462        }
1463
1464        // Reset arcs
1465        for arc in &reset_arcs {
1466            let pid = self.program.place_id(arc.place.name()).unwrap();
1467            let removed = self.ring_remove_all(pid);
1468            if E::ENABLED {
1469                for _ in &removed {
1470                    self.event_store.append(NetEvent::TokenRemoved {
1471                        place_name: Arc::clone(arc.place.name_arc()),
1472                        timestamp: now_millis(),
1473                    });
1474                }
1475            }
1476            self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
1477                1u64 << (pid & bitmap::WORD_MASK);
1478            self.has_pending_resets = true;
1479        }
1480
1481        self.update_bitmap_after_consumption(tid);
1482
1483        if E::ENABLED {
1484            self.event_store.append(NetEvent::TransitionStarted {
1485                transition_name: Arc::clone(&transition_name),
1486                timestamp: now_millis(),
1487            });
1488        }
1489
1490        // Clear enabled status
1491        self.clear_enabled_bit(tid);
1492        self.enabled_transition_count -= 1;
1493        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1494        self.mark_transition_dirty(tid);
1495
1496        if is_sync {
1497            let mut ctx = TransitionContext::new(
1498                Arc::clone(&transition_name),
1499                inputs,
1500                read_tokens,
1501                output_place_names,
1502                None,
1503            );
1504            let result = action.run_sync(&mut ctx);
1505            match result {
1506                Ok(()) => {
1507                    let outputs = ctx.take_outputs();
1508                    self.process_outputs(tid, &transition_name, outputs);
1509                    if E::ENABLED {
1510                        self.event_store.append(NetEvent::TransitionCompleted {
1511                            transition_name: Arc::clone(&transition_name),
1512                            timestamp: now_millis(),
1513                        });
1514                    }
1515                }
1516                Err(err) => {
1517                    if E::ENABLED {
1518                        self.event_store.append(NetEvent::TransitionFailed {
1519                            transition_name: Arc::clone(&transition_name),
1520                            error: err.message,
1521                            timestamp: now_millis(),
1522                        });
1523                    }
1524                }
1525            }
1526        } else {
1527            *in_flight_count += 1;
1528            let tx = completion_tx.clone();
1529            let name = Arc::clone(&transition_name);
1530            let ctx = TransitionContext::new(
1531                Arc::clone(&transition_name),
1532                inputs,
1533                read_tokens,
1534                output_place_names,
1535                None,
1536            );
1537            tokio::spawn(async move {
1538                let result = action.run_async(ctx).await;
1539                let completion = match result {
1540                    Ok(mut completed_ctx) => ActionCompletion {
1541                        transition_name: Arc::clone(&name),
1542                        result: Ok(completed_ctx.take_outputs()),
1543                    },
1544                    Err(err) => ActionCompletion {
1545                        transition_name: Arc::clone(&name),
1546                        result: Err(err.message),
1547                    },
1548                };
1549                let _ = tx.send(completion);
1550            });
1551        }
1552    }
1553
1554    fn millis_until_next_timed_transition(&self) -> f64 {
1555        let mut min_wait = f64::INFINITY;
1556        let now_ms = self.elapsed_ms();
1557
1558        for s in 0..self.summary_words {
1559            let mut summary = self.enabled_word_summary[s];
1560            while summary != 0 {
1561                let local_w = summary.trailing_zeros() as usize;
1562                summary &= summary - 1;
1563                let w = (s << bitmap::WORD_SHIFT) | local_w;
1564                if w >= self.transition_words {
1565                    continue;
1566                }
1567                let mut word = self.enabled_bitmap[w];
1568                while word != 0 {
1569                    let bit = word.trailing_zeros() as usize;
1570                    let tid = (w << bitmap::WORD_SHIFT) | bit;
1571                    word &= word - 1;
1572
1573                    let elapsed = now_ms - self.enabled_at_ms[tid];
1574                    let remaining_earliest = self.program.earliest_ms[tid] - elapsed;
1575                    if remaining_earliest <= 0.0 {
1576                        return 0.0;
1577                    }
1578                    min_wait = min_wait.min(remaining_earliest);
1579
1580                    if self.program.has_deadline[tid] {
1581                        let remaining_deadline = self.program.latest_ms[tid] - elapsed;
1582                        if remaining_deadline <= 0.0 {
1583                            return 0.0;
1584                        }
1585                        min_wait = min_wait.min(remaining_deadline);
1586                    }
1587                }
1588            }
1589        }
1590
1591        min_wait
1592    }
1593}
1594
1595// ==================== Static Helpers ====================
1596
1597fn grow_ring_static(
1598    token_pool: &mut Vec<Option<ErasedToken>>,
1599    place_offset: &mut [usize],
1600    ring_head: &mut [usize],
1601    ring_tail: &mut [usize],
1602    ring_capacity: &mut [usize],
1603    token_counts: &[usize],
1604    pid: usize,
1605) {
1606    let old_cap = ring_capacity[pid];
1607    let new_cap = old_cap * 2;
1608    let old_offset = place_offset[pid];
1609    let head = ring_head[pid];
1610    let count = token_counts[pid];
1611
1612    // Relocate to end of pool
1613    let new_offset = token_pool.len();
1614    token_pool.resize_with(new_offset + new_cap, || None);
1615
1616    // Copy ring contents linearized
1617    for i in 0..count {
1618        let old_idx = old_offset + (head + i) % old_cap;
1619        token_pool[new_offset + i] = token_pool[old_idx].take();
1620    }
1621
1622    place_offset[pid] = new_offset;
1623    ring_head[pid] = 0;
1624    ring_tail[pid] = count;
1625    ring_capacity[pid] = new_cap;
1626}
1627
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1634
1635#[cfg(test)]
1636mod tests {
1637    use super::*;
1638    use crate::compiled_net::CompiledNet;
1639    use libpetri_core::action::{fork, passthrough, sync_action};
1640    use libpetri_core::input::one;
1641    use libpetri_core::output::out_place;
1642    use libpetri_core::petri_net::PetriNet;
1643    use libpetri_core::place::Place;
1644    use libpetri_core::token::Token;
1645    use libpetri_core::transition::Transition;
1646    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1647
1648    fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1649        let p1 = Place::<i32>::new("p1");
1650        let p2 = Place::<i32>::new("p2");
1651        let p3 = Place::<i32>::new("p3");
1652
1653        let t1 = Transition::builder("t1")
1654            .input(one(&p1))
1655            .output(out_place(&p2))
1656            .action(passthrough())
1657            .build();
1658        let t2 = Transition::builder("t2")
1659            .input(one(&p2))
1660            .output(out_place(&p3))
1661            .action(passthrough())
1662            .build();
1663
1664        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1665        (net, p1, p2, p3)
1666    }
1667
1668    #[test]
1669    fn sync_passthrough_chain() {
1670        let (net, p1, _p2, _p3) = simple_chain();
1671        let compiled = CompiledNet::compile(&net);
1672        let prog = PrecompiledNet::from_compiled(&compiled);
1673
1674        let mut marking = Marking::new();
1675        marking.add(&p1, Token::at(42, 0));
1676
1677        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1678        let result = executor.run_to_completion();
1679
1680        assert_eq!(result.count("p1"), 0);
1681    }
1682
1683    #[test]
1684    fn sync_fork_chain() {
1685        let p1 = Place::<i32>::new("p1");
1686        let p2 = Place::<i32>::new("p2");
1687        let p3 = Place::<i32>::new("p3");
1688
1689        let t1 = Transition::builder("t1")
1690            .input(one(&p1))
1691            .output(libpetri_core::output::and(vec![
1692                out_place(&p2),
1693                out_place(&p3),
1694            ]))
1695            .action(fork())
1696            .build();
1697
1698        let net = PetriNet::builder("fork").transition(t1).build();
1699        let compiled = CompiledNet::compile(&net);
1700        let prog = PrecompiledNet::from_compiled(&compiled);
1701
1702        let mut marking = Marking::new();
1703        marking.add(&p1, Token::at(42, 0));
1704
1705        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1706        let result = executor.run_to_completion();
1707
1708        assert_eq!(result.count("p1"), 0);
1709        assert_eq!(result.count("p2"), 1);
1710        assert_eq!(result.count("p3"), 1);
1711    }
1712
1713    #[test]
1714    fn sync_linear_chain_5() {
1715        let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1716        let transitions: Vec<Transition> = (0..5)
1717            .map(|i| {
1718                Transition::builder(format!("t{i}"))
1719                    .input(one(&places[i]))
1720                    .output(out_place(&places[i + 1]))
1721                    .action(fork())
1722                    .build()
1723            })
1724            .collect();
1725
1726        let net = PetriNet::builder("chain5").transitions(transitions).build();
1727        let compiled = CompiledNet::compile(&net);
1728        let prog = PrecompiledNet::from_compiled(&compiled);
1729
1730        let mut marking = Marking::new();
1731        marking.add(&places[0], Token::at(1, 0));
1732
1733        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1734        let result = executor.run_to_completion();
1735
1736        assert_eq!(result.count("p0"), 0);
1737        assert_eq!(result.count("p5"), 1);
1738    }
1739
1740    #[test]
1741    fn sync_no_initial_tokens() {
1742        let (net, _, _, _) = simple_chain();
1743        let compiled = CompiledNet::compile(&net);
1744        let prog = PrecompiledNet::from_compiled(&compiled);
1745        let marking = Marking::new();
1746        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1747        let result = executor.run_to_completion();
1748        assert_eq!(result.count("p1"), 0);
1749        assert_eq!(result.count("p2"), 0);
1750        assert_eq!(result.count("p3"), 0);
1751    }
1752
1753    #[test]
1754    fn sync_priority_ordering() {
1755        let p = Place::<()>::new("p");
1756        let out_a = Place::<()>::new("a");
1757        let out_b = Place::<()>::new("b");
1758
1759        let t_high = Transition::builder("t_high")
1760            .input(one(&p))
1761            .output(out_place(&out_a))
1762            .action(passthrough())
1763            .priority(10)
1764            .build();
1765        let t_low = Transition::builder("t_low")
1766            .input(one(&p))
1767            .output(out_place(&out_b))
1768            .action(passthrough())
1769            .priority(1)
1770            .build();
1771
1772        let net = PetriNet::builder("priority")
1773            .transitions([t_high, t_low])
1774            .build();
1775        let compiled = CompiledNet::compile(&net);
1776        let prog = PrecompiledNet::from_compiled(&compiled);
1777
1778        let mut marking = Marking::new();
1779        marking.add(&p, Token::at((), 0));
1780
1781        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1782        let result = executor.run_to_completion();
1783
1784        assert_eq!(result.count("p"), 0);
1785    }
1786
1787    #[test]
1788    fn sync_inhibitor_blocks() {
1789        let p1 = Place::<()>::new("p1");
1790        let p2 = Place::<()>::new("p2");
1791        let p_inh = Place::<()>::new("inh");
1792
1793        let t = Transition::builder("t1")
1794            .input(one(&p1))
1795            .output(out_place(&p2))
1796            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1797            .action(passthrough())
1798            .build();
1799
1800        let net = PetriNet::builder("inhibitor").transition(t).build();
1801        let compiled = CompiledNet::compile(&net);
1802        let prog = PrecompiledNet::from_compiled(&compiled);
1803
1804        let mut marking = Marking::new();
1805        marking.add(&p1, Token::at((), 0));
1806        marking.add(&p_inh, Token::at((), 0));
1807
1808        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1809        let result = executor.run_to_completion();
1810
1811        assert_eq!(result.count("p1"), 1);
1812    }
1813
1814    #[test]
1815    fn read_arc_does_not_consume() {
1816        let p_in = Place::<i32>::new("in");
1817        let p_ctx = Place::<i32>::new("ctx");
1818        let p_out = Place::<i32>::new("out");
1819
1820        let t = Transition::builder("t1")
1821            .input(one(&p_in))
1822            .read(libpetri_core::arc::read(&p_ctx))
1823            .output(out_place(&p_out))
1824            .action(sync_action(|ctx| {
1825                let v = ctx.input::<i32>("in")?;
1826                let r = ctx.read::<i32>("ctx")?;
1827                ctx.output("out", *v + *r)?;
1828                Ok(())
1829            }))
1830            .build();
1831        let net = PetriNet::builder("test").transition(t).build();
1832        let compiled = CompiledNet::compile(&net);
1833        let prog = PrecompiledNet::from_compiled(&compiled);
1834
1835        let mut marking = Marking::new();
1836        marking.add(&p_in, Token::at(10, 0));
1837        marking.add(&p_ctx, Token::at(5, 0));
1838
1839        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1840        let result = executor.run_to_completion();
1841
1842        assert_eq!(result.count("in"), 0);
1843        assert_eq!(result.count("ctx"), 1);
1844        assert_eq!(result.count("out"), 1);
1845    }
1846
1847    #[test]
1848    fn reset_arc_removes_all_tokens() {
1849        let p_in = Place::<()>::new("in");
1850        let p_reset = Place::<i32>::new("reset");
1851        let p_out = Place::<()>::new("out");
1852
1853        let t = Transition::builder("t1")
1854            .input(one(&p_in))
1855            .reset(libpetri_core::arc::reset(&p_reset))
1856            .output(out_place(&p_out))
1857            .action(fork())
1858            .build();
1859        let net = PetriNet::builder("test").transition(t).build();
1860        let compiled = CompiledNet::compile(&net);
1861        let prog = PrecompiledNet::from_compiled(&compiled);
1862
1863        let mut marking = Marking::new();
1864        marking.add(&p_in, Token::at((), 0));
1865        marking.add(&p_reset, Token::at(1, 0));
1866        marking.add(&p_reset, Token::at(2, 0));
1867        marking.add(&p_reset, Token::at(3, 0));
1868
1869        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1870        let result = executor.run_to_completion();
1871
1872        assert_eq!(result.count("reset"), 0);
1873        assert_eq!(result.count("out"), 1);
1874    }
1875
1876    #[test]
1877    fn exactly_cardinality_consumes_n() {
1878        let p = Place::<i32>::new("p");
1879        let p_out = Place::<i32>::new("out");
1880
1881        let t = Transition::builder("t1")
1882            .input(libpetri_core::input::exactly(3, &p))
1883            .output(out_place(&p_out))
1884            .action(sync_action(|ctx| {
1885                let vals = ctx.inputs::<i32>("p")?;
1886                for v in vals {
1887                    ctx.output("out", *v)?;
1888                }
1889                Ok(())
1890            }))
1891            .build();
1892        let net = PetriNet::builder("test").transition(t).build();
1893        let compiled = CompiledNet::compile(&net);
1894        let prog = PrecompiledNet::from_compiled(&compiled);
1895
1896        let mut marking = Marking::new();
1897        for i in 0..5 {
1898            marking.add(&p, Token::at(i, 0));
1899        }
1900
1901        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1902        let result = executor.run_to_completion();
1903
1904        assert_eq!(result.count("p"), 2);
1905        assert_eq!(result.count("out"), 3);
1906    }
1907
1908    #[test]
1909    fn all_cardinality_consumes_everything() {
1910        let p = Place::<i32>::new("p");
1911        let p_out = Place::<()>::new("out");
1912
1913        let t = Transition::builder("t1")
1914            .input(libpetri_core::input::all(&p))
1915            .output(out_place(&p_out))
1916            .action(sync_action(|ctx| {
1917                let vals = ctx.inputs::<i32>("p")?;
1918                ctx.output("out", vals.len() as i32)?;
1919                Ok(())
1920            }))
1921            .build();
1922        let net = PetriNet::builder("test").transition(t).build();
1923        let compiled = CompiledNet::compile(&net);
1924        let prog = PrecompiledNet::from_compiled(&compiled);
1925
1926        let mut marking = Marking::new();
1927        for i in 0..5 {
1928            marking.add(&p, Token::at(i, 0));
1929        }
1930
1931        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1932        let result = executor.run_to_completion();
1933
1934        assert_eq!(result.count("p"), 0);
1935    }
1936
1937    #[test]
1938    fn at_least_blocks_insufficient() {
1939        let p = Place::<i32>::new("p");
1940        let p_out = Place::<()>::new("out");
1941
1942        let t = Transition::builder("t1")
1943            .input(libpetri_core::input::at_least(3, &p))
1944            .output(out_place(&p_out))
1945            .action(passthrough())
1946            .build();
1947        let net = PetriNet::builder("test").transition(t).build();
1948        let compiled = CompiledNet::compile(&net);
1949        let prog = PrecompiledNet::from_compiled(&compiled);
1950
1951        let mut marking = Marking::new();
1952        marking.add(&p, Token::at(1, 0));
1953        marking.add(&p, Token::at(2, 0));
1954
1955        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1956        let result = executor.run_to_completion();
1957
1958        assert_eq!(result.count("p"), 2);
1959    }
1960
1961    #[test]
1962    fn at_least_fires_with_enough() {
1963        let p = Place::<i32>::new("p");
1964        let p_out = Place::<()>::new("out");
1965
1966        let t = Transition::builder("t1")
1967            .input(libpetri_core::input::at_least(3, &p))
1968            .output(out_place(&p_out))
1969            .action(passthrough())
1970            .build();
1971        let net = PetriNet::builder("test").transition(t).build();
1972        let compiled = CompiledNet::compile(&net);
1973        let prog = PrecompiledNet::from_compiled(&compiled);
1974
1975        let mut marking = Marking::new();
1976        for i in 0..5 {
1977            marking.add(&p, Token::at(i, 0));
1978        }
1979
1980        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1981        let result = executor.run_to_completion();
1982
1983        assert_eq!(result.count("p"), 0);
1984    }
1985
1986    #[test]
1987    fn guarded_input_only_consumes_matching() {
1988        let p = Place::<i32>::new("p");
1989        let p_out = Place::<i32>::new("out");
1990
1991        let t = Transition::builder("t1")
1992            .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1993            .output(out_place(&p_out))
1994            .action(fork())
1995            .build();
1996        let net = PetriNet::builder("test").transition(t).build();
1997        let compiled = CompiledNet::compile(&net);
1998        let prog = PrecompiledNet::from_compiled(&compiled);
1999
2000        let mut marking = Marking::new();
2001        marking.add(&p, Token::at(3, 0));
2002        marking.add(&p, Token::at(10, 0));
2003
2004        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2005        let result = executor.run_to_completion();
2006
2007        assert_eq!(result.count("p"), 1);
2008        assert_eq!(result.count("out"), 1);
2009    }
2010
2011    #[test]
2012    fn guarded_input_blocks_when_no_match() {
2013        let p = Place::<i32>::new("p");
2014        let p_out = Place::<i32>::new("out");
2015
2016        let t = Transition::builder("t1")
2017            .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
2018            .output(out_place(&p_out))
2019            .action(fork())
2020            .build();
2021        let net = PetriNet::builder("test").transition(t).build();
2022        let compiled = CompiledNet::compile(&net);
2023        let prog = PrecompiledNet::from_compiled(&compiled);
2024
2025        let mut marking = Marking::new();
2026        marking.add(&p, Token::at(3, 0));
2027        marking.add(&p, Token::at(10, 0));
2028
2029        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2030        let result = executor.run_to_completion();
2031
2032        assert_eq!(result.count("p"), 2);
2033        assert_eq!(result.count("out"), 0);
2034    }
2035
2036    #[test]
2037    fn event_store_records_lifecycle() {
2038        let p1 = Place::<i32>::new("p1");
2039        let p2 = Place::<i32>::new("p2");
2040        let t = Transition::builder("t1")
2041            .input(one(&p1))
2042            .output(out_place(&p2))
2043            .action(fork())
2044            .build();
2045        let net = PetriNet::builder("test").transition(t).build();
2046        let compiled = CompiledNet::compile(&net);
2047        let prog = PrecompiledNet::from_compiled(&compiled);
2048
2049        let mut marking = Marking::new();
2050        marking.add(&p1, Token::at(1, 0));
2051
2052        let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2053        let _result = executor.run_to_completion();
2054
2055        let events = executor.event_store().events();
2056        assert!(
2057            events
2058                .iter()
2059                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2060        );
2061        assert!(
2062            events
2063                .iter()
2064                .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
2065        );
2066        assert!(
2067            events
2068                .iter()
2069                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2070        );
2071        assert!(
2072            events
2073                .iter()
2074                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2075        );
2076        assert!(
2077            events
2078                .iter()
2079                .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
2080        );
2081        assert!(
2082            events
2083                .iter()
2084                .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
2085        );
2086        assert!(
2087            events
2088                .iter()
2089                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2090        );
2091    }
2092
2093    #[test]
2094    fn action_error_does_not_crash() {
2095        let p_in = Place::<i32>::new("in");
2096        let p_out = Place::<i32>::new("out");
2097
2098        let t = Transition::builder("t1")
2099            .input(one(&p_in))
2100            .output(out_place(&p_out))
2101            .action(sync_action(|_ctx| {
2102                Err(libpetri_core::action::ActionError::new(
2103                    "intentional failure",
2104                ))
2105            }))
2106            .build();
2107        let net = PetriNet::builder("test").transition(t).build();
2108        let compiled = CompiledNet::compile(&net);
2109        let prog = PrecompiledNet::from_compiled(&compiled);
2110
2111        let mut marking = Marking::new();
2112        marking.add(&p_in, Token::at(42, 0));
2113
2114        let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2115        let result = executor.run_to_completion();
2116
2117        assert_eq!(result.count("in"), 0);
2118        assert_eq!(result.count("out"), 0);
2119
2120        let events = executor.event_store().events();
2121        assert!(
2122            events
2123                .iter()
2124                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2125        );
2126    }
2127
2128    #[test]
2129    fn multiple_input_arcs_require_all() {
2130        let p1 = Place::<i32>::new("p1");
2131        let p2 = Place::<i32>::new("p2");
2132        let p3 = Place::<i32>::new("p3");
2133
2134        let t = Transition::builder("t1")
2135            .input(one(&p1))
2136            .input(one(&p2))
2137            .output(out_place(&p3))
2138            .action(sync_action(|ctx| {
2139                ctx.output("p3", 99i32)?;
2140                Ok(())
2141            }))
2142            .build();
2143        let net = PetriNet::builder("test").transition(t).build();
2144        let compiled = CompiledNet::compile(&net);
2145        let prog = PrecompiledNet::from_compiled(&compiled);
2146
2147        // Only p1 has token — should not fire
2148        let mut marking = Marking::new();
2149        marking.add(&p1, Token::at(1, 0));
2150        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2151        let result = executor.run_to_completion();
2152        assert_eq!(result.count("p1"), 1);
2153        assert_eq!(result.count("p3"), 0);
2154
2155        // Both p1 and p2 — should fire
2156        let compiled2 = CompiledNet::compile(&net);
2157        let prog2 = PrecompiledNet::from_compiled(&compiled2);
2158        let mut marking2 = Marking::new();
2159        marking2.add(&p1, Token::at(1, 0));
2160        marking2.add(&p2, Token::at(2, 0));
2161        let mut executor2 = PrecompiledNetExecutor::<NoopEventStore>::new(&prog2, marking2);
2162        let result2 = executor2.run_to_completion();
2163        assert_eq!(result2.count("p1"), 0);
2164        assert_eq!(result2.count("p2"), 0);
2165        assert_eq!(result2.count("p3"), 1);
2166    }
2167
2168    #[test]
2169    fn sync_action_custom_logic() {
2170        let p_in = Place::<i32>::new("in");
2171        let p_out = Place::<String>::new("out");
2172
2173        let t = Transition::builder("t1")
2174            .input(one(&p_in))
2175            .output(out_place(&p_out))
2176            .action(sync_action(|ctx| {
2177                let v = ctx.input::<i32>("in")?;
2178                ctx.output("out", format!("value={v}"))?;
2179                Ok(())
2180            }))
2181            .build();
2182        let net = PetriNet::builder("test").transition(t).build();
2183        let compiled = CompiledNet::compile(&net);
2184        let prog = PrecompiledNet::from_compiled(&compiled);
2185
2186        let mut marking = Marking::new();
2187        marking.add(&p_in, Token::at(42, 0));
2188
2189        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2190        let result = executor.run_to_completion();
2191
2192        assert_eq!(result.count("out"), 1);
2193    }
2194
2195    #[test]
2196    fn transform_action_outputs_to_all_places() {
2197        let p_in = Place::<i32>::new("in");
2198        let p_a = Place::<i32>::new("a");
2199        let p_b = Place::<i32>::new("b");
2200
2201        let t = Transition::builder("t1")
2202            .input(one(&p_in))
2203            .output(libpetri_core::output::and(vec![
2204                out_place(&p_a),
2205                out_place(&p_b),
2206            ]))
2207            .action(libpetri_core::action::transform(|ctx| {
2208                let v = ctx.input::<i32>("in").unwrap();
2209                Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
2210            }))
2211            .build();
2212        let net = PetriNet::builder("test").transition(t).build();
2213        let compiled = CompiledNet::compile(&net);
2214        let prog = PrecompiledNet::from_compiled(&compiled);
2215
2216        let mut marking = Marking::new();
2217        marking.add(&p_in, Token::at(5, 0));
2218
2219        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2220        let result = executor.run_to_completion();
2221
2222        assert_eq!(result.count("a"), 1);
2223        assert_eq!(result.count("b"), 1);
2224    }
2225
2226    #[test]
2227    fn complex_workflow() {
2228        use libpetri_core::output::{and, xor};
2229
2230        let input = Place::<i32>::new("v_input");
2231        let guard_in = Place::<i32>::new("v_guardIn");
2232        let intent_in = Place::<i32>::new("v_intentIn");
2233        let search_in = Place::<i32>::new("v_searchIn");
2234        let output_guard_in = Place::<i32>::new("v_outputGuardIn");
2235        let guard_safe = Place::<i32>::new("v_guardSafe");
2236        let guard_violation = Place::<i32>::new("v_guardViolation");
2237        let _violated = Place::<i32>::new("v_violated");
2238        let intent_ready = Place::<i32>::new("v_intentReady");
2239        let topic_ready = Place::<i32>::new("v_topicReady");
2240        let search_ready = Place::<i32>::new("v_searchReady");
2241        let _output_guard_done = Place::<i32>::new("v_outputGuardDone");
2242        let response = Place::<i32>::new("v_response");
2243
2244        let fork_trans = Transition::builder("Fork")
2245            .input(one(&input))
2246            .output(and(vec![
2247                out_place(&guard_in),
2248                out_place(&intent_in),
2249                out_place(&search_in),
2250                out_place(&output_guard_in),
2251            ]))
2252            .action(fork())
2253            .build();
2254
2255        let guard_trans = Transition::builder("Guard")
2256            .input(one(&guard_in))
2257            .output(xor(vec![
2258                out_place(&guard_safe),
2259                out_place(&guard_violation),
2260            ]))
2261            .action(fork())
2262            .build();
2263
2264        let handle_violation = Transition::builder("HandleViolation")
2265            .input(one(&guard_violation))
2266            .output(out_place(&_violated))
2267            .inhibitor(libpetri_core::arc::inhibitor(&guard_safe))
2268            .action(fork())
2269            .build();
2270
2271        let intent_trans = Transition::builder("Intent")
2272            .input(one(&intent_in))
2273            .output(out_place(&intent_ready))
2274            .action(fork())
2275            .build();
2276
2277        let topic_trans = Transition::builder("TopicKnowledge")
2278            .input(one(&intent_ready))
2279            .output(out_place(&topic_ready))
2280            .action(fork())
2281            .build();
2282
2283        let search_trans = Transition::builder("Search")
2284            .input(one(&search_in))
2285            .output(out_place(&search_ready))
2286            .read(libpetri_core::arc::read(&intent_ready))
2287            .inhibitor(libpetri_core::arc::inhibitor(&guard_violation))
2288            .priority(-5)
2289            .action(fork())
2290            .build();
2291
2292        let output_guard_trans = Transition::builder("OutputGuard")
2293            .input(one(&output_guard_in))
2294            .output(out_place(&_output_guard_done))
2295            .read(libpetri_core::arc::read(&guard_safe))
2296            .action(fork())
2297            .build();
2298
2299        let compose_trans = Transition::builder("Compose")
2300            .input(one(&guard_safe))
2301            .input(one(&search_ready))
2302            .input(one(&topic_ready))
2303            .output(out_place(&response))
2304            .priority(10)
2305            .action(fork())
2306            .build();
2307
2308        let net = PetriNet::builder("ComplexWorkflow")
2309            .transition(fork_trans)
2310            .transition(guard_trans)
2311            .transition(handle_violation)
2312            .transition(intent_trans)
2313            .transition(topic_trans)
2314            .transition(search_trans)
2315            .transition(output_guard_trans)
2316            .transition(compose_trans)
2317            .build();
2318
2319        let compiled = CompiledNet::compile(&net);
2320        let prog = PrecompiledNet::from_compiled(&compiled);
2321
2322        let mut marking = Marking::new();
2323        marking.add(&input, Token::at(1, 0));
2324
2325        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2326        let result = executor.run_to_completion();
2327
2328        // fork() produces to ALL output places, including both XOR branches.
2329        // This means guard_safe AND guard_violation both get tokens.
2330        // Search is inhibited by guard_violation, so it deadlocks.
2331        // The important thing is the executor doesn't crash and terminates.
2332        assert_eq!(result.count("v_input"), 0); // consumed by Fork
2333    }
2334
2335    #[cfg(feature = "tokio")]
2336    mod async_tests {
2337        use super::*;
2338        use crate::environment::ExternalEvent;
2339        use libpetri_core::action::async_action;
2340        use libpetri_core::petri_net::PetriNet;
2341        use libpetri_core::token::ErasedToken;
2342
2343        #[tokio::test]
2344        async fn async_linear_chain() {
2345            let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
2346            let transitions: Vec<Transition> = (0..5)
2347                .map(|i| {
2348                    Transition::builder(format!("t{i}"))
2349                        .input(one(&places[i]))
2350                        .output(out_place(&places[i + 1]))
2351                        .action(fork())
2352                        .build()
2353                })
2354                .collect();
2355
2356            let net = PetriNet::builder("chain5").transitions(transitions).build();
2357            let compiled = CompiledNet::compile(&net);
2358            let prog = PrecompiledNet::from_compiled(&compiled);
2359
2360            let mut marking = Marking::new();
2361            marking.add(&places[0], Token::at(1, 0));
2362
2363            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2364            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2365            let result = executor.run_async(rx).await;
2366
2367            assert_eq!(result.count("p0"), 0);
2368            assert_eq!(result.count("p5"), 1);
2369        }
2370
2371        #[tokio::test]
2372        async fn async_action_execution() {
2373            let p1 = Place::<i32>::new("p1");
2374            let p2 = Place::<i32>::new("p2");
2375
2376            let t = Transition::builder("t1")
2377                .input(one(&p1))
2378                .output(out_place(&p2))
2379                .action(async_action(|ctx| async { Ok(ctx) }))
2380                .build();
2381
2382            let net = PetriNet::builder("async_test").transition(t).build();
2383            let compiled = CompiledNet::compile(&net);
2384            let prog = PrecompiledNet::from_compiled(&compiled);
2385
2386            let mut marking = Marking::new();
2387            marking.add(&p1, Token::at(42, 0));
2388
2389            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2390            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2391            let result = executor.run_async(rx).await;
2392
2393            assert_eq!(result.count("p1"), 0);
2394        }
2395
2396        // ==================== Drain/Close lifecycle tests ====================
2397
2398        #[tokio::test]
2399        async fn async_drain_terminates_at_quiescence() {
2400            let p1 = Place::<i32>::new("p1");
2401            let p2 = Place::<i32>::new("p2");
2402
2403            let t1 = Transition::builder("t1")
2404                .input(one(&p1))
2405                .output(out_place(&p2))
2406                .action(fork())
2407                .build();
2408
2409            let net = PetriNet::builder("test").transition(t1).build();
2410            let compiled = CompiledNet::compile(&net);
2411            let prog = PrecompiledNet::from_compiled(&compiled);
2412
2413            let marking = Marking::new();
2414            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2415                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2416                .build();
2417
2418            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2419
2420            tokio::spawn(async move {
2421                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2422                tx.send(ExecutorSignal::Event(ExternalEvent {
2423                    place_name: Arc::from("p1"),
2424                    token: ErasedToken::from_typed(&Token::at(42, 0)),
2425                }))
2426                .unwrap();
2427                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2428                tx.send(ExecutorSignal::Drain).unwrap();
2429            });
2430
2431            let result = executor.run_async(rx).await;
2432            assert_eq!(result.count("p2"), 1);
2433        }
2434
2435        #[tokio::test]
2436        async fn async_drain_rejects_post_drain_events() {
2437            let p1 = Place::<i32>::new("p1");
2438            let p2 = Place::<i32>::new("p2");
2439
2440            let t1 = Transition::builder("t1")
2441                .input(one(&p1))
2442                .output(out_place(&p2))
2443                .action(fork())
2444                .build();
2445
2446            let net = PetriNet::builder("test").transition(t1).build();
2447            let compiled = CompiledNet::compile(&net);
2448            let prog = PrecompiledNet::from_compiled(&compiled);
2449
2450            let marking = Marking::new();
2451            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2452                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2453                .build();
2454
2455            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2456
2457            tokio::spawn(async move {
2458                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2459                tx.send(ExecutorSignal::Drain).unwrap();
2460                tx.send(ExecutorSignal::Event(ExternalEvent {
2461                    place_name: Arc::from("p1"),
2462                    token: ErasedToken::from_typed(&Token::at(99, 0)),
2463                }))
2464                .unwrap();
2465            });
2466
2467            let result = executor.run_async(rx).await;
2468            assert_eq!(result.count("p2"), 0);
2469        }
2470
2471        #[tokio::test]
2472        async fn async_close_discards_queued_events() {
2473            let p1 = Place::<i32>::new("p1");
2474            let p2 = Place::<i32>::new("p2");
2475
2476            let t1 = Transition::builder("t1")
2477                .input(one(&p1))
2478                .output(out_place(&p2))
2479                .action(fork())
2480                .build();
2481
2482            let net = PetriNet::builder("test").transition(t1).build();
2483            let compiled = CompiledNet::compile(&net);
2484            let prog = PrecompiledNet::from_compiled(&compiled);
2485
2486            let marking = Marking::new();
2487            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2488                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2489                .build();
2490
2491            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2492
2493            tx.send(ExecutorSignal::Event(ExternalEvent {
2494                place_name: Arc::from("p1"),
2495                token: ErasedToken::from_typed(&Token::at(1, 0)),
2496            }))
2497            .unwrap();
2498            tx.send(ExecutorSignal::Close).unwrap();
2499            tx.send(ExecutorSignal::Event(ExternalEvent {
2500                place_name: Arc::from("p1"),
2501                token: ErasedToken::from_typed(&Token::at(2, 0)),
2502            }))
2503            .unwrap();
2504            drop(tx);
2505
2506            let result = executor.run_async(rx).await;
2507            assert!(result.count("p2") <= 1);
2508        }
2509
2510        #[tokio::test]
2511        async fn async_close_after_drain_escalates() {
2512            let p1 = Place::<i32>::new("p1");
2513            let p2 = Place::<i32>::new("p2");
2514
2515            let t1 = Transition::builder("t1")
2516                .input(one(&p1))
2517                .output(out_place(&p2))
2518                .action(fork())
2519                .build();
2520
2521            let net = PetriNet::builder("test").transition(t1).build();
2522            let compiled = CompiledNet::compile(&net);
2523            let prog = PrecompiledNet::from_compiled(&compiled);
2524
2525            let marking = Marking::new();
2526            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2527                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2528                .build();
2529
2530            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2531
2532            tokio::spawn(async move {
2533                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2534                tx.send(ExecutorSignal::Drain).unwrap();
2535                tx.send(ExecutorSignal::Close).unwrap();
2536            });
2537
2538            let _result = executor.run_async(rx).await;
2539            // Test passes if run_async returns — close escalated from drain
2540        }
2541
2542        #[tokio::test]
2543        async fn async_handle_raii_drain_on_drop() {
2544            use crate::executor_handle::ExecutorHandle;
2545
2546            let p1 = Place::<i32>::new("p1");
2547            let p2 = Place::<i32>::new("p2");
2548
2549            let t1 = Transition::builder("t1")
2550                .input(one(&p1))
2551                .output(out_place(&p2))
2552                .action(fork())
2553                .build();
2554
2555            let net = PetriNet::builder("test").transition(t1).build();
2556            let compiled = CompiledNet::compile(&net);
2557            let prog = PrecompiledNet::from_compiled(&compiled);
2558
2559            let marking = Marking::new();
2560            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2561                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2562                .build();
2563
2564            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2565
2566            tokio::spawn(async move {
2567                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2568                let mut handle = ExecutorHandle::new(tx);
2569                handle.inject(
2570                    Arc::from("p1"),
2571                    ErasedToken::from_typed(&Token::at(7, 0)),
2572                );
2573                // handle dropped here — RAII sends Drain automatically
2574            });
2575
2576            let result = executor.run_async(rx).await;
2577            assert_eq!(result.count("p2"), 1);
2578        }
2579    }
2580}