amico_core/agent.rs
1use tokio::sync::mpsc::{Receiver, Sender, channel};
2
3#[cfg(feature = "wasm")]
4use tokio_with_wasm::alias as tokio;
5
6use crate::{
7 traits::{EventSource, Strategy, System},
8 types::{AgentEvent, Control, EventContent},
9 world::WorldManager,
10};
11
12/// The core event-driven Agent program. Defines the workflow of the agent.
13///
14/// The `Agent` creates and manages an ECS `World`, manages
15/// `AgentEvent`s sent from `EventSource`s, and dispatches them.
16///
17/// ## Type parameters
18///
19/// - `S`: `Strategy` type, representing the Agent's action selection strategy.
20///
21/// ## Compatibility
22///
23/// - WASM: compatible.
24pub struct Agent<S: Strategy> {
25 /// The mpsc channel sender to send agent events to event sources.
26 event_tx: Sender<EventWithTx>,
27
28 /// The mpsc channel receiver to receive agent events from event sources.
29 event_rx: Receiver<EventWithTx>,
30
31 /// The ECS world manager.
32 wm: WorldManager,
33
34 /// The action selection strategy.
35 strategy: S,
36}
37
38impl<S: Strategy> Agent<S> {
39 /// Create a new agent.
40 pub fn new(strategy: S) -> Self {
41 // Create an event channel.
42 // TODO: make the channel size configurable.
43 let (tx, rx) = channel(4);
44
45 // Build the Agnet.
46 Self {
47 event_tx: tx,
48 event_rx: rx,
49 wm: WorldManager::new(),
50 strategy,
51 }
52 }
53
54 /// Spawn an event source for the agent.
55 ///
56 /// ## Spawns
57 ///
58 /// Spawns a new `tokio` thread for the event source.
59 ///
60 /// ## Compatibility
61 ///
62 /// - WASM: compatible with `tokio_with_wasm`
63 pub fn spawn_event_source<E: EventSource + Send + 'static>(
64 &mut self,
65 event_source: E,
66 on_finish: OnFinish,
67 ) {
68 let event_tx = self.event_tx.clone();
69 // Spawn the thread.
70 let jh = event_source.spawn(move |event| {
71 tracing::debug!("On AgentEvent {:?}", event);
72 let event_tx = event_tx.clone();
73
74 async move {
75 let name = event.name;
76
77 // Create a new channel for the reply message.
78 let (tx, mut rx) = channel(1);
79
80 if let Err(err) = event_tx
81 .send(EventWithTx {
82 tx: Some(tx),
83 event,
84 })
85 .await
86 {
87 tracing::warn!("Failed to send AgentEvent {}", err);
88 } else {
89 tracing::info!("Sent AgentEvent {}", name);
90 }
91
92 // Wait and return the reply message.
93 rx.recv()
94 .await
95 .inspect(|reply| {
96 if let Some(reply) = reply {
97 tracing::debug!("Received reply message: {:?}", reply);
98 } else {
99 tracing::debug!("Received no reply message");
100 }
101 })
102 .unwrap_or_else(|| {
103 tracing::error!("Failed to receive reply message: channel closed");
104 None
105 })
106 }
107 });
108
109 // Wait for the event source to finish and send termination signal if needed.
110 match &on_finish {
111 OnFinish::Stop => {
112 // Spawn a new thread to wait for the event source to finish.
113 let event_tx = self.event_tx.clone();
114 tokio::spawn(async move {
115 // Wait for the event source to finish.
116 if let Err(err) = jh.await.unwrap() {
117 tracing::error!("Event source JoinError: {}", err);
118 return;
119 }
120
121 // Send a termination instruction to signal the main loop to exit
122 let terminate_event =
123 AgentEvent::new("Terminate", "spawn_event_source").control(Control::Quit);
124
125 // Try to send the termination event, but don't panic if it fails
126 // (channel might already be closed)
127 if let Err(err) = event_tx
128 .send(EventWithTx {
129 tx: None,
130 event: terminate_event,
131 })
132 .await
133 {
134 tracing::warn!("Failed to send termination event: {}", err);
135 }
136 });
137 }
138 OnFinish::Continue => {}
139 }
140 }
141
142 /// Add a system to the agent.
143 pub fn add_system<SS: System>(&mut self, system: SS) {
144 self.wm.add_system(system);
145 }
146
147 /// The function to run the agent.
148 ///
149 /// `run` dispatches `AgentEvent`s into the ECS `World` based on the Agent's strategy.
150 pub async fn run(&mut self) {
151 // Listen for events sent by event sources.
152 while let Some(event_with_tx) = self.event_rx.recv().await {
153 let EventWithTx { tx, event } = event_with_tx;
154 tracing::debug!("Received AgentEvent {:?}", event);
155
156 if let Some(EventContent::Control(control)) = event.content {
157 // Received a control instruction
158 tracing::debug!("Received control instruction {:?}", control);
159 match control {
160 // TODO: process other instructions
161 Control::Quit => {
162 tracing::info!("Terminating event loop due to Quit control instruction");
163 break; // Exit the event loop immediately
164 }
165 }
166 } else {
167 // The event is not an instruction, dispatch the event to the `World`.
168 tracing::debug!("Processing event {:?}", event);
169 let reply = self
170 .strategy
171 .deliberate(&event, self.wm.action_sender())
172 .await
173 .unwrap_or_else(|err| {
174 // Report the error and return `None` to indicate no reply.
175 tracing::error!("Error processing event {:?}: {}", event, err);
176 None
177 });
178
179 // Send the reply message back to the event source if needed.
180 if let Some(tx) = tx {
181 if let Err(err) = tx.send(reply).await {
182 tracing::error!("Failed to send reply message: {}", err);
183 }
184 }
185 }
186 }
187
188 tracing::info!("Exited event loop.");
189 }
190}
191
192/// The behaviour to choose when event source thread finishes.
193pub enum OnFinish {
194 // Do nothing when the event source thread finishes.
195 Continue,
196
197 // Stop the Agent workflow when the thread finishes.
198 Stop,
199}
200
201/// A struct for the reply message to send back to the event source.
202struct EventWithTx {
203 tx: Option<Sender<Option<String>>>,
204 event: AgentEvent,
205}