tycho-block-util 0.3.7

Shared utilities for blockchain models.
Documentation
//! # Archive structure
//!
//! - Archive prefix (4 bytes): `0x65 0x8F 0x14 0x29`
//! - For each archive entry:
//!  * Archive entry header ([`ArchiveEntryHeader`] as TL)
//!  * Archive entry data

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use tycho_types::models::BlockId;
use tycho_util::FastHashMap;

pub use self::proto::{
    ARCHIVE_ENTRY_HEADER_LEN, ARCHIVE_PREFIX, ArchiveEntryHeader, ArchiveEntryType,
};
pub use self::reader::{ArchiveEntry, ArchiveReader, ArchiveReaderError, ArchiveVerifier};
use crate::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
use crate::queue::{QueueDiffStuff, QueueDiffStuffAug};

mod proto;
mod reader;

/// In-memory index over the entries of a parsed archive.
///
/// Built by [`Archive::new`]; entry payloads are kept as raw [`Bytes`] and are
/// only deserialized on demand by the `get_*_by_id` methods.
pub struct Archive {
    /// Ids of the masterchain blocks found in the archive, keyed by seqno.
    pub mc_block_ids: BTreeMap<u32, BlockId>,
    /// Raw entry data (block / proof / queue diff) grouped by block id.
    pub blocks: FastHashMap<BlockId, ArchiveDataEntry>,
}

impl Archive {
    /// Upper bound on the number of masterchain blocks in one archive.
    ///
    /// NOTE(review): not enforced anywhere in this `impl`; presumably used by
    /// archive producers/consumers elsewhere.
    pub const MAX_MC_BLOCKS_PER_ARCHIVE: u32 = 100;

    /// Parses a raw archive and indexes its entries by block id.
    ///
    /// The input is converted into [`Bytes`] once and every stored entry is a
    /// `slice_ref` into that buffer, so entry payloads share the original
    /// allocation instead of being copied.
    ///
    /// # Errors
    ///
    /// Fails if the archive reader cannot be created, if any entry fails to
    /// parse, or if the archive contains two entries of the same type for the
    /// same block id.
    pub fn new<T>(data: T) -> Result<Self>
    where
        Bytes: From<T>,
    {
        let data = Bytes::from(data);

        let mut archive = Self {
            mc_block_ids: BTreeMap::new(),
            blocks: FastHashMap::default(),
        };

        for entry in ArchiveReader::new(&data)? {
            let entry = entry?;
            let id = entry.block_id;

            // Masterchain blocks are additionally indexed by seqno.
            if id.is_masterchain() {
                archive.mc_block_ids.insert(id.seqno, id);
            }

            // Pick the slot matching the entry type; each slot may be filled only once.
            let parsed = archive.blocks.entry(id).or_default();
            let (slot, kind) = match entry.ty {
                ArchiveEntryType::Block => (&mut parsed.block, "block data"),
                ArchiveEntryType::Proof => (&mut parsed.proof, "block proof"),
                ArchiveEntryType::QueueDiff => (&mut parsed.queue_diff, "queue diff"),
            };
            anyhow::ensure!(slot.is_none(), "duplicate {kind} for: {id}");
            *slot = Some(data.slice_ref(entry.data));
        }

        Ok(archive)
    }

    /// Checks that the masterchain block seqnos form a contiguous range.
    ///
    /// # Errors
    ///
    /// Fails if the archive has no masterchain blocks, or if some seqno
    /// between the first and the last one is missing.
    pub fn check_mc_blocks_range(&self) -> Result<()> {
        let (Some((&first_seqno, _)), Some((&last_seqno, _))) = (
            self.mc_block_ids.first_key_value(),
            self.mc_block_ids.last_key_value(),
        ) else {
            anyhow::bail!("archive is empty")
        };

        // The map is ordered, so `last >= first`. The arithmetic is done in
        // `u64` so that neither the `+ 1` nor the length conversion can
        // overflow or truncate.
        let expected_len = u64::from(last_seqno - first_seqno) + 1;
        if expected_len != self.mc_block_ids.len() as u64 {
            anyhow::bail!("archive does not contain some mc blocks")
        }

        Ok(())
    }

