jsonrpsee_ws_client/
lib.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! # jsonrpsee-ws-client
28//!
//! `jsonrpsee-ws-client` is a [JSON RPC](https://www.jsonrpc.org/specification) WebSocket client library that is built for `async/await`.
30//!
31//! ## Async runtime support
32//!
33//! This library uses `tokio` as the runtime and does not support other runtimes.
34
35#![cfg_attr(not(test), warn(unused_crate_dependencies))]
36#![cfg_attr(docsrs, feature(doc_cfg))]
37
38#[cfg(test)]
39mod tests;
40
41pub use http::{HeaderMap, HeaderValue};
42pub use jsonrpsee_core::client::Client as WsClient;
43pub use jsonrpsee_core::client::async_client::PingConfig;
44pub use jsonrpsee_core::client::async_client::RpcService;
45pub use jsonrpsee_core::middleware::RpcServiceBuilder;
46use jsonrpsee_core::middleware::layer::RpcLoggerLayer;
47pub use jsonrpsee_types as types;
48
49use jsonrpsee_client_transport::ws::{AsyncRead, AsyncWrite, WsTransportClientBuilder};
50use jsonrpsee_core::TEN_MB_SIZE_BYTES;
51use jsonrpsee_core::client::{ClientBuilder, Error, IdKind, MaybeSend, TransportReceiverT, TransportSenderT};
52use std::time::Duration;
53use url::Url;
54
/// Middleware type used as the default `RpcMiddleware` parameter of `WsClientBuilder`:
/// an [`RpcLoggerLayer`] combined with tower's `Identity` layer through `Stack`.
type Logger = tower::layer::util::Stack<RpcLoggerLayer, tower::layer::util::Identity>;
56
57#[cfg(feature = "tls")]
58pub use jsonrpsee_client_transport::ws::CustomCertStore;
59
60#[cfg(feature = "tls")]
61use jsonrpsee_client_transport::ws::CertificateStore;
62
63/// Builder for [`WsClient`].
64///
65/// # Examples
66///
67/// ```no_run
68///
69/// use jsonrpsee_ws_client::{WsClientBuilder, HeaderMap, HeaderValue};
70///
71/// #[tokio::main]
72/// async fn main() {
73///     // Build custom headers used during the handshake process.
74///     let mut headers = HeaderMap::new();
75///     headers.insert("Any-Header-You-Like", HeaderValue::from_static("42"));
76///
77///     // Build client
78///     let client = WsClientBuilder::default()
79///          .set_headers(headers)
80///          .build("wss://localhost:443")
81///          .await
82///          .unwrap();
83///
84///     // use client....
85/// }
86///
87/// ```
88#[derive(Clone, Debug)]
89pub struct WsClientBuilder<RpcMiddleware = Logger> {
90	#[cfg(feature = "tls")]
91	certificate_store: CertificateStore,
92	max_request_size: u32,
93	max_response_size: u32,
94	max_frame_size: Option<u32>,
95	request_timeout: Duration,
96	connection_timeout: Duration,
97	ping_config: Option<PingConfig>,
98	headers: http::HeaderMap,
99	max_concurrent_requests: usize,
100	max_buffer_capacity_per_subscription: usize,
101	max_redirections: usize,
102	id_kind: IdKind,
103	tcp_no_delay: bool,
104	service_builder: RpcServiceBuilder<RpcMiddleware>,
105}
106
107impl Default for WsClientBuilder {
108	fn default() -> Self {
109		Self {
110			#[cfg(feature = "tls")]
111			certificate_store: CertificateStore::Native,
112			max_request_size: TEN_MB_SIZE_BYTES,
113			max_response_size: TEN_MB_SIZE_BYTES,
114			max_frame_size: None,
115			request_timeout: Duration::from_secs(60),
116			connection_timeout: Duration::from_secs(10),
117			ping_config: None,
118			headers: HeaderMap::new(),
119			max_concurrent_requests: 256,
120			max_buffer_capacity_per_subscription: 1024,
121			max_redirections: 5,
122			id_kind: IdKind::Number,
123			tcp_no_delay: true,
124			service_builder: RpcServiceBuilder::default().rpc_logger(1024),
125		}
126	}
127}
128
129impl WsClientBuilder {
130	/// Create a new WebSocket client builder.
131	pub fn new() -> WsClientBuilder {
132		WsClientBuilder::default()
133	}
134}
135
136impl<RpcMiddleware> WsClientBuilder<RpcMiddleware> {
137	/// Force to use a custom certificate store.
138	///
139	/// # Optional
140	///
141	/// This requires the optional `tls` feature.
142	///
143	/// # Example
144	///
145	/// ```no_run
146	/// use jsonrpsee_ws_client::{WsClientBuilder, CustomCertStore};
147	/// use rustls::{
148	///     client::danger::{self, HandshakeSignatureValid, ServerCertVerified},
149	///     pki_types::{CertificateDer, ServerName, UnixTime},
150	///     Error,
151	/// };
152	///
153	/// #[derive(Debug)]
154	/// struct NoCertificateVerification;
155	///
156	/// impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
157	///     fn verify_server_cert(
158	///         &self,
159	///         _: &CertificateDer<'_>,
160	///         _: &[CertificateDer<'_>],
161	///         _: &ServerName<'_>,
162	///         _: &[u8],
163	///         _: UnixTime,
164	///     ) -> Result<ServerCertVerified, Error> {
165	///         Ok(ServerCertVerified::assertion())
166	///     }
167	///
168	///     fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
169	///         vec![rustls::SignatureScheme::ECDSA_NISTP256_SHA256]
170	///     }
171	///
172	///     fn verify_tls12_signature(
173	///         &self,
174	///         _: &[u8],
175	///         _: &CertificateDer<'_>,
176	///         _: &rustls::DigitallySignedStruct,
177	///     ) -> Result<rustls::client::danger::HandshakeSignatureValid, Error> {
178	///         Ok(HandshakeSignatureValid::assertion())
179	///     }
180	///
181	///     fn verify_tls13_signature(
182	///         &self,
183	///         _: &[u8],
184	///         _: &CertificateDer<'_>,
185	///         _: &rustls::DigitallySignedStruct,
186	///     ) -> Result<HandshakeSignatureValid, Error> {
187	///         Ok(HandshakeSignatureValid::assertion())
188	///     }
189	/// }
190	///
191	/// let tls_cfg = CustomCertStore::builder()
192	///    .dangerous()
193	///    .with_custom_certificate_verifier(std::sync::Arc::new(NoCertificateVerification))
194	///    .with_no_client_auth();
195	///
196	/// // client builder with disabled certificate verification.
197	/// let client_builder = WsClientBuilder::new().with_custom_cert_store(tls_cfg);
198	/// ```
199	#[cfg(feature = "tls")]
200	pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
201		self.certificate_store = CertificateStore::Custom(cfg);
202		self
203	}
204
205	/// See documentation [`WsTransportClientBuilder::max_request_size`] (default is 10 MB).
206	pub fn max_request_size(mut self, size: u32) -> Self {
207		self.max_request_size = size;
208		self
209	}
210
211	/// See documentation [`WsTransportClientBuilder::max_response_size`] (default is 10 MB).
212	pub fn max_response_size(mut self, size: u32) -> Self {
213		self.max_response_size = size;
214		self
215	}
216
217	/// See documentation [`WsTransportClientBuilder::max_frame_size`] (default is none).
218	pub fn max_frame_size(mut self, size: u32) -> Self {
219		self.max_frame_size = Some(size);
220		self
221	}
222
223	/// See documentation [`ClientBuilder::request_timeout`] (default is 60 seconds).
224	pub fn request_timeout(mut self, timeout: Duration) -> Self {
225		self.request_timeout = timeout;
226		self
227	}
228
229	/// See documentation [`WsTransportClientBuilder::connection_timeout`] (default is 10 seconds).
230	pub fn connection_timeout(mut self, timeout: Duration) -> Self {
231		self.connection_timeout = timeout;
232		self
233	}
234
235	/// See documentation [`ClientBuilder::enable_ws_ping`] (disabled by default).
236	pub fn enable_ws_ping(mut self, cfg: PingConfig) -> Self {
237		self.ping_config = Some(cfg);
238		self
239	}
240
241	/// See documentation [`ClientBuilder::disable_ws_ping`]
242	pub fn disable_ws_ping(mut self) -> Self {
243		self.ping_config = None;
244		self
245	}
246
247	/// See documentation [`WsTransportClientBuilder::set_headers`] (default is none).
248	pub fn set_headers(mut self, headers: http::HeaderMap) -> Self {
249		self.headers = headers;
250		self
251	}
252
253	/// See documentation [`ClientBuilder::max_concurrent_requests`] (default is 256).
254	pub fn max_concurrent_requests(mut self, max: usize) -> Self {
255		self.max_concurrent_requests = max;
256		self
257	}
258
259	/// See documentation [`ClientBuilder::max_buffer_capacity_per_subscription`] (default is 1024).
260	pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
261		self.max_buffer_capacity_per_subscription = max;
262		self
263	}
264
265	/// See documentation [`WsTransportClientBuilder::max_redirections`] (default is 5).
266	pub fn max_redirections(mut self, redirect: usize) -> Self {
267		self.max_redirections = redirect;
268		self
269	}
270
271	/// See documentation for [`ClientBuilder::id_format`] (default is Number).
272	pub fn id_format(mut self, kind: IdKind) -> Self {
273		self.id_kind = kind;
274		self
275	}
276
277	/// See documentation [`ClientBuilder::set_tcp_no_delay`] (default is true).
278	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
279		self.tcp_no_delay = no_delay;
280		self
281	}
282
283	/// Set the RPC service builder.
284	pub fn set_rpc_middleware<T>(self, service_builder: RpcServiceBuilder<T>) -> WsClientBuilder<T> {
285		WsClientBuilder {
286			#[cfg(feature = "tls")]
287			certificate_store: self.certificate_store,
288			max_request_size: self.max_request_size,
289			max_response_size: self.max_response_size,
290			max_frame_size: self.max_frame_size,
291			request_timeout: self.request_timeout,
292			connection_timeout: self.connection_timeout,
293			ping_config: self.ping_config,
294			headers: self.headers,
295			max_concurrent_requests: self.max_concurrent_requests,
296			max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
297			max_redirections: self.max_redirections,
298			id_kind: self.id_kind,
299			tcp_no_delay: self.tcp_no_delay,
300			service_builder,
301		}
302	}
303
304	/// Build the [`WsClient`] with specified [`TransportSenderT`] [`TransportReceiverT`] parameters
305	///
306	/// ## Panics
307	///
308	/// Panics if being called outside of `tokio` runtime context.
309	pub fn build_with_transport<S, R, Svc>(self, sender: S, receiver: R) -> WsClient<Svc>
310	where
311		S: TransportSenderT + Send,
312		R: TransportReceiverT + Send,
313		RpcMiddleware: tower::Layer<RpcService, Service = Svc> + Clone + Send + Sync + 'static,
314	{
315		let Self {
316			max_concurrent_requests,
317			request_timeout,
318			ping_config,
319			max_buffer_capacity_per_subscription,
320			id_kind,
321			tcp_no_delay,
322			service_builder,
323			..
324		} = self;
325
326		let mut client = ClientBuilder::default()
327			.max_buffer_capacity_per_subscription(max_buffer_capacity_per_subscription)
328			.request_timeout(request_timeout)
329			.max_concurrent_requests(max_concurrent_requests)
330			.id_format(id_kind)
331			.set_tcp_no_delay(tcp_no_delay)
332			.set_rpc_middleware(service_builder);
333
334		if let Some(cfg) = ping_config {
335			client = client.enable_ws_ping(cfg);
336		}
337
338		client.build_with_tokio(sender, receiver)
339	}
340
341	/// Build the [`WsClient`] with specified data stream, using [`WsTransportClientBuilder::build_with_stream`].
342	///
343	/// ## Panics
344	///
345	/// Panics if being called outside of `tokio` runtime context.
346	pub async fn build_with_stream<S, T>(self, url: impl AsRef<str>, data_stream: T) -> Result<WsClient<S>, Error>
347	where
348		T: AsyncRead + AsyncWrite + Unpin + MaybeSend + 'static,
349		RpcMiddleware: tower::Layer<RpcService, Service = S> + Clone + Send + Sync + 'static,
350	{
351		let transport_builder = WsTransportClientBuilder {
352			#[cfg(feature = "tls")]
353			certificate_store: self.certificate_store.clone(),
354			connection_timeout: self.connection_timeout,
355			headers: self.headers.clone(),
356			max_request_size: self.max_request_size,
357			max_response_size: self.max_response_size,
358			max_frame_size: self.max_frame_size,
359			max_redirections: self.max_redirections,
360			tcp_no_delay: self.tcp_no_delay,
361		};
362
363		let uri = Url::parse(url.as_ref()).map_err(|e| Error::Transport(e.into()))?;
364		let (sender, receiver) =
365			transport_builder.build_with_stream(uri, data_stream).await.map_err(|e| Error::Transport(e.into()))?;
366
367		let ws_client = self.build_with_transport(sender, receiver);
368		Ok(ws_client)
369	}
370
371	/// Build the [`WsClient`] with specified URL to connect to, using the default
372	/// [`WsTransportClientBuilder::build_with_stream`], therefore with the default TCP as transport layer.
373	///
374	/// ## Panics
375	///
376	/// Panics if being called outside of `tokio` runtime context.
377	pub async fn build<S>(self, url: impl AsRef<str>) -> Result<WsClient<S>, Error>
378	where
379		RpcMiddleware: tower::Layer<RpcService, Service = S> + Clone + Send + Sync + 'static,
380	{
381		let transport_builder = WsTransportClientBuilder {
382			#[cfg(feature = "tls")]
383			certificate_store: self.certificate_store.clone(),
384			connection_timeout: self.connection_timeout,
385			headers: self.headers.clone(),
386			max_request_size: self.max_request_size,
387			max_response_size: self.max_response_size,
388			max_frame_size: self.max_frame_size,
389			max_redirections: self.max_redirections,
390			tcp_no_delay: self.tcp_no_delay,
391		};
392
393		let uri = Url::parse(url.as_ref()).map_err(|e| Error::Transport(e.into()))?;
394		let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?;
395
396		let ws_client = self.build_with_transport(sender, receiver);
397		Ok(ws_client)
398	}
399}