autoagents_core/agent/executor/
event_helper.rs1use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
2use autoagents_protocol::StreamChunk;
3use autoagents_protocol::{ActorID, Event, SubmissionId};
4use serde_json::Value;
5
6#[cfg(not(target_arch = "wasm32"))]
7use tokio::sync::mpsc;
8
9#[cfg(target_arch = "wasm32")]
10use futures::channel::mpsc;
11
12#[cfg(target_arch = "wasm32")]
13use futures::SinkExt;
14
15pub struct EventHelper;
17
18impl EventHelper {
19 pub async fn send(tx: &Option<mpsc::Sender<Event>>, event: Event) {
21 if let Some(tx) = tx {
22 #[cfg(not(target_arch = "wasm32"))]
23 let _ = tx.send(event).await;
25 }
26 }
27
28 pub async fn send_task_started(
30 tx: &Option<mpsc::Sender<Event>>,
31 sub_id: SubmissionId,
32 actor_id: ActorID,
33 actor_name: String,
34 task_description: String,
35 ) {
36 Self::send(
37 tx,
38 Event::TaskStarted {
39 sub_id,
40 actor_id,
41 actor_name,
42 task_description,
43 },
44 )
45 .await;
46 }
47
48 pub async fn send_task_completed(
50 tx: &Option<mpsc::Sender<Event>>,
51 sub_id: SubmissionId,
52 actor_id: ActorID,
53 actor_name: String,
54 result: String,
55 ) {
56 Self::send(
57 tx,
58 Event::TaskComplete {
59 sub_id,
60 result,
61 actor_id,
62 actor_name,
63 },
64 )
65 .await;
66 }
67
68 pub async fn send_turn_started(
70 tx: &Option<mpsc::Sender<Event>>,
71 sub_id: SubmissionId,
72 actor_id: ActorID,
73 turn_number: usize,
74 max_turns: usize,
75 ) {
76 Self::send(
77 tx,
78 Event::TurnStarted {
79 sub_id,
80 actor_id,
81 turn_number,
82 max_turns,
83 },
84 )
85 .await;
86 }
87
88 pub async fn send_turn_completed(
90 tx: &Option<mpsc::Sender<Event>>,
91 sub_id: SubmissionId,
92 actor_id: ActorID,
93 turn_number: usize,
94 final_turn: bool,
95 ) {
96 Self::send(
97 tx,
98 Event::TurnCompleted {
99 sub_id,
100 actor_id,
101 turn_number,
102 final_turn,
103 },
104 )
105 .await;
106 }
107
108 pub async fn send_stream_chunk(
110 tx: &Option<mpsc::Sender<Event>>,
111 sub_id: SubmissionId,
112 chunk: LlmStreamChunk,
113 ) {
114 let chunk: StreamChunk = chunk.into();
115 Self::send(tx, Event::StreamChunk { sub_id, chunk }).await;
116 }
117
118 pub async fn send_stream_tool_call(
120 tx: &Option<mpsc::Sender<Event>>,
121 sub_id: SubmissionId,
122 tool_call: Value,
123 ) {
124 Self::send(tx, Event::StreamToolCall { sub_id, tool_call }).await;
125 }
126
127 pub async fn send_stream_complete(tx: &Option<mpsc::Sender<Event>>, sub_id: SubmissionId) {
129 Self::send(tx, Event::StreamComplete { sub_id }).await;
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
    use autoagents_protocol::StreamChunk as ProtocolStreamChunk;

    /// A chunk pushed through `send_stream_chunk` must arrive as an
    /// `Event::StreamChunk` whose payload serializes to the same JSON as the
    /// direct LLM-to-protocol conversion of the input.
    #[tokio::test]
    async fn stream_chunk_is_converted_to_protocol() {
        let (sender, mut receiver) = mpsc::channel::<Event>(1);
        let sender = Some(sender);
        let submission = SubmissionId::new_v4();
        let llm_chunk = LlmStreamChunk::Text("hello".to_string());

        // Reference value: what the conversion yields without going through the channel.
        let converted: ProtocolStreamChunk = llm_chunk.clone().into();
        let expected_json = serde_json::to_string(&converted).unwrap();

        EventHelper::send_stream_chunk(&sender, submission, llm_chunk.clone()).await;

        let received = receiver.recv().await.expect("event");
        if let Event::StreamChunk { sub_id, chunk } = received {
            assert_eq!(sub_id, submission);
            let actual_json = serde_json::to_string(&chunk).unwrap();
            assert_eq!(actual_json, expected_json);
        } else {
            panic!("unexpected event");
        }
    }
}
160}