use std::collections::VecDeque;
use crate::storage::codec::SolArrayReader;
use alloy::{
dyn_abi::{DecodedEvent, DynSolType, DynSolValue, Specifier, Word},
json_abi::Event,
};
use alloy_primitives::{Address, B256, I256, U256};
use anyhow::Result;
use arrow::array::{
ArrayRef, BinaryArray, BooleanArray, Int8Array, Int16Array, Int32Array, Int64Array,
RecordBatch, StringArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
/// One decoded event row: the log header columns plus the decoded event values.
#[derive(Clone, Debug)]
pub struct DecodedEventWithHeader {
    /// Block number, read from the `log_block` column.
    pub log_block: u64,
    /// Log index, read from the `log_index` column.
    pub log_index: u32,
    /// Log address, parsed from the `log_address` column.
    pub log_address: Address,
    /// Selector plus decoded indexed (`indexed`) and non-indexed (`body`) values.
    pub event: DecodedEvent,
}
/// Decodes rows of an Arrow [`RecordBatch`] into [`DecodedEventWithHeader`] values for a single
/// event ABI.
///
/// Column layout expected by [`BatchEventDecoder::decode_row`]:
/// - column 0: `log_block` as [`UInt64Array`]
/// - column 1: `log_index` as [`UInt32Array`]
/// - column 2: `log_address` as a [`StringArray`] of address strings
/// - then one column per indexed parameter, followed by one column per non-indexed parameter,
///   each group in ABI declaration order.
pub struct BatchEventDecoder {
    /// Readers for the indexed event parameters, in ABI declaration order.
    topic_readers: Vec<Box<dyn SolArrayReader + Send + Sync>>,
    /// Readers for the non-indexed event parameters, in ABI declaration order.
    data_readers: Vec<Box<dyn SolArrayReader + Send + Sync>>,
    /// Selector attached to every decoded event (set from `Event::selector` in `new`).
    selector: Option<B256>,
}

impl BatchEventDecoder {
    /// Number of header columns (`log_block`, `log_index`, `log_address`) that precede the
    /// parameter columns.
    const HEADER_COLUMNS: usize = 3;

    /// Builds a decoder for `event`, choosing one column reader per event parameter.
    ///
    /// # Panics
    ///
    /// Panics if a parameter's type string cannot be resolved, or if `dyn_sol_type_to_reader`
    /// has no reader for the resolved type.
    pub fn new(event: &Event) -> Self {
        let mut topic_readers: Vec<Box<dyn SolArrayReader + Send + Sync>> = Vec::new();
        let mut data_readers: Vec<Box<dyn SolArrayReader + Send + Sync>> = Vec::new();
        for param in &event.inputs {
            let ty = param.ty.resolve().unwrap_or_else(|err| {
                panic!(
                    "cannot resolve type `{}` of event parameter `{}`: {err}",
                    param.ty, param.name
                )
            });
            let reader = dyn_sol_type_to_reader(&ty);
            // All indexed-parameter columns come before all non-indexed ones, regardless of
            // how the two kinds are interleaved in the ABI.
            if param.indexed {
                topic_readers.push(reader);
            } else {
                data_readers.push(reader);
            }
        }
        BatchEventDecoder {
            topic_readers,
            data_readers,
            selector: Some(event.selector()),
        }
    }

    /// Returns header column `index` of `batch` downcast to `T`, or an error naming `name` if
    /// the column is missing or has a different Arrow type.
    fn header_column<'a, T: 'static>(
        batch: &'a RecordBatch,
        index: usize,
        name: &str,
    ) -> Result<&'a T> {
        batch
            .columns()
            .get(index)
            .and_then(|col| col.as_any().downcast_ref::<T>())
            .ok_or_else(|| anyhow::anyhow!("Failed to decode {name}"))
    }

    /// Decodes row `row` of `batch`.
    ///
    /// # Errors
    ///
    /// Returns an error if `row` is out of range, if a header column is missing or has an
    /// unexpected Arrow type, if `log_address` is not a valid address string, or if the batch
    /// has fewer columns than this event's layout requires.
    ///
    /// # Panics
    ///
    /// Parameter columns are read through `SolArrayReader::get`, which returns a bare
    /// `DynSolValue`. The readers in this module panic when a parameter column has an
    /// unexpected Arrow type or holds an unparsable value.
    pub fn decode_row(&self, batch: &RecordBatch, row: usize) -> Result<DecodedEventWithHeader> {
        anyhow::ensure!(
            row < batch.num_rows(),
            "Row {row} is out of bounds for a batch with {} rows",
            batch.num_rows()
        );

        let log_block = Self::header_column::<UInt64Array>(batch, 0, "log_block")?.value(row);
        let log_index = Self::header_column::<UInt32Array>(batch, 1, "log_index")?.value(row);
        let raw_address = Self::header_column::<StringArray>(batch, 2, "log_address")?.value(row);
        let log_address = raw_address.parse::<Address>().map_err(|err| {
            anyhow::anyhow!("Failed to decode log_address {raw_address:?}: {err}")
        })?;

        // Check the width up front so `batch.column` below cannot panic on a short batch.
        let topic_base = Self::HEADER_COLUMNS;
        let data_base = topic_base + self.topic_readers.len();
        let expected_columns = data_base + self.data_readers.len();
        anyhow::ensure!(
            batch.num_columns() >= expected_columns,
            "Batch has {} columns but the event layout requires {expected_columns}",
            batch.num_columns()
        );

        let indexed: Vec<DynSolValue> = self
            .topic_readers
            .iter()
            .enumerate()
            .map(|(i, reader)| reader.get(batch.column(topic_base + i), row))
            .collect();
        let body: Vec<DynSolValue> = self
            .data_readers
            .iter()
            .enumerate()
            .map(|(i, reader)| reader.get(batch.column(data_base + i), row))
            .collect();

        Ok(DecodedEventWithHeader {
            log_block,
            log_index,
            log_address,
            event: DecodedEvent {
                selector: self.selector,
                indexed,
                body,
            },
        })
    }

    /// Lazily decodes every row of `batch` in order, yielding one `Result` per row.
    pub fn decode_batch_iter(
        &self,
        batch: &RecordBatch,
    ) -> impl Iterator<Item = Result<DecodedEventWithHeader>> {
        (0..batch.num_rows()).map(|row| self.decode_row(batch, row))
    }

    /// Decodes every row of `batch` in parallel with rayon, preserving row order.
    ///
    /// # Errors
    ///
    /// Returns the first row-decoding error observed instead of panicking.
    pub fn par_decode_batch_deque(
        &self,
        batch: &RecordBatch,
    ) -> Result<VecDeque<DecodedEventWithHeader>> {
        (0..batch.num_rows())
            .into_par_iter()
            .map(|row| self.decode_row(batch, row))
            .collect()
    }
}
fn dyn_sol_type_to_reader(ty: &DynSolType) -> Box<dyn SolArrayReader + Send + Sync> {
match ty {
DynSolType::Bool => Box::<SolArrayReaderBool>::default(),
DynSolType::Int(n) => {
if *n > 64 {
Box::new(SolArrayReaderIntStr { n: *n })
} else if *n > 32 {
Box::new(SolArrayReaderInt64 { n: *n })
} else if *n > 16 {
Box::new(SolArrayReaderInt32 { n: *n })
} else if *n > 8 {
Box::new(SolArrayReaderInt16 { n: *n })
} else {
Box::new(SolArrayReaderInt8 { n: *n })
}
}
DynSolType::Uint(n) => {
if *n > 64 {
Box::new(SolArrayReaderUintStr { n: *n })
} else if *n > 32 {
Box::new(SolArrayReaderUint64 { n: *n })
} else if *n > 16 {
Box::new(SolArrayReaderUint32 { n: *n })
} else if *n > 8 {
Box::new(SolArrayReaderUint16 { n: *n })
} else {
Box::new(SolArrayReaderUint8 { n: *n })
}
}
DynSolType::Address => Box::<SolArrayReaderAddress>::default(),
DynSolType::Bytes => Box::<SolArrayReaderBytes>::default(),
DynSolType::FixedBytes(n) => Box::new(SolArrayReaderFixedBytes { n: *n }),
_ => unimplemented!(
"Support for transcoding {ty} solidity type to arrow is not yet implemented",
),
}
}
/// Reads `intN` values stored as strings in a [`StringArray`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `intN` with `N > 64`.
pub struct SolArrayReaderIntStr {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderIntStr {
    /// Parses the string at `index` into an [`I256`].
    ///
    /// # Panics
    ///
    /// Panics if `array` is not a [`StringArray`] or the value does not parse as an [`I256`].
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let strings = array
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("SolArrayReaderIntStr: column is not a StringArray");
        let raw = strings.value(index);
        // Convert straight from `&str`; going through an owned `String` allocated once per row
        // for no benefit.
        let value = I256::try_from(raw).unwrap_or_else(|err| {
            panic!("SolArrayReaderIntStr: invalid int{} value {raw:?}: {err}", self.n)
        });
        DynSolValue::Int(value, self.n)
    }
}
/// Reads `intN` values from an [`Int64Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `intN` with `32 < N <= 64`.
pub struct SolArrayReaderInt64 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderInt64 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<Int64Array>().unwrap();
        let widened = I256::try_from(column.value(index)).unwrap();
        DynSolValue::Int(widened, self.n)
    }
}
/// Reads `intN` values from an [`Int32Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `intN` with `16 < N <= 32`.
pub struct SolArrayReaderInt32 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderInt32 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<Int32Array>().unwrap();
        let widened = I256::try_from(column.value(index)).unwrap();
        DynSolValue::Int(widened, self.n)
    }
}
/// Reads `intN` values from an [`Int16Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `intN` with `8 < N <= 16`.
pub struct SolArrayReaderInt16 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderInt16 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<Int16Array>().unwrap();
        let widened = I256::try_from(column.value(index)).unwrap();
        DynSolValue::Int(widened, self.n)
    }
}
/// Reads `intN` values from an [`Int8Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `intN` with `N <= 8`.
pub struct SolArrayReaderInt8 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderInt8 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<Int8Array>().unwrap();
        let widened = I256::try_from(column.value(index)).unwrap();
        DynSolValue::Int(widened, self.n)
    }
}
/// Reads `uintN` values stored as base-10 strings in a [`StringArray`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `uintN` with `N > 64`.
pub struct SolArrayReaderUintStr {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderUintStr {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        let parsed = U256::from_str_radix(strings.value(index), 10).unwrap();
        DynSolValue::Uint(parsed, self.n)
    }
}
/// Reads `uintN` values from a [`UInt64Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `uintN` with `32 < N <= 64`.
pub struct SolArrayReaderUint64 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderUint64 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<UInt64Array>().unwrap();
        let widened = U256::from(column.value(index));
        DynSolValue::Uint(widened, self.n)
    }
}
/// Reads `uintN` values from a [`UInt32Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `uintN` with `16 < N <= 32`.
pub struct SolArrayReaderUint32 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderUint32 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<UInt32Array>().unwrap();
        let widened = U256::from(column.value(index));
        DynSolValue::Uint(widened, self.n)
    }
}
/// Reads `uintN` values from a [`UInt16Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `uintN` with `8 < N <= 16`.
pub struct SolArrayReaderUint16 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderUint16 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<UInt16Array>().unwrap();
        let widened = U256::from(column.value(index));
        DynSolValue::Uint(widened, self.n)
    }
}
/// Reads `uintN` values from a [`UInt8Array`] column.
///
/// `dyn_sol_type_to_reader` selects this reader for `uintN` with `N <= 8`.
pub struct SolArrayReaderUint8 {
    /// Declared Solidity bit width of the integer.
    n: usize,
}

