// ckb-rpc 0.107.0
//
// CKB RPC server.
// Documentation
use ckb_jsonrpc_types::Topic;
use ckb_notify::NotifyController;
use jsonrpc_core::{Metadata, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{
    typed::{Sink, Subscriber},
    PubSubMetadata, Session, SubscriptionId,
};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    RwLock,
};
use tokio::runtime::Handle;

/// Name under which this module registers itself with the `NotifyController`
/// when subscribing to block and transaction notifications.
const SUBSCRIBER_NAME: &str = "TcpSubscription";

/// Metadata attached to subscription RPC calls.
///
/// Wraps the `jsonrpc_pubsub` [`Session`] together with the set of subscription IDs
/// that were created through it, so that an explicit `unsubscribe` call can check
/// that the ID really belongs to the calling session.
#[derive(Clone, Debug)]
pub struct SubscriptionSession {
    /// IDs of the subscriptions opened through this session. `subscribe` inserts into
    /// this set and an explicit `unsubscribe` removes from it.
    pub(crate) subscription_ids: Arc<RwLock<HashSet<SubscriptionId>>>,
    /// The underlying pub/sub session, handed out via `PubSubMetadata::session`.
    pub(crate) session: Arc<Session>,
}

impl SubscriptionSession {
    /// Wraps `session` in fresh metadata that has no subscriptions recorded yet.
    pub fn new(session: Session) -> Self {
        let subscription_ids = Arc::new(RwLock::new(HashSet::new()));
        let session = Arc::new(session);
        Self {
            subscription_ids,
            session,
        }
    }
}

// Marker impl: lets `SubscriptionSession` be used as `jsonrpc_core` request metadata.
impl Metadata for SubscriptionSession {}

impl PubSubMetadata for SubscriptionSession {
    /// Hands out another shared handle to the session wrapped by this metadata.
    ///
    /// Always returns `Some`, because a `SubscriptionSession` cannot exist without a session.
    fn session(&self) -> Option<Arc<Session>> {
        let shared = self.session.clone();
        Some(shared)
    }
}

/// RPC Module Subscription that CKB node will push new messages to subscribers.
///
/// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of
/// TCP (enable with rpc.tcp_listen_address configuration option) and WebSocket (enable with
/// rpc.ws_listen_address).
///
/// ## Examples
///
/// TCP RPC subscription:
///
/// ```text
/// telnet localhost 18114
/// > {"id": 2, "jsonrpc": "2.0", "method": "subscribe", "params": ["new_tip_header"]}
/// < {"jsonrpc":"2.0","result":"0x0","id":2}
/// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
///   "subscription":"0x0"}}
/// < {"jsonrpc":"2.0","method":"subscribe","params":{"result":"...block header json...",
///   "subscription":"0x0"}}
/// < ...
/// > {"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}
/// < {"jsonrpc":"2.0","result":true,"id":2}
/// ```
///
/// WebSocket RPC subscription:
///
/// ```javascript
/// let socket = new WebSocket("ws://localhost:28114")
///
/// socket.onmessage = function(event) {
///   console.log(`Data received from server: ${event.data}`);
/// }
///
/// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "subscribe", "params": ["new_tip_header"]}`)
///
/// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}`)
/// ```
#[allow(clippy::needless_return)]
#[rpc(server)]
pub trait SubscriptionRpc {
    /// Context to implement the subscription RPC.
    type Metadata;

