tycho_block_util/archive/
mod.rs

//! # Archive structure
//!
//! - Archive prefix (4 bytes): `0x65 0x8F 0x14 0x29`
//! - For each archive entry:
//!   * Archive entry header (TL-serialized [`ArchiveEntryHeader`])
//!   * Archive entry data
7
8use std::collections::BTreeMap;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use anyhow::Result;
13use bytes::Bytes;
14use tycho_types::models::BlockId;
15use tycho_util::FastHashMap;
16
17pub use self::proto::{
18    ARCHIVE_ENTRY_HEADER_LEN, ARCHIVE_PREFIX, ArchiveEntryHeader, ArchiveEntryType,
19};
20pub use self::reader::{ArchiveEntry, ArchiveReader, ArchiveReaderError, ArchiveVerifier};
21use crate::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
22use crate::queue::{QueueDiffStuff, QueueDiffStuffAug};
23
24mod proto;
25mod reader;
26
27pub struct Archive {
28    pub mc_block_ids: BTreeMap<u32, BlockId>,
29    pub blocks: FastHashMap<BlockId, ArchiveDataEntry>,
30}
31
32impl Archive {
33    pub fn new<T>(data: T) -> Result<Self>
34    where
35        Bytes: From<T>,
36    {
37        let data = Bytes::from(data);
38        let reader = ArchiveReader::new(&data)?;
39
40        let mut res = Archive {
41            mc_block_ids: Default::default(),
42            blocks: Default::default(),
43        };
44
45        for entry_data in reader {
46            let entry = entry_data?;
47
48            let id = entry.block_id;
49            if id.is_masterchain() {
50                res.mc_block_ids.insert(id.seqno, id);
51            }
52
53            let parsed = res.blocks.entry(id).or_default();
54
55            match entry.ty {
56                ArchiveEntryType::Block => {
57                    anyhow::ensure!(parsed.block.is_none(), "duplicate block data for: {id}");
58                    parsed.block = Some(data.slice_ref(entry.data));
59                }
60                ArchiveEntryType::Proof => {
61                    anyhow::ensure!(parsed.proof.is_none(), "duplicate block proof for: {id}");
62                    parsed.proof = Some(data.slice_ref(entry.data));
63                }
64                ArchiveEntryType::QueueDiff => {
65                    anyhow::ensure!(
66                        parsed.queue_diff.is_none(),
67                        "duplicate queue diff for: {id}"
68                    );
69                    parsed.queue_diff = Some(data.slice_ref(entry.data));
70                }
71            }
72        }
73
74        Ok(res)
75    }
76
77    pub fn check_mc_blocks_range(&self) -> Result<()> {
78        match (
79            self.mc_block_ids.first_key_value(),
80            self.mc_block_ids.last_key_value(),
81        ) {
82            (Some((first_seqno, _)), Some((last_seqno, _))) => {
83                if (last_seqno - first_seqno + 1) != self.mc_block_ids.len() as u32 {
84                    anyhow::bail!("archive does not contain some mc blocks")
85                }
86
87                Ok(())
88            }
89            _ => {
90                anyhow::bail!("archive is empty")
91            }
92        }
93    }
94
95    /// NOTE: Takes up to a magnitude of seconds to run on large blocks.
96    pub async fn get_entry_by_id(
97        self: &Arc<Self>,
98        id: &BlockId,
99    ) -> Result<(BlockStuffAug, BlockProofStuffAug, QueueDiffStuffAug), ArchiveError> {
100        let this = self.clone();
101        let id = *id;
102
103        let (block, proof, queue_diff) = tycho_util::sync::rayon_run(move || {
104            let mut block_res = None;
105            let mut proof_res = None;
106            let mut diff_res = None;
107            rayon::scope(|s| {
108                s.spawn(|_| {
109                    proof_res = Some(this.get_proof_by_id(&id));
110                    diff_res = Some(this.get_queue_diff_by_id(&id));
111                });
112
113                block_res = Some(this.get_block_by_id(&id));
114            });
115
116            (
117                block_res.expect("scope must finish"),
118                proof_res.expect("scope must finish"),
119                diff_res.expect("scope must finish"),
120            )
121        })
122        .await;
123
124        Ok((block?, proof?, queue_diff?))
125    }
126
127    /// NOTE: Takes up to a magnitude of seconds to run on large blocks.
128    pub fn get_block_by_id(&self, id: &BlockId) -> Result<BlockStuffAug, ArchiveError> {
129        let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
130        entry
131            .block
132            .as_ref()
133            .ok_or(ArchiveError::BlockNotFound)
134            .and_then(|data| {
135                let block = BlockStuff::deserialize_checked(id, data)?;
136                Ok(WithArchiveData::new::<Bytes>(block, data.clone()))
137            })
138    }
139
140    pub fn get_proof_by_id(&self, id: &BlockId) -> Result<BlockProofStuffAug, ArchiveError> {
141        let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
142        entry
143            .proof
144            .as_ref()
145            .ok_or(ArchiveError::BlockNotFound)
146            .and_then(|data| {
147                let proof = BlockProofStuff::deserialize(id, data)?;
148                Ok(WithArchiveData::new::<Bytes>(proof, data.clone()))
149            })
150    }
151
152    pub fn get_queue_diff_by_id(&self, id: &BlockId) -> Result<QueueDiffStuffAug, ArchiveError> {
153        let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
154        entry
155            .queue_diff
156            .as_ref()
157            .ok_or(ArchiveError::BlockNotFound)
158            .and_then(|data| {
159                let diff = QueueDiffStuff::deserialize(id, data)?;
160                Ok(WithArchiveData::new::<Bytes>(diff, data.clone()))
161            })
162    }
163}
164
165#[derive(Default)]
166pub struct ArchiveDataEntry {
167    pub block: Option<Bytes>,
168    pub proof: Option<Bytes>,
169    pub queue_diff: Option<Bytes>,
170}
171
172#[derive(Clone)]
173pub enum ArchiveData {
174    /// The raw data is known.
175    New(Bytes),
176    /// Raw data is not known (due to nondeterministic serialization).
177    Existing,
178}
179
180impl ArchiveData {
181    /// Assumes that the object is constructed with known raw data.
182    pub fn as_new_archive_data(&self) -> Result<&[u8], WithArchiveDataError> {
183        match self {
184            ArchiveData::New(data) => Ok(data),
185            ArchiveData::Existing => Err(WithArchiveDataError),
186        }
187    }
188
189    /// Assumes that the object is constructed with known raw data.
190    pub fn clone_new_archive_data(&self) -> Result<Bytes, WithArchiveDataError> {
191        match self {
192            ArchiveData::New(data) => Ok(data.clone()),
193            ArchiveData::Existing => Err(WithArchiveDataError),
194        }
195    }
196}
197
198/// Parsed data wrapper, augmented with the optional raw data.
199///
200/// Stores the raw data only in the context of the archive parser, or received block.
201///
202/// NOTE: Can be safely cloned, all raw bytes are shared (see [`Bytes`])
203///
204/// See: [`ArchiveData`]
205#[derive(Clone)]
206pub struct WithArchiveData<T> {
207    pub data: T,
208    pub archive_data: ArchiveData,
209}
210
211impl<T> WithArchiveData<T> {
212    /// Constructs a new object from the context with known raw data.
213    pub fn new<A>(data: T, archive_data: A) -> Self
214    where
215        Bytes: From<A>,
216    {
217        Self {
218            data,
219            archive_data: ArchiveData::New(Bytes::from(archive_data)),
220        }
221    }
222
223    /// Constructs a new object from the context without known raw data.
224    pub fn loaded(data: T) -> Self {
225        Self {
226            data,
227            archive_data: ArchiveData::Existing,
228        }
229    }
230
231    /// Assumes that the object is constructed with known raw data.
232    pub fn as_new_archive_data(&self) -> Result<&[u8], WithArchiveDataError> {
233        self.archive_data.as_new_archive_data()
234    }
235
236    /// Assumes that the object is constructed with known raw data.
237    pub fn clone_new_archive_data(&self) -> Result<Bytes, WithArchiveDataError> {
238        self.archive_data.clone_new_archive_data()
239    }
240}
241
242impl<T> std::ops::Deref for WithArchiveData<T> {
243    type Target = T;
244
245    #[inline(always)]
246    fn deref(&self) -> &Self::Target {
247        &self.data
248    }
249}
250
251#[derive(Debug, Copy, Clone, thiserror::Error)]
252#[error("archive data not loaded")]
253pub struct WithArchiveDataError;
254
255#[derive(thiserror::Error, Debug)]
256pub enum ArchiveError {
257    #[error("block id is out of range")]
258    OutOfRange,
259    #[error("block not found")]
260    BlockNotFound,
261    #[error("proof not found")]
262    ProofNotFound,
263    #[error(transparent)]
264    Other(#[from] anyhow::Error),
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    const DATA: &[u8] = &[1, 2, 3];

    /// Raw bytes are available only when the wrapper was built with them.
    #[test]
    fn correct_context() {
        assert_eq!(
            WithArchiveData::new((), DATA.to_vec())
                .as_new_archive_data()
                .unwrap(),
            DATA
        );
        assert!(WithArchiveData::loaded(()).as_new_archive_data().is_err());
    }

    /// `clone_new_archive_data` mirrors `as_new_archive_data` for both variants.
    #[test]
    fn clone_context() {
        let with_data = WithArchiveData::new((), DATA.to_vec());
        let cloned = with_data.clone_new_archive_data().unwrap();
        assert_eq!(cloned.as_ref(), DATA);

        assert!(WithArchiveData::loaded(()).clone_new_archive_data().is_err());
        assert!(ArchiveData::Existing.clone_new_archive_data().is_err());
    }

    /// The wrapper dereferences to the parsed value.
    #[test]
    fn deref_to_data() {
        let wrapped = WithArchiveData::new(42u32, Vec::<u8>::new());
        assert_eq!(*wrapped, 42);
        assert_eq!(*WithArchiveData::loaded(7u32), 7);
    }

    /// An archive without masterchain blocks fails the range check.
    #[test]
    fn empty_mc_range_is_error() {
        let archive = Archive {
            mc_block_ids: Default::default(),
            blocks: Default::default(),
        };
        assert!(archive.check_mc_blocks_range().is_err());
    }
}
283}