// ethl 0.1.12
//
// Tools for capturing, processing, archiving, and replaying Ethereum events
// Documentation
use std::fmt::Display;

use anyhow::Result;
use object_store::{ObjectMeta, path::Path};

use crate::storage::store::EventStoreError;

/// Overall health of a store, as computed by `StoreIntegrityReport::status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntegrityStatus {
    /// The report lists no gaps, no overlaps, and no orphaned files.
    Intact,
    /// The report lists only orphaned files; `StoreDirectory::sanitize` handles this
    /// case by dropping them.
    Repairable,
    /// The report lists at least one gap or overlap; `StoreDirectory::sanitize`
    /// returns an error for this case.
    Unrepairable,
}

/// Findings of an integrity check, as produced by `StoreDirectory::integrity_report`.
#[derive(Debug, Clone)]
pub struct StoreIntegrityReport {
    /// Missing block ranges between consecutive files, as inclusive
    /// `(first_missing_block, last_missing_block)` pairs.
    pub gaps: Vec<(u64, u64)>,
    /// Files whose block range is fully covered by another file.
    pub orphans: Vec<StoredFile>,
    /// Block ranges shared by two consecutive files, as inclusive `(start, end)` pairs.
    pub overlaps: Vec<(u64, u64)>,
}

impl StoreIntegrityReport {
    /// Collapses the findings into a single [`IntegrityStatus`].
    ///
    /// Any gap or overlap makes the store `Unrepairable`, regardless of orphans.
    /// Orphans on their own make it `Repairable`. With no findings it is `Intact`.
    pub fn status(&self) -> IntegrityStatus {
        let has_gap_or_overlap = !(self.gaps.is_empty() && self.overlaps.is_empty());
        let has_orphans = !self.orphans.is_empty();

        match (has_gap_or_overlap, has_orphans) {
            (true, _) => IntegrityStatus::Unrepairable,
            (false, true) => IntegrityStatus::Repairable,
            (false, false) => IntegrityStatus::Intact,
        }
    }
}

impl Display for StoreIntegrityReport {
    /// Renders a human-readable summary of the report.
    ///
    /// - Intact: a single line without a trailing newline.
    /// - Repairable: a header line followed by one line per orphaned file.
    /// - Unrepairable: a header line followed by the gaps section and/or the
    ///   overlaps section, each listing one block range per line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.status() {
            IntegrityStatus::Intact => write!(f, "Store integrity: intact"),
            IntegrityStatus::Repairable => {
                // The header must end its own line; otherwise the first orphan
                // entry is appended directly to "…repairable".
                writeln!(f, "Store integrity: repairable")?;
                for file in &self.orphans {
                    writeln!(
                        f,
                        "  - {} (blocks {} to {})",
                        file.path().filename().unwrap_or_default(),
                        file.block_start,
                        file.block_end
                    )?;
                }
                Ok(())
            }
            IntegrityStatus::Unrepairable => {
                writeln!(f, "Store integrity: unrepairable")?;
                if !self.gaps.is_empty() {
                    writeln!(f, "Gaps in stored files:")?;
                    for (start, end) in &self.gaps {
                        writeln!(f, "  - blocks {} to {}", start, end)?;
                    }
                }
                if !self.overlaps.is_empty() {
                    writeln!(f, "Overlapping files:")?;
                    for (start, end) in &self.overlaps {
                        writeln!(f, "  - blocks {} to {}", start, end)?;
                    }
                }
                Ok(())
            }
        }
    }
}

/// A stored event file together with the inclusive block range parsed from its name.
///
/// `PartialEq`, `Ord`, and `PartialOrd` are implemented by hand further down in this
/// file rather than derived.
#[derive(Debug, Clone, Eq)]
pub struct StoredFile {
    /// First block covered by the file (first `-`-separated part of the file name).
    pub block_start: u64,
    /// Last block covered by the file (second part of the file name, without `.parquet`).
    pub block_end: u64,
    /// Object-store metadata the file was built from; `meta.location` is the file's path.
    pub meta: ObjectMeta,
}