    /// Deserializes the block, its proof and its queue diff for the given id.
    ///
    /// The work is offloaded through `tycho_util::sync::rayon_run`
    /// (presumably onto the rayon thread pool). The block is parsed in
    /// parallel with the proof and the queue diff.
    ///
    /// NOTE: Can take on the order of seconds for large blocks.
    ///
    /// # Errors
    ///
    /// Returns the first error among the block, proof and queue diff lookups
    /// (checked in that order); see the corresponding `get_*_by_id` methods.
    pub async fn get_entry_by_id(
        self: &Arc<Self>,
        id: &BlockId,
    ) -> Result<(BlockStuffAug, BlockProofStuffAug, QueueDiffStuffAug), ArchiveError> {
        let this = Arc::clone(self);
        let id = *id;

        let (block, (proof, queue_diff)) = tycho_util::sync::rayon_run(move || {
            // The block is handled by the first task, while the (usually
            // smaller) proof and queue diff share the second one.
            rayon::join(
                || this.get_block_by_id(&id),
                || (this.get_proof_by_id(&id), this.get_queue_diff_by_id(&id)),
            )
        })
        .await;

        Ok((block?, proof?, queue_diff?))
    }

    /// Deserializes the block with the given id, keeping its raw bytes attached.
    ///
    /// NOTE: Can take on the order of seconds for large blocks.
    ///
    /// # Errors
    ///
    /// - [`ArchiveError::OutOfRange`] if the archive has no entry for `id`;
    /// - [`ArchiveError::BlockNotFound`] if the entry has no block data;
    /// - [`ArchiveError::Other`] if deserialization fails.
    pub fn get_block_by_id(&self, id: &BlockId) -> Result<BlockStuffAug, ArchiveError> {
        let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
        let data = entry.block.as_ref().ok_or(ArchiveError::BlockNotFound)?;

        let block = BlockStuff::deserialize_checked(id, data)?;
        Ok(WithArchiveData::new::<Bytes>(block, data.clone()))
    }

    /// Deserializes the proof of the block with the given id, keeping its raw
    /// bytes attached.
    ///
    /// # Errors
    ///
    /// - [`ArchiveError::OutOfRange`] if the archive has no entry for `id`;
    /// - [`ArchiveError::ProofNotFound`] if the entry has no proof data;
    /// - [`ArchiveError::Other`] if deserialization fails.
    pub fn get_proof_by_id(&self, id: &BlockId) -> Result<BlockProofStuffAug, ArchiveError> {
        let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
        let data = entry.proof.as_ref().ok_or(ArchiveError::ProofNotFound)?;

        let proof = BlockProofStuff::deserialize(id, data)?;
        Ok(WithArchiveData::new::<Bytes>(proof, data.clone()))
    }

    /// Deserializes the queue diff of the block with the given id, keeping its
    /// raw bytes attached.
    ///
    /// # Errors
    ///
    /// - [`ArchiveError::OutOfRange`] if the archive has no entry for `id`;
    /// - [`ArchiveError::BlockNotFound`] if the entry has no queue diff data
    ///   (there is no dedicated error variant for queue diffs);
    /// - [`ArchiveError::Other`] if deserialization fails.
    pub fn get_queue_diff_by_id(&self, id: &BlockId) -> Result<QueueDiffStuffAug, ArchiveError> {
        let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
        let data = entry
            .queue_diff
            .as_ref()
            .ok_or(ArchiveError::BlockNotFound)?;

        let diff = QueueDiffStuff::deserialize(id, data)?;
        Ok(WithArchiveData::new::<Bytes>(diff, data.clone()))
    }
}

/// Raw archive data collected for a single block id.
///
/// Each field stays `None` until an archive entry of the corresponding type
/// is encountered for that block id.
#[derive(Default)]
pub struct ArchiveDataEntry {
    /// Raw bytes of the block entry.
    pub block: Option<Bytes>,
    /// Raw bytes of the block proof entry.
    pub proof: Option<Bytes>,
    /// Raw bytes of the queue diff entry.
    pub queue_diff: Option<Bytes>,
}

