Skip to main content

autoagents_core/agent/executor/
event_helper.rs

1use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
2use autoagents_protocol::StreamChunk;
3use autoagents_protocol::{ActorID, Event, SubmissionId};
4use serde_json::Value;
5
6#[cfg(not(target_arch = "wasm32"))]
7use tokio::sync::mpsc;
8
9#[cfg(target_arch = "wasm32")]
10use futures::channel::mpsc;
11
12#[cfg(target_arch = "wasm32")]
13use futures::SinkExt;
14
/// Namespace type grouping the associated functions used to emit [`Event`]s.
///
/// The type carries no data; every operation is an associated function that
/// takes the optional sender explicitly. The common traits are derived so the
/// type can be formatted, copied and default-constructed like any other
/// public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct EventHelper;
17
18impl EventHelper {
19    /// Send an event if sender is available
20    pub async fn send(tx: &Option<mpsc::Sender<Event>>, event: Event) {
21        if let Some(tx) = tx {
22            #[cfg(not(target_arch = "wasm32"))]
23            //TODO: WASM Targets currently does not support event handling
24            let _ = tx.send(event).await;
25        }
26    }
27
28    /// Send task started event
29    pub async fn send_task_started(
30        tx: &Option<mpsc::Sender<Event>>,
31        sub_id: SubmissionId,
32        actor_id: ActorID,
33        actor_name: String,
34        task_description: String,
35    ) {
36        Self::send(
37            tx,
38            Event::TaskStarted {
39                sub_id,
40                actor_id,
41                actor_name,
42                task_description,
43            },
44        )
45        .await;
46    }
47
48    /// Send task started event
49    pub async fn send_task_completed(
50        tx: &Option<mpsc::Sender<Event>>,
51        sub_id: SubmissionId,
52        actor_id: ActorID,
53        actor_name: String,
54        result: String,
55    ) {
56        Self::send(
57            tx,
58            Event::TaskComplete {
59                sub_id,
60                result,
61                actor_id,
62                actor_name,
63            },
64        )
65        .await;
66    }
67
68    /// Send task completed event with a JSON value result
69    pub async fn send_task_completed_value(
70        tx: &Option<mpsc::Sender<Event>>,
71        sub_id: SubmissionId,
72        actor_id: ActorID,
73        actor_name: String,
74        result: &Value,
75    ) -> Result<(), serde_json::Error> {
76        let result = serde_json::to_string_pretty(result)?;
77        Self::send_task_completed(tx, sub_id, actor_id, actor_name, result).await;
78        Ok(())
79    }
80
81    /// Send task error event
82    pub async fn send_task_error(
83        tx: &Option<mpsc::Sender<Event>>,
84        sub_id: SubmissionId,
85        actor_id: ActorID,
86        error: String,
87    ) {
88        Self::send(
89            tx,
90            Event::TaskError {
91                sub_id,
92                actor_id,
93                error,
94            },
95        )
96        .await;
97    }
98
99    /// Send turn started event
100    pub async fn send_turn_started(
101        tx: &Option<mpsc::Sender<Event>>,
102        sub_id: SubmissionId,
103        actor_id: ActorID,
104        turn_number: usize,
105        max_turns: usize,
106    ) {
107        Self::send(
108            tx,
109            Event::TurnStarted {
110                sub_id,
111                actor_id,
112                turn_number,
113                max_turns,
114            },
115        )
116        .await;
117    }
118
119    /// Send turn completed event
120    pub async fn send_turn_completed(
121        tx: &Option<mpsc::Sender<Event>>,
122        sub_id: SubmissionId,
123        actor_id: ActorID,
124        turn_number: usize,
125        final_turn: bool,
126    ) {
127        Self::send(
128            tx,
129            Event::TurnCompleted {
130                sub_id,
131                actor_id,
132                turn_number,
133                final_turn,
134            },
135        )
136        .await;
137    }
138
139    /// Send stream chunk event
140    pub async fn send_stream_chunk(
141        tx: &Option<mpsc::Sender<Event>>,
142        sub_id: SubmissionId,
143        chunk: LlmStreamChunk,
144    ) {
145        let chunk: StreamChunk = chunk.into();
146        Self::send(tx, Event::StreamChunk { sub_id, chunk }).await;
147    }
148
149    /// Send stream tool call event
150    pub async fn send_stream_tool_call(
151        tx: &Option<mpsc::Sender<Event>>,
152        sub_id: SubmissionId,
153        tool_call: Value,
154    ) {
155        Self::send(tx, Event::StreamToolCall { sub_id, tool_call }).await;
156    }
157
158    /// Send stream complete event
159    pub async fn send_stream_complete(tx: &Option<mpsc::Sender<Event>>, sub_id: SubmissionId) {
160        Self::send(tx, Event::StreamComplete { sub_id }).await;
161    }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
    use autoagents_protocol::StreamChunk as ProtocolStreamChunk;

    /// `send_stream_chunk` must emit an `Event::StreamChunk` that carries the
    /// same submission id and the protocol-level conversion of the LLM chunk.
    #[tokio::test]
    async fn stream_chunk_is_converted_to_protocol() {
        let (tx, mut rx) = mpsc::channel::<Event>(1);
        let tx = Some(tx);
        let sub_id = SubmissionId::new_v4();
        let chunk = LlmStreamChunk::Text("hello".to_string());

        // Clone only for the reference conversion; the chunk itself is moved
        // into the call below because it is not needed afterwards.
        let expected: ProtocolStreamChunk = chunk.clone().into();
        EventHelper::send_stream_chunk(&tx, sub_id, chunk).await;

        let event = rx.recv().await.expect("event");
        match event {
            Event::StreamChunk { sub_id: id, chunk } => {
                assert_eq!(id, sub_id);
                // The chunks are compared through their serialized JSON forms.
                let expected_json = serde_json::to_string(&expected).unwrap();
                let actual_json = serde_json::to_string(&chunk).unwrap();
                assert_eq!(actual_json, expected_json);
            }
            _ => panic!("unexpected event"),
        }
    }
}