1use alloy::primitives::{Address, B256};
2use alloy::providers::DynProvider;
3use alloy::rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption};
4use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
5use datafusion::error::{DataFusionError, Result as DfResult};
6use datafusion::execution::TaskContext;
7use datafusion::logical_expr::BinaryExpr;
8use datafusion::physical_expr::EquivalenceProperties;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
11use datafusion::{
12 arrow::{array::RecordBatch, datatypes::SchemaRef},
13 datasource::{TableProvider, TableType},
14 logical_expr::{Operator, TableProviderFilterPushDown},
15 physical_plan::ExecutionPlan,
16 prelude::*,
17 scalar::ScalarValue,
18};
19use futures::{Stream, TryStreamExt};
20use std::{any::Any, sync::Arc};
21
22use crate::config::EthProviderConfig;
23use crate::convert::Transcoder as _;
24use crate::stream::StreamOptions;
25use crate::utils::*;
26
/// DataFusion catalog backed by an Ethereum RPC provider.
///
/// The catalog exposes a single schema named `eth`; every schema handed out
/// receives its own clone of the RPC client.
#[derive(Debug)]
pub struct EthCatalog {
    /// RPC client cloned into each schema created by this catalog.
    rpc_client: DynProvider,
}
35
36impl EthCatalog {
37 pub fn new(rpc_client: DynProvider) -> Self {
38 Self { rpc_client }
39 }
40}
41
42impl CatalogProvider for EthCatalog {
43 fn as_any(&self) -> &dyn Any {
44 self
45 }
46
47 fn schema_names(&self) -> Vec<String> {
48 vec!["eth".to_string()]
49 }
50
51 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
52 match name {
53 "eth" => Some(Arc::new(EthSchema::new(self.rpc_client.clone()))),
54 _ => None,
55 }
56 }
57}
58
/// DataFusion schema backed by an Ethereum RPC provider.
///
/// The schema exposes a single table named `logs`.
#[derive(Debug)]
pub struct EthSchema {
    /// RPC client cloned into each table created by this schema.
    rpc_client: DynProvider,
}
67
68impl EthSchema {
69 pub fn new(rpc_client: DynProvider) -> Self {
70 Self { rpc_client }
71 }
72}
73
74#[async_trait::async_trait]
75impl SchemaProvider for EthSchema {
76 fn as_any(&self) -> &dyn Any {
77 self
78 }
79
80 fn table_names(&self) -> Vec<String> {
81 vec!["logs".to_string()]
82 }
83
84 async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
85 match name {
86 "logs" => Ok(Some(Arc::new(EthLogsTable::new(self.rpc_client.clone())))),
87 _ => Ok(None),
88 }
89 }
90
91 fn table_exist(&self, name: &str) -> bool {
92 name == "logs"
93 }
94}
95
/// DataFusion table exposing Ethereum logs fetched over RPC.
///
/// Supported predicates on `address`, `topic0`..`topic3` and `block_number`
/// are translated into an RPC log [`Filter`] instead of being evaluated by
/// DataFusion after the scan.
#[derive(Debug)]
pub struct EthLogsTable {
    /// Full (unprojected) Arrow schema of the table.
    schema: SchemaRef,
    /// RPC client cloned into each execution plan produced by `scan`.
    rpc_client: DynProvider,
}
105
106impl EthLogsTable {
107 pub fn new(rpc_client: DynProvider) -> Self {
108 Self {
109 schema: crate::convert::EthRawLogsToArrow::new().schema(),
110 rpc_client,
111 }
112 }
113
114 fn apply_expr(filter: Filter, expr: &Expr) -> (TableProviderFilterPushDown, Filter) {
115 match expr {
116 Expr::BinaryExpr(e) => match (&*e.left, e.op, &*e.right) {
117 (Expr::Column(left), op, right) => match (left.name.as_str(), op, right) {
118 ("address", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
119 TableProviderFilterPushDown::Exact,
120 filter.address(Address::from_slice(&v[..])),
121 ),
122 ("topic0", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
123 TableProviderFilterPushDown::Exact,
124 filter.event_signature(B256::from_slice(&v[..])),
125 ),
126 ("topic1", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
127 TableProviderFilterPushDown::Exact,
128 filter.topic1(B256::from_slice(&v[..])),
129 ),
130 ("topic2", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
131 TableProviderFilterPushDown::Exact,
132 filter.topic2(B256::from_slice(&v[..])),
133 ),
134 ("topic3", Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(v)))) => (
135 TableProviderFilterPushDown::Exact,
136 filter.topic3(B256::from_slice(&v[..])),
137 ),
138 ("block_number", Operator::Eq, Expr::Literal(ScalarValue::UInt64(Some(v)))) => {
139 (
140 TableProviderFilterPushDown::Exact,
141 filter.union((*v).into(), (*v).into()),
142 )
143 }
144 ("block_number", Operator::Gt, Expr::Literal(ScalarValue::UInt64(Some(v)))) => {
145 (
146 TableProviderFilterPushDown::Exact,
147 filter.union((*v + 1).into(), BlockNumberOrTag::Latest),
148 )
149 }
150 (
151 "block_number",
152 Operator::GtEq,
153 Expr::Literal(ScalarValue::UInt64(Some(v))),
154 ) => (
155 TableProviderFilterPushDown::Exact,
156 filter.union((*v).into(), BlockNumberOrTag::Latest),
157 ),
158 ("block_number", Operator::Lt, Expr::Literal(ScalarValue::UInt64(Some(v)))) => {
159 (
160 TableProviderFilterPushDown::Exact,
161 filter.union(BlockNumberOrTag::Earliest, (*v - 1).into()),
162 )
163 }
164 (
165 "block_number",
166 Operator::LtEq,
167 Expr::Literal(ScalarValue::UInt64(Some(v))),
168 ) => (
169 TableProviderFilterPushDown::Exact,
170 filter.union(BlockNumberOrTag::Earliest, (*v).into()),
171 ),
172 _ => (TableProviderFilterPushDown::Unsupported, filter),
173 },
174 (Expr::BinaryExpr(_), Operator::Or, Expr::BinaryExpr(_)) => {
176 match Self::collect_or_group(e) {
177 (TableProviderFilterPushDown::Exact, Some(col), values) => {
178 match col.name.as_str() {
179 "address" => (
180 TableProviderFilterPushDown::Exact,
181 filter.address(
182 values
183 .into_iter()
184 .map(|v| Address::from_slice(&v[..]))
185 .collect::<Vec<_>>(),
186 ),
187 ),
188 "topic0" => (
189 TableProviderFilterPushDown::Exact,
190 filter.event_signature(
191 values
192 .into_iter()
193 .map(|v| B256::from_slice(&v[..]))
194 .collect::<Vec<_>>(),
195 ),
196 ),
197 "topic1" => (
198 TableProviderFilterPushDown::Exact,
199 filter.topic1(
200 values
201 .into_iter()
202 .map(|v| B256::from_slice(&v[..]))
203 .collect::<Vec<_>>(),
204 ),
205 ),
206 "topic2" => (
207 TableProviderFilterPushDown::Exact,
208 filter.topic2(
209 values
210 .into_iter()
211 .map(|v| B256::from_slice(&v[..]))
212 .collect::<Vec<_>>(),
213 ),
214 ),
215 "topic3" => (
216 TableProviderFilterPushDown::Exact,
217 filter.topic3(
218 values
219 .into_iter()
220 .map(|v| B256::from_slice(&v[..]))
221 .collect::<Vec<_>>(),
222 ),
223 ),
224 _ => (TableProviderFilterPushDown::Unsupported, filter),
225 }
226 }
227 _ => (TableProviderFilterPushDown::Unsupported, filter),
228 }
229 }
230 _ => (TableProviderFilterPushDown::Unsupported, filter),
231 },
232 _ => (TableProviderFilterPushDown::Unsupported, filter),
233 }
234 }
235
236 fn collect_or_group(
241 expr: &BinaryExpr,
242 ) -> (TableProviderFilterPushDown, Option<&Column>, Vec<&Vec<u8>>) {
243 match (&*expr.left, expr.op, &*expr.right) {
244 (Expr::Column(col), Operator::Eq, Expr::Literal(ScalarValue::Binary(Some(val)))) => {
245 (TableProviderFilterPushDown::Exact, Some(col), vec![val])
246 }
247 (Expr::BinaryExpr(left), Operator::Or, Expr::BinaryExpr(right)) => {
248 let mut left = Self::collect_or_group(left);
250 let mut right = Self::collect_or_group(right);
251 if left.0 == right.0
252 && right.0 == TableProviderFilterPushDown::Exact
253 && left.1 == right.1
254 {
255 left.2.append(&mut right.2);
256 (TableProviderFilterPushDown::Exact, left.1, left.2)
257 } else {
258 (TableProviderFilterPushDown::Unsupported, None, Vec::new())
259 }
260 }
261 _ => (TableProviderFilterPushDown::Unsupported, None, Vec::new()),
262 }
263 }
264}
265
266#[async_trait::async_trait]
267impl TableProvider for EthLogsTable {
268 fn as_any(&self) -> &dyn Any {
269 self
270 }
271
272 fn table_type(&self) -> TableType {
273 TableType::Base
274 }
275
276 fn schema(&self) -> SchemaRef {
277 self.schema.clone()
278 }
279
280 fn supports_filters_pushdown(
281 &self,
282 filters: &[&Expr],
283 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
284 tracing::debug!(?filters, "EthLogs: performing filter pushdown");
285
286 let mut filter = EthProviderConfig::default().default_filter();
287 let mut res = Vec::new();
288
289 for expr in filters {
290 let (support, new_filter) = Self::apply_expr(filter, expr);
291 filter = new_filter;
292 res.push(support);
293 }
294
295 tracing::debug!(?filter, "EthLogs: resulting filter");
296 Ok(res)
297 }
298
299 async fn scan(
300 &self,
301 state: &dyn Session,
302 mut projection: Option<&Vec<usize>>,
303 filters: &[Expr],
304 limit: Option<usize>,
305 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
306 let config = state
307 .config_options()
308 .extensions
309 .get::<EthProviderConfig>()
310 .cloned()
311 .unwrap_or_default();
312
313 tracing::debug!(
314 ?config,
315 ?projection,
316 ?filters,
317 ?limit,
318 "EthLogs: creating execution plan"
319 );
320
321 if let Some(proj) = projection {
323 let is_redundant = proj.len() == self.schema.fields.len()
324 && proj[0] == 0
325 && proj
326 .iter()
327 .cloned()
328 .reduce(|a, b| if a + 1 == b { b } else { a })
329 == Some(self.schema.fields.len() - 1);
330 if is_redundant {
331 projection = None
332 }
333 }
334
335 let schema = if let Some(proj) = projection {
336 Arc::new(self.schema.project(proj)?)
337 } else {
338 self.schema.clone()
339 };
340
341 let mut filter = config.default_filter();
343
344 for expr in filters {
346 let (support, new_filter) = Self::apply_expr(filter, expr);
347 assert_eq!(support, TableProviderFilterPushDown::Exact);
348 filter = new_filter;
349 }
350
351 Ok(Arc::new(EthGetLogs::new(
352 self.rpc_client.clone(),
353 schema,
354 projection.cloned(),
355 filter,
356 config.stream_options(),
357 limit,
358 )))
359 }
360}
361
/// Leaf execution plan that streams Ethereum logs from an RPC provider and
/// converts them into Arrow record batches.
#[derive(Debug, Clone)]
pub struct EthGetLogs {
    /// Schema of the emitted batches (after applying `projection`).
    projected_schema: SchemaRef,
    /// Column indices to keep, or `None` to emit every column.
    projection: Option<Vec<usize>>,
    /// RPC client used to fetch the logs.
    rpc_client: DynProvider,
    /// Log filter passed to the paginated log stream.
    filter: Filter,
    /// Options forwarded to the paginated log stream.
    stream_options: StreamOptions,
    /// Maximum number of rows to emit, if any.
    limit: Option<usize>,
    /// Cached plan properties returned from `ExecutionPlan::properties`.
    properties: PlanProperties,
}
374
375impl EthGetLogs {
376 pub fn new(
377 rpc_client: DynProvider,
378 projected_schema: SchemaRef,
379 projection: Option<Vec<usize>>,
380 filter: Filter,
381 stream_options: StreamOptions,
382 limit: Option<usize>,
383 ) -> Self {
384 Self {
385 projected_schema: projected_schema.clone(),
386 projection,
387 rpc_client,
388 filter,
389 stream_options,
390 limit,
391 properties: PlanProperties::new(
392 EquivalenceProperties::new(projected_schema),
393 datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
394 datafusion::physical_plan::execution_plan::EmissionType::Incremental,
395 datafusion::physical_plan::execution_plan::Boundedness::Bounded,
397 ),
398 }
399 }
400
401 pub fn filter(&self) -> &Filter {
402 &self.filter
403 }
404
405 pub fn with_filter(mut self, filter: Filter) -> Self {
406 self.filter = filter;
407 self
408 }
409
410 fn execute_impl(
411 rpc_client: DynProvider,
412 filter: Filter,
413 options: StreamOptions,
414 projection: Option<Vec<usize>>,
415 limit: Option<usize>,
416 ) -> impl Stream<Item = DfResult<RecordBatch>> {
417 async_stream::try_stream! {
418 let limit = limit.unwrap_or(usize::MAX);
419 let mut coder = crate::convert::EthRawLogsToArrow::new();
420 let mut total = 0;
421
422 let mut log_stream = Box::pin(
424 crate::stream::RawLogsStream::paginate(rpc_client, filter, options, None)
425 );
426
427 while let Some(batch) = log_stream.try_next().await.map_err(|e| DataFusionError::External(e.into()))? {
428 if total >= limit {
429 break;
430 }
431 let to_append = usize::min(batch.logs.len(), limit - total);
432 coder.append(&batch.logs[0..to_append]).map_err(|e| DataFusionError::External(e.into()))?;
433 total += to_append;
434
435 if coder.len() > 1_000 {
437 let batch = if let Some(proj) = &projection {
438 coder.finish().project(proj)?
439 } else {
440 coder.finish()
441 };
442 yield batch;
443 }
444 }
445
446 if !coder.is_empty() || total == 0 {
448 let batch = if let Some(proj) = &projection {
449 coder.finish().project(proj)?
450 } else {
451 coder.finish()
452 };
453 yield batch;
454 }
455 }
456 }
457}
458
459impl ExecutionPlan for EthGetLogs {
460 fn name(&self) -> &str {
461 Self::static_name()
462 }
463
464 fn as_any(&self) -> &dyn Any {
465 self
466 }
467
468 fn properties(&self) -> &PlanProperties {
469 &self.properties
470 }
471
472 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
473 vec![]
475 }
476
477 fn with_new_children(
478 self: Arc<Self>,
479 _: Vec<Arc<dyn ExecutionPlan>>,
480 ) -> DfResult<Arc<dyn ExecutionPlan>> {
481 Ok(self)
482 }
483
484 fn execute(
485 &self,
486 partition: usize,
487 _context: Arc<TaskContext>,
488 ) -> DfResult<datafusion::execution::SendableRecordBatchStream> {
489 assert_eq!(partition, 0);
490
491 let stream = Self::execute_impl(
492 self.rpc_client.clone(),
493 self.filter.clone(),
494 self.stream_options.clone(),
495 self.projection.clone(),
496 self.limit,
497 );
498 Ok(Box::pin(RecordBatchStreamAdapter::new(
499 self.projected_schema.clone(),
500 stream,
501 )))
502 }
503}
504
505impl DisplayAs for EthGetLogs {
506 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
507 match t {
508 DisplayFormatType::Default | DisplayFormatType::Verbose => {
509 write!(f, "EthGetLogs: ")?;
510 if self.projection.is_none() {
511 write!(f, "projection=[*]")?;
512 } else {
513 let projection_str = self
514 .projected_schema
515 .fields
516 .iter()
517 .map(|f| f.name().as_str())
518 .collect::<Vec<_>>()
519 .join(", ");
520
521 write!(f, "projection=[{}]", projection_str)?;
522 }
523
524 let mut filters = Vec::new();
525
526 match self.filter.block_option {
527 FilterBlockOption::Range {
528 from_block,
529 to_block,
530 } => filters.push(format!("block_number=[{:?}, {:?}]", from_block, to_block)),
531 FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={}", h)),
532 }
533
534 if !self.filter.address.is_empty() {
535 let mut addrs: Vec<_> =
537 self.filter.address.iter().map(|h| h.to_string()).collect();
538 addrs.sort();
539 filters.push(format!("address=[{}]", addrs.join(", ")));
540 }
541
542 for (i, t) in self
543 .filter
544 .topics
545 .iter()
546 .enumerate()
547 .filter(|(_, t)| !t.is_empty())
548 {
549 let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
551 topics.sort();
552 filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
553 }
554
555 write!(f, ", filter=[{}]", filters.join(", "))?;
556
557 if let Some(limit) = self.limit {
558 write!(f, ", limit={}", limit)?;
559 }
560
561 Ok(())
562 }
563 DisplayFormatType::TreeRender => {
564 writeln!(f, "EthGetLogs")?;
565
566 if self.projection.is_none() {
567 writeln!(f, "projection=[*]")?;
568 } else {
569 let projection_str = self
570 .projected_schema
571 .fields
572 .iter()
573 .map(|f| f.name().as_str())
574 .collect::<Vec<_>>()
575 .join(", ");
576
577 writeln!(f, "projection=[{}]", projection_str)?;
578 }
579
580 let mut filters = Vec::new();
581
582 match self.filter.block_option {
583 FilterBlockOption::Range {
584 from_block,
585 to_block,
586 } => filters.push(format!("block_number=[{:?}, {:?}]", from_block, to_block)),
587 FilterBlockOption::AtBlockHash(h) => filters.push(format!("block_hash={}", h)),
588 }
589
590 if !self.filter.address.is_empty() {
591 let mut addrs: Vec<_> =
593 self.filter.address.iter().map(|h| h.to_string()).collect();
594 addrs.sort();
595 filters.push(format!("address=[{}]", addrs.join(", ")));
596 }
597
598 for (i, t) in self
599 .filter
600 .topics
601 .iter()
602 .enumerate()
603 .filter(|(_, t)| !t.is_empty())
604 {
605 let mut topics: Vec<_> = t.iter().map(|h| h.to_string()).collect();
607 topics.sort();
608 filters.push(format!("topic{}=[{}]", i, topics.join(", ")));
609 }
610
611 writeln!(f, "filter=[{}]", filters.join(", "))?;
612
613 if let Some(limit) = self.limit {
614 writeln!(f, "limit={}", limit)?;
615 }
616
617 Ok(())
618 }
619 }
620 }
621}
622
623