tycho_core/block_strider/provider/
archive_provider.rs

1use std::collections::{BTreeMap, btree_map};
2use std::io::Seek;
3use std::num::NonZeroU64;
4use std::pin::pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::time::Duration;
8
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use bytes::{BufMut, Bytes, BytesMut};
12use bytesize::ByteSize;
13use futures_util::future::BoxFuture;
14use serde::{Deserialize, Serialize};
15use tokio::sync::watch;
16use tokio::task::AbortHandle;
17use tycho_block_util::archive::Archive;
18use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
19use tycho_storage::fs::MappedFile;
20use tycho_types::models::BlockId;
21
22use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
23use crate::blockchain_rpc;
24use crate::blockchain_rpc::BlockchainRpcClient;
25use crate::overlay_client::{Neighbour, PunishReason};
26#[cfg(feature = "s3")]
27use crate::s3::S3Client;
28use crate::storage::CoreStorage;
29
/// Settings of the `ArchiveBlockProvider`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Size threshold for downloaded archives: an archive whose announced size
    /// is greater than this value is written to a temporary file, otherwise it
    /// is buffered in memory (see `ArchiveDownloadContext::get_archive_writer`).
    pub max_archive_to_memory_size: ByteSize,
}
35
36impl Default for ArchiveBlockProviderConfig {
37    fn default() -> Self {
38        Self {
39            max_archive_to_memory_size: ByteSize::mb(100),
40        }
41    }
42}
43
/// Block provider that serves blocks extracted from downloaded archives.
///
/// Clones share the same inner state through an `Arc`.
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
49
50impl ArchiveBlockProvider {
51    pub fn new(
52        client: impl IntoArchiveClient,
53        storage: CoreStorage,
54        config: ArchiveBlockProviderConfig,
55    ) -> Self {
56        let proof_checker = ProofChecker::new(storage.clone());
57
58        Self {
59            inner: Arc::new(Inner {
60                client: client.into_archive_client(),
61                proof_checker,
62
63                known_archives: parking_lot::Mutex::new(Default::default()),
64
65                storage,
66                config,
67            }),
68        }
69    }
70
71    async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
72        let this = self.inner.as_ref();
73
74        let next_mc_seqno = block_id.seqno + 1;
75
76        loop {
77            let Some((archive_key, info)) = this.get_archive(next_mc_seqno).await else {
78                tracing::warn!(prev_block_id = ?block_id, "archive not found");
79                break None;
80            };
81
82            let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
83                tracing::error!(
84                    "received archive does not contain mc block with seqno {next_mc_seqno}"
85                );
86                this.remove_archive_if_same(archive_key, &info);
87                if let Some(from) = &info.from {
88                    from.punish(PunishReason::Malicious);
89                }
90                continue;
91            };
92
93            match self
94                .checked_get_entry_by_id(&info.archive, block_id, block_id)
95                .await
96            {
97                Ok(block) => return Some(Ok(block.clone())),
98                Err(e) => {
99                    tracing::error!(archive_key, %block_id, "invalid archive entry: {e:?}");
100                    this.remove_archive_if_same(archive_key, &info);
101                    if let Some(from) = &info.from {
102                        from.punish(PunishReason::Malicious);
103                    }
104                }
105            }
106        }
107    }
108
109    async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
110        let this = self.inner.as_ref();
111
112        let block_id = block_id_relation.block_id;
113        let mc_block_id = block_id_relation.mc_block_id;
114
115        loop {
116            let Some((archive_key, info)) = this.get_archive(mc_block_id.seqno).await else {
117                tracing::warn!("shard block is too new for archives");
118
119                // NOTE: This is a strange situation, but if we wait a bit it might go away.
120                tokio::time::sleep(Duration::from_secs(1)).await;
121                continue;
122            };
123
124            match self
125                .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
126                .await
127            {
128                Ok(block) => return Some(Ok(block.clone())),
129                Err(e) => {
130                    tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e:?}");
131                    this.remove_archive_if_same(archive_key, &info);
132                    if let Some(from) = &info.from {
133                        from.punish(PunishReason::Malicious);
134                    }
135                }
136            }
137        }
138    }
139
140    async fn checked_get_entry_by_id(
141        &self,
142        archive: &Arc<Archive>,
143        mc_block_id: &BlockId,
144        block_id: &BlockId,
145    ) -> Result<BlockStuffAug> {
146        let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
147            Ok(entry) => entry,
148            Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
149        };
150
151        self.inner
152            .proof_checker
153            .check_proof(CheckProof {
154                mc_block_id,
155                block: &block,
156                proof,
157                queue_diff,
158                store_on_success: true,
159            })
160            .await?;
161
162        Ok(block)
163    }
164}
165
/// Shared state behind `ArchiveBlockProvider`.
struct Inner {
    /// Node storage; cloned into every archive downloader.
    storage: CoreStorage,

