use std::collections::HashSet;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::error::IndexerError;
use crate::types::IndexContext;
/// Kind of call frame recorded in an execution trace.
///
/// Values are produced from node-specific strings by [`CallType::from_geth`]
/// and [`CallType::from_parity`]; both match case-insensitively.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CallType {
/// `"CALL"` in geth traces, `"call"` in parity traces.
Call,
/// `"DELEGATECALL"` in geth traces, `"delegatecall"` in parity traces.
DelegateCall,
/// `"STATICCALL"` in geth traces, `"staticcall"` in parity traces.
StaticCall,
/// `"CREATE"` in geth traces, `"create"` in parity traces.
Create,
/// `"CREATE2"` in geth traces, `"create2"` in parity traces.
Create2,
/// `"SELFDESTRUCT"` in geth traces; `"suicide"` or `"selfdestruct"` in parity traces.
SelfDestruct,
}
impl CallType {
pub fn from_geth(s: &str) -> Option<Self> {
match s.to_uppercase().as_str() {
"CALL" => Some(Self::Call),
"DELEGATECALL" => Some(Self::DelegateCall),
"STATICCALL" => Some(Self::StaticCall),
"CREATE" => Some(Self::Create),
"CREATE2" => Some(Self::Create2),
"SELFDESTRUCT" => Some(Self::SelfDestruct),
_ => None,
}
}
pub fn from_parity(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"call" => Some(Self::Call),
"delegatecall" => Some(Self::DelegateCall),
"staticcall" => Some(Self::StaticCall),
"create" => Some(Self::Create),
"create2" => Some(Self::Create2),
"suicide" | "selfdestruct" => Some(Self::SelfDestruct),
_ => None,
}
}
}
/// One flattened call record produced by [`parse_geth_traces`] or
/// [`parse_parity_traces`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallTrace {
/// Kind of call; the parsers fall back to [`CallType::Call`] for unrecognised types.
pub call_type: CallType,
/// Caller address as reported by the node; the parsers lowercase it and use `""` when absent.
pub from: String,
/// Callee address as reported by the node; the parsers lowercase it and use `""` when absent.
pub to: String,
/// Transferred value, kept as the node's string; the parsers default to `"0x0"`.
pub value: String,
/// Gas used, parsed from a hex string; `0` when missing or unparsable.
pub gas_used: u64,
/// Call input as the node's string; the parsers default to `"0x"`.
pub input: String,
/// Call output as the node's string; the parsers default to `"0x"`.
pub output: String,
/// Result of [`CallTrace::extract_selector`] on `input`.
pub function_selector: Option<String>,
/// Nesting level: `0` for the top-level call in geth traces, the length of
/// `traceAddress` in parity traces.
pub depth: u32,
/// Block number supplied by the caller of the parser.
pub block_number: u64,
/// Transaction hash as reported by the node; `""` when absent.
pub tx_hash: String,
/// Position of the transaction: array index for geth results,
/// `transactionPosition` for parity traces.
pub tx_index: u32,
/// Ordinal of this record: a per-transaction pre-order counter for geth
/// traces, the index in the input array for parity traces.
pub trace_index: u32,
/// Error string reported by the node for this call, if any.
pub error: Option<String>,
/// `true` when this call carries an error; [`parse_geth_traces`] additionally
/// sets it for every descendant of a call that carries an error.
pub reverted: bool,
}
impl CallTrace {
    /// Extracts the 4-byte function selector from hex-encoded call data.
    ///
    /// An optional leading `"0x"` is ignored. Returns `Some("0x????????")` with
    /// the eight hex digits lowercased, or `None` when fewer than eight
    /// characters follow the prefix or those characters are not all ASCII hex
    /// digits.
    ///
    /// Never panics: malformed (including non-ASCII) input yields `None`.
    pub fn extract_selector(input: &str) -> Option<String> {
        let hex = input.strip_prefix("0x").unwrap_or(input);
        // `get` rather than `[..8]`: slicing panics if byte 8 falls inside a
        // multi-byte character, which untrusted RPC data could contain.
        let selector = hex.get(..8)?;
        selector
            .bytes()
            .all(|b| b.is_ascii_hexdigit())
            .then(|| format!("0x{}", selector.to_ascii_lowercase()))
    }
}
/// Criteria a [`CallTrace`] must satisfy to be delivered to a handler.
///
/// Every criterion is optional; the default value accepts all traces. See
/// [`TraceFilter::matches`] for how the criteria are combined.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TraceFilter {
/// Addresses of interest; a trace passes when its `from` or `to` is in the
/// set. Empty means no address constraint. The builder stores lowercase values.
pub addresses: HashSet<String>,
/// Accepted function selectors. Empty means no selector constraint; when
/// non-empty, traces without a selector are rejected. The builder stores
/// lowercase values.
pub selectors: HashSet<String>,
/// Accepted call types. Empty means no call-type constraint.
pub call_types: HashSet<CallType>,
/// When `true`, traces flagged as reverted are rejected.
pub exclude_reverted: bool,
/// Inclusive lower bound on trace depth, if set.
pub min_depth: Option<u32>,
/// Inclusive upper bound on trace depth, if set.
pub max_depth: Option<u32>,
}
impl TraceFilter {
    /// Creates a filter with no criteria, which accepts every trace.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds an address of interest; it is stored lowercased.
    pub fn with_address(mut self, addr: impl Into<String>) -> Self {
        let normalized = addr.into().to_lowercase();
        self.addresses.insert(normalized);
        self
    }