impl StoredFile {
    /// Builds a [`StoredFile`] by parsing the block range out of the object's file name.
    ///
    /// The file name is expected to be `<block_start>-<block_end>`, optionally
    /// followed by a `.parquet` suffix, e.g. `000000000100-000000000200.parquet`.
    ///
    /// # Errors
    ///
    /// Returns an error when the location has no file name, when the name does not
    /// consist of exactly two `-`-separated parts, or when either part does not parse
    /// as a `u64`.
    pub fn new(meta: ObjectMeta) -> Result<Self> {
        // A location without a final segment is malformed input, not a bug:
        // report it instead of panicking.
        let name = meta
            .location
            .filename()
            .ok_or_else(|| anyhow::anyhow!("Invalid file name format"))?;

        // Only strip the extension as a suffix; no intermediate String is needed.
        let stem = name.strip_suffix(".parquet").unwrap_or(name);

        let mut parts = stem.split('-');
        let (block_start, block_end) = match (parts.next(), parts.next(), parts.next()) {
            (Some(start), Some(end), None) => (start.parse::<u64>()?, end.parse::<u64>()?),
            _ => return Err(anyhow::anyhow!("Invalid file name format")),
        };

        Ok(Self {
            block_start,
            block_end,
            meta,
        })
    }

    /// Returns the object-store location of the file.
    pub fn path(&self) -> &Path {
        &self.meta.location
    }

    /// Returns the inclusive block range as `(block_start, block_end)`.
    pub fn range(&self) -> (u64, u64) {
        (self.block_start, self.block_end)
    }

    /// Returns `true` when the inclusive range `start..=end` shares at least one
    /// block with this file's range.
    pub fn overlaps_range(&self, start: u64, end: u64) -> bool {
        self.block_start <= end && start <= self.block_end
    }
}

/// Two files are equal when they cover exactly the same block range.
///
/// Comparing `block_start` alone would make files that merely begin at the same
/// block indistinguishable to `==` and `Vec::contains`. `meta` is not compared.
impl PartialEq for StoredFile {
    fn eq(&self, other: &Self) -> bool {
        self.block_start == other.block_start && self.block_end == other.block_end
    }
}

/// Files are ordered by `block_start`, with `block_end` as the tie-breaker so that
/// `cmp` returns `Equal` exactly when `eq` returns `true`.
impl Ord for StoredFile {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.block_start
            .cmp(&other.block_start)
            .then_with(|| self.block_end.cmp(&other.block_end))
    }
}

impl PartialOrd for StoredFile {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Represents a directory of stored event files with methods to sanitize and check integrity.
pub struct StoreDirectory {
    /// The files in the directory. The range and gap/overlap checks read neighbouring
    /// entries, so they presumably expect ascending `block_start` order — TODO confirm
    /// that callers sort before constructing this type.
    pub files: Vec<StoredFile>,
}

impl From<Vec<StoredFile>> for StoreDirectory {
    fn from(files: Vec<StoredFile>) -> Self {
        StoreDirectory { files }
    }
}

impl StoreDirectory {
    /// Returns the overall block range covered by the stored files.
    ///
    /// The result is `(block_start of the first file, block_end of the last file)`,
    /// so it only describes the full range when `files` is in ascending order.
    /// An empty directory yields `(0, 0)`.
    pub fn stored_range(&self) -> (u64, u64) {
        match (self.files.first(), self.files.last()) {
            (Some(first), Some(last)) => (first.block_start, last.block_end),
            _ => (0, 0),
        }
    }

    /// Returns the directory without its orphaned files.
    ///
    /// # Errors
    ///
    /// Returns [`EventStoreError::IntegrityError`] carrying the full report when the
    /// non-orphaned files have gaps or overlaps.
    pub fn sanitize(self) -> Result<Self, EventStoreError> {
        let integrity = self.integrity_report();

        match integrity.status() {
            IntegrityStatus::Unrepairable => Err(EventStoreError::IntegrityError(integrity)),
            IntegrityStatus::Repairable => {
                let files = self
                    .files
                    .into_iter()
                    .filter(|f| !Self::is_orphaned(&integrity.orphans, f))
                    .collect::<Vec<_>>();

                Ok(StoreDirectory { files })
            }
            IntegrityStatus::Intact => {
                // Nothing to do
                Ok(self)
            }
        }
    }

