tycho_core/block_strider/provider/
archive_provider.rs

1use std::collections::{BTreeMap, btree_map};
2use std::io::Seek;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Result;
7use bytes::{BufMut, Bytes, BytesMut};
8use bytesize::ByteSize;
9use futures_util::future::BoxFuture;
10use serde::{Deserialize, Serialize};
11use tokio::sync::watch;
12use tokio::task::AbortHandle;
13use tycho_block_util::archive::Archive;
14use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
15use tycho_storage::fs::MappedFile;
16use tycho_types::models::BlockId;
17
18use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
19use crate::blockchain_rpc::{BlockchainRpcClient, PendingArchive, PendingArchiveResponse};
20use crate::overlay_client::{Neighbour, PunishReason};
21use crate::storage::CoreStorage;
22
/// Configuration for [`ArchiveBlockProvider`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Archives whose reported size is at most this value are buffered in
    /// memory while downloading; larger ones are streamed to a temp file.
    ///
    /// Default: 100 MB.
    pub max_archive_to_memory_size: ByteSize,
}
28
29impl Default for ArchiveBlockProviderConfig {
30    fn default() -> Self {
31        Self {
32            max_archive_to_memory_size: ByteSize::mb(100),
33        }
34    }
35}
36
/// A [`BlockProvider`] that serves blocks by downloading whole archives
/// from peers via the blockchain RPC client.
///
/// Cheap to clone: a thin handle around shared inner state.
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
42
43impl ArchiveBlockProvider {
44    pub fn new(
45        client: BlockchainRpcClient,
46        storage: CoreStorage,
47        config: ArchiveBlockProviderConfig,
48    ) -> Self {
49        let proof_checker = ProofChecker::new(storage.clone());
50
51        Self {
52            inner: Arc::new(Inner {
53                client,
54                proof_checker,
55
56                known_archives: parking_lot::Mutex::new(Default::default()),
57
58                storage,
59                config,
60            }),
61        }
62    }
63
64    async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
65        let this = self.inner.as_ref();
66
67        let next_mc_seqno = block_id.seqno + 1;
68
69        loop {
70            let Some((archive_key, info)) = this.get_archive(next_mc_seqno).await else {
71                tracing::info!(mc_seqno = next_mc_seqno, "archive block provider finished");
72                break None;
73            };
74
75            let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
76                tracing::error!(
77                    "received archive does not contain mc block with seqno {next_mc_seqno}"
78                );
79                info.from.punish(PunishReason::Malicious);
80                this.remove_archive_if_same(archive_key, &info);
81                continue;
82            };
83
84            match self
85                .checked_get_entry_by_id(&info.archive, block_id, block_id)
86                .await
87            {
88                Ok(block) => return Some(Ok(block.clone())),
89                Err(e) => {
90                    tracing::error!(archive_key, %block_id, "invalid archive entry: {e}");
91                    this.remove_archive_if_same(archive_key, &info);
92                    info.from.punish(PunishReason::Malicious);
93                }
94            }
95        }
96    }
97
98    async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
99        let this = self.inner.as_ref();
100
101        let block_id = block_id_relation.block_id;
102        let mc_block_id = block_id_relation.mc_block_id;
103
104        loop {
105            let Some((archive_key, info)) = this.get_archive(mc_block_id.seqno).await else {
106                tracing::warn!("shard block is too new for archives");
107
108                // NOTE: This is a strange situation, but if we wait a bit it might go away.
109                tokio::time::sleep(Duration::from_secs(1)).await;
110                continue;
111            };
112
113            match self
114                .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
115                .await
116            {
117                Ok(block) => return Some(Ok(block.clone())),
118                Err(e) => {
119                    tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e}");
120                    this.remove_archive_if_same(archive_key, &info);
121                    info.from.punish(PunishReason::Malicious);
122                }
123            }
124        }
125    }
126
127    async fn checked_get_entry_by_id(
128        &self,
129        archive: &Arc<Archive>,
130        mc_block_id: &BlockId,
131        block_id: &BlockId,
132    ) -> Result<BlockStuffAug> {
133        let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
134            Ok(entry) => entry,
135            Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
136        };
137
138        self.inner
139            .proof_checker
140            .check_proof(CheckProof {
141                mc_block_id,
142                block: &block,
143                proof,
144                queue_diff,
145                store_on_success: true,
146            })
147            .await?;
148
149        Ok(block)
150    }
151}
152
/// Shared state behind [`ArchiveBlockProvider`].
struct Inner {
    storage: CoreStorage,

