use std::collections::VecDeque;
use crate::storage::codec::SolArrayReader;
use alloy::{
dyn_abi::{DecodedEvent, DynSolType, DynSolValue, Specifier, Word},
json_abi::Event,
};
use alloy_primitives::{Address, B256, I256, U256};
use anyhow::Result;
use arrow::array::{
ArrayRef, BinaryArray, BooleanArray, Int8Array, Int16Array, Int32Array, Int64Array,
RecordBatch, StringArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
#[derive(Debug, Clone)]
pub struct DecodedEventWithHeader {
pub log_block: u64,
pub log_index: u32,
pub log_address: Address,
pub event: DecodedEvent,
}
pub struct BatchEventDecoder {
topic_readers: Vec<Box<dyn SolArrayReader + Send + Sync>>,
data_readers: Vec<Box<dyn SolArrayReader + Send + Sync>>,
selector: Option<B256>,
}
impl BatchEventDecoder {
pub fn new(event: &Event) -> Self {
let topic_readers: Vec<Box<dyn SolArrayReader + Send + Sync>> = event
.inputs
.iter()
.filter_map(|param| {
if param.indexed {
Some(dyn_sol_type_to_reader(¶m.ty.resolve().unwrap()))
} else {
None
}
})
.collect();
let data_readers: Vec<Box<dyn SolArrayReader + Send + Sync>> = event
.inputs
.iter()
.filter_map(|param| {
if param.indexed {
None
} else {
Some(dyn_sol_type_to_reader(¶m.ty.resolve().unwrap()))
}
})
.collect();
BatchEventDecoder {
topic_readers,
data_readers,
selector: Some(event.selector()),
}
}
pub fn decode_row(&self, batch: &RecordBatch, row: usize) -> Result<DecodedEventWithHeader> {
let log_block = batch
.column(0)
.as_any()
.downcast_ref::<UInt64Array>()
.map(|col| col.value(row))
.ok_or_else(|| anyhow::anyhow!("Failed to decode log_block"))?;
let log_index = batch
.column(1)
.as_any()
.downcast_ref::<UInt32Array>()
.map(|col| col.value(row))
.ok_or_else(|| anyhow::anyhow!("Failed to decode log_index"))?;
let log_address = batch
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.map(|col| col.value(row).parse::<Address>().unwrap())
.ok_or_else(|| anyhow::anyhow!("Failed to decode log_address"))?;
let indexed: Vec<DynSolValue> = self
.topic_readers
.iter()
.enumerate()
.map(|(i, reader)| reader.get(batch.column(i + 3), row))
.collect();
let body: Vec<DynSolValue> = self
.data_readers
.iter()
.enumerate()
.map(|(i, reader)| reader.get(batch.column(i + 3 + self.topic_readers.len()), row))
.collect();
Ok(DecodedEventWithHeader {
log_block,
log_index,
log_address,
event: DecodedEvent {
selector: self.selector,
indexed,
body,
},
})
}
pub fn decode_batch_iter(
&self,
batch: &RecordBatch,
) -> impl Iterator<Item = Result<DecodedEventWithHeader>> {
(0..batch.num_rows()).map(|row| self.decode_row(batch, row))
}
pub fn par_decode_batch_deque(
&self,
batch: &RecordBatch,
) -> Result<VecDeque<DecodedEventWithHeader>> {
Ok((0..batch.num_rows())
.into_par_iter()
.map(|row| self.decode_row(batch, row).expect("Failed to decode row"))
.collect::<VecDeque<_>>())
}
}
fn dyn_sol_type_to_reader(ty: &DynSolType) -> Box<dyn SolArrayReader + Send + Sync> {
match ty {
DynSolType::Bool => Box::<SolArrayReaderBool>::default(),
DynSolType::Int(n) => {
if *n > 64 {
Box::new(SolArrayReaderIntStr { n: *n })
} else if *n > 32 {
Box::new(SolArrayReaderInt64 { n: *n })
} else if *n > 16 {
Box::new(SolArrayReaderInt32 { n: *n })
} else if *n > 8 {
Box::new(SolArrayReaderInt16 { n: *n })
} else {
Box::new(SolArrayReaderInt8 { n: *n })
}
}
DynSolType::Uint(n) => {
if *n > 64 {
Box::new(SolArrayReaderUintStr { n: *n })
} else if *n > 32 {
Box::new(SolArrayReaderUint64 { n: *n })
} else if *n > 16 {
Box::new(SolArrayReaderUint32 { n: *n })
} else if *n > 8 {
Box::new(SolArrayReaderUint16 { n: *n })
} else {
Box::new(SolArrayReaderUint8 { n: *n })
}
}
DynSolType::Address => Box::<SolArrayReaderAddress>::default(),
DynSolType::Bytes => Box::<SolArrayReaderBytes>::default(),
DynSolType::FixedBytes(n) => Box::new(SolArrayReaderFixedBytes { n: *n }),
_ => unimplemented!(
"Support for transcoding {ty} solidity type to arrow is not yet implemented",
),
}
}
pub struct SolArrayReaderIntStr {
n: usize,
}
impl SolArrayReader for SolArrayReaderIntStr {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Int(
I256::try_from(
array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(index)
.to_string(),
)
.unwrap(),
self.n,
)
}
}
pub struct SolArrayReaderInt64 {
n: usize,
}
impl SolArrayReader for SolArrayReaderInt64 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Int(
I256::try_from(
array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(index),
)
.unwrap(),
self.n,
)
}
}
pub struct SolArrayReaderInt32 {
n: usize,
}
impl SolArrayReader for SolArrayReaderInt32 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Int(
I256::try_from(
array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.value(index),
)
.unwrap(),
self.n,
)
}
}
pub struct SolArrayReaderInt16 {
n: usize,
}
impl SolArrayReader for SolArrayReaderInt16 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Int(
I256::try_from(
array
.as_any()
.downcast_ref::<Int16Array>()
.unwrap()
.value(index),
)
.unwrap(),
self.n,
)
}
}
pub struct SolArrayReaderInt8 {
n: usize,
}
impl SolArrayReader for SolArrayReaderInt8 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Int(
I256::try_from(
array
.as_any()
.downcast_ref::<Int8Array>()
.unwrap()
.value(index),
)
.unwrap(),
self.n,
)
}
}
pub struct SolArrayReaderUintStr {
n: usize,
}
impl SolArrayReader for SolArrayReaderUintStr {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
let input = array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(index);
DynSolValue::Uint(U256::from_str_radix(input, 10).unwrap(), self.n)
}
}
pub struct SolArrayReaderUint64 {
n: usize,
}
impl SolArrayReader for SolArrayReaderUint64 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Uint(
U256::from(
array
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.value(index),
),
self.n,
)
}
}
pub struct SolArrayReaderUint32 {
n: usize,
}
impl SolArrayReader for SolArrayReaderUint32 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Uint(
U256::from(
array
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.value(index),
),
self.n,
)
}
}
pub struct SolArrayReaderUint16 {
n: usize,
}
impl SolArrayReader for SolArrayReaderUint16 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Uint(
U256::from(
array
.as_any()
.downcast_ref::<UInt16Array>()
.unwrap()
.value(index),
),
self.n,
)
}
}
pub struct SolArrayReaderUint8 {
n: usize,
}
impl SolArrayReader for SolArrayReaderUint8 {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Uint(
U256::from(
array
.as_any()
.downcast_ref::<UInt8Array>()
.unwrap()
.value(index),
),
self.n,
)
}
}
#[derive(Default)]
pub struct SolArrayReaderAddress {}
impl SolArrayReader for SolArrayReaderAddress {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Address(
array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(index)
.parse()
.unwrap(),
)
}
}
#[derive(Default)]
pub struct SolArrayReaderBytes {}
impl SolArrayReader for SolArrayReaderBytes {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Bytes(
array
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.value(index)
.into(),
)
}
}
#[derive(Default)]
pub struct SolArrayReaderFixedBytes {
n: usize,
}
impl SolArrayReader for SolArrayReaderFixedBytes {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
let bytes = array
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.value(index);
if bytes.len() != self.n {
panic!(
"Expected fixed bytes of length {}, got {}",
self.n,
bytes.len()
);
}
DynSolValue::FixedBytes(Word::from_slice(bytes), self.n)
}
}
#[derive(Default)]
pub struct SolArrayReaderBool {}
impl SolArrayReader for SolArrayReaderBool {
fn get(&self, array: &ArrayRef, index: usize) -> DynSolValue {
DynSolValue::Bool(
array
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap()
.value(index),
)
}
}