datafusion_ethers/
config.rs1use alloy::rpc::types::eth::{BlockNumberOrTag, Filter};
2use datafusion::{
3 config::{ConfigEntry, ConfigExtension, ExtensionOptions},
4 error::DataFusionError,
5};
6
7use crate::stream::StreamOptions;
8
9#[derive(Debug, Clone)]
12pub struct EthProviderConfig {
13 pub block_range_from: BlockNumberOrTag,
14 pub block_range_to: BlockNumberOrTag,
15 pub block_stride: u64,
16}
17
18impl Default for EthProviderConfig {
21 fn default() -> Self {
22 Self {
23 block_range_from: BlockNumberOrTag::Earliest,
24 block_range_to: BlockNumberOrTag::Latest,
25 block_stride: 100_000,
26 }
27 }
28}
29
30impl EthProviderConfig {
33 pub fn default_filter(&self) -> Filter {
34 Filter::new()
35 .from_block(self.block_range_from)
36 .to_block(self.block_range_to)
37 }
38
39 pub fn stream_options(&self) -> StreamOptions {
40 StreamOptions {
41 block_stride: self.block_stride,
42 }
43 }
44}
45
46impl ConfigExtension for EthProviderConfig {
49 const PREFIX: &'static str = "ethereum";
50}
51
52impl ExtensionOptions for EthProviderConfig {
55 fn as_any(&self) -> &dyn std::any::Any {
56 self
57 }
58
59 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
60 &mut *self
61 }
62
63 fn cloned(&self) -> Box<dyn ExtensionOptions> {
64 Box::new(self.clone())
65 }
66
67 fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
68 match key {
69 "block_range_from" => {
70 self.block_range_from = parse_block_number(value)
71 .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
72 Ok(())
73 }
74 "block_range_to" => {
75 self.block_range_to = parse_block_number(value)
76 .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
77 Ok(())
78 }
79 _ => Err(DataFusionError::Configuration(format!(
80 "Unsupported option: {key}"
81 ))),
82 }
83 }
84
85 fn entries(&self) -> Vec<ConfigEntry> {
86 vec![
87 ConfigEntry {
88 key: "block_range_from".to_string(),
89 value: Some(self.block_range_from.to_string()),
90 description: "Lower boundry (inclusive) restriction on block range when pushing down predicate to the node",
91 },
92 ConfigEntry {
93 key: "block_range_to".to_string(),
94 value: Some(self.block_range_to.to_string()),
95 description: "Upper boundry (inclusive) restriction on block range when pushing down predicate to the node",
96 },
97 ]
98 }
99}
100
101fn parse_block_number(s: &str) -> Result<BlockNumberOrTag, String> {
104 let block = match s.to_lowercase().as_str() {
105 "latest" => BlockNumberOrTag::Latest,
106 "finalized" => BlockNumberOrTag::Finalized,
107 "safe" => BlockNumberOrTag::Safe,
108 "earliest" => BlockNumberOrTag::Earliest,
109 "pending" => BlockNumberOrTag::Pending,
110 number => BlockNumberOrTag::Number(
111 number
112 .parse()
113 .map_err(|_| format!("Invalid block number: {number}"))?,
114 ),
115 };
116 Ok(block)
117}