Skip to main content

libpetri_runtime/
precompiled_executor.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::token::ErasedToken;
8
9use libpetri_event::event_store::EventStore;
10use libpetri_event::net_event::NetEvent;
11
12use crate::bitmap;
13use crate::marking::Marking;
14use crate::precompiled_net::{
15    CONSUME_ALL, CONSUME_ATLEAST, CONSUME_N, CONSUME_ONE, PrecompiledNet, RESET,
16};
17
/// Grace period, in milliseconds, added on top of a transition's latest firing
/// time before `enforce_deadlines` treats it as timed out (absorbs timer jitter).
const DEADLINE_TOLERANCE_MS: f64 = 5.0;

/// Number of token slots initially reserved for each place's ring buffer.
/// A ring is grown on demand once it is full.
const INITIAL_RING_CAPACITY: usize = 4;
23
/// High-performance precompiled flat-array Petri net executor.
///
/// Uses ring-buffer token storage, opcode-based consume operations,
/// priority-partitioned ready queues, and two-level summary bitmaps
/// for sparse iteration. Generic over `E: EventStore` for zero-cost
/// noop event recording.
///
/// All per-place vectors are indexed by place id (`pid`) and all
/// per-transition vectors by transition id (`tid`), as assigned by the
/// borrowed [`PrecompiledNet`].
pub struct PrecompiledNetExecutor<'a, E: EventStore> {
    /// The precompiled net being executed (borrowed, never mutated here).
    program: &'a PrecompiledNet<'a>,
    /// Sink for `NetEvent`s; only written to when `E::ENABLED` is true.
    event_store: E,
    /// Names of the environment places supplied at construction.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// Cached `!environment_places.is_empty()`; when true, `should_terminate`
    /// never reports termination.
    has_environment_places: bool,
    /// Flag supplied at construction via the builder.
    #[allow(dead_code)]
    skip_output_validation: bool,

    // ==================== Flat Token Pool ====================
    /// Backing storage for every place's ring buffer; `None` marks a free slot.
    token_pool: Vec<Option<ErasedToken>>,
    /// Index into `token_pool` where each place's ring begins.
    place_offset: Vec<usize>,
    /// Number of tokens currently stored in each place.
    token_counts: Vec<usize>,
    /// Ring-relative index of the oldest token of each place.
    ring_head: Vec<usize>,
    /// Ring-relative index of the next free slot of each place.
    ring_tail: Vec<usize>,
    /// Current slot count of each place's ring.
    ring_capacity: Vec<usize>,

    // ==================== Presence Bitmap ====================
    /// One bit per place; `initialize_marking_bitmap` sets the bit of every
    /// place whose token count is non-zero.
    marking_bitmap: Vec<u64>,

    // ==================== Transition State ====================
    /// One bit per transition: set while the transition is enabled.
    enabled_bitmap: Vec<u64>,
    /// One bit per transition: set while its enablement must be re-evaluated.
    dirty_bitmap: Vec<u64>,
    /// Scratch copy of `dirty_bitmap` taken by `update_dirty_transitions`.
    dirty_scan_buffer: Vec<u64>,
    /// Executor-relative time (ms) at which each transition became enabled, or
    /// had its clock restarted; `f64::NEG_INFINITY` while it is not enabled.
    enabled_at_ms: Vec<f64>,
    /// Number of bits currently set in `enabled_bitmap`.
    enabled_transition_count: usize,

    // ==================== Summary Bitmaps (two-level) ====================
    /// One bit per word of `dirty_bitmap`, used to skip empty words.
    dirty_word_summary: Vec<u64>,
    /// One bit per word of `enabled_bitmap`; cleared once that word becomes zero.
    enabled_word_summary: Vec<u64>,
    /// Number of `u64` words in the per-transition bitmaps.
    transition_words: usize,
    /// Number of `u64` words in the summary bitmaps.
    summary_words: usize,

    // ==================== Priority-Partitioned Ready Queues ====================
    /// One circular buffer of transition ids per distinct priority index.
    ready_queues: Vec<Vec<usize>>,
    /// Read position of each ready queue.
    ready_queue_head: Vec<usize>,
    /// Write position of each ready queue.
    ready_queue_tail: Vec<usize>,
    /// Number of queued transition ids in each ready queue.
    ready_queue_size: Vec<usize>,

    // ==================== Reset-Clock Detection ====================
    /// One bit per place, set when a reset arc emptied that place; cleared at
    /// the end of `update_dirty_transitions`.
    pending_reset_words: Vec<u64>,
    /// True while any bit of `pending_reset_words` may be set.
    has_pending_resets: bool,

    // ==================== Reusable Buffers (avoid per-firing allocation) ====================
    /// Consumed input tokens of the transition being fired, keyed by place
    /// name; cleared at the start of every firing.
    reusable_inputs: HashMap<Arc<str>, Vec<ErasedToken>>,
    /// Per-firing buffer keyed by place name; cleared at the start of every firing.
    reusable_reads: HashMap<Arc<str>, Vec<ErasedToken>>,

    // ==================== Lifecycle ====================
    /// Instant captured when the executor was constructed.
    start_time: Instant,
}
80
/// Builder for PrecompiledNetExecutor.
///
/// Obtained from `PrecompiledNetExecutor::builder`; every setter consumes and
/// returns the builder, and `build` produces the executor.
pub struct PrecompiledExecutorBuilder<'a, E: EventStore> {
    /// Net the executor will run.
    program: &'a PrecompiledNet<'a>,
    /// Tokens loaded into the executor's ring buffers at construction.
    initial_marking: Marking,
    /// Explicit event store; `None` makes `build` fall back to the default store.
    event_store: Option<E>,
    /// Names of environment places handed to the executor.
    environment_places: HashSet<Arc<str>>,
    /// Value of the executor's `skip_output_validation` flag.
    skip_output_validation: bool,
}
89
90impl<'a, E: EventStore> PrecompiledExecutorBuilder<'a, E> {
91    /// Sets the event store.
92    pub fn event_store(mut self, store: E) -> Self {
93        self.event_store = Some(store);
94        self
95    }
96
97    /// Sets the environment places.
98    pub fn environment_places(mut self, places: HashSet<Arc<str>>) -> Self {
99        self.environment_places = places;
100        self
101    }
102
103    /// Skips output validation for trusted transition actions.
104    pub fn skip_output_validation(mut self, skip: bool) -> Self {
105        self.skip_output_validation = skip;
106        self
107    }
108
109    /// Builds the executor.
110    pub fn build(self) -> PrecompiledNetExecutor<'a, E> {
111        PrecompiledNetExecutor::new_inner(
112            self.program,
113            self.initial_marking,
114            self.event_store.unwrap_or_default(),
115            self.environment_places,
116            self.skip_output_validation,
117        )
118    }
119}
120
121impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
122    /// Creates a builder for a PrecompiledNetExecutor.
123    pub fn builder(
124        program: &'a PrecompiledNet<'a>,
125        initial_marking: Marking,
126    ) -> PrecompiledExecutorBuilder<'a, E> {
127        PrecompiledExecutorBuilder {
128            program,
129            initial_marking,
130            event_store: None,
131            environment_places: HashSet::new(),
132            skip_output_validation: false,
133        }
134    }
135
136    /// Creates a new executor with default options.
137    pub fn new(program: &'a PrecompiledNet<'a>, initial_marking: Marking) -> Self {
138        Self::new_inner(
139            program,
140            initial_marking,
141            E::default(),
142            HashSet::new(),
143            false,
144        )
145    }
146
147    fn new_inner(
148        program: &'a PrecompiledNet<'a>,
149        initial_marking: Marking,
150        event_store: E,
151        environment_places: HashSet<Arc<str>>,
152        skip_output_validation: bool,
153    ) -> Self {
154        let pc = program.place_count();
155        let tc = program.transition_count();
156        let wc = program.word_count();
157
158        // Initialize flat token pool
159        let total_slots = pc * INITIAL_RING_CAPACITY;
160        let mut token_pool = vec![None; total_slots];
161        let mut place_offset = vec![0usize; pc];
162        let mut token_counts = vec![0usize; pc];
163        let mut ring_head = vec![0usize; pc];
164        let mut ring_tail = vec![0usize; pc];
165        let mut ring_capacity = vec![INITIAL_RING_CAPACITY; pc];
166
167        for (pid, offset) in place_offset.iter_mut().enumerate() {
168            *offset = pid * INITIAL_RING_CAPACITY;
169        }
170
171        // Load initial tokens into ring buffers
172        for pid in 0..pc {
173            let place = program.place(pid);
174            if let Some(queue) = initial_marking.queue(place.name()) {
175                for token in queue {
176                    // Ring add last
177                    if token_counts[pid] == ring_capacity[pid] {
178                        grow_ring_static(
179                            &mut token_pool,
180                            &mut place_offset,
181                            &mut ring_head,
182                            &mut ring_tail,
183                            &mut ring_capacity,
184                            &token_counts,
185                            pid,
186                        );
187                    }
188                    let tail = ring_tail[pid];
189                    let offset = place_offset[pid];
190                    token_pool[offset + tail] = Some(token.clone());
191                    ring_tail[pid] = (tail + 1) % ring_capacity[pid];
192                    token_counts[pid] += 1;
193                }
194            }
195        }
196
197        // Transition bitmaps
198        let transition_words = bitmap::word_count(tc);
199        let summary_words = bitmap::word_count(transition_words);
200
201        // Priority-partitioned ready queues
202        let prio_count = program.distinct_priority_count;
203        let queue_cap = tc.max(4);
204        let ready_queues = vec![vec![0usize; queue_cap]; prio_count];
205        let ready_queue_head = vec![0usize; prio_count];
206        let ready_queue_tail = vec![0usize; prio_count];
207        let ready_queue_size = vec![0usize; prio_count];
208
209        Self {
210            program,
211            event_store,
212            has_environment_places: !environment_places.is_empty(),
213            environment_places,
214            skip_output_validation,
215            token_pool,
216            place_offset,
217            token_counts,
218            ring_head,
219            ring_tail,
220            ring_capacity,
221            marking_bitmap: vec![0u64; wc],
222            enabled_bitmap: vec![0u64; transition_words],
223            dirty_bitmap: vec![0u64; transition_words],
224            dirty_scan_buffer: vec![0u64; transition_words],
225            enabled_at_ms: vec![f64::NEG_INFINITY; tc],
226            enabled_transition_count: 0,
227            dirty_word_summary: vec![0u64; summary_words],
228            enabled_word_summary: vec![0u64; summary_words],
229            transition_words,
230            summary_words,
231            ready_queues,
232            ready_queue_head,
233            ready_queue_tail,
234            ready_queue_size,
235            pending_reset_words: vec![0u64; wc],
236            has_pending_resets: false,
237            reusable_inputs: HashMap::new(),
238            reusable_reads: HashMap::new(),
239            start_time: Instant::now(),
240        }
241    }
242
243    // ==================== Public API ====================
244
    /// Runs the executor synchronously until completion.
    /// Returns the final marking (materialized from ring buffers).
    ///
    /// Thin public wrapper that delegates to `run_to_completion`.
    pub fn run_sync(&mut self) -> Marking {
        self.run_to_completion()
    }
250
    /// Returns the materialized marking (synced from ring buffers).
    ///
    /// Delegates to `materialize_marking`; the result is an owned `Marking`,
    /// not a view into the executor.
    pub fn marking(&self) -> Marking {
        self.materialize_marking()
    }
