ethl_cli/commands/
extract.rs

1use alloy::{json_abi::Event, rpc::types::Filter};
2use anyhow::Result;
3use clap::{Parser, arg};
4use ethl::{archive::extract::EventExtractor, rpc::config::ProviderSettings};
5use tracing::debug;
6
7use crate::commands::FilterArgs;
8
// Command-line arguments accepted by the `extract` command.
//
// NOTE: plain `//` comments are used for these notes on purpose. clap's derive
// macros turn `///` doc comments into user-visible help text, so the existing
// `///` lines below are part of the CLI output and are kept exactly as written.
#[derive(Parser, Debug)]
pub struct ExtractArgs {
    /// Filter arguments for scoping log queries
    // Flattened: the fields of `FilterArgs` are parsed as arguments of this
    // command. `extract_events` converts them into both the event list and
    // the log filter.
    #[command(flatten)]
    filter_args: FilterArgs,

    /// The output directory for the archived logs (eg: file:///tmp/events or s3://my-bucket/events)
    // Handed unchanged to `EventExtractor::new` as its first argument.
    #[arg(long, required = true)]
    output: String,

    /// The target number of events per parquet file (default: 200,000)
    // Handed unchanged to `EventExtractor::new`; defaults to 200_000 when the
    // flag is omitted.
    #[arg(long, default_value_t = 200_000)]
    target_events_per_file: usize,
}
23
24pub async fn extract_events(providers: &ProviderSettings, args: ExtractArgs) -> Result<()> {
25    debug!("Extracting events: {:?} - {:?}", args, providers);
26
27    let events: Option<Vec<Event>> = (&args.filter_args).try_into()?;
28    let filter: Option<Filter> = (&args.filter_args).try_into()?;
29
30    if events.is_none() {
31        return Err(anyhow::anyhow!(
32            "At least one valid --event must be specified for the extract command"
33        ));
34    }
35
36    let mut indexer = EventExtractor::new(
37        args.output,
38        args.target_events_per_file,
39        providers.clone(),
40        events.unwrap(),
41        filter.unwrap(),
42    )?;
43
44    indexer.extract().await?;
45
46    Ok(())
47}