1use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Event, StreamStats};
6use serde_json::{Value, json};
7
/// Handle for issuing the `stream.*` commands through a [`SynapClient`].
///
/// Cloning the manager clones the wrapped client.
#[derive(Clone)]
pub struct StreamManager {
    /// Client through which every stream command is sent.
    pub(crate) client: SynapClient,
}
16
17impl StreamManager {
18 pub(crate) fn new(client: SynapClient) -> Self {
20 Self { client }
21 }
22
23 pub async fn create_room(&self, room: &str, max_events: Option<usize>) -> Result<()> {
25 let mut payload = json!({"room": room});
26 if let Some(max) = max_events {
27 payload["max_events"] = json!(max);
28 }
29
30 self.client.send_command("stream.create", payload).await?;
31 Ok(())
32 }
33
34 pub async fn publish(&self, room: &str, event: &str, data: Value) -> Result<u64> {
55 let payload = json!({
56 "room": room,
57 "event": event,
58 "data": data,
59 });
60
61 let response = self.client.send_command("stream.publish", payload).await?;
62
63 Ok(response["offset"].as_u64().unwrap_or(0))
64 }
65
66 pub async fn consume(
73 &self,
74 room: &str,
75 offset: Option<u64>,
76 limit: Option<usize>,
77 ) -> Result<Vec<Event>> {
78 let payload = json!({
79 "room": room,
80 "offset": offset,
81 "limit": limit,
82 });
83
84 let response = self.client.send_command("stream.consume", payload).await?;
85 Ok(serde_json::from_value(response["events"].clone())?)
86 }
87
88 pub async fn stats(&self, room: &str) -> Result<StreamStats> {
90 let payload = json!({"room": room});
91 let response = self.client.send_command("stream.stats", payload).await?;
92 Ok(serde_json::from_value(response)?)
93 }
94
95 pub async fn list(&self) -> Result<Vec<String>> {
97 let response = self.client.send_command("stream.list", json!({})).await?;
98 Ok(serde_json::from_value(response["rooms"].clone())?)
99 }
100
101 pub async fn delete_room(&self, room: &str) -> Result<()> {
103 let payload = json!({"room": room});
104 self.client.send_command("stream.delete", payload).await?;
105 Ok(())
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SynapConfig;

    /// Builds the client shared by the tests below, configured with a
    /// localhost URL.
    fn local_client() -> SynapClient {
        SynapClient::new(SynapConfig::new("http://localhost:15500")).unwrap()
    }

    /// The value returned by `stream()` has a non-zero size.
    #[test]
    fn test_stream_manager_creation() {
        let client = local_client();
        let manager = client.stream();

        assert_ne!(std::mem::size_of_val(&manager), 0);
    }

    /// A clone of the value returned by `stream()` coexists with the original,
    /// and both have a non-zero size.
    #[test]
    fn test_stream_manager_clone() {
        let client = local_client();
        let original = client.stream();
        let copy = original.clone();

        assert_ne!(std::mem::size_of_val(&original), 0);
        assert_ne!(std::mem::size_of_val(&copy), 0);
    }
}