    /// Adds an accepted function selector; it is stored lowercased.
    pub fn with_selector(mut self, selector: impl Into<String>) -> Self {
        let normalized = selector.into().to_lowercase();
        self.selectors.insert(normalized);
        self
    }

    /// Adds an accepted call type.
    pub fn with_call_type(mut self, call_type: CallType) -> Self {
        self.call_types.insert(call_type);
        self
    }

    /// Sets whether reverted traces are rejected.
    pub fn exclude_reverted(self, exclude: bool) -> Self {
        Self {
            exclude_reverted: exclude,
            ..self
        }
    }

    /// Sets the inclusive minimum depth.
    pub fn min_depth(self, depth: u32) -> Self {
        Self {
            min_depth: Some(depth),
            ..self
        }
    }

    /// Sets the inclusive maximum depth.
    pub fn max_depth(self, depth: u32) -> Self {
        Self {
            max_depth: Some(depth),
            ..self
        }
    }

    /// Returns `true` when `trace` satisfies every configured criterion.
    ///
    /// Unset criteria (empty sets, `None` bounds, `exclude_reverted == false`)
    /// impose no constraint.
    pub fn matches(&self, trace: &CallTrace) -> bool {
        let status_ok = !(self.exclude_reverted && trace.reverted);
        status_ok
            && self.accepts_call_type(trace.call_type)
            && self.accepts_parties(&trace.from, &trace.to)
            && self.accepts_selector(trace.function_selector.as_deref())
            && self.accepts_depth(trace.depth)
    }

    /// Call-type criterion: an empty set accepts anything.
    fn accepts_call_type(&self, call_type: CallType) -> bool {
        self.call_types.is_empty() || self.call_types.contains(&call_type)
    }

    /// Address criterion: either party, compared in lowercase, must be listed.
    fn accepts_parties(&self, from: &str, to: &str) -> bool {
        self.addresses.is_empty()
            || self.addresses.contains(&from.to_lowercase())
            || self.addresses.contains(&to.to_lowercase())
    }

    /// Selector criterion: a trace without a selector fails a non-empty set.
    fn accepts_selector(&self, selector: Option<&str>) -> bool {
        if self.selectors.is_empty() {
            return true;
        }
        match selector {
            Some(sel) => self.selectors.contains(&sel.to_lowercase()),
            None => false,
        }
    }

    /// Depth criterion: both bounds are inclusive.
    fn accepts_depth(&self, depth: u32) -> bool {
        let above_min = self.min_depth.map_or(true, |min| depth >= min);
        let below_max = self.max_depth.map_or(true, |max| depth <= max);
        above_min && below_max
    }
}
/// Consumer of call traces, registered with a [`TraceRegistry`] together with
/// a [`TraceFilter`].
#[async_trait]
pub trait TraceHandler: Send + Sync {
/// Processes one trace. [`TraceRegistry::dispatch`] calls this only for
/// traces that pass the handler's filter; an `Err` stops the dispatch and is
/// reported as `IndexerError::Handler`.
async fn handle_trace(&self, trace: &CallTrace, ctx: &IndexContext)
-> Result<(), IndexerError>;
/// Name of the handler; used to label errors raised during dispatch.
fn name(&self) -> &str;
}
/// A registered handler paired with the filter that selects its traces.
struct TraceEntry {
// Shared handler invoked for each matching trace.
handler: Arc<dyn TraceHandler>,
// Criteria a trace must satisfy before `handler` is invoked.
filter: TraceFilter,
}
/// Collection of trace handlers, each with its own filter.
///
/// Handlers are kept in registration order, which is also the order in which
/// `dispatch` visits them.
pub struct TraceRegistry {
// Registered (handler, filter) pairs, in registration order.
entries: Vec<TraceEntry>,
}
impl TraceRegistry {
    /// Creates a registry with no handlers.
    pub fn new() -> Self {
        let entries = Vec::new();
        Self { entries }
    }

