Skip to main content

pure_stage/simulation/running/
mod.rs

1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(clippy::wildcard_enum_match_arm, clippy::unwrap_used, clippy::panic, clippy::expect_used)]
16
17use std::{
18    collections::{BTreeMap, VecDeque},
19    mem::replace,
20    sync::Arc,
21    task::{Context, Poll, Waker},
22};
23
24use either::Either::{Left, Right};
25use futures_util::{StreamExt, stream::FuturesUnordered};
26use override_external_effect::OverrideExternalEffect;
27pub use override_external_effect::OverrideResult;
28use parking_lot::Mutex;
29use tokio::{runtime::Handle, select, sync::watch};
30
31use crate::{
32    BLACKHOLE_NAME, BoxFuture, Effect, ExternalEffect, ExternalEffectAPI, Instant, Name, Resources, ScheduleId,
33    SendData, StageRef, StageResponse,
34    adapter::{Adapter, StageOrAdapter, find_recipient},
35    effect::{CallExtra, CanSupervise, ScheduleIds, StageEffect},
36    effect_box::EffectBox,
37    simulation::{
38        blocked::{Blocked, SendBlock},
39        inputs::Inputs,
40        random::EvalStrategy,
41        running::{
42            resume::{
43                resume_add_stage_internal, resume_call_internal, resume_call_send_internal,
44                resume_cancel_schedule_internal, resume_clock_internal, resume_contramap_internal,
45                resume_external_internal, resume_receive_internal, resume_schedule_internal, resume_send_internal,
46                resume_wait_internal, resume_wire_stage_internal,
47            },
48            scheduled_runnables::ScheduledRunnables,
49        },
50        state::{StageData, StageState},
51    },
52    stage_name,
53    stage_ref::StageStateRef,
54    stagegraph::StageGraphRunning,
55    time::Clock,
56    trace_buffer::{TerminationReason, TraceBuffer},
57};
58
59mod resume;
60mod scheduled_runnables;
61
62/// A handle to a running [`crate::simulation::SimulationBuilder`].
63///
64/// It allows fine-grained control over single-stepping the simulation and when each
65/// stage effect is resumed (using [`Self::try_effect`] and [`Self::handle_effect`],
66/// respectively). This means that any interleaving of computations can be exercised.
67/// Where this is not needed, you use [`Self::run_until_blocked`] to automate the
68/// sending and receiving of messages within the simulated processing network.
69///
70/// Note that all stages start out in the state of waiting to receive their first message,
71/// so you need to use [`resume_receive`](Self::resume_receive) to get them running.
72/// See also [`run_until_blocked`](Self::run_until_blocked) for how to achieve this.
73pub struct SimulationRunning {
74    stages: BTreeMap<Name, StageOrAdapter<StageData>>,
75    stage_count: usize,
76    inputs: Inputs,
77    effect: EffectBox,
78    clock: Arc<dyn Clock + Send + Sync>,
79    resources: Resources,
80    runnable: VecDeque<(Name, StageResponse)>,
81    scheduled: ScheduledRunnables,
82    mailbox_size: usize,
83    overrides: Vec<OverrideExternalEffect>,
84    breakpoints: Vec<(Name, Box<dyn Fn(&Effect) -> bool + Send + 'static>)>,
85    schedule_ids: ScheduleIds,
86    trace_buffer: Arc<Mutex<TraceBuffer>>,
87    eval_strategy: Box<dyn EvalStrategy>,
88    terminate: watch::Sender<bool>,
89    termination: watch::Receiver<bool>,
90    external_effects: FuturesUnordered<BoxFuture<'static, (Name, Box<dyn SendData>)>>,
91}
92
93impl SimulationRunning {
94    #[expect(clippy::too_many_arguments)]
95    pub(super) fn new(
96        stages: BTreeMap<Name, StageOrAdapter<StageData>>,
97        inputs: Inputs,
98        effect: EffectBox,
99        clock: Arc<dyn Clock + Send + Sync>,
100        resources: Resources,
101        mailbox_size: usize,
102        schedule_ids: ScheduleIds,
103        trace_buffer: Arc<Mutex<TraceBuffer>>,
104        eval_strategy: Box<dyn EvalStrategy>,
105    ) -> Self {
106        let (terminate, termination) = watch::channel(false);
107        Self {
108            stage_count: stages.len(),
109            stages,
110            inputs,
111            effect,
112            clock,
113            resources,
114            runnable: VecDeque::new(),
115            scheduled: ScheduledRunnables::new(),
116            mailbox_size,
117            overrides: Vec::new(),
118            breakpoints: Vec::new(),
119            schedule_ids,
120            trace_buffer,
121            eval_strategy,
122            terminate,
123            termination,
124            external_effects: FuturesUnordered::new(),
125        }
126    }
127
128    /// Get the resources collection for the network.
129    ///
130    /// This can be used during tests to modify the available resources at specific points in time.
131    pub fn resources(&self) -> &Resources {
132        &self.resources
133    }
134
135    pub fn trace_buffer(&self) -> &Arc<Mutex<TraceBuffer>> {
136        &self.trace_buffer
137    }
138
139    /// Return true if some stages are runnable.
140    pub fn has_runnable(&self) -> bool {
141        !self.runnable.is_empty()
142    }
143
144    /// Install a breakpoint that will be hit when an effect matching the given predicate is encountered.
145    pub fn breakpoint(&mut self, name: impl AsRef<str>, predicate: impl Fn(&Effect) -> bool + Send + 'static) {
146        self.breakpoints.push((Name::from(name.as_ref()), Box::new(predicate)));
147    }
148
149    /// Remove all breakpoints.
150    pub fn clear_breakpoints(&mut self) {
151        self.breakpoints.clear();
152    }
153
154    /// Remove the breakpoint with the given name.
155    pub fn clear_breakpoint(&mut self, name: impl AsRef<str>) {
156        self.breakpoints.retain(|(n, _)| n.as_str() != name.as_ref());
157    }
158
159    /// Install an override for the given external effect type.
160    ///
161    /// The `remaining` parameter is the number of times the override will be applied
162    /// (use `usize::MAX` to apply the override indefinitely).
163    /// When the override is applied, the `transform` function is called with the effect
164    /// and the result is used to possibly replace the effect.
165    ///
166    /// If the override result is [`OverrideResult::NoMatch`], the effect is passed to overrides
167    /// installed later than this one.
168    pub fn override_external_effect<T: ExternalEffect>(
169        &mut self,
170        remaining: usize,
171        mut transform: impl FnMut(Box<T>) -> OverrideResult<Box<T>, Box<dyn ExternalEffect>> + Send + 'static,
172    ) {
173        self.overrides.push(OverrideExternalEffect::new(
174            remaining,
175            Box::new(move |effect| {
176                if effect.is::<T>() {
177                    // if this casting turns out to be a significant cost, we can split the
178                    // overrides by TypeId and run each in an appropriately typed closure
179                    #[expect(clippy::expect_used)]
180                    match transform(effect.cast::<T>().expect("checked above")) {
181                        OverrideResult::NoMatch(effect) => OverrideResult::NoMatch(effect as Box<dyn ExternalEffect>),
182                        OverrideResult::Handled(msg) => OverrideResult::Handled(msg),
183                        OverrideResult::Replaced(effect) => OverrideResult::Replaced(effect),
184                    }
185                } else {
186                    OverrideResult::NoMatch(effect)
187                }
188            }),
189        ));
190    }
191
192    /// Get the current simulation time.
193    pub fn now(&self) -> Instant {
194        self.clock.now()
195    }
196
197    /// Advance the clock to the next wakeup time.
198    ///
199    /// Returns `true` if wakeups were performed, `false` if there are no more wakeups or
200    /// the clock was advanced to the given `max_time`.
201    pub fn skip_to_next_wakeup(&mut self, mut max_time: Option<Instant>) -> bool {
202        // Get the runnables that can be woken up until max_time (everything if None)
203        // and run them.
204        // The last wakeup time becomes the new simulation time.
205        let mut tasks_run = false;
206        while let Some((t, r)) = self.scheduled.wakeup(max_time) {
207            if self.clock.now() < t {
208                self.clock.advance_to(t);
209                self.trace_buffer.lock().push_clock(t);
210            }
211            // limit further wakeups to the same time, i.e. the clock only advances once within this method
212            max_time = Some(t);
213            r(self);
214            tasks_run = true;
215        }
216
217        if !tasks_run && let Some(t) = max_time {
218            self.clock.advance_to(t);
219            self.trace_buffer.lock().push_clock(t);
220        }
221
222        tasks_run
223    }
224
225    pub fn next_wakeup(&self) -> Option<Instant> {
226        self.scheduled.next_wakeup_time()
227    }
228
229    fn schedule_wakeup(&mut self, id: ScheduleId, wakeup: impl FnOnce(&mut SimulationRunning) + Send + 'static) {
230        self.scheduled.schedule(id, Box::new(wakeup));
231    }
232
233    /// Place messages in the given stage’s mailbox, but don’t resume it.
234    /// The next message will be consumed when resuming an [`Effect::Receive`]
235    /// for this stage.
236    ///
237    /// # Panics
238    ///
239    /// Panics if the stage name does not exist (which may also happen due to termination).
240    pub fn enqueue_msg<Msg: SendData>(&mut self, sr: impl AsRef<StageRef<Msg>>, msg: impl IntoIterator<Item = Msg>) {
241        for msg in msg.into_iter() {
242            let ok = deliver_message(
243                &mut self.stages,
244                self.mailbox_size,
245                sr.as_ref().name().clone(),
246                Box::new(msg) as Box<dyn SendData>,
247            );
248            if matches!(ok, DeliverMessageResult::Full(..)) {
249                panic!("stage `{}` mailbox is full", sr.as_ref().name());
250            }
251        }
252    }
253
254    /// Retrieve the number of messages currently in the given stage’s mailbox.
255    ///
256    /// # Panics
257    ///
258    /// Panics if the stage name does not exist (which may also happen due to termination).
259    pub fn mailbox_len<Msg>(&self, sr: impl AsRef<StageRef<Msg>>) -> usize {
260        let data = self.stages.get(sr.as_ref().name()).assert_stage("which has no mailbox");
261        data.mailbox.len()
262    }
263
264    /// Obtain a reference to the current state of the given stage.
265    /// This only works while the stage is suspended on an [`Effect::Receive`]
266    /// because otherwise the state is captured by the opaque `Future` returned
267    /// from the state transition function.
268    ///
269    /// Returns `None` if the stage is not suspended on [`Effect::Receive`], panics if the
270    /// state type is incorrect.
271    ///
272    /// # Panics
273    ///
274    /// Panics if the stage name does not exist (which may also happen due to termination).
275    pub fn get_state<Msg, St: SendData>(&self, sr: &StageStateRef<Msg, St>) -> Option<&St> {
276        let data = self.stages.get(sr.name()).assert_stage("which has no state");
277        match &data.state {
278            StageState::Idle(state) => Some(state.cast_ref::<St>().expect("internal state type error")),
279            _ => None,
280        }
281    }
282
283    /// Assert that a simulation step can be taken, take it and return the resulting effect.
284    pub fn effect(&mut self) -> Effect {
285        self.try_effect().unwrap()
286    }
287
288    /// If any stage is runnable, run it and return the resulting effect; otherwise return
289    /// the classification of why no step can be taken (can be because the network is idle
290    /// and needs more inputs, it could be deadlocked, or a stage is still suspended on an
291    /// effect other than send (the latter case is called “busy” for want of a better term).
292    pub fn try_effect(&mut self) -> Result<Effect, Blocked> {
293        if self.runnable.is_empty() {
294            let reason = block_reason(self);
295            tracing::debug!("blocking for reason: {:?}", reason);
296            return Err(reason);
297        }
298        let (name, response) = self.eval_strategy.pick_runnable(&mut self.runnable);
299
300        tracing::debug!(name = %name, "resuming stage");
301        self.trace_buffer.lock().push_resume(&name, &response);
302
303        let data = self.stages.get_mut(&name).assert_stage("which is not runnable");
304
305        let effect =
306            poll_stage(&self.trace_buffer, &self.schedule_ids, data, name, response, &self.effect, self.clock.now());
307
308        if !matches!(effect, Effect::Receive { .. }) {
309            self.trace_buffer.lock().push_suspend(&effect);
310        }
311
312        Ok(effect)
313    }
314
315    /// Try to deliver external messages to stages that are waiting for them.
316    ///
317    /// Returns `InputsResult::Delivered(names)` if any messages were delivered,
318    /// or `InputsResult::Blocked(name)` if delivery is blocked because the given
319    /// stage's mailbox is full.
320    pub fn try_inputs(&mut self) -> InputsResult {
321        let mut delivered = Vec::new();
322        while let Some(mut envelope) = self.inputs.try_next() {
323            let msg = replace(&mut envelope.msg, Box::new(()));
324            match deliver_message(&mut self.stages, self.mailbox_size, envelope.name.clone(), msg) {
325                DeliverMessageResult::Delivered(_) => {
326                    delivered.push(envelope.name);
327                    envelope.tx.send(()).ok();
328                }
329                DeliverMessageResult::NotFound => {
330                    tracing::warn!(name = %envelope.name, msg = ?envelope.msg, "stage was terminated, skipping input delivery");
331                    envelope.tx.send(()).ok();
332                    continue; // stage was terminated
333                }
334                DeliverMessageResult::Full(_, msg) => {
335                    envelope.msg = msg;
336                    let name = envelope.name.clone();
337                    self.inputs.put_back(envelope);
338                    if delivered.is_empty() {
339                        return InputsResult::Blocked(name);
340                    } else {
341                        break;
342                    }
343                }
344            }
345        }
346        InputsResult::Delivered(delivered)
347    }
348
349    /// When external effects are currently unresolved, await either the resolution of an effect
350    /// or the arrival of a new external input message.
351    pub async fn await_external_effect(&mut self) -> Option<Name> {
352        if self.external_effects.is_empty() {
353            return None;
354        }
355        let (at_stage, result) = select! {
356            x = self.external_effects.next() => x?,
357            env = self.inputs.next() => {
358                self.inputs.put_back(env);
359                return None;
360            }
361        };
362
363        let runnable = &mut self.runnable;
364        let run = &mut |name, response| {
365            runnable.push_back((name, response));
366        };
367
368        let Some(data) = self.stages.get_mut(&at_stage) else {
369            tracing::warn!(name = %at_stage, "stage was terminated, skipping external effect delivery");
370            return Some(at_stage);
371        };
372        let data = data.assert_stage("which cannot receive external effects");
373        resume_external_internal(data, result, run).expect("external effect is always runnable");
374        Some(at_stage)
375    }
376
377    /// Wait for a message to be enqueued via an external input to the simulation.
378    pub async fn await_external_input(&mut self) {
379        let envelope = self.inputs.next().await;
380        tracing::debug!(target = %envelope.name, "awaited external input received");
381        self.inputs.put_back(envelope);
382    }
383
384    /// Keep alternating between [`Self::run_until_blocked`] and
385    /// [`Self::await_external_effect`] until the simulation is blocked
386    /// without waiting for external effects to be resolved.
387    pub fn run_until_blocked_incl_effects(&mut self, rt: &Handle) -> Blocked {
388        loop {
389            match self.run_until_sleeping_or_blocked() {
390                Blocked::Busy { .. } => {
391                    rt.block_on(self.await_external_effect());
392                }
393                Blocked::Sleeping { .. } => {
394                    assert!(self.skip_to_next_wakeup(None));
395                }
396                blocked => return blocked,
397            }
398        }
399    }
400
401    /// Keep on performing steps using [`Self::try_effect`] while possible and automatically
402    /// resume send and receive effects based on availability of space or messages in the
403    /// mailbox in question.
404    ///
405    /// See [`Self::run_until_sleeping_or_blocked`] for a variant that stops when the simulation is
406    /// waiting for a wakeup.
407    ///
408    /// When hitting a [`breakpoint`](Self::breakpoint), the simulation will return
409    /// `Blocked::Breakpoint`, which allows you to extract the effect in progress
410    /// using [`Blocked::assert_breakpoint`]. The result can later be passed to
411    /// [`Self::handle_effect`] to resume the stage in question.
412    ///
413    /// **NOTE** that `Receive` effects are implicitly attempted to be resumed after completing
414    /// a `Send` operation to that stage or whenever starting `run_until_*` and the stage's mailbox
415    /// is not empty.
416    pub fn run_until_blocked(&mut self) -> Blocked {
417        loop {
418            match self.run_until_sleeping_or_blocked() {
419                Blocked::Sleeping { .. } => assert!(self.skip_to_next_wakeup(None)),
420                blocked => return blocked,
421            }
422        }
423    }
424
425    pub fn run_until_blocked_or_time(&mut self, time: Instant) -> Blocked {
426        loop {
427            match self.run_until_sleeping_or_blocked() {
428                Blocked::Sleeping { next_wakeup } => {
429                    if !self.skip_to_next_wakeup(Some(time)) {
430                        return Blocked::Sleeping { next_wakeup };
431                    }
432                }
433                blocked => return blocked,
434            }
435        }
436    }
437
438    /// Keep on performing steps using [`Self::try_effect`] while possible and automatically
439    /// resume send and receive effects based on availability of space or messages in the
440    /// mailbox in question. It stops when the simulation is waiting for a wakeup.
441    ///
442    /// See [`Self::run_until_blocked`] for a variant that automatically advances
443    /// the clock.
444    pub fn run_until_sleeping_or_blocked(&mut self) -> Blocked {
445        self.receive_inputs();
446        loop {
447            if let Some(value) = self.run_effect() {
448                return value;
449            }
450        }
451    }
452
453    // TODO: shouldn’t this have a clock ceiling?
454    pub fn run_one_step(&mut self, rt: &Handle) -> Option<Blocked> {
455        self.receive_inputs();
456        match self.run_effect() {
457            Some(Blocked::Busy { .. }) => {
458                rt.block_on(self.await_external_effect());
459                None
460            }
461            Some(Blocked::Sleeping { .. }) => {
462                assert!(self.skip_to_next_wakeup(None));
463                None
464            }
465            other => other,
466        }
467    }
468
469    pub fn receive_inputs(&mut self) {
470        self.try_inputs();
471        let receiving = self
472            .stages
473            .iter()
474            .filter_map(|(n, d)| {
475                matches!(d, StageOrAdapter::Stage(StageData { waiting: Some(StageEffect::Receive), .. }))
476                    .then_some(n.clone())
477            })
478            .collect::<Vec<_>>();
479        for name in receiving {
480            // ignore all errors since this is a purely optimistic wake-up
481            resume_receive_internal(self, &name).ok();
482        }
483    }
484
485    fn run_effect(&mut self) -> Option<Blocked> {
486        let effect = match self.try_effect() {
487            Ok(effect) => effect,
488            Err(blocked) => return Some(blocked),
489        };
490
491        tracing::debug!(runnable = ?self.runnable.iter().map(|r| r.0.as_str()).collect::<Vec<&str>>(), effect = ?effect, "run effect");
492
493        for (name, predicate) in &self.breakpoints {
494            if (predicate)(&effect) {
495                tracing::info!("breakpoint `{}` hit: {:?}", name, effect);
496                return Some(Blocked::Breakpoint(name.clone(), effect));
497            }
498        }
499
500        self.handle_effect(effect)
501    }
502
503    /// Handle the given effect as it would be by [`Self::run_until_sleeping_or_blocked`].
504    /// This will resume the affected stage(s), it may involve multiple resumptions.
505    ///
506    /// Inputs to this method can be obtained from [`Self::effect`], [`Self::try_effect`]
507    /// or [`Blocked::assert_breakpoint`].
508    pub fn handle_effect(&mut self, effect: Effect) -> Option<Blocked> {
509        let runnable = &mut self.runnable;
510        let run = &mut |name, response| {
511            tracing::debug!(%name, ?response, "enqueuing stage");
512            runnable.push_back((name, response));
513        };
514
515        match effect {
516            Effect::Receive { at_stage: to } => {
517                match resume_receive_internal(self, &to) {
518                    Ok(true) => {}
519                    Ok(false) => {
520                        // nothing in the mailbox
521                        return None;
522                    }
523                    Err(err) => {
524                        tracing::warn!(%to, ?err, "cannot resume receive, shutting down simulation");
525                        let terminated =
526                            err.downcast::<resume::UnsupervisedChildTermination>().map(|e| e.0).unwrap_or(to);
527                        return Some(Blocked::Terminated(terminated));
528                    }
529                }
530                let Some(StageOrAdapter::Stage(data_to)) = self.stages.get_mut(&to) else {
531                    return None;
532                };
533                // resuming receive has removed one message from the mailbox, so check for blocked senders
534                let (from, msg) = data_to.senders.pop_front()?;
535                post_message(data_to, self.mailbox_size, msg);
536                let data_from = self
537                    .stages
538                    .get_mut(&from)
539                    .log_termination(&from)?
540                    .assert_stage("which cannot receive send effects");
541                resume_send_internal(
542                    data_from,
543                    &mut |name, response| {
544                        tracing::debug!(%name, ?response, "enqueuing stage");
545                        self.runnable.push_back((name, response));
546                    },
547                    to.clone(),
548                )
549                .expect("call is always runnable");
550            }
551            Effect::Send { from, to, .. } if to.is_empty() => {
552                tracing::info!(stage = %from, "message send to blackhole dropped");
553                let data_from =
554                    self.stages.get_mut(&from).log_termination(&from)?.assert_stage("which cannot emit send effects");
555                resume_send_internal(data_from, run, to.clone()).expect("call is always runnable");
556            }
557            Effect::Send { from, to, msg } => {
558                let is_call = self
559                    .stages
560                    .get(&from)
561                    .map(|d| {
562                        matches!(
563                            d,
564                            StageOrAdapter::Stage(StageData { waiting: Some(StageEffect::Send(_, Some(_), _)), .. })
565                        )
566                    })
567                    .unwrap_or_default();
568                if is_call {
569                    // sending stage is always resumed
570                    let data_from = self
571                        .stages
572                        .get_mut(&from)
573                        // if the stage was killed while waiting for its turn in sending this response
574                        // then the response is simply dropped and the call may time out
575                        .log_termination(&from)?
576                        .assert_stage("which cannot receive send effects");
577                    let id = resume_send_internal(data_from, run, to.clone()).expect("call is always runnable");
578                    if let Some(id) = id {
579                        self.scheduled.remove(&id);
580                    }
581                    let data_to = self.stages.get_mut(&to).log_termination(&to)?.assert_stage("which cannot call");
582                    // call response races with other responses and timeout, so failure to resume is okay
583                    resume_call_internal(data_to, run, id, msg).ok();
584                } else {
585                    let mb = self.mailbox_size;
586                    let resume = match deliver_message(&mut self.stages, mb, to.clone(), msg) {
587                        DeliverMessageResult::Delivered(data_to) => {
588                            // `to` may not be suspended on receive, so failure to resume is okay
589                            let name = data_to.name.clone();
590                            if let Err(err) = resume_receive_internal(self, &name) {
591                                tracing::warn!(%from, %to, ?err, "cannot deliver send, shutting down simulation");
592                                let terminated =
593                                    err.downcast::<resume::UnsupervisedChildTermination>().map(|e| e.0).unwrap_or(name);
594                                return Some(Blocked::Terminated(terminated));
595                            }
596                            Some(from)
597                        }
598                        DeliverMessageResult::Full(data_to, send_data) => {
599                            data_to.senders.push_back((from, send_data));
600                            None
601                        }
602                        DeliverMessageResult::NotFound => {
603                            tracing::debug!(stage = %to, "message send to terminated stage dropped");
604                            Some(from)
605                        }
606                    };
607                    if let Some(from) = resume {
608                        let data_from =
609                            self.stages.get_mut(&from).log_termination(&from)?.assert_stage("which cannot have sent");
610                        resume_send_internal(
611                            data_from,
612                            &mut |name, response| {
613                                tracing::debug!(%name, ?response, "enqueuing stage");
614                                self.runnable.push_back((name, response));
615                            },
616                            to.clone(),
617                        )
618                        .expect("call is always runnable");
619                    }
620                }
621            }
622            Effect::Call { from, to, duration: _, msg } => {
623                if let Err(err) = resume_call_send_internal(self, from.clone(), to.clone(), msg) {
624                    tracing::warn!(%from, %to, %err, "couldn’t deliver call effect");
625                    return Some(Blocked::Terminated(from));
626                }
627            }
628            Effect::Clock { at_stage } => {
629                let data = self
630                    .stages
631                    .get_mut(&at_stage)
632                    .log_termination(&at_stage)?
633                    .assert_stage("which cannot ask for the clock");
634                resume_clock_internal(data, run, self.clock.now()).expect("clock effect is always runnable");
635            }
636            Effect::Wait { at_stage, duration } => {
637                let now = self.clock.now();
638                let id = self.schedule_ids.next_at(now + duration);
639                self.schedule_wakeup(id, move |sim| {
640                    let Some(data) = sim.stages.get_mut(&at_stage) else {
641                        tracing::warn!(name = %at_stage, "stage was terminated, skipping wait effect delivery");
642                        return;
643                    };
644                    resume_wait_internal(
645                        data.assert_stage("which cannot wait"),
646                        &mut |name, response| {
647                            tracing::debug!(%name, ?response, "enqueuing stage");
648                            sim.runnable.push_back((name, response));
649                        },
650                        sim.clock.now(),
651                    )
652                    .expect("wait effect is always runnable");
653                });
654            }
655            Effect::Schedule { at_stage, msg, id } => {
656                let data =
657                    self.stages.get_mut(&at_stage).log_termination(&at_stage)?.assert_stage("which cannot schedule");
658                resume_schedule_internal(data, run, id).expect("schedule effect is always runnable");
659                // Now schedule the wakeup (after run is dropped)
660                let now = self.clock.now();
661                if id.time() > now {
662                    // Schedule wakeup
663                    self.schedule_wakeup(id, {
664                        move |sim| {
665                            let _ = deliver_message(&mut sim.stages, sim.mailbox_size, at_stage, msg);
666                        }
667                    });
668                } else {
669                    // Send immediately
670                    let _ = deliver_message(&mut self.stages, self.mailbox_size, at_stage, msg);
671                }
672            }
673            Effect::CancelSchedule { at_stage, id } => {
674                let cancelled = self.scheduled.remove(&id).is_some();
675                let data = self
676                    .stages
677                    .get_mut(&at_stage)
678                    .log_termination(&at_stage)?
679                    .assert_stage("which cannot cancel schedule");
680                resume_cancel_schedule_internal(data, run, cancelled)
681                    .expect("cancel_schedule effect is always runnable");
682            }
683            Effect::External { at_stage, mut effect } => {
684                let mut result = None;
685                for idx in 0..self.overrides.len() {
686                    let over = &mut self.overrides[idx];
687                    match over.transform(effect) {
688                        OverrideResult::NoMatch(effect2) => {
689                            effect = effect2;
690                        }
691                        OverrideResult::Handled(msg) => {
692                            result = Some(msg);
693                            // dummy effect value since we moved out of `effect` and need it later in the other case
694                            effect = Box::new(());
695                            if over.register_use_and_get_removal() {
696                                self.overrides.remove(idx);
697                            }
698                            break;
699                        }
700                        OverrideResult::Replaced(effect2) => {
701                            effect = effect2;
702                            if over.register_use_and_get_removal() {
703                                self.overrides.remove(idx);
704                            }
705                            break;
706                        }
707                    }
708                }
709                if let Some(result) = result {
710                    let data = self
711                        .stages
712                        .get_mut(&at_stage)
713                        .log_termination(&at_stage)?
714                        .assert_stage("which cannot receive external effects");
715                    resume_external_internal(data, result, run).expect("external effect is always runnable");
716                    return None;
717                }
718                let resources = self.resources.clone();
719                self.external_effects.push(Box::pin(async move { (at_stage, effect.run(resources).await) }));
720            }
721            Effect::Terminate { at_stage } => {
722                tracing::info!(stage = %at_stage, "terminated");
723                let (supervised_by, msg) = self.terminate_stage(at_stage.clone(), TerminationReason::Voluntary)?;
724                if supervised_by == *BLACKHOLE_NAME {
725                    // top-level stage terminated, terminate the simulation
726                    self.terminate.send_replace(true);
727                    return Some(Blocked::Terminated(at_stage));
728                }
729                let supervisor = self.stages.get_mut(&supervised_by).assert_stage("which cannot supervise");
730                supervisor.tombstones.push_back(msg);
731                if let Err(err) = resume_receive_internal(self, &supervised_by) {
732                    tracing::warn!(%supervised_by, ?err, "shutting down simulation");
733                    let terminated =
734                        err.downcast::<resume::UnsupervisedChildTermination>().map(|e| e.0).unwrap_or(supervised_by);
735                    return Some(Blocked::Terminated(terminated));
736                }
737            }
738            Effect::AddStage { at_stage, name } => {
739                let name = stage_name(&mut self.stage_count, name.as_str());
740                let data =
741                    self.stages.get_mut(&at_stage).log_termination(&at_stage)?.assert_stage("which cannot add a stage");
742                resume_add_stage_internal(data, run, name).expect("add stage effect is always runnable");
743            }
744            Effect::WireStage { at_stage, name, initial_state, tombstone } => {
745                self.trace_buffer.lock().push_state(&name, &initial_state);
746                let data = self
747                    .stages
748                    .get_mut(&at_stage)
749                    .log_termination(&at_stage)?
750                    .assert_stage("which cannot wire a stage");
751                let transition = resume_wire_stage_internal(data, run).expect("wire stage effect is always runnable");
752                let tombstone = tombstone.try_cast::<CanSupervise>().err();
753                self.stages.insert(
754                    name.clone(),
755                    StageOrAdapter::Stage(StageData {
756                        name,
757                        mailbox: VecDeque::new(),
758                        tombstones: VecDeque::new(),
759                        state: StageState::Idle(initial_state),
760                        transition: (transition)(self.effect.clone()),
761                        waiting: Some(StageEffect::Receive),
762                        senders: VecDeque::new(),
763                        supervised_by: at_stage,
764                        tombstone,
765                    }),
766                );
767            }
768            Effect::Contramap { at_stage, original, new_name } => {
769                let name = stage_name(&mut self.stage_count, new_name.as_str());
770                let data = self.stages.get_mut(&at_stage).assert_stage("which cannot call contramap");
771                let transform = resume_contramap_internal(data, run, original.clone(), name.clone())
772                    .expect("contramap effect is always runnable");
773                self.stages
774                    .insert(name.clone(), StageOrAdapter::Adapter(Adapter { name, target: original, transform }));
775            }
776        }
777        None
778    }
779
780    /// Recursively terminate the given stage and all its children.
781    ///
782    /// This also cleans up the state of all terminated stages in the simulation,
783    /// like run queue and sleeping message senders.
784    fn terminate_stage(
785        &mut self,
786        at_stage: Name,
787        reason: TerminationReason,
788    ) -> Option<(Name, Result<Box<dyn SendData>, Name>)> {
789        // TODO(network):
790        // - add kill switch to scheduled external effects to terminate them
791        // - record source stage for scheduled messages to remove them
792
793        let Some(data) = self.stages.get_mut(&at_stage) else {
794            tracing::warn!(name = %at_stage, "stage was already terminated, skipping terminate stage effect");
795            return None;
796        };
797        let data = data.assert_stage("which cannot terminate");
798
799        // parent state is dropped before the children, but pure-stage states are just dumb data
800        // anyway, so this should usually be what we want
801        data.state = StageState::Terminating;
802
803        // clean up simulation state for this stage
804        self.runnable.retain(|(n, _)| n != &at_stage);
805
806        let runnable = &mut self.runnable;
807        let run = &mut |name, response| {
808            tracing::debug!(%name, ?response, "enqueuing stage");
809            runnable.push_back((name, response));
810        };
811        let senders = std::mem::take(&mut data.senders);
812        for (waiting, _) in senders {
813            let data = self.stages.get_mut(&waiting).assert_stage("which cannot send");
814            if let Err(err) = resume_send_internal(data, run, at_stage.clone()) {
815                tracing::error!(from = %waiting, to = %at_stage, %err, "failed to resume send");
816                continue;
817            };
818        }
819
820        let children = self
821            .stages
822            .iter()
823            .filter(|(_, d)| {
824                matches!(d, StageOrAdapter::Stage(StageData { supervised_by, .. })
825                    if supervised_by == &at_stage)
826            })
827            .map(|(n, _)| n.clone())
828            .collect::<Vec<_>>();
829        for child in children {
830            tracing::info!(stage = %child, parent = %at_stage, "terminating child stage");
831            self.terminate_stage(child, TerminationReason::Aborted);
832        }
833        self.trace_buffer.lock().push_terminated(&at_stage, reason);
834        let Some(StageOrAdapter::Stage(stage)) = self.stages.remove(&at_stage) else {
835            unreachable!();
836        };
837        Some((stage.supervised_by, stage.tombstone.ok_or(at_stage)))
838    }
839
840    /// If a stage is Idle, it is waiting for Receive and NOT runnable.
841    /// If a stage is Running, it may be waiting for a non-Receive effect and may be runnable.
842    /// If a stage is Failed, it is not waiting for any effect and is not runnable.
843    /// A non-Failed stage is either waiting or runnable.
844    #[cfg(test)]
845    fn invariants(&self) {
846        for (name, data) in &self.stages {
847            let StageOrAdapter::Stage(data) = data else {
848                if self.runnable.iter().any(|(n, _)| n == name) {
849                    panic!("stage `{name}` is runnable but is an adapter");
850                }
851                continue;
852            };
853            let waiting = &data.waiting;
854            match &data.state {
855                StageState::Idle(_) => {
856                    if !matches!(waiting, Some(StageEffect::Receive)) {
857                        panic!("stage `{name}` is Idle but waiting for {waiting:?}");
858                    }
859                }
860                StageState::Running(_) => {
861                    if matches!(waiting, Some(StageEffect::Receive)) {
862                        panic!("stage `{name}` is Running but waiting for Receive");
863                    }
864                }
865                StageState::Terminating => {
866                    if waiting.is_some() {
867                        panic!("stage `{name}` is Terminating but waiting for {waiting:?}");
868                    }
869                    return;
870                }
871            }
872            let waiting = waiting.is_some();
873            let runnable = self.runnable.iter().any(|(n, _)| n == name);
874            if waiting && runnable {
875                panic!("stage `{name}` is waiting for an effect and runnable");
876            }
877            if !waiting && !runnable {
878                panic!("stage `{name}` is not waiting for an effect and not runnable");
879            }
880        }
881    }
882
883    /// Resume an [`Effect::Receive`].
884    pub fn resume_receive<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>) -> anyhow::Result<()> {
885        resume_receive_internal(self, at_stage.as_ref().name()).and_then(|resumed| {
886            if resumed { Ok(()) } else { Err(anyhow::anyhow!("stage was not waiting for a receive effect")) }
887        })
888    }
889
890    /// Resume an [`Effect::Send`].
891    ///
892    /// # Panics
893    ///
894    /// Panics if the stage name does not exist (which may also happen due to termination).
895    pub fn resume_send<Msg1, Msg2: SendData>(
896        &mut self,
897        from: impl AsRef<StageRef<Msg1>>,
898        to: impl AsRef<StageRef<Msg2>>,
899        mut msg: Option<Msg2>,
900    ) -> anyhow::Result<()> {
901        let to = to.as_ref();
902        if to.extra().is_none()
903            && let Some(msg) = msg.take()
904            && deliver_message(&mut self.stages, self.mailbox_size, to.name().clone(), Box::new(msg)).is_full()
905        {
906            anyhow::bail!("mailbox is full while resuming send");
907        }
908
909        let data = self.stages.get_mut(from.as_ref().name()).assert_stage("which cannot send");
910        let id = resume_send_internal(
911            data,
912            &mut |name, response| {
913                tracing::debug!(%name, ?response, "enqueuing stage");
914                self.runnable.push_back((name, response));
915            },
916            to.name().clone(),
917        )?;
918
919        if let Some(id) = id
920            && let Some(msg) = msg
921        {
922            self.scheduled.remove(&id);
923            let data = self.stages.get_mut(to.name()).assert_stage("which cannot call");
924            resume_call_internal(
925                data,
926                &mut |name, response| {
927                    tracing::debug!(%name, ?response, "enqueuing stage");
928                    self.runnable.push_back((name, response));
929                },
930                Some(id),
931                Box::new(msg),
932            )?;
933        }
934
935        Ok(())
936    }
937
938    /// Resume an [`Effect::Clock`].
939    ///
940    /// # Panics
941    ///
942    /// Panics if the stage name does not exist (which may also happen due to termination).
943    pub fn resume_clock<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>, time: Instant) -> anyhow::Result<()> {
944        let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot ask for the clock");
945        resume_clock_internal(
946            data,
947            &mut |name, response| {
948                tracing::debug!(%name, ?response, "enqueuing stage");
949                self.runnable.push_back((name, response));
950            },
951            time,
952        )
953    }
954
955    /// Resume an [`Effect::Wait`].
956    ///
957    /// The given time is the clock when the stage wakes up.
958    ///
959    /// # Panics
960    ///
961    /// Panics if the stage name does not exist (which may also happen due to termination).
962    pub fn resume_wait<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>, time: Instant) -> anyhow::Result<()> {
963        let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot wait");
964        resume_wait_internal(
965            data,
966            &mut |name, response| {
967                tracing::debug!(%name, ?response, "enqueuing stage");
968                self.runnable.push_back((name, response));
969            },
970            time,
971        )
972    }
973
974    /// Resume the sending part of a [`Effect::Call`].
975    ///
976    /// # Panics
977    ///
978    /// Panics if the stage name does not exist (which may also happen due to termination).
979    pub fn resume_call_send<Msg: SendData, Msg2: SendData>(
980        &mut self,
981        from: impl AsRef<StageRef<Msg>>,
982        to: impl AsRef<StageRef<Msg2>>,
983        msg: Msg2,
984    ) -> anyhow::Result<()> {
985        resume_call_send_internal(self, from.as_ref().name().clone(), to.as_ref().name().clone(), Box::new(msg))
986            .and_then(
987                |resumed| {
988                    if resumed { Ok(()) } else { Err(anyhow::anyhow!("stage was not waiting for a call effect")) }
989                },
990            )
991    }
992
993    /// Resume an [`Effect::Call`].
994    ///
995    /// # Panics
996    ///
997    /// Panics if the stage name does not exist (which may also happen due to termination).
998    pub fn resume_call<Msg: SendData, Resp: SendData>(
999        &mut self,
1000        at_stage: impl AsRef<StageRef<Msg>>,
1001        msg: Resp,
1002    ) -> anyhow::Result<()> {
1003        let at_stage = at_stage.as_ref();
1004        let data = self.stages.get_mut(at_stage.name()).assert_stage("which cannot make a call");
1005        resume_call_internal(
1006            data,
1007            &mut |name, response| {
1008                tracing::debug!(%name, ?response, "enqueuing stage");
1009                self.runnable.push_back((name, response));
1010            },
1011            None,
1012            Box::new(msg),
1013        )
1014    }
1015
1016    /// Resume an [`Effect::External`].
1017    ///
1018    /// # Panics
1019    ///
1020    /// Panics if the stage name does not exist (which may also happen due to termination).
1021    pub fn resume_external_box(&mut self, at_stage: impl AsRef<Name>, result: Box<dyn SendData>) -> anyhow::Result<()> {
1022        let data = self.stages.get_mut(at_stage.as_ref()).assert_stage("which cannot receive external effects");
1023        resume_external_internal(data, result, &mut |name, response| {
1024            tracing::debug!(%name, ?response, "enqueuing stage");
1025            self.runnable.push_back((name, response));
1026        })
1027    }
1028
1029    /// Resume an [`Effect::External`].
1030    ///
1031    /// # Panics
1032    ///
1033    /// Panics if the stage name does not exist (which may also happen due to termination).
1034    pub fn resume_external<Eff: ExternalEffectAPI>(
1035        &mut self,
1036        at_stage: impl AsRef<Name>,
1037        result: Eff::Response,
1038    ) -> anyhow::Result<()> {
1039        let data = self.stages.get_mut(at_stage.as_ref()).assert_stage("which cannot receive external effects");
1040        resume_external_internal(data, Box::new(result), &mut |name, response| {
1041            tracing::debug!(%name, ?response, "enqueuing stage");
1042            self.runnable.push_back((name, response));
1043        })
1044    }
1045
1046    /// Resume an [`Effect::AddStage`].
1047    ///
1048    /// # Panics
1049    ///
1050    /// Panics if the stage name does not exist (which may also happen due to termination).
1051    pub fn resume_add_stage<Msg>(&mut self, at_stage: impl AsRef<StageRef<Msg>>, name: Name) -> anyhow::Result<()> {
1052        let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot add a stage");
1053        resume_add_stage_internal(
1054            data,
1055            &mut |name, response| {
1056                tracing::debug!(%name, ?response, "enqueuing stage");
1057                self.runnable.push_back((name, response));
1058            },
1059            name,
1060        )
1061    }
1062
1063    /// Resume an [`Effect::WireStage`].
1064    ///
1065    /// # Panics
1066    ///
1067    /// Panics if the stage name does not exist (which may also happen due to termination).
1068    pub fn resume_wire_stage<Msg>(
1069        &mut self,
1070        at_stage: impl AsRef<StageRef<Msg>>,
1071        name: Name,
1072        initial_state: Box<dyn SendData>,
1073        tombstone: Option<Box<dyn SendData>>,
1074    ) -> anyhow::Result<()> {
1075        let at_stage = at_stage.as_ref();
1076        let data = self.stages.get_mut(at_stage.name()).assert_stage("which cannot wire a stage");
1077        let transition = resume_wire_stage_internal(data, &mut |name, response| {
1078            tracing::debug!(%name, ?response, "enqueuing stage");
1079            self.runnable.push_back((name, response));
1080        })?;
1081
1082        self.stages.insert(
1083            name.clone(),
1084            StageOrAdapter::Stage(StageData {
1085                name,
1086                mailbox: VecDeque::new(),
1087                tombstones: VecDeque::new(),
1088                state: StageState::Idle(initial_state),
1089                transition: (transition)(self.effect.clone()),
1090                waiting: Some(StageEffect::Receive),
1091                senders: VecDeque::new(),
1092                supervised_by: at_stage.name().clone(),
1093                tombstone,
1094            }),
1095        );
1096        Ok(())
1097    }
1098
1099    /// Resume an [`Effect::Contramap`].
1100    pub fn resume_contramap<Msg>(
1101        &mut self,
1102        at_stage: impl AsRef<StageRef<Msg>>,
1103        original: Name,
1104        name: Name,
1105    ) -> anyhow::Result<()> {
1106        let data = self.stages.get_mut(at_stage.as_ref().name()).assert_stage("which cannot contramap");
1107        let transform = resume_contramap_internal(
1108            data,
1109            &mut |name, response| {
1110                tracing::debug!(%name, ?response, "enqueuing stage");
1111                self.runnable.push_back((name, response));
1112            },
1113            original.clone(),
1114            name.clone(),
1115        )?;
1116        self.stages.insert(name.clone(), StageOrAdapter::Adapter(Adapter { name, target: original, transform }));
1117        Ok(())
1118    }
1119}
1120
1121trait AssertStage<'a> {
1122    type Output: 'a;
1123    fn assert_stage(self, hint: &'static str) -> Self::Output
1124    where
1125        Self: 'a;
1126}
1127impl<'a> AssertStage<'a> for &'a mut StageOrAdapter<StageData> {
1128    type Output = &'a mut StageData;
1129    fn assert_stage(self, hint: &'static str) -> Self::Output {
1130        match self {
1131            StageOrAdapter::Stage(stage) => stage,
1132            StageOrAdapter::Adapter(_) => {
1133                panic!("stage is an adapter, {hint}")
1134            }
1135        }
1136    }
1137}
1138impl<'a> AssertStage<'a> for Option<&'a mut StageOrAdapter<StageData>> {
1139    type Output = &'a mut StageData;
1140    fn assert_stage(self, hint: &'static str) -> Self::Output {
1141        let this = match self {
1142            Some(this) => this,
1143            None => panic!("stage not found"),
1144        };
1145        match this {
1146            StageOrAdapter::Stage(stage) => stage,
1147            StageOrAdapter::Adapter(_) => {
1148                panic!("stage is an adapter, {hint}")
1149            }
1150        }
1151    }
1152}
1153impl<'a> AssertStage<'a> for Option<&'a StageOrAdapter<StageData>> {
1154    type Output = &'a StageData;
1155    fn assert_stage(self, hint: &'static str) -> Self::Output {
1156        let this = match self {
1157            Some(this) => this,
1158            None => panic!("stage not found"),
1159        };
1160        match this {
1161            StageOrAdapter::Stage(stage) => stage,
1162            StageOrAdapter::Adapter(_) => {
1163                panic!("stage is an adapter, {hint}")
1164            }
1165        }
1166    }
1167}
1168
1169trait LogTermination<'a> {
1170    type Output: 'a;
1171    fn log_termination(self, name: &Name) -> Self::Output
1172    where
1173        Self: 'a;
1174}
1175impl<'a> LogTermination<'a> for Option<&'a mut StageOrAdapter<StageData>> {
1176    type Output = Option<&'a mut StageOrAdapter<StageData>>;
1177    fn log_termination(self, name: &Name) -> Self::Output {
1178        if self.is_none() {
1179            tracing::warn!(%name, "stage was terminated, skipping effect handling");
1180        }
1181        self
1182    }
1183}
1184
1185impl StageGraphRunning for SimulationRunning {
1186    fn is_terminated(&self) -> bool {
1187        *self.termination.borrow()
1188    }
1189
1190    fn termination(&self) -> BoxFuture<'static, ()> {
1191        let mut rx = self.termination.clone();
1192        Box::pin(async move {
1193            rx.wait_for(|x| *x).await.ok();
1194        })
1195    }
1196}
1197
1198// module to make fields actually private
1199mod override_external_effect {
1200    use super::*;
1201
1202    pub struct OverrideExternalEffect {
1203        remaining: usize,
1204        transform: Box<
1205            dyn FnMut(Box<dyn ExternalEffect>) -> OverrideResult<Box<dyn ExternalEffect>, Box<dyn ExternalEffect>>
1206                + Send
1207                + 'static,
1208        >,
1209    }
1210
1211    /// The result of an override.
1212    ///
1213    /// This is used to determine what to do with an effect that has been passed to an override.
1214    pub enum OverrideResult<In, Out> {
1215        /// The effect was not handled and shall be passed to overrides installed later than this one.
1216        NoMatch(In),
1217        /// The effect was handled and the message shall be delivered to the stage as the result.
1218        Handled(Box<dyn SendData>),
1219        /// The effect was replaced by this new effect that will be run instead.
1220        Replaced(Out),
1221    }
1222
1223    impl OverrideExternalEffect {
1224        pub fn new(
1225            remaining: usize,
1226            transform: Box<
1227                dyn FnMut(Box<dyn ExternalEffect>) -> OverrideResult<Box<dyn ExternalEffect>, Box<dyn ExternalEffect>>
1228                    + Send
1229                    + 'static,
1230            >,
1231        ) -> Self {
1232            Self { remaining, transform }
1233        }
1234
1235        pub fn transform(
1236            &mut self,
1237            effect: Box<dyn ExternalEffect>,
1238        ) -> OverrideResult<Box<dyn ExternalEffect>, Box<dyn ExternalEffect>> {
1239            (self.transform)(effect)
1240        }
1241
1242        pub fn register_use_and_get_removal(&mut self) -> bool {
1243            if self.remaining == usize::MAX {
1244                return false;
1245            }
1246            self.remaining -= 1;
1247            self.remaining == 0
1248        }
1249    }
1250}
1251
1252#[derive(Debug, PartialEq, Eq)]
1253pub enum InputsResult {
1254    Delivered(Vec<Name>),
1255    Blocked(Name),
1256}
1257
1258fn block_reason(sim: &SimulationRunning) -> Blocked {
1259    debug_assert!(sim.runnable.is_empty(), "runnable must be empty");
1260    if sim
1261        .stages
1262        .values()
1263        .filter_map(|d| d.as_stage().and_then(|d| d.waiting.as_ref()))
1264        .all(|v| matches!(v, StageEffect::Receive))
1265    {
1266        if let Some(next_wakeup) = sim.next_wakeup() {
1267            return Blocked::Sleeping { next_wakeup };
1268        } else {
1269            return Blocked::Idle;
1270        }
1271    }
1272    let mut send = Vec::new();
1273    let mut busy = Vec::new();
1274    let mut sleep = Vec::new();
1275    for (k, v) in sim.stages.iter().filter_map(|(k, d)| d.as_stage().and_then(|d| d.waiting.as_ref()).map(|w| (k, w))) {
1276        match v {
1277            StageEffect::Send(name, None, _msg) => {
1278                send.push(SendBlock { from: k.clone(), to: name.clone(), is_call: false })
1279            }
1280            StageEffect::Receive => {}
1281            StageEffect::Wait(..) => sleep.push(k.clone()),
1282            StageEffect::Call(_, _, CallExtra::Scheduled(id)) if sim.scheduled.contains(id) => sleep.push(k.clone()),
1283            _ => busy.push(k.clone()),
1284        }
1285    }
1286
1287    if !busy.is_empty() {
1288        Blocked::Busy { stages: busy, external_effects: sim.external_effects.len() }
1289    } else if !sleep.is_empty() {
1290        Blocked::Sleeping { next_wakeup: sim.next_wakeup().expect("stages are waiting for a wait effect") }
1291    } else if !send.is_empty() {
1292        Blocked::Deadlock(send)
1293    } else {
1294        Blocked::Idle
1295    }
1296}
1297
1298/// Poll a stage and return the effect that should be run next.
1299///
1300/// It is used to poll a stage and return the effect that should be run next.
1301/// The `response` is the input with which the stage is resumed.
1302pub(crate) fn poll_stage(
1303    trace_buffer: &Arc<Mutex<TraceBuffer>>,
1304    schedule_ids: &ScheduleIds,
1305    data: &mut StageData,
1306    name: Name,
1307    response: StageResponse,
1308    effect: &EffectBox,
1309    now: Instant,
1310) -> Effect {
1311    let StageState::Running(pin) = &mut data.state else {
1312        panic!("runnable stage `{name}` is not running but {:?}", data.state);
1313    };
1314
1315    *effect.lock() = Some(Right(response));
1316    let result = pin.as_mut().poll(&mut Context::from_waker(Waker::noop()));
1317
1318    if let Poll::Ready(state) = result {
1319        trace_buffer.lock().push_state(&name, &state);
1320        data.state = StageState::Idle(state);
1321        data.waiting = Some(StageEffect::Receive);
1322        Effect::Receive { at_stage: name }
1323    } else {
1324        let stage_effect = match effect.lock().take() {
1325            Some(Left(effect)) => effect,
1326            Some(Right(response)) => {
1327                panic!("found response {response:?} instead of effect when polling stage `{name}`")
1328            }
1329            None => {
1330                panic!("stage `{name}` returned without awaiting any tracked effect")
1331            }
1332        };
1333        let (wait_effect, effect) = stage_effect.split(name.clone(), schedule_ids, now);
1334        if !matches!(wait_effect, StageEffect::Terminate) {
1335            data.waiting = Some(wait_effect);
1336        }
1337        effect
1338    }
1339}
1340
1341enum DeliverMessageResult<'a> {
1342    Delivered(&'a mut StageData),
1343    Full(&'a mut StageData, Box<dyn SendData>),
1344    NotFound,
1345}
1346
1347impl<'a> DeliverMessageResult<'a> {
1348    pub fn is_full(&self) -> bool {
1349        matches!(self, DeliverMessageResult::Full(..))
1350    }
1351}
1352
1353/// Deliver a message to a stage or adapter.
1354///
1355/// Returns `true` if the message was delivered, `false` if the target mailbox
1356/// does not exist, or `Err` if the mailbox is full.
1357fn deliver_message(
1358    stages: &mut BTreeMap<Name, StageOrAdapter<StageData>>,
1359    mailbox_size: usize,
1360    name: Name,
1361    msg: Box<dyn SendData>,
1362) -> DeliverMessageResult<'_> {
1363    let Some((data, msg)) = find_recipient(stages, name, Some(msg)) else {
1364        return DeliverMessageResult::NotFound;
1365    };
1366
1367    post_message(data, mailbox_size, msg)
1368}
1369
1370fn post_message(data: &mut StageData, mailbox_size: usize, msg: Box<dyn SendData>) -> DeliverMessageResult<'_> {
1371    if data.mailbox.len() >= mailbox_size {
1372        return DeliverMessageResult::Full(data, msg);
1373    }
1374    data.mailbox.push_back(msg);
1375    DeliverMessageResult::Delivered(data)
1376}
1377
1378#[test]
1379fn simulation_invariants() {
1380    use crate::StageGraph;
1381
1382    tracing_subscriber::fmt()
1383        .with_test_writer()
1384        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1385        .try_init()
1386        .ok();
1387
1388    #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
1389    struct Msg(Option<StageRef<()>>);
1390
1391    let mut network = crate::simulation::SimulationBuilder::default();
1392    let stage = network.stage("stage", async |_state, _msg: Msg, eff| {
1393        eff.send(&eff.me(), Msg(None)).await;
1394        eff.clock().await;
1395        eff.wait(std::time::Duration::from_secs(1)).await;
1396        eff.call(&eff.me(), std::time::Duration::from_secs(1), |cr| Msg(Some(cr))).await;
1397        true
1398    });
1399
1400    let stage = network.wire_up(stage, false);
1401    let mut sim = network.run();
1402
1403    #[expect(clippy::type_complexity)]
1404    let ops: [(
1405        Box<dyn Fn(&Effect) -> bool>,
1406        Box<dyn Fn(&mut SimulationRunning, &StageRef<Msg>) -> anyhow::Result<()>>,
1407        &'static str,
1408    ); _] = [
1409        (
1410            Box::new(|eff: &Effect| matches!(eff, Effect::Receive { .. })),
1411            Box::new(|sim, stage| sim.resume_receive(stage)),
1412            "resume_receive",
1413        ),
1414        (
1415            Box::new(|eff: &Effect| matches!(eff, Effect::Send { .. })),
1416            Box::new(|sim, stage| sim.resume_send(stage, stage, Some(Msg(None)))),
1417            "resume_send",
1418        ),
1419        (
1420            Box::new(|eff: &Effect| matches!(eff, Effect::Clock { .. })),
1421            Box::new(|sim, stage| sim.resume_clock(stage, Instant::now())),
1422            "resume_clock",
1423        ),
1424        (
1425            Box::new(|eff: &Effect| matches!(eff, Effect::Wait { .. })),
1426            Box::new(|sim, stage| sim.resume_wait(stage, Instant::now())),
1427            "resume_wait",
1428        ),
1429        (
1430            Box::new(|eff: &Effect| matches!(eff, Effect::Call { .. })),
1431            Box::new(|sim, stage| sim.resume_call(stage, ())),
1432            "resume_call",
1433        ),
1434    ];
1435
1436    sim.invariants();
1437    sim.enqueue_msg(&stage, [Msg(None)]);
1438    sim.invariants();
1439
1440    for idx in 0..ops.len() {
1441        let effect = if idx == 0 { Effect::Receive { at_stage: "stage".into() } } else { sim.effect() };
1442        tracing::info!(effect = ?effect, "effect");
1443        assert!(ops[idx].0(&effect), "effect {effect:?} should match predicate for `{idx}`");
1444        for (pred, op, name) in &ops {
1445            if !pred(&effect) {
1446                tracing::info!("op `{}` should not work", name);
1447                op(&mut sim, &stage.clone().without_state()).unwrap_err();
1448                sim.invariants();
1449            }
1450        }
1451        for (pred, op, name) in &ops {
1452            if pred(&effect) {
1453                tracing::info!("op `{}` should work", name);
1454                op(&mut sim, &stage.clone().without_state()).unwrap();
1455                sim.invariants();
1456            }
1457        }
1458    }
1459    tracing::info!("final invariants");
1460    sim.effect().assert_receive(&stage);
1461    let state = sim.get_state(&stage).unwrap();
1462    assert!(state);
1463}