1use std::collections::{BTreeMap, btree_map};
2use std::io::Seek;
3use std::num::NonZeroU64;
4use std::pin::pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::time::Duration;
8
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use bytes::{BufMut, Bytes, BytesMut};
12use bytesize::ByteSize;
13use futures_util::future::BoxFuture;
14use serde::{Deserialize, Serialize};
15use tokio::sync::watch;
16use tokio::task::AbortHandle;
17use tycho_block_util::archive::Archive;
18use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
19use tycho_storage::fs::MappedFile;
20use tycho_types::models::BlockId;
21
22use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
23use crate::blockchain_rpc;
24use crate::blockchain_rpc::BlockchainRpcClient;
25use crate::overlay_client::{Neighbour, PunishReason};
26#[cfg(feature = "s3")]
27use crate::s3::S3Client;
28use crate::storage::CoreStorage;
29
/// Configuration for [`ArchiveBlockProvider`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Archives larger than this are spooled to a temporary file instead of
    /// being buffered in memory (see `ArchiveDownloadContext::get_archive_writer`).
    pub max_archive_to_memory_size: ByteSize,
}
35
36impl Default for ArchiveBlockProviderConfig {
37 fn default() -> Self {
38 Self {
39 max_archive_to_memory_size: ByteSize::mb(100),
40 }
41 }
42}
43
/// Block provider that serves blocks out of downloaded block archives.
///
/// Cheap to clone; all state lives behind the shared [`Inner`].
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
49
50impl ArchiveBlockProvider {
51 pub fn new(
52 client: impl IntoArchiveClient,
53 storage: CoreStorage,
54 config: ArchiveBlockProviderConfig,
55 ) -> Self {
56 let proof_checker = ProofChecker::new(storage.clone());
57
58 Self {
59 inner: Arc::new(Inner {
60 client: client.into_archive_client(),
61 proof_checker,
62
63 known_archives: parking_lot::Mutex::new(Default::default()),
64
65 storage,
66 config,
67 }),
68 }
69 }
70
71 async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
72 let this = self.inner.as_ref();
73
74 let next_mc_seqno = block_id.seqno + 1;
75
76 loop {
77 let Some((archive_key, info)) = this.get_archive(next_mc_seqno).await else {
78 tracing::warn!(prev_block_id = ?block_id, "archive not found");
79 break None;
80 };
81
82 let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
83 tracing::error!(
84 "received archive does not contain mc block with seqno {next_mc_seqno}"
85 );
86 this.remove_archive_if_same(archive_key, &info);
87 if let Some(from) = &info.from {
88 from.punish(PunishReason::Malicious);
89 }
90 continue;
91 };
92
93 match self
94 .checked_get_entry_by_id(&info.archive, block_id, block_id)
95 .await
96 {
97 Ok(block) => return Some(Ok(block.clone())),
98 Err(e) => {
99 tracing::error!(archive_key, %block_id, "invalid archive entry: {e:?}");
100 this.remove_archive_if_same(archive_key, &info);
101 if let Some(from) = &info.from {
102 from.punish(PunishReason::Malicious);
103 }
104 }
105 }
106 }
107 }
108
109 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
110 let this = self.inner.as_ref();
111
112 let block_id = block_id_relation.block_id;
113 let mc_block_id = block_id_relation.mc_block_id;
114
115 loop {
116 let Some((archive_key, info)) = this.get_archive(mc_block_id.seqno).await else {
117 tracing::warn!("shard block is too new for archives");
118
119 tokio::time::sleep(Duration::from_secs(1)).await;
121 continue;
122 };
123
124 match self
125 .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
126 .await
127 {
128 Ok(block) => return Some(Ok(block.clone())),
129 Err(e) => {
130 tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e:?}");
131 this.remove_archive_if_same(archive_key, &info);
132 if let Some(from) = &info.from {
133 from.punish(PunishReason::Malicious);
134 }
135 }
136 }
137 }
138 }
139
140 async fn checked_get_entry_by_id(
141 &self,
142 archive: &Arc<Archive>,
143 mc_block_id: &BlockId,
144 block_id: &BlockId,
145 ) -> Result<BlockStuffAug> {
146 let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
147 Ok(entry) => entry,
148 Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
149 };
150
151 self.inner
152 .proof_checker
153 .check_proof(CheckProof {
154 mc_block_id,
155 block: &block,
156 proof,
157 queue_diff,
158 store_on_success: true,
159 })
160 .await?;
161
162 Ok(block)
163 }
164}
165
/// Shared state behind [`ArchiveBlockProvider`].
struct Inner {
    storage: CoreStorage,

