ethl 0.1.14

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use alloy::{
    dyn_abi::{DynSolType, Specifier},
    json_abi::Event,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

/// Creates an arrow schema for the given Solidity event, which includes the log header fields
/// and the fields corresponding to the event parameters.
/// The format is highly opinionated with the following order:
/// 1. `log_block` - u64
/// 2. `log_index` - u32
/// 3. `log_address` - utf8 (address)
/// 4. Indexed parameters - each parameter is represented by its Solidity type converted to an Arrow
///    data type.
/// 5. Non-indexed parameters - each parameter is represented by its Solidity type converted to an Arrow
///    data type.
/// Creates an arrow schema for the given Solidity event, which includes the log header fields
/// and the fields corresponding to the event parameters.
/// The format is highly opinionated with the following order:
/// 1. `log_block` - u64
/// 2. `log_index` - u32
/// 3. `log_address` - utf8 (address)
/// 4. Indexed parameters - each parameter is represented by its Solidity type converted to an Arrow
///    data type.
/// 5. Non-indexed parameters - each parameter is represented by its Solidity type converted to an Arrow
///    data type.
///
/// Within each parameter group the declaration order of `event.inputs` is preserved. Every field
/// is declared non-nullable and is named after the corresponding event parameter.
///
/// # Panics
///
/// Panics if a parameter's Solidity type string cannot be resolved, or if it resolves to a type
/// that has no Arrow mapping yet (see `dyn_sol_type_to_data_type`).
pub fn create_schema(event: &Event) -> SchemaRef {
    // Number of fixed log-header columns that precede the event parameters.
    const HEADER_FIELD_COUNT: usize = 3;

    // The final column count is known up front, so allocate once.
    let mut fields = Vec::with_capacity(HEADER_FIELD_COUNT + event.inputs.len());

    // Append the headers
    fields.extend([
        Field::new("log_block", DataType::UInt64, false),
        Field::new("log_index", DataType::UInt32, false),
        Field::new("log_address", DataType::Utf8, false),
    ]);

    // Indexed parameters first, then non-indexed ones; `filter` keeps declaration order within
    // each group, and a single closure builds the field for both groups.
    let indexed = event.inputs.iter().filter(|param| param.indexed);
    let non_indexed = event.inputs.iter().filter(|param| !param.indexed);
    fields.extend(indexed.chain(non_indexed).map(|param| {
        let resolved = param.ty.resolve().unwrap_or_else(|err| {
            panic!(
                "failed to resolve Solidity type `{}` of event parameter `{}`: {err}",
                param.ty, param.name
            )
        });
        Field::new(
            param.name.clone(),
            dyn_sol_type_to_data_type(&resolved),
            false,
        )
    }));

    Schema::new(fields).into()
}

/// Maps a resolved Solidity type to the Arrow [`DataType`] used for its column.
///
/// - `bool` → `Boolean`
/// - `intN` → the narrowest signed Arrow integer of at least N bits (`Int8` … `Int64`);
///   widths above 64 bits fall back to `Utf8`
/// - `uintN` → the narrowest unsigned Arrow integer of at least N bits (`UInt8` … `UInt64`);
///   widths above 64 bits fall back to `Utf8`
/// - `address`, `string` → `Utf8`
/// - `bytes`, `bytesN` → `Binary`
///
/// # Panics
///
/// Panics (via `unimplemented!`) for any other Solidity type, e.g. arrays and tuples.
fn dyn_sol_type_to_data_type(ty: &DynSolType) -> DataType {
    match ty {
        DynSolType::Bool => DataType::Boolean,

        // Signed integers: bucket the bit width into the smallest Arrow type that holds it.
        DynSolType::Int(bits) => match *bits {
            0..=8 => DataType::Int8,
            9..=16 => DataType::Int16,
            17..=32 => DataType::Int32,
            33..=64 => DataType::Int64,
            // Wider than any native Arrow integer.
            _ => DataType::Utf8,
        },

        // Unsigned integers: same bucketing, unsigned Arrow types.
        DynSolType::Uint(bits) => match *bits {
            0..=8 => DataType::UInt8,
            9..=16 => DataType::UInt16,
            17..=32 => DataType::UInt32,
            33..=64 => DataType::UInt64,
            // Wider than any native Arrow integer.
            _ => DataType::Utf8,
        },

        DynSolType::Address | DynSolType::String => DataType::Utf8,

        DynSolType::Bytes | DynSolType::FixedBytes(_) => DataType::Binary,

        unsupported => {
            let ty = unsupported;
            unimplemented!(
                "Support for transcoding {ty} solidity type to arrow is not yet implemented",
            )
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the header columns, the indexed-before-non-indexed parameter ordering, and the
    /// Solidity → Arrow type mapping, using the `Swap` event signature below.
    #[test]
    fn test_create_schema() {
        let swap = Event::parse("event Swap(bytes32 indexed id, address indexed sender, int128 amount0, int128 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick, uint24 fee)").unwrap();

        // Expected (column name, Arrow type) pairs, in schema order.
        let expected: [(&str, DataType); 8 + 3] = [
            // Log header.
            ("log_block", DataType::UInt64),
            ("log_index", DataType::UInt32),
            ("log_address", DataType::Utf8),
            // Indexed parameters.
            ("id", DataType::Binary),
            ("sender", DataType::Utf8),
            // Non-indexed parameters.
            ("amount0", DataType::Utf8),
            ("amount1", DataType::Utf8),
            ("sqrtPriceX96", DataType::Utf8),
            ("liquidity", DataType::Utf8),
            ("tick", DataType::Int32),
            ("fee", DataType::UInt32),
        ];

        let schema = create_schema(&swap);
        assert_eq!(schema.fields().len(), expected.len());

        for (position, (name, data_type)) in expected.iter().enumerate() {
            let field = schema.field(position);
            assert_eq!(field.name(), *name);
            assert_eq!(field.data_type(), data_type);
        }
    }
}