1#![expect(clippy::wildcard_enum_match_arm, clippy::unwrap_used, clippy::panic, clippy::expect_used)]
16
17use std::{
18 collections::{BTreeMap, VecDeque},
19 mem::replace,
20 sync::Arc,
21 task::{Context, Poll, Waker},
22};
23
24use either::Either::{Left, Right};
25use futures_util::{StreamExt, stream::FuturesUnordered};
26use override_external_effect::OverrideExternalEffect;
27pub use override_external_effect::OverrideResult;
28use parking_lot::Mutex;
29use tokio::{runtime::Handle, select, sync::watch};
30
31use crate::{
32 BLACKHOLE_NAME, BoxFuture, Effect, ExternalEffect, ExternalEffectAPI, Instant, Name, Resources, ScheduleId,
33 SendData, StageRef, StageResponse,
34 adapter::{Adapter, StageOrAdapter, find_recipient},
35 effect::{CallExtra, CanSupervise, ScheduleIds, StageEffect},
36 effect_box::EffectBox,
37 simulation::{
38 blocked::{Blocked, SendBlock},
39 inputs::Inputs,
40 random::EvalStrategy,
41 running::{
42 resume::{
43 resume_add_stage_internal, resume_call_internal, resume_call_send_internal,
44 resume_cancel_schedule_internal, resume_clock_internal, resume_contramap_internal,
45 resume_external_internal, resume_receive_internal, resume_schedule_internal, resume_send_internal,
46 resume_wait_internal, resume_wire_stage_internal,
47 },
48 scheduled_runnables::ScheduledRunnables,
49 },
50 state::{StageData, StageState},
51 },
52 stage_name,
53 stage_ref::StageStateRef,
54 stagegraph::StageGraphRunning,
55 time::Clock,
56 trace_buffer::{TerminationReason, TraceBuffer},
57};
58
59mod resume;
60mod scheduled_runnables;
61
62pub struct SimulationRunning {
74 stages: BTreeMap<Name, StageOrAdapter<StageData>>,
75 stage_count: usize,
76 inputs: Inputs,
77 effect: EffectBox,
78 clock: Arc<dyn Clock + Send + Sync>,
79 resources: Resources,
80 runnable: VecDeque<(Name, StageResponse)>,
81 scheduled: ScheduledRunnables,
82 mailbox_size: usize,
83 overrides: Vec<OverrideExternalEffect>,
84 breakpoints: Vec<(Name, Box<dyn Fn(&Effect) -> bool + Send + 'static>)>,
85 schedule_ids: ScheduleIds,
86 trace_buffer: Arc<Mutex<TraceBuffer>>,
87 eval_strategy: Box<dyn EvalStrategy>,
88 terminate: watch::Sender<bool>,
89 termination: watch::Receiver<bool>,
90 external_effects: FuturesUnordered<BoxFuture<'static, (Name, Box<dyn SendData>)>>,
91}
92
93impl SimulationRunning {
94 #[expect(clippy::too_many_arguments)]
95 pub(super) fn new(
96 stages: BTreeMap<Name, StageOrAdapter<StageData>>,
97 inputs: Inputs,
98 effect: EffectBox,
99 clock: Arc<dyn Clock + Send + Sync>,
100 resources: Resources,
101 mailbox_size: usize,
102 schedule_ids: ScheduleIds,
103 trace_buffer: Arc<Mutex<TraceBuffer>>,
104 eval_strategy: Box<dyn EvalStrategy>,
105 ) -> Self {
106 let (terminate, termination) = watch::channel(false);
107 Self {
108 stage_count: stages.len(),
109 stages,
110 inputs,
111 effect,
112 clock,
113 resources,
114 runnable: VecDeque::new(),
115 scheduled: ScheduledRunnables::new(),
116 mailbox_size,
117 overrides: Vec::new(),
118 breakpoints: Vec::new(),
119 schedule_ids,
120 trace_buffer,
121 eval_strategy,
122 terminate,
123 termination,
124 external_effects: FuturesUnordered::new(),
125 }
126 }
127
128 pub fn resources(&self) -> &Resources {
132 &self.resources
133 }
134
135 pub fn trace_buffer(&self) -> &Arc<Mutex<TraceBuffer>> {
136 &self.trace_buffer
137 }
138
139 pub fn has_runnable(&self) -> bool {
141 !self.runnable.is_empty()
142 }
143
144 pub fn breakpoint(&mut self, name: impl AsRef<str>, predicate: impl Fn(&Effect) -> bool + Send + 'static) {
146 self.breakpoints.push((Name::from(name.as_ref()), Box::new(predicate)));
147 }
148
149 pub fn clear_breakpoints(&mut self) {
151 self.breakpoints.clear();
152 }
153
154 pub fn clear_breakpoint(&mut self, name: impl AsRef<str>) {
156 self.breakpoints.retain(|(n, _)| n.as_str() != name.as_ref());
157 }
158
159 pub fn override_external_effect<T: ExternalEffect>(
169 &mut self,
170 remaining: usize,
171 mut transform: impl FnMut(Box<T>) -> OverrideResult<Box<T>, Box<dyn ExternalEffect>> + Send + 'static,
172 ) {
173 self.overrides.push(OverrideExternalEffect::new(
174 remaining,
175 Box::new(move |effect| {
176 if effect.is::<T>() {
177 #[expect(clippy::expect_used)]
180 match transform(effect.cast::<T>().expect("checked above")) {
181 OverrideResult::NoMatch(effect) => OverrideResult::NoMatch(effect as Box<dyn ExternalEffect>),
182 OverrideResult::Handled(msg) => OverrideResult::Handled(msg),
183 OverrideResult::Replaced(effect) => OverrideResult::Replaced(effect),
184 }
185 } else {
186 OverrideResult::NoMatch(effect)
187 }
188 }),
189 ));
190 }
191
192 pub fn now(&self) -> Instant {
194 self.clock.now()
195 }
196
197 pub fn skip_to_next_wakeup(&mut self, mut max_time: Option<Instant>) -> bool {
202 let mut tasks_run = false;
206 while let Some((t, r)) = self.scheduled.wakeup(max_time) {
207 if self.clock.now() < t {
208 self.clock.advance_to(t);
209 self.trace_buffer.lock().push_clock(t);
210 }
211 max_time = Some(t);
213 r(self);
214 tasks_run = true;
215 }
216
217 if !tasks_run && let Some(t) = max_time {
218 self.clock.advance_to(t);
219 self.trace_buffer.lock().push_clock(t);
220 }
221
222 tasks_run
223 }
224
225 pub fn next_wakeup(&self) -> Option<Instant> {
226 self.scheduled.next_wakeup_time()
227 }
228
229 fn schedule_wakeup(&mut self, id: ScheduleId, wakeup: impl FnOnce(&mut SimulationRunning) + Send + 'static) {
230 self.scheduled.schedule(id, Box::new(wakeup));
231 }
232
233 pub fn enqueue_msg<Msg: SendData>(&mut self, sr: impl AsRef<StageRef<Msg>>, msg: impl IntoIterator<Item = Msg>) {
241 for msg in msg.into_iter() {
242 let ok = deliver_message(
243 &mut self.stages,
244 self.mailbox_size,
245 sr.as_ref().name().clone(),
246 Box::new(msg) as Box<dyn SendData>,
247 );
248 if matches!(ok, DeliverMessageResult::Full(..)) {
249 panic!("stage `{}` mailbox is full", sr.as_ref().name());
250 }
251 }
252 }
253
254 pub fn mailbox_len<Msg>(&self, sr: impl AsRef<StageRef<Msg>>) -> usize {
260 let data = self.stages.get(sr.as_ref().name()).assert_stage("which has no mailbox");
261 data.mailbox.len()
262 }
263
264 pub fn get_state<Msg, St: SendData>(&self, sr: &StageStateRef<Msg, St>) -> Option<&St> {
276 let data = self.stages.get(sr.name()).assert_stage("which has no state");
277 match &data.state {
278 StageState::Idle(state) => Some(state.cast_ref::<St>().expect("internal state type error")),
279 _ => None,
280 }
281 }
282
283 pub fn effect(&mut self) -> Effect {
285 self.try_effect().unwrap()
286 }
287
288 pub fn try_effect(&mut self) -> Result<Effect, Blocked> {
293 if self.runnable.is_empty() {
294 let reason = block_reason(self);
295 tracing::debug!("blocking for reason: {:?}", reason);
296 return Err(reason);
297 }
298 let (name, response) = self.eval_strategy.pick_runnable(&mut self.runnable);
299
300 tracing::debug!(name = %name, "resuming stage");
301 self.trace_buffer.lock().push_resume(&name, &response);
302
303 let data = self.stages.get_mut(&name).assert_stage("which is not runnable");
304
305 let effect =
306 poll_stage(&self.trace_buffer, &self.schedule_ids, data, name, response, &self.effect, self.clock.now());
307
308 if !matches!(effect, Effect::Receive { .. }) {
309 self.trace_buffer.lock().push_suspend(&effect);
310 }
311
312 Ok(effect)
313 }
314
315 pub fn try_inputs(&mut self) -> InputsResult {
321 let mut delivered = Vec::new();
322 while let Some(mut envelope) = self.inputs.try_next() {
323 let msg = replace(&mut envelope.msg, Box::new(()));
324 match deliver_message(&mut self.stages, self.mailbox_size, envelope.name.clone(), msg) {
325 DeliverMessageResult::Delivered(_) => {
326 delivered.push(envelope.name);
327 envelope.tx.send(()).ok();
328 }
329 DeliverMessageResult::NotFound => {
330 tracing::warn!(name = %envelope.name, msg = ?envelope.msg, "stage was terminated, skipping input delivery");
331 envelope.tx.send(()).ok();
332 continue; }
334 DeliverMessageResult::Full(_, msg) => {
335 envelope.msg = msg;
336 let name = envelope.name.clone();
337 self.inputs.put_back(envelope);
338 if delivered.is_empty() {
339 return InputsResult::Blocked(name);
340 } else {
341 break;
342 }
343 }
344 }
345 }
346 InputsResult::Delivered(delivered)
347 }
348
349 pub async fn await_external_effect(&mut self) -> Option<Name> {
352 if self.external_effects.is_empty() {
353 return None;
354 }
355 let (at_stage, result) = select! {
356 x = self.external_effects.next() => x?,
357 env = self.inputs.next() => {
358 self.inputs.put_back(env);
359 return None;
360 }
361 };
362
363 let runnable = &mut self.runnable;
364 let run = &mut |name, response| {
365 runnable.push_back((name, response));
366 };
367
368 let Some(data) = self.stages.get_mut(&at_stage) else {
369 tracing::warn!(name = %at_stage, "stage was terminated, skipping external effect delivery");
370 return Some(at_stage);
371 };
372 let data = data.assert_stage("which cannot receive external effects");
373 resume_external_internal(data, result, run).expect("external effect is always runnable");
374 Some(at_stage)
375 }
376
377 pub async fn await_external_input(&mut self) {
379 let envelope = self.inputs.next().await;
380 tracing::debug!(target = %envelope.name, "awaited external input received");
381 self.inputs.put_back(envelope);
382 }
383
384 pub fn run_until_blocked_incl_effects(&mut self, rt: &Handle) -> Blocked {
388 loop {
389 match self.run_until_sleeping_or_blocked() {
390 Blocked::Busy { .. } => {
391 rt.block_on(self.await_external_effect());
392 }
393 Blocked::Sleeping { .. } => {
394 assert!(self.skip_to_next_wakeup(None));
395 }
396 blocked => return blocked,
397 }
398 }
399 }
400
401 pub fn run_until_blocked(&mut self) -> Blocked {
417 loop {
418 match self.run_until_sleeping_or_blocked() {
419 Blocked::Sleeping { .. } => assert!(self.skip_to_next_wakeup(None)),
420 blocked => return blocked,
421 }
422 }
423 }
424
425 pub fn run_until_blocked_or_time(&mut self, time: Instant) -> Blocked {
426 loop {
427 match self.run_until_sleeping_or_blocked() {
428 Blocked::Sleeping { next_wakeup } => {
429 if !self.skip_to_next_wakeup(Some(time)) {
430 return Blocked::Sleeping { next_wakeup };
431 }
432 }
433 blocked => return blocked,
434 }
435 }
436 }
437
438 pub fn run_until_sleeping_or_blocked(&mut self) -> Blocked {
445 self.receive_inputs();
446 loop {
447 if let Some(value) = self.run_effect() {
448 return value;
449 }
450 }
451 }
452
453 pub fn run_one_step(&mut self, rt: &Handle) -> Option<Blocked> {
455 self.receive_inputs();
456 match self.run_effect() {
457 Some(Blocked::Busy { .. }) => {
458 rt.block_on(self.await_external_effect());
459 None
460 }
461 Some(Blocked::Sleeping { .. }) => {
462 assert!(self.skip_to_next_wakeup(None));
463 None
464 }
465 other => other,
466 }
467 }
468
469 pub fn receive_inputs(&mut self) {
470 self.try_inputs();
471 let receiving = self
472 .stages
473 .iter()
474 .filter_map(|(n, d)| {
475 matches!(d, StageOrAdapter::Stage(StageData { waiting: Some(StageEffect::Receive), .. }))
476 .then_some(n.clone())
477 })
478 .collect::<Vec<_>>();
479 for name in receiving {
480 resume_receive_internal(self, &name).ok();
482 }
483 }
484
485 fn run_effect(&mut self) -> Option<Blocked> {
486 let effect = match self.try_effect() {
487 Ok(effect) => effect,
488 Err(blocked) => return Some(blocked),
489 };
490
491 tracing::debug!(runnable = ?self.runnable.iter().map(|r| r.0.as_str()).collect::<Vec<&str>>(), effect = ?effect, "run effect");
492
493 for (name, predicate) in &self.breakpoints {
494 if (predicate)(&effect) {
495 tracing::info!("breakpoint `{}` hit: {:?}", name, effect);
496 return Some(Blocked::Breakpoint(name.clone(), effect));
497 }
498 }
499
500 self.handle_effect(effect)
501 }
502
503 pub fn handle_effect(&mut self, effect: Effect) -> Option<Blocked> {
509 let runnable = &mut self.runnable;
510 let run = &mut |name, response| {
511 tracing::debug!(%name, ?response, "enqueuing stage");
512 runnable.push_back((name, response));
513 };
514
515 match effect {
516 Effect::Receive { at_stage: to } => {
517 match resume_receive_internal(self, &to) {
518 Ok(true) => {}
519 Ok(false) => {
520 return None;
522 }
523 Err(err) => {
524 tracing::warn!(%to, ?err, "cannot resume receive, shutting down simulation");
525 let terminated =
526 err.downcast::<resume::UnsupervisedChildTermination>().map(|e| e.0).unwrap_or(to);
527 return Some(Blocked::Terminated(terminated));
528 }
529 }
530 let Some(StageOrAdapter::Stage(data_to)) = self.stages.get_mut(&to) else {
531 return None;
532 };
533 let (from, msg) = data_to.senders.pop_front()?;
535 post_message(data_to, self.mailbox_size, msg);
536 let data_from = self
537 .stages
538 .get_mut(&from)
539 .log_termination(&from)?
540 .assert_stage("which cannot receive send effects");
541 resume_send_internal(
542 data_from,
543 &mut |name, response| {
544 tracing::debug!(%name, ?response, "enqueuing stage");
545 self.runnable.push_back((name, response));
546 },
547 to.clone(),
548 )
549 .expect("call is always runnable");
550 }
551 Effect::Send { from, to, .. } if to.is_empty() => {
552 tracing::info!(stage = %from, "message send to blackhole dropped");
553 let data_from =
554 self.stages.get_mut(&from).log_termination(&from)?.assert_stage("which cannot emit send effects");
555 resume_send_internal(data_from, run, to.clone()).expect("call is always runnable");
556 }
557 Effect::Send { from, to, msg } => {
558 let is_call = self
559 .stages
560 .get(&from)
561 .map(|d| {
562 matches!(
563 d,
564 StageOrAdapter::Stage(StageData { waiting: Some(StageEffect::Send(_, Some(_), _)), .. })
565 )
566 })
567 .unwrap_or_default();
568 if is_call {
569 let data_from = self
571 .stages
572 .get_mut(&from)
573 .log_termination(&from)?
576 .assert_stage("which cannot receive send effects");
577 let id = resume_send_internal(data_from, run, to.clone()).expect("call is always runnable");
578 if let Some(id) = id {
579 self.scheduled.remove(&id);
580 }
581 let data_to = self.stages.get_mut(&to).log_termination(&to)?.assert_stage("which cannot call");
582 resume_call_internal(data_to, run, id, msg).ok();
584 } else {
585 let mb = self.mailbox_size;
586 let resume = match deliver_message(&mut self.stages, mb, to.clone(), msg) {
587 DeliverMessageResult::Delivered(data_to) => {
588 let name = data_to.name.clone();
590 if let Err(err) = resume_receive_internal(self, &name) {
591 tracing::warn!(%from, %to, ?err, "cannot deliver send, shutting down simulation");
592 let terminated =
593 err.downcast::<resume::UnsupervisedChildTermination>().map(|e| e.0).unwrap_or(name);
594 return Some(Blocked::Terminated(terminated));
595 }
596 Some(from)
597 }
598 DeliverMessageResult::Full(data_to, send_data) => {
599 data_to.senders.push_back((from, send_data));
600 None
601 }
602 DeliverMessageResult::NotFound => {
603 tracing::debug!(stage = %to, "message send to terminated stage dropped");
604 Some(from)
605 }
606 };
607 if let Some(from) = resume {
608 let data_from =
609 self.stages.get_mut(&from).log_termination(&from)?.assert_stage("which cannot have sent");
610 resume_send_internal(
611 data_from,
612 &mut |name, response| {
613 tracing::debug!(%name, ?response, "enqueuing stage");
614 self.runnable.push_back((name, response));
615 },
616 to.clone(),
617 )
618 .expect("call is always runnable");
619 }
620 }
621 }
622 Effect::Call { from, to, duration: _, msg } => {
623 if let Err(err) = resume_call_send_internal(self, from.clone(), to.clone(), msg) {
624 tracing::warn!(%from, %to, %err, "couldn’t deliver call effect");
625 return Some(Blocked::Terminated(from));
626 }
627 }
628 Effect::Clock { at_stage } => {
629 let data = self
630 .stages
631 .get_mut(&at_stage)
632 .log_termination(&at_stage)?
633 .assert_stage("which cannot ask for the clock");
634 resume_clock_internal(data, run, self.clock.now()).expect("clock effect is always runnable");
635 }
636 Effect::Wait { at_stage, duration } => {
637 let now = self.clock.now();
638 let id = self.schedule_ids.next_at(now + duration);
639 self.schedule_wakeup(id, move |sim| {
640 let Some(data) = sim.stages.get_mut(&at_stage) else {
641 tracing::warn!(name = %at_stage, "stage was terminated, skipping wait effect delivery");
642 return;
643 };
644 resume_wait_internal(
645 data.assert_stage("which cannot wait"),
646 &mut |name, response| {
647 tracing::debug!(%name, ?response, "enqueuing stage");
648 sim.runnable.push_back((name, response));
649 },
650 sim.clock.now(),
651 )
652 .expect("wait effect is always runnable");
653 });
654 }
655 Effect::Schedule { at_stage, msg, id } => {
656 let data =
657 self.stages.get_mut(&at_stage).log_termination(&at_stage)?.assert_stage("which cannot schedule");
658 resume_schedule_internal(data, run, id).expect("schedule effect is always runnable");
659 let now = self.clock.now();
661 if id.time() > now {
662 self.schedule_wakeup(id, {
664 move |sim| {
665 let _ = deliver_message(&mut sim.stages, sim.mailbox_size, at_stage, msg);
666 }
667 });
668 } else {
669 let _ = deliver_message(&mut self.stages, self.mailbox_size, at_stage, msg);
671 }
672 }
673 Effect::CancelSchedule { at_stage, id } => {
674 let cancelled = self.scheduled.remove(&id).is_some();
675 let data = self
676 .stages
677 .get_mut(&at_stage)
678 .log_termination(&at_stage)?
679 .assert_stage("which cannot cancel schedule");
680 resume_cancel_schedule_internal(data, run, cancelled)
681 .expect("cancel_schedule effect is always runnable");
682 }
683 Effect::External { at_stage, mut effect } => {
684 let mut result = None;
685 for idx in 0..self.overrides.len() {
686 let over = &mut self.overrides[idx];
687 match over.transform(effect) {
688 OverrideResult::NoMatch(effect2) => {
689 effect = effect2;
690 }
691 OverrideResult::Handled(msg) => {
692 result = Some(msg);
693 effect = Box::new(());
695 if over.register_use_and_get_removal() {
696 self.overrides.remove(idx);
697 }
698 break;
699 }
700 OverrideResult::Replaced(effect2) => {
701 effect = effect2;
702 if over.register_use_and_get_removal() {
703 self.overrides.remove(idx);
704 }
705 break;
706 }
707 }
708 }
709 if let Some(result) = result {
710 let data = self
711 .stages
712 .get_mut(&at_stage)
713 .log_termination(&at_stage)?
714 .assert_stage("which cannot receive external effects");
715 resume_external_internal(data, result, run).expect("external effect is always runnable");
716 return None;
717 }
718 let resources = self.resources.clone();
719 self.external_effects.push(Box::pin(async move { (at_stage, effect.run(resources).await) }));
720 }
721 Effect::Terminate { at_stage } => {
722 tracing::info!(stage = %at_stage, "terminated");
723 let (supervised_by, msg) = self.terminate_stage(at_stage.clone(), TerminationReason::Voluntary)?;
724 if supervised_by == *BLACKHOLE_NAME {
725 self.terminate.send_replace(true);
727 return Some(Blocked::Terminated(at_stage));
728 }
729 let supervisor = self.stages.get_mut(&supervised_by).assert_stage("which cannot supervise");
730 supervisor.tombstones.push_back(msg);
731 if let Err(err) = resume_receive_internal(self, &supervised_by) {
732 tracing::warn!(%supervised_by, ?err, "shutting down simulation");
733 let terminated =
734 err.downcast::<resume::UnsupervisedChildTermination>().map(|e| e.0).unwrap_or(supervised_by);
735 return Some(Blocked::Terminated(terminated));
736 }
737 }
738 Effect::AddStage { at_stage, name } => {
739 let name = stage_name(&mut self.stage_count, name.as_str());
740 let data =
741 self.stages.get_mut(&at_stage).log_termination(&at_stage)?.assert_stage("which cannot add a stage");
742 resume_add_stage_internal(data, run, name).expect("add stage effect is always runnable");
743 }
744 Effect::WireStage { at_stage, name, initial_state, tombstone } => {
745 self.trace_buffer.lock().push_state(&name, &initial_state);
746 let data = self
747 .stages
748 .get_mut(&at_stage)
749 .log_termination(&at_stage)?
750 .assert_stage("which cannot wire a stage");
751 let transition = resume_wire_stage_internal(data, run).expect("wire stage effect is always runnable");
752 let tombstone = tombstone.try_cast::<CanSupervise>().err();
753 self.stages.insert(
754 name.clone(),
755 StageOrAdapter::Stage(StageData {
756 name,
757 mailbox: VecDeque::new(),
758 tombstones: VecDeque::new(),
759 state: StageState::Idle(initial_state),
760 transition: (transition)(self.effect.clone()),
761 waiting: Some(StageEffect::Receive),
762 senders: VecDeque::new(),
763 supervised_by: at_stage,
764 tombstone,
765 }),
766 );
767 }
768 Effect::Contramap { at_stage, original, new_name } => {
769 let name = stage_name(&mut self.stage_count, new_name.as_str());
770 let data = self.stages.get_mut(&at_stage).assert_stage("which cannot call contramap");
771 let transform = resume_contramap_internal(data, run, original.clone(), name.clone())
772 .expect("contramap effect is always runnable");
773 self.stages
774 .insert(name.clone(), StageOrAdapter::Adapter(Adapter { name, target: original, transform }));
775 }
776 }
777 None
778 }
779
780 fn terminate_stage(
785 &mut self,
786 at_stage: Name,
787 reason: TerminationReason,
788 ) -> Option<(Name, Result<Box<dyn SendData>, Name>)> {
789 let Some(data) = self.stages.get_mut(&at_stage) else {
794 tracing::warn!(name = %at_stage, "stage was already terminated, skipping terminate stage effect");
795 return None;
796 };
797 let data = data.assert_stage("which cannot terminate");
798
799 data.state = StageState::Terminating;
802
803 self.runnable.retain(|(n, _)| n != &at_stage);
805
806 let runnable = &mut self.runnable;
807 let run = &mut |name, response| {
808 tracing::debug!(%name, ?response, "enqueuing stage");
809 runnable.push_back((name, response));
810 };
811 let senders = std::mem::take(&mut data.senders);
812 for (waiting, _) in senders {
813 let data = self.stages.get_mut(&waiting).assert_stage("which cannot send");
814 if let Err(err) = resume_send_internal(data, run, at_stage.clone()) {
815 tracing::error!(from = %waiting, to = %at_stage, %err, "failed to resume send");
816 continue;
817 };
818 }
819
820 let children = self
821 .stages
822 .iter()
823 .filter(|(_, d)| {
824 matches!(d, StageOrAdapter::Stage(StageData { supervised_by, .. })
825 if supervised_by == &at_stage)
826 })
827 .map(|(n, _)| n.clone())
828 .collect::<Vec<_>>();
829 for child in children {
830 tracing::info!(stage = %child, parent = %at_stage, "terminating child stage");
831 self.terminate_stage(child, TerminationReason::Aborted);
832 }
833 self.trace_buffer.lock().push_terminated(&at_stage, reason);
834 let Some(StageOrAdapter::Stage(stage)) = self.stages.remove(&at_stage) else {
835 unreachable!();
836 };
837 Some((stage.supervised_by, stage.tombstone.ok_or(at_stage)))
838 }
839
840 #[cfg(test)]
845 fn invariants(&self) {
846 for (name, data) in &self.stages {
847 let StageOrAdapter::Stage(data) = data else {
848 if self.runnable.iter().any(|(n, _)| n == name) {
849 panic!("stage `{name}` is runnable but is an adapter");
850 }
851 continue;
852 };
853 let waiting = &data.waiting;
854 match &data.state {
855 StageState::Idle(_) => {
856 if !matches!(waiting, Some(StageEffect::Receive)) {
857 panic!("stage `{name}` is Idle but waiting for {waiting:?}");
858 }
859 }
860 StageState::Running(_) => {
861 if matches!(waiting, Some(StageEffect::Receive)) {
862 panic!("stage `{name}` is Running but waiting for Receive");
863 }
864 }
865 StageState::Terminating => {
866 if waiting.is_some() {
867 panic!("stage `{name}` is Terminating but waiting for {waiting:?}");
868 }
869 return;
870 }
871 }
872 let waiting = waiting.is_some();
873 let runnable = self.runnable.iter().any(|(n, _)| n == name);
874 if waiting && runnable {
875 panic!("stage `{name}` is waiting for an effect and runnable");
876 }
877 if !waiting && !runnable {
878 panic!("stage `{name}` is not waiting for an effect and not runnable");
879 }
880 }
881 }
882
883 pub fn resume_receive<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>) -> anyhow::Result<()> {
885 resume_receive_internal(self, at_stage.as_ref().name()).and_then(|resumed| {
886 if resumed { Ok(()) } else { Err(anyhow::anyhow!("stage was not waiting for a receive effect")) }
887 })
888 }
889
890 pub fn resume_send<Msg1, Msg2: SendData>(
896 &mut self,
897 from: impl AsRef<StageRef<Msg1>>,
898 to: impl AsRef<StageRef<Msg2>>,
899 mut msg: Option<Msg2>,
900 ) -> anyhow::Result<()> {
901 let to = to.as_ref();
902 if to.extra().is_none()
903 && let Some(msg) = msg.take()
904 && deliver_message(&mut self.stages, self.mailbox_size, to.name().clone(), Box::new(msg)).is_full()
905 {
906 anyhow::bail!("mailbox is full while resuming send");
907 }
908
909 let data = self.stages.get_mut(from.as_ref().name()).assert_stage("which cannot send");
910 let id = resume_send_internal(
911 data,
912 &mut |name, response| {
913 tracing::debug!(%name, ?response, "enqueuing stage");
914 self.runnable.push_back((name, response));
915 },
916 to.name().clone(),
917 )?;
918
919 if let Some(id) = id
920 && let Some(msg) = msg
921 {
922 self.scheduled.remove(&id);
923 let data = self.stages.get_mut(to.name()).assert_stage("which cannot call");
924 resume_call_internal(
925 data,
926 &mut |name, response| {
927 tracing::debug!(%name, ?response, "enqueuing stage");
928 self.runnable.push_back((name, response));
929 },
930 Some(id),
931 Box::new(msg),
932 )?;
933 }
934
935 Ok(())
936 }
937
938 pub fn resume_clock<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>, time: Instant) -> anyhow::Result<()> {
944 let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot ask for the clock");
945 resume_clock_internal(
946 data,
947 &mut |name, response| {
948 tracing::debug!(%name, ?response, "enqueuing stage");
949 self.runnable.push_back((name, response));
950 },
951 time,
952 )
953 }
954
955 pub fn resume_wait<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>, time: Instant) -> anyhow::Result<()> {
963 let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot wait");
964 resume_wait_internal(
965 data,
966 &mut |name, response| {
967 tracing::debug!(%name, ?response, "enqueuing stage");
968 self.runnable.push_back((name, response));
969 },
970 time,
971 )
972 }
973
974 pub fn resume_call_send<Msg: SendData, Msg2: SendData>(
980 &mut self,
981 from: impl AsRef<StageRef<Msg>>,
982 to: impl AsRef<StageRef<Msg2>>,
983 msg: Msg2,
984 ) -> anyhow::Result<()> {
985 resume_call_send_internal(self, from.as_ref().name().clone(), to.as_ref().name().clone(), Box::new(msg))
986 .and_then(
987 |resumed| {
988 if resumed { Ok(()) } else { Err(anyhow::anyhow!("stage was not waiting for a call effect")) }
989 },
990 )
991 }
992
993 pub fn resume_call<Msg: SendData, Resp: SendData>(
999 &mut self,
1000 at_stage: impl AsRef<StageRef<Msg>>,
1001 msg: Resp,
1002 ) -> anyhow::Result<()> {
1003 let at_stage = at_stage.as_ref();
1004 let data = self.stages.get_mut(at_stage.name()).assert_stage("which cannot make a call");
1005 resume_call_internal(
1006 data,
1007 &mut |name, response| {
1008 tracing::debug!(%name, ?response, "enqueuing stage");
1009 self.runnable.push_back((name, response));
1010 },
1011 None,
1012 Box::new(msg),
1013 )
1014 }
1015
1016 pub fn resume_external_box(&mut self, at_stage: impl AsRef<Name>, result: Box<dyn SendData>) -> anyhow::Result<()> {
1022 let data = self.stages.get_mut(at_stage.as_ref()).assert_stage("which cannot receive external effects");
1023 resume_external_internal(data, result, &mut |name, response| {
1024 tracing::debug!(%name, ?response, "enqueuing stage");
1025 self.runnable.push_back((name, response));
1026 })
1027 }
1028
1029 pub fn resume_external<Eff: ExternalEffectAPI>(
1035 &mut self,
1036 at_stage: impl AsRef<Name>,
1037 result: Eff::Response,
1038 ) -> anyhow::Result<()> {
1039 let data = self.stages.get_mut(at_stage.as_ref()).assert_stage("which cannot receive external effects");
1040 resume_external_internal(data, Box::new(result), &mut |name, response| {
1041 tracing::debug!(%name, ?response, "enqueuing stage");
1042 self.runnable.push_back((name, response));
1043 })
1044 }
1045
1046 pub fn resume_add_stage<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>, name: Name) -> anyhow::Result<()> {
1052 let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot add a stage");
1053 resume_add_stage_internal(
1054 data,
1055 &mut |name, response| {
1056 tracing::debug!(%name, ?response, "enqueuing stage");
1057 self.runnable.push_back((name, response));
1058 },
1059 name,
1060 )
1061 }
1062
1063 pub fn resume_wire_stage<Msg>(
1069 &mut self,
1070 at_stage: impl AsRef<StageRef<Msg>>,
1071 name: Name,
1072 initial_state: Box<dyn SendData>,
1073 tombstone: Option<Box<dyn SendData>>,
1074 ) -> anyhow::Result<()> {
1075 let at_stage = at_stage.as_ref();
1076 let data = self.stages.get_mut(at_stage.name()).assert_stage("which cannot wire a stage");
1077 let transition = resume_wire_stage_internal(data, &mut |name, response| {
1078 tracing::debug!(%name, ?response, "enqueuing stage");
1079 self.runnable.push_back((name, response));
1080 })?;
1081
1082 self.stages.insert(
1083 name.clone(),
1084 StageOrAdapter::Stage(StageData {
1085 name,
1086 mailbox: VecDeque::new(),
1087 tombstones: VecDeque::new(),
1088 state: StageState::Idle(initial_state),
1089 transition: (transition)(self.effect.clone()),
1090 waiting: Some(StageEffect::Receive),
1091 senders: VecDeque::new(),
1092 supervised_by: at_stage.name().clone(),
1093 tombstone,
1094 }),
1095 );
1096 Ok(())
1097 }
1098
1099 pub fn resume_contramap<Msg>(
1101 &mut self,
1102 at_stage: impl AsRef<StageRef<Msg>>,
1103 original: Name,
1104 name: Name,
1105 ) -> anyhow::Result<()> {
1106 let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot contramap");
1107 let transform = resume_contramap_internal(
1108 data,
1109 &mut |name, response| {
1110 tracing::debug!(%name, ?response, "enqueuing stage");
1111 self.runnable.push_back((name, response));
1112 },
1113 original.clone(),
1114 name.clone(),
1115 )?;
1116 self.stages.insert(name.clone(), StageOrAdapter::Adapter(Adapter { name, target: original, transform }));
1117 Ok(())
1118 }
1119}
1120
1121trait AssertStage<'a> {
1122 type Output: 'a;
1123 fn assert_stage(self, hint: &'static str) -> Self::Output
1124 where
1125 Self: 'a;
1126}
1127impl<'a> AssertStage<'a> for &'a mut StageOrAdapter<StageData> {
1128 type Output = &'a mut StageData;
1129 fn assert_stage(self, hint: &'static str) -> Self::Output {
1130 match self {
1131 StageOrAdapter::Stage(stage) => stage,
1132 StageOrAdapter::Adapter(_) => {
1133 panic!("stage is an adapter, {hint}")
1134 }
1135 }
1136 }
1137}
1138impl<'a> AssertStage<'a> for Option<&'a mut StageOrAdapter<StageData>> {
1139 type Output = &'a mut StageData;
1140 fn assert_stage(self, hint: &'static str) -> Self::Output {
1141 let this = match self {
1142 Some(this) => this,
1143 None => panic!("stage not found"),
1144 };
1145 match this {
1146 StageOrAdapter::Stage(stage) => stage,
1147 StageOrAdapter::Adapter(_) => {
1148 panic!("stage is an adapter, {hint}")
1149 }
1150 }
1151 }
1152}
1153impl<'a> AssertStage<'a> for Option<&'a StageOrAdapter<StageData>> {
1154 type Output = &'a StageData;
1155 fn assert_stage(self, hint: &'static str) -> Self::Output {
1156 let this = match self {
1157 Some(this) => this,
1158 None => panic!("stage not found"),
1159 };
1160 match this {
1161 StageOrAdapter::Stage(stage) => stage,
1162 StageOrAdapter::Adapter(_) => {
1163 panic!("stage is an adapter, {hint}")
1164 }
1165 }
1166 }
1167}
1168
1169trait LogTermination<'a> {
1170 type Output: 'a;
1171 fn log_termination(self, name: &Name) -> Self::Output
1172 where
1173 Self: 'a;
1174}
1175impl<'a> LogTermination<'a> for Option<&'a mut StageOrAdapter<StageData>> {
1176 type Output = Option<&'a mut StageOrAdapter<StageData>>;
1177 fn log_termination(self, name: &Name) -> Self::Output {
1178 if self.is_none() {
1179 tracing::warn!(%name, "stage was terminated, skipping effect handling");
1180 }
1181 self
1182 }
1183}
1184
1185impl StageGraphRunning for SimulationRunning {
1186 fn is_terminated(&self) -> bool {
1187 *self.termination.borrow()
1188 }
1189
1190 fn termination(&self) -> BoxFuture<'static, ()> {
1191 let mut rx = self.termination.clone();
1192 Box::pin(async move {
1193 rx.wait_for(|x| *x).await.ok();
1194 })
1195 }
1196}
1197
1198mod override_external_effect {
1200 use super::*;
1201
1202 pub struct OverrideExternalEffect {
1203 remaining: usize,
1204 transform: Box<
1205 dyn FnMut(Box<dyn ExternalEffect>) -> OverrideResult<Box<dyn ExternalEffect>, Box<dyn ExternalEffect>>
1206 + Send
1207 + 'static,
1208 >,
1209 }
1210
1211 pub enum OverrideResult<In, Out> {
1215 NoMatch(In),
1217 Handled(Box<dyn SendData>),
1219 Replaced(Out),
1221 }
1222
1223 impl OverrideExternalEffect {
1224 pub fn new(
1225 remaining: usize,
1226 transform: Box<
1227 dyn FnMut(Box<dyn ExternalEffect>) -> OverrideResult<Box<dyn ExternalEffect>, Box<dyn ExternalEffect>>
1228 + Send
1229 + 'static,
1230 >,
1231 ) -> Self {
1232 Self { remaining, transform }
1233 }
1234
1235 pub fn transform(
1236 &mut self,
1237 effect: Box<dyn ExternalEffect>,
1238 ) -> OverrideResult<Box<dyn ExternalEffect>, Box<dyn ExternalEffect>> {
1239 (self.transform)(effect)
1240 }
1241
1242 pub fn register_use_and_get_removal(&mut self) -> bool {
1243 if self.remaining == usize::MAX {
1244 return false;
1245 }
1246 self.remaining -= 1;
1247 self.remaining == 0
1248 }
1249 }
1250}
1251
1252#[derive(Debug, PartialEq, Eq)]
1253pub enum InputsResult {
1254 Delivered(Vec<Name>),
1255 Blocked(Name),
1256}
1257
1258fn block_reason(sim: &SimulationRunning) -> Blocked {
1259 debug_assert!(sim.runnable.is_empty(), "runnable must be empty");
1260 if sim
1261 .stages
1262 .values()
1263 .filter_map(|d| d.as_stage().and_then(|d| d.waiting.as_ref()))
1264 .all(|v| matches!(v, StageEffect::Receive))
1265 {
1266 if let Some(next_wakeup) = sim.next_wakeup() {
1267 return Blocked::Sleeping { next_wakeup };
1268 } else {
1269 return Blocked::Idle;
1270 }
1271 }
1272 let mut send = Vec::new();
1273 let mut busy = Vec::new();
1274 let mut sleep = Vec::new();
1275 for (k, v) in sim.stages.iter().filter_map(|(k, d)| d.as_stage().and_then(|d| d.waiting.as_ref()).map(|w| (k, w))) {
1276 match v {
1277 StageEffect::Send(name, None, _msg) => {
1278 send.push(SendBlock { from: k.clone(), to: name.clone(), is_call: false })
1279 }
1280 StageEffect::Receive => {}
1281 StageEffect::Wait(..) => sleep.push(k.clone()),
1282 StageEffect::Call(_, _, CallExtra::Scheduled(id)) if sim.scheduled.contains(id) => sleep.push(k.clone()),
1283 _ => busy.push(k.clone()),
1284 }
1285 }
1286
1287 if !busy.is_empty() {
1288 Blocked::Busy { stages: busy, external_effects: sim.external_effects.len() }
1289 } else if !sleep.is_empty() {
1290 Blocked::Sleeping { next_wakeup: sim.next_wakeup().expect("stages are waiting for a wait effect") }
1291 } else if !send.is_empty() {
1292 Blocked::Deadlock(send)
1293 } else {
1294 Blocked::Idle
1295 }
1296}
1297
1298pub(crate) fn poll_stage(
1303 trace_buffer: &Arc<Mutex<TraceBuffer>>,
1304 schedule_ids: &ScheduleIds,
1305 data: &mut StageData,
1306 name: Name,
1307 response: StageResponse,
1308 effect: &EffectBox,
1309 now: Instant,
1310) -> Effect {
1311 let StageState::Running(pin) = &mut data.state else {
1312 panic!("runnable stage `{name}` is not running but {:?}", data.state);
1313 };
1314
1315 *effect.lock() = Some(Right(response));
1316 let result = pin.as_mut().poll(&mut Context::from_waker(Waker::noop()));
1317
1318 if let Poll::Ready(state) = result {
1319 trace_buffer.lock().push_state(&name, &state);
1320 data.state = StageState::Idle(state);
1321 data.waiting = Some(StageEffect::Receive);
1322 Effect::Receive { at_stage: name }
1323 } else {
1324 let stage_effect = match effect.lock().take() {
1325 Some(Left(effect)) => effect,
1326 Some(Right(response)) => {
1327 panic!("found response {response:?} instead of effect when polling stage `{name}`")
1328 }
1329 None => {
1330 panic!("stage `{name}` returned without awaiting any tracked effect")
1331 }
1332 };
1333 let (wait_effect, effect) = stage_effect.split(name.clone(), schedule_ids, now);
1334 if !matches!(wait_effect, StageEffect::Terminate) {
1335 data.waiting = Some(wait_effect);
1336 }
1337 effect
1338 }
1339}
1340
1341enum DeliverMessageResult<'a> {
1342 Delivered(&'a mut StageData),
1343 Full(&'a mut StageData, Box<dyn SendData>),
1344 NotFound,
1345}
1346
1347impl<'a> DeliverMessageResult<'a> {
1348 pub fn is_full(&self) -> bool {
1349 matches!(self, DeliverMessageResult::Full(..))
1350 }
1351}
1352
1353fn deliver_message(
1358 stages: &mut BTreeMap<Name, StageOrAdapter<StageData>>,
1359 mailbox_size: usize,
1360 name: Name,
1361 msg: Box<dyn SendData>,
1362) -> DeliverMessageResult<'_> {
1363 let Some((data, msg)) = find_recipient(stages, name, Some(msg)) else {
1364 return DeliverMessageResult::NotFound;
1365 };
1366
1367 post_message(data, mailbox_size, msg)
1368}
1369
1370fn post_message(data: &mut StageData, mailbox_size: usize, msg: Box<dyn SendData>) -> DeliverMessageResult<'_> {
1371 if data.mailbox.len() >= mailbox_size {
1372 return DeliverMessageResult::Full(data, msg);
1373 }
1374 data.mailbox.push_back(msg);
1375 DeliverMessageResult::Delivered(data)
1376}
1377
1378#[test]
1379fn simulation_invariants() {
1380 use crate::StageGraph;
1381
1382 tracing_subscriber::fmt()
1383 .with_test_writer()
1384 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1385 .try_init()
1386 .ok();
1387
1388 #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
1389 struct Msg(Option<StageRef<()>>);
1390
1391 let mut network = crate::simulation::SimulationBuilder::default();
1392 let stage = network.stage("stage", async |_state, _msg: Msg, eff| {
1393 eff.send(&eff.me(), Msg(None)).await;
1394 eff.clock().await;
1395 eff.wait(std::time::Duration::from_secs(1)).await;
1396 eff.call(&eff.me(), std::time::Duration::from_secs(1), |cr| Msg(Some(cr))).await;
1397 true
1398 });
1399
1400 let stage = network.wire_up(stage, false);
1401 let mut sim = network.run();
1402
1403 #[expect(clippy::type_complexity)]
1404 let ops: [(
1405 Box<dyn Fn(&Effect) -> bool>,
1406 Box<dyn Fn(&mut SimulationRunning, &StageRef<Msg>) -> anyhow::Result<()>>,
1407 &'static str,
1408 ); _] = [
1409 (
1410 Box::new(|eff: &Effect| matches!(eff, Effect::Receive { .. })),
1411 Box::new(|sim, stage| sim.resume_receive(stage)),
1412 "resume_receive",
1413 ),
1414 (
1415 Box::new(|eff: &Effect| matches!(eff, Effect::Send { .. })),
1416 Box::new(|sim, stage| sim.resume_send(stage, stage, Some(Msg(None)))),
1417 "resume_send",
1418 ),
1419 (
1420 Box::new(|eff: &Effect| matches!(eff, Effect::Clock { .. })),
1421 Box::new(|sim, stage| sim.resume_clock(stage, Instant::now())),
1422 "resume_clock",
1423 ),
1424 (
1425 Box::new(|eff: &Effect| matches!(eff, Effect::Wait { .. })),
1426 Box::new(|sim, stage| sim.resume_wait(stage, Instant::now())),
1427 "resume_wait",
1428 ),
1429 (
1430 Box::new(|eff: &Effect| matches!(eff, Effect::Call { .. })),
1431 Box::new(|sim, stage| sim.resume_call(stage, ())),
1432 "resume_call",
1433 ),
1434 ];
1435
1436 sim.invariants();
1437 sim.enqueue_msg(&stage, [Msg(None)]);
1438 sim.invariants();
1439
1440 for idx in 0..ops.len() {
1441 let effect = if idx == 0 { Effect::Receive { at_stage: "stage".into() } } else { sim.effect() };
1442 tracing::info!(effect = ?effect, "effect");
1443 assert!(ops[idx].0(&effect), "effect {effect:?} should match predicate for `{idx}`");
1444 for (pred, op, name) in &ops {
1445 if !pred(&effect) {
1446 tracing::info!("op `{}` should not work", name);
1447 op(&mut sim, &stage.clone().without_state()).unwrap_err();
1448 sim.invariants();
1449 }
1450 }
1451 for (pred, op, name) in &ops {
1452 if pred(&effect) {
1453 tracing::info!("op `{}` should work", name);
1454 op(&mut sim, &stage.clone().without_state()).unwrap();
1455 sim.invariants();
1456 }
1457 }
1458 }
1459 tracing::info!("final invariants");
1460 sim.effect().assert_receive(&stage);
1461 let state = sim.get_state(&stage).unwrap();
1462 assert!(state);
1463}