arbiter_engine/messager.rs
1//! The messager module contains the core messager layer for the Arbiter Engine.
2
3use tokio::sync::broadcast::{channel, Receiver, Sender};
4
5use super::*;
6use crate::machine::EventStream;
7
8/// A message that can be sent between agents.
9#[derive(Clone, Debug, Deserialize, Serialize)]
10pub struct Message {
11 /// The sender of the message.
12 pub from: String,
13
14 /// The recipient of the message.
15 pub to: To,
16
17 /// The data of the message.
18 /// This can be a struct serialized into JSON.
19 pub data: String,
20}
21
22/// The recipient of the message.
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
24pub enum To {
25 /// Send the message to all agents who are listening for broadcasts.
26 All,
27
28 /// Send the message to a specific agent.
29 Agent(String),
30}
31
32/// A messager that can be used to send messages between agents.
33#[derive(Debug)]
34pub struct Messager {
35 /// The identifier of the entity that is using the messager.
36 pub id: Option<String>,
37
38 pub(crate) broadcast_sender: Sender<Message>,
39
40 broadcast_receiver: Option<Receiver<Message>>,
41}
42
43impl Clone for Messager {
44 fn clone(&self) -> Self {
45 Self {
46 broadcast_sender: self.broadcast_sender.clone(),
47 broadcast_receiver: Some(self.broadcast_sender.subscribe()),
48 id: self.id.clone(),
49 }
50 }
51}
52
53impl Messager {
54 /// Creates a new messager with the given capacity.
55 #[allow(clippy::new_without_default)]
56 pub fn new() -> Self {
57 let (broadcast_sender, broadcast_receiver) = channel(512);
58 Self {
59 broadcast_sender,
60 broadcast_receiver: Some(broadcast_receiver),
61 id: None,
62 }
63 }
64
65 /// Returns a [`Messager`] interface connected to the same instance but with
66 /// the `id` provided.
67 pub fn for_agent(&self, id: &str) -> Self {
68 Self {
69 broadcast_sender: self.broadcast_sender.clone(),
70 broadcast_receiver: Some(self.broadcast_sender.subscribe()),
71 id: Some(id.to_owned()),
72 }
73 }
74
75 /// utility function for getting the next value from the broadcast_receiver
76 /// without streaming
77 pub async fn get_next(&mut self) -> Result<Message, ArbiterEngineError> {
78 let mut receiver = match self.broadcast_receiver.take() {
79 Some(receiver) => receiver,
80 None => {
81 return Err(ArbiterEngineError::MessagerError(
82 "Receiver has been taken! Are you already streaming on this messager?"
83 .to_owned(),
84 ))
85 }
86 };
87 while let Ok(message) = receiver.recv().await {
88 match &message.to {
89 To::All => {
90 return Ok(message);
91 }
92 To::Agent(id) => {
93 if let Some(self_id) = &self.id {
94 if id == self_id {
95 return Ok(message);
96 }
97 }
98 continue;
99 }
100 }
101 }
102 unreachable!()
103 }
104
105 /// Returns a stream of messages that are either sent to [`To::All`] or to
106 /// the agent via [`To::Agent(id)`].
107 pub fn stream(mut self) -> Result<EventStream<Message>, ArbiterEngineError> {
108 let mut receiver = match self.broadcast_receiver.take() {
109 Some(receiver) => receiver,
110 None => {
111 return Err(ArbiterEngineError::MessagerError(
112 "Receiver has been taken! Are you already streaming on this messager?"
113 .to_owned(),
114 ))
115 }
116 };
117 Ok(Box::pin(async_stream::stream! {
118 while let Ok(message) = receiver.recv().await {
119 match &message.to {
120 To::All => {
121 yield message;
122 }
123 To::Agent(id) => {
124 if let Some(self_id) = &self.id {
125 if id == self_id {
126 yield message;
127 }
128 }
129 }
130 }
131 }
132 }))
133 }
134 /// Asynchronously sends a message to a specified recipient.
135 ///
136 /// This method constructs a message with the provided data and sends it to
137 /// the specified recipient. The recipient can either be a single agent
138 /// or all agents, depending on the `to` parameter. The data is
139 /// serialized into a JSON string before being sent.
140 ///
141 /// # Type Parameters
142 ///
143 /// - `T`: The type that can be converted into a recipient specification
144 /// (`To`).
145 /// - `S`: The type of the data being sent. Must implement `Serialize`.
146 ///
147 /// # Parameters
148 ///
149 /// - `to`: The recipient of the message. Can be an individual agent's ID or
150 /// a broadcast to all agents.
151 /// - `data`: The data to be sent in the message. This data is serialized
152 /// into JSON format.
153 pub async fn send<S: Serialize>(&self, to: To, data: S) -> Result<(), ArbiterEngineError> {
154 trace!("Sending message via messager.");
155 if let Some(id) = &self.id {
156 let message = Message {
157 from: id.clone(),
158 to,
159 data: serde_json::to_string(&data)?,
160 };
161 self.broadcast_sender.send(message)?;
162 Ok(())
163 } else {
164 Err(ArbiterEngineError::MessagerError(
165 "Messager has no ID! You must have an ID to send messages!".to_owned(),
166 ))
167 }
168 }
169}