    /// Appends `handler`; it will receive every trace accepted by `filter`.
    pub fn register(&mut self, handler: Arc<dyn TraceHandler>, filter: TraceFilter) {
        let entry = TraceEntry { handler, filter };
        self.entries.push(entry);
    }

    /// Delivers `trace` to each handler whose filter accepts it, in
    /// registration order.
    ///
    /// # Errors
    ///
    /// Stops at the first handler failure and returns `IndexerError::Handler`
    /// carrying that handler's name and the failure rendered as text.
    pub async fn dispatch(
        &self,
        trace: &CallTrace,
        ctx: &IndexContext,
    ) -> Result<(), IndexerError> {
        for TraceEntry { handler, filter } in &self.entries {
            if !filter.matches(trace) {
                continue;
            }
            if let Err(failure) = handler.handle_trace(trace, ctx).await {
                return Err(IndexerError::Handler {
                    handler: handler.name().to_string(),
                    reason: failure.to_string(),
                });
            }
        }
        Ok(())
    }

    /// Dispatches each trace in order via [`TraceRegistry::dispatch`].
    ///
    /// # Errors
    ///
    /// Returns the first dispatch error; later traces are not processed.
    pub async fn dispatch_batch(
        &self,
        traces: &[CallTrace],
        ctx: &IndexContext,
    ) -> Result<(), IndexerError> {
        for item in traces.iter() {
            self.dispatch(item, ctx).await?;
        }
        Ok(())
    }

