use crate::imports::*;
use js_sys::Array;
use kaspa_addresses::Address;
use kaspa_notify::notification::Notification as NotificationT;
pub use kaspa_rpc_macros::{build_wrpc_wasm_bindgen_interface, build_wrpc_wasm_bindgen_subscriptions};
pub use serde_wasm_bindgen::*;
/// Result alias used by the methods in this file: any `T` on success, a `JsError` on failure.
type JsResult<T> = std::result::Result<T, JsError>;
/// Newtype around the JavaScript `Function` registered as the notification callback.
///
/// It is stored in state shared with the spawned notification loop.
struct NotificationSink(Function);

// SAFETY: NOTE(review): nothing in this file establishes why a JS `Function` handle may be
// sent across threads. Presumably this relies on the wasm target being single-threaded —
// confirm before reusing this type on a multi-threaded target.
unsafe impl Send for NotificationSink {}

impl From<NotificationSink> for Function {
    /// Unwraps the sink and hands back the JavaScript function it carries.
    fn from(sink: NotificationSink) -> Self {
        let NotificationSink(callback) = sink;
        callback
    }
}
/// RPC client exported through `wasm_bindgen`; wraps a `KaspaRpcClient` and relays its
/// notifications to an optional JavaScript callback.
#[wasm_bindgen]
pub struct RpcClient {
    // Underlying RPC client that every call in this file is forwarded to.
    client: KaspaRpcClient,
    // Flag consulted by `stop_notification_task` before it signals the notification loop.
    notification_task: AtomicBool,
    // Control channel for the notification loop: the loop listens on the request side
    // and acknowledges on the response side when it exits.
    notification_ctl: DuplexChannel,
    // Callback invoked for each incoming notification; `None` when nothing is registered.
    notification_callback: Arc<Mutex<Option<NotificationSink>>>,
}
#[wasm_bindgen]
impl RpcClient {
    /// Creates a client for `url` using the given wire `encoding`.
    ///
    /// # Panics
    ///
    /// Panics with the underlying error message if `KaspaRpcClient::new` fails.
    #[wasm_bindgen(constructor)]
    pub fn new(encoding: Encoding, url: &str) -> RpcClient {
        RpcClient {
            client: KaspaRpcClient::new(encoding, url).unwrap_or_else(|err| panic!("{err}")),
            notification_task: AtomicBool::new(false),
            notification_ctl: DuplexChannel::oneshot(),
            notification_callback: Arc::new(Mutex::new(None)),
        }
    }

    /// Makes sure the notification loop is running, then starts the underlying client
    /// and connects it.
    ///
    /// # Errors
    ///
    /// Returns an error if the notification loop cannot be started or if starting or
    /// connecting the underlying client fails.
    pub async fn connect(&self) -> JsResult<()> {
        // Guarded start: calling `connect` repeatedly must not spawn additional loops.
        self.ensure_notification_task()?;
        self.client.start().await?;
        self.client.connect(true).await?;
        Ok(())
    }

    /// Clears the notification callback, stops the notification loop, then stops and
    /// shuts down the underlying client.
    ///
    /// # Errors
    ///
    /// Returns an error if signalling the loop fails or if stopping or shutting down
    /// the underlying client fails.
    pub async fn disconnect(&self) -> JsResult<()> {
        self.clear_notification_callback();
        self.stop_notification_task().await?;
        self.client.stop().await?;
        self.client.shutdown().await?;
        Ok(())
    }

    /// Spawns the notification loop unless the running flag says one is already active.
    ///
    /// Previously the flag was never raised, so `stop_notification_task` could never
    /// signal the loop and every `connect` call spawned another one.
    fn ensure_notification_task(&self) -> JsResult<()> {
        // `swap` returns the previous value: only the caller that flips the flag from
        // `false` to `true` spawns the loop.
        if !self.notification_task.swap(true, Ordering::SeqCst) {
            if let Err(err) = self.notification_task() {
                // Nothing was spawned, so do not leave the flag raised.
                self.notification_task.store(false, Ordering::SeqCst);
                return Err(err);
            }
        }
        Ok(())
    }

    /// Signals the notification loop to exit if the running flag is set, and waits for
    /// the loop's acknowledgement.
    async fn stop_notification_task(&self) -> JsResult<()> {
        // Clear the flag and read its previous value in one atomic step so that only
        // one caller sends the stop signal.
        if self.notification_task.swap(false, Ordering::SeqCst) {
            self.notification_ctl.signal(()).await.map_err(|err| JsError::new(&err.to_string()))?;
        }
        Ok(())
    }

