datafusion_ethers/
config.rs1use alloy::rpc::types::eth::{BlockNumberOrTag, Filter};
2use datafusion::{
3 config::{ConfigEntry, ConfigExtension, ExtensionOptions},
4 error::DataFusionError,
5};
6
7use crate::stream::StreamOptions;
8
9#[derive(Debug, Clone)]
12pub struct EthProviderConfig {
13 pub block_range_from: BlockNumberOrTag,
14 pub block_range_to: BlockNumberOrTag,
15 pub block_stride: u64,
16}
17
18impl Default for EthProviderConfig {
21 fn default() -> Self {
22 Self {
23 block_range_from: BlockNumberOrTag::Earliest,
24 block_range_to: BlockNumberOrTag::Latest,
25 block_stride: 100_000,
26 }
27 }
28}
29
30impl EthProviderConfig {
33 pub fn default_filter(&self) -> Filter {
34 Filter::new()
35 .from_block(self.block_range_from)
36 .to_block(self.block_range_to)
37 }
38
39 pub fn stream_options(&self) -> StreamOptions {
40 StreamOptions {
41 block_stride: self.block_stride,
42 }
43 }
44}
45
46impl ConfigExtension for EthProviderConfig {
49 const PREFIX: &'static str = "ethereum";
50}
51
52impl ExtensionOptions for EthProviderConfig {
55 fn as_any(&self) -> &dyn std::any::Any {
56 self
57 }
58
59 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
60 &mut *self
61 }
62
63 fn cloned(&self) -> Box<dyn ExtensionOptions> {
64 Box::new(self.clone())
65 }
66
67 fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
68 match key {
69 "block_range_from" => {
70 self.block_range_from = parse_block_number(value)
71 .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
72 Ok(())
73 }
74 "block_range_to" => {
75 self.block_range_to = parse_block_number(value)
76 .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
77 Ok(())
78 }
79 _ => Err(DataFusionError::Configuration(format!(
80 "Unsupported option: {key}"
81 ))),
82 }
83 }
84
85 fn entries(&self) -> Vec<ConfigEntry> {
86 vec![
87 ConfigEntry {
88 key: "block_range_from".to_string(),
89 value: Some(self.block_range_from.to_string()),
90 description:
91 "Lower boundry (inclusive) restriction on block range when pushing down predicate to the node",
92 },
93 ConfigEntry {
94 key: "block_range_to".to_string(),
95 value: Some(self.block_range_to.to_string()),
96 description:
97 "Upper boundry (inclusive) restriction on block range when pushing down predicate to the node",
98 },
99 ]
100 }
101}
102
103fn parse_block_number(s: &str) -> Result<BlockNumberOrTag, String> {
106 let block = match s.to_lowercase().as_str() {
107 "latest" => BlockNumberOrTag::Latest,
108 "finalized" => BlockNumberOrTag::Finalized,
109 "safe" => BlockNumberOrTag::Safe,
110 "earliest" => BlockNumberOrTag::Earliest,
111 "pending" => BlockNumberOrTag::Pending,
112 number => BlockNumberOrTag::Number(
113 number
114 .parse()
115 .map_err(|_| format!("Invalid block number: {number}"))?,
116 ),
117 };
118 Ok(block)
119}