Skip to main content

syncable_ag_ui_client/
state.rs

//! State Reconstruction
//!
//! This module provides utilities for reconstructing agent state from
//! AG-UI event streams. It handles state snapshots and delta updates.
//!
//! # Example
//!
//! ```rust,ignore
//! use syncable_ag_ui_client::{StateReconstructor, SseClient};
//! use futures::StreamExt;
//!
//! let client = SseClient::connect("http://localhost:3000/events").await?;
//! let mut stream = client.into_stream();
//! let mut state = StateReconstructor::new();
//!
//! while let Some(event) = stream.next().await {
//!     let event = event?;
//!     state.apply_event(&event)?;
//!     println!("Current state: {:?}", state.current());
//! }
//! ```
22
23use syncable_ag_ui_core::{
24    patch::{apply_patch_from_value, Patch},
25    Event, JsonValue, Message,
26};
27
28use crate::error::{ClientError, Result};
29
/// Result type alias for state operations.
///
/// Shorthand for `std::result::Result<T, ClientError>`.
///
/// NOTE(review): the methods in this file are declared with
/// `crate::error::Result` rather than this alias; presumably both name the
/// same type — confirm against `crate::error`.
pub type StateResult<T> = std::result::Result<T, ClientError>;
32
/// Reconstructs agent state from AG-UI events.
///
/// This struct tracks the current agent state and message history,
/// updating them as events are received. It handles both snapshot
/// and delta events for efficient state synchronization.
///
/// All fields are private; they are read through the accessor methods and
/// mutated through `apply_event`, `apply_delta`, `apply_patch`, `reset`
/// and `clear`.
#[derive(Debug, Clone)]
pub struct StateReconstructor {
    /// Current agent state.
    ///
    /// Replaced wholesale by state snapshot events and by `reset`; modified
    /// in place by state delta events and patches.
    state: JsonValue,
    /// Message history.
    ///
    /// Only replaced as a whole when a messages snapshot event is applied.
    messages: Vec<Message>,
    /// Current run ID (if any).
    ///
    /// Set when a run-started event is applied. It is not cleared when the
    /// run finishes or errors, only by `clear`.
    run_id: Option<String>,
    /// Whether a run is currently active.
    ///
    /// Becomes `true` on a run-started event and `false` on a run-finished
    /// or run-error event.
    run_active: bool,
}
49
50impl Default for StateReconstructor {
51    fn default() -> Self {
52        Self::new()
53    }
54}
55
56impl StateReconstructor {
57    /// Creates a new state reconstructor with empty initial state.
58    pub fn new() -> Self {
59        Self {
60            state: JsonValue::Object(serde_json::Map::new()),
61            messages: Vec::new(),
62            run_id: None,
63            run_active: false,
64        }
65    }
66
67    /// Creates a new state reconstructor with initial state.
68    pub fn with_state(initial: JsonValue) -> Self {
69        Self {
70            state: initial,
71            messages: Vec::new(),
72            run_id: None,
73            run_active: false,
74        }
75    }
76
77    /// Returns a reference to the current state.
78    pub fn current(&self) -> &JsonValue {
79        &self.state
80    }
81
82    /// Returns the current state, consuming the reconstructor.
83    pub fn into_state(self) -> JsonValue {
84        self.state
85    }
86
87    /// Returns a reference to the message history.
88    pub fn messages(&self) -> &[Message] {
89        &self.messages
90    }
91
92    /// Returns the current run ID, if any.
93    pub fn run_id(&self) -> Option<&str> {
94        self.run_id.as_deref()
95    }
96
97    /// Returns whether a run is currently active.
98    pub fn is_run_active(&self) -> bool {
99        self.run_active
100    }
101
102    /// Applies an event to update the state.
103    ///
104    /// This method processes the event and updates the internal state
105    /// accordingly. State snapshots replace the entire state, while
106    /// deltas are applied as JSON Patches.
107    pub fn apply_event(&mut self, event: &Event<JsonValue>) -> Result<()> {
108        match event {
109            Event::RunStarted(e) => {
110                self.run_id = Some(e.run_id.to_string());
111                self.run_active = true;
112            }
113            Event::RunFinished(_) | Event::RunError(_) => {
114                self.run_active = false;
115            }
116            Event::StateSnapshot(e) => {
117                self.state = e.snapshot.clone();
118            }
119            Event::StateDelta(e) => {
120                self.apply_delta(&e.delta)?;
121            }
122            Event::MessagesSnapshot(e) => {
123                self.messages = e.messages.clone();
124            }
125            // Other events don't affect state reconstruction
126            _ => {}
127        }
128        Ok(())
129    }
130
131    /// Applies a JSON Patch delta to the current state.
132    pub fn apply_delta(&mut self, delta: &[JsonValue]) -> Result<()> {
133        // Convert Vec<JsonValue> to JsonValue array for apply_patch_from_value
134        let delta_array = JsonValue::Array(delta.to_vec());
135        apply_patch_from_value(&mut self.state, &delta_array)
136            .map_err(|e| ClientError::state(e.to_string()))
137    }
138
139    /// Applies a Patch directly to the current state.
140    pub fn apply_patch(&mut self, patch: &Patch) -> Result<()> {
141        syncable_ag_ui_core::patch::apply_patch(&mut self.state, patch)
142            .map_err(|e| ClientError::state(e.to_string()))
143    }
144
145    /// Resets the state to a new value.
146    pub fn reset(&mut self, state: JsonValue) {
147        self.state = state;
148    }
149
150    /// Clears all state and message history.
151    pub fn clear(&mut self) {
152        self.state = JsonValue::Object(serde_json::Map::new());
153        self.messages.clear();
154        self.run_id = None;
155        self.run_active = false;
156    }
157
158    /// Gets a value from the state by JSON pointer path.
159    ///
160    /// # Example
161    ///
162    /// ```rust,ignore
163    /// let count = state.get("/count");
164    /// let nested = state.get("/user/name");
165    /// ```
166    pub fn get(&self, path: &str) -> Option<&JsonValue> {
167        self.state.pointer(path)
168    }
169
170    /// Gets a typed value from the state by JSON pointer path.
171    ///
172    /// Returns `None` if the path doesn't exist or the value can't be
173    /// deserialized to the target type.
174    pub fn get_as<T: serde::de::DeserializeOwned>(&self, path: &str) -> Option<T> {
175        self.state
176            .pointer(path)
177            .and_then(|v| serde_json::from_value(v.clone()).ok())
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use syncable_ag_ui_core::{
        BaseEvent, MessageId, MessagesSnapshotEvent, RunId, RunStartedEvent, StateDeltaEvent,
        StateSnapshotEvent, ThreadId,
    };

    /// Fresh base metadata for the events constructed in these tests.
    fn base_event() -> BaseEvent {
        BaseEvent::new()
    }

    /// A new reconstructor has an object state, no messages and no active run.
    #[test]
    fn test_new_state() {
        let reconstructor = StateReconstructor::new();

        assert!(reconstructor.current().is_object());
        assert!(reconstructor.messages().is_empty());
        assert!(!reconstructor.is_run_active());
    }

    /// `with_state` keeps the supplied value as the current state.
    #[test]
    fn test_with_initial_state() {
        let seed = json!({"count": 0});

        let reconstructor = StateReconstructor::with_state(seed.clone());

        assert_eq!(reconstructor.current(), &seed);
    }

    /// A state snapshot event replaces the state with the snapshot.
    #[test]
    fn test_apply_state_snapshot() {
        let mut reconstructor = StateReconstructor::new();
        let snapshot_event = Event::StateSnapshot(StateSnapshotEvent {
            base: base_event(),
            snapshot: json!({"count": 42, "name": "test"}),
        });

        reconstructor
            .apply_event(&snapshot_event)
            .expect("snapshot event should apply");

        let current = reconstructor.current();
        assert_eq!(current["count"], 42);
        assert_eq!(current["name"], "test");
    }

    /// A state delta event is applied to the existing state as a JSON Patch.
    #[test]
    fn test_apply_state_delta() {
        let mut reconstructor = StateReconstructor::with_state(json!({"count": 0}));
        let replace_count = json!({
            "op": "replace",
            "path": "/count",
            "value": 10
        });
        let delta_event = Event::StateDelta(StateDeltaEvent {
            base: base_event(),
            delta: vec![replace_count],
        });

        reconstructor
            .apply_event(&delta_event)
            .expect("delta event should apply");

        assert_eq!(reconstructor.current()["count"], 10);
    }

    /// A run-started event marks the run active and records its ID.
    #[test]
    fn test_apply_run_started() {
        let mut reconstructor = StateReconstructor::new();
        let run_id = RunId::random();
        // Captured before `run_id` is moved into the event.
        let expected_id = run_id.to_string();
        let started = Event::RunStarted(RunStartedEvent {
            base: base_event(),
            thread_id: ThreadId::random(),
            run_id,
        });

        reconstructor
            .apply_event(&started)
            .expect("run-started event should apply");

        assert!(reconstructor.is_run_active());
        assert_eq!(reconstructor.run_id(), Some(expected_id.as_str()));
    }

    /// A messages snapshot event replaces the message history.
    #[test]
    fn test_apply_messages_snapshot() {
        let mut reconstructor = StateReconstructor::new();
        let assistant_message = Message::Assistant {
            id: MessageId::random(),
            content: Some("Hello".to_string()),
            name: None,
            tool_calls: None,
        };
        let snapshot_event = Event::MessagesSnapshot(MessagesSnapshotEvent {
            base: base_event(),
            messages: vec![assistant_message],
        });

        reconstructor
            .apply_event(&snapshot_event)
            .expect("messages snapshot event should apply");

        assert_eq!(reconstructor.messages().len(), 1);
    }

    /// `get` resolves JSON pointer paths and yields `None` for missing ones.
    #[test]
    fn test_get_by_path() {
        let reconstructor = StateReconstructor::with_state(json!({
            "user": {
                "name": "Alice",
                "age": 30
            }
        }));

        assert_eq!(reconstructor.get("/user/name"), Some(&json!("Alice")));
        assert_eq!(reconstructor.get("/user/age"), Some(&json!(30)));
        assert_eq!(reconstructor.get("/nonexistent"), None);
    }

    /// `get_as` deserializes the located value into the requested type.
    #[test]
    fn test_get_as_typed() {
        let reconstructor = StateReconstructor::with_state(json!({
            "count": 42,
            "name": "test"
        }));

        assert_eq!(reconstructor.get_as::<i32>("/count"), Some(42));
        assert_eq!(
            reconstructor.get_as::<String>("/name"),
            Some("test".to_string())
        );
        assert_eq!(reconstructor.get_as::<i32>("/missing"), None);
    }

    /// `clear` empties the state and forgets the run information.
    #[test]
    fn test_clear() {
        let mut reconstructor = StateReconstructor::with_state(json!({"count": 42}));
        // Private fields are set directly to simulate an in-progress run.
        reconstructor.run_active = true;
        reconstructor.run_id = Some("run1".to_string());

        reconstructor.clear();

        assert!(reconstructor.current().is_object());
        assert!(reconstructor.current().as_object().unwrap().is_empty());
        assert!(!reconstructor.is_run_active());
        assert!(reconstructor.run_id().is_none());
    }

    /// `reset` swaps the state for the supplied value.
    #[test]
    fn test_reset() {
        let mut reconstructor = StateReconstructor::with_state(json!({"old": true}));
        let replacement = json!({"new": true});

        reconstructor.reset(replacement.clone());

        assert_eq!(reconstructor.current(), &replacement);
    }
}