impl SolArrayReader for SolArrayReaderUint8 {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let column = array.as_any().downcast_ref::<UInt8Array>().unwrap();
        let widened = U256::from(column.value(index));
        DynSolValue::Uint(widened, self.n)
    }
}
/// Reads `address` values stored as strings in a [`StringArray`] column.
#[derive(Default)]
pub struct SolArrayReaderAddress {}

impl SolArrayReader for SolArrayReaderAddress {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        let parsed = strings.value(index).parse().unwrap();
        DynSolValue::Address(parsed)
    }
}
/// Reads dynamic `bytes` values from a [`BinaryArray`] column.
#[derive(Default)]
pub struct SolArrayReaderBytes {}

impl SolArrayReader for SolArrayReaderBytes {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
        let raw: &[u8] = binary.value(index);
        DynSolValue::Bytes(raw.to_vec())
    }
}
/// Reads `bytesN` values from a [`BinaryArray`] column into a left-aligned ABI word.
#[derive(Default)]
pub struct SolArrayReaderFixedBytes {
    /// Declared byte length `N` of the `bytesN` type.
    n: usize,
}

impl SolArrayReader for SolArrayReaderFixedBytes {
    /// Reads the `n`-byte value at `index` and places it at the start of a zero-filled
    /// 32-byte word.
    ///
    /// # Panics
    ///
    /// Panics if `array` is not a [`BinaryArray`], if the stored value is not exactly `n` bytes
    /// long, or if `n` exceeds the 32-byte word size.
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let bytes = array
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap()
            .value(index);
        if bytes.len() != self.n {
            panic!(
                "Expected fixed bytes of length {}, got {}",
                self.n,
                bytes.len()
            );
        }
        // `Word::from_slice` requires exactly 32 bytes, so it cannot be used for `bytesN` with
        // `N < 32`. A `bytesN` value occupies the leading bytes of its word, and the rest of
        // the word is zero padding.
        let mut word = Word::ZERO;
        assert!(
            bytes.len() <= word.len(),
            "fixed bytes length {} exceeds the 32-byte word size",
            bytes.len()
        );
        word[..bytes.len()].copy_from_slice(bytes);
        DynSolValue::FixedBytes(word, self.n)
    }
}
/// Reads `bool` values from a [`BooleanArray`] column.
#[derive(Default)]
pub struct SolArrayReaderBool {}

