datafusion_ethers/
provider.rs

1use alloy::primitives::{Address, B256};
2use alloy::providers::DynProvider;
3use alloy::rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption};
4use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
5use datafusion::error::{DataFusionError, Result as DfResult};
6use datafusion::execution::TaskContext;
7use datafusion::logical_expr::BinaryExpr;
8use datafusion::physical_expr::EquivalenceProperties;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
11use datafusion::{
12    arrow::{array::RecordBatch, datatypes::SchemaRef},
13    datasource::{TableProvider, TableType},
14    logical_expr::{Operator, TableProviderFilterPushDown},
15    physical_plan::ExecutionPlan,
16    prelude::*,
17    scalar::ScalarValue,
18};
19use futures::{Stream, TryStreamExt};
20use std::{any::Any, sync::Arc};
21
22use crate::config::EthProviderConfig;
23use crate::convert::Transcoder as _;
24use crate::stream::StreamOptions;
25use crate::utils::*;
26
27///////////////////////////////////////////////////////////////////////////////////////////////////
28// Catalog
29///////////////////////////////////////////////////////////////////////////////////////////////////
30
31#[derive(Debug)]
32pub struct EthCatalog {
33    rpc_client: DynProvider,
34}
35
36impl EthCatalog {
37    pub fn new(rpc_client: DynProvider) -> Self {
38        Self { rpc_client }
39    }
40}
41
42impl CatalogProvider for EthCatalog {
43    fn as_any(&self) -> &dyn Any {
44        self
45    }
46
47    fn schema_names(&self) -> Vec<String> {
48        vec!["eth".to_string()]
49    }
50
51    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
52        match name {
53            "eth" => Some(Arc::new(EthSchema::new(self.rpc_client.clone()))),
54            _ => None,
55        }
56    }
57}
58
59///////////////////////////////////////////////////////////////////////////////////////////////////
60// Schema
61///////////////////////////////////////////////////////////////////////////////////////////////////
62
63#[derive(Debug)]
64pub struct EthSchema {
65    rpc_client: DynProvider,
66}
67
68impl EthSchema {
69    pub fn new(rpc_client: DynProvider) -> Self {
70        Self { rpc_client }
71    }
72}
73
74#[async_trait::async_trait]
75impl SchemaProvider for EthSchema {
76    fn as_any(&self) -> &dyn Any {
77        self
78    }
79
80    fn table_names(&self) -> Vec<String> {
81        vec!["logs".to_string()]
82    }
83
84    async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
85        match name {
86            "logs" => Ok(Some(Arc::new(EthLogsTable::new(self.rpc_client.clone())))),
87            _ => Ok(None),
88        }
89    }
90
91    fn table_exist(&self, name: &str) -> bool {
92        name == "logs"
93    }
94}
95
96///////////////////////////////////////////////////////////////////////////////////////////////////
97// Table: eth_logs
98///////////////////////////////////////////////////////////////////////////////////////////////////
99
100#[derive(Debug)]
101pub struct EthLogsTable {
102    schema: SchemaRef,
103    rpc_client: DynProvider,
104}
105
106impl EthLogsTable {
107    pub fn new(rpc_client: DynProvider) -> Self {
108        Self {
109            schema: crate::convert::EthRawLogsToArrow::new().schema(),
110            rpc_client,
111        }
112    }
113
114    fn apply_expr(filter: Filter, expr: &Expr) -> (TableProviderFilterPushDown, Filter) {
115        match expr {
116            Expr::BinaryExpr(e) => match (&*e.left, e.op, &*e.right) {
117                (Expr::Column(left), op, right) => match (left.name.as_str(), op, right) {
118                    (
119                        "address",
120                        Operator::Eq,
121                        Expr::Literal(ScalarValue::Binary(Some(v)), None),
122                    ) => (
123                        TableProviderFilterPushDown::Exact,
124                        filter.address(Address::from_slice(&v[..])),
125                    ),
126                    ("topic0", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
127                        (
128                            TableProviderFilterPushDown::Exact,
129                            filter.event_signature(B256::from_slice(&v[..])),
130                        )
131                    }
132                    ("topic1", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
133                        (
134                            TableProviderFilterPushDown::Exact,
135                            filter.topic1(B256::from_slice(&v[..])),
136                        )
137                    }
138                    ("topic2", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
139                        (
140                            TableProviderFilterPushDown::Exact,
141                            filter.topic2(B256::from_slice(&v[..])),
142                        )
143                    }
144                    ("topic3", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
145                        (
146                            TableProviderFilterPushDown::Exact,
147                            filter.topic3(B256::from_slice(&v[..])),
148                        )
149                    }
150                    (
151                        "block_number",
152                        Operator::Eq,
153                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
154                    ) => (
155                        TableProviderFilterPushDown::Exact,
156                        filter.union((*v).into(), (*v).into()),
157                    ),
158                    (
159                        "block_number",
160                        Operator::Gt,
161                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
162                    ) => (
163                        TableProviderFilterPushDown::Exact,
164                        filter.union((*v + 1).into(), BlockNumberOrTag::Latest),
165                    ),
166                    (
167                        "block_number",
168                        Operator::GtEq,
169                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
170                    ) => (
171                        TableProviderFilterPushDown::Exact,
172                        filter.union((*v).into(), BlockNumberOrTag::Latest),
173                    ),
174                    (
175                        "block_number",
176                        Operator::Lt,
177                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
178                    ) => (
179                        TableProviderFilterPushDown::Exact,
180                        filter.union(BlockNumberOrTag::Earliest, (*v - 1).into()),
181                    ),
182                    (
183                        "block_number",
184                        Operator::LtEq,
185                        Expr::Literal(ScalarValue::UInt64(Some(v)), None),
186                    ) => (
187                        TableProviderFilterPushDown::Exact,
188                        filter.union(BlockNumberOrTag::Earliest, (*v).into()),
189                    ),
190                    _ => (TableProviderFilterPushDown::Unsupported, filter),
191                },
192                // expr OR expr OR ...
193                (Expr::BinaryExpr(_), Operator::Or, Expr::BinaryExpr(_)) => {
194                    match Self::collect_or_group(e) {
195                        (TableProviderFilterPushDown::Exact, Some(col), values) => {
196                            match col.name.as_str() {
197                                "address" => (
198                                    TableProviderFilterPushDown::Exact,
199                                    filter.address(
200                                        values
201                                            .into_iter()
202                                            .map(|v| Address::from_slice(&v[..]))
203                                            .collect::<Vec<_>>(),
204                                    ),
205                                ),
206                                "topic0" => (
207                                    TableProviderFilterPushDown::Exact,
208                                    filter.event_signature(
209                                        values
210                                            .into_iter()
211                                            .map(|v| B256::from_slice(&v[..]))
212                                            .collect::<Vec<_>>(),
213                                    ),
214                                ),
215                                "topic1" => (
216                                    TableProviderFilterPushDown::Exact,
217                                    filter.topic1(
218                                        values
219                                            .into_iter()
220                                            .map(|v| B256::from_slice(&v[..]))
221                                            .collect::<Vec<_>>(),
222                                    ),
223                                ),
224                                "topic2" => (
225                                    TableProviderFilterPushDown::Exact,
226                                    filter.topic2(
227                                        values
228                                            .into_iter()
229                                            .map(|v| B256::from_slice(&v[..]))
230                                            .collect::<Vec<_>>(),
231                                    ),
232                                ),
233                                "topic3" => (
234                                    TableProviderFilterPushDown::Exact,
235                                    filter.topic3(
236                                        values
237                                            .into_iter()
238                                            .map(|v| B256::from_slice(&v[..]))
239                                            .collect::<Vec<_>>(),
240                                    ),
241                                ),
242                                _ => (TableProviderFilterPushDown::Unsupported, filter),
243                            }
244                        }
245                        _ => (TableProviderFilterPushDown::Unsupported, filter),
246                    }
247                }
248                _ => (TableProviderFilterPushDown::Unsupported, filter),
249            },
250            _ => (TableProviderFilterPushDown::Unsupported, filter),
251        }
252    }
253
254    /// Converts expressions like:
255    ///   x = v1 or (x = v2 or (x = v3))
256    /// into:
257    ///   [v1, v2, v3]
258    fn collect_or_group(
259        expr: &BinaryExpr,
260    ) -> (TableProviderFilterPushDown, Option<&Column>, Vec<&Vec<u8>>) {
261        match (&*expr.left, expr.op, &*expr.right) {
262            (
263                Expr::Column(col),
264                Operator::Eq,
265                Expr::Literal(ScalarValue::Binary(Some(val)), None),
266            ) => (TableProviderFilterPushDown::Exact, Some(col), vec![val]),
267            (Expr::BinaryExpr(left), Operator::Or, Expr::BinaryExpr(right)) => {
268                // Merge values from two branches, but only if they refer to the same column
269                let mut left = Self::collect_or_group(left);
270                let mut right = Self::collect_or_group(right);
271                if left.0 == right.0
272                    && right.0 == TableProviderFilterPushDown::Exact
273                    && left.1 == right.1
274                {
275                    left.2.append(&mut right.2);
276                    (TableProviderFilterPushDown::Exact, left.1, left.2)
277                } else {
278                    (TableProviderFilterPushDown::Unsupported, None, Vec::new())
279                }
280            }
281            _ => (TableProviderFilterPushDown::Unsupported, None, Vec::new()),
282        }
283    }
284}
285
286#[async_trait::async_trait]
287impl TableProvider for EthLogsTable {
288    fn as_any(&self) -> &dyn Any {
289        self
290    }
291
292    fn table_type(&self) -> TableType {
293        TableType::Base
294    }
295
296    fn schema(&self) -> SchemaRef {
297        self.schema.clone()
298    }
299
300    fn supports_filters_pushdown(
301        &self,
302        filters: &[&Expr],
303    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
304        tracing::debug!(?filters, "EthLogs: performing filter pushdown");
305
306        let mut filter = EthProviderConfig::default().default_filter();
307        let mut res = Vec::new();
308
309        for expr in filters {
310            let (support, new_filter) = Self::apply_expr(filter, expr);
311            filter = new_filter;
312            res.push(support);
313        }
314
315        tracing::debug!(?filter, "EthLogs: resulting filter");
316        Ok(res)
317    }
318
319    async fn scan(
320        &self,
321        state: &dyn Session,
322        mut projection: Option<&Vec<usize>>,
323        filters: &[Expr],
324        limit: Option<usize>,
325    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
326        let config = state
327            .config_options()
328            .extensions
329            .get::<EthProviderConfig>()
330            .cloned()
331            .unwrap_or_default();
332
333        tracing::debug!(
334            ?config,
335            ?projection,
336            ?filters,
337            ?limit,
338            "EthLogs: creating execution plan"
339        );
340
341        // TODO: Datafusion always passess a projection even when one is not required
342        if let Some(proj) = projection {
343            let is_redundant = proj.len() == self.schema.fields.len()
344                && proj[0] == 0
345                && proj
346                    .iter()
347                    .cloned()
348                    .reduce(|a, b| if a + 1 == b { b } else { a })
349                    == Some(self.schema.fields.len() - 1);
350            if is_redundant {
351                projection = None
352            }
353        }
354
355        let schema = if let Some(proj) = projection {
356            Arc::new(self.schema.project(proj)?)
357        } else {
358            self.schema.clone()
359        };
360
361        // Get filter with range restrictions from the session config level
362        let mut filter = config.default_filter();
363
364        // Push down filter expressions from the query
365        for expr in filters {
366            let (support, new_filter) = Self::apply_expr(filter, expr);
367            assert_eq!(support, TableProviderFilterPushDown::Exact);
368            filter = new_filter;
369        }
370
371        Ok(Arc::new(EthGetLogs::new(
372            self.rpc_client.clone(),
373            schema,
374            projection.cloned(),
375            filter,
376            config.stream_options(),
377            limit,
378        )))
379    }
380}
381
382///////////////////////////////////////////////////////////////////////////////////////////////////
383
384#[derive(Debug, Clone)]
385pub struct EthGetLogs {
386    projected_schema: SchemaRef,
387    projection: Option<Vec<usize>>,
388    rpc_client: DynProvider,
389    filter: Filter,
390    stream_options: StreamOptions,
391    limit: Option<usize>,
392    properties: PlanProperties,
393}
394
395impl EthGetLogs {
396    pub fn new(
397        rpc_client: DynProvider,
398        projected_schema: SchemaRef,
399        projection: Option<Vec<usize>>,
400        filter: Filter,
401        stream_options: StreamOptions,
402        limit: Option<usize>,
403    ) -> Self {
404        Self {
405            projected_schema: projected_schema.clone(),
406            projection,
407            rpc_client,
408            filter,
409            stream_options,
410            limit,
411            properties: PlanProperties::new(
412                EquivalenceProperties::new(projected_schema),
413                datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
414                datafusion::physical_plan::execution_plan::EmissionType::Incremental,
415                // TODO: Change to Unbounded
416                datafusion::physical_plan::execution_plan::Boundedness::Bounded,
417            )
418            .with_scheduling_type(
419                // TODO: Validate assumption: Since we pull data from the network
420                // we will never get a situation where control is never returned
421                // to the task scheduler.
422                datafusion::physical_plan::execution_plan::SchedulingType::Cooperative,
423            ),
424        }
425    }
426
427    pub fn filter(&self) -> &Filter {
428        &self.filter
429    }
430
431    pub fn with_filter(mut self, filter: Filter) -> Self {
432        self.filter = filter;
433        self
434    }
435
436    fn execute_impl(
437        rpc_client: DynProvider,
438        filter: Filter,
439        options: StreamOptions,
440        projection: Option<Vec<usize>>,
441        limit: Option<usize>,
442    ) -> impl Stream<Item = DfResult<RecordBatch>> {
443        async_stream::try_stream! {
444            let limit = limit.unwrap_or(usize::MAX);
445            let mut coder = crate::convert::EthRawLogsToArrow::new();
446            let mut total = 0;
447
448            // TODO: Streaming/unbounded API
449            let mut log_stream = Box::pin(
450                crate::stream::RawLogsStream::paginate(rpc_client, filter, options, None)
451            );
452
453            while let Some(batch) = log_stream.try_next().await.map_err(|e| DataFusionError::External(e.into()))? {
454                if total >= limit {
455                    break;
456                }
457                let to_append = usize::min(batch.logs.len(), limit - total);
458                coder.append(&batch.logs[0..to_append]).map_err(|e| DataFusionError::External(e.into()))?;
459                total += to_append;
460
461                // Don't form batches that are too small
462                if coder.len() > 1_000 {
463                    let batch = if let Some(proj) = &projection {
464                        coder.finish().project(proj)?
465                    } else {
466                        coder.finish()
467                    };
468                    yield batch;
469                }
470            }
471
472            // Return at least one batch, even if empty
473            if !coder.is_empty() || total == 0 {
474                let batch = if let Some(proj) = &projection {
475                    coder.finish().project(proj)?
476                } else {
477                    coder.finish()
478                };
479                yield batch;
480            }
481        }
482    }
483}
484
485impl ExecutionPlan for EthGetLogs {
486    fn name(&self) -> &str {
487        Self::static_name()
488    }
489
490    fn as_any(&self) -> &dyn Any {
491        self
492    }
493
494    fn properties(&self) -> &PlanProperties {
495        &self.properties
496    }
497
498    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
499        // this is a leaf node and has no children
500        vec![]
501    }
502
503    fn with_new_children(
504        self: Arc<Self>,
505        _: Vec<Arc<dyn ExecutionPlan>>,
506    ) -> DfResult<Arc<dyn ExecutionPlan>> {
507        Ok(self)
508    }
509
510    fn execute(
511        &self,
512        partition: usize,
513        _context: Arc<TaskContext>,
514    ) -> DfResult<datafusion::execution::SendableRecordBatchStream> {
515        assert_eq!(partition, 0);
516
517        let stream = Self::execute_impl(
518            self.rpc_client.clone(),
519            self.filter.clone(),
520            self.stream_options.clone(),
521            self.projection.clone(),
522            self.limit,
523        );
524
525        let record_batch_stream =
526            RecordBatchStreamAdapter::new(self.projected_schema.clone(), stream);
527
528        Ok(Box::pin(record_batch_stream))
529    }
530}
531
532impl DisplayAs for EthGetLogs {
533    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
534        match t {
535            DisplayFormatType::Default | DisplayFormatType::Verbose => {
536                write!(f, "EthGetLogs: ")?;
537                if self.projection.is_none() {
538                    write!(f, "projection=[*]")?;
539                } else {
540                    let projection_str = self
541                        .projected_schema
542                        .fields
543                        .iter()
544                        .map(|f| f.name().as_str())
545                        .collect::<Vec<_>>()
546                        .join(", ");
547
548                    write!(f, "projection=[{projection_str}]")?;
549                }
550
551                let mut filters = Vec::new();
552
553                match self.filter.block_option {
554                    FilterBlockOption::Range {
555                        from_block,
556                        to_block,
557                    } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
558                    FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
559                }
560
561                if !self.filter.address.is_empty() {
562                    // Provide deterministic order
563                    let mut addrs: Vec<_> =
564                        self.filter.address.iter().map(|h| h.to_string()).collect();
565                    addrs.sort();
566                    filters.push(format!("address=[{}]", addrs.join(", ")));
567                }
568
569                for (i, t) in self
570                    .filter
571                    .topics
572                    .iter()
573                    .enumerate()
574                    .filter(|(_, t)| !t.is_empty())
575                {
576                    // Provide deterministic order
577                    let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
578                    topics.sort();
579                    filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
580                }
581
582                write!(f, ", filter=[{}]", filters.join(", "))?;
583
584                if let Some(limit) = self.limit {
585                    write!(f, ", limit={limit}")?;
586                }
587
588                Ok(())
589            }
590            DisplayFormatType::TreeRender => {
591                writeln!(f, "EthGetLogs")?;
592
593                if self.projection.is_none() {
594                    writeln!(f, "projection=[*]")?;
595                } else {
596                    let projection_str = self
597                        .projected_schema
598                        .fields
599                        .iter()
600                        .map(|f| f.name().as_str())
601                        .collect::<Vec<_>>()
602                        .join(", ");
603
604                    writeln!(f, "projection=[{projection_str}]")?;
605                }
606
607                let mut filters = Vec::new();
608
609                match self.filter.block_option {
610                    FilterBlockOption::Range {
611                        from_block,
612                        to_block,
613                    } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
614                    FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
615                }
616
617                if !self.filter.address.is_empty() {
618                    // Provide deterministic order
619                    let mut addrs: Vec<_> =
620                        self.filter.address.iter().map(|h| h.to_string()).collect();
621                    addrs.sort();
622                    filters.push(format!("address=[{}]", addrs.join(", ")));
623                }
624
625                for (i, t) in self
626                    .filter
627                    .topics
628                    .iter()
629                    .enumerate()
630                    .filter(|(_, t)| !t.is_empty())
631                {
632                    // Provide deterministic order
633                    let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
634                    topics.sort();
635                    filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
636                }
637
638                writeln!(f, "filter=[{}]", filters.join(", "))?;
639
640                if let Some(limit) = self.limit {
641                    writeln!(f, "limit={limit}")?;
642                }
643
644                Ok(())
645            }
646        }
647    }
648}
649
650///////////////////////////////////////////////////////////////////////////////////////////////////