1use std::future::Future;
2use std::sync::Arc;
3
4use anyhow::Result;
5use futures_util::future::{self, BoxFuture};
6use tycho_block_util::archive::ArchiveData;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_types::models::*;
10
11pub use self::futures::{
12 DelayedTasks, DelayedTasksJoinHandle, DelayedTasksSpawner, OptionHandleFut, OptionPrepareFut,
13};
14pub use self::gc_subscriber::{GcSubscriber, ManualGcTrigger};
15pub use self::metrics_subscriber::MetricsSubscriber;
16pub use self::ps_subscriber::PsSubscriber;
17use crate::storage::CoreStorage;
18
19mod futures;
20mod gc_subscriber;
21mod metrics_subscriber;
22mod ps_subscriber;
23
/// Context passed by reference to both [`BlockSubscriber`] methods.
///
/// NOTE(review): the field descriptions below are partly inferred from names and types;
/// confirm against the code that constructs this context.
#[derive(Clone)]
pub struct BlockSubscriberContext {
    /// Id of the masterchain block related to `block`
    /// (presumably; inferred from the `mc_` prefix — confirm).
    pub mc_block_id: BlockId,
    /// Key-block flag of the related masterchain block (presumably — confirm).
    pub mc_is_key_block: bool,
    /// Key-block flag of `block` itself (presumably — confirm).
    pub is_key_block: bool,
    /// The block being processed.
    pub block: BlockStuff,
    /// Archive data accompanying `block`.
    pub archive_data: ArchiveData,
    /// Handle to delayed tasks associated with this block
    /// (NOTE(review): semantics are defined in the `futures` submodule).
    pub delayed: DelayedTasks,
}
43
/// Two-phase block handler.
///
/// `prepare_block` returns a future resolving to a [`Self::Prepared`] value; that value is
/// then passed by value to `handle_block` together with the same kind of context.
///
/// NOTE(review): how the two phases are scheduled relative to each other is decided by the
/// caller, which is not visible in this file.
pub trait BlockSubscriber: Send + Sync + 'static {
    /// Intermediate value produced by `prepare_block` and consumed by `handle_block`.
    type Prepared: Send;

    /// Future returned by `prepare_block`; may borrow `self` and the context for `'a`.
    type PrepareBlockFut<'a>: Future<Output = Result<Self::Prepared>> + Send + 'a;
    /// Future returned by `handle_block`; may borrow `self` and the context for `'a`.
    type HandleBlockFut<'a>: Future<Output = Result<()>> + Send + 'a;

    /// Starts the preparation phase for the block described by `cx`.
    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a>;

    /// Starts the handling phase, consuming the value produced by `prepare_block`.
    fn handle_block<'a>(
        &'a self,
        cx: &'a BlockSubscriberContext,
        prepared: Self::Prepared,
    ) -> Self::HandleBlockFut<'a>;
}
58
59impl<T: BlockSubscriber> BlockSubscriber for Option<T> {
60 type Prepared = Option<T::Prepared>;
61
62 type PrepareBlockFut<'a> = OptionPrepareFut<T::PrepareBlockFut<'a>>;
63 type HandleBlockFut<'a> = OptionHandleFut<T::HandleBlockFut<'a>>;
64
65 #[inline]
66 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
67 OptionPrepareFut::from(self.as_ref().map(|s| s.prepare_block(cx)))
68 }
69
70 fn handle_block<'a>(
71 &'a self,
72 cx: &'a BlockSubscriberContext,
73 prepared: Self::Prepared,
74 ) -> Self::HandleBlockFut<'a> {
75 OptionHandleFut::from(match (self, prepared) {
76 (Some(subscriber), Some(prepared)) => Some(subscriber.handle_block(cx, prepared)),
77 _ => None,
78 })
79 }
80}
81
82impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
83 type Prepared = T::Prepared;
84
85 type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
86 type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
87
88 #[inline]
89 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
90 <T as BlockSubscriber>::prepare_block(self, cx)
91 }
92
93 #[inline]
94 fn handle_block<'a>(
95 &'a self,
96 cx: &'a BlockSubscriberContext,
97 prepared: Self::Prepared,
98 ) -> Self::HandleBlockFut<'a> {
99 <T as BlockSubscriber>::handle_block(self, cx, prepared)
100 }
101}
102
103impl<T: BlockSubscriber> BlockSubscriber for Arc<T> {
104 type Prepared = T::Prepared;
105
106 type PrepareBlockFut<'a> = T::PrepareBlockFut<'a>;
107 type HandleBlockFut<'a> = T::HandleBlockFut<'a>;
108
109 #[inline]
110 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
111 <T as BlockSubscriber>::prepare_block(self, cx)
112 }
113
114 #[inline]
115 fn handle_block<'a>(
116 &'a self,
117 cx: &'a BlockSubscriberContext,
118 prepared: Self::Prepared,
119 ) -> Self::HandleBlockFut<'a> {
120 <T as BlockSubscriber>::handle_block(self, cx, prepared)
121 }
122}
123
/// Combinator methods for block subscribers.
pub trait BlockSubscriberExt: Sized {
    /// Pairs `self` with `other` in a [`ChainSubscriber`], with `self` as the left element
    /// and `other` as the right one.
    fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
}
127
128impl<B: BlockSubscriber> BlockSubscriberExt for B {
129 fn chain<T: BlockSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
130 ChainSubscriber {
131 left: self,
132 right: other,
133 }
134 }
135}
136
/// Context passed by reference to [`StateSubscriber::handle_state`].
///
/// NOTE(review): the field descriptions below are partly inferred from names and types;
/// confirm against the code that constructs this context.
pub struct StateSubscriberContext {
    /// Id of the masterchain block related to `block`
    /// (presumably; inferred from the `mc_` prefix — confirm).
    pub mc_block_id: BlockId,
    /// Key-block flag of the related masterchain block (presumably — confirm).
    pub mc_is_key_block: bool,
    /// Key-block flag of `block` itself (presumably — confirm).
    pub is_key_block: bool,
    /// The block being processed.
    pub block: BlockStuff,
    /// Archive data accompanying `block`.
    pub archive_data: ArchiveData,
    /// Shard state supplied alongside `block`
    /// (presumably the state after applying it — confirm).
    pub state: ShardStateStuff,
    /// Handle to delayed tasks associated with this block
    /// (NOTE(review): semantics are defined in the `futures` submodule).
    pub delayed: DelayedTasks,
}
157
/// Handler invoked with a [`StateSubscriberContext`].
pub trait StateSubscriber: Send + Sync + 'static {
    /// Future returned by `handle_state`; may borrow `self` and the context for `'a`.
    type HandleStateFut<'a>: Future<Output = Result<()>> + Send + 'a;

    /// Starts handling the state described by `cx`.
    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a>;
}
163
164impl<T: StateSubscriber> StateSubscriber for Option<T> {
165 type HandleStateFut<'a> = OptionHandleFut<T::HandleStateFut<'a>>;
166
167 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
168 OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_state(cx)))
169 }
170}
171
172impl<T: StateSubscriber> StateSubscriber for Box<T> {
173 type HandleStateFut<'a> = T::HandleStateFut<'a>;
174
175 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
176 <T as StateSubscriber>::handle_state(self, cx)
177 }
178}
179
180impl<T: StateSubscriber> StateSubscriber for Arc<T> {
181 type HandleStateFut<'a> = T::HandleStateFut<'a>;
182
183 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
184 <T as StateSubscriber>::handle_state(self, cx)
185 }
186}
187
/// Combinator methods for state subscribers.
pub trait StateSubscriberExt: Sized {
    /// Pairs `self` with `other` in a [`ChainSubscriber`], with `self` as the left element
    /// and `other` as the right one.
    fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
}
191
192impl<B: StateSubscriber> StateSubscriberExt for B {
193 fn chain<T: StateSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
194 ChainSubscriber {
195 left: self,
196 right: other,
197 }
198 }
199}
200
/// Context passed by reference to [`ArchiveSubscriber::handle_archive`].
pub struct ArchiveSubscriberContext<'a> {
    /// Numeric id of the archive being handled
    /// (NOTE(review): how ids are assigned is not visible here).
    pub archive_id: u32,
    /// Borrowed storage handle
    /// (presumably where the archive can be looked up — confirm).
    pub storage: &'a CoreStorage,
}
207
/// Handler invoked with an [`ArchiveSubscriberContext`].
pub trait ArchiveSubscriber: Send + Sync + 'static {
    /// Future returned by `handle_archive`; may borrow `self` and the context for `'a`.
    type HandleArchiveFut<'a>: Future<Output = Result<()>> + Send + 'a;

    /// Starts handling the archive described by `cx`.
    fn handle_archive<'a>(
        &'a self,
        cx: &'a ArchiveSubscriberContext<'_>,
    ) -> Self::HandleArchiveFut<'a>;
}
216
217impl<T: ArchiveSubscriber> ArchiveSubscriber for Option<T> {
218 type HandleArchiveFut<'a> = OptionHandleFut<T::HandleArchiveFut<'a>>;
219
220 fn handle_archive<'a>(
221 &'a self,
222 cx: &'a ArchiveSubscriberContext<'_>,
223 ) -> Self::HandleArchiveFut<'a> {
224 OptionHandleFut::<_>::from(self.as_ref().map(|s| s.handle_archive(cx)))
225 }
226}
227
228impl<T: ArchiveSubscriber> ArchiveSubscriber for Box<T> {
229 type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
230
231 fn handle_archive<'a>(
232 &'a self,
233 cx: &'a ArchiveSubscriberContext<'_>,
234 ) -> Self::HandleArchiveFut<'a> {
235 <T as ArchiveSubscriber>::handle_archive(self, cx)
236 }
237}
238
239impl<T: ArchiveSubscriber> ArchiveSubscriber for Arc<T> {
240 type HandleArchiveFut<'a> = T::HandleArchiveFut<'a>;
241
242 fn handle_archive<'a>(
243 &'a self,
244 cx: &'a ArchiveSubscriberContext<'_>,
245 ) -> Self::HandleArchiveFut<'a> {
246 <T as ArchiveSubscriber>::handle_archive(self, cx)
247 }
248}
249
/// Combinator methods for archive subscribers.
pub trait ArchiveSubscriberExt: Sized {
    /// Pairs `self` with `other` in a [`ChainSubscriber`], with `self` as the left element
    /// and `other` as the right one.
    fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T>;
}
253
254impl<B: ArchiveSubscriber> ArchiveSubscriberExt for B {
255 fn chain<T: ArchiveSubscriber>(self, other: T) -> ChainSubscriber<Self, T> {
256 ChainSubscriber {
257 left: self,
258 right: other,
259 }
260 }
261}
262
/// Subscriber that does nothing: every future it returns is already resolved to `Ok(())`.
#[derive(Default, Debug, Clone, Copy)]
pub struct NoopSubscriber;
267
268impl BlockSubscriber for NoopSubscriber {
269 type Prepared = ();
270
271 type PrepareBlockFut<'a> = futures_util::future::Ready<Result<()>>;
272 type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
273
274 #[inline]
275 fn prepare_block<'a>(&'a self, _cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
276 futures_util::future::ready(Ok(()))
277 }
278
279 #[inline]
280 fn handle_block(
281 &self,
282 _cx: &BlockSubscriberContext,
283 _: Self::Prepared,
284 ) -> Self::HandleBlockFut<'_> {
285 futures_util::future::ready(Ok(()))
286 }
287}
288
289impl StateSubscriber for NoopSubscriber {
290 type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
291
292 fn handle_state(&self, _cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
293 futures_util::future::ready(Ok(()))
294 }
295}
296
/// Pair of subscribers built by the `chain` methods of the `*SubscriberExt` traits.
pub struct ChainSubscriber<T1, T2> {
    /// The subscriber `chain` was called on.
    left: T1,
    /// The subscriber passed to `chain` as `other`.
    right: T2,
}
303
304impl<T1: BlockSubscriber, T2: BlockSubscriber> BlockSubscriber for ChainSubscriber<T1, T2> {
305 type Prepared = (T1::Prepared, T2::Prepared);
306
307 type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
308 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
309
310 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
311 let left = self.left.prepare_block(cx);
312 let right = self.right.prepare_block(cx);
313
314 Box::pin(async move {
315 match future::join(left, right).await {
316 (Ok(l), Ok(r)) => Ok((l, r)),
317 (Err(e), _) | (_, Err(e)) => Err(e),
318 }
319 })
320 }
321
322 fn handle_block<'a>(
323 &'a self,
324 cx: &'a BlockSubscriberContext,
325 (left_prepared, right_prepared): Self::Prepared,
326 ) -> Self::HandleBlockFut<'a> {
327 let left = self.left.handle_block(cx, left_prepared);
328 let right = self.right.handle_block(cx, right_prepared);
329
330 Box::pin(async move {
331 left.await?;
332 right.await
333 })
334 }
335}
336
337impl<T1: StateSubscriber, T2: StateSubscriber> StateSubscriber for ChainSubscriber<T1, T2> {
338 type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
339
340 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
341 let left = self.left.handle_state(cx);
342 let right = self.right.handle_state(cx);
343
344 Box::pin(async move {
345 left.await?;
346 right.await
347 })
348 }
349}
350
// Implements `BlockSubscriber` and `StateSubscriber` for a tuple of subscribers.
//
// Arguments:
// - `$join_fn`: path of a function that takes one future per tuple element and returns
//   a future resolving to the tuple of their outputs;
// - `|$e| $err_pat`: a pattern over that output tuple which matches whenever at least one
//   element is `Err`, binding the error as `$e`;
// - `{ index: var = Type, ... }`: for every tuple element its field index, a local variable
//   name and the generic parameter name.
macro_rules! impl_subscriber_tuple {
    ($join_fn:path, |$e:ident| $err_pat:pat, { $($n:tt: $var:ident = $ty:ident),*$(,)? }) => {
        impl<$($ty),*> BlockSubscriber for ($($ty),*)
        where
            $($ty: BlockSubscriber),*
        {
            // Tuple of the elements' prepared values, in element order.
            type Prepared = ($($ty::Prepared),*);

            type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
            type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;

            fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
                // Create every element's future first, then drive them all through `$join_fn`.
                $(let $var = self.$n.prepare_block(cx));*;

                Box::pin(async move {
                    match $join_fn($($var),*).await {
                        // All elements succeeded: `$var` is rebound to the prepared value.
                        ($(Ok($var)),*) => Ok(($($var),*)),
                        // Otherwise report the error bound by the first matching
                        // alternative of `$err_pat`.
                        $err_pat => Err($e),
                    }
                })
            }

            fn handle_block<'a>(
                &'a self,
                cx: &'a BlockSubscriberContext,
                ($($var),*): Self::Prepared,
            ) -> Self::HandleBlockFut<'a> {
                // Each `$var` first holds the prepared value and is then shadowed by the
                // element's handler future.
                $(let $var = self.$n.handle_block(cx, $var));*;

                Box::pin(async move {
                    match $join_fn($($var),*).await {
                        $err_pat => Err($e),
                        // No element failed.
                        _ => Ok(()),
                    }
                })
            }
        }

        impl<$($ty),*> StateSubscriber for ($($ty),*)
        where
            $($ty: StateSubscriber),*
        {
            type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;

            fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
                // Create every element's future first, then drive them all through `$join_fn`.
                $(let $var = self.$n.handle_state(cx));*;

                Box::pin(async move {
                    match $join_fn($($var),*).await {
                        $err_pat => Err($e),
                        // No element failed.
                        _ => Ok(()),
                    }
                })
            }
        }
    };
}
410
// Tuple impls for arities 2 through 5. Each invocation supplies the join function of the
// matching arity and an or-pattern with one alternative per tuple position.

