substrate_api_client/rpc/jsonrpsee_client/
mod.rs1use crate::rpc::{Error, Request, Result, RpcParams, Subscribe};
15use jsonrpsee::{
16 client_transport::ws::{Url, WsTransportClientBuilder},
17 core::{
18 client::{Client, ClientBuilder, ClientT, Error as JsonrpseeError, SubscriptionClientT},
19 traits::ToRpcParams,
20 },
21};
22use serde::de::DeserializeOwned;
23use serde_json::value::RawValue;
24use std::sync::Arc;
25
26pub use subscription::SubscriptionWrapper;
27
28mod subscription;
29
30#[derive(Clone)]
31pub struct JsonrpseeClient {
32 inner: Arc<Client>,
33}
34
35impl JsonrpseeClient {
36 pub async fn with_default_url() -> Result<Self> {
38 Self::new("ws://127.0.0.1:9944").await
39 }
40
41 pub async fn new(url: &str) -> Result<Self> {
44 let parsed_url: Url = url.parse().map_err(|e| Error::Client(Box::new(e)))?;
45 let (tx, rx) = WsTransportClientBuilder::default()
46 .build(parsed_url)
47 .await
48 .map_err(|e| Error::Client(Box::new(e)))?;
49 let client = ClientBuilder::default()
50 .max_buffer_capacity_per_subscription(4096)
51 .build_with_tokio(tx, rx);
52 Ok(Self { inner: Arc::new(client) })
53 }
54
55 pub async fn new_with_port(address: &str, port: u32) -> Result<Self> {
60 let url = format!("{address}:{port:?}");
61 Self::new(&url).await
62 }
63
64 pub fn new_with_client(client: Client) -> Self {
66 let inner = Arc::new(client);
67 Self { inner }
68 }
69}
70
71impl JsonrpseeClient {
72 pub fn is_connected(&self) -> bool {
74 self.inner.is_connected()
75 }
76
77 #[deprecated = "Use on_disconnect instead."]
80 pub async fn disconnect_reason(&self) -> JsonrpseeError {
81 self.inner.on_disconnect().await
82 }
83
84 pub async fn on_disconnect(&self) {
91 self.inner.on_disconnect().await;
92 }
93}
94
95#[maybe_async::async_impl(?Send)]
96impl Request for JsonrpseeClient {
97 async fn request<R: DeserializeOwned>(&self, method: &str, params: RpcParams) -> Result<R> {
98 self.inner
99 .request(method, RpcParamsWrapper(params))
100 .await
101 .map_err(|e| Error::Client(Box::new(e)))
102 }
103}
104
105#[maybe_async::async_impl(?Send)]
106impl Subscribe for JsonrpseeClient {
107 type Subscription<Notification>
108 = SubscriptionWrapper<Notification>
109 where
110 Notification: DeserializeOwned;
111
112 async fn subscribe<Notification: DeserializeOwned>(
113 &self,
114 sub: &str,
115 params: RpcParams,
116 unsub: &str,
117 ) -> Result<Self::Subscription<Notification>> {
118 self.inner
119 .subscribe(sub, RpcParamsWrapper(params), unsub)
120 .await
121 .map(|sub| sub.into())
122 .map_err(|e| Error::Client(Box::new(e)))
123 }
124}
125
126struct RpcParamsWrapper(RpcParams);
127
128impl ToRpcParams for RpcParamsWrapper {
129 fn to_rpc_params(self) -> core::result::Result<Option<Box<RawValue>>, serde_json::Error> {
130 if let Some(json) = self.0.build() {
131 RawValue::from_string(json).map(Some)
132 } else {
133 Ok(None)
134 }
135 }
136}