subxt_rpcs/client/
rpc_client.rs1use super::{RawRpcSubscription, RpcClientT};
6use crate::Error;
7use futures::{Stream, StreamExt};
8use serde::{Serialize, de::DeserializeOwned};
9use serde_json::value::RawValue;
10use std::{pin::Pin, sync::Arc, task::Poll};
11
12#[derive(Clone)]
15pub struct RpcClient {
16 client: Arc<dyn RpcClientT>,
17}
18
19impl RpcClient {
20 #[cfg(feature = "jsonrpsee")]
21 #[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))]
22 pub async fn from_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
26 crate::utils::validate_url_is_secure(url.as_ref())?;
27 RpcClient::from_insecure_url(url).await
28 }
29
30 #[cfg(feature = "jsonrpsee")]
31 pub async fn from_insecure_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
35 let client = super::jsonrpsee_client(url.as_ref())
36 .await
37 .map_err(|e| Error::Client(Box::new(e)))?;
38 Ok(Self::new(client))
39 }
40
41 pub fn new<R: RpcClientT>(client: R) -> Self {
43 RpcClient {
44 client: Arc::new(client),
45 }
46 }
47
48 pub async fn request<Res: DeserializeOwned>(
53 &self,
54 method: &str,
55 params: RpcParams,
56 ) -> Result<Res, Error> {
57 let res = self.client.request_raw(method, params.build()).await?;
58 let val = serde_json::from_str(res.get()).map_err(Error::Deserialization)?;
59 Ok(val)
60 }
61
62 pub async fn subscribe<Res: DeserializeOwned>(
68 &self,
69 sub: &str,
70 params: RpcParams,
71 unsub: &str,
72 ) -> Result<RpcSubscription<Res>, Error> {
73 let sub = self
74 .client
75 .subscribe_raw(sub, params.build(), unsub)
76 .await?;
77 Ok(RpcSubscription::new(sub))
78 }
79}
80
81impl<C: RpcClientT> From<C> for RpcClient {
82 fn from(client: C) -> Self {
83 RpcClient::new(client)
84 }
85}
86
87impl std::fmt::Debug for RpcClient {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_tuple("RpcClient").finish()
90 }
91}
92
93impl std::ops::Deref for RpcClient {
94 type Target = dyn RpcClientT;
95 fn deref(&self) -> &Self::Target {
96 &*self.client
97 }
98}
99
100#[macro_export]
121macro_rules! rpc_params {
122 ($($p:expr), *) => {{
123 #[allow(unused_mut)]
125 let mut params = $crate::client::RpcParams::new();
126 $(
127 params.push($p).expect("values passed to rpc_params! must be serializable to JSON");
128 )*
129 params
130 }}
131}
132pub use rpc_params;
133
134#[derive(Debug, Clone, Default)]
152pub struct RpcParams(Vec<u8>);
153
154impl RpcParams {
155 pub fn new() -> Self {
157 Self(Vec::new())
158 }
159 pub fn push<P: Serialize>(&mut self, param: P) -> Result<(), Error> {
162 if self.0.is_empty() {
163 self.0.push(b'[');
164 } else {
165 self.0.push(b',')
166 }
167 serde_json::to_writer(&mut self.0, ¶m).map_err(Error::Deserialization)?;
168 Ok(())
169 }
170 pub fn build(mut self) -> Option<Box<RawValue>> {
173 if self.0.is_empty() {
174 None
175 } else {
176 self.0.push(b']');
177 let s = unsafe { String::from_utf8_unchecked(self.0) };
178 Some(RawValue::from_string(s).expect("Should be valid JSON"))
179 }
180 }
181}
182
183pub struct RpcSubscription<Res> {
187 inner: RawRpcSubscription,
188 _marker: std::marker::PhantomData<Res>,
189}
190
191impl<Res> std::fmt::Debug for RpcSubscription<Res> {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 f.debug_struct("RpcSubscription")
194 .field("inner", &"RawRpcSubscription")
195 .field("_marker", &self._marker)
196 .finish()
197 }
198}
199
200impl<Res> RpcSubscription<Res> {
201 pub fn new(inner: RawRpcSubscription) -> Self {
203 Self {
204 inner,
205 _marker: std::marker::PhantomData,
206 }
207 }
208
209 pub fn subscription_id(&self) -> Option<&str> {
211 self.inner.id.as_deref()
212 }
213}
214
215impl<Res: DeserializeOwned> RpcSubscription<Res> {
216 pub async fn next(&mut self) -> Option<Result<Res, Error>> {
219 StreamExt::next(self).await
220 }
221}
222
223impl<Res> std::marker::Unpin for RpcSubscription<Res> {}
224
225impl<Res: DeserializeOwned> Stream for RpcSubscription<Res> {
226 type Item = Result<Res, Error>;
227
228 fn poll_next(
229 mut self: Pin<&mut Self>,
230 cx: &mut std::task::Context<'_>,
231 ) -> Poll<Option<Self::Item>> {
232 let res = futures::ready!(self.inner.stream.poll_next_unpin(cx));
233
234 let res = res.map(|r| {
237 r.and_then(|raw_val| {
238 serde_json::from_str(raw_val.get()).map_err(Error::Deserialization)
239 })
240 });
241
242 Poll::Ready(res)
243 }
244}