    /// Drops the registered notification callback, if any.
    fn clear_notification_callback(&self) {
        *self.notification_callback.lock().unwrap() = None;
    }

    /// Registers `callback` as the notification handler, or unregisters the current one.
    ///
    /// When `callback` is a JavaScript function it replaces any previous handler and the
    /// notification loop is (re)started if it is not running. Any other value stops the
    /// loop and clears the handler.
    ///
    /// # Errors
    ///
    /// Returns an error if the loop cannot be started or if signalling it to stop fails.
    pub async fn notify(&self, callback: JsValue) -> JsResult<()> {
        if callback.is_function() {
            let fn_callback: Function = callback.into();
            self.notification_callback.lock().unwrap().replace(NotificationSink(fn_callback));
            // A previous `notify` with a non-function value stops the loop; bring it
            // back so the new handler actually receives notifications.
            self.ensure_notification_task()?;
        } else {
            self.stop_notification_task().await?;
            self.clear_notification_callback();
        }
        Ok(())
    }
}
impl RpcClient {
    /// Spawns the loop that relays notifications from the RPC client to the registered
    /// JavaScript callback.
    ///
    /// The loop exits as soon as anything (a message or an error) arrives on the request
    /// side of the control channel, and then sends an acknowledgement on the response side.
    /// Notifications that arrive while no callback is registered are dropped. If the op or
    /// payload cannot be converted to a JS value, the failure is logged and the
    /// notification is skipped.
    fn notification_task(&self) -> JsResult<()> {
        let shutdown_rx = self.notification_ctl.request.receiver.clone();
        let shutdown_ack_tx = self.notification_ctl.response.sender.clone();
        let incoming = self.client.notification_channel_receiver();
        let sink = self.notification_callback.clone();

        spawn(async move {
            loop {
                select! {
                    _ = shutdown_rx.recv().fuse() => {
                        break;
                    },
                    msg = incoming.recv().fuse() => {
                        if let Ok(notification) = &msg {
                            // The lock guard lives for the whole `if let` body, i.e. while
                            // the callback is being invoked.
                            if let Some(NotificationSink(handler)) = sink.lock().unwrap().as_ref() {
                                let op: RpcApiOps = notification.event_type().into();

                                // Both conversions are attempted (and logged on failure)
                                // before deciding whether the callback can be invoked.
                                let op_value = match to_value(&op) {
                                    Ok(value) => Some(value),
                                    Err(err) => {
                                        log_error!("Notification handler - unable to convert notification op: {}",err.to_string());
                                        None
                                    }
                                };
                                let op_payload = match notification.to_value() {
                                    Ok(value) => Some(value),
                                    Err(err) => {
                                        log_error!("Notification handler - unable to convert notification payload: {}",err.to_string());
                                        None
                                    }
                                };

                                if let (Some(op_value), Some(op_payload)) = (op_value, op_payload) {
                                    if let Err(err) = handler.call2(&JsValue::undefined(), &op_value, &op_payload) {
                                        log_error!("Error while executing notification callback: {:?}",err);
                                    }
                                }
                            }
                        }
                    }
                }
            }

            // Acknowledge the stop request; a failed send is deliberately ignored.
            shutdown_ack_tx.send(()).await.ok();
        });

        Ok(())
    }
}
#[wasm_bindgen]
impl RpcClient {
#[wasm_bindgen(js_name = subscribeDaaScore)]
pub async fn subscribe_daa_score(&self) -> JsResult<()> {
self.client.start_notify(ListenerId::default(), Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?;
Ok(())
}
#[wasm_bindgen(js_name = unsubscribeDaaScore)]
pub async fn unsubscribe_daa_score(&self) -> JsResult<()> {
self.client.stop_notify(ListenerId::default(), Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?;
Ok(())
}
#[wasm_bindgen(js_name = subscribeUtxosChanged)]
pub async fn subscribe_utxos_changed(&self, addresses: &JsValue) -> JsResult<()> {
let addresses = Array::from(addresses)
.to_vec()
.into_iter()
.map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string())))
.collect::<std::result::Result<Vec<Address>, JsError>>()?;
self.client.start_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}
#[wasm_bindgen(js_name = unsubscribeUtxosChanged)]
pub async fn unsubscribe_utxos_changed(&self, addresses: &JsValue) -> JsResult<()> {
let addresses = Array::from(addresses)
.to_vec()
.into_iter()
.map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string())))
.collect::<std::result::Result<Vec<Address>, JsError>>()?;
self.client.stop_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
Ok(())
}
#[wasm_bindgen(js_name = subscribeVirtualChainChanged)]
pub async fn subscribe_virtual_chain_changed(&self, include_accepted_transaction_ids: bool) -> JsResult<()> {
self.client
.start_notify(
ListenerId::default(),
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}
#[wasm_bindgen(js_name = unsubscribeVirtualChainChanged)]
pub async fn unsubscribe_virtual_chain_changed(&self, include_accepted_transaction_ids: bool) -> JsResult<()> {
self.client
.stop_notify(
ListenerId::default(),
Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }),
)
.await?;
Ok(())
}
}
// Notification kinds handed to `build_wrpc_wasm_bindgen_subscriptions!`.
// NOTE(review): the macro comes from `kaspa_rpc_macros` and its expansion is not visible
// here; presumably it generates subscribe/unsubscribe bindings on `RpcClient` for each
// listed notification — confirm against the macro source.
build_wrpc_wasm_bindgen_subscriptions!([
    BlockAdded,
    FinalityConflict,
    FinalityConflictResolved,
    SinkBlueScoreChanged,
    VirtualDaaScoreChanged,
    PruningPointUtxoSetOverride,
    NewBlockTemplate,
]);
// RPC operations handed to `build_wrpc_wasm_bindgen_interface!` as two separate lists.
// NOTE(review): the macro comes from `kaspa_rpc_macros` and its expansion is not visible
// here; presumably it generates one `RpcClient` binding per listed operation, with the
// first list holding calls that take no request argument and the second list holding
// calls that take one — confirm against the macro source.
build_wrpc_wasm_bindgen_interface!(
    // First list.
    [
        GetBlockCount,
        GetBlockDagInfo,
        GetCoinSupply,
        GetConnectedPeerInfo,
        GetInfo,
        GetPeerAddresses,
        GetProcessMetrics,
        GetSelectedTipHash,
        GetSinkBlueScore,
        Ping,
        Shutdown,
    ],
    // Second list.
    [
        AddPeer,
        Ban,
        EstimateNetworkHashesPerSecond,
        GetBalanceByAddress,
        GetBalancesByAddresses,
        GetBlock,
        GetBlocks,
        GetBlockTemplate,
        GetCurrentNetwork,
        GetHeaders,
        GetMempoolEntries,
        GetMempoolEntriesByAddresses,
        GetMempoolEntry,
        GetSubnetwork,
        GetVirtualChainFromBlock,
        ResolveFinalityConflict,
        SubmitBlock,
        Unban,
    ]
);
#[wasm_bindgen]
impl RpcClient {
    /// Deserializes `request` into a `SubmitTransactionRequest`, forwards it to the
    /// underlying client and returns the response serialized as a JS value.
    ///
    /// The incoming request is logged at info level before it is deserialized.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be deserialized, if the RPC call fails, or
    /// if the response cannot be serialized.
    #[wasm_bindgen(js_name = submitTransaction)]
    pub async fn submit_transaction(&self, request: JsValue) -> JsResult<JsValue> {
        log_info!("submit_transaction req: {:?}", request);
        let payload: SubmitTransactionRequest = from_value(request)?;
        let response: SubmitTransactionResponse = self
            .client
            .submit_transaction_call(payload)
            .await
            .map_err(|err| wasm_bindgen::JsError::new(&err.to_string()))?;
        Ok(to_value(&response)?)
    }

    /// Deserializes `request` into a `GetUtxosByAddressesRequest`, forwards it to the
    /// underlying client and returns the response serialized as a JS value.
    ///
    /// The incoming request is logged at info level before it is deserialized.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be deserialized, if the RPC call fails, or
    /// if the response cannot be serialized.
    #[wasm_bindgen(js_name = getUtxosByAddresses)]
    pub async fn get_utxos_by_addresses(&self, request: JsValue) -> JsResult<JsValue> {
        log_info!("get_utxos_by_addresses req: {:?}", request);
        let payload: GetUtxosByAddressesRequest = from_value(request)?;
        let response: GetUtxosByAddressesResponse = self
            .client
            .get_utxos_by_addresses_call(payload)
            .await
            .map_err(|err| wasm_bindgen::JsError::new(&err.to_string()))?;
        Ok(to_value(&response)?)
    }
}