    /// Number of registered handlers.
    pub fn handler_count(&self) -> usize {
        let Self { entries } = self;
        entries.len()
    }
}
impl Default for TraceRegistry {
fn default() -> Self {
Self::new()
}
}
/// Flattens a geth block-trace response into a list of [`CallTrace`]s.
///
/// `json` must be an array with one element per transaction. Each element's
/// `result` object (or the element itself when `result` is absent) is walked
/// depth-first; `txHash` defaults to `""`, the array position becomes
/// `tx_index`, and `trace_index` restarts at zero for every transaction.
///
/// # Errors
///
/// Returns `IndexerError::Rpc` when `json` is not an array.
pub fn parse_geth_traces(
    json: &serde_json::Value,
    block_number: u64,
) -> Result<Vec<CallTrace>, IndexerError> {
    let per_tx = match json.as_array() {
        Some(items) => items,
        None => {
            return Err(IndexerError::Rpc(
                "expected array of trace results".into(),
            ))
        }
    };

    let mut flattened = Vec::new();
    for (position, tx_entry) in per_tx.iter().enumerate() {
        let hash = tx_entry
            .get("txHash")
            .and_then(|v| v.as_str())
            .unwrap_or("");
        let root = tx_entry.get("result").unwrap_or(tx_entry);
        // Per-transaction counter shared by the whole recursive walk.
        let mut next_index = 0u32;
        flatten_geth_call(
            root,
            block_number,
            hash,
            position as u32,
            0,
            false,
            &mut next_index,
            &mut flattened,
        );
    }
    Ok(flattened)
}
/// Appends `node` and then all of its nested `calls` to `out`, depth-first.
///
/// `trace_index` is a running counter: the current value is assigned to this
/// node and then incremented before the children are visited. A node is marked
/// reverted when it has an `error` string or when `parent_reverted` is set,
/// and that flag is passed on to its children. Missing fields fall back to
/// defaults (`"CALL"`, `""`, `"0x0"`, `"0x"`, gas `0`).
// The recursion threads per-transaction context plus two accumulators, hence
// the long parameter list.
#[allow(clippy::too_many_arguments)]
fn flatten_geth_call(
    node: &serde_json::Value,
    block_number: u64,
    tx_hash: &str,
    tx_index: u32,
    depth: u32,
    parent_reverted: bool,
    trace_index: &mut u32,
    out: &mut Vec<CallTrace>,
) {
    // Reads `key` from `node` as a string slice when present and a JSON string.
    fn text<'a>(node: &'a serde_json::Value, key: &str) -> Option<&'a str> {
        node.get(key).and_then(|v| v.as_str())
    }

    let call_type =
        CallType::from_geth(text(node, "type").unwrap_or("CALL")).unwrap_or(CallType::Call);
    let gas_used = text(node, "gasUsed")
        .and_then(|raw| u64::from_str_radix(raw.strip_prefix("0x").unwrap_or(raw), 16).ok())
        .unwrap_or(0);
    let input = text(node, "input").unwrap_or("0x").to_string();
    let function_selector = CallTrace::extract_selector(&input);
    let error = text(node, "error").map(String::from);
    let reverted = parent_reverted || error.is_some();

    let record = CallTrace {
        call_type,
        from: text(node, "from").unwrap_or("").to_lowercase(),
        to: text(node, "to").unwrap_or("").to_lowercase(),
        value: text(node, "value").unwrap_or("0x0").to_string(),
        gas_used,
        input,
        output: text(node, "output").unwrap_or("0x").to_string(),
        function_selector,
        depth,
        block_number,
        tx_hash: tx_hash.to_owned(),
        tx_index,
        trace_index: *trace_index,
        error,
        reverted,
    };
    *trace_index += 1;
    // Push before recursing so parents precede their children in `out`.
    out.push(record);

    let children = node.get("calls").and_then(|v| v.as_array());
    for child in children.into_iter().flatten() {
        flatten_geth_call(
            child,
            block_number,
            tx_hash,
            tx_index,
            depth + 1,
            reverted,
            trace_index,
            out,
        );
    }
}
/// Converts a flat parity-style trace array into a list of [`CallTrace`]s.
///
/// `json` must be an array of trace objects. For each entry:
///
/// * The call kind comes from `action.callType` (or `action.creationMethod`)
///   when that names a known kind, otherwise from the top-level `type`;
///   unrecognised kinds fall back to [`CallType::Call`]. The top-level `type`
///   alone cannot distinguish delegate/static calls or `create2`.
/// * `from`, `to`, `value`, `input` and `output` are read from the call-shaped
///   fields first, then from the create-shaped (`init`, `result.address`,
///   `result.code`) and suicide-shaped (`address`, `refundAddress`, `balance`)
///   equivalents, and finally default to `""`, `"0x0"` or `"0x"`.
/// * `function_selector` is derived from `action.input` only; creation init
///   code is not call data, so it never yields a selector.
/// * `depth` is the length of `traceAddress`; `trace_index` is the entry's
///   position in the array.
/// * `reverted` is set when the entry has an `error`, or when an earlier entry
///   of the same transaction whose `traceAddress` is a prefix of this one had
///   an error. This mirrors how the geth parser marks descendants of a failed
///   call. It relies on parents preceding their children in the array; if a
///   parent is missing, only the entry's own `error` is considered.
///
/// # Errors
///
/// Returns `IndexerError::Rpc` when `json` is not an array.
pub fn parse_parity_traces(
    json: &serde_json::Value,
    block_number: u64,
) -> Result<Vec<CallTrace>, IndexerError> {
    // Reads `key` from `node` as a string slice when present and a JSON string.
    fn text<'a>(node: &'a serde_json::Value, key: &str) -> Option<&'a str> {
        node.get(key).and_then(|v| v.as_str())
    }

    let traces_arr = json
        .as_array()
        .ok_or_else(|| IndexerError::Rpc("expected array of parity traces".into()))?;
    let mut traces = Vec::with_capacity(traces_arr.len());

    // Revert propagation state for the transaction currently being read:
    // `traceAddress` paths of failed calls that are not themselves inside an
    // already-failed call.
    let mut current_tx: Option<&str> = None;
    let mut reverted_roots: Vec<&[serde_json::Value]> = Vec::new();

    for (i, entry) in traces_arr.iter().enumerate() {
        let action = entry.get("action").unwrap_or(entry);
        let result = entry.get("result");

        let trace_type = text(entry, "type").unwrap_or("call");
        let call_type = text(action, "callType")
            .or_else(|| text(action, "creationMethod"))
            .and_then(CallType::from_parity)
            .or_else(|| CallType::from_parity(trace_type))
            .unwrap_or(CallType::Call);

        let from = text(action, "from")
            .or_else(|| text(action, "address"))
            .unwrap_or("")
            .to_lowercase();
        let to = text(action, "to")
            .or_else(|| text(action, "refundAddress"))
            .or_else(|| result.and_then(|r| text(r, "address")))
            .unwrap_or("")
            .to_lowercase();
        let value = text(action, "value")
            .or_else(|| text(action, "balance"))
            .unwrap_or("0x0")
            .to_string();
        let gas_used = result
            .and_then(|r| text(r, "gasUsed"))
            .and_then(|s| u64::from_str_radix(s.strip_prefix("0x").unwrap_or(s), 16).ok())
            .unwrap_or(0);

        let calldata = text(action, "input");
        let function_selector = calldata.and_then(CallTrace::extract_selector);
        let input = calldata
            .or_else(|| text(action, "init"))
            .unwrap_or("0x")
            .to_string();
        let output = result
            .and_then(|r| text(r, "output").or_else(|| text(r, "code")))
            .unwrap_or("0x")
            .to_string();

        let tx_hash_ref = text(entry, "transactionHash");
        let tx_index = entry
            .get("transactionPosition")
            .and_then(|v| v.as_u64())
            .unwrap_or(0) as u32;
        let path: &[serde_json::Value] = entry
            .get("traceAddress")
            .and_then(|v| v.as_array())
            .map(|a| a.as_slice())
            .unwrap_or(&[]);
        let depth = path.len() as u32;
        let error_str = text(entry, "error").map(String::from);

        // A root trace (empty path) or a different hash starts a new call tree.
        if path.is_empty() || tx_hash_ref != current_tx {
            reverted_roots.clear();
            current_tx = tx_hash_ref;
        }
        let inherited = reverted_roots.iter().any(|root| path.starts_with(*root));
        let reverted = inherited || error_str.is_some();
        if error_str.is_some() && !inherited {
            reverted_roots.push(path);
        }

        traces.push(CallTrace {
            call_type,
            from,
            to,
            value,
            gas_used,
            input,
            output,
            function_selector,
            depth,
            block_number,
            tx_hash: tx_hash_ref.unwrap_or("").to_string(),
            tx_index,
            trace_index: i as u32,
            error: error_str,
            reverted,
        });
    }
    Ok(traces)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
