datafusion_ethers/convert/
mod.rs1mod decoded;
2mod hybrid;
3mod raw;
4
5use alloy::rpc::types::eth::{Filter, Log};
6use datafusion::arrow::array::RecordBatch;
7use datafusion::arrow::datatypes::SchemaRef;
8pub use decoded::*;
9pub use hybrid::*;
10pub use raw::*;
11
12use datafusion::error::Result as DfResult;
13use datafusion::execution::context::SessionContext;
14use datafusion::physical_plan::ExecutionPlan;
15
16pub trait Transcoder {
19 fn schema(&self) -> SchemaRef;
20 fn append(&mut self, logs: &[Log]) -> Result<(), AppendError>;
21 fn len(&self) -> usize;
22 fn finish(&mut self) -> RecordBatch;
23 fn is_empty(&self) -> bool {
24 self.len() == 0
25 }
26}
27
28#[derive(Debug, thiserror::Error)]
29pub enum AppendError {
30 #[error(transparent)]
31 EventDecodingError(#[from] alloy::dyn_abi::Error),
32}
33
34pub async fn sql_to_pushdown_filter(ctx: &SessionContext, sql: &str) -> DfResult<Option<Filter>> {
38 let df = ctx.sql(sql).await?;
39 let plan = df.create_physical_plan().await?;
40 Ok(sql_to_pushdown_filter_rec(plan.as_ref()))
41}
42
43fn sql_to_pushdown_filter_rec(plan: &dyn ExecutionPlan) -> Option<Filter> {
44 let mut found = plan
45 .as_any()
46 .downcast_ref::<super::provider::EthGetLogs>()
47 .map(|scan| scan.filter().clone());
48
49 for child in &plan.children() {
51 let child = sql_to_pushdown_filter_rec(child.as_ref());
52 if child.is_some() {
53 if found.is_some() {
54 unimplemented!("Multiple table scans in one query are not yet supported");
55 }
56 found = child;
57 }
58 }
59
60 found
61}