amico_core/
agent.rs

1use tokio::sync::mpsc::{Receiver, Sender, channel};
2use tokio_with_wasm::alias as tokio;
3
4use crate::{
5    traits::{EventSource, Strategy, System},
6    types::{AgentEvent, EventContent, Instruction},
7    world::WorldManager,
8};
9
10/// The core event-driven Agent program. Defines the workflow of the agent.
11///
12/// The `Agent` creates and manages an ECS `World`, manages
13/// `AgentEvent`s sent from `EventSource`s, and dispatches them.
14///
15/// ## Type parameters
16///
17/// - `S`: `Strategy` type, representing the Agent's action selection strategy.
18///
19/// ## Compatibility
20///
21/// - WASM: compatible.
22pub struct Agent<S: Strategy> {
23    /// The mpsc channel sender to send agent events to event sources.
24    event_tx: Sender<AgentEvent>,
25
26    /// The mpsc channel receiver to receive agent events from event sources.
27    event_rx: Receiver<AgentEvent>,
28
29    /// The ECS world manager.
30    wm: WorldManager,
31
32    /// The action selection strategy.
33    strategy: S,
34}
35
36impl<S: Strategy> Agent<S> {
37    /// Create a new agent.
38    pub fn new(strategy: S) -> Self {
39        // Create an event channel.
40        // TODO: make the channel size configurable.
41        let (tx, rx) = channel(4);
42
43        // Build the Agnet.
44        Self {
45            event_tx: tx,
46            event_rx: rx,
47            wm: WorldManager::new(),
48            strategy,
49        }
50    }
51
52    /// Spawn an event source for the agent.
53    ///
54    /// ## Spawns
55    ///
56    /// Spawns a new `tokio` thread for the event source.
57    ///
58    /// ## Compatibility
59    ///
60    /// - WASM: compatible with `tokio_with_wasm`
61    pub fn spawn_event_source<E: EventSource + Send + 'static>(
62        &mut self,
63        event_source: E,
64        on_finish: OnFinish,
65    ) {
66        let event_tx = self.event_tx.clone();
67        // Spawn the thread.
68        let jh = event_source.spawn(move |event| {
69            tracing::debug!("On AgentEvent {:?}", event);
70            let tx = event_tx.clone();
71
72            async move {
73                let name = event.name;
74
75                if let Err(err) = tx.send(event).await {
76                    tracing::warn!("Failed to send AgentEvent {}", err);
77                } else {
78                    tracing::info!("Sent AgentEvent {}", name);
79                }
80            }
81        });
82
83        // Wait for the event source to finish and send termination signal if needed.
84        match &on_finish {
85            OnFinish::Stop => {
86                // Spawn a new thread to wait for the event source to finish.
87                let event_tx = self.event_tx.clone();
88                tokio::spawn(async move {
89                    // Wait for the event source to finish.
90                    if let Err(err) = jh.await.unwrap() {
91                        tracing::error!("Event source JoinError: {}", err);
92                        return;
93                    }
94
95                    // Send a termination instruction to signal the main loop to exit
96                    let terminate_event = AgentEvent::new("Terminate", "spawn_event_source")
97                        .instruction(Instruction::Terminate);
98
99                    // Try to send the termination event, but don't panic if it fails
100                    // (channel might already be closed)
101                    if let Err(err) = event_tx.send(terminate_event).await {
102                        tracing::warn!("Failed to send termination event: {}", err);
103                    }
104                });
105            }
106            OnFinish::Continue => {}
107        }
108    }
109
110    /// Add a system to the agent.
111    pub fn add_system<SS: System>(&mut self, system: SS) {
112        self.wm.add_system(system);
113    }
114
115    /// The function to run the agent.
116    ///
117    /// `run` dispatches `AgentEvent`s into the ECS `World` based on the Agent's strategy.
118    pub async fn run(&mut self) {
119        // Listen for events sent by event sources.
120        while let Some(event) = self.event_rx.recv().await {
121            tracing::debug!("Received AgentEvent {:?}", event);
122
123            if let Some(EventContent::Instruction(instruction)) = event.content {
124                // Received an instruction
125                tracing::debug!("Received instruction {:?}", instruction);
126                match instruction {
127                    // TODO: process other instructions
128                    Instruction::Terminate => {
129                        tracing::info!("Terminating event loop due to Terminate instruction");
130                        break; // Exit the event loop immediately
131                    }
132                }
133            } else {
134                // The event is not an instruction, dispatch the event to the `World`.
135                tracing::debug!("Dispatching event {:?}", event);
136                if let Err(err) = self
137                    .strategy
138                    .deliberate(&event, self.wm.action_sender())
139                    .await
140                {
141                    tracing::error!("Error dispatching event {:?}: {}", event, err);
142                }
143            }
144        }
145
146        tracing::info!("Exited event loop.");
147    }
148}
149
/// The behaviour to choose when event source thread finishes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnFinish {
    /// Do nothing when the event source thread finishes.
    Continue,

    /// Stop the Agent workflow when the thread finishes.
    Stop,
}