//! ethl 0.1.14
//!
//! Tools for capturing, processing, archiving, and replaying Ethereum events.
use std::sync::Arc;

use crate::storage::codec::{decoder::BatchEventDecoder, encoder::BatchEventEncoder};
use alloy::{dyn_abi::DynSolValue, hex::ToHexExt, json_abi::Event};
use anyhow::Result;
use arrow::{
    array::{Array, ArrayRef},
    datatypes::SchemaRef,
};
use parquet::{arrow::ArrowSchemaConverter, schema::types::SchemaDescriptor};

pub mod decoder;
pub mod encoder;
pub mod schema;

// Accumulator for Solidity types to Arrow data arrays (used for accumulating event data in a RecordBatch)
// Accumulator for Solidity types to Arrow data arrays (used for accumulating event data in a RecordBatch)
pub trait SolArrayBuilder {
    /// Appends one decoded Solidity value to the in-progress Arrow array.
    fn append_value(&mut self, value: &DynSolValue);
    /// Number of values accumulated so far.
    fn len(&self) -> usize;
    /// Consumes the accumulated values and produces the finished Arrow array.
    /// Note: takes `&mut self`, so the builder may be reused after finishing.
    fn finish(&mut self) -> Arc<dyn Array>;
    /// True when no values have been appended yet (default: `len() == 0`).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

// Dynamic reader for Solidity arrays
// Dynamic reader for Solidity arrays
pub trait SolArrayReader {
    /// Reads the value at `index` from the Arrow column `col` and converts it
    /// back into a dynamically-typed Solidity value.
    /// NOTE(review): presumably panics or produces garbage on out-of-bounds
    /// `index` — contract depends on the implementor; confirm before relying on it.
    fn get(&self, col: &ArrayRef, index: usize) -> DynSolValue;
}

/// Bundles everything needed to round-trip one Solidity event through Arrow:
/// the event's ABI definition, the derived Arrow schema, and the matching
/// batch encoder/decoder pair. Constructed via [`SolEventCodec::new`].
pub struct SolEventCodec {
    // ABI definition of the event this codec handles.
    pub event: Event,
    // Arrow schema derived from the event via `schema::create_schema`.
    pub schema: SchemaRef,
    // Encodes decoded logs into Arrow RecordBatches.
    pub encoder: BatchEventEncoder,
    // Decodes Arrow RecordBatches back into event data.
    pub decoder: BatchEventDecoder,
}

impl SolEventCodec {
    /// Builds a codec (Arrow schema + batch encoder + batch decoder) for the
    /// given Solidity event ABI definition.
    ///
    /// # Errors
    /// Returns an error if the batch encoder cannot be constructed for the
    /// derived Arrow schema.
    pub fn new(event: &Event) -> Result<Self> {
        let schema = schema::create_schema(event);
        let decoder = BatchEventDecoder::new(event);
        let encoder = BatchEventEncoder::new(event, schema.clone())?;
        Ok(SolEventCodec {
            event: event.clone(),
            schema,
            encoder,
            decoder,
        })
    }

    /// Creates a fresh, empty encoder for this codec's event and schema,
    /// independent of the one stored in `self.encoder`.
    ///
    /// # Panics
    /// Panics if encoder construction fails. `new()` already built an encoder
    /// from the exact same inputs, so a failure here is a broken invariant,
    /// not a recoverable error — hence `expect` with the invariant stated.
    pub fn new_encoder(&self) -> BatchEventEncoder {
        BatchEventEncoder::new(&self.event, self.schema.clone())
            .expect("encoder construction succeeded in new() with identical event and schema")
    }

    /// Human-readable, stable identifier for the event: the lowercased event
    /// name joined with the `0x`-prefixed 4-byte selector,
    /// e.g. `poolcreated-0x2128d88b`.
    pub fn event_id(&self) -> String {
        format!(
            "{}-{}",
            self.event.name.to_lowercase(),
            // The full selector hex is longer; keep "0x" + first 8 hex chars
            // (4 bytes), which is the canonical short event selector.
            &self.event.selector().encode_hex_with_prefix()[0..10]
        )
    }

