Skip to main content

tycho_core/block_strider/provider/
archive_provider.rs

1use std::collections::{BTreeMap, btree_map};
2use std::num::NonZeroU64;
3use std::pin::pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::time::Duration;
7
8use anyhow::Result;
9use async_trait::async_trait;
10use bytes::{BufMut, BytesMut};
11use bytesize::ByteSize;
12use futures_util::future::BoxFuture;
13use serde::{Deserialize, Serialize};
14use tokio::sync::watch;
15use tokio::task::AbortHandle;
16use tycho_block_util::archive::Archive;
17use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
18use tycho_types::models::BlockId;
19use tycho_util::fs::TargetWriter;
20use tycho_util::serde_helpers;
21
22use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
23use crate::blockchain_rpc;
24use crate::blockchain_rpc::BlockchainRpcClient;
25use crate::overlay_client::{Error, Neighbour, PunishReason};
26#[cfg(feature = "s3")]
27use crate::s3::S3Client;
28use crate::storage::CoreStorage;
29
/// Configuration for the archive-based block provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Maximum archive size that is buffered fully in memory while
    /// downloading; larger archives are spilled to a temporary file.
    ///
    /// Default: 100 MB.
    pub max_archive_to_memory_size: ByteSize,

    /// Time threshold to consider a block as recent.
    ///
    /// Default: 600 secs.
    #[serde(with = "serde_helpers::humantime")]
    pub recent_block_threshold: Duration,
}
41
42impl Default for ArchiveBlockProviderConfig {
43    fn default() -> Self {
44        Self {
45            max_archive_to_memory_size: ByteSize::mb(100),
46            recent_block_threshold: Duration::from_secs(600),
47        }
48    }
49}
50
/// Block provider that serves blocks from downloaded archives.
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
56
57impl ArchiveBlockProvider {
58    pub fn new(
59        client: impl IntoArchiveClient,
60        storage: CoreStorage,
61        config: ArchiveBlockProviderConfig,
62    ) -> Self {
63        let proof_checker = ProofChecker::new(storage.clone());
64
65        let sync_tracker = SyncTracker::new();
66
67        Self {
68            inner: Arc::new(Inner {
69                client: client.into_archive_client(),
70                proof_checker,
71
72                known_archives: parking_lot::Mutex::new(Default::default()),
73
74                sync_tracker,
75                storage,
76                config,
77            }),
78        }
79    }
80
81    async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
82        let this = self.inner.as_ref();
83
84        let next_mc_seqno = block_id.seqno + 1;
85
86        loop {
87            let Some((archive_key, info)) = this
88                .get_archive(ArchiveRequest::NextMcBlock {
89                    mc_seqno: next_mc_seqno,
90                    prev_block_id: *block_id,
91                })
92                .await
93            else {
94                tracing::warn!(prev_block_id = ?block_id, "archive not found");
95                break None;
96            };
97
98            let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
99                tracing::error!(
100                    "received archive does not contain mc block with seqno {next_mc_seqno}"
101                );
102                this.remove_archive_if_same(archive_key, &info);
103                if let Some(from) = &info.from {
104                    from.punish(PunishReason::Malicious);
105                }
106                continue;
107            };
108
109            match self
110                .checked_get_entry_by_id(&info.archive, block_id, block_id)
111                .await
112            {
113                Ok(block) => {
114                    self.inner.sync_tracker.try_log(archive_key, &block);
115                    return Some(Ok(block.clone()));
116                }
117                Err(e) => {
118                    tracing::error!(archive_key, %block_id, "invalid archive entry: {e:?}");
119                    this.remove_archive_if_same(archive_key, &info);
120                    if let Some(from) = &info.from {
121                        from.punish(PunishReason::Malicious);
122                    }
123                }
124            }
125        }
126    }
127
128    async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
129        let this = self.inner.as_ref();
130
131        let block_id = block_id_relation.block_id;
132        let mc_block_id = block_id_relation.mc_block_id;
133
134        loop {
135            let Some((archive_key, info)) = this
136                .get_archive(ArchiveRequest::ShardBlock {
137                    mc_seqno: mc_block_id.seqno,
138                })
139                .await
140            else {
141                tracing::warn!("shard block is too new for archives");
142
143                // NOTE: This is a strange situation, but if we wait a bit it might go away.
144                tokio::time::sleep(Duration::from_secs(1)).await;
145                continue;
146            };
147
148            match self
149                .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
150                .await
151            {
152                Ok(block) => return Some(Ok(block.clone())),
153                Err(e) => {
154                    tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e:?}");
155                    this.remove_archive_if_same(archive_key, &info);
156                    if let Some(from) = &info.from {
157                        from.punish(PunishReason::Malicious);
158                    }
159                }
160            }
161        }
162    }
163
164    async fn checked_get_entry_by_id(
165        &self,
166        archive: &Arc<Archive>,
167        mc_block_id: &BlockId,
168        block_id: &BlockId,
169    ) -> Result<BlockStuffAug> {
170        let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
171            Ok(entry) => entry,
172            Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
173        };
174
175        self.inner
176            .proof_checker
177            .check_proof(CheckProof {
178                mc_block_id,
179                block: &block,
180                proof,
181                queue_diff,
182                store_on_success: true,
183            })
184            .await?;
185
186        Ok(block)
187    }
188}
189
/// Shared state behind [`ArchiveBlockProvider`].
struct Inner {
    storage: CoreStorage,

