1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::output::Out;
8use libpetri_core::petri_net::PetriNet;
9use libpetri_core::token::ErasedToken;
10
11use libpetri_event::event_store::EventStore;
12use libpetri_event::net_event::NetEvent;
13
14use crate::bitmap;
15use crate::compiled_net::CompiledNet;
16use crate::marking::Marking;
17
/// Grace period, in milliseconds, added to a transition's latest firing time
/// before `enforce_deadlines` treats the transition as timed out.
const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
/// Executes a compiled Petri net, tracking place occupancy and transition
/// re-evaluation needs in `u64` bitmaps.
///
/// `E` is the event store; events are only appended when `E::ENABLED` is true.
pub struct BitmapNetExecutor<E: EventStore> {
    /// Result of `CompiledNet::compile` for the net passed to `new`.
    compiled: CompiledNet,
    /// Current tokens per place; starts as the `initial_tokens` given to `new`.
    marking: Marking,
    /// Sink for `NetEvent`s, created via `E::default()`.
    event_store: E,
    /// Environment place names taken from `ExecutorOptions`; stored but not
    /// read by the code in this file.
    #[allow(dead_code)]
    environment_places: HashSet<Arc<str>>,
    /// True when `ExecutorOptions::environment_places` was non-empty.
    has_environment_places: bool,

    /// One bit per place id; set while the place holds at least one token.
    marked_places: Vec<u64>,
    /// One bit per transition id; set when the transition must be re-evaluated.
    dirty_set: Vec<u64>,
    /// Scratch copy of `marked_places` used while re-evaluating dirty transitions.
    marking_snap_buffer: Vec<u64>,
    /// Scratch copy of `dirty_set` taken at the start of each re-evaluation pass.
    dirty_snap_buffer: Vec<u64>,
    /// Scratch copy of `marked_places` used while firing ready transitions.
    firing_snap_buffer: Vec<u64>,

    /// Per transition: `elapsed_ms()` value at which it became enabled, or
    /// `f64::NEG_INFINITY` while it is not enabled.
    enabled_at_ms: Vec<f64>,
    /// Per transition: whether it is currently considered enabled.
    enabled_flags: Vec<bool>,
    /// Per transition: whether its timing reports `has_deadline()`.
    has_deadline_flags: Vec<bool>,
    /// Number of `true` entries in `enabled_flags`.
    enabled_transition_count: usize,

    /// True when every transition's timing equals `Timing::Immediate`.
    all_immediate: bool,
    /// True when every transition has the same priority as transition 0.
    all_same_priority: bool,
    /// True when at least one entry of `has_deadline_flags` is set.
    has_any_deadlines: bool,

    /// Names of places emptied by reset arcs since the last call to
    /// `update_dirty_transitions`.
    pending_reset_places: HashSet<Arc<str>>,
    /// Per transition: names of the places referenced by its input specs.
    transition_input_place_names: Vec<HashSet<Arc<str>>>,

    /// Instant captured in `new`; the origin for `elapsed_ms`.
    start_time: Instant,
}
57
/// Construction-time options for [`BitmapNetExecutor`].
///
/// The default value has no environment places.
#[derive(Debug, Clone, Default)]
pub struct ExecutorOptions {
    /// Names of places that may receive tokens from outside the net. When this
    /// set is non-empty the synchronous termination check never ends the run
    /// on its own, and the async loop keeps waiting for external signals.
    pub environment_places: HashSet<Arc<str>>,
}
63
64impl<E: EventStore> BitmapNetExecutor<E> {
65 pub fn new(net: &PetriNet, initial_tokens: Marking, options: ExecutorOptions) -> Self {
67 let compiled = CompiledNet::compile(net);
68 let word_count = compiled.word_count;
69 let tc = compiled.transition_count;
70 let dirty_word_count = bitmap::word_count(tc);
71
72 let mut has_any_deadlines = false;
73 let mut all_immediate = true;
74 let mut all_same_priority = true;
75 let first_priority = if tc > 0 {
76 compiled.transition(0).priority()
77 } else {
78 0
79 };
80 let mut has_deadline_flags = vec![false; tc];
81
82 for (tid, flag) in has_deadline_flags.iter_mut().enumerate() {
83 let t = compiled.transition(tid);
84 if t.timing().has_deadline() {
85 *flag = true;
86 has_any_deadlines = true;
87 }
88 if *t.timing() != libpetri_core::timing::Timing::Immediate {
89 all_immediate = false;
90 }
91 if t.priority() != first_priority {
92 all_same_priority = false;
93 }
94 }
95
96 let mut transition_input_place_names = Vec::with_capacity(tc);
98 for tid in 0..tc {
99 let t = compiled.transition(tid);
100 let names: HashSet<Arc<str>> = t
101 .input_specs()
102 .iter()
103 .map(|s| Arc::clone(s.place().name_arc()))
104 .collect();
105 transition_input_place_names.push(names);
106 }
107
108 Self {
109 compiled,
110 marking: initial_tokens,
111 event_store: E::default(),
112 has_environment_places: !options.environment_places.is_empty(),
113 environment_places: options.environment_places,
114 marked_places: vec![0u64; word_count],
115 dirty_set: vec![0u64; dirty_word_count],
116 marking_snap_buffer: vec![0u64; word_count],
117 dirty_snap_buffer: vec![0u64; dirty_word_count],
118 firing_snap_buffer: vec![0u64; word_count],
119 enabled_at_ms: vec![f64::NEG_INFINITY; tc],
120 enabled_flags: vec![false; tc],
121 has_deadline_flags,
122 enabled_transition_count: 0,
123 all_immediate,
124 all_same_priority,
125 has_any_deadlines,
126 pending_reset_places: HashSet::new(),
127 transition_input_place_names,
128 start_time: Instant::now(),
129 }
130 }
131
132 pub fn run_sync(&mut self) -> &Marking {
137 self.initialize_marked_bitmap();
138 self.mark_all_dirty();
139
140 if E::ENABLED {
141 let now = now_millis();
142 self.event_store.append(NetEvent::ExecutionStarted {
143 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
144 timestamp: now,
145 });
146 }
147
148 loop {
149 self.update_dirty_transitions();
150
151 let cycle_now = self.elapsed_ms();
152
153 if self.has_any_deadlines {
154 self.enforce_deadlines(cycle_now);
155 }
156
157 if self.should_terminate() {
158 break;
159 }
160
161 if self.all_immediate && self.all_same_priority {
162 self.fire_ready_immediate_sync();
163 } else {
164 self.fire_ready_general_sync(cycle_now);
165 }
166
167 if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
169 break;
170 }
171 }
172
173 if E::ENABLED {
174 let now = now_millis();
175 self.event_store.append(NetEvent::ExecutionCompleted {
176 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
177 timestamp: now,
178 });
179 }
180
181 &self.marking
182 }
183
    /// Returns the executor's current marking.
    pub fn marking(&self) -> &Marking {
        &self.marking
    }
188
    /// Returns the event store that collects this executor's `NetEvent`s.
    pub fn event_store(&self) -> &E {
        &self.event_store
    }
193
    /// Returns true when no transition is currently flagged as enabled.
    pub fn is_quiescent(&self) -> bool {
        self.enabled_transition_count == 0
    }
198
199 fn initialize_marked_bitmap(&mut self) {
202 for pid in 0..self.compiled.place_count {
203 let place = self.compiled.place(pid);
204 if self.marking.has_tokens(place.name()) {
205 bitmap::set_bit(&mut self.marked_places, pid);
206 }
207 }
208 }
209
210 fn mark_all_dirty(&mut self) {
211 let tc = self.compiled.transition_count;
212 let dirty_words = self.dirty_set.len();
213 for w in 0..dirty_words.saturating_sub(1) {
214 self.dirty_set[w] = u64::MAX;
215 }
216 if dirty_words > 0 {
217 let last_word_bits = tc & bitmap::WORD_MASK;
218 self.dirty_set[dirty_words - 1] = if last_word_bits == 0 {
219 u64::MAX
220 } else {
221 (1u64 << last_word_bits) - 1
222 };
223 }
224 }
225
226 fn should_terminate(&self) -> bool {
227 if self.has_environment_places {
228 return false;
229 }
230 self.enabled_transition_count == 0
231 }
232
233 fn update_dirty_transitions(&mut self) {
236 let now_ms = self.elapsed_ms();
237
238 self.marking_snap_buffer
240 .copy_from_slice(&self.marked_places);
241
242 let dirty_words = self.dirty_set.len();
244 for w in 0..dirty_words {
245 self.dirty_snap_buffer[w] = self.dirty_set[w];
246 self.dirty_set[w] = 0;
247 }
248
249 let tc = self.compiled.transition_count;
251 let mut dirty_tids = Vec::new();
252 bitmap::for_each_set_bit(&self.dirty_snap_buffer, |tid| {
253 if tid < tc {
254 dirty_tids.push(tid);
255 }
256 });
257
258 let marking_snap = self.marking_snap_buffer.clone();
259 for tid in dirty_tids {
260 let was_enabled = self.enabled_flags[tid];
261 let can_now = self.can_enable(tid, &marking_snap);
262
263 if can_now && !was_enabled {
264 self.enabled_flags[tid] = true;
265 self.enabled_transition_count += 1;
266 self.enabled_at_ms[tid] = now_ms;
267
268 if E::ENABLED {
269 self.event_store.append(NetEvent::TransitionEnabled {
270 transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
271 timestamp: now_millis(),
272 });
273 }
274 } else if !can_now && was_enabled {
275 self.enabled_flags[tid] = false;
276 self.enabled_transition_count -= 1;
277 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
278 } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
279 self.enabled_at_ms[tid] = now_ms;
280 if E::ENABLED {
281 self.event_store.append(NetEvent::TransitionClockRestarted {
282 transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
283 timestamp: now_millis(),
284 });
285 }
286 }
287 }
288
289 self.pending_reset_places.clear();
290 }
291
292 fn enforce_deadlines(&mut self, now_ms: f64) {
293 for tid in 0..self.compiled.transition_count {
294 if !self.has_deadline_flags[tid] || !self.enabled_flags[tid] {
295 continue;
296 }
297 let t = self.compiled.transition(tid);
298 let elapsed = now_ms - self.enabled_at_ms[tid];
299 let latest_ms = t.timing().latest() as f64;
300 if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
301 self.enabled_flags[tid] = false;
302 self.enabled_transition_count -= 1;
303 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
304
305 if E::ENABLED {
306 self.event_store.append(NetEvent::TransitionTimedOut {
307 transition_name: Arc::clone(t.name_arc()),
308 timestamp: now_millis(),
309 });
310 }
311 }
312 }
313 }
314
315 fn can_enable(&self, tid: usize, marking_snap: &[u64]) -> bool {
316 if !self.compiled.can_enable_bitmap(tid, marking_snap) {
317 return false;
318 }
319
320 if let Some(card_check) = self.compiled.cardinality_check(tid) {
322 for i in 0..card_check.place_ids.len() {
323 let pid = card_check.place_ids[i];
324 let required = card_check.required_counts[i];
325 let place = self.compiled.place(pid);
326 if self.marking.count(place.name()) < required {
327 return false;
328 }
329 }
330 }
331
332 if self.compiled.has_guards(tid) {
334 let t = self.compiled.transition(tid);
335 for spec in t.input_specs() {
336 if let Some(guard) = spec.guard() {
337 let required = match spec {
338 In::One { .. } => 1,
339 In::Exactly { count, .. } => *count,
340 In::AtLeast { minimum, .. } => *minimum,
341 In::All { .. } => 1,
342 };
343 if self.marking.count_matching(spec.place_name(), &**guard) < required {
344 return false;
345 }
346 }
347 }
348 }
349
350 true
351 }
352
353 fn has_input_from_reset_place(&self, tid: usize) -> bool {
354 if self.pending_reset_places.is_empty() {
355 return false;
356 }
357 let input_names = &self.transition_input_place_names[tid];
358 for name in &self.pending_reset_places {
359 if input_names.contains(name) {
360 return true;
361 }
362 }
363 false
364 }
365
366 fn fire_ready_immediate_sync(&mut self) {
369 for tid in 0..self.compiled.transition_count {
370 if !self.enabled_flags[tid] {
371 continue;
372 }
373 if self.can_enable(tid, &self.marked_places.clone()) {
374 self.fire_transition_sync(tid);
375 } else {
376 self.enabled_flags[tid] = false;
377 self.enabled_transition_count -= 1;
378 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
379 }
380 }
381 }
382
383 fn fire_ready_general_sync(&mut self, now_ms: f64) {
384 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
386 for tid in 0..self.compiled.transition_count {
387 if !self.enabled_flags[tid] {
388 continue;
389 }
390 let t = self.compiled.transition(tid);
391 let enabled_ms = self.enabled_at_ms[tid];
392 let elapsed = now_ms - enabled_ms;
393 let earliest_ms = t.timing().earliest() as f64;
394 if earliest_ms <= elapsed {
395 ready.push((tid, t.priority(), enabled_ms));
396 }
397 }
398 if ready.is_empty() {
399 return;
400 }
401
402 ready.sort_by(|a, b| {
404 b.1.cmp(&a.1)
405 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
406 });
407
408 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
410
411 for (tid, _, _) in ready {
412 if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
413 self.fire_transition_sync(tid);
414 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
415 } else {
416 self.enabled_flags[tid] = false;
417 self.enabled_transition_count -= 1;
418 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
419 }
420 }
421 }
422
423 fn fire_transition_sync(&mut self, tid: usize) {
424 let t = self.compiled.transition(tid);
426 let transition_name = Arc::clone(t.name_arc());
427 let input_specs: Vec<In> = t.input_specs().to_vec();
428 let read_arcs: Vec<_> = t.reads().to_vec();
429 let reset_arcs: Vec<_> = t.resets().to_vec();
430 let output_place_names: HashSet<Arc<str>> = t
431 .output_places()
432 .iter()
433 .map(|p| Arc::clone(p.name_arc()))
434 .collect();
435 let action = Arc::clone(t.action());
436 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
440 for in_spec in &input_specs {
441 let place_name = in_spec.place_name();
442 let to_consume = match in_spec {
443 In::One { .. } => 1,
444 In::Exactly { count, .. } => *count,
445 In::All { guard, .. } | In::AtLeast { guard, .. } => {
446 if guard.is_some() {
447 self.marking
448 .count_matching(place_name, &**guard.as_ref().unwrap())
449 } else {
450 self.marking.count(place_name)
451 }
452 }
453 };
454
455 let place_name_arc = Arc::clone(in_spec.place().name_arc());
456 for _ in 0..to_consume {
457 let token = if let Some(guard) = in_spec.guard() {
458 self.marking.remove_matching(place_name, &**guard)
459 } else {
460 self.marking.remove_first(place_name)
461 };
462 if let Some(token) = token {
463 if E::ENABLED {
464 self.event_store.append(NetEvent::TokenRemoved {
465 place_name: Arc::clone(&place_name_arc),
466 timestamp: now_millis(),
467 });
468 }
469 inputs
470 .entry(Arc::clone(&place_name_arc))
471 .or_default()
472 .push(token);
473 }
474 }
475 }
476
477 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
479 for arc in &read_arcs {
480 if let Some(queue) = self.marking.queue(arc.place.name())
481 && let Some(token) = queue.front()
482 {
483 read_tokens
484 .entry(Arc::clone(arc.place.name_arc()))
485 .or_default()
486 .push(token.clone());
487 }
488 }
489
490 for arc in &reset_arcs {
492 let removed = self.marking.remove_all(arc.place.name());
493 self.pending_reset_places
494 .insert(Arc::clone(arc.place.name_arc()));
495 if E::ENABLED {
496 for _ in &removed {
497 self.event_store.append(NetEvent::TokenRemoved {
498 place_name: Arc::clone(arc.place.name_arc()),
499 timestamp: now_millis(),
500 });
501 }
502 }
503 }
504
505 self.update_bitmap_after_consumption(tid);
507
508 if E::ENABLED {
509 self.event_store.append(NetEvent::TransitionStarted {
510 transition_name: Arc::clone(&transition_name),
511 timestamp: now_millis(),
512 });
513 }
514
515 let mut ctx = TransitionContext::new(
517 Arc::clone(&transition_name),
518 inputs,
519 read_tokens,
520 output_place_names,
521 None,
522 );
523
524 let result = action.run_sync(&mut ctx);
525
526 match result {
527 Ok(()) => {
528 let outputs = ctx.take_outputs();
530 self.process_outputs(tid, &transition_name, outputs);
531
532 if E::ENABLED {
533 self.event_store.append(NetEvent::TransitionCompleted {
534 transition_name: Arc::clone(&transition_name),
535 timestamp: now_millis(),
536 });
537 }
538 }
539 Err(err) => {
540 if E::ENABLED {
541 self.event_store.append(NetEvent::TransitionFailed {
542 transition_name: Arc::clone(&transition_name),
543 error: err.message,
544 timestamp: now_millis(),
545 });
546 }
547 }
548 }
549
550 self.enabled_flags[tid] = false;
552 self.enabled_transition_count -= 1;
553 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
554
555 self.mark_transition_dirty(tid);
557 }
558
559 fn process_outputs(
560 &mut self,
561 _tid: usize,
562 _transition_name: &Arc<str>,
563 outputs: Vec<OutputEntry>,
564 ) {
565 for entry in outputs {
566 self.marking.add_erased(&entry.place_name, entry.token);
567
568 if let Some(pid) = self.compiled.place_id(&entry.place_name) {
569 bitmap::set_bit(&mut self.marked_places, pid);
570 self.mark_dirty(pid);
571 }
572
573 if E::ENABLED {
574 self.event_store.append(NetEvent::TokenAdded {
575 place_name: Arc::clone(&entry.place_name),
576 timestamp: now_millis(),
577 });
578 }
579 }
580 }
581
582 fn update_bitmap_after_consumption(&mut self, tid: usize) {
583 let consumption_pids: Vec<usize> = self.compiled.consumption_place_ids(tid).to_vec();
584 for pid in consumption_pids {
585 let place = self.compiled.place(pid);
586 if !self.marking.has_tokens(place.name()) {
587 bitmap::clear_bit(&mut self.marked_places, pid);
588 }
589 self.mark_dirty(pid);
590 }
591 }
592
    /// Returns true when at least one bit is set in `dirty_set`.
    fn has_dirty_bits(&self) -> bool {
        !bitmap::is_empty(&self.dirty_set)
    }