255
    /// Returns a shared reference to the event store the executor records
    /// its `NetEvent`s into.
    pub fn event_store(&self) -> &E {
        &self.event_store
    }
260
    /// Returns true if the executor is quiescent, i.e. no transition is
    /// currently counted as enabled.
    pub fn is_quiescent(&self) -> bool {
        self.enabled_transition_count == 0
    }
265
266    // ==================== Ring Buffer Operations ====================
267
268    #[inline]
269    fn ring_remove_first(&mut self, pid: usize) -> ErasedToken {
270        let head = self.ring_head[pid];
271        let offset = self.place_offset[pid];
272        let token = self.token_pool[offset + head].take().unwrap();
273        self.ring_head[pid] = (head + 1) % self.ring_capacity[pid];
274        self.token_counts[pid] -= 1;
275        token
276    }
277
278    #[inline]
279    fn ring_add_last(&mut self, pid: usize, token: ErasedToken) {
280        if self.token_counts[pid] == self.ring_capacity[pid] {
281            self.grow_ring(pid);
282        }
283        let tail = self.ring_tail[pid];
284        let offset = self.place_offset[pid];
285        self.token_pool[offset + tail] = Some(token);
286        self.ring_tail[pid] = (tail + 1) % self.ring_capacity[pid];
287        self.token_counts[pid] += 1;
288    }
289
290    #[inline]
291    fn ring_peek_first(&self, pid: usize) -> Option<&ErasedToken> {
292        if self.token_counts[pid] == 0 {
293            return None;
294        }
295        self.token_pool[self.place_offset[pid] + self.ring_head[pid]].as_ref()
296    }
297
298    fn ring_remove_all(&mut self, pid: usize) -> Vec<ErasedToken> {
299        let count = self.token_counts[pid];
300        if count == 0 {
301            return Vec::new();
302        }
303        let mut result = Vec::with_capacity(count);
304        for _ in 0..count {
305            result.push(self.ring_remove_first(pid));
306        }
307        result
308    }
309
    /// Grows the ring buffer of place `pid` by delegating to
    /// `grow_ring_static` with this executor's token-pool vectors.
    ///
    /// Called by `ring_add_last` when the place's ring is full.
    fn grow_ring(&mut self, pid: usize) {
        grow_ring_static(
            &mut self.token_pool,
            &mut self.place_offset,
            &mut self.ring_head,
            &mut self.ring_tail,
            &mut self.ring_capacity,
            &self.token_counts,
            pid,
        );
    }
321
322    // ==================== Bitmap Helpers ====================
323
324    #[inline]
325    fn set_enabled_bit(&mut self, tid: usize) {
326        let w = tid >> bitmap::WORD_SHIFT;
327        self.enabled_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
328        self.enabled_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
329    }
330
331    #[inline]
332    fn clear_enabled_bit(&mut self, tid: usize) {
333        let w = tid >> bitmap::WORD_SHIFT;
334        self.enabled_bitmap[w] &= !(1u64 << (tid & bitmap::WORD_MASK));
335        if self.enabled_bitmap[w] == 0 {
336            self.enabled_word_summary[w >> bitmap::WORD_SHIFT] &=
337                !(1u64 << (w & bitmap::WORD_MASK));
338        }
339    }
340
341    #[inline]
342    fn is_enabled(&self, tid: usize) -> bool {
343        (self.enabled_bitmap[tid >> bitmap::WORD_SHIFT] & (1u64 << (tid & bitmap::WORD_MASK))) != 0
344    }
345
    /// Sets the bit of place `pid` in the marking bitmap via `bitmap::set_bit`.
    #[inline]
    fn set_marking_bit(&mut self, pid: usize) {
        bitmap::set_bit(&mut self.marking_bitmap, pid);
    }
350
    /// Clears the bit of place `pid` in the marking bitmap via `bitmap::clear_bit`.
    #[inline]
    fn clear_marking_bit(&mut self, pid: usize) {
        bitmap::clear_bit(&mut self.marking_bitmap, pid);
    }
355
356    // ==================== Ready Queue Operations ====================
357
358    fn ready_queue_push(&mut self, tid: usize) {
359        let pi = self.program.transition_to_priority_index[tid];
360        if self.ready_queue_size[pi] == self.ready_queues[pi].len() {
361            let old_cap = self.ready_queues[pi].len();
362            let new_cap = old_cap * 2;
363            let mut new_queue = vec![0usize; new_cap];
364            let head = self.ready_queue_head[pi];
365            for (i, slot) in new_queue.iter_mut().enumerate().take(old_cap) {
366                *slot = self.ready_queues[pi][(head + i) % old_cap];
367            }
368            self.ready_queues[pi] = new_queue;
369            self.ready_queue_head[pi] = 0;
370            self.ready_queue_tail[pi] = old_cap;
371        }
372        let tail = self.ready_queue_tail[pi];
373        self.ready_queues[pi][tail] = tid;
374        self.ready_queue_tail[pi] = (tail + 1) % self.ready_queues[pi].len();
375        self.ready_queue_size[pi] += 1;
376    }
377
378    fn ready_queue_pop(&mut self, pi: usize) -> usize {
379        let head = self.ready_queue_head[pi];
380        let tid = self.ready_queues[pi][head];
381        self.ready_queue_head[pi] = (head + 1) % self.ready_queues[pi].len();
382        self.ready_queue_size[pi] -= 1;
383        tid
384    }
385
386    fn clear_all_ready_queues(&mut self) {
387        for pi in 0..self.program.distinct_priority_count {
388            self.ready_queue_head[pi] = 0;
389            self.ready_queue_tail[pi] = 0;
390            self.ready_queue_size[pi] = 0;
391        }
392    }
393
394    // ==================== Initialize ====================
395
396    fn initialize_marking_bitmap(&mut self) {
397        for pid in 0..self.program.place_count() {
398            if self.token_counts[pid] > 0 {
399                self.set_marking_bit(pid);
400            }
401        }
402    }
403
404    fn mark_all_dirty(&mut self) {
405        let tc = self.program.transition_count();
406        let last_word_bits = tc & bitmap::WORD_MASK;
407        for w in 0..self.transition_words.saturating_sub(1) {
408            self.dirty_bitmap[w] = u64::MAX;
409        }
410        if self.transition_words > 0 {
411            self.dirty_bitmap[self.transition_words - 1] = if last_word_bits == 0 {
412                u64::MAX
413            } else {
414                (1u64 << last_word_bits) - 1
415            };
416        }
417        // Set all summary bits
418        for s in 0..self.summary_words {
419            let first_w = s << bitmap::WORD_SHIFT;
420            let last_w = (first_w + bitmap::WORD_MASK).min(self.transition_words.saturating_sub(1));
421            let count = last_w - first_w + 1;
422            let last_bits = count & bitmap::WORD_MASK;
423            self.dirty_word_summary[s] = if last_bits == 0 {
424                u64::MAX
425            } else {
426                (1u64 << last_bits) - 1
427            };
428        }
429    }
430
431    fn should_terminate(&self) -> bool {
432        if self.has_environment_places {
433            return false;
434        }
435        self.enabled_transition_count == 0
436    }
437
438    // ==================== Dirty Set Processing ====================
439
    /// Re-evaluates every transition flagged dirty and updates its enabled
    /// state, enablement timestamp and (when `E::ENABLED`) the event log.
    ///
    /// Works in two phases: the dirty bitmap is first moved into
    /// `dirty_scan_buffer` and zeroed, then the snapshot is scanned. Pending
    /// reset markers are cleared at the end.
    fn update_dirty_transitions(&mut self) {
        // One timestamp is shared by every transition updated in this pass.
        let now_ms = self.elapsed_ms();

        // Snapshot and clear dirty bitmap using summary
        for s in 0..self.summary_words {
            let mut summary = self.dirty_word_summary[s];
            self.dirty_word_summary[s] = 0;
            while summary != 0 {
                let local_w = summary.trailing_zeros() as usize;
                // Drop the lowest set bit.
                summary &= summary - 1;
                let w = (s << bitmap::WORD_SHIFT) | local_w;
                if w < self.transition_words {
                    self.dirty_scan_buffer[w] = self.dirty_bitmap[w];
                    self.dirty_bitmap[w] = 0;
                }
            }
        }

        // Process dirty transitions
        let tc = self.program.transition_count();
        for w in 0..self.transition_words {
            let mut word = self.dirty_scan_buffer[w];
            if word == 0 {
                continue;
            }
            // Leave the scan buffer clean for the next pass.
            self.dirty_scan_buffer[w] = 0;
            while word != 0 {
                let bit = word.trailing_zeros() as usize;
                let tid = (w << bitmap::WORD_SHIFT) | bit;
                word &= word - 1;

                // Bits are visited in ascending order, so every remaining bit
                // of this word is also past the last transition.
                if tid >= tc {
                    break;
                }

                let was_enabled = self.is_enabled(tid);
                let can_now = self.can_enable(tid);

                if can_now && !was_enabled {
                    // Newly enabled: start its clock.
                    self.set_enabled_bit(tid);
                    self.enabled_transition_count += 1;
                    self.enabled_at_ms[tid] = now_ms;

                    if E::ENABLED {
                        self.event_store.append(NetEvent::TransitionEnabled {
                            transition_name: Arc::clone(self.program.transition(tid).name_arc()),
                            timestamp: now_millis(),
                        });
                    }
                } else if !can_now && was_enabled {
                    // No longer enabled: drop it and invalidate its clock.
                    self.clear_enabled_bit(tid);
                    self.enabled_transition_count -= 1;
                    self.enabled_at_ms[tid] = f64::NEG_INFINITY;
                } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
                    // Still enabled, but one of its input places was reset
                    // since the last pass: restart its clock.
                    self.enabled_at_ms[tid] = now_ms;
                    if E::ENABLED {
                        self.event_store.append(NetEvent::TransitionClockRestarted {
                            transition_name: Arc::clone(self.program.transition(tid).name_arc()),
                            timestamp: now_millis(),
                        });
                    }
                }
            }
        }

        self.clear_pending_resets();
    }
