1use std::env;
13use std::fmt::{self, Debug};
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use anyhow::bail;
18use futures::future::Either;
19use http::{HeaderMap, HeaderValue, header};
20use jsonrpsee::core::ClientError;
21use jsonrpsee::core::client::ClientT as _;
22use jsonrpsee::core::params::{ArrayParams, ObjectParams};
23use jsonrpsee::core::traits::ToRpcParams;
24use serde::de::DeserializeOwned;
25use tracing::{Instrument, Level, debug};
26use url::Url;
27
28use super::{ApiPaths, MAX_REQUEST_BODY_SIZE, MAX_RESPONSE_BODY_SIZE, Request};
29
30pub struct Client {
32 base_url: Url,
34 token: Option<String>,
35 v0: tokio::sync::OnceCell<UrlClient>,
37 v1: tokio::sync::OnceCell<UrlClient>,
38 v2: tokio::sync::OnceCell<UrlClient>,
39}
40
41impl Client {
42 pub fn default_or_from_env(token: Option<&str>) -> anyhow::Result<Self> {
46 static DEFAULT: LazyLock<Url> = LazyLock::new(|| "http://127.0.0.1:2345/".parse().unwrap());
47
48 let mut base_url = match env::var("FULLNODE_API_INFO") {
49 Ok(it) => {
50 let crate::utils::UrlFromMultiAddr(url) = it.parse()?;
51 url
52 }
53 Err(env::VarError::NotPresent) => DEFAULT.clone(),
54 Err(e @ env::VarError::NotUnicode(_)) => bail!(e),
55 };
56 if token.is_some() && base_url.set_password(token).is_err() {
57 bail!("couldn't set override password")
58 }
59 if token.is_none() && base_url.password().is_none() {
61 let client_config = crate::cli_shared::cli::Client::default();
62 let default_token_path = client_config.default_rpc_token_path();
63 if default_token_path.is_file() {
64 if let Ok(token) = std::fs::read_to_string(&default_token_path) {
65 if base_url.set_password(Some(token.trim())).is_ok() {
66 tracing::debug!("Loaded the default RPC token");
67 } else {
68 tracing::warn!("Failed to set the default RPC token");
69 }
70 } else {
71 tracing::warn!("Failed to load the default token file");
72 }
73 }
74 }
75 Ok(Self::from_url(base_url))
76 }
77 pub fn from_url(mut base_url: Url) -> Self {
78 let token = base_url.password().map(Into::into);
79 let _defer = base_url.set_password(None);
80 Self {
81 token,
82 base_url,
83 v0: Default::default(),
84 v1: Default::default(),
85 v2: Default::default(),
86 }
87 }
88 pub fn base_url(&self) -> &Url {
89 &self.base_url
90 }
91 pub async fn call<T: crate::lotus_json::HasLotusJson + std::fmt::Debug>(
92 &self,
93 req: Request<T>,
94 ) -> Result<T, ClientError> {
95 let max_api_path = req
96 .api_path()
97 .map_err(|e| ClientError::Custom(e.to_string()))?;
98 let Request {
99 method_name,
100 params,
101 timeout,
102 ..
103 } = req;
104 let method_name = method_name.as_ref();
105 let client = self.get_or_init_client(max_api_path).await?;
106 let span = tracing::debug_span!("request", method = %method_name, url = %client.url);
107 let work = async {
108 let result_or_timeout = tokio::time::timeout(
112 timeout,
113 match params {
114 serde_json::Value::Null => Either::Left(Either::Left(
115 client.request::<T::LotusJson, _>(method_name, ArrayParams::new()),
116 )),
117 serde_json::Value::Array(it) => {
118 let mut params = ArrayParams::new();
119 for param in it {
120 params.insert(param)?
121 }
122 trace_params(params.clone());
123 Either::Left(Either::Right(client.request(method_name, params)))
124 }
125 serde_json::Value::Object(it) => {
126 let mut params = ObjectParams::new();
127 for (name, param) in it {
128 params.insert(&name, param)?
129 }
130 trace_params(params.clone());
131 Either::Right(client.request(method_name, params))
132 }
133 prim @ (serde_json::Value::Bool(_)
134 | serde_json::Value::Number(_)
135 | serde_json::Value::String(_)) => {
136 return Err(ClientError::Custom(format!(
137 "invalid parameter type: `{prim}`"
138 )));
139 }
140 },
141 )
142 .await;
143 let result = match result_or_timeout {
144 Ok(Ok(it)) => Ok(T::from_lotus_json(it)),
145 Ok(Err(e)) => Err(e),
146 Err(_) => Err(ClientError::RequestTimeout),
147 };
148 debug!(?result);
149 result
150 };
151 work.instrument(span.or_current()).await
152 }
153 async fn get_or_init_client(&self, path: ApiPaths) -> Result<&UrlClient, ClientError> {
154 match path {
155 ApiPaths::V0 => &self.v0,
156 ApiPaths::V1 => &self.v1,
157 ApiPaths::V2 => &self.v2,
158 }
159 .get_or_try_init(|| async {
160 let url = self.base_url.join(path.path()).map_err(|it| {
161 ClientError::Custom(format!("creating url for endpoint failed: {it}"))
162 })?;
163 UrlClient::new(url, self.token.clone()).await
164 })
165 .await
166 }
167}
168
169fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) {
170 if tracing::enabled!(Level::TRACE) {
171 match params.to_rpc_params() {
172 Ok(Some(it)) => tracing::trace!(params = %it),
173 Ok(None) => tracing::trace!("no params"),
174 Err(error) => tracing::trace!(%error, "couldn't decode params"),
175 }
176 }
177}
178
179struct UrlClient {
182 url: Url,
183 inner: UrlClientInner,
184}
185
186impl Debug for UrlClient {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 f.debug_struct("OneClient")
189 .field("url", &self.url)
190 .finish_non_exhaustive()
191 }
192}
193
194impl UrlClient {
195 async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
196 const ONE_DAY: Duration = Duration::from_secs(24 * 3600); let headers = match token.into() {
198 Some(token) => HeaderMap::from_iter([(
199 header::AUTHORIZATION,
200 match HeaderValue::try_from(format!("Bearer {token}")) {
201 Ok(token) => token,
202 Err(e) => {
203 return Err(ClientError::Custom(format!(
204 "Invalid authorization token: {e}",
205 )));
206 }
207 },
208 )]),
209 None => HeaderMap::new(),
210 };
211 let inner = match url.scheme() {
212 "ws" | "wss" => UrlClientInner::Ws(
213 jsonrpsee::ws_client::WsClientBuilder::new()
214 .set_headers(headers)
215 .max_request_size(MAX_REQUEST_BODY_SIZE)
216 .max_response_size(MAX_RESPONSE_BODY_SIZE)
217 .request_timeout(ONE_DAY)
218 .build(&url)
219 .await?,
220 ),
221 "http" | "https" => UrlClientInner::Https(
222 jsonrpsee::http_client::HttpClientBuilder::new()
223 .set_headers(headers)
224 .max_request_size(MAX_REQUEST_BODY_SIZE)
225 .max_response_size(MAX_RESPONSE_BODY_SIZE)
226 .request_timeout(ONE_DAY)
227 .build(&url)?,
228 ),
229 it => {
230 return Err(ClientError::Custom(format!("Unsupported URL scheme: {it}")));
231 }
232 };
233 Ok(Self { url, inner })
234 }
235}
236
237#[allow(clippy::large_enum_variant)]
238enum UrlClientInner {
239 Ws(jsonrpsee::ws_client::WsClient),
240 Https(jsonrpsee::http_client::HttpClient),
241}
242
243impl jsonrpsee::core::client::ClientT for UrlClient {
244 fn notification<Params>(
245 &self,
246 method: &str,
247 params: Params,
248 ) -> impl Future<Output = Result<(), jsonrpsee::core::client::Error>> + Send
249 where
250 Params: ToRpcParams + Send,
251 {
252 match &self.inner {
253 UrlClientInner::Ws(it) => Either::Left(it.notification(method, params)),
254 UrlClientInner::Https(it) => Either::Right(it.notification(method, params)),
255 }
256 }
257
258 fn request<R, Params>(
259 &self,
260 method: &str,
261 params: Params,
262 ) -> impl Future<Output = Result<R, jsonrpsee::core::client::Error>> + Send
263 where
264 R: DeserializeOwned,
265 Params: ToRpcParams + Send,
266 {
267 match &self.inner {
268 UrlClientInner::Ws(it) => Either::Left(it.request(method, params)),
269 UrlClientInner::Https(it) => Either::Right(it.request(method, params)),
270 }
271 }
272
273 fn batch_request<'a, R>(
274 &self,
275 batch: jsonrpsee::core::params::BatchRequestBuilder<'a>,
276 ) -> impl Future<
277 Output = Result<
278 jsonrpsee::core::client::BatchResponse<'a, R>,
279 jsonrpsee::core::client::Error,
280 >,
281 > + Send
282 where
283 R: DeserializeOwned + fmt::Debug + 'a,
284 {
285 match &self.inner {
286 UrlClientInner::Ws(it) => Either::Left(it.batch_request(batch)),
287 UrlClientInner::Https(it) => Either::Right(it.batch_request(batch)),
288 }
289 }
290}
291
292impl jsonrpsee::core::client::SubscriptionClientT for UrlClient {
293 fn subscribe<'a, N, Params>(
294 &self,
295 subscribe_method: &'a str,
296 params: Params,
297 unsubscribe_method: &'a str,
298 ) -> impl Future<
299 Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
300 >
301 where
302 Params: ToRpcParams + Send,
303 N: DeserializeOwned,
304 {
305 match &self.inner {
306 UrlClientInner::Ws(it) => {
307 Either::Left(it.subscribe(subscribe_method, params, unsubscribe_method))
308 }
309 UrlClientInner::Https(it) => {
310 Either::Right(it.subscribe(subscribe_method, params, unsubscribe_method))
311 }
312 }
313 }
314
315 fn subscribe_to_method<N>(
316 &self,
317 method: &str,
318 ) -> impl Future<
319 Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
320 >
321 where
322 N: DeserializeOwned,
323 {
324 match &self.inner {
325 UrlClientInner::Ws(it) => Either::Left(it.subscribe_to_method(method)),
326 UrlClientInner::Https(it) => Either::Right(it.subscribe_to_method(method)),
327 }
328 }
329}