1use alloy::primitives::{Address, B256};
2use alloy::providers::DynProvider;
3use alloy::rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption};
4use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
5use datafusion::error::{DataFusionError, Result as DfResult};
6use datafusion::execution::TaskContext;
7use datafusion::logical_expr::BinaryExpr;
8use datafusion::physical_expr::EquivalenceProperties;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
11use datafusion::{
12 arrow::{array::RecordBatch, datatypes::SchemaRef},
13 datasource::{TableProvider, TableType},
14 logical_expr::{Operator, TableProviderFilterPushDown},
15 physical_plan::ExecutionPlan,
16 prelude::*,
17 scalar::ScalarValue,
18};
19use futures::{Stream, TryStreamExt};
20use std::{any::Any, sync::Arc};
21
22use crate::config::EthProviderConfig;
23use crate::convert::Transcoder as _;
24use crate::stream::StreamOptions;
25use crate::utils::*;
26
27#[derive(Debug)]
32pub struct EthCatalog {
33 config: EthProviderConfig,
34 rpc_client: DynProvider<alloy::network::AnyNetwork>,
35}
36
37impl EthCatalog {
38 pub fn new(
39 config: EthProviderConfig,
40 rpc_client: DynProvider<alloy::network::AnyNetwork>,
41 ) -> Self {
42 Self { config, rpc_client }
43 }
44}
45
46impl CatalogProvider for EthCatalog {
47 fn as_any(&self) -> &dyn Any {
48 self
49 }
50
51 fn schema_names(&self) -> Vec<String> {
52 vec![self.config.schema_name.clone()]
53 }
54
55 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
56 if name != self.config.schema_name {
57 return None;
58 }
59
60 Some(Arc::new(EthSchema::new(
61 self.config.clone(),
62 self.rpc_client.clone(),
63 )))
64 }
65}
66
67#[derive(Debug)]
72pub struct EthSchema {
73 config: EthProviderConfig,
74 rpc_client: DynProvider<alloy::network::AnyNetwork>,
75}
76
77impl EthSchema {
78 pub fn new(
79 config: EthProviderConfig,
80 rpc_client: DynProvider<alloy::network::AnyNetwork>,
81 ) -> Self {
82 Self { config, rpc_client }
83 }
84}
85
86#[async_trait::async_trait]
87impl SchemaProvider for EthSchema {
88 fn as_any(&self) -> &dyn Any {
89 self
90 }
91
92 fn table_names(&self) -> Vec<String> {
93 vec!["logs".to_string()]
94 }
95
96 async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
97 match name {
98 "logs" => Ok(Some(Arc::new(EthLogsTable::new(
99 &self.config,
100 self.rpc_client.clone(),
101 )))),
102 _ => Ok(None),
103 }
104 }
105
106 fn table_exist(&self, name: &str) -> bool {
107 name == "logs"
108 }
109}
110
111#[derive(Debug)]
116pub struct EthLogsTable {
117 schema: SchemaRef,
118 rpc_client: DynProvider<alloy::network::AnyNetwork>,
119}
120
121impl EthLogsTable {
122 pub fn new(
123 config: &EthProviderConfig,
124 rpc_client: DynProvider<alloy::network::AnyNetwork>,
125 ) -> Self {
126 let encoder = crate::convert::EthRawLogsToArrow::new(&config.stream_options());
127 let schema = encoder.schema();
128
129 Self { schema, rpc_client }
130 }
131
132 fn apply_expr(filter: Filter, expr: &Expr) -> (TableProviderFilterPushDown, Filter) {
133 match expr {
134 Expr::BinaryExpr(e) => match (&*e.left, e.op, &*e.right) {
135 (Expr::Column(left), op, right) => match (left.name.as_str(), op, right) {
136 (
137 "address",
138 Operator::Eq,
139 Expr::Literal(ScalarValue::Binary(Some(v)), None),
140 ) => (
141 TableProviderFilterPushDown::Exact,
142 filter.address(Address::from_slice(&v[..])),
143 ),
144 ("topic0", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
145 (
146 TableProviderFilterPushDown::Exact,
147 filter.event_signature(B256::from_slice(&v[..])),
148 )
149 }
150 ("topic1", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
151 (
152 TableProviderFilterPushDown::Exact,
153 filter.topic1(B256::from_slice(&v[..])),
154 )
155 }
156 ("topic2", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
157 (
158 TableProviderFilterPushDown::Exact,
159 filter.topic2(B256::from_slice(&v[..])),
160 )
161 }
162 ("topic3", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
163 (
164 TableProviderFilterPushDown::Exact,
165 filter.topic3(B256::from_slice(&v[..])),
166 )
167 }
168 (
169 "block_number",
170 Operator::Eq,
171 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
172 ) => (
173 TableProviderFilterPushDown::Exact,
174 filter.union((*v).into(), (*v).into()),
175 ),
176 (
177 "block_number",
178 Operator::Gt,
179 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
180 ) => (
181 TableProviderFilterPushDown::Exact,
182 filter.union((*v + 1).into(), BlockNumberOrTag::Latest),
183 ),
184 (
185 "block_number",
186 Operator::GtEq,
187 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
188 ) => (
189 TableProviderFilterPushDown::Exact,
190 filter.union((*v).into(), BlockNumberOrTag::Latest),
191 ),
192 (
193 "block_number",
194 Operator::Lt,
195 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
196 ) => (
197 TableProviderFilterPushDown::Exact,
198 filter.union(BlockNumberOrTag::Earliest, (*v - 1).into()),
199 ),
200 (
201 "block_number",
202 Operator::LtEq,
203 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
204 ) => (
205 TableProviderFilterPushDown::Exact,
206 filter.union(BlockNumberOrTag::Earliest, (*v).into()),
207 ),
208 _ => (TableProviderFilterPushDown::Unsupported, filter),
209 },
210 (Expr::BinaryExpr(_), Operator::Or, Expr::BinaryExpr(_)) => {
212 match Self::collect_or_group(e) {
213 (TableProviderFilterPushDown::Exact, Some(col), values) => {
214 match col.name.as_str() {
215 "address" => (
216 TableProviderFilterPushDown::Exact,
217 filter.address(
218 values
219 .into_iter()
220 .map(|v| Address::from_slice(&v[..]))
221 .collect::<Vec<_>>(),
222 ),
223 ),
224 "topic0" => (
225 TableProviderFilterPushDown::Exact,
226 filter.event_signature(
227 values
228 .into_iter()
229 .map(|v| B256::from_slice(&v[..]))
230 .collect::<Vec<_>>(),
231 ),
232 ),
233 "topic1" => (
234 TableProviderFilterPushDown::Exact,
235 filter.topic1(
236 values
237 .into_iter()
238 .map(|v| B256::from_slice(&v[..]))
239 .collect::<Vec<_>>(),
240 ),
241 ),
242 "topic2" => (
243 TableProviderFilterPushDown::Exact,
244 filter.topic2(
245 values
246 .into_iter()
247 .map(|v| B256::from_slice(&v[..]))
248 .collect::<Vec<_>>(),
249 ),
250 ),
251 "topic3" => (
252 TableProviderFilterPushDown::Exact,
253 filter.topic3(
254 values
255 .into_iter()
256 .map(|v| B256::from_slice(&v[..]))
257 .collect::<Vec<_>>(),
258 ),
259 ),
260 _ => (TableProviderFilterPushDown::Unsupported, filter),
261 }
262 }
263 _ => (TableProviderFilterPushDown::Unsupported, filter),
264 }
265 }
266 _ => (TableProviderFilterPushDown::Unsupported, filter),
267 },
268 _ => (TableProviderFilterPushDown::Unsupported, filter),
269 }
270 }
271
272 fn collect_or_group(
277 expr: &BinaryExpr,
278 ) -> (TableProviderFilterPushDown, Option<&Column>, Vec<&Vec<u8>>) {
279 match (&*expr.left, expr.op, &*expr.right) {
280 (
281 Expr::Column(col),
282 Operator::Eq,
283 Expr::Literal(ScalarValue::Binary(Some(val)), None),
284 ) => (TableProviderFilterPushDown::Exact, Some(col), vec![val]),
285 (Expr::BinaryExpr(left), Operator::Or, Expr::BinaryExpr(right)) => {
286 let mut left = Self::collect_or_group(left);
288 let mut right = Self::collect_or_group(right);
289 if left.0 == right.0
290 && right.0 == TableProviderFilterPushDown::Exact
291 && left.1 == right.1
292 {
293 left.2.append(&mut right.2);
294 (TableProviderFilterPushDown::Exact, left.1, left.2)
295 } else {
296 (TableProviderFilterPushDown::Unsupported, None, Vec::new())
297 }
298 }
299 _ => (TableProviderFilterPushDown::Unsupported, None, Vec::new()),
300 }
301 }
302}
303
304#[async_trait::async_trait]
305impl TableProvider for EthLogsTable {
306 fn as_any(&self) -> &dyn Any {
307 self
308 }
309
310 fn table_type(&self) -> TableType {
311 TableType::Base
312 }
313
314 fn schema(&self) -> SchemaRef {
315 self.schema.clone()
316 }
317
318 fn supports_filters_pushdown(
319 &self,
320 filters: &[&Expr],
321 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
322 tracing::debug!(?filters, "EthLogs: performing filter pushdown");
323
324 let mut filter = EthProviderConfig::default().default_filter();
325 let mut res = Vec::new();
326
327 for expr in filters {
328 let (support, new_filter) = Self::apply_expr(filter, expr);
329 filter = new_filter;
330 res.push(support);
331 }
332
333 tracing::debug!(?filter, "EthLogs: resulting filter");
334 Ok(res)
335 }
336
337 async fn scan(
338 &self,
339 state: &dyn Session,
340 mut projection: Option<&Vec<usize>>,
341 filters: &[Expr],
342 limit: Option<usize>,
343 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
344 let config = state
345 .config_options()
346 .extensions
347 .get::<EthProviderConfig>()
348 .cloned()
349 .unwrap_or_default();
350
351 tracing::debug!(
352 ?config,
353 ?projection,
354 ?filters,
355 ?limit,
356 "EthLogs: creating execution plan"
357 );
358
359 if let Some(proj) = projection {
361 let is_redundant = proj.len() == self.schema.fields.len()
362 && proj[0] == 0
363 && proj
364 .iter()
365 .cloned()
366 .reduce(|a, b| if a + 1 == b { b } else { a })
367 == Some(self.schema.fields.len() - 1);
368 if is_redundant {
369 projection = None
370 }
371 }
372
373 let schema = if let Some(proj) = projection {
374 Arc::new(self.schema.project(proj)?)
375 } else {
376 self.schema.clone()
377 };
378
379 let mut filter = config.default_filter();
381
382 for expr in filters {
384 let (support, new_filter) = Self::apply_expr(filter, expr);
385 assert_eq!(support, TableProviderFilterPushDown::Exact);
386 filter = new_filter;
387 }
388
389 Ok(Arc::new(EthGetLogs::new(
390 self.rpc_client.clone(),
391 schema,
392 projection.cloned(),
393 filter,
394 config.stream_options(),
395 limit,
396 )))
397 }
398}
399
400#[derive(Debug, Clone)]
403pub struct EthGetLogs {
404 projected_schema: SchemaRef,
405 projection: Option<Vec<usize>>,
406 rpc_client: DynProvider<alloy::network::AnyNetwork>,
407 filter: Filter,
408 stream_options: StreamOptions,
409 limit: Option<usize>,
410 properties: PlanProperties,
411}
412
413impl EthGetLogs {
414 pub fn new(
415 rpc_client: DynProvider<alloy::network::AnyNetwork>,
416 projected_schema: SchemaRef,
417 projection: Option<Vec<usize>>,
418 filter: Filter,
419 stream_options: StreamOptions,
420 limit: Option<usize>,
421 ) -> Self {
422 Self {
423 projected_schema: projected_schema.clone(),
424 projection,
425 rpc_client,
426 filter,
427 stream_options,
428 limit,
429 properties: PlanProperties::new(
430 EquivalenceProperties::new(projected_schema),
431 datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
432 datafusion::physical_plan::execution_plan::EmissionType::Incremental,
433 datafusion::physical_plan::execution_plan::Boundedness::Bounded,
435 )
436 .with_scheduling_type(
437 datafusion::physical_plan::execution_plan::SchedulingType::Cooperative,
441 ),
442 }
443 }
444
445 pub fn filter(&self) -> &Filter {
446 &self.filter
447 }
448
449 pub fn with_filter(mut self, filter: Filter) -> Self {
450 self.filter = filter;
451 self
452 }
453
454 fn execute_impl(
455 rpc_client: DynProvider<alloy::network::AnyNetwork>,
456 filter: Filter,
457 options: StreamOptions,
458 projection: Option<Vec<usize>>,
459 limit: Option<usize>,
460 ) -> impl Stream<Item = DfResult<RecordBatch>> {
461 async_stream::try_stream! {
462 let limit = limit.unwrap_or(usize::MAX);
463 let mut coder = crate::convert::EthRawLogsToArrow::new(&options);
464 let mut total = 0;
465
466 let mut log_stream = Box::pin(
468 crate::stream::RawLogsStream::paginate(rpc_client.clone(), filter, options, None)
469 );
470
471 while let Some(batch) = log_stream.try_next().await.map_err(|e| DataFusionError::External(e.into()))? {
472 if total >= limit {
473 break;
474 }
475 let to_append = usize::min(batch.logs.len(), limit - total);
476 coder.append(&batch.logs[0..to_append]).map_err(|e| DataFusionError::External(e.into()))?;
477 total += to_append;
478
479 if coder.len() > 1_000 {
481 let batch = if let Some(proj) = &projection {
482 coder.finish().project(proj)?
483 } else {
484 coder.finish()
485 };
486 yield batch;
487 }
488 }
489
490 if !coder.is_empty() || total == 0 {
492 let batch = if let Some(proj) = &projection {
493 coder.finish().project(proj)?
494 } else {
495 coder.finish()
496 };
497 yield batch;
498 }
499 }
500 }
501}
502
503impl ExecutionPlan for EthGetLogs {
504 fn name(&self) -> &str {
505 Self::static_name()
506 }
507
508 fn as_any(&self) -> &dyn Any {
509 self
510 }
511
512 fn properties(&self) -> &PlanProperties {
513 &self.properties
514 }
515
516 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
517 vec![]
519 }
520
521 fn with_new_children(
522 self: Arc<Self>,
523 _: Vec<Arc<dyn ExecutionPlan>>,
524 ) -> DfResult<Arc<dyn ExecutionPlan>> {
525 Ok(self)
526 }
527
528 fn execute(
529 &self,
530 partition: usize,
531 _context: Arc<TaskContext>,
532 ) -> DfResult<datafusion::execution::SendableRecordBatchStream> {
533 assert_eq!(partition, 0);
534
535 let stream = Self::execute_impl(
536 self.rpc_client.clone(),
537 self.filter.clone(),
538 self.stream_options.clone(),
539 self.projection.clone(),
540 self.limit,
541 );
542
543 let record_batch_stream =
544 RecordBatchStreamAdapter::new(self.projected_schema.clone(), stream);
545
546 Ok(Box::pin(record_batch_stream))
547 }
548}
549
550impl DisplayAs for EthGetLogs {
551 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
552 match t {
553 DisplayFormatType::Default | DisplayFormatType::Verbose => {
554 write!(f, "EthGetLogs: ")?;
555 if self.projection.is_none() {
556 write!(f, "projection=[*]")?;
557 } else {
558 let projection_str = self
559 .projected_schema
560 .fields
561 .iter()
562 .map(|f| f.name().as_str())
563 .collect::<Vec<_>>()
564 .join(", ");
565
566 write!(f, "projection=[{projection_str}]")?;
567 }
568
569 let mut filters = Vec::new();
570
571 match self.filter.block_option {
572 FilterBlockOption::Range {
573 from_block,
574 to_block,
575 } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
576 FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
577 }
578
579 if !self.filter.address.is_empty() {
580 let mut addrs: Vec<_> =
582 self.filter.address.iter().map(|h| h.to_string()).collect();
583 addrs.sort();
584 filters.push(format!("address=[{}]", addrs.join(", ")));
585 }
586
587 for (i, t) in self
588 .filter
589 .topics
590 .iter()
591 .enumerate()
592 .filter(|(_, t)| !t.is_empty())
593 {
594 let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
596 topics.sort();
597 filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
598 }
599
600 write!(f, ", filter=[{}]", filters.join(", "))?;
601
602 if let Some(limit) = self.limit {
603 write!(f, ", limit={limit}")?;
604 }
605
606 Ok(())
607 }
608 DisplayFormatType::TreeRender => {
609 writeln!(f, "EthGetLogs")?;
610
611 if self.projection.is_none() {
612 writeln!(f, "projection=[*]")?;
613 } else {
614 let projection_str = self
615 .projected_schema
616 .fields
617 .iter()
618 .map(|f| f.name().as_str())
619 .collect::<Vec<_>>()
620 .join(", ");
621
622 writeln!(f, "projection=[{projection_str}]")?;
623 }
624
625 let mut filters = Vec::new();
626
627 match self.filter.block_option {
628 FilterBlockOption::Range {
629 from_block,
630 to_block,
631 } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
632 FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
633 }
634
635 if !self.filter.address.is_empty() {
636 let mut addrs: Vec<_> =
638 self.filter.address.iter().map(|h| h.to_string()).collect();
639 addrs.sort();
640 filters.push(format!("address=[{}]", addrs.join(", ")));
641 }
642
643 for (i, t) in self
644 .filter
645 .topics
646 .iter()
647 .enumerate()
648 .filter(|(_, t)| !t.is_empty())
649 {
650 let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
652 topics.sort();
653 filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
654 }
655
656 writeln!(f, "filter=[{}]", filters.join(", "))?;
657
658 if let Some(limit) = self.limit {
659 writeln!(f, "limit={limit}")?;
660 }
661
662 Ok(())
663 }
664 }
665 }
666}
667
668