    /// Source of archives (p2p overlay, S3, or a hybrid of both).
    client: Arc<dyn ArchiveClient>,
    proof_checker: ProofChecker,

    /// Cache of downloaded archives and in-flight download tasks.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    /// Logs sync progress (lag/rate/ETA) at most once per archive.
    sync_tracker: SyncTracker,

    config: ArchiveBlockProviderConfig,
}
202
impl Inner {
    /// Finds (or downloads) an archive that contains the mc block described
    /// by `request`, returning its cache key and info.
    ///
    /// Returns `None` when the download task finished without an archive
    /// (e.g. the requested block is too new to be archived yet).
    async fn get_archive(&self, request: ArchiveRequest) -> Option<(u32, ArchiveInfo)> {
        let mc_seqno = request.mc_seqno();
        loop {
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                // Search for the downloaded archive or for an existing downloader task.
                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        // NOTE(review): joins the first pending task found, even
                        // if it was spawned for a different seqno; the outer loop
                        // re-checks the map afterwards — confirm this is intended.
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Start downloading otherwise
                let task = self.make_downloader().spawn(request);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait until the pending task is finished or cancelled
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // Sender dropped without a terminal state: treat as cancelled.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Replace pending with downloaded
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                    // Do nothing if the entry was already removed.
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        // Task was either cancelled or received `TooNew` so no archive received.
                        entry.remove();
                    }
                    Some(info) => {
                        // Task was finished with a non-empty result so store it.
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            // Avoid spinloop just in case.
            tokio::task::yield_now().await;
        }
    }