    /// True when both files span exactly the same inclusive block range.
    fn same_range(a: &StoredFile, b: &StoredFile) -> bool {
        a.block_start == b.block_start && a.block_end == b.block_end
    }

    /// True when `outer` spans every block of `inner` plus at least one more.
    fn strictly_covers(outer: &StoredFile, inner: &StoredFile) -> bool {
        outer.block_start <= inner.block_start
            && inner.block_end <= outer.block_end
            && !Self::same_range(outer, inner)
    }

    /// True when `file` has the same block range as one of `orphans`.
    ///
    /// Matching on the whole range (not only `block_start`) keeps a longer file that
    /// merely starts at the same block as an orphan from being dropped with it.
    fn is_orphaned(orphans: &[StoredFile], file: &StoredFile) -> bool {
        orphans.iter().any(|orphan| Self::same_range(orphan, file))
    }

    /// Returns every file whose range is strictly covered by another file.
    ///
    /// Each orphan is listed once, in directory order, no matter how many files
    /// cover it. Files with identical ranges do not orphan each other.
    fn orphans(&self) -> Vec<StoredFile> {
        // Naive approach O(n^2)
        // TODO: Leverage sorted order to make this O(n)
        self.files
            .iter()
            .filter(|file| {
                self.files
                    .iter()
                    .any(|other| Self::strictly_covers(other, file))
            })
            .cloned()
            .collect()
    }

    /// Returns the inclusive block ranges missing between consecutive files.
    fn gaps(&self) -> Vec<(u64, u64)> {
        self.files
            .windows(2)
            // `start > end + 1` avoids computing `start - 1`, which underflows when a
            // file starts at block 0.
            .filter(|w| w[1].block_start > w[0].block_end.saturating_add(1))
            // The filter guarantees `block_end < u64::MAX` and `block_start >= 2` here.
            .map(|w| (w[0].block_end + 1, w[1].block_start - 1))
            .collect::<Vec<_>>()
    }

    /// Returns the inclusive block ranges shared by consecutive files.
    fn overlaps(&self) -> Vec<(u64, u64)> {
        self.files
            .windows(2)
            .filter(|w| w[0].block_end >= w[1].block_start)
            .map(|w| (w[1].block_start, w[0].block_end))
            .collect::<Vec<_>>()
    }

    /// Checks the directory for orphans, gaps, and overlaps.
    ///
    /// Orphans are detected first; gaps and overlaps are then computed over the
    /// remaining files only, so a covered file never shows up as an overlap.
    pub fn integrity_report(&self) -> StoreIntegrityReport {
        // Naive approach for now
        let orphans = self.orphans();
        if orphans.is_empty() {
            return StoreIntegrityReport {
                gaps: self.gaps(),
                overlaps: self.overlaps(),
                orphans,
            };
        }

        let remaining = StoreDirectory {
            files: self
                .files
                .iter()
                .filter(|f| !Self::is_orphaned(&orphans, f))
                .cloned()
                .collect(),
        };

        StoreIntegrityReport {
            gaps: remaining.gaps(),
            overlaps: remaining.overlaps(),
            orphans,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::storage::store::{StoreDirectory, StoredFile};
    use anyhow::Result;
    use chrono::DateTime;
    use object_store::{ObjectMeta, path::Path};

    /// Builds a `StoredFile` from a bare location string; every other metadata
    /// field is a placeholder.
    fn mock_file(location: &str) -> Result<StoredFile> {
        StoredFile::new(ObjectMeta {
            location: Path::parse(location).unwrap(),
            last_modified: DateTime::default(),
            size: 0,
            e_tag: None,
            version: None,
        })
    }

    /// The block range is parsed from the file name; malformed names are rejected.
    #[test]
    pub fn test_name_parsing() -> Result<()> {
        let file = mock_file("000000000100-000000000200.parquet").unwrap();
        assert_eq!(file.block_start, 100);
        assert_eq!(file.block_end, 200);
        assert!(mock_file("invalid-name.parquet").is_err());
        Ok(())
    }

    /// `overlaps_range` treats both the file range and the query range as inclusive.
    #[test]
    pub fn test_overlap() {
        let file = mock_file("000000000100-000000000200.parquet").unwrap();
        assert_eq!(file.block_start, 100);
        assert_eq!(file.block_end, 200);
        assert!(file.overlaps_range(98, 101));
        assert!(file.overlaps_range(98, 201));
        assert!(file.overlaps_range(140, 160));
        assert!(file.overlaps_range(198, 201));
        assert!(!file.overlaps_range(201, 2000));
        assert!(!file.overlaps_range(97, 98));
    }

    /// Consecutive files sharing blocks are reported as inclusive overlap ranges.
    #[test]
    fn test_integrity_overlap() -> Result<()> {
        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000011-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
        ]
        .into();

        assert!(files.overlaps().is_empty(), "Expected no overlaps");

        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000010-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
            mock_file("000000000025-000000000032.parquet")?,
        ]
        .into();

        println!("Overlaps: {:?}", files.overlaps());
        assert!(!files.overlaps().is_empty(), "Expected overlaps");
        assert_eq!(files.overlaps()[0], (10, 10));
        assert_eq!(files.overlaps()[1], (25, 30));

        Ok(())
    }