    /// Subscribes to a topic.
    ///
    /// ## Params
    ///
    /// * `topic` - Subscription topic (enum: new_tip_header | new_tip_block | new_transaction | proposed_transaction | rejected_transaction)
    ///
    /// ## Returns
    ///
    /// This RPC returns the subscription ID as the result. CKB node will push messages in the subscribed
    /// topics to the current RPC connection. The subscription ID is also attached as
    /// `params.subscription` in the push messages.
    ///
    /// Example push message:
    ///
    /// ```json+skip
    /// {
    ///   "jsonrpc": "2.0",
    ///   "method": "subscribe",
    ///   "params": {
    ///     "result": { ... },
    ///     "subscription": "0x2a"
    ///   }
    /// }
    /// ```
    ///
    /// ## Topics
    ///
    /// ### `new_tip_header`
    ///
    /// Whenever there's a block that is appended to the canonical chain, the CKB node will publish the
    /// block header to subscribers.
    ///
    /// The type of the `params.result` in the push message is [`HeaderView`](../../ckb_jsonrpc_types/struct.HeaderView.html).
    ///
    /// ### `new_tip_block`
    ///
    /// Whenever there's a block that is appended to the canonical chain, the CKB node will publish the
    /// whole block to subscribers.
    ///
    /// The type of the `params.result` in the push message is [`BlockView`](../../ckb_jsonrpc_types/struct.BlockView.html).
    ///
    /// ### `new_transaction`
    ///
    /// Subscribers will get notified when a new transaction is submitted to the pool.
    ///
    /// The type of the `params.result` in the push message is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html).
    ///
    /// ### `proposed_transaction`
    ///
    /// Subscribers will get notified when an in-pool transaction is proposed by the chain.
    ///
    /// The type of the `params.result` in the push message is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html).
    ///
    /// ### `rejected_transaction`
    ///
    /// Subscribers will get notified when a pending transaction is rejected by the tx-pool.
    ///
    /// The type of the `params.result` in the push message is a two-element array, where
    ///
    /// -   the first item type is [`PoolTransactionEntry`](../../ckb_jsonrpc_types/struct.PoolTransactionEntry.html), and
    /// -   the second item type is [`PoolTransactionReject`](../../ckb_jsonrpc_types/struct.PoolTransactionReject.html).
    ///
    /// ## Examples
    ///
    /// Request
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "method": "subscribe",
    ///   "params": [
    ///     "new_tip_header"
    ///   ]
    /// }
    /// ```
    ///
    /// Response
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "result": "0x2a"
    /// }
    /// ```
    #[pubsub(subscription = "subscribe", subscribe, name = "subscribe")]
    fn subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<String>, topic: Topic);

    /// Unsubscribes from a subscribed topic.
    ///
    /// ## Params
    ///
    /// * `id` - Subscription ID
    ///
    /// ## Examples
    ///
    /// Request
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "method": "unsubscribe",
    ///   "params": [
    ///     "0x2a"
    ///   ]
    /// }
    /// ```
    ///
    /// Response
    ///
    /// ```json
    /// {
    ///   "id": 42,
    ///   "jsonrpc": "2.0",
    ///   "result": true
    /// }
    /// ```
    #[pubsub(subscription = "subscribe", unsubscribe, name = "unsubscribe")]
    fn unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
}

/// Sinks of all subscribers of a single topic, keyed by their subscription ID.
type Subscribers = HashMap<SubscriptionId, Sink<String>>;

/// Implementation of [`SubscriptionRpc`] that keeps track of the active subscriptions.
#[derive(Default)]
pub struct SubscriptionRpcImpl {
    /// Monotonic counter; each new subscription ID is the next counter value
    /// formatted as a `0x`-prefixed hexadecimal string.
    pub(crate) id_generator: AtomicUsize,
    /// Active subscriptions grouped by topic. Shared with the background task spawned in
    /// `SubscriptionRpcImpl::new`, which reads it to push notifications.
    pub(crate) subscribers: Arc<RwLock<HashMap<Topic, Subscribers>>>,
}

impl SubscriptionRpc for SubscriptionRpcImpl {
    type Metadata = Option<SubscriptionSession>;

    /// Registers `subscriber` for `topic`.
    ///
    /// Nothing is registered when the call carries no session metadata or when the
    /// freshly generated ID cannot be assigned to the subscriber. On success the sink is
    /// stored under the topic and the ID is recorded on the session.
    fn subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<String>, topic: Topic) {
        let session = match meta {
            Some(session) => session,
            None => return,
        };

        // IDs are the hex rendering of a process-wide counter, e.g. "0x2a".
        let sequence = self.id_generator.fetch_add(1, Ordering::SeqCst);
        let id = SubscriptionId::String(format!("{:#x}", sequence));

        let sink = match subscriber.assign_id(id.clone()) {
            Ok(sink) => sink,
            Err(_) => return,
        };

        // The topic map stays write-locked until the session has recorded the ID as well.
        let mut topics = self
            .subscribers
            .write()
            .expect("acquiring subscribers write lock");
        topics.entry(topic).or_default().insert(id.clone(), sink);

        let mut session_ids = session
            .subscription_ids
            .write()
            .expect("acquiring subscription_ids write lock");
        session_ids.insert(id);
    }

    /// Removes the subscription `id`.
    ///
    /// When called explicitly with session metadata, the ID must belong to that session;
    /// the result tells whether a sink was actually removed. Without session metadata
    /// (closed or dropped connection) the ID is purged from every topic and `true` is
    /// returned unconditionally.
    fn unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
        let mut topics = self
            .subscribers
            .write()
            .expect("acquiring subscribers write lock");

        // unsubscribe handler method is explicitly called.
        if let Some(Some(session)) = meta {
            let owned_by_session = session
                .subscription_ids
                .write()
                .expect("acquiring subscription_ids write lock")
                .remove(&id);
            let removed = owned_by_session
                && topics
                    .values_mut()
                    .any(|sinks| sinks.remove(&id).is_some());
            return Ok(removed);
        }

        // closed or dropped connection
        for sinks in topics.values_mut() {
            sinks.remove(&id);
        }
        Ok(true)
    }
}

