Skip to main content

tycho_core/block_strider/subscriber/
mod.rs

1use std::future::Future;
2use std::sync::Arc;
3
4use anyhow::Result;
5use futures_util::future::{self, BoxFuture};
6use tycho_block_util::archive::ArchiveData;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_types::models::*;
10
11pub use self::futures::{
12    DelayedTasks, DelayedTasksJoinHandle, DelayedTasksSpawner, OptionHandleFut, OptionPrepareFut,
13};
14pub use self::metrics_subscriber::MetricsSubscriber;
15use crate::storage::CoreStorage;
16
17mod futures;
18mod metrics_subscriber;
19
20// === trait BlockSubscriber ===
21
22#[derive(Clone)]
23pub struct BlockSubscriberContext {
24    /// Related masterchain block id.
25    /// In case of context for mc block this id is the same as `block.id()`.
26    pub mc_block_id: BlockId,
27    /// Related masterchain block flag.
28    /// In case of context for mc block this flag is the same as `is_key_block`.
29    pub mc_is_key_block: bool,
30    /// Whether the `block` from this context is a key block.
31    pub is_key_block: bool,
32    /// Parsed block data.
33    pub block: BlockStuff,
34    /// Serialized block data.
35    pub archive_data: ArchiveData,
36    /// Delayed tasks to wait before commit.
37    pub delayed: DelayedTasks,
38}
39
40pub trait BlockSubscriber: Send + Sync + 'static {
41    type Prepared: Send;
42
43    type PrepareBlockFut<'a>: Future<Output = Result<Self::Prepared>> + Send + 'a;
44    type HandleBlockFut<'a>: Future<Output = Result<()>> + Send + 'a;
45
46    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a>;
47
48    fn handle_block<'a>(
49        &'a self,
50        cx: &'a BlockSubscriberContext,
51        prepared: Self::Prepared,
52    ) -> Self::HandleBlockFut<'a>;
53}
54
55impl<T: BlockSubscriber> BlockSubscriber for Option<T> {
56    type Prepared = Option<T::Prepared>;
57
58    type PrepareBlockFut<'a> = OptionPrepareFut<T::PrepareBlockFut<'a>>;
59    type HandleBlockFut<'a> = OptionHandleFut<T::HandleBlockFut<'a>>;
60
61    #[inline]
62    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
63        OptionPrepareFut::from(self.as_ref().map(|s| s.prepare_block(cx)))
64    }
65
66    fn handle_block<'a>(
67        &'a self,
68        cx: &'a BlockSubscriberContext,
69        prepared: Self::Prepared,
70    ) -> Self::HandleBlockFut<'a> {
71        OptionHandleFut::from(match (self, prepared) {
72            (Some(subscriber), Some(prepared)) => Some(subscriber.handle_block(cx, prepared)),
73            _ => None,
74        })
75    }
76}
77
78impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
79    type Prepared = T::Prepared;
80
81    type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
82    type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
83
84    #[inline]
85    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
86        <T as BlockSubscriber>::prepare_block(self, cx)
87    }
88
89    #[inline]
90    fn handle_block<'a>(
91        &'a self,
92        cx: &'a BlockSubscriberContext,
93        prepared: Self::Prepared,
94    ) -> Self::HandleBlockFut<'a> {
95        <T as BlockSubscriber>::handle_block(self, cx, prepared)
96    }
97}
98
99impl<T: BlockSubscriber> BlockSubscriber for Arc<T> {
100    type Prepared = T::Prepared;
101
102    type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
103    type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
104
105    #[inline]
106    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
107        <T as BlockSubscriber>::prepare_block(self, cx)
108    }
109
110    #[inline]
111    fn handle_block<'a>(
112        &'a self,
113        cx: &'a BlockSubscriberContext,
114        prepared: Self::Prepared,
115    ) -> Self::HandleBlockFut<'a> {
116        <T as BlockSubscriber>::handle_block(self, cx, prepared)
117    }
118}
119
120pub trait BlockSubscriberExt: Sized {
121    fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
122}
123
124impl<B: BlockSubscriber> BlockSubscriberExt for B {
125    fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
126        ChainSubscriber {
127            left: self,
128            right: other,
129        }
130    }
131}
132
133// === trait StateSubscriber ===
134
135pub struct StateSubscriberContext {
136    /// Related masterchain block id.
137    /// In case of context for mc block this id is the same as `block.id()`.
138    pub mc_block_id: BlockId,
139    /// Related masterchain block flag.
140    /// In case of context for mc block this flag is the same as `is_key_block`.
141    pub mc_is_key_block: bool,
142    /// Whether the `block` from this context is a key block.
143    pub is_key_block: bool,
144    /// Parsed block data.
145    pub block: BlockStuff,
146    /// Serialized block data.
147    pub archive_data: ArchiveData,
148    /// Applied shard state.
149    pub state: ShardStateStuff,
150    /// Delayed tasks to wait before commit.
151    pub delayed: DelayedTasks,
152}
153
154pub trait StateSubscriber: Send + Sync + 'static {
155    type HandleStateFut<'a>: Future<Output = Result<()>> + Send + 'a;
156
157    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a>;
158}
159
160impl<T: StateSubscriber> StateSubscriber for Option<T> {
161    type HandleStateFut<'a> = OptionHandleFut<T::HandleStateFut<'a>>;
162
163    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
164        OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_state(cx)))
165    }
166}
167
168impl<T: StateSubscriber> StateSubscriber for Box<T> {
169    type HandleStateFut<'a> = T::HandleStateFut<'a>;
170
171    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
172        <T as StateSubscriber>::handle_state(self, cx)
173    }
174}
175
176impl<T: StateSubscriber> StateSubscriber for Arc<T> {
177    type HandleStateFut<'a> = T::HandleStateFut<'a>;
178
179    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
180        <T as StateSubscriber>::handle_state(self, cx)
181    }
182}
183
184pub trait StateSubscriberExt: Sized {
185    fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
186}
187
188impl<B: StateSubscriber> StateSubscriberExt for B {
189    fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
190        ChainSubscriber {
191            left: self,
192            right: other,
193        }
194    }
195}
196
197// === trait ArchiveSubscriber ===
198
199pub struct ArchiveSubscriberContext<'a> {
200    pub archive_id: u32,
201    pub storage: &'a CoreStorage,
202}
203
204pub trait ArchiveSubscriber: Send + Sync + 'static {
205    type HandleArchiveFut<'a>: Future<Output = Result<()>> + Send + 'a;
206
207    fn handle_archive<'a>(
208        &'a self,
209        cx: &'a ArchiveSubscriberContext<'_>,
210    ) -> Self::HandleArchiveFut<'a>;
211}
212
213impl<T: ArchiveSubscriber> ArchiveSubscriber for Option<T> {
214    type HandleArchiveFut<'a> = OptionHandleFut<T::HandleArchiveFut<'a>>;
215
216    fn handle_archive<'a>(
217        &'a self,
218        cx: &'a ArchiveSubscriberContext<'_>,
219    ) -> Self::HandleArchiveFut<'a> {
220        OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_archive(cx)))
221    }
222}
223
224impl<T: ArchiveSubscriber> ArchiveSubscriber for Box<T> {
225    type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
226
227    fn handle_archive<'a>(
228        &'a self,
229        cx: &'a ArchiveSubscriberContext<'_>,
230    ) -> Self::HandleArchiveFut<'a> {
231        <T as ArchiveSubscriber>::handle_archive(self, cx)
232    }
233}
234
235impl<T: ArchiveSubscriber> ArchiveSubscriber for Arc<T> {
236    type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
237
238    fn handle_archive<'a>(
239        &'a self,
240        cx: &'a ArchiveSubscriberContext<'_>,
241    ) -> Self::HandleArchiveFut<'a> {
242        <T as ArchiveSubscriber>::handle_archive(self, cx)
243    }
244}
245
246pub trait ArchiveSubscriberExt: Sized {
247    fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
248}
249
250impl<B: ArchiveSubscriber> ArchiveSubscriberExt for B {
251    fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
252        ChainSubscriber {
253            left: self,
254            right: other,
255        }
256    }
257}
258
259// === NoopSubscriber ===
260
261#[derive(Default, Debug, Clone, Copy)]
262pub struct NoopSubscriber;
263
264impl BlockSubscriber for NoopSubscriber {
265    type Prepared = ();
266
267    type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
268    type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
269
270    #[inline]
271    fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
272        futures_util::future::ready(Ok(()))
273    }
274
275    #[inline]
276    fn handle_block(
277        &self,
278        _cx: &BlockSubscriberContext,
279        _: Self::Prepared,
280    ) -> Self::HandleBlockFut<'_> {
281        futures_util::future::ready(Ok(()))
282    }
283}
284
285impl StateSubscriber for NoopSubscriber {
286    type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
287
288    fn handle_state(&self, _cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
289        futures_util::future::ready(Ok(()))
290    }
291}
292
293// === ChainSubscriber ===
294
295pub struct ChainSubscriber<T1, T2> {
296    left: T1,
297    right: T2,
298}
299
300impl<T1: BlockSubscriber, T2: BlockSubscriber> BlockSubscriber for ChainSubscriber<T1, T2> {
301    type Prepared = (T1::Prepared, T2::Prepared);
302
303    type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
304    type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
305
306    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
307        let left = self.left.prepare_block(cx);
308        let right = self.right.prepare_block(cx);
309
310        Box::pin(async move {
311            match future::join(left, right).await {
312                (Ok(l), Ok(r)) => Ok((l, r)),
313                (Err(e), _) | (_, Err(e)) => Err(e),
314            }
315        })
316    }
317
318    fn handle_block<'a>(
319        &'a self,
320        cx: &'a BlockSubscriberContext,
321        (left_prepared, right_prepared): Self::Prepared,
322    ) -> Self::HandleBlockFut<'a> {
323        let left = self.left.handle_block(cx, left_prepared);
324        let right = self.right.handle_block(cx, right_prepared);
325
326        Box::pin(async move {
327            left.await?;
328            right.await
329        })
330    }
331}
332
333impl<T1: StateSubscriber, T2: StateSubscriber> StateSubscriber for ChainSubscriber<T1, T2> {
334    type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
335
336    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
337        let left = self.left.handle_state(cx);
338        let right = self.right.handle_state(cx);
339
340        Box::pin(async move {
341            left.await?;
342            right.await
343        })
344    }
345}
346
347// === (T1, ..., Tn) aka `join` ===
348
349macro_rules! impl_subscriber_tuple {
350    ($join_fn:path, |$e:ident| $err_pat:pat, { $($n:tt: $var:ident = $ty:ident),*$(,)? }) => {
351        impl<$($ty),*> BlockSubscriber for ($($ty),*)
352        where
353            $($ty: BlockSubscriber),*
354        {
355            type Prepared = ($($ty::Prepared),*);
356
357            type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
358            type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
359
360            fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
361                $(let $var = self.$n.prepare_block(cx));*;
362
363                Box::pin(async move {
364                    match $join_fn($($var),*).await {
365                        ($(Ok($var)),*) => Ok(($($var),*)),
366                        $err_pat => Err($e),
367                    }
368                })
369            }
370
371            fn handle_block<'a>(
372                &'a self,
373                cx: &'a BlockSubscriberContext,
374                ($($var),*): Self::Prepared,
375            ) -> Self::HandleBlockFut<'a> {
376                $(let $var = self.$n.handle_block(cx, $var));*;
377
378                Box::pin(async move {
379                    match $join_fn($($var),*).await {
380                        $err_pat => Err($e),
381                        _ => Ok(()),
382                    }
383                })
384            }
385        }
386
387        impl<$($ty),*> StateSubscriber for ($($ty),*)
388        where
389            $($ty: StateSubscriber),*
390        {
391            type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
392
393            fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
394                $(let $var = self.$n.handle_state(cx));*;
395
396                Box::pin(async move {
397                    match $join_fn($($var),*).await {
398                        $err_pat => Err($e),
399                        _ => Ok(()),
400                    }
401                })
402            }
403        }
404    };
405}
406
407impl_subscriber_tuple! {
408    futures_util::future::join,
409    |e| (Err(e), _) | (_, Err(e)),
410    {
411        0: a = T0,
412        1: b = T1,
413    }
414}
415
416impl_subscriber_tuple! {
417    futures_util::future::join3,
418    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
419    {
420        0: a = T0,
421        1: b = T1,
422        2: c = T2,
423    }
424}
425
426impl_subscriber_tuple! {
427    futures_util::future::join4,
428    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
429    {
430        0: a = T0,
431        1: b = T1,
432        2: c = T2,
433        3: d = T3,
434    }
435}
436
437impl_subscriber_tuple! {
438    futures_util::future::join5,
439    |e|
440        (Err(e), _, _, _, _)
441        | (_, Err(e), _, _, _)
442        | (_, _, Err(e), _, _)
443        | (_, _, _, Err(e), _)
444        | (_, _, _, _, Err(e)),
445    {
446        0: a = T0,
447        1: b = T1,
448        2: c = T2,
449        3: d = T3,
450        4: e = T4,
451    }
452}
453
454#[cfg(any(test, feature = "test"))]
455pub mod test {
456    use super::*;
457
458    #[derive(Default, Debug, Clone, Copy)]
459    pub struct PrintSubscriber;
460
461    impl BlockSubscriber for PrintSubscriber {
462        type Prepared = ();
463
464        type PrepareBlockFut<'a> = future::Ready<Result<()>>;
465        type HandleBlockFut<'a> = future::Ready<Result<()>>;
466
467        fn prepare_block<'a>(
468            &'a self,
469            cx: &'a BlockSubscriberContext,
470        ) -> Self::PrepareBlockFut<'a> {
471            tracing::info!(
472                block_id = %cx.block.id(),
473                mc_block_id = %cx.mc_block_id,
474                "preparing block"
475            );
476            future::ready(Ok(()))
477        }
478
479        fn handle_block(
480            &self,
481            cx: &BlockSubscriberContext,
482            _: Self::Prepared,
483        ) -> Self::HandleBlockFut<'_> {
484            tracing::info!(
485                block_id = %cx.block.id(),
486                mc_block_id = %cx.mc_block_id,
487                "handling block"
488            );
489            future::ready(Ok(()))
490        }
491    }
492
493    impl StateSubscriber for PrintSubscriber {
494        type HandleStateFut<'a> = future::Ready<anyhow::Result<()>>;
495
496        fn handle_state(&self, cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
497            tracing::info!(
498                block_id = %cx.block.id(),
499                mc_block_id = %cx.mc_block_id,
500                "handling state"
501            );
502            future::ready(Ok(()))
503        }
504    }
505}