use tokio::sync::broadcast::{channel, Receiver, Sender};
use super::*;
use crate::machine::EventStream;
/// A message exchanged between messagers; this is the item type carried by the
/// broadcast channel a `Messager` holds.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Message {
/// ID of the sender. `Messager::send` fills this with the sending messager's ID.
pub from: String,
/// Intended recipient(s) of the message.
pub to: To,
/// Message payload. `Messager::send` stores the JSON string produced by
/// serializing its `data` argument here.
pub data: String,
}
/// Addressing mode of a [`Message`].
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub enum To {
/// Every receiving messager accepts the message, regardless of its ID.
All,
/// Only a messager whose ID equals the contained string accepts the message.
Agent(String),
}
/// A handle onto a broadcast channel of [`Message`]s, optionally tagged with
/// an ID.
#[derive(Debug)]
pub struct Messager {
/// Optional identifier. It is used as the `from` field of sent messages and
/// compared against `To::Agent` when filtering received ones; sending
/// requires it to be `Some`.
pub id: Option<String>,
/// Sending half of the broadcast channel; new receivers are created from it
/// via `subscribe()`.
pub(crate) broadcast_sender: Sender<Message>,
/// Receiving half of the broadcast channel. `stream` moves it out, so the
/// field is `None` once the receiver has been handed off.
broadcast_receiver: Option<Receiver<Message>>,
}
impl Clone for Messager {
    /// Produces a messager on the same broadcast channel with the same ID.
    ///
    /// `Receiver` cannot be cloned, so the copy gets its own fresh
    /// subscription instead of sharing the original's receiver. The copy
    /// always holds a receiver, even if the original's has been taken.
    fn clone(&self) -> Self {
        let id = self.id.clone();
        let broadcast_sender = self.broadcast_sender.clone();
        // The cloned sender is a handle to the same channel, so subscribing
        // through it is equivalent to subscribing through the original.
        let broadcast_receiver = Some(broadcast_sender.subscribe());
        Self {
            id,
            broadcast_sender,
            broadcast_receiver,
        }
    }
}
impl Messager {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let (broadcast_sender, broadcast_receiver) = channel(512);
Self {
broadcast_sender,
broadcast_receiver: Some(broadcast_receiver),
id: None,
}
}
pub fn for_agent(&self, id: &str) -> Self {
Self {
broadcast_sender: self.broadcast_sender.clone(),
broadcast_receiver: Some(self.broadcast_sender.subscribe()),
id: Some(id.to_owned()),
}
}
pub async fn get_next(&mut self) -> Result<Message, ArbiterEngineError> {
let mut receiver = match self.broadcast_receiver.take() {
Some(receiver) => receiver,
None => {
return Err(ArbiterEngineError::MessagerError(
"Receiver has been taken! Are you already streaming on this messager?"
.to_owned(),
))
}
};
while let Ok(message) = receiver.recv().await {
match &message.to {
To::All => {
return Ok(message);
}
To::Agent(id) => {
if let Some(self_id) = &self.id {
if id == self_id {
return Ok(message);
}
}
continue;
}
}
}
unreachable!()
}
pub fn stream(mut self) -> Result<EventStream<Message>, ArbiterEngineError> {
let mut receiver = match self.broadcast_receiver.take() {
Some(receiver) => receiver,
None => {
return Err(ArbiterEngineError::MessagerError(
"Receiver has been taken! Are you already streaming on this messager?"
.to_owned(),
))
}
};
Ok(Box::pin(async_stream::stream! {
while let Ok(message) = receiver.recv().await {
match &message.to {
To::All => {
yield message;
}
To::Agent(id) => {
if let Some(self_id) = &self.id {
if id == self_id {
yield message;
}
}
}
}
}
}))
}
pub async fn send<S: Serialize>(&self, to: To, data: S) -> Result<(), ArbiterEngineError> {
trace!("Sending message via messager.");
if let Some(id) = &self.id {
let message = Message {
from: id.clone(),
to,
data: serde_json::to_string(&data)?,
};
self.broadcast_sender.send(message)?;
Ok(())
} else {
Err(ArbiterEngineError::MessagerError(
"Messager has no ID! You must have an ID to send messages!".to_owned(),
))
}
}
}