subxt_rpcs/client/
rpc_client.rs

1// Copyright 2019-2025 Parity Technologies (UK) Ltd.
2// This file is dual-licensed as Apache-2.0 or GPL-3.0.
3// see LICENSE for license details.
4
5use super::{RawRpcSubscription, RpcClientT};
6use crate::Error;
7use futures::{Stream, StreamExt};
8use serde::{Serialize, de::DeserializeOwned};
9use serde_json::value::RawValue;
10use std::{pin::Pin, sync::Arc, task::Poll};
11
12/// A concrete wrapper around an [`RpcClientT`] which provides some higher level helper methods
13/// and is cheaply cloneable.
14#[derive(Clone)]
15pub struct RpcClient {
16    client: Arc<dyn RpcClientT>,
17}
18
19impl RpcClient {
20    #[cfg(feature = "jsonrpsee")]
21    #[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))]
22    /// Create a default RPC client pointed at some URL, currently based on [`jsonrpsee`].
23    ///
24    /// Errors if an insecure URL is provided. In this case, use [`RpcClient::from_insecure_url`] instead.
25    pub async fn from_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
26        crate::utils::validate_url_is_secure(url.as_ref())?;
27        RpcClient::from_insecure_url(url).await
28    }
29
30    #[cfg(feature = "jsonrpsee")]
31    /// Create a default RPC client pointed at some URL, currently based on [`jsonrpsee`].
32    ///
33    /// Allows insecure URLs without SSL encryption, e.g. (http:// and ws:// URLs).
34    pub async fn from_insecure_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
35        let client = super::jsonrpsee_client(url.as_ref())
36            .await
37            .map_err(|e| Error::Client(Box::new(e)))?;
38        Ok(Self::new(client))
39    }
40
41    /// Create a new [`RpcClient`] from an arbitrary [`RpcClientT`] implementation.
42    pub fn new<R: RpcClientT>(client: R) -> Self {
43        RpcClient {
44            client: Arc::new(client),
45        }
46    }
47
48    /// Make an RPC request, given a method name and some parameters.
49    ///
50    /// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to
51    /// construct the parameters.
52    pub async fn request<Res: DeserializeOwned>(
53        &self,
54        method: &str,
55        params: RpcParams,
56    ) -> Result<Res, Error> {
57        let res = self.client.request_raw(method, params.build()).await?;
58        let val = serde_json::from_str(res.get()).map_err(Error::Deserialization)?;
59        Ok(val)
60    }
61
62    /// Subscribe to an RPC endpoint, providing the parameters and the method to call to
63    /// unsubscribe from it again.
64    ///
65    /// See [`RpcParams`] and the [`rpc_params!`] macro for an example of how to
66    /// construct the parameters.
67    pub async fn subscribe<Res: DeserializeOwned>(
68        &self,
69        sub: &str,
70        params: RpcParams,
71        unsub: &str,
72    ) -> Result<RpcSubscription<Res>, Error> {
73        let sub = self
74            .client
75            .subscribe_raw(sub, params.build(), unsub)
76            .await?;
77        Ok(RpcSubscription::new(sub))
78    }
79}
80
81impl<C: RpcClientT> From<C> for RpcClient {
82    fn from(client: C) -> Self {
83        RpcClient::new(client)
84    }
85}
86
87impl std::fmt::Debug for RpcClient {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_tuple("RpcClient").finish()
90    }
91}
92
93impl std::ops::Deref for RpcClient {
94    type Target = dyn RpcClientT;
95    fn deref(&self) -> &Self::Target {
96        &*self.client
97    }
98}
99
100/// Create some [`RpcParams`] to pass to our [`RpcClient`]. [`RpcParams`]
101/// simply enforces that parameters handed to our [`RpcClient`] methods
102/// are the correct shape.
103///
104/// As with the [`serde_json::json!`] macro, this will panic if you provide
105/// parameters which cannot successfully be serialized to JSON.
106///
107/// # Example
108///
109/// ```rust,standalone_crate
110/// use subxt_rpcs::client::{ rpc_params, RpcParams };
111///
112/// // If you provide no params you get `None` back
113/// let params: RpcParams = rpc_params![];
114/// assert!(params.build().is_none());
115///
116/// // If you provide params you get `Some<Box<RawValue>>` back.
117/// let params: RpcParams = rpc_params![1, true, "foo"];
118/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]");
119/// ```
120#[macro_export]
121macro_rules! rpc_params {
122    ($($p:expr), *) => {{
123        // May be unused if empty; no params.
124        #[allow(unused_mut)]
125        let mut params = $crate::client::RpcParams::new();
126        $(
127            params.push($p).expect("values passed to rpc_params! must be serializable to JSON");
128        )*
129        params
130    }}
131}
132pub use rpc_params;
133
134/// This represents the parameters passed to an [`RpcClient`], and exists to
135/// enforce that parameters are provided in the correct format.
136///
137/// Prefer to use the [`rpc_params!`] macro for simpler creation of these.
138///
139/// # Example
140///
141/// ```rust,standalone_crate
142/// use subxt_rpcs::client::RpcParams;
143///
144/// let mut params = RpcParams::new();
145/// params.push(1).unwrap();
146/// params.push(true).unwrap();
147/// params.push("foo").unwrap();
148///
149/// assert_eq!(params.build().unwrap().get(), "[1,true,\"foo\"]");
150/// ```
151#[derive(Debug, Clone, Default)]
152pub struct RpcParams(Vec<u8>);
153
154impl RpcParams {
155    /// Create a new empty set of [`RpcParams`].
156    pub fn new() -> Self {
157        Self(Vec::new())
158    }
159    /// Push a parameter into our [`RpcParams`]. This serializes it to JSON
160    /// in the process, and so will return an error if this is not possible.
161    pub fn push<P: Serialize>(&mut self, param: P) -> Result<(), Error> {
162        if self.0.is_empty() {
163            self.0.push(b'[');
164        } else {
165            self.0.push(b',')
166        }
167        serde_json::to_writer(&mut self.0, &param).map_err(Error::Deserialization)?;
168        Ok(())
169    }
170    /// Build a [`RawValue`] from our params, returning `None` if no parameters
171    /// were provided.
172    pub fn build(mut self) -> Option<Box<RawValue>> {
173        if self.0.is_empty() {
174            None
175        } else {
176            self.0.push(b']');
177            let s = unsafe { String::from_utf8_unchecked(self.0) };
178            Some(RawValue::from_string(s).expect("Should be valid JSON"))
179        }
180    }
181}
182
183/// A generic RPC Subscription. This implements [`Stream`], and so most of
184/// the functionality you'll need to interact with it comes from the
185/// [`StreamExt`] extension trait.
186pub struct RpcSubscription<Res> {
187    inner: RawRpcSubscription,
188    _marker: std::marker::PhantomData<Res>,
189}
190
191impl<Res> std::fmt::Debug for RpcSubscription<Res> {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        f.debug_struct("RpcSubscription")
194            .field("inner", &"RawRpcSubscription")
195            .field("_marker", &self._marker)
196            .finish()
197    }
198}
199
200impl<Res> RpcSubscription<Res> {
201    /// Creates a new [`RpcSubscription`].
202    pub fn new(inner: RawRpcSubscription) -> Self {
203        Self {
204            inner,
205            _marker: std::marker::PhantomData,
206        }
207    }
208
209    /// Obtain the ID associated with this subscription.
210    pub fn subscription_id(&self) -> Option<&str> {
211        self.inner.id.as_deref()
212    }
213}
214
215impl<Res: DeserializeOwned> RpcSubscription<Res> {
216    /// Returns the next item in the stream. This is just a wrapper around
217    /// [`StreamExt::next()`] so that you can avoid the extra import.
218    pub async fn next(&mut self) -> Option<Result<Res, Error>> {
219        StreamExt::next(self).await
220    }
221}
222
223impl<Res> std::marker::Unpin for RpcSubscription<Res> {}
224
225impl<Res: DeserializeOwned> Stream for RpcSubscription<Res> {
226    type Item = Result<Res, Error>;
227
228    fn poll_next(
229        mut self: Pin<&mut Self>,
230        cx: &mut std::task::Context<'_>,
231    ) -> Poll<Option<Self::Item>> {
232        let res = futures::ready!(self.inner.stream.poll_next_unpin(cx));
233
234        // Decode the inner RawValue to the type we're expecting and map
235        // any errors to the right shape:
236        let res = res.map(|r| {
237            r.and_then(|raw_val| {
238                serde_json::from_str(raw_val.get()).map_err(Error::Deserialization)
239            })
240        });
241
242        Poll::Ready(res)
243    }
244}