1use std::future::Future;
2use std::sync::Arc;
3
4use anyhow::Result;
5use futures_util::future::{self, BoxFuture};
6use tycho_block_util::archive::ArchiveData;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_types::models::*;
10
11pub use self::futures::{
12 DelayedTasks, DelayedTasksJoinHandle, DelayedTasksSpawner, OptionHandleFut, OptionPrepareFut,
13};
14pub use self::metrics_subscriber::MetricsSubscriber;
15use crate::storage::CoreStorage;
16
17mod futures;
18mod metrics_subscriber;
19
20#[derive(Clone)]
23pub struct BlockSubscriberContext {
24 pub mc_block_id: BlockId,
27 pub mc_is_key_block: bool,
30 pub is_key_block: bool,
32 pub is_top_block: bool,
34 pub block: BlockStuff,
36 pub archive_data: ArchiveData,
38 pub delayed: DelayedTasks,
40}
41
42pub trait BlockSubscriber: Send + Sync + 'static {
43 type Prepared: Send;
44
45 type PrepareBlockFut<'a>: Future<Output = Result<Self::Prepared>> + Send + 'a;
46 type HandleBlockFut<'a>: Future<Output = Result<()>> + Send + 'a;
47
48 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a>;
49
50 fn handle_block<'a>(
51 &'a self,
52 cx: &'a BlockSubscriberContext,
53 prepared: Self::Prepared,
54 ) -> Self::HandleBlockFut<'a>;
55}
56
57impl<T: BlockSubscriber> BlockSubscriber for Option<T> {
58 type Prepared = Option<T::Prepared>;
59
60 type PrepareBlockFut<'a> = OptionPrepareFut<T::PrepareBlockFut<'a>>;
61 type HandleBlockFut<'a> = OptionHandleFut<T::HandleBlockFut<'a>>;
62
63 #[inline]
64 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
65 OptionPrepareFut::from(self.as_ref().map(|s| s.prepare_block(cx)))
66 }
67
68 fn handle_block<'a>(
69 &'a self,
70 cx: &'a BlockSubscriberContext,
71 prepared: Self::Prepared,
72 ) -> Self::HandleBlockFut<'a> {
73 OptionHandleFut::from(match (self, prepared) {
74 (Some(subscriber), Some(prepared)) => Some(subscriber.handle_block(cx, prepared)),
75 _ => None,
76 })
77 }
78}
79
80impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
81 type Prepared = T::Prepared;
82
83 type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
84 type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
85
86 #[inline]
87 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
88 <T as BlockSubscriber>::prepare_block(self, cx)
89 }
90
91 #[inline]
92 fn handle_block<'a>(
93 &'a self,
94 cx: &'a BlockSubscriberContext,
95 prepared: Self::Prepared,
96 ) -> Self::HandleBlockFut<'a> {
97 <T as BlockSubscriber>::handle_block(self, cx, prepared)
98 }
99}
100
101impl<T: BlockSubscriber> BlockSubscriber for Arc<T> {
102 type Prepared = T::Prepared;
103
104 type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
105 type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
106
107 #[inline]
108 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
109 <T as BlockSubscriber>::prepare_block(self, cx)
110 }
111
112 #[inline]
113 fn handle_block<'a>(
114 &'a self,
115 cx: &'a BlockSubscriberContext,
116 prepared: Self::Prepared,
117 ) -> Self::HandleBlockFut<'a> {
118 <T as BlockSubscriber>::handle_block(self, cx, prepared)
119 }
120}
121
122pub trait BlockSubscriberExt: Sized {
123 fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
124}
125
126impl<B: BlockSubscriber> BlockSubscriberExt for B {
127 fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
128 ChainSubscriber {
129 left: self,
130 right: other,
131 }
132 }
133}
134
135pub struct StateSubscriberContext {
138 pub mc_block_id: BlockId,
141 pub mc_is_key_block: bool,
144 pub is_key_block: bool,
146 pub block: BlockStuff,
148 pub archive_data: ArchiveData,
150 pub state: ShardStateStuff,
152 pub delayed: DelayedTasks,
154}
155
156pub trait StateSubscriber: Send + Sync + 'static {
157 type HandleStateFut<'a>: Future<Output = Result<()>> + Send + 'a;
158
159 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a>;
160}
161
162impl<T: StateSubscriber> StateSubscriber for Option<T> {
163 type HandleStateFut<'a> = OptionHandleFut<T::HandleStateFut<'a>>;
164
165 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
166 OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_state(cx)))
167 }
168}
169
170impl<T: StateSubscriber> StateSubscriber for Box<T> {
171 type HandleStateFut<'a> = T::HandleStateFut<'a>;
172
173 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
174 <T as StateSubscriber>::handle_state(self, cx)
175 }
176}
177
178impl<T: StateSubscriber> StateSubscriber for Arc<T> {
179 type HandleStateFut<'a> = T::HandleStateFut<'a>;
180
181 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
182 <T as StateSubscriber>::handle_state(self, cx)
183 }
184}
185
186pub trait StateSubscriberExt: Sized {
187 fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
188}
189
190impl<B: StateSubscriber> StateSubscriberExt for B {
191 fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
192 ChainSubscriber {
193 left: self,
194 right: other,
195 }
196 }
197}
198
199pub struct ArchiveSubscriberContext<'a> {
202 pub archive_id: u32,
203 pub storage: &'a CoreStorage,
204}
205
206pub trait ArchiveSubscriber: Send + Sync + 'static {
207 type HandleArchiveFut<'a>: Future<Output = Result<()>> + Send + 'a;
208
209 fn handle_archive<'a>(
210 &'a self,
211 cx: &'a ArchiveSubscriberContext<'_>,
212 ) -> Self::HandleArchiveFut<'a>;
213}
214
215impl<T: ArchiveSubscriber> ArchiveSubscriber for Option<T> {
216 type HandleArchiveFut<'a> = OptionHandleFut<T::HandleArchiveFut<'a>>;
217
218 fn handle_archive<'a>(
219 &'a self,
220 cx: &'a ArchiveSubscriberContext<'_>,
221 ) -> Self::HandleArchiveFut<'a> {
222 OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_archive(cx)))
223 }
224}
225
226impl<T: ArchiveSubscriber> ArchiveSubscriber for Box<T> {
227 type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
228
229 fn handle_archive<'a>(
230 &'a self,
231 cx: &'a ArchiveSubscriberContext<'_>,
232 ) -> Self::HandleArchiveFut<'a> {
233 <T as ArchiveSubscriber>::handle_archive(self, cx)
234 }
235}
236
237impl<T: ArchiveSubscriber> ArchiveSubscriber for Arc<T> {
238 type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
239
240 fn handle_archive<'a>(
241 &'a self,
242 cx: &'a ArchiveSubscriberContext<'_>,
243 ) -> Self::HandleArchiveFut<'a> {
244 <T as ArchiveSubscriber>::handle_archive(self, cx)
245 }
246}
247
248pub trait ArchiveSubscriberExt: Sized {
249 fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
250}
251
252impl<B: ArchiveSubscriber> ArchiveSubscriberExt for B {
253 fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
254 ChainSubscriber {
255 left: self,
256 right: other,
257 }
258 }
259}
260
261#[derive(Default, Debug, Clone, Copy)]
264pub struct NoopSubscriber;
265
266impl BlockSubscriber for NoopSubscriber {
267 type Prepared = ();
268
269 type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
270 type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
271
272 #[inline]
273 fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
274 futures_util::future::ready(Ok(()))
275 }
276
277 #[inline]
278 fn handle_block(
279 &self,
280 _cx: &BlockSubscriberContext,
281 _: Self::Prepared,
282 ) -> Self::HandleBlockFut<'_> {
283 futures_util::future::ready(Ok(()))
284 }
285}
286
287impl StateSubscriber for NoopSubscriber {
288 type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
289
290 fn handle_state(&self, _cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
291 futures_util::future::ready(Ok(()))
292 }
293}
294
295pub struct ChainSubscriber<T1, T2> {
298 left: T1,
299 right: T2,
300}
301
302impl<T1: BlockSubscriber, T2: BlockSubscriber> BlockSubscriber for ChainSubscriber<T1, T2> {
303 type Prepared = (T1::Prepared, T2::Prepared);
304
305 type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
306 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
307
308 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
309 let left = self.left.prepare_block(cx);
310 let right = self.right.prepare_block(cx);
311
312 Box::pin(async move {
313 match future::join(left, right).await {
314 (Ok(l), Ok(r)) => Ok((l, r)),
315 (Err(e), _) | (_, Err(e)) => Err(e),
316 }
317 })
318 }
319
320 fn handle_block<'a>(
321 &'a self,
322 cx: &'a BlockSubscriberContext,
323 (left_prepared, right_prepared): Self::Prepared,
324 ) -> Self::HandleBlockFut<'a> {
325 let left = self.left.handle_block(cx, left_prepared);
326 let right = self.right.handle_block(cx, right_prepared);
327
328 Box::pin(async move {
329 left.await?;
330 right.await
331 })
332 }
333}
334
335impl<T1: StateSubscriber, T2: StateSubscriber> StateSubscriber for ChainSubscriber<T1, T2> {
336 type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
337
338 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
339 let left = self.left.handle_state(cx);
340 let right = self.right.handle_state(cx);
341
342 Box::pin(async move {
343 left.await?;
344 right.await
345 })
346 }
347}
348
349macro_rules! impl_subscriber_tuple {
352 ($join_fn:path, |$e:ident| $err_pat:pat, { $($n:tt: $var:ident = $ty:ident),*$(,)? }) => {
353 impl<$($ty),*> BlockSubscriber for ($($ty),*)
354 where
355 $($ty: BlockSubscriber),*
356 {
357 type Prepared = ($($ty::Prepared),*);
358
359 type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
360 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
361
362 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
363 $(let $var = self.$n.prepare_block(cx));*;
364
365 Box::pin(async move {
366 match $join_fn($($var),*).await {
367 ($(Ok($var)),*) => Ok(($($var),*)),
368 $err_pat => Err($e),
369 }
370 })
371 }
372
373 fn handle_block<'a>(
374 &'a self,
375 cx: &'a BlockSubscriberContext,
376 ($($var),*): Self::Prepared,
377 ) -> Self::HandleBlockFut<'a> {
378 $(let $var = self.$n.handle_block(cx, $var));*;
379
380 Box::pin(async move {
381 match $join_fn($($var),*).await {
382 $err_pat => Err($e),
383 _ => Ok(()),
384 }
385 })
386 }
387 }
388
389 impl<$($ty),*> StateSubscriber for ($($ty),*)
390 where
391 $($ty: StateSubscriber),*
392 {
393 type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
394
395 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
396 $(let $var = self.$n.handle_state(cx));*;
397
398 Box::pin(async move {
399 match $join_fn($($var),*).await {
400 $err_pat => Err($e),
401 _ => Ok(()),
402 }
403 })
404 }
405 }
406 };
407}
408
409impl_subscriber_tuple! {
410 futures_util::future::join,
411 |e| (Err(e), _) | (_, Err(e)),
412 {
413 0: a = T0,
414 1: b = T1,
415 }
416}
417
418impl_subscriber_tuple! {
419 futures_util::future::join3,
420 |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
421 {
422 0: a = T0,
423 1: b = T1,
424 2: c = T2,
425 }
426}
427
428impl_subscriber_tuple! {
429 futures_util::future::join4,
430 |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
431 {
432 0: a = T0,
433 1: b = T1,
434 2: c = T2,
435 3: d = T3,
436 }
437}
438
439impl_subscriber_tuple! {
440 futures_util::future::join5,
441 |e|
442 (Err(e), _, _, _, _)
443 | (_, Err(e), _, _, _)
444 | (_, _, Err(e), _, _)
445 | (_, _, _, Err(e), _)
446 | (_, _, _, _, Err(e)),
447 {
448 0: a = T0,
449 1: b = T1,
450 2: c = T2,
451 3: d = T3,
452 4: e = T4,
453 }
454}
455
456#[cfg(any(test, feature = "test"))]
457pub mod test {
458 use super::*;
459
460 #[derive(Default, Debug, Clone, Copy)]
461 pub struct PrintSubscriber;
462
463 impl BlockSubscriber for PrintSubscriber {
464 type Prepared = ();
465
466 type PrepareBlockFut<'a> = future::Ready<Result<()>>;
467 type HandleBlockFut<'a> = future::Ready<Result<()>>;
468
469 fn prepare_block<'a>(
470 &'a self,
471 cx: &'a BlockSubscriberContext,
472 ) -> Self::PrepareBlockFut<'a> {
473 tracing::info!(
474 block_id = %cx.block.id(),
475 mc_block_id = %cx.mc_block_id,
476 "preparing block"
477 );
478 future::ready(Ok(()))
479 }
480
481 fn handle_block(
482 &self,
483 cx: &BlockSubscriberContext,
484 _: Self::Prepared,
485 ) -> Self::HandleBlockFut<'_> {
486 tracing::info!(
487 block_id = %cx.block.id(),
488 mc_block_id = %cx.mc_block_id,
489 "handling block"
490 );
491 future::ready(Ok(()))
492 }
493 }
494
495 impl StateSubscriber for PrintSubscriber {
496 type HandleStateFut<'a> = future::Ready<anyhow::Result<()>>;
497
498 fn handle_state(&self, cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
499 tracing::info!(
500 block_id = %cx.block.id(),
501 mc_block_id = %cx.mc_block_id,
502 "handling state"
503 );
504 future::ready(Ok(()))
505 }
506 }
507}