Skip to main content

synap_sdk/
pubsub.rs

1//! Pub/Sub operations
2
3use crate::client::SynapClient;
4use crate::error::Result;
5use serde_json::{Value, json};
6use std::collections::HashMap;
7
8/// Pub/Sub Manager interface
9///
10/// Uses StreamableHTTP protocol for all operations.
11/// Pub/Sub is **reactive by default** - use `subscribe()` and `subscribe_topic()`.
12#[derive(Clone)]
13pub struct PubSubManager {
14    pub(crate) client: SynapClient,
15}
16
17impl PubSubManager {
18    /// Create a new Pub/Sub manager interface
19    pub(crate) fn new(client: SynapClient) -> Self {
20        Self { client }
21    }
22
23    /// Publish a message to a topic
24    ///
25    /// # Returns
26    /// Returns the number of subscribers that received the message
27    ///
28    /// # Example
29    /// ```no_run
30    /// # use synap_sdk::{SynapClient, SynapConfig};
31    /// # use serde_json::json;
32    /// # #[tokio::main]
33    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
34    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
35    /// let count = client.pubsub().publish(
36    ///     "user.created",
37    ///     json!({"id": 123, "name": "Alice"}),
38    ///     Some(5),
39    ///     None
40    /// ).await?;
41    /// # Ok(())
42    /// # }
43    /// ```
44    pub async fn publish(
45        &self,
46        topic: &str,
47        data: Value,
48        priority: Option<u8>,
49        headers: Option<HashMap<String, String>>,
50    ) -> Result<usize> {
51        let mut payload = json!({
52            "topic": topic,
53            "payload": data,  // ✅ FIX: Use "payload" instead of "data" to match server API
54        });
55
56        if let Some(p) = priority {
57            payload["priority"] = json!(p);
58        }
59
60        if let Some(h) = headers {
61            payload["headers"] = json!(h);
62        }
63
64        let response = self.client.send_command("pubsub.publish", payload).await?;
65
66        Ok(response["subscribers_matched"].as_u64().unwrap_or(0) as usize)
67    }
68
69    /// Subscribe to topics
70    ///
71    /// Supports wildcard patterns:
72    /// - `user.*` - single-level wildcard
73    /// - `user.#` - multi-level wildcard
74    ///
75    /// # Returns
76    /// Returns a subscription ID
77    pub async fn subscribe_topics(
78        &self,
79        subscriber_id: &str,
80        topics: Vec<String>,
81    ) -> Result<String> {
82        let payload = json!({
83            "subscriber_id": subscriber_id,
84            "topics": topics,
85        });
86
87        let response = self
88            .client
89            .send_command("pubsub.subscribe", payload)
90            .await?;
91
92        // Server returns "subscriber_id"; check both field names for robustness.
93        let id = response["subscriber_id"]
94            .as_str()
95            .or_else(|| response["subscription_id"].as_str())
96            .unwrap_or_default();
97        Ok(id.to_string())
98    }
99
100    /// Unsubscribe from topics
101    pub async fn unsubscribe(&self, subscriber_id: &str, topics: Vec<String>) -> Result<()> {
102        let payload = json!({
103            "subscriber_id": subscriber_id,
104            "topics": topics,
105        });
106
107        self.client
108            .send_command("pubsub.unsubscribe", payload)
109            .await?;
110        Ok(())
111    }
112
113    /// List all active topics
114    pub async fn list_topics(&self) -> Result<Vec<String>> {
115        let response = self.client.send_command("pubsub.topics", json!({})).await?;
116        Ok(serde_json::from_value(response["topics"].clone())?)
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SynapConfig;

    /// A manager obtained from a freshly built client is a real, non-zero-sized value.
    #[test]
    fn test_pubsub_manager_creation() {
        let client = SynapClient::new(SynapConfig::new("http://localhost:15500")).unwrap();
        let manager = client.pubsub();

        let footprint = std::mem::size_of_val(&manager);
        assert!(footprint > 0);
    }

    /// Cloning a manager yields a second usable handle alongside the first.
    #[test]
    fn test_pubsub_manager_clone() {
        let client = SynapClient::new(SynapConfig::new("http://localhost:15500")).unwrap();
        let original = client.pubsub();
        let duplicate = original.clone();

        for handle in [&original, &duplicate] {
            assert!(std::mem::size_of_val(handle) > 0);
        }
    }
}