datafusion_ethers/convert/
raw.rs1use alloy::rpc::types::eth::Log;
2use datafusion::arrow::array::{self, ArrayBuilder};
3use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
4use std::sync::Arc;
5
6use crate::stream::StreamOptions;
7
8use super::{AppendError, Transcoder};
9
10pub struct EthRawLogsToArrow {
14 schema: SchemaRef,
15 block_number: array::UInt64Builder,
16 block_hash: array::BinaryBuilder,
17 block_timestamp: array::TimestampSecondBuilder,
18 transaction_index: array::UInt64Builder,
19 transaction_hash: array::BinaryBuilder,
20 log_index: array::UInt64Builder,
21 address: array::BinaryBuilder,
22 topic0: array::BinaryBuilder,
23 topic1: array::BinaryBuilder,
24 topic2: array::BinaryBuilder,
25 topic3: array::BinaryBuilder,
26 data: array::BinaryBuilder,
27}
28
29impl EthRawLogsToArrow {
30 pub fn new(options: &StreamOptions) -> Self {
31 let utc: Arc<str> = Arc::from("UTC");
32 Self {
33 schema: Arc::new(Schema::new(vec![
34 Field::new("block_number", DataType::UInt64, false),
35 Field::new("block_hash", DataType::Binary, false),
36 Field::new(
39 "block_timestamp",
40 DataType::Timestamp(TimeUnit::Second, Some(utc.clone())),
41 !options.use_block_timestamp_fallback,
42 ),
43 Field::new("transaction_index", DataType::UInt64, false),
44 Field::new("transaction_hash", DataType::Binary, false),
45 Field::new("log_index", DataType::UInt64, false),
46 Field::new("address", DataType::Binary, false),
47 Field::new("topic0", DataType::Binary, true),
48 Field::new("topic1", DataType::Binary, true),
49 Field::new("topic2", DataType::Binary, true),
50 Field::new("topic3", DataType::Binary, true),
51 Field::new("data", DataType::Binary, false),
52 ])),
53 block_number: array::UInt64Builder::new(),
54 block_hash: array::BinaryBuilder::new(),
55 block_timestamp: array::TimestampSecondBuilder::new().with_timezone(utc),
56 transaction_index: array::UInt64Builder::new(),
57 transaction_hash: array::BinaryBuilder::new(),
58 log_index: array::UInt64Builder::new(),
59 address: array::BinaryBuilder::new(),
60 topic0: array::BinaryBuilder::new(),
61 topic1: array::BinaryBuilder::new(),
62 topic2: array::BinaryBuilder::new(),
63 topic3: array::BinaryBuilder::new(),
64 data: array::BinaryBuilder::new(),
65 }
66 }
67}
68
69impl Transcoder for EthRawLogsToArrow {
70 fn schema(&self) -> SchemaRef {
71 self.schema.clone()
72 }
73
74 #[allow(clippy::get_first)]
75 fn append(&mut self, logs: &[Log]) -> Result<(), AppendError> {
76 for log in logs {
77 self.block_number.append_value(log.block_number.unwrap());
78 self.block_hash.append_value(log.block_hash.unwrap());
79 self.block_timestamp
80 .append_option(log.block_timestamp.map(|t| t as i64));
81 self.transaction_index
82 .append_value(log.transaction_index.unwrap());
83 self.transaction_hash
84 .append_value(log.transaction_hash.unwrap());
85 self.log_index.append_value(log.log_index.unwrap());
86 self.address.append_value(log.address().as_slice());
87
88 assert!(log.topics().len() <= 4);
89 self.topic0.append_option(log.topics().get(0));
90 self.topic1.append_option(log.topics().get(1));
91 self.topic2.append_option(log.topics().get(2));
92 self.topic3.append_option(log.topics().get(3));
93
94 self.data.append_value(&log.data().data);
95 }
96
97 Ok(())
98 }
99
100 fn len(&self) -> usize {
101 self.block_number.len()
102 }
103
104 fn finish(&mut self) -> array::RecordBatch {
105 array::RecordBatch::try_new(
106 self.schema.clone(),
107 vec![
108 Arc::new(self.block_number.finish()),
109 Arc::new(self.block_hash.finish()),
110 Arc::new(self.block_timestamp.finish()),
111 Arc::new(self.transaction_index.finish()),
112 Arc::new(self.transaction_hash.finish()),
113 Arc::new(self.log_index.finish()),
114 Arc::new(self.address.finish()),
115 Arc::new(self.topic0.finish()),
116 Arc::new(self.topic1.finish()),
117 Arc::new(self.topic2.finish()),
118 Arc::new(self.topic3.finish()),
119 Arc::new(self.data.finish()),
120 ],
121 )
122 .unwrap()
123 }
124}
125
126