507
508    fn can_enable(&self, tid: usize) -> bool {
509        if !self.program.can_enable_bitmap(tid, &self.marking_bitmap) {
510            return false;
511        }
512
513        // Cardinality check using token_counts directly (no HashMap lookup)
514        if let Some(card_check) = self.program.compiled().cardinality_check(tid) {
515            for i in 0..card_check.place_ids.len() {
516                let pid = card_check.place_ids[i];
517                let required = card_check.required_counts[i];
518                if self.token_counts[pid] < required {
519                    return false;
520                }
521            }
522        }
523
524        // Guard check (needs token access)
525        if self.program.compiled().has_guards(tid) {
526            let t = self.program.transition(tid);
527            for spec in t.input_specs() {
528                if let Some(guard) = spec.guard() {
529                    let required = match spec {
530                        In::One { .. } => 1,
531                        In::Exactly { count, .. } => *count,
532                        In::AtLeast { minimum, .. } => *minimum,
533                        In::All { .. } => 1,
534                    };
535                    let pid = self.program.place_id(spec.place_name()).unwrap();
536                    let count = self.count_matching_in_ring(pid, &**guard);
537                    if count < required {
538                        return false;
539                    }
540                }
541            }
542        }
543
544        true
545    }
546
547    fn count_matching_in_ring(
548        &self,
549        pid: usize,
550        guard: &dyn Fn(&dyn std::any::Any) -> bool,
551    ) -> usize {
552        let count = self.token_counts[pid];
553        if count == 0 {
554            return 0;
555        }
556        let offset = self.place_offset[pid];
557        let head = self.ring_head[pid];
558        let cap = self.ring_capacity[pid];
559        let mut matched = 0;
560        for i in 0..count {
561            let idx = offset + (head + i) % cap;
562            if let Some(token) = &self.token_pool[idx]
563                && guard(token.value.as_ref())
564            {
565                matched += 1;
566            }
567        }
568        matched
569    }
570
571    fn has_input_from_reset_place(&self, tid: usize) -> bool {
572        if !self.has_pending_resets {
573            return false;
574        }
575        let input_mask = &self.program.input_place_mask_words[tid];
576        for (im, pr) in input_mask.iter().zip(self.pending_reset_words.iter()) {
577            if (im & pr) != 0 {
578                return true;
579            }
580        }
581        false
582    }
583
584    fn clear_pending_resets(&mut self) {
585        if self.has_pending_resets {
586            for w in &mut self.pending_reset_words {
587                *w = 0;
588            }
589            self.has_pending_resets = false;
590        }
591    }
592
593    // ==================== Deadline Enforcement ====================
594
    /// Disables every enabled transition that has a deadline and has been
    /// enabled for longer than its latest firing time plus
    /// `DEADLINE_TOLERANCE_MS`.
    ///
    /// `now_ms` is compared against `enabled_at_ms`, so it must be on the same
    /// clock. A timed-out transition is marked dirty, and a
    /// `TransitionTimedOut` event is recorded when `E::ENABLED`.
    fn enforce_deadlines(&mut self, now_ms: f64) {
        for s in 0..self.summary_words {
            // Iterate over copies of the summary and bitmap words; the
            // originals are modified by `clear_enabled_bit` below.
            let mut summary = self.enabled_word_summary[s];
            while summary != 0 {
                let local_w = summary.trailing_zeros() as usize;
                summary &= summary - 1;
                let w = (s << bitmap::WORD_SHIFT) | local_w;
                if w >= self.transition_words {
                    continue;
                }
                let mut word = self.enabled_bitmap[w];
                while word != 0 {
                    let bit = word.trailing_zeros() as usize;
                    let tid = (w << bitmap::WORD_SHIFT) | bit;
                    word &= word - 1;

                    if !self.program.has_deadline[tid] {
                        continue;
                    }

                    let elapsed = now_ms - self.enabled_at_ms[tid];
                    let latest_ms = self.program.latest_ms[tid];

                    if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
                        // Deadline missed: disable and schedule re-evaluation.
                        self.clear_enabled_bit(tid);
                        self.enabled_transition_count -= 1;
                        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
                        self.mark_transition_dirty(tid);

                        if E::ENABLED {
                            self.event_store.append(NetEvent::TransitionTimedOut {
                                transition_name: Arc::clone(
                                    self.program.transition(tid).name_arc(),
                                ),
                                timestamp: now_millis(),
                            });
                        }
                    }
                }
            }
        }
    }
637
638    // ==================== Firing (Sync) ====================
639
    /// Fires enabled transitions in ascending transition-id order, with no
    /// timing or priority checks.
    ///
    /// Each candidate is re-checked with `can_enable` immediately before
    /// firing, because an earlier firing in the same pass may have consumed
    /// its tokens. Candidates that fail the check are disabled instead.
    fn fire_ready_immediate_sync(&mut self) {
        for s in 0..self.summary_words {
            // Work on copies: the enabled bitmap may be modified while this
            // loop runs.
            let mut summary = self.enabled_word_summary[s];
            while summary != 0 {
                let local_w = summary.trailing_zeros() as usize;
                summary &= summary - 1;
                let w = (s << bitmap::WORD_SHIFT) | local_w;
                if w >= self.transition_words {
                    continue;
                }
                let word = self.enabled_bitmap[w];
                let mut remaining = word;
                while remaining != 0 {
                    let bit = remaining.trailing_zeros() as usize;
                    let tid = (w << bitmap::WORD_SHIFT) | bit;
                    remaining &= remaining - 1;

                    if self.can_enable(tid) {
                        self.fire_transition_sync(tid);
                    } else {
                        // Stale enabled bit: the transition lost its tokens.
                        self.clear_enabled_bit(tid);
                        self.enabled_transition_count -= 1;
                        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
                    }
                }
            }
        }
    }
668
    /// Fires enabled transitions whose earliest firing time has elapsed,
    /// draining the per-priority ready queues in priority-index order.
    ///
    /// `now_ms` is compared against `enabled_at_ms`, so it must be on the same
    /// clock. Queued transitions are re-validated before firing; ones that
    /// can no longer fire are disabled.
    fn fire_ready_general_sync(&mut self, now_ms: f64) {
        // Populate ready queues from enabled bitmap using summary
        self.clear_all_ready_queues();

        for s in 0..self.summary_words {
            let mut summary = self.enabled_word_summary[s];
            while summary != 0 {
                let local_w = summary.trailing_zeros() as usize;
                summary &= summary - 1;
                let w = (s << bitmap::WORD_SHIFT) | local_w;
                if w >= self.transition_words {
                    continue;
                }
                let mut word = self.enabled_bitmap[w];
                while word != 0 {
                    let bit = word.trailing_zeros() as usize;
                    let tid = (w << bitmap::WORD_SHIFT) | bit;
                    word &= word - 1;

                    let enabled_ms = self.enabled_at_ms[tid];
                    let elapsed = now_ms - enabled_ms;

                    // Queue only transitions whose earliest firing delay has passed.
                    if self.program.earliest_ms[tid] <= elapsed {
                        self.ready_queue_push(tid);
                    }
                }
            }
        }

        // Fire from highest priority queue first
        for pi in 0..self.program.distinct_priority_count {
            while self.ready_queue_size[pi] > 0 {
                let tid = self.ready_queue_pop(pi);
                // Skip entries whose enabled bit was cleared after queueing.
                if !self.is_enabled(tid) {
                    continue;
                }

                if self.can_enable(tid) {
                    self.fire_transition_sync(tid);
                } else {
                    // Tokens were taken by an earlier firing: disable it.
                    self.clear_enabled_bit(tid);
                    self.enabled_transition_count -= 1;
                    self.enabled_at_ms[tid] = f64::NEG_INFINITY;
                }
            }
        }
    }
