Skip to main content

tycho_core/block_strider/subscriber/
mod.rs

1use std::future::Future;
2use std::sync::Arc;
3
4use anyhow::Result;
5use futures_util::future::{self, BoxFuture};
6use tycho_block_util::archive::ArchiveData;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_types::models::*;
10
11pub use self::futures::{
12    DelayedTasks, DelayedTasksJoinHandle, DelayedTasksSpawner, OptionHandleFut, OptionPrepareFut,
13};
14pub use self::metrics_subscriber::MetricsSubscriber;
15use crate::storage::CoreStorage;
16
17mod futures;
18mod metrics_subscriber;
19
20// === trait BlockSubscriber ===
21
22#[derive(Clone)]
23pub struct BlockSubscriberContext {
24    /// Related masterchain block id.
25    /// In case of context for mc block this id is the same as `block.id()`.
26    pub mc_block_id: BlockId,
27    /// Related masterchain block flag.
28    /// In case of context for mc block this flag is the same as `is_key_block`.
29    pub mc_is_key_block: bool,
30    /// Whether the `block` from this context is a key block.
31    pub is_key_block: bool,
32    /// Whether the `block` from this context is a top block.
33    pub is_top_block: bool,
34    /// Parsed block data.
35    pub block: BlockStuff,
36    /// Serialized block data.
37    pub archive_data: ArchiveData,
38    /// Delayed tasks to wait before commit.
39    pub delayed: DelayedTasks,
40}
41
42pub trait BlockSubscriber: Send + Sync + 'static {
43    type Prepared: Send;
44
45    type PrepareBlockFut<'a>: Future<Output = Result<Self::Prepared>> + Send + 'a;
46    type HandleBlockFut<'a>: Future<Output = Result<()>> + Send + 'a;
47
48    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a>;
49
50    fn handle_block<'a>(
51        &'a self,
52        cx: &'a BlockSubscriberContext,
53        prepared: Self::Prepared,
54    ) -> Self::HandleBlockFut<'a>;
55}
56
57impl<T: BlockSubscriber> BlockSubscriber for Option<T> {
58    type Prepared = Option<T::Prepared>;
59
60    type PrepareBlockFut<'a> = OptionPrepareFut<T::PrepareBlockFut<'a>>;
61    type HandleBlockFut<'a> = OptionHandleFut<T::HandleBlockFut<'a>>;
62
63    #[inline]
64    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
65        OptionPrepareFut::from(self.as_ref().map(|s| s.prepare_block(cx)))
66    }
67
68    fn handle_block<'a>(
69        &'a self,
70        cx: &'a BlockSubscriberContext,
71        prepared: Self::Prepared,
72    ) -> Self::HandleBlockFut<'a> {
73        OptionHandleFut::from(match (self, prepared) {
74            (Some(subscriber), Some(prepared)) => Some(subscriber.handle_block(cx, prepared)),
75            _ => None,
76        })
77    }
78}
79
80impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
81    type Prepared = T::Prepared;
82
83    type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
84    type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
85
86    #[inline]
87    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
88        <T as BlockSubscriber>::prepare_block(self, cx)
89    }
90
91    #[inline]
92    fn handle_block<'a>(
93        &'a self,
94        cx: &'a BlockSubscriberContext,
95        prepared: Self::Prepared,
96    ) -> Self::HandleBlockFut<'a> {
97        <T as BlockSubscriber>::handle_block(self, cx, prepared)
98    }
99}
100
101impl<T: BlockSubscriber> BlockSubscriber for Arc<T> {
102    type Prepared = T::Prepared;
103
104    type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
105    type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
106
107    #[inline]
108    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
109        <T as BlockSubscriber>::prepare_block(self, cx)
110    }
111
112    #[inline]
113    fn handle_block<'a>(
114        &'a self,
115        cx: &'a BlockSubscriberContext,
116        prepared: Self::Prepared,
117    ) -> Self::HandleBlockFut<'a> {
118        <T as BlockSubscriber>::handle_block(self, cx, prepared)
119    }
120}
121
122pub trait BlockSubscriberExt: Sized {
123    fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
124}
125
126impl<B: BlockSubscriber> BlockSubscriberExt for B {
127    fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
128        ChainSubscriber {
129            left: self,
130            right: other,
131        }
132    }
133}
134
135// === trait StateSubscriber ===
136
137pub struct StateSubscriberContext {
138    /// Related masterchain block id.
139    /// In case of context for mc block this id is the same as `block.id()`.
140    pub mc_block_id: BlockId,
141    /// Related masterchain block flag.
142    /// In case of context for mc block this flag is the same as `is_key_block`.
143    pub mc_is_key_block: bool,
144    /// Whether the `block` from this context is a key block.
145    pub is_key_block: bool,
146    /// Parsed block data.
147    pub block: BlockStuff,
148    /// Serialized block data.
149    pub archive_data: ArchiveData,
150    /// Applied shard state.
151    pub state: ShardStateStuff,
152    /// Delayed tasks to wait before commit.
153    pub delayed: DelayedTasks,
154}
155
156pub trait StateSubscriber: Send + Sync + 'static {
157    type HandleStateFut<'a>: Future<Output = Result<()>> + Send + 'a;
158
159    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a>;
160}
161
162impl<T: StateSubscriber> StateSubscriber for Option<T> {
163    type HandleStateFut<'a> = OptionHandleFut<T::HandleStateFut<'a>>;
164
165    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
166        OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_state(cx)))
167    }
168}
169
170impl<T: StateSubscriber> StateSubscriber for Box<T> {
171    type HandleStateFut<'a> = T::HandleStateFut<'a>;
172
173    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
174        <T as StateSubscriber>::handle_state(self, cx)
175    }
176}
177
178impl<T: StateSubscriber> StateSubscriber for Arc<T> {
179    type HandleStateFut<'a> = T::HandleStateFut<'a>;
180
181    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
182        <T as StateSubscriber>::handle_state(self, cx)
183    }
184}
185
186pub trait StateSubscriberExt: Sized {
187    fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
188}
189
190impl<B: StateSubscriber> StateSubscriberExt for B {
191    fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
192        ChainSubscriber {
193            left: self,
194            right: other,
195        }
196    }
197}
198
199// === trait ArchiveSubscriber ===
200
201pub struct ArchiveSubscriberContext<'a> {
202    pub archive_id: u32,
203    pub storage: &'a CoreStorage,
204}
205
206pub trait ArchiveSubscriber: Send + Sync + 'static {
207    type HandleArchiveFut<'a>: Future<Output = Result<()>> + Send + 'a;
208
209    fn handle_archive<'a>(
210        &'a self,
211        cx: &'a ArchiveSubscriberContext<'_>,
212    ) -> Self::HandleArchiveFut<'a>;
213}
214
215impl<T: ArchiveSubscriber> ArchiveSubscriber for Option<T> {
216    type HandleArchiveFut<'a> = OptionHandleFut<T::HandleArchiveFut<'a>>;
217
218    fn handle_archive<'a>(
219        &'a self,
220        cx: &'a ArchiveSubscriberContext<'_>,
221    ) -> Self::HandleArchiveFut<'a> {
222        OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_archive(cx)))
223    }
224}
225
226impl<T: ArchiveSubscriber> ArchiveSubscriber for Box<T> {
227    type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
228
229    fn handle_archive<'a>(
230        &'a self,
231        cx: &'a ArchiveSubscriberContext<'_>,
232    ) -> Self::HandleArchiveFut<'a> {
233        <T as ArchiveSubscriber>::handle_archive(self, cx)
234    }
235}
236
237impl<T: ArchiveSubscriber> ArchiveSubscriber for Arc<T> {
238    type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
239
240    fn handle_archive<'a>(
241        &'a self,
242        cx: &'a ArchiveSubscriberContext<'_>,
243    ) -> Self::HandleArchiveFut<'a> {
244        <T as ArchiveSubscriber>::handle_archive(self, cx)
245    }
246}
247
248pub trait ArchiveSubscriberExt: Sized {
249    fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
250}
251
252impl<B: ArchiveSubscriber> ArchiveSubscriberExt for B {
253    fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
254        ChainSubscriber {
255            left: self,
256            right: other,
257        }
258    }
259}
260
261// === NoopSubscriber ===
262
263#[derive(Default, Debug, Clone, Copy)]
264pub struct NoopSubscriber;
265
266impl BlockSubscriber for NoopSubscriber {
267    type Prepared = ();
268
269    type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
270    type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
271
272    #[inline]
273    fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
274        futures_util::future::ready(Ok(()))
275    }
276
277    #[inline]
278    fn handle_block(
279        &self,
280        _cx: &BlockSubscriberContext,
281        _: Self::Prepared,
282    ) -> Self::HandleBlockFut<'_> {
283        futures_util::future::ready(Ok(()))
284    }
285}
286
287impl StateSubscriber for NoopSubscriber {
288    type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
289
290    fn handle_state(&self, _cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
291        futures_util::future::ready(Ok(()))
292    }
293}
294
295// === ChainSubscriber ===
296
297pub struct ChainSubscriber<T1, T2> {
298    left: T1,
299    right: T2,
300}
301
302impl<T1: BlockSubscriber, T2: BlockSubscriber> BlockSubscriber for ChainSubscriber<T1, T2> {
303    type Prepared = (T1::Prepared, T2::Prepared);
304
305    type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
306    type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
307
308    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
309        let left = self.left.prepare_block(cx);
310        let right = self.right.prepare_block(cx);
311
312        Box::pin(async move {
313            match future::join(left, right).await {
314                (Ok(l), Ok(r)) => Ok((l, r)),
315                (Err(e), _) | (_, Err(e)) => Err(e),
316            }
317        })
318    }
319
320    fn handle_block<'a>(
321        &'a self,
322        cx: &'a BlockSubscriberContext,
323        (left_prepared, right_prepared): Self::Prepared,
324    ) -> Self::HandleBlockFut<'a> {
325        let left = self.left.handle_block(cx, left_prepared);
326        let right = self.right.handle_block(cx, right_prepared);
327
328        Box::pin(async move {
329            left.await?;
330            right.await
331        })
332    }
333}
334
335impl<T1: StateSubscriber, T2: StateSubscriber> StateSubscriber for ChainSubscriber<T1, T2> {
336    type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
337
338    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
339        let left = self.left.handle_state(cx);
340        let right = self.right.handle_state(cx);
341
342        Box::pin(async move {
343            left.await?;
344            right.await
345        })
346    }
347}
348
349// === (T1, ..., Tn) aka `join` ===
350
351macro_rules! impl_subscriber_tuple {
352    ($join_fn:path, |$e:ident| $err_pat:pat, { $($n:tt: $var:ident = $ty:ident),*$(,)? }) => {
353        impl<$($ty),*> BlockSubscriber for ($($ty),*)
354        where
355            $($ty: BlockSubscriber),*
356        {
357            type Prepared = ($($ty::Prepared),*);
358
359            type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
360            type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
361
362            fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
363                $(let $var = self.$n.prepare_block(cx));*;
364
365                Box::pin(async move {
366                    match $join_fn($($var),*).await {
367                        ($(Ok($var)),*) => Ok(($($var),*)),
368                        $err_pat => Err($e),
369                    }
370                })
371            }
372
373            fn handle_block<'a>(
374                &'a self,
375                cx: &'a BlockSubscriberContext,
376                ($($var),*): Self::Prepared,
377            ) -> Self::HandleBlockFut<'a> {
378                $(let $var = self.$n.handle_block(cx, $var));*;
379
380                Box::pin(async move {
381                    match $join_fn($($var),*).await {
382                        $err_pat => Err($e),
383                        _ => Ok(()),
384                    }
385                })
386            }
387        }
388
389        impl<$($ty),*> StateSubscriber for ($($ty),*)
390        where
391            $($ty: StateSubscriber),*
392        {
393            type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
394
395            fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
396                $(let $var = self.$n.handle_state(cx));*;
397
398                Box::pin(async move {
399                    match $join_fn($($var),*).await {
400                        $err_pat => Err($e),
401                        _ => Ok(()),
402                    }
403                })
404            }
405        }
406    };
407}
408
409impl_subscriber_tuple! {
410    futures_util::future::join,
411    |e| (Err(e), _) | (_, Err(e)),
412    {
413        0: a = T0,
414        1: b = T1,
415    }
416}
417
418impl_subscriber_tuple! {
419    futures_util::future::join3,
420    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
421    {
422        0: a = T0,
423        1: b = T1,
424        2: c = T2,
425    }
426}
427
428impl_subscriber_tuple! {
429    futures_util::future::join4,
430    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
431    {
432        0: a = T0,
433        1: b = T1,
434        2: c = T2,
435        3: d = T3,
436    }
437}
438
439impl_subscriber_tuple! {
440    futures_util::future::join5,
441    |e|
442        (Err(e), _, _, _, _)
443        | (_, Err(e), _, _, _)
444        | (_, _, Err(e), _, _)
445        | (_, _, _, Err(e), _)
446        | (_, _, _, _, Err(e)),
447    {
448        0: a = T0,
449        1: b = T1,
450        2: c = T2,
451        3: d = T3,
452        4: e = T4,
453    }
454}
455
456#[cfg(any(test, feature = "test"))]
457pub mod test {
458    use super::*;
459
460    #[derive(Default, Debug, Clone, Copy)]
461    pub struct PrintSubscriber;
462
463    impl BlockSubscriber for PrintSubscriber {
464        type Prepared = ();
465
466        type PrepareBlockFut<'a> = future::Ready<Result<()>>;
467        type HandleBlockFut<'a> = future::Ready<Result<()>>;
468
469        fn prepare_block<'a>(
470            &'a self,
471            cx: &'a BlockSubscriberContext,
472        ) -> Self::PrepareBlockFut<'a> {
473            tracing::info!(
474                block_id = %cx.block.id(),
475                mc_block_id = %cx.mc_block_id,
476                "preparing block"
477            );
478            future::ready(Ok(()))
479        }
480
481        fn handle_block(
482            &self,
483            cx: &BlockSubscriberContext,
484            _: Self::Prepared,
485        ) -> Self::HandleBlockFut<'_> {
486            tracing::info!(
487                block_id = %cx.block.id(),
488                mc_block_id = %cx.mc_block_id,
489                "handling block"
490            );
491            future::ready(Ok(()))
492        }
493    }
494
495    impl StateSubscriber for PrintSubscriber {
496        type HandleStateFut<'a> = future::Ready<anyhow::Result<()>>;
497
498        fn handle_state(&self, cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
499            tracing::info!(
500                block_id = %cx.block.id(),
501                mc_block_id = %cx.mc_block_id,
502                "handling state"
503            );
504            future::ready(Ok(()))
505        }
506    }
507}