use async_trait::async_trait;
use broadcast::error::RecvError;
use ckb_async_runtime::Handle;
use ckb_jsonrpc_types::Topic;
use ckb_logger::error;
use ckb_notify::NotifyController;
use ckb_notify::{LogEntry, NOTIFY_CHANNEL_SIZE};
use ckb_stop_handler::new_tokio_exit_rx;
use futures_util::{Stream, stream::BoxStream};
use jsonrpc_core::Result;
use jsonrpc_utils::{pub_sub::PublishMsg, rpc};
use tokio::sync::broadcast;
/// JSON-RPC subscription interface.
///
/// Declares a single pub/sub method: the caller names a [`Topic`] and gets
/// back a stream of `PublishMsg<String>` values for it.
// NOTE(review): the `needless_return` allowance presumably silences a lint
// raised by code generated by the `#[rpc]` / `#[async_trait]` macros — confirm
// before removing it.
#[allow(clippy::needless_return)]
#[rpc(openrpc)]
#[async_trait]
pub trait SubscriptionRpc {
/// Stream type returned for one subscription. It yields `PublishMsg<String>`
/// items and must be `Send + 'static`.
type S: Stream<Item = PublishMsg<String>> + Send + 'static;
/// Opens a subscription for `topic`.
///
/// Registered as a pub/sub method with `notify = "subscribe"` and
/// `unsubscribe = "unsubscribe"`.
///
/// Returns the message stream for the topic, or a JSON-RPC error.
#[rpc(pub_sub(notify = "subscribe", unsubscribe = "unsubscribe"))]
fn subscribe(&self, topic: Topic) -> Result<Self::S>;
}
/// Implementation of [`SubscriptionRpc`] holding one broadcast sender per topic.
///
/// Every field is the sending half of a `tokio` broadcast channel that carries
/// `PublishMsg<String>` values. `subscribe` attaches a new receiver to the
/// sender that matches the requested topic.
#[derive(Clone)]
pub struct SubscriptionRpcImpl {
/// Sender used for `Topic::NewTipHeader`; receives the header of each new block.
pub new_tip_header_sender: broadcast::Sender<PublishMsg<String>>,
/// Sender used for `Topic::NewTipBlock`; receives each new block.
pub new_tip_block_sender: broadcast::Sender<PublishMsg<String>>,
/// Sender used for `Topic::NewTransaction`; receives new pool transaction entries.
pub new_transaction_sender: broadcast::Sender<PublishMsg<String>>,
/// Sender used for `Topic::ProposedTransaction`; receives proposed pool transaction entries.
pub proposed_transaction_sender: broadcast::Sender<PublishMsg<String>>,
/// Sender used for `Topic::RejectedTransaction`; receives `(entry, reject)` pairs.
pub new_reject_transaction_sender: broadcast::Sender<PublishMsg<String>>,
/// Sender used for `Topic::Log`; receives converted log entries.
pub log_sender: broadcast::Sender<PublishMsg<String>>,
}
/// Converts `$info` into the JSON-RPC type `$ty`, serializes it to a JSON
/// string and broadcasts it through `$sender` as a `PublishMsg` result.
///
/// Nothing is converted or serialized while `$sender` has no active receiver:
/// a broadcast `send` without receivers only returns an error and discards the
/// value, so the (potentially large) serialization would be wasted work.
///
/// A failing `send` (all receivers dropped in the meantime) is deliberately
/// ignored; publishing is best-effort.
///
/// # Panics
///
/// Panics if the converted value cannot be serialized to JSON.
macro_rules! publiser_send {
    ($ty:ty, $info:expr, $sender:ident) => {{
        // Skip conversion and serialization when nobody is listening.
        if $sender.receiver_count() > 0 {
            let msg: $ty = $info.into();
            let json_string = serde_json::to_string(&msg).expect("serialization should be ok");
            drop($sender.send(PublishMsg::result(&json_string)));
        }
    }};
}
#[async_trait]
impl SubscriptionRpc for SubscriptionRpcImpl {
    type S = BoxStream<'static, PublishMsg<String>>;