716
717    // ==================== Opcode-Based Consume ====================
718
719    fn fire_transition_sync(&mut self, tid: usize) {
720        let has_guards = self.program.compiled().has_guards(tid);
721        let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
722        let action = Arc::clone(self.program.transition(tid).action());
723
724        // Reuse pre-allocated input/read buffers (clear entries, keep HashMap capacity)
725        self.reusable_inputs.clear();
726        self.reusable_reads.clear();
727
728        if has_guards {
729            // Fall back to spec-based consumption for guarded transitions
730            let input_specs: Vec<In> = self.program.transition(tid).input_specs().to_vec();
731            let reset_arcs: Vec<_> = self.program.transition(tid).resets().to_vec();
732
733            for in_spec in &input_specs {
734                let pid = self.program.place_id(in_spec.place_name()).unwrap();
735                let place_name_arc = Arc::clone(&self.program.place_name_arcs[pid]);
736                let to_consume = match in_spec {
737                    In::One { .. } => 1,
738                    In::Exactly { count, .. } => *count,
739                    In::All { guard, .. } | In::AtLeast { guard, .. } => {
740                        if guard.is_some() {
741                            self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
742                        } else {
743                            self.token_counts[pid]
744                        }
745                    }
746                };
747
748                for _ in 0..to_consume {
749                    let token = if let Some(guard) = in_spec.guard() {
750                        self.ring_remove_matching(pid, &**guard)
751                    } else {
752                        Some(self.ring_remove_first(pid))
753                    };
754                    if let Some(token) = token {
755                        if E::ENABLED {
756                            self.event_store.append(NetEvent::TokenRemoved {
757                                place_name: Arc::clone(&place_name_arc),
758                                timestamp: now_millis(),
759                            });
760                        }
761                        self.reusable_inputs
762                            .entry(Arc::clone(&place_name_arc))
763                            .or_default()
764                            .push(token);
765                    }
766                }
767            }
768
769            // Reset arcs
770            for arc in &reset_arcs {
771                let pid = self.program.place_id(arc.place.name()).unwrap();
772                let removed = self.ring_remove_all(pid);
773                if E::ENABLED {
774                    for _ in &removed {
775                        self.event_store.append(NetEvent::TokenRemoved {
776                            place_name: Arc::clone(arc.place.name_arc()),
777                            timestamp: now_millis(),
778                        });
779                    }
780                }
781                self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
782                    1u64 << (pid & bitmap::WORD_MASK);
783                self.has_pending_resets = true;
784            }
785        } else {
786            // Fast path: opcode-based consumption (no guards, no clone)
787            let ops_len = self.program.consume_ops[tid].len();
788            let mut pc = 0;
789            while pc < ops_len {
790                let opcode = self.program.consume_ops[tid][pc];
791                pc += 1;
792                match opcode {
793                    CONSUME_ONE => {
794                        let pid = self.program.consume_ops[tid][pc] as usize;
795                        pc += 1;
796                        let token = self.ring_remove_first(pid);
797                        if E::ENABLED {
798                            self.event_store.append(NetEvent::TokenRemoved {
799                                place_name: Arc::clone(&self.program.place_name_arcs[pid]),
800                                timestamp: now_millis(),
801                            });
802                        }
803                        self.reusable_inputs
804                            .entry(Arc::clone(&self.program.place_name_arcs[pid]))
805                            .or_default()
806                            .push(token);
807                    }
808                    CONSUME_N => {
809                        let pid = self.program.consume_ops[tid][pc] as usize;
810                        pc += 1;
811                        let count = self.program.consume_ops[tid][pc] as usize;
812                        pc += 1;
813                        for _ in 0..count {
814                            let token = self.ring_remove_first(pid);
815                            if E::ENABLED {
816                                self.event_store.append(NetEvent::TokenRemoved {
817                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
818                                    timestamp: now_millis(),
819                                });
820                            }
821                            self.reusable_inputs
822                                .entry(Arc::clone(&self.program.place_name_arcs[pid]))
823                                .or_default()
824                                .push(token);
825                        }
826                    }
827                    CONSUME_ALL | CONSUME_ATLEAST => {
828                        let pid = self.program.consume_ops[tid][pc] as usize;
829                        pc += 1;
830                        if opcode == CONSUME_ATLEAST {
831                            pc += 1;
832                        }
833                        let count = self.token_counts[pid];
834                        for _ in 0..count {
835                            let token = self.ring_remove_first(pid);
836                            if E::ENABLED {
837                                self.event_store.append(NetEvent::TokenRemoved {
838                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
839                                    timestamp: now_millis(),
840                                });
841                            }
842                            self.reusable_inputs
843                                .entry(Arc::clone(&self.program.place_name_arcs[pid]))
844                                .or_default()
845                                .push(token);
846                        }
847                    }
848                    RESET => {
849                        let pid = self.program.consume_ops[tid][pc] as usize;
850                        pc += 1;
851                        let count = self.token_counts[pid];
852                        for _ in 0..count {
853                            let _token = self.ring_remove_first(pid);
854                            if E::ENABLED {
855                                self.event_store.append(NetEvent::TokenRemoved {
856                                    place_name: Arc::clone(&self.program.place_name_arcs[pid]),
857                                    timestamp: now_millis(),
858                                });
859                            }
860                        }
861                        self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
862                            1u64 << (pid & bitmap::WORD_MASK);
863                        self.has_pending_resets = true;
864                    }
865                    _ => unreachable!("Unknown opcode: {opcode}"),
866                }
867            }
868        }
869
870        // Execute read program — iterate by index, no clone of ops Vec
871        let read_ops_len = self.program.read_ops[tid].len();
872        for i in 0..read_ops_len {
873            let rpid = self.program.read_ops[tid][i];
874            let token_clone = self.ring_peek_first(rpid).cloned();
875            if let Some(token) = token_clone {
876                let place_name = Arc::clone(&self.program.place_name_arcs[rpid]);
877                self.reusable_reads
878                    .entry(place_name)
879                    .or_default()
880                    .push(token);
881            }
882        }
883
884        // Update bitmap for consumed/reset places
885        self.update_bitmap_after_consumption(tid);
886
887        if E::ENABLED {
888            self.event_store.append(NetEvent::TransitionStarted {
889                transition_name: Arc::clone(&transition_name),
890                timestamp: now_millis(),
891            });
892        }
893
894        // Create context using precomputed output place names and reusable buffers
895        let inputs = std::mem::take(&mut self.reusable_inputs);
896        let reads = std::mem::take(&mut self.reusable_reads);
897        let mut ctx = TransitionContext::new(
898            Arc::clone(&transition_name),
899            inputs,
900            reads,
901            self.program.output_place_name_sets[tid].clone(),
902            None,
903        );
904
905        let result = action.run_sync(&mut ctx);
906
907        // Reclaim buffers for reuse — keeps HashMap bucket allocations alive
908        let returned_inputs = ctx.take_inputs();
909        let returned_reads = ctx.take_reads();
910
911        match result {
912            Ok(()) => {
913                let outputs = ctx.take_outputs();
914                self.process_outputs(tid, &transition_name, outputs);
915
916                if E::ENABLED {
917                    self.event_store.append(NetEvent::TransitionCompleted {
918                        transition_name: Arc::clone(&transition_name),
919                        timestamp: now_millis(),
920                    });
921                }
922            }
923            Err(err) => {
924                if E::ENABLED {
925                    self.event_store.append(NetEvent::TransitionFailed {
926                        transition_name: Arc::clone(&transition_name),
927                        error: err.message,
928                        timestamp: now_millis(),
929                    });
930                }
931            }
932        }
933
934        // Return reclaimed buffers for next firing
935        self.reusable_inputs = returned_inputs;
936        self.reusable_reads = returned_reads;
937
938        // Clear enabled status
939        self.clear_enabled_bit(tid);
940        self.enabled_transition_count -= 1;
941        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
942
943        // Mark this transition dirty for re-evaluation
944        self.mark_transition_dirty(tid);
945    }
946
947    /// Removes the first token matching `guard` from the ring buffer at `pid`.
948    /// Returns at most one token per call (In::One guard semantics).
949    fn ring_remove_matching(
950        &mut self,
951        pid: usize,
952        guard: &dyn Fn(&dyn std::any::Any) -> bool,
953    ) -> Option<ErasedToken> {
954        let count = self.token_counts[pid];
955        if count == 0 {
956            return None;
957        }
958        let offset = self.place_offset[pid];
959        let head = self.ring_head[pid];
960        let cap = self.ring_capacity[pid];
961
962        // Find first matching token
963        for i in 0..count {
964            let idx = offset + (head + i) % cap;
965            if let Some(token) = &self.token_pool[idx]
966                && guard(token.value.as_ref())
967            {
968                let token = self.token_pool[idx].take().unwrap();
969                // Compact the ring by shifting remaining elements
970                for j in i..count - 1 {
971                    let from = offset + (head + j + 1) % cap;
972                    let to = offset + (head + j) % cap;
973                    self.token_pool[to] = self.token_pool[from].take();
974                }
975                self.token_counts[pid] -= 1;
976                self.ring_tail[pid] = if self.ring_tail[pid] == 0 {
977                    cap - 1
978                } else {
979                    self.ring_tail[pid] - 1
980                };
981                return Some(token);
982            }
983        }
984        None
985    }
986
987    fn process_outputs(
988        &mut self,
989        _tid: usize,
990        _transition_name: &Arc<str>,
991        outputs: Vec<OutputEntry>,
992    ) {
993        for entry in outputs {
994            if let Some(pid) = self.program.place_id(&entry.place_name) {
995                self.ring_add_last(pid, entry.token);
996                self.set_marking_bit(pid);
997                self.mark_dirty(pid);
998            }
999
1000            if E::ENABLED {
1001                self.event_store.append(NetEvent::TokenAdded {
1002                    place_name: Arc::clone(&entry.place_name),
1003                    timestamp: now_millis(),
1004                });
1005            }
1006        }
1007    }
1008
1009    fn update_bitmap_after_consumption(&mut self, tid: usize) {
1010        let n = self.program.compiled().consumption_place_ids(tid).len();
1011        for i in 0..n {
1012            let pid = self.program.compiled().consumption_place_ids(tid)[i];
1013            if self.token_counts[pid] == 0 {
1014                self.clear_marking_bit(pid);
1015            }
1016            self.mark_dirty(pid);
1017        }
1018    }
1019
1020    // ==================== Dirty Set Helpers ====================
1021
1022    fn has_dirty_bits(&self) -> bool {
1023        for &s in &self.dirty_word_summary {
1024            if s != 0 {
1025                return true;
1026            }
1027        }
1028        false
1029    }
1030
1031    fn mark_dirty(&mut self, pid: usize) {
1032        let n = self.program.compiled().affected_transitions(pid).len();
1033        for i in 0..n {
1034            let tid = self.program.compiled().affected_transitions(pid)[i];
1035            self.mark_transition_dirty(tid);
1036        }
1037    }
1038
1039    fn mark_transition_dirty(&mut self, tid: usize) {
1040        let w = tid >> bitmap::WORD_SHIFT;
1041        self.dirty_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
1042        self.dirty_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
1043    }
1044
1045    fn elapsed_ms(&self) -> f64 {
1046        self.start_time.elapsed().as_secs_f64() * 1000.0
1047    }
1048
1049    // ==================== Marking Sync ====================
1050
1051    fn materialize_marking(&self) -> Marking {
1052        let mut marking = Marking::new();
1053        for pid in 0..self.program.place_count() {
1054            let count = self.token_counts[pid];
1055            if count == 0 {
1056                continue;
1057            }
1058            let place_name = self.program.place(pid).name_arc();
1059            let offset = self.place_offset[pid];
1060            let head = self.ring_head[pid];
1061            let cap = self.ring_capacity[pid];
1062            for i in 0..count {
1063                let idx = offset + (head + i) % cap;
1064                if let Some(token) = &self.token_pool[idx] {
1065                    marking.add_erased(place_name, token.clone());
1066                }
1067            }
1068        }
1069        marking
1070    }
1071
1072    /// Runs the executor synchronously and returns the final marking.
1073    fn run_to_completion(&mut self) -> Marking {
1074        self.initialize_marking_bitmap();
1075        self.mark_all_dirty();
1076
1077        if E::ENABLED {
1078            let now = now_millis();
1079            self.event_store.append(NetEvent::ExecutionStarted {
1080                net_name: Arc::from(self.program.net().name()),
1081                timestamp: now,
1082            });
1083        }
1084
1085        loop {
1086            self.update_dirty_transitions();
1087
1088            let cycle_now = self.elapsed_ms();
1089
1090            if self.program.any_deadlines {
1091                self.enforce_deadlines(cycle_now);
1092            }
1093
1094            if self.should_terminate() {
1095                break;
1096            }
1097
1098            if self.program.all_immediate && self.program.all_same_priority {
1099                self.fire_ready_immediate_sync();
1100            } else {
1101                self.fire_ready_general_sync(cycle_now);
1102            }
1103
1104            if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
1105                break;
1106            }
1107        }
1108
1109        if E::ENABLED {
1110            let now = now_millis();
1111            self.event_store.append(NetEvent::ExecutionCompleted {
1112                net_name: Arc::from(self.program.net().name()),
1113                timestamp: now,
1114            });
1115        }
1116
1117        self.materialize_marking()
1118    }
1119}
1120
1121// Async path
1122#[cfg(feature = "tokio")]
1123use crate::environment::ExecutorSignal;
1124
/// Outcome of one spawned transition action, sent back to the async executor
/// loop over its completion channel.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished.
    transition_name: Arc<str>,
    /// Output entries taken from the action's context on success, or the
    /// error message on failure.
    result: Result<Vec<OutputEntry>, String>,
}
1130
1131#[cfg(feature = "tokio")]
1132impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
1133    /// Runs the executor asynchronously with tokio.
1134    ///
1135    /// External events are injected via [`ExecutorSignal::Event`]. Lifecycle
1136    /// signals [`ExecutorSignal::Drain`] and [`ExecutorSignal::Close`] control
1137    /// graceful and immediate shutdown respectively. Use
1138    /// [`ExecutorHandle`](crate::executor_handle::ExecutorHandle) for RAII-managed
1139    /// lifecycle with automatic drain on drop.
1140    pub async fn run_async(
1141        &mut self,
1142        mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
1143    ) -> Marking {
1144        let (completion_tx, mut completion_rx) =
1145            tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
1146
1147        self.initialize_marking_bitmap();
1148        self.mark_all_dirty();
1149
1150        let mut in_flight_count: usize = 0;
1151        let mut signal_channel_open = true;
1152        let mut draining = false;
1153        let mut closed = false;
1154
1155        if E::ENABLED {
1156            let now = now_millis();
1157            self.event_store.append(NetEvent::ExecutionStarted {
1158                net_name: Arc::from(self.program.net().name()),
1159                timestamp: now,
1160            });
1161        }
1162
1163        loop {
1164            // Phase 1: Process completed async actions
1165            while let Ok(completion) = completion_rx.try_recv() {
1166                in_flight_count -= 1;
1167                match completion.result {
1168                    Ok(outputs) => {
1169                        self.process_outputs(0, &completion.transition_name, outputs);
1170                        if E::ENABLED {
1171                            self.event_store.append(NetEvent::TransitionCompleted {
1172                                transition_name: Arc::clone(&completion.transition_name),
1173                                timestamp: now_millis(),
1174                            });
1175                        }
1176                    }
1177                    Err(err) => {
1178                        if E::ENABLED {
1179                            self.event_store.append(NetEvent::TransitionFailed {
1180                                transition_name: Arc::clone(&completion.transition_name),
1181                                error: err,
1182                                timestamp: now_millis(),
1183                            });
1184                        }
1185                    }
1186                }
1187            }
1188
1189            // Phase 2: Process signals (external events + lifecycle)
1190            while let Ok(signal) = signal_rx.try_recv() {
1191                match signal {
1192                    ExecutorSignal::Event(event) if !draining => {
1193                        if let Some(pid) = self.program.place_id(&event.place_name) {
1194                            self.ring_add_last(pid, event.token);
1195                            self.set_marking_bit(pid);
1196                            self.mark_dirty(pid);
1197                        }
1198                        if E::ENABLED {
1199                            self.event_store.append(NetEvent::TokenAdded {
1200                                place_name: Arc::clone(&event.place_name),
1201                                timestamp: now_millis(),
1202                            });
1203                        }
1204                    }
1205                    ExecutorSignal::Event(_) => {
1206                        // Draining: discard events arriving after drain signal
1207                    }
1208                    ExecutorSignal::Drain => {
1209                        draining = true;
1210                    }
1211                    ExecutorSignal::Close => {
1212                        closed = true;
1213                        draining = true;
1214                        // Discard remaining queued signals per ENV-013.
1215                        // Note: events already processed earlier in this try_recv batch
1216                        // are kept (single-channel design); Java/TS discard all queued
1217                        // events via an atomic flag checked at processExternalEvents() entry.
1218                        while signal_rx.try_recv().is_ok() {}
1219                    }
1220                }
1221            }
1222
1223            // Phase 3: Update dirty transitions
1224            self.update_dirty_transitions();
1225
1226            // Phase 4: Enforce deadlines
1227            let cycle_now = self.elapsed_ms();
1228            if self.program.any_deadlines {
1229                self.enforce_deadlines(cycle_now);
1230            }
1231
1232            // Termination check — O(1) flag checks
1233            if closed && in_flight_count == 0 {
1234                break; // ENV-013: immediate close, in-flight completed
1235            }
1236            if draining
1237                && self.enabled_transition_count == 0
1238                && in_flight_count == 0
1239            {
1240                break; // ENV-011: graceful drain, quiescent
1241            }
1242            if self.enabled_transition_count == 0
1243                && in_flight_count == 0
1244                && (!self.has_environment_places || !signal_channel_open)
1245            {
1246                break; // Standard termination
1247            }
1248
1249            // Phase 5: Fire ready transitions
1250            let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
1251
1252            if fired || self.has_dirty_bits() {
1253                tokio::task::yield_now().await;
1254                continue;
1255            }
1256
1257            // Phase 6: Await work
1258            if in_flight_count == 0 && !self.has_environment_places {
1259                break;
1260            }
1261            if in_flight_count == 0 && (draining || !signal_channel_open) {
1262                break;
1263            }
1264
1265            let timer_ms = self.millis_until_next_timed_transition();
1266
1267            tokio::select! {
1268                Some(completion) = completion_rx.recv() => {
1269                    in_flight_count -= 1;
1270                    match completion.result {
1271                        Ok(outputs) => {
1272                            self.process_outputs(0, &completion.transition_name, outputs);
1273                            if E::ENABLED {
1274                                self.event_store.append(NetEvent::TransitionCompleted {
1275                                    transition_name: Arc::clone(&completion.transition_name),
1276                                    timestamp: now_millis(),
1277                                });
1278                            }
1279                        }
1280                        Err(err) => {
1281                            if E::ENABLED {
1282                                self.event_store.append(NetEvent::TransitionFailed {
1283                                    transition_name: Arc::clone(&completion.transition_name),
1284                                    error: err,
1285                                    timestamp: now_millis(),
1286                                });
1287                            }
1288                        }
1289                    }
1290                }
1291                result = signal_rx.recv(), if signal_channel_open && !closed => {
1292                    match result {
1293                        Some(ExecutorSignal::Event(event)) if !draining => {
1294                            if let Some(pid) = self.program.place_id(&event.place_name) {
1295                                self.ring_add_last(pid, event.token);
1296                                self.set_marking_bit(pid);
1297                                self.mark_dirty(pid);
1298                            }
1299                            if E::ENABLED {
1300                                self.event_store.append(NetEvent::TokenAdded {
1301                                    place_name: Arc::clone(&event.place_name),
1302                                    timestamp: now_millis(),
1303                                });
1304                            }
1305                        }
1306                        Some(ExecutorSignal::Event(_)) => {
1307                            // Draining: discard events
1308                        }
1309                        Some(ExecutorSignal::Drain) => {
1310                            draining = true;
1311                        }
1312                        Some(ExecutorSignal::Close) => {
1313                            closed = true;
1314                            draining = true;
1315                            while signal_rx.try_recv().is_ok() {}
1316                        }
1317                        None => {
1318                            signal_channel_open = false;
1319                        }
1320                    }
1321                }
1322                _ = tokio::time::sleep(std::time::Duration::from_millis(
1323                    if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
1324                )) => {}
1325            }
1326        }
1327
1328        if E::ENABLED {
1329            let now = now_millis();
1330            self.event_store.append(NetEvent::ExecutionCompleted {
1331                net_name: Arc::from(self.program.net().name()),
1332                timestamp: now,
1333            });
1334        }
1335
1336        self.materialize_marking()
1337    }
1338
1339    fn fire_ready_async(
1340        &mut self,
1341        now_ms: f64,
1342        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1343        in_flight_count: &mut usize,
1344    ) -> bool {
1345        let mut ready: Vec<(usize, i32, f64)> = Vec::new();
1346
1347        for s in 0..self.summary_words {
1348            let mut summary = self.enabled_word_summary[s];
1349            while summary != 0 {
1350                let local_w = summary.trailing_zeros() as usize;
1351                summary &= summary - 1;
1352                let w = (s << bitmap::WORD_SHIFT) | local_w;
1353                if w >= self.transition_words {
1354                    continue;
1355                }
1356                let mut word = self.enabled_bitmap[w];
1357                while word != 0 {
1358                    let bit = word.trailing_zeros() as usize;
1359                    let tid = (w << bitmap::WORD_SHIFT) | bit;
1360                    word &= word - 1;
1361
1362                    let enabled_ms = self.enabled_at_ms[tid];
1363                    let elapsed = now_ms - enabled_ms;
1364                    if self.program.earliest_ms[tid] <= elapsed {
1365                        ready.push((tid, self.program.priorities[tid], enabled_ms));
1366                    }
1367                }
1368            }
1369        }
1370
1371        if ready.is_empty() {
1372            return false;
1373        }
1374
1375        ready.sort_by(|a, b| {
1376            b.1.cmp(&a.1)
1377                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
1378        });
1379
1380        let mut fired_any = false;
1381        for (tid, _, _) in ready {
1382            if self.is_enabled(tid) && self.can_enable(tid) {
1383                self.fire_transition_async(tid, completion_tx, in_flight_count);
1384                fired_any = true;
1385            } else if self.is_enabled(tid) {
1386                self.clear_enabled_bit(tid);
1387                self.enabled_transition_count -= 1;
1388                self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1389            }
1390        }
1391        fired_any
1392    }
1393
1394    fn fire_transition_async(
1395        &mut self,
1396        tid: usize,
1397        completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1398        in_flight_count: &mut usize,
1399    ) {
1400        let t = self.program.transition(tid);
1401        let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
1402        let input_specs: Vec<In> = t.input_specs().to_vec();
1403        let read_arcs: Vec<_> = t.reads().to_vec();
1404        let reset_arcs: Vec<_> = t.resets().to_vec();
1405        let output_place_names = self.program.output_place_name_sets[tid].clone();
1406        let action = Arc::clone(t.action());
1407        let is_sync = action.is_sync();
1408
1409        // Consume tokens (using spec-based for simplicity in async path)
1410        let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1411        for in_spec in &input_specs {
1412            let pid = self.program.place_id(in_spec.place_name()).unwrap();
1413            let to_consume = match in_spec {
1414                In::One { .. } => 1,
1415                In::Exactly { count, .. } => *count,
1416                In::All { guard, .. } | In::AtLeast { guard, .. } => {
1417                    if guard.is_some() {
1418                        self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
1419                    } else {
1420                        self.token_counts[pid]
1421                    }
1422                }
1423            };
1424
1425            let place_name_arc = Arc::clone(in_spec.place().name_arc());
1426            for _ in 0..to_consume {
1427                let token = if let Some(guard) = in_spec.guard() {
1428                    self.ring_remove_matching(pid, &**guard)
1429                } else {
1430                    Some(self.ring_remove_first(pid))
1431                };
1432                if let Some(token) = token {
1433                    if E::ENABLED {
1434                        self.event_store.append(NetEvent::TokenRemoved {
1435                            place_name: Arc::clone(&place_name_arc),
1436                            timestamp: now_millis(),
1437                        });
1438                    }
1439                    inputs
1440                        .entry(Arc::clone(&place_name_arc))
1441                        .or_default()
1442                        .push(token);
1443                }
1444            }
1445        }
1446
1447        // Read arcs
1448        let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1449        for arc in &read_arcs {
1450            let rpid = self.program.place_id(arc.place.name()).unwrap();
1451            if let Some(token) = self.ring_peek_first(rpid) {
1452                read_tokens
1453                    .entry(Arc::clone(arc.place.name_arc()))
1454                    .or_default()
1455                    .push(token.clone());
1456            }
1457        }
1458
1459        // Reset arcs
1460        for arc in &reset_arcs {
1461            let pid = self.program.place_id(arc.place.name()).unwrap();
1462            let removed = self.ring_remove_all(pid);
1463            if E::ENABLED {
1464                for _ in &removed {
1465                    self.event_store.append(NetEvent::TokenRemoved {
1466                        place_name: Arc::clone(arc.place.name_arc()),
1467                        timestamp: now_millis(),
1468                    });
1469                }
1470            }
1471            self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
1472                1u64 << (pid & bitmap::WORD_MASK);
1473            self.has_pending_resets = true;
1474        }
1475
1476        self.update_bitmap_after_consumption(tid);
1477
1478        if E::ENABLED {
1479            self.event_store.append(NetEvent::TransitionStarted {
1480                transition_name: Arc::clone(&transition_name),
1481                timestamp: now_millis(),
1482            });
1483        }
1484
1485        // Clear enabled status
1486        self.clear_enabled_bit(tid);
1487        self.enabled_transition_count -= 1;
1488        self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1489        self.mark_transition_dirty(tid);
1490
1491        if is_sync {
1492            let mut ctx = TransitionContext::new(
1493                Arc::clone(&transition_name),
1494                inputs,
1495                read_tokens,
1496                output_place_names,
1497                None,
1498            );
1499            let result = action.run_sync(&mut ctx);
1500            match result {
1501                Ok(()) => {
1502                    let outputs = ctx.take_outputs();
1503                    self.process_outputs(tid, &transition_name, outputs);
1504                    if E::ENABLED {
1505                        self.event_store.append(NetEvent::TransitionCompleted {
1506                            transition_name: Arc::clone(&transition_name),
1507                            timestamp: now_millis(),
1508                        });
1509                    }
1510                }
1511                Err(err) => {
1512                    if E::ENABLED {
1513                        self.event_store.append(NetEvent::TransitionFailed {
1514                            transition_name: Arc::clone(&transition_name),
1515                            error: err.message,
1516                            timestamp: now_millis(),
1517                        });
1518                    }
1519                }
1520            }
1521        } else {
1522            *in_flight_count += 1;
1523            let tx = completion_tx.clone();
1524            let name = Arc::clone(&transition_name);
1525            let ctx = TransitionContext::new(
1526                Arc::clone(&transition_name),
1527                inputs,
1528                read_tokens,
1529                output_place_names,
1530                None,
1531            );
1532            tokio::spawn(async move {
1533                let result = action.run_async(ctx).await;
1534                let completion = match result {
1535                    Ok(mut completed_ctx) => ActionCompletion {
1536                        transition_name: Arc::clone(&name),
1537                        result: Ok(completed_ctx.take_outputs()),
1538                    },
1539                    Err(err) => ActionCompletion {
1540                        transition_name: Arc::clone(&name),
1541                        result: Err(err.message),
1542                    },
1543                };
1544                let _ = tx.send(completion);
1545            });
1546        }
1547    }
1548
1549    fn millis_until_next_timed_transition(&self) -> f64 {
1550        let mut min_wait = f64::INFINITY;
1551        let now_ms = self.elapsed_ms();
1552
1553        for s in 0..self.summary_words {
1554            let mut summary = self.enabled_word_summary[s];
1555            while summary != 0 {
1556                let local_w = summary.trailing_zeros() as usize;
1557                summary &= summary - 1;
1558                let w = (s << bitmap::WORD_SHIFT) | local_w;
1559                if w >= self.transition_words {
1560                    continue;
1561                }
1562                let mut word = self.enabled_bitmap[w];
1563                while word != 0 {
1564                    let bit = word.trailing_zeros() as usize;
1565                    let tid = (w << bitmap::WORD_SHIFT) | bit;
1566                    word &= word - 1;
1567
1568                    let elapsed = now_ms - self.enabled_at_ms[tid];
1569                    let remaining_earliest = self.program.earliest_ms[tid] - elapsed;
1570                    if remaining_earliest <= 0.0 {
1571                        return 0.0;
1572                    }
1573                    min_wait = min_wait.min(remaining_earliest);
1574
1575                    if self.program.has_deadline[tid] {
1576                        let remaining_deadline = self.program.latest_ms[tid] - elapsed;
1577                        if remaining_deadline <= 0.0 {
1578                            return 0.0;
1579                        }
1580                        min_wait = min_wait.min(remaining_deadline);
1581                    }
1582                }
1583            }
1584        }
1585
1586        min_wait
1587    }
1588}
1589
1590// ==================== Static Helpers ====================
1591
1592fn grow_ring_static(
1593    token_pool: &mut Vec<Option<ErasedToken>>,
1594    place_offset: &mut [usize],
1595    ring_head: &mut [usize],
1596    ring_tail: &mut [usize],
1597    ring_capacity: &mut [usize],
1598    token_counts: &[usize],
1599    pid: usize,
1600) {
1601    let old_cap = ring_capacity[pid];
1602    let new_cap = old_cap * 2;
1603    let old_offset = place_offset[pid];
1604    let head = ring_head[pid];
1605    let count = token_counts[pid];
1606
1607    // Relocate to end of pool
1608    let new_offset = token_pool.len();
1609    token_pool.resize_with(new_offset + new_cap, || None);
1610
1611    // Copy ring contents linearized
1612    for i in 0..count {
1613        let old_idx = old_offset + (head + i) % old_cap;
1614        token_pool[new_offset + i] = token_pool[old_idx].take();
1615    }
1616
1617    place_offset[pid] = new_offset;
1618    ring_head[pid] = 0;
1619    ring_tail[pid] = count;
1620    ring_capacity[pid] = new_cap;
1621}
1622
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // A zero duration is what the default would have produced.
        Err(_) => 0,
    }
}
1629
1630#[cfg(test)]
1631mod tests {
1632    use super::*;
1633    use crate::compiled_net::CompiledNet;
1634    use libpetri_core::action::{fork, passthrough, sync_action};
1635    use libpetri_core::input::one;
1636    use libpetri_core::output::out_place;
1637    use libpetri_core::petri_net::PetriNet;
1638    use libpetri_core::place::Place;
1639    use libpetri_core::token::Token;
1640    use libpetri_core::transition::Transition;
1641    use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1642
1643    fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1644        let p1 = Place::<i32>::new("p1");
1645        let p2 = Place::<i32>::new("p2");
1646        let p3 = Place::<i32>::new("p3");
1647
1648        let t1 = Transition::builder("t1")
1649            .input(one(&p1))
1650            .output(out_place(&p2))
1651            .action(passthrough())
1652            .build();
1653        let t2 = Transition::builder("t2")
1654            .input(one(&p2))
1655            .output(out_place(&p3))
1656            .action(passthrough())
1657            .build();
1658
1659        let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1660        (net, p1, p2, p3)
1661    }
1662
1663    #[test]
1664    fn sync_passthrough_chain() {
1665        let (net, p1, _p2, _p3) = simple_chain();
1666        let compiled = CompiledNet::compile(&net);
1667        let prog = PrecompiledNet::from_compiled(&compiled);
1668
1669        let mut marking = Marking::new();
1670        marking.add(&p1, Token::at(42, 0));
1671
1672        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1673        let result = executor.run_to_completion();
1674
1675        assert_eq!(result.count("p1"), 0);
1676    }
1677
1678    #[test]
1679    fn sync_fork_chain() {
1680        let p1 = Place::<i32>::new("p1");
1681        let p2 = Place::<i32>::new("p2");
1682        let p3 = Place::<i32>::new("p3");
1683
1684        let t1 = Transition::builder("t1")
1685            .input(one(&p1))
1686            .output(libpetri_core::output::and(vec![
1687                out_place(&p2),
1688                out_place(&p3),
1689            ]))
1690            .action(fork())
1691            .build();
1692
1693        let net = PetriNet::builder("fork").transition(t1).build();
1694        let compiled = CompiledNet::compile(&net);
1695        let prog = PrecompiledNet::from_compiled(&compiled);
1696
1697        let mut marking = Marking::new();
1698        marking.add(&p1, Token::at(42, 0));
1699
1700        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1701        let result = executor.run_to_completion();
1702
1703        assert_eq!(result.count("p1"), 0);
1704        assert_eq!(result.count("p2"), 1);
1705        assert_eq!(result.count("p3"), 1);
1706    }
1707
1708    #[test]
1709    fn sync_linear_chain_5() {
1710        let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1711        let transitions: Vec<Transition> = (0..5)
1712            .map(|i| {
1713                Transition::builder(format!("t{i}"))
1714                    .input(one(&places[i]))
1715                    .output(out_place(&places[i + 1]))
1716                    .action(fork())
1717                    .build()
1718            })
1719            .collect();
1720
1721        let net = PetriNet::builder("chain5").transitions(transitions).build();
1722        let compiled = CompiledNet::compile(&net);
1723        let prog = PrecompiledNet::from_compiled(&compiled);
1724
1725        let mut marking = Marking::new();
1726        marking.add(&places[0], Token::at(1, 0));
1727
1728        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1729        let result = executor.run_to_completion();
1730
1731        assert_eq!(result.count("p0"), 0);
1732        assert_eq!(result.count("p5"), 1);
1733    }
1734
1735    #[test]
1736    fn sync_no_initial_tokens() {
1737        let (net, _, _, _) = simple_chain();
1738        let compiled = CompiledNet::compile(&net);
1739        let prog = PrecompiledNet::from_compiled(&compiled);
1740        let marking = Marking::new();
1741        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1742        let result = executor.run_to_completion();
1743        assert_eq!(result.count("p1"), 0);
1744        assert_eq!(result.count("p2"), 0);
1745        assert_eq!(result.count("p3"), 0);
1746    }
1747
1748    #[test]
1749    fn sync_priority_ordering() {
1750        let p = Place::<()>::new("p");
1751        let out_a = Place::<()>::new("a");
1752        let out_b = Place::<()>::new("b");
1753
1754        let t_high = Transition::builder("t_high")
1755            .input(one(&p))
1756            .output(out_place(&out_a))
1757            .action(passthrough())
1758            .priority(10)
1759            .build();
1760        let t_low = Transition::builder("t_low")
1761            .input(one(&p))
1762            .output(out_place(&out_b))
1763            .action(passthrough())
1764            .priority(1)
1765            .build();
1766
1767        let net = PetriNet::builder("priority")
1768            .transitions([t_high, t_low])
1769            .build();
1770        let compiled = CompiledNet::compile(&net);
1771        let prog = PrecompiledNet::from_compiled(&compiled);
1772
1773        let mut marking = Marking::new();
1774        marking.add(&p, Token::at((), 0));
1775
1776        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1777        let result = executor.run_to_completion();
1778
1779        assert_eq!(result.count("p"), 0);
1780    }
1781
1782    #[test]
1783    fn sync_inhibitor_blocks() {
1784        let p1 = Place::<()>::new("p1");
1785        let p2 = Place::<()>::new("p2");
1786        let p_inh = Place::<()>::new("inh");
1787
1788        let t = Transition::builder("t1")
1789            .input(one(&p1))
1790            .output(out_place(&p2))
1791            .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1792            .action(passthrough())
1793            .build();
1794
1795        let net = PetriNet::builder("inhibitor").transition(t).build();
1796        let compiled = CompiledNet::compile(&net);
1797        let prog = PrecompiledNet::from_compiled(&compiled);
1798
1799        let mut marking = Marking::new();
1800        marking.add(&p1, Token::at((), 0));
1801        marking.add(&p_inh, Token::at((), 0));
1802
1803        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1804        let result = executor.run_to_completion();
1805
1806        assert_eq!(result.count("p1"), 1);
1807    }
1808
1809    #[test]
1810    fn read_arc_does_not_consume() {
1811        let p_in = Place::<i32>::new("in");
1812        let p_ctx = Place::<i32>::new("ctx");
1813        let p_out = Place::<i32>::new("out");
1814
1815        let t = Transition::builder("t1")
1816            .input(one(&p_in))
1817            .read(libpetri_core::arc::read(&p_ctx))
1818            .output(out_place(&p_out))
1819            .action(sync_action(|ctx| {
1820                let v = ctx.input::<i32>("in")?;
1821                let r = ctx.read::<i32>("ctx")?;
1822                ctx.output("out", *v + *r)?;
1823                Ok(())
1824            }))
1825            .build();
1826        let net = PetriNet::builder("test").transition(t).build();
1827        let compiled = CompiledNet::compile(&net);
1828        let prog = PrecompiledNet::from_compiled(&compiled);
1829
1830        let mut marking = Marking::new();
1831        marking.add(&p_in, Token::at(10, 0));
1832        marking.add(&p_ctx, Token::at(5, 0));
1833
1834        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1835        let result = executor.run_to_completion();
1836
1837        assert_eq!(result.count("in"), 0);
1838        assert_eq!(result.count("ctx"), 1);
1839        assert_eq!(result.count("out"), 1);
1840    }
1841
1842    #[test]
1843    fn reset_arc_removes_all_tokens() {
1844        let p_in = Place::<()>::new("in");
1845        let p_reset = Place::<i32>::new("reset");
1846        let p_out = Place::<()>::new("out");
1847
1848        let t = Transition::builder("t1")
1849            .input(one(&p_in))
1850            .reset(libpetri_core::arc::reset(&p_reset))
1851            .output(out_place(&p_out))
1852            .action(fork())
1853            .build();
1854        let net = PetriNet::builder("test").transition(t).build();
1855        let compiled = CompiledNet::compile(&net);
1856        let prog = PrecompiledNet::from_compiled(&compiled);
1857
1858        let mut marking = Marking::new();
1859        marking.add(&p_in, Token::at((), 0));
1860        marking.add(&p_reset, Token::at(1, 0));
1861        marking.add(&p_reset, Token::at(2, 0));
1862        marking.add(&p_reset, Token::at(3, 0));
1863
1864        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1865        let result = executor.run_to_completion();
1866
1867        assert_eq!(result.count("reset"), 0);
1868        assert_eq!(result.count("out"), 1);
1869    }
1870
1871    #[test]
1872    fn exactly_cardinality_consumes_n() {
1873        let p = Place::<i32>::new("p");
1874        let p_out = Place::<i32>::new("out");
1875
1876        let t = Transition::builder("t1")
1877            .input(libpetri_core::input::exactly(3, &p))
1878            .output(out_place(&p_out))
1879            .action(sync_action(|ctx| {
1880                let vals = ctx.inputs::<i32>("p")?;
1881                for v in vals {
1882                    ctx.output("out", *v)?;
1883                }
1884                Ok(())
1885            }))
1886            .build();
1887        let net = PetriNet::builder("test").transition(t).build();
1888        let compiled = CompiledNet::compile(&net);
1889        let prog = PrecompiledNet::from_compiled(&compiled);
1890
1891        let mut marking = Marking::new();
1892        for i in 0..5 {
1893            marking.add(&p, Token::at(i, 0));
1894        }
1895
1896        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1897        let result = executor.run_to_completion();
1898
1899        assert_eq!(result.count("p"), 2);
1900        assert_eq!(result.count("out"), 3);
1901    }
1902
1903    #[test]
1904    fn all_cardinality_consumes_everything() {
1905        let p = Place::<i32>::new("p");
1906        let p_out = Place::<()>::new("out");
1907
1908        let t = Transition::builder("t1")
1909            .input(libpetri_core::input::all(&p))
1910            .output(out_place(&p_out))
1911            .action(sync_action(|ctx| {
1912                let vals = ctx.inputs::<i32>("p")?;
1913                ctx.output("out", vals.len() as i32)?;
1914                Ok(())
1915            }))
1916            .build();
1917        let net = PetriNet::builder("test").transition(t).build();
1918        let compiled = CompiledNet::compile(&net);
1919        let prog = PrecompiledNet::from_compiled(&compiled);
1920
1921        let mut marking = Marking::new();
1922        for i in 0..5 {
1923            marking.add(&p, Token::at(i, 0));
1924        }
1925
1926        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1927        let result = executor.run_to_completion();
1928
1929        assert_eq!(result.count("p"), 0);
1930    }
1931
1932    #[test]
1933    fn at_least_blocks_insufficient() {
1934        let p = Place::<i32>::new("p");
1935        let p_out = Place::<()>::new("out");
1936
1937        let t = Transition::builder("t1")
1938            .input(libpetri_core::input::at_least(3, &p))
1939            .output(out_place(&p_out))
1940            .action(passthrough())
1941            .build();
1942        let net = PetriNet::builder("test").transition(t).build();
1943        let compiled = CompiledNet::compile(&net);
1944        let prog = PrecompiledNet::from_compiled(&compiled);
1945
1946        let mut marking = Marking::new();
1947        marking.add(&p, Token::at(1, 0));
1948        marking.add(&p, Token::at(2, 0));
1949
1950        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1951        let result = executor.run_to_completion();
1952
1953        assert_eq!(result.count("p"), 2);
1954    }
1955
1956    #[test]
1957    fn at_least_fires_with_enough() {
1958        let p = Place::<i32>::new("p");
1959        let p_out = Place::<()>::new("out");
1960
1961        let t = Transition::builder("t1")
1962            .input(libpetri_core::input::at_least(3, &p))
1963            .output(out_place(&p_out))
1964            .action(passthrough())
1965            .build();
1966        let net = PetriNet::builder("test").transition(t).build();
1967        let compiled = CompiledNet::compile(&net);
1968        let prog = PrecompiledNet::from_compiled(&compiled);
1969
1970        let mut marking = Marking::new();
1971        for i in 0..5 {
1972            marking.add(&p, Token::at(i, 0));
1973        }
1974
1975        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1976        let result = executor.run_to_completion();
1977
1978        assert_eq!(result.count("p"), 0);
1979    }
1980
1981    #[test]
1982    fn guarded_input_only_consumes_matching() {
1983        let p = Place::<i32>::new("p");
1984        let p_out = Place::<i32>::new("out");
1985
1986        let t = Transition::builder("t1")
1987            .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1988            .output(out_place(&p_out))
1989            .action(fork())
1990            .build();
1991        let net = PetriNet::builder("test").transition(t).build();
1992        let compiled = CompiledNet::compile(&net);
1993        let prog = PrecompiledNet::from_compiled(&compiled);
1994
1995        let mut marking = Marking::new();
1996        marking.add(&p, Token::at(3, 0));
1997        marking.add(&p, Token::at(10, 0));
1998
1999        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2000        let result = executor.run_to_completion();
2001
2002        assert_eq!(result.count("p"), 1);
2003        assert_eq!(result.count("out"), 1);
2004    }
2005
2006    #[test]
2007    fn guarded_input_blocks_when_no_match() {
2008        let p = Place::<i32>::new("p");
2009        let p_out = Place::<i32>::new("out");
2010
2011        let t = Transition::builder("t1")
2012            .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
2013            .output(out_place(&p_out))
2014            .action(fork())
2015            .build();
2016        let net = PetriNet::builder("test").transition(t).build();
2017        let compiled = CompiledNet::compile(&net);
2018        let prog = PrecompiledNet::from_compiled(&compiled);
2019
2020        let mut marking = Marking::new();
2021        marking.add(&p, Token::at(3, 0));
2022        marking.add(&p, Token::at(10, 0));
2023
2024        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2025        let result = executor.run_to_completion();
2026
2027        assert_eq!(result.count("p"), 2);
2028        assert_eq!(result.count("out"), 0);
2029    }
2030
2031    #[test]
2032    fn event_store_records_lifecycle() {
2033        let p1 = Place::<i32>::new("p1");
2034        let p2 = Place::<i32>::new("p2");
2035        let t = Transition::builder("t1")
2036            .input(one(&p1))
2037            .output(out_place(&p2))
2038            .action(fork())
2039            .build();
2040        let net = PetriNet::builder("test").transition(t).build();
2041        let compiled = CompiledNet::compile(&net);
2042        let prog = PrecompiledNet::from_compiled(&compiled);
2043
2044        let mut marking = Marking::new();
2045        marking.add(&p1, Token::at(1, 0));
2046
2047        let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2048        let _result = executor.run_to_completion();
2049
2050        let events = executor.event_store().events();
2051        assert!(
2052            events
2053                .iter()
2054                .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2055        );
2056        assert!(
2057            events
2058                .iter()
2059                .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
2060        );
2061        assert!(
2062            events
2063                .iter()
2064                .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2065        );
2066        assert!(
2067            events
2068                .iter()
2069                .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2070        );
2071        assert!(
2072            events
2073                .iter()
2074                .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
2075        );
2076        assert!(
2077            events
2078                .iter()
2079                .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
2080        );
2081        assert!(
2082            events
2083                .iter()
2084                .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2085        );
2086    }
2087
2088    #[test]
2089    fn action_error_does_not_crash() {
2090        let p_in = Place::<i32>::new("in");
2091        let p_out = Place::<i32>::new("out");
2092
2093        let t = Transition::builder("t1")
2094            .input(one(&p_in))
2095            .output(out_place(&p_out))
2096            .action(sync_action(|_ctx| {
2097                Err(libpetri_core::action::ActionError::new(
2098                    "intentional failure",
2099                ))
2100            }))
2101            .build();
2102        let net = PetriNet::builder("test").transition(t).build();
2103        let compiled = CompiledNet::compile(&net);
2104        let prog = PrecompiledNet::from_compiled(&compiled);
2105
2106        let mut marking = Marking::new();
2107        marking.add(&p_in, Token::at(42, 0));
2108
2109        let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2110        let result = executor.run_to_completion();
2111
2112        assert_eq!(result.count("in"), 0);
2113        assert_eq!(result.count("out"), 0);
2114
2115        let events = executor.event_store().events();
2116        assert!(
2117            events
2118                .iter()
2119                .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2120        );
2121    }
2122
2123    #[test]
2124    fn multiple_input_arcs_require_all() {
2125        let p1 = Place::<i32>::new("p1");
2126        let p2 = Place::<i32>::new("p2");
2127        let p3 = Place::<i32>::new("p3");
2128
2129        let t = Transition::builder("t1")
2130            .input(one(&p1))
2131            .input(one(&p2))
2132            .output(out_place(&p3))
2133            .action(sync_action(|ctx| {
2134                ctx.output("p3", 99i32)?;
2135                Ok(())
2136            }))
2137            .build();
2138        let net = PetriNet::builder("test").transition(t).build();
2139        let compiled = CompiledNet::compile(&net);
2140        let prog = PrecompiledNet::from_compiled(&compiled);
2141
2142        // Only p1 has token — should not fire
2143        let mut marking = Marking::new();
2144        marking.add(&p1, Token::at(1, 0));
2145        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2146        let result = executor.run_to_completion();
2147        assert_eq!(result.count("p1"), 1);
2148        assert_eq!(result.count("p3"), 0);
2149
2150        // Both p1 and p2 — should fire
2151        let compiled2 = CompiledNet::compile(&net);
2152        let prog2 = PrecompiledNet::from_compiled(&compiled2);
2153        let mut marking2 = Marking::new();
2154        marking2.add(&p1, Token::at(1, 0));
2155        marking2.add(&p2, Token::at(2, 0));
2156        let mut executor2 = PrecompiledNetExecutor::<NoopEventStore>::new(&prog2, marking2);
2157        let result2 = executor2.run_to_completion();
2158        assert_eq!(result2.count("p1"), 0);
2159        assert_eq!(result2.count("p2"), 0);
2160        assert_eq!(result2.count("p3"), 1);
2161    }
2162
2163    #[test]
2164    fn sync_action_custom_logic() {
2165        let p_in = Place::<i32>::new("in");
2166        let p_out = Place::<String>::new("out");
2167
2168        let t = Transition::builder("t1")
2169            .input(one(&p_in))
2170            .output(out_place(&p_out))
2171            .action(sync_action(|ctx| {
2172                let v = ctx.input::<i32>("in")?;
2173                ctx.output("out", format!("value={v}"))?;
2174                Ok(())
2175            }))
2176            .build();
2177        let net = PetriNet::builder("test").transition(t).build();
2178        let compiled = CompiledNet::compile(&net);
2179        let prog = PrecompiledNet::from_compiled(&compiled);
2180
2181        let mut marking = Marking::new();
2182        marking.add(&p_in, Token::at(42, 0));
2183
2184        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2185        let result = executor.run_to_completion();
2186
2187        assert_eq!(result.count("out"), 1);
2188    }
2189
2190    #[test]
2191    fn transform_action_outputs_to_all_places() {
2192        let p_in = Place::<i32>::new("in");
2193        let p_a = Place::<i32>::new("a");
2194        let p_b = Place::<i32>::new("b");
2195
2196        let t = Transition::builder("t1")
2197            .input(one(&p_in))
2198            .output(libpetri_core::output::and(vec![
2199                out_place(&p_a),
2200                out_place(&p_b),
2201            ]))
2202            .action(libpetri_core::action::transform(|ctx| {
2203                let v = ctx.input::<i32>("in").unwrap();
2204                Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
2205            }))
2206            .build();
2207        let net = PetriNet::builder("test").transition(t).build();
2208        let compiled = CompiledNet::compile(&net);
2209        let prog = PrecompiledNet::from_compiled(&compiled);
2210
2211        let mut marking = Marking::new();
2212        marking.add(&p_in, Token::at(5, 0));
2213
2214        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2215        let result = executor.run_to_completion();
2216
2217        assert_eq!(result.count("a"), 1);
2218        assert_eq!(result.count("b"), 1);
2219    }
2220
2221    #[test]
2222    fn complex_workflow() {
2223        use libpetri_core::output::{and, xor};
2224
2225        let input = Place::<i32>::new("v_input");
2226        let guard_in = Place::<i32>::new("v_guardIn");
2227        let intent_in = Place::<i32>::new("v_intentIn");
2228        let search_in = Place::<i32>::new("v_searchIn");
2229        let output_guard_in = Place::<i32>::new("v_outputGuardIn");
2230        let guard_safe = Place::<i32>::new("v_guardSafe");
2231        let guard_violation = Place::<i32>::new("v_guardViolation");
2232        let _violated = Place::<i32>::new("v_violated");
2233        let intent_ready = Place::<i32>::new("v_intentReady");
2234        let topic_ready = Place::<i32>::new("v_topicReady");
2235        let search_ready = Place::<i32>::new("v_searchReady");
2236        let _output_guard_done = Place::<i32>::new("v_outputGuardDone");
2237        let response = Place::<i32>::new("v_response");
2238
2239        let fork_trans = Transition::builder("Fork")
2240            .input(one(&input))
2241            .output(and(vec![
2242                out_place(&guard_in),
2243                out_place(&intent_in),
2244                out_place(&search_in),
2245                out_place(&output_guard_in),
2246            ]))
2247            .action(fork())
2248            .build();
2249
2250        let guard_trans = Transition::builder("Guard")
2251            .input(one(&guard_in))
2252            .output(xor(vec![
2253                out_place(&guard_safe),
2254                out_place(&guard_violation),
2255            ]))
2256            .action(fork())
2257            .build();
2258
2259        let handle_violation = Transition::builder("HandleViolation")
2260            .input(one(&guard_violation))
2261            .output(out_place(&_violated))
2262            .inhibitor(libpetri_core::arc::inhibitor(&guard_safe))
2263            .action(fork())
2264            .build();
2265
2266        let intent_trans = Transition::builder("Intent")
2267            .input(one(&intent_in))
2268            .output(out_place(&intent_ready))
2269            .action(fork())
2270            .build();
2271
2272        let topic_trans = Transition::builder("TopicKnowledge")
2273            .input(one(&intent_ready))
2274            .output(out_place(&topic_ready))
2275            .action(fork())
2276            .build();
2277
2278        let search_trans = Transition::builder("Search")
2279            .input(one(&search_in))
2280            .output(out_place(&search_ready))
2281            .read(libpetri_core::arc::read(&intent_ready))
2282            .inhibitor(libpetri_core::arc::inhibitor(&guard_violation))
2283            .priority(-5)
2284            .action(fork())
2285            .build();
2286
2287        let output_guard_trans = Transition::builder("OutputGuard")
2288            .input(one(&output_guard_in))
2289            .output(out_place(&_output_guard_done))
2290            .read(libpetri_core::arc::read(&guard_safe))
2291            .action(fork())
2292            .build();
2293
2294        let compose_trans = Transition::builder("Compose")
2295            .input(one(&guard_safe))
2296            .input(one(&search_ready))
2297            .input(one(&topic_ready))
2298            .output(out_place(&response))
2299            .priority(10)
2300            .action(fork())
2301            .build();
2302
2303        let net = PetriNet::builder("ComplexWorkflow")
2304            .transition(fork_trans)
2305            .transition(guard_trans)
2306            .transition(handle_violation)
2307            .transition(intent_trans)
2308            .transition(topic_trans)
2309            .transition(search_trans)
2310            .transition(output_guard_trans)
2311            .transition(compose_trans)
2312            .build();
2313
2314        let compiled = CompiledNet::compile(&net);
2315        let prog = PrecompiledNet::from_compiled(&compiled);
2316
2317        let mut marking = Marking::new();
2318        marking.add(&input, Token::at(1, 0));
2319
2320        let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2321        let result = executor.run_to_completion();
2322
2323        // fork() produces to ALL output places, including both XOR branches.
2324        // This means guard_safe AND guard_violation both get tokens.
2325        // Search is inhibited by guard_violation, so it deadlocks.
2326        // The important thing is the executor doesn't crash and terminates.
2327        assert_eq!(result.count("v_input"), 0); // consumed by Fork
2328    }
2329
2330    #[cfg(feature = "tokio")]
2331    mod async_tests {
2332        use super::*;
2333        use crate::environment::ExternalEvent;
2334        use libpetri_core::action::async_action;
2335        use libpetri_core::petri_net::PetriNet;
2336        use libpetri_core::token::ErasedToken;
2337
2338        #[tokio::test]
2339        async fn async_linear_chain() {
2340            let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
2341            let transitions: Vec<Transition> = (0..5)
2342                .map(|i| {
2343                    Transition::builder(format!("t{i}"))
2344                        .input(one(&places[i]))
2345                        .output(out_place(&places[i + 1]))
2346                        .action(fork())
2347                        .build()
2348                })
2349                .collect();
2350
2351            let net = PetriNet::builder("chain5").transitions(transitions).build();
2352            let compiled = CompiledNet::compile(&net);
2353            let prog = PrecompiledNet::from_compiled(&compiled);
2354
2355            let mut marking = Marking::new();
2356            marking.add(&places[0], Token::at(1, 0));
2357
2358            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2359            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2360            let result = executor.run_async(rx).await;
2361
2362            assert_eq!(result.count("p0"), 0);
2363            assert_eq!(result.count("p5"), 1);
2364        }
2365
2366        #[tokio::test]
2367        async fn async_action_execution() {
2368            let p1 = Place::<i32>::new("p1");
2369            let p2 = Place::<i32>::new("p2");
2370
2371            let t = Transition::builder("t1")
2372                .input(one(&p1))
2373                .output(out_place(&p2))
2374                .action(async_action(|ctx| async { Ok(ctx) }))
2375                .build();
2376
2377            let net = PetriNet::builder("async_test").transition(t).build();
2378            let compiled = CompiledNet::compile(&net);
2379            let prog = PrecompiledNet::from_compiled(&compiled);
2380
2381            let mut marking = Marking::new();
2382            marking.add(&p1, Token::at(42, 0));
2383
2384            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2385            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2386            let result = executor.run_async(rx).await;
2387
2388            assert_eq!(result.count("p1"), 0);
2389        }
2390
2391        // ==================== Drain/Close lifecycle tests ====================
2392
2393        #[tokio::test]
2394        async fn async_drain_terminates_at_quiescence() {
2395            let p1 = Place::<i32>::new("p1");
2396            let p2 = Place::<i32>::new("p2");
2397
2398            let t1 = Transition::builder("t1")
2399                .input(one(&p1))
2400                .output(out_place(&p2))
2401                .action(fork())
2402                .build();
2403
2404            let net = PetriNet::builder("test").transition(t1).build();
2405            let compiled = CompiledNet::compile(&net);
2406            let prog = PrecompiledNet::from_compiled(&compiled);
2407
2408            let marking = Marking::new();
2409            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2410                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2411                .build();
2412
2413            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2414
2415            tokio::spawn(async move {
2416                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2417                tx.send(ExecutorSignal::Event(ExternalEvent {
2418                    place_name: Arc::from("p1"),
2419                    token: ErasedToken::from_typed(&Token::at(42, 0)),
2420                }))
2421                .unwrap();
2422                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2423                tx.send(ExecutorSignal::Drain).unwrap();
2424            });
2425
2426            let result = executor.run_async(rx).await;
2427            assert_eq!(result.count("p2"), 1);
2428        }
2429
2430        #[tokio::test]
2431        async fn async_drain_rejects_post_drain_events() {
2432            let p1 = Place::<i32>::new("p1");
2433            let p2 = Place::<i32>::new("p2");
2434
2435            let t1 = Transition::builder("t1")
2436                .input(one(&p1))
2437                .output(out_place(&p2))
2438                .action(fork())
2439                .build();
2440
2441            let net = PetriNet::builder("test").transition(t1).build();
2442            let compiled = CompiledNet::compile(&net);
2443            let prog = PrecompiledNet::from_compiled(&compiled);
2444
2445            let marking = Marking::new();
2446            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2447                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2448                .build();
2449
2450            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2451
2452            tokio::spawn(async move {
2453                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2454                tx.send(ExecutorSignal::Drain).unwrap();
2455                tx.send(ExecutorSignal::Event(ExternalEvent {
2456                    place_name: Arc::from("p1"),
2457                    token: ErasedToken::from_typed(&Token::at(99, 0)),
2458                }))
2459                .unwrap();
2460            });
2461
2462            let result = executor.run_async(rx).await;
2463            assert_eq!(result.count("p2"), 0);
2464        }
2465
2466        #[tokio::test]
2467        async fn async_close_discards_queued_events() {
2468            let p1 = Place::<i32>::new("p1");
2469            let p2 = Place::<i32>::new("p2");
2470
2471            let t1 = Transition::builder("t1")
2472                .input(one(&p1))
2473                .output(out_place(&p2))
2474                .action(fork())
2475                .build();
2476
2477            let net = PetriNet::builder("test").transition(t1).build();
2478            let compiled = CompiledNet::compile(&net);
2479            let prog = PrecompiledNet::from_compiled(&compiled);
2480
2481            let marking = Marking::new();
2482            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2483                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2484                .build();
2485
2486            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2487
2488            tx.send(ExecutorSignal::Event(ExternalEvent {
2489                place_name: Arc::from("p1"),
2490                token: ErasedToken::from_typed(&Token::at(1, 0)),
2491            }))
2492            .unwrap();
2493            tx.send(ExecutorSignal::Close).unwrap();
2494            tx.send(ExecutorSignal::Event(ExternalEvent {
2495                place_name: Arc::from("p1"),
2496                token: ErasedToken::from_typed(&Token::at(2, 0)),
2497            }))
2498            .unwrap();
2499            drop(tx);
2500
2501            let result = executor.run_async(rx).await;
2502            assert!(result.count("p2") <= 1);
2503        }
2504
2505        #[tokio::test]
2506        async fn async_close_after_drain_escalates() {
2507            let p1 = Place::<i32>::new("p1");
2508            let p2 = Place::<i32>::new("p2");
2509
2510            let t1 = Transition::builder("t1")
2511                .input(one(&p1))
2512                .output(out_place(&p2))
2513                .action(fork())
2514                .build();
2515
2516            let net = PetriNet::builder("test").transition(t1).build();
2517            let compiled = CompiledNet::compile(&net);
2518            let prog = PrecompiledNet::from_compiled(&compiled);
2519
2520            let marking = Marking::new();
2521            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2522                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2523                .build();
2524
2525            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2526
2527            tokio::spawn(async move {
2528                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2529                tx.send(ExecutorSignal::Drain).unwrap();
2530                tx.send(ExecutorSignal::Close).unwrap();
2531            });
2532
2533            let _result = executor.run_async(rx).await;
2534            // Test passes if run_async returns — close escalated from drain
2535        }
2536
2537        #[tokio::test]
2538        async fn async_handle_raii_drain_on_drop() {
2539            use crate::executor_handle::ExecutorHandle;
2540
2541            let p1 = Place::<i32>::new("p1");
2542            let p2 = Place::<i32>::new("p2");
2543
2544            let t1 = Transition::builder("t1")
2545                .input(one(&p1))
2546                .output(out_place(&p2))
2547                .action(fork())
2548                .build();
2549
2550            let net = PetriNet::builder("test").transition(t1).build();
2551            let compiled = CompiledNet::compile(&net);
2552            let prog = PrecompiledNet::from_compiled(&compiled);
2553
2554            let marking = Marking::new();
2555            let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2556                .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2557                .build();
2558
2559            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2560
2561            tokio::spawn(async move {
2562                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2563                let mut handle = ExecutorHandle::new(tx);
2564                handle.inject(
2565                    Arc::from("p1"),
2566                    ErasedToken::from_typed(&Token::at(7, 0)),
2567                );
2568                // handle dropped here — RAII sends Drain automatically
2569            });
2570
2571            let result = executor.run_async(rx).await;
2572            assert_eq!(result.count("p2"), 1);
2573        }
2574    }
2575}