forest/rpc/
client.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! # Design Goals
5//! - use [`jsonrpsee`] clients and primitives.
6//! - Support [`rpc::Request`](crate::rpc::Request).
7//! - Support different
8//!   - endpoint paths (`v0`, `v1`).
9//!   - communication protocols (`ws`, `http`).
10//! - Support per-request timeouts.
11
12use std::env;
13use std::fmt::{self, Debug};
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use anyhow::bail;
18use futures::future::Either;
19use http::{HeaderMap, HeaderValue, header};
20use jsonrpsee::core::ClientError;
21use jsonrpsee::core::client::ClientT as _;
22use jsonrpsee::core::params::{ArrayParams, ObjectParams};
23use jsonrpsee::core::traits::ToRpcParams;
24use serde::de::DeserializeOwned;
25use tracing::{Instrument, Level, debug};
26use url::Url;
27
28use super::{ApiPaths, MAX_REQUEST_BODY_SIZE, MAX_RESPONSE_BODY_SIZE, Request};
29
30/// A JSON-RPC client that can dispatch either a [`crate::rpc::Request`] to a single URL.
31pub struct Client {
32    /// SHOULD end in a slash, due to our use of [`Url::join`].
33    base_url: Url,
34    token: Option<String>,
35    // just having these versions inline is easier than using a map
36    v0: tokio::sync::OnceCell<UrlClient>,
37    v1: tokio::sync::OnceCell<UrlClient>,
38    v2: tokio::sync::OnceCell<UrlClient>,
39}
40
41impl Client {
42    /// Use either the URL in the environment or a default.
43    ///
44    /// If `token` is provided, use that over the token in either of the above.
45    pub fn default_or_from_env(token: Option<&str>) -> anyhow::Result<Self> {
46        static DEFAULT: LazyLock<Url> = LazyLock::new(|| "http://127.0.0.1:2345/".parse().unwrap());
47
48        let mut base_url = match env::var("FULLNODE_API_INFO") {
49            Ok(it) => {
50                let crate::utils::UrlFromMultiAddr(url) = it.parse()?;
51                url
52            }
53            Err(env::VarError::NotPresent) => DEFAULT.clone(),
54            Err(e @ env::VarError::NotUnicode(_)) => bail!(e),
55        };
56        if token.is_some() && base_url.set_password(token).is_err() {
57            bail!("couldn't set override password")
58        }
59        // Set default token if not provided
60        if token.is_none() && base_url.password().is_none() {
61            let client_config = crate::cli_shared::cli::Client::default();
62            let default_token_path = client_config.default_rpc_token_path();
63            if default_token_path.is_file() {
64                if let Ok(token) = std::fs::read_to_string(&default_token_path) {
65                    if base_url.set_password(Some(token.trim())).is_ok() {
66                        tracing::debug!("Loaded the default RPC token");
67                    } else {
68                        tracing::warn!("Failed to set the default RPC token");
69                    }
70                } else {
71                    tracing::warn!("Failed to load the default token file");
72                }
73            }
74        }
75        Ok(Self::from_url(base_url))
76    }
77    pub fn from_url(mut base_url: Url) -> Self {
78        let token = base_url.password().map(Into::into);
79        let _defer = base_url.set_password(None);
80        Self {
81            token,
82            base_url,
83            v0: Default::default(),
84            v1: Default::default(),
85            v2: Default::default(),
86        }
87    }
88    pub fn base_url(&self) -> &Url {
89        &self.base_url
90    }
91    pub async fn call<T: crate::lotus_json::HasLotusJson + std::fmt::Debug>(
92        &self,
93        req: Request<T>,
94    ) -> Result<T, ClientError> {
95        let max_api_path = req
96            .api_path()
97            .map_err(|e| ClientError::Custom(e.to_string()))?;
98        let Request {
99            method_name,
100            params,
101            timeout,
102            ..
103        } = req;
104        let method_name = method_name.as_ref();
105        let client = self.get_or_init_client(max_api_path).await?;
106        let span = tracing::debug_span!("request", method = %method_name, url = %client.url);
107        let work = async {
108            // jsonrpsee's clients have a global `timeout`, but not a per-request timeout, which
109            // RpcRequest expects.
110            // So shim in our own timeout
111            let result_or_timeout = tokio::time::timeout(
112                timeout,
113                match params {
114                    serde_json::Value::Null => Either::Left(Either::Left(
115                        client.request::<T::LotusJson, _>(method_name, ArrayParams::new()),
116                    )),
117                    serde_json::Value::Array(it) => {
118                        let mut params = ArrayParams::new();
119                        for param in it {
120                            params.insert(param)?
121                        }
122                        trace_params(params.clone());
123                        Either::Left(Either::Right(client.request(method_name, params)))
124                    }
125                    serde_json::Value::Object(it) => {
126                        let mut params = ObjectParams::new();
127                        for (name, param) in it {
128                            params.insert(&name, param)?
129                        }
130                        trace_params(params.clone());
131                        Either::Right(client.request(method_name, params))
132                    }
133                    prim @ (serde_json::Value::Bool(_)
134                    | serde_json::Value::Number(_)
135                    | serde_json::Value::String(_)) => {
136                        return Err(ClientError::Custom(format!(
137                            "invalid parameter type: `{prim}`"
138                        )));
139                    }
140                },
141            )
142            .await;
143            let result = match result_or_timeout {
144                Ok(Ok(it)) => Ok(T::from_lotus_json(it)),
145                Ok(Err(e)) => Err(e),
146                Err(_) => Err(ClientError::RequestTimeout),
147            };
148            debug!(?result);
149            result
150        };
151        work.instrument(span.or_current()).await
152    }
153    async fn get_or_init_client(&self, path: ApiPaths) -> Result<&UrlClient, ClientError> {
154        match path {
155            ApiPaths::V0 => &self.v0,
156            ApiPaths::V1 => &self.v1,
157            ApiPaths::V2 => &self.v2,
158        }
159        .get_or_try_init(|| async {
160            let url = self.base_url.join(path.path()).map_err(|it| {
161                ClientError::Custom(format!("creating url for endpoint failed: {it}"))
162            })?;
163            UrlClient::new(url, self.token.clone()).await
164        })
165        .await
166    }
167}
168
169fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) {
170    if tracing::enabled!(Level::TRACE) {
171        match params.to_rpc_params() {
172            Ok(Some(it)) => tracing::trace!(params = %it),
173            Ok(None) => tracing::trace!("no params"),
174            Err(error) => tracing::trace!(%error, "couldn't decode params"),
175        }
176    }
177}
178
179/// Represents a single, perhaps persistent connection to a URL over which requests
180/// can be made using [`jsonrpsee`] primitives.
181struct UrlClient {
182    url: Url,
183    inner: UrlClientInner,
184}
185
186impl Debug for UrlClient {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        f.debug_struct("OneClient")
189            .field("url", &self.url)
190            .finish_non_exhaustive()
191    }
192}
193
194impl UrlClient {
195    async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
196        const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves.
197        let headers = match token.into() {
198            Some(token) => HeaderMap::from_iter([(
199                header::AUTHORIZATION,
200                match HeaderValue::try_from(format!("Bearer {token}")) {
201                    Ok(token) => token,
202                    Err(e) => {
203                        return Err(ClientError::Custom(format!(
204                            "Invalid authorization token: {e}",
205                        )));
206                    }
207                },
208            )]),
209            None => HeaderMap::new(),
210        };
211        let inner = match url.scheme() {
212            "ws" | "wss" => UrlClientInner::Ws(
213                jsonrpsee::ws_client::WsClientBuilder::new()
214                    .set_headers(headers)
215                    .max_request_size(MAX_REQUEST_BODY_SIZE)
216                    .max_response_size(MAX_RESPONSE_BODY_SIZE)
217                    .request_timeout(ONE_DAY)
218                    .build(&url)
219                    .await?,
220            ),
221            "http" | "https" => UrlClientInner::Https(
222                jsonrpsee::http_client::HttpClientBuilder::new()
223                    .set_headers(headers)
224                    .max_request_size(MAX_REQUEST_BODY_SIZE)
225                    .max_response_size(MAX_RESPONSE_BODY_SIZE)
226                    .request_timeout(ONE_DAY)
227                    .build(&url)?,
228            ),
229            it => {
230                return Err(ClientError::Custom(format!("Unsupported URL scheme: {it}")));
231            }
232        };
233        Ok(Self { url, inner })
234    }
235}
236
237#[allow(clippy::large_enum_variant)]
238enum UrlClientInner {
239    Ws(jsonrpsee::ws_client::WsClient),
240    Https(jsonrpsee::http_client::HttpClient),
241}
242
243impl jsonrpsee::core::client::ClientT for UrlClient {
244    fn notification<Params>(
245        &self,
246        method: &str,
247        params: Params,
248    ) -> impl Future<Output = Result<(), jsonrpsee::core::client::Error>> + Send
249    where
250        Params: ToRpcParams + Send,
251    {
252        match &self.inner {
253            UrlClientInner::Ws(it) => Either::Left(it.notification(method, params)),
254            UrlClientInner::Https(it) => Either::Right(it.notification(method, params)),
255        }
256    }
257
258    fn request<R, Params>(
259        &self,
260        method: &str,
261        params: Params,
262    ) -> impl Future<Output = Result<R, jsonrpsee::core::client::Error>> + Send
263    where
264        R: DeserializeOwned,
265        Params: ToRpcParams + Send,
266    {
267        match &self.inner {
268            UrlClientInner::Ws(it) => Either::Left(it.request(method, params)),
269            UrlClientInner::Https(it) => Either::Right(it.request(method, params)),
270        }
271    }
272
273    fn batch_request<'a, R>(
274        &self,
275        batch: jsonrpsee::core::params::BatchRequestBuilder<'a>,
276    ) -> impl Future<
277        Output = Result<
278            jsonrpsee::core::client::BatchResponse<'a, R>,
279            jsonrpsee::core::client::Error,
280        >,
281    > + Send
282    where
283        R: DeserializeOwned + fmt::Debug + 'a,
284    {
285        match &self.inner {
286            UrlClientInner::Ws(it) => Either::Left(it.batch_request(batch)),
287            UrlClientInner::Https(it) => Either::Right(it.batch_request(batch)),
288        }
289    }
290}
291
292impl jsonrpsee::core::client::SubscriptionClientT for UrlClient {
293    fn subscribe<'a, N, Params>(
294        &self,
295        subscribe_method: &'a str,
296        params: Params,
297        unsubscribe_method: &'a str,
298    ) -> impl Future<
299        Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
300    >
301    where
302        Params: ToRpcParams + Send,
303        N: DeserializeOwned,
304    {
305        match &self.inner {
306            UrlClientInner::Ws(it) => {
307                Either::Left(it.subscribe(subscribe_method, params, unsubscribe_method))
308            }
309            UrlClientInner::Https(it) => {
310                Either::Right(it.subscribe(subscribe_method, params, unsubscribe_method))
311            }
312        }
313    }
314
315    fn subscribe_to_method<N>(
316        &self,
317        method: &str,
318    ) -> impl Future<
319        Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
320    >
321    where
322        N: DeserializeOwned,
323    {
324        match &self.inner {
325            UrlClientInner::Ws(it) => Either::Left(it.subscribe_to_method(method)),
326            UrlClientInner::Https(it) => Either::Right(it.subscribe_to_method(method)),
327        }
328    }
329}