1use alloy::primitives::{Address, B256};
2use alloy::providers::DynProvider;
3use alloy::rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption};
4use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
5use datafusion::error::{DataFusionError, Result as DfResult};
6use datafusion::execution::TaskContext;
7use datafusion::logical_expr::BinaryExpr;
8use datafusion::physical_expr::EquivalenceProperties;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
11use datafusion::{
12 arrow::{array::RecordBatch, datatypes::SchemaRef},
13 datasource::{TableProvider, TableType},
14 logical_expr::{Operator, TableProviderFilterPushDown},
15 physical_plan::ExecutionPlan,
16 prelude::*,
17 scalar::ScalarValue,
18};
19use futures::{Stream, TryStreamExt};
20use std::{any::Any, sync::Arc};
21
22use crate::config::EthProviderConfig;
23use crate::convert::Transcoder as _;
24use crate::stream::StreamOptions;
25use crate::utils::*;
26
/// DataFusion catalog backed by an Ethereum RPC provider.
///
/// The catalog exposes a single schema named `eth` (see its
/// `CatalogProvider` implementation).
#[derive(Debug)]
pub struct EthCatalog {
    /// RPC client that is cloned into every schema handed out by this catalog.
    rpc_client: DynProvider,
}
35
36impl EthCatalog {
37 pub fn new(rpc_client: DynProvider) -> Self {
38 Self { rpc_client }
39 }
40}
41
42impl CatalogProvider for EthCatalog {
43 fn as_any(&self) -> &dyn Any {
44 self
45 }
46
47 fn schema_names(&self) -> Vec<String> {
48 vec!["eth".to_string()]
49 }
50
51 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
52 match name {
53 "eth" => Some(Arc::new(EthSchema::new(self.rpc_client.clone()))),
54 _ => None,
55 }
56 }
57}
58
/// DataFusion schema backed by an Ethereum RPC provider.
///
/// The schema exposes a single table named `logs` (see its
/// `SchemaProvider` implementation).
#[derive(Debug)]
pub struct EthSchema {
    /// RPC client that is cloned into every table handed out by this schema.
    rpc_client: DynProvider,
}
67
68impl EthSchema {
69 pub fn new(rpc_client: DynProvider) -> Self {
70 Self { rpc_client }
71 }
72}
73
74#[async_trait::async_trait]
75impl SchemaProvider for EthSchema {
76 fn as_any(&self) -> &dyn Any {
77 self
78 }
79
80 fn table_names(&self) -> Vec<String> {
81 vec!["logs".to_string()]
82 }
83
84 async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
85 match name {
86 "logs" => Ok(Some(Arc::new(EthLogsTable::new(self.rpc_client.clone())))),
87 _ => Ok(None),
88 }
89 }
90
91 fn table_exist(&self, name: &str) -> bool {
92 name == "logs"
93 }
94}
95
/// DataFusion table provider that serves Ethereum logs fetched over RPC.
#[derive(Debug)]
pub struct EthLogsTable {
    /// Full (unprojected) Arrow schema of the table, as produced by
    /// `crate::convert::EthRawLogsToArrow`.
    schema: SchemaRef,
    /// RPC client that is cloned into every execution plan created by `scan`.
    rpc_client: DynProvider,
}
105
106impl EthLogsTable {
107 pub fn new(rpc_client: DynProvider) -> Self {
108 Self {
109 schema: crate::convert::EthRawLogsToArrow::new().schema(),
110 rpc_client,
111 }
112 }
113
114 fn apply_expr(filter: Filter, expr: &Expr) -> (TableProviderFilterPushDown, Filter) {
115 match expr {
116 Expr::BinaryExpr(e) => match (&*e.left, e.op, &*e.right) {
117 (Expr::Column(left), op, right) => match (left.name.as_str(), op, right) {
118 (
119 "address",
120 Operator::Eq,
121 Expr::Literal(ScalarValue::Binary(Some(v)), None),
122 ) => (
123 TableProviderFilterPushDown::Exact,
124 filter.address(Address::from_slice(&v[..])),
125 ),
126 ("topic0", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
127 (
128 TableProviderFilterPushDown::Exact,
129 filter.event_signature(B256::from_slice(&v[..])),
130 )
131 }
132 ("topic1", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
133 (
134 TableProviderFilterPushDown::Exact,
135 filter.topic1(B256::from_slice(&v[..])),
136 )
137 }
138 ("topic2", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
139 (
140 TableProviderFilterPushDown::Exact,
141 filter.topic2(B256::from_slice(&v[..])),
142 )
143 }
144 ("topic3", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)), None)) => {
145 (
146 TableProviderFilterPushDown::Exact,
147 filter.topic3(B256::from_slice(&v[..])),
148 )
149 }
150 (
151 "block_number",
152 Operator::Eq,
153 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
154 ) => (
155 TableProviderFilterPushDown::Exact,
156 filter.union((*v).into(), (*v).into()),
157 ),
158 (
159 "block_number",
160 Operator::Gt,
161 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
162 ) => (
163 TableProviderFilterPushDown::Exact,
164 filter.union((*v + 1).into(), BlockNumberOrTag::Latest),
165 ),
166 (
167 "block_number",
168 Operator::GtEq,
169 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
170 ) => (
171 TableProviderFilterPushDown::Exact,
172 filter.union((*v).into(), BlockNumberOrTag::Latest),
173 ),
174 (
175 "block_number",
176 Operator::Lt,
177 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
178 ) => (
179 TableProviderFilterPushDown::Exact,
180 filter.union(BlockNumberOrTag::Earliest, (*v - 1).into()),
181 ),
182 (
183 "block_number",
184 Operator::LtEq,
185 Expr::Literal(ScalarValue::UInt64(Some(v)), None),
186 ) => (
187 TableProviderFilterPushDown::Exact,
188 filter.union(BlockNumberOrTag::Earliest, (*v).into()),
189 ),
190 _ => (TableProviderFilterPushDown::Unsupported, filter),
191 },
192 (Expr::BinaryExpr(_), Operator::Or, Expr::BinaryExpr(_)) => {
194 match Self::collect_or_group(e) {
195 (TableProviderFilterPushDown::Exact, Some(col), values) => {
196 match col.name.as_str() {
197 "address" => (
198 TableProviderFilterPushDown::Exact,
199 filter.address(
200 values
201 .into_iter()
202 .map(|v| Address::from_slice(&v[..]))
203 .collect::<Vec<_>>(),
204 ),
205 ),
206 "topic0" => (
207 TableProviderFilterPushDown::Exact,
208 filter.event_signature(
209 values
210 .into_iter()
211 .map(|v| B256::from_slice(&v[..]))
212 .collect::<Vec<_>>(),
213 ),
214 ),
215 "topic1" => (
216 TableProviderFilterPushDown::Exact,
217 filter.topic1(
218 values
219 .into_iter()
220 .map(|v| B256::from_slice(&v[..]))
221 .collect::<Vec<_>>(),
222 ),
223 ),
224 "topic2" => (
225 TableProviderFilterPushDown::Exact,
226 filter.topic2(
227 values
228 .into_iter()
229 .map(|v| B256::from_slice(&v[..]))
230 .collect::<Vec<_>>(),
231 ),
232 ),
233 "topic3" => (
234 TableProviderFilterPushDown::Exact,
235 filter.topic3(
236 values
237 .into_iter()
238 .map(|v| B256::from_slice(&v[..]))
239 .collect::<Vec<_>>(),
240 ),
241 ),
242 _ => (TableProviderFilterPushDown::Unsupported, filter),
243 }
244 }
245 _ => (TableProviderFilterPushDown::Unsupported, filter),
246 }
247 }
248 _ => (TableProviderFilterPushDown::Unsupported, filter),
249 },
250 _ => (TableProviderFilterPushDown::Unsupported, filter),
251 }
252 }
253
254 fn collect_or_group(
259 expr: &BinaryExpr,
260 ) -> (TableProviderFilterPushDown, Option<&Column>, Vec<&Vec<u8>>) {
261 match (&*expr.left, expr.op, &*expr.right) {
262 (
263 Expr::Column(col),
264 Operator::Eq,
265 Expr::Literal(ScalarValue::Binary(Some(val)), None),
266 ) => (TableProviderFilterPushDown::Exact, Some(col), vec![val]),
267 (Expr::BinaryExpr(left), Operator::Or, Expr::BinaryExpr(right)) => {
268 let mut left = Self::collect_or_group(left);
270 let mut right = Self::collect_or_group(right);
271 if left.0 == right.0
272 && right.0 == TableProviderFilterPushDown::Exact
273 && left.1 == right.1
274 {
275 left.2.append(&mut right.2);
276 (TableProviderFilterPushDown::Exact, left.1, left.2)
277 } else {
278 (TableProviderFilterPushDown::Unsupported, None, Vec::new())
279 }
280 }
281 _ => (TableProviderFilterPushDown::Unsupported, None, Vec::new()),
282 }
283 }
284}
285
286#[async_trait::async_trait]
287impl TableProvider for EthLogsTable {
288 fn as_any(&self) -> &dyn Any {
289 self
290 }
291
292 fn table_type(&self) -> TableType {
293 TableType::Base
294 }
295
296 fn schema(&self) -> SchemaRef {
297 self.schema.clone()
298 }
299
300 fn supports_filters_pushdown(
301 &self,
302 filters: &[&Expr],
303 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
304 tracing::debug!(?filters, "EthLogs: performing filter pushdown");
305
306 let mut filter = EthProviderConfig::default().default_filter();
307 let mut res = Vec::new();
308
309 for expr in filters {
310 let (support, new_filter) = Self::apply_expr(filter, expr);
311 filter = new_filter;
312 res.push(support);
313 }
314
315 tracing::debug!(?filter, "EthLogs: resulting filter");
316 Ok(res)
317 }
318
319 async fn scan(
320 &self,
321 state: &dyn Session,
322 mut projection: Option<&Vec<usize>>,
323 filters: &[Expr],
324 limit: Option<usize>,
325 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
326 let config = state
327 .config_options()
328 .extensions
329 .get::<EthProviderConfig>()
330 .cloned()
331 .unwrap_or_default();
332
333 tracing::debug!(
334 ?config,
335 ?projection,
336 ?filters,
337 ?limit,
338 "EthLogs: creating execution plan"
339 );
340
341 if let Some(proj) = projection {
343 let is_redundant = proj.len() == self.schema.fields.len()
344 && proj[0] == 0
345 && proj
346 .iter()
347 .cloned()
348 .reduce(|a, b| if a + 1 == b { b } else { a })
349 == Some(self.schema.fields.len() - 1);
350 if is_redundant {
351 projection = None
352 }
353 }
354
355 let schema = if let Some(proj) = projection {
356 Arc::new(self.schema.project(proj)?)
357 } else {
358 self.schema.clone()
359 };
360
361 let mut filter = config.default_filter();
363
364 for expr in filters {
366 let (support, new_filter) = Self::apply_expr(filter, expr);
367 assert_eq!(support, TableProviderFilterPushDown::Exact);
368 filter = new_filter;
369 }
370
371 Ok(Arc::new(EthGetLogs::new(
372 self.rpc_client.clone(),
373 schema,
374 projection.cloned(),
375 filter,
376 config.stream_options(),
377 limit,
378 )))
379 }
380}
381
/// Leaf execution plan that streams Ethereum logs from the RPC provider as
/// Arrow record batches.
#[derive(Debug, Clone)]
pub struct EthGetLogs {
    /// Schema of the emitted batches, i.e. after `projection` was applied.
    projected_schema: SchemaRef,
    /// Column indices applied to every decoded batch; `None` keeps all columns.
    projection: Option<Vec<usize>>,
    /// RPC client used to fetch the logs.
    rpc_client: DynProvider,
    /// Log filter handed to the paginated log stream.
    filter: Filter,
    /// Options handed to the paginated log stream.
    stream_options: StreamOptions,
    /// Maximum number of rows to emit; `None` means unlimited.
    limit: Option<usize>,
    /// Plan properties returned from `ExecutionPlan::properties`.
    properties: PlanProperties,
}
394
395impl EthGetLogs {
396 pub fn new(
397 rpc_client: DynProvider,
398 projected_schema: SchemaRef,
399 projection: Option<Vec<usize>>,
400 filter: Filter,
401 stream_options: StreamOptions,
402 limit: Option<usize>,
403 ) -> Self {
404 Self {
405 projected_schema: projected_schema.clone(),
406 projection,
407 rpc_client,
408 filter,
409 stream_options,
410 limit,
411 properties: PlanProperties::new(
412 EquivalenceProperties::new(projected_schema),
413 datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
414 datafusion::physical_plan::execution_plan::EmissionType::Incremental,
415 datafusion::physical_plan::execution_plan::Boundedness::Bounded,
417 )
418 .with_scheduling_type(
419 datafusion::physical_plan::execution_plan::SchedulingType::Cooperative,
423 ),
424 }
425 }
426
427 pub fn filter(&self) -> &Filter {
428 &self.filter
429 }
430
431 pub fn with_filter(mut self, filter: Filter) -> Self {
432 self.filter = filter;
433 self
434 }
435
436 fn execute_impl(
437 rpc_client: DynProvider,
438 filter: Filter,
439 options: StreamOptions,
440 projection: Option<Vec<usize>>,
441 limit: Option<usize>,
442 ) -> impl Stream<Item = DfResult<RecordBatch>> {
443 async_stream::try_stream! {
444 let limit = limit.unwrap_or(usize::MAX);
445 let mut coder = crate::convert::EthRawLogsToArrow::new();
446 let mut total = 0;
447
448 let mut log_stream = Box::pin(
450 crate::stream::RawLogsStream::paginate(rpc_client, filter, options, None)
451 );
452
453 while let Some(batch) = log_stream.try_next().await.map_err(|e| DataFusionError::External(e.into()))? {
454 if total >= limit {
455 break;
456 }
457 let to_append = usize::min(batch.logs.len(), limit - total);
458 coder.append(&batch.logs[0..to_append]).map_err(|e| DataFusionError::External(e.into()))?;
459 total += to_append;
460
461 if coder.len() > 1_000 {
463 let batch = if let Some(proj) = &projection {
464 coder.finish().project(proj)?
465 } else {
466 coder.finish()
467 };
468 yield batch;
469 }
470 }
471
472 if !coder.is_empty() || total == 0 {
474 let batch = if let Some(proj) = &projection {
475 coder.finish().project(proj)?
476 } else {
477 coder.finish()
478 };
479 yield batch;
480 }
481 }
482 }
483}
484
485impl ExecutionPlan for EthGetLogs {
486 fn name(&self) -> &str {
487 Self::static_name()
488 }
489
490 fn as_any(&self) -> &dyn Any {
491 self
492 }
493
494 fn properties(&self) -> &PlanProperties {
495 &self.properties
496 }
497
498 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
499 vec![]
501 }
502
503 fn with_new_children(
504 self: Arc<Self>,
505 _: Vec<Arc<dyn ExecutionPlan>>,
506 ) -> DfResult<Arc<dyn ExecutionPlan>> {
507 Ok(self)
508 }
509
510 fn execute(
511 &self,
512 partition: usize,
513 _context: Arc<TaskContext>,
514 ) -> DfResult<datafusion::execution::SendableRecordBatchStream> {
515 assert_eq!(partition, 0);
516
517 let stream = Self::execute_impl(
518 self.rpc_client.clone(),
519 self.filter.clone(),
520 self.stream_options.clone(),
521 self.projection.clone(),
522 self.limit,
523 );
524
525 let record_batch_stream =
526 RecordBatchStreamAdapter::new(self.projected_schema.clone(), stream);
527
528 Ok(Box::pin(record_batch_stream))
529 }
530}
531
532impl DisplayAs for EthGetLogs {
533 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
534 match t {
535 DisplayFormatType::Default | DisplayFormatType::Verbose => {
536 write!(f, "EthGetLogs: ")?;
537 if self.projection.is_none() {
538 write!(f, "projection=[*]")?;
539 } else {
540 let projection_str = self
541 .projected_schema
542 .fields
543 .iter()
544 .map(|f| f.name().as_str())
545 .collect::<Vec<_>>()
546 .join(", ");
547
548 write!(f, "projection=[{projection_str}]")?;
549 }
550
551 let mut filters = Vec::new();
552
553 match self.filter.block_option {
554 FilterBlockOption::Range {
555 from_block,
556 to_block,
557 } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
558 FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
559 }
560
561 if !self.filter.address.is_empty() {
562 let mut addrs: Vec<_> =
564 self.filter.address.iter().map(|h| h.to_string()).collect();
565 addrs.sort();
566 filters.push(format!("address=[{}]", addrs.join(", ")));
567 }
568
569 for (i, t) in self
570 .filter
571 .topics
572 .iter()
573 .enumerate()
574 .filter(|(_, t)| !t.is_empty())
575 {
576 let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
578 topics.sort();
579 filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
580 }
581
582 write!(f, ", filter=[{}]", filters.join(", "))?;
583
584 if let Some(limit) = self.limit {
585 write!(f, ", limit={limit}")?;
586 }
587
588 Ok(())
589 }
590 DisplayFormatType::TreeRender => {
591 writeln!(f, "EthGetLogs")?;
592
593 if self.projection.is_none() {
594 writeln!(f, "projection=[*]")?;
595 } else {
596 let projection_str = self
597 .projected_schema
598 .fields
599 .iter()
600 .map(|f| f.name().as_str())
601 .collect::<Vec<_>>()
602 .join(", ");
603
604 writeln!(f, "projection=[{projection_str}]")?;
605 }
606
607 let mut filters = Vec::new();
608
609 match self.filter.block_option {
610 FilterBlockOption::Range {
611 from_block,
612 to_block,
613 } => filters.push(format!("block_number=[{from_block:?}, {to_block:?}]")),
614 FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={h}")),
615 }
616
617 if !self.filter.address.is_empty() {
618 let mut addrs: Vec<_> =
620 self.filter.address.iter().map(|h| h.to_string()).collect();
621 addrs.sort();
622 filters.push(format!("address=[{}]", addrs.join(", ")));
623 }
624
625 for (i, t) in self
626 .filter
627 .topics
628 .iter()
629 .enumerate()
630 .filter(|(_, t)| !t.is_empty())
631 {
632 let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
634 topics.sort();
635 filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
636 }
637
638 writeln!(f, "filter=[{}]", filters.join(", "))?;
639
640 if let Some(limit) = self.limit {
641 writeln!(f, "limit={limit}")?;
642 }
643
644 Ok(())
645 }
646 }
647 }
648}
649
650