    /// Source of archives.
    client: Arc<dyn ArchiveClient>,
    /// Checks blocks extracted from archives before they are returned.
    proof_checker: ProofChecker,

    /// Archive slots (downloaded or still pending), keyed by the seqno the
    /// download was started for.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    /// Provider settings; supplies the in-memory size threshold for downloads.
    config: ArchiveBlockProviderConfig,
}
176
177impl Inner {
    /// Finds or downloads an archive for the masterchain seqno `mc_seqno`.
    ///
    /// Returns the archive key together with the archive info, or `None` when
    /// the download task finished without an archive.
    ///
    /// The known archives are scanned in key order: a downloaded archive that
    /// contains `mc_seqno` is returned right away, while the first pending slot
    /// encountered is awaited. If neither is found, a new download task keyed by
    /// `mc_seqno` is spawned. A cancelled task makes the whole procedure restart.
    ///
    /// NOTE(review): the scan stops at the first pending slot regardless of the
    /// seqno it was started for, so the returned archive is not guaranteed to
    /// contain `mc_seqno` — callers appear to re-check; confirm this is intended.
    async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
        loop {
            // The map guard lives only inside this labeled block, so it is
            // released before any `.await` below.
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                // Search for the downloaded archive or for an existing downloader task.
                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Start downloading otherwise
                let task = self.make_downloader().spawn(mc_seqno);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait until the pending task is finished or cancelled
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // An error here means the sender side is gone; stop waiting.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Replace pending with downloaded
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                    // Do nothing if the entry was already removed.
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        // Task was either cancelled or received `TooNew` so no archive received.
                        entry.remove();
                    }
                    Some(info) => {
                        // Task was finished with a non-empty result so store it.
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            // Avoid spinloop just in case.
            tokio::task::yield_now().await;
        }
    }
