substrate_api_client/rpc/jsonrpsee_client/
mod.rs

1/*
2   Copyright 2019 Supercomputing Systems AG
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6	   http://www.apache.org/licenses/LICENSE-2.0
7   Unless required by applicable law or agreed to in writing, software
8   distributed under the License is distributed on an "AS IS" BASIS,
9   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10   See the License for the specific language governing permissions and
11   limitations under the License.
12*/
13
14use crate::rpc::{Error, Request, Result, RpcParams, Subscribe};
15use jsonrpsee::{
16	client_transport::ws::{Url, WsTransportClientBuilder},
17	core::{
18		client::{Client, ClientBuilder, ClientT, Error as JsonrpseeError, SubscriptionClientT},
19		traits::ToRpcParams,
20	},
21};
22use serde::de::DeserializeOwned;
23use serde_json::value::RawValue;
24use std::sync::Arc;
25
26pub use subscription::SubscriptionWrapper;
27
28mod subscription;
29
30#[derive(Clone)]
31pub struct JsonrpseeClient {
32	inner: Arc<Client>,
33}
34
35impl JsonrpseeClient {
36	/// Create a new client to a local Substrate node with default port.
37	pub async fn with_default_url() -> Result<Self> {
38		Self::new("ws://127.0.0.1:9944").await
39	}
40
41	/// Create a new client with the given url string.
42	/// Example url input: "ws://127.0.0.1:9944"
43	pub async fn new(url: &str) -> Result<Self> {
44		let parsed_url: Url = url.parse().map_err(|e| Error::Client(Box::new(e)))?;
45		let (tx, rx) = WsTransportClientBuilder::default()
46			.build(parsed_url)
47			.await
48			.map_err(|e| Error::Client(Box::new(e)))?;
49		let client = ClientBuilder::default()
50			.max_buffer_capacity_per_subscription(4096)
51			.build_with_tokio(tx, rx);
52		Ok(Self { inner: Arc::new(client) })
53	}
54
55	/// Create a new client with the given address, port and max number of reconnection attempts.
56	/// Example input:
57	/// - address: "ws://127.0.0.1"
58	/// - port: 9944
59	pub async fn new_with_port(address: &str, port: u32) -> Result<Self> {
60		let url = format!("{address}:{port:?}");
61		Self::new(&url).await
62	}
63
64	/// Create a new client with a user-generated Jsonrpsee Client.
65	pub fn new_with_client(client: Client) -> Self {
66		let inner = Arc::new(client);
67		Self { inner }
68	}
69}
70
71impl JsonrpseeClient {
72	/// Checks if the client is connected to the target.
73	pub fn is_connected(&self) -> bool {
74		self.inner.is_connected()
75	}
76
77	/// This is similar to [`Client::on_disconnect`] but it can be used to get
78	/// the reason why the client was disconnected but it's not cancel-safe.
79	///
80	/// The typical use-case is that this method will be called after
81	/// [`Client::on_disconnect`] has returned in a "select loop".
82	///
83	/// # Cancel-safety
84	///
85	/// This method is not cancel-safe
86	pub async fn disconnect_reason(&self) -> JsonrpseeError {
87		self.inner.disconnect_reason().await
88	}
89
90	/// Completes when the client is disconnected or the client's background task encountered an error.
91	/// If the client is already disconnected, the future produced by this method will complete immediately.
92	///
93	/// # Cancel safety
94	///
95	/// This method is cancel safe.
96	pub async fn on_disconnect(&self) {
97		self.inner.on_disconnect().await;
98	}
99}
100
101#[maybe_async::async_impl(?Send)]
102impl Request for JsonrpseeClient {
103	async fn request<R: DeserializeOwned>(&self, method: &str, params: RpcParams) -> Result<R> {
104		self.inner
105			.request(method, RpcParamsWrapper(params))
106			.await
107			.map_err(|e| Error::Client(Box::new(e)))
108	}
109}
110
111#[maybe_async::async_impl(?Send)]
112impl Subscribe for JsonrpseeClient {
113	type Subscription<Notification>
114		= SubscriptionWrapper<Notification>
115	where
116		Notification: DeserializeOwned;
117
118	async fn subscribe<Notification: DeserializeOwned>(
119		&self,
120		sub: &str,
121		params: RpcParams,
122		unsub: &str,
123	) -> Result<Self::Subscription<Notification>> {
124		self.inner
125			.subscribe(sub, RpcParamsWrapper(params), unsub)
126			.await
127			.map(|sub| sub.into())
128			.map_err(|e| Error::Client(Box::new(e)))
129	}
130}
131
132struct RpcParamsWrapper(RpcParams);
133
134impl ToRpcParams for RpcParamsWrapper {
135	fn to_rpc_params(self) -> core::result::Result<Option<Box<RawValue>>, serde_json::Error> {
136		if let Some(json) = self.0.build() {
137			RawValue::from_string(json).map(Some)
138		} else {
139			Ok(None)
140		}
141	}
142}