ethl 0.1.14

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
//! This module provides functionality to merge smaller event archive files into larger ones
//! based on a target byte size. It uses a target size to determine when to merge files and
//! ensures that only contiguous block ranges are merged together. Files that are already
//! larger than the target size are skipped. The merging process involves streaming data from
//! the source files and writing them into a new Parquet file in the object store. Files which
//! are not merged (either because they are already large enough or because they are
//! unqualified for merging) are reported in the merge plan.
use std::fmt::Display;

use crate::storage::{
    codec::SolEventCodec,
    store::{EventStore, StoredFile},
};
use alloy::json_abi::Event;
use anyhow::Result;
use tracing::info;

/// A run of contiguous source files scheduled to be merged into a single
/// output file.
#[derive(Debug, Clone)]
pub struct MergeGroup {
    // Source files in ascending block order; never empty (see `init`).
    pub sources: Vec<StoredFile>,
}

impl MergeGroup {
    /// Creates a group seeded with a single source file, establishing the
    /// invariant that a group is never empty.
    pub fn init(file: StoredFile) -> Self {
        MergeGroup {
            sources: vec![file],
        }
    }

    /// Adds another source file to the group. The caller is responsible for
    /// only appending files contiguous with the current tail of the group.
    pub fn append(&mut self, file: StoredFile) {
        self.sources.push(file);
    }

    /// Returns the inclusive block range `(start, end)` covered by the group.
    ///
    /// # Panics
    /// Panics if the group is empty, which cannot happen through the public
    /// API since `init` always seeds the group with one file.
    pub fn range(&self) -> (u64, u64) {
        let first = self.sources.first().expect("MergeGroup is never empty");
        let last = self.sources.last().expect("MergeGroup is never empty");
        (first.block_start, last.block_end)
    }

    /// Total size in bytes of all source files in the group.
    pub fn size(&self) -> u64 {
        self.sources.iter().map(|f| f.meta.size).sum()
    }

    /// Output filename for the merged file, zero-padded so names sort in
    /// block order, e.g. `000000000000-000000000200.parquet`.
    fn filename(&self) -> String {
        let (start, end) = self.range();
        format!("{:012}-{:012}.parquet", start, end)
    }

    /// A group only qualifies for merging when it contains more than one
    /// file; "merging" a single file would be a pointless copy.
    fn qualified(&self) -> bool {
        self.sources.len() > 1
    }
}

impl Display for MergeGroup {
    /// Renders a one-line summary, e.g.
    /// `blocks 0-200 (2 files, 1100 bytes) -> 000000000000-000000000200.parquet`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (start, end) = self.range();
        let count = self.sources.len();
        let bytes = self.size();
        write!(
            f,
            "blocks {start}-{end} ({count} files, {bytes} bytes) -> {}",
            self.filename()
        )
    }
}

/// The outcome of planning a merge pass: which files will be merged
/// together and which are left untouched.
#[derive(Debug, Clone)]
pub struct MergePlan {
    // Groups of contiguous files, each to be merged into one output file.
    pub candidates: Vec<MergeGroup>,
    // Size threshold in bytes: files at/above it are skipped outright, and a
    // group stops accepting new files once it reaches this size.
    pub target_size_bytes: u64,
    // Files excluded from merging, either because they are already large
    // enough or because they have no contiguous neighbor to merge with.
    pub skipped: Vec<StoredFile>,
}

impl MergePlan {
    /// Creates an empty plan that groups files up to `target_size_bytes`.
    pub fn new(target_size_bytes: u64) -> Self {
        MergePlan {
            candidates: vec![],
            target_size_bytes,
            skipped: vec![],
        }
    }

    /// Feeds one file into the plan. Files must be added in ascending block
    /// order; each file is either appended to the currently-open group or
    /// starts a new one.
    fn add(&mut self, file: StoredFile) {
        // Files already at or above the target size never need merging.
        if file.meta.size >= self.target_size_bytes {
            self.skipped.push(file);
            return;
        }

        if self.candidates.is_empty() {
            self.candidates.push(MergeGroup::init(file));
            return;
        }

        let tail = self.candidates.last().unwrap();

        // The current group has reached the target size: close it and open
        // a new one.
        if tail.size() >= self.target_size_bytes {
            self.candidates.push(MergeGroup::init(file));
            return;
        }

        // Only contiguous block ranges may be merged. `checked_add` avoids a
        // debug-build overflow panic if the tail ends at u64::MAX; such a
        // file is simply treated as non-contiguous.
        if tail.range().1.checked_add(1) != Some(file.block_start) {
            self.candidates.push(MergeGroup::init(file));
            return;
        }

        self.candidates.last_mut().unwrap().append(file);
    }

    /// Drops single-file groups from the candidates (merging one file is a
    /// no-op) and reports their files as skipped. Partitions in place rather
    /// than cloning every unqualified group.
    fn finalize(&mut self) {
        let (qualified, unqualified): (Vec<_>, Vec<_>) = std::mem::take(&mut self.candidates)
            .into_iter()
            .partition(|g| g.qualified());
        self.candidates = qualified;
        self.skipped
            .extend(unqualified.into_iter().flat_map(|g| g.sources));
    }
}

