syncable-ag-ui-client 0.2.0

Client-side AG-UI event consumer for Rust applications - Syncable SDK
Documentation
//! SSE (Server-Sent Events) Client
//!
//! This module provides a client for consuming AG-UI events via Server-Sent Events.
//!
//! # Example
//!
//! ```rust,ignore
//! use ag_ui_client::SseClient;
//! use futures::StreamExt;
//!
//! let client = SseClient::connect("http://localhost:3000/events").await?;
//! let mut stream = client.into_stream();
//!
//! while let Some(event) = stream.next().await {
//!     println!("Event: {:?}", event?.event_type());
//! }
//! ```

use std::pin::Pin;
use std::task::{Context, Poll};

use syncable_ag_ui_core::{Event, JsonValue};
use futures::Stream;
use reqwest::Client;
use reqwest_eventsource::{Event as SseEvent, EventSource};

use crate::error::{ClientError, Result};

/// Configuration for SSE client connections.
#[derive(Debug, Clone)]
pub struct SseConfig {
    /// Request timeout for the initial connection.
    pub connect_timeout: std::time::Duration,
    /// Custom headers to include in the request.
    pub headers: Vec<(String, String)>,
}

impl Default for SseConfig {
    fn default() -> Self {
        Self {
            connect_timeout: std::time::Duration::from_secs(30),
            headers: Vec::new(),
        }
    }
}

impl SseConfig {
    /// Creates a new configuration with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the connection timeout.
    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Adds a custom header.
    pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.push((name.into(), value.into()));
        self
    }

    /// Adds an authorization bearer token.
    pub fn bearer_token(self, token: impl Into<String>) -> Self {
        self.header("Authorization", format!("Bearer {}", token.into()))
    }
}

/// SSE client for consuming AG-UI event streams.
///
/// The client connects to an SSE endpoint and provides a stream of
/// parsed AG-UI events.
pub struct SseClient {
    event_source: EventSource,
}

impl SseClient {
    /// Connects to an SSE endpoint with default configuration.
    ///
    /// # Arguments
    ///
    /// * `url` - The SSE endpoint URL
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let client = SseClient::connect("http://localhost:3000/events").await?;
    /// ```
    pub async fn connect(url: &str) -> Result<Self> {
        Self::connect_with_config(url, SseConfig::default()).await
    }

    /// Connects to an SSE endpoint with custom configuration.
    ///
    /// # Arguments
    ///
    /// * `url` - The SSE endpoint URL
    /// * `config` - Connection configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let config = SseConfig::new()
    ///     .connect_timeout(Duration::from_secs(60))
    ///     .bearer_token("my-token");
    /// let client = SseClient::connect_with_config("http://localhost:3000/events", config).await?;
    /// ```
    pub async fn connect_with_config(url: &str, config: SseConfig) -> Result<Self> {
        let client = Client::builder()
            .timeout(config.connect_timeout)
            .build()
            .map_err(|e| ClientError::connection(e.to_string()))?;

        let mut request = client.get(url);

        for (name, value) in config.headers {
            request = request.header(&name, &value);
        }

        let event_source = EventSource::new(request)
            .map_err(|e| ClientError::connection(e.to_string()))?;

        Ok(Self { event_source })
    }

    /// Converts this client into an event stream.
    ///
    /// The stream yields parsed AG-UI events as they arrive.
    pub fn into_stream(self) -> SseEventStream {
        SseEventStream {
            event_source: self.event_source,
        }
    }
}

/// A stream of AG-UI events from an SSE connection.
///
/// This stream yields `Result<Event>` items as events arrive from the server.
pub struct SseEventStream {
    event_source: EventSource,
}

impl Stream for SseEventStream {
    type Item = Result<Event<JsonValue>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match Pin::new(&mut self.event_source).poll_next(cx) {
                Poll::Ready(Some(Ok(sse_event))) => {
                    match sse_event {
                        SseEvent::Open => {
                            // Connection opened, continue polling
                            continue;
                        }
                        SseEvent::Message(msg) => {
                            // Parse the event data as JSON
                            match serde_json::from_str::<Event<JsonValue>>(&msg.data) {
                                Ok(event) => return Poll::Ready(Some(Ok(event))),
                                Err(e) => {
                                    return Poll::Ready(Some(Err(ClientError::parse(format!(
                                        "failed to parse event: {}",
                                        e
                                    )))))
                                }
                            }
                        }
                    }
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(ClientError::sse(e.to_string()))))
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sse_config_default() {
        let config = SseConfig::default();
        assert_eq!(config.connect_timeout, std::time::Duration::from_secs(30));
        assert!(config.headers.is_empty());
    }

    #[test]
    fn test_sse_config_builder() {
        let config = SseConfig::new()
            .connect_timeout(std::time::Duration::from_secs(60))
            .header("X-Custom", "value")
            .bearer_token("token123");

        assert_eq!(config.connect_timeout, std::time::Duration::from_secs(60));
        assert_eq!(config.headers.len(), 2);
        assert_eq!(config.headers[0], ("X-Custom".to_string(), "value".to_string()));
        assert_eq!(
            config.headers[1],
            ("Authorization".to_string(), "Bearer token123".to_string())
        );
    }
}