tycho_core/block_strider/provider/
archive_provider.rs1use std::collections::{BTreeMap, btree_map};
2use std::io::Seek;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Result;
7use bytes::{BufMut, Bytes, BytesMut};
8use bytesize::ByteSize;
9use futures_util::future::BoxFuture;
10use serde::{Deserialize, Serialize};
11use tokio::sync::watch;
12use tokio::task::AbortHandle;
13use tycho_block_util::archive::Archive;
14use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
15use tycho_storage::fs::MappedFile;
16use tycho_types::models::BlockId;
17
18use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
19use crate::blockchain_rpc::{BlockchainRpcClient, PendingArchive, PendingArchiveResponse};
20use crate::overlay_client::{Neighbour, PunishReason};
21use crate::storage::CoreStorage;
22
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Archives up to this size are buffered in memory while downloading;
    /// larger archives are spilled to an unnamed temporary file instead.
    pub max_archive_to_memory_size: ByteSize,
}
28
impl Default for ArchiveBlockProviderConfig {
    fn default() -> Self {
        Self {
            // Keep archives up to 100 MB in memory by default.
            max_archive_to_memory_size: ByteSize::mb(100),
        }
    }
}
36
/// Block provider that serves blocks from downloaded archives.
///
/// Cheap to clone: all state lives behind a shared [`Inner`].
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
42
43impl ArchiveBlockProvider {
44 pub fn new(
45 client: BlockchainRpcClient,
46 storage: CoreStorage,
47 config: ArchiveBlockProviderConfig,
48 ) -> Self {
49 let proof_checker = ProofChecker::new(storage.clone());
50
51 Self {
52 inner: Arc::new(Inner {
53 client,
54 proof_checker,
55
56 known_archives: parking_lot::Mutex::new(Default::default()),
57
58 storage,
59 config,
60 }),
61 }
62 }
63
64 async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
65 let this = self.inner.as_ref();
66
67 let next_mc_seqno = block_id.seqno + 1;
68
69 loop {
70 let Some((archive_key, info)) = this.get_archive(next_mc_seqno).await else {
71 tracing::info!(mc_seqno = next_mc_seqno, "archive block provider finished");
72 break None;
73 };
74
75 let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
76 tracing::error!(
77 "received archive does not contain mc block with seqno {next_mc_seqno}"
78 );
79 info.from.punish(PunishReason::Malicious);
80 this.remove_archive_if_same(archive_key, &info);
81 continue;
82 };
83
84 match self
85 .checked_get_entry_by_id(&info.archive, block_id, block_id)
86 .await
87 {
88 Ok(block) => return Some(Ok(block.clone())),
89 Err(e) => {
90 tracing::error!(archive_key, %block_id, "invalid archive entry: {e}");
91 this.remove_archive_if_same(archive_key, &info);
92 info.from.punish(PunishReason::Malicious);
93 }
94 }
95 }
96 }
97
98 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
99 let this = self.inner.as_ref();
100
101 let block_id = block_id_relation.block_id;
102 let mc_block_id = block_id_relation.mc_block_id;
103
104 loop {
105 let Some((archive_key, info)) = this.get_archive(mc_block_id.seqno).await else {
106 tracing::warn!("shard block is too new for archives");
107
108 tokio::time::sleep(Duration::from_secs(1)).await;
110 continue;
111 };
112
113 match self
114 .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
115 .await
116 {
117 Ok(block) => return Some(Ok(block.clone())),
118 Err(e) => {
119 tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e}");
120 this.remove_archive_if_same(archive_key, &info);
121 info.from.punish(PunishReason::Malicious);
122 }
123 }
124 }
125 }
126
127 async fn checked_get_entry_by_id(
128 &self,
129 archive: &Arc<Archive>,
130 mc_block_id: &BlockId,
131 block_id: &BlockId,
132 ) -> Result<BlockStuffAug> {
133 let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
134 Ok(entry) => entry,
135 Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
136 };
137
138 self.inner
139 .proof_checker
140 .check_proof(CheckProof {
141 mc_block_id,
142 block: &block,
143 proof,
144 queue_diff,
145 store_on_success: true,
146 })
147 .await?;
148
149 Ok(block)
150 }
151}
152
/// Shared state behind [`ArchiveBlockProvider`].
struct Inner {
    storage: CoreStorage,

