use tokio::sync::mpsc::{Receiver, Sender, channel};
#[cfg(feature = "wasm")]
use tokio_with_wasm::alias as tokio;
use crate::{
traits::{EventSource, Strategy, System},
types::{AgentEvent, Control, EventContent},
world::WorldManager,
};
pub struct Agent<S: Strategy> {
event_tx: Sender<EventWithTx>,
event_rx: Receiver<EventWithTx>,
wm: WorldManager,
strategy: S,
}
impl<S: Strategy> Agent<S> {
pub fn new(strategy: S) -> Self {
let (tx, rx) = channel(4);
Self {
event_tx: tx,
event_rx: rx,
wm: WorldManager::new(),
strategy,
}
}
pub fn spawn_event_source<E: EventSource + Send + 'static>(
&mut self,
event_source: E,
on_finish: OnFinish,
) {
let event_tx = self.event_tx.clone();
let jh = event_source.spawn(move |event| {
tracing::debug!("On AgentEvent {:?}", event);
let event_tx = event_tx.clone();
async move {
let name = event.name;
let (tx, mut rx) = channel(1);
if let Err(err) = event_tx
.send(EventWithTx {
tx: Some(tx),
event,
})
.await
{
tracing::warn!("Failed to send AgentEvent {}", err);
} else {
tracing::info!("Sent AgentEvent {}", name);
}
rx.recv()
.await
.inspect(|reply| {
if let Some(reply) = reply {
tracing::debug!("Received reply message: {:?}", reply);
} else {
tracing::debug!("Received no reply message");
}
})
.unwrap_or_else(|| {
tracing::error!("Failed to receive reply message: channel closed");
None
})
}
});
match &on_finish {
OnFinish::Stop => {
let event_tx = self.event_tx.clone();
tokio::spawn(async move {
if let Err(err) = jh.await.unwrap() {
tracing::error!("Event source JoinError: {}", err);
return;
}
let terminate_event =
AgentEvent::new("Terminate", "spawn_event_source").control(Control::Quit);
if let Err(err) = event_tx
.send(EventWithTx {
tx: None,
event: terminate_event,
})
.await
{
tracing::warn!("Failed to send termination event: {}", err);
}
});
}
OnFinish::Continue => {}
}
}
pub fn add_system<SS: System>(&mut self, system: SS) {
self.wm.add_system(system);
}
pub async fn run(&mut self) {
while let Some(event_with_tx) = self.event_rx.recv().await {
let EventWithTx { tx, event } = event_with_tx;
tracing::debug!("Received AgentEvent {:?}", event);
if let Some(EventContent::Control(control)) = event.content {
tracing::debug!("Received control instruction {:?}", control);
match control {
Control::Quit => {
tracing::info!("Terminating event loop due to Quit control instruction");
break; }
}
} else {
tracing::debug!("Processing event {:?}", event);
let reply = self
.strategy
.deliberate(&event, self.wm.action_sender())
.await
.unwrap_or_else(|err| {
tracing::error!("Error processing event {:?}: {}", event, err);
None
});
if let Some(tx) = tx {
if let Err(err) = tx.send(reply).await {
tracing::error!("Failed to send reply message: {}", err);
}
}
}
}
tracing::info!("Exited event loop.");
}
}
pub enum OnFinish {
Continue,
Stop,
}
struct EventWithTx {
tx: Option<Sender<Option<String>>>,
event: AgentEvent,
}