// Pairs: `(T0, T1)`.
impl_subscriber_tuple! {
    futures_util::future::join,
    |e| (Err(e), _) | (_, Err(e)),
    {
        0: a = T0,
        1: b = T1,
    }
}

// Triples: `(T0, T1, T2)`.
impl_subscriber_tuple! {
    futures_util::future::join3,
    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
    }
}

// Quadruples: `(T0, T1, T2, T3)`.
impl_subscriber_tuple! {
    futures_util::future::join4,
    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
    }
}

// Quintuples: `(T0, T1, T2, T3, T4)`.
// The name `e` is used both for the error binding and for the fifth element's variable;
// inside the macro the two only ever appear in different match arms.
impl_subscriber_tuple! {
    futures_util::future::join5,
    |e|
        (Err(e), _, _, _, _)
        | (_, Err(e), _, _, _)
        | (_, _, Err(e), _, _)
        | (_, _, _, Err(e), _)
        | (_, _, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
        4: e = T4,
    }
}
457
/// Helpers available in tests and behind the `test` feature.
#[cfg(any(test, feature = "test"))]
pub mod test {
    use super::*;

    /// Subscriber that emits one `tracing` info event per callback and then succeeds.
    #[derive(Default, Debug, Clone, Copy)]
    pub struct PrintSubscriber;

