datafusion_ethers/convert/
raw.rs1use alloy::rpc::types::eth::Log;
2use datafusion::arrow::array::{self, ArrayBuilder};
3use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
4use std::sync::Arc;
5
6use super::{AppendError, Transcoder};
7
8pub struct EthRawLogsToArrow {
12 schema: SchemaRef,
13 block_number: array::UInt64Builder,
14 block_hash: array::BinaryBuilder,
15 block_timestamp: array::TimestampSecondBuilder,
16 transaction_index: array::UInt64Builder,
17 transaction_hash: array::BinaryBuilder,
18 log_index: array::UInt64Builder,
19 address: array::BinaryBuilder,
20 topic0: array::BinaryBuilder,
21 topic1: array::BinaryBuilder,
22 topic2: array::BinaryBuilder,
23 topic3: array::BinaryBuilder,
24 data: array::BinaryBuilder,
25}
26
27impl Default for EthRawLogsToArrow {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl EthRawLogsToArrow {
34 pub fn new() -> Self {
35 let utc: Arc<str> = Arc::from("UTC");
36 Self {
37 schema: Arc::new(Schema::new(vec![
38 Field::new("block_number", DataType::UInt64, false),
39 Field::new("block_hash", DataType::Binary, false),
40 Field::new(
43 "block_timestamp",
44 DataType::Timestamp(TimeUnit::Second, Some(utc.clone())),
45 true,
46 ),
47 Field::new("transaction_index", DataType::UInt64, false),
48 Field::new("transaction_hash", DataType::Binary, false),
49 Field::new("log_index", DataType::UInt64, false),
50 Field::new("address", DataType::Binary, false),
51 Field::new("topic0", DataType::Binary, true),
52 Field::new("topic1", DataType::Binary, true),
53 Field::new("topic2", DataType::Binary, true),
54 Field::new("topic3", DataType::Binary, true),
55 Field::new("data", DataType::Binary, false),
56 ])),
57 block_number: array::UInt64Builder::new(),
58 block_hash: array::BinaryBuilder::new(),
59 block_timestamp: array::TimestampSecondBuilder::new().with_timezone(utc),
60 transaction_index: array::UInt64Builder::new(),
61 transaction_hash: array::BinaryBuilder::new(),
62 log_index: array::UInt64Builder::new(),
63 address: array::BinaryBuilder::new(),
64 topic0: array::BinaryBuilder::new(),
65 topic1: array::BinaryBuilder::new(),
66 topic2: array::BinaryBuilder::new(),
67 topic3: array::BinaryBuilder::new(),
68 data: array::BinaryBuilder::new(),
69 }
70 }
71}
72
73impl Transcoder for EthRawLogsToArrow {
74 fn schema(&self) -> SchemaRef {
75 self.schema.clone()
76 }
77
78 #[allow(clippy::get_first)]
79 fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
80 for log in logs {
81 self.block_number.append_value(log.block_number.unwrap());
82 self.block_hash.append_value(log.block_hash.unwrap());
83 self.block_timestamp
84 .append_option(log.block_timestamp.map(|t| t as i64));
85 self.transaction_index
86 .append_value(log.transaction_index.unwrap());
87 self.transaction_hash
88 .append_value(log.transaction_hash.unwrap());
89 self.log_index.append_value(log.log_index.unwrap());
90 self.address.append_value(log.address().as_slice());
91
92 assert!(log.topics().len() <= 4);
93 self.topic0.append_option(log.topics().get(0));
94 self.topic1.append_option(log.topics().get(1));
95 self.topic2.append_option(log.topics().get(2));
96 self.topic3.append_option(log.topics().get(3));
97
98 self.data.append_value(&log.data().data);
99 }
100
101 Ok(())
102 }
103
104 fn len(&self) -> usize {
105 self.block_number.len()
106 }
107
108 fn finish(&mut self) -> array::RecordBatch {
109 array::RecordBatch::try_new(
110 self.schema.clone(),
111 vec![
112 Arc::new(self.block_number.finish()),
113 Arc::new(self.block_hash.finish()),
114 Arc::new(self.block_timestamp.finish()),
115 Arc::new(self.transaction_index.finish()),
116 Arc::new(self.transaction_hash.finish()),
117 Arc::new(self.log_index.finish()),
118 Arc::new(self.address.finish()),
119 Arc::new(self.topic0.finish()),
120 Arc::new(self.topic1.finish()),
121 Arc::new(self.topic2.finish()),
122 Arc::new(self.topic3.finish()),
123 Arc::new(self.data.finish()),
124 ],
125 )
126 .unwrap()
127 }
128}
129
130