    /// Removes the cached archive under `archive_key`, but only if it is
    /// still the same archive as `prev` (compared by `Arc` identity).
    /// Returns `true` when an entry was removed.
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                    if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a downloader owning clones of everything the background task
    /// needs (a separate struct avoids reference cycles with `Inner`).
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
            recent_block_threshold: self.config.recent_block_threshold,
        }
    }

    /// Drops cached archives (and aborts pending downloads) that cannot
    /// contain any mc block with seqno >= `bound`.
    fn clear_outdated_archives(&self, bound: u32) {
        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                ArchiveSlot::Pending(task) => {
                    // A pending task may still yield blocks up to
                    // `archive_key + MAX_MC_BLOCKS_PER_ARCHIVE`; keep it while
                    // that range can reach the bound.
                    retain = task
                        .archive_key
                        .saturating_add(Archive::MAX_MC_BLOCKS_PER_ARCHIVE)
                        >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
338
/// Cached archives and in-flight downloads, keyed by the mc seqno that
/// triggered the download.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;
340
/// A slot in [`ArchivesMap`]: either a fully downloaded archive or a
/// still-running download task.
enum ArchiveSlot {
    Downloaded(ArchiveInfo),
    Pending(ArchiveTask),
}
345
/// Describes what the provider needs from an archive.
#[derive(Clone, Copy)]
enum ArchiveRequest {
    /// The mc block following `prev_block_id`.
    NextMcBlock {
        mc_seqno: u32,
        prev_block_id: BlockId,
    },
    /// A shard block referenced by the mc block with `mc_seqno`.
    ShardBlock {
        mc_seqno: u32,
    },
}
356
357impl ArchiveRequest {
358    fn mc_seqno(&self) -> u32 {
359        match self {
360            Self::NextMcBlock { mc_seqno, .. } | Self::ShardBlock { mc_seqno } => *mc_seqno,
361        }
362    }
363
364    fn prev_block_id(&self) -> Option<BlockId> {
365        match self {
366            Self::NextMcBlock { prev_block_id, .. } => Some(*prev_block_id),
367            Self::ShardBlock { .. } => None,
368        }
369    }
370}
371
/// A downloaded archive together with its origin.
#[derive(Clone)]
struct ArchiveInfo {
    /// Peer that served the archive.
    from: Option<Neighbour>, // None for S3
    archive: Arc<Archive>,
}
377
/// Owns everything needed by a background archive download task.
struct ArchiveDownloader {
    client: Arc<dyn ArchiveClient>,
    storage: CoreStorage,
    /// Archives larger than this are written to a temp file instead of memory.
    memory_threshold: ByteSize,
    /// See [`ArchiveBlockProviderConfig::recent_block_threshold`].
    recent_block_threshold: Duration,
}
384
impl ArchiveDownloader {
    /// Spawns a background task that keeps retrying to download an archive
    /// for `request`, and returns a handle to await its result.
    ///
    /// The task is aborted when the last clone of the returned
    /// [`ArchiveTask`] is dropped.
    fn spawn(self, request: ArchiveRequest) -> ArchiveTask {
        // TODO: Use a proper backoff here?
        const INTERVAL: Duration = Duration::from_secs(1);

        let mc_seqno = request.mc_seqno();

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // If the task is dropped/aborted before publishing a result, notify
        // waiters via the `Cancelled` state instead of just closing the channel.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        // NOTE: Use a separate downloader to prevent reference cycles
        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Disarm the cancellation guard and publish the result.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        if let Some(Error::NotFound) = e.downcast_ref::<Error>() {
                            // Check if block is recent - if so, switch to blockchain provider
                            if let Some(prev_block_id) = request.prev_block_id()
                                && let Some(handle) = self
                                    .storage
                                    .block_handle_storage()
                                    .load_handle(&prev_block_id)
                            {
                                let now = tycho_util::time::now_sec();
                                let prev_gen_utime = handle.gen_utime();

                                if now.saturating_sub(prev_gen_utime)
                                    < self.recent_block_threshold.as_secs() as u32
                                {
                                    // Block is recent, switch to blockchain provider
                                    tracing::info!(mc_seqno, prev_gen_utime, "block is recent");

                                    // Finish with an empty result so callers can
                                    // fall back to another provider.
                                    let tx = scopeguard::ScopeGuard::into_inner(guard);
                                    tx.send_modify(move |prev| {
                                        *prev = ArchiveTaskState::Finished(None);
                                    });

                                    break;
                                }
                            }
                        }

                        tracing::error!(mc_seqno, "failed to preload archive {e:?}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// Downloads and parses a single archive covering `mc_seqno`.
    ///
    /// Returns `Ok(None)` when the client reports no archive for this seqno.
    /// Punishes the serving neighbour when the data fails to parse or has an
    /// inconsistent mc block range.
    async fn try_download(&self, mc_seqno: u32) -> Result<Option<ArchiveInfo>> {
        let ctx = ArchiveDownloadContext {
            storage: &self.storage,
            memory_threshold: self.memory_threshold,
        };
        let Some(found) = self.client.find_archive(mc_seqno, ctx).await? else {
            return Ok(None);
        };
        let res = (found.download)().await?;

        // Parsing may be CPU-heavy, so run it off the async runtime.
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let bytes = res.writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    if let Some(neighbour) = res.neighbour {
                        neighbour.punish(PunishReason::Malicious);
                    }
                    return Err(e);
                }
            };

            if let Err(e) = archive.check_mc_blocks_range() {
                // TODO: Punish a bit less for missing mc blocks?
                if let Some(neighbour) = res.neighbour {
                    neighbour.punish(PunishReason::Malicious);
                }
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: res.neighbour,
            })
        })
        .await?
        .map(Some)
    }
}
502
/// Handle to a background archive download.
///
/// Cloneable; the underlying task is aborted when the last clone is dropped.
#[derive(Clone)]
struct ArchiveTask {
    /// Mc seqno the download was started for (also its cache key).
    archive_key: u32,
    /// Receives state updates from the download task.
    rx: watch::Receiver<ArchiveTaskState>,
    abort_handle: Arc<AbortOnDrop>,
}
509
/// Wrapper around [`AbortHandle`] that aborts the task when dropped.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);