    /// Missing blocks between consecutive files are reported as inclusive gap ranges.
    #[test]
    fn test_integrity_gap() -> Result<()> {
        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000011-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
        ]
        .into();

        assert!(files.gaps().is_empty(), "Expected no gaps");

        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000012-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
            mock_file("000000000210-000000000300.parquet")?,
        ]
        .into();

        println!("Gaps: {:?}", files.gaps());
        assert!(!files.gaps().is_empty(), "Expected gaps");
        assert_eq!(files.gaps()[0], (11, 11), "Expected (11,11) gap");
        assert_eq!(files.gaps()[1], (31, 209), "Expected (31,209) gap");

        Ok(())
    }

    /// Files fully covered by another file are reported as orphans, in directory order.
    #[test]
    fn test_integrity_orphan() -> Result<()> {
        let dir: StoreDirectory = vec![
            mock_file("000000000001-000000000030.parquet").unwrap(),
            mock_file("000000000031-000000000040.parquet").unwrap(),
            mock_file("000000000041-000000000050.parquet").unwrap(),
        ]
        .into();

        assert!(dir.orphans().is_empty(), "Expected no orphans");

        let dir: StoreDirectory = vec![
            mock_file("000000000001-000000000030.parquet").unwrap(),
            mock_file("000000000015-000000000020.parquet").unwrap(),
            mock_file("000000000025-000000000030.parquet").unwrap(),
        ]
        .into();

        println!("Orphans: {:?}", dir.orphans());
        assert!(!dir.orphans().is_empty(), "Expected orphans");
        assert_eq!(dir.orphans()[0].range(), (15, 20));
        assert_eq!(dir.orphans()[1].range(), (25, 30));

        Ok(())
    }

    /// `sanitize` drops orphans, fails on gaps and overlaps, and accepts a contiguous store.
    #[test]
    fn test_sanitize() -> Result<()> {
        let dir: StoreDirectory = vec![
            mock_file("000000000001-000000000030.parquet").unwrap(),
            mock_file("000000000015-000000000020.parquet").unwrap(),
            mock_file("000000000025-000000000030.parquet").unwrap(),
        ]
        .into();

        assert_eq!(
            dir.sanitize()?.stored_range(),
            (1, 30),
            "Orphans should be ignored"
        );

        // Gap between block 20 and block 25.
        let dir: StoreDirectory = vec![
            mock_file("000000000015-000000000020.parquet").unwrap(),
            mock_file("000000000025-000000000030.parquet").unwrap(),
        ]
        .into();
        assert!(dir.sanitize().is_err(), "Expected integrity error");

        // Both files contain block 20.
        let dir: StoreDirectory = vec![
            mock_file("000000000015-000000000020.parquet").unwrap(),
            mock_file("000000000020-000000000030.parquet").unwrap(),
        ]
        .into();
        assert!(dir.sanitize().is_err(), "Expected integrity error");

        let dir: StoreDirectory = vec![
            mock_file("000000000015-000000000020.parquet").unwrap(),
            mock_file("000000000021-000000000030.parquet").unwrap(),
        ]
        .into();
        assert!(dir.sanitize().is_ok());

        Ok(())
    }
}