1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::output::Out;
8use libpetri_core::petri_net::PetriNet;
9use libpetri_core::token::ErasedToken;
10
11use libpetri_event::event_store::EventStore;
12use libpetri_event::net_event::NetEvent;
13
14use crate::bitmap;
15use crate::compiled_net::CompiledNet;
16use crate::marking::Marking;
17
18const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
21pub struct BitmapNetExecutor<E: EventStore> {
26 compiled: CompiledNet,
27 marking: Marking,
28 event_store: E,
29 #[allow(dead_code)]
30 environment_places: HashSet<Arc<str>>,
31 has_environment_places: bool,
32
33 marked_places: Vec<u64>,
35 dirty_set: Vec<u64>,
36 marking_snap_buffer: Vec<u64>,
37 dirty_snap_buffer: Vec<u64>,
38 firing_snap_buffer: Vec<u64>,
39
40 enabled_at_ms: Vec<f64>,
42 enabled_flags: Vec<bool>,
43 has_deadline_flags: Vec<bool>,
44 enabled_transition_count: usize,
45
46 all_immediate: bool,
48 all_same_priority: bool,
49 has_any_deadlines: bool,
50
51 pending_reset_places: HashSet<Arc<str>>,
53 transition_input_place_names: Vec<HashSet<Arc<str>>>,
54
55 start_time: Instant,
56}
57
58#[derive(Default)]
60pub struct ExecutorOptions {
61 pub environment_places: HashSet<Arc<str>>,
62}
63
64impl<E: EventStore> BitmapNetExecutor<E> {
65 pub fn new(net: &PetriNet, initial_tokens: Marking, options: ExecutorOptions) -> Self {
67 let compiled = CompiledNet::compile(net);
68 let word_count = compiled.word_count;
69 let tc = compiled.transition_count;
70 let dirty_word_count = bitmap::word_count(tc);
71
72 let mut has_any_deadlines = false;
73 let mut all_immediate = true;
74 let mut all_same_priority = true;
75 let first_priority = if tc > 0 {
76 compiled.transition(0).priority()
77 } else {
78 0
79 };
80 let mut has_deadline_flags = vec![false; tc];
81
82 for (tid, flag) in has_deadline_flags.iter_mut().enumerate() {
83 let t = compiled.transition(tid);
84 if t.timing().has_deadline() {
85 *flag = true;
86 has_any_deadlines = true;
87 }
88 if *t.timing() != libpetri_core::timing::Timing::Immediate {
89 all_immediate = false;
90 }
91 if t.priority() != first_priority {
92 all_same_priority = false;
93 }
94 }
95
96 let mut transition_input_place_names = Vec::with_capacity(tc);
98 for tid in 0..tc {
99 let t = compiled.transition(tid);
100 let names: HashSet<Arc<str>> = t
101 .input_specs()
102 .iter()
103 .map(|s| Arc::clone(s.place().name_arc()))
104 .collect();
105 transition_input_place_names.push(names);
106 }
107
108 Self {
109 compiled,
110 marking: initial_tokens,
111 event_store: E::default(),
112 has_environment_places: !options.environment_places.is_empty(),
113 environment_places: options.environment_places,
114 marked_places: vec![0u64; word_count],
115 dirty_set: vec![0u64; dirty_word_count],
116 marking_snap_buffer: vec![0u64; word_count],
117 dirty_snap_buffer: vec![0u64; dirty_word_count],
118 firing_snap_buffer: vec![0u64; word_count],
119 enabled_at_ms: vec![f64::NEG_INFINITY; tc],
120 enabled_flags: vec![false; tc],
121 has_deadline_flags,
122 enabled_transition_count: 0,
123 all_immediate,
124 all_same_priority,
125 has_any_deadlines,
126 pending_reset_places: HashSet::new(),
127 transition_input_place_names,
128 start_time: Instant::now(),
129 }
130 }
131
132 pub fn run_sync(&mut self) -> &Marking {
137 self.initialize_marked_bitmap();
138 self.mark_all_dirty();
139
140 if E::ENABLED {
141 let now = now_millis();
142 self.event_store.append(NetEvent::ExecutionStarted {
143 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
144 timestamp: now,
145 });
146 }
147
148 loop {
149 self.update_dirty_transitions();
150
151 let cycle_now = self.elapsed_ms();
152
153 if self.has_any_deadlines {
154 self.enforce_deadlines(cycle_now);
155 }
156
157 if self.should_terminate() {
158 break;
159 }
160
161 if self.all_immediate && self.all_same_priority {
162 self.fire_ready_immediate_sync();
163 } else {
164 self.fire_ready_general_sync(cycle_now);
165 }
166
167 if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
169 break;
170 }
171 }
172
173 if E::ENABLED {
174 let now = now_millis();
175 self.event_store.append(NetEvent::ExecutionCompleted {
176 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
177 timestamp: now,
178 });
179 }
180
181 &self.marking
182 }
183
184 pub fn marking(&self) -> &Marking {
186 &self.marking
187 }
188
189 pub fn event_store(&self) -> &E {
191 &self.event_store
192 }
193
194 pub fn is_quiescent(&self) -> bool {
196 self.enabled_transition_count == 0
197 }
198
199 fn initialize_marked_bitmap(&mut self) {
202 for pid in 0..self.compiled.place_count {
203 let place = self.compiled.place(pid);
204 if self.marking.has_tokens(place.name()) {
205 bitmap::set_bit(&mut self.marked_places, pid);
206 }
207 }
208 }
209
210 fn mark_all_dirty(&mut self) {
211 let tc = self.compiled.transition_count;
212 let dirty_words = self.dirty_set.len();
213 for w in 0..dirty_words.saturating_sub(1) {
214 self.dirty_set[w] = u64::MAX;
215 }
216 if dirty_words > 0 {
217 let last_word_bits = tc & bitmap::WORD_MASK;
218 self.dirty_set[dirty_words - 1] = if last_word_bits == 0 {
219 u64::MAX
220 } else {
221 (1u64 << last_word_bits) - 1
222 };
223 }
224 }
225
226 fn should_terminate(&self) -> bool {
227 if self.has_environment_places {
228 return false;
229 }
230 self.enabled_transition_count == 0
231 }
232
233 fn update_dirty_transitions(&mut self) {
236 let now_ms = self.elapsed_ms();
237
238 self.marking_snap_buffer
240 .copy_from_slice(&self.marked_places);
241
242 let dirty_words = self.dirty_set.len();
244 for w in 0..dirty_words {
245 self.dirty_snap_buffer[w] = self.dirty_set[w];
246 self.dirty_set[w] = 0;
247 }
248
249 let tc = self.compiled.transition_count;
251 let mut dirty_tids = Vec::new();
252 bitmap::for_each_set_bit(&self.dirty_snap_buffer, |tid| {
253 if tid < tc {
254 dirty_tids.push(tid);
255 }
256 });
257
258 let marking_snap = self.marking_snap_buffer.clone();
259 for tid in dirty_tids {
260 let was_enabled = self.enabled_flags[tid];
261 let can_now = self.can_enable(tid, &marking_snap);
262
263 if can_now && !was_enabled {
264 self.enabled_flags[tid] = true;
265 self.enabled_transition_count += 1;
266 self.enabled_at_ms[tid] = now_ms;
267
268 if E::ENABLED {
269 self.event_store.append(NetEvent::TransitionEnabled {
270 transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
271 timestamp: now_millis(),
272 });
273 }
274 } else if !can_now && was_enabled {
275 self.enabled_flags[tid] = false;
276 self.enabled_transition_count -= 1;
277 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
278 } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
279 self.enabled_at_ms[tid] = now_ms;
280 if E::ENABLED {
281 self.event_store.append(NetEvent::TransitionClockRestarted {
282 transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
283 timestamp: now_millis(),
284 });
285 }
286 }
287 }
288
289 self.pending_reset_places.clear();
290 }
291
292 fn enforce_deadlines(&mut self, now_ms: f64) {
293 for tid in 0..self.compiled.transition_count {
294 if !self.has_deadline_flags[tid] || !self.enabled_flags[tid] {
295 continue;
296 }
297 let t = self.compiled.transition(tid);
298 let elapsed = now_ms - self.enabled_at_ms[tid];
299 let latest_ms = t.timing().latest() as f64;
300 if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
301 self.enabled_flags[tid] = false;
302 self.enabled_transition_count -= 1;
303 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
304
305 if E::ENABLED {
306 self.event_store.append(NetEvent::TransitionTimedOut {
307 transition_name: Arc::clone(t.name_arc()),
308 timestamp: now_millis(),
309 });
310 }
311 }
312 }
313 }
314
315 fn can_enable(&self, tid: usize, marking_snap: &[u64]) -> bool {
316 if !self.compiled.can_enable_bitmap(tid, marking_snap) {
317 return false;
318 }
319
320 if let Some(card_check) = self.compiled.cardinality_check(tid) {
322 for i in 0..card_check.place_ids.len() {
323 let pid = card_check.place_ids[i];
324 let required = card_check.required_counts[i];
325 let place = self.compiled.place(pid);
326 if self.marking.count(place.name()) < required {
327 return false;
328 }
329 }
330 }
331
332 if self.compiled.has_guards(tid) {
334 let t = self.compiled.transition(tid);
335 for spec in t.input_specs() {
336 if let Some(guard) = spec.guard() {
337 let required = match spec {
338 In::One { .. } => 1,
339 In::Exactly { count, .. } => *count,
340 In::AtLeast { minimum, .. } => *minimum,
341 In::All { .. } => 1,
342 };
343 if self.marking.count_matching(spec.place_name(), &**guard) < required {
344 return false;
345 }
346 }
347 }
348 }
349
350 true
351 }
352
353 fn has_input_from_reset_place(&self, tid: usize) -> bool {
354 if self.pending_reset_places.is_empty() {
355 return false;
356 }
357 let input_names = &self.transition_input_place_names[tid];
358 for name in &self.pending_reset_places {
359 if input_names.contains(name) {
360 return true;
361 }
362 }
363 false
364 }
365
366 fn fire_ready_immediate_sync(&mut self) {
369 for tid in 0..self.compiled.transition_count {
370 if !self.enabled_flags[tid] {
371 continue;
372 }
373 if self.can_enable(tid, &self.marked_places.clone()) {
374 self.fire_transition_sync(tid);
375 } else {
376 self.enabled_flags[tid] = false;
377 self.enabled_transition_count -= 1;
378 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
379 }
380 }
381 }
382
383 fn fire_ready_general_sync(&mut self, now_ms: f64) {
384 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
386 for tid in 0..self.compiled.transition_count {
387 if !self.enabled_flags[tid] {
388 continue;
389 }
390 let t = self.compiled.transition(tid);
391 let enabled_ms = self.enabled_at_ms[tid];
392 let elapsed = now_ms - enabled_ms;
393 let earliest_ms = t.timing().earliest() as f64;
394 if earliest_ms <= elapsed {
395 ready.push((tid, t.priority(), enabled_ms));
396 }
397 }
398 if ready.is_empty() {
399 return;
400 }
401
402 ready.sort_by(|a, b| {
404 b.1.cmp(&a.1)
405 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
406 });
407
408 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
410
411 for (tid, _, _) in ready {
412 if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
413 self.fire_transition_sync(tid);
414 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
415 } else {
416 self.enabled_flags[tid] = false;
417 self.enabled_transition_count -= 1;
418 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
419 }
420 }
421 }
422
423 fn fire_transition_sync(&mut self, tid: usize) {
424 let t = self.compiled.transition(tid);
426 let transition_name = Arc::clone(t.name_arc());
427 let input_specs: Vec<In> = t.input_specs().to_vec();
428 let read_arcs: Vec<_> = t.reads().to_vec();
429 let reset_arcs: Vec<_> = t.resets().to_vec();
430 let output_place_names: HashSet<Arc<str>> = t
431 .output_places()
432 .iter()
433 .map(|p| Arc::clone(p.name_arc()))
434 .collect();
435 let action = Arc::clone(t.action());
436 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
440 for in_spec in &input_specs {
441 let place_name = in_spec.place_name();
442 let to_consume = match in_spec {
443 In::One { .. } => 1,
444 In::Exactly { count, .. } => *count,
445 In::All { guard, .. } | In::AtLeast { guard, .. } => {
446 if guard.is_some() {
447 self.marking
448 .count_matching(place_name, &**guard.as_ref().unwrap())
449 } else {
450 self.marking.count(place_name)
451 }
452 }
453 };
454
455 let place_name_arc = Arc::clone(in_spec.place().name_arc());
456 for _ in 0..to_consume {
457 let token = if let Some(guard) = in_spec.guard() {
458 self.marking.remove_matching(place_name, &**guard)
459 } else {
460 self.marking.remove_first(place_name)
461 };
462 if let Some(token) = token {
463 if E::ENABLED {
464 self.event_store.append(NetEvent::TokenRemoved {
465 place_name: Arc::clone(&place_name_arc),
466 timestamp: now_millis(),
467 });
468 }
469 inputs
470 .entry(Arc::clone(&place_name_arc))
471 .or_default()
472 .push(token);
473 }
474 }
475 }
476
477 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
479 for arc in &read_arcs {
480 if let Some(queue) = self.marking.queue(arc.place.name())
481 && let Some(token) = queue.front()
482 {
483 read_tokens
484 .entry(Arc::clone(arc.place.name_arc()))
485 .or_default()
486 .push(token.clone());
487 }
488 }
489
490 for arc in &reset_arcs {
492 let removed = self.marking.remove_all(arc.place.name());
493 self.pending_reset_places
494 .insert(Arc::clone(arc.place.name_arc()));
495 if E::ENABLED {
496 for _ in &removed {
497 self.event_store.append(NetEvent::TokenRemoved {
498 place_name: Arc::clone(arc.place.name_arc()),
499 timestamp: now_millis(),
500 });
501 }
502 }
503 }
504
505 self.update_bitmap_after_consumption(tid);
507
508 if E::ENABLED {
509 self.event_store.append(NetEvent::TransitionStarted {
510 transition_name: Arc::clone(&transition_name),
511 timestamp: now_millis(),
512 });
513 }
514
515 let mut ctx = TransitionContext::new(
517 Arc::clone(&transition_name),
518 inputs,
519 read_tokens,
520 output_place_names,
521 None,
522 );
523
524 let result = action.run_sync(&mut ctx);
525
526 match result {
527 Ok(()) => {
528 let outputs = ctx.take_outputs();
530 self.process_outputs(tid, &transition_name, outputs);
531
532 if E::ENABLED {
533 self.event_store.append(NetEvent::TransitionCompleted {
534 transition_name: Arc::clone(&transition_name),
535 timestamp: now_millis(),
536 });
537 }
538 }
539 Err(err) => {
540 if E::ENABLED {
541 self.event_store.append(NetEvent::TransitionFailed {
542 transition_name: Arc::clone(&transition_name),
543 error: err.message,
544 timestamp: now_millis(),
545 });
546 }
547 }
548 }
549
550 self.enabled_flags[tid] = false;
552 self.enabled_transition_count -= 1;
553 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
554
555 self.mark_transition_dirty(tid);
557 }
558
559 fn process_outputs(
560 &mut self,
561 _tid: usize,
562 _transition_name: &Arc<str>,
563 outputs: Vec<OutputEntry>,
564 ) {
565 for entry in outputs {
566 self.marking.add_erased(&entry.place_name, entry.token);
567
568 if let Some(pid) = self.compiled.place_id(&entry.place_name) {
569 bitmap::set_bit(&mut self.marked_places, pid);
570 self.mark_dirty(pid);
571 }
572
573 if E::ENABLED {
574 self.event_store.append(NetEvent::TokenAdded {
575 place_name: Arc::clone(&entry.place_name),
576 timestamp: now_millis(),
577 });
578 }
579 }
580 }
581
582 fn update_bitmap_after_consumption(&mut self, tid: usize) {
583 let consumption_pids: Vec<usize> = self.compiled.consumption_place_ids(tid).to_vec();
584 for pid in consumption_pids {
585 let place = self.compiled.place(pid);
586 if !self.marking.has_tokens(place.name()) {
587 bitmap::clear_bit(&mut self.marked_places, pid);
588 }
589 self.mark_dirty(pid);
590 }
591 }
592
593 fn has_dirty_bits(&self) -> bool {
596 !bitmap::is_empty(&self.dirty_set)
597 }
598
599 fn mark_dirty(&mut self, pid: usize) {
600 let tids: Vec<usize> = self.compiled.affected_transitions(pid).to_vec();
601 for tid in tids {
602 self.mark_transition_dirty(tid);
603 }
604 }
605
606 fn mark_transition_dirty(&mut self, tid: usize) {
607 bitmap::set_bit(&mut self.dirty_set, tid);
608 }
609
610 fn elapsed_ms(&self) -> f64 {
611 self.start_time.elapsed().as_secs_f64() * 1000.0
612 }
613}
614
615#[cfg(feature = "tokio")]
616use crate::environment::ExecutorSignal;
617
618#[cfg(feature = "tokio")]
620struct ActionCompletion {
621 transition_name: Arc<str>,
622 result: Result<Vec<OutputEntry>, String>,
623}
624
625#[cfg(feature = "tokio")]
626impl<E: EventStore> BitmapNetExecutor<E> {
627 pub async fn run_async(
641 &mut self,
642 mut signal_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutorSignal>,
643 ) -> &Marking {
644 let (completion_tx, mut completion_rx) =
645 tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
646
647 self.initialize_marked_bitmap();
648 self.mark_all_dirty();
649
650 let mut in_flight_count: usize = 0;
651 let mut signal_channel_open = true;
652 let mut draining = false;
653 let mut closed = false;
654
655 if E::ENABLED {
656 let now = now_millis();
657 self.event_store.append(NetEvent::ExecutionStarted {
658 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
659 timestamp: now,
660 });
661 }
662
663 loop {
664 while let Ok(completion) = completion_rx.try_recv() {
666 in_flight_count -= 1;
667 match completion.result {
668 Ok(outputs) => {
669 self.process_outputs(0, &completion.transition_name, outputs);
670 if E::ENABLED {
671 self.event_store.append(NetEvent::TransitionCompleted {
672 transition_name: Arc::clone(&completion.transition_name),
673 timestamp: now_millis(),
674 });
675 }
676 }
677 Err(err) => {
678 if E::ENABLED {
679 self.event_store.append(NetEvent::TransitionFailed {
680 transition_name: Arc::clone(&completion.transition_name),
681 error: err,
682 timestamp: now_millis(),
683 });
684 }
685 }
686 }
687 }
688
689 while let Ok(signal) = signal_rx.try_recv() {
691 match signal {
692 ExecutorSignal::Event(event) if !draining => {
693 self.marking.add_erased(&event.place_name, event.token);
694 if let Some(pid) = self.compiled.place_id(&event.place_name) {
695 bitmap::set_bit(&mut self.marked_places, pid);
696 self.mark_dirty(pid);
697 }
698 if E::ENABLED {
699 self.event_store.append(NetEvent::TokenAdded {
700 place_name: Arc::clone(&event.place_name),
701 timestamp: now_millis(),
702 });
703 }
704 }
705 ExecutorSignal::Event(_) => {
706 }
708 ExecutorSignal::Drain => {
709 draining = true;
710 }
711 ExecutorSignal::Close => {
712 closed = true;
713 draining = true;
714 while signal_rx.try_recv().is_ok() {}
719 }
720 }
721 }
722
723 self.update_dirty_transitions();
725
726 let cycle_now = self.elapsed_ms();
728 if self.has_any_deadlines {
729 self.enforce_deadlines(cycle_now);
730 }
731
732 if closed && in_flight_count == 0 {
734 break; }
736 if draining
737 && self.enabled_transition_count == 0
738 && in_flight_count == 0
739 {
740 break; }
742 if self.enabled_transition_count == 0
743 && in_flight_count == 0
744 && (!self.has_environment_places || !signal_channel_open)
745 {
746 break; }
748
749 let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
751
752 if fired || self.has_dirty_bits() {
754 tokio::task::yield_now().await;
756 continue;
757 }
758
759 if in_flight_count == 0 && !self.has_environment_places {
761 break;
762 }
763 if in_flight_count == 0 && (draining || !signal_channel_open) {
764 break;
765 }
766
767 let timer_ms = self.millis_until_next_timed_transition();
768
769 tokio::select! {
770 Some(completion) = completion_rx.recv() => {
771 in_flight_count -= 1;
772 match completion.result {
773 Ok(outputs) => {
774 self.process_outputs(0, &completion.transition_name, outputs);
775 if E::ENABLED {
776 self.event_store.append(NetEvent::TransitionCompleted {
777 transition_name: Arc::clone(&completion.transition_name),
778 timestamp: now_millis(),
779 });
780 }
781 }
782 Err(err) => {
783 if E::ENABLED {
784 self.event_store.append(NetEvent::TransitionFailed {
785 transition_name: Arc::clone(&completion.transition_name),
786 error: err,
787 timestamp: now_millis(),
788 });
789 }
790 }
791 }
792 }
793 result = signal_rx.recv(), if signal_channel_open && !closed => {
794 match result {
795 Some(ExecutorSignal::Event(event)) if !draining => {
796 self.marking.add_erased(&event.place_name, event.token);
797 if let Some(pid) = self.compiled.place_id(&event.place_name) {
798 bitmap::set_bit(&mut self.marked_places, pid);
799 self.mark_dirty(pid);
800 }
801 if E::ENABLED {
802 self.event_store.append(NetEvent::TokenAdded {
803 place_name: Arc::clone(&event.place_name),
804 timestamp: now_millis(),
805 });
806 }
807 }
808 Some(ExecutorSignal::Event(_)) => {
809 }
811 Some(ExecutorSignal::Drain) => {
812 draining = true;
813 }
814 Some(ExecutorSignal::Close) => {
815 closed = true;
816 draining = true;
817 while signal_rx.try_recv().is_ok() {}
818 }
819 None => {
820 signal_channel_open = false;
821 }
822 }
823 }
824 _ = tokio::time::sleep(std::time::Duration::from_millis(
825 if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
826 )) => {
827 }
829 }
830 }
831
832 if E::ENABLED {
833 let now = now_millis();
834 self.event_store.append(NetEvent::ExecutionCompleted {
835 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
836 timestamp: now,
837 });
838 }
839
840 &self.marking
841 }
842
843 fn fire_ready_async(
846 &mut self,
847 now_ms: f64,
848 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
849 in_flight_count: &mut usize,
850 ) -> bool {
851 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
852 for tid in 0..self.compiled.transition_count {
853 if !self.enabled_flags[tid] {
854 continue;
855 }
856 let t = self.compiled.transition(tid);
857 let enabled_ms = self.enabled_at_ms[tid];
858 let elapsed = now_ms - enabled_ms;
859 let earliest_ms = t.timing().earliest() as f64;
860 if earliest_ms <= elapsed {
861 ready.push((tid, t.priority(), enabled_ms));
862 }
863 }
864 if ready.is_empty() {
865 return false;
866 }
867
868 ready.sort_by(|a, b| {
869 b.1.cmp(&a.1)
870 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
871 });
872
873 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
874
875 let mut fired_any = false;
876 for (tid, _, _) in ready {
877 if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
878 self.fire_transition_async(tid, completion_tx, in_flight_count);
879 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
880 fired_any = true;
881 } else {
882 self.enabled_flags[tid] = false;
883 self.enabled_transition_count -= 1;
884 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
885 }
886 }
887 fired_any
888 }
889
890 fn fire_transition_async(
892 &mut self,
893 tid: usize,
894 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
895 in_flight_count: &mut usize,
896 ) {
897 let t = self.compiled.transition(tid);
898 let transition_name = Arc::clone(t.name_arc());
899 let input_specs: Vec<In> = t.input_specs().to_vec();
900 let read_arcs: Vec<_> = t.reads().to_vec();
901 let reset_arcs: Vec<_> = t.resets().to_vec();
902 let output_place_names: HashSet<Arc<str>> = t
903 .output_places()
904 .iter()
905 .map(|p| Arc::clone(p.name_arc()))
906 .collect();
907 let action = Arc::clone(t.action());
908 let is_sync = action.is_sync();
909
910 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
912 for in_spec in &input_specs {
913 let place_name = in_spec.place_name();
914 let to_consume = match in_spec {
915 In::One { .. } => 1,
916 In::Exactly { count, .. } => *count,
917 In::All { guard, .. } | In::AtLeast { guard, .. } => {
918 if guard.is_some() {
919 self.marking
920 .count_matching(place_name, &**guard.as_ref().unwrap())
921 } else {
922 self.marking.count(place_name)
923 }
924 }
925 };
926
927 let place_name_arc = Arc::clone(in_spec.place().name_arc());
928 for _ in 0..to_consume {
929 let token = if let Some(guard) = in_spec.guard() {
930 self.marking.remove_matching(place_name, &**guard)
931 } else {
932 self.marking.remove_first(place_name)
933 };
934 if let Some(token) = token {
935 if E::ENABLED {
936 self.event_store.append(NetEvent::TokenRemoved {
937 place_name: Arc::clone(&place_name_arc),
938 timestamp: now_millis(),
939 });
940 }
941 inputs
942 .entry(Arc::clone(&place_name_arc))
943 .or_default()
944 .push(token);
945 }
946 }
947 }
948
949 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
951 for arc in &read_arcs {
952 if let Some(queue) = self.marking.queue(arc.place.name())
953 && let Some(token) = queue.front()
954 {
955 read_tokens
956 .entry(Arc::clone(arc.place.name_arc()))
957 .or_default()
958 .push(token.clone());
959 }
960 }
961
962 for arc in &reset_arcs {
964 let removed = self.marking.remove_all(arc.place.name());
965 self.pending_reset_places
966 .insert(Arc::clone(arc.place.name_arc()));
967 if E::ENABLED {
968 for _ in &removed {
969 self.event_store.append(NetEvent::TokenRemoved {
970 place_name: Arc::clone(arc.place.name_arc()),
971 timestamp: now_millis(),
972 });
973 }
974 }
975 }
976
977 self.update_bitmap_after_consumption(tid);
978
979 if E::ENABLED {
980 self.event_store.append(NetEvent::TransitionStarted {
981 transition_name: Arc::clone(&transition_name),
982 timestamp: now_millis(),
983 });
984 }
985
986 self.enabled_flags[tid] = false;
988 self.enabled_transition_count -= 1;
989 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
990 self.mark_transition_dirty(tid);
991
992 if is_sync {
993 let mut ctx = TransitionContext::new(
995 Arc::clone(&transition_name),
996 inputs,
997 read_tokens,
998 output_place_names,
999 None,
1000 );
1001 let result = action.run_sync(&mut ctx);
1002 match result {
1003 Ok(()) => {
1004 let outputs = ctx.take_outputs();
1005 self.process_outputs(tid, &transition_name, outputs);
1006 if E::ENABLED {
1007 self.event_store.append(NetEvent::TransitionCompleted {
1008 transition_name: Arc::clone(&transition_name),
1009 timestamp: now_millis(),
1010 });
1011 }
1012 }
1013 Err(err) => {
1014 if E::ENABLED {
1015 self.event_store.append(NetEvent::TransitionFailed {
1016 transition_name: Arc::clone(&transition_name),
1017 error: err.message,
1018 timestamp: now_millis(),
1019 });
1020 }
1021 }
1022 }
1023 } else {
1024 *in_flight_count += 1;
1026 let tx = completion_tx.clone();
1027 let name = Arc::clone(&transition_name);
1028 let ctx = TransitionContext::new(
1029 Arc::clone(&transition_name),
1030 inputs,
1031 read_tokens,
1032 output_place_names,
1033 None,
1034 );
1035 tokio::spawn(async move {
1036 let result = action.run_async(ctx).await;
1037 let completion = match result {
1038 Ok(mut completed_ctx) => ActionCompletion {
1039 transition_name: Arc::clone(&name),
1040 result: Ok(completed_ctx.take_outputs()),
1041 },
1042 Err(err) => ActionCompletion {
1043 transition_name: Arc::clone(&name),
1044 result: Err(err.message),
1045 },
1046 };
1047 let _ = tx.send(completion);
1048 });
1049 }
1050 }
1051
1052 fn millis_until_next_timed_transition(&self) -> f64 {
1054 let mut min_wait = f64::INFINITY;
1055 let now_ms = self.elapsed_ms();
1056
1057 for tid in 0..self.compiled.transition_count {
1058 if !self.enabled_flags[tid] {
1059 continue;
1060 }
1061 let t = self.compiled.transition(tid);
1062 let elapsed = now_ms - self.enabled_at_ms[tid];
1063
1064 let earliest_ms = t.timing().earliest() as f64;
1065 let remaining_earliest = earliest_ms - elapsed;
1066 if remaining_earliest <= 0.0 {
1067 return 0.0;
1068 }
1069 min_wait = min_wait.min(remaining_earliest);
1070
1071 if self.has_deadline_flags[tid] {
1072 let latest_ms = t.timing().latest() as f64;
1073 let remaining_deadline = latest_ms - elapsed;
1074 if remaining_deadline <= 0.0 {
1075 return 0.0;
1076 }
1077 min_wait = min_wait.min(remaining_deadline);
1078 }
1079 }
1080
1081 min_wait
1082 }
1083}
1084
1085fn now_millis() -> u64 {
1086 std::time::SystemTime::now()
1087 .duration_since(std::time::UNIX_EPOCH)
1088 .unwrap_or_default()
1089 .as_millis() as u64
1090}
1091
1092#[allow(dead_code)]
1094fn validate_out_spec(out: &Out, produced_places: &HashSet<Arc<str>>) -> bool {
1095 match out {
1096 Out::Place(p) => produced_places.contains(p.name()),
1097 Out::And(children) => children
1098 .iter()
1099 .all(|c| validate_out_spec(c, produced_places)),
1100 Out::Xor(children) => children
1101 .iter()
1102 .any(|c| validate_out_spec(c, produced_places)),
1103 Out::Timeout { child, .. } => validate_out_spec(child, produced_places),
1104 Out::ForwardInput { to, .. } => produced_places.contains(to.name()),
1105 }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110 use super::*;
1111 use libpetri_core::action::passthrough;
1112 use libpetri_core::input::one;
1113 use libpetri_core::output::out_place;
1114 use libpetri_core::place::Place;
1115 use libpetri_core::token::Token;
1116 use libpetri_core::transition::Transition;
1117 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1118
1119 fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1120 let p1 = Place::<i32>::new("p1");
1121 let p2 = Place::<i32>::new("p2");
1122 let p3 = Place::<i32>::new("p3");
1123
1124 let t1 = Transition::builder("t1")
1125 .input(one(&p1))
1126 .output(out_place(&p2))
1127 .action(passthrough())
1128 .build();
1129 let t2 = Transition::builder("t2")
1130 .input(one(&p2))
1131 .output(out_place(&p3))
1132 .action(passthrough())
1133 .build();
1134
1135 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1136 (net, p1, p2, p3)
1137 }
1138
1139 #[test]
1140 fn sync_passthrough_chain() {
1141 let (net, p1, _p2, _p3) = simple_chain();
1142
1143 let mut marking = Marking::new();
1144 marking.add(&p1, Token::at(42, 0));
1145
1146 let mut executor =
1147 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1148 let result = executor.run_sync();
1149
1150 assert_eq!(result.count("p1"), 0);
1154 }
1155
1156 #[test]
1157 fn sync_with_event_store() {
1158 let (net, p1, _, _) = simple_chain();
1159
1160 let mut marking = Marking::new();
1161 marking.add(&p1, Token::at(42, 0));
1162
1163 let mut executor =
1164 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1165 executor.run_sync();
1166
1167 let store = executor.event_store();
1168 assert!(!store.is_empty());
1169 }
1170
1171 #[test]
1172 fn sync_fork_chain() {
1173 let p1 = Place::<i32>::new("p1");
1174 let p2 = Place::<i32>::new("p2");
1175 let p3 = Place::<i32>::new("p3");
1176
1177 let t1 = Transition::builder("t1")
1178 .input(one(&p1))
1179 .output(libpetri_core::output::and(vec![
1180 out_place(&p2),
1181 out_place(&p3),
1182 ]))
1183 .action(libpetri_core::action::fork())
1184 .build();
1185
1186 let net = PetriNet::builder("fork").transition(t1).build();
1187
1188 let mut marking = Marking::new();
1189 marking.add(&p1, Token::at(42, 0));
1190
1191 let mut executor =
1192 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1193 let result = executor.run_sync();
1194
1195 assert_eq!(result.count("p1"), 0);
1197 assert_eq!(result.count("p2"), 1);
1198 assert_eq!(result.count("p3"), 1);
1199 }
1200
1201 #[test]
1202 fn sync_no_initial_tokens() {
1203 let (net, _, _, _) = simple_chain();
1204 let marking = Marking::new();
1205 let mut executor =
1206 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1207 let result = executor.run_sync();
1208 assert_eq!(result.count("p1"), 0);
1209 assert_eq!(result.count("p2"), 0);
1210 assert_eq!(result.count("p3"), 0);
1211 }
1212
1213 #[test]
1214 fn sync_priority_ordering() {
1215 let p = Place::<()>::new("p");
1216 let out_a = Place::<()>::new("a");
1217 let out_b = Place::<()>::new("b");
1218
1219 let t_high = Transition::builder("t_high")
1222 .input(one(&p))
1223 .output(out_place(&out_a))
1224 .action(libpetri_core::action::passthrough())
1225 .priority(10)
1226 .build();
1227 let t_low = Transition::builder("t_low")
1228 .input(one(&p))
1229 .output(out_place(&out_b))
1230 .action(libpetri_core::action::passthrough())
1231 .priority(1)
1232 .build();
1233
1234 let net = PetriNet::builder("priority")
1235 .transitions([t_high, t_low])
1236 .build();
1237
1238 let mut marking = Marking::new();
1239 marking.add(&p, Token::at((), 0));
1240
1241 let mut executor =
1242 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1243 executor.run_sync();
1244
1245 assert_eq!(executor.marking().count("p"), 0);
1249 }
1250
1251 #[test]
1252 fn sync_inhibitor_blocks() {
1253 let p1 = Place::<()>::new("p1");
1254 let p2 = Place::<()>::new("p2");
1255 let p_inh = Place::<()>::new("inh");
1256
1257 let t = Transition::builder("t1")
1258 .input(one(&p1))
1259 .output(out_place(&p2))
1260 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1261 .action(libpetri_core::action::passthrough())
1262 .build();
1263
1264 let net = PetriNet::builder("inhibitor").transition(t).build();
1265
1266 let mut marking = Marking::new();
1267 marking.add(&p1, Token::at((), 0));
1268 marking.add(&p_inh, Token::at((), 0));
1269
1270 let mut executor =
1271 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1272 executor.run_sync();
1273
1274 assert_eq!(executor.marking().count("p1"), 1);
1276 }
1277
1278 #[test]
1279 fn sync_linear_chain_5() {
1280 let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1282 let transitions: Vec<Transition> = (0..5)
1283 .map(|i| {
1284 Transition::builder(format!("t{i}"))
1285 .input(one(&places[i]))
1286 .output(out_place(&places[i + 1]))
1287 .action(libpetri_core::action::fork())
1288 .build()
1289 })
1290 .collect();
1291
1292 let net = PetriNet::builder("chain5").transitions(transitions).build();
1293
1294 let mut marking = Marking::new();
1295 marking.add(&places[0], Token::at(1, 0));
1296
1297 let mut executor =
1298 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1299 let result = executor.run_sync();
1300
1301 assert_eq!(result.count("p0"), 0);
1303 assert_eq!(result.count("p5"), 1);
1304 }
1305
1306 #[test]
1307 fn input_arc_requires_token_to_enable() {
1308 let p1 = Place::<i32>::new("p1");
1309 let p2 = Place::<i32>::new("p2");
1310 let t = Transition::builder("t1")
1311 .input(one(&p1))
1312 .output(out_place(&p2))
1313 .action(libpetri_core::action::fork())
1314 .build();
1315 let net = PetriNet::builder("test").transition(t).build();
1316
1317 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1319 &net,
1320 Marking::new(),
1321 ExecutorOptions::default(),
1322 );
1323 executor.run_sync();
1324 assert_eq!(executor.marking().count("p1"), 0);
1325 assert_eq!(executor.marking().count("p2"), 0);
1326 }
1327
1328 #[test]
1329 fn multiple_input_arcs_require_all_tokens() {
1330 let p1 = Place::<i32>::new("p1");
1331 let p2 = Place::<i32>::new("p2");
1332 let p3 = Place::<i32>::new("p3");
1333
1334 let t = Transition::builder("t1")
1335 .input(one(&p1))
1336 .input(one(&p2))
1337 .output(out_place(&p3))
1338 .action(libpetri_core::action::sync_action(|ctx| {
1339 ctx.output("p3", 99i32)?;
1340 Ok(())
1341 }))
1342 .build();
1343 let net = PetriNet::builder("test").transition(t).build();
1344
1345 let mut marking = Marking::new();
1347 marking.add(&p1, Token::at(1, 0));
1348 let mut executor =
1349 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1350 executor.run_sync();
1351 assert_eq!(executor.marking().count("p1"), 1);
1352 assert_eq!(executor.marking().count("p3"), 0);
1353
1354 let mut marking = Marking::new();
1356 marking.add(&p1, Token::at(1, 0));
1357 marking.add(&p2, Token::at(2, 0));
1358 let mut executor =
1359 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1360 executor.run_sync();
1361 assert_eq!(executor.marking().count("p1"), 0);
1362 assert_eq!(executor.marking().count("p2"), 0);
1363 assert_eq!(executor.marking().count("p3"), 1);
1364 }
1365
1366 #[test]
1367 fn read_arc_does_not_consume() {
1368 let p_in = Place::<i32>::new("in");
1369 let p_ctx = Place::<i32>::new("ctx");
1370 let p_out = Place::<i32>::new("out");
1371
1372 let t = Transition::builder("t1")
1373 .input(one(&p_in))
1374 .read(libpetri_core::arc::read(&p_ctx))
1375 .output(out_place(&p_out))
1376 .action(libpetri_core::action::sync_action(|ctx| {
1377 let v = ctx.input::<i32>("in")?;
1378 let r = ctx.read::<i32>("ctx")?;
1379 ctx.output("out", *v + *r)?;
1380 Ok(())
1381 }))
1382 .build();
1383 let net = PetriNet::builder("test").transition(t).build();
1384
1385 let mut marking = Marking::new();
1386 marking.add(&p_in, Token::at(10, 0));
1387 marking.add(&p_ctx, Token::at(5, 0));
1388
1389 let mut executor =
1390 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1391 executor.run_sync();
1392
1393 assert_eq!(executor.marking().count("in"), 0); assert_eq!(executor.marking().count("ctx"), 1); assert_eq!(executor.marking().count("out"), 1);
1396 }
1397
1398 #[test]
1399 fn reset_arc_removes_all_tokens() {
1400 let p_in = Place::<()>::new("in");
1401 let p_reset = Place::<i32>::new("reset");
1402 let p_out = Place::<()>::new("out");
1403
1404 let t = Transition::builder("t1")
1405 .input(one(&p_in))
1406 .reset(libpetri_core::arc::reset(&p_reset))
1407 .output(out_place(&p_out))
1408 .action(libpetri_core::action::fork())
1409 .build();
1410 let net = PetriNet::builder("test").transition(t).build();
1411
1412 let mut marking = Marking::new();
1413 marking.add(&p_in, Token::at((), 0));
1414 marking.add(&p_reset, Token::at(1, 0));
1415 marking.add(&p_reset, Token::at(2, 0));
1416 marking.add(&p_reset, Token::at(3, 0));
1417
1418 let mut executor =
1419 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1420 executor.run_sync();
1421
1422 assert_eq!(executor.marking().count("reset"), 0); assert_eq!(executor.marking().count("out"), 1);
1424 }
1425
1426 #[test]
1427 fn exactly_cardinality_consumes_n() {
1428 let p = Place::<i32>::new("p");
1429 let p_out = Place::<i32>::new("out");
1430
1431 let t = Transition::builder("t1")
1432 .input(libpetri_core::input::exactly(3, &p))
1433 .output(out_place(&p_out))
1434 .action(libpetri_core::action::sync_action(|ctx| {
1435 let vals = ctx.inputs::<i32>("p")?;
1436 for v in vals {
1437 ctx.output("out", *v)?;
1438 }
1439 Ok(())
1440 }))
1441 .build();
1442 let net = PetriNet::builder("test").transition(t).build();
1443
1444 let mut marking = Marking::new();
1445 for i in 0..5 {
1446 marking.add(&p, Token::at(i, 0));
1447 }
1448
1449 let mut executor =
1450 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1451 executor.run_sync();
1452
1453 assert_eq!(executor.marking().count("p"), 2);
1455 assert_eq!(executor.marking().count("out"), 3);
1456 }
1457
1458 #[test]
1459 fn all_cardinality_consumes_everything() {
1460 let p = Place::<i32>::new("p");
1461 let p_out = Place::<()>::new("out");
1462
1463 let t = Transition::builder("t1")
1464 .input(libpetri_core::input::all(&p))
1465 .output(out_place(&p_out))
1466 .action(libpetri_core::action::sync_action(|ctx| {
1467 let vals = ctx.inputs::<i32>("p")?;
1468 ctx.output("out", vals.len() as i32)?;
1469 Ok(())
1470 }))
1471 .build();
1472 let net = PetriNet::builder("test").transition(t).build();
1473
1474 let mut marking = Marking::new();
1475 for i in 0..5 {
1476 marking.add(&p, Token::at(i, 0));
1477 }
1478
1479 let mut executor =
1480 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1481 executor.run_sync();
1482
1483 assert_eq!(executor.marking().count("p"), 0);
1484 }
1485
1486 #[test]
1487 fn at_least_blocks_insufficient() {
1488 let p = Place::<i32>::new("p");
1489 let p_out = Place::<()>::new("out");
1490
1491 let t = Transition::builder("t1")
1492 .input(libpetri_core::input::at_least(3, &p))
1493 .output(out_place(&p_out))
1494 .action(libpetri_core::action::passthrough())
1495 .build();
1496 let net = PetriNet::builder("test").transition(t).build();
1497
1498 let mut marking = Marking::new();
1500 marking.add(&p, Token::at(1, 0));
1501 marking.add(&p, Token::at(2, 0));
1502
1503 let mut executor =
1504 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1505 executor.run_sync();
1506
1507 assert_eq!(executor.marking().count("p"), 2); }
1509
1510 #[test]
1511 fn at_least_fires_with_enough_and_consumes_all() {
1512 let p = Place::<i32>::new("p");
1513 let p_out = Place::<()>::new("out");
1514
1515 let t = Transition::builder("t1")
1516 .input(libpetri_core::input::at_least(3, &p))
1517 .output(out_place(&p_out))
1518 .action(libpetri_core::action::passthrough())
1519 .build();
1520 let net = PetriNet::builder("test").transition(t).build();
1521
1522 let mut marking = Marking::new();
1523 for i in 0..5 {
1524 marking.add(&p, Token::at(i, 0));
1525 }
1526
1527 let mut executor =
1528 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1529 executor.run_sync();
1530
1531 assert_eq!(executor.marking().count("p"), 0); }
1533
1534 #[test]
1535 fn guarded_input_only_consumes_matching() {
1536 let p = Place::<i32>::new("p");
1537 let p_out = Place::<i32>::new("out");
1538
1539 let t = Transition::builder("t1")
1540 .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1541 .output(out_place(&p_out))
1542 .action(libpetri_core::action::fork())
1543 .build();
1544 let net = PetriNet::builder("test").transition(t).build();
1545
1546 let mut marking = Marking::new();
1547 marking.add(&p, Token::at(3, 0)); marking.add(&p, Token::at(10, 0)); let mut executor =
1551 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1552 executor.run_sync();
1553
1554 assert_eq!(executor.marking().count("p"), 1); assert_eq!(executor.marking().count("out"), 1); let peeked = executor.marking().peek(&p_out).unwrap();
1557 assert_eq!(*peeked, 10);
1558 }
1559
1560 #[test]
1561 fn guarded_input_blocks_when_no_match() {
1562 let p = Place::<i32>::new("p");
1563 let p_out = Place::<i32>::new("out");
1564
1565 let t = Transition::builder("t1")
1566 .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
1567 .output(out_place(&p_out))
1568 .action(libpetri_core::action::fork())
1569 .build();
1570 let net = PetriNet::builder("test").transition(t).build();
1571
1572 let mut marking = Marking::new();
1573 marking.add(&p, Token::at(3, 0));
1574 marking.add(&p, Token::at(10, 0));
1575
1576 let mut executor =
1577 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1578 executor.run_sync();
1579
1580 assert_eq!(executor.marking().count("p"), 2);
1582 assert_eq!(executor.marking().count("out"), 0);
1583 }
1584
1585 #[test]
1586 fn transform_action_outputs_to_all_places() {
1587 let p_in = Place::<i32>::new("in");
1588 let p_a = Place::<i32>::new("a");
1589 let p_b = Place::<i32>::new("b");
1590
1591 let t = Transition::builder("t1")
1592 .input(one(&p_in))
1593 .output(libpetri_core::output::and(vec![
1594 out_place(&p_a),
1595 out_place(&p_b),
1596 ]))
1597 .action(libpetri_core::action::transform(|ctx| {
1598 let v = ctx.input::<i32>("in").unwrap();
1599 Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
1600 }))
1601 .build();
1602 let net = PetriNet::builder("test").transition(t).build();
1603
1604 let mut marking = Marking::new();
1605 marking.add(&p_in, Token::at(5, 0));
1606
1607 let mut executor =
1608 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1609 executor.run_sync();
1610
1611 assert_eq!(executor.marking().count("a"), 1);
1612 assert_eq!(executor.marking().count("b"), 1);
1613 assert_eq!(*executor.marking().peek(&p_a).unwrap(), 10);
1614 assert_eq!(*executor.marking().peek(&p_b).unwrap(), 10);
1615 }
1616
1617 #[test]
1618 fn sync_action_custom_logic() {
1619 let p_in = Place::<i32>::new("in");
1620 let p_out = Place::<String>::new("out");
1621
1622 let t = Transition::builder("t1")
1623 .input(one(&p_in))
1624 .output(out_place(&p_out))
1625 .action(libpetri_core::action::sync_action(|ctx| {
1626 let v = ctx.input::<i32>("in")?;
1627 ctx.output("out", format!("value={v}"))?;
1628 Ok(())
1629 }))
1630 .build();
1631 let net = PetriNet::builder("test").transition(t).build();
1632
1633 let mut marking = Marking::new();
1634 marking.add(&p_in, Token::at(42, 0));
1635
1636 let mut executor =
1637 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1638 executor.run_sync();
1639
1640 assert_eq!(executor.marking().count("out"), 1);
1641 let peeked = executor.marking().peek(&p_out).unwrap();
1642 assert_eq!(*peeked, "value=42");
1643 }
1644
1645 #[test]
1646 fn action_error_does_not_crash() {
1647 let p_in = Place::<i32>::new("in");
1648 let p_out = Place::<i32>::new("out");
1649
1650 let t = Transition::builder("t1")
1651 .input(one(&p_in))
1652 .output(out_place(&p_out))
1653 .action(libpetri_core::action::sync_action(|_ctx| {
1654 Err(libpetri_core::action::ActionError::new(
1655 "intentional failure",
1656 ))
1657 }))
1658 .build();
1659 let net = PetriNet::builder("test").transition(t).build();
1660
1661 let mut marking = Marking::new();
1662 marking.add(&p_in, Token::at(42, 0));
1663
1664 let mut executor =
1665 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1666 executor.run_sync();
1667
1668 assert_eq!(executor.marking().count("in"), 0);
1670 assert_eq!(executor.marking().count("out"), 0);
1671
1672 let events = executor.event_store().events();
1674 assert!(
1675 events
1676 .iter()
1677 .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
1678 );
1679 }
1680
1681 #[test]
1682 fn event_store_records_lifecycle() {
1683 let p1 = Place::<i32>::new("p1");
1684 let p2 = Place::<i32>::new("p2");
1685 let t = Transition::builder("t1")
1686 .input(one(&p1))
1687 .output(out_place(&p2))
1688 .action(libpetri_core::action::fork())
1689 .build();
1690 let net = PetriNet::builder("test").transition(t).build();
1691
1692 let mut marking = Marking::new();
1693 marking.add(&p1, Token::at(1, 0));
1694
1695 let mut executor =
1696 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1697 executor.run_sync();
1698
1699 let events = executor.event_store().events();
1700
1701 assert!(
1704 events
1705 .iter()
1706 .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
1707 );
1708 assert!(
1709 events
1710 .iter()
1711 .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
1712 );
1713 assert!(
1714 events
1715 .iter()
1716 .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
1717 );
1718 assert!(
1719 events
1720 .iter()
1721 .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
1722 );
1723 assert!(
1724 events
1725 .iter()
1726 .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
1727 );
1728 assert!(
1729 events
1730 .iter()
1731 .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
1732 );
1733 assert!(
1734 events
1735 .iter()
1736 .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
1737 );
1738 }
1739
1740 #[test]
1741 fn noop_event_store_has_no_events() {
1742 let p1 = Place::<i32>::new("p1");
1743 let p2 = Place::<i32>::new("p2");
1744 let t = Transition::builder("t1")
1745 .input(one(&p1))
1746 .output(out_place(&p2))
1747 .action(libpetri_core::action::fork())
1748 .build();
1749 let net = PetriNet::builder("test").transition(t).build();
1750
1751 let mut marking = Marking::new();
1752 marking.add(&p1, Token::at(1, 0));
1753
1754 let mut executor =
1755 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1756 executor.run_sync();
1757
1758 assert!(executor.event_store().is_empty());
1759 }
1760
1761 #[test]
1762 fn empty_net_completes() {
1763 let net = PetriNet::builder("empty").build();
1764 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1765 &net,
1766 Marking::new(),
1767 ExecutorOptions::default(),
1768 );
1769 executor.run_sync();
1770 assert!(executor.is_quiescent());
1771 }
1772
1773 #[test]
1774 fn single_transition_fires_once() {
1775 let p = Place::<i32>::new("p");
1776 let out = Place::<i32>::new("out");
1777 let t = Transition::builder("t1")
1778 .input(one(&p))
1779 .output(out_place(&out))
1780 .action(libpetri_core::action::fork())
1781 .build();
1782 let net = PetriNet::builder("test").transition(t).build();
1783
1784 let mut marking = Marking::new();
1785 marking.add(&p, Token::at(42, 0));
1786
1787 let mut executor =
1788 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1789 executor.run_sync();
1790
1791 assert_eq!(executor.marking().count("p"), 0);
1792 assert_eq!(executor.marking().count("out"), 1);
1793 }
1794
1795 #[test]
1796 fn many_tokens_all_processed() {
1797 let p = Place::<i32>::new("p");
1798 let out = Place::<i32>::new("out");
1799 let t = Transition::builder("t1")
1800 .input(one(&p))
1801 .output(out_place(&out))
1802 .action(libpetri_core::action::fork())
1803 .build();
1804 let net = PetriNet::builder("test").transition(t).build();
1805
1806 let mut marking = Marking::new();
1807 for i in 0..100 {
1808 marking.add(&p, Token::at(i, 0));
1809 }
1810
1811 let mut executor =
1812 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1813 executor.run_sync();
1814
1815 assert_eq!(executor.marking().count("p"), 0);
1816 assert_eq!(executor.marking().count("out"), 100);
1817 }
1818
1819 #[test]
1820 fn input_fifo_ordering() {
1821 let p = Place::<i32>::new("p");
1822 let out = Place::<i32>::new("out");
1823 let t = Transition::builder("t1")
1824 .input(one(&p))
1825 .output(out_place(&out))
1826 .action(libpetri_core::action::fork())
1827 .build();
1828 let net = PetriNet::builder("test").transition(t).build();
1829
1830 let mut marking = Marking::new();
1831 marking.add(&p, Token::at(1, 0));
1832 marking.add(&p, Token::at(2, 0));
1833 marking.add(&p, Token::at(3, 0));
1834
1835 let mut executor =
1836 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1837 executor.run_sync();
1838
1839 assert_eq!(executor.marking().count("out"), 3);
1841 }
1842
1843 #[test]
1844 fn inhibitor_unblocked_when_token_removed() {
1845 let p1 = Place::<()>::new("p1");
1848 let p_inh = Place::<()>::new("inh");
1849 let p_out = Place::<()>::new("out");
1850 let p_trigger = Place::<()>::new("trigger");
1851
1852 let t_clear = Transition::builder("t_clear")
1854 .input(one(&p_inh))
1855 .output(out_place(&p_trigger))
1856 .action(libpetri_core::action::fork())
1857 .priority(10) .build();
1859
1860 let t1 = Transition::builder("t1")
1862 .input(one(&p1))
1863 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1864 .output(out_place(&p_out))
1865 .action(libpetri_core::action::fork())
1866 .priority(1)
1867 .build();
1868
1869 let net = PetriNet::builder("test").transitions([t_clear, t1]).build();
1870
1871 let mut marking = Marking::new();
1872 marking.add(&p1, Token::at((), 0));
1873 marking.add(&p_inh, Token::at((), 0));
1874
1875 let mut executor =
1876 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1877 executor.run_sync();
1878
1879 assert_eq!(executor.marking().count("inh"), 0);
1882 assert_eq!(executor.marking().count("p1"), 0);
1883 assert_eq!(executor.marking().count("out"), 1);
1884 }
1885
1886 #[test]
1887 fn combined_input_read_reset() {
1888 let p_in = Place::<i32>::new("in");
1889 let p_ctx = Place::<String>::new("ctx");
1890 let p_clear = Place::<i32>::new("clear");
1891 let p_out = Place::<String>::new("out");
1892
1893 let t = Transition::builder("t1")
1894 .input(one(&p_in))
1895 .read(libpetri_core::arc::read(&p_ctx))
1896 .reset(libpetri_core::arc::reset(&p_clear))
1897 .output(out_place(&p_out))
1898 .action(libpetri_core::action::sync_action(|ctx| {
1899 let v = ctx.input::<i32>("in")?;
1900 let r = ctx.read::<String>("ctx")?;
1901 ctx.output("out", format!("{v}-{r}"))?;
1902 Ok(())
1903 }))
1904 .build();
1905 let net = PetriNet::builder("test").transition(t).build();
1906
1907 let mut marking = Marking::new();
1908 marking.add(&p_in, Token::at(42, 0));
1909 marking.add(&p_ctx, Token::at("hello".to_string(), 0));
1910 marking.add(&p_clear, Token::at(1, 0));
1911 marking.add(&p_clear, Token::at(2, 0));
1912
1913 let mut executor =
1914 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1915 executor.run_sync();
1916
1917 assert_eq!(executor.marking().count("in"), 0); assert_eq!(executor.marking().count("ctx"), 1); assert_eq!(executor.marking().count("clear"), 0); assert_eq!(executor.marking().count("out"), 1);
1921 let peeked = executor.marking().peek(&p_out).unwrap();
1922 assert_eq!(*peeked, "42-hello");
1923 }
1924
1925 #[test]
1926 fn workflow_sequential_chain() {
1927 let p1 = Place::<i32>::new("p1");
1930 let p2 = Place::<i32>::new("p2");
1931 let p3 = Place::<i32>::new("p3");
1932 let p4 = Place::<i32>::new("p4");
1933
1934 let make_doubler = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
1935 let out_name: Arc<str> = Arc::from(outp.name());
1936 Transition::builder(name)
1937 .input(one(inp))
1938 .output(out_place(outp))
1939 .action(libpetri_core::action::sync_action(move |ctx| {
1940 let v = ctx
1941 .input::<i32>("p1")
1942 .or_else(|_| ctx.input::<i32>("p2"))
1943 .or_else(|_| ctx.input::<i32>("p3"))
1944 .unwrap();
1945 ctx.output(&out_name, *v * 2)?;
1946 Ok(())
1947 }))
1948 .build()
1949 };
1950
1951 let t1 = make_doubler("t1", &p1, &p2);
1952 let t2 = make_doubler("t2", &p2, &p3);
1953 let t3 = make_doubler("t3", &p3, &p4);
1954
1955 let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
1956
1957 let mut marking = Marking::new();
1958 marking.add(&p1, Token::at(1, 0));
1959
1960 let mut executor =
1961 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1962 executor.run_sync();
1963
1964 assert_eq!(executor.marking().count("p4"), 1);
1965 assert_eq!(*executor.marking().peek(&p4).unwrap(), 8); }
1967
1968 #[test]
1969 fn workflow_fork_join() {
1970 let p_start = Place::<i32>::new("start");
1972 let p_a = Place::<i32>::new("a");
1973 let p_b = Place::<i32>::new("b");
1974 let p_end = Place::<i32>::new("end");
1975
1976 let t_fork = Transition::builder("fork")
1977 .input(one(&p_start))
1978 .output(libpetri_core::output::and(vec![
1979 out_place(&p_a),
1980 out_place(&p_b),
1981 ]))
1982 .action(libpetri_core::action::fork())
1983 .build();
1984
1985 let t_join = Transition::builder("join")
1986 .input(one(&p_a))
1987 .input(one(&p_b))
1988 .output(out_place(&p_end))
1989 .action(libpetri_core::action::sync_action(|ctx| {
1990 let a = ctx.input::<i32>("a")?;
1991 let b = ctx.input::<i32>("b")?;
1992 ctx.output("end", *a + *b)?;
1993 Ok(())
1994 }))
1995 .build();
1996
1997 let net = PetriNet::builder("fork-join")
1998 .transitions([t_fork, t_join])
1999 .build();
2000
2001 let mut marking = Marking::new();
2002 marking.add(&p_start, Token::at(5, 0));
2003
2004 let mut executor =
2005 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2006 executor.run_sync();
2007
2008 assert_eq!(executor.marking().count("start"), 0);
2009 assert_eq!(executor.marking().count("a"), 0);
2010 assert_eq!(executor.marking().count("b"), 0);
2011 assert_eq!(executor.marking().count("end"), 1);
2012 assert_eq!(*executor.marking().peek(&p_end).unwrap(), 10); }
2014
2015 #[test]
2016 fn workflow_mutual_exclusion() {
2017 let p_mutex = Place::<()>::new("mutex");
2019 let p_w1 = Place::<()>::new("w1");
2020 let p_w2 = Place::<()>::new("w2");
2021 let p_done1 = Place::<()>::new("done1");
2022 let p_done2 = Place::<()>::new("done2");
2023
2024 let t_w1 = Transition::builder("work1")
2025 .input(one(&p_w1))
2026 .input(one(&p_mutex))
2027 .output(libpetri_core::output::and(vec![
2028 out_place(&p_done1),
2029 out_place(&p_mutex), ]))
2031 .action(libpetri_core::action::sync_action(|ctx| {
2032 ctx.output("done1", ())?;
2033 ctx.output("mutex", ())?;
2034 Ok(())
2035 }))
2036 .build();
2037
2038 let t_w2 = Transition::builder("work2")
2039 .input(one(&p_w2))
2040 .input(one(&p_mutex))
2041 .output(libpetri_core::output::and(vec![
2042 out_place(&p_done2),
2043 out_place(&p_mutex), ]))
2045 .action(libpetri_core::action::sync_action(|ctx| {
2046 ctx.output("done2", ())?;
2047 ctx.output("mutex", ())?;
2048 Ok(())
2049 }))
2050 .build();
2051
2052 let net = PetriNet::builder("mutex").transitions([t_w1, t_w2]).build();
2053
2054 let mut marking = Marking::new();
2055 marking.add(&p_mutex, Token::at((), 0));
2056 marking.add(&p_w1, Token::at((), 0));
2057 marking.add(&p_w2, Token::at((), 0));
2058
2059 let mut executor =
2060 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2061 executor.run_sync();
2062
2063 assert_eq!(executor.marking().count("done1"), 1);
2065 assert_eq!(executor.marking().count("done2"), 1);
2066 assert_eq!(executor.marking().count("mutex"), 1); }
2068
2069 #[test]
2070 fn produce_action() {
2071 let p_in = Place::<()>::new("in");
2072 let p_out = Place::<String>::new("out");
2073
2074 let t = Transition::builder("t1")
2075 .input(one(&p_in))
2076 .output(out_place(&p_out))
2077 .action(libpetri_core::action::produce(
2078 Arc::from("out"),
2079 "produced_value".to_string(),
2080 ))
2081 .build();
2082 let net = PetriNet::builder("test").transition(t).build();
2083
2084 let mut marking = Marking::new();
2085 marking.add(&p_in, Token::at((), 0));
2086
2087 let mut executor =
2088 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2089 executor.run_sync();
2090
2091 assert_eq!(executor.marking().count("out"), 1);
2092 }
2093
2094 #[test]
2095 fn xor_output_fires_one_branch() {
2096 let p = Place::<i32>::new("p");
2097 let a = Place::<i32>::new("a");
2098 let b = Place::<i32>::new("b");
2099
2100 let t = Transition::builder("t1")
2101 .input(one(&p))
2102 .output(libpetri_core::output::xor(vec![
2103 out_place(&a),
2104 out_place(&b),
2105 ]))
2106 .action(libpetri_core::action::sync_action(|ctx| {
2107 let v = ctx.input::<i32>("p")?;
2108 ctx.output("a", *v)?;
2110 Ok(())
2111 }))
2112 .build();
2113 let net = PetriNet::builder("test").transition(t).build();
2114
2115 let mut marking = Marking::new();
2116 marking.add(&p, Token::at(1, 0));
2117
2118 let mut executor =
2119 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2120 executor.run_sync();
2121
2122 let total = executor.marking().count("a") + executor.marking().count("b");
2124 assert!(total >= 1);
2125 }
2126
2127 #[test]
2128 fn and_output_fires_to_all() {
2129 let p = Place::<i32>::new("p");
2130 let a = Place::<i32>::new("a");
2131 let b = Place::<i32>::new("b");
2132 let c = Place::<i32>::new("c");
2133
2134 let t = Transition::builder("t1")
2135 .input(one(&p))
2136 .output(libpetri_core::output::and(vec![
2137 out_place(&a),
2138 out_place(&b),
2139 out_place(&c),
2140 ]))
2141 .action(libpetri_core::action::sync_action(|ctx| {
2142 let v = ctx.input::<i32>("p")?;
2143 ctx.output("a", *v)?;
2144 ctx.output("b", *v * 10)?;
2145 ctx.output("c", *v * 100)?;
2146 Ok(())
2147 }))
2148 .build();
2149 let net = PetriNet::builder("test").transition(t).build();
2150
2151 let mut marking = Marking::new();
2152 marking.add(&p, Token::at(1, 0));
2153
2154 let mut executor =
2155 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2156 executor.run_sync();
2157
2158 assert_eq!(executor.marking().count("a"), 1);
2159 assert_eq!(executor.marking().count("b"), 1);
2160 assert_eq!(executor.marking().count("c"), 1);
2161 assert_eq!(*executor.marking().peek(&a).unwrap(), 1);
2162 assert_eq!(*executor.marking().peek(&b).unwrap(), 10);
2163 assert_eq!(*executor.marking().peek(&c).unwrap(), 100);
2164 }
2165
2166 #[test]
2167 fn transition_with_no_output_consumes_only() {
2168 let p = Place::<i32>::new("p");
2169
2170 let t = Transition::builder("t1")
2171 .input(one(&p))
2172 .action(libpetri_core::action::sync_action(|ctx| {
2173 let _ = ctx.input::<i32>("p")?;
2174 Ok(())
2175 }))
2176 .build();
2177 let net = PetriNet::builder("test").transition(t).build();
2178
2179 let mut marking = Marking::new();
2180 marking.add(&p, Token::at(42, 0));
2181
2182 let mut executor =
2183 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2184 executor.run_sync();
2185
2186 assert_eq!(executor.marking().count("p"), 0);
2187 }
2188
2189 #[test]
2190 fn multiple_transitions_same_input_compete() {
2191 let p1 = Place::<()>::new("p1");
2193 let out_a = Place::<()>::new("a");
2194 let out_b = Place::<()>::new("b");
2195
2196 let t_a = Transition::builder("ta")
2197 .input(one(&p1))
2198 .output(out_place(&out_a))
2199 .action(libpetri_core::action::fork())
2200 .build();
2201 let t_b = Transition::builder("tb")
2202 .input(one(&p1))
2203 .output(out_place(&out_b))
2204 .action(libpetri_core::action::fork())
2205 .build();
2206
2207 let net = PetriNet::builder("test").transitions([t_a, t_b]).build();
2208
2209 let mut marking = Marking::new();
2210 marking.add(&p1, Token::at((), 0));
2211
2212 let mut executor =
2213 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2214 executor.run_sync();
2215
2216 let total = executor.marking().count("a") + executor.marking().count("b");
2218 assert_eq!(total, 1);
2219 assert_eq!(executor.marking().count("p1"), 0);
2220 }
2221
2222 #[test]
2223 fn priority_higher_fires_first() {
2224 let p = Place::<()>::new("p");
2225 let out_hi = Place::<()>::new("hi");
2226 let out_lo = Place::<()>::new("lo");
2227
2228 let t_hi = Transition::builder("hi")
2229 .input(one(&p))
2230 .output(out_place(&out_hi))
2231 .action(libpetri_core::action::fork())
2232 .priority(10)
2233 .build();
2234 let t_lo = Transition::builder("lo")
2235 .input(one(&p))
2236 .output(out_place(&out_lo))
2237 .action(libpetri_core::action::fork())
2238 .priority(1)
2239 .build();
2240
2241 let net = PetriNet::builder("test").transitions([t_hi, t_lo]).build();
2242
2243 let mut marking = Marking::new();
2244 marking.add(&p, Token::at((), 0));
2245
2246 let mut executor =
2247 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2248 executor.run_sync();
2249
2250 assert_eq!(executor.marking().count("hi"), 1);
2252 assert_eq!(executor.marking().count("lo"), 0);
2253 }
2254
2255 #[test]
2256 fn quiescent_when_no_enabled_transitions() {
2257 let p = Place::<i32>::new("p");
2258 let out = Place::<i32>::new("out");
2259
2260 let t = Transition::builder("t1")
2261 .input(one(&p))
2262 .output(out_place(&out))
2263 .action(libpetri_core::action::fork())
2264 .build();
2265 let net = PetriNet::builder("test").transition(t).build();
2266
2267 let mut marking = Marking::new();
2268 marking.add(&p, Token::at(1, 0));
2269
2270 let mut executor =
2271 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2272 executor.run_sync();
2273
2274 assert!(executor.is_quiescent());
2275 }
2276
2277 #[test]
2278 fn event_store_transition_enabled_disabled() {
2279 let p1 = Place::<()>::new("p1");
2280 let p2 = Place::<()>::new("p2");
2281
2282 let t = Transition::builder("t1")
2283 .input(one(&p1))
2284 .output(out_place(&p2))
2285 .action(libpetri_core::action::fork())
2286 .build();
2287 let net = PetriNet::builder("test").transition(t).build();
2288
2289 let mut marking = Marking::new();
2290 marking.add(&p1, Token::at((), 0));
2291
2292 let mut executor =
2293 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2294 executor.run_sync();
2295
2296 let events = executor.event_store().events();
2297 assert!(events.len() >= 4);
2300
2301 let has_enabled = events.iter().any(|e| {
2303 matches!(e, NetEvent::TransitionEnabled { transition_name, .. } if transition_name.as_ref() == "t1")
2304 });
2305 assert!(has_enabled);
2306 }
2307
2308 #[test]
2309 fn diamond_pattern() {
2310 let p1 = Place::<()>::new("p1");
2312 let p2 = Place::<()>::new("p2");
2313 let p3 = Place::<()>::new("p3");
2314 let p4 = Place::<()>::new("p4");
2315
2316 let t1 = Transition::builder("t1")
2317 .input(one(&p1))
2318 .output(libpetri_core::output::and(vec![
2319 out_place(&p2),
2320 out_place(&p3),
2321 ]))
2322 .action(libpetri_core::action::fork())
2323 .build();
2324 let t2 = Transition::builder("t2")
2325 .input(one(&p2))
2326 .output(out_place(&p4))
2327 .action(libpetri_core::action::fork())
2328 .build();
2329 let t3 = Transition::builder("t3")
2330 .input(one(&p3))
2331 .output(out_place(&p4))
2332 .action(libpetri_core::action::fork())
2333 .build();
2334
2335 let net = PetriNet::builder("diamond")
2336 .transitions([t1, t2, t3])
2337 .build();
2338
2339 let mut marking = Marking::new();
2340 marking.add(&p1, Token::at((), 0));
2341
2342 let mut executor =
2343 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2344 executor.run_sync();
2345
2346 assert_eq!(executor.marking().count("p4"), 2); }
2348
2349 #[test]
2350 fn self_loop_with_guard_terminates() {
2351 let p = Place::<i32>::new("p");
2354 let done = Place::<i32>::new("done");
2355
2356 let t = Transition::builder("dec")
2357 .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v > 0))
2358 .output(out_place(&p))
2359 .action(libpetri_core::action::sync_action(|ctx| {
2360 let v = ctx.input::<i32>("p")?;
2361 ctx.output("p", *v - 1)?;
2362 Ok(())
2363 }))
2364 .build();
2365
2366 let t_done = Transition::builder("finish")
2368 .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v == 0))
2369 .output(out_place(&done))
2370 .action(libpetri_core::action::fork())
2371 .build();
2372
2373 let net = PetriNet::builder("countdown")
2374 .transitions([t, t_done])
2375 .build();
2376
2377 let mut marking = Marking::new();
2378 marking.add(&p, Token::at(3, 0));
2379
2380 let mut executor =
2381 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2382 executor.run_sync();
2383
2384 assert_eq!(executor.marking().count("done"), 1);
2385 assert_eq!(*executor.marking().peek(&done).unwrap(), 0);
2386 }
2387
2388 #[test]
2389 fn multiple_tokens_different_types() {
2390 let p_int = Place::<i32>::new("ints");
2391 let p_str = Place::<String>::new("strs");
2392 let p_out = Place::<String>::new("out");
2393
2394 let t = Transition::builder("combine")
2395 .input(one(&p_int))
2396 .input(one(&p_str))
2397 .output(out_place(&p_out))
2398 .action(libpetri_core::action::sync_action(|ctx| {
2399 let i = ctx.input::<i32>("ints")?;
2400 let s = ctx.input::<String>("strs")?;
2401 ctx.output("out", format!("{s}-{i}"))?;
2402 Ok(())
2403 }))
2404 .build();
2405
2406 let net = PetriNet::builder("test").transition(t).build();
2407
2408 let mut marking = Marking::new();
2409 marking.add(&p_int, Token::at(42, 0));
2410 marking.add(&p_str, Token::at("hello".to_string(), 0));
2411
2412 let mut executor =
2413 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2414 executor.run_sync();
2415
2416 assert_eq!(*executor.marking().peek(&p_out).unwrap(), "hello-42");
2417 }
2418}
2419
2420#[cfg(all(test, feature = "tokio"))]
2421mod async_tests {
2422 use super::*;
2423 use crate::environment::{ExecutorSignal, ExternalEvent};
2424 use libpetri_core::action::{ActionError, async_action, fork};
2425 use libpetri_core::input::one;
2426 use libpetri_core::output::out_place;
2427 use libpetri_core::place::Place;
2428 use libpetri_core::token::{ErasedToken, Token};
2429 use libpetri_core::transition::Transition;
2430 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
2431
2432 #[tokio::test]
2433 async fn async_fork_single_transition() {
2434 let p1 = Place::<i32>::new("p1");
2435 let p2 = Place::<i32>::new("p2");
2436
2437 let t1 = Transition::builder("t1")
2438 .input(one(&p1))
2439 .output(out_place(&p2))
2440 .action(fork())
2441 .build();
2442
2443 let net = PetriNet::builder("test").transition(t1).build();
2444 let mut marking = Marking::new();
2445 marking.add(&p1, Token::at(42, 0));
2446
2447 let mut executor =
2448 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2449
2450 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2451 executor.run_async(rx).await;
2452
2453 assert_eq!(executor.marking().count("p2"), 1);
2454 assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
2455 }
2456
2457 #[tokio::test]
2458 async fn async_action_produces_output() {
2459 let p1 = Place::<i32>::new("p1");
2460 let p2 = Place::<i32>::new("p2");
2461
2462 let action = async_action(|mut ctx| async move {
2463 let val: i32 = *ctx.input::<i32>("p1")?;
2464 ctx.output("p2", val * 10)?;
2465 Ok(ctx)
2466 });
2467
2468 let t1 = Transition::builder("t1")
2469 .input(one(&p1))
2470 .output(out_place(&p2))
2471 .action(action)
2472 .build();
2473
2474 let net = PetriNet::builder("test").transition(t1).build();
2475 let mut marking = Marking::new();
2476 marking.add(&p1, Token::at(5, 0));
2477
2478 let mut executor =
2479 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2480
2481 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2482 executor.run_async(rx).await;
2483
2484 assert_eq!(executor.marking().count("p2"), 1);
2485 assert_eq!(*executor.marking().peek(&p2).unwrap(), 50);
2486 }
2487
2488 #[tokio::test]
2489 async fn async_chain_two_transitions() {
2490 let p1 = Place::<i32>::new("p1");
2491 let p2 = Place::<i32>::new("p2");
2492 let p3 = Place::<i32>::new("p3");
2493
2494 let t1 = Transition::builder("t1")
2495 .input(one(&p1))
2496 .output(out_place(&p2))
2497 .action(fork())
2498 .build();
2499
2500 let t2 = Transition::builder("t2")
2501 .input(one(&p2))
2502 .output(out_place(&p3))
2503 .action(fork())
2504 .build();
2505
2506 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
2507 let mut marking = Marking::new();
2508 marking.add(&p1, Token::at(99, 0));
2509
2510 let mut executor =
2511 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2512
2513 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2514 executor.run_async(rx).await;
2515
2516 assert_eq!(executor.marking().count("p3"), 1);
2517 assert_eq!(*executor.marking().peek(&p3).unwrap(), 99);
2518 }
2519
2520 #[tokio::test]
2521 async fn async_event_injection() {
2522 let p1 = Place::<i32>::new("p1");
2523 let p2 = Place::<i32>::new("p2");
2524
2525 let t1 = Transition::builder("t1")
2526 .input(one(&p1))
2527 .output(out_place(&p2))
2528 .action(fork())
2529 .build();
2530
2531 let net = PetriNet::builder("test").transition(t1).build();
2532 let marking = Marking::new(); let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2535 &net,
2536 marking,
2537 ExecutorOptions {
2538 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2539 },
2540 );
2541
2542 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2543
2544 tokio::spawn(async move {
2546 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2547 let token = Token::at(77, 0);
2548 tx.send(ExecutorSignal::Event(ExternalEvent {
2549 place_name: Arc::from("p1"),
2550 token: ErasedToken::from_typed(&token),
2551 }))
2552 .unwrap();
2553 });
2555
2556 executor.run_async(rx).await;
2557
2558 assert_eq!(executor.marking().count("p2"), 1);
2559 assert_eq!(*executor.marking().peek(&p2).unwrap(), 77);
2560 }
2561
2562 #[tokio::test]
2563 async fn async_with_event_store_records_events() {
2564 let p1 = Place::<i32>::new("p1");
2565 let p2 = Place::<i32>::new("p2");
2566
2567 let t1 = Transition::builder("t1")
2568 .input(one(&p1))
2569 .output(out_place(&p2))
2570 .action(fork())
2571 .build();
2572
2573 let net = PetriNet::builder("test").transition(t1).build();
2574 let mut marking = Marking::new();
2575 marking.add(&p1, Token::at(1, 0));
2576
2577 let mut executor =
2578 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2579
2580 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2581 executor.run_async(rx).await;
2582
2583 let events = executor.event_store().events();
2584 assert!(!events.is_empty());
2585 assert!(
2587 events
2588 .iter()
2589 .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2590 );
2591 assert!(
2592 events
2593 .iter()
2594 .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2595 );
2596 assert!(
2597 events
2598 .iter()
2599 .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2600 );
2601 assert!(
2602 events
2603 .iter()
2604 .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2605 );
2606 }
2607
2608 #[tokio::test]
2609 async fn async_action_error_does_not_crash() {
2610 let p1 = Place::<i32>::new("p1");
2611 let p2 = Place::<i32>::new("p2");
2612
2613 let action =
2614 async_action(|_ctx| async move { Err(ActionError::new("intentional failure")) });
2615
2616 let t1 = Transition::builder("t1")
2617 .input(one(&p1))
2618 .output(out_place(&p2))
2619 .action(action)
2620 .build();
2621
2622 let net = PetriNet::builder("test").transition(t1).build();
2623 let mut marking = Marking::new();
2624 marking.add(&p1, Token::at(1, 0));
2625
2626 let mut executor =
2627 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2628
2629 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2630 executor.run_async(rx).await;
2631
2632 assert_eq!(executor.marking().count("p1"), 0);
2634 assert_eq!(executor.marking().count("p2"), 0);
2635
2636 let events = executor.event_store().events();
2638 assert!(
2639 events
2640 .iter()
2641 .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2642 );
2643 }
2644
2645 #[tokio::test]
2646 async fn async_delayed_timing() {
2647 use libpetri_core::timing::delayed;
2648
2649 let p1 = Place::<i32>::new("p1");
2650 let p2 = Place::<i32>::new("p2");
2651
2652 let t1 = Transition::builder("t1")
2653 .input(one(&p1))
2654 .output(out_place(&p2))
2655 .timing(delayed(100))
2656 .action(fork())
2657 .build();
2658
2659 let net = PetriNet::builder("test").transition(t1).build();
2660 let mut marking = Marking::new();
2661 marking.add(&p1, Token::at(10, 0));
2662
2663 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2664 &net,
2665 marking,
2666 ExecutorOptions {
2667 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2668 },
2669 );
2670
2671 let start = std::time::Instant::now();
2672 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2673 tokio::spawn(async move {
2675 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2676 drop(tx);
2677 });
2678 executor.run_async(rx).await;
2679
2680 assert!(
2681 start.elapsed().as_millis() >= 80,
2682 "delayed(100) should wait ~100ms"
2683 );
2684 assert_eq!(executor.marking().count("p2"), 1);
2685 assert_eq!(*executor.marking().peek(&p2).unwrap(), 10);
2686 }
2687
2688 #[tokio::test]
2689 async fn async_exact_timing() {
2690 use libpetri_core::timing::exact;
2691
2692 let p1 = Place::<i32>::new("p1");
2693 let p2 = Place::<i32>::new("p2");
2694
2695 let t1 = Transition::builder("t1")
2696 .input(one(&p1))
2697 .output(out_place(&p2))
2698 .timing(exact(100))
2699 .action(fork())
2700 .build();
2701
2702 let net = PetriNet::builder("test").transition(t1).build();
2703 let mut marking = Marking::new();
2704 marking.add(&p1, Token::at(20, 0));
2705
2706 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2707 &net,
2708 marking,
2709 ExecutorOptions {
2710 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2711 },
2712 );
2713
2714 let start = std::time::Instant::now();
2715 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2716 tokio::spawn(async move {
2717 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2718 drop(tx);
2719 });
2720 executor.run_async(rx).await;
2721
2722 assert!(
2723 start.elapsed().as_millis() >= 80,
2724 "exact(100) should wait ~100ms"
2725 );
2726 assert_eq!(executor.marking().count("p2"), 1);
2727 assert_eq!(*executor.marking().peek(&p2).unwrap(), 20);
2728 }
2729
2730 #[tokio::test]
2731 async fn async_window_timing() {
2732 use libpetri_core::timing::window;
2733
2734 let p1 = Place::<i32>::new("p1");
2735 let p2 = Place::<i32>::new("p2");
2736
2737 let t1 = Transition::builder("t1")
2738 .input(one(&p1))
2739 .output(out_place(&p2))
2740 .timing(window(50, 200))
2741 .action(fork())
2742 .build();
2743
2744 let net = PetriNet::builder("test").transition(t1).build();
2745 let mut marking = Marking::new();
2746 marking.add(&p1, Token::at(30, 0));
2747
2748 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2749 &net,
2750 marking,
2751 ExecutorOptions {
2752 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2753 },
2754 );
2755
2756 let start = std::time::Instant::now();
2757 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2758 tokio::spawn(async move {
2759 tokio::time::sleep(std::time::Duration::from_millis(400)).await;
2760 drop(tx);
2761 });
2762 executor.run_async(rx).await;
2763
2764 assert!(
2765 start.elapsed().as_millis() >= 40,
2766 "window(50,200) should wait >= ~50ms"
2767 );
2768 assert_eq!(executor.marking().count("p2"), 1);
2769 assert_eq!(*executor.marking().peek(&p2).unwrap(), 30);
2770 }
2771
2772 #[tokio::test]
2773 async fn async_deadline_enforcement() {
2774 use libpetri_core::action::sync_action;
2775 use libpetri_core::timing::window;
2776
2777 let p_slow = Place::<i32>::new("p_slow");
2778 let p_windowed = Place::<i32>::new("p_windowed");
2779 let slow_out = Place::<i32>::new("slow_out");
2780 let windowed_out = Place::<i32>::new("windowed_out");
2781
2782 let t_slow = Transition::builder("slow")
2786 .input(one(&p_slow))
2787 .output(out_place(&slow_out))
2788 .priority(10)
2789 .action(sync_action(|ctx| {
2790 let v = ctx.input::<i32>("p_slow")?;
2791 let start = std::time::Instant::now();
2792 while start.elapsed().as_millis() < 200 {
2793 std::hint::spin_loop();
2794 }
2795 ctx.output("slow_out", *v)?;
2796 Ok(())
2797 }))
2798 .build();
2799
2800 let t_windowed = Transition::builder("windowed")
2804 .input(one(&p_windowed))
2805 .output(out_place(&windowed_out))
2806 .timing(window(50, 100))
2807 .action(fork())
2808 .build();
2809
2810 let net = PetriNet::builder("test")
2811 .transitions([t_slow, t_windowed])
2812 .build();
2813
2814 let mut marking = Marking::new();
2815 marking.add(&p_slow, Token::at(1, 0));
2816 marking.add(&p_windowed, Token::at(2, 0));
2817
2818 let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
2819 &net,
2820 marking,
2821 ExecutorOptions {
2822 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2823 },
2824 );
2825
2826 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2827 tokio::spawn(async move {
2828 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2829 drop(tx);
2830 });
2831 executor.run_async(rx).await;
2832
2833 assert_eq!(executor.marking().count("slow_out"), 1);
2835 assert_eq!(
2837 executor.marking().count("windowed_out"),
2838 0,
2839 "windowed transition should have been disabled by deadline"
2840 );
2841
2842 let events = executor.event_store().events();
2843 assert!(
2844 events.iter().any(|e| matches!(e, NetEvent::TransitionTimedOut { transition_name, .. } if &**transition_name == "windowed")),
2845 "expected TransitionTimedOut event for 'windowed'"
2846 );
2847 }
2848
2849 #[tokio::test]
2850 async fn async_multiple_injections() {
2851 let p1 = Place::<i32>::new("p1");
2852 let p2 = Place::<i32>::new("p2");
2853
2854 let t1 = Transition::builder("t1")
2855 .input(one(&p1))
2856 .output(out_place(&p2))
2857 .action(fork())
2858 .build();
2859
2860 let net = PetriNet::builder("test").transition(t1).build();
2861 let marking = Marking::new();
2862
2863 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2864 &net,
2865 marking,
2866 ExecutorOptions {
2867 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
2868 },
2869 );
2870
2871 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2872
2873 tokio::spawn(async move {
2874 for i in 0..5 {
2875 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2876 let token = Token::at(i, 0);
2877 tx.send(ExecutorSignal::Event(ExternalEvent {
2878 place_name: Arc::from("p1"),
2879 token: ErasedToken::from_typed(&token),
2880 }))
2881 .unwrap();
2882 }
2883 });
2885
2886 executor.run_async(rx).await;
2887
2888 assert_eq!(
2889 executor.marking().count("p2"),
2890 5,
2891 "all 5 injected tokens should arrive"
2892 );
2893 }
2894
2895 #[tokio::test]
2896 async fn async_parallel_execution() {
2897 let p1 = Place::<i32>::new("p1");
2898 let p2 = Place::<i32>::new("p2");
2899 let p3 = Place::<i32>::new("p3");
2900 let out1 = Place::<i32>::new("out1");
2901 let out2 = Place::<i32>::new("out2");
2902 let out3 = Place::<i32>::new("out3");
2903
2904 let make_transition = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
2905 Transition::builder(name)
2906 .input(one(inp))
2907 .output(out_place(outp))
2908 .action(async_action(|mut ctx| async move {
2909 let v: i32 =
2910 *ctx.input::<i32>(ctx.transition_name().replace("t", "p").as_str())?;
2911 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2912 ctx.output(&ctx.transition_name().replace("t", "out"), v)?;
2913 Ok(ctx)
2914 }))
2915 .build()
2916 };
2917
2918 let t1 = make_transition("t1", &p1, &out1);
2919 let t2 = make_transition("t2", &p2, &out2);
2920 let t3 = make_transition("t3", &p3, &out3);
2921
2922 let net = PetriNet::builder("parallel")
2923 .transitions([t1, t2, t3])
2924 .build();
2925 let mut marking = Marking::new();
2926 marking.add(&p1, Token::at(1, 0));
2927 marking.add(&p2, Token::at(2, 0));
2928 marking.add(&p3, Token::at(3, 0));
2929
2930 let mut executor =
2931 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2932
2933 let start = std::time::Instant::now();
2934 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2935 executor.run_async(rx).await;
2936 let elapsed = start.elapsed().as_millis();
2937
2938 assert_eq!(executor.marking().count("out1"), 1);
2939 assert_eq!(executor.marking().count("out2"), 1);
2940 assert_eq!(executor.marking().count("out3"), 1);
2941 assert!(
2942 elapsed < 250,
2943 "parallel execution should take < 250ms, took {elapsed}ms"
2944 );
2945 }
2946
2947 #[tokio::test]
2948 async fn async_sequential_chain_order() {
2949 use std::sync::Mutex;
2950
2951 let p1 = Place::<i32>::new("p1");
2952 let p2 = Place::<i32>::new("p2");
2953 let p3 = Place::<i32>::new("p3");
2954 let p4 = Place::<i32>::new("p4");
2955
2956 let order: Arc<Mutex<Vec<i32>>> = Arc::new(Mutex::new(Vec::new()));
2957
2958 let make_chain = |name: &str,
2959 inp: &Place<i32>,
2960 outp: &Place<i32>,
2961 id: i32,
2962 order: Arc<Mutex<Vec<i32>>>| {
2963 let inp_name: Arc<str> = Arc::from(inp.name());
2964 let outp_name: Arc<str> = Arc::from(outp.name());
2965 Transition::builder(name)
2966 .input(one(inp))
2967 .output(out_place(outp))
2968 .action(async_action(move |mut ctx| {
2969 let order = Arc::clone(&order);
2970 let inp_name = Arc::clone(&inp_name);
2971 let outp_name = Arc::clone(&outp_name);
2972 async move {
2973 let v: i32 = *ctx.input::<i32>(&inp_name)?;
2974 order.lock().unwrap().push(id);
2975 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2976 ctx.output(&outp_name, v)?;
2977 Ok(ctx)
2978 }
2979 }))
2980 .build()
2981 };
2982
2983 let t1 = make_chain("t1", &p1, &p2, 1, Arc::clone(&order));
2984 let t2 = make_chain("t2", &p2, &p3, 2, Arc::clone(&order));
2985 let t3 = make_chain("t3", &p3, &p4, 3, Arc::clone(&order));
2986
2987 let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
2988 let mut marking = Marking::new();
2989 marking.add(&p1, Token::at(1, 0));
2990
2991 let mut executor =
2992 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2993
2994 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
2995 executor.run_async(rx).await;
2996
2997 assert_eq!(executor.marking().count("p4"), 1);
2998 let recorded = order.lock().unwrap().clone();
2999 assert_eq!(recorded, vec![1, 2, 3], "chain should execute in order");
3000 }
3001
3002 #[tokio::test]
3003 async fn async_fork_join() {
3004 use libpetri_core::output::and;
3005
3006 let p1 = Place::<i32>::new("p1");
3007 let p2 = Place::<i32>::new("p2");
3008 let p3 = Place::<i32>::new("p3");
3009 let p4 = Place::<i32>::new("p4");
3010
3011 let t_fork = Transition::builder("fork")
3013 .input(one(&p1))
3014 .output(and(vec![out_place(&p2), out_place(&p3)]))
3015 .action(libpetri_core::action::sync_action(|ctx| {
3016 let v = ctx.input::<i32>("p1")?;
3017 ctx.output("p2", *v)?;
3018 ctx.output("p3", *v)?;
3019 Ok(())
3020 }))
3021 .build();
3022
3023 let t_join = Transition::builder("join")
3025 .input(one(&p2))
3026 .input(one(&p3))
3027 .output(out_place(&p4))
3028 .action(libpetri_core::action::sync_action(|ctx| {
3029 let a = ctx.input::<i32>("p2")?;
3030 let b = ctx.input::<i32>("p3")?;
3031 ctx.output("p4", *a + *b)?;
3032 Ok(())
3033 }))
3034 .build();
3035
3036 let net = PetriNet::builder("fork_join")
3037 .transitions([t_fork, t_join])
3038 .build();
3039
3040 let mut marking = Marking::new();
3041 marking.add(&p1, Token::at(5, 0));
3042
3043 let mut executor =
3044 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3045
3046 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3047 executor.run_async(rx).await;
3048
3049 assert_eq!(executor.marking().count("p2"), 0);
3050 assert_eq!(executor.marking().count("p3"), 0);
3051 assert_eq!(executor.marking().count("p4"), 1);
3052 assert_eq!(*executor.marking().peek(&p4).unwrap(), 10);
3053 }
3054
3055 #[tokio::test]
3056 async fn async_xor_output_branching() {
3057 use libpetri_core::output::xor;
3058
3059 let p = Place::<i32>::new("p");
3060 let left = Place::<i32>::new("left");
3061 let right = Place::<i32>::new("right");
3062
3063 let t = Transition::builder("xor_t")
3064 .input(one(&p))
3065 .output(xor(vec![out_place(&left), out_place(&right)]))
3066 .action(libpetri_core::action::sync_action(|ctx| {
3067 let v = ctx.input::<i32>("p")?;
3068 if *v > 0 {
3069 ctx.output("left", *v)?;
3070 } else {
3071 ctx.output("right", *v)?;
3072 }
3073 Ok(())
3074 }))
3075 .build();
3076
3077 let net = PetriNet::builder("xor_test").transition(t).build();
3078
3079 let mut marking = Marking::new();
3081 marking.add(&p, Token::at(42, 0));
3082
3083 let mut executor =
3084 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3085 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3086 executor.run_async(rx).await;
3087
3088 assert_eq!(executor.marking().count("left"), 1);
3089 assert_eq!(executor.marking().count("right"), 0);
3090 assert_eq!(*executor.marking().peek(&left).unwrap(), 42);
3091 }
3092
3093 #[tokio::test]
3094 async fn async_loop_with_guard() {
3095 use libpetri_core::input::one_guarded;
3096
3097 let counter = Place::<i32>::new("counter");
3098 let done = Place::<i32>::new("done");
3099
3100 let t_loop = Transition::builder("loop")
3102 .input(one_guarded(&counter, |v: &i32| *v < 3))
3103 .output(out_place(&counter))
3104 .action(libpetri_core::action::sync_action(|ctx| {
3105 let v = ctx.input::<i32>("counter")?;
3106 ctx.output("counter", *v + 1)?;
3107 Ok(())
3108 }))
3109 .build();
3110
3111 let t_exit = Transition::builder("exit")
3113 .input(one_guarded(&counter, |v: &i32| *v >= 3))
3114 .output(out_place(&done))
3115 .action(fork())
3116 .build();
3117
3118 let net = PetriNet::builder("loop_net")
3119 .transitions([t_loop, t_exit])
3120 .build();
3121
3122 let mut marking = Marking::new();
3123 marking.add(&counter, Token::at(0, 0));
3124
3125 let mut executor =
3126 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3127
3128 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3129 executor.run_async(rx).await;
3130
3131 assert_eq!(executor.marking().count("done"), 1);
3132 assert_eq!(*executor.marking().peek(&done).unwrap(), 3);
3133 }
3134
3135 #[tokio::test]
3136 async fn async_delayed_fires_without_injection() {
3137 use libpetri_core::timing::delayed;
3138
3139 let p1 = Place::<i32>::new("p1");
3140 let p2 = Place::<i32>::new("p2");
3141
3142 let t1 = Transition::builder("t1")
3143 .input(one(&p1))
3144 .output(out_place(&p2))
3145 .timing(delayed(100))
3146 .action(fork())
3147 .build();
3148
3149 let net = PetriNet::builder("test").transition(t1).build();
3150 let mut marking = Marking::new();
3151 marking.add(&p1, Token::at(7, 0));
3152
3153 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3154 &net,
3155 marking,
3156 ExecutorOptions {
3157 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3158 },
3159 );
3160
3161 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3163 tokio::spawn(async move {
3164 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3165 drop(tx);
3166 });
3167 executor.run_async(rx).await;
3168
3169 assert_eq!(executor.marking().count("p2"), 1);
3170 assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3171 }
3172
3173 #[tokio::test]
3174 #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3175 async fn async_timeout_produces_timeout_token() {
3176 use libpetri_core::output::{timeout_place, xor};
3177
3178 let p1 = Place::<i32>::new("p1");
3179 let success = Place::<i32>::new("success");
3180 let timeout_out = Place::<i32>::new("timeout_out");
3181
3182 let t1 = Transition::builder("t1")
3183 .input(one(&p1))
3184 .output(xor(vec![
3185 out_place(&success),
3186 timeout_place(50, &timeout_out),
3187 ]))
3188 .action(async_action(|mut ctx| async move {
3189 let v: i32 = *ctx.input::<i32>("p1")?;
3190 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3191 ctx.output("success", v)?;
3192 Ok(ctx)
3193 }))
3194 .build();
3195
3196 let net = PetriNet::builder("test").transition(t1).build();
3197 let mut marking = Marking::new();
3198 marking.add(&p1, Token::at(1, 0));
3199
3200 let mut executor =
3201 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3202
3203 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3204 executor.run_async(rx).await;
3205
3206 assert_eq!(executor.marking().count("timeout_out"), 1);
3207 assert_eq!(executor.marking().count("success"), 0);
3208 }
3209
3210 #[tokio::test]
3211 #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3212 async fn async_timeout_normal_when_fast() {
3213 use libpetri_core::output::{timeout_place, xor};
3214
3215 let p1 = Place::<i32>::new("p1");
3216 let success = Place::<i32>::new("success");
3217 let timeout_out = Place::<i32>::new("timeout_out");
3218
3219 let t1 = Transition::builder("t1")
3220 .input(one(&p1))
3221 .output(xor(vec![
3222 out_place(&success),
3223 timeout_place(500, &timeout_out),
3224 ]))
3225 .action(async_action(|mut ctx| async move {
3226 let v: i32 = *ctx.input::<i32>("p1")?;
3227 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3228 ctx.output("success", v)?;
3229 Ok(ctx)
3230 }))
3231 .build();
3232
3233 let net = PetriNet::builder("test").transition(t1).build();
3234 let mut marking = Marking::new();
3235 marking.add(&p1, Token::at(1, 0));
3236
3237 let mut executor =
3238 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3239
3240 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3241 executor.run_async(rx).await;
3242
3243 assert_eq!(executor.marking().count("success"), 1);
3244 assert_eq!(executor.marking().count("timeout_out"), 0);
3245 }
3246
3247 #[tokio::test]
3248 async fn async_event_store_records_token_added_for_injection() {
3249 let p1 = Place::<i32>::new("p1");
3250 let p2 = Place::<i32>::new("p2");
3251
3252 let t1 = Transition::builder("t1")
3253 .input(one(&p1))
3254 .output(out_place(&p2))
3255 .action(fork())
3256 .build();
3257
3258 let net = PetriNet::builder("test").transition(t1).build();
3259 let marking = Marking::new();
3260
3261 let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
3262 &net,
3263 marking,
3264 ExecutorOptions {
3265 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3266 },
3267 );
3268
3269 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3270
3271 tokio::spawn(async move {
3272 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3273 let token = Token::at(99, 0);
3274 tx.send(ExecutorSignal::Event(ExternalEvent {
3275 place_name: Arc::from("p1"),
3276 token: ErasedToken::from_typed(&token),
3277 }))
3278 .unwrap();
3279 });
3280
3281 executor.run_async(rx).await;
3282
3283 let events = executor.event_store().events();
3284 assert!(
3286 events.iter().any(
3287 |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p1")
3288 ),
3289 "expected TokenAdded event for injected token into p1"
3290 );
3291 assert!(
3293 events.iter().any(
3294 |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p2")
3295 ),
3296 "expected TokenAdded event for output token into p2"
3297 );
3298 }
3299
3300 #[tokio::test]
3301 async fn async_inhibitor_blocks_in_async() {
3302 let p1 = Place::<()>::new("p1");
3303 let p2 = Place::<()>::new("p2");
3304 let p_inh = Place::<()>::new("inh");
3305
3306 let t = Transition::builder("t1")
3307 .input(one(&p1))
3308 .output(out_place(&p2))
3309 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
3310 .action(fork())
3311 .build();
3312
3313 let net = PetriNet::builder("inhibitor").transition(t).build();
3314
3315 let mut marking = Marking::new();
3316 marking.add(&p1, Token::at((), 0));
3317 marking.add(&p_inh, Token::at((), 0));
3318
3319 let mut executor =
3320 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3321
3322 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3323 executor.run_async(rx).await;
3324
3325 assert_eq!(executor.marking().count("p1"), 1);
3327 assert_eq!(executor.marking().count("p2"), 0);
3328 }
3329
3330 #[tokio::test]
3333 async fn async_drain_terminates_at_quiescence() {
3334 let p1 = Place::<i32>::new("p1");
3335 let p2 = Place::<i32>::new("p2");
3336
3337 let t1 = Transition::builder("t1")
3338 .input(one(&p1))
3339 .output(out_place(&p2))
3340 .action(fork())
3341 .build();
3342
3343 let net = PetriNet::builder("test").transition(t1).build();
3344 let marking = Marking::new();
3345
3346 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3347 &net,
3348 marking,
3349 ExecutorOptions {
3350 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3351 },
3352 );
3353
3354 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3355
3356 tokio::spawn(async move {
3357 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3358 tx.send(ExecutorSignal::Event(ExternalEvent {
3360 place_name: Arc::from("p1"),
3361 token: ErasedToken::from_typed(&Token::at(42, 0)),
3362 }))
3363 .unwrap();
3364 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3365 tx.send(ExecutorSignal::Drain).unwrap();
3366 });
3367
3368 executor.run_async(rx).await;
3369
3370 assert_eq!(executor.marking().count("p2"), 1);
3372 assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
3373 }
3374
3375 #[tokio::test]
3376 async fn async_drain_rejects_post_drain_events() {
3377 let p1 = Place::<i32>::new("p1");
3378 let p2 = Place::<i32>::new("p2");
3379
3380 let t1 = Transition::builder("t1")
3381 .input(one(&p1))
3382 .output(out_place(&p2))
3383 .action(fork())
3384 .build();
3385
3386 let net = PetriNet::builder("test").transition(t1).build();
3387 let marking = Marking::new();
3388
3389 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3390 &net,
3391 marking,
3392 ExecutorOptions {
3393 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3394 },
3395 );
3396
3397 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3398
3399 tokio::spawn(async move {
3400 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3401 tx.send(ExecutorSignal::Drain).unwrap();
3403 tx.send(ExecutorSignal::Event(ExternalEvent {
3404 place_name: Arc::from("p1"),
3405 token: ErasedToken::from_typed(&Token::at(99, 0)),
3406 }))
3407 .unwrap();
3408 });
3409
3410 executor.run_async(rx).await;
3411
3412 assert_eq!(executor.marking().count("p2"), 0);
3414 }
3415
3416 #[tokio::test]
3417 async fn async_close_discards_queued_events() {
3418 let p1 = Place::<i32>::new("p1");
3419 let p2 = Place::<i32>::new("p2");
3420
3421 let t1 = Transition::builder("t1")
3422 .input(one(&p1))
3423 .output(out_place(&p2))
3424 .action(fork())
3425 .build();
3426
3427 let net = PetriNet::builder("test").transition(t1).build();
3428 let marking = Marking::new();
3429
3430 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3431 &net,
3432 marking,
3433 ExecutorOptions {
3434 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3435 },
3436 );
3437
3438 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3439
3440 tx.send(ExecutorSignal::Event(ExternalEvent {
3442 place_name: Arc::from("p1"),
3443 token: ErasedToken::from_typed(&Token::at(1, 0)),
3444 }))
3445 .unwrap();
3446 tx.send(ExecutorSignal::Close).unwrap();
3447 tx.send(ExecutorSignal::Event(ExternalEvent {
3448 place_name: Arc::from("p1"),
3449 token: ErasedToken::from_typed(&Token::at(2, 0)),
3450 }))
3451 .unwrap();
3452 drop(tx);
3453
3454 executor.run_async(rx).await;
3455
3456 assert!(executor.marking().count("p2") <= 1);
3460 }
3461
3462 #[tokio::test]
3463 async fn async_close_after_drain_escalates() {
3464 let p1 = Place::<i32>::new("p1");
3465 let p2 = Place::<i32>::new("p2");
3466
3467 let t1 = Transition::builder("t1")
3468 .input(one(&p1))
3469 .output(out_place(&p2))
3470 .action(fork())
3471 .build();
3472
3473 let net = PetriNet::builder("test").transition(t1).build();
3474 let marking = Marking::new();
3475
3476 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3477 &net,
3478 marking,
3479 ExecutorOptions {
3480 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3481 },
3482 );
3483
3484 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3485
3486 tokio::spawn(async move {
3487 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3488 tx.send(ExecutorSignal::Drain).unwrap();
3490 tx.send(ExecutorSignal::Close).unwrap();
3491 });
3492
3493 executor.run_async(rx).await;
3495 }
3497
3498 #[tokio::test]
3499 async fn async_handle_raii_drain_on_drop() {
3500 use crate::executor_handle::ExecutorHandle;
3501
3502 let p1 = Place::<i32>::new("p1");
3503 let p2 = Place::<i32>::new("p2");
3504
3505 let t1 = Transition::builder("t1")
3506 .input(one(&p1))
3507 .output(out_place(&p2))
3508 .action(fork())
3509 .build();
3510
3511 let net = PetriNet::builder("test").transition(t1).build();
3512 let marking = Marking::new();
3513
3514 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3515 &net,
3516 marking,
3517 ExecutorOptions {
3518 environment_places: ["p1"].iter().map(|s| Arc::from(*s)).collect(),
3519 },
3520 );
3521
3522 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
3523
3524 tokio::spawn(async move {
3525 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3526 let mut handle = ExecutorHandle::new(tx);
3527 handle.inject(Arc::from("p1"), ErasedToken::from_typed(&Token::at(7, 0)));
3528 });
3530
3531 executor.run_async(rx).await;
3532
3533 assert_eq!(executor.marking().count("p2"), 1);
3534 assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3535 }
3536}