impl SolArrayReader for SolArrayReaderBool {
    fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
        let flags = array.as_any().downcast_ref::<BooleanArray>().unwrap();
        DynSolValue::Bool(flags.value(index))
    }
}
// Round-trip tests: synthetic `Transfer` logs are encoded with the `SolEventCodec` encoder and
// decoded back with `BatchEventDecoder`.
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::codec::SolEventCodec;
use alloy::{
primitives::{Address, B256, LogData},
rpc::types::Log,
sol,
sol_types::JsonAbiExt,
};
// Event under test: two indexed address params (topics) and one non-indexed uint256 (data).
sol! {
#[sol(abi)]
event Transfer(address indexed from, address indexed to, uint256 value);
}
// Builds a `Transfer` RPC log emitted from `Address::ZERO`.
// topic0 is the event selector; topics 1 and 2 hold `from`/`to` left-padded to 32 bytes;
// the data is `value` as a big-endian 32-byte word.
fn transfer_log(block: u64, log_idx: u32, from: Address, to: Address, value: u64) -> Log {
let event = Transfer::abi();
let mut from_topic = [0u8; 32];
from_topic[12..].copy_from_slice(from.as_slice());
let mut to_topic = [0u8; 32];
to_topic[12..].copy_from_slice(to.as_slice());
let mut val_data = [0u8; 32];
val_data[24..].copy_from_slice(&value.to_be_bytes());
Log {
inner: alloy::primitives::Log {
address: Address::ZERO,
data: LogData::new(
vec![
event.selector(),
B256::from(from_topic),
B256::from(to_topic),
],
val_data.into(),
)
.unwrap(),
},
block_hash: None,
block_number: Some(block),
block_timestamp: None,
transaction_hash: None,
transaction_index: None,
log_index: Some(log_idx as u64),
removed: false,
}
}
// One log: header fields, both indexed addresses and the uint256 body value must round-trip.
#[test]
fn round_trip_single_row() -> anyhow::Result<()> {
let event = Transfer::abi();
let codec = SolEventCodec::new(&event)?;
let mut encoder = codec.new_encoder();
let from = "0x1111111111111111111111111111111111111111"
.parse::<Address>()
.unwrap();
let to = "0x2222222222222222222222222222222222222222"
.parse::<Address>()
.unwrap();
let log = transfer_log(42, 7, from, to, 999);
encoder.append(&log);
let batch = encoder.finish()?;
assert_eq!(batch.num_rows(), 1);
let decoded = codec.decoder.decode_row(&batch, 0)?;
assert_eq!(decoded.log_block, 42, "log_block");
assert_eq!(decoded.log_index, 7, "log_index");
assert_eq!(
decoded.event.indexed[0],
alloy::dyn_abi::DynSolValue::Address(from),
"indexed[0] = from"
);
assert_eq!(
decoded.event.indexed[1],
alloy::dyn_abi::DynSolValue::Address(to),
"indexed[1] = to"
);
assert!(
matches!(decoded.event.body[0], alloy::dyn_abi::DynSolValue::Uint(v, 256) if v == alloy_primitives::U256::from(999u64)),
"body[0] = value 999"
);
Ok(())
}
// Five logs: serial decoding must return rows in append order with the right header fields.
#[test]
fn round_trip_multi_row() -> anyhow::Result<()> {
let event = Transfer::abi();
let codec = SolEventCodec::new(&event)?;
let mut encoder = codec.new_encoder();
let addr_a = "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
.parse::<Address>()
.unwrap();
let addr_b = "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
.parse::<Address>()
.unwrap();
for i in 0u64..5 {
encoder.append(&transfer_log(100 + i, i as u32, addr_a, addr_b, i * 1000));
}
let batch = encoder.finish()?;
assert_eq!(batch.num_rows(), 5);
let rows: Vec<_> = codec
.decoder
.decode_batch_iter(&batch)
.collect::<anyhow::Result<_>>()?;
for (i, row) in rows.iter().enumerate() {
assert_eq!(row.log_block, 100 + i as u64, "row {i} log_block");
assert_eq!(row.log_index, i as u32, "row {i} log_index");
}
Ok(())
}
// An encoder finished without any appended logs yields a zero-row batch and no decoded rows.
#[test]
fn empty_batch_produces_no_rows() -> anyhow::Result<()> {
let event = Transfer::abi();
let codec = SolEventCodec::new(&event)?;
let mut encoder = codec.new_encoder();
let batch = encoder.finish()?;
assert_eq!(batch.num_rows(), 0);
let rows: Vec<_> = codec
.decoder
.decode_batch_iter(&batch)
.collect::<anyhow::Result<_>>()?;
assert!(rows.is_empty(), "no rows from empty batch");
Ok(())
}
// Hand-built batch whose `log_block` column is Utf8 instead of UInt64 (and which has only the
// first two header columns): `decode_row` must report an error instead of panicking.
#[test]
fn wrong_column_type_returns_err() -> anyhow::Result<()> {
use arrow::array::{StringArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;
let schema = Arc::new(Schema::new(vec![
Field::new("log_block", DataType::Utf8, false), Field::new("log_index", DataType::UInt32, false),
]));
let col0: Arc<dyn arrow::array::Array> = Arc::new(StringArray::from(vec!["not-a-u64"]));
let col1: Arc<dyn arrow::array::Array> = Arc::new(UInt32Array::from(vec![0u32]));
let batch = arrow::array::RecordBatch::try_new(schema, vec![col0, col1])?;
let event = Transfer::abi();
let decoder = BatchEventDecoder::new(&event);
let result = decoder.decode_row(&batch, 0);
assert!(
result.is_err(),
"wrong column type must return Err, not panic"
);
Ok(())
}
// The rayon-based decoder must return the same number of rows, in the same order, as the
// serial iterator (compared on header fields only).
#[test]
fn par_decode_matches_serial() -> anyhow::Result<()> {
let event = Transfer::abi();
let codec = SolEventCodec::new(&event)?;
let mut encoder = codec.new_encoder();
let a = "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
.parse::<Address>()
.unwrap();
let b = "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
.parse::<Address>()
.unwrap();
for i in 0u64..3 {
encoder.append(&transfer_log(200 + i, i as u32, a, b, i));
}
let batch = encoder.finish()?;
let serial: Vec<_> = codec
.decoder
.decode_batch_iter(&batch)
.collect::<anyhow::Result<_>>()?;
let parallel: Vec<_> = codec
.decoder
.par_decode_batch_deque(&batch)?
.into_iter()
.collect();
assert_eq!(serial.len(), parallel.len());
for (s, p) in serial.iter().zip(parallel.iter()) {
assert_eq!(s.log_block, p.log_block);
assert_eq!(s.log_index, p.log_index);
}
Ok(())
}
}