    impl BlockSubscriber for PrintSubscriber {
        type Prepared = ();

        type PrepareBlockFut<'a> = future::Ready<Result<()>>;
        type HandleBlockFut<'a> = future::Ready<Result<()>>;

        fn prepare_block(&self, cx: &BlockSubscriberContext) -> Self::PrepareBlockFut<'_> {
            // The event is emitted when the method is called, not when the future is polled.
            tracing::info!(
                block_id = %cx.block.id(),
                mc_block_id = %cx.mc_block_id,
                "preparing block"
            );
            future::ready(Ok(()))
        }

        fn handle_block<'a>(
            &'a self,
            cx: &'a BlockSubscriberContext,
            _prepared: Self::Prepared,
        ) -> Self::HandleBlockFut<'a> {
            // The event is emitted when the method is called, not when the future is polled.
            tracing::info!(
                block_id = %cx.block.id(),
                mc_block_id = %cx.mc_block_id,
                "handling block"
            );
            future::ready(Ok(()))
        }
    }

    impl StateSubscriber for PrintSubscriber {
        type HandleStateFut<'a> = future::Ready<Result<()>>;

        fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
            // The event is emitted when the method is called, not when the future is polled.
            tracing::info!(
                block_id = %cx.block.id(),
                mc_block_id = %cx.mc_block_id,
                "handling state"
            );
            future::ready(Ok(()))
        }
    }
}