use crate::{
jsonrpc::{
self,
batch::{self, Batch},
Id, Request, Response, Version,
},
method::Method,
types::Empty,
};
use reqwest::{StatusCode, Url};
use serde::{de::DeserializeOwned, Serialize};
use std::{
env,
sync::atomic::{AtomicU32, Ordering},
};
use thiserror::Error;
pub struct Client {
client: reqwest::Client,
url: Url,
id: AtomicU32,
}
impl Client {
pub fn new(url: Url) -> Self {
Self::with_client(reqwest::Client::new(), url)
}
pub fn with_client(client: reqwest::Client, url: Url) -> Self {
Self {
client,
url,
id: Default::default(),
}
}
pub fn from_env() -> Self {
Self::new(
env::var("NODE_URL")
.expect("missing NODE_URL environment variable")
.parse()
.unwrap(),
)
}
fn next_id(&self) -> Id {
Id(self.id.fetch_add(1, Ordering::Relaxed))
}
async fn roundtrip<P, R>(&self, request: P) -> Result<R, ClientError>
where
P: Serialize,
R: DeserializeOwned,
{
let request = serde_json::to_string(&request)?;
let response = self
.client
.post(self.url.clone())
.header("content-type", "application/json")
.body(request)
.send()
.await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
return Err(ClientError::Status(status, body));
}
let result = serde_json::from_str(&body)?;
Ok(result)
}
pub async fn execute<M>(&self, method: M, params: M::Params) -> Result<M::Result, ClientError>
where
M: Method + Serialize,
{
Ok(self
.roundtrip::<_, Response<M>>(Request {
jsonrpc: Version::V2,
method,
params,
id: self.next_id(),
})
.await?
.result?)
}
pub async fn call<M>(&self, method: M) -> Result<M::Result, ClientError>
where
M: Method<Params = Empty> + Serialize,
{
self.execute::<M>(method, Empty).await
}
pub async fn batch<B>(&self, batch: B) -> Result<B::Values, ClientError>
where
B: Batch,
{
let results = self.try_batch(batch).await?;
let values = B::values(results)?;
Ok(values)
}
pub async fn try_batch<B>(&self, batch: B) -> Result<B::Results, ClientError>
where
B: Batch,
{
let mut requests = batch.serialize_requests()?;
for request in &mut requests {
request.id = self.next_id();
}
let mut responses = self.roundtrip::<_, Vec<batch::Response>>(&requests).await?;
if responses.len() != requests.len()
|| responses.iter().any(|response| response.id.is_none())
{
return Err(ClientError::Batch);
}
responses.sort_unstable_by_key(|response| response.id.unwrap().0);
if responses
.iter()
.zip(requests.iter())
.any(|(response, request)| response.id.unwrap() != request.id)
{
return Err(ClientError::Batch);
}
let results = B::deserialize_responses(responses)?;
Ok(results)
}
}
#[derive(Debug, Error)]
pub enum ClientError {
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("{0}: {1}")]
Status(StatusCode, String),
#[error("RPC error: {0}")]
Rpc(#[from] jsonrpc::Error),
#[error("batch responses do not match requests")]
Batch,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
eth,
types::{BlockId, BlockTag, Empty, Hydrated, TransactionCall},
web3,
};
use ethprim::{address, Digest};
use hex_literal::hex;
#[tokio::test]
#[ignore]
async fn connect_to_node() {
let client = Client::from_env();
let version = client.call(web3::ClientVersion).await.unwrap();
println!("client version: {version}");
}
#[tokio::test]
#[ignore]
async fn uses_conversion_types_for_serialization() {
let client = Client::from_env();
let domain = Digest::from_slice(
&client
.execute(
eth::Call,
(
TransactionCall {
to: Some(address!("0x9008D19f58AAbD9eD0D60971565AA8510560ab41")),
input: Some(hex!("f698da25").to_vec()),
..Default::default()
},
BlockId::default(),
),
)
.await
.unwrap(),
);
println!("CoW Protocol domain separator: {domain}");
}
#[tokio::test]
#[ignore]
async fn batch_request() {
let client = Client::from_env();
let (latest, safe) = client
.batch((
(eth::BlockNumber, Empty),
(eth::GetBlockByNumber, (BlockTag::Safe.into(), Hydrated::No)),
))
.await
.unwrap();
println!("Latest block: {latest}");
println!("Safe block: {}", safe.unwrap().number);
}
}