synap_sdk/pubsub_reactive.rs
1//! Reactive Pub/Sub operations
2//!
3//! When the client uses the `synap://` URL scheme (SynapRpc transport), push
4//! messages are received over a dedicated TCP push connection wired directly
5//! through the server's pub/sub router. For `http://` / `https://` URLs the
6//! original WebSocket path is used instead.
7
8use crate::reactive::{MessageStream, SubscriptionHandle};
9use crate::types::PubSubMessage;
10use futures::{Stream, StreamExt};
11use serde_json::Value;
12use tokio::sync::mpsc;
13use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
14
15impl crate::pubsub::PubSubManager {
16 /// Observe messages from Pub/Sub topics reactively using WebSocket
17 ///
18 /// Returns a Stream of messages that are delivered in real-time via WebSocket.
19 /// Supports wildcard patterns:
20 /// - `user.*` - single-level wildcard
21 /// - `user.#` - multi-level wildcard
22 ///
23 /// # Arguments
24 /// * `subscriber_id` - Unique subscriber identifier
25 /// * `topics` - List of topics to subscribe to (supports wildcards)
26 ///
27 /// # Example
28 /// ```no_run
29 /// use futures::StreamExt;
30 /// use synap_sdk::{SynapClient, SynapConfig};
31 ///
32 /// # #[tokio::main]
33 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
34 /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
35 /// let (mut stream, handle) = client.pubsub()
36 /// .observe("subscriber-1", vec!["user.*".to_string(), "events.#".to_string()]);
37 ///
38 /// // Process messages reactively
39 /// while let Some(message) = stream.next().await {
40 /// tracing::info!("Received on {}: {:?}", message.topic, message.data);
41 /// }
42 ///
43 /// // Stop subscribing
44 /// handle.unsubscribe();
45 /// # Ok(())
46 /// # }
47 /// ```
48 pub fn observe(
49 &self,
50 subscriber_id: impl Into<String>,
51 topics: Vec<String>,
52 ) -> (
53 impl Stream<Item = PubSubMessage> + 'static,
54 SubscriptionHandle,
55 ) {
56 let _subscriber_id = subscriber_id.into();
57 let client = self.client.clone();
58 let topics_clone = topics.clone();
59
60 let (tx, rx) = mpsc::unbounded_channel::<PubSubMessage>();
61 let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
62
63 tokio::spawn(async move {
64 // ── SynapRPC native push path ─────────────────────────────────────
65 if let Some(rpc) = client.synap_rpc_transport() {
66 match rpc.subscribe_push(topics_clone).await {
67 Ok((_sub_id, mut push_rx)) => {
68 tracing::debug!(
69 sub_id = %_sub_id,
70 "PubSub SynapRPC push connection established"
71 );
72 loop {
73 tokio::select! {
74 _ = cancel_rx.recv() => {
75 tracing::debug!("PubSub RPC stream cancelled");
76 break;
77 }
78 msg = push_rx.recv() => {
79 match msg {
80 Some(json) => {
81 // Push frame: { topic, payload (JSON string), id, timestamp }
82 if let Some(topic) = json.get("topic").and_then(|t| t.as_str()) {
83 // payload is a JSON-encoded string produced by
84 // serde_json::Value::to_string() on the server.
85 let data = match json.get("payload").and_then(|p| p.as_str()) {
86 Some(s) => serde_json::from_str::<Value>(s).unwrap_or(Value::String(s.to_string())),
87 None => json.get("payload").cloned().unwrap_or(Value::Null),
88 };
89 let pubsub_msg = PubSubMessage {
90 topic: topic.to_string(),
91 data,
92 priority: None,
93 headers: None,
94 };
95 if tx.send(pubsub_msg).is_err() {
96 break; // downstream receiver dropped
97 }
98 }
99 }
100 None => {
101 tracing::debug!("PubSub RPC push connection closed");
102 break;
103 }
104 }
105 }
106 }
107 }
108 }
109 Err(e) => {
110 tracing::error!("SynapRPC subscribe_push failed: {}", e);
111 }
112 }
113 return;
114 }
115
116 // ── WebSocket fallback (HTTP / HTTPS transport) ───────────────────
117 let base_url = client.base_url();
118 let ws_url = match base_url.scheme() {
119 "http" => format!("ws://{}", base_url.authority()),
120 "https" => format!("wss://{}", base_url.authority()),
121 _ => {
122 tracing::error!(
123 "Unsupported URL scheme for WebSocket PubSub: {}",
124 base_url.scheme()
125 );
126 return;
127 }
128 };
129
130 let topics_query = topics_clone.join(",");
131 let ws_endpoint = format!("{}/pubsub/ws?topics={}", ws_url, topics_query);
132
133 tracing::debug!("Connecting to WebSocket: {}", ws_endpoint);
134
135 let ws_stream = match connect_async(&ws_endpoint).await {
136 Ok((stream, _)) => stream,
137 Err(e) => {
138 tracing::error!("Failed to connect WebSocket: {}", e);
139 return;
140 }
141 };
142
143 let (_write, mut read) = ws_stream.split();
144
145 loop {
146 tokio::select! {
147 _ = cancel_rx.recv() => {
148 tracing::debug!("PubSub stream cancelled");
149 break;
150 }
151 msg = read.next() => {
152 match msg {
153 Some(Ok(WsMessage::Text(text))) => {
154 match serde_json::from_str::<Value>(&text) {
155 Ok(json) => {
156 if let Some(msg_type) = json.get("type").and_then(|t| t.as_str()) {
157 match msg_type {
158 "connected" => {
159 tracing::debug!("PubSub WebSocket connected: {:?}", json);
160 }
161 "message" | "publish" => {
162 if let (Some(topic), Some(payload)) = (
163 json.get("topic").and_then(|t| t.as_str()),
164 json.get("payload")
165 ) {
166 let pubsub_msg = PubSubMessage {
167 topic: topic.to_string(),
168 data: payload.clone(),
169 priority: json.get("priority").and_then(|p| p.as_u64().map(|u| u as u8)),
170 headers: json.get("metadata").and_then(|h| serde_json::from_value(h.clone()).ok()),
171 };
172 if tx.send(pubsub_msg).is_err() {
173 break;
174 }
175 }
176 }
177 "error" => {
178 if let Some(error_msg) = json.get("error").and_then(|e| e.as_str()) {
179 tracing::error!("PubSub WebSocket error: {}", error_msg);
180 }
181 }
182 _ => {
183 tracing::debug!("Unknown WS message type: {}", msg_type);
184 }
185 }
186 } else if let Some(topic) = json.get("topic").and_then(|t| t.as_str()) {
187 if let Some(payload_or_data) = json.get("payload").or_else(|| json.get("data")) {
188 let pubsub_msg = PubSubMessage {
189 topic: topic.to_string(),
190 data: payload_or_data.clone(),
191 priority: json.get("priority").and_then(|p| p.as_u64().map(|u| u as u8)),
192 headers: json.get("metadata")
193 .or_else(|| json.get("headers"))
194 .and_then(|h| serde_json::from_value(h.clone()).ok()),
195 };
196 if tx.send(pubsub_msg).is_err() {
197 break;
198 }
199 }
200 }
201 }
202 Err(e) => {
203 tracing::warn!("Failed to parse WebSocket message: {}", e);
204 }
205 }
206 }
207 Some(Ok(WsMessage::Close(_))) => {
208 tracing::debug!("WebSocket closed by server");
209 break;
210 }
211 Some(Ok(WsMessage::Ping(_data))) => {
212 // pong handled automatically by tungstenite
213 }
214 Some(Ok(_)) => {}
215 Some(Err(e)) => {
216 tracing::error!("WebSocket error: {}", e);
217 break;
218 }
219 None => {
220 tracing::debug!("WebSocket stream ended");
221 break;
222 }
223 }
224 }
225 }
226 }
227
228 tracing::debug!("PubSub WebSocket connection closed");
229 });
230
231 let stream: MessageStream<PubSubMessage> =
232 Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
233 let handle = SubscriptionHandle::new(cancel_tx);
234
235 (stream, handle)
236 }
237
238 /// Observe messages from a single topic reactively
239 ///
240 /// Convenience method for subscribing to a single topic.
241 ///
242 /// # Example
243 /// ```no_run
244 /// use futures::StreamExt;
245 /// use synap_sdk::{SynapClient, SynapConfig};
246 ///
247 /// # #[tokio::main]
248 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
249 /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
250 /// let (mut stream, handle) = client.pubsub()
251 /// .observe_topic("subscriber-1", "user.events");
252 ///
253 /// while let Some(message) = stream.next().await {
254 /// tracing::info!("Received: {:?}", message);
255 /// }
256 ///
257 /// handle.unsubscribe();
258 /// # Ok(())
259 /// # }
260 /// ```
261 pub fn observe_topic(
262 &self,
263 subscriber_id: impl Into<String>,
264 topic: impl Into<String>,
265 ) -> (
266 impl Stream<Item = PubSubMessage> + 'static,
267 SubscriptionHandle,
268 ) {
269 self.observe(subscriber_id, vec![topic.into()])
270 }
271}
272
#[cfg(test)]
mod tests {
    use crate::SynapConfig;

    /// Builds a client pointing at the local default endpoint used by these tests.
    fn local_client() -> crate::SynapClient {
        crate::SynapClient::new(SynapConfig::new("http://localhost:15500")).unwrap()
    }

    /// Smoke test: `observe` can be called with a topic list and hands back a
    /// stream plus a handle.
    #[tokio::test]
    async fn test_pubsub_reactive_creation() {
        let client = local_client();
        let manager = client.pubsub();

        // The call spawns the background task; the stream is never polled, so the
        // test does not depend on a message ever being delivered.
        let topics = vec!["test.topic".to_string()];
        let (_stream, subscription) = manager.observe("test-sub", topics);

        // Cancel right away so the background task is asked to stop.
        subscription.unsubscribe();
    }

    /// Smoke test: the single-topic convenience wrapper is callable as well.
    #[tokio::test]
    async fn test_pubsub_reactive_single_topic() {
        let client = local_client();
        let manager = client.pubsub();

        // Same as above: only the construction path is exercised.
        let (_stream, subscription) = manager.observe_topic("test-sub", "test.topic");

        // Cancel right away so the background task is asked to stop.
        subscription.unsubscribe();
    }
}