ag_ui_client/
subscriber.rs

1#![allow(unused)]
2
3use ag_ui_core::event::*;
4use ag_ui_core::types::input::RunAgentInput;
5use ag_ui_core::types::message::Message;
6use ag_ui_core::types::tool::ToolCall;
7use ag_ui_core::{AgentState, FwdProps};
8use serde_json::Value as JsonValue;
9use std::collections::HashMap;
10use std::slice::Iter;
11use std::sync::Arc;
12
13use crate::agent::{AgentError, AgentStateMutation};
14
15pub struct AgentSubscriberParams<'a, StateT: AgentState, FwdPropsT: FwdProps> {
16    pub messages: &'a [Message],
17    pub state: &'a StateT,
18    pub input: &'a RunAgentInput<StateT, FwdPropsT>,
19}
20
21/// Subscriber trait for handling agent events
22#[async_trait::async_trait]
23pub trait AgentSubscriber<StateT = JsonValue, FwdPropsT = JsonValue>: Send + Sync
24where
25    StateT: AgentState,
26    FwdPropsT: FwdProps,
27{
28    // Request lifecycle
29    async fn on_run_initialized(
30        &self,
31        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
32    ) -> Result<AgentStateMutation<StateT>, AgentError> {
33        Ok(AgentStateMutation::default())
34    }
35
36    async fn on_run_failed(
37        &self,
38        error: &AgentError,
39        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
40    ) -> Result<AgentStateMutation<StateT>, AgentError> {
41        Ok(AgentStateMutation::default())
42    }
43
44    async fn on_run_finalized(
45        &self,
46        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
47    ) -> Result<AgentStateMutation<StateT>, AgentError> {
48        Ok(AgentStateMutation::default())
49    }
50
51    // Events
52    async fn on_event(
53        &self,
54        event: &Event<StateT>,
55        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
56    ) -> Result<AgentStateMutation<StateT>, AgentError> {
57        Ok(AgentStateMutation::default())
58    }
59
60    async fn on_run_started_event(
61        &self,
62        event: &RunStartedEvent,
63        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
64    ) -> Result<AgentStateMutation<StateT>, AgentError> {
65        Ok(AgentStateMutation::default())
66    }
67
68    async fn on_run_finished_event(
69        &self,
70        event: &RunFinishedEvent,
71        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
72    ) -> Result<AgentStateMutation<StateT>, AgentError> {
73        Ok(AgentStateMutation::default())
74    }
75
76    async fn on_run_error_event(
77        &self,
78        event: &RunErrorEvent,
79        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
80    ) -> Result<AgentStateMutation<StateT>, AgentError> {
81        Ok(AgentStateMutation::default())
82    }
83
84    async fn on_step_started_event(
85        &self,
86        event: &StepStartedEvent,
87        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
88    ) -> Result<AgentStateMutation<StateT>, AgentError> {
89        Ok(AgentStateMutation::default())
90    }
91
92    async fn on_step_finished_event(
93        &self,
94        event: &StepFinishedEvent,
95        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
96    ) -> Result<AgentStateMutation<StateT>, AgentError> {
97        Ok(AgentStateMutation::default())
98    }
99
100    async fn on_text_message_start_event(
101        &self,
102        event: &TextMessageStartEvent,
103        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
104    ) -> Result<AgentStateMutation<StateT>, AgentError> {
105        Ok(AgentStateMutation::default())
106    }
107
108    async fn on_text_message_content_event(
109        &self,
110        event: &TextMessageContentEvent,
111        _text_message_buffer: &str,
112        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
113    ) -> Result<AgentStateMutation<StateT>, AgentError> {
114        Ok(AgentStateMutation::default())
115    }
116
117    async fn on_text_message_end_event(
118        &self,
119        event: &TextMessageEndEvent,
120        _text_message_buffer: &str,
121        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
122    ) -> Result<AgentStateMutation<StateT>, AgentError> {
123        Ok(AgentStateMutation::default())
124    }
125
126    async fn on_tool_call_start_event(
127        &self,
128        event: &ToolCallStartEvent,
129        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
130    ) -> Result<AgentStateMutation<StateT>, AgentError> {
131        Ok(AgentStateMutation::default())
132    }
133
134    async fn on_tool_call_args_event(
135        &self,
136        event: &ToolCallArgsEvent,
137        _tool_call_buffer: &str,
138        tool_call_name: &str,
139        _partial_tool_call_args: &HashMap<String, JsonValue>,
140        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
141    ) -> Result<AgentStateMutation<StateT>, AgentError> {
142        Ok(AgentStateMutation::default())
143    }
144
145    async fn on_tool_call_end_event(
146        &self,
147        event: &ToolCallEndEvent,
148        tool_call_name: &str,
149        _tool_call_args: &HashMap<String, JsonValue>,
150        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
151    ) -> Result<AgentStateMutation<StateT>, AgentError> {
152        Ok(AgentStateMutation::default())
153    }
154
155    async fn on_tool_call_result_event(
156        &self,
157        event: &ToolCallResultEvent,
158        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
159    ) -> Result<AgentStateMutation<StateT>, AgentError> {
160        Ok(AgentStateMutation::default())
161    }
162
163    async fn on_state_snapshot_event(
164        &self,
165        event: &StateSnapshotEvent<StateT>,
166        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
167    ) -> Result<AgentStateMutation<StateT>, AgentError> {
168        Ok(AgentStateMutation::default())
169    }
170
171    async fn on_state_delta_event(
172        &self,
173        event: &StateDeltaEvent,
174        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
175    ) -> Result<AgentStateMutation<StateT>, AgentError> {
176        Ok(AgentStateMutation::default())
177    }
178
179    async fn on_messages_snapshot_event(
180        &self,
181        event: &MessagesSnapshotEvent,
182        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
183    ) -> Result<AgentStateMutation<StateT>, AgentError> {
184        Ok(AgentStateMutation::default())
185    }
186
187    async fn on_raw_event(
188        &self,
189        event: &RawEvent,
190        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
191    ) -> Result<AgentStateMutation<StateT>, AgentError> {
192        Ok(AgentStateMutation::default())
193    }
194
195    async fn on_custom_event(
196        &self,
197        event: &CustomEvent,
198        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
199    ) -> Result<AgentStateMutation<StateT>, AgentError> {
200        Ok(AgentStateMutation::default())
201    }
202
203    async fn on_text_message_chunk_event(
204        &self,
205        event: &TextMessageChunkEvent,
206        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
207    ) -> Result<AgentStateMutation<StateT>, AgentError> {
208        Ok(AgentStateMutation::default())
209    }
210
211    async fn on_thinking_text_message_start_event(
212        &self,
213        event: &ThinkingTextMessageStartEvent,
214        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
215    ) -> Result<AgentStateMutation<StateT>, AgentError> {
216        Ok(AgentStateMutation::default())
217    }
218
219    async fn on_thinking_text_message_content_event(
220        &self,
221        event: &ThinkingTextMessageContentEvent,
222        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
223    ) -> Result<AgentStateMutation<StateT>, AgentError> {
224        Ok(AgentStateMutation::default())
225    }
226
227    async fn on_thinking_text_message_end_event(
228        &self,
229        event: &ThinkingTextMessageEndEvent,
230        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
231    ) -> Result<AgentStateMutation<StateT>, AgentError> {
232        Ok(AgentStateMutation::default())
233    }
234
235    async fn on_tool_call_chunk_event(
236        &self,
237        event: &ToolCallChunkEvent,
238        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
239    ) -> Result<AgentStateMutation<StateT>, AgentError> {
240        Ok(AgentStateMutation::default())
241    }
242
243    async fn on_thinking_start_event(
244        &self,
245        event: &ThinkingStartEvent,
246        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
247    ) -> Result<AgentStateMutation<StateT>, AgentError> {
248        Ok(AgentStateMutation::default())
249    }
250
251    async fn on_thinking_end_event(
252        &self,
253        event: &ThinkingEndEvent,
254        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
255    ) -> Result<AgentStateMutation<StateT>, AgentError> {
256        Ok(AgentStateMutation::default())
257    }
258
259    // State changes
260    async fn on_messages_changed(
261        &self,
262        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
263    ) -> Result<(), AgentError> {
264        Ok(())
265    }
266
267    async fn on_state_changed(
268        &self,
269        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
270    ) -> Result<(), AgentError> {
271        Ok(())
272    }
273
274    async fn on_new_message(
275        &self,
276        message: &Message,
277        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
278    ) -> Result<(), AgentError> {
279        Ok(())
280    }
281
282    async fn on_new_tool_call(
283        &self,
284        tool_call: &ToolCall,
285        params: AgentSubscriberParams<'async_trait, StateT, FwdPropsT>,
286    ) -> Result<(), AgentError> {
287        Ok(())
288    }
289}
290
291/// Wrapper for subscriber implementations.
292///
293/// Facilitates easy casting to and from types that implement [`AgentSubscriber`].
294///
295/// # Examples
296///
297/// ```
298/// # use ag_ui_client::subscriber::{Subscribers, AgentSubscriber};
299/// # use std::sync::Arc;
300/// # struct MySubscriber;
301/// # impl AgentSubscriber for MySubscriber {}
302///
303/// // Create from a single subscriber
304/// let subscriber = MySubscriber;
305/// let subscribers = Subscribers::from_subscriber(subscriber);
306///
307/// // Create from multiple subscribers
308/// let subscriber_vec = vec![MySubscriber, MySubscriber];
309/// let subscribers = Subscribers::from_iter(subscriber_vec);
310///
311/// // Create from pre-wrapped Arc subscribers
312/// let arc_subscribers: Vec<Arc<dyn AgentSubscriber>> = vec![
313///     Arc::new(MySubscriber)
314/// ];
315/// let subscribers = Subscribers::new(arc_subscribers);
316/// ```
317///
318#[derive(Clone)]
319pub struct Subscribers<StateT: AgentState = JsonValue, FwdPropsT: FwdProps = JsonValue> {
320    subs: Vec<Arc<dyn AgentSubscriber<StateT, FwdPropsT>>>,
321}
322
323impl<StateT: AgentState, FwdPropsT: FwdProps> Subscribers<StateT, FwdPropsT> {
324    pub fn new(subscribers: Vec<Arc<dyn AgentSubscriber<StateT, FwdPropsT>>>) -> Self {
325        Self { subs: subscribers }
326    }
327
328    /// Creates a new Subscribers collection from a single subscriber
329    pub fn from_subscriber<T>(subscriber: T) -> Self
330    where
331        T: AgentSubscriber<StateT, FwdPropsT> + 'static,
332    {
333        Self::new(vec![Arc::new(subscriber)])
334    }
335}
336
337impl<StateT, FwdPropsT, T> FromIterator<T> for Subscribers<StateT, FwdPropsT>
338where
339    StateT: AgentState,
340    FwdPropsT: FwdProps,
341    T: AgentSubscriber<StateT, FwdPropsT> + 'static,
342{
343    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
344        Self::new(
345            iter.into_iter()
346                .map(|s| Arc::new(s) as Arc<dyn AgentSubscriber<StateT, FwdPropsT>>)
347                .collect(),
348        )
349    }
350}
351
352impl<'a, StateT, FwdPropsT> IntoIterator for &'a Subscribers<StateT, FwdPropsT>
353where
354    StateT: AgentState,
355    FwdPropsT: FwdProps,
356{
357    type Item = &'a Arc<dyn AgentSubscriber<StateT, FwdPropsT>>;
358    type IntoIter = Iter<'a, Arc<dyn AgentSubscriber<StateT, FwdPropsT>>>;
359
360    fn into_iter(self) -> Self::IntoIter {
361        self.subs.iter()
362    }
363}
364
365/// Trait for types that can be converted into a Subscribers collection
366/// This allows for flexible input types in APIs that accept subscribers
367pub trait IntoSubscribers<StateT: AgentState, FwdPropsT: FwdProps>: Send {
368    fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT>;
369}
370
371// Implementation for Subscribers itself (identity conversion)
372impl<StateT, FwdPropsT> IntoSubscribers<StateT, FwdPropsT> for Subscribers<StateT, FwdPropsT>
373where
374    StateT: AgentState,
375    FwdPropsT: FwdProps,
376{
377    fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
378        self
379    }
380}
381
382// Implementation for single subscribers, as a unit-sized tuple
383impl<StateT, FwdPropsT, T> IntoSubscribers<StateT, FwdPropsT> for (T,)
384where
385    StateT: AgentState,
386    FwdPropsT: FwdProps,
387    T: AgentSubscriber<StateT, FwdPropsT> + 'static,
388{
389    fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
390        Subscribers::from_subscriber(self.0)
391    }
392}
393
394// Implementation for Vec of subscribers
395impl<StateT, FwdPropsT, T> IntoSubscribers<StateT, FwdPropsT> for Vec<T>
396where
397    StateT: AgentState,
398    FwdPropsT: FwdProps,
399    T: AgentSubscriber<StateT, FwdPropsT> + 'static,
400{
401    fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
402        Subscribers::from_iter(self)
403    }
404}
405
406// Implementation for arrays of subscribers
407impl<StateT, FwdPropsT, T, const N: usize> IntoSubscribers<StateT, FwdPropsT> for [T; N]
408where
409    StateT: AgentState,
410    FwdPropsT: FwdProps,
411    T: AgentSubscriber<StateT, FwdPropsT> + 'static,
412{
413    fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
414        Subscribers::from_iter(self)
415    }
416}
417
418// Implementation for empty case (no subscribers)
419impl<StateT: AgentState, FwdPropsT: FwdProps> IntoSubscribers<StateT, FwdPropsT> for () {
420    fn into_subscribers(self) -> Subscribers<StateT, FwdPropsT> {
421        Subscribers::new(vec![])
422    }
423}