Skip to main content

ethl_cli/commands/extract.rs

1use std::sync::Arc;
2
3use alloy::{json_abi::Event, rpc::types::Filter};
4use anyhow::Result;
5use clap::Parser;
6use ethl::{
7    archive::extract::EventExtractor, rpc::config::ProviderSettings,
8    storage::store::parse_store_uri,
9};
10use tracing::debug;
11
12use crate::commands::FilterArgs;
13
14#[derive(Parser, Debug)]
15pub struct ExtractArgs {
16    /// Filter arguments for scoping log queries
17    #[command(flatten)]
18    filter_args: FilterArgs,
19
20    /// The output directory for the archived logs (eg: file:///tmp/events or s3://my-bucket/events)
21    #[arg(long, required = true)]
22    output: String,
23
24    /// The target number of events per parquet file (default: 200,000)
25    #[arg(long, default_value_t = 200_000)]
26    target_events_per_file: usize,
27}
28
29pub async fn extract_events(providers: &ProviderSettings, args: ExtractArgs) -> Result<()> {
30    debug!("Extracting events: {:?} - {:?}", args, providers);
31
32    let events: Option<Vec<Event>> = (&args.filter_args).try_into()?;
33    let filter: Option<Filter> = (&args.filter_args).try_into()?;
34    let (store, path) = parse_store_uri(&args.output)?;
35    if events.is_none() {
36        return Err(anyhow::anyhow!(
37            "At least one valid --event must be specified for the extract command"
38        ));
39    }
40
41    let mut indexer = EventExtractor::new(
42        Arc::new(store),
43        path,
44        args.target_events_per_file,
45        providers.clone(),
46        events.unwrap(),
47        filter.unwrap(),
48    )?;
49
50    indexer.extract().await?;
51
52    Ok(())
53}