synap_sdk/
stream.rs

1//! Event Stream operations
2
3use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Event, StreamStats};
6use serde_json::{Value, json};
7
8/// Stream Manager interface
9///
10/// Uses StreamableHTTP protocol for all operations.
11/// Event Streams are **reactive by default** - use `observe_events()` or `observe_event()`.
12#[derive(Clone)]
13pub struct StreamManager {
14    pub(crate) client: SynapClient,
15}
16
17impl StreamManager {
18    /// Create a new Stream manager interface
19    pub(crate) fn new(client: SynapClient) -> Self {
20        Self { client }
21    }
22
23    /// Create a new stream room
24    pub async fn create_room(&self, room: &str, max_events: Option<usize>) -> Result<()> {
25        let mut payload = json!({"room": room});
26        if let Some(max) = max_events {
27            payload["max_events"] = json!(max);
28        }
29
30        self.client.send_command("stream.create", payload).await?;
31        Ok(())
32    }
33
34    /// Publish an event to a stream
35    ///
36    /// # Returns
37    /// Returns the offset of the published event
38    ///
39    /// # Example
40    /// ```no_run
41    /// # use synap_sdk::{SynapClient, SynapConfig};
42    /// # use serde_json::json;
43    /// # #[tokio::main]
44    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
45    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
46    /// let offset = client.stream().publish(
47    ///     "chat-room",
48    ///     "message",
49    ///     json!({"user": "alice", "text": "Hello!"})
50    /// ).await?;
51    /// # Ok(())
52    /// # }
53    /// ```
54    pub async fn publish(&self, room: &str, event: &str, data: Value) -> Result<u64> {
55        let payload = json!({
56            "room": room,
57            "event": event,
58            "data": data,
59        });
60
61        let response = self.client.send_command("stream.publish", payload).await?;
62
63        Ok(response["offset"].as_u64().unwrap_or(0))
64    }
65
66    /// Consume events from a stream
67    ///
68    /// # Arguments
69    /// * `room` - Stream room name
70    /// * `offset` - Starting offset (None = from beginning)
71    /// * `limit` - Maximum events to fetch
72    pub async fn consume(
73        &self,
74        room: &str,
75        offset: Option<u64>,
76        limit: Option<usize>,
77    ) -> Result<Vec<Event>> {
78        let payload = json!({
79            "room": room,
80            "offset": offset,
81            "limit": limit,
82        });
83
84        let response = self.client.send_command("stream.consume", payload).await?;
85        Ok(serde_json::from_value(response["events"].clone())?)
86    }
87
88    /// Get stream statistics
89    pub async fn stats(&self, room: &str) -> Result<StreamStats> {
90        let payload = json!({"room": room});
91        let response = self.client.send_command("stream.stats", payload).await?;
92        Ok(serde_json::from_value(response)?)
93    }
94
95    /// List all stream rooms
96    pub async fn list(&self) -> Result<Vec<String>> {
97        let response = self.client.send_command("stream.list", json!({})).await?;
98        Ok(serde_json::from_value(response["rooms"].clone())?)
99    }
100
101    /// Delete a stream room
102    pub async fn delete_room(&self, room: &str) -> Result<()> {
103        let payload = json!({"room": room});
104        self.client.send_command("stream.delete", payload).await?;
105        Ok(())
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112    use crate::SynapConfig;
113
114    #[test]
115    fn test_stream_manager_creation() {
116        let config = SynapConfig::new("http://localhost:15500");
117        let client = SynapClient::new(config).unwrap();
118        let stream = client.stream();
119
120        assert!(std::mem::size_of_val(&stream) > 0);
121    }
122
123    #[test]
124    fn test_stream_manager_clone() {
125        let config = SynapConfig::new("http://localhost:15500");
126        let client = SynapClient::new(config).unwrap();
127        let stream1 = client.stream();
128        let stream2 = stream1.clone();
129
130        assert!(std::mem::size_of_val(&stream1) > 0);
131        assert!(std::mem::size_of_val(&stream2) > 0);
132    }
133}