impl std::ops::Deref for AbortOnDrop {
    type Target = AbortHandle;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Ensure the background download stops once no one holds the task.
        self.0.abort();
    }
}
527
/// State published by an archive download task.
#[derive(Default)]
enum ArchiveTaskState {
    /// Still downloading.
    #[default]
    None,
    /// Finished; `None` means no archive is available for this request.
    Finished(Option<ArchiveInfo>),
    /// The task was aborted before producing a result.
    Cancelled,
}
535
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        // Dropping cached archives is synchronous; return an already-ready future.
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
554
/// Raw downloaded archive data plus the peer that served it.
pub struct ArchiveResponse {
    writer: TargetWriter,
    /// `None` when the archive did not come from a p2p neighbour.
    neighbour: Option<Neighbour>,
}
559
/// Context passed to [`ArchiveClient`] implementations for a single lookup.
#[derive(Clone, Copy)]
pub struct ArchiveDownloadContext<'a> {
    pub storage: &'a CoreStorage,
    /// Archives larger than this are spilled to a temp file.
    pub memory_threshold: ByteSize,
}
565
566impl<'a> ArchiveDownloadContext<'a> {
567    pub fn get_archive_writer(&self, size: NonZeroU64) -> Result<TargetWriter> {
568        Ok(if size.get() > self.memory_threshold.as_u64() {
569            let file = self.storage.context().temp_files().unnamed_file().open()?;
570            TargetWriter::File(std::io::BufWriter::new(file))
571        } else {
572            TargetWriter::Bytes(BytesMut::new().writer())
573        })
574    }
575
576    pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
577        self.storage.block_storage().estimate_archive_id(mc_seqno)
578    }
579}
580
/// Conversion into a boxed [`ArchiveClient`].
pub trait IntoArchiveClient {
    fn into_archive_client(self) -> Arc<dyn ArchiveClient>;
}
584
/// Single-client tuple: just box the inner client.
impl<T: ArchiveClient> IntoArchiveClient for (T,) {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self.0)
    }
}
591
592impl<T1: ArchiveClient, T2: ArchiveClient> IntoArchiveClient for (T1, Option<T2>) {
593    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
594        let (primary, secondary) = self;
595        match secondary {
596            None => Arc::new(primary),
597            Some(secondary) => Arc::new(HybridArchiveClient::new(primary, secondary)),
598        }
599    }
600}
601
/// An archive located by a client, with a deferred download action.
pub struct FoundArchive<'a> {
    pub archive_id: u64,
    /// Starts the actual download when invoked.
    pub download: Box<dyn FnOnce() -> BoxFuture<'a, Result<ArchiveResponse>> + Send + 'a>,
}
606
/// Source of block archives (p2p overlay, S3, or a hybrid).
#[async_trait]
pub trait ArchiveClient: Send + Sync + 'static {
    /// Locates an archive containing the mc block with `mc_seqno`.
    ///
    /// Returns `Ok(None)` when no archive is available for this seqno.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>>;
}
615
#[async_trait]
impl ArchiveClient for BlockchainRpcClient {
    /// Resolves an archive via the p2p overlay.
    ///
    /// Returns `None` when peers report the block as too new to be archived.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // NOTE: `self.find_archive(mc_seqno)` is the client's inherent
        // method (different signature), not this trait method.
        Ok(match self.find_archive(mc_seqno).await? {
            blockchain_rpc::PendingArchiveResponse::Found(found) => Some(FoundArchive {
                archive_id: found.id,
                download: Box::new(move || {
                    Box::pin(async move {
                        let neighbour = found.neighbour.clone();

                        // Choose memory vs temp-file output based on the
                        // advertised archive size.
                        let output = ctx.get_archive_writer(found.size)?;
                        let writer = self.download_archive(found, output).await?;

                        Ok(ArchiveResponse {
                            writer,
                            neighbour: Some(neighbour),
                        })
                    })
                }),
            }),
            blockchain_rpc::PendingArchiveResponse::TooNew => None,
        })
    }
}
644
/// A bare RPC client can be used as an archive client directly.
impl IntoArchiveClient for BlockchainRpcClient {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
651
#[cfg(feature = "s3")]
#[async_trait]
impl ArchiveClient for S3Client {
    /// Resolves an archive from S3 storage.
    ///
    /// The archive id is estimated locally from `mc_seqno`; returns `None`
    /// when no object with that id exists.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        let archive_id = ctx.estimate_archive_id(mc_seqno);

