1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::token::ErasedToken;
8
9use libpetri_event::event_store::EventStore;
10use libpetri_event::net_event::NetEvent;
11
12use crate::bitmap;
13use crate::marking::Marking;
14use crate::precompiled_net::{
15 CONSUME_ALL, CONSUME_ATLEAST, CONSUME_N, CONSUME_ONE, PrecompiledNet, RESET,
16};
17
/// Slack, in milliseconds, added on top of a transition's `latest_ms` before
/// `enforce_deadlines` treats the transition as timed out.
const DEADLINE_TOLERANCE_MS: f64 = 5.0;

/// Number of token slots reserved for each place's ring buffer when the
/// executor is constructed; rings are grown on demand once they fill up.
const INITIAL_RING_CAPACITY: usize = 4;
23
/// Executor that runs a [`PrecompiledNet`] using flat per-place token ring
/// buffers and bitmaps for marking, enabled, and dirty tracking.
pub struct PrecompiledNetExecutor<'a, E: EventStore> {
    /// The precompiled net being executed.
    program: &'a PrecompiledNet<'a>,
    /// Sink for `NetEvent`s; only appended to when `E::ENABLED` is true.
    event_store: E,
    /// Place names supplied through the builder; not read by the code visible here.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// Cached `!environment_places.is_empty()`; consulted by the termination checks.
    has_environment_places: bool,
    /// Flag supplied through the builder; not read by the code visible here.
    #[allow(dead_code)]
    skip_output_validation: bool,

    /// Backing storage shared by all per-place ring buffers.
    token_pool: Vec<Option<ErasedToken>>,
    /// Start index of each place's ring inside `token_pool`.
    place_offset: Vec<usize>,
    /// Number of tokens currently held by each place.
    token_counts: Vec<usize>,
    /// Ring-relative index of the oldest token of each place.
    ring_head: Vec<usize>,
    /// Ring-relative index where the next token of each place is written.
    ring_tail: Vec<usize>,
    /// Current slot count of each place's ring.
    ring_capacity: Vec<usize>,

    /// One bit per place, set while the place holds at least one token.
    marking_bitmap: Vec<u64>,

    /// One bit per transition, set while the transition is enabled.
    enabled_bitmap: Vec<u64>,
    /// One bit per transition whose enablement must be re-evaluated.
    dirty_bitmap: Vec<u64>,
    /// Scratch copy of the dirty words taken by `update_dirty_transitions`.
    dirty_scan_buffer: Vec<u64>,
    /// Elapsed-time stamp (ms since `start_time`) at which each transition was
    /// enabled; `f64::NEG_INFINITY` while it is not enabled.
    enabled_at_ms: Vec<f64>,
    /// Number of bits currently set in `enabled_bitmap`.
    enabled_transition_count: usize,

    /// One bit per word of `dirty_bitmap`, set when that word has been marked.
    dirty_word_summary: Vec<u64>,
    /// One bit per word of `enabled_bitmap`, set while that word is non-zero.
    enabled_word_summary: Vec<u64>,
    /// Number of `u64` words in the per-transition bitmaps.
    transition_words: usize,
    /// Number of `u64` words in the summary bitmaps.
    summary_words: usize,

    /// One circular queue of transition ids per distinct priority index.
    ready_queues: Vec<Vec<usize>>,
    /// Read position of each ready queue.
    ready_queue_head: Vec<usize>,
    /// Write position of each ready queue.
    ready_queue_tail: Vec<usize>,
    /// Number of queued transition ids in each ready queue.
    ready_queue_size: Vec<usize>,

    /// One bit per place, set when a reset arc emptied the place since the last
    /// call to `update_dirty_transitions`.
    pending_reset_words: Vec<u64>,
    /// True when any bit of `pending_reset_words` may be set.
    has_pending_resets: bool,

    /// Consumed-token map handed to the action's context and taken back afterwards.
    reusable_inputs: HashMap<Arc<str>, Vec<ErasedToken>>,
    /// Read-token map handed to the action's context and taken back afterwards.
    reusable_reads: HashMap<Arc<str>, Vec<ErasedToken>>,

    /// Instant the executor was constructed; origin for `elapsed_ms`.
    start_time: Instant,
}
80
/// Builder that collects the optional settings of a [`PrecompiledNetExecutor`].
pub struct PrecompiledExecutorBuilder<'a, E: EventStore> {
    /// Net the executor will run.
    program: &'a PrecompiledNet<'a>,
    /// Tokens loaded into the place rings at construction time.
    initial_marking: Marking,
    /// Explicit event store; `None` falls back to the store's default in `build`.
    event_store: Option<E>,
    /// Place names forwarded to the executor as its environment places.
    environment_places: HashSet<Arc<str>>,
    /// Flag forwarded unchanged to the executor.
    skip_output_validation: bool,
}
89
90impl<'a, E: EventStore> PrecompiledExecutorBuilder<'a, E> {
91 pub fn event_store(mut self, store: E) -> Self {
93 self.event_store = Some(store);
94 self
95 }
96
97 pub fn environment_places(mut self, places: HashSet<Arc<str>>) -> Self {
99 self.environment_places = places;
100 self
101 }
102
103 pub fn skip_output_validation(mut self, skip: bool) -> Self {
105 self.skip_output_validation = skip;
106 self
107 }
108
109 pub fn build(self) -> PrecompiledNetExecutor<'a, E> {
111 PrecompiledNetExecutor::new_inner(
112 self.program,
113 self.initial_marking,
114 self.event_store.unwrap_or_default(),
115 self.environment_places,
116 self.skip_output_validation,
117 )
118 }
119}
120
121impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
122 pub fn builder(
124 program: &'a PrecompiledNet<'a>,
125 initial_marking: Marking,
126 ) -> PrecompiledExecutorBuilder<'a, E> {
127 PrecompiledExecutorBuilder {
128 program,
129 initial_marking,
130 event_store: None,
131 environment_places: HashSet::new(),
132 skip_output_validation: false,
133 }
134 }
135
136 pub fn new(program: &'a PrecompiledNet<'a>, initial_marking: Marking) -> Self {
138 Self::new_inner(
139 program,
140 initial_marking,
141 E::default(),
142 HashSet::new(),
143 false,
144 )
145 }
146
147 fn new_inner(
148 program: &'a PrecompiledNet<'a>,
149 initial_marking: Marking,
150 event_store: E,
151 environment_places: HashSet<Arc<str>>,
152 skip_output_validation: bool,
153 ) -> Self {
154 let pc = program.place_count();
155 let tc = program.transition_count();
156 let wc = program.word_count();
157
158 let total_slots = pc * INITIAL_RING_CAPACITY;
160 let mut token_pool = vec![None; total_slots];
161 let mut place_offset = vec![0usize; pc];
162 let mut token_counts = vec![0usize; pc];
163 let mut ring_head = vec![0usize; pc];
164 let mut ring_tail = vec![0usize; pc];
165 let mut ring_capacity = vec![INITIAL_RING_CAPACITY; pc];
166
167 for (pid, offset) in place_offset.iter_mut().enumerate() {
168 *offset = pid * INITIAL_RING_CAPACITY;
169 }
170
171 for pid in 0..pc {
173 let place = program.place(pid);
174 if let Some(queue) = initial_marking.queue(place.name()) {
175 for token in queue {
176 if token_counts[pid] == ring_capacity[pid] {
178 grow_ring_static(
179 &mut token_pool,
180 &mut place_offset,
181 &mut ring_head,
182 &mut ring_tail,
183 &mut ring_capacity,
184 &token_counts,
185 pid,
186 );
187 }
188 let tail = ring_tail[pid];
189 let offset = place_offset[pid];
190 token_pool[offset + tail] = Some(token.clone());
191 ring_tail[pid] = (tail + 1) % ring_capacity[pid];
192 token_counts[pid] += 1;
193 }
194 }
195 }
196
197 let transition_words = bitmap::word_count(tc);
199 let summary_words = bitmap::word_count(transition_words);
200
201 let prio_count = program.distinct_priority_count;
203 let queue_cap = tc.max(4);
204 let ready_queues = vec![vec![0usize; queue_cap]; prio_count];
205 let ready_queue_head = vec![0usize; prio_count];
206 let ready_queue_tail = vec![0usize; prio_count];
207 let ready_queue_size = vec![0usize; prio_count];
208
209 Self {
210 program,
211 event_store,
212 has_environment_places: !environment_places.is_empty(),
213 environment_places,
214 skip_output_validation,
215 token_pool,
216 place_offset,
217 token_counts,
218 ring_head,
219 ring_tail,
220 ring_capacity,
221 marking_bitmap: vec![0u64; wc],
222 enabled_bitmap: vec![0u64; transition_words],
223 dirty_bitmap: vec![0u64; transition_words],
224 dirty_scan_buffer: vec![0u64; transition_words],
225 enabled_at_ms: vec![f64::NEG_INFINITY; tc],
226 enabled_transition_count: 0,
227 dirty_word_summary: vec![0u64; summary_words],
228 enabled_word_summary: vec![0u64; summary_words],
229 transition_words,
230 summary_words,
231 ready_queues,
232 ready_queue_head,
233 ready_queue_tail,
234 ready_queue_size,
235 pending_reset_words: vec![0u64; wc],
236 has_pending_resets: false,
237 reusable_inputs: HashMap::new(),
238 reusable_reads: HashMap::new(),
239 start_time: Instant::now(),
240 }
241 }
242
    /// Runs the net synchronously by delegating to `run_to_completion` and
    /// returns the marking it produces.
    pub fn run_sync(&mut self) -> Marking {
        self.run_to_completion()
    }
250
    /// Returns a snapshot of the current marking, built by cloning the tokens
    /// held in the place rings.
    pub fn marking(&self) -> Marking {
        self.materialize_marking()
    }
255
    /// Borrows the event store the executor appends to.
    pub fn event_store(&self) -> &E {
        &self.event_store
    }