/// Builds a fixed `IndexContext` for tests that need one but do not inspect it.
fn dummy_ctx() -> IndexContext {
    let block = crate::types::BlockSummary {
        number: 1,
        hash: "0xa".into(),
        parent_hash: "0x0".into(),
        timestamp: 0,
        tx_count: 0,
    };
    IndexContext {
        block,
        phase: crate::types::IndexPhase::Backfill,
        chain: "ethereum".into(),
    }
}
/// Builds a `CallTrace` fixture.
///
/// When `selector` is given, the input is the selector followed by sixteen
/// zero digits, otherwise `"0x"`; the stored selector is re-derived from that
/// input. `reverted` also controls whether an error string is attached.
fn make_trace(
    call_type: CallType,
    from: &str,
    to: &str,
    selector: Option<&str>,
    depth: u32,
    reverted: bool,
) -> CallTrace {
    let input = selector.map_or_else(
        || "0x".to_string(),
        |sel| format!("{}0000000000000000", sel),
    );
    let function_selector = CallTrace::extract_selector(&input);
    let error = if reverted {
        Some(String::from("execution reverted"))
    } else {
        None
    };
    CallTrace {
        call_type,
        from: from.to_lowercase(),
        to: to.to_lowercase(),
        value: "0x0".into(),
        gas_used: 21000,
        input,
        output: "0x".into(),
        function_selector,
        depth,
        block_number: 100,
        tx_hash: "0xtxhash".into(),
        tx_index: 0,
        trace_index: 0,
        error,
        reverted,
    }
}
/// Every geth type name maps to its variant; unknown names map to `None`.
#[test]
fn call_type_from_geth() {
    let expectations = [
        ("CALL", Some(CallType::Call)),
        ("DELEGATECALL", Some(CallType::DelegateCall)),
        ("STATICCALL", Some(CallType::StaticCall)),
        ("CREATE", Some(CallType::Create)),
        ("CREATE2", Some(CallType::Create2)),
        ("SELFDESTRUCT", Some(CallType::SelfDestruct)),
        ("UNKNOWN", None),
    ];
    for (raw, expected) in expectations.iter() {
        assert_eq!(CallType::from_geth(raw), *expected);
    }
}
/// Parity type names map to their variants, including both self-destruct spellings.
#[test]
fn call_type_from_parity() {
    let expectations = [
        ("call", CallType::Call),
        ("delegatecall", CallType::DelegateCall),
        ("suicide", CallType::SelfDestruct),
        ("selfdestruct", CallType::SelfDestruct),
        ("create", CallType::Create),
    ];
    for (raw, expected) in expectations.iter() {
        assert_eq!(CallType::from_parity(raw), Some(*expected));
    }
}
/// Selector extraction: long input, empty payload, short payload, upper-case hex.
#[test]
fn function_selector_extraction() {
    let transfer = Some(String::from("0xa9059cbb"));

    let from_calldata = CallTrace::extract_selector("0xa9059cbb0000000000000000000000001234");
    assert_eq!(from_calldata, transfer);

    let from_empty = CallTrace::extract_selector("0x");
    assert_eq!(from_empty, None);

    let from_short = CallTrace::extract_selector("0xabcd");
    assert_eq!(from_short, None);

    let from_uppercase = CallTrace::extract_selector("0xA9059CBB");
    assert_eq!(from_uppercase, transfer);
}
/// A filter with no criteria accepts an arbitrary trace.
#[test]
fn filter_matches_all_by_default() {
    let trace = make_trace(CallType::Call, "0xaaa", "0xbbb", Some("0xa9059cbb"), 0, false);
    let accepted = TraceFilter::new().matches(&trace);
    assert!(accepted);
}
/// An address filter accepts traces where the address is sender or receiver.
#[test]
fn filter_by_address() {
    let filter = TraceFilter::new().with_address("0xaaa");
    let transfer = |from: &str, to: &str| {
        make_trace(CallType::Call, from, to, Some("0xa9059cbb"), 0, false)
    };

    // Address as sender.
    assert!(filter.matches(&transfer("0xaaa", "0xbbb")));
    // Address as receiver.
    assert!(filter.matches(&transfer("0xbbb", "0xaaa")));
    // Address on neither side.
    assert!(!filter.matches(&transfer("0xbbb", "0xccc")));
}
/// A selector filter accepts only traces carrying that selector.
#[test]
fn filter_by_selector() {
    let filter = TraceFilter::new().with_selector("0xa9059cbb");
    let call_with = |selector: Option<&str>| {
        make_trace(CallType::Call, "0xaaa", "0xbbb", selector, 0, false)
    };

    assert!(filter.matches(&call_with(Some("0xa9059cbb"))));
    assert!(!filter.matches(&call_with(Some("0x12345678"))));
    // No selector at all is rejected once a selector is required.
    assert!(!filter.matches(&call_with(None)));
}
/// A call-type filter accepts only the listed call type.
#[test]
fn filter_by_call_type() {
    let filter = TraceFilter::new().with_call_type(CallType::Create);
    let cases = [(CallType::Create, true), (CallType::Call, false)];
    for (call_type, expected) in cases.iter() {
        let trace = make_trace(*call_type, "0xaaa", "0xbbb", None, 0, false);
        assert_eq!(filter.matches(&trace), *expected);
    }
}
/// With `exclude_reverted(true)`, only non-reverted traces pass.
#[test]
fn filter_exclude_reverted() {
    let filter = TraceFilter::new().exclude_reverted(true);
    let call = |reverted: bool| make_trace(CallType::Call, "0xaaa", "0xbbb", None, 0, reverted);

    assert!(filter.matches(&call(false)));
    assert!(!filter.matches(&call(true)));
}
/// Depth bounds are inclusive on both ends.
#[test]
fn filter_by_depth() {
    let filter = TraceFilter::new().min_depth(1).max_depth(3);
    let cases = [(0u32, false), (1, true), (3, true), (4, false)];
    for (depth, expected) in cases.iter() {
        let trace = make_trace(CallType::Call, "0xaaa", "0xbbb", None, *depth, false);
        assert_eq!(filter.matches(&trace), *expected);
    }
}
/// Test handler that records how many traces it was handed.
struct CountingHandler {
// Shared counter so the test can read the total after dispatching.
count: Arc<AtomicU32>,
// Value returned from `name()`.
handler_name: String,
}
/// Increments the shared counter for every trace and always succeeds.
#[async_trait]
impl TraceHandler for CountingHandler {
async fn handle_trace(
&self,
_trace: &CallTrace,
_ctx: &IndexContext,
) -> Result<(), IndexerError> {
// The trace and context are ignored; only the call itself is counted.
self.count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn name(&self) -> &str {
&self.handler_name
}
}
/// Only the trace accepted by the handler's filter reaches the handler.
#[tokio::test]
async fn dispatch_to_matching_handler() {
    let hits = Arc::new(AtomicU32::new(0));
    let mut registry = TraceRegistry::new();
    registry.register(
        Arc::new(CountingHandler {
            count: Arc::clone(&hits),
            handler_name: "test_handler".into(),
        }),
        TraceFilter::new().with_call_type(CallType::Create),
    );
    let ctx = dummy_ctx();

    // One CREATE (matches) followed by one CALL (filtered out).
    let create = make_trace(CallType::Create, "0xaaa", "0xbbb", None, 0, false);
    let call = make_trace(CallType::Call, "0xaaa", "0xbbb", None, 0, false);
    registry.dispatch(&create, &ctx).await.unwrap();
    registry.dispatch(&call, &ctx).await.unwrap();

    assert_eq!(hits.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn dispatch_batch() {
let count = Arc::new(AtomicU32::new(0));
let handler = Arc::new(CountingHandler {
count: count.clone(),
handler_name: "batch_handler".into(),
});
let mut registry = TraceRegistry::new();
registry.register(handler, TraceFilter::new());
let ctx = dummy_ctx();
let traces = vec![
make_trace(CallType::Call, "0xaaa", "0xbbb", None, 0, false),
make_trace(CallType::Create, "0xaaa", "0xbbb", None, 0, false),
make_trace(CallType::DelegateCall, "0xaaa", "0xbbb", None, 0, false),
];
registry.dispatch_batch(&traces, &ctx).await.unwrap();
assert_eq!(count.load(Ordering::Relaxed), 3);
}
/// A geth result with one nested DELEGATECALL flattens to two traces in
/// parent-first order, with lowercased addresses, parsed hex gas, an extracted
/// selector, and sequential trace indices.
#[test]
fn parse_geth_trace_basic() {
let json = serde_json::json!([
{
"txHash": "0xabc123",
"result": {
"type": "CALL",
"from": "0xSender",
"to": "0xReceiver",
"value": "0xde0b6b3a7640000",
"gasUsed": "0x5208",
"input": "0xa9059cbb0000000000000000000000001234",
"output": "0x0000000000000000000000000000000000000001",
"calls": [
{
"type": "DELEGATECALL",
"from": "0xReceiver",
"to": "0xImpl",
"value": "0x0",
"gasUsed": "0x1000",
"input": "0xa9059cbb0000",
"output": "0x01"
}
]
}
}
]);
let traces = parse_geth_traces(&json, 12345).unwrap();
assert_eq!(traces.len(), 2);
// Top-level call.
assert_eq!(traces[0].call_type, CallType::Call);
assert_eq!(traces[0].from, "0xsender");
assert_eq!(traces[0].to, "0xreceiver");
assert_eq!(traces[0].depth, 0);
assert_eq!(traces[0].block_number, 12345);
assert_eq!(traces[0].tx_hash, "0xabc123");
assert_eq!(traces[0].gas_used, 0x5208);
assert_eq!(traces[0].function_selector, Some("0xa9059cbb".into()));
assert!(!traces[0].reverted);
assert_eq!(traces[0].trace_index, 0);
// Nested delegate call.
assert_eq!(traces[1].call_type, CallType::DelegateCall);
assert_eq!(traces[1].depth, 1);
assert_eq!(traces[1].trace_index, 1);
}
/// An `error` on the top-level geth call marks it reverted and the flag is
/// inherited by its nested call, which has no error of its own.
#[test]
fn parse_geth_trace_with_error() {
let json = serde_json::json!([
{
"txHash": "0xfailed",
"result": {
"type": "CALL",
"from": "0xSender",
"to": "0xReceiver",
"value": "0x0",
"gasUsed": "0x5208",
"input": "0x",
"output": "0x",
"error": "execution reverted",
"calls": [
{
"type": "CALL",
"from": "0xReceiver",
"to": "0xInner",
"value": "0x0",
"gasUsed": "0x100",
"input": "0x",
"output": "0x"
}
]
}
}
]);
let traces = parse_geth_traces(&json, 100).unwrap();
assert_eq!(traces.len(), 2);
assert!(traces[0].reverted);
assert_eq!(traces[0].error, Some("execution reverted".into()));
// The child is reverted because its parent is.
assert!(traces[1].reverted);
}
/// Two parity call traces of one transaction: depth follows the length of
/// `traceAddress`, addresses are lowercased, and selectors come from the input.
#[test]
fn parse_parity_trace_basic() {
let json = serde_json::json!([
{
"action": {
"from": "0xSender",
"to": "0xReceiver",
"value": "0xde0b6b3a7640000",
"input": "0xa9059cbb0000000000000000000000001234"
},
"result": {
"gasUsed": "0x5208",
"output": "0x0001"
},
"transactionHash": "0xparity_tx",
"transactionPosition": 0,
"traceAddress": [],
"type": "call"
},
{
"action": {
"from": "0xReceiver",
"to": "0xInner",
"value": "0x0",
"input": "0x12345678aabbccdd"
},
"result": {
"gasUsed": "0x1000",
"output": "0x"
},
"transactionHash": "0xparity_tx",
"transactionPosition": 0,
"traceAddress": [0],
"type": "call"
}
]);
let traces = parse_parity_traces(&json, 999).unwrap();
assert_eq!(traces.len(), 2);
assert_eq!(traces[0].call_type, CallType::Call);
assert_eq!(traces[0].from, "0xsender");
assert_eq!(traces[0].to, "0xreceiver");
assert_eq!(traces[0].depth, 0);
assert_eq!(traces[0].block_number, 999);
assert_eq!(traces[0].tx_hash, "0xparity_tx");
assert_eq!(traces[0].function_selector, Some("0xa9059cbb".into()));
assert_eq!(traces[1].depth, 1);
assert_eq!(traces[1].function_selector, Some("0x12345678".into()));
}
/// A parity trace of type `create` (no `to` or `input` in its action) parses
/// as `CallType::Create` with the lowercased deployer as `from`.
#[test]
fn parse_parity_trace_create() {
let json = serde_json::json!([
{
"action": {
"from": "0xDeployer",
"value": "0x0",
"init": "0x6080604052"
},
"result": {
"address": "0xNewContract",
"gasUsed": "0x30000",
"code": "0x6080"
},
"transactionHash": "0xcreate_tx",
"transactionPosition": 1,
"traceAddress": [],
"type": "create"
}
]);
let traces = parse_parity_traces(&json, 500).unwrap();
assert_eq!(traces.len(), 1);
assert_eq!(traces[0].call_type, CallType::Create);
assert_eq!(traces[0].from, "0xdeployer");
}
/// A parity trace with an `error` and no `result` object is parsed as
/// reverted and keeps the error text.
#[test]
fn parse_parity_trace_with_error() {
let json = serde_json::json!([
{
"action": {
"from": "0xSender",
"to": "0xReceiver",
"value": "0x0",
"input": "0x"
},
"transactionHash": "0xfail_tx",
"transactionPosition": 0,
"traceAddress": [],
"type": "call",
"error": "out of gas"
}
]);
let traces = parse_parity_traces(&json, 200).unwrap();
assert_eq!(traces.len(), 1);
assert!(traces[0].reverted);
assert_eq!(traces[0].error, Some("out of gas".into()));
}
/// Three levels of nested geth calls flatten to depths 0, 1 and 2 in
/// parent-first order.
#[test]
fn geth_trace_depth_tracking() {
let json = serde_json::json!([
{
"txHash": "0xdeep",
"result": {
"type": "CALL",
"from": "0xa",
"to": "0xb",
"value": "0x0",
"gasUsed": "0x100",
"input": "0x",
"output": "0x",
"calls": [
{
"type": "CALL",
"from": "0xb",
"to": "0xc",
"value": "0x0",
"gasUsed": "0x50",
"input": "0x",
"output": "0x",
"calls": [
{
"type": "STATICCALL",
"from": "0xc",
"to": "0xd",
"value": "0x0",
"gasUsed": "0x20",
"input": "0x",
"output": "0x"
}
]
}
]
}
}
]);
let traces = parse_geth_traces(&json, 1).unwrap();
assert_eq!(traces.len(), 3);
assert_eq!(traces[0].depth, 0);
assert_eq!(traces[1].depth, 1);
assert_eq!(traces[2].depth, 2);
assert_eq!(traces[2].call_type, CallType::StaticCall);
}
/// With address, call type, selector and revert criteria all set, a trace
/// must satisfy every one of them.
#[test]
fn combined_filter_all_criteria() {
    let filter = TraceFilter::new()
        .with_address("0xaaa")
        .with_call_type(CallType::Call)
        .with_selector("0xa9059cbb")
        .exclude_reverted(true);
    let candidate = |call_type: CallType, from: &str, reverted: bool| {
        make_trace(call_type, from, "0xbbb", Some("0xa9059cbb"), 0, reverted)
    };

    // Satisfies everything.
    assert!(filter.matches(&candidate(CallType::Call, "0xaaa", false)));
    // Wrong call type.
    assert!(!filter.matches(&candidate(CallType::Create, "0xaaa", false)));
    // Neither party is the filtered address.
    assert!(!filter.matches(&candidate(CallType::Call, "0xzzz", false)));
    // Reverted.
    assert!(!filter.matches(&candidate(CallType::Call, "0xaaa", true)));
}
}