    client: BlockchainRpcClient,
    proof_checker: ProofChecker,

    /// Cache of downloaded archives and in-flight download tasks,
    /// keyed by the mc seqno the download was requested for.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    config: ArchiveBlockProviderConfig,
}
163
impl Inner {
    /// Returns a downloaded archive containing mc block `mc_seqno` together
    /// with its cache key, spawning and awaiting a download task if needed.
    ///
    /// Returns `None` when the download finishes without an archive
    /// (e.g. peers reported it as too new).
    async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
        loop {
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                // Search for the downloaded archive or for an existing downloader task.
                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        // NOTE(review): the first pending task found is awaited even
                        // if its key differs from `mc_seqno`; the outer loop re-checks
                        // the map after it settles, so the lookup eventually converges.
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Start downloading otherwise
                let task = self.make_downloader().spawn(mc_seqno);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait until the pending task is finished or cancelled
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // Sender dropped without reaching a terminal state: treat as cancelled.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Replace pending with downloaded
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                    // Do nothing if the entry was already removed.
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        // Task was either cancelled or received `TooNew` so no archive received.
                        entry.remove();
                    }
                    Some(info) => {
                        // Task was finished with a non-empty result so store it.
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            // Avoid spinloop just in case.
            tokio::task::yield_now().await;
        }
    }

    /// Removes the cached entry under `archive_key`, but only if it still
    /// holds the same archive `Arc` as `prev` (guards against racing
    /// re-downloads replacing the slot). Returns `true` if removed.
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                    if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a one-shot downloader with cloned handles (see the NOTE in
    /// `ArchiveDownloader::spawn` about avoiding reference cycles).
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
        }
    }

    /// Drops cached archives that can no longer serve blocks at or above
    /// `bound`, aborting pending downloads that are clearly outdated.
    fn clear_outdated_archives(&self, bound: u32) {
        // TODO: Move into archive stuff
        const MAX_MC_PER_ARCHIVE: u32 = 100;

        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                // Keep a downloaded archive while its newest mc block is still needed.
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                ArchiveSlot::Pending(task) => {
                    // A pending task can cover at most `MAX_MC_PER_ARCHIVE` mc blocks
                    // past its key; anything short of `bound` by more than that is dead.
                    retain = task.archive_key.saturating_add(MAX_MC_PER_ARCHIVE) >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
297
/// Archive cache keyed by the mc seqno the download was requested for.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;
299
/// A cache slot: either a fully downloaded archive or a running download.
enum ArchiveSlot {
    Downloaded(ArchiveInfo),
    Pending(ArchiveTask),
}
304
/// A parsed archive together with the neighbour that served it
/// (kept so the peer can be punished if the data turns out invalid).
#[derive(Clone)]
struct ArchiveInfo {
    from: Neighbour,
    archive: Arc<Archive>,
}
310
/// One-shot helper owning everything needed to fetch a single archive.
struct ArchiveDownloader {
    client: BlockchainRpcClient,
    storage: CoreStorage,
    /// Archives larger than this are written to a temp file instead of RAM.
    memory_threshold: ByteSize,
}
316
impl ArchiveDownloader {
    /// Spawns a detached task that keeps retrying to download the archive
    /// containing mc block `mc_seqno`, and returns a handle to it.
    ///
    /// The task publishes its outcome through a watch channel. If it is
    /// aborted before finishing, a scope guard around the sender flips the
    /// state to `Cancelled` so waiters are not left hanging forever.
    fn spawn(self, mc_seqno: u32) -> ArchiveTask {
        // TODO: Use a proper backoff here?
        const INTERVAL: Duration = Duration::from_secs(1);

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        // NOTE: Use a separate downloader to prevent reference cycles
        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Disarm the cancellation guard before publishing the result.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        tracing::error!(mc_seqno, "failed to preload archive {e}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            // Dropping the last handle aborts the spawned task.
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// Downloads and parses one archive.
    ///
    /// Returns `Ok(None)` when peers report the archive as too new,
    /// `Ok(Some(..))` on success, and an error on any download or parse
    /// failure. A peer serving an unparsable archive is punished.
    async fn try_download(&self, seqno: u32) -> Result<Option<ArchiveInfo>> {
        let response = self.client.find_archive(seqno).await?;
        let pending = match response {
            PendingArchiveResponse::Found(pending) => pending,
            PendingArchiveResponse::TooNew => return Ok(None),
        };

        // Keep the origin around so it can be punished on bad data.
        let neighbour = pending.neighbour.clone();

        let writer = self.get_archive_writer(&pending)?;
        let writer = self.client.download_archive(pending, writer).await?;

        let span = tracing::Span::current();
        // Parsing is potentially heavy (may map a temp file), so run it off
        // the async runtime.
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let bytes = writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    neighbour.punish(PunishReason::Malicious);
                    return Err(e);
                }
            };

            if let Err(e) = archive.check_mc_blocks_range() {
                // TODO: Punish a bit less for missing mc blocks?
                neighbour.punish(PunishReason::Malicious);
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: neighbour,
            })
        })
        .await?
        .map(Some)
    }

    /// Picks a sink for the archive body: in-memory bytes when the reported
    /// size fits under the configured threshold, otherwise an unnamed
    /// buffered temp file.
    fn get_archive_writer(&self, pending: &PendingArchive) -> Result<ArchiveWriter> {
        Ok(if pending.size.get() > self.memory_threshold.as_u64() {
            let file = self.storage.context().temp_files().unnamed_file().open()?;
            ArchiveWriter::File(std::io::BufWriter::new(file))
        } else {
            ArchiveWriter::Bytes(BytesMut::new().writer())
        })
    }
}
412
/// Cloneable handle to a spawned archive download.
#[derive(Clone)]
struct ArchiveTask {
    /// Mc seqno the task was spawned for; also its key in `ArchivesMap`.
    archive_key: u32,
    /// Receives the task outcome (`Finished`/`Cancelled`).
    rx: watch::Receiver<ArchiveTaskState>,
    /// Aborts the task when the last clone of this handle is dropped.
    abort_handle: Arc<AbortOnDrop>,
}
419
/// Aborts the wrapped tokio task when dropped.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);
422
// Expose `AbortHandle` methods directly on the wrapper.
impl std::ops::Deref for AbortOnDrop {
    type Target = AbortHandle;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
431
impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Ensure the download task does not outlive all of its handles.
        self.0.abort();
    }
}
437
/// Progress of an archive download task, published via a watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// Still downloading.
    #[default]
    None,
    /// Download settled; `None` inside means no archive (reported too new).
    Finished(Option<ArchiveInfo>),
    /// Task aborted before producing a result.
    Cancelled,
}
445
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    // Thin trait adapters: box the inherent async methods.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        // Cleanup is synchronous; return an already-ready future.
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
464
/// Destination buffer for an archive download: a buffered temp file for
/// large archives, heap bytes for small ones.
enum ArchiveWriter {
    File(std::io::BufWriter<std::fs::File>),
    Bytes(bytes::buf::Writer<BytesMut>),
}
469
470impl ArchiveWriter {
471    fn try_freeze(self) -> Result<Bytes, std::io::Error> {
472        match self {
473            Self::File(file) => match file.into_inner() {
474                Ok(mut file) => {
475                    file.seek(std::io::SeekFrom::Start(0))?;
476                    MappedFile::from_existing_file(file).map(Bytes::from_owner)
477                }
478                Err(e) => Err(e.into_error()),
479            },
480            Self::Bytes(data) => Ok(data.into_inner().freeze()),
481        }
482    }
483}
484
485impl std::io::Write for ArchiveWriter {
486    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
487        match self {
488            Self::File(writer) => writer.write(buf),
489            Self::Bytes(writer) => writer.write(buf),
490        }
491    }
492
493    fn flush(&mut self) -> std::io::Result<()> {
494        match self {
495            Self::File(writer) => writer.flush(),
496            Self::Bytes(writer) => writer.flush(),
497        }
498    }
499
500    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
501        match self {
502            Self::File(writer) => writer.write_all(buf),
503            Self::Bytes(writer) => writer.write_all(buf),
504        }
505    }
506
507    fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> std::io::Result<()> {
508        match self {
509            Self::File(writer) => writer.write_fmt(fmt),
510            Self::Bytes(writer) => writer.write_fmt(fmt),
511        }
512    }
513}