tycho_core/block_strider/
archive_handler.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use futures_util::future::BoxFuture;
5use parking_lot::Mutex;
6use tokio::sync::broadcast;
7use tycho_util::metrics::HistogramGuard;
8
9use crate::block_strider::{
10 ArchiveSubscriber, ArchiveSubscriberContext, BlockSubscriber, BlockSubscriberContext,
11};
12use crate::storage::CoreStorage;
13
14#[repr(transparent)]
15pub struct ArchiveHandler<S> {
16 inner: Arc<Inner<S>>,
17}
18
19impl<S> ArchiveHandler<S>
20where
21 S: ArchiveSubscriber,
22{
23 pub fn new(storage: CoreStorage, archive_subscriber: S) -> Result<Self> {
24 let rx = storage.block_storage().subscribe_to_archive_ids();
25
26 Ok(Self {
27 inner: Arc::new(Inner {
28 storage,
29 archive_subscriber,
30 archive_listener: ArchiveListener::new(rx),
31 }),
32 })
33 }
34
35 async fn handle_block_impl(&self, _cx: &BlockSubscriberContext, _prepared: ()) -> Result<()> {
36 let _histogram = HistogramGuard::begin("tycho_core_archive_handler_handle_block_time");
37
38 loop {
40 let archive_id = {
41 let mut rx = self.inner.archive_listener.rx.lock();
42 match rx.try_recv() {
43 Ok(id) => id,
44 Err(_) => break,
45 }
46 };
47
48 let _histogram = HistogramGuard::begin("tycho_core_subscriber_handle_archive_time");
49
50 let cx = ArchiveSubscriberContext {
51 archive_id,
52 storage: &self.inner.storage,
53 };
54
55 tracing::info!(id = cx.archive_id, "handling archive");
56 self.inner.archive_subscriber.handle_archive(&cx).await?;
57 }
58
59 Ok(())
61 }
62}
63
64impl<S> Clone for ArchiveHandler<S> {
65 #[inline]
66 fn clone(&self) -> Self {
67 Self {
68 inner: self.inner.clone(),
69 }
70 }
71}
72
73impl<S> BlockSubscriber for ArchiveHandler<S>
74where
75 S: ArchiveSubscriber,
76{
77 type Prepared = ();
78
79 type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
80 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
81
82 #[inline]
83 fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
84 futures_util::future::ready(Ok(()))
85 }
86
87 fn handle_block<'a>(
88 &'a self,
89 cx: &'a BlockSubscriberContext,
90 prepared: Self::Prepared,
91 ) -> Self::HandleBlockFut<'a> {
92 Box::pin(self.handle_block_impl(cx, prepared))
93 }
94}
95
96struct Inner<S> {
97 storage: CoreStorage,
98 archive_subscriber: S,
99 archive_listener: ArchiveListener,
100}
101
102struct ArchiveListener {
103 rx: Mutex<broadcast::Receiver<u32>>,
104}
105
106impl ArchiveListener {
107 fn new(rx: broadcast::Receiver<u32>) -> Self {
108 Self { rx: Mutex::new(rx) }
109 }
110}