impl Display for MergePlan {
    /// Renders a human-readable summary: one line per merge group, followed
    /// by the files that were skipped (if any).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Merge Plan:")?;
        for group in &self.candidates {
            writeln!(f, "  - {}", group)?;
        }
        if !self.skipped.is_empty() {
            writeln!(f, "Skipped files:")?;
            for file in &self.skipped {
                // A stored parquet file should always have a filename
                // component, but don't panic inside Display if one doesn't —
                // that would abort any logging call that formats the plan.
                let name = file.path().filename().unwrap_or("<unnamed>");
                writeln!(f, "  - {} ({} bytes)", name, file.meta.size)?;
            }
        }
        Ok(())
    }
}

/// Merges smaller stored event files into larger ones.
///
/// Scans the files in an [`EventStore`], plans groups of contiguous
/// under-sized files (see [`MergePlan`]), and merges each qualifying group
/// into a single new Parquet file of roughly `byte_size` bytes.
pub struct EventFileMerger {
    store: EventStore,
    // Target size in bytes for merged output files.
    byte_size: u64,
}

impl EventFileMerger {
    /// Builds a merger for the given event signature, targeting merged files
    /// of roughly `byte_size` bytes in the store at `base_uri`.
    ///
    /// # Errors
    /// Returns an error if the event codec or the event store cannot be
    /// constructed.
    pub fn new(base_uri: impl AsRef<str>, byte_size: u64, event: Event) -> Result<Self> {
        let codec = SolEventCodec::new(&event)?;
        let store = EventStore::from_uri(&base_uri, &codec)?;
        Ok(EventFileMerger { store, byte_size })
    }

    /// Lists all stored files and groups contiguous under-sized ones into a
    /// merge plan. Files at or above the target size, and files left in
    /// single-file groups, end up in the plan's `skipped` list.
    ///
    /// # Errors
    /// Returns an error if listing the store fails.
    pub async fn generate_plan(&self) -> Result<MergePlan> {
        let files = self.store.list().await?.files;

        let mut plan = MergePlan::new(self.byte_size);
        // Consume the owned listing directly; cloning each file is wasted work.
        for file in files {
            plan.add(file);
        }

        plan.finalize();

        Ok(plan)
    }

    /// Executes the plan by merging each qualified group into a single new
    /// file in the store.
    ///
    /// # Errors
    /// Propagates the first store error; groups merged before the failure
    /// remain merged.
    pub async fn execute_plan(&self, plan: MergePlan) -> Result<()> {
        if plan.candidates.is_empty() {
            info!("No files to merge...");
            return Ok(());
        }

        for group in &plan.candidates {
            // Defensive: a caller-supplied plan may still contain unqualified
            // groups even though `generate_plan` filters them out.
            if !group.qualified() {
                continue;
            }
            info!("Merging {}", group);
            self.store
                .merge_files(&group.sources, &group.filename())
                .await?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use chrono::DateTime;
    use object_store::ObjectMeta;

    use super::*;

    /// Builds a `StoredFile` covering blocks `start..=end` with the given
    /// byte size; all other metadata is left at defaults.
    fn mock_file(start: u64, end: u64, size: u64) -> StoredFile {
        let meta = ObjectMeta {
            size,
            last_modified: DateTime::default(),
            location: "".into(),
            e_tag: None,
            version: None,
        };
        StoredFile {
            block_start: start,
            block_end: end,
            meta,
        }
    }

    #[test]
    fn test_merge_group() -> Result<()> {
        let first = mock_file(0, 100, 500);
        let second = mock_file(101, 200, 600);

        // A freshly-initialized group holds one file and is not yet mergeable.
        let mut group = MergeGroup::init(first.clone());
        assert_eq!(group.range(), (0, 100));
        assert_eq!(group.size(), 500);
        assert!(!group.qualified());

        // Appending a second file extends the range and qualifies the group.
        group.append(second.clone());
        assert_eq!(group.range(), (0, 200));
        assert_eq!(group.size(), 1100);
        assert!(group.qualified());

        assert_eq!(group.filename(), "000000000000-000000000200.parquet");

        Ok(())
    }

    #[test]
    fn test_merge_plan() -> Result<()> {
        let target_size = 1000;

        // Two small contiguous files, then an oversized file, then two more
        // small contiguous files.
        let small_a = mock_file(0, 100, 500);
        let small_b = mock_file(101, 200, 400);
        let oversized = mock_file(201, 300, 1500);
        let small_c = mock_file(301, 400, 700);
        let small_d = mock_file(401, 500, 800);

        let mut plan = MergePlan::new(target_size);
        for file in [&small_a, &small_b, &oversized, &small_c, &small_d] {
            plan.add(file.clone());
        }
        plan.finalize();

        // The oversized file splits the stream into two mergeable groups.
        assert_eq!(plan.candidates.len(), 2);
        assert_eq!(plan.candidates[0].range(), (0, 200));
        assert_eq!(plan.candidates[1].range(), (301, 500));

        // The oversized file itself is reported as skipped.
        assert_eq!(plan.skipped.len(), 1);
        assert!(plan.skipped.contains(&oversized));

        Ok(())
    }
}