260
    /// Returns `true` when no transition is currently recorded as enabled.
    pub fn is_quiescent(&self) -> bool {
        self.enabled_transition_count == 0
    }
265
266 #[inline]
269 fn ring_remove_first(&mut self, pid: usize) -> ErasedToken {
270 let head = self.ring_head[pid];
271 let offset = self.place_offset[pid];
272 let token = self.token_pool[offset + head].take().unwrap();
273 self.ring_head[pid] = (head + 1) % self.ring_capacity[pid];
274 self.token_counts[pid] -= 1;
275 token
276 }
277
278 #[inline]
279 fn ring_add_last(&mut self, pid: usize, token: ErasedToken) {
280 if self.token_counts[pid] == self.ring_capacity[pid] {
281 self.grow_ring(pid);
282 }
283 let tail = self.ring_tail[pid];
284 let offset = self.place_offset[pid];
285 self.token_pool[offset + tail] = Some(token);
286 self.ring_tail[pid] = (tail + 1) % self.ring_capacity[pid];
287 self.token_counts[pid] += 1;
288 }
289
290 #[inline]
291 fn ring_peek_first(&self, pid: usize) -> Option<&ErasedToken> {
292 if self.token_counts[pid] == 0 {
293 return None;
294 }
295 self.token_pool[self.place_offset[pid] + self.ring_head[pid]].as_ref()
296 }
297
298 fn ring_remove_all(&mut self, pid: usize) -> Vec<ErasedToken> {
299 let count = self.token_counts[pid];
300 if count == 0 {
301 return Vec::new();
302 }
303 let mut result = Vec::with_capacity(count);
304 for _ in 0..count {
305 result.push(self.ring_remove_first(pid));
306 }
307 result
308 }
309
    /// Grows the ring of place `pid` by forwarding the executor's ring state
    /// to `grow_ring_static`.
    fn grow_ring(&mut self, pid: usize) {
        grow_ring_static(
            &mut self.token_pool,
            &mut self.place_offset,
            &mut self.ring_head,
            &mut self.ring_tail,
            &mut self.ring_capacity,
            &self.token_counts,
            pid,
        );
    }
321
322 #[inline]
325 fn set_enabled_bit(&mut self, tid: usize) {
326 let w = tid >> bitmap::WORD_SHIFT;
327 self.enabled_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
328 self.enabled_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
329 }
330
331 #[inline]
332 fn clear_enabled_bit(&mut self, tid: usize) {
333 let w = tid >> bitmap::WORD_SHIFT;
334 self.enabled_bitmap[w] &= !(1u64 << (tid & bitmap::WORD_MASK));
335 if self.enabled_bitmap[w] == 0 {
336 self.enabled_word_summary[w >> bitmap::WORD_SHIFT] &=
337 !(1u64 << (w & bitmap::WORD_MASK));
338 }
339 }
340
341 #[inline]
342 fn is_enabled(&self, tid: usize) -> bool {
343 (self.enabled_bitmap[tid >> bitmap::WORD_SHIFT] & (1u64 << (tid & bitmap::WORD_MASK))) != 0
344 }
345
    /// Sets the marking-bitmap bit of place `pid`.
    #[inline]
    fn set_marking_bit(&mut self, pid: usize) {
        bitmap::set_bit(&mut self.marking_bitmap, pid);
    }
350
    /// Clears the marking-bitmap bit of place `pid`.
    #[inline]
    fn clear_marking_bit(&mut self, pid: usize) {
        bitmap::clear_bit(&mut self.marking_bitmap, pid);
    }
