Skip to main content

synap_sdk/
stream.rs

1//! Event Stream operations
2
3use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Event, StreamStats};
6use serde::Deserialize;
7use serde_json::{Value, json};
8
9/// Wire format of a stream event as returned by the server.
10/// HTTP returns `data` as `Vec<u8>` (serde_json::to_vec of the original JSON).
11/// SynapRPC may return `data` as a string.  We accept both via `Value`.
12#[derive(Deserialize)]
13struct RawStreamEvent {
14    #[serde(default)]
15    offset: u64,
16    event: String,
17    data: Value,
18    #[serde(default)]
19    timestamp: Option<u64>,
20}
21
22impl From<RawStreamEvent> for Event {
23    fn from(raw: RawStreamEvent) -> Self {
24        let data = match &raw.data {
25            // HTTP: data arrives as JSON array of bytes → decode back to JSON
26            Value::Array(arr) => {
27                let bytes: Vec<u8> = arr
28                    .iter()
29                    .filter_map(|v| v.as_u64().map(|n| n as u8))
30                    .collect();
31                serde_json::from_slice(&bytes).unwrap_or(Value::Null)
32            }
33            // RPC: data arrives as a JSON string (the serialized JSON)
34            Value::String(s) => serde_json::from_str(s).unwrap_or(Value::String(s.clone())),
35            // Already a JSON value (unlikely but handle gracefully)
36            other => other.clone(),
37        };
38        Self {
39            offset: raw.offset,
40            event: raw.event,
41            data,
42            timestamp: raw.timestamp,
43        }
44    }
45}
46
47/// Stream Manager interface
48///
49/// Uses StreamableHTTP protocol for all operations.
50/// Event Streams are **reactive by default** - use `observe_events()` or `observe_event()`.
51#[derive(Clone)]
52pub struct StreamManager {
53    pub(crate) client: SynapClient,
54}
55
56impl StreamManager {
57    /// Create a new Stream manager interface
58    pub(crate) fn new(client: SynapClient) -> Self {
59        Self { client }
60    }
61
62    /// Create a new stream room
63    pub async fn create_room(&self, room: &str, max_events: Option<usize>) -> Result<()> {
64        let mut payload = json!({"room": room});
65        if let Some(max) = max_events {
66            payload["max_events"] = json!(max);
67        }
68
69        self.client.send_command("stream.create", payload).await?;
70        Ok(())
71    }
72
73    /// Publish an event to a stream
74    ///
75    /// # Returns
76    /// Returns the offset of the published event
77    ///
78    /// # Example
79    /// ```no_run
80    /// # use synap_sdk::{SynapClient, SynapConfig};
81    /// # use serde_json::json;
82    /// # #[tokio::main]
83    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
84    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
85    /// let offset = client.stream().publish(
86    ///     "chat-room",
87    ///     "message",
88    ///     json!({"user": "alice", "text": "Hello!"})
89    /// ).await?;
90    /// # Ok(())
91    /// # }
92    /// ```
93    pub async fn publish(&self, room: &str, event: &str, data: Value) -> Result<u64> {
94        let payload = json!({
95            "room": room,
96            "event": event,
97            "data": data,
98        });
99
100        let response = self.client.send_command("stream.publish", payload).await?;
101
102        Ok(response["offset"].as_u64().unwrap_or(0))
103    }
104
105    /// Consume events from a stream
106    ///
107    /// # Arguments
108    /// * `room` - Stream room name
109    /// * `offset` - Starting offset (None = from beginning)
110    /// * `limit` - Maximum events to fetch
111    pub async fn consume(
112        &self,
113        room: &str,
114        offset: Option<u64>,
115        limit: Option<usize>,
116    ) -> Result<Vec<Event>> {
117        let payload = json!({
118            "room": room,
119            "subscriber_id": "sdk-default",
120            "from_offset": offset.unwrap_or(0),
121            "limit": limit,
122        });
123
124        let response = self.client.send_command("stream.consume", payload).await?;
125
126        // Server stores data as Vec<u8> (serde_json::to_vec) and serialises the
127        // struct directly, so `data` arrives as a JSON byte-array.  We decode
128        // each event's data bytes back into the original JSON value.
129        let raw_events: Vec<RawStreamEvent> = serde_json::from_value(response["events"].clone())?;
130        Ok(raw_events.into_iter().map(Into::into).collect())
131    }
132
133    /// Get stream statistics
134    pub async fn stats(&self, room: &str) -> Result<StreamStats> {
135        let payload = json!({"room": room});
136        let response = self.client.send_command("stream.stats", payload).await?;
137        Ok(serde_json::from_value(response)?)
138    }
139
140    /// List all stream rooms
141    pub async fn list(&self) -> Result<Vec<String>> {
142        let response = self.client.send_command("stream.list", json!({})).await?;
143        Ok(serde_json::from_value(response["rooms"].clone())?)
144    }
145
146    /// Delete a stream room
147    pub async fn delete_room(&self, room: &str) -> Result<()> {
148        let payload = json!({"room": room});
149        self.client.send_command("stream.delete", payload).await?;
150        Ok(())
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SynapConfig;

    /// Deserializes a wire-format event and converts it to the public type.
    fn decode(wire: Value) -> Event {
        let raw: RawStreamEvent = serde_json::from_value(wire).expect("valid wire event");
        raw.into()
    }

    #[test]
    fn test_stream_manager_creation() {
        let config = SynapConfig::new("http://localhost:15500");
        let client = SynapClient::new(config).unwrap();
        let stream = client.stream();

        assert!(std::mem::size_of_val(&stream) > 0);
    }

    #[test]
    fn test_stream_manager_clone() {
        let config = SynapConfig::new("http://localhost:15500");
        let client = SynapClient::new(config).unwrap();
        let stream1 = client.stream();
        let stream2 = stream1.clone();

        assert!(std::mem::size_of_val(&stream1) > 0);
        assert!(std::mem::size_of_val(&stream2) > 0);
    }

    #[test]
    fn test_event_from_http_byte_array() {
        // Mirror the HTTP wire format: the payload is the serialized JSON bytes.
        let bytes = serde_json::to_vec(&json!({"a": 1})).unwrap();
        let event = decode(json!({
            "offset": 3,
            "event": "message",
            "data": bytes,
            "timestamp": 42,
        }));

        assert_eq!(event.offset, 3);
        assert_eq!(event.event, "message");
        assert_eq!(event.data, json!({"a": 1}));
        assert_eq!(event.timestamp, Some(42));
    }

    #[test]
    fn test_event_from_rpc_string() {
        let event = decode(json!({"event": "message", "data": "{\"a\":1}"}));

        // `offset` and `timestamp` are optional on the wire.
        assert_eq!(event.offset, 0);
        assert_eq!(event.data, json!({"a": 1}));
        assert_eq!(event.timestamp, None);
    }

    #[test]
    fn test_event_non_json_string_is_kept() {
        let event = decode(json!({"event": "message", "data": "hello"}));

        assert_eq!(event.data, json!("hello"));
    }

    #[test]
    fn test_event_plain_value_passthrough() {
        let event = decode(json!({"event": "message", "data": {"a": 1}}));

        assert_eq!(event.data, json!({"a": 1}));
    }
}