Skip to main content

tycho_core/block_strider/
archive_handler.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use futures_util::future::BoxFuture;
5use parking_lot::Mutex;
6use tokio::sync::broadcast;
7use tycho_util::metrics::HistogramGuard;
8
9use crate::block_strider::{
10    ArchiveSubscriber, ArchiveSubscriberContext, BlockSubscriber, BlockSubscriberContext,
11};
12use crate::storage::CoreStorage;
13
14#[repr(transparent)]
15pub struct ArchiveHandler<S> {
16    inner: Arc<Inner<S>>,
17}
18
19impl<S> ArchiveHandler<S>
20where
21    S: ArchiveSubscriber,
22{
23    pub fn new(storage: CoreStorage, archive_subscriber: S) -> Result<Self> {
24        let rx = storage.block_storage().subscribe_to_archive_ids();
25
26        Ok(Self {
27            inner: Arc::new(Inner {
28                storage,
29                archive_subscriber,
30                archive_listener: ArchiveListener::new(rx),
31            }),
32        })
33    }
34
35    async fn handle_block_impl(&self, _cx: &BlockSubscriberContext, _prepared: ()) -> Result<()> {
36        let _histogram = HistogramGuard::begin("tycho_core_archive_handler_handle_block_time");
37
38        // Process all available archive
39        loop {
40            let archive_id = {
41                let mut rx = self.inner.archive_listener.rx.lock();
42                match rx.try_recv() {
43                    Ok(id) => id,
44                    Err(_) => break,
45                }
46            };
47
48            let _histogram = HistogramGuard::begin("tycho_core_subscriber_handle_archive_time");
49
50            let cx = ArchiveSubscriberContext {
51                archive_id,
52                storage: &self.inner.storage,
53            };
54
55            tracing::info!(id = cx.archive_id, "handling archive");
56            self.inner.archive_subscriber.handle_archive(&cx).await?;
57        }
58
59        // Done
60        Ok(())
61    }
62}
63
64impl<S> Clone for ArchiveHandler<S> {
65    #[inline]
66    fn clone(&self) -> Self {
67        Self {
68            inner: self.inner.clone(),
69        }
70    }
71}
72
73impl<S> BlockSubscriber for ArchiveHandler<S>
74where
75    S: ArchiveSubscriber,
76{
77    type Prepared = ();
78
79    type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
80    type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
81
82    #[inline]
83    fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
84        futures_util::future::ready(Ok(()))
85    }
86
87    fn handle_block<'a>(
88        &'a self,
89        cx: &'a BlockSubscriberContext,
90        prepared: Self::Prepared,
91    ) -> Self::HandleBlockFut<'a> {
92        Box::pin(self.handle_block_impl(cx, prepared))
93    }
94}
95
96struct Inner<S> {
97    storage: CoreStorage,
98    archive_subscriber: S,
99    archive_listener: ArchiveListener,
100}
101
102struct ArchiveListener {
103    rx: Mutex<broadcast::Receiver<u32>>,
104}
105
106impl ArchiveListener {
107    fn new(rx: broadcast::Receiver<u32>) -> Self {
108        Self { rx: Mutex::new(rx) }
109    }
110}