1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::token::ErasedToken;
8
9use libpetri_event::event_store::EventStore;
10use libpetri_event::net_event::NetEvent;
11
12use crate::bitmap;
13use crate::marking::Marking;
14use crate::precompiled_net::{
15 CONSUME_ALL, CONSUME_ATLEAST, CONSUME_N, CONSUME_ONE, PrecompiledNet, RESET,
16};
17
18const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
21const INITIAL_RING_CAPACITY: usize = 4;
23
24pub struct PrecompiledNetExecutor<'a, E: EventStore> {
31 program: &'a PrecompiledNet<'a>,
32 event_store: E,
33 #[allow(dead_code)]
34 environment_places: HashSet<Arc<str>>,
35 #[allow(dead_code)]
36 long_running: bool,
37 #[allow(dead_code)]
38 skip_output_validation: bool,
39
40 token_pool: Vec<Option<ErasedToken>>,
42 place_offset: Vec<usize>,
43 token_counts: Vec<usize>,
44 ring_head: Vec<usize>,
45 ring_tail: Vec<usize>,
46 ring_capacity: Vec<usize>,
47
48 marking_bitmap: Vec<u64>,
50
51 enabled_bitmap: Vec<u64>,
53 dirty_bitmap: Vec<u64>,
54 dirty_scan_buffer: Vec<u64>,
55 enabled_at_ms: Vec<f64>,
56 enabled_transition_count: usize,
57
58 dirty_word_summary: Vec<u64>,
60 enabled_word_summary: Vec<u64>,
61 transition_words: usize,
62 summary_words: usize,
63
64 ready_queues: Vec<Vec<usize>>,
66 ready_queue_head: Vec<usize>,
67 ready_queue_tail: Vec<usize>,
68 ready_queue_size: Vec<usize>,
69
70 pending_reset_words: Vec<u64>,
72 has_pending_resets: bool,
73
74 reusable_inputs: HashMap<Arc<str>, Vec<ErasedToken>>,
76 reusable_reads: HashMap<Arc<str>, Vec<ErasedToken>>,
77
78 start_time: Instant,
80}
81
82pub struct PrecompiledExecutorBuilder<'a, E: EventStore> {
84 program: &'a PrecompiledNet<'a>,
85 initial_marking: Marking,
86 event_store: Option<E>,
87 environment_places: HashSet<Arc<str>>,
88 long_running: bool,
89 skip_output_validation: bool,
90}
91
92impl<'a, E: EventStore> PrecompiledExecutorBuilder<'a, E> {
93 pub fn event_store(mut self, store: E) -> Self {
95 self.event_store = Some(store);
96 self
97 }
98
99 pub fn environment_places(mut self, places: HashSet<Arc<str>>) -> Self {
101 self.environment_places = places;
102 self
103 }
104
105 pub fn long_running(mut self, enabled: bool) -> Self {
107 self.long_running = enabled;
108 self
109 }
110
111 pub fn skip_output_validation(mut self, skip: bool) -> Self {
113 self.skip_output_validation = skip;
114 self
115 }
116
117 pub fn build(self) -> PrecompiledNetExecutor<'a, E> {
119 PrecompiledNetExecutor::new_inner(
120 self.program,
121 self.initial_marking,
122 self.event_store.unwrap_or_default(),
123 self.environment_places,
124 self.long_running,
125 self.skip_output_validation,
126 )
127 }
128}
129
130impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
131 pub fn builder(
133 program: &'a PrecompiledNet<'a>,
134 initial_marking: Marking,
135 ) -> PrecompiledExecutorBuilder<'a, E> {
136 PrecompiledExecutorBuilder {
137 program,
138 initial_marking,
139 event_store: None,
140 environment_places: HashSet::new(),
141 long_running: false,
142 skip_output_validation: false,
143 }
144 }
145
146 pub fn new(program: &'a PrecompiledNet<'a>, initial_marking: Marking) -> Self {
148 Self::new_inner(
149 program,
150 initial_marking,
151 E::default(),
152 HashSet::new(),
153 false,
154 false,
155 )
156 }
157
158 fn new_inner(
159 program: &'a PrecompiledNet<'a>,
160 initial_marking: Marking,
161 event_store: E,
162 environment_places: HashSet<Arc<str>>,
163 long_running: bool,
164 skip_output_validation: bool,
165 ) -> Self {
166 let pc = program.place_count();
167 let tc = program.transition_count();
168 let wc = program.word_count();
169
170 let total_slots = pc * INITIAL_RING_CAPACITY;
172 let mut token_pool = vec![None; total_slots];
173 let mut place_offset = vec![0usize; pc];
174 let mut token_counts = vec![0usize; pc];
175 let mut ring_head = vec![0usize; pc];
176 let mut ring_tail = vec![0usize; pc];
177 let mut ring_capacity = vec![INITIAL_RING_CAPACITY; pc];
178
179 for (pid, offset) in place_offset.iter_mut().enumerate() {
180 *offset = pid * INITIAL_RING_CAPACITY;
181 }
182
183 for pid in 0..pc {
185 let place = program.place(pid);
186 if let Some(queue) = initial_marking.queue(place.name()) {
187 for token in queue {
188 if token_counts[pid] == ring_capacity[pid] {
190 grow_ring_static(
191 &mut token_pool,
192 &mut place_offset,
193 &mut ring_head,
194 &mut ring_tail,
195 &mut ring_capacity,
196 &token_counts,
197 pid,
198 );
199 }
200 let tail = ring_tail[pid];
201 let offset = place_offset[pid];
202 token_pool[offset + tail] = Some(token.clone());
203 ring_tail[pid] = (tail + 1) % ring_capacity[pid];
204 token_counts[pid] += 1;
205 }
206 }
207 }
208
209 let transition_words = bitmap::word_count(tc);
211 let summary_words = bitmap::word_count(transition_words);
212
213 let prio_count = program.distinct_priority_count;
215 let queue_cap = tc.max(4);
216 let ready_queues = vec![vec![0usize; queue_cap]; prio_count];
217 let ready_queue_head = vec![0usize; prio_count];
218 let ready_queue_tail = vec![0usize; prio_count];
219 let ready_queue_size = vec![0usize; prio_count];
220
221 Self {
222 program,
223 event_store,
224 environment_places,
225 long_running,
226 skip_output_validation,
227 token_pool,
228 place_offset,
229 token_counts,
230 ring_head,
231 ring_tail,
232 ring_capacity,
233 marking_bitmap: vec![0u64; wc],
234 enabled_bitmap: vec![0u64; transition_words],
235 dirty_bitmap: vec![0u64; transition_words],
236 dirty_scan_buffer: vec![0u64; transition_words],
237 enabled_at_ms: vec![f64::NEG_INFINITY; tc],
238 enabled_transition_count: 0,
239 dirty_word_summary: vec![0u64; summary_words],
240 enabled_word_summary: vec![0u64; summary_words],
241 transition_words,
242 summary_words,
243 ready_queues,
244 ready_queue_head,
245 ready_queue_tail,
246 ready_queue_size,
247 pending_reset_words: vec![0u64; wc],
248 has_pending_resets: false,
249 reusable_inputs: HashMap::new(),
250 reusable_reads: HashMap::new(),
251 start_time: Instant::now(),
252 }
253 }
254
255 pub fn run_sync(&mut self) -> Marking {
260 self.run_to_completion()
261 }
262
263 pub fn marking(&self) -> Marking {
265 self.materialize_marking()
266 }
267
268 pub fn event_store(&self) -> &E {
270 &self.event_store
271 }
272
273 pub fn is_quiescent(&self) -> bool {
275 self.enabled_transition_count == 0
276 }
277
278 #[inline]
281 fn ring_remove_first(&mut self, pid: usize) -> ErasedToken {
282 let head = self.ring_head[pid];
283 let offset = self.place_offset[pid];
284 let token = self.token_pool[offset + head].take().unwrap();
285 self.ring_head[pid] = (head + 1) % self.ring_capacity[pid];
286 self.token_counts[pid] -= 1;
287 token
288 }
289
290 #[inline]
291 fn ring_add_last(&mut self, pid: usize, token: ErasedToken) {
292 if self.token_counts[pid] == self.ring_capacity[pid] {
293 self.grow_ring(pid);
294 }
295 let tail = self.ring_tail[pid];
296 let offset = self.place_offset[pid];
297 self.token_pool[offset + tail] = Some(token);
298 self.ring_tail[pid] = (tail + 1) % self.ring_capacity[pid];
299 self.token_counts[pid] += 1;
300 }
301
302 #[inline]
303 fn ring_peek_first(&self, pid: usize) -> Option<&ErasedToken> {
304 if self.token_counts[pid] == 0 {
305 return None;
306 }
307 self.token_pool[self.place_offset[pid] + self.ring_head[pid]].as_ref()
308 }
309
310 fn ring_remove_all(&mut self, pid: usize) -> Vec<ErasedToken> {
311 let count = self.token_counts[pid];
312 if count == 0 {
313 return Vec::new();
314 }
315 let mut result = Vec::with_capacity(count);
316 for _ in 0..count {
317 result.push(self.ring_remove_first(pid));
318 }
319 result
320 }
321
322 fn grow_ring(&mut self, pid: usize) {
323 grow_ring_static(
324 &mut self.token_pool,
325 &mut self.place_offset,
326 &mut self.ring_head,
327 &mut self.ring_tail,
328 &mut self.ring_capacity,
329 &self.token_counts,
330 pid,
331 );
332 }
333
334 #[inline]
337 fn set_enabled_bit(&mut self, tid: usize) {
338 let w = tid >> bitmap::WORD_SHIFT;
339 self.enabled_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
340 self.enabled_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
341 }
342
343 #[inline]
344 fn clear_enabled_bit(&mut self, tid: usize) {
345 let w = tid >> bitmap::WORD_SHIFT;
346 self.enabled_bitmap[w] &= !(1u64 << (tid & bitmap::WORD_MASK));
347 if self.enabled_bitmap[w] == 0 {
348 self.enabled_word_summary[w >> bitmap::WORD_SHIFT] &=
349 !(1u64 << (w & bitmap::WORD_MASK));
350 }
351 }
352
353 #[inline]
354 fn is_enabled(&self, tid: usize) -> bool {
355 (self.enabled_bitmap[tid >> bitmap::WORD_SHIFT] & (1u64 << (tid & bitmap::WORD_MASK))) != 0
356 }
357
358 #[inline]
359 fn set_marking_bit(&mut self, pid: usize) {
360 bitmap::set_bit(&mut self.marking_bitmap, pid);
361 }
362
363 #[inline]
364 fn clear_marking_bit(&mut self, pid: usize) {
365 bitmap::clear_bit(&mut self.marking_bitmap, pid);
366 }
367
368 fn ready_queue_push(&mut self, tid: usize) {
371 let pi = self.program.transition_to_priority_index[tid];
372 if self.ready_queue_size[pi] == self.ready_queues[pi].len() {
373 let old_cap = self.ready_queues[pi].len();
374 let new_cap = old_cap * 2;
375 let mut new_queue = vec![0usize; new_cap];
376 let head = self.ready_queue_head[pi];
377 for (i, slot) in new_queue.iter_mut().enumerate().take(old_cap) {
378 *slot = self.ready_queues[pi][(head + i) % old_cap];
379 }
380 self.ready_queues[pi] = new_queue;
381 self.ready_queue_head[pi] = 0;
382 self.ready_queue_tail[pi] = old_cap;
383 }
384 let tail = self.ready_queue_tail[pi];
385 self.ready_queues[pi][tail] = tid;
386 self.ready_queue_tail[pi] = (tail + 1) % self.ready_queues[pi].len();
387 self.ready_queue_size[pi] += 1;
388 }
389
390 fn ready_queue_pop(&mut self, pi: usize) -> usize {
391 let head = self.ready_queue_head[pi];
392 let tid = self.ready_queues[pi][head];
393 self.ready_queue_head[pi] = (head + 1) % self.ready_queues[pi].len();
394 self.ready_queue_size[pi] -= 1;
395 tid
396 }
397
398 fn clear_all_ready_queues(&mut self) {
399 for pi in 0..self.program.distinct_priority_count {
400 self.ready_queue_head[pi] = 0;
401 self.ready_queue_tail[pi] = 0;
402 self.ready_queue_size[pi] = 0;
403 }
404 }
405
406 fn initialize_marking_bitmap(&mut self) {
409 for pid in 0..self.program.place_count() {
410 if self.token_counts[pid] > 0 {
411 self.set_marking_bit(pid);
412 }
413 }
414 }
415
416 fn mark_all_dirty(&mut self) {
417 let tc = self.program.transition_count();
418 let last_word_bits = tc & bitmap::WORD_MASK;
419 for w in 0..self.transition_words.saturating_sub(1) {
420 self.dirty_bitmap[w] = u64::MAX;
421 }
422 if self.transition_words > 0 {
423 self.dirty_bitmap[self.transition_words - 1] = if last_word_bits == 0 {
424 u64::MAX
425 } else {
426 (1u64 << last_word_bits) - 1
427 };
428 }
429 for s in 0..self.summary_words {
431 let first_w = s << bitmap::WORD_SHIFT;
432 let last_w = (first_w + bitmap::WORD_MASK).min(self.transition_words.saturating_sub(1));
433 let count = last_w - first_w + 1;
434 let last_bits = count & bitmap::WORD_MASK;
435 self.dirty_word_summary[s] = if last_bits == 0 {
436 u64::MAX
437 } else {
438 (1u64 << last_bits) - 1
439 };
440 }
441 }
442
443 fn should_terminate(&self) -> bool {
444 if self.long_running {
445 return false;
446 }
447 self.enabled_transition_count == 0
448 }
449
450 fn update_dirty_transitions(&mut self) {
453 let now_ms = self.elapsed_ms();
454
455 for s in 0..self.summary_words {
457 let mut summary = self.dirty_word_summary[s];
458 self.dirty_word_summary[s] = 0;
459 while summary != 0 {
460 let local_w = summary.trailing_zeros() as usize;
461 summary &= summary - 1;
462 let w = (s << bitmap::WORD_SHIFT) | local_w;
463 if w < self.transition_words {
464 self.dirty_scan_buffer[w] = self.dirty_bitmap[w];
465 self.dirty_bitmap[w] = 0;
466 }
467 }
468 }
469
470 let tc = self.program.transition_count();
472 for w in 0..self.transition_words {
473 let mut word = self.dirty_scan_buffer[w];
474 if word == 0 {
475 continue;
476 }
477 self.dirty_scan_buffer[w] = 0;
478 while word != 0 {
479 let bit = word.trailing_zeros() as usize;
480 let tid = (w << bitmap::WORD_SHIFT) | bit;
481 word &= word - 1;
482
483 if tid >= tc {
484 break;
485 }
486
487 let was_enabled = self.is_enabled(tid);
488 let can_now = self.can_enable(tid);
489
490 if can_now && !was_enabled {
491 self.set_enabled_bit(tid);
492 self.enabled_transition_count += 1;
493 self.enabled_at_ms[tid] = now_ms;
494
495 if E::ENABLED {
496 self.event_store.append(NetEvent::TransitionEnabled {
497 transition_name: Arc::clone(self.program.transition(tid).name_arc()),
498 timestamp: now_millis(),
499 });
500 }
501 } else if !can_now && was_enabled {
502 self.clear_enabled_bit(tid);
503 self.enabled_transition_count -= 1;
504 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
505 } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
506 self.enabled_at_ms[tid] = now_ms;
507 if E::ENABLED {
508 self.event_store.append(NetEvent::TransitionClockRestarted {
509 transition_name: Arc::clone(self.program.transition(tid).name_arc()),
510 timestamp: now_millis(),
511 });
512 }
513 }
514 }
515 }
516
517 self.clear_pending_resets();
518 }
519
520 fn can_enable(&self, tid: usize) -> bool {
521 if !self.program.can_enable_bitmap(tid, &self.marking_bitmap) {
522 return false;
523 }
524
525 if let Some(card_check) = self.program.compiled().cardinality_check(tid) {
527 for i in 0..card_check.place_ids.len() {
528 let pid = card_check.place_ids[i];
529 let required = card_check.required_counts[i];
530 if self.token_counts[pid] < required {
531 return false;
532 }
533 }
534 }
535
536 if self.program.compiled().has_guards(tid) {
538 let t = self.program.transition(tid);
539 for spec in t.input_specs() {
540 if let Some(guard) = spec.guard() {
541 let required = match spec {
542 In::One { .. } => 1,
543 In::Exactly { count, .. } => *count,
544 In::AtLeast { minimum, .. } => *minimum,
545 In::All { .. } => 1,
546 };
547 let pid = self.program.place_id(spec.place_name()).unwrap();
548 let count = self.count_matching_in_ring(pid, &**guard);
549 if count < required {
550 return false;
551 }
552 }
553 }
554 }
555
556 true
557 }
558
559 fn count_matching_in_ring(
560 &self,
561 pid: usize,
562 guard: &dyn Fn(&dyn std::any::Any) -> bool,
563 ) -> usize {
564 let count = self.token_counts[pid];
565 if count == 0 {
566 return 0;
567 }
568 let offset = self.place_offset[pid];
569 let head = self.ring_head[pid];
570 let cap = self.ring_capacity[pid];
571 let mut matched = 0;
572 for i in 0..count {
573 let idx = offset + (head + i) % cap;
574 if let Some(token) = &self.token_pool[idx]
575 && guard(token.value.as_ref())
576 {
577 matched += 1;
578 }
579 }
580 matched
581 }
582
583 fn has_input_from_reset_place(&self, tid: usize) -> bool {
584 if !self.has_pending_resets {
585 return false;
586 }
587 let input_mask = &self.program.input_place_mask_words[tid];
588 for (im, pr) in input_mask.iter().zip(self.pending_reset_words.iter()) {
589 if (im & pr) != 0 {
590 return true;
591 }
592 }
593 false
594 }
595
596 fn clear_pending_resets(&mut self) {
597 if self.has_pending_resets {
598 for w in &mut self.pending_reset_words {
599 *w = 0;
600 }
601 self.has_pending_resets = false;
602 }
603 }
604
605 fn enforce_deadlines(&mut self, now_ms: f64) {
608 for s in 0..self.summary_words {
609 let mut summary = self.enabled_word_summary[s];
610 while summary != 0 {
611 let local_w = summary.trailing_zeros() as usize;
612 summary &= summary - 1;
613 let w = (s << bitmap::WORD_SHIFT) | local_w;
614 if w >= self.transition_words {
615 continue;
616 }
617 let mut word = self.enabled_bitmap[w];
618 while word != 0 {
619 let bit = word.trailing_zeros() as usize;
620 let tid = (w << bitmap::WORD_SHIFT) | bit;
621 word &= word - 1;
622
623 if !self.program.has_deadline[tid] {
624 continue;
625 }
626
627 let elapsed = now_ms - self.enabled_at_ms[tid];
628 let latest_ms = self.program.latest_ms[tid];
629
630 if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
631 self.clear_enabled_bit(tid);
632 self.enabled_transition_count -= 1;
633 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
634 self.mark_transition_dirty(tid);
635
636 if E::ENABLED {
637 self.event_store.append(NetEvent::TransitionTimedOut {
638 transition_name: Arc::clone(
639 self.program.transition(tid).name_arc(),
640 ),
641 timestamp: now_millis(),
642 });
643 }
644 }
645 }
646 }
647 }
648 }
649
650 fn fire_ready_immediate_sync(&mut self) {
653 for s in 0..self.summary_words {
654 let mut summary = self.enabled_word_summary[s];
655 while summary != 0 {
656 let local_w = summary.trailing_zeros() as usize;
657 summary &= summary - 1;
658 let w = (s << bitmap::WORD_SHIFT) | local_w;
659 if w >= self.transition_words {
660 continue;
661 }
662 let word = self.enabled_bitmap[w];
663 let mut remaining = word;
664 while remaining != 0 {
665 let bit = remaining.trailing_zeros() as usize;
666 let tid = (w << bitmap::WORD_SHIFT) | bit;
667 remaining &= remaining - 1;
668
669 if self.can_enable(tid) {
670 self.fire_transition_sync(tid);
671 } else {
672 self.clear_enabled_bit(tid);
673 self.enabled_transition_count -= 1;
674 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
675 }
676 }
677 }
678 }
679 }
680
681 fn fire_ready_general_sync(&mut self, now_ms: f64) {
682 self.clear_all_ready_queues();
684
685 for s in 0..self.summary_words {
686 let mut summary = self.enabled_word_summary[s];
687 while summary != 0 {
688 let local_w = summary.trailing_zeros() as usize;
689 summary &= summary - 1;
690 let w = (s << bitmap::WORD_SHIFT) | local_w;
691 if w >= self.transition_words {
692 continue;
693 }
694 let mut word = self.enabled_bitmap[w];
695 while word != 0 {
696 let bit = word.trailing_zeros() as usize;
697 let tid = (w << bitmap::WORD_SHIFT) | bit;
698 word &= word - 1;
699
700 let enabled_ms = self.enabled_at_ms[tid];
701 let elapsed = now_ms - enabled_ms;
702
703 if self.program.earliest_ms[tid] <= elapsed {
704 self.ready_queue_push(tid);
705 }
706 }
707 }
708 }
709
710 for pi in 0..self.program.distinct_priority_count {
712 while self.ready_queue_size[pi] > 0 {
713 let tid = self.ready_queue_pop(pi);
714 if !self.is_enabled(tid) {
715 continue;
716 }
717
718 if self.can_enable(tid) {
719 self.fire_transition_sync(tid);
720 } else {
721 self.clear_enabled_bit(tid);
722 self.enabled_transition_count -= 1;
723 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
724 }
725 }
726 }
727 }
728
729 fn fire_transition_sync(&mut self, tid: usize) {
732 let has_guards = self.program.compiled().has_guards(tid);
733 let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
734 let action = Arc::clone(self.program.transition(tid).action());
735
736 self.reusable_inputs.clear();
738 self.reusable_reads.clear();
739
740 if has_guards {
741 let input_specs: Vec<In> = self.program.transition(tid).input_specs().to_vec();
743 let reset_arcs: Vec<_> = self.program.transition(tid).resets().to_vec();
744
745 for in_spec in &input_specs {
746 let pid = self.program.place_id(in_spec.place_name()).unwrap();
747 let place_name_arc = Arc::clone(&self.program.place_name_arcs[pid]);
748 let to_consume = match in_spec {
749 In::One { .. } => 1,
750 In::Exactly { count, .. } => *count,
751 In::All { guard, .. } | In::AtLeast { guard, .. } => {
752 if guard.is_some() {
753 self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
754 } else {
755 self.token_counts[pid]
756 }
757 }
758 };
759
760 for _ in 0..to_consume {
761 let token = if let Some(guard) = in_spec.guard() {
762 self.ring_remove_matching(pid, &**guard)
763 } else {
764 Some(self.ring_remove_first(pid))
765 };
766 if let Some(token) = token {
767 if E::ENABLED {
768 self.event_store.append(NetEvent::TokenRemoved {
769 place_name: Arc::clone(&place_name_arc),
770 timestamp: now_millis(),
771 });
772 }
773 self.reusable_inputs
774 .entry(Arc::clone(&place_name_arc))
775 .or_default()
776 .push(token);
777 }
778 }
779 }
780
781 for arc in &reset_arcs {
783 let pid = self.program.place_id(arc.place.name()).unwrap();
784 let removed = self.ring_remove_all(pid);
785 if E::ENABLED {
786 for _ in &removed {
787 self.event_store.append(NetEvent::TokenRemoved {
788 place_name: Arc::clone(arc.place.name_arc()),
789 timestamp: now_millis(),
790 });
791 }
792 }
793 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
794 1u64 << (pid & bitmap::WORD_MASK);
795 self.has_pending_resets = true;
796 }
797 } else {
798 let ops_len = self.program.consume_ops[tid].len();
800 let mut pc = 0;
801 while pc < ops_len {
802 let opcode = self.program.consume_ops[tid][pc];
803 pc += 1;
804 match opcode {
805 CONSUME_ONE => {
806 let pid = self.program.consume_ops[tid][pc] as usize;
807 pc += 1;
808 let token = self.ring_remove_first(pid);
809 if E::ENABLED {
810 self.event_store.append(NetEvent::TokenRemoved {
811 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
812 timestamp: now_millis(),
813 });
814 }
815 self.reusable_inputs
816 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
817 .or_default()
818 .push(token);
819 }
820 CONSUME_N => {
821 let pid = self.program.consume_ops[tid][pc] as usize;
822 pc += 1;
823 let count = self.program.consume_ops[tid][pc] as usize;
824 pc += 1;
825 for _ in 0..count {
826 let token = self.ring_remove_first(pid);
827 if E::ENABLED {
828 self.event_store.append(NetEvent::TokenRemoved {
829 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
830 timestamp: now_millis(),
831 });
832 }
833 self.reusable_inputs
834 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
835 .or_default()
836 .push(token);
837 }
838 }
839 CONSUME_ALL | CONSUME_ATLEAST => {
840 let pid = self.program.consume_ops[tid][pc] as usize;
841 pc += 1;
842 if opcode == CONSUME_ATLEAST {
843 pc += 1;
844 }
845 let count = self.token_counts[pid];
846 for _ in 0..count {
847 let token = self.ring_remove_first(pid);
848 if E::ENABLED {
849 self.event_store.append(NetEvent::TokenRemoved {
850 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
851 timestamp: now_millis(),
852 });
853 }
854 self.reusable_inputs
855 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
856 .or_default()
857 .push(token);
858 }
859 }
860 RESET => {
861 let pid = self.program.consume_ops[tid][pc] as usize;
862 pc += 1;
863 let count = self.token_counts[pid];
864 for _ in 0..count {
865 let _token = self.ring_remove_first(pid);
866 if E::ENABLED {
867 self.event_store.append(NetEvent::TokenRemoved {
868 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
869 timestamp: now_millis(),
870 });
871 }
872 }
873 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
874 1u64 << (pid & bitmap::WORD_MASK);
875 self.has_pending_resets = true;
876 }
877 _ => unreachable!("Unknown opcode: {opcode}"),
878 }
879 }
880 }
881
882 let read_ops_len = self.program.read_ops[tid].len();
884 for i in 0..read_ops_len {
885 let rpid = self.program.read_ops[tid][i];
886 let token_clone = self.ring_peek_first(rpid).cloned();
887 if let Some(token) = token_clone {
888 let place_name = Arc::clone(&self.program.place_name_arcs[rpid]);
889 self.reusable_reads
890 .entry(place_name)
891 .or_default()
892 .push(token);
893 }
894 }
895
896 self.update_bitmap_after_consumption(tid);
898
899 if E::ENABLED {
900 self.event_store.append(NetEvent::TransitionStarted {
901 transition_name: Arc::clone(&transition_name),
902 timestamp: now_millis(),
903 });
904 }
905
906 let inputs = std::mem::take(&mut self.reusable_inputs);
908 let reads = std::mem::take(&mut self.reusable_reads);
909 let mut ctx = TransitionContext::new(
910 Arc::clone(&transition_name),
911 inputs,
912 reads,
913 self.program.output_place_name_sets[tid].clone(),
914 None,
915 );
916
917 let result = action.run_sync(&mut ctx);
918
919 let returned_inputs = ctx.take_inputs();
921 let returned_reads = ctx.take_reads();
922
923 match result {
924 Ok(()) => {
925 let outputs = ctx.take_outputs();
926 self.process_outputs(tid, &transition_name, outputs);
927
928 if E::ENABLED {
929 self.event_store.append(NetEvent::TransitionCompleted {
930 transition_name: Arc::clone(&transition_name),
931 timestamp: now_millis(),
932 });
933 }
934 }
935 Err(err) => {
936 if E::ENABLED {
937 self.event_store.append(NetEvent::TransitionFailed {
938 transition_name: Arc::clone(&transition_name),
939 error: err.message,
940 timestamp: now_millis(),
941 });
942 }
943 }
944 }
945
946 self.reusable_inputs = returned_inputs;
948 self.reusable_reads = returned_reads;
949
950 self.clear_enabled_bit(tid);
952 self.enabled_transition_count -= 1;
953 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
954
955 self.mark_transition_dirty(tid);
957 }
958
959 fn ring_remove_matching(
962 &mut self,
963 pid: usize,
964 guard: &dyn Fn(&dyn std::any::Any) -> bool,
965 ) -> Option<ErasedToken> {
966 let count = self.token_counts[pid];
967 if count == 0 {
968 return None;
969 }
970 let offset = self.place_offset[pid];
971 let head = self.ring_head[pid];
972 let cap = self.ring_capacity[pid];
973
974 for i in 0..count {
976 let idx = offset + (head + i) % cap;
977 if let Some(token) = &self.token_pool[idx]
978 && guard(token.value.as_ref())
979 {
980 let token = self.token_pool[idx].take().unwrap();
981 for j in i..count - 1 {
983 let from = offset + (head + j + 1) % cap;
984 let to = offset + (head + j) % cap;
985 self.token_pool[to] = self.token_pool[from].take();
986 }
987 self.token_counts[pid] -= 1;
988 self.ring_tail[pid] = if self.ring_tail[pid] == 0 {
989 cap - 1
990 } else {
991 self.ring_tail[pid] - 1
992 };
993 return Some(token);
994 }
995 }
996 None
997 }
998
999 fn process_outputs(
1000 &mut self,
1001 _tid: usize,
1002 _transition_name: &Arc<str>,
1003 outputs: Vec<OutputEntry>,
1004 ) {
1005 for entry in outputs {
1006 if let Some(pid) = self.program.place_id(&entry.place_name) {
1007 self.ring_add_last(pid, entry.token);
1008 self.set_marking_bit(pid);
1009 self.mark_dirty(pid);
1010 }
1011
1012 if E::ENABLED {
1013 self.event_store.append(NetEvent::TokenAdded {
1014 place_name: Arc::clone(&entry.place_name),
1015 timestamp: now_millis(),
1016 });
1017 }
1018 }
1019 }
1020
1021 fn update_bitmap_after_consumption(&mut self, tid: usize) {
1022 let n = self.program.compiled().consumption_place_ids(tid).len();
1023 for i in 0..n {
1024 let pid = self.program.compiled().consumption_place_ids(tid)[i];
1025 if self.token_counts[pid] == 0 {
1026 self.clear_marking_bit(pid);
1027 }
1028 self.mark_dirty(pid);
1029 }
1030 }
1031
1032 fn has_dirty_bits(&self) -> bool {
1035 for &s in &self.dirty_word_summary {
1036 if s != 0 {
1037 return true;
1038 }
1039 }
1040 false
1041 }
1042
1043 fn mark_dirty(&mut self, pid: usize) {
1044 let n = self.program.compiled().affected_transitions(pid).len();
1045 for i in 0..n {
1046 let tid = self.program.compiled().affected_transitions(pid)[i];
1047 self.mark_transition_dirty(tid);
1048 }
1049 }
1050
1051 fn mark_transition_dirty(&mut self, tid: usize) {
1052 let w = tid >> bitmap::WORD_SHIFT;
1053 self.dirty_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
1054 self.dirty_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
1055 }
1056
1057 fn elapsed_ms(&self) -> f64 {
1058 self.start_time.elapsed().as_secs_f64() * 1000.0
1059 }
1060
1061 fn materialize_marking(&self) -> Marking {
1064 let mut marking = Marking::new();
1065 for pid in 0..self.program.place_count() {
1066 let count = self.token_counts[pid];
1067 if count == 0 {
1068 continue;
1069 }
1070 let place_name = self.program.place(pid).name_arc();
1071 let offset = self.place_offset[pid];
1072 let head = self.ring_head[pid];
1073 let cap = self.ring_capacity[pid];
1074 for i in 0..count {
1075 let idx = offset + (head + i) % cap;
1076 if let Some(token) = &self.token_pool[idx] {
1077 marking.add_erased(place_name, token.clone());
1078 }
1079 }
1080 }
1081 marking
1082 }
1083
1084 fn run_to_completion(&mut self) -> Marking {
1086 self.initialize_marking_bitmap();
1087 self.mark_all_dirty();
1088
1089 if E::ENABLED {
1090 let now = now_millis();
1091 self.event_store.append(NetEvent::ExecutionStarted {
1092 net_name: Arc::from(self.program.net().name()),
1093 timestamp: now,
1094 });
1095 }
1096
1097 loop {
1098 self.update_dirty_transitions();
1099
1100 let cycle_now = self.elapsed_ms();
1101
1102 if self.program.any_deadlines {
1103 self.enforce_deadlines(cycle_now);
1104 }
1105
1106 if self.should_terminate() {
1107 break;
1108 }
1109
1110 if self.program.all_immediate && self.program.all_same_priority {
1111 self.fire_ready_immediate_sync();
1112 } else {
1113 self.fire_ready_general_sync(cycle_now);
1114 }
1115
1116 if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
1117 break;
1118 }
1119 }
1120
1121 if E::ENABLED {
1122 let now = now_millis();
1123 self.event_store.append(NetEvent::ExecutionCompleted {
1124 net_name: Arc::from(self.program.net().name()),
1125 timestamp: now,
1126 });
1127 }
1128
1129 self.materialize_marking()
1130 }
1131}
1132
1133#[cfg(feature = "tokio")]
1135use crate::environment::ExternalEvent;
1136
1137#[cfg(feature = "tokio")]
1138struct ActionCompletion {
1139 transition_name: Arc<str>,
1140 result: Result<Vec<OutputEntry>, String>,
1141}
1142
1143#[cfg(feature = "tokio")]
1144impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
1145 pub async fn run_async(
1147 &mut self,
1148 mut event_rx: tokio::sync::mpsc::UnboundedReceiver<ExternalEvent>,
1149 ) -> Marking {
1150 let (completion_tx, mut completion_rx) =
1151 tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
1152
1153 self.initialize_marking_bitmap();
1154 self.mark_all_dirty();
1155
1156 let mut in_flight_count: usize = 0;
1157 let mut event_channel_open = true;
1158
1159 if E::ENABLED {
1160 let now = now_millis();
1161 self.event_store.append(NetEvent::ExecutionStarted {
1162 net_name: Arc::from(self.program.net().name()),
1163 timestamp: now,
1164 });
1165 }
1166
1167 loop {
1168 while let Ok(completion) = completion_rx.try_recv() {
1170 in_flight_count -= 1;
1171 match completion.result {
1172 Ok(outputs) => {
1173 self.process_outputs(0, &completion.transition_name, outputs);
1174 if E::ENABLED {
1175 self.event_store.append(NetEvent::TransitionCompleted {
1176 transition_name: Arc::clone(&completion.transition_name),
1177 timestamp: now_millis(),
1178 });
1179 }
1180 }
1181 Err(err) => {
1182 if E::ENABLED {
1183 self.event_store.append(NetEvent::TransitionFailed {
1184 transition_name: Arc::clone(&completion.transition_name),
1185 error: err,
1186 timestamp: now_millis(),
1187 });
1188 }
1189 }
1190 }
1191 }
1192
1193 while let Ok(event) = event_rx.try_recv() {
1195 if let Some(pid) = self.program.place_id(&event.place_name) {
1196 self.ring_add_last(pid, event.token);
1197 self.set_marking_bit(pid);
1198 self.mark_dirty(pid);
1199 }
1200 if E::ENABLED {
1201 self.event_store.append(NetEvent::TokenAdded {
1202 place_name: Arc::clone(&event.place_name),
1203 timestamp: now_millis(),
1204 });
1205 }
1206 }
1207
1208 self.update_dirty_transitions();
1210
1211 let cycle_now = self.elapsed_ms();
1213 if self.program.any_deadlines {
1214 self.enforce_deadlines(cycle_now);
1215 }
1216
1217 if self.enabled_transition_count == 0
1219 && in_flight_count == 0
1220 && (!self.long_running || !event_channel_open)
1221 {
1222 break;
1223 }
1224
1225 let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
1227
1228 if fired || self.has_dirty_bits() {
1229 tokio::task::yield_now().await;
1230 continue;
1231 }
1232
1233 if in_flight_count == 0 && !self.long_running {
1235 break;
1236 }
1237 if in_flight_count == 0 && !event_channel_open {
1238 break;
1239 }
1240
1241 let timer_ms = self.millis_until_next_timed_transition();
1242
1243 tokio::select! {
1244 Some(completion) = completion_rx.recv() => {
1245 in_flight_count -= 1;
1246 match completion.result {
1247 Ok(outputs) => {
1248 self.process_outputs(0, &completion.transition_name, outputs);
1249 if E::ENABLED {
1250 self.event_store.append(NetEvent::TransitionCompleted {
1251 transition_name: Arc::clone(&completion.transition_name),
1252 timestamp: now_millis(),
1253 });
1254 }
1255 }
1256 Err(err) => {
1257 if E::ENABLED {
1258 self.event_store.append(NetEvent::TransitionFailed {
1259 transition_name: Arc::clone(&completion.transition_name),
1260 error: err,
1261 timestamp: now_millis(),
1262 });
1263 }
1264 }
1265 }
1266 }
1267 result = event_rx.recv(), if event_channel_open => {
1268 match result {
1269 Some(event) => {
1270 if let Some(pid) = self.program.place_id(&event.place_name) {
1271 self.ring_add_last(pid, event.token);
1272 self.set_marking_bit(pid);
1273 self.mark_dirty(pid);
1274 }
1275 if E::ENABLED {
1276 self.event_store.append(NetEvent::TokenAdded {
1277 place_name: Arc::clone(&event.place_name),
1278 timestamp: now_millis(),
1279 });
1280 }
1281 }
1282 None => {
1283 event_channel_open = false;
1284 }
1285 }
1286 }
1287 _ = tokio::time::sleep(std::time::Duration::from_millis(
1288 if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
1289 )) => {}
1290 }
1291 }
1292
1293 if E::ENABLED {
1294 let now = now_millis();
1295 self.event_store.append(NetEvent::ExecutionCompleted {
1296 net_name: Arc::from(self.program.net().name()),
1297 timestamp: now,
1298 });
1299 }
1300
1301 self.materialize_marking()
1302 }
1303
1304 fn fire_ready_async(
1305 &mut self,
1306 now_ms: f64,
1307 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1308 in_flight_count: &mut usize,
1309 ) -> bool {
1310 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
1311
1312 for s in 0..self.summary_words {
1313 let mut summary = self.enabled_word_summary[s];
1314 while summary != 0 {
1315 let local_w = summary.trailing_zeros() as usize;
1316 summary &= summary - 1;
1317 let w = (s << bitmap::WORD_SHIFT) | local_w;
1318 if w >= self.transition_words {
1319 continue;
1320 }
1321 let mut word = self.enabled_bitmap[w];
1322 while word != 0 {
1323 let bit = word.trailing_zeros() as usize;
1324 let tid = (w << bitmap::WORD_SHIFT) | bit;
1325 word &= word - 1;
1326
1327 let enabled_ms = self.enabled_at_ms[tid];
1328 let elapsed = now_ms - enabled_ms;
1329 if self.program.earliest_ms[tid] <= elapsed {
1330 ready.push((tid, self.program.priorities[tid], enabled_ms));
1331 }
1332 }
1333 }
1334 }
1335
1336 if ready.is_empty() {
1337 return false;
1338 }
1339
1340 ready.sort_by(|a, b| {
1341 b.1.cmp(&a.1)
1342 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
1343 });
1344
1345 let mut fired_any = false;
1346 for (tid, _, _) in ready {
1347 if self.is_enabled(tid) && self.can_enable(tid) {
1348 self.fire_transition_async(tid, completion_tx, in_flight_count);
1349 fired_any = true;
1350 } else if self.is_enabled(tid) {
1351 self.clear_enabled_bit(tid);
1352 self.enabled_transition_count -= 1;
1353 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1354 }
1355 }
1356 fired_any
1357 }
1358
1359 fn fire_transition_async(
1360 &mut self,
1361 tid: usize,
1362 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1363 in_flight_count: &mut usize,
1364 ) {
1365 let t = self.program.transition(tid);
1366 let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
1367 let input_specs: Vec<In> = t.input_specs().to_vec();
1368 let read_arcs: Vec<_> = t.reads().to_vec();
1369 let reset_arcs: Vec<_> = t.resets().to_vec();
1370 let output_place_names = self.program.output_place_name_sets[tid].clone();
1371 let action = Arc::clone(t.action());
1372 let is_sync = action.is_sync();
1373
1374 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1376 for in_spec in &input_specs {
1377 let pid = self.program.place_id(in_spec.place_name()).unwrap();
1378 let to_consume = match in_spec {
1379 In::One { .. } => 1,
1380 In::Exactly { count, .. } => *count,
1381 In::All { guard, .. } | In::AtLeast { guard, .. } => {
1382 if guard.is_some() {
1383 self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
1384 } else {
1385 self.token_counts[pid]
1386 }
1387 }
1388 };
1389
1390 let place_name_arc = Arc::clone(in_spec.place().name_arc());
1391 for _ in 0..to_consume {
1392 let token = if let Some(guard) = in_spec.guard() {
1393 self.ring_remove_matching(pid, &**guard)
1394 } else {
1395 Some(self.ring_remove_first(pid))
1396 };
1397 if let Some(token) = token {
1398 if E::ENABLED {
1399 self.event_store.append(NetEvent::TokenRemoved {
1400 place_name: Arc::clone(&place_name_arc),
1401 timestamp: now_millis(),
1402 });
1403 }
1404 inputs
1405 .entry(Arc::clone(&place_name_arc))
1406 .or_default()
1407 .push(token);
1408 }
1409 }
1410 }
1411
1412 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1414 for arc in &read_arcs {
1415 let rpid = self.program.place_id(arc.place.name()).unwrap();
1416 if let Some(token) = self.ring_peek_first(rpid) {
1417 read_tokens
1418 .entry(Arc::clone(arc.place.name_arc()))
1419 .or_default()
1420 .push(token.clone());
1421 }
1422 }
1423
1424 for arc in &reset_arcs {
1426 let pid = self.program.place_id(arc.place.name()).unwrap();
1427 let removed = self.ring_remove_all(pid);
1428 if E::ENABLED {
1429 for _ in &removed {
1430 self.event_store.append(NetEvent::TokenRemoved {
1431 place_name: Arc::clone(arc.place.name_arc()),
1432 timestamp: now_millis(),
1433 });
1434 }
1435 }
1436 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
1437 1u64 << (pid & bitmap::WORD_MASK);
1438 self.has_pending_resets = true;
1439 }
1440
1441 self.update_bitmap_after_consumption(tid);
1442
1443 if E::ENABLED {
1444 self.event_store.append(NetEvent::TransitionStarted {
1445 transition_name: Arc::clone(&transition_name),
1446 timestamp: now_millis(),
1447 });
1448 }
1449
1450 self.clear_enabled_bit(tid);
1452 self.enabled_transition_count -= 1;
1453 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1454 self.mark_transition_dirty(tid);
1455
1456 if is_sync {
1457 let mut ctx = TransitionContext::new(
1458 Arc::clone(&transition_name),
1459 inputs,
1460 read_tokens,
1461 output_place_names,
1462 None,
1463 );
1464 let result = action.run_sync(&mut ctx);
1465 match result {
1466 Ok(()) => {
1467 let outputs = ctx.take_outputs();
1468 self.process_outputs(tid, &transition_name, outputs);
1469 if E::ENABLED {
1470 self.event_store.append(NetEvent::TransitionCompleted {
1471 transition_name: Arc::clone(&transition_name),
1472 timestamp: now_millis(),
1473 });
1474 }
1475 }
1476 Err(err) => {
1477 if E::ENABLED {
1478 self.event_store.append(NetEvent::TransitionFailed {
1479 transition_name: Arc::clone(&transition_name),
1480 error: err.message,
1481 timestamp: now_millis(),
1482 });
1483 }
1484 }
1485 }
1486 } else {
1487 *in_flight_count += 1;
1488 let tx = completion_tx.clone();
1489 let name = Arc::clone(&transition_name);
1490 let ctx = TransitionContext::new(
1491 Arc::clone(&transition_name),
1492 inputs,
1493 read_tokens,
1494 output_place_names,
1495 None,
1496 );
1497 tokio::spawn(async move {
1498 let result = action.run_async(ctx).await;
1499 let completion = match result {
1500 Ok(mut completed_ctx) => ActionCompletion {
1501 transition_name: Arc::clone(&name),
1502 result: Ok(completed_ctx.take_outputs()),
1503 },
1504 Err(err) => ActionCompletion {
1505 transition_name: Arc::clone(&name),
1506 result: Err(err.message),
1507 },
1508 };
1509 let _ = tx.send(completion);
1510 });
1511 }
1512 }
1513
1514 fn millis_until_next_timed_transition(&self) -> f64 {
1515 let mut min_wait = f64::INFINITY;
1516 let now_ms = self.elapsed_ms();
1517
1518 for s in 0..self.summary_words {
1519 let mut summary = self.enabled_word_summary[s];
1520 while summary != 0 {
1521 let local_w = summary.trailing_zeros() as usize;
1522 summary &= summary - 1;
1523 let w = (s << bitmap::WORD_SHIFT) | local_w;
1524 if w >= self.transition_words {
1525 continue;
1526 }
1527 let mut word = self.enabled_bitmap[w];
1528 while word != 0 {
1529 let bit = word.trailing_zeros() as usize;
1530 let tid = (w << bitmap::WORD_SHIFT) | bit;
1531 word &= word - 1;
1532
1533 let elapsed = now_ms - self.enabled_at_ms[tid];
1534 let remaining_earliest = self.program.earliest_ms[tid] - elapsed;
1535 if remaining_earliest <= 0.0 {
1536 return 0.0;
1537 }
1538 min_wait = min_wait.min(remaining_earliest);
1539
1540 if self.program.has_deadline[tid] {
1541 let remaining_deadline = self.program.latest_ms[tid] - elapsed;
1542 if remaining_deadline <= 0.0 {
1543 return 0.0;
1544 }
1545 min_wait = min_wait.min(remaining_deadline);
1546 }
1547 }
1548 }
1549 }
1550
1551 min_wait
1552 }
1553}
1554
1555fn grow_ring_static(
1558 token_pool: &mut Vec<Option<ErasedToken>>,
1559 place_offset: &mut [usize],
1560 ring_head: &mut [usize],
1561 ring_tail: &mut [usize],
1562 ring_capacity: &mut [usize],
1563 token_counts: &[usize],
1564 pid: usize,
1565) {
1566 let old_cap = ring_capacity[pid];
1567 let new_cap = old_cap * 2;
1568 let old_offset = place_offset[pid];
1569 let head = ring_head[pid];
1570 let count = token_counts[pid];
1571
1572 let new_offset = token_pool.len();
1574 token_pool.resize_with(new_offset + new_cap, || None);
1575
1576 for i in 0..count {
1578 let old_idx = old_offset + (head + i) % old_cap;
1579 token_pool[new_offset + i] = token_pool[old_idx].take();
1580 }
1581
1582 place_offset[pid] = new_offset;
1583 ring_head[pid] = 0;
1584 ring_tail[pid] = count;
1585 ring_capacity[pid] = new_cap;
1586}
1587
1588fn now_millis() -> u64 {
1589 std::time::SystemTime::now()
1590 .duration_since(std::time::UNIX_EPOCH)
1591 .unwrap_or_default()
1592 .as_millis() as u64
1593}
1594
1595#[cfg(test)]
1596mod tests {
1597 use super::*;
1598 use crate::compiled_net::CompiledNet;
1599 use libpetri_core::action::{fork, passthrough, sync_action};
1600 use libpetri_core::input::one;
1601 use libpetri_core::output::out_place;
1602 use libpetri_core::petri_net::PetriNet;
1603 use libpetri_core::place::Place;
1604 use libpetri_core::token::Token;
1605 use libpetri_core::transition::Transition;
1606 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1607
1608 fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1609 let p1 = Place::<i32>::new("p1");
1610 let p2 = Place::<i32>::new("p2");
1611 let p3 = Place::<i32>::new("p3");
1612
1613 let t1 = Transition::builder("t1")
1614 .input(one(&p1))
1615 .output(out_place(&p2))
1616 .action(passthrough())
1617 .build();
1618 let t2 = Transition::builder("t2")
1619 .input(one(&p2))
1620 .output(out_place(&p3))
1621 .action(passthrough())
1622 .build();
1623
1624 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1625 (net, p1, p2, p3)
1626 }
1627
1628 #[test]
1629 fn sync_passthrough_chain() {
1630 let (net, p1, _p2, _p3) = simple_chain();
1631 let compiled = CompiledNet::compile(&net);
1632 let prog = PrecompiledNet::from_compiled(&compiled);
1633
1634 let mut marking = Marking::new();
1635 marking.add(&p1, Token::at(42, 0));
1636
1637 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1638 let result = executor.run_to_completion();
1639
1640 assert_eq!(result.count("p1"), 0);
1641 }
1642
1643 #[test]
1644 fn sync_fork_chain() {
1645 let p1 = Place::<i32>::new("p1");
1646 let p2 = Place::<i32>::new("p2");
1647 let p3 = Place::<i32>::new("p3");
1648
1649 let t1 = Transition::builder("t1")
1650 .input(one(&p1))
1651 .output(libpetri_core::output::and(vec![
1652 out_place(&p2),
1653 out_place(&p3),
1654 ]))
1655 .action(fork())
1656 .build();
1657
1658 let net = PetriNet::builder("fork").transition(t1).build();
1659 let compiled = CompiledNet::compile(&net);
1660 let prog = PrecompiledNet::from_compiled(&compiled);
1661
1662 let mut marking = Marking::new();
1663 marking.add(&p1, Token::at(42, 0));
1664
1665 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1666 let result = executor.run_to_completion();
1667
1668 assert_eq!(result.count("p1"), 0);
1669 assert_eq!(result.count("p2"), 1);
1670 assert_eq!(result.count("p3"), 1);
1671 }
1672
1673 #[test]
1674 fn sync_linear_chain_5() {
1675 let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1676 let transitions: Vec<Transition> = (0..5)
1677 .map(|i| {
1678 Transition::builder(format!("t{i}"))
1679 .input(one(&places[i]))
1680 .output(out_place(&places[i + 1]))
1681 .action(fork())
1682 .build()
1683 })
1684 .collect();
1685
1686 let net = PetriNet::builder("chain5").transitions(transitions).build();
1687 let compiled = CompiledNet::compile(&net);
1688 let prog = PrecompiledNet::from_compiled(&compiled);
1689
1690 let mut marking = Marking::new();
1691 marking.add(&places[0], Token::at(1, 0));
1692
1693 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1694 let result = executor.run_to_completion();
1695
1696 assert_eq!(result.count("p0"), 0);
1697 assert_eq!(result.count("p5"), 1);
1698 }
1699
1700 #[test]
1701 fn sync_no_initial_tokens() {
1702 let (net, _, _, _) = simple_chain();
1703 let compiled = CompiledNet::compile(&net);
1704 let prog = PrecompiledNet::from_compiled(&compiled);
1705 let marking = Marking::new();
1706 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1707 let result = executor.run_to_completion();
1708 assert_eq!(result.count("p1"), 0);
1709 assert_eq!(result.count("p2"), 0);
1710 assert_eq!(result.count("p3"), 0);
1711 }
1712
1713 #[test]
1714 fn sync_priority_ordering() {
1715 let p = Place::<()>::new("p");
1716 let out_a = Place::<()>::new("a");
1717 let out_b = Place::<()>::new("b");
1718
1719 let t_high = Transition::builder("t_high")
1720 .input(one(&p))
1721 .output(out_place(&out_a))
1722 .action(passthrough())
1723 .priority(10)
1724 .build();
1725 let t_low = Transition::builder("t_low")
1726 .input(one(&p))
1727 .output(out_place(&out_b))
1728 .action(passthrough())
1729 .priority(1)
1730 .build();
1731
1732 let net = PetriNet::builder("priority")
1733 .transitions([t_high, t_low])
1734 .build();
1735 let compiled = CompiledNet::compile(&net);
1736 let prog = PrecompiledNet::from_compiled(&compiled);
1737
1738 let mut marking = Marking::new();
1739 marking.add(&p, Token::at((), 0));
1740
1741 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1742 let result = executor.run_to_completion();
1743
1744 assert_eq!(result.count("p"), 0);
1745 }
1746
1747 #[test]
1748 fn sync_inhibitor_blocks() {
1749 let p1 = Place::<()>::new("p1");
1750 let p2 = Place::<()>::new("p2");
1751 let p_inh = Place::<()>::new("inh");
1752
1753 let t = Transition::builder("t1")
1754 .input(one(&p1))
1755 .output(out_place(&p2))
1756 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1757 .action(passthrough())
1758 .build();
1759
1760 let net = PetriNet::builder("inhibitor").transition(t).build();
1761 let compiled = CompiledNet::compile(&net);
1762 let prog = PrecompiledNet::from_compiled(&compiled);
1763
1764 let mut marking = Marking::new();
1765 marking.add(&p1, Token::at((), 0));
1766 marking.add(&p_inh, Token::at((), 0));
1767
1768 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1769 let result = executor.run_to_completion();
1770
1771 assert_eq!(result.count("p1"), 1);
1772 }
1773
1774 #[test]
1775 fn read_arc_does_not_consume() {
1776 let p_in = Place::<i32>::new("in");
1777 let p_ctx = Place::<i32>::new("ctx");
1778 let p_out = Place::<i32>::new("out");
1779
1780 let t = Transition::builder("t1")
1781 .input(one(&p_in))
1782 .read(libpetri_core::arc::read(&p_ctx))
1783 .output(out_place(&p_out))
1784 .action(sync_action(|ctx| {
1785 let v = ctx.input::<i32>("in")?;
1786 let r = ctx.read::<i32>("ctx")?;
1787 ctx.output("out", *v + *r)?;
1788 Ok(())
1789 }))
1790 .build();
1791 let net = PetriNet::builder("test").transition(t).build();
1792 let compiled = CompiledNet::compile(&net);
1793 let prog = PrecompiledNet::from_compiled(&compiled);
1794
1795 let mut marking = Marking::new();
1796 marking.add(&p_in, Token::at(10, 0));
1797 marking.add(&p_ctx, Token::at(5, 0));
1798
1799 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1800 let result = executor.run_to_completion();
1801
1802 assert_eq!(result.count("in"), 0);
1803 assert_eq!(result.count("ctx"), 1);
1804 assert_eq!(result.count("out"), 1);
1805 }
1806
1807 #[test]
1808 fn reset_arc_removes_all_tokens() {
1809 let p_in = Place::<()>::new("in");
1810 let p_reset = Place::<i32>::new("reset");
1811 let p_out = Place::<()>::new("out");
1812
1813 let t = Transition::builder("t1")
1814 .input(one(&p_in))
1815 .reset(libpetri_core::arc::reset(&p_reset))
1816 .output(out_place(&p_out))
1817 .action(fork())
1818 .build();
1819 let net = PetriNet::builder("test").transition(t).build();
1820 let compiled = CompiledNet::compile(&net);
1821 let prog = PrecompiledNet::from_compiled(&compiled);
1822
1823 let mut marking = Marking::new();
1824 marking.add(&p_in, Token::at((), 0));
1825 marking.add(&p_reset, Token::at(1, 0));
1826 marking.add(&p_reset, Token::at(2, 0));
1827 marking.add(&p_reset, Token::at(3, 0));
1828
1829 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1830 let result = executor.run_to_completion();
1831
1832 assert_eq!(result.count("reset"), 0);
1833 assert_eq!(result.count("out"), 1);
1834 }
1835
1836 #[test]
1837 fn exactly_cardinality_consumes_n() {
1838 let p = Place::<i32>::new("p");
1839 let p_out = Place::<i32>::new("out");
1840
1841 let t = Transition::builder("t1")
1842 .input(libpetri_core::input::exactly(3, &p))
1843 .output(out_place(&p_out))
1844 .action(sync_action(|ctx| {
1845 let vals = ctx.inputs::<i32>("p")?;
1846 for v in vals {
1847 ctx.output("out", *v)?;
1848 }
1849 Ok(())
1850 }))
1851 .build();
1852 let net = PetriNet::builder("test").transition(t).build();
1853 let compiled = CompiledNet::compile(&net);
1854 let prog = PrecompiledNet::from_compiled(&compiled);
1855
1856 let mut marking = Marking::new();
1857 for i in 0..5 {
1858 marking.add(&p, Token::at(i, 0));
1859 }
1860
1861 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1862 let result = executor.run_to_completion();
1863
1864 assert_eq!(result.count("p"), 2);
1865 assert_eq!(result.count("out"), 3);
1866 }
1867
1868 #[test]
1869 fn all_cardinality_consumes_everything() {
1870 let p = Place::<i32>::new("p");
1871 let p_out = Place::<()>::new("out");
1872
1873 let t = Transition::builder("t1")
1874 .input(libpetri_core::input::all(&p))
1875 .output(out_place(&p_out))
1876 .action(sync_action(|ctx| {
1877 let vals = ctx.inputs::<i32>("p")?;
1878 ctx.output("out", vals.len() as i32)?;
1879 Ok(())
1880 }))
1881 .build();
1882 let net = PetriNet::builder("test").transition(t).build();
1883 let compiled = CompiledNet::compile(&net);
1884 let prog = PrecompiledNet::from_compiled(&compiled);
1885
1886 let mut marking = Marking::new();
1887 for i in 0..5 {
1888 marking.add(&p, Token::at(i, 0));
1889 }
1890
1891 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1892 let result = executor.run_to_completion();
1893
1894 assert_eq!(result.count("p"), 0);
1895 }
1896
1897 #[test]
1898 fn at_least_blocks_insufficient() {
1899 let p = Place::<i32>::new("p");
1900 let p_out = Place::<()>::new("out");
1901
1902 let t = Transition::builder("t1")
1903 .input(libpetri_core::input::at_least(3, &p))
1904 .output(out_place(&p_out))
1905 .action(passthrough())
1906 .build();
1907 let net = PetriNet::builder("test").transition(t).build();
1908 let compiled = CompiledNet::compile(&net);
1909 let prog = PrecompiledNet::from_compiled(&compiled);
1910
1911 let mut marking = Marking::new();
1912 marking.add(&p, Token::at(1, 0));
1913 marking.add(&p, Token::at(2, 0));
1914
1915 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1916 let result = executor.run_to_completion();
1917
1918 assert_eq!(result.count("p"), 2);
1919 }
1920
1921 #[test]
1922 fn at_least_fires_with_enough() {
1923 let p = Place::<i32>::new("p");
1924 let p_out = Place::<()>::new("out");
1925
1926 let t = Transition::builder("t1")
1927 .input(libpetri_core::input::at_least(3, &p))
1928 .output(out_place(&p_out))
1929 .action(passthrough())
1930 .build();
1931 let net = PetriNet::builder("test").transition(t).build();
1932 let compiled = CompiledNet::compile(&net);
1933 let prog = PrecompiledNet::from_compiled(&compiled);
1934
1935 let mut marking = Marking::new();
1936 for i in 0..5 {
1937 marking.add(&p, Token::at(i, 0));
1938 }
1939
1940 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1941 let result = executor.run_to_completion();
1942
1943 assert_eq!(result.count("p"), 0);
1944 }
1945
1946 #[test]
1947 fn guarded_input_only_consumes_matching() {
1948 let p = Place::<i32>::new("p");
1949 let p_out = Place::<i32>::new("out");
1950
1951 let t = Transition::builder("t1")
1952 .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1953 .output(out_place(&p_out))
1954 .action(fork())
1955 .build();
1956 let net = PetriNet::builder("test").transition(t).build();
1957 let compiled = CompiledNet::compile(&net);
1958 let prog = PrecompiledNet::from_compiled(&compiled);
1959
1960 let mut marking = Marking::new();
1961 marking.add(&p, Token::at(3, 0));
1962 marking.add(&p, Token::at(10, 0));
1963
1964 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1965 let result = executor.run_to_completion();
1966
1967 assert_eq!(result.count("p"), 1);
1968 assert_eq!(result.count("out"), 1);
1969 }
1970
1971 #[test]
1972 fn guarded_input_blocks_when_no_match() {
1973 let p = Place::<i32>::new("p");
1974 let p_out = Place::<i32>::new("out");
1975
1976 let t = Transition::builder("t1")
1977 .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
1978 .output(out_place(&p_out))
1979 .action(fork())
1980 .build();
1981 let net = PetriNet::builder("test").transition(t).build();
1982 let compiled = CompiledNet::compile(&net);
1983 let prog = PrecompiledNet::from_compiled(&compiled);
1984
1985 let mut marking = Marking::new();
1986 marking.add(&p, Token::at(3, 0));
1987 marking.add(&p, Token::at(10, 0));
1988
1989 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
1990 let result = executor.run_to_completion();
1991
1992 assert_eq!(result.count("p"), 2);
1993 assert_eq!(result.count("out"), 0);
1994 }
1995
1996 #[test]
1997 fn event_store_records_lifecycle() {
1998 let p1 = Place::<i32>::new("p1");
1999 let p2 = Place::<i32>::new("p2");
2000 let t = Transition::builder("t1")
2001 .input(one(&p1))
2002 .output(out_place(&p2))
2003 .action(fork())
2004 .build();
2005 let net = PetriNet::builder("test").transition(t).build();
2006 let compiled = CompiledNet::compile(&net);
2007 let prog = PrecompiledNet::from_compiled(&compiled);
2008
2009 let mut marking = Marking::new();
2010 marking.add(&p1, Token::at(1, 0));
2011
2012 let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2013 let _result = executor.run_to_completion();
2014
2015 let events = executor.event_store().events();
2016 assert!(
2017 events
2018 .iter()
2019 .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2020 );
2021 assert!(
2022 events
2023 .iter()
2024 .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
2025 );
2026 assert!(
2027 events
2028 .iter()
2029 .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2030 );
2031 assert!(
2032 events
2033 .iter()
2034 .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2035 );
2036 assert!(
2037 events
2038 .iter()
2039 .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
2040 );
2041 assert!(
2042 events
2043 .iter()
2044 .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
2045 );
2046 assert!(
2047 events
2048 .iter()
2049 .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2050 );
2051 }
2052
2053 #[test]
2054 fn action_error_does_not_crash() {
2055 let p_in = Place::<i32>::new("in");
2056 let p_out = Place::<i32>::new("out");
2057
2058 let t = Transition::builder("t1")
2059 .input(one(&p_in))
2060 .output(out_place(&p_out))
2061 .action(sync_action(|_ctx| {
2062 Err(libpetri_core::action::ActionError::new(
2063 "intentional failure",
2064 ))
2065 }))
2066 .build();
2067 let net = PetriNet::builder("test").transition(t).build();
2068 let compiled = CompiledNet::compile(&net);
2069 let prog = PrecompiledNet::from_compiled(&compiled);
2070
2071 let mut marking = Marking::new();
2072 marking.add(&p_in, Token::at(42, 0));
2073
2074 let mut executor = PrecompiledNetExecutor::<InMemoryEventStore>::new(&prog, marking);
2075 let result = executor.run_to_completion();
2076
2077 assert_eq!(result.count("in"), 0);
2078 assert_eq!(result.count("out"), 0);
2079
2080 let events = executor.event_store().events();
2081 assert!(
2082 events
2083 .iter()
2084 .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2085 );
2086 }
2087
2088 #[test]
2089 fn multiple_input_arcs_require_all() {
2090 let p1 = Place::<i32>::new("p1");
2091 let p2 = Place::<i32>::new("p2");
2092 let p3 = Place::<i32>::new("p3");
2093
2094 let t = Transition::builder("t1")
2095 .input(one(&p1))
2096 .input(one(&p2))
2097 .output(out_place(&p3))
2098 .action(sync_action(|ctx| {
2099 ctx.output("p3", 99i32)?;
2100 Ok(())
2101 }))
2102 .build();
2103 let net = PetriNet::builder("test").transition(t).build();
2104 let compiled = CompiledNet::compile(&net);
2105 let prog = PrecompiledNet::from_compiled(&compiled);
2106
2107 let mut marking = Marking::new();
2109 marking.add(&p1, Token::at(1, 0));
2110 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2111 let result = executor.run_to_completion();
2112 assert_eq!(result.count("p1"), 1);
2113 assert_eq!(result.count("p3"), 0);
2114
2115 let compiled2 = CompiledNet::compile(&net);
2117 let prog2 = PrecompiledNet::from_compiled(&compiled2);
2118 let mut marking2 = Marking::new();
2119 marking2.add(&p1, Token::at(1, 0));
2120 marking2.add(&p2, Token::at(2, 0));
2121 let mut executor2 = PrecompiledNetExecutor::<NoopEventStore>::new(&prog2, marking2);
2122 let result2 = executor2.run_to_completion();
2123 assert_eq!(result2.count("p1"), 0);
2124 assert_eq!(result2.count("p2"), 0);
2125 assert_eq!(result2.count("p3"), 1);
2126 }
2127
2128 #[test]
2129 fn sync_action_custom_logic() {
2130 let p_in = Place::<i32>::new("in");
2131 let p_out = Place::<String>::new("out");
2132
2133 let t = Transition::builder("t1")
2134 .input(one(&p_in))
2135 .output(out_place(&p_out))
2136 .action(sync_action(|ctx| {
2137 let v = ctx.input::<i32>("in")?;
2138 ctx.output("out", format!("value={v}"))?;
2139 Ok(())
2140 }))
2141 .build();
2142 let net = PetriNet::builder("test").transition(t).build();
2143 let compiled = CompiledNet::compile(&net);
2144 let prog = PrecompiledNet::from_compiled(&compiled);
2145
2146 let mut marking = Marking::new();
2147 marking.add(&p_in, Token::at(42, 0));
2148
2149 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2150 let result = executor.run_to_completion();
2151
2152 assert_eq!(result.count("out"), 1);
2153 }
2154
2155 #[test]
2156 fn transform_action_outputs_to_all_places() {
2157 let p_in = Place::<i32>::new("in");
2158 let p_a = Place::<i32>::new("a");
2159 let p_b = Place::<i32>::new("b");
2160
2161 let t = Transition::builder("t1")
2162 .input(one(&p_in))
2163 .output(libpetri_core::output::and(vec![
2164 out_place(&p_a),
2165 out_place(&p_b),
2166 ]))
2167 .action(libpetri_core::action::transform(|ctx| {
2168 let v = ctx.input::<i32>("in").unwrap();
2169 Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
2170 }))
2171 .build();
2172 let net = PetriNet::builder("test").transition(t).build();
2173 let compiled = CompiledNet::compile(&net);
2174 let prog = PrecompiledNet::from_compiled(&compiled);
2175
2176 let mut marking = Marking::new();
2177 marking.add(&p_in, Token::at(5, 0));
2178
2179 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2180 let result = executor.run_to_completion();
2181
2182 assert_eq!(result.count("a"), 1);
2183 assert_eq!(result.count("b"), 1);
2184 }
2185
2186 #[test]
2187 fn complex_workflow() {
2188 use libpetri_core::output::{and, xor};
2189
2190 let input = Place::<i32>::new("v_input");
2191 let guard_in = Place::<i32>::new("v_guardIn");
2192 let intent_in = Place::<i32>::new("v_intentIn");
2193 let search_in = Place::<i32>::new("v_searchIn");
2194 let output_guard_in = Place::<i32>::new("v_outputGuardIn");
2195 let guard_safe = Place::<i32>::new("v_guardSafe");
2196 let guard_violation = Place::<i32>::new("v_guardViolation");
2197 let _violated = Place::<i32>::new("v_violated");
2198 let intent_ready = Place::<i32>::new("v_intentReady");
2199 let topic_ready = Place::<i32>::new("v_topicReady");
2200 let search_ready = Place::<i32>::new("v_searchReady");
2201 let _output_guard_done = Place::<i32>::new("v_outputGuardDone");
2202 let response = Place::<i32>::new("v_response");
2203
2204 let fork_trans = Transition::builder("Fork")
2205 .input(one(&input))
2206 .output(and(vec![
2207 out_place(&guard_in),
2208 out_place(&intent_in),
2209 out_place(&search_in),
2210 out_place(&output_guard_in),
2211 ]))
2212 .action(fork())
2213 .build();
2214
2215 let guard_trans = Transition::builder("Guard")
2216 .input(one(&guard_in))
2217 .output(xor(vec![
2218 out_place(&guard_safe),
2219 out_place(&guard_violation),
2220 ]))
2221 .action(fork())
2222 .build();
2223
2224 let handle_violation = Transition::builder("HandleViolation")
2225 .input(one(&guard_violation))
2226 .output(out_place(&_violated))
2227 .inhibitor(libpetri_core::arc::inhibitor(&guard_safe))
2228 .action(fork())
2229 .build();
2230
2231 let intent_trans = Transition::builder("Intent")
2232 .input(one(&intent_in))
2233 .output(out_place(&intent_ready))
2234 .action(fork())
2235 .build();
2236
2237 let topic_trans = Transition::builder("TopicKnowledge")
2238 .input(one(&intent_ready))
2239 .output(out_place(&topic_ready))
2240 .action(fork())
2241 .build();
2242
2243 let search_trans = Transition::builder("Search")
2244 .input(one(&search_in))
2245 .output(out_place(&search_ready))
2246 .read(libpetri_core::arc::read(&intent_ready))
2247 .inhibitor(libpetri_core::arc::inhibitor(&guard_violation))
2248 .priority(-5)
2249 .action(fork())
2250 .build();
2251
2252 let output_guard_trans = Transition::builder("OutputGuard")
2253 .input(one(&output_guard_in))
2254 .output(out_place(&_output_guard_done))
2255 .read(libpetri_core::arc::read(&guard_safe))
2256 .action(fork())
2257 .build();
2258
2259 let compose_trans = Transition::builder("Compose")
2260 .input(one(&guard_safe))
2261 .input(one(&search_ready))
2262 .input(one(&topic_ready))
2263 .output(out_place(&response))
2264 .priority(10)
2265 .action(fork())
2266 .build();
2267
2268 let net = PetriNet::builder("ComplexWorkflow")
2269 .transition(fork_trans)
2270 .transition(guard_trans)
2271 .transition(handle_violation)
2272 .transition(intent_trans)
2273 .transition(topic_trans)
2274 .transition(search_trans)
2275 .transition(output_guard_trans)
2276 .transition(compose_trans)
2277 .build();
2278
2279 let compiled = CompiledNet::compile(&net);
2280 let prog = PrecompiledNet::from_compiled(&compiled);
2281
2282 let mut marking = Marking::new();
2283 marking.add(&input, Token::at(1, 0));
2284
2285 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2286 let result = executor.run_to_completion();
2287
2288 assert_eq!(result.count("v_input"), 0); }
2294
2295 #[cfg(feature = "tokio")]
2296 mod async_tests {
2297 use super::*;
2298 use libpetri_core::action::async_action;
2299 use libpetri_core::petri_net::PetriNet;
2300
2301 #[tokio::test]
2302 async fn async_linear_chain() {
2303 let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
2304 let transitions: Vec<Transition> = (0..5)
2305 .map(|i| {
2306 Transition::builder(format!("t{i}"))
2307 .input(one(&places[i]))
2308 .output(out_place(&places[i + 1]))
2309 .action(fork())
2310 .build()
2311 })
2312 .collect();
2313
2314 let net = PetriNet::builder("chain5").transitions(transitions).build();
2315 let compiled = CompiledNet::compile(&net);
2316 let prog = PrecompiledNet::from_compiled(&compiled);
2317
2318 let mut marking = Marking::new();
2319 marking.add(&places[0], Token::at(1, 0));
2320
2321 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2322 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2323 let result = executor.run_async(rx).await;
2324
2325 assert_eq!(result.count("p0"), 0);
2326 assert_eq!(result.count("p5"), 1);
2327 }
2328
2329 #[tokio::test]
2330 async fn async_action_execution() {
2331 let p1 = Place::<i32>::new("p1");
2332 let p2 = Place::<i32>::new("p2");
2333
2334 let t = Transition::builder("t1")
2335 .input(one(&p1))
2336 .output(out_place(&p2))
2337 .action(async_action(|ctx| async { Ok(ctx) }))
2338 .build();
2339
2340 let net = PetriNet::builder("async_test").transition(t).build();
2341 let compiled = CompiledNet::compile(&net);
2342 let prog = PrecompiledNet::from_compiled(&compiled);
2343
2344 let mut marking = Marking::new();
2345 marking.add(&p1, Token::at(42, 0));
2346
2347 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2348 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2349 let result = executor.run_async(rx).await;
2350
2351 assert_eq!(result.count("p1"), 0);
2352 }
2353 }
2354}