        let Some(info) = self.get_archive_info(archive_id).await? else {
            return Ok(None);
        };

        Ok(Some(FoundArchive {
            archive_id: info.archive_id as u64,
            download: Box::new(move || {
                Box::pin(async move {
                    // Choose memory vs temp-file output based on object size.
                    let output = ctx.get_archive_writer(info.size)?;
                    let writer = self.download_archive(info.archive_id, output).await?;

                    Ok(ArchiveResponse {
                        writer,
                        // No neighbour to credit or punish for S3 downloads.
                        neighbour: None,
                    })
                })
            }),
        }))
    }
}
682
/// A bare S3 client can be used as an archive client directly.
#[cfg(feature = "s3")]
impl IntoArchiveClient for S3Client {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
690
/// Archive client combining two sources, remembering which side served the
/// last successful lookup.
pub struct HybridArchiveClient<T1, T2> {
    primary: T1,
    secondary: T2,
    /// Side that answered successfully last time (if any).
    prefer: HybridArchiveClientState,
}
696
697impl<T1, T2> HybridArchiveClient<T1, T2> {
698    pub fn new(primary: T1, secondary: T2) -> Self {
699        Self {
700            primary,
701            secondary,
702            prefer: Default::default(),
703        }
704    }
705}
706
#[async_trait]
impl<T1, T2> ArchiveClient for HybridArchiveClient<T1, T2>
where
    T1: ArchiveClient,
    T2: ArchiveClient,
{
    /// Looks up an archive using whichever side answered successfully last
    /// time; otherwise races both sides and remembers the winner.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // FIXME: There should be a better way of writing this.

        // Fast path: query only the previously successful side.
        if let Some(prefer) = self.prefer.get() {
            tracing::debug!(mc_seqno, ?prefer);
            match prefer {
                HybridArchiveClientPart::Primary => {
                    let res = self.primary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
                HybridArchiveClientPart::Secondary => {
                    let res = self.secondary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
            }
        }

        // The preferred side failed (or none was set): reset and race both.
        // NOTE(review): the preferred side is queried a second time below —
        // confirm the duplicate request is acceptable.
        self.prefer.set(None);

        let primary = pin!(self.primary.find_archive(mc_seqno, ctx));
        let secondary = pin!(self.secondary.find_archive(mc_seqno, ctx));
        match futures_util::future::select(primary, secondary).await {
            futures_util::future::Either::Left((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("primary archive client error: {e:?}"),
                }
                // Primary lost or failed; fall back to the secondary's result.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                    }
                })
            }
            futures_util::future::Either::Right((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("secondary archive client error: {e:?}"),
                }
                // Secondary lost or failed; fall back to the primary's result.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                    }
                })
            }
        }
    }
}
776
/// Atomic encoding of `Option<HybridArchiveClientPart>`:
/// 0 = none, 1 = primary, 2 = secondary.
#[derive(Default)]
struct HybridArchiveClientState(AtomicU8);
779
780impl HybridArchiveClientState {
781    fn get(&self) -> Option<HybridArchiveClientPart> {
782        match self.0.load(Ordering::Acquire) {
783            1 => Some(HybridArchiveClientPart::Primary),
784            2 => Some(HybridArchiveClientPart::Secondary),
785            _ => None,
786        }
787    }
788
789    fn set(&self, value: Option<HybridArchiveClientPart>) {
790        let value = match value {
791            None => 0,
792            Some(HybridArchiveClientPart::Primary) => 1,
793            Some(HybridArchiveClientPart::Secondary) => 2,
794        };
795        self.0.store(value, Ordering::Release);
796    }
797}
798
/// Identifies one side of a [`HybridArchiveClient`].
#[derive(Debug, Clone, Copy)]
enum HybridArchiveClientPart {
    Primary,
    Secondary,
}
804
impl std::ops::Not for HybridArchiveClientPart {
    type Output = Self;

