Skip to main content

synap_sdk/
pubsub_reactive.rs

1//! Reactive Pub/Sub operations
2//!
3//! When the client uses the `synap://` URL scheme (SynapRpc transport), push
4//! messages are received over a dedicated TCP push connection wired directly
5//! through the server's pub/sub router.  For `http://` / `https://` URLs the
6//! original WebSocket path is used instead.
7
8use crate::reactive::{MessageStream, SubscriptionHandle};
9use crate::types::PubSubMessage;
10use futures::{Stream, StreamExt};
11use serde_json::Value;
12use tokio::sync::mpsc;
13use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
14
15impl crate::pubsub::PubSubManager {
16    /// Observe messages from Pub/Sub topics reactively using WebSocket
17    ///
18    /// Returns a Stream of messages that are delivered in real-time via WebSocket.
19    /// Supports wildcard patterns:
20    /// - `user.*` - single-level wildcard
21    /// - `user.#` - multi-level wildcard
22    ///
23    /// # Arguments
24    /// * `subscriber_id` - Unique subscriber identifier
25    /// * `topics` - List of topics to subscribe to (supports wildcards)
26    ///
27    /// # Example
28    /// ```no_run
29    /// use futures::StreamExt;
30    /// use synap_sdk::{SynapClient, SynapConfig};
31    ///
32    /// # #[tokio::main]
33    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
34    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
35    /// let (mut stream, handle) = client.pubsub()
36    ///     .observe("subscriber-1", vec!["user.*".to_string(), "events.#".to_string()]);
37    ///
38    /// // Process messages reactively
39    /// while let Some(message) = stream.next().await {
40    ///     tracing::info!("Received on {}: {:?}", message.topic, message.data);
41    /// }
42    ///
43    /// // Stop subscribing
44    /// handle.unsubscribe();
45    /// # Ok(())
46    /// # }
47    /// ```
48    pub fn observe(
49        &self,
50        subscriber_id: impl Into<String>,
51        topics: Vec<String>,
52    ) -> (
53        impl Stream<Item = PubSubMessage> + 'static,
54        SubscriptionHandle,
55    ) {
56        let _subscriber_id = subscriber_id.into();
57        let client = self.client.clone();
58        let topics_clone = topics.clone();
59
60        let (tx, rx) = mpsc::unbounded_channel::<PubSubMessage>();
61        let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
62
63        tokio::spawn(async move {
64            // ── SynapRPC native push path ─────────────────────────────────────
65            if let Some(rpc) = client.synap_rpc_transport() {
66                match rpc.subscribe_push(topics_clone).await {
67                    Ok((_sub_id, mut push_rx)) => {
68                        tracing::debug!(
69                            sub_id = %_sub_id,
70                            "PubSub SynapRPC push connection established"
71                        );
72                        loop {
73                            tokio::select! {
74                                _ = cancel_rx.recv() => {
75                                    tracing::debug!("PubSub RPC stream cancelled");
76                                    break;
77                                }
78                                msg = push_rx.recv() => {
79                                    match msg {
80                                        Some(json) => {
81                                            // Push frame: { topic, payload (JSON string), id, timestamp }
82                                            if let Some(topic) = json.get("topic").and_then(|t| t.as_str()) {
83                                                // payload is a JSON-encoded string produced by
84                                                // serde_json::Value::to_string() on the server.
85                                                let data = match json.get("payload").and_then(|p| p.as_str()) {
86                                                    Some(s) => serde_json::from_str::<Value>(s).unwrap_or(Value::String(s.to_string())),
87                                                    None => json.get("payload").cloned().unwrap_or(Value::Null),
88                                                };
89                                                let pubsub_msg = PubSubMessage {
90                                                    topic: topic.to_string(),
91                                                    data,
92                                                    priority: None,
93                                                    headers: None,
94                                                };
95                                                if tx.send(pubsub_msg).is_err() {
96                                                    break; // downstream receiver dropped
97                                                }
98                                            }
99                                        }
100                                        None => {
101                                            tracing::debug!("PubSub RPC push connection closed");
102                                            break;
103                                        }
104                                    }
105                                }
106                            }
107                        }
108                    }
109                    Err(e) => {
110                        tracing::error!("SynapRPC subscribe_push failed: {}", e);
111                    }
112                }
113                return;
114            }
115
116            // ── WebSocket fallback (HTTP / HTTPS transport) ───────────────────
117            let base_url = client.base_url();
118            let ws_url = match base_url.scheme() {
119                "http" => format!("ws://{}", base_url.authority()),
120                "https" => format!("wss://{}", base_url.authority()),
121                _ => {
122                    tracing::error!(
123                        "Unsupported URL scheme for WebSocket PubSub: {}",
124                        base_url.scheme()
125                    );
126                    return;
127                }
128            };
129
130            let topics_query = topics_clone.join(",");
131            let ws_endpoint = format!("{}/pubsub/ws?topics={}", ws_url, topics_query);
132
133            tracing::debug!("Connecting to WebSocket: {}", ws_endpoint);
134
135            let ws_stream = match connect_async(&ws_endpoint).await {
136                Ok((stream, _)) => stream,
137                Err(e) => {
138                    tracing::error!("Failed to connect WebSocket: {}", e);
139                    return;
140                }
141            };
142
143            let (_write, mut read) = ws_stream.split();
144
145            loop {
146                tokio::select! {
147                    _ = cancel_rx.recv() => {
148                        tracing::debug!("PubSub stream cancelled");
149                        break;
150                    }
151                    msg = read.next() => {
152                        match msg {
153                            Some(Ok(WsMessage::Text(text))) => {
154                                match serde_json::from_str::<Value>(&text) {
155                                    Ok(json) => {
156                                        if let Some(msg_type) = json.get("type").and_then(|t| t.as_str()) {
157                                            match msg_type {
158                                                "connected" => {
159                                                    tracing::debug!("PubSub WebSocket connected: {:?}", json);
160                                                }
161                                                "message" | "publish" => {
162                                                    if let (Some(topic), Some(payload)) = (
163                                                        json.get("topic").and_then(|t| t.as_str()),
164                                                        json.get("payload")
165                                                    ) {
166                                                        let pubsub_msg = PubSubMessage {
167                                                            topic: topic.to_string(),
168                                                            data: payload.clone(),
169                                                            priority: json.get("priority").and_then(|p| p.as_u64().map(|u| u as u8)),
170                                                            headers: json.get("metadata").and_then(|h| serde_json::from_value(h.clone()).ok()),
171                                                        };
172                                                        if tx.send(pubsub_msg).is_err() {
173                                                            break;
174                                                        }
175                                                    }
176                                                }
177                                                "error" => {
178                                                    if let Some(error_msg) = json.get("error").and_then(|e| e.as_str()) {
179                                                        tracing::error!("PubSub WebSocket error: {}", error_msg);
180                                                    }
181                                                }
182                                                _ => {
183                                                    tracing::debug!("Unknown WS message type: {}", msg_type);
184                                                }
185                                            }
186                                        } else if let Some(topic) = json.get("topic").and_then(|t| t.as_str()) {
187                                            if let Some(payload_or_data) = json.get("payload").or_else(|| json.get("data")) {
188                                                let pubsub_msg = PubSubMessage {
189                                                    topic: topic.to_string(),
190                                                    data: payload_or_data.clone(),
191                                                    priority: json.get("priority").and_then(|p| p.as_u64().map(|u| u as u8)),
192                                                    headers: json.get("metadata")
193                                                        .or_else(|| json.get("headers"))
194                                                        .and_then(|h| serde_json::from_value(h.clone()).ok()),
195                                                };
196                                                if tx.send(pubsub_msg).is_err() {
197                                                    break;
198                                                }
199                                            }
200                                        }
201                                    }
202                                    Err(e) => {
203                                        tracing::warn!("Failed to parse WebSocket message: {}", e);
204                                    }
205                                }
206                            }
207                            Some(Ok(WsMessage::Close(_))) => {
208                                tracing::debug!("WebSocket closed by server");
209                                break;
210                            }
211                            Some(Ok(WsMessage::Ping(_data))) => {
212                                // pong handled automatically by tungstenite
213                            }
214                            Some(Ok(_)) => {}
215                            Some(Err(e)) => {
216                                tracing::error!("WebSocket error: {}", e);
217                                break;
218                            }
219                            None => {
220                                tracing::debug!("WebSocket stream ended");
221                                break;
222                            }
223                        }
224                    }
225                }
226            }
227
228            tracing::debug!("PubSub WebSocket connection closed");
229        });
230
231        let stream: MessageStream<PubSubMessage> =
232            Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
233        let handle = SubscriptionHandle::new(cancel_tx);
234
235        (stream, handle)
236    }
237
238    /// Observe messages from a single topic reactively
239    ///
240    /// Convenience method for subscribing to a single topic.
241    ///
242    /// # Example
243    /// ```no_run
244    /// use futures::StreamExt;
245    /// use synap_sdk::{SynapClient, SynapConfig};
246    ///
247    /// # #[tokio::main]
248    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
249    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
250    /// let (mut stream, handle) = client.pubsub()
251    ///     .observe_topic("subscriber-1", "user.events");
252    ///
253    /// while let Some(message) = stream.next().await {
254    ///     tracing::info!("Received: {:?}", message);
255    /// }
256    ///
257    /// handle.unsubscribe();
258    /// # Ok(())
259    /// # }
260    /// ```
261    pub fn observe_topic(
262        &self,
263        subscriber_id: impl Into<String>,
264        topic: impl Into<String>,
265    ) -> (
266        impl Stream<Item = PubSubMessage> + 'static,
267        SubscriptionHandle,
268    ) {
269        self.observe(subscriber_id, vec![topic.into()])
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use crate::SynapConfig;
276
277    #[tokio::test]
278    async fn test_pubsub_reactive_creation() {
279        let config = SynapConfig::new("http://localhost:15500");
280        let client = crate::SynapClient::new(config).unwrap();
281        let pubsub = client.pubsub();
282
283        // Just verify the method exists and compiles
284        // Note: This will spawn a tokio task but won't actually connect
285        // since we're not waiting for the connection to complete
286        let (_stream, _handle) = pubsub.observe("test-sub", vec!["test.topic".to_string()]);
287
288        // Immediately unsubscribe to clean up
289        _handle.unsubscribe();
290    }
291
292    #[tokio::test]
293    async fn test_pubsub_reactive_single_topic() {
294        let config = SynapConfig::new("http://localhost:15500");
295        let client = crate::SynapClient::new(config).unwrap();
296        let pubsub = client.pubsub();
297
298        // Just verify the method exists and compiles
299        // Note: This will spawn a tokio task but won't actually connect
300        // since we're not waiting for the connection to complete
301        let (_stream, _handle) = pubsub.observe_topic("test-sub", "test.topic");
302
303        // Immediately unsubscribe to clean up
304        _handle.unsubscribe();
305    }
306}