tycho_block_util/archive/
mod.rs1use std::collections::BTreeMap;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use anyhow::Result;
13use bytes::Bytes;
14use tycho_types::models::BlockId;
15use tycho_util::FastHashMap;
16
17pub use self::proto::{
18 ARCHIVE_ENTRY_HEADER_LEN, ARCHIVE_PREFIX, ArchiveEntryHeader, ArchiveEntryType,
19};
20pub use self::reader::{ArchiveEntry, ArchiveReader, ArchiveReaderError, ArchiveVerifier};
21use crate::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug};
22use crate::queue::{QueueDiffStuff, QueueDiffStuffAug};
23
24mod proto;
25mod reader;
26
/// In-memory index over the entries of one archive.
pub struct Archive {
    /// Ids of the masterchain blocks seen in the archive, keyed by their seqno.
    pub mc_block_ids: BTreeMap<u32, BlockId>,
    /// Raw entry payloads collected for every block id seen in the archive.
    pub blocks: FastHashMap<BlockId, ArchiveDataEntry>,
}
31
32impl Archive {
33 pub fn new<T>(data: T) -> Result<Self>
34 where
35 Bytes: From<T>,
36 {
37 let data = Bytes::from(data);
38 let reader = ArchiveReader::new(&data)?;
39
40 let mut res = Archive {
41 mc_block_ids: Default::default(),
42 blocks: Default::default(),
43 };
44
45 for entry_data in reader {
46 let entry = entry_data?;
47
48 let id = entry.block_id;
49 if id.is_masterchain() {
50 res.mc_block_ids.insert(id.seqno, id);
51 }
52
53 let parsed = res.blocks.entry(id).or_default();
54
55 match entry.ty {
56 ArchiveEntryType::Block => {
57 anyhow::ensure!(parsed.block.is_none(), "duplicate block data for: {id}");
58 parsed.block = Some(data.slice_ref(entry.data));
59 }
60 ArchiveEntryType::Proof => {
61 anyhow::ensure!(parsed.proof.is_none(), "duplicate block proof for: {id}");
62 parsed.proof = Some(data.slice_ref(entry.data));
63 }
64 ArchiveEntryType::QueueDiff => {
65 anyhow::ensure!(
66 parsed.queue_diff.is_none(),
67 "duplicate queue diff for: {id}"
68 );
69 parsed.queue_diff = Some(data.slice_ref(entry.data));
70 }
71 }
72 }
73
74 Ok(res)
75 }
76
77 pub fn check_mc_blocks_range(&self) -> Result<()> {
78 match (
79 self.mc_block_ids.first_key_value(),
80 self.mc_block_ids.last_key_value(),
81 ) {
82 (Some((first_seqno, _)), Some((last_seqno, _))) => {
83 if (last_seqno - first_seqno + 1) != self.mc_block_ids.len() as u32 {
84 anyhow::bail!("archive does not contain some mc blocks")
85 }
86
87 Ok(())
88 }
89 _ => {
90 anyhow::bail!("archive is empty")
91 }
92 }
93 }
94
95 pub async fn get_entry_by_id(
97 self: &Arc<Self>,
98 id: &BlockId,
99 ) -> Result<(BlockStuffAug, BlockProofStuffAug, QueueDiffStuffAug), ArchiveError> {
100 let this = self.clone();
101 let id = *id;
102
103 let (block, proof, queue_diff) = tycho_util::sync::rayon_run(move || {
104 let mut block_res = None;
105 let mut proof_res = None;
106 let mut diff_res = None;
107 rayon::scope(|s| {
108 s.spawn(|_| {
109 proof_res = Some(this.get_proof_by_id(&id));
110 diff_res = Some(this.get_queue_diff_by_id(&id));
111 });
112
113 block_res = Some(this.get_block_by_id(&id));
114 });
115
116 (
117 block_res.expect("scope must finish"),
118 proof_res.expect("scope must finish"),
119 diff_res.expect("scope must finish"),
120 )
121 })
122 .await;
123
124 Ok((block?, proof?, queue_diff?))
125 }
126
127 pub fn get_block_by_id(&self, id: &BlockId) -> Result<BlockStuffAug, ArchiveError> {
129 let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
130 entry
131 .block
132 .as_ref()
133 .ok_or(ArchiveError::BlockNotFound)
134 .and_then(|data| {
135 let block = BlockStuff::deserialize_checked(id, data)?;
136 Ok(WithArchiveData::new::<Bytes>(block, data.clone()))
137 })
138 }
139
140 pub fn get_proof_by_id(&self, id: &BlockId) -> Result<BlockProofStuffAug, ArchiveError> {
141 let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
142 entry
143 .proof
144 .as_ref()
145 .ok_or(ArchiveError::BlockNotFound)
146 .and_then(|data| {
147 let proof = BlockProofStuff::deserialize(id, data)?;
148 Ok(WithArchiveData::new::<Bytes>(proof, data.clone()))
149 })
150 }
151
152 pub fn get_queue_diff_by_id(&self, id: &BlockId) -> Result<QueueDiffStuffAug, ArchiveError> {
153 let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
154 entry
155 .queue_diff
156 .as_ref()
157 .ok_or(ArchiveError::BlockNotFound)
158 .and_then(|data| {
159 let diff = QueueDiffStuff::deserialize(id, data)?;
160 Ok(WithArchiveData::new::<Bytes>(diff, data.clone()))
161 })
162 }
163}
164
/// Raw payloads collected for a single block id while reading an archive.
///
/// Each field stays `None` until an archive entry of the matching type is seen.
#[derive(Default)]
pub struct ArchiveDataEntry {
    /// Payload of the `ArchiveEntryType::Block` entry.
    pub block: Option<Bytes>,
    /// Payload of the `ArchiveEntryType::Proof` entry.
    pub proof: Option<Bytes>,
    /// Payload of the `ArchiveEntryType::QueueDiff` entry.
    pub queue_diff: Option<Bytes>,
}
171
/// Raw archive bytes that may or may not accompany a parsed value.
#[derive(Clone)]
pub enum ArchiveData {
    /// The raw bytes are attached.
    New(Bytes),
    /// No raw bytes are attached; the accessors return [`WithArchiveDataError`].
    Existing,
}
179
180impl ArchiveData {
181 pub fn as_new_archive_data(&self) -> Result<&[u8], WithArchiveDataError> {
183 match self {
184 ArchiveData::New(data) => Ok(data),
185 ArchiveData::Existing => Err(WithArchiveDataError),
186 }
187 }
188
189 pub fn clone_new_archive_data(&self) -> Result<Bytes, WithArchiveDataError> {
191 match self {
192 ArchiveData::New(data) => Ok(data.clone()),
193 ArchiveData::Existing => Err(WithArchiveDataError),
194 }
195 }
196}
197
/// A parsed value paired with its (optionally attached) raw archive bytes.
#[derive(Clone)]
pub struct WithArchiveData<T> {
    /// The parsed value.
    pub data: T,
    /// Raw bytes for `data`, or [`ArchiveData::Existing`] when none are attached.
    pub archive_data: ArchiveData,
}
210
211impl<T> WithArchiveData<T> {
212 pub fn new<A>(data: T, archive_data: A) -> Self
214 where
215 Bytes: From<A>,
216 {
217 Self {
218 data,
219 archive_data: ArchiveData::New(Bytes::from(archive_data)),
220 }
221 }
222
223 pub fn loaded(data: T) -> Self {
225 Self {
226 data,
227 archive_data: ArchiveData::Existing,
228 }
229 }
230
231 pub fn as_new_archive_data(&self) -> Result<&[u8], WithArchiveDataError> {
233 self.archive_data.as_new_archive_data()
234 }
235
236 pub fn clone_new_archive_data(&self) -> Result<Bytes, WithArchiveDataError> {
238 self.archive_data.clone_new_archive_data()
239 }
240}
241
242impl<T> std::ops::Deref for WithArchiveData<T> {
243 type Target = T;
244
245 #[inline(always)]
246 fn deref(&self) -> &Self::Target {
247 &self.data
248 }
249}
250
/// Error returned when raw archive bytes are requested from a value that has
/// none attached ([`ArchiveData::Existing`]).
#[derive(Debug, Copy, Clone, thiserror::Error)]
#[error("archive data not loaded")]
pub struct WithArchiveDataError;
254
/// Errors produced by the lookup methods of [`Archive`].
#[derive(thiserror::Error, Debug)]
pub enum ArchiveError {
    /// The archive has no entry at all for the requested block id.
    #[error("block id is out of range")]
    OutOfRange,
    /// An entry exists for the block id, but a requested payload is absent.
    #[error("block not found")]
    BlockNotFound,
    /// The block proof is absent.
    #[error("proof not found")]
    ProofNotFound,
    /// Any other failure, converted from `anyhow::Error` via `#[from]`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` attaches the raw bytes, `loaded` attaches none.
    #[test]
    pub fn correct_context() {
        const DATA: &[u8] = &[1, 2, 3];

        let fresh = WithArchiveData::new((), DATA.to_vec());
        assert_eq!(fresh.as_new_archive_data().unwrap(), DATA);

        let loaded = WithArchiveData::loaded(());
        assert!(loaded.as_new_archive_data().is_err());
    }
}