syncable-ag-ui-client 0.2.0

Client-side AG-UI event consumer for Rust applications - Syncable SDK
Documentation
//! State Reconstruction
//!
//! This module provides utilities for reconstructing agent state from
//! AG-UI event streams. It handles state snapshots and delta updates.
//!
//! # Example
//!
//! ```rust,ignore
//! use ag_ui_client::{StateReconstructor, SseClient};
//! use futures::StreamExt;
//!
//! let client = SseClient::connect("http://localhost:3000/events").await?;
//! let mut stream = client.into_stream();
//! let mut state = StateReconstructor::new();
//!
//! while let Some(event) = stream.next().await {
//!     let event = event?;
//!     state.apply_event(&event)?;
//!     println!("Current state: {:?}", state.current());
//! }
//! ```

use syncable_ag_ui_core::{
    patch::{apply_patch_from_value, Patch},
    Event, JsonValue, Message,
};

use crate::error::{ClientError, Result};

/// Result type alias for state operations.
pub type StateResult<T> = std::result::Result<T, ClientError>;

/// Reconstructs agent state from AG-UI events.
///
/// This struct tracks the current agent state and message history,
/// updating them as events are received. It handles both snapshot
/// and delta events for efficient state synchronization.
#[derive(Debug, Clone)]
pub struct StateReconstructor {
    /// Current agent state.
    state: JsonValue,
    /// Message history.
    messages: Vec<Message>,
    /// Current run ID (if any).
    run_id: Option<String>,
    /// Whether a run is currently active.
    run_active: bool,
}

impl Default for StateReconstructor {
    fn default() -> Self {
        Self::new()
    }
}

impl StateReconstructor {
    /// Creates a new state reconstructor with empty initial state.
    pub fn new() -> Self {
        Self {
            state: JsonValue::Object(serde_json::Map::new()),
            messages: Vec::new(),
            run_id: None,
            run_active: false,
        }
    }

    /// Creates a new state reconstructor with initial state.
    pub fn with_state(initial: JsonValue) -> Self {
        Self {
            state: initial,
            messages: Vec::new(),
            run_id: None,
            run_active: false,
        }
    }

    /// Returns a reference to the current state.
    pub fn current(&self) -> &JsonValue {
        &self.state
    }

    /// Returns the current state, consuming the reconstructor.
    pub fn into_state(self) -> JsonValue {
        self.state
    }

    /// Returns a reference to the message history.
    pub fn messages(&self) -> &[Message] {
        &self.messages
    }

    /// Returns the current run ID, if any.
    pub fn run_id(&self) -> Option<&str> {
        self.run_id.as_deref()
    }

    /// Returns whether a run is currently active.
    pub fn is_run_active(&self) -> bool {
        self.run_active
    }

    /// Applies an event to update the state.
    ///
    /// This method processes the event and updates the internal state
    /// accordingly. State snapshots replace the entire state, while
    /// deltas are applied as JSON Patches.
    pub fn apply_event(&mut self, event: &Event<JsonValue>) -> Result<()> {
        match event {
            Event::RunStarted(e) => {
                self.run_id = Some(e.run_id.to_string());
                self.run_active = true;
            }
            Event::RunFinished(_) | Event::RunError(_) => {
                self.run_active = false;
            }
            Event::StateSnapshot(e) => {
                self.state = e.snapshot.clone();
            }
            Event::StateDelta(e) => {
                self.apply_delta(&e.delta)?;
            }
            Event::MessagesSnapshot(e) => {
                self.messages = e.messages.clone();
            }
            // Other events don't affect state reconstruction
            _ => {}
        }
        Ok(())
    }

    /// Applies a JSON Patch delta to the current state.
    pub fn apply_delta(&mut self, delta: &[JsonValue]) -> Result<()> {
        // Convert Vec<JsonValue> to JsonValue array for apply_patch_from_value
        let delta_array = JsonValue::Array(delta.to_vec());
        apply_patch_from_value(&mut self.state, &delta_array)
            .map_err(|e| ClientError::state(e.to_string()))
    }

    /// Applies a Patch directly to the current state.
    pub fn apply_patch(&mut self, patch: &Patch) -> Result<()> {
        syncable_ag_ui_core::patch::apply_patch(&mut self.state, patch)
            .map_err(|e| ClientError::state(e.to_string()))
    }

    /// Resets the state to a new value.
    pub fn reset(&mut self, state: JsonValue) {
        self.state = state;
    }

    /// Clears all state and message history.
    pub fn clear(&mut self) {
        self.state = JsonValue::Object(serde_json::Map::new());
        self.messages.clear();
        self.run_id = None;
        self.run_active = false;
    }

