Skip to main content

autoagents_core/agent/executor/
event_helper.rs

1use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
2use autoagents_protocol::StreamChunk;
3use autoagents_protocol::{ActorID, Event, SubmissionId};
4use serde_json::Value;
5
6#[cfg(not(target_arch = "wasm32"))]
7use tokio::sync::mpsc;
8
9#[cfg(target_arch = "wasm32")]
10use futures::channel::mpsc;
11
12#[cfg(target_arch = "wasm32")]
13use futures::SinkExt;
14
15/// Helper for managing event emissions
16pub struct EventHelper;
17
18impl EventHelper {
19    /// Send an event if sender is available
20    pub async fn send(tx: &Option<mpsc::Sender<Event>>, event: Event) {
21        if let Some(tx) = tx {
22            #[cfg(not(target_arch = "wasm32"))]
23            //TODO: WASM Targets currently does not support event handling
24            let _ = tx.send(event).await;
25        }
26    }
27
28    /// Send task started event
29    pub async fn send_task_started(
30        tx: &Option<mpsc::Sender<Event>>,
31        sub_id: SubmissionId,
32        actor_id: ActorID,
33        actor_name: String,
34        task_description: String,
35    ) {
36        Self::send(
37            tx,
38            Event::TaskStarted {
39                sub_id,
40                actor_id,
41                actor_name,
42                task_description,
43            },
44        )
45        .await;
46    }
47
48    /// Send task started event
49    pub async fn send_task_completed(
50        tx: &Option<mpsc::Sender<Event>>,
51        sub_id: SubmissionId,
52        actor_id: ActorID,
53        actor_name: String,
54        result: String,
55    ) {
56        Self::send(
57            tx,
58            Event::TaskComplete {
59                sub_id,
60                result,
61                actor_id,
62                actor_name,
63            },
64        )
65        .await;
66    }
67
68    /// Send turn started event
69    pub async fn send_turn_started(
70        tx: &Option<mpsc::Sender<Event>>,
71        sub_id: SubmissionId,
72        actor_id: ActorID,
73        turn_number: usize,
74        max_turns: usize,
75    ) {
76        Self::send(
77            tx,
78            Event::TurnStarted {
79                sub_id,
80                actor_id,
81                turn_number,
82                max_turns,
83            },
84        )
85        .await;
86    }
87
88    /// Send turn completed event
89    pub async fn send_turn_completed(
90        tx: &Option<mpsc::Sender<Event>>,
91        sub_id: SubmissionId,
92        actor_id: ActorID,
93        turn_number: usize,
94        final_turn: bool,
95    ) {
96        Self::send(
97            tx,
98            Event::TurnCompleted {
99                sub_id,
100                actor_id,
101                turn_number,
102                final_turn,
103            },
104        )
105        .await;
106    }
107
108    /// Send stream chunk event
109    pub async fn send_stream_chunk(
110        tx: &Option<mpsc::Sender<Event>>,
111        sub_id: SubmissionId,
112        chunk: LlmStreamChunk,
113    ) {
114        let chunk: StreamChunk = chunk.into();
115        Self::send(tx, Event::StreamChunk { sub_id, chunk }).await;
116    }
117
118    /// Send stream tool call event
119    pub async fn send_stream_tool_call(
120        tx: &Option<mpsc::Sender<Event>>,
121        sub_id: SubmissionId,
122        tool_call: Value,
123    ) {
124        Self::send(tx, Event::StreamToolCall { sub_id, tool_call }).await;
125    }
126
127    /// Send stream complete event
128    pub async fn send_stream_complete(tx: &Option<mpsc::Sender<Event>>, sub_id: SubmissionId) {
129        Self::send(tx, Event::StreamComplete { sub_id }).await;
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use autoagents_llm::chat::StreamChunk as LlmStreamChunk;
137    use autoagents_protocol::StreamChunk as ProtocolStreamChunk;
138
139    #[tokio::test]
140    async fn stream_chunk_is_converted_to_protocol() {
141        let (tx, mut rx) = mpsc::channel::<Event>(1);
142        let tx = Some(tx);
143        let sub_id = SubmissionId::new_v4();
144        let chunk = LlmStreamChunk::Text("hello".to_string());
145
146        let expected: ProtocolStreamChunk = chunk.clone().into();
147        EventHelper::send_stream_chunk(&tx, sub_id, chunk.clone()).await;
148
149        let event = rx.recv().await.expect("event");
150        match event {
151            Event::StreamChunk { sub_id: id, chunk } => {
152                assert_eq!(id, sub_id);
153                let expected_json = serde_json::to_string(&expected).unwrap();
154                let actual_json = serde_json::to_string(&chunk).unwrap();
155                assert_eq!(actual_json, expected_json);
156            }
157            _ => panic!("unexpected event"),
158        }
159    }
160}