246
247    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
248        match self.known_archives.lock().entry(archive_key) {
249            btree_map::Entry::Vacant(_) => false,
250            btree_map::Entry::Occupied(entry) => {
251                if matches!(
252                    entry.get(),
253                    ArchiveSlot::Downloaded(info)
254                    if Arc::ptr_eq(&info.archive, &prev.archive)
255                ) {
256                    entry.remove();
257                    true
258                } else {
259                    false
260                }
261            }
262        }
263    }
264
265    fn make_downloader(&self) -> ArchiveDownloader {
266        ArchiveDownloader {
267            client: self.client.clone(),
268            storage: self.storage.clone(),
269            memory_threshold: self.config.max_archive_to_memory_size,
270        }
271    }
272
273    fn clear_outdated_archives(&self, bound: u32) {
274        let mut entries_remaining = 0usize;
275        let mut entries_removed = 0usize;
276
277        let mut guard = self.known_archives.lock();
278        guard.retain(|_, archive| {
279            let retain;
280            match archive {
281                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
282                    None => retain = false,
283                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
284                },
285                ArchiveSlot::Pending(task) => {
286                    retain = task
287                        .archive_key
288                        .saturating_add(Archive::MAX_MC_BLOCKS_PER_ARCHIVE)
289                        >= bound;
290                    if !retain {
291                        task.abort_handle.abort();
292                    }
293                }
294            };
295
296            entries_remaining += retain as usize;
297            entries_removed += !retain as usize;
298            retain
299        });
300        drop(guard);
301
302        tracing::debug!(
303            entries_remaining,
304            entries_removed,
305            bound,
306            "removed known archives"
307        );
308    }
309}
310
/// Archive slots ordered by archive key (the seqno a download was started for).
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;
312
/// State of a single entry in the map of known archives.
enum ArchiveSlot {
    /// The archive was downloaded and parsed.
    Downloaded(ArchiveInfo),
    /// A background task is still downloading the archive.
    Pending(ArchiveTask),
}
317
/// A parsed archive together with its origin.
#[derive(Clone)]
struct ArchiveInfo {
    /// Neighbour the archive was downloaded from; `None` for S3.
    from: Option<Neighbour>,
    /// The parsed archive, shared between all users.
    archive: Arc<Archive>,
}
323
/// Self-contained set of dependencies needed to download one archive
/// in a background task.
struct ArchiveDownloader {
    /// Source of archives.
    client: Arc<dyn ArchiveClient>,
    /// Node storage, exposed to the client through `ArchiveDownloadContext`.
    storage: CoreStorage,
    /// Archives with an announced size above this value go to a temporary file.
    memory_threshold: ByteSize,
}
329
330impl ArchiveDownloader {
    /// Spawns a background task that downloads the archive for `mc_seqno`
    /// and returns a handle for observing and aborting it.
    ///
    /// The task retries failed downloads every `INTERVAL` until one succeeds,
    /// then publishes `ArchiveTaskState::Finished` through the watch channel.
    /// If the task goes away without publishing a result, the channel state
    /// is switched to `ArchiveTaskState::Cancelled` instead.
    fn spawn(self, mc_seqno: u32) -> ArchiveTask {
        // TODO: Use a proper backoff here?
        const INTERVAL: Duration = Duration::from_secs(1);

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // Runs when the sender is dropped without being taken out of the
        // guard, i.e. when the task did not reach the success branch below.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        // NOTE: Use a separate downloader to prevent reference cycles
        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Defuse the guard so that the cancellation handler does not run.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        tracing::error!(mc_seqno, "failed to preload archive {e:?}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            // The task is aborted once the last clone of this handle is dropped.
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }
374
375    async fn try_download(&self, mc_seqno: u32) -> Result<Option<ArchiveInfo>> {
376        let ctx = ArchiveDownloadContext {
377            storage: &self.storage,
378            memory_threshold: self.memory_threshold,
379        };
380        let Some(found) = self.client.find_archive(mc_seqno, ctx).await? else {
381            return Ok(None);
382        };
383        let res = (found.download)().await?;
384
385        let span = tracing::Span::current();
386        tokio::task::spawn_blocking(move || {
387            let _span = span.enter();
388
389            let bytes = res.writer.try_freeze()?;
390
391            let archive = match Archive::new(bytes) {
392                Ok(array) => array,
393                Err(e) => {
394                    if let Some(neighbour) = res.neighbour {
395                        neighbour.punish(PunishReason::Malicious);
396                    }
397                    return Err(e);
398                }
399            };
400
401            if let Err(e) = archive.check_mc_blocks_range() {
402                // TODO: Punish a bit less for missing mc blocks?
403                if let Some(neighbour) = res.neighbour {
404                    neighbour.punish(PunishReason::Malicious);
405                }
406                return Err(e);
407            }
408
409            Ok(ArchiveInfo {
410                archive: Arc::new(archive),
411                from: res.neighbour,
412            })
413        })
414        .await?
415        .map(Some)
416    }
417}
418
/// Cloneable handle to a background archive download.
#[derive(Clone)]
struct ArchiveTask {
    /// Seqno the download was started for; also the key of its map slot.
    archive_key: u32,
    /// Receives the state published by the download task.
    rx: watch::Receiver<ArchiveTaskState>,
    /// Shared abort handle; the task is aborted when the last clone is dropped.
    abort_handle: Arc<AbortOnDrop>,
}
425
426#[repr(transparent)]
427struct AbortOnDrop(AbortHandle);
428
429impl std::ops::Deref for AbortOnDrop {
430    type Target = AbortHandle;
431
432    #[inline]
433    fn deref(&self) -> &Self::Target {
434        &self.0
435    }
436}
437
438impl Drop for AbortOnDrop {
439    fn drop(&mut self) {
440        self.0.abort();
441    }
442}
443
/// State published by a download task through its watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// Initial state: the task has not produced a result yet.
    #[default]
    None,
    /// The task completed; `None` inside means that no archive was found.
    Finished(Option<ArchiveInfo>),
    /// The task went away without producing a result.
    Cancelled,
}
451
452impl BlockProvider for ArchiveBlockProvider {
453    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
454    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
455    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;
456
457    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
458        Box::pin(self.get_next_block_impl(prev_block_id))
459    }
460
461    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
462        Box::pin(self.get_block_impl(block_id_relation))
463    }
464
465    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
466        self.inner.clear_outdated_archives(mc_seqno);
467        futures_util::future::ready(Ok(()))
468    }
469}
470
/// Destination for the bytes of an archive being downloaded.
pub enum ArchiveWriter {
    /// Buffered writer over a file.
    File(std::io::BufWriter<std::fs::File>),
    /// Writer over an in-memory buffer.
    Bytes(bytes::buf::Writer<BytesMut>),
}
475
476impl ArchiveWriter {
477    fn try_freeze(self) -> Result<Bytes, std::io::Error> {
478        match self {
479            Self::File(file) => match file.into_inner() {
480                Ok(mut file) => {
481                    file.seek(std::io::SeekFrom::Start(0))?;
482                    MappedFile::from_existing_file(file).map(Bytes::from_owner)
483                }
484                Err(e) => Err(e.into_error()),
485            },
486            Self::Bytes(data) => Ok(data.into_inner().freeze()),
487        }
488    }
489}
490
491impl std::io::Write for ArchiveWriter {
492    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
493        match self {
494            Self::File(writer) => writer.write(buf),
495            Self::Bytes(writer) => writer.write(buf),
496        }
497    }
498
499    fn flush(&mut self) -> std::io::Result<()> {
500        match self {
501            Self::File(writer) => writer.flush(),
502            Self::Bytes(writer) => writer.flush(),
503        }
504    }
505
506    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
507        match self {
508            Self::File(writer) => writer.write_all(buf),
509            Self::Bytes(writer) => writer.write_all(buf),
510        }
511    }
512
513    fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> std::io::Result<()> {
514        match self {
515            Self::File(writer) => writer.write_fmt(fmt),
516            Self::Bytes(writer) => writer.write_fmt(fmt),
517        }
518    }
519}
520
/// Result of a finished archive download.
pub struct ArchiveResponse {
    /// Writer holding the downloaded archive data.
    writer: ArchiveWriter,
    /// Neighbour the data was downloaded from, if there was one.
    neighbour: Option<Neighbour>,
}
525
/// Borrowed environment handed to archive clients for a single lookup.
#[derive(Clone, Copy)]
pub struct ArchiveDownloadContext<'a> {
    /// Node storage (used for temporary files and block state lookups).
    pub storage: &'a CoreStorage,
    /// Archives with an announced size above this value are written to a file.
    pub memory_threshold: ByteSize,
}
531
532impl<'a> ArchiveDownloadContext<'a> {
533    pub fn get_archive_writer(&self, size: NonZeroU64) -> Result<ArchiveWriter> {
534        Ok(if size.get() > self.memory_threshold.as_u64() {
535            let file = self.storage.context().temp_files().unnamed_file().open()?;
536            ArchiveWriter::File(std::io::BufWriter::new(file))
537        } else {
538            ArchiveWriter::Bytes(BytesMut::new().writer())
539        })
540    }
541
542    pub fn compute_archive_id(&self, mc_seqno: u32) -> Result<Option<u32>> {
543        const BLOCKS_PER_ARCHIVE: u32 = Archive::MAX_MC_BLOCKS_PER_ARCHIVE;
544
545        let storage = self.storage;
546
547        // Next block should not be too far in the future.
548        let last_mc_seqno = storage
549            .node_state()
550            .load_last_mc_block_id()
551            .context("no blocks applied yet")?
552            .seqno;
553        if mc_seqno > last_mc_seqno.saturating_add(BLOCKS_PER_ARCHIVE) {
554            return Ok(None);
555        }
556
557        // Archive id must be aligned to the latest key block id.
558        let prev_key_block_seqno = storage
559            .block_handle_storage()
560            .find_prev_key_block(mc_seqno)
561            .map(|handle| handle.id().seqno)
562            .unwrap_or_default();
563
564        // Key block causes the archive to be split earlier,
565        // after that archive sizes start all over again.
566        // Example:
567        // 001 101 201 301 401 <keyblock at 450> 451 551 651 751 ...
568        let archive_id_base = prev_key_block_seqno + 1;
569
570        let mut archive_id = mc_seqno;
571        if archive_id_base < mc_seqno {
572            // Example:
573            // archive_id_base = 151
574            // mc_seqno = 290
575            // archive_id = 290 - (290 - 151) % 100 = 251
576            archive_id -= (mc_seqno - archive_id_base) % BLOCKS_PER_ARCHIVE;
577        }
578
579        Ok(Some(archive_id))
580    }
581}
582
/// Conversion of a client (or a combination of clients) into a shared,
/// type-erased [`ArchiveClient`].
pub trait IntoArchiveClient {
    /// Consumes `self` and returns the resulting archive client.
    fn into_archive_client(self) -> Arc<dyn ArchiveClient>;
}
586
587impl<T: ArchiveClient> IntoArchiveClient for (T,) {
588    #[inline]
589    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
590        Arc::new(self.0)
591    }
592}
593
594impl<T1: ArchiveClient, T2: ArchiveClient> IntoArchiveClient for (T1, Option<T2>) {
595    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
596        let (primary, secondary) = self;
597        match secondary {
598            None => Arc::new(primary),
599            Some(secondary) => Arc::new(HybridArchiveClient::new(primary, secondary)),
600        }
601    }
602}
603
/// An archive located by an [`ArchiveClient`] but not downloaded yet.
pub struct FoundArchive<'a> {
    /// Identifier of the archive as reported by the client.
    pub archive_id: u64,
    /// Deferred download: calling it starts fetching the archive data.
    pub download: Box<dyn FnOnce() -> BoxFuture<'a, Result<ArchiveResponse>> + Send + 'a>,
}
608
/// Source that can locate and download archives.
#[async_trait]
pub trait ArchiveClient: Send + Sync + 'static {
    /// Looks for an archive covering the masterchain seqno `mc_seqno`.
    ///
    /// Returns `Ok(None)` when the client has no such archive, and
    /// `Ok(Some(..))` with a deferred download otherwise.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>>;
}
617
618#[async_trait]
619impl ArchiveClient for BlockchainRpcClient {
620    async fn find_archive<'a>(
621        &'a self,
622        mc_seqno: u32,
623        ctx: ArchiveDownloadContext<'a>,
624    ) -> Result<Option<FoundArchive<'a>>> {
625        Ok(match self.find_archive(mc_seqno).await? {
626            blockchain_rpc::PendingArchiveResponse::Found(found) => Some(FoundArchive {
627                archive_id: found.id,
628                download: Box::new(move || {
629                    Box::pin(async move {
630                        let neighbour = found.neighbour.clone();
631
632                        let output = ctx.get_archive_writer(found.size)?;
633                        let writer = self.download_archive(found, output).await?;
634
635                        Ok(ArchiveResponse {
636                            writer,
637                            neighbour: Some(neighbour),
638                        })
639                    })
640                }),
641            }),
642            blockchain_rpc::PendingArchiveResponse::TooNew => None,
643        })
644    }
645}
646
647impl IntoArchiveClient for BlockchainRpcClient {
648    #[inline]
649    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
650        Arc::new(self)
651    }
652}
653
/// Archive lookup through S3.
#[cfg(feature = "s3")]
#[async_trait]
impl ArchiveClient for S3Client {
    /// Computes the archive id locally, then asks S3 about that archive.
    ///
    /// Returns `Ok(None)` when the id cannot be computed for `mc_seqno`
    /// or when S3 has no info for the archive.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        let archive_id = match ctx.compute_archive_id(mc_seqno)? {
            Some(archive_id) => archive_id,
            None => return Ok(None),
        };
        let info = match self.get_archive_info(archive_id).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        Ok(Some(FoundArchive {
            archive_id: info.archive_id as u64,
            download: Box::new(move || {
                Box::pin(async move {
                    let sink = ctx.get_archive_writer(info.size)?;
                    let writer = self.download_archive(info.archive_id, sink).await?;

                    // There is no neighbour for archives fetched from S3.
                    Ok(ArchiveResponse {
                        writer,
                        neighbour: None,
                    })
                })
            }),
        }))
    }
}
684
/// The S3 client is used as an archive client directly.
#[cfg(feature = "s3")]
impl IntoArchiveClient for S3Client {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::<Self>::new(self)
    }
}
692
/// Archive client that combines two underlying clients and remembers
/// which of them delivered an archive last.
pub struct HybridArchiveClient<T1, T2> {
    primary: T1,
    secondary: T2,
    /// The client to try first on the next lookup, if any.
    prefer: HybridArchiveClientState,
}
698
699impl<T1, T2> HybridArchiveClient<T1, T2> {
700    pub fn new(primary: T1, secondary: T2) -> Self {
701        Self {
702            primary,
703            secondary,
704            prefer: Default::default(),
705        }
706    }
707}
708
709#[async_trait]
710impl<T1, T2> ArchiveClient for HybridArchiveClient<T1, T2>
711where
712    T1: ArchiveClient,
713    T2: ArchiveClient,
714{
715    async fn find_archive<'a>(
716        &'a self,
717        mc_seqno: u32,
718        ctx: ArchiveDownloadContext<'a>,
719    ) -> Result<Option<FoundArchive<'a>>> {
720        // FIXME: There should be a better way of writing this.
721
722        if let Some(prefer) = self.prefer.get() {
723            tracing::debug!(mc_seqno, ?prefer);
724            match prefer {
725                HybridArchiveClientPart::Primary => {
726                    let res = self.primary.find_archive(mc_seqno, ctx).await;
727                    if matches!(&res, Ok(Some(_))) {
728                        return res;
729                    }
730                }
731                HybridArchiveClientPart::Secondary => {
732                    let res = self.secondary.find_archive(mc_seqno, ctx).await;
733                    if matches!(&res, Ok(Some(_))) {
734                        return res;
735                    }
736                }
737            }
738        }
739
740        self.prefer.set(None);
741
742        let primary = pin!(self.primary.find_archive(mc_seqno, ctx));
743        let secondary = pin!(self.secondary.find_archive(mc_seqno, ctx));
744        match futures_util::future::select(primary, secondary).await {
745            futures_util::future::Either::Left((found, other)) => {
746                match found {
747                    Ok(Some(found)) => {
748                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
749                        return Ok(Some(found));
750                    }
751                    Ok(None) => {}
752                    Err(e) => tracing::warn!("primary archive client error: {e:?}"),
753                }
754                other.await.inspect(|res| {
755                    if res.is_some() {
756                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
757                    }
758                })
759            }
760            futures_util::future::Either::Right((found, other)) => {
761                match found {
762                    Ok(Some(found)) => {
763                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
764                        return Ok(Some(found));
765                    }
766                    Ok(None) => {}
767                    Err(e) => tracing::warn!("secondary archive client error: {e:?}"),
768                }
769                other.await.inspect(|res| {
770                    if res.is_some() {
771                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
772                    }
773                })
774            }
775        }
776    }
777}
778
/// Atomic cell holding the currently preferred part of a hybrid client.
///
/// Encoding of the inner byte: `1` is primary, `2` is secondary and any
/// other value means "no preference".
#[derive(Default)]
struct HybridArchiveClientState(AtomicU8);

impl HybridArchiveClientState {
    /// Reads the current preference.
    fn get(&self) -> Option<HybridArchiveClientPart> {
        let raw = self.0.load(Ordering::Acquire);
        if raw == 1 {
            Some(HybridArchiveClientPart::Primary)
        } else if raw == 2 {
            Some(HybridArchiveClientPart::Secondary)
        } else {
            None
        }
    }

    /// Stores a new preference (`None` clears it).
    fn set(&self, value: Option<HybridArchiveClientPart>) {
        let raw: u8 = value.map_or(0, |part| match part {
            HybridArchiveClientPart::Primary => 1,
            HybridArchiveClientPart::Secondary => 2,
        });
        self.0.store(raw, Ordering::Release);
    }
}

/// One of the two clients inside a hybrid archive client.
#[derive(Debug, Clone, Copy)]
enum HybridArchiveClientPart {
    Primary,
    Secondary,
}

impl std::ops::Not for HybridArchiveClientPart {
    type Output = Self;

    /// Returns the opposite part.
    #[inline]
    fn not(self) -> Self::Output {
        if matches!(self, Self::Primary) {
            Self::Secondary
        } else {
            Self::Primary
        }
    }
}