amico_core/
agent.rs

1use tokio::sync::mpsc::{Receiver, Sender, channel};
2
3#[cfg(feature = "wasm")]
4use tokio_with_wasm::alias as tokio;
5
6use crate::{
7    traits::{EventSource, Strategy, System},
8    types::{AgentEvent, Control, EventContent},
9    world::WorldManager,
10};
11
12/// The core event-driven Agent program. Defines the workflow of the agent.
13///
14/// The `Agent` creates and manages an ECS `World`, manages
15/// `AgentEvent`s sent from `EventSource`s, and dispatches them.
16///
17/// ## Type parameters
18///
19/// - `S`: `Strategy` type, representing the Agent's action selection strategy.
20///
21/// ## Compatibility
22///
23/// - WASM: compatible.
24pub struct Agent<S: Strategy> {
25    /// The mpsc channel sender to send agent events to event sources.
26    event_tx: Sender<EventWithTx>,
27
28    /// The mpsc channel receiver to receive agent events from event sources.
29    event_rx: Receiver<EventWithTx>,
30
31    /// The ECS world manager.
32    wm: WorldManager,
33
34    /// The action selection strategy.
35    strategy: S,
36}
37
38impl<S: Strategy> Agent<S> {
39    /// Create a new agent.
40    pub fn new(strategy: S) -> Self {
41        // Create an event channel.
42        // TODO: make the channel size configurable.
43        let (tx, rx) = channel(4);
44
45        // Build the Agnet.
46        Self {
47            event_tx: tx,
48            event_rx: rx,
49            wm: WorldManager::new(),
50            strategy,
51        }
52    }
53
54    /// Spawn an event source for the agent.
55    ///
56    /// ## Spawns
57    ///
58    /// Spawns a new `tokio` thread for the event source.
59    ///
60    /// ## Compatibility
61    ///
62    /// - WASM: compatible with `tokio_with_wasm`
63    pub fn spawn_event_source<E: EventSource + Send + 'static>(
64        &mut self,
65        event_source: E,
66        on_finish: OnFinish,
67    ) {
68        let event_tx = self.event_tx.clone();
69        // Spawn the thread.
70        let jh = event_source.spawn(move |event| {
71            tracing::debug!("On AgentEvent {:?}", event);
72            let event_tx = event_tx.clone();
73
74            async move {
75                let name = event.name;
76
77                // Create a new channel for the reply message.
78                let (tx, mut rx) = channel(1);
79
80                if let Err(err) = event_tx
81                    .send(EventWithTx {
82                        tx: Some(tx),
83                        event,
84                    })
85                    .await
86                {
87                    tracing::warn!("Failed to send AgentEvent {}", err);
88                } else {
89                    tracing::info!("Sent AgentEvent {}", name);
90                }
91
92                // Wait and return the reply message.
93                rx.recv()
94                    .await
95                    .inspect(|reply| {
96                        if let Some(reply) = reply {
97                            tracing::debug!("Received reply message: {:?}", reply);
98                        } else {
99                            tracing::debug!("Received no reply message");
100                        }
101                    })
102                    .unwrap_or_else(|| {
103                        tracing::error!("Failed to receive reply message: channel closed");
104                        None
105                    })
106            }
107        });
108
109        // Wait for the event source to finish and send termination signal if needed.
110        match &on_finish {
111            OnFinish::Stop => {
112                // Spawn a new thread to wait for the event source to finish.
113                let event_tx = self.event_tx.clone();
114                tokio::spawn(async move {
115                    // Wait for the event source to finish.
116                    if let Err(err) = jh.await.unwrap() {
117                        tracing::error!("Event source JoinError: {}", err);
118                        return;
119                    }
120
121                    // Send a termination instruction to signal the main loop to exit
122                    let terminate_event =
123                        AgentEvent::new("Terminate", "spawn_event_source").control(Control::Quit);
124
125                    // Try to send the termination event, but don't panic if it fails
126                    // (channel might already be closed)
127                    if let Err(err) = event_tx
128                        .send(EventWithTx {
129                            tx: None,
130                            event: terminate_event,
131                        })
132                        .await
133                    {
134                        tracing::warn!("Failed to send termination event: {}", err);
135                    }
136                });
137            }
138            OnFinish::Continue => {}
139        }
140    }
141
142    /// Add a system to the agent.
143    pub fn add_system<SS: System>(&mut self, system: SS) {
144        self.wm.add_system(system);
145    }
146
147    /// The function to run the agent.
148    ///
149    /// `run` dispatches `AgentEvent`s into the ECS `World` based on the Agent's strategy.
150    pub async fn run(&mut self) {
151        // Listen for events sent by event sources.
152        while let Some(event_with_tx) = self.event_rx.recv().await {
153            let EventWithTx { tx, event } = event_with_tx;
154            tracing::debug!("Received AgentEvent {:?}", event);
155
156            if let Some(EventContent::Control(control)) = event.content {
157                // Received a control instruction
158                tracing::debug!("Received control instruction {:?}", control);
159                match control {
160                    // TODO: process other instructions
161                    Control::Quit => {
162                        tracing::info!("Terminating event loop due to Quit control instruction");
163                        break; // Exit the event loop immediately
164                    }
165                }
166            } else {
167                // The event is not an instruction, dispatch the event to the `World`.
168                tracing::debug!("Processing event {:?}", event);
169                let reply = self
170                    .strategy
171                    .deliberate(&event, self.wm.action_sender())
172                    .await
173                    .unwrap_or_else(|err| {
174                        // Report the error and return `None` to indicate no reply.
175                        tracing::error!("Error processing event {:?}: {}", event, err);
176                        None
177                    });
178
179                // Send the reply message back to the event source if needed.
180                if let Some(tx) = tx {
181                    if let Err(err) = tx.send(reply).await {
182                        tracing::error!("Failed to send reply message: {}", err);
183                    }
184                }
185            }
186        }
187
188        tracing::info!("Exited event loop.");
189    }
190}
191
/// The behaviour to choose when an event source finishes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnFinish {
    /// Do nothing when the event source finishes.
    Continue,

    /// Stop the Agent workflow when the event source finishes.
    Stop,
}
200
201/// A struct for the reply message to send back to the event source.
202struct EventWithTx {
203    tx: Option<Sender<Option<String>>>,
204    event: AgentEvent,
205}