ckb_rpc/module/subscription.rs
1use async_trait::async_trait;
2use broadcast::error::RecvError;
3use ckb_async_runtime::Handle;
4use ckb_jsonrpc_types::Topic;
5use ckb_logger::error;
6use ckb_notify::NotifyController;
7use ckb_notify::{LogEntry, NOTIFY_CHANNEL_SIZE};
8use ckb_stop_handler::new_tokio_exit_rx;
9use futures_util::{Stream, stream::BoxStream};
10use jsonrpc_core::Result;
11use jsonrpc_utils::{pub_sub::PublishMsg, rpc};
12use tokio::sync::broadcast;
13
/// RPC Module Subscription: the CKB node pushes new messages to subscribers over WebSocket or TCP
/// connections.
///
/// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of
/// TCP (enable with `rpc.tcp_listen_address` configuration option) and WebSocket (enable with
/// `rpc.ws_listen_address`).
///
/// ###### Examples
///
/// TCP RPC subscription:
///
/// ```bash
/// telnet localhost 18114
/// > {"id": 2, "jsonrpc": "2.0", "method": "subscribe", "params": ["new_tip_header"]}
/// < {"jsonrpc":"2.0","result":"0x0","id":2}
/// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
///"subscription":0}}
/// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
///"subscription":0}}
/// < ...
/// > {"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}
/// < {"jsonrpc":"2.0","result":true,"id":2}
/// ```
///
/// WebSocket RPC subscription:
///
/// ```javascript
/// let socket = new WebSocket("ws://localhost:28114")
///
/// socket.onmessage = function(event) {
///   console.log(`Data received from server: ${event.data}`);
/// }
///
/// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "subscribe", "params": ["new_tip_header"]}`)
///
/// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}`)
/// ```
// NOTE(review): presumably this silences a lint raised by code the `rpc` macro generates, since
// nothing in the trait itself contains a `return` — confirm before removing.
#[allow(clippy::needless_return)]
#[rpc(openrpc)]
#[async_trait]
pub trait SubscriptionRpc {
    /// Stream type returned by `subscribe`.
    ///
    /// Each item is one message published to the subscriber.
    type S: Stream<Item = PublishMsg<String>> + Send + 'static;
    /// #### Method `subscribe`
    /// Subscribes to a topic.
    ///
    /// ###### Params
    ///
    /// * `topic` - Subscription topic (enum: new_tip_header | new_tip_block | new_transaction | proposed_transaction | rejected_transaction | logs)
    ///
    /// ###### Returns
    ///
    /// This RPC returns the subscription ID as the result. CKB node will push messages in the subscribed
    /// topics to the current RPC connection. The subscription ID is also attached as
    /// `params.subscription` in the push messages.
    ///
    /// Example push message:
    ///
    /// ```json+skip
    /// {
    ///   "jsonrpc": "2.0",
    ///   "method": "subscribe",
    ///   "params": {
    ///     "result": { ... },
    ///     "subscription": "0x2a"
    ///   }
    /// }
    /// ```
    ///
    /// ##### Topics
    ///
    /// ###### `new_tip_header`
    ///
    /// Whenever there's a block that is appended to the canonical chain, the CKB node will publish the
    /// block header to subscribers.
    ///
    /// The type of the `params.result` in the push message is [`HeaderView`](../../ckb_jsonrpc_types/struct.HeaderView.html).
    ///
    /// ###### `new_tip_block`
    ///
    /// Whenever there's a block that is appended to the canonical chain, the CKB node will publish the
    /// whole block to subscribers.
    ///
    /// The type of the `params.result` in the push message is [`BlockView`](../../ckb_jsonrpc_types/struct.BlockView.html).
    ///
    /// ###### `new_transaction`
    ///
    /// Subscribers will get notified when a new transaction is submitted to the pool.
    ///
    /// The type of the `params.result` in the push message is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html).
    ///
    /// ###### `proposed_transaction`
    ///
    /// Subscribers will get notified when an in-pool transaction is proposed by chain.
    ///
    /// The type of the `params.result` in the push message is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html).
    ///
    /// ###### `rejected_transaction`
    ///
    /// Subscribers will get notified when a pending transaction is rejected by tx-pool.
    ///
    /// The type of the `params.result` in the push message is a two-element array, where
    ///
    /// - the first item type is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html), and
    /// - the second item type is [`PoolTransactionReject`](../../ckb_jsonrpc_types/struct.PoolTransactionReject.html).
    ///
    /// ###### `log`
    ///
    /// Subscribers will get notified when a new log message is generated.
    ///
    /// The type of the `params.result` in the push message is [`LogEntry`](../../ckb_jsonrpc_types/struct.LogEntry.html).
    ///
    /// ###### Examples
    ///
    /// Subscribe Request
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "method": "subscribe",
    ///   "params": [
    ///     "new_tip_header"
    ///   ]
    /// }
    /// ```
    ///
    /// Subscribe Response
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "result": "0xf3"
    /// }
    /// ```
    ///
    /// #### Method `unsubscribe`
    /// * `unsubscribe(id)`
    ///     * `id`: `string`
    /// * result: `boolean`
    ///
    /// Unsubscribes from a subscribed topic.
    ///
    /// ###### Params
    /// * `id` - Subscription ID
    ///
    /// ###### Examples
    ///
    /// Unsubscribe Request
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "method": "unsubscribe",
    ///   "params": [
    ///     "0xf3"
    ///   ]
    /// }
    /// ```
    ///
    /// Unsubscribe Response
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "result": true
    /// }
    /// ```
    ///
    // NOTE(review): the `topic` parameter list above names `logs`, while the topic section is
    // headed `log` and the enum variant is `Topic::Log` — confirm the wire name and align them.
    #[rpc(pub_sub(notify = "subscribe", unsubscribe = "unsubscribe"))]
    fn subscribe(&self, topic: Topic) -> Result<Self::S>;
}
192
193#[derive(Clone)]
194pub struct SubscriptionRpcImpl {
195 pub new_tip_header_sender: broadcast::Sender<PublishMsg<String>>,
196 pub new_tip_block_sender: broadcast::Sender<PublishMsg<String>>,
197 pub new_transaction_sender: broadcast::Sender<PublishMsg<String>>,
198 pub proposed_transaction_sender: broadcast::Sender<PublishMsg<String>>,
199 pub new_reject_transaction_sender: broadcast::Sender<PublishMsg<String>>,
200 pub log_sender: broadcast::Sender<PublishMsg<String>>,
201}
202
/// Converts `$info` into the JSON-RPC type `$ty`, serializes it to a JSON string and broadcasts
/// that string on `$sender`.
///
/// The whole conversion is skipped when `$sender` currently has no receivers: a broadcast
/// `send` without receivers only returns an error and the message is discarded, so building
/// the JSON would be wasted work (this runs for every block, transaction and log entry).
///
/// Panics if `$ty` fails to serialize, which is treated as a broken invariant.
///
/// The (misspelled) macro name is kept because the call sites in this file use it.
macro_rules! publiser_send {
    ($ty:ty, $info:expr, $sender:ident) => {{
        if $sender.receiver_count() > 0 {
            let msg: $ty = $info.into();
            let json_string = serde_json::to_string(&msg).expect("serialization should be ok");
            // A failed send only means the last receiver went away in the meantime; there is
            // nobody left to notify, so the error is deliberately ignored.
            let _ = $sender.send(PublishMsg::result(&json_string));
        }
    }};
}
210
211#[async_trait]
212impl SubscriptionRpc for SubscriptionRpcImpl {
213 type S = BoxStream<'static, PublishMsg<String>>;
214 fn subscribe(&self, topic: Topic) -> Result<Self::S> {
215 let tx = match topic {
216 Topic::NewTipHeader => self.new_tip_header_sender.clone(),
217 Topic::NewTipBlock => self.new_tip_block_sender.clone(),
218 Topic::NewTransaction => self.new_transaction_sender.clone(),
219 Topic::ProposedTransaction => self.proposed_transaction_sender.clone(),
220 Topic::RejectedTransaction => self.new_reject_transaction_sender.clone(),
221 Topic::Log => self.log_sender.clone(),
222 };
223 let mut rx = tx.subscribe();
224 Ok(Box::pin(async_stream::stream! {
225 loop {
226 match rx.recv().await {
227 Ok(msg) => {
228 yield msg;
229 }
230 Err(RecvError::Lagged(cnt)) => {
231 error!("subscription lagged error: {:?}", cnt);
232 }
233 Err(RecvError::Closed) => {
234 break;
235 }
236 }
237 }
238 }))
239 }
240}
241
242fn convert_log_entry(entry: LogEntry) -> ckb_jsonrpc_types::LogEntry {
243 use ckb_logger::Level;
244 let level = match entry.level {
245 Level::Error => ckb_jsonrpc_types::LogLevel::Error,
246 Level::Warn => ckb_jsonrpc_types::LogLevel::Warn,
247 Level::Info => ckb_jsonrpc_types::LogLevel::Info,
248 Level::Debug => ckb_jsonrpc_types::LogLevel::Debug,
249 Level::Trace => ckb_jsonrpc_types::LogLevel::Trace,
250 };
251 ckb_jsonrpc_types::LogEntry {
252 message: entry.message,
253 level,
254 date: entry.date,
255 target: entry.target,
256 }
257}
258
259impl SubscriptionRpcImpl {
260 pub fn new(notify_controller: NotifyController, handle: Handle) -> Self {
261 const SUBSCRIBER_NAME: &str = "TcpSubscription";
262
263 let mut new_block_receiver =
264 handle.block_on(notify_controller.subscribe_new_block(SUBSCRIBER_NAME.to_string()));
265 let mut new_transaction_receiver = handle
266 .block_on(notify_controller.subscribe_new_transaction(SUBSCRIBER_NAME.to_string()));
267 let mut proposed_transaction_receiver = handle.block_on(
268 notify_controller.subscribe_proposed_transaction(SUBSCRIBER_NAME.to_string()),
269 );
270 let mut reject_transaction_receiver = handle
271 .block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string()));
272 let mut log_receiver =
273 handle.block_on(notify_controller.subscribe_log(SUBSCRIBER_NAME.to_string()));
274
275 let (new_tip_header_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
276 let (new_tip_block_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
277 let (proposed_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
278 let (new_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
279 let (new_reject_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
280 let (log_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
281
282 let stop_rx = new_tokio_exit_rx();
283 handle.spawn({
284 let new_tip_header_sender = new_tip_header_sender.clone();
285 let new_tip_block_sender = new_tip_block_sender.clone();
286 let new_transaction_sender = new_transaction_sender.clone();
287 let proposed_transaction_sender = proposed_transaction_sender.clone();
288 let new_reject_transaction_sender = new_reject_transaction_sender.clone();
289 let log_sender = log_sender.clone();
290 async move {
291 loop {
292 tokio::select! {
293 Some(block) = new_block_receiver.recv() => {
294 publiser_send!(ckb_jsonrpc_types::HeaderView, block.header(), new_tip_header_sender);
295 publiser_send!(ckb_jsonrpc_types::BlockView, block, new_tip_block_sender);
296 },
297 Some(tx_entry) = new_transaction_receiver.recv() => {
298 publiser_send!(ckb_jsonrpc_types::PoolTransactionEntry, tx_entry, new_transaction_sender);
299 },
300 Some(tx_entry) = proposed_transaction_receiver.recv() => {
301 publiser_send!(ckb_jsonrpc_types::PoolTransactionEntry, tx_entry, proposed_transaction_sender);
302 },
303 Some((tx_entry, reject)) = reject_transaction_receiver.recv() => {
304 publiser_send!((ckb_jsonrpc_types::PoolTransactionEntry, ckb_jsonrpc_types::PoolTransactionReject),
305 (tx_entry.into(), reject.into()),
306 new_reject_transaction_sender);
307 },
308 Some(log_entry) = log_receiver.recv() => {
309 publiser_send!(ckb_jsonrpc_types::LogEntry, convert_log_entry(log_entry), log_sender);
310 },
311 _ = stop_rx.cancelled() => {
312 break;
313 },
314 else => {
315 error!("SubscriptionRpcImpl tokio::select! unexpected error");
316 break;
317 }
318 }
319 }
320 }
321 });
322
323 Self {
324 new_tip_header_sender,
325 new_tip_block_sender,
326 new_transaction_sender,
327 proposed_transaction_sender,
328 new_reject_transaction_sender,
329 log_sender,
330 }
331 }
332}