use std::sync::Arc;
use crate::storage::codec::{decoder::BatchEventDecoder, encoder::BatchEventEncoder};
use alloy::{dyn_abi::DynSolValue, hex::ToHexExt, json_abi::Event};
use anyhow::Result;
use arrow::{
array::{Array, ArrayRef},
datatypes::SchemaRef,
};
use parquet::{arrow::ArrowSchemaConverter, schema::types::SchemaDescriptor};
pub mod decoder;
pub mod encoder;
pub mod schema;
/// Incremental builder that accepts [`DynSolValue`]s and produces an Arrow array.
///
/// Implementors supply `append_value`, `len` and `finish`; `is_empty` comes for
/// free from `len`.
pub trait SolArrayBuilder {
    /// Adds `value` to the builder.
    fn append_value(&mut self, value: &DynSolValue);

    /// Reports the builder's current length.
    fn len(&self) -> usize;

    /// Returns `true` exactly when [`len`](Self::len) is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Produces the Arrow array for this builder.
    ///
    /// NOTE(review): whether the builder is reset afterwards depends on the
    /// implementor — confirm before reusing a builder after `finish`.
    fn finish(&mut self) -> Arc<dyn Array>;
}
/// Reader that turns one element of an Arrow column into a [`DynSolValue`].
pub trait SolArrayReader {
/// Returns the element of `col` at position `index` as a [`DynSolValue`].
///
/// NOTE(review): how an out-of-range `index` or a column of an unexpected Arrow
/// type is handled is decided by each implementor — confirm against them.
fn get(&self, col: &ArrayRef, index: usize) -> DynSolValue;
}
/// Bundles an ABI event with the Arrow schema, batch encoder and batch decoder
/// that `SolEventCodec::new` derives from it.
pub struct SolEventCodec {
/// Owned copy of the ABI event this codec was built for.
pub event: Event,
/// Arrow schema produced by `schema::create_schema` for `event`.
pub schema: SchemaRef,
/// Batch encoder constructed from `event` and `schema`.
pub encoder: BatchEventEncoder,
/// Batch decoder constructed from `event`.
pub decoder: BatchEventDecoder,
}
impl SolEventCodec {
pub fn new(event: &Event) -> Result<Self> {
let schema = schema::create_schema(event);
let decoder = BatchEventDecoder::new(event);
let encoder = BatchEventEncoder::new(event, schema.clone())?;
Ok(SolEventCodec {
event: event.clone(),
schema,
encoder,
decoder,
})
}
pub fn new_encoder(&self) -> BatchEventEncoder {
BatchEventEncoder::new(&self.event, self.schema.clone()).unwrap()
}
pub fn event_id(&self) -> String {
format!(
"{}-{}",
self.event.name.to_lowercase(),
&self.event.selector().encode_hex_with_prefix()[0..10]
)
}
pub fn parquet_schema(&self) -> SchemaDescriptor {
ArrowSchemaConverter::new()
.with_coerce_types(false)
.convert(&self.schema)
.expect("Failed to convert Arrow schema to Parquet schema")
}
}
/// Fallible conversion from an ABI [`Event`] reference into a [`SolEventCodec`].
///
/// Implemented as `TryFrom` rather than `TryInto`: the standard blanket impl then
/// provides `(&event).try_into()` as before, and `SolEventCodec::try_from(&event)`
/// becomes available as well.
impl TryFrom<&Event> for SolEventCodec {
    type Error = anyhow::Error;

    /// Delegates to [`SolEventCodec::new`], forwarding its error unchanged.
    fn try_from(event: &Event) -> Result<Self> {
        Self::new(event)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::{sol, sol_types::JsonAbiExt};

    sol! {
        #[sol(abi)]
        #[derive(Debug, Default)]
        event PoolCreated(address indexed token0, address indexed token1, bool indexed stable, address pool, uint256 count);

        #[sol(abi)]
        event SetCustomFee(address indexed pool, uint256 fee);
    }

    /// A codec can be built from the `PoolCreated` ABI and exposes an 8-field schema.
    #[test]
    fn test_sol_event_codec() -> Result<()> {
        let abi = PoolCreated::abi();
        let codec = SolEventCodec::new(&abi)?;

        // `PoolCreated` declares 5 parameters; the other 3 fields are presumably
        // added by `create_schema` — TODO confirm against the schema module.
        let field_count = codec.schema.fields().len();
        assert_eq!(field_count, 8);

        Ok(())
    }

    /// Appending a `Mint` log whose indexed topic is wider than its declared
    /// integer type must not panic and must still yield a one-row batch.
    #[test]
    fn test_encoder_no_panic_on_oversized_int_values() -> Result<()> {
        use alloy::{
            primitives::{Address, B256, LogData},
            rpc::types::Log,
        };

        sol! {
            #[sol(abi)]
            event Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1);
        }

        let mint_abi = Mint::abi();
        let mint_schema = super::schema::create_schema(&mint_abi);
        let mut encoder = super::encoder::BatchEventEncoder::new(&mint_abi, mint_schema)?;

        let emitter = "0x0ed46f2668a6394ffa806713a306a51b998eb070"
            .parse::<Address>()
            .unwrap();

        // The third entry lines up with `int24 indexed tickLower` yet carries a
        // 20-byte value — presumably the oversized input the test name refers to.
        let topics: Vec<B256> = [
            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde",
            "0x000000000000000000000000a9b1758a3e74e440c774e4cd7d7c00d9c5d27859",
            "0x000000000000000000000000a9b1758a3e74e440c774e4cd7d7c00d9c5d27859",
            "0x0000000000000000000000000000000000000000000000000000000000032640",
        ]
        .into_iter()
        .map(|word| word.parse::<B256>().unwrap())
        .collect();

        // All-zero data section of 128 bytes.
        let payload = [0u8; 128];
        let log_data = LogData::new(topics, payload.into()).unwrap();

        let log = Log {
            inner: alloy::primitives::Log {
                address: emitter,
                data: log_data,
            },
            block_hash: None,
            block_number: Some(42601282),
            block_timestamp: None,
            transaction_hash: None,
            transaction_index: None,
            log_index: Some(429),
            removed: false,
        };

        encoder.append(&log);
        assert_eq!(encoder.len(), 1);

        let batch = encoder.finish()?;
        assert_eq!(batch.num_rows(), 1);

        Ok(())
    }
}