Skip to main content

pure_stage/
effect.rs

1#![expect(clippy::expect_used)]
2// Copyright 2025 PRAGMA
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(not(target_arch = "riscv32"))]
17use std::sync::atomic::AtomicU64;
18use std::{
19    any::{Any, type_name},
20    borrow::Borrow,
21    fmt::{Debug, Display, Formatter},
22    future,
23    marker::PhantomData,
24    sync::Arc,
25    time::Duration,
26};
27
28use cbor4ii::{core::Value, serde::from_slice};
29use futures_util::FutureExt;
30use parking_lot::Mutex;
31#[cfg(target_arch = "riscv32")]
32use parking_lot::Mutex;
33use serde::de::DeserializeOwned;
34
35use crate::{
36    BLACKHOLE_NAME, BoxFuture, Instant, Name, Resources, ScheduleId, SendData, StageBuildRef, StageRef,
37    effect_box::{EffectBox, airlock_effect},
38    serde::{NoDebug, SendDataValue, never, to_cbor},
39    simulation::Transition,
40    time::Clock,
41    trace_buffer::{TraceBuffer, find_next_external_resume, find_next_external_suspend},
42};
43
44/// A handle for performing effects on the current stage.
45///
46/// This is used to send messages to other stages, wait for durations, and call other stages.
47///
48/// The [`StageRef`] is used to obtain a reference to the current stage, which can be used
49/// in messages sent to other stages.
50pub struct Effects<M> {
51    me: StageRef<M>,
52    effect: EffectBox,
53    clock: Arc<dyn Clock + Send + Sync>,
54    resources: Resources,
55    schedule_ids: ScheduleIds,
56    trace_buffer: Arc<Mutex<TraceBuffer>>,
57}
58
59impl<M> Clone for Effects<M> {
60    fn clone(&self) -> Self {
61        Self {
62            me: self.me.clone(),
63            effect: self.effect.clone(),
64            schedule_ids: self.schedule_ids.clone(),
65            clock: self.clock.clone(),
66            resources: self.resources.clone(),
67            trace_buffer: self.trace_buffer.clone(),
68        }
69    }
70}
71
72impl<M> Debug for Effects<M> {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("Effects").field("me", &self.me).field("effect", &self.effect).finish()
75    }
76}
77
78impl<M: SendData> Effects<M> {
79    pub(crate) fn new(
80        me: StageRef<M>,
81        effect: EffectBox,
82        clock: Arc<dyn Clock + Send + Sync>,
83        resources: Resources,
84        schedule_ids: ScheduleIds,
85        trace_buffer: Arc<Mutex<TraceBuffer>>,
86    ) -> Self {
87        Self { me, effect, schedule_ids, clock, resources, trace_buffer }
88    }
89
90    /// Obtain a reference to the current stage.
91    ///
92    /// This is useful for sending to other stages that may want to send or call back.
93    pub fn me(&self) -> StageRef<M> {
94        self.me.clone()
95    }
96
97    /// Obtain a reference to the current stage.
98    ///
99    /// Returns a borrowed reference without cloning. Use this e.g. when sending a
100    /// message to the current stage. For owned access, see [`me()`](Self::me).
101    pub fn me_ref(&self) -> &StageRef<M> {
102        &self.me
103    }
104}
105
/// Generator of [`ScheduleId`]s backed by a reference-counted counter.
///
/// Cloning yields a generator that shares the same counter. On `riscv32` targets
/// the counter is a mutex-protected `u64`; on every other target it is an
/// `AtomicU64`.
#[derive(Debug, Clone)]
pub struct ScheduleIds {
    /// Next id to hand out (atomic variant).
    #[cfg(not(target_arch = "riscv32"))]
    counter: Arc<AtomicU64>,
    /// Next id to hand out (mutex variant).
    #[cfg(target_arch = "riscv32")]
    counter: Arc<parking_lot::Mutex<u64>>,
}
113
114impl Default for ScheduleIds {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl ScheduleIds {
121    pub fn new() -> Self {
122        Self {
123            #[cfg(not(target_arch = "riscv32"))]
124            counter: Arc::new(AtomicU64::new(0)),
125            #[cfg(target_arch = "riscv32")]
126            counter: Arc::new(parking_lot::Mutex::new(0)),
127        }
128    }
129
130    pub fn next_at(&self, instant: Instant) -> ScheduleId {
131        #[cfg(not(target_arch = "riscv32"))]
132        let id = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
133        #[cfg(target_arch = "riscv32")]
134        let id = {
135            let mut guard = self.counter.lock();
136            let id = *guard;
137            *guard += 1;
138            id
139        };
140        ScheduleId::new(id, instant)
141    }
142}
143
144#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
145pub(crate) struct CallTimeout;
146
147impl<M> Effects<M> {
148    /// Send a message to the given stage, blocking the current stage until space has been
149    /// made available in the target stage’s send queue.
150    pub fn send<Msg: SendData>(&self, target: &StageRef<Msg>, msg: Msg) -> BoxFuture<'static, ()> {
151        let call = target.extra().cloned();
152        airlock_effect(&self.effect, StageEffect::Send(target.name().clone(), call, Box::new(msg)), |_eff| Some(()))
153    }
154
155    /// Obtain the current simulation time.
156    pub fn clock(&self) -> BoxFuture<'static, Instant> {
157        airlock_effect(&self.effect, StageEffect::Clock, |eff| match eff {
158            Some(StageResponse::ClockResponse(instant)) => Some(instant),
159            _ => None,
160        })
161    }
162
163    /// Wait for the given duration.
164    pub fn wait(&self, duration: Duration) -> BoxFuture<'static, Instant> {
165        airlock_effect(&self.effect, StageEffect::Wait(duration), |eff| match eff {
166            Some(StageResponse::WaitResponse(instant)) => Some(instant),
167            _ => None,
168        })
169    }
170
171    /// Call the given stage, blocking the current stage until the response is received.
172    ///
173    /// The `msg` closure is called with a reference to the call effect, which can be used
174    /// to respond to the call.
175    ///
176    /// The returned future will resolve to `Some(resp)` if the call was successful, or `None`
177    /// if the call timed out.
178    ///
179    /// # Panics
180    ///
181    /// - If `target` is a call-context StageRef (i.e., carries an `extra()`), which would imply a nested call.
182    ///   This restriction may be lifted in the future.
183    // TODO(rkuhn): lift nested call restriction if/when needed.
184    #[expect(clippy::panic)]
185    #[track_caller]
186    pub fn call<Req: SendData, Resp: SendData + DeserializeOwned>(
187        &self,
188        target: &StageRef<Req>,
189        timeout: Duration,
190        msg: impl FnOnce(StageRef<Resp>) -> Req + Send + 'static,
191    ) -> BoxFuture<'static, Option<Resp>> {
192        if target.extra().is_some() {
193            panic!("cannot answer a call with a call ({} -> {})", self.me.name(), target.name());
194        }
195        let msg = Box::new(move |name: Name, extra: Arc<dyn Any + Send + Sync>| {
196            Box::new(msg(StageRef::new(name).with_extra(extra))) as Box<dyn SendData>
197        }) as CallFn;
198        airlock_effect(
199            &self.effect,
200            StageEffect::Call(target.name().clone(), timeout, CallExtra::CallFn(NoDebug::new(msg))),
201            |eff| match eff {
202                Some(StageResponse::CallResponse(resp)) => {
203                    if resp.typetag_name() == type_name::<CallTimeout>() {
204                        Some(None)
205                    } else {
206                        Some(Some(resp.cast_deserialize::<Resp>().expect("internal message type error")))
207                    }
208                }
209                _ => None,
210            },
211        )
212    }
213
214    /// Schedule a message to be sent to the given stage at a specific instant in the future.
215    ///
216    /// Returns a token that can be used to cancel the scheduled message using [`cancel_schedule`](Self::cancel_schedule).
217    ///
218    /// The scheduled message will be sent at the specified `when` instant. If `when` is in the past,
219    /// the message will be sent immediately.
220    pub fn schedule_at(&self, msg: M, when: Instant) -> BoxFuture<'static, ScheduleId>
221    where
222        M: SendData,
223    {
224        let id = self.schedule_ids.next_at(when);
225        airlock_effect(&self.effect, StageEffect::Schedule(Box::new(msg), id), move |eff| match eff {
226            Some(StageResponse::Unit) => Some(id),
227            _ => None,
228        })
229    }
230
231    /// Schedule a message to be sent to the given stage after a delay.
232    ///
233    /// Returns a token that can be used to cancel the scheduled message using [`cancel_schedule`](Self::cancel_schedule).
234    ///
235    /// The scheduled message will be sent after the specified `delay` duration from now.
236    pub fn schedule_after(&self, msg: M, delay: Duration) -> BoxFuture<'static, ScheduleId>
237    where
238        M: SendData,
239    {
240        let now = self.clock.now();
241        let when = now + delay;
242        self.schedule_at(msg, when)
243    }
244
245    /// Cancel a previously scheduled message.
246    ///
247    /// Returns `true` if the scheduled message was found and cancelled, or `false` if it was
248    /// not found (e.g., it was already sent or already cancelled).
249    pub fn cancel_schedule(&self, id: ScheduleId) -> BoxFuture<'static, bool> {
250        airlock_effect(&self.effect, StageEffect::CancelSchedule(id), |eff| match eff {
251            Some(StageResponse::CancelScheduleResponse(cancelled)) => Some(cancelled),
252            _ => None,
253        })
254    }
255
256    /// Run an effect that is not part of the StageGraph, as an asynchronous effect.
257    pub fn external<T: ExternalEffectAPI>(&self, effect: T) -> BoxFuture<'static, T::Response> {
258        airlock_effect(&self.effect, StageEffect::External(Box::new(effect)), |eff| match eff {
259            Some(StageResponse::ExternalResponse(resp)) => {
260                Some(resp.cast_deserialize::<T::Response>().expect("internal messaging type error"))
261            }
262            _ => None,
263        })
264    }
265
266    /// Run an effect that is not part of the StageGraph as a synchronous effect.
267    /// In that case we return the response directly.
268    #[allow(clippy::panic)]
269    pub fn external_sync<T: ExternalEffectSync + serde::Serialize>(&self, effect: T) -> T::Response {
270        self.trace_buffer.lock().push_suspend_external(self.me.name(), &effect);
271
272        let response = if let Some(fetch_replay) = self.trace_buffer.lock().fetch_replay_mut() {
273            let Some(replayed) = find_next_external_suspend(fetch_replay, self.me.name()) else {
274                panic!("no entry found in fetch replay");
275            };
276            let Effect::External { effect, .. } =
277                from_slice(&to_cbor(&Effect::External { at_stage: self.me.name().clone(), effect: Box::new(effect) }))
278                    .expect("internal replay error")
279            else {
280                panic!("serde roundtrip broken");
281            };
282
283            assert!(
284                replayed.test_eq(&*effect),
285                "replayed effect {replayed:?}\ndoes not match performed effect {effect:?}"
286            );
287            find_next_external_resume(fetch_replay, self.me.name())
288                .expect("no response found in fetch replay")
289                .cast_deserialize::<T::Response>()
290                .expect("internal messaging type error")
291        } else {
292            Box::new(effect)
293                .run(self.resources.clone())
294                .now_or_never()
295                .expect("an external sync effect must complete immediately in sync context")
296                .cast_deserialize::<T::Response>()
297                .expect("internal messaging type error")
298        };
299
300        self.trace_buffer.lock().push_resume_external(self.me.name(), &response);
301        response
302    }
303
304    /// Terminate this stage
305    ///
306    /// This will terminate this stage graph if done from a stage that was created before running the graph.
307    /// This future never resolves, so you can safely return the value to exit the transition function.
308    ///
309    /// Example:
310    ///
311    /// ```ignore
312    /// async |state, msg, eff| {
313    ///     if msg.is_fatal() {
314    ///         return eff.terminate().await;
315    ///     }
316    ///     // ...
317    ///     state
318    /// }
319    /// ```
320    pub fn terminate<T>(&self) -> BoxFuture<'static, T> {
321        airlock_effect(&self.effect, StageEffect::Terminate, |_eff| never())
322    }
323
324    #[expect(clippy::future_not_send)]
325    pub async fn stage<Msg, St, F, Fut>(
326        &self,
327        name: impl AsRef<str>,
328        mut f: F,
329    ) -> crate::StageBuildRef<Msg, St, (TransitionFactory, CanSupervise)>
330    where
331        F: FnMut(St, Msg, Effects<Msg>) -> Fut + 'static + Send,
332        Fut: Future<Output = St> + 'static + Send,
333        Msg: SendData + serde::de::DeserializeOwned,
334        St: SendData,
335    {
336        let name = Name::from(name.as_ref());
337
338        let name = airlock_effect(&self.effect, StageEffect::AddStage(name), |eff| match eff {
339            Some(StageResponse::AddStageResponse(name)) => Some(name),
340            _ => None,
341        })
342        .await;
343
344        let clock = self.clock.clone();
345        let resources = self.resources.clone();
346        let me = StageRef::new(name.clone());
347        let trace_buffer = self.trace_buffer.clone();
348        let schedule_ids = self.schedule_ids.clone();
349
350        let transition = move |effect: EffectBox| {
351            let eff = Effects::new(me, effect, clock, resources, schedule_ids, trace_buffer);
352            Box::new(move |state: Box<dyn SendData>, msg: Box<dyn SendData>| {
353                let state = state.cast::<St>().expect("internal state type error");
354                let msg = msg.cast_deserialize::<Msg>().expect("internal message type error");
355                let state = f(*state, msg, eff.clone());
356                Box::pin(async move { Box::new(state.await) as Box<dyn SendData> })
357                    as BoxFuture<'static, Box<dyn SendData>>
358            }) as Transition
359        };
360        let can_supervise = CanSupervise(name.clone());
361        crate::StageBuildRef { name, network: (Box::new(transition), can_supervise), _ph: PhantomData }
362    }
363
364    /// Supervise the given stage by sending the tombstone when it terminates.
365    ///
366    /// When an unsupervised child stage terminates, this stage will terminate as well.
367    pub fn supervise<Msg, St>(
368        &self,
369        stage: crate::StageBuildRef<Msg, St, (TransitionFactory, CanSupervise)>,
370        tombstone: M,
371    ) -> crate::StageBuildRef<Msg, St, (TransitionFactory, M)> {
372        let StageBuildRef { name, network, .. } = stage;
373
374        crate::StageBuildRef { name, network: (network.0, tombstone), _ph: PhantomData }
375    }
376
377    #[expect(clippy::future_not_send)]
378    pub async fn contramap<Original: SendData, Mapped: SendData>(
379        &self,
380        stage: impl AsRef<StageRef<Original>>,
381        new_name: impl AsRef<str>,
382        transform: impl Fn(Mapped) -> Original + Send + 'static,
383    ) -> StageRef<Mapped> {
384        let new_name = Name::from(new_name.as_ref());
385        let transform = Box::new(move |mapped: Box<dyn SendData>| {
386            let mapped = mapped.cast::<Mapped>().expect("internal message type error");
387            let original = transform(*mapped);
388            Box::new(original) as Box<dyn SendData>
389        });
390        let name = airlock_effect(
391            &self.effect,
392            StageEffect::Contramap {
393                original: stage.as_ref().name().clone(),
394                new_name,
395                transform: NoDebug::new(transform),
396            },
397            |eff| match eff {
398                Some(StageResponse::ContramapResponse(name)) => Some(name),
399                _ => None,
400            },
401        )
402        .await;
403        StageRef::new(name)
404    }
405
406    #[expect(clippy::future_not_send)]
407    pub async fn wire_up<Msg, St, T: SendData>(
408        &self,
409        stage: crate::StageBuildRef<Msg, St, (TransitionFactory, T)>,
410        state: St,
411    ) -> StageRef<Msg>
412    where
413        Msg: SendData + serde::de::DeserializeOwned,
414        St: SendData,
415    {
416        let StageBuildRef { name, network, _ph } = stage;
417        let (transition, tombstone) = network;
418
419        airlock_effect(
420            &self.effect,
421            StageEffect::WireStage(name.clone(), NoDebug::new(transition), Box::new(state), Box::new(tombstone)),
422            |eff| match eff {
423                Some(StageResponse::Unit) => Some(()),
424                _ => None,
425            },
426        )
427        .await;
428
429        StageRef::new(name)
430    }
431}
432
433#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
434pub struct CanSupervise(pub(crate) Name); // private field to prevent instantiation
435
436impl CanSupervise {
437    pub fn for_test() -> Self {
438        Self(BLACKHOLE_NAME.clone())
439    }
440}
441
442/// A trait for effects that are not part of the StageGraph.
443///
444/// The [`run`](ExternalEffect::run) method is used to perform the effect unless a
445/// simulator chooses differently. The latter can be done by downcasting to the concrete type.
446pub trait ExternalEffect: SendData {
447    /// Run the effect in production mode.
448    ///
449    /// Implementations typically retrieve shared services via typed lookups
450    /// (e.g., `resources.get::<Arc<MyStore>>()?`).
451    ///
452    /// This can be overridden in simulation using [`SimulationRunning::handle_effect`](crate::simulation::SimulationRunning::handle_effect).
453    fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>>;
454
455    /// Helper method for implementers of ExternalEffect.
456    fn wrap(
457        f: impl Future<Output = <Self as ExternalEffectAPI>::Response> + Send + 'static,
458    ) -> BoxFuture<'static, Box<dyn SendData>>
459    where
460        Self: Sized + ExternalEffectAPI,
461    {
462        Box::pin(async move {
463            let response = f.await;
464            Box::new(response) as Box<dyn SendData>
465        })
466    }
467
468    /// Helper method for implementers of ExternalEffect that have a synchronous response.
469    fn wrap_sync(response: <Self as ExternalEffectAPI>::Response) -> BoxFuture<'static, Box<dyn SendData>>
470    where
471        Self: Sized + ExternalEffectAPI,
472    {
473        Box::pin(future::ready(Box::new(response) as Box<dyn SendData>))
474    }
475}
476
477impl Display for dyn ExternalEffect {
478    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
479        let me = (self as &dyn SendData).as_send_data_value();
480        write!(f, "{}", me.borrow())
481    }
482}
483
484/// Separate trait for fixing the response type of an external effect.
485///
486/// This cannot be included in [`ExternalEffect`] because it would require a type parameter, which
487/// in turn would make that trait non-object-safe.
488pub trait ExternalEffectAPI: ExternalEffect {
489    type Response: SendData + DeserializeOwned;
490}
491
492/// Marker trait for external effects that have a synchronous response.
493///
494/// The [`ExternalEffect::run`](ExternalEffect::run) method must return a Future that resolves
495/// at the first call to `poll`. You can ensure this by not using `.await` or by using `.await`
496/// only on Futures that resolve at the first call to `poll`.
497pub trait ExternalEffectSync: ExternalEffectAPI {}
498
499impl dyn ExternalEffect {
500    pub fn is<T: ExternalEffect>(&self) -> bool {
501        (self as &dyn Any).is::<T>()
502    }
503
504    pub fn cast_ref<T: ExternalEffect>(&self) -> Option<&T> {
505        (self as &dyn Any).downcast_ref::<T>()
506    }
507
508    pub fn cast<T: ExternalEffect>(self: Box<Self>) -> anyhow::Result<Box<T>> {
509        if (&*self as &dyn Any).is::<T>() {
510            #[expect(clippy::expect_used)]
511            Ok(Box::new(*(self as Box<dyn Any>).downcast::<T>().expect("checked above")))
512        } else {
513            anyhow::bail!("external effect type error: expected {}, got {:?}", std::any::type_name::<T>(), self)
514        }
515    }
516}
517
518impl serde::Serialize for dyn ExternalEffect {
519    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
520    where
521        S: serde::Serializer,
522    {
523        (self as &dyn SendData).serialize(serializer)
524    }
525}
526
527impl ExternalEffect for () {
528    fn run(self: Box<Self>, _resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
529        Box::pin(async { Box::new(()) as Box<dyn SendData> })
530    }
531}
532
533impl ExternalEffectAPI for () {
534    type Response = ();
535}
536
537/// Generic deserialization result of external effects.
538///
539/// External effects are serialized as part of the trace but can only generically be deserialized.
540#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
541pub struct UnknownExternalEffect {
542    value: SendDataValue,
543}
544
545impl UnknownExternalEffect {
546    pub fn new(value: SendDataValue) -> Self {
547        Self { value }
548    }
549
550    pub fn value(&self) -> &Value {
551        &self.value.value
552    }
553
554    pub fn send_data_value(&self) -> &SendDataValue {
555        &self.value
556    }
557
558    pub fn cast<T: ExternalEffect + DeserializeOwned>(self) -> anyhow::Result<T> {
559        anyhow::ensure!(
560            self.value.typetag == type_name::<T>(),
561            "expected `{}`, got `{}`",
562            type_name::<T>(),
563            self.value.typetag
564        );
565        let bytes = to_cbor(&self.value.value);
566        Ok(from_slice(&bytes)?)
567    }
568
569    pub fn boxed<T: ExternalEffect>(value: &T) -> Box<dyn ExternalEffect> {
570        Box::new(Self::new(SendDataValue::new(value)))
571    }
572}
573
574impl ExternalEffect for UnknownExternalEffect {
575    fn run(self: Box<Self>, _resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
576        Box::pin(async { Box::new(()) as Box<dyn SendData> })
577    }
578}
579
/// An external effect serialized inside a container deserializes as an
/// [`UnknownExternalEffect`], which can then be cast back to the concrete type.
#[test]
fn unknown_external_effect() {
    #[derive(serde::Serialize, serde::Deserialize)]
    struct Container(#[serde(with = "crate::serde::serialize_external_effect")] Box<dyn ExternalEffect>);

    // Only the sending half is needed to construct the effect.
    let (sender, _) = tokio::sync::mpsc::channel(1);
    let original = crate::OutputEffect::new(Name::from("from"), 3.2, sender);

    let encoded = to_cbor(&Container(Box::new(original)));
    let Container(decoded) = from_slice(&encoded).unwrap();

    let generic = *decoded.cast::<UnknownExternalEffect>().unwrap();
    let restored = generic.cast::<crate::OutputEffect<f64>>().unwrap();
    assert_eq!(restored.name, Name::from("from"));
    assert_eq!(restored.msg, 3.2);
}
594
595type CallFn = Box<dyn FnOnce(Name, Arc<dyn Any + Send + Sync>) -> Box<dyn SendData> + Send + 'static>;
596#[derive(Debug)]
597pub enum CallExtra {
598    CallFn(NoDebug<CallFn>),
599    Scheduled(ScheduleId),
600}
601
602/// An effect emitted by a stage (in which case T is `Box<dyn Message>`) or an effect
603/// upon whose resumption the stage waits (in which case T is `()`).
604#[derive(Debug)]
605pub enum StageEffect<T> {
606    Receive,
607    Send(Name, Option<Arc<dyn Any + Send + Sync>>, T),
608    Call(Name, Duration, CallExtra),
609    Clock,
610    Wait(Duration),
611    Schedule(T, ScheduleId),
612    CancelSchedule(ScheduleId),
613    External(Box<dyn ExternalEffect>),
614    Terminate,
615    AddStage(Name),
616    WireStage(Name, NoDebug<TransitionFactory>, T, T),
617    Contramap {
618        original: Name,
619        new_name: Name,
620        transform: NoDebug<Box<dyn Fn(Box<dyn SendData>) -> Box<dyn SendData> + Send + 'static>>,
621    },
622}
623
624pub type TransitionFactory = Box<dyn FnOnce(EffectBox) -> Transition + Send + 'static>;
625
626/// The response a stage receives from the execution of an effect.
627#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
628pub enum StageResponse {
629    Unit,
630    ClockResponse(Instant),
631    WaitResponse(Instant),
632    CallResponse(#[serde(with = "crate::serde::serialize_send_data")] Box<dyn SendData>),
633    CancelScheduleResponse(bool),
634    ExternalResponse(#[serde(with = "crate::serde::serialize_send_data")] Box<dyn SendData>),
635    AddStageResponse(Name),
636    ContramapResponse(Name),
637}
638
639impl StageResponse {
640    pub fn to_json(&self) -> serde_json::Value {
641        match self {
642            StageResponse::Unit => serde_json::json!({"type": "unit"}),
643            StageResponse::ClockResponse(instant) => serde_json::json!({
644                "type": "clock",
645                "instant": instant,
646            }),
647            StageResponse::WaitResponse(instant) => serde_json::json!({
648                "type": "wait",
649                "instant": instant,
650            }),
651            StageResponse::CallResponse(msg) => serde_json::json!({
652                "type": "call",
653                "msg": format!("{msg}"),
654            }),
655            StageResponse::CancelScheduleResponse(cancelled) => serde_json::json!({
656                "type": "cancel_schedule",
657                "cancelled": cancelled,
658            }),
659            StageResponse::ExternalResponse(msg) => serde_json::json!({
660                "type": "external",
661                "msg": format!("{msg}"),
662            }),
663            StageResponse::AddStageResponse(name) => serde_json::json!({
664                "type": "add_stage",
665                "name": name,
666            }),
667            StageResponse::ContramapResponse(name) => serde_json::json!({
668                "type": "contramap",
669                "name": name,
670            }),
671        }
672    }
673}
674
675impl Display for StageResponse {
676    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677        match self {
678            StageResponse::Unit => write!(f, "()"),
679            StageResponse::ClockResponse(instant) => write!(f, "clock-{instant}"),
680            StageResponse::WaitResponse(instant) => write!(f, "wait-{instant}"),
681            StageResponse::CallResponse(msg) => {
682                write!(f, "{}", msg.as_send_data_value().borrow())
683            }
684            StageResponse::CancelScheduleResponse(cancelled) => {
685                write!(f, "cancel_schedule-{}", cancelled)
686            }
687            StageResponse::ExternalResponse(msg) => {
688                write!(f, "{}", msg.as_send_data_value().borrow())
689            }
690            StageResponse::AddStageResponse(name) => write!(f, "add_stage {name}"),
691            StageResponse::ContramapResponse(name) => write!(f, "contramap {name}"),
692        }
693    }
694}
695
696impl StageEffect<Box<dyn SendData>> {
697    /// Split this effect from the stage into two parts:
698    /// - the marker we remember in the running simulation
699    /// - the effect we emit to the outside world
700    pub(crate) fn split(self, at_name: Name, schedule_ids: &ScheduleIds, now: Instant) -> (StageEffect<()>, Effect) {
701        #[expect(clippy::panic)]
702        match self {
703            StageEffect::Receive => (StageEffect::Receive, Effect::Receive { at_stage: at_name }),
704            StageEffect::Send(name, call, msg) => {
705                (StageEffect::Send(name.clone(), call, ()), Effect::Send { from: at_name, to: name, msg })
706            }
707            StageEffect::Call(name, duration, msg) => {
708                let id = schedule_ids.next_at(now + duration);
709                let CallExtra::CallFn(msg) = msg else {
710                    panic!("expected CallFn, got {:?}", msg);
711                };
712                let msg = (msg.into_inner())(at_name.clone(), Arc::new(id));
713                (
714                    StageEffect::Call(name.clone(), duration, CallExtra::Scheduled(id)),
715                    Effect::Call { from: at_name, to: name, duration, msg },
716                )
717            }
718            StageEffect::Clock => (StageEffect::Clock, Effect::Clock { at_stage: at_name }),
719            StageEffect::Wait(duration) => (StageEffect::Wait(duration), Effect::Wait { at_stage: at_name, duration }),
720            StageEffect::Schedule(msg, id) => {
721                (StageEffect::Schedule((), id), Effect::Schedule { at_stage: at_name, msg, id })
722            }
723            StageEffect::CancelSchedule(id) => {
724                (StageEffect::CancelSchedule(id), Effect::CancelSchedule { at_stage: at_name, id })
725            }
726            StageEffect::External(effect) => {
727                (StageEffect::External(Box::new(())), Effect::External { at_stage: at_name, effect })
728            }
729            StageEffect::Terminate => (StageEffect::Terminate, Effect::Terminate { at_stage: at_name }),
730            StageEffect::AddStage(name) => {
731                (StageEffect::AddStage(name.clone()), Effect::AddStage { at_stage: at_name, name })
732            }
733            StageEffect::WireStage(name, transition, initial_state, tombstone) => (
734                StageEffect::WireStage(name.clone(), transition, (), ()),
735                Effect::WireStage { at_stage: at_name, name, initial_state, tombstone },
736            ),
737            StageEffect::Contramap { original, new_name, transform } => (
738                StageEffect::Contramap { original: original.clone(), new_name: new_name.clone(), transform },
739                Effect::Contramap { at_stage: at_name, original, new_name },
740            ),
741        }
742    }
743}
744
/// An effect emitted by a stage.
///
/// Every variant names the stage it concerns (`at_stage`, or `from` for [`Effect::Send`] and
/// [`Effect::Call`]) and carries whatever payload belongs to it. Fields of type
/// `Box<dyn SendData>` are (de)serialized through `crate::serde::serialize_send_data`; the
/// external effect payload goes through `crate::serde::serialize_external_effect`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum Effect {
    /// A receive effect at `at_stage`.
    // NOTE(review): presumably the stage is waiting for its next input message — confirm.
    Receive {
        at_stage: Name,
    },
    /// A message `msg` sent from stage `from` to stage `to`.
    Send {
        from: Name,
        to: Name,
        #[serde(with = "crate::serde::serialize_send_data")]
        msg: Box<dyn SendData>,
    },
    /// A call from stage `from` to stage `to` carrying `msg`.
    // NOTE(review): `duration` looks like the call timeout — confirm against the call site.
    Call {
        from: Name,
        to: Name,
        duration: Duration,
        #[serde(with = "crate::serde::serialize_send_data")]
        msg: Box<dyn SendData>,
    },
    /// A clock effect at `at_stage`.
    // NOTE(review): presumably a request for the current time — confirm.
    Clock {
        at_stage: Name,
    },
    /// A wait effect at `at_stage` for the given `duration`.
    Wait {
        at_stage: Name,
        duration: Duration,
    },
    /// A schedule effect at `at_stage` carrying `msg`, identified by `id`.
    // NOTE(review): the delivery time is not part of this variant; presumably it is encoded
    // in the `ScheduleId` — confirm.
    Schedule {
        at_stage: Name,
        #[serde(with = "crate::serde::serialize_send_data")]
        msg: Box<dyn SendData>,
        id: ScheduleId,
    },
    /// A cancel-schedule effect at `at_stage` referring to the schedule `id`.
    CancelSchedule {
        at_stage: Name,
        id: ScheduleId,
    },
    /// An external effect `effect` emitted at `at_stage`.
    External {
        at_stage: Name,
        #[serde(with = "crate::serde::serialize_external_effect")]
        effect: Box<dyn ExternalEffect>,
    },
    /// A terminate effect at `at_stage`.
    Terminate {
        at_stage: Name,
    },
    /// An add-stage effect at `at_stage` for a stage called `name`.
    AddStage {
        at_stage: Name,
        name: Name,
    },
    /// A wire-stage effect at `at_stage` for the stage `name`, carrying that stage's
    /// `initial_state` and a `tombstone` value.
    // NOTE(review): the tombstone is presumably what the supervisor receives when the stage
    // terminates — confirm.
    WireStage {
        at_stage: Name,
        name: Name,
        #[serde(with = "crate::serde::serialize_send_data")]
        initial_state: Box<dyn SendData>,
        #[serde(with = "crate::serde::serialize_send_data")]
        tombstone: Box<dyn SendData>,
    },
    /// A contramap effect at `at_stage`, relating the stage `original` to `new_name`.
    Contramap {
        at_stage: Name,
        original: Name,
        new_name: Name,
    },
}
807
808impl Effect {
809    pub fn to_json(&self) -> serde_json::Value {
810        match self {
811            Effect::Receive { at_stage } => serde_json::json!({
812                "type": "receive",
813                "at_stage": at_stage,
814            }),
815            Effect::Send { from, to, msg } => {
816                serde_json::json!({
817                    "type": "send",
818                    "from": from,
819                    "to": to,
820                    "msg": format!("{msg}"),
821                })
822            }
823            Effect::Call { from, to, duration, msg } => serde_json::json!({
824                "type": "call",
825                "from": from,
826                "to": to,
827                "duration": duration.as_millis(),
828                "msg": format!("{msg}"),
829            }),
830            Effect::Clock { at_stage } => serde_json::json!({
831                "type": "clock",
832                "at_stage": at_stage,
833            }),
834            Effect::Wait { at_stage, duration } => serde_json::json!({
835                "type": "wait",
836                "at_stage": at_stage,
837                "duration": duration.as_millis(),
838            }),
839            Effect::Schedule { at_stage, msg, id } => serde_json::json!({
840                "type": "schedule",
841                "at_stage": at_stage,
842                "msg": format!("{msg}"),
843                "id": format!("{:?}", id),
844            }),
845            Effect::CancelSchedule { at_stage, id } => serde_json::json!({
846                "type": "cancel_schedule",
847                "at_stage": at_stage,
848                "id": format!("{:?}", id),
849            }),
850            Effect::External { at_stage, effect } => {
851                let effect_type = effect
852                    .cast_ref::<UnknownExternalEffect>()
853                    .map(|e| e.send_data_value().typetag.as_str())
854                    .unwrap_or_else(|| effect.typetag_name());
855                serde_json::json!({
856                    "type": "external",
857                    "at_stage": at_stage,
858                    "effect": effect.to_string(),
859                    "effect_type": effect_type,
860                })
861            }
862            Effect::Terminate { at_stage } => serde_json::json!({
863                "type": "terminate",
864                "at_stage": at_stage,
865            }),
866            Effect::AddStage { at_stage, name } => serde_json::json!({
867                "type": "add_stage",
868                "at_stage": at_stage,
869                "name": name,
870            }),
871            Effect::WireStage { at_stage, name, initial_state, tombstone } => serde_json::json!({
872                "type": "wire_stage",
873                "at_stage": at_stage,
874                "name": name,
875                "initial_state": format!("{initial_state}"),
876                "tombstone": format!("{tombstone}"),
877            }),
878            Effect::Contramap { at_stage, original, new_name } => serde_json::json!({
879                "type": "contramap",
880                "at_stage": at_stage,
881                "original": original,
882                "new_name": new_name,
883            }),
884        }
885    }
886}
887
888impl Display for Effect {
889    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
890        match self {
891            Effect::Receive { at_stage } => write!(f, "receive {at_stage}"),
892            Effect::Send { from, to, msg } => {
893                write!(f, "send {from} -> {to}: {msg}",)
894            }
895            Effect::Call { from, to, duration, msg } => {
896                write!(f, "call {from} -> {to}: {duration:?} {msg}")
897            }
898            Effect::Clock { at_stage } => write!(f, "clock {at_stage}"),
899            Effect::Wait { at_stage, duration } => {
900                write!(f, "wait {at_stage}: {duration:?}")
901            }
902            Effect::Schedule { at_stage, msg, id } => write!(f, "schedule {at_stage} {id}: {msg}",),
903            Effect::CancelSchedule { at_stage, id } => {
904                write!(f, "cancel_schedule {at_stage} {id}")
905            }
906            Effect::External { at_stage, effect } => {
907                write!(f, "external {at_stage}: {effect}",)
908            }
909            Effect::Terminate { at_stage } => write!(f, "terminate {at_stage}"),
910            Effect::AddStage { at_stage, name } => write!(f, "add_stage {at_stage} {name}"),
911            Effect::WireStage { at_stage, name, initial_state, tombstone } => {
912                write!(f, "wire_stage {at_stage} {name} {initial_state} {tombstone}")
913            }
914            Effect::Contramap { at_stage, original, new_name } => {
915                write!(f, "contramap {at_stage} {original} -> {new_name}")
916            }
917        }
918    }
919}
920
921#[expect(clippy::wildcard_enum_match_arm, clippy::panic)]
922impl Effect {
923    /// Construct a send effect.
924    pub fn send(from: impl AsRef<str>, to: impl AsRef<str>, msg: Box<dyn SendData>) -> Self {
925        Self::Send { from: Name::from(from.as_ref()), to: Name::from(to.as_ref()), msg }
926    }
927
928    /// Construct a call effect.
929    pub fn call(from: impl AsRef<str>, to: impl AsRef<str>, duration: Duration, msg: Box<dyn SendData>) -> Self {
930        Self::Call { from: Name::from(from.as_ref()), to: Name::from(to.as_ref()), duration, msg }
931    }
932
933    /// Construct a clock effect.
934    pub fn clock(at_stage: impl AsRef<str>) -> Self {
935        Self::Clock { at_stage: Name::from(at_stage.as_ref()) }
936    }
937
938    /// Construct a wait effect.
939    pub fn wait(at_stage: impl AsRef<str>, duration: Duration) -> Self {
940        Self::Wait { at_stage: Name::from(at_stage.as_ref()), duration }
941    }
942
943    /// Construct a schedule effect.
944    pub fn schedule(at_stage: impl AsRef<str>, msg: Box<dyn SendData>, schedule_id: &ScheduleId) -> Self {
945        Self::Schedule { at_stage: Name::from(at_stage.as_ref()), msg, id: *schedule_id }
946    }
947
948    pub fn cancel(at_stage: impl AsRef<str>, schedule_id: &ScheduleId) -> Self {
949        Self::CancelSchedule { at_stage: Name::from(at_stage.as_ref()), id: *schedule_id }
950    }
951
952    /// Construct an external effect.
953    pub fn external(at_stage: impl AsRef<str>, effect: Box<dyn ExternalEffect>) -> Self {
954        Self::External { at_stage: Name::from(at_stage.as_ref()), effect }
955    }
956
957    /// Construct a terminate effect.
958    pub fn terminate(at_stage: impl AsRef<str>) -> Self {
959        Self::Terminate { at_stage: Name::from(at_stage.as_ref()) }
960    }
961
962    /// Construct an add stage effect.
963    pub fn add_stage(at_stage: impl AsRef<str>, name: impl AsRef<str>) -> Self {
964        Self::AddStage { at_stage: Name::from(at_stage.as_ref()), name: Name::from(name.as_ref()) }
965    }
966
967    /// Construct a wire stage effect.
968    pub fn wire_stage(
969        at_stage: impl AsRef<str>,
970        name: impl AsRef<str>,
971        initial_state: Box<dyn SendData>,
972        tombstone: Option<Box<dyn SendData>>,
973    ) -> Self {
974        Self::WireStage {
975            at_stage: Name::from(at_stage.as_ref()),
976            name: Name::from(name.as_ref()),
977            initial_state,
978            tombstone: tombstone.unwrap_or_else(|| SendDataValue::boxed(&CanSupervise(Name::from(name.as_ref())))),
979        }
980    }
981
982    pub fn contramap(at_stage: impl AsRef<str>, original: impl AsRef<str>, new_name: impl AsRef<str>) -> Self {
983        Self::Contramap {
984            at_stage: Name::from(at_stage.as_ref()),
985            original: Name::from(original.as_ref()),
986            new_name: Name::from(new_name.as_ref()),
987        }
988    }
989
990    /// Get the stage name of this effect.
991    pub fn at_stage(&self) -> &Name {
992        match self {
993            Effect::Receive { at_stage, .. } => at_stage,
994            Effect::Send { from, .. } => from,
995            Effect::Call { from, .. } => from,
996            Effect::Clock { at_stage, .. } => at_stage,
997            Effect::Wait { at_stage, .. } => at_stage,
998            Effect::Schedule { at_stage, .. } => at_stage,
999            Effect::CancelSchedule { at_stage, .. } => at_stage,
1000            Effect::External { at_stage, .. } => at_stage,
1001            Effect::Terminate { at_stage, .. } => at_stage,
1002            Effect::AddStage { at_stage, .. } => at_stage,
1003            Effect::WireStage { at_stage, .. } => at_stage,
1004            Effect::Contramap { at_stage, .. } => at_stage,
1005        }
1006    }
1007
1008    /// Assert that this effect is a receive effect.
1009    #[track_caller]
1010    pub fn assert_receive<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>) {
1011        let at_stage = at_stage.as_ref();
1012        match self {
1013            Effect::Receive { at_stage: a } if a == at_stage.name() => {}
1014            _ => panic!("unexpected effect {self:?}\n  looking for Receive at `{}`", at_stage.name()),
1015        }
1016    }
1017
1018    /// Assert that this effect is a send effect.
1019    #[expect(clippy::unwrap_used)]
1020    #[track_caller]
1021    pub fn assert_send<Msg1, Msg2: SendData + PartialEq>(
1022        &self,
1023        at_stage: impl AsRef<StageRef<Msg1>>,
1024        target: impl AsRef<StageRef<Msg2>>,
1025        msg: Msg2,
1026    ) {
1027        let at_stage = at_stage.as_ref();
1028        let target = target.as_ref();
1029        match self {
1030            Effect::Send { from, to, msg: m }
1031                if from == at_stage.name()
1032                    && to == target.name()
1033                    && (&**m as &dyn Any).downcast_ref::<Msg2>().unwrap() == &msg => {}
1034            _ => {
1035                panic!(
1036                    "unexpected effect {self:?}\n  looking for Send from `{}` to `{}` with msg {msg:?}",
1037                    at_stage.name(),
1038                    target.name()
1039                )
1040            }
1041        }
1042    }
1043
1044    /// Assert that this effect is a clock effect.
1045    #[track_caller]
1046    pub fn assert_clock<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>) {
1047        let at_stage = at_stage.as_ref();
1048        match self {
1049            Effect::Clock { at_stage: a } if a == at_stage.name() => {}
1050            _ => panic!("unexpected effect {self:?}\n  looking for Clock at `{}`", at_stage.name()),
1051        }
1052    }
1053
1054    /// Assert that this effect is a wait effect.
1055    #[track_caller]
1056    pub fn assert_wait<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>, duration: Duration) {
1057        let at_stage = at_stage.as_ref();
1058        match self {
1059            Effect::Wait { at_stage: a, duration: d } if a == at_stage.name() && d == &duration => {}
1060            _ => panic!(
1061                "unexpected effect {self:?}\n  looking for Wait at `{}` with duration {duration:?}",
1062                at_stage.name()
1063            ),
1064        }
1065    }
1066
1067    /// Assert that this effect is a call effect.
1068    #[track_caller]
1069    pub fn assert_call<Msg1, Msg2: SendData, Out>(
1070        self,
1071        at_stage: impl AsRef<StageRef<Msg1>>,
1072        target: impl AsRef<StageRef<Msg2>>,
1073        extract: impl FnOnce(Msg2) -> Out,
1074        duration: Duration,
1075    ) -> Out {
1076        let at_stage = at_stage.as_ref();
1077        let target = target.as_ref();
1078        match self {
1079            Effect::Call { from, to, duration: d, msg }
1080                if &from == at_stage.name() && &to == target.name() && d == duration =>
1081            {
1082                extract(*msg.cast::<Msg2>().expect("internal messaging type error"))
1083            }
1084            _ => panic!(
1085                "unexpected effect {self:?}\n  looking for Send from `{}` to `{}` with duration {duration:?}",
1086                at_stage.name(),
1087                target.name()
1088            ),
1089        }
1090    }
1091
1092    /// Assert that this effect is an external effect.
1093    #[track_caller]
1094    pub fn assert_external<Eff: ExternalEffect + PartialEq>(&self, at_stage: impl AsRef<Name>, effect: &Eff) {
1095        let at_stage = at_stage.as_ref();
1096        match self {
1097            Effect::External { at_stage: a, effect: e }
1098                if a == at_stage && &**e as &dyn SendData == effect as &dyn SendData => {}
1099            _ => panic!("unexpected effect {self:?}\n  looking for External at `{}` with effect {effect:?}", at_stage),
1100        }
1101    }
1102
1103    /// Extract the external effect from this effect.
1104    #[track_caller]
1105    pub fn extract_external<Eff: ExternalEffectAPI + PartialEq>(self, at_stage: impl AsRef<Name>) -> Box<Eff> {
1106        let at_stage = at_stage.as_ref();
1107        match self {
1108            Effect::External { at_stage: a, effect: e } if &a == at_stage =>
1109            {
1110                #[expect(clippy::unwrap_used)]
1111                e.cast::<Eff>().unwrap()
1112            }
1113            _ => panic!(
1114                "unexpected effect {self:?}\n  looking for External at `{}` with effect {}",
1115                at_stage,
1116                type_name::<Eff>()
1117            ),
1118        }
1119    }
1120
1121    /// Assert that this effect is an add stage effect.
1122    #[track_caller]
1123    pub fn assert_add_stage<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>, name: impl AsRef<str>) {
1124        let at_stage = at_stage.as_ref();
1125        match self {
1126            Effect::AddStage { at_stage: a, name: n } if a == at_stage.name() && n.as_str() == name.as_ref() => {}
1127            _ => panic!(
1128                "unexpected effect {self:?}\n  looking for AddStage at `{}` with name `{}`",
1129                at_stage.name(),
1130                name.as_ref()
1131            ),
1132        }
1133    }
1134
1135    /// Assert that this effect is a wire stage effect.
1136    #[track_caller]
1137    pub fn assert_wire_stage<Msg, St: SendData + PartialEq>(
1138        &self,
1139        at_stage: impl AsRef<StageRef<Msg>>,
1140        name: impl AsRef<str>,
1141        initial_state: St,
1142    ) {
1143        let at_stage = at_stage.as_ref();
1144        match self {
1145            Effect::WireStage { at_stage: a, name: n, initial_state: i, tombstone: _ }
1146                if a == at_stage.name()
1147                    && n.as_str() == name.as_ref()
1148                    && i.cast_ref::<St>().expect("type error") == &initial_state => {}
1149            _ => panic!(
1150                "unexpected effect {self:?}\n  looking for WireStage at `{}` with name `{}`",
1151                at_stage.name(),
1152                name.as_ref()
1153            ),
1154        }
1155    }
1156
1157    #[track_caller]
1158    pub fn extract_wire_stage<Msg, St: SendData + PartialEq>(
1159        &self,
1160        at_stage: impl AsRef<StageRef<Msg>>,
1161        initial_state: St,
1162    ) -> &Name {
1163        let at_stage = at_stage.as_ref();
1164        match self {
1165            Effect::WireStage { at_stage: a, name: n, initial_state: i, tombstone: _ }
1166                if a == at_stage.name() && i.cast_ref::<St>().expect("type error") == &initial_state =>
1167            {
1168                n
1169            }
1170            _ => {
1171                panic!(
1172                    "unexpected effect {self:?}\n  looking for WireStage at `{}` with initial state {initial_state:?}",
1173                    at_stage.name()
1174                )
1175            }
1176        }
1177    }
1178}
1179
1180impl PartialEq for Effect {
1181    #[expect(clippy::wildcard_enum_match_arm)]
1182    fn eq(&self, other: &Self) -> bool {
1183        match self {
1184            Effect::Receive { at_stage } => match other {
1185                Effect::Receive { at_stage: other_at_stage } => at_stage == other_at_stage,
1186                _ => false,
1187            },
1188            Effect::Send { from, to, msg } => match other {
1189                Effect::Send { from: other_from, to: other_to, msg: other_msg } => {
1190                    from == other_from && to == other_to && msg == other_msg
1191                }
1192                _ => false,
1193            },
1194            Effect::Call { from, to, duration, msg } => match other {
1195                Effect::Call { from: other_from, to: other_to, duration: other_duration, msg: other_msg } => {
1196                    from == other_from && to == other_to && duration == other_duration && msg == other_msg
1197                }
1198                _ => false,
1199            },
1200            Effect::Clock { at_stage } => match other {
1201                Effect::Clock { at_stage: other_at_stage } => at_stage == other_at_stage,
1202                _ => false,
1203            },
1204            Effect::Wait { at_stage, duration } => match other {
1205                Effect::Wait { at_stage: other_at_stage, duration: other_duration } => {
1206                    at_stage == other_at_stage && duration == other_duration
1207                }
1208                _ => false,
1209            },
1210            Effect::Schedule { at_stage, msg, id } => match other {
1211                Effect::Schedule { at_stage: other_at_stage, msg: other_msg, id: other_id } => {
1212                    at_stage == other_at_stage && msg == other_msg && id == other_id
1213                }
1214                _ => false,
1215            },
1216            Effect::CancelSchedule { at_stage, id } => match other {
1217                Effect::CancelSchedule { at_stage: other_at_stage, id: other_id } => {
1218                    at_stage == other_at_stage && id == other_id
1219                }
1220                _ => false,
1221            },
1222            Effect::External { at_stage, effect } => match other {
1223                Effect::External { at_stage: other_at_stage, effect: other_effect } => {
1224                    at_stage == other_at_stage && &**effect as &dyn SendData == &**other_effect as &dyn SendData
1225                }
1226                _ => false,
1227            },
1228            Effect::Terminate { at_stage } => match other {
1229                Effect::Terminate { at_stage: other_at_stage } => at_stage == other_at_stage,
1230                _ => false,
1231            },
1232            Effect::AddStage { at_stage, name } => match other {
1233                Effect::AddStage { at_stage: other_at_stage, name: other_name } => {
1234                    at_stage == other_at_stage && name == other_name
1235                }
1236                _ => false,
1237            },
1238            Effect::WireStage { at_stage, name, initial_state, tombstone } => match other {
1239                Effect::WireStage {
1240                    at_stage: other_at_stage,
1241                    name: other_name,
1242                    initial_state: other_initial_state,
1243                    tombstone: other_tombstone,
1244                } => {
1245                    at_stage == other_at_stage
1246                        && name == other_name
1247                        && initial_state == other_initial_state
1248                        && tombstone == other_tombstone
1249                }
1250                _ => false,
1251            },
1252            Effect::Contramap { at_stage, original, new_name } => match other {
1253                Effect::Contramap { at_stage: other_at_stage, original: other_original, new_name: other_new_name } => {
1254                    at_stage == other_at_stage && original == other_original && new_name == other_new_name
1255                }
1256                _ => false,
1257            },
1258        }
1259    }
1260}