1use std::future::Future;
2use std::sync::Arc;
3
4use anyhow::Result;
5use futures_util::future::{self, BoxFuture};
6use tycho_block_util::archive::ArchiveData;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_types::models::*;
10
11pub use self::futures::{
12 DelayedTasks, DelayedTasksJoinHandle, DelayedTasksSpawner, OptionHandleFut, OptionPrepareFut,
13};
14pub use self::metrics_subscriber::MetricsSubscriber;
15use crate::storage::CoreStorage;
16
17mod futures;
18mod metrics_subscriber;
19
20#[derive(Clone)]
23pub struct BlockSubscriberContext {
24 pub mc_block_id: BlockId,
27 pub mc_is_key_block: bool,
30 pub is_key_block: bool,
32 pub block: BlockStuff,
34 pub archive_data: ArchiveData,
36 pub delayed: DelayedTasks,
38}
39
40pub trait BlockSubscriber: Send + Sync + 'static {
41 type Prepared: Send;
42
43 type PrepareBlockFut<'a>: Future<Output = Result<Self::Prepared>> + Send + 'a;
44 type HandleBlockFut<'a>: Future<Output = Result<()>> + Send + 'a;
45
46 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a>;
47
48 fn handle_block<'a>(
49 &'a self,
50 cx: &'a BlockSubscriberContext,
51 prepared: Self::Prepared,
52 ) -> Self::HandleBlockFut<'a>;
53}
54
55impl<T: BlockSubscriber> BlockSubscriber for Option<T> {
56 type Prepared = Option<T::Prepared>;
57
58 type PrepareBlockFut<'a> = OptionPrepareFut<T::PrepareBlockFut<'a>>;
59 type HandleBlockFut<'a> = OptionHandleFut<T::HandleBlockFut<'a>>;
60
61 #[inline]
62 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
63 OptionPrepareFut::from(self.as_ref().map(|s| s.prepare_block(cx)))
64 }
65
66 fn handle_block<'a>(
67 &'a self,
68 cx: &'a BlockSubscriberContext,
69 prepared: Self::Prepared,
70 ) -> Self::HandleBlockFut<'a> {
71 OptionHandleFut::from(match (self, prepared) {
72 (Some(subscriber), Some(prepared)) => Some(subscriber.handle_block(cx, prepared)),
73 _ => None,
74 })
75 }
76}
77
78impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
79 type Prepared = T::Prepared;
80
81 type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
82 type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
83
84 #[inline]
85 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
86 <T as BlockSubscriber>::prepare_block(self, cx)
87 }
88
89 #[inline]
90 fn handle_block<'a>(
91 &'a self,
92 cx: &'a BlockSubscriberContext,
93 prepared: Self::Prepared,
94 ) -> Self::HandleBlockFut<'a> {
95 <T as BlockSubscriber>::handle_block(self, cx, prepared)
96 }
97}
98
99impl<T: BlockSubscriber> BlockSubscriber for Arc<T> {
100 type Prepared = T::Prepared;
101
102 type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
103 type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
104
105 #[inline]
106 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
107 <T as BlockSubscriber>::prepare_block(self, cx)
108 }
109
110 #[inline]
111 fn handle_block<'a>(
112 &'a self,
113 cx: &'a BlockSubscriberContext,
114 prepared: Self::Prepared,
115 ) -> Self::HandleBlockFut<'a> {
116 <T as BlockSubscriber>::handle_block(self, cx, prepared)
117 }
118}
119
120pub trait BlockSubscriberExt: Sized {
121 fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
122}
123
124impl<B: BlockSubscriber> BlockSubscriberExt for B {
125 fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
126 ChainSubscriber {
127 left: self,
128 right: other,
129 }
130 }
131}
132
133pub struct StateSubscriberContext {
136 pub mc_block_id: BlockId,
139 pub mc_is_key_block: bool,
142 pub is_key_block: bool,
144 pub block: BlockStuff,
146 pub archive_data: ArchiveData,
148 pub state: ShardStateStuff,
150 pub delayed: DelayedTasks,
152}
153
154pub trait StateSubscriber: Send + Sync + 'static {
155 type HandleStateFut<'a>: Future<Output = Result<()>> + Send + 'a;
156
157 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a>;
158}
159
160impl<T: StateSubscriber> StateSubscriber for Option<T> {
161 type HandleStateFut<'a> = OptionHandleFut<T::HandleStateFut<'a>>;
162
163 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
164 OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_state(cx)))
165 }
166}
167
168impl<T: StateSubscriber> StateSubscriber for Box<T> {
169 type HandleStateFut<'a> = T::HandleStateFut<'a>;
170
171 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
172 <T as StateSubscriber>::handle_state(self, cx)
173 }
174}
175
176impl<T: StateSubscriber> StateSubscriber for Arc<T> {
177 type HandleStateFut<'a> = T::HandleStateFut<'a>;
178
179 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
180 <T as StateSubscriber>::handle_state(self, cx)
181 }
182}
183
184pub trait StateSubscriberExt: Sized {
185 fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
186}
187
188impl<B: StateSubscriber> StateSubscriberExt for B {
189 fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
190 ChainSubscriber {
191 left: self,
192 right: other,
193 }
194 }
195}
196
197pub struct ArchiveSubscriberContext<'a> {
200 pub archive_id: u32,
201 pub storage: &'a CoreStorage,
202}
203
204pub trait ArchiveSubscriber: Send + Sync + 'static {
205 type HandleArchiveFut<'a>: Future<Output = Result<()>> + Send + 'a;
206
207 fn handle_archive<'a>(
208 &'a self,
209 cx: &'a ArchiveSubscriberContext<'_>,
210 ) -> Self::HandleArchiveFut<'a>;
211}
212
213impl<T: ArchiveSubscriber> ArchiveSubscriber for Option<T> {
214 type HandleArchiveFut<'a> = OptionHandleFut<T::HandleArchiveFut<'a>>;
215
216 fn handle_archive<'a>(
217 &'a self,
218 cx: &'a ArchiveSubscriberContext<'_>,
219 ) -> Self::HandleArchiveFut<'a> {
220 OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_archive(cx)))
221 }
222}
223
224impl<T: ArchiveSubscriber> ArchiveSubscriber for Box<T> {
225 type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
226
227 fn handle_archive<'a>(
228 &'a self,
229 cx: &'a ArchiveSubscriberContext<'_>,
230 ) -> Self::HandleArchiveFut<'a> {
231 <T as ArchiveSubscriber>::handle_archive(self, cx)
232 }
233}
234
235impl<T: ArchiveSubscriber> ArchiveSubscriber for Arc<T> {
236 type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
237
238 fn handle_archive<'a>(
239 &'a self,
240 cx: &'a ArchiveSubscriberContext<'_>,
241 ) -> Self::HandleArchiveFut<'a> {
242 <T as ArchiveSubscriber>::handle_archive(self, cx)
243 }
244}
245
246pub trait ArchiveSubscriberExt: Sized {
247 fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
248}
249
250impl<B: ArchiveSubscriber> ArchiveSubscriberExt for B {
251 fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
252 ChainSubscriber {
253 left: self,
254 right: other,
255 }
256 }
257}
258
259#[derive(Default, Debug, Clone, Copy)]
262pub struct NoopSubscriber;
263
264impl BlockSubscriber for NoopSubscriber {
265 type Prepared = ();
266
267 type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
268 type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
269
270 #[inline]
271 fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
272 futures_util::future::ready(Ok(()))
273 }
274
275 #[inline]
276 fn handle_block(
277 &self,
278 _cx: &BlockSubscriberContext,
279 _: Self::Prepared,
280 ) -> Self::HandleBlockFut<'_> {
281 futures_util::future::ready(Ok(()))
282 }
283}
284
285impl StateSubscriber for NoopSubscriber {
286 type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
287
288 fn handle_state(&self, _cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
289 futures_util::future::ready(Ok(()))
290 }
291}
292
293pub struct ChainSubscriber<T1, T2> {
296 left: T1,
297 right: T2,
298}
299
300impl<T1: BlockSubscriber, T2: BlockSubscriber> BlockSubscriber for ChainSubscriber<T1, T2> {
301 type Prepared = (T1::Prepared, T2::Prepared);
302
303 type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
304 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
305
306 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
307 let left = self.left.prepare_block(cx);
308 let right = self.right.prepare_block(cx);
309
310 Box::pin(async move {
311 match future::join(left, right).await {
312 (Ok(l), Ok(r)) => Ok((l, r)),
313 (Err(e), _) | (_, Err(e)) => Err(e),
314 }
315 })
316 }
317
318 fn handle_block<'a>(
319 &'a self,
320 cx: &'a BlockSubscriberContext,
321 (left_prepared, right_prepared): Self::Prepared,
322 ) -> Self::HandleBlockFut<'a> {
323 let left = self.left.handle_block(cx, left_prepared);
324 let right = self.right.handle_block(cx, right_prepared);
325
326 Box::pin(async move {
327 left.await?;
328 right.await
329 })
330 }
331}
332
333impl<T1: StateSubscriber, T2: StateSubscriber> StateSubscriber for ChainSubscriber<T1, T2> {
334 type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
335
336 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
337 let left = self.left.handle_state(cx);
338 let right = self.right.handle_state(cx);
339
340 Box::pin(async move {
341 left.await?;
342 right.await
343 })
344 }
345}
346
347macro_rules! impl_subscriber_tuple {
350 ($join_fn:path, |$e:ident| $err_pat:pat, { $($n:tt: $var:ident = $ty:ident),*$(,)? }) => {
351 impl<$($ty),*> BlockSubscriber for ($($ty),*)
352 where
353 $($ty: BlockSubscriber),*
354 {
355 type Prepared = ($($ty::Prepared),*);
356
357 type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
358 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
359
360 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
361 $(let $var = self.$n.prepare_block(cx));*;
362
363 Box::pin(async move {
364 match $join_fn($($var),*).await {
365 ($(Ok($var)),*) => Ok(($($var),*)),
366 $err_pat => Err($e),
367 }
368 })
369 }
370
371 fn handle_block<'a>(
372 &'a self,
373 cx: &'a BlockSubscriberContext,
374 ($($var),*): Self::Prepared,
375 ) -> Self::HandleBlockFut<'a> {
376 $(let $var = self.$n.handle_block(cx, $var));*;
377
378 Box::pin(async move {
379 match $join_fn($($var),*).await {
380 $err_pat => Err($e),
381 _ => Ok(()),
382 }
383 })
384 }
385 }
386
387 impl<$($ty),*> StateSubscriber for ($($ty),*)
388 where
389 $($ty: StateSubscriber),*
390 {
391 type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
392
393 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
394 $(let $var = self.$n.handle_state(cx));*;
395
396 Box::pin(async move {
397 match $join_fn($($var),*).await {
398 $err_pat => Err($e),
399 _ => Ok(()),
400 }
401 })
402 }
403 }
404 };
405}
406
407impl_subscriber_tuple! {
408 futures_util::future::join,
409 |e| (Err(e), _) | (_, Err(e)),
410 {
411 0: a = T0,
412 1: b = T1,
413 }
414}
415
416impl_subscriber_tuple! {
417 futures_util::future::join3,
418 |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
419 {
420 0: a = T0,
421 1: b = T1,
422 2: c = T2,
423 }
424}
425
426impl_subscriber_tuple! {
427 futures_util::future::join4,
428 |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
429 {
430 0: a = T0,
431 1: b = T1,
432 2: c = T2,
433 3: d = T3,
434 }
435}
436
437impl_subscriber_tuple! {
438 futures_util::future::join5,
439 |e|
440 (Err(e), _, _, _, _)
441 | (_, Err(e), _, _, _)
442 | (_, _, Err(e), _, _)
443 | (_, _, _, Err(e), _)
444 | (_, _, _, _, Err(e)),
445 {
446 0: a = T0,
447 1: b = T1,
448 2: c = T2,
449 3: d = T3,
450 4: e = T4,
451 }
452}
453
454#[cfg(any(test, feature = "test"))]
455pub mod test {
456 use super::*;
457
458 #[derive(Default, Debug, Clone, Copy)]
459 pub struct PrintSubscriber;
460
461 impl BlockSubscriber for PrintSubscriber {
462 type Prepared = ();
463
464 type PrepareBlockFut<'a> = future::Ready<Result<()>>;
465 type HandleBlockFut<'a> = future::Ready<Result<()>>;
466
467 fn prepare_block<'a>(
468 &'a self,
469 cx: &'a BlockSubscriberContext,
470 ) -> Self::PrepareBlockFut<'a> {
471 tracing::info!(
472 block_id = %cx.block.id(),
473 mc_block_id = %cx.mc_block_id,
474 "preparing block"
475 );
476 future::ready(Ok(()))
477 }
478
479 fn handle_block(
480 &self,
481 cx: &BlockSubscriberContext,
482 _: Self::Prepared,
483 ) -> Self::HandleBlockFut<'_> {
484 tracing::info!(
485 block_id = %cx.block.id(),
486 mc_block_id = %cx.mc_block_id,
487 "handling block"
488 );
489 future::ready(Ok(()))
490 }
491 }
492
493 impl StateSubscriber for PrintSubscriber {
494 type HandleStateFut<'a> = future::Ready<anyhow::Result<()>>;
495
496 fn handle_state(&self, cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
497 tracing::info!(
498 block_id = %cx.block.id(),
499 mc_block_id = %cx.mc_block_id,
500 "handling state"
501 );
502 future::ready(Ok(()))
503 }
504 }
505}