    // Source of archives (blockchain RPC, S3, or a hybrid of both).
    client: Arc<dyn ArchiveClient>,
    proof_checker: ProofChecker,

    // Archives (ready or still downloading), keyed by the mc seqno
    // they were requested for.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    config: ArchiveBlockProviderConfig,
}
176
impl Inner {
    /// Returns a downloaded archive containing mc block `mc_seqno`, together
    /// with its key in `known_archives`.
    ///
    /// Fast path: scans already-downloaded archives. Otherwise joins the
    /// first pending download found (or spawns a new one keyed by
    /// `mc_seqno`) and awaits its watch channel. Returns `None` when the
    /// downloader reports that no such archive exists.
    async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
        loop {
            // Under the map lock: return a ready archive, adopt an existing
            // pending task, or register a new download.
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                let task = self.make_downloader().spawn(mc_seqno);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait for the pending task to reach a terminal state.
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // Sender dropped without a terminal state == cancelled.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Publish the outcome back into the map, unless the slot was
            // concurrently removed (nothing to update then).
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        // Download produced nothing; drop the slot.
                        entry.remove();
                    }
                    Some(info) => {
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            // Cancelled (e.g. by cleanup) while we were waiting on it;
            // yield so other tasks can run, then retry from scratch.
            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            tokio::task::yield_now().await;
        }
    }

    /// Removes the entry at `archive_key` only if it still holds the same
    /// archive `Arc` as `prev` (pointer equality), so a concurrently
    /// re-downloaded replacement is never evicted by a stale caller.
    /// Returns whether an entry was removed.
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                        if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a downloader capturing the current client, storage and
    /// memory threshold.
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
        }
    }

    /// Drops archives whose last mc seqno is below `bound`; pending
    /// downloads that cannot possibly reach the bound are aborted.
    fn clear_outdated_archives(&self, bound: u32) {
        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    // An archive with no mc blocks is useless; drop it.
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                ArchiveSlot::Pending(task) => {
                    // A pending archive can cover at most MAX_MC_BLOCKS_PER_ARCHIVE
                    // seqnos past its key; below that it can never reach `bound`.
                    retain = task
                        .archive_key
                        .saturating_add(Archive::MAX_MC_BLOCKS_PER_ARCHIVE)
                        >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
310
/// Known archives keyed by the mc seqno they were requested for.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;

/// State of a single archive slot: ready, or still downloading.
enum ArchiveSlot {
    Downloaded(ArchiveInfo),
    Pending(ArchiveTask),
}
317
318#[derive(Clone)]
319struct ArchiveInfo {
320 from: Option<Neighbour>, archive: Arc<Archive>,
322}
323
/// Everything needed to download a single archive in a background task.
struct ArchiveDownloader {
    client: Arc<dyn ArchiveClient>,
    storage: CoreStorage,
    // See `ArchiveBlockProviderConfig::max_archive_to_memory_size`.
    memory_threshold: ByteSize,
}
329
impl ArchiveDownloader {
    /// Spawns a background task that retries downloading the archive for
    /// `mc_seqno` until it succeeds or is aborted.
    ///
    /// Progress is published on a watch channel. If the task is dropped
    /// before finishing (abort/panic), the scope guard publishes
    /// `Cancelled` so waiters are not left hanging forever.
    fn spawn(self, mc_seqno: u32) -> ArchiveTask {
        // Delay between retries after a failed attempt.
        const INTERVAL: Duration = Duration::from_secs(1);

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // Runs if the sender is dropped while still armed, i.e. before a
        // successful download disarms it below.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Disarm the cancellation guard before publishing
                        // the final state.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        tracing::error!(mc_seqno, "failed to preload archive {e:?}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            // Aborts the download once the last task clone is dropped.
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// Downloads and parses one archive.
    ///
    /// Returns `Ok(None)` when the client reports that no archive for
    /// `mc_seqno` exists (yet). Parsing and validation run on a blocking
    /// thread; a peer that served a malformed archive is punished.
    async fn try_download(&self, mc_seqno: u32) -> Result<Option<ArchiveInfo>> {
        let ctx = ArchiveDownloadContext {
            storage: &self.storage,
            memory_threshold: self.memory_threshold,
        };
        let Some(found) = self.client.find_archive(mc_seqno, ctx).await? else {
            return Ok(None);
        };
        let res = (found.download)().await?;

        // Keep the tracing span across the blocking thread hop.
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Turn the writer into a contiguous byte view.
            let bytes = res.writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    // Malformed payload: punish the serving peer, if known.
                    if let Some(neighbour) = res.neighbour {
                        neighbour.punish(PunishReason::Malicious);
                    }
                    return Err(e);
                }
            };

            if let Err(e) = archive.check_mc_blocks_range() {
                if let Some(neighbour) = res.neighbour {
                    neighbour.punish(PunishReason::Malicious);
                }
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: res.neighbour,
            })
        })
        .await?
        .map(Some)
    }
}
418
/// Shared handle to a background archive download.
#[derive(Clone)]
struct ArchiveTask {
    // The mc seqno this download was spawned for (its map key).
    archive_key: u32,
    // Receives the task's progress / terminal state.
    rx: watch::Receiver<ArchiveTaskState>,
    // Shared so the download is aborted only when the last clone drops.
    abort_handle: Arc<AbortOnDrop>,
}
425
/// Wrapper around [`AbortHandle`] that aborts the associated task on drop.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);

