1use std::env;
13use std::fmt::{self, Debug};
14use std::sync::LazyLock;
15use std::time::Duration;
16
17use anyhow::bail;
18use futures::future::Either;
19use http::{HeaderMap, HeaderValue, header};
20use jsonrpsee::core::ClientError;
21use jsonrpsee::core::client::ClientT as _;
22use jsonrpsee::core::params::{ArrayParams, ObjectParams};
23use jsonrpsee::core::traits::ToRpcParams;
24use serde::de::DeserializeOwned;
25use tracing::{Instrument, Level, debug};
26use url::Url;
27
28use super::{ApiPaths, MAX_REQUEST_BODY_SIZE, MAX_RESPONSE_BODY_SIZE, Request};
29
30pub struct Client {
32 base_url: Url,
34 token: Option<String>,
35 v0: tokio::sync::OnceCell<UrlClient>,
37 v1: tokio::sync::OnceCell<UrlClient>,
38 v2: tokio::sync::OnceCell<UrlClient>,
39}
40
41impl Client {
42 pub fn default_or_from_env(token: Option<&str>) -> anyhow::Result<Self> {
46 static DEFAULT: LazyLock<Url> = LazyLock::new(|| "http://127.0.0.1:2345/".parse().unwrap());
47
48 let mut base_url = match env::var("FULLNODE_API_INFO") {
49 Ok(it) => {
50 let crate::utils::UrlFromMultiAddr(url) = it.parse()?;
51 url
52 }
53 Err(env::VarError::NotPresent) => DEFAULT.clone(),
54 Err(e @ env::VarError::NotUnicode(_)) => bail!(e),
55 };
56 if token.is_some() && base_url.set_password(token).is_err() {
57 bail!("couldn't set override password")
58 }
59 if token.is_none() && base_url.password().is_none() {
61 let client_config = crate::cli_shared::cli::Client::default();
62 let default_token_path = client_config.default_rpc_token_path();
63 if default_token_path.is_file() {
64 if let Ok(token) = std::fs::read_to_string(&default_token_path) {
65 if base_url.set_password(Some(token.trim())).is_ok() {
66 tracing::debug!("Loaded the default RPC token");
67 } else {
68 tracing::warn!("Failed to set the default RPC token");
69 }
70 } else {
71 tracing::warn!("Failed to load the default token file");
72 }
73 }
74 }
75 Ok(Self::from_url(base_url))
76 }
77 pub fn from_url(mut base_url: Url) -> Self {
78 let token = base_url.password().map(Into::into);
79 let _defer = base_url.set_password(None);
80 Self {
81 token,
82 base_url,
83 v0: Default::default(),
84 v1: Default::default(),
85 v2: Default::default(),
86 }
87 }
88 pub fn base_url(&self) -> &Url {
89 &self.base_url
90 }
91 pub async fn call<T: crate::lotus_json::HasLotusJson + std::fmt::Debug>(
92 &self,
93 req: Request<T>,
94 ) -> Result<T, ClientError> {
95 let api_path = req.api_path;
96 let Request {
97 method_name,
98 params,
99 timeout,
100 ..
101 } = req;
102 let method_name = method_name.as_ref();
103 let client = self.get_or_init_client(api_path).await?;
104 let span = tracing::debug_span!("request", method = %method_name, url = %client.url);
105 let work = async {
106 let result_or_timeout = tokio::time::timeout(
110 timeout,
111 match params {
112 serde_json::Value::Null => Either::Left(Either::Left(
113 client.request::<T::LotusJson, _>(method_name, ArrayParams::new()),
114 )),
115 serde_json::Value::Array(it) => {
116 let mut params = ArrayParams::new();
117 for param in it {
118 params.insert(param)?
119 }
120 trace_params(params.clone());
121 Either::Left(Either::Right(client.request(method_name, params)))
122 }
123 serde_json::Value::Object(it) => {
124 let mut params = ObjectParams::new();
125 for (name, param) in it {
126 params.insert(&name, param)?
127 }
128 trace_params(params.clone());
129 Either::Right(client.request(method_name, params))
130 }
131 prim @ (serde_json::Value::Bool(_)
132 | serde_json::Value::Number(_)
133 | serde_json::Value::String(_)) => {
134 return Err(ClientError::Custom(format!(
135 "invalid parameter type: `{prim}`"
136 )));
137 }
138 },
139 )
140 .await;
141 let result = match result_or_timeout {
142 Ok(Ok(it)) => Ok(T::from_lotus_json(it)),
143 Ok(Err(e)) => Err(e),
144 Err(_) => Err(ClientError::RequestTimeout),
145 };
146 debug!(?result);
147 result
148 };
149 work.instrument(span.or_current()).await
150 }
151 async fn get_or_init_client(&self, path: ApiPaths) -> Result<&UrlClient, ClientError> {
152 match path {
153 ApiPaths::V0 => &self.v0,
154 ApiPaths::V1 => &self.v1,
155 ApiPaths::V2 => &self.v2,
156 }
157 .get_or_try_init(|| async {
158 let url = self.base_url.join(path.path()).map_err(|it| {
159 ClientError::Custom(format!("creating url for endpoint failed: {it}"))
160 })?;
161 UrlClient::new(url, self.token.clone()).await
162 })
163 .await
164 }
165}
166
167fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) {
168 if tracing::enabled!(Level::TRACE) {
169 match params.to_rpc_params() {
170 Ok(Some(it)) => tracing::trace!(params = %it),
171 Ok(None) => tracing::trace!("no params"),
172 Err(error) => tracing::trace!(%error, "couldn't decode params"),
173 }
174 }
175}
176
177struct UrlClient {
180 url: Url,
181 inner: UrlClientInner,
182}
183
184impl Debug for UrlClient {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 f.debug_struct("OneClient")
187 .field("url", &self.url)
188 .finish_non_exhaustive()
189 }
190}
191
192impl UrlClient {
193 async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
194 const ONE_DAY: Duration = Duration::from_secs(24 * 3600); let headers = match token.into() {
196 Some(token) => HeaderMap::from_iter([(
197 header::AUTHORIZATION,
198 match HeaderValue::try_from(format!("Bearer {token}")) {
199 Ok(token) => token,
200 Err(e) => {
201 return Err(ClientError::Custom(format!(
202 "Invalid authorization token: {e}",
203 )));
204 }
205 },
206 )]),
207 None => HeaderMap::new(),
208 };
209 let inner = match url.scheme() {
210 "ws" | "wss" => UrlClientInner::Ws(
211 jsonrpsee::ws_client::WsClientBuilder::new()
212 .set_headers(headers)
213 .max_request_size(MAX_REQUEST_BODY_SIZE)
214 .max_response_size(MAX_RESPONSE_BODY_SIZE)
215 .request_timeout(ONE_DAY)
216 .build(&url)
217 .await?,
218 ),
219 "http" | "https" => UrlClientInner::Https(
220 jsonrpsee::http_client::HttpClientBuilder::new()
221 .set_headers(headers)
222 .max_request_size(MAX_REQUEST_BODY_SIZE)
223 .max_response_size(MAX_RESPONSE_BODY_SIZE)
224 .request_timeout(ONE_DAY)
225 .build(&url)?,
226 ),
227 it => {
228 return Err(ClientError::Custom(format!("Unsupported URL scheme: {it}")));
229 }
230 };
231 Ok(Self { url, inner })
232 }
233}
234
235#[allow(clippy::large_enum_variant)]
236enum UrlClientInner {
237 Ws(jsonrpsee::ws_client::WsClient),
238 Https(jsonrpsee::http_client::HttpClient),
239}
240
241impl jsonrpsee::core::client::ClientT for UrlClient {
242 fn notification<Params>(
243 &self,
244 method: &str,
245 params: Params,
246 ) -> impl Future<Output = Result<(), jsonrpsee::core::client::Error>> + Send
247 where
248 Params: ToRpcParams + Send,
249 {
250 match &self.inner {
251 UrlClientInner::Ws(it) => Either::Left(it.notification(method, params)),
252 UrlClientInner::Https(it) => Either::Right(it.notification(method, params)),
253 }
254 }
255
256 fn request<R, Params>(
257 &self,
258 method: &str,
259 params: Params,
260 ) -> impl Future<Output = Result<R, jsonrpsee::core::client::Error>> + Send
261 where
262 R: DeserializeOwned,
263 Params: ToRpcParams + Send,
264 {
265 match &self.inner {
266 UrlClientInner::Ws(it) => Either::Left(it.request(method, params)),
267 UrlClientInner::Https(it) => Either::Right(it.request(method, params)),
268 }
269 }
270
271 fn batch_request<'a, R>(
272 &self,
273 batch: jsonrpsee::core::params::BatchRequestBuilder<'a>,
274 ) -> impl Future<
275 Output = Result<
276 jsonrpsee::core::client::BatchResponse<'a, R>,
277 jsonrpsee::core::client::Error,
278 >,
279 > + Send
280 where
281 R: DeserializeOwned + fmt::Debug + 'a,
282 {
283 match &self.inner {
284 UrlClientInner::Ws(it) => Either::Left(it.batch_request(batch)),
285 UrlClientInner::Https(it) => Either::Right(it.batch_request(batch)),
286 }
287 }
288}
289
290impl jsonrpsee::core::client::SubscriptionClientT for UrlClient {
291 fn subscribe<'a, N, Params>(
292 &self,
293 subscribe_method: &'a str,
294 params: Params,
295 unsubscribe_method: &'a str,
296 ) -> impl Future<
297 Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
298 >
299 where
300 Params: ToRpcParams + Send,
301 N: DeserializeOwned,
302 {
303 match &self.inner {
304 UrlClientInner::Ws(it) => {
305 Either::Left(it.subscribe(subscribe_method, params, unsubscribe_method))
306 }
307 UrlClientInner::Https(it) => {
308 Either::Right(it.subscribe(subscribe_method, params, unsubscribe_method))
309 }
310 }
311 }
312
313 fn subscribe_to_method<N>(
314 &self,
315 method: &str,
316 ) -> impl Future<
317 Output = Result<jsonrpsee::core::client::Subscription<N>, jsonrpsee::core::client::Error>,
318 >
319 where
320 N: DeserializeOwned,
321 {
322 match &self.inner {
323 UrlClientInner::Ws(it) => Either::Left(it.subscribe_to_method(method)),
324 UrlClientInner::Https(it) => Either::Right(it.subscribe_to_method(method)),
325 }
326 }
327}