Skip to main content

pure_stage/
output.rs

1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16
17use tokio::sync::mpsc;
18
19use crate::{ExternalEffect, ExternalEffectAPI, Name, Resources, SendData, types::MpscSender};
20
21/// An effect that sends a message to an output channel.
22///
23/// This is used to send messages to the output stage, which is used to collect the results of the simulation.
24///
25/// The [`OutputEffect`] is created by [`StageGraph::output`](crate::StageGraph::output).
26#[derive(Clone, serde::Serialize, serde::Deserialize)]
27pub struct OutputEffect<Msg> {
28    pub name: Name,
29    pub msg: Msg,
30    sender: MpscSender<Msg>,
31}
32
33impl<Msg> OutputEffect<Msg> {
34    pub fn new(name: Name, msg: Msg, sender: mpsc::Sender<Msg>) -> Self {
35        Self { name, msg, sender: MpscSender { sender } }
36    }
37
38    /// Create a fake output effect for testing.
39    pub fn fake(name: Name, msg: Msg) -> (Self, mpsc::Receiver<Msg>) {
40        let (tx, rx) = mpsc::channel(1);
41        (Self { name, msg, sender: MpscSender { sender: tx } }, rx)
42    }
43}
44
45impl<Msg: SendData> fmt::Debug for OutputEffect<Msg> {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        f.debug_struct("OutputEffect")
48            .field("name", &self.name)
49            .field("msg", &self.msg)
50            .field("type", &self.msg.typetag_name())
51            .finish()
52    }
53}
54
55impl<Msg: SendData + PartialEq> PartialEq for OutputEffect<Msg> {
56    fn eq(&self, other: &Self) -> bool {
57        self.name == other.name && self.msg == other.msg
58    }
59}
60
61impl<Msg> ExternalEffect for OutputEffect<Msg>
62where
63    Msg: SendData + PartialEq + serde::Serialize + serde::de::DeserializeOwned,
64{
65    fn run(self: Box<Self>, _resources: Resources) -> crate::BoxFuture<'static, Box<dyn SendData>> {
66        Box::pin(async move {
67            if let Err(e) = self.sender.send(self.msg).await {
68                tracing::debug!("output `{}` failed to send message: {:?}", self.name, e.0);
69            }
70            Box::new(()) as Box<dyn SendData>
71        })
72    }
73}
74
75impl<Msg> ExternalEffectAPI for OutputEffect<Msg>
76where
77    Msg: SendData + PartialEq + serde::Serialize + serde::de::DeserializeOwned,
78{
79    type Response = ();
80}