355
356 fn ready_queue_push(&mut self, tid: usize) {
359 let pi = self.program.transition_to_priority_index[tid];
360 if self.ready_queue_size[pi] == self.ready_queues[pi].len() {
361 let old_cap = self.ready_queues[pi].len();
362 let new_cap = old_cap * 2;
363 let mut new_queue = vec![0usize; new_cap];
364 let head = self.ready_queue_head[pi];
365 for (i, slot) in new_queue.iter_mut().enumerate().take(old_cap) {
366 *slot = self.ready_queues[pi][(head + i) % old_cap];
367 }
368 self.ready_queues[pi] = new_queue;
369 self.ready_queue_head[pi] = 0;
370 self.ready_queue_tail[pi] = old_cap;
371 }
372 let tail = self.ready_queue_tail[pi];
373 self.ready_queues[pi][tail] = tid;
374 self.ready_queue_tail[pi] = (tail + 1) % self.ready_queues[pi].len();
375 self.ready_queue_size[pi] += 1;
376 }
377
378 fn ready_queue_pop(&mut self, pi: usize) -> usize {
379 let head = self.ready_queue_head[pi];
380 let tid = self.ready_queues[pi][head];
381 self.ready_queue_head[pi] = (head + 1) % self.ready_queues[pi].len();
382 self.ready_queue_size[pi] -= 1;
383 tid
384 }
385
386 fn clear_all_ready_queues(&mut self) {
387 for pi in 0..self.program.distinct_priority_count {
388 self.ready_queue_head[pi] = 0;
389 self.ready_queue_tail[pi] = 0;
390 self.ready_queue_size[pi] = 0;
391 }
392 }
393
394 fn initialize_marking_bitmap(&mut self) {
397 for pid in 0..self.program.place_count() {
398 if self.token_counts[pid] > 0 {
399 self.set_marking_bit(pid);
400 }
401 }
402 }
403
404 fn mark_all_dirty(&mut self) {
405 let tc = self.program.transition_count();
406 let last_word_bits = tc & bitmap::WORD_MASK;
407 for w in 0..self.transition_words.saturating_sub(1) {
408 self.dirty_bitmap[w] = u64::MAX;
409 }
410 if self.transition_words > 0 {
411 self.dirty_bitmap[self.transition_words - 1] = if last_word_bits == 0 {
412 u64::MAX
413 } else {
414 (1u64 << last_word_bits) - 1
415 };
416 }
417 for s in 0..self.summary_words {
419 let first_w = s << bitmap::WORD_SHIFT;
420 let last_w = (first_w + bitmap::WORD_MASK).min(self.transition_words.saturating_sub(1));
421 let count = last_w - first_w + 1;
422 let last_bits = count & bitmap::WORD_MASK;
423 self.dirty_word_summary[s] = if last_bits == 0 {
424 u64::MAX
425 } else {
426 (1u64 << last_bits) - 1
427 };
428 }
429 }
430
431 fn should_terminate(&self) -> bool {
432 if self.has_environment_places {
433 return false;
434 }
435 self.enabled_transition_count == 0
436 }
437
438 fn update_dirty_transitions(&mut self) {
441 let now_ms = self.elapsed_ms();
442
443 for s in 0..self.summary_words {
445 let mut summary = self.dirty_word_summary[s];
446 self.dirty_word_summary[s] = 0;
447 while summary != 0 {
448 let local_w = summary.trailing_zeros() as usize;
449 summary &= summary - 1;
450 let w = (s << bitmap::WORD_SHIFT) | local_w;
451 if w < self.transition_words {
452 self.dirty_scan_buffer[w] = self.dirty_bitmap[w];
453 self.dirty_bitmap[w] = 0;
454 }
455 }
456 }
457
458 let tc = self.program.transition_count();
460 for w in 0..self.transition_words {
461 let mut word = self.dirty_scan_buffer[w];
462 if word == 0 {
463 continue;
464 }
465 self.dirty_scan_buffer[w] = 0;
466 while word != 0 {
467 let bit = word.trailing_zeros() as usize;
468 let tid = (w << bitmap::WORD_SHIFT) | bit;
469 word &= word - 1;
470
471 if tid >= tc {
472 break;
473 }
474
475 let was_enabled = self.is_enabled(tid);
476 let can_now = self.can_enable(tid);
477
478 if can_now && !was_enabled {
479 self.set_enabled_bit(tid);
480 self.enabled_transition_count += 1;
481 self.enabled_at_ms[tid] = now_ms;
482
483 if E::ENABLED {
484 self.event_store.append(NetEvent::TransitionEnabled {
485 transition_name: Arc::clone(self.program.transition(tid).name_arc()),
486 timestamp: now_millis(),
487 });
488 }
489 } else if !can_now && was_enabled {
490 self.clear_enabled_bit(tid);
491 self.enabled_transition_count -= 1;
492 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
493 } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
494 self.enabled_at_ms[tid] = now_ms;
495 if E::ENABLED {
496 self.event_store.append(NetEvent::TransitionClockRestarted {
497 transition_name: Arc::clone(self.program.transition(tid).name_arc()),
498 timestamp: now_millis(),
499 });
500 }
501 }
502 }
503 }
504
505 self.clear_pending_resets();
506 }
507
508 fn can_enable(&self, tid: usize) -> bool {
509 if !self.program.can_enable_bitmap(tid, &self.marking_bitmap) {
510 return false;
511 }
512
513 if let Some(card_check) = self.program.compiled().cardinality_check(tid) {
515 for i in 0..card_check.place_ids.len() {
516 let pid = card_check.place_ids[i];
517 let required = card_check.required_counts[i];
518 if self.token_counts[pid] < required {
519 return false;
520 }
521 }
522 }
523
524 if self.program.compiled().has_guards(tid) {
526 let t = self.program.transition(tid);
527 for spec in t.input_specs() {
528 if let Some(guard) = spec.guard() {
529 let required = match spec {
530 In::One { .. } => 1,
531 In::Exactly { count, .. } => *count,
532 In::AtLeast { minimum, .. } => *minimum,
533 In::All { .. } => 1,
534 };
535 let pid = self.program.place_id(spec.place_name()).unwrap();
536 let count = self.count_matching_in_ring(pid, &**guard);
537 if count < required {
538 return false;
539 }
540 }
541 }
542 }
543
544 true
545 }
546
547 fn count_matching_in_ring(
548 &self,
549 pid: usize,
550 guard: &dyn Fn(&dyn std::any::Any) -> bool,
551 ) -> usize {
552 let count = self.token_counts[pid];
553 if count == 0 {
554 return 0;
555 }
556 let offset = self.place_offset[pid];
557 let head = self.ring_head[pid];
558 let cap = self.ring_capacity[pid];
559 let mut matched = 0;
560 for i in 0..count {
561 let idx = offset + (head + i) % cap;
562 if let Some(token) = &self.token_pool[idx]
563 && guard(token.value.as_ref())
564 {
565 matched += 1;
566 }
567 }
568 matched
569 }
570
571 fn has_input_from_reset_place(&self, tid: usize) -> bool {
572 if !self.has_pending_resets {
573 return false;
574 }
575 let input_mask = &self.program.input_place_mask_words[tid];
576 for (im, pr) in input_mask.iter().zip(self.pending_reset_words.iter()) {
577 if (im & pr) != 0 {
578 return true;
579 }
580 }
581 false
582 }
583
584 fn clear_pending_resets(&mut self) {
585 if self.has_pending_resets {
586 for w in &mut self.pending_reset_words {
587 *w = 0;
588 }
589 self.has_pending_resets = false;
590 }
591 }
592
593 fn enforce_deadlines(&mut self, now_ms: f64) {
596 for s in 0..self.summary_words {
597 let mut summary = self.enabled_word_summary[s];
598 while summary != 0 {
599 let local_w = summary.trailing_zeros() as usize;
600 summary &= summary - 1;
601 let w = (s << bitmap::WORD_SHIFT) | local_w;
602 if w >= self.transition_words {
603 continue;
604 }
605 let mut word = self.enabled_bitmap[w];
606 while word != 0 {
607 let bit = word.trailing_zeros() as usize;
608 let tid = (w << bitmap::WORD_SHIFT) | bit;
609 word &= word - 1;
610
611 if !self.program.has_deadline[tid] {
612 continue;
613 }
614
615 let elapsed = now_ms - self.enabled_at_ms[tid];
616 let latest_ms = self.program.latest_ms[tid];
617
618 if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
619 self.clear_enabled_bit(tid);
620 self.enabled_transition_count -= 1;
621 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
622 self.mark_transition_dirty(tid);
623
624 if E::ENABLED {
625 self.event_store.append(NetEvent::TransitionTimedOut {
626 transition_name: Arc::clone(
627 self.program.transition(tid).name_arc(),
628 ),
629 timestamp: now_millis(),
630 });
631 }
632 }
633 }
634 }
635 }
636 }
637
638 fn fire_ready_immediate_sync(&mut self) {
641 for s in 0..self.summary_words {
642 let mut summary = self.enabled_word_summary[s];
643 while summary != 0 {
644 let local_w = summary.trailing_zeros() as usize;
645 summary &= summary - 1;
646 let w = (s << bitmap::WORD_SHIFT) | local_w;
647 if w >= self.transition_words {
648 continue;
649 }
650 let word = self.enabled_bitmap[w];
651 let mut remaining = word;
652 while remaining != 0 {
653 let bit = remaining.trailing_zeros() as usize;
654 let tid = (w << bitmap::WORD_SHIFT) | bit;
655 remaining &= remaining - 1;
656
657 if self.can_enable(tid) {
658 self.fire_transition_sync(tid);
659 } else {
660 self.clear_enabled_bit(tid);
661 self.enabled_transition_count -= 1;
662 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
663 }
664 }
665 }
666 }
667 }
668
669 fn fire_ready_general_sync(&mut self, now_ms: f64) {
670 self.clear_all_ready_queues();
672
673 for s in 0..self.summary_words {
674 let mut summary = self.enabled_word_summary[s];
675 while summary != 0 {
676 let local_w = summary.trailing_zeros() as usize;
677 summary &= summary - 1;
678 let w = (s << bitmap::WORD_SHIFT) | local_w;
679 if w >= self.transition_words {
680 continue;
681 }
682 let mut word = self.enabled_bitmap[w];
683 while word != 0 {
684 let bit = word.trailing_zeros() as usize;
685 let tid = (w << bitmap::WORD_SHIFT) | bit;
686 word &= word - 1;
687
688 let enabled_ms = self.enabled_at_ms[tid];
689 let elapsed = now_ms - enabled_ms;
690
691 if self.program.earliest_ms[tid] <= elapsed {
692 self.ready_queue_push(tid);
693 }
694 }
695 }
696 }
697
698 for pi in 0..self.program.distinct_priority_count {
700 while self.ready_queue_size[pi] > 0 {
701 let tid = self.ready_queue_pop(pi);
702 if !self.is_enabled(tid) {
703 continue;
704 }
705
706 if self.can_enable(tid) {
707 self.fire_transition_sync(tid);
708 } else {
709 self.clear_enabled_bit(tid);
710 self.enabled_transition_count -= 1;
711 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
712 }
713 }
714 }
715 }
716
717 fn fire_transition_sync(&mut self, tid: usize) {
720 let has_guards = self.program.compiled().has_guards(tid);
721 let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
722 let action = Arc::clone(self.program.transition(tid).action());
723
724 self.reusable_inputs.clear();
726 self.reusable_reads.clear();
727
728 if has_guards {
729 let input_specs: Vec<In> = self.program.transition(tid).input_specs().to_vec();
731 let reset_arcs: Vec<_> = self.program.transition(tid).resets().to_vec();
732
733 for in_spec in &input_specs {
734 let pid = self.program.place_id(in_spec.place_name()).unwrap();
735 let place_name_arc = Arc::clone(&self.program.place_name_arcs[pid]);
736 let to_consume = match in_spec {
737 In::One { .. } => 1,
738 In::Exactly { count, .. } => *count,
739 In::All { guard, .. } | In::AtLeast { guard, .. } => {
740 if guard.is_some() {
741 self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
742 } else {
743 self.token_counts[pid]
744 }
745 }
746 };
747
748 for _ in 0..to_consume {
749 let token = if let Some(guard) = in_spec.guard() {
750 self.ring_remove_matching(pid, &**guard)
751 } else {
752 Some(self.ring_remove_first(pid))
753 };
754 if let Some(token) = token {
755 if E::ENABLED {
756 self.event_store.append(NetEvent::TokenRemoved {
757 place_name: Arc::clone(&place_name_arc),
758 timestamp: now_millis(),
759 });
760 }
761 self.reusable_inputs
762 .entry(Arc::clone(&place_name_arc))
763 .or_default()
764 .push(token);
765 }
766 }
767 }
768
769 for arc in &reset_arcs {
771 let pid = self.program.place_id(arc.place.name()).unwrap();
772 let removed = self.ring_remove_all(pid);
773 if E::ENABLED {
774 for _ in &removed {
775 self.event_store.append(NetEvent::TokenRemoved {
776 place_name: Arc::clone(arc.place.name_arc()),
777 timestamp: now_millis(),
778 });
779 }
780 }
781 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
782 1u64 << (pid & bitmap::WORD_MASK);
783 self.has_pending_resets = true;
784 }
785 } else {
786 let ops_len = self.program.consume_ops[tid].len();
788 let mut pc = 0;
789 while pc < ops_len {
790 let opcode = self.program.consume_ops[tid][pc];
791 pc += 1;
792 match opcode {
793 CONSUME_ONE => {
794 let pid = self.program.consume_ops[tid][pc] as usize;
795 pc += 1;
796 let token = self.ring_remove_first(pid);
797 if E::ENABLED {
798 self.event_store.append(NetEvent::TokenRemoved {
799 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
800 timestamp: now_millis(),
801 });
802 }
803 self.reusable_inputs
804 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
805 .or_default()
806 .push(token);
807 }
808 CONSUME_N => {
809 let pid = self.program.consume_ops[tid][pc] as usize;
810 pc += 1;
811 let count = self.program.consume_ops[tid][pc] as usize;
812 pc += 1;
813 for _ in 0..count {
814 let token = self.ring_remove_first(pid);
815 if E::ENABLED {
816 self.event_store.append(NetEvent::TokenRemoved {
817 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
818 timestamp: now_millis(),
819 });
820 }
821 self.reusable_inputs
822 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
823 .or_default()
824 .push(token);
825 }
826 }
827 CONSUME_ALL | CONSUME_ATLEAST => {
828 let pid = self.program.consume_ops[tid][pc] as usize;
829 pc += 1;
830 if opcode == CONSUME_ATLEAST {
831 pc += 1;
832 }
833 let count = self.token_counts[pid];
834 for _ in 0..count {
835 let token = self.ring_remove_first(pid);
836 if E::ENABLED {
837 self.event_store.append(NetEvent::TokenRemoved {
838 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
839 timestamp: now_millis(),
840 });
841 }
842 self.reusable_inputs
843 .entry(Arc::clone(&self.program.place_name_arcs[pid]))
844 .or_default()
845 .push(token);
846 }
847 }
848 RESET => {
849 let pid = self.program.consume_ops[tid][pc] as usize;
850 pc += 1;
851 let count = self.token_counts[pid];
852 for _ in 0..count {
853 let _token = self.ring_remove_first(pid);
854 if E::ENABLED {
855 self.event_store.append(NetEvent::TokenRemoved {
856 place_name: Arc::clone(&self.program.place_name_arcs[pid]),
857 timestamp: now_millis(),
858 });
859 }
860 }
861 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
862 1u64 << (pid & bitmap::WORD_MASK);
863 self.has_pending_resets = true;
864 }
865 _ => unreachable!("Unknown opcode: {opcode}"),
866 }
867 }
868 }
869
870 let read_ops_len = self.program.read_ops[tid].len();
872 for i in 0..read_ops_len {
873 let rpid = self.program.read_ops[tid][i];
874 let token_clone = self.ring_peek_first(rpid).cloned();
875 if let Some(token) = token_clone {
876 let place_name = Arc::clone(&self.program.place_name_arcs[rpid]);
877 self.reusable_reads
878 .entry(place_name)
879 .or_default()
880 .push(token);
881 }
882 }
883
884 self.update_bitmap_after_consumption(tid);
886
887 if E::ENABLED {
888 self.event_store.append(NetEvent::TransitionStarted {
889 transition_name: Arc::clone(&transition_name),
890 timestamp: now_millis(),
891 });
892 }
893
894 let inputs = std::mem::take(&mut self.reusable_inputs);
896 let reads = std::mem::take(&mut self.reusable_reads);
897 let mut ctx = TransitionContext::new(
898 Arc::clone(&transition_name),
899 inputs,
900 reads,
901 self.program.output_place_name_sets[tid].clone(),
902 None,
903 );
904
905 let result = action.run_sync(&mut ctx);
906
907 let returned_inputs = ctx.take_inputs();
909 let returned_reads = ctx.take_reads();
910
911 match result {
912 Ok(()) => {
913 let outputs = ctx.take_outputs();
914 self.process_outputs(tid, &transition_name, outputs);
915
916 if E::ENABLED {
917 self.event_store.append(NetEvent::TransitionCompleted {
918 transition_name: Arc::clone(&transition_name),
919 timestamp: now_millis(),
920 });
921 }
922 }
923 Err(err) => {
924 if E::ENABLED {
925 self.event_store.append(NetEvent::TransitionFailed {
926 transition_name: Arc::clone(&transition_name),
927 error: err.message,
928 timestamp: now_millis(),
929 });
930 }
931 }
932 }
933
934 self.reusable_inputs = returned_inputs;
936 self.reusable_reads = returned_reads;
937
938 self.clear_enabled_bit(tid);
940 self.enabled_transition_count -= 1;
941 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
942
943 self.mark_transition_dirty(tid);
945 }
946
947 fn ring_remove_matching(
950 &mut self,
951 pid: usize,
952 guard: &dyn Fn(&dyn std::any::Any) -> bool,
953 ) -> Option<ErasedToken> {
954 let count = self.token_counts[pid];
955 if count == 0 {
956 return None;
957 }
958 let offset = self.place_offset[pid];
959 let head = self.ring_head[pid];
960 let cap = self.ring_capacity[pid];
961
962 for i in 0..count {
964 let idx = offset + (head + i) % cap;
965 if let Some(token) = &self.token_pool[idx]
966 && guard(token.value.as_ref())
967 {
968 let token = self.token_pool[idx].take().unwrap();
969 for j in i..count - 1 {
971 let from = offset + (head + j + 1) % cap;
972 let to = offset + (head + j) % cap;
973 self.token_pool[to] = self.token_pool[from].take();
974 }
975 self.token_counts[pid] -= 1;
976 self.ring_tail[pid] = if self.ring_tail[pid] == 0 {
977 cap - 1
978 } else {
979 self.ring_tail[pid] - 1
980 };
981 return Some(token);
982 }
983 }
984 None
985 }
986
987 fn process_outputs(
988 &mut self,
989 _tid: usize,
990 _transition_name: &Arc<str>,
991 outputs: Vec<OutputEntry>,
992 ) {
993 for entry in outputs {
994 if let Some(pid) = self.program.place_id(&entry.place_name) {
995 self.ring_add_last(pid, entry.token);
996 self.set_marking_bit(pid);
997 self.mark_dirty(pid);
998 }
999
1000 if E::ENABLED {
1001 self.event_store.append(NetEvent::TokenAdded {
1002 place_name: Arc::clone(&entry.place_name),
1003 timestamp: now_millis(),
1004 });
1005 }
1006 }
1007 }
1008
1009 fn update_bitmap_after_consumption(&mut self, tid: usize) {
1010 let n = self.program.compiled().consumption_place_ids(tid).len();
1011 for i in 0..n {
1012 let pid = self.program.compiled().consumption_place_ids(tid)[i];
1013 if self.token_counts[pid] == 0 {
1014 self.clear_marking_bit(pid);
1015 }
1016 self.mark_dirty(pid);
1017 }
1018 }
1019
1020 fn has_dirty_bits(&self) -> bool {
1023 for &s in &self.dirty_word_summary {
1024 if s != 0 {
1025 return true;
1026 }
1027 }
1028 false
1029 }
1030
1031 fn mark_dirty(&mut self, pid: usize) {
1032 let n = self.program.compiled().affected_transitions(pid).len();
1033 for i in 0..n {
1034 let tid = self.program.compiled().affected_transitions(pid)[i];
1035 self.mark_transition_dirty(tid);
1036 }
1037 }
1038
1039 fn mark_transition_dirty(&mut self, tid: usize) {
1040 let w = tid >> bitmap::WORD_SHIFT;
1041 self.dirty_bitmap[w] |= 1u64 << (tid & bitmap::WORD_MASK);
1042 self.dirty_word_summary[w >> bitmap::WORD_SHIFT] |= 1u64 << (w & bitmap::WORD_MASK);
1043 }
1044
    /// Milliseconds elapsed since `start_time`, as a fractional `f64`.
    fn elapsed_ms(&self) -> f64 {
        self.start_time.elapsed().as_secs_f64() * 1000.0
    }
