Skip to main content

ethl_cli/commands/
merge.rs

use anyhow::{Context, Result};
use clap::Parser;
use ethl::storage::merger::EventFileMerger;
use parse_size::parse_size;
use tracing::info;

use crate::commands::full_signatures_to_events;
8
9fn value_parse_size(s: &str) -> Result<u64, String> {
10    parse_size(s).map_err(|e| e.to_string())
11}
12
13#[derive(Parser, Debug)]
14pub struct MergeArgs {
15    /// Full event signatures to filter and parse against (e.g. "event Transfer(address from,address to,uint256 amount)")
16    #[arg(long, required = true)]
17    events: Vec<String>,
18
19    /// The archived events location (eg: file:///tmp/events or s3://my-bucket/events)
20    #[arg(long, required = true)]
21    archive_path: String,
22
23    /// Minimum file size in bytes to trigger a new file (default: 50MB)
24    /// Note: this is a soft limit, files may be larger if a single batch of logs exceeds this size
25    #[arg(long, value_parser = value_parse_size, help = "File size limit (e.g., '10MB', '2GiB')")]
26    min_size: u64,
27
28    /// Dry run - show what would be merged but do not perform any merges
29    #[arg(long, default_value_t = false)]
30    dry_run: bool,
31}
32
33pub async fn run_merge_command(args: MergeArgs) -> Result<()> {
34    let events = full_signatures_to_events(&args.events)?;
35    let archives = events
36        .into_iter()
37        .map(|e| {
38            (
39                e.clone(),
40                EventFileMerger::new(&args.archive_path, args.min_size, e),
41            )
42        })
43        .collect::<Vec<_>>();
44
45    for (event, merger_res) in archives {
46        let merger = merger_res?;
47        let plan = merger.generate_plan().await?;
48
49        if args.dry_run {
50            info!("DRY RUN: Merge plan for event {}:\n{}", event.name, plan);
51            continue;
52        }
53        merger.execute_plan(plan).await?;
54    }
55
56    Ok(())
57}