1use crate::client::SynapClient;
4use crate::error::Result;
5use serde_json::{Value, json};
6use std::collections::HashMap;
7
/// Handle for pub/sub operations, issued as `pubsub.*` commands through a [`SynapClient`].
///
/// Cloning the manager clones the wrapped client.
#[derive(Clone)]
pub struct PubSubManager {
    /// Client used to send every command.
    pub(crate) client: SynapClient,
}
16
17impl PubSubManager {
18 pub(crate) fn new(client: SynapClient) -> Self {
20 Self { client }
21 }
22
23 pub async fn publish(
45 &self,
46 topic: &str,
47 data: Value,
48 priority: Option<u8>,
49 headers: Option<HashMap<String, String>>,
50 ) -> Result<usize> {
51 let mut payload = json!({
52 "topic": topic,
53 "payload": data, });
55
56 if let Some(p) = priority {
57 payload["priority"] = json!(p);
58 }
59
60 if let Some(h) = headers {
61 payload["headers"] = json!(h);
62 }
63
64 let response = self.client.send_command("pubsub.publish", payload).await?;
65
66 Ok(response["subscribers_matched"].as_u64().unwrap_or(0) as usize)
67 }
68
69 pub async fn subscribe_topics(
78 &self,
79 subscriber_id: &str,
80 topics: Vec<String>,
81 ) -> Result<String> {
82 let payload = json!({
83 "subscriber_id": subscriber_id,
84 "topics": topics,
85 });
86
87 let response = self
88 .client
89 .send_command("pubsub.subscribe", payload)
90 .await?;
91
92 let id = response["subscriber_id"]
94 .as_str()
95 .or_else(|| response["subscription_id"].as_str())
96 .unwrap_or_default();
97 Ok(id.to_string())
98 }
99
100 pub async fn unsubscribe(&self, subscriber_id: &str, topics: Vec<String>) -> Result<()> {
102 let payload = json!({
103 "subscriber_id": subscriber_id,
104 "topics": topics,
105 });
106
107 self.client
108 .send_command("pubsub.unsubscribe", payload)
109 .await?;
110 Ok(())
111 }
112
113 pub async fn list_topics(&self) -> Result<Vec<String>> {
115 let response = self.client.send_command("pubsub.topics", json!({})).await?;
116 Ok(serde_json::from_value(response["topics"].clone())?)
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SynapConfig;

    /// Base URL passed to `SynapConfig::new` by every test in this module.
    const TEST_ENDPOINT: &str = "http://localhost:15500";

    /// A manager can be obtained from a freshly built client.
    #[test]
    fn test_pubsub_manager_creation() {
        let client = SynapClient::new(SynapConfig::new(TEST_ENDPOINT)).unwrap();
        let manager = client.pubsub();

        let footprint = std::mem::size_of_val(&manager);
        assert!(footprint > 0);
    }

    /// A manager can be cloned, leaving both the original and the copy usable.
    #[test]
    fn test_pubsub_manager_clone() {
        let client = SynapClient::new(SynapConfig::new(TEST_ENDPOINT)).unwrap();
        let original = client.pubsub();
        let copy = original.clone();

        let original_footprint = std::mem::size_of_val(&original);
        assert!(original_footprint > 0);
        let copy_footprint = std::mem::size_of_val(&copy);
        assert!(copy_footprint > 0);
    }
}