datafusion_ethers/convert/raw.rs

use alloy::rpc::types::eth::Log;
use datafusion::arrow::array::{self, ArrayBuilder};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use std::sync::Arc;

use crate::stream::StreamOptions;

use super::{AppendError, Transcoder};

///////////////////////////////////////////////////////////////////////////////////////////////////

/// Transcodes raw Ethereum log events into Arrow record batches
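///
/// A minimal usage sketch (assumes a configured `StreamOptions` named `options`
/// and fetched `logs: Vec<Log>` are already in scope):
///
/// ```ignore
/// let mut transcoder = EthRawLogsToArrow::new(&options);
/// transcoder.append(&logs)?;
/// let batch = transcoder.finish();
/// ```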
pub struct EthRawLogsToArrow {
    schema: SchemaRef,
    block_number: array::UInt64Builder,
    block_hash: array::BinaryBuilder,
    block_timestamp: array::TimestampSecondBuilder,
    transaction_index: array::UInt64Builder,
    transaction_hash: array::BinaryBuilder,
    log_index: array::UInt64Builder,
    address: array::BinaryBuilder,
    topic0: array::BinaryBuilder,
    topic1: array::BinaryBuilder,
    topic2: array::BinaryBuilder,
    topic3: array::BinaryBuilder,
    data: array::BinaryBuilder,
}

impl EthRawLogsToArrow {
    pub fn new(options: &StreamOptions) -> Self {
        let utc: Arc<str> = Arc::from("UTC");
        Self {
            schema: Arc::new(Schema::new(vec![
                Field::new("block_number", DataType::UInt64, false),
                Field::new("block_hash", DataType::Binary, false),
                // TODO: Remove nullable once most providers support this field
                // See: https://github.com/ethereum/execution-apis/issues/295
                Field::new(
                    "block_timestamp",
                    DataType::Timestamp(TimeUnit::Second, Some(utc.clone())),
                    !options.use_block_timestamp_fallback,
                ),
                Field::new("transaction_index", DataType::UInt64, false),
                Field::new("transaction_hash", DataType::Binary, false),
                Field::new("log_index", DataType::UInt64, false),
                Field::new("address", DataType::Binary, false),
                Field::new("topic0", DataType::Binary, true),
                Field::new("topic1", DataType::Binary, true),
                Field::new("topic2", DataType::Binary, true),
                Field::new("topic3", DataType::Binary, true),
                Field::new("data", DataType::Binary, false),
            ])),
            block_number: array::UInt64Builder::new(),
            block_hash: array::BinaryBuilder::new(),
            block_timestamp: array::TimestampSecondBuilder::new().with_timezone(utc),
            transaction_index: array::UInt64Builder::new(),
            transaction_hash: array::BinaryBuilder::new(),
            log_index: array::UInt64Builder::new(),
            address: array::BinaryBuilder::new(),
            topic0: array::BinaryBuilder::new(),
            topic1: array::BinaryBuilder::new(),
            topic2: array::BinaryBuilder::new(),
            topic3: array::BinaryBuilder::new(),
            data: array::BinaryBuilder::new(),
        }
    }
}

impl Transcoder for EthRawLogsToArrow {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    #[allow(clippy::get_first)]
    fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
        for log in logs {
            // Per the Ethereum JSON-RPC spec these fields are `null` only for
            // pending logs, hence the unwraps
            self.block_number.append_value(log.block_number.unwrap());
            self.block_hash.append_value(log.block_hash.unwrap());
            self.block_timestamp
                .append_option(log.block_timestamp.map(|t| t as i64));
            self.transaction_index
                .append_value(log.transaction_index.unwrap());
            self.transaction_hash
                .append_value(log.transaction_hash.unwrap());
            self.log_index.append_value(log.log_index.unwrap());
            self.address.append_value(log.address().as_slice());

            assert!(log.topics().len() <= 4);
            self.topic0.append_option(log.topics().get(0));
            self.topic1.append_option(log.topics().get(1));
            self.topic2.append_option(log.topics().get(2));
            self.topic3.append_option(log.topics().get(3));

            self.data.append_value(&log.data().data);
        }

        Ok(())
    }

    fn len(&self) -> usize {
        self.block_number.len()
    }

    fn finish(&mut self) -> array::RecordBatch {
        array::RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(self.block_number.finish()),
                Arc::new(self.block_hash.finish()),
                Arc::new(self.block_timestamp.finish()),
                Arc::new(self.transaction_index.finish()),
                Arc::new(self.transaction_hash.finish()),
                Arc::new(self.log_index.finish()),
                Arc::new(self.address.finish()),
                Arc::new(self.topic0.finish()),
                Arc::new(self.topic1.finish()),
                Arc::new(self.topic2.finish()),
                Arc::new(self.topic3.finish()),
                Arc::new(self.data.finish()),
            ],
        )
        .unwrap()
    }
}
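
// A batching driver sketch (an illustration only, not part of this crate's API):
// a caller paging through `eth_getLogs` results could append each page and flush
// a `RecordBatch` once enough rows have accumulated. The `pages`, `batches`,
// `max_rows_per_batch`, and `options` names below are assumptions.
//
//     let mut transcoder = EthRawLogsToArrow::new(&options);
//     for page in pages {
//         transcoder.append(&page)?;
//         if transcoder.len() >= max_rows_per_batch {
//             batches.push(transcoder.finish());
//         }
//     }
//     if transcoder.len() > 0 {
//         batches.push(transcoder.finish());
//     }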

///////////////////////////////////////////////////////////////////////////////////////////////////