    /// Gets a value from the state by JSON pointer path.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let count = state.get("/count");
    /// let nested = state.get("/user/name");
    /// ```
    pub fn get(&self, path: &str) -> Option<&JsonValue> {
        self.state.pointer(path)
    }

    /// Gets a typed value from the state by JSON pointer path.
    ///
    /// Returns `None` if the path doesn't exist or the value can't be
    /// deserialized to the target type.
    pub fn get_as<T: serde::de::DeserializeOwned>(&self, path: &str) -> Option<T> {
        self.state
            .pointer(path)
            .and_then(|v| serde_json::from_value(v.clone()).ok())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use syncable_ag_ui_core::{
        BaseEvent, MessageId, MessagesSnapshotEvent, RunId, RunStartedEvent, StateDeltaEvent,
        StateSnapshotEvent, ThreadId,
    };
    use serde_json::json;

    fn base_event() -> BaseEvent {
        BaseEvent::new()
    }

    #[test]
    fn test_new_state() {
        let state = StateReconstructor::new();
        assert!(state.current().is_object());
        assert!(state.messages().is_empty());
        assert!(!state.is_run_active());
    }

    #[test]
    fn test_with_initial_state() {
        let initial = json!({"count": 0});
        let state = StateReconstructor::with_state(initial.clone());
        assert_eq!(state.current(), &initial);
    }

    #[test]
    fn test_apply_state_snapshot() {
        let mut state = StateReconstructor::new();

        let event = Event::StateSnapshot(StateSnapshotEvent {
            base: base_event(),
            snapshot: json!({"count": 42, "name": "test"}),
        });

        state.apply_event(&event).unwrap();
        assert_eq!(state.current()["count"], 42);
        assert_eq!(state.current()["name"], "test");
    }

    #[test]
    fn test_apply_state_delta() {
        let mut state = StateReconstructor::with_state(json!({"count": 0}));

        let event = Event::StateDelta(StateDeltaEvent {
            base: base_event(),
            delta: vec![json!({
                "op": "replace",
                "path": "/count",
                "value": 10
            })],
        });

        state.apply_event(&event).unwrap();
        assert_eq!(state.current()["count"], 10);
    }

    #[test]
    fn test_apply_run_started() {
        let mut state = StateReconstructor::new();

        let run_id = RunId::random();
        let run_id_str = run_id.to_string();

        let event = Event::RunStarted(RunStartedEvent {
            base: base_event(),
            thread_id: ThreadId::random(),
            run_id,
        });

        state.apply_event(&event).unwrap();
        assert!(state.is_run_active());
        assert_eq!(state.run_id(), Some(run_id_str.as_str()));
    }

    #[test]
    fn test_apply_messages_snapshot() {
        let mut state = StateReconstructor::new();

        let msg = Message::Assistant {
            id: MessageId::random(),
            content: Some("Hello".to_string()),
            name: None,
            tool_calls: None,
        };

        let event = Event::MessagesSnapshot(MessagesSnapshotEvent {
            base: base_event(),
            messages: vec![msg],
        });

        state.apply_event(&event).unwrap();
        assert_eq!(state.messages().len(), 1);
    }

    #[test]
    fn test_get_by_path() {
        let state = StateReconstructor::with_state(json!({
            "user": {
                "name": "Alice",
                "age": 30
            }
        }));

        assert_eq!(state.get("/user/name"), Some(&json!("Alice")));
        assert_eq!(state.get("/user/age"), Some(&json!(30)));
        assert_eq!(state.get("/nonexistent"), None);
    }

    #[test]
    fn test_get_as_typed() {
        let state = StateReconstructor::with_state(json!({
            "count": 42,
            "name": "test"
        }));

        let count: Option<i32> = state.get_as("/count");
        assert_eq!(count, Some(42));

        let name: Option<String> = state.get_as("/name");
        assert_eq!(name, Some("test".to_string()));

        let missing: Option<i32> = state.get_as("/missing");
        assert_eq!(missing, None);
    }

    #[test]
    fn test_clear() {
        let mut state = StateReconstructor::with_state(json!({"count": 42}));
        state.run_active = true;
        state.run_id = Some("run1".to_string());

        state.clear();

        assert!(state.current().is_object());
        assert!(state.current().as_object().unwrap().is_empty());
        assert!(!state.is_run_active());
        assert!(state.run_id().is_none());
    }

    #[test]
    fn test_reset() {
        let mut state = StateReconstructor::with_state(json!({"old": true}));

        state.reset(json!({"new": true}));

        assert_eq!(state.current(), &json!({"new": true}));
    }
}