datafusion_ethers/
config.rs

1use alloy::rpc::types::eth::{BlockNumberOrTag, Filter};
2use datafusion::{
3    config::{ConfigEntry, ConfigExtension, ExtensionOptions},
4    error::DataFusionError,
5};
6
7use crate::stream::StreamOptions;
8
9///////////////////////////////////////////////////////////////////////////////////////////////////
10
11#[derive(Debug, Clone)]
12pub struct EthProviderConfig {
13    pub schema_name: String,
14    pub block_range_from: BlockNumberOrTag,
15    pub block_range_to: BlockNumberOrTag,
16    pub block_stride: u64,
17
18    /// Many providers don't yet return `blockTimestamp` from `eth_getLogs` RPC endpoint
19    /// and in such cases `block_timestamp` column will be `null`.
20    /// If you enable this fallback the library will perform additional calls to `eth_getBlock`
21    /// to populate the timestamp. Interpolation of block times within the batch is used to
22    /// avoid resolving every single block not to introduce significant performance penalty.
23    ///
24    /// See: https://github.com/ethereum/execution-apis/issues/295
25    pub use_block_timestamp_fallback: bool,
26}
27
28///////////////////////////////////////////////////////////////////////////////////////////////////
29
30impl Default for EthProviderConfig {
31    fn default() -> Self {
32        Self {
33            schema_name: "eth".to_string(),
34            block_range_from: BlockNumberOrTag::Earliest,
35            block_range_to: BlockNumberOrTag::Latest,
36            block_stride: 100_000,
37            use_block_timestamp_fallback: false,
38        }
39    }
40}
41
42///////////////////////////////////////////////////////////////////////////////////////////////////
43
44impl EthProviderConfig {
45    pub fn default_filter(&self) -> Filter {
46        Filter::new()
47            .from_block(self.block_range_from)
48            .to_block(self.block_range_to)
49    }
50
51    pub fn stream_options(&self) -> StreamOptions {
52        StreamOptions {
53            block_stride: self.block_stride,
54            use_block_timestamp_fallback: self.use_block_timestamp_fallback,
55        }
56    }
57}
58
59///////////////////////////////////////////////////////////////////////////////////////////////////
60
61impl ConfigExtension for EthProviderConfig {
62    const PREFIX: &'static str = "ethereum";
63}
64
65///////////////////////////////////////////////////////////////////////////////////////////////////
66
67impl ExtensionOptions for EthProviderConfig {
68    fn as_any(&self) -> &dyn std::any::Any {
69        self
70    }
71
72    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
73        &mut *self
74    }
75
76    fn cloned(&self) -> Box<dyn ExtensionOptions> {
77        Box::new(self.clone())
78    }
79
80    fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
81        match key {
82            "block_range_from" => {
83                self.block_range_from = parse_block_number(value)
84                    .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
85                Ok(())
86            }
87            "block_range_to" => {
88                self.block_range_to = parse_block_number(value)
89                    .map_err(|msg| DataFusionError::Configuration(msg.to_string()))?;
90                Ok(())
91            }
92            _ => Err(DataFusionError::Configuration(format!(
93                "Unsupported option: {key}"
94            ))),
95        }
96    }
97
98    fn entries(&self) -> Vec<ConfigEntry> {
99        vec![
100            ConfigEntry {
101                key: "block_range_from".to_string(),
102                value: Some(self.block_range_from.to_string()),
103                description: "Lower boundry (inclusive) restriction on block range when pushing down predicate to the node",
104            },
105            ConfigEntry {
106                key: "block_range_to".to_string(),
107                value: Some(self.block_range_to.to_string()),
108                description: "Upper boundry (inclusive) restriction on block range when pushing down predicate to the node",
109            },
110        ]
111    }
112}
113
114///////////////////////////////////////////////////////////////////////////////////////////////////
115
116fn parse_block_number(s: &str) -> Result<BlockNumberOrTag, String> {
117    let block = match s.to_lowercase().as_str() {
118        "latest" => BlockNumberOrTag::Latest,
119        "finalized" => BlockNumberOrTag::Finalized,
120        "safe" => BlockNumberOrTag::Safe,
121        "earliest" => BlockNumberOrTag::Earliest,
122        "pending" => BlockNumberOrTag::Pending,
123        number => BlockNumberOrTag::Number(
124            number
125                .parse()
126                .map_err(|_| format!("Invalid block number: {number}"))?,
127        ),
128    };
129    Ok(block)
130}