impl std::ops::Deref for AbortOnDrop {
    type Target = AbortHandle;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        self.0.abort();
    }
}
443
/// Progress of an archive download, published on a watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// Still downloading.
    #[default]
    None,
    /// Finished; `None` means no such archive exists.
    Finished(Option<ArchiveInfo>),
    /// Aborted before producing a result.
    Cancelled,
}
451
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    // Thin adapters boxing the inherent async implementations.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    // Cleanup is synchronous; the ready future only satisfies the trait.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
470
/// Destination for downloaded archive bytes: a buffered temp file for large
/// archives, an in-memory buffer for small ones.
pub enum ArchiveWriter {
    File(std::io::BufWriter<std::fs::File>),
    Bytes(bytes::buf::Writer<BytesMut>),
}
475
impl ArchiveWriter {
    /// Converts the accumulated data into a contiguous [`Bytes`] view.
    ///
    /// The file variant flushes via `into_inner`, rewinds to the start and
    /// hands the file to `MappedFile`; the in-memory variant just freezes
    /// the buffer.
    fn try_freeze(self) -> Result<Bytes, std::io::Error> {
        match self {
            Self::File(file) => match file.into_inner() {
                Ok(mut file) => {
                    // NOTE(review): presumably `MappedFile` expects the cursor
                    // at offset 0 -- confirm against its implementation.
                    file.seek(std::io::SeekFrom::Start(0))?;
                    MappedFile::from_existing_file(file).map(Bytes::from_owner)
                }
                // `into_inner` failed to flush; surface the underlying error.
                Err(e) => Err(e.into_error()),
            },
            Self::Bytes(data) => Ok(data.into_inner().freeze()),
        }
    }
}
490
491impl std::io::Write for ArchiveWriter {
492 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
493 match self {
494 Self::File(writer) => writer.write(buf),
495 Self::Bytes(writer) => writer.write(buf),
496 }
497 }
498
499 fn flush(&mut self) -> std::io::Result<()> {
500 match self {
501 Self::File(writer) => writer.flush(),
502 Self::Bytes(writer) => writer.flush(),
503 }
504 }
505
506 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
507 match self {
508 Self::File(writer) => writer.write_all(buf),
509 Self::Bytes(writer) => writer.write_all(buf),
510 }
511 }
512
513 fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> std::io::Result<()> {
514 match self {
515 Self::File(writer) => writer.write_fmt(fmt),
516 Self::Bytes(writer) => writer.write_fmt(fmt),
517 }
518 }
519}
520
/// Result of a completed archive download, before parsing.
pub struct ArchiveResponse {
    // Raw archive bytes (file- or memory-backed).
    writer: ArchiveWriter,
    // Serving peer, when the source has one (p2p); `None` otherwise.
    neighbour: Option<Neighbour>,
}
525
/// Context handed to [`ArchiveClient`] implementations for one download.
#[derive(Clone, Copy)]
pub struct ArchiveDownloadContext<'a> {
    pub storage: &'a CoreStorage,
    /// Archives above this size are written to a temp file instead of RAM.
    pub memory_threshold: ByteSize,
}
531
impl<'a> ArchiveDownloadContext<'a> {
    /// Picks a destination for an archive of the given `size`: an unnamed
    /// temp file when it exceeds the in-memory threshold, otherwise a
    /// growable in-memory buffer.
    pub fn get_archive_writer(&self, size: NonZeroU64) -> Result<ArchiveWriter> {
        Ok(if size.get() > self.memory_threshold.as_u64() {
            let file = self.storage.context().temp_files().unnamed_file().open()?;
            ArchiveWriter::File(std::io::BufWriter::new(file))
        } else {
            ArchiveWriter::Bytes(BytesMut::new().writer())
        })
    }

