arbiter_engine/
messager.rs

1//! The messager module contains the core messager layer for the Arbiter Engine.
2
3use tokio::sync::broadcast::{channel, Receiver, Sender};
4
5use super::*;
6use crate::machine::EventStream;
7
8/// A message that can be sent between agents.
9#[derive(Clone, Debug, Deserialize, Serialize)]
10pub struct Message {
11    /// The sender of the message.
12    pub from: String,
13
14    /// The recipient of the message.
15    pub to: To,
16
17    /// The data of the message.
18    /// This can be a struct serialized into JSON.
19    pub data: String,
20}
21
22/// The recipient of the message.
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
24pub enum To {
25    /// Send the message to all agents who are listening for broadcasts.
26    All,
27
28    /// Send the message to a specific agent.
29    Agent(String),
30}
31
32/// A messager that can be used to send messages between agents.
33#[derive(Debug)]
34pub struct Messager {
35    /// The identifier of the entity that is using the messager.
36    pub id: Option<String>,
37
38    pub(crate) broadcast_sender: Sender<Message>,
39
40    broadcast_receiver: Option<Receiver<Message>>,
41}
42
43impl Clone for Messager {
44    fn clone(&self) -> Self {
45        Self {
46            broadcast_sender: self.broadcast_sender.clone(),
47            broadcast_receiver: Some(self.broadcast_sender.subscribe()),
48            id: self.id.clone(),
49        }
50    }
51}
52
53impl Messager {
54    /// Creates a new messager with the given capacity.
55    #[allow(clippy::new_without_default)]
56    pub fn new() -> Self {
57        let (broadcast_sender, broadcast_receiver) = channel(512);
58        Self {
59            broadcast_sender,
60            broadcast_receiver: Some(broadcast_receiver),
61            id: None,
62        }
63    }
64
65    /// Returns a [`Messager`] interface connected to the same instance but with
66    /// the `id` provided.
67    pub fn for_agent(&self, id: &str) -> Self {
68        Self {
69            broadcast_sender: self.broadcast_sender.clone(),
70            broadcast_receiver: Some(self.broadcast_sender.subscribe()),
71            id: Some(id.to_owned()),
72        }
73    }
74
75    /// utility function for getting the next value from the broadcast_receiver
76    /// without streaming
77    pub async fn get_next(&mut self) -> Result<Message, ArbiterEngineError> {
78        let mut receiver = match self.broadcast_receiver.take() {
79            Some(receiver) => receiver,
80            None => {
81                return Err(ArbiterEngineError::MessagerError(
82                    "Receiver has been taken! Are you already streaming on this messager?"
83                        .to_owned(),
84                ))
85            }
86        };
87        while let Ok(message) = receiver.recv().await {
88            match &message.to {
89                To::All => {
90                    return Ok(message);
91                }
92                To::Agent(id) => {
93                    if let Some(self_id) = &self.id {
94                        if id == self_id {
95                            return Ok(message);
96                        }
97                    }
98                    continue;
99                }
100            }
101        }
102        unreachable!()
103    }
104
105    /// Returns a stream of messages that are either sent to [`To::All`] or to
106    /// the agent via [`To::Agent(id)`].
107    pub fn stream(mut self) -> Result<EventStream<Message>, ArbiterEngineError> {
108        let mut receiver = match self.broadcast_receiver.take() {
109            Some(receiver) => receiver,
110            None => {
111                return Err(ArbiterEngineError::MessagerError(
112                    "Receiver has been taken! Are you already streaming on this messager?"
113                        .to_owned(),
114                ))
115            }
116        };
117        Ok(Box::pin(async_stream::stream! {
118            while let Ok(message) = receiver.recv().await {
119                match &message.to {
120                    To::All => {
121                        yield message;
122                    }
123                    To::Agent(id) => {
124                        if let Some(self_id) = &self.id {
125                            if id == self_id {
126                                yield message;
127                            }
128                        }
129                    }
130                }
131            }
132        }))
133    }
134    /// Asynchronously sends a message to a specified recipient.
135    ///
136    /// This method constructs a message with the provided data and sends it to
137    /// the specified recipient. The recipient can either be a single agent
138    /// or all agents, depending on the `to` parameter. The data is
139    /// serialized into a JSON string before being sent.
140    ///
141    /// # Type Parameters
142    ///
143    /// - `T`: The type that can be converted into a recipient specification
144    ///   (`To`).
145    /// - `S`: The type of the data being sent. Must implement `Serialize`.
146    ///
147    /// # Parameters
148    ///
149    /// - `to`: The recipient of the message. Can be an individual agent's ID or
150    ///   a broadcast to all agents.
151    /// - `data`: The data to be sent in the message. This data is serialized
152    ///   into JSON format.
153    pub async fn send<S: Serialize>(&self, to: To, data: S) -> Result<(), ArbiterEngineError> {
154        trace!("Sending message via messager.");
155        if let Some(id) = &self.id {
156            let message = Message {
157                from: id.clone(),
158                to,
159                data: serde_json::to_string(&data)?,
160            };
161            self.broadcast_sender.send(message)?;
162            Ok(())
163        } else {
164            Err(ArbiterEngineError::MessagerError(
165                "Messager has no ID! You must have an ID to send messages!".to_owned(),
166            ))
167        }
168    }
169}