1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::token::ErasedToken;
8
9use libpetri_event::event_store::EventStore;
10use libpetri_event::net_event::NetEvent;
11
12use crate::bitmap;
13use crate::marking::Marking;
14use crate::precompiled_net::{
15 CONSUME_ALL, CONSUME_ATLEAST, CONSUME_N, CONSUME_ONE, PrecompiledNet, RESET,
16};
17
18const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
21const INITIAL_RING_CAPACITY: usize = 4;
23
24pub struct PrecompiledNetExecutor<'a, E: EventStore> {
31 program: &'a PrecompiledNet<'a>,
32 event_store: E,
33 #[allow(dead_code)]
34 environment_places: HashSet<Arc<str>>,
35 has_environment_places: bool,
36 #[allow(dead_code)]
37 skip_output_validation: bool,
38
39 token_pool: Vec<Option<ErasedToken>>,
41 place_offset: Vec<usize>,
42 token_counts: Vec<usize>,
43 ring_head: Vec<usize>,
44 ring_tail: Vec<usize>,
45 ring_capacity: Vec<usize>,
46
47 marking_bitmap: Vec<u64>,
49
50 enabled_bitmap: Vec<u64>,
52 dirty_bitmap: Vec<u64>,
53 dirty_scan_buffer: Vec<u64>,
54 enabled_at_ms: Vec<f64>,
55 enabled_transition_count: usize,
56
57 dirty_word_summary: Vec<u64>,
59 enabled_word_summary: Vec<u64>,
60 transition_words: usize,
61 summary_words: usize,
62
63 ready_queues: Vec<Vec<usize>>,
65 ready_queue_head: Vec<usize>,
66 ready_queue_tail: Vec<usize>,
67 ready_queue_size: Vec<usize>,
68
69 pending_reset_words: Vec<u64>,
71 has_pending_resets: bool,
72
73 reusable_inputs: HashMap<Arc<str>, Vec<ErasedToken>>,
75 reusable_reads: HashMap<Arc<str>, Vec<ErasedToken>>,
76
77 start_time: Instant,
79}
80
81pub struct PrecompiledExecutorBuilder<'a, E: EventStore> {
83 program: &'a PrecompiledNet<'a>,
84 initial_marking: Marking,
85 event_store: Option<E>,
86 environment_places: HashSet<Arc<str>>,
87 skip_output_validation: bool,
88}
89
90impl<'a, E: EventStore> PrecompiledExecutorBuilder<'a, E> {
91 pub fn event_store(mut self, store: E) -> Self {
93 self.event_store = Some(store);
94 self
95 }
96
97 pub fn environment_places(mut self, places: HashSet<Arc<str>>) -> Self {
99 self.environment_places = places;
100 self
101 }
102
103 pub fn skip_output_validation(mut self, skip: bool) -> Self {
105 self.skip_output_validation = skip;
106 self
107 }
108
109 pub fn build(self) -> PrecompiledNetExecutor<'a, E> {
111 PrecompiledNetExecutor::new_inner(
112 self.program,
113 self.initial_marking,
114 self.event_store.unwrap_or_default(),
115 self.environment_places,
116 self.skip_output_validation,
117 )
118 }
119}
120
121impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
122 pub fn builder(
124 program: &'a PrecompiledNet<'a>,
125 initial_marking: Marking,
126 ) -> PrecompiledExecutorBuilder<'a, E> {
127 PrecompiledExecutorBuilder {
128 program,
129 initial_marking,
130 event_store: None,
131 environment_places: HashSet::new(),
132 skip_output_validation: false,
133 }
134 }
135
136 pub fn new(program: &'a PrecompiledNet<'a>, initial_marking: Marking) -> Self {
138 Self::new_inner(
139 program,
140 initial_marking,
141 E::default(),
142 HashSet::new(),
143 false,
144 )
145 }
146
147 fn new_inner(
148 program: &'a PrecompiledNet<'a>,
149 initial_marking: Marking,
150 event_store: E,
151 environment_places: HashSet<Arc<str>>,
152 skip_output_validation: bool,
153 ) -> Self {
154 let pc = program.place_count();
155 let tc = program.transition_count();
156 let wc = program.word_count();
157
158 let total_slots = pc * INITIAL_RING_CAPACITY;
160 let mut token_pool = vec![None; total_slots];
161 let mut place_offset = vec![0usize; pc];
162 let mut token_counts = vec![0usize; pc];
163 let mut ring_head = vec![0usize; pc];
164 let mut ring_tail = vec![0usize; pc];
165 let mut ring_capacity = vec![INITIAL_RING_CAPACITY; pc];
166
167 for (pid, offset) in place_offset.iter_mut().enumerate() {
168 *offset = pid * INITIAL_RING_CAPACITY;
169 }
170
171 for pid in 0..pc {
173 let place = program.place(pid);
174 if let Some(queue) = initial_marking.queue(place.name()) {
175 for token in queue {
176 if token_counts[pid] == ring_capacity[pid] {
178 grow_ring_static(
179 &mut token_pool,
180 &mut place_offset,
181 &mut ring_head,
182 &mut ring_tail,
183 &mut ring_capacity,
184 &token_counts,
185 pid,
186 );
187 }
188 let tail = ring_tail[pid];
189 let offset = place_offset[pid];
190 token_pool[offset + tail] = Some(token.clone());
191 ring_tail[pid] = (tail + 1) % ring_capacity[pid];
192 token_counts[pid] += 1;
193 }
194 }
195 }
196
197 let transition_words = bitmap::word_count(tc);
199 let summary_words = bitmap::word_count(transition_words);
200
201 let prio_count = program.distinct_priority_count;
203 let queue_cap = tc.max(4);
204 let ready_queues = vec![vec![0usize; queue_cap]; prio_count];
205 let ready_queue_head = vec![0usize; prio_count];
206 let ready_queue_tail = vec![0usize; prio_count];
207 let ready_queue_size = vec![0usize; prio_count];
208
209 Self {
210 program,
211 event_store,
212 has_environment_places: !environment_places.is_empty(),
213 environment_places,
214 skip_output_validation,
215 token_pool,
216 place_offset,
217 token_counts,
218 ring_head,
219 ring_tail,
220 ring_capacity,
221 marking_bitmap: vec![0u64; wc],
222 enabled_bitmap: vec![0u64; transition_words],
223 dirty_bitmap: vec![0u64; transition_words],
224 dirty_scan_buffer: vec![0u64; transition_words],
225 enabled_at_ms: vec![f64::NEG_INFINITY; tc],
226 enabled_transition_count: 0,
227 dirty_word_summary: vec![0u64; summary_words],
228 enabled_word_summary: vec![0u64; summary_words],
229 transition_words,
230 summary_words,
231 ready_queues,
232 ready_queue_head,
233 ready_queue_tail,
234 ready_queue_size,
235 pending_reset_words: vec![0u64; wc],
236 has_pending_resets: false,
237 reusable_inputs: HashMap::new(),
238 reusable_reads: HashMap::new(),
239 start_time: Instant::now(),
240 }
241 }
242
243 pub fn run_sync(&mut self) -> Marking {
248 self.run_to_completion()
249 }
250
251 pub fn marking(&self) -> Marking {
253 self.materialize_marking()
254 }
255
256 pub fn event_store(&self) -> &E {
258 &self.event_store
259 }
260
261 pub fn is_quiescent(&self) -> bool {
263 self.enabled_transition_count == 0
264 }
265
266 #[inline]
269 fn ring_remove_first(&mut self, pid: usize) -> ErasedToken {
270 let head = self.ring_head[pid];
271 let offset = self.place_offset[pid];
272 let token = self.token_pool[offset + head].take().unwrap();
273 self.ring_head[pid] = (head + 1) % self.ring_capacity[pid];
274 self.token_counts[pid] -= 1;
275 token
276 }
277
278 #[inline]
279 fn ring_add_last(&mut self, pid: usize, token: ErasedToken) {
280 if self.token_counts[pid] == self.ring_capacity[pid] {
281 self.grow_ring(pid);
282 }
283 let tail = self.ring_tail[pid];
284 let offset = self.place_offset[pid];
285 self.token_pool[offset + tail] = Some(token);
286 self.ring_tail[pid] = (tail + 1) % self.ring_capacity[pid];
287 self.token_counts[pid] += 1;
288 }
289
290 #[inline]
291 fn ring_peek_first(&self, pid: usize) -> Option<&ErasedToken> {
292 if self.token_counts[pid] == 0 {
293 return None;
294 }
295 self.token_pool[self.place_offset[pid] + self.ring_head[pid]].as_ref()
296 }
297
298 fn ring_remove_all(&mut self, pid: usize) -> Vec<ErasedToken> {
299 let count = self.token_counts[pid];
300 if count == 0 {
301 return Vec::new();
302 }
303 let mut result = Vec::with_capacity(count);
304 for _ in 0..count {
305 result.push(self.ring_remove_first(pid));
306 }
307 result
308 }
309
310 fn grow_ring(&mut self, pid: usize) {
311 grow_ring_static(
312 &mut self.token_pool,
313 &mut self.place_offset,
314 &mut self.ring_head,
315 &mut self.ring_tail,
316 &mut self.ring_capacity,
317 &self.token_counts,
318 pid,
319 );
320 }
321
322 #[inline]
325 fn set_enabled_bit(&mut self, tid: usize) {
326 let w = tid >> bitmap::WORD_SHIFT;
327 self.enabled_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
328 self.enabled_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
329 }
330
331 #[inline]
332 fn clear_enabled_bit(&mut self, tid: usize) {
333 let w = tid >> bitmap::WORD_SHIFT;
334 self.enabled_bitmap[w] &= !(1u64 << (tid & bitmap::WORD_MASK));
335 if self.enabled_bitmap[w] == 0 {
336 self.enabled_word_summary[w >> bitmap::WORD_SHIFT] &=
337 !(1u64 << (w & bitmap::WORD_MASK));
338 }
339 }
340
341 #[inline]
342 fn is_enabled(&self, tid: usize) -> bool {
343 (self.enabled_bitmap[tid >> bitmap::WORD_SHIFT] & (1u64 << (tid & bitmap::WORD_MASK))) != 0
344 }
345
346 #[inline]
347 fn set_marking_bit(&mut self, pid: usize) {
348 bitmap::set_bit(&mut self.marking_bitmap, pid);
349 }
350
351 #[inline]
352 fn clear_marking_bit(&mut self, pid: usize) {
353 bitmap::clear_bit(&mut self.marking_bitmap, pid);
354 }
355
356 fn ready_queue_push(&mut self, tid: usize) {
359 let pi = self.program.transition_to_priority_index[tid];
360 if self.ready_queue_size[pi] == self.ready_queues[pi].len() {
361 let old_cap = self.ready_queues[pi].len();
362 let new_cap = old_cap * 2;
363 let mut new_queue = vec![0usize; new_cap];
364 let head = self.ready_queue_head[pi];
365 for (i, slot) in new_queue.iter_mut().enumerate().take(old_cap) {
366 *slot = self.ready_queues[pi][(head + i) % old_cap];
367 }
368 self.ready_queues[pi] = new_queue;
369 self.ready_queue_head[pi] = 0;
370 self.ready_queue_tail[pi] = old_cap;
371 }
372 let tail = self.ready_queue_tail[pi];
373 self.ready_queues[pi][tail] = tid;
374 self.ready_queue_tail[pi] = (tail + 1) % self.ready_queues[pi].len();
375 self.ready_queue_size[pi] += 1;
376 }
377
378 fn ready_queue_pop(&mut self, pi: usize) -> usize {
379 let head = self.ready_queue_head[pi];
380 let tid = self.ready_queues[pi][head];
381 self.ready_queue_head[pi] = (head + 1) % self.ready_queues[pi].len();
382 self.ready_queue_size[pi] -= 1;
383 tid
384 }
385
386 fn clear_all_ready_queues(&mut self) {
387 for pi in 0..self.program.distinct_priority_count {
388 self.ready_queue_head[pi] = 0;
389 self.ready_queue_tail[pi] = 0;
390 self.ready_queue_size[pi] = 0;
391 }
392 }
393
394 fn initialize_marking_bitmap(&mut self) {
397 for pid in 0..self.program.place_count() {
398 if self.token_counts[pid] > 0 {
399 self.set_marking_bit(pid);
400 }
401 }
402 }
403
404 fn mark_all_dirty(&mut self) {
405 let tc = self.program.transition_count();
406 let last_word_bits = tc & bitmap::WORD_MASK;
407 for w in 0..self.transition_words.saturating_sub(1) {
408 self.dirty_bitmap[w] = u64::MAX;
409 }
410 if self.transition_words > 0 {
411 self.dirty_bitmap[self.transition_words - 1] = if last_word_bits == 0 {
412 u64::MAX
413 } else {
414 (1u64 << last_word_bits) - 1
415 };
416 }
417 for s in 0..self.summary_words {
419 let first_w = s << bitmap::WORD_SHIFT;
420 let last_w = (first_w + bitmap::WORD_MASK).min(self.transition_words.saturating_sub(1));
421 let count = last_w - first_w + 1;
422 let last_bits = count & bitmap::WORD_MASK;
423 self.dirty_word_summary[s] = if last_bits == 0 {
424 u64::MAX
425 } else {
426 (1u64 << last_bits) - 1
427 };
428 }
429 }
430
431 fn should_terminate(&self) -> bool {
432 if self.has_environment_places {
433 return false;
434 }
435 self.enabled_transition_count == 0
436 }
437
438 fn update_dirty_transitions(&mut self) {
441 let now_ms = self.elapsed_ms();
442
443 for s in 0..self.summary_words {
445 let mut summary = self.dirty_word_summary[s];
446 self.dirty_word_summary[s] = 0;
447 while summary != 0 {
448 let local_w = summary.trailing_zeros() as usize;
449 summary &= summary - 1;
450 let w = (s << bitmap::WORD_SHIFT) | local_w;
451 if w < self.transition_words {
452 self.dirty_scan_buffer[w] = self.dirty_bitmap[w];
453 self.dirty_bitmap[w] = 0;
454 }
455 }
456 }
457
458 let tc = self.program.transition_count();
460 for w in 0..self.transition_words {
461 let mut word = self.dirty_scan_buffer[w];
462 if word == 0 {
463 continue;
464 }
465 self.dirty_scan_buffer[w] = 0;
466 while word != 0 {
467 let bit = word.trailing_zeros() as usize;
468 let tid = (w << bitmap::WORD_SHIFT) | bit;
469 word &= word - 1;
470
471 if tid >= tc {
472 break;
473 }
474
475 let was_enabled = self.is_enabled(tid);
476 let can_now = self.can_enable(tid);
477
478 if can_now && !was_enabled {
479 self.set_enabled_bit(tid);
480 self.enabled_transition_count += 1;
481 self.enabled_at_ms[tid] = now_ms;
482
483 if E::ENABLED {
484 self.event_store.append(NetEvent::TransitionEnabled {
485 transition_name: Arc::clone(self.program.transition(tid).name_arc()),
486 timestamp: now_millis(),
487 });
488 }
489 } else if !can_now && was_enabled {
490 self.clear_enabled_bit(tid);
491 self.enabled_transition_count -= 1;
492 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
493 } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
494 self.enabled_at_ms[tid] = now_ms;
495 if E::ENABLED {
496 self.event_store.append(NetEvent::TransitionClockRestarted {
497 transition_name: Arc::clone(self.program.transition(tid).name_arc()),
498 timestamp: now_millis(),
499 });
500 }
501 }
502 }
503 }
504
505 self.clear_pending_resets();
506 }
507
508 fn can_enable(&self, tid: usize) -> bool {
509 if !self.program.can_enable_bitmap(tid, &self.marking_bitmap) {
510 return false;
511 }
512
513 if let Some(card_check) = self.program.compiled().cardinality_check(tid) {
515 for i in 0..card_check.place_ids.len() {
516 let pid = card_check.place_ids[i];
517 let required = card_check.required_counts[i];
518 if self.token_counts[pid] < required {
519 return false;
520 }
521 }
522 }
523
524 if self.program.compiled().has_guards(tid) {
526 let t = self.program.transition(tid);
527 for spec in t.input_specs() {
528 if let Some(guard) = spec.guard() {
529 let required = match spec {
530 In::One { .. } => 1,
531 In::Exactly { count, .. } => *count,
532 In::AtLeast { minimum, .. } => *minimum,
533 In::All { .. } => 1,
534 };
535 let pid = self.program.place_id(spec.place_name()).unwrap();
536 let count = self.count_matching_in_ring(pid, &**guard);
537 if count < required {
538 return false;
539 }
540 }
541 }
542 }
543
544 true
545 }
546
547 fn count_matching_in_ring(
548 &self,
549 pid: usize,
550 guard: &dyn Fn(&dyn std::any::Any) -> bool,
551 ) -> usize {
552 let count = self.token_counts[pid];
553 if count == 0 {
554 return 0;
555 }
556 let offset = self.place_offset[pid];
557 let head = self.ring_head[pid];
558 let cap = self.ring_capacity[pid];
559 let mut matched = 0;
560 for i in 0..count {
561 let idx = offset + (head + i) % cap;
562 if let Some(token) = &self.token_pool[idx]
563 && guard(token.value.as_ref())
564 {
565 matched += 1;
566 }
567 }
568 matched
569 }
570
571 fn has_input_from_reset_place(&self, tid: usize) -> bool {
572 if !self.has_pending_resets {
573 return false;
574 }
575 let input_mask = &self.program.input_place_mask_words[tid];
576 for (im, pr) in input_mask.iter().zip(self.pending_reset_words.iter()) {
577 if (im & pr) != 0 {
578 return true;
579 }
580 }
581 false
582 }
583
584 fn clear_pending_resets(&mut self) {
585 if self.has_pending_resets {
586 for w in &mut self.pending_reset_words {
587 *w = 0;
588 }
589 self.has_pending_resets = false;
590 }
591 }
592
593 fn enforce_deadlines(&mut self, now_ms: f64) {
596 for s in 0..self.summary_words {
597 let mut summary = self.enabled_word_summary[s];
598 while summary != 0 {
599 let local_w = summary.trailing_zeros() as usize;
600 summary &= summary - 1;
601 let w = (s << bitmap::WORD_SHIFT) | local_w;
602 if w >= self.transition_words {
603 continue;
604 }
605 let mut word = self.enabled_bitmap[w];
606 while word != 0 {
607 let bit = word.trailing_zeros() as usize;
608 let tid = (w << bitmap::WORD_SHIFT) | bit;
609 word &= word - 1;
610
611 if !self.program.has_deadline[tid] {
612 continue;
613 }
614
615 let elapsed = now_ms - self.enabled_at_ms[tid];
616 let latest_ms = self.program.latest_ms[tid];
617
618 if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
619 self.clear_enabled_bit(tid);
620 self.enabled_transition_count -= 1;
621 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
622 self.mark_transition_dirty(tid);
623
624 if E::ENABLED {
625 self.event_store.append(NetEvent::TransitionTimedOut {
626 transition_name: Arc::clone(
627 self.program.transition(tid).name_arc(),
628 ),
629 timestamp: now_millis(),
630 });
631 }
632 }
633 }
634 }
635 }
636 }
637
638 fn fire_ready_immediate_sync(&mut self) {
641 for s in 0..self.summary_words {
642 let mut summary = self.enabled_word_summary[s];
643 while summary != 0 {
644 let local_w = summary.trailing_zeros() as usize;
645 summary &= summary - 1;
646 let w = (s << bitmap::WORD_SHIFT) | local_w;
647 if w >= self.transition_words {
648 continue;
649 }
650 let word = self.enabled_bitmap[w];
651 let mut remaining = word;
652 while remaining != 0 {
653 let bit = remaining.trailing_zeros() as usize;
654 let tid = (w << bitmap::WORD_SHIFT) | bit;
655 remaining &= remaining - 1;
656
657 if self.can_enable(tid) {
658 self.fire_transition_sync(tid);
659 } else {
660 self.clear_enabled_bit(tid);
661 self.enabled_transition_count -= 1;
662 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
663 }
664 }
665 }
666 }
667 }
668
669 fn fire_ready_general_sync(&mut self, now_ms: f64) {
670 self.clear_all_ready_queues();
672
673 for s in 0..self.summary_words {
674 let mut summary = self.enabled_word_summary[s];
675 while summary != 0 {
676 let local_w = summary.trailing_zeros() as usize;
677 summary &= summary - 1;
678 let w = (s << bitmap::WORD_SHIFT) | local_w;
679 if w >= self.transition_words {
680 continue;
681 }
682 let mut word = self.enabled_bitmap[w];
683 while word != 0 {
684 let bit = word.trailing_zeros() as usize;
685 let tid = (w << bitmap::WORD_SHIFT) | bit;
686 word &= word - 1;
687
688 let enabled_ms = self.enabled_at_ms[tid];
689 let elapsed = now_ms - enabled_ms;
690
691 if self.program.earliest_ms[tid] <= elapsed {
692 self.ready_queue_push(tid);
693 }
694 }
695 }
696 }
697
698 for pi in 0..self.program.distinct_priority_count {
700 while self.ready_queue_size[pi] > 0 {
701 let tid = self.ready_queue_pop(pi);
702 if !self.is_enabled(tid) {
703 continue;
704 }
705
706 if self.can_enable(tid) {
707 self.fire_transition_sync(tid);
708 } else {
709 self.clear_enabled_bit(tid);
710 self.enabled_transition_count -= 1;
711 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
712 }
713 }
714 }
715 }
716
717 fn fire_transition_sync(&mut self, tid: usize) {
720 let has_guards = self.program.compiled().has_guards(tid);
721 let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
722 let action = Arc::clone(self.program.transition(tid).action());
723
724 self.reusable_inputs.clear();
726 self.reusable_reads.clear();
727
728 if has_guards {
729 let input_specs: Vec<In> = self.program.transition(tid).input_specs().to_vec();
731 let reset_arcs: Vec<_> = self.program.transition(tid).resets().to_vec();
732
733 for in_spec in &input_specs {
734 let pid = self.program.place_id(in_spec.place_name()).unwrap();
735 let place_name_arc = Arc::clone(&self.program.place_name_arcs[pid]);
736 let to_consume = match in_spec {
737 In::One { .. } => 1,
738 In::Exactly { count, .. } => *count,
739 In::All { guard, .. } | In::AtLeast { guard, .. } => {
740 if guard.is_some() {
741 self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
742 } else {
743 self.token_counts[pid]
744 }
745 }
746 };
747
748 for _ in 0..to_consume {
749 let token = if let Some(guard) = in_spec.guard() {
750 self.ring_remove_matching(pid, &**guard)
751 } else {
752 Some(self.ring_remove_first(pid))
753 };
754 if let Some(token) = token {
755 if E::ENABLED {
756 self.event_store.append(NetEvent::TokenRemoved {
757 place_name: Arc::clone(&place_name_arc),
758 timestamp: now_millis(),
759 });
760 }
761 self.reusable_inputs
762 .entry(Arc::clone(&place_name_arc))
763 .or_default()
764 .push(token);
765 }
766 }
767 }
768
769 for arc in &reset_arcs {
771 let pid = self.program.place_id(arc.place.name()).unwrap();
772 let removed = self.ring_remove_all(pid);
773 if E::ENABLED {
774 for _ in &removed {
775 self.event_store.append(NetEvent::TokenRemoved {
776 place_name: Arc::clone(arc.place.name_arc()),
777 timestamp: now_millis(),
778 });
779 }
780 }
781 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
782 1u64 << (pid & bitmap::WORD_MASK);
783 self.has_pending_resets = true;
784 }
785 } else {
786 let ops_len = self.program.consume_ops[tid].len();
788 let mut pc = 0;
789 while pc < ops_len {
790 let opcode = self.program.consume_ops[tid][pc];
791 pc += 1;
792 match opcode {
793 CONSUME_ONE => {
794 let pid = self.program.consume_ops[tid][pc] as usize;
795 pc += 1;
796 let token = self.ring_remove_first(pid);
797 if E::ENABLED {
798 self.event_store.append(NetEvent::TokenRemoved {
799 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
800 timestamp: now_millis(),
801 });
802 }
803 self.reusable_inputs
804 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
805 .or_default()
806 .push(token);
807 }
808 CONSUME_N => {
809 let pid = self.program.consume_ops[tid][pc] as usize;
810 pc += 1;
811 let count = self.program.consume_ops[tid][pc] as usize;
812 pc += 1;
813 for _ in 0..count {
814 let token = self.ring_remove_first(pid);
815 if E::ENABLED {
816 self.event_store.append(NetEvent::TokenRemoved {
817 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
818 timestamp: now_millis(),
819 });
820 }
821 self.reusable_inputs
822 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
823 .or_default()
824 .push(token);
825 }
826 }
827 CONSUME_ALL | CONSUME_ATLEAST => {
828 let pid = self.program.consume_ops[tid][pc] as usize;
829 pc += 1;
830 if opcode == CONSUME_ATLEAST {
831 pc += 1;
832 }
833 let count = self.token_counts[pid];
834 for _ in 0..count {
835 let token = self.ring_remove_first(pid);
836 if E::ENABLED {
837 self.event_store.append(NetEvent::TokenRemoved {
838 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
839 timestamp: now_millis(),
840 });
841 }
842 self.reusable_inputs
843 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
844 .or_default()
845 .push(token);
846 }
847 }
848 RESET => {
849 let pid = self.program.consume_ops[tid][pc] as usize;
850 pc += 1;
851 let count = self.token_counts[pid];
852 for _ in 0..count {
853 let _token = self.ring_remove_first(pid);
854 if E::ENABLED {
855 self.event_store.append(NetEvent::TokenRemoved {
856 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
857 timestamp: now_millis(),
858 });
859 }
860 }
861 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
862 1u64 << (pid & bitmap::WORD_MASK);
863 self.has_pending_resets = true;
864 }
865 _ => unreachable!("Unknown opcode: {opcode}"),
866 }
867 }
868 }
869
870 let read_ops_len = self.program.read_ops[tid].len();
872 for i in 0..read_ops_len {
873 let rpid = self.program.read_ops[tid][i];
874 let token_clone = self.ring_peek_first(rpid).cloned();
875 if let Some(token) = token_clone {
876 let place_name = Arc::clone(&self.program.place_name_arcs[rpid]);
877 self.reusable_reads
878 .entry(place_name)
879 .or_default()
880 .push(token);
881 }
882 }
883
884 self.update_bitmap_after_consumption(tid);
886
887 if E::ENABLED {
888 self.event_store.append(NetEvent::TransitionStarted {
889 transition_name: Arc::clone(&transition_name),
890 timestamp: now_millis(),
891 });
892 }
893
894 let inputs = std::mem::take(&mut self.reusable_inputs);
896 let reads = std::mem::take(&mut self.reusable_reads);
897 let mut ctx = TransitionContext::new(
898 Arc::clone(&transition_name),
899 inputs,
900 reads,
901 self.program.output_place_name_sets[tid].clone(),
902 None,
903 );
904
905 let result = action.run_sync(&mut ctx);
906
907 let returned_inputs = ctx.take_inputs();
909 let returned_reads = ctx.take_reads();
910
911 match result {
912 Ok(()) => {
913 let outputs = ctx.take_outputs();
914 self.process_outputs(tid, &transition_name, outputs);
915
916 if E::ENABLED {
917 self.event_store.append(NetEvent::TransitionCompleted {
918 transition_name: Arc::clone(&transition_name),
919 timestamp: now_millis(),
920 });
921 }
922 }
923 Err(err) => {
924 if E::ENABLED {
925 self.event_store.append(NetEvent::TransitionFailed {
926 transition_name: Arc::clone(&transition_name),
927 error: err.message,
928 timestamp: now_millis(),
929 });
930 }
931 }
932 }
933
934 self.reusable_inputs = returned_inputs;
936 self.reusable_reads = returned_reads;
937
938 self.clear_enabled_bit(tid);
940 self.enabled_transition_count -= 1;
941 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
942
943 self.mark_transition_dirty(tid);
945 }
946
947 fn ring_remove_matching(
950 &mut self,
951 pid: usize,
952 guard: &dyn Fn(&dyn std::any::Any) -> bool,
953 ) -> Option<ErasedToken> {
954 let count = self.token_counts[pid];
955 if count == 0 {
956 return None;
957 }
958 let offset = self.place_offset[pid];
959 let head = self.ring_head[pid];
960 let cap = self.ring_capacity[pid];
961
962 for i in 0..count {
964 let idx = offset + (head + i) % cap;
965 if let Some(token) = &self.token_pool[idx]
966 && guard(token.value.as_ref())
967 {
968 let token = self.token_pool[idx].take().unwrap();
969 for j in i..count - 1 {
971 let from = offset + (head + j + 1) % cap;
972 let to = offset + (head + j) % cap;
973 self.token_pool[to] = self.token_pool[from].take();
974 }
975 self.token_counts[pid] -= 1;
976 self.ring_tail[pid] = if self.ring_tail[pid] == 0 {
977 cap - 1
978 } else {
979 self.ring_tail[pid] - 1
980 };
981 return Some(token);
982 }
983 }
984 None
985 }
986
987 fn process_outputs(
988 &mut self,
989 _tid: usize,
990 _transition_name: &Arc<str>,
991 outputs: Vec<OutputEntry>,
992 ) {
993 for entry in outputs {
994 if let Some(pid) = self.program.place_id(&entry.place_name) {
995 self.ring_add_last(pid, entry.token);
996 self.set_marking_bit(pid);
997 self.mark_dirty(pid);
998 }
999
1000 if E::ENABLED {
1001 self.event_store.append(NetEvent::TokenAdded {
1002 place_name: Arc::clone(&entry.place_name),
1003 timestamp: now_millis(),
1004 });
1005 }
1006 }
1007 }
1008
1009 fn update_bitmap_after_consumption(&mut self, tid: usize) {
1010 let n = self.program.compiled().consumption_place_ids(tid).len();
1011 for i in 0..n {
1012 let pid = self.program.compiled().consumption_place_ids(tid)[i];
1013 if self.token_counts[pid] == 0 {
1014 self.clear_marking_bit(pid);
1015 }
1016 self.mark_dirty(pid);
1017 }
1018 }
1019
1020 fn has_dirty_bits(&self) -> bool {
1023 for &s in &self.dirty_word_summary {
1024 if s != 0 {
1025 return true;
1026 }
1027 }
1028 false
1029 }
1030
1031 fn mark_dirty(&mut self, pid: usize) {
1032 let n = self.program.compiled().affected_transitions(pid).len();
1033 for i in 0..n {
1034 let tid = self.program.compiled().affected_transitions(pid)[i];
1035 self.mark_transition_dirty(tid);
1036 }
1037 }
1038
1039 fn mark_transition_dirty(&mut self, tid: usize) {
1040 let w = tid >> bitmap::WORD_SHIFT;
1041 self.dirty_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
1042 self.dirty_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
1043 }
1044
1045 fn elapsed_ms(&self) -> f64 {
1046 self.start_time.elapsed().as_secs_f64() * 1000.0
1047 }
1048
1049 fn materialize_marking(&self) -> Marking {
1052 let mut marking = Marking::new();
1053 for pid in 0..self.program.place_count() {
1054 let count = self.token_counts[pid];
1055 if count == 0 {
1056 continue;
1057 }
1058 let place_name = self.program.place(pid).name_arc();
1059 let offset = self.place_offset[pid];
1060 let head = self.ring_head[pid];
1061 let cap = self.ring_capacity[pid];
1062 for i in 0..count {
1063 let idx = offset + (head + i) % cap;
1064 if let Some(token) = &self.token_pool[idx] {
1065 marking.add_erased(place_name, token.clone());
1066 }
1067 }
1068 }
1069 marking
1070 }
1071
1072 fn run_to_completion(&mut self) -> Marking {
1074 self.initialize_marking_bitmap();
1075 self.mark_all_dirty();
1076
1077 if E::ENABLED {
1078 let now = now_millis();
1079 self.event_store.append(NetEvent::ExecutionStarted {
1080 net_name: Arc::from(self.program.net().name()),
1081 timestamp: now,
1082 });
1083 }
1084
1085 loop {
1086 self.update_dirty_transitions();
1087
1088 let cycle_now = self.elapsed_ms();
1089
1090 if self.program.any_deadlines {
1091 self.enforce_deadlines(cycle_now);
1092 }
1093
1094 if self.should_terminate() {
1095 break;
1096 }
1097
1098 if self.program.all_immediate && self.program.all_same_priority {
1099 self.fire_ready_immediate_sync();
1100 } else {
1101 self.fire_ready_general_sync(cycle_now);
1102 }
1103
1104 if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
1105 break;
1106 }
1107 }
1108
1109 if E::ENABLED {
1110 let now = now_millis();
1111 self.event_store.append(NetEvent::ExecutionCompleted {
1112 net_name: Arc::from(self.program.net().name()),
1113 timestamp: now,
1114 });
1115 }
1116
1117 self.materialize_marking()
1118 }
1119}
1120
1121#[cfg(feature = "tokio")]
1123use crate::environment::ExecutorSignal;
1124
1125#[cfg(feature = "tokio")]
1126struct ActionCompletion {
1127 transition_name: Arc<str>,
1128 result: Result<Vec<OutputEntry>, String>,
1129}
1130
1131#[cfg(feature = "tokio")]
1132impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
1133 pub async fn run_async(
1141 &mut self,
1142 mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
1143 ) -> Marking {
1144 let (completion_tx, mut completion_rx) =
1145 tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
1146
1147 self.initialize_marking_bitmap();
1148 self.mark_all_dirty();
1149
1150 let mut in_flight_count: usize = 0;
1151 let mut signal_channel_open = true;
1152 let mut draining = false;
1153 let mut closed = false;
1154
1155 if E::ENABLED {
1156 let now = now_millis();
1157 self.event_store.append(NetEvent::ExecutionStarted {
1158 net_name: Arc::from(self.program.net().name()),
1159 timestamp: now,
1160 });
1161 }
1162
1163 loop {
1164 while let Ok(completion) = completion_rx.try_recv() {
1166 in_flight_count -= 1;
1167 match completion.result {
1168 Ok(outputs) => {
1169 self.process_outputs(0, &completion.transition_name, outputs);
1170 if E::ENABLED {
1171 self.event_store.append(NetEvent::TransitionCompleted {
1172 transition_name: Arc::clone(&completion.transition_name),
1173 timestamp: now_millis(),
1174 });
1175 }
1176 }
1177 Err(err) => {
1178 if E::ENABLED {
1179 self.event_store.append(NetEvent::TransitionFailed {
1180 transition_name: Arc::clone(&completion.transition_name),
1181 error: err,
1182 timestamp: now_millis(),
1183 });
1184 }
1185 }
1186 }
1187 }
1188
1189 while let Ok(signal) = signal_rx.try_recv() {
1191 match signal {
1192 ExecutorSignal::Event(event) if !draining => {
1193 if let Some(pid) = self.program.place_id(&event.place_name) {
1194 self.ring_add_last(pid, event.token);
1195 self.set_marking_bit(pid);
1196 self.mark_dirty(pid);
1197 }
1198 if E::ENABLED {
1199 self.event_store.append(NetEvent::TokenAdded {
1200 place_name: Arc::clone(&event.place_name),
1201 timestamp: now_millis(),
1202 });
1203 }
1204 }
1205 ExecutorSignal::Event(_) => {
1206 }
1208 ExecutorSignal::Drain => {
1209 draining = true;
1210 }
1211 ExecutorSignal::Close => {
1212 closed = true;
1213 draining = true;
1214 while signal_rx.try_recv().is_ok() {}
1219 }
1220 }
1221 }
1222
1223 self.update_dirty_transitions();
1225
1226 let cycle_now = self.elapsed_ms();
1228 if self.program.any_deadlines {
1229 self.enforce_deadlines(cycle_now);
1230 }
1231
1232 if closed && in_flight_count == 0 {
1234 break; }
1236 if draining
1237 && self.enabled_transition_count == 0
1238 && in_flight_count == 0
1239 {
1240 break; }
1242 if self.enabled_transition_count == 0
1243 && in_flight_count == 0
1244 && (!self.has_environment_places || !signal_channel_open)
1245 {
1246 break; }
1248
1249 let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
1251
1252 if fired || self.has_dirty_bits() {
1253 tokio::task::yield_now().await;
1254 continue;
1255 }
1256
1257 if in_flight_count == 0 && !self.has_environment_places
1259 && self.enabled_transition_count == 0
1260 {
1261 break;
1262 }
1263 if in_flight_count == 0
1264 && self.enabled_transition_count == 0
1265 && (draining || !signal_channel_open)
1266 {
1267 break;
1268 }
1269
1270 let timer_ms = self.millis_until_next_timed_transition();
1271
1272 tokio::select! {
1273 Some(completion) = completion_rx.recv() => {
1274 in_flight_count -= 1;
1275 match completion.result {
1276 Ok(outputs) => {
1277 self.process_outputs(0, &completion.transition_name, outputs);
1278 if E::ENABLED {
1279 self.event_store.append(NetEvent::TransitionCompleted {
1280 transition_name: Arc::clone(&completion.transition_name),
1281 timestamp: now_millis(),
1282 });
1283 }
1284 }
1285 Err(err) => {
1286 if E::ENABLED {
1287 self.event_store.append(NetEvent::TransitionFailed {
1288 transition_name: Arc::clone(&completion.transition_name),
1289 error: err,
1290 timestamp: now_millis(),
1291 });
1292 }
1293 }
1294 }
1295 }
1296 result = signal_rx.recv(), if signal_channel_open && !closed => {
1297 match result {
1298 Some(ExecutorSignal::Event(event)) if !draining => {
1299 if let Some(pid) = self.program.place_id(&event.place_name) {
1300 self.ring_add_last(pid, event.token);
1301 self.set_marking_bit(pid);
1302 self.mark_dirty(pid);
1303 }
1304 if E::ENABLED {
1305 self.event_store.append(NetEvent::TokenAdded {
1306 place_name: Arc::clone(&event.place_name),
1307 timestamp: now_millis(),
1308 });
1309 }
1310 }
1311 Some(ExecutorSignal::Event(_)) => {
1312 }
1314 Some(ExecutorSignal::Drain) => {
1315 draining = true;
1316 }
1317 Some(ExecutorSignal::Close) => {
1318 closed = true;
1319 draining = true;
1320 while signal_rx.try_recv().is_ok() {}
1321 }
1322 None => {
1323 signal_channel_open = false;
1324 }
1325 }
1326 }
1327 _ = tokio::time::sleep(std::time::Duration::from_millis(
1328 if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
1329 )) => {}
1330 }
1331 }
1332
1333 if E::ENABLED {
1334 let now = now_millis();
1335 self.event_store.append(NetEvent::ExecutionCompleted {
1336 net_name: Arc::from(self.program.net().name()),
1337 timestamp: now,
1338 });
1339 }
1340
1341 self.materialize_marking()
1342 }
1343
1344 fn fire_ready_async(
1345 &mut self,
1346 now_ms: f64,
1347 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1348 in_flight_count: &mut usize,
1349 ) -> bool {
1350 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
1351
1352 for s in 0..self.summary_words {
1353 let mut summary = self.enabled_word_summary[s];
1354 while summary != 0 {
1355 let local_w = summary.trailing_zeros() as usize;
1356 summary &= summary - 1;
1357 let w = (s << bitmap::WORD_SHIFT) | local_w;
1358 if w >= self.transition_words {
1359 continue;
1360 }
1361 let mut word = self.enabled_bitmap[w];
1362 while word != 0 {
1363 let bit = word.trailing_zeros() as usize;
1364 let tid = (w << bitmap::WORD_SHIFT) | bit;
1365 word &= word - 1;
1366
1367 let enabled_ms = self.enabled_at_ms[tid];
1368 let elapsed = now_ms - enabled_ms;
1369 if self.program.earliest_ms[tid] <= elapsed {
1370 ready.push((tid, self.program.priorities[tid], enabled_ms));
1371 }
1372 }
1373 }
1374 }
1375
1376 if ready.is_empty() {
1377 return false;
1378 }
1379
1380 ready.sort_by(|a, b| {
1381 b.1.cmp(&a.1)
1382 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
1383 });
1384
1385 let mut fired_any = false;
1386 for (tid, _, _) in ready {
1387 if self.is_enabled(tid) && self.can_enable(tid) {
1388 self.fire_transition_async(tid, completion_tx, in_flight_count);
1389 fired_any = true;
1390 } else if self.is_enabled(tid) {
1391 self.clear_enabled_bit(tid);
1392 self.enabled_transition_count -= 1;
1393 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1394 }
1395 }
1396 fired_any
1397 }
1398
1399 fn fire_transition_async(
1400 &mut self,
1401 tid: usize,
1402 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1403 in_flight_count: &mut usize,
1404 ) {
1405 let t = self.program.transition(tid);
1406 let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
1407 let input_specs: Vec<In> = t.input_specs().to_vec();
1408 let read_arcs: Vec<_> = t.reads().to_vec();
1409 let reset_arcs: Vec<_> = t.resets().to_vec();
1410 let output_place_names = self.program.output_place_name_sets[tid].clone();
1411 let action = Arc::clone(t.action());
1412 let is_sync = action.is_sync();
1413
1414 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1416 for in_spec in &input_specs {
1417 let pid = self.program.place_id(in_spec.place_name()).unwrap();
1418 let to_consume = match in_spec {
1419 In::One { .. } => 1,
1420 In::Exactly { count, .. } => *count,
1421 In::All { guard, .. } | In::AtLeast { guard, .. } => {
1422 if guard.is_some() {
1423 self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
1424 } else {
1425 self.token_counts[pid]
1426 }
1427 }
1428 };
1429
1430 let place_name_arc = Arc::clone(in_spec.place().name_arc());
1431 for _ in 0..to_consume {
1432 let token = if let Some(guard) = in_spec.guard() {
1433 self.ring_remove_matching(pid, &**guard)
1434 } else {
1435 Some(self.ring_remove_first(pid))
1436 };
1437 if let Some(token) = token {
1438 if E::ENABLED {
1439 self.event_store.append(NetEvent::TokenRemoved {
1440 place_name: Arc::clone(&place_name_arc),
1441 timestamp: now_millis(),
1442 });
1443 }
1444 inputs
1445 .entry(Arc::clone(&place_name_arc))
1446 .or_default()
1447 .push(token);
1448 }
1449 }
1450 }
1451
1452 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1454 for arc in &read_arcs {
1455 let rpid = self.program.place_id(arc.place.name()).unwrap();
1456 if let Some(token) = self.ring_peek_first(rpid) {
1457 read_tokens
1458 .entry(Arc::clone(arc.place.name_arc()))
1459 .or_default()
1460 .push(token.clone());
1461 }
1462 }
1463
1464 for arc in &reset_arcs {
1466 let pid = self.program.place_id(arc.place.name()).unwrap();
1467 let removed = self.ring_remove_all(pid);
1468 if E::ENABLED {
1469 for _ in &removed {
1470 self.event_store.append(NetEvent::TokenRemoved {
1471 place_name: Arc::clone(arc.place.name_arc()),
1472 timestamp: now_millis(),
1473 });
1474 }
1475 }
1476 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
1477 1u64 << (pid & bitmap::WORD_MASK);
1478 self.has_pending_resets = true;
1479 }
1480
1481 self.update_bitmap_after_consumption(tid);
1482
1483 if E::ENABLED {
1484 self.event_store.append(NetEvent::TransitionStarted {
1485 transition_name: Arc::clone(&transition_name),
1486 timestamp: now_millis(),
1487 });
1488 }
1489
1490 self.clear_enabled_bit(tid);
1492 self.enabled_transition_count -= 1;
1493 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1494 self.mark_transition_dirty(tid);
1495
1496 if is_sync {
1497 let mut ctx = TransitionContext::new(
1498 Arc::clone(&transition_name),
1499 inputs,
1500 read_tokens,
1501 output_place_names,
1502 None,
1503 );
1504 let result = action.run_sync(&mut ctx);
1505 match result {
1506 Ok(()) => {
1507 let outputs = ctx.take_outputs();
1508 self.process_outputs(tid, &transition_name, outputs);
1509 if E::ENABLED {
1510 self.event_store.append(NetEvent::TransitionCompleted {
1511 transition_name: Arc::clone(&transition_name),
1512 timestamp: now_millis(),
1513 });
1514 }
1515 }
1516 Err(err) => {
1517 if E::ENABLED {
1518 self.event_store.append(NetEvent::TransitionFailed {
1519 transition_name: Arc::clone(&transition_name),
1520 error: err.message,
1521 timestamp: now_millis(),
1522 });
1523 }
1524 }
1525 }
1526 } else {
1527 *in_flight_count += 1;
1528 let tx = completion_tx.clone();
1529 let name = Arc::clone(&transition_name);
1530 let ctx = TransitionContext::new(
1531 Arc::clone(&transition_name),
1532 inputs,
1533 read_tokens,
1534 output_place_names,
1535 None,
1536 );
1537 tokio::spawn(async move {
1538 let result = action.run_async(ctx).await;
1539 let completion = match result {
1540 Ok(mut completed_ctx) => ActionCompletion {
1541 transition_name: Arc::clone(&name),
1542 result: Ok(completed_ctx.take_outputs()),
1543 },
1544 Err(err) => ActionCompletion {
1545 transition_name: Arc::clone(&name),
1546 result: Err(err.message),
1547 },
1548 };
1549 let _ = tx.send(completion);
1550 });
1551 }
1552 }
1553
1554 fn millis_until_next_timed_transition(&self) -> f64 {
1555 let mut min_wait = f64::INFINITY;
1556 let now_ms = self.elapsed_ms();
1557
1558 for s in 0..self.summary_words {
1559 let mut summary = self.enabled_word_summary[s];
1560 while summary != 0 {
1561 let local_w = summary.trailing_zeros() as usize;
1562 summary &= summary - 1;
1563 let w = (s << bitmap::WORD_SHIFT) | local_w;
1564 if w >= self.transition_words {
1565 continue;
1566 }
1567 let mut word = self.enabled_bitmap[w];
1568 while word != 0 {
1569 let bit = word.trailing_zeros() as usize;
1570 let tid = (w << bitmap::WORD_SHIFT) | bit;
1571 word &= word - 1;
1572
1573 let elapsed = now_ms - self.enabled_at_ms[tid];
1574 let remaining_earliest = self.program.earliest_ms[tid] - elapsed;
1575 if remaining_earliest <= 0.0 {
1576 return 0.0;
1577 }
1578 min_wait = min_wait.min(remaining_earliest);
1579
1580 if self.program.has_deadline[tid] {
1581 let remaining_deadline = self.program.latest_ms[tid] - elapsed;
1582 if remaining_deadline <= 0.0 {
1583 return 0.0;
1584 }
1585 min_wait = min_wait.min(remaining_deadline);
1586 }
1587 }
1588 }
1589 }
1590
1591 min_wait
1592 }
1593}
1594
1595fn grow_ring_static(
1598 token_pool: &mut Vec<Option<ErasedToken>>,
1599 place_offset: &mut [usize],
1600 ring_head: &mut [usize],
1601 ring_tail: &mut [usize],
1602 ring_capacity: &mut [usize],
1603 token_counts: &[usize],
1604 pid: usize,
1605) {
1606 let old_cap = ring_capacity[pid];
1607 let new_cap = old_cap * 2;
1608 let old_offset = place_offset[pid];
1609 let head = ring_head[pid];
1610 let count = token_counts[pid];
1611
1612 let new_offset = token_pool.len();
1614 token_pool.resize_with(new_offset + new_cap, || None);
1615
1616 for i in 0..count {
1618 let old_idx = old_offset + (head + i) % old_cap;
1619 token_pool[new_offset + i] = token_pool[old_idx].take();
1620 }
1621
1622 place_offset[pid] = new_offset;
1623 ring_head[pid] = 0;
1624 ring_tail[pid] = count;
1625 ring_capacity[pid] = new_cap;
1626}
1627
1628fn now_millis() -> u64 {
1629 std::time::SystemTime::now()
1630 .duration_since(std::time::UNIX_EPOCH)
1631 .unwrap_or_default()
1632 .as_millis() as u64
1633}
1634
1635#[cfg(test)]
1636mod tests {
1637 use super::*;
1638 use crate::compiled_net::CompiledNet;
1639 use libpetri_core::action::{fork, passthrough, sync_action};
1640 use libpetri_core::input::one;
1641 use libpetri_core::output::out_place;
1642 use libpetri_core::petri_net::PetriNet;
1643 use libpetri_core::place::Place;
1644 use libpetri_core::token::Token;
1645 use libpetri_core::transition::Transition;
1646 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1647
1648 fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1649 let p1 = Place::<i32>::new("p1");
1650 let p2 = Place::<i32>::new("p2");
1651 let p3 = Place::<i32>::new("p3");
1652
1653 let t1 = Transition::builder("t1")
1654 .input(one(&p1))
1655 .output(out_place(&p2))
1656 .action(passthrough())
1657 .build();
1658 let t2 = Transition::builder("t2")
1659 .input(one(&p2))
1660 .output(out_place(&p3))
1661 .action(passthrough())
1662 .build();
1663
1664 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1665 (net, p1, p2, p3)
1666 }
1667
1668 #[test]
1669 fn sync_passthrough_chain() {
1670 let (net, p1, _p2, _p3) = simple_chain();
1671 let compiled = CompiledNet::compile(&net);
1672 let prog = PrecompiledNet::from_compiled(&compiled);
1673
1674 let mut marking = Marking::new();
1675 marking.add(&p1, Token::at(42, 0));
1676
1677 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1678 let result = executor.run_to_completion();
1679
1680 assert_eq!(result.count("p1"), 0);
1681 }
1682
1683 #[test]
1684 fn sync_fork_chain() {
1685 let p1 = Place::<i32>::new("p1");
1686 let p2 = Place::<i32>::new("p2");
1687 let p3 = Place::<i32>::new("p3");
1688
1689 let t1 = Transition::builder("t1")
1690 .input(one(&p1))
1691 .output(libpetri_core::output::and(vec![
1692 out_place(&p2),
1693 out_place(&p3),
1694 ]))
1695 .action(fork())
1696 .build();
1697
1698 let net = PetriNet::builder("fork").transition(t1).build();
1699 let compiled = CompiledNet::compile(&net);
1700 let prog = PrecompiledNet::from_compiled(&compiled);
1701
1702 let mut marking = Marking::new();
1703 marking.add(&p1, Token::at(42, 0));
1704
1705 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1706 let result = executor.run_to_completion();
1707
1708 assert_eq!(result.count("p1"), 0);
1709 assert_eq!(result.count("p2"), 1);
1710 assert_eq!(result.count("p3"), 1);
1711 }
1712
1713 #[test]
1714 fn sync_linear_chain_5() {
1715 let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1716 let transitions: Vec<Transition> = (0..5)
1717 .map(|i| {
1718 Transition::builder(format!("t{i}"))
1719 .input(one(&places[i]))
1720 .output(out_place(&places[i + 1]))
1721 .action(fork())
1722 .build()
1723 })
1724 .collect();
1725
1726 let net = PetriNet::builder("chain5").transitions(transitions).build();
1727 let compiled = CompiledNet::compile(&net);
1728 let prog = PrecompiledNet::from_compiled(&compiled);
1729
1730 let mut marking = Marking::new();
1731 marking.add(&places[0], Token::at(1, 0));
1732
1733 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1734 let result = executor.run_to_completion();
1735
1736 assert_eq!(result.count("p0"), 0);
1737 assert_eq!(result.count("p5"), 1);
1738 }
1739
1740 #[test]
1741 fn sync_no_initial_tokens() {
1742 let (net, _, _, _) = simple_chain();
1743 let compiled = CompiledNet::compile(&net);
1744 let prog = PrecompiledNet::from_compiled(&compiled);
1745 let marking = Marking::new();
1746 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1747 let result = executor.run_to_completion();
1748 assert_eq!(result.count("p1"), 0);
1749 assert_eq!(result.count("p2"), 0);
1750 assert_eq!(result.count("p3"), 0);
1751 }
1752
1753 #[test]
1754 fn sync_priority_ordering() {
1755 let p = Place::<()>::new("p");
1756 let out_a = Place::<()>::new("a");
1757 let out_b = Place::<()>::new("b");
1758
1759 let t_high = Transition::builder("t_high")
1760 .input(one(&p))
1761 .output(out_place(&out_a))
1762 .action(passthrough())
1763 .priority(10)
1764 .build();
1765 let t_low = Transition::builder("t_low")
1766 .input(one(&p))
1767 .output(out_place(&out_b))
1768 .action(passthrough())
1769 .priority(1)
1770 .build();
1771
1772 let net = PetriNet::builder("priority")
1773 .transitions([t_high, t_low])
1774 .build();
1775 let compiled = CompiledNet::compile(&net);
1776 let prog = PrecompiledNet::from_compiled(&compiled);
1777
1778 let mut marking = Marking::new();
1779 marking.add(&p, Token::at((), 0));
1780
1781 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1782 let result = executor.run_to_completion();
1783
1784 assert_eq!(result.count("p"), 0);
1785 }
1786
1787 #[test]
1788 fn sync_inhibitor_blocks() {
1789 let p1 = Place::<()>::new("p1");
1790 let p2 = Place::<()>::new("p2");
1791 let p_inh = Place::<()>::new("inh");
1792
1793 let t = Transition::builder("t1")
1794 .input(one(&p1))
1795 .output(out_place(&p2))
1796 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1797 .action(passthrough())
1798 .build();
1799
1800 let net = PetriNet::builder("inhibitor").transition(t).build();
1801 let compiled = CompiledNet::compile(&net);
1802 let prog = PrecompiledNet::from_compiled(&compiled);
1803
1804 let mut marking = Marking::new();
1805 marking.add(&p1, Token::at((), 0));
1806 marking.add(&p_inh, Token::at((), 0));
1807
1808 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1809 let result = executor.run_to_completion();
1810
1811 assert_eq!(result.count("p1"), 1);
1812 }
1813
1814 #[test]
1815 fn read_arc_does_not_consume() {
1816 let p_in = Place::<i32>::new("in");
1817 let p_ctx = Place::<i32>::new("ctx");
1818 let p_out = Place::<i32>::new("out");
1819
1820 let t = Transition::builder("t1")
1821 .input(one(&p_in))
1822 .read(libpetri_core::arc::read(&p_ctx))
1823 .output(out_place(&p_out))
1824 .action(sync_action(|ctx| {
1825 let v = ctx.input::<i32>("in")?;
1826 let r = ctx.read::<i32>("ctx")?;
1827 ctx.output("out", *v + *r)?;
1828 Ok(())
1829 }))
1830 .build();
1831 let net = PetriNet::builder("test").transition(t).build();
1832 let compiled = CompiledNet::compile(&net);
1833 let prog = PrecompiledNet::from_compiled(&compiled);
1834
1835 let mut marking = Marking::new();
1836 marking.add(&p_in, Token::at(10, 0));
1837 marking.add(&p_ctx, Token::at(5, 0));
1838
1839 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1840 let result = executor.run_to_completion();
1841
1842 assert_eq!(result.count("in"), 0);
1843 assert_eq!(result.count("ctx"), 1);
1844 assert_eq!(result.count("out"), 1);
1845 }
1846
1847 #[test]
1848 fn reset_arc_removes_all_tokens() {
1849 let p_in = Place::<()>::new("in");
1850 let p_reset = Place::<i32>::new("reset");
1851 let p_out = Place::<()>::new("out");
1852
1853 let t = Transition::builder("t1")
1854 .input(one(&p_in))
1855 .reset(libpetri_core::arc::reset(&p_reset))
1856 .output(out_place(&p_out))
1857 .action(fork())
1858 .build();
1859 let net = PetriNet::builder("test").transition(t).build();
1860 let compiled = CompiledNet::compile(&net);
1861 let prog = PrecompiledNet::from_compiled(&compiled);
1862
1863 let mut marking = Marking::new();
1864 marking.add(&p_in, Token::at((), 0));
1865 marking.add(&p_reset, Token::at(1, 0));
1866 marking.add(&p_reset, Token::at(2, 0));
1867 marking.add(&p_reset, Token::at(3, 0));
1868
1869 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1870 let result = executor.run_to_completion();
1871
1872 assert_eq!(result.count("reset"), 0);
1873 assert_eq!(result.count("out"), 1);
1874 }
1875
1876 #[test]
1877 fn exactly_cardinality_consumes_n() {
1878 let p = Place::<i32>::new("p");
1879 let p_out = Place::<i32>::new("out");
1880
1881 let t = Transition::builder("t1")
1882 .input(libpetri_core::input::exactly(3, &p))
1883 .output(out_place(&p_out))
1884 .action(sync_action(|ctx| {
1885 let vals = ctx.inputs::<i32>("p")?;
1886 for v in vals {
1887 ctx.output("out", *v)?;
1888 }
1889 Ok(())
1890 }))
1891 .build();
1892 let net = PetriNet::builder("test").transition(t).build();
1893 let compiled = CompiledNet::compile(&net);
1894 let prog = PrecompiledNet::from_compiled(&compiled);
1895
1896 let mut marking = Marking::new();
1897 for i in 0..5 {
1898 marking.add(&p, Token::at(i, 0));
1899 }
1900
1901 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1902 let result = executor.run_to_completion();
1903
1904 assert_eq!(result.count("p"), 2);
1905 assert_eq!(result.count("out"), 3);
1906 }
1907
1908 #[test]
1909 fn all_cardinality_consumes_everything() {
1910 let p = Place::<i32>::new("p");
1911 let p_out = Place::<()>::new("out");
1912
1913 let t = Transition::builder("t1")
1914 .input(libpetri_core::input::all(&p))
1915 .output(out_place(&p_out))
1916 .action(sync_action(|ctx| {
1917 let vals = ctx.inputs::<i32>("p")?;
1918 ctx.output("out", vals.len() as i32)?;
1919 Ok(())
1920 }))
1921 .build();
1922 let net = PetriNet::builder("test").transition(t).build();
1923 let compiled = CompiledNet::compile(&net);
1924 let prog = PrecompiledNet::from_compiled(&compiled);
1925
1926 let mut marking = Marking::new();
1927 for i in 0..5 {
1928 marking.add(&p, Token::at(i, 0));
1929 }
1930
1931 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1932 let result = executor.run_to_completion();
1933
1934 assert_eq!(result.count("p"), 0);
1935 }
1936
1937 #[test]
1938 fn at_least_blocks_insufficient() {
1939 let p = Place::<i32>::new("p");
1940 let p_out = Place::<()>::new("out");
1941
1942 let t = Transition::builder("t1")
1943 .input(libpetri_core::input::at_least(3, &p))
1944 .output(out_place(&p_out))
1945 .action(passthrough())
1946 .build();
1947 let net = PetriNet::builder("test").transition(t).build();
1948 let compiled = CompiledNet::compile(&net);
1949 let prog = PrecompiledNet::from_compiled(&compiled);
1950
1951 let mut marking = Marking::new();
1952 marking.add(&p, Token::at(1, 0));
1953 marking.add(&p, Token::at(2, 0));
1954
1955 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1956 let result = executor.run_to_completion();
1957
1958 assert_eq!(result.count("p"), 2);
1959 }
1960
1961 #[test]
1962 fn at_least_fires_with_enough() {
1963 let p = Place::<i32>::new("p");
1964 let p_out = Place::<()>::new("out");
1965
1966 let t = Transition::builder("t1")
1967 .input(libpetri_core::input::at_least(3, &p))
1968 .output(out_place(&p_out))
1969 .action(passthrough())
1970 .build();
1971 let net = PetriNet::builder("test").transition(t).build();
1972 let compiled = CompiledNet::compile(&net);
1973 let prog = PrecompiledNet::from_compiled(&compiled);
1974
1975 let mut marking = Marking::new();
1976 for i in 0..5 {
1977 marking.add(&p, Token::at(i, 0));
1978 }
1979
1980 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1981 let result = executor.run_to_completion();
1982
1983 assert_eq!(result.count("p"), 0);
1984 }
1985
1986 #[test]
1987 fn guarded_input_only_consumes_matching() {
1988 let p = Place::<i32>::new("p");
1989 let p_out = Place::<i32>::new("out");
1990
1991 let t = Transition::builder("t1")
1992 .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1993 .output(out_place(&p_out))
1994 .action(fork())
1995 .build();
1996 let net = PetriNet::builder("test").transition(t).build();
1997 let compiled = CompiledNet::compile(&net);
1998 let prog = PrecompiledNet::from_compiled(&compiled);
1999
2000 let mut marking = Marking::new();
2001 marking.add(&p, Token::at(3, 0));
2002 marking.add(&p, Token::at(10, 0));
2003
2004 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2005 let result = executor.run_to_completion();
2006
2007 assert_eq!(result.count("p"), 1);
2008 assert_eq!(result.count("out"), 1);
2009 }
2010
2011 #[test]
2012 fn guarded_input_blocks_when_no_match() {
2013 let p = Place::<i32>::new("p");
2014 let p_out = Place::<i32>::new("out");
2015
2016 let t = Transition::builder("t1")
2017 .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
2018 .output(out_place(&p_out))
2019 .action(fork())
2020 .build();
2021 let net = PetriNet::builder("test").transition(t).build();
2022 let compiled = CompiledNet::compile(&net);
2023 let prog = PrecompiledNet::from_compiled(&compiled);
2024
2025 let mut marking = Marking::new();
2026 marking.add(&p, Token::at(3, 0));
2027 marking.add(&p, Token::at(10, 0));
2028
2029 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2030 let result = executor.run_to_completion();
2031
2032 assert_eq!(result.count("p"), 2);
2033 assert_eq!(result.count("out"), 0);
2034 }
2035
2036 #[test]
2037 fn event_store_records_lifecycle() {
2038 let p1 = Place::<i32>::new("p1");
2039 let p2 = Place::<i32>::new("p2");
2040 let t = Transition::builder("t1")
2041 .input(one(&p1))
2042 .output(out_place(&p2))
2043 .action(fork())
2044 .build();
2045 let net = PetriNet::builder("test").transition(t).build();
2046 let compiled = CompiledNet::compile(&net);
2047 let prog = PrecompiledNet::from_compiled(&compiled);
2048
2049 let mut marking = Marking::new();
2050 marking.add(&p1, Token::at(1, 0));
2051
2052 let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2053 let _result = executor.run_to_completion();
2054
2055 let events = executor.event_store().events();
2056 assert!(
2057 events
2058 .iter()
2059 .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2060 );
2061 assert!(
2062 events
2063 .iter()
2064 .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
2065 );
2066 assert!(
2067 events
2068 .iter()
2069 .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2070 );
2071 assert!(
2072 events
2073 .iter()
2074 .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2075 );
2076 assert!(
2077 events
2078 .iter()
2079 .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
2080 );
2081 assert!(
2082 events
2083 .iter()
2084 .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
2085 );
2086 assert!(
2087 events
2088 .iter()
2089 .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2090 );
2091 }
2092
2093 #[test]
2094 fn action_error_does_not_crash() {
2095 let p_in = Place::<i32>::new("in");
2096 let p_out = Place::<i32>::new("out");
2097
2098 let t = Transition::builder("t1")
2099 .input(one(&p_in))
2100 .output(out_place(&p_out))
2101 .action(sync_action(|_ctx| {
2102 Err(libpetri_core::action::ActionError::new(
2103 "intentional failure",
2104 ))
2105 }))
2106 .build();
2107 let net = PetriNet::builder("test").transition(t).build();
2108 let compiled = CompiledNet::compile(&net);
2109 let prog = PrecompiledNet::from_compiled(&compiled);
2110
2111 let mut marking = Marking::new();
2112 marking.add(&p_in, Token::at(42, 0));
2113
2114 let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2115 let result = executor.run_to_completion();
2116
2117 assert_eq!(result.count("in"), 0);
2118 assert_eq!(result.count("out"), 0);
2119
2120 let events = executor.event_store().events();
2121 assert!(
2122 events
2123 .iter()
2124 .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2125 );
2126 }
2127
2128 #[test]
2129 fn multiple_input_arcs_require_all() {
2130 let p1 = Place::<i32>::new("p1");
2131 let p2 = Place::<i32>::new("p2");
2132 let p3 = Place::<i32>::new("p3");
2133
2134 let t = Transition::builder("t1")
2135 .input(one(&p1))
2136 .input(one(&p2))
2137 .output(out_place(&p3))
2138 .action(sync_action(|ctx| {
2139 ctx.output("p3", 99i32)?;
2140 Ok(())
2141 }))
2142 .build();
2143 let net = PetriNet::builder("test").transition(t).build();
2144 let compiled = CompiledNet::compile(&net);
2145 let prog = PrecompiledNet::from_compiled(&compiled);
2146
2147 let mut marking = Marking::new();
2149 marking.add(&p1, Token::at(1, 0));
2150 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2151 let result = executor.run_to_completion();
2152 assert_eq!(result.count("p1"), 1);
2153 assert_eq!(result.count("p3"), 0);
2154
2155 let compiled2 = CompiledNet::compile(&net);
2157 let prog2 = PrecompiledNet::from_compiled(&compiled2);
2158 let mut marking2 = Marking::new();
2159 marking2.add(&p1, Token::at(1, 0));
2160 marking2.add(&p2, Token::at(2, 0));
2161 let mut executor2 = PrecompiledNetExecutor::<NoopEventStore>::new(&prog2, marking2);
2162 let result2 = executor2.run_to_completion();
2163 assert_eq!(result2.count("p1"), 0);
2164 assert_eq!(result2.count("p2"), 0);
2165 assert_eq!(result2.count("p3"), 1);
2166 }
2167
2168 #[test]
2169 fn sync_action_custom_logic() {
2170 let p_in = Place::<i32>::new("in");
2171 let p_out = Place::<String>::new("out");
2172
2173 let t = Transition::builder("t1")
2174 .input(one(&p_in))
2175 .output(out_place(&p_out))
2176 .action(sync_action(|ctx| {
2177 let v = ctx.input::<i32>("in")?;
2178 ctx.output("out", format!("value={v}"))?;
2179 Ok(())
2180 }))
2181 .build();
2182 let net = PetriNet::builder("test").transition(t).build();
2183 let compiled = CompiledNet::compile(&net);
2184 let prog = PrecompiledNet::from_compiled(&compiled);
2185
2186 let mut marking = Marking::new();
2187 marking.add(&p_in, Token::at(42, 0));
2188
2189 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2190 let result = executor.run_to_completion();
2191
2192 assert_eq!(result.count("out"), 1);
2193 }
2194
2195 #[test]
2196 fn transform_action_outputs_to_all_places() {
2197 let p_in = Place::<i32>::new("in");
2198 let p_a = Place::<i32>::new("a");
2199 let p_b = Place::<i32>::new("b");
2200
2201 let t = Transition::builder("t1")
2202 .input(one(&p_in))
2203 .output(libpetri_core::output::and(vec![
2204 out_place(&p_a),
2205 out_place(&p_b),
2206 ]))
2207 .action(libpetri_core::action::transform(|ctx| {
2208 let v = ctx.input::<i32>("in").unwrap();
2209 Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
2210 }))
2211 .build();
2212 let net = PetriNet::builder("test").transition(t).build();
2213 let compiled = CompiledNet::compile(&net);
2214 let prog = PrecompiledNet::from_compiled(&compiled);
2215
2216 let mut marking = Marking::new();
2217 marking.add(&p_in, Token::at(5, 0));
2218
2219 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2220 let result = executor.run_to_completion();
2221
2222 assert_eq!(result.count("a"), 1);
2223 assert_eq!(result.count("b"), 1);
2224 }
2225
2226 #[test]
2227 fn complex_workflow() {
2228 use libpetri_core::output::{and, xor};
2229
2230 let input = Place::<i32>::new("v_input");
2231 let guard_in = Place::<i32>::new("v_guardIn");
2232 let intent_in = Place::<i32>::new("v_intentIn");
2233 let search_in = Place::<i32>::new("v_searchIn");
2234 let output_guard_in = Place::<i32>::new("v_outputGuardIn");
2235 let guard_safe = Place::<i32>::new("v_guardSafe");
2236 let guard_violation = Place::<i32>::new("v_guardViolation");
2237 let _violated = Place::<i32>::new("v_violated");
2238 let intent_ready = Place::<i32>::new("v_intentReady");
2239 let topic_ready = Place::<i32>::new("v_topicReady");
2240 let search_ready = Place::<i32>::new("v_searchReady");
2241 let _output_guard_done = Place::<i32>::new("v_outputGuardDone");
2242 let response = Place::<i32>::new("v_response");
2243
2244 let fork_trans = Transition::builder("Fork")
2245 .input(one(&input))
2246 .output(and(vec![
2247 out_place(&guard_in),
2248 out_place(&intent_in),
2249 out_place(&search_in),
2250 out_place(&output_guard_in),
2251 ]))
2252 .action(fork())
2253 .build();
2254
2255 let guard_trans = Transition::builder("Guard")
2256 .input(one(&guard_in))
2257 .output(xor(vec![
2258 out_place(&guard_safe),
2259 out_place(&guard_violation),
2260 ]))
2261 .action(fork())
2262 .build();
2263
2264 let handle_violation = Transition::builder("HandleViolation")
2265 .input(one(&guard_violation))
2266 .output(out_place(&_violated))
2267 .inhibitor(libpetri_core::arc::inhibitor(&guard_safe))
2268 .action(fork())
2269 .build();
2270
2271 let intent_trans = Transition::builder("Intent")
2272 .input(one(&intent_in))
2273 .output(out_place(&intent_ready))
2274 .action(fork())
2275 .build();
2276
2277 let topic_trans = Transition::builder("TopicKnowledge")
2278 .input(one(&intent_ready))
2279 .output(out_place(&topic_ready))
2280 .action(fork())
2281 .build();
2282
2283 let search_trans = Transition::builder("Search")
2284 .input(one(&search_in))
2285 .output(out_place(&search_ready))
2286 .read(libpetri_core::arc::read(&intent_ready))
2287 .inhibitor(libpetri_core::arc::inhibitor(&guard_violation))
2288 .priority(-5)
2289 .action(fork())
2290 .build();
2291
2292 let output_guard_trans = Transition::builder("OutputGuard")
2293 .input(one(&output_guard_in))
2294 .output(out_place(&_output_guard_done))
2295 .read(libpetri_core::arc::read(&guard_safe))
2296 .action(fork())
2297 .build();
2298
2299 let compose_trans = Transition::builder("Compose")
2300 .input(one(&guard_safe))
2301 .input(one(&search_ready))
2302 .input(one(&topic_ready))
2303 .output(out_place(&response))
2304 .priority(10)
2305 .action(fork())
2306 .build();
2307
2308 let net = PetriNet::builder("ComplexWorkflow")
2309 .transition(fork_trans)
2310 .transition(guard_trans)
2311 .transition(handle_violation)
2312 .transition(intent_trans)
2313 .transition(topic_trans)
2314 .transition(search_trans)
2315 .transition(output_guard_trans)
2316 .transition(compose_trans)
2317 .build();
2318
2319 let compiled = CompiledNet::compile(&net);
2320 let prog = PrecompiledNet::from_compiled(&compiled);
2321
2322 let mut marking = Marking::new();
2323 marking.add(&input, Token::at(1, 0));
2324
2325 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2326 let result = executor.run_to_completion();
2327
2328 assert_eq!(result.count("v_input"), 0); }
2334
2335 #[cfg(feature = "tokio")]
2336 mod async_tests {
2337 use super::*;
2338 use crate::environment::ExternalEvent;
2339 use libpetri_core::action::async_action;
2340 use libpetri_core::petri_net::PetriNet;
2341 use libpetri_core::token::ErasedToken;
2342
2343 #[tokio::test]
2344 async fn async_linear_chain() {
2345 let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
2346 let transitions: Vec<Transition> = (0..5)
2347 .map(|i| {
2348 Transition::builder(format!("t{i}"))
2349 .input(one(&places[i]))
2350 .output(out_place(&places[i + 1]))
2351 .action(fork())
2352 .build()
2353 })
2354 .collect();
2355
2356 let net = PetriNet::builder("chain5").transitions(transitions).build();
2357 let compiled = CompiledNet::compile(&net);
2358 let prog = PrecompiledNet::from_compiled(&compiled);
2359
2360 let mut marking = Marking::new();
2361 marking.add(&places[0], Token::at(1, 0));
2362
2363 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2364 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2365 let result = executor.run_async(rx).await;
2366
2367 assert_eq!(result.count("p0"), 0);
2368 assert_eq!(result.count("p5"), 1);
2369 }
2370
2371 #[tokio::test]
2372 async fn async_action_execution() {
2373 let p1 = Place::<i32>::new("p1");
2374 let p2 = Place::<i32>::new("p2");
2375
2376 let t = Transition::builder("t1")
2377 .input(one(&p1))
2378 .output(out_place(&p2))
2379 .action(async_action(|ctx| async { Ok(ctx) }))
2380 .build();
2381
2382 let net = PetriNet::builder("async_test").transition(t).build();
2383 let compiled = CompiledNet::compile(&net);
2384 let prog = PrecompiledNet::from_compiled(&compiled);
2385
2386 let mut marking = Marking::new();
2387 marking.add(&p1, Token::at(42, 0));
2388
2389 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2390 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2391 let result = executor.run_async(rx).await;
2392
2393 assert_eq!(result.count("p1"), 0);
2394 }
2395
2396 #[tokio::test]
2399 async fn async_drain_terminates_at_quiescence() {
2400 let p1 = Place::<i32>::new("p1");
2401 let p2 = Place::<i32>::new("p2");
2402
2403 let t1 = Transition::builder("t1")
2404 .input(one(&p1))
2405 .output(out_place(&p2))
2406 .action(fork())
2407 .build();
2408
2409 let net = PetriNet::builder("test").transition(t1).build();
2410 let compiled = CompiledNet::compile(&net);
2411 let prog = PrecompiledNet::from_compiled(&compiled);
2412
2413 let marking = Marking::new();
2414 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2415 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2416 .build();
2417
2418 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2419
2420 tokio::spawn(async move {
2421 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2422 tx.send(ExecutorSignal::Event(ExternalEvent {
2423 place_name: Arc::from("p1"),
2424 token: ErasedToken::from_typed(&Token::at(42, 0)),
2425 }))
2426 .unwrap();
2427 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2428 tx.send(ExecutorSignal::Drain).unwrap();
2429 });
2430
2431 let result = executor.run_async(rx).await;
2432 assert_eq!(result.count("p2"), 1);
2433 }
2434
2435 #[tokio::test]
2436 async fn async_drain_rejects_post_drain_events() {
2437 let p1 = Place::<i32>::new("p1");
2438 let p2 = Place::<i32>::new("p2");
2439
2440 let t1 = Transition::builder("t1")
2441 .input(one(&p1))
2442 .output(out_place(&p2))
2443 .action(fork())
2444 .build();
2445
2446 let net = PetriNet::builder("test").transition(t1).build();
2447 let compiled = CompiledNet::compile(&net);
2448 let prog = PrecompiledNet::from_compiled(&compiled);
2449
2450 let marking = Marking::new();
2451 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2452 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2453 .build();
2454
2455 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2456
2457 tokio::spawn(async move {
2458 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2459 tx.send(ExecutorSignal::Drain).unwrap();
2460 tx.send(ExecutorSignal::Event(ExternalEvent {
2461 place_name: Arc::from("p1"),
2462 token: ErasedToken::from_typed(&Token::at(99, 0)),
2463 }))
2464 .unwrap();
2465 });
2466
2467 let result = executor.run_async(rx).await;
2468 assert_eq!(result.count("p2"), 0);
2469 }
2470
2471 #[tokio::test]
2472 async fn async_close_discards_queued_events() {
2473 let p1 = Place::<i32>::new("p1");
2474 let p2 = Place::<i32>::new("p2");
2475
2476 let t1 = Transition::builder("t1")
2477 .input(one(&p1))
2478 .output(out_place(&p2))
2479 .action(fork())
2480 .build();
2481
2482 let net = PetriNet::builder("test").transition(t1).build();
2483 let compiled = CompiledNet::compile(&net);
2484 let prog = PrecompiledNet::from_compiled(&compiled);
2485
2486 let marking = Marking::new();
2487 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2488 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2489 .build();
2490
2491 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2492
2493 tx.send(ExecutorSignal::Event(ExternalEvent {
2494 place_name: Arc::from("p1"),
2495 token: ErasedToken::from_typed(&Token::at(1, 0)),
2496 }))
2497 .unwrap();
2498 tx.send(ExecutorSignal::Close).unwrap();
2499 tx.send(ExecutorSignal::Event(ExternalEvent {
2500 place_name: Arc::from("p1"),
2501 token: ErasedToken::from_typed(&Token::at(2, 0)),
2502 }))
2503 .unwrap();
2504 drop(tx);
2505
2506 let result = executor.run_async(rx).await;
2507 assert!(result.count("p2") <= 1);
2508 }
2509
2510 #[tokio::test]
2511 async fn async_close_after_drain_escalates() {
2512 let p1 = Place::<i32>::new("p1");
2513 let p2 = Place::<i32>::new("p2");
2514
2515 let t1 = Transition::builder("t1")
2516 .input(one(&p1))
2517 .output(out_place(&p2))
2518 .action(fork())
2519 .build();
2520
2521 let net = PetriNet::builder("test").transition(t1).build();
2522 let compiled = CompiledNet::compile(&net);
2523 let prog = PrecompiledNet::from_compiled(&compiled);
2524
2525 let marking = Marking::new();
2526 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2527 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2528 .build();
2529
2530 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2531
2532 tokio::spawn(async move {
2533 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2534 tx.send(ExecutorSignal::Drain).unwrap();
2535 tx.send(ExecutorSignal::Close).unwrap();
2536 });
2537
2538 let _result = executor.run_async(rx).await;
2539 }
2541
2542 #[tokio::test]
2543 async fn async_handle_raii_drain_on_drop() {
2544 use crate::executor_handle::ExecutorHandle;
2545
2546 let p1 = Place::<i32>::new("p1");
2547 let p2 = Place::<i32>::new("p2");
2548
2549 let t1 = Transition::builder("t1")
2550 .input(one(&p1))
2551 .output(out_place(&p2))
2552 .action(fork())
2553 .build();
2554
2555 let net = PetriNet::builder("test").transition(t1).build();
2556 let compiled = CompiledNet::compile(&net);
2557 let prog = PrecompiledNet::from_compiled(&compiled);
2558
2559 let marking = Marking::new();
2560 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2561 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2562 .build();
2563
2564 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2565
2566 tokio::spawn(async move {
2567 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2568 let mut handle = ExecutorHandle::new(tx);
2569 handle.inject(
2570 Arc::from("p1"),
2571 ErasedToken::from_typed(&Token::at(7, 0)),
2572 );
2573 });
2575
2576 let result = executor.run_async(rx).await;
2577 assert_eq!(result.count("p2"), 1);
2578 }
2579 }
2580}