// ethl_cli/commands/mod.rs

1use alloy::{json_abi::Event, primitives::Address, rpc::types::Filter};
2use anyhow::Result;
3use clap::{Parser, Subcommand, arg};
4use ethl::rpc::config::{ProviderOptions, ProviderSettings};
5
6pub mod cat;
7pub mod extract;
8pub mod merge;
9pub mod repair;
10
11#[derive(Debug, Subcommand)]
12pub enum Commands {
13    /// Extract a set of events from RPC nodes to parquet files to a given path (file://, s3://, etc)
14    #[command(arg_required_else_help = true)]
15    Extract(extract::ExtractArgs),
16
17    /// Stream latest logs (or parsed events) to console
18    #[command(arg_required_else_help = false)]
19    Cat(cat::CatArgs),
20
21    /// Merge event parquet files into larger files with a minimum size
22    #[command(arg_required_else_help = true)]
23    Merge(merge::MergeArgs),
24
25    /// Repair event parquet files by removing orphaned files that are fully covered by other files
26    #[command(arg_required_else_help = true)]
27    Repair(repair::RepairArgs),
28}
29
30/// Common provider configuration for all commands
31#[derive(Debug, Parser)]
32pub struct ProviderArgs {
33    /// The chain ID to use for auto configuring providers (1=mainnet, 5=goerli, etc)
34    /// If not specified, defaults to Base mainnet (1)
35    #[arg(long, env, global = true)]
36    chain_id: Option<u64>,
37
38    /// Ankr API Key - Auto configures Ankr RPC and WSS endpoints for supported chain
39    #[arg(long, env, short, global = true)]
40    ankr_api_key: Option<String>,
41
42    /// Infura API Key - Auto configures Infura RPC and WSS endpoints for supported chain
43    #[arg(long, env, global = true)]
44    infura_api_key: Option<String>,
45
46    /// Quicknode API Key - Auto configures Quicknode RPC and WSS endpoints for supported chain
47    #[arg(long, env, global = true)]
48    quicknode_api_key: Option<String>,
49
50    /// Alchemy API Key - Auto configures Alchemy RPC and WSS endpoints for supported chain
51    #[arg(long, env, global = true)]
52    alchemy_api_key: Option<String>,
53
54    /// Comma separated list of custom HTTP RPC URLs
55    #[arg(long, env, global = true)]
56    rpc_url: Option<Vec<String>>,
57
58    /// Comma separated list of custom websocket URLs
59    #[arg(long, env, global = true)]
60    ws_url: Option<Vec<String>>,
61}
62
63impl TryFrom<ProviderArgs> for ProviderSettings {
64    type Error = anyhow::Error;
65    fn try_from(args: ProviderArgs) -> Result<Self, Self::Error> {
66        ProviderSettings::build(
67            ProviderOptions {
68                ankr_api_key: args.ankr_api_key,
69                infura_api_key: args.infura_api_key,
70                quicknode_api_key: args.quicknode_api_key,
71                alchemy_api_key: args.alchemy_api_key,
72                rpc_urls: args.rpc_url,
73                ws_urls: args.ws_url,
74            },
75            args.chain_id.unwrap_or(1), // Default to Base mainnet
76        )
77    }
78}
79
80#[derive(Debug, Parser)]
81pub struct FilterArgs {
82    /// Full event signatures to filter and parse against (e.g. "event Transfer(address from,address to,uint256 amount)")
83    #[arg(long, required = false)]
84    events: Option<Vec<String>>,
85
86    /// Comma separated list of contract addresses to filter logs by
87    #[arg(long, short, required = false, value_delimiter = ',')]
88    addresses: Option<Vec<Address>>,
89
90    /// Optional end block for the archive (default: latest)
91    #[arg(long)]
92    to_block: Option<u64>,
93
94    /// Optional start block for the archive (default: 0)
95    #[arg(long)]
96    from_block: Option<u64>,
97}
98
99impl FilterArgs {
100    pub fn parsed_events(&self) -> Option<Result<Vec<Event>>> {
101        self.events.as_ref().map(|e| full_signatures_to_events(e))
102    }
103}
104
105impl TryInto<Option<Filter>> for &FilterArgs {
106    type Error = anyhow::Error;
107
108    fn try_into(self) -> Result<Option<Filter>> {
109        if self.addresses.is_none() && self.events.is_none() {
110            return Ok(None);
111        }
112
113        let mut filter = Filter::new();
114        if let Some(addresses) = &self.addresses {
115            filter = filter.address(addresses.clone());
116        }
117
118        let events: Option<Vec<Event>> = self.try_into()?;
119
120        if let Some(events) = events {
121            filter = filter.events(
122                events
123                    .iter()
124                    .map(|e| e.signature())
125                    .collect::<Vec<String>>(),
126            );
127        }
128
129        if let Some(from_block) = self.from_block {
130            filter = filter.from_block(from_block);
131        }
132
133        if let Some(to_block) = self.to_block {
134            filter = filter.to_block(to_block);
135        }
136
137        Ok(Some(filter))
138    }
139}
140
141impl TryInto<Option<Vec<Event>>> for &FilterArgs {
142    type Error = anyhow::Error;
143    fn try_into(self) -> Result<Option<Vec<Event>>> {
144        self.events
145            .as_ref()
146            .map(|e| full_signatures_to_events(e))
147            .transpose()
148    }
149}
150
151fn full_signatures_to_events(signatures: &[String]) -> Result<Vec<Event>> {
152    signatures
153        .iter()
154        .map(|s| Event::parse(s).map_err(anyhow::Error::from))
155        .collect::<Result<Vec<Event>>>()
156}