1048
1049 fn materialize_marking(&self) -> Marking {
1052 let mut marking = Marking::new();
1053 for pid in 0..self.program.place_count() {
1054 let count = self.token_counts[pid];
1055 if count == 0 {
1056 continue;
1057 }
1058 let place_name = self.program.place(pid).name_arc();
1059 let offset = self.place_offset[pid];
1060 let head = self.ring_head[pid];
1061 let cap = self.ring_capacity[pid];
1062 for i in 0..count {
1063 let idx = offset + (head + i) % cap;
1064 if let Some(token) = &self.token_pool[idx] {
1065 marking.add_erased(place_name, token.clone());
1066 }
1067 }
1068 }
1069 marking
1070 }
1071
1072 fn run_to_completion(&mut self) -> Marking {
1074 self.initialize_marking_bitmap();
1075 self.mark_all_dirty();
1076
1077 if E::ENABLED {
1078 let now = now_millis();
1079 self.event_store.append(NetEvent::ExecutionStarted {
1080 net_name: Arc::from(self.program.net().name()),
1081 timestamp: now,
1082 });
1083 }
1084
1085 loop {
1086 self.update_dirty_transitions();
1087
1088 let cycle_now = self.elapsed_ms();
1089
1090 if self.program.any_deadlines {
1091 self.enforce_deadlines(cycle_now);
1092 }
1093
1094 if self.should_terminate() {
1095 break;
1096 }
1097
1098 if self.program.all_immediate && self.program.all_same_priority {
1099 self.fire_ready_immediate_sync();
1100 } else {
1101 self.fire_ready_general_sync(cycle_now);
1102 }
1103
1104 if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
1105 break;
1106 }
1107 }
1108
1109 if E::ENABLED {
1110 let now = now_millis();
1111 self.event_store.append(NetEvent::ExecutionCompleted {
1112 net_name: Arc::from(self.program.net().name()),
1113 timestamp: now,
1114 });
1115 }
1116
1117 self.materialize_marking()
1118 }
1119}
1120
1121#[cfg(feature = "tokio")]
1123use crate::environment::ExecutorSignal;
1124
/// Outcome of one asynchronously executed action, delivered to `run_async`
/// over its completion channel.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished.
    transition_name: Arc<str>,
    /// Produced outputs on success, or the error message on failure.
    result: Result<Vec<OutputEntry>, String>,
}
1130
1131#[cfg(feature = "tokio")]
1132impl<'a, E: EventStore> PrecompiledNetExecutor<'a, E> {
1133 pub async fn run_async(
1141 &mut self,
1142 mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
1143 ) -> Marking {
1144 let (completion_tx, mut completion_rx) =
1145 tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
1146
1147 self.initialize_marking_bitmap();
1148 self.mark_all_dirty();
1149
1150 let mut in_flight_count: usize = 0;
1151 let mut signal_channel_open = true;
1152 let mut draining = false;
1153 let mut closed = false;
1154
1155 if E::ENABLED {
1156 let now = now_millis();
1157 self.event_store.append(NetEvent::ExecutionStarted {
1158 net_name: Arc::from(self.program.net().name()),
1159 timestamp: now,
1160 });
1161 }
1162
1163 loop {
1164 while let Ok(completion) = completion_rx.try_recv() {
1166 in_flight_count -= 1;
1167 match completion.result {
1168 Ok(outputs) => {
1169 self.process_outputs(0, &completion.transition_name, outputs);
1170 if E::ENABLED {
1171 self.event_store.append(NetEvent::TransitionCompleted {
1172 transition_name: Arc::clone(&completion.transition_name),
1173 timestamp: now_millis(),
1174 });
1175 }
1176 }
1177 Err(err) => {
1178 if E::ENABLED {
1179 self.event_store.append(NetEvent::TransitionFailed {
1180 transition_name: Arc::clone(&completion.transition_name),
1181 error: err,
1182 timestamp: now_millis(),
1183 });
1184 }
1185 }
1186 }
1187 }
1188
1189 while let Ok(signal) = signal_rx.try_recv() {
1191 match signal {
1192 ExecutorSignal::Event(event) if !draining => {
1193 if let Some(pid) = self.program.place_id(&event.place_name) {
1194 self.ring_add_last(pid, event.token);
1195 self.set_marking_bit(pid);
1196 self.mark_dirty(pid);
1197 }
1198 if E::ENABLED {
1199 self.event_store.append(NetEvent::TokenAdded {
1200 place_name: Arc::clone(&event.place_name),
1201 timestamp: now_millis(),
1202 });
1203 }
1204 }
1205 ExecutorSignal::Event(_) => {
1206 }
1208 ExecutorSignal::Drain => {
1209 draining = true;
1210 }
1211 ExecutorSignal::Close => {
1212 closed = true;
1213 draining = true;
1214 while signal_rx.try_recv().is_ok() {}
1219 }
1220 }
1221 }
1222
1223 self.update_dirty_transitions();
1225
1226 let cycle_now = self.elapsed_ms();
1228 if self.program.any_deadlines {
1229 self.enforce_deadlines(cycle_now);
1230 }
1231
1232 if closed && in_flight_count == 0 {
1234 break; }
1236 if draining
1237 && self.enabled_transition_count == 0
1238 && in_flight_count == 0
1239 {
1240 break; }
1242 if self.enabled_transition_count == 0
1243 && in_flight_count == 0
1244 && (!self.has_environment_places || !signal_channel_open)
1245 {
1246 break; }
1248
1249 let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
1251
1252 if fired || self.has_dirty_bits() {
1253 tokio::task::yield_now().await;
1254 continue;
1255 }
1256
1257 if in_flight_count == 0 && !self.has_environment_places {
1259 break;
1260 }
1261 if in_flight_count == 0 && (draining || !signal_channel_open) {
1262 break;
1263 }
1264
1265 let timer_ms = self.millis_until_next_timed_transition();
1266
1267 tokio::select! {
1268 Some(completion) = completion_rx.recv() => {
1269 in_flight_count -= 1;
1270 match completion.result {
1271 Ok(outputs) => {
1272 self.process_outputs(0, &completion.transition_name, outputs);
1273 if E::ENABLED {
1274 self.event_store.append(NetEvent::TransitionCompleted {
1275 transition_name: Arc::clone(&completion.transition_name),
1276 timestamp: now_millis(),
1277 });
1278 }
1279 }
1280 Err(err) => {
1281 if E::ENABLED {
1282 self.event_store.append(NetEvent::TransitionFailed {
1283 transition_name: Arc::clone(&completion.transition_name),
1284 error: err,
1285 timestamp: now_millis(),
1286 });
1287 }
1288 }
1289 }
1290 }
1291 result = signal_rx.recv(), if signal_channel_open && !closed => {
1292 match result {
1293 Some(ExecutorSignal::Event(event)) if !draining => {
1294 if let Some(pid) = self.program.place_id(&event.place_name) {
1295 self.ring_add_last(pid, event.token);
1296 self.set_marking_bit(pid);
1297 self.mark_dirty(pid);
1298 }
1299 if E::ENABLED {
1300 self.event_store.append(NetEvent::TokenAdded {
1301 place_name: Arc::clone(&event.place_name),
1302 timestamp: now_millis(),
1303 });
1304 }
1305 }
1306 Some(ExecutorSignal::Event(_)) => {
1307 }
1309 Some(ExecutorSignal::Drain) => {
1310 draining = true;
1311 }
1312 Some(ExecutorSignal::Close) => {
1313 closed = true;
1314 draining = true;
1315 while signal_rx.try_recv().is_ok() {}
1316 }
1317 None => {
1318 signal_channel_open = false;
1319 }
1320 }
1321 }
1322 _ = tokio::time::sleep(std::time::Duration::from_millis(
1323 if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
1324 )) => {}
1325 }
1326 }
1327
1328 if E::ENABLED {
1329 let now = now_millis();
1330 self.event_store.append(NetEvent::ExecutionCompleted {
1331 net_name: Arc::from(self.program.net().name()),
1332 timestamp: now,
1333 });
1334 }
1335
1336 self.materialize_marking()
1337 }
1338
1339 fn fire_ready_async(
1340 &mut self,
1341 now_ms: f64,
1342 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1343 in_flight_count: &mut usize,
1344 ) -> bool {
1345 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
1346
1347 for s in 0..self.summary_words {
1348 let mut summary = self.enabled_word_summary[s];
1349 while summary != 0 {
1350 let local_w = summary.trailing_zeros() as usize;
1351 summary &= summary - 1;
1352 let w = (s << bitmap::WORD_SHIFT) | local_w;
1353 if w >= self.transition_words {
1354 continue;
1355 }
1356 let mut word = self.enabled_bitmap[w];
1357 while word != 0 {
1358 let bit = word.trailing_zeros() as usize;
1359 let tid = (w << bitmap::WORD_SHIFT) | bit;
1360 word &= word - 1;
1361
1362 let enabled_ms = self.enabled_at_ms[tid];
1363 let elapsed = now_ms - enabled_ms;
1364 if self.program.earliest_ms[tid] <= elapsed {
1365 ready.push((tid, self.program.priorities[tid], enabled_ms));
1366 }
1367 }
1368 }
1369 }
1370
1371 if ready.is_empty() {
1372 return false;
1373 }
1374
1375 ready.sort_by(|a, b| {
1376 b.1.cmp(&a.1)
1377 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
1378 });
1379
1380 let mut fired_any = false;
1381 for (tid, _, _) in ready {
1382 if self.is_enabled(tid) && self.can_enable(tid) {
1383 self.fire_transition_async(tid, completion_tx, in_flight_count);
1384 fired_any = true;
1385 } else if self.is_enabled(tid) {
1386 self.clear_enabled_bit(tid);
1387 self.enabled_transition_count -= 1;
1388 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1389 }
1390 }
1391 fired_any
1392 }
1393
1394 fn fire_transition_async(
1395 &mut self,
1396 tid: usize,
1397 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
1398 in_flight_count: &mut usize,
1399 ) {
1400 let t = self.program.transition(tid);
1401 let transition_name = Arc::clone(&self.program.transition_name_arcs[tid]);
1402 let input_specs: Vec<In> = t.input_specs().to_vec();
1403 let read_arcs: Vec<_> = t.reads().to_vec();
1404 let reset_arcs: Vec<_> = t.resets().to_vec();
1405 let output_place_names = self.program.output_place_name_sets[tid].clone();
1406 let action = Arc::clone(t.action());
1407 let is_sync = action.is_sync();
1408
1409 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1411 for in_spec in &input_specs {
1412 let pid = self.program.place_id(in_spec.place_name()).unwrap();
1413 let to_consume = match in_spec {
1414 In::One { .. } => 1,
1415 In::Exactly { count, .. } => *count,
1416 In::All { guard, .. } | In::AtLeast { guard, .. } => {
1417 if guard.is_some() {
1418 self.count_matching_in_ring(pid, &**guard.as_ref().unwrap())
1419 } else {
1420 self.token_counts[pid]
1421 }
1422 }
1423 };
1424
1425 let place_name_arc = Arc::clone(in_spec.place().name_arc());
1426 for _ in 0..to_consume {
1427 let token = if let Some(guard) = in_spec.guard() {
1428 self.ring_remove_matching(pid, &**guard)
1429 } else {
1430 Some(self.ring_remove_first(pid))
1431 };
1432 if let Some(token) = token {
1433 if E::ENABLED {
1434 self.event_store.append(NetEvent::TokenRemoved {
1435 place_name: Arc::clone(&place_name_arc),
1436 timestamp: now_millis(),
1437 });
1438 }
1439 inputs
1440 .entry(Arc::clone(&place_name_arc))
1441 .or_default()
1442 .push(token);
1443 }
1444 }
1445 }
1446
1447 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
1449 for arc in &read_arcs {
1450 let rpid = self.program.place_id(arc.place.name()).unwrap();
1451 if let Some(token) = self.ring_peek_first(rpid) {
1452 read_tokens
1453 .entry(Arc::clone(arc.place.name_arc()))
1454 .or_default()
1455 .push(token.clone());
1456 }
1457 }
1458
1459 for arc in &reset_arcs {
1461 let pid = self.program.place_id(arc.place.name()).unwrap();
1462 let removed = self.ring_remove_all(pid);
1463 if E::ENABLED {
1464 for _ in &removed {
1465 self.event_store.append(NetEvent::TokenRemoved {
1466 place_name: Arc::clone(arc.place.name_arc()),
1467 timestamp: now_millis(),
1468 });
1469 }
1470 }
1471 self.pending_reset_words[pid >> bitmap::WORD_SHIFT] |=
1472 1u64 << (pid & bitmap::WORD_MASK);
1473 self.has_pending_resets = true;
1474 }
1475
1476 self.update_bitmap_after_consumption(tid);
1477
1478 if E::ENABLED {
1479 self.event_store.append(NetEvent::TransitionStarted {
1480 transition_name: Arc::clone(&transition_name),
1481 timestamp: now_millis(),
1482 });
1483 }
1484
1485 self.clear_enabled_bit(tid);
1487 self.enabled_transition_count -= 1;
1488 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
1489 self.mark_transition_dirty(tid);
1490
1491 if is_sync {
1492 let mut ctx = TransitionContext::new(
1493 Arc::clone(&transition_name),
1494 inputs,
1495 read_tokens,
1496 output_place_names,
1497 None,
1498 );
1499 let result = action.run_sync(&mut ctx);
1500 match result {
1501 Ok(()) => {
1502 let outputs = ctx.take_outputs();
1503 self.process_outputs(tid, &transition_name, outputs);
1504 if E::ENABLED {
1505 self.event_store.append(NetEvent::TransitionCompleted {
1506 transition_name: Arc::clone(&transition_name),
1507 timestamp: now_millis(),
1508 });
1509 }
1510 }
1511 Err(err) => {
1512 if E::ENABLED {
1513 self.event_store.append(NetEvent::TransitionFailed {
1514 transition_name: Arc::clone(&transition_name),
1515 error: err.message,
1516 timestamp: now_millis(),
1517 });
1518 }
1519 }
1520 }
1521 } else {
1522 *in_flight_count += 1;
1523 let tx = completion_tx.clone();
1524 let name = Arc::clone(&transition_name);
1525 let ctx = TransitionContext::new(
1526 Arc::clone(&transition_name),
1527 inputs,
1528 read_tokens,
1529 output_place_names,
1530 None,
1531 );
1532 tokio::spawn(async move {
1533 let result = action.run_async(ctx).await;
1534 let completion = match result {
1535 Ok(mut completed_ctx) => ActionCompletion {
1536 transition_name: Arc::clone(&name),
1537 result: Ok(completed_ctx.take_outputs()),
1538 },
1539 Err(err) => ActionCompletion {
1540 transition_name: Arc::clone(&name),
1541 result: Err(err.message),
1542 },
1543 };
1544 let _ = tx.send(completion);
1545 });
1546 }
1547 }
1548
1549 fn millis_until_next_timed_transition(&self) -> f64 {
1550 let mut min_wait = f64::INFINITY;
1551 let now_ms = self.elapsed_ms();
1552
1553 for s in 0..self.summary_words {
1554 let mut summary = self.enabled_word_summary[s];
1555 while summary != 0 {
1556 let local_w = summary.trailing_zeros() as usize;
1557 summary &= summary - 1;
1558 let w = (s << bitmap::WORD_SHIFT) | local_w;
1559 if w >= self.transition_words {
1560 continue;
1561 }
1562 let mut word = self.enabled_bitmap[w];
1563 while word != 0 {
1564 let bit = word.trailing_zeros() as usize;
1565 let tid = (w << bitmap::WORD_SHIFT) | bit;
1566 word &= word - 1;
1567
1568 let elapsed = now_ms - self.enabled_at_ms[tid];
1569 let remaining_earliest = self.program.earliest_ms[tid] - elapsed;
1570 if remaining_earliest <= 0.0 {
1571 return 0.0;
1572 }
1573 min_wait = min_wait.min(remaining_earliest);
1574
1575 if self.program.has_deadline[tid] {
1576 let remaining_deadline = self.program.latest_ms[tid] - elapsed;
1577 if remaining_deadline <= 0.0 {
1578 return 0.0;
1579 }
1580 min_wait = min_wait.min(remaining_deadline);
1581 }
1582 }
1583 }
1584 }
1585
1586 min_wait
1587 }
1588}
1589
1590fn grow_ring_static(
1593 token_pool: &mut Vec<Option<ErasedToken>>,
1594 place_offset: &mut [usize],
1595 ring_head: &mut [usize],
1596 ring_tail: &mut [usize],
1597 ring_capacity: &mut [usize],
1598 token_counts: &[usize],
1599 pid: usize,
1600) {
1601 let old_cap = ring_capacity[pid];
1602 let new_cap = old_cap * 2;
1603 let old_offset = place_offset[pid];
1604 let head = ring_head[pid];
1605 let count = token_counts[pid];
1606
1607 let new_offset = token_pool.len();
1609 token_pool.resize_with(new_offset + new_cap, || None);
1610
1611 for i in 0..count {
1613 let old_idx = old_offset + (head + i) % old_cap;
1614 token_pool[new_offset + i] = token_pool[old_idx].take();
1615 }
1616
1617 place_offset[pid] = new_offset;
1618 ring_head[pid] = 0;
1619 ring_tail[pid] = count;
1620 ring_capacity[pid] = new_cap;
1621}
1622
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
fn now_millis() -> u64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}
1629
1630#[cfg(test)]
1631mod tests {
1632 use super::*;
1633 use crate::compiled_net::CompiledNet;
1634 use libpetri_core::action::{fork, passthrough, sync_action};
1635 use libpetri_core::input::one;
1636 use libpetri_core::output::out_place;
1637 use libpetri_core::petri_net::PetriNet;
1638 use libpetri_core::place::Place;
1639 use libpetri_core::token::Token;
1640 use libpetri_core::transition::Transition;
1641 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1642
1643 fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1644 let p1 = Place::<i32>::new("p1");
1645 let p2 = Place::<i32>::new("p2");
1646 let p3 = Place::<i32>::new("p3");
1647
1648 let t1 = Transition::builder("t1")
1649 .input(one(&p1))
1650 .output(out_place(&p2))
1651 .action(passthrough())
1652 .build();
1653 let t2 = Transition::builder("t2")
1654 .input(one(&p2))
1655 .output(out_place(&p3))
1656 .action(passthrough())
1657 .build();
1658
1659 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1660 (net, p1, p2, p3)
1661 }
1662
/// A token placed in `p1` of the simple chain is consumed by a synchronous run.
#[test]
fn sync_passthrough_chain() {
    let (net, start, _middle, _end) = simple_chain();

    let mut initial = Marking::new();
    initial.add(&start, Token::at(42, 0));

    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p1"), 0);
}
1677
/// A fork transition with an AND output puts one token in each output place.
#[test]
fn sync_fork_chain() {
    let source = Place::<i32>::new("p1");
    let left = Place::<i32>::new("p2");
    let right = Place::<i32>::new("p3");

    let both_outputs = libpetri_core::output::and(vec![out_place(&left), out_place(&right)]);
    let splitter = Transition::builder("t1")
        .input(one(&source))
        .output(both_outputs)
        .action(fork())
        .build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(42, 0));

    let net = PetriNet::builder("fork").transition(splitter).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p1"), 0);
    assert_eq!(final_marking.count("p2"), 1);
    assert_eq!(final_marking.count("p3"), 1);
}
1707
/// A token travels through five chained fork transitions from `p0` to `p5`.
#[test]
fn sync_linear_chain_5() {
    let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();

    // Transition `t{i}` links each adjacent pair of places.
    let hops: Vec<Transition> = places
        .windows(2)
        .enumerate()
        .map(|(i, pair)| {
            Transition::builder(format!("t{i}"))
                .input(one(&pair[0]))
                .output(out_place(&pair[1]))
                .action(fork())
                .build()
        })
        .collect();

    let mut initial = Marking::new();
    initial.add(&places[0], Token::at(1, 0));

    let net = PetriNet::builder("chain5").transitions(hops).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p0"), 0);
    assert_eq!(final_marking.count("p5"), 1);
}
1734
/// Running the simple chain from an empty marking leaves every place empty.
#[test]
fn sync_no_initial_tokens() {
    let (net, _, _, _) = simple_chain();
    let empty = Marking::new();

    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, empty);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p1"), 0);
    assert_eq!(final_marking.count("p2"), 0);
    assert_eq!(final_marking.count("p3"), 0);
}
1747
/// Two transitions with different priorities compete for a single token;
/// the shared input place must be empty after the run.
#[test]
fn sync_priority_ordering() {
    let shared = Place::<()>::new("p");
    let high_out = Place::<()>::new("a");
    let low_out = Place::<()>::new("b");

    let high_priority = Transition::builder("t_high")
        .input(one(&shared))
        .output(out_place(&high_out))
        .action(passthrough())
        .priority(10)
        .build();
    let low_priority = Transition::builder("t_low")
        .input(one(&shared))
        .output(out_place(&low_out))
        .action(passthrough())
        .priority(1)
        .build();

    let mut initial = Marking::new();
    initial.add(&shared, Token::at((), 0));

    let net = PetriNet::builder("priority")
        .transitions([high_priority, low_priority])
        .build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p"), 0);
}
1781
/// A token in the inhibitor place keeps the transition from consuming its input.
#[test]
fn sync_inhibitor_blocks() {
    let source = Place::<()>::new("p1");
    let target = Place::<()>::new("p2");
    let blocker = Place::<()>::new("inh");

    let inhibited = Transition::builder("t1")
        .input(one(&source))
        .output(out_place(&target))
        .inhibitor(libpetri_core::arc::inhibitor(&blocker))
        .action(passthrough())
        .build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at((), 0));
    initial.add(&blocker, Token::at((), 0));

    let net = PetriNet::builder("inhibitor").transition(inhibited).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p1"), 1);
}
1808
/// A read arc exposes a token to the action but leaves it in its place.
#[test]
fn read_arc_does_not_consume() {
    let input_place = Place::<i32>::new("in");
    let context_place = Place::<i32>::new("ctx");
    let output_place = Place::<i32>::new("out");

    let adder = Transition::builder("t1")
        .input(one(&input_place))
        .read(libpetri_core::arc::read(&context_place))
        .output(out_place(&output_place))
        .action(sync_action(|ctx| {
            let consumed = ctx.input::<i32>("in")?;
            let observed = ctx.read::<i32>("ctx")?;
            ctx.output("out", *consumed + *observed)?;
            Ok(())
        }))
        .build();

    let mut initial = Marking::new();
    initial.add(&input_place, Token::at(10, 0));
    initial.add(&context_place, Token::at(5, 0));

    let net = PetriNet::builder("test").transition(adder).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("in"), 0);
    assert_eq!(final_marking.count("ctx"), 1);
    assert_eq!(final_marking.count("out"), 1);
}
1841
/// Firing a transition with a reset arc empties the reset place entirely.
#[test]
fn reset_arc_removes_all_tokens() {
    let trigger = Place::<()>::new("in");
    let cleared = Place::<i32>::new("reset");
    let done = Place::<()>::new("out");

    let resetter = Transition::builder("t1")
        .input(one(&trigger))
        .reset(libpetri_core::arc::reset(&cleared))
        .output(out_place(&done))
        .action(fork())
        .build();

    let mut initial = Marking::new();
    initial.add(&trigger, Token::at((), 0));
    // Three tokens that the reset arc is expected to wipe out.
    initial.add(&cleared, Token::at(1, 0));
    initial.add(&cleared, Token::at(2, 0));
    initial.add(&cleared, Token::at(3, 0));

    let net = PetriNet::builder("test").transition(resetter).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("reset"), 0);
    assert_eq!(final_marking.count("out"), 1);
}
1870
/// An `exactly(3)` input takes three of the five available tokens.
#[test]
fn exactly_cardinality_consumes_n() {
    let pool = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");

    let take_three = Transition::builder("t1")
        .input(libpetri_core::input::exactly(3, &pool))
        .output(out_place(&sink))
        .action(sync_action(|ctx| {
            let vals = ctx.inputs::<i32>("p")?;
            for v in vals {
                ctx.output("out", *v)?;
            }
            Ok(())
        }))
        .build();

    let mut initial = Marking::new();
    for value in 0..5 {
        initial.add(&pool, Token::at(value, 0));
    }

    let net = PetriNet::builder("test").transition(take_three).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p"), 2);
    assert_eq!(final_marking.count("out"), 3);
}
1902
/// An `all` input drains every token from its place.
#[test]
fn all_cardinality_consumes_everything() {
    let pool = Place::<i32>::new("p");
    let sink = Place::<()>::new("out");

    let drain = Transition::builder("t1")
        .input(libpetri_core::input::all(&pool))
        .output(out_place(&sink))
        .action(sync_action(|ctx| {
            let vals = ctx.inputs::<i32>("p")?;
            ctx.output("out", vals.len() as i32)?;
            Ok(())
        }))
        .build();

    let mut initial = Marking::new();
    for value in 0..5 {
        initial.add(&pool, Token::at(value, 0));
    }

    let net = PetriNet::builder("test").transition(drain).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p"), 0);
}
1931
/// An `at_least(3)` input does not fire when only two tokens are present.
#[test]
fn at_least_blocks_insufficient() {
    let pool = Place::<i32>::new("p");
    let sink = Place::<()>::new("out");

    let needs_three = Transition::builder("t1")
        .input(libpetri_core::input::at_least(3, &pool))
        .output(out_place(&sink))
        .action(passthrough())
        .build();

    let mut initial = Marking::new();
    initial.add(&pool, Token::at(1, 0));
    initial.add(&pool, Token::at(2, 0));

    let net = PetriNet::builder("test").transition(needs_three).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p"), 2);
}
1955
/// An `at_least(3)` input fires with five tokens present and empties the place.
#[test]
fn at_least_fires_with_enough() {
    let pool = Place::<i32>::new("p");
    let sink = Place::<()>::new("out");

    let needs_three = Transition::builder("t1")
        .input(libpetri_core::input::at_least(3, &pool))
        .output(out_place(&sink))
        .action(passthrough())
        .build();

    let mut initial = Marking::new();
    for value in 0..5 {
        initial.add(&pool, Token::at(value, 0));
    }

    let net = PetriNet::builder("test").transition(needs_three).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p"), 0);
}
1980
/// A guarded input only takes the token that satisfies the guard (`> 5`).
#[test]
fn guarded_input_only_consumes_matching() {
    let pool = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");

    let picky = Transition::builder("t1")
        .input(libpetri_core::input::one_guarded(&pool, |v| *v > 5))
        .output(out_place(&sink))
        .action(fork())
        .build();

    let mut initial = Marking::new();
    initial.add(&pool, Token::at(3, 0)); // rejected by the guard
    initial.add(&pool, Token::at(10, 0)); // accepted by the guard

    let net = PetriNet::builder("test").transition(picky).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p"), 1);
    assert_eq!(final_marking.count("out"), 1);
}
2005
/// A guarded input never fires when no token satisfies the guard (`> 100`).
#[test]
fn guarded_input_blocks_when_no_match() {
    let pool = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");

    let picky = Transition::builder("t1")
        .input(libpetri_core::input::one_guarded(&pool, |v| *v > 100))
        .output(out_place(&sink))
        .action(fork())
        .build();

    let mut initial = Marking::new();
    initial.add(&pool, Token::at(3, 0));
    initial.add(&pool, Token::at(10, 0));

    let net = PetriNet::builder("test").transition(picky).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("p"), 2);
    assert_eq!(final_marking.count("out"), 0);
}
2030
/// With an in-memory event store, one firing records every lifecycle event
/// kind: execution start/completion, transition enabled/started/completed
/// and token removed/added.
#[test]
fn event_store_records_lifecycle() {
    let source = Place::<i32>::new("p1");
    let target = Place::<i32>::new("p2");
    let mover = Transition::builder("t1")
        .input(one(&source))
        .output(out_place(&target))
        .action(fork())
        .build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(1, 0));

    let net = PetriNet::builder("test").transition(mover).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<InMemoryEventStore>::new(&program, initial);
    let _final_marking = exec.run_to_completion();

    let events = exec.event_store().events();

    // Execution-level events.
    assert!(events.iter().any(|e| matches!(e, NetEvent::ExecutionStarted { .. })));
    // Transition-level events.
    assert!(events.iter().any(|e| matches!(e, NetEvent::TransitionEnabled { .. })));
    assert!(events.iter().any(|e| matches!(e, NetEvent::TransitionStarted { .. })));
    assert!(events.iter().any(|e| matches!(e, NetEvent::TransitionCompleted { .. })));
    // Token-level events.
    assert!(events.iter().any(|e| matches!(e, NetEvent::TokenRemoved { .. })));
    assert!(events.iter().any(|e| matches!(e, NetEvent::TokenAdded { .. })));
    // Final execution event.
    assert!(events.iter().any(|e| matches!(e, NetEvent::ExecutionCompleted { .. })));
}
2087
/// A failing action consumes its input, produces no output and is recorded
/// as a `TransitionFailed` event instead of aborting the run.
#[test]
fn action_error_does_not_crash() {
    let input_place = Place::<i32>::new("in");
    let output_place = Place::<i32>::new("out");

    let always_fails = Transition::builder("t1")
        .input(one(&input_place))
        .output(out_place(&output_place))
        .action(sync_action(|_ctx| {
            Err(libpetri_core::action::ActionError::new(
                "intentional failure",
            ))
        }))
        .build();

    let mut initial = Marking::new();
    initial.add(&input_place, Token::at(42, 0));

    let net = PetriNet::builder("test").transition(always_fails).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<InMemoryEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("in"), 0);
    assert_eq!(final_marking.count("out"), 0);

    let events = exec.event_store().events();
    assert!(events.iter().any(|e| matches!(e, NetEvent::TransitionFailed { .. })));
}
2122
/// A transition with two input arcs fires only when both places hold a token.
#[test]
fn multiple_input_arcs_require_all() {
    let first = Place::<i32>::new("p1");
    let second = Place::<i32>::new("p2");
    let joined = Place::<i32>::new("p3");

    let join = Transition::builder("t1")
        .input(one(&first))
        .input(one(&second))
        .output(out_place(&joined))
        .action(sync_action(|ctx| {
            ctx.output("p3", 99i32)?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(join).build();

    // Scenario 1: only `p1` is marked, so the transition must stay blocked.
    let compiled_partial = CompiledNet::compile(&net);
    let program_partial = PrecompiledNet::from_compiled(&compiled_partial);
    let mut partial = Marking::new();
    partial.add(&first, Token::at(1, 0));
    let mut exec_partial =
        PrecompiledNetExecutor::<NoopEventStore>::new(&program_partial, partial);
    let blocked = exec_partial.run_to_completion();
    assert_eq!(blocked.count("p1"), 1);
    assert_eq!(blocked.count("p3"), 0);

    // Scenario 2: both inputs are marked, so the transition fires once.
    let compiled_full = CompiledNet::compile(&net);
    let program_full = PrecompiledNet::from_compiled(&compiled_full);
    let mut full = Marking::new();
    full.add(&first, Token::at(1, 0));
    full.add(&second, Token::at(2, 0));
    let mut exec_full = PrecompiledNetExecutor::<NoopEventStore>::new(&program_full, full);
    let fired = exec_full.run_to_completion();
    assert_eq!(fired.count("p1"), 0);
    assert_eq!(fired.count("p2"), 0);
    assert_eq!(fired.count("p3"), 1);
}
2162
/// A custom synchronous action can emit a token of a different type than it
/// consumed.
#[test]
fn sync_action_custom_logic() {
    let numbers = Place::<i32>::new("in");
    let labels = Place::<String>::new("out");

    let formatter = Transition::builder("t1")
        .input(one(&numbers))
        .output(out_place(&labels))
        .action(sync_action(|ctx| {
            let v = ctx.input::<i32>("in")?;
            ctx.output("out", format!("value={v}"))?;
            Ok(())
        }))
        .build();

    let mut initial = Marking::new();
    initial.add(&numbers, Token::at(42, 0));

    let net = PetriNet::builder("test").transition(formatter).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("out"), 1);
}
2189
/// A `transform` action's result is delivered to every place of an AND output.
#[test]
fn transform_action_outputs_to_all_places() {
    let source = Place::<i32>::new("in");
    let first_target = Place::<i32>::new("a");
    let second_target = Place::<i32>::new("b");

    let both_targets = libpetri_core::output::and(vec![
        out_place(&first_target),
        out_place(&second_target),
    ]);
    let doubler = Transition::builder("t1")
        .input(one(&source))
        .output(both_targets)
        .action(libpetri_core::action::transform(|ctx| {
            let v = ctx.input::<i32>("in").unwrap();
            Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
        }))
        .build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(5, 0));

    let net = PetriNet::builder("test").transition(doubler).build();
    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("a"), 1);
    assert_eq!(final_marking.count("b"), 1);
}
2220
/// Runs an eight-transition workflow (fork, XOR guard, inhibitors, read arcs,
/// priorities and a three-way join) and checks the initial token is consumed.
#[test]
fn complex_workflow() {
    use libpetri_core::output::{and, xor};

    // Entry place and the four branches created by the fork.
    let input = Place::<i32>::new("v_input");
    let guard_in = Place::<i32>::new("v_guardIn");
    let intent_in = Place::<i32>::new("v_intentIn");
    let search_in = Place::<i32>::new("v_searchIn");
    let output_guard_in = Place::<i32>::new("v_outputGuardIn");
    // Guard outcomes.
    let guard_safe = Place::<i32>::new("v_guardSafe");
    let guard_violation = Place::<i32>::new("v_guardViolation");
    let violated = Place::<i32>::new("v_violated");
    // Intermediate results and the final response.
    let intent_ready = Place::<i32>::new("v_intentReady");
    let topic_ready = Place::<i32>::new("v_topicReady");
    let search_ready = Place::<i32>::new("v_searchReady");
    let output_guard_done = Place::<i32>::new("v_outputGuardDone");
    let response = Place::<i32>::new("v_response");

    let t_fork = Transition::builder("Fork")
        .input(one(&input))
        .output(and(vec![
            out_place(&guard_in),
            out_place(&intent_in),
            out_place(&search_in),
            out_place(&output_guard_in),
        ]))
        .action(fork())
        .build();

    let t_guard = Transition::builder("Guard")
        .input(one(&guard_in))
        .output(xor(vec![
            out_place(&guard_safe),
            out_place(&guard_violation),
        ]))
        .action(fork())
        .build();

    let t_handle_violation = Transition::builder("HandleViolation")
        .input(one(&guard_violation))
        .output(out_place(&violated))
        .inhibitor(libpetri_core::arc::inhibitor(&guard_safe))
        .action(fork())
        .build();

    let t_intent = Transition::builder("Intent")
        .input(one(&intent_in))
        .output(out_place(&intent_ready))
        .action(fork())
        .build();

    let t_topic = Transition::builder("TopicKnowledge")
        .input(one(&intent_ready))
        .output(out_place(&topic_ready))
        .action(fork())
        .build();

    let t_search = Transition::builder("Search")
        .input(one(&search_in))
        .output(out_place(&search_ready))
        .read(libpetri_core::arc::read(&intent_ready))
        .inhibitor(libpetri_core::arc::inhibitor(&guard_violation))
        .priority(-5)
        .action(fork())
        .build();

    let t_output_guard = Transition::builder("OutputGuard")
        .input(one(&output_guard_in))
        .output(out_place(&output_guard_done))
        .read(libpetri_core::arc::read(&guard_safe))
        .action(fork())
        .build();

    let t_compose = Transition::builder("Compose")
        .input(one(&guard_safe))
        .input(one(&search_ready))
        .input(one(&topic_ready))
        .output(out_place(&response))
        .priority(10)
        .action(fork())
        .build();

    let net = PetriNet::builder("ComplexWorkflow")
        .transition(t_fork)
        .transition(t_guard)
        .transition(t_handle_violation)
        .transition(t_intent)
        .transition(t_topic)
        .transition(t_search)
        .transition(t_output_guard)
        .transition(t_compose)
        .build();

    let mut initial = Marking::new();
    initial.add(&input, Token::at(1, 0));

    let compiled = CompiledNet::compile(&net);
    let program = PrecompiledNet::from_compiled(&compiled);
    let mut exec = PrecompiledNetExecutor::<NoopEventStore>::new(&program, initial);

    let final_marking = exec.run_to_completion();

    assert_eq!(final_marking.count("v_input"), 0);
}
2329
2330 #[cfg(feature = "tokio")]
2331 mod async_tests {
2332 use super::*;
2333 use crate::environment::ExternalEvent;
2334 use libpetri_core::action::async_action;
2335 use libpetri_core::petri_net::PetriNet;
2336 use libpetri_core::token::ErasedToken;
2337
2338 #[tokio::test]
2339 async fn async_linear_chain() {
2340 let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
2341 let transitions: Vec<Transition> = (0..5)
2342 .map(|i| {
2343 Transition::builder(format!("t{i}"))
2344 .input(one(&places[i]))
2345 .output(out_place(&places[i + 1]))
2346 .action(fork())
2347 .build()
2348 })
2349 .collect();
2350
2351 let net = PetriNet::builder("chain5").transitions(transitions).build();
2352 let compiled = CompiledNet::compile(&net);
2353 let prog = PrecompiledNet::from_compiled(&compiled);
2354
2355 let mut marking = Marking::new();
2356 marking.add(&places[0], Token::at(1, 0));
2357
2358 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2359 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2360 let result = executor.run_async(rx).await;
2361
2362 assert_eq!(result.count("p0"), 0);
2363 assert_eq!(result.count("p5"), 1);
2364 }
2365
2366 #[tokio::test]
2367 async fn async_action_execution() {
2368 let p1 = Place::<i32>::new("p1");
2369 let p2 = Place::<i32>::new("p2");
2370
2371 let t = Transition::builder("t1")
2372 .input(one(&p1))
2373 .output(out_place(&p2))
2374 .action(async_action(|ctx| async { Ok(ctx) }))
2375 .build();
2376
2377 let net = PetriNet::builder("async_test").transition(t).build();
2378 let compiled = CompiledNet::compile(&net);
2379 let prog = PrecompiledNet::from_compiled(&compiled);
2380
2381 let mut marking = Marking::new();
2382 marking.add(&p1, Token::at(42, 0));
2383
2384 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::new(&prog, marking);
2385 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2386 let result = executor.run_async(rx).await;
2387
2388 assert_eq!(result.count("p1"), 0);
2389 }
2390
2391 #[tokio::test]
2394 async fn async_drain_terminates_at_quiescence() {
2395 let p1 = Place::<i32>::new("p1");
2396 let p2 = Place::<i32>::new("p2");
2397
2398 let t1 = Transition::builder("t1")
2399 .input(one(&p1))
2400 .output(out_place(&p2))
2401 .action(fork())
2402 .build();
2403
2404 let net = PetriNet::builder("test").transition(t1).build();
2405 let compiled = CompiledNet::compile(&net);
2406 let prog = PrecompiledNet::from_compiled(&compiled);
2407
2408 let marking = Marking::new();
2409 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2410 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2411 .build();
2412
2413 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2414
2415 tokio::spawn(async move {
2416 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2417 tx.send(ExecutorSignal::Event(ExternalEvent {
2418 place_name: Arc::from("p1"),
2419 token: ErasedToken::from_typed(&Token::at(42, 0)),
2420 }))
2421 .unwrap();
2422 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2423 tx.send(ExecutorSignal::Drain).unwrap();
2424 });
2425
2426 let result = executor.run_async(rx).await;
2427 assert_eq!(result.count("p2"), 1);
2428 }
2429
2430 #[tokio::test]
2431 async fn async_drain_rejects_post_drain_events() {
2432 let p1 = Place::<i32>::new("p1");
2433 let p2 = Place::<i32>::new("p2");
2434
2435 let t1 = Transition::builder("t1")
2436 .input(one(&p1))
2437 .output(out_place(&p2))
2438 .action(fork())
2439 .build();
2440
2441 let net = PetriNet::builder("test").transition(t1).build();
2442 let compiled = CompiledNet::compile(&net);
2443 let prog = PrecompiledNet::from_compiled(&compiled);
2444
2445 let marking = Marking::new();
2446 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2447 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2448 .build();
2449
2450 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2451
2452 tokio::spawn(async move {
2453 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2454 tx.send(ExecutorSignal::Drain).unwrap();
2455 tx.send(ExecutorSignal::Event(ExternalEvent {
2456 place_name: Arc::from("p1"),
2457 token: ErasedToken::from_typed(&Token::at(99, 0)),
2458 }))
2459 .unwrap();
2460 });
2461
2462 let result = executor.run_async(rx).await;
2463 assert_eq!(result.count("p2"), 0);
2464 }
2465
2466 #[tokio::test]
2467 async fn async_close_discards_queued_events() {
2468 let p1 = Place::<i32>::new("p1");
2469 let p2 = Place::<i32>::new("p2");
2470
2471 let t1 = Transition::builder("t1")
2472 .input(one(&p1))
2473 .output(out_place(&p2))
2474 .action(fork())
2475 .build();
2476
2477 let net = PetriNet::builder("test").transition(t1).build();
2478 let compiled = CompiledNet::compile(&net);
2479 let prog = PrecompiledNet::from_compiled(&compiled);
2480
2481 let marking = Marking::new();
2482 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2483 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2484 .build();
2485
2486 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2487
2488 tx.send(ExecutorSignal::Event(ExternalEvent {
2489 place_name: Arc::from("p1"),
2490 token: ErasedToken::from_typed(&Token::at(1, 0)),
2491 }))
2492 .unwrap();
2493 tx.send(ExecutorSignal::Close).unwrap();
2494 tx.send(ExecutorSignal::Event(ExternalEvent {
2495 place_name: Arc::from("p1"),
2496 token: ErasedToken::from_typed(&Token::at(2, 0)),
2497 }))
2498 .unwrap();
2499 drop(tx);
2500
2501 let result = executor.run_async(rx).await;
2502 assert!(result.count("p2") <= 1);
2503 }
2504
2505 #[tokio::test]
2506 async fn async_close_after_drain_escalates() {
2507 let p1 = Place::<i32>::new("p1");
2508 let p2 = Place::<i32>::new("p2");
2509
2510 let t1 = Transition::builder("t1")
2511 .input(one(&p1))
2512 .output(out_place(&p2))
2513 .action(fork())
2514 .build();
2515
2516 let net = PetriNet::builder("test").transition(t1).build();
2517 let compiled = CompiledNet::compile(&net);
2518 let prog = PrecompiledNet::from_compiled(&compiled);
2519
2520 let marking = Marking::new();
2521 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2522 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2523 .build();
2524
2525 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2526
2527 tokio::spawn(async move {
2528 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2529 tx.send(ExecutorSignal::Drain).unwrap();
2530 tx.send(ExecutorSignal::Close).unwrap();
2531 });
2532
2533 let _result = executor.run_async(rx).await;
2534 }
2536
2537 #[tokio::test]
2538 async fn async_handle_raii_drain_on_drop() {
2539 use crate::executor_handle::ExecutorHandle;
2540
2541 let p1 = Place::<i32>::new("p1");
2542 let p2 = Place::<i32>::new("p2");
2543
2544 let t1 = Transition::builder("t1")
2545 .input(one(&p1))
2546 .output(out_place(&p2))
2547 .action(fork())
2548 .build();
2549
2550 let net = PetriNet::builder("test").transition(t1).build();
2551 let compiled = CompiledNet::compile(&net);
2552 let prog = PrecompiledNet::from_compiled(&compiled);
2553
2554 let marking = Marking::new();
2555 let mut executor = PrecompiledNetExecutor::<NoopEventStore>::builder(&prog, marking)
2556 .environment_places(["p1"].iter().map(|s| Arc::from(*s)).collect())
2557 .build();
2558
2559 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2560
2561 tokio::spawn(async move {
2562 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2563 let mut handle = ExecutorHandle::new(tx);
2564 handle.inject(
2565 Arc::from("p1"),
2566 ErasedToken::from_typed(&Token::at(7, 0)),
2567 );
2568 });
2570
2571 let result = executor.run_async(rx).await;
2572 assert_eq!(result.count("p2"), 1);
2573 }
2574 }
2575}