tycho_core/block_strider/subscriber/
mod.rs

1use std::future::Future;
2use std::sync::Arc;
3
4use anyhow::Result;
5use futures_util::future::{self, BoxFuture};
6use tycho_block_util::archive::ArchiveData;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_types::models::*;
10
11pub use self::futures::{
12    DelayedTasks, DelayedTasksJoinHandle, DelayedTasksSpawner, OptionHandleFut, OptionPrepareFut,
13};
14pub use self::gc_subscriber::{GcSubscriber, ManualGcTrigger};
15pub use self::metrics_subscriber::MetricsSubscriber;
16pub use self::ps_subscriber::PsSubscriber;
17use crate::storage::CoreStorage;
18
19mod futures;
20mod gc_subscriber;
21mod metrics_subscriber;
22mod ps_subscriber;
23
24// === trait BlockSubscriber ===
25
26#[derive(Clone)]
27pub struct BlockSubscriberContext {
28    /// Related masterchain block id.
29    /// In case of context for mc block this id is the same as `block.id()`.
30    pub mc_block_id: BlockId,
31    /// Related masterchain block flag.
32    /// In case of context for mc block this flag is the same as `is_key_block`.
33    pub mc_is_key_block: bool,
34    /// Whether the `block` from this context is a key block.
35    pub is_key_block: bool,
36    /// Parsed block data.
37    pub block: BlockStuff,
38    /// Serialized block data.
39    pub archive_data: ArchiveData,
40    /// Delayed tasks to wait before commit.
41    pub delayed: DelayedTasks,
42}
43
44pub trait BlockSubscriber: Send + Sync + 'static {
45    type Prepared: Send;
46
47    type PrepareBlockFut<'a>: Future<Output = Result<Self::Prepared>> + Send + 'a;
48    type HandleBlockFut<'a>: Future<Output = Result<()>> + Send + 'a;
49
50    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a>;
51
52    fn handle_block<'a>(
53        &'a self,
54        cx: &'a BlockSubscriberContext,
55        prepared: Self::Prepared,
56    ) -> Self::HandleBlockFut<'a>;
57}
58
59impl<T: BlockSubscriber> BlockSubscriber for Option<T> {
60    type Prepared = Option<T::Prepared>;
61
62    type PrepareBlockFut<'a> = OptionPrepareFut<T::PrepareBlockFut<'a>>;
63    type HandleBlockFut<'a> = OptionHandleFut<T::HandleBlockFut<'a>>;
64
65    #[inline]
66    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
67        OptionPrepareFut::from(self.as_ref().map(|s| s.prepare_block(cx)))
68    }
69
70    fn handle_block<'a>(
71        &'a self,
72        cx: &'a BlockSubscriberContext,
73        prepared: Self::Prepared,
74    ) -> Self::HandleBlockFut<'a> {
75        OptionHandleFut::from(match (self, prepared) {
76            (Some(subscriber), Some(prepared)) => Some(subscriber.handle_block(cx, prepared)),
77            _ => None,
78        })
79    }
80}
81
82impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
83    type Prepared = T::Prepared;
84
85    type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
86    type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
87
88    #[inline]
89    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
90        <T as BlockSubscriber>::prepare_block(self, cx)
91    }
92
93    #[inline]
94    fn handle_block<'a>(
95        &'a self,
96        cx: &'a BlockSubscriberContext,
97        prepared: Self::Prepared,
98    ) -> Self::HandleBlockFut<'a> {
99        <T as BlockSubscriber>::handle_block(self, cx, prepared)
100    }
101}
102
103impl<T: BlockSubscriber> BlockSubscriber for Arc<T> {
104    type Prepared = T::Prepared;
105
106    type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
107    type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
108
109    #[inline]
110    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
111        <T as BlockSubscriber>::prepare_block(self, cx)
112    }
113
114    #[inline]
115    fn handle_block<'a>(
116        &'a self,
117        cx: &'a BlockSubscriberContext,
118        prepared: Self::Prepared,
119    ) -> Self::HandleBlockFut<'a> {
120        <T as BlockSubscriber>::handle_block(self, cx, prepared)
121    }
122}
123
124pub trait BlockSubscriberExt: Sized {
125    fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
126}
127
128impl<B: BlockSubscriber> BlockSubscriberExt for B {
129    fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
130        ChainSubscriber {
131            left: self,
132            right: other,
133        }
134    }
135}
136
137// === trait StateSubscriber ===
138
139pub struct StateSubscriberContext {
140    /// Related masterchain block id.
141    /// In case of context for mc block this id is the same as `block.id()`.
142    pub mc_block_id: BlockId,
143    /// Related masterchain block flag.
144    /// In case of context for mc block this flag is the same as `is_key_block`.
145    pub mc_is_key_block: bool,
146    /// Whether the `block` from this context is a key block.
147    pub is_key_block: bool,
148    /// Parsed block data.
149    pub block: BlockStuff,
150    /// Serialized block data.
151    pub archive_data: ArchiveData,
152    /// Applied shard state.
153    pub state: ShardStateStuff,
154    /// Delayed tasks to wait before commit.
155    pub delayed: DelayedTasks,
156}
157
158pub trait StateSubscriber: Send + Sync + 'static {
159    type HandleStateFut<'a>: Future<Output = Result<()>> + Send + 'a;
160
161    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a>;
162}
163
164impl<T: StateSubscriber> StateSubscriber for Option<T> {
165    type HandleStateFut<'a> = OptionHandleFut<T::HandleStateFut<'a>>;
166
167    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
168        OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_state(cx)))
169    }
170}
171
172impl<T: StateSubscriber> StateSubscriber for Box<T> {
173    type HandleStateFut<'a> = T::HandleStateFut<'a>;
174
175    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
176        <T as StateSubscriber>::handle_state(self, cx)
177    }
178}
179
180impl<T: StateSubscriber> StateSubscriber for Arc<T> {
181    type HandleStateFut<'a> = T::HandleStateFut<'a>;
182
183    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
184        <T as StateSubscriber>::handle_state(self, cx)
185    }
186}
187
188pub trait StateSubscriberExt: Sized {
189    fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
190}
191
192impl<B: StateSubscriber> StateSubscriberExt for B {
193    fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
194        ChainSubscriber {
195            left: self,
196            right: other,
197        }
198    }
199}
200
201// === trait ArchiveSubscriber ===
202
203pub struct ArchiveSubscriberContext<'a> {
204    pub archive_id: u32,
205    pub storage: &'a CoreStorage,
206}
207
208pub trait ArchiveSubscriber: Send + Sync + 'static {
209    type HandleArchiveFut<'a>: Future<Output = Result<()>> + Send + 'a;
210
211    fn handle_archive<'a>(
212        &'a self,
213        cx: &'a ArchiveSubscriberContext<'_>,
214    ) -> Self::HandleArchiveFut<'a>;
215}
216
217impl<T: ArchiveSubscriber> ArchiveSubscriber for Option<T> {
218    type HandleArchiveFut<'a> = OptionHandleFut<T::HandleArchiveFut<'a>>;
219
220    fn handle_archive<'a>(
221        &'a self,
222        cx: &'a ArchiveSubscriberContext<'_>,
223    ) -> Self::HandleArchiveFut<'a> {
224        OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_archive(cx)))
225    }
226}
227
228impl<T: ArchiveSubscriber> ArchiveSubscriber for Box<T> {
229    type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
230
231    fn handle_archive<'a>(
232        &'a self,
233        cx: &'a ArchiveSubscriberContext<'_>,
234    ) -> Self::HandleArchiveFut<'a> {
235        <T as ArchiveSubscriber>::handle_archive(self, cx)
236    }
237}
238
239impl<T: ArchiveSubscriber> ArchiveSubscriber for Arc<T> {
240    type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
241
242    fn handle_archive<'a>(
243        &'a self,
244        cx: &'a ArchiveSubscriberContext<'_>,
245    ) -> Self::HandleArchiveFut<'a> {
246        <T as ArchiveSubscriber>::handle_archive(self, cx)
247    }
248}
249
250pub trait ArchiveSubscriberExt: Sized {
251    fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
252}
253
254impl<B: ArchiveSubscriber> ArchiveSubscriberExt for B {
255    fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
256        ChainSubscriber {
257            left: self,
258            right: other,
259        }
260    }
261}
262
263// === NoopSubscriber ===
264
265#[derive(Default, Debug, Clone, Copy)]
266pub struct NoopSubscriber;
267
268impl BlockSubscriber for NoopSubscriber {
269    type Prepared = ();
270
271    type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
272    type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
273
274    #[inline]
275    fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
276        futures_util::future::ready(Ok(()))
277    }
278
279    #[inline]
280    fn handle_block(
281        &self,
282        _cx: &BlockSubscriberContext,
283        _: Self::Prepared,
284    ) -> Self::HandleBlockFut<'_> {
285        futures_util::future::ready(Ok(()))
286    }
287}
288
289impl StateSubscriber for NoopSubscriber {
290    type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
291
292    fn handle_state(&self, _cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
293        futures_util::future::ready(Ok(()))
294    }
295}
296
297// === ChainSubscriber ===
298
299pub struct ChainSubscriber<T1, T2> {
300    left: T1,
301    right: T2,
302}
303
304impl<T1: BlockSubscriber, T2: BlockSubscriber> BlockSubscriber for ChainSubscriber<T1, T2> {
305    type Prepared = (T1::Prepared, T2::Prepared);
306
307    type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
308    type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
309
310    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
311        let left = self.left.prepare_block(cx);
312        let right = self.right.prepare_block(cx);
313
314        Box::pin(async move {
315            match future::join(left, right).await {
316                (Ok(l), Ok(r)) => Ok((l, r)),
317                (Err(e), _) | (_, Err(e)) => Err(e),
318            }
319        })
320    }
321
322    fn handle_block<'a>(
323        &'a self,
324        cx: &'a BlockSubscriberContext,
325        (left_prepared, right_prepared): Self::Prepared,
326    ) -> Self::HandleBlockFut<'a> {
327        let left = self.left.handle_block(cx, left_prepared);
328        let right = self.right.handle_block(cx, right_prepared);
329
330        Box::pin(async move {
331            left.await?;
332            right.await
333        })
334    }
335}
336
337impl<T1: StateSubscriber, T2: StateSubscriber> StateSubscriber for ChainSubscriber<T1, T2> {
338    type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
339
340    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
341        let left = self.left.handle_state(cx);
342        let right = self.right.handle_state(cx);
343
344        Box::pin(async move {
345            left.await?;
346            right.await
347        })
348    }
349}
350
351// === (T1, ..., Tn) aka `join` ===
352
353macro_rules! impl_subscriber_tuple {
354    ($join_fn:path, |$e:ident| $err_pat:pat, { $($n:tt: $var:ident = $ty:ident),*$(,)? }) => {
355        impl<$($ty),*> BlockSubscriber for ($($ty),*)
356        where
357            $($ty: BlockSubscriber),*
358        {
359            type Prepared = ($($ty::Prepared),*);
360
361            type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
362            type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
363
364            fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
365                $(let $var = self.$n.prepare_block(cx));*;
366
367                Box::pin(async move {
368                    match $join_fn($($var),*).await {
369                        ($(Ok($var)),*) => Ok(($($var),*)),
370                        $err_pat => Err($e),
371                    }
372                })
373            }
374
375            fn handle_block<'a>(
376                &'a self,
377                cx: &'a BlockSubscriberContext,
378                ($($var),*): Self::Prepared,
379            ) -> Self::HandleBlockFut<'a> {
380                $(let $var = self.$n.handle_block(cx, $var));*;
381
382                Box::pin(async move {
383                    match $join_fn($($var),*).await {
384                        $err_pat => Err($e),
385                        _ => Ok(()),
386                    }
387                })
388            }
389        }
390
391        impl<$($ty),*> StateSubscriber for ($($ty),*)
392        where
393            $($ty: StateSubscriber),*
394        {
395            type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
396
397            fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
398                $(let $var = self.$n.handle_state(cx));*;
399
400                Box::pin(async move {
401                    match $join_fn($($var),*).await {
402                        $err_pat => Err($e),
403                        _ => Ok(()),
404                    }
405                })
406            }
407        }
408    };
409}
410
411impl_subscriber_tuple! {
412    futures_util::future::join,
413    |e| (Err(e), _) | (_, Err(e)),
414    {
415        0: a = T0,
416        1: b = T1,
417    }
418}
419
420impl_subscriber_tuple! {
421    futures_util::future::join3,
422    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
423    {
424        0: a = T0,
425        1: b = T1,
426        2: c = T2,
427    }
428}
429
430impl_subscriber_tuple! {
431    futures_util::future::join4,
432    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
433    {
434        0: a = T0,
435        1: b = T1,
436        2: c = T2,
437        3: d = T3,
438    }
439}
440
441impl_subscriber_tuple! {
442    futures_util::future::join5,
443    |e|
444        (Err(e), _, _, _, _)
445        | (_, Err(e), _, _, _)
446        | (_, _, Err(e), _, _)
447        | (_, _, _, Err(e), _)
448        | (_, _, _, _, Err(e)),
449    {
450        0: a = T0,
451        1: b = T1,
452        2: c = T2,
453        3: d = T3,
454        4: e = T4,
455    }
456}
457
458#[cfg(any(test, feature = "test"))]
459pub mod test {
460    use super::*;
461
462    #[derive(Default, Debug, Clone, Copy)]
463    pub struct PrintSubscriber;
464
465    impl BlockSubscriber for PrintSubscriber {
466        type Prepared = ();
467
468        type PrepareBlockFut<'a> = future::Ready<Result<()>>;
469        type HandleBlockFut<'a> = future::Ready<Result<()>>;
470
471        fn prepare_block<'a>(
472            &'a self,
473            cx: &'a BlockSubscriberContext,
474        ) -> Self::PrepareBlockFut<'a> {
475            tracing::info!(
476                block_id = %cx.block.id(),
477                mc_block_id = %cx.mc_block_id,
478                "preparing block"
479            );
480            future::ready(Ok(()))
481        }
482
483        fn handle_block(
484            &self,
485            cx: &BlockSubscriberContext,
486            _: Self::Prepared,
487        ) -> Self::HandleBlockFut<'_> {
488            tracing::info!(
489                block_id = %cx.block.id(),
490                mc_block_id = %cx.mc_block_id,
491                "handling block"
492            );
493            future::ready(Ok(()))
494        }
495    }
496
497    impl StateSubscriber for PrintSubscriber {
498        type HandleStateFut<'a> = future::Ready<anyhow::Result<()>>;
499
500        fn handle_state(&self, cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
501            tracing::info!(
502                block_id = %cx.block.id(),
503                mc_block_id = %cx.mc_block_id,
504                "handling state"
505            );
506            future::ready(Ok(()))
507        }
508    }
509}