datafusion_ethers/
provider.rs

1use alloy::primitives::{Address, B256};
2use alloy::providers::DynProvider;
3use alloy::rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption};
4use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
5use datafusion::error::{DataFusionError, Result as DfResult};
6use datafusion::execution::TaskContext;
7use datafusion::logical_expr::BinaryExpr;
8use datafusion::physical_expr::EquivalenceProperties;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
11use datafusion::{
12    arrow::{array::RecordBatch, datatypes::SchemaRef},
13    datasource::{TableProvider, TableType},
14    logical_expr::{Operator, TableProviderFilterPushDown},
15    physical_plan::ExecutionPlan,
16    prelude::*,
17    scalar::ScalarValue,
18};
19use futures::{Stream, TryStreamExt};
20use std::{any::Any, sync::Arc};
21
22use crate::config::EthProviderConfig;
23use crate::convert::Transcoder as _;
24use crate::stream::StreamOptions;
25use crate::utils::*;
26
27///////////////////////////////////////////////////////////////////////////////////////////////////
28// Catalog
29///////////////////////////////////////////////////////////////////////////////////////////////////
30
31#[derive(Debug)]
32pub struct EthCatalog {
33    rpc_client: DynProvider,
34}
35
36impl EthCatalog {
37    pub fn new(rpc_client: DynProvider) -> Self {
38        Self { rpc_client }
39    }
40}
41
42impl CatalogProvider for EthCatalog {
43    fn as_any(&self) -> &dyn Any {
44        self
45    }
46
47    fn schema_names(&self) -> Vec<String> {
48        vec!["eth".to_string()]
49    }
50
51    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
52        match name {
53            "eth" => Some(Arc::new(EthSchema::new(self.rpc_client.clone()))),
54            _ => None,
55        }
56    }
57}
58
59///////////////////////////////////////////////////////////////////////////////////////////////////
60// Schema
61///////////////////////////////////////////////////////////////////////////////////////////////////
62
63#[derive(Debug)]
64pub struct EthSchema {
65    rpc_client: DynProvider,
66}
67
68impl EthSchema {
69    pub fn new(rpc_client: DynProvider) -> Self {
70        Self { rpc_client }
71    }
72}
73
74#[async_trait::async_trait]
75impl SchemaProvider for EthSchema {
76    fn as_any(&self) -> &dyn Any {
77        self
78    }
79
80    fn table_names(&self) -> Vec<String> {
81        vec!["logs".to_string()]
82    }
83
84    async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
85        match name {
86            "logs" => Ok(Some(Arc::new(EthLogsTable::new(self.rpc_client.clone())))),
87            _ => Ok(None),
88        }
89    }
90
91    fn table_exist(&self, name: &str) -> bool {
92        name == "logs"
93    }
94}
95
96///////////////////////////////////////////////////////////////////////////////////////////////////
97// Table: eth_logs
98///////////////////////////////////////////////////////////////////////////////////////////////////
99
/// Table provider that serves Ethereum logs fetched through an RPC provider.
#[derive(Debug)]
pub struct EthLogsTable {
    /// Arrow schema of the table, taken from `EthRawLogsToArrow` at construction time.
    schema: SchemaRef,
    /// RPC provider handed to the execution plans produced by `scan`.
    rpc_client: DynProvider,
}
105
106impl EthLogsTable {
107    pub fn new(rpc_client: DynProvider) -> Self {
108        Self {
109            schema: crate::convert::EthRawLogsToArrow::new().schema(),
110            rpc_client,
111        }
112    }
113
114    fn apply_expr(filter: Filter, expr: &Expr) -> (TableProviderFilterPushDown, Filter) {
115        match expr {
116            Expr::BinaryExpr(e) => match (&*e.left, e.op, &*e.right) {
117                (Expr::Column(left), op, right) => match (left.name.as_str(), op, right) {
118                    ("address", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
119                        TableProviderFilterPushDown::Exact,
120                        filter.address(Address::from_slice(&v[..])),
121                    ),
122                    ("topic0", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
123                        TableProviderFilterPushDown::Exact,
124                        filter.event_signature(B256::from_slice(&v[..])),
125                    ),
126                    ("topic1", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
127                        TableProviderFilterPushDown::Exact,
128                        filter.topic1(B256::from_slice(&v[..])),
129                    ),
130                    ("topic2", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
131                        TableProviderFilterPushDown::Exact,
132                        filter.topic2(B256::from_slice(&v[..])),
133                    ),
134                    ("topic3", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
135                        TableProviderFilterPushDown::Exact,
136                        filter.topic3(B256::from_slice(&v[..])),
137                    ),
138                    ("block_number", Operator::Eq, Expr::Literal(ScalarValue::UInt64(Some(v)))) => {
139                        (
140                            TableProviderFilterPushDown::Exact,
141                            filter.union((*v).into(), (*v).into()),
142                        )
143                    }
144                    ("block_number", Operator::Gt, Expr::Literal(ScalarValue::UInt64(Some(v)))) => {
145                        (
146                            TableProviderFilterPushDown::Exact,
147                            filter.union((*v + 1).into(), BlockNumberOrTag::Latest),
148                        )
149                    }
150                    (
151                        "block_number",
152                        Operator::GtEq,
153                        Expr::Literal(ScalarValue::UInt64(Some(v))),
154                    ) => (
155                        TableProviderFilterPushDown::Exact,
156                        filter.union((*v).into(), BlockNumberOrTag::Latest),
157                    ),
158                    ("block_number", Operator::Lt, Expr::Literal(ScalarValue::UInt64(Some(v)))) => {
159                        (
160                            TableProviderFilterPushDown::Exact,
161                            filter.union(BlockNumberOrTag::Earliest, (*v - 1).into()),
162                        )
163                    }
164                    (
165                        "block_number",
166                        Operator::LtEq,
167                        Expr::Literal(ScalarValue::UInt64(Some(v))),
168                    ) => (
169                        TableProviderFilterPushDown::Exact,
170                        filter.union(BlockNumberOrTag::Earliest, (*v).into()),
171                    ),
172                    _ => (TableProviderFilterPushDown::Unsupported, filter),
173                },
174                // expr OR expr OR ...
175                (Expr::BinaryExpr(_), Operator::Or, Expr::BinaryExpr(_)) => {
176                    match Self::collect_or_group(e) {
177                        (TableProviderFilterPushDown::Exact, Some(col), values) => {
178                            match col.name.as_str() {
179                                "address" => (
180                                    TableProviderFilterPushDown::Exact,
181                                    filter.address(
182                                        values
183                                            .into_iter()
184                                            .map(|v| Address::from_slice(&v[..]))
185                                            .collect::<Vec<_>>(),
186                                    ),
187                                ),
188                                "topic0" => (
189                                    TableProviderFilterPushDown::Exact,
190                                    filter.event_signature(
191                                        values
192                                            .into_iter()
193                                            .map(|v| B256::from_slice(&v[..]))
194                                            .collect::<Vec<_>>(),
195                                    ),
196                                ),
197                                "topic1" => (
198                                    TableProviderFilterPushDown::Exact,
199                                    filter.topic1(
200                                        values
201                                            .into_iter()
202                                            .map(|v| B256::from_slice(&v[..]))
203                                            .collect::<Vec<_>>(),
204                                    ),
205                                ),
206                                "topic2" => (
207                                    TableProviderFilterPushDown::Exact,
208                                    filter.topic2(
209                                        values
210                                            .into_iter()
211                                            .map(|v| B256::from_slice(&v[..]))
212                                            .collect::<Vec<_>>(),
213                                    ),
214                                ),
215                                "topic3" => (
216                                    TableProviderFilterPushDown::Exact,
217                                    filter.topic3(
218                                        values
219                                            .into_iter()
220                                            .map(|v| B256::from_slice(&v[..]))
221                                            .collect::<Vec<_>>(),
222                                    ),
223                                ),
224                                _ => (TableProviderFilterPushDown::Unsupported, filter),
225                            }
226                        }
227                        _ => (TableProviderFilterPushDown::Unsupported, filter),
228                    }
229                }
230                _ => (TableProviderFilterPushDown::Unsupported, filter),
231            },
232            _ => (TableProviderFilterPushDown::Unsupported, filter),
233        }
234    }
235
236    /// Converts expressions like:
237    ///   x = v1 or (x = v2 or (x = v3))
238    /// into:
239    ///   [v1, v2, v3]
240    fn collect_or_group(
241        expr: &BinaryExpr,
242    ) -> (TableProviderFilterPushDown, Option<&Column>, Vec<&Vec<u8>>) {
243        match (&*expr.left, expr.op, &*expr.right) {
244            (Expr::Column(col), Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(val)))) => {
245                (TableProviderFilterPushDown::Exact, Some(col), vec![val])
246            }
247            (Expr::BinaryExpr(left), Operator::Or, Expr::BinaryExpr(right)) => {
248                // Merge values from two branches, but only if they refer to the same column
249                let mut left = Self::collect_or_group(left);
250                let mut right = Self::collect_or_group(right);
251                if left.0 == right.0
252                    && right.0 == TableProviderFilterPushDown::Exact
253                    && left.1 == right.1
254                {
255                    left.2.append(&mut right.2);
256                    (TableProviderFilterPushDown::Exact, left.1, left.2)
257                } else {
258                    (TableProviderFilterPushDown::Unsupported, None, Vec::new())
259                }
260            }
261            _ => (TableProviderFilterPushDown::Unsupported, None, Vec::new()),
262        }
263    }
264}
265
266#[async_trait::async_trait]
267impl TableProvider for EthLogsTable {
268    fn as_any(&self) -> &dyn Any {
269        self
270    }
271
272    fn table_type(&self) -> TableType {
273        TableType::Base
274    }
275
276    fn schema(&self) -> SchemaRef {
277        self.schema.clone()
278    }
279
280    fn supports_filters_pushdown(
281        &self,
282        filters: &[&Expr],
283    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
284        tracing::debug!(?filters, "EthLogs: performing filter pushdown");
285
286        let mut filter = EthProviderConfig::default().default_filter();
287        let mut res = Vec::new();
288
289        for expr in filters {
290            let (support, new_filter) = Self::apply_expr(filter, expr);
291            filter = new_filter;
292            res.push(support);
293        }
294
295        tracing::debug!(?filter, "EthLogs: resulting filter");
296        Ok(res)
297    }
298
299    async fn scan(
300        &self,
301        state: &dyn Session,
302        mut projection: Option<&Vec<usize>>,
303        filters: &[Expr],
304        limit: Option<usize>,
305    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
306        let config = state
307            .config_options()
308            .extensions
309            .get::<EthProviderConfig>()
310            .cloned()
311            .unwrap_or_default();
312
313        tracing::debug!(
314            ?config,
315            ?projection,
316            ?filters,
317            ?limit,
318            "EthLogs: creating execution plan"
319        );
320
321        // TODO: Datafusion always passess a projection even when one is not required
322        if let Some(proj) = projection {
323            let is_redundant = proj.len() == self.schema.fields.len()
324                && proj[0] == 0
325                && proj
326                    .iter()
327                    .cloned()
328                    .reduce(|a, b| if a + 1 == b { b } else { a })
329                    == Some(self.schema.fields.len() - 1);
330            if is_redundant {
331                projection = None
332            }
333        }
334
335        let schema = if let Some(proj) = projection {
336            Arc::new(self.schema.project(proj)?)
337        } else {
338            self.schema.clone()
339        };
340
341        // Get filter with range restrictions from the session config level
342        let mut filter = config.default_filter();
343
344        // Push down filter expressions from the query
345        for expr in filters {
346            let (support, new_filter) = Self::apply_expr(filter, expr);
347            assert_eq!(support, TableProviderFilterPushDown::Exact);
348            filter = new_filter;
349        }
350
351        Ok(Arc::new(EthGetLogs::new(
352            self.rpc_client.clone(),
353            schema,
354            projection.cloned(),
355            filter,
356            config.stream_options(),
357            limit,
358        )))
359    }
360}
361
362///////////////////////////////////////////////////////////////////////////////////////////////////
363
/// Leaf execution plan that reads logs through the RPC provider and emits them as
/// Arrow record batches.
#[derive(Debug, Clone)]
pub struct EthGetLogs {
    /// Schema of the emitted batches (after projection).
    projected_schema: SchemaRef,
    /// Column indices to keep from each decoded batch; `None` keeps all columns.
    projection: Option<Vec<usize>>,
    /// RPC provider used to fetch logs.
    rpc_client: DynProvider,
    /// Log filter passed to the paginated log stream.
    filter: Filter,
    /// Options passed to the paginated log stream.
    stream_options: StreamOptions,
    /// Maximum number of log rows to emit; `None` means no limit.
    limit: Option<usize>,
    /// Plan properties built once in `new` and returned by `properties()`.
    properties: PlanProperties,
}
374
375impl EthGetLogs {
376    pub fn new(
377        rpc_client: DynProvider,
378        projected_schema: SchemaRef,
379        projection: Option<Vec<usize>>,
380        filter: Filter,
381        stream_options: StreamOptions,
382        limit: Option<usize>,
383    ) -> Self {
384        Self {
385            projected_schema: projected_schema.clone(),
386            projection,
387            rpc_client,
388            filter,
389            stream_options,
390            limit,
391            properties: PlanProperties::new(
392                EquivalenceProperties::new(projected_schema),
393                datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
394                datafusion::physical_plan::execution_plan::EmissionType::Incremental,
395                // TODO: Change to Unbounded
396                datafusion::physical_plan::execution_plan::Boundedness::Bounded,
397            ),
398        }
399    }
400
401    pub fn filter(&self) -> &Filter {
402        &self.filter
403    }
404
405    pub fn with_filter(mut self, filter: Filter) -> Self {
406        self.filter = filter;
407        self
408    }
409
410    fn execute_impl(
411        rpc_client: DynProvider,
412        filter: Filter,
413        options: StreamOptions,
414        projection: Option<Vec<usize>>,
415        limit: Option<usize>,
416    ) -> impl Stream<Item = DfResult<RecordBatch>> {
417        async_stream::try_stream! {
418            let limit = limit.unwrap_or(usize::MAX);
419            let mut coder = crate::convert::EthRawLogsToArrow::new();
420            let mut total = 0;
421
422            // TODO: Streaming/unbounded API
423            let mut log_stream = Box::pin(
424                crate::stream::RawLogsStream::paginate(rpc_client, filter, options, None)
425            );
426
427            while let Some(batch) = log_stream.try_next().await.map_err(|e| DataFusionError::External(e.into()))? {
428                if total >= limit {
429                    break;
430                }
431                let to_append = usize::min(batch.logs.len(), limit - total);
432                coder.append(&batch.logs[0..to_append]).map_err(|e| DataFusionError::External(e.into()))?;
433                total += to_append;
434
435                // Don't form batches that are too small
436                if coder.len() > 1_000 {
437                    let batch = if let Some(proj) = &projection {
438                        coder.finish().project(proj)?
439                    } else {
440                        coder.finish()
441                    };
442                    yield batch;
443                }
444            }
445
446            // Return at least one batch, even if empty
447            if !coder.is_empty() || total == 0 {
448                let batch = if let Some(proj) = &projection {
449                    coder.finish().project(proj)?
450                } else {
451                    coder.finish()
452                };
453                yield batch;
454            }
455        }
456    }
457}
458
459impl ExecutionPlan for EthGetLogs {
460    fn name(&self) -> &str {
461        Self::static_name()
462    }
463
464    fn as_any(&self) -> &dyn Any {
465        self
466    }
467
468    fn properties(&self) -> &PlanProperties {
469        &self.properties
470    }
471
472    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
473        // this is a leaf node and has no children
474        vec![]
475    }
476
477    fn with_new_children(
478        self: Arc<Self>,
479        _: Vec<Arc<dyn ExecutionPlan>>,
480    ) -> DfResult<Arc<dyn ExecutionPlan>> {
481        Ok(self)
482    }
483
484    fn execute(
485        &self,
486        partition: usize,
487        _context: Arc<TaskContext>,
488    ) -> DfResult<datafusion::execution::SendableRecordBatchStream> {
489        assert_eq!(partition, 0);
490
491        let stream = Self::execute_impl(
492            self.rpc_client.clone(),
493            self.filter.clone(),
494            self.stream_options.clone(),
495            self.projection.clone(),
496            self.limit,
497        );
498        Ok(Box::pin(RecordBatchStreamAdapter::new(
499            self.projected_schema.clone(),
500            stream,
501        )))
502    }
503}
504
505impl DisplayAs for EthGetLogs {
506    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
507        match t {
508            DisplayFormatType::Default | DisplayFormatType::Verbose => {
509                write!(f, "EthGetLogs: ")?;
510                if self.projection.is_none() {
511                    write!(f, "projection=[*]")?;
512                } else {
513                    let projection_str = self
514                        .projected_schema
515                        .fields
516                        .iter()
517                        .map(|f| f.name().as_str())
518                        .collect::<Vec<_>>()
519                        .join(", ");
520
521                    write!(f, "projection=[{}]", projection_str)?;
522                }
523
524                let mut filters = Vec::new();
525
526                match self.filter.block_option {
527                    FilterBlockOption::Range {
528                        from_block,
529                        to_block,
530                    } => filters.push(format!("block_number=[{:?}, {:?}]", from_block, to_block)),
531                    FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={}", h)),
532                }
533
534                if !self.filter.address.is_empty() {
535                    // Provide deterministic order
536                    let mut addrs: Vec<_> =
537                        self.filter.address.iter().map(|h| h.to_string()).collect();
538                    addrs.sort();
539                    filters.push(format!("address=[{}]", addrs.join(", ")));
540                }
541
542                for (i, t) in self
543                    .filter
544                    .topics
545                    .iter()
546                    .enumerate()
547                    .filter(|(_, t)| !t.is_empty())
548                {
549                    // Provide deterministic order
550                    let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
551                    topics.sort();
552                    filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
553                }
554
555                write!(f, ", filter=[{}]", filters.join(", "))?;
556
557                if let Some(limit) = self.limit {
558                    write!(f, ", limit={}", limit)?;
559                }
560
561                Ok(())
562            }
563            DisplayFormatType::TreeRender => {
564                writeln!(f, "EthGetLogs")?;
565
566                if self.projection.is_none() {
567                    writeln!(f, "projection=[*]")?;
568                } else {
569                    let projection_str = self
570                        .projected_schema
571                        .fields
572                        .iter()
573                        .map(|f| f.name().as_str())
574                        .collect::<Vec<_>>()
575                        .join(", ");
576
577                    writeln!(f, "projection=[{}]", projection_str)?;
578                }
579
580                let mut filters = Vec::new();
581
582                match self.filter.block_option {
583                    FilterBlockOption::Range {
584                        from_block,
585                        to_block,
586                    } => filters.push(format!("block_number=[{:?}, {:?}]", from_block, to_block)),
587                    FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={}", h)),
588                }
589
590                if !self.filter.address.is_empty() {
591                    // Provide deterministic order
592                    let mut addrs: Vec<_> =
593                        self.filter.address.iter().map(|h| h.to_string()).collect();
594                    addrs.sort();
595                    filters.push(format!("address=[{}]", addrs.join(", ")));
596                }
597
598                for (i, t) in self
599                    .filter
600                    .topics
601                    .iter()
602                    .enumerate()
603                    .filter(|(_, t)| !t.is_empty())
604                {
605                    // Provide deterministic order
606                    let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
607                    topics.sort();
608                    filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
609                }
610
611                writeln!(f, "filter=[{}]", filters.join(", "))?;
612
613                if let Some(limit) = self.limit {
614                    writeln!(f, "limit={}", limit)?;
615                }
616
617                Ok(())
618            }
619        }
620    }
621}
622
623///////////////////////////////////////////////////////////////////////////////////////////////////