autoagents_core/agent/executor/
event_helper.rs1use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
2use autoagents_protocol::StreamChunk;
3use autoagents_protocol::{ActorID, Event, SubmissionId};
4use serde_json::Value;
5
6#[cfg(not(target_arch = "wasm32"))]
7use tokio::sync::mpsc;
8
9#[cfg(target_arch = "wasm32")]
10use futures::channel::mpsc;
11
12#[cfg(target_arch = "wasm32")]
13use futures::SinkExt;
14
15pub struct EventHelper;
17
18impl EventHelper {
19 pub async fn send(tx: &Option<mpsc::Sender<Event>>, event: Event) {
21 if let Some(tx) = tx {
22 #[cfg(not(target_arch = "wasm32"))]
23 let _ = tx.send(event).await;
25 }
26 }
27
28 pub async fn send_task_started(
30 tx: &Option<mpsc::Sender<Event>>,
31 sub_id: SubmissionId,
32 actor_id: ActorID,
33 actor_name: String,
34 task_description: String,
35 ) {
36 Self::send(
37 tx,
38 Event::TaskStarted {
39 sub_id,
40 actor_id,
41 actor_name,
42 task_description,
43 },
44 )
45 .await;
46 }
47
48 pub async fn send_task_completed(
50 tx: &Option<mpsc::Sender<Event>>,
51 sub_id: SubmissionId,
52 actor_id: ActorID,
53 actor_name: String,
54 result: String,
55 ) {
56 Self::send(
57 tx,
58 Event::TaskComplete {
59 sub_id,
60 result,
61 actor_id,
62 actor_name,
63 },
64 )
65 .await;
66 }
67
68 pub async fn send_task_completed_value(
70 tx: &Option<mpsc::Sender<Event>>,
71 sub_id: SubmissionId,
72 actor_id: ActorID,
73 actor_name: String,
74 result: &Value,
75 ) -> Result<(), serde_json::Error> {
76 let result = serde_json::to_string_pretty(result)?;
77 Self::send_task_completed(tx, sub_id, actor_id, actor_name, result).await;
78 Ok(())
79 }
80
81 pub async fn send_task_error(
83 tx: &Option<mpsc::Sender<Event>>,
84 sub_id: SubmissionId,
85 actor_id: ActorID,
86 error: String,
87 ) {
88 Self::send(
89 tx,
90 Event::TaskError {
91 sub_id,
92 actor_id,
93 error,
94 },
95 )
96 .await;
97 }
98
99 pub async fn send_turn_started(
101 tx: &Option<mpsc::Sender<Event>>,
102 sub_id: SubmissionId,
103 actor_id: ActorID,
104 turn_number: usize,
105 max_turns: usize,
106 ) {
107 Self::send(
108 tx,
109 Event::TurnStarted {
110 sub_id,
111 actor_id,
112 turn_number,
113 max_turns,
114 },
115 )
116 .await;
117 }
118
119 pub async fn send_turn_completed(
121 tx: &Option<mpsc::Sender<Event>>,
122 sub_id: SubmissionId,
123 actor_id: ActorID,
124 turn_number: usize,
125 final_turn: bool,
126 ) {
127 Self::send(
128 tx,
129 Event::TurnCompleted {
130 sub_id,
131 actor_id,
132 turn_number,
133 final_turn,
134 },
135 )
136 .await;
137 }
138
139 pub async fn send_stream_chunk(
141 tx: &Option<mpsc::Sender<Event>>,
142 sub_id: SubmissionId,
143 chunk: LlmStreamChunk,
144 ) {
145 let chunk: StreamChunk = chunk.into();
146 Self::send(tx, Event::StreamChunk { sub_id, chunk }).await;
147 }
148
149 pub async fn send_stream_tool_call(
151 tx: &Option<mpsc::Sender<Event>>,
152 sub_id: SubmissionId,
153 tool_call: Value,
154 ) {
155 Self::send(tx, Event::StreamToolCall { sub_id, tool_call }).await;
156 }
157
158 pub async fn send_stream_complete(tx: &Option<mpsc::Sender<Event>>, sub_id: SubmissionId) {
160 Self::send(tx, Event::StreamComplete { sub_id }).await;
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use super::*;
167 use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
168 use autoagents_protocol::StreamChunk as ProtocolStreamChunk;
169
170 #[tokio::test]
171 async fn stream_chunk_is_converted_to_protocol() {
172 let (tx, mut rx) = mpsc::channel::<Event>(1);
173 let tx = Some(tx);
174 let sub_id = SubmissionId::new_v4();
175 let chunk = LlmStreamChunk::Text("hello".to_string());
176
177 let expected: ProtocolStreamChunk = chunk.clone().into();
178 EventHelper::send_stream_chunk(&tx, sub_id, chunk.clone()).await;
179
180 let event = rx.recv().await.expect("event");
181 match event {
182 Event::StreamChunk { sub_id: id, chunk } => {
183 assert_eq!(id, sub_id);
184 let expected_json = serde_json::to_string(&expected).unwrap();
185 let actual_json = serde_json::to_string(&chunk).unwrap();
186 assert_eq!(actual_json, expected_json);
187 }
188 _ => panic!("unexpected event"),
189 }
190 }
191}