use ckb_jsonrpc_types::Topic;
use ckb_notify::NotifyController;
use jsonrpc_core::{Metadata, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{
typed::{Sink, Subscriber},
PubSubMetadata, Session, SubscriptionId,
};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
RwLock,
};
use tokio::runtime::Handle;
const SUBSCRIBER_NAME: &str = "TcpSubscription";
#[derive(Clone, Debug)]
pub struct SubscriptionSession {
pub(crate) subscription_ids: Arc<RwLock<HashSet<SubscriptionId>>>,
pub(crate) session: Arc<Session>,
}
impl SubscriptionSession {
pub fn new(session: Session) -> Self {
Self {
subscription_ids: Arc::new(RwLock::new(HashSet::new())),
session: Arc::new(session),
}
}
}
impl Metadata for SubscriptionSession {}
impl PubSubMetadata for SubscriptionSession {
fn session(&self) -> Option<Arc<Session>> {
Some(Arc::clone(&self.session))
}
}
#[allow(clippy::needless_return)]
#[rpc(server)]
pub trait SubscriptionRpc {
type Metadata;
#[pubsub(subscription = "subscribe", subscribe, name = "subscribe")]
fn subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<String>, topic: Topic);
#[pubsub(subscription = "subscribe", unsubscribe, name = "unsubscribe")]
fn unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
}
type Subscribers = HashMap<SubscriptionId, Sink<String>>;
#[derive(Default)]
pub struct SubscriptionRpcImpl {
pub(crate) id_generator: AtomicUsize,
pub(crate) subscribers: Arc<RwLock<HashMap<Topic, Subscribers>>>,
}
impl SubscriptionRpc for SubscriptionRpcImpl {
type Metadata = Option<SubscriptionSession>;
fn subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<String>, topic: Topic) {
if let Some(session) = meta {
let id = SubscriptionId::String(format!(
"{:#x}",
self.id_generator.fetch_add(1, Ordering::SeqCst)
));
if let Ok(sink) = subscriber.assign_id(id.clone()) {
let mut subscribers = self
.subscribers
.write()
.expect("acquiring subscribers write lock");
subscribers
.entry(topic)
.or_default()
.insert(id.clone(), sink);
session
.subscription_ids
.write()
.expect("acquiring subscription_ids write lock")
.insert(id);
}
}
}
fn unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
let mut subscribers = self
.subscribers
.write()
.expect("acquiring subscribers write lock");
match meta {
Some(Some(session)) => {
if session
.subscription_ids
.write()
.expect("acquiring subscription_ids write lock")
.remove(&id)
{
Ok(subscribers.values_mut().any(|s| s.remove(&id).is_some()))
} else {
Ok(false)
}
}
_ => {
subscribers.values_mut().for_each(|s| {
s.remove(&id);
});
Ok(true)
}
}
}
}
impl SubscriptionRpcImpl {
pub fn new(notify_controller: NotifyController, handle: Handle) -> Self {
let mut new_block_receiver =
handle.block_on(notify_controller.subscribe_new_block(SUBSCRIBER_NAME.to_string()));
let mut new_transaction_receiver = handle
.block_on(notify_controller.subscribe_new_transaction(SUBSCRIBER_NAME.to_string()));
let mut proposed_transaction_receiver = handle.block_on(
notify_controller.subscribe_proposed_transaction(SUBSCRIBER_NAME.to_string()),
);
let mut reject_transaction_receiver = handle
.block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string()));
let subscription_rpc_impl = SubscriptionRpcImpl::default();
let subscribers = Arc::clone(&subscription_rpc_impl.subscribers);
handle.spawn(async move {
loop {
tokio::select! {
Some(block) = new_block_receiver.recv() => {
let subscribers = subscribers.read().expect("acquiring subscribers read lock");
if let Some(new_tip_header_subscribers) = subscribers.get(&Topic::NewTipHeader) {
let header: ckb_jsonrpc_types::HeaderView = block.header().into();
let json_string = Ok(serde_json::to_string(&header).expect("serialization should be ok"));
for sink in new_tip_header_subscribers.values() {
let _ = sink.notify(json_string.clone());
}
}
if let Some(new_tip_block_subscribers) = subscribers.get(&Topic::NewTipBlock) {
let block: ckb_jsonrpc_types::BlockView = block.into();
let json_string = Ok(serde_json::to_string(&block).expect("serialization should be ok"));
for sink in new_tip_block_subscribers.values() {
let _ = sink.notify(json_string.clone());
}
}
},
Some(tx_entry) = new_transaction_receiver.recv() => {
let subscribers = subscribers.read().expect("acquiring subscribers read lock");
if let Some(new_transaction_subscribers) = subscribers.get(&Topic::NewTransaction) {
let entry: ckb_jsonrpc_types::PoolTransactionEntry = tx_entry.into();
let json_string = Ok(serde_json::to_string(&entry).expect("serialization should be ok"));
for sink in new_transaction_subscribers.values() {
let _ = sink.notify(json_string.clone());
}
}
},
Some(tx_entry) = proposed_transaction_receiver.recv() => {
let subscribers = subscribers.read().expect("acquiring subscribers read lock");
if let Some(new_transaction_subscribers) = subscribers.get(&Topic::ProposedTransaction) {
let entry: ckb_jsonrpc_types::PoolTransactionEntry = tx_entry.into();
let json_string = Ok(serde_json::to_string(&entry).expect("serialization should be ok"));
for sink in new_transaction_subscribers.values() {
let _ = sink.notify(json_string.clone());
}
}
},
Some((tx_entry, reject)) = reject_transaction_receiver.recv() => {
let subscribers = subscribers.read().expect("acquiring subscribers read lock");
if let Some(new_transaction_subscribers) = subscribers.get(&Topic::RejectedTransaction) {
let entry: ckb_jsonrpc_types::PoolTransactionEntry = tx_entry.into();
let reject: ckb_jsonrpc_types::PoolTransactionReject = reject.into();
let json_string = Ok(serde_json::to_string(&(entry, reject)).expect("serialization should be ok"));
for sink in new_transaction_subscribers.values() {
let _ = sink.notify(json_string.clone());
}
}
}
else => break,
}
}
});
subscription_rpc_impl
}
}