Skip to main content

qorechain/
subscribe.rs

1//! WebSocket subscriptions to the chain RPC `/websocket` endpoint, exposing
2//! typed helpers for new-block and transaction event subscriptions over
3//! JSON-RPC, mirroring the TS / Go SDKs.
4//!
5//! The transport is `tokio-tungstenite`; the framing follows the chain RPC's
6//! JSON-RPC subscribe protocol (method `"subscribe"`, a `"query"` param, and a
7//! per-subscription string id used to correlate pushed events). The pushed-frame
8//! routing is exposed via [`dispatch_frame`] so the framing can be exercised in
9//! tests without a live socket.
10
11use crate::error::{Error, Result};
12use futures_util::{SinkExt, StreamExt};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use tokio::sync::mpsc;
17use tokio_tungstenite::tungstenite::Message as WsMessage;
18
19/// The query matching every committed block.
20pub const QUERY_NEW_BLOCK: &str = "tm.event='NewBlock'";
21/// The query matching every committed transaction.
22pub const QUERY_TX: &str = "tm.event='Tx'";
23
24/// A single pushed subscription event.
25#[derive(Debug, Clone)]
26pub struct Event {
27    /// The subscription query that produced this event.
28    pub query: String,
29    /// The raw JSON of the event's `result.data` field.
30    pub data: Value,
31    /// The full raw JSON-RPC `result` object.
32    pub result: Value,
33}
34
35type Subs = Arc<Mutex<HashMap<String, mpsc::UnboundedSender<Event>>>>;
36
37/// A WebSocket subscription client for the chain RPC.
38pub struct SubscribeClient {
39    writer: mpsc::UnboundedSender<WsMessage>,
40    subs: Subs,
41    next_id: Arc<Mutex<u64>>,
42}
43
44/// A live subscription: a stream of [`Event`]s plus an `unsubscribe` handle.
45pub struct Subscription {
46    /// The receiver yielding pushed events for this subscription.
47    pub events: mpsc::UnboundedReceiver<Event>,
48    id: String,
49    query: String,
50    writer: mpsc::UnboundedSender<WsMessage>,
51    subs: Subs,
52}
53
54impl Subscription {
55    /// Cancels the subscription: removes the local handler and sends an
56    /// `unsubscribe` frame.
57    pub fn unsubscribe(&self) -> Result<()> {
58        self.subs.lock().unwrap().remove(&self.id);
59        let req = serde_json::json!({
60            "jsonrpc": "2.0",
61            "id": format!("{}-unsub", self.id),
62            "method": "unsubscribe",
63            "params": { "query": self.query },
64        });
65        self.writer
66            .send(WsMessage::Text(req.to_string()))
67            .map_err(|e| Error::Transport(format!("send unsubscribe: {e}")))
68    }
69}
70
71impl SubscribeClient {
72    /// Connects to the chain RPC WebSocket endpoint.
73    ///
74    /// `endpoint` may be a full `ws://` / `wss://` URL, or an `http://` /
75    /// `https://` base URL (the scheme is upgraded and `/websocket` appended if
76    /// absent). A background task pumps incoming frames to the per-subscription
77    /// channels until the connection closes.
78    pub async fn connect(endpoint: &str) -> Result<Self> {
79        let ws_url = normalize_ws_url(endpoint);
80        let (stream, _) = tokio_tungstenite::connect_async(&ws_url)
81            .await
82            .map_err(|e| Error::Transport(format!("dial {ws_url}: {e}")))?;
83        let (mut sink, mut source) = stream.split();
84
85        let (write_tx, mut write_rx) = mpsc::unbounded_channel::<WsMessage>();
86        let subs: Subs = Arc::new(Mutex::new(HashMap::new()));
87
88        // Writer task: forward outbound frames to the socket.
89        tokio::spawn(async move {
90            while let Some(msg) = write_rx.recv().await {
91                if sink.send(msg).await.is_err() {
92                    break;
93                }
94            }
95        });
96
97        // Reader task: dispatch inbound frames to subscription channels.
98        let reader_subs = subs.clone();
99        tokio::spawn(async move {
100            while let Some(Ok(msg)) = source.next().await {
101                if let WsMessage::Text(text) = msg {
102                    dispatch_frame(&reader_subs, text.as_bytes());
103                }
104            }
105        });
106
107        Ok(Self {
108            writer: write_tx,
109            subs,
110            next_id: Arc::new(Mutex::new(0)),
111        })
112    }
113
114    /// Registers a subscription for `query` and sends the subscribe request,
115    /// returning the [`Subscription`] handle.
116    pub fn subscribe(&self, query: &str) -> Result<Subscription> {
117        let id = {
118            let mut n = self.next_id.lock().unwrap();
119            *n += 1;
120            n.to_string()
121        };
122        let (tx, rx) = mpsc::unbounded_channel::<Event>();
123        self.subs.lock().unwrap().insert(id.clone(), tx);
124
125        let req = serde_json::json!({
126            "jsonrpc": "2.0",
127            "id": id,
128            "method": "subscribe",
129            "params": { "query": query },
130        });
131        self.writer
132            .send(WsMessage::Text(req.to_string()))
133            .map_err(|e| {
134                self.subs.lock().unwrap().remove(&id);
135                Error::Transport(format!("send subscribe: {e}"))
136            })?;
137
138        Ok(Subscription {
139            events: rx,
140            id,
141            query: query.to_string(),
142            writer: self.writer.clone(),
143            subs: self.subs.clone(),
144        })
145    }
146
147    /// Subscribes to committed blocks ([`QUERY_NEW_BLOCK`]).
148    pub fn subscribe_new_blocks(&self) -> Result<Subscription> {
149        self.subscribe(QUERY_NEW_BLOCK)
150    }
151
152    /// Subscribes to committed transactions matching `query`. An empty `query`
153    /// subscribes to all transactions; a query without `tm.event` is AND-ed with
154    /// [`QUERY_TX`].
155    pub fn subscribe_tx(&self, query: &str) -> Result<Subscription> {
156        let q = if query.trim().is_empty() {
157            QUERY_TX.to_string()
158        } else if !query.contains("tm.event") {
159            format!("{QUERY_TX} AND {query}")
160        } else {
161            query.to_string()
162        };
163        self.subscribe(&q)
164    }
165}
166
167/// Routes a raw incoming JSON-RPC frame to its subscription channel.
168///
169/// Exposed so the framing can be exercised in tests without a live socket: a
170/// subscribe ack (empty `result.data`) is ignored; a pushed event is delivered
171/// to the channel registered under the frame's `id`.
172pub fn dispatch_frame(subs: &Subs, frame: &[u8]) {
173    let v: Value = match serde_json::from_slice(frame) {
174        Ok(v) => v,
175        Err(_) => return,
176    };
177    let result = &v["result"];
178    let data = &result["data"];
179    // The subscribe ack carries an empty result.data; skip it.
180    if data.is_null() {
181        return;
182    }
183    let id = match v["id"].as_str() {
184        Some(id) => id.to_string(),
185        None => return,
186    };
187    let tx = {
188        let guard = subs.lock().unwrap();
189        match guard.get(&id) {
190            Some(tx) => tx.clone(),
191            None => return,
192        }
193    };
194    let _ = tx.send(Event {
195        query: result["query"].as_str().unwrap_or("").to_string(),
196        data: data.clone(),
197        result: result.clone(),
198    });
199}
200
201/// Builds a fresh subscription map (used by [`dispatch_frame`] in tests).
202pub fn new_subs() -> Subs {
203    Arc::new(Mutex::new(HashMap::new()))
204}
205
206/// Registers a channel under `id` in `subs` and returns the receiver. Exposed so
207/// the framing/dispatch can be exercised in tests without a live socket.
208pub fn register_for_test(subs: &Subs, id: &str) -> mpsc::UnboundedReceiver<Event> {
209    let (tx, rx) = mpsc::unbounded_channel::<Event>();
210    subs.lock().unwrap().insert(id.to_string(), tx);
211    rx
212}
213
214/// Converts an http(s)/ws(s) endpoint into a `ws(s)://` URL ending in
215/// `/websocket`.
216fn normalize_ws_url(endpoint: &str) -> String {
217    let mut u = if let Some(rest) = endpoint.strip_prefix("https://") {
218        format!("wss://{rest}")
219    } else if let Some(rest) = endpoint.strip_prefix("http://") {
220        format!("ws://{rest}")
221    } else {
222        endpoint.to_string()
223    };
224    while u.ends_with('/') {
225        u.pop();
226    }
227    if !u.ends_with("/websocket") {
228        u.push_str("/websocket");
229    }
230    u
231}
232
233/// The shared subscription-map type, exposed so tests can register channels and
234/// drive [`dispatch_frame`] directly.
235pub type SubscriptionMap = Subs;