datafusion_ethers/
config.rs1use alloy::rpc::types::eth::{BlockNumberOrTag, Filter};
2use datafusion::{
3 config::{ConfigEntry, ConfigExtension, ExtensionOptions},
4 error::DataFusionError,
5};
6
7use crate::stream::StreamOptions;
8
9#[derive(Debug, Clone)]
12pub struct EthProviderConfig {
13 pub schema_name: String,
14 pub block_range_from: BlockNumberOrTag,
15 pub block_range_to: BlockNumberOrTag,
16 pub block_stride: u64,
17
18 pub use_block_timestamp_fallback: bool,
26}
27
28impl Default for EthProviderConfig {
31 fn default() -> Self {
32 Self {
33 schema_name: "eth".to_string(),
34 block_range_from: BlockNumberOrTag::Earliest,
35 block_range_to: BlockNumberOrTag::Latest,
36 block_stride: 100_000,
37 use_block_timestamp_fallback: false,
38 }
39 }
40}
41
42impl EthProviderConfig {
45 pub fn default_filter(&self) -> Filter {
46 Filter::new()
47 .from_block(self.block_range_from)
48 .to_block(self.block_range_to)
49 }
50
51 pub fn stream_options(&self) -> StreamOptions {
52 StreamOptions {
53 block_stride: self.block_stride,
54 use_block_timestamp_fallback: self.use_block_timestamp_fallback,
55 }
56 }
57}
58
59impl ConfigExtension for EthProviderConfig {
62 const PREFIX: &'static str = "ethereum";
63}
64
65impl ExtensionOptions for EthProviderConfig {
68 fn as_any(&self) -> &dyn std::any::Any {
69 self
70 }
71
72 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
73 &mut *self
74 }
75
76 fn cloned(&self) -> Box<dyn ExtensionOptions> {
77 Box::new(self.clone())
78 }
79
80 fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
81 match key {
82 "block_range_from" => {
83 self.block_range_from = parse_block_number(value)
84 .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
85 Ok(())
86 }
87 "block_range_to" => {
88 self.block_range_to = parse_block_number(value)
89 .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
90 Ok(())
91 }
92 _ => Err(DataFusionError::Configuration(format!(
93 "Unsupported option: {key}"
94 ))),
95 }
96 }
97
98 fn entries(&self) -> Vec<ConfigEntry> {
99 vec![
100 ConfigEntry {
101 key: "block_range_from".to_string(),
102 value: Some(self.block_range_from.to_string()),
103 description: "Lower boundry (inclusive) restriction on block range when pushing down predicate to the node",
104 },
105 ConfigEntry {
106 key: "block_range_to".to_string(),
107 value: Some(self.block_range_to.to_string()),
108 description: "Upper boundry (inclusive) restriction on block range when pushing down predicate to the node",
109 },
110 ]
111 }
112}
113
114fn parse_block_number(s: &str) -> Result<BlockNumberOrTag, String> {
117 let block = match s.to_lowercase().as_str() {
118 "latest" => BlockNumberOrTag::Latest,
119 "finalized" => BlockNumberOrTag::Finalized,
120 "safe" => BlockNumberOrTag::Safe,
121 "earliest" => BlockNumberOrTag::Earliest,
122 "pending" => BlockNumberOrTag::Pending,
123 number => BlockNumberOrTag::Number(
124 number
125 .parse()
126 .map_err(|_| format!("Invalid block number: {number}"))?,
127 ),
128 };
129 Ok(block)
130}