/// Raw serialized form of a parsed object, when it is available.
///
/// Cloning is cheap: the `New` variant holds a [`Bytes`] handle.
#[derive(Clone)]
pub enum ArchiveData {
    /// The raw data is known.
    New(Bytes),
    /// Raw data is not known (due to nondeterministic serialization).
    Existing,
}

impl ArchiveData {
    /// Assumes that the object is constructed with known raw data.
    pub fn as_new_archive_data(&self) -> Result<&[u8], WithArchiveDataError> {
        match self {
            ArchiveData::New(data) => Ok(data),
            ArchiveData::Existing => Err(WithArchiveDataError),
        }
    }

    /// Assumes that the object is constructed with known raw data.
    pub fn clone_new_archive_data(&self) -> Result<Bytes, WithArchiveDataError> {
        match self {
            ArchiveData::New(data) => Ok(data.clone()),
            ArchiveData::Existing => Err(WithArchiveDataError),
        }
    }
}

/// Parsed data wrapper, augmented with the optional raw data.
///
/// Stores the raw data only in the context of the archive parser, or received block.
///
/// NOTE: Can be safely cloned, all raw bytes are shared (see [`Bytes`])
///
/// See: [`ArchiveData`]
#[derive(Clone)]
pub struct WithArchiveData<T> {
    /// The parsed object; also reachable through `Deref`.
    pub data: T,
    /// Raw serialized form of `data`, if it is known.
    pub archive_data: ArchiveData,
}

impl<T> WithArchiveData<T> {
    /// Wraps a parsed object together with its known raw representation.
    pub fn new<A>(data: T, archive_data: A) -> Self
    where
        Bytes: From<A>,
    {
        let raw = Bytes::from(archive_data);
        let archive_data = ArchiveData::New(raw);
        Self { data, archive_data }
    }

    /// Wraps a parsed object whose raw representation is not available.
    pub fn loaded(data: T) -> Self {
        let archive_data = ArchiveData::Existing;
        Self { data, archive_data }
    }

    /// Borrows the raw data as a byte slice.
    ///
    /// Expects the object to have been built with raw data (see [`Self::new`]);
    /// otherwise returns [`WithArchiveDataError`].
    pub fn as_new_archive_data(&self) -> Result<&[u8], WithArchiveDataError> {
        ArchiveData::as_new_archive_data(&self.archive_data)
    }

    /// Returns a new [`Bytes`] handle to the raw data.
    ///
    /// Expects the object to have been built with raw data (see [`Self::new`]);
    /// otherwise returns [`WithArchiveDataError`].
    pub fn clone_new_archive_data(&self) -> Result<Bytes, WithArchiveDataError> {
        ArchiveData::clone_new_archive_data(&self.archive_data)
    }
}

/// Gives transparent read access to the wrapped parsed object.
impl<T> std::ops::Deref for WithArchiveData<T> {
    type Target = T;

    #[inline(always)]
    fn deref(&self) -> &T {
        let Self { data, .. } = self;
        data
    }
}

/// Error returned when raw archive data is requested from an object that does
/// not carry it (i.e. its [`ArchiveData`] is `Existing`).
#[derive(Debug, Copy, Clone, thiserror::Error)]
#[error("archive data not loaded")]
pub struct WithArchiveDataError;

/// Errors produced when looking up and deserializing archive entries.
#[derive(thiserror::Error, Debug)]
pub enum ArchiveError {
    /// The archive has no entry at all for the requested block id.
    #[error("block id is out of range")]
    OutOfRange,
    /// The archive has an entry for the block id, but the requested data is
    /// missing from it.
    #[error("block not found")]
    BlockNotFound,
    /// No proof is available for the requested block.
    #[error("proof not found")]
    ProofNotFound,
    /// Any other failure (e.g. a deserialization error), passed through as is.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Raw data is accessible only when the wrapper was built with it.
    #[test]
    pub fn correct_context() {
        const DATA: &[u8] = &[1, 2, 3];

        // Built with known raw data: the same bytes must be returned.
        let with_raw = WithArchiveData::new((), DATA.to_vec());
        assert_eq!(with_raw.as_new_archive_data().unwrap(), DATA);

        // Built without raw data: access must fail.
        let without_raw = WithArchiveData::loaded(());
        assert!(without_raw.as_new_archive_data().is_err());
    }
}