substreams_ethereum_core/
rpc.rs

1use crate::pb::eth::rpc::{RpcCall, RpcCalls, RpcResponse, RpcResponses};
2use crate::pb::eth::rpc::{RpcGetBalanceRequests, RpcGetBalanceResponses};
3use crate::Function;
4use substreams::proto;
5
6pub trait RPCDecodable<R> {
7    fn output(data: &[u8]) -> Result<R, String>;
8}
9
10pub struct RpcBatch {
11    store: RpcCalls,
12}
13
14pub fn batch() -> RpcBatch {
15    RpcBatch {
16        store: RpcCalls {
17            ..Default::default()
18        },
19    }
20}
21
22impl RpcBatch {
23    pub fn new() -> RpcBatch {
24        RpcBatch {
25            store: RpcCalls { calls: vec![] },
26        }
27    }
28
29    pub fn add<F: Function>(mut self, call: F, address: Vec<u8>) -> Self {
30        self.store.calls.push(RpcCall {
31            to_addr: address,
32            data: call.encode(),
33        });
34        self
35    }
36
37    pub fn execute(self) -> Result<RpcResponses, String> {
38        Ok(eth_call(&self.store))
39    }
40
41    pub fn decode<R, T: RPCDecodable<R> + Function>(response: &RpcResponse) -> Option<R> {
42        if response.failed {
43            return None;
44        }
45
46        match T::output(response.raw.as_ref()) {
47            Ok(data) => Some(data),
48            Err(err) => {
49                substreams::log::info!(
50                    "Call output for function `{}` failed to decode with error: {}",
51                    T::NAME,
52                    err
53                );
54                None
55            }
56        }
57    }
58}
59
60#[cfg_attr(not(target_arch = "wasm32"), allow(unused_variables))]
61fn eth_call_internal(input: Vec<u8>) -> Vec<u8> {
62    #[cfg(target_arch = "wasm32")]
63    unsafe {
64        use substreams::memory;
65
66        let rpc_response_ptr = memory::alloc(8);
67        crate::externs::rpc::eth_call(input.as_ptr(), input.len() as u32, rpc_response_ptr);
68        return memory::get_output_data(rpc_response_ptr);
69    }
70
71    #[cfg(not(target_arch = "wasm32"))]
72    unimplemented!("this method is not implemented outside of 'wasm32' target compilation")
73}
74
75pub fn eth_call(input: &RpcCalls) -> RpcResponses {
76    let raw_resp: Vec<u8> = eth_call_internal(proto::encode(input).unwrap());
77    let resp: RpcResponses = proto::decode(&raw_resp).unwrap();
78
79    return resp;
80}
81
82#[cfg_attr(not(target_arch = "wasm32"), allow(unused_variables))]
83fn eth_get_balance_internal(input: Vec<u8>) -> Vec<u8> {
84    #[cfg(target_arch = "wasm32")]
85    unsafe {
86        use substreams::memory;
87
88        let rpc_response_ptr = memory::alloc(8);
89        crate::externs::rpc::eth_get_balance(input.as_ptr(), input.len() as u32, rpc_response_ptr);
90        return memory::get_output_data(rpc_response_ptr);
91    }
92
93    #[cfg(not(target_arch = "wasm32"))]
94    unimplemented!("this method is not implemented outside of 'wasm32' target compilation")
95}
96
97pub fn eth_get_balance(requests: &RpcGetBalanceRequests) -> RpcGetBalanceResponses {
98    let raw_req = proto::encode(requests).expect("failed to encode RpcGetBalanceRequests");
99    let raw_resp: Vec<u8> = eth_get_balance_internal(raw_req);
100    proto::decode(&raw_resp).expect("failed to decode RpcGetBalanceResponses")
101}