1#![expect(clippy::expect_used)]
2#[cfg(not(target_arch = "riscv32"))]
17use std::sync::atomic::AtomicU64;
18use std::{
19 any::{Any, type_name},
20 borrow::Borrow,
21 fmt::{Debug, Display, Formatter},
22 future,
23 marker::PhantomData,
24 sync::Arc,
25 time::Duration,
26};
27
28use cbor4ii::{core::Value, serde::from_slice};
29use futures_util::FutureExt;
30use parking_lot::Mutex;
31#[cfg(target_arch = "riscv32")]
32use parking_lot::Mutex;
33use serde::de::DeserializeOwned;
34
35use crate::{
36 BLACKHOLE_NAME, BoxFuture, Instant, Name, Resources, ScheduleId, SendData, StageBuildRef, StageRef,
37 effect_box::{EffectBox, airlock_effect},
38 serde::{NoDebug, SendDataValue, never, to_cbor},
39 simulation::Transition,
40 time::Clock,
41 trace_buffer::{TraceBuffer, find_next_external_resume, find_next_external_suspend},
42};
43
/// Handle through which a stage issues effects.
///
/// The effect methods on this type pass a `StageEffect` together with this
/// handle's [`EffectBox`] to `airlock_effect` and return the resulting future.
pub struct Effects<M> {
    /// Reference to the stage this handle belongs to; returned by `me()` and `me_ref()`.
    me: StageRef<M>,
    /// Passed to `airlock_effect` for every effect issued through this handle.
    effect: EffectBox,
    /// Clock read by `schedule_after` to turn a delay into an absolute instant.
    clock: Arc<dyn Clock + Send + Sync>,
    /// Resources handed to an external effect when `external_sync` runs it directly.
    resources: Resources,
    /// Generator for the ids returned by `schedule_at`.
    schedule_ids: ScheduleIds,
    /// Trace buffer in which `external_sync` records suspend/resume entries and looks up replays.
    trace_buffer: Arc<Mutex<TraceBuffer>>,
}
58
59impl<M> Clone for Effects<M> {
60 fn clone(&self) -> Self {
61 Self {
62 me: self.me.clone(),
63 effect: self.effect.clone(),
64 schedule_ids: self.schedule_ids.clone(),
65 clock: self.clock.clone(),
66 resources: self.resources.clone(),
67 trace_buffer: self.trace_buffer.clone(),
68 }
69 }
70}
71
72impl<M> Debug for Effects<M> {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("Effects").field("me", &self.me).field("effect", &self.effect).finish()
75 }
76}
77
78impl<M: SendData> Effects<M> {
79 pub(crate) fn new(
80 me: StageRef<M>,
81 effect: EffectBox,
82 clock: Arc<dyn Clock + Send + Sync>,
83 resources: Resources,
84 schedule_ids: ScheduleIds,
85 trace_buffer: Arc<Mutex<TraceBuffer>>,
86 ) -> Self {
87 Self { me, effect, schedule_ids, clock, resources, trace_buffer }
88 }
89
90 pub fn me(&self) -> StageRef<M> {
94 self.me.clone()
95 }
96
97 pub fn me_ref(&self) -> &StageRef<M> {
102 &self.me
103 }
104}
105
/// Generator of schedule ids backed by a shared counter.
///
/// The counter sits behind an `Arc`, so clones of a `ScheduleIds` draw from the
/// same sequence.
#[derive(Debug, Clone)]
pub struct ScheduleIds {
    /// Atomic counter used on every target except `riscv32`.
    #[cfg(not(target_arch = "riscv32"))]
    counter: Arc<AtomicU64>,
    /// Mutex-protected counter used on `riscv32` in place of the atomic.
    #[cfg(target_arch = "riscv32")]
    counter: Arc<parking_lot::Mutex<u64>>,
}
113
114impl Default for ScheduleIds {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl ScheduleIds {
121 pub fn new() -> Self {
122 Self {
123 #[cfg(not(target_arch = "riscv32"))]
124 counter: Arc::new(AtomicU64::new(0)),
125 #[cfg(target_arch = "riscv32")]
126 counter: Arc::new(parking_lot::Mutex::new(0)),
127 }
128 }
129
130 pub fn next_at(&self, instant: Instant) -> ScheduleId {
131 #[cfg(not(target_arch = "riscv32"))]
132 let id = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
133 #[cfg(target_arch = "riscv32")]
134 let id = {
135 let mut guard = self.counter.lock();
136 let id = *guard;
137 *guard += 1;
138 id
139 };
140 ScheduleId::new(id, instant)
141 }
142}
143
/// Marker type whose type name `Effects::call` compares against a call response's
/// type tag; on a match the call resolves to `None`.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub(crate) struct CallTimeout;
146
147impl<M> Effects<M> {
148 pub fn send<Msg: SendData>(&self, target: &StageRef<Msg>, msg: Msg) -> BoxFuture<'static, ()> {
151 let call = target.extra().cloned();
152 airlock_effect(&self.effect, StageEffect::Send(target.name().clone(), call, Box::new(msg)), |_eff| Some(()))
153 }
154
155 pub fn clock(&self) -> BoxFuture<'static, Instant> {
157 airlock_effect(&self.effect, StageEffect::Clock, |eff| match eff {
158 Some(StageResponse::ClockResponse(instant)) => Some(instant),
159 _ => None,
160 })
161 }
162
163 pub fn wait(&self, duration: Duration) -> BoxFuture<'static, Instant> {
165 airlock_effect(&self.effect, StageEffect::Wait(duration), |eff| match eff {
166 Some(StageResponse::WaitResponse(instant)) => Some(instant),
167 _ => None,
168 })
169 }
170
171 #[expect(clippy::panic)]
185 #[track_caller]
186 pub fn call<Req: SendData, Resp: SendData + DeserializeOwned>(
187 &self,
188 target: &StageRef<Req>,
189 timeout: Duration,
190 msg: impl FnOnce(StageRef<Resp>) -> Req + Send + 'static,
191 ) -> BoxFuture<'static, Option<Resp>> {
192 if target.extra().is_some() {
193 panic!("cannot answer a call with a call ({} -> {})", self.me.name(), target.name());
194 }
195 let msg = Box::new(move |name: Name, extra: Arc<dyn Any + Send + Sync>| {
196 Box::new(msg(StageRef::new(name).with_extra(extra))) as Box<dyn SendData>
197 }) as CallFn;
198 airlock_effect(
199 &self.effect,
200 StageEffect::Call(target.name().clone(), timeout, CallExtra::CallFn(NoDebug::new(msg))),
201 |eff| match eff {
202 Some(StageResponse::CallResponse(resp)) => {
203 if resp.typetag_name() == type_name::<CallTimeout>() {
204 Some(None)
205 } else {
206 Some(Some(resp.cast_deserialize::<Resp>().expect("internal message type error")))
207 }
208 }
209 _ => None,
210 },
211 )
212 }
213
214 pub fn schedule_at(&self, msg: M, when: Instant) -> BoxFuture<'static, ScheduleId>
221 where
222 M: SendData,
223 {
224 let id = self.schedule_ids.next_at(when);
225 airlock_effect(&self.effect, StageEffect::Schedule(Box::new(msg), id), move |eff| match eff {
226 Some(StageResponse::Unit) => Some(id),
227 _ => None,
228 })
229 }
230
231 pub fn schedule_after(&self, msg: M, delay: Duration) -> BoxFuture<'static, ScheduleId>
237 where
238 M: SendData,
239 {
240 let now = self.clock.now();
241 let when = now + delay;
242 self.schedule_at(msg, when)
243 }
244
245 pub fn cancel_schedule(&self, id: ScheduleId) -> BoxFuture<'static, bool> {
250 airlock_effect(&self.effect, StageEffect::CancelSchedule(id), |eff| match eff {
251 Some(StageResponse::CancelScheduleResponse(cancelled)) => Some(cancelled),
252 _ => None,
253 })
254 }
255
256 pub fn external<T: ExternalEffectAPI>(&self, effect: T) -> BoxFuture<'static, T::Response> {
258 airlock_effect(&self.effect, StageEffect::External(Box::new(effect)), |eff| match eff {
259 Some(StageResponse::ExternalResponse(resp)) => {
260 Some(resp.cast_deserialize::<T::Response>().expect("internal messaging type error"))
261 }
262 _ => None,
263 })
264 }
265
266 #[allow(clippy::panic)]
269 pub fn external_sync<T: ExternalEffectSync + serde::Serialize>(&self, effect: T) -> T::Response {
270 self.trace_buffer.lock().push_suspend_external(self.me.name(), &effect);
271
272 let response = if let Some(fetch_replay) = self.trace_buffer.lock().fetch_replay_mut() {
273 let Some(replayed) = find_next_external_suspend(fetch_replay, self.me.name()) else {
274 panic!("no entry found in fetch replay");
275 };
276 let Effect::External { effect, .. } =
277 from_slice(&to_cbor(&Effect::External { at_stage: self.me.name().clone(), effect: Box::new(effect) }))
278 .expect("internal replay error")
279 else {
280 panic!("serde roundtrip broken");
281 };
282
283 assert!(
284 replayed.test_eq(&*effect),
285 "replayed effect {replayed:?}\ndoes not match performed effect {effect:?}"
286 );
287 find_next_external_resume(fetch_replay, self.me.name())
288 .expect("no response found in fetch replay")
289 .cast_deserialize::<T::Response>()
290 .expect("internal messaging type error")
291 } else {
292 Box::new(effect)
293 .run(self.resources.clone())
294 .now_or_never()
295 .expect("an external sync effect must complete immediately in sync context")
296 .cast_deserialize::<T::Response>()
297 .expect("internal messaging type error")
298 };
299
300 self.trace_buffer.lock().push_resume_external(self.me.name(), &response);
301 response
302 }
303
304 pub fn terminate<T>(&self) -> BoxFuture<'static, T> {
321 airlock_effect(&self.effect, StageEffect::Terminate, |_eff| never())
322 }
323
324 #[expect(clippy::future_not_send)]
325 pub async fn stage<Msg, St, F, Fut>(
326 &self,
327 name: impl AsRef<str>,
328 mut f: F,
329 ) -> crate::StageBuildRef<Msg, St, (TransitionFactory, CanSupervise)>
330 where
331 F: FnMut(St, Msg, Effects<Msg>) -> Fut + 'static + Send,
332 Fut: Future<Output = St> + 'static + Send,
333 Msg: SendData + serde::de::DeserializeOwned,
334 St: SendData,
335 {
336 let name = Name::from(name.as_ref());
337
338 let name = airlock_effect(&self.effect, StageEffect::AddStage(name), |eff| match eff {
339 Some(StageResponse::AddStageResponse(name)) => Some(name),
340 _ => None,
341 })
342 .await;
343
344 let clock = self.clock.clone();
345 let resources = self.resources.clone();
346 let me = StageRef::new(name.clone());
347 let trace_buffer = self.trace_buffer.clone();
348 let schedule_ids = self.schedule_ids.clone();
349
350 let transition = move |effect: EffectBox| {
351 let eff = Effects::new(me, effect, clock, resources, schedule_ids, trace_buffer);
352 Box::new(move |state: Box<dyn SendData>, msg: Box<dyn SendData>| {
353 let state = state.cast::<St>().expect("internal state type error");
354 let msg = msg.cast_deserialize::<Msg>().expect("internal message type error");
355 let state = f(*state, msg, eff.clone());
356 Box::pin(async move { Box::new(state.await) as Box<dyn SendData> })
357 as BoxFuture<'static, Box<dyn SendData>>
358 }) as Transition
359 };
360 let can_supervise = CanSupervise(name.clone());
361 crate::StageBuildRef { name, network: (Box::new(transition), can_supervise), _ph: PhantomData }
362 }
363
364 pub fn supervise<Msg, St>(
368 &self,
369 stage: crate::StageBuildRef<Msg, St, (TransitionFactory, CanSupervise)>,
370 tombstone: M,
371 ) -> crate::StageBuildRef<Msg, St, (TransitionFactory, M)> {
372 let StageBuildRef { name, network, .. } = stage;
373
374 crate::StageBuildRef { name, network: (network.0, tombstone), _ph: PhantomData }
375 }
376
377 #[expect(clippy::future_not_send)]
378 pub async fn contramap<Original: SendData, Mapped: SendData>(
379 &self,
380 stage: impl AsRef<StageRef<Original>>,
381 new_name: impl AsRef<str>,
382 transform: impl Fn(Mapped) -> Original + Send + 'static,
383 ) -> StageRef<Mapped> {
384 let new_name = Name::from(new_name.as_ref());
385 let transform = Box::new(move |mapped: Box<dyn SendData>| {
386 let mapped = mapped.cast::<Mapped>().expect("internal message type error");
387 let original = transform(*mapped);
388 Box::new(original) as Box<dyn SendData>
389 });
390 let name = airlock_effect(
391 &self.effect,
392 StageEffect::Contramap {
393 original: stage.as_ref().name().clone(),
394 new_name,
395 transform: NoDebug::new(transform),
396 },
397 |eff| match eff {
398 Some(StageResponse::ContramapResponse(name)) => Some(name),
399 _ => None,
400 },
401 )
402 .await;
403 StageRef::new(name)
404 }
405
406 #[expect(clippy::future_not_send)]
407 pub async fn wire_up<Msg, St, T: SendData>(
408 &self,
409 stage: crate::StageBuildRef<Msg, St, (TransitionFactory, T)>,
410 state: St,
411 ) -> StageRef<Msg>
412 where
413 Msg: SendData + serde::de::DeserializeOwned,
414 St: SendData,
415 {
416 let StageBuildRef { name, network, _ph } = stage;
417 let (transition, tombstone) = network;
418
419 airlock_effect(
420 &self.effect,
421 StageEffect::WireStage(name.clone(), NoDebug::new(transition), Box::new(state), Box::new(tombstone)),
422 |eff| match eff {
423 Some(StageResponse::Unit) => Some(()),
424 _ => None,
425 },
426 )
427 .await;
428
429 StageRef::new(name)
430 }
431}
432
433#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
434pub struct CanSupervise(pub(crate) Name); impl CanSupervise {
437 pub fn for_test() -> Self {
438 Self(BLACKHOLE_NAME.clone())
439 }
440}
441
442pub trait ExternalEffect: SendData {
447 fn run(self: Box<Self>, resources: Resources) -> BoxFuture<'static, Box<dyn SendData>>;
454
455 fn wrap(
457 f: impl Future<Output = <Self as ExternalEffectAPI>::Response> + Send + 'static,
458 ) -> BoxFuture<'static, Box<dyn SendData>>
459 where
460 Self: Sized + ExternalEffectAPI,
461 {
462 Box::pin(async move {
463 let response = f.await;
464 Box::new(response) as Box<dyn SendData>
465 })
466 }
467
468 fn wrap_sync(response: <Self as ExternalEffectAPI>::Response) -> BoxFuture<'static, Box<dyn SendData>>
470 where
471 Self: Sized + ExternalEffectAPI,
472 {
473 Box::pin(future::ready(Box::new(response) as Box<dyn SendData>))
474 }
475}
476
477impl Display for dyn ExternalEffect {
478 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
479 let me = (self as &dyn SendData).as_send_data_value();
480 write!(f, "{}", me.borrow())
481 }
482}
483
/// Typed view of an [`ExternalEffect`]: names the response type that
/// `Effects::external` and `Effects::external_sync` deserialize the result into.
pub trait ExternalEffectAPI: ExternalEffect {
    /// Response produced by the effect.
    type Response: SendData + DeserializeOwned;
}
491
/// Marker for external effects accepted by `Effects::external_sync`, which expects
/// the effect's `run` future to complete on its first poll.
pub trait ExternalEffectSync: ExternalEffectAPI {}
498
499impl dyn ExternalEffect {
500 pub fn is<T: ExternalEffect>(&self) -> bool {
501 (self as &dyn Any).is::<T>()
502 }
503
504 pub fn cast_ref<T: ExternalEffect>(&self) -> Option<&T> {
505 (self as &dyn Any).downcast_ref::<T>()
506 }
507
508 pub fn cast<T: ExternalEffect>(self: Box<Self>) -> anyhow::Result<Box<T>> {
509 if (&*self as &dyn Any).is::<T>() {
510 #[expect(clippy::expect_used)]
511 Ok(Box::new(*(self as Box<dyn Any>).downcast::<T>().expect("checked above")))
512 } else {
513 anyhow::bail!("external effect type error: expected {}, got {:?}", std::any::type_name::<T>(), self)
514 }
515 }
516}
517
518impl serde::Serialize for dyn ExternalEffect {
519 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
520 where
521 S: serde::Serializer,
522 {
523 (self as &dyn SendData).serialize(serializer)
524 }
525}
526
527impl ExternalEffect for () {
528 fn run(self: Box<Self>, _resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
529 Box::pin(async { Box::new(()) as Box<dyn SendData> })
530 }
531}
532
/// The unit effect's response type is unit.
impl ExternalEffectAPI for () {
    type Response = ();
}
536
/// An external effect held only as a [`SendDataValue`] (type tag plus value).
///
/// `cast` converts it into a concrete effect type whose type name matches the tag.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct UnknownExternalEffect {
    /// Type tag and value of the wrapped effect.
    value: SendDataValue,
}
544
545impl UnknownExternalEffect {
546 pub fn new(value: SendDataValue) -> Self {
547 Self { value }
548 }
549
550 pub fn value(&self) -> &Value {
551 &self.value.value
552 }
553
554 pub fn send_data_value(&self) -> &SendDataValue {
555 &self.value
556 }
557
558 pub fn cast<T: ExternalEffect + DeserializeOwned>(self) -> anyhow::Result<T> {
559 anyhow::ensure!(
560 self.value.typetag == type_name::<T>(),
561 "expected `{}`, got `{}`",
562 type_name::<T>(),
563 self.value.typetag
564 );
565 let bytes = to_cbor(&self.value.value);
566 Ok(from_slice(&bytes)?)
567 }
568
569 pub fn boxed<T: ExternalEffect>(value: &T) -> Box<dyn ExternalEffect> {
570 Box::new(Self::new(SendDataValue::new(value)))
571 }
572}
573
574impl ExternalEffect for UnknownExternalEffect {
575 fn run(self: Box<Self>, _resources: Resources) -> BoxFuture<'static, Box<dyn SendData>> {
576 Box::pin(async { Box::new(()) as Box<dyn SendData> })
577 }
578}
579
/// An effect serialized through `serialize_external_effect` comes back as an
/// `UnknownExternalEffect` that can be cast to the original concrete type.
#[test]
fn unknown_external_effect() {
    #[derive(serde::Serialize, serde::Deserialize)]
    struct Container(#[serde(with = "crate::serde::serialize_external_effect")] Box<dyn ExternalEffect>);

    let sender = tokio::sync::mpsc::channel(1).0;
    let original = crate::OutputEffect::new(Name::from("from"), 3.2, sender);

    let encoded = to_cbor(&Container(Box::new(original)));
    let Container(decoded): Container = from_slice(&encoded).unwrap();

    let unknown = *decoded.cast::<UnknownExternalEffect>().unwrap();
    let restored = unknown.cast::<crate::OutputEffect<f64>>().unwrap();
    assert_eq!(restored.name, Name::from("from"));
    assert_eq!(restored.msg, 3.2);
}
594
/// Deferred construction of a call's request message: given a name and an `extra`
/// value it produces the boxed message (see `Effects::call` and `StageEffect::split`).
type CallFn = Box<dyn FnOnce(Name, Arc<dyn Any + Send + Sync>) -> Box<dyn SendData> + Send + 'static>;
/// Payload of [`StageEffect::Call`] before and after `StageEffect::split`.
#[derive(Debug)]
pub enum CallExtra {
    /// Message constructor supplied by `Effects::call`; `split` requires this variant.
    CallFn(NoDebug<CallFn>),
    /// Schedule id that `split` allocated for the call.
    Scheduled(ScheduleId),
}
601
/// An effect as issued by a stage.
///
/// `T` is the message payload type: `Effects` issues effects with boxed `SendData`
/// payloads, and `split` returns a copy with `()` payloads next to an [`Effect`]
/// that carries the payloads.
#[derive(Debug)]
pub enum StageEffect<T> {
    /// No payload; `split` maps it to `Effect::Receive`.
    Receive,
    /// Send a message: target name, the target reference's optional `extra`, and the message.
    Send(Name, Option<Arc<dyn Any + Send + Sync>>, T),
    /// Call a stage: target name, timeout, and the call payload.
    Call(Name, Duration, CallExtra),
    /// Ask for the current time.
    Clock,
    /// Wait for the given duration.
    Wait(Duration),
    /// Schedule a message under the given id.
    Schedule(T, ScheduleId),
    /// Cancel the scheduled entry with the given id.
    CancelSchedule(ScheduleId),
    /// Run an external effect.
    External(Box<dyn ExternalEffect>),
    /// Issued by `Effects::terminate`.
    Terminate,
    /// Register a new stage under the given name.
    AddStage(Name),
    /// Wire up a stage: its name, transition factory, initial state, and tombstone.
    WireStage(Name, NoDebug<TransitionFactory>, T, T),
    /// Register `new_name` as an adapter in front of `original`, converting messages with `transform`.
    Contramap {
        original: Name,
        new_name: Name,
        transform: NoDebug<Box<dyn Fn(Box<dyn SendData>) -> Box<dyn SendData> + Send + 'static>>,
    },
}
623
/// One-shot constructor that turns a stage's [`EffectBox`] into its [`Transition`] function.
pub type TransitionFactory = Box<dyn FnOnce(EffectBox) -> Transition + Send + 'static>;
625
/// Response delivered to a stage for an effect it issued.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum StageResponse {
    /// Plain acknowledgement; expected by `schedule_at` and `wire_up`.
    Unit,
    /// Instant expected by `Effects::clock`.
    ClockResponse(Instant),
    /// Instant expected by `Effects::wait`.
    WaitResponse(Instant),
    /// Response message expected by `Effects::call`.
    CallResponse(#[serde(with = "crate::serde::serialize_send_data")] Box<dyn SendData>),
    /// Flag expected by `Effects::cancel_schedule`.
    CancelScheduleResponse(bool),
    /// Response expected by `Effects::external`.
    ExternalResponse(#[serde(with = "crate::serde::serialize_send_data")] Box<dyn SendData>),
    /// Stage name expected by `Effects::stage`.
    AddStageResponse(Name),
    /// Stage name expected by `Effects::contramap`.
    ContramapResponse(Name),
}
638
639impl StageResponse {
640 pub fn to_json(&self) -> serde_json::Value {
641 match self {
642 StageResponse::Unit => serde_json::json!({"type": "unit"}),
643 StageResponse::ClockResponse(instant) => serde_json::json!({
644 "type": "clock",
645 "instant": instant,
646 }),
647 StageResponse::WaitResponse(instant) => serde_json::json!({
648 "type": "wait",
649 "instant": instant,
650 }),
651 StageResponse::CallResponse(msg) => serde_json::json!({
652 "type": "call",
653 "msg": format!("{msg}"),
654 }),
655 StageResponse::CancelScheduleResponse(cancelled) => serde_json::json!({
656 "type": "cancel_schedule",
657 "cancelled": cancelled,
658 }),
659 StageResponse::ExternalResponse(msg) => serde_json::json!({
660 "type": "external",
661 "msg": format!("{msg}"),
662 }),
663 StageResponse::AddStageResponse(name) => serde_json::json!({
664 "type": "add_stage",
665 "name": name,
666 }),
667 StageResponse::ContramapResponse(name) => serde_json::json!({
668 "type": "contramap",
669 "name": name,
670 }),
671 }
672 }
673}
674
675impl Display for StageResponse {
676 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677 match self {
678 StageResponse::Unit => write!(f, "()"),
679 StageResponse::ClockResponse(instant) => write!(f, "clock-{instant}"),
680 StageResponse::WaitResponse(instant) => write!(f, "wait-{instant}"),
681 StageResponse::CallResponse(msg) => {
682 write!(f, "{}", msg.as_send_data_value().borrow())
683 }
684 StageResponse::CancelScheduleResponse(cancelled) => {
685 write!(f, "cancel_schedule-{}", cancelled)
686 }
687 StageResponse::ExternalResponse(msg) => {
688 write!(f, "{}", msg.as_send_data_value().borrow())
689 }
690 StageResponse::AddStageResponse(name) => write!(f, "add_stage {name}"),
691 StageResponse::ContramapResponse(name) => write!(f, "contramap {name}"),
692 }
693 }
694}
695
696impl StageEffect<Box<dyn SendData>> {
697 pub(crate) fn split(self, at_name: Name, schedule_ids: &ScheduleIds, now: Instant) -> (StageEffect<()>, Effect) {
701 #[expect(clippy::panic)]
702 match self {
703 StageEffect::Receive => (StageEffect::Receive, Effect::Receive { at_stage: at_name }),
704 StageEffect::Send(name, call, msg) => {
705 (StageEffect::Send(name.clone(), call, ()), Effect::Send { from: at_name, to: name, msg })
706 }
707 StageEffect::Call(name, duration, msg) => {
708 let id = schedule_ids.next_at(now + duration);
709 let CallExtra::CallFn(msg) = msg else {
710 panic!("expected CallFn, got {:?}", msg);
711 };
712 let msg = (msg.into_inner())(at_name.clone(), Arc::new(id));
713 (
714 StageEffect::Call(name.clone(), duration, CallExtra::Scheduled(id)),
715 Effect::Call { from: at_name, to: name, duration, msg },
716 )
717 }
718 StageEffect::Clock => (StageEffect::Clock, Effect::Clock { at_stage: at_name }),
719 StageEffect::Wait(duration) => (StageEffect::Wait(duration), Effect::Wait { at_stage: at_name, duration }),
720 StageEffect::Schedule(msg, id) => {
721 (StageEffect::Schedule((), id), Effect::Schedule { at_stage: at_name, msg, id })
722 }
723 StageEffect::CancelSchedule(id) => {
724 (StageEffect::CancelSchedule(id), Effect::CancelSchedule { at_stage: at_name, id })
725 }
726 StageEffect::External(effect) => {
727 (StageEffect::External(Box::new(())), Effect::External { at_stage: at_name, effect })
728 }
729 StageEffect::Terminate => (StageEffect::Terminate, Effect::Terminate { at_stage: at_name }),
730 StageEffect::AddStage(name) => {
731 (StageEffect::AddStage(name.clone()), Effect::AddStage { at_stage: at_name, name })
732 }
733 StageEffect::WireStage(name, transition, initial_state, tombstone) => (
734 StageEffect::WireStage(name.clone(), transition, (), ()),
735 Effect::WireStage { at_stage: at_name, name, initial_state, tombstone },
736 ),
737 StageEffect::Contramap { original, new_name, transform } => (
738 StageEffect::Contramap { original: original.clone(), new_name: new_name.clone(), transform },
739 Effect::Contramap { at_stage: at_name, original, new_name },
740 ),
741 }
742 }
743}
744
/// Serializable description of an effect, including the stage that issued it and
/// any message payloads. Produced by `StageEffect::split`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum Effect {
    /// Counterpart of `StageEffect::Receive` for stage `at_stage`.
    Receive {
        at_stage: Name,
    },
    /// Stage `from` sends `msg` to stage `to`.
    Send {
        from: Name,
        to: Name,
        #[serde(with = "crate::serde::serialize_send_data")]
        msg: Box<dyn SendData>,
    },
    /// Stage `from` calls stage `to` with `msg`; `duration` is the call's timeout.
    Call {
        from: Name,
        to: Name,
        duration: Duration,
        #[serde(with = "crate::serde::serialize_send_data")]
        msg: Box<dyn SendData>,
    },
    /// Stage `at_stage` asks for the current time.
    Clock {
        at_stage: Name,
    },
    /// Stage `at_stage` waits for `duration`.
    Wait {
        at_stage: Name,
        duration: Duration,
    },
    /// Stage `at_stage` schedules `msg` under `id`.
    Schedule {
        at_stage: Name,
        #[serde(with = "crate::serde::serialize_send_data")]
        msg: Box<dyn SendData>,
        id: ScheduleId,
    },
    /// Stage `at_stage` cancels the scheduled entry `id`.
    CancelSchedule {
        at_stage: Name,
        id: ScheduleId,
    },
    /// Stage `at_stage` runs the external effect `effect`.
    External {
        at_stage: Name,
        #[serde(with = "crate::serde::serialize_external_effect")]
        effect: Box<dyn ExternalEffect>,
    },
    /// Stage `at_stage` issued the terminate effect.
    Terminate {
        at_stage: Name,
    },
    /// Stage `at_stage` registers a new stage called `name`.
    AddStage {
        at_stage: Name,
        name: Name,
    },
    /// Stage `at_stage` wires up stage `name` with its initial state and tombstone.
    WireStage {
        at_stage: Name,
        name: Name,
        #[serde(with = "crate::serde::serialize_send_data")]
        initial_state: Box<dyn SendData>,
        #[serde(with = "crate::serde::serialize_send_data")]
        tombstone: Box<dyn SendData>,
    },
    /// Stage `at_stage` registers `new_name` as an adapter in front of `original`.
    Contramap {
        at_stage: Name,
        original: Name,
        new_name: Name,
    },
}
807
808impl Effect {
809 pub fn to_json(&self) -> serde_json::Value {
810 match self {
811 Effect::Receive { at_stage } => serde_json::json!({
812 "type": "receive",
813 "at_stage": at_stage,
814 }),
815 Effect::Send { from, to, msg } => {
816 serde_json::json!({
817 "type": "send",
818 "from": from,
819 "to": to,
820 "msg": format!("{msg}"),
821 })
822 }
823 Effect::Call { from, to, duration, msg } => serde_json::json!({
824 "type": "call",
825 "from": from,
826 "to": to,
827 "duration": duration.as_millis(),
828 "msg": format!("{msg}"),
829 }),
830 Effect::Clock { at_stage } => serde_json::json!({
831 "type": "clock",
832 "at_stage": at_stage,
833 }),
834 Effect::Wait { at_stage, duration } => serde_json::json!({
835 "type": "wait",
836 "at_stage": at_stage,
837 "duration": duration.as_millis(),
838 }),
839 Effect::Schedule { at_stage, msg, id } => serde_json::json!({
840 "type": "schedule",
841 "at_stage": at_stage,
842 "msg": format!("{msg}"),
843 "id": format!("{:?}", id),
844 }),
845 Effect::CancelSchedule { at_stage, id } => serde_json::json!({
846 "type": "cancel_schedule",
847 "at_stage": at_stage,
848 "id": format!("{:?}", id),
849 }),
850 Effect::External { at_stage, effect } => {
851 let effect_type = effect
852 .cast_ref::<UnknownExternalEffect>()
853 .map(|e| e.send_data_value().typetag.as_str())
854 .unwrap_or_else(|| effect.typetag_name());
855 serde_json::json!({
856 "type": "external",
857 "at_stage": at_stage,
858 "effect": effect.to_string(),
859 "effect_type": effect_type,
860 })
861 }
862 Effect::Terminate { at_stage } => serde_json::json!({
863 "type": "terminate",
864 "at_stage": at_stage,
865 }),
866 Effect::AddStage { at_stage, name } => serde_json::json!({
867 "type": "add_stage",
868 "at_stage": at_stage,
869 "name": name,
870 }),
871 Effect::WireStage { at_stage, name, initial_state, tombstone } => serde_json::json!({
872 "type": "wire_stage",
873 "at_stage": at_stage,
874 "name": name,
875 "initial_state": format!("{initial_state}"),
876 "tombstone": format!("{tombstone}"),
877 }),
878 Effect::Contramap { at_stage, original, new_name } => serde_json::json!({
879 "type": "contramap",
880 "at_stage": at_stage,
881 "original": original,
882 "new_name": new_name,
883 }),
884 }
885 }
886}
887
888impl Display for Effect {
889 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
890 match self {
891 Effect::Receive { at_stage } => write!(f, "receive {at_stage}"),
892 Effect::Send { from, to, msg } => {
893 write!(f, "send {from} -> {to}: {msg}",)
894 }
895 Effect::Call { from, to, duration, msg } => {
896 write!(f, "call {from} -> {to}: {duration:?} {msg}")
897 }
898 Effect::Clock { at_stage } => write!(f, "clock {at_stage}"),
899 Effect::Wait { at_stage, duration } => {
900 write!(f, "wait {at_stage}: {duration:?}")
901 }
902 Effect::Schedule { at_stage, msg, id } => write!(f, "schedule {at_stage} {id}: {msg}",),
903 Effect::CancelSchedule { at_stage, id } => {
904 write!(f, "cancel_schedule {at_stage} {id}")
905 }
906 Effect::External { at_stage, effect } => {
907 write!(f, "external {at_stage}: {effect}",)
908 }
909 Effect::Terminate { at_stage } => write!(f, "terminate {at_stage}"),
910 Effect::AddStage { at_stage, name } => write!(f, "add_stage {at_stage} {name}"),
911 Effect::WireStage { at_stage, name, initial_state, tombstone } => {
912 write!(f, "wire_stage {at_stage} {name} {initial_state} {tombstone}")
913 }
914 Effect::Contramap { at_stage, original, new_name } => {
915 write!(f, "contramap {at_stage} {original} -> {new_name}")
916 }
917 }
918 }
919}
920
921#[expect(clippy::wildcard_enum_match_arm, clippy::panic)]
922impl Effect {
923 pub fn send(from: impl AsRef<str>, to: impl AsRef<str>, msg: Box<dyn SendData>) -> Self {
925 Self::Send { from: Name::from(from.as_ref()), to: Name::from(to.as_ref()), msg }
926 }
927
928 pub fn call(from: impl AsRef<str>, to: impl AsRef<str>, duration: Duration, msg: Box<dyn SendData>) -> Self {
930 Self::Call { from: Name::from(from.as_ref()), to: Name::from(to.as_ref()), duration, msg }
931 }
932
933 pub fn clock(at_stage: impl AsRef<str>) -> Self {
935 Self::Clock { at_stage: Name::from(at_stage.as_ref()) }
936 }
937
938 pub fn wait(at_stage: impl AsRef<str>, duration: Duration) -> Self {
940 Self::Wait { at_stage: Name::from(at_stage.as_ref()), duration }
941 }
942
943 pub fn schedule(at_stage: impl AsRef<str>, msg: Box<dyn SendData>, schedule_id: &ScheduleId) -> Self {
945 Self::Schedule { at_stage: Name::from(at_stage.as_ref()), msg, id: *schedule_id }
946 }
947
948 pub fn cancel(at_stage: impl AsRef<str>, schedule_id: &ScheduleId) -> Self {
949 Self::CancelSchedule { at_stage: Name::from(at_stage.as_ref()), id: *schedule_id }
950 }
951
952 pub fn external(at_stage: impl AsRef<str>, effect: Box<dyn ExternalEffect>) -> Self {
954 Self::External { at_stage: Name::from(at_stage.as_ref()), effect }
955 }
956
957 pub fn terminate(at_stage: impl AsRef<str>) -> Self {
959 Self::Terminate { at_stage: Name::from(at_stage.as_ref()) }
960 }
961
962 pub fn add_stage(at_stage: impl AsRef<str>, name: impl AsRef<str>) -> Self {
964 Self::AddStage { at_stage: Name::from(at_stage.as_ref()), name: Name::from(name.as_ref()) }
965 }
966
967 pub fn wire_stage(
969 at_stage: impl AsRef<str>,
970 name: impl AsRef<str>,
971 initial_state: Box<dyn SendData>,
972 tombstone: Option<Box<dyn SendData>>,
973 ) -> Self {
974 Self::WireStage {
975 at_stage: Name::from(at_stage.as_ref()),
976 name: Name::from(name.as_ref()),
977 initial_state,
978 tombstone: tombstone.unwrap_or_else(|| SendDataValue::boxed(&CanSupervise(Name::from(name.as_ref())))),
979 }
980 }
981
982 pub fn contramap(at_stage: impl AsRef<str>, original: impl AsRef<str>, new_name: impl AsRef<str>) -> Self {
983 Self::Contramap {
984 at_stage: Name::from(at_stage.as_ref()),
985 original: Name::from(original.as_ref()),
986 new_name: Name::from(new_name.as_ref()),
987 }
988 }
989
990 pub fn at_stage(&self) -> &Name {
992 match self {
993 Effect::Receive { at_stage, .. } => at_stage,
994 Effect::Send { from, .. } => from,
995 Effect::Call { from, .. } => from,
996 Effect::Clock { at_stage, .. } => at_stage,
997 Effect::Wait { at_stage, .. } => at_stage,
998 Effect::Schedule { at_stage, .. } => at_stage,
999 Effect::CancelSchedule { at_stage, .. } => at_stage,
1000 Effect::External { at_stage, .. } => at_stage,
1001 Effect::Terminate { at_stage, .. } => at_stage,
1002 Effect::AddStage { at_stage, .. } => at_stage,
1003 Effect::WireStage { at_stage, .. } => at_stage,
1004 Effect::Contramap { at_stage, .. } => at_stage,
1005 }
1006 }
1007
1008 #[track_caller]
1010 pub fn assert_receive<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>) {
1011 let at_stage = at_stage.as_ref();
1012 match self {
1013 Effect::Receive { at_stage: a } if a == at_stage.name() => {}
1014 _ => panic!("unexpected effect {self:?}\n looking for Receive at `{}`", at_stage.name()),
1015 }
1016 }
1017
1018 #[expect(clippy::unwrap_used)]
1020 #[track_caller]
1021 pub fn assert_send<Msg1, Msg2: SendData + PartialEq>(
1022 &self,
1023 at_stage: impl AsRef<StageRef<Msg1>>,
1024 target: impl AsRef<StageRef<Msg2>>,
1025 msg: Msg2,
1026 ) {
1027 let at_stage = at_stage.as_ref();
1028 let target = target.as_ref();
1029 match self {
1030 Effect::Send { from, to, msg: m }
1031 if from == at_stage.name()
1032 && to == target.name()
1033 && (&**m as &dyn Any).downcast_ref::<Msg2>().unwrap() == &msg => {}
1034 _ => {
1035 panic!(
1036 "unexpected effect {self:?}\n looking for Send from `{}` to `{}` with msg {msg:?}",
1037 at_stage.name(),
1038 target.name()
1039 )
1040 }
1041 }
1042 }
1043
1044 #[track_caller]
1046 pub fn assert_clock<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>) {
1047 let at_stage = at_stage.as_ref();
1048 match self {
1049 Effect::Clock { at_stage: a } if a == at_stage.name() => {}
1050 _ => panic!("unexpected effect {self:?}\n looking for Clock at `{}`", at_stage.name()),
1051 }
1052 }
1053
1054 #[track_caller]
1056 pub fn assert_wait<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>, duration: Duration) {
1057 let at_stage = at_stage.as_ref();
1058 match self {
1059 Effect::Wait { at_stage: a, duration: d } if a == at_stage.name() && d == &duration => {}
1060 _ => panic!(
1061 "unexpected effect {self:?}\n looking for Wait at `{}` with duration {duration:?}",
1062 at_stage.name()
1063 ),
1064 }
1065 }
1066
1067 #[track_caller]
1069 pub fn assert_call<Msg1, Msg2: SendData, Out>(
1070 self,
1071 at_stage: impl AsRef<StageRef<Msg1>>,
1072 target: impl AsRef<StageRef<Msg2>>,
1073 extract: impl FnOnce(Msg2) -> Out,
1074 duration: Duration,
1075 ) -> Out {
1076 let at_stage = at_stage.as_ref();
1077 let target = target.as_ref();
1078 match self {
1079 Effect::Call { from, to, duration: d, msg }
1080 if &from == at_stage.name() && &to == target.name() && d == duration =>
1081 {
1082 extract(*msg.cast::<Msg2>().expect("internal messaging type error"))
1083 }
1084 _ => panic!(
1085 "unexpected effect {self:?}\n looking for Send from `{}` to `{}` with duration {duration:?}",
1086 at_stage.name(),
1087 target.name()
1088 ),
1089 }
1090 }
1091
1092 #[track_caller]
1094 pub fn assert_external<Eff: ExternalEffect + PartialEq>(&self, at_stage: impl AsRef<Name>, effect: &Eff) {
1095 let at_stage = at_stage.as_ref();
1096 match self {
1097 Effect::External { at_stage: a, effect: e }
1098 if a == at_stage && &**e as &dyn SendData == effect as &dyn SendData => {}
1099 _ => panic!("unexpected effect {self:?}\n looking for External at `{}` with effect {effect:?}", at_stage),
1100 }
1101 }
1102
1103 #[track_caller]
1105 pub fn extract_external<Eff: ExternalEffectAPI + PartialEq>(self, at_stage: impl AsRef<Name>) -> Box<Eff> {
1106 let at_stage = at_stage.as_ref();
1107 match self {
1108 Effect::External { at_stage: a, effect: e } if &a == at_stage =>
1109 {
1110 #[expect(clippy::unwrap_used)]
1111 e.cast::<Eff>().unwrap()
1112 }
1113 _ => panic!(
1114 "unexpected effect {self:?}\n looking for External at `{}` with effect {}",
1115 at_stage,
1116 type_name::<Eff>()
1117 ),
1118 }
1119 }
1120
1121 #[track_caller]
1123 pub fn assert_add_stage<Msg>(&self, at_stage: impl AsRef<StageRef<Msg>>, name: impl AsRef<str>) {
1124 let at_stage = at_stage.as_ref();
1125 match self {
1126 Effect::AddStage { at_stage: a, name: n } if a == at_stage.name() && n.as_str() == name.as_ref() => {}
1127 _ => panic!(
1128 "unexpected effect {self:?}\n looking for AddStage at `{}` with name `{}`",
1129 at_stage.name(),
1130 name.as_ref()
1131 ),
1132 }
1133 }
1134
1135 #[track_caller]
1137 pub fn assert_wire_stage<Msg, St: SendData + PartialEq>(
1138 &self,
1139 at_stage: impl AsRef<StageRef<Msg>>,
1140 name: impl AsRef<str>,
1141 initial_state: St,
1142 ) {
1143 let at_stage = at_stage.as_ref();
1144 match self {
1145 Effect::WireStage { at_stage: a, name: n, initial_state: i, tombstone: _ }
1146 if a == at_stage.name()
1147 && n.as_str() == name.as_ref()
1148 && i.cast_ref::<St>().expect("type error") == &initial_state => {}
1149 _ => panic!(
1150 "unexpected effect {self:?}\n looking for WireStage at `{}` with name `{}`",
1151 at_stage.name(),
1152 name.as_ref()
1153 ),
1154 }
1155 }
1156
1157 #[track_caller]
1158 pub fn extract_wire_stage<Msg, St: SendData + PartialEq>(
1159 &self,
1160 at_stage: impl AsRef<StageRef<Msg>>,
1161 initial_state: St,
1162 ) -> &Name {
1163 let at_stage = at_stage.as_ref();
1164 match self {
1165 Effect::WireStage { at_stage: a, name: n, initial_state: i, tombstone: _ }
1166 if a == at_stage.name() && i.cast_ref::<St>().expect("type error") == &initial_state =>
1167 {
1168 n
1169 }
1170 _ => {
1171 panic!(
1172 "unexpected effect {self:?}\n looking for WireStage at `{}` with initial state {initial_state:?}",
1173 at_stage.name()
1174 )
1175 }
1176 }
1177 }
1178}
1179
1180impl PartialEq for Effect {
1181 #[expect(clippy::wildcard_enum_match_arm)]
1182 fn eq(&self, other: &Self) -> bool {
1183 match self {
1184 Effect::Receive { at_stage } => match other {
1185 Effect::Receive { at_stage: other_at_stage } => at_stage == other_at_stage,
1186 _ => false,
1187 },
1188 Effect::Send { from, to, msg } => match other {
1189 Effect::Send { from: other_from, to: other_to, msg: other_msg } => {
1190 from == other_from && to == other_to && msg == other_msg
1191 }
1192 _ => false,
1193 },
1194 Effect::Call { from, to, duration, msg } => match other {
1195 Effect::Call { from: other_from, to: other_to, duration: other_duration, msg: other_msg } => {
1196 from == other_from && to == other_to && duration == other_duration && msg == other_msg
1197 }
1198 _ => false,
1199 },
1200 Effect::Clock { at_stage } => match other {
1201 Effect::Clock { at_stage: other_at_stage } => at_stage == other_at_stage,
1202 _ => false,
1203 },
1204 Effect::Wait { at_stage, duration } => match other {
1205 Effect::Wait { at_stage: other_at_stage, duration: other_duration } => {
1206 at_stage == other_at_stage && duration == other_duration
1207 }
1208 _ => false,
1209 },
1210 Effect::Schedule { at_stage, msg, id } => match other {
1211 Effect::Schedule { at_stage: other_at_stage, msg: other_msg, id: other_id } => {
1212 at_stage == other_at_stage && msg == other_msg && id == other_id
1213 }
1214 _ => false,
1215 },
1216 Effect::CancelSchedule { at_stage, id } => match other {
1217 Effect::CancelSchedule { at_stage: other_at_stage, id: other_id } => {
1218 at_stage == other_at_stage && id == other_id
1219 }
1220 _ => false,
1221 },
1222 Effect::External { at_stage, effect } => match other {
1223 Effect::External { at_stage: other_at_stage, effect: other_effect } => {
1224 at_stage == other_at_stage && &**effect as &dyn SendData == &**other_effect as &dyn SendData
1225 }
1226 _ => false,
1227 },
1228 Effect::Terminate { at_stage } => match other {
1229 Effect::Terminate { at_stage: other_at_stage } => at_stage == other_at_stage,
1230 _ => false,
1231 },
1232 Effect::AddStage { at_stage, name } => match other {
1233 Effect::AddStage { at_stage: other_at_stage, name: other_name } => {
1234 at_stage == other_at_stage && name == other_name
1235 }
1236 _ => false,
1237 },
1238 Effect::WireStage { at_stage, name, initial_state, tombstone } => match other {
1239 Effect::WireStage {
1240 at_stage: other_at_stage,
1241 name: other_name,
1242 initial_state: other_initial_state,
1243 tombstone: other_tombstone,
1244 } => {
1245 at_stage == other_at_stage
1246 && name == other_name
1247 && initial_state == other_initial_state
1248 && tombstone == other_tombstone
1249 }
1250 _ => false,
1251 },
1252 Effect::Contramap { at_stage, original, new_name } => match other {
1253 Effect::Contramap { at_stage: other_at_stage, original: other_original, new_name: other_new_name } => {
1254 at_stage == other_at_stage && original == other_original && new_name == other_new_name
1255 }
1256 _ => false,
1257 },
1258 }
1259 }
1260}