    /// Computes the expected id of the archive containing `mc_seqno`.
    ///
    /// Returns `Ok(None)` when `mc_seqno` is more than one full archive
    /// ahead of the last applied mc block; fails when no blocks have been
    /// applied yet.
    pub fn compute_archive_id(&self, mc_seqno: u32) -> Result<Option<u32>> {
        const BLOCKS_PER_ARCHIVE: u32 = Archive::MAX_MC_BLOCKS_PER_ARCHIVE;

        let storage = self.storage;

        let last_mc_seqno = storage
            .node_state()
            .load_last_mc_block_id()
            .context("no blocks applied yet")?
            .seqno;
        if mc_seqno > last_mc_seqno.saturating_add(BLOCKS_PER_ARCHIVE) {
            return Ok(None);
        }

        let prev_key_block_seqno = storage
            .block_handle_storage()
            .find_prev_key_block(mc_seqno)
            .map(|handle| handle.id().seqno)
            .unwrap_or_default();

        // NOTE(review): this assumes archive ids are aligned to
        // BLOCKS_PER_ARCHIVE-sized chunks anchored right after the previous
        // key block -- confirm against the archive writer's chunking rules.
        let archive_id_base = prev_key_block_seqno + 1;

        // Round `mc_seqno` down to the start of its chunk.
        let mut archive_id = mc_seqno;
        if archive_id_base < mc_seqno {
            archive_id -= (mc_seqno - archive_id_base) % BLOCKS_PER_ARCHIVE;
        }

        Ok(Some(archive_id))
    }
}
582
/// Conversion of client values (or combinations of them) into a boxed
/// [`ArchiveClient`].
pub trait IntoArchiveClient {
    fn into_archive_client(self) -> Arc<dyn ArchiveClient>;
}
586
// A single-element tuple just unwraps to its client.
impl<T: ArchiveClient> IntoArchiveClient for (T,) {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self.0)
    }
}
593
594impl<T1: ArchiveClient, T2: ArchiveClient> IntoArchiveClient for (T1, Option<T2>) {
595 fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
596 let (primary, secondary) = self;
597 match secondary {
598 None => Arc::new(primary),
599 Some(secondary) => Arc::new(HybridArchiveClient::new(primary, secondary)),
600 }
601 }
602}
603
/// A located archive plus a deferred closure that performs the actual
/// download when invoked.
pub struct FoundArchive<'a> {
    pub archive_id: u64,
    pub download: Box<dyn FnOnce() -> BoxFuture<'a, Result<ArchiveResponse>> + Send + 'a>,
}
608
/// Source of block archives (overlay peers, S3, or a hybrid).
#[async_trait]
pub trait ArchiveClient: Send + Sync + 'static {
    /// Locates the archive containing mc block `mc_seqno`.
    /// `Ok(None)` means no such archive exists (yet).
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>>;
}
617
#[async_trait]
impl ArchiveClient for BlockchainRpcClient {
    // NOTE: `self.find_archive(mc_seqno)` below resolves to the inherent
    // `BlockchainRpcClient` method, not to this trait method.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        Ok(match self.find_archive(mc_seqno).await? {
            blockchain_rpc::PendingArchiveResponse::Found(found) => Some(FoundArchive {
                archive_id: found.id,
                // Deferred: the actual transfer happens only when invoked.
                download: Box::new(move || {
                    Box::pin(async move {
                        let neighbour = found.neighbour.clone();

                        // Size-based choice between file and memory buffer.
                        let output = ctx.get_archive_writer(found.size)?;
                        let writer = self.download_archive(found, output).await?;

                        Ok(ArchiveResponse {
                            writer,
                            neighbour: Some(neighbour),
                        })
                    })
                }),
            }),
            blockchain_rpc::PendingArchiveResponse::TooNew => None,
        })
    }
}
646
// An RPC client can be used as an archive source on its own.
impl IntoArchiveClient for BlockchainRpcClient {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
653
#[cfg(feature = "s3")]
#[async_trait]
impl ArchiveClient for S3Client {
    // Computes the expected archive id locally, then checks whether the
    // bucket actually has it.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        let Some(archive_id) = ctx.compute_archive_id(mc_seqno)? else {
            return Ok(None);
        };
        let Some(info) = self.get_archive_info(archive_id).await? else {
            return Ok(None);
        };
        Ok(Some(FoundArchive {
            archive_id: info.archive_id as u64,
            // Deferred: the actual transfer happens only when invoked.
            download: Box::new(move || {
                Box::pin(async move {
                    let output = ctx.get_archive_writer(info.size)?;
                    let writer = self.download_archive(info.archive_id, output).await?;

                    Ok(ArchiveResponse {
                        writer,
                        // No peer to credit or punish for S3 downloads.
                        neighbour: None,
                    })
                })
            }),
        }))
    }
}
684
// An S3 client can be used as an archive source on its own.
#[cfg(feature = "s3")]
impl IntoArchiveClient for S3Client {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
692
/// Combines two archive clients, remembering which one answered last so
/// subsequent lookups try it first.
pub struct HybridArchiveClient<T1, T2> {
    primary: T1,
    secondary: T2,
    // Last successful side; reset when it stops delivering.
    prefer: HybridArchiveClientState,
}
698
699impl<T1, T2> HybridArchiveClient<T1, T2> {
700 pub fn new(primary: T1, secondary: T2) -> Self {
701 Self {
702 primary,
703 secondary,
704 prefer: Default::default(),
705 }
706 }
707}
708
#[async_trait]
impl<T1, T2> ArchiveClient for HybridArchiveClient<T1, T2>
where
    T1: ArchiveClient,
    T2: ArchiveClient,
{
    // Tries the previously successful side first; when there is no
    // preference (or the preferred side fails / comes up empty) both sides
    // are raced and the winner becomes the new preference.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // Fast path: only ask the preferred side.
        if let Some(prefer) = self.prefer.get() {
            tracing::debug!(mc_seqno, ?prefer);
            match prefer {
                HybridArchiveClientPart::Primary => {
                    let res = self.primary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
                HybridArchiveClientPart::Secondary => {
                    let res = self.secondary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
            }
        }

        // Preferred side failed or was unset; drop the preference and race.
        self.prefer.set(None);

        let primary = pin!(self.primary.find_archive(mc_seqno, ctx));
        let secondary = pin!(self.secondary.find_archive(mc_seqno, ctx));
        match futures_util::future::select(primary, secondary).await {
            futures_util::future::Either::Left((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("primary archive client error: {e:?}"),
                }
                // First finisher had nothing; fall back to the other side.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                    }
                })
            }
            futures_util::future::Either::Right((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("secondary archive client error: {e:?}"),
                }
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                    }
                })
            }
        }
    }
}
778
/// Atomic preference flag: 0 = none, 1 = primary, 2 = secondary.
#[derive(Default)]
struct HybridArchiveClientState(AtomicU8);
781
782impl HybridArchiveClientState {
783 fn get(&self) -> Option<HybridArchiveClientPart> {
784 match self.0.load(Ordering::Acquire) {
785 1 => Some(HybridArchiveClientPart::Primary),
786 2 => Some(HybridArchiveClientPart::Secondary),
787 _ => None,
788 }
789 }
790
791 fn set(&self, value: Option<HybridArchiveClientPart>) {
792 let value = match value {
793 None => 0,
794 Some(HybridArchiveClientPart::Primary) => 1,
795 Some(HybridArchiveClientPart::Secondary) => 2,
796 };
797 self.0.store(value, Ordering::Release);
798 }
799}
800
/// Identifies one side of a [`HybridArchiveClient`].
#[derive(Debug, Clone, Copy)]
enum HybridArchiveClientPart {
    Primary,
    Secondary,
}
806
807impl std::ops::Not for HybridArchiveClientPart {
808 type Output = Self;
809
810 #[inline]
811 fn not(self) -> Self::Output {
812 match self {
813 Self::Primary => Self::Secondary,
814 Self::Secondary => Self::Primary,
815 }
816 }
817}