1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Instant;
4
5use libpetri_core::context::{OutputEntry, TransitionContext};
6use libpetri_core::input::In;
7use libpetri_core::output::Out;
8use libpetri_core::petri_net::PetriNet;
9use libpetri_core::token::ErasedToken;
10
11use libpetri_event::event_store::EventStore;
12use libpetri_event::net_event::NetEvent;
13
14use crate::bitmap;
15use crate::compiled_net::CompiledNet;
16use crate::marking::Marking;
17
18const DEADLINE_TOLERANCE_MS: f64 = 5.0;
20
21pub struct BitmapNetExecutor<E: EventStore> {
26 compiled: CompiledNet,
27 marking: Marking,
28 event_store: E,
29 #[allow(dead_code)]
30 environment_places: HashSet<Arc<str>>,
31 #[allow(dead_code)]
32 long_running: bool,
33
34 marked_places: Vec<u64>,
36 dirty_set: Vec<u64>,
37 marking_snap_buffer: Vec<u64>,
38 dirty_snap_buffer: Vec<u64>,
39 firing_snap_buffer: Vec<u64>,
40
41 enabled_at_ms: Vec<f64>,
43 enabled_flags: Vec<bool>,
44 has_deadline_flags: Vec<bool>,
45 enabled_transition_count: usize,
46
47 all_immediate: bool,
49 all_same_priority: bool,
50 has_any_deadlines: bool,
51
52 pending_reset_places: HashSet<Arc<str>>,
54 transition_input_place_names: Vec<HashSet<Arc<str>>>,
55
56 start_time: Instant,
57}
58
59#[derive(Default)]
61pub struct ExecutorOptions {
62 pub environment_places: HashSet<Arc<str>>,
63 pub long_running: bool,
64}
65
66impl<E: EventStore> BitmapNetExecutor<E> {
67 pub fn new(net: &PetriNet, initial_tokens: Marking, options: ExecutorOptions) -> Self {
69 let compiled = CompiledNet::compile(net);
70 let word_count = compiled.word_count;
71 let tc = compiled.transition_count;
72 let dirty_word_count = bitmap::word_count(tc);
73
74 let mut has_any_deadlines = false;
75 let mut all_immediate = true;
76 let mut all_same_priority = true;
77 let first_priority = if tc > 0 {
78 compiled.transition(0).priority()
79 } else {
80 0
81 };
82 let mut has_deadline_flags = vec![false; tc];
83
84 for (tid, flag) in has_deadline_flags.iter_mut().enumerate() {
85 let t = compiled.transition(tid);
86 if t.timing().has_deadline() {
87 *flag = true;
88 has_any_deadlines = true;
89 }
90 if *t.timing() != libpetri_core::timing::Timing::Immediate {
91 all_immediate = false;
92 }
93 if t.priority() != first_priority {
94 all_same_priority = false;
95 }
96 }
97
98 let mut transition_input_place_names = Vec::with_capacity(tc);
100 for tid in 0..tc {
101 let t = compiled.transition(tid);
102 let names: HashSet<Arc<str>> = t
103 .input_specs()
104 .iter()
105 .map(|s| Arc::clone(s.place().name_arc()))
106 .collect();
107 transition_input_place_names.push(names);
108 }
109
110 Self {
111 compiled,
112 marking: initial_tokens,
113 event_store: E::default(),
114 environment_places: options.environment_places,
115 long_running: options.long_running,
116 marked_places: vec![0u64; word_count],
117 dirty_set: vec![0u64; dirty_word_count],
118 marking_snap_buffer: vec![0u64; word_count],
119 dirty_snap_buffer: vec![0u64; dirty_word_count],
120 firing_snap_buffer: vec![0u64; word_count],
121 enabled_at_ms: vec![f64::NEG_INFINITY; tc],
122 enabled_flags: vec![false; tc],
123 has_deadline_flags,
124 enabled_transition_count: 0,
125 all_immediate,
126 all_same_priority,
127 has_any_deadlines,
128 pending_reset_places: HashSet::new(),
129 transition_input_place_names,
130 start_time: Instant::now(),
131 }
132 }
133
134 pub fn run_sync(&mut self) -> &Marking {
139 self.initialize_marked_bitmap();
140 self.mark_all_dirty();
141
142 if E::ENABLED {
143 let now = now_millis();
144 self.event_store.append(NetEvent::ExecutionStarted {
145 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
146 timestamp: now,
147 });
148 }
149
150 loop {
151 self.update_dirty_transitions();
152
153 let cycle_now = self.elapsed_ms();
154
155 if self.has_any_deadlines {
156 self.enforce_deadlines(cycle_now);
157 }
158
159 if self.should_terminate() {
160 break;
161 }
162
163 if self.all_immediate && self.all_same_priority {
164 self.fire_ready_immediate_sync();
165 } else {
166 self.fire_ready_general_sync(cycle_now);
167 }
168
169 if !self.has_dirty_bits() && self.enabled_transition_count == 0 {
171 break;
172 }
173 }
174
175 if E::ENABLED {
176 let now = now_millis();
177 self.event_store.append(NetEvent::ExecutionCompleted {
178 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
179 timestamp: now,
180 });
181 }
182
183 &self.marking
184 }
185
186 pub fn marking(&self) -> &Marking {
188 &self.marking
189 }
190
191 pub fn event_store(&self) -> &E {
193 &self.event_store
194 }
195
196 pub fn is_quiescent(&self) -> bool {
198 self.enabled_transition_count == 0
199 }
200
201 fn initialize_marked_bitmap(&mut self) {
204 for pid in 0..self.compiled.place_count {
205 let place = self.compiled.place(pid);
206 if self.marking.has_tokens(place.name()) {
207 bitmap::set_bit(&mut self.marked_places, pid);
208 }
209 }
210 }
211
212 fn mark_all_dirty(&mut self) {
213 let tc = self.compiled.transition_count;
214 let dirty_words = self.dirty_set.len();
215 for w in 0..dirty_words.saturating_sub(1) {
216 self.dirty_set[w] = u64::MAX;
217 }
218 if dirty_words > 0 {
219 let last_word_bits = tc & bitmap::WORD_MASK;
220 self.dirty_set[dirty_words - 1] = if last_word_bits == 0 {
221 u64::MAX
222 } else {
223 (1u64 << last_word_bits) - 1
224 };
225 }
226 }
227
228 fn should_terminate(&self) -> bool {
229 if self.long_running {
230 return false;
231 }
232 self.enabled_transition_count == 0
233 }
234
235 fn update_dirty_transitions(&mut self) {
238 let now_ms = self.elapsed_ms();
239
240 self.marking_snap_buffer
242 .copy_from_slice(&self.marked_places);
243
244 let dirty_words = self.dirty_set.len();
246 for w in 0..dirty_words {
247 self.dirty_snap_buffer[w] = self.dirty_set[w];
248 self.dirty_set[w] = 0;
249 }
250
251 let tc = self.compiled.transition_count;
253 let mut dirty_tids = Vec::new();
254 bitmap::for_each_set_bit(&self.dirty_snap_buffer, |tid| {
255 if tid < tc {
256 dirty_tids.push(tid);
257 }
258 });
259
260 let marking_snap = self.marking_snap_buffer.clone();
261 for tid in dirty_tids {
262 let was_enabled = self.enabled_flags[tid];
263 let can_now = self.can_enable(tid, &marking_snap);
264
265 if can_now && !was_enabled {
266 self.enabled_flags[tid] = true;
267 self.enabled_transition_count += 1;
268 self.enabled_at_ms[tid] = now_ms;
269
270 if E::ENABLED {
271 self.event_store.append(NetEvent::TransitionEnabled {
272 transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
273 timestamp: now_millis(),
274 });
275 }
276 } else if !can_now && was_enabled {
277 self.enabled_flags[tid] = false;
278 self.enabled_transition_count -= 1;
279 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
280 } else if can_now && was_enabled && self.has_input_from_reset_place(tid) {
281 self.enabled_at_ms[tid] = now_ms;
282 if E::ENABLED {
283 self.event_store.append(NetEvent::TransitionClockRestarted {
284 transition_name: Arc::clone(self.compiled.transition(tid).name_arc()),
285 timestamp: now_millis(),
286 });
287 }
288 }
289 }
290
291 self.pending_reset_places.clear();
292 }
293
294 fn enforce_deadlines(&mut self, now_ms: f64) {
295 for tid in 0..self.compiled.transition_count {
296 if !self.has_deadline_flags[tid] || !self.enabled_flags[tid] {
297 continue;
298 }
299 let t = self.compiled.transition(tid);
300 let elapsed = now_ms - self.enabled_at_ms[tid];
301 let latest_ms = t.timing().latest() as f64;
302 if elapsed > latest_ms + DEADLINE_TOLERANCE_MS {
303 self.enabled_flags[tid] = false;
304 self.enabled_transition_count -= 1;
305 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
306
307 if E::ENABLED {
308 self.event_store.append(NetEvent::TransitionTimedOut {
309 transition_name: Arc::clone(t.name_arc()),
310 timestamp: now_millis(),
311 });
312 }
313 }
314 }
315 }
316
317 fn can_enable(&self, tid: usize, marking_snap: &[u64]) -> bool {
318 if !self.compiled.can_enable_bitmap(tid, marking_snap) {
319 return false;
320 }
321
322 if let Some(card_check) = self.compiled.cardinality_check(tid) {
324 for i in 0..card_check.place_ids.len() {
325 let pid = card_check.place_ids[i];
326 let required = card_check.required_counts[i];
327 let place = self.compiled.place(pid);
328 if self.marking.count(place.name()) < required {
329 return false;
330 }
331 }
332 }
333
334 if self.compiled.has_guards(tid) {
336 let t = self.compiled.transition(tid);
337 for spec in t.input_specs() {
338 if let Some(guard) = spec.guard() {
339 let required = match spec {
340 In::One { .. } => 1,
341 In::Exactly { count, .. } => *count,
342 In::AtLeast { minimum, .. } => *minimum,
343 In::All { .. } => 1,
344 };
345 if self.marking.count_matching(spec.place_name(), &**guard) < required {
346 return false;
347 }
348 }
349 }
350 }
351
352 true
353 }
354
355 fn has_input_from_reset_place(&self, tid: usize) -> bool {
356 if self.pending_reset_places.is_empty() {
357 return false;
358 }
359 let input_names = &self.transition_input_place_names[tid];
360 for name in &self.pending_reset_places {
361 if input_names.contains(name) {
362 return true;
363 }
364 }
365 false
366 }
367
368 fn fire_ready_immediate_sync(&mut self) {
371 for tid in 0..self.compiled.transition_count {
372 if !self.enabled_flags[tid] {
373 continue;
374 }
375 if self.can_enable(tid, &self.marked_places.clone()) {
376 self.fire_transition_sync(tid);
377 } else {
378 self.enabled_flags[tid] = false;
379 self.enabled_transition_count -= 1;
380 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
381 }
382 }
383 }
384
385 fn fire_ready_general_sync(&mut self, now_ms: f64) {
386 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
388 for tid in 0..self.compiled.transition_count {
389 if !self.enabled_flags[tid] {
390 continue;
391 }
392 let t = self.compiled.transition(tid);
393 let enabled_ms = self.enabled_at_ms[tid];
394 let elapsed = now_ms - enabled_ms;
395 let earliest_ms = t.timing().earliest() as f64;
396 if earliest_ms <= elapsed {
397 ready.push((tid, t.priority(), enabled_ms));
398 }
399 }
400 if ready.is_empty() {
401 return;
402 }
403
404 ready.sort_by(|a, b| {
406 b.1.cmp(&a.1)
407 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
408 });
409
410 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
412
413 for (tid, _, _) in ready {
414 if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
415 self.fire_transition_sync(tid);
416 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
417 } else {
418 self.enabled_flags[tid] = false;
419 self.enabled_transition_count -= 1;
420 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
421 }
422 }
423 }
424
425 fn fire_transition_sync(&mut self, tid: usize) {
426 let t = self.compiled.transition(tid);
428 let transition_name = Arc::clone(t.name_arc());
429 let input_specs: Vec<In> = t.input_specs().to_vec();
430 let read_arcs: Vec<_> = t.reads().to_vec();
431 let reset_arcs: Vec<_> = t.resets().to_vec();
432 let output_place_names: HashSet<Arc<str>> = t
433 .output_places()
434 .iter()
435 .map(|p| Arc::clone(p.name_arc()))
436 .collect();
437 let action = Arc::clone(t.action());
438 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
442 for in_spec in &input_specs {
443 let place_name = in_spec.place_name();
444 let to_consume = match in_spec {
445 In::One { .. } => 1,
446 In::Exactly { count, .. } => *count,
447 In::All { guard, .. } | In::AtLeast { guard, .. } => {
448 if guard.is_some() {
449 self.marking
450 .count_matching(place_name, &**guard.as_ref().unwrap())
451 } else {
452 self.marking.count(place_name)
453 }
454 }
455 };
456
457 let place_name_arc = Arc::clone(in_spec.place().name_arc());
458 for _ in 0..to_consume {
459 let token = if let Some(guard) = in_spec.guard() {
460 self.marking.remove_matching(place_name, &**guard)
461 } else {
462 self.marking.remove_first(place_name)
463 };
464 if let Some(token) = token {
465 if E::ENABLED {
466 self.event_store.append(NetEvent::TokenRemoved {
467 place_name: Arc::clone(&place_name_arc),
468 timestamp: now_millis(),
469 });
470 }
471 inputs
472 .entry(Arc::clone(&place_name_arc))
473 .or_default()
474 .push(token);
475 }
476 }
477 }
478
479 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
481 for arc in &read_arcs {
482 if let Some(queue) = self.marking.queue(arc.place.name())
483 && let Some(token) = queue.front()
484 {
485 read_tokens
486 .entry(Arc::clone(arc.place.name_arc()))
487 .or_default()
488 .push(token.clone());
489 }
490 }
491
492 for arc in &reset_arcs {
494 let removed = self.marking.remove_all(arc.place.name());
495 self.pending_reset_places
496 .insert(Arc::clone(arc.place.name_arc()));
497 if E::ENABLED {
498 for _ in &removed {
499 self.event_store.append(NetEvent::TokenRemoved {
500 place_name: Arc::clone(arc.place.name_arc()),
501 timestamp: now_millis(),
502 });
503 }
504 }
505 }
506
507 self.update_bitmap_after_consumption(tid);
509
510 if E::ENABLED {
511 self.event_store.append(NetEvent::TransitionStarted {
512 transition_name: Arc::clone(&transition_name),
513 timestamp: now_millis(),
514 });
515 }
516
517 let mut ctx = TransitionContext::new(
519 Arc::clone(&transition_name),
520 inputs,
521 read_tokens,
522 output_place_names,
523 None,
524 );
525
526 let result = action.run_sync(&mut ctx);
527
528 match result {
529 Ok(()) => {
530 let outputs = ctx.take_outputs();
532 self.process_outputs(tid, &transition_name, outputs);
533
534 if E::ENABLED {
535 self.event_store.append(NetEvent::TransitionCompleted {
536 transition_name: Arc::clone(&transition_name),
537 timestamp: now_millis(),
538 });
539 }
540 }
541 Err(err) => {
542 if E::ENABLED {
543 self.event_store.append(NetEvent::TransitionFailed {
544 transition_name: Arc::clone(&transition_name),
545 error: err.message,
546 timestamp: now_millis(),
547 });
548 }
549 }
550 }
551
552 self.enabled_flags[tid] = false;
554 self.enabled_transition_count -= 1;
555 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
556
557 self.mark_transition_dirty(tid);
559 }
560
561 fn process_outputs(
562 &mut self,
563 _tid: usize,
564 _transition_name: &Arc<str>,
565 outputs: Vec<OutputEntry>,
566 ) {
567 for entry in outputs {
568 self.marking.add_erased(&entry.place_name, entry.token);
569
570 if let Some(pid) = self.compiled.place_id(&entry.place_name) {
571 bitmap::set_bit(&mut self.marked_places, pid);
572 self.mark_dirty(pid);
573 }
574
575 if E::ENABLED {
576 self.event_store.append(NetEvent::TokenAdded {
577 place_name: Arc::clone(&entry.place_name),
578 timestamp: now_millis(),
579 });
580 }
581 }
582 }
583
584 fn update_bitmap_after_consumption(&mut self, tid: usize) {
585 let consumption_pids: Vec<usize> = self.compiled.consumption_place_ids(tid).to_vec();
586 for pid in consumption_pids {
587 let place = self.compiled.place(pid);
588 if !self.marking.has_tokens(place.name()) {
589 bitmap::clear_bit(&mut self.marked_places, pid);
590 }
591 self.mark_dirty(pid);
592 }
593 }
594
595 fn has_dirty_bits(&self) -> bool {
598 !bitmap::is_empty(&self.dirty_set)
599 }
600
601 fn mark_dirty(&mut self, pid: usize) {
602 let tids: Vec<usize> = self.compiled.affected_transitions(pid).to_vec();
603 for tid in tids {
604 self.mark_transition_dirty(tid);
605 }
606 }
607
608 fn mark_transition_dirty(&mut self, tid: usize) {
609 bitmap::set_bit(&mut self.dirty_set, tid);
610 }
611
612 fn elapsed_ms(&self) -> f64 {
613 self.start_time.elapsed().as_secs_f64() * 1000.0
614 }
615}
616
617#[cfg(feature = "tokio")]
618use crate::environment::ExternalEvent;
619
620#[cfg(feature = "tokio")]
622struct ActionCompletion {
623 transition_name: Arc<str>,
624 result: Result<Vec<OutputEntry>, String>,
625}
626
627#[cfg(feature = "tokio")]
628impl<E: EventStore> BitmapNetExecutor<E> {
629 pub async fn run_async(
640 &mut self,
641 mut event_rx: tokio::sync::mpsc::UnboundedReceiver<ExternalEvent>,
642 ) -> &Marking {
643 let (completion_tx, mut completion_rx) =
644 tokio::sync::mpsc::unbounded_channel::<ActionCompletion>();
645
646 self.initialize_marked_bitmap();
647 self.mark_all_dirty();
648
649 let mut in_flight_count: usize = 0;
650 let mut event_channel_open = true;
651
652 if E::ENABLED {
653 let now = now_millis();
654 self.event_store.append(NetEvent::ExecutionStarted {
655 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
656 timestamp: now,
657 });
658 }
659
660 loop {
661 while let Ok(completion) = completion_rx.try_recv() {
663 in_flight_count -= 1;
664 match completion.result {
665 Ok(outputs) => {
666 self.process_outputs(0, &completion.transition_name, outputs);
667 if E::ENABLED {
668 self.event_store.append(NetEvent::TransitionCompleted {
669 transition_name: Arc::clone(&completion.transition_name),
670 timestamp: now_millis(),
671 });
672 }
673 }
674 Err(err) => {
675 if E::ENABLED {
676 self.event_store.append(NetEvent::TransitionFailed {
677 transition_name: Arc::clone(&completion.transition_name),
678 error: err,
679 timestamp: now_millis(),
680 });
681 }
682 }
683 }
684 }
685
686 while let Ok(event) = event_rx.try_recv() {
688 self.marking.add_erased(&event.place_name, event.token);
689 if let Some(pid) = self.compiled.place_id(&event.place_name) {
690 bitmap::set_bit(&mut self.marked_places, pid);
691 self.mark_dirty(pid);
692 }
693 if E::ENABLED {
694 self.event_store.append(NetEvent::TokenAdded {
695 place_name: Arc::clone(&event.place_name),
696 timestamp: now_millis(),
697 });
698 }
699 }
700
701 self.update_dirty_transitions();
703
704 let cycle_now = self.elapsed_ms();
706 if self.has_any_deadlines {
707 self.enforce_deadlines(cycle_now);
708 }
709
710 if self.enabled_transition_count == 0
712 && in_flight_count == 0
713 && (!self.long_running || !event_channel_open)
714 {
715 break;
716 }
717
718 let fired = self.fire_ready_async(cycle_now, &completion_tx, &mut in_flight_count);
720
721 if fired || self.has_dirty_bits() {
723 tokio::task::yield_now().await;
725 continue;
726 }
727
728 if in_flight_count == 0 && !self.long_running {
730 break;
731 }
732 if in_flight_count == 0 && !event_channel_open {
733 break;
734 }
735
736 let timer_ms = self.millis_until_next_timed_transition();
737
738 tokio::select! {
739 Some(completion) = completion_rx.recv() => {
740 in_flight_count -= 1;
741 match completion.result {
742 Ok(outputs) => {
743 self.process_outputs(0, &completion.transition_name, outputs);
744 if E::ENABLED {
745 self.event_store.append(NetEvent::TransitionCompleted {
746 transition_name: Arc::clone(&completion.transition_name),
747 timestamp: now_millis(),
748 });
749 }
750 }
751 Err(err) => {
752 if E::ENABLED {
753 self.event_store.append(NetEvent::TransitionFailed {
754 transition_name: Arc::clone(&completion.transition_name),
755 error: err,
756 timestamp: now_millis(),
757 });
758 }
759 }
760 }
761 }
762 result = event_rx.recv(), if event_channel_open => {
763 match result {
764 Some(event) => {
765 self.marking.add_erased(&event.place_name, event.token);
766 if let Some(pid) = self.compiled.place_id(&event.place_name) {
767 bitmap::set_bit(&mut self.marked_places, pid);
768 self.mark_dirty(pid);
769 }
770 if E::ENABLED {
771 self.event_store.append(NetEvent::TokenAdded {
772 place_name: Arc::clone(&event.place_name),
773 timestamp: now_millis(),
774 });
775 }
776 }
777 None => {
778 event_channel_open = false;
779 }
780 }
781 }
782 _ = tokio::time::sleep(std::time::Duration::from_millis(
783 if timer_ms < f64::INFINITY { timer_ms as u64 } else { 60_000 }
784 )) => {
785 }
787 }
788 }
789
790 if E::ENABLED {
791 let now = now_millis();
792 self.event_store.append(NetEvent::ExecutionCompleted {
793 net_name: Arc::clone(&Arc::from(self.compiled.net().name())),
794 timestamp: now,
795 });
796 }
797
798 &self.marking
799 }
800
801 fn fire_ready_async(
804 &mut self,
805 now_ms: f64,
806 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
807 in_flight_count: &mut usize,
808 ) -> bool {
809 let mut ready: Vec<(usize, i32, f64)> = Vec::new();
810 for tid in 0..self.compiled.transition_count {
811 if !self.enabled_flags[tid] {
812 continue;
813 }
814 let t = self.compiled.transition(tid);
815 let enabled_ms = self.enabled_at_ms[tid];
816 let elapsed = now_ms - enabled_ms;
817 let earliest_ms = t.timing().earliest() as f64;
818 if earliest_ms <= elapsed {
819 ready.push((tid, t.priority(), enabled_ms));
820 }
821 }
822 if ready.is_empty() {
823 return false;
824 }
825
826 ready.sort_by(|a, b| {
827 b.1.cmp(&a.1)
828 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
829 });
830
831 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
832
833 let mut fired_any = false;
834 for (tid, _, _) in ready {
835 if self.enabled_flags[tid] && self.can_enable(tid, &self.firing_snap_buffer.clone()) {
836 self.fire_transition_async(tid, completion_tx, in_flight_count);
837 self.firing_snap_buffer.copy_from_slice(&self.marked_places);
838 fired_any = true;
839 } else {
840 self.enabled_flags[tid] = false;
841 self.enabled_transition_count -= 1;
842 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
843 }
844 }
845 fired_any
846 }
847
848 fn fire_transition_async(
850 &mut self,
851 tid: usize,
852 completion_tx: &tokio::sync::mpsc::UnboundedSender<ActionCompletion>,
853 in_flight_count: &mut usize,
854 ) {
855 let t = self.compiled.transition(tid);
856 let transition_name = Arc::clone(t.name_arc());
857 let input_specs: Vec<In> = t.input_specs().to_vec();
858 let read_arcs: Vec<_> = t.reads().to_vec();
859 let reset_arcs: Vec<_> = t.resets().to_vec();
860 let output_place_names: HashSet<Arc<str>> = t
861 .output_places()
862 .iter()
863 .map(|p| Arc::clone(p.name_arc()))
864 .collect();
865 let action = Arc::clone(t.action());
866 let is_sync = action.is_sync();
867
868 let mut inputs: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
870 for in_spec in &input_specs {
871 let place_name = in_spec.place_name();
872 let to_consume = match in_spec {
873 In::One { .. } => 1,
874 In::Exactly { count, .. } => *count,
875 In::All { guard, .. } | In::AtLeast { guard, .. } => {
876 if guard.is_some() {
877 self.marking
878 .count_matching(place_name, &**guard.as_ref().unwrap())
879 } else {
880 self.marking.count(place_name)
881 }
882 }
883 };
884
885 let place_name_arc = Arc::clone(in_spec.place().name_arc());
886 for _ in 0..to_consume {
887 let token = if let Some(guard) = in_spec.guard() {
888 self.marking.remove_matching(place_name, &**guard)
889 } else {
890 self.marking.remove_first(place_name)
891 };
892 if let Some(token) = token {
893 if E::ENABLED {
894 self.event_store.append(NetEvent::TokenRemoved {
895 place_name: Arc::clone(&place_name_arc),
896 timestamp: now_millis(),
897 });
898 }
899 inputs
900 .entry(Arc::clone(&place_name_arc))
901 .or_default()
902 .push(token);
903 }
904 }
905 }
906
907 let mut read_tokens: HashMap<Arc<str>, Vec<ErasedToken>> = HashMap::new();
909 for arc in &read_arcs {
910 if let Some(queue) = self.marking.queue(arc.place.name())
911 && let Some(token) = queue.front()
912 {
913 read_tokens
914 .entry(Arc::clone(arc.place.name_arc()))
915 .or_default()
916 .push(token.clone());
917 }
918 }
919
920 for arc in &reset_arcs {
922 let removed = self.marking.remove_all(arc.place.name());
923 self.pending_reset_places
924 .insert(Arc::clone(arc.place.name_arc()));
925 if E::ENABLED {
926 for _ in &removed {
927 self.event_store.append(NetEvent::TokenRemoved {
928 place_name: Arc::clone(arc.place.name_arc()),
929 timestamp: now_millis(),
930 });
931 }
932 }
933 }
934
935 self.update_bitmap_after_consumption(tid);
936
937 if E::ENABLED {
938 self.event_store.append(NetEvent::TransitionStarted {
939 transition_name: Arc::clone(&transition_name),
940 timestamp: now_millis(),
941 });
942 }
943
944 self.enabled_flags[tid] = false;
946 self.enabled_transition_count -= 1;
947 self.enabled_at_ms[tid] = f64::NEG_INFINITY;
948 self.mark_transition_dirty(tid);
949
950 if is_sync {
951 let mut ctx = TransitionContext::new(
953 Arc::clone(&transition_name),
954 inputs,
955 read_tokens,
956 output_place_names,
957 None,
958 );
959 let result = action.run_sync(&mut ctx);
960 match result {
961 Ok(()) => {
962 let outputs = ctx.take_outputs();
963 self.process_outputs(tid, &transition_name, outputs);
964 if E::ENABLED {
965 self.event_store.append(NetEvent::TransitionCompleted {
966 transition_name: Arc::clone(&transition_name),
967 timestamp: now_millis(),
968 });
969 }
970 }
971 Err(err) => {
972 if E::ENABLED {
973 self.event_store.append(NetEvent::TransitionFailed {
974 transition_name: Arc::clone(&transition_name),
975 error: err.message,
976 timestamp: now_millis(),
977 });
978 }
979 }
980 }
981 } else {
982 *in_flight_count += 1;
984 let tx = completion_tx.clone();
985 let name = Arc::clone(&transition_name);
986 let ctx = TransitionContext::new(
987 Arc::clone(&transition_name),
988 inputs,
989 read_tokens,
990 output_place_names,
991 None,
992 );
993 tokio::spawn(async move {
994 let result = action.run_async(ctx).await;
995 let completion = match result {
996 Ok(mut completed_ctx) => ActionCompletion {
997 transition_name: Arc::clone(&name),
998 result: Ok(completed_ctx.take_outputs()),
999 },
1000 Err(err) => ActionCompletion {
1001 transition_name: Arc::clone(&name),
1002 result: Err(err.message),
1003 },
1004 };
1005 let _ = tx.send(completion);
1006 });
1007 }
1008 }
1009
1010 fn millis_until_next_timed_transition(&self) -> f64 {
1012 let mut min_wait = f64::INFINITY;
1013 let now_ms = self.elapsed_ms();
1014
1015 for tid in 0..self.compiled.transition_count {
1016 if !self.enabled_flags[tid] {
1017 continue;
1018 }
1019 let t = self.compiled.transition(tid);
1020 let elapsed = now_ms - self.enabled_at_ms[tid];
1021
1022 let earliest_ms = t.timing().earliest() as f64;
1023 let remaining_earliest = earliest_ms - elapsed;
1024 if remaining_earliest <= 0.0 {
1025 return 0.0;
1026 }
1027 min_wait = min_wait.min(remaining_earliest);
1028
1029 if self.has_deadline_flags[tid] {
1030 let latest_ms = t.timing().latest() as f64;
1031 let remaining_deadline = latest_ms - elapsed;
1032 if remaining_deadline <= 0.0 {
1033 return 0.0;
1034 }
1035 min_wait = min_wait.min(remaining_deadline);
1036 }
1037 }
1038
1039 min_wait
1040 }
1041}
1042
1043fn now_millis() -> u64 {
1044 std::time::SystemTime::now()
1045 .duration_since(std::time::UNIX_EPOCH)
1046 .unwrap_or_default()
1047 .as_millis() as u64
1048}
1049
1050#[allow(dead_code)]
1052fn validate_out_spec(out: &Out, produced_places: &HashSet<Arc<str>>) -> bool {
1053 match out {
1054 Out::Place(p) => produced_places.contains(p.name()),
1055 Out::And(children) => children
1056 .iter()
1057 .all(|c| validate_out_spec(c, produced_places)),
1058 Out::Xor(children) => children
1059 .iter()
1060 .any(|c| validate_out_spec(c, produced_places)),
1061 Out::Timeout { child, .. } => validate_out_spec(child, produced_places),
1062 Out::ForwardInput { to, .. } => produced_places.contains(to.name()),
1063 }
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068 use super::*;
1069 use libpetri_core::action::passthrough;
1070 use libpetri_core::input::one;
1071 use libpetri_core::output::out_place;
1072 use libpetri_core::place::Place;
1073 use libpetri_core::token::Token;
1074 use libpetri_core::transition::Transition;
1075 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
1076
1077 fn simple_chain() -> (PetriNet, Place<i32>, Place<i32>, Place<i32>) {
1078 let p1 = Place::<i32>::new("p1");
1079 let p2 = Place::<i32>::new("p2");
1080 let p3 = Place::<i32>::new("p3");
1081
1082 let t1 = Transition::builder("t1")
1083 .input(one(&p1))
1084 .output(out_place(&p2))
1085 .action(passthrough())
1086 .build();
1087 let t2 = Transition::builder("t2")
1088 .input(one(&p2))
1089 .output(out_place(&p3))
1090 .action(passthrough())
1091 .build();
1092
1093 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
1094 (net, p1, p2, p3)
1095 }
1096
1097 #[test]
1098 fn sync_passthrough_chain() {
1099 let (net, p1, _p2, _p3) = simple_chain();
1100
1101 let mut marking = Marking::new();
1102 marking.add(&p1, Token::at(42, 0));
1103
1104 let mut executor =
1105 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1106 let result = executor.run_sync();
1107
1108 assert_eq!(result.count("p1"), 0);
1112 }
1113
1114 #[test]
1115 fn sync_with_event_store() {
1116 let (net, p1, _, _) = simple_chain();
1117
1118 let mut marking = Marking::new();
1119 marking.add(&p1, Token::at(42, 0));
1120
1121 let mut executor =
1122 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1123 executor.run_sync();
1124
1125 let store = executor.event_store();
1126 assert!(!store.is_empty());
1127 }
1128
1129 #[test]
1130 fn sync_fork_chain() {
1131 let p1 = Place::<i32>::new("p1");
1132 let p2 = Place::<i32>::new("p2");
1133 let p3 = Place::<i32>::new("p3");
1134
1135 let t1 = Transition::builder("t1")
1136 .input(one(&p1))
1137 .output(libpetri_core::output::and(vec![
1138 out_place(&p2),
1139 out_place(&p3),
1140 ]))
1141 .action(libpetri_core::action::fork())
1142 .build();
1143
1144 let net = PetriNet::builder("fork").transition(t1).build();
1145
1146 let mut marking = Marking::new();
1147 marking.add(&p1, Token::at(42, 0));
1148
1149 let mut executor =
1150 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1151 let result = executor.run_sync();
1152
1153 assert_eq!(result.count("p1"), 0);
1155 assert_eq!(result.count("p2"), 1);
1156 assert_eq!(result.count("p3"), 1);
1157 }
1158
1159 #[test]
1160 fn sync_no_initial_tokens() {
1161 let (net, _, _, _) = simple_chain();
1162 let marking = Marking::new();
1163 let mut executor =
1164 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1165 let result = executor.run_sync();
1166 assert_eq!(result.count("p1"), 0);
1167 assert_eq!(result.count("p2"), 0);
1168 assert_eq!(result.count("p3"), 0);
1169 }
1170
1171 #[test]
1172 fn sync_priority_ordering() {
1173 let p = Place::<()>::new("p");
1174 let out_a = Place::<()>::new("a");
1175 let out_b = Place::<()>::new("b");
1176
1177 let t_high = Transition::builder("t_high")
1180 .input(one(&p))
1181 .output(out_place(&out_a))
1182 .action(libpetri_core::action::passthrough())
1183 .priority(10)
1184 .build();
1185 let t_low = Transition::builder("t_low")
1186 .input(one(&p))
1187 .output(out_place(&out_b))
1188 .action(libpetri_core::action::passthrough())
1189 .priority(1)
1190 .build();
1191
1192 let net = PetriNet::builder("priority")
1193 .transitions([t_high, t_low])
1194 .build();
1195
1196 let mut marking = Marking::new();
1197 marking.add(&p, Token::at((), 0));
1198
1199 let mut executor =
1200 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1201 executor.run_sync();
1202
1203 assert_eq!(executor.marking().count("p"), 0);
1207 }
1208
1209 #[test]
1210 fn sync_inhibitor_blocks() {
1211 let p1 = Place::<()>::new("p1");
1212 let p2 = Place::<()>::new("p2");
1213 let p_inh = Place::<()>::new("inh");
1214
1215 let t = Transition::builder("t1")
1216 .input(one(&p1))
1217 .output(out_place(&p2))
1218 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1219 .action(libpetri_core::action::passthrough())
1220 .build();
1221
1222 let net = PetriNet::builder("inhibitor").transition(t).build();
1223
1224 let mut marking = Marking::new();
1225 marking.add(&p1, Token::at((), 0));
1226 marking.add(&p_inh, Token::at((), 0));
1227
1228 let mut executor =
1229 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1230 executor.run_sync();
1231
1232 assert_eq!(executor.marking().count("p1"), 1);
1234 }
1235
1236 #[test]
1237 fn sync_linear_chain_5() {
1238 let places: Vec<Place<i32>> = (0..6).map(|i| Place::new(format!("p{i}"))).collect();
1240 let transitions: Vec<Transition> = (0..5)
1241 .map(|i| {
1242 Transition::builder(format!("t{i}"))
1243 .input(one(&places[i]))
1244 .output(out_place(&places[i + 1]))
1245 .action(libpetri_core::action::fork())
1246 .build()
1247 })
1248 .collect();
1249
1250 let net = PetriNet::builder("chain5").transitions(transitions).build();
1251
1252 let mut marking = Marking::new();
1253 marking.add(&places[0], Token::at(1, 0));
1254
1255 let mut executor =
1256 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1257 let result = executor.run_sync();
1258
1259 assert_eq!(result.count("p0"), 0);
1261 assert_eq!(result.count("p5"), 1);
1262 }
1263
1264 #[test]
1265 fn input_arc_requires_token_to_enable() {
1266 let p1 = Place::<i32>::new("p1");
1267 let p2 = Place::<i32>::new("p2");
1268 let t = Transition::builder("t1")
1269 .input(one(&p1))
1270 .output(out_place(&p2))
1271 .action(libpetri_core::action::fork())
1272 .build();
1273 let net = PetriNet::builder("test").transition(t).build();
1274
1275 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1277 &net,
1278 Marking::new(),
1279 ExecutorOptions::default(),
1280 );
1281 executor.run_sync();
1282 assert_eq!(executor.marking().count("p1"), 0);
1283 assert_eq!(executor.marking().count("p2"), 0);
1284 }
1285
1286 #[test]
1287 fn multiple_input_arcs_require_all_tokens() {
1288 let p1 = Place::<i32>::new("p1");
1289 let p2 = Place::<i32>::new("p2");
1290 let p3 = Place::<i32>::new("p3");
1291
1292 let t = Transition::builder("t1")
1293 .input(one(&p1))
1294 .input(one(&p2))
1295 .output(out_place(&p3))
1296 .action(libpetri_core::action::sync_action(|ctx| {
1297 ctx.output("p3", 99i32)?;
1298 Ok(())
1299 }))
1300 .build();
1301 let net = PetriNet::builder("test").transition(t).build();
1302
1303 let mut marking = Marking::new();
1305 marking.add(&p1, Token::at(1, 0));
1306 let mut executor =
1307 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1308 executor.run_sync();
1309 assert_eq!(executor.marking().count("p1"), 1);
1310 assert_eq!(executor.marking().count("p3"), 0);
1311
1312 let mut marking = Marking::new();
1314 marking.add(&p1, Token::at(1, 0));
1315 marking.add(&p2, Token::at(2, 0));
1316 let mut executor =
1317 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1318 executor.run_sync();
1319 assert_eq!(executor.marking().count("p1"), 0);
1320 assert_eq!(executor.marking().count("p2"), 0);
1321 assert_eq!(executor.marking().count("p3"), 1);
1322 }
1323
1324 #[test]
1325 fn read_arc_does_not_consume() {
1326 let p_in = Place::<i32>::new("in");
1327 let p_ctx = Place::<i32>::new("ctx");
1328 let p_out = Place::<i32>::new("out");
1329
1330 let t = Transition::builder("t1")
1331 .input(one(&p_in))
1332 .read(libpetri_core::arc::read(&p_ctx))
1333 .output(out_place(&p_out))
1334 .action(libpetri_core::action::sync_action(|ctx| {
1335 let v = ctx.input::<i32>("in")?;
1336 let r = ctx.read::<i32>("ctx")?;
1337 ctx.output("out", *v + *r)?;
1338 Ok(())
1339 }))
1340 .build();
1341 let net = PetriNet::builder("test").transition(t).build();
1342
1343 let mut marking = Marking::new();
1344 marking.add(&p_in, Token::at(10, 0));
1345 marking.add(&p_ctx, Token::at(5, 0));
1346
1347 let mut executor =
1348 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1349 executor.run_sync();
1350
1351 assert_eq!(executor.marking().count("in"), 0); assert_eq!(executor.marking().count("ctx"), 1); assert_eq!(executor.marking().count("out"), 1);
1354 }
1355
1356 #[test]
1357 fn reset_arc_removes_all_tokens() {
1358 let p_in = Place::<()>::new("in");
1359 let p_reset = Place::<i32>::new("reset");
1360 let p_out = Place::<()>::new("out");
1361
1362 let t = Transition::builder("t1")
1363 .input(one(&p_in))
1364 .reset(libpetri_core::arc::reset(&p_reset))
1365 .output(out_place(&p_out))
1366 .action(libpetri_core::action::fork())
1367 .build();
1368 let net = PetriNet::builder("test").transition(t).build();
1369
1370 let mut marking = Marking::new();
1371 marking.add(&p_in, Token::at((), 0));
1372 marking.add(&p_reset, Token::at(1, 0));
1373 marking.add(&p_reset, Token::at(2, 0));
1374 marking.add(&p_reset, Token::at(3, 0));
1375
1376 let mut executor =
1377 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1378 executor.run_sync();
1379
1380 assert_eq!(executor.marking().count("reset"), 0); assert_eq!(executor.marking().count("out"), 1);
1382 }
1383
1384 #[test]
1385 fn exactly_cardinality_consumes_n() {
1386 let p = Place::<i32>::new("p");
1387 let p_out = Place::<i32>::new("out");
1388
1389 let t = Transition::builder("t1")
1390 .input(libpetri_core::input::exactly(3, &p))
1391 .output(out_place(&p_out))
1392 .action(libpetri_core::action::sync_action(|ctx| {
1393 let vals = ctx.inputs::<i32>("p")?;
1394 for v in vals {
1395 ctx.output("out", *v)?;
1396 }
1397 Ok(())
1398 }))
1399 .build();
1400 let net = PetriNet::builder("test").transition(t).build();
1401
1402 let mut marking = Marking::new();
1403 for i in 0..5 {
1404 marking.add(&p, Token::at(i, 0));
1405 }
1406
1407 let mut executor =
1408 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1409 executor.run_sync();
1410
1411 assert_eq!(executor.marking().count("p"), 2);
1413 assert_eq!(executor.marking().count("out"), 3);
1414 }
1415
1416 #[test]
1417 fn all_cardinality_consumes_everything() {
1418 let p = Place::<i32>::new("p");
1419 let p_out = Place::<()>::new("out");
1420
1421 let t = Transition::builder("t1")
1422 .input(libpetri_core::input::all(&p))
1423 .output(out_place(&p_out))
1424 .action(libpetri_core::action::sync_action(|ctx| {
1425 let vals = ctx.inputs::<i32>("p")?;
1426 ctx.output("out", vals.len() as i32)?;
1427 Ok(())
1428 }))
1429 .build();
1430 let net = PetriNet::builder("test").transition(t).build();
1431
1432 let mut marking = Marking::new();
1433 for i in 0..5 {
1434 marking.add(&p, Token::at(i, 0));
1435 }
1436
1437 let mut executor =
1438 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1439 executor.run_sync();
1440
1441 assert_eq!(executor.marking().count("p"), 0);
1442 }
1443
1444 #[test]
1445 fn at_least_blocks_insufficient() {
1446 let p = Place::<i32>::new("p");
1447 let p_out = Place::<()>::new("out");
1448
1449 let t = Transition::builder("t1")
1450 .input(libpetri_core::input::at_least(3, &p))
1451 .output(out_place(&p_out))
1452 .action(libpetri_core::action::passthrough())
1453 .build();
1454 let net = PetriNet::builder("test").transition(t).build();
1455
1456 let mut marking = Marking::new();
1458 marking.add(&p, Token::at(1, 0));
1459 marking.add(&p, Token::at(2, 0));
1460
1461 let mut executor =
1462 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1463 executor.run_sync();
1464
1465 assert_eq!(executor.marking().count("p"), 2); }
1467
1468 #[test]
1469 fn at_least_fires_with_enough_and_consumes_all() {
1470 let p = Place::<i32>::new("p");
1471 let p_out = Place::<()>::new("out");
1472
1473 let t = Transition::builder("t1")
1474 .input(libpetri_core::input::at_least(3, &p))
1475 .output(out_place(&p_out))
1476 .action(libpetri_core::action::passthrough())
1477 .build();
1478 let net = PetriNet::builder("test").transition(t).build();
1479
1480 let mut marking = Marking::new();
1481 for i in 0..5 {
1482 marking.add(&p, Token::at(i, 0));
1483 }
1484
1485 let mut executor =
1486 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1487 executor.run_sync();
1488
1489 assert_eq!(executor.marking().count("p"), 0); }
1491
1492 #[test]
1493 fn guarded_input_only_consumes_matching() {
1494 let p = Place::<i32>::new("p");
1495 let p_out = Place::<i32>::new("out");
1496
1497 let t = Transition::builder("t1")
1498 .input(libpetri_core::input::one_guarded(&p, |v| *v > 5))
1499 .output(out_place(&p_out))
1500 .action(libpetri_core::action::fork())
1501 .build();
1502 let net = PetriNet::builder("test").transition(t).build();
1503
1504 let mut marking = Marking::new();
1505 marking.add(&p, Token::at(3, 0)); marking.add(&p, Token::at(10, 0)); let mut executor =
1509 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1510 executor.run_sync();
1511
1512 assert_eq!(executor.marking().count("p"), 1); assert_eq!(executor.marking().count("out"), 1); let peeked = executor.marking().peek(&p_out).unwrap();
1515 assert_eq!(*peeked, 10);
1516 }
1517
1518 #[test]
1519 fn guarded_input_blocks_when_no_match() {
1520 let p = Place::<i32>::new("p");
1521 let p_out = Place::<i32>::new("out");
1522
1523 let t = Transition::builder("t1")
1524 .input(libpetri_core::input::one_guarded(&p, |v| *v > 100))
1525 .output(out_place(&p_out))
1526 .action(libpetri_core::action::fork())
1527 .build();
1528 let net = PetriNet::builder("test").transition(t).build();
1529
1530 let mut marking = Marking::new();
1531 marking.add(&p, Token::at(3, 0));
1532 marking.add(&p, Token::at(10, 0));
1533
1534 let mut executor =
1535 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1536 executor.run_sync();
1537
1538 assert_eq!(executor.marking().count("p"), 2);
1540 assert_eq!(executor.marking().count("out"), 0);
1541 }
1542
1543 #[test]
1544 fn transform_action_outputs_to_all_places() {
1545 let p_in = Place::<i32>::new("in");
1546 let p_a = Place::<i32>::new("a");
1547 let p_b = Place::<i32>::new("b");
1548
1549 let t = Transition::builder("t1")
1550 .input(one(&p_in))
1551 .output(libpetri_core::output::and(vec![
1552 out_place(&p_a),
1553 out_place(&p_b),
1554 ]))
1555 .action(libpetri_core::action::transform(|ctx| {
1556 let v = ctx.input::<i32>("in").unwrap();
1557 Arc::new(*v * 2) as Arc<dyn std::any::Any + Send + Sync>
1558 }))
1559 .build();
1560 let net = PetriNet::builder("test").transition(t).build();
1561
1562 let mut marking = Marking::new();
1563 marking.add(&p_in, Token::at(5, 0));
1564
1565 let mut executor =
1566 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1567 executor.run_sync();
1568
1569 assert_eq!(executor.marking().count("a"), 1);
1570 assert_eq!(executor.marking().count("b"), 1);
1571 assert_eq!(*executor.marking().peek(&p_a).unwrap(), 10);
1572 assert_eq!(*executor.marking().peek(&p_b).unwrap(), 10);
1573 }
1574
1575 #[test]
1576 fn sync_action_custom_logic() {
1577 let p_in = Place::<i32>::new("in");
1578 let p_out = Place::<String>::new("out");
1579
1580 let t = Transition::builder("t1")
1581 .input(one(&p_in))
1582 .output(out_place(&p_out))
1583 .action(libpetri_core::action::sync_action(|ctx| {
1584 let v = ctx.input::<i32>("in")?;
1585 ctx.output("out", format!("value={v}"))?;
1586 Ok(())
1587 }))
1588 .build();
1589 let net = PetriNet::builder("test").transition(t).build();
1590
1591 let mut marking = Marking::new();
1592 marking.add(&p_in, Token::at(42, 0));
1593
1594 let mut executor =
1595 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1596 executor.run_sync();
1597
1598 assert_eq!(executor.marking().count("out"), 1);
1599 let peeked = executor.marking().peek(&p_out).unwrap();
1600 assert_eq!(*peeked, "value=42");
1601 }
1602
1603 #[test]
1604 fn action_error_does_not_crash() {
1605 let p_in = Place::<i32>::new("in");
1606 let p_out = Place::<i32>::new("out");
1607
1608 let t = Transition::builder("t1")
1609 .input(one(&p_in))
1610 .output(out_place(&p_out))
1611 .action(libpetri_core::action::sync_action(|_ctx| {
1612 Err(libpetri_core::action::ActionError::new(
1613 "intentional failure",
1614 ))
1615 }))
1616 .build();
1617 let net = PetriNet::builder("test").transition(t).build();
1618
1619 let mut marking = Marking::new();
1620 marking.add(&p_in, Token::at(42, 0));
1621
1622 let mut executor =
1623 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1624 executor.run_sync();
1625
1626 assert_eq!(executor.marking().count("in"), 0);
1628 assert_eq!(executor.marking().count("out"), 0);
1629
1630 let events = executor.event_store().events();
1632 assert!(
1633 events
1634 .iter()
1635 .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
1636 );
1637 }
1638
1639 #[test]
1640 fn event_store_records_lifecycle() {
1641 let p1 = Place::<i32>::new("p1");
1642 let p2 = Place::<i32>::new("p2");
1643 let t = Transition::builder("t1")
1644 .input(one(&p1))
1645 .output(out_place(&p2))
1646 .action(libpetri_core::action::fork())
1647 .build();
1648 let net = PetriNet::builder("test").transition(t).build();
1649
1650 let mut marking = Marking::new();
1651 marking.add(&p1, Token::at(1, 0));
1652
1653 let mut executor =
1654 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
1655 executor.run_sync();
1656
1657 let events = executor.event_store().events();
1658
1659 assert!(
1662 events
1663 .iter()
1664 .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
1665 );
1666 assert!(
1667 events
1668 .iter()
1669 .any(|e| matches!(e, NetEvent::TransitionEnabled { .. }))
1670 );
1671 assert!(
1672 events
1673 .iter()
1674 .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
1675 );
1676 assert!(
1677 events
1678 .iter()
1679 .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
1680 );
1681 assert!(
1682 events
1683 .iter()
1684 .any(|e| matches!(e, NetEvent::TokenRemoved { .. }))
1685 );
1686 assert!(
1687 events
1688 .iter()
1689 .any(|e| matches!(e, NetEvent::TokenAdded { .. }))
1690 );
1691 assert!(
1692 events
1693 .iter()
1694 .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
1695 );
1696 }
1697
1698 #[test]
1699 fn noop_event_store_has_no_events() {
1700 let p1 = Place::<i32>::new("p1");
1701 let p2 = Place::<i32>::new("p2");
1702 let t = Transition::builder("t1")
1703 .input(one(&p1))
1704 .output(out_place(&p2))
1705 .action(libpetri_core::action::fork())
1706 .build();
1707 let net = PetriNet::builder("test").transition(t).build();
1708
1709 let mut marking = Marking::new();
1710 marking.add(&p1, Token::at(1, 0));
1711
1712 let mut executor =
1713 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1714 executor.run_sync();
1715
1716 assert!(executor.event_store().is_empty());
1717 }
1718
1719 #[test]
1720 fn empty_net_completes() {
1721 let net = PetriNet::builder("empty").build();
1722 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
1723 &net,
1724 Marking::new(),
1725 ExecutorOptions::default(),
1726 );
1727 executor.run_sync();
1728 assert!(executor.is_quiescent());
1729 }
1730
1731 #[test]
1732 fn single_transition_fires_once() {
1733 let p = Place::<i32>::new("p");
1734 let out = Place::<i32>::new("out");
1735 let t = Transition::builder("t1")
1736 .input(one(&p))
1737 .output(out_place(&out))
1738 .action(libpetri_core::action::fork())
1739 .build();
1740 let net = PetriNet::builder("test").transition(t).build();
1741
1742 let mut marking = Marking::new();
1743 marking.add(&p, Token::at(42, 0));
1744
1745 let mut executor =
1746 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1747 executor.run_sync();
1748
1749 assert_eq!(executor.marking().count("p"), 0);
1750 assert_eq!(executor.marking().count("out"), 1);
1751 }
1752
1753 #[test]
1754 fn many_tokens_all_processed() {
1755 let p = Place::<i32>::new("p");
1756 let out = Place::<i32>::new("out");
1757 let t = Transition::builder("t1")
1758 .input(one(&p))
1759 .output(out_place(&out))
1760 .action(libpetri_core::action::fork())
1761 .build();
1762 let net = PetriNet::builder("test").transition(t).build();
1763
1764 let mut marking = Marking::new();
1765 for i in 0..100 {
1766 marking.add(&p, Token::at(i, 0));
1767 }
1768
1769 let mut executor =
1770 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1771 executor.run_sync();
1772
1773 assert_eq!(executor.marking().count("p"), 0);
1774 assert_eq!(executor.marking().count("out"), 100);
1775 }
1776
1777 #[test]
1778 fn input_fifo_ordering() {
1779 let p = Place::<i32>::new("p");
1780 let out = Place::<i32>::new("out");
1781 let t = Transition::builder("t1")
1782 .input(one(&p))
1783 .output(out_place(&out))
1784 .action(libpetri_core::action::fork())
1785 .build();
1786 let net = PetriNet::builder("test").transition(t).build();
1787
1788 let mut marking = Marking::new();
1789 marking.add(&p, Token::at(1, 0));
1790 marking.add(&p, Token::at(2, 0));
1791 marking.add(&p, Token::at(3, 0));
1792
1793 let mut executor =
1794 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1795 executor.run_sync();
1796
1797 assert_eq!(executor.marking().count("out"), 3);
1799 }
1800
1801 #[test]
1802 fn inhibitor_unblocked_when_token_removed() {
1803 let p1 = Place::<()>::new("p1");
1806 let p_inh = Place::<()>::new("inh");
1807 let p_out = Place::<()>::new("out");
1808 let p_trigger = Place::<()>::new("trigger");
1809
1810 let t_clear = Transition::builder("t_clear")
1812 .input(one(&p_inh))
1813 .output(out_place(&p_trigger))
1814 .action(libpetri_core::action::fork())
1815 .priority(10) .build();
1817
1818 let t1 = Transition::builder("t1")
1820 .input(one(&p1))
1821 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
1822 .output(out_place(&p_out))
1823 .action(libpetri_core::action::fork())
1824 .priority(1)
1825 .build();
1826
1827 let net = PetriNet::builder("test").transitions([t_clear, t1]).build();
1828
1829 let mut marking = Marking::new();
1830 marking.add(&p1, Token::at((), 0));
1831 marking.add(&p_inh, Token::at((), 0));
1832
1833 let mut executor =
1834 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1835 executor.run_sync();
1836
1837 assert_eq!(executor.marking().count("inh"), 0);
1840 assert_eq!(executor.marking().count("p1"), 0);
1841 assert_eq!(executor.marking().count("out"), 1);
1842 }
1843
1844 #[test]
1845 fn combined_input_read_reset() {
1846 let p_in = Place::<i32>::new("in");
1847 let p_ctx = Place::<String>::new("ctx");
1848 let p_clear = Place::<i32>::new("clear");
1849 let p_out = Place::<String>::new("out");
1850
1851 let t = Transition::builder("t1")
1852 .input(one(&p_in))
1853 .read(libpetri_core::arc::read(&p_ctx))
1854 .reset(libpetri_core::arc::reset(&p_clear))
1855 .output(out_place(&p_out))
1856 .action(libpetri_core::action::sync_action(|ctx| {
1857 let v = ctx.input::<i32>("in")?;
1858 let r = ctx.read::<String>("ctx")?;
1859 ctx.output("out", format!("{v}-{r}"))?;
1860 Ok(())
1861 }))
1862 .build();
1863 let net = PetriNet::builder("test").transition(t).build();
1864
1865 let mut marking = Marking::new();
1866 marking.add(&p_in, Token::at(42, 0));
1867 marking.add(&p_ctx, Token::at("hello".to_string(), 0));
1868 marking.add(&p_clear, Token::at(1, 0));
1869 marking.add(&p_clear, Token::at(2, 0));
1870
1871 let mut executor =
1872 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1873 executor.run_sync();
1874
1875 assert_eq!(executor.marking().count("in"), 0); assert_eq!(executor.marking().count("ctx"), 1); assert_eq!(executor.marking().count("clear"), 0); assert_eq!(executor.marking().count("out"), 1);
1879 let peeked = executor.marking().peek(&p_out).unwrap();
1880 assert_eq!(*peeked, "42-hello");
1881 }
1882
1883 #[test]
1884 fn workflow_sequential_chain() {
1885 let p1 = Place::<i32>::new("p1");
1888 let p2 = Place::<i32>::new("p2");
1889 let p3 = Place::<i32>::new("p3");
1890 let p4 = Place::<i32>::new("p4");
1891
1892 let make_doubler = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
1893 let out_name: Arc<str> = Arc::from(outp.name());
1894 Transition::builder(name)
1895 .input(one(inp))
1896 .output(out_place(outp))
1897 .action(libpetri_core::action::sync_action(move |ctx| {
1898 let v = ctx
1899 .input::<i32>("p1")
1900 .or_else(|_| ctx.input::<i32>("p2"))
1901 .or_else(|_| ctx.input::<i32>("p3"))
1902 .unwrap();
1903 ctx.output(&out_name, *v * 2)?;
1904 Ok(())
1905 }))
1906 .build()
1907 };
1908
1909 let t1 = make_doubler("t1", &p1, &p2);
1910 let t2 = make_doubler("t2", &p2, &p3);
1911 let t3 = make_doubler("t3", &p3, &p4);
1912
1913 let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
1914
1915 let mut marking = Marking::new();
1916 marking.add(&p1, Token::at(1, 0));
1917
1918 let mut executor =
1919 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1920 executor.run_sync();
1921
1922 assert_eq!(executor.marking().count("p4"), 1);
1923 assert_eq!(*executor.marking().peek(&p4).unwrap(), 8); }
1925
1926 #[test]
1927 fn workflow_fork_join() {
1928 let p_start = Place::<i32>::new("start");
1930 let p_a = Place::<i32>::new("a");
1931 let p_b = Place::<i32>::new("b");
1932 let p_end = Place::<i32>::new("end");
1933
1934 let t_fork = Transition::builder("fork")
1935 .input(one(&p_start))
1936 .output(libpetri_core::output::and(vec![
1937 out_place(&p_a),
1938 out_place(&p_b),
1939 ]))
1940 .action(libpetri_core::action::fork())
1941 .build();
1942
1943 let t_join = Transition::builder("join")
1944 .input(one(&p_a))
1945 .input(one(&p_b))
1946 .output(out_place(&p_end))
1947 .action(libpetri_core::action::sync_action(|ctx| {
1948 let a = ctx.input::<i32>("a")?;
1949 let b = ctx.input::<i32>("b")?;
1950 ctx.output("end", *a + *b)?;
1951 Ok(())
1952 }))
1953 .build();
1954
1955 let net = PetriNet::builder("fork-join")
1956 .transitions([t_fork, t_join])
1957 .build();
1958
1959 let mut marking = Marking::new();
1960 marking.add(&p_start, Token::at(5, 0));
1961
1962 let mut executor =
1963 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
1964 executor.run_sync();
1965
1966 assert_eq!(executor.marking().count("start"), 0);
1967 assert_eq!(executor.marking().count("a"), 0);
1968 assert_eq!(executor.marking().count("b"), 0);
1969 assert_eq!(executor.marking().count("end"), 1);
1970 assert_eq!(*executor.marking().peek(&p_end).unwrap(), 10); }
1972
1973 #[test]
1974 fn workflow_mutual_exclusion() {
1975 let p_mutex = Place::<()>::new("mutex");
1977 let p_w1 = Place::<()>::new("w1");
1978 let p_w2 = Place::<()>::new("w2");
1979 let p_done1 = Place::<()>::new("done1");
1980 let p_done2 = Place::<()>::new("done2");
1981
1982 let t_w1 = Transition::builder("work1")
1983 .input(one(&p_w1))
1984 .input(one(&p_mutex))
1985 .output(libpetri_core::output::and(vec![
1986 out_place(&p_done1),
1987 out_place(&p_mutex), ]))
1989 .action(libpetri_core::action::sync_action(|ctx| {
1990 ctx.output("done1", ())?;
1991 ctx.output("mutex", ())?;
1992 Ok(())
1993 }))
1994 .build();
1995
1996 let t_w2 = Transition::builder("work2")
1997 .input(one(&p_w2))
1998 .input(one(&p_mutex))
1999 .output(libpetri_core::output::and(vec![
2000 out_place(&p_done2),
2001 out_place(&p_mutex), ]))
2003 .action(libpetri_core::action::sync_action(|ctx| {
2004 ctx.output("done2", ())?;
2005 ctx.output("mutex", ())?;
2006 Ok(())
2007 }))
2008 .build();
2009
2010 let net = PetriNet::builder("mutex").transitions([t_w1, t_w2]).build();
2011
2012 let mut marking = Marking::new();
2013 marking.add(&p_mutex, Token::at((), 0));
2014 marking.add(&p_w1, Token::at((), 0));
2015 marking.add(&p_w2, Token::at((), 0));
2016
2017 let mut executor =
2018 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2019 executor.run_sync();
2020
2021 assert_eq!(executor.marking().count("done1"), 1);
2023 assert_eq!(executor.marking().count("done2"), 1);
2024 assert_eq!(executor.marking().count("mutex"), 1); }
2026
2027 #[test]
2028 fn produce_action() {
2029 let p_in = Place::<()>::new("in");
2030 let p_out = Place::<String>::new("out");
2031
2032 let t = Transition::builder("t1")
2033 .input(one(&p_in))
2034 .output(out_place(&p_out))
2035 .action(libpetri_core::action::produce(
2036 Arc::from("out"),
2037 "produced_value".to_string(),
2038 ))
2039 .build();
2040 let net = PetriNet::builder("test").transition(t).build();
2041
2042 let mut marking = Marking::new();
2043 marking.add(&p_in, Token::at((), 0));
2044
2045 let mut executor =
2046 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2047 executor.run_sync();
2048
2049 assert_eq!(executor.marking().count("out"), 1);
2050 }
2051
2052 #[test]
2053 fn xor_output_fires_one_branch() {
2054 let p = Place::<i32>::new("p");
2055 let a = Place::<i32>::new("a");
2056 let b = Place::<i32>::new("b");
2057
2058 let t = Transition::builder("t1")
2059 .input(one(&p))
2060 .output(libpetri_core::output::xor(vec![
2061 out_place(&a),
2062 out_place(&b),
2063 ]))
2064 .action(libpetri_core::action::sync_action(|ctx| {
2065 let v = ctx.input::<i32>("p")?;
2066 ctx.output("a", *v)?;
2068 Ok(())
2069 }))
2070 .build();
2071 let net = PetriNet::builder("test").transition(t).build();
2072
2073 let mut marking = Marking::new();
2074 marking.add(&p, Token::at(1, 0));
2075
2076 let mut executor =
2077 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2078 executor.run_sync();
2079
2080 let total = executor.marking().count("a") + executor.marking().count("b");
2082 assert!(total >= 1);
2083 }
2084
2085 #[test]
2086 fn and_output_fires_to_all() {
2087 let p = Place::<i32>::new("p");
2088 let a = Place::<i32>::new("a");
2089 let b = Place::<i32>::new("b");
2090 let c = Place::<i32>::new("c");
2091
2092 let t = Transition::builder("t1")
2093 .input(one(&p))
2094 .output(libpetri_core::output::and(vec![
2095 out_place(&a),
2096 out_place(&b),
2097 out_place(&c),
2098 ]))
2099 .action(libpetri_core::action::sync_action(|ctx| {
2100 let v = ctx.input::<i32>("p")?;
2101 ctx.output("a", *v)?;
2102 ctx.output("b", *v * 10)?;
2103 ctx.output("c", *v * 100)?;
2104 Ok(())
2105 }))
2106 .build();
2107 let net = PetriNet::builder("test").transition(t).build();
2108
2109 let mut marking = Marking::new();
2110 marking.add(&p, Token::at(1, 0));
2111
2112 let mut executor =
2113 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2114 executor.run_sync();
2115
2116 assert_eq!(executor.marking().count("a"), 1);
2117 assert_eq!(executor.marking().count("b"), 1);
2118 assert_eq!(executor.marking().count("c"), 1);
2119 assert_eq!(*executor.marking().peek(&a).unwrap(), 1);
2120 assert_eq!(*executor.marking().peek(&b).unwrap(), 10);
2121 assert_eq!(*executor.marking().peek(&c).unwrap(), 100);
2122 }
2123
2124 #[test]
2125 fn transition_with_no_output_consumes_only() {
2126 let p = Place::<i32>::new("p");
2127
2128 let t = Transition::builder("t1")
2129 .input(one(&p))
2130 .action(libpetri_core::action::sync_action(|ctx| {
2131 let _ = ctx.input::<i32>("p")?;
2132 Ok(())
2133 }))
2134 .build();
2135 let net = PetriNet::builder("test").transition(t).build();
2136
2137 let mut marking = Marking::new();
2138 marking.add(&p, Token::at(42, 0));
2139
2140 let mut executor =
2141 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2142 executor.run_sync();
2143
2144 assert_eq!(executor.marking().count("p"), 0);
2145 }
2146
2147 #[test]
2148 fn multiple_transitions_same_input_compete() {
2149 let p1 = Place::<()>::new("p1");
2151 let out_a = Place::<()>::new("a");
2152 let out_b = Place::<()>::new("b");
2153
2154 let t_a = Transition::builder("ta")
2155 .input(one(&p1))
2156 .output(out_place(&out_a))
2157 .action(libpetri_core::action::fork())
2158 .build();
2159 let t_b = Transition::builder("tb")
2160 .input(one(&p1))
2161 .output(out_place(&out_b))
2162 .action(libpetri_core::action::fork())
2163 .build();
2164
2165 let net = PetriNet::builder("test").transitions([t_a, t_b]).build();
2166
2167 let mut marking = Marking::new();
2168 marking.add(&p1, Token::at((), 0));
2169
2170 let mut executor =
2171 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2172 executor.run_sync();
2173
2174 let total = executor.marking().count("a") + executor.marking().count("b");
2176 assert_eq!(total, 1);
2177 assert_eq!(executor.marking().count("p1"), 0);
2178 }
2179
2180 #[test]
2181 fn priority_higher_fires_first() {
2182 let p = Place::<()>::new("p");
2183 let out_hi = Place::<()>::new("hi");
2184 let out_lo = Place::<()>::new("lo");
2185
2186 let t_hi = Transition::builder("hi")
2187 .input(one(&p))
2188 .output(out_place(&out_hi))
2189 .action(libpetri_core::action::fork())
2190 .priority(10)
2191 .build();
2192 let t_lo = Transition::builder("lo")
2193 .input(one(&p))
2194 .output(out_place(&out_lo))
2195 .action(libpetri_core::action::fork())
2196 .priority(1)
2197 .build();
2198
2199 let net = PetriNet::builder("test").transitions([t_hi, t_lo]).build();
2200
2201 let mut marking = Marking::new();
2202 marking.add(&p, Token::at((), 0));
2203
2204 let mut executor =
2205 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2206 executor.run_sync();
2207
2208 assert_eq!(executor.marking().count("hi"), 1);
2210 assert_eq!(executor.marking().count("lo"), 0);
2211 }
2212
2213 #[test]
2214 fn quiescent_when_no_enabled_transitions() {
2215 let p = Place::<i32>::new("p");
2216 let out = Place::<i32>::new("out");
2217
2218 let t = Transition::builder("t1")
2219 .input(one(&p))
2220 .output(out_place(&out))
2221 .action(libpetri_core::action::fork())
2222 .build();
2223 let net = PetriNet::builder("test").transition(t).build();
2224
2225 let mut marking = Marking::new();
2226 marking.add(&p, Token::at(1, 0));
2227
2228 let mut executor =
2229 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2230 executor.run_sync();
2231
2232 assert!(executor.is_quiescent());
2233 }
2234
2235 #[test]
2236 fn event_store_transition_enabled_disabled() {
2237 let p1 = Place::<()>::new("p1");
2238 let p2 = Place::<()>::new("p2");
2239
2240 let t = Transition::builder("t1")
2241 .input(one(&p1))
2242 .output(out_place(&p2))
2243 .action(libpetri_core::action::fork())
2244 .build();
2245 let net = PetriNet::builder("test").transition(t).build();
2246
2247 let mut marking = Marking::new();
2248 marking.add(&p1, Token::at((), 0));
2249
2250 let mut executor =
2251 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2252 executor.run_sync();
2253
2254 let events = executor.event_store().events();
2255 assert!(events.len() >= 4);
2258
2259 let has_enabled = events.iter().any(|e| {
2261 matches!(e, NetEvent::TransitionEnabled { transition_name, .. } if transition_name.as_ref() == "t1")
2262 });
2263 assert!(has_enabled);
2264 }
2265
2266 #[test]
2267 fn diamond_pattern() {
2268 let p1 = Place::<()>::new("p1");
2270 let p2 = Place::<()>::new("p2");
2271 let p3 = Place::<()>::new("p3");
2272 let p4 = Place::<()>::new("p4");
2273
2274 let t1 = Transition::builder("t1")
2275 .input(one(&p1))
2276 .output(libpetri_core::output::and(vec![
2277 out_place(&p2),
2278 out_place(&p3),
2279 ]))
2280 .action(libpetri_core::action::fork())
2281 .build();
2282 let t2 = Transition::builder("t2")
2283 .input(one(&p2))
2284 .output(out_place(&p4))
2285 .action(libpetri_core::action::fork())
2286 .build();
2287 let t3 = Transition::builder("t3")
2288 .input(one(&p3))
2289 .output(out_place(&p4))
2290 .action(libpetri_core::action::fork())
2291 .build();
2292
2293 let net = PetriNet::builder("diamond")
2294 .transitions([t1, t2, t3])
2295 .build();
2296
2297 let mut marking = Marking::new();
2298 marking.add(&p1, Token::at((), 0));
2299
2300 let mut executor =
2301 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2302 executor.run_sync();
2303
2304 assert_eq!(executor.marking().count("p4"), 2); }
2306
2307 #[test]
2308 fn self_loop_with_guard_terminates() {
2309 let p = Place::<i32>::new("p");
2312 let done = Place::<i32>::new("done");
2313
2314 let t = Transition::builder("dec")
2315 .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v > 0))
2316 .output(out_place(&p))
2317 .action(libpetri_core::action::sync_action(|ctx| {
2318 let v = ctx.input::<i32>("p")?;
2319 ctx.output("p", *v - 1)?;
2320 Ok(())
2321 }))
2322 .build();
2323
2324 let t_done = Transition::builder("finish")
2326 .input(libpetri_core::input::one_guarded(&p, |v: &i32| *v == 0))
2327 .output(out_place(&done))
2328 .action(libpetri_core::action::fork())
2329 .build();
2330
2331 let net = PetriNet::builder("countdown")
2332 .transitions([t, t_done])
2333 .build();
2334
2335 let mut marking = Marking::new();
2336 marking.add(&p, Token::at(3, 0));
2337
2338 let mut executor =
2339 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2340 executor.run_sync();
2341
2342 assert_eq!(executor.marking().count("done"), 1);
2343 assert_eq!(*executor.marking().peek(&done).unwrap(), 0);
2344 }
2345
2346 #[test]
2347 fn multiple_tokens_different_types() {
2348 let p_int = Place::<i32>::new("ints");
2349 let p_str = Place::<String>::new("strs");
2350 let p_out = Place::<String>::new("out");
2351
2352 let t = Transition::builder("combine")
2353 .input(one(&p_int))
2354 .input(one(&p_str))
2355 .output(out_place(&p_out))
2356 .action(libpetri_core::action::sync_action(|ctx| {
2357 let i = ctx.input::<i32>("ints")?;
2358 let s = ctx.input::<String>("strs")?;
2359 ctx.output("out", format!("{s}-{i}"))?;
2360 Ok(())
2361 }))
2362 .build();
2363
2364 let net = PetriNet::builder("test").transition(t).build();
2365
2366 let mut marking = Marking::new();
2367 marking.add(&p_int, Token::at(42, 0));
2368 marking.add(&p_str, Token::at("hello".to_string(), 0));
2369
2370 let mut executor =
2371 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2372 executor.run_sync();
2373
2374 assert_eq!(*executor.marking().peek(&p_out).unwrap(), "hello-42");
2375 }
2376}
2377
2378#[cfg(all(test, feature = "tokio"))]
2379mod async_tests {
2380 use super::*;
2381 use crate::environment::ExternalEvent;
2382 use libpetri_core::action::{ActionError, async_action, fork};
2383 use libpetri_core::input::one;
2384 use libpetri_core::output::out_place;
2385 use libpetri_core::place::Place;
2386 use libpetri_core::token::{ErasedToken, Token};
2387 use libpetri_core::transition::Transition;
2388 use libpetri_event::event_store::{InMemoryEventStore, NoopEventStore};
2389
2390 #[tokio::test]
2391 async fn async_fork_single_transition() {
2392 let p1 = Place::<i32>::new("p1");
2393 let p2 = Place::<i32>::new("p2");
2394
2395 let t1 = Transition::builder("t1")
2396 .input(one(&p1))
2397 .output(out_place(&p2))
2398 .action(fork())
2399 .build();
2400
2401 let net = PetriNet::builder("test").transition(t1).build();
2402 let mut marking = Marking::new();
2403 marking.add(&p1, Token::at(42, 0));
2404
2405 let mut executor =
2406 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2407
2408 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2409 executor.run_async(rx).await;
2410
2411 assert_eq!(executor.marking().count("p2"), 1);
2412 assert_eq!(*executor.marking().peek(&p2).unwrap(), 42);
2413 }
2414
2415 #[tokio::test]
2416 async fn async_action_produces_output() {
2417 let p1 = Place::<i32>::new("p1");
2418 let p2 = Place::<i32>::new("p2");
2419
2420 let action = async_action(|mut ctx| async move {
2421 let val: i32 = *ctx.input::<i32>("p1")?;
2422 ctx.output("p2", val * 10)?;
2423 Ok(ctx)
2424 });
2425
2426 let t1 = Transition::builder("t1")
2427 .input(one(&p1))
2428 .output(out_place(&p2))
2429 .action(action)
2430 .build();
2431
2432 let net = PetriNet::builder("test").transition(t1).build();
2433 let mut marking = Marking::new();
2434 marking.add(&p1, Token::at(5, 0));
2435
2436 let mut executor =
2437 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2438
2439 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2440 executor.run_async(rx).await;
2441
2442 assert_eq!(executor.marking().count("p2"), 1);
2443 assert_eq!(*executor.marking().peek(&p2).unwrap(), 50);
2444 }
2445
2446 #[tokio::test]
2447 async fn async_chain_two_transitions() {
2448 let p1 = Place::<i32>::new("p1");
2449 let p2 = Place::<i32>::new("p2");
2450 let p3 = Place::<i32>::new("p3");
2451
2452 let t1 = Transition::builder("t1")
2453 .input(one(&p1))
2454 .output(out_place(&p2))
2455 .action(fork())
2456 .build();
2457
2458 let t2 = Transition::builder("t2")
2459 .input(one(&p2))
2460 .output(out_place(&p3))
2461 .action(fork())
2462 .build();
2463
2464 let net = PetriNet::builder("chain").transitions([t1, t2]).build();
2465 let mut marking = Marking::new();
2466 marking.add(&p1, Token::at(99, 0));
2467
2468 let mut executor =
2469 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2470
2471 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2472 executor.run_async(rx).await;
2473
2474 assert_eq!(executor.marking().count("p3"), 1);
2475 assert_eq!(*executor.marking().peek(&p3).unwrap(), 99);
2476 }
2477
2478 #[tokio::test]
2479 async fn async_event_injection() {
2480 let p1 = Place::<i32>::new("p1");
2481 let p2 = Place::<i32>::new("p2");
2482
2483 let t1 = Transition::builder("t1")
2484 .input(one(&p1))
2485 .output(out_place(&p2))
2486 .action(fork())
2487 .build();
2488
2489 let net = PetriNet::builder("test").transition(t1).build();
2490 let marking = Marking::new(); let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2493 &net,
2494 marking,
2495 ExecutorOptions {
2496 long_running: true,
2497 ..Default::default()
2498 },
2499 );
2500
2501 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2502
2503 tokio::spawn(async move {
2505 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2506 let token = Token::at(77, 0);
2507 tx.send(ExternalEvent {
2508 place_name: Arc::from("p1"),
2509 token: ErasedToken::from_typed(&token),
2510 })
2511 .unwrap();
2512 });
2514
2515 executor.run_async(rx).await;
2516
2517 assert_eq!(executor.marking().count("p2"), 1);
2518 assert_eq!(*executor.marking().peek(&p2).unwrap(), 77);
2519 }
2520
2521 #[tokio::test]
2522 async fn async_with_event_store_records_events() {
2523 let p1 = Place::<i32>::new("p1");
2524 let p2 = Place::<i32>::new("p2");
2525
2526 let t1 = Transition::builder("t1")
2527 .input(one(&p1))
2528 .output(out_place(&p2))
2529 .action(fork())
2530 .build();
2531
2532 let net = PetriNet::builder("test").transition(t1).build();
2533 let mut marking = Marking::new();
2534 marking.add(&p1, Token::at(1, 0));
2535
2536 let mut executor =
2537 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2538
2539 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2540 executor.run_async(rx).await;
2541
2542 let events = executor.event_store().events();
2543 assert!(!events.is_empty());
2544 assert!(
2546 events
2547 .iter()
2548 .any(|e| matches!(e, NetEvent::ExecutionStarted { .. }))
2549 );
2550 assert!(
2551 events
2552 .iter()
2553 .any(|e| matches!(e, NetEvent::TransitionStarted { .. }))
2554 );
2555 assert!(
2556 events
2557 .iter()
2558 .any(|e| matches!(e, NetEvent::TransitionCompleted { .. }))
2559 );
2560 assert!(
2561 events
2562 .iter()
2563 .any(|e| matches!(e, NetEvent::ExecutionCompleted { .. }))
2564 );
2565 }
2566
2567 #[tokio::test]
2568 async fn async_action_error_does_not_crash() {
2569 let p1 = Place::<i32>::new("p1");
2570 let p2 = Place::<i32>::new("p2");
2571
2572 let action =
2573 async_action(|_ctx| async move { Err(ActionError::new("intentional failure")) });
2574
2575 let t1 = Transition::builder("t1")
2576 .input(one(&p1))
2577 .output(out_place(&p2))
2578 .action(action)
2579 .build();
2580
2581 let net = PetriNet::builder("test").transition(t1).build();
2582 let mut marking = Marking::new();
2583 marking.add(&p1, Token::at(1, 0));
2584
2585 let mut executor =
2586 BitmapNetExecutor::<InMemoryEventStore>::new(&net, marking, ExecutorOptions::default());
2587
2588 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2589 executor.run_async(rx).await;
2590
2591 assert_eq!(executor.marking().count("p1"), 0);
2593 assert_eq!(executor.marking().count("p2"), 0);
2594
2595 let events = executor.event_store().events();
2597 assert!(
2598 events
2599 .iter()
2600 .any(|e| matches!(e, NetEvent::TransitionFailed { .. }))
2601 );
2602 }
2603
2604 #[tokio::test]
2605 async fn async_delayed_timing() {
2606 use libpetri_core::timing::delayed;
2607
2608 let p1 = Place::<i32>::new("p1");
2609 let p2 = Place::<i32>::new("p2");
2610
2611 let t1 = Transition::builder("t1")
2612 .input(one(&p1))
2613 .output(out_place(&p2))
2614 .timing(delayed(100))
2615 .action(fork())
2616 .build();
2617
2618 let net = PetriNet::builder("test").transition(t1).build();
2619 let mut marking = Marking::new();
2620 marking.add(&p1, Token::at(10, 0));
2621
2622 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2623 &net,
2624 marking,
2625 ExecutorOptions {
2626 long_running: true,
2627 ..Default::default()
2628 },
2629 );
2630
2631 let start = std::time::Instant::now();
2632 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2633 tokio::spawn(async move {
2635 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2636 drop(tx);
2637 });
2638 executor.run_async(rx).await;
2639
2640 assert!(
2641 start.elapsed().as_millis() >= 80,
2642 "delayed(100) should wait ~100ms"
2643 );
2644 assert_eq!(executor.marking().count("p2"), 1);
2645 assert_eq!(*executor.marking().peek(&p2).unwrap(), 10);
2646 }
2647
2648 #[tokio::test]
2649 async fn async_exact_timing() {
2650 use libpetri_core::timing::exact;
2651
2652 let p1 = Place::<i32>::new("p1");
2653 let p2 = Place::<i32>::new("p2");
2654
2655 let t1 = Transition::builder("t1")
2656 .input(one(&p1))
2657 .output(out_place(&p2))
2658 .timing(exact(100))
2659 .action(fork())
2660 .build();
2661
2662 let net = PetriNet::builder("test").transition(t1).build();
2663 let mut marking = Marking::new();
2664 marking.add(&p1, Token::at(20, 0));
2665
2666 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2667 &net,
2668 marking,
2669 ExecutorOptions {
2670 long_running: true,
2671 ..Default::default()
2672 },
2673 );
2674
2675 let start = std::time::Instant::now();
2676 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2677 tokio::spawn(async move {
2678 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
2679 drop(tx);
2680 });
2681 executor.run_async(rx).await;
2682
2683 assert!(
2684 start.elapsed().as_millis() >= 80,
2685 "exact(100) should wait ~100ms"
2686 );
2687 assert_eq!(executor.marking().count("p2"), 1);
2688 assert_eq!(*executor.marking().peek(&p2).unwrap(), 20);
2689 }
2690
2691 #[tokio::test]
2692 async fn async_window_timing() {
2693 use libpetri_core::timing::window;
2694
2695 let p1 = Place::<i32>::new("p1");
2696 let p2 = Place::<i32>::new("p2");
2697
2698 let t1 = Transition::builder("t1")
2699 .input(one(&p1))
2700 .output(out_place(&p2))
2701 .timing(window(50, 200))
2702 .action(fork())
2703 .build();
2704
2705 let net = PetriNet::builder("test").transition(t1).build();
2706 let mut marking = Marking::new();
2707 marking.add(&p1, Token::at(30, 0));
2708
2709 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2710 &net,
2711 marking,
2712 ExecutorOptions {
2713 long_running: true,
2714 ..Default::default()
2715 },
2716 );
2717
2718 let start = std::time::Instant::now();
2719 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2720 tokio::spawn(async move {
2721 tokio::time::sleep(std::time::Duration::from_millis(400)).await;
2722 drop(tx);
2723 });
2724 executor.run_async(rx).await;
2725
2726 assert!(
2727 start.elapsed().as_millis() >= 40,
2728 "window(50,200) should wait >= ~50ms"
2729 );
2730 assert_eq!(executor.marking().count("p2"), 1);
2731 assert_eq!(*executor.marking().peek(&p2).unwrap(), 30);
2732 }
2733
2734 #[tokio::test]
2735 async fn async_deadline_enforcement() {
2736 use libpetri_core::action::sync_action;
2737 use libpetri_core::timing::window;
2738
2739 let p_slow = Place::<i32>::new("p_slow");
2740 let p_windowed = Place::<i32>::new("p_windowed");
2741 let slow_out = Place::<i32>::new("slow_out");
2742 let windowed_out = Place::<i32>::new("windowed_out");
2743
2744 let t_slow = Transition::builder("slow")
2748 .input(one(&p_slow))
2749 .output(out_place(&slow_out))
2750 .priority(10)
2751 .action(sync_action(|ctx| {
2752 let v = ctx.input::<i32>("p_slow")?;
2753 let start = std::time::Instant::now();
2754 while start.elapsed().as_millis() < 200 {
2755 std::hint::spin_loop();
2756 }
2757 ctx.output("slow_out", *v)?;
2758 Ok(())
2759 }))
2760 .build();
2761
2762 let t_windowed = Transition::builder("windowed")
2766 .input(one(&p_windowed))
2767 .output(out_place(&windowed_out))
2768 .timing(window(50, 100))
2769 .action(fork())
2770 .build();
2771
2772 let net = PetriNet::builder("test")
2773 .transitions([t_slow, t_windowed])
2774 .build();
2775
2776 let mut marking = Marking::new();
2777 marking.add(&p_slow, Token::at(1, 0));
2778 marking.add(&p_windowed, Token::at(2, 0));
2779
2780 let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
2781 &net,
2782 marking,
2783 ExecutorOptions {
2784 long_running: true,
2785 ..Default::default()
2786 },
2787 );
2788
2789 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2790 tokio::spawn(async move {
2791 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2792 drop(tx);
2793 });
2794 executor.run_async(rx).await;
2795
2796 assert_eq!(executor.marking().count("slow_out"), 1);
2798 assert_eq!(
2800 executor.marking().count("windowed_out"),
2801 0,
2802 "windowed transition should have been disabled by deadline"
2803 );
2804
2805 let events = executor.event_store().events();
2806 assert!(
2807 events.iter().any(|e| matches!(e, NetEvent::TransitionTimedOut { transition_name, .. } if &**transition_name == "windowed")),
2808 "expected TransitionTimedOut event for 'windowed'"
2809 );
2810 }
2811
2812 #[tokio::test]
2813 async fn async_multiple_injections() {
2814 let p1 = Place::<i32>::new("p1");
2815 let p2 = Place::<i32>::new("p2");
2816
2817 let t1 = Transition::builder("t1")
2818 .input(one(&p1))
2819 .output(out_place(&p2))
2820 .action(fork())
2821 .build();
2822
2823 let net = PetriNet::builder("test").transition(t1).build();
2824 let marking = Marking::new();
2825
2826 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
2827 &net,
2828 marking,
2829 ExecutorOptions {
2830 long_running: true,
2831 ..Default::default()
2832 },
2833 );
2834
2835 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2836
2837 tokio::spawn(async move {
2838 for i in 0..5 {
2839 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2840 let token = Token::at(i, 0);
2841 tx.send(ExternalEvent {
2842 place_name: Arc::from("p1"),
2843 token: ErasedToken::from_typed(&token),
2844 })
2845 .unwrap();
2846 }
2847 });
2849
2850 executor.run_async(rx).await;
2851
2852 assert_eq!(
2853 executor.marking().count("p2"),
2854 5,
2855 "all 5 injected tokens should arrive"
2856 );
2857 }
2858
2859 #[tokio::test]
2860 async fn async_parallel_execution() {
2861 let p1 = Place::<i32>::new("p1");
2862 let p2 = Place::<i32>::new("p2");
2863 let p3 = Place::<i32>::new("p3");
2864 let out1 = Place::<i32>::new("out1");
2865 let out2 = Place::<i32>::new("out2");
2866 let out3 = Place::<i32>::new("out3");
2867
2868 let make_transition = |name: &str, inp: &Place<i32>, outp: &Place<i32>| {
2869 Transition::builder(name)
2870 .input(one(inp))
2871 .output(out_place(outp))
2872 .action(async_action(|mut ctx| async move {
2873 let v: i32 =
2874 *ctx.input::<i32>(ctx.transition_name().replace("t", "p").as_str())?;
2875 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2876 ctx.output(&ctx.transition_name().replace("t", "out"), v)?;
2877 Ok(ctx)
2878 }))
2879 .build()
2880 };
2881
2882 let t1 = make_transition("t1", &p1, &out1);
2883 let t2 = make_transition("t2", &p2, &out2);
2884 let t3 = make_transition("t3", &p3, &out3);
2885
2886 let net = PetriNet::builder("parallel")
2887 .transitions([t1, t2, t3])
2888 .build();
2889 let mut marking = Marking::new();
2890 marking.add(&p1, Token::at(1, 0));
2891 marking.add(&p2, Token::at(2, 0));
2892 marking.add(&p3, Token::at(3, 0));
2893
2894 let mut executor =
2895 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2896
2897 let start = std::time::Instant::now();
2898 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2899 executor.run_async(rx).await;
2900 let elapsed = start.elapsed().as_millis();
2901
2902 assert_eq!(executor.marking().count("out1"), 1);
2903 assert_eq!(executor.marking().count("out2"), 1);
2904 assert_eq!(executor.marking().count("out3"), 1);
2905 assert!(
2906 elapsed < 250,
2907 "parallel execution should take < 250ms, took {elapsed}ms"
2908 );
2909 }
2910
2911 #[tokio::test]
2912 async fn async_sequential_chain_order() {
2913 use std::sync::Mutex;
2914
2915 let p1 = Place::<i32>::new("p1");
2916 let p2 = Place::<i32>::new("p2");
2917 let p3 = Place::<i32>::new("p3");
2918 let p4 = Place::<i32>::new("p4");
2919
2920 let order: Arc<Mutex<Vec<i32>>> = Arc::new(Mutex::new(Vec::new()));
2921
2922 let make_chain = |name: &str,
2923 inp: &Place<i32>,
2924 outp: &Place<i32>,
2925 id: i32,
2926 order: Arc<Mutex<Vec<i32>>>| {
2927 let inp_name: Arc<str> = Arc::from(inp.name());
2928 let outp_name: Arc<str> = Arc::from(outp.name());
2929 Transition::builder(name)
2930 .input(one(inp))
2931 .output(out_place(outp))
2932 .action(async_action(move |mut ctx| {
2933 let order = Arc::clone(&order);
2934 let inp_name = Arc::clone(&inp_name);
2935 let outp_name = Arc::clone(&outp_name);
2936 async move {
2937 let v: i32 = *ctx.input::<i32>(&inp_name)?;
2938 order.lock().unwrap().push(id);
2939 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2940 ctx.output(&outp_name, v)?;
2941 Ok(ctx)
2942 }
2943 }))
2944 .build()
2945 };
2946
2947 let t1 = make_chain("t1", &p1, &p2, 1, Arc::clone(&order));
2948 let t2 = make_chain("t2", &p2, &p3, 2, Arc::clone(&order));
2949 let t3 = make_chain("t3", &p3, &p4, 3, Arc::clone(&order));
2950
2951 let net = PetriNet::builder("chain").transitions([t1, t2, t3]).build();
2952 let mut marking = Marking::new();
2953 marking.add(&p1, Token::at(1, 0));
2954
2955 let mut executor =
2956 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
2957
2958 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
2959 executor.run_async(rx).await;
2960
2961 assert_eq!(executor.marking().count("p4"), 1);
2962 let recorded = order.lock().unwrap().clone();
2963 assert_eq!(recorded, vec![1, 2, 3], "chain should execute in order");
2964 }
2965
2966 #[tokio::test]
2967 async fn async_fork_join() {
2968 use libpetri_core::output::and;
2969
2970 let p1 = Place::<i32>::new("p1");
2971 let p2 = Place::<i32>::new("p2");
2972 let p3 = Place::<i32>::new("p3");
2973 let p4 = Place::<i32>::new("p4");
2974
2975 let t_fork = Transition::builder("fork")
2977 .input(one(&p1))
2978 .output(and(vec![out_place(&p2), out_place(&p3)]))
2979 .action(libpetri_core::action::sync_action(|ctx| {
2980 let v = ctx.input::<i32>("p1")?;
2981 ctx.output("p2", *v)?;
2982 ctx.output("p3", *v)?;
2983 Ok(())
2984 }))
2985 .build();
2986
2987 let t_join = Transition::builder("join")
2989 .input(one(&p2))
2990 .input(one(&p3))
2991 .output(out_place(&p4))
2992 .action(libpetri_core::action::sync_action(|ctx| {
2993 let a = ctx.input::<i32>("p2")?;
2994 let b = ctx.input::<i32>("p3")?;
2995 ctx.output("p4", *a + *b)?;
2996 Ok(())
2997 }))
2998 .build();
2999
3000 let net = PetriNet::builder("fork_join")
3001 .transitions([t_fork, t_join])
3002 .build();
3003
3004 let mut marking = Marking::new();
3005 marking.add(&p1, Token::at(5, 0));
3006
3007 let mut executor =
3008 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3009
3010 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3011 executor.run_async(rx).await;
3012
3013 assert_eq!(executor.marking().count("p2"), 0);
3014 assert_eq!(executor.marking().count("p3"), 0);
3015 assert_eq!(executor.marking().count("p4"), 1);
3016 assert_eq!(*executor.marking().peek(&p4).unwrap(), 10);
3017 }
3018
3019 #[tokio::test]
3020 async fn async_xor_output_branching() {
3021 use libpetri_core::output::xor;
3022
3023 let p = Place::<i32>::new("p");
3024 let left = Place::<i32>::new("left");
3025 let right = Place::<i32>::new("right");
3026
3027 let t = Transition::builder("xor_t")
3028 .input(one(&p))
3029 .output(xor(vec![out_place(&left), out_place(&right)]))
3030 .action(libpetri_core::action::sync_action(|ctx| {
3031 let v = ctx.input::<i32>("p")?;
3032 if *v > 0 {
3033 ctx.output("left", *v)?;
3034 } else {
3035 ctx.output("right", *v)?;
3036 }
3037 Ok(())
3038 }))
3039 .build();
3040
3041 let net = PetriNet::builder("xor_test").transition(t).build();
3042
3043 let mut marking = Marking::new();
3045 marking.add(&p, Token::at(42, 0));
3046
3047 let mut executor =
3048 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3049 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3050 executor.run_async(rx).await;
3051
3052 assert_eq!(executor.marking().count("left"), 1);
3053 assert_eq!(executor.marking().count("right"), 0);
3054 assert_eq!(*executor.marking().peek(&left).unwrap(), 42);
3055 }
3056
3057 #[tokio::test]
3058 async fn async_loop_with_guard() {
3059 use libpetri_core::input::one_guarded;
3060
3061 let counter = Place::<i32>::new("counter");
3062 let done = Place::<i32>::new("done");
3063
3064 let t_loop = Transition::builder("loop")
3066 .input(one_guarded(&counter, |v: &i32| *v < 3))
3067 .output(out_place(&counter))
3068 .action(libpetri_core::action::sync_action(|ctx| {
3069 let v = ctx.input::<i32>("counter")?;
3070 ctx.output("counter", *v + 1)?;
3071 Ok(())
3072 }))
3073 .build();
3074
3075 let t_exit = Transition::builder("exit")
3077 .input(one_guarded(&counter, |v: &i32| *v >= 3))
3078 .output(out_place(&done))
3079 .action(fork())
3080 .build();
3081
3082 let net = PetriNet::builder("loop_net")
3083 .transitions([t_loop, t_exit])
3084 .build();
3085
3086 let mut marking = Marking::new();
3087 marking.add(&counter, Token::at(0, 0));
3088
3089 let mut executor =
3090 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3091
3092 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3093 executor.run_async(rx).await;
3094
3095 assert_eq!(executor.marking().count("done"), 1);
3096 assert_eq!(*executor.marking().peek(&done).unwrap(), 3);
3097 }
3098
3099 #[tokio::test]
3100 async fn async_delayed_fires_without_injection() {
3101 use libpetri_core::timing::delayed;
3102
3103 let p1 = Place::<i32>::new("p1");
3104 let p2 = Place::<i32>::new("p2");
3105
3106 let t1 = Transition::builder("t1")
3107 .input(one(&p1))
3108 .output(out_place(&p2))
3109 .timing(delayed(100))
3110 .action(fork())
3111 .build();
3112
3113 let net = PetriNet::builder("test").transition(t1).build();
3114 let mut marking = Marking::new();
3115 marking.add(&p1, Token::at(7, 0));
3116
3117 let mut executor = BitmapNetExecutor::<NoopEventStore>::new(
3118 &net,
3119 marking,
3120 ExecutorOptions {
3121 long_running: true,
3122 ..Default::default()
3123 },
3124 );
3125
3126 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3128 tokio::spawn(async move {
3129 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3130 drop(tx);
3131 });
3132 executor.run_async(rx).await;
3133
3134 assert_eq!(executor.marking().count("p2"), 1);
3135 assert_eq!(*executor.marking().peek(&p2).unwrap(), 7);
3136 }
3137
3138 #[tokio::test]
3139 #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3140 async fn async_timeout_produces_timeout_token() {
3141 use libpetri_core::output::{timeout_place, xor};
3142
3143 let p1 = Place::<i32>::new("p1");
3144 let success = Place::<i32>::new("success");
3145 let timeout_out = Place::<i32>::new("timeout_out");
3146
3147 let t1 = Transition::builder("t1")
3148 .input(one(&p1))
3149 .output(xor(vec![
3150 out_place(&success),
3151 timeout_place(50, &timeout_out),
3152 ]))
3153 .action(async_action(|mut ctx| async move {
3154 let v: i32 = *ctx.input::<i32>("p1")?;
3155 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
3156 ctx.output("success", v)?;
3157 Ok(ctx)
3158 }))
3159 .build();
3160
3161 let net = PetriNet::builder("test").transition(t1).build();
3162 let mut marking = Marking::new();
3163 marking.add(&p1, Token::at(1, 0));
3164
3165 let mut executor =
3166 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3167
3168 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3169 executor.run_async(rx).await;
3170
3171 assert_eq!(executor.marking().count("timeout_out"), 1);
3172 assert_eq!(executor.marking().count("success"), 0);
3173 }
3174
3175 #[tokio::test]
3176 #[ignore = "Executor does not yet implement per-action timeout (Out::Timeout) in the async path"]
3177 async fn async_timeout_normal_when_fast() {
3178 use libpetri_core::output::{timeout_place, xor};
3179
3180 let p1 = Place::<i32>::new("p1");
3181 let success = Place::<i32>::new("success");
3182 let timeout_out = Place::<i32>::new("timeout_out");
3183
3184 let t1 = Transition::builder("t1")
3185 .input(one(&p1))
3186 .output(xor(vec![
3187 out_place(&success),
3188 timeout_place(500, &timeout_out),
3189 ]))
3190 .action(async_action(|mut ctx| async move {
3191 let v: i32 = *ctx.input::<i32>("p1")?;
3192 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3193 ctx.output("success", v)?;
3194 Ok(ctx)
3195 }))
3196 .build();
3197
3198 let net = PetriNet::builder("test").transition(t1).build();
3199 let mut marking = Marking::new();
3200 marking.add(&p1, Token::at(1, 0));
3201
3202 let mut executor =
3203 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3204
3205 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3206 executor.run_async(rx).await;
3207
3208 assert_eq!(executor.marking().count("success"), 1);
3209 assert_eq!(executor.marking().count("timeout_out"), 0);
3210 }
3211
3212 #[tokio::test]
3213 async fn async_event_store_records_token_added_for_injection() {
3214 let p1 = Place::<i32>::new("p1");
3215 let p2 = Place::<i32>::new("p2");
3216
3217 let t1 = Transition::builder("t1")
3218 .input(one(&p1))
3219 .output(out_place(&p2))
3220 .action(fork())
3221 .build();
3222
3223 let net = PetriNet::builder("test").transition(t1).build();
3224 let marking = Marking::new();
3225
3226 let mut executor = BitmapNetExecutor::<InMemoryEventStore>::new(
3227 &net,
3228 marking,
3229 ExecutorOptions {
3230 long_running: true,
3231 ..Default::default()
3232 },
3233 );
3234
3235 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3236
3237 tokio::spawn(async move {
3238 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3239 let token = Token::at(99, 0);
3240 tx.send(ExternalEvent {
3241 place_name: Arc::from("p1"),
3242 token: ErasedToken::from_typed(&token),
3243 })
3244 .unwrap();
3245 });
3246
3247 executor.run_async(rx).await;
3248
3249 let events = executor.event_store().events();
3250 assert!(
3252 events.iter().any(
3253 |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p1")
3254 ),
3255 "expected TokenAdded event for injected token into p1"
3256 );
3257 assert!(
3259 events.iter().any(
3260 |e| matches!(e, NetEvent::TokenAdded { place_name, .. } if &**place_name == "p2")
3261 ),
3262 "expected TokenAdded event for output token into p2"
3263 );
3264 }
3265
3266 #[tokio::test]
3267 async fn async_inhibitor_blocks_in_async() {
3268 let p1 = Place::<()>::new("p1");
3269 let p2 = Place::<()>::new("p2");
3270 let p_inh = Place::<()>::new("inh");
3271
3272 let t = Transition::builder("t1")
3273 .input(one(&p1))
3274 .output(out_place(&p2))
3275 .inhibitor(libpetri_core::arc::inhibitor(&p_inh))
3276 .action(fork())
3277 .build();
3278
3279 let net = PetriNet::builder("inhibitor").transition(t).build();
3280
3281 let mut marking = Marking::new();
3282 marking.add(&p1, Token::at((), 0));
3283 marking.add(&p_inh, Token::at((), 0));
3284
3285 let mut executor =
3286 BitmapNetExecutor::<NoopEventStore>::new(&net, marking, ExecutorOptions::default());
3287
3288 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<ExternalEvent>();
3289 executor.run_async(rx).await;
3290
3291 assert_eq!(executor.marking().count("p1"), 1);
3293 assert_eq!(executor.marking().count("p2"), 0);
3294 }
3295}