598
599 fn mark_dirty(&mut self, pid: usize) {
600 let tids: Vec<usize> = self.compiled.affected_transitions(pid).to_vec();
601 for tid in tids {
602 self.mark_transition_dirty(tid);
603 }
604 }
605
    /// Sets the dirty bit of transition `tid`.
    fn mark_transition_dirty(&mut self, tid: usize) {
        bitmap::set_bit(&mut self.dirty_set, tid);
    }
609
    /// Milliseconds (fractional) elapsed since `start_time`.
    fn elapsed_ms(&self) -> f64 {
        self.start_time.elapsed().as_secs_f64() * 1000.0
    }
613}
614
615#[cfg(feature = "tokio")]
616use crate::environment::ExecutorSignal;
617
/// Message sent from a spawned action task back to the async executor loop.
#[cfg(feature = "tokio")]
struct ActionCompletion {
    /// Name of the transition whose action finished.
    transition_name: Arc<str>,
    /// Outputs produced by the action on success, or its error message.
    result: Result<Vec<OutputEntry>, String>,
}
624
625#[cfg(feature = "tokio")]
626impl<E: EventStore> BitmapNetExecutor<E> {
627 pub async fn run_async(
641 &mut self,
642 mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
643 ) -> &Marking {
644 let (completion_tx, mut completion_rx) =
645 tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
646
647 self.initialize_marked_bitmap();
648 self.mark_all_dirty();
649
650 let mut in_flight_count: usize = 0;
651 let mut signal_channel_open = true;
652 let mut draining = false;
653 let mut closed = false;
654
655 if E::ENABLED {
656 let now = now_millis();
657 self.event_store.append(NetEvent::ExecutionStarted {
658 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
659 timestamp: now,
660 });
661 }
662
663 loop {
664 while let Ok(completion) = completion_rx.try_recv() {
666 in_flight_count -= 1;
667 match completion.result {
668 Ok(outputs) => {
669 self.process_outputs(0, &completion.transition_name, outputs);
670 if E::ENABLED {
671 self.event_store.append(NetEvent::TransitionCompleted {
672 transition_name: Arc::clone(&completion.transition_name),
673 timestamp: now_millis(),
674 });
675 }
676 }
677 Err(err) => {
678 if E::ENABLED {
679 self.event_store.append(NetEvent::TransitionFailed {
680 transition_name: Arc::clone(&completion.transition_name),
681 error: err,
682 timestamp: now_millis(),
683 });
684 }
685 }
686 }
687 }
688
689 while let Ok(signal) = signal_rx.try_recv() {
691 match signal {
692 ExecutorSignal::Event(event) if !draining => {
693 self.marking.add_erased(&event.place_name, event.token);
694 if let Some(pid) = self.compiled.place_id(&event.place_name) {
695 bitmap::set_bit(&mut self.marked_places, pid);
696 self.mark_dirty(pid);
697 }
698 if E::ENABLED {
699 self.event_store.append(NetEvent::TokenAdded {
700 place_name: Arc::clone(&event.place_name),
701 timestamp: now_millis(),
702 });
703 }
704 }
705 ExecutorSignal::Event(_) => {
706 }
708 ExecutorSignal::Drain => {
709 draining = true;
710 }
711 ExecutorSignal::Close => {
712 closed = true;
713 draining = true;
714 while signal_rx.try_recv().is_ok() {}
719 }
720 }
721 }
722
723 self.update_dirty_transitions();
725
726 let cycle_now = self.elapsed_ms();
728 if self.has_any_deadlines {
729 self.enforce_deadlines(cycle_now);
730 }
731
732 if closed && in_flight_count == 0 {
734 break; }
736 if draining
737 && self.enabled_transition_count == 0
738 && in_flight_count == 0
739 {
740 break; }
742 if self.enabled_transition_count == 0
743 && in_flight_count == 0
744 && (!self.has_environment_places || !signal_channel_open)
745 {
746 break; }
748
749 let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
751
752 if fired || self.has_dirty_bits() {
754 tokio::task::yield_now().await;
756 continue;
757 }
758
759 if in_flight_count == 0 && !self.has_environment_places
761 && self.enabled_transition_count == 0
762 {
763 break;
764 }
765 if in_flight_count == 0
766 && self.enabled_transition_count == 0
767 && (draining || !signal_channel_open)
768 {
769 break;
770 }
771
772 let timer_ms = self.millis_until_next_timed_transition();
773
774 tokio::select! {
775 Some(completion) = completion_rx.recv() => {
776 in_flight_count -= 1;
777 match completion.result {
778 Ok(outputs) => {
779 self.process_outputs(0, &completion.transition_name, outputs);
780 if E::ENABLED {
781 self.event_store.append(NetEvent::TransitionCompleted {
782 transition_name: Arc::clone(&completion.transition_name),
783 timestamp: now_millis(),
784 });
785 }
786 }
787 Err(err) => {
788 if E::ENABLED {
789 self.event_store.append(NetEvent::TransitionFailed {
790 transition_name: Arc::clone(&completion.transition_name),
791 error: err,
792 timestamp: now_millis(),
793 });
794 }
795 }
796 }
797 }
798 result = signal_rx.recv(), if signal_channel_open && !closed => {
799 match result {
800 Some(ExecutorSignal::Event(event)) if !draining => {
801 self.marking.add_erased(&event.place_name, event.token);
802 if let Some(pid) = self.compiled.place_id(&event.place_name) {
803 bitmap::set_bit(&mut self.marked_places, pid);
804 self.mark_dirty(pid);
805 }
806 if E::ENABLED {
807 self.event_store.append(NetEvent::TokenAdded {
808 place_name: Arc::clone(&event.place_name),
809 timestamp: now_millis(),
810 });
811 }
812 }
813 Some(ExecutorSignal::Event(_)) => {
814 }
816 Some(ExecutorSignal::Drain) => {
817 draining = true;
818 }
819 Some(ExecutorSignal::Close) => {
820 closed = true;
821 draining = true;
822 while signal_rx.try_recv().is_ok() {}
823 }
824 None => {
825 signal_channel_open = false;
826 }
827 }
828 }
829 _ = tokio::time::sleep(std::time::Duration::from_millis(
830 if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
831 )) => {
832 }
834 }
835 }
836
837 if E::ENABLED {
838 let now = now_millis();
839 self.event_store.append(NetEvent::ExecutionCompleted {
840 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
841 timestamp: now,
842 });
843 }
844
845 &self.marking
846 }
847
848 fn fire_ready_async(
851 &mut self,
852 now_ms: f64,
853 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
854 in_flight_count: &mut usize,
855 ) -> bool {
856 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
857 for tid in 0..self.compiled.transition_count {
858 if !self.enabled_flags[tid] {
859 continue;
860 }
861 let t = self.compiled.transition(tid);
862 let enabled_ms = self.enabled_at_ms[tid];
863 let elapsed = now_ms - enabled_ms;
864 let earliest_ms = t.timing().earliest() as f64;
865 if earliest_ms <= elapsed {
866 ready.push((tid, t.priority(), enabled_ms));
867 }
868 }
869 if ready.is_empty() {
870 return false;
871 }
872
873 ready.sort_by(|a, b| {
874 b.1.cmp(&a.1)
875 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
876 });
877
878 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
879
880 let mut fired_any = false;
881 for (tid, _, _) in ready {
882 if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
883 self.fire_transition_async(tid, completion_tx, in_flight_count);
884 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
885 fired_any = true;
886 } else {
887 self.enabled_flags[tid] = false;
888 self.enabled_transition_count -= 1;
889 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
890 }
891 }
892 fired_any
893 }
894
895 fn fire_transition_async(
897 &mut self,
898 tid: usize,
899 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
900 in_flight_count: &mut usize,
901 ) {
902 let t = self.compiled.transition(tid);
903 let transition_name = Arc::clone(t.name_arc());
904 let input_specs: Vec<In> = t.input_specs().to_vec();
905 let read_arcs: Vec<_> = t.reads().to_vec();
906 let reset_arcs: Vec<_> = t.resets().to_vec();
907 let output_place_names: HashSet<Arc<str>> = t
908 .output_places()
909 .iter()
910 .map(|p| Arc::clone(p.name_arc()))
911 .collect();
912 let action = Arc::clone(t.action());
913 let is_sync = action.is_sync();
914
915 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
917 for in_spec in &input_specs {
918 let place_name = in_spec.place_name();
919 let to_consume = match in_spec {
920 In::One { .. } => 1,
921 In::Exactly { count, .. } => *count,
922 In::All { guard, .. } | In::AtLeast { guard, .. } => {
923 if guard.is_some() {
924 self.marking
925 .count_matching(place_name, &**guard.as_ref().unwrap())
926 } else {
927 self.marking.count(place_name)
928 }
929 }
930 };
931
932 let place_name_arc = Arc::clone(in_spec.place().name_arc());
933 for _ in 0..to_consume {
934 let token = if let Some(guard) = in_spec.guard() {
935 self.marking.remove_matching(place_name, &**guard)
936 } else {
937 self.marking.remove_first(place_name)
938 };
939 if let Some(token) = token {
940 if E::ENABLED {
941 self.event_store.append(NetEvent::TokenRemoved {
942 place_name: Arc::clone(&place_name_arc),
943 timestamp: now_millis(),
944 });
945 }
946 inputs
947 .entry(Arc::clone(&place_name_arc))
948 .or_default()
949 .push(token);
950 }
951 }
952 }
953
954 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
956 for arc in &read_arcs {
957 if let Some(queue) = self.marking.queue(arc.place.name())
958 && let Some(token) = queue.front()
959 {
960 read_tokens
961 .entry(Arc::clone(arc.place.name_arc()))
962 .or_default()
963 .push(token.clone());
964 }
965 }
966
967 for arc in &reset_arcs {
969 let removed = self.marking.remove_all(arc.place.name());
970 self.pending_reset_places
971 .insert(Arc::clone(arc.place.name_arc()));
972 if E::ENABLED {
973 for _ in &removed {
974 self.event_store.append(NetEvent::TokenRemoved {
975 place_name: Arc::clone(arc.place.name_arc()),
976 timestamp: now_millis(),
977 });
978 }
979 }
980 }
981
982 self.update_bitmap_after_consumption(tid);
983
984 if E::ENABLED {
985 self.event_store.append(NetEvent::TransitionStarted {
986 transition_name: Arc::clone(&transition_name),
987 timestamp: now_millis(),
988 });
989 }
990
991 self.enabled_flags[tid] = false;
993 self.enabled_transition_count -= 1;
994 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
995 self.mark_transition_dirty(tid);
996
997 if is_sync {
998 let mut ctx = TransitionContext::new(
1000 Arc::clone(&transition_name),
1001 inputs,
1002 read_tokens,
1003 output_place_names,
1004 None,
1005 );
1006 let result = action.run_sync(&mut ctx);
1007 match result {
1008 Ok(()) => {
1009 let outputs = ctx.take_outputs();
1010 self.process_outputs(tid, &transition_name, outputs);
1011 if E::ENABLED {
1012 self.event_store.append(NetEvent::TransitionCompleted {
1013 transition_name: Arc::clone(&transition_name),
1014 timestamp: now_millis(),
1015 });
1016 }
1017 }
1018 Err(err) => {
1019 if E::ENABLED {
1020 self.event_store.append(NetEvent::TransitionFailed {
1021 transition_name: Arc::clone(&transition_name),
1022 error: err.message,
1023 timestamp: now_millis(),
1024 });
1025 }
1026 }
1027 }
1028 } else {
1029 *in_flight_count += 1;
1031 let tx = completion_tx.clone();
1032 let name = Arc::clone(&transition_name);
1033 let ctx = TransitionContext::new(
1034 Arc::clone(&transition_name),
1035 inputs,
1036 read_tokens,
1037 output_place_names,
1038 None,
1039 );
1040 tokio::spawn(async move {
1041 let result = action.run_async(ctx).await;
1042 let completion = match result {
1043 Ok(mut completed_ctx) => ActionCompletion {
1044 transition_name: Arc::clone(&name),
1045 result: Ok(completed_ctx.take_outputs()),
1046 },
1047 Err(err) => ActionCompletion {
1048 transition_name: Arc::clone(&name),
1049 result: Err(err.message),
1050 },
1051 };
1052 let _ = tx.send(completion);
1053 });
1054 }
1055 }
1056
1057 fn millis_until_next_timed_transition(&self) -> f64 {
1059 let mut min_wait = f64::INFINITY;
1060 let now_ms = self.elapsed_ms();
1061
1062 for tid in 0..self.compiled.transition_count {
1063 if !self.enabled_flags[tid] {
1064 continue;
1065 }
1066 let t = self.compiled.transition(tid);
1067 let elapsed = now_ms - self.enabled_at_ms[tid];
1068
1069 let earliest_ms = t.timing().earliest() as f64;
1070 let remaining_earliest = earliest_ms - elapsed;
1071 if remaining_earliest <= 0.0 {
1072 return 0.0;
1073 }
1074 min_wait = min_wait.min(remaining_earliest);
1075
1076 if self.has_deadline_flags[tid] {
1077 let latest_ms = t.timing().latest() as f64;
1078 let remaining_deadline = latest_ms - elapsed;
1079 if remaining_deadline <= 0.0 {
1080 return 0.0;
1081 }
1082 min_wait = min_wait.min(remaining_deadline);
1083 }
1084 }
1085
1086 min_wait
1087 }
1088}
1089
/// Wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
1096
1097#[allow(dead_code)]
1099fn validate_out_spec(out: &Out, produced_places: &HashSet<Arc<str>>) -> bool {
1100 match out {
1101 Out::Place(p) => produced_places.contains(p.name()),
1102 Out::And(children) => children
1103 .iter()
1104 .all(|c| validate_out_spec(c, produced_places)),
1105 Out::Xor(children) => children
1106 .iter()
1107 .any(|c| validate_out_spec(c, produced_places)),
1108 Out::Timeout { child, .. } => validate_out_spec(child, produced_places),
1109 Out::ForwardInput { to, .. } => produced_places.contains(to.name()),
1110 }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115 use super::*;
1116 use libpetri_core::action::passthrough;
1117 use libpetri_core::input::one;
1118 use libpetri_core::output::out_place;
1119 use libpetri_core::place::Place;
1120 use libpetri_core::token::Token;
1121 use libpetri_core::transition::Transition;
1122 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1123
1124 fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1125 let p1 = Place::<i32>::new("p1");
1126 let p2 = Place::<i32>::new("p2");
1127 let p3 = Place::<i32>::new("p3");
1128
1129 let t1 = Transition::builder("t1")
1130 .input(one(&p1))
1131 .output(out_place(&p2))
1132 .action(passthrough())
1133 .build();
1134 let t2 = Transition::builder("t2")
1135 .input(one(&p2))
1136 .output(out_place(&p3))
1137 .action(passthrough())
1138 .build();
1139
1140 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1141 (net, p1, p2, p3)
1142 }
1143
1144 #[test]
1145 fn sync_passthrough_chain() {
1146 let (net, p1, _p2, _p3) = simple_chain();
1147
1148 let mut marking = Marking::new();
1149 marking.add(&p1, Token::at(42, 0));
1150
1151 let mut executor =
1152 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1153 let result = executor.run_sync();
1154
1155 assert_eq!(result.count("p1"), 0);
1159 }
1160
1161 #[test]
1162 fn sync_with_event_store() {
1163 let (net, p1, _, _) = simple_chain();
1164
1165 let mut marking = Marking::new();
1166 marking.add(&p1, Token::at(42, 0));
1167
1168 let mut executor =
1169 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1170 executor.run_sync();
1171
1172 let store = executor.event_store();
1173 assert!(!store.is_empty());
1174 }
1175
1176 #[test]
1177 fn sync_fork_chain() {
1178 let p1 = Place::<i32>::new("p1");
1179 let p2 = Place::<i32>::new("p2");
1180 let p3 = Place::<i32>::new("p3");
1181
1182 let t1 = Transition::builder("t1")
1183 .input(one(&p1))
1184 .output(libpetri_core::output::and(vec![
1185 out_place(&p2),
1186 out_place(&p3),
1187 ]))
1188 .action(libpetri_core::action::fork())
1189 .build();
1190
1191 let net = PetriNet::builder("fork").transition(t1).build();
1192
1193 let mut marking = Marking::new();
1194 marking.add(&p1, Token::at(42, 0));
1195
1196 let mut executor =
1197 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1198 let result = executor.run_sync();
1199
1200 assert_eq!(result.count("p1"), 0);
1202 assert_eq!(result.count("p2"), 1);
1203 assert_eq!(result.count("p3"), 1);
1204 }
1205
1206 #[test]
1207 fn sync_no_initial_tokens() {
1208 let (net, _, _, _) = simple_chain();
1209 let marking = Marking::new();
1210 let mut executor =
1211 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1212 let result = executor.run_sync();
1213 assert_eq!(result.count("p1"), 0);
1214 assert_eq!(result.count("p2"), 0);
1215 assert_eq!(result.count("p3"), 0);
1216 }
1217
1218 #[test]
1219 fn sync_priority_ordering() {
1220 let p = Place::<()>::new("p");
1221 let out_a = Place::<()>::new("a");
1222 let out_b = Place::<()>::new("b");
1223
1224 let t_high = Transition::builder("t_high")
1227 .input(one(&p))
1228 .output(out_place(&out_a))
1229 .action(libpetri_core::action::passthrough())
1230 .priority(10)
1231 .build();
1232 let t_low = Transition::builder("t_low")
1233 .input(one(&p))
1234 .output(out_place(&out_b))
1235 .action(libpetri_core::action::passthrough())
1236 .priority(1)
1237 .build();
1238
1239 let net = PetriNet::builder("priority")
1240 .transitions([t_high, t_low])
1241 .build();
1242
1243 let mut marking = Marking::new();
1244 marking.add(&p, Token::at((), 0));
1245
1246 let mut executor =
1247 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1248 executor.run_sync();
1249
1250 assert_eq!(executor.marking().count("p"), 0);
1254 }
1255
#[test]
/// Marks both the input place `p1` and the inhibitor place `inh`, runs the
/// net, and asserts that `p1` still holds its token afterwards.
fn sync_inhibitor_blocks() {
    use libpetri_core::{action, arc};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let input = Place::<()>::new("p1");
    let output = Place::<()>::new("p2");
    let blocker = Place::<()>::new("inh");

    let inhibited = Transition::builder("t1")
        .input(one(&input))
        .output(out_place(&output))
        .inhibitor(arc::inhibitor(&blocker))
        .action(action::passthrough())
        .build();
    let net = PetriNet::builder("inhibitor").transition(inhibited).build();

    let mut initial = Marking::new();
    initial.add(&input, Token::at((), 0));
    initial.add(&blocker, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let still_waiting = exec.marking().count("p1");
    assert_eq!(still_waiting, 1);
}
1282
#[test]
/// Builds six places `p0..p5` linked by five transitions `t0..t4`, seeds
/// `p0` with one token, and asserts the result has `p0` empty and exactly
/// one token in `p5`.
fn sync_linear_chain_5() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();

    // Each adjacent pair of places (p_i, p_{i+1}) is joined by transition t_i.
    let hops: Vec<Transition> = places
        .windows(2)
        .enumerate()
        .map(|(i, pair)| {
            Transition::builder(format!("t{i}"))
                .input(one(&pair[0]))
                .output(out_place(&pair[1]))
                .action(action::fork())
                .build()
        })
        .collect();

    let net = PetriNet::builder("chain5").transitions(hops).build();

    let mut initial = Marking::new();
    initial.add(&places[0], Token::at(1, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    let outcome = exec.run_sync();

    for (place, expected) in [("p0", 0), ("p5", 1)] {
        assert_eq!(outcome.count(place), expected);
    }
}
1310
#[test]
/// Runs a single-transition net (`p1` -> `t1` -> `p2`) from an empty marking
/// and asserts that both places are still empty afterwards.
fn input_arc_requires_token_to_enable() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let src = Place::<i32>::new("p1");
    let dst = Place::<i32>::new("p2");
    let only = Transition::builder("t1")
        .input(one(&src))
        .output(out_place(&dst))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(only).build();

    let mut exec = Exec::new(&net, Marking::new(), ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for place in ["p1", "p2"] {
        assert_eq!(after.count(place), 0);
    }
}
1332
#[test]
/// Runs a transition with inputs from both `p1` and `p2` in two scenarios:
/// with only `p1` marked (asserting `p1` keeps its token and `p3` stays
/// empty), then with both marked (asserting `p1` and `p2` are emptied and
/// `p3` holds one token).
fn multiple_input_arcs_require_all_tokens() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let left = Place::<i32>::new("p1");
    let right = Place::<i32>::new("p2");
    let joined = Place::<i32>::new("p3");

    let join = Transition::builder("t1")
        .input(one(&left))
        .input(one(&right))
        .output(out_place(&joined))
        .action(action::sync_action(|ctx| {
            ctx.output("p3", 99i32)?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(join).build();

    // Builds a fresh executor for the given marking and runs it before
    // handing it back for inspection.
    let run_from = |initial: Marking| {
        let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
        exec.run_sync();
        exec
    };

    // Scenario 1: only `p1` is marked.
    let mut only_left = Marking::new();
    only_left.add(&left, Token::at(1, 0));
    let partial = run_from(only_left);
    assert_eq!(partial.marking().count("p1"), 1);
    assert_eq!(partial.marking().count("p3"), 0);

    // Scenario 2: both inputs are marked.
    let mut both = Marking::new();
    both.add(&left, Token::at(1, 0));
    both.add(&right, Token::at(2, 0));
    let complete = run_from(both);
    assert_eq!(complete.marking().count("p1"), 0);
    assert_eq!(complete.marking().count("p2"), 0);
    assert_eq!(complete.marking().count("p3"), 1);
}
1370
#[test]
/// Wires a transition with an input arc from `in` and a read arc from `ctx`,
/// then asserts that after the run `in` is empty, `ctx` still holds its
/// token, and `out` holds one token.
fn read_arc_does_not_consume() {
    use libpetri_core::{action, arc};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let consumed_from = Place::<i32>::new("in");
    let read_from = Place::<i32>::new("ctx");
    let written_to = Place::<i32>::new("out");

    let adder = Transition::builder("t1")
        .input(one(&consumed_from))
        .read(arc::read(&read_from))
        .output(out_place(&written_to))
        .action(action::sync_action(|ctx| {
            let taken = ctx.input::<i32>("in")?;
            let observed = ctx.read::<i32>("ctx")?;
            ctx.output("out", *taken + *observed)?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(adder).build();

    let mut initial = Marking::new();
    initial.add(&consumed_from, Token::at(10, 0));
    initial.add(&read_from, Token::at(5, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for (place, expected) in [("in", 0), ("ctx", 1), ("out", 1)] {
        assert_eq!(after.count(place), expected);
    }
}
1402
#[test]
/// Puts three tokens in `reset`, which the transition touches through a
/// reset arc, plus one trigger token in `in`; asserts that after the run
/// `reset` is empty and `out` holds one token.
fn reset_arc_removes_all_tokens() {
    use libpetri_core::{action, arc};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let trigger = Place::<()>::new("in");
    let cleared = Place::<i32>::new("reset");
    let sink = Place::<()>::new("out");

    let wiper = Transition::builder("t1")
        .input(one(&trigger))
        .reset(arc::reset(&cleared))
        .output(out_place(&sink))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(wiper).build();

    let mut initial = Marking::new();
    initial.add(&trigger, Token::at((), 0));
    for value in 1..=3 {
        initial.add(&cleared, Token::at(value, 0));
    }

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    assert_eq!(after.count("reset"), 0);
    assert_eq!(after.count("out"), 1);
}
1430
#[test]
/// Uses an `exactly(3, ..)` input on a place holding five tokens; the action
/// re-emits every token it was handed. Asserts that two tokens remain in
/// `p` and three arrive in `out`.
fn exactly_cardinality_consumes_n() {
    use libpetri_core::{action, input};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");

    let take_three = Transition::builder("t1")
        .input(input::exactly(3, &source))
        .output(out_place(&sink))
        .action(action::sync_action(|ctx| {
            let taken = ctx.inputs::<i32>("p")?;
            for value in taken {
                ctx.output("out", *value)?;
            }
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(take_three).build();

    let mut initial = Marking::new();
    (0..5).for_each(|i| {
        initial.add(&source, Token::at(i, 0));
    });

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    assert_eq!(after.count("p"), 2);
    assert_eq!(after.count("out"), 3);
}
1462
#[test]
/// Uses an `all(..)` input on a place holding five tokens and has the action
/// emit the number of tokens it received.
///
/// Asserts that `p` is drained and that `out` receives a single token
/// carrying the count (5), so the test fails if the action's output is lost
/// rather than passing on the input side alone.
fn all_cardinality_consumes_everything() {
    let p = Place::<i32>::new("p");
    // The action emits an `i32` count, so the output place is typed `i32`
    // to match the value actually produced.
    let p_out = Place::<i32>::new("out");

    let t = Transition::builder("t1")
        .input(libpetri_core::input::all(&p))
        .output(out_place(&p_out))
        .action(libpetri_core::action::sync_action(|ctx| {
            let vals = ctx.inputs::<i32>("p")?;
            ctx.output("out", vals.len() as i32)?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(t).build();

    let mut marking = Marking::new();
    for i in 0..5 {
        marking.add(&p, Token::at(i, 0));
    }

    let mut executor =
        BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
    executor.run_sync();

    assert_eq!(executor.marking().count("p"), 0);
    // One firing consumed all five tokens and reported that count.
    assert_eq!(executor.marking().count("out"), 1);
    assert_eq!(*executor.marking().peek(&p_out).unwrap(), 5);
}
1490
#[test]
/// Gives an `at_least(3, ..)` input only two tokens and asserts that both
/// tokens are still in `p` after the run.
fn at_least_blocks_insufficient() {
    use libpetri_core::{action, input};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<()>::new("out");

    let needs_three = Transition::builder("t1")
        .input(input::at_least(3, &source))
        .output(out_place(&sink))
        .action(action::passthrough())
        .build();
    let net = PetriNet::builder("test").transition(needs_three).build();

    let mut initial = Marking::new();
    for value in [1, 2] {
        initial.add(&source, Token::at(value, 0));
    }

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let untouched = exec.marking().count("p");
    assert_eq!(untouched, 2);
}
1514
#[test]
/// Gives an `at_least(3, ..)` input five tokens and asserts that `p` is
/// empty after the run.
fn at_least_fires_with_enough_and_consumes_all() {
    use libpetri_core::{action, input};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<()>::new("out");

    let needs_three = Transition::builder("t1")
        .input(input::at_least(3, &source))
        .output(out_place(&sink))
        .action(action::passthrough())
        .build();
    let net = PetriNet::builder("test").transition(needs_three).build();

    let mut initial = Marking::new();
    (0..5).for_each(|i| {
        initial.add(&source, Token::at(i, 0));
    });

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let leftover = exec.marking().count("p");
    assert_eq!(leftover, 0);
}
1538
#[test]
/// Guards the input with `*v > 5` on a place holding 3 and 10; asserts that
/// one token remains in `p`, one token lands in `out`, and the token peeked
/// from `out` is 10.
fn guarded_input_only_consumes_matching() {
    use libpetri_core::{action, input};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");

    let picky = Transition::builder("t1")
        .input(input::one_guarded(&source, |v| *v > 5))
        .output(out_place(&sink))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(picky).build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(3, 0));
    initial.add(&source, Token::at(10, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    assert_eq!(after.count("p"), 1);
    assert_eq!(after.count("out"), 1);

    let forwarded = after.peek(&sink).unwrap();
    assert_eq!(*forwarded, 10);
}
1564
#[test]
/// Guards the input with `*v > 100` on a place holding 3 and 10; asserts
/// that both tokens stay in `p` and `out` stays empty.
fn guarded_input_blocks_when_no_match() {
    use libpetri_core::{action, input};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");

    let unreachable_guard = Transition::builder("t1")
        .input(input::one_guarded(&source, |v| *v > 100))
        .output(out_place(&sink))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test")
        .transition(unreachable_guard)
        .build();

    let mut initial = Marking::new();
    for value in [3, 10] {
        initial.add(&source, Token::at(value, 0));
    }

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    assert_eq!(after.count("p"), 2);
    assert_eq!(after.count("out"), 0);
}
1589
#[test]
/// Uses a `transform` action that doubles the input value, with an `and`
/// output over `a` and `b`; asserts that each of `a` and `b` holds one
/// token whose value is 10 (input was 5).
fn transform_action_outputs_to_all_places() {
    use libpetri_core::{action, output};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("in");
    let branch_a = Place::<i32>::new("a");
    let branch_b = Place::<i32>::new("b");

    let doubler = Transition::builder("t1")
        .input(one(&source))
        .output(output::and(vec![
            out_place(&branch_a),
            out_place(&branch_b),
        ]))
        .action(action::transform(|ctx| {
            let v = ctx.input::<i32>("in").unwrap();
            let doubled: Arc<dyn std::any::Any + Send + Sync> = Arc::new(*v * 2);
            doubled
        }))
        .build();
    let net = PetriNet::builder("test").transition(doubler).build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(5, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    assert_eq!(after.count("a"), 1);
    assert_eq!(after.count("b"), 1);
    assert_eq!(*after.peek(&branch_a).unwrap(), 10);
    assert_eq!(*after.peek(&branch_b).unwrap(), 10);
}
1621
#[test]
/// Uses a `sync_action` that formats its `i32` input into a `String`;
/// asserts that `out` holds one token equal to `"value=42"`.
fn sync_action_custom_logic() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let numbers = Place::<i32>::new("in");
    let labels = Place::<String>::new("out");

    let formatter = Transition::builder("t1")
        .input(one(&numbers))
        .output(out_place(&labels))
        .action(action::sync_action(|ctx| {
            let v = ctx.input::<i32>("in")?;
            ctx.output("out", format!("value={v}"))?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(formatter).build();

    let mut initial = Marking::new();
    initial.add(&numbers, Token::at(42, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    assert_eq!(after.count("out"), 1);

    let label = after.peek(&labels).unwrap();
    assert_eq!(*label, "value=42");
}
1649
#[test]
/// Runs a transition whose action always returns an `ActionError`; asserts
/// that both `in` and `out` are empty afterwards and that the in-memory
/// event store recorded a `TransitionFailed` event.
fn action_error_does_not_crash() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<InMemoryEventStore>;

    let source = Place::<i32>::new("in");
    let sink = Place::<i32>::new("out");

    let always_fails = Transition::builder("t1")
        .input(one(&source))
        .output(out_place(&sink))
        .action(action::sync_action(|_ctx| {
            Err(action::ActionError::new("intentional failure"))
        }))
        .build();
    let net = PetriNet::builder("test").transition(always_fails).build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(42, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    for place in ["in", "out"] {
        assert_eq!(exec.marking().count(place), 0);
    }

    let events = exec.event_store().events();
    let saw_failure = events
        .iter()
        .any(|e| matches!(e, NetEvent::TransitionFailed { .. }));
    assert!(saw_failure);
}
1685
#[test]
/// Runs a single firing with an in-memory event store and asserts that at
/// least one event of each of these kinds was recorded: `ExecutionStarted`,
/// `TransitionEnabled`, `TransitionStarted`, `TransitionCompleted`,
/// `TokenRemoved`, `TokenAdded`, `ExecutionCompleted`.
fn event_store_records_lifecycle() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<InMemoryEventStore>;

    let src = Place::<i32>::new("p1");
    let dst = Place::<i32>::new("p2");
    let step = Transition::builder("t1")
        .input(one(&src))
        .output(out_place(&dst))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(step).build();

    let mut initial = Marking::new();
    initial.add(&src, Token::at(1, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let events = exec.event_store().events();

    // Expands to one presence assertion per listed `NetEvent` variant, in
    // the order given.
    macro_rules! assert_recorded {
        ($($kind:ident),+ $(,)?) => {
            $(
                assert!(
                    events
                        .iter()
                        .any(|e| matches!(e, NetEvent::$kind { .. }))
                );
            )+
        };
    }

    assert_recorded!(
        ExecutionStarted,
        TransitionEnabled,
        TransitionStarted,
        TransitionCompleted,
        TokenRemoved,
        TokenAdded,
        ExecutionCompleted,
    );
}
1744
#[test]
/// Runs a single firing with the no-op event store and asserts that the
/// store reports itself empty afterwards.
fn noop_event_store_has_no_events() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let src = Place::<i32>::new("p1");
    let dst = Place::<i32>::new("p2");
    let step = Transition::builder("t1")
        .input(one(&src))
        .output(out_place(&dst))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(step).build();

    let mut initial = Marking::new();
    initial.add(&src, Token::at(1, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let nothing_recorded = exec.event_store().is_empty();
    assert!(nothing_recorded);
}
1765
#[test]
/// Runs an executor over a net built with no transitions and an empty
/// marking, and asserts that the executor reports quiescence.
fn empty_net_completes() {
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let bare = PetriNet::builder("empty").build();
    let mut exec = Exec::new(&bare, Marking::new(), ExecutorOptions::default());
    exec.run_sync();

    let idle = exec.is_quiescent();
    assert!(idle);
}
1777
#[test]
/// Seeds `p` with one token, runs a single `p` -> `out` transition, and
/// asserts that `p` ends empty and `out` ends with one token.
fn single_transition_fires_once() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");
    let step = Transition::builder("t1")
        .input(one(&source))
        .output(out_place(&sink))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(step).build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(42, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for (place, expected) in [("p", 0), ("out", 1)] {
        assert_eq!(after.count(place), expected);
    }
}
1799
#[test]
/// Seeds `p` with 100 tokens, runs a single `p` -> `out` transition, and
/// asserts that `p` ends empty and `out` ends with 100 tokens.
fn many_tokens_all_processed() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");
    let step = Transition::builder("t1")
        .input(one(&source))
        .output(out_place(&sink))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(step).build();

    let mut initial = Marking::new();
    (0..100).for_each(|i| {
        initial.add(&source, Token::at(i, 0));
    });

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    assert_eq!(after.count("p"), 0);
    assert_eq!(after.count("out"), 100);
}
1823
#[test]
/// Seeds `p` with the values 1, 2, 3 (in that order), forwards every token
/// to `out`, and checks both that all three arrive and that the first
/// token available in `out` is the first one that was added to `p`.
fn input_fifo_ordering() {
    let p = Place::<i32>::new("p");
    let out = Place::<i32>::new("out");
    let t = Transition::builder("t1")
        .input(one(&p))
        .output(out_place(&out))
        .action(libpetri_core::action::fork())
        .build();
    let net = PetriNet::builder("test").transition(t).build();

    let mut marking = Marking::new();
    marking.add(&p, Token::at(1, 0));
    marking.add(&p, Token::at(2, 0));
    marking.add(&p, Token::at(3, 0));

    let mut executor =
        BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
    executor.run_sync();

    assert_eq!(executor.marking().count("out"), 3);
    // NOTE(review): assumes `Marking::peek` returns the oldest token of the
    // place — confirm against `Marking`. Without this check the test would
    // pass for any consumption order, despite its name.
    assert_eq!(*executor.marking().peek(&out).unwrap(), 1);
}
1847
#[test]
/// `t_clear` (priority 10) takes the token from `inh`; `t1` (priority 1)
/// takes from `p1` and has an inhibitor arc on `inh`. Asserts that after
/// the run `inh` and `p1` are both empty and `out` holds one token.
fn inhibitor_unblocked_when_token_removed() {
    use libpetri_core::{action, arc};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let work = Place::<()>::new("p1");
    let blocker = Place::<()>::new("inh");
    let sink = Place::<()>::new("out");
    let cleared_signal = Place::<()>::new("trigger");

    let clear_blocker = Transition::builder("t_clear")
        .input(one(&blocker))
        .output(out_place(&cleared_signal))
        .action(action::fork())
        .priority(10)
        .build();

    let inhibited = Transition::builder("t1")
        .input(one(&work))
        .inhibitor(arc::inhibitor(&blocker))
        .output(out_place(&sink))
        .action(action::fork())
        .priority(1)
        .build();

    let net = PetriNet::builder("test")
        .transitions([clear_blocker, inhibited])
        .build();

    let mut initial = Marking::new();
    initial.add(&work, Token::at((), 0));
    initial.add(&blocker, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for (place, expected) in [("inh", 0), ("p1", 0), ("out", 1)] {
        assert_eq!(after.count(place), expected);
    }
}
1890
#[test]
/// Combines an input arc (`in`), a read arc (`ctx`) and a reset arc
/// (`clear`) on one transition. Asserts that afterwards `in` and `clear`
/// are empty, `ctx` keeps its token, and `out` holds the single string
/// `"42-hello"`.
fn combined_input_read_reset() {
    use libpetri_core::{action, arc};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let consumed = Place::<i32>::new("in");
    let observed = Place::<String>::new("ctx");
    let wiped = Place::<i32>::new("clear");
    let sink = Place::<String>::new("out");

    let combine = Transition::builder("t1")
        .input(one(&consumed))
        .read(arc::read(&observed))
        .reset(arc::reset(&wiped))
        .output(out_place(&sink))
        .action(action::sync_action(|ctx| {
            let v = ctx.input::<i32>("in")?;
            let r = ctx.read::<String>("ctx")?;
            ctx.output("out", format!("{v}-{r}"))?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(combine).build();

    let mut initial = Marking::new();
    initial.add(&consumed, Token::at(42, 0));
    initial.add(&observed, Token::at("hello".to_string(), 0));
    for value in [1, 2] {
        initial.add(&wiped, Token::at(value, 0));
    }

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for (place, expected) in [("in", 0), ("ctx", 1), ("clear", 0), ("out", 1)] {
        assert_eq!(after.count(place), expected);
    }

    let combined = after.peek(&sink).unwrap();
    assert_eq!(*combined, "42-hello");
}
1929
#[test]
/// Chains three doubling transitions `p1 -> p2 -> p3 -> p4`, seeds `p1`
/// with 1, and asserts that `p4` ends with a single token of value 8.
fn workflow_sequential_chain() {
    let p1 = Place::<i32>::new("p1");
    let p2 = Place::<i32>::new("p2");
    let p3 = Place::<i32>::new("p3");
    let p4 = Place::<i32>::new("p4");

    // Builds a transition that reads one token from `inp` and writes twice
    // its value to `outp`. The action captures the names of the places it
    // is actually wired to, instead of probing a fixed list of candidate
    // input names and panicking when none of them matches.
    let make_doubler = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
        let in_name: Arc<str> = Arc::from(inp.name());
        let out_name: Arc<str> = Arc::from(outp.name());
        Transition::builder(name)
            .input(one(inp))
            .output(out_place(outp))
            .action(libpetri_core::action::sync_action(move |ctx| {
                // A missing input is reported as an action error rather
                // than a panic inside the action.
                let v = ctx.input::<i32>(&in_name)?;
                ctx.output(&out_name, *v * 2)?;
                Ok(())
            }))
            .build()
    };

    let t1 = make_doubler("t1", &p1, &p2);
    let t2 = make_doubler("t2", &p2, &p3);
    let t3 = make_doubler("t3", &p3, &p4);

    let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();

    let mut marking = Marking::new();
    marking.add(&p1, Token::at(1, 0));

    let mut executor =
        BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
    executor.run_sync();

    assert_eq!(executor.marking().count("p4"), 1);
    // 1 doubled three times.
    assert_eq!(*executor.marking().peek(&p4).unwrap(), 8);
}
1972
#[test]
/// `fork` sends the `start` token to both `a` and `b`; `join` takes one
/// token from each and emits their sum to `end`. Asserts that `start`, `a`
/// and `b` end empty and `end` holds one token of value 10 (input was 5).
fn workflow_fork_join() {
    use libpetri_core::{action, output};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let start = Place::<i32>::new("start");
    let branch_a = Place::<i32>::new("a");
    let branch_b = Place::<i32>::new("b");
    let end = Place::<i32>::new("end");

    let split = Transition::builder("fork")
        .input(one(&start))
        .output(output::and(vec![
            out_place(&branch_a),
            out_place(&branch_b),
        ]))
        .action(action::fork())
        .build();

    let merge = Transition::builder("join")
        .input(one(&branch_a))
        .input(one(&branch_b))
        .output(out_place(&end))
        .action(action::sync_action(|ctx| {
            let lhs = ctx.input::<i32>("a")?;
            let rhs = ctx.input::<i32>("b")?;
            ctx.output("end", *lhs + *rhs)?;
            Ok(())
        }))
        .build();

    let net = PetriNet::builder("fork-join")
        .transitions([split, merge])
        .build();

    let mut initial = Marking::new();
    initial.add(&start, Token::at(5, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for (place, expected) in [("start", 0), ("a", 0), ("b", 0), ("end", 1)] {
        assert_eq!(after.count(place), expected);
    }
    assert_eq!(*after.peek(&end).unwrap(), 10);
}
2019
#[test]
/// Two workers each take their own work token plus the shared `mutex`
/// token, and each emits a done token and puts a token back into `mutex`.
/// Asserts that `done1`, `done2` and `mutex` each hold one token afterwards.
fn workflow_mutual_exclusion() {
    use libpetri_core::{action, output};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let mutex = Place::<()>::new("mutex");
    let work1 = Place::<()>::new("w1");
    let work2 = Place::<()>::new("w2");
    let done1 = Place::<()>::new("done1");
    let done2 = Place::<()>::new("done2");

    // Both workers have the same shape; only the transition name, the work
    // place and the done place differ.
    let worker = |name: &str, work: &Place<()>, done: &Place<()>, done_label: &'static str| {
        Transition::builder(name)
            .input(one(work))
            .input(one(&mutex))
            .output(output::and(vec![out_place(done), out_place(&mutex)]))
            .action(action::sync_action(move |ctx| {
                ctx.output(done_label, ())?;
                ctx.output("mutex", ())?;
                Ok(())
            }))
            .build()
    };

    let first = worker("work1", &work1, &done1, "done1");
    let second = worker("work2", &work2, &done2, "done2");

    let net = PetriNet::builder("mutex")
        .transitions([first, second])
        .build();

    let mut initial = Marking::new();
    initial.add(&mutex, Token::at((), 0));
    initial.add(&work1, Token::at((), 0));
    initial.add(&work2, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for place in ["done1", "done2", "mutex"] {
        assert_eq!(after.count(place), 1);
    }
}
2073
#[test]
/// Uses the `produce` action targeting `out` with a fixed string value and
/// asserts that `out` holds one token after the run.
fn produce_action() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let trigger = Place::<()>::new("in");
    let sink = Place::<String>::new("out");

    let producer = Transition::builder("t1")
        .input(one(&trigger))
        .output(out_place(&sink))
        .action(action::produce(
            Arc::from("out"),
            "produced_value".to_string(),
        ))
        .build();
    let net = PetriNet::builder("test").transition(producer).build();

    let mut initial = Marking::new();
    initial.add(&trigger, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let produced = exec.marking().count("out");
    assert_eq!(produced, 1);
}
2098
#[test]
/// Declares an `xor` output over `a` and `b` and has the action emit its
/// input to `a` only.
///
/// Asserts that exactly the chosen branch received the token: `a` holds
/// one token and `b` holds none. A lower-bound check on the combined count
/// would also accept a token appearing in both branches.
fn xor_output_fires_one_branch() {
    let p = Place::<i32>::new("p");
    let a = Place::<i32>::new("a");
    let b = Place::<i32>::new("b");

    let t = Transition::builder("t1")
        .input(one(&p))
        .output(libpetri_core::output::xor(vec![
            out_place(&a),
            out_place(&b),
        ]))
        .action(libpetri_core::action::sync_action(|ctx| {
            let v = ctx.input::<i32>("p")?;
            // Only branch `a` is selected.
            ctx.output("a", *v)?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(t).build();

    let mut marking = Marking::new();
    marking.add(&p, Token::at(1, 0));

    let mut executor =
        BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
    executor.run_sync();

    assert_eq!(executor.marking().count("a"), 1);
    assert_eq!(executor.marking().count("b"), 0);
}
2131
#[test]
/// Declares an `and` output over `a`, `b` and `c`; the action emits the
/// input scaled by 1, 10 and 100 respectively. Asserts that each place
/// holds one token with the matching value (1, 10, 100).
fn and_output_fires_to_all() {
    use libpetri_core::{action, output};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let ones = Place::<i32>::new("a");
    let tens = Place::<i32>::new("b");
    let hundreds = Place::<i32>::new("c");

    let fan_out = Transition::builder("t1")
        .input(one(&source))
        .output(output::and(vec![
            out_place(&ones),
            out_place(&tens),
            out_place(&hundreds),
        ]))
        .action(action::sync_action(|ctx| {
            let base = ctx.input::<i32>("p")?;
            ctx.output("a", *base)?;
            ctx.output("b", *base * 10)?;
            ctx.output("c", *base * 100)?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(fan_out).build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(1, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for place in ["a", "b", "c"] {
        assert_eq!(after.count(place), 1);
    }
    assert_eq!(*after.peek(&ones).unwrap(), 1);
    assert_eq!(*after.peek(&tens).unwrap(), 10);
    assert_eq!(*after.peek(&hundreds).unwrap(), 100);
}
2170
#[test]
/// Runs a transition that declares an input but no output, with an action
/// that only reads its input; asserts that `p` is empty afterwards.
fn transition_with_no_output_consumes_only() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");

    let sink_less = Transition::builder("t1")
        .input(one(&source))
        .action(action::sync_action(|ctx| {
            let _ = ctx.input::<i32>("p")?;
            Ok(())
        }))
        .build();
    let net = PetriNet::builder("test").transition(sink_less).build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(42, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let leftover = exec.marking().count("p");
    assert_eq!(leftover, 0);
}
2193
#[test]
/// Two transitions `ta` and `tb` both take one token from `p1`, which holds
/// a single token. Asserts that `a` and `b` together hold exactly one
/// token and that `p1` is empty.
fn multiple_transitions_same_input_compete() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let contested = Place::<()>::new("p1");
    let sink_a = Place::<()>::new("a");
    let sink_b = Place::<()>::new("b");

    let rival_a = Transition::builder("ta")
        .input(one(&contested))
        .output(out_place(&sink_a))
        .action(action::fork())
        .build();
    let rival_b = Transition::builder("tb")
        .input(one(&contested))
        .output(out_place(&sink_b))
        .action(action::fork())
        .build();

    let net = PetriNet::builder("test")
        .transitions([rival_a, rival_b])
        .build();

    let mut initial = Marking::new();
    initial.add(&contested, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    let winners = after.count("a") + after.count("b");
    assert_eq!(winners, 1);
    assert_eq!(after.count("p1"), 0);
}
2226
#[test]
/// Transitions `hi` (priority 10) and `lo` (priority 1) both take one token
/// from `p`, which holds a single token. Asserts that the place `hi`
/// receives one token and the place `lo` receives none.
fn priority_higher_fires_first() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let contested = Place::<()>::new("p");
    let sink_hi = Place::<()>::new("hi");
    let sink_lo = Place::<()>::new("lo");

    let urgent = Transition::builder("hi")
        .input(one(&contested))
        .output(out_place(&sink_hi))
        .action(action::fork())
        .priority(10)
        .build();
    let relaxed = Transition::builder("lo")
        .input(one(&contested))
        .output(out_place(&sink_lo))
        .action(action::fork())
        .priority(1)
        .build();

    let net = PetriNet::builder("test")
        .transitions([urgent, relaxed])
        .build();

    let mut initial = Marking::new();
    initial.add(&contested, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let after = exec.marking();
    for (place, expected) in [("hi", 1), ("lo", 0)] {
        assert_eq!(after.count(place), expected);
    }
}
2259
#[test]
/// Runs a single `p` -> `out` transition with one token in `p` and asserts
/// that the executor reports quiescence once `run_sync` has returned.
fn quiescent_when_no_enabled_transitions() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let source = Place::<i32>::new("p");
    let sink = Place::<i32>::new("out");

    let step = Transition::builder("t1")
        .input(one(&source))
        .output(out_place(&sink))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(step).build();

    let mut initial = Marking::new();
    initial.add(&source, Token::at(1, 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let idle = exec.is_quiescent();
    assert!(idle);
}
2281
#[test]
/// Runs a single firing with an in-memory event store; asserts that at
/// least four events were recorded and that one of them is a
/// `TransitionEnabled` event whose transition name is `"t1"`.
fn event_store_transition_enabled_disabled() {
    use libpetri_core::action;
    type Exec = BitmapNetExecutor<InMemoryEventStore>;

    let src = Place::<()>::new("p1");
    let dst = Place::<()>::new("p2");

    let step = Transition::builder("t1")
        .input(one(&src))
        .output(out_place(&dst))
        .action(action::fork())
        .build();
    let net = PetriNet::builder("test").transition(step).build();

    let mut initial = Marking::new();
    initial.add(&src, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let events = exec.event_store().events();
    let recorded = events.len();
    assert!(recorded >= 4);

    assert!(events.iter().any(|e| {
        matches!(e, NetEvent::TransitionEnabled { transition_name, .. } if transition_name.as_ref() == "t1")
    }));
}
2312
#[test]
/// Diamond-shaped net: `t1` sends the `p1` token to both `p2` and `p3`;
/// `t2` and `t3` each forward one of those into `p4`. Asserts that `p4`
/// ends with two tokens.
fn diamond_pattern() {
    use libpetri_core::{action, output};
    type Exec = BitmapNetExecutor<NoopEventStore>;

    let top = Place::<()>::new("p1");
    let left = Place::<()>::new("p2");
    let right = Place::<()>::new("p3");
    let bottom = Place::<()>::new("p4");

    let split = Transition::builder("t1")
        .input(one(&top))
        .output(output::and(vec![out_place(&left), out_place(&right)]))
        .action(action::fork())
        .build();

    // Both sides of the diamond forward one token into the bottom place.
    let side = |name: &str, from: &Place<()>| {
        Transition::builder(name)
            .input(one(from))
            .output(out_place(&bottom))
            .action(action::fork())
            .build()
    };
    let via_left = side("t2", &left);
    let via_right = side("t3", &right);

    let net = PetriNet::builder("diamond")
        .transitions([split, via_left, via_right])
        .build();

    let mut initial = Marking::new();
    initial.add(&top, Token::at((), 0));

    let mut exec = Exec::new(&net, initial, ExecutorOptions::default());
    exec.run_sync();

    let arrived = exec.marking().count("p4");
    assert_eq!(arrived, 2);
}
2353
2354 #[test]
2355 fn self_loop_with_guard_terminates() {
2356 let p = Place::<i32>::new("p");
2359 let done = Place::<i32>::new("done");
2360
2361 let t = Transition::builder("dec")
2362 .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v > 0))
2363 .output(out_place(&p))
2364 .action(libpetri_core::action::sync_action(|ctx| {
2365 let v = ctx.input::<i32>("p")?;
2366 ctx.output("p", *v - 1)?;
2367 Ok(())
2368 }))
2369 .build();
2370
2371 let t_done = Transition::builder("finish")
2373 .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v == 0))
2374 .output(out_place(&done))
2375 .action(libpetri_core::action::fork())
2376 .build();
2377
2378 let net = PetriNet::builder("countdown")
2379 .transitions([t, t_done])
2380 .build();
2381
2382 let mut marking = Marking::new();
2383 marking.add(&p, Token::at(3, 0));
2384
2385 let mut executor =
2386 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2387 executor.run_sync();
2388
2389 assert_eq!(executor.marking().count("done"), 1);
2390 assert_eq!(*executor.marking().peek(&done).unwrap(), 0);
2391 }
2392
2393 #[test]
2394 fn multiple_tokens_different_types() {
2395 let p_int = Place::<i32>::new("ints");
2396 let p_str = Place::<String>::new("strs");
2397 let p_out = Place::<String>::new("out");
2398
2399 let t = Transition::builder("combine")
2400 .input(one(&p_int))
2401 .input(one(&p_str))
2402 .output(out_place(&p_out))
2403 .action(libpetri_core::action::sync_action(|ctx| {
2404 let i = ctx.input::<i32>("ints")?;
2405 let s = ctx.input::<String>("strs")?;
2406 ctx.output("out", format!("{s}-{i}"))?;
2407 Ok(())
2408 }))
2409 .build();
2410
2411 let net = PetriNet::builder("test").transition(t).build();
2412
2413 let mut marking = Marking::new();
2414 marking.add(&p_int, Token::at(42, 0));
2415 marking.add(&p_str, Token::at("hello".to_string(), 0));
2416
2417 let mut executor =
2418 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2419 executor.run_sync();
2420
2421 assert_eq!(*executor.marking().peek(&p_out).unwrap(), "hello-42");
2422 }
2423}
2424
2425#[cfg(all(test, feature = "tokio"))]
2426mod async_tests {
2427 use super::*;
2428 use crate::environment::{ExecutorSignal, ExternalEvent};
2429 use libpetri_core::action::{ActionError, async_action, fork};
2430 use libpetri_core::input::one;
2431 use libpetri_core::output::out_place;
2432 use libpetri_core::place::Place;
2433 use libpetri_core::token::{ErasedToken, Token};
2434 use libpetri_core::transition::Transition;
2435 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
2436
2437 #[tokio::test]
2438 async fn async_fork_single_transition() {
2439 let p1 = Place::<i32>::new("p1");
2440 let p2 = Place::<i32>::new("p2");
2441
2442 let t1 = Transition::builder("t1")
2443 .input(one(&p1))
2444 .output(out_place(&p2))
2445 .action(fork())
2446 .build();
2447
2448 let net = PetriNet::builder("test").transition(t1).build();
2449 let mut marking = Marking::new();
2450 marking.add(&p1, Token::at(42, 0));
2451
2452 let mut executor =
2453 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2454
2455 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2456 executor.run_async(rx).await;
2457
2458 assert_eq!(executor.marking().count("p2"), 1);
2459 assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
2460 }
2461
2462 #[tokio::test]
2463 async fn async_action_produces_output() {
2464 let p1 = Place::<i32>::new("p1");
2465 let p2 = Place::<i32>::new("p2");
2466
2467 let action = async_action(|mut ctx| async move {
2468 let val: i32 = *ctx.input::<i32>("p1")?;
2469 ctx.output("p2", val * 10)?;
2470 Ok(ctx)
2471 });
2472
2473 let t1 = Transition::builder("t1")
2474 .input(one(&p1))
2475 .output(out_place(&p2))
2476 .action(action)
2477 .build();
2478
2479 let net = PetriNet::builder("test").transition(t1).build();
2480 let mut marking = Marking::new();
2481 marking.add(&p1, Token::at(5, 0));
2482
2483 let mut executor =
2484 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2485
2486 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2487 executor.run_async(rx).await;
2488
2489 assert_eq!(executor.marking().count("p2"), 1);
2490 assert_eq!(*executor.marking().peek(&p2).unwrap(), 50);
2491 }
2492
2493 #[tokio::test]
2494 async fn async_chain_two_transitions() {
2495 let p1 = Place::<i32>::new("p1");
2496 let p2 = Place::<i32>::new("p2");
2497 let p3 = Place::<i32>::new("p3");
2498
2499 let t1 = Transition::builder("t1")
2500 .input(one(&p1))
2501 .output(out_place(&p2))
2502 .action(fork())
2503 .build();
2504
2505 let t2 = Transition::builder("t2")
2506 .input(one(&p2))
2507 .output(out_place(&p3))
2508 .action(fork())
2509 .build();
2510
2511 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
2512 let mut marking = Marking::new();
2513 marking.add(&p1, Token::at(99, 0));
2514
2515 let mut executor =
2516 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2517
2518 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2519 executor.run_async(rx).await;
2520
2521 assert_eq!(executor.marking().count("p3"), 1);
2522 assert_eq!(*executor.marking().peek(&p3).unwrap(), 99);
2523 }
2524
2525 #[tokio::test]
2526 async fn async_event_injection() {
2527 let p1 = Place::<i32>::new("p1");
2528 let p2 = Place::<i32>::new("p2");
2529
2530 let t1 = Transition::builder("t1")
2531 .input(one(&p1))
2532 .output(out_place(&p2))
2533 .action(fork())
2534 .build();
2535
2536 let net = PetriNet::builder("test").transition(t1).build();
2537 let marking = Marking::new(); let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2540 &net,
2541 marking,
2542 ExecutorOptions {
2543 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2544 },
2545 );
2546
2547 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2548
2549 tokio::spawn(async move {
2551 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2552 let token = Token::at(77, 0);
2553 tx.send(ExecutorSignal::Event(ExternalEvent {
2554 place_name: Arc::from("p1"),
2555 token: ErasedToken::from_typed(&token),
2556 }))
2557 .unwrap();
2558 });
2560
2561 executor.run_async(rx).await;
2562
2563 assert_eq!(executor.marking().count("p2"), 1);
2564 assert_eq!(*executor.marking().peek(&p2).unwrap(), 77);
2565 }
2566
2567 #[tokio::test]
2568 async fn async_with_event_store_records_events() {
2569 let p1 = Place::<i32>::new("p1");
2570 let p2 = Place::<i32>::new("p2");
2571
2572 let t1 = Transition::builder("t1")
2573 .input(one(&p1))
2574 .output(out_place(&p2))
2575 .action(fork())
2576 .build();
2577
2578 let net = PetriNet::builder("test").transition(t1).build();
2579 let mut marking = Marking::new();
2580 marking.add(&p1, Token::at(1, 0));
2581
2582 let mut executor =
2583 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2584
2585 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2586 executor.run_async(rx).await;
2587
2588 let events = executor.event_store().events();
2589 assert!(!events.is_empty());
2590 assert!(
2592 events
2593 .iter()
2594 .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2595 );
2596 assert!(
2597 events
2598 .iter()
2599 .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2600 );
2601 assert!(
2602 events
2603 .iter()
2604 .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2605 );
2606 assert!(
2607 events
2608 .iter()
2609 .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2610 );
2611 }
2612
2613 #[tokio::test]
2614 async fn async_action_error_does_not_crash() {
2615 let p1 = Place::<i32>::new("p1");
2616 let p2 = Place::<i32>::new("p2");
2617
2618 let action =
2619 async_action(|_ctx| async move { Err(ActionError::new("intentional failure")) });
2620
2621 let t1 = Transition::builder("t1")
2622 .input(one(&p1))
2623 .output(out_place(&p2))
2624 .action(action)
2625 .build();
2626
2627 let net = PetriNet::builder("test").transition(t1).build();
2628 let mut marking = Marking::new();
2629 marking.add(&p1, Token::at(1, 0));
2630
2631 let mut executor =
2632 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2633
2634 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2635 executor.run_async(rx).await;
2636
2637 assert_eq!(executor.marking().count("p1"), 0);
2639 assert_eq!(executor.marking().count("p2"), 0);
2640
2641 let events = executor.event_store().events();
2643 assert!(
2644 events
2645 .iter()
2646 .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2647 );
2648 }
2649
2650 #[tokio::test]
2651 async fn async_delayed_timing() {
2652 use libpetri_core::timing::delayed;
2653
2654 let p1 = Place::<i32>::new("p1");
2655 let p2 = Place::<i32>::new("p2");
2656
2657 let t1 = Transition::builder("t1")
2658 .input(one(&p1))
2659 .output(out_place(&p2))
2660 .timing(delayed(100))
2661 .action(fork())
2662 .build();
2663
2664 let net = PetriNet::builder("test").transition(t1).build();
2665 let mut marking = Marking::new();
2666 marking.add(&p1, Token::at(10, 0));
2667
2668 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2669 &net,
2670 marking,
2671 ExecutorOptions {
2672 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2673 },
2674 );
2675
2676 let start = std::time::Instant::now();
2677 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2678 tokio::spawn(async move {
2680 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2681 drop(tx);
2682 });
2683 executor.run_async(rx).await;
2684
2685 assert!(
2686 start.elapsed().as_millis() >= 80,
2687 "delayed(100) should wait ~100ms"
2688 );
2689 assert_eq!(executor.marking().count("p2"), 1);
2690 assert_eq!(*executor.marking().peek(&p2).unwrap(), 10);
2691 }
2692
2693 #[tokio::test]
2694 async fn async_exact_timing() {
2695 use libpetri_core::timing::exact;
2696
2697 let p1 = Place::<i32>::new("p1");
2698 let p2 = Place::<i32>::new("p2");
2699
2700 let t1 = Transition::builder("t1")
2701 .input(one(&p1))
2702 .output(out_place(&p2))
2703 .timing(exact(100))
2704 .action(fork())
2705 .build();
2706
2707 let net = PetriNet::builder("test").transition(t1).build();
2708 let mut marking = Marking::new();
2709 marking.add(&p1, Token::at(20, 0));
2710
2711 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2712 &net,
2713 marking,
2714 ExecutorOptions {
2715 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2716 },
2717 );
2718
2719 let start = std::time::Instant::now();
2720 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2721 tokio::spawn(async move {
2722 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2723 drop(tx);
2724 });
2725 executor.run_async(rx).await;
2726
2727 assert!(
2728 start.elapsed().as_millis() >= 80,
2729 "exact(100) should wait ~100ms"
2730 );
2731 assert_eq!(executor.marking().count("p2"), 1);
2732 assert_eq!(*executor.marking().peek(&p2).unwrap(), 20);
2733 }
2734
2735 #[tokio::test]
2736 async fn async_window_timing() {
2737 use libpetri_core::timing::window;
2738
2739 let p1 = Place::<i32>::new("p1");
2740 let p2 = Place::<i32>::new("p2");
2741
2742 let t1 = Transition::builder("t1")
2743 .input(one(&p1))
2744 .output(out_place(&p2))
2745 .timing(window(50, 200))
2746 .action(fork())
2747 .build();
2748
2749 let net = PetriNet::builder("test").transition(t1).build();
2750 let mut marking = Marking::new();
2751 marking.add(&p1, Token::at(30, 0));
2752
2753 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2754 &net,
2755 marking,
2756 ExecutorOptions {
2757 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2758 },
2759 );
2760
2761 let start = std::time::Instant::now();
2762 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2763 tokio::spawn(async move {
2764 tokio::time::sleep(std::time::Duration::from_millis(400)).await;
2765 drop(tx);
2766 });
2767 executor.run_async(rx).await;
2768
2769 assert!(
2770 start.elapsed().as_millis() >= 40,
2771 "window(50,200) should wait >= ~50ms"
2772 );
2773 assert_eq!(executor.marking().count("p2"), 1);
2774 assert_eq!(*executor.marking().peek(&p2).unwrap(), 30);
2775 }
2776
2777 #[tokio::test]
2778 async fn async_deadline_enforcement() {
2779 use libpetri_core::action::sync_action;
2780 use libpetri_core::timing::window;
2781
2782 let p_slow = Place::<i32>::new("p_slow");
2783 let p_windowed = Place::<i32>::new("p_windowed");
2784 let slow_out = Place::<i32>::new("slow_out");
2785 let windowed_out = Place::<i32>::new("windowed_out");
2786
2787 let t_slow = Transition::builder("slow")
2791 .input(one(&p_slow))
2792 .output(out_place(&slow_out))
2793 .priority(10)
2794 .action(sync_action(|ctx| {
2795 let v = ctx.input::<i32>("p_slow")?;
2796 let start = std::time::Instant::now();
2797 while start.elapsed().as_millis() < 200 {
2798 std::hint::spin_loop();
2799 }
2800 ctx.output("slow_out", *v)?;
2801 Ok(())
2802 }))
2803 .build();
2804
2805 let t_windowed = Transition::builder("windowed")
2809 .input(one(&p_windowed))
2810 .output(out_place(&windowed_out))
2811 .timing(window(50, 100))
2812 .action(fork())
2813 .build();
2814
2815 let net = PetriNet::builder("test")
2816 .transitions([t_slow, t_windowed])
2817 .build();
2818
2819 let mut marking = Marking::new();
2820 marking.add(&p_slow, Token::at(1, 0));
2821 marking.add(&p_windowed, Token::at(2, 0));
2822
2823 let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
2824 &net,
2825 marking,
2826 ExecutorOptions {
2827 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2828 },
2829 );
2830
2831 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2832 tokio::spawn(async move {
2833 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2834 drop(tx);
2835 });
2836 executor.run_async(rx).await;
2837
2838 assert_eq!(executor.marking().count("slow_out"), 1);
2840 assert_eq!(
2842 executor.marking().count("windowed_out"),
2843 0,
2844 "windowed transition should have been disabled by deadline"
2845 );
2846
2847 let events = executor.event_store().events();
2848 assert!(
2849 events.iter().any(|e| matches!(e, NetEvent::TransitionTimedOut { transition_name, .. } if &**transition_name == "windowed")),
2850 "expected TransitionTimedOut event for 'windowed'"
2851 );
2852 }
2853
2854 #[tokio::test]
2855 async fn async_multiple_injections() {
2856 let p1 = Place::<i32>::new("p1");
2857 let p2 = Place::<i32>::new("p2");
2858
2859 let t1 = Transition::builder("t1")
2860 .input(one(&p1))
2861 .output(out_place(&p2))
2862 .action(fork())
2863 .build();
2864
2865 let net = PetriNet::builder("test").transition(t1).build();
2866 let marking = Marking::new();
2867
2868 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2869 &net,
2870 marking,
2871 ExecutorOptions {
2872 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2873 },
2874 );
2875
2876 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2877
2878 tokio::spawn(async move {
2879 for i in 0..5 {
2880 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2881 let token = Token::at(i, 0);
2882 tx.send(ExecutorSignal::Event(ExternalEvent {
2883 place_name: Arc::from("p1"),
2884 token: ErasedToken::from_typed(&token),
2885 }))
2886 .unwrap();
2887 }
2888 });
2890
2891 executor.run_async(rx).await;
2892
2893 assert_eq!(
2894 executor.marking().count("p2"),
2895 5,
2896 "all 5 injected tokens should arrive"
2897 );
2898 }
2899
2900 #[tokio::test]
2901 async fn async_parallel_execution() {
2902 let p1 = Place::<i32>::new("p1");
2903 let p2 = Place::<i32>::new("p2");
2904 let p3 = Place::<i32>::new("p3");
2905 let out1 = Place::<i32>::new("out1");
2906 let out2 = Place::<i32>::new("out2");
2907 let out3 = Place::<i32>::new("out3");
2908
2909 let make_transition = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
2910 Transition::builder(name)
2911 .input(one(inp))
2912 .output(out_place(outp))
2913 .action(async_action(|mut ctx| async move {
2914 let v: i32 =
2915 *ctx.input::<i32>(ctx.transition_name().replace("t", "p").as_str())?;
2916 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2917 ctx.output(&ctx.transition_name().replace("t", "out"), v)?;
2918 Ok(ctx)
2919 }))
2920 .build()
2921 };
2922
2923 let t1 = make_transition("t1", &p1, &out1);
2924 let t2 = make_transition("t2", &p2, &out2);
2925 let t3 = make_transition("t3", &p3, &out3);
2926
2927 let net = PetriNet::builder("parallel")
2928 .transitions([t1, t2, t3])
2929 .build();
2930 let mut marking = Marking::new();
2931 marking.add(&p1, Token::at(1, 0));
2932 marking.add(&p2, Token::at(2, 0));
2933 marking.add(&p3, Token::at(3, 0));
2934
2935 let mut executor =
2936 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2937
2938 let start = std::time::Instant::now();
2939 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2940 executor.run_async(rx).await;
2941 let elapsed = start.elapsed().as_millis();
2942
2943 assert_eq!(executor.marking().count("out1"), 1);
2944 assert_eq!(executor.marking().count("out2"), 1);
2945 assert_eq!(executor.marking().count("out3"), 1);
2946 assert!(
2947 elapsed < 250,
2948 "parallel execution should take < 250ms, took {elapsed}ms"
2949 );
2950 }
2951
2952 #[tokio::test]
2953 async fn async_sequential_chain_order() {
2954 use std::sync::Mutex;
2955
2956 let p1 = Place::<i32>::new("p1");
2957 let p2 = Place::<i32>::new("p2");
2958 let p3 = Place::<i32>::new("p3");
2959 let p4 = Place::<i32>::new("p4");
2960
2961 let order: Arc<Mutex<Vec<i32>>> = Arc::new(Mutex::new(Vec::new()));
2962
2963 let make_chain = |name: &str,
2964 inp: &Place<i32>,
2965 outp: &Place<i32>,
2966 id: i32,
2967 order: Arc<Mutex<Vec<i32>>>| {
2968 let inp_name: Arc<str> = Arc::from(inp.name());
2969 let outp_name: Arc<str> = Arc::from(outp.name());
2970 Transition::builder(name)
2971 .input(one(inp))
2972 .output(out_place(outp))
2973 .action(async_action(move |mut ctx| {
2974 let order = Arc::clone(&order);
2975 let inp_name = Arc::clone(&inp_name);
2976 let outp_name = Arc::clone(&outp_name);
2977 async move {
2978 let v: i32 = *ctx.input::<i32>(&inp_name)?;
2979 order.lock().unwrap().push(id);
2980 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2981 ctx.output(&outp_name, v)?;
2982 Ok(ctx)
2983 }
2984 }))
2985 .build()
2986 };
2987
2988 let t1 = make_chain("t1", &p1, &p2, 1, Arc::clone(&order));
2989 let t2 = make_chain("t2", &p2, &p3, 2, Arc::clone(&order));
2990 let t3 = make_chain("t3", &p3, &p4, 3, Arc::clone(&order));
2991
2992 let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
2993 let mut marking = Marking::new();
2994 marking.add(&p1, Token::at(1, 0));
2995
2996 let mut executor =
2997 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2998
2999 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3000 executor.run_async(rx).await;
3001
3002 assert_eq!(executor.marking().count("p4"), 1);
3003 let recorded = order.lock().unwrap().clone();
3004 assert_eq!(recorded, vec![1, 2, 3], "chain should execute in order");
3005 }
3006
3007 #[tokio::test]
3008 async fn async_fork_join() {
3009 use libpetri_core::output::and;
3010
3011 let p1 = Place::<i32>::new("p1");
3012 let p2 = Place::<i32>::new("p2");
3013 let p3 = Place::<i32>::new("p3");
3014 let p4 = Place::<i32>::new("p4");
3015
3016 let t_fork = Transition::builder("fork")
3018 .input(one(&p1))
3019 .output(and(vec![out_place(&p2), out_place(&p3)]))
3020 .action(libpetri_core::action::sync_action(|ctx| {
3021 let v = ctx.input::<i32>("p1")?;
3022 ctx.output("p2", *v)?;
3023 ctx.output("p3", *v)?;
3024 Ok(())
3025 }))
3026 .build();
3027
3028 let t_join = Transition::builder("join")
3030 .input(one(&p2))
3031 .input(one(&p3))
3032 .output(out_place(&p4))
3033 .action(libpetri_core::action::sync_action(|ctx| {
3034 let a = ctx.input::<i32>("p2")?;
3035 let b = ctx.input::<i32>("p3")?;
3036 ctx.output("p4", *a + *b)?;
3037 Ok(())
3038 }))
3039 .build();
3040
3041 let net = PetriNet::builder("fork_join")
3042 .transitions([t_fork, t_join])
3043 .build();
3044
3045 let mut marking = Marking::new();
3046 marking.add(&p1, Token::at(5, 0));
3047
3048 let mut executor =
3049 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3050
3051 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3052 executor.run_async(rx).await;
3053
3054 assert_eq!(executor.marking().count("p2"), 0);
3055 assert_eq!(executor.marking().count("p3"), 0);
3056 assert_eq!(executor.marking().count("p4"), 1);
3057 assert_eq!(*executor.marking().peek(&p4).unwrap(), 10);
3058 }
3059
3060 #[tokio::test]
3061 async fn async_xor_output_branching() {
3062 use libpetri_core::output::xor;
3063
3064 let p = Place::<i32>::new("p");
3065 let left = Place::<i32>::new("left");
3066 let right = Place::<i32>::new("right");
3067
3068 let t = Transition::builder("xor_t")
3069 .input(one(&p))
3070 .output(xor(vec![out_place(&left), out_place(&right)]))
3071 .action(libpetri_core::action::sync_action(|ctx| {
3072 let v = ctx.input::<i32>("p")?;
3073 if *v > 0 {
3074 ctx.output("left", *v)?;
3075 } else {
3076 ctx.output("right", *v)?;
3077 }
3078 Ok(())
3079 }))
3080 .build();
3081
3082 let net = PetriNet::builder("xor_test").transition(t).build();
3083
3084 let mut marking = Marking::new();
3086 marking.add(&p, Token::at(42, 0));
3087
3088 let mut executor =
3089 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3090 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3091 executor.run_async(rx).await;
3092
3093 assert_eq!(executor.marking().count("left"), 1);
3094 assert_eq!(executor.marking().count("right"), 0);
3095 assert_eq!(*executor.marking().peek(&left).unwrap(), 42);
3096 }
3097
3098 #[tokio::test]
3099 async fn async_loop_with_guard() {
3100 use libpetri_core::input::one_guarded;
3101
3102 let counter = Place::<i32>::new("counter");
3103 let done = Place::<i32>::new("done");
3104
3105 let t_loop = Transition::builder("loop")
3107 .input(one_guarded(&counter, |v: &i32| *v < 3))
3108 .output(out_place(&counter))
3109 .action(libpetri_core::action::sync_action(|ctx| {
3110 let v = ctx.input::<i32>("counter")?;
3111 ctx.output("counter", *v + 1)?;
3112 Ok(())
3113 }))
3114 .build();
3115
3116 let t_exit = Transition::builder("exit")
3118 .input(one_guarded(&counter, |v: &i32| *v >= 3))
3119 .output(out_place(&done))
3120 .action(fork())
3121 .build();
3122
3123 let net = PetriNet::builder("loop_net")
3124 .transitions([t_loop, t_exit])
3125 .build();
3126
3127 let mut marking = Marking::new();
3128 marking.add(&counter, Token::at(0, 0));
3129
3130 let mut executor =
3131 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3132
3133 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3134 executor.run_async(rx).await;
3135
3136 assert_eq!(executor.marking().count("done"), 1);
3137 assert_eq!(*executor.marking().peek(&done).unwrap(), 3);
3138 }
3139
3140 #[tokio::test]
3141 async fn async_delayed_fires_without_injection() {
3142 use libpetri_core::timing::delayed;
3143
3144 let p1 = Place::<i32>::new("p1");
3145 let p2 = Place::<i32>::new("p2");
3146
3147 let t1 = Transition::builder("t1")
3148 .input(one(&p1))
3149 .output(out_place(&p2))
3150 .timing(delayed(100))
3151 .action(fork())
3152 .build();
3153
3154 let net = PetriNet::builder("test").transition(t1).build();
3155 let mut marking = Marking::new();
3156 marking.add(&p1, Token::at(7, 0));
3157
3158 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3159 &net,
3160 marking,
3161 ExecutorOptions {
3162 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3163 },
3164 );
3165
3166 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3168 tokio::spawn(async move {
3169 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3170 drop(tx);
3171 });
3172 executor.run_async(rx).await;
3173
3174 assert_eq!(executor.marking().count("p2"), 1);
3175 assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3176 }
3177
3178 #[tokio::test]
3179 #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3180 async fn async_timeout_produces_timeout_token() {
3181 use libpetri_core::output::{timeout_place, xor};
3182
3183 let p1 = Place::<i32>::new("p1");
3184 let success = Place::<i32>::new("success");
3185 let timeout_out = Place::<i32>::new("timeout_out");
3186
3187 let t1 = Transition::builder("t1")
3188 .input(one(&p1))
3189 .output(xor(vec![
3190 out_place(&success),
3191 timeout_place(50, &timeout_out),
3192 ]))
3193 .action(async_action(|mut ctx| async move {
3194 let v: i32 = *ctx.input::<i32>("p1")?;
3195 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3196 ctx.output("success", v)?;
3197 Ok(ctx)
3198 }))
3199 .build();
3200
3201 let net = PetriNet::builder("test").transition(t1).build();
3202 let mut marking = Marking::new();
3203 marking.add(&p1, Token::at(1, 0));
3204
3205 let mut executor =
3206 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3207
3208 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3209 executor.run_async(rx).await;
3210
3211 assert_eq!(executor.marking().count("timeout_out"), 1);
3212 assert_eq!(executor.marking().count("success"), 0);
3213 }
3214
3215 #[tokio::test]
3216 #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3217 async fn async_timeout_normal_when_fast() {
3218 use libpetri_core::output::{timeout_place, xor};
3219
3220 let p1 = Place::<i32>::new("p1");
3221 let success = Place::<i32>::new("success");
3222 let timeout_out = Place::<i32>::new("timeout_out");
3223
3224 let t1 = Transition::builder("t1")
3225 .input(one(&p1))
3226 .output(xor(vec![
3227 out_place(&success),
3228 timeout_place(500, &timeout_out),
3229 ]))
3230 .action(async_action(|mut ctx| async move {
3231 let v: i32 = *ctx.input::<i32>("p1")?;
3232 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3233 ctx.output("success", v)?;
3234 Ok(ctx)
3235 }))
3236 .build();
3237
3238 let net = PetriNet::builder("test").transition(t1).build();
3239 let mut marking = Marking::new();
3240 marking.add(&p1, Token::at(1, 0));
3241
3242 let mut executor =
3243 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3244
3245 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3246 executor.run_async(rx).await;
3247
3248 assert_eq!(executor.marking().count("success"), 1);
3249 assert_eq!(executor.marking().count("timeout_out"), 0);
3250 }
3251
3252 #[tokio::test]
3253 async fn async_event_store_records_token_added_for_injection() {
3254 let p1 = Place::<i32>::new("p1");
3255 let p2 = Place::<i32>::new("p2");
3256
3257 let t1 = Transition::builder("t1")
3258 .input(one(&p1))
3259 .output(out_place(&p2))
3260 .action(fork())
3261 .build();
3262
3263 let net = PetriNet::builder("test").transition(t1).build();
3264 let marking = Marking::new();
3265
3266 let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
3267 &net,
3268 marking,
3269 ExecutorOptions {
3270 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3271 },
3272 );
3273
3274 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3275
3276 tokio::spawn(async move {
3277 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3278 let token = Token::at(99, 0);
3279 tx.send(ExecutorSignal::Event(ExternalEvent {
3280 place_name: Arc::from("p1"),
3281 token: ErasedToken::from_typed(&token),
3282 }))
3283 .unwrap();
3284 });
3285
3286 executor.run_async(rx).await;
3287
3288 let events = executor.event_store().events();
3289 assert!(
3291 events.iter().any(
3292 |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p1")
3293 ),
3294 "expected TokenAdded event for injected token into p1"
3295 );
3296 assert!(
3298 events.iter().any(
3299 |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p2")
3300 ),
3301 "expected TokenAdded event for output token into p2"
3302 );
3303 }
3304
3305 #[tokio::test]
3306 async fn async_inhibitor_blocks_in_async() {
3307 let p1 = Place::<()>::new("p1");
3308 let p2 = Place::<()>::new("p2");
3309 let p_inh = Place::<()>::new("inh");
3310
3311 let t = Transition::builder("t1")
3312 .input(one(&p1))
3313 .output(out_place(&p2))
3314 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
3315 .action(fork())
3316 .build();
3317
3318 let net = PetriNet::builder("inhibitor").transition(t).build();
3319
3320 let mut marking = Marking::new();
3321 marking.add(&p1, Token::at((), 0));
3322 marking.add(&p_inh, Token::at((), 0));
3323
3324 let mut executor =
3325 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3326
3327 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3328 executor.run_async(rx).await;
3329
3330 assert_eq!(executor.marking().count("p1"), 1);
3332 assert_eq!(executor.marking().count("p2"), 0);
3333 }
3334
3335 #[tokio::test]
3338 async fn async_drain_terminates_at_quiescence() {
3339 let p1 = Place::<i32>::new("p1");
3340 let p2 = Place::<i32>::new("p2");
3341
3342 let t1 = Transition::builder("t1")
3343 .input(one(&p1))
3344 .output(out_place(&p2))
3345 .action(fork())
3346 .build();
3347
3348 let net = PetriNet::builder("test").transition(t1).build();
3349 let marking = Marking::new();
3350
3351 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3352 &net,
3353 marking,
3354 ExecutorOptions {
3355 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3356 },
3357 );
3358
3359 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3360
3361 tokio::spawn(async move {
3362 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3363 tx.send(ExecutorSignal::Event(ExternalEvent {
3365 place_name: Arc::from("p1"),
3366 token: ErasedToken::from_typed(&Token::at(42, 0)),
3367 }))
3368 .unwrap();
3369 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3370 tx.send(ExecutorSignal::Drain).unwrap();
3371 });
3372
3373 executor.run_async(rx).await;
3374
3375 assert_eq!(executor.marking().count("p2"), 1);
3377 assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
3378 }
3379
3380 #[tokio::test]
3381 async fn async_drain_rejects_post_drain_events() {
3382 let p1 = Place::<i32>::new("p1");
3383 let p2 = Place::<i32>::new("p2");
3384
3385 let t1 = Transition::builder("t1")
3386 .input(one(&p1))
3387 .output(out_place(&p2))
3388 .action(fork())
3389 .build();
3390
3391 let net = PetriNet::builder("test").transition(t1).build();
3392 let marking = Marking::new();
3393
3394 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3395 &net,
3396 marking,
3397 ExecutorOptions {
3398 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3399 },
3400 );
3401
3402 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3403
3404 tokio::spawn(async move {
3405 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3406 tx.send(ExecutorSignal::Drain).unwrap();
3408 tx.send(ExecutorSignal::Event(ExternalEvent {
3409 place_name: Arc::from("p1"),
3410 token: ErasedToken::from_typed(&Token::at(99, 0)),
3411 }))
3412 .unwrap();
3413 });
3414
3415 executor.run_async(rx).await;
3416
3417 assert_eq!(executor.marking().count("p2"), 0);
3419 }
3420
3421 #[tokio::test]
3422 async fn async_close_discards_queued_events() {
3423 let p1 = Place::<i32>::new("p1");
3424 let p2 = Place::<i32>::new("p2");
3425
3426 let t1 = Transition::builder("t1")
3427 .input(one(&p1))
3428 .output(out_place(&p2))
3429 .action(fork())
3430 .build();
3431
3432 let net = PetriNet::builder("test").transition(t1).build();
3433 let marking = Marking::new();
3434
3435 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3436 &net,
3437 marking,
3438 ExecutorOptions {
3439 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3440 },
3441 );
3442
3443 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3444
3445 tx.send(ExecutorSignal::Event(ExternalEvent {
3447 place_name: Arc::from("p1"),
3448 token: ErasedToken::from_typed(&Token::at(1, 0)),
3449 }))
3450 .unwrap();
3451 tx.send(ExecutorSignal::Close).unwrap();
3452 tx.send(ExecutorSignal::Event(ExternalEvent {
3453 place_name: Arc::from("p1"),
3454 token: ErasedToken::from_typed(&Token::at(2, 0)),
3455 }))
3456 .unwrap();
3457 drop(tx);
3458
3459 executor.run_async(rx).await;
3460
3461 assert!(executor.marking().count("p2") <= 1);
3465 }
3466
3467 #[tokio::test]
3468 async fn async_close_after_drain_escalates() {
3469 let p1 = Place::<i32>::new("p1");
3470 let p2 = Place::<i32>::new("p2");
3471
3472 let t1 = Transition::builder("t1")
3473 .input(one(&p1))
3474 .output(out_place(&p2))
3475 .action(fork())
3476 .build();
3477
3478 let net = PetriNet::builder("test").transition(t1).build();
3479 let marking = Marking::new();
3480
3481 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3482 &net,
3483 marking,
3484 ExecutorOptions {
3485 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3486 },
3487 );
3488
3489 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3490
3491 tokio::spawn(async move {
3492 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3493 tx.send(ExecutorSignal::Drain).unwrap();
3495 tx.send(ExecutorSignal::Close).unwrap();
3496 });
3497
3498 executor.run_async(rx).await;
3500 }
3502
3503 #[tokio::test]
3504 async fn async_handle_raii_drain_on_drop() {
3505 use crate::executor_handle::ExecutorHandle;
3506
3507 let p1 = Place::<i32>::new("p1");
3508 let p2 = Place::<i32>::new("p2");
3509
3510 let t1 = Transition::builder("t1")
3511 .input(one(&p1))
3512 .output(out_place(&p2))
3513 .action(fork())
3514 .build();
3515
3516 let net = PetriNet::builder("test").transition(t1).build();
3517 let marking = Marking::new();
3518
3519 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3520 &net,
3521 marking,
3522 ExecutorOptions {
3523 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3524 },
3525 );
3526
3527 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3528
3529 tokio::spawn(async move {
3530 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3531 let mut handle = ExecutorHandle::new(tx);
3532 handle.inject(Arc::from("p1"), ErasedToken::from_typed(&Token::at(7, 0)));
3533 });
3535
3536 executor.run_async(rx).await;
3537
3538 assert_eq!(executor.marking().count("p2"), 1);
3539 assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3540 }
3541}