    /// Returns the opposite side.
    #[inline]
    fn not(self) -> Self::Output {
        match self {
            Self::Primary => Self::Secondary,
            Self::Secondary => Self::Primary,
        }
    }
}
816
/// Tracks sync progress to periodically log lag, rate and ETA.
struct SyncTracker {
    inner: parking_lot::Mutex<SyncTrackerInner>,
}
820
/// Mutable state behind [`SyncTracker`].
struct SyncTrackerInner {
    /// Sliding window of recent blocks, in milliseconds.
    window: std::collections::VecDeque<(u32, u64, u64)>, // (seqno, gen time, insert time)
    /// Archive key we last logged for (used to log at most once per archive).
    last_archive_key: Option<u32>,
}
825
impl SyncTracker {
    /// Number of blocks to track for ETA calculation
    const WINDOW_SIZE: usize = 300;

    /// Creates a tracker with an empty window.
    fn new() -> Self {
        Self {
            inner: parking_lot::Mutex::new(SyncTrackerInner {
                window: std::collections::VecDeque::with_capacity(Self::WINDOW_SIZE),
                last_archive_key: None,
            }),
        }
    }

    /// Records `block` in the sliding window and, at most once per archive,
    /// logs sync progress (current lag, sync rate and ETA).
    ///
    /// The `Option<()>` return value exists only for `?`-based early exits;
    /// callers ignore it.
    fn try_log(&self, archive_key: u32, block: &BlockStuffAug) -> Option<()> {
        let mut inner = self.inner.lock();

        let seqno = block.id().seqno;

        // Check if blocks are sequential. If not - clear buffer (provider switch or gap)
        if let Some((last_seqno, _, _)) = inner.window.back()
            && seqno != last_seqno + 1
        {
            inner.window.clear();
            inner.last_archive_key = None;
        }

        let block_info = block.data.load_info().ok()?;

        let now = tycho_util::time::now_millis();
        // Block generation time in milliseconds.
        let gen_utime = (block_info.gen_utime as u64) * 1000 + (block_info.gen_utime_ms as u64);
        inner.window.push_back((seqno, gen_utime, now));

        // Keep at most WINDOW_SIZE entries.
        if inner.window.len() > Self::WINDOW_SIZE {
            inner.window.pop_front();
        }

        // Check if we are ready to log (log on each archive)
        if inner.last_archive_key == Some(archive_key) {
            return None;
        }
        inner.last_archive_key = Some(archive_key);

        let (_, first_gen_time, first_insert_time) = inner.window.front()?;
        let (_, last_gen_time, last_insert_time) = inner.window.back()?;

        // Skip degenerate windows to avoid division by zero below.
        if first_gen_time >= last_gen_time || first_insert_time >= last_insert_time {
            return None;
        }

        let lag_ms = now.saturating_sub(*last_gen_time);

        // Chain time processed per unit of wall-clock time.
        let sync_rate =
            (last_gen_time - first_gen_time) as f64 / (last_insert_time - first_insert_time) as f64;

        // NOTE: We use (sync_rate - 1) cause while syncing the blockchain continues to grow
        let eta_ms = if sync_rate > 1.0 {
            (lag_ms as f64 / (sync_rate - 1.0)) as u64
        } else {
            lag_ms // if sync_rate <= 1.0, we're falling behind - just show current lag
        };

        tracing::info!(
            seqno = block.id().seqno,
            lag = %humantime::format_duration(Duration::from_millis(lag_ms)),
            rate = format!("{:.2}x", sync_rate),
            eta = %humantime::format_duration(Duration::from_millis(eta_ms)),
            "sync progress"
        );

        Some(())
    }
}