datafusion_ethers/
config.rs

1use alloy::rpc::types::eth::{BlockNumberOrTag, Filter};
2use datafusion::{
3    config::{ConfigEntry, ConfigExtension, ExtensionOptions},
4    error::DataFusionError,
5};
6
7use crate::stream::StreamOptions;
8
9///////////////////////////////////////////////////////////////////////////////////////////////////
10
11#[derive(Debug, Clone)]
12pub struct EthProviderConfig {
13    pub block_range_from: BlockNumberOrTag,
14    pub block_range_to: BlockNumberOrTag,
15    pub block_stride: u64,
16}
17
18///////////////////////////////////////////////////////////////////////////////////////////////////
19
20impl Default for EthProviderConfig {
21    fn default() -> Self {
22        Self {
23            block_range_from: BlockNumberOrTag::Earliest,
24            block_range_to: BlockNumberOrTag::Latest,
25            block_stride: 100_000,
26        }
27    }
28}
29
30///////////////////////////////////////////////////////////////////////////////////////////////////
31
32impl EthProviderConfig {
33    pub fn default_filter(&self) -> Filter {
34        Filter::new()
35            .from_block(self.block_range_from)
36            .to_block(self.block_range_to)
37    }
38
39    pub fn stream_options(&self) -> StreamOptions {
40        StreamOptions {
41            block_stride: self.block_stride,
42        }
43    }
44}
45
46///////////////////////////////////////////////////////////////////////////////////////////////////
47
48impl ConfigExtension for EthProviderConfig {
49    const PREFIX: &'static str = "ethereum";
50}
51
52///////////////////////////////////////////////////////////////////////////////////////////////////
53
54impl ExtensionOptions for EthProviderConfig {
55    fn as_any(&self) -> &dyn std::any::Any {
56        self
57    }
58
59    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
60        &mut *self
61    }
62
63    fn cloned(&self) -> Box<dyn ExtensionOptions> {
64        Box::new(self.clone())
65    }
66
67    fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
68        match key {
69            "block_range_from" => {
70                self.block_range_from = parse_block_number(value)
71                    .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
72                Ok(())
73            }
74            "block_range_to" => {
75                self.block_range_to = parse_block_number(value)
76                    .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
77                Ok(())
78            }
79            _ => Err(DataFusionError::Configuration(format!(
80                "Unsupported option: {key}"
81            ))),
82        }
83    }
84
85    fn entries(&self) -> Vec<ConfigEntry> {
86        vec![
87            ConfigEntry {
88                key: "block_range_from".to_string(),
89                value: Some(self.block_range_from.to_string()),
90                description:
91                    "Lower boundry (inclusive) restriction on block range when pushing down predicate to the node",
92            },
93            ConfigEntry {
94                key: "block_range_to".to_string(),
95                value: Some(self.block_range_to.to_string()),
96                description:
97                    "Upper boundry (inclusive) restriction on block range when pushing down predicate to the node",
98            },
99        ]
100    }
101}
102
103///////////////////////////////////////////////////////////////////////////////////////////////////
104
105fn parse_block_number(s: &str) -> Result<BlockNumberOrTag, String> {
106    let block = match s.to_lowercase().as_str() {
107        "latest" => BlockNumberOrTag::Latest,
108        "finalized" => BlockNumberOrTag::Finalized,
109        "safe" => BlockNumberOrTag::Safe,
110        "earliest" => BlockNumberOrTag::Earliest,
111        "pending" => BlockNumberOrTag::Pending,
112        number => BlockNumberOrTag::Number(
113            number
114                .parse()
115                .map_err(|_| format!("Invalid block number: {number}"))?,
116        ),
117    };
118    Ok(block)
119}