    client: BlockchainRpcClient,
    proof_checker: ProofChecker,

    // Cache of downloaded archives and in-flight download tasks,
    // keyed by the mc seqno the download was requested for.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    config: ArchiveBlockProviderConfig,
}
163
impl Inner {
    /// Returns a downloaded archive (with its cache key) containing the
    /// masterchain block `mc_seqno`, or `None` when the download task
    /// reported that no such archive exists yet ("too new").
    ///
    /// Fast path: scan cached archives. Otherwise reuse the first pending
    /// download task found, or spawn a new one keyed by `mc_seqno`, then
    /// wait on its watch channel. If the task was cancelled while we were
    /// waiting, the whole loop restarts.
    async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
        loop {
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        // Piggy-back on an in-flight download instead of
                        // spawning a second one.
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Nothing cached and nothing in flight: start a download.
                let task = self.make_downloader().spawn(mc_seqno);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait (lock released) until the task reaches a terminal state.
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // Sender dropped counts as cancellation.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Publish the result (or drop the slot) under the task's key.
            // NOTE: the entry may have been removed concurrently
            // (e.g. by `clear_outdated_archives`) — then there is nothing to do.
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        entry.remove();
                    }
                    Some(info) => {
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            // Task cancelled while we were still waiting on it: retry from
            // scratch, yielding so the cancelling party can make progress.
            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            tokio::task::yield_now().await;
        }
    }

    /// Removes the cached archive at `archive_key`, but only if it is still
    /// the exact same archive (by `Arc` identity) as `prev`. Returns whether
    /// an entry was removed. The identity check avoids racing with a
    /// concurrent re-download that already replaced the slot.
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                    if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a downloader capturing this provider's client, storage and
    /// memory threshold.
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
        }
    }

    /// Drops cached archives (and aborts pending downloads) that can no
    /// longer serve blocks at or above `bound`.
    fn clear_outdated_archives(&self, bound: u32) {
        // Heuristic span of one archive in mc blocks; used to judge whether a
        // pending download (keyed by its requested seqno) may still be useful.
        const MAX_MC_PER_ARCHIVE: u32 = 100;

        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                // A downloaded archive is kept while its newest mc block is
                // still at or above the bound.
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                ArchiveSlot::Pending(task) => {
                    retain = task.archive_key.saturating_add(MAX_MC_PER_ARCHIVE) >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
297
/// Archive cache: mc seqno the download was requested for -> slot.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;
299
/// A cache slot: either a fully downloaded archive or an in-flight download.
enum ArchiveSlot {
    Downloaded(ArchiveInfo),
    Pending(ArchiveTask),
}
304
/// A parsed archive together with the peer that served it
/// (punished if the archive later turns out to be invalid).
#[derive(Clone)]
struct ArchiveInfo {
    from: Neighbour,
    archive: Arc<Archive>,
}
310
/// One-shot helper that downloads and parses a single archive.
struct ArchiveDownloader {
    client: BlockchainRpcClient,
    storage: CoreStorage,
    // Archives larger than this are buffered in a temp file instead of RAM.
    memory_threshold: ByteSize,
}
316
impl ArchiveDownloader {
    /// Spawns a background task downloading the archive for `mc_seqno`,
    /// retrying forever on errors. Progress is observed through the returned
    /// task's watch channel.
    fn spawn(self, mc_seqno: u32) -> ArchiveTask {
        // Delay between retries after a failed download attempt.
        const INTERVAL: Duration = Duration::from_secs(1);

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // If the task is dropped/aborted before it finishes, flip the state
        // to `Cancelled` so waiters are not left hanging forever.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Disarm the cancellation guard, then publish the result.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        tracing::error!(mc_seqno, "failed to preload archive {e}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            // Aborts the download once the last `ArchiveTask` clone is dropped.
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// Downloads and parses one archive.
    ///
    /// Returns `Ok(None)` when `seqno` is too new for any existing archive.
    /// Peers serving unparseable archives are punished as malicious.
    async fn try_download(&self, seqno: u32) -> Result<Option<ArchiveInfo>> {
        let response = self.client.find_archive(seqno).await?;
        let pending = match response {
            PendingArchiveResponse::Found(pending) => pending,
            PendingArchiveResponse::TooNew => return Ok(None),
        };

        let neighbour = pending.neighbour.clone();

        let writer = self.get_archive_writer(&pending)?;
        let writer = self.client.download_archive(pending, writer).await?;

        // Parsing/mmapping can be heavy, so run it off the async runtime.
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let bytes = writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    neighbour.punish(PunishReason::Malicious);
                    return Err(e);
                }
            };

            if let Err(e) = archive.check_mc_blocks_range() {
                neighbour.punish(PunishReason::Malicious);
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: neighbour,
            })
        })
        .await?
        .map(Some)
    }

    /// Picks a buffer for the download: memory for small archives, an
    /// unnamed temp file when the expected size exceeds `memory_threshold`.
    fn get_archive_writer(&self, pending: &PendingArchive) -> Result<ArchiveWriter> {
        Ok(if pending.size.get() > self.memory_threshold.as_u64() {
            let file = self.storage.context().temp_files().unnamed_file().open()?;
            ArchiveWriter::File(std::io::BufWriter::new(file))
        } else {
            ArchiveWriter::Bytes(BytesMut::new().writer())
        })
    }
}
412
/// Handle to a background archive download; clones share the same task.
#[derive(Clone)]
struct ArchiveTask {
    // The mc seqno this download was spawned for (also its cache key).
    archive_key: u32,
    rx: watch::Receiver<ArchiveTaskState>,
    // Shared so the task is aborted only when the last clone is dropped.
    abort_handle: Arc<AbortOnDrop>,
}
419
/// Newtype over [`AbortHandle`] that aborts the associated task on drop.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);

