Skip to main content

forest/rpc/
client.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
//! # Design Goals
//! - Use [`jsonrpsee`] clients and primitives.
//! - Support [`rpc::Request`](crate::rpc::Request).
//! - Support different
//!   - endpoint paths (`v0`, `v1`).
//!   - communication protocols (`ws`, `http`).
//! - Support per-request timeouts.
11
12use std::env;
13use std::fmt::{self, Debug};
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use anyhow::bail;
18use futures::future::Either;
19use http::{HeaderMap, HeaderValue, header};
20use jsonrpsee::core::ClientError;
21use jsonrpsee::core::client::ClientT as _;
22use jsonrpsee::core::params::{ArrayParams, ObjectParams};
23use jsonrpsee::core::traits::ToRpcParams;
24use serde::de::DeserializeOwned;
25use tracing::{Instrument, Level, debug};
26use url::Url;
27
28use super::{ApiPaths, MAX_REQUEST_BODY_SIZE, MAX_RESPONSE_BODY_SIZE, Request};
29
/// A JSON-RPC client that can dispatch a [`crate::rpc::Request`] to a single URL.
///
/// One underlying connection is kept per API path; each is created on first use
/// and reused afterwards.
pub struct Client {
    /// SHOULD end in a slash, due to our use of [`Url::join`].
    base_url: Url,
    /// Token sent as a `Bearer` authorization header, when present.
    token: Option<String>,
    // just having these versions inline is easier than using a map
    v0: tokio::sync::OnceCell<UrlClient>,
    v1: tokio::sync::OnceCell<UrlClient>,
    v2: tokio::sync::OnceCell<UrlClient>,
}
40
41impl Client {
42    /// Use either the URL in the environment or a default.
43    ///
44    /// If `token` is provided, use that over the token in either of the above.
45    pub fn default_or_from_env(token: Option<&str>) -> anyhow::Result<Self> {
46        static DEFAULT: LazyLock<Url> = LazyLock::new(|| "http://127.0.0.1:2345/".parse().unwrap());
47
48        let mut base_url = match env::var("FULLNODE_API_INFO") {
49            Ok(it) => {
50                let crate::utils::UrlFromMultiAddr(url) = it.parse()?;
51                url
52            }
53            Err(env::VarError::NotPresent) => DEFAULT.clone(),
54            Err(e @ env::VarError::NotUnicode(_)) => bail!(e),
55        };
56        if token.is_some() && base_url.set_password(token).is_err() {
57            bail!("couldn't set override password")
58        }
59        // Set default token if not provided
60        if token.is_none() && base_url.password().is_none() {
61            let client_config = crate::cli_shared::cli::Client::default();
62            let default_token_path = client_config.default_rpc_token_path();
63            if default_token_path.is_file() {
64                if let Ok(token) = std::fs::read_to_string(&default_token_path) {
65                    if base_url.set_password(Some(token.trim())).is_ok() {
66                        tracing::debug!("Loaded the default RPC token");
67                    } else {
68                        tracing::warn!("Failed to set the default RPC token");
69                    }
70                } else {
71                    tracing::warn!("Failed to load the default token file");
72                }
73            }
74        }
75        Ok(Self::from_url(base_url))
76    }
77    pub fn from_url(mut base_url: Url) -> Self {
78        let token = base_url.password().map(Into::into);
79        let _defer = base_url.set_password(None);
80        Self {
81            token,
82            base_url,
83            v0: Default::default(),
84            v1: Default::default(),
85            v2: Default::default(),
86        }
87    }
88    pub fn base_url(&self) -> &Url {
89        &self.base_url
90    }
91    pub async fn call<T: crate::lotus_json::HasLotusJson + std::fmt::Debug>(
92        &self,
93        req: Request<T>,
94    ) -> Result<T, ClientError> {
95        let api_path = req.api_path;
96        let Request {
97            method_name,
98            params,
99            timeout,
100            ..
101        } = req;
102        let method_name = method_name.as_ref();
103        let client = self.get_or_init_client(api_path).await?;
104        let span = tracing::debug_span!("request", method = %method_name, url = %client.url);
105        let work = async {
106            // jsonrpsee's clients have a global `timeout`, but not a per-request timeout, which
107            // RpcRequest expects.
108            // So shim in our own timeout
109            let result_or_timeout = tokio::time::timeout(
110                timeout,
111                match params {
112                    serde_json::Value::Null => Either::Left(Either::Left(
113                        client.request::<T::LotusJson, _>(method_name, ArrayParams::new()),
114                    )),
115                    serde_json::Value::Array(it) => {
116                        let mut params = ArrayParams::new();
117                        for param in it {
118                            params.insert(param)?
119                        }
120                        trace_params(params.clone());
121                        Either::Left(Either::Right(client.request(method_name, params)))
122                    }
123                    serde_json::Value::Object(it) => {
124                        let mut params = ObjectParams::new();
125                        for (name, param) in it {
126                            params.insert(&name, param)?
127                        }
128                        trace_params(params.clone());
129                        Either::Right(client.request(method_name, params))
130                    }
131                    prim @ (serde_json::Value::Bool(_)
132                    | serde_json::Value::Number(_)
133                    | serde_json::Value::String(_)) => {
134                        return Err(ClientError::Custom(format!(
135                            "invalid parameter type: `{prim}`"
136                        )));
137                    }
138                },
139            )
140            .await;
141            let result = match result_or_timeout {
142                Ok(Ok(it)) => Ok(T::from_lotus_json(it)),
143                Ok(Err(e)) => Err(e),
144                Err(_) => Err(ClientError::RequestTimeout),
145            };
146            debug!(?result);
147            result
148        };
149        work.instrument(span.or_current()).await
150    }
151    async fn get_or_init_client(&self, path: ApiPaths) -> Result<&UrlClient, ClientError> {
152        match path {
153            ApiPaths::V0 => &self.v0,
154            ApiPaths::V1 => &self.v1,
155            ApiPaths::V2 => &self.v2,
156        }
157        .get_or_try_init(|| async {
158            let url = self.base_url.join(path.path()).map_err(|it| {
159                ClientError::Custom(format!("creating url for endpoint failed: {it}"))
160            })?;
161            UrlClient::new(url, self.token.clone()).await
162        })
163        .await
164    }
165}
166
167fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) {
168    if tracing::enabled!(Level::TRACE) {
169        match params.to_rpc_params() {
170            Ok(Some(it)) => tracing::trace!(params = %it),
171            Ok(None) => tracing::trace!("no params"),
172            Err(error) => tracing::trace!(%error, "couldn't decode params"),
173        }
174    }
175}
176
/// Represents a single, perhaps persistent connection to a URL over which requests
/// can be made using [`jsonrpsee`] primitives.
struct UrlClient {
    /// The full endpoint URL this client was built for.
    url: Url,
    /// The transport-specific client that requests are forwarded to.
    inner: UrlClientInner,
}
183
184impl Debug for UrlClient {
185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186        f.debug_struct("OneClient")
187            .field("url", &self.url)
188            .finish_non_exhaustive()
189    }
190}
191
192impl UrlClient {
193    async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
194        const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves.
195        let headers = match token.into() {
196            Some(token) => HeaderMap::from_iter([(
197                header::AUTHORIZATION,
198                match HeaderValue::try_from(format!("Bearer {token}")) {
199                    Ok(token) => token,
200                    Err(e) => {
201                        return Err(ClientError::Custom(format!(
202                            "Invalid authorization token: {e}",
203                        )));
204                    }
205                },
206            )]),
207            None => HeaderMap::new(),
208        };
209        let inner = match url.scheme() {
210            "ws" | "wss" => UrlClientInner::Ws(
211                jsonrpsee::ws_client::WsClientBuilder::new()
212                    .set_headers(headers)
213                    .max_request_size(MAX_REQUEST_BODY_SIZE)
214                    .max_response_size(MAX_RESPONSE_BODY_SIZE)
215                    .request_timeout(ONE_DAY)
216                    .build(&url)
217                    .await?,
218            ),
219            "http" | "https" => UrlClientInner::Https(
220                jsonrpsee::http_client::HttpClientBuilder::new()
221                    .set_headers(headers)
222                    .max_request_size(MAX_REQUEST_BODY_SIZE)
223                    .max_response_size(MAX_RESPONSE_BODY_SIZE)
224                    .request_timeout(ONE_DAY)
225                    .build(&url)?,
226            ),
227            it => {
228                return Err(ClientError::Custom(format!("Unsupported URL scheme: {it}")));
229            }
230        };
231        Ok(Self { url, inner })
232    }
233}
234
/// The transport-specific [`jsonrpsee`] client backing a [`UrlClient`].
// NOTE(review): presumably the two client types differ a lot in size, hence the
// lint allowance — confirm before boxing either variant.
#[allow(clippy::large_enum_variant)]
enum UrlClientInner {
    /// WebSocket client, selected for `ws` and `wss` URLs.
    Ws(jsonrpsee::ws_client::WsClient),
    /// HTTP client, selected for both `http` and `https` URLs.
    Https(jsonrpsee::http_client::HttpClient),
}
240
241impl jsonrpsee::core::client::ClientT for UrlClient {
242    fn notification<Params>(
243        &self,
244        method: &str,
245        params: Params,
246    ) -> impl Future<Output = Result<(), jsonrpsee::core::client::Error>> + Send
247    where
248        Params: ToRpcParams + Send,
249    {
250        match &self.inner {
251            UrlClientInner::Ws(it) => Either::Left(it.notification(method, params)),
252            UrlClientInner::Https(it) => Either::Right(it.notification(method, params)),
253        }
254    }
255
256    fn request<R, Params>(
257        &self,
258        method: &str,
259        params: Params,
260    ) -> impl Future<Output = Result<R, jsonrpsee::core::client::Error>> + Send
261    where
262        R: DeserializeOwned,
263        Params: ToRpcParams + Send,
264    {
265        match &self.inner {
266            UrlClientInner::Ws(it) => Either::Left(it.request(method, params)),
267            UrlClientInner::Https(it) => Either::Right(it.request(method, params)),
268        }
269    }
270
271    fn batch_request<'a, R>(
272        &self,
273        batch: jsonrpsee::core::params::BatchRequestBuilder<'a>,
274    ) -> impl Future<
275        Output = Result<
276            jsonrpsee::core::client::BatchResponse<'a, R>,
277            jsonrpsee::core::client::Error,
278        >,
279    > + Send
280    where
281        R: DeserializeOwned + fmt::Debug + 'a,
282    {
283        match &self.inner {
284            UrlClientInner::Ws(it) => Either::Left(it.batch_request(batch)),
285            UrlClientInner::Https(it) => Either::Right(it.batch_request(batch)),
286        }
287    }
288}
289
290impl jsonrpsee::core::client::SubscriptionClientT for UrlClient {
291    fn subscribe<'a, N, Params>(
292        &self,
293        subscribe_method: &'a str,
294        params: Params,
295        unsubscribe_method: &'a str,
296    ) -> impl Future<
297        Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
298    >
299    where
300        Params: ToRpcParams + Send,
301        N: DeserializeOwned,
302    {
303        match &self.inner {
304            UrlClientInner::Ws(it) => {
305                Either::Left(it.subscribe(subscribe_method, params, unsubscribe_method))
306            }
307            UrlClientInner::Https(it) => {
308                Either::Right(it.subscribe(subscribe_method, params, unsubscribe_method))
309            }
310        }
311    }
312
313    fn subscribe_to_method<N>(
314        &self,
315        method: &str,
316    ) -> impl Future<
317        Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
318    >
319    where
320        N: DeserializeOwned,
321    {
322        match &self.inner {
323            UrlClientInner::Ws(it) => Either::Left(it.subscribe_to_method(method)),
324            UrlClientInner::Https(it) => Either::Right(it.subscribe_to_method(method)),
325        }
326    }
327}