impl SubscriptionRpcImpl {
    /// Creates the subscription RPC handler and starts the task that pushes notifications.
    ///
    /// The constructor registers with `notify_controller` (under `SUBSCRIBER_NAME`) for new
    /// blocks, new transactions, proposed transactions and rejected transactions. It then
    /// spawns a task on `handle` that serializes every received event to a JSON string and
    /// forwards it to all sinks subscribed to the matching [`Topic`]:
    ///
    /// * new block -> `NewTipHeader` (header only) and `NewTipBlock` (whole block)
    /// * new transaction -> `NewTransaction`
    /// * proposed transaction -> `ProposedTransaction`
    /// * rejected transaction -> `RejectedTransaction` (entry and reject reason as a pair)
    ///
    /// Topics that currently have no subscribers are skipped before any conversion or
    /// serialization takes place. Errors returned by a sink's `notify` are ignored.
    /// The task stops once the `select!` reaches its `else` branch.
    ///
    /// # Panics
    ///
    /// The registrations are awaited with `Handle::block_on`.
    /// NOTE(review): presumably this must not be called from inside an async context —
    /// confirm against the tokio documentation.
    pub fn new(notify_controller: NotifyController, handle: Handle) -> Self {
        let mut new_block_receiver =
            handle.block_on(notify_controller.subscribe_new_block(SUBSCRIBER_NAME.to_string()));
        let mut new_transaction_receiver = handle
            .block_on(notify_controller.subscribe_new_transaction(SUBSCRIBER_NAME.to_string()));
        let mut proposed_transaction_receiver = handle.block_on(
            notify_controller.subscribe_proposed_transaction(SUBSCRIBER_NAME.to_string()),
        );
        let mut reject_transaction_receiver = handle
            .block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string()));

        let subscription_rpc_impl = SubscriptionRpcImpl::default();
        let subscribers = Arc::clone(&subscription_rpc_impl.subscribers);
        handle.spawn(async move {
            loop {
                // Per-topic maps are never removed once created, so an entry may exist with no
                // sinks left in it; `filter` skips those to avoid serializing for nobody.
                tokio::select! {
                    Some(block) = new_block_receiver.recv() => {
                        let topics = subscribers.read().expect("acquiring subscribers read lock");
                        if let Some(new_tip_header_subscribers) = topics
                            .get(&Topic::NewTipHeader)
                            .filter(|sinks| !sinks.is_empty())
                        {
                            let header: ckb_jsonrpc_types::HeaderView = block.header().into();
                            let json_string = Ok(serde_json::to_string(&header).expect("serialization should be ok"));
                            for sink in new_tip_header_subscribers.values() {
                                let _ = sink.notify(json_string.clone());
                            }
                        }
                        if let Some(new_tip_block_subscribers) = topics
                            .get(&Topic::NewTipBlock)
                            .filter(|sinks| !sinks.is_empty())
                        {
                            let block: ckb_jsonrpc_types::BlockView = block.into();
                            let json_string = Ok(serde_json::to_string(&block).expect("serialization should be ok"));
                            for sink in new_tip_block_subscribers.values() {
                                let _ = sink.notify(json_string.clone());
                            }
                        }
                    },
                    Some(tx_entry) = new_transaction_receiver.recv() => {
                        let topics = subscribers.read().expect("acquiring subscribers read lock");
                        if let Some(new_transaction_subscribers) = topics
                            .get(&Topic::NewTransaction)
                            .filter(|sinks| !sinks.is_empty())
                        {
                            let entry: ckb_jsonrpc_types::PoolTransactionEntry = tx_entry.into();
                            let json_string = Ok(serde_json::to_string(&entry).expect("serialization should be ok"));
                            for sink in new_transaction_subscribers.values() {
                                let _ = sink.notify(json_string.clone());
                            }
                        }
                    },
                    Some(tx_entry) = proposed_transaction_receiver.recv() => {
                        let topics = subscribers.read().expect("acquiring subscribers read lock");
                        if let Some(proposed_transaction_subscribers) = topics
                            .get(&Topic::ProposedTransaction)
                            .filter(|sinks| !sinks.is_empty())
                        {
                            let entry: ckb_jsonrpc_types::PoolTransactionEntry = tx_entry.into();
                            let json_string = Ok(serde_json::to_string(&entry).expect("serialization should be ok"));
                            for sink in proposed_transaction_subscribers.values() {
                                let _ = sink.notify(json_string.clone());
                            }
                        }
                    },
                    Some((tx_entry, reject)) = reject_transaction_receiver.recv() => {
                        let topics = subscribers.read().expect("acquiring subscribers read lock");
                        if let Some(rejected_transaction_subscribers) = topics
                            .get(&Topic::RejectedTransaction)
                            .filter(|sinks| !sinks.is_empty())
                        {
                            let entry: ckb_jsonrpc_types::PoolTransactionEntry = tx_entry.into();
                            let reject: ckb_jsonrpc_types::PoolTransactionReject = reject.into();
                            let json_string = Ok(serde_json::to_string(&(entry, reject)).expect("serialization should be ok"));
                            for sink in rejected_transaction_subscribers.values() {
                                let _ = sink.notify(json_string.clone());
                            }
                        }
                    }
                    else => break,
                }
            }
        });

        subscription_rpc_impl
    }
}