    /// Converts this codec's Arrow schema into a Parquet schema descriptor,
    /// without coercing Arrow types to Parquet-preferred equivalents.
    ///
    /// # Panics
    /// Panics if the Arrow schema contains a type that has no Parquet
    /// representation; schemas produced by `schema::create_schema` are
    /// expected to always convert.
    pub fn parquet_schema(&self) -> SchemaDescriptor {
        ArrowSchemaConverter::new()
            .with_coerce_types(false)
            .convert(&self.schema)
            .expect("Failed to convert Arrow schema to Parquet schema")
    }
}

/// Fallible conversion from an event ABI definition into a codec.
///
/// Implemented as `TryFrom` (rather than the original hand-written `TryInto`)
/// per Rust convention: the standard library's blanket impl derives
/// `TryInto<SolEventCodec> for &Event` from this automatically, so existing
/// `event.try_into()` call sites continue to compile unchanged.
impl TryFrom<&Event> for SolEventCodec {
    type Error = anyhow::Error;

    /// Delegates to [`SolEventCodec::new`]; fails if encoder construction fails.
    fn try_from(event: &Event) -> Result<Self> {
        SolEventCodec::new(event)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy::{sol, sol_types::JsonAbiExt};

    // Fixture events generated by the `sol!` macro; `abi()` (via JsonAbiExt)
    // yields the dynamic `Event` ABI definition the codec consumes.
    sol! {
        #[sol(abi)]
        #[derive(Debug, Default)]
        event PoolCreated(address indexed token0, address indexed token1, bool indexed stable, address pool, uint256 count);
        #[sol(abi)]
        event SetCustomFee(address indexed pool, uint256 fee);
    }

    /// Smoke test: codec construction succeeds and the derived schema has the
    /// expected column count for a 5-parameter event.
    #[test]
    fn test_sol_event_codec() -> Result<()> {
        let codec = SolEventCodec::new(&PoolCreated::abi())?;
        assert_eq!(codec.schema.fields().len(), 8); // 3 header + 5 event params
        Ok(())
    }

    /// Regression test for BigIntConversionError panic when encoding Mint events
    /// from non-standard pools (e.g. unsupported factory on Base).
    ///
    /// Tx: 0x3acf6cb136fa7f1756f5e699c8d8056ecc0748f4b92ee00431f0e14a6ce03f09
    /// Pool: 0x0ed46f2668a6394ffa806713a306a51b998eb070 (unsupported factory)
    ///
    /// The int24 tickLower/tickUpper fields decoded to values that overflowed
    /// when using the panicking `as_i*()` conversion on I256.
    #[test]
    fn test_encoder_no_panic_on_oversized_int_values() -> Result<()> {
        use alloy::primitives::{Address, B256, LogData};
        use alloy::rpc::types::Log;

        // Uniswap-V3-style Mint signature with indexed int24 tick bounds.
        sol! {
            #[sol(abi)]
            event Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1);
        }

        let event = Mint::abi();
        let schema = super::schema::create_schema(&event);
        let mut encoder = super::encoder::BatchEventEncoder::new(&event, schema)?;

        // Construct a Mint log reproducing the real crash. The bad pool stuffed a
        // 160-bit address into the indexed int24 tickLower topic, which overflows
        // i32 and previously caused a BigIntConversionError panic in as_i32().
        //
        // Non-indexed fields: sender(address), amount(uint128), amount0(uint256), amount1(uint256)
        let data_bytes = [0u8; 128]; // 4 x 32-byte zero words

        let log = Log {
            inner: alloy::primitives::Log {
                address: "0x0ed46f2668a6394ffa806713a306a51b998eb070"
                    .parse::<Address>()
                    .unwrap(),
                data: LogData::new(
                    vec![
                        // topic[0]: Mint event selector
                        "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
                            .parse::<B256>()
                            .unwrap(),
                        // topic[1]: owner (indexed address)
                        "0x000000000000000000000000a9b1758a3e74e440c774e4cd7d7c00d9c5d27859"
                            .parse::<B256>()
                            .unwrap(),
                        // topic[2]: tickLower (indexed int24) - address stuffed in, overflows i32
                        "0x000000000000000000000000a9b1758a3e74e440c774e4cd7d7c00d9c5d27859"
                            .parse::<B256>()
                            .unwrap(),
                        // topic[3]: tickUpper (indexed int24) - valid small value
                        "0x0000000000000000000000000000000000000000000000000000000000032640"
                            .parse::<B256>()
                            .unwrap(),
                    ],
                    data_bytes.into(),
                )
                .unwrap(),
            },
            block_hash: None,
            block_number: Some(42601282),
            block_timestamp: None,
            transaction_hash: None,
            transaction_index: None,
            log_index: Some(429),
            removed: false,
        };

        // This previously panicked with BigIntConversionError on as_i32()
        encoder.append(&log);
        assert_eq!(encoder.len(), 1);

        // The row must also survive finishing into a RecordBatch, not just append.
        let batch = encoder.finish()?;
        assert_eq!(batch.num_rows(), 1);
        Ok(())
    }
}