datafusion_ethers/convert/
mod.rs

1mod decoded;
2mod hybrid;
3mod raw;
4
5use alloy::rpc::types::eth::{Filter, Log};
6use datafusion::arrow::array::RecordBatch;
7use datafusion::arrow::datatypes::SchemaRef;
8pub use decoded::*;
9pub use hybrid::*;
10pub use raw::*;
11
12use datafusion::error::Result as DfResult;
13use datafusion::execution::context::SessionContext;
14use datafusion::physical_plan::ExecutionPlan;
15
16///////////////////////////////////////////////////////////////////////////////////////////////////
17
18pub trait Transcoder {
19    fn schema(&self) -> SchemaRef;
20    fn append(&mut self, logs: &[Log]) -> Result<(), AppendError>;
21    fn len(&self) -> usize;
22    fn finish(&mut self) -> RecordBatch;
23    fn is_empty(&self) -> bool {
24        self.len() == 0
25    }
26}
27
/// Error type returned by [`Transcoder::append`].
#[derive(Debug, thiserror::Error)]
pub enum AppendError {
    /// Wraps an `alloy::dyn_abi::Error`; `transparent` forwards the inner error's
    /// message and source unchanged, and `#[from]` enables conversion via `?`.
    #[error(transparent)]
    EventDecodingError(#[from] alloy::dyn_abi::Error),
}
33
34///////////////////////////////////////////////////////////////////////////////////////////////////
35
36/// Analyzes the SQL and returns a pushed-down filter that will be used when querying logs from the ETH node
37pub async fn sql_to_pushdown_filter(ctx: &SessionContext, sql: &str) -> DfResult<Option<Filter>> {
38    let df = ctx.sql(sql).await?;
39    let plan = df.create_physical_plan().await?;
40    Ok(sql_to_pushdown_filter_rec(plan.as_ref()))
41}
42
43fn sql_to_pushdown_filter_rec(plan: &dyn ExecutionPlan) -> Option<Filter> {
44    let mut found = plan
45        .as_any()
46        .downcast_ref::<super::provider::EthGetLogs>()
47        .map(|scan| scan.filter().clone());
48
49    // Traverse all the children too to make sure there is only one scan in this query (e.g. no UNIONs)
50    for child in &plan.children() {
51        let child = sql_to_pushdown_filter_rec(child.as_ref());
52        if child.is_some() {
53            if found.is_some() {
54                unimplemented!("Multiple table scans in one query are not yet supported");
55            }
56            found = child;
57        }
58    }
59
60    found
61}