    /// Attaches a new receiver to the broadcast channel of `topic` and returns
    /// it as a boxed stream.
    ///
    /// The stream yields every message received from the channel. When the
    /// receiver falls behind and messages are skipped, the number of skipped
    /// messages is logged and the stream keeps going. The stream ends once
    /// the channel is closed.
    fn subscribe(&self, topic: Topic) -> Result<Self::S> {
        // `Sender::subscribe` only needs a shared reference, so borrow the
        // matching sender instead of cloning it.
        let sender = match topic {
            Topic::NewTipHeader => &self.new_tip_header_sender,
            Topic::NewTipBlock => &self.new_tip_block_sender,
            Topic::NewTransaction => &self.new_transaction_sender,
            Topic::ProposedTransaction => &self.proposed_transaction_sender,
            Topic::RejectedTransaction => &self.new_reject_transaction_sender,
            Topic::Log => &self.log_sender,
        };
        let mut rx = sender.subscribe();
        Ok(Box::pin(async_stream::stream! {
            loop {
                match rx.recv().await {
                    Ok(msg) => {
                        yield msg;
                    }
                    // The subscriber was too slow and `cnt` messages were
                    // dropped; report it and continue with the next message.
                    Err(RecvError::Lagged(cnt)) => {
                        error!("subscription lagged error: {:?}", cnt);
                    }
                    // Every sender is gone: nothing more will arrive.
                    Err(RecvError::Closed) => {
                        break;
                    }
                }
            }
        }))
    }
}
fn convert_log_entry(entry: LogEntry) -> ckb_jsonrpc_types::LogEntry {
use ckb_logger::Level;
let level = match entry.level {
Level::Error => ckb_jsonrpc_types::LogLevel::Error,
Level::Warn => ckb_jsonrpc_types::LogLevel::Warn,
Level::Info => ckb_jsonrpc_types::LogLevel::Info,
Level::Debug => ckb_jsonrpc_types::LogLevel::Debug,
Level::Trace => ckb_jsonrpc_types::LogLevel::Trace,
};
ckb_jsonrpc_types::LogEntry {
message: entry.message,
level,
date: entry.date,
target: entry.target,
}
}
impl SubscriptionRpcImpl {
    /// Builds the subscription backend and starts its forwarding task.
    ///
    /// Registers with `notify_controller` under the name `"TcpSubscription"`
    /// for new blocks, new / proposed / rejected transactions and log entries
    /// (each registration is driven to completion with `handle.block_on`),
    /// creates one broadcast channel per topic, and spawns a task on `handle`
    /// that publishes every received notification on the matching channel.
    /// A new block is published twice: its header on the tip-header channel
    /// and the block itself on the tip-block channel.
    ///
    /// The task stops when the token obtained from `new_tokio_exit_rx()` is
    /// cancelled.
    pub fn new(notify_controller: NotifyController, handle: Handle) -> Self {
        const SUBSCRIBER_NAME: &str = "TcpSubscription";

        // Notification sources, registered one after another.
        let mut block_rx =
            handle.block_on(notify_controller.subscribe_new_block(SUBSCRIBER_NAME.to_string()));
        let mut transaction_rx = handle
            .block_on(notify_controller.subscribe_new_transaction(SUBSCRIBER_NAME.to_string()));
        let mut proposed_rx = handle.block_on(
            notify_controller.subscribe_proposed_transaction(SUBSCRIBER_NAME.to_string()),
        );
        let mut rejected_rx = handle
            .block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string()));
        let mut log_rx =
            handle.block_on(notify_controller.subscribe_log(SUBSCRIBER_NAME.to_string()));

        // One broadcast channel per topic. The initial receivers are dropped
        // right away; subscribers attach their own through `subscribe`.
        let rpc = Self {
            new_tip_header_sender: broadcast::channel(NOTIFY_CHANNEL_SIZE).0,
            new_tip_block_sender: broadcast::channel(NOTIFY_CHANNEL_SIZE).0,
            proposed_transaction_sender: broadcast::channel(NOTIFY_CHANNEL_SIZE).0,
            new_transaction_sender: broadcast::channel(NOTIFY_CHANNEL_SIZE).0,
            new_reject_transaction_sender: broadcast::channel(NOTIFY_CHANNEL_SIZE).0,
            log_sender: broadcast::channel(NOTIFY_CHANNEL_SIZE).0,
        };

        let exit_token = new_tokio_exit_rx();

        // The forwarding task owns its own clone of every sender; they are
        // unpacked into plain identifiers because `publiser_send!` expects one.
        let Self {
            new_tip_header_sender,
            new_tip_block_sender,
            new_transaction_sender,
            proposed_transaction_sender,
            new_reject_transaction_sender,
            log_sender,
        } = rpc.clone();

        handle.spawn(async move {
            loop {
                tokio::select! {
                    Some(block) = block_rx.recv() => {
                        publiser_send!(ckb_jsonrpc_types::HeaderView, block.header(), new_tip_header_sender);
                        publiser_send!(ckb_jsonrpc_types::BlockView, block, new_tip_block_sender);
                    },
                    Some(tx_entry) = transaction_rx.recv() => {
                        publiser_send!(ckb_jsonrpc_types::PoolTransactionEntry, tx_entry, new_transaction_sender);
                    },
                    Some(tx_entry) = proposed_rx.recv() => {
                        publiser_send!(ckb_jsonrpc_types::PoolTransactionEntry, tx_entry, proposed_transaction_sender);
                    },
                    Some((tx_entry, reject)) = rejected_rx.recv() => {
                        publiser_send!((ckb_jsonrpc_types::PoolTransactionEntry, ckb_jsonrpc_types::PoolTransactionReject),
                                        (tx_entry.into(), reject.into()),
                                        new_reject_transaction_sender);
                    },
                    Some(log_entry) = log_rx.recv() => {
                        publiser_send!(ckb_jsonrpc_types::LogEntry, convert_log_entry(log_entry), log_sender);
                    },
                    // Shutdown requested: leave the loop and end the task.
                    _ = exit_token.cancelled() => {
                        break;
                    },
                    // Runs only when no other arm is enabled.
                    else => {
                        error!("SubscriptionRpcImpl tokio::select! unexpected error");
                        break;
                    }
                }
            }
        });

        rpc
    }
}