impl std::ops::Deref for AbortOnDrop {
    type Target = AbortHandle;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        self.0.abort();
    }
}
437
/// Download task state published through its watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// Still downloading.
    #[default]
    None,
    /// Finished; `None` means the requested seqno is too new for any archive.
    Finished(Option<ArchiveInfo>),
    /// Aborted before finishing.
    Cancelled,
}
445
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    // Cleanup runs synchronously; the returned future is already resolved.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
464
/// Destination buffer for an archive being downloaded:
/// a temp file for large archives, in-memory bytes otherwise.
enum ArchiveWriter {
    File(std::io::BufWriter<std::fs::File>),
    Bytes(bytes::buf::Writer<BytesMut>),
}
469
470impl ArchiveWriter {
471 fn try_freeze(self) -> Result<Bytes, std::io::Error> {
472 match self {
473 Self::File(file) => match file.into_inner() {
474 Ok(mut file) => {
475 file.seek(std::io::SeekFrom::Start(0))?;
476 MappedFile::from_existing_file(file).map(Bytes::from_owner)
477 }
478 Err(e) => Err(e.into_error()),
479 },
480 Self::Bytes(data) => Ok(data.into_inner().freeze()),
481 }
482 }
483}
484
485impl std::io::Write for ArchiveWriter {
486 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
487 match self {
488 Self::File(writer) => writer.write(buf),
489 Self::Bytes(writer) => writer.write(buf),
490 }
491 }
492
493 fn flush(&mut self) -> std::io::Result<()> {
494 match self {
495 Self::File(writer) => writer.flush(),
496 Self::Bytes(writer) => writer.flush(),
497 }
498 }
499
500 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
501 match self {
502 Self::File(writer) => writer.write_all(buf),
503 Self::Bytes(writer) => writer.write_all(buf),
504 }
505 }
506
507 fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> std::io::Result<()> {
508 match self {
509 Self::File(writer) => writer.write_fmt(fmt),
510 Self::Bytes(writer) => writer.write_fmt(fmt),
511 }
512 }
513}