1#![allow(unused)]
2
3use ag_ui_core::event::*;
4use ag_ui_core::types::input::RunAgentInput;
5use ag_ui_core::types::message::Message;
6use ag_ui_core::types::tool::ToolCall;
7use ag_ui_core::{AgentState, FwdProps};
8use serde_json::Value as JsonValue;
9use std::collections::HashMap;
10use std::slice::Iter;
11use std::sync::Arc;
12
13use crate::agent::{AgentError, AgentStateMutation};
14
15pub struct AgentSubscriberParams<'a, StateT: AgentState, FwdPropsT: FwdProps> {
16 pub messages: &'a [Message],
17 pub state: &'a StateT,
18 pub input: &'a RunAgentInput<StateT, FwdPropsT>,
19}
20
21#[async_trait::async_trait]
23pub trait AgentSubscriber<StateT = JsonValue, FwdPropsT = JsonValue>: Send + Sync
24where
25 StateT: AgentState,
26 FwdPropsT: FwdProps,
27{
28 async fn on_run_initialized(
30 &self,
31 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
32 ) -> Result<AgentStateMutation<StateT>, AgentError> {
33 Ok(AgentStateMutation::default())
34 }
35
36 async fn on_run_failed(
37 &self,
38 error: &AgentError,
39 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
40 ) -> Result<AgentStateMutation<StateT>, AgentError> {
41 Ok(AgentStateMutation::default())
42 }
43
44 async fn on_run_finalized(
45 &self,
46 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
47 ) -> Result<AgentStateMutation<StateT>, AgentError> {
48 Ok(AgentStateMutation::default())
49 }
50
51 async fn on_event(
53 &self,
54 event: &Event<StateT>,
55 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
56 ) -> Result<AgentStateMutation<StateT>, AgentError> {
57 Ok(AgentStateMutation::default())
58 }
59
60 async fn on_run_started_event(
61 &self,
62 event: &RunStartedEvent,
63 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
64 ) -> Result<AgentStateMutation<StateT>, AgentError> {
65 Ok(AgentStateMutation::default())
66 }
67
68 async fn on_run_finished_event(
69 &self,
70 event: &RunFinishedEvent,
71 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
72 ) -> Result<AgentStateMutation<StateT>, AgentError> {
73 Ok(AgentStateMutation::default())
74 }
75
76 async fn on_run_error_event(
77 &self,
78 event: &RunErrorEvent,
79 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
80 ) -> Result<AgentStateMutation<StateT>, AgentError> {
81 Ok(AgentStateMutation::default())
82 }
83
84 async fn on_step_started_event(
85 &self,
86 event: &StepStartedEvent,
87 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
88 ) -> Result<AgentStateMutation<StateT>, AgentError> {
89 Ok(AgentStateMutation::default())
90 }
91
92 async fn on_step_finished_event(
93 &self,
94 event: &StepFinishedEvent,
95 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
96 ) -> Result<AgentStateMutation<StateT>, AgentError> {
97 Ok(AgentStateMutation::default())
98 }
99
100 async fn on_text_message_start_event(
101 &self,
102 event: &TextMessageStartEvent,
103 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
104 ) -> Result<AgentStateMutation<StateT>, AgentError> {
105 Ok(AgentStateMutation::default())
106 }
107
108 async fn on_text_message_content_event(
109 &self,
110 event: &TextMessageContentEvent,
111 _text_message_buffer: &str,
112 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
113 ) -> Result<AgentStateMutation<StateT>, AgentError> {
114 Ok(AgentStateMutation::default())
115 }
116
117 async fn on_text_message_end_event(
118 &self,
119 event: &TextMessageEndEvent,
120 _text_message_buffer: &str,
121 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
122 ) -> Result<AgentStateMutation<StateT>, AgentError> {
123 Ok(AgentStateMutation::default())
124 }
125
126 async fn on_tool_call_start_event(
127 &self,
128 event: &ToolCallStartEvent,
129 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
130 ) -> Result<AgentStateMutation<StateT>, AgentError> {
131 Ok(AgentStateMutation::default())
132 }
133
134 async fn on_tool_call_args_event(
135 &self,
136 event: &ToolCallArgsEvent,
137 _tool_call_buffer: &str,
138 tool_call_name: &str,
139 _partial_tool_call_args: &HashMap<String, JsonValue>,
140 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
141 ) -> Result<AgentStateMutation<StateT>, AgentError> {
142 Ok(AgentStateMutation::default())
143 }
144
145 async fn on_tool_call_end_event(
146 &self,
147 event: &ToolCallEndEvent,
148 tool_call_name: &str,
149 _tool_call_args: &HashMap<String, JsonValue>,
150 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
151 ) -> Result<AgentStateMutation<StateT>, AgentError> {
152 Ok(AgentStateMutation::default())
153 }
154
155 async fn on_tool_call_result_event(
156 &self,
157 event: &ToolCallResultEvent,
158 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
159 ) -> Result<AgentStateMutation<StateT>, AgentError> {
160 Ok(AgentStateMutation::default())
161 }
162
163 async fn on_state_snapshot_event(
164 &self,
165 event: &StateSnapshotEvent<StateT>,
166 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
167 ) -> Result<AgentStateMutation<StateT>, AgentError> {
168 Ok(AgentStateMutation::default())
169 }
170
171 async fn on_state_delta_event(
172 &self,
173 event: &StateDeltaEvent,
174 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
175 ) -> Result<AgentStateMutation<StateT>, AgentError> {
176 Ok(AgentStateMutation::default())
177 }
178
179 async fn on_messages_snapshot_event(
180 &self,
181 event: &MessagesSnapshotEvent,
182 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
183 ) -> Result<AgentStateMutation<StateT>, AgentError> {
184 Ok(AgentStateMutation::default())
185 }
186
187 async fn on_raw_event(
188 &self,
189 event: &RawEvent,
190 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
191 ) -> Result<AgentStateMutation<StateT>, AgentError> {
192 Ok(AgentStateMutation::default())
193 }
194
195 async fn on_custom_event(
196 &self,
197 event: &CustomEvent,
198 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
199 ) -> Result<AgentStateMutation<StateT>, AgentError> {
200 Ok(AgentStateMutation::default())
201 }
202
203 async fn on_text_message_chunk_event(
204 &self,
205 event: &TextMessageChunkEvent,
206 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
207 ) -> Result<AgentStateMutation<StateT>, AgentError> {
208 Ok(AgentStateMutation::default())
209 }
210
211 async fn on_thinking_text_message_start_event(
212 &self,
213 event: &ThinkingTextMessageStartEvent,
214 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
215 ) -> Result<AgentStateMutation<StateT>, AgentError> {
216 Ok(AgentStateMutation::default())
217 }
218
219 async fn on_thinking_text_message_content_event(
220 &self,
221 event: &ThinkingTextMessageContentEvent,
222 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
223 ) -> Result<AgentStateMutation<StateT>, AgentError> {
224 Ok(AgentStateMutation::default())
225 }
226
227 async fn on_thinking_text_message_end_event(
228 &self,
229 event: &ThinkingTextMessageEndEvent,
230 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
231 ) -> Result<AgentStateMutation<StateT>, AgentError> {
232 Ok(AgentStateMutation::default())
233 }
234
235 async fn on_tool_call_chunk_event(
236 &self,
237 event: &ToolCallChunkEvent,
238 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
239 ) -> Result<AgentStateMutation<StateT>, AgentError> {
240 Ok(AgentStateMutation::default())
241 }
242
243 async fn on_thinking_start_event(
244 &self,
245 event: &ThinkingStartEvent,
246 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
247 ) -> Result<AgentStateMutation<StateT>, AgentError> {
248 Ok(AgentStateMutation::default())
249 }
250
251 async fn on_thinking_end_event(
252 &self,
253 event: &ThinkingEndEvent,
254 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
255 ) -> Result<AgentStateMutation<StateT>, AgentError> {
256 Ok(AgentStateMutation::default())
257 }
258
259 async fn on_messages_changed(
261 &self,
262 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
263 ) -> Result<(), AgentError> {
264 Ok(())
265 }
266
267 async fn on_state_changed(
268 &self,
269 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
270 ) -> Result<(), AgentError> {
271 Ok(())
272 }
273
274 async fn on_new_message(
275 &self,
276 message: &Message,
277 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
278 ) -> Result<(), AgentError> {
279 Ok(())
280 }
281
282 async fn on_new_tool_call(
283 &self,
284 tool_call: &ToolCall,
285 params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
286 ) -> Result<(), AgentError> {
287 Ok(())
288 }
289}
290
291#[derive(Clone)]
319pub struct Subscribers<StateT: AgentState = JsonValue, FwdPropsT: FwdProps = JsonValue> {
320 subs: Vec<Arc<dyn AgentSubscriber<StateT, FwdPropsT>>>,
321}
322
323impl<StateT: AgentState, FwdPropsT: FwdProps> Subscribers<StateT, FwdPropsT> {
324 pub fn new(subscribers: Vec<Arc<dyn AgentSubscriber<StateT, FwdPropsT>>>) -> Self {
325 Self { subs: subscribers }
326 }
327
328 pub fn from_subscriber<T>(subscriber: T) -> Self
330 where
331 T: AgentSubscriber<StateT, FwdPropsT> + 'static,
332 {
333 Self::new(vec![Arc::new(subscriber)])
334 }
335}
336
337impl<StateT, FwdPropsT, T> FromIterator<T> for Subscribers<StateT, FwdPropsT>
338where
339 StateT: AgentState,
340 FwdPropsT: FwdProps,
341 T: AgentSubscriber<StateT, FwdPropsT> + 'static,
342{
343 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
344 Self::new(
345 iter.into_iter()
346 .map(|s| Arc::new(s) as Arc<dyn AgentSubscriber<StateT, FwdPropsT>>)
347 .collect(),
348 )
349 }
350}
351
352impl<'a, StateT, FwdPropsT> IntoIterator for &'a Subscribers<StateT, FwdPropsT>
353where
354 StateT: AgentState,
355 FwdPropsT: FwdProps,
356{
357 type Item = &'a Arc<dyn AgentSubscriber<StateT, FwdPropsT>>;
358 type IntoIter = Iter<'a, Arc<dyn AgentSubscriber<StateT, FwdPropsT>>>;
359
360 fn into_iter(self) -> Self::IntoIter {
361 self.subs.iter()
362 }
363}
364
365pub trait IntoSubscribers<StateT: AgentState, FwdPropsT: FwdProps>: Send {
368 fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT>;
369}
370
371impl<StateT, FwdPropsT> IntoSubscribers<StateT, FwdPropsT> for Subscribers<StateT, FwdPropsT>
373where
374 StateT: AgentState,
375 FwdPropsT: FwdProps,
376{
377 fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
378 self
379 }
380}
381
382impl<StateT, FwdPropsT, T> IntoSubscribers<StateT, FwdPropsT> for (T,)
384where
385 StateT: AgentState,
386 FwdPropsT: FwdProps,
387 T: AgentSubscriber<StateT, FwdPropsT> + 'static,
388{
389 fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
390 Subscribers::from_subscriber(self.0)
391 }
392}
393
394impl<StateT, FwdPropsT, T> IntoSubscribers<StateT, FwdPropsT> for Vec<T>
396where
397 StateT: AgentState,
398 FwdPropsT: FwdProps,
399 T: AgentSubscriber<StateT, FwdPropsT> + 'static,
400{
401 fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
402 Subscribers::from_iter(self)
403 }
404}
405
406impl<StateT, FwdPropsT, T, const N: usize> IntoSubscribers<StateT, FwdPropsT> for [T; N]
408where
409 StateT: AgentState,
410 FwdPropsT: FwdProps,
411 T: AgentSubscriber<StateT, FwdPropsT> + 'static,
412{
413 fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
414 Subscribers::from_iter(self)
415 }
416}
417
418impl<StateT: AgentState, FwdPropsT: FwdProps> IntoSubscribers<StateT, FwdPropsT> for () {
420 fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
421 Subscribers::new(vec![])
422 }
423}