1use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Event, StreamStats};
6use serde::Deserialize;
7use serde_json::{Value, json};
8
/// Wire-level shape of one entry in the `events` list of a `stream.consume` response.
///
/// Values of this type are deserialized first and then converted into the public
/// [`Event`] through the `From<RawStreamEvent>` implementation, which normalizes `data`.
#[derive(Deserialize)]
struct RawStreamEvent {
    /// Offset reported for the event; defaults to `0` when the field is absent.
    #[serde(default)]
    offset: u64,
    /// Event name as sent by the server (required field).
    event: String,
    /// Payload exactly as received (required field); it may be any JSON value and is
    /// interpreted during conversion to [`Event`].
    data: Value,
    /// Optional timestamp; `None` when the field is absent.
    /// NOTE(review): the unit (seconds vs. milliseconds) is not visible here — confirm.
    #[serde(default)]
    timestamp: Option<u64>,
}
21
22impl From<RawStreamEvent> for Event {
23 fn from(raw: RawStreamEvent) -> Self {
24 let data = match &raw.data {
25 Value::Array(arr) => {
27 let bytes: Vec<u8> = arr
28 .iter()
29 .filter_map(|v| v.as_u64().map(|n| n as u8))
30 .collect();
31 serde_json::from_slice(&bytes).unwrap_or(Value::Null)
32 }
33 Value::String(s) => serde_json::from_str(s).unwrap_or(Value::String(s.clone())),
35 other => other.clone(),
37 };
38 Self {
39 offset: raw.offset,
40 event: raw.event,
41 data,
42 timestamp: raw.timestamp,
43 }
44 }
45}
46
/// Handle for issuing `stream.*` commands through a [`SynapClient`].
///
/// Cloning the manager clones the wrapped client.
#[derive(Clone)]
pub struct StreamManager {
    /// Client used to send every stream command.
    pub(crate) client: SynapClient,
}
55
56impl StreamManager {
57 pub(crate) fn new(client: SynapClient) -> Self {
59 Self { client }
60 }
61
62 pub async fn create_room(&self, room: &str, max_events: Option<usize>) -> Result<()> {
64 let mut payload = json!({"room": room});
65 if let Some(max) = max_events {
66 payload["max_events"] = json!(max);
67 }
68
69 self.client.send_command("stream.create", payload).await?;
70 Ok(())
71 }
72
73 pub async fn publish(&self, room: &str, event: &str, data: Value) -> Result<u64> {
94 let payload = json!({
95 "room": room,
96 "event": event,
97 "data": data,
98 });
99
100 let response = self.client.send_command("stream.publish", payload).await?;
101
102 Ok(response["offset"].as_u64().unwrap_or(0))
103 }
104
105 pub async fn consume(
112 &self,
113 room: &str,
114 offset: Option<u64>,
115 limit: Option<usize>,
116 ) -> Result<Vec<Event>> {
117 let payload = json!({
118 "room": room,
119 "subscriber_id": "sdk-default",
120 "from_offset": offset.unwrap_or(0),
121 "limit": limit,
122 });
123
124 let response = self.client.send_command("stream.consume", payload).await?;
125
126 let raw_events: Vec<RawStreamEvent> = serde_json::from_value(response["events"].clone())?;
130 Ok(raw_events.into_iter().map(Into::into).collect())
131 }
132
133 pub async fn stats(&self, room: &str) -> Result<StreamStats> {
135 let payload = json!({"room": room});
136 let response = self.client.send_command("stream.stats", payload).await?;
137 Ok(serde_json::from_value(response)?)
138 }
139
140 pub async fn list(&self) -> Result<Vec<String>> {
142 let response = self.client.send_command("stream.list", json!({})).await?;
143 Ok(serde_json::from_value(response["rooms"].clone())?)
144 }
145
146 pub async fn delete_room(&self, room: &str) -> Result<()> {
148 let payload = json!({"room": room});
149 self.client.send_command("stream.delete", payload).await?;
150 Ok(())
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SynapConfig;

    /// Address handed to the client configuration in these tests.
    const BASE_URL: &str = "http://localhost:15500";

    /// A stream manager can be obtained from a freshly built client.
    #[test]
    fn test_stream_manager_creation() {
        let client = SynapClient::new(SynapConfig::new(BASE_URL)).unwrap();
        let stream = client.stream();

        let size = std::mem::size_of_val(&stream);
        assert!(size > 0);
    }

    /// A stream manager can be cloned, and both handles remain usable values.
    #[test]
    fn test_stream_manager_clone() {
        let client = SynapClient::new(SynapConfig::new(BASE_URL)).unwrap();
        let stream1 = client.stream();
        let stream2 = stream1.clone();

        for manager in [&stream1, &stream2] {
            assert!(std::mem::size_of_val(manager) > 0);
        }
    }
}