datafusion_ethers/
provider.rs

1use alloy::primitives::{Address, B256};
2use alloy::providers::DynProvider;
3use alloy::rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption};
4use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
5use datafusion::error::{DataFusionError, Result as DfResult};
6use datafusion::execution::TaskContext;
7use datafusion::logical_expr::BinaryExpr;
8use datafusion::physical_expr::EquivalenceProperties;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
11use datafusion::{
12    arrow::{array::RecordBatch, datatypes::SchemaRef},
13    datasource::{TableProvider, TableType},
14    logical_expr::{Operator, TableProviderFilterPushDown},
15    physical_plan::ExecutionPlan,
16    prelude::*,
17    scalar::ScalarValue,
18};
19use futures::{Stream, TryStreamExt};
20use std::{any::Any, sync::Arc};
21
22use crate::config::EthProviderConfig;
23use crate::convert::Transcoder as _;
24use crate::stream::StreamOptions;
25use crate::utils::*;
26
27///////////////////////////////////////////////////////////////////////////////////////////////////
28// Catalog
29///////////////////////////////////////////////////////////////////////////////////////////////////
30
31#[derive(Debug)]
32pub struct EthCatalog {
33    config: EthProviderConfig,
34    rpc_client: DynProvider<alloy::network::AnyNetwork>,
35}
36
37impl EthCatalog {
38    pub fn new(
39        config: EthProviderConfig,
40        rpc_client: DynProvider<alloy::network::AnyNetwork>,
41    ) -> Self {
42        Self { config, rpc_client }
43    }
44}
45
46impl CatalogProvider for EthCatalog {
47    fn as_any(&self) -> &dyn Any {
48        self
49    }
50
51    fn schema_names(&self) -> Vec<String> {
52        vec![self.config.schema_name.clone()]
53    }
54
55    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
56        if name != self.config.schema_name {
57            return None;
58        }
59
60        Some(Arc::new(EthSchema::new(
61            self.config.clone(),
62            self.rpc_client.clone(),
63        )))
64    }
65}
66
67///////////////////////////////////////////////////////////////////////////////////////////////////
68// Schema
69///////////////////////////////////////////////////////////////////////////////////////////////////
70
71#[derive(Debug)]
72pub struct EthSchema {
73    config: EthProviderConfig,
74    rpc_client: DynProvider<alloy::network::AnyNetwork>,
75}
76
77impl EthSchema {
78    pub fn new(
79        config: EthProviderConfig,
80        rpc_client: DynProvider<alloy::network::AnyNetwork>,
81    ) -> Self {
82        Self { config, rpc_client }
83    }
84}
85
86#[async_trait::async_trait]
87impl SchemaProvider for EthSchema {
88    fn as_any(&self) -> &dyn Any {
89        self
90    }
91
92    fn table_names(&self) -> Vec<String> {
93        vec!["logs".to_string()]
94    }
95
96    async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
97        match name {
98            "logs" => Ok(Some(Arc::new(EthLogsTable::new(
99                &self.config,
100                self.rpc_client.clone(),
101            )))),
102            _ => Ok(None),
103        }
104    }
105
106    fn table_exist(&self, name: &str) -> bool {
107        name == "logs"
108    }
109}
110
111///////////////////////////////////////////////////////////////////////////////////////////////////
112// Table: eth_logs
113///////////////////////////////////////////////////////////////////////////////////////////////////
114
115#[derive(Debug)]
116pub struct EthLogsTable {
117    schema: SchemaRef,
118    rpc_client: DynProvider<alloy::network::AnyNetwork>,
119}
120
121impl EthLogsTable {
122    pub fn new(
123        config: &EthProviderConfig,
124        rpc_client: DynProvider<alloy::network::AnyNetwork>,
125    ) -> Self {
126        let encoder = crate::convert::EthRawLogsToArrow::new(&config.stream_options());
127        let schema = encoder.schema();
128
129        Self { schema, rpc_client }
130    }
131
132    fn apply_expr(filter: Filter, expr: &Expr) -> (TableProviderFilterPushDown, Filter) {
133        match expr {
134            Expr::BinaryExpr(e) => match (&*e.left, e.op, &*e.right) {
135                (Expr::Column(left), op, right) => match (left.name.as_str(), op, right) {
136                    (
137                        "address",
138                        Operator::Eq,
139                        Expr::Literal(ScalarValue::Binary(Some(v)), None),
140                    ) => (
141                        TableProviderFilterPushDown::Exact,
142                        filter.address(Address::from_slice(&v[..])),
143                    ),
144                    ("topic0", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
145                        (
146                            TableProviderFilterPushDown::Exact,
147                            filter.event_signature(B256::from_slice(&v[..])),
148                        )
149                    }
150                    ("topic1", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
151                        (
152                            TableProviderFilterPushDown::Exact,
153                            filter.topic1(B256::from_slice(&v[..])),
154                        )
155                    }
156                    ("topic2", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
157                        (
158                            TableProviderFilterPushDown::Exact,
159                            filter.topic2(B256::from_slice(&v[..])),
160                        )
161                    }
162                    ("topic3", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
163                        (
164                            TableProviderFilterPushDown::Exact,
165                            filter.topic3(B256::from_slice(&v[..])),
166                        )
167                    }
168                    (
169                        "block_number",
170                        Operator::Eq,
171                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
172                    ) => (
173                        TableProviderFilterPushDown::Exact,
174                        filter.union((*v).into(), (*v).into()),
175                    ),
176                    (
177                        "block_number",
178                        Operator::Gt,
179                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
180                    ) => (
181                        TableProviderFilterPushDown::Exact,
182                        filter.union((*v + 1).into(), BlockNumberOrTag::Latest),
183                    ),
184                    (
185                        "block_number",
186                        Operator::GtEq,
187                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
188                    ) => (
189                        TableProviderFilterPushDown::Exact,
190                        filter.union((*v).into(), BlockNumberOrTag::Latest),
191                    ),
192                    (
193                        "block_number",
194                        Operator::Lt,
195                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
196                    ) => (
197                        TableProviderFilterPushDown::Exact,
198                        filter.union(BlockNumberOrTag::Earliest, (*v - 1).into()),
199                    ),
200                    (
201                        "block_number",
202                        Operator::LtEq,
203                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
204                    ) => (
205                        TableProviderFilterPushDown::Exact,
206                        filter.union(BlockNumberOrTag::Earliest, (*v).into()),
207                    ),
208                    _ => (TableProviderFilterPushDown::Unsupported, filter),
209                },
210                // expr OR expr OR ...
211                (Expr::BinaryExpr(_), Operator::Or, Expr::BinaryExpr(_)) => {
212                    match Self::collect_or_group(e) {
213                        (TableProviderFilterPushDown::Exact, Some(col), values) => {
214                            match col.name.as_str() {
215                                "address" => (
216                                    TableProviderFilterPushDown::Exact,
217                                    filter.address(
218                                        values
219                                            .into_iter()
220                                            .map(|v| Address::from_slice(&v[..]))
221                                            .collect::<Vec<_>>(),
222                                    ),
223                                ),
224                                "topic0" => (
225                                    TableProviderFilterPushDown::Exact,
226                                    filter.event_signature(
227                                        values
228                                            .into_iter()
229                                            .map(|v| B256::from_slice(&v[..]))
230                                            .collect::<Vec<_>>(),
231                                    ),
232                                ),
233                                "topic1" => (
234                                    TableProviderFilterPushDown::Exact,
235                                    filter.topic1(
236                                        values
237                                            .into_iter()
238                                            .map(|v| B256::from_slice(&v[..]))
239                                            .collect::<Vec<_>>(),
240                                    ),
241                                ),
242                                "topic2" => (
243                                    TableProviderFilterPushDown::Exact,
244                                    filter.topic2(
245                                        values
246                                            .into_iter()
247                                            .map(|v| B256::from_slice(&v[..]))
248                                            .collect::<Vec<_>>(),
249                                    ),
250                                ),
251                                "topic3" => (
252                                    TableProviderFilterPushDown::Exact,
253                                    filter.topic3(
254                                        values
255                                            .into_iter()
256                                            .map(|v| B256::from_slice(&v[..]))
257                                            .collect::<Vec<_>>(),
258                                    ),
259                                ),
260                                _ => (TableProviderFilterPushDown::Unsupported, filter),
261                            }
262                        }
263                        _ => (TableProviderFilterPushDown::Unsupported, filter),
264                    }
265                }
266                _ => (TableProviderFilterPushDown::Unsupported, filter),
267            },
268            _ => (TableProviderFilterPushDown::Unsupported, filter),
269        }
270    }
271
272    /// Converts expressions like:
273    ///   x = v1 or (x = v2 or (x = v3))
274    /// into:
275    ///   [v1, v2, v3]
276    fn collect_or_group(
277        expr: &BinaryExpr,
278    ) -> (TableProviderFilterPushDown, Option<&Column>, Vec<&Vec<u8>>) {
279        match (&*expr.left, expr.op, &*expr.right) {
280            (
281                Expr::Column(col),
282                Operator::Eq,
283                Expr::Literal(ScalarValue::Binary(Some(val)), None),
284            ) => (TableProviderFilterPushDown::Exact, Some(col), vec![val]),
285            (Expr::BinaryExpr(left), Operator::Or, Expr::BinaryExpr(right)) => {
286                // Merge values from two branches, but only if they refer to the same column
287                let mut left = Self::collect_or_group(left);
288                let mut right = Self::collect_or_group(right);
289                if left.0 == right.0
290                    && right.0 == TableProviderFilterPushDown::Exact
291                    && left.1 == right.1
292                {
293                    left.2.append(&mut right.2);
294                    (TableProviderFilterPushDown::Exact, left.1, left.2)
295                } else {
296                    (TableProviderFilterPushDown::Unsupported, None, Vec::new())
297                }
298            }
299            _ => (TableProviderFilterPushDown::Unsupported, None, Vec::new()),
300        }
301    }
302}
303
304#[async_trait::async_trait]
305impl TableProvider for EthLogsTable {
306    fn as_any(&self) -> &dyn Any {
307        self
308    }
309
310    fn table_type(&self) -> TableType {
311        TableType::Base
312    }
313
314    fn schema(&self) -> SchemaRef {
315        self.schema.clone()
316    }
317
318    fn supports_filters_pushdown(
319        &self,
320        filters: &[&Expr],
321    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
322        tracing::debug!(?filters, "EthLogs: performing filter pushdown");
323
324        let mut filter = EthProviderConfig::default().default_filter();
325        let mut res = Vec::new();
326
327        for expr in filters {
328            let (support, new_filter) = Self::apply_expr(filter, expr);
329            filter = new_filter;
330            res.push(support);
331        }
332
333        tracing::debug!(?filter, "EthLogs: resulting filter");
334        Ok(res)
335    }
336
337    async fn scan(
338        &self,
339        state: &dyn Session,
340        mut projection: Option<&Vec<usize>>,
341        filters: &[Expr],
342        limit: Option<usize>,
343    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
344        let config = state
345            .config_options()
346            .extensions
347            .get::<EthProviderConfig>()
348            .cloned()
349            .unwrap_or_default();
350
351        tracing::debug!(
352            ?config,
353            ?projection,
354            ?filters,
355            ?limit,
356            "EthLogs: creating execution plan"
357        );
358
359        // TODO: Datafusion always passess a projection even when one is not required
360        if let Some(proj) = projection {
361            let is_redundant = proj.len() == self.schema.fields.len()
362                && proj[0] == 0
363                && proj
364                    .iter()
365                    .cloned()
366                    .reduce(|a, b| if a + 1 == b { b } else { a })
367                    == Some(self.schema.fields.len() - 1);
368            if is_redundant {
369                projection = None
370            }
371        }
372
373        let schema = if let Some(proj) = projection {
374            Arc::new(self.schema.project(proj)?)
375        } else {
376            self.schema.clone()
377        };
378
379        // Get filter with range restrictions from the session config level
380        let mut filter = config.default_filter();
381
382        // Push down filter expressions from the query
383        for expr in filters {
384            let (support, new_filter) = Self::apply_expr(filter, expr);
385            assert_eq!(support, TableProviderFilterPushDown::Exact);
386            filter = new_filter;
387        }
388
389        Ok(Arc::new(EthGetLogs::new(
390            self.rpc_client.clone(),
391            schema,
392            projection.cloned(),
393            filter,
394            config.stream_options(),
395            limit,
396        )))
397    }
398}
399
400///////////////////////////////////////////////////////////////////////////////////////////////////
401
402#[derive(Debug, Clone)]
403pub struct EthGetLogs {
404    projected_schema: SchemaRef,
405    projection: Option<Vec<usize>>,
406    rpc_client: DynProvider<alloy::network::AnyNetwork>,
407    filter: Filter,
408    stream_options: StreamOptions,
409    limit: Option<usize>,
410    properties: PlanProperties,
411}
412
413impl EthGetLogs {
414    pub fn new(
415        rpc_client: DynProvider<alloy::network::AnyNetwork>,
416        projected_schema: SchemaRef,
417        projection: Option<Vec<usize>>,
418        filter: Filter,
419        stream_options: StreamOptions,
420        limit: Option<usize>,
421    ) -> Self {
422        Self {
423            projected_schema: projected_schema.clone(),
424            projection,
425            rpc_client,
426            filter,
427            stream_options,
428            limit,
429            properties: PlanProperties::new(
430                EquivalenceProperties::new(projected_schema),
431                datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
432                datafusion::physical_plan::execution_plan::EmissionType::Incremental,
433                // TODO: Change to Unbounded
434                datafusion::physical_plan::execution_plan::Boundedness::Bounded,
435            )
436            .with_scheduling_type(
437                // TODO: Validate assumption: Since we pull data from the network
438                // we will never get a situation where control is never returned
439                // to the task scheduler.
440                datafusion::physical_plan::execution_plan::SchedulingType::Cooperative,
441            ),
442        }
443    }
444
445    pub fn filter(&self) -> &Filter {
446        &self.filter
447    }
448
449    pub fn with_filter(mut self, filter: Filter) -> Self {
450        self.filter = filter;
451        self
452    }
453
454    fn execute_impl(
455        rpc_client: DynProvider<alloy::network::AnyNetwork>,
456        filter: Filter,
457        options: StreamOptions,
458        projection: Option<Vec<usize>>,
459        limit: Option<usize>,
460    ) -> impl Stream<Item = DfResult<RecordBatch>> {
461        async_stream::try_stream! {
462            let limit = limit.unwrap_or(usize::MAX);
463            let mut coder = crate::convert::EthRawLogsToArrow::new(&options);
464            let mut total = 0;
465
466            // TODO: Streaming/unbounded API
467            let mut log_stream = Box::pin(
468                crate::stream::RawLogsStream::paginate(rpc_client.clone(), filter, options, None)
469            );
470
471            while let Some(batch) = log_stream.try_next().await.map_err(|e| DataFusionError::External(e.into()))? {
472                if total >= limit {
473                    break;
474                }
475                let to_append = usize::min(batch.logs.len(), limit - total);
476                coder.append(&batch.logs[0..to_append]).map_err(|e| DataFusionError::External(e.into()))?;
477                total += to_append;
478
479                // Don't form batches that are too small
480                if coder.len() > 1_000 {
481                    let batch = if let Some(proj) = &projection {
482                        coder.finish().project(proj)?
483                    } else {
484                        coder.finish()
485                    };
486                    yield batch;
487                }
488            }
489
490            // Return at least one batch, even if empty
491            if !coder.is_empty() || total == 0 {
492                let batch = if let Some(proj) = &projection {
493                    coder.finish().project(proj)?
494                } else {
495                    coder.finish()
496                };
497                yield batch;
498            }
499        }
500    }
501}
502
503impl ExecutionPlan for EthGetLogs {
504    fn name(&self) -> &str {
505        Self::static_name()
506    }
507
508    fn as_any(&self) -> &dyn Any {
509        self
510    }
511
512    fn properties(&self) -> &PlanProperties {
513        &self.properties
514    }
515
516    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
517        // this is a leaf node and has no children
518        vec![]
519    }
520
521    fn with_new_children(
522        self: Arc<Self>,
523        _: Vec<Arc<dyn ExecutionPlan>>,
524    ) -> DfResult<Arc<dyn ExecutionPlan>> {
525        Ok(self)
526    }
527
528    fn execute(
529        &self,
530        partition: usize,
531        _context: Arc<TaskContext>,
532    ) -> DfResult<datafusion::execution::SendableRecordBatchStream> {
533        assert_eq!(partition, 0);
534
535        let stream = Self::execute_impl(
536            self.rpc_client.clone(),
537            self.filter.clone(),
538            self.stream_options.clone(),
539            self.projection.clone(),
540            self.limit,
541        );
542
543        let record_batch_stream =
544            RecordBatchStreamAdapter::new(self.projected_schema.clone(), stream);
545
546        Ok(Box::pin(record_batch_stream))
547    }
548}
549
550impl DisplayAs for EthGetLogs {
551    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
552        match t {
553            DisplayFormatType::Default | DisplayFormatType::Verbose => {
554                write!(f, "EthGetLogs: ")?;
555                if self.projection.is_none() {
556                    write!(f, "projection=[*]")?;
557                } else {
558                    let projection_str = self
559                        .projected_schema
560                        .fields
561                        .iter()
562                        .map(|f| f.name().as_str())
563                        .collect::<Vec<_>>()
564                        .join(", ");
565
566                    write!(f, "projection=[{projection_str}]")?;
567                }
568
569                let mut filters = Vec::new();
570
571                match self.filter.block_option {
572                    FilterBlockOption::Range {
573                        from_block,
574                        to_block,
575                    } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
576                    FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
577                }
578
579                if !self.filter.address.is_empty() {
580                    // Provide deterministic order
581                    let mut addrs: Vec<_> =
582                        self.filter.address.iter().map(|h| h.to_string()).collect();
583                    addrs.sort();
584                    filters.push(format!("address=[{}]", addrs.join(", ")));
585                }
586
587                for (i, t) in self
588                    .filter
589                    .topics
590                    .iter()
591                    .enumerate()
592                    .filter(|(_, t)| !t.is_empty())
593                {
594                    // Provide deterministic order
595                    let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
596                    topics.sort();
597                    filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
598                }
599
600                write!(f, ", filter=[{}]", filters.join(", "))?;
601
602                if let Some(limit) = self.limit {
603                    write!(f, ", limit={limit}")?;
604                }
605
606                Ok(())
607            }
608            DisplayFormatType::TreeRender => {
609                writeln!(f, "EthGetLogs")?;
610
611                if self.projection.is_none() {
612                    writeln!(f, "projection=[*]")?;
613                } else {
614                    let projection_str = self
615                        .projected_schema
616                        .fields
617                        .iter()
618                        .map(|f| f.name().as_str())
619                        .collect::<Vec<_>>()
620                        .join(", ");
621
622                    writeln!(f, "projection=[{projection_str}]")?;
623                }
624
625                let mut filters = Vec::new();
626
627                match self.filter.block_option {
628                    FilterBlockOption::Range {
629                        from_block,
630                        to_block,
631                    } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
632                    FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
633                }
634
635                if !self.filter.address.is_empty() {
636                    // Provide deterministic order
637                    let mut addrs: Vec<_> =
638                        self.filter.address.iter().map(|h| h.to_string()).collect();
639                    addrs.sort();
640                    filters.push(format!("address=[{}]", addrs.join(", ")));
641                }
642
643                for (i, t) in self
644                    .filter
645                    .topics
646                    .iter()
647                    .enumerate()
648                    .filter(|(_, t)| !t.is_empty())
649                {
650                    // Provide deterministic order
651                    let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
652                    topics.sort();
653                    filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
654                }
655
656                writeln!(f, "filter=[{}]", filters.join(", "))?;
657
658                if let Some(limit) = self.limit {
659                    writeln!(f, "limit={limit}")?;
660                }
661
662                Ok(())
663            }
664        }
665    }
666}
667
668///////////////////////////////////////////////////////////////////////////////////////////////////