// Source: ethl_cli/commands/mod.rs

use alloy::{json_abi::Event, primitives::Address, rpc::types::Filter};
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use ethl::rpc::config::{ProviderOptions, ProviderSettings};
5
6pub mod cat;
7pub mod extract;
8pub mod merge;
9pub mod repair;
10
11#[derive(Debug, Subcommand)]
12pub enum Commands {
13    /// Extract a set of events from RPC nodes to parquet files to a given path (file://, s3://, etc)
14    #[command(arg_required_else_help = true)]
15    Extract(extract::ExtractArgs),
16
17    /// Stream latest logs (or parsed events) to console
18    #[command(arg_required_else_help = false)]
19    Cat(cat::CatArgs),
20
21    /// Merge event parquet files into larger files with a minimum size
22    #[command(arg_required_else_help = true)]
23    Merge(merge::MergeArgs),
24
25    /// Repair event parquet files by removing orphaned files that are fully covered by other files
26    #[command(arg_required_else_help = true)]
27    Repair(repair::RepairArgs),
28}
29
30/// Common provider configuration for all commands
31#[derive(Debug, Parser)]
32pub struct ProviderArgs {
33    /// Chain ID used when auto-configuring provider URLs.
34    /// Supported: 1 (Ethereum), 42161 (Arbitrum), 8453 (Base), 10 (Optimism), 137 (Polygon), 56 (BNB Smart Chain)
35    /// If not specified, defaults to 1 (Ethereum mainnet)
36    #[arg(long, env, global = true)]
37    chain_id: Option<u64>,
38
39    /// Ankr API Key - Auto configures Ankr RPC and WSS endpoints for supported chain
40    #[arg(long, env, short, global = true)]
41    ankr_api_key: Option<String>,
42
43    /// Infura API Key - Auto configures Infura RPC and WSS endpoints for supported chain
44    #[arg(long, env, global = true)]
45    infura_api_key: Option<String>,
46
47    /// Quicknode API Key - Auto configures Quicknode RPC and WSS endpoints for supported chain
48    #[arg(long, env, global = true)]
49    quicknode_api_key: Option<String>,
50
51    /// Alchemy API Key - Auto configures Alchemy RPC and WSS endpoints for supported chain
52    #[arg(long, env, global = true)]
53    alchemy_api_key: Option<String>,
54
55    /// Comma separated list of custom HTTP RPC URLs
56    #[arg(long, env, global = true)]
57    rpc_url: Option<Vec<String>>,
58
59    /// Comma separated list of custom websocket URLs
60    #[arg(long, env, global = true)]
61    ws_url: Option<Vec<String>>,
62}
63
64impl TryFrom<ProviderArgs> for ProviderSettings {
65    type Error = anyhow::Error;
66    fn try_from(args: ProviderArgs) -> Result<Self, Self::Error> {
67        ProviderSettings::build(
68            ProviderOptions {
69                ankr_api_key: args.ankr_api_key,
70                infura_api_key: args.infura_api_key,
71                quicknode_api_key: args.quicknode_api_key,
72                alchemy_api_key: args.alchemy_api_key,
73                rpc_urls: args.rpc_url,
74                ws_urls: args.ws_url,
75            },
76            args.chain_id.unwrap_or(1), // Default to Ethereum mainnet
77        )
78    }
79}
80
81#[derive(Debug, Parser)]
82pub struct FilterArgs {
83    /// Full event signatures to filter and parse against (e.g. "event Transfer(address from,address to,uint256 amount)")
84    #[arg(long, required = false)]
85    events: Option<Vec<String>>,
86
87    /// Comma separated list of contract addresses to filter logs by
88    #[arg(long, short, required = false, value_delimiter = ',')]
89    addresses: Option<Vec<Address>>,
90
91    /// Optional end block for the archive (default: latest)
92    #[arg(long)]
93    to_block: Option<u64>,
94
95    /// Optional start block for the archive (default: 0)
96    #[arg(long)]
97    from_block: Option<u64>,
98}
99
100impl FilterArgs {
101    pub fn parsed_events(&self) -> Option<Result<Vec<Event>>> {
102        self.events.as_ref().map(|e| full_signatures_to_events(e))
103    }
104}
105
106impl TryInto<Option<Filter>> for &FilterArgs {
107    type Error = anyhow::Error;
108
109    fn try_into(self) -> Result<Option<Filter>> {
110        if self.addresses.is_none() && self.events.is_none() {
111            return Ok(None);
112        }
113
114        let mut filter = Filter::new();
115        if let Some(addresses) = &self.addresses {
116            filter = filter.address(addresses.clone());
117        }
118
119        let events: Option<Vec<Event>> = self.try_into()?;
120
121        if let Some(events) = events {
122            filter = filter.events(
123                events
124                    .iter()
125                    .map(|e| e.signature())
126                    .collect::<Vec<String>>(),
127            );
128        }
129
130        if let Some(from_block) = self.from_block {
131            filter = filter.from_block(from_block);
132        }
133
134        if let Some(to_block) = self.to_block {
135            filter = filter.to_block(to_block);
136        }
137
138        Ok(Some(filter))
139    }
140}
141
142impl TryInto<Option<Vec<Event>>> for &FilterArgs {
143    type Error = anyhow::Error;
144    fn try_into(self) -> Result<Option<Vec<Event>>> {
145        self.events
146            .as_ref()
147            .map(|e| full_signatures_to_events(e))
148            .transpose()
149    }
150}
151
152fn full_signatures_to_events(signatures: &[String]) -> Result<Vec<Event>> {
153    signatures
154        .iter()
155        .map(|s| Event::parse(s).map_err(anyhow::Error::from))
156        .collect::<Result<Vec<Event>>>()
157}