Skip to main content

ckb_rpc/module/
subscription.rs

1use async_trait::async_trait;
2use broadcast::error::RecvError;
3use ckb_async_runtime::Handle;
4use ckb_jsonrpc_types::Topic;
5use ckb_logger::error;
6use ckb_notify::NotifyController;
7use ckb_notify::{LogEntry, NOTIFY_CHANNEL_SIZE};
8use ckb_stop_handler::new_tokio_exit_rx;
9use futures_util::{Stream, stream::BoxStream};
10use jsonrpc_core::Result;
11use jsonrpc_utils::{pub_sub::PublishMsg, rpc};
12use tokio::sync::broadcast;
13
14/// RPC Module Subscription that CKB node will push new messages to subscribers, support with WebSocket or TCP.
15///
16/// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of
17/// TCP (enable with `rpc.tcp_listen_address` configuration option) and WebSocket (enable with
18/// `rpc.ws_listen_address`).
19///
20/// ###### Examples
21///
22/// TCP RPC subscription:
23///
24/// ```bash
25/// telnet localhost 18114
26/// > {"id": 2, "jsonrpc": "2.0", "method": "subscribe", "params": ["new_tip_header"]}
27/// < {"jsonrpc":"2.0","result":"0x0","id":2}
28/// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
29///"subscription":0}}
30/// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
31///"subscription":0}}
32/// < ...
33/// > {"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}
34/// < {"jsonrpc":"2.0","result":true,"id":2}
35/// ```
36///
37/// WebSocket RPC subscription:
38///
39/// ```javascript
40/// let socket = new WebSocket("ws://localhost:28114")
41///
42/// socket.onmessage = function(event) {
43///   console.log(`Data received from server: ${event.data}`);
44/// }
45///
46/// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "subscribe", "params": ["new_tip_header"]}`)
47///
48/// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}`)
49/// ```
50#[allow(clippy::needless_return)]
51#[rpc(openrpc)]
52#[async_trait]
53pub trait SubscriptionRpc {
54    /// Context to implement the subscription RPC.
55    ///
56    /// The stream of subscription messages.
57    type S: Stream<Item = PublishMsg<String>> + Send + 'static;
58    /// #### Method `subscribe`
59    /// Subscribes to a topic.
60    ///
61    /// ###### Params
62    ///
63    /// * `topic` - Subscription topic (enum: new_tip_header | new_tip_block | new_transaction | proposed_transaction | rejected_transaction | logs)
64    ///
65    /// ###### Returns
66    ///
67    /// This RPC returns the subscription ID as the result. CKB node will push messages in the subscribed
68    /// topics to the current RPC connection. The subscript ID is also attached as
69    /// `params.subscription` in the push messages.
70    ///
71    /// Example push message:
72    ///
73    /// ```json+skip
74    /// {
75    ///   "jsonrpc": "2.0",
76    ///   "method": "subscribe",
77    ///   "params": {
78    ///     "result": { ... },
79    ///     "subscription": "0x2a"
80    ///   }
81    /// }
82    /// ```
83    ///
84    /// ##### Topics
85    ///
86    /// ###### `new_tip_header`
87    ///
88    /// Whenever there's a block that is appended to the canonical chain, the CKB node will publish the
89    /// block header to subscribers.
90    ///
91    /// The type of the `params.result` in the push message is [`HeaderView`](../../ckb_jsonrpc_types/struct.HeaderView.html).
92    ///
93    /// ###### `new_tip_block`
94    ///
95    /// Whenever there's a block that is appended to the canonical chain, the CKB node will publish the
96    /// whole block to subscribers.
97    ///
98    /// The type of the `params.result` in the push message is [`BlockView`](../../ckb_jsonrpc_types/struct.BlockView.html).
99    ///
100    /// ###### `new_transaction`
101    ///
102    /// Subscribers will get notified when a new transaction is submitted to the pool.
103    ///
104    /// The type of the `params.result` in the push message is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html).
105    ///
106    /// ###### `proposed_transaction`
107    ///
108    /// Subscribers will get notified when an in-pool transaction is proposed by chain.
109    ///
110    /// The type of the `params.result` in the push message is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html).
111    ///
112    /// ###### `rejected_transaction`
113    ///
114    /// Subscribers will get notified when a pending transaction is rejected by tx-pool.
115    ///
116    /// The type of the `params.result` in the push message is an array contain:
117    ///
118    /// The type of the `params.result` in the push message is a two-elements array, where
119    ///
120    /// -   the first item type is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html), and
121    /// -   the second item type is [`PoolTransactionReject`](../../ckb_jsonrpc_types/struct.PoolTransactionReject.html).
122    ///
123    /// ###### `log`
124    ///
125    /// Subscribers will get notified when a new log message is generated.
126    ///
127    /// The type of the `params.result` in the push message is [`LogEntry`](../../ckb_jsonrpc_types/struct.LogEntry.html).
128    ///
129    /// ###### Examples
130    ///
131    /// Subscribe Request
132    ///
133    /// ```json
134    /// {
135    ///   "id": 42,
136    ///   "jsonrpc": "2.0",
137    ///   "method": "subscribe",
138    ///   "params": [
139    ///     "new_tip_header"
140    ///   ]
141    /// }
142    /// ```
143    ///
144    /// Subscribe Response
145    ///
146    /// ```json
147    /// {
148    ///   "id": 42,
149    ///   "jsonrpc": "2.0",
150    ///   "result": "0xf3"
151    /// }
152    /// ```
153    ///
154    /// #### Method `unsubscribe`
155    /// * `unsubscribe(id)`
156    ///     * `id`: `string`
157    /// * result: `boolean`
158    ///
159    /// Unsubscribes from a subscribed topic.
160    ///
161    /// ###### Params
162    /// *   `id` - Subscription ID
163    ///
164    /// ###### Examples
165    ///
166    /// Unsubscribe Request
167    ///
168    /// ```json
169    /// {
170    ///   "id": 42,
171    ///   "jsonrpc": "2.0",
172    ///   "method": "unsubscribe",
173    ///   "params": [
174    ///     "0xf3"
175    ///   ]
176    /// }
177    /// ```
178    ///
179    /// Unsubscribe Response
180    ///
181    /// ```json
182    /// {
183    ///  "id": 42,
184    ///  "jsonrpc": "2.0",
185    ///  "result": true
186    /// }
187    /// ```
188    ///
189    #[rpc(pub_sub(notify = "subscribe", unsubscribe = "unsubscribe"))]
190    fn subscribe(&self, topic: Topic) -> Result<Self::S>;
191}
192
193#[derive(Clone)]
194pub struct SubscriptionRpcImpl {
195    pub new_tip_header_sender: broadcast::Sender<PublishMsg<String>>,
196    pub new_tip_block_sender: broadcast::Sender<PublishMsg<String>>,
197    pub new_transaction_sender: broadcast::Sender<PublishMsg<String>>,
198    pub proposed_transaction_sender: broadcast::Sender<PublishMsg<String>>,
199    pub new_reject_transaction_sender: broadcast::Sender<PublishMsg<String>>,
200    pub log_sender: broadcast::Sender<PublishMsg<String>>,
201}
202
/// Converts `$info` into the JSON-RPC type `$ty`, serializes it to a JSON string and
/// broadcasts that string through `$sender` as a `PublishMsg` result.
///
/// The outcome of `send` is deliberately discarded: per the tokio documentation a
/// broadcast `send` only fails when there is currently no receiver, i.e. nobody is
/// subscribed to the topic.
macro_rules! publiser_send {
    ($ty:ty, $info:expr, $sender:ident) => {{
        let payload: $ty = $info.into();
        let encoded = serde_json::to_string(&payload).expect("serialization should be ok");
        let _ = $sender.send(PublishMsg::result(&encoded));
    }};
}
210
211#[async_trait]
212impl SubscriptionRpc for SubscriptionRpcImpl {
213    type S = BoxStream<'static, PublishMsg<String>>;
214    fn subscribe(&self, topic: Topic) -> Result<Self::S> {
215        let tx = match topic {
216            Topic::NewTipHeader => self.new_tip_header_sender.clone(),
217            Topic::NewTipBlock => self.new_tip_block_sender.clone(),
218            Topic::NewTransaction => self.new_transaction_sender.clone(),
219            Topic::ProposedTransaction => self.proposed_transaction_sender.clone(),
220            Topic::RejectedTransaction => self.new_reject_transaction_sender.clone(),
221            Topic::Log => self.log_sender.clone(),
222        };
223        let mut rx = tx.subscribe();
224        Ok(Box::pin(async_stream::stream! {
225                loop {
226                    match rx.recv().await {
227                        Ok(msg) => {
228                            yield msg;
229                        }
230                        Err(RecvError::Lagged(cnt)) => {
231                            error!("subscription lagged error: {:?}", cnt);
232                        }
233                        Err(RecvError::Closed) => {
234                            break;
235                        }
236                    }
237                }
238        }))
239    }
240}
241
242fn convert_log_entry(entry: LogEntry) -> ckb_jsonrpc_types::LogEntry {
243    use ckb_logger::Level;
244    let level = match entry.level {
245        Level::Error => ckb_jsonrpc_types::LogLevel::Error,
246        Level::Warn => ckb_jsonrpc_types::LogLevel::Warn,
247        Level::Info => ckb_jsonrpc_types::LogLevel::Info,
248        Level::Debug => ckb_jsonrpc_types::LogLevel::Debug,
249        Level::Trace => ckb_jsonrpc_types::LogLevel::Trace,
250    };
251    ckb_jsonrpc_types::LogEntry {
252        message: entry.message,
253        level,
254        date: entry.date,
255        target: entry.target,
256    }
257}
258
259impl SubscriptionRpcImpl {
260    pub fn new(notify_controller: NotifyController, handle: Handle) -> Self {
261        const SUBSCRIBER_NAME: &str = "TcpSubscription";
262
263        let mut new_block_receiver =
264            handle.block_on(notify_controller.subscribe_new_block(SUBSCRIBER_NAME.to_string()));
265        let mut new_transaction_receiver = handle
266            .block_on(notify_controller.subscribe_new_transaction(SUBSCRIBER_NAME.to_string()));
267        let mut proposed_transaction_receiver = handle.block_on(
268            notify_controller.subscribe_proposed_transaction(SUBSCRIBER_NAME.to_string()),
269        );
270        let mut reject_transaction_receiver = handle
271            .block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string()));
272        let mut log_receiver =
273            handle.block_on(notify_controller.subscribe_log(SUBSCRIBER_NAME.to_string()));
274
275        let (new_tip_header_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
276        let (new_tip_block_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
277        let (proposed_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
278        let (new_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
279        let (new_reject_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
280        let (log_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
281
282        let stop_rx = new_tokio_exit_rx();
283        handle.spawn({
284            let new_tip_header_sender = new_tip_header_sender.clone();
285            let new_tip_block_sender = new_tip_block_sender.clone();
286            let new_transaction_sender = new_transaction_sender.clone();
287            let proposed_transaction_sender = proposed_transaction_sender.clone();
288            let new_reject_transaction_sender = new_reject_transaction_sender.clone();
289            let log_sender = log_sender.clone();
290            async move {
291                loop {
292                    tokio::select! {
293                        Some(block) = new_block_receiver.recv() => {
294                            publiser_send!(ckb_jsonrpc_types::HeaderView, block.header(), new_tip_header_sender);
295                            publiser_send!(ckb_jsonrpc_types::BlockView, block, new_tip_block_sender);
296                        },
297                        Some(tx_entry) = new_transaction_receiver.recv() => {
298                            publiser_send!(ckb_jsonrpc_types::PoolTransactionEntry, tx_entry, new_transaction_sender);
299                        },
300                        Some(tx_entry) = proposed_transaction_receiver.recv() => {
301                            publiser_send!(ckb_jsonrpc_types::PoolTransactionEntry, tx_entry, proposed_transaction_sender);
302                        },
303                        Some((tx_entry, reject)) = reject_transaction_receiver.recv() => {
304                            publiser_send!((ckb_jsonrpc_types::PoolTransactionEntry, ckb_jsonrpc_types::PoolTransactionReject),
305                                            (tx_entry.into(), reject.into()),
306                                            new_reject_transaction_sender);
307                        },
308                        Some(log_entry) = log_receiver.recv() => {
309                            publiser_send!(ckb_jsonrpc_types::LogEntry, convert_log_entry(log_entry), log_sender);
310                        },
311                        _ = stop_rx.cancelled() => {
312                            break;
313                        },
314                        else => {
315                            error!("SubscriptionRpcImpl tokio::select! unexpected error");
316                            break;
317                        }
318                    }
319                }
320            }
321        });
322
323        Self {
324            new_tip_header_sender,
325            new_tip_block_sender,
326            new_transaction_sender,
327            proposed_transaction_sender,
328            new_reject_transaction_sender,
329            log_sender,
330        }
331    }
332}