use crate::{Message, Request as KiRequest};
pub use alloy::rpc::json_rpc::ErrorPayload;
pub use alloy::rpc::types::eth::pubsub::SubscriptionResult;
pub use alloy::rpc::types::pubsub::Params;
pub use alloy::rpc::types::{
request::{TransactionInput, TransactionRequest},
Block, BlockId, BlockNumberOrTag, FeeHistory, Filter, FilterBlockOption, Log, Transaction,
TransactionReceipt,
};
pub use alloy::transports::Authorization as AlloyAuthorization;
pub use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, TxHash, U128, U256, U64, U8};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt;
/// Kind of subscription requested in an [`EthAction::SubscribeLogs`].
///
/// Variant names are (de)serialized in camelCase, e.g. `NewHeads` as `"newHeads"`.
/// NOTE(review): the variants look like the standard `eth_subscribe` kinds — confirm
/// against the RPC spec before relying on that.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub enum SubscriptionKind {
NewHeads,
/// The only kind [`Provider::subscribe`] in this file ever requests.
Logs,
NewPendingTransactions,
Syncing,
}
/// Action sent, JSON-serialized, to the eth process. [`Provider`] addresses these to
/// `("our", "eth", "distro", "sys")`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Open a subscription. [`Provider::subscribe`] builds this with
/// [`SubscriptionKind::Logs`] and a serialized `Params::Logs` filter.
SubscribeLogs {
// Caller-chosen identifier; `Provider::unsubscribe` takes a `sub_id` as well
// (presumably the same value — confirm against the eth process).
sub_id: u64,
chain_id: u64,
kind: SubscriptionKind,
// Kind-specific parameters as raw JSON.
params: serde_json::Value,
},
/// Close a subscription; the payload is the `sub_id` passed to [`Provider::unsubscribe`].
UnsubscribeLogs(u64),
/// A single RPC call, e.g. `method` = `"eth_blockNumber"` with JSON `params`.
Request {
chain_id: u64,
method: String,
params: serde_json::Value,
},
}
/// Outcome of a subscription update: an [`EthSub`] on success, an [`EthSubError`] otherwise.
/// NOTE(review): presumably this is what the eth process delivers to subscribers — the
/// sender is not visible in this file.
pub type EthSubResult = Result<EthSub, EthSubError>;
/// Successful subscription update carrying a raw JSON result.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EthSub {
// NOTE(review): presumably the `sub_id` the subscription was opened with — confirm.
pub id: u64,
// Untyped JSON payload; callers decode it themselves.
pub result: serde_json::Value,
}
/// Failed subscription update carrying a textual error.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EthSubError {
// NOTE(review): presumably the `sub_id` of the affected subscription — confirm.
pub id: u64,
// Human-readable error description.
pub error: String,
}
/// Reply from the eth process to an [`EthAction`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthResponse {
/// Bare acknowledgement; [`Provider::subscribe`] and [`Provider::unsubscribe`] treat it as success.
Ok,
/// JSON payload of a request; [`Provider::send_request_and_parse_response`] decodes it into
/// the caller's type.
Response(serde_json::Value),
/// The action failed with the given [`EthError`].
Err(EthError),
}
/// Errors reported by the eth process or produced locally by [`Provider`].
///
/// The one-line descriptions below match the messages produced by the `Display` impl.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthError {
/// RPC error, carried as raw JSON.
RpcError(serde_json::Value),
/// Malformed request.
MalformedRequest,
/// No RPC provider for the chain.
NoRpcForChain,
/// The subscription with this id was closed.
SubscriptionClosed(u64),
/// Invalid method; carries the method name.
InvalidMethod(String),
/// Invalid parameters; [`Provider`] also returns this when parameters fail to serialize.
InvalidParams,
/// Permission denied.
PermissionDenied,
/// RPC request timed out; [`Provider`] returns this when awaiting the response fails.
RpcTimeout,
/// RPC returned a malformed response; [`Provider`] returns this when a reply cannot be decoded.
RpcMalformedResponse,
}
/// Human-readable rendering of each [`EthError`] variant.
impl fmt::Display for EthError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Variants carrying data are formatted and returned immediately; the rest map to a
        // fixed message that is written once at the end.
        let fixed_message = match self {
            EthError::RpcError(payload) => return write!(f, "RPC error: {:?}", payload),
            EthError::SubscriptionClosed(sub_id) => {
                return write!(f, "Subscription {} closed", sub_id)
            }
            EthError::InvalidMethod(method) => return write!(f, "Invalid method: {}", method),
            EthError::MalformedRequest => "Malformed request",
            EthError::NoRpcForChain => "No RPC provider for chain",
            EthError::InvalidParams => "Invalid parameters",
            EthError::PermissionDenied => "Permission denied",
            EthError::RpcTimeout => "RPC request timed out",
            EthError::RpcMalformedResponse => "RPC returned malformed response",
        };
        f.write_str(fixed_message)
    }
}
// Marks `EthError` as a standard error type; every method uses the trait's default.
impl Error for EthError {}
/// Configuration action for the eth process.
///
/// NOTE(review): the handler is not in this file, so the per-variant notes below are inferred
/// from names and payload types only — confirm against the eth process implementation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthConfigAction {
// Presumably registers the given provider.
AddProvider(ProviderConfig),
// Payload is a `(u64, String)` pair; presumably (chain id, provider name or URL) — confirm.
RemoveProvider((u64, String)),
// Presumably toggle the `public` flag reported in `AccessSettings`.
SetPublic,
SetPrivate,
// Presumably add/remove a node name in the `allow` set of `AccessSettings`.
AllowNode(String),
UnallowNode(String),
// Presumably add/remove a node name in the `deny` set of `AccessSettings`.
DenyNode(String),
UndenyNode(String),
// Presumably replaces the whole provider set.
SetProviders(SavedConfigs),
// Query variants; the matching replies appear to be `EthConfigResponse::Providers`,
// `::AccessSettings` and `::State` respectively.
GetProviders,
GetAccessSettings,
GetState,
}
/// Reply to an [`EthConfigAction`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthConfigResponse {
/// Bare acknowledgement.
Ok,
/// The set of provider configurations.
Providers(SavedConfigs),
/// The access settings (public flag plus allow and deny sets).
AccessSettings(AccessSettings),
/// The requester was refused.
PermissionDenied,
/// Snapshot of internal state.
State {
// `active_subscriptions` is keyed by `crate::Address`, then by a `u64`, with an optional
// string per entry; `outstanding_requests` is a set of `u64` ids.
// NOTE(review): presumably subscriber address -> sub_id -> provider name — confirm.
active_subscriptions: HashMap<crate::Address, HashMap<u64, Option<String>>>, outstanding_requests: HashSet<u64>,
},
}
/// Access-control settings: a `public` flag plus `allow` and `deny` sets of strings.
/// NOTE(review): the sets presumably hold node names (compare `EthConfigAction::AllowNode` /
/// `DenyNode`); how the three fields interact is not visible in this file.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AccessSettings {
pub public: bool, pub allow: HashSet<String>, pub deny: HashSet<String>, }
/// A set of provider configurations; `ProviderConfig` derives `Hash` and `Eq`, so exact
/// duplicates collapse into one entry.
pub type SavedConfigs = HashSet<ProviderConfig>;
/// Configuration of one provider for one chain.
#[derive(Clone, Debug, Deserialize, Serialize, Hash, Eq, PartialEq)]
pub struct ProviderConfig {
// Chain this provider serves.
pub chain_id: u64,
// NOTE(review): meaning of `trusted` is not visible in this file — confirm with the eth process.
pub trusted: bool,
// Either another node or an RPC URL (see `NodeOrRpcUrl`).
pub provider: NodeOrRpcUrl,
}
/// Serializable, hashable counterpart of `AlloyAuthorization`, with the same three variants;
/// convert with `AlloyAuthorization::from`.
/// NOTE(review): the expected format of each string is defined by alloy, not by this file.
#[derive(Clone, Debug, Deserialize, Serialize, Hash, Eq, PartialEq)]
pub enum Authorization {
Basic(String),
Bearer(String),
Raw(String),
}
/// Converts the local [`Authorization`] into alloy's equivalent, variant for variant, moving
/// the contained string across unchanged.
impl From<Authorization> for AlloyAuthorization {
    fn from(auth: Authorization) -> Self {
        match auth {
            Authorization::Raw(header) => Self::Raw(header),
            Authorization::Bearer(token) => Self::Bearer(token),
            Authorization::Basic(credentials) => Self::Basic(credentials),
        }
    }
}
/// Where a provider lives: another node, or an RPC URL with optional authorization.
///
/// `Deserialize` is implemented by hand further down in this file so that `RpcUrl` can also be
/// read from a bare string.
#[derive(Clone, Debug, Serialize, Hash, Eq, PartialEq)]
pub enum NodeOrRpcUrl {
/// Another node, described by its `KnsUpdate`.
Node {
kns_update: crate::net::KnsUpdate,
// NOTE(review): semantics of `use_as_provider` are not visible in this file — confirm.
use_as_provider: bool, },
/// An RPC endpoint.
RpcUrl {
url: String,
// Optional authorization for the endpoint.
auth: Option<Authorization>,
},
}
impl std::cmp::PartialEq<str> for NodeOrRpcUrl {
fn eq(&self, other: &str) -> bool {
match self {
NodeOrRpcUrl::Node { kns_update, .. } => kns_update.name == other,
NodeOrRpcUrl::RpcUrl { url, .. } => url == other,
}
}
}
/// Hand-written deserializer for [`NodeOrRpcUrl`].
///
/// `Node` is read as a struct variant with `kns_update` and `use_as_provider`. The `RpcUrl`
/// payload is accepted in two shapes: a bare string (taken as the URL, with `auth` set to
/// `None`) or a struct with `url` and an optional `auth`.
impl<'de> Deserialize<'de> for NodeOrRpcUrl {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Untagged: serde tries the bare-string shape first, then the struct shape.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum RpcUrlHelper {
            String(String),
            Struct {
                url: String,
                auth: Option<Authorization>,
            },
        }

        // Mirrors the public enum's variant names so the outer tag is unchanged.
        #[derive(Deserialize)]
        enum Helper {
            Node {
                kns_update: crate::net::KnsUpdate,
                use_as_provider: bool,
            },
            RpcUrl(RpcUrlHelper),
        }

        let parsed = match Helper::deserialize(deserializer)? {
            Helper::RpcUrl(RpcUrlHelper::String(url)) => Self::RpcUrl { url, auth: None },
            Helper::RpcUrl(RpcUrlHelper::Struct { url, auth }) => Self::RpcUrl { url, auth },
            Helper::Node {
                kns_update,
                use_as_provider,
            } => Self::Node {
                kns_update,
                use_as_provider,
            },
        };
        Ok(parsed)
    }
}
/// Client handle for talking to the eth process on behalf of one chain.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Provider {
// Chain id placed in every `EthAction::Request` / `SubscribeLogs` this provider sends.
chain_id: u64,
// Passed to `send_and_await_response` on every call.
// NOTE(review): unit (presumably seconds) is defined by that API — confirm.
request_timeout: u64,
}
impl Provider {
pub fn new(chain_id: u64, request_timeout: u64) -> Self {
Self {
chain_id,
request_timeout,
}
}
/// Sends `action` to the eth process and decodes the payload of its reply as `T`.
///
/// The action is serialized as JSON and sent to `("our", "eth", "distro", "sys")`, awaiting
/// the response with this provider's `request_timeout`.
///
/// # Errors
/// - [`EthError::MalformedRequest`] if `action` cannot be serialized or the request cannot be
///   built (these previously panicked).
/// - [`EthError::RpcTimeout`] if awaiting the response fails.
/// - The [`EthError`] carried by an [`EthResponse::Err`] reply.
/// - [`EthError::RpcMalformedResponse`] if the reply is not a response message, does not parse
///   as an [`EthResponse`], is a bare [`EthResponse::Ok`], or its value does not decode as `T`.
pub fn send_request_and_parse_response<T: serde::de::DeserializeOwned>(
    &self,
    action: EthAction,
) -> Result<T, EthError> {
    // Neither step is expected to fail, but a library call should hand the caller an error
    // rather than abort the whole process.
    let body = serde_json::to_vec(&action).map_err(|_| EthError::MalformedRequest)?;
    let resp = KiRequest::new()
        .target(("our", "eth", "distro", "sys"))
        .body(body)
        .send_and_await_response(self.request_timeout)
        .map_err(|_| EthError::MalformedRequest)?
        .map_err(|_| EthError::RpcTimeout)?;

    let Message::Response { body, .. } = resp else {
        return Err(EthError::RpcMalformedResponse);
    };
    match serde_json::from_slice::<EthResponse>(&body) {
        Ok(EthResponse::Response(value)) => {
            serde_json::from_value::<T>(value).map_err(|_| EthError::RpcMalformedResponse)
        }
        Ok(EthResponse::Err(e)) => Err(e),
        // A bare acknowledgement carries no value to decode, so it is as unusable here as an
        // unparseable body.
        Ok(EthResponse::Ok) | Err(_) => Err(EthError::RpcMalformedResponse),
    }
}
pub fn get_block_number(&self) -> Result<u64, EthError> {
let action = EthAction::Request {
chain_id: self.chain_id,
method: "eth_blockNumber".to_string(),
params: ().into(),
};
let res = self.send_request_and_parse_response::<U64>(action)?;
Ok(res.to::<u64>())
}
pub fn get_balance(&self, address: Address, tag: Option<BlockId>) -> Result<U256, EthError> {
let params = serde_json::to_value((
address,
tag.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)),
))
.unwrap();
let action = EthAction::Request {
chain_id: self.chain_id,
method: "eth_getBalance".to_string(),
params,
};
self.send_request_and_parse_response::<U256>(action)
}
/// Queries `eth_getLogs` with `filter` as the single parameter.
///
/// # Errors
/// - [`EthError::InvalidParams`] if `filter` cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, EthError> {
    let params = serde_json::to_value((filter,)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Vec<Log>>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getLogs"),
        params,
    })
}
pub fn get_gas_price(&self) -> Result<U256, EthError> {
let action = EthAction::Request {
chain_id: self.chain_id,
method: "eth_gasPrice".to_string(),
params: ().into(),
};
self.send_request_and_parse_response::<U256>(action)
}
/// Queries `eth_getTransactionCount` for `address` at block `tag`, using `BlockId`'s default
/// when `tag` is `None`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_transaction_count(
    &self,
    address: Address,
    tag: Option<BlockId>,
) -> Result<U256, EthError> {
    let block = tag.unwrap_or_default();
    let params = serde_json::to_value((address, block)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<U256>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getTransactionCount"),
        params,
    })
}
/// Queries `eth_getBlockByHash` with `(hash, full_tx)`; the reply is decoded as
/// `Option<Block>`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_block_by_hash(
    &self,
    hash: BlockHash,
    full_tx: bool,
) -> Result<Option<Block>, EthError> {
    let params = serde_json::to_value((hash, full_tx)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Option<Block>>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getBlockByHash"),
        params,
    })
}
/// Queries `eth_getBlockByNumber` with `(number, full_tx)`; the reply is decoded as
/// `Option<Block>`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_block_by_number(
    &self,
    number: BlockNumberOrTag,
    full_tx: bool,
) -> Result<Option<Block>, EthError> {
    let params =
        serde_json::to_value((number, full_tx)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Option<Block>>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getBlockByNumber"),
        params,
    })
}
/// Queries `eth_getStorageAt` with `(address, key, tag)`, using `BlockId`'s default when
/// `tag` is `None`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_storage_at(
    &self,
    address: Address,
    key: U256,
    tag: Option<BlockId>,
) -> Result<Bytes, EthError> {
    let block = tag.unwrap_or_default();
    let params =
        serde_json::to_value((address, key, block)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Bytes>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getStorageAt"),
        params,
    })
}
/// Queries `eth_getCode` with `(address, tag)`. Unlike the sibling getters, `tag` is required.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_code_at(&self, address: Address, tag: BlockId) -> Result<Bytes, EthError> {
    let params = serde_json::to_value((address, tag)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Bytes>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getCode"),
        params,
    })
}
/// Queries `eth_getTransactionByHash` with `hash`; the reply is decoded as
/// `Option<Transaction>`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if `hash` cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_transaction_by_hash(&self, hash: TxHash) -> Result<Option<Transaction>, EthError> {
    let params = serde_json::to_value((hash,)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Option<Transaction>>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getTransactionByHash"),
        params,
    })
}
/// Queries `eth_getTransactionReceipt` with `hash`; the reply is decoded as
/// `Option<TransactionReceipt>`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if `hash` cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_transaction_receipt(
    &self,
    hash: TxHash,
) -> Result<Option<TransactionReceipt>, EthError> {
    let params = serde_json::to_value((hash,)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Option<TransactionReceipt>>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_getTransactionReceipt"),
        params,
    })
}
/// Queries `eth_estimateGas` with `(tx, block)`, using `BlockId`'s default when `block` is
/// `None`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn estimate_gas(
    &self,
    tx: TransactionRequest,
    block: Option<BlockId>,
) -> Result<U256, EthError> {
    let at_block = block.unwrap_or_default();
    let params = serde_json::to_value((tx, at_block)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<U256>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_estimateGas"),
        params,
    })
}
/// Queries `eth_accounts` with an empty parameter array; the reply is decoded as a list of
/// addresses.
///
/// # Errors
/// Any error from [`Provider::send_request_and_parse_response`].
pub fn get_accounts(&self) -> Result<Vec<Address>, EthError> {
    let no_params = serde_json::Value::Array(Vec::new());
    self.send_request_and_parse_response::<Vec<Address>>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_accounts"),
        params: no_params,
    })
}
/// Queries `eth_feeHistory` with `(block_count, last_block, reward_percentiles)`.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn get_fee_history(
    &self,
    block_count: U256,
    last_block: BlockNumberOrTag,
    reward_percentiles: Vec<f64>,
) -> Result<FeeHistory, EthError> {
    let params = serde_json::to_value((block_count, last_block, reward_percentiles))
        .map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<FeeHistory>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_feeHistory"),
        params,
    })
}
/// Queries `eth_call` with `(tx, block)`, using `BlockId`'s default when `block` is `None`,
/// and returns the reply decoded as bytes.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the parameters cannot be serialized.
/// - Any error from [`Provider::send_request_and_parse_response`].
pub fn call(&self, tx: TransactionRequest, block: Option<BlockId>) -> Result<Bytes, EthError> {
    let at_block = block.unwrap_or_default();
    let params = serde_json::to_value((tx, at_block)).map_err(|_| EthError::InvalidParams)?;
    self.send_request_and_parse_response::<Bytes>(EthAction::Request {
        chain_id: self.chain_id,
        method: String::from("eth_call"),
        params,
    })
}
pub fn kimap(&self) -> crate::kimap::Kimap {
crate::kimap::Kimap::default(self.request_timeout)
}
pub fn kimap_with_address(self, address: Address) -> crate::kimap::Kimap {
crate::kimap::Kimap::new(self, address)
}
pub fn send_raw_transaction(&self, tx: Bytes) -> Result<TxHash, EthError> {
let action = EthAction::Request {
chain_id: self.chain_id,
method: "eth_sendRawTransaction".to_string(),
params: serde_json::to_value((tx,)).unwrap(),
};
self.send_request_and_parse_response::<TxHash>(action)
}
/// Opens a log subscription identified by `sub_id` for `filter` on this provider's chain.
///
/// Sends an [`EthAction::SubscribeLogs`] with [`SubscriptionKind::Logs`] to
/// `("our", "eth", "distro", "sys")` and waits for the acknowledgement.
///
/// # Errors
/// - [`EthError::InvalidParams`] if the filter or the action cannot be serialized.
/// - [`EthError::MalformedRequest`] if the request cannot be built (this previously panicked).
/// - [`EthError::RpcTimeout`] if awaiting the response fails.
/// - The [`EthError`] carried by an [`EthResponse::Err`] reply.
/// - [`EthError::RpcMalformedResponse`] for any other reply.
pub fn subscribe(&self, sub_id: u64, filter: Filter) -> Result<(), EthError> {
    let params = serde_json::to_value(Params::Logs(Box::new(filter)))
        .map_err(|_| EthError::InvalidParams)?;
    let action = EthAction::SubscribeLogs {
        sub_id,
        chain_id: self.chain_id,
        kind: SubscriptionKind::Logs,
        params,
    };
    let body = serde_json::to_vec(&action).map_err(|_| EthError::InvalidParams)?;

    let resp = KiRequest::new()
        .target(("our", "eth", "distro", "sys"))
        .body(body)
        .send_and_await_response(self.request_timeout)
        // A request that cannot be built is reported to the caller, not turned into a panic.
        .map_err(|_| EthError::MalformedRequest)?
        .map_err(|_| EthError::RpcTimeout)?;

    let Message::Response { body, .. } = resp else {
        return Err(EthError::RpcMalformedResponse);
    };
    match serde_json::from_slice::<EthResponse>(&body) {
        Ok(EthResponse::Ok) => Ok(()),
        Ok(EthResponse::Err(e)) => Err(e),
        // A value payload is not a valid acknowledgement of a subscription.
        Ok(EthResponse::Response(_)) | Err(_) => Err(EthError::RpcMalformedResponse),
    }
}
/// Calls [`Provider::subscribe`] until it succeeds, blocking the current thread.
///
/// After every failed attempt a message is printed at `print_verbosity_error` and the thread
/// sleeps for five seconds; the underlying error is discarded. Once subscribed, a message is
/// printed at `print_verbosity_success`. There is no retry limit.
pub fn subscribe_loop(
    &self,
    sub_id: u64,
    filter: Filter,
    print_verbosity_success: u8,
    print_verbosity_error: u8,
) {
    let retry_delay = std::time::Duration::from_secs(5);
    // `subscribe` takes the filter by value, so each attempt needs its own copy.
    while self.subscribe(sub_id, filter.clone()).is_err() {
        crate::print_to_terminal(
            print_verbosity_error,
            "failed to subscribe to chain! trying again in 5s...",
        );
        std::thread::sleep(retry_delay);
    }
    crate::print_to_terminal(print_verbosity_success, "subscribed to logs successfully");
}
/// Closes the subscription identified by `sub_id`.
///
/// Sends an [`EthAction::UnsubscribeLogs`] to `("our", "eth", "distro", "sys")` and waits for
/// the acknowledgement.
///
/// # Errors
/// - [`EthError::MalformedRequest`] if the action cannot be serialized or the request cannot
///   be built (the latter previously panicked).
/// - [`EthError::RpcTimeout`] if awaiting the response fails.
/// - The [`EthError`] carried by an [`EthResponse::Err`] reply (previously reported as
///   `RpcMalformedResponse`, which hid the real cause and disagreed with `subscribe`).
/// - [`EthError::RpcMalformedResponse`] for any other reply.
pub fn unsubscribe(&self, sub_id: u64) -> Result<(), EthError> {
    let action = EthAction::UnsubscribeLogs(sub_id);
    let body = serde_json::to_vec(&action).map_err(|_| EthError::MalformedRequest)?;

    let resp = KiRequest::new()
        .target(("our", "eth", "distro", "sys"))
        .body(body)
        .send_and_await_response(self.request_timeout)
        .map_err(|_| EthError::MalformedRequest)?
        .map_err(|_| EthError::RpcTimeout)?;

    let Message::Response { body, .. } = resp else {
        return Err(EthError::RpcMalformedResponse);
    };
    match serde_json::from_slice::<EthResponse>(&body) {
        Ok(EthResponse::Ok) => Ok(()),
        // Surface the eth process's own error instead of masking it.
        Ok(EthResponse::Err(e)) => Err(e),
        Ok(EthResponse::Response(_)) | Err(_) => Err(EthError::RpcMalformedResponse),
    }
}
}