amico_core/agent.rs
1use tokio::sync::mpsc::{Receiver, Sender, channel};
2use tokio_with_wasm::alias as tokio;
3
4use crate::{
5 traits::{EventSource, Strategy, System},
6 types::{AgentEvent, EventContent, Instruction},
7 world::WorldManager,
8};
9
10/// The core event-driven Agent program. Defines the workflow of the agent.
11///
12/// The `Agent` creates and manages an ECS `World`, manages
13/// `AgentEvent`s sent from `EventSource`s, and dispatches them.
14///
15/// ## Type parameters
16///
17/// - `S`: `Strategy` type, representing the Agent's action selection strategy.
18///
19/// ## Compatibility
20///
21/// - WASM: compatible.
22pub struct Agent<S: Strategy> {
23 /// The mpsc channel sender to send agent events to event sources.
24 event_tx: Sender<AgentEvent>,
25
26 /// The mpsc channel receiver to receive agent events from event sources.
27 event_rx: Receiver<AgentEvent>,
28
29 /// The ECS world manager.
30 wm: WorldManager,
31
32 /// The action selection strategy.
33 strategy: S,
34}
35
36impl<S: Strategy> Agent<S> {
37 /// Create a new agent.
38 pub fn new(strategy: S) -> Self {
39 // Create an event channel.
40 // TODO: make the channel size configurable.
41 let (tx, rx) = channel(4);
42
43 // Build the Agnet.
44 Self {
45 event_tx: tx,
46 event_rx: rx,
47 wm: WorldManager::new(),
48 strategy,
49 }
50 }
51
52 /// Spawn an event source for the agent.
53 ///
54 /// ## Spawns
55 ///
56 /// Spawns a new `tokio` thread for the event source.
57 ///
58 /// ## Compatibility
59 ///
60 /// - WASM: compatible with `tokio_with_wasm`
61 pub fn spawn_event_source<E: EventSource + Send + 'static>(
62 &mut self,
63 event_source: E,
64 on_finish: OnFinish,
65 ) {
66 let event_tx = self.event_tx.clone();
67 // Spawn the thread.
68 let jh = event_source.spawn(move |event| {
69 tracing::debug!("On AgentEvent {:?}", event);
70 let tx = event_tx.clone();
71
72 async move {
73 let name = event.name;
74
75 if let Err(err) = tx.send(event).await {
76 tracing::warn!("Failed to send AgentEvent {}", err);
77 } else {
78 tracing::info!("Sent AgentEvent {}", name);
79 }
80 }
81 });
82
83 // Wait for the event source to finish and send termination signal if needed.
84 match &on_finish {
85 OnFinish::Stop => {
86 // Spawn a new thread to wait for the event source to finish.
87 let event_tx = self.event_tx.clone();
88 tokio::spawn(async move {
89 // Wait for the event source to finish.
90 if let Err(err) = jh.await.unwrap() {
91 tracing::error!("Event source JoinError: {}", err);
92 return;
93 }
94
95 // Send a termination instruction to signal the main loop to exit
96 let terminate_event = AgentEvent::new("Terminate", "spawn_event_source")
97 .instruction(Instruction::Terminate);
98
99 // Try to send the termination event, but don't panic if it fails
100 // (channel might already be closed)
101 if let Err(err) = event_tx.send(terminate_event).await {
102 tracing::warn!("Failed to send termination event: {}", err);
103 }
104 });
105 }
106 OnFinish::Continue => {}
107 }
108 }
109
110 /// Add a system to the agent.
111 pub fn add_system<SS: System>(&mut self, system: SS) {
112 self.wm.add_system(system);
113 }
114
115 /// The function to run the agent.
116 ///
117 /// `run` dispatches `AgentEvent`s into the ECS `World` based on the Agent's strategy.
118 pub async fn run(&mut self) {
119 // Listen for events sent by event sources.
120 while let Some(event) = self.event_rx.recv().await {
121 tracing::debug!("Received AgentEvent {:?}", event);
122
123 if let Some(EventContent::Instruction(instruction)) = event.content {
124 // Received an instruction
125 tracing::debug!("Received instruction {:?}", instruction);
126 match instruction {
127 // TODO: process other instructions
128 Instruction::Terminate => {
129 tracing::info!("Terminating event loop due to Terminate instruction");
130 break; // Exit the event loop immediately
131 }
132 }
133 } else {
134 // The event is not an instruction, dispatch the event to the `World`.
135 tracing::debug!("Dispatching event {:?}", event);
136 if let Err(err) = self
137 .strategy
138 .deliberate(&event, self.wm.action_sender())
139 .await
140 {
141 tracing::error!("Error dispatching event {:?}: {}", event, err);
142 }
143 }
144 }
145
146 tracing::info!("Exited event loop.");
147 }
148}
149
150/// The behaviour to choose when event source thread finishes.
151pub enum OnFinish {
152 // Do nothing when the event source thread finishes.
153 Continue,
154